TensorflowでMNIST(2)

今回は中級?者向けの多層パーセプトロン(multilayer perceptron)を実装します

こちらのコードを流用させていただきました

http://qiita.com/TomokIshii/items/92a266b805d7eee02b1d

前回と同様に、input_data.pyを使わずにデータを自前で作成します

http://d.hatena.ne.jp/anagotan/20160328/1459156607

train.txtとt10k.txtを作成しておきます

tf_mlp.py

#!/bin/env python
# -*- coding: utf-8 -*-
# http://qiita.com/ikki8412/items/95bc81a744dc377d9119
import tensorflow as tf
import numpy as np
import random
import time
import math

NUMCLASS=10
NUMPARAM=784
NUMHIDDEN=625
   
### データ処理用
def label_data(lines):
  labels=[]
  for line in lines:
    # ラベルを1-of-k方式で用意する
    tmp = np.zeros(NUMCLASS)
    tmp[int(line)] = 1
    labels.append(tmp)
  return np.asarray(labels)

def image_data(test):
  test_image=map(lambda n: map(lambda k: float(k)/255.0,n),test[:,1:NUMPARAM+1])
  return np.asarray(test_image)


# 開始時刻
start_time = time.time()
print "開始時刻: " + str(start_time)


### データ取得 --->
# ファイルを開く
f = open("train.txt", 'r')
# データを入れる配列
train = []
for line in f:
    # 改行を除いてスペース区切りにする
    line = line.rstrip()
    l = line.split(" ")
    l = map(lambda n: int(n),l)
    #l=map(lambda n: 0 if n=="0" else 1,l)
    train.append(l)


# numpy形式に変換
train = np.asarray(train)
f.close()

f = open("t10k.txt", 'r')
test = []
for line in f:
    line = line.rstrip()
    l = line.split(" ")
    l = map(lambda n: int(n),l)
    #l=map(lambda n: 0 if n=="0" else 1,l)
    test.append(l)

test = np.asarray(test)
f.close()
### データ取得 ---<


# 訓練画像を入れる変数
# 訓練画像は28x28pxであり、これらを1行784列のベクトルに並び替え格納する
# Noneとなっているのは訓練画像がいくつでも入れられるようにするため
x = tf.placeholder(tf.float32, [None, NUMPARAM], name="x-input")

# 交差エントロピー
# y_は正解データのラベル
# 損失とオプティマイザを定義します
y_ = tf.placeholder(tf.float32, [None, NUMCLASS], name="y-input")


# hidden1
with tf.name_scope("hidden_layer1") as scope:
  w_h = tf.Variable(tf.random_normal([NUMPARAM, NUMHIDDEN],mean=0.0, stddev=0.05))
  b_h = tf.Variable(tf.zeros([NUMHIDDEN]),name='biases')

  h = tf.sigmoid(tf.matmul(x, w_h) + b_h)
# output layer
with tf.name_scope("output_layer") as scope:
  w_o = tf.Variable(tf.truncated_normal([NUMHIDDEN, NUMCLASS],mean=0.0, stddev=0.05))
  b_o = tf.Variable(tf.zeros([NUMCLASS]),name='biases')

  y = tf.nn.softmax((tf.matmul(h, w_o) + b_o))


# 更なる name scopes はグラフ表現をクリーンアップしま
with tf.name_scope("xent") as scope:
  # Cost Function basic term
  cross_entropy = -tf.reduce_sum(y_*tf.log(y))
  
  # Regularization terms (weight decay)
  L2_sqr = tf.nn.l2_loss(w_h) + tf.nn.l2_loss(w_o)
  lambda_2 = 0.01
  # the loss and accuracy
  loss = cross_entropy + lambda_2 * L2_sqr

  # TensorBoardで表示するよう指定
  tf.scalar_summary("cross_entropy", cross_entropy)

  # 勾配硬化法を用い交差エントロピーが最小となるようyを最適化する
  train_step = tf.train.GradientDescentOptimizer(0.001).minimize(cross_entropy)

# 用意した変数Veriableの初期化を実行する
init = tf.initialize_all_variables()

# Sessionを開始する
# runすることで初めて実行開始される(run(init)しないとinitが実行されない)
sess = tf.Session()
sess.run(init)
# TensorBoardで表示する値の設定
summary_op = tf.merge_all_summaries()
summary_writer = tf.train.SummaryWriter("/tmp/data", sess.graph_def)


