diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 91d69ec91..3e1570fef 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -23,6 +23,7 @@ import ( "sync/atomic" "time" + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/types" @@ -31,8 +32,6 @@ import ( "github.com/coreos/etcd/rafthttp" "github.com/coreos/etcd/wal" "github.com/coreos/etcd/wal/walpb" - - "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" ) const ( @@ -76,13 +75,14 @@ type RaftTimer interface { Term() uint64 } -// apply contains entries, snapshot be applied. -// After applied all the items, the application needs -// to send notification to done chan. +// apply contains entries, snapshot to be applied. Once +// an apply is consumed, the entries will be persisted +// to raft storage concurrently; the application must read +// raftDone before assuming the raft messages are stable. 
type apply struct { entries []raftpb.Entry snapshot raftpb.Snapshot - done chan struct{} + raftDone <-chan struct{} // receives a struct{} once raft has persisted messages } type raftNode struct { @@ -134,6 +134,7 @@ func (r *raftNode) start(s *EtcdServer) { var syncC <-chan time.Time defer r.onStop() + for { select { case <-r.ticker: @@ -158,10 +159,11 @@ func (r *raftNode) start(s *EtcdServer) { } } + raftDone := make(chan struct{}, 1) ap := apply{ entries: rd.CommittedEntries, snapshot: rd.Snapshot, - done: make(chan struct{}), + raftDone: raftDone, } select { @@ -183,12 +185,7 @@ func (r *raftNode) start(s *EtcdServer) { r.raftStorage.Append(rd.Entries) r.s.send(rd.Messages) - - select { - case <-ap.done: - case <-r.stopped: - return - } + raftDone <- struct{}{} r.Advance() case <-syncC: r.s.sync(r.s.cfg.ReqTimeout()) diff --git a/etcdserver/server.go b/etcdserver/server.go index a86b1f157..23521e759 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -474,35 +474,72 @@ type etcdProgress struct { appliedi uint64 } +// newApplier buffers apply operations and streams their results over an +// etcdProgress output channel, so that raftNode neither blocks nor times out +// while sending new applies (applies can be slow). The goroutine begins +// shutdown on close(s.done) and closes the etcdProgress channel when finished. 
+func (s *EtcdServer) newApplier(ep etcdProgress) <-chan etcdProgress { + etcdprogc := make(chan etcdProgress) + go func() { + defer close(etcdprogc) + pending := []apply{} + sdonec := s.done + apdonec := make(chan struct{}) + // serialized function + f := func(ap apply) { + s.applyAll(&ep, &ap) + etcdprogc <- ep + apdonec <- struct{}{} + } + for sdonec != nil || len(pending) > 0 { + select { + // launch if no pending apply packet, queue up the rest + case ap := <-s.r.apply(): + pending = append(pending, ap) + if len(pending) == 1 { + go f(pending[0]) + } + // pending apply serviced, schedule the next one + case <-apdonec: + pending = pending[1:] + if len(pending) != 0 { + go f(pending[0]) + } + // run() is finished; drain pending and exit + case <-sdonec: + sdonec = nil + } + } + }() + return etcdprogc +} + func (s *EtcdServer) run() { snap, err := s.r.raftStorage.Snapshot() if err != nil { plog.Panicf("get snapshot from raft storage error: %v", err) } s.r.start(s) - defer func() { - s.r.stop() - close(s.done) - }() + // asynchronously accept apply packets, dispatch progress in-order ep := etcdProgress{ confState: snap.Metadata.ConfState, snapi: snap.Metadata.Index, appliedi: snap.Metadata.Index, } + etcdprogc := s.newApplier(ep) + + defer func() { + s.r.stop() + close(s.done) + for range etcdprogc { + /* wait for outstanding applys */ + } + }() for { select { - case apply := <-s.r.apply(): - s.applySnapshot(&ep, &apply) - s.applyEntries(&ep, &apply) - // wait for the raft routine to finish the disk writes before triggering a - // snapshot. or applied index might be greater than the last index in raft - // storage, since the raft routine might be slower than apply routine. 
- apply.done <- struct{}{} - - // trigger snapshot - s.triggerSnapshot(&ep) + case ep = <-etcdprogc: case m := <-s.msgSnapC: merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState) s.r.transport.SendSnapshot(merged) @@ -514,6 +551,17 @@ func (s *EtcdServer) run() { return } } + +} + +func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) { + s.applySnapshot(ep, apply) + s.applyEntries(ep, apply) + // wait for the raft routine to finish the disk writes before triggering a + // snapshot. Otherwise the applied index might be greater than the last index in raft + // storage, since the raft routine might be slower than the apply routine. + <-apply.raftDone + s.triggerSnapshot(ep) } func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {