mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
raft: fix readindex
This commit is contained in:
parent
d3d954d659
commit
1c5754f02d
@ -1359,6 +1359,7 @@ func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error {
|
|||||||
}
|
}
|
||||||
func (n *nodeRecorder) Status() raft.Status { return raft.Status{} }
|
func (n *nodeRecorder) Status() raft.Status { return raft.Status{} }
|
||||||
func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil }
|
func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil }
|
||||||
|
func (n *nodeRecorder) ReadIndex(ctx context.Context, rctx []byte) error { return nil }
|
||||||
func (n *nodeRecorder) Advance() {}
|
func (n *nodeRecorder) Advance() {}
|
||||||
func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState {
|
func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState {
|
||||||
n.Record(testutil.Action{Name: "ApplyConfChange", Params: []interface{}{conf}})
|
n.Record(testutil.Action{Name: "ApplyConfChange", Params: []interface{}{conf}})
|
||||||
|
16
raft/node.go
16
raft/node.go
@ -157,6 +157,18 @@ type Node interface {
|
|||||||
// in snapshots. Will never return nil; it returns a pointer only
|
// in snapshots. Will never return nil; it returns a pointer only
|
||||||
// to match MemoryStorage.Compact.
|
// to match MemoryStorage.Compact.
|
||||||
ApplyConfChange(cc pb.ConfChange) *pb.ConfState
|
ApplyConfChange(cc pb.ConfChange) *pb.ConfState
|
||||||
|
|
||||||
|
// ReadIndex request a read state. The read state will be set in the ready.
|
||||||
|
// Read state has a read index. Once the application advances further than the read
|
||||||
|
// index, any linearizable read requests issued before the read request can be
|
||||||
|
// processed safely. The read state will have the same rctx attached.
|
||||||
|
//
|
||||||
|
// Note: the current implementation depends on the leader lease. If the clock drift is unbounded,
|
||||||
|
// leader might keep the lease longer than it should (clock can move backward/pause without any bound).
|
||||||
|
// ReadIndex is not safe in that case.
|
||||||
|
// TODO: add clock drift bound into raft configuration.
|
||||||
|
ReadIndex(ctx context.Context, rctx []byte) error
|
||||||
|
|
||||||
// Status returns the current status of the raft state machine.
|
// Status returns the current status of the raft state machine.
|
||||||
Status() Status
|
Status() Status
|
||||||
// ReportUnreachable reports the given node is not reachable for the last send.
|
// ReportUnreachable reports the given node is not reachable for the last send.
|
||||||
@ -487,8 +499,8 @@ func (n *node) ReportSnapshot(id uint64, status SnapshotStatus) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *node) ReadIndex(ctx context.Context, id uint64, rctx []byte) error {
|
func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {
|
||||||
return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, From: id, Entries: []pb.Entry{{Data: rctx}}})
|
return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
|
||||||
}
|
}
|
||||||
|
|
||||||
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
|
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
|
||||||
|
@ -180,7 +180,7 @@ func TestNodeReadIndex(t *testing.T) {
|
|||||||
|
|
||||||
r.step = appendStep
|
r.step = appendStep
|
||||||
wrequestCtx = []byte("somedata2")
|
wrequestCtx = []byte("somedata2")
|
||||||
n.ReadIndex(context.TODO(), r.id, wrequestCtx)
|
n.ReadIndex(context.TODO(), wrequestCtx)
|
||||||
n.Stop()
|
n.Stop()
|
||||||
|
|
||||||
if len(msgs) != 1 {
|
if len(msgs) != 1 {
|
||||||
|
@ -681,8 +681,12 @@ func stepLeader(r *raft, m pb.Message) {
|
|||||||
if r.checkQuorum {
|
if r.checkQuorum {
|
||||||
ri = r.raftLog.committed
|
ri = r.raftLog.committed
|
||||||
}
|
}
|
||||||
|
if m.From == None || m.From == r.id { // from local member
|
||||||
|
r.readState.Index = ri
|
||||||
|
r.readState.RequestCtx = m.Entries[0].Data
|
||||||
|
} else {
|
||||||
r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
|
r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user