From 6ec03d3f7ccdcbfda8c91ea6b8cd86ca3bbf9fc5 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 26 Oct 2016 16:26:00 -0700 Subject: [PATCH] etcdserver: move 'EtcdServer.send' to raft.go Clear 'TODO' --- etcdserver/raft.go | 56 ++++++++++++++++++++++++++++++-- etcdserver/raft_test.go | 2 +- etcdserver/server.go | 68 +++++++-------------------------------- etcdserver/server_test.go | 7 ++-- 4 files changed, 70 insertions(+), 63 deletions(-) diff --git a/etcdserver/raft.go b/etcdserver/raft.go index c1b9ad6b3..6d6afdb94 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -24,6 +24,7 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/etcdserver/membership" + "github.com/coreos/etcd/pkg/contention" "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" @@ -97,8 +98,14 @@ type raftNode struct { // last lead elected time lt time.Time + // to check if msg receiver is removed from cluster + isIDRemoved func(id uint64) bool + raft.Node + // a chan to send/receive snapshot + msgSnapC chan raftpb.Message + // a chan to send out apply applyc chan apply @@ -106,7 +113,10 @@ type raftNode struct { readStateC chan raft.ReadState // utility - ticker <-chan time.Time + ticker <-chan time.Time + // contention detectors for raft heartbeat message + td *contention.TimeoutDetector + heartbeat time.Duration // for logging raftStorage *raft.MemoryStorage storage Storage // transport specifies the transport to send and receive msgs to members. @@ -180,7 +190,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { // For more details, check raft thesis 10.2.1 if islead { // gofail: var raftBeforeLeaderSend struct{} - rh.sendMessage(rd.Messages) + r.sendMessages(rd.Messages) } // gofail: var raftBeforeSave struct{} @@ -207,7 +217,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { if !islead { // gofail: var raftBeforeFollowerSend struct{} - rh.sendMessage(rd.Messages) + r.sendMessages(rd.Messages) } raftDone <- struct{}{} r.Advance() @@ -218,6 +228,46 @@ func (r *raftNode) start(rh *raftReadyHandler) { }() } +func (r *raftNode) sendMessages(ms []raftpb.Message) { + sentAppResp := false + for i := len(ms) - 1; i >= 0; i-- { + if r.isIDRemoved(ms[i].To) { + ms[i].To = 0 + } + + if ms[i].Type == raftpb.MsgAppResp { + if sentAppResp { + ms[i].To = 0 + } else { + sentAppResp = true + } + } + + if ms[i].Type == raftpb.MsgSnap { + // There are two separate data store: the store for v2, and the KV for v3. + // The msgSnap only contains the most recent snapshot of store without KV. + // So we need to redirect the msgSnap to etcd server main loop for merging in the + // current store snapshot and KV snapshot. + select { + case r.msgSnapC <- ms[i]: + default: + // drop msgSnap if the inflight chan if full. + } + ms[i].To = 0 + } + if ms[i].Type == raftpb.MsgHeartbeat { + ok, exceed := r.td.Observe(ms[i].To) + if !ok { + // TODO: limit request rate. + plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed) + plog.Warningf("server is likely overloaded") + } + } + } + + r.transport.Send(ms) +} + func (r *raftNode) apply() chan apply { return r.applyc } diff --git a/etcdserver/raft_test.go b/etcdserver/raft_test.go index a93de4483..dcd8c9ada 100644 --- a/etcdserver/raft_test.go +++ b/etcdserver/raft_test.go @@ -159,7 +159,7 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) { raftStorage: raft.NewMemoryStorage(), transport: rafthttp.NewNopTransporter(), }} - srv.r.start(&raftReadyHandler{sendMessage: func(msgs []raftpb.Message) { srv.send(msgs) }}) + srv.r.start(nil) n.readyc <- raft.Ready{} select { case <-srv.r.applyc: diff --git a/etcdserver/server.go b/etcdserver/server.go index 097a413d4..de2fb22fa 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -177,8 +177,7 @@ type EtcdServer struct { snapCount uint64 - w wait.Wait - td *contention.TimeoutDetector + w wait.Wait readMu sync.RWMutex // read routine notifies etcd server that it waits for reading by sending an empty struct to @@ -233,8 +232,6 @@ type EtcdServer struct { // to detect the cluster version immediately. forceVersionC chan struct{} - msgSnapC chan raftpb.Message - // wgMu blocks concurrent waitgroup mutation while server stopping wgMu sync.RWMutex // wg is used to wait for the go routines that depends on the server state @@ -399,16 +396,19 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { readych: make(chan struct{}), Cfg: cfg, snapCount: cfg.SnapCount, - // set up contention detectors for raft heartbeat message. - // expect to send a heartbeat within 2 heartbeat intervals. - td: contention.NewTimeoutDetector(2 * heartbeat), - errorc: make(chan error, 1), - store: st, + errorc: make(chan error, 1), + store: st, r: raftNode{ + isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) }, Node: n, ticker: time.Tick(heartbeat), + // set up contention detectors for raft heartbeat message. + // expect to send a heartbeat within 2 heartbeat intervals. + td: contention.NewTimeoutDetector(2 * heartbeat), + heartbeat: heartbeat, raftStorage: s, storage: NewStorage(w, ss), + msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), readStateC: make(chan raft.ReadState, 1), }, id: id, @@ -420,7 +420,6 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { peerRt: prt, reqIDGen: idutil.NewGenerator(uint16(id), time.Now()), forceVersionC: make(chan struct{}), - msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), } srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster} @@ -583,7 +582,6 @@ type etcdProgress struct { // TODO: add a state machine interface to apply the commit entries and do snapshot/recover type raftReadyHandler struct { leadershipUpdate func() - sendMessage func(msgs []raftpb.Message) } func (s *EtcdServer) run() { @@ -629,11 +627,10 @@ func (s *EtcdServer) run() { if s.compactor != nil { s.compactor.Resume() } - if s.td != nil { - s.td.Reset() + if s.r.td != nil { + s.r.td.Reset() } }, - sendMessage: func(msgs []raftpb.Message) { s.send(msgs) }, } s.r.start(rh) @@ -745,7 +742,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) { s.triggerSnapshot(ep) select { // snapshot requested via send() - case m := <-s.msgSnapC: + case m := <-s.r.msgSnapC: merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState) s.sendMergedSnap(merged) default: @@ -1165,47 +1162,6 @@ func (s *EtcdServer) publish(timeout time.Duration) { } } -// TODO: move this function into raft.go -func (s *EtcdServer) send(ms []raftpb.Message) { - sentAppResp := false - for i := len(ms) - 1; i >= 0; i-- { - if s.cluster.IsIDRemoved(types.ID(ms[i].To)) { - ms[i].To = 0 - } - - if ms[i].Type == raftpb.MsgAppResp { - if sentAppResp { - ms[i].To = 0 - } else { - sentAppResp = true - } - } - - if ms[i].Type == raftpb.MsgSnap { - // There are two separate data store: the store for v2, and the KV for v3. - // The msgSnap only contains the most recent snapshot of store without KV. - // So we need to redirect the msgSnap to etcd server main loop for merging in the - // current store snapshot and KV snapshot. - select { - case s.msgSnapC <- ms[i]: - default: - // drop msgSnap if the inflight chan if full. - } - ms[i].To = 0 - } - if ms[i].Type == raftpb.MsgHeartbeat { - ok, exceed := s.td.Observe(ms[i].To) - if !ok { - // TODO: limit request rate. - plog.Warningf("failed to send out heartbeat on time (exceeded the %dms timeout for %v)", s.Cfg.TickMs, exceed) - plog.Warningf("server is likely overloaded") - } - } - } - - s.r.transport.Send(ms) -} - func (s *EtcdServer) sendMergedSnap(merged snap.Message) { atomic.AddInt64(&s.inflightSnapshots, 1) diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 22d59d77f..b47e719f9 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -971,14 +971,15 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { DataDir: testdir, }, r: raftNode{ + isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) }, Node: n, transport: tr, storage: mockstorage.NewStorageRecorder(testdir), raftStorage: rs, + msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), }, - store: st, - cluster: cl, - msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), + store: st, + cluster: cl, } s.applyV2 = &applierV2store{store: s.store, cluster: s.cluster}