raft: postpone MsgReadIndex until first commit in the term

Fixes #12680
This commit is contained in:
wpedrak 2021-03-11 15:39:41 +01:00
parent 456e129422
commit 758ff0163c
3 changed files with 62 additions and 18 deletions

View File

@ -183,6 +183,8 @@ type Node interface {
// Read state has a read index. Once the application advances further than the read // Read state has a read index. Once the application advances further than the read
// index, any linearizable read requests issued before the read request can be // index, any linearizable read requests issued before the read request can be
// processed safely. The read state will have the same rctx attached. // processed safely. The read state will have the same rctx attached.
// Note that the request can be lost without notice, therefore it is the
// user's job to ensure read index retries.
ReadIndex(ctx context.Context, rctx []byte) error ReadIndex(ctx context.Context, rctx []byte) error
// Status returns the current status of the raft state machine. // Status returns the current status of the raft state machine.

View File

@ -307,6 +307,12 @@ type raft struct {
step stepFunc step stepFunc
logger Logger logger Logger
// pendingReadIndexMessages is used to store messages of type MsgReadIndex
// that can't be answered yet because the new leader has not committed any
// log entry in the current term. Those will be handled as soon as the
// first log entry is committed in the current term.
pendingReadIndexMessages []pb.Message
} }
func newRaft(c *Config) *raft { func newRaft(c *Config) *raft {
@ -1072,26 +1078,15 @@ func stepLeader(r *raft, m pb.Message) error {
return nil return nil
} }
// Reject read only request when this leader has not committed any log entry at its term. // Postpone read only request when this leader has not committed
// any log entry at its term.
if !r.committedEntryInCurrentTerm() { if !r.committedEntryInCurrentTerm() {
r.pendingReadIndexMessages = append(r.pendingReadIndexMessages, m)
return nil return nil
} }
// thinking: use an interally defined context instead of the user given context. sendMsgReadIndexResponse(r, m)
// We can express this in terms of the term and index instead of a user-supplied value.
// This would allow multiple reads to piggyback on the same message.
switch r.readOnly.option {
// If more than the local vote is needed, go through a full broadcast.
case ReadOnlySafe:
r.readOnly.addRequest(r.raftLog.committed, m)
// The local node automatically acks the request.
r.readOnly.recvAck(r.id, m.Entries[0].Data)
r.bcastHeartbeatWithCtx(m.Entries[0].Data)
case ReadOnlyLeaseBased:
if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
r.send(resp)
}
}
return nil return nil
} }
@ -1256,6 +1251,9 @@ func stepLeader(r *raft, m pb.Message) error {
} }
if r.maybeCommit() { if r.maybeCommit() {
// committed index has progressed for the term, so it is safe
// to respond to pending read index requests
releasePendingReadIndexMessages(r)
r.bcastAppend() r.bcastAppend()
} else if oldPaused { } else if oldPaused {
// If we were paused before, this node may be missing the // If we were paused before, this node may be missing the
@ -1805,3 +1803,35 @@ func numOfPendingConf(ents []pb.Entry) int {
} }
return n return n
} }
// releasePendingReadIndexMessages answers every MsgReadIndex message queued in
// r.pendingReadIndexMessages by passing each one to sendMsgReadIndexResponse,
// in the order the messages were queued.
//
// When nothing is queued the function returns immediately, skipping both the
// current-term check and its error log; there is nothing to release in that
// case, so nothing can be released too early.
//
// When messages are queued but r.committedEntryInCurrentTerm() reports false,
// an error is logged and the queue is left untouched.
func releasePendingReadIndexMessages(r *raft) {
	if len(r.pendingReadIndexMessages) == 0 {
		// Fast path: no postponed requests, so avoid the term check entirely.
		return
	}

	if !r.committedEntryInCurrentTerm() {
		r.logger.Error("pending MsgReadIndex should be released only after first commit in current term")
		return
	}

	// Detach the queue from r before handling any message, so the pending
	// list is already empty while the responses are being produced.
	msgs := r.pendingReadIndexMessages
	r.pendingReadIndexMessages = nil

	for _, m := range msgs {
		sendMsgReadIndexResponse(r, m)
	}
}
// sendMsgReadIndexResponse handles a single MsgReadIndex message m according
// to the configured r.readOnly.option.
//
// With ReadOnlySafe the request is registered against the current commit
// index, acknowledged on behalf of the local node, and a heartbeat carrying
// the request context (m.Entries[0].Data) is broadcast. With
// ReadOnlyLeaseBased a response is built from the current commit index and
// sent, unless its recipient is None. Any other option is a no-op.
func sendMsgReadIndexResponse(r *raft, m pb.Message) {
	// thinking: use an internally defined context instead of the user given context.
	// We can express this in terms of the term and index instead of a user-supplied value.
	// This would allow multiple reads to piggyback on the same message.
	option := r.readOnly.option

	if option == ReadOnlySafe {
		// More than the local vote is needed, so go through a full broadcast.
		r.readOnly.addRequest(r.raftLog.committed, m)

		requestCtx := m.Entries[0].Data
		// The local node acks its own request automatically.
		r.readOnly.recvAck(r.id, requestCtx)
		r.bcastHeartbeatWithCtx(requestCtx)
		return
	}

	if option == ReadOnlyLeaseBased {
		resp := r.responseToReadIndexReq(m, r.raftLog.committed)
		if resp.To == None {
			return
		}
		r.send(resp)
	}
}

View File

@ -2397,8 +2397,7 @@ func TestReadOnlyForNewLeader(t *testing.T) {
t.Fatalf("last log term = %d, want %d", lastLogTerm, sm.Term) t.Fatalf("last log term = %d, want %d", lastLogTerm, sm.Term)
} }
// Ensure peer a accepts read only request after it commits a entry at its term. // Ensure peer a processed postponed read only request after it committed an entry at its term.
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}})
if len(sm.readStates) != 1 { if len(sm.readStates) != 1 {
t.Fatalf("len(readStates) = %d, want 1", len(sm.readStates)) t.Fatalf("len(readStates) = %d, want 1", len(sm.readStates))
} }
@ -2409,6 +2408,19 @@ func TestReadOnlyForNewLeader(t *testing.T) {
if !bytes.Equal(rs.RequestCtx, wctx) { if !bytes.Equal(rs.RequestCtx, wctx) {
t.Fatalf("requestCtx = %v, want %v", rs.RequestCtx, wctx) t.Fatalf("requestCtx = %v, want %v", rs.RequestCtx, wctx)
} }
// Ensure peer a accepts read only request after it committed an entry at its term.
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}})
if len(sm.readStates) != 2 {
t.Fatalf("len(readStates) = %d, want 2", len(sm.readStates))
}
rs = sm.readStates[1]
if rs.Index != windex {
t.Fatalf("readIndex = %d, want %d", rs.Index, windex)
}
if !bytes.Equal(rs.RequestCtx, wctx) {
t.Fatalf("requestCtx = %v, want %v", rs.RequestCtx, wctx)
}
} }
func TestLeaderAppResp(t *testing.T) { func TestLeaderAppResp(t *testing.T) {