Websocketを使ったサンプルだとチャットが多いのですが、実際にはチャットのアプリを作ることはあまり有りません。
どちらかというとサーバ側から配信するというような用途が多いのではないでしょうか?
PlayframeworkのサンプルにはWebsocketを使ったサンプルが付属しています。
これを改造して、サーバ側から配信できるようにしてみました。
- 環境
- MacOS10.9
- jdk1.7
- playframework2.2
- redis2.8.4
- scala2.10
Playをbrewでインストールした場合、サンプルは以下のディレクトリに入っているので、作業エリアにコピーしておきます
/usr/local/Cellar/play/2.2.1/libexec/samples/scala/websocket-chat/
一斉配信したい場合にはサンプルで採用されているbroadcastをそのまま使用してもいいのですが、今回は一対一通信で配信したいのでunicastを使用します。サンプルはこちらを参考にしました。
http://satoshi-m8a.github.io/blog/2013/05/18/scala-concurrent-unicast/
scalaからredisに接続するためにはscala-redisライブラリを使用します。そのためbuild.sbtに以下を記述します
// build.sbt for the WebSocket sample (Play 2.2 / Scala 2.10).
// Settings are separated by blank lines: the sbt 0.13.0 launcher used by
// Play 2.2 treats a blank line as the delimiter between .sbt expressions.
import play.Project._

name := "websocket-chat"

version := "1.0"

libraryDependencies ++= Seq(
  cache,
  // scala-redis client, artifact built for Scala 2.10
  "net.debasishg" % "redisclient_2.10" % "2.11"
)

playScalaSettings
ログ出力用に追加します
${application.home}/logs/application.log
%date - [%level] - from %logger in %thread %n%message%n%xException%n
%coloredLevel %d{HH:mm:ss.SSS} [%thread] %logger{15} - %message%n%xException{5}
redisの接続先をconfに書いておきます
redis.uri="http://localhost:6379/"
モデルをscala-redisのテストコードを参考に修正します
package models
import akka.actor._
import scala.concurrent.duration._
import play.api._
import play.api.libs.json._
import play.api.libs.iteratee._
import play.api.libs.concurrent._
import akka.util.Timeout
import akka.pattern.ask
import play.api.Play.current
import play.api.libs.concurrent.Execution.Implicits._
import play.api.libs.iteratee.Concurrent.Channel
import com.redis._
import akka.actor.{ Actor, ActorSystem, Props }
/**
 * Carries a channel name and a message text.
 *
 * NOTE(review): no `receive` block visible in this file handles this message;
 * presumably it is meant as a "publish to Redis" request - confirm before relying on it.
 */
final case class PublishMessage(channel: String, message: String)

/**
 * Asks a `ChatRoom` actor to subscribe to the given Redis channels.
 *
 * The payload is an `Array`, so two instances with equal contents do not
 * compare equal with `==` (arrays use reference equality).
 */
final case class SubscribeMessage(channels: Array[String])

/** Asks a `ChatRoom` actor to unsubscribe from the given Redis channels. */
final case class UnsubscribeMessage(channels: Array[String])

/** Asks a `ChatRoom` actor to shut its subscription service down. */
case object GoDown
/**
 * A helper participant that joins a room actor, logs everything it receives
 * and periodically sends a keep-alive `Talk` message.
 */
object Robot {

  /**
   * Attaches the robot to `chatRoom`.
   *
   * Sends `Join("Robot")`, connects the enumerator returned in `Connected`
   * to a logging iteratee, and schedules `Talk("Robot", "I'm still alive")`
   * every 30 seconds (first one after 30 seconds).
   *
   * @param chatRoom the room actor to join
   */
  def apply(chatRoom: ActorRef): Unit = {

    // Iteratee that writes every received event to the "robot" logger.
    val loggerIteratee =
      Iteratee.foreach[JsValue](event => Logger("robot").info(event.toString))

    implicit val timeout: Timeout = Timeout(1.second)

    // Make the robot join the room.
    // NOTE(review): the room stores the channel handed to its unicast onStart
    // callback in a single variable; if that callback runs once per consumer,
    // the robot and the WebSocket client compete for that slot - confirm.
    val joined = (chatRoom ? Join("Robot")).map {
      case Connected(robotChannel) =>
        // Apply this Enumerator on the logger.
        robotChannel |>> loggerIteratee
      case other =>
        // Previously an unexpected reply ended in a swallowed MatchError.
        Logger("robot").warn("Unexpected reply to Join: " + other)
    }

    // Surface ask timeouts and other failures instead of dropping them silently.
    joined.onFailure {
      case e => Logger("robot").error("Robot could not join the room", e)
    }

    // Make the robot talk every 30 seconds.
    // NOTE(review): the returned Cancellable is discarded, so nothing ever
    // cancels this task.
    Akka.system.scheduler.schedule(
      30.seconds,
      30.seconds,
      chatRoom,
      Talk("Robot", "I'm still alive")
    )
  }
}
/**
 * Entry point used to open one WebSocket connection backed by its own
 * `ChatRoom` actor.
 */
object ChatRoom {

  /** Timeout applied to the `ask` calls made from this object. */
  implicit val timeout: Timeout = Timeout(1.second)

  /**
   * Creates a room actor for one client and wires it to a WebSocket.
   *
   * @param username comma-separated list of keys, e.g. "6758,9997"; each
   *                 non-empty key is sent to the room as a channel to subscribe to
   * @return a future pair of (iteratee consuming client messages,
   *         enumerator producing messages for the client)
   */
  def join(username: String): scala.concurrent.Future[(Iteratee[JsValue, _], Enumerator[JsValue])] = {
    val roomActor = Akka.system.actorOf(Props[ChatRoom])

    // Blank keys (from "a,,b", "," or an empty user name) are dropped so that
    // no subscription is requested for an empty channel name or an empty list.
    val channels = username.split(",").filter(_.nonEmpty)
    if (channels.nonEmpty) {
      roomActor ! SubscribeMessage(channels)
    }

    Robot(roomActor)

    (roomActor ? Join(username)).map {

      case Connected(enumerator) =>
        // Create an Iteratee to consume the feed.
        val iteratee = Iteratee.foreach[JsValue] { event =>
          // Client input is untrusted: ignore events without a string "text"
          // field instead of throwing inside the iteratee.
          (event \ "text").asOpt[String].foreach { text =>
            roomActor ! Talk(username, text)
          }
        }.mapDone { _ =>
          roomActor ! Quit(username)
        }
        (iteratee, enumerator)

      case CannotConnect(error) =>
        // Connection error.
        // A finished Iteratee sending EOF
        val iteratee = Done[JsValue, Unit]((), Input.EOF)
        // Send an error and close the socket
        val enumerator = Enumerator[JsValue](JsObject(Seq("error" -> JsString(error))))
          .andThen(Enumerator.enumInput(Input.EOF))
        (iteratee, enumerator)
    }
  }
}
/**
 * Room actor serving a single client: it subscribes to Redis channels through
 * a scala-redis `Subscriber` actor and pushes every received message to the
 * client through a unicast enumerator.
 *
 * Handled messages: `SubscribeMessage`, `UnsubscribeMessage`, `GoDown`,
 * `Join`, `NotifyJoin`, `Talk` and `Quit`.
 *
 * Lifecycle: the actor stops itself on `Quit` or `GoDown`; `postStop` then
 * closes the Redis connection and shuts down the dedicated actor system, so
 * a closed WebSocket no longer leaves them running.
 */
class ChatRoom extends Actor {

  println("starting subscription service ..")

  /**
   * Channel handed to `onStart` by the unicast enumerator; `None` until a
   * consumer is attached. It is written from the enumerator callback rather
   * than from `receive`, hence `@volatile`.
   */
  @volatile var chatChannel: Option[Channel[JsValue]] = None

  /** Actor system dedicated to this room's `Subscriber`; shut down in `postStop`. */
  val system = ActorSystem("sub")

  /** Redis location read from the `redis.uri` configuration key. */
  val uri = new java.net.URI(
    Play.configuration.getString("redis.uri").getOrElse(
      sys.error("Missing configuration key 'redis.uri'")
    )
  )

  val r = new RedisClient(uri.getHost, uri.getPort)
  val s = system.actorOf(Props(new Subscriber(r)))
  s ! Register(callback)

  def receive = {
    case SubscribeMessage(chs) => sub(chs)
    case UnsubscribeMessage(chs) => unsub(chs)

    // Resources are released in postStop, without blocking this actor's thread.
    case GoDown => context.stop(self)

    case Join(username) =>
      sender ! Connected(chatEnumerator)

    case NotifyJoin(username) =>
      notifyAll("join", username, "has entered the room")

    case Talk(username, text) =>
      notifyAll("talk", username, text)

    case Quit(username) =>
      notifyAll("quit", username, "has left the room")
      // The only client of this room is gone: stop and free the resources.
      context.stop(self)
  }

