mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #2128 from bdarnell/applied-restart
raft: Add applied index as an argument to newRaft and RestartNode.
This commit is contained in:
commit
e7d539e4ce
@ -116,7 +116,7 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.N
|
||||
}
|
||||
s.SetHardState(st)
|
||||
s.Append(ents)
|
||||
n := raft.RestartNode(uint64(id), cfg.ElectionTicks, 1, s)
|
||||
n := raft.RestartNode(uint64(id), cfg.ElectionTicks, 1, s, 0)
|
||||
return id, n, s, w
|
||||
}
|
||||
|
||||
@ -157,7 +157,7 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type
|
||||
}
|
||||
s.SetHardState(st)
|
||||
s.Append(ents)
|
||||
n := raft.RestartNode(uint64(id), cfg.ElectionTicks, 1, s)
|
||||
n := raft.RestartNode(uint64(id), cfg.ElectionTicks, 1, s, 0)
|
||||
return id, n, s, w
|
||||
}
|
||||
|
||||
|
10
raft/node.go
10
raft/node.go
@ -135,7 +135,7 @@ type Peer struct {
|
||||
// It appends a ConfChangeAddNode entry for each given peer to the initial log.
|
||||
func StartNode(id uint64, peers []Peer, election, heartbeat int, storage Storage) Node {
|
||||
n := newNode()
|
||||
r := newRaft(id, nil, election, heartbeat, storage)
|
||||
r := newRaft(id, nil, election, heartbeat, storage, 0)
|
||||
|
||||
// become the follower at term 1 and apply initial configuration
|
||||
// entires of term 1
|
||||
@ -171,11 +171,13 @@ func StartNode(id uint64, peers []Peer, election, heartbeat int, storage Storage
|
||||
return &n
|
||||
}
|
||||
|
||||
// RestartNode is identical to StartNode but does not take a list of peers.
|
||||
// RestartNode is similar to StartNode but does not take a list of peers.
|
||||
// The current membership of the cluster will be restored from the Storage.
|
||||
func RestartNode(id uint64, election, heartbeat int, storage Storage) Node {
|
||||
// If the caller has an existing state machine, pass in the last log index that
|
||||
// has been applied to it; otherwise use zero.
|
||||
func RestartNode(id uint64, election, heartbeat int, storage Storage, applied uint64) Node {
|
||||
n := newNode()
|
||||
r := newRaft(id, nil, election, heartbeat, storage)
|
||||
r := newRaft(id, nil, election, heartbeat, storage, applied)
|
||||
|
||||
go n.run(r)
|
||||
return &n
|
||||
|
@ -28,7 +28,7 @@ func BenchmarkOneNode(b *testing.B) {
|
||||
|
||||
n := newNode()
|
||||
s := NewMemoryStorage()
|
||||
r := newRaft(1, []uint64{1}, 10, 1, s)
|
||||
r := newRaft(1, []uint64{1}, 10, 1, s, 0)
|
||||
go n.run(r)
|
||||
|
||||
defer n.Stop()
|
||||
|
@ -116,7 +116,7 @@ func TestNodePropose(t *testing.T) {
|
||||
|
||||
n := newNode()
|
||||
s := NewMemoryStorage()
|
||||
r := newRaft(1, []uint64{1}, 10, 1, s)
|
||||
r := newRaft(1, []uint64{1}, 10, 1, s, 0)
|
||||
go n.run(r)
|
||||
n.Campaign(context.TODO())
|
||||
for {
|
||||
@ -154,7 +154,7 @@ func TestNodeProposeConfig(t *testing.T) {
|
||||
|
||||
n := newNode()
|
||||
s := NewMemoryStorage()
|
||||
r := newRaft(1, []uint64{1}, 10, 1, s)
|
||||
r := newRaft(1, []uint64{1}, 10, 1, s, 0)
|
||||
go n.run(r)
|
||||
n.Campaign(context.TODO())
|
||||
for {
|
||||
@ -192,7 +192,7 @@ func TestNodeProposeConfig(t *testing.T) {
|
||||
// who is the current leader.
|
||||
func TestBlockProposal(t *testing.T) {
|
||||
n := newNode()
|
||||
r := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
|
||||
r := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0)
|
||||
go n.run(r)
|
||||
defer n.Stop()
|
||||
|
||||
@ -225,7 +225,7 @@ func TestBlockProposal(t *testing.T) {
|
||||
func TestNodeTick(t *testing.T) {
|
||||
n := newNode()
|
||||
s := NewMemoryStorage()
|
||||
r := newRaft(1, []uint64{1}, 10, 1, s)
|
||||
r := newRaft(1, []uint64{1}, 10, 1, s, 0)
|
||||
go n.run(r)
|
||||
elapsed := r.elapsed
|
||||
n.Tick()
|
||||
@ -240,7 +240,7 @@ func TestNodeTick(t *testing.T) {
|
||||
func TestNodeStop(t *testing.T) {
|
||||
n := newNode()
|
||||
s := NewMemoryStorage()
|
||||
r := newRaft(1, []uint64{1}, 10, 1, s)
|
||||
r := newRaft(1, []uint64{1}, 10, 1, s, 0)
|
||||
donec := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
@ -364,7 +364,7 @@ func TestNodeRestart(t *testing.T) {
|
||||
storage := NewMemoryStorage()
|
||||
storage.SetHardState(st)
|
||||
storage.Append(entries)
|
||||
n := RestartNode(1, 10, 1, storage)
|
||||
n := RestartNode(1, 10, 1, storage, 0)
|
||||
if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
|
||||
t.Errorf("g = %+v,\n w %+v", g, want)
|
||||
}
|
||||
@ -400,7 +400,7 @@ func TestNodeRestartFromSnapshot(t *testing.T) {
|
||||
s.SetHardState(st)
|
||||
s.ApplySnapshot(snap)
|
||||
s.Append(entries)
|
||||
n := RestartNode(1, 10, 1, s)
|
||||
n := RestartNode(1, 10, 1, s, 0)
|
||||
if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
|
||||
t.Errorf("g = %+v,\n w %+v", g, want)
|
||||
} else {
|
||||
|
10
raft/raft.go
10
raft/raft.go
@ -139,7 +139,8 @@ type raft struct {
|
||||
step stepFunc
|
||||
}
|
||||
|
||||
func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage) *raft {
|
||||
func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage,
|
||||
applied uint64) *raft {
|
||||
if id == None {
|
||||
panic("cannot use none id")
|
||||
}
|
||||
@ -172,6 +173,9 @@ func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage
|
||||
if !isHardStateEqual(hs, emptyState) {
|
||||
r.loadState(hs)
|
||||
}
|
||||
if applied > 0 {
|
||||
raftlog.appliedTo(applied)
|
||||
}
|
||||
r.becomeFollower(r.Term, None)
|
||||
|
||||
nodesStrs := make([]string, 0)
|
||||
@ -179,8 +183,8 @@ func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage
|
||||
nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
|
||||
}
|
||||
|
||||
log.Printf("raft: newRaft %x [peers: [%s], term: %d, commit: %d, lastindex: %d, lastterm: %d]",
|
||||
r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm())
|
||||
log.Printf("raft: newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
|
||||
r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
|
||||
return r
|
||||
}
|
||||
|
||||
|
@ -56,7 +56,7 @@ func TestLeaderUpdateTermFromMessage(t *testing.T) {
|
||||
// it immediately reverts to follower state.
|
||||
// Reference: section 5.1
|
||||
func testUpdateTermFromMessage(t *testing.T, state StateType) {
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
|
||||
switch state {
|
||||
case StateFollower:
|
||||
r.becomeFollower(1, 2)
|
||||
@ -86,7 +86,7 @@ func TestRejectStaleTermMessage(t *testing.T) {
|
||||
fakeStep := func(r *raft, m pb.Message) {
|
||||
called = true
|
||||
}
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
|
||||
r.step = fakeStep
|
||||
r.loadState(pb.HardState{Term: 2})
|
||||
|
||||
@ -100,7 +100,7 @@ func TestRejectStaleTermMessage(t *testing.T) {
|
||||
// TestStartAsFollower tests that when servers start up, they begin as followers.
|
||||
// Reference: section 5.2
|
||||
func TestStartAsFollower(t *testing.T) {
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
|
||||
if r.state != StateFollower {
|
||||
t.Errorf("state = %s, want %s", r.state, StateFollower)
|
||||
}
|
||||
@ -113,7 +113,7 @@ func TestStartAsFollower(t *testing.T) {
|
||||
func TestLeaderBcastBeat(t *testing.T) {
|
||||
// heartbeat interval
|
||||
hi := 1
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, hi, NewMemoryStorage())
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, hi, NewMemoryStorage(), 0)
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
for i := 0; i < 10; i++ {
|
||||
@ -155,7 +155,7 @@ func TestCandidateStartNewElection(t *testing.T) {
|
||||
func testNonleaderStartElection(t *testing.T, state StateType) {
|
||||
// election timeout
|
||||
et := 10
|
||||
r := newRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage())
|
||||
r := newRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage(), 0)
|
||||
switch state {
|
||||
case StateFollower:
|
||||
r.becomeFollower(1, 2)
|
||||
@ -219,7 +219,7 @@ func TestLeaderElectionInOneRoundRPC(t *testing.T) {
|
||||
{5, map[uint64]bool{}, StateCandidate},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
r := newRaft(1, idsBySize(tt.size), 10, 1, NewMemoryStorage())
|
||||
r := newRaft(1, idsBySize(tt.size), 10, 1, NewMemoryStorage(), 0)
|
||||
|
||||
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
|
||||
for id, vote := range tt.votes {
|
||||
@ -252,7 +252,7 @@ func TestFollowerVote(t *testing.T) {
|
||||
{2, 1, true},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
|
||||
r.loadState(pb.HardState{Term: 1, Vote: tt.vote})
|
||||
|
||||
r.Step(pb.Message{From: tt.nvote, To: 1, Term: 1, Type: pb.MsgVote})
|
||||
@ -278,7 +278,7 @@ func TestCandidateFallback(t *testing.T) {
|
||||
{From: 2, To: 1, Term: 2, Type: pb.MsgApp},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
|
||||
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
|
||||
if r.state != StateCandidate {
|
||||
t.Fatalf("unexpected state = %s, want %s", r.state, StateCandidate)
|
||||
@ -311,7 +311,7 @@ func TestCandidateElectionTimeoutRandomized(t *testing.T) {
|
||||
// Reference: section 5.2
|
||||
func testNonleaderElectionTimeoutRandomized(t *testing.T, state StateType) {
|
||||
et := 10
|
||||
r := newRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage())
|
||||
r := newRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage(), 0)
|
||||
timeouts := make(map[int]bool)
|
||||
for round := 0; round < 50*et; round++ {
|
||||
switch state {
|
||||
@ -357,7 +357,7 @@ func testNonleadersElectionTimeoutNonconflict(t *testing.T, state StateType) {
|
||||
rs := make([]*raft, size)
|
||||
ids := idsBySize(size)
|
||||
for k := range rs {
|
||||
rs[k] = newRaft(ids[k], ids, et, 1, NewMemoryStorage())
|
||||
rs[k] = newRaft(ids[k], ids, et, 1, NewMemoryStorage(), 0)
|
||||
}
|
||||
conflicts := 0
|
||||
for round := 0; round < 1000; round++ {
|
||||
@ -400,7 +400,7 @@ func testNonleadersElectionTimeoutNonconflict(t *testing.T, state StateType) {
|
||||
// Reference: section 5.3
|
||||
func TestLeaderStartReplication(t *testing.T) {
|
||||
s := NewMemoryStorage()
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, s)
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, s, 0)
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
commitNoopEntry(r, s)
|
||||
@ -439,7 +439,7 @@ func TestLeaderStartReplication(t *testing.T) {
|
||||
// Reference: section 5.3
|
||||
func TestLeaderCommitEntry(t *testing.T) {
|
||||
s := NewMemoryStorage()
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, s)
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, s, 0)
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
commitNoopEntry(r, s)
|
||||
@ -493,7 +493,7 @@ func TestLeaderAcknowledgeCommit(t *testing.T) {
|
||||
}
|
||||
for i, tt := range tests {
|
||||
s := NewMemoryStorage()
|
||||
r := newRaft(1, idsBySize(tt.size), 10, 1, s)
|
||||
r := newRaft(1, idsBySize(tt.size), 10, 1, s, 0)
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
commitNoopEntry(r, s)
|
||||
@ -527,7 +527,7 @@ func TestLeaderCommitPrecedingEntries(t *testing.T) {
|
||||
for i, tt := range tests {
|
||||
storage := NewMemoryStorage()
|
||||
storage.Append(tt)
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage)
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage, 0)
|
||||
r.loadState(pb.HardState{Term: 2})
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
@ -582,7 +582,7 @@ func TestFollowerCommitEntry(t *testing.T) {
|
||||
},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
|
||||
r.becomeFollower(1, 2)
|
||||
|
||||
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 1, Entries: tt.ents, Commit: tt.commit})
|
||||
@ -619,7 +619,7 @@ func TestFollowerCheckMsgApp(t *testing.T) {
|
||||
for i, tt := range tests {
|
||||
storage := NewMemoryStorage()
|
||||
storage.Append(ents)
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage)
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage, 0)
|
||||
r.loadState(pb.HardState{Commit: 2})
|
||||
r.becomeFollower(2, 2)
|
||||
|
||||
@ -675,7 +675,7 @@ func TestFollowerAppendEntries(t *testing.T) {
|
||||
for i, tt := range tests {
|
||||
storage := NewMemoryStorage()
|
||||
storage.Append([]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}})
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage)
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage, 0)
|
||||
r.becomeFollower(2, 2)
|
||||
|
||||
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index, Entries: tt.ents})
|
||||
@ -744,11 +744,11 @@ func TestLeaderSyncFollowerLog(t *testing.T) {
|
||||
for i, tt := range tests {
|
||||
leadStorage := NewMemoryStorage()
|
||||
leadStorage.Append(ents)
|
||||
lead := newRaft(1, []uint64{1, 2, 3}, 10, 1, leadStorage)
|
||||
lead := newRaft(1, []uint64{1, 2, 3}, 10, 1, leadStorage, 0)
|
||||
lead.loadState(pb.HardState{Commit: lead.raftLog.lastIndex(), Term: term})
|
||||
followerStorage := NewMemoryStorage()
|
||||
followerStorage.Append(tt)
|
||||
follower := newRaft(2, []uint64{1, 2, 3}, 10, 1, followerStorage)
|
||||
follower := newRaft(2, []uint64{1, 2, 3}, 10, 1, followerStorage, 0)
|
||||
follower.loadState(pb.HardState{Term: term - 1})
|
||||
// It is necessary to have a three-node cluster.
|
||||
// The second may have more up-to-date log than the first one, so the
|
||||
@ -777,7 +777,7 @@ func TestVoteRequest(t *testing.T) {
|
||||
{[]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}, 3},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
|
||||
r.Step(pb.Message{
|
||||
From: 2, To: 1, Type: pb.MsgApp, Term: tt.wterm - 1, LogTerm: 0, Index: 0, Entries: tt.ents,
|
||||
})
|
||||
@ -840,7 +840,7 @@ func TestVoter(t *testing.T) {
|
||||
for i, tt := range tests {
|
||||
storage := NewMemoryStorage()
|
||||
storage.Append(tt.ents)
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1, storage)
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0)
|
||||
|
||||
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgVote, Term: 3, LogTerm: tt.logterm, Index: tt.index})
|
||||
|
||||
@ -876,7 +876,7 @@ func TestLeaderOnlyCommitsLogFromCurrentTerm(t *testing.T) {
|
||||
for i, tt := range tests {
|
||||
storage := NewMemoryStorage()
|
||||
storage.Append(ents)
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1, storage)
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0)
|
||||
r.loadState(pb.HardState{Term: 2})
|
||||
// become leader at term 3
|
||||
r.becomeCandidate()
|
||||
|
@ -195,7 +195,7 @@ func TestProgressWaitReset(t *testing.T) {
|
||||
|
||||
// TestProgressDecr ensures raft.heartbeat decreases progress.wait by heartbeat.
|
||||
func TestProgressDecr(t *testing.T) {
|
||||
r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
|
||||
r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(), 0)
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
r.prs[2].Wait = r.heartbeatTimeout * 2
|
||||
@ -207,7 +207,7 @@ func TestProgressDecr(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestProgressWait(t *testing.T) {
|
||||
r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
|
||||
r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(), 0)
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
||||
@ -399,9 +399,9 @@ func TestCommitWithoutNewTermEntry(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDuelingCandidates(t *testing.T) {
|
||||
a := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||
b := newRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||
c := newRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||
a := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
|
||||
b := newRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
|
||||
c := newRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
|
||||
|
||||
nt := newNetwork(a, b, c)
|
||||
nt.cut(1, 3)
|
||||
@ -669,7 +669,7 @@ func TestCommit(t *testing.T) {
|
||||
storage.Append(tt.logs)
|
||||
storage.hardState = pb.HardState{Term: tt.smTerm}
|
||||
|
||||
sm := newRaft(1, []uint64{1}, 5, 1, storage)
|
||||
sm := newRaft(1, []uint64{1}, 5, 1, storage, 0)
|
||||
for j := 0; j < len(tt.matches); j++ {
|
||||
sm.setProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1)
|
||||
}
|
||||
@ -694,7 +694,7 @@ func TestIsElectionTimeout(t *testing.T) {
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
|
||||
sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0)
|
||||
sm.elapsed = tt.elapse
|
||||
c := 0
|
||||
for j := 0; j < 10000; j++ {
|
||||
@ -719,7 +719,7 @@ func TestStepIgnoreOldTermMsg(t *testing.T) {
|
||||
fakeStep := func(r *raft, m pb.Message) {
|
||||
called = true
|
||||
}
|
||||
sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
|
||||
sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0)
|
||||
sm.step = fakeStep
|
||||
sm.Term = 2
|
||||
sm.Step(pb.Message{Type: pb.MsgApp, Term: sm.Term - 1})
|
||||
@ -761,7 +761,7 @@ func TestHandleMsgApp(t *testing.T) {
|
||||
for i, tt := range tests {
|
||||
storage := NewMemoryStorage()
|
||||
storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}})
|
||||
sm := newRaft(1, []uint64{1}, 10, 1, storage)
|
||||
sm := newRaft(1, []uint64{1}, 10, 1, storage, 0)
|
||||
sm.becomeFollower(2, None)
|
||||
|
||||
sm.handleAppendEntries(tt.m)
|
||||
@ -795,7 +795,7 @@ func TestHandleHeartbeat(t *testing.T) {
|
||||
for i, tt := range tests {
|
||||
storage := NewMemoryStorage()
|
||||
storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}})
|
||||
sm := newRaft(1, []uint64{1, 2}, 5, 1, storage)
|
||||
sm := newRaft(1, []uint64{1, 2}, 5, 1, storage, 0)
|
||||
sm.becomeFollower(2, 2)
|
||||
sm.raftLog.commitTo(commit)
|
||||
sm.handleHeartbeat(tt.m)
|
||||
@ -816,7 +816,7 @@ func TestHandleHeartbeat(t *testing.T) {
|
||||
func TestHandleHeartbeatResp(t *testing.T) {
|
||||
storage := NewMemoryStorage()
|
||||
storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}})
|
||||
sm := newRaft(1, []uint64{1, 2}, 5, 1, storage)
|
||||
sm := newRaft(1, []uint64{1, 2}, 5, 1, storage, 0)
|
||||
sm.becomeCandidate()
|
||||
sm.becomeLeader()
|
||||
sm.raftLog.commitTo(sm.raftLog.lastIndex())
|
||||
@ -904,7 +904,7 @@ func TestRecvMsgVote(t *testing.T) {
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
|
||||
sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0)
|
||||
sm.state = tt.state
|
||||
switch tt.state {
|
||||
case StateFollower:
|
||||
@ -964,7 +964,7 @@ func TestStateTransition(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
|
||||
sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
|
||||
sm := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0)
|
||||
sm.state = tt.from
|
||||
|
||||
switch tt.to {
|
||||
@ -1003,7 +1003,7 @@ func TestAllServerStepdown(t *testing.T) {
|
||||
tterm := uint64(3)
|
||||
|
||||
for i, tt := range tests {
|
||||
sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||
sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
|
||||
switch tt.state {
|
||||
case StateFollower:
|
||||
sm.becomeFollower(1, None)
|
||||
@ -1062,7 +1062,7 @@ func TestLeaderAppResp(t *testing.T) {
|
||||
for i, tt := range tests {
|
||||
// sm term is 1 after it becomes the leader.
|
||||
// thus the last log term must be 1 to be committed.
|
||||
sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||
sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
|
||||
sm.raftLog = &raftLog{
|
||||
storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}},
|
||||
unstable: unstable{offset: 3},
|
||||
@ -1110,7 +1110,7 @@ func TestBcastBeat(t *testing.T) {
|
||||
}
|
||||
storage := NewMemoryStorage()
|
||||
storage.ApplySnapshot(s)
|
||||
sm := newRaft(1, nil, 10, 1, storage)
|
||||
sm := newRaft(1, nil, 10, 1, storage, 0)
|
||||
sm.Term = 1
|
||||
|
||||
sm.becomeCandidate()
|
||||
@ -1169,7 +1169,7 @@ func TestRecvMsgBeat(t *testing.T) {
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
|
||||
sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage(), 0)
|
||||
sm.raftLog = &raftLog{storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}}}
|
||||
sm.Term = 1
|
||||
sm.state = tt.state
|
||||
@ -1212,7 +1212,7 @@ func TestLeaderIncreaseNext(t *testing.T) {
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
sm := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
|
||||
sm := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
|
||||
sm.raftLog.append(previousEnts...)
|
||||
sm.becomeCandidate()
|
||||
sm.becomeLeader()
|
||||
@ -1236,7 +1236,7 @@ func TestRestore(t *testing.T) {
|
||||
}
|
||||
|
||||
storage := NewMemoryStorage()
|
||||
sm := newRaft(1, []uint64{1, 2}, 10, 1, storage)
|
||||
sm := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0)
|
||||
if ok := sm.restore(s); !ok {
|
||||
t.Fatal("restore fail, want succeed")
|
||||
}
|
||||
@ -1261,7 +1261,7 @@ func TestRestoreIgnoreSnapshot(t *testing.T) {
|
||||
previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
|
||||
commit := uint64(1)
|
||||
storage := NewMemoryStorage()
|
||||
sm := newRaft(1, []uint64{1, 2}, 10, 1, storage)
|
||||
sm := newRaft(1, []uint64{1, 2}, 10, 1, storage, 0)
|
||||
sm.raftLog.append(previousEnts...)
|
||||
sm.raftLog.commitTo(commit)
|
||||
|
||||
@ -1302,7 +1302,7 @@ func TestProvideSnap(t *testing.T) {
|
||||
},
|
||||
}
|
||||
storage := NewMemoryStorage()
|
||||
sm := newRaft(1, []uint64{1}, 10, 1, storage)
|
||||
sm := newRaft(1, []uint64{1}, 10, 1, storage, 0)
|
||||
sm.restore(s)
|
||||
|
||||
sm.becomeCandidate()
|
||||
@ -1333,7 +1333,7 @@ func TestRestoreFromSnapMsg(t *testing.T) {
|
||||
}
|
||||
m := pb.Message{Type: pb.MsgSnap, From: 1, Term: 2, Snapshot: s}
|
||||
|
||||
sm := newRaft(2, []uint64{1, 2}, 10, 1, NewMemoryStorage())
|
||||
sm := newRaft(2, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
|
||||
sm.Step(m)
|
||||
|
||||
// TODO(bdarnell): what should this test?
|
||||
@ -1367,7 +1367,7 @@ func TestSlowNodeRestore(t *testing.T) {
|
||||
// it appends the entry to log and sets pendingConf to be true.
|
||||
func TestStepConfig(t *testing.T) {
|
||||
// a raft that cannot make progress
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
index := r.raftLog.lastIndex()
|
||||
@ -1385,7 +1385,7 @@ func TestStepConfig(t *testing.T) {
|
||||
// the proposal to noop and keep its original state.
|
||||
func TestStepIgnoreConfig(t *testing.T) {
|
||||
// a raft that cannot make progress
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
|
||||
@ -1412,7 +1412,7 @@ func TestRecoverPendingConfig(t *testing.T) {
|
||||
{pb.EntryConfChange, true},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
|
||||
r.appendEntry(pb.Entry{Type: tt.entType})
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
@ -1431,7 +1431,7 @@ func TestRecoverDoublePendingConfig(t *testing.T) {
|
||||
t.Errorf("expect panic, but nothing happens")
|
||||
}
|
||||
}()
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
|
||||
r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
|
||||
r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
|
||||
r.becomeCandidate()
|
||||
@ -1441,7 +1441,7 @@ func TestRecoverDoublePendingConfig(t *testing.T) {
|
||||
|
||||
// TestAddNode tests that addNode could update pendingConf and nodes correctly.
|
||||
func TestAddNode(t *testing.T) {
|
||||
r := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
|
||||
r := newRaft(1, []uint64{1}, 10, 1, NewMemoryStorage(), 0)
|
||||
r.pendingConf = true
|
||||
r.addNode(2)
|
||||
if r.pendingConf != false {
|
||||
@ -1457,7 +1457,7 @@ func TestAddNode(t *testing.T) {
|
||||
// TestRemoveNode tests that removeNode could update pendingConf, nodes and
|
||||
// and removed list correctly.
|
||||
func TestRemoveNode(t *testing.T) {
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
|
||||
r.pendingConf = true
|
||||
r.removeNode(2)
|
||||
if r.pendingConf != false {
|
||||
@ -1481,7 +1481,7 @@ func TestPromotable(t *testing.T) {
|
||||
{[]uint64{2, 3}, false},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
r := newRaft(id, tt.peers, 5, 1, NewMemoryStorage())
|
||||
r := newRaft(id, tt.peers, 5, 1, NewMemoryStorage(), 0)
|
||||
if g := r.promotable(); g != tt.wp {
|
||||
t.Errorf("#%d: promotable = %v, want %v", i, g, tt.wp)
|
||||
}
|
||||
@ -1503,7 +1503,7 @@ func TestRaftNodes(t *testing.T) {
|
||||
},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
r := newRaft(1, tt.ids, 10, 1, NewMemoryStorage())
|
||||
r := newRaft(1, tt.ids, 10, 1, NewMemoryStorage(), 0)
|
||||
if !reflect.DeepEqual(r.nodes(), tt.wids) {
|
||||
t.Errorf("#%d: nodes = %+v, want %+v", i, r.nodes(), tt.wids)
|
||||
}
|
||||
@ -1515,7 +1515,7 @@ func ents(terms ...uint64) *raft {
|
||||
for i, term := range terms {
|
||||
storage.Append([]pb.Entry{{Index: uint64(i + 1), Term: term}})
|
||||
}
|
||||
sm := newRaft(1, []uint64{}, 5, 1, storage)
|
||||
sm := newRaft(1, []uint64{}, 5, 1, storage, 0)
|
||||
sm.reset(0)
|
||||
return sm
|
||||
}
|
||||
@ -1543,7 +1543,7 @@ func newNetwork(peers ...Interface) *network {
|
||||
switch v := p.(type) {
|
||||
case nil:
|
||||
nstorage[id] = NewMemoryStorage()
|
||||
sm := newRaft(id, peerAddrs, 10, 1, nstorage[id])
|
||||
sm := newRaft(id, peerAddrs, 10, 1, nstorage[id], 0)
|
||||
npeers[id] = sm
|
||||
case *raft:
|
||||
v.id = id
|
||||
|
Loading…
x
Reference in New Issue
Block a user