diff --git a/raft/doc.go b/raft/doc.go
index dd27310b5..8943d978c 100644
--- a/raft/doc.go
+++ b/raft/doc.go
@@ -15,6 +15,8 @@
 /*
 Package raft provides an implementation of the raft consensus algorithm.
 
+Usage
+
 The primary object in raft is a Node. You either start a Node from scratch
 using raft.StartNode or start a Node from some initial state using raft.RestartNode.
 	storage := raft.NewMemoryStorage()
@@ -22,42 +24,64 @@ using raft.StartNode or start a Node from some initial state using raft.RestartN
 
 Now that you are holding onto a Node you have a few responsibilities:
 
-First, you need to push messages that you receive from other machines into the
-Node with n.Step().
+First, you must read from the Node.Ready() channel and process the updates
+it contains. This means:
+
+1. Write HardState, Entries, and Snapshot to persistent storage if they are
+not empty. Note that when writing an Entry with Index i, any
+previously-persisted entries with Index >= i must be discarded.
+
+2. Send all Messages to the nodes named in the To field. It is important
+that this happen *after* all state has been persisted.
+
+3. Apply Snapshot (if any) and CommittedEntries to the state machine.
+If any committed Entry has Type EntryConfChange, call Node.ApplyConfChange()
+after applying it.
+
+4. Call Node.Advance() to signal readiness for the next batch of updates.
+This may be done at any time after step 1, although all updates must be processed
+in the order they were returned by Ready.
+
+Second, all persisted log entries must be made available via an
+implementation of the Storage interface. The provided MemoryStorage
+type can be used for this (if you repopulate its state upon a
+restart), or you can supply your own disk-backed implementation.
+
+Third, when you receive a message from another node, pass it to Node.Step:
 
 	func recvRaftRPC(ctx context.Context, m raftpb.Message) {
 		n.Step(ctx, m)
 	}
 
-Second, you need to save log entries to storage, process committed log entries
-through your application and then send pending messages to peers by reading the
-channel returned by n.Ready(). It is important that the user persist any
-entries that require stable storage before sending messages to other peers to
-ensure fault-tolerance.
-
-An example MemoryStorage is provided in the raft package.
-
-And finally you need to service timeouts with Tick(). Raft has two important
-timeouts: heartbeat and the election timeout. However, internally to the raft
-package time is represented by an abstract "tick". The user is responsible for
-calling Tick() on their raft.Node on a regular interval in order to service
-these timeouts.
+Finally, you need to call Node.Tick() at regular intervals (probably
+via a time.Ticker). Raft has two important timeouts: heartbeat and the
+election timeout. However, internally to the raft package time is
+represented by an abstract "tick".
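+
+For example, ticks might be driven by a time.Ticker along these lines (the
+100ms interval is only illustrative; one tick is the unit in which the
+heartbeat and election timeouts are measured):
+
+	// Drive raft's logical clock from wall-clock time.
+	ticker := time.NewTicker(100 * time.Millisecond)
+	defer ticker.Stop()
+
+Its channel ticker.C can then play the role of s.Ticker in the loop below,
+calling n.Tick() each time it fires.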
 
 The total state machine handling loop will look something like this:
 
-	for {
-		select {
-		case <-s.Ticker:
-			n.Tick()
-		case rd := <-s.Node.Ready():
-			saveToStorage(rd.State, rd.Entries)
-			send(rd.Messages)
-			process(rd.CommittedEntries)
-			s.Node.Advance()
-		case <-s.done:
-			return
-		}
-	}
+	for {
+		select {
+		case <-s.Ticker:
+			n.Tick()
+		case rd := <-s.Node.Ready():
+			saveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
+			send(rd.Messages)
+			if !raft.IsEmptySnap(rd.Snapshot) {
+				processSnapshot(rd.Snapshot)
+			}
+			for _, entry := range rd.CommittedEntries {
+				process(entry)
+				if entry.Type == raftpb.EntryConfChange {
+					var cc raftpb.ConfChange
+					cc.Unmarshal(entry.Data)
+					s.Node.ApplyConfChange(cc)
+				}
+			}
+			s.Node.Advance()
+		case <-s.done:
+			return
+		}
+	}
 
 To propose changes to the state machine from your node take your application
 data, serialize it into a byte slice and call:
@@ -65,21 +89,42 @@ data, serialize it into a byte slice and call:
 	n.Propose(ctx, data)
 
 If the proposal is committed, data will appear in committed entries with type
-raftpb.EntryNormal.
+raftpb.EntryNormal. There is no guarantee that a proposed command will be
+committed; you may have to re-propose after a timeout.
 
 To add or remove node in a cluster, build ConfChange struct 'cc' and call:
 
 	n.ProposeConfChange(ctx, cc)
 
 After config change is committed, some committed entry with type
-raftpb.EntryConfChange will be returned. You should apply it to node through:
+raftpb.EntryConfChange will be returned. You must apply it to the node through:
 
 	var cc raftpb.ConfChange
 	cc.Unmarshal(data)
 	n.ApplyConfChange(cc)
 
-Note: An ID represents a unique node in a cluster. A given ID MUST be used
-only once even if the old node has been removed.
+Note: An ID represents a unique node in a cluster for all time. A
+given ID MUST be used only once even if the old node has been removed.
+This means that, for example, IP addresses make poor node IDs since they
+may be reused.
+
+Implementation notes
+
+This implementation is up to date with the final Raft thesis
+(https://ramcloud.stanford.edu/~ongaro/thesis.pdf), although our
+implementation of the membership change protocol differs somewhat from
+that described in chapter 4. The key invariant that membership changes
+happen one node at a time is preserved, but in our implementation the
+membership change takes effect when its entry is applied, not when it
+is added to the log (so the entry is committed under the old
+membership instead of the new). This is equivalent in terms of safety,
+since the old and new configurations are guaranteed to overlap.
+
+To ensure that we do not attempt to commit two membership changes at
+once by matching log positions (which would be unsafe since they
+should have different quorum requirements), we simply disallow any
+proposed membership change while any uncommitted change appears in
+the leader's log.
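+
+For example, an application adding a new node to the cluster might build
+and propose the ConfChange described above along these lines (the NodeID
+and Context values are placeholders; Context is opaque to raft and may
+carry whatever the application needs, such as the new member's address):
+
+	// Propose adding node 4; raft itself does not interpret Context.
+	cc := raftpb.ConfChange{
+		Type:    raftpb.ConfChangeAddNode,
+		NodeID:  4,
+		Context: []byte("http://10.0.0.4:2380"),
+	}
+	n.ProposeConfChange(ctx, cc)
 
 */
 package raft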