fs2-nakadi
fs2-nakadi is Nakadi client for Scala based on FS2.
Under the hood
Status
Work is still in progress but the basic DSLs are defined.
Installation
libraryDependencies += "io.nigo" %% "fs2-nakadi" % "0.1.0-M2"
Usage
Event Types
There are three main categories of event type defined by Nakadi:
-
Business Event: An event that is part of, or drives a business process, such as a state transition in a customer order.
-
Data Change Event: An event that represents a change to a record or other item, or a new item. Change events are associated with a create, update, delete, or snapshot operation.
-
Undefined Event: A free form category suitable for events that are entirely custom to the producer.
fs2-nakadi provides a simple DSL for dealing with event types:
import cats.effect.IO
import java.net.URI
import fs2.nakadi.client._
import fs2.nakadi.model._
import fs2.nakadi.dsl._
// Define Nakadi setting
implicit val config: NakadiConfig[IO] = NakadiConfig[IO](new URI("<nakadi-uri>"))
// Define EventType
val business = EventType(
name = EventTypeName("business-data"),
owningApplication = "fs2-nakadi",
category = Category.Business
)
// Create the EventType
EventTypeClient[IO].create(business)
// Find the EventType
EventTypeClient[IO].get(EventTypeName("business-data"))
Publish Events
You can define your own ADT of the events and simply publish them to the desired EventType using Events DSL:
import cats.effect.IO
import java.util.UUID
import java.time.ZonedDateTime
import fs2.nakadi.model._
import fs2.nakadi.model.Event.Business
import fs2.nakadi.dsl._
import fs2.Stream
// Define Event ADT
case class User(id: UUID, firstName: String, lastName: String, createdAt: ZonedDateTime)
object User {
import io.circe.{Encoder, Decoder}
import io.circe.derivation._
implicit val userEncoder: Encoder[User] = deriveEncoder(renaming.snakeCase)
implicit val userDecoder: Decoder[User] = deriveDecoder(renaming.snakeCase)
}
// Define Event
val user: User = User(UUID.randomUUID(), "john", "snow", ZonedDateTime.now())
val event: Event[User] = Business(
data = user,
metadata = Metadata()
)
val eventClient = EventClient[IO]
// Publish a list of Event
eventClient.publish[User](EventTypeName("user-data"), List(event))
// Publish a Stream
Stream
.emit(event)
.repeat
.through(eventClient.publishStream[User](EventTypeName("user-data")))
Consume Events
fs2-nakadi supports high-level event consumption using subscriptions
import cats.effect.IO
import java.net.URI
import fs2.nakadi.model._
import fs2.nakadi.dsl._
import fs2.Stream
val subClient = SubscriptionClient[IO]
// Create a subscription if doesn't exist
val sub =
subClient
.createIfDoesntExist(
Subscription(
owningApplication = "fs2-nakadi",
eventTypes = Some(List(EventTypeName("user-data")))
)
)
// Create event stream
Stream
.eval(sub)
.flatMap(s => subClient.eventStream[User](s.id.get, StreamConfig()))
You can also use managedEventStream
which receives a callback and applies it to every event:
val callback: EventCallback[User] =
_.subscriptionEvent.events match {
case Some(ev) =>
ev.foreach(e => println(s"Received Event: ${e.data.toString}"))
true
case _ => true
}
Stream
.eval(sub)
.flatMap { s =>
subClient.managedEventStream[User](1)(s.id.get, callback, StreamConfig())
}