  /** Closes the Redis connection and shuts down the subscriber's actor system. */
  override def postStop(): Unit = {
    try {
      r.quit
    } catch {
      case scala.util.control.NonFatal(e) =>
        println("failed to close redis connection: " + e)
    }
    system.shutdown()
  }

  /** Forwards a subscribe request for `channels` to the `Subscriber` actor. */
  def sub(channels: Array[String]): Unit = {
    s ! Subscribe(channels)
  }

  /** Forwards an unsubscribe request for `channels` to the `Subscriber` actor. */
  def unsub(channels: Array[String]): Unit = {
    s ! Unsubscribe(channels)
  }

  /**
   * Callback registered with the `Subscriber` for every pub/sub event.
   *
   * It is handed to the subscriber and therefore does not run as part of this
   * actor's message processing; ordinary messages are forwarded to `self` as
   * `Talk` instead of touching actor state from here.
   *
   * Control messages: "exit" unsubscribes from everything, "+x" subscribes to
   * channel x and "-x" unsubscribes from channel x.
   */
  def callback(pubsub: PubSubMessage): Unit = pubsub match {
    case E(exception) =>
      println("Fatal error caused consumer dead. Please init new consumer reconnecting to master or connect to backup")
    case S(channel, no) =>
      println("subscribed to " + channel + " and count = " + no)
    case U(channel, no) =>
      println("unsubscribed from " + channel + " and count = " + no)
    case M(channel, msg) =>
      msg match {
        // exit will unsubscribe from all channels and stop subscription service
        case "exit" =>
          println("unsubscribe all ..")
          r.unsubscribe

        // message "+x" will subscribe to channel x
        case x if x startsWith "+" =>
          r.subscribe(x.drop(1)) { m => }

        // message "-x" will unsubscribe from channel x
        case x if x startsWith "-" =>
          r.unsubscribe(x.drop(1))

        // any other message is delivered to the client
        case x =>
          println("received message on channel " + channel + " as : " + x)
          self ! Talk(channel, x)
      }
  }

  /** Stores the channel given by the enumerator and announces the join. */
  def onStart: Channel[JsValue] => Unit = { channel =>
    chatChannel = Some(channel)
    println("start")
    self ! NotifyJoin("you")
  }

  /** Prints the error message reported by the enumerator. */
  def onError: (String, Input[JsValue]) => Unit = { (message, input) =>
    println("onError " + message)
  }

  def onComplete = println("onComplete")

  /** Enumerator returned to the joining client in `Connected`. */
  val chatEnumerator = Concurrent.unicast[JsValue](onStart, onComplete, onError)

  /**
   * Pushes a `{kind, user, message}` JSON object to the stored channel, or
   * prints "nothing" when no channel has been stored yet.
   */
  def notifyAll(kind: String, user: String, text: String): Unit = {
    val msg = JsObject(
      Seq(
        "kind" -> JsString(kind),
        "user" -> JsString(user),
        "message" -> JsString(text)
      )
    )
    chatChannel match {
      case Some(channel) => channel.push(msg)
      case _ => println("nothing")
    }
  }
}
/** Sent to a `ChatRoom` actor to join; the actor replies with [[Connected]]. */
final case class Join(username: String)

/** Sent to a `ChatRoom` actor when a client's input stream is done. */
final case class Quit(username: String)

/** Asks a `ChatRoom` actor to push `text` to the client as a "talk" event from `username`. */
final case class Talk(username: String, text: String)

/** Asks a `ChatRoom` actor to push a "join" event for `username`. */
final case class NotifyJoin(username: String)

/** Reply to [[Join]] carrying the enumerator that feeds the client. */
final case class Connected(enumerator: Enumerator[JsValue])

/**
 * Negative reply to [[Join]] carrying an error text.
 *
 * `ChatRoom.join` handles it, but the `ChatRoom` actor in this file never sends it.
 */
final case class CannotConnect(msg: String)
これで準備完了
play runで実行し、ユーザ名の欄にキーを入力します。キーはカンマ区切りで複数指定でき、それぞれがRedisのPub/Subで購読するチャンネル名になります。
play run
画面が起動したら、a,b でログインしてみます
その後Redisのコマンドで値を送ってみます
redis-cli publish a test
画面にa test が表示されます