mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: asynchronously notify applier when raft writes finish
The raft loop would block on the applier's done channel after persisting the raft messages; the latency could cause dropped network messages. Instead, asynchronously notify the applier with a buffered channel when the raft writes complete.
This commit is contained in:
@@ -23,6 +23,7 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/pkg/pbutil"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
@@ -31,8 +32,6 @@ import (
|
||||
"github.com/coreos/etcd/rafthttp"
|
||||
"github.com/coreos/etcd/wal"
|
||||
"github.com/coreos/etcd/wal/walpb"
|
||||
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -76,13 +75,14 @@ type RaftTimer interface {
|
||||
Term() uint64
|
||||
}
|
||||
|
||||
// apply contains entries, snapshot be applied.
|
||||
// After applied all the items, the application needs
|
||||
// to send notification to done chan.
|
||||
// apply contains entries, snapshot to be applied. Once
|
||||
// an apply is consumed, the entries will be persisted to
|
||||
// raft storage concurrently; the application must read
|
||||
// raftDone before assuming the raft messages are stable.
|
||||
type apply struct {
|
||||
entries []raftpb.Entry
|
||||
snapshot raftpb.Snapshot
|
||||
done chan struct{}
|
||||
raftDone <-chan struct{} // rx {} after raft has persisted messages
|
||||
}
|
||||
|
||||
type raftNode struct {
|
||||
@@ -134,6 +134,7 @@ func (r *raftNode) start(s *EtcdServer) {
|
||||
var syncC <-chan time.Time
|
||||
|
||||
defer r.onStop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-r.ticker:
|
||||
@@ -158,10 +159,11 @@ func (r *raftNode) start(s *EtcdServer) {
|
||||
}
|
||||
}
|
||||
|
||||
raftDone := make(chan struct{}, 1)
|
||||
ap := apply{
|
||||
entries: rd.CommittedEntries,
|
||||
snapshot: rd.Snapshot,
|
||||
done: make(chan struct{}),
|
||||
raftDone: raftDone,
|
||||
}
|
||||
|
||||
select {
|
||||
@@ -183,12 +185,7 @@ func (r *raftNode) start(s *EtcdServer) {
|
||||
r.raftStorage.Append(rd.Entries)
|
||||
|
||||
r.s.send(rd.Messages)
|
||||
|
||||
select {
|
||||
case <-ap.done:
|
||||
case <-r.stopped:
|
||||
return
|
||||
}
|
||||
raftDone <- struct{}{}
|
||||
r.Advance()
|
||||
case <-syncC:
|
||||
r.s.sync(r.s.cfg.ReqTimeout())
|
||||
|
||||
@@ -560,7 +560,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
|
||||
// wait for the raft routine to finish the disk writes before triggering a
|
||||
// snapshot; otherwise the applied index might be greater than the last index in raft
|
||||
// storage, since the raft routine might be slower than the apply routine.
|
||||
apply.done <- struct{}{}
|
||||
<-apply.raftDone
|
||||
s.triggerSnapshot(ep)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user