Websocketを使ったサンプルだとチャットが多いのですが、実際にはチャットのアプリを作ることはあまり有りません。
どちらかというとサーバ側から配信するというような用途が多いのではないでしょうか?
PlayframeworkのサンプルにはWebsocketを使ったサンプルが付属しています。
これを改造して、サーバ側から配信できるようにしてみました。
- 環境
- MacOS10.9
- jdk1.7
- playframework2.2
- redis2.8.4
- scala2.10
- playframeworkのチャットサンプル
brewでインストールした場合にはこちらに入っているので、作業エリアにコピーしておきます
/usr/local/Cellar/play/2.2.1/libexec/samples/scala/websocket-chat/
- unicast
一斉配信したい場合にはサンプルで採用されているbroadcastをそのまま使用してもいいのですが、今回は一対一通信で配信したいのでunicastを使用します。サンプルはこちらを参考にしました。
http://satoshi-m8a.github.io/blog/2013/05/18/scala-concurrent-unicast/
- build.sbt
scalaからredisに接続するためにはscala-redisライブラリを使用します。そのためbuild.sbtに以下を記述します
import play.Project._

// Project identity.
name := "websocket-chat"

version := "1.0"

// `cache` is provided by play.Project._; redisclient is the scala-redis
// library (artifact built for Scala 2.10) used to talk to Redis.
libraryDependencies ++= Seq(
  cache,
  "net.debasishg" % "redisclient_2.10" % "2.11"
)

// Applies the Play Scala project settings. Settings in a .sbt file must be
// separated by blank lines, which is why each expression stands alone.
playScalaSettings
- conf/logger.xml
ログ出力用に追加します
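<configuration>

  <conversionRule conversionWord="coloredLevel" converterClass="play.api.Logger$ColoredLevel" />

  <appender name="FILE" class="ch.qos.logback.core.FileAppender">
    <file>${application.home}/logs/application.log</file>
    <encoder>
      <pattern>%date - [%level] - from %logger in %thread %n%message%n%xException%n</pattern>
    </encoder>
  </appender>

  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%coloredLevel %d{HH:mm:ss.SSS} [%thread] %logger{15} - %message%n%xException{5}</pattern>
    </encoder>
  </appender>

  <logger name="play" level="INFO" />
  <logger name="application" level="INFO" />

  <root level="ERROR">
    <appender-ref ref="STDOUT" />
    <appender-ref ref="FILE" />
  </root>

</configuration>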
- conf/application.conf
redisの接続先をconfに書いておきます
# Redis endpoint. ChatRoom parses this value with java.net.URI and passes only
# the host and port to the Redis client.
redis.uri="http://localhost:6379/"
- models/ChatRoom.scala
モデルをscala-redisのテストコードを参考に修正します
package models

import akka.actor._
import scala.concurrent.duration._
import scala.util.control.NonFatal

import play.api._
import play.api.libs.json._
import play.api.libs.iteratee._
import play.api.libs.concurrent._

import akka.util.Timeout
import akka.pattern.ask

import play.api.Play.current
import play.api.libs.concurrent.Execution.Implicits._
import play.api.libs.iteratee.Concurrent.Channel

import com.redis._
import akka.actor.{ Actor, ActorSystem, Props }

// Control messages for the Redis pub/sub side of a room.
case class PublishMessage(channel: String, message: String)
case class SubscribeMessage(channels: Array[String])
case class UnsubscribeMessage(channels: Array[String])
case object GoDown

/** Background participant that joins a room and periodically talks in it. */
object Robot {

  /**
   * Joins `chatRoom` as "Robot" and schedules a `Talk` message every 30 seconds.
   *
   * @param chatRoom the room actor to join
   * @return the handle of the periodic task; cancel it when the room goes away,
   *         otherwise the scheduler keeps firing at a stopped actor forever
   */
  def apply(chatRoom: ActorRef): Cancellable = {

    // Create an Iteratee that logs all messages to the console.
    val loggerIteratee = Iteratee.foreach[JsValue](event => Logger("robot").info(event.toString))

    implicit val timeout: Timeout = Timeout(1.second)

    // Make the robot join the room
    (chatRoom ? Join("Robot")).map {
      case Connected(robotChannel) =>
        // Apply this Enumerator on the logger.
        robotChannel |>> loggerIteratee
    }

    // Make the robot talk every 30 seconds
    Akka.system.scheduler.schedule(
      30.seconds,
      30.seconds,
      chatRoom,
      Talk("Robot", "I'm still alive")
    )
  }

}

/** Entry point used by the WebSocket controller to open one room per connection. */
object ChatRoom {

  implicit val timeout: Timeout = Timeout(1.second)

  /**
   * Creates a dedicated room actor for this connection, subscribes it to the
   * given Redis channels and returns the WebSocket in/out pair.
   *
   * @param username comma-separated list of Redis channels, ex. "6758,9997"
   * @return a future of (iteratee consuming client messages, enumerator feeding the client)
   */
  def join(username: String): scala.concurrent.Future[(Iteratee[JsValue, _], Enumerator[JsValue])] = {
    val roomActor = Akka.system.actorOf(Props[ChatRoom])
    roomActor ! SubscribeMessage(username.split(","))

    // Keep the handle so the periodic task can be cancelled with the connection.
    val robotHeartbeat = Robot(roomActor)

    (roomActor ? Join(username)).map {

      case Connected(enumerator) =>
        // Create an Iteratee to consume the feed
        val iteratee = Iteratee.foreach[JsValue] { event =>
          roomActor ! Talk(username, (event \ "text").as[String])
        }.mapDone { _ =>
          // Socket closed: stop the robot first, then let the room tear itself down.
          robotHeartbeat.cancel()
          roomActor ! Quit(username)
        }

        (iteratee, enumerator)

      case CannotConnect(error) =>
        // Connection error: release everything created above for this connection.
        robotHeartbeat.cancel()
        roomActor ! GoDown

        // A finished Iteratee sending EOF
        val iteratee = Done[JsValue, Unit]((), Input.EOF)

        // Send an error and close the socket
        val enumerator = Enumerator[JsValue](JsObject(Seq("error" -> JsString(error))))
          .andThen(Enumerator.enumInput(Input.EOF))

        (iteratee, enumerator)
    }
  }

}

/**
 * One room per WebSocket connection. Subscribes to Redis channels and forwards
 * every received message to the connected client through a unicast channel.
 */
class ChatRoom extends Actor {

  println("starting subscription service ..")

  val uri = new java.net.URI(
    Play.configuration.getString("redis.uri").getOrElse(sys.error("redis.uri is not configured"))
  )
  val r = new RedisClient(uri.getHost, uri.getPort)

  // The subscriber is a child of this actor, so it is stopped together with the
  // room instead of living in a separate ActorSystem created per connection.
  val s = context.actorOf(Props(new Subscriber(r)))
  s ! Register(callback)

  def receive = {
    case SubscribeMessage(chs) => sub(chs)
    case UnsubscribeMessage(chs) => unsub(chs)
    case GoDown =>
      // The Redis connection is closed in postStop.
      context.stop(self)
    //case x => println("Got in Sub " + x)

    case Join(username) =>
      sender ! Connected(chatEnumerator)

    case NotifyJoin(username) =>
      notifyAll("join", username, "has entered the room")

    case Talk(username, text) =>
      notifyAll("talk", username, text)

    case Quit(username) =>
      notifyAll("quit", username, "has left the room")
      // The client is gone; without this the actor and its Redis connection leak.
      context.stop(self)
  }

  /** Closes the Redis connection when the room stops. */
  override def postStop(): Unit = {
    try {
      r.quit
      ()
    } catch {
      case NonFatal(e) => println("failed to close redis connection: " + e)
    }
  }

  def sub(channels: Array[String]) = {
    s ! Subscribe(channels)
  }

  def unsub(channels: Array[String]) = {
    s ! Unsubscribe(channels)
  }

  /**
   * Handler registered with the Subscriber for every pub/sub event.
   * It is invoked outside of this actor's message loop.
   */
  def callback(pubsub: PubSubMessage): Unit = pubsub match {
    case E(exception) => println("Fatal error caused consumer dead. Please init new consumer reconnecting to master or connect to backup")
    case S(channel, no) => println("subscribed to " + channel + " and count = " + no)
    case U(channel, no) => println("unsubscribed from " + channel + " and count = " + no)
    case M(channel, msg) =>
      msg match {
        // exit will unsubscribe from all channels and stop subscription service
        case "exit" =>
          println("unsubscribe all ..")
          r.unsubscribe

        // message "+x" will subscribe to channel x
        case x if x startsWith "+" =>
          r.subscribe(x.drop(1)) { m => }

        // message "-x" will unsubscribe from channel x
        case x if x startsWith "-" =>
          r.unsubscribe(x.drop(1))

        // other message receive
        case x =>
          println("received message on channel " + channel + " as : " + x)
          notifyAll("talk", channel, x)
      }
  }

  // Written by onStart and read by both receive and callback, which run on
  // different threads, hence @volatile.
  @volatile var chatChannel: Option[Channel[JsValue]] = None

  def onStart: Channel[JsValue] => Unit = { channel =>
    chatChannel = Some(channel)
    println("start")
    self ! NotifyJoin("you")
  }

  def onError: (String, Input[JsValue]) => Unit = { (message, input) =>
    println("onError " + message)
  }

  def onComplete = println("onComplete")

  val chatEnumerator = Concurrent.unicast[JsValue](onStart, onComplete, onError)

  /**
   * Pushes a JSON event to the connected client, if the channel is open.
   *
   * @param kind event type ("join", "talk" or "quit")
   * @param user user name or Redis channel the event originates from
   * @param text message body
   */
  def notifyAll(kind: String, user: String, text: String): Unit = {
    val msg = JsObject(
      Seq(
        "kind" -> JsString(kind),
        "user" -> JsString(user),
        "message" -> JsString(text)
      )
    )
    chatChannel match {
      case Some(channel) => channel.push(msg)
      case _ => println("nothing")
    }
  }

}

// Chat protocol messages exchanged with the room actor.
case class Join(username: String)
case class Quit(username: String)
case class Talk(username: String, text: String)
case class NotifyJoin(username: String)

case class Connected(enumerator: Enumerator[JsValue])
case class CannotConnect(msg: String)
これで準備完了
- 実行
play runで実行し、ユーザ名の入力欄に購読したいキーを入力します。キーはカンマ区切りで複数指定でき、それぞれがRedisのPub/Subチャンネル名になります。
play run
画面が起動したら、a,b でログインしてみます
その後Redisのコマンドで値を送ってみます
redis-cli publish a test
画面にa test が表示されます