From 0faae33aced16ab595237423058ef3de78496277 Mon Sep 17 00:00:00 2001 From: "swingbach@gmail.com" Date: Fri, 3 Jun 2016 23:20:10 +0800 Subject: [PATCH] raft: implemented read-only query when quorum check is on --- raft/node.go | 22 +++++++++++++- raft/node_test.go | 52 +++++++++++++++++++++++++++++++ raft/raft.go | 36 ++++++++++++++++++++++ raft/raft_test.go | 69 ++++++++++++++++++++++++++++++++++++++++++ raft/raftpb/raft.pb.go | 6 ++++ raft/raftpb/raft.proto | 2 ++ 6 files changed, 186 insertions(+), 1 deletion(-) diff --git a/raft/node.go b/raft/node.go index 0ddcfd5f6..b70732818 100644 --- a/raft/node.go +++ b/raft/node.go @@ -60,6 +60,12 @@ type Ready struct { // HardState will be equal to empty state if there is no update. pb.HardState + // ReadState can be used for node to serve linearizable read requests locally + // when its applied index is greater than the index in ReadState. + // Note that the readState will be returned when raft receives msgReadIndex. + // The returned is only valid for the request that requested to read. + ReadState + // Entries specifies entries to be saved to stable storage BEFORE // Messages are sent. Entries []pb.Entry @@ -96,7 +102,7 @@ func IsEmptySnap(sp pb.Snapshot) bool { func (rd Ready) containsUpdates() bool { return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) || !IsEmptySnap(rd.Snapshot) || len(rd.Entries) > 0 || - len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 + len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || rd.Index != None } // Node represents a node in a raft cluster. @@ -354,7 +360,10 @@ func (n *node) run(r *raft) { if !IsEmptySnap(rd.Snapshot) { prevSnapi = rd.Snapshot.Metadata.Index } + r.msgs = nil + r.readState.Index = None + r.readState.RequestCtx = nil advancec = n.advancec case <-advancec: if prevHardSt.Commit != 0 { @@ -469,6 +478,10 @@ func (n *node) ReportSnapshot(id uint64, status SnapshotStatus) { } } +func (n *node) ReadIndex(ctx context.Context, id uint64, rctx []byte) error { + return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, From: id, Entries: []pb.Entry{{Data: rctx}}}) +} + func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready { rd := Ready{ Entries: r.raftLog.unstableEntries(), @@ -484,5 +497,12 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready { if r.raftLog.unstable.snapshot != nil { rd.Snapshot = *r.raftLog.unstable.snapshot } + if r.readState.Index != None { + c := make([]byte, len(r.readState.RequestCtx)) + copy(c, r.readState.RequestCtx) + + rd.Index = r.readState.Index + rd.RequestCtx = c + } return rd } diff --git a/raft/node_test.go b/raft/node_test.go index d24b3c707..a88e3ff46 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -142,6 +142,58 @@ func TestNodePropose(t *testing.T) { } } +// TestNodeReadIndex ensures that node.ReadIndex sends the MsgReadIndex message to the underlying raft. +// It also ensures that ReadState can be read out through ready chan. +func TestNodeReadIndex(t *testing.T) { + msgs := []raftpb.Message{} + appendStep := func(r *raft, m raftpb.Message) { + msgs = append(msgs, m) + } + wreadIndex := uint64(1) + wrequestCtx := []byte("somedata") + + n := newNode() + s := NewMemoryStorage() + r := newTestRaft(1, []uint64{1}, 10, 1, s) + r.readState.Index = wreadIndex + r.readState.RequestCtx = wrequestCtx + go n.run(r) + n.Campaign(context.TODO()) + for { + rd := <-n.Ready() + if rd.Index != wreadIndex { + t.Errorf("ReadIndex = %d, want %d", rd.Index, wreadIndex) + } + + if !reflect.DeepEqual(rd.RequestCtx, wrequestCtx) { + t.Errorf("RequestCtx = %v, want %v", rd.RequestCtx, wrequestCtx) + } + + s.Append(rd.Entries) + + if rd.SoftState.Lead == r.id { + n.Advance() + break + } + n.Advance() + } + + r.step = appendStep + wrequestCtx = []byte("somedata2") + n.ReadIndex(context.TODO(), r.id, wrequestCtx) + n.Stop() + + if len(msgs) != 1 { + t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1) + } + if msgs[0].Type != raftpb.MsgReadIndex { + t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgReadIndex) + } + if !reflect.DeepEqual(msgs[0].Entries[0].Data, wrequestCtx) { + t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, wrequestCtx) + } +} + // TestNodeProposeConfig ensures that node.ProposeConfChange sends the given configuration proposal // to the underlying raft. func TestNodeProposeConfig(t *testing.T) { diff --git a/raft/raft.go b/raft/raft.go index 49067bd71..40b45a2af 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -133,12 +133,24 @@ func (c *Config) validate() error { return nil } +// ReadState provides state for read only query. +// It's caller's responsibility to send MsgReadIndex first before getting +// this state from ready, It's also caller's duty to differentiate if this +// state is what it requests through RequestCtx, eg. given a unique id as +// RequestCtx +type ReadState struct { + Index uint64 + RequestCtx []byte +} + type raft struct { id uint64 Term uint64 Vote uint64 + readState ReadState + // the log raftLog *raftLog @@ -208,6 +220,7 @@ func newRaft(c *Config) *raft { r := &raft{ id: c.ID, lead: None, + readState: ReadState{Index: None, RequestCtx: nil}, raftLog: raftlog, maxMsgSize: c.MaxSizePerMsg, maxInflight: c.MaxInflightMsgs, @@ -642,6 +655,14 @@ func stepLeader(r *raft, m pb.Message) { r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term) r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true}) return + case pb.MsgReadIndex: + ri := None + if r.checkQuorum { + ri = r.raftLog.committed + } + + r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries}) + return } // All other message types require a progress for m.From (pr). @@ -822,6 +843,21 @@ func stepFollower(r *raft, m pb.Message) { case pb.MsgTimeoutNow: r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From) r.campaign() + case pb.MsgReadIndex: + if r.lead == None { + r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term) + return + } + m.To = r.lead + r.send(m) + case pb.MsgReadIndexResp: + if len(m.Entries) != 1 { + r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries)) + return + } + + r.readState.Index = m.Index + r.readState.RequestCtx = m.Entries[0].Data } } diff --git a/raft/raft_test.go b/raft/raft_test.go index f65fbc365..8318f6f78 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -1405,6 +1405,75 @@ func TestNonPromotableVoterWithCheckQuorum(t *testing.T) { } } +func TestReadIndexWithCheckQuorum(t *testing.T) { + a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + + a.checkQuorum = true + b.checkQuorum = true + c.checkQuorum = true + + nt := newNetwork(a, b, c) + for i := 0; i < b.electionTimeout; i++ { + b.tick() + } + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) + + if a.state != StateLeader { + t.Fatalf("state = %s, want %s", a.state, StateLeader) + } + + tests := []struct { + sm *raft + proposals int + wri uint64 + wctx []byte + }{ + {b, 10, 11, []byte("ctx1")}, + {c, 10, 21, []byte("ctx2")}, + {b, 10, 31, []byte("ctx3")}, + {c, 10, 41, []byte("ctx4")}, + } + + for _, tt := range tests { + for j := 0; j < tt.proposals; j++ { + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) + } + + nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}}) + + r := tt.sm + if r.readState.Index != tt.wri { + t.Errorf("readIndex = %d, want %d", r.readState.Index, tt.wri) + } + + if !bytes.Equal(r.readState.RequestCtx, tt.wctx) { + t.Errorf("requestCtx = %v, want %v", r.readState.RequestCtx, tt.wctx) + } + } +} + +func TestReadIndexWithoutCheckQuorum(t *testing.T) { + a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) + + nt := newNetwork(a, b, c) + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) + + ctx := []byte("ctx1") + nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: ctx}}}) + + if b.readState.Index != None { + t.Errorf("readIndex = %d, want %d", b.readState.Index, None) + } + + if !bytes.Equal(b.readState.RequestCtx, ctx) { + t.Errorf("requestCtx = %v, want %v", b.readState.RequestCtx, ctx) + } +} + func TestLeaderAppResp(t *testing.T) { // initial progress: match = 0; next = 3 tests := []struct { diff --git a/raft/raftpb/raft.pb.go b/raft/raftpb/raft.pb.go index ca5d3f8db..479a1c683 100644 --- a/raft/raftpb/raft.pb.go +++ b/raft/raftpb/raft.pb.go @@ -90,6 +90,8 @@ const ( MsgCheckQuorum MessageType = 12 MsgTransferLeader MessageType = 13 MsgTimeoutNow MessageType = 14 + MsgReadIndex MessageType = 15 + MsgReadIndexResp MessageType = 16 ) var MessageType_name = map[int32]string{ @@ -108,6 +110,8 @@ var MessageType_name = map[int32]string{ 12: "MsgCheckQuorum", 13: "MsgTransferLeader", 14: "MsgTimeoutNow", + 15: "MsgReadIndex", + 16: "MsgReadIndexResp", } var MessageType_value = map[string]int32{ "MsgHup": 0, @@ -125,6 +129,8 @@ var MessageType_value = map[string]int32{ "MsgCheckQuorum": 12, "MsgTransferLeader": 13, "MsgTimeoutNow": 14, + "MsgReadIndex": 15, + "MsgReadIndexResp": 16, } func (x MessageType) Enum() *MessageType { diff --git a/raft/raftpb/raft.proto b/raft/raftpb/raft.proto index 42f10d269..1948fc1e4 100644 --- a/raft/raftpb/raft.proto +++ b/raft/raftpb/raft.proto @@ -48,6 +48,8 @@ enum MessageType { MsgCheckQuorum = 12; MsgTransferLeader = 13; MsgTimeoutNow = 14; + MsgReadIndex = 15; + MsgReadIndexResp = 16; } message Message {