diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 0531f7203..dd8da43f5 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -1357,9 +1357,10 @@ func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error { n.Record(testutil.Action{Name: "Step"}) return nil } -func (n *nodeRecorder) Status() raft.Status { return raft.Status{} } -func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil } -func (n *nodeRecorder) Advance() {} +func (n *nodeRecorder) Status() raft.Status { return raft.Status{} } +func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil } +func (n *nodeRecorder) ReadIndex(ctx context.Context, rctx []byte) error { return nil } +func (n *nodeRecorder) Advance() {} func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState { n.Record(testutil.Action{Name: "ApplyConfChange", Params: []interface{}{conf}}) return &raftpb.ConfState{} diff --git a/raft/node.go b/raft/node.go index 3f5f146f4..56b3bee19 100644 --- a/raft/node.go +++ b/raft/node.go @@ -157,6 +157,18 @@ type Node interface { // in snapshots. Will never return nil; it returns a pointer only // to match MemoryStorage.Compact. ApplyConfChange(cc pb.ConfChange) *pb.ConfState + + // ReadIndex requests a read state. The read state will be set in the ready. + // Read state has a read index. Once the application advances further than the read + // index, any linearizable read requests issued before the read request can be + // processed safely. The read state will have the same rctx attached. + // + // Note: the current implementation depends on the leader lease. If the clock drift is unbounded, + // the leader might keep the lease longer than it should (clock can move backward/pause without any bound). + // ReadIndex is not safe in that case. + // TODO: add clock drift bound into raft configuration. + ReadIndex(ctx context.Context, rctx []byte) error + // Status returns the current status of the raft state machine. 
Status() Status // ReportUnreachable reports the given node is not reachable for the last send. @@ -487,8 +499,8 @@ func (n *node) ReportSnapshot(id uint64, status SnapshotStatus) { } } -func (n *node) ReadIndex(ctx context.Context, id uint64, rctx []byte) error { - return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, From: id, Entries: []pb.Entry{{Data: rctx}}}) +func (n *node) ReadIndex(ctx context.Context, rctx []byte) error { + return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}}) } func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready { diff --git a/raft/node_test.go b/raft/node_test.go index ef27a4c4d..410cca98d 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -180,7 +180,7 @@ func TestNodeReadIndex(t *testing.T) { r.step = appendStep wrequestCtx = []byte("somedata2") - n.ReadIndex(context.TODO(), r.id, wrequestCtx) + n.ReadIndex(context.TODO(), wrequestCtx) n.Stop() if len(msgs) != 1 { diff --git a/raft/raft.go b/raft/raft.go index ab048b520..c4ba9e77e 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -681,8 +681,12 @@ func stepLeader(r *raft, m pb.Message) { if r.checkQuorum { ri = r.raftLog.committed } - - r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries}) + if m.From == None || m.From == r.id { // from local member + r.readState.Index = ri + r.readState.RequestCtx = m.Entries[0].Data + } else { + r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries}) + } return }