package models
import akka.actor._
import scala.concurrent.duration._
import play.api._
import play.api.libs.json._
import play.api.libs.iteratee._
import play.api.libs.concurrent._
import akka.util.Timeout
import akka.pattern.ask
import play.api.Play.current
import play.api.libs.concurrent.Execution.Implicits._
import play.api.libs.iteratee.Concurrent.Channel
import com.redis._
import akka.actor.{ Actor, ActorSystem, Props }
case class PublishMessage(channel: String, message: String)
case class SubscribeMessage(channels: Array[String])
case class UnsubscribeMessage(channels: Array[String])
case object GoDown
object Robot {
def apply(chatRoom: ActorRef) {
// Create an Iteratee that logs all messages to the console.
val loggerIteratee = Iteratee.foreach[JsValue](event => Logger("robot").info(event.toString))
implicit val timeout = Timeout(1 second)
// Make the robot join the room
chatRoom ? (Join("Robot")) map {
case Connected(robotChannel) =>
// Apply this Enumerator on the logger.
robotChannel |>> loggerIteratee
}
// Make the robot talk every 30 seconds
Akka.system.scheduler.schedule(
30 seconds,
30 seconds,
chatRoom,
Talk("Robot", "I'm still alive")
)
}
}
object ChatRoom {
implicit val timeout = Timeout(1 second)
// username split "," ex. "6758,9997"
def join(username: String): scala.concurrent.Future[(Iteratee[JsValue, _], Enumerator[JsValue])] = {
val roomActor = Akka.system.actorOf(Props[ChatRoom])
roomActor ! SubscribeMessage(username.split(","))
Robot(roomActor)
(roomActor ? Join(username)).map {
case Connected(enumerator) =>
// Create an Iteratee to consume the feed
val iteratee = Iteratee.foreach[JsValue] {
event =>
roomActor ! Talk(username, (event \ "text").as[String])
}.mapDone {
_ =>
roomActor ! Quit(username)
}
(iteratee, enumerator)
case CannotConnect(error) =>
// Connection error
// A finished Iteratee sending EOF
val iteratee = Done[JsValue, Unit]((), Input.EOF)
// Send an error and close the socket
val enumerator = Enumerator[JsValue](JsObject(Seq("error" -> JsString(error)))).andThen(Enumerator.enumInput(Input.EOF))
(iteratee, enumerator)
}
}
}
class ChatRoom extends Actor {
println("starting subscription service ..")
val system = ActorSystem("sub")
val uri = new java.net.URI(Play.configuration.getString("redis.uri").get)
val r = new RedisClient(uri.getHost,uri.getPort)
val s = system.actorOf(Props(new Subscriber(r)))
s ! Register(callback)
def receive = {
case SubscribeMessage(chs) => sub(chs)
case UnsubscribeMessage(chs) => unsub(chs)
case GoDown =>
r.quit
system.shutdown()
system.awaitTermination()
//case x => println("Got in Sub " + x)
case Join(username) => {
sender ! Connected(chatEnumerator)
}
case NotifyJoin(username) => {
notifyAll("join", username, "has entered the room")
}
case Talk(username, text) => {
notifyAll("talk", username, text)
}
case Quit(username) => {
notifyAll("quit", username, "has left the room")
}
}
def sub(channels: Array[String]) = {
s ! Subscribe(channels.toArray)
}
def unsub(channels: Array[String]) = {
s ! Unsubscribe(channels.toArray)
}
def callback(pubsub: PubSubMessage) = pubsub match {
case E(exception) => println("Fatal error caused consumer dead. Please init new consumer reconnecting to master or connect to backup")
case S(channel, no) => println("subscribed to " + channel + " and count = " + no)
case U(channel, no) => println("unsubscribed from " + channel + " and count = " + no)
case M(channel, msg) =>
msg match {
// exit will unsubscribe from all channels and stop subscription service
case "exit" =>
println("unsubscribe all ..")
r.unsubscribe
// message "+x" will subscribe to channel x
case x if x startsWith "+" =>
val s: Seq[Char] = x
s match {
case Seq('+', rest @ _*) => r.subscribe(rest.toString){ m => }
}
// message "-x" will unsubscribe from channel x
case x if x startsWith "-" =>
val s: Seq[Char] = x
s match {
case Seq('-', rest @ _*) => r.unsubscribe(rest.toString)
}
// other message receive
case x =>
println("received message on channel " + channel + " as : " + x)
notifyAll("talk", channel, x)
}
}
var chatChannel: Option[Channel[JsValue]] = None
def onStart: Channel[JsValue] => Unit = {
channel =>
chatChannel = Some(channel)
println("start")
self ! NotifyJoin("you")
}
def onError: (String, Input[JsValue]) => Unit = {
(message, input) =>
println("onError " + message)
}
def onComplete = println("onComplete")
val chatEnumerator = Concurrent.unicast[JsValue](onStart, onComplete, onError)
/*
def receive = {
case Join(username) => {
sender ! Connected(chatEnumerator)
}
case NotifyJoin(username) => {
notifyAll("join", username, "has entered the room")
}
case Talk(username, text) => {
notifyAll("talk", username, text)
}
case Quit(username) => {
notifyAll("quit", username, "has left the room")
}
}
*
*/
def notifyAll(kind: String, user: String, text: String) {
val msg = JsObject(
Seq(
"kind" -> JsString(kind),
"user" -> JsString(user),
"message" -> JsString(text)
)
)
chatChannel match {
case Some(channel) => channel.push(msg)
case _ => println("nothing")
}
}
}
case class Join(username: String)
case class Quit(username: String)
case class Talk(username: String, text: String)
case class NotifyJoin(username: String)
case class Connected(enumerator: Enumerator[JsValue])
case class CannotConnect(msg: String)