A quick memo, since I got fairly stuck on this.
I ran Spark on an EMR cluster created inside a VPC. There are samples scattered around the web, but none of them worked cleanly as-is, so it took quite a bit of effort.
EMR
First, create an EMR cluster inside the VPC. Open the EMR console and click Create Cluster.
To create the cluster inside a VPC, you have to proceed via Go to advanced options near the top.
Set the VPC and subnet, configure permissions and so on, and create the cluster.
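The same cluster can also be created from the AWS CLI. The sketch below is my own assumption rather than what was used here; the subnet ID, instance settings, and release label are placeholders to adjust (in particular, pick a release whose Spark version matches what you build against):

aws emr create-cluster \
  --name "spark-in-vpc" \
  --release-label emr-4.2.0 \
  --applications Name=Spark \
  --ec2-attributes SubnetId=subnet-xxxxxxxx \
  --instance-type m3.xlarge \
  --instance-count 3 \
  --use-default-roles

Passing SubnetId under --ec2-attributes is what places the cluster inside your VPC; --use-default-roles assumes the default EMR roles already exist (aws emr create-default-roles creates them).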
Computation program
Create a directory layout like the one below.
Note that Scala 2.10 and JDK 1.8 are used.
Create an S3 bucket named bucket with an output directory under it beforehand, and set up permissions so that EMR can access it.
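With the AWS CLI, creating the bucket is a one-liner (the name bucket is this article's placeholder; real bucket names must be globally unique):

aws s3 mb s3://bucket

Strictly speaking, the output prefix does not have to exist beforehand: S3 has no real directories, and Spark creates the keys when it writes.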
├── build.sbt
├── project
│   └── assembly.sbt
├── src
│   ├── main
│   │   ├── java
│   │   ├── resources
│   │   └── scala
│   │       └── sample
│   │           └── SparkPi.scala
│   └── test
│       ├── resources
│       └── scala
└── target
build.sbt
name := "emrscala" version := "0.0.1" scalaVersion := "2.10.5" libraryDependencies ++= Seq( ("org.apache.spark" %% "spark-sql" % "1.3.1"). exclude("org.mortbay.jetty", "servlet-api"). exclude("com.google.guava","guava"). exclude("org.apache.hadoop","hadoop-yarn-api"). exclude("commons-beanutils", "commons-beanutils-core"). exclude("commons-beanutils", "commons-beanutils"). exclude("commons-collections", "commons-collections"). exclude("commons-logging", "commons-logging"). exclude("org.spark-project.spark", "unused"). exclude("com.twitter", "parquet-encoding"). exclude("com.twitter", "parquet-column"). exclude("com.twitter", "parquet-hadoop-bundle"). exclude("org.datanucleus", "datanucleus-api-jdo"). exclude("org.datanucleus", "datanucleus-core"). exclude("org.datanucleus", "datanucleus-rdbms"). exclude("com.esotericsoftware.minlog", "minlog"), ("org.apache.spark" %% "spark-mllib" % "1.3.1"). exclude("org.mortbay.jetty", "servlet-api"). exclude("com.google.guava","guava"). exclude("org.apache.hadoop","hadoop-yarn-api"). exclude("commons-beanutils", "commons-beanutils-core"). exclude("commons-beanutils", "commons-beanutils"). exclude("commons-collections", "commons-collections"). exclude("commons-logging", "commons-logging"). exclude("org.spark-project.spark", "unused"). exclude("com.twitter", "parquet-encoding"). exclude("com.twitter", "parquet-column"). exclude("com.twitter", "parquet-hadoop-bundle"). exclude("org.datanucleus", "datanucleus-api-jdo"). exclude("org.datanucleus", "datanucleus-core"). exclude("org.datanucleus", "datanucleus-rdbms"). exclude("com.esotericsoftware.minlog", "minlog"), ("org.apache.spark" %% "spark-hive" % "1.3.1"). exclude("org.mortbay.jetty", "servlet-api"). exclude("com.google.guava","guava"). exclude("org.apache.hadoop","hadoop-yarn-api"). exclude("commons-beanutils", "commons-beanutils-core"). exclude("commons-beanutils", "commons-beanutils"). exclude("commons-collections", "commons-collections"). exclude("commons-logging", "commons-logging"). exclude("org.spark-project.spark", "unused"). exclude("com.twitter", "parquet-encoding"). exclude("com.twitter", "parquet-column"). exclude("com.twitter", "parquet-hadoop-bundle"). exclude("org.datanucleus", "datanucleus-api-jdo"). exclude("org.datanucleus", "datanucleus-core"). exclude("org.datanucleus", "datanucleus-rdbms"). exclude("com.esotericsoftware.minlog", "minlog"), ("org.apache.spark" %% "spark-sql" % "1.3.1"). exclude("org.mortbay.jetty", "servlet-api"). exclude("com.google.guava","guava"). exclude("org.apache.hadoop","hadoop-yarn-api"). exclude("commons-beanutils", "commons-beanutils-core"). exclude("commons-beanutils", "commons-beanutils"). exclude("commons-collections", "commons-collections"). exclude("commons-logging", "commons-logging"). exclude("org.spark-project.spark", "unused"). exclude("com.twitter", "parquet-encoding"). exclude("com.twitter", "parquet-column"). exclude("com.twitter", "parquet-hadoop-bundle"). exclude("org.datanucleus", "datanucleus-api-jdo"). exclude("org.datanucleus", "datanucleus-core"). exclude("org.datanucleus", "datanucleus-rdbms"). exclude("com.esotericsoftware.minlog", "minlog"), ("org.apache.spark" %% "spark-core" % "1.3.1"). exclude("org.mortbay.jetty", "servlet-api"). exclude("com.google.guava","guava"). exclude("org.apache.hadoop","hadoop-yarn-api"). exclude("commons-beanutils", "commons-beanutils-core"). exclude("commons-beanutils", "commons-beanutils"). exclude("commons-collections", "commons-collections"). exclude("commons-logging", "commons-logging"). 
exclude("org.spark-project.spark", "unused"). exclude("com.twitter", "parquet-encoding"). exclude("com.twitter", "parquet-column"). exclude("com.twitter", "parquet-hadoop-bundle"). exclude("org.datanucleus", "datanucleus-api-jdo"). exclude("org.datanucleus", "datanucleus-core"). exclude("org.datanucleus", "datanucleus-rdbms"). exclude("com.esotericsoftware.minlog", "minlog") )
assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1") addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0")
SparkPi.scala
package sample

import scala.math.random
import org.apache.spark._

object SparkPi {
  def main(args: Array[String]) {
    // local[2] runs Spark in local mode with two threads inside the
    // step's JVM, which is enough for this small job.
    val conf = new SparkConf().setAppName("Spark Pi").setMaster("local[2]")
    val spark = new SparkContext(conf)
    val slices = 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    // Monte Carlo: sample points in the square [-1, 1] x [-1, 1] and
    // count how many land inside the unit circle.
    val count = spark.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x * x + y * y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    val outputLocation = "s3n://bucket/output"
    val pi = 4.0 * count / n
    val data = spark.makeRDD(Seq(pi))
    data.saveAsTextFile(outputLocation + "/pi")
    spark.stop()
  }
}
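Why 4.0 * count / n estimates pi: the sampled points are uniform over the square [-1, 1] x [-1, 1], which has area 4, while the unit circle inscribed in it has area pi. The fraction of points landing inside the circle therefore converges to pi/4, and multiplying by 4 recovers pi.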
Build
sbt assembly
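If the build succeeds, sbt-assembly's default naming should put the fat jar at target/scala-2.10/emrscala-assembly-0.0.1.jar (name-assembly-version.jar; adjust if you change name or version in build.sbt).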
Copy the resulting jar file to S3.
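For example (the jars/ prefix is an arbitrary choice of mine, not something this setup requires):

aws s3 cp target/scala-2.10/emrscala-assembly-0.0.1.jar s3://bucket/jars/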
Run
From the EMR console's Add step, select Custom JAR.
For JAR location, select the jar you uploaded earlier.
For Arguments, enter:
--verbose sample.SparkPi
and run the step.
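Here sample.SparkPi is the fully qualified name of the main class defined above (package sample, object SparkPi). My reading is that --verbose just asks the step runner for more detailed output; treat that as an assumption rather than a documented guarantee.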
After a while, the results are stored under s3://bucket/output/pi.
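saveAsTextFile writes one text file per partition, so expect one or more part-00000-style files plus a _SUCCESS marker under that prefix rather than a single pi file. A quick way to check from the CLI:

aws s3 ls s3://bucket/output/pi/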