こんな感じでディレクトリ作成します
1 2 3 4 5 6 7 8 9  | ├── bin ├── pom.xml └── src  └── main  └── java  └── emrhadoop  ├── WordCountMain.java  ├── WordCountMapper.java  └── WordCountReducer.java  | 
pom.xmlを作成します
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69  | <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  <modelVersion>4.0.0</modelVersion>  <groupId>jp.qri.emr</groupId>  <artifactId>emrhive</artifactId>  <version>1.0-SNAPSHOT</version>  <packaging>jar</packaging>  <properties>  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  </properties>  <repositories>  <repository>  <id>cloudera</id>  <url>https://repository.cloudera.com/content/repositories/releases/</url>  </repository>  </repositories>  <dependencies>  <dependency>  <groupId>junit</groupId>  <artifactId>junit</artifactId>  <version>4.12</version>  </dependency>  <dependency>  <groupId>org.apache.hadoop</groupId>  <artifactId>hadoop-core</artifactId>  <version>1.2.1</version>  </dependency>  </dependencies>  <build>  <plugins>  <plugin>  <groupId>org.apache.maven.plugins</groupId>  <artifactId>maven-dependency-plugin</artifactId>  <configuration>  <outputDirectory>  ${project.build.directory}  </outputDirectory>  </configuration>  </plugin>  <plugin>  <artifactId>maven-assembly-plugin</artifactId>  <configuration>  <descriptorRefs>  <descriptorRef>jar-with-dependencies</descriptorRef>  </descriptorRefs>  </configuration>  </plugin>  <plugin>  <groupId>org.apache.maven.plugins</groupId>  <artifactId>maven-shade-plugin</artifactId>  <version>2.4.2</version>  <configuration>  <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">  <resource>reference.conf</resource>  </transformer>  </configuration>  <executions>  <execution>  <phase>package</phase>  <goals>  <goal>shade</goal>  </goals>  </execution>  </executions>  </plugin>  </plugins>  </build> </project>  | 
eclipseで読み込めるようにします
1  | mvn eclipse:eclipse  | 
Javaファイルはこんな感じ
WordCountMain.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60  | package emrhadoop; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class WordCountMain {  /**  * Jobを設定して実行する  *   * @param args  * @throws Exception  */  public static void main(String[] args) throws Exception {  System.out.println("Masterノード start");  // スレーブノードで実行するJobを設定する  Job job = Job.getInstance();  job.setJarByClass(WordCountMain.class);  job.setJobName("wordcount");  // Reducerへの出力キー、バリューの型を指定する  job.setOutputKeyClass(Text.class);  job.setOutputValueClass(IntWritable.class);  // Mapper、Reducerのクラスを指定する  job.setMapperClass(WordCountMapper.class);  job.setReducerClass(WordCountReducer.class);  // もしReducerが必要なければ、このように指定する job.setNumReduceTasks(0);  // データを読み込み、Mapperへ渡すデータ・フォーマットを指定する  job.setInputFormatClass(TextInputFormat.class);  // Reducerからデータを受け取り、出力を行う際のデータ・フォーマットを指定する  job.setOutputFormatClass(TextOutputFormat.class);  // 引数取得  // arg[0] は、CLIから実行した場合はメインコントローラークラス名が設定される場合もあるようだったので注意。  String inputPath = args[0];  System.out.println("arg 0 : " + inputPath);  String outputPath = args[1];  System.out.println("arg 1 : " + outputPath);  // 入力ファイル・出力ファイルのパスを設定  FileInputFormat.setInputPaths(job, new Path(inputPath));  FileOutputFormat.setOutputPath(job, new Path(outputPath));  // Job実行  boolean result = job.waitForCompletion(true);  System.out.println("result : " + result);  System.out.println("Masterノード end");  } }  | 
WordCountMapper.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55  | package emrhadoop; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /**  * Mapper  *   * 継承の際のジェネリクス指定によって、mapメソッドの型を指定出来る  * Mapper<入力キーの型, 入力値の型, 出力キーの型, 出力値の型>  */ public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {  /**  * 初期化処理  */  @Override  public void setup(Context context) throws IOException, InterruptedException {  System.out.println("Mapper setup");  }  @Override  public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  // 入力値を取り出す(1行データ)  String line = value.toString();  // 単語に分解する  StringTokenizer tokenizer = new StringTokenizer(line);  IntWritable one = new IntWritable(1);  Text word = new Text();  // 単語ごとに繰り返し  while (tokenizer.hasMoreTokens()) {  word.set(tokenizer.nextToken());  // 1単語ごとにReducerへ値を渡す。(単語, 集計数)。ここでは単純に1単語につき1を渡しているだけだが、Mapper側で一度集計してからReducerに渡してもいい。  context.write(word, one);  }  }  /**  * 終了処理  */  @Override  public void cleanup(Context context) throws IOException,InterruptedException {  System.out.println("Mapper cleanup");  } }  | 
WordCountReducer.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30  | package emrhadoop; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /**  * Reducer  *   * 継承の際のジェネリクス指定によって、reduceメソッドの型を指定出来る  * Reducer<入力キーの型, 入力値の型, 出力キーの型, 出力値の型>  */ public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {  @Override  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {  // Mapperから渡された値を集計  int sum = 0;  for (IntWritable value : values) {  sum += value.get();  }  // 書き込み  context.write(key, new IntWritable(sum));  } }  | 
Jar作成
1  | mvn package  | 
AWS Console
- まずEMRを作成します
 - Cleate Clusterから Go to advanced optionsへ
 - Hardware ConfigurationでEC2 instance typeを必要に応じ変更。m1.mediumが最安かな?
 - VPCに対応したのでVPC内に作成したい場合にはここで選択
 - その他、キーとかSecurityGroupなどを適宜設定します
 - その後、作成したJarをS3へコピーしておきます
 - WordCount用のファイルをS3へコピーします
 
1  | aws s3 cp input.txt s3://bucket/input/  | 
Stepsから起動します
- StepTypeはCustomJARを選択
 - JAR locationに先ほどコピーしたJarファイルのS3のLocationを入力
 - Argumentsに
 
1  | emrhadoop.WordCountMain s3n://bucket/input/input.txt s3n://bucket/output  | 
ちなみにouputディレクトリをあらかじめ作成しておくとエラーになります