EMRのSparkでWordCount

BODY:

EMRではSparkでファイルを開く際には*が使えるみたいだ 

package sample
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
object WordCount {
			def main(args: Array[String]) {
				println("wordcount,args="+args(0)+","+args(1))
			val conf = new SparkConf().setAppName("wordcount").setMaster("yarn-cluster")
			val sc = new SparkContext(conf)
			
				val textFile = sc.textFile(args(0))	// s3n://bucket/*gz
			val counts = textFile.flatMap(line => line.split(" "))
										 .map(word => (word, 1))
										 .reduceByKey(_ + _)
				println("counts="+counts.id+","+counts.name)
				println(counts.toDebugString)
			counts.saveAsTextFile(args(1))
			}
}

こんな感じのBOWを数えるスクリプトを作成

s3にはgzで固められたファイルがたくさんある場合には

spark-submit --deploy-mode cluster --class sample.WordCount s3://bucket/dir/wordcount.jar s3n://bucket/log/*.gz s3n://bucket/output

このような指定で起動すると全ファイルを解凍しながら計算し、outputへ結果を保存します

EMRでSparkSQLサンプル

SparkからHiveが使いづらいというか使えない?のでSparkSQLを使ってみました。

そこそこ試行錯誤する必要があったのでメモです。

データファイル

銘柄コード,日付,始値,高値,安値,終値,出来高

のフォーマットのファイルを用意しておきます。こんな感じ。

1301,2004-04-01,198,198,195,196,651000
1301,2004-04-02,194,196,194,196,490000
1301,2004-04-05,196,200,195,197,1478000
1301,2004-04-06,202,208,200,207,4324000

これをS3へアップしておきます

build.sbt

こんな感じで記述します。build assemblyでエラーが出るのでこんな記述にしています。

name := "spark_sample"

version := "1.0-SNAPSHOT"

scalaVersion := "2.11.7"

// additional libraries
libraryDependencies ++= Seq(
	"org.apache.spark" %% "spark-core" % "1.5.2" % "provided",
	"org.apache.spark" %% "spark-sql" % "1.5.2",
	"org.apache.spark" %% "spark-hive" % "1.5.2",
	"org.apache.spark" %% "spark-streaming" % "1.5.2",
	"org.apache.spark" %% "spark-streaming-kafka" % "1.5.2",
	"org.apache.spark" %% "spark-streaming-flume" % "1.5.2",
	"org.apache.spark" %% "spark-mllib" % "1.5.2",
	"org.apache.commons" % "commons-lang3" % "3.0",
	"org.eclipse.jetty"	% "jetty-client" % "8.1.14.v20131031",
	"com.typesafe.play" %% "play-json" % "2.3.10",
	"com.fasterxml.jackson.core" % "jackson-databind" % "2.6.4",
	"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.6.3",
	"org.elasticsearch" % "elasticsearch-hadoop-mr" % "2.0.0.RC1",
	"net.sf.opencsv" % "opencsv" % "2.0",
	"com.twitter.elephantbird" % "elephant-bird" % "4.5",
	"com.twitter.elephantbird" % "elephant-bird-core" % "4.5",
	"com.hadoop.gplcompression" % "hadoop-lzo" % "0.4.17",
	"mysql" % "mysql-connector-java" % "5.1.31",
	"com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0-M3",
	"com.datastax.spark" %% "spark-cassandra-connector-java" % "1.5.0-M3",
	"com.github.scopt" %% "scopt" % "3.2.0",
	"org.scalatest" %% "scalatest" % "2.2.1" % "test",
	"com.holdenkarau" %% "spark-testing-base" %	"1.5.1_0.2.1",
	"org.apache.hive" % "hive-jdbc" % "1.2.1"
)

resolvers ++= Seq(
	"JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
	"Spray Repository" at "http://repo.spray.cc/",
	"Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
	"Akka Repository" at "http://repo.akka.io/releases/",
	"Twitter4J Repository" at "http://twitter4j.org/maven2/",
	"Apache HBase" at "https://repository.apache.org/content/repositories/releases",
	"Twitter Maven Repo" at "http://maven.twttr.com/",
	"scala-tools" at "https://oss.sonatype.org/content/groups/scala-tools",
	"Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/",
	"Second Typesafe repo" at "http://repo.typesafe.com/typesafe/maven-releases/",
	"Mesosphere Public Repository" at "http://downloads.mesosphere.io/maven",
	Resolver.sonatypeRepo("public")
)

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
	{
		case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
		case m if m.startsWith("META-INF") => MergeStrategy.discard
		case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
		case PathList("org", "apache", xs @ _*) => MergeStrategy.first
		case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
		case "about.html"	=> MergeStrategy.rename
		case "reference.conf" => MergeStrategy.concat
		case _ => MergeStrategy.first
	}
}

