mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
raft: Remove redundant raft.Commit
field.
Keeping this field in sync with `raft.raftLog.committed` was error-prone, so instead we synthesize the `HardState` on demand. Fixes #4278.
This commit is contained in:
parent
8199147cf8
commit
22925a1d2f
@ -160,7 +160,6 @@ func StartNode(c *Config, peers []Peer) Node {
|
||||
// TODO(bdarnell): These entries are still unstable; do we need to preserve
|
||||
// the invariant that committed < unstable?
|
||||
r.raftLog.committed = r.raftLog.lastIndex()
|
||||
r.Commit = r.raftLog.committed
|
||||
// Now apply them, mainly so that the application can call Campaign
|
||||
// immediately after StartNode in tests. Note that these nodes will
|
||||
// be added to raft twice: here and when the application's Ready
|
||||
@ -453,8 +452,8 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
|
||||
if softSt := r.softState(); !softSt.equal(prevSoftSt) {
|
||||
rd.SoftState = softSt
|
||||
}
|
||||
if !isHardStateEqual(r.HardState, prevHardSt) {
|
||||
rd.HardState = r.HardState
|
||||
if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
|
||||
rd.HardState = hardSt
|
||||
}
|
||||
if r.raftLog.unstable.snapshot != nil {
|
||||
rd.Snapshot = *r.raftLog.unstable.snapshot
|
||||
|
30
raft/raft.go
30
raft/raft.go
@ -138,10 +138,11 @@ func (c *Config) validate() error {
|
||||
}
|
||||
|
||||
type raft struct {
|
||||
pb.HardState
|
||||
|
||||
id uint64
|
||||
|
||||
Term uint64
|
||||
Vote uint64
|
||||
|
||||
// the log
|
||||
raftLog *raftLog
|
||||
|
||||
@ -239,6 +240,14 @@ func (r *raft) hasLeader() bool { return r.lead != None }
|
||||
|
||||
func (r *raft) softState() *SoftState { return &SoftState{Lead: r.lead, RaftState: r.state} }
|
||||
|
||||
func (r *raft) hardState() pb.HardState {
|
||||
return pb.HardState{
|
||||
Term: r.Term,
|
||||
Vote: r.Vote,
|
||||
Commit: r.raftLog.committed,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *raft) quorum() int { return len(r.prs)/2 + 1 }
|
||||
|
||||
func (r *raft) nodes() []uint64 {
|
||||
@ -295,7 +304,7 @@ func (r *raft) sendAppend(to uint64) {
|
||||
m.Snapshot = snapshot
|
||||
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
|
||||
r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
|
||||
r.id, r.raftLog.firstIndex(), r.Commit, sindex, sterm, to, pr)
|
||||
r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
|
||||
pr.becomeSnapshot(sindex)
|
||||
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
|
||||
} else {
|
||||
@ -525,7 +534,6 @@ func (r *raft) Step(m pb.Message) error {
|
||||
if r.state != StateLeader {
|
||||
r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
|
||||
r.campaign()
|
||||
r.Commit = r.raftLog.committed
|
||||
} else {
|
||||
r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
|
||||
}
|
||||
@ -550,7 +558,6 @@ func (r *raft) Step(m pb.Message) error {
|
||||
return nil
|
||||
}
|
||||
r.step(r, m)
|
||||
r.Commit = r.raftLog.committed
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -742,8 +749,8 @@ func stepFollower(r *raft, m pb.Message) {
|
||||
}
|
||||
|
||||
func (r *raft) handleAppendEntries(m pb.Message) {
|
||||
if m.Index < r.Commit {
|
||||
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.Commit})
|
||||
if m.Index < r.raftLog.committed {
|
||||
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
|
||||
return
|
||||
}
|
||||
|
||||
@ -765,11 +772,11 @@ func (r *raft) handleSnapshot(m pb.Message) {
|
||||
sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
|
||||
if r.restore(m.Snapshot) {
|
||||
r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
|
||||
r.id, r.Commit, sindex, sterm)
|
||||
r.id, r.raftLog.committed, sindex, sterm)
|
||||
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
|
||||
} else {
|
||||
r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
|
||||
r.id, r.Commit, sindex, sterm)
|
||||
r.id, r.raftLog.committed, sindex, sterm)
|
||||
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
|
||||
}
|
||||
}
|
||||
@ -782,13 +789,13 @@ func (r *raft) restore(s pb.Snapshot) bool {
|
||||
}
|
||||
if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
|
||||
r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
|
||||
r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
|
||||
r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
|
||||
r.raftLog.commitTo(s.Metadata.Index)
|
||||
return false
|
||||
}
|
||||
|
||||
r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
|
||||
r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
|
||||
r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
|
||||
|
||||
r.raftLog.restore(s)
|
||||
r.prs = make(map[uint64]*Progress)
|
||||
@ -848,7 +855,6 @@ func (r *raft) loadState(state pb.HardState) {
|
||||
r.raftLog.committed = state.Commit
|
||||
r.Term = state.Term
|
||||
r.Vote = state.Vote
|
||||
r.Commit = state.Commit
|
||||
}
|
||||
|
||||
// isElectionTimeout returns true if r.elapsed is greater than the
|
||||
|
@ -957,8 +957,8 @@ func TestMsgAppRespWaitReset(t *testing.T) {
|
||||
Type: pb.MsgAppResp,
|
||||
Index: 1,
|
||||
})
|
||||
if sm.Commit != 1 {
|
||||
t.Fatalf("expected Commit to be 1, got %d", sm.Commit)
|
||||
if sm.raftLog.committed != 1 {
|
||||
t.Fatalf("expected committed to be 1, got %d", sm.raftLog.committed)
|
||||
}
|
||||
// Also consume the MsgApp messages that update Commit on the followers.
|
||||
sm.readMessages()
|
||||
@ -1046,7 +1046,7 @@ func TestRecvMsgVote(t *testing.T) {
|
||||
case StateLeader:
|
||||
sm.step = stepLeader
|
||||
}
|
||||
sm.HardState = pb.HardState{Vote: tt.voteFor}
|
||||
sm.Vote = tt.voteFor
|
||||
sm.raftLog = &raftLog{
|
||||
storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 2}, {Index: 2, Term: 2}}},
|
||||
unstable: unstable{offset: 3},
|
||||
|
@ -105,7 +105,7 @@ func NewRawNode(config *Config, peers []Peer) (*RawNode, error) {
|
||||
}
|
||||
// Set the initial hard and soft states after performing all initialization.
|
||||
rn.prevSoftSt = r.softState()
|
||||
rn.prevHardSt = r.HardState
|
||||
rn.prevHardSt = r.hardState()
|
||||
|
||||
return rn, nil
|
||||
}
|
||||
@ -191,7 +191,7 @@ func (rn *RawNode) HasReady() bool {
|
||||
if !r.softState().equal(rn.prevSoftSt) {
|
||||
return true
|
||||
}
|
||||
if !IsEmptyHardState(r.HardState) && !isHardStateEqual(r.HardState, rn.prevHardSt) {
|
||||
if hardSt := r.hardState(); !IsEmptyHardState(hardSt) && !isHardStateEqual(hardSt, rn.prevHardSt) {
|
||||
return true
|
||||
}
|
||||
if r.raftLog.unstable.snapshot != nil && !IsEmptySnap(*r.raftLog.unstable.snapshot) {
|
||||
|
@ -33,7 +33,7 @@ type Status struct {
|
||||
// getStatus gets a copy of the current raft status.
|
||||
func getStatus(r *raft) Status {
|
||||
s := Status{ID: r.id}
|
||||
s.HardState = r.HardState
|
||||
s.HardState = r.hardState()
|
||||
s.SoftState = *r.softState()
|
||||
|
||||
s.Applied = r.raftLog.applied
|
||||
|
Loading…
x
Reference in New Issue
Block a user