Raft Consensus Algorithm: Architecture, Usage, and Practical Go Implementation

Raft is a distributed consensus algorithm designed to be understandable, reliable, and practical for building fault-tolerant distributed systems. Introduced in 2014 by Diego Ongaro and John Ousterhout in their landmark paper “In Search of an Understandable Consensus Algorithm”, Raft has become the backbone of many production systems, including etcd, CockroachDB, TiKV, and Consul.

At its core, Raft solves one of the hardest problems in distributed systems:

How do multiple servers agree on a single value, or a sequence of values, even when some of those servers crash, disconnect, or behave slowly?

This problem is known as the consensus problem, and solving it correctly is what separates reliable distributed databases and coordination services from fragile, inconsistent ones.

Key Properties of Raft

Property        Description
Safety          Never returns incorrect results under any non-Byzantine failure
Availability    Fully functional as long as a majority of servers are running
No clocks       Does not rely on synchronized clocks for correctness
Latency         A minority of slow servers does not impact overall performance

Before we dive in deeper, I recommend watching the accompanying YouTube video; this is a complicated topic, and the video will make the article easier to follow.

The code implemented in this article is also available in the repository below; feel free to clone it and test the project:

https://github.com/mjmichael73/go-raft

Table of Contents

  1. Why Raft Over Paxos?
  2. Core Architecture of Raft
  3. Leader Election
  4. Log Replication
  5. Safety and Consistency Guarantees
  6. Cluster Membership Changes
  7. Real-World Use Cases
  8. Practical Go Implementation
  9. Testing Your Raft Implementation
  10. Common Pitfalls
  11. Conclusion

1. Why Raft Over Paxos?

Before Raft, Paxos (created by Leslie Lamport) was the de facto consensus algorithm. However, Paxos suffers from several well-documented issues:

  • Extremely difficult to understand: even Lamport acknowledged, in “Paxos Made Simple”, that the original presentation had been widely regarded as hard to follow
  • Incomplete specification: Lamport's descriptions focus on single-decree consensus and only sketch the multi-decree replicated log needed in practice
  • Poor pedagogical structure: the algorithm is hard to decompose and teach
  • Numerous variations: Multi-Paxos, Fast Paxos, Byzantine Paxos, each with its own trade-offs

Raft was explicitly designed to be decomposable into relatively independent sub-problems:

  1. Leader election
  2. Log replication
  3. Safety
  4. Membership changes

This decomposition makes Raft significantly easier to implement correctly in production systems.

2. Core Architecture of Raft

2.1 Server States

Every server in a Raft cluster exists in one of exactly three states at any point in time:

                 times out,
              starts election
┌───────────┐ ──────────────▶ ┌───────────┐
│ Follower  │                 │ Candidate │
└───────────┘ ◀────────────── └───────────┘
      ▲      discovers current      │
      │      leader or new term     │ receives votes from
      │                             │ majority of servers
      │                             ▼
      │                       ┌───────────┐
      └───────────────────────│  Leader   │
        discovers server with └───────────┘
        higher term

State        Description
Follower     Passive; responds to requests from leaders and candidates
Candidate    Trying to become the leader; votes for itself and requests votes
Leader       Handles all client requests; replicates log entries to followers

2.2 Terms

Raft divides time into terms — numbered with consecutive integers. Each term begins with an election. Terms act as a logical clock in Raft:

     Term 1        Term 2        Term 3         Term 4
┌──────────────┐ ┌────────┐ ┌──────────────┐ ┌──────────────
│ Election     │ │ Elec.  │ │ Election     │ │ Election ...
│ → Leader A   │ │ failed │ │ → Leader B   │ │ → Leader C ...
└──────────────┘ └────────┘ └──────────────┘ └──────────────

  • If a server’s current term is smaller than another’s, it immediately updates its term
  • If a candidate or leader discovers its term is out of date, it reverts to follower
  • Messages with stale term numbers are rejected
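The three term rules above fit in a few lines. Here is a minimal sketch using a hypothetical node type that carries only a term and a role (it is not the Raft struct built in section 8):

```go
package main

import "fmt"

// node is a hypothetical, stripped-down server used only to illustrate
// how terms are compared.
type node struct {
	currentTerm int
	state       string // "follower", "candidate", or "leader"
}

// observeTerm applies the term rules to the term carried by any RPC
// request or response. It returns false if the message is stale and
// must be rejected.
func (n *node) observeTerm(msgTerm int) bool {
	if msgTerm < n.currentTerm {
		return false // stale term: reject the message
	}
	if msgTerm > n.currentTerm {
		n.currentTerm = msgTerm // adopt the larger term
		n.state = "follower"    // candidates and leaders step down
	}
	return true
}

func main() {
	n := &node{currentTerm: 3, state: "leader"}

	ok := n.observeTerm(2)
	fmt.Println(ok, n.currentTerm, n.state) // false 3 leader

	ok = n.observeTerm(5)
	fmt.Println(ok, n.currentTerm, n.state) // true 5 follower
}
```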

2.3 RPCs (Remote Procedure Calls)

Raft uses only two core RPCs for consensus:

RequestVote RPC — Initiated by candidates during elections

Request Fields:
- term (candidate's current term)
- candidateId (candidate requesting the vote)
- lastLogIndex (index of candidate's last log entry)
- lastLogTerm (term of candidate's last log entry)

Response Fields:
- term (currentTerm for candidate to update itself)
- voteGranted (true means candidate received vote)

AppendEntries RPC — Used by leader for log replication AND as heartbeat

Request Fields:
- term (leader's term)
- leaderId (so follower can redirect clients)
- prevLogIndex (index of log entry immediately preceding new ones)
- prevLogTerm (term of prevLogIndex entry)
- entries[] (log entries to store; empty for heartbeat)
- leaderCommit (leader's commitIndex)

Response Fields:
- term (currentTerm for leader to update itself)
- success (true if follower contained entry matching prevLogIndex/prevLogTerm)

2.4 Persistent State

Each server keeps the following state. The persistent fields must be written to stable storage before the server responds to RPCs, so that it can safely recover from crashes:

Persistent (written to stable storage before responding to RPCs):
  currentTerm     Latest term the server has seen
  votedFor        CandidateId voted for in the current term
  log[]           Log entries (command + term)

Volatile:
  commitIndex     Index of the highest log entry known to be committed
  lastApplied     Index of the highest log entry applied to the state machine

Volatile, leader only:
  nextIndex[]     For each server, index of the next log entry to send
  matchIndex[]    For each server, index of the highest log entry known to be replicated

3. Leader Election

Leader election is the mechanism by which Raft ensures at most one leader per term (a term whose election ends in a split vote has no leader at all).

3.1 Election Process Step-by-Step

Step 1: Follower times out (no heartbeat from leader)

Step 2: Follower increments currentTerm, transitions to Candidate

Step 3: Votes for itself, sends RequestVote RPCs to all other servers

Step 4a: Receives votes from majority → becomes LEADER
Step 4b: Receives AppendEntries from valid leader → reverts to FOLLOWER
Step 4c: Election timeout expires (split vote) → increment term, start over

3.2 Election Timeout

Raft uses randomized election timeouts (typically 150–300ms) to avoid split votes. The randomization spreads the servers out in time, so in most cases a single server times out first, wins the election, and sends heartbeats before any other server times out.

3.3 Vote Granting Rules

A server grants a vote if and only if all three conditions hold:

  1. The candidate’s term is at least as large as the voter’s current term
  2. The voter has not already voted in this term for someone else
  3. The candidate’s log is at least as up-to-date as the voter’s log

Log up-to-date comparison:

  • If the last entries in the logs have different terms → higher term wins
  • If the logs end with the same term → longer log wins
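The three vote-granting conditions and the up-to-date comparison can be sketched as a pure function. The voter type and its fields are hypothetical, chosen for illustration; section 8.5 implements the same checks inside RequestVote:

```go
package main

import "fmt"

// voter holds only the state a server consults when deciding on a vote.
type voter struct {
	currentTerm  int
	votedFor     int // -1 means no vote cast in currentTerm
	lastLogIndex int
	lastLogTerm  int
}

// logUpToDate: a higher last term wins; with equal last terms,
// the longer (or equally long) log wins.
func logUpToDate(candTerm, candIndex, ourTerm, ourIndex int) bool {
	if candTerm != ourTerm {
		return candTerm > ourTerm
	}
	return candIndex >= ourIndex
}

// grantVote checks the three conditions and records the vote if granted.
func (v *voter) grantVote(term, candidateID, lastIndex, lastTerm int) bool {
	if term < v.currentTerm {
		return false // condition 1: stale candidate
	}
	if term > v.currentTerm {
		v.currentTerm = term // new term: any earlier vote no longer counts
		v.votedFor = -1
	}
	if v.votedFor != -1 && v.votedFor != candidateID {
		return false // condition 2: already voted for someone else
	}
	if !logUpToDate(lastTerm, lastIndex, v.lastLogTerm, v.lastLogIndex) {
		return false // condition 3: candidate's log is behind ours
	}
	v.votedFor = candidateID
	return true
}

func main() {
	v := &voter{currentTerm: 2, votedFor: -1, lastLogIndex: 5, lastLogTerm: 2}
	a := v.grantVote(3, 1, 4, 2) // same last term, shorter log
	b := v.grantVote(3, 2, 3, 3) // higher last term wins despite shorter log
	c := v.grantVote(3, 3, 9, 3) // already voted for server 2 in term 3
	fmt.Println(a, b, c)         // false true false
}
```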

4. Log Replication

Once a leader is elected, it begins servicing client requests through log replication.

4.1 The Replication Flow

Client ──→ Leader
           │ 1. Append entry to local log
           │ 2. Send AppendEntries RPCs to all followers (in parallel)
           │
           ├──→ Follower 1 ──→ Success
           ├──→ Follower 2 ──→ Success   ← majority reached (leader + 2 followers)
           └──→ Follower 3 ──→ (slow/crashed)

         3. Commit the entry (a majority has stored it)
         4. Apply it to the state machine
         5. Respond to the client
         6. Notify followers of the commit (via the next AppendEntries)
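Step 3 hinges on knowing which index a majority has stored. A sketch, assuming the leader tracks a matchIndex value for every server (itself included). It also applies the rule that a leader only commits entries from its current term by counting replicas (the same rule advanceCommitIndex applies in section 8.5). Both function names are hypothetical:

```go
package main

import (
	"fmt"
	"sort"
)

// majorityMatchIndex returns the highest log index stored on a majority of
// servers. matchIndex has one element per server, the leader included.
func majorityMatchIndex(matchIndex []int) int {
	sorted := append([]int(nil), matchIndex...)
	sort.Sort(sort.Reverse(sort.IntSlice(sorted)))
	// In descending order, the value at position n/2 is held by at least
	// n/2+1 servers, i.e. a majority.
	return sorted[len(sorted)/2]
}

// newCommitIndex advances commitIndex to the majority-replicated index, but
// only if that entry is from the leader's current term. logTerms[i] is the
// term of log entry i (index 0 is a term-0 sentinel).
func newCommitIndex(matchIndex, logTerms []int, currentTerm, commitIndex int) int {
	n := majorityMatchIndex(matchIndex)
	if n > commitIndex && logTerms[n] == currentTerm {
		return n
	}
	return commitIndex
}

func main() {
	// Terms of entries 1..7 from the log diagram in 4.2, plus the sentinel.
	logTerms := []int{0, 1, 1, 1, 2, 2, 3, 3}

	// Leader and followers 1 and 2 hold index 7; follower 3 is stuck at 4.
	fmt.Println(newCommitIndex([]int{7, 7, 7, 4}, logTerms, 3, 3)) // 7

	// Only index 5 (term 2) is on a majority; it cannot be committed by
	// counting replicas in term 3, so commitIndex stays at 3.
	fmt.Println(newCommitIndex([]int{7, 5, 5, 4}, logTerms, 3, 3)) // 3
}
```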

4.2 Log Entry Structure

Index:   1     2     3     4     5     6     7
      ┌─────┬─────┬─────┬─────┬─────┬─────┬─────┐
Term: │  1  │  1  │  1  │  2  │  2  │  3  │  3  │
      ├─────┼─────┼─────┼─────┼─────┼─────┼─────┤
Cmd:  │ x=3 │ y=1 │ y=9 │ x=2 │ x=0 │ y=7 │ x=5 │
      └─────┴─────┴─────┴─────┴─────┴─────┴─────┘

(commitIndex points at the highest log index known to be committed.)

4.3 The Log Matching Property

Raft maintains two critical invariants:

  1. Index + Term uniqueness: If two entries in different logs have the same index and term, they store the same command
  2. Consistency: If two logs contain an entry with the same index and term, then all preceding entries are identical

These properties are enforced by the consistency check in AppendEntries: the leader includes prevLogIndex and prevLogTerm, and followers reject the request if their log doesn’t match.
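A follower-side sketch of this check, with logs reduced to slices of entry terms (index 0 holds a term-0 sentinel, as in the log.go implementation later). handleAppend is a hypothetical name; note that an existing entry is deleted only when its term conflicts with a new entry at the same index:

```go
package main

import "fmt"

// handleAppend applies an AppendEntries request to a follower log and
// returns the (possibly updated) log and whether the request succeeded.
// The input slice is never modified.
func handleAppend(followerLog []int, prevLogIndex, prevLogTerm int, entries []int) ([]int, bool) {
	// Consistency check: we must hold an entry at prevLogIndex with prevLogTerm.
	if prevLogIndex >= len(followerLog) || followerLog[prevLogIndex] != prevLogTerm {
		return followerLog, false
	}
	out := append([]int(nil), followerLog...)
	for i, term := range entries {
		idx := prevLogIndex + 1 + i
		if idx < len(out) && out[idx] != term {
			out = out[:idx] // conflicting entry: delete it and all that follow
		}
		if idx >= len(out) {
			out = append(out, term) // append entries not already in the log
		}
	}
	return out, true
}

func main() {
	follower := []int{0, 1, 1, 2}

	_, ok := handleAppend(follower, 3, 3, []int{3})
	fmt.Println(ok) // false: the entry at index 3 has term 2, not 3

	updated, ok := handleAppend(follower, 2, 1, []int{3, 3})
	fmt.Println(updated, ok) // [0 1 1 3 3] true
}
```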

4.4 Handling Inconsistent Logs

When a new leader is elected, follower logs may be inconsistent. The leader handles this by:

  1. Starting with nextIndex[follower] = leader's last log index + 1
  2. If AppendEntries fails due to inconsistency → decrement nextIndex and retry
  3. Continue until follower’s log matches
  4. Then send all missing entries in one batch
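The basic decrement-and-retry loop can be simulated for one follower. This sketch uses the same representation (each log is a slice of entry terms with a term-0 sentinel at index 0) and a hypothetical backtrack function; the leader and follower logs are made up for the example:

```go
package main

import "fmt"

// backtrack simulates the leader's basic repair loop for one follower:
// start at nextIndex = lastIndex+1 and decrement after every rejected
// AppendEntries until the consistency check passes. It returns the
// accepted nextIndex and the number of RPCs it took.
func backtrack(leader, follower []int) (nextIndex, rpcs int) {
	nextIndex = len(leader) // leader's last log index + 1
	for {
		rpcs++
		prev := nextIndex - 1
		if prev < len(follower) && follower[prev] == leader[prev] {
			return nextIndex, rpcs // follower accepts entries from nextIndex on
		}
		nextIndex-- // rejected: back up one entry and retry
	}
}

func main() {
	leader := []int{0, 1, 1, 1, 4, 4, 5, 5, 6, 6, 6}
	follower := []int{0, 1, 1, 1, 4, 4, 4, 4}
	fmt.Println(backtrack(leader, follower)) // 6 6
}
```

The loop always terminates because index 0 is the shared sentinel. The ConflictIndex and ConflictTerm fields in the rpc.go types exist to cut this cost: the leader can skip back a whole term at a time instead of one entry per rejected RPC.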

5. Safety and Consistency Guarantees

5.1 The Leader Completeness Property

Raft guarantees that if a log entry is committed in a given term, that entry will be present in the logs of all leaders for all higher terms. This is enforced through the vote restriction: a candidate cannot win an election unless its log contains all committed entries.

5.2 State Machine Safety

No two servers ever apply different commands to the same log index. This follows from:

  • Log entries can only be committed by the current leader
  • Once committed, entries are never overwritten
  • Only candidates with up-to-date logs can become leaders

5.3 Timing Requirement

For Raft to elect and maintain a stable leader:

broadcastTime << electionTimeout << MTBF

Where:

  • broadcastTime ≈ 0.5ms–20ms (time to send RPC to all servers and receive responses)
  • electionTimeout ≈ 10ms–500ms
  • MTBF = mean time between failures for a single server

6. Cluster Membership Changes

Adding or removing servers from a live Raft cluster is tricky because you can’t switch all servers atomically.

6.1 Joint Consensus

Raft uses joint consensus for safe configuration changes:

Phase 1: The leader commits C_old,new (the joint configuration).
During this phase, decisions (elections and commits) require separate majorities of BOTH the old and the new configuration.

Phase 2: The leader commits C_new (the new configuration).
Decisions only require a majority of the new configuration.
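The decision rule during the joint phase can be sketched as follows; the server names and function names are hypothetical:

```go
package main

import "fmt"

// hasMajority reports whether the acknowledging servers form a majority
// of config.
func hasMajority(config []string, acks map[string]bool) bool {
	count := 0
	for _, id := range config {
		if acks[id] {
			count++
		}
	}
	return count > len(config)/2
}

// jointMajority is the rule while C_old,new is in effect: a decision needs
// separate majorities of BOTH configurations.
func jointMajority(oldCfg, newCfg []string, acks map[string]bool) bool {
	return hasMajority(oldCfg, acks) && hasMajority(newCfg, acks)
}

func main() {
	oldCfg := []string{"A", "B", "C"}
	newCfg := []string{"C", "D", "E", "F", "G"}
	acks := map[string]bool{"A": true, "B": true, "D": true}

	inOld := hasMajority(oldCfg, acks)
	inNew := hasMajority(newCfg, acks)
	fmt.Println(inOld, inNew)                       // true false
	fmt.Println(jointMajority(oldCfg, newCfg, acks)) // false
}
```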

6.2 Single-Server Changes

A simpler approach is to change one server at a time. Adding or removing a single server can never produce two non-overlapping majorities (one from the old configuration and one from the new), so such changes are safe without joint consensus.
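The overlap claim is simple arithmetic. If the smaller configuration is contained in the larger one (true when one server is added or removed), two majorities must share a server whenever their sizes add up to more than the larger configuration, and for n and n+1 servers they add up to n+2. A quick check with hypothetical helper names:

```go
package main

import "fmt"

// majority returns the number of servers that form a majority of n.
func majority(n int) int { return n/2 + 1 }

// quorumsOverlap reports whether every majority of an n-server configuration
// must intersect every majority of an m-server configuration, assuming the
// smaller configuration is a subset of the larger one.
func quorumsOverlap(n, m int) bool {
	universe := n
	if m > universe {
		universe = m
	}
	// Two subsets of a universe must intersect if their sizes sum past it.
	return majority(n)+majority(m) > universe
}

func main() {
	for n := 1; n <= 6; n++ {
		fmt.Printf("%d -> %d servers: overlap=%v\n", n, n+1, quorumsOverlap(n, n+1))
	}
	// Growing from 3 to 5 servers in one step loses the guarantee:
	// {A, B} and {C, D, E} can both be majorities.
	fmt.Println(quorumsOverlap(3, 5)) // false
}
```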

7. Real-World Use Cases

Raft is widely used in production systems across the industry:

System         Usage
etcd           Kubernetes’ primary coordination store; built entirely on Raft via the etcd/raft library
CockroachDB    Each range/shard is managed by a Raft group
TiKV           Distributed key-value store powering TiDB; uses Multi-Raft
Consul         Service mesh and service discovery
InfluxDB       Meta-node cluster coordination
Nomad          HashiCorp’s workload orchestrator
YugabyteDB     Distributed SQL database
Dgraph         Distributed graph database

7.1 When To Use Raft

Raft is ideal for:

  • Distributed databases requiring strong consistency
  • Configuration stores (key-value, feature flags)
  • Distributed locks and leases
  • Leader election in microservices
  • Distributed job schedulers
  • Replicated state machines of any kind

8. Practical Go Implementation

Let’s build a working Raft implementation in Go. It covers leader election, log replication, and applying committed entries to a state machine; log compaction (snapshots) and membership changes are left out to keep the code focused.

8.1 Project Structure

raft/
├── go.mod
├── raft.go # Core Raft implementation
├── rpc.go # RPC types and transport
├── log.go # Log management
├── state_machine.go # Key-value state machine
├── server.go # HTTP server exposing the KV store
└── main.go # Entry point and cluster bootstrapping

8.2 go.mod

module github.com/yourusername/raft-demo

go 1.21

8.3 rpc.go — RPC Types

package raft

import "time"

// LogEntry represents a single entry in the Raft log.
type LogEntry struct {
	Term    int         `json:"term"`
	Index   int         `json:"index"`
	Command interface{} `json:"command"`
}

// RequestVoteArgs holds the arguments for the RequestVote RPC.
type RequestVoteArgs struct {
	Term         int `json:"term"`
	CandidateID  int `json:"candidate_id"`
	LastLogIndex int `json:"last_log_index"`
	LastLogTerm  int `json:"last_log_term"`
}

// RequestVoteReply holds the response for the RequestVote RPC.
type RequestVoteReply struct {
	Term        int  `json:"term"`
	VoteGranted bool `json:"vote_granted"`
}

// AppendEntriesArgs holds the arguments for the AppendEntries RPC.
type AppendEntriesArgs struct {
	Term         int        `json:"term"`
	LeaderID     int        `json:"leader_id"`
	PrevLogIndex int        `json:"prev_log_index"`
	PrevLogTerm  int        `json:"prev_log_term"`
	Entries      []LogEntry `json:"entries"`
	LeaderCommit int        `json:"leader_commit"`
}

// AppendEntriesReply holds the response for the AppendEntries RPC.
type AppendEntriesReply struct {
	Term    int  `json:"term"`
	Success bool `json:"success"`
	// ConflictIndex and ConflictTerm are an optimization for fast log
	// backtracking; ConflictTerm is -1 when the follower's log is too short.
	ConflictIndex int `json:"conflict_index"`
	ConflictTerm  int `json:"conflict_term"`
}

// Command represents a client command to the state machine.
type Command struct {
	Op    string `json:"op"` // "set", "get", "delete"
	Key   string `json:"key"`
	Value string `json:"value"`
}

// CommitEntry is sent on the commit channel when an entry is committed.
type CommitEntry struct {
	Command interface{}
	Index   int
	Term    int
}

// PeerConfig holds the configuration for a peer server.
type PeerConfig struct {
	ID      int
	Address string // base URL; the /raft/... RPC paths are appended to it
}

// Config holds the configuration for a Raft node.
type Config struct {
	ID              int
	Peers           []PeerConfig  // all cluster members, including this node
	ElectionTimeout time.Duration // base timeout (default 150ms); jitter in [0, base) is added
	HeartbeatTick   time.Duration // heartbeat interval (default 50ms)
	StoragePath     string        // file prefix for the .log/.meta files; "" disables persistence
}

8.4 log.go — Log Management

package raft

import (
	"encoding/json"
	"fmt"
	"os"
	"sync"
)

// RaftLog manages the Raft log with persistence.
type RaftLog struct {
	mu      sync.RWMutex
	entries []LogEntry
	path    string
}

// NewRaftLog creates or restores a Raft log from disk.
func NewRaftLog(storagePath string) (*RaftLog, error) {
	rl := &RaftLog{
		entries: []LogEntry{
			// Sentinel entry at index 0 to simplify index math
			{Term: 0, Index: 0},
		},
		path: storagePath,
	}

	if storagePath != "" {
		if err := rl.load(); err != nil && !os.IsNotExist(err) {
			return nil, fmt.Errorf("loading raft log: %w", err)
		}
	}

	return rl, nil
}

// AppendEntry appends a new entry to the log, assigning its index.
func (rl *RaftLog) AppendEntry(entry LogEntry) {
	rl.mu.Lock()
	defer rl.mu.Unlock()
	entry.Index = len(rl.entries)
	rl.entries = append(rl.entries, entry)
	rl.persist()
}

// AppendEntries appends the entries that follow prevIndex. An existing entry
// is truncated (together with everything after it) only if its term conflicts
// with the new entry at the same index; matching entries are kept.
func (rl *RaftLog) AppendEntries(prevIndex int, entries []LogEntry) {
	rl.mu.Lock()
	defer rl.mu.Unlock()

	for i, entry := range entries {
		idx := prevIndex + 1 + i
		if idx < len(rl.entries) {
			// Conflict detected: truncate from this point
			if rl.entries[idx].Term != entry.Term {
				rl.entries = rl.entries[:idx]
				rl.entries = append(rl.entries, entries[i:]...)
				break
			}
			// Entry already exists and matches; skip
		} else {
			// Append new entries
			rl.entries = append(rl.entries, entries[i:]...)
			break
		}
	}
	rl.persist()
}

// GetEntry returns the log entry at the given index.
func (rl *RaftLog) GetEntry(index int) (LogEntry, bool) {
	rl.mu.RLock()
	defer rl.mu.RUnlock()

	if index < 0 || index >= len(rl.entries) {
		return LogEntry{}, false
	}
	return rl.entries[index], true
}

// GetEntriesFrom returns a copy of all entries starting from the given index.
func (rl *RaftLog) GetEntriesFrom(index int) []LogEntry {
	rl.mu.RLock()
	defer rl.mu.RUnlock()

	if index >= len(rl.entries) {
		return nil
	}
	result := make([]LogEntry, len(rl.entries)-index)
	copy(result, rl.entries[index:])
	return result
}

// LastIndex returns the index of the last log entry.
func (rl *RaftLog) LastIndex() int {
	rl.mu.RLock()
	defer rl.mu.RUnlock()
	return len(rl.entries) - 1
}

// LastTerm returns the term of the last log entry.
func (rl *RaftLog) LastTerm() int {
	rl.mu.RLock()
	defer rl.mu.RUnlock()
	if len(rl.entries) == 0 {
		return 0
	}
	return rl.entries[len(rl.entries)-1].Term
}

// TermAt returns the term of the log entry at the given index, or -1 if
// the index is out of range.
func (rl *RaftLog) TermAt(index int) int {
	rl.mu.RLock()
	defer rl.mu.RUnlock()
	if index < 0 || index >= len(rl.entries) {
		return -1
	}
	return rl.entries[index].Term
}

// Len returns the total number of log entries (including sentinel).
func (rl *RaftLog) Len() int {
	rl.mu.RLock()
	defer rl.mu.RUnlock()
	return len(rl.entries)
}

// persist saves the whole log to disk. For brevity it rewrites the file on
// every call, does not fsync, and silently drops errors; a production log
// must report write failures and sync before acknowledging RPCs.
func (rl *RaftLog) persist() {
	if rl.path == "" {
		return
	}
	data, err := json.Marshal(rl.entries)
	if err != nil {
		return
	}
	_ = os.WriteFile(rl.path+".log", data, 0644)
}

// load restores the log from disk.
func (rl *RaftLog) load() error {
	data, err := os.ReadFile(rl.path + ".log")
	if err != nil {
		return err
	}
	return json.Unmarshal(data, &rl.entries)
}

8.5 raft.go — The Core Implementation

package raft

import (
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"math/rand"
	"net/http"
	"os"
	"sync"
	"sync/atomic"
	"time"
)

// ServerState represents the current state of a Raft server.
type ServerState int32

const (
	StateFollower ServerState = iota
	StateCandidate
	StateLeader
)

func (s ServerState) String() string {
	switch s {
	case StateFollower:
		return "Follower"
	case StateCandidate:
		return "Candidate"
	case StateLeader:
		return "Leader"
	default:
		return "Unknown"
	}
}

// PersistentState holds state that must survive crashes.
type PersistentState struct {
	CurrentTerm int `json:"current_term"`
	VotedFor    int `json:"voted_for"` // -1 if not voted
}

// Raft is the core consensus module.
type Raft struct {
	mu     sync.Mutex
	logger *log.Logger

	// Configuration
	id     int
	peers  []PeerConfig
	config Config

	// Persistent state (must be saved to stable storage before responding to RPCs)
	persistent PersistentState

	// Volatile state
	state       ServerState
	commitIndex int
	lastApplied int

	// Leader-only volatile state (reinitialized after election)
	nextIndex  map[int]int
	matchIndex map[int]int

	// Log
	raftLog *RaftLog

	// Channels
	commitCh   chan CommitEntry // entries ready to apply to state machine
	newEntryCh chan struct{}    // signals new entries appended by leader
	stopCh     chan struct{}    // signals shutdown

	// Timers
	electionResetTime time.Time

	// HTTP client for peer communication
	httpClient *http.Client

	// For tracking if we're running
	running int32
}

// NewRaft creates and starts a new Raft consensus module.
func NewRaft(config Config, commitCh chan CommitEntry) (*Raft, error) {
	raftLog, err := NewRaftLog(config.StoragePath)
	if err != nil {
		return nil, fmt.Errorf("creating raft log: %w", err)
	}

	r := &Raft{
		id:     config.ID,
		peers:  config.Peers,
		config: config,
		persistent: PersistentState{
			CurrentTerm: 0,
			VotedFor:    -1,
		},
		state:       StateFollower,
		commitIndex: 0,
		lastApplied: 0,
		nextIndex:   make(map[int]int),
		matchIndex:  make(map[int]int),
		raftLog:     raftLog,
		commitCh:    commitCh,
		newEntryCh:  make(chan struct{}, 1),
		stopCh:      make(chan struct{}),
		httpClient: &http.Client{
			Timeout: 100 * time.Millisecond,
		},
		logger: log.New(os.Stderr, fmt.Sprintf("[Raft %d] ", config.ID), log.LstdFlags|log.Lmicroseconds),
	}

	// Load persistent state from disk if available
	if err := r.loadPersistentState(); err != nil && !os.IsNotExist(err) {
		return nil, fmt.Errorf("loading persistent state: %w", err)
	}

	atomic.StoreInt32(&r.running, 1)

	// Log before starting the background goroutines: they access r.state
	// under r.mu, so reading it here afterwards would be a data race.
	r.logger.Printf("Started as %s, term=%d", r.state, r.persistent.CurrentTerm)

	// Start background goroutines
	go r.electionTimer()
	go r.applyCommitted()

	return r, nil
}

// Submit submits a command to the Raft cluster.
// Returns the log index, term, and whether this server is the leader.
func (r *Raft) Submit(command interface{}) (int, int, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()

	if r.state != StateLeader {
		return -1, -1, false
	}

	entry := LogEntry{
		Term:    r.persistent.CurrentTerm,
		Command: command,
	}
	r.raftLog.AppendEntry(entry)
	index := r.raftLog.LastIndex()

	r.logger.Printf("Submit: appended entry at index=%d, term=%d", index, r.persistent.CurrentTerm)

	// Signal replication goroutines (non-blocking)
	select {
	case r.newEntryCh <- struct{}{}:
	default:
	}

	return index, r.persistent.CurrentTerm, true
}

// GetState returns the current term and whether this server is the leader.
func (r *Raft) GetState() (int, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.persistent.CurrentTerm, r.state == StateLeader
}

// Stop shuts down the Raft module.
func (r *Raft) Stop() {
	if atomic.CompareAndSwapInt32(&r.running, 1, 0) {
		close(r.stopCh)
		r.logger.Println("Stopped")
	}
}

// ─────────────────────────────────────────────────────────────
// RPC Handlers
// ─────────────────────────────────────────────────────────────

// RequestVote handles the RequestVote RPC from a candidate.
func (r *Raft) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) {
	r.mu.Lock()
	defer r.mu.Unlock()

	r.logger.Printf("RequestVote from candidate=%d, term=%d", args.CandidateID, args.Term)

	// If the candidate's term is greater, update our term and become follower
	if args.Term > r.persistent.CurrentTerm {
		r.becomeFollower(args.Term)
	}

	reply.Term = r.persistent.CurrentTerm
	reply.VoteGranted = false

	// Grant vote only if:
	// 1. Candidate's term is current
	// 2. We haven't voted yet (or already voted for this candidate)
	// 3. Candidate's log is at least as up-to-date as ours
	if args.Term == r.persistent.CurrentTerm &&
		(r.persistent.VotedFor == -1 || r.persistent.VotedFor == args.CandidateID) &&
		r.candidateLogIsUpToDate(args.LastLogIndex, args.LastLogTerm) {

		reply.VoteGranted = true
		r.persistent.VotedFor = args.CandidateID
		r.savePersistentState()

		// Reset election timer since we just granted a vote
		r.electionResetTime = time.Now()

		r.logger.Printf("Granted vote to candidate=%d for term=%d", args.CandidateID, args.Term)
	} else {
		r.logger.Printf("Denied vote to candidate=%d (votedFor=%d, logOK=%v)",
			args.CandidateID, r.persistent.VotedFor,
			r.candidateLogIsUpToDate(args.LastLogIndex, args.LastLogTerm))
	}
}

// AppendEntries handles the AppendEntries RPC from the leader.
func (r *Raft) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// If leader's term is greater, update and become follower
	if args.Term > r.persistent.CurrentTerm {
		r.becomeFollower(args.Term)
	}

	reply.Success = false
	reply.Term = r.persistent.CurrentTerm

	// Reject if leader's term is stale
	if args.Term < r.persistent.CurrentTerm {
		r.logger.Printf("AppendEntries rejected: stale term (leader=%d, ours=%d)", args.Term, r.persistent.CurrentTerm)
		return
	}

	// Valid leader; reset election timer (a same-term candidate steps down)
	r.electionResetTime = time.Now()
	r.state = StateFollower

	// Consistency check: do we have the entry at prevLogIndex with prevLogTerm?
	if args.PrevLogIndex > 0 {
		if args.PrevLogIndex >= r.raftLog.Len() {
			// We don't have the entry at prevLogIndex
			reply.ConflictIndex = r.raftLog.Len()
			reply.ConflictTerm = -1
			r.logger.Printf("AppendEntries: missing prevLogIndex=%d (our log len=%d)", args.PrevLogIndex, r.raftLog.Len())
			return
		}

		prevTerm := r.raftLog.TermAt(args.PrevLogIndex)
		if prevTerm != args.PrevLogTerm {
			// Term conflict; provide info for fast backtracking
			reply.ConflictTerm = prevTerm
			// Find the first index that holds conflictTerm
			reply.ConflictIndex = args.PrevLogIndex
			for reply.ConflictIndex > 1 &&
				r.raftLog.TermAt(reply.ConflictIndex-1) == prevTerm {
				reply.ConflictIndex--
			}
			r.logger.Printf("AppendEntries: term conflict at index=%d (expected=%d, got=%d)",
				args.PrevLogIndex, args.PrevLogTerm, prevTerm)
			return
		}
	}

	// Append new entries (handling conflicts)
	if len(args.Entries) > 0 {
		r.raftLog.AppendEntries(args.PrevLogIndex, args.Entries)
		r.savePersistentState()
	}

	// Update commit index to min(leaderCommit, index of last new entry),
	// never moving it backwards.
	if args.LeaderCommit > r.commitIndex {
		newCommit := args.LeaderCommit
		if lastNewIndex := args.PrevLogIndex + len(args.Entries); lastNewIndex < newCommit {
			newCommit = lastNewIndex
		}
		if newCommit > r.commitIndex {
			r.commitIndex = newCommit
			r.logger.Printf("AppendEntries: updated commitIndex=%d", r.commitIndex)
		}
	}

	reply.Success = true
}

// ─────────────────────────────────────────────────────────────
// Election Logic
// ─────────────────────────────────────────────────────────────

// electionTimer runs the election timeout loop. It polls every 10ms and
// starts an election once the timer has not been reset (by a valid
// AppendEntries, a granted vote, or a term change) for a full randomized
// timeout.
func (r *Raft) electionTimer() {
	timeout := r.electionTimeout()
	r.mu.Lock()
	r.electionResetTime = time.Now()
	r.mu.Unlock()

	// A single ticker for the lifetime of the loop, stopped on return.
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-r.stopCh:
			return
		case <-ticker.C:
			r.mu.Lock()
			state := r.state
			elapsed := time.Since(r.electionResetTime)
			r.mu.Unlock()

			if state == StateLeader {
				// Leaders don't need an election timer; check again next tick
				continue
			}

			if elapsed >= timeout {
				r.startElection()
				timeout = r.electionTimeout() // Pick a fresh random timeout
			}
		}
	}
}

// electionTimeout returns a random duration in [base, 2*base), where base is
// config.ElectionTimeout (150ms by default, giving a 150-300ms range).
func (r *Raft) electionTimeout() time.Duration {
	base := r.config.ElectionTimeout
	if base <= 0 {
		base = 150 * time.Millisecond
	}
	// Add random jitter in [0, base)
	jitter := time.Duration(rand.Int63n(int64(base)))
	return base + jitter
}

// startElection begins a leader election.
func (r *Raft) startElection() {
	r.mu.Lock()

	r.state = StateCandidate
	r.persistent.CurrentTerm++
	currentTerm := r.persistent.CurrentTerm
	r.persistent.VotedFor = r.id
	r.electionResetTime = time.Now()
	r.savePersistentState()

	lastLogIndex := r.raftLog.LastIndex()
	lastLogTerm := r.raftLog.LastTerm()

	r.logger.Printf("Starting election for term=%d", currentTerm)
	r.mu.Unlock()

	// Vote counting. r.peers is assumed to include this server, so a
	// majority is len(r.peers)/2 + 1.
	var votesReceived int32 = 1 // Vote for ourselves

	for _, peer := range r.peers {
		if peer.ID == r.id {
			continue
		}
		go func(peer PeerConfig) {
			args := RequestVoteArgs{
				Term:         currentTerm,
				CandidateID:  r.id,
				LastLogIndex: lastLogIndex,
				LastLogTerm:  lastLogTerm,
			}

			var reply RequestVoteReply
			if err := r.sendRequestVote(peer, args, &reply); err != nil {
				r.logger.Printf("RequestVote to peer=%d failed: %v", peer.ID, err)
				return
			}

			r.mu.Lock()
			defer r.mu.Unlock()

			// If we received a higher term, step down
			if reply.Term > r.persistent.CurrentTerm {
				r.becomeFollower(reply.Term)
				return
			}

			// If we're no longer a candidate in this term (e.g., we already won
			// or someone else did), ignore the reply
			if r.state != StateCandidate || r.persistent.CurrentTerm != currentTerm {
				return
			}

			if reply.VoteGranted {
				votes := atomic.AddInt32(&votesReceived, 1)
				majority := int32(len(r.peers)/2 + 1)

				r.logger.Printf("Vote received from peer=%d (total=%d, needed=%d)",
					peer.ID, votes, majority)

				if votes >= majority {
					r.becomeLeader()
				}
			}
		}(peer)
	}
}

// becomeFollower transitions the server to follower state.
// Must be called with r.mu held.
func (r *Raft) becomeFollower(term int) {
	r.logger.Printf("Becoming Follower for term=%d (was %s, term=%d)",
		term, r.state, r.persistent.CurrentTerm)

	r.state = StateFollower
	r.persistent.CurrentTerm = term
	r.persistent.VotedFor = -1
	r.electionResetTime = time.Now()
	r.savePersistentState()
}

// becomeLeader transitions the server to leader state.
// Must be called with r.mu held.
func (r *Raft) becomeLeader() {
	r.logger.Printf("Becoming Leader for term=%d", r.persistent.CurrentTerm)
	r.state = StateLeader

	// Initialize leader-specific volatile state
	for _, peer := range r.peers {
		r.nextIndex[peer.ID] = r.raftLog.LastIndex() + 1
		r.matchIndex[peer.ID] = 0
	}

	// Start heartbeat/replication goroutines for each peer
	for _, peer := range r.peers {
		if peer.ID == r.id {
			continue
		}
		go r.replicationLoop(peer, r.persistent.CurrentTerm)
	}
}

// ─────────────────────────────────────────────────────────────
// Log Replication
// ─────────────────────────────────────────────────────────────

// replicationLoop sends AppendEntries RPCs to one peer on every heartbeat
// tick and whenever Submit signals a new entry. newEntryCh is shared by all
// peer goroutines and buffers a single signal, so each signal wakes only one
// of them; the others send the new entry on their next heartbeat.
func (r *Raft) replicationLoop(peer PeerConfig, leaderTerm int) {
	ticker := time.NewTicker(r.heartbeatInterval())
	defer ticker.Stop()

	for {
		select {
		case <-r.stopCh:
			return
		case <-ticker.C:
			if !r.sendAppendEntries(peer, leaderTerm) {
				return // No longer leader
			}
		case <-r.newEntryCh:
			if !r.sendAppendEntries(peer, leaderTerm) {
				return
			}
		}
	}
}

// heartbeatInterval returns the heartbeat tick interval.
func (r *Raft) heartbeatInterval() time.Duration {
	if r.config.HeartbeatTick > 0 {
		return r.config.HeartbeatTick
	}
	return 50 * time.Millisecond
}

// sendAppendEntries sends an AppendEntries RPC to a peer.
// Returns false if this server is no longer the leader.
func (r *Raft) sendAppendEntries(peer PeerConfig, leaderTerm int) bool {
	r.mu.Lock()

	// Verify we're still the leader for the same term
	if r.state != StateLeader || r.persistent.CurrentTerm != leaderTerm {
		r.mu.Unlock()
		return false
	}

	nextIdx := r.nextIndex[peer.ID]
	prevLogIndex := nextIdx - 1
	prevLogTerm := r.raftLog.TermAt(prevLogIndex)
	if prevLogTerm < 0 {
		prevLogTerm = 0
	}

	entries := r.raftLog.GetEntriesFrom(nextIdx)
	commitIndex := r.commitIndex

	args := AppendEntriesArgs{
		Term:         leaderTerm,
		LeaderID:     r.id,
		PrevLogIndex: prevLogIndex,
		PrevLogTerm:  prevLogTerm,
		Entries:      entries,
		LeaderCommit: commitIndex,
	}
	r.mu.Unlock()

	var reply AppendEntriesReply
	if err := r.sendAppendEntriesRPC(peer, args, &reply); err != nil {
		// Network error; peer might be down
		return true // Still leader, just peer is unreachable
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	// Check if we received a higher term → step down
	if reply.Term > r.persistent.CurrentTerm {
		r.becomeFollower(reply.Term)
		return false
	}

	// Verify we're still leader for this term
	if r.state != StateLeader || r.persistent.CurrentTerm != leaderTerm {
		return false
	}

	if reply.Success {
		// Update nextIndex and matchIndex for this peer
		r.matchIndex[peer.ID] = prevLogIndex + len(entries)
		r.nextIndex[peer.ID] = r.matchIndex[peer.ID] + 1

		r.logger.Printf("AppendEntries success: peer=%d, matchIndex=%d, nextIndex=%d",
			peer.ID, r.matchIndex[peer.ID], r.nextIndex[peer.ID])

		// Check if we can advance the commit index
		r.advanceCommitIndex()
	} else {
		// Fast backtracking using conflict info
		if reply.ConflictTerm >= 0 {
			// Find last entry in our log with conflictTerm
			conflictIdx := -1
			for i := r.raftLog.Len() - 1; i > 0; i-- {
				if r.raftLog.TermAt(i) == reply.ConflictTerm {
					conflictIdx = i
					break
				}
			}
			if conflictIdx >= 0 {
				r.nextIndex[peer.ID] = conflictIdx + 1
			} else {
				r.nextIndex[peer.ID] = reply.ConflictIndex
			}
		} else {
			r.nextIndex[peer.ID] = reply.ConflictIndex
		}

		if r.nextIndex[peer.ID] < 1 {
			r.nextIndex[peer.ID] = 1
		}

		r.logger.Printf("AppendEntries failed: peer=%d, backtrack nextIndex to %d",
			peer.ID, r.nextIndex[peer.ID])
	}

	return true
}

// advanceCommitIndex checks if the commit index can be advanced.
// Must be called with r.mu held.
func (r *Raft) advanceCommitIndex() {
	// Find the highest index N such that:
	// 1. N > commitIndex
	// 2. log[N].term == currentTerm
	// 3. A majority of servers have matchIndex >= N
	for n := r.raftLog.LastIndex(); n > r.commitIndex; n-- {
		if r.raftLog.TermAt(n) != r.persistent.CurrentTerm {
			continue // Only commit entries from current term
		}

		// Count replications (including ourselves)
		replicationCount := 1
		for _, peer := range r.peers {
			if peer.ID != r.id && r.matchIndex[peer.ID] >= n {
				replicationCount++
			}
		}

		majority := len(r.peers)/2 + 1
		if replicationCount >= majority {
			r.commitIndex = n
			r.logger.Printf("Advanced commitIndex to %d (replicated on %d/%d servers)",
				n, replicationCount, len(r.peers))
			break
		}
	}
}

// ─────────────────────────────────────────────────────────────
// State Machine Application
// ─────────────────────────────────────────────────────────────

// applyCommitted sends newly committed entries to the commit channel.
func (r *Raft) applyCommitted() {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-r.stopCh:
			return
		case <-ticker.C:
			r.mu.Lock()
			for r.commitIndex > r.lastApplied {
				r.lastApplied++
				entry, ok := r.raftLog.GetEntry(r.lastApplied)
				if !ok {
					r.logger.Printf("WARN: entry at lastApplied=%d not found", r.lastApplied)
					r.lastApplied-- // retry this index on the next tick instead of skipping it
					break
				}

				commitEntry := CommitEntry{
					Command: entry.Command,
					Index:   r.lastApplied,
					Term:    entry.Term,
				}
				// Release the lock while sending, since the consumer may be slow.
				// From here on use commitEntry, not r.lastApplied, which must
				// not be read without holding r.mu.
				r.mu.Unlock()

				r.logger.Printf("Applying entry index=%d, term=%d", commitEntry.Index, commitEntry.Term)
				select {
				case r.commitCh <- commitEntry:
				case <-r.stopCh:
					return // don't block forever on shutdown
				}

				r.mu.Lock()
			}
			r.mu.Unlock()
		}
	}
}

// ─────────────────────────────────────────────────────────────
// Helper Methods
// ─────────────────────────────────────────────────────────────

// candidateLogIsUpToDate returns true if the candidate's log is at
// least as up-to-date as ours.
// Must be called with r.mu held.
func (r *Raft) candidateLogIsUpToDate(candidateLastIndex, candidateLastTerm int) bool {
	ourLastIndex := r.raftLog.LastIndex()
	ourLastTerm := r.raftLog.LastTerm()

	if candidateLastTerm != ourLastTerm {
		return candidateLastTerm > ourLastTerm
	}
	return candidateLastIndex >= ourLastIndex
}

// ─────────────────────────────────────────────────────────────
// Persistence
// ─────────────────────────────────────────────────────────────

// savePersistentState saves term and votedFor to disk.
// Must be called with r.mu held.
// Note: os.WriteFile truncates and rewrites the file without fsync, so a
// crash mid-write can corrupt it; production code should write atomically
// and sync before replying to RPCs.
func (r *Raft) savePersistentState() {
	if r.config.StoragePath == "" {
		return
	}
	data, err := json.Marshal(r.persistent)
	if err != nil {
		r.logger.Printf("ERROR: marshaling persistent state: %v", err)
		return
	}
	if err := os.WriteFile(r.config.StoragePath+".meta", data, 0644); err != nil {
		r.logger.Printf("ERROR: saving persistent state: %v", err)
	}
}

// loadPersistentState restores term and votedFor from disk.
func (r *Raft) loadPersistentState() error {
	if r.config.StoragePath == "" {
		return os.ErrNotExist
	}
	data, err := os.ReadFile(r.config.StoragePath + ".meta")
	if err != nil {
		return err
	}
	return json.Unmarshal(data, &r.persistent)
}

// ─────────────────────────────────────────────────────────────
// HTTP-based RPC Transport
// ─────────────────────────────────────────────────────────────

// sendRequestVote sends a RequestVote RPC over HTTP.
func (r *Raft) sendRequestVote(peer PeerConfig, args RequestVoteArgs, reply *RequestVoteReply) error {
return r.sendRPC(peer.Address+"/raft/request-vote", args, reply)
}

// sendAppendEntriesRPC sends an AppendEntries RPC over HTTP.
func (r *Raft) sendAppendEntriesRPC(peer PeerConfig, args AppendEntriesArgs, reply *AppendEntriesReply) error {
return r.sendRPC(peer.Address+"/raft/append-entries", args, reply)
}

// sendRPC is a generic HTTP-based RPC helper.
func (r *Raft) sendRPC(url string, args, reply interface{}) error {
body, err := json.Marshal(args)
if err != nil {
return fmt.Errorf("marshaling args: %w", err)
}

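resp, err := r.httpClient.Post(url, "application/json", bytes.NewReader(body))
if err != nil {
return fmt.Errorf("HTTP POST to %s: %w", url, err)
}
defer resp.Body.Close()

// Error responses (e.g. 400 from a handler) carry plain text, not a JSON reply.
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("HTTP POST to %s: unexpected status %s", url, resp.Status)
}

return json.NewDecoder(resp.Body).Decode(reply)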
}

8.6 state_machine.go — Key-Value State Machine

package raft

import (
"fmt"
"sync"
)

// KVStore is a simple key-value state machine driven by Raft.
type KVStore struct {
mu sync.RWMutex
store map[string]string
commitCh chan CommitEntry
raft *Raft

// Pending client requests waiting for log entries to be committed
pendingMu sync.Mutex
pending map[int]chan Result // index → channel
}

// Result represents the result of a command applied to the state machine.
type Result struct {
Value string
Err error
}

// NewKVStore creates a new key-value store backed by Raft.
func NewKVStore(raft *Raft, commitCh chan CommitEntry) *KVStore {
kv := &KVStore{
store: make(map[string]string),
commitCh: commitCh,
raft: raft,
pending: make(map[int]chan Result),
}
go kv.applyLoop()
return kv
}

// Set stores a key-value pair through Raft consensus.
func (kv *KVStore) Set(key, value string) error {
cmd := Command{Op: "set", Key: key, Value: value}
return kv.propose(cmd)
}

// Delete removes a key through Raft consensus.
func (kv *KVStore) Delete(key string) error {
cmd := Command{Op: "delete", Key: key}
return kv.propose(cmd)
}

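// Get reads a value directly from the local state machine, so the result may be
// stale: a follower can lag behind the leader, and a deposed leader may not yet
// know it has been replaced. Linearizable reads need a mechanism such as ReadIndex.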
func (kv *KVStore) Get(key string) (string, bool) {
kv.mu.RLock()
defer kv.mu.RUnlock()
val, ok := kv.store[key]
return val, ok
}

// propose proposes a command to the Raft cluster and waits for it to be committed.
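func (kv *KVStore) propose(cmd Command) error {
// Hold pendingMu across Submit and registration: otherwise applyLoop could
// apply the entry before its channel exists, and this call would block forever.
kv.pendingMu.Lock()
index, _, isLeader := kv.raft.Submit(cmd)
if !isLeader {
kv.pendingMu.Unlock()
return fmt.Errorf("not the leader")
}
ch := make(chan Result, 1)
kv.pending[index] = ch
kv.pendingMu.Unlock()

// Wait for the command to be applied. If this node loses leadership before
// the entry commits, the index may never be applied or may be filled by a
// different command, so production code also needs a timeout and a term check.
result := <-ch
return result.Err
}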

// applyLoop applies committed Raft log entries to the KV state machine.
func (kv *KVStore) applyLoop() {
for entry := range kv.commitCh {
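cmd, ok := entry.Command.(Command)
if !ok {
// Entries received over the JSON transport arrive as generic maps, so
// rebuild the Command field by field. Comma-ok assertions avoid panicking
// on a missing or non-string field (e.g. "value" on a delete).
if m, isMap := entry.Command.(map[string]interface{}); isMap {
op, _ := m["op"].(string)
key, _ := m["key"].(string)
value, _ := m["value"].(string)
cmd = Command{Op: op, Key: key, Value: value}
}
}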

var result Result

kv.mu.Lock()
switch cmd.Op {
case "set":
kv.store[cmd.Key] = cmd.Value
case "delete":
delete(kv.store, cmd.Key)
default:
result.Err = fmt.Errorf("unknown operation: %s", cmd.Op)
}
kv.mu.Unlock()

// Notify any waiting client
kv.pendingMu.Lock()
if ch, ok := kv.pending[entry.Index]; ok {
ch <- result
delete(kv.pending, entry.Index)
}
kv.pendingMu.Unlock()
}
}

8.7 server.go — HTTP Server

package raft

import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"
)

// Server exposes the KV store and Raft RPCs over HTTP.
type Server struct {
id int
address string
raft *Raft
kv *KVStore
srv *http.Server
}

// NewServer creates a new HTTP server.
func NewServer(id int, address string, raft *Raft, kv *KVStore) *Server {
s := &Server{
id: id,
address: address,
raft: raft,
kv: kv,
}

mux := http.NewServeMux()

// Client-facing API
mux.HandleFunc("/kv/get", s.handleGet)
mux.HandleFunc("/kv/set", s.handleSet)
mux.HandleFunc("/kv/delete", s.handleDelete)
mux.HandleFunc("/status", s.handleStatus)

// Raft RPC endpoints
mux.HandleFunc("/raft/request-vote", s.handleRequestVote)
mux.HandleFunc("/raft/append-entries", s.handleAppendEntries)

s.srv = &http.Server{
Addr: address,
Handler: mux,
ReadTimeout: 5 * time.Second,
WriteTimeout: 5 * time.Second,
}

return s
}

// Start starts the HTTP server.
func (s *Server) Start() error {
fmt.Printf("[Server %d] Listening on %s\n", s.id, s.address)
return s.srv.ListenAndServe()
}

// Stop gracefully shuts down the HTTP server.
func (s *Server) Stop(ctx context.Context) error {
return s.srv.Shutdown(ctx)
}

// ─── Client API Handlers ─────────────────────────────────────────────────────

func (s *Server) handleGet(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}

key := r.URL.Query().Get("key")
if key == "" {
http.Error(w, "key is required", http.StatusBadRequest)
return
}

value, ok := s.kv.Get(key)
if !ok {
http.Error(w, "key not found", http.StatusNotFound)
return
}

respondJSON(w, map[string]string{"key": key, "value": value})
}

func (s *Server) handleSet(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}

var req struct {
Key string `json:"key"`
Value string `json:"value"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid JSON body", http.StatusBadRequest)
return
}

if err := s.kv.Set(req.Key, req.Value); err != nil {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
}

respondJSON(w, map[string]string{"status": "ok"})
}

func (s *Server) handleDelete(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodDelete {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}

key := r.URL.Query().Get("key")
if key == "" {
http.Error(w, "key is required", http.StatusBadRequest)
return
}

if err := s.kv.Delete(key); err != nil {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
}

respondJSON(w, map[string]string{"status": "ok"})
}

func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) {
term, isLeader := s.raft.GetState()
respondJSON(w, map[string]interface{}{
"id": s.id,
"term": term,
"is_leader": isLeader,
})
}

// ─── Raft RPC Handlers ────────────────────────────────────────────────────────

func (s *Server) handleRequestVote(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}

var args RequestVoteArgs
if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
http.Error(w, "invalid request body", http.StatusBadRequest)
return
}

var reply RequestVoteReply
s.raft.RequestVote(args, &reply)
respondJSON(w, reply)
}

func (s *Server) handleAppendEntries(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}

var args AppendEntriesArgs
if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
http.Error(w, "invalid request body", http.StatusBadRequest)
return
}

var reply AppendEntriesReply
s.raft.AppendEntries(args, &reply)
respondJSON(w, reply)
}

// ─── Helper ───────────────────────────────────────────────────────────────────

func respondJSON(w http.ResponseWriter, data interface{}) {
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(data); err != nil {
http.Error(w, "failed to encode response", http.StatusInternalServerError)
}
}

8.8 main.go — Entry Point

package main

import (
"flag"
"fmt"
"log"
"os"
"os/signal"
"strings"
"syscall"
"time"

raft "github.com/yourusername/raft-demo"
)

func main() {
var (
id = flag.Int("id", 1, "Server ID")
address = flag.String("addr", ":8001", "Server address")
peersFlag = flag.String("peers", "", "Comma-separated peer addresses (id:addr,...)")
storagePath = flag.String("storage", "", "Path prefix for persistent storage")
)
flag.Parse()

// Parse peers
peers := parsePeers(*peersFlag)

// Build config (all peers including self)
allPeers := append(peers, raft.PeerConfig{ID: *id, Address: "http://" + *address})

config := raft.Config{
ID: *id,
Peers: allPeers,
ElectionTimeout: 150 * time.Millisecond,
HeartbeatTick: 50 * time.Millisecond,
}

// Persistence is enabled only when -storage is given. Building the path
// unconditionally would yield "/node-<id>" for an empty flag and write
// state into the filesystem root.
if *storagePath != "" {
if err := os.MkdirAll(*storagePath, 0755); err != nil {
log.Fatalf("creating storage directory: %v", err)
}
config.StoragePath = fmt.Sprintf("%s/node-%d", *storagePath, *id)
}

// Create commit channel
commitCh := make(chan raft.CommitEntry, 100)

// Create Raft node
raftNode, err := raft.NewRaft(config, commitCh)
if err != nil {
log.Fatalf("creating raft node: %v", err)
}

// Create KV store
kvStore := raft.NewKVStore(raftNode, commitCh)

// Create and start HTTP server
server := raft.NewServer(*id, *address, raftNode, kvStore)

// Graceful shutdown
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

go func() {
<-sigCh
fmt.Println("\nShutting down...")
raftNode.Stop()
os.Exit(0)
}()

log.Fatalf("Server error: %v", server.Start())
}

// parsePeers parses a comma-separated list of "id:address" peer configs.
// Example: "2:http://localhost:8002,3:http://localhost:8003"
func parsePeers(peersFlag string) []raft.PeerConfig {
if peersFlag == "" {
return nil
}

var peers []raft.PeerConfig
for _, p := range strings.Split(peersFlag, ",") {
parts := strings.SplitN(p, ":", 2)
if len(parts) != 2 {
log.Fatalf("invalid peer format: %s (expected id:address)", p)
}
var id int
if _, err := fmt.Sscanf(parts[0], "%d", &id); err != nil {
log.Fatalf("invalid peer id %q in %s: %v", parts[0], p, err)
}
peers = append(peers, raft.PeerConfig{ID: id, Address: parts[1]})
}
return peers
}

9. Running and Testing the Cluster

9.1 Starting a 3-Node Cluster

Open three terminals and run:

# Terminal 1 - Node 1
go run . \
-id 1 \
-addr :8001 \
-peers "2:http://localhost:8002,3:http://localhost:8003" \
-storage /tmp/raft

# Terminal 2 - Node 2
go run . \
-id 2 \
-addr :8002 \
-peers "1:http://localhost:8001,3:http://localhost:8003" \
-storage /tmp/raft

# Terminal 3 - Node 3
go run . \
-id 3 \
-addr :8003 \
-peers "1:http://localhost:8001,2:http://localhost:8002" \
-storage /tmp/raft

9.2 Interacting with the Cluster

# Check which node is the leader
curl http://localhost:8001/status
curl http://localhost:8002/status
curl http://localhost:8003/status

# Expected output for leader:
# {"id":1,"is_leader":true,"term":3}

# Set a key (must go to leader)
curl -X POST http://localhost:8001/kv/set \
-H "Content-Type: application/json" \
-d '{"key":"hello","value":"world"}'
# {"status":"ok"}

# Get the key from any node (a follower that has not applied the entry yet returns 404)
curl "http://localhost:8002/kv/get?key=hello"
# {"key":"hello","value":"world"}

# Delete a key
curl -X DELETE "http://localhost:8001/kv/delete?key=hello"
# {"status":"ok"}

9.3 Testing Fault Tolerance

# Kill the leader (node 1 in this example: Ctrl+C in Terminal 1)
# Wait ~300ms for election...

# New leader should be elected
curl http://localhost:8002/status
# {"id":2,"is_leader":true,"term":4}

# The cluster still works!
curl -X POST http://localhost:8002/kv/set \
-H "Content-Type: application/json" \
-d '{"key":"resilient","value":"true"}'
# {"status":"ok"}

# Restart node 1
go run . -id 1 -addr :8001 \
-peers "2:http://localhost:8002,3:http://localhost:8003" \
-storage /tmp/raft

# Node 1 catches up automatically via AppendEntries
curl "http://localhost:8001/kv/get?key=resilient"
# {"key":"resilient","value":"true"}

9.4 Unit Testing Leader Election

package raft_test

import (
"testing"
"time"
raft "github.com/yourusername/raft-demo"
)

// TestLeaderElection verifies that exactly one leader is elected.
func TestLeaderElection(t *testing.T) {
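// newTestCluster (not shown) is assumed to start n in-process nodes and expose
// their *raft.Raft handles (nodes) and commit channels (commitChs)
cluster := newTestCluster(t, 3)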
defer cluster.stop()

// Wait for leader election
time.Sleep(500 * time.Millisecond)

leaders := 0
leaderTerm := 0
for _, node := range cluster.nodes {
term, isLeader := node.GetState()
if isLeader {
leaders++
leaderTerm = term
}
}

if leaders != 1 {
t.Fatalf("expected exactly 1 leader, got %d", leaders)
}

if leaderTerm == 0 {
t.Fatal("leader term should be > 0")
}

t.Logf("Leader elected successfully in term %d", leaderTerm)
}

// TestLogReplication verifies that committed entries appear on all nodes.
func TestLogReplication(t *testing.T) {
cluster := newTestCluster(t, 3)
defer cluster.stop()

time.Sleep(400 * time.Millisecond)

// Find the leader
var leader *raft.Raft
for _, node := range cluster.nodes {
_, isLeader := node.GetState()
if isLeader {
leader = node
break
}
}

if leader == nil {
t.Fatal("no leader elected")
}

// Submit a command
cmd := raft.Command{Op: "set", Key: "test", Value: "value"}
index, term, ok := leader.Submit(cmd)
if !ok {
t.Fatal("submit failed: not leader")
}

t.Logf("Submitted command at index=%d, term=%d", index, term)

// Wait for replication
time.Sleep(200 * time.Millisecond)

// All nodes should have the committed entry
for i, commitCh := range cluster.commitChs {
select {
case entry := <-commitCh:
t.Logf("Node %d applied entry at index=%d", i+1, entry.Index)
case <-time.After(500 * time.Millisecond):
t.Errorf("Node %d did not apply committed entry", i+1)
}
}
}

10. Common Pitfalls and Best Practices

10.1 Off-by-One Errors in Log Indexing

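Raft logs are 1-indexed by convention: index 0 holds a sentinel (dummy) entry with term 0, so the first real entry has index 1. Mixing up slice positions and Raft indexes leads to subtle bugs:

// Wrong: without a sentinel, len(log) - 1 is a slice position, not a Raft index
prevLogIndex := len(log) - 1

// Correct: let the log abstraction account for the sentinel at index 0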
prevLogIndex := raftLog.LastIndex() // returns len(entries) - 1
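A minimal sketch of a log with a sentinel at position 0 follows. The method names `LastIndex`, `LastTerm`, `TermAt`, and `GetEntry` match the calls made elsewhere in this implementation, but their bodies here, the `-1` returned for out-of-range indexes, and the `Append` helper are assumptions:

```go
package main

// LogEntry is a minimal log entry for this sketch.
type LogEntry struct {
	Term    int
	Command interface{}
}

// raftLog keeps a dummy entry with term 0 at entries[0], so Raft index i is
// stored at entries[i] and the first real entry has index 1.
type raftLog struct {
	entries []LogEntry
}

func newRaftLog() *raftLog {
	return &raftLog{entries: []LogEntry{{Term: 0}}} // sentinel
}

// LastIndex is len(entries) - 1 because of the sentinel; 0 means "empty log".
func (l *raftLog) LastIndex() int { return len(l.entries) - 1 }

// LastTerm returns the term of the last entry (0 for an empty log).
func (l *raftLog) LastTerm() int { return l.entries[l.LastIndex()].Term }

// TermAt returns the term at index, or -1 if the index is out of range.
// TermAt(0) is 0, so a prevLogIndex of 0 matches on every follower.
func (l *raftLog) TermAt(index int) int {
	if index < 0 || index > l.LastIndex() {
		return -1
	}
	return l.entries[index].Term
}

// GetEntry returns the entry at a real (1-based) index; the sentinel is never returned.
func (l *raftLog) GetEntry(index int) (LogEntry, bool) {
	if index < 1 || index > l.LastIndex() {
		return LogEntry{}, false
	}
	return l.entries[index], true
}

// Append adds entries and returns the index of the last one.
func (l *raftLog) Append(entries ...LogEntry) int {
	l.entries = append(l.entries, entries...)
	return l.LastIndex()
}
```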

10.2 Not Persisting State Before Responding

Persistent state (currentTerm, votedFor, and the log) must reach stable storage before the node responds to the RPC that changed it:

// Wrong order
r.persistent.VotedFor = args.CandidateID
reply.VoteGranted = true
// If the node crashes after this reply is sent, votedFor is lost and it may vote twice in the same term!

// Correct order
r.persistent.VotedFor = args.CandidateID
r.savePersistentState() // Write to disk first
reply.VoteGranted = true

10.3 Committing Entries From Previous Terms

A leader can only commit entries from its own term directly. Entries from previous terms are committed indirectly (when a new entry in the current term is committed):

// Wrong: Committing by replication count alone
if replicationCount >= majority {
r.commitIndex = n // Unsafe: an older-term entry stored on a majority can still be overwritten (Figure 8 of the Raft paper)
}

// Correct: Only commit entries from current term
if r.raftLog.TermAt(n) == r.persistent.CurrentTerm &&
replicationCount >= majority {
r.commitIndex = n
}

10.4 Mutex Deadlocks

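Never hold the Raft mutex while sending RPCs. A network call can block until the peer answers or the HTTP timeout fires, and every other goroutine that needs r.mu, including the handlers for incoming RPCs, stalls with it:

// Wrong: holding the lock during a network call
func (r *Raft) badSendRPC(url string, body []byte) {
r.mu.Lock()
defer r.mu.Unlock()
// If the peer does the same and calls our handler, which also needs r.mu,
// both nodes wait on each other: a distributed deadlock.
r.httpClient.Post(url, "application/json", bytes.NewReader(body))
}

// Correct: release the lock before network operations
func (r *Raft) goodSendRPC(url string) {
r.mu.Lock()
args := r.buildArgs() // hypothetical helper: copy the state the RPC needs
r.mu.Unlock() // release before the network call

var reply AppendEntriesReply
err := r.sendRPC(url, args, &reply)

r.mu.Lock()
defer r.mu.Unlock()
if err == nil {
r.processReply(reply) // hypothetical helper: re-check term and state, then update
}
}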

10.5 Split-Brain During Leader Changes

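Raft never elects two leaders in the same term, but a node that acts on a reply from an earlier term can still corrupt its own state (for example nextIndex and matchIndex) or take leader actions it no longer has the authority to perform. After receiving any RPC reply, check that you are still the leader and still in the term in which the request was sent:

// Always verify leadership after receiving any reply
r.mu.Lock()
defer r.mu.Unlock()

// savedTerm is the term recorded (under the lock) when the RPC was sent
if r.state != StateLeader || r.persistent.CurrentTerm != savedTerm {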
return // No longer relevant
}
// Safe to proceed with leader actions

10.6 Election Timer Reset Rules

The election timer should only be reset when:

  • An AppendEntries RPC is received from the current leader
  • A vote is granted to a candidate
  • Starting a new election (becoming candidate)

// Wrong: resetting on every RPC
func (r *Raft) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) {
r.electionResetTime = time.Now() // Too early: the RPC may come from a stale leader with a lower term
...
}

// Correct: Only reset for valid leaders
if args.Term >= r.persistent.CurrentTerm {
r.electionResetTime = time.Now() // Validated leader
}

11. Production Considerations

11.1 Log Compaction and Snapshots

For long-running systems, the Raft log grows without bound, which slows restarts and consumes memory and disk. Production implementations compact it with snapshots:

Without Snapshots:
Log: [entry1][entry2]...[entry10000] → Slow restart, high memory

With Snapshots:
Snapshot: {state at index 9000} + Log: [entry9001]...[entry10000]

The process:

  1. State machine periodically creates a snapshot of its state
  2. Raft discards log entries up to and including the last index covered by the snapshot
  3. When a follower is far behind, leader sends snapshot via InstallSnapshot RPC
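The bookkeeping behind steps 1 and 2 can be sketched as follows. After compaction, the log must still remember the index and term of the last entry covered by the snapshot (lastIncludedIndex and lastIncludedTerm in the Raft paper), because the AppendEntries consistency check for the first remaining entry needs them. `compactLog` is a hypothetical type, separate from the RaftLog used in this implementation, and InstallSnapshot is not shown:

```go
package main

// Entry is a minimal log entry for this sketch.
type Entry struct {
	Term    int
	Command string
}

// compactLog is a hypothetical log that has discarded everything up to and
// including lastIncludedIndex, which now lives in a snapshot.
type compactLog struct {
	lastIncludedIndex int     // last index covered by the snapshot
	lastIncludedTerm  int     // its term, needed for AppendEntries consistency checks
	entries           []Entry // entries[i] holds Raft index lastIncludedIndex+1+i
}

func (l *compactLog) LastIndex() int { return l.lastIncludedIndex + len(l.entries) }

// TermAt returns the term at index, or (-1, false) if it was compacted away or does not exist.
func (l *compactLog) TermAt(index int) (int, bool) {
	if index == l.lastIncludedIndex {
		return l.lastIncludedTerm, true
	}
	if index < l.lastIncludedIndex || index > l.LastIndex() {
		return -1, false
	}
	return l.entries[index-l.lastIncludedIndex-1].Term, true
}

// Compact discards entries up to and including index. Call it only after the
// state machine has applied index and captured a snapshot reflecting it.
func (l *compactLog) Compact(index int) {
	if index <= l.lastIncludedIndex || index > l.LastIndex() {
		return
	}
	term, _ := l.TermAt(index)
	kept := l.entries[index-l.lastIncludedIndex:]
	l.entries = append([]Entry(nil), kept...) // copy so the old backing array can be freed
	l.lastIncludedIndex = index
	l.lastIncludedTerm = term
}
```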

11.2 Read-Only Query Optimization

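Serving reads straight from the leader’s local state is not linearizable: a leader that has been deposed but has not yet noticed can return stale data. Production systems confirm leadership cheaply with ReadIndex:

  1. Leader records its current commitIndex as readIndex (it must first have committed an entry from its current term, typically a no-op appended right after winning the election)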
  2. Leader sends a heartbeat to confirm it’s still the leader
  3. Leader waits until lastApplied >= readIndex
  4. Serves the read from local state

This avoids writing to the log for read-only operations.
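The four ReadIndex steps can be sketched as follows. `readIndexNode` and its function fields are hypothetical stand-ins: `confirmLeadership` for a heartbeat round acknowledged by a majority, `waitApplied` for blocking until lastApplied reaches an index, and `readLocal` for the KVStore lookup:

```go
package main

import "errors"

// readIndexNode is a hypothetical view of the leader state ReadIndex needs.
type readIndexNode struct {
	commitIndex       int
	committedInTerm   bool                            // an entry from the current term is committed
	confirmLeadership func() bool                     // heartbeat round acknowledged by a majority
	waitApplied       func(index int)                 // blocks until lastApplied >= index
	readLocal         func(key string) (string, bool) // local state machine lookup
}

var (
	errNotLeader = errors.New("not the leader")
	errNotReady  = errors.New("leader has not committed an entry in its term yet")
)

// linearizableGet follows the ReadIndex steps in order.
func (n *readIndexNode) linearizableGet(key string) (string, bool, error) {
	if !n.committedInTerm {
		return "", false, errNotReady
	}
	readIndex := n.commitIndex // step 1: record readIndex
	if !n.confirmLeadership() { // step 2: confirm we are still the leader
		return "", false, errNotLeader
	}
	n.waitApplied(readIndex)  // step 3: wait for lastApplied >= readIndex
	v, ok := n.readLocal(key) // step 4: serve from local state
	return v, ok, nil
}
```

In a real node, step 1 reads commitIndex under r.mu, while steps 2 and 3 run without holding it.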

11.3 Pre-Vote Extension

To prevent disruptive elections when a partitioned server rejoins, implement the Pre-Vote extension:

Normal Vote:  Candidate increments term → Others adopt the higher term (resetting votedFor), deposing the current leader
Pre-Vote: Candidate asks "would you vote for me?" without incrementing term
Only starts real election if it would receive majority pre-votes

11.4 Leader Reads with a Lease

For lower-latency reads, leaders can use lease-based reads:

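  • After a heartbeat round acknowledged by a majority, the leader can assume no other leader will be elected for about electionTimeout, measured from when that round was sent (this relies on followers refusing votes while they still hear from a leader)
  • Serve reads without a network round trip during that lease window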
  • Requires bounded clock skew between nodes

12. Conclusion

The Raft consensus algorithm is a landmark achievement in distributed systems: elegant in its decomposition, rigorous in its safety guarantees, and practical enough to underpin production systems such as etcd, CockroachDB, TiKV, and Consul.

Key Takeaways

  • Raft is built on three pillars: leader election, log replication, and safety constraints that together guarantee correctness
  • Strong consistency is achieved through majority agreement: the cluster remains available as long as a majority (⌊N/2⌋ + 1) of its N servers are alive, so it tolerates ⌊(N-1)/2⌋ failures
  • Golang is an excellent choice for Raft implementations thanks to goroutines, channels, and its robust standard library
  • Production use requires additional features: log snapshots, linearizable reads, pre-vote, and membership changes
  • The devil is in the details — mutex discipline, persistence ordering, and term checking are where most bugs hide

Further Reading