atry / memcontinuationed   0.3.2

Apache License 2.0 GitHub

Memcached client for Scala

Scala versions: 2.11 2.10

Memcontinuationed

Build Status

Memcontinuationed is an asynchronous memcached client for Scala. Memcontinuationed is the fastest memcached client on JVM, much faster than spymemcached or Whalin's client.

Why is Memcontinuationed so fast?

Reason 1. Better threading model

Memcontinuationed never blocks any threads. On the other hand, spymemcached does not block the IO thread but it does block the user's thread. All Memcontinuationed API are @suspendable, which mean these methods can be invoked by a thread, and return to another thread.

Reason 2. Optimization for huge number of IOPS

Memcontinuationed can merge multiply get or gets requests into one request. On the other hand, spymemcached sends all requests immediately, never waiting for previous response. The spymemcached way consumes more CPU and more TCP overheads than Memcontinuationed. Even worse, the spymemcached way is not compatible with some memcached server.

Note: Because Memcontinuationed queues requests until all the previous response have been received, you may need to create a connection pool of com.dongxiguo.memcontinuationed.Memcontinuationed to maximize the IOPS.

A sample to use Memcontinuationed

import com.dongxiguo.memcontinuationed.Memcontinuationed
import com.dongxiguo.memcontinuationed.StorageAccessor
import java.io._
import java.net._
import java.nio.channels.AsynchronousChannelGroup
import java.util.concurrent.Executors
import scala.util.continuations.reset
import scala.util.control.Exception.Catcher

object Sample {

  def main(args: Array[String]) {
    val threadPool = Executors.newCachedThreadPool()
    val channelGroup = AsynchronousChannelGroup.withThreadPool(threadPool)

    // The locator determines where the memcached server is.
    // You may want to implement ketama hashing here.
    def locator(accessor: StorageAccessor[_]) = {
      new InetSocketAddress("localhost", 1978)
    }

    val memcontinuationed = new Memcontinuationed(channelGroup, locator)

    // The error handler
    implicit def catcher:Catcher[Unit] = {
      case e: Exception =>
        scala.Console.err.print(e)
        sys.exit(-1)
    }

    reset {
      memcontinuationed.set(MyKey("hello"), "Hello, World!")
      val result = memcontinuationed.require(MyKey("hello"))
      assert(result == "Hello, World!")
      println(result)
      sys.exit()
    }
  }
}

/**
 * `MyKey` specifies how to serialize the data of a key/value pair.
 */
case class MyKey(override val key: String) extends StorageAccessor[String] {

  override def encode(output: OutputStream, data: String, flags: Int) {
    output.write(data.getBytes("UTF-8"))
  }

  override def decode(input: InputStream, flags: Int): String = {
    val result = new Array[Byte](input.available)
    input.read(result)
    new String(result, "UTF-8")
  }
}

There is something you need to know:

  • get, set, and most of other methods in Memcontinuationed are @suspendable. You must invoke them in reset or in another @suspendable function you defined.
  • get, set, and most of other methods in Memcontinuationed accept an implicit parameter Catcher. You must use Catcher to handle exceptions from @suspendable functions, instead of try/catch.
  • MyKey is the key you passed to server, which is a custom StorageAccessor. You should implement your own StorageAccessor for each type of data. If your value's format is Protocol Buffers, you can use com.dongxiguo.memcontinuationed.ProtobufAccessor as your custom key's super class.

Build configuration

Add these lines to your build.sbt if you use Sbt:

libraryDependencies += "com.dongxiguo" %% "memcontinuationed" % "0.3.2"
    
libraryDependencies <++= scalaBinaryVersion { bv =>
  bv match {
    case "2.10" => {
      Seq()
    }
    case _ => {
      Seq("org.scala-lang.plugins" % s"scala-continuations-library_$bv" % "1.0.1")
    }
  }
}

libraryDependencies <+= scalaVersion { sv =>
  if (sv.startsWith("2.10.")) {
    compilerPlugin("org.scala-lang.plugins" % "continuations" % sv)
  } else {
    compilerPlugin("org.scala-lang.plugins" % s"scala-continuations-plugin_$sv" % "1.0.1")
  }
}

scalacOptions += "-P:continuations:enable"

Requirement

Memcontinuationed requires Scala 2.10.x or 2.11.x, and JRE 7.

Links