EMRでHadoopのJavaサンプル

こんな感じでディレクトリ作成します

├── bin
├── pom.xml
└── src
		└── main
				└── java
						└── emrhadoop
								├── WordCountMain.java
								├── WordCountMapper.java
								└── WordCountReducer.java

pom.xmlを作成します


	4.0.0
	jp.qri.emr
	emrhive
	1.0-SNAPSHOT
	jar
	
		UTF-8
	
	
		
			cloudera
			https://repository.cloudera.com/content/repositories/releases/
		
	
	
		
			junit
			junit
			4.12
		
		
			org.apache.hadoop
			hadoop-core
			1.2.1
		
	
	
		
						
								org.apache.maven.plugins
								maven-dependency-plugin
								
										
												${project.build.directory}
										
								
						
						
								maven-assembly-plugin
								
										
												jar-with-dependencies
										
								
						
						
								org.apache.maven.plugins
								maven-shade-plugin
								2.4.2
								
										
												reference.conf
										
								
								
										
												package
												
														shade
												
										
								
						
				
	

eclipseで読み込めるようにします

mvn eclipse:eclipse

Javaファイルはこんな感じ

WordCountMain.java
package emrhadoop;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCountMain {

		/**
		 * Jobを設定して実行する
		 * 
		 * @param args
		 * @throws Exception
		 */
		public static void main(String[] args) throws Exception {

				System.out.println("Masterノード start");

				// スレーブノードで実行するJobを設定する
				Job job = Job.getInstance();
				job.setJarByClass(WordCountMain.class);
				job.setJobName("wordcount");

				// Reducerへの出力キー、バリューの型を指定する
				job.setOutputKeyClass(Text.class);
				job.setOutputValueClass(IntWritable.class);

				// Mapper、Reducerのクラスを指定する
				job.setMapperClass(WordCountMapper.class);
				job.setReducerClass(WordCountReducer.class);
				// もしReducerが必要なければ、このように指定する job.setNumReduceTasks(0);

				// データを読み込み、Mapperへ渡すデータ・フォーマットを指定する
				job.setInputFormatClass(TextInputFormat.class);
				// Reducerからデータを受け取り、出力を行う際のデータ・フォーマットを指定する
				job.setOutputFormatClass(TextOutputFormat.class);

				// 引数取得
				// arg[0] は、CLIから実行した場合はメインコントローラークラス名が設定される場合もあるようだったので注意。
				String inputPath = args[0];
				System.out.println("arg 0 : " + inputPath);
				String outputPath = args[1];
				System.out.println("arg 1 : " + outputPath);

				// 入力ファイル・出力ファイルのパスを設定
				FileInputFormat.setInputPaths(job, new Path(inputPath));
				FileOutputFormat.setOutputPath(job, new Path(outputPath));

				// Job実行
				boolean result = job.waitForCompletion(true);
				System.out.println("result : " + result);

				System.out.println("Masterノード end");
		}
}
WordCountMapper.java
package emrhadoop;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper
 * 
 * 継承の際のジェネリクス指定によって、mapメソッドの型を指定出来る
 * Mapper<入力キーの型, 入力値の型, 出力キーの型, 出力値の型>
 */
public class WordCountMapper extends Mapper {

		/**
		 * 初期化処理
		 */
		@Override
		public void setup(Context context) throws IOException, InterruptedException {
				System.out.println("Mapper setup");
		}

		@Override
		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

				// 入力値を取り出す(1行データ)
				String line = value.toString();

				// 単語に分解する
				StringTokenizer tokenizer = new StringTokenizer(line);

				IntWritable one = new IntWritable(1);
				Text word = new Text();

				// 単語ごとに繰り返し
				while (tokenizer.hasMoreTokens()) {
						word.set(tokenizer.nextToken());

						// 1単語ごとにReducerへ値を渡す。(単語, 集計数)。ここでは単純に1単語につき1を渡しているだけだが、Mapper側で一度集計してからReducerに渡してもいい。
						context.write(word, one);
				}
		}

		/**
		 * 終了処理
		 */
		@Override
		public void cleanup(Context context) throws IOException,InterruptedException {
				System.out.println("Mapper cleanup");
		}
}
WordCountReducer.java
package emrhadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer
 * 
 * 継承の際のジェネリクス指定によって、reduceメソッドの型を指定出来る
 * Reducer<入力キーの型, 入力値の型, 出力キーの型, 出力値の型>
 */
public class WordCountReducer extends Reducer {

		@Override
		public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {

				// Mapperから渡された値を集計
				int sum = 0;
				for (IntWritable value : values) {
						sum += value.get();
				}

				// 書き込み
				context.write(key, new IntWritable(sum));
		}

}

Jar作成

mvn package

AWS Console

  • まずEMRを作成します
  • Cleate Clusterから Go to advanced optionsへ
  • Hardware ConfigurationでEC2 instance typeを必要に応じ変更。m1.mediumが最安かな?
  • VPCに対応したのでVPC内に作成したい場合にはここで選択
  • その他、キーとかSecurityGroupなどを適宜設定します
  •  その後、作成したJarをS3へコピーしておきます
  • WordCount用のファイルをS3へコピーします
aws s3 cp input.txt s3://bucket/input/

Stepsから起動します

  • StepTypeはCustomJARを選択
  • JAR locationに先ほどコピーしたJarファイルのS3のLocationを入力
  • Argumentsに
emrhadoop.WordCountMain s3n://bucket/input/input.txt s3n://bucket/output

ちなみにouputディレクトリをあらかじめ作成しておくとエラーになります