avast / rabbitmq-scala-client   9.4.4

Scala wrapper over standard RabbitMQ Java client library

Scala versions: 2.13 2.12

RabbitMQ client

This client is a Scala wrapper over the standard RabbitMQ Java client. The goal of this library is to simplify basic use cases - to provide an FP-oriented API for programmers and to shield the programmer from the underlying client.

The library is configurable both by case classes (core module) and by HOCON/Lightbend Config (pureconfig module).

The library uses the concept of a connection and derived producers and consumers. Note that the connection shields you from the underlying concept of an AMQP connection and derived channels - it handles channels automatically according to best practices. Each producer and consumer can be closed separately, while closing the connection closes all derived channels and all producers and consumers.

Dependency

SBT: "com.avast.clients.rabbitmq" %% "rabbitmq-client-core" % "x.x.x"

Gradle: compile 'com.avast.clients.rabbitmq:rabbitmq-client-core_$scalaVersion:x.x.x'

Modules

  1. api - Contains only basic traits for consumer etc.
  2. core - Main module. The client, configurable by case classes.
  3. pureconfig - Module for configuration from Config.
  4. extras - Module with some extra features.
  5. extras-circe - Allows to publish and consume JSON events, using the circe library.
  6. extras-protobuf - Allows to publish and consume events defined as Google Protocol Buffers messages (as both JSON and Protobuf), represented as standard Java classes.
  7. extras-scalapb - Allows to publish and consume events defined as Google Protocol Buffers messages (as both JSON and Protobuf), generated to Scala using ScalaPB.

Migration

There exists a migration guide between versions 6.1.x and 8.0.x.
There exists a migration guide between versions 8.x and 9.0.x.

Please note that configuration from Typesafe/Lightbend config has been moved to pureconfig module since 8.x.

Usage

The API is written in the finally tagless style and uses cats.effect.Resource, which is a convenient way to manage resources in your app. In addition, there is support for streaming with fs2.Stream.

The API uses conversions for both consumer and producer. That means you don't have to work directly with Bytes (however, you still can if you want to) and you touch only your business model class, which is then (de)serialized using the provided converter.

Monitoring of the library is done via Avast Metrics library, its Scala Effect API in particular. If you don't want the client to be monitored, feel free to pass Monitor.noOp[F] instead.

The library uses two types of executors - one is for blocking (IO) operations and the second for callbacks. You have to provide both of them:

  1. Blocking executor as ExecutorService
  2. Callback executor as scala.concurrent.ExecutionContext
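
A minimal sketch of creating the two executors with the JDK and the Scala standard library only. The pool types and the pool size are assumptions chosen for illustration, not recommendations taken from the library:

```scala
import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Executor for blocking (IO) operations; a cached pool grows on demand (assumed choice).
val blockingExecutor: ExecutorService = Executors.newCachedThreadPool()

// Executor for callbacks; a small fixed pool wrapped as an ExecutionContext (assumed size).
val callbackPool: ExecutorService = Executors.newFixedThreadPool(4)
val callbackExecutor: ExecutionContext = ExecutionContext.fromExecutorService(callbackPool)

// Sanity check: run a trivial computation on the callback executor.
val answer: Int = Await.result(Future(21 * 2)(callbackExecutor), 5.seconds)

// Shut both pools down when the application stops, otherwise the JVM may not exit.
blockingExecutor.shutdown()
callbackPool.shutdown()
```

In an application using Monix, the implicit Scheduler can play the role of the callback executor, since it is itself an ExecutionContext.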

The default way is to configure the client with manually provided case classes; see pureconfig module for a configuration from HOCON (Lightbend Config).

This is a fairly minimal setup, using Monix Task:

import java.util.concurrent.ExecutorService

import cats.effect.Resource
import com.avast.bytes.Bytes
import com.avast.clients.rabbitmq._
import com.avast.clients.rabbitmq.api._
import com.avast.metrics.scalaeffectapi.Monitor
import javax.net.ssl.SSLContext
import monix.eval._
import monix.execution.Scheduler

implicit val sch: Scheduler = ???
val monitor: Monitor[Task] = ???

val blockingExecutor: ExecutorService = ???

val sslContext = SSLContext.getDefault

val connectionConfig = RabbitMQConnectionConfig(
  hosts = List("localhost:5672"),
  name = "MyProductionConnection",
  virtualHost = "/",
  credentials = CredentialsConfig(username = "vogon", password = "jeltz")
)

val consumerConfig = ConsumerConfig(
  name = "MyConsumer",
  queueName = "QueueWithMyEvents",
  bindings = List(
    AutoBindQueueConfig(exchange = AutoBindExchangeConfig(name = "OtherAppExchange"), routingKeys = List("TheEvent"))
  )
)

val producerConfig = ProducerConfig(
  name = "MyProducer",
  exchange = "MyGreatApp"
)

// see https://typelevel.org/cats-effect/tutorial/tutorial.html#acquiring-and-releasing-resources

val rabbitMQProducer: Resource[Task, RabbitMQProducer[Task, Bytes]] = {
  for {
    connection <- RabbitMQConnection.make[Task](connectionConfig, blockingExecutor, Some(sslContext))
    /*
      Here you have created the connection; it's shared for all producers/consumers amongst one RabbitMQ server - they will share a single
      TCP connection but have separated channels.
      If you expect very high load, you can use separate connections for each producer/consumer, however it's usually not needed.
       */

    consumer <- connection.newConsumer[Bytes](consumerConfig, monitor) {
      case delivery: Delivery.Ok[Bytes] =>
        Task.now(DeliveryResult.Ack)

      case _: Delivery.MalformedContent =>
        Task.now(DeliveryResult.Reject)
    }

    producer <- connection.newProducer[Bytes](producerConfig, monitor)
  } yield {
    producer
  }
}

Streaming support

Note: this has nothing to do with the RabbitMQ Streams. This client is about providing fs2.Stream API instead of the callback-based one but works on top of a normal queue.

It seems quite natural to process a RabbitMQ queue with a streaming app. StreamingRabbitMQConsumer provides you with an fs2.Stream through which you can easily process incoming messages in a streaming way.

Notice: Using this functionality requires you to know some basics of the FS2 library. Please see its official guide first if you're not familiar with it.

// skipping imports and common things, they are the same as in general example above

val consumerConfig = StreamingConsumerConfig( // notice: StreamingConsumerConfig vs. ConsumerConfig
  name = "MyConsumer",
  queueName = "QueueWithMyEvents",
  bindings = List(
    AutoBindQueueConfig(exchange = AutoBindExchangeConfig(name = "OtherAppExchange"), routingKeys = List("TheEvent"))
  )
)

val processMyStream: fs2.Pipe[Task, StreamedDelivery[Task, Bytes], Unit] = { in =>
  in.evalMap(_.handleWith(d => Task.now(DeliveryResult.Ack))) // TODO you probably want to do some real stuff here
}

val deliveryStream: Resource[Task, fs2.Stream[Task, Unit]] = {
  for {
    connection <- RabbitMQConnection.make[Task](connectionConfig, blockingExecutor, Some(sslContext))
    streamingConsumer <- connection.newStreamingConsumer[Bytes](consumerConfig, monitor)
  } yield {
    val stream: fs2.Stream[Task, Unit] = streamingConsumer.deliveryStream.through(processMyStream)

    // create resilient (self-restarting) stream; see more information below
    lazy val resilientStream: fs2.Stream[Task, Unit] = stream.handleErrorWith { e =>
      // TODO log the error - something is going wrong!
      resilientStream
    }

    resilientStream
  }
}

Resilient stream

While you should never let the stream fail (handle all your possible errors; see the Error handling section in the official FS2 docs for how a stream can fail), it's important that you're able to recover the stream when it accidentally happens. You can do that by simply requesting a new stream from the client:

val stream = streamingConsumer.deliveryStream // get stream from client
  .through(processMyStream) // "run" the stream through your processing logic

val failureCounter: Ref[Task, Int] = ??? // TODO: initialize to max recover count!

lazy val resilientStream: fs2.Stream[Task, Unit] = stream.handleErrorWith { err =>
  // handle the error in stream: recover by calling itself
  // TODO don't forget to add some logging/metrics here!
  fs2.Stream.eval(failureCounter.modify(a => (a - 1, a - 1))).flatMap { attemptsRest =>
    if (attemptsRest < 0) fs2.Stream.raiseError[Task](err) else resilientStream
  }
}

resilientStream

or use a prepared extension method:

import com.avast.clients.rabbitmq._

streamingConsumer.deliveryStream // get stream from client
  .through(processMyStream) // "run" the stream through your processing logic
  .makeResilient(maxErrors = 3) { err =>
    Task.delay {
      // TODO don't forget to add some logging/metrics here!
      ()
    }
  }

Please refer to the official guide for understanding more deeply how the recovery of fs2.Stream works.

Producer/consumer listeners

While everyone wants RabbitMQ to "just work", in reality it may not be that easy. Servers are restarted, deliveries take too long to process, etc. For such occasions, this client offers listeners of three kinds: connection, channel and consumer.
The listeners are passed to the connection factory method. You are not required to implement or provide them; however, it's strongly recommended to do so. The default implementations only log the events, while you may want to react differently - increase some counter, mark the app as unhealthy etc. (as some events are not easy to recover from).

Providing converters for producer/consumer

Both the producer and the consumer require a type argument when created from a connection:

  1. connection.newConsumer[MyClass] which requires implicit DeliveryConverter[MyClass]
  2. connection.newProducer[MyClass] which requires implicit ProductConverter[MyClass]

There are multiple options where to get the converter (it's the same case for DeliveryConverter as for ProductConverter):

  1. Implement your own implicit converter for the type
  2. Modules extras-circe and extras-scalapb provide support for JSON and GPB conversion.
  3. Use identity converter by specifying Bytes type argument. No further action needed in that case.

Poisoned message handler

A common use case is republishing a failed message while preventing it from being republished forever. You can use the PoisonedMessageHandler (PMH) to solve this issue. It counts the number of attempts and won't let the message be republished again and again (above the limit you set).
Note: it works ONLY for Republish and not for Retry!

The PoisonedMessageHandler is built into both the "normal" and the streaming consumers. After the poisoned-message action is executed, the delivery is REJECTed (so it's not in the original queue anymore).

All types of the poisoned message handler (except no-op) have a maxAttempts configuration option which determines how many times the message can be delivered to the consumer. In practice, if maxAttempts == 3 and you choose to republish the message for the third time, the PMH takes its action - as the next delivery of the message would already be the fourth, which is over the configured limit.

Internally, the attempts counting is done via incrementing (or adding, the first time) the X-Republish-Count header in the message. Feel free to use its value for your own logging or whatever. You can even set it to your own value - just bear in mind that you might affect the PMH's functionality (of course, that might be your intention).
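
The counting rule can be illustrated with a simplified model. This is a sketch under assumptions, not the library's implementation: the Outcome type and the onRepublish function are hypothetical names, and header values are modelled as plain Int.

```scala
// Simplified, hypothetical model of the attempt counting; not the library's code.
val RepublishCountHeader = "X-Republish-Count"

sealed trait Outcome
final case class RepublishAgain(headers: Map[String, Int]) extends Outcome
case object PoisonedAction extends Outcome

// Called when the application asks for a Republish of a delivery with the given headers.
def onRepublish(headers: Map[String, Int], maxAttempts: Int): Outcome = {
  // The header is absent on the first delivery, which means zero republishes so far.
  val republishes = headers.getOrElse(RepublishCountHeader, 0) + 1
  // The next delivery would be number `republishes + 1`; stop once that exceeds maxAttempts.
  if (republishes >= maxAttempts) PoisonedAction
  else RepublishAgain(headers.updated(RepublishCountHeader, republishes))
}

val first  = onRepublish(Map.empty, maxAttempts = 3)                      // republished, count = 1
val second = onRepublish(Map(RepublishCountHeader -> 1), maxAttempts = 3) // republished, count = 2
val third  = onRepublish(Map(RepublishCountHeader -> 2), maxAttempts = 3) // PMH takes its action
```

With maxAttempts = 3, the message is delivered three times at most; the third republish request triggers the poisoned-message action instead of a fourth delivery.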

While republishing is meant to not let the other messages starve (it puts the message at the end of the queue, meaning it'll be the last message to be processed at the moment), it may still happen that its retry is immediate - in other words, during periods of low traffic, it behaves very similarly to Retry. To prevent this, there is support (since 9.2.0) for delaying the republish - the republishing is delayed in memory before it's applied. Since it happens in the client itself and not in your application, the consumer timeout is not involved at that point.
However, it's important to note that if too many messages are being republished and delayed, consuming might theoretically stop completely! This is due to the prefetchCount configuration value - the server just won't give the consumer any more messages. While this behavior might be desired (it may be considered a way of self-throttling for the consumer), it doesn't have to be, and you should set up your consumer and republishing delay properly. There is a gauge in metrics showing how many messages are currently being delayed.

It can happen that you have PMH configured and need to republish the message without counting the attempt (the typical scenario is that the message processing has failed through no fault of your app but of some 3rd party system which you expect to recover later). The countAsPoisoned parameter (defaults to true) determines whether the PMH (if configured) should count the attempt or not. This is an easy and clean way to influence the PMH behavior.

On the other hand, you may want to use the DirectlyPoison result (since 9.3.0) in cases where you know there's no chance the retry will succeed (e.g. the message is not parseable) but you still want to keep the message (move it to the poisoned queue if configured; that is the difference between DirectlyPoison and Reject, where the message would be thrown away completely).

Dead-queue poisoned message handler

The most common and useful type, which will take all "poisoned" messages and publish them to a queue of your choice.
In its configuration, you basically configure a producer which is used to send the message into the dead-queue. While the producer will create the exchange it publishes to, you are responsible for creating the queue and binding it to the producer's exchange. You can use the additional declaration/bindings functionality of the client. If you forget to do so, your messages will be lost completely.

Logging poisoned message handler

As the name suggests, this PMH only logs the poisoned message before it's thrown away (and lost forever).

No-op poisoned message handler

This PMH does nothing.

Example HOCON configuration

Please mind that the PMH is only responsible for publishing the poisoned messages, not for declaring/binding the queue where they'll end!

myConsumer {
  name = "MyVeryImportantConsumer"

  // ...
  // the usual stuff for consumer - timeout, bindings, ...
  // ...

  poisonedMessageHandling {
    type = "deadQueue" // deadqueue, logging, noop (default noop)

    maxAttempts = 2 // <-- required for deadqueue and logging types

    // required only for deadQueue type:
    deadQueueProducer {
      routingKey = "dead"
      name = "DeadQueueProducer"
      exchange = "EXCHANGE3"
      declare {
        enabled = true
        type = "direct"
      }
    }
  }
}

Payload logging or redaction

By default, the client logs each received delivery for a better debugging experience (on the TRACE level; when a timeout or another problem occurs, it's logged on a higher level). However, if you transfer sensitive data and don't want the delivery to be logged, you can turn it off by setting redactPayload = true in the consumer configs (note: the producer doesn't log the delivery at all, just its metadata like routing key and properties).

Caveats

  1. null instead of converter instance
    You may run into this problem:
    scala> import io.circe.generic.auto._
    import io.circe.generic.auto._
    
    scala> import com.avast.clients.rabbitmq.extras.format.JsonDeliveryConverter
    import com.avast.clients.rabbitmq.extras.format.JsonDeliveryConverter
    
    scala> import com.avast.clients.rabbitmq.DeliveryConverter
    import com.avast.clients.rabbitmq.DeliveryConverter
    
    scala> case class Event(name: String)
    defined class Event
    
    scala> implicit val deliveryConverter: JsonDeliveryConverter[Event] = JsonDeliveryConverter.derive[Event]()
    deliveryConverter: com.avast.clients.rabbitmq.extras.format.JsonDeliveryConverter[Event] = null
    
    scala> implicit val deliveryConverter: DeliveryConverter[Event] = JsonDeliveryConverter.derive[Event]()
    deliveryConverter: com.avast.clients.rabbitmq.DeliveryConverter[Event] = com.avast.clients.rabbitmq.extras.format.JsonDeliveryConverter$$anon$1@5b977aaa
    
    scala> implicit val deliveryConverter = JsonDeliveryConverter.derive[Event]()
    deliveryConverter: com.avast.clients.rabbitmq.extras.format.JsonDeliveryConverter[Event] = com.avast.clients.rabbitmq.extras.format.JsonDeliveryConverter$$anon$1@4b024fb2
    Notice that the results of the last three calls differ even though they are supposed to be the same (and non-null)! A very similar issue is discussed on StackOverflow, and the solution is similar:
    1. Remove explicit type completely (not recommended)
    2. Make the explicit type more general (DeliveryConverter instead of JsonDeliveryConverter in this case)

Notes

Extras

There is a module with some optional functionality called extras.

Network recovery

The library offers configurable network recovery, with the functionality itself backed by the one in the RabbitMQ Java client (ready in 5+).
You can either disable the recovery or select (and configure) one of the following types:

  1. Linear
    The client will wait initialDelay before the first recovery attempt and, if it fails, will try again every period until it succeeds.
  2. Exponential
    The client will wait initialDelay before the first recovery attempt and, if it fails, will keep trying until it succeeds, prolonging the delay between two consecutive attempts exponentially (based on period, factor and attempt number), up to maxLength.
    Example:
    For initialDelay = 3s, period = 2s, factor = 2.0, maxLength = 1 minute, produced delays will be 3, 2, 4, 8, 16, 32, 60 seconds (and it will never go higher).
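
The delays from the example can be reproduced with a small function. This is a sketch only: the formula is inferred from the numbers in the example above, not taken from the library's source, and exponentialDelay is a hypothetical name.

```scala
import scala.concurrent.duration._

// Delay before the n-th recovery attempt (attempt = 0 is the first one).
// The formula is inferred from the example in the text, not from the library's source.
def exponentialDelay(
    attempt: Int,
    initialDelay: FiniteDuration,
    period: FiniteDuration,
    factor: Double,
    maxLength: FiniteDuration
): FiniteDuration =
  if (attempt == 0) initialDelay
  else {
    // period * factor^(attempt - 1), capped at maxLength
    val raw = period.toMillis * math.pow(factor, attempt - 1)
    math.min(raw, maxLength.toMillis.toDouble).toLong.millis
  }

val delaysInSeconds: List[Long] =
  (0 to 7).toList.map(n => exponentialDelay(n, 3.seconds, 2.seconds, 2.0, 1.minute).toSeconds)
// List(3, 2, 4, 8, 16, 32, 60, 60)
```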

Do not set too short custom recovery delay intervals (less than 2 seconds) as it is not recommended by the official RabbitMQ API Guide.

DeliveryResult

The consumer's readAction returns a DeliveryResult wrapped in the effect type (F[DeliveryResult]). The DeliveryResult has 5 possible values (descriptions of usual use-cases):

  1. Ack - the message was processed; it will be removed from the queue
  2. Reject - the message is corrupted or for some other reason we don't want to see it again; it will be removed from the queue
  3. Retry - the message couldn't be processed at this moment (unreachable 3rd party services?); it will be requeued (inserted on the top of the queue)
  4. Republish - the message may be corrupted, but we're not sure; it will be re-published to the bottom of the queue (as a new message and the original one will be removed). It's usually wise to prevent an infinite republishing of the message - see Poisoned message handler.
  5. DirectlyPoison - the message should be thrown away, without any retry effort. The Poisoned message handler takes care of it - it either moves the message to the poisoned queue (if it's configured) like it has reached the configured limit of republishes; or it just throws the message away (if no-op PMH is configured).
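
One way to picture the choice between the results is a mapping from processing outcomes to results. The sketch below uses local stand-in types so that it runs without the library; the Outcome cases and the decide function are hypothetical, and in real code you would return the library's DeliveryResult values instead.

```scala
// Local stand-ins mirroring the result names from the list above.
sealed trait Result
case object Ack extends Result
case object Reject extends Result
case object Retry extends Result
case object Republish extends Result
case object DirectlyPoison extends Result

// Hypothetical processing outcomes of an application.
sealed trait Outcome
case object Processed extends Outcome
case object Irrelevant extends Outcome     // we never want to see this message again
case object DependencyDown extends Outcome // a 3rd party service is temporarily unreachable
case object UnknownFailure extends Outcome // the message may be corrupted, but we're not sure
case object Unparseable extends Outcome    // a retry cannot succeed, but keep it for inspection

def decide(outcome: Outcome): Result = outcome match {
  case Processed      => Ack
  case Irrelevant     => Reject
  case DependencyDown => Retry
  case UnknownFailure => Republish
  case Unparseable    => DirectlyPoison
}
```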

Difference between Retry and Republish

When using Retry the message can effectively cause starvation of other messages in the queue until the message itself can be processed; on the other hand Republish inserts the message to the original queue as a new message and it lets the consumer handle other messages (if they can be processed).
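
The ordering difference can be shown with a toy model of a queue, where the head is the next message to be delivered. This is only an illustration of the ordering described above, not of how the broker or the client is implemented; retryHead and republishHead are hypothetical names.

```scala
import scala.collection.immutable.Queue

// Retry: the failed head message is requeued at the top, so it is delivered next again.
def retryHead(queue: Queue[String]): Queue[String] = {
  val (msg, rest) = queue.dequeue
  msg +: rest
}

// Republish: the failed head message is published again as a new message at the bottom.
def republishHead(queue: Queue[String]): Queue[String] = {
  val (msg, rest) = queue.dequeue
  rest :+ msg
}

val queue = Queue("failing", "b", "c")

val retried     = retryHead(queue)     // Queue(failing, b, c): "b" and "c" keep waiting
val republished = republishHead(queue) // Queue(b, c, failing): "b" and "c" go first
```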

Republishing

Republishing is solved at application level with publishing a new message (with original content, headers, messageId, etc.) to the original queue and acknowledging the old one. This can be done via:

  1. Default exchange Every virtual host in RabbitMQ has a default exchange which has implicit bindings to all queues and can be easily used for publishing to basically any queue. This is very handy for functionality such as republishing; however, it's also very dangerous and you may not have permissions to use it. In case you do have them, use this option instead of the custom exchange.
    This is the default option (in other words, the client will use the default exchange unless you tell it otherwise).
  2. Custom exchange In case you're unable to use the default exchange, you have to create your own exchange to replace the functionality. The RabbitMQ client will create it for you together with all necessary bindings; all you have to do is configure the name of the exchange, e.g.
       rabbitConnection {
         hosts = ["localhost:5672"]
         virtualHost = "/"
       
         ...
       
         republishStrategy {
           type = CustomExchange
       
           exchangeName = "ExchangeForRepublishing"
    
           exchangeDeclare = true // default
           exchangeAutoBind = true // default
         }
         
         ...
       }
    The exchange is created as direct, durable and without auto-delete flag.

Bind/declare arguments

There is an option to specify bind/declare arguments for queues/exchanges, as described in the RabbitMQ docs.
Example of configuration with HOCON:

producer {
  name = "Testing" // this is used for logging etc.

  exchange = "myclient"

  // should the producer declare exchange he wants to send to?
  declare {
    enabled = true // disabled by default

    type = "direct" // fanout, topic

    arguments = {"x-max-length": 10000}
  }
}

Additional declarations and bindings

Sometimes it's necessary to declare an additional queue or exchange which is not directly related to the consumers or producers you have in your application (e.g. dead-letter queue).
The library makes this possible. Here is an example which uses configuration from HOCON:

val rabbitConnection: ConfigRabbitMQConnection[F] = ???

rabbitConnection.bindExchange("backupExchangeBinding") // : F[Unit]

where the "backupExchangeBinding" is link to the configuration (use relative path to the declarations block in configuration):

declarations {
  backupExchangeBinding {
    sourceExchangeName = "mainExchange"
    destExchangeName = "backupExchange"
    routingKeys = ["myMessage"]
    arguments {}
  }
}

Equivalent code with using case classes configuration:

val rabbitConnection: RabbitMQConnection[F] = ???

rabbitConnection.bindExchange(
  BindExchangeConfig(
    sourceExchangeName = "mainExchange",
    destExchangeName = "backupExchange",
    routingKeys = List("myMessage")
  )
) // : F[Unit]

Correlation-ID handling

The library supports CorrelationId (sometimes also called TraceId or TracingId) handling out of the box.

Producer

Producer takes an implicit cidStrategy: CorrelationIdStrategy parameter which enables you to configure how the CorrelationId should be derived/generated. You can implement your own strategy to suit your needs. That means that there'll always be "some" CorrelationId going in the message (since v9).
If you don't specify the strategy yourself, CorrelationIdStrategy.FromPropertiesOrRandomNew is used - it will try to locate the CID in properties (or headers) and generate a new one if it doesn't succeed. Either way, the CID will be part of both the logs and the resulting (outgoing) RabbitMQ message.

Publisher confirms

By using the following configuration

producer {
  properties {
    confirms {
      enabled = true
      sendAttempts = 2
    }
  }
}

clients can enable publisher confirms. Each send call will wait for an ack/nack from the broker. This wait is of course non-blocking. sendAttempts is the total number of attempts, including the initial one. If sendAttempts is greater than 1, the producer will try to resend the message right after it obtains a nack from the broker.
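
The meaning of sendAttempts can be illustrated with a simplified, synchronous model. This is a sketch under assumptions: the real client waits for confirmations in a non-blocking way, and the Confirmation type, sendWithConfirms and flakyBroker are hypothetical names used only here.

```scala
sealed trait Confirmation
case object BrokerAck extends Confirmation
case object BrokerNack extends Confirmation

// `publish` stands for one send followed by waiting for the broker's confirmation.
// Returns the number of the successful attempt, or an error once all attempts are used up.
def sendWithConfirms(publish: () => Confirmation, sendAttempts: Int, attempt: Int = 1): Either[String, Int] =
  publish() match {
    case BrokerAck                            => Right(attempt)
    case BrokerNack if attempt < sendAttempts => sendWithConfirms(publish, sendAttempts, attempt + 1)
    case BrokerNack                           => Left(s"nacked after $attempt attempt(s)")
  }

// A stub broker that nacks the first message and acks every following one.
def flakyBroker(): () => Confirmation = {
  var calls = 0
  () => { calls += 1; if (calls == 1) BrokerNack else BrokerAck }
}

val withResend    = sendWithConfirms(flakyBroker(), sendAttempts = 2) // Right(2)
val withoutResend = sendWithConfirms(flakyBroker(), sendAttempts = 1) // Left(nacked after 1 attempt(s))
```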

From the implementation point of view, it uses asynchronous acks/nacks combined with Deferred from the cats-effect library.

Consumers

You can also get the CorrelationId from the message properties on the consumer side. The CID is taken from both AMQP properties and X-Correlation-Id header (where the property has precedence and the header is just a fallback).
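
The precedence rule can be written down as a small function. This is a sketch and not the client's code: resolveCorrelationId is a hypothetical helper, propertyCid stands for the AMQP correlation-id property and headers for the message headers.

```scala
val CorrelationIdHeader = "X-Correlation-Id"

// The AMQP property wins; the header is used only as a fallback.
def resolveCorrelationId(propertyCid: Option[String], headers: Map[String, AnyRef]): Option[String] =
  propertyCid.orElse(headers.get(CorrelationIdHeader).map(_.toString))

val fromProperty = resolveCorrelationId(Some("cid-from-property"), Map(CorrelationIdHeader -> "cid-from-header"))
val fromHeader   = resolveCorrelationId(None, Map(CorrelationIdHeader -> "cid-from-header"))
val missing      = resolveCorrelationId(None, Map.empty)
```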

Pull consumer

Sometimes your use-case just doesn't fit the normal consumer scenario. Here you can use the pull consumer which gives you much more control over the received messages. You pull a new message from the queue and acknowledge (reject, ...) it at some later point.

The pull consumer uses PullResult as return type:

  • Ok - contains DeliveryWithHandle instance
  • EmptyQueue - there was no message in the queue available

Additionally you can call .toOption method on the PullResult.

A simplified example, using configuration from HOCON:

import cats.effect.Resource
import com.avast.bytes.Bytes
import com.avast.clients.rabbitmq._
import com.avast.clients.rabbitmq.pureconfig._
import com.avast.clients.rabbitmq.api._
import monix.eval.Task
import monix.execution.Scheduler

implicit val sch: Scheduler = ???

val consumer: Resource[Task, RabbitMQPullConsumer[Task, Bytes]] = {
  for {
    connection <- RabbitMQConnection.fromConfig[Task](???, ???)
    consumer <- connection.newPullConsumer[Bytes](??? : String, ???)
  } yield {
    consumer
  }
}

val program: Task[Unit] = consumer.use { consumer =>
  Task
    .sequence {
      (1 to 100).map(_ => consumer.pull())
    } // receive "up to" 100 deliveries
    .flatMap { ds =>
      // do your stuff!

      Task.unit
    }
}

MultiFormatConsumer

Quite often you receive a single type of message but you want to support multiple encoding formats (Protobuf, JSON, ...). This is where MultiFormatConsumer can be used.

Modules extras-circe and extras-scalapb provide support for JSON and GPB conversion. They are both used in the example below.

The MultiFormatConsumer is Scala only.

Usage example:

Proto file

import com.avast.bytes.Bytes
import com.avast.cactus.bytes._ // Cactus support for Bytes, see https://github.com/avast/cactus#bytes
import com.avast.clients.rabbitmq.test.ExampleEvents.{NewFileSourceAdded => NewFileSourceAddedGpb}
import com.avast.clients.rabbitmq._
import com.avast.clients.rabbitmq.extras.format._
import io.circe.Decoder
import io.circe.generic.auto._ // to auto derive `io.circe.Decoder[A]` with https://circe.github.io/circe/codec.html#fully-automatic-derivation
import scala.concurrent.Future
import scala.jdk.CollectionConverters._

private implicit val d: Decoder[Bytes] = Decoder.decodeString.map(???)

case class FileSource(fileId: Bytes, source: String)

case class NewFileSourceAdded(fileSources: Seq[FileSource])

val consumer = MultiFormatConsumer.forType[Future, NewFileSourceAdded](
  JsonDeliveryConverter.derive(), // requires implicit `io.circe.Decoder[NewFileSourceAdded]`
  GpbDeliveryConverter[NewFileSourceAddedGpb]
    .derive() // requires implicit `com.avast.cactus.Converter[NewFileSourceAddedGpb, NewFileSourceAdded]`
)(_ => ???)

(see unit test for full example)

Implementing own DeliveryConverter

The CheckedDeliveryConverter usually reacts to Content-Type (like in the example below) but it's not required - it could e.g. analyze the payload (or its first bytes) too.

import com.avast.bytes.Bytes
import com.avast.clients.rabbitmq.CheckedDeliveryConverter
import com.avast.clients.rabbitmq.api.{ConversionException, Delivery}

val StringDeliveryConverter: CheckedDeliveryConverter[String] = new CheckedDeliveryConverter[String] {
  override def canConvert(d: Delivery[Bytes]): Boolean = d.properties.contentType.contains("text/plain")

  override def convert(b: Bytes): Either[ConversionException, String] = Right(b.toStringUtf8)
}