emr-4.2.0をベースにAdvancedOptionでSpark1.5.2を追加しクラスターを作成しておきます
今回はPiをモンテカルロシミュレーションで計算するSpark付属のサンプルプログラムをちょっと改造して使用します
build.sbt
build.sbtはこんな感じ
name := "spark_sample" version := "1.0-SNAPSHOT" scalaVersion := "2.11.7" // additional libraries libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "1.5.2", "org.apache.spark" %% "spark-sql" % "1.5.2", "org.apache.spark" %% "spark-mllib" % "1.5.2" )
SparkPi
SparkConfを作成する際のここがポイントです
val conf = new SparkConf().setAppName("SparkPi").setMaster("yarn-cluster") // ここがポイント
package sample /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import scala.math.random import org.apache.spark.mllib.util.MLUtils import org.apache.spark._ /** Computes an approximation to pi */ object SparkPi { def main(args: Array[String]) { val conf = new SparkConf().setAppName("SparkPi").setMaster("yarn-cluster") // ここがポイント val spark = new SparkContext(conf) val slices = 2 val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow val count = spark.parallelize(1 until n, slices).map { i => val x = random * 2 - 1 val y = random * 2 - 1 if (x * x + y * y < 1) 1 else 0 }.reduce(_ + _) println("Pi is roughly " + 4.0 * count / n) val outputLocation = args(0) // s3n://bucket/ val pi = 4.0 * count / n val data = spark.makeRDD(Seq(pi)) println(pi) data.saveAsTextFile(outputLocation) spark.stop() } }
ビルド
$ sbt packge
Jarファイルが作成されたらS3にアップしておきます
EMRでの実行
AWSコンソールからEMRで作成したクラスターを選択し、AddStepで先ほどアップしたJarファイルを指定し追加します
step typeにはSpark applicationを選択、
Spark-submit optionsに
--class sample.SparkPi --verbose
Argumentsに出力を保存するS3のロケーションを入れておきます。すでにフォルダがあるとエラーになるので注意
s3n://bucketname/output
実行後、出力先にファイルが作成されます