ZIO Pulsar
Dependency
Scala 3
libraryDependencies += "io.github.jxnu-liguobin" %% "zio-pulsar" % <latest version>
Scala 2.13.6+ (sbt 1.5.x)
libraryDependencies +=
("io.github.jxnu-liguobin" %% "zio-pulsar" % NewVersion).cross(CrossVersion.for2_13Use3)
These dependencies are required in the project classpath (ZIO projects only need to pay attention to whether they have imported zio-streams):
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % zioVersion,
"dev.zio" %% "zio-streams" % zioVersion
)
Example1
object SingleMessageExample extends ZIOAppDefault:
// Note: Do not duplicate the construction of this object, just keep one instance!
lazy val pulsarClient = PulsarClient.live("localhost", 6650)
// val pulsarClient = PulsarClient.live(""pulsar://localhost:6650,localhost:6651,localhost:6652"")
val topic = "single-topic"
// To avoid frequent client creation, both the client's scope and the consumer/producer's scope should not use Scope.default
// The client should be a long-term/on-demand object, while the consumer/producer is perishable after use
val app: ZIO[PulsarClient & Scope, PulsarClientException, Unit] =
for
builder <- ConsumerBuilder.make(JSchema.STRING)
consumer <- builder
.topic(topic)
.subscription(Subscription("my-subscription", SubscriptionType.Shared))
.build
producer <- Producer.make(topic, JSchema.STRING)
_ <- producer.send("Hello!")
m <- consumer.receive
_ = println(m.getValue)
yield ()
override def run = app.provideLayer(pulsarClient ++ Scope.default).exitCode