Keep moving forward: Liveness in distributed systems

In distributed computing jargon, properties are classified as either safety or liveness properties [1, 2]. Consistency is a typical safety property: the state of the system is never inconsistent for some definition of consistent. Of course, “never inconsistent” assumes an ideal world in which we developers do everything right in the code, which history has shown is rarely the case. There are other good examples of safety properties, like the agreement property for consensus protocols, which says that no two processes decide differently. Interestingly, in replicated systems, we obtain consistent replicas by guaranteeing that they agree on what is being executed and applied.

But, it isn’t all about safety, is it? We also need to make progress. Being safe by procrastinating is rarely an ideal situation. In systems like ZooKeeper, it is very important to be able to elect a leader. If there is no leader, then the ensemble can’t update its state. In systems like BookKeeper, if ZooKeeper is not available, we can’t create or open a ledger. At this point, let me highlight the difference between safety and liveness. If a ZooKeeper ensemble can’t elect a leader, then it can’t process new updates, but the state is still consistent because either the previous leaders have done the right thing or the state is empty. Consequently, we can have a system that is in a safe state but isn’t live. In fact, the issue of safety and liveness for systems implementing consensus at their core is pretty delicate and fundamental. According to the FLP result [3], we cannot make sure that we have liveness without actually risking a violation of safety in a system that can’t differentiate a crashed server from a slow server. Being perceived as slow can be a side-effect of having some competing load running on the same machine, garbage-collection stalls, network load, etc.

I want to make the argument about liveness a bit more concrete, though. In particular, I want to point out that many parameters in distributed systems could lead to liveness issues inadvertently, and I’m going to use the initLimit parameter of ZooKeeper to illustrate the problem.

What does initLimit do?

The initLimit parameter is there exactly because of liveness. When a new leader is elected, the elected leader (we usually call it the prospective leader) needs to bring the ensemble up to speed before it can start serving new requests. Say that the prospective leader drops dead in the middle of the synchronization with the followers, before it even has a chance to start leading. It makes sense that the followers drop the leader and go elect a new one, and initLimit is supposed to bound the amount of time the servers expect this synchronization phase to take. If it takes longer, then the followers drop the leader and go back to leader election. The leader can also drop followers if it notices that they are falling behind.

But wait, what if the leader is taking long legitimately? What if snapshots are large and it really needs more time than initLimit is giving to synchronize? For example, say that initLimit is 10 and tick time is 2s. The total time the leader has to synchronize is 10 x 2 = 20s. Say the state is 2GB worth of data and that we can pull a snapshot out of the disk at a rate of 50 megabytes per second (this is just for illustration purposes). The leader needs 40s only to pull data out of the disk. In this case, we just need to increase the value of initLimit, right? Increasing the value of initLimit solves the problem temporarily. It is not unlikely that the value of initLimit will be changed again in the future as the amount of data grows. Of course, there are brute force solutions, like:

  • Making the maximum heap size small so that we hit an out of memory error before the initLimit value gives us a false positive;
  • Making the value of initLimit very large, which is likely to work, but it’ll also make initLimit ineffective.
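To make the arithmetic above concrete, here is a minimal sketch in Python. The function names are made up for illustration; only initLimit and the tick time correspond to actual ZooKeeper settings, and the numbers are the illustrative ones from the example.

```python
def sync_budget_seconds(init_limit: int, tick_time_s: float) -> float:
    """Time the followers allow for the synchronization phase."""
    return init_limit * tick_time_s


def snapshot_read_seconds(state_mb: float, disk_rate_mb_s: float) -> float:
    """Time needed just to pull the snapshot off the disk."""
    return state_mb / disk_rate_mb_s


budget = sync_budget_seconds(init_limit=10, tick_time_s=2.0)        # 20 s
needed = snapshot_read_seconds(state_mb=2000, disk_rate_mb_s=50.0)  # 40 s

# The leader is healthy, yet the followers give up on it: a false positive.
leader_dropped = needed > budget
print(budget, needed, leader_dropped)
```

The comparison ignores network transfer and processing time, so the real gap is likely to be wider than the one computed here.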

We could also consider having the leader report progress periodically with pings, but even if a follower is receiving pings periodically, for how long should it keep receiving these reports? What if the leader is late in sending the reports? How long should a follower wait even for the pings? We still need some estimate of time and cannot really get rid of a time parameter like initLimit.

A solution

A solution is to calibrate initLimit dynamically. The general idea is to estimate the number of bytes the leader needs to send to followers in the case it needs to send a snapshot. Recall that a leader typically sends only a diff of transactions, but in some cases it might need to send a whole snapshot of the state if the follower is lagging behind. If we know the rate at which a leader is supposed to process bytes and send them to followers, we can estimate a value for initLimit by dividing the state size by that rate. There are three points that are complicated here, however. First, how do we even determine what such a byte rate is? Network speed is a good estimate, but there is the processing of the leader and background network traffic as well, so it is unlikely that the leader is able to send at line speed. Second, the leader needs to be able to synchronize with a quorum before it starts. The time to coordinate with a quorum must be taken into account somehow. Third, the followers need to know the value of initLimit so that they can apply it locally. Currently, initLimit is set via configuration and servers do not exchange their values. Note that the value at each server can be set differently.

Stepping back for a moment, the goal is to set initLimit to a value that is fair for a prospective leader. It is fair in the sense that it should be enough time for the leader to synchronize with a quorum and to not be perceived as being slow because of a large state. The consequence of not calibrating or setting the value of initLimit accurately is either false positives (the value is too aggressive and a healthy leader is dropped) or false negatives (the value is too conservative and a slow or dead leader is tolerated for too long). The simplest way to do this calibration is to check the size of the latest valid snapshot on disk and divide it by some conservative constant for the byte rate. For example, we can assume that a leader should be able to process bytes at a rate of 100 Mbits/s, which is on the low side for the current technology, but gives us a lower bound on the amount of work a prospective leader should be able to perform without being perceived as slow. The drawbacks of such a solution? There are a couple:

  1. It is hard to tell if the specified byte rate is adequate. Making it too conservative will make a ZooKeeper ensemble wait a longer time to recover from a dead or slow leader. If the byte rate is set too large, then a prospective leader might not be able to synchronize with a quorum. The original initLimit parameter was designed to be conservative, however. A conservative estimate is ok for many cases, since it enables progress in scenarios in which a static initLimit value doesn’t.
  2. When a leader starts, its latest snapshot includes all transactions in its transaction log. Consequently, looking at just the snapshot is pretty accurate. After the leader becomes established, the leader might have proposed/accepted some transactions that are not reflected in the latest snapshot. If a follower tries to synchronize with the leader at this point and the leader estimates initLimit using only the snapshot size, then the estimate is not accurate because it does not include transactions committed after the snapshot was taken. For practical purposes, the difference should not be large unless the recent transactions are mostly large writes.
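The simple calibration described above (snapshot size divided by a conservative byte rate) can be sketched as follows. This is only an illustration under stated assumptions, not the patch itself: the function name, the `floor_ticks` lower bound, and the rounding up to whole ticks are my own choices for the example.

```python
import math


def estimate_init_limit(snapshot_bytes: int,
                        byte_rate_bytes_s: float,
                        tick_time_s: float,
                        floor_ticks: int = 10) -> int:
    """Estimate initLimit (in ticks) from the size of the latest snapshot.

    floor_ticks is a hypothetical lower bound so that a small state still
    gets a sensible budget; it is an assumption of this sketch.
    """
    transfer_s = snapshot_bytes / byte_rate_bytes_s
    ticks = math.ceil(transfer_s / tick_time_s)
    return max(floor_ticks, ticks)


# 100 Mbits/s expressed in bytes per second (12.5 MB/s).
RATE = 100 * 1_000_000 / 8

# A 2 GB snapshot with a 2 s tick: 160 s of transfer, hence 80 ticks.
print(estimate_init_limit(2 * 10**9, RATE, tick_time_s=2.0))
```

Note that the sketch only looks at the snapshot size, so it inherits the second drawback in the list: transactions committed after the snapshot was taken are not counted.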

To improve the estimate, we can consequently do two things: estimate the byte rate instead of setting it manually and improve the state size estimate.

Back to liveness (or absence of)

Although I’ve focused quite a bit on the initLimit parameter, the core goal of this post is to bring some attention to liveness in distributed systems. There is a lot of focus on safety (for obvious reasons), but I’ve seen little discussion about liveness. There is of course the A of CAP, which is about liveness [4], but CAP availability refers to the ability to process requests in the presence of failures and network partitions. Here I refer to scenarios in which the system is not live and yet there is no failure or network partition.

The absence of liveness comes from a parameter being set to a small value compared to what the value should really be. The key observation is that some parameters related to timing might lead to liveness issues. A potentially better solution compared to just setting a fixed value is to calibrate such parameters dynamically, but doing it is not entirely trivial. It requires estimates and some guesswork. For initLimit specifically, there is a Jira issue open for this particular problem (ZOOKEEPER-1977) and we at Microsoft have done some preliminary work to get it adjusted dynamically. We should be posting a patch some time soon.


Thanks to Martin Kleppman and Raúl Gutiérrez Segalés for comments on a draft of this post. I’d like to also acknowledge the contributions of my Microsoft buddies Ian Dimayuga and Greg Foxman on the initLimit work and thank them for discussions and reviews. Keep rocking, guys!


[1] Lamport, L., “Proving the Correctness of Multiprocess Programs”, IEEE Transactions on Software Engineering, Volume SE-3, Issue 2, pp. 125-143, March 1977.

[2] Alpern, B. and Schneider, F., “Recognizing safety and liveness”, Distributed Computing, Volume 2, Issue 3, pp. 117-126, 1987.

[3] Fischer, M., Lynch, N., and Paterson, M., “Impossibility of Distributed Consensus with One Faulty Process”, Journal of the ACM, Volume 32, Issue 2, pp. 374-382, April 1985.

[4] Gilbert, S. and Lynch, N., “Brewer’s conjecture and the feasibility of consistent, available, partition-tolerant web services”, ACM SIGACT News, Volume 33, Issue 2, June 2002.

Dude, where’s my metadata?

This post is about silent data loss in replicated systems (state-machine replication a la ZooKeeper) due to the disk state being wiped out. The disk state is crucial in such systems to guarantee that replicated data isn’t lost in the case a server crashes and recovers. The replication protocol in ZooKeeper assumes that servers can crash and recover, and the ensemble can make progress as long as f + 1 members are up and running, where f is a bound on the number of faulty servers determined by the size of the ensemble. If the ensemble has 5 servers, then the ensemble can make progress as long as 3 are up (f = 2). It additionally assumes that the disk state before a server crash is there during recovery. This post is pointing out that this isn’t always the case, and it is an issue to be aware of. Note that this isn’t unique to ZooKeeper, but I’ll focus on ZooKeeper because I know it well.

The disk state can be wiped out for various reasons, like faulty drives, misconfiguration, or starting a server replica on a different machine, virtual or not. Losing disk state might be silent at times due to software, hardware, or operator errors. This issue has come up occasionally in the community, but to my knowledge hasn’t been documented or discussed extensively.

Here is the problem that losing disk state really induces. Say we have an ensemble of ZooKeeper servers comprising 3 servers: A B C. A leads and B follows; C is dormant. Now say that A and B commit a transaction, say 0x1 – “create znode /foo”. Recall from ZooKeeper 101 that before committing a transaction, a quorum of servers needs to persist it to disk by writing to the transaction log.

It all looks good until, say, A crashes and its disk state is gone. Now A wakes up and it has no memory of committing 0x1, so it thinks it is starting from scratch. A next forms a quorum with C, and C hasn’t heard anything about 0x1. A and C elect A to lead and declare the empty state to be the new state. In the meantime, B is clueless, waiting to talk to other servers to form a quorum. When it does, it learns that the new state doesn’t contain 0x1. Transaction 0x1 is now lost forever…
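The execution is small enough to replay in a few lines of Python. This is a toy model and not ZooKeeper code: the `Server` class is hypothetical, and picking the longest log stands in for the real rule of following the server with the latest epoch/zxid.

```python
class Server:
    def __init__(self, name):
        self.name = name
        self.log = []          # transaction log persisted on disk

    def wipe_disk(self):
        self.log = []          # silent loss of the disk state


def quorum_state(members, ensemble_size):
    """Return the log a quorum adopts: the longest one among its members."""
    assert len(members) > ensemble_size // 2, "not a quorum"
    return max((s.log for s in members), key=len)


a, b, c = Server("A"), Server("B"), Server("C")

# A leads, B follows, C is dormant: A and B persist and commit 0x1.
for s in (a, b):
    s.log.append("0x1 create /foo")

a.wipe_disk()                      # A crashes and comes back memoryless

# A and C form a quorum; neither of them remembers 0x1.
new_state = quorum_state([a, c], ensemble_size=3)
print(new_state)                   # the committed transaction is gone
```

Had the quorum included B, the longest log would have carried 0x1, which is exactly what the intersection argument for majorities relies on.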

This figure illustrates the execution I just described:

Getting around it: Increase replication

Fundamentally, this is a problem with the assumption that servers only crash in the case they are faulty. Losing this state looks more like an arbitrary fault (or a Byzantine fault) and could be handled as such. If instead of using simple majorities as quorums (more than half of the replicas) we use super majorities (more than 2/3 of the replicas), then we can guarantee that we have enough replicas. To see why it works, convince yourself that the smallest intersection of any two simple majorities has one server, while for super majorities we have always f + 1 in any intersection.

Having a single server in the intersection between quorums causes the problem above because the single server can crash, lose its disk state due to a faulty disk, and recover memoryless. Having f+1 servers in the intersection guarantees that as long as no more than f servers get a faulty disk simultaneously, any intersection between two quorums will always contain at least one server that hasn’t lost its disk state, and the ensemble can recover the state it has previously replicated through that server in the intersection.
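If you would rather check the intersection claim than convince yourself of it, a brute-force enumeration does the job for small ensembles. The sketch below assumes ensembles of 2f + 1 servers with quorums of f + 1 for simple majorities, and 3f + 1 servers with quorums of 2f + 1 for super majorities; the function name is made up for the example.

```python
from itertools import combinations


def min_intersection(n: int, quorum_size: int) -> int:
    """Smallest overlap between any two quorums, found by brute force."""
    quorums = [set(q) for q in combinations(range(n), quorum_size)]
    return min(len(q1 & q2) for q1 in quorums for q2 in quorums)


for f in (1, 2):
    simple = min_intersection(2 * f + 1, f + 1)        # simple majorities
    super_ = min_intersection(3 * f + 1, 2 * f + 1)    # super majorities
    print(f, simple, super_)
```

For both values of f, the simple majorities overlap in a single server in the worst case, while the super majorities always share f + 1 servers.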

Using super majorities has one important drawback, which is requiring 3f + 1 servers to tolerate f faulty servers. To tolerate 2 faulty servers, we now need 7 servers rather than 5. Consequently, using 7 servers with super majorities, we can only tolerate 2 servers crashing rather than 3 servers as we have with simple majorities. That’s not great, so can we deal with this problem of silent data loss without having to resort to super-majorities?

Second cut: let the operator decide

An even simpler approach is to not restart servers automatically and perform a sanity check over the disk data before restarting the server. In the case the server does lose its disk state, it could be removed and reintroduced to the ensemble via reconfiguration. The reconfiguration takes care of rebuilding the state onto the server recovering so that it can start making progress with the rest of the ensemble.

Note that for the reconfiguration to work, the ensemble must have enough replicas up to be able to make progress without the faulty server. In the example above, A can’t be reintroduced if B doesn’t start talking to C again. Getting B back might require an operator to intervene. But what if B has also lost disk state? In this case, we have violated our f bound, which is f = 1 for the above example, and the ensemble is stuck: it can’t reconfigure A and B back in.

Third cut: using a token

The assumption that no more than f servers can be faulty and recovering simultaneously is obviously fine, but we have asked the question of whether we could automate even further by enabling progress when we have too many faulty servers, even if some data is lost in some rare cases.

One way is to use a token, a database identifier. Such identifiers have been used extensively in commercial databases like Oracle. Oracle has a create database command that generates a dbid. We would like the token generation to be automated instead of relying on a command being issued, however.

As with Oracle databases, the token represents an instance of the ZooKeeper database, and a server can only join an ensemble in the case it has the same token as everyone else. The servers consequently need to agree on a common token. Servers need to verify the token upon a handshake with the leader, possibly even during leader election, to make sure they refer to the same instance. They should also enable clients to determine if they are talking to the right incarnation.

What makes the token agreement problem interesting is that we need to reach agreement to reach agreement: Zab gives us agreement but we need to agree upon a token to tolerate disk failures. Instead of trying to solve the problem recursively, we simplify the solution by fixing a token distributor and requiring unanimity. A chosen server, not elected dynamically, is responsible for distributing new tokens, and it does it in a 2PC manner, requiring all other servers to acknowledge and commit.

As long as a majority of servers keeps the same token, there is no need to generate a new one. In the case any server loses its disk, it recovers by adopting the token provided by a majority. If no majority holds a token, then the token distributor does its job and generates a new one. Given that it requires unanimity, the token distributor should wait until a majority explicitly declares not to have a token. Otherwise, because of timing, it could end up generating new tokens and instantiating the database unnecessarily.

An invariant of the system is that the token a message of the replication protocol carries always matches the token of the receiver, except for the case in which it doesn’t have a token and it is recovering. Servers not bearing a token shouldn’t participate in the protocol.

Back to the initial example, how does this token scheme solve the problem? When A comes back up and connects to C, it won’t have a token so A and C won’t be able to form a quorum and get going with processing requests. Server A needs to wait for B to say what its token is so that it can make progress. Now that A knows that there is a quorum with the same token, it can adopt the token by pulling a copy of the state of B. Why B? Because B has the latest state (latest epoch/zxid).
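The two decisions involved (when a recovering server may adopt a token, and when the distributor may generate a new one) can be sketched as below. This is a hypothetical illustration of the rules as described in this post, not an existing ZooKeeper API: the function names and the use of `None` for "I explicitly have no token" are assumptions of the sketch.

```python
from collections import Counter
from typing import Optional, Sequence


def recover_token(peer_tokens: Sequence[Optional[str]],
                  ensemble_size: int) -> Optional[str]:
    """Token a recovering server may adopt, or None if it must keep waiting.

    peer_tokens holds the tokens reported by the servers heard from so far;
    None means the peer explicitly declared that it has no token.
    """
    majority = ensemble_size // 2 + 1
    counts = Counter(t for t in peer_tokens if t is not None)
    if counts:
        token, votes = counts.most_common(1)[0]
        if votes >= majority:
            return token
    return None


def should_generate_token(reports: Sequence[Optional[str]],
                          ensemble_size: int) -> bool:
    """The distributor generates a new token only once a majority of
    servers has explicitly declared not to have one."""
    majority = ensemble_size // 2 + 1
    return sum(1 for t in reports if t is None) >= majority


# A has lost its token. Hearing from C alone is not enough...
print(recover_token(["T"], ensemble_size=3))
# ...but once B reports the same token, A can adopt it.
print(recover_token(["T", "T"], ensemble_size=3))
```

The sketch leaves out the state transfer: after adopting the token, A still has to pull a copy of the state from the server with the latest epoch/zxid.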

Let’s be upfront about the weak points of this scheme. First, it isn’t great to have a fixed distributor, but trying to elect one would complicate things and would essentially be a “solving consensus to solve consensus” kind of direction that wouldn’t work. Second, unanimity leaves no room for masking crashes. If any process is unavailable, then the token distribution protocol can’t terminate as described. Both assumptions are OK in the case new tokens aren’t distributed often, though. In fact, it really shouldn’t happen often because a new token represents a new instance of the database, which possibly means lost data. We might be able to copy some data from the previous instance, but there is no guarantee that it will contain everything that has been committed.

The history of the database id (dbid) in ZooKeeper actually dates back to the early days of the project [1]. We haven’t finished incorporating the dbid although the field is present in the persistent state of ZooKeeper. We recently started talking about this feature in the context of virtualization because servers might be restarted in a different host without carrying the disk state, in which case we wanted to avoid the problem mentioned here. Reconfiguration is also a topic I’ll leave for a future post. It affects the unanimity assumption because the set of servers could change dynamically.

Any conclusion?

Reconfiguration already exists in the 3.5 branch of ZooKeeper, so the option of reconfiguring a server in is viable. I also don’t see a strong reason for not automating this process of checking the disk state and reconfiguring if necessary; it sounds doable. The token scheme provides an additional notch of automation, but it will take some effort to implement. The use of super majorities was supposed to be simple with quorum verifiers, but when I tried to implement a verifier for super majorities, I ran into all sorts of problems because of the dependency on simple majorities across the code. It sounds like the initial effort to abstract away the type of quorum system we use wasn’t really continued. I haven’t put much effort into getting this one to work, but it shouldn’t be too hard to fix it. I expect it to require some refactoring.

Acknowledgments: The token distribution scheme mentioned here has been discussed and developed with Johannes Klein and Satish Thatte. Thanks to Camille Fournier, Martin Kleppman, Patrick Hunt, and Bill Bridge for comments on a draft version of this post. It has changed quite a bit due to the comments.


[1] Apache ZooKeeper Jira – Unique DB identifiers for servers and clients

So many ways of replicating…

This post is about the replication scheme we use in Apache BookKeeper. It is mostly about why we did it this way rather than how to use it. There are other sources of information about how to use BookKeeper.

When we started thinking about the design of BookKeeper and the replication scheme, we had some experience with the one we had used for ZooKeeper (its older sibling). ZooKeeper implements a replicated state machine using the Zab protocol, which enables replicas to agree on the changes to the ZooKeeper service. This agreement is not only about the changes themselves, but also about the order, so we need both agreement on the changes to the ZooKeeper state (znodes and sessions to be more concrete) and the order in which they are applied to the distinct replicas.

The protocol in the common case (ignoring leader election and changes of the leader) is fairly simple and looks like this:

A leader proposes a new state change, the followers persist to the transaction log and acknowledge it back. Upon receiving acknowledgements from enough replicas (a quorum out of followers and leader), the leader commits by sending out commit messages. Once a replica gets a commit, it applies the state change (e.g., a new znode is created) and it is ready to serve it (e.g., a getData for that znode).

One interesting observation out of the figure above is that the leader needs to propagate that commit message because each replica needs to learn that the state change has been replicated across a quorum of replicas before it starts serving it. It doesn’t matter what the quorum actually is, just that there are enough replicas out there.  The replicas need to learn about correctly replicated state changes because clients of the service can read the state of ZooKeeper from any server, and all healthy servers need to be ready to respond to read requests.

A couple of details to note, which are not that important for this discussion, but I’ll add here for completeness. Requests to change the state of ZooKeeper can be submitted directly to the leader or to any follower. If a request hits a follower, then the follower forwards it to the leader. The second point is about the commit step. Instead of having followers acknowledge only to the leader and then having a separate commit step, we could have servers send their acknowledgements to everyone else, in which case servers would equally learn that a state change has been correctly replicated. One disadvantage of this option is that it increases the message complexity. If my math is not wrong, then with 5 servers we have 20 messages total for this 2-step approach compared to 12 messages with the 3-step approach we use. The other disadvantage is that everyone needs to talk to everyone else in the 2-step approach, while in the 3-step approach we only need leader <-> follower communication; it simplifies the implementation.
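The message counts can be checked with a short calculation. The sketch assumes that in the 2-step variant the leader’s proposal doubles as its own acknowledgement, so only the followers broadcast acknowledgements; that assumption is mine, and it is the one that reproduces the numbers above.

```python
def three_step_messages(n: int) -> int:
    """Propose, acknowledge, commit: all leader <-> follower traffic."""
    followers = n - 1
    return 3 * followers


def two_step_messages(n: int) -> int:
    """Propose, then every follower acknowledges to every other server."""
    followers = n - 1
    proposals = followers
    acks = followers * (n - 1)   # each follower sends to the n - 1 others
    return proposals + acks


print(three_step_messages(5), two_step_messages(5))
```

The 3-step count grows linearly with the ensemble size while the 2-step count grows quadratically, so the gap widens as servers are added.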

With this replication scheme in mind, let’s now see what we have in BookKeeper. BookKeeper stores and replicates ledgers. Think of ledgers as transaction logs: they can only be appended to, they are in the critical path of apps, and they need to be safely stored. For each ledger, there is just a single writer client and the writer directly replicates the entries (byte arrays) across storage nodes we call bookies. There is no leader like in ZooKeeper, but the writer behaves somewhat as a leader, so we have essentially one leader per ledger.

Because the writer is the only one that needs to learn that an entry has been correctly replicated, we have a similar pattern as the one we had for ZooKeeper, except that we can completely eliminate the commit step:

Even cooler, for the same reason, we don’t have to add entries always to the same set of bookies. We can spread entries across different sets of bookies like this:

which gives us parallelism when adding entries.
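One simple way to spread entries is round-robin over the set of bookies, as in the sketch below. The function and parameter names are hypothetical and the placement rule is an assumption chosen for illustration, not a description of the BookKeeper client code.

```python
def bookies_for_entry(entry_id: int, ensemble: list, write_quorum: int) -> list:
    """Round-robin choice of the bookies that store a given entry."""
    start = entry_id % len(ensemble)
    return [ensemble[(start + i) % len(ensemble)] for i in range(write_quorum)]


ensemble = ["b1", "b2", "b3", "b4", "b5"]
for entry_id in range(4):
    # Consecutive entries land on overlapping but different sets of bookies.
    print(entry_id, bookies_for_entry(entry_id, ensemble, write_quorum=3))
```

With five bookies and three copies per entry, each bookie stores only a fraction of the entries, which is where the parallelism comes from.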

Ok, it’s cool that we can eliminate a step, but now there is a handicap. We write stuff into the bookies, but we might want to retrieve that data eventually, right? Since the bookies don’t know whether a record has been correctly replicated, we need to read it from multiple bookies.

Reading from multiple bookies is necessary, but not sufficient to give us a consistent view into a ledger. Say we read the entries of a ledger while the writer is still finishing a write. In this case, if the writer is half-way through a write of an entry e, one client might end up reading e while another client doesn’t. Consequently, we need a way for reader clients to learn what entries have been correctly replicated in a ledger.

To get reader clients to learn what has been correctly replicated, we need some mechanism that enables readers to quickly determine what those entries are whenever they come. The simplest way to implement this feature was to use ZooKeeper: the writer writes to ZooKeeper the identifier of the last entry that has been correctly replicated. To avoid writing to ZooKeeper a lot, we use a trick: instead of writing to ZooKeeper upon every entry we add to a ledger, we only do it when we close the ledger. Doing it only upon closing a ledger implies that two readers can only really agree on the content of a ledger once the ledger is closed. It makes sense because while the ledger is being written to, the readers can observe the ledger at different times and with different content.
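The close-time trick can be sketched with a toy model in which a plain dictionary stands in for ZooKeeper. The `Ledger` class and `readable_range` function are hypothetical names for this illustration; they are not the BookKeeper API.

```python
class Ledger:
    def __init__(self, ledger_id, metadata_store):
        self.ledger_id = ledger_id
        self.metadata = metadata_store   # stands in for ZooKeeper
        self.last_replicated = -1        # known only to the writer while open

    def entry_acknowledged(self):
        """Called by the writer once enough bookies acknowledged an entry."""
        self.last_replicated += 1
        return self.last_replicated

    def close(self):
        # The single metadata write: record the last replicated entry.
        self.metadata[self.ledger_id] = {"closed": True,
                                         "last_entry": self.last_replicated}


def readable_range(ledger_id, metadata_store):
    """Entries all readers agree on, or None while the ledger is open."""
    meta = metadata_store.get(ledger_id)
    if meta is None or not meta["closed"]:
        return None
    return range(0, meta["last_entry"] + 1)


store = {}
ledger = Ledger(7, store)
for _ in range(3):
    ledger.entry_acknowledged()

print(readable_range(7, store))        # ledger still open: no agreed content
ledger.close()
print(list(readable_range(7, store)))  # entries 0..2 are now agreed upon
```

The writer pays for one metadata write per ledger instead of one per entry, at the cost of readers having no agreed view until the ledger is closed.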

Of course, BookKeeper allows clients to read from a ledger before it is closed, but the way to do it is more of an API discussion, and I won’t cover it here.

One important conclusion of this discussion is that the replication protocol we use for BookKeeper, ignoring the presence of ZooKeeper, isn’t a complete replacement for Zab. Zab provides stronger guarantees and in this particular scenario complements the BookKeeper protocol. The BookKeeper use case, however, is an example of a replication scheme different from the one in ZooKeeper and other consensus-based services that exploits the nature of this reliable log store application to be more efficient. The efficiency comes from a simpler protocol for the common case and parallel access to the data of ledgers.