We are proud to open-source Akka-Stream-Extensions, a library extending Typesafe Akka-Stream.
The main purpose of this project is to:
- Develop generic Sources/Flows/Sinks not provided out-of-the-box by Akka-Stream.
- Make those structures very well tested & production ready.
- Study/evaluate streaming concepts based on Akka-Stream & other technologies (AWS, Postgres, ElasticSearch, ...).
We have been developing this library at MFG Labs for our production projects, after identifying a few primitive structures that were common to many use cases, not provided by Akka-Stream out of the box, and not so easy to implement robustly.
Scaladoc is available there.
resolvers += Resolver.bintrayRepo("mfglabs", "maven")
Currently depends on akka-stream-2.4.18
libraryDependencies += "com.mfglabs" %% "akka-stream-extensions" % "0.11.2"
Changelog here
import akka.actor.ActorRef
import akka.stream.scaladsl.Source
import scala.concurrent.Future
import com.mfglabs.stream._

// Source from a paginated REST Api
// (WSService stands for your own client; an implicit ExecutionContext must be in scope for `futResult.map`)
val pagesStream: Source[Page, ActorRef] = SourceExt
  .bulkPullerAsync(0L) { (currentPosition, downstreamDemand) =>
    val futResult: Future[Seq[Page]] = WSService.get(offset = currentPosition, nbPages = downstreamDemand)
    futResult.map {
      case Nil => Nil -> true // stop the stream if the REST Api delivers no more results
      case p => p -> false
    }
  }
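To make the callback contract explicit, here is a small synchronous model of the pull loop in plain Scala. It is a sketch under assumptions, not the library's implementation: `BulkPullModel` and `pullAll` are hypothetical names, futures are dropped for simplicity, and the position is assumed to advance by the number of elements each call returns.

```scala
import scala.annotation.tailrec

object BulkPullModel {
  // Hypothetical synchronous model of the bulkPullerAsync contract:
  // `fetch(position, demand)` returns a batch plus a "stop" flag.
  // Assumption: the position advances by the size of each returned batch.
  def pullAll[A](start: Long, demand: Int)(fetch: (Long, Int) => (Seq[A], Boolean)): Vector[A] = {
    @tailrec
    def loop(position: Long, acc: Vector[A]): Vector[A] = {
      val (batch, stop) = fetch(position, demand)
      val next = acc ++ batch
      // Stop as soon as the callback says so; otherwise ask for the next batch.
      if (stop) next else loop(position + batch.size, next)
    }
    loop(start, Vector.empty)
  }
}
```

As in the REST example, returning a batch with `true` is what ends the loop; a callback that keeps returning empty batches with `false` would poll forever.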
someBinaryStream
  .via(FlowExt.rechunkByteStringBySeparator(ByteString("\n"), maximumChunkBytes = 5 * 1024))
  .map(_.utf8String)
  .via(
    FlowExt.customStatefulProcessor(Vector.empty[String])( // grouping by 100 except when we encounter a "flush" line
      (acc, line) => {
        if (acc.length == 100) (None, acc)
        else if (line == "flush") (None, acc :+ line)
        else (Some(acc :+ line), Vector.empty)
      },
      lastPushIfUpstreamEnds = acc => acc
    )
  )
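The comment in the example describes a batching rule: close a batch after 100 lines, or early when a "flush" line arrives. The sketch below expresses that rule over plain collections so it can be read and tested without Akka. It illustrates the rule only, not the callback contract of `customStatefulProcessor` (check the Scaladoc for what its `Option` and pushed sequence mean); `FlushGrouping` and `group` are hypothetical names.

```scala
object FlushGrouping {
  // Hypothetical helper: groups lines into batches of at most `max` lines;
  // a "flush" line closes the current batch early (and is kept as its last element).
  def group(lines: Seq[String], max: Int = 100): Vector[Vector[String]] = {
    val (done, pending) =
      lines.foldLeft((Vector.empty[Vector[String]], Vector.empty[String])) {
        case ((out, acc), line) =>
          val next = acc :+ line
          if (line == "flush" || next.length == max) (out :+ next, Vector.empty)
          else (out, next)
      }
    // Emit the trailing partial batch when the input ends.
    if (pending.nonEmpty) done :+ pending else done
  }
}
```

For instance, `FlushGrouping.group(Seq("a", "flush", "b", "c", "d"), max = 2)` yields `Vector(Vector(a, flush), Vector(b, c), Vector(d))`. Emitting the trailing batch is similar in spirit to `lastPushIfUpstreamEnds` in the example.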
Many more helpers, check the Scaladoc!
This extension provides tools to stream data from/to Postgres.
libraryDependencies += "com.mfglabs" %% "akka-stream-extensions-postgres" % "0.11.2"
Pull all Docker images launched by the tests:
docker pull postgres:8.4
docker pull postgres:9.6
import com.mfglabs.stream._
import com.mfglabs.stream.extensions.postgres._

implicit val pgConnection = PgStream.sqlConnAsPgConnUnsafe(sqlConnection)
implicit val blockingEc = ExecutionContextForBlockingOps(someEc)

PgStream
  .getQueryResultAsStream(
    "select a, b, c from table",
    options = Map("FORMAT" -> "CSV")
  )
  .via(FlowExt.rechunkByteStringBySeparator(ByteString("\n"), maximumChunkBytes = 5 * 1024))
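`FlowExt.rechunkByteStringBySeparator` is what turns arbitrarily cut byte chunks into one element per line here, as in the binary-stream example above. The plain-Scala model below illustrates the idea with `String` standing in for `ByteString`. It is a sketch, not the library code: `RechunkModel` is hypothetical, and both the failure when a line exceeds the limit and the emission of a trailing unterminated line are assumptions.

```scala
object RechunkModel {
  // Hypothetical model: re-splits arbitrarily cut text chunks on `sep`,
  // carrying partial lines across chunk boundaries.
  // Assumptions: a line longer than `maxChunk` fails the whole run, and an
  // unterminated tail is emitted once the input ends.
  def rechunk(chunks: Seq[String], sep: String, maxChunk: Int): Vector[String] = {
    require(sep.nonEmpty, "separator must not be empty")
    def checked(s: String): String = {
      if (s.length > maxChunk)
        throw new IllegalStateException(s"chunk of ${s.length} chars exceeds $maxChunk")
      s
    }
    val out = Vector.newBuilder[String]
    val buffer = new StringBuilder
    for (chunk <- chunks) {
      buffer.append(chunk)
      var idx = buffer.indexOf(sep)
      while (idx >= 0) {
        out += checked(buffer.substring(0, idx)) // one complete line
        buffer.delete(0, idx + sep.length)       // drop the line and its separator
        idx = buffer.indexOf(sep)
      }
      checked(buffer.toString) // the pending partial line must also fit
    }
    if (buffer.length > 0) out += checked(buffer.toString)
    out.result()
  }
}
```

For instance, `RechunkModel.rechunk(Seq("a,b\nc", "d\ne\n", "f"), "\n", 10)` yields `Vector(a,b, cd, e, f)`: the `c` and `d` halves that were split across chunks are joined back into one line.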
someLineStream
  .via(PgStream.insertStreamToTable(
    "schema",
    "table",
    options = Map("FORMAT" -> "CSV")
  ))
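With `options = Map("FORMAT" -> "CSV")`, each element of `someLineStream` should be a CSV line in the format PostgreSQL's `COPY` expects. A minimal sketch of a row formatter, assuming the default CSV delimiter (`,`) and quote character (`"`), is shown below; `PgCsv` is a hypothetical helper, and whether each line must carry a trailing `\n` depends on `insertStreamToTable` (check the Scaladoc).

```scala
object PgCsv {
  // Hypothetical helper: formats one field for COPY ... (FORMAT CSV)
  // with the default delimiter ',' and quote '"'.
  def field(value: Option[String]): String = value match {
    // Unquoted empty field: read as NULL by COPY in CSV mode (default NULL string).
    case None => ""
    // Quote empty strings and values containing special characters; double inner quotes.
    case Some(s) if s.isEmpty || s.exists(c => c == ',' || c == '"' || c == '\n' || c == '\r') =>
      "\"" + s.replace("\"", "\"\"") + "\""
    case Some(s) => s
  }

  // Joins the formatted fields of one row into a single CSV line (no trailing newline).
  def line(row: Seq[Option[String]]): String = row.map(field).mkString(",")
}
```

The `None`/`Some("")` distinction matters because, in CSV mode, `COPY` reads an unquoted empty field as `NULL` and a quoted empty string as an empty string.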
libraryDependencies += "com.mfglabs" %% "akka-stream-extensions-elasticsearch" % "0.11.2"
import scala.concurrent.duration._
import com.mfglabs.stream._
import com.mfglabs.stream.extensions.elasticsearch._
import org.elasticsearch.client.Client
import org.elasticsearch.index.query.QueryBuilders

implicit val blockingEc = ExecutionContextForBlockingOps(someEc)
implicit val esClient: Client = // ...

EsStream
  .queryAsStream(
    QueryBuilders.matchAllQuery(),
    index = "index",
    `type` = "type",
    scrollKeepAlive = 1.minute,
    scrollSize = 1000
  )
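The parameter names suggest that `queryAsStream` relies on Elasticsearch's scroll API: hits are fetched in pages of up to `scrollSize` documents while the scroll context is kept alive for `scrollKeepAlive`. The in-memory model below only illustrates the paging part (keep-alive and the real client are not modelled); `ScrollModel` and its methods are hypothetical.

```scala
object ScrollModel {
  // Hypothetical in-memory stand-in for a scroll: each call returns at most
  // `scrollSize` hits after the cursor, and an empty page ends the scroll.
  final case class Page(hits: Vector[String], cursor: Int)

  def fetch(index: Vector[String], cursor: Int, scrollSize: Int): Page = {
    val hits = index.slice(cursor, cursor + scrollSize)
    Page(hits, cursor + hits.size)
  }

  // Drains the scroll page by page, the way a streaming consumer would.
  def drain(index: Vector[String], scrollSize: Int): Vector[Vector[String]] =
    Iterator
      .iterate(fetch(index, 0, scrollSize))(p => fetch(index, p.cursor, scrollSize))
      .takeWhile(_.hits.nonEmpty)
      .map(_.hits)
      .toVector
}
```

For instance, `ScrollModel.drain(Vector("a", "b", "c", "d", "e"), scrollSize = 2)` yields three pages: `Vector(a, b)`, `Vector(c, d)` and `Vector(e)`.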
This extension lets you build, at compile time, a fully type-checked Flow that transforms an HList of Flows into a single Flow from the Coproduct of their inputs to the Coproduct of their outputs.
For more details on the history of this extension, read this article.
libraryDependencies += "com.mfglabs" %% "akka-stream-extensions-shapeless" % "0.11.2"
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import shapeless._

// 1 - Create a type alias for your coproduct
type C = Int :+: String :+: Boolean :+: CNil

// The sink to consume all output data
val sink = Sink.fold[Seq[C], C](Seq())(_ :+ _)

// 2 - a sample source wrapping incoming data in the Coproduct
val f = GraphDSL.create(sink) { implicit builder => sink =>
  import GraphDSL.Implicits._
  val s = Source.fromIterator(() => Seq(
    Coproduct[C](1),
    Coproduct[C]("foo"),
    Coproduct[C](2),
    Coproduct[C](false),
    Coproduct[C]("bar"),
    Coproduct[C](3),
    Coproduct[C](true)
  ).toIterator)

  // 3 - our typed flows
  val flowInt = Flow[Int].map { i => println("i:" + i); i }
  val flowString = Flow[String].map { s => println("s:" + s); s }
  val flowBool = Flow[Boolean].map { b => println("b:" + b); b }

  // >>>>>> THE IMPORTANT THING
  // 4 - build the coproductFlow in a 1-liner
  val fr = builder.add(ShapelessStream.coproductFlow(flowInt :: flowString :: flowBool :: HNil))
  // <<<<<< THE IMPORTANT THING

  // 5 - plug everything together using the Akka-Stream DSL
  s ~> fr.in
  fr.out ~> sink
  ClosedShape
}

// 6 - run it
RunnableGraph.fromGraph(f).run().futureValue.toSet should equal (Set(
  Coproduct[C](1),
  Coproduct[C]("foo"),
  Coproduct[C](2),
  Coproduct[C](false),
  Coproduct[C]("bar"),
  Coproduct[C](3),
  Coproduct[C](true)
))
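Conceptually, `coproductFlow` routes each element to the flow matching its branch of the coproduct and re-wraps the result in the output coproduct. The plain-Scala model below shows that routing with a sealed trait standing in for `Int :+: String :+: Boolean :+: CNil` and ordinary functions standing in for the three flows; it is a sketch, not the shapeless-based implementation, and all names in it are hypothetical.

```scala
object CoproductRoutingModel {
  // Hypothetical stand-in for the coproduct Int :+: String :+: Boolean :+: CNil
  sealed trait C
  final case class I(value: Int) extends C
  final case class S(value: String) extends C
  final case class B(value: Boolean) extends C

  // One typed function per branch (like flowInt / flowString / flowBool);
  // each element goes through the function of its own branch only,
  // and the result is wrapped back into the same branch.
  def route(f: Int => Int, g: String => String, h: Boolean => Boolean)(in: C): C = in match {
    case I(i) => I(f(i))
    case S(s) => S(g(s))
    case B(b) => B(h(b))
  }
}
```

For instance, mapping `route(_ * 10, _.toUpperCase, !_)` over `Seq(I(1), S("foo"), B(false))` yields `Seq(I(10), S("FOO"), B(true))`. Unlike this sequential model, the streaming version runs the branch flows inside a graph, and the test above compares results with `toSet`, so it does not rely on output order.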
Check out our project MFG Labs/commons-aws, which also provides streaming extensions for Amazon S3 & SQS.
To test postgres-extensions, you need Docker installed and running on your computer (the tests automatically launch a Docker container with a Postgres database).
MFG Labs sponsored the development and the open-sourcing of this library.
We hope this library will be useful & interesting to some of you, and that some of you will help us debug & build more useful structures.
This software is licensed under the Apache 2 license, quoted below.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License here.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.