scalaのRedisクライアントbrandoを使ってみる(その3)

PlayFramework2.2でscalaのRedisクライアントbrandoを使用する際にはちょっと注意が必要です。

brando1.0はakka2.3対応らしく、Play2.2ではまだakka2.2の対応となっているためそのままでは動きません。

仕方がないのでbrando0.3にバージョンを落として使用します。

また前回のサンプルだと、キーに値がない場合にはエラーになってしまいます。

そこでbrando0.3を少し改造します

  • brand.Brando.scala
		case Tcp.Received(data) ⇒
			parseReply(data) { reply ⇒
				reply match {
					case Some(List(Some(x: ByteString), Some(channel: ByteString), Some(message: ByteString))) if (x.utf8String == "message") ⇒

						val pubSubMessage = PubSubMessage(channel.utf8String, message.utf8String)
						getSubscribers(channel).map { x ⇒
							x ! pubSubMessage
						}
//ここから
					case None =>
						requesterQueue.dequeue ! Option(ByteString(""))
//ここまで追加
					case _ ⇒
						requesterQueue.dequeue ! (reply match {
							case Some(failure) if failure.isInstanceOf[Status.Failure] ⇒ failure
							case success ⇒ success
						})
				}
			}

これで使いやすくなりました

scalaのRedisクライアントbrandoを使ってみる(その2)

前回brandoを使用してRedisの値をGETできたので今回はSubscribeしてみます。

Redisサーバ側から値をプッシュし、それをPlayのWebsocketでブラウザ側に流すため用に特化しています

試行錯誤をかなりしましたが意外と単純でした。

ポイントは起動するクラスをActorとして作成し、PubSubMessageをReceiveするだけです


import akka.actor._
import akka.util._
import scala.concurrent._
import brando._


class MyPubSub extends Actor{
		implicit val timeout= Timeout(5000)
		val system=ActorSystem("brando")
		val brando = system.actorOf(Brando("localhost", 6379,None,None))	 

		brando ! Request("SUBSCRIBE","key")
		def receive={
			 case msg:PubSubMessage=>println("msg="+msg)
			 case _ => 
		 }
}

object PubSub {
	def main(args: Array[String]): Unit = {
		val system=ActorSystem("a")
		system.actorOf(Props(new MyPubSub()))
	}
}

これだけでした。

scalaのRedisクライアントbrandoを使ってみる

ちょっとはまったのでメモ。

scalaのRedisクライアントには何種類か有ります。

http://redis.io/clients

当初scala-redisを使っていたのですが、Pub/Subする際に、内部的にスレッドを生成しているようで、大量のPub/Subを作成したい場合(数千)にはOutOfMemoryで落ちてしまいます。

そこでActorモデルのbrandoを使用してみました。

ネット上にはほとんど情報もなくちょっとはまってしまったので記録しておきます

https://github.com/chrisdinn/brando

こちらからダウンロードしてsbt packageでJarを作成します。

あとakkaが必要なので2.3をダウンロードしておきます。

いろいろと試行錯誤し以下のコードでとれました

Pub/Subとかこれから確かめます

package brandoTest

import akka.actor._
import akka.util._
import akka.pattern.ask
import scala.concurrent._
import brando._

object Main {

	def main(args: Array[String]): Unit = {
		implicit val timeout= Timeout(5000)
		val system=ActorSystem("brando")
		val brando = system.actorOf(Brando("localhost", 6379,None,None))
		val future=brando.ask (Request("GET","key")).mapTo[Some[ByteString]]
		val res=Await.result(future,timeout.duration)
		res match{
			case Some(x)=>println(x.decodeString("UTF-8"))
		}

	}
}