# 1000回の訓練(train_step)を実行する
# next_batch(100)で100つのランダムな訓練セット(画像と対応するラベル)を選択する
# 訓練データは60000点あるので全て使いたいところだが費用つまり時間がかかるのでランダムな100つを使う
# 100つでも同じような結果を得ることができる
# feed_dictでplaceholderに値を入力することができる
print "--- 訓練開始 ---"
for i in range(20000):
  train_sample=np.asarray(random.sample(train,100))
  batch_ys=label_data(train_sample[:,0])
  batch_xs=image_data(train_sample)
  train_accuracy=sess.run(train_step, feed_dict={x: batch_xs, y_:batch_ys})

  # 1 step終わるたびにTensorBoardに表示する値を追加する
  summary_str=sess.run(summary_op, feed_dict={x: batch_xs, y_:batch_ys})
  summary_writer.add_summary(summary_str, i)
print "--- 訓練終了 ---"

# 正しいかの予測
# 計算された画像がどの数字であるかの予測yと正解ラベルy_を比較する
# 同じ値であればTrueが返される
# argmaxは配列の中で一番値の大きい箇所のindexが返される
# 一番値が大きいindexということは、それがその数字である確率が一番大きいということ
# Trueが返ってくるということは訓練した結果と回答が同じということ
with tf.name_scope("test") as scope:
  correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1))

# 精度の計算
# correct_predictionはbooleanなのでfloatにキャストし、平均値を計算する
# Trueならば1、Falseならば0に変換される
  accuracy = tf.reduce_mean(tf.cast(correct_prediction, "float"))

  tf.scalar_summary("accuracy", accuracy)

# 精度の実行と表示
# テストデータの画像とラベルで精度を確認する
# ソフトマックス回帰によってWとbの値が計算されているので、xを入力することでyが計算できる
test_label=label_data(test[:,0])
test_image=image_data(test)
print "精度"
print(sess.run(accuracy, feed_dict={x: test_image, y_: test_label}))

# 終了時刻
end_time = time.time()
print "終了時刻: " + str(end_time)
print "かかった時間: " + str(end_time - start_time)

実行

$ python tf_mlp.py
開始時刻: 1459237690.97
--- 訓練開始 ---
--- 訓練終了 ---
精度
0.9562
終了時刻: 1459238511.06
かかった時間: 820.087219

回帰の場合には0.9227だったのですが、0.9562まで精度が上がりました

TensorflowでMNIST(1)

GoogleのDeeplearningプラットフォームであるtensorflowを触ってみました。

https://www.tensorflow.org/

世の中にはMNISTのサンプルを実行したブログが多いのですが、tutorialを開設しているだけのものが多くちょっとよく理解できていませんでした

自分なりに色々と調べてMNISTを理解していきます

まずはBeginnerということのサンプルです。

Beginnerというか、Deeplearningというよりは回帰分析をTensorflowで行っているというサンプルです

データダウンロード

input_data.pyを使うとよくわからないので自分でデータ取得からハンドリングします。

まず、データをダウンロードします。

https://www.tensorflow.org/versions/master/tutorials/mnist/download/index.html

こちらの真ん中ほどにあるリンクから下記4つをダウンロードし解凍しておきます。

train-images-idx3-ubyte.gz

train-labels-idx1-ubyte.gz

t10k-images-idx3-ubyte.gz

t10k-labels-idx1-ubyte.gz

$ gunzip train-images-idx3-ubyte.gz
$ gunzip train-labels-idx1-ubyte.gz
$ gunzip t10k-images-idx3-ubyte.gz
$ gunzip t10k-labels-idx1-ubyte.gz

データの整形

そのままでは使いづらいので整形します

