A memo, since I got fairly stuck on this.
I ran Spark on an EMR cluster created inside a VPC. There are samples scattered around, but I couldn't find one that actually worked end to end, so it took a fair amount of trial and error.
EMR
First, create an EMR cluster inside the VPC. Open the EMR console and click Create Cluster.
To place the cluster inside a VPC, you have to go through Go to advanced options near the top.
Set the VPC and subnet, configure permissions and the other settings, and create the cluster.
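For reference, roughly the same cluster can be created from the AWS CLI. This is only a sketch: it assumes a newer EMR release label where Spark is a built-in application (on the AMI generation used in this post, Spark was typically installed via a bootstrap action instead), and the key name and subnet ID are placeholders.
aws emr create-cluster \
  --name "spark-in-vpc" \
  --release-label emr-4.2.0 \
  --applications Name=Spark \
  --ec2-attributes KeyName=mykey,SubnetId=subnet-xxxxxxxx \
  --instance-type m3.xlarge \
  --instance-count 3 \
  --use-default-roles
Passing SubnetId in --ec2-attributes is what places the cluster inside the VPC.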
The computation program
Create a directory layout like the following.
Note that this uses Scala 2.10 and JDK 1.8.
Beforehand, create an S3 bucket named bucket with an output directory under it, and set the permissions so EMR can access it (a sample command follows the tree below).
├── build.sbt
├── project
│   └── assembly.sbt
├── src
│   ├── main
│   │   ├── java
│   │   ├── resources
│   │   └── scala
│   │       └── sample
│   │           └── SparkPi.scala
│   └── test
│       ├── resources
│       └── scala
└── target
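Creating the bucket from the AWS CLI would look something like this (bucket is a placeholder; bucket names are globally unique, so use your own). If the cluster uses the default EMR roles, the instance profile should already grant S3 access.
aws s3 mb s3://bucket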
build.sbt
name := "emrscala"
version := "0.0.1"
scalaVersion := "2.10.5"
// Every Spark module needs the same exclusions (jars that otherwise
// collide when the fat jar is assembled), so factor them out.
def sparkDep(artifact: String) =
  ("org.apache.spark" %% artifact % "1.3.1").
    exclude("org.mortbay.jetty", "servlet-api").
    exclude("com.google.guava", "guava").
    exclude("org.apache.hadoop", "hadoop-yarn-api").
    exclude("commons-beanutils", "commons-beanutils-core").
    exclude("commons-beanutils", "commons-beanutils").
    exclude("commons-collections", "commons-collections").
    exclude("commons-logging", "commons-logging").
    exclude("org.spark-project.spark", "unused").
    exclude("com.twitter", "parquet-encoding").
    exclude("com.twitter", "parquet-column").
    exclude("com.twitter", "parquet-hadoop-bundle").
    exclude("org.datanucleus", "datanucleus-api-jdo").
    exclude("org.datanucleus", "datanucleus-core").
    exclude("org.datanucleus", "datanucleus-rdbms").
    exclude("com.esotericsoftware.minlog", "minlog")

libraryDependencies ++= Seq(
  sparkDep("spark-core"),
  sparkDep("spark-sql"),
  sparkDep("spark-mllib"),
  sparkDep("spark-hive")
)
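Even with these exclusions, sbt-assembly can still fail on duplicate files when building a Spark fat jar. In that case a merge strategy along the following lines usually helps; this is a sketch for sbt-assembly 0.14.x, and the cases should be adjusted to whatever conflicts actually show up.
// In build.sbt: resolve duplicate-file conflicts during assembly
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard // drop signatures and manifests
  case _                             => MergeStrategy.first   // otherwise keep the first copy found
}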
assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0")
SparkPi.scala
package sample

import scala.math.random

import org.apache.spark._

object SparkPi {
  def main(args: Array[String]) {
    // local[2] runs Spark inside this JVM, which is all this sample needs
    val conf = new SparkConf().setAppName("Spark Pi").setMaster("local[2]")
    val spark = new SparkContext(conf)
    val slices = 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    // Monte Carlo estimate: count random points inside the unit circle
    val count = spark.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x * x + y * y < 1) 1 else 0
    }.reduce(_ + _)
    val pi = 4.0 * count / (n - 1) // "1 until n" yields n - 1 samples
    println("Pi is roughly " + pi)
    // write the estimate to S3
    val outputLocation = "s3n://bucket/output"
    val data = spark.makeRDD(Seq(pi))
    data.saveAsTextFile(outputLocation + "/pi")
    spark.stop()
  }
}
Build
sbt assembly
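Since the code hard-codes local[2] as the master, the assembled jar can also be smoke-tested on any machine with Spark installed before uploading. The jar path below assumes sbt-assembly's default naming and is worth double-checking.
spark-submit --class sample.SparkPi target/scala-2.10/emrscala-assembly-0.0.1.jar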
Copy the resulting jar file to S3.
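For example, with the AWS CLI (the key under the bucket is an arbitrary choice):
aws s3 cp target/scala-2.10/emrscala-assembly-0.0.1.jar s3://bucket/emrscala-assembly-0.0.1.jar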
Run
From the EMR console, click Add step and select Custom JAR.
For JAR location, select the jar you uploaded earlier.
For Arguments, enter:
--verbose sample.SparkPi
Run the step with these settings.
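The same step can also be added from the CLI; in this sketch the cluster ID and jar location are placeholders.
aws emr add-steps --cluster-id j-XXXXXXXXXXXXX \
  --steps Type=CUSTOM_JAR,Name=SparkPi,Jar=s3://bucket/emrscala-assembly-0.0.1.jar,Args=[--verbose,sample.SparkPi]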
After a while, the results will be stored under s3://bucket/output/pi.
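To check the output (the part-file names are Spark's defaults):
aws s3 ls s3://bucket/output/pi/
aws s3 cp s3://bucket/output/pi/part-00000 -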