diff --git a/raft/node.go b/raft/node.go new file mode 100644 index 000000000..03598a8b5 --- /dev/null +++ b/raft/node.go @@ -0,0 +1,31 @@ +package raft + +import "sync" + +type Interface interface { + Step(m Message) +} + +type Node struct { + lk sync.Mutex + sm *stateMachine +} + +func New(k, addr int, next Interface) Interface { + n := &Node{ + sm: newStateMachine(k, addr, next), + } + return n +} + +// Propose asynchronously proposes data be applied to the underlying state machine. +func (n *Node) Propose(data []byte) { + m := Message{Type: msgHup, Data: data} + n.Step(m) +} + +func (n *Node) Step(m Message) { + n.lk.Lock() + defer n.lk.Unlock() + n.sm.Step(m) +} diff --git a/raft/raft.go b/raft/raft.go index af4378a4f..1dfde3123 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -69,10 +69,6 @@ type Message struct { Data []byte } -type stepper interface { - step(m Message) -} - type index struct { match, next int } @@ -112,13 +108,13 @@ type stateMachine struct { votes map[int]bool - next stepper + next Interface // the leader addr lead int } -func newStateMachine(k, addr int, next stepper) *stateMachine { +func newStateMachine(k, addr int, next Interface) *stateMachine { log := make([]Entry, 1, 1024) sm := &stateMachine{k: k, addr: addr, next: next, log: log} sm.reset() @@ -160,7 +156,7 @@ func (sm *stateMachine) isLogOk(i, term int) bool { func (sm *stateMachine) send(m Message) { m.From = sm.addr m.Term = sm.term - sm.next.step(m) + sm.next.Step(m) } // sendAppend sends RRPC, with entries to all peers that are not up-to-date according to sm.mis. @@ -236,7 +232,7 @@ func (sm *stateMachine) becomeFollower(term, lead int) { sm.state = stateFollower } -func (sm *stateMachine) step(m Message) { +func (sm *stateMachine) Step(m Message) { switch m.Type { case msgHup: sm.term++ diff --git a/raft/raft_test.go b/raft/raft_test.go index 4b364625b..a00eb9d9b 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -45,7 +45,7 @@ func TestLeaderElection(t *testing.T) { } for i, tt := range tests { - tt.step(Message{To: 0, Type: msgHup}) + tt.Step(Message{To: 0, Type: msgHup}) sm := tt.network.ss[0].(*stateMachine) if sm.state != tt.state { t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state) @@ -65,7 +65,7 @@ func TestDualingCandidates(t *testing.T) { heal := false next := stepperFunc(func(m Message) { if heal { - tt.step(m) + tt.Step(m) } }) a.next = next @@ -74,12 +74,12 @@ func TestDualingCandidates(t *testing.T) { tt.tee = stepperFunc(func(m Message) { t.Logf("m = %+v", m) }) - tt.step(Message{To: 0, Type: msgHup}) - tt.step(Message{To: 2, Type: msgHup}) + tt.Step(Message{To: 0, Type: msgHup}) + tt.Step(Message{To: 2, Type: msgHup}) t.Log("healing") heal = true - tt.step(Message{To: 2, Type: msgHup}) + tt.Step(Message{To: 2, Type: msgHup}) tests := []struct { sm *stateMachine @@ -115,15 +115,15 @@ func TestCandidateConcede(t *testing.T) { a.next = nopStepper - tt.step(Message{To: 0, Type: msgHup}) - tt.step(Message{To: 2, Type: msgHup}) + tt.Step(Message{To: 0, Type: msgHup}) + tt.Step(Message{To: 2, Type: msgHup}) // heal the partition a.next = tt data := []byte("force follower") // send a proposal to 2 to flush out a msgApp to 0 - tt.step(Message{To: 2, Type: msgProp, Data: data}) + tt.Step(Message{To: 2, Type: msgProp, Data: data}) if g := a.state; g != stateFollower { t.Errorf("state = %s, want %s", g, stateFollower) @@ -142,11 +142,11 @@ func TestCandidateConcede(t *testing.T) { func TestOldMessages(t *testing.T) { tt := newNetwork(nil, nil, nil) // make 0 leader @ term 3 - tt.step(Message{To: 0, Type: msgHup}) - tt.step(Message{To: 0, Type: msgHup}) - tt.step(Message{To: 0, Type: msgHup}) + tt.Step(Message{To: 0, Type: msgHup}) + tt.Step(Message{To: 0, Type: msgHup}) + tt.Step(Message{To: 0, Type: msgHup}) // pretend we're an old leader trying to make progress - tt.step(Message{To: 0, Type: msgApp, Term: 1, Entries: []Entry{{Term: 1}}}) + tt.Step(Message{To: 0, Type: msgApp, Term: 1, Entries: []Entry{{Term: 1}}}) if g := diffLogs(defaultLog, tt.logs()); g != nil { for _, diff := range g { t.Errorf("bag log:\n%s", diff) @@ -184,7 +184,7 @@ func TestProposal(t *testing.T) { } } }() - tt.step(m) + tt.Step(m) }) data := []byte("somedata") @@ -224,10 +224,10 @@ func TestProposalByProxy(t *testing.T) { }) // promote 0 the leader - tt.step(Message{To: 0, Type: msgHup}) + tt.Step(Message{To: 0, Type: msgHup}) // propose via follower - tt.step(Message{To: 1, Type: msgProp, Data: []byte("somedata")}) + tt.Step(Message{To: 1, Type: msgProp, Data: []byte("somedata")}) wantLog := []Entry{{}, {Term: 1, Data: data}} if g := diffLogs(wantLog, tt.logs()); g != nil { @@ -277,7 +277,7 @@ func TestVote(t *testing.T) { t.Errorf("#%d, m.Index = %d, want %d", i, m.Index, tt.w) } }) - sm.step(Message{Type: msgVote, Index: tt.i, LogTerm: tt.term}) + sm.Step(Message{Type: msgVote, Index: tt.i, LogTerm: tt.term}) if !called { t.Fatal("#%d: not called", i) } @@ -302,14 +302,14 @@ func TestLogDiff(t *testing.T) { } type network struct { - tee stepper - ss []stepper + tee Interface + ss []Interface } // newNetwork initializes a network from nodes. A nil node will be replaced // with a new *stateMachine. A *stateMachine will get its k, addr, and next // fields set. -func newNetwork(nodes ...stepper) *network { +func newNetwork(nodes ...Interface) *network { nt := &network{ss: nodes} for i, n := range nodes { switch v := n.(type) { @@ -328,11 +328,11 @@ func newNetwork(nodes ...stepper) *network { return nt } -func (nt network) step(m Message) { +func (nt network) Step(m Message) { if nt.tee != nil { - nt.tee.step(m) + nt.tee.Step(m) } - nt.ss[m.To].step(m) + nt.ss[m.To].Step(m) } // logs returns all logs in nt prepended with want. If a node is not a @@ -424,8 +424,6 @@ func diffLogs(base []Entry, logs [][]Entry) []diff { type stepperFunc func(Message) -func (f stepperFunc) step(m Message) { f(m) } +func (f stepperFunc) Step(m Message) { f(m) } var nopStepper = stepperFunc(func(Message) {}) - -type nextStepperFunc func(Message, stepper)