SubPub
In Memory Pub/Sub Engine using Akka Actors and Streams.
Setup
Latest Version: 0.1.9
"io.github.d-exclaimation" %% "subpub" % latestVersion
Feature
SubPub main goals are to:
- Handle creation, distribution, management of both the outgoing stream of data and incoming published data.
- Differentiate streams based on topic, which can used to push the proper data into the proper streams.
- Also handles creation in a lazy and concurrent safe way, no need to worry about race conditions.
- All of the above on lightweight code embedded implementation.
Consideration
Similar to PubSub
from graphql-subscriptions, SubPub is also
an in-memory event streaming system that only supports a single server instance. On a production environment, it is
strongly recommended to use other implementation that are backed with an external datastore such as Redis or Kafka.
Consider using Alpakka instead for this scenario.
Examples
REST + Websocket Realtime API
An example using this for HTTP + Websocket Realtime API
import io.github.dexclaimation.subpub.SubPub
object Main extends SprayJsonSupport {
// ...
val pubsub = SubPub()
val route: Route = {
(path("send" / Segment) & post & entity(as[JsValue])) { path =>
entity(as[JsValue]) {
case JsObject(body) => sendMessage(path, body)
case _ => complete(BadRequest -> JsString("Bad message"))
}
} ~ path("websocket" / Segment) { path =>
handleWebSocketMessages(websocketMessage(path))
}
}
// Handle HTTP Post and emit to websocket
def sendMessage(path: String, body: Map[String, JsValue]): Route = {
try {
val content = body("content")
val name = body("name")
val msg = JsObject(
"content" -> content,
"name" -> name,
"createdAt" -> JsString(Instant.now().toString)
)
// Push message to subpub
pubsub.publish(s"chat::$path", msg)
complete(OK -> msg)
} catch {
case NonFatal(_) =>
complete(BadRequest -> "Bad message")
}
}
// Handle Websocket Flow using the topic based Source
def websocketMessage(path: String): Flow[Message, TextMessage.Strict, _] = {
val source = pubsub
.source[JsValue](s"chat::$path")
.map(_.compactPrint)
.map(TextMessage.Strict)
val sink = Flow[Message]
.map(_ => ()) // On Websocket Message
.to(Sink.onComplete(_ => ())) // on Websocket End
Flow.fromSinkAndSource(sink, source)
}
// ...
}
Realtime GraphQL API
Using with a Realtime GraphQL API with Subscription using Sangria and OverLayer.
import io.github.dexclaimation.subpub.SubPub
object Main {
// ...
val MessageType = ???
val (roomArg, stringArg, nameArg) = ???
val QueryType = ???
val MutationType = ObjectType(
"Mutation",
fields[SubPub, Unit](
// GraphQL Mutation to send message
Field("send", MessageType,
arguments = roomArg :: stringArg :: nameArg :: Nil,
resolve = { c =>
val msg = Message(c arg stringArg, c arg nameArg, Instant.now().toString)
// Publish data into subscription
c.ctx.publish[Message](c arg roomArg, msg)
msg
}
)
)
)
val SubscriptionType = ObjectType(
"Subscription",
field[SubPub, Unit](
// GraphQL Subscription to get realtime data stream
Field.subs("room", MessageType,
arguments = roomArg :: Nil,
// Use the Source from SubPub and map it to Action for Sangria
resolve = c => c.ctx.source[Message](c arg roomArg).map(Action(_))
)
)
)
val schema = Schema(QueryType, Some(MutationType), Some(SubscriptionType))
// OverLayer for handling GraphQL over Websocket
val gqlTransport = OverTransportLayer(schema, ())
val pubsub = SubPub()
val route: Route =
path("graphql" / "websocket") {
gqlTransport.applyMiddleware(pubsub)
}
}
Feedback
If you have any feedback, please reach out to me through the issues tab or Twitter @d_exclaimation