Akka Actor support for Dsl.scala

Build Status

Dsl.scala-akka-actor provides the Akka Actor support for Dsl.scala. It is an alternative to Akka FSM, for building actors with complex states from simple native Scala control flows.

Getting started

This project allows !-notation to receive messages in the Akka actors, which requires BangNotation and ResetEverywhere compiler plugins, along with libraries of this project. For sbt, add the following settings to your build.sbt:

addCompilerPlugin("com.thoughtworks.dsl" %% "compilerplugins-bangnotation" % "latest.release")
addCompilerPlugin("com.thoughtworks.dsl" %% "compilerplugins-reseteverywhere" % "latest.release")

// The library for actors
libraryDependencies += "com.yang-bo.dsl.keywords.akka.actor" %% "receivemessage" % "latest.release"

// The library for typed actors
libraryDependencies += "com.yang-bo.dsl.domains.akka.actor" %% "typed" % "latest.release"

By combining !-notation and native Scala control flows, a complex state machine can be created from simple Scala code. For example, the following state machine contains two states and two transitions between them.

Finite state machine example with comments, 1st Macguy314, reworked by Perhelion German translation by Babakus [Public domain], via Wikipedia Commons

It can be created as a simple while loop with the help of ReceiveMessage.Partial:

import akka.actor.typed._

sealed trait State
case object Opened extends State
case object Closed extends State

sealed trait Transition
case class Open(response: ActorRef[State]) extends Transition
case class Close(response: ActorRef[State]) extends Transition

def doorActor: Behavior[Transition] = {
  while (true) {
    val open = !ReceiveMessage.Partial[Open]
    open.response ! Opened
    val close = !ReceiveMessage.Partial[Close]
    close.response ! Closed
  }
  throw new Exception("Unreachable code!")
}

The doorActor should reply the current state after performing an transition, which is built from some !-notation on ReceiveMessage.Partial keywords, which are in available in functions that return either akka.actor.typed.Behavior or akka.actor.Actor.Receive.

import akka.actor.testkit.typed.scaladsl._
val door = BehaviorTestKit(doorActor)
val state = TestInbox[State]()
door.run(Open(state.ref))
state.expectMessage(Opened)
door.run(Close(state.ref))
state.expectMessage(Closed)
door.run(Open(state.ref))
state.expectMessage(Opened)
door.run(Close(state.ref))
state.expectMessage(Closed)

Previously, state machines in Akka can be created from Akka FSM API, which consists of some domain-specific keywords like when, goto and stay. Unfortunately, you cannot embedded those keywords into your ordinary if / while / match control flows, because Akka FSM DSL is required to be split into small closures, preventing ordinary control flows from crossing the boundary of those closures.

With the help this Dsl.scala-akka-actor project, you can receive messages in a blocking flavor, without explicitly creating those closures. Therefore, a state machine can be described in ordinary if / while / match control flows, just like pseudo-code but it is runnable.

Exception handling

To use try / catch / finally expressions with !-notation, the return type of enclosing function should be Behavior !! Throwable, as shown in the following createDecoderActor method. It will open an InputStream, read String from the stream, and close the stream in a finally block.

import akka.actor.typed._
import akka.actor.typed.scaladsl._
import com.thoughtworks.dsl.Dsl.!!
import java.io._
import java.net._

sealed trait Command
case class Open(open: () => InputStream) extends Command
case class Read(response: ActorRef[String]) extends Command
case object Close extends Command

class DecoderException(cause: Throwable) extends Exception(cause)

def createDecoderActor: Behavior[Command] !! Throwable = {
  while (true) {
    val inputStream = (!ReceiveMessage.Partial[Open]).open()
    try {
      val Read(replyTo) = !ReceiveMessage.Partial[Read]
      replyTo ! new java.io.DataInputStream(inputStream).readUTF()
      !ReceiveMessage.Partial[Close.type]
    } catch {
      case e: IOException =>
        throw new DecoderException(e)
    } finally {
      inputStream.close()
    }
  }
  throw new AssertionError("Unreachable code!")
}

The return type Behavior[Command] !! Throwable is a type alias of (Throwable => Behavior[Command]) => Behavior[Command], which receives message of the type Command, and accepts an additional callback function to handle exceptions that are not handled in createDecoderActor.

import akka.actor.testkit.typed.scaladsl._
val errorHandler = mockFunction[Throwable, Behavior[Command]]
val decoderActor = BehaviorTestKit(createDecoderActor(errorHandler))

Given an InputStream that throws an IOException when read from it,

val inputStream: InputStream = mock[InputStream]
toMockFunction0(inputStream.read _).expects().throws(new IOException())
decoderActor.run(Open(() => inputStream))

when the decoderActor read a String from the stream, it should close the stream due to finally block triggered by the exception.

val inbox = TestInbox[String]()
errorHandler.expects(where[Throwable](_.isInstanceOf[DecoderException])).returns(Behaviors.stopped)
toMockFunction0(inputStream.close _).expects().returns(()).once()
decoderActor.run(Read(inbox.ref))
inbox.receiveAll() should be(empty)

Exception handling in Dsl.scala is as simple as ordinary Scala code, though it is difficult to be gratefully handled in Akka FSM API.

Modules

ReceiveMessage.Partial accepts a type parameter, and it will skip messages whose types are not match the specified type parameter. To avoid the behavior of skipping message , use ReceiveMessage instead.

The above examples create some typed actors (i.e. akka.actor.typed.Behavior), but ReceiveMessage.Partial and ReceiveMessage supports untyped actors as well. To create untyped actor from Dsl.scala-akka-actor, just change the return type from akka.actor.typed.Behavior to akka.actor.Actor.Receive.

These features are separated into two modules in this project:

receivemessage

Scaladoc

receivemessage library contains both ReceiveMessage.Partial and ReceiveMessage, to provide the direct style DSL to receive messages in Akka Actors.

typed

Scaladoc

typed enables the above direct style DSL in typed Akka Actors.

Related projects

!-notation is a general notation in implemented in Dsl.scala, for extracting the value from a keyword. This project is based on Dsl.scala and provides specific ReceiveMessage.Partial and ReceiveMessage keywords, which work in functions that return Behavior, Receive or curried functions that finally returns Behavior / Receive.

This project also supports Scala.js, with the help of Akka.js.

Examples in previous sections are written in ScalaTest and ScalaMock.

There is also a Dsl.scala-akka-http project to provide direct style DSL and exception handling feature to Akka HTTP.

Links