Raft is a distributed consensus algorithm designed to be understandable, reliable, and practical for building fault-tolerant distributed systems. Introduced in 2014 by Diego Ongaro and John Ousterhout in their landmark paper “In Search of an Understandable Consensus Algorithm”, Raft has become the backbone of many production systems including etcd, CockroachDB, TiKV, and Consul.
At its core, Raft solves one of the hardest problems in distributed systems:
How do multiple servers agree on a single value or sequence of values even when some of those servers crash, disconnect, or behave slowly?
This problem is known as the consensus problem, and solving it correctly is what separates reliable distributed databases and coordination services from fragile, inconsistent ones.
Key Properties of Raft
| Property | Description |
|---|---|
| Safety | Never returns incorrect results under any non-Byzantine failure |
| Availability | Fully functional as long as a majority of servers are running |
| No clocks | Doesn’t rely on synchronized clocks for correctness |
| Latency | A minority of slow servers doesn’t impact overall performance |
The code from this article is available in the repository below; feel free to clone it and experiment:
https://github.com/mjmichael73/go-raft
Table of Contents
- Why Raft Over Paxos?
- Core Architecture of Raft
- Leader Election
- Log Replication
- Safety and Consistency Guarantees
- Cluster Membership Changes
- Real-World Use Cases
- Practical Go Implementation
- Testing Your Raft Implementation
- Common Pitfalls
- Conclusion
1. Why Raft Over Paxos?
Before Raft, Paxos (created by Leslie Lamport) was the de facto consensus algorithm. However, Paxos suffers from several well-documented issues:
- Notoriously difficult to understand: Lamport later published “Paxos Made Simple” to re-explain the algorithm
- Incomplete specification — Paxos only describes single-decree consensus, not the full replicated log needed in practice
- Poor pedagogical structure — the algorithm is hard to decompose and teach
- Numerous variations — Multi-Paxos, Fast Paxos, Byzantine Paxos, each with their own trade-offs
Raft was explicitly designed to be decomposable into relatively independent sub-problems:
- Leader election
- Log replication
- Safety
- Membership changes
This decomposition makes Raft significantly easier to implement correctly in production systems.
2. Core Architecture of Raft
2.1 Server States
Every server in a Raft cluster exists in one of exactly three states at any point in time:
┌─────────────┐     times out,      ┌─────────────┐
│             │   starts election   │             │
│  Follower   │────────────────────▶│  Candidate  │
│             │                     │             │
└─────────────┘                     └─────────────┘
       ▲                                   │
       │ discovers current                 │ receives votes from
       │ leader or new term                │ majority of servers
       │                                   ▼
       │                            ┌─────────────┐
       └────────────────────────────│             │
         discovers server with      │   Leader    │
         higher term                │             │
                                    └─────────────┘

| State | Description |
|---|---|
| Follower | Passive; responds to requests from leaders and candidates |
| Candidate | Trying to become the leader; votes for itself and requests votes |
| Leader | Handles all client requests; replicates log entries to followers |
2.2 Terms
Raft divides time into terms — numbered with consecutive integers. Each term begins with an election. Terms act as a logical clock in Raft:
   Term 1       Term 2      Term 3         Term 4
┌──────────┐  ┌──────┐  ┌───────────┐  ┌──────────────
│ Election │  │ Elec │  │ Election  │  │ Election...
│ Leader A │  │ Fail │  │ Leader B  │  │ Leader C...
└──────────┘  └──────┘  └───────────┘  └──────────────

- If a server’s current term is smaller than another’s, it immediately updates its term
- If a candidate or leader discovers its term is out of date, it reverts to follower
- Messages with stale term numbers are rejected
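The three term rules above can be sketched in a few lines of Go. This is only an illustration: the `Node` type, the `Role` enum, and the `ObserveTerm` method are hypothetical names introduced here, not part of the implementation later in this article.

```go
package main

import "fmt"

// Role is a hypothetical enum for the three server states.
type Role int

const (
	Follower Role = iota
	Candidate
	Leader
)

// Node holds only the fields needed to illustrate the term rules.
type Node struct {
	CurrentTerm int
	Role        Role
}

// ObserveTerm applies the term rules to the term carried by an incoming
// message. It reports whether the message should be processed (true) or
// rejected as stale (false).
func (n *Node) ObserveTerm(msgTerm int) bool {
	if msgTerm < n.CurrentTerm {
		return false // stale sender: reject the message
	}
	if msgTerm > n.CurrentTerm {
		n.CurrentTerm = msgTerm // adopt the newer term
		n.Role = Follower       // candidates and leaders step down
	}
	return true
}

func main() {
	n := &Node{CurrentTerm: 3, Role: Leader}

	ok := n.ObserveTerm(2)
	fmt.Println(ok, n.CurrentTerm, n.Role == Leader) // false 3 true

	ok = n.ObserveTerm(5)
	fmt.Println(ok, n.CurrentTerm, n.Role == Follower) // true 5 true
}
```

A message with an equal term is simply processed; nothing about the receiver's term or role changes in that case.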
2.3 RPCs (Remote Procedure Calls)
Raft uses only two core RPCs for consensus:
RequestVote RPC: initiated by candidates during elections
Request Fields:
- term (candidate's current term)
- candidateId (candidate requesting the vote)
- lastLogIndex (index of candidate's last log entry)
- lastLogTerm (term of candidate's last log entry)
Response Fields:
- term (currentTerm for candidate to update itself)
- voteGranted (true means candidate received vote)
AppendEntries RPC: used by the leader for log replication and as a heartbeat
Request Fields:
- term (leader's term)
- leaderId (so follower can redirect clients)
- prevLogIndex (index of log entry immediately preceding new ones)
- prevLogTerm (term of prevLogIndex entry)
- entries[] (log entries to store; empty for heartbeat)
- leaderCommit (leader's commitIndex)
Response Fields:
- term (currentTerm for leader to update itself)
- success (true if follower contained entry matching prevLogIndex/prevLogTerm)
2.4 Persistent State
Each server must persist certain state to stable storage before responding to RPCs, so it can safely recover from crashes. The table lists that persistent state together with the volatile state that is kept only in memory:
| State | Description | Storage |
|---|---|---|
| currentTerm | Latest term server has seen | Persistent |
| votedFor | CandidateId voted for in current term | Persistent |
| log[] | Log entries (command + term) | Persistent |
| commitIndex | Index of highest log entry known to be committed | Volatile |
| lastApplied | Index of highest log entry applied to state machine | Volatile |
| nextIndex[] | For each server, index of next log entry to send (leader only) | Volatile |
| matchIndex[] | For each server, highest log entry known to be replicated (leader only) | Volatile |
3. Leader Election
Leader election is the mechanism by which Raft ensures at most one leader per term. A term can also end with no leader at all, when a split vote forces a new election.
3.1 Election Process Step-by-Step
Step 1: Follower times out (no heartbeat from leader)
↓
Step 2: Follower increments currentTerm, transitions to Candidate
↓
Step 3: Votes for itself, sends RequestVote RPCs to all other servers
↓
Step 4a: Receives votes from majority → becomes LEADER
Step 4b: Receives AppendEntries from valid leader → reverts to FOLLOWER
Step 4c: Election timeout expires (split vote) → increment term, start over
3.2 Election Timeout
Raft uses randomized election timeouts (typically 150–300ms) to avoid split votes. The randomization ensures that in most cases, only one server times out first and wins the election before others wake up.
3.3 Vote Granting Rules
A server grants a vote if and only if all three conditions hold:
- The candidate’s term is at least as large as the voter’s current term
- The voter has not already voted in this term for someone else
- The candidate’s log is at least as up-to-date as the voter’s log
Log up-to-date comparison:
- If the last entries in the logs have different terms → higher term wins
- If the logs end with the same term → longer log wins
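To make the voting rules concrete, here is a small sketch with a few worked cases. The `Voter` type and the `shouldGrant` and `logUpToDate` helpers are hypothetical names for illustration. The sketch assumes the voter has already adopted the candidate's term if it was larger, which also clears its previous vote.

```go
package main

import "fmt"

// Voter is a hypothetical snapshot of the state a server consults when voting.
type Voter struct {
	CurrentTerm  int
	VotedFor     int // -1 means no vote cast in CurrentTerm
	LastLogIndex int
	LastLogTerm  int
}

// logUpToDate reports whether the candidate's log is at least as
// up-to-date as ours: a higher last term wins, and on equal terms the
// longer (or equally long) log wins.
func logUpToDate(candIndex, candTerm, ourIndex, ourTerm int) bool {
	if candTerm != ourTerm {
		return candTerm > ourTerm
	}
	return candIndex >= ourIndex
}

// shouldGrant checks the three vote-granting conditions in order.
func (v Voter) shouldGrant(candID, candTerm, candLastIndex, candLastTerm int) bool {
	if candTerm < v.CurrentTerm {
		return false // condition 1: candidate's term is stale
	}
	if v.VotedFor != -1 && v.VotedFor != candID {
		return false // condition 2: already voted for someone else
	}
	// condition 3: candidate's log must be at least as up-to-date
	return logUpToDate(candLastIndex, candLastTerm, v.LastLogIndex, v.LastLogTerm)
}

func main() {
	v := Voter{CurrentTerm: 4, VotedFor: -1, LastLogIndex: 7, LastLogTerm: 3}

	// Shorter log, but its last entry has a higher term: granted.
	fmt.Println(v.shouldGrant(1, 4, 5, 4)) // true
	// Longer log, but its last entry has a lower term: denied.
	fmt.Println(v.shouldGrant(2, 4, 9, 2)) // false
	// Stale candidate term: denied regardless of its log.
	fmt.Println(v.shouldGrant(3, 3, 9, 4)) // false

	// A voter that already voted for candidate 1 denies candidate 2.
	v.VotedFor = 1
	fmt.Println(v.shouldGrant(2, 4, 9, 4)) // false
}
```

Note that log length only matters as a tie-breaker: the term of the last entry is compared first.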
4. Log Replication
Once a leader is elected, it begins servicing client requests through log replication.
4.1 The Replication Flow
Client ──→ Leader
│
│ 1. Append entry to local log
│ 2. Send AppendEntries RPC to all followers (parallel)
│
├──→ Follower 1 ──→ Success
├──→ Follower 2 ──→ Success ← Majority achieved
└──→ Follower 3 ──→ (slow/crashed)
3. Commit entry (majority replied)
4. Apply to state machine
5. Respond to client
6. Notify followers of commit (via next AppendEntries)
4.2 Log Entry Structure
Index: 1 2 3 4 5 6 7
┌─────┬─────┬─────┬─────┬─────┬─────┬─────┐
Term: │ 1 │ 1 │ 1 │ 2 │ 2 │ 3 │ 3 │
├─────┼─────┼─────┼─────┼─────┼─────┼─────┤
Cmd: │ x=3 │ y=1 │ y=9 │ x=2 │ x=0 │ y=7 │ x=5 │
└─────┴─────┴─────┴─────┴─────┴─────┴─────┘
▲
commitIndex
4.3 The Log Matching Property
Raft maintains two critical invariants:
- Index + Term uniqueness: If two entries in different logs have the same index and term, they store the same command
- Consistency: If two logs contain an entry with the same index and term, then all preceding entries are identical
These properties are enforced by the consistency check in AppendEntries: the leader includes prevLogIndex and prevLogTerm, and followers reject the request if their log doesn’t match.
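The follower's side of that consistency check can be sketched as follows. The `Entry` type and the `appendIfConsistent` function are hypothetical names. The sketch assumes slot 0 of the log holds a sentinel entry with term 0, so that `prevLogIndex = 0` always matches.

```go
package main

import "fmt"

// Entry is a minimal log entry for illustration.
type Entry struct {
	Term int
	Cmd  string
}

// appendIfConsistent models a follower handling AppendEntries. It rejects
// the request unless the log holds an entry at prevIndex whose term is
// prevTerm. On success it skips entries that are already present,
// truncates at the first conflict, and appends the remainder.
func appendIfConsistent(log []Entry, prevIndex, prevTerm int, entries []Entry) ([]Entry, bool) {
	if prevIndex < 0 || prevIndex >= len(log) || log[prevIndex].Term != prevTerm {
		return log, false
	}
	for i, e := range entries {
		idx := prevIndex + 1 + i
		if idx < len(log) && log[idx].Term == e.Term {
			continue // same index and term: already stored
		}
		// First missing or conflicting slot: drop the tail and append.
		// The three-index slice forces a copy so the input is not modified.
		log = append(log[:idx:idx], entries[i:]...)
		break
	}
	return log, true
}

func main() {
	follower := []Entry{{0, ""}, {1, "x=3"}, {1, "y=1"}, {2, "x=2"}}

	// Leader says: after index 2 (term 1) comes a term-3 entry.
	// The follower's index 3 has term 2, so it is replaced.
	log, ok := appendIfConsistent(follower, 2, 1, []Entry{{3, "y=7"}})
	fmt.Println(ok, len(log), log[3]) // true 4 {3 y=7}

	// A request claiming index 3 has term 2 no longer matches.
	_, ok = appendIfConsistent(log, 3, 2, nil)
	fmt.Println(ok) // false
}
```

Because every accepted request extends a prefix that already matched, the second invariant follows by induction from the first.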
4.4 Handling Inconsistent Logs
When a new leader is elected, follower logs may be inconsistent. The leader handles this by:
- Starting with nextIndex[follower] = leader's last log index + 1
- If AppendEntries fails due to inconsistency → decrement nextIndex and retry
- Continue until follower’s log matches
- Then send all missing entries in one batch
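The back-off loop can be simulated without any networking. In this sketch each log is reduced to a slice of terms, with a sentinel term 0 in slot 0 of both logs (an assumption that guarantees the loop terminates). The `findNextIndex` function is a hypothetical name.

```go
package main

import "fmt"

// findNextIndex simulates the leader probing a follower. Each probe plays
// the role of one AppendEntries consistency check at prev = next-1. It
// returns the first index the follower needs and the number of probes.
// Both logs must start with the same sentinel term in slot 0.
func findNextIndex(leader, follower []int) (next, probes int) {
	next = len(leader) // leader's last log index + 1
	for {
		probes++
		prev := next - 1
		if prev < len(follower) && follower[prev] == leader[prev] {
			return next, probes // logs agree up to prev
		}
		next-- // check failed: back up one entry and retry
	}
}

func main() {
	leader := []int{0, 1, 1, 2, 3, 3}
	follower := []int{0, 1, 1, 2, 2} // diverges at index 4

	next, probes := findNextIndex(leader, follower)
	fmt.Println(next, probes) // 4 3

	// The leader would now send leader[next:] in a single batch.
	fmt.Println(leader[next:]) // [3 3]
}
```

One probe per entry is simple but slow for long divergent tails, which is why implementations often let the follower return a hint about where its log conflicts.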
5. Safety and Consistency Guarantees
5.1 The Leader Completeness Property
Raft guarantees that if a log entry is committed in a given term, that entry will be present in the logs of all leaders for all higher terms. This is enforced through the vote restriction: a candidate cannot win an election unless its log contains all committed entries.
5.2 State Machine Safety
No two servers ever apply different commands to the same log index. This follows from:
- Log entries can only be committed by the current leader
- Once committed, entries are never overwritten
- Only candidates with up-to-date logs can become leaders
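The first point hides a subtlety from the Raft paper: a leader only counts replicas to commit entries from its own term, and older entries become committed indirectly once a current-term entry after them commits. A sketch of that rule, with hypothetical function names and using the log from section 4.2 as sample data:

```go
package main

import (
	"fmt"
	"sort"
)

// majorityIndex returns the highest log index stored on a majority of
// servers. matchIndex must contain one value per server, including the
// leader itself (whose value is its own last log index).
func majorityIndex(matchIndex []int) int {
	sorted := append([]int(nil), matchIndex...)
	sort.Sort(sort.Reverse(sort.IntSlice(sorted)))
	majority := len(sorted)/2 + 1
	return sorted[majority-1]
}

// advance returns the new commit index. It moves forward only if the
// majority-replicated entry was created in the leader's current term.
func advance(commitIndex int, matchIndex []int, termAt func(int) int, currentTerm int) int {
	n := majorityIndex(matchIndex)
	if n > commitIndex && termAt(n) == currentTerm {
		return n
	}
	return commitIndex
}

func main() {
	// Terms of the log from section 4.2 (slot 0 is a sentinel).
	terms := []int{0, 1, 1, 1, 2, 2, 3, 3}
	termAt := func(i int) int { return terms[i] }

	// Index 5 is on a majority, but it is from term 2, not current term 3.
	fmt.Println(advance(4, []int{7, 7, 5, 4, 2}, termAt, 3)) // 4

	// Index 6 (term 3) reaches a majority: it commits, and with it 5.
	fmt.Println(advance(4, []int{7, 7, 6, 4, 2}, termAt, 3)) // 6
}
```

Checking only the single majority index is enough here because terms never decrease along a log: if that entry is from an older term, so is everything before it.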
5.3 Timing Requirement
For Raft to elect and maintain a stable leader:
broadcastTime << electionTimeout << MTBF
Where:
- broadcastTime ≈ 0.5ms–20ms (time to send RPC to all servers and receive responses)
- electionTimeout ≈ 10ms–500ms
- MTBF = mean time between failures for a single server
6. Cluster Membership Changes
Adding or removing servers from a live Raft cluster is tricky because you can’t switch all servers atomically.
6.1 Joint Consensus
Raft uses joint consensus for safe configuration changes:
Phase 1: Leader commits C_old,new (joint configuration)
During this phase, decisions require majority of BOTH old and new configs
Phase 2: Leader commits C_new (new configuration)
Decisions only require majority of new config
6.2 Single-Server Changes
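A simpler approach is to change one server at a time. Adding or removing a single server can never produce two non-overlapping majorities, because any majority of the old configuration and any majority of the new one must share at least one server. This makes it safe without joint consensus.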
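Both ideas from this section fit in a short sketch. The first part shows the joint-consensus quorum rule from 6.1; the second brute-forces the overlap argument from 6.2 over every pair of majorities. All function names are hypothetical, and servers in the second part are represented as bits in a mask purely for convenience.

```go
package main

import (
	"fmt"
	"math/bits"
)

// hasMajority reports whether acks cover a strict majority of config.
func hasMajority(config []int, acks map[int]bool) bool {
	n := 0
	for _, id := range config {
		if acks[id] {
			n++
		}
	}
	return n > len(config)/2
}

// jointQuorum is the rule during C_old,new: a decision needs a majority
// of the old configuration AND a majority of the new one.
func jointQuorum(oldCfg, newCfg []int, acks map[int]bool) bool {
	return hasMajority(oldCfg, acks) && hasMajority(newCfg, acks)
}

// isMajority reports whether set (a subset of cfg) is a majority of cfg.
func isMajority(set, cfg uint) bool {
	return bits.OnesCount(set) > bits.OnesCount(cfg)/2
}

// majoritiesAlwaysOverlap checks every majority of oldMask against every
// majority of newMask, for servers numbered 0..n-1, and reports whether
// each such pair shares at least one server.
func majoritiesAlwaysOverlap(oldMask, newMask uint, n int) bool {
	for a := uint(0); a < uint(1)<<n; a++ {
		if a&^oldMask != 0 || !isMajority(a, oldMask) {
			continue
		}
		for b := uint(0); b < uint(1)<<n; b++ {
			if b&^newMask != 0 || !isMajority(b, newMask) {
				continue
			}
			if a&b == 0 {
				return false // two disjoint majorities: unsafe
			}
		}
	}
	return true
}

func main() {
	oldCfg, newCfg := []int{1, 2, 3}, []int{3, 4, 5}

	// All new servers agree, but only one old server does: not enough.
	fmt.Println(jointQuorum(oldCfg, newCfg, map[int]bool{3: true, 4: true, 5: true})) // false
	// Majorities in both configurations: the decision stands.
	fmt.Println(jointQuorum(oldCfg, newCfg, map[int]bool{1: true, 2: true, 3: true, 4: true})) // true

	// Growing {0,1,2} by one server keeps all majorities overlapping.
	fmt.Println(majoritiesAlwaysOverlap(0b0111, 0b1111, 4)) // true
	// Growing by two at once allows {0,1} and {2,3,4} to decide separately.
	fmt.Println(majoritiesAlwaysOverlap(0b00111, 0b11111, 5)) // false
}
```

The last case is exactly the split-brain scenario joint consensus exists to prevent when several servers change at once.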
7. Real-World Use Cases
Raft is widely used in production systems across the industry:
| System | Usage |
|---|---|
| etcd | Kubernetes’ primary coordination store; built entirely on Raft via the etcd/raft library |
| CockroachDB | Each range/shard is managed by a Raft group |
| TiKV | Distributed key-value store powering TiDB; uses Multi-Raft |
| Consul | Service mesh and service discovery |
| InfluxDB | Meta-node cluster coordination |
| Nomad | HashiCorp’s workload orchestrator |
| YugabyteDB | Distributed SQL database |
| Dgraph | Distributed graph database |
7.1 When To Use Raft
Raft is ideal for:
- Distributed databases requiring strong consistency
- Configuration stores (key-value, feature flags)
- Distributed locks and leases
- Leader election in microservices
- Distributed job schedulers
- Replicated state machines of any kind
8. Practical Go Implementation
Let’s build a complete, working Raft implementation in Go. This implementation covers leader election, log replication, and basic state machine application.
8.1 Project Structure
raft/
├── go.mod
├── raft.go # Core Raft implementation
├── rpc.go # RPC types and transport
├── log.go # Log management
├── state_machine.go # Key-value state machine
├── server.go # HTTP server exposing the KV store
└── main.go # Entry point and cluster bootstrapping
8.2 go.mod
module github.com/yourusername/raft-demo
go 1.21
8.3 rpc.go: RPC Types
package raft
import "time"
// LogEntry represents a single entry in the Raft log.
type LogEntry struct {
Term int `json:"term"`
Index int `json:"index"`
Command interface{} `json:"command"`
}
// RequestVoteArgs holds the arguments for the RequestVote RPC.
type RequestVoteArgs struct {
Term int `json:"term"`
CandidateID int `json:"candidate_id"`
LastLogIndex int `json:"last_log_index"`
LastLogTerm int `json:"last_log_term"`
}
// RequestVoteReply holds the response for the RequestVote RPC.
type RequestVoteReply struct {
Term int `json:"term"`
VoteGranted bool `json:"vote_granted"`
}
// AppendEntriesArgs holds the arguments for the AppendEntries RPC.
type AppendEntriesArgs struct {
Term int `json:"term"`
LeaderID int `json:"leader_id"`
PrevLogIndex int `json:"prev_log_index"`
PrevLogTerm int `json:"prev_log_term"`
Entries []LogEntry `json:"entries"`
LeaderCommit int `json:"leader_commit"`
}
// AppendEntriesReply holds the response for the AppendEntries RPC.
type AppendEntriesReply struct {
Term int `json:"term"`
Success bool `json:"success"`
ConflictIndex int `json:"conflict_index"` // Optimization: fast log backtracking
ConflictTerm int `json:"conflict_term"`
}
// Command represents a client command to the state machine.
type Command struct {
Op string `json:"op"` // "set", "get", "delete"
Key string `json:"key"`
Value string `json:"value"`
}
// CommitEntry is sent on the commit channel when an entry is committed.
type CommitEntry struct {
Command interface{}
Index int
Term int
}
// PeerConfig holds the configuration for a peer server.
type PeerConfig struct {
ID int
Address string
}
// Config holds the configuration for a Raft node.
type Config struct {
ID int
Peers []PeerConfig
ElectionTimeout time.Duration
HeartbeatTick time.Duration
StoragePath string
}
8.4 log.go: Log Management
package raft
import (
"encoding/json"
"fmt"
"os"
"sync"
)
// RaftLog manages the Raft log with persistence.
type RaftLog struct {
mu sync.RWMutex
entries []LogEntry
path string
}
// NewRaftLog creates or restores a Raft log from disk.
func NewRaftLog(storagePath string) (*RaftLog, error) {
rl := &RaftLog{
entries: []LogEntry{
// Sentinel entry at index 0 to simplify index math
{Term: 0, Index: 0},
},
path: storagePath,
}
if storagePath != "" {
if err := rl.load(); err != nil && !os.IsNotExist(err) {
return nil, fmt.Errorf("loading raft log: %w", err)
}
}
return rl, nil
}
// AppendEntry appends a new entry to the log.
func (rl *RaftLog) AppendEntry(entry LogEntry) {
rl.mu.Lock()
defer rl.mu.Unlock()
entry.Index = len(rl.entries)
rl.entries = append(rl.entries, entry)
rl.persist()
}
// AppendEntries appends multiple entries, truncating conflicting entries first.
func (rl *RaftLog) AppendEntries(prevIndex int, entries []LogEntry) {
rl.mu.Lock()
defer rl.mu.Unlock()
for i, entry := range entries {
idx := prevIndex + 1 + i
if idx < len(rl.entries) {
// Conflict detected: truncate from this point
if rl.entries[idx].Term != entry.Term {
rl.entries = rl.entries[:idx]
rl.entries = append(rl.entries, entries[i:]...)
break
}
// Entry already exists and matches; skip
} else {
// Append new entries
rl.entries = append(rl.entries, entries[i:]...)
break
}
}
rl.persist()
}
// GetEntry returns the log entry at the given index.
func (rl *RaftLog) GetEntry(index int) (LogEntry, bool) {
rl.mu.RLock()
defer rl.mu.RUnlock()
if index < 0 || index >= len(rl.entries) {
return LogEntry{}, false
}
return rl.entries[index], true
}
// GetEntriesFrom returns all entries starting from the given index.
func (rl *RaftLog) GetEntriesFrom(index int) []LogEntry {
rl.mu.RLock()
defer rl.mu.RUnlock()
if index >= len(rl.entries) {
return nil
}
result := make([]LogEntry, len(rl.entries)-index)
copy(result, rl.entries[index:])
return result
}
// LastIndex returns the index of the last log entry.
func (rl *RaftLog) LastIndex() int {
rl.mu.RLock()
defer rl.mu.RUnlock()
return len(rl.entries) - 1
}
// LastTerm returns the term of the last log entry.
func (rl *RaftLog) LastTerm() int {
rl.mu.RLock()
defer rl.mu.RUnlock()
if len(rl.entries) == 0 {
return 0
}
return rl.entries[len(rl.entries)-1].Term
}
// TermAt returns the term of the log entry at the given index.
func (rl *RaftLog) TermAt(index int) int {
rl.mu.RLock()
defer rl.mu.RUnlock()
if index < 0 || index >= len(rl.entries) {
return -1
}
return rl.entries[index].Term
}
// Len returns the total number of log entries (including sentinel).
func (rl *RaftLog) Len() int {
rl.mu.RLock()
defer rl.mu.RUnlock()
return len(rl.entries)
}
// persist saves the log to disk.
func (rl *RaftLog) persist() {
if rl.path == "" {
return
}
data, err := json.Marshal(rl.entries)
if err != nil {
return
}
_ = os.WriteFile(rl.path+".log", data, 0644)
}
// load restores the log from disk.
func (rl *RaftLog) load() error {
data, err := os.ReadFile(rl.path + ".log")
if err != nil {
return err
}
return json.Unmarshal(data, &rl.entries)
}
8.5 raft.go: The Core Implementation
package raft
import (
"bytes"
"encoding/json"
"fmt"
"log"
"math/rand"
"net/http"
"os"
"sync"
"sync/atomic"
"time"
)
// ServerState represents the current state of a Raft server.
type ServerState int32
const (
	StateFollower ServerState = iota
	StateCandidate
	StateLeader
)
func (s ServerState) String() string {
switch s {
case StateFollower:
return "Follower"
case StateCandidate:
return "Candidate"
case StateLeader:
return "Leader"
default:
return "Unknown"
}
}
// PersistentState holds state that must survive crashes.
type PersistentState struct {
CurrentTerm int `json:"current_term"`
VotedFor int `json:"voted_for"` // -1 if not voted
}
// Raft is the core consensus module.
type Raft struct {
mu sync.Mutex
logger *log.Logger
// Configuration
id int
peers []PeerConfig
config Config
// Persistent state (must be saved to stable storage before responding to RPCs)
persistent PersistentState
// Volatile state
state ServerState
commitIndex int
lastApplied int
// Leader-only volatile state (reinitialized after election)
nextIndex map[int]int
matchIndex map[int]int
// Log
raftLog *RaftLog
// Channels
commitCh chan CommitEntry // entries ready to apply to state machine
newEntryCh chan struct{} // signals new entries appended by leader
stopCh chan struct{} // signals shutdown
// Timers
electionResetTime time.Time
// HTTP client for peer communication
httpClient *http.Client
// For tracking if we're running
running int32
}
// NewRaft creates and starts a new Raft consensus module.
func NewRaft(config Config, commitCh chan CommitEntry) (*Raft, error) {
raftLog, err := NewRaftLog(config.StoragePath)
if err != nil {
return nil, fmt.Errorf("creating raft log: %w", err)
}
r := &Raft{
id: config.ID,
peers: config.Peers,
config: config,
persistent: PersistentState{
CurrentTerm: 0,
VotedFor: -1,
},
state: StateFollower,
commitIndex: 0,
lastApplied: 0,
nextIndex: make(map[int]int),
matchIndex: make(map[int]int),
raftLog: raftLog,
commitCh: commitCh,
newEntryCh: make(chan struct{}, 1),
stopCh: make(chan struct{}),
httpClient: &http.Client{
Timeout: 100 * time.Millisecond,
},
logger: log.New(os.Stderr, fmt.Sprintf("[Raft %d] ", config.ID), log.LstdFlags|log.Lmicroseconds),
}
// Load persistent state from disk if available
if err := r.loadPersistentState(); err != nil && !os.IsNotExist(err) {
return nil, fmt.Errorf("loading persistent state: %w", err)
}
atomic.StoreInt32(&r.running, 1)
// Start background goroutines
go r.electionTimer()
go r.applyCommitted()
r.logger.Printf("Started as %s, term=%d", r.state, r.persistent.CurrentTerm)
return r, nil
}
// Submit submits a command to the Raft cluster.
// Returns the log index, term, and whether this server is the leader.
func (r *Raft) Submit(command interface{}) (int, int, bool) {
r.mu.Lock()
defer r.mu.Unlock()
if r.state != StateLeader {
return -1, -1, false
}
entry := LogEntry{
Term: r.persistent.CurrentTerm,
Command: command,
}
r.raftLog.AppendEntry(entry)
index := r.raftLog.LastIndex()
r.logger.Printf("Submit: appended entry at index=%d, term=%d", index, r.persistent.CurrentTerm)
// Signal replication goroutines
select {
case r.newEntryCh <- struct{}{}:
default:
}
return index, r.persistent.CurrentTerm, true
}
// GetState returns the current term and whether this server is the leader.
func (r *Raft) GetState() (int, bool) {
r.mu.Lock()
defer r.mu.Unlock()
return r.persistent.CurrentTerm, r.state == StateLeader
}
// Stop shuts down the Raft module.
func (r *Raft) Stop() {
if atomic.CompareAndSwapInt32(&r.running, 1, 0) {
close(r.stopCh)
r.logger.Println("Stopped")
}
}
// ─────────────────────────────────────────────────────────────
// RPC Handlers
// ─────────────────────────────────────────────────────────────
// RequestVote handles the RequestVote RPC from a candidate.
func (r *Raft) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) {
r.mu.Lock()
defer r.mu.Unlock()
r.logger.Printf("RequestVote from candidate=%d, term=%d", args.CandidateID, args.Term)
// If the candidate's term is greater, update our term and become follower
if args.Term > r.persistent.CurrentTerm {
r.becomeFollower(args.Term)
}
reply.Term = r.persistent.CurrentTerm
reply.VoteGranted = false
// Grant vote only if:
// 1. Candidate's term is current
// 2. We haven't voted yet (or already voted for this candidate)
// 3. Candidate's log is at least as up-to-date as ours
if args.Term == r.persistent.CurrentTerm &&
(r.persistent.VotedFor == -1 || r.persistent.VotedFor == args.CandidateID) &&
r.candidateLogIsUpToDate(args.LastLogIndex, args.LastLogTerm) {
reply.VoteGranted = true
r.persistent.VotedFor = args.CandidateID
r.savePersistentState()
// Reset election timer since we just interacted with a valid candidate
r.electionResetTime = time.Now()
r.logger.Printf("Granted vote to candidate=%d for term=%d", args.CandidateID, args.Term)
} else {
r.logger.Printf("Denied vote to candidate=%d (votedFor=%d, logOK=%v)",
args.CandidateID, r.persistent.VotedFor,
r.candidateLogIsUpToDate(args.LastLogIndex, args.LastLogTerm))
}
}
// AppendEntries handles the AppendEntries RPC from the leader.
func (r *Raft) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) {
r.mu.Lock()
defer r.mu.Unlock()
// If leader's term is greater, update and become follower
if args.Term > r.persistent.CurrentTerm {
r.becomeFollower(args.Term)
}
reply.Success = false
reply.Term = r.persistent.CurrentTerm
// Reject if leader's term is stale
if args.Term < r.persistent.CurrentTerm {
r.logger.Printf("AppendEntries rejected: stale term (leader=%d, ours=%d)", args.Term, r.persistent.CurrentTerm)
return
}
// Valid leader; reset election timer
r.electionResetTime = time.Now()
r.state = StateFollower
// Consistency check: do we have the entry at prevLogIndex with prevLogTerm?
if args.PrevLogIndex > 0 {
if args.PrevLogIndex >= r.raftLog.Len() {
// We don't have the entry at prevLogIndex
reply.ConflictIndex = r.raftLog.Len()
reply.ConflictTerm = -1
r.logger.Printf("AppendEntries: missing prevLogIndex=%d (our log len=%d)", args.PrevLogIndex, r.raftLog.Len())
return
}
prevTerm := r.raftLog.TermAt(args.PrevLogIndex)
if prevTerm != args.PrevLogTerm {
// Term conflict; provide info for fast backtracking
reply.ConflictTerm = prevTerm
// Find first index with conflictTerm
reply.ConflictIndex = args.PrevLogIndex
for reply.ConflictIndex > 1 &&
r.raftLog.TermAt(reply.ConflictIndex-1) == prevTerm {
reply.ConflictIndex--
}
r.logger.Printf("AppendEntries: term conflict at index=%d (expected=%d, got=%d)",
args.PrevLogIndex, args.PrevLogTerm, prevTerm)
return
}
}
// Append new entries (handling conflicts)
if len(args.Entries) > 0 {
r.raftLog.AppendEntries(args.PrevLogIndex, args.Entries)
r.savePersistentState()
}
// Update commit index
if args.LeaderCommit > r.commitIndex {
lastNewIndex := args.PrevLogIndex + len(args.Entries)
if args.LeaderCommit < lastNewIndex {
r.commitIndex = args.LeaderCommit
} else {
r.commitIndex = lastNewIndex
}
r.logger.Printf("AppendEntries: updated commitIndex=%d", r.commitIndex)
}
reply.Success = true
}
// ─────────────────────────────────────────────────────────────
// Election Logic
// ─────────────────────────────────────────────────────────────
// electionTimer runs the election timeout loop. It polls every 10ms and
// starts an election once nothing has reset the timer (a heartbeat or a
// granted vote) within the current randomized timeout.
func (r *Raft) electionTimer() {
	timeout := r.electionTimeout()
	r.mu.Lock()
	r.electionResetTime = time.Now()
	r.mu.Unlock()

	// A single ticker serves the whole loop and is stopped on shutdown.
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-r.stopCh:
			return
		case <-ticker.C:
			r.mu.Lock()
			state := r.state
			elapsed := time.Since(r.electionResetTime)
			r.mu.Unlock()
			if state == StateLeader {
				// Leaders don't run an election timer
				continue
			}
			if elapsed >= timeout {
				r.startElection()
				timeout = r.electionTimeout() // Reset with new random timeout
			}
		}
	}
}
// electionTimeout returns a random election timeout duration.
func (r *Raft) electionTimeout() time.Duration {
base := r.config.ElectionTimeout
if base == 0 {
base = 150 * time.Millisecond
}
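// Add random jitter in [0, base), so the timeout falls in [base, 2*base).
// With the default base of 150ms this gives the usual 150–300ms range.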
jitter := time.Duration(rand.Int63n(int64(base)))
return base + jitter
}
// startElection begins a leader election.
func (r *Raft) startElection() {
r.mu.Lock()
r.state = StateCandidate
r.persistent.CurrentTerm++
currentTerm := r.persistent.CurrentTerm
r.persistent.VotedFor = r.id
r.electionResetTime = time.Now()
r.savePersistentState()
lastLogIndex := r.raftLog.LastIndex()
lastLogTerm := r.raftLog.LastTerm()
r.logger.Printf("Starting election for term=%d", currentTerm)
r.mu.Unlock()
// Vote counting
var votesReceived int32 = 1 // Vote for ourselves
var wg sync.WaitGroup
for _, peer := range r.peers {
if peer.ID == r.id {
continue
}
wg.Add(1)
go func(peer PeerConfig) {
defer wg.Done()
args := RequestVoteArgs{
Term: currentTerm,
CandidateID: r.id,
LastLogIndex: lastLogIndex,
LastLogTerm: lastLogTerm,
}
var reply RequestVoteReply
if err := r.sendRequestVote(peer, args, &reply); err != nil {
r.logger.Printf("RequestVote to peer=%d failed: %v", peer.ID, err)
return
}
r.mu.Lock()
defer r.mu.Unlock()
// If we received a higher term, step down
if reply.Term > r.persistent.CurrentTerm {
r.becomeFollower(reply.Term)
return
}
// If we're no longer a candidate (e.g., someone else won), abort
if r.state != StateCandidate || r.persistent.CurrentTerm != currentTerm {
return
}
if reply.VoteGranted {
votes := atomic.AddInt32(&votesReceived, 1)
majority := int32(len(r.peers)/2 + 1)
r.logger.Printf("Vote received from peer=%d (total=%d, needed=%d)",
peer.ID, votes, majority)
if votes >= majority {
r.becomeLeader()
}
}
}(peer)
}
}
// becomeFollower transitions the server to follower state.
// Must be called with r.mu held.
func (r *Raft) becomeFollower(term int) {
r.logger.Printf("Becoming Follower for term=%d (was %s, term=%d)",
term, r.state, r.persistent.CurrentTerm)
r.state = StateFollower
r.persistent.CurrentTerm = term
r.persistent.VotedFor = -1
r.electionResetTime = time.Now()
r.savePersistentState()
}
// becomeLeader transitions the server to leader state.
// Must be called with r.mu held.
func (r *Raft) becomeLeader() {
r.logger.Printf("Becoming Leader for term=%d", r.persistent.CurrentTerm)
r.state = StateLeader
// Initialize leader-specific volatile state
for _, peer := range r.peers {
r.nextIndex[peer.ID] = r.raftLog.LastIndex() + 1
r.matchIndex[peer.ID] = 0
}
// Start heartbeat/replication goroutines for each peer
for _, peer := range r.peers {
if peer.ID == r.id {
continue
}
go r.replicationLoop(peer, r.persistent.CurrentTerm)
}
}
// ─────────────────────────────────────────────────────────────
// Log Replication
// ─────────────────────────────────────────────────────────────
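// replicationLoop sends AppendEntries RPCs to a peer continuously.
// Note: newEntryCh is shared by every peer's loop and has a buffer of 1, so
// each Submit wakes only one loop; the other peers receive the new entry on
// their next heartbeat tick.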
func (r *Raft) replicationLoop(peer PeerConfig, leaderTerm int) {
ticker := time.NewTicker(r.heartbeatInterval())
defer ticker.Stop()
for {
select {
case <-r.stopCh:
return
case <-ticker.C:
if !r.sendAppendEntries(peer, leaderTerm) {
return // No longer leader
}
case <-r.newEntryCh:
if !r.sendAppendEntries(peer, leaderTerm) {
return
}
}
}
}
// heartbeatInterval returns the heartbeat tick interval.
func (r *Raft) heartbeatInterval() time.Duration {
if r.config.HeartbeatTick > 0 {
return r.config.HeartbeatTick
}
return 50 * time.Millisecond
}
// sendAppendEntries sends an AppendEntries RPC to a peer.
// Returns false if this server is no longer the leader.
func (r *Raft) sendAppendEntries(peer PeerConfig, leaderTerm int) bool {
r.mu.Lock()
// Verify we're still the leader for the same term
if r.state != StateLeader || r.persistent.CurrentTerm != leaderTerm {
r.mu.Unlock()
return false
}
nextIdx := r.nextIndex[peer.ID]
prevLogIndex := nextIdx - 1
prevLogTerm := r.raftLog.TermAt(prevLogIndex)
if prevLogTerm < 0 {
prevLogTerm = 0
}
entries := r.raftLog.GetEntriesFrom(nextIdx)
commitIndex := r.commitIndex
args := AppendEntriesArgs{
Term: leaderTerm,
LeaderID: r.id,
PrevLogIndex: prevLogIndex,
PrevLogTerm: prevLogTerm,
Entries: entries,
LeaderCommit: commitIndex,
}
r.mu.Unlock()
var reply AppendEntriesReply
if err := r.sendAppendEntriesRPC(peer, args, &reply); err != nil {
// Network error; peer might be down
return true // Still leader, just peer is unreachable
}
r.mu.Lock()
defer r.mu.Unlock()
// Check if we received a higher term → step down
if reply.Term > r.persistent.CurrentTerm {
r.becomeFollower(reply.Term)
return false
}
// Verify we're still leader for this term
if r.state != StateLeader || r.persistent.CurrentTerm != leaderTerm {
return false
}
if reply.Success {
// Update nextIndex and matchIndex for this peer
r.matchIndex[peer.ID] = prevLogIndex + len(entries)
r.nextIndex[peer.ID] = r.matchIndex[peer.ID] + 1
r.logger.Printf("AppendEntries success: peer=%d, matchIndex=%d, nextIndex=%d",
peer.ID, r.matchIndex[peer.ID], r.nextIndex[peer.ID])
// Check if we can advance the commit index
r.advanceCommitIndex()
} else {
// Fast backtracking using conflict info
if reply.ConflictTerm >= 0 {
// Find last entry in our log with conflictTerm
conflictIdx := -1
for i := r.raftLog.Len() - 1; i > 0; i-- {
if r.raftLog.TermAt(i) == reply.ConflictTerm {
conflictIdx = i
break
}
}
if conflictIdx >= 0 {
r.nextIndex[peer.ID] = conflictIdx + 1
} else {
r.nextIndex[peer.ID] = reply.ConflictIndex
}
} else {
r.nextIndex[peer.ID] = reply.ConflictIndex
}
if r.nextIndex[peer.ID] < 1 {
r.nextIndex[peer.ID] = 1
}
r.logger.Printf("AppendEntries failed: peer=%d, backtrack nextIndex to %d",
peer.ID, r.nextIndex[peer.ID])
}
return true
}
// advanceCommitIndex checks if the commit index can be advanced.
// Must be called with r.mu held.
func (r *Raft) advanceCommitIndex() {
// Find the highest index N such that:
// 1. N > commitIndex
// 2. log[N].term == currentTerm
// 3. A majority of servers have matchIndex >= N
for n := r.raftLog.LastIndex(); n > r.commitIndex; n-- {
if r.raftLog.TermAt(n) != r.persistent.CurrentTerm {
continue // Only commit entries from current term
}
// Count replications (including ourselves)
replicationCount := 1
for _, peer := range r.peers {
if peer.ID != r.id && r.matchIndex[peer.ID] >= n {
replicationCount++
}
}
majority := len(r.peers)/2 + 1
if replicationCount >= majority {
r.commitIndex = n
r.logger.Printf("Advanced commitIndex to %d (replicated on %d/%d servers)",
n, replicationCount, len(r.peers))
break
}
}
}
// ─────────────────────────────────────────────────────────────
// State Machine Application
// ─────────────────────────────────────────────────────────────
// applyCommitted sends newly committed entries to the commit channel.
func (r *Raft) applyCommitted() {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-r.stopCh:
			return
		case <-ticker.C:
			// Collect pending entries under the lock, then deliver them
			// without holding it so a slow consumer cannot block RPC handlers.
			r.mu.Lock()
			var pending []CommitEntry
			for r.lastApplied < r.commitIndex {
				next := r.lastApplied + 1
				entry, ok := r.raftLog.GetEntry(next)
				if !ok {
					r.logger.Printf("WARN: entry at index=%d not found", next)
					break // lastApplied is unchanged, so the entry is retried
				}
				r.lastApplied = next
				pending = append(pending, CommitEntry{
					Command: entry.Command,
					Index:   next,
					Term:    entry.Term,
				})
			}
			r.mu.Unlock()
			for _, ce := range pending {
				r.logger.Printf("Applying entry index=%d, term=%d", ce.Index, ce.Term)
				select {
				case r.commitCh <- ce:
				case <-r.stopCh:
					return
				}
			}
		}
	}
}
// ─────────────────────────────────────────────────────────────
// Helper Methods
// ─────────────────────────────────────────────────────────────
// candidateLogIsUpToDate returns true if the candidate's log is at
// least as up-to-date as ours.
// Must be called with r.mu held.
func (r *Raft) candidateLogIsUpToDate(candidateLastIndex, candidateLastTerm int) bool {
ourLastIndex := r.raftLog.LastIndex()
ourLastTerm := r.raftLog.LastTerm()
if candidateLastTerm != ourLastTerm {
return candidateLastTerm > ourLastTerm
}
return candidateLastIndex >= ourLastIndex
}
// ─────────────────────────────────────────────────────────────
// Persistence
// ─────────────────────────────────────────────────────────────
// savePersistentState saves term and votedFor to disk.
// Must be called with r.mu held.
func (r *Raft) savePersistentState() {
if r.config.StoragePath == "" {
return
}
data, err := json.Marshal(r.persistent)
if err != nil {
r.logger.Printf("ERROR: marshaling persistent state: %v", err)
return
}
if err := os.WriteFile(r.config.StoragePath+".meta", data, 0644); err != nil {
r.logger.Printf("ERROR: saving persistent state: %v", err)
}
}
// loadPersistentState restores term and votedFor from disk.
func (r *Raft) loadPersistentState() error {
if r.config.StoragePath == "" {
return os.ErrNotExist
}
data, err := os.ReadFile(r.config.StoragePath + ".meta")
if err != nil {
return err
}
return json.Unmarshal(data, &r.persistent)
}
// ─────────────────────────────────────────────────────────────
// HTTP-based RPC Transport
// ─────────────────────────────────────────────────────────────
// sendRequestVote sends a RequestVote RPC over HTTP.
func (r *Raft) sendRequestVote(peer PeerConfig, args RequestVoteArgs, reply *RequestVoteReply) error {
return r.sendRPC(peer.Address+"/raft/request-vote", args, reply)
}
// sendAppendEntriesRPC sends an AppendEntries RPC over HTTP.
func (r *Raft) sendAppendEntriesRPC(peer PeerConfig, args AppendEntriesArgs, reply *AppendEntriesReply) error {
return r.sendRPC(peer.Address+"/raft/append-entries", args, reply)
}
// sendRPC is a generic HTTP-based RPC helper.
func (r *Raft) sendRPC(url string, args, reply interface{}) error {
body, err := json.Marshal(args)
if err != nil {
return fmt.Errorf("marshaling args: %w", err)
}
resp, err := r.httpClient.Post(url, "application/json", bytes.NewReader(body))
if err != nil {
return fmt.Errorf("HTTP POST to %s: %w", url, err)
}
defer resp.Body.Close()
return json.NewDecoder(resp.Body).Decode(reply)
}
8.6 state_machine.go — Key-Value State Machine
package raft
import (
"fmt"
"sync"
)
// KVStore is a simple key-value state machine driven by Raft.
type KVStore struct {
mu sync.RWMutex
store map[string]string
commitCh chan CommitEntry
raft *Raft
// Pending client requests waiting for log entries to be committed
pendingMu sync.Mutex
pending map[int]chan Result // index → channel
}
// Result represents the result of a command applied to the state machine.
type Result struct {
Value string
Err error
}
// NewKVStore creates a new key-value store backed by Raft.
func NewKVStore(raft *Raft, commitCh chan CommitEntry) *KVStore {
kv := &KVStore{
store: make(map[string]string),
commitCh: commitCh,
raft: raft,
pending: make(map[int]chan Result),
}
go kv.applyLoop()
return kv
}
// Set stores a key-value pair through Raft consensus.
func (kv *KVStore) Set(key, value string) error {
cmd := Command{Op: "set", Key: key, Value: value}
return kv.propose(cmd)
}
// Delete removes a key through Raft consensus.
func (kv *KVStore) Delete(key string) error {
cmd := Command{Op: "delete", Key: key}
return kv.propose(cmd)
}
// Get reads a value directly from the local state machine.
// Note: For linearizable reads, you'd need to implement a read index mechanism.
func (kv *KVStore) Get(key string) (string, bool) {
kv.mu.RLock()
defer kv.mu.RUnlock()
val, ok := kv.store[key]
return val, ok
}
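// propose proposes a command to the Raft cluster and waits for it to be committed.
// Note: the wait has no timeout, so the call blocks indefinitely if this node
// loses leadership before the entry commits.
func (kv *KVStore) propose(cmd Command) error {
// Hold pendingMu across Submit and registration so that applyLoop cannot
// look up this index before the channel is registered.
kv.pendingMu.Lock()
index, _, isLeader := kv.raft.Submit(cmd)
if !isLeader {
kv.pendingMu.Unlock()
return fmt.Errorf("not the leader")
}
// Register a pending channel for this log index
ch := make(chan Result, 1)
kv.pending[index] = ch
kv.pendingMu.Unlock()
// Wait for the command to be applied
result := <-ch
return result.Err
}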
// applyLoop applies committed Raft log entries to the KV state machine.
func (kv *KVStore) applyLoop() {
for entry := range kv.commitCh {
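cmd, ok := entry.Command.(Command)
if !ok {
// Try JSON unmarshaling (when decoded from JSON transport).
// Comma-ok assertions avoid a panic on missing or non-string fields;
// a malformed command falls through to the "unknown operation" case.
if m, ok2 := entry.Command.(map[string]interface{}); ok2 {
op, _ := m["op"].(string)
key, _ := m["key"].(string)
value, _ := m["value"].(string)
cmd = Command{Op: op, Key: key, Value: value}
}
}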
var result Result
kv.mu.Lock()
switch cmd.Op {
case "set":
kv.store[cmd.Key] = cmd.Value
case "delete":
delete(kv.store, cmd.Key)
default:
result.Err = fmt.Errorf("unknown operation: %s", cmd.Op)
}
kv.mu.Unlock()
// Notify any waiting client
kv.pendingMu.Lock()
if ch, ok := kv.pending[entry.Index]; ok {
ch <- result
delete(kv.pending, entry.Index)
}
kv.pendingMu.Unlock()
}
}
8.7 server.go — HTTP Server
package raft
import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"
)
// Server exposes the KV store and Raft RPCs over HTTP.
type Server struct {
id int
address string
raft *Raft
kv *KVStore
srv *http.Server
}
// NewServer creates a new HTTP server.
func NewServer(id int, address string, raft *Raft, kv *KVStore) *Server {
s := &Server{
id: id,
address: address,
raft: raft,
kv: kv,
}
mux := http.NewServeMux()
// Client-facing API
mux.HandleFunc("/kv/get", s.handleGet)
mux.HandleFunc("/kv/set", s.handleSet)
mux.HandleFunc("/kv/delete", s.handleDelete)
mux.HandleFunc("/status", s.handleStatus)
// Raft RPC endpoints
mux.HandleFunc("/raft/request-vote", s.handleRequestVote)
mux.HandleFunc("/raft/append-entries", s.handleAppendEntries)
s.srv = &http.Server{
Addr: address,
Handler: mux,
ReadTimeout: 5 * time.Second,
WriteTimeout: 5 * time.Second,
}
return s
}
// Start starts the HTTP server.
func (s *Server) Start() error {
fmt.Printf("[Server %d] Listening on %s\n", s.id, s.address)
return s.srv.ListenAndServe()
}
// Stop gracefully shuts down the HTTP server.
func (s *Server) Stop(ctx context.Context) error {
return s.srv.Shutdown(ctx)
}
// ─── Client API Handlers ─────────────────────────────────────────────────────
func (s *Server) handleGet(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
key := r.URL.Query().Get("key")
if key == "" {
http.Error(w, "key is required", http.StatusBadRequest)
return
}
value, ok := s.kv.Get(key)
if !ok {
http.Error(w, "key not found", http.StatusNotFound)
return
}
respondJSON(w, map[string]string{"key": key, "value": value})
}
func (s *Server) handleSet(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
var req struct {
Key string `json:"key"`
Value string `json:"value"`
}
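if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid JSON body", http.StatusBadRequest)
return
}
// Reject empty keys, matching the validation in handleGet and handleDelete.
if req.Key == "" {
http.Error(w, "key is required", http.StatusBadRequest)
return
}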
if err := s.kv.Set(req.Key, req.Value); err != nil {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
}
respondJSON(w, map[string]string{"status": "ok"})
}
func (s *Server) handleDelete(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodDelete {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
key := r.URL.Query().Get("key")
if key == "" {
http.Error(w, "key is required", http.StatusBadRequest)
return
}
if err := s.kv.Delete(key); err != nil {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
}
respondJSON(w, map[string]string{"status": "ok"})
}
func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) {
term, isLeader := s.raft.GetState()
respondJSON(w, map[string]interface{}{
"id": s.id,
"term": term,
"is_leader": isLeader,
})
}
// ─── Raft RPC Handlers ────────────────────────────────────────────────────────
func (s *Server) handleRequestVote(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
var args RequestVoteArgs
if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
http.Error(w, "invalid request body", http.StatusBadRequest)
return
}
var reply RequestVoteReply
s.raft.RequestVote(args, &reply)
respondJSON(w, reply)
}
func (s *Server) handleAppendEntries(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
var args AppendEntriesArgs
if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
http.Error(w, "invalid request body", http.StatusBadRequest)
return
}
var reply AppendEntriesReply
s.raft.AppendEntries(args, &reply)
respondJSON(w, reply)
}
// ─── Helper ───────────────────────────────────────────────────────────────────
func respondJSON(w http.ResponseWriter, data interface{}) {
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(data); err != nil {
http.Error(w, "failed to encode response", http.StatusInternalServerError)
}
}
8.8 main.go — Entry Point
package main
import (
"flag"
"fmt"
"log"
"os"
"os/signal"
"strings"
"syscall"
"time"
raft "github.com/yourusername/raft-demo"
)
func main() {
var (
id = flag.Int("id", 1, "Server ID")
address = flag.String("addr", ":8001", "Server address")
peersFlag = flag.String("peers", "", "Comma-separated peer addresses (id:addr,...)")
storagePath = flag.String("storage", "", "Path prefix for persistent storage")
)
flag.Parse()
// Parse peers
peers := parsePeers(*peersFlag)
// Build config (all peers including self)
allPeers := append(peers, raft.PeerConfig{ID: *id, Address: "http://" + *address})
config := raft.Config{
ID: *id,
Peers: allPeers,
ElectionTimeout: 150 * time.Millisecond,
HeartbeatTick: 50 * time.Millisecond,
}
// Enable persistence only when -storage is set; an empty StoragePath
// disables it (see savePersistentState).
if *storagePath != "" {
if err := os.MkdirAll(*storagePath, 0755); err != nil {
log.Fatalf("creating storage directory: %v", err)
}
config.StoragePath = fmt.Sprintf("%s/node-%d", *storagePath, *id)
}
// Create commit channel
commitCh := make(chan raft.CommitEntry, 100)
// Create Raft node
raftNode, err := raft.NewRaft(config, commitCh)
if err != nil {
log.Fatalf("creating raft node: %v", err)
}
// Create KV store
kvStore := raft.NewKVStore(raftNode, commitCh)
// Create and start HTTP server
server := raft.NewServer(*id, *address, raftNode, kvStore)
// Graceful shutdown
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigCh
fmt.Println("\nShutting down...")
raftNode.Stop()
os.Exit(0)
}()
log.Fatalf("Server error: %v", server.Start())
}
// parsePeers parses a comma-separated list of "id:address" peer configs.
// Example: "2:http://localhost:8002,3:http://localhost:8003"
func parsePeers(peersFlag string) []raft.PeerConfig {
if peersFlag == "" {
return nil
}
var peers []raft.PeerConfig
for _, p := range strings.Split(peersFlag, ",") {
parts := strings.SplitN(p, ":", 2)
if len(parts) != 2 {
log.Fatalf("invalid peer format: %s (expected id:address)", p)
}
var id int
if _, err := fmt.Sscanf(parts[0], "%d", &id); err != nil {
log.Fatalf("invalid peer ID %q: %v", parts[0], err)
}
peers = append(peers, raft.PeerConfig{ID: id, Address: parts[1]})
}
return peers
}
9. Running and Testing the Cluster
9.1 Starting a 3-Node Cluster
Open three terminals and run:
# Terminal 1 - Node 1
go run . \
-id 1 \
-addr :8001 \
-peers "2:http://localhost:8002,3:http://localhost:8003" \
-storage /tmp/raft
# Terminal 2 - Node 2
go run . \
-id 2 \
-addr :8002 \
-peers "1:http://localhost:8001,3:http://localhost:8003" \
-storage /tmp/raft
# Terminal 3 - Node 3
go run . \
-id 3 \
-addr :8003 \
-peers "1:http://localhost:8001,2:http://localhost:8002" \
-storage /tmp/raft
9.2 Interacting with the Cluster
# Check which node is the leader
curl http://localhost:8001/status
curl http://localhost:8002/status
curl http://localhost:8003/status
# Expected output for leader:
# {"id":1,"is_leader":true,"term":3}
# Set a key (must go to leader)
curl -X POST http://localhost:8001/kv/set \
-H "Content-Type: application/json" \
-d '{"key":"hello","value":"world"}'
# {"status":"ok"}
# Get the key from any node
curl "http://localhost:8002/kv/get?key=hello"
# {"key":"hello","value":"world"}
# Delete a key
curl -X DELETE "http://localhost:8001/kv/delete?key=hello"
# {"status":"ok"}
9.3 Testing Fault Tolerance
# Kill the leader (Ctrl+C on Terminal 1)
# Wait ~300ms for election...
# New leader should be elected
curl http://localhost:8002/status
# {"id":2,"is_leader":true,"term":4}
# The cluster still works!
curl -X POST http://localhost:8002/kv/set \
-H "Content-Type: application/json" \
-d '{"key":"resilient","value":"true"}'
# {"status":"ok"}
# Restart node 1
go run . -id 1 -addr :8001 \
-peers "2:http://localhost:8002,3:http://localhost:8003" \
-storage /tmp/raft
# Node 1 catches up automatically via AppendEntries
curl "http://localhost:8001/kv/get?key=resilient"
# {"key":"resilient","value":"true"}
9.4 Unit Testing Leader Election
package raft_test
import (
"testing"
"time"
raft "github.com/yourusername/raft-demo"
)
// TestLeaderElection verifies that exactly one leader is elected.
func TestLeaderElection(t *testing.T) {
cluster := newTestCluster(t, 3)
defer cluster.stop()
// Wait for leader election
time.Sleep(500 * time.Millisecond)
leaders := 0
leaderTerm := 0
for _, node := range cluster.nodes {
term, isLeader := node.GetState()
if isLeader {
leaders++
leaderTerm = term
}
}
if leaders != 1 {
t.Fatalf("expected exactly 1 leader, got %d", leaders)
}
if leaderTerm == 0 {
t.Fatal("leader term should be > 0")
}
t.Logf("Leader elected successfully in term %d", leaderTerm)
}
// TestLogReplication verifies that committed entries appear on all nodes.
func TestLogReplication(t *testing.T) {
cluster := newTestCluster(t, 3)
defer cluster.stop()
time.Sleep(400 * time.Millisecond)
// Find the leader
var leader *raft.Raft
for _, node := range cluster.nodes {
_, isLeader := node.GetState()
if isLeader {
leader = node
break
}
}
if leader == nil {
t.Fatal("no leader elected")
}
// Submit a command
cmd := raft.Command{Op: "set", Key: "test", Value: "value"}
index, term, ok := leader.Submit(cmd)
if !ok {
t.Fatal("submit failed: not leader")
}
t.Logf("Submitted command at index=%d, term=%d", index, term)
// Wait for replication
time.Sleep(200 * time.Millisecond)
// All nodes should have the committed entry
for i, commitCh := range cluster.commitChs {
select {
case entry := <-commitCh:
t.Logf("Node %d applied entry at index=%d", i+1, entry.Index)
case <-time.After(500 * time.Millisecond):
t.Errorf("Node %d did not apply committed entry", i+1)
}
}
}
10. Common Pitfalls and Best Practices
10.1 Off-by-One Errors in Log Indexing
Raft logs are 1-indexed by convention (index 0 is a sentinel/dummy entry). Forgetting this leads to subtle bugs:
// Wrong: Treating log as 0-indexed
prevLogIndex := len(log) - 1
// Correct: Accounting for sentinel entry
prevLogIndex := raftLog.LastIndex() // returns len(entries) - 1
10.2 Not Persisting State Before Responding
State must be written to stable storage before sending any RPC response:
// Wrong order
r.persistent.VotedFor = args.CandidateID
reply.VoteGranted = true
// Crash here loses votedFor!
// Correct order
r.persistent.VotedFor = args.CandidateID
r.savePersistentState() // Write to disk first
reply.VoteGranted = true
10.3 Committing Entries From Previous Terms
A leader can only commit entries from its own term directly. Entries from previous terms are committed indirectly (when a new entry in the current term is committed):
// Wrong: Committing by replication count alone
if replicationCount >= majority {
r.commitIndex = n // Could commit stale entries!
}
// Correct: Only commit entries from current term
if r.raftLog.TermAt(n) == r.persistent.CurrentTerm &&
replicationCount >= majority {
r.commitIndex = n
}
10.4 Mutex Deadlocks
Never hold the Raft mutex while sending RPCs. RPC calls can block, causing deadlocks:
// Wrong: Holding lock during network call
func (r *Raft) badSendRPC() {
r.mu.Lock()
defer r.mu.Unlock()
r.httpClient.Post(url, ...) // DEADLOCK if reply calls back!
}
// Correct: Release lock before network operations
func (r *Raft) goodSendRPC() {
r.mu.Lock()
args := r.buildArgs() // Gather needed data
r.mu.Unlock() // Release before network call
r.httpClient.Post(url, args)
r.mu.Lock()
r.processReply() // Process reply with lock
r.mu.Unlock()
}
10.5 Split-Brain During Leader Changes
Always check that you’re still the leader for the same term after receiving an RPC reply:
// Always verify leadership after receiving any reply
r.mu.Lock()
defer r.mu.Unlock()
if r.state != StateLeader || r.persistent.CurrentTerm != savedTerm {
return // No longer relevant
}
// Safe to proceed with leader actions
10.6 Election Timer Reset Rules
The election timer should only be reset when:
- An AppendEntries RPC is received from the current leader
- A vote is granted to a candidate
- Starting a new election (becoming candidate)
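The three rules can be condensed into a single predicate. The sketch below is illustrative only: the rpcEvent type and the shouldResetTimer function are hypothetical names, not part of the implementation in section 8.

```go
package main

import "fmt"

// rpcEvent enumerates the situations in which a node might consider
// resetting its election timer. Hypothetical type, for illustration.
type rpcEvent int

const (
	appendEntriesReceived rpcEvent = iota
	requestVoteReceived
	electionStarted
)

// shouldResetTimer encodes the three reset rules listed above.
func shouldResetTimer(ev rpcEvent, senderTerm, currentTerm int, voteGranted bool) bool {
	switch ev {
	case appendEntriesReceived:
		// Rule 1: only a leader whose term is not behind ours counts.
		return senderTerm >= currentTerm
	case requestVoteReceived:
		// Rule 2: reset only when the vote is actually granted.
		return voteGranted
	case electionStarted:
		// Rule 3: becoming a candidate always restarts the timer.
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(shouldResetTimer(appendEntriesReceived, 4, 5, false)) // false: stale leader
	fmt.Println(shouldResetTimer(appendEntriesReceived, 5, 5, false)) // true
	fmt.Println(shouldResetTimer(requestVoteReceived, 6, 5, false))   // false: vote denied
	fmt.Println(shouldResetTimer(electionStarted, 5, 5, false))       // true
}
```

Refusing to reset on a denied vote matters: otherwise a candidate that cannot win could keep postponing elections on healthy nodes.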
// Wrong: Resetting on every RPC
func (r *Raft) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) {
r.electionResetTime = time.Now() // Too early! May be stale message
...
}
// Correct: Only reset for valid leaders
if args.Term >= r.persistent.CurrentTerm {
r.electionResetTime = time.Now() // Validated leader
}
11. Production Considerations
11.1 Log Compaction and Snapshots
For long-running systems, the Raft log grows without bound. Production implementations need log compaction through snapshots:
Without Snapshots:
Log: [entry1][entry2]...[entry10000] → Slow restart, high memory
With Snapshots:
Snapshot: {state at index 9000} + Log: [entry9001]...[entry10000]
The process:
- State machine periodically creates a snapshot of its state
- Raft discards log entries before the snapshot index
- When a follower is far behind, leader sends snapshot via InstallSnapshot RPC
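The index bookkeeping behind these steps can be sketched as follows. This is a minimal, in-memory illustration: compactableLog and its methods are hypothetical names rather than part of the code in section 8, and it assumes the caller only compacts up to an index that has already been applied to the state machine.

```go
package main

import "fmt"

type entry struct {
	Term    int
	Command string
}

// compactableLog is a hypothetical log that can discard a prefix.
// entries[i] holds the entry at Raft index snapshotIndex+1+i.
type compactableLog struct {
	snapshotIndex int // last index covered by the snapshot (0 = no snapshot)
	snapshotTerm  int // term of the entry at snapshotIndex
	entries       []entry
}

func (l *compactableLog) lastIndex() int { return l.snapshotIndex + len(l.entries) }

func (l *compactableLog) append(e entry) int {
	l.entries = append(l.entries, e)
	return l.lastIndex()
}

// entryAt returns the entry at a Raft index, or false if it was
// compacted away or does not exist yet.
func (l *compactableLog) entryAt(index int) (entry, bool) {
	if index <= l.snapshotIndex || index > l.lastIndex() {
		return entry{}, false
	}
	return l.entries[index-l.snapshotIndex-1], true
}

// compact discards all entries up to and including index, remembering
// the index and term of the last discarded entry.
func (l *compactableLog) compact(index int) error {
	e, ok := l.entryAt(index)
	if !ok {
		return fmt.Errorf("cannot compact at index %d", index)
	}
	// Copy the tail so the discarded prefix can be garbage collected.
	tail := append([]entry(nil), l.entries[index-l.snapshotIndex:]...)
	l.snapshotIndex, l.snapshotTerm, l.entries = index, e.Term, tail
	return nil
}

// needsInstallSnapshot reports whether a follower's nextIndex points into
// the compacted prefix, in which case the leader must send a snapshot.
func (l *compactableLog) needsInstallSnapshot(nextIndex int) bool {
	return nextIndex <= l.snapshotIndex
}

func main() {
	l := &compactableLog{}
	for i := 1; i <= 10; i++ {
		l.append(entry{Term: 1, Command: fmt.Sprintf("cmd-%d", i)})
	}
	if err := l.compact(7); err != nil {
		panic(err)
	}
	fmt.Println(l.lastIndex(), len(l.entries)) // 10 3
	fmt.Println(l.needsInstallSnapshot(5))     // true: index 5 is in the snapshot
	fmt.Println(l.needsInstallSnapshot(8))     // false: index 8 is still in the log
}
```

Keeping snapshotTerm alongside snapshotIndex lets the AppendEntries consistency check still work when prevLogIndex equals the snapshot index.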
11.2 Read-Only Query Optimization
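Serving reads from the leader’s local state is not linearizable on its own: a leader that has been deposed but does not know it yet can return stale data. Routing every read through the log fixes this but costs a log write per read. Production systems use ReadIndex instead: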
- Leader records its current commitIndex as readIndex
- Leader sends a heartbeat to confirm it’s still the leader
- Leader waits until lastApplied >= readIndex
- Serves the read from local state
This avoids writing to the log for read-only operations.
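The four steps can be sketched in a single function. Everything here is a simplified assumption for illustration: the node type, the heartbeatAcks and applyNext callbacks, and errNotLeader are hypothetical stand-ins, and the sketch runs single-threaded, whereas a real implementation would wait on its apply loop. It also assumes the leader has already committed an entry in its current term, which the ReadIndex approach requires before commitIndex can be trusted.

```go
package main

import (
	"errors"
	"fmt"
)

var errNotLeader = errors.New("leadership not confirmed by a majority")

// node is a hypothetical, single-threaded stand-in for a Raft leader.
type node struct {
	clusterSize int
	commitIndex int
	lastApplied int
	store       map[string]string
	// heartbeatAcks performs one heartbeat round and returns how many
	// peers (excluding this node) acknowledged it.
	heartbeatAcks func() int
	// applyNext applies the next committed entry and advances lastApplied.
	applyNext func()
}

// readIndexGet serves a read using the ReadIndex steps described above.
func (n *node) readIndexGet(key string) (string, bool, error) {
	// Step 1: remember the commit index at the time the read arrived.
	readIndex := n.commitIndex

	// Step 2: confirm leadership with a heartbeat round (+1 counts ourselves).
	if n.heartbeatAcks()+1 < n.clusterSize/2+1 {
		return "", false, errNotLeader
	}

	// Step 3: make sure the state machine has caught up to readIndex.
	for n.lastApplied < readIndex {
		n.applyNext()
	}

	// Step 4: serve the read from local state.
	v, ok := n.store[key]
	return v, ok, nil
}

func main() {
	n := &node{
		clusterSize: 3,
		commitIndex: 2,
		lastApplied: 1,
		store:       map[string]string{"a": "old"},
	}
	n.heartbeatAcks = func() int { return 1 } // one of two peers answered
	n.applyNext = func() {                    // pretend entry 2 is "set a=new"
		n.lastApplied++
		n.store["a"] = "new"
	}

	v, ok, err := n.readIndexGet("a")
	fmt.Println(v, ok, err) // new true <nil>
}
```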
11.3 Pre-Vote Extension
To prevent disruptive elections when a partitioned server rejoins, implement the Pre-Vote extension:
Normal Vote: Candidate increments term → Forces others to reset votedFor
Pre-Vote: Candidate asks "would you vote for me?" without incrementing term
Only starts real election if it would receive majority pre-votes
11.4 Lease-Based Reads
For lower-latency reads, leaders can use lease-based reads:
- The leader knows it is the only leader for at least electionTimeout after the last heartbeat acknowledged by a majority
- It serves reads without a network round trip during the lease window
- This requires bounded clock skew between nodes
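A lease check along these lines might look like the sketch below. The leaderLease type, its fields, and the maxClockDrift safety margin are assumptions made for illustration, not part of the code in section 8. The lease is measured from the start of the heartbeat round that a majority acknowledged, which is the conservative choice.

```go
package main

import (
	"fmt"
	"time"
)

// leaderLease is a hypothetical helper tracking how long a leader may
// serve local reads without contacting its peers.
type leaderLease struct {
	electionTimeout time.Duration
	maxClockDrift   time.Duration    // assumed bound on drift, used as a safety margin
	lastQuorumAck   time.Time        // start of the latest heartbeat round acked by a majority
	now             func() time.Time // injectable clock
}

func newLeaderLease(electionTimeout, maxClockDrift time.Duration) *leaderLease {
	return &leaderLease{
		electionTimeout: electionTimeout,
		maxClockDrift:   maxClockDrift,
		now:             time.Now,
	}
}

// renew records a heartbeat round, started at roundStart, that a
// majority of the cluster acknowledged.
func (l *leaderLease) renew(roundStart time.Time) {
	if roundStart.After(l.lastQuorumAck) {
		l.lastQuorumAck = roundStart
	}
}

// validAt reports whether local reads are safe at the given instant.
func (l *leaderLease) validAt(t time.Time) bool {
	if l.lastQuorumAck.IsZero() {
		return false // no majority has confirmed this leader yet
	}
	expiry := l.lastQuorumAck.Add(l.electionTimeout - l.maxClockDrift)
	return t.Before(expiry)
}

// valid checks the lease against the current time.
func (l *leaderLease) valid() bool { return l.validAt(l.now()) }

func main() {
	lease := newLeaderLease(150*time.Millisecond, 10*time.Millisecond)
	start := lease.now()

	fmt.Println(lease.validAt(start)) // false: no majority ack yet
	lease.renew(start)
	fmt.Println(lease.validAt(start.Add(100 * time.Millisecond))) // true
	fmt.Println(lease.validAt(start.Add(145 * time.Millisecond))) // false: inside the drift margin
}
```

When valid returns false, the leader should fall back to a ReadIndex round or reject the read rather than risk serving stale data.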
12. Conclusion
The Raft consensus algorithm is a landmark achievement in distributed systems: elegant in its decomposition, rigorous in its safety guarantees, and practical enough to power production systems such as etcd, CockroachDB, TiKV, and Consul.
Key Takeaways
- Raft is built on three pillars: Leader election, log replication, and safety constraints that together guarantee correctness
- Strong consistency is achieved through majority voting: the cluster remains available as long as a majority, ⌊N/2⌋ + 1 of N servers, is alive
- Go is an excellent choice for Raft implementations thanks to goroutines, channels, and its robust standard library
- Production use requires additional features: log snapshots, linearizable reads, pre-vote, and membership changes
- The devil is in the details — mutex discipline, persistence ordering, and term checking are where most bugs hide
Further Reading
- Raft Paper (Extended Version) — Diego Ongaro and John Ousterhout
- The Raft Visualization — Interactive animation
- etcd/raft — Production-grade Go library
- Designing Data-Intensive Applications — Chapter 9 covers consensus algorithms
- TLA+ Raft Specification — Formal specification by Ongaro



