hjfruit / zio-pulsar   0.3.5

BSD 2-clause "Simplified" License Website GitHub

zio-pulsar fork from github.com/jczuchnowski/zio-pulsar

Scala versions: 3.x

ZIO Pulsar

CI Nexus (Snapshots) Sonatype Nexus (Releases)

Dependency

Scala 3

libraryDependencies += "io.github.jxnu-liguobin" %% "zio-pulsar" % <latest version>

Scala 2.13.6+ (sbt 1.5.x)

libraryDependencies += 
  ("io.github.jxnu-liguobin" %% "zio-pulsar" % NewVersion).cross(CrossVersion.for2_13Use3)

These dependencies are required in the project classpath (ZIO projects only need to pay attention to whether they have imported zio-streams):

libraryDependencies ++= Seq(
  "dev.zio" %% "zio"         % zioVersion,
  "dev.zio" %% "zio-streams" % zioVersion
)

Example1

object SingleMessageExample extends ZIOAppDefault:

  // Note: Do not duplicate the construction of this object, just keep one instance!
  lazy val pulsarClient = PulsarClient.live("localhost", 6650)
  // val pulsarClient = PulsarClient.live(""pulsar://localhost:6650,localhost:6651,localhost:6652"")

  val topic = "single-topic"

  // To avoid frequent client creation, both the client's scope and the consumer/producer's scope should not use Scope.default
  // The client should be a long-term/on-demand object, while the consumer/producer is perishable after use
  val app: ZIO[PulsarClient & Scope, PulsarClientException, Unit] =
    for
      builder  <- ConsumerBuilder.make(JSchema.STRING)
      consumer <- builder
                    .topic(topic)
                    .subscription(Subscription("my-subscription", SubscriptionType.Shared))
                    .build
      producer <- Producer.make(topic, JSchema.STRING)
      _        <- producer.send("Hello!")
      m        <- consumer.receive
      _ = println(m.getValue)
    yield ()

  override def run = app.provideLayer(pulsarClient ++ Scope.default).exitCode