こんな感じでディレクトリ作成します
1 2 3 4 5 6 7 8 9 | ├── bin ├── pom.xml └── src └── main └── java └── emrhadoop ├── WordCountMain.java ├── WordCountMapper.java └── WordCountReducer.java |
pom.xmlを作成します
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 | <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>jp.qri.emr</groupId> <artifactId>emrhive</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <repositories> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/content/repositories/releases/</url> </repository> </repositories> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-core</artifactId> <version>1.2.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> <configuration> <outputDirectory> ${project.build.directory} </outputDirectory> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.2</version> <configuration> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>reference.conf</resource> </transformer> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project> |
eclipseで読み込めるようにします
1 | mvn eclipse:eclipse |
Javaファイルはこんな感じ
WordCountMain.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 | package emrhadoop; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class WordCountMain { /** * Jobを設定して実行する * * @param args * @throws Exception */ public static void main(String[] args) throws Exception { System.out.println("Masterノード start"); // スレーブノードで実行するJobを設定する Job job = Job.getInstance(); job.setJarByClass(WordCountMain.class); job.setJobName("wordcount"); // Reducerへの出力キー、バリューの型を指定する job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // Mapper、Reducerのクラスを指定する job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // もしReducerが必要なければ、このように指定する job.setNumReduceTasks(0); // データを読み込み、Mapperへ渡すデータ・フォーマットを指定する job.setInputFormatClass(TextInputFormat.class); // Reducerからデータを受け取り、出力を行う際のデータ・フォーマットを指定する job.setOutputFormatClass(TextOutputFormat.class); // 引数取得 // arg[0] は、CLIから実行した場合はメインコントローラークラス名が設定される場合もあるようだったので注意。 String inputPath = args[0]; System.out.println("arg 0 : " + inputPath); String outputPath = args[1]; System.out.println("arg 1 : " + outputPath); // 入力ファイル・出力ファイルのパスを設定 FileInputFormat.setInputPaths(job, new Path(inputPath)); FileOutputFormat.setOutputPath(job, new Path(outputPath)); // Job実行 boolean result = job.waitForCompletion(true); System.out.println("result : " + result); System.out.println("Masterノード end"); } } |
WordCountMapper.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 | package emrhadoop; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * Mapper * * 継承の際のジェネリクス指定によって、mapメソッドの型を指定出来る * Mapper<入力キーの型, 入力値の型, 出力キーの型, 出力値の型> */ public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { /** * 初期化処理 */ @Override public void setup(Context context) throws IOException, InterruptedException { System.out.println("Mapper setup"); } @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 入力値を取り出す(1行データ) String line = value.toString(); // 単語に分解する StringTokenizer tokenizer = new StringTokenizer(line); IntWritable one = new IntWritable(1); Text word = new Text(); // 単語ごとに繰り返し while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); // 1単語ごとにReducerへ値を渡す。(単語, 集計数)。ここでは単純に1単語につき1を渡しているだけだが、Mapper側で一度集計してからReducerに渡してもいい。 context.write(word, one); } } /** * 終了処理 */ @Override public void cleanup(Context context) throws IOException,InterruptedException { System.out.println("Mapper cleanup"); } } |
WordCountReducer.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | package emrhadoop; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * Reducer * * 継承の際のジェネリクス指定によって、reduceメソッドの型を指定出来る * Reducer<入力キーの型, 入力値の型, 出力キーの型, 出力値の型> */ public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // Mapperから渡された値を集計 int sum = 0; for (IntWritable value : values) { sum += value.get(); } // 書き込み context.write(key, new IntWritable(sum)); } } |
Jar作成
1 | mvn package |
AWS Console
- まずEMRを作成します
- Cleate Clusterから Go to advanced optionsへ
- Hardware ConfigurationでEC2 instance typeを必要に応じ変更。m1.mediumが最安かな?
- VPCに対応したのでVPC内に作成したい場合にはここで選択
- その他、キーとかSecurityGroupなどを適宜設定します
- その後、作成したJarをS3へコピーしておきます
- WordCount用のファイルをS3へコピーします
1 | aws s3 cp input.txt s3://bucket/input/ |
Stepsから起動します
- StepTypeはCustomJARを選択
- JAR locationに先ほどコピーしたJarファイルのS3のLocationを入力
- Argumentsに
1 | emrhadoop.WordCountMain s3n://bucket/input/input.txt s3n://bucket/output |
ちなみにouputディレクトリをあらかじめ作成しておくとエラーになります