Consensus: Raft & Paxos

The Consensus Problem

Distributed consensus is the problem of getting a set of processes to agree on a single value — despite arbitrary process crashes, network partitions, and message delays.

This sounds simple. It is not. The difficulty arises because:

  1. Processes fail independently. A crashed node can’t tell anyone it crashed.
  2. Networks are unreliable. A message that hasn’t arrived might be delayed, or might never arrive.
  3. You can’t distinguish a crashed node from a slow node. If you wait forever for a response, the system halts. If you time out and assume failure, you might be wrong.

Consensus is the foundation for leader election, distributed locking, atomic broadcast, linearizable storage, and replication with failover. Nearly every strongly consistent distributed database runs a consensus algorithm internally, at least for replication or coordination.


FLP Impossibility

In 1985, Fischer, Lynch, and Paterson proved the FLP impossibility result:

No deterministic consensus algorithm can guarantee termination in an asynchronous distributed system if even one process can crash.

An asynchronous system is one where message delays are unbounded — you can’t distinguish a crashed node from a slow one. This is the realistic model for most networks (including LANs under load, WANs, and cloud networks).

What this means in practice: real consensus algorithms work around FLP by assuming partial synchrony (message delays are usually bounded, even if the bound is unknown or sometimes violated) and by using timeouts to detect failures. Safety (never deciding two different values) holds even when the network is fully asynchronous; only liveness (eventually deciding) depends on the network behaving well for long enough. Raft uses randomized election timeouts, and Paxos implementations use timeouts to detect a failed proposer or leader.


Byzantine Fault Tolerance (Brief)

The consensus algorithms discussed here assume crash-stop failures: a failed node stops producing output. They do not handle Byzantine failures: a malicious or buggy node that sends arbitrary, incorrect messages.

Byzantine Fault Tolerant (BFT) algorithms (PBFT, Tendermint, HotStuff) require 3f+1 nodes to tolerate f Byzantine failures, compared to 2f+1 for crash-stop. The overhead is significant. BFT is used in blockchain systems and some aerospace applications. For internal distributed databases (where all nodes are under the same administrative domain), crash-stop is the correct model.
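The 2f+1 versus 3f+1 figures fall out of quorum intersection. The sketch below assumes simple-majority quorums (f+1 of 2f+1) for crash-stop and 2f+1-of-3f+1 quorums for PBFT-style BFT protocols; the function names are illustrative, not from any library.

```typescript
type FaultModel = "crash" | "byzantine";

// Minimum cluster size that tolerates f faulty nodes.
function clusterSize(f: number, model: FaultModel): number {
  return model === "crash" ? 2 * f + 1 : 3 * f + 1;
}

// Quorum size: a simple majority (f + 1 of 2f + 1) for crash-stop,
// 2f + 1 of 3f + 1 for PBFT-style Byzantine protocols.
function quorumSize(f: number, model: FaultModel): number {
  return model === "crash" ? f + 1 : 2 * f + 1;
}

// Fewest nodes that any two quorums of size q out of n must share.
function minOverlap(n: number, q: number): number {
  return Math.max(0, 2 * q - n);
}

const models: FaultModel[] = ["crash", "byzantine"];
for (const model of models) {
  const n = clusterSize(1, model);
  const q = quorumSize(1, model);
  console.log(`${model}: n=${n}, quorum=${q}, overlap>=${minOverlap(n, q)}`);
}
// crash: n=3, quorum=2, overlap>=1
// byzantine: n=4, quorum=3, overlap>=2
```

A crash-stop overlap of one node is enough because a crashed node never lies. In the Byzantine case any two quorums share at least f+1 nodes, so at least one node in every overlap is correct.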


The Two Generals Problem

A simpler precursor to consensus is the Two Generals Problem (1978), which shows that two processes communicating over an unreliable channel cannot achieve guaranteed agreement. General A wants to coordinate an attack time with General B. Every messenger might be captured (message lost). No finite number of acknowledgment exchanges guarantees both generals will agree.

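This shows that guaranteed agreement over a lossy channel is impossible without additional assumptions. Paxos and Raft therefore assume that retransmitted messages eventually get through, and they decide with a quorum (majority) rather than unanimous agreement: any two majorities share at least one node, and waiting for a majority instead of everyone means a minority of failed or unreachable nodes cannot block progress.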


Paxos: The Original Algorithm

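Paxos (Lamport, 1989, published 1998) is the classic consensus algorithm for crash-stop failures. It is notoriously difficult to implement correctly. The original paper, "The Part-Time Parliament", presented it as an allegory about a fictional Greek parliament that many readers found hard to follow, and Lamport later restated it plainly in "Paxos Made Simple" (2001). Google's Chubby, Amazon's DynamoDB (per-partition replication groups), and many other systems have used Paxos variants.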

Roles

  • Proposers: Propose values. Usually clients or designated coordinator nodes.
  • Acceptors: Vote on proposals. A value is chosen once a majority (quorum) of acceptors accept it.
  • Learners: Learn the decided value (often the same nodes as the acceptors).

Phase 1: Prepare / Promise

The proposer sends Prepare(n) to all acceptors, where n is a proposal number that is unique across proposers and increases with each attempt (commonly a (round, proposerId) pair compared in order).

Each acceptor:

  • If n is greater than any proposal number it has already promised, it promises not to accept any proposal numbered lower than n, and returns the highest-numbered proposal it has already accepted (if any).
  • Otherwise, it ignores or rejects.

If the proposer receives promises from a majority of acceptors, it proceeds to Phase 2.

Phase 2: Accept / Accepted

The proposer sends Accept(n, v) to all acceptors, where v is either:

  • Its own proposed value (if no acceptor returned a previously accepted value), or
  • The value from the highest-numbered previously accepted proposal returned in Phase 1.

Each acceptor:

  • If it has not already promised a proposal numbered higher than n, it accepts (n, v) and sends Accepted(n, v) to the proposer and the learners.

If a majority of acceptors accept, consensus is achieved on v.

Proposer P1, acceptors A1, A2, A3:
  P1 → A1, A2, A3   Prepare(n=5)
  A1 → P1           Promise(n=5, prev=null)
  A2 → P1           Promise(n=5, prev=null)
                    (2 of 3 promises = majority; P1 need not wait for A3)
  P1 → A1, A2, A3   Accept(n=5, v="commit")
  A1 → P1           Accepted(n=5, v="commit")
  A2 → P1           Accepted(n=5, v="commit")
                    (2 of 3 accepts = majority; "commit" is chosen)
  P1 → learners     Learn("commit")
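The acceptor rules and the proposer's choice of value fit in a few lines. This is a minimal single-decree sketch under simplifying assumptions: method calls stand in for messages, nothing is persisted (a real acceptor must durably record `promised` and `accepted` before replying), and the class and function names are hypothetical.

```typescript
type Proposal = { n: number; value: string };

class Acceptor {
  promised = -1;                    // highest proposal number promised so far
  accepted: Proposal | null = null; // highest-numbered proposal accepted so far

  // Phase 1: promise to ignore proposals numbered below n; report any accepted proposal.
  prepare(n: number): { ok: boolean; accepted: Proposal | null } {
    if (n > this.promised) {
      this.promised = n;
      return { ok: true, accepted: this.accepted };
    }
    return { ok: false, accepted: null };
  }

  // Phase 2: accept unless a higher-numbered promise has been made.
  accept(p: Proposal): boolean {
    if (p.n < this.promised) return false;
    this.promised = p.n;
    this.accepted = p;
    return true;
  }
}

// Phase 2 value: the highest-numbered value reported in the promises,
// or the proposer's own value if no acceptor has accepted anything.
function chooseValue(reported: (Proposal | null)[], own: string): string {
  let best: Proposal | null = null;
  for (const p of reported) if (p !== null && (best === null || p.n > best.n)) best = p;
  return best !== null ? best.value : own;
}

// One round of both phases; returns the chosen value, or null if this round failed.
function propose(acceptors: Acceptor[], n: number, own: string): string | null {
  const majority = Math.floor(acceptors.length / 2) + 1;
  const promises = acceptors.map(a => a.prepare(n)).filter(r => r.ok);
  if (promises.length < majority) return null; // retry later with a higher n
  const value = chooseValue(promises.map(r => r.accepted), own);
  const accepts = acceptors.filter(a => a.accept({ n, value })).length;
  return accepts >= majority ? value : null;
}

const acceptors = [new Acceptor(), new Acceptor(), new Acceptor()];
console.log(propose(acceptors, 5, "commit")); // commit
console.log(propose(acceptors, 7, "abort"));  // commit (the already-chosen value is adopted)
console.log(propose(acceptors, 6, "abort"));  // null (acceptors have promised n=7)
```

The second call shows the property that makes Paxos safe: once "commit" is chosen, a later proposer with a higher number learns it in Phase 1 and must re-propose it instead of its own value.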

Why Paxos is hard: Multi-Paxos (extending single-value Paxos to a log of values) requires careful handling of leader election, log gaps, and concurrent proposers. "Paxos Made Simple" covers these only in a brief sketch, so production implementations fill in the details differently, diverge significantly, and are not interoperable.


Raft: Designed for Understandability

Raft (Ongaro and Ousterhout, 2014) was explicitly designed to be easier to understand than Paxos while providing equivalent guarantees. It decomposes consensus into three sub-problems: leader election, log replication, and safety.

Core Invariant

Raft guarantees State Machine Safety: if any server has applied the log entry at index i to its state machine, no other server will ever apply a different entry at index i.

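This rests on two rules: the election restriction (a candidate can only win if its log is at least as up-to-date as those of a majority of servers) and the commit rule (a leader only marks entries committed by counting replicas of entries from its current term).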

Leader Election

Raft operates in terms (monotonically increasing integers). Each term begins with an election.

  1. All servers start as followers.
  2. Each follower has a randomized election timeout (the Raft paper uses 150ms to 300ms). If the timeout elapses without a heartbeat from the current leader or a vote granted to a candidate, the follower becomes a candidate.
  3. A candidate increments its term, votes for itself, and sends RequestVote(term, candidateId, lastLogIndex, lastLogTerm) to all other servers.
  4. A server grants a vote if:
    • It hasn’t voted in this term yet, and
    • The candidate’s log is at least as up-to-date as the voter’s (compare lastLogTerm, then lastLogIndex).
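  5. If the candidate receives votes from a majority, it becomes leader for this term. If it instead receives AppendEntries from a leader whose term is at least its own, it returns to follower.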

Randomized timeouts make simultaneous candidates rare. If a split vote occurs, all candidates time out and try again with a higher term.

Term 1: Leader = Node A
  Node A crashes at t=200ms
  Node B election timeout fires at t=210ms
  
Term 2 election:
  B sends RequestVote(term=2, lastLogIndex=15, lastLogTerm=1) to C, D, E
  C: "I haven't voted in term 2, B's log is current" → Vote
  D: "I haven't voted in term 2, B's log is current" → Vote
  B has 3 votes (including itself) = majority of 5
  
Term 2: Leader = Node B
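The vote-granting rule in step 4 is small enough to write as a pure function. A sketch, assuming a hypothetical `VoterState` record and omitting persistence and the role change to follower. Like the rule in the Raft paper, it also re-grants a vote to the same candidate, so a retried RequestVote gets a consistent answer.

```typescript
type VoterState = {
  currentTerm: number;
  votedFor: string | null;
  lastLogIndex: number;
  lastLogTerm: number;
};

type RequestVoteArgs = { term: number; candidateId: string; lastLogIndex: number; lastLogTerm: number };

// "At least as up-to-date": a higher last term wins; equal last terms compare log length.
function candidateLogIsCurrent(req: RequestVoteArgs, s: VoterState): boolean {
  if (req.lastLogTerm !== s.lastLogTerm) return req.lastLogTerm > s.lastLogTerm;
  return req.lastLogIndex >= s.lastLogIndex;
}

// Decides a RequestVote and updates s (a real node persists s before replying).
function handleRequestVote(s: VoterState, req: RequestVoteArgs): { term: number; voteGranted: boolean } {
  if (req.term < s.currentTerm) return { term: s.currentTerm, voteGranted: false }; // stale candidate
  if (req.term > s.currentTerm) {
    s.currentTerm = req.term; // adopt the newer term (a real node also becomes a follower)
    s.votedFor = null;        // and has not voted in it yet
  }
  const canVote = s.votedFor === null || s.votedFor === req.candidateId;
  const granted = canVote && candidateLogIsCurrent(req, s);
  if (granted) s.votedFor = req.candidateId;
  return { term: s.currentTerm, voteGranted: granted };
}

// Node C from the example above (assumed to have voted for A in term 1).
const c: VoterState = { currentTerm: 1, votedFor: "A", lastLogIndex: 15, lastLogTerm: 1 };
console.log(handleRequestVote(c, { term: 2, candidateId: "B", lastLogIndex: 15, lastLogTerm: 1 }).voteGranted); // true
console.log(handleRequestVote(c, { term: 2, candidateId: "E", lastLogIndex: 16, lastLogTerm: 1 }).voteGranted); // false: already voted for B in term 2
```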

Log Replication

Once elected, the leader handles all client writes:

  1. Append the new entry to its own log (entry = {term, index, command}).
  2. Send AppendEntries(term, leaderId, prevLogIndex, prevLogTerm, entries[], leaderCommit) to all followers in parallel.
  3. When a majority of servers (including itself) have durably stored the entry, the leader marks it committed and advances commitIndex. A leader only counts replicas for entries from its current term; earlier entries are committed along with them.
  4. The leader applies committed entries to its state machine and returns the result to the client.
  5. The leaderCommit field in subsequent AppendEntries tells followers to advance their own commitIndex.

Client → Leader: "SET x=5"
  Leader appends {term=2, index=16, cmd="SET x=5"} to log
  Leader sends AppendEntries to Followers 1,2,3,4
  Followers 1,2,3 respond OK (majority achieved with leader = 4/5)
  Leader commits, applies to state machine, responds to client
  Leader's next AppendEntries/heartbeat tells followers to commit index 16
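Step 3 carries a subtlety from the Raft paper: a leader only counts replicas for entries from its own current term, and earlier-term entries become committed along with them. A sketch of the commit-index calculation under that rule, assuming `matchIndex` is a plain array with one slot per follower (the function name is hypothetical):

```typescript
type Entry = { term: number; index: number };

// Largest N > commitIndex whose entry is from the leader's current term and is
// stored on a majority (leader included); otherwise commitIndex is unchanged.
// Earlier-term entries below N become committed implicitly.
function advanceCommitIndex(
  log: Entry[],          // leader's log; log[i] has index i + 1
  matchIndex: number[],  // highest index known replicated on each follower
  commitIndex: number,
  currentTerm: number
): number {
  const majority = Math.floor((matchIndex.length + 1) / 2) + 1;
  for (let n = log.length; n > commitIndex; n--) {
    if (log[n - 1].term !== currentTerm) break; // terms never increase going backward
    const replicas = 1 + matchIndex.filter(m => m >= n).length; // 1 = the leader
    if (replicas >= majority) return n;
  }
  return commitIndex;
}

// The "SET x=5" example, assuming entries 1-15 are from term 1 and entry 16 from term 2.
const exampleLog: Entry[] = Array.from({ length: 16 }, (_, i) => ({ index: i + 1, term: i < 15 ? 1 : 2 }));
console.log(advanceCommitIndex(exampleLog, [16, 16, 16, 15], 15, 2)); // 16
console.log(advanceCommitIndex(exampleLog, [16, 16, 16, 16], 15, 3)); // 15: a term-3 leader may not count replicas of a term-2 entry
```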

Consistency Check: Log Matching Property

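AppendEntries includes prevLogIndex and prevLogTerm. A follower rejects the request unless its log contains an entry at prevLogIndex whose term equals prevLogTerm. On rejection, the leader decrements nextIndex for that follower and retries with an earlier entry. When the check passes, the follower deletes any of its entries that conflict with the new ones (same index, different term) and appends the rest. Together these ensure that once a follower accepts an AppendEntries, its log matches the leader's log up to the last new entry.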
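The follower side of this check, including deleting a conflicting suffix, can be sketched as follows. It assumes 1-based log indices stored in an array and omits persistence and role changes; the type and function names are hypothetical.

```typescript
type LogEntry = { term: number; index: number; command: string };

type AppendEntriesArgs = {
  term: number;
  prevLogIndex: number;
  prevLogTerm: number;
  entries: LogEntry[];
  leaderCommit: number;
};

// Follower state; log[i] holds the entry with index i + 1.
type FollowerState = { currentTerm: number; log: LogEntry[]; commitIndex: number };

function handleAppendEntries(f: FollowerState, a: AppendEntriesArgs): { term: number; success: boolean } {
  if (a.term < f.currentTerm) return { term: f.currentTerm, success: false }; // stale leader
  f.currentTerm = a.term;

  // Consistency check: an entry must exist at prevLogIndex with term prevLogTerm.
  if (a.prevLogIndex > 0 && f.log[a.prevLogIndex - 1]?.term !== a.prevLogTerm) {
    return { term: f.currentTerm, success: false }; // leader decrements nextIndex and retries
  }

  for (const e of a.entries) {
    const existing = f.log[e.index - 1];
    if (existing && existing.term !== e.term) f.log.length = e.index - 1; // conflict: drop it and all after it
    if (!f.log[e.index - 1]) f.log.push(e); // append only what is missing
  }

  // Advance commitIndex, but never past the last entry this request covered.
  const lastNew = a.prevLogIndex + a.entries.length;
  if (a.leaderCommit > f.commitIndex) f.commitIndex = Math.max(f.commitIndex, Math.min(a.leaderCommit, lastNew));
  return { term: f.currentTerm, success: true };
}

// A follower holding a stale term-2 entry at index 3 receives the term-3 leader's entry.
const follower: FollowerState = {
  currentTerm: 2,
  commitIndex: 1,
  log: [
    { term: 1, index: 1, command: "a" },
    { term: 1, index: 2, command: "b" },
    { term: 2, index: 3, command: "stale" },
  ],
};
const res = handleAppendEntries(follower, {
  term: 3, prevLogIndex: 2, prevLogTerm: 1,
  entries: [{ term: 3, index: 3, command: "c" }], leaderCommit: 3,
});
console.log(res.success);                             // true
console.log(follower.log.map(e => e.term).join(",")); // 1,1,3
console.log(follower.commitIndex);                     // 3
```

Entries that already match are left in place, so a duplicated or reordered AppendEntries cannot truncate entries the follower legitimately holds.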

Safety: Election Restriction

A candidate cannot be elected unless its log is at least as up-to-date as the log of each server that votes for it, and it needs votes from a majority. "At least as up-to-date" is defined: compare the last log term (higher wins), then the last log index (longer wins). This guarantees that a newly elected leader has all committed entries: each committed entry was replicated to a majority, the leader's voting majority must overlap that majority, and the overlapping voter only votes for a candidate whose log is at least as up-to-date as its own.


Raft Membership Changes

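Changing cluster membership (adding or removing nodes) is dangerous: servers cannot all switch configurations at the same instant, so a naive switch can briefly allow two disjoint majorities, one under the old configuration and one under the new, each able to elect its own leader.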

Joint consensus: Raft's original approach uses two phases. First, a joint configuration C_old,new is committed; while it is in effect, elections and commits require separate majorities of both the old and the new membership. Then C_new is committed, after which the old configuration no longer matters.

Single-server changes: A simpler approach (proposed by Ongaro) adds or removes one server at a time. A single-server change cannot create two independent majorities. etcd uses this approach.
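Why single-server changes are safe comes down to quorum arithmetic. The sketch below covers only additions (the old membership is a subset of the new; removals are symmetric with the sizes swapped) and uses hypothetical helper names.

```typescript
// Size of a simple majority of n servers.
function majority(n: number): number {
  return Math.floor(n / 2) + 1;
}

// Adding servers only (old membership ⊆ new membership): every old-config majority
// intersects every new-config majority iff the two sizes sum to more than the new
// cluster size (pigeonhole); otherwise a disjoint pair can be picked.
function majoritiesMustOverlap(oldSize: number, newSize: number): boolean {
  return majority(oldSize) + majority(newSize) > newSize;
}

const changes: [number, number][] = [[3, 4], [4, 5], [3, 5]];
for (const [from, to] of changes) {
  const verdict = majoritiesMustOverlap(from, to) ? "safe" : "two disjoint majorities possible";
  console.log(`${from} → ${to}: ${verdict}`);
}
// 3 → 4: safe
// 4 → 5: safe
// 3 → 5: two disjoint majorities possible
```

Going from 3 to 5 servers in one step fails the check: the old majority {1, 2} and the new majority {3, 4, 5} share no server.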


Performance: RTT Analysis

A client write in Raft requires:

  1. Client → Leader: send command (~0.5 RTT)
  2. Leader → Followers: AppendEntries (~0.5 RTT)
  3. Followers → Leader: acknowledgments (~0.5 RTT; steps 2 and 3 together are 1 RTT)
  4. Leader: commit, apply, and respond to client (~0.5 RTT)

Total: ~2 RTTs from client perspective (client-to-leader + leader-to-quorum round-trip).

In a same-datacenter deployment: RTT ≈ 0.5ms → 2 RTTs = ~1ms. Cross-datacenter (US East to US West): RTT ≈ 70ms → 2 RTTs = ~140ms.
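The same arithmetic extends to followers at different distances, because the leader only waits for the fastest majority. A rough estimator, assuming one fixed round-trip time per follower and ignoring fsync and CPU time (the function name is hypothetical):

```typescript
// Rough client-observed Raft write latency, ignoring fsync and CPU time:
// one client↔leader round trip, plus the leader's round trip to the slowest
// follower it needs to complete a majority (the leader is one vote itself).
function estimateWriteLatencyMs(clientRttMs: number, followerRttsMs: number[]): number {
  const clusterSize = followerRttsMs.length + 1;
  const followerAcksNeeded = Math.floor(clusterSize / 2); // majority minus the leader
  const sorted = [...followerRttsMs].sort((a, b) => a - b);
  const quorumRttMs = followerAcksNeeded === 0 ? 0 : sorted[followerAcksNeeded - 1];
  return clientRttMs + quorumRttMs;
}

console.log(estimateWriteLatencyMs(0.5, [0.5, 0.5, 0.5, 0.5])); // 1
console.log(estimateWriteLatencyMs(70, [70, 70, 70, 70]));      // 140
console.log(estimateWriteLatencyMs(0.5, [0.5, 0.5, 70, 70]));   // 1 (two nearby followers complete the majority)
```

The last case shows that commit latency is set by the slowest member of the fastest majority, not by the farthest replica.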

Leader lease optimization: The leader can serve linearizable reads from local state, without a log round-trip, if it can prove it hasn't been superseded. It records when it sent a heartbeat round that a majority acknowledged. Because those followers will not start an election until an election timeout has passed since hearing from it, no other leader can exist until that long after the send. Within that lease period (minus a margin for clock drift), it can serve reads locally. This relies on bounded clock drift in exchange for a significant read latency reduction.
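A sketch of the lease check, under stated assumptions: timestamps come from a monotonic clock, `maxClockDrift` is an assumed bound on relative clock-rate error, the timeout passed in is the smallest election timeout any follower can pick, and the leader has already committed an entry in its current term. The class name is hypothetical.

```typescript
// Leader-lease check (sketch). Followers that acknowledged a heartbeat will not
// start an election until an election timeout has passed, so measuring the
// lease from when that heartbeat was sent (not when acks arrived) is conservative.
class LeaseTracker {
  private leaseStartMs: number | null = null;

  constructor(
    private readonly minElectionTimeoutMs: number, // smallest timeout any follower can pick
    private readonly maxClockDrift: number         // assumed bound, e.g. 0.01 for 1%
  ) {}

  // Call when a majority has acknowledged the heartbeat round sent at sentAtMs.
  onMajorityAck(sentAtMs: number): void {
    if (this.leaseStartMs === null || sentAtMs > this.leaseStartMs) this.leaseStartMs = sentAtMs;
  }

  // True if a linearizable read may be served from local state at nowMs.
  canServeLocalRead(nowMs: number): boolean {
    if (this.leaseStartMs === null) return false;
    const leaseMs = this.minElectionTimeoutMs * (1 - this.maxClockDrift); // shrink for drift
    return nowMs < this.leaseStartMs + leaseMs;
  }
}

const lease = new LeaseTracker(150, 0.01);
lease.onMajorityAck(1000);
console.log(lease.canServeLocalRead(1100)); // true
console.log(lease.canServeLocalRead(1200)); // false: lease expired, so confirm leadership first
```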


Multi-Paxos vs Raft

Raft and Multi-Paxos are conceptually equivalent. Both:

  • Elect a distinguished leader that serializes all decisions.
  • Replicate decisions to a majority before committing.
  • Use term/epoch numbers to distinguish stale leaders.
  • Handle log gaps and follower catch-up.

The differences are engineering choices, not fundamental:

Property               Multi-Paxos                               Raft
Log gap handling       Acceptors can accept out of order         No gaps allowed; strictly sequential
Leader completeness    New leader must re-confirm old entries    Guaranteed by election restriction
Specification style    Informal; many variants                   Formal; single canonical specification
Understandability      Lower                                     Higher

In practice, implementations labeled “Paxos” often contain Raft-like optimizations, and “Raft” implementations sometimes use Paxos-like gap handling. The label matters less than the implementation correctness.


Real Implementations

System                 Algorithm                     Notes
etcd                   Raft                          Widely used Raft library; backing store for Kubernetes
Consul                 Raft                          HashiCorp's service mesh backbone
CockroachDB            Raft (per range)              Each key range is a Raft group; often thousands of groups per node
TiKV / TiDB            Raft (Multi-Raft)             Same pattern as CockroachDB
ZooKeeper              ZAB (ZooKeeper Atomic         Paxos-like; leader broadcasts in order
                       Broadcast)
Apache Kafka (KRaft)   Raft (KRaft variant)          Replaces the ZooKeeper dependency; production-ready since 3.3, ZooKeeper mode removed in 4.0
Google Spanner         Paxos per tablet              Uses TrueTime for external consistency across Paxos groups
Apache HBase           ZooKeeper for coordination    Relies on HDFS, not a consensus log, for data replication

TypeScript Pseudocode: Simplified Raft State Machine

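type LogEntry = { term: number; index: number; command: string };
type NodeRole = "follower" | "candidate" | "leader";
// RPC responses are left untyped to keep the sketch short.
type SendRPC = (to: string, msg: object) => Promise<any>;

class RaftNode {
  // Persistent state (must be written to stable storage before replying to RPCs)
  currentTerm = 0;
  votedFor: string | null = null;
  log: LogEntry[] = []; // log[i] holds the entry with index i + 1

  // Volatile state
  commitIndex = 0;
  lastApplied = 0;
  role: NodeRole = "follower";
  leaderId: string | null = null;
  electionTimer: ReturnType<typeof setTimeout> | null = null;
  heartbeatTimer: ReturnType<typeof setTimeout> | null = null;

  // Leader-only volatile state (reinitialized after each election)
  nextIndex: Map<string, number> = new Map();  // follower → next index to send
  matchIndex: Map<string, number> = new Map(); // follower → highest index known replicated

  constructor(
    readonly nodeId: string,
    readonly peers: string[],
    readonly sendRPC: SendRPC
  ) {}

  get quorum(): number {
    return Math.floor((this.peers.length + 1) / 2) + 1;
  }

  // Begin as a follower waiting to hear from a leader.
  start(): void {
    this.resetElectionTimer();
  }

  resetElectionTimer(): void {
    if (this.electionTimer) clearTimeout(this.electionTimer);
    const timeout = 150 + Math.random() * 150; // 150-300ms
    this.electionTimer = setTimeout(() => void this.startElection(), timeout);
  }

  // Seeing a higher term (or a current leader) always returns a node to follower.
  stepDown(term: number): void {
    if (term > this.currentTerm) {
      this.currentTerm = term;
      this.votedFor = null; // no vote cast yet in the new term
    }
    this.role = "follower";
    if (this.heartbeatTimer) clearTimeout(this.heartbeatTimer);
    this.resetElectionTimer();
  }

  receiveHeartbeat(leaderId: string, term: number): void {
    if (term < this.currentTerm) return; // stale leader: ignore
    this.stepDown(term);
    this.leaderId = leaderId;
  }

  async startElection(): Promise<void> {
    this.currentTerm += 1;
    this.role = "candidate";
    this.votedFor = this.nodeId;
    this.leaderId = null;
    this.resetElectionTimer(); // on a split vote, the next timeout starts a new election
    const electionTerm = this.currentTerm;
    let votes = 1; // vote for self
    if (votes >= this.quorum) { this.becomeLeader(); return; } // single-node cluster

    const lastLog = this.log[this.log.length - 1];
    const voteRequests = this.peers.map(peer =>
      this.sendRPC(peer, {
        type: "RequestVote",
        term: electionTerm,
        candidateId: this.nodeId,
        lastLogIndex: lastLog?.index ?? 0,
        lastLogTerm: lastLog?.term ?? 0,
      }).then(resp => {
        if (resp.term > this.currentTerm) { this.stepDown(resp.term); return; }
        // Count only votes for this election, and only while still its candidate
        if (resp.voteGranted && this.role === "candidate" && this.currentTerm === electionTerm) {
          votes++;
          if (votes >= this.quorum) this.becomeLeader(); // don't wait for stragglers
        }
      }).catch(() => {}) // network failure: no vote
    );

    await Promise.all(voteRequests);
  }

  becomeLeader(): void {
    this.role = "leader";
    this.leaderId = this.nodeId;
    if (this.electionTimer) clearTimeout(this.electionTimer); // leaders don't run elections
    for (const peer of this.peers) {
      this.nextIndex.set(peer, this.log.length + 1); // optimistic: assume follower is up to date
      this.matchIndex.set(peer, 0);
    }
    this.sendHeartbeats();
  }

  // Send every entry from nextIndex[peer] onward; an empty batch is a heartbeat.
  async replicateTo(peer: string): Promise<boolean> {
    const term = this.currentTerm;
    const next = this.nextIndex.get(peer) ?? this.log.length + 1;
    const prevLogIndex = next - 1;
    const entries = this.log.slice(prevLogIndex); // entries with index >= next
    const resp = await this.sendRPC(peer, {
      type: "AppendEntries",
      term,
      leaderId: this.nodeId,
      prevLogIndex,
      prevLogTerm: this.log[prevLogIndex - 1]?.term ?? 0,
      entries,
      leaderCommit: this.commitIndex,
    }).catch(() => null); // network failure
    if (!resp) return false;
    if (resp.term > this.currentTerm) { this.stepDown(resp.term); return false; }
    if (this.role !== "leader" || this.currentTerm !== term) return false; // stale reply
    if (resp.success) {
      const match = prevLogIndex + entries.length;
      this.matchIndex.set(peer, Math.max(this.matchIndex.get(peer) ?? 0, match));
      this.nextIndex.set(peer, match + 1);
      return true;
    }
    // Log Matching check failed: back up one entry; the next heartbeat retries.
    this.nextIndex.set(peer, Math.max(1, next - 1));
    return false;
  }

  async appendEntry(command: string): Promise<boolean> {
    if (this.role !== "leader") return false;
    const entry: LogEntry = {
      term: this.currentTerm,
      index: this.log.length + 1,
      command,
    };
    this.log.push(entry);

    let acks = 1; // leader counts itself
    await Promise.all(this.peers.map(async peer => {
      if (await this.replicateTo(peer)) acks++;
    }));

    // Commit only if still leader in the same term and a majority stored the entry.
    if (this.role === "leader" && this.currentTerm === entry.term && acks >= this.quorum) {
      this.commitIndex = Math.max(this.commitIndex, entry.index);
      return true; // caller can now apply entries up to commitIndex
    }
    return false; // a fuller version keeps retrying and recomputes commitIndex from matchIndex
  }

  sendHeartbeats(): void {
    if (this.role !== "leader") return;
    for (const peer of this.peers) void this.replicateTo(peer); // also catches up lagging followers
    this.heartbeatTimer = setTimeout(() => this.sendHeartbeats(), 50); // every 50ms, below the 150ms minimum election timeout
  }
}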

Interview Deep-Dive: Distributed Database Consistency

Q: How does a distributed database ensure consistency?

A complete answer:

Consistency here means linearizability: every read sees the most recently committed write, as if there were a single copy of the data. Achieving this with replication typically requires consensus: before a write is acknowledged, a quorum of nodes must have durably stored it.

Systems like CockroachDB and etcd use Raft for this. Every write goes through the Raft leader, which replicates the entry to a majority of followers. Only after a quorum acknowledges the write does the leader commit it and respond to the client. If the leader crashes, a new leader is elected — and it’s guaranteed to have all committed entries, because committed entries reached a majority and the election rule requires the new leader to have the most up-to-date log.

For reads, there are two approaches. Routing all reads through the leader (confirmed by a quorum round-trip or covered by a lease) ensures linearizability but adds latency. Serving reads from other replicas (for example, Cassandra at consistency level ONE, or a PostgreSQL hot standby) trades linearizability for lower read latency.

The cost: every write needs at least one round trip from the leader to a quorum (two round trips from the client's perspective), plus a disk fsync on each node in the quorum. In the same datacenter, this is ~1-5ms. Across regions, it becomes 50-200ms, which is why globally distributed ACID databases (Spanner, CockroachDB) have noticeably higher write latency than single-region systems.


Network Partitions and Split-Brain

The most dangerous failure scenario for a consensus group is a network partition that splits the cluster into two groups, each believing the other is dead.

Without consensus, both groups could elect a leader and accept writes — a split-brain scenario. In databases, split-brain means two nodes independently accept conflicting writes, producing diverged state that cannot be automatically reconciled.

Raft prevents split-brain by requiring a quorum majority for any election or commit. With 5 nodes split into 3 and 2: the group of 3 can elect a leader and commit entries. The group of 2 cannot form a quorum: if the old leader is stranded there, it can append entries but never commit them, and candidates there cannot win an election. (In basic Raft the stranded leader keeps its role until it sees a higher term; the Check Quorum extension below makes it step down.) No writes are committed in the minority partition.

This comes at a cost: the minority partition is unavailable during the partition. This is the fundamental CAP theorem trade-off: Raft is CP (Consistent + Partition-tolerant), sacrificing Availability under partitions.

5-node cluster partitioned:
  Partition A (nodes 1,2,3): 3 of 5 nodes ≥ quorum of 3 → can elect, can commit
  Partition B (nodes 4,5):   2 of 5 nodes < quorum of 3 → cannot elect, stalls

  Partition heals → A's leader reaches B's nodes → A's log replicates to B
  Any entries a stranded leader in B appended are overwritten; they never reached a quorum, so none were committed or applied

Pre-Vote and Check Quorum: Production Hardening

The canonical Raft algorithm has a known issue: a disruptive follower. If a node is partitioned from the leader but still connected to other followers, it will repeatedly time out, increment its term, and send RequestVote RPCs. When the partition heals, this node has a higher term — causing the current leader to step down and forcing an election, even though the system was healthy.

Pre-Vote extension (used by etcd): before starting an election, a candidate first runs a pre-vote phase to check whether it can win. If it can’t reach a quorum, it doesn’t increment its term. This prevents disruptive term inflation.
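A pre-vote is answered like a vote, except that the voter changes no state and refuses if it has heard from a leader recently. A minimal sketch of that decision and of the candidate's go/no-go check; the types, names, and millisecond timestamps are hypothetical, and this is not etcd's code.

```typescript
type PreVoteRequest = { nextTerm: number; lastLogIndex: number; lastLogTerm: number };

type VoterView = {
  currentTerm: number;
  lastLogIndex: number;
  lastLogTerm: number;
  lastLeaderContactMs: number; // when this server last heard from a valid leader
};

// Pre-vote decision. Unlike RequestVote, it changes nothing on the voter:
// no term update and no votedFor.
function grantPreVote(v: VoterView, req: PreVoteRequest, nowMs: number, minElectionTimeoutMs: number): boolean {
  if (req.nextTerm <= v.currentTerm) return false;                        // candidate is behind
  if (nowMs - v.lastLeaderContactMs < minElectionTimeoutMs) return false; // leader still active
  if (req.lastLogTerm !== v.lastLogTerm) return req.lastLogTerm > v.lastLogTerm;
  return req.lastLogIndex >= v.lastLogIndex;                              // log at least as up-to-date
}

// The candidate increments its term and runs a real election only if a
// majority (counting itself) granted the pre-vote.
function shouldStartElection(preVotesGranted: boolean[], clusterSize: number): boolean {
  const granted = 1 + preVotesGranted.filter(g => g).length;
  return granted >= Math.floor(clusterSize / 2) + 1;
}

const voter: VoterView = { currentTerm: 5, lastLogIndex: 40, lastLogTerm: 5, lastLeaderContactMs: 980 };
const preVote: PreVoteRequest = { nextTerm: 6, lastLogIndex: 40, lastLogTerm: 5 };
console.log(grantPreVote(voter, preVote, 1000, 150)); // false: leader heard from 20ms ago
console.log(grantPreVote(voter, preVote, 2000, 150)); // true: no leader contact for over a second
```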

Check Quorum (used by etcd): the leader periodically verifies that it has heard from a quorum within the last election timeout. If not, it steps down voluntarily. This stops a partitioned leader from continuing to act as leader (and, if it serves reads locally, from returning stale data) for more than about one election timeout.
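The leader side of Check Quorum reduces to counting followers it has heard from recently. A sketch, assuming the leader records the time of each follower's last successful response; the map layout and function name are hypothetical.

```typescript
// Check Quorum (sketch): the leader steps down if, within the last election
// timeout, it has not heard from enough followers to form a majority with itself.
function leaderShouldStepDown(
  lastAckMs: Map<string, number>, // follower id → time of its last successful response
  clusterSize: number,
  nowMs: number,
  electionTimeoutMs: number
): boolean {
  let inContact = 1; // the leader itself
  lastAckMs.forEach(t => {
    if (nowMs - t < electionTimeoutMs) inContact++;
  });
  return inContact < Math.floor(clusterSize / 2) + 1;
}

// Leader of a 5-node cluster stranded in a 2-node partition with n2.
const lastAck = new Map<string, number>([["n2", 990], ["n3", 700], ["n4", 650], ["n5", 600]]);
console.log(leaderShouldStepDown(lastAck, 5, 1000, 150)); // true
```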


Raft in Production: Sizing and Tuning

Cluster size: Always use an odd number of nodes. Even numbers don’t increase fault tolerance:

  • 3 nodes: tolerates 1 failure (quorum = 2)
  • 4 nodes: tolerates 1 failure (quorum = 3) — same as 3 nodes, worse write latency
  • 5 nodes: tolerates 2 failures (quorum = 3)
  • 7 nodes: tolerates 3 failures (quorum = 4) — rarely needed; tail latency increases with more followers
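The list above follows from quorum = ⌊n/2⌋ + 1 and tolerated failures = n − quorum. A short sketch that reproduces it (the function name is illustrative):

```typescript
// Majority quorum and tolerated crash failures for an n-node Raft cluster.
function raftSizing(n: number): { quorum: number; tolerates: number } {
  const quorum = Math.floor(n / 2) + 1;
  return { quorum, tolerates: n - quorum };
}

for (const n of [3, 4, 5, 6, 7]) {
  const { quorum, tolerates } = raftSizing(n);
  console.log(`${n} nodes: quorum=${quorum}, tolerates ${tolerates} failure(s)`);
}
// 3 nodes: quorum=2, tolerates 1 failure(s)
// 4 nodes: quorum=3, tolerates 1 failure(s)
// 5 nodes: quorum=3, tolerates 2 failure(s)
// 6 nodes: quorum=4, tolerates 2 failure(s)
// 7 nodes: quorum=4, tolerates 3 failure(s)
```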

Heartbeat and election timeout: The relationship must satisfy:

broadcastTime << electionTimeout << MTBF
  broadcastTime: average time to send/receive RPCs to all peers (~0.5-20ms depending on deployment)
  electionTimeout: should be 10-20x broadcastTime (Raft paper recommends 150-300ms)
  MTBF: mean time between failures for a single server (should be hours to days)

Tighten election timeouts for same-DC deployments (can use 50-150ms). Loosen for multi-DC (use 1-5 seconds to avoid spurious elections on cross-DC latency spikes).
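The inequality can be turned into a startup sanity check. A sketch, assuming "<<" means at least a factor of 10 (a rule of thumb, not part of Raft) and hypothetical field names:

```typescript
type RaftTimings = {
  broadcastTimeMs: number;      // typical time to send RPCs to all peers and get replies
  electionTimeoutMinMs: number;
  electionTimeoutMaxMs: number;
  heartbeatIntervalMs: number;
  mtbfMs: number;               // mean time between failures of a single server
};

// Flags settings that break broadcastTime << electionTimeout << MTBF,
// reading "<<" as "at least 10x" (an assumed rule of thumb).
function checkTimings(t: RaftTimings): string[] {
  const problems: string[] = [];
  if (t.electionTimeoutMinMs < 10 * t.broadcastTimeMs)
    problems.push("election timeout is under 10x broadcast time: expect spurious elections");
  if (t.heartbeatIntervalMs >= t.electionTimeoutMinMs)
    problems.push("heartbeat interval is not below the minimum election timeout");
  if (t.electionTimeoutMaxMs <= t.electionTimeoutMinMs)
    problems.push("election timeout range has no width, so randomization cannot break ties");
  if (t.mtbfMs < 10 * t.electionTimeoutMaxMs)
    problems.push("MTBF is not much larger than the election timeout: too much time spent in elections");
  return problems;
}

const DAY_MS = 24 * 60 * 60 * 1000;
const sameDc: RaftTimings = {
  broadcastTimeMs: 1, electionTimeoutMinMs: 50, electionTimeoutMaxMs: 150,
  heartbeatIntervalMs: 20, mtbfMs: DAY_MS,
};
console.log(checkTimings(sameDc).length);                         // 0
console.log(checkTimings({ ...sameDc, broadcastTimeMs: 70 })[0]); // election timeout is under 10x broadcast time: expect spurious elections
```

The second call reuses same-DC timeouts across a 70ms link, which is exactly the case where the timeouts should be loosened.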

Write latency breakdown (5-node cluster, same DC):

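Client → Leader:               0.1ms  (network, one way)
Leader writes to local WAL:    0.5ms  (NVMe fsync)
Leader ↔ Followers (parallel): 0.3ms  (network RTT to the 2nd-fastest follower)
Followers write local WAL:     0.5ms  (NVMe fsync)
Leader commits + applies:      0.1ms  (CPU)
Leader → Client:               0.1ms  (network, one way)
                               ───────
Total:                         ~1.6ms

Raft also allows the leader to fsync its own entry in parallel with sending AppendEntries, which would hide the leader's 0.5ms and bring the total closer to ~1.1ms.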

Key Takeaways

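  • Consensus is the problem of agreement despite failures; FLP proves no deterministic algorithm can guarantee termination in a fully asynchronous system with even one crash, so real algorithms stay safe under asynchrony and rely on timeouts (randomized, in Raft) for liveness.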
  • Paxos uses two phases (Prepare/Promise, Accept/Accepted) and majority quorums; the protocol is correct but underspecified for log replication.
  • Raft decomposes consensus into leader election (randomized timeouts), log replication (AppendEntries quorum), and safety (election restriction).
  • A candidate wins only if its log is at least as up-to-date as those of a majority of voters, which guarantees that committed entries are never lost.
  • A Raft write takes ~2 RTTs; same-DC this is ~1-5ms, cross-region is 50-200ms.
  • Leader leases let the leader serve linearizable reads without a log round-trip, relying on bounded clock drift and the election timeout.
  • Network partitions split clusters; Raft’s quorum requirement ensures the minority becomes unavailable (CP, not AP).
  • Pre-Vote and Check Quorum harden production Raft deployments against disruptive term inflation.
  • etcd, Consul, CockroachDB, Kafka KRaft all use Raft; ZooKeeper uses ZAB; Spanner uses Paxos per tablet.