PlayFramework+Redis+websocketでプッシュ配信を作成してみる

Websocketを使ったサンプルだとチャットが多いのですが、実際にはチャットのアプリを作ることはあまり有りません。

どちらかというとサーバ側から配信するというような用途が多いのではないでしょうか?

PlayframeworkのサンプルにはWebsocketを使ったサンプルが付属しています。

これを改造してサーバ側から配信できるように改造してみました

  • 環境
    • MacOS10.9
    • jdk1.7
    • playframework2.2
    • redis2.8.4
    • scala2.10
  • playframeworkのチャットサンプル

brewでインストールした場合にはこちらに入っているので、作業エリアにコピーしておきます

/usr/local/Cellar/play/2.2.1/libexec/samples/scala/websocket-chat/

  • unicast

一斉配信したい場合にはサンプルで採用されているbroadcastをそのまま使用してもいいのですが、今回は一対一通信で配信したいのでunicastを使用します。サンプルはこちらを参考にしました。

http://satoshi-m8a.github.io/blog/2013/05/18/scala-concurrent-unicast/

  • build.sbt

scalaからredisに接続するためにはscala-redisライブラリを使用します。そのためbuild.sbtに以下を記述します

import play.Project._

name := "websocket-chat"

version := "1.0"

libraryDependencies ++= Seq(
	cache,
	"net.debasishg" % "redisclient_2.10" % "2.11"
)

playScalaSettings
  • conf/logger.xml

ログ出力用に追加します


		
	
	
	
		 ${application.home}/logs/application.log
		 
			 %date - [%level] - from %logger in %thread %n%message%n%xException%n
		 
	 

	
		
			%coloredLevel %d{HH:mm:ss.SSS} [%thread] %logger{15} - %message%n%xException{5}
		
	
	
	
	

	
		
		
	
	

  • conf/application.conf

redisの接続先をconfに書いておきます


redis.uri="http://localhost:6379/"
  • models/ChatRoom.scala

モデルをscala-redisのテストコードを参考に修正します

package models

import akka.actor._
import scala.concurrent.duration._

import play.api._
import play.api.libs.json._
import play.api.libs.iteratee._
import play.api.libs.concurrent._

import akka.util.Timeout
import akka.pattern.ask

import play.api.Play.current
import play.api.libs.concurrent.Execution.Implicits._
import play.api.libs.iteratee.Concurrent.Channel

import com.redis._
import akka.actor.{ Actor, ActorSystem, Props }
case class PublishMessage(channel: String, message: String)
case class SubscribeMessage(channels: Array[String])
case class UnsubscribeMessage(channels: Array[String])
case object GoDown



object Robot {

	def apply(chatRoom: ActorRef) {

		// Create an Iteratee that logs all messages to the console.
		val loggerIteratee = Iteratee.foreach[JsValue](event => Logger("robot").info(event.toString))

		implicit val timeout = Timeout(1 second)
		// Make the robot join the room
		chatRoom ? (Join("Robot")) map {
			case Connected(robotChannel) =>
				// Apply this Enumerator on the logger.
				robotChannel |>> loggerIteratee
		}

		// Make the robot talk every 30 seconds
		Akka.system.scheduler.schedule(
			30 seconds,
			30 seconds,
			chatRoom,
			Talk("Robot", "I'm still alive")
		)
	}

}

object ChatRoom {

	implicit val timeout = Timeout(1 second)

	// username split ","	ex. "6758,9997"
	def join(username: String): scala.concurrent.Future[(Iteratee[JsValue, _], Enumerator[JsValue])] = {
		val roomActor = Akka.system.actorOf(Props[ChatRoom])
		
		roomActor ! SubscribeMessage(username.split(","))

		Robot(roomActor)

		(roomActor ? Join(username)).map {

			case Connected(enumerator) =>

				// Create an Iteratee to consume the feed
				val iteratee = Iteratee.foreach[JsValue] {
					event =>
						roomActor ! Talk(username, (event \ "text").as[String])
				}.mapDone {
					_ =>
						roomActor ! Quit(username)
				}

				(iteratee, enumerator)

			case CannotConnect(error) =>

				// Connection error

				// A finished Iteratee sending EOF
				val iteratee = Done[JsValue, Unit]((), Input.EOF)

				// Send an error and close the socket
				val enumerator = Enumerator[JsValue](JsObject(Seq("error" -> JsString(error)))).andThen(Enumerator.enumInput(Input.EOF))

				(iteratee, enumerator)

		}
	}
}

