Running Spark on Amazon EMR

A memo, since I got fairly stuck on this.

I ran Spark on an EMR cluster created inside a VPC. There are a fair number of samples in various places, but none of them worked cleanly as-is, so this took quite a bit of effort.

EMR

First, create the EMR cluster inside a VPC. Open the EMR console and click Create Cluster.

To create the cluster inside a VPC, you need to go through the Go to advanced options link near the top.

Configure the VPC and subnet, set up permissions and so on, and create the cluster.
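If you would rather script the cluster creation than click through the console, the AWS SDK for Java exposes the same operation. This is only a rough sketch: the cluster name, subnet ID, and instance settings are placeholders, credentials are assumed to come from the default provider chain, and installing Spark itself still depends on your EMR version (I set that up through the console as above).

import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient
import com.amazonaws.services.elasticmapreduce.model.{JobFlowInstancesConfig, RunJobFlowRequest}

object CreateCluster {
  def main(args: Array[String]): Unit = {
    val emr = new AmazonElasticMapReduceClient() // default credential chain
    val request = new RunJobFlowRequest()
      .withName("spark-cluster")                 // placeholder name
      .withServiceRole("EMR_DefaultRole")        // assumes the default EMR roles exist
      .withJobFlowRole("EMR_EC2_DefaultRole")
      .withInstances(new JobFlowInstancesConfig()
        .withEc2SubnetId("subnet-xxxxxxxx")      // placeholder: the VPC subnet to launch into
        .withMasterInstanceType("m3.xlarge")
        .withSlaveInstanceType("m3.xlarge")
        .withInstanceCount(3)
        .withKeepJobFlowAliveWhenNoSteps(true))  // keep the cluster up so steps can be added later
    println(emr.runJobFlow(request).getJobFlowId)
  }
}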

The computation program

Create a directory structure like the following.

Note that I am using Scala 2.10 and JDK 1.8.

Create an S3 bucket named bucket in advance, along with an output directory under it, and set permissions so that EMR can access it.
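If you prefer doing this from code, here is a minimal sketch with the AWS SDK for Java. The bucket name bucket matches the placeholder used in this post; credentials are assumed to be configured, and the IAM permissions that let EMR read and write the bucket still have to be set up separately.

import com.amazonaws.services.s3.AmazonS3Client

object PrepareBucket {
  def main(args: Array[String]): Unit = {
    val s3 = new AmazonS3Client() // default credential chain
    s3.createBucket("bucket")     // "bucket" is the placeholder bucket name used below
    // S3 has no real directories; the output/ prefix appears once objects are written under it
  }
}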

├── build.sbt
├── project
│   └── assembly.sbt
├── src
│   ├── main
│   │   ├── java
│   │   ├── resources
│   │   └── scala
│   │       └── sample
│   │           └── SparkPi.scala
│   └── test
│       ├── resources
│       └── scala
└── target
build.sbt
name := "emrscala"

version := "0.0.1"

scalaVersion := "2.10.5"

// Shared exclusion list to avoid duplicate-class conflicts in the assembled jar,
// defined once and applied to every Spark module instead of repeating it each time.
def sparkModule(artifact: String) =
  ("org.apache.spark" %% artifact % "1.3.1").
    exclude("org.mortbay.jetty", "servlet-api").
    exclude("com.google.guava", "guava").
    exclude("org.apache.hadoop", "hadoop-yarn-api").
    exclude("commons-beanutils", "commons-beanutils-core").
    exclude("commons-beanutils", "commons-beanutils").
    exclude("commons-collections", "commons-collections").
    exclude("commons-logging", "commons-logging").
    exclude("org.spark-project.spark", "unused").
    exclude("com.twitter", "parquet-encoding").
    exclude("com.twitter", "parquet-column").
    exclude("com.twitter", "parquet-hadoop-bundle").
    exclude("org.datanucleus", "datanucleus-api-jdo").
    exclude("org.datanucleus", "datanucleus-core").
    exclude("org.datanucleus", "datanucleus-rdbms").
    exclude("com.esotericsoftware.minlog", "minlog")

libraryDependencies ++= Seq(
  sparkModule("spark-core"),
  sparkModule("spark-sql"),
  sparkModule("spark-mllib"),
  sparkModule("spark-hive")
)
assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0")
SparkPi.scala
package sample

import scala.math.random

import org.apache.spark._

object SparkPi {
  def main(args: Array[String]) {
    // "local[2]" runs Spark in local mode with two threads (on the EMR master node)
    val conf = new SparkConf().setAppName("Spark Pi").setMaster("local[2]")
    val spark = new SparkContext(conf)
    val slices = 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    // Monte Carlo estimate: sample points in the unit square and count those inside the unit circle
    val count = spark.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x * x + y * y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)

    // Save the estimate to S3 as a text file
    val outputLocation = "s3n://bucket/output"
    val pi = 4.0 * count / n
    val data = spark.makeRDD(Seq(pi))
    data.saveAsTextFile(outputLocation + "/pi")
    spark.stop()
  }
}
Build
sbt assembly
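If sbt assembly fails with deduplicate errors, the usual workaround is a merge strategy in build.sbt. A sketch for sbt-assembly 0.14.x follows; with the excludes above it may not be necessary.

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard // drop conflicting manifests and signatures
  case _                             => MergeStrategy.first   // otherwise keep the first copy found
}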

Copy the resulting jar file to S3.
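With the default sbt-assembly settings the jar should appear at target/scala-2.10/emrscala-assembly-0.0.1.jar. Uploading from the console works fine; as a sketch, the same thing via the AWS SDK for Java (the key name jars/... is just an example):

import java.io.File
import com.amazonaws.services.s3.AmazonS3Client

object UploadJar {
  def main(args: Array[String]): Unit = {
    val s3 = new AmazonS3Client()
    // Default sbt-assembly output path for this project; adjust if you changed the name or version
    s3.putObject("bucket", "jars/emrscala-assembly-0.0.1.jar",
      new File("target/scala-2.10/emrscala-assembly-0.0.1.jar"))
  }
}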

Run

In the EMR console, click Add step and select Custom JAR.

For JAR location, select the jar you uploaded earlier.

For Arguments, enter:

--verbose sample.SparkPi

and run the step.

After a while, the results will be stored under s3://bucket/output/pi.
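To spot-check the output, something like this works from spark-shell (a sketch that assumes the shell can reach S3; saveAsTextFile writes one part-xxxxx file per partition under the pi/ prefix):

// Run inside spark-shell, where sc is the provided SparkContext
val saved = sc.textFile("s3n://bucket/output/pi")
saved.collect().foreach(println) // prints the estimated value of Pi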