こんな感じでディレクトリ作成します
├── bin ├── pom.xml └── src └── main └── java └── emrhadoop ├── WordCountMain.java ├── WordCountMapper.java └── WordCountReducer.java
pom.xmlを作成します
4.0.0 jp.qri.emr emrhive 1.0-SNAPSHOT jar UTF-8 cloudera https://repository.cloudera.com/content/repositories/releases/ junit junit 4.12 org.apache.hadoop hadoop-core 1.2.1 org.apache.maven.plugins maven-dependency-plugin ${project.build.directory} maven-assembly-plugin jar-with-dependencies org.apache.maven.plugins maven-shade-plugin 2.4.2 reference.conf package shade
eclipseで読み込めるようにします
mvn eclipse:eclipse
Javaファイルはこんな感じ
WordCountMain.java
package emrhadoop; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class WordCountMain { /** * Jobを設定して実行する * * @param args * @throws Exception */ public static void main(String[] args) throws Exception { System.out.println("Masterノード start"); // スレーブノードで実行するJobを設定する Job job = Job.getInstance(); job.setJarByClass(WordCountMain.class); job.setJobName("wordcount"); // Reducerへの出力キー、バリューの型を指定する job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // Mapper、Reducerのクラスを指定する job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // もしReducerが必要なければ、このように指定する job.setNumReduceTasks(0); // データを読み込み、Mapperへ渡すデータ・フォーマットを指定する job.setInputFormatClass(TextInputFormat.class); // Reducerからデータを受け取り、出力を行う際のデータ・フォーマットを指定する job.setOutputFormatClass(TextOutputFormat.class); // 引数取得 // arg[0] は、CLIから実行した場合はメインコントローラークラス名が設定される場合もあるようだったので注意。 String inputPath = args[0]; System.out.println("arg 0 : " + inputPath); String outputPath = args[1]; System.out.println("arg 1 : " + outputPath); // 入力ファイル・出力ファイルのパスを設定 FileInputFormat.setInputPaths(job, new Path(inputPath)); FileOutputFormat.setOutputPath(job, new Path(outputPath)); // Job実行 boolean result = job.waitForCompletion(true); System.out.println("result : " + result); System.out.println("Masterノード end"); } }
WordCountMapper.java
package emrhadoop; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * Mapper * * 継承の際のジェネリクス指定によって、mapメソッドの型を指定出来る * Mapper<入力キーの型, 入力値の型, 出力キーの型, 出力値の型> */ public class WordCountMapper extends Mapper{ /** * 初期化処理 */ @Override public void setup(Context context) throws IOException, InterruptedException { System.out.println("Mapper setup"); } @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 入力値を取り出す(1行データ) String line = value.toString(); // 単語に分解する StringTokenizer tokenizer = new StringTokenizer(line); IntWritable one = new IntWritable(1); Text word = new Text(); // 単語ごとに繰り返し while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); // 1単語ごとにReducerへ値を渡す。(単語, 集計数)。ここでは単純に1単語につき1を渡しているだけだが、Mapper側で一度集計してからReducerに渡してもいい。 context.write(word, one); } } /** * 終了処理 */ @Override public void cleanup(Context context) throws IOException,InterruptedException { System.out.println("Mapper cleanup"); } }
WordCountReducer.java
package emrhadoop; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * Reducer * * 継承の際のジェネリクス指定によって、reduceメソッドの型を指定出来る * Reducer<入力キーの型, 入力値の型, 出力キーの型, 出力値の型> */ public class WordCountReducer extends Reducer{ @Override public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { // Mapperから渡された値を集計 int sum = 0; for (IntWritable value : values) { sum += value.get(); } // 書き込み context.write(key, new IntWritable(sum)); } }
Jar作成
mvn package
AWS Console
- まずEMRを作成します
- Cleate Clusterから Go to advanced optionsへ
- Hardware ConfigurationでEC2 instance typeを必要に応じ変更。m1.mediumが最安かな?
- VPCに対応したのでVPC内に作成したい場合にはここで選択
- その他、キーとかSecurityGroupなどを適宜設定します
- その後、作成したJarをS3へコピーしておきます
- WordCount用のファイルをS3へコピーします
aws s3 cp input.txt s3://bucket/input/
Stepsから起動します
- StepTypeはCustomJARを選択
- JAR locationに先ほどコピーしたJarファイルのS3のLocationを入力
- Argumentsに
emrhadoop.WordCountMain s3n://bucket/input/input.txt s3n://bucket/output
ちなみにouputディレクトリをあらかじめ作成しておくとエラーになります