From 9d05a0d95971337f67a74a2ef342a9d780d1ec08 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 22 Dec 2015 03:42:56 -0800 Subject: [PATCH 1/2] etcdserver: apply v3 database updates outside server event loop raft's applyc writes block on the server loop's database IO since the next applyc read must wait on the db operation to finish. Instead, stream applyc to a run queue outside the server loop. --- etcdserver/server.go | 76 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 62 insertions(+), 14 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index a86b1f157..496cbb9fa 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -474,35 +474,72 @@ type etcdProgress struct { appliedi uint64 } +// newApplier buffers apply operations and streams their results over an +// etcdProgress output channel. This is so raftNode won't block on sending +// new applies, timing out (since applies can be slow). The goroutine begins +// shutdown on close(s.done) and closes the etcdProgress channel when finished. +func (s *EtcdServer) newApplier(ep etcdProgress) <-chan etcdProgress { + etcdprogc := make(chan etcdProgress) + go func() { + defer close(etcdprogc) + pending := []apply{} + sdonec := s.done + apdonec := make(chan struct{}) + // serialized function + f := func(ap apply) { + s.applyAll(&ep, &ap) + etcdprogc <- ep + apdonec <- struct{}{} + } + for sdonec != nil || len(pending) > 0 { + select { + // launch if no pending apply packet, queue up the rest + case ap := <-s.r.apply(): + pending = append(pending, ap) + if len(pending) == 1 { + go f(pending[0]) + } + // pending apply serviced, schedule the next one + case <-apdonec: + pending = pending[1:] + if len(pending) != 0 { + go f(pending[0]) + } + // run() is finished; drain pending and exit + case <-sdonec: + sdonec = nil + } + } + }() + return etcdprogc +} + func (s *EtcdServer) run() { snap, err := s.r.raftStorage.Snapshot() if err != nil { plog.Panicf("get snapshot from raft storage error: %v", err) } s.r.start(s) - defer func() { - s.r.stop() - close(s.done) - }() + // asynchronously accept apply packets, dispatch progress in-order ep := etcdProgress{ confState: snap.Metadata.ConfState, snapi: snap.Metadata.Index, appliedi: snap.Metadata.Index, } + etcdprogc := s.newApplier(ep) + + defer func() { + s.r.stop() + close(s.done) + for range etcdprogc { + /* wait for outstanding applys */ + } + }() for { select { - case apply := <-s.r.apply(): - s.applySnapshot(&ep, &apply) - s.applyEntries(&ep, &apply) - // wait for the raft routine to finish the disk writes before triggering a - // snapshot. or applied index might be greater than the last index in raft - // storage, since the raft routine might be slower than apply routine. - apply.done <- struct{}{} - - // trigger snapshot - s.triggerSnapshot(&ep) + case ep = <-etcdprogc: case m := <-s.msgSnapC: merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState) s.r.transport.SendSnapshot(merged) @@ -514,6 +551,17 @@ func (s *EtcdServer) run() { return } } + +} + +func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) { + s.applySnapshot(ep, apply) + s.applyEntries(ep, apply) + // wait for the raft routine to finish the disk writes before triggering a + // snapshot. or applied index might be greater than the last index in raft + // storage, since the raft routine might be slower than apply routine. + apply.done <- struct{}{} + s.triggerSnapshot(ep) } func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { From aca0c466ed9b72ac5779c13cde6301fa7816da23 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 22 Dec 2015 03:40:39 -0800 Subject: [PATCH 2/2] etcdserver: asynchronously notify applier when raft writes finish The raft loop would block on the applier's done channel after persisting the raft messages; the latency could cause dropped network messages. Instead, asynchronously notify the applier with a buffered channel when the raft writes complete. --- etcdserver/raft.go | 23 ++++++++++------------- etcdserver/server.go | 2 +- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 91d69ec91..3e1570fef 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -23,6 +23,7 @@ import ( "sync/atomic" "time" + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/types" @@ -31,8 +32,6 @@ import ( "github.com/coreos/etcd/rafthttp" "github.com/coreos/etcd/wal" "github.com/coreos/etcd/wal/walpb" - - "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" ) const ( @@ -76,13 +75,14 @@ type RaftTimer interface { Term() uint64 } -// apply contains entries, snapshot be applied. -// After applied all the items, the application needs -// to send notification to done chan. +// apply contains entries, snapshot to be applied. Once +// an apply is consumed, the entries will be persisted to +// to raft storage concurrently; the application must read +// raftDone before assuming the raft messages are stable. type apply struct { entries []raftpb.Entry snapshot raftpb.Snapshot - done chan struct{} + raftDone <-chan struct{} // rx {} after raft has persisted messages } type raftNode struct { @@ -134,6 +134,7 @@ func (r *raftNode) start(s *EtcdServer) { var syncC <-chan time.Time defer r.onStop() + for { select { case <-r.ticker: @@ -158,10 +159,11 @@ func (r *raftNode) start(s *EtcdServer) { } } + raftDone := make(chan struct{}, 1) ap := apply{ entries: rd.CommittedEntries, snapshot: rd.Snapshot, - done: make(chan struct{}), + raftDone: raftDone, } select { @@ -183,12 +185,7 @@ func (r *raftNode) start(s *EtcdServer) { r.raftStorage.Append(rd.Entries) r.s.send(rd.Messages) - - select { - case <-ap.done: - case <-r.stopped: - return - } + raftDone <- struct{}{} r.Advance() case <-syncC: r.s.sync(r.s.cfg.ReqTimeout()) diff --git a/etcdserver/server.go b/etcdserver/server.go index 496cbb9fa..23521e759 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -560,7 +560,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) { // wait for the raft routine to finish the disk writes before triggering a // snapshot. or applied index might be greater than the last index in raft // storage, since the raft routine might be slower than apply routine. - apply.done <- struct{}{} + <-apply.raftDone s.triggerSnapshot(ep) }