AmazonEMRでSparkを動かす

結構はまってしまったのでメモ

VPC内に作成したEMRでSparkを動かしました。サンプルはいろいろなところにそこそこあるのですが、どうもきちっと動くものがなく結構苦労してしまいました。

EMR

まずはVPC内にEMRを作成します。EMRのコンソール画面を開き、CreateClusterを押します。

VPC内に作成するためには上の方にあるGo to Advanced optionから進む必要があります。

VPCとサブネットを設定し、パーミッションなどを設定しクラスターを作成します。

計算プログラム

こんな感じでディレクトリ作成します

なおscala 2.10,jdk1.8を使用しています

s3にあらかじめbucketという名のbucketとその下にoutputディレクトリを作成し、EMRから触れるようにパーミッションを設定しておきます

├── build.sbt
├── project
│ └── assembly.sbt
├── src
│ ├── main
│  │ ├── java
│  │  ├── resources
│  │ └── scala
│  │ 		 └── sample
│ │ 				└── SparkPi.scala
│ └── test
│ 		├── resources
│ 		└── scala
└── target
build.sbt
name := "emrscala"

version := "0.0.1"

scalaVersion := "2.10.5"

libraryDependencies ++= Seq(
	("org.apache.spark" %% "spark-sql" % "1.3.1").
		exclude("org.mortbay.jetty", "servlet-api").
		exclude("com.google.guava","guava").
		exclude("org.apache.hadoop","hadoop-yarn-api").
		exclude("commons-beanutils", "commons-beanutils-core").
		exclude("commons-beanutils", "commons-beanutils").
		exclude("commons-collections", "commons-collections").
		exclude("commons-logging", "commons-logging").
		exclude("org.spark-project.spark", "unused"). 
		exclude("com.twitter", "parquet-encoding").
		exclude("com.twitter", "parquet-column").
		exclude("com.twitter", "parquet-hadoop-bundle").
		exclude("org.datanucleus", "datanucleus-api-jdo").
		exclude("org.datanucleus", "datanucleus-core").
		exclude("org.datanucleus", "datanucleus-rdbms").
		exclude("com.esotericsoftware.minlog", "minlog"),
	("org.apache.spark" %% "spark-mllib" % "1.3.1").
		exclude("org.mortbay.jetty", "servlet-api").
		exclude("com.google.guava","guava").
		exclude("org.apache.hadoop","hadoop-yarn-api").
		exclude("commons-beanutils", "commons-beanutils-core").
		exclude("commons-beanutils", "commons-beanutils").
		exclude("commons-collections", "commons-collections").
		exclude("commons-logging", "commons-logging").
		exclude("org.spark-project.spark", "unused"). 
		exclude("com.twitter", "parquet-encoding").
		exclude("com.twitter", "parquet-column").
		exclude("com.twitter", "parquet-hadoop-bundle").
		exclude("org.datanucleus", "datanucleus-api-jdo").
		exclude("org.datanucleus", "datanucleus-core").
		exclude("org.datanucleus", "datanucleus-rdbms").
		exclude("com.esotericsoftware.minlog", "minlog"),
	("org.apache.spark" %% "spark-hive" % "1.3.1").
		exclude("org.mortbay.jetty", "servlet-api").
		exclude("com.google.guava","guava").
		exclude("org.apache.hadoop","hadoop-yarn-api").
		exclude("commons-beanutils", "commons-beanutils-core").
		exclude("commons-beanutils", "commons-beanutils").
		exclude("commons-collections", "commons-collections").
		exclude("commons-logging", "commons-logging").
		exclude("org.spark-project.spark", "unused"). 
		exclude("com.twitter", "parquet-encoding").
		exclude("com.twitter", "parquet-column").
		exclude("com.twitter", "parquet-hadoop-bundle").
		exclude("org.datanucleus", "datanucleus-api-jdo").
		exclude("org.datanucleus", "datanucleus-core").
		exclude("org.datanucleus", "datanucleus-rdbms").
		exclude("com.esotericsoftware.minlog", "minlog"),
	("org.apache.spark" %% "spark-sql" % "1.3.1").
		exclude("org.mortbay.jetty", "servlet-api").
		exclude("com.google.guava","guava").
		exclude("org.apache.hadoop","hadoop-yarn-api").
		exclude("commons-beanutils", "commons-beanutils-core").
		exclude("commons-beanutils", "commons-beanutils").
		exclude("commons-collections", "commons-collections").
		exclude("commons-logging", "commons-logging").
		exclude("org.spark-project.spark", "unused"). 
		exclude("com.twitter", "parquet-encoding").
		exclude("com.twitter", "parquet-column").
		exclude("com.twitter", "parquet-hadoop-bundle").
		exclude("org.datanucleus", "datanucleus-api-jdo").
		exclude("org.datanucleus", "datanucleus-core").
		exclude("org.datanucleus", "datanucleus-rdbms").
		exclude("com.esotericsoftware.minlog", "minlog"),
	("org.apache.spark" %% "spark-core" % "1.3.1").
		exclude("org.mortbay.jetty", "servlet-api").
		exclude("com.google.guava","guava").
		exclude("org.apache.hadoop","hadoop-yarn-api").
		exclude("commons-beanutils", "commons-beanutils-core").
		exclude("commons-beanutils", "commons-beanutils").
		exclude("commons-collections", "commons-collections").
		exclude("commons-logging", "commons-logging").
		exclude("org.spark-project.spark", "unused"). 
		exclude("com.twitter", "parquet-encoding").
		exclude("com.twitter", "parquet-column").
		exclude("com.twitter", "parquet-hadoop-bundle").
		exclude("org.datanucleus", "datanucleus-api-jdo").
		exclude("org.datanucleus", "datanucleus-core").
		exclude("org.datanucleus", "datanucleus-rdbms").
		exclude("com.esotericsoftware.minlog", "minlog")
)
assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0")
SparkPi.scala
package sample
import scala.math.random

