diff --git a/etcdserver/raft.go b/etcdserver/raft.go
index a9825d0a5..b603c459c 100644
--- a/etcdserver/raft.go
+++ b/etcdserver/raft.go
@@ -94,14 +94,7 @@ type raftNode struct {
 	term uint64
 	lead uint64
 
-	mu sync.Mutex
-	// last lead elected time
-	lt time.Time
-
-	// to check if msg receiver is removed from cluster
-	isIDRemoved func(id uint64) bool
-
-	raft.Node
+	raftNodeConfig
 
 	// a chan to send/receive snapshot
 	msgSnapC chan raftpb.Message
@@ -115,26 +108,49 @@ type raftNode struct {
 	// utility
 	ticker *time.Ticker
 	// contention detectors for raft heartbeat message
-	td          *contention.TimeoutDetector
-	heartbeat   time.Duration // for logging
-	raftStorage *raft.MemoryStorage
-	storage     Storage
-	// transport specifies the transport to send and receive msgs to members.
-	// Sending messages MUST NOT block. It is okay to drop messages, since
-	// clients should timeout and reissue their messages.
-	// If transport is nil, server will panic.
-	transport rafthttp.Transporter
+	td *contention.TimeoutDetector
 
 	stopped chan struct{}
 	done    chan struct{}
 }
 
+type raftNodeConfig struct {
+	// to check if msg receiver is removed from cluster
+	isIDRemoved func(id uint64) bool
+	raft.Node
+	raftStorage *raft.MemoryStorage
+	storage     Storage
+	heartbeat   time.Duration // for logging
+	// transport specifies the transport to send and receive msgs to members.
+	// Sending messages MUST NOT block. It is okay to drop messages, since
+	// clients should timeout and reissue their messages.
+	// If transport is nil, server will panic.
+	transport rafthttp.Transporter
+}
+
+func newRaftNode(cfg raftNodeConfig) *raftNode {
+	r := &raftNode{
+		raftNodeConfig: cfg,
+		// set up contention detectors for raft heartbeat message.
+		// expect to send a heartbeat within 2 heartbeat intervals.
+		td:         contention.NewTimeoutDetector(2 * cfg.heartbeat),
+		readStateC: make(chan raft.ReadState, 1),
+		msgSnapC:   make(chan raftpb.Message, maxInFlightMsgSnap),
+		applyc:     make(chan apply),
+		stopped:    make(chan struct{}),
+		done:       make(chan struct{}),
+	}
+	if r.heartbeat == 0 {
+		r.ticker = &time.Ticker{}
+	} else {
+		r.ticker = time.NewTicker(r.heartbeat)
+	}
+	return r
+}
+
 // start prepares and starts raftNode in a new goroutine. It is no longer safe
 // to modify the fields after it has been started.
 func (r *raftNode) start(rh *raftReadyHandler) {
-	r.applyc = make(chan apply)
-	r.stopped = make(chan struct{})
-	r.done = make(chan struct{})
 	internalTimeout := time.Second
 
 	go func() {
@@ -147,10 +163,8 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 				r.Tick()
 			case rd := <-r.Ready():
 				if rd.SoftState != nil {
-					if lead := atomic.LoadUint64(&r.lead); rd.SoftState.Lead != raft.None && lead != rd.SoftState.Lead {
-						r.mu.Lock()
-						r.lt = time.Now()
-						r.mu.Unlock()
+					newLeader := rd.SoftState.Lead != raft.None && atomic.LoadUint64(&r.lead) != rd.SoftState.Lead
+					if newLeader {
 						leaderChanges.Inc()
 					}
 
@@ -162,7 +176,8 @@
 					atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
 					islead = rd.RaftState == raft.StateLeader
-					rh.updateLeadership()
+					rh.updateLeadership(newLeader)
+					r.td.Reset()
 				}
 
 				if len(rd.ReadStates) != 0 {
@@ -316,12 +331,6 @@ func (r *raftNode) apply() chan apply {
 	return r.applyc
 }
 
-func (r *raftNode) leadElectedTime() time.Time {
-	r.mu.Lock()
-	defer r.mu.Unlock()
-	return r.lt
-}
-
 func (r *raftNode) stop() {
 	r.stopped <- struct{}{}
 	<-r.done
diff --git a/etcdserver/raft_test.go b/etcdserver/raft_test.go
index 945f63ce2..fa5c0174b 100644
--- a/etcdserver/raft_test.go
+++ b/etcdserver/raft_test.go
@@ -153,13 +153,13 @@ func TestCreateConfigChangeEnts(t *testing.T) {
 
 func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
 	n := newNopReadyNode()
-	srv := &EtcdServer{r: raftNode{
+	r := newRaftNode(raftNodeConfig{
 		Node:        n,
 		storage:     mockstorage.NewStorageRecorder(""),
 		raftStorage: raft.NewMemoryStorage(),
 		transport:   rafthttp.NewNopTransporter(),
-		ticker:      &time.Ticker{},
-	}}
+	})
+	srv := &EtcdServer{r: *r}
 	srv.r.start(nil)
 	n.readyc <- raft.Ready{}
 	select {
@@ -182,16 +182,16 @@ func TestConfgChangeBlocksApply(t *testing.T) {
 
 	waitApplyc := make(chan struct{})
 
-	srv := &EtcdServer{r: raftNode{
+	r := newRaftNode(raftNodeConfig{
 		Node:        n,
 		storage:     mockstorage.NewStorageRecorder(""),
 		raftStorage: raft.NewMemoryStorage(),
 		transport:   rafthttp.NewNopTransporter(),
-		ticker:      &time.Ticker{},
-	}}
+	})
+	srv := &EtcdServer{r: *r}
 
 	rh := &raftReadyHandler{
-		updateLeadership: func() {},
+		updateLeadership: func(bool) {},
 		waitForApply: func() {
 			<-waitApplyc
 		},
diff --git a/etcdserver/server.go b/etcdserver/server.go
index cd9533945..aa4a17359 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -41,7 +41,6 @@ import (
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/mvcc"
 	"github.com/coreos/etcd/mvcc/backend"
-	"github.com/coreos/etcd/pkg/contention"
 	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/coreos/etcd/pkg/idutil"
 	"github.com/coreos/etcd/pkg/pbutil"
@@ -243,6 +242,9 @@ type EtcdServer struct {
 	// on etcd server shutdown.
 	ctx    context.Context
 	cancel context.CancelFunc
+
+	leadTimeMu      sync.RWMutex
+	leadElectedTime time.Time
 }
 
 // NewServer creates a new EtcdServer from the supplied configuration. The
@@ -419,19 +421,15 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 		snapCount: cfg.SnapCount,
 		errorc:    make(chan error, 1),
 		store:     st,
-		r: raftNode{
-			isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
-			Node:        n,
-			ticker:      time.NewTicker(heartbeat),
-			// set up contention detectors for raft heartbeat message.
-			// expect to send a heartbeat within 2 heartbeat intervals.
-			td:          contention.NewTimeoutDetector(2 * heartbeat),
-			heartbeat:   heartbeat,
-			raftStorage: s,
-			storage:     NewStorage(w, ss),
-			msgSnapC:    make(chan raftpb.Message, maxInFlightMsgSnap),
-			readStateC:  make(chan raft.ReadState, 1),
-		},
+		r: *newRaftNode(
+			raftNodeConfig{
+				isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
+				Node:        n,
+				heartbeat:   heartbeat,
+				raftStorage: s,
+				storage:     NewStorage(w, ss),
+			},
+		),
 		id:         id,
 		attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
 		cluster:    cl,
@@ -614,7 +612,7 @@ type etcdProgress struct {
 // and helps decouple state machine logic from Raft algorithms.
 // TODO: add a state machine interface to apply the commit entries and do snapshot/recover
 type raftReadyHandler struct {
-	updateLeadership     func()
+	updateLeadership     func(newLeader bool)
 	updateCommittedIndex func(uint64)
 	waitForApply         func()
 }
@@ -644,7 +642,7 @@ func (s *EtcdServer) run() {
 		return
 	}
 	rh := &raftReadyHandler{
-		updateLeadership: func() {
+		updateLeadership: func(newLeader bool) {
 			if !s.isLeader() {
 				if s.lessor != nil {
 					s.lessor.Demote()
@@ -654,6 +652,12 @@
 				}
 				setSyncC(nil)
 			} else {
+				if newLeader {
+					t := time.Now()
+					s.leadTimeMu.Lock()
+					s.leadElectedTime = t
+					s.leadTimeMu.Unlock()
+				}
 				setSyncC(s.SyncTicker.C)
 				if s.compactor != nil {
 					s.compactor.Resume()
@@ -665,9 +669,6 @@
 			if s.stats != nil {
 				s.stats.BecomeLeader()
 			}
-			if s.r.td != nil {
-				s.r.td.Reset()
-			}
 		},
 		updateCommittedIndex: func(ci uint64) {
 			cci := s.getCommittedIndex()
@@ -1580,7 +1581,9 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
 	case context.Canceled:
 		return ErrCanceled
 	case context.DeadlineExceeded:
-		curLeadElected := s.r.leadElectedTime()
+		s.leadTimeMu.RLock()
+		curLeadElected := s.leadElectedTime
+		s.leadTimeMu.RUnlock()
 		prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond)
 		if start.After(prevLeadLost) && start.Before(curLeadElected) {
 			return ErrTimeoutDueToLeaderFail
diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go
index 60aabc00e..f8bbb0dc4 100644
--- a/etcdserver/server_test.go
+++ b/etcdserver/server_test.go
@@ -167,14 +167,14 @@ func TestApplyRepeat(t *testing.T) {
 	st := store.New()
 	cl.SetStore(store.New())
 	cl.AddMember(&membership.Member{ID: 1234})
+	r := newRaftNode(raftNodeConfig{
+		Node:        n,
+		raftStorage: raft.NewMemoryStorage(),
+		storage:     mockstorage.NewStorageRecorder(""),
+		transport:   rafthttp.NewNopTransporter(),
+	})
 	s := &EtcdServer{
-		r: raftNode{
-			Node:        n,
-			raftStorage: raft.NewMemoryStorage(),
-			storage:     mockstorage.NewStorageRecorder(""),
-			transport:   rafthttp.NewNopTransporter(),
-			ticker:      &time.Ticker{},
-		},
+		r:       *r,
 		Cfg:     &ServerConfig{},
 		store:   st,
 		cluster: cl,
@@ -525,7 +525,7 @@ func TestApplyConfChangeError(t *testing.T) {
 	for i, tt := range tests {
 		n := newNodeRecorder()
 		srv := &EtcdServer{
-			r:       raftNode{Node: n},
+			r:       *newRaftNode(raftNodeConfig{Node: n}),
 			cluster: cl,
 			Cfg:     &ServerConfig{},
 		}
@@ -552,12 +552,13 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
 	for i := 1; i <= 3; i++ {
 		cl.AddMember(&membership.Member{ID: types.ID(i)})
 	}
+	r := newRaftNode(raftNodeConfig{
+		Node:      newNodeNop(),
+		transport: rafthttp.NewNopTransporter(),
+	})
 	srv := &EtcdServer{
-		id: 1,
-		r: raftNode{
-			Node:      newNodeNop(),
-			transport: rafthttp.NewNopTransporter(),
-		},
+		id:      1,
+		r:       *r,
 		cluster: cl,
 	}
 	cc := raftpb.ConfChange{
@@ -592,12 +593,13 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
 	for i := 1; i <= 5; i++ {
 		cl.AddMember(&membership.Member{ID: types.ID(i)})
 	}
+	r := newRaftNode(raftNodeConfig{
+		Node:      newNodeNop(),
+		transport: rafthttp.NewNopTransporter(),
+	})
 	srv := &EtcdServer{
-		id: 2,
-		r: raftNode{
-			Node:      newNodeNop(),
-			transport: rafthttp.NewNopTransporter(),
-		},
+		id:      2,
+		r:       *r,
 		cluster: cl,
 		w:       wait.New(),
 	}
@@ -630,15 +632,15 @@ func TestDoProposal(t *testing.T) {
 	}
 	for i, tt := range tests {
 		st := mockstore.NewRecorder()
+		r := newRaftNode(raftNodeConfig{
+			Node:        newNodeCommitter(),
+			storage:     mockstorage.NewStorageRecorder(""),
+			raftStorage: raft.NewMemoryStorage(),
+			transport:   rafthttp.NewNopTransporter(),
+		})
 		srv := &EtcdServer{
-			Cfg: &ServerConfig{TickMs: 1},
-			r: raftNode{
-				Node:        newNodeCommitter(),
-				storage:     mockstorage.NewStorageRecorder(""),
-				raftStorage: raft.NewMemoryStorage(),
-				transport:   rafthttp.NewNopTransporter(),
-				ticker:      &time.Ticker{},
-			},
+			Cfg:        &ServerConfig{TickMs: 1},
+			r:          *r,
 			store:      st,
 			reqIDGen:   idutil.NewGenerator(0, time.Time{}),
 			SyncTicker: &time.Ticker{},
@@ -666,7 +668,7 @@ func TestDoProposalCancelled(t *testing.T) {
 	wt := mockwait.NewRecorder()
 	srv := &EtcdServer{
 		Cfg:      &ServerConfig{TickMs: 1},
-		r:        raftNode{Node: newNodeNop()},
+		r:        *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
 		w:        wt,
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
@@ -688,7 +690,7 @@ func TestDoProposalTimeout(t *testing.T) {
 	srv := &EtcdServer{
 		Cfg:      &ServerConfig{TickMs: 1},
-		r:        raftNode{Node: newNodeNop()},
+		r:        *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
 		w:        mockwait.NewNop(),
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
@@ -704,7 +706,7 @@ func TestDoProposalStopped(t *testing.T) {
 	srv := &EtcdServer{
 		Cfg:      &ServerConfig{TickMs: 1},
-		r:        raftNode{Node: newNodeNop()},
+		r:        *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
 		w:        mockwait.NewNop(),
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
@@ -723,7 +725,7 @@ func TestSync(t *testing.T) {
 	n := newNodeRecorder()
 	ctx, cancel := context.WithCancel(context.TODO())
 	srv := &EtcdServer{
-		r:        raftNode{Node: n},
+		r:        *newRaftNode(raftNodeConfig{Node: n}),
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 		ctx:      ctx,
 		cancel:   cancel,
@@ -766,7 +768,7 @@ func TestSyncTimeout(t *testing.T) {
 	n := newProposalBlockerRecorder()
 	ctx, cancel := context.WithCancel(context.TODO())
 	srv := &EtcdServer{
-		r:        raftNode{Node: n},
+		r:        *newRaftNode(raftNodeConfig{Node: n}),
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 		ctx:      ctx,
 		cancel:   cancel,
@@ -799,15 +801,16 @@ func TestSyncTrigger(t *testing.T) {
 	n := newReadyNode()
 	st := make(chan time.Time, 1)
 	tk := &time.Ticker{C: st}
+	r := newRaftNode(raftNodeConfig{
+		Node:        n,
+		raftStorage: raft.NewMemoryStorage(),
+		transport:   rafthttp.NewNopTransporter(),
+		storage:     mockstorage.NewStorageRecorder(""),
+	})
+
 	srv := &EtcdServer{
-		Cfg: &ServerConfig{TickMs: 1},
-		r: raftNode{
-			Node:        n,
-			raftStorage: raft.NewMemoryStorage(),
-			transport:   rafthttp.NewNopTransporter(),
-			storage:     mockstorage.NewStorageRecorder(""),
-			ticker:      &time.Ticker{},
-		},
+		Cfg:        &ServerConfig{TickMs: 1},
+		r:          *r,
 		store:      mockstore.NewNop(),
 		SyncTicker: tk,
 		reqIDGen:   idutil.NewGenerator(0, time.Time{}),
@@ -858,13 +861,14 @@ func TestSnapshot(t *testing.T) {
 	s.Append([]raftpb.Entry{{Index: 1}})
 	st := mockstore.NewRecorderStream()
 	p := mockstorage.NewStorageRecorderStream("")
+	r := newRaftNode(raftNodeConfig{
+		Node:        newNodeNop(),
+		raftStorage: s,
+		storage:     p,
+	})
 	srv := &EtcdServer{
-		Cfg: &ServerConfig{},
-		r: raftNode{
-			Node:        newNodeNop(),
-			raftStorage: s,
-			storage:     p,
-		},
+		Cfg:   &ServerConfig{},
+		r:     *r,
 		store: st,
 	}
 	srv.kv = mvcc.New(be, &lease.FakeLessor{}, &srv.consistIndex)
@@ -914,16 +918,16 @@ func TestTriggerSnap(t *testing.T) {
 	snapc := 10
 	st := mockstore.NewRecorder()
 	p := mockstorage.NewStorageRecorderStream("")
+	r := newRaftNode(raftNodeConfig{
+		Node:        newNodeCommitter(),
+		raftStorage: raft.NewMemoryStorage(),
+		storage:     p,
+		transport:   rafthttp.NewNopTransporter(),
+	})
 	srv := &EtcdServer{
-		Cfg:       &ServerConfig{TickMs: 1},
-		snapCount: uint64(snapc),
-		r: raftNode{
-			Node:        newNodeCommitter(),
-			raftStorage: raft.NewMemoryStorage(),
-			storage:     p,
-			transport:   rafthttp.NewNopTransporter(),
-			ticker:      &time.Ticker{},
-		},
+		Cfg:        &ServerConfig{TickMs: 1},
+		snapCount:  uint64(snapc),
+		r:          *r,
 		store:      st,
 		reqIDGen:   idutil.NewGenerator(0, time.Time{}),
 		SyncTicker: &time.Ticker{},
@@ -962,10 +966,6 @@ func TestTriggerSnap(t *testing.T) {
 // TestConcurrentApplyAndSnapshotV3 will send out snapshots concurrently with
 // proposals.
 func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
-	const (
-		// snapshots that may queue up at once without dropping
-		maxInFlightMsgSnap = 16
-	)
 	n := newNopReadyNode()
 	st := store.New()
 	cl := membership.NewCluster("abc")
@@ -982,19 +982,18 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
 	rs := raft.NewMemoryStorage()
 	tr, snapDoneC := rafthttp.NewSnapTransporter(testdir)
+	r := newRaftNode(raftNodeConfig{
+		isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
+		Node:        n,
+		transport:   tr,
+		storage:     mockstorage.NewStorageRecorder(testdir),
+		raftStorage: rs,
+	})
 	s := &EtcdServer{
 		Cfg: &ServerConfig{
 			DataDir: testdir,
 		},
-		r: raftNode{
-			isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
-			Node:        n,
-			transport:   tr,
-			storage:     mockstorage.NewStorageRecorder(testdir),
-			raftStorage: rs,
-			msgSnapC:    make(chan raftpb.Message, maxInFlightMsgSnap),
-			ticker:      &time.Ticker{},
-		},
+		r:          *r,
 		store:      st,
 		cluster:    cl,
 		SyncTicker: &time.Ticker{},
@@ -1069,14 +1068,14 @@ func TestAddMember(t *testing.T) {
 	cl := newTestCluster(nil)
 	st := store.New()
 	cl.SetStore(st)
+	r := newRaftNode(raftNodeConfig{
+		Node:        n,
+		raftStorage: raft.NewMemoryStorage(),
+		storage:     mockstorage.NewStorageRecorder(""),
+		transport:   rafthttp.NewNopTransporter(),
+	})
 	s := &EtcdServer{
-		r: raftNode{
-			Node:        n,
-			raftStorage: raft.NewMemoryStorage(),
-			storage:     mockstorage.NewStorageRecorder(""),
-			transport:   rafthttp.NewNopTransporter(),
-			ticker:      &time.Ticker{},
-		},
+		r:       *r,
 		Cfg:     &ServerConfig{},
 		store:   st,
 		cluster: cl,
@@ -1111,14 +1110,14 @@ func TestRemoveMember(t *testing.T) {
 	st := store.New()
 	cl.SetStore(store.New())
 	cl.AddMember(&membership.Member{ID: 1234})
+	r := newRaftNode(raftNodeConfig{
+		Node:        n,
+		raftStorage: raft.NewMemoryStorage(),
+		storage:     mockstorage.NewStorageRecorder(""),
+		transport:   rafthttp.NewNopTransporter(),
+	})
 	s := &EtcdServer{
-		r: raftNode{
-			Node:        n,
-			raftStorage: raft.NewMemoryStorage(),
-			storage:     mockstorage.NewStorageRecorder(""),
-			transport:   rafthttp.NewNopTransporter(),
-			ticker:      &time.Ticker{},
-		},
+		r:       *r,
 		Cfg:     &ServerConfig{},
 		store:   st,
 		cluster: cl,
@@ -1152,14 +1151,14 @@ func TestUpdateMember(t *testing.T) {
 	st := store.New()
 	cl.SetStore(st)
 	cl.AddMember(&membership.Member{ID: 1234})
+	r := newRaftNode(raftNodeConfig{
+		Node:        n,
+		raftStorage: raft.NewMemoryStorage(),
+		storage:     mockstorage.NewStorageRecorder(""),
+		transport:   rafthttp.NewNopTransporter(),
+	})
 	s := &EtcdServer{
-		r: raftNode{
-			Node:        n,
-			raftStorage: raft.NewMemoryStorage(),
-			storage:     mockstorage.NewStorageRecorder(""),
-			transport:   rafthttp.NewNopTransporter(),
-			ticker:      &time.Ticker{},
-		},
+		r:        *r,
 		store:    st,
 		cluster:  cl,
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
@@ -1196,7 +1195,7 @@ func TestPublish(t *testing.T) {
 		readych:    make(chan struct{}),
 		Cfg:        &ServerConfig{TickMs: 1},
 		id:         1,
-		r:          raftNode{Node: n, ticker: &time.Ticker{}},
+		r:          *newRaftNode(raftNodeConfig{Node: n}),
 		attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
 		cluster:    &membership.RaftCluster{},
 		w:          w,
@@ -1239,13 +1238,13 @@ func TestPublish(t *testing.T) {
 // TestPublishStopped tests that publish will be stopped if server is stopped.
 func TestPublishStopped(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.TODO())
+	r := newRaftNode(raftNodeConfig{
+		Node:      newNodeNop(),
+		transport: rafthttp.NewNopTransporter(),
+	})
 	srv := &EtcdServer{
-		Cfg: &ServerConfig{TickMs: 1},
-		r: raftNode{
-			Node:      newNodeNop(),
-			transport: rafthttp.NewNopTransporter(),
-			ticker:    &time.Ticker{},
-		},
+		Cfg:     &ServerConfig{TickMs: 1},
+		r:       *r,
 		cluster: &membership.RaftCluster{},
 		w:       mockwait.NewNop(),
 		done:    make(chan struct{}),
@@ -1267,7 +1266,7 @@ func TestPublishRetry(t *testing.T) {
 	n := newNodeRecorderStream()
 	srv := &EtcdServer{
 		Cfg:      &ServerConfig{TickMs: 1},
-		r:        raftNode{Node: n, ticker: &time.Ticker{}},
+		r:        *newRaftNode(raftNodeConfig{Node: n}),
 		w:        mockwait.NewNop(),
 		stopping: make(chan struct{}),
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
@@ -1308,7 +1307,7 @@ func TestUpdateVersion(t *testing.T) {
 	srv := &EtcdServer{
 		id:         1,
 		Cfg:        &ServerConfig{TickMs: 1},
-		r:          raftNode{Node: n, ticker: &time.Ticker{}},
+		r:          *newRaftNode(raftNodeConfig{Node: n}),
 		attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
 		cluster:    &membership.RaftCluster{},
 		w:          w,
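
The refactor above inverts the old construction pattern: callers no longer fill in raftNode fields by hand (and no longer need to remember the ticker, the msgSnapC/readStateC channels, or the contention detector); they pass only the long-lived dependencies through raftNodeConfig and let newRaftNode wire up the rest, with a zero heartbeat leaving the ticker disabled as the tests rely on. The sketch below is not part of the patch; it shows the resulting call pattern as it might look inside package etcdserver. The raft.Config values, the single-member peer list, and the nop transport are illustrative placeholders, and the storage field (WAL plus snapshotter) is omitted for brevity.

package etcdserver

import (
	"time"

	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/rafthttp"
)

// exampleNewRaftNode is a hypothetical helper showing how a caller would use
// the constructor introduced by this patch.
func exampleNewRaftNode() *raftNode {
	// Back the raft state machine with an in-memory store and start a
	// single-member node; the config values here are placeholders.
	storage := raft.NewMemoryStorage()
	n := raft.StartNode(&raft.Config{
		ID:              0x1,
		ElectionTick:    10,
		HeartbeatTick:   1,
		Storage:         storage,
		MaxSizePerMsg:   1024 * 1024,
		MaxInflightMsgs: 256,
	}, []raft.Peer{{ID: 0x1}})

	// Only the long-lived dependencies are passed in; newRaftNode creates
	// the ticker, contention detector, and channels on its own.
	return newRaftNode(raftNodeConfig{
		isIDRemoved: func(id uint64) bool { return false }, // placeholder membership check
		Node:        n,
		heartbeat:   100 * time.Millisecond, // 0 would leave the ticker disabled, as in the tests
		raftStorage: storage,
		transport:   rafthttp.NewNopTransporter(),
	})
}

Once constructed, the node is started with r.start(rh), and per the comment on start it is no longer safe to modify its fields afterwards.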
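The other half of the change moves the last-leader-elected timestamp out of raftNode (the old mu/lt pair) and into EtcdServer as leadElectedTime guarded by leadTimeMu, updated only when updateLeadership reports newLeader. The classification in parseProposeCtxErr is unchanged: a proposal that hits its deadline is blamed on a leader failure only if it started after the previous leader was presumed lost (two election timeouts before the current election) and before the current leader was elected. A small self-contained sketch of that window arithmetic, with made-up tick settings rather than values from ServerConfig:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Made-up settings standing in for s.Cfg.ElectionTicks and s.Cfg.TickMs.
	electionTicks := 10
	tickMs := uint(100)

	// Standing in for s.leadElectedTime, read under s.leadTimeMu.
	curLeadElected := time.Now()
	// The previous leader is presumed lost two election timeouts earlier.
	prevLeadLost := curLeadElected.Add(-2 * time.Duration(electionTicks) * time.Duration(tickMs) * time.Millisecond)

	// A proposal issued 500ms ago, i.e. while the election was possibly in flight.
	start := time.Now().Add(-500 * time.Millisecond)

	if start.After(prevLeadLost) && start.Before(curLeadElected) {
		fmt.Println("timeout attributed to leader failure (ErrTimeoutDueToLeaderFail)")
	} else {
		fmt.Println("plain request timeout")
	}
}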