od -An -v -tu1 -j16 -w784 train-images-idx3-ubyte | sed 's/^ *//' | tr -s ' ' >train-images.txt
od -An -v -tu1 -j8 -w1 train-labels-idx1-ubyte | tr -d ' ' >train-labels.txt
od -An -v -tu1 -j16 -w784 t10k-images-idx3-ubyte | sed 's/^ *//' | tr -s ' ' >t10k-images.txt
od -An -v -tu1 -j8 -w1 t10k-labels-idx1-ubyte | tr -d ' ' >t10k-labels.txt
file_join(){
image=$1
label=$2
ruby < train.txt
file_join t10k-images.txt t10k-labels.txt > t10k.txt

train.txtとt10k.txtというファイルが作成されます。このファイルは1行ごとにMNISTの画像データの数値データ、0-255までの値で構成されています。その行の先頭に正解数字を入れてたデータです。

Deeplearning

Tensorflowのプログラムはこちらの方のサンプルを流用させていただきました

http://tensorflow.classcat.com/2016/02/11/tensorflow-how-tos-visualizing-learning/

#!/bin/env python
# -*- coding: utf-8 -*-
# http://tensorflow.classcat.com/2016/02/11/tensorflow-how-tos-visualizing-learning/
import tensorflow as tf
import numpy as np
import random
import time

NUMCLASS=10
NUMPARAM=784

### データ処理用
def label_data(lines):
  labels=[]
  for line in lines:
    # ラベルを1-of-k方式で用意する
    tmp = np.zeros(NUMCLASS)
    tmp[int(line)] = 1
    labels.append(tmp)
  return np.asarray(labels)

def image_data(test):
  test_image=map(lambda n: map(lambda k: float(k)/255.0,n),test[:,1:NUMPARAM+1])
  return np.asarray(test_image)


# 開始時刻
start_time = time.time()
print "開始時刻: " + str(start_time)


### データ取得 --->
# ファイルを開く
f = open("train.txt", 'r')
# データを入れる配列
train = []
for line in f:
    # 改行を除いてスペース区切りにする
    line = line.rstrip()
    l = line.split(" ")
    l = map(lambda n: int(n),l)
    #l=map(lambda n: 0 if n=="0" else 1,l)
    train.append(l)


# numpy形式に変換
train = np.asarray(train)
f.close()

f = open("t10k.txt", 'r')
test = []
for line in f:
    line = line.rstrip()
    l = line.split(" ")
    l = map(lambda n: int(n),l)
    #l=map(lambda n: 0 if n=="0" else 1,l)
    test.append(l)

test = np.asarray(test)
f.close()
### データ取得 ---<


# ファイルを開く
f = open("train.txt", 'r')
# データを入れる配列
train = []
for line in f:
    # 改行を除いてスペース区切りにする
    line = line.rstrip()
    l = line.split(" ")
    l = map(lambda n: int(n),l)
    #l=map(lambda n: 0 if n=="0" else 1,l)
    train.append(l)


# numpy形式に変換
train = np.asarray(train)
f.close()

f = open("t10k.txt", 'r')
test = []
for line in f:
    line = line.rstrip()
    l = line.split(" ")
    l = map(lambda n: int(n),l)
    #l=map(lambda n: 0 if n=="0" else 1,l)
    test.append(l)

test = np.asarray(test)
f.close()

### データ取得 ---
test_label=label_data(test[:,0])
test_image=image_data(test)
print "精度"
print(sess.run(accuracy, feed_dict={x: test_image, y_: test_label}))

# 終了時刻
end_time = time.time()
print "終了時刻: " + str(end_time)
print "かかった時間: " + str(end_time - start_time)

実行

これを実行します

$ python tf_regression.py
開始時刻: 1459234322.4
--- 訓練開始 ---
--- 訓練終了 ---
精度
0.9227
終了時刻: 1459234921.58
かかった時間: 599.178552866

精度はあまり良くありませんが計算できました

raspberry piでWifiを固定IPで使う

ハードウエア

raspberry pi B+

wifiアダプタ WNG150U

OS

RASPBIAN JESSIE

設定

ハードウエアの確認

$ lsusb
Bus 001 Device 004: ID 04bb:094c I-O Data Device, Inc. 

ESSIDの確認

$ sudo iwlist wlan0 scan | grep ESSID
										ESSID:"ESSID"

wpa_supplicant.conf

$ sudo su 
#	chmod 660 root% chmod 660 /etc/wpa_suoplicant/wpa_supplicant.comf
# wpa_passphrase "SSID" "KEY" >> /etc/wpa_supplicant/wpa_supplicant.conf
# cat /etc/wpa_supplicant/wpa_supplicant.conf
ctrl_interface=DIR=/var/run/wpa_supplicant GROUP=netdev
update_config=1
network={
	ssid="SSID"
	#psk="KEY"
	psk=ハッシュ化されたキー
}

wpa_supplicant.confの編集

ctrl_interface=DIR=/var/run/wpa_supplicant GROUP=netdev
update_config=1
network={
	ssid="SSID"
	proto=WPA2
	key_mgmt=WPA-PSK
	pairwise=TKIP CCMP
	group=TKIP CCMP
	#psk="KEY"
	psk=ハッシュ化されたキー
	scan_ssid=1
}
/etc/dhcpcd.conf
>||

interface wlan0
static ip_address=192.168.11.21/24
static routers=192.168.11.254
static domain_name_servers=192.168.10.1

こんな感じでリブート

 $ ifconfig
eth0			Link encap:イーサネット	ハードウェアアドレス b8:27:eb:9c:30:07 
					inetアドレス:192.168.11.20 ブロードキャスト:192.168.11.255	マスク:255.255.255.0
					inet6アドレス: 240f:79:a8f8:1:b289:a029:fc2e:6ac/64 範囲:グローバル
					inet6アドレス: fe80::7474:4b62:ad78:2a00/64 範囲:リンク
					UP BROADCAST RUNNING MULTICAST	MTU:1500	メトリック:1
					RXパケット:2704 エラー:0 損失:594 オーバラン:0 フレーム:0
					TXパケット:607 エラー:0 損失:0 オーバラン:0 キャリア:0
			衝突(Collisions):0 TXキュー長:1000 
					RXバイト:161138 (157.3 KiB)	TXバイト:88612 (86.5 KiB)

lo				Link encap:ローカルループバック	
					inetアドレス:127.0.0.1 マスク:255.0.0.0
					inet6アドレス: ::1/128 範囲:ホスト
					UP LOOPBACK RUNNING	MTU:65536	メトリック:1
					RXパケット:140 エラー:0 損失:0 オーバラン:0 フレーム:0
					TXパケット:140 エラー:0 損失:0 オーバラン:0 キャリア:0
			衝突(Collisions):0 TXキュー長:0 
					RXバイト:11756 (11.4 KiB)	TXバイト:11756 (11.4 KiB)

wlan0		 Link encap:イーサネット	ハードウェアアドレス 34:76:c5:5d:7e:6c 
					inetアドレス:192.168.11.21 ブロードキャスト:192.168.11.255	マスク:255.255.255.0
					inet6アドレス: fe80::d437:7e2c:6380:2ccc/64 範囲:リンク
					inet6アドレス: 240f:79:a8f8:1:98a1:f100:4d2b:b96b/64 範囲:グローバル
					UP BROADCAST RUNNING MULTICAST	MTU:1500	メトリック:1
					RXパケット:3501 エラー:0 損失:632 オーバラン:0 フレーム:0
					TXパケット:119 エラー:0 損失:3 オーバラン:0 キャリア:0
			衝突(Collisions):0 TXキュー長:1000 
					RXバイト:582397 (568.7 KiB)	TXバイト:20034 (19.5 KiB)

認識されました

RINEXデータから緯度経度を取得(簡易版)

rinexデータのヘッダ部分にある「APPROX POSITION XYZ」から緯度経度を求めるスクリプト

rinexデータは国土地理院から取得できるファイルを使用します

lonlat.rb

#!/bin/env ruby 


class LonLat
	def exec(file)
		x,y,z=get_xyz(file)
		lat=Math.asin(z.to_f/6371000)*180.0/Math::PI
		lon=Math.atan2(y.to_f,x.to_f)*180.0/Math::PI
		p "latitude="+lat.to_s
		p "longitude="+lon.to_s
	end

	def get_xyz(file)
		open(file).each do |line|
			if line.include?("APPROX POSITION XYZ") then
				return line.gsub(/APPROX POSITION XYZ/,"").split(" ")
			end
		end
	end
end

if __FILE__ == $0 then
	if ARGV[0] == nil then
		p "usage:"+$0+" rinex.o"
		exit 0
	end
	LonLat.new.exec(ARGV[0])
end

実行結果

$ ruby lonlat.rb 00010700.11o
"latitude=45.1780484488787"
"longitude=141.7504468409626"

EC2インスタンスでTensorflow

GoogleのTensorflow、GPUマシンでないとなかなか性能がでないので

EC2で作成してみます。

TensorflowはCUDA3.5以降対応だとかで、AWSのEC2インスタンスで使用可能なg2.2xlargeではCUDA3.0。ということでそのままでは使えないそうです

というわけで、いろいろ調べたところ、偉い方々が手順を示してくれています。

https://www.tecnos-dsm.co.jp/archives/info/technical_info_04

2016/3/17現在、これらの手順ですとTensorflowをコンパイルする際にエラーになります

$ bazel build -c opt --config=cuda //tensorflow/tools/pip_package:build_pip_package
.......
ERROR: /home/ubuntu/tensorflow/WORKSPACE:16:6: First argument of load() is a path, not a label. It should start with a single slash if it is an absolute path..
ERROR: WORKSPACE file could not be parsed.
ERROR: no such package 'external': Package 'external' contains errors.
INFO: Elapsed time: 0.444s

これの回避策がこちらに

http://stackoverflow.com/questions/34941620/unable-to-build-tensorflow-from-source-with-bazel-22nd-january-2016

単純な話でbazelのバージョンのせいだとか。

bazelをコンパイルしなおします

git clone https://github.com/bazelbuild/bazel.git
cd bazel
git checkout tags/0.1.4
./compile.sh
sudo cp output/bazel /usr/bin

その後、Tensorflowのコンパイル

$ bazel build -c opt --config=cuda //tensorflow/tools/pip_package:build_pip_package
Extracting Bazel installation...
Sending SIGTERM to previous Bazel server (pid=11695)... done.
.......
INFO: Found 1 target...
INFO: From Executing genrule @png_archive//:configure [for host]:
/home/ubuntu/.cache/bazel/_bazel_ubuntu/ad1e09741bb4109fbc70ef8216b59ee2/tensorflow/external/png_archive/libpng-1.2.53 /home/ubuntu/.cache/bazel/_bazel_ubuntu/ad1e09741bb4109fbc70ef8216b59ee2/tensorflow
...

うまくいきました

Docker in DockerのJenkins環境をdockerで構築

dockerで構築したjnekins内でdockerイメージを作成する

これができるとjenkinsでDockerイメージまで作成し、デプロイが簡単になります。

Dindイメージを使って作成してみます

Dockerfile

FROM docker:dind

############### add tools(jdk etc...) ####################
RUN apk update
RUN apk add wget git curl zip unzip	make bzip2 zlib openssl g++ libstdc++

# jdk
RUN apk add	openjdk7

# ant
RUN apk add apache-ant --update-cache --repository http://dl-4.alpinelinux.org/alpine/edge/testing/ --allow-untrusted
RUN echo "export PATH=$PATH:/usr/share/java/apache-ant/bin" >> /root/.bashrc

# node
RUN apk add	nodejs

# grunt
RUN npm install -g grunt-cli
# gulp
RUN npm install -g gulp

# Ruby
RUN apk add libffi libffi-dev gcc
RUN apk add ruby ruby-dev

# add
RUN apk add	expect

RUN apk add freetype freetype-dev fontconfig
RUN apk add xorg-server
RUN apk add bash
RUN apk add rsync --update-cache --repository http://dl-4.alpinelinux.org/alpine/edge/testing/ --allow-untrusted
RUN apk add openssh --update-cache --repository http://dl-4.alpinelinux.org/alpine/edge/testing/ --allow-untrusted

################## set up Jenkins ##########################

RUN apk --update add openjdk7 ttf-dejavu && rm -rf /var/cache/apk/*
ENV JENKINS_HOME /var/lib/jenkins

RUN mkdir -p /usr/local/jenkins
RUN adduser -D -H -s /bin/sh jenkins
RUN chown -R jenkins:jenkins /usr/local/jenkins/
ADD jenkins.war /usr/local/jenkins/jenkins.war
RUN chmod 644 /usr/local/jenkins/jenkins.war

# jst
RUN apk add tzdata --update-cache --repository http://dl-4.alpinelinux.org/alpine/edge/testing/ --allow-untrusted
RUN ln -sf	/usr/share/zoneinfo/Asia/Tokyo /etc/localtime

################### set process ##################
COPY jenkins.sh /usr/local/bin/jenkins.sh
ENTRYPOINT /usr/local/bin/start.sh && /bin/bash

start.sh

#!/bin/sh
set -eo pipefail

# If there are any arguments then we want to run those instead
java -jar /usr/local/jenkins/jenkins.war --httpPort=8080	> /tmp/jenkins.log 2>&1 &	

set -e
docker daemon \
								--host=unix:///var/run/docker.sock \
								--host=tcp://0.0.0.0:2375 \
								--storage-driver=vfs \
--bip=172.17.101.1/24		# docker0のアドレス域を変更

docker-compose.xml

ついでにdocker-composeで起動するように設定します

myjenkins:
	restart: always
	image: myjenkins:1.0
	ports: 
		- "8080:8080"
	volumes:
		- /data/jenkins:/var/lib/jenkins		# host volumeをマップング
		- /data/git:/opt/git:ro
	env_file:
		- ./.docker-compose.env
	environment:
		- JAVA_OPTS=-XX:MaxPermSize=256m
	privileged: true

.docker-compose.env

NO_PROXY=localhost,127.0.0.1
no_proxy=localhost,127.0.0.1
HTTP_PROXY=
HTTPS_PROXY=
http_proxy=
https_proxy=

起動

# docker build -t myjenkins:1.0 .
# docker-compose -f docker-compose.yml up -d myjenkins

RでSHA

RでSHA1を作成する。serialize=Fを入れるのがポイント

> install.packages("digest")
> library(digest)
> digest("abcdefg",algo="sha1",serialize=F)
[1] "2fb5e13419fc89246865e7a324f476ec624e8740"

rubyの出力との比較

$ irb
irb(main):001:0> require 'digest/sha1'
=> true
irb(main):002:0> Digest::SHA1.hexdigest("abcdefg")
=> "2fb5e13419fc89246865e7a324f476ec624e8740"

expectの書き方

よく忘れるので備忘録

#!/bin/bash
my_ssh(){
/usr/bin/expect << Eof
set timeout -1
spawn /usr/bin/ssh -o stricthostkeychecking=no root@hostname bash /path/to/script.sh
match_max 100000
expect -exact "root@hostname's password: "
send -- "password\r"
expect eof
Eof
}
my_ssh

EMRのSparkでWordCount

BODY:

EMRではSparkでファイルを開く際には*が使えるみたいだ 

package sample
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
object WordCount {
			def main(args: Array[String]) {
				println("wordcount,args="+args(0)+","+args(1))
			val conf = new SparkConf().setAppName("wordcount").setMaster("yarn-cluster")
			val sc = new SparkContext(conf)
			
				val textFile = sc.textFile(args(0))	// s3n://bucket/*gz
			val counts = textFile.flatMap(line => line.split(" "))
										 .map(word => (word, 1))
										 .reduceByKey(_ + _)
				println("counts="+counts.id+","+counts.name)
				println(counts.toDebugString)
			counts.saveAsTextFile(args(1))
			}
}

こんな感じのBOWを数えるスクリプトを作成

s3にはgzで固められたファイルがたくさんある場合には

spark-submit --deploy-mode cluster --class sample.WordCount s3://bucket/dir/wordcount.jar s3n://bucket/log/*.gz s3n://bucket/output

このような指定で起動すると全ファイルを解凍しながら計算し、outputへ結果を保存します

EMRでSparkSQLサンプル

SparkからHiveが使いづらいというか使えない?のでSparkSQLを使ってみました。

そこそこ試行錯誤する必要があったのでメモです。

データファイル

銘柄コード,日付,始値,高値,安値,終値,出来高

のフォーマットのファイルを用意しておきます。こんな感じ。

1301,2004-04-01,198,198,195,196,651000
1301,2004-04-02,194,196,194,196,490000
1301,2004-04-05,196,200,195,197,1478000
1301,2004-04-06,202,208,200,207,4324000

これをS3へアップしておきます

build.sbt

こんな感じで記述します。build assemblyでエラーが出るのでこんな記述にしています。

name := "spark_sample"

version := "1.0-SNAPSHOT"

scalaVersion := "2.11.7"

// additional libraries
libraryDependencies ++= Seq(
	"org.apache.spark" %% "spark-core" % "1.5.2" % "provided",
	"org.apache.spark" %% "spark-sql" % "1.5.2",
	"org.apache.spark" %% "spark-hive" % "1.5.2",
	"org.apache.spark" %% "spark-streaming" % "1.5.2",
	"org.apache.spark" %% "spark-streaming-kafka" % "1.5.2",
	"org.apache.spark" %% "spark-streaming-flume" % "1.5.2",
	"org.apache.spark" %% "spark-mllib" % "1.5.2",
	"org.apache.commons" % "commons-lang3" % "3.0",
	"org.eclipse.jetty"	% "jetty-client" % "8.1.14.v20131031",
	"com.typesafe.play" %% "play-json" % "2.3.10",
	"com.fasterxml.jackson.core" % "jackson-databind" % "2.6.4",
	"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.6.3",
	"org.elasticsearch" % "elasticsearch-hadoop-mr" % "2.0.0.RC1",
	"net.sf.opencsv" % "opencsv" % "2.0",
	"com.twitter.elephantbird" % "elephant-bird" % "4.5",
	"com.twitter.elephantbird" % "elephant-bird-core" % "4.5",
	"com.hadoop.gplcompression" % "hadoop-lzo" % "0.4.17",
	"mysql" % "mysql-connector-java" % "5.1.31",
	"com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0-M3",
	"com.datastax.spark" %% "spark-cassandra-connector-java" % "1.5.0-M3",
	"com.github.scopt" %% "scopt" % "3.2.0",
	"org.scalatest" %% "scalatest" % "2.2.1" % "test",
	"com.holdenkarau" %% "spark-testing-base" %	"1.5.1_0.2.1",
	"org.apache.hive" % "hive-jdbc" % "1.2.1"
)

resolvers ++= Seq(
	"JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
	"Spray Repository" at "http://repo.spray.cc/",
	"Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
	"Akka Repository" at "http://repo.akka.io/releases/",
	"Twitter4J Repository" at "http://twitter4j.org/maven2/",
	"Apache HBase" at "https://repository.apache.org/content/repositories/releases",
	"Twitter Maven Repo" at "http://maven.twttr.com/",
	"scala-tools" at "https://oss.sonatype.org/content/groups/scala-tools",
	"Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/",
	"Second Typesafe repo" at "http://repo.typesafe.com/typesafe/maven-releases/",
	"Mesosphere Public Repository" at "http://downloads.mesosphere.io/maven",
	Resolver.sonatypeRepo("public")
)

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
	{
		case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
		case m if m.startsWith("META-INF") => MergeStrategy.discard
		case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
		case PathList("org", "apache", xs @ _*) => MergeStrategy.first
		case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
		case "about.html"	=> MergeStrategy.rename
		case "reference.conf" => MergeStrategy.concat
		case _ => MergeStrategy.first
	}
}

ちなみにproject/assembly.sbtはこれ

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0")

SqlSample.scala

http://spark.apache.org/docs/latest/sql-programming-guide.html#upgrading-from-spark-sql-15-to-16

この辺りを参考に

package sample
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark._
import org.apache.spark.api.java._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

object SqlSample {
	def main(args: Array[String]) {
		val conf = new SparkConf().setAppName("SparkSQL").setMaster("yarn-cluster")
		val sc = new SparkContext(conf)	

		val sqlContext = new org.apache.spark.sql.SQLContext(sc)
		// Import Row.
		import org.apache.spark.sql.Row;

		// Import Spark SQL data types
		import org.apache.spark.sql.types.{StructType,StructField,StringType};

		val histRDD = sc.textFile(args(0)).map(_.split(",")).
			map(p => Row(p(0), p(1),p(2),p(3),p(4),p(5),p(6)))
		val schemaString = "code date open high low close volume"
		val schema =
				StructType(
				schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))	
				
		// Apply the schema to the RDD.
		val histDataFrame = sqlContext.createDataFrame(histRDD, schema)
		// Register the DataFrames as a table.
		histDataFrame.registerTempTable("priceHist")
		
		// SQL statements can be run by using the sql methods provided by sqlContext.
		val results = sqlContext.sql("SELECT code,date,open FROM priceHist where code='6758'")

		val ary=results.map(_.getValuesMap[Any](List("code", "date","open"))).collect()

		val outputLocation = args(1) // s3n://bucket/
		val data=sc.makeRDD(ary)
		data.saveAsTextFile(outputLocation)

		sc.stop()
	}
}

build

$ sbt package

これで作成したJarを同じくS3へアップします

EMR

今までと同様にEMRを作成し、AddStepでSparkApplicationを追加します。Jarは先ほどアップしたものを指定します

Spark-submit options
--class sample.SqlSample
Arguments
s3n://bucket/output

ここには出力ファイルが入ります

じっこすればOutputにMapで表現されたデータが保存されます