raft: introduce Node interface

This commit is contained in:
Jonathan Boulle 2014-09-17 12:23:44 -07:00
parent ba851b2eca
commit b66a40495d
7 changed files with 62 additions and 45 deletions

View File

@ -158,7 +158,7 @@ func testServer(t *testing.T, ns int64) {
for i := int64(0); i < ns; i++ { for i := int64(0); i < ns; i++ {
id := i + 1 id := i + 1
n := raft.Start(id, peers, 10, 1) n := raft.StartNode(id, peers, 10, 1)
tk := time.NewTicker(10 * time.Millisecond) tk := time.NewTicker(10 * time.Millisecond)
defer tk.Stop() defer tk.Stop()
srv := &EtcdServer{ srv := &EtcdServer{
@ -225,7 +225,7 @@ func TestDoProposal(t *testing.T) {
for i, tt := range tests { for i, tt := range tests {
ctx, _ := context.WithCancel(context.Background()) ctx, _ := context.WithCancel(context.Background())
n := raft.Start(0xBAD0, []int64{0xBAD0}, 10, 1) n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
st := &storeRecorder{} st := &storeRecorder{}
tk := make(chan time.Time) tk := make(chan time.Time)
// this makes <-tk always successful, which accelerates internal clock // this makes <-tk always successful, which accelerates internal clock
@ -257,7 +257,7 @@ func TestDoProposal(t *testing.T) {
func TestDoProposalCancelled(t *testing.T) { func TestDoProposalCancelled(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
// node cannot make any progress because there are two nodes // node cannot make any progress because there are two nodes
n := raft.Start(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1) n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1)
st := &storeRecorder{} st := &storeRecorder{}
wait := &waitRecorder{} wait := &waitRecorder{}
srv := &EtcdServer{ srv := &EtcdServer{
@ -292,7 +292,7 @@ func TestDoProposalStopped(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
// node cannot make any progress because there are two nodes // node cannot make any progress because there are two nodes
n := raft.Start(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1) n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1)
st := &storeRecorder{} st := &storeRecorder{}
tk := make(chan time.Time) tk := make(chan time.Time)
// this makes <-tk always successful, which accelarates internal clock // this makes <-tk always successful, which accelarates internal clock
@ -326,7 +326,7 @@ func TestDoProposalStopped(t *testing.T) {
// TestSync tests sync 1. is nonblocking 2. sends out SYNC request. // TestSync tests sync 1. is nonblocking 2. sends out SYNC request.
func TestSync(t *testing.T) { func TestSync(t *testing.T) {
n := raft.Start(0xBAD0, []int64{0xBAD0}, 10, 1) n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
n.Campaign(context.TODO()) n.Campaign(context.TODO())
select { select {
case <-n.Ready(): case <-n.Ready():
@ -372,7 +372,7 @@ func TestSync(t *testing.T) {
// propose SYNC request because there is no leader // propose SYNC request because there is no leader
func TestSyncFail(t *testing.T) { func TestSyncFail(t *testing.T) {
// The node is run without Tick and Campaign, so it has no leader forever. // The node is run without Tick and Campaign, so it has no leader forever.
n := raft.Start(0xBAD0, []int64{0xBAD0}, 10, 1) n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
select { select {
case <-n.Ready(): case <-n.Ready():
case <-time.After(time.Millisecond): case <-time.After(time.Millisecond):
@ -406,7 +406,7 @@ func TestSyncFail(t *testing.T) {
} }
func TestSyncTriggerDeleteExpriedKeys(t *testing.T) { func TestSyncTriggerDeleteExpriedKeys(t *testing.T) {
n := raft.Start(0xBAD0, []int64{0xBAD0}, 10, 1) n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
n.Campaign(context.TODO()) n.Campaign(context.TODO())
st := &storeRecorder{} st := &storeRecorder{}
syncInterval := 5 * time.Millisecond syncInterval := 5 * time.Millisecond
@ -438,7 +438,7 @@ func TestSyncTriggerDeleteExpriedKeys(t *testing.T) {
// snapshot should snapshot the store and cut the persistent // snapshot should snapshot the store and cut the persistent
// TODO: node.Compact is called... we need to make the node an interface // TODO: node.Compact is called... we need to make the node an interface
func TestSnapshot(t *testing.T) { func TestSnapshot(t *testing.T) {
n := raft.Start(0xBAD0, []int64{0xBAD0}, 10, 1) n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
defer n.Stop() defer n.Stop()
st := &storeRecorder{} st := &storeRecorder{}
p := &storageRecorder{} p := &storageRecorder{}
@ -472,7 +472,7 @@ func TestSnapshot(t *testing.T) {
// We need fake node! // We need fake node!
func TestTriggerSnap(t *testing.T) { func TestTriggerSnap(t *testing.T) {
ctx := context.Background() ctx := context.Background()
n := raft.Start(0xBAD0, []int64{0xBAD0}, 10, 1) n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1)
n.Campaign(ctx) n.Campaign(ctx)
st := &storeRecorder{} st := &storeRecorder{}
p := &storageRecorder{} p := &storageRecorder{}

View File

@ -24,7 +24,7 @@ func TestSet(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
n := raft.Start(1, []int64{1}, 0, 0) n := raft.StartNode(1, []int64{1}, 0, 0)
n.Campaign(ctx) n.Campaign(ctx)
srv := &etcdserver.EtcdServer{ srv := &etcdserver.EtcdServer{

View File

@ -99,7 +99,7 @@ func startEtcd() {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
n = raft.Start(id, peers.IDs(), 10, 1) n = raft.StartNode(id, peers.IDs(), 10, 1)
} else { } else {
var index int64 var index int64
snapshot, err := snapshotter.Load() snapshot, err := snapshotter.Load()
@ -124,7 +124,7 @@ func startEtcd() {
if wid != 0 { if wid != 0 {
log.Fatalf("unexpected nodeid %d: nodeid should always be zero until we save nodeid into wal", wid) log.Fatalf("unexpected nodeid %d: nodeid should always be zero until we save nodeid into wal", wid)
} }
n = raft.Restart(id, peers.IDs(), 10, 1, snapshot, st, ents) n = raft.RestartNode(id, peers.IDs(), 10, 1, snapshot, st, ents)
} }
s := &etcdserver.EtcdServer{ s := &etcdserver.EtcdServer{

View File

@ -16,9 +16,9 @@
Package raft provides an implementation of the raft consensus algorithm. Package raft provides an implementation of the raft consensus algorithm.
The primary object in raft is a Node. You either start a Node from scratch The primary object in raft is a Node. You either start a Node from scratch
using raft.Start or start a Node from some initial state using raft.Restart. using raft.StartNode or start a Node from some initial state using raft.RestartNode.
n := raft.Start(0x01, []int64{0x02, 0x03}, 3, 1) n := raft.StartNode(0x01, []int64{0x02, 0x03}, 3, 1)
Now that you are holding onto a Node you have a few responsibilities: Now that you are holding onto a Node you have a few responsibilities:

View File

@ -10,7 +10,7 @@ func saveStateToDisk(st pb.HardState) {}
func saveToDisk(ents []pb.Entry) {} func saveToDisk(ents []pb.Entry) {}
func Example_Node() { func Example_Node() {
n := Start(0, nil, 0, 0) n := StartNode(0, nil, 0, 0)
// stuff to n happens in other goroutines // stuff to n happens in other goroutines

View File

@ -72,28 +72,37 @@ func (rd Ready) containsUpdates() bool {
len(rd.Entries) > 0 || len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 len(rd.Entries) > 0 || len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0
} }
type Node struct { type Node interface {
propc chan pb.Message // Tick increments the internal logical clock for the Node by a single tick. Election
recvc chan pb.Message // timeouts and heartbeat timeouts are in units of ticks.
compactc chan []byte Tick()
readyc chan Ready // Campaign causes the Node to transition to candidate state and start campaigning to become leader
tickc chan struct{} Campaign(ctx context.Context) error
done chan struct{} // Propose proposes that data be appended to the log.
Propose(ctx context.Context, data []byte) error
// Step advances the state machine using the given message. ctx.Err() will be returned, if any.
Step(ctx context.Context, msg pb.Message) error
// Ready returns a channel that returns the current point-in-time state
Ready() <-chan Ready
// Stop performs any necessary termination of the Node
Stop()
// Compact
Compact(d []byte)
} }
// Start returns a new Node given a unique raft id, a list of raft peers, and // StartNode returns a new Node given a unique raft id, a list of raft peers, and
// the election and heartbeat timeouts in units of ticks. // the election and heartbeat timeouts in units of ticks.
func Start(id int64, peers []int64, election, heartbeat int) Node { func StartNode(id int64, peers []int64, election, heartbeat int) Node {
n := newNode() n := newNode()
r := newRaft(id, peers, election, heartbeat) r := newRaft(id, peers, election, heartbeat)
go n.run(r) go n.run(r)
return n return &n
} }
// Restart is identical to Start but takes an initial State and a slice of // RestartNode is identical to StartNode but takes an initial State and a slice
// entries. Generally this is used when restarting from a stable storage // of entries. Generally this is used when restarting from a stable storage
// log. // log.
func Restart(id int64, peers []int64, election, heartbeat int, snapshot *pb.Snapshot, st pb.HardState, ents []pb.Entry) Node { func RestartNode(id int64, peers []int64, election, heartbeat int, snapshot *pb.Snapshot, st pb.HardState, ents []pb.Entry) Node {
n := newNode() n := newNode()
r := newRaft(id, peers, election, heartbeat) r := newRaft(id, peers, election, heartbeat)
if snapshot != nil { if snapshot != nil {
@ -102,11 +111,21 @@ func Restart(id int64, peers []int64, election, heartbeat int, snapshot *pb.Snap
r.loadState(st) r.loadState(st)
r.loadEnts(ents) r.loadEnts(ents)
go n.run(r) go n.run(r)
return n return &n
} }
func newNode() Node { // node is the canonical implementation of the Node interface
return Node{ type node struct {
propc chan pb.Message
recvc chan pb.Message
compactc chan []byte
readyc chan Ready
tickc chan struct{}
done chan struct{}
}
func newNode() node {
return node{
propc: make(chan pb.Message), propc: make(chan pb.Message),
recvc: make(chan pb.Message), recvc: make(chan pb.Message),
compactc: make(chan []byte), compactc: make(chan []byte),
@ -116,11 +135,11 @@ func newNode() Node {
} }
} }
func (n *Node) Stop() { func (n *node) Stop() {
close(n.done) close(n.done)
} }
func (n *Node) run(r *raft) { func (n *node) run(r *raft) {
var propc chan pb.Message var propc chan pb.Message
var readyc chan Ready var readyc chan Ready
@ -178,25 +197,24 @@ func (n *Node) run(r *raft) {
// Tick increments the internal logical clock for this Node. Election timeouts // Tick increments the internal logical clock for this Node. Election timeouts
// and heartbeat timeouts are in units of ticks. // and heartbeat timeouts are in units of ticks.
func (n *Node) Tick() { func (n *node) Tick() {
select { select {
case n.tickc <- struct{}{}: case n.tickc <- struct{}{}:
case <-n.done: case <-n.done:
} }
} }
func (n *Node) Campaign(ctx context.Context) error { func (n *node) Campaign(ctx context.Context) error {
return n.Step(ctx, pb.Message{Type: msgHup}) return n.Step(ctx, pb.Message{Type: msgHup})
} }
// Propose proposes data be appended to the log. func (n *node) Propose(ctx context.Context, data []byte) error {
func (n *Node) Propose(ctx context.Context, data []byte) error {
return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Data: data}}}) return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Data: data}}})
} }
// Step advances the state machine using msgs. The ctx.Err() will be returned, // Step advances the state machine using msgs. The ctx.Err() will be returned,
// if any. // if any.
func (n *Node) Step(ctx context.Context, m pb.Message) error { func (n *node) Step(ctx context.Context, m pb.Message) error {
ch := n.recvc ch := n.recvc
if m.Type == msgProp { if m.Type == msgProp {
ch = n.propc ch = n.propc
@ -212,12 +230,11 @@ func (n *Node) Step(ctx context.Context, m pb.Message) error {
} }
} }
// ReadState returns the current point-in-time state. func (n *node) Ready() <-chan Ready {
func (n *Node) Ready() <-chan Ready {
return n.readyc return n.readyc
} }
func (n *Node) Compact(d []byte) { func (n *node) Compact(d []byte) {
select { select {
case n.compactc <- d: case n.compactc <- d:
case <-n.done: case <-n.done:

View File

@ -14,7 +14,7 @@ import (
// and other kinds of messages to recvc chan. // and other kinds of messages to recvc chan.
func TestNodeStep(t *testing.T) { func TestNodeStep(t *testing.T) {
for i := range mtmap { for i := range mtmap {
n := &Node{ n := &node{
propc: make(chan raftpb.Message, 1), propc: make(chan raftpb.Message, 1),
recvc: make(chan raftpb.Message, 1), recvc: make(chan raftpb.Message, 1),
} }
@ -39,7 +39,7 @@ func TestNodeStep(t *testing.T) {
// Cancel and Stop should unblock Step() // Cancel and Stop should unblock Step()
func TestNodeStepUnblock(t *testing.T) { func TestNodeStepUnblock(t *testing.T) {
// a node without buffer to block step // a node without buffer to block step
n := &Node{ n := &node{
propc: make(chan raftpb.Message), propc: make(chan raftpb.Message),
done: make(chan struct{}), done: make(chan struct{}),
} }
@ -154,7 +154,7 @@ func TestNode(t *testing.T) {
}, },
} }
n := Start(1, []int64{1}, 0, 0) n := StartNode(1, []int64{1}, 0, 0)
n.Campaign(ctx) n.Campaign(ctx)
if g := <-n.Ready(); !reflect.DeepEqual(g, wants[0]) { if g := <-n.Ready(); !reflect.DeepEqual(g, wants[0]) {
t.Errorf("#%d: g = %+v,\n w %+v", 1, g, wants[0]) t.Errorf("#%d: g = %+v,\n w %+v", 1, g, wants[0])
@ -186,7 +186,7 @@ func TestNodeRestart(t *testing.T) {
CommittedEntries: entries[1 : st.Commit+1], CommittedEntries: entries[1 : st.Commit+1],
} }
n := Restart(1, []int64{1}, 0, 0, nil, st, entries) n := RestartNode(1, []int64{1}, 0, 0, nil, st, entries)
if g := <-n.Ready(); !reflect.DeepEqual(g, want) { if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
t.Errorf("g = %+v,\n w %+v", g, want) t.Errorf("g = %+v,\n w %+v", g, want)
} }