diff --git a/etcdserver/server.go b/etcdserver/server.go index a86b1f157..496cbb9fa 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -474,35 +474,72 @@ type etcdProgress struct { appliedi uint64 } +// newApplier buffers apply operations and streams their results over an +// etcdProgress output channel. This is so raftNode won't block on sending +// new applies, timing out (since applies can be slow). The goroutine begins +// shutdown on close(s.done) and closes the etcdProgress channel when finished. +func (s *EtcdServer) newApplier(ep etcdProgress) <-chan etcdProgress { + etcdprogc := make(chan etcdProgress) + go func() { + defer close(etcdprogc) + pending := []apply{} + sdonec := s.done + apdonec := make(chan struct{}) + // serialized function + f := func(ap apply) { + s.applyAll(&ep, &ap) + etcdprogc <- ep + apdonec <- struct{}{} + } + for sdonec != nil || len(pending) > 0 { + select { + // launch if no pending apply packet, queue up the rest + case ap := <-s.r.apply(): + pending = append(pending, ap) + if len(pending) == 1 { + go f(pending[0]) + } + // pending apply serviced, schedule the next one + case <-apdonec: + pending = pending[1:] + if len(pending) != 0 { + go f(pending[0]) + } + // run() is finished; drain pending and exit + case <-sdonec: + sdonec = nil + } + } + }() + return etcdprogc +} + func (s *EtcdServer) run() { snap, err := s.r.raftStorage.Snapshot() if err != nil { plog.Panicf("get snapshot from raft storage error: %v", err) } s.r.start(s) - defer func() { - s.r.stop() - close(s.done) - }() + // asynchronously accept apply packets, dispatch progress in-order ep := etcdProgress{ confState: snap.Metadata.ConfState, snapi: snap.Metadata.Index, appliedi: snap.Metadata.Index, } + etcdprogc := s.newApplier(ep) + + defer func() { + s.r.stop() + close(s.done) + for range etcdprogc { + /* wait for outstanding applys */ + } + }() for { select { - case apply := <-s.r.apply(): - s.applySnapshot(&ep, &apply) - s.applyEntries(&ep, &apply) - // wait for the raft routine to finish the disk writes before triggering a - // snapshot. or applied index might be greater than the last index in raft - // storage, since the raft routine might be slower than apply routine. - apply.done <- struct{}{} - - // trigger snapshot - s.triggerSnapshot(&ep) + case ep = <-etcdprogc: case m := <-s.msgSnapC: merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState) s.r.transport.SendSnapshot(merged) @@ -514,6 +551,17 @@ func (s *EtcdServer) run() { return } } + +} + +func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) { + s.applySnapshot(ep, apply) + s.applyEntries(ep, apply) + // wait for the raft routine to finish the disk writes before triggering a + // snapshot. or applied index might be greater than the last index in raft + // storage, since the raft routine might be slower than apply routine. + apply.done <- struct{}{} + s.triggerSnapshot(ep) } func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {