scalaのRedisクライアントbrandoを使ってみる(その3)

PlayFramework2.2でscalaのRedisクライアントbrandoを使用する際にはちょっと注意が必要です。

brando1.0はakka2.3対応らしく、Play2.2ではまだakka2.2の対応となっているためそのままでは動きません。

仕方がないのでbrando0.3にバージョンを落として使用します。

また前回のサンプルだと、キーに値がない場合にはエラーになってしまいます。

そこでbrando0.3を少し改造します

  • brand.Brando.scala
		case Tcp.Received(data) ⇒
			parseReply(data) { reply ⇒
				reply match {
					case Some(List(Some(x: ByteString), Some(channel: ByteString), Some(message: ByteString))) if (x.utf8String == "message") ⇒

						val pubSubMessage = PubSubMessage(channel.utf8String, message.utf8String)
						getSubscribers(channel).map { x ⇒
							x ! pubSubMessage
						}
//ここから
					case None =>
						requesterQueue.dequeue ! Option(ByteString(""))
//ここまで追加
					case _ ⇒
						requesterQueue.dequeue ! (reply match {
							case Some(failure) if failure.isInstanceOf[Status.Failure] ⇒ failure
							case success ⇒ success
						})
				}
			}

これで使いやすくなりました

scalaのRedisクライアントbrandoを使ってみる(その2)

前回brandoを使用してRedisの値をGETできたので今回はSubscribeしてみます。

Redisサーバ側から値をプッシュし、それをPlayのWebsocketでブラウザ側に流すため用に特化しています

試行錯誤をかなりしましたが意外と単純でした。

ポイントは起動するクラスをActorとして作成し、PubSubMessageをReceiveするだけです


import akka.actor._
import akka.util._
import scala.concurrent._
import brando._


class MyPubSub extends Actor{
		implicit val timeout= Timeout(5000)
		val system=ActorSystem("brando")
		val brando = system.actorOf(Brando("localhost", 6379,None,None))	 

		brando ! Request("SUBSCRIBE","key")
		def receive={
			 case msg:PubSubMessage=>println("msg="+msg)
			 case _ => 
		 }
}

object PubSub {
	def main(args: Array[String]): Unit = {
		val system=ActorSystem("a")
		system.actorOf(Props(new MyPubSub()))
	}
}

これだけでした。

scalaのRedisクライアントbrandoを使ってみる

ちょっとはまったのでメモ。

scalaのRedisクライアントには何種類か有ります。

http://redis.io/clients

当初scala-redisを使っていたのですが、Pub/Subする際に、内部的にスレッドを生成しているようで、大量のPub/Subを作成したい場合(数千)にはOutOfMemoryで落ちてしまいます。

そこでActorモデルのbrandoを使用してみました。

ネット上にはほとんど情報もなくちょっとはまってしまったので記録しておきます

https://github.com/chrisdinn/brando

こちらからダウンロードしてsbt packageでJarを作成します。

あとakkaが必要なので2.3をダウンロードしておきます。

いろいろと試行錯誤し以下のコードでとれました

Pub/Subとかこれから確かめます

package brandoTest

import akka.actor._
import akka.util._
import akka.pattern.ask
import scala.concurrent._
import brando._

object Main {

	def main(args: Array[String]): Unit = {
		implicit val timeout= Timeout(5000)
		val system=ActorSystem("brando")
		val brando = system.actorOf(Brando("localhost", 6379,None,None))
		val future=brando.ask (Request("GET","key")).mapTo[Some[ByteString]]
		val res=Await.result(future,timeout.duration)
		res match{
			case Some(x)=>println(x.decodeString("UTF-8"))
		}

	}
}

PlayFramework+Redis+websocketでプッシュ配信(データ圧縮編)

Websocketに最近はまっています。

前回プッシュ配信をRedisのPub/Subを用いて実現した訳ですが、データ量が増えてくるとどうしても配信するデータを圧縮したくなります。

特にAWSなんかだと、従量課金となっていますので、データ転送量は少なければ少ないほど○

Websocket自体はまだデータの圧縮はサポートされていないとのことですので、自前でデータの圧縮伸長を実装してみました。

また、それだけではつまらないので、Reidsにチャンネルを作成し、ReidsのチャンネルごとにPub/Subする仕組みを入れています

図解するとちょっとわかりにくいですがこんな感じ

			user1							userActor			 redisActor	 sub		Redis
		|code1,code2| ---	|	 user1	 | ---	|	code1	| --- |
											 |					 |			|				 |
			user2						|					 | ---	|	code2	| --- |
		|code2,code3| ---	|	 user2	 | ---	|	code3	| --- |

User別に作成したActorと、ユーザが選択したチャンネル(code1,code2,code3)別に作成したActorを別々に作成し、チャンネルがReidsから更新された際には、購読しているUserのActorへ更新をかけるという感じです

こうすることにより、チャンネル数がユーザ数より圧倒的に少ない場合には、Reidsに対するコネクションも削減でき、効率が良くなります。

  • route
GET		 /board													 controllers.Application.board(code:Option[String])	 # 画面作成用
GET		 /board/data											controllers.Application.data(code) #WSでデータ取得用
GET		 /asset/javascripts/board.js		 #Javascript controllers.Application.boardJs(code:String)
  • Compress.scala

データ圧縮用に作成します。String型をZIP圧縮し、BASE64でエンコードします

package models

import java.util.zip._
import java.io._
import org.apache.commons.codec.binary._

object Compress {
	def encode(str:String):String={
		val out = new ByteArrayOutputStream()
		val defl = new DeflaterOutputStream(out, new Deflater(Deflater.BEST_COMPRESSION, true))
		defl.write(str.getBytes())
		defl.close()
		val compressed = Base64.encodeBase64(out.toByteArray())
		new String(compressed)		
	}
}
  • UserActor.scala
...
	
	def notifyAll(code:String,data:String){
		// 実際にWebSocketでブラウザに送るデータ
		val msg=JsObject(
				Seq(
						"code"->JsString(code),
						"data"->JsString(Compress.encode(data)) // JSONの一部データ部分のみ圧縮
..
  • Javascriptで解凍

JavaScriptでZIP解凍、Base64でコードするために↓からinfrate.js,base64.js,utf.jsなどをダウンロードしておきます。

http://www.onicos.com/staff/iz/amuse/javascript/expert/

@(code:String)(implicit r: RequestHeader)

$(function() {

		var WS = window['MozWebSocket'] ? MozWebSocket : WebSocket
		var socket = new WS("@routes.Application.data(code).webSocketURL()")

		
		var receiveEvent = function(event) {
				var data = JSON.parse(event.data)

				var bs=base64decode(data.data) // Base64デコード
				var dec=zip_inflate(bs)				// ZIP解凍
				var p=JSON.parse(dec)
				$("#data").text(p.data);
			 
		}

		socket.onmessage = receiveEvent

})

サンプルの一式は以下においておきました

https://github.com/anagotan/play_websocket_compressed

PlayFramework+Redis+websocketでプッシュ配信を作成してみる

Websocketを使ったサンプルだとチャットが多いのですが、実際にはチャットのアプリを作ることはあまり有りません。

どちらかというとサーバ側から配信するというような用途が多いのではないでしょうか?

PlayframeworkのサンプルにはWebsocketを使ったサンプルが付属しています。

これを改造してサーバ側から配信できるように改造してみました

  • 環境
    • MacOS10.9
    • jdk1.7
    • playframework2.2
    • redis2.8.4
    • scala2.10
  • playframeworkのチャットサンプル

brewでインストールした場合にはこちらに入っているので、作業エリアにコピーしておきます

/usr/local/Cellar/play/2.2.1/libexec/samples/scala/websocket-chat/

  • unicast

一斉配信したい場合にはサンプルで採用されているbroadcastをそのまま使用してもいいのですが、今回は一対一通信で配信したいのでunicastを使用します。サンプルはこちらを参考にしました。

http://satoshi-m8a.github.io/blog/2013/05/18/scala-concurrent-unicast/

  • build.sbt

scalaからredisに接続するためにはscala-redisライブラリを使用します。そのためbuild.sbtに以下を記述します

import play.Project._

name := "websocket-chat"

version := "1.0"

libraryDependencies ++= Seq(
	cache,
	"net.debasishg" % "redisclient_2.10" % "2.11"
)

playScalaSettings
  • conf/logger.xml

ログ出力用に追加します


		
	
	
	
		 ${application.home}/logs/application.log
		 
			 %date - [%level] - from %logger in %thread %n%message%n%xException%n
		 
	 

	
		
			%coloredLevel %d{HH:mm:ss.SSS} [%thread] %logger{15} - %message%n%xException{5}
		
	
	
	
	

	
		
		
	
	

  • conf/application.conf

redisの接続先をconfに書いておきます


redis.uri="http://localhost:6379/"
  • models/ChatRoom.scala

モデルをscala-redisのテストコードを参考に修正します

package models

import akka.actor._
import scala.concurrent.duration._

import play.api._
import play.api.libs.json._
import play.api.libs.iteratee._
import play.api.libs.concurrent._

import akka.util.Timeout
import akka.pattern.ask

import play.api.Play.current
import play.api.libs.concurrent.Execution.Implicits._
import play.api.libs.iteratee.Concurrent.Channel

import com.redis._
import akka.actor.{ Actor, ActorSystem, Props }
case class PublishMessage(channel: String, message: String)
case class SubscribeMessage(channels: Array[String])
case class UnsubscribeMessage(channels: Array[String])
case object GoDown



object Robot {

	def apply(chatRoom: ActorRef) {

		// Create an Iteratee that logs all messages to the console.
		val loggerIteratee = Iteratee.foreach[JsValue](event => Logger("robot").info(event.toString))

		implicit val timeout = Timeout(1 second)
		// Make the robot join the room
		chatRoom ? (Join("Robot")) map {
			case Connected(robotChannel) =>
				// Apply this Enumerator on the logger.
				robotChannel |>> loggerIteratee
		}

		// Make the robot talk every 30 seconds
		Akka.system.scheduler.schedule(
			30 seconds,
			30 seconds,
			chatRoom,
			Talk("Robot", "I'm still alive")
		)
	}

}

object ChatRoom {

	implicit val timeout = Timeout(1 second)

	// username split ","	ex. "6758,9997"
	def join(username: String): scala.concurrent.Future[(Iteratee[JsValue, _], Enumerator[JsValue])] = {
		val roomActor = Akka.system.actorOf(Props[ChatRoom])
		
		roomActor ! SubscribeMessage(username.split(","))

		Robot(roomActor)

		(roomActor ? Join(username)).map {

			case Connected(enumerator) =>

				// Create an Iteratee to consume the feed
				val iteratee = Iteratee.foreach[JsValue] {
					event =>
						roomActor ! Talk(username, (event \ "text").as[String])
				}.mapDone {
					_ =>
						roomActor ! Quit(username)
				}

				(iteratee, enumerator)

			case CannotConnect(error) =>

				// Connection error

				// A finished Iteratee sending EOF
				val iteratee = Done[JsValue, Unit]((), Input.EOF)

				// Send an error and close the socket
				val enumerator = Enumerator[JsValue](JsObject(Seq("error" -> JsString(error)))).andThen(Enumerator.enumInput(Input.EOF))

				(iteratee, enumerator)

		}
	}
}

class ChatRoom extends Actor {
	println("starting subscription service ..")
	val system = ActorSystem("sub")
	val uri = new java.net.URI(Play.configuration.getString("redis.uri").get)
	val r = new RedisClient(uri.getHost,uri.getPort)
	val s = system.actorOf(Props(new Subscriber(r)))
	s ! Register(callback)
	
	def receive = {
		case SubscribeMessage(chs) => sub(chs)
		case UnsubscribeMessage(chs) => unsub(chs)
		case GoDown =>
			r.quit
			system.shutdown()
			system.awaitTermination()

		//case x => println("Got in Sub " + x)
		
		
		
		case Join(username) => {
			sender ! Connected(chatEnumerator)
		}

		case NotifyJoin(username) => {
			notifyAll("join", username, "has entered the room")
		}

		case Talk(username, text) => {
			notifyAll("talk", username, text)
		}

		case Quit(username) => {
			notifyAll("quit", username, "has left the room")
		}
	}

	def sub(channels: Array[String]) = {
		s ! Subscribe(channels.toArray)
	}

	def unsub(channels: Array[String]) = {
		s ! Unsubscribe(channels.toArray)
	}

	def callback(pubsub: PubSubMessage) = pubsub match {
		case E(exception) => println("Fatal error caused consumer dead. Please init new consumer reconnecting to master or connect to backup")
		case S(channel, no) => println("subscribed to " + channel + " and count = " + no)
		case U(channel, no) => println("unsubscribed from " + channel + " and count = " + no)
		case M(channel, msg) =>
			msg match {
				// exit will unsubscribe from all channels and stop subscription service
				case "exit" =>
					println("unsubscribe all ..")
					r.unsubscribe

				// message "+x" will subscribe to channel x
				case x if x startsWith "+" =>
					val s: Seq[Char] = x
					s match {
						case Seq('+', rest @ _*) => r.subscribe(rest.toString){ m => }
					}

				// message "-x" will unsubscribe from channel x
				case x if x startsWith "-" =>
					val s: Seq[Char] = x
					s match {
						case Seq('-', rest @ _*) => r.unsubscribe(rest.toString)
					}

				// other message receive
				case x =>
					println("received message on channel " + channel + " as : " + x)
					notifyAll("talk", channel, x)
			}
	}
	

	var chatChannel: Option[Channel[JsValue]] = None

	def onStart: Channel[JsValue] => Unit = {
		channel =>
			chatChannel = Some(channel)
			println("start")
			self ! NotifyJoin("you")
	}

	def onError: (String, Input[JsValue]) => Unit = {
		(message, input) =>
			println("onError " + message)
	}

	def onComplete = println("onComplete")

	val chatEnumerator = Concurrent.unicast[JsValue](onStart, onComplete, onError)
/*
	def receive = {

		case Join(username) => {
			sender ! Connected(chatEnumerator)
		}

		case NotifyJoin(username) => {
			notifyAll("join", username, "has entered the room")
		}

		case Talk(username, text) => {
			notifyAll("talk", username, text)
		}

		case Quit(username) => {
			notifyAll("quit", username, "has left the room")
		}

	}
	* 
	*/

	def notifyAll(kind: String, user: String, text: String) {
		val msg = JsObject(
			Seq(
				"kind" -> JsString(kind),
				"user" -> JsString(user),
				"message" -> JsString(text)
			)
		)
		chatChannel match {
			case Some(channel) => channel.push(msg)
			case _ => println("nothing")
		}
	}
}

case class Join(username: String)
case class Quit(username: String)
case class Talk(username: String, text: String)
case class NotifyJoin(username: String)
case class Connected(enumerator: Enumerator[JsValue])
case class CannotConnect(msg: String)

これで準備完了

  • 実行

play runで実行し、ユーザ名部分にカンマ区切りでキーを入力します。このキーはカンマ区切りで複数入力可能で、このキーがredisのキーとなります。

play run

画面が起動したら、a,b でログインしてみます

その後Reidsのコマンドで値を送ってみます

redis-cli publish a test

画面にa test が表示されます