mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: Manually backport etcdserver/raft.go tickMu fix to 3.1
This commit is contained in:
parent
908c0f4f98
commit
6f75c56c5e
@ -95,6 +95,8 @@ type raftNode struct {
|
|||||||
lead uint64
|
lead uint64
|
||||||
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
|
|
||||||
|
tickMu sync.Mutex
|
||||||
// last lead elected time
|
// last lead elected time
|
||||||
lt time.Time
|
lt time.Time
|
||||||
|
|
||||||
@ -129,6 +131,13 @@ type raftNode struct {
|
|||||||
done chan struct{}
|
done chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// tick advances the raft node by one tick while holding tickMu.
// raft.Node does not do its own locking in the raft package, so every
// Tick issued through this wrapper is serialized by tickMu.
|
||||||
|
func (r *raftNode) tick() {
|
||||||
|
r.tickMu.Lock()
|
||||||
|
r.Tick()
|
||||||
|
// NOTE(review): Unlock is not deferred, so tickMu stays held if Tick panics.
r.tickMu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
// start prepares and starts raftNode in a new goroutine. It is no longer safe
|
// start prepares and starts raftNode in a new goroutine. It is no longer safe
|
||||||
// to modify the fields after it has been started.
|
// to modify the fields after it has been started.
|
||||||
func (r *raftNode) start(rh *raftReadyHandler) {
|
func (r *raftNode) start(rh *raftReadyHandler) {
|
||||||
@ -144,7 +153,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-r.ticker:
|
case <-r.ticker:
|
||||||
r.Tick()
|
r.tick()
|
||||||
case rd := <-r.Ready():
|
case rd := <-r.Ready():
|
||||||
if rd.SoftState != nil {
|
if rd.SoftState != nil {
|
||||||
if lead := atomic.LoadUint64(&r.lead); rd.SoftState.Lead != raft.None && lead != rd.SoftState.Lead {
|
if lead := atomic.LoadUint64(&r.lead); rd.SoftState.Lead != raft.None && lead != rd.SoftState.Lead {
|
||||||
@ -327,7 +336,7 @@ func (r *raftNode) resumeSending() {
|
|||||||
// speeding up election process.
|
// speeding up election process.
|
||||||
func (r *raftNode) advanceTicks(ticks int) {
|
func (r *raftNode) advanceTicks(ticks int) {
|
||||||
for i := 0; i < ticks; i++ {
|
for i := 0; i < ticks; i++ {
|
||||||
r.Tick()
|
// tick() takes tickMu around Tick, unlike the bare r.Tick() call it replaces.
r.tick()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -506,11 +506,50 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
|||||||
return srv, nil
|
return srv, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// adjustTicks fast-forwards the raft election ticks via s.r.advanceTicks.
//
// With exactly one cluster member it advances ElectionTicks-1 ticks right
// away. Otherwise it polls s.r.transport.ActivePeers() every 50ms for up to
// rafthttp.ConnReadTimeout and, as soon as more than one active peer is
// reported, advances ElectionTicks-2 ticks. It returns without advancing if
// s.stopping fires or the polling window ends first.
func (s *EtcdServer) adjustTicks() {
|
||||||
|
clusterN := len(s.cluster.Members())
|
||||||
|
|
||||||
|
// single-node fresh start, or single-node recovering from a snapshot
|
||||||
|
if clusterN == 1 {
|
||||||
|
ticks := s.Cfg.ElectionTicks - 1
|
||||||
|
plog.Infof("%s as single-node; fast-forwarding %d ticks (election ticks %d)", s.ID(), ticks, s.Cfg.ElectionTicks)
|
||||||
|
s.r.advanceTicks(ticks)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// retry for up to "rafthttp.ConnReadTimeout" (noted here as 5 seconds)
|
||||||
|
// until peer connections are reported; otherwise one of these holds:
|
||||||
|
// 1. all connections failed, or
|
||||||
|
// 2. there are no active peers, or
|
||||||
|
// 3. this is a restarted single-node with no snapshot
|
||||||
|
// in which case do nothing, because advancing ticks would have no effect
|
||||||
|
waitTime := rafthttp.ConnReadTimeout
|
||||||
|
itv := 50 * time.Millisecond
|
||||||
|
for i := int64(0); i < int64(waitTime/itv); i++ {
|
||||||
|
// wait one polling interval, but give up as soon as the server is stopping
select {
|
||||||
|
case <-time.After(itv):
|
||||||
|
case <-s.stopping:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
peerN := s.r.transport.ActivePeers()
|
||||||
|
if peerN > 1 {
|
||||||
|
// multi-node cluster has received peer connection reports
|
||||||
|
// adjust ticks, in case leader messages are slow to arrive
|
||||||
|
ticks := s.Cfg.ElectionTicks - 2
|
||||||
|
// NOTE(review): "initialzed" in the message below is a typo for "initialized".
plog.Infof("%s initialzed peer connection; fast-forwarding %d ticks (election ticks %d) with %d active peer(s)", s.ID(), ticks, s.Cfg.ElectionTicks, peerN)
|
||||||
|
s.r.advanceTicks(ticks)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Start prepares and starts server in a new goroutine. It is no longer safe to
|
// Start prepares and starts server in a new goroutine. It is no longer safe to
|
||||||
// modify a server's fields after it has been sent to Start.
|
// modify a server's fields after it has been sent to Start.
|
||||||
// It also starts a goroutine to publish its server information.
|
// It also starts a goroutine to publish its server information.
|
||||||
func (s *EtcdServer) Start() {
|
func (s *EtcdServer) Start() {
|
||||||
s.start()
|
s.start()
|
||||||
|
s.goAttach(func() { s.adjustTicks() })
|
||||||
s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
|
s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
|
||||||
s.goAttach(s.purgeFile)
|
s.goAttach(s.purgeFile)
|
||||||
s.goAttach(func() { monitorFileDescriptor(s.stopping) })
|
s.goAttach(func() { monitorFileDescriptor(s.stopping) })
|
||||||
|
Loading…
x
Reference in New Issue
Block a user