(Scala based) NATS / Spark Connectors

This library provides a Scala-based integration between Apache Spark (a fast and general engine for large-scale data processing) and the NATS messaging system (a highly performant cloud-native messaging system), as well as NATS Streaming (a data streaming system powered by NATS).

It is a wrapper over the (Java based) NATS / Spark Connectors that facilitates their usage in Scala (the de facto language of Spark).

Please refer to the (Java based) NATS / Spark Connectors page for additional information.

Release Notes

Version 1.0.0

Version 0.4.0

Version 0.3.0

  • Spark version 2.0.1 + Scala version 2.11.8
  • .asStreamOf(ssc) introduced
  • storedAsKeyValue() introduced
  • Message Data can be any Java Object (not limited to String), serialized as byte[] (the native NATS payload format)

Installation

Spark Package

Include this package in your Spark Applications (spark-shell, pyspark, or spark-submit) using:

> $SPARK_HOME/bin/spark-shell --packages com.logimethods:nats-connector-spark-scala_2.11:1.0.0

SBT

resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"
resolvers += "Sonatype OSS Release" at "https://oss.sonatype.org/content/groups/public/"

libraryDependencies += "com.logimethods" % "nats-connector-spark-scala_2.11" % "1.0.0"

Maven

In your pom.xml, add:

<dependencies>
  <!-- list of dependencies -->
  <dependency>
    <groupId>com.logimethods</groupId>
    <artifactId>nats-connector-spark-scala_2.11</artifactId>
    <version>1.0.0</version>
  </dependency>
</dependencies>

Usage (in Scala)

See the Java code documentation for the list of available options (properties, subjects, encoder/decoder, etc.).

import org.apache.spark.streaming.{Duration, StreamingContext}

import com.logimethods.connector.nats.to_spark._
import com.logimethods.scala.connector.spark.to_nats._

val ssc = new StreamingContext(sc, new Duration(2000))

From NATS to Spark

The reception of NATS messages as a Spark Stream is done through the NatsToSparkConnector.receiveFromNats(classOf[Class], ...) method, where [Class] is the Java class of the objects to deserialize.

Deserialization of the primitive types

Those objects first need to be serialized as byte[] using the right protocol before being stored into the NATS message payload.

By default, the (Java) number types are then automatically decoded by the connector.
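
For instance, a minimal sketch of receiving such numbers (assuming the ssc defined above, a NATS server reachable at natsUrl, and a hypothetical subject name; the Float payloads are decoded automatically):

val floats = NatsToSparkConnector
               .receiveFromNats(classOf[java.lang.Float], StorageLevel.MEMORY_ONLY)
               .withNatsURL(natsUrl)
               .withSubjects("sensor.voltage") // hypothetical subject
               .asStreamOf(ssc)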

Custom Deserialization

For more complex types, you should provide your own decoder through the withDataDecoder(scala.Function1<byte[], V> dataDecoder) method.

Let's say that the payload has been encoded this way:

import java.nio.ByteBuffer
import java.time.{LocalDateTime, ZoneId}

val zoneId = ZoneId.systemDefault() // the zone used to interpret the dates

def encodePayload(date: LocalDateTime, value: Float): Array[Byte] = {
  // See https://docs.oracle.com/javase/8/docs/api/java/nio/ByteBuffer.html
  val buffer = ByteBuffer.allocate(8 + 4) // a Long (8 bytes) followed by a Float (4 bytes)
  buffer.putLong(date.atZone(zoneId).toEpochSecond())
  buffer.putFloat(value)
  buffer.array()
}
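
On the producer side, such a payload could then be published through any NATS client, e.g. (natsConnection and inputSubject being assumptions, not part of this library):

natsConnection.publish(inputSubject, encodePayload(LocalDateTime.now(), 117.5f))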

You have to provide your own decoder:

def dataDecoder: Array[Byte] => Tuple2[Long, Float] = bytes => {
  val buffer = ByteBuffer.wrap(bytes)
  val epoch = buffer.getLong()
  val voltage = buffer.getFloat()
  (epoch, voltage)
}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream._
val messages: ReceiverInputDStream[(String, (Long, Float))] =
  NatsToSparkConnector
    .receiveFromNatsStreaming(classOf[Tuple2[Long,Float]], StorageLevel.MEMORY_ONLY, clusterId)
    .withNatsURL(natsUrl)
    .withSubjects(inputSubject)
    .withDataDecoder(dataDecoder)
    .asStreamOfKeyValue(ssc)
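
From there, the usual Spark Streaming lifecycle applies:

messages.print()
ssc.start()
ssc.awaitTermination()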

From NATS Streaming to Spark

val stream = NatsToSparkConnector.receiveFromNatsStreaming(classOf[String], StorageLevel.MEMORY_ONLY, clusterId)
                                 .withNatsURL(natsUrl)
                                 .withSubjects(inputSubject)
                                 .asStreamOf(ssc)

From NATS to Spark

import java.util.Properties

val properties = new Properties()
val natsUrl = System.getenv("NATS_URI")
val stream = NatsToSparkConnector.receiveFromNats(classOf[Integer], StorageLevel.MEMORY_ONLY)
                                 .withProperties(properties)
                                 .withNatsURL(natsUrl)
                                 .withSubjects(inputSubject)
                                 .asStreamOf(ssc)

From NATS (Streaming or not) to Spark as Key/Value Pairs

The Spark Stream is in that case made of Key/Value pairs, where the Key is the Subject and the Value is the Payload of the NATS message.

val stream = NatsToSparkConnector.receiveFromNats[Streaming](...)
                                 ...
                                 .withSubjects("main-subject.>")
                                 .asStreamOfKeyValue(ssc)
stream.groupByKey().print()

From Spark to NATS

Serialization of the primitive types

The Spark elements are first serialized as byte[] before being sent to NATS. By default, the (Java) number types are encoded through the com.logimethods.connector.nats_spark.NatsSparkUtilities.encodeData(Object obj) method.
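
A minimal illustration of that default encoding, based on the method signature above (the value here is arbitrary):

import com.logimethods.connector.nats_spark.NatsSparkUtilities

val payload: Array[Byte] = NatsSparkUtilities.encodeData(123) // an Integer serialized as byte[]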

Custom Serialization

Custom serialization can be performed by a (java.util.function.Function<[Class], byte[]> & Serializable) function provided through the .publishToNats(...) or .publishToNatsAsKeyValue(...) methods, like:

val stream: DStream[(String, (Long, Float))] = .../...

def longFloatTupleEncoder: Tuple2[Long, Float] => Array[Byte] = tuple => {
  val buffer = ByteBuffer.allocate(8 + 4)
  buffer.putLong(tuple._1)
  buffer.putFloat(tuple._2)
  buffer.array()
}

SparkToNatsConnectorPool.newStreamingPool(clusterId)
                        .withNatsURL(natsUrl)
                        .withSubjects(outputSubject)
                        .publishToNatsAsKeyValue(stream, longFloatTupleEncoder)

From Spark to NATS Streaming

SparkToNatsConnectorPool.newStreamingPool(clusterId)
                        .withNatsURL(natsUrl)
                        .withSubjects(outputSubject)
                        .publishToNats(stream)

From Spark to NATS

SparkToNatsConnectorPool.newPool()
                        .withProperties(properties)
                        .withSubjects(outputSubject)
                        .publishToNats(stream)

From Spark as Key/Value Pairs to NATS (Streaming or not)

The Spark Stream should in that case be made of Key/Value pairs. .publishToNatsAsKeyValue(...) will publish NATS messages where the Subject is a composition of the (optional) global Subject(s) and the Key of the pair, while the NATS Payload will be the pair's Value.

SparkToNatsConnectorPool.new[Streaming]Pool(...)
                        ...
                        .withSubjects("A1.", "A2.")
                        .publishToNatsAsKeyValue(stream)

will send such [subject:payload] messages to NATS:

[A1.key1:string1]
[A2.key1:string1]
[A1.key2:string2]
[A2.key2:string2]
...

Code Samples

License

(The MIT License)

Copyright (c) 2016-2019 Logimethods.

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.