ZIO AMQP
ZIO AMQP is a ZIO-based wrapper around the RabbitMQ client. It provides a streaming interface to AMQP queues and helps to prevent you from shooting yourself in the foot with thread-safety issues.
Installation
Add the following line to your build.sbt file:
libraryDependencies += "dev.zio" %% "zio-amqp" % "1.0.0-alpha.3"
Consuming
The example below creates a connection to an AMQP server and then creates a channel. Both are created as scoped resources, which means they are closed automatically after use, even if an error occurs.
The example then creates a stream of the messages consumed from a queue named "queueName". Each received message is acknowledged back to the AMQP server.
Producing
The example below also includes a producer, which publishes a message to a given queue:
import zio.amqp._
import zio.amqp.model._
import java.net.URI
import zio._
import zio.Console._

val channel: ZIO[Scope, Throwable, Channel] = for {
  connection <- Amqp.connect(URI.create("amqp://my_amqp_server_uri"))
  channel    <- Amqp.createChannel(connection)
} yield channel

val effect: ZIO[Any, Throwable, Unit] =
  ZIO.scoped {
    channel.flatMap { channel =>
      channel
        .consume(queue = QueueName("queueName"), consumerTag = ConsumerTag("test"))
        .mapZIO { record =>
          val deliveryTag = record.getEnvelope.getDeliveryTag
          printLine(s"Received ${deliveryTag}: ${new String(record.getBody)}") *>
            channel.ack(DeliveryTag(deliveryTag))
        }
        .take(5)
        .runDrain
    }
  }

val producer = ZIO.scoped {
  channel.flatMap { channel =>
    channel.publish(
      exchange = ExchangeName(""),
      routingKey = RoutingKey("queueName"),
      body = "Hello world".getBytes
    )
  }
}
See the ZIO documentation for more information on how to run this effect or integrate with an existing application.
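As a rough illustration of running these effects, the sketch below wraps the same calls in a ZIOAppDefault entry point. It is a minimal sketch rather than the library's recommended setup. It assumes ZIO 2, a reachable broker at the placeholder URI, and a queue named "queueName" that already exists on that broker. The object name AmqpExample and the value names amqpUri, queueName, and program are made up for this example.

```scala
import zio._
import zio.amqp._
import zio.amqp.model._
import java.net.URI

object AmqpExample extends ZIOAppDefault {

  // Assumption: placeholder broker URI and queue name; replace with your own.
  private val amqpUri   = URI.create("amqp://my_amqp_server_uri")
  private val queueName = "queueName"

  // Opens a connection and a channel; both are released when the enclosing Scope closes.
  private val channel: ZIO[Scope, Throwable, Channel] =
    for {
      connection <- Amqp.connect(amqpUri)
      channel    <- Amqp.createChannel(connection)
    } yield channel

  // Publishes one message to the default exchange (routed by queue name),
  // then consumes and acknowledges a single message from that queue.
  private val program: ZIO[Scope, Throwable, Unit] =
    for {
      ch <- channel
      _  <- ch.publish(
              exchange = ExchangeName(""),
              routingKey = RoutingKey(queueName),
              body = "Hello world".getBytes
            )
      _  <- ch
              .consume(queue = QueueName(queueName), consumerTag = ConsumerTag("test"))
              .mapZIO { record =>
                val deliveryTag = record.getEnvelope.getDeliveryTag
                Console.printLine(s"Received $deliveryTag: ${new String(record.getBody)}") *>
                  ch.ack(DeliveryTag(deliveryTag))
              }
              .take(1)
              .runDrain
    } yield ()

  // ZIOAppDefault provides the Scope, so no explicit ZIO.scoped is needed here.
  def run = program
}
```

Sharing one channel between the publish and the consume keeps both operations inside a single scope, so the connection is opened once and closed when the application exits.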
Documentation
Learn more on the ZIO AMQP homepage!
Contributing
For the general guidelines, see the ZIO contributor's guide.
Code of Conduct
See the Code of Conduct.