mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
raft: add read index for RawNode
This commit is contained in:
parent
cfe717e926
commit
eeca614cd3
@ -66,6 +66,9 @@ func (rn *RawNode) commitReady(rd Ready) {
|
||||
if !IsEmptySnap(rd.Snapshot) {
|
||||
rn.raft.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
|
||||
}
|
||||
if len(rd.ReadStates) != 0 {
|
||||
rn.raft.readStates = nil
|
||||
}
|
||||
}
|
||||
|
||||
// NewRawNode returns a new RawNode given configuration and a list of raft peers.
|
||||
@ -205,6 +208,9 @@ func (rn *RawNode) HasReady() bool {
|
||||
if len(r.msgs) > 0 || len(r.raftLog.unstableEntries()) > 0 || r.raftLog.hasNextEnts() {
|
||||
return true
|
||||
}
|
||||
if len(r.readStates) != 0 {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@ -236,3 +242,11 @@ func (rn *RawNode) ReportSnapshot(id uint64, status SnapshotStatus) {
|
||||
func (rn *RawNode) TransferLeader(transferee uint64) {
|
||||
_ = rn.raft.Step(pb.Message{Type: pb.MsgTransferLeader, From: transferee})
|
||||
}
|
||||
|
||||
// ReadIndex requests a read state. The read state will be set in ready.
|
||||
// Read State has a read index. Once the application advances further than the read
|
||||
// index, any linearizable read requests issued before the read request can be
|
||||
// processed safely. The read state will have the same rctx attached.
|
||||
func (rn *RawNode) ReadIndex(rctx []byte) {
|
||||
_ = rn.raft.Step(pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
|
||||
}
|
||||
|
@ -110,6 +110,66 @@ func TestRawNodeProposeAndConfChange(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestRawNodeReadIndex ensures that Rawnode.ReadIndex sends the MsgReadIndex message
|
||||
// to the underlying raft. It also ensures that ReadState can be read out.
|
||||
func TestRawNodeReadIndex(t *testing.T) {
|
||||
msgs := []raftpb.Message{}
|
||||
appendStep := func(r *raft, m raftpb.Message) {
|
||||
msgs = append(msgs, m)
|
||||
}
|
||||
wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}
|
||||
|
||||
s := NewMemoryStorage()
|
||||
c := newTestConfig(1, nil, 10, 1, s)
|
||||
rawNode, err := NewRawNode(c, []Peer{{ID: 1}})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
rawNode.raft.readStates = wrs
|
||||
// ensure the ReadStates can be read out
|
||||
hasReady := rawNode.HasReady()
|
||||
if hasReady != true {
|
||||
t.Errorf("HasReady() returns %t, want %t", hasReady, true)
|
||||
}
|
||||
rd := rawNode.Ready()
|
||||
if !reflect.DeepEqual(rd.ReadStates, wrs) {
|
||||
t.Errorf("ReadStates = %d, want %d", rd.ReadStates, wrs)
|
||||
}
|
||||
s.Append(rd.Entries)
|
||||
rawNode.Advance(rd)
|
||||
// ensure raft.readStates is reset after advance
|
||||
if rawNode.raft.readStates != nil {
|
||||
t.Errorf("readStates = %v, want %v", rawNode.raft.readStates, nil)
|
||||
}
|
||||
|
||||
wrequestCtx := []byte("somedata2")
|
||||
rawNode.Campaign()
|
||||
for {
|
||||
rd = rawNode.Ready()
|
||||
s.Append(rd.Entries)
|
||||
|
||||
if rd.SoftState.Lead == rawNode.raft.id {
|
||||
rawNode.Advance(rd)
|
||||
|
||||
// Once we are the leader, issue a ReadIndex request
|
||||
rawNode.raft.step = appendStep
|
||||
rawNode.ReadIndex(wrequestCtx)
|
||||
break
|
||||
}
|
||||
rawNode.Advance(rd)
|
||||
}
|
||||
// ensure that MsgReadIndex message is sent to the underlying raft
|
||||
if len(msgs) != 1 {
|
||||
t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
|
||||
}
|
||||
if msgs[0].Type != raftpb.MsgReadIndex {
|
||||
t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgReadIndex)
|
||||
}
|
||||
if !bytes.Equal(msgs[0].Entries[0].Data, wrequestCtx) {
|
||||
t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, wrequestCtx)
|
||||
}
|
||||
}
|
||||
|
||||
// TestBlockProposal from node_test.go has no equivalent in rawNode because there is
|
||||
// no leader check in RawNode.
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user