From 4be152bb4fb652be1ad5250bec7e76465167bb53 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 23 Dec 2015 16:03:23 -0800 Subject: [PATCH] rework --- etcdserver/server.go | 68 +++++++++++++++++------------------- etcdserver/snapshot_merge.go | 6 +--- rafthttp/snapshot_sender.go | 3 +- rafthttp/transport.go | 2 +- rafthttp/util.go | 5 ++- snap/message.go | 25 ++++++++----- 6 files changed, 56 insertions(+), 53 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index 1e2531a9b..17310e156 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -69,7 +69,7 @@ const ( // This number is more than enough for most clusters with 5 machines. maxInFlightMsgSnap = 16 - compactionDelayAfterSnapshot = 30 * time.Second + releaseDelayAfterSnapshot = 30 * time.Second ) var ( @@ -187,13 +187,9 @@ type EtcdServer struct { msgSnapC chan raftpb.Message - cpMu sync.Mutex // guards compactionPaused - // When sending a snapshot, etcd will pause compaction. - // After receives a snapshot, the slow follower needs to get all the entries right after - // the snapshot sent to catch up. If we do not pause compaction, the log entries right after - // the snapshot sent might already be compacted. It happens when the snapshot takes long time - // to send and save. Pausing compaction avoids triggering a snapshot sending cycle. - compactionPaused bool + // count the number of inflight snapshots. + // MUST use atomic operation to access this field. + inflightSnapshots int64 } // NewServer creates a new EtcdServer from the supplied configuration. The @@ -552,29 +548,7 @@ func (s *EtcdServer) run() { case ep = <-etcdprogc: case m := <-s.msgSnapC: merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState) - plog.Noticef("log compaction paused when sending snapshot") - s.cpMu.Lock() - s.compactionPaused = true - s.cpMu.Unlock() - - s.r.transport.SendSnapshot(merged) - go func() { - select { - case ok := <-merged.CloseNotify(): - // delay compaction for another 30 seconds. If the follower still - // fails to catch up, it is probably just too slow to catch up. - // We cannot avoid the snapshot cycle anyway. - if ok { - time.Sleep(compactionDelayAfterSnapshot) - } - plog.Noticef("log compaction resumed") - s.cpMu.Lock() - s.compactionPaused = false - s.cpMu.Unlock() - case <-s.done: - return - } - }() + s.sendMergedSnap(merged) case err := <-s.errorc: plog.Errorf("%s", err) plog.Infof("the data-dir used by this member must be removed.") @@ -675,10 +649,13 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) { if ep.appliedi-ep.snapi <= s.snapCount { return } - s.cpMu.Lock() - cp := s.compactionPaused - s.cpMu.Unlock() - if cp { + + // When sending a snapshot, etcd will pause compaction. + // After receives a snapshot, the slow follower needs to get all the entries right after + // the snapshot sent to catch up. If we do not pause compaction, the log entries right after + // the snapshot sent might already be compacted. It happens when the snapshot takes long time + // to send and save. Pausing compaction avoids triggering a snapshot sending cycle. + if atomic.LoadInt64(&s.inflightSnapshots) != 0 { return } @@ -952,6 +929,27 @@ func (s *EtcdServer) send(ms []raftpb.Message) { s.r.transport.Send(ms) } +func (s *EtcdServer) sendMergedSnap(merged snap.Message) { + atomic.AddInt64(&s.inflightSnapshots, 1) + + s.r.transport.SendSnapshot(merged) + go func() { + select { + case ok := <-merged.CloseNotify(): + // delay releasing inflight snapshot for another 30 seconds to + // block log compaction. + // If the follower still fails to catch up, it is probably just too slow + // to catch up. We cannot avoid the snapshot cycle anyway. + if ok { + time.Sleep(releaseDelayAfterSnapshot) + } + atomic.AddInt64(&s.inflightSnapshots, -1) + case <-s.done: + return + } + }() +} + // apply takes entries received from Raft (after it has been committed) and // applies them to the current state of the EtcdServer. // The given entries should not be empty. diff --git a/etcdserver/snapshot_merge.go b/etcdserver/snapshot_merge.go index bfc51999f..f6dac7245 100644 --- a/etcdserver/snapshot_merge.go +++ b/etcdserver/snapshot_merge.go @@ -54,11 +54,7 @@ func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64, } m.Snapshot = snapshot - return snap.Message{ - Message: m, - ReadCloser: rc, - Donec: make(chan bool, 1), - } + return *snap.NewMessage(m, rc) } func newSnapshotReaderCloser(snapshot dstorage.Snapshot) io.ReadCloser { diff --git a/rafthttp/snapshot_sender.go b/rafthttp/snapshot_sender.go index 39502c5fb..475ae2cb2 100644 --- a/rafthttp/snapshot_sender.go +++ b/rafthttp/snapshot_sender.go @@ -74,8 +74,8 @@ func (s *snapshotSender) send(merged snap.Message) { req := createPostRequest(u, RaftSnapshotPrefix, body, "application/octet-stream", s.from, s.cid) err := s.post(req) + defer merged.CloseWithError(err) if err != nil { - merged.FailedAndClose() // errMemberRemoved is a critical error since a removed member should // always be stopped. So we use reportCriticalError to report it to errorc. if err == errMemberRemoved { @@ -99,7 +99,6 @@ func (s *snapshotSender) send(merged snap.Message) { reportSentDuration(sendSnap, m, time.Since(start)) s.status.activate() s.r.ReportSnapshot(m.To, raft.SnapshotFinish) - merged.SucceededAndClose() plog.Infof("snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To)) } diff --git a/rafthttp/transport.go b/rafthttp/transport.go index b49555011..5ae947c6c 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -285,7 +285,7 @@ func (t *Transport) ActiveSince(id types.ID) time.Time { func (t *Transport) SendSnapshot(m snap.Message) { p := t.peers[types.ID(m.To)] if p == nil { - m.FailedAndClose() + m.CloseWithError(errMemberNotFound) return } p.sendSnap(m) diff --git a/rafthttp/util.go b/rafthttp/util.go index 75a66cfd7..4efd802a7 100644 --- a/rafthttp/util.go +++ b/rafthttp/util.go @@ -31,7 +31,10 @@ import ( "github.com/coreos/etcd/version" ) -var errMemberRemoved = fmt.Errorf("the member has been permanently removed from the cluster") +var ( + errMemberRemoved = fmt.Errorf("the member has been permanently removed from the cluster") + errMemberNotFound = fmt.Errorf("member not found") +) // NewListener returns a listener for raft message transfer between peers. // It uses timeout listener to identify broken streams promptly. diff --git a/snap/message.go b/snap/message.go index 39c3313bc..2d2b21106 100644 --- a/snap/message.go +++ b/snap/message.go @@ -31,22 +31,29 @@ import ( type Message struct { raftpb.Message ReadCloser io.ReadCloser - Donec chan bool + closeC chan bool +} + +func NewMessage(rs raftpb.Message, rc io.ReadCloser) *Message { + return &Message{ + Message: rs, + ReadCloser: rc, + closeC: make(chan bool, 1), + } } // CloseNotify returns a channel that receives a single value // when the message sent is finished. true indicates the sent // is successful. func (m Message) CloseNotify() <-chan bool { - return m.Donec + return m.closeC } -func (m Message) SucceededAndClose() { +func (m Message) CloseWithError(err error) { m.ReadCloser.Close() - m.Donec <- true -} - -func (m Message) FailedAndClose() { - m.ReadCloser.Close() - m.Donec <- false + if err == nil { + m.closeC <- true + } else { + m.closeC <- false + } }