mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
raft: cleanup read index code (#11528)
Signed-off-by: qupeng <qupeng@pingcap.com>
This commit is contained in:
parent
3f4f7c5a6c
commit
6f850a65a1
59
raft/raft.go
59
raft/raft.go
@ -1078,11 +1078,16 @@ func stepLeader(r *raft, m pb.Message) error {
|
|||||||
r.bcastAppend()
|
r.bcastAppend()
|
||||||
return nil
|
return nil
|
||||||
case pb.MsgReadIndex:
|
case pb.MsgReadIndex:
|
||||||
// If more than the local vote is needed, go through a full broadcast,
|
// only one voting member (the leader) in the cluster
|
||||||
// otherwise optimize.
|
if r.prs.IsSingleton() {
|
||||||
if !r.prs.IsSingleton() {
|
if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
|
||||||
if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
|
r.send(resp)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Reject read only request when this leader has not committed any log entry at its term.
|
// Reject read only request when this leader has not committed any log entry at its term.
|
||||||
|
if !r.committedEntryInCurrentTerm() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1090,27 +1095,17 @@ func stepLeader(r *raft, m pb.Message) error {
|
|||||||
// We can express this in terms of the term and index instead of a user-supplied value.
|
// We can express this in terms of the term and index instead of a user-supplied value.
|
||||||
// This would allow multiple reads to piggyback on the same message.
|
// This would allow multiple reads to piggyback on the same message.
|
||||||
switch r.readOnly.option {
|
switch r.readOnly.option {
|
||||||
|
// If more than the local vote is needed, go through a full broadcast.
|
||||||
case ReadOnlySafe:
|
case ReadOnlySafe:
|
||||||
r.readOnly.addRequest(r.raftLog.committed, m)
|
r.readOnly.addRequest(r.raftLog.committed, m)
|
||||||
// The local node automatically acks the request.
|
// The local node automatically acks the request.
|
||||||
r.readOnly.recvAck(r.id, m.Entries[0].Data)
|
r.readOnly.recvAck(r.id, m.Entries[0].Data)
|
||||||
r.bcastHeartbeatWithCtx(m.Entries[0].Data)
|
r.bcastHeartbeatWithCtx(m.Entries[0].Data)
|
||||||
case ReadOnlyLeaseBased:
|
case ReadOnlyLeaseBased:
|
||||||
ri := r.raftLog.committed
|
if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
|
||||||
if m.From == None || m.From == r.id { // from local member
|
r.send(resp)
|
||||||
r.readStates = append(r.readStates, ReadState{Index: ri, RequestCtx: m.Entries[0].Data})
|
|
||||||
} else {
|
|
||||||
r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else { // only one voting member (the leader) in the cluster
|
|
||||||
if m.From == None || m.From == r.id { // from leader itself
|
|
||||||
r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
|
|
||||||
} else { // from learner member
|
|
||||||
r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: r.raftLog.committed, Entries: m.Entries})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1200,11 +1195,8 @@ func stepLeader(r *raft, m pb.Message) error {
|
|||||||
|
|
||||||
rss := r.readOnly.advance(m)
|
rss := r.readOnly.advance(m)
|
||||||
for _, rs := range rss {
|
for _, rs := range rss {
|
||||||
req := rs.req
|
if resp := r.responseToReadIndexReq(rs.req, rs.index); resp.To != None {
|
||||||
if req.From == None || req.From == r.id { // from local member
|
r.send(resp)
|
||||||
r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
|
|
||||||
} else {
|
|
||||||
r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case pb.MsgSnapStatus:
|
case pb.MsgSnapStatus:
|
||||||
@ -1601,6 +1593,29 @@ func (r *raft) abortLeaderTransfer() {
|
|||||||
r.leadTransferee = None
|
r.leadTransferee = None
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// committedEntryInCurrentTerm return true if the peer has committed an entry in its term.
|
||||||
|
func (r *raft) committedEntryInCurrentTerm() bool {
|
||||||
|
return r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) == r.Term
|
||||||
|
}
|
||||||
|
|
||||||
|
// responseToReadIndexReq constructs a response for `req`. If `req` comes from the peer
|
||||||
|
// itself, a blank value will be returned.
|
||||||
|
func (r *raft) responseToReadIndexReq(req pb.Message, readIndex uint64) pb.Message {
|
||||||
|
if req.From == None || req.From == r.id {
|
||||||
|
r.readStates = append(r.readStates, ReadState{
|
||||||
|
Index: readIndex,
|
||||||
|
RequestCtx: req.Entries[0].Data,
|
||||||
|
})
|
||||||
|
return pb.Message{}
|
||||||
|
}
|
||||||
|
return pb.Message{
|
||||||
|
Type: pb.MsgReadIndexResp,
|
||||||
|
To: req.From,
|
||||||
|
Index: readIndex,
|
||||||
|
Entries: req.Entries,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// increaseUncommittedSize computes the size of the proposed entries and
|
// increaseUncommittedSize computes the size of the proposed entries and
|
||||||
// determines whether they would push leader over its maxUncommittedSize limit.
|
// determines whether they would push leader over its maxUncommittedSize limit.
|
||||||
// If the new entries would exceed the limit, the method returns false. If not,
|
// If the new entries would exceed the limit, the method returns false. If not,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user