ちなみにproject/assembly.sbtはこれ

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0")

SqlSample.scala

http://spark.apache.org/docs/latest/sql-programming-guide.html#upgrading-from-spark-sql-15-to-16

この辺りを参考に

package sample
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark._
import org.apache.spark.api.java._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

object SqlSample {
	def main(args: Array[String]) {
		val conf = new SparkConf().setAppName("SparkSQL").setMaster("yarn-cluster")
		val sc = new SparkContext(conf)	

		val sqlContext = new org.apache.spark.sql.SQLContext(sc)
		// Import Row.
		import org.apache.spark.sql.Row;

		// Import Spark SQL data types
		import org.apache.spark.sql.types.{StructType,StructField,StringType};

		val histRDD = sc.textFile(args(0)).map(_.split(",")).
			map(p => Row(p(0), p(1),p(2),p(3),p(4),p(5),p(6)))
		val schemaString = "code date open high low close volume"
		val schema =
				StructType(
				schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))	
				
		// Apply the schema to the RDD.
		val histDataFrame = sqlContext.createDataFrame(histRDD, schema)
		// Register the DataFrames as a table.
		histDataFrame.registerTempTable("priceHist")
		
		// SQL statements can be run by using the sql methods provided by sqlContext.
		val results = sqlContext.sql("SELECT code,date,open FROM priceHist where code='6758'")

		val ary=results.map(_.getValuesMap[Any](List("code", "date","open"))).collect()

		val outputLocation = args(1) // s3n://bucket/
		val data=sc.makeRDD(ary)
		data.saveAsTextFile(outputLocation)

		sc.stop()
	}
}

build

$ sbt package

これで作成したJarを同じくS3へアップします

EMR

今までと同様にEMRを作成し、AddStepでSparkApplicationを追加します。Jarは先ほどアップしたものを指定します

Spark-submit options
--class sample.SqlSample
Arguments
s3n://bucket/output

ここには出力ファイルが入ります

じっこすればOutputにMapで表現されたデータが保存されます

EMRでSparkサンプル

emr-4.2.0をベースにAdvancedOptionでSpark1.5.2を追加しクラスターを作成しておきます

今回はPiをモンテカルロシミュレーションで計算するSpark付属のサンプルプログラムをちょっと改造して使用します

build.sbt

build.sbtはこんな感じ

name := "spark_sample"

version := "1.0-SNAPSHOT"

scalaVersion := "2.11.7"

// additional libraries
libraryDependencies ++= Seq(
	"org.apache.spark" %% "spark-core" % "1.5.2",
	"org.apache.spark" %% "spark-sql" % "1.5.2",
	"org.apache.spark" %% "spark-mllib" % "1.5.2"
)

SparkPi

SparkConfを作成する際のここがポイントです

		val conf = new SparkConf().setAppName("SparkPi").setMaster("yarn-cluster") // ここがポイント
package sample

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.	See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.	You may obtain a copy of the License at
 *
 *		http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import scala.math.random

import org.apache.spark.mllib.util.MLUtils
import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
	def main(args: Array[String]) {
		val conf = new SparkConf().setAppName("SparkPi").setMaster("yarn-cluster") // ここがポイント
		val spark = new SparkContext(conf)
		val slices = 2
		val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
		val count = spark.parallelize(1 until n, slices).map { i =>
			val x = random * 2 - 1
			val y = random * 2 - 1
			if (x * x + y * y < 1) 1 else 0
		}.reduce(_ + _)
		println("Pi is roughly " + 4.0 * count / n)

		val outputLocation = args(0) // s3n://bucket/

		val pi = 4.0 * count / n
		val data = spark.makeRDD(Seq(pi))

		println(pi)
		data.saveAsTextFile(outputLocation)
		spark.stop()
	}
}

ビルド

$ sbt packge

Jarファイルが作成されたらS3にアップしておきます

EMRでの実行

AWSコンソールからEMRで作成したクラスターを選択し、AddStepで先ほどアップしたJarファイルを指定し追加します

step typeにはSpark applicationを選択、

Spark-submit optionsに

