finagle / finagle-zookeeper   6.24.0

Apache License 2.0

Native non-blocking client for ZooKeeper with Finagle

Scala versions: 2.11 2.10

Finagle-ZooKeeper

finagle-zookeeper provides basic tools to communicate asynchronously with a ZooKeeper server.

Note: this is a Google Summer of Code 2014 project mentored by Twitter; see the mailing list for more details.

Client
  • The client is based on the Finagle 6 client model.
Request

Every request returns a com.twitter.util.Future (see Effective Scala, the Finagle documentation, and the Scaladoc).

Client creation
  val client = ZooKeeper.newRichClient("127.0.0.1:2181,10.0.0.10:2181,192.168.1.1:2181")
  • 127.0.0.1:2181,10.0.0.10:2181,192.168.1.1:2181 is a String listing the servers as comma-separated host:port pairs
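To make the expected shape of the server list concrete, here is a minimal sketch that splits such a string into (host, port) pairs. ServerList.parse is a hypothetical helper written for illustration; it is not part of finagle-zookeeper, and the library's own parsing may differ.

```scala
object ServerList {
  // Hypothetical helper (not part of finagle-zookeeper): splits a
  // comma-separated "host:port" list into (host, port) pairs.
  def parse(hosts: String): Seq[(String, Int)] =
    hosts.split(",").toSeq.map(_.trim).filter(_.nonEmpty).map { entry =>
      entry.split(":") match {
        case Array(host, port) => (host, port.toInt)
        case _ =>
          throw new IllegalArgumentException("Expected host:port, got: " + entry)
      }
    }
}
```

For example, ServerList.parse("127.0.0.1:2181,10.0.0.10:2181") yields two pairs, one per server.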
Connect the client
val connect = client.connect
connect onSuccess { _ =>
  logger.info("Connected to zookeeper server: " + client.adress)
} onFailure { exc =>
  logger.severe("Connect Error")
}
Disconnect the client
client.disconnect()

Return value: Future[Unit]. This stops all background operations related to client features (preventive search, link checker), cleans up, and disconnects from the server.

Close the service
client.close()

Return value: Future[Unit]. This permanently closes the service and stops background jobs. Call disconnect first so that the session is closed correctly.

First request

Example of requests with sequential composition:

val res = for {
  acl <- client.getACL("/zookeeper")
  _ <- client.create("/zookeeper/test", "HELLO".getBytes, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
  _ <- client.exists("/zookeeper/test", true)
  _ <- client.setData("/zookeeper/test", "CHANGE".getBytes, -1)
} yield (acl)
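The for-comprehension above is sequential composition: each step starts only after the previous Future has completed. The following sketch shows how such a block desugars into flatMap and map. To keep it runnable on its own, it uses scala.concurrent.Future and hypothetical stub methods (getAcl, create, setData) in place of the real client and com.twitter.util.Future, which offers the same flatMap/map combinators.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequentialComposition {
  // Hypothetical stubs standing in for client calls; they complete immediately.
  def getAcl(path: String): Future[String] = Future.successful("acl-of-" + path)
  def create(path: String): Future[String] = Future.successful(path)
  def setData(path: String): Future[Int] = Future.successful(1)

  // for-comprehension form: steps run one after another.
  def viaFor: Future[String] = for {
    acl <- getAcl("/zookeeper")
    _ <- create("/zookeeper/test")
    _ <- setData("/zookeeper/test")
  } yield acl

  // Equivalent desugared form: nested flatMap calls ending in map.
  def viaFlatMap: Future[String] =
    getAcl("/zookeeper").flatMap { acl =>
      create("/zookeeper/test").flatMap { _ =>
        setData("/zookeeper/test").map(_ => acl)
      }
    }
}
```

If any step fails, the remaining steps are skipped and the resulting Future carries that failure.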
create
client.create("/zookeeper/hello", "HELLO".getBytes, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
  • /zookeeper/hello : the node that you want to create
  • "HELLO".getBytes : the data associated with this node
  • Ids.OPEN_ACL_UNSAFE : predefined open ACL (world:anyone)
  • CreateMode.EPHEMERAL : the creation mode

Return value: Future[String], the path of the node that was just created.

Available creation modes:
  • CreateMode.PERSISTENT : persistent mode
  • CreateMode.EPHEMERAL : ephemeral mode
  • CreateMode.PERSISTENT_SEQUENTIAL : persistent and sequential mode
  • CreateMode.EPHEMERAL_SEQUENTIAL : ephemeral and sequential mode
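With the sequential modes, the returned path differs from the requested one, because the ZooKeeper server appends a zero-padded, 10-digit counter to the name. The sketch below only illustrates that naming scheme; sequentialPath is a hypothetical helper, and the actual counter value is chosen by the server, not by the client.

```scala
object SequentialNames {
  // Hypothetical helper: mimics how ZooKeeper names sequential nodes by
  // appending a 10-digit, zero-padded counter to the requested path.
  def sequentialPath(requestedPath: String, counter: Int): String =
    requestedPath + "%010d".format(counter)
}
```

This is why the Future[String] returned by create should be used as the node's path instead of the path passed in.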
delete
client.delete("/zookeeper/test", -1)
  • /zookeeper/test : the node that you want to delete
  • -1 : expected current version of the node (-1 matches any version)

Return value: Future[Unit]
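The version argument acts as an optimistic concurrency check. As a minimal sketch, assuming standard ZooKeeper version semantics, the server-side test can be modeled as follows (VersionCheck and versionMatches are illustrative names, not library API):

```scala
object VersionCheck {
  // -1 is the wildcard that skips the version check.
  val AnyVersion: Int = -1

  // Models the server-side test: the operation is applied only when the
  // expected version is the wildcard or equals the node's current version.
  def versionMatches(expected: Int, actual: Int): Boolean =
    expected == AnyVersion || expected == actual
}
```

Passing the version read from a previous Stat guards against overwriting or deleting a node that another client has modified in the meantime.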

exists
client.exists("/zookeeper/test", false)
  • /zookeeper/test : the node that you want to test
  • false : Boolean flag; pass false if you don't want to set a watch on this node

Return value: Future[ExistsResponse], defined as ExistsResponse(stat: Option[Stat], watch: Option[Watcher]). watch is Some(watcher: Watcher) if you asked to set a watch on the node; otherwise it is None.

getACL
client.getACL("/zookeeper")
  • /zookeeper : the node from which you want to retrieve the ACL

Return value: Future[GetACLResponse], defined as GetACLResponse(acl: Array[ACL], stat: Stat).

setACL
client.setACL("/zookeeper/test", Ids.OPEN_ACL_UNSAFE, -1)
  • /zookeeper/test : the node that you want to update
  • Ids.OPEN_ACL_UNSAFE : predefined open ACL (world:anyone)
  • -1 : expected current version of the node (-1 matches any version)

Return value: Future[Stat]

getChildren
client.getChildren("/zookeeper", false)
  • /zookeeper : the node whose children you want to list
  • false : pass false if you don't want to set a watch on this node

Return value: Future[GetChildrenResponse], defined as GetChildrenResponse(children: Seq[String], watch: Option[Watcher]). watch is Some(watcher: Watcher) if you asked to set a watch on the node; otherwise it is None.

getChildren2
client.getChildren2("/zookeeper", false)
  • /zookeeper : the node whose children you want to list
  • false : pass false if you don't want to set a watch on this node

Return value: Future[GetChildren2Response], defined as GetChildren2Response(children: Seq[String], stat: Stat, watch: Option[Watcher]). watch is Some(watcher: Watcher) if you asked to set a watch on the node; otherwise it is None.

getData
client.getData("/zookeeper/test", false)
  • /zookeeper/test : the node whose data you want to read
  • false : pass false if you don't want to set a watch on this node

Return value: Future[GetDataResponse], defined as GetDataResponse(data: Array[Byte], stat: Stat, watch: Option[Watcher]). watch is Some(watcher: Watcher) if you asked to set a watch on the node; otherwise it is None.

setData
client.setData("/zookeeper/test", "CHANGE".getBytes, -1)
  • /zookeeper/test : the node that you want to update
  • "CHANGE".getBytes : the data that you want to set on this node
  • -1 : expected current version of the node (-1 matches any version)

Return value: Future[Stat].

addAuth
client.addAuth("digest", "pat:pass".getBytes)
  • "digest" : the authentication scheme
  • "pat:pass".getBytes : data associated with the scheme

Return value: Future[Unit].

sync
client.sync("/zookeeper")
  • /zookeeper : the node that you want to sync

Return value: Future[String].

Transaction
val opList = Seq(
  CreateRequest(
    "/zookeeper/hello", "TRANS".getBytes, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL),
  SetDataRequest("/zookeeper/hello", "changing".getBytes, -1),
  DeleteRequest("/zookeeper/hello", -1)
)
val res = client.transaction(opList)

Return value: Future[TransactionResponse]. A TransactionResponse contains a responseList, a sequence of OpResult in the same order as the original sequence of requests.

If one of the requests contained in the TransactionRequest produces an error during transaction processing, all the operations of this transaction are cancelled, and the server returns a TransactionResponse whose sequence of responses consists only of ErrorResponse values.

An ErrorResponse indicates the cause of the error through its exception value. When reading the sequence of responses from the transaction, an exception of OkException means that the corresponding request is not the cause of the failure; go through the list to find the response whose exception is not OkException.
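The lookup described above can be sketched as a scan for the first response that is not OK. The types below (ZkError, Ok, NoNode, ErrorResult) are simplified stand-ins invented for this example so that it runs without the library; the real ErrorResponse and exception classes may be shaped differently.

```scala
object TransactionErrors {
  // Stand-in types for illustration only (not the library's classes).
  sealed trait ZkError
  case object Ok extends ZkError     // plays the role of OkException
  case object NoNode extends ZkError // an example failure cause
  final case class ErrorResult(error: ZkError)

  // Pairs each request label with its response (same order) and returns
  // the index, label, and error of the first entry that is not Ok.
  def firstFailure(
      requests: Seq[String],
      responses: Seq[ErrorResult]): Option[(Int, String, ZkError)] =
    requests.zip(responses).zipWithIndex.collectFirst {
      case ((request, ErrorResult(error)), index) if error != Ok =>
        (index, request, error)
    }
}
```

Because responses keep the order of the requests, the index of the first non-OK entry identifies the operation that made the transaction fail.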

A transaction can be composed of one or more OpRequests:

case class CheckVersionRequest(path: String, version: Int)
case class CreateRequest(
  path: String,
  data: Array[Byte],
  aclList: Seq[ACL],
  createMode: Int
)
case class Create2Request(
  path: String,
  data: Array[Byte],
  aclList: Seq[ACL],
  createMode: Int
)
case class DeleteRequest(path: String, version: Int)
case class SetDataRequest(
  path: String,
  data: Array[Byte],
  version: Int
)

Each OpRequest returns a response of the corresponding type (except delete and checkVersion, which return an EmptyResponse).