import org.apache.spark.mllib.util.MLUtils
import org.apache.spark._

object SparkPi {
	def main(args: Array[String]) {
		val conf = new SparkConf().setAppName("Spark Pi").setMaster("local[2]")
		val spark = new SparkContext(conf)
		val slices=2
		val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
		val count = spark.parallelize(1 until n, slices).map { i =>
			val x = random * 2 - 1
			val y = random * 2 - 1
			if (x*x + y*y < 1) 1 else 0
		}.reduce(_ + _)
		println("Pi is roughly " + 4.0 * count / n)
		
		val outputLocation = "s3n://bucket/output"
		val pi=4.0 * count / n
		val data=spark.makeRDD(Seq(pi))
		data.saveAsTextFile(outputLocation + "/pi")
	spark.stop()
	}
}
make
sbt assembly

できたjarファイルをS3にコピーします

実行

EMRのadd StepからCustom JARを選択

JARLocationに先ほどアップしたJarを選択

Argumentに

--verbose sample.SparkPi

こんな感じで実行

しばらくたつと s3://bucket/output/piいかに結果が格納されています。

AWS EMR でSparkRを使って見る

AWSEMRとは、SparkやらHiveやらそれら一式を簡単に使える様にしてくれている仕組みです。

ぽちぽちっとEMRでサーバを作成。

この間10分程度

SparkRでサンプルデータを解析してみます

こちらの内容をアレンジしてみました

http://engineer.recruit-lifestyle.co.jp/techblog/2015-08-19-sparkr/

データ取得

http://stat-computing.org/dataexpo/2009/the-data.html

こちらから2001、2、3のデータをダウンロード

$ wget http://stat-computing.org/dataexpo/2009/2001.csv.bz2

unzip

$ bunzip2 2001.csv.bz2

s3にアップロード

$ aws s3 cp 2001.csv s3://samplebucket/airline/

同様に2002,2003も繰り返す

Hive

$ hive
hive> add jar /usr/lib/hive/lib/hive-contrib.jar;
Added [/usr/lib/hive/lib/hive-contrib.jar] to class path
Added resources: [/usr/lib/hive/lib/hive-contrib.jar]
hive> create table airline(
		> Year STRING,
		> Month STRING,
		> DayofMonth STRING,
		> DayOfWeek STRING,
		> DepTime STRING,
		> CRSDepTime STRING,
		> ArrTime STRING,
		> CRSArrTime STRING,
		> UniqueCarrier STRING,
		> FlightNum STRING,
		> TailNum STRING,
		> ActualElapsedTime STRING,
		> CRSElapsedTime STRING,
		> AirTime STRING,
		> ArrDelay STRING,
		> DepDelay STRING,
		> Origin STRING,
		> Dest STRING,
		> Distance STRING,
		> TaxiIn STRING,
		> TaxiOut STRING,
		> Cancelled STRING,
		> CancellationCode STRING,
		> Diverted STRING,
		> CarrierDelay STRING,
		> WeatherDelay STRING,
		> NASDelay STRING,
		> SecurityDelay STRING,
		> LateAircraftDelay STRING
		> )
		> ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
		> LOCATION 's3://samplebucket/airline/' tblproperties ("skip.header.line.count"="1");
hive> select * from airline limit 1;
OK
2001	1	17	3	1806	1810	1931	1934	US	375	N700��	85	84	60	-3	-4	BWI	CLT	361	5	20	0	NA	0	NA	NA	NA	NA	NA

SparkR

$ sparkR
> install.packages("magrittr")
> library(magrittr)
> hiveContext <- sparkRHive.init(sc)
> airline<-sql(hiveContext,"select * from airline")
> class(airline)
[1] "DataFrame"
attr(,"package")
[1] "SparkR"
> airline %>%
+	 filter(airline$Origin == "JFK") %>%
+	 group_by(airline$Dest) %>%
+	 agg(count=n(airline$Dest)) %>%
+	 head
	Dest count																																		
1	IAH	1214
2	STL	2922
3	SNA	 805
4	MSP	1580
5	STT	1085
6	SAN	2723

こんな感じで簡単にできました