--class sample.SparkPi --verbose

Argumentsに出力を保存するS3のロケーションを入れておきます。すでにフォルダがあるとエラーになるので注意

s3n://bucketname/output

実行後、出力先にファイルが作成されます

AmazonS3にjavaSDKを用いて文字列を書き込む

ポイントは2回InputStreamを作成することです。

http://stackoverflow.com/questions/8351886/amazons3-putobject-with-inputstream-length-example

package awssample;

import java.io.ByteArrayInputStream;

import java.io.InputStream;

import java.sql.SQLException;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.util.IOUtils;

public class S3Write {

	private static String endpoint = "https://s3-ap-northeast-1.amazonaws.com";

	public static void main(String[] args) throws SQLException {

		String s = "2011-01-01,9999,1,100,1.0,-1.0,0.5";
		// 認証オブジェクトを作成
		String accessKey = "xxxxxx";
		String accessSecretKey = "xxxxxxxx";
		AWSCredentials credentials = new BasicAWSCredentials(accessKey, accessSecretKey);

		// ConfigurationでTimeout時間を30秒に設定
		ClientConfiguration clientConfiguration = new ClientConfiguration();
		clientConfiguration.setConnectionTimeout(30000);

		// AmazonS3Clientをインスタンス化
		AmazonS3Client s3 = new AmazonS3Client(credentials, clientConfiguration);
		s3.setEndpoint(endpoint);
		try {
			InputStream is = new ByteArrayInputStream(s.getBytes("UTF-8"));
			byte[] contentBytes = IOUtils.toByteArray(is);
			Long contentLength = Long.valueOf(contentBytes.length);
			System.out.println("contentLength=" + contentLength + ",s=" + s);

			ObjectMetadata metadata = new ObjectMetadata();
			metadata.setContentLength(contentLength);
											 // isではなく new ByteStreamInputStreamでもう一度さくせいしたものを渡す
			s3.putObject(new PutObjectRequest("bucketname", "path/to/file.txt",
					new ByteArrayInputStream(s.getBytes("UTF-8")), metadata));

		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

docker buildのエラー

dockerで今までうまくいっていたBuildが突然落ちるようになったりすることがあります

$ docker build .
..
Err http://archive.ubuntu.com/ubuntu/ trusty-security/main libnss3-nssdb all 2:3.19.2.1-0ubuntu0.14.04.1
404	Not Found [IP: 91.189.88.149 80]
Err http://archive.ubuntu.com/ubuntu/ trusty-security/main libnss3 amd64 2:3.19.2.1-0ubuntu0.14.04.1
404	Not Found [IP: 91.189.88.149 80]
Fetched 108 MB in 3min 14s (555 kB/s)
Unable to correct missing packages.
[91mE: Failed to fetch http://archive.ubuntu.com/ubuntu/pool/main/n/nss/libnss3-nssdb_3.19.2.1-0ubuntu0.14.04.1_all.deb	404	Not Found [IP: 91.189.88.149 80]
E: Failed to fetch http://archive.ubuntu.com/ubuntu/pool/main/n/nss/libnss3_3.19.2.1-0ubuntu0.14.04.1_amd64.deb	404	Not Found [IP: 91.189.88.149 80]

こういう時には一度キャッシュをクリーンすれば治ります

$ docker --no-cache build .

EMRでHadoopのJavaサンプル

こんな感じでディレクトリ作成します

├── bin
├── pom.xml
└── src
		└── main
				└── java
						└── emrhadoop
								├── WordCountMain.java
								├── WordCountMapper.java
								└── WordCountReducer.java

pom.xmlを作成します


	4.0.0
	jp.qri.emr
	emrhive
	1.0-SNAPSHOT
	jar
	
		UTF-8
	
	
		
			cloudera
			https://repository.cloudera.com/content/repositories/releases/
		
	
	
		
			junit
			junit
			4.12
		
		
			org.apache.hadoop
			hadoop-core
			1.2.1
		
	
	
		
						
								org.apache.maven.plugins
								maven-dependency-plugin
								
										
												${project.build.directory}
										
								
						
						
								maven-assembly-plugin
								
										
												jar-with-dependencies
										
								
						
						
								org.apache.maven.plugins
								maven-shade-plugin
								2.4.2
								
										
												reference.conf
										
								
								
										
												package
												
														shade
												
										
								
						
				
	

eclipseで読み込めるようにします

mvn eclipse:eclipse

Javaファイルはこんな感じ

WordCountMain.java
package emrhadoop;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCountMain {

		/**
		 * Jobを設定して実行する
		 * 
		 * @param args
		 * @throws Exception
		 */
		public static void main(String[] args) throws Exception {

				System.out.println("Masterノード start");

				// スレーブノードで実行するJobを設定する
				Job job = Job.getInstance();
				job.setJarByClass(WordCountMain.class);
				job.setJobName("wordcount");

				// Reducerへの出力キー、バリューの型を指定する
				job.setOutputKeyClass(Text.class);
				job.setOutputValueClass(IntWritable.class);

				// Mapper、Reducerのクラスを指定する
				job.setMapperClass(WordCountMapper.class);
				job.setReducerClass(WordCountReducer.class);
				// もしReducerが必要なければ、このように指定する job.setNumReduceTasks(0);

				// データを読み込み、Mapperへ渡すデータ・フォーマットを指定する
				job.setInputFormatClass(TextInputFormat.class);
				// Reducerからデータを受け取り、出力を行う際のデータ・フォーマットを指定する
				job.setOutputFormatClass(TextOutputFormat.class);

				// 引数取得
				// arg[0] は、CLIから実行した場合はメインコントローラークラス名が設定される場合もあるようだったので注意。
				String inputPath = args[0];
				System.out.println("arg 0 : " + inputPath);
				String outputPath = args[1];
				System.out.println("arg 1 : " + outputPath);

				// 入力ファイル・出力ファイルのパスを設定
				FileInputFormat.setInputPaths(job, new Path(inputPath));
				FileOutputFormat.setOutputPath(job, new Path(outputPath));

				// Job実行
				boolean result = job.waitForCompletion(true);
				System.out.println("result : " + result);

				System.out.println("Masterノード end");
		}
}
WordCountMapper.java
package emrhadoop;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper
 * 
 * 継承の際のジェネリクス指定によって、mapメソッドの型を指定出来る
 * Mapper<入力キーの型, 入力値の型, 出力キーの型, 出力値の型>
 */
public class WordCountMapper extends Mapper {

		/**
		 * 初期化処理
		 */
		@Override
		public void setup(Context context) throws IOException, InterruptedException {
				System.out.println("Mapper setup");
		}

		@Override
		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

				// 入力値を取り出す(1行データ)
				String line = value.toString();

				// 単語に分解する
				StringTokenizer tokenizer = new StringTokenizer(line);

				IntWritable one = new IntWritable(1);
				Text word = new Text();

				// 単語ごとに繰り返し
				while (tokenizer.hasMoreTokens()) {
						word.set(tokenizer.nextToken());

						// 1単語ごとにReducerへ値を渡す。(単語, 集計数)。ここでは単純に1単語につき1を渡しているだけだが、Mapper側で一度集計してからReducerに渡してもいい。
						context.write(word, one);
				}
		}

		/**
		 * 終了処理
		 */
		@Override
		public void cleanup(Context context) throws IOException,InterruptedException {
				System.out.println("Mapper cleanup");
		}
}
WordCountReducer.java
package emrhadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer
 * 
 * 継承の際のジェネリクス指定によって、reduceメソッドの型を指定出来る
 * Reducer<入力キーの型, 入力値の型, 出力キーの型, 出力値の型>
 */
public class WordCountReducer extends Reducer {

		@Override
		public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {

				// Mapperから渡された値を集計
				int sum = 0;
				for (IntWritable value : values) {
						sum += value.get();
				}

				// 書き込み
				context.write(key, new IntWritable(sum));
		}

}

Jar作成

mvn package

AWS Console

  • まずEMRを作成します
  • Cleate Clusterから Go to advanced optionsへ
  • Hardware ConfigurationでEC2 instance typeを必要に応じ変更。m1.mediumが最安かな?
  • VPCに対応したのでVPC内に作成したい場合にはここで選択
  • その他、キーとかSecurityGroupなどを適宜設定します
  •  その後、作成したJarをS3へコピーしておきます
  • WordCount用のファイルをS3へコピーします
aws s3 cp input.txt s3://bucket/input/

Stepsから起動します

  • StepTypeはCustomJARを選択
  • JAR locationに先ほどコピーしたJarファイルのS3のLocationを入力
  • Argumentsに
emrhadoop.WordCountMain s3n://bucket/input/input.txt s3n://bucket/output

ちなみにouputディレクトリをあらかじめ作成しておくとエラーになります