No consensus in exactly-once

Exactly-once semantics is an intriguing, controversial, and for me an exciting topic. I have been dealing with it in the recent past across different systems and one particular issue that got me interested is the connection I have seen a few people make to FLP, the distributed consensus impossibility result [1]. I argue here informally that this connection is not correct, which is not supposed to be a final word on exactly once, but it might shed some light on terminology and properties.

Deriving and understanding impossibility results are as important as deriving distributed algorithms. Impossibility results set the limits of what can and cannot be done. In the case of consensus, the FLP result states that the consensus problem cannot be solved in a purely asynchronous system; an asynchronous system is one with no time bounds on processing and message propagation. Although the proof requires a bit of thinking, the essence is that in an asynchronous system, we cannot really tell whether a process is crashed or just slow to respond. In some situations, under such uncertainty about other processes, we could end up violating agreement among processes if a process takes a step to guarantee termination.

To make any argument about the connection between exactly once and consensus, we need to be able to define both. The definition of consensus using the validity, agreement, and termination properties is well known, but what about exactly once? What is supposed to happen exactly once? Is it delivery or something else? Before I get into definitions, I’d like to first explain the context in which I have seen others refer to exactly once.

In the following sections, we cover:

  • The context in which exactly once is often brought up and some different ways that it can be interpreted, which can overlap, but are not necessarily equivalent.
  • The (dis)connection between exactly-once delivery and consensus. I argue that exactly-once delivery and consensus are not equivalent and conclude that as such the FLP impossibility result does not apply to exactly-once delivery alone.

A few ways of thinking about exactly-once semantics

The discussion in this section is not supposed to be exhaustive, but it illustrates the context in which I have seen exactly-once brought up recently. I’m not covering remote procedure calls (RPCs) explicitly, although it is a valid example where exactly-once guarantees apply as observed in the work of Spector [2] and Birrell [3].


In a number of scenarios, systems claim to satisfy exactly-once semantics when they mean to say that they avoid duplicates. We can think of an arbitrary data structure over which we want to avoid applying the same transformation multiple times. For example, adding the same element multiple times to a multiset, incrementing a counter multiple times, appending the same message to a log multiple times, etc.

To avoid duplicate transformations, we can keep a history of all requests that have been submitted and only apply a transformation if the data structure does not already reflect it. In some cases, we can use sequence numbers to spot duplicates and reject them rather than keep a history of transformations around. For example, if we are implementing a distributed log, then it is desirable not to have a second log around for each log instance.
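
As a minimal sketch of the sequence-number approach, consider a log that tracks one sequence counter per producer; the names here are illustrative, not any particular system’s API:

```python
# Sketch: deduplicating appends to a log using per-producer sequence
# numbers instead of keeping a full history of submitted requests.
class DedupLog:
    def __init__(self):
        self.entries = []
        self.last_seq = {}  # highest sequence number seen per producer

    def append(self, producer_id, seq, payload):
        """Apply the append only if it is not a duplicate retransmission."""
        if self.last_seq.get(producer_id, -1) >= seq:
            return False  # duplicate: reject rather than append twice
        self.entries.append(payload)
        self.last_seq[producer_id] = seq
        return True

log = DedupLog()
log.append("p1", 0, "a")
log.append("p1", 0, "a")   # retransmission of the same request: rejected
log.append("p1", 1, "b")
```

A retransmitted request carries the same sequence number, so it is spotted and rejected without consulting a separate history log.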

With such data structures, the property we want is idempotence, as we want to avoid transformations being executed multiple times. Idempotence makes our data structure at-most-once, but what about the at-least-once part? There is nothing really we can do to make a strong guarantee about it, unless we are willing to block indefinitely to avoid a violation of safety, which reminds me of the FLP proof, in which we violate agreement because a process takes a decision step in a bivalent configuration to satisfy termination. Retransmitting typically gets a transformation executed at least once, but in some hopefully rare scenarios, we need to either give up and move on, or error permanently.

Note that the argument up to this point has nothing to do with how many times the data of the structure is read. For example, if I have a distributed log that is deduplicated, there is nothing that prevents me from reading or delivering the items in the log multiple times. The scheme only makes sure that the log has no duplicates. To avoid multiple deliveries or duplicates downstream, we need something else.


Delivery appears typically in the context of message-passing systems. A process p sends a message, while a process q delivers it. Delivering a message m means that q receives the message and processes it fully with some procedure f(m), which is part of some application.

In the presence of crashes and assuming that the execution of f is stateful, the subsystem that calls f has no way to know for sure whether an invocation of f has completed successfully or not. The procedure f needs to either be idempotent or be able to roll back to guarantee that the result of processing m is not reflected multiple times in the state of f.

One of the issues with guaranteeing that f is executed in an idempotent manner is to make sure that reprocessing a message upon recovery is not reflected twice in the application state. To avoid such a problem, the application can recover from a snapshot of the state and request that the delivery subsystem resumes from the message immediately after the last transformation in the snapshot. This technique is used in the implementation of replicated state machines, where the replicated log is used to replay transformations over the latest snapshot. Outside recovery, duplicates can be identified via sequence numbers or other form of unique identifiers.
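
A minimal sketch of this recovery scheme, assuming a snapshot that records the offset of the last transformation it reflects (the structure is an assumption for illustration):

```python
# Sketch: recover from a snapshot and resume delivery from the message
# immediately after the last one reflected in the snapshot, so that
# reprocessing after a crash is not reflected twice in the state.
def recover_and_replay(snapshot, log, apply_fn):
    state = dict(snapshot["state"])            # restore state from the snapshot
    next_offset = snapshot["last_offset"] + 1  # resume immediately after it
    for offset in range(next_offset, len(log)):
        apply_fn(state, log[offset])           # each message applied exactly once
    return state

def apply_msg(state, msg):
    key, amount = msg
    state[key] = state.get(key, 0) + amount

log = [("x", 1), ("x", 2), ("y", 5)]
snapshot = {"state": {"x": 3}, "last_offset": 1}  # already reflects offsets 0 and 1
state = recover_and_replay(snapshot, log, apply_msg)
```

Because the delivery subsystem resumes strictly after the snapshot’s last offset, the transformations at offsets 0 and 1 are not applied a second time.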

The bottom line is that the messaging subsystem alone, be it a pub-sub, a message bus, or something else, cannot guarantee that delivery of a message happens exactly once or that it is reflected in the state of procedure f in an idempotent manner without application assistance. Application here refers to the implementation of the procedure f, which is outside the scope of the messaging subsystem.


For applications that have complex transformations, idempotence alone does not provide a strong guarantee. For example, if the transformation resulting from processing a message m implies two separate changes, s1 and s2, then it is possible that a crash hits the process right between s1 and s2. Reprocessing m implies that s1 is going to be duplicated, which is undesirable. At the same time, we need to process m again because the s2 change is missing. Consequently, we need the ability to roll back or abort partial changes. We could also make the application of these changes idempotent so that s1 is not reflected twice, but there are at least two reasons to consider transactions in the presence of such complex transformations:

  1. The computation is non-deterministic and there is a temporal dependency between s1 and s2, so we need to recompute s1;
  2. The application does not want to expose partial results, so we either make it all visible or nothing (atomicity).
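
A rough sketch of the transactional alternative: the two changes s1 and s2 are staged on a copy of the state and exposed together, so a failure mid-transformation leaves no partial result behind. This illustrates atomicity, not a crash-proof implementation:

```python
# Sketch: apply the changes of a transformation atomically by staging
# them on a copy. A failure while staging (e.g., an exception) leaves
# `state` untouched, so reprocessing m cannot duplicate s1.
def apply_transaction(state, changes):
    staged = dict(state)        # stage changes on a private copy
    for key, value in changes:
        staged[key] = value     # any failure here aborts with no partial update
    state.clear()
    state.update(staged)        # "commit": expose all changes at once

state = {"balance": 10}
apply_transaction(state, [("balance", 5), ("audit", "debited")])  # s1 and s2 together
```

Either both s1 and s2 become visible, or neither does, which is exactly the all-or-nothing property point 2 above asks for.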

Non-determinism may arise for a number of reasons, e.g., the input of the computation changes over time, the computation depends on some external source that changes over time, etc. There is some illustration and more insight around this problem in this slide deck from Strata London 2016.

This use of transactions applies both to the data structures above and to the procedure f in the discussion around delivery, assuming the application is able to accommodate such transactional processing.

A word on the distributed consensus problem

Let’s switch gears and talk about exactly-once and the distributed consensus problem. To make sure that this text is accessible even to those who do not know consensus intimately, here we briefly discuss what consensus is about and give some references.

The consensus problem consists of a set of processes, each one with an initial value, such that each proposes its own value and they want to decide on a single one of those values. For example, if three processes p1, p2, p3 propose 0, 0, 0, respectively, then the only possible decision value is 0. If instead they propose 0, 0, 1, then both 0 and 1 are possible decision values.

Consensus is at the core of replicated state machines, which is a technique for implementing fault-tolerant services [4]. To implement such a service, we have replicas of a service executing the same requests in the same order. To guarantee that the replicas execute the same requests, in the same order, we implement a sequence of consensus instances. For each position of the sequence, an execution of consensus decides which request goes in, out of all the ones being proposed. This sequence of consensus instances is in rough terms what we know as atomic broadcast.

One known (meta-)protocol for implementing replicated state machines is Paxos [5]. Paxos per se does not give us consensus directly as defined above, but it is relatively straightforward to go from the protocol to implementing consensus with it, once we understand the protocol. There is a learning curve in understanding the protocol, though. Other protocols have been proposed for implementing replicated state machines based on or inspired by Paxos, e.g., Raft [6] and Zab [7].

In some more detail, consensus is defined with three properties:

  • Agreement: If processes p, q ∈ Π are both correct and p decides v, then q also decides v
  • Termination: If process p ∈ Π is correct, then p eventually decides some value v
  • Validity: If a process p decides v, then v is the initial value of some process in Π

where Π is the set of processes. The atomic broadcast protocol is defined as a type of reliable broadcast [8, 9]:

  • RB-Validity. If a correct process broadcasts a message m, then it eventually R-delivers m.
  • RB-Agreement. If a correct process delivers a message m, then all correct processes eventually deliver m.
  • RB-Uniformity. For any message m, every process delivers m at most once, and only if m was previously R-broadcast by sender(m).

that additionally satisfies total order:

  • AB-Total order. If two correct processes p and q deliver two messages m and m’, then p delivers m before m’ if and only if q delivers m before m’.

The literature on consensus and broadcast protocols is vast. The references at the end of this post give some direction and more detail on what is being discussed here.

Exactly once and Consensus: Equivalent?

Let’s now compare consensus and exactly-once delivery. Consensus is known to be equivalent to atomic broadcast [8]. Atomic broadcast guarantees agreement on the set of messages delivered, but also guarantees that all processes deliver messages in the same order. Wait, I said order… Does exactly-once say anything about order? The exactly-once property does not, in principle, imply order. Instead, the property implies that delivery or transformation happens once and no more than once; the order of delivery or execution does not matter. Perhaps order matters for the application, but I am focusing on the exactly-once property, not the overall set of properties the application requires.

Assuming my observation about order is correct, atomic broadcast and exactly-once delivery cannot be equivalent. If exactly-once delivery and atomic broadcast are not equivalent, then exactly-once delivery and consensus cannot be equivalent either.

Exactly-once delivery could be equivalent to reliable broadcast, which has no order guarantee, is a weaker problem than atomic broadcast, and, not surprisingly, is solvable in asynchronous systems. However, because exactly-once delivery implies a property about a send-receive interaction, not necessarily about a broadcast, a more appropriate abstraction is that of a reliable channel, which guarantees that messages are eventually delivered, and not lost or duplicated [10].

A note on FLP: The FLP result argues and shows that in an asynchronous system, we cannot really tell whether a process is crashed or just slow to respond. In some situations, under such uncertainty about other processes, we could end up violating the agreement property if a process takes a step to guarantee termination. In practice, there are situations that can cause a system to block indefinitely, but we never violate agreement. There is progress in the case the system is live, and liveness is defined according to the protocol used.

One interesting observation with respect to FLP is that the proof assumes that messages are delivered eventually, in the absence of a crash, and exactly once. It assumes that messages can be reordered, but delivery happens eventually, assuming again that the receiver process does not crash. Interestingly, such a channel, which would be sufficient to satisfy exactly-once delivery, is not sufficient to enable consensus in asynchronous systems, as the proof shows. Note that the fault model in the FLP proof does not consider recovery, which is critical for practical purposes. However, it does not lose generality, as the scenarios under the crash model can also happen under the crash-recovery model.

Exactly once and consensus: The origin?

One relevant question to ask is where this perception that exactly-once delivery and consensus are equivalent problems comes from. I conjecture that it is due to the ability of replicated state machines to maintain a consistent state despite crashes and to deduplicate using the state of the state machine.

Replicated state machines typically replicate a log. Such a log is used to persist and order the state transitions of the state machine. When restarting a replica, it is sufficient to replay the transitions in the log (we often use snapshots to avoid replaying from scratch). With such a mechanism, even if we restart a replica after a crash, we end up with a consistent state because the state transitions will be the same every time, and the log is replicated in a totally-ordered manner. The log and the state itself can also be used to determine what transitions or operations have been processed to avoid duplicates during the broadcast phase. For example, a transition with a given identifier such that the identifier is already in the log is not processed again.
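
A toy sketch of this replay-and-deduplicate mechanism, using transition identifiers carried in the log; the names and log structure are assumptions for illustration:

```python
# Sketch: a replica restarting after a crash replays the replicated log
# over its latest snapshot, using transition identifiers already seen
# to reject duplicates from the broadcast phase.
def restart_replica(snapshot_state, log):
    state = dict(snapshot_state)
    applied = set()
    for txn_id, key, amount in log:
        if txn_id in applied:
            continue           # identifier already in the log: skip duplicate
        state[key] = state.get(key, 0) + amount
        applied.add(txn_id)
    return state

# Transition 2 was broadcast twice, e.g., retransmitted after a timeout.
log = [(1, "counter", 1), (2, "counter", 1), (2, "counter", 1)]
state = restart_replica({}, log)
```

Every restart yields the same state because the log is totally ordered and duplicates are filtered by identifier, no matter how many times the replica replays.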

What’s the relation to consensus? As mentioned above, consensus is at the core of replicated state machines. Consequently, this mechanism shows that consensus is sufficient, but not that it is necessary; in fact, I argued above that it is not.

Bottom line

Exactly-once intuitively means that something happens once and only once. In the context of some current systems, however, the term implies that something that happens multiple times is effective only once. Multiple applications of a transformation or message delivery only affect the state of a given application or system once.

Two core concepts to achieve this property are idempotence and transactions. With idempotence, multiple executions of the same transformation do not cause the state to diverge from the correct one, as the transformation is by definition reflected only once. With transactions, aborts can roll back partial updates and, as such, they enable an arbitrary number of retries. Idempotence and transactions are not interchangeable, as we briefly pointed out here, but they provide the mechanisms to apply some transformation multiple times without causing state (e.g., of a system, of a data structure, of an application) to diverge from the correct one.

One point often raised in the discussion of exactly-once delivery is its connection to consensus and the FLP impossibility result. I have argued here that exactly-once delivery and consensus are not equivalent, and in fact suggested that exactly-once delivery is a weaker problem than consensus, primarily because it does not require ordered delivery. The treatment of the subject was far from rigorous, but I have cited results in the literature to support the argument. If the argument holds, its relevance is that implementing exactly-once delivery is actually easier than implementing consensus in the absence of other strong requirements such as total order.


Thanks to my colleagues Gregory Chockler, Stephan Ewen, Bhargav Gulavani, and Sean T. Allen for comments that helped me shape the ideas in this post.


[1] Michael J. Fischer, Nancy A. Lynch, and Michael S. Paterson. 1985. Impossibility of distributed consensus with one faulty process. J. ACM 32, 2 (April 1985), 374-382.

[2] Alfred Z. Spector. 1982. Performing remote operations efficiently on a local computer network. Commun. ACM 25, 4 (April 1982), 246-260.

[3] Andrew D. Birrell and Bruce Jay Nelson. 1984. Implementing remote procedure calls. ACM Trans. Comput. Syst. 2, 1 (February 1984), 39-59.

[4] Fred B. Schneider. 1990. Implementing fault-tolerant services using the state machine approach: a tutorial. ACM Comput. Surv. 22, 4 (December 1990), 299-319.

[5] Leslie Lamport. 2001. Paxos made simple. ACM SIGACT News 32, 4 (December 2001), 18-25.

[6] Diego Ongaro and John Ousterhout. 2014. In search of an understandable consensus algorithm. In Proceedings of the 2014 USENIX conference on USENIX Annual Technical Conference (USENIX ATC’14), Garth Gibson and Nickolai Zeldovich (Eds.). USENIX Association, Berkeley, CA, USA, 305-320.

[7] Flavio P. Junqueira, Benjamin C. Reed, and Marco Serafini. 2011. Zab: High-performance broadcast for primary-backup systems. In Proceedings of the 2011 IEEE/IFIP 41st International Conference on Dependable Systems&Networks (DSN ’11). IEEE Computer Society, Washington, DC, USA, 245-256.

[8] Tushar Deepak Chandra and Sam Toueg. 1996. Unreliable failure detectors for reliable distributed systems. J. ACM 43, 2 (March 1996), 225-267.

[9] Romain Boichat and Rachid Guerraoui. 2005. Reliable and total order broadcast in the crash-recovery model. J. Parallel Distrib. Comput. 65, 4 (April 2005), 397-413.

[10] Anindya Basu, Bernadette Charron-Bost, and Sam Toueg. 1996. Solving Problems in the Presence of Process Crashes and Lossy Links. Technical Report. Cornell University, Ithaca, NY, USA.

Note on fencing and distributed locks

This blog post from Martin Kleppmann triggered this note. That blog post discusses an issue with locks in Redis and argues that a solution to avoid the issue of depending on timing is to use a combination of distributed locks with ZooKeeper and fencing. This argument caused some confusion and I wanted to address it here. The following text covers some background on ZooKeeper and distributed locks. It discusses the issue with using ZooKeeper locks alone and how to address it with fencing.

ZooKeeper locks

Apache ZooKeeper is a replicated coordination service. It has a set of clients and a replica set (ensemble) that serves requests from the clients. A simple way to implement a lock with ZooKeeper is to create a znode, say /lock. If the /lock znode exists, then any other client that attempts to create it will fail. We also make this znode ephemeral. Being ephemeral means that if the client that created the znode crashes, then the znode will be automatically deleted. That’s the basics of ephemerals, but it is a bit more subtle than that. Any ZooKeeper client needs to establish a session with the ZooKeeper ensemble before submitting any request. The session has a timeout associated with it: if the ensemble leader doesn’t hear from the client within the timeout period, then it expires the session. Expiring the session causes all ephemeral nodes of the session to be deleted.

Let’s now consider the following sequence of events:

  1. Client C1 creates /lock
  2. Client C1 goes into a long GC pause
  3. The session of C1 expires
  4. Client C2 creates /lock and becomes the new lock holder
  5. Client C1 comes back from the GC pause and it still thinks it holds the lock

At step 5, we have both clients C1 and C2 thinking they hold the lock… Trouble ahead!

Getting shared resources involved

The situation above illustrates that just acquiring a lock via ZooKeeper is not sufficient if you don’t want to trust the session timeout. As the post by Martin Kleppmann very nicely points out, there are many things that can cause your client to stop for arbitrarily long, and it is possible to end up with multiple processes holding the lock. However, this is no longer a problem if acquiring the lock also implies that the shared resource protected by the lock gets involved. Say that every time a client acquires a lock to exclusively access a resource, it goes to the resource and, before anything else, it marks the resource in such a way that clients that acquired the lock previously cannot access the resource. In the scenario above, client C1 thinks that it still holds the lock, but when it tries to access the shared resource, it fails because its mark is earlier than C2’s.

Let’s see a couple of examples of how “marking” a resource works. An obvious candidate is ZooKeeper itself. Say that the lock is used to elect a master and the master needs to update other znodes in ZooKeeper. By step 5 of the scenario, the session of client C1 has expired and it cannot update the ZooKeeper state. The ZooKeeper ensemble fails any operation for an expired session. Note that this argument assumes that a new session is not being created under the hood, otherwise C1 would be able to update the ZooKeeper state using a new session, which violates safety.

A more interesting example involves a different shared resource, say a data store, like in Martin’s post. Before performing any writes, a writer needs to acquire the lock via ZooKeeper and use an epoch number to fence off previous writers. If the epoch number is unique and strictly increasing, then the data store can fail requests from previous writers. The writer can optimistically perform a write to the data store with a new epoch number and the write will succeed only if the epoch number in the request is greater or equal to the one the data store has.
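
A minimal sketch of such a fencing data store, assuming epoch numbers are unique and strictly increasing; the class and method names are illustrative, not any particular store’s API:

```python
# Sketch: a data store that fences off previous writers using an epoch
# number. A write succeeds only if its epoch is greater than or equal
# to the highest epoch the store has seen so far.
class FencedStore:
    def __init__(self):
        self.epoch = 0
        self.data = {}

    def write(self, epoch, key, value):
        if epoch < self.epoch:
            return False       # request from a fenced-off (older) writer
        self.epoch = epoch     # remember the highest epoch seen
        self.data[key] = value
        return True

store = FencedStore()
ok2 = store.write(2, "k", "from C2")  # C2 holds the lock with epoch 2
ok1 = store.write(1, "k", "from C1")  # C1 wakes from its GC pause with epoch 1
```

The write from C1 fails even though C1 still believes it holds the lock, which is exactly the fencing behavior described above.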

Distributed resources

But wait, what if the data store of the previous example is distributed? What kind of guarantees can we have? The simplest way of doing this is to make sure the writer updates the epoch across all nodes before performing any operation. Upon obtaining a new epoch number, a writer sends a request to update the epoch to all nodes in the data store it needs to access. To simplify the discussion, let’s say that it is a small replicated data store and a writer writes to all nodes. If the writer does it, then no writer with an older epoch is able to perform requests successfully.
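
The all-nodes variant can be sketched as follows; the writer installs its new epoch on every node before operating, and the node and writer shapes here are assumptions for illustration:

```python
# Sketch: before performing any operation, a writer installs its epoch
# on every node of a small replicated store; each node then rejects
# requests carrying an older epoch.
class StoreNode:
    def __init__(self):
        self.epoch = 0
        self.data = {}

    def set_epoch(self, epoch):
        self.epoch = max(self.epoch, epoch)  # epochs only move forward

    def write(self, epoch, key, value):
        if epoch < self.epoch:
            return False       # writer with an older epoch: fenced off
        self.data[key] = value
        return True

nodes = [StoreNode(), StoreNode(), StoreNode()]
for n in nodes:                # the new writer updates the epoch on all nodes first
    n.set_epoch(2)
stale = [n.write(1, "k", "old") for n in nodes]  # the old writer (epoch 1) fails everywhere
fresh = [n.write(2, "k", "new") for n in nodes]
```

Because the epoch is installed on all nodes up front, no node accepts a request from a writer with an older epoch.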

The main issue with this approach is that such a static system is not fault tolerant: any crash of a data store node prevents progress, which is bad. What systems like Apache Kafka and Apache BookKeeper have done to make groups fault tolerant is to rely on ZooKeeper to coordinate the reconfiguration of replica groups. If a node crashes or is simply partitioned away, then it is removed from the current replica group and the new group is agreed upon through ZooKeeper. ZooKeeper here serves as a sequencer for the group reconfigurations by totally ordering these changes, and it also detects crashes of nodes with ephemeral znodes. This scheme of having a master implement a Paxos-like protocol to perform reconfigurations of replica groups is the essence of the Vertical Paxos work [1]. The concept of ballot in Vertical Paxos is pretty much the epoch we discuss here. It also relies on process groups, reconfigurations of such groups, and message broadcasts for replication, as in Virtual Synchrony [2].

Note that Kafka uses such distributed locks internally. A writer is an elected broker, not a Kafka client. The leader of a replica group is the designated broker that receives and processes all requests to read and write to a topic-partition. Each leader has an associated subset of in-sync replicas (ISR), and each message produced into Kafka is only declared committed once the ISR, including the leader, acknowledges the message. If there is any change to the ISR, including a leader change, then the new ISR needs to be persisted in ZooKeeper. ZooKeeper guarantees that the changes to the ISR are totally ordered and consecutive ISRs must overlap in at least one broker.

While in Kafka a replica group can have multiple leaders over time and make use of epochs, the single ledger writer in BookKeeper performs the role of leader. The fencing in BookKeeper is much simpler because there can be at most one writer adding new records to a ledger. The only time a new writer is needed for a ledger is when the client writer crashes or is suspected to have crashed.

How to obtain an epoch number?

A simple way to obtain an epoch number to use with the scheme described above is through cversion in ZooKeeper. For example, if the lock znode is /lock, then the cversion of the parent znode / is incremented with every change to its children. Consequently, every time the /lock znode is created, the version is incremented. Incrementing a value and conditionally updating a znode with that value is also a valid option.
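
The conditional-update option amounts to a read-modify-write retry loop over a versioned value. The in-memory counter below stands in for a znode updated with an expected-version check; all names are hypothetical:

```python
# Sketch: obtain a strictly increasing epoch by incrementing a counter
# with a version-conditional update, simulated in memory here. The same
# pattern applies to a znode set with an expected version.
class VersionedCounter:
    def __init__(self):
        self.value, self.version = 0, 0

    def read(self):
        return self.value, self.version

    def conditional_set(self, value, expected_version):
        if expected_version != self.version:
            return False       # someone else updated concurrently: caller retries
        self.value, self.version = value, self.version + 1
        return True

def next_epoch(counter):
    while True:                # classic read-modify-write retry loop
        value, version = counter.read()
        if counter.conditional_set(value + 1, version):
            return value + 1

c = VersionedCounter()
e1, e2 = next_epoch(c), next_epoch(c)
```

Each successful conditional update yields a unique, strictly increasing epoch, even when multiple clients race on the same counter.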


Using locks for mutual exclusion in distributed systems is tricky. When using a lock service, it is often not sufficient to rely on the leases and session schemes these services offer because they depend on timing, leaving shared resources unprotected without any additional mechanism. Here we discussed the use of epochs for fencing resources. The general idea is to keep the shared resource consistent by preventing old writers from coming back and messing with the state. It might not always be possible to introduce such epochs with legacy systems, but we do have examples of systems that make use of this scheme.


Thanks to Martin Kleppmann and Eno Thereska for the feedback.


[1] Leslie Lamport, Dahlia Malkhi, and Lidong Zhou. 2009. Vertical paxos and primary-backup replication. In Proceedings of the 28th ACM symposium on Principles of distributed computing (PODC ’09). ACM, New York, NY, USA, 312-313.

[2] K. Birman and T. Joseph. 1987. Exploiting virtual synchrony in distributed systems. SIGOPS Oper. Syst. Rev. 21, 5 (November 1987), 123-138.


Keep moving forward: Liveness in distributed systems

In distributed computing jargon, properties are classified as either safety or liveness properties [1, 2]. Consistency is a typical safety property: the state of the system is never inconsistent, for some definition of consistent. Of course, “never inconsistent” assumes an ideal world in which we developers do everything right in the code, which history has shown is rarely the case. There are other good examples of safety properties, like the agreement property of consensus protocols, which says that two processes do not decide differently. Interestingly, in replicated systems, we obtain consistent replicas by guaranteeing that they agree on what is being executed and applied.

But, it isn’t all about safety, is it? We also need to make progress. Being safe by procrastinating is rarely an ideal situation. In systems like ZooKeeper, it is very important to be able to elect a leader. If there is no leader, then the ensemble can’t update its state. In systems like BookKeeper, if ZooKeeper is not available, we can’t create or open a ledger. At this point, let me highlight the difference between safety and liveness. If a ZooKeeper ensemble can’t elect a leader, then it can’t process new updates, but the state is still consistent because either the previous leaders have done the right thing or the state is empty. Consequently, we can have a system that is in a safe state but isn’t live. In fact, the issue of safety and liveness for systems implementing consensus at their core is pretty delicate and fundamental. According to the FLP result [3], we cannot make sure that we have liveness without actually risking a violation of safety in a system that can’t differentiate a crashed server from a slow server. Being perceived as slow can be a side-effect of having some competing load running on the same machine, garbage-collection stalls, network load, etc.

I want to make the argument about liveness a bit more concrete, though. In particular, I want to point out that many parameters in distributed systems could lead to liveness issues inadvertently, and I’m going to use the initLimit parameter of ZooKeeper to illustrate the problem.

What does initLimit do?

The initLimit parameter is there exactly because of liveness. When a new leader is elected, the elected leader (commonly called the prospective leader) needs to bring the ensemble up to speed before it can start serving new requests. Say that the prospective leader drops dead in the middle of the synchronization with the followers, before it even has a chance to start leading. It makes sense that the followers drop the leader and go elect a new one, and initLimit is supposed to bound the amount of time the servers expect this synchronization phase to take. If it takes longer, then the followers drop the leader and go back to leader election. The leader can also drop followers if it notices that they are falling behind.

But wait, what if the leader is legitimately taking a long time? What if snapshots are large and it really needs more time than initLimit gives it to synchronize? For example, say that initLimit is 10 and the tick time is 2s. The total time the leader has to synchronize is 10 x 2 = 20s. Say the state is 2GB worth of data and that we can pull a snapshot out of the disk at a rate of 50 megabytes per second (this is just for illustration purposes). The leader needs 40s just to pull the data out of the disk. In this case, we just need to increase the value of initLimit, right? Increasing the value of initLimit solves the problem temporarily. It is not unlikely that the value of initLimit will have to be changed again in the future as the amount of data grows. Of course, there are brute-force solutions, like:
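
The arithmetic from the example above, spelled out:

```python
# The synchronization budget granted by initLimit versus the time the
# leader actually needs just to read the snapshot from disk.
init_limit = 10                 # ticks
tick_time_s = 2                 # seconds per tick
budget_s = init_limit * tick_time_s      # 10 x 2 = 20 seconds

state_mb = 2000                 # 2 GB worth of data, in megabytes
disk_rate_mb_s = 50             # illustrative snapshot read rate, MB/s
read_time_s = state_mb / disk_rate_mb_s  # 40 seconds just to read from disk
```

The 40s of disk reads alone already exceed the 20s budget, so the followers would give up on a perfectly healthy leader.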

  • Making the maximum heap size small so that we hit an out of memory error before the initLimit value gives us a false positive;
  • Making the value of initLimit very large, which is likely to work, but it’ll also make initLimit ineffective.

We could also consider having the leader reporting progress periodically with pings, but even if a follower is receiving pings periodically, for how long should it keep receiving these reports? What if the leader is late on sending the reports? How long should a follower wait even for the pings? We still need some estimate of time and cannot really get rid of a time parameter like initLimit.

A solution

A solution is to calibrate initLimit dynamically. The general idea is to estimate the number of bytes the leader needs to send to followers in the case it needs to send a snapshot. Recall that a leader typically sends only a diff of transactions, but in some cases it might need to send a whole snapshot of the state if the follower is lagging behind. If we know the rate at which a leader is supposed to process bytes and send them to followers, we can estimate a value for initLimit by dividing the state size by that rate. There are three complications here, however. First, how do we even determine what such a byte rate is? Network speed is a good estimate, but there is the processing at the leader and background network traffic as well, so it is unlikely that the leader is able to send at line speed. Second, the leader needs to be able to synchronize with a quorum before it starts. The time to coordinate with a quorum must be taken into account somehow. Third, the followers need to know the value of initLimit so that they can apply it locally. Currently, initLimit is set via configuration and servers do not exchange their values. Note that the value at each server can be set differently.

Stepping back for a moment, the goal is to set initLimit to a value that is fair for a prospective leader. It is fair in the sense that it should be enough time for the leader to synchronize with a quorum and to not be perceived as being slow because of a large state. The consequence of not calibrating or setting the value of initLimit accurately is either false positives (aggressive) or false negatives (conservative). The simplest way to do this calibration is to check the size of the latest valid snapshot on disk and divide it by some conservative constant for the byte rate. For example, we can assume that a leader should be able to process bytes at a rate of 100 Mbits/s, which is on the low side for the current technology, but gives us a lower bound on the amount of work a prospective leader should be able to perform without being perceived as slow. The drawbacks of such a solution? There are a couple:
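As a sketch of that simple calibration (the function and parameter names here are mine, not ZooKeeper's; the 100 Mbit/s figure is the conservative constant from the text):

```python
import math

# Conservative assumed throughput: 100 Mbit/s expressed in bytes/second.
ASSUMED_RATE_BYTES_S = 100 * 1000 * 1000 / 8  # 12.5 MB/s

def calibrated_init_limit(snapshot_bytes, tick_time_ms, floor_ticks=10):
    """Estimate initLimit (in ticks) from the latest snapshot size.

    floor_ticks keeps the statically configured value as a lower bound,
    so calibration can only relax the timeout, never tighten it below
    what the operator configured.
    """
    sync_time_s = snapshot_bytes / ASSUMED_RATE_BYTES_S
    ticks = math.ceil(sync_time_s * 1000 / tick_time_ms)
    return max(ticks, floor_ticks)

# A 2 GB snapshot with 2s ticks: ~160s of transfer time -> 80 ticks.
print(calibrated_init_limit(2 * 10**9, 2000))
```

Keeping the static value as a floor is a design choice: a conservative estimate only ever extends the time a follower is willing to wait, which matches the goal of avoiding false positives.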

  1. It is hard to tell if the specified byte rate is adequate. Making it too conservative will make a ZooKeeper ensemble wait a longer time to recover from a dead or slow leader. If the byte rate is set too large, then a prospective leader might not be able to synchronize with a quorum. The original initLimit parameter was designed to be conservative, however. A conservative estimate is ok for many cases, since it enables progress in scenarios in which a static initLimit value doesn’t.
  2. When a leader starts, its latest snapshot includes all transactions in its transaction log. Consequently, looking at just the snapshot is pretty accurate. After the leader becomes established, the leader might have proposed/accepted some transactions that are not reflected in the latest snapshot. If a follower tries to synchronize with the leader at this point and the leader estimates initLimit using only the snapshot size, then the estimate is not accurate because it does not include transactions committed after the snapshot was taken. For practical purposes, the difference should not be large unless the recent transactions are mostly large writes.

To improve the estimate, we can consequently do two things: estimate the byte rate instead of setting it manually and improve the state size estimate.
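For the first of these, one plausible sketch (entirely mine, not the actual patch) is to smooth observed sync throughput with an exponentially weighted moving average and feed that into the estimate instead of a hand-picked constant:

```python
class ByteRateEstimator:
    """Exponentially weighted moving average of observed sync throughput.

    Hypothetical sketch: each time the leader finishes sending a snapshot
    or diff, it records the bytes sent and the elapsed seconds; the
    smoothed rate then replaces the manually set byte-rate constant.
    """
    def __init__(self, initial_rate_bytes_s, alpha=0.2):
        self.rate = initial_rate_bytes_s  # seed with the conservative constant
        self.alpha = alpha                # weight given to the newest sample

    def observe(self, bytes_sent, elapsed_s):
        sample = bytes_sent / elapsed_s
        self.rate = self.alpha * sample + (1 - self.alpha) * self.rate
        return self.rate

est = ByteRateEstimator(initial_rate_bytes_s=12_500_000)  # 100 Mbit/s seed
est.observe(1_000_000_000, 20)  # one sync observed at 50 MB/s
print(round(est.rate))
```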

Back to liveness (or the absence of it)

Although I’ve focused quite a bit on the initLimit parameter, the core goal of this post is to bring some attention to liveness in distributed systems. There is a lot of focus on safety (for obvious reasons), but I’ve seen little discussion about liveness. There is of course the A of CAP, which is about liveness [4], but CAP availability refers to the ability to process requests in the presence of failures and network partitions. Here I refer to scenarios in which the system is not live and yet there is no failure or network partition.

The absence of liveness here comes from a parameter being set to a value that is too small compared to what it really should be. The key observation is that some parameters related to timing might lead to liveness issues. A potentially better solution compared to just setting a fixed value is to calibrate such parameters dynamically, but doing so is not entirely trivial: it requires estimates and some guesswork. For initLimit specifically, there is a jira open for this particular issue (ZOOKEEPER-1977) and we at Microsoft have done some preliminary work to get it adjusted dynamically. We should be posting a patch some time soon.


Thanks to Martin Kleppmann and Raúl Gutiérrez Segalés for comments on a draft of this post. I’d like to also acknowledge the contributions of my Microsoft buddies Ian Dimayuga and Greg Foxman on the initLimit work and thank them for discussions and reviews. Keep rocking, guys!


[1] Lamport, L., “Proving the Correctness of Multiprocess Programs“, IEEE Transactions on Software Engineering, Volume SE-3, Issue 2, pp. 125-143, March 1977.

[2] Alpern, B. and Schneider, F., “Recognizing safety and liveness“, Distributed Computing, Volume 2, Issue 3, pp. 117-126, 1987.

[3] Fischer, M., Lynch, N., and Paterson, M., “Impossibility of Consensus with One Faulty Process“, Journal of the ACM, Volume 32, Issue 2, pp. 374-382, April 1985.

[4] Gilbert, S. and Lynch, N., “Brewer’s conjecture and the feasibility of consistent, available, partition-tolerant web services“, ACM SIGACT News, Volume 33, Issue 2, June 2002.

Dude, where’s my metadata?

This post is about silent data loss in replicated systems (state-machine replication a la ZooKeeper) due to the disk state being wiped out. The disk state is crucial in such systems to guarantee that replicated data isn’t lost in the case a server crashes and recovers. The replication protocol in ZooKeeper assumes that servers can crash and recover, and the ensemble can make progress as long as f + 1 members are up and running, where f is a bound on the number of faulty servers determined by the size of the ensemble. If the ensemble has 5 servers, then the ensemble can make progress as long as 3 are up (f = 2). It additionally assumes that the disk state present before a server crash is still there during recovery. This post points out that this isn’t always the case, and it is an issue to be aware of. Note that this isn’t unique to ZooKeeper, but I’ll focus on ZooKeeper because I know it well.

The disk state can be wiped out for various reasons: faulty drives, misconfiguration, or starting a server replica on a different machine, virtual or not. Losing disk state might at times be silent due to software, hardware, or operator errors. This issue has come up occasionally in the community, but to my knowledge hasn’t been documented or discussed extensively.

Here is the problem that losing disk state really induces. Say we have a ZooKeeper ensemble comprising 3 servers: A, B, and C. A leads and B follows; C is dormant. Now say that A and B commit a transaction, say 0x1 – “create znode /foo”. Recall from ZooKeeper 101 that before a transaction commits, a quorum of servers needs to persist it to disk by writing it to the transaction log.

It all looks good until say A crashes and its disk state is gone. Now A wakes up and it has no memory of committing 0x1, so it thinks it is starting from scratch. A next forms a quorum with C, and C hasn’t heard anything about 0x1. A and C elect A to lead and declare the empty state to be the new state. In the meanwhile, B is clueless waiting to talk to other servers to form a quorum. When it does, it learns that the new state doesn’t contain 0x1. Transaction 0x1 is now lost forever…

This figure illustrates the execution I just described:


Getting around it: Increase replication

Fundamentally, this is a problem with the assumption that servers only crash in the case they are faulty. Losing disk state looks more like an arbitrary fault (or a Byzantine fault) and could be handled as such. If instead of using simple majorities as quorums (more than half of the replicas) we use super majorities (more than 2/3 of the replicas), then we can guarantee that enough replicas with intact state survive. To see why it works, convince yourself that the smallest intersection of any two simple majorities has a single server, while for super majorities any intersection always has f + 1 servers.

Having a single server in the intersection between quorums causes the problem above: that single server can crash, lose its disk state due to a faulty disk, and recover memoryless. Having f + 1 servers in the intersection guarantees that, as long as no more than f servers get a faulty disk simultaneously, any intersection between two quorums contains at least one server that hasn’t lost its disk state, and the ensemble can recover the state it has previously replicated through that server in the intersection.
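The intersection claim is easy to check by brute force. This sketch enumerates all quorum pairs for a 7-server ensemble (f = 2, i.e., 3f + 1 servers) and reports the smallest intersection for simple and super majorities:

```python
from itertools import combinations

def min_intersection(n, quorum_size):
    """Smallest intersection between any two quorums of the given size."""
    servers = range(n)
    return min(len(set(a) & set(b))
               for a in combinations(servers, quorum_size)
               for b in combinations(servers, quorum_size))

n, f = 7, 2                 # 3f + 1 servers with f = 2
simple = n // 2 + 1         # simple majority: 4 of 7
superm = 2 * n // 3 + 1     # super majority (more than 2/3): 5 of 7

# Simple majorities can intersect in a single server;
# super majorities always intersect in at least f + 1 = 3 servers.
print(min_intersection(n, simple), min_intersection(n, superm))
```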

Using super majorities has one important drawback, which is requiring 3f + 1 servers to tolerate f faulty servers. To tolerate 2 faulty servers, we now need 7 servers rather than 5. Consequently, using 7 servers with super majorities, we can only tolerate 2 servers crashing rather than 3 servers as we have with simple majorities. That’s not great, so can we deal with this problem of silent data loss without having to resort to super-majorities?

Second cut: let the operator decide

A simpler approach is to not restart servers automatically and to perform a sanity check over the disk data before restarting the server. In the case the server does lose its disk state, it could be removed and reintroduced to the ensemble via reconfiguration. The reconfiguration takes care of rebuilding the state onto the recovering server so that it can start making progress with the rest of the ensemble.

Note that for the reconfiguration to work the ensemble must have enough replicas up to be able to make progress without the faulty server. In the example above, A can’t be reintroduced if B doesn’t start talking to C again. Getting B back might require an operator to intervene. But what if B has also lost its disk state? In this case, we have violated our f bound, which is f = 1 for the above example, and the ensemble is stuck: it can’t reconfigure A and B back in.

Third cut: using a token

The assumption that no more than f servers can be faulty and recovering simultaneously is a fine one, but we have asked whether we could automate even further by enabling progress when we have too many faulty servers, even if some data is lost in some rare cases.

One way is to use a token, a database identifier. Such identifiers have been used extensively in commercial databases like Oracle; Oracle has a create database command that generates a dbid. We would like the token generation to be automated instead of relying on a command being issued, however.

As with Oracle databases, the token represents an instance of the ZooKeeper database, and a server can only join an ensemble in the case it has the same token as everyone else. The servers consequently need to agree on a common token. Servers need to verify the token upon a handshake with the leader, possibly even during leader election, to make sure they refer to the same instance. They should also enable clients to determine if they are talking to the right incarnation.

What makes the token agreement problem interesting is that we need to reach agreement to reach agreement: Zab gives us agreement but we need to agree upon a token to tolerate disk failures. Instead of trying to solve the problem recursively, we simplify the solution by fixing a token distributor and requiring unanimity. A chosen server, not elected dynamically, is responsible for distributing new tokens, and it does it in a 2PC manner, requiring all other servers to acknowledge and commit.

As long as a majority of servers keeps the same token, there is no need to generate a new one. In the case any server loses its disk, it recovers by adopting the token provided by a majority. If no majority holds a token, then the token distributor does its job and generates a new one. Given that it requires unanimity, the token distributor should wait until a majority explicitly declares not to have a token. Otherwise, because of timing, it could end up generating new tokens and instantiating the database unnecessarily.
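A sketch of the recovery rule just described, with hypothetical names (this is not ZooKeeper code): a recovering server adopts a token held by a majority, and only when a majority explicitly declares it has no token may the distributor mint a new one.

```python
import uuid

def recover_token(my_token, peer_tokens, n):
    """Token-recovery rule, sketched.

    peer_tokens maps peer -> its token, or None if the peer explicitly
    declares it has no token. Silent peers simply don't appear, so an
    unresponsive majority blocks both adoption and minting.
    """
    majority = n // 2 + 1
    counts = {}
    for t in peer_tokens.values():
        counts[t] = counts.get(t, 0) + 1
    for token, count in counts.items():
        if token is not None and count >= majority:
            return token               # adopt the majority's token
    if counts.get(None, 0) >= majority:
        return uuid.uuid4().hex        # distributor mints a new token
    return my_token                    # otherwise wait, keeping what we have

# A recovers with no token; B and C both hold t1 -> A adopts t1.
print(recover_token(None, {"B": "t1", "C": "t1"}, n=3))
```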

An invariant of the system is that the token carried by a message of the replication protocol always matches the token of the receiver, except for the case in which the receiver has no token and is recovering. Servers not bearing a token shouldn’t participate in the protocol.

Back to the initial example, how does this token scheme solve the problem? When A comes back up and connects to C, it won’t have a token so A and C won’t be able to form a quorum and get going with processing requests. Server A needs to wait for B to say what its token is so that it can make progress. Now that A knows that there is a quorum with the same token, it can adopt the token by pulling a copy of the state of B. Why B? Because B has the latest state (latest epoch/zxid).

Let’s be upfront about the weak points of this scheme. First, it isn’t great to have a fixed distributor, but trying to elect one would complicate things and would essentially be a “solving consensus to solve consensus” kind of direction that wouldn’t work. Second, unanimity leaves no room for masking crashes. If any process is unavailable, then the token distribution protocol can’t terminate as described. Both assumptions are OK in the case new tokens aren’t distributed often, though. In fact, it really shouldn’t happen often because a new token represents a new instance of the database, which possibly means lost data. We might be able to copy some data from the previous instance, but there is no guarantee that it will contain everything that has been committed.

The history of the database id (dbid) in ZooKeeper actually dates back to the early days of the project [1]. We haven’t finished incorporating the dbid although the field is present in the persistent state of ZooKeeper. We recently started talking about this feature in the context of virtualization because servers might be restarted in a different host without carrying the disk state, in which case we wanted to avoid the problem mentioned here. Reconfiguration is also a topic I’ll leave for a future post. It affects the unanimity assumption because the set of servers could change dynamically.

Any conclusion?

Reconfiguration already exists in the 3.5 branch of ZooKeeper, so the option of reconfiguring a server in is viable. I also don’t see a strong reason for not automating this process of checking the disk state and reconfiguring if necessary; it sounds doable. The token scheme provides an additional notch of automation, but it will take some effort to implement. The use of super majorities was supposed to be simple with quorum verifiers, but when I tried to implement a verifier for super majorities, I ran into all sorts of problems because of the dependency on simple majorities across the code. It sounds like the initial effort to abstract away the type of quorum system we use wasn’t really continued. I haven’t put much effort into getting this one to work, but it shouldn’t be too hard to fix; I expect it to require some refactoring.

Acknowledgments: The token distribution scheme mentioned here has been discussed and developed with Johannes Klein and Satish Thatte. Thanks to Camille Fournier, Martin Kleppmann, Patrick Hunt, and Bill Bridge for comments on a draft version of this post. It has changed quite a bit due to the comments.


[1] Apache ZooKeeper Jira – Unique DB identifiers for servers and clients

So many ways of replicating…

This post is about the replication scheme we use in Apache BookKeeper. It is mostly about why we did it this way rather than how to use it. There are other sources of information about how to use BookKeeper.

When we started thinking about the design of BookKeeper and its replication scheme, we had some experience with the one we had used for ZooKeeper (its older sibling). ZooKeeper implements a replicated state machine using the Zab protocol, which enables replicas to agree on the changes to the ZooKeeper service. This agreement is not only about the changes themselves, but also about their order: we need agreement both on the changes to the ZooKeeper state (znodes and sessions, to be more concrete) and on the order in which they are applied to the distinct replicas.

The protocol in the common case (ignoring leader election and changes of the leader) is fairly simple and looks like this:


A leader proposes a new state change, and the followers persist it to the transaction log and acknowledge it back. Upon receiving acknowledgements from enough replicas (a quorum out of followers and leader), the leader commits by sending out commit messages. Once a replica gets a commit, it applies the state change (e.g., a new znode is created) and it is ready to serve it (e.g., a getData for that znode).
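The quorum condition the leader checks before committing can be stated in a couple of lines (a sketch under the usual majority-quorum assumption, counting the leader's own log write toward the quorum):

```python
def leader_commits(n, acks_received):
    """True once a majority quorum (including the leader itself) has
    persisted the proposal to its transaction log."""
    quorum = n // 2 + 1
    return (acks_received + 1) >= quorum  # +1: the leader's own log write

# With 5 servers, 2 follower acks plus the leader form a quorum of 3;
# a single follower ack is not enough.
print(leader_commits(5, 2), leader_commits(5, 1))
```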

One interesting observation out of the figure above is that the leader needs to propagate that commit message because each replica needs to learn that the state change has been replicated across a quorum of replicas before it starts serving it. It doesn’t matter what the quorum actually is, just that there are enough replicas out there. The replicas need to learn about correctly replicated state changes because clients of the service can read the state of ZooKeeper from any server, and all healthy servers need to be ready to respond to read requests.

A couple of details to note, which are not that important for this discussion, but I’ll add here for completeness. Requests to change the state of ZooKeeper can actually be submitted directly to the leader or to any follower; if a request hits a follower, then the follower forwards it to the leader. The second point is about the commit step. Instead of having a commit step in which followers only send back to the leader, we could have servers sending an acknowledgement to everyone else, in which case servers would equally learn that a state change has been correctly replicated. One disadvantage of this option is that it increases the message complexity. If my math is not wrong, then with 5 servers we have 20 messages total for this 2-step approach compared to 12 messages with the 3-step approach we use. The other disadvantage is that everyone needs to talk to everyone else in the 2-step approach, while in the 3-step approach we only need leader <-> follower communication, which simplifies the implementation.
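To double-check that message math: in the 3-step approach the leader exchanges three rounds of n - 1 messages (propose, ack, commit); in the 2-step variant the leader broadcasts the proposal and then every follower acknowledges every other server. A quick sketch (the counting convention is my reading of the text):

```python
def msgs_three_step(n):
    """Leader-centric propose/ack/commit: 3 rounds of (n - 1) messages."""
    return 3 * (n - 1)

def msgs_two_step(n):
    """Leader broadcasts the proposal (n - 1 messages), then each of the
    n - 1 followers acknowledges all n - 1 other servers."""
    return (n - 1) + (n - 1) * (n - 1)

# With 5 servers: 12 messages vs 20 messages.
print(msgs_three_step(5), msgs_two_step(5))
```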

With this replication scheme in mind, let’s now see what we have in BookKeeper. BookKeeper stores and replicates ledgers. Think of ledgers as transaction logs: they can only be appended to, they are in the critical path of apps, and they need to be safely stored. For each ledger, there is just a single writer client and the writer directly replicates the entries (byte arrays) across storage nodes we call bookies. There is no leader like in ZooKeeper, but the writer behaves somewhat as a leader, so we have essentially one leader per ledger.

Because the writer is the only one that needs to learn that an entry has been correctly replicated, we have a similar pattern as the one we had for ZooKeeper, except that we can completely eliminate the commit step:


Even cooler, for the same reason, we don’t have to add entries always to the same set of bookies. We can spread entries across different sets of bookies like this:


which gives us parallelism when adding entries.

Ok, it’s cool that we can eliminate a step, but now there is a handicap. We write stuff into the bookies, but we might want to retrieve that data eventually, right? Since the bookies don’t know whether an entry has been correctly replicated, we need to read each entry from multiple bookies.

Reading from multiple bookies is necessary, but not sufficient to give us a consistent view into a ledger. Say we read the entries of a ledger while the writer is still finishing a write. In this case, if the writer is half-way through a write of an entry e, one client might end up reading e while another client doesn’t. Consequently, we need a way for reader clients to learn what entries have been correctly replicated in a ledger.

To get reader clients to learn what has been correctly replicated, we need some mechanism that enables readers to quickly determine what those entries are whenever they come along. The simplest way to implement this feature was to use ZooKeeper: the writer writes to ZooKeeper the identifier of the last entry that has been correctly replicated. To avoid writing to ZooKeeper a lot, we use a trick: instead of writing to ZooKeeper upon every entry we add to a ledger, we only do it when we close the ledger. Writing only upon closing the ledger implies that two readers can only really agree on the content of a ledger once the ledger is closed. It makes sense because while the ledger is being written to, readers can observe the ledger at different times and with different content.
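The reader-agreement rule can be sketched with a toy metadata store standing in for ZooKeeper (all names here are illustrative, not the BookKeeper API):

```python
class LedgerMetadata:
    """Toy metadata store standing in for ZooKeeper."""
    def __init__(self):
        # ledger id -> id of the last correctly replicated entry at close
        self.closed_at = {}

    def close(self, ledger_id, last_entry_id):
        self.closed_at[ledger_id] = last_entry_id

def readable_entries(meta, ledger_id, entries):
    """Readers of a closed ledger agree on its content: everything up to
    the entry id recorded at close time, regardless of what stragglers
    individual bookies may still hold."""
    last = meta.closed_at.get(ledger_id)
    if last is None:
        return None  # ledger still open: no agreed-upon view yet
    return [e for e in entries if e <= last]

meta = LedgerMetadata()
meta.close(ledger_id=1, last_entry_id=2)
print(readable_entries(meta, 1, [0, 1, 2, 3]))  # entry 3 was still in flight
```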

Of course, BookKeeper allows clients to read from a ledger before it is closed, but the way to do it is more of an API discussion, and I won’t cover it here.

One important conclusion of this discussion is that the replication protocol we use for BookKeeper, ignoring the presence of ZooKeeper, isn’t a complete replacement for Zab. Zab provides stronger guarantees and in this particular scenario complements the BookKeeper protocol. The BookKeeper use case, however, is an example of a replication scheme different from the one in ZooKeeper and other consensus-based services that exploits the nature of this reliable log store application to be more efficient. The efficiency comes from a simpler protocol for the common case and parallel access to the data of ledgers.