Merge pull request #14258 from ahrtr/3.4_postphone_read_index
[3.4] raft: postpone MsgReadIndex until first commit in the term
This commit is contained in: de2e8ccc78
raft/node.go
@@ -181,6 +181,8 @@ type Node interface {
 	// Read state has a read index. Once the application advances further than the read
 	// index, any linearizable read requests issued before the read request can be
 	// processed safely. The read state will have the same rctx attached.
+	// Note that the request can be lost without notice, therefore it is the user's job
+	// to ensure read index retries.
 	ReadIndex(ctx context.Context, rctx []byte) error

 	// Status returns the current status of the raft state machine.
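The added comment makes retries the caller's responsibility. Below is a minimal sketch, not part of this diff, of what such a retry loop could look like against the 3.4 API; the `states` channel (fed by the application from Ready.ReadStates) and the fixed 500ms per-attempt timeout are illustrative assumptions.

	package example

	import (
		"bytes"
		"context"
		"encoding/binary"
		"time"

		"go.etcd.io/etcd/raft"
	)

	// linearizableReadIndex re-issues ReadIndex until the matching ReadState
	// arrives, since the request can be lost without notice (for example,
	// across a leader change). Hypothetical helper, not etcd code.
	func linearizableReadIndex(ctx context.Context, n raft.Node, states <-chan raft.ReadState) (uint64, error) {
		rctx := make([]byte, 8)
		binary.BigEndian.PutUint64(rctx, uint64(time.Now().UnixNano())) // unique request context

		for {
			if err := n.ReadIndex(ctx, rctx); err != nil {
				return 0, err
			}
			select {
			case rs := <-states:
				if bytes.Equal(rs.RequestCtx, rctx) {
					// The read is safe to serve once the applied index reaches rs.Index.
					return rs.Index, nil
				}
			case <-time.After(500 * time.Millisecond):
				// The request may have been dropped; issue it again.
			case <-ctx.Done():
				return 0, ctx.Err()
			}
		}
	}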
raft/raft.go
@@ -318,6 +318,12 @@ type raft struct {
 	step stepFunc

 	logger Logger
+
+	// pendingReadIndexMessages is used to store messages of type MsgReadIndex
+	// that cannot be answered because the new leader has not committed any log
+	// entry in the current term. They are handled as soon as the first log
+	// entry in the current term is committed.
+	pendingReadIndexMessages []pb.Message
 }

 func newRaft(c *Config) *raft {
@@ -1079,39 +1085,23 @@ func stepLeader(r *raft, m pb.Message) error {
 		r.bcastAppend()
 		return nil
 	case pb.MsgReadIndex:
-		// If more than the local vote is needed, go through a full broadcast,
-		// otherwise optimize.
-		if !r.prs.IsSingleton() {
-			if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
-				// Reject read only request when this leader has not committed any log entry at its term.
-				return nil
-			}
-
-			// thinking: use an internally defined context instead of the user given context.
-			// We can express this in terms of the term and index instead of a user-supplied value.
-			// This would allow multiple reads to piggyback on the same message.
-			switch r.readOnly.option {
-			case ReadOnlySafe:
-				r.readOnly.addRequest(r.raftLog.committed, m)
-				// The local node automatically acks the request.
-				r.readOnly.recvAck(r.id, m.Entries[0].Data)
-				r.bcastHeartbeatWithCtx(m.Entries[0].Data)
-			case ReadOnlyLeaseBased:
-				ri := r.raftLog.committed
-				if m.From == None || m.From == r.id { // from local member
-					r.readStates = append(r.readStates, ReadState{Index: ri, RequestCtx: m.Entries[0].Data})
-				} else {
-					r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
-				}
-			}
-		} else { // only one voting member (the leader) in the cluster
-			if m.From == None || m.From == r.id { // from leader itself
-				r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
-			} else { // from learner member
-				r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: r.raftLog.committed, Entries: m.Entries})
-			}
-		}
-
-		return nil
+		// only one voting member (the leader) in the cluster
+		if r.prs.IsSingleton() {
+			if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
+				r.send(resp)
+			}
+			return nil
+		}
+
+		// Postpone read only request when this leader has not committed
+		// any log entry at its term.
+		if !r.committedEntryInCurrentTerm() {
+			r.pendingReadIndexMessages = append(r.pendingReadIndexMessages, m)
+			return nil
+		}
+
+		sendMsgReadIndexResponse(r, m)
+
+		return nil
 	}
@@ -1158,6 +1148,9 @@ func stepLeader(r *raft, m pb.Message) error {
 		}

 		if r.maybeCommit() {
+			// committed index has progressed for the term, so it is safe
+			// to respond to pending read index requests
+			releasePendingReadIndexMessages(r)
 			r.bcastAppend()
 		} else if oldPaused {
 			// If we were paused before, this node may be missing the
@@ -1602,6 +1595,29 @@ func (r *raft) abortLeaderTransfer() {
 	r.leadTransferee = None
 }

+// committedEntryInCurrentTerm returns true if the peer has committed an entry in its term.
+func (r *raft) committedEntryInCurrentTerm() bool {
+	return r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) == r.Term
+}
+
+// responseToReadIndexReq constructs a response for `req`. If `req` comes from the peer
+// itself, a blank value will be returned.
+func (r *raft) responseToReadIndexReq(req pb.Message, readIndex uint64) pb.Message {
+	if req.From == None || req.From == r.id {
+		r.readStates = append(r.readStates, ReadState{
+			Index:      readIndex,
+			RequestCtx: req.Entries[0].Data,
+		})
+		return pb.Message{}
+	}
+	return pb.Message{
+		Type:    pb.MsgReadIndexResp,
+		To:      req.From,
+		Index:   readIndex,
+		Entries: req.Entries,
+	}
+}
+
 // increaseUncommittedSize computes the size of the proposed entries and
 // determines whether they would push leader over its maxUncommittedSize limit.
 // If the new entries would exceed the limit, the method returns false. If not,
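Why the term comparison in committedEntryInCurrentTerm is the right guard: a Raft leader never counts replicas of entries from earlier terms toward commitment (Raft §5.4.2), so until the empty entry appended at election time commits, the term at the committed index still names a previous term. A toy illustration with hand-picked numbers, using plain integers rather than etcd types:

	package main

	import "fmt"

	// Toy model of the guard, not etcd code: just the comparison it performs.
	func main() {
		currentTerm := uint64(5)     // term of the freshly elected leader
		termOfCommitted := uint64(4) // committed index still points at a term-4 entry

		fmt.Println(termOfCommitted == currentTerm) // false: MsgReadIndex is postponed

		// The empty entry the leader appended on election eventually commits.
		termOfCommitted = 5
		fmt.Println(termOfCommitted == currentTerm) // true: pending reads are released
	}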
@@ -1654,3 +1670,35 @@ func numOfPendingConf(ents []pb.Entry) int {
 	}
 	return n
 }
+
+func releasePendingReadIndexMessages(r *raft) {
+	if !r.committedEntryInCurrentTerm() {
+		r.logger.Error("pending MsgReadIndex should be released only after first commit in current term")
+		return
+	}
+
+	msgs := r.pendingReadIndexMessages
+	r.pendingReadIndexMessages = nil
+
+	for _, m := range msgs {
+		sendMsgReadIndexResponse(r, m)
+	}
+}
+
+func sendMsgReadIndexResponse(r *raft, m pb.Message) {
+	// thinking: use an internally defined context instead of the user given context.
+	// We can express this in terms of the term and index instead of a user-supplied value.
+	// This would allow multiple reads to piggyback on the same message.
+	switch r.readOnly.option {
+	// If more than the local vote is needed, go through a full broadcast.
+	case ReadOnlySafe:
+		r.readOnly.addRequest(r.raftLog.committed, m)
+		// The local node automatically acks the request.
+		r.readOnly.recvAck(r.id, m.Entries[0].Data)
+		r.bcastHeartbeatWithCtx(m.Entries[0].Data)
+	case ReadOnlyLeaseBased:
+		if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
+			r.send(resp)
+		}
+	}
+}
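For context, sendMsgReadIndexResponse branches on the read-only mode the application configured. A sketch of where that option is set; the field names are the raft.Config fields in the 3.4 tree, while the values are illustrative only:

	package example

	import "go.etcd.io/etcd/raft"

	// Illustrative configuration; r.readOnly.option inside the state
	// machine reflects the ReadOnlyOption chosen here.
	var cfg = &raft.Config{
		ID:              1,
		ElectionTick:    10,
		HeartbeatTick:   1,
		Storage:         raft.NewMemoryStorage(),
		MaxSizePerMsg:   1024 * 1024,
		MaxInflightMsgs: 256,

		// ReadOnlySafe answers reads via a quorum heartbeat round trip;
		// ReadOnlyLeaseBased trusts the leader lease and requires CheckQuorum.
		ReadOnlyOption: raft.ReadOnlySafe,
	}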
raft/raft_test.go
@@ -2384,8 +2384,7 @@ func TestReadOnlyForNewLeader(t *testing.T) {
 		t.Fatalf("last log term = %d, want %d", lastLogTerm, sm.Term)
 	}

-	// Ensure peer a accepts read only request after it commits a entry at its term.
-	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}})
+	// Ensure peer a processed postponed read only request after it committed an entry at its term.
 	if len(sm.readStates) != 1 {
 		t.Fatalf("len(readStates) = %d, want 1", len(sm.readStates))
 	}
@@ -2396,6 +2395,19 @@ func TestReadOnlyForNewLeader(t *testing.T) {
 	if !bytes.Equal(rs.RequestCtx, wctx) {
 		t.Fatalf("requestCtx = %v, want %v", rs.RequestCtx, wctx)
 	}
+
+	// Ensure peer a accepts read only request after it committed an entry at its term.
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}})
+	if len(sm.readStates) != 2 {
+		t.Fatalf("len(readStates) = %d, want 2", len(sm.readStates))
+	}
+	rs = sm.readStates[1]
+	if rs.Index != windex {
+		t.Fatalf("readIndex = %d, want %d", rs.Index, windex)
+	}
+	if !bytes.Equal(rs.RequestCtx, wctx) {
+		t.Fatalf("requestCtx = %v, want %v", rs.RequestCtx, wctx)
+	}
 }

 func TestLeaderAppResp(t *testing.T) {
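The updated test can be exercised on its own with the standard Go test runner from the repository root of the 3.4 tree:

	go test ./raft -run TestReadOnlyForNewLeader -v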