EMRのSparkでWordCount

BODY:

EMRではSparkでファイルを開く際には*が使えるみたいだ 

package sample
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
object WordCount {
			def main(args: Array[String]) {
				println("wordcount,args="+args(0)+","+args(1))
			val conf = new SparkConf().setAppName("wordcount").setMaster("yarn-cluster")
			val sc = new SparkContext(conf)
			
				val textFile = sc.textFile(args(0))	// s3n://bucket/*gz
			val counts = textFile.flatMap(line => line.split(" "))
										 .map(word => (word, 1))
										 .reduceByKey(_ + _)
				println("counts="+counts.id+","+counts.name)
				println(counts.toDebugString)
			counts.saveAsTextFile(args(1))
			}
}

こんな感じのBOWを数えるスクリプトを作成

s3にはgzで固められたファイルがたくさんある場合には

spark-submit --deploy-mode cluster --class sample.WordCount s3://bucket/dir/wordcount.jar s3n://bucket/log/*.gz s3n://bucket/output

このような指定で起動すると全ファイルを解凍しながら計算し、outputへ結果を保存します