From 134a962222776af00ba7f2dff48e63f01ea0b3be Mon Sep 17 00:00:00 2001 From: Blake Mizerany Date: Sun, 24 Aug 2014 20:09:06 -0700 Subject: [PATCH] raft: move raft2 to raft --- raft/cluster_test.go | 190 ------------------ {raft2 => raft}/diff_test.go | 0 {raft2 => raft}/entry.pb.go | 0 {raft2 => raft}/entry.proto | 0 {raft2 => raft}/example_test.go | 0 {raft2 => raft}/genproto.sh | 0 raft/info.proto | 12 -- {raft2 => raft}/log.go | 0 {raft2 => raft}/log_test.go | 0 raft/node.go | 328 +++++++++----------------------- raft/node_test.go | 223 +--------------------- {raft2 => raft}/raft.go | 0 {raft2 => raft}/raft_test.go | 0 {raft2 => raft}/snapshot.go | 0 {raft2 => raft}/state.pb.go | 0 {raft2 => raft}/state.proto | 0 raft2/node.go | 127 ------------- raft2/node_test.go | 5 - 18 files changed, 93 insertions(+), 792 deletions(-) delete mode 100644 raft/cluster_test.go rename {raft2 => raft}/diff_test.go (100%) rename {raft2 => raft}/entry.pb.go (100%) rename {raft2 => raft}/entry.proto (100%) rename {raft2 => raft}/example_test.go (100%) rename {raft2 => raft}/genproto.sh (100%) delete mode 100644 raft/info.proto rename {raft2 => raft}/log.go (100%) rename {raft2 => raft}/log_test.go (100%) rename {raft2 => raft}/raft.go (100%) rename {raft2 => raft}/raft_test.go (100%) rename {raft2 => raft}/snapshot.go (100%) rename {raft2 => raft}/state.pb.go (100%) rename {raft2 => raft}/state.proto (100%) delete mode 100644 raft2/node.go delete mode 100644 raft2/node_test.go diff --git a/raft/cluster_test.go b/raft/cluster_test.go deleted file mode 100644 index 829580c9b..000000000 --- a/raft/cluster_test.go +++ /dev/null @@ -1,190 +0,0 @@ -package raft - -import ( - "reflect" - "testing" -) - -// TestBuildCluster ensures cluster with various size could be built. -func TestBuildCluster(t *testing.T) { - tests := []struct { - size int - ids []int64 - }{ - {1, nil}, - {3, nil}, - {5, nil}, - {7, nil}, - {9, nil}, - {13, nil}, - {51, nil}, - {1, []int64{1}}, - {3, []int64{1, 3, 5}}, - {5, []int64{1, 4, 7, 10, 13}}, - } - - for i, tt := range tests { - _, nodes := buildCluster(tt.size, tt.ids) - - base := ltoa(nodes[0].sm.raftLog) - for j, n := range nodes { - // ensure same log - l := ltoa(n.sm.raftLog) - if g := diffu(base, l); g != "" { - t.Errorf("#%d.%d: log diff:\n%s", i, j, g) - } - - // ensure same leader - var w int64 - if tt.ids != nil { - w = tt.ids[0] - } - if g := n.sm.lead.Get(); g != w { - t.Errorf("#%d.%d: lead = %d, want %d", i, j, g, w) - } - - // ensure same peer map - p := map[int64]struct{}{} - for k := range n.sm.ins { - p[k] = struct{}{} - } - wp := map[int64]struct{}{} - for k := 0; k < tt.size; k++ { - if tt.ids != nil { - wp[tt.ids[k]] = struct{}{} - } else { - wp[int64(k)] = struct{}{} - } - } - if !reflect.DeepEqual(p, wp) { - t.Errorf("#%d.%d: peers = %+v, want %+v", i, j, p, wp) - } - } - } -} - -func TestInitCluster(t *testing.T) { - node := New(1, defaultHeartbeat, defaultElection) - dictate(node) - node.Next() - - if node.ClusterId() != 0xBEEF { - t.Errorf("clusterId = %x, want %x", node.ClusterId(), 0xBEEF) - } - - func() { - defer func() { - e := recover() - if e != "cannot init a started cluster" { - t.Errorf("err = %v, want cannot init a started cluster", e) - } - }() - node.InitCluster(0xFBEE) - node.Next() - }() -} - -func TestMessageFromDifferentCluster(t *testing.T) { - tests := []struct { - clusterId int64 - wType messageType - }{ - {0xBEEF, msgVoteResp}, - {0xFBEE, msgDenied}, - } - - for i, tt := range tests { - node := New(1, defaultHeartbeat, defaultElection) - dictate(node) - node.Next() - - node.Step(Message{From: 1, ClusterId: tt.clusterId, Type: msgVote, Term: 2, LogTerm: 2, Index: 2}) - msgs := node.Msgs() - if len(msgs) != 1 { - t.Errorf("#%d: len(msgs) = %d, want 1", i, len(msgs)) - } - if msgs[0].Type != tt.wType { - t.Errorf("#%d: msg.Type = %v, want %d", i, msgs[0].Type, tt.wType) - } - } -} - -// TestBasicCluster ensures all nodes can send proposal to the cluster. -// And all the proposals will get committed. -func TestBasicCluster(t *testing.T) { - tests := []struct { - size int - round int - }{ - {1, 3}, - {3, 3}, - {5, 3}, - {7, 3}, - {13, 1}, - } - - for i, tt := range tests { - nt, nodes := buildCluster(tt.size, nil) - - for j := 0; j < tt.round; j++ { - for _, n := range nodes { - data := []byte{byte(n.Id())} - nt.send(Message{From: n.Id(), To: n.Id(), ClusterId: n.ClusterId(), Type: msgProp, Entries: []Entry{{Data: data}}}) - - base := nodes[0].Next() - if len(base) != 1 { - t.Fatalf("#%d: len(ents) = %d, want 1", i, len(base)) - } - if !reflect.DeepEqual(base[0].Data, data) { - t.Errorf("#%d: data = %s, want %s", i, base[0].Data, data) - } - for k := 1; k < tt.size; k++ { - g := nodes[k].Next() - if !reflect.DeepEqual(g, base) { - t.Errorf("#%d.%d: ent = %v, want %v", i, k, g, base) - } - } - } - } - } -} - -// This function is full of heck now. It will go away when we finish our -// network Interface, and ticker infrastructure. -func buildCluster(size int, ids []int64) (nt *network, nodes []*Node) { - if ids == nil { - ids = make([]int64, size) - for i := 0; i < size; i++ { - ids[i] = int64(i) - } - } - - nodes = make([]*Node, size) - nis := make([]Interface, size) - for i := range nodes { - nodes[i] = New(ids[i], defaultHeartbeat, defaultElection) - nis[i] = nodes[i] - } - nt = newNetwork(nis...) - - lead := dictate(nodes[0]) - lead.Next() - for i := 1; i < size; i++ { - lead.Add(ids[i], "", nil) - nt.send(lead.Msgs()...) - for j := 0; j < i; j++ { - nodes[j].Next() - } - } - - for i := 0; i < 10*defaultHeartbeat; i++ { - nodes[0].Tick() - } - msgs := nodes[0].Msgs() - nt.send(msgs...) - - for _, n := range nodes { - n.Next() - } - return -} diff --git a/raft2/diff_test.go b/raft/diff_test.go similarity index 100% rename from raft2/diff_test.go rename to raft/diff_test.go diff --git a/raft2/entry.pb.go b/raft/entry.pb.go similarity index 100% rename from raft2/entry.pb.go rename to raft/entry.pb.go diff --git a/raft2/entry.proto b/raft/entry.proto similarity index 100% rename from raft2/entry.proto rename to raft/entry.proto diff --git a/raft2/example_test.go b/raft/example_test.go similarity index 100% rename from raft2/example_test.go rename to raft/example_test.go diff --git a/raft2/genproto.sh b/raft/genproto.sh similarity index 100% rename from raft2/genproto.sh rename to raft/genproto.sh diff --git a/raft/info.proto b/raft/info.proto deleted file mode 100644 index cc01b334d..000000000 --- a/raft/info.proto +++ /dev/null @@ -1,12 +0,0 @@ -package raft; - -import "code.google.com/p/gogoprotobuf/gogoproto/gogo.proto"; - -option (gogoproto.marshaler_all) = true; -option (gogoproto.sizer_all) = true; -option (gogoproto.unmarshaler_all) = true; -option (gogoproto.goproto_getters_all) = false; - -message Info { - required int64 id = 1 [(gogoproto.nullable) = false]; -} diff --git a/raft2/log.go b/raft/log.go similarity index 100% rename from raft2/log.go rename to raft/log.go diff --git a/raft2/log_test.go b/raft/log_test.go similarity index 100% rename from raft2/log_test.go rename to raft/log_test.go diff --git a/raft/node.go b/raft/node.go index 1327c1c11..d81021fac 100644 --- a/raft/node.go +++ b/raft/node.go @@ -1,273 +1,127 @@ +// Package raft implements raft. package raft -import ( - "encoding/binary" - "encoding/json" - "log" - "math/rand" - "sort" - "time" -) +import "code.google.com/p/go.net/context" -type Interface interface { - Step(m Message) bool - Msgs() []Message +type stateResp struct { + st State + ents, cents []Entry + msgs []Message } -type tick int64 +func (a State) Equal(b State) bool { + return a.Term == b.Term && a.Vote == b.Vote && a.LastIndex == b.LastIndex +} -type Config struct { - NodeId int64 - Addr string - Context []byte +func (sr stateResp) containsUpdates(prev stateResp) bool { + return !prev.st.Equal(sr.st) || len(sr.ents) > 0 || len(sr.cents) > 0 || len(sr.msgs) > 0 } type Node struct { - sm *stateMachine - - elapsed tick - electionRand tick - election tick - heartbeat tick - - // TODO: it needs garbage collection later - rmNodes map[int64]struct{} - removed bool + ctx context.Context + propc chan []byte + recvc chan Message + statec chan stateResp + tickc chan struct{} } -func New(id int64, heartbeat, election tick) *Node { - if election < heartbeat*3 { - panic("election is least three times as heartbeat [election: %d, heartbeat: %d]") - } - - rand.Seed(time.Now().UnixNano()) +func Start(ctx context.Context, id int64, peers []int64) *Node { n := &Node{ - heartbeat: heartbeat, - election: election, - electionRand: election + tick(rand.Int31())%election, - sm: newStateMachine(id, []int64{id}), - rmNodes: make(map[int64]struct{}), + ctx: ctx, + propc: make(chan []byte), + recvc: make(chan Message), + statec: make(chan stateResp), + tickc: make(chan struct{}), } - + r := newRaft(id, peers) + go n.run(r) return n } -func Recover(id int64, s *Snapshot, ents []Entry, state State, heartbeat, election tick) *Node { - n := New(id, heartbeat, election) - if s != nil { - n.sm.restore(*s) - } - n.sm.loadEnts(ents) - if !state.IsEmpty() { - n.sm.loadState(state) - } - return n -} +func (n *Node) run(r *raft) { + propc := n.propc + statec := n.statec -func (n *Node) Id() int64 { return n.sm.id } - -func (n *Node) ClusterId() int64 { return n.sm.clusterId } - -func (n *Node) Info() Info { - return Info{Id: n.Id()} -} - -func (n *Node) Index() int64 { return n.sm.index.Get() } - -func (n *Node) Term() int64 { return n.sm.term.Get() } - -func (n *Node) Applied() int64 { return n.sm.raftLog.applied } - -func (n *Node) HasLeader() bool { return n.Leader() != none } - -func (n *Node) IsLeader() bool { return n.Leader() == n.Id() } - -func (n *Node) Leader() int64 { return n.sm.lead.Get() } - -func (n *Node) IsRemoved() bool { return n.removed } - -func (n *Node) Nodes() []int64 { - nodes := make(int64Slice, 0, len(n.sm.ins)) - for k := range n.sm.ins { - nodes = append(nodes, k) - } - sort.Sort(nodes) - return nodes -} - -// Propose asynchronously proposes data be applied to the underlying state machine. -func (n *Node) Propose(data []byte) { n.propose(Normal, data) } - -func (n *Node) propose(t int64, data []byte) { - n.Step(Message{From: n.sm.id, ClusterId: n.ClusterId(), Type: msgProp, Entries: []Entry{{Type: t, Data: data}}}) -} - -func (n *Node) Campaign() { n.Step(Message{From: n.sm.id, ClusterId: n.ClusterId(), Type: msgHup}) } - -func (n *Node) InitCluster(clusterId int64) { - d := make([]byte, 10) - wn := binary.PutVarint(d, clusterId) - n.propose(ClusterInit, d[:wn]) -} - -func (n *Node) Add(id int64, addr string, context []byte) { - n.UpdateConf(AddNode, &Config{NodeId: id, Addr: addr, Context: context}) -} - -func (n *Node) Remove(id int64) { - n.UpdateConf(RemoveNode, &Config{NodeId: id}) -} - -func (n *Node) Msgs() []Message { return n.sm.Msgs() } - -func (n *Node) Step(m Message) bool { - if m.Type == msgDenied { - n.removed = true - return false - } - if n.ClusterId() != none && m.ClusterId != none && m.ClusterId != n.ClusterId() { - log.Printf("deny message from=%d cluster=%d", m.From, m.ClusterId) - n.sm.send(Message{To: m.From, ClusterId: n.ClusterId(), Type: msgDenied}) - return true - } - - if _, ok := n.rmNodes[m.From]; ok { - if m.From != n.sm.id { - n.sm.send(Message{To: m.From, ClusterId: n.ClusterId(), Type: msgDenied}) + var prev stateResp + for { + if r.hasLeader() { + propc = n.propc + } else { + // We cannot accept proposals because we don't know who + // to send them to, so we'll apply back-pressure and + // block senders. + propc = nil } - return true - } - l := len(n.sm.msgs) + sr := stateResp{ + r.State, + r.raftLog.unstableEnts(), + r.raftLog.nextEnts(), + r.msgs, + } - if !n.sm.Step(m) { - return false - } + if sr.containsUpdates(prev) { + statec = n.statec + } else { + statec = nil + } - for _, m := range n.sm.msgs[l:] { - switch m.Type { - case msgAppResp: - // We just heard from the leader of the same term. - n.elapsed = 0 - case msgVoteResp: - // We just heard from the candidate the node voted for. - if m.Index >= 0 { - n.elapsed = 0 - } + select { + case p := <-propc: + r.propose(p) + case m := <-n.recvc: + r.Step(m) // raft never returns an error + case <-n.tickc: + // r.tick() + case statec <- sr: + r.raftLog.resetNextEnts() + r.raftLog.resetUnstable() + r.msgs = nil + case <-n.ctx.Done(): + return } } - return true } -// Next returns all the appliable entries -func (n *Node) Next() []Entry { - ents := n.sm.nextEnts() - for i := range ents { - switch ents[i].Type { - case Normal: - case ClusterInit: - cid, nr := binary.Varint(ents[i].Data) - if nr <= 0 { - panic("init cluster failed: cannot read clusterId") - } - if n.ClusterId() != -1 { - panic("cannot init a started cluster") - } - n.sm.clusterId = cid - case AddNode: - c := new(Config) - if err := json.Unmarshal(ents[i].Data, c); err != nil { - log.Printf("raft: err=%q", err) - continue - } - n.sm.addNode(c.NodeId) - delete(n.rmNodes, c.NodeId) - case RemoveNode: - c := new(Config) - if err := json.Unmarshal(ents[i].Data, c); err != nil { - log.Printf("raft: err=%q", err) - continue - } - n.sm.removeNode(c.NodeId) - n.rmNodes[c.NodeId] = struct{}{} - if c.NodeId == n.sm.id { - n.removed = true - } - default: - panic("unexpected entry type") - } - } - return ents -} - -// Tick triggers the node to do a tick. -// If the current elapsed is greater or equal than the timeout, -// node will send corresponding message to the statemachine. -func (n *Node) Tick() { - if !n.sm.promotable { - return - } - - timeout, msgType := n.electionRand, msgHup - if n.sm.state == stateLeader { - timeout, msgType = n.heartbeat, msgBeat - } - if n.elapsed >= timeout { - n.Step(Message{From: n.sm.id, ClusterId: n.ClusterId(), Type: msgType}) - n.elapsed = 0 - if n.sm.state != stateLeader { - n.electionRand = n.election + tick(rand.Int31())%n.election - } - } else { - n.elapsed++ +func (n *Node) Tick() error { + select { + case n.tickc <- struct{}{}: + return nil + case <-n.ctx.Done(): + return n.ctx.Err() } } -// IsEmpty returns ture if the log of the node is empty. -func (n *Node) IsEmpty() bool { - return n.sm.raftLog.isEmpty() -} - -func (n *Node) UpdateConf(t int64, c *Config) { - data, err := json.Marshal(c) - if err != nil { - panic(err) +// Propose proposes data be appended to the log. +func (n *Node) Propose(ctx context.Context, data []byte) error { + select { + case n.propc <- data: + return nil + case <-ctx.Done(): + return ctx.Err() + case <-n.ctx.Done(): + return n.ctx.Err() } - n.propose(t, data) } -// UnstableEnts retuens all the entries that need to be persistent. -// The first return value is offset, and the second one is unstable entries. -func (n *Node) UnstableEnts() []Entry { - return n.sm.raftLog.unstableEnts() -} - -func (n *Node) UnstableState() State { - if n.sm.unstableState.IsEmpty() { - return EmptyState +// Step advances the state machine using m. +func (n *Node) Step(m Message) error { + select { + case n.recvc <- m: + return nil + case <-n.ctx.Done(): + return n.ctx.Err() } - s := n.sm.unstableState - n.sm.clearState() - return s } -func (n *Node) UnstableSnapshot() Snapshot { - if n.sm.raftLog.unstableSnapshot.IsEmpty() { - return emptySnapshot +// ReadState returns the current point-in-time state. +func (n *Node) ReadState(ctx context.Context) (st State, ents, cents []Entry, msgs []Message, err error) { + select { + case sr := <-n.statec: + return sr.st, sr.ents, sr.cents, sr.msgs, nil + case <-ctx.Done(): + return State{}, nil, nil, nil, ctx.Err() + case <-n.ctx.Done(): + return State{}, nil, nil, nil, n.ctx.Err() } - s := n.sm.raftLog.unstableSnapshot - n.sm.raftLog.unstableSnapshot = emptySnapshot - return s -} - -func (n *Node) GetSnap() Snapshot { - return n.sm.raftLog.snapshot -} - -func (n *Node) Compact(d []byte) { - n.sm.compact(d) -} - -func (n *Node) EntsLen() int { - return len(n.sm.raftLog.ents) } diff --git a/raft/node_test.go b/raft/node_test.go index 32969f3d3..0a0cadc36 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -1,224 +1,5 @@ package raft -import ( - "reflect" - "testing" -) +import "testing" -const ( - defaultHeartbeat = 1 - defaultElection = 5 -) - -func TestTickMsgHup(t *testing.T) { - n := New(0, defaultHeartbeat, defaultElection) - n.sm = newStateMachine(0, []int64{0, 1, 2}) - n.sm.promotable = true - - for i := 0; i < defaultElection*2; i++ { - n.Tick() - } - - called := false - for _, m := range n.Msgs() { - if m.Type == msgVote { - called = true - } - } - - if !called { - t.Errorf("called = %v, want true", called) - } -} - -func TestTickMsgBeat(t *testing.T) { - k := 3 - n := dictate(New(0, defaultHeartbeat, defaultElection)) - n.Next() - for i := 1; i < k; i++ { - n.Add(int64(i), "", nil) - for _, m := range n.Msgs() { - if m.Type == msgApp { - n.Step(Message{From: m.To, ClusterId: m.ClusterId, Type: msgAppResp, Index: m.Index + int64(len(m.Entries))}) - } - } - // ignore commit index update messages - n.Msgs() - n.Next() - } - - for i := 0; i < defaultHeartbeat+1; i++ { - n.Tick() - } - - called := 0 - for _, m := range n.Msgs() { - if m.Type == msgApp && len(m.Entries) == 0 { - called++ - } - } - - // msgBeat -> k-1 append - w := k - 1 - if called != w { - t.Errorf("called = %v, want %v", called, w) - } -} - -func TestResetElapse(t *testing.T) { - tests := []struct { - msg Message - welapsed tick - }{ - {Message{From: 0, To: 1, Type: msgApp, Term: 2, Entries: []Entry{{Term: 1}}}, 0}, - {Message{From: 0, To: 1, Type: msgApp, Term: 1, Entries: []Entry{{Term: 1}}}, 1}, - {Message{From: 0, To: 1, Type: msgVote, Term: 2, Index: 1, LogTerm: 1}, 0}, - {Message{From: 0, To: 1, Type: msgVote, Term: 1}, 1}, - } - - for i, tt := range tests { - n := New(0, defaultHeartbeat, defaultElection) - n.sm = newStateMachine(0, []int64{0, 1, 2}) - n.sm.promotable = true - n.sm.raftLog.append(0, Entry{Type: Normal, Term: 1}) - n.sm.term = 2 - n.sm.raftLog.committed = 1 - - n.Tick() - if n.elapsed != 1 { - t.Errorf("%d: elpased = %d, want %d", i, n.elapsed, 1) - } - - n.Step(tt.msg) - if n.elapsed != tt.welapsed { - t.Errorf("%d: elpased = %d, want %d", i, n.elapsed, tt.welapsed) - } - } -} - -func TestStartCluster(t *testing.T) { - n := dictate(New(0, defaultHeartbeat, defaultElection)) - n.Next() - - if len(n.sm.ins) != 1 { - t.Errorf("k = %d, want 1", len(n.sm.ins)) - } - if n.sm.id != 0 { - t.Errorf("id = %d, want 0", n.sm.id) - } - if n.sm.state != stateLeader { - t.Errorf("state = %s, want %s", n.sm.state, stateLeader) - } -} - -func TestAdd(t *testing.T) { - n := dictate(New(0, defaultHeartbeat, defaultElection)) - n.Next() - n.Add(1, "", nil) - n.Next() - - if len(n.sm.ins) != 2 { - t.Errorf("k = %d, want 2", len(n.sm.ins)) - } - if n.sm.id != 0 { - t.Errorf("id = %d, want 0", n.sm.id) - } -} - -func TestRemove(t *testing.T) { - n := dictate(New(0, defaultHeartbeat, defaultElection)) - n.Next() - n.Add(1, "", nil) - n.Next() - n.Remove(0) - n.Step(Message{Type: msgAppResp, From: 1, ClusterId: n.ClusterId(), Term: 1, Index: 5}) - n.Next() - - if len(n.sm.ins) != 1 { - t.Errorf("k = %d, want 1", len(n.sm.ins)) - } - if n.sm.id != 0 { - t.Errorf("id = %d, want 0", n.sm.id) - } -} - -func TestDenial(t *testing.T) { - logents := []Entry{ - {Type: AddNode, Term: 1, Data: []byte(`{"NodeId":1}`)}, - {Type: AddNode, Term: 1, Data: []byte(`{"NodeId":2}`)}, - {Type: RemoveNode, Term: 1, Data: []byte(`{"NodeId":2}`)}, - } - - tests := []struct { - ent Entry - wdenied map[int64]bool - }{ - { - Entry{Type: AddNode, Term: 1, Data: []byte(`{"NodeId":2}`)}, - map[int64]bool{1: false, 2: false}, - }, - { - Entry{Type: RemoveNode, Term: 1, Data: []byte(`{"NodeId":1}`)}, - map[int64]bool{1: true, 2: true}, - }, - { - Entry{Type: RemoveNode, Term: 1, Data: []byte(`{"NodeId":0}`)}, - map[int64]bool{1: false, 2: true}, - }, - } - - for i, tt := range tests { - n := dictate(New(0, defaultHeartbeat, defaultElection)) - n.Next() - n.Msgs() - n.sm.raftLog.append(n.sm.raftLog.committed, append(logents, tt.ent)...) - n.sm.raftLog.committed += int64(len(logents) + 1) - n.Next() - - for id, denied := range tt.wdenied { - n.Step(Message{From: id, To: 0, ClusterId: n.ClusterId(), Type: msgApp, Term: 1}) - w := []Message{} - if denied { - w = []Message{{From: 0, To: id, ClusterId: n.ClusterId(), Term: 1, Type: msgDenied}} - } - if g := n.Msgs(); !reflect.DeepEqual(g, w) { - t.Errorf("#%d: msgs for %d = %+v, want %+v", i, id, g, w) - } - } - } -} - -func TestRecover(t *testing.T) { - ents := []Entry{{Term: 1}, {Term: 2}, {Term: 3}} - state := State{Term: 500, Vote: 1, Commit: 3} - - n := Recover(0, nil, ents, state, defaultHeartbeat, defaultElection) - if g := n.Next(); !reflect.DeepEqual(g, ents) { - t.Errorf("ents = %+v, want %+v", g, ents) - } - if g := n.sm.term; g.Get() != state.Term { - t.Errorf("term = %d, want %d", g, state.Term) - } - if g := n.sm.vote; g != state.Vote { - t.Errorf("vote = %d, want %d", g, state.Vote) - } - if g := n.sm.raftLog.committed; g != state.Commit { - t.Errorf("committed = %d, want %d", g, state.Commit) - } - if g := n.UnstableEnts(); g != nil { - t.Errorf("unstableEnts = %+v, want nil", g) - } - if g := n.UnstableState(); !reflect.DeepEqual(g, state) { - t.Errorf("unstableState = %+v, want %+v", g, state) - } - if g := n.Msgs(); len(g) != 0 { - t.Errorf("#%d: len(msgs) = %d, want 0", len(g)) - } -} - -func dictate(n *Node) *Node { - n.Step(Message{From: n.Id(), Type: msgHup}) - n.InitCluster(0xBEEF) - n.Add(n.Id(), "", nil) - return n -} +func TestNode(t *testing.T) {} diff --git a/raft2/raft.go b/raft/raft.go similarity index 100% rename from raft2/raft.go rename to raft/raft.go diff --git a/raft2/raft_test.go b/raft/raft_test.go similarity index 100% rename from raft2/raft_test.go rename to raft/raft_test.go diff --git a/raft2/snapshot.go b/raft/snapshot.go similarity index 100% rename from raft2/snapshot.go rename to raft/snapshot.go diff --git a/raft2/state.pb.go b/raft/state.pb.go similarity index 100% rename from raft2/state.pb.go rename to raft/state.pb.go diff --git a/raft2/state.proto b/raft/state.proto similarity index 100% rename from raft2/state.proto rename to raft/state.proto diff --git a/raft2/node.go b/raft2/node.go deleted file mode 100644 index d81021fac..000000000 --- a/raft2/node.go +++ /dev/null @@ -1,127 +0,0 @@ -// Package raft implements raft. -package raft - -import "code.google.com/p/go.net/context" - -type stateResp struct { - st State - ents, cents []Entry - msgs []Message -} - -func (a State) Equal(b State) bool { - return a.Term == b.Term && a.Vote == b.Vote && a.LastIndex == b.LastIndex -} - -func (sr stateResp) containsUpdates(prev stateResp) bool { - return !prev.st.Equal(sr.st) || len(sr.ents) > 0 || len(sr.cents) > 0 || len(sr.msgs) > 0 -} - -type Node struct { - ctx context.Context - propc chan []byte - recvc chan Message - statec chan stateResp - tickc chan struct{} -} - -func Start(ctx context.Context, id int64, peers []int64) *Node { - n := &Node{ - ctx: ctx, - propc: make(chan []byte), - recvc: make(chan Message), - statec: make(chan stateResp), - tickc: make(chan struct{}), - } - r := newRaft(id, peers) - go n.run(r) - return n -} - -func (n *Node) run(r *raft) { - propc := n.propc - statec := n.statec - - var prev stateResp - for { - if r.hasLeader() { - propc = n.propc - } else { - // We cannot accept proposals because we don't know who - // to send them to, so we'll apply back-pressure and - // block senders. - propc = nil - } - - sr := stateResp{ - r.State, - r.raftLog.unstableEnts(), - r.raftLog.nextEnts(), - r.msgs, - } - - if sr.containsUpdates(prev) { - statec = n.statec - } else { - statec = nil - } - - select { - case p := <-propc: - r.propose(p) - case m := <-n.recvc: - r.Step(m) // raft never returns an error - case <-n.tickc: - // r.tick() - case statec <- sr: - r.raftLog.resetNextEnts() - r.raftLog.resetUnstable() - r.msgs = nil - case <-n.ctx.Done(): - return - } - } -} - -func (n *Node) Tick() error { - select { - case n.tickc <- struct{}{}: - return nil - case <-n.ctx.Done(): - return n.ctx.Err() - } -} - -// Propose proposes data be appended to the log. -func (n *Node) Propose(ctx context.Context, data []byte) error { - select { - case n.propc <- data: - return nil - case <-ctx.Done(): - return ctx.Err() - case <-n.ctx.Done(): - return n.ctx.Err() - } -} - -// Step advances the state machine using m. -func (n *Node) Step(m Message) error { - select { - case n.recvc <- m: - return nil - case <-n.ctx.Done(): - return n.ctx.Err() - } -} - -// ReadState returns the current point-in-time state. -func (n *Node) ReadState(ctx context.Context) (st State, ents, cents []Entry, msgs []Message, err error) { - select { - case sr := <-n.statec: - return sr.st, sr.ents, sr.cents, sr.msgs, nil - case <-ctx.Done(): - return State{}, nil, nil, nil, ctx.Err() - case <-n.ctx.Done(): - return State{}, nil, nil, nil, n.ctx.Err() - } -} diff --git a/raft2/node_test.go b/raft2/node_test.go deleted file mode 100644 index 0a0cadc36..000000000 --- a/raft2/node_test.go +++ /dev/null @@ -1,5 +0,0 @@ -package raft - -import "testing" - -func TestNode(t *testing.T) {}