etcdserver: use fifo scheduler for applier

Anthony Romano 2016-02-11 19:21:30 -08:00
parent a78826e025
commit 616f395920
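
The change below drops the hand-rolled startApplier goroutine and routes applies through etcd's pkg/schedule FIFO scheduler instead. As a hedged sketch of that pattern, the snippet here uses only the calls visible in this diff (NewFIFOScheduler, Schedule with a func(context.Context) job, Stop); the done channel and job bodies are illustrative, and the context import should match whatever pkg/schedule itself uses in the tree being built (golang.org/x/net/context in etcd of this era).

package main

import (
	"fmt"

	"golang.org/x/net/context" // match the context package used by pkg/schedule

	"github.com/coreos/etcd/pkg/schedule"
)

func main() {
	sched := schedule.NewFIFOScheduler()

	done := make(chan int, 3)
	for i := 0; i < 3; i++ {
		i := i // capture the loop variable for the closure
		sched.Schedule(func(context.Context) {
			done <- i // jobs run one at a time, in submission order
		})
	}

	// FIFO ordering means the results arrive as 0, 1, 2.
	for n := 0; n < 3; n++ {
		fmt.Println("finished apply job", <-done)
	}
	sched.Stop()
}

Schedule queues the job rather than waiting for it to finish, which is what lets run() keep pulling from s.r.apply() while slow applies drain through the scheduler in order.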


@@ -39,6 +39,7 @@ import (
"github.com/coreos/etcd/pkg/idutil"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/runtime"
"github.com/coreos/etcd/pkg/schedule"
"github.com/coreos/etcd/pkg/timeutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/pkg/wait"
@@ -487,51 +488,6 @@ type etcdProgress struct {
appliedi uint64
}
// startApplier buffers apply operations so raftNode won't block on sending
// new applies, timing out (since applies can be slow). The goroutine begins
// shutdown on close(s.done) and closes the returned channel when finished.
func (s *EtcdServer) startApplier(ep etcdProgress) <-chan struct{} {
donec := make(chan struct{})
go func() {
defer close(donec)
pending := []apply{}
sdonec := s.done
apdonec := make(chan struct{})
// serialized function
f := func(ap apply) {
s.applyAll(&ep, &ap)
select {
// snapshot requested via send()
case m := <-s.msgSnapC:
merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
s.sendMergedSnap(merged)
default:
}
apdonec <- struct{}{}
}
for sdonec != nil || len(pending) > 0 {
select {
// launch if no pending apply packet, queue up the rest
case ap := <-s.r.apply():
pending = append(pending, ap)
if len(pending) == 1 {
go f(pending[0])
}
// pending apply serviced, schedule the next one
case <-apdonec:
pending = pending[1:]
if len(pending) != 0 {
go f(pending[0])
}
// run() is finished; drain pending and exit
case <-sdonec:
sdonec = nil
}
}
}()
return donec
}
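// Sketch (not part of the commit): the pending slice and apdonec hand-off
// above are exactly the bookkeeping the FIFO scheduler takes over; the
// dispatch in run() below reduces to (names taken from this diff):
//
//	sched := schedule.NewFIFOScheduler()
//	sched.Schedule(func(context.Context) { s.applyAll(&ep, &ap) })
//	// ... on shutdown:
//	sched.Stop()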
func (s *EtcdServer) run() {
snap, err := s.r.raftStorage.Snapshot()
if err != nil {
@@ -540,14 +496,17 @@ func (s *EtcdServer) run() {
s.r.start(s)
// asynchronously accept apply packets, dispatch progress in-order
appdonec := s.startApplier(etcdProgress{
sched := schedule.NewFIFOScheduler()
ep := etcdProgress{
confState: snap.Metadata.ConfState,
snapi: snap.Metadata.Index,
appliedi: snap.Metadata.Index,
})
}
defer func() {
s.r.stop()
sched.Stop()
// kv, lessor and backend can be nil if running without v3 enabled
// or running unit tests.
if s.lessor != nil {
@@ -560,7 +519,6 @@ func (s *EtcdServer) run() {
s.be.Close()
}
close(s.done)
<-appdonec
}()
var expiredLeaseC <-chan []*lease.Lease
@@ -570,6 +528,9 @@
for {
select {
case ap := <-s.r.apply():
f := func(context.Context) { s.applyAll(&ep, &ap) }
sched.Schedule(f)
case leases := <-expiredLeaseC:
go func() {
for _, l := range leases {
@@ -594,6 +555,13 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
// storage, since the raft routine might be slower than apply routine.
<-apply.raftDone
s.triggerSnapshot(ep)
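// applyAll is now the serialized scheduler job, so it also services the
// snapshot-send request below (moved here from the removed startApplier loop):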
select {
// snapshot requested via send()
case m := <-s.msgSnapC:
merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
s.sendMergedSnap(merged)
default:
}
}
func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {