Merge pull request #6738 from gyuho/raft-cleanup

etcdserver: move 'EtcdServer.send' to raft.go
Authored by Gyu-Ho Lee on 2016-10-31 15:15:08 -07:00; committed by GitHub
commit 136c02da71
4 changed files with 70 additions and 63 deletions

etcdserver/raft.go

@@ -24,6 +24,7 @@ import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/contention"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
@@ -97,8 +98,14 @@ type raftNode struct {
// last lead elected time
lt time.Time
// to check if msg receiver is removed from cluster
isIDRemoved func(id uint64) bool
raft.Node
// a chan to send/receive snapshot
msgSnapC chan raftpb.Message
// a chan to send out apply
applyc chan apply
@@ -106,7 +113,10 @@ type raftNode struct {
readStateC chan raft.ReadState
// utility
ticker <-chan time.Time
ticker <-chan time.Time
// contention detectors for raft heartbeat message
td *contention.TimeoutDetector
heartbeat time.Duration // for logging
raftStorage *raft.MemoryStorage
storage Storage
// transport specifies the transport to send and receive msgs to members.
@@ -180,7 +190,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
// For more details, check raft thesis 10.2.1
if islead {
// gofail: var raftBeforeLeaderSend struct{}
rh.sendMessage(rd.Messages)
r.sendMessages(rd.Messages)
}
// gofail: var raftBeforeSave struct{}
@@ -207,7 +217,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
if !islead {
// gofail: var raftBeforeFollowerSend struct{}
rh.sendMessage(rd.Messages)
r.sendMessages(rd.Messages)
}
raftDone <- struct{}{}
r.Advance()
@@ -218,6 +228,46 @@ func (r *raftNode) start(rh *raftReadyHandler) {
}()
}
func (r *raftNode) sendMessages(ms []raftpb.Message) {
sentAppResp := false
for i := len(ms) - 1; i >= 0; i-- {
if r.isIDRemoved(ms[i].To) {
ms[i].To = 0
}
if ms[i].Type == raftpb.MsgAppResp {
if sentAppResp {
ms[i].To = 0
} else {
sentAppResp = true
}
}
if ms[i].Type == raftpb.MsgSnap {
// There are two separate data stores: the store for v2, and the KV for v3.
// The msgSnap only contains the most recent snapshot of store without KV.
// So we need to redirect the msgSnap to etcd server main loop for merging in the
// current store snapshot and KV snapshot.
select {
case r.msgSnapC <- ms[i]:
default:
// drop msgSnap if the inflight chan is full.
}
ms[i].To = 0
}
if ms[i].Type == raftpb.MsgHeartbeat {
ok, exceed := r.td.Observe(ms[i].To)
if !ok {
// TODO: limit request rate.
plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed)
plog.Warningf("server is likely overloaded")
}
}
}
r.transport.Send(ms)
}
func (r *raftNode) apply() chan apply {
return r.applyc
}

etcdserver/raft_test.go

@@ -159,7 +159,7 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
raftStorage: raft.NewMemoryStorage(),
transport: rafthttp.NewNopTransporter(),
}}
srv.r.start(&raftReadyHandler{sendMessage: func(msgs []raftpb.Message) { srv.send(msgs) }})
srv.r.start(nil)
n.readyc <- raft.Ready{}
select {
case <-srv.r.applyc:

etcdserver/server.go

@@ -177,8 +177,7 @@ type EtcdServer struct {
snapCount uint64
w wait.Wait
td *contention.TimeoutDetector
w wait.Wait
readMu sync.RWMutex
// read routine notifies etcd server that it waits for reading by sending an empty struct to
@@ -233,8 +232,6 @@ type EtcdServer struct {
// to detect the cluster version immediately.
forceVersionC chan struct{}
msgSnapC chan raftpb.Message
// wgMu blocks concurrent waitgroup mutation while server stopping
wgMu sync.RWMutex
// wg is used to wait for the goroutines that depend on the server state
@@ -399,16 +396,19 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
readych: make(chan struct{}),
Cfg: cfg,
snapCount: cfg.SnapCount,
// set up contention detectors for raft heartbeat message.
// expect to send a heartbeat within 2 heartbeat intervals.
td: contention.NewTimeoutDetector(2 * heartbeat),
errorc: make(chan error, 1),
store: st,
errorc: make(chan error, 1),
store: st,
r: raftNode{
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
Node: n,
ticker: time.Tick(heartbeat),
// set up contention detectors for raft heartbeat message.
// expect to send a heartbeat within 2 heartbeat intervals.
td: contention.NewTimeoutDetector(2 * heartbeat),
heartbeat: heartbeat,
raftStorage: s,
storage: NewStorage(w, ss),
msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
readStateC: make(chan raft.ReadState, 1),
},
id: id,
@@ -420,7 +420,6 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
peerRt: prt,
reqIDGen: idutil.NewGenerator(uint16(id), time.Now()),
forceVersionC: make(chan struct{}),
msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
}
srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
@@ -583,7 +582,6 @@ type etcdProgress struct {
// TODO: add a state machine interface to apply the commit entries and do snapshot/recover
type raftReadyHandler struct {
leadershipUpdate func()
sendMessage func(msgs []raftpb.Message)
}
func (s *EtcdServer) run() {
@@ -629,11 +627,10 @@ func (s *EtcdServer) run() {
if s.compactor != nil {
s.compactor.Resume()
}
if s.td != nil {
s.td.Reset()
if s.r.td != nil {
s.r.td.Reset()
}
},
sendMessage: func(msgs []raftpb.Message) { s.send(msgs) },
}
s.r.start(rh)
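The contention detector travels with the message path: NewServer seeds it on raftNode, sendMessages observes every outgoing MsgHeartbeat, and the leadership-update hook above resets it. A minimal sketch of those three calls, assuming made-up intervals and a made-up peer ID; only NewTimeoutDetector, Observe, and Reset are taken from the diff.

package main

import (
	"fmt"
	"time"

	"github.com/coreos/etcd/pkg/contention"
)

func main() {
	heartbeat := 100 * time.Millisecond
	// Expect to send the next heartbeat within two heartbeat intervals,
	// mirroring contention.NewTimeoutDetector(2 * heartbeat) in NewServer.
	td := contention.NewTimeoutDetector(2 * heartbeat)

	peer := uint64(2) // illustrative peer ID
	td.Observe(peer)  // record the first heartbeat send to this peer

	time.Sleep(3 * heartbeat) // simulate a delayed send loop
	if ok, exceed := td.Observe(peer); !ok {
		// Corresponds to the warning path in sendMessages above.
		fmt.Printf("heartbeat to %d exceeded the %v timeout by %v\n", peer, 2*heartbeat, exceed)
	}

	// On a leadership change the server resets the detector (s.r.td.Reset())
	// so stale observations do not produce spurious warnings.
	td.Reset()
}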
@@ -745,7 +742,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
s.triggerSnapshot(ep)
select {
// snapshot requested via send()
case m := <-s.msgSnapC:
case m := <-s.r.msgSnapC:
merged := s.createMergedSnapshotMessage(m, ep.appliedi, ep.confState)
s.sendMergedSnap(merged)
default:
@@ -1165,47 +1162,6 @@ func (s *EtcdServer) publish(timeout time.Duration) {
}
}
// TODO: move this function into raft.go
func (s *EtcdServer) send(ms []raftpb.Message) {
sentAppResp := false
for i := len(ms) - 1; i >= 0; i-- {
if s.cluster.IsIDRemoved(types.ID(ms[i].To)) {
ms[i].To = 0
}
if ms[i].Type == raftpb.MsgAppResp {
if sentAppResp {
ms[i].To = 0
} else {
sentAppResp = true
}
}
if ms[i].Type == raftpb.MsgSnap {
// There are two separate data stores: the store for v2, and the KV for v3.
// The msgSnap only contains the most recent snapshot of store without KV.
// So we need to redirect the msgSnap to etcd server main loop for merging in the
// current store snapshot and KV snapshot.
select {
case s.msgSnapC <- ms[i]:
default:
// drop msgSnap if the inflight chan is full.
}
ms[i].To = 0
}
if ms[i].Type == raftpb.MsgHeartbeat {
ok, exceed := s.td.Observe(ms[i].To)
if !ok {
// TODO: limit request rate.
plog.Warningf("failed to send out heartbeat on time (exceeded the %dms timeout for %v)", s.Cfg.TickMs, exceed)
plog.Warningf("server is likely overloaded")
}
}
}
s.r.transport.Send(ms)
}
func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
atomic.AddInt64(&s.inflightSnapshots, 1)

etcdserver/server_test.go

@@ -971,14 +971,15 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
DataDir: testdir,
},
r: raftNode{
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
Node: n,
transport: tr,
storage: mockstorage.NewStorageRecorder(testdir),
raftStorage: rs,
msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
},
store: st,
cluster: cl,
msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
store: st,
cluster: cl,
}
s.applyV2 = &applierV2store{store: s.store, cluster: s.cluster}