emr-4.2.0をベースにAdvancedOptionでSpark1.5.2を追加しクラスターを作成しておきます
今回はPiをモンテカルロシミュレーションで計算するSpark付属のサンプルプログラムをちょっと改造して使用します
build.sbt
build.sbtはこんな感じ
1 2 3 4 5 6 7 8 9 10 11 12 | name := "spark_sample" version := "1.0-SNAPSHOT" scalaVersion := "2.11.7" // additional libraries libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "1.5.2", "org.apache.spark" %% "spark-sql" % "1.5.2", "org.apache.spark" %% "spark-mllib" % "1.5.2" ) |
SparkPi
SparkConfを作成する際のここがポイントです
1 | val conf = new SparkConf().setAppName("SparkPi").setMaster("yarn-cluster") // ここがポイント |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 | package sample /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import scala.math.random import org.apache.spark.mllib.util.MLUtils import org.apache.spark._ /** Computes an approximation to pi */ object SparkPi { def main(args: Array[String]) { val conf = new SparkConf().setAppName("SparkPi").setMaster("yarn-cluster") // ここがポイント val spark = new SparkContext(conf) val slices = 2 val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow val count = spark.parallelize(1 until n, slices).map { i => val x = random * 2 - 1 val y = random * 2 - 1 if (x * x + y * y < 1) 1 else 0 }.reduce(_ + _) println("Pi is roughly " + 4.0 * count / n) val outputLocation = args(0) // s3n://bucket/ val pi = 4.0 * count / n val data = spark.makeRDD(Seq(pi)) println(pi) data.saveAsTextFile(outputLocation) spark.stop() } } |
ビルド
1 | $ sbt packge |
Jarファイルが作成されたらS3にアップしておきます
EMRでの実行
AWSコンソールからEMRで作成したクラスターを選択し、AddStepで先ほどアップしたJarファイルを指定し追加します
step typeにはSpark applicationを選択、
Spark-submit optionsに
1 | --class sample.SparkPi --verbose |
Argumentsに出力を保存するS3のロケーションを入れておきます。すでにフォルダがあるとエラーになるので注意
1 | s3n://bucketname/output |
実行後、出力先にファイルが作成されます