From aca0c466ed9b72ac5779c13cde6301fa7816da23 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 22 Dec 2015 03:40:39 -0800 Subject: [PATCH] etcdserver: asynchronously notify applier when raft writes finish The raft loop would block on the applier's done channel after persisting the raft messages; the latency could cause dropped network messages. Instead, asynchronously notify the applier with a buffered channel when the raft writes complete. --- etcdserver/raft.go | 23 ++++++++++------------- etcdserver/server.go | 2 +- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 91d69ec91..3e1570fef 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -23,6 +23,7 @@ import ( "sync/atomic" "time" + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/types" @@ -31,8 +32,6 @@ import ( "github.com/coreos/etcd/rafthttp" "github.com/coreos/etcd/wal" "github.com/coreos/etcd/wal/walpb" - - "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" ) const ( @@ -76,13 +75,14 @@ type RaftTimer interface { Term() uint64 } -// apply contains entries, snapshot be applied. -// After applied all the items, the application needs -// to send notification to done chan. +// apply contains entries, snapshot to be applied. Once +// an apply is consumed, the entries will be persisted to +// to raft storage concurrently; the application must read +// raftDone before assuming the raft messages are stable. type apply struct { entries []raftpb.Entry snapshot raftpb.Snapshot - done chan struct{} + raftDone <-chan struct{} // rx {} after raft has persisted messages } type raftNode struct { @@ -134,6 +134,7 @@ func (r *raftNode) start(s *EtcdServer) { var syncC <-chan time.Time defer r.onStop() + for { select { case <-r.ticker: @@ -158,10 +159,11 @@ func (r *raftNode) start(s *EtcdServer) { } } + raftDone := make(chan struct{}, 1) ap := apply{ entries: rd.CommittedEntries, snapshot: rd.Snapshot, - done: make(chan struct{}), + raftDone: raftDone, } select { @@ -183,12 +185,7 @@ func (r *raftNode) start(s *EtcdServer) { r.raftStorage.Append(rd.Entries) r.s.send(rd.Messages) - - select { - case <-ap.done: - case <-r.stopped: - return - } + raftDone <- struct{}{} r.Advance() case <-syncC: r.s.sync(r.s.cfg.ReqTimeout()) diff --git a/etcdserver/server.go b/etcdserver/server.go index 496cbb9fa..23521e759 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -560,7 +560,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) { // wait for the raft routine to finish the disk writes before triggering a // snapshot. or applied index might be greater than the last index in raft // storage, since the raft routine might be slower than apply routine. - apply.done <- struct{}{} + <-apply.raftDone s.triggerSnapshot(ep) }