From 616f39592086ac3c56591c22be91459d1ec324c6 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Thu, 11 Feb 2016 19:21:30 -0800 Subject: [PATCH] etcdserver: use fifo scheduler for applier --- etcdserver/server.go | 64 +++++++++++--------------------------------- 1 file changed, 16 insertions(+), 48 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index c1208c605..8be2eb45a 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -39,6 +39,7 @@ import ( "github.com/coreos/etcd/pkg/idutil" "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/runtime" + "github.com/coreos/etcd/pkg/schedule" "github.com/coreos/etcd/pkg/timeutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/pkg/wait" @@ -487,51 +488,6 @@ type etcdProgress struct { appliedi uint64 } -// startApplier buffers apply operations so raftNode won't block on sending -// new applies, timing out (since applies can be slow). The goroutine begins -// shutdown on close(s.done) and closes the returned channel when finished. -func (s *EtcdServer) startApplier(ep etcdProgress) <-chan struct{} { - donec := make(chan struct{}) - go func() { - defer close(donec) - pending := []apply{} - sdonec := s.done - apdonec := make(chan struct{}) - // serialized function - f := func(ap apply) { - s.applyAll(&ep, &ap) - select { - // snapshot requested via send() - case m := <-s.msgSnapC: - merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState) - s.sendMergedSnap(merged) - default: - } - apdonec <- struct{}{} - } - for sdonec != nil || len(pending) > 0 { - select { - // launch if no pending apply packet, queue up the rest - case ap := <-s.r.apply(): - pending = append(pending, ap) - if len(pending) == 1 { - go f(pending[0]) - } - // pending apply serviced, schedule the next one - case <-apdonec: - pending = pending[1:] - if len(pending) != 0 { - go f(pending[0]) - } - // run() is finished; drain pending and exit - case <-sdonec: - sdonec = nil - } - } - }() - return donec -} - func (s *EtcdServer) run() { snap, err := s.r.raftStorage.Snapshot() if err != nil { @@ -540,14 +496,17 @@ func (s *EtcdServer) run() { s.r.start(s) // asynchronously accept apply packets, dispatch progress in-order - appdonec := s.startApplier(etcdProgress{ + sched := schedule.NewFIFOScheduler() + ep := etcdProgress{ confState: snap.Metadata.ConfState, snapi: snap.Metadata.Index, appliedi: snap.Metadata.Index, - }) + } defer func() { s.r.stop() + sched.Stop() + // kv, lessor and backend can be nil if running without v3 enabled // or running unit tests. if s.lessor != nil { @@ -560,7 +519,6 @@ func (s *EtcdServer) run() { s.be.Close() } close(s.done) - <-appdonec }() var expiredLeaseC <-chan []*lease.Lease @@ -570,6 +528,9 @@ func (s *EtcdServer) run() { for { select { + case ap := <-s.r.apply(): + f := func(context.Context) { s.applyAll(&ep, &ap) } + sched.Schedule(f) case leases := <-expiredLeaseC: go func() { for _, l := range leases { @@ -594,6 +555,13 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) { // storage, since the raft routine might be slower than apply routine. <-apply.raftDone s.triggerSnapshot(ep) + select { + // snapshot requested via send() + case m := <-s.msgSnapC: + merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState) + s.sendMergedSnap(merged) + default: + } } func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {