mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Skip leadership check if the etcd instance is active processing heartbeats
Signed-off-by: Benjamin Wang <benjamin.ahrtr@gmail.com>
This commit is contained in:
parent
b71ae9be55
commit
9dee9b7ab5
@ -79,7 +79,9 @@ type apply struct {
|
|||||||
type raftNode struct {
|
type raftNode struct {
|
||||||
lg *zap.Logger
|
lg *zap.Logger
|
||||||
|
|
||||||
tickMu *sync.Mutex
|
tickMu *sync.RWMutex
|
||||||
|
// timestamp of the latest tick
|
||||||
|
latestTickTs time.Time
|
||||||
raftNodeConfig
|
raftNodeConfig
|
||||||
|
|
||||||
// a chan to send/receive snapshot
|
// a chan to send/receive snapshot
|
||||||
@ -131,8 +133,9 @@ func newRaftNode(cfg raftNodeConfig) *raftNode {
|
|||||||
raft.SetLogger(lg)
|
raft.SetLogger(lg)
|
||||||
r := &raftNode{
|
r := &raftNode{
|
||||||
lg: cfg.lg,
|
lg: cfg.lg,
|
||||||
tickMu: new(sync.Mutex),
|
tickMu: new(sync.RWMutex),
|
||||||
raftNodeConfig: cfg,
|
raftNodeConfig: cfg,
|
||||||
|
latestTickTs: time.Now(),
|
||||||
// set up contention detectors for raft heartbeat message.
|
// set up contention detectors for raft heartbeat message.
|
||||||
// expect to send a heartbeat within 2 heartbeat intervals.
|
// expect to send a heartbeat within 2 heartbeat intervals.
|
||||||
td: contention.NewTimeoutDetector(2 * cfg.heartbeat),
|
td: contention.NewTimeoutDetector(2 * cfg.heartbeat),
|
||||||
@ -154,9 +157,16 @@ func newRaftNode(cfg raftNodeConfig) *raftNode {
|
|||||||
func (r *raftNode) tick() {
|
func (r *raftNode) tick() {
|
||||||
r.tickMu.Lock()
|
r.tickMu.Lock()
|
||||||
r.Tick()
|
r.Tick()
|
||||||
|
r.latestTickTs = time.Now()
|
||||||
r.tickMu.Unlock()
|
r.tickMu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *raftNode) getLatestTickTs() time.Time {
|
||||||
|
r.tickMu.RLock()
|
||||||
|
defer r.tickMu.RUnlock()
|
||||||
|
return r.latestTickTs
|
||||||
|
}
|
||||||
|
|
||||||
// start prepares and starts raftNode in a new goroutine. It is no longer safe
|
// start prepares and starts raftNode in a new goroutine. It is no longer safe
|
||||||
// to modify the fields after it has been started.
|
// to modify the fields after it has been started.
|
||||||
func (r *raftNode) start(rh *raftReadyHandler) {
|
func (r *raftNode) start(rh *raftReadyHandler) {
|
||||||
|
@ -1174,10 +1174,32 @@ func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// isActive checks if the etcd instance is still actively processing the
|
||||||
|
// heartbeat message (ticks). It returns false if no heartbeat has been
|
||||||
|
// received within 3 * tickMs.
|
||||||
|
func (s *EtcdServer) isActive() bool {
|
||||||
|
latestTickTs := s.r.getLatestTickTs()
|
||||||
|
threshold := 3 * time.Duration(s.Cfg.TickMs) * time.Millisecond
|
||||||
|
return latestTickTs.Add(threshold).After(time.Now())
|
||||||
|
}
|
||||||
|
|
||||||
// ensureLeadership checks whether current member is still the leader.
|
// ensureLeadership checks whether current member is still the leader.
|
||||||
func (s *EtcdServer) ensureLeadership() bool {
|
func (s *EtcdServer) ensureLeadership() bool {
|
||||||
lg := s.Logger()
|
lg := s.Logger()
|
||||||
|
|
||||||
|
if s.isActive() {
|
||||||
|
if lg != nil {
|
||||||
|
lg.Debug("The member is active, skip checking leadership",
|
||||||
|
zap.Time("latestTickTs", s.r.getLatestTickTs()),
|
||||||
|
zap.Time("now", time.Now()))
|
||||||
|
} else {
|
||||||
|
plog.Debugf("The member is active, skip checking leadership, latestTickTs: %s, now: %s",
|
||||||
|
s.r.getLatestTickTs(), time.Now())
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
|
ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
if err := s.linearizableReadNotify(ctx); err != nil {
|
if err := s.linearizableReadNotify(ctx); err != nil {
|
||||||
|
@ -30,6 +30,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
"go.etcd.io/etcd/etcdserver/api/membership"
|
"go.etcd.io/etcd/etcdserver/api/membership"
|
||||||
"go.etcd.io/etcd/etcdserver/api/rafthttp"
|
"go.etcd.io/etcd/etcdserver/api/rafthttp"
|
||||||
"go.etcd.io/etcd/etcdserver/api/snap"
|
"go.etcd.io/etcd/etcdserver/api/snap"
|
||||||
@ -1995,3 +1996,45 @@ func TestWaitAppliedIndex(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestIsActive(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
tickMs uint
|
||||||
|
durationSinceLastTick time.Duration
|
||||||
|
expectActive bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "1.5*tickMs,active",
|
||||||
|
tickMs: 100,
|
||||||
|
durationSinceLastTick: 150 * time.Millisecond,
|
||||||
|
expectActive: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "2*tickMs,active",
|
||||||
|
tickMs: 200,
|
||||||
|
durationSinceLastTick: 400 * time.Millisecond,
|
||||||
|
expectActive: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "4*tickMs,not active",
|
||||||
|
tickMs: 150,
|
||||||
|
durationSinceLastTick: 600 * time.Millisecond,
|
||||||
|
expectActive: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range cases {
|
||||||
|
s := EtcdServer{
|
||||||
|
Cfg: ServerConfig{
|
||||||
|
TickMs: tc.tickMs,
|
||||||
|
},
|
||||||
|
r: raftNode{
|
||||||
|
tickMu: new(sync.RWMutex),
|
||||||
|
latestTickTs: time.Now().Add(-tc.durationSinceLastTick),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Equal(t, tc.expectActive, s.isActive())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user