jeanadrien / gatling-mqtt-protocol   1.1.0

Apache License 2.0 GitHub

Unofficial MQTT plugin for Gatling load testing framework

Scala versions: 2.12 2.11

Gatling-MQTT-Protocol

Gatling-MQTT-Protocol is an unofficial plugin for the Gatling load testing framework. It makes it possible to exercise services that use the MQTT pub/sub protocol and to measure their performance.

The plugin provides Gatling actions corresponding to the high-level commands of the MQTT protocol. This allows more precise measurement of MQTT server performance and makes scenarios customizable.

Two different MQTT clients can be used out of the box to run the tests:

  • Fusesource MQTT-Client (FuseSourceMqttClient)
  • Paho MQTT client (PahoMqttClient)

Alternatively, it is possible to use a custom implementation of the MqttClient actor. See below for details.

Installation

To install locally, build the plugin and add it to the lib directory of your Gatling home.

  1. Check out the code and build the plugin using sbt:
$ sbt assembly
  2. Copy the generated jar into the Gatling library directory:
$ cp target/scala-2.12/gatling-mqtt-protocol-assembly-{VERSION}.jar ${GATLING_HOME}/lib

It is also possible to use the plugin on Flood IO. Please refer to the Flood IO documentation on how to add a custom library.

Integration into a Scala project

Gatling-MQTT-Protocol is available on Maven Central.

You can import Gatling and the Gatling-MQTT-Protocol plugin into your Scala project using sbt and the Gatling SBT Plugin.

Check the compatibility matrix and make sure you are using matching versions of Scala, Gatling and the plugin.
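As a rough sketch, an sbt setup for plugin v1.1 might look like the following. The Maven coordinates of the plugin (group `com.github.jeanadrien`, artifact `gatling-mqtt-protocol`) are an assumption inferred from the package name, and the exact version numbers are placeholders picked to match the compatibility table; verify both on Maven Central before use.

```scala
// project/plugins.sbt
// Gatling SBT Plugin; the version here is an assumption, pick one matching your Gatling release.
addSbtPlugin("io.gatling" % "gatling-sbt" % "2.2.2")

// build.sbt
enablePlugins(GatlingPlugin)

// Plugin v1.1 is built against Gatling 2.2.5 and Scala 2.11 (see the compatibility table).
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "io.gatling.highcharts" %  "gatling-charts-highcharts" % "2.2.5" % "test",
  "io.gatling"            %  "gatling-test-framework"    % "2.2.5" % "test",
  // Assumed coordinates for this plugin; check Maven Central for the real ones.
  "com.github.jeanadrien" %% "gatling-mqtt-protocol"     % "1.1.0" % "test"
)
```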

Documentation

Quickstart

Gatling-MQTT-Protocol plugin provides a protocol configuration and multiple actions to build a Gatling simulation.

  1. Import the following packages into your Simulation file.
import com.github.jeanadrien.gatling.mqtt.Predef._
import io.gatling.core.Predef._
import scala.concurrent.duration._
  2. Configure the MQTT client.
val mqttConf = mqtt.host("tcp://localhost:1883")
  3. Define your scenario in your simulation file using the provided actions.
val scn = scenario("MQTT Test")
    .exec(connect)
    .exec(subscribe("myTopic"))
    .during(20 minutes) {
        pace(1 second).exec(publish("myTopic", "myPayload"))
    }
setUp(scn.inject(rampUsers(5000) over (10 minutes))).protocols(mqttConf)

The above scenario connects up to 5000 MQTT clients to your localhost MQTT server at a rate of 500 clients per minute. Each connected client subscribes to myTopic and then issues one PUBLISH command per second on that same topic. Therefore, the resulting performance test ramps up from 0 to 300k PUBLISH requests per minute.
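The figures quoted for this scenario follow from simple arithmetic, which the plain Scala sketch below spells out. The object and value names are illustrative only and are not part of the plugin.

```scala
// Back-of-the-envelope load model for the quickstart scenario.
// All names here are hypothetical helpers, not plugin API.
object LoadModel {
  val totalUsers = 5000                 // rampUsers(5000)
  val rampMinutes = 10                  // over (10 minutes)
  val publishesPerUserPerMinute = 60    // pace(1 second) gives one PUBLISH per second

  // New connections opened per minute during the ramp-up.
  val connectionsPerMinute: Int = totalUsers / rampMinutes

  // PUBLISH rate once every user is connected.
  val peakPublishesPerMinute: Int = totalUsers * publishesPerUserPerMinute
}
```

Changing `totalUsers` or the pace in the scenario scales the peak rate linearly, which is useful when sizing a test against a broker's expected capacity.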

MQTT Protocol configuration

These options are configurable using method chaining on the mqtt object. They are based on the Fusesource MQTT-Client available options: https://github.com/fusesource/mqtt-client#controlling-mqtt-options.

  • host(host: Expression[String])
  • clientId(clientId: Expression[String]) : Default is a random String, which is probably a good choice for load tests since each client ID must be unique.
  • cleanSession(cleanSession: Boolean) : Default is true
  • keepAlive(keepAlive: Short) : Interval in seconds between client PINGREQ messages. Default is 30
  • userName(userName: Expression[String])
  • password(password: Expression[String])
  • willTopic(willTopic: Expression[String])
  • willMessage(willMessage: Expression[String])
  • willQos(willQos: QoS)
  • willRetain(willRetain: Boolean)
  • version(version: Expression[String]) : MQTT protocol version. Default is 3.1
  • connectAttemptsMax(connectAttemptsMax: Long)
  • reconnectAttemptsMax(reconnectAttemptsMax: Long)
  • reconnectDelay(reconnectDelay: Long)
  • reconnectDelayMax(reconnectDelayMax: Long)
  • reconnectBackOffMultiplier(reconnectBackOffMultiplier: Double)
  • receiveBufferSize(receiveBufferSize: Int) : Fusesource client only
  • sendBufferSize(sendBufferSize: Int) : Fusesource client only
  • trafficClass(trafficClass: Int) : Fusesource client only
  • maxReadRate(maxReadRate: Int) : Fusesource client only
  • maxWriteRate(maxWriteRate: Int) : Fusesource client only
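For illustration, a protocol configuration chaining several of the options listed above could look like the sketch below. It uses only option names from the list; the host, credentials, topic and numeric values are placeholders, not recommended settings.

```scala
import com.github.jeanadrien.gatling.mqtt.Predef._
import io.gatling.core.Predef._

// Placeholder values throughout; adapt them to your broker.
val mqttConf = mqtt
  .host("tcp://localhost:1883")
  .cleanSession(true)
  .keepAlive(60)                    // seconds between PINGREQ messages
  .userName("loadtest")
  .password("secret")
  .willTopic("clients/lastwill")
  .willMessage("connection lost")
  .willRetain(false)
  .reconnectAttemptsMax(3)
```

The clientId option is left out on purpose so that each virtual user keeps the default random client ID.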

Scenario actions

Gatling-MQTT-Protocol plugin provides the following scenario actions:

  • connect : CONNECT MQTT command. Must run at the beginning of the scenario, before any publish / subscribe actions.
  • subscribe(topic : Expression[String]) : SUBSCRIBE MQTT command. Client subscribes to one topic.
  • publish(topic : Expression[String], payload : Expression[Array[Byte]]) : PUBLISH MQTT command. Client publishes the given payload on the given topic.

In addition to this publish (and forget) action, two further publish actions are available. They perform the PUBLISH command and expect to receive an echo notification from the MQTT server on the same topic. These actions are useful for measuring the round-trip performance of the MQTT server, i.e. the time between sending the PUBLISH command and receiving the echoed PUBLISH back from the server. Note that the client must first subscribe to the topic in the Gatling scenario; otherwise the actions fail with a timeout.

  • publishAndWait(topic : Expression[String], payload : Expression[Array[Byte]]) : PUBLISH MQTT command. The action does not call the next chained action until the client listener receives the echo payload.

  • publishAndMeasure(topic : Expression[String], payload : Expression[Array[Byte]]) : PUBLISH MQTT command. Unlike publishAndWait, the next chained action is called immediately after the PUBLISH command, but the plugin still waits for the echo notification in order to measure the duration of the round trip.

  • Last, a waitForMessages action is provided to terminate the scenario. It does nothing but wait until all pending publishAndMeasure actions have received their notifications.
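The following sketch shows how these actions might be combined to measure round trips without blocking the scenario on each message. The simulation name, topic, payload, repeat count, user count and timeout are all arbitrary placeholders.

```scala
import com.github.jeanadrien.gatling.mqtt.Predef._
import io.gatling.core.Predef._
import scala.concurrent.duration._

class RoundTripSimulation extends Simulation {
  val mqttConf = mqtt.host("tcp://localhost:1883")

  val scn = scenario("MQTT round trip")
    .exec(connect)
    // Subscribe first, otherwise the echo never arrives and the measure times out.
    .exec(subscribe("myTopic"))
    .repeat(10) {
      // Continues immediately; the round trip is measured in the background.
      exec(publishAndMeasure("myTopic", "myPayload"))
    }
    // Hold the virtual user until pending echoes have arrived (or 30 s elapse).
    .exec(waitForMessages.timeout(30.seconds))

  setUp(scn.inject(atOnceUsers(10))).protocols(mqttConf)
}
```

Replacing publishAndMeasure with publishAndWait would instead serialize the loop, with each iteration waiting for its own echo before the next PUBLISH is sent.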

Connect options

The connect action provides the following options using method chaining. They are a subset of the protocol configuration options described above and allow virtual users to connect with different client settings, e.g. by using a Gatling feeder.

  • clientId(clientId: Expression[String])
  • cleanSession(cleanSession: Boolean)
  • userName(userName: Expression[String])
  • password(password: Expression[String])
  • willTopic(willTopic: Expression[String])
  • willMessage(willMessage: Expression[String])
  • willQos(willQos: QoS)
  • willRetain(willRetain: Boolean)

Subscribe options

The subscribe action provides the following MQTT options using method chaining:

  • qosAtMostOnce : Ask for a QoS of 0 (at most once) for the subscribed topic.
  • qosAtLeastOnce : Ask for a QoS of 1 (at least once) for the subscribed topic.
  • qosExactlyOnce : Ask for a QoS of 2 (exactly once) for the subscribed topic.

Publish and publishAndWait options

The publish, publishAndWait and publishAndMeasure actions provide the following options using method chaining:

  • qosAtMostOnce : Publish with a QoS of 0 (at most once).
  • qosAtLeastOnce : Publish with a QoS of 1 (at least once).
  • qosExactlyOnce : Publish with a QoS of 2 (exactly once).
  • retain(newRetain : Boolean) : Set the retain flag of the PUBLISH command. Default: false.
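A short sketch of how the subscribe and publish options chain onto their actions, assuming the same imports as in the quickstart; the scenario name, topic and payload are placeholders.

```scala
import com.github.jeanadrien.gatling.mqtt.Predef._
import io.gatling.core.Predef._

val scn = scenario("MQTT QoS options")
  .exec(connect)
  // Request QoS 1 for messages delivered on this subscription.
  .exec(subscribe("myTopic").qosAtLeastOnce)
  // Publish with QoS 1 and ask the broker to retain the message.
  .exec(publish("myTopic", "myPayload").qosAtLeastOnce.retain(true))
```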

Additionally, publishAndWait provides options to define how the feedback notification received from the server is validated:

  • payloadFeedback(fn : Array[Byte] => Array[Byte] => Boolean) : Define the comparison function to use when notifications are received on the subscribed topic. The default function compares each byte of the payload.
  • timeout(duration : FiniteDuration) : Timeout before failure.
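To make the curried shape of the payloadFeedback function concrete, here is a plain Scala sketch of two comparison functions with that signature. The object and function names are hypothetical and not part of the plugin, and both functions are symmetric, so they do not depend on which argument the plugin passes first.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical helpers matching Array[Byte] => Array[Byte] => Boolean.
object PayloadMatchers {
  // Strict byte-for-byte equality.
  val exact: Array[Byte] => Array[Byte] => Boolean =
    a => b => java.util.Arrays.equals(a, b)

  // Compare payloads as UTF-8 text, ignoring leading and trailing whitespace.
  val trimmedText: Array[Byte] => Array[Byte] => Boolean =
    a => b => new String(a, UTF_8).trim == new String(b, UTF_8).trim
}
```

Such a function could then be passed along the lines of `publishAndWait("myTopic", "myPayload").payloadFeedback(PayloadMatchers.trimmedText).timeout(5.seconds)`, which may help when a broker or bridge normalizes whitespace in echoed messages.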

WaitForMessages options

  • timeout(duration : FiniteDuration) : Timeout duration.

Note about metrics

The time measured for the different actions is:

  • connect : Time to get successfully connected. Note that in case of connection failure, the connection is automatically retried, and a single connect action can potentially lead to several KO requests in the statistics.
  • subscribe : Time to get the SUBACK message back from the server.
  • publish : Time to perform the full PUBLISH negotiation. The number of commands required depends on the selected QoS.
  • publishAndWait : Time to perform the publish and receive the notification on the topic. The upper bound is the configured timeout.

Examples

You can find Simulation examples in the test directory

Client injection

Gatling-MQTT-Protocol uses the Typesafe Config library as its configuration mechanism. The setting key mqtt.client defines the class name of the MqttClient implementation.

Two implementations are packaged with the plugin:

  • com.github.jeanadrien.gatling.mqtt.client.FuseSourceMqttClient
  • com.github.jeanadrien.gatling.mqtt.client.PahoMqttClient

Other custom clients can be implemented. They need to extend the MqttClient abstract class, which in turn extends Actor.

It is also possible to select which implementation to use programmatically in the test code:

MqttClient.clientInjection = { config =>
    Props(new PahoMqttClient(config))
}

Compatibility

Here is the Gatling-MQTT-Protocol vs. Gatling version compatibility table. Note that Gatling v2.1 is not supported.

  • [v1.0] is built with Gatling sources v2.2.3 and scala 2.11.
  • [v1.1] is built with Gatling sources v2.2.5 and scala 2.11.
  • [v1.2] is built with Gatling sources v2.3.0 and scala 2.12.
    • It is compatible with Gatling v2.3.1 bundle.
  • [master] is built with Gatling sources v2.3.1 and scala 2.12.

Contributing

Yes, please. Feature requests, bug reports, fixes, comments.

Please use this code style file for IntelliJ

Testing

It's possible to test the plugin and run the example scenario with sbt:

$ sbt gatling:test

Acknowledgments

Gatling-MQTT-Protocol is a rewrite of the Gatling-MQTT plugin, another unofficial Gatling plugin which provides an MQTT connect+publish stress test and is also based on the Fusesource MQTT client.

In this extended version, we aim to increase flexibility in MQTT scenarios, abstract the MQTT client and provide better performance metrics.

The protocol configuration DSL is largely compatible with that of Gatling-MQTT.

  • Thanks to @verakruhliakova who wrote the initial version for Gatling 2.1.
  • Thanks to EVRYTHNG for the use cases and the testing.
  • Thanks to the Gatling team for their feedback.

License

Apache License, Version 2.0