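Most WebSocket samples are chat applications, but in practice you rarely need to build a chat app.
Pushing data from the server to the client is probably the more common use case.
Play Framework ships with a WebSocket sample.
I modified it so that messages can be pushed from the server side.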
- Environment
  - Mac OS X 10.9
  - JDK 1.7
  - Play Framework 2.2
  - Redis 2.8.4
  - Scala 2.10
- Play Framework chat sample
If you installed Play with brew, the sample is located at the path below. Copy it to your working area.
/usr/local/Cellar/play/2.2.1/libexec/samples/scala/websocket-chat/
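- unicast
If you want to deliver to every client at once, you can keep the broadcast that the sample already uses. Here I want to deliver over one-to-one connections, so I use unicast instead. I used the following post as a reference.
http://satoshi-m8a.github.io/blog/2013/05/18/scala-concurrent-unicast/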
- build.sbt
To connect to Redis from Scala, use the scala-redis library. Add the following to build.sbt:
```scala
import play.Project._

name := "websocket-chat"

version := "1.0"

libraryDependencies ++= Seq(
  cache,
  "net.debasishg" % "redisclient_2.10" % "2.11"
)

playScalaSettings
```
- conf/logger.xml
Add this file to configure log output.
```xml
<configuration>

  <conversionRule conversionWord="coloredLevel" converterClass="play.api.Logger$ColoredLevel" />

  <appender name="FILE" class="ch.qos.logback.core.FileAppender">
    <file>${application.home}/logs/application.log</file>
    <encoder>
      <pattern>%date - [%level] - from %logger in %thread %n%message%n%xException%n</pattern>
    </encoder>
  </appender>

  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%coloredLevel %d{HH:mm:ss.SSS} [%thread] %logger{15} - %message%n%xException{5}</pattern>
    </encoder>
  </appender>

  <logger name="play" level="INFO" />
  <logger name="application" level="INFO" />

  <root level="ERROR">
    <appender-ref ref="STDOUT" />
    <appender-ref ref="FILE" />
  </root>

</configuration>
```
- conf/application.conf
Put the Redis connection target in the conf file.
```
redis.uri="http://localhost:6379/"
```
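The model in models/ChatRoom.scala reads only the host and port from this value through java.net.URI, so the scheme part is effectively ignored. As a minimal sketch of that parsing step, the following uses a hypothetical helper named `RedisEndpoint` (it is not part of the sample) and only the Java standard library:

```scala
import java.net.URI

// Hypothetical helper (not part of the sample): extracts host and port
// from a redis.uri value the same way the model does with java.net.URI.
object RedisEndpoint {
  def parse(uri: String): (String, Int) = {
    val parsed = new URI(uri)
    (parsed.getHost, parsed.getPort)
  }
}

object RedisEndpointDemo extends App {
  val (host, port) = RedisEndpoint.parse("http://localhost:6379/")
  println(host + ":" + port) // prints "localhost:6379"
}
```

Note that `getPort` returns -1 when the URI has no explicit port, so it is safer to always write the port in redis.uri.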
- models/ChatRoom.scala
Modify the model, using the scala-redis test code as a reference.
```scala
package models

import akka.actor._
import scala.concurrent.duration._

import play.api._
import play.api.libs.json._
import play.api.libs.iteratee._
import play.api.libs.concurrent._

import akka.util.Timeout
import akka.pattern.ask

import play.api.Play.current
import play.api.libs.concurrent.Execution.Implicits._
import play.api.libs.iteratee.Concurrent.Channel

import com.redis._
import akka.actor.{ Actor, ActorSystem, Props }

case class PublishMessage(channel: String, message: String)
case class SubscribeMessage(channels: Array[String])
case class UnsubscribeMessage(channels: Array[String])
case object GoDown

object Robot {

  def apply(chatRoom: ActorRef) {

    // Create an Iteratee that logs all messages to the console.
    val loggerIteratee = Iteratee.foreach[JsValue](event => Logger("robot").info(event.toString))

    implicit val timeout = Timeout(1 second)
    // Make the robot join the room
    chatRoom ? (Join("Robot")) map {
      case Connected(robotChannel) =>
        // Apply this Enumerator on the logger.
        robotChannel |>> loggerIteratee
    }

    // Make the robot talk every 30 seconds
    Akka.system.scheduler.schedule(
      30 seconds,
      30 seconds,
      chatRoom,
      Talk("Robot", "I'm still alive")
    )
  }

}

object ChatRoom {

  implicit val timeout = Timeout(1 second)

  // username split "," ex. "6758,9997"
  def join(username: String): scala.concurrent.Future[(Iteratee[JsValue, _], Enumerator[JsValue])] = {

    val roomActor = Akka.system.actorOf(Props[ChatRoom])
    roomActor ! SubscribeMessage(username.split(","))
    Robot(roomActor)

    (roomActor ? Join(username)).map {

      case Connected(enumerator) =>
        // Create an Iteratee to consume the feed
        val iteratee = Iteratee.foreach[JsValue] { event =>
          roomActor ! Talk(username, (event \ "text").as[String])
        }.mapDone { _ =>
          roomActor ! Quit(username)
        }
        (iteratee, enumerator)

      case CannotConnect(error) =>
        // Connection error
        // A finished Iteratee sending EOF
        val iteratee = Done[JsValue, Unit]((), Input.EOF)
        // Send an error and close the socket
        val enumerator = Enumerator[JsValue](JsObject(Seq("error" -> JsString(error)))).andThen(Enumerator.enumInput(Input.EOF))
        (iteratee, enumerator)
    }
  }
}

class ChatRoom extends Actor {

  println("starting subscription service ..")
  val system = ActorSystem("sub")
  val uri = new java.net.URI(Play.configuration.getString("redis.uri").get)
  val r = new RedisClient(uri.getHost,uri.getPort)
  val s = system.actorOf(Props(new Subscriber(r)))
  s ! Register(callback)

  def receive = {
    case SubscribeMessage(chs) => sub(chs)
    case UnsubscribeMessage(chs) => unsub(chs)
    case GoDown =>
      r.quit
      system.shutdown()
      system.awaitTermination()

    //case x => println("Got in Sub " + x)
    case Join(username) => {
      sender ! Connected(chatEnumerator)
    }
    case NotifyJoin(username) => {
      notifyAll("join", username, "has entered the room")
    }
    case Talk(username, text) => {
      notifyAll("talk", username, text)
    }
    case Quit(username) => {
      notifyAll("quit", username, "has left the room")
    }
  }

  def sub(channels: Array[String]) = {
    s ! Subscribe(channels.toArray)
  }

  def unsub(channels: Array[String]) = {
    s ! Unsubscribe(channels.toArray)
  }

  def callback(pubsub: PubSubMessage) = pubsub match {
    case E(exception) => println("Fatal error caused consumer dead. Please init new consumer reconnecting to master or connect to backup")
    case S(channel, no) => println("subscribed to " + channel + " and count = " + no)
    case U(channel, no) => println("unsubscribed from " + channel + " and count = " + no)
    case M(channel, msg) =>
      msg match {
        // exit will unsubscribe from all channels and stop subscription service
        case "exit" =>
          println("unsubscribe all ..")
          r.unsubscribe

        // message "+x" will subscribe to channel x
        case x if x startsWith "+" =>
          val s: Seq[Char] = x
          s match {
            case Seq('+', rest @ _*) => r.subscribe(rest.toString){ m => }
          }

        // message "-x" will unsubscribe from channel x
        case x if x startsWith "-" =>
          val s: Seq[Char] = x
          s match {
            case Seq('-', rest @ _*) => r.unsubscribe(rest.toString)
          }

        // other message receive
        case x =>
          println("received message on channel " + channel + " as : " + x)
          notifyAll("talk", channel, x)
      }
  }

  var chatChannel: Option[Channel[JsValue]] = None

  def onStart: Channel[JsValue] => Unit = {
    channel =>
      chatChannel = Some(channel)
      println("start")
      self ! NotifyJoin("you")
  }

  def onError: (String, Input[JsValue]) => Unit = {
    (message, input) =>
      println("onError " + message)
  }

  def onComplete = println("onComplete")

  val chatEnumerator = Concurrent.unicast[JsValue](onStart, onComplete, onError)

  /*
  def receive = {
    case Join(username) => {
      sender ! Connected(chatEnumerator)
    }
    case NotifyJoin(username) => {
      notifyAll("join", username, "has entered the room")
    }
    case Talk(username, text) => {
      notifyAll("talk", username, text)
    }
    case Quit(username) => {
      notifyAll("quit", username, "has left the room")
    }
  }
  * */

  def notifyAll(kind: String, user: String, text: String) {
    val msg = JsObject(
      Seq(
        "kind" -> JsString(kind),
        "user" -> JsString(user),
        "message" -> JsString(text)
      )
    )
    chatChannel match {
      case Some(channel) => channel.push(msg)
      case _ => println("nothing")
    }
  }

}

case class Join(username: String)
case class Quit(username: String)
case class Talk(username: String, text: String)
case class NotifyJoin(username: String)

case class Connected(enumerator: Enumerator[JsValue])
case class CannotConnect(msg: String)
```
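The callback in the model treats the payload of each published message as a small command language: "exit", "+x", "-x", and everything else. The following Redis-free sketch restates only that dispatch rule so it can be tried in isolation. The names `Command`, `UnsubscribeAll`, `SubscribeTo`, `UnsubscribeFrom`, `Deliver`, and `ControlMessage` are hypothetical and do not exist in the sample or in scala-redis.

```scala
// Hypothetical types (not in the sample): one case per branch of the
// callback's inner match on the message payload.
sealed trait Command
case object UnsubscribeAll extends Command                   // payload "exit"
case class SubscribeTo(channel: String) extends Command      // payload "+x"
case class UnsubscribeFrom(channel: String) extends Command  // payload "-x"
case class Deliver(text: String) extends Command             // anything else

object ControlMessage {
  // Classifies a payload in the same order as the callback:
  // exact "exit" first, then the "+" and "-" prefixes, then plain text.
  def parse(msg: String): Command = msg match {
    case "exit"                 => UnsubscribeAll
    case x if x.startsWith("+") => SubscribeTo(x.drop(1))
    case x if x.startsWith("-") => UnsubscribeFrom(x.drop(1))
    case x                      => Deliver(x)
  }
}
```

In the model, only the last case reaches the browser: it is the branch that calls notifyAll("talk", channel, x).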
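That completes the setup.
- Run
Start the app with play run and enter keys in the user name field. You can enter several keys separated by commas, and each key is used as the name of a Redis channel to subscribe to.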
```
play run
```
Once the page is up, log in as a,b.
Then publish a value with the Redis command-line client.
```
redis-cli publish a test
```
The page displays "a test".
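The run above can be traced in plain Scala, without Play or Redis, as a rough sketch. `Walkthrough` and its two methods are hypothetical names. The Map only mirrors the field names of the JSON object that notifyAll builds in the model and is not the actual JsObject.

```scala
// Hypothetical helper (not in the sample) that mirrors two steps of the model.
object Walkthrough {
  // The login name is split on "," and each piece becomes a channel name.
  def channels(login: String): Seq[String] = login.split(",").toSeq

  // Field layout of the "talk" message pushed for a published value:
  // the channel name goes into "user" and the payload into "message".
  def payload(channel: String, msg: String): Map[String, String] =
    Map("kind" -> "talk", "user" -> channel, "message" -> msg)
}

object WalkthroughDemo extends App {
  println(Walkthrough.channels("a,b"))     // the two channels a and b
  println(Walkthrough.payload("a", "test")) // fields for "publish a test"
}
```

Because split does not trim whitespace, a login such as "a, b" would produce a second channel name with a leading space, so it is safer to enter the keys without spaces.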