Remove raft.loadEnts and the ents parameter to raft.RestartNode.

The initial entries are now provided via the Storage interface.
This commit is contained in:
Ben Darnell 2014-11-12 18:31:19 -05:00
parent 147fd614ce
commit 54b07d7974
7 changed files with 21 additions and 32 deletions

View File

@ -54,7 +54,8 @@ func restartAsStandaloneNode(cfg *ServerConfig, index uint64, snapshot *raftpb.S
log.Printf("etcdserver: forcing restart of member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit) log.Printf("etcdserver: forcing restart of member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
s := raft.NewMemoryStorage() s := raft.NewMemoryStorage()
n := raft.RestartNode(uint64(id), 10, 1, snapshot, st, ents, s) s.Append(ents)
n := raft.RestartNode(uint64(id), 10, 1, snapshot, st, s)
return id, n, s, w return id, n, s, w
} }

View File

@ -773,7 +773,8 @@ func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (ty
log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit) log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
s := raft.NewMemoryStorage() s := raft.NewMemoryStorage()
n := raft.RestartNode(uint64(id), 10, 1, snapshot, st, ents, s) s.Append(ents)
n := raft.RestartNode(uint64(id), 10, 1, snapshot, st, s)
return id, n, s, w return id, n, s, w
} }

View File

@ -64,17 +64,6 @@ func newLog(storage Storage) *raftLog {
return log return log
} }
func (l *raftLog) load(ents []pb.Entry) {
// TODO(bdarnell): does this method need to support other Storage impls or does it go away?
ms := l.storage.(*MemoryStorage)
if ms.offset != ents[0].Index {
panic("entries loaded don't match offset index")
}
ms.ents = ents
l.unstable = ms.offset + uint64(len(ents))
l.unstableEnts = nil
}
func (l *raftLog) String() string { func (l *raftLog) String() string {
return fmt.Sprintf("unstable=%d committed=%d applied=%d", l.unstable, l.committed, l.applied) return fmt.Sprintf("unstable=%d committed=%d applied=%d", l.unstable, l.committed, l.applied)

View File

@ -168,7 +168,7 @@ func StartNode(id uint64, peers []Peer, election, heartbeat int, storage Storage
// log. // log.
// TODO(bdarnell): remove args that are unnecessary with storage. // TODO(bdarnell): remove args that are unnecessary with storage.
// Maybe this function goes away and is replaced by StartNode with a non-empty Storage. // Maybe this function goes away and is replaced by StartNode with a non-empty Storage.
func RestartNode(id uint64, election, heartbeat int, snapshot *pb.Snapshot, st pb.HardState, ents []pb.Entry, storage Storage) Node { func RestartNode(id uint64, election, heartbeat int, snapshot *pb.Snapshot, st pb.HardState, storage Storage) Node {
n := newNode() n := newNode()
r := newRaft(id, nil, election, heartbeat, storage) r := newRaft(id, nil, election, heartbeat, storage)
if snapshot != nil { if snapshot != nil {
@ -177,9 +177,6 @@ func RestartNode(id uint64, election, heartbeat int, snapshot *pb.Snapshot, st p
if !isHardStateEqual(st, emptyState) { if !isHardStateEqual(st, emptyState) {
r.loadState(st) r.loadState(st)
} }
if len(ents) != 0 {
r.loadEnts(ents)
}
go n.run(r) go n.run(r)
return &n return &n
} }

View File

@ -233,7 +233,7 @@ func TestNodeRestart(t *testing.T) {
storage := NewMemoryStorage() storage := NewMemoryStorage()
storage.Append(entries) storage.Append(entries)
n := RestartNode(1, 10, 1, nil, st, nil, storage) n := RestartNode(1, 10, 1, nil, st, storage)
if g := <-n.Ready(); !reflect.DeepEqual(g, want) { if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
t.Errorf("g = %+v,\n w %+v", g, want) t.Errorf("g = %+v,\n w %+v", g, want)
} else { } else {

View File

@ -549,10 +549,6 @@ func (r *raft) promotable() bool {
return ok return ok
} }
func (r *raft) loadEnts(ents []pb.Entry) {
r.raftLog.load(ents)
}
func (r *raft) loadState(state pb.HardState) { func (r *raft) loadState(state pb.HardState) {
r.raftLog.committed = state.Commit r.raftLog.committed = state.Commit
r.Term = state.Term r.Term = state.Term

View File

@ -605,8 +605,9 @@ func TestFollowerCheckMsgApp(t *testing.T) {
{3, 3, true}, {3, 3, true},
} }
for i, tt := range tests { for i, tt := range tests {
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) storage := NewMemoryStorage()
r.loadEnts(ents) storage.Append(ents)
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage)
r.loadState(pb.HardState{Commit: 2}) r.loadState(pb.HardState{Commit: 2})
r.becomeFollower(2, 2) r.becomeFollower(2, 2)
@ -729,11 +730,13 @@ func TestLeaderSyncFollowerLog(t *testing.T) {
}, },
} }
for i, tt := range tests { for i, tt := range tests {
lead := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) leadStorage := NewMemoryStorage()
lead.loadEnts(ents) leadStorage.Append(ents)
lead := newRaft(1, []uint64{1, 2, 3}, 10, 1, leadStorage)
lead.loadState(pb.HardState{Commit: lead.raftLog.lastIndex(), Term: term}) lead.loadState(pb.HardState{Commit: lead.raftLog.lastIndex(), Term: term})
follower := newRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) followerStorage := NewMemoryStorage()
follower.loadEnts(tt) followerStorage.Append(tt)
follower := newRaft(2, []uint64{1, 2, 3}, 10, 1, followerStorage)
follower.loadState(pb.HardState{Term: term - 1}) follower.loadState(pb.HardState{Term: term - 1})
// It is necessary to have a three-node cluster. // It is necessary to have a three-node cluster.
// The second may have more up-to-date log than the first one, so the // The second may have more up-to-date log than the first one, so the
@ -823,8 +826,9 @@ func TestVoter(t *testing.T) {
{[]pb.Entry{{}, {Term: 2, Index: 1}, {Term: 1, Index: 2}}, 1, 1, true}, {[]pb.Entry{{}, {Term: 2, Index: 1}, {Term: 1, Index: 2}}, 1, 1, true},
} }
for i, tt := range tests { for i, tt := range tests {
r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) storage := NewMemoryStorage()
r.loadEnts(tt.ents) storage.Append(tt.ents)
r := newRaft(1, []uint64{1, 2}, 10, 1, storage)
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgVote, Term: 3, LogTerm: tt.logterm, Index: tt.index}) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgVote, Term: 3, LogTerm: tt.logterm, Index: tt.index})
@ -858,8 +862,9 @@ func TestLeaderOnlyCommitsLogFromCurrentTerm(t *testing.T) {
{3, 3}, {3, 3},
} }
for i, tt := range tests { for i, tt := range tests {
r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) storage := NewMemoryStorage()
r.loadEnts(ents) storage.Append(ents)
r := newRaft(1, []uint64{1, 2}, 10, 1, storage)
r.loadState(pb.HardState{Term: 2}) r.loadState(pb.HardState{Term: 2})
// become leader at term 3 // become leader at term 3
r.becomeCandidate() r.becomeCandidate()