From ab31ba0d2951ec5939597f7dc323f546a39bfc31 Mon Sep 17 00:00:00 2001
From: Xiang Li
Date: Wed, 23 Dec 2015 13:55:39 -0800
Subject: [PATCH] *: fix snapshot sending cycle

---
 etcdserver/server.go         | 39 ++++++++++++++++++++++++++++++++++++
 etcdserver/snapshot_merge.go |  1 +
 rafthttp/snapshot_sender.go  |  2 ++
 rafthttp/transport.go        |  2 +-
 snap/message.go              | 20 +++++++++++++++++-
 5 files changed, 62 insertions(+), 2 deletions(-)

diff --git a/etcdserver/server.go b/etcdserver/server.go
index cd2fe2a31..f328e9055 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -68,6 +68,8 @@ const (
 	// max number of in-flight snapshot messages etcdserver allows to have
 	// This number is more than enough for most clusters with 5 machines.
 	maxInFlightMsgSnap = 16
+
+	compactionDelayAfterSnapshot = 30 * time.Second
 )
 
 var (
@@ -184,6 +186,14 @@ type EtcdServer struct {
 	forceVersionC chan struct{}
 
 	msgSnapC chan raftpb.Message
+
+	cpMu sync.Mutex // guards compactionPaused
+	// When sending a snapshot, etcd will pause compaction.
+	// After receiving a snapshot, a slow follower needs all the entries right after
+	// the sent snapshot in order to catch up. If compaction is not paused, those
+	// entries might already be compacted, which happens when the snapshot takes a
+	// long time to send and save. Pausing compaction avoids a snapshot sending cycle.
+	compactionPaused bool
 }
 
 // NewServer creates a new EtcdServer from the supplied configuration. The
@@ -542,7 +552,29 @@ func (s *EtcdServer) run() {
 		case ep = <-etcdprogc:
 		case m := <-s.msgSnapC:
 			merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
+			plog.Noticef("log compaction paused when sending snapshot")
+			s.cpMu.Lock()
+			s.compactionPaused = true
+			s.cpMu.Unlock()
+
 			s.r.transport.SendSnapshot(merged)
+			go func() {
+				select {
+				case ok := <-merged.CloseNotify():
+					// Delay compaction for another 30 seconds. If the follower
+					// still fails to catch up after that, it is probably just
+					// too slow; we cannot avoid the snapshot cycle anyway.
+					if ok {
+						time.Sleep(compactionDelayAfterSnapshot)
+					}
+					plog.Noticef("log compaction resumed")
+					s.cpMu.Lock()
+					s.compactionPaused = false
+					s.cpMu.Unlock()
+				case <-s.stop:
+					return
+				}
+			}()
 		case err := <-s.errorc:
 			plog.Errorf("%s", err)
 			plog.Infof("the data-dir used by this member must be removed.")
@@ -643,6 +675,13 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
 	if ep.appliedi-ep.snapi <= s.snapCount {
 		return
 	}
+	s.cpMu.Lock()
+	cp := s.compactionPaused
+	s.cpMu.Unlock()
+	if cp {
+		return
+	}
+
 	plog.Infof("start to snapshot (applied: %d, lastsnap: %d)", ep.appliedi, ep.snapi)
 	s.snapshot(ep.appliedi, ep.confState)
 	ep.snapi = ep.appliedi
diff --git a/etcdserver/snapshot_merge.go b/etcdserver/snapshot_merge.go
index 429192df3..bfc51999f 100644
--- a/etcdserver/snapshot_merge.go
+++ b/etcdserver/snapshot_merge.go
@@ -57,6 +57,7 @@ func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64,
 	return snap.Message{
 		Message:    m,
 		ReadCloser: rc,
+		Donec:      make(chan bool, 1),
 	}
 }
diff --git a/rafthttp/snapshot_sender.go b/rafthttp/snapshot_sender.go
index 9059a4c43..39502c5fb 100644
--- a/rafthttp/snapshot_sender.go
+++ b/rafthttp/snapshot_sender.go
@@ -75,6 +75,7 @@ func (s *snapshotSender) send(merged snap.Message) {
 
 	err := s.post(req)
 	if err != nil {
+		merged.FailedAndClose()
 		// errMemberRemoved is a critical error since a removed member should
 		// always be stopped. So we use reportCriticalError to report it to errorc.
 		if err == errMemberRemoved {
@@ -98,6 +99,7 @@ func (s *snapshotSender) send(merged snap.Message) {
 	reportSentDuration(sendSnap, m, time.Since(start))
 	s.status.activate()
 	s.r.ReportSnapshot(m.To, raft.SnapshotFinish)
+	merged.SucceededAndClose()
 	plog.Infof("snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))
 }
 
diff --git a/rafthttp/transport.go b/rafthttp/transport.go
index 43ae95b26..b49555011 100644
--- a/rafthttp/transport.go
+++ b/rafthttp/transport.go
@@ -285,7 +285,7 @@ func (t *Transport) ActiveSince(id types.ID) time.Time {
 func (t *Transport) SendSnapshot(m snap.Message) {
 	p := t.peers[types.ID(m.To)]
 	if p == nil {
-		m.ReadCloser.Close()
+		m.FailedAndClose()
 		return
 	}
 	p.sendSnap(m)
diff --git a/snap/message.go b/snap/message.go
index 1b7fff192..39c3313bc 100644
--- a/snap/message.go
+++ b/snap/message.go
@@ -27,8 +27,26 @@ import (
 // Message contains the ReadCloser field for handling large snapshot. This avoid
 // copying the entire snapshot into a byte array, which consumes a lot of memory.
 //
-// User of Message should close the ReadCloser after sending it.
+// Users of Message should close the Message after sending it.
 type Message struct {
 	raftpb.Message
 	ReadCloser io.ReadCloser
+	Donec      chan bool
 }
+
+// CloseNotify returns a channel that receives a single value
+// when sending the message has finished. true indicates that
+// the send was successful.
+func (m Message) CloseNotify() <-chan bool {
+	return m.Donec
+}
+
+func (m Message) SucceededAndClose() {
+	m.ReadCloser.Close()
+	m.Donec <- true
+}
+
+func (m Message) FailedAndClose() {
+	m.ReadCloser.Close()
+	m.Donec <- false
+}
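
Appended for review context, not part of the patch: a minimal, self-contained sketch of the
completion-notification pattern that snap.Message gains above. The local message type, the fake
payload, and the sleep are illustrative stand-ins rather than the real snap.Message or snapshot
send; only the channel discipline mirrors the patch.

package main

import (
	"fmt"
	"io"
	"strings"
	"time"
)

// message mirrors the shape of snap.Message from the patch: a snapshot
// payload reader plus a buffered done channel.
type message struct {
	ReadCloser io.ReadCloser
	Donec      chan bool
}

// CloseNotify returns the channel that reports the send outcome.
func (m message) CloseNotify() <-chan bool { return m.Donec }

// SucceededAndClose closes the payload and signals success.
func (m message) SucceededAndClose() {
	m.ReadCloser.Close()
	m.Donec <- true
}

// FailedAndClose closes the payload and signals failure.
func (m message) FailedAndClose() {
	m.ReadCloser.Close()
	m.Donec <- false
}

func main() {
	m := message{
		ReadCloser: io.NopCloser(strings.NewReader("snapshot bytes")),
		Donec:      make(chan bool, 1), // capacity 1: reporting the result never blocks
	}

	// Sender side (rafthttp in the patch): report the outcome exactly once.
	go func() {
		time.Sleep(10 * time.Millisecond) // stand-in for posting the snapshot
		m.SucceededAndClose()
	}()

	// Server side (etcdserver in the patch): resume compaction only after
	// the send finishes, delaying further on success so the follower can
	// catch up (the patch waits compactionDelayAfterSnapshot, 30s).
	if ok := <-m.CloseNotify(); ok {
		fmt.Println("send succeeded: delay compaction, then resume")
	} else {
		fmt.Println("send failed: resume compaction immediately")
	}
}

The capacity-1 buffer is the load-bearing choice here: SucceededAndClose and FailedAndClose run
on the sender's goroutine, and with an unbuffered channel they would block forever whenever the
server had already stopped and abandoned its select on CloseNotify.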