From 9d05a0d95971337f67a74a2ef342a9d780d1ec08 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 22 Dec 2015 03:42:56 -0800 Subject: [PATCH] etcdserver: apply v3 database updates outside server event loop raft's applyc writes block on the server loop's database IO since the next applyc read must wait on the db operation to finish. Instead, stream applyc to a run queue outside the server loop. --- etcdserver/server.go | 76 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 62 insertions(+), 14 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index a86b1f157..496cbb9fa 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -474,35 +474,72 @@ type etcdProgress struct { appliedi uint64 } +// newApplier buffers apply operations and streams their results over an +// etcdProgress output channel. This is so raftNode won't block on sending +// new applies, timing out (since applies can be slow). The goroutine begins +// shutdown on close(s.done) and closes the etcdProgress channel when finished. +func (s *EtcdServer) newApplier(ep etcdProgress) <-chan etcdProgress { + etcdprogc := make(chan etcdProgress) + go func() { + defer close(etcdprogc) + pending := []apply{} + sdonec := s.done + apdonec := make(chan struct{}) + // serialized function + f := func(ap apply) { + s.applyAll(&ep, &ap) + etcdprogc <- ep + apdonec <- struct{}{} + } + for sdonec != nil || len(pending) > 0 { + select { + // launch if no pending apply packet, queue up the rest + case ap := <-s.r.apply(): + pending = append(pending, ap) + if len(pending) == 1 { + go f(pending[0]) + } + // pending apply serviced, schedule the next one + case <-apdonec: + pending = pending[1:] + if len(pending) != 0 { + go f(pending[0]) + } + // run() is finished; drain pending and exit + case <-sdonec: + sdonec = nil + } + } + }() + return etcdprogc +} + func (s *EtcdServer) run() { snap, err := s.r.raftStorage.Snapshot() if err != nil { plog.Panicf("get snapshot from raft storage error: %v", err) } s.r.start(s) - defer func() { - s.r.stop() - close(s.done) - }() + // asynchronously accept apply packets, dispatch progress in-order ep := etcdProgress{ confState: snap.Metadata.ConfState, snapi: snap.Metadata.Index, appliedi: snap.Metadata.Index, } + etcdprogc := s.newApplier(ep) + + defer func() { + s.r.stop() + close(s.done) + for range etcdprogc { + /* wait for outstanding applys */ + } + }() for { select { - case apply := <-s.r.apply(): - s.applySnapshot(&ep, &apply) - s.applyEntries(&ep, &apply) - // wait for the raft routine to finish the disk writes before triggering a - // snapshot. or applied index might be greater than the last index in raft - // storage, since the raft routine might be slower than apply routine. - apply.done <- struct{}{} - - // trigger snapshot - s.triggerSnapshot(&ep) + case ep = <-etcdprogc: case m := <-s.msgSnapC: merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState) s.r.transport.SendSnapshot(merged) @@ -514,6 +551,17 @@ func (s *EtcdServer) run() { return } } + +} + +func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) { + s.applySnapshot(ep, apply) + s.applyEntries(ep, apply) + // wait for the raft routine to finish the disk writes before triggering a + // snapshot. or applied index might be greater than the last index in raft + // storage, since the raft routine might be slower than apply routine. + apply.done <- struct{}{} + s.triggerSnapshot(ep) } func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {