Using Hive from Spark was awkward, or rather I couldn't really get it to work, so I tried Spark SQL instead.
It took a fair amount of trial and error, so here are my notes.
Data file
Prepare a CSV file whose columns are:

```
stock code,date,open,high,low,close,volume
```

It looks like this:

```
1301,2004-04-01,198,198,195,196,651000
1301,2004-04-02,194,196,194,196,490000
1301,2004-04-05,196,200,195,197,1478000
1301,2004-04-06,202,208,200,207,4324000
```
Upload this file to S3.
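If you prefer the command line, uploading with the AWS CLI would look roughly like this (the bucket and file names are placeholders):

```
$ aws s3 cp prices.csv s3://your-bucket/input/prices.csv
```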
build.sbt
Write it like this. I ended up with this setup because `sbt assembly` threw errors otherwise (hence the merge strategy at the bottom).
```scala
name := "spark_sample"

version := "1.0-SNAPSHOT"

scalaVersion := "2.11.7"

// additional libraries
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.5.2" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.5.2",
  "org.apache.spark" %% "spark-hive" % "1.5.2",
  "org.apache.spark" %% "spark-streaming" % "1.5.2",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.5.2",
  "org.apache.spark" %% "spark-streaming-flume" % "1.5.2",
  "org.apache.spark" %% "spark-mllib" % "1.5.2",
  "org.apache.commons" % "commons-lang3" % "3.0",
  "org.eclipse.jetty" % "jetty-client" % "8.1.14.v20131031",
  "com.typesafe.play" %% "play-json" % "2.3.10",
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.4",
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.6.3",
  "org.elasticsearch" % "elasticsearch-hadoop-mr" % "2.0.0.RC1",
  "net.sf.opencsv" % "opencsv" % "2.0",
  "com.twitter.elephantbird" % "elephant-bird" % "4.5",
  "com.twitter.elephantbird" % "elephant-bird-core" % "4.5",
  "com.hadoop.gplcompression" % "hadoop-lzo" % "0.4.17",
  "mysql" % "mysql-connector-java" % "5.1.31",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0-M3",
  "com.datastax.spark" %% "spark-cassandra-connector-java" % "1.5.0-M3",
  "com.github.scopt" %% "scopt" % "3.2.0",
  "org.scalatest" %% "scalatest" % "2.2.1" % "test",
  "com.holdenkarau" %% "spark-testing-base" % "1.5.1_0.2.1",
  "org.apache.hive" % "hive-jdbc" % "1.2.1"
)

resolvers ++= Seq(
  "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
  "Spray Repository" at "http://repo.spray.cc/",
  "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
  "Akka Repository" at "http://repo.akka.io/releases/",
  "Twitter4J Repository" at "http://twitter4j.org/maven2/",
  "Apache HBase" at "https://repository.apache.org/content/repositories/releases",
  "Twitter Maven Repo" at "http://maven.twttr.com/",
  "scala-tools" at "https://oss.sonatype.org/content/groups/scala-tools",
  "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/",
  "Second Typesafe repo" at "http://repo.typesafe.com/typesafe/maven-releases/",
  "Mesosphere Public Repository" at "http://downloads.mesosphere.io/maven",
  Resolver.sonatypeRepo("public")
)

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
    case m if m.startsWith("META-INF") => MergeStrategy.discard
    case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", xs @ _*) => MergeStrategy.first
    case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
    case "about.html" => MergeStrategy.rename
    case "reference.conf" => MergeStrategy.concat
    case _ => MergeStrategy.first
  }
}
```
Incidentally, project/assembly.sbt looks like this:
```scala
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0")
```
SqlSample.scala
http://spark.apache.org/docs/latest/sql-programming-guide.html#upgrading-from-spark-sql-15-to-16
I used this part of the Spark SQL programming guide as a reference.
```scala
package sample

import org.apache.spark.mllib.util.MLUtils
import org.apache.spark._
import org.apache.spark.api.java._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

object SqlSample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkSQL").setMaster("yarn-cluster")
    val sc = new SparkContext(conf)

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    // Import Row.
    import org.apache.spark.sql.Row
    // Import Spark SQL data types
    import org.apache.spark.sql.types.{StructType, StructField, StringType}

    val histRDD = sc.textFile(args(0))
      .map(_.split(","))
      .map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5), p(6)))

    val schemaString = "code date open high low close volume"
    val schema =
      StructType(
        schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

    // Apply the schema to the RDD.
    val histDataFrame = sqlContext.createDataFrame(histRDD, schema)

    // Register the DataFrame as a table.
    histDataFrame.registerTempTable("priceHist")

    // SQL statements can be run by using the sql methods provided by sqlContext.
    val results = sqlContext.sql("SELECT code,date,open FROM priceHist WHERE code='6758'")

    val ary = results.map(_.getValuesMap[Any](List("code", "date", "open"))).collect()

    val outputLocation = args(1) // s3n://bucket/
    val data = sc.makeRDD(ary)
    data.saveAsTextFile(outputLocation)

    sc.stop()
  }
}
```
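As an aside, the same filter and projection could also be expressed with the DataFrame API instead of a SQL string. A minimal sketch against the `histDataFrame` defined above (Spark 1.5 syntax, shown only as an illustration, not what the job actually runs):

```scala
// Roughly equivalent to: SELECT code, date, open FROM priceHist WHERE code = '6758'
val resultsDf = histDataFrame
  .filter(histDataFrame("code") === "6758") // every column is a StringType, so compare as a string
  .select("code", "date", "open")           // keep only the three columns we need
```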
build
```
$ sbt package
```
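Incidentally, since sbt-assembly is configured above, building a single fat JAR that bundles the non-provided dependencies should also be possible; I note it here only as an alternative:

```
$ sbt assembly
```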
Upload the JAR produced by this build to S3 as well.
EMR
Create an EMR cluster as before and add a Spark application with Add Step. For the JAR, specify the one you just uploaded.
Spark-submit options
```
--class sample.SqlSample
```
Arguments
```
s3n://bucket/output
```
This is where the output files go. (Note that the program reads the input path from args(0) and the output path from args(1), so the Arguments field needs both.)
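For reference, the step that the EMR console builds should correspond roughly to a spark-submit call like the one below (the JAR name and input path are placeholders; the master is already set to yarn-cluster inside the code):

```
$ spark-submit --class sample.SqlSample \
    spark_sample_2.11-1.0-SNAPSHOT.jar \
    s3n://bucket/input/prices.csv s3n://bucket/output
```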
Run the step, and the output location will contain the data represented as Maps (the string form of each getValuesMap result).