diff --git a/raft/cluster_test.go b/raft/cluster_test.go index a08215990..8173b72f2 100644 --- a/raft/cluster_test.go +++ b/raft/cluster_test.go @@ -9,7 +9,7 @@ import ( func TestBuildCluster(t *testing.T) { tests := []struct { size int - ids []int + ids []int64 }{ {1, nil}, {3, nil}, @@ -18,9 +18,9 @@ func TestBuildCluster(t *testing.T) { {9, nil}, {13, nil}, {51, nil}, - {1, []int{1}}, - {3, []int{1, 3, 5}}, - {5, []int{1, 4, 7, 10, 13}}, + {1, []int64{1}}, + {3, []int64{1, 3, 5}}, + {5, []int64{1, 4, 7, 10, 13}}, } for i, tt := range tests { @@ -35,7 +35,7 @@ func TestBuildCluster(t *testing.T) { } // ensure same leader - w := 0 + var w int64 if tt.ids != nil { w = tt.ids[0] } @@ -44,16 +44,16 @@ func TestBuildCluster(t *testing.T) { } // ensure same peer map - p := map[int]struct{}{} + p := map[int64]struct{}{} for k := range n.sm.ins { p[k] = struct{}{} } - wp := map[int]struct{}{} + wp := map[int64]struct{}{} for k := 0; k < tt.size; k++ { if tt.ids != nil { wp[tt.ids[k]] = struct{}{} } else { - wp[k] = struct{}{} + wp[int64(k)] = struct{}{} } } if !reflect.DeepEqual(p, wp) { @@ -105,11 +105,11 @@ func TestBasicCluster(t *testing.T) { // This function is full of heck now. It will go away when we finish our // network Interface, and ticker infrastructure. -func buildCluster(size int, ids []int) (nt *network, nodes []*Node) { +func buildCluster(size int, ids []int64) (nt *network, nodes []*Node) { if ids == nil { - ids = make([]int, size) + ids = make([]int64, size) for i := 0; i < size; i++ { - ids[i] = i + ids[i] = int64(i) } } diff --git a/raft/node.go b/raft/node.go index 4084a07f5..f063d78a1 100644 --- a/raft/node.go +++ b/raft/node.go @@ -13,7 +13,7 @@ type Interface interface { type tick int type Config struct { - NodeId int + NodeId int64 Addr string } @@ -25,7 +25,7 @@ type Node struct { heartbeat tick } -func New(id int, heartbeat, election tick) *Node { +func New(id int64, heartbeat, election tick) *Node { if election < heartbeat*3 { panic("election is least three times as heartbeat [election: %d, heartbeat: %d]") } @@ -33,13 +33,13 @@ func New(id int, heartbeat, election tick) *Node { n := &Node{ heartbeat: heartbeat, election: election, - sm: newStateMachine(id, []int{id}), + sm: newStateMachine(id, []int64{id}), } return n } -func (n *Node) Id() int { return n.sm.id } +func (n *Node) Id() int64 { return n.sm.id } func (n *Node) HasLeader() bool { return n.sm.lead != none } @@ -52,9 +52,9 @@ func (n *Node) propose(t int, data []byte) { func (n *Node) Campaign() { n.Step(Message{Type: msgHup}) } -func (n *Node) Add(id int, addr string) { n.updateConf(AddNode, &Config{NodeId: id, Addr: addr}) } +func (n *Node) Add(id int64, addr string) { n.updateConf(AddNode, &Config{NodeId: id, Addr: addr}) } -func (n *Node) Remove(id int) { n.updateConf(RemoveNode, &Config{NodeId: id}) } +func (n *Node) Remove(id int64) { n.updateConf(RemoveNode, &Config{NodeId: id}) } func (n *Node) Msgs() []Message { return n.sm.Msgs() } diff --git a/raft/node_test.go b/raft/node_test.go index adb50e0a2..f3f7fe212 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -11,7 +11,7 @@ const ( func TestTickMsgHup(t *testing.T) { n := New(0, defaultHeartbeat, defaultElection) - n.sm = newStateMachine(0, []int{0, 1, 2}) + n.sm = newStateMachine(0, []int64{0, 1, 2}) // simulate to patch the join log n.Step(Message{Type: msgApp, Commit: 1, Entries: []Entry{Entry{}}}) @@ -36,7 +36,7 @@ func TestTickMsgBeat(t *testing.T) { n := dictate(New(0, defaultHeartbeat, defaultElection)) n.Next() for i := 1; i < k; i++ { - n.Add(i, "") + n.Add(int64(i), "") for _, m := range n.Msgs() { if m.Type == msgApp { n.Step(Message{From: m.To, Type: msgAppResp, Index: m.Index + len(m.Entries)}) @@ -78,7 +78,7 @@ func TestResetElapse(t *testing.T) { for i, tt := range tests { n := New(0, defaultHeartbeat, defaultElection) - n.sm = newStateMachine(0, []int{0, 1, 2}) + n.sm = newStateMachine(0, []int64{0, 1, 2}) n.sm.term = 2 n.sm.log.committed = 1 diff --git a/raft/raft.go b/raft/raft.go index 0699c4dc2..cc8c45b18 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -63,8 +63,8 @@ func (st stateType) String() string { type Message struct { Type messageType - To int - From int + To int64 + From int64 Term int LogTerm int Index int @@ -90,27 +90,27 @@ func (in *index) decr() { } type stateMachine struct { - id int + id int64 // the term we are participating in at any time term int // who we voted for in term - vote int + vote int64 // the log log *log - ins map[int]*index + ins map[int64]*index state stateType - votes map[int]bool + votes map[int64]bool msgs []Message // the leader id - lead int + lead int64 // pending reconfiguration pendingConf bool @@ -118,8 +118,8 @@ type stateMachine struct { snapshoter Snapshoter } -func newStateMachine(id int, peers []int) *stateMachine { - sm := &stateMachine{id: id, log: newLog(), ins: make(map[int]*index)} +func newStateMachine(id int64, peers []int64) *stateMachine { + sm := &stateMachine{id: id, log: newLog(), ins: make(map[int64]*index)} for _, p := range peers { sm.ins[p] = &index{} } @@ -131,7 +131,7 @@ func (sm *stateMachine) setSnapshoter(snapshoter Snapshoter) { sm.snapshoter = snapshoter } -func (sm *stateMachine) poll(id int, v bool) (granted int) { +func (sm *stateMachine) poll(id int64, v bool) (granted int) { if _, ok := sm.votes[id]; !ok { sm.votes[id] = v } @@ -151,7 +151,7 @@ func (sm *stateMachine) send(m Message) { } // sendAppend sends RRPC, with entries to the given peer. -func (sm *stateMachine) sendAppend(to int) { +func (sm *stateMachine) sendAppend(to int64) { in := sm.ins[to] m := Message{} m.To = to @@ -199,7 +199,7 @@ func (sm *stateMachine) reset(term int) { sm.term = term sm.lead = none sm.vote = none - sm.votes = make(map[int]bool) + sm.votes = make(map[int64]bool) for i := range sm.ins { sm.ins[i] = &index{next: sm.log.lastIndex() + 1} if i == sm.id { @@ -226,7 +226,7 @@ func (sm *stateMachine) promotable() bool { return sm.log.committed != 0 } -func (sm *stateMachine) becomeFollower(term, lead int) { +func (sm *stateMachine) becomeFollower(term int, lead int64) { sm.reset(term) sm.lead = lead sm.state = stateFollower @@ -311,12 +311,12 @@ func (sm *stateMachine) handleSnapshot(m Message) { sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.log.lastIndex()}) } -func (sm *stateMachine) addNode(id int) { +func (sm *stateMachine) addNode(id int64) { sm.ins[id] = &index{next: sm.log.lastIndex() + 1} sm.pendingConf = false } -func (sm *stateMachine) removeNode(id int) { +func (sm *stateMachine) removeNode(id int64) { delete(sm.ins, id) sm.pendingConf = false } @@ -424,7 +424,7 @@ func (sm *stateMachine) restore(s Snapshot) { } sm.log.restore(s.Index, s.Term) - sm.ins = make(map[int]*index) + sm.ins = make(map[int64]*index) for _, n := range s.Nodes { sm.ins[n] = &index{next: sm.log.lastIndex() + 1} if n == sm.id { @@ -445,8 +445,8 @@ func (sm *stateMachine) needSnapshot(i int) bool { return false } -func (sm *stateMachine) nodes() []int { - nodes := make([]int, 0, len(sm.ins)) +func (sm *stateMachine) nodes() []int64 { + nodes := make([]int64, 0, len(sm.ins)) for k := range sm.ins { nodes = append(nodes, k) } diff --git a/raft/raft_test.go b/raft/raft_test.go index 0dadb091f..aeaa4b629 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -230,7 +230,7 @@ func TestDuelingCandidates(t *testing.T) { t.Errorf("#%d: term = %d, want %d", i, g, tt.term) } base := ltoa(tt.log) - if sm, ok := nt.peers[i].(*stateMachine); ok { + if sm, ok := nt.peers[int64(i)].(*stateMachine); ok { l := ltoa(sm.log) if g := diffu(base, l); g != "" { t.Errorf("#%d: diff:\n%s", i, g) @@ -433,9 +433,9 @@ func TestCommit(t *testing.T) { } for i, tt := range tests { - ins := make(map[int]*index) + ins := make(map[int64]*index) for j := 0; j < len(tt.matches); j++ { - ins[j] = &index{tt.matches[j], tt.matches[j] + 1} + ins[int64(j)] = &index{tt.matches[j], tt.matches[j] + 1} } sm := &stateMachine{log: &log{ents: tt.logs}, ins: ins, term: tt.smTerm} sm.maybeCommit() @@ -449,7 +449,7 @@ func TestRecvMsgVote(t *testing.T) { tests := []struct { state stateType i, term int - voteFor int + voteFor int64 w int }{ {stateFollower, 0, 0, none, -1}, @@ -505,7 +505,7 @@ func TestStateTransition(t *testing.T) { to stateType wallow bool wterm int - wlead int + wlead int64 }{ {stateFollower, stateFollower, true, 1, none}, {stateFollower, stateCandidate, true, 1, none}, @@ -530,7 +530,7 @@ func TestStateTransition(t *testing.T) { } }() - sm := newStateMachine(0, []int{0}) + sm := newStateMachine(0, []int64{0}) sm.state = tt.from switch tt.to { @@ -553,7 +553,7 @@ func TestStateTransition(t *testing.T) { } func TestConf(t *testing.T) { - sm := newStateMachine(0, []int{0}) + sm := newStateMachine(0, []int64{0}) sm.becomeCandidate() sm.becomeLeader() @@ -588,7 +588,7 @@ func TestConfChangeLeader(t *testing.T) { } for i, tt := range tests { - sm := newStateMachine(0, []int{0}) + sm := newStateMachine(0, []int64{0}) sm.log = &log{ents: []Entry{{}, {Type: tt.et}}} sm.becomeCandidate() @@ -617,7 +617,7 @@ func TestAllServerStepdown(t *testing.T) { tterm := 3 for i, tt := range tests { - sm := newStateMachine(0, []int{0, 1, 2}) + sm := newStateMachine(0, []int64{0, 1, 2}) switch tt.state { case stateFollower: sm.becomeFollower(1, 0) @@ -658,7 +658,7 @@ func TestLeaderAppResp(t *testing.T) { for i, tt := range tests { // sm term is 1 after it becomes the leader. // thus the last log term must be 1 to be committed. - sm := newStateMachine(0, []int{0, 1, 2}) + sm := newStateMachine(0, []int64{0, 1, 2}) sm.log = &log{ents: []Entry{{}, {Term: 0}, {Term: 1}}} sm.becomeCandidate() sm.becomeLeader() @@ -693,7 +693,7 @@ func TestRecvMsgBeat(t *testing.T) { } for i, tt := range tests { - sm := newStateMachine(0, []int{0, 1, 2}) + sm := newStateMachine(0, []int64{0, 1, 2}) sm.log = &log{ents: []Entry{{}, {Term: 0}, {Term: 1}}} sm.term = 1 sm.state = tt.state @@ -723,7 +723,7 @@ func TestMaybeCompact(t *testing.T) { } for i, tt := range tests { - sm := newStateMachine(0, []int{0, 1, 2}) + sm := newStateMachine(0, []int64{0, 1, 2}) sm.setSnapshoter(tt.snapshoter) for i := 0; i < defaultCompactThreshold*2; i++ { sm.log.append(i, Entry{Term: i + 1}) @@ -745,10 +745,12 @@ func TestMaybeCompact(t *testing.T) { } w := sm.nodes() - sort.Ints(w) - sort.Ints(s.Nodes) - if !reflect.DeepEqual(s.Nodes, w) { - t.Errorf("#%d: snap.Nodes = %+v, want %+v", i, s.Nodes, w) + sw := int64Slice(w) + sg := int64Slice(s.Nodes) + sort.Sort(sw) + sort.Sort(sg) + if !reflect.DeepEqual(sg, sw) { + t.Errorf("#%d: snap.Nodes = %+v, want %+v", i, sg, sw) } } } @@ -758,7 +760,7 @@ func TestRestore(t *testing.T) { s := Snapshot{ Index: defaultCompactThreshold + 1, Term: defaultCompactThreshold + 1, - Nodes: []int{0, 1, 2}, + Nodes: []int64{0, 1, 2}, } tests := []struct { @@ -779,7 +781,7 @@ func TestRestore(t *testing.T) { } }() - sm := newStateMachine(0, []int{0, 1}) + sm := newStateMachine(0, []int64{0, 1}) sm.setSnapshoter(tt.snapshoter) sm.restore(s) @@ -789,11 +791,12 @@ func TestRestore(t *testing.T) { if sm.log.term(s.Index) != s.Term { t.Errorf("#%d: log.lastTerm = %d, want %d", i, sm.log.term(s.Index), s.Term) } - g := sm.nodes() - sort.Ints(g) - sort.Ints(s.Nodes) - if !reflect.DeepEqual(g, s.Nodes) { - t.Errorf("#%d: sm.Nodes = %+v, want %+v", i, g, s.Nodes) + sg := int64Slice(sm.nodes()) + sw := int64Slice(s.Nodes) + sort.Sort(sg) + sort.Sort(sw) + if !reflect.DeepEqual(sg, sw) { + t.Errorf("#%d: sm.Nodes = %+v, want %+v", i, sg, sw) } if !reflect.DeepEqual(sm.snapshoter.GetSnap(), s) { t.Errorf("%d: snapshoter.getSnap = %+v, want %+v", sm.snapshoter.GetSnap(), s) @@ -806,9 +809,9 @@ func TestProvideSnap(t *testing.T) { s := Snapshot{ Index: defaultCompactThreshold + 1, Term: defaultCompactThreshold + 1, - Nodes: []int{0, 1}, + Nodes: []int64{0, 1}, } - sm := newStateMachine(0, []int{0}) + sm := newStateMachine(0, []int64{0}) sm.setSnapshoter(new(logSnapshoter)) // restore the statemachin from a snapshot // so it has a compacted log and a snapshot @@ -846,11 +849,11 @@ func TestRestoreFromSnapMsg(t *testing.T) { s := Snapshot{ Index: defaultCompactThreshold + 1, Term: defaultCompactThreshold + 1, - Nodes: []int{0, 1}, + Nodes: []int64{0, 1}, } m := Message{Type: msgSnap, From: 0, Term: 1, Snapshot: s} - sm := newStateMachine(1, []int{0, 1}) + sm := newStateMachine(1, []int64{0, 1}) sm.setSnapshoter(new(logSnapshoter)) sm.Step(m) @@ -900,7 +903,7 @@ func ents(terms ...int) *stateMachine { } type network struct { - peers map[int]Interface + peers map[int64]Interface dropm map[connem]float64 ignorem map[messageType]bool } @@ -911,31 +914,32 @@ type network struct { // When using stateMachine, the address list is always [0, n). func newNetwork(peers ...Interface) *network { size := len(peers) - defaultPeerAddrs := make([]int, size) + defaultPeerAddrs := make([]int64, size) for i := 0; i < size; i++ { - defaultPeerAddrs[i] = i + defaultPeerAddrs[i] = int64(i) } - npeers := make(map[int]Interface, size) + npeers := make(map[int64]Interface, size) for id, p := range peers { + nid := int64(id) switch v := p.(type) { case nil: - sm := newStateMachine(id, defaultPeerAddrs) + sm := newStateMachine(nid, defaultPeerAddrs) sm.setSnapshoter(new(logSnapshoter)) - npeers[id] = sm + npeers[nid] = sm case *stateMachine: - v.id = id - v.ins = make(map[int]*index) + v.id = nid + v.ins = make(map[int64]*index) for i := 0; i < size; i++ { - v.ins[i] = &index{} + v.ins[int64(i)] = &index{} } v.reset(0) - npeers[id] = v + npeers[nid] = v case *Node: npeers[v.sm.id] = v default: - npeers[id] = v + npeers[nid] = v } } return &network{ @@ -954,20 +958,21 @@ func (nw *network) send(msgs ...Message) { } } -func (nw *network) drop(from, to int, perc float64) { +func (nw *network) drop(from, to int64, perc float64) { nw.dropm[connem{from, to}] = perc } -func (nw *network) cut(one, other int) { +func (nw *network) cut(one, other int64) { nw.drop(one, other, 1) nw.drop(other, one, 1) } -func (nw *network) isolate(id int) { +func (nw *network) isolate(id int64) { for i := 0; i < len(nw.peers); i++ { - if i != id { - nw.drop(id, i, 1.0) - nw.drop(i, id, 1.0) + nid := int64(i) + if nid != id { + nw.drop(id, nid, 1.0) + nw.drop(nid, id, 1.0) } } } @@ -1003,7 +1008,7 @@ func (nw *network) filter(msgs []Message) []Message { } type connem struct { - from, to int + from, to int64 } type blackHole struct{} @@ -1017,7 +1022,7 @@ type logSnapshoter struct { snapshot Snapshot } -func (s *logSnapshoter) Snap(index, term int, nodes []int) { +func (s *logSnapshoter) Snap(index, term int, nodes []int64) { s.snapshot = Snapshot{ Index: index, Term: term, @@ -1031,3 +1036,10 @@ func (s *logSnapshoter) Restore(ss Snapshot) { func (s *logSnapshoter) GetSnap() Snapshot { return s.snapshot } + +// int64Slice implements sort interface +type int64Slice []int64 + +func (p int64Slice) Len() int { return len(p) } +func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] } +func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } diff --git a/raft/snapshot.go b/raft/snapshot.go index aa7b6e8d6..ab2ca65eb 100644 --- a/raft/snapshot.go +++ b/raft/snapshot.go @@ -4,7 +4,7 @@ type Snapshot struct { Data []byte // the configuration - Nodes []int + Nodes []int64 // the index at which the snapshot was taken. Index int // the log term of the index @@ -14,7 +14,7 @@ type Snapshot struct { // A snapshoter can make a snapshot of its current state atomically. // It can restore from a snapshot and get the latest snapshot it took. type Snapshoter interface { - Snap(index, term int, nodes []int) + Snap(index, term int, nodes []int64) Restore(snap Snapshot) GetSnap() Snapshot }