class ChatRoom extends Actor {
	println("starting subscription service ..")
	val system = ActorSystem("sub")
	val uri = new java.net.URI(Play.configuration.getString("redis.uri").get)
	val r = new RedisClient(uri.getHost,uri.getPort)
	val s = system.actorOf(Props(new Subscriber(r)))
	s ! Register(callback)
	
	def receive = {
		case SubscribeMessage(chs) => sub(chs)
		case UnsubscribeMessage(chs) => unsub(chs)
		case GoDown =>
			r.quit
			system.shutdown()
			system.awaitTermination()

		//case x => println("Got in Sub " + x)
		
		
		
		case Join(username) => {
			sender ! Connected(chatEnumerator)
		}

		case NotifyJoin(username) => {
			notifyAll("join", username, "has entered the room")
		}

		case Talk(username, text) => {
			notifyAll("talk", username, text)
		}

		case Quit(username) => {
			notifyAll("quit", username, "has left the room")
		}
	}

	def sub(channels: Array[String]) = {
		s ! Subscribe(channels.toArray)
	}

	def unsub(channels: Array[String]) = {
		s ! Unsubscribe(channels.toArray)
	}

	def callback(pubsub: PubSubMessage) = pubsub match {
		case E(exception) => println("Fatal error caused consumer dead. Please init new consumer reconnecting to master or connect to backup")
		case S(channel, no) => println("subscribed to " + channel + " and count = " + no)
		case U(channel, no) => println("unsubscribed from " + channel + " and count = " + no)
		case M(channel, msg) =>
			msg match {
				// exit will unsubscribe from all channels and stop subscription service
				case "exit" =>
					println("unsubscribe all ..")
					r.unsubscribe

				// message "+x" will subscribe to channel x
				case x if x startsWith "+" =>
					val s: Seq[Char] = x
					s match {
						case Seq('+', rest @ _*) => r.subscribe(rest.toString){ m => }
					}

				// message "-x" will unsubscribe from channel x
				case x if x startsWith "-" =>
					val s: Seq[Char] = x
					s match {
						case Seq('-', rest @ _*) => r.unsubscribe(rest.toString)
					}

				// other message receive
				case x =>
					println("received message on channel " + channel + " as : " + x)
					notifyAll("talk", channel, x)
			}
	}
	

	var chatChannel: Option[Channel[JsValue]] = None

	def onStart: Channel[JsValue] => Unit = {
		channel =>
			chatChannel = Some(channel)
			println("start")
			self ! NotifyJoin("you")
	}

	def onError: (String, Input[JsValue]) => Unit = {
		(message, input) =>
			println("onError " + message)
	}

	def onComplete = println("onComplete")

	val chatEnumerator = Concurrent.unicast[JsValue](onStart, onComplete, onError)
/*
	def receive = {

		case Join(username) => {
			sender ! Connected(chatEnumerator)
		}

		case NotifyJoin(username) => {
			notifyAll("join", username, "has entered the room")
		}

		case Talk(username, text) => {
			notifyAll("talk", username, text)
		}

		case Quit(username) => {
			notifyAll("quit", username, "has left the room")
		}

	}
	* 
	*/

	def notifyAll(kind: String, user: String, text: String) {
		val msg = JsObject(
			Seq(
				"kind" -> JsString(kind),
				"user" -> JsString(user),
				"message" -> JsString(text)
			)
		)
		chatChannel match {
			case Some(channel) => channel.push(msg)
			case _ => println("nothing")
		}
	}
}

case class Join(username: String)
case class Quit(username: String)
case class Talk(username: String, text: String)
case class NotifyJoin(username: String)
case class Connected(enumerator: Enumerator[JsValue])
case class CannotConnect(msg: String)

これで準備完了

  • 実行

play runで実行し、ユーザ名部分にカンマ区切りでキーを入力します。このキーはカンマ区切りで複数入力可能で、このキーがredisのキーとなります。

play run

画面が起動したら、a,b でログインしてみます

その後Reidsのコマンドで値を送ってみます

redis-cli publish a test

画面にa test が表示されます