raftexample: Explicitly notify all committed entries are applied

This commit is contained in:
Shintaro Murakami 2021-02-18 21:51:46 +09:00
parent b67ed4e4aa
commit cc2b039817
3 changed files with 58 additions and 41 deletions

View File

@ -38,7 +38,7 @@ type kv struct {
Val string Val string
} }
func newKVStore(snapshotter *snap.Snapshotter, proposeC chan<- string, commitC <-chan *string, errorC <-chan error) *kvstore { func newKVStore(snapshotter *snap.Snapshotter, proposeC chan<- string, commitC <-chan *commit, errorC <-chan error) *kvstore {
s := &kvstore{proposeC: proposeC, kvStore: make(map[string]string), snapshotter: snapshotter} s := &kvstore{proposeC: proposeC, kvStore: make(map[string]string), snapshotter: snapshotter}
snapshot, err := s.loadSnapshot() snapshot, err := s.loadSnapshot()
if err != nil { if err != nil {
@ -70,9 +70,9 @@ func (s *kvstore) Propose(k string, v string) {
s.proposeC <- buf.String() s.proposeC <- buf.String()
} }
func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) { func (s *kvstore) readCommits(commitC <-chan *commit, errorC <-chan error) {
for data := range commitC { for commit := range commitC {
if data == nil { if commit == nil {
// signaled to load snapshot // signaled to load snapshot
snapshot, err := s.loadSnapshot() snapshot, err := s.loadSnapshot()
if err != nil { if err != nil {
@ -87,14 +87,17 @@ func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) {
continue continue
} }
var dataKv kv for _, data := range commit.data {
dec := gob.NewDecoder(bytes.NewBufferString(*data)) var dataKv kv
if err := dec.Decode(&dataKv); err != nil { dec := gob.NewDecoder(bytes.NewBufferString(data))
log.Fatalf("raftexample: could not decode message (%v)", err) if err := dec.Decode(&dataKv); err != nil {
log.Fatalf("raftexample: could not decode message (%v)", err)
}
s.mu.Lock()
s.kvStore[dataKv.Key] = dataKv.Val
s.mu.Unlock()
} }
s.mu.Lock() close(commit.applyDoneC)
s.kvStore[dataKv.Key] = dataKv.Val
s.mu.Unlock()
} }
if err, ok := <-errorC; ok { if err, ok := <-errorC; ok {
log.Fatal(err) log.Fatal(err)

View File

@ -37,11 +37,16 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
type commit struct {
data []string
applyDoneC chan<- struct{}
}
// A key-value stream backed by raft // A key-value stream backed by raft
type raftNode struct { type raftNode struct {
proposeC <-chan string // proposed messages (k,v) proposeC <-chan string // proposed messages (k,v)
confChangeC <-chan raftpb.ConfChange // proposed cluster config changes confChangeC <-chan raftpb.ConfChange // proposed cluster config changes
commitC chan<- *string // entries committed to log (k,v) commitC chan<- *commit // entries committed to log (k,v)
errorC chan<- error // errors from raft session errorC chan<- error // errors from raft session
id int // client ID for raft session id int // client ID for raft session
@ -80,9 +85,9 @@ var defaultSnapshotCount uint64 = 10000
// commit channel, followed by a nil message (to indicate the channel is // commit channel, followed by a nil message (to indicate the channel is
// current), then new log entries. To shutdown, close proposeC and read errorC. // current), then new log entries. To shutdown, close proposeC and read errorC.
func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string, func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,
confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error, <-chan *snap.Snapshotter) { confChangeC <-chan raftpb.ConfChange) (<-chan *commit, <-chan error, <-chan *snap.Snapshotter) {
commitC := make(chan *string) commitC := make(chan *commit)
errorC := make(chan error) errorC := make(chan error)
rc := &raftNode{ rc := &raftNode{
@ -143,7 +148,12 @@ func (rc *raftNode) entriesToApply(ents []raftpb.Entry) (nents []raftpb.Entry) {
// publishEntries writes committed log entries to commit channel and returns // publishEntries writes committed log entries to commit channel and returns
// whether all entries could be published. // whether all entries could be published.
func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool { func (rc *raftNode) publishEntries(ents []raftpb.Entry) (<-chan struct{}, bool) {
if len(ents) == 0 {
return nil, true
}
data := make([]string, 0, len(ents))
for i := range ents { for i := range ents {
switch ents[i].Type { switch ents[i].Type {
case raftpb.EntryNormal: case raftpb.EntryNormal:
@ -152,12 +162,7 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
break break
} }
s := string(ents[i].Data) s := string(ents[i].Data)
select { data = append(data, s)
case rc.commitC <- &s:
case <-rc.stopc:
return false
}
case raftpb.EntryConfChange: case raftpb.EntryConfChange:
var cc raftpb.ConfChange var cc raftpb.ConfChange
cc.Unmarshal(ents[i].Data) cc.Unmarshal(ents[i].Data)
@ -170,16 +175,28 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
case raftpb.ConfChangeRemoveNode: case raftpb.ConfChangeRemoveNode:
if cc.NodeID == uint64(rc.id) { if cc.NodeID == uint64(rc.id) {
log.Println("I've been removed from the cluster! Shutting down.") log.Println("I've been removed from the cluster! Shutting down.")
return false return nil, false
} }
rc.transport.RemovePeer(types.ID(cc.NodeID)) rc.transport.RemovePeer(types.ID(cc.NodeID))
} }
} }
// after commit, update appliedIndex
rc.appliedIndex = ents[i].Index
} }
return true
var applyDoneC chan struct{}
if len(data) > 0 {
applyDoneC := make(chan struct{}, 1)
select {
case rc.commitC <- &commit{data, applyDoneC}:
case <-rc.stopc:
return nil, false
}
}
// after commit, update appliedIndex
rc.appliedIndex = ents[len(ents)-1].Index
return applyDoneC, true
} }
func (rc *raftNode) loadSnapshot() *raftpb.Snapshot { func (rc *raftNode) loadSnapshot() *raftpb.Snapshot {
@ -346,18 +363,14 @@ func (rc *raftNode) publishSnapshot(snapshotToSave raftpb.Snapshot) {
var snapshotCatchUpEntriesN uint64 = 10000 var snapshotCatchUpEntriesN uint64 = 10000
func (rc *raftNode) maybeTriggerSnapshot() { func (rc *raftNode) maybeTriggerSnapshot(applyDoneC <-chan struct{}) {
if rc.appliedIndex-rc.snapshotIndex <= rc.snapCount { if rc.appliedIndex-rc.snapshotIndex <= rc.snapCount {
return return
} }
// wait until all committed entries are applied // wait until all committed entries are applied
// commitC is synchronous channel, so consumption of the message signals if applyDoneC != nil {
// full application of previous messages <-applyDoneC
select {
case rc.commitC <- nil:
case <-rc.stopc:
return
} }
log.Printf("start snapshot [applied index: %d | last snapshot index: %d]", rc.appliedIndex, rc.snapshotIndex) log.Printf("start snapshot [applied index: %d | last snapshot index: %d]", rc.appliedIndex, rc.snapshotIndex)
@ -443,11 +456,12 @@ func (rc *raftNode) serveChannels() {
} }
rc.raftStorage.Append(rd.Entries) rc.raftStorage.Append(rd.Entries)
rc.transport.Send(rd.Messages) rc.transport.Send(rd.Messages)
if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok { applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))
if !ok {
rc.stop() rc.stop()
return return
} }
rc.maybeTriggerSnapshot() rc.maybeTriggerSnapshot(applyDoneC)
rc.node.Advance() rc.node.Advance()
case err := <-rc.transport.ErrorC: case err := <-rc.transport.ErrorC:

View File

@ -29,7 +29,7 @@ import (
type cluster struct { type cluster struct {
peers []string peers []string
commitC []<-chan *string commitC []<-chan *commit
errorC []<-chan error errorC []<-chan error
proposeC []chan string proposeC []chan string
confChangeC []chan raftpb.ConfChange confChangeC []chan raftpb.ConfChange
@ -44,7 +44,7 @@ func newCluster(n int) *cluster {
clus := &cluster{ clus := &cluster{
peers: peers, peers: peers,
commitC: make([]<-chan *string, len(peers)), commitC: make([]<-chan *commit, len(peers)),
errorC: make([]<-chan error, len(peers)), errorC: make([]<-chan error, len(peers)),
proposeC: make([]chan string, len(peers)), proposeC: make([]chan string, len(peers)),
confChangeC: make([]chan raftpb.ConfChange, len(peers)), confChangeC: make([]chan raftpb.ConfChange, len(peers)),
@ -94,14 +94,14 @@ func TestProposeOnCommit(t *testing.T) {
donec := make(chan struct{}) donec := make(chan struct{})
for i := range clus.peers { for i := range clus.peers {
// feedback for "n" committed entries, then update donec // feedback for "n" committed entries, then update donec
go func(pC chan<- string, cC <-chan *string, eC <-chan error) { go func(pC chan<- string, cC <-chan *commit, eC <-chan error) {
for n := 0; n < 100; n++ { for n := 0; n < 100; n++ {
s, ok := <-cC c, ok := <-cC
if !ok { if !ok {
pC = nil pC = nil
} }
select { select {
case pC <- *s: case pC <- c.data[0]:
continue continue
case err := <-eC: case err := <-eC:
t.Errorf("eC message (%v)", err) t.Errorf("eC message (%v)", err)
@ -143,7 +143,7 @@ func TestCloseProposerInflight(t *testing.T) {
}() }()
// wait for one message // wait for one message
if c, ok := <-clus.commitC[0]; *c != "foo" || !ok { if c, ok := <-clus.commitC[0]; !ok || c.data[0] != "foo" {
t.Fatalf("Commit failed") t.Fatalf("Commit failed")
} }
} }