mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #9502 from jpbetz/automated-cherry-pick-of-#9415-release-3.2
Automated cherry pick of #9415
This commit is contained in:
commit
29185da0e0
@ -95,6 +95,7 @@ type raftNode struct {
|
||||
term uint64
|
||||
lead uint64
|
||||
|
||||
tickMu *sync.Mutex
|
||||
raftNodeConfig
|
||||
|
||||
// a chan to send/receive snapshot
|
||||
@ -131,6 +132,7 @@ type raftNodeConfig struct {
|
||||
|
||||
func newRaftNode(cfg raftNodeConfig) *raftNode {
|
||||
r := &raftNode{
|
||||
tickMu: new(sync.Mutex),
|
||||
raftNodeConfig: cfg,
|
||||
// set up contention detectors for raft heartbeat message.
|
||||
// expect to send a heartbeat within 2 heartbeat intervals.
|
||||
@ -149,6 +151,13 @@ func newRaftNode(cfg raftNodeConfig) *raftNode {
|
||||
return r
|
||||
}
|
||||
|
||||
// raft.Node does not have locks in Raft package
|
||||
func (r *raftNode) tick() {
|
||||
r.tickMu.Lock()
|
||||
r.Tick()
|
||||
r.tickMu.Unlock()
|
||||
}
|
||||
|
||||
// start prepares and starts raftNode in a new goroutine. It is no longer safe
|
||||
// to modify the fields after it has been started.
|
||||
func (r *raftNode) start(rh *raftReadyHandler) {
|
||||
@ -161,7 +170,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
||||
for {
|
||||
select {
|
||||
case <-r.ticker.C:
|
||||
r.Tick()
|
||||
r.tick()
|
||||
case rd := <-r.Ready():
|
||||
if rd.SoftState != nil {
|
||||
newLeader := rd.SoftState.Lead != raft.None && atomic.LoadUint64(&r.lead) != rd.SoftState.Lead
|
||||
@ -368,13 +377,13 @@ func (r *raftNode) resumeSending() {
|
||||
p.Resume()
|
||||
}
|
||||
|
||||
// advanceTicksForElection advances ticks to the node for fast election.
|
||||
// This reduces the time to wait for first leader election if bootstrapping the whole
|
||||
// cluster, while leaving at least 1 heartbeat for possible existing leader
|
||||
// to contact it.
|
||||
func advanceTicksForElection(n raft.Node, electionTicks int) {
|
||||
for i := 0; i < electionTicks-1; i++ {
|
||||
n.Tick()
|
||||
// advanceTicks advances ticks of Raft node.
|
||||
// This can be used for fast-forwarding election
|
||||
// ticks in multi data-center deployments, thus
|
||||
// speeding up election process.
|
||||
func (r *raftNode) advanceTicks(ticks int) {
|
||||
for i := 0; i < ticks; i++ {
|
||||
r.tick()
|
||||
}
|
||||
}
|
||||
|
||||
@ -415,8 +424,8 @@ func startNode(cfg *ServerConfig, cl *membership.RaftCluster, ids []types.ID) (i
|
||||
raftStatusMu.Lock()
|
||||
raftStatus = n.Status
|
||||
raftStatusMu.Unlock()
|
||||
advanceTicksForElection(n, c.ElectionTick)
|
||||
return
|
||||
|
||||
return id, n, s, w
|
||||
}
|
||||
|
||||
func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
|
||||
@ -449,7 +458,6 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membe
|
||||
raftStatusMu.Lock()
|
||||
raftStatus = n.Status
|
||||
raftStatusMu.Unlock()
|
||||
advanceTicksForElection(n, c.ElectionTick)
|
||||
return id, cl, n, s, w
|
||||
}
|
||||
|
||||
|
@ -513,11 +513,51 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
||||
return srv, nil
|
||||
}
|
||||
|
||||
// Start prepares and starts server in a new goroutine. It is no longer safe to
|
||||
// modify a server's fields after it has been sent to Start.
|
||||
// It also starts a goroutine to publish its server information.
|
||||
func (s *EtcdServer) adjustTicks() {
|
||||
clusterN := len(s.cluster.Members())
|
||||
|
||||
// single-node fresh start, or single-node recovers from snapshot
|
||||
if clusterN == 1 {
|
||||
ticks := s.Cfg.ElectionTicks - 1
|
||||
plog.Infof("%s as single-node; fast-forwarding %d ticks (election ticks %d)", s.ID(), ticks, s.Cfg.ElectionTicks)
|
||||
s.r.advanceTicks(ticks)
|
||||
return
|
||||
}
|
||||
|
||||
// retry up to "rafthttp.ConnReadTimeout", which is 5-sec
|
||||
// until peer connection reports; otherwise:
|
||||
// 1. all connections failed, or
|
||||
// 2. no active peers, or
|
||||
// 3. restarted single-node with no snapshot
|
||||
// then, do nothing, because advancing ticks would have no effect
|
||||
waitTime := rafthttp.ConnReadTimeout
|
||||
itv := 50 * time.Millisecond
|
||||
for i := int64(0); i < int64(waitTime/itv); i++ {
|
||||
select {
|
||||
case <-time.After(itv):
|
||||
case <-s.stopping:
|
||||
return
|
||||
}
|
||||
|
||||
peerN := s.r.transport.ActivePeers()
|
||||
if peerN > 1 {
|
||||
// multi-node received peer connection reports
|
||||
// adjust ticks, in case slow leader message receive
|
||||
ticks := s.Cfg.ElectionTicks - 2
|
||||
plog.Infof("%s initialzed peer connection; fast-forwarding %d ticks (election ticks %d) with %d active peer(s)", s.ID(), ticks, s.Cfg.ElectionTicks, peerN)
|
||||
s.r.advanceTicks(ticks)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Start performs any initialization of the Server necessary for it to
|
||||
// begin serving requests. It must be called before Do or Process.
|
||||
// Start must be non-blocking; any long-running server functionality
|
||||
// should be implemented in goroutines.
|
||||
func (s *EtcdServer) Start() {
|
||||
s.start()
|
||||
s.goAttach(func() { s.adjustTicks() })
|
||||
s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
|
||||
s.goAttach(s.purgeFile)
|
||||
s.goAttach(func() { monitorFileDescriptor(s.stopping) })
|
||||
|
@ -83,6 +83,7 @@ func (s *nopTransporterWithActiveTime) RemovePeer(id types.ID) {}
|
||||
func (s *nopTransporterWithActiveTime) RemoveAllPeers() {}
|
||||
func (s *nopTransporterWithActiveTime) UpdatePeer(id types.ID, us []string) {}
|
||||
func (s *nopTransporterWithActiveTime) ActiveSince(id types.ID) time.Time { return s.activeMap[id] }
|
||||
func (s *nopTransporterWithActiveTime) ActivePeers() int { return 0 }
|
||||
func (s *nopTransporterWithActiveTime) Stop() {}
|
||||
func (s *nopTransporterWithActiveTime) Pause() {}
|
||||
func (s *nopTransporterWithActiveTime) Resume() {}
|
||||
|
@ -83,6 +83,8 @@ type Transporter interface {
|
||||
// If the connection is active since peer was added, it returns the adding time.
|
||||
// If the connection is currently inactive, it returns zero time.
|
||||
ActiveSince(id types.ID) time.Time
|
||||
// ActivePeers returns the number of active peers.
|
||||
ActivePeers() int
|
||||
// Stop closes the connections and stops the transporter.
|
||||
Stop()
|
||||
}
|
||||
@ -362,6 +364,20 @@ func (t *Transport) Resume() {
|
||||
}
|
||||
}
|
||||
|
||||
// ActivePeers returns a channel that closes when an initial
|
||||
// peer connection has been established. Use this to wait until the
|
||||
// first peer connection becomes active.
|
||||
func (t *Transport) ActivePeers() (cnt int) {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
for _, p := range t.peers {
|
||||
if !p.activeSince().IsZero() {
|
||||
cnt++
|
||||
}
|
||||
}
|
||||
return cnt
|
||||
}
|
||||
|
||||
type nopTransporter struct{}
|
||||
|
||||
func NewNopTransporter() Transporter {
|
||||
@ -378,6 +394,7 @@ func (s *nopTransporter) RemovePeer(id types.ID) {}
|
||||
func (s *nopTransporter) RemoveAllPeers() {}
|
||||
func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
|
||||
func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time{} }
|
||||
func (s *nopTransporter) ActivePeers() int { return 0 }
|
||||
func (s *nopTransporter) Stop() {}
|
||||
func (s *nopTransporter) Pause() {}
|
||||
func (s *nopTransporter) Resume() {}
|
||||
|
Loading…
x
Reference in New Issue
Block a user