mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge fc7902a7541f9b4647f225362261b58d1bbc5ba4 into c86c93ca2951338115159dcdd20711603044e1f1
This commit is contained in:
commit
f412edb782
@ -101,6 +101,9 @@ type raftNode struct {
|
|||||||
|
|
||||||
stopped chan struct{}
|
stopped chan struct{}
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
|
|
||||||
|
// used by liveness probe to check whether the raftloop is blocked.
|
||||||
|
dummyc chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
type raftNodeConfig struct {
|
type raftNodeConfig struct {
|
||||||
@ -145,6 +148,7 @@ func newRaftNode(cfg raftNodeConfig) *raftNode {
|
|||||||
applyc: make(chan toApply),
|
applyc: make(chan toApply),
|
||||||
stopped: make(chan struct{}),
|
stopped: make(chan struct{}),
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
|
dummyc: make(chan struct{}),
|
||||||
}
|
}
|
||||||
if r.heartbeat == 0 {
|
if r.heartbeat == 0 {
|
||||||
r.ticker = &time.Ticker{}
|
r.ticker = &time.Ticker{}
|
||||||
@ -332,6 +336,8 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
|||||||
// notify etcdserver that raft has already been notified or advanced.
|
// notify etcdserver that raft has already been notified or advanced.
|
||||||
raftAdvancedC <- struct{}{}
|
raftAdvancedC <- struct{}{}
|
||||||
}
|
}
|
||||||
|
case <-r.dummyc:
|
||||||
|
r.lg.Debug("Received dummy event")
|
||||||
case <-r.stopped:
|
case <-r.stopped:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -424,6 +430,17 @@ func (r *raftNode) onStop() {
|
|||||||
close(r.done)
|
close(r.done)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *raftNode) trySendDummyEvent(timeout time.Duration) error {
|
||||||
|
select {
|
||||||
|
case r.dummyc <- struct{}{}:
|
||||||
|
case <-r.done:
|
||||||
|
case <-time.After(timeout):
|
||||||
|
return fmt.Errorf("failed to send dummy event in %s", timeout.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// for testing
|
// for testing
|
||||||
func (r *raftNode) pauseSending() {
|
func (r *raftNode) pauseSending() {
|
||||||
p := r.transport.(rafthttp.Pausable)
|
p := r.transport.(rafthttp.Pausable)
|
||||||
|
|||||||
@ -22,6 +22,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
"go.uber.org/zap/zaptest"
|
"go.uber.org/zap/zaptest"
|
||||||
|
|
||||||
"go.etcd.io/etcd/client/pkg/v3/types"
|
"go.etcd.io/etcd/client/pkg/v3/types"
|
||||||
@ -322,3 +323,74 @@ func TestStopRaftNodeMoreThanOnce(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTrySendDummyEvent(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
drainApply bool
|
||||||
|
stopped bool
|
||||||
|
expectBlocked bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "normal case",
|
||||||
|
drainApply: true,
|
||||||
|
stopped: false,
|
||||||
|
expectBlocked: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "blocked on apply",
|
||||||
|
drainApply: false,
|
||||||
|
stopped: false,
|
||||||
|
expectBlocked: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "not blocked due to stopped",
|
||||||
|
drainApply: false,
|
||||||
|
stopped: true,
|
||||||
|
expectBlocked: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
tc := tc
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
n := newNopReadyNode()
|
||||||
|
|
||||||
|
r := newRaftNode(raftNodeConfig{
|
||||||
|
lg: zaptest.NewLogger(t),
|
||||||
|
Node: n,
|
||||||
|
storage: mockstorage.NewStorageRecorder(""),
|
||||||
|
raftStorage: raft.NewMemoryStorage(),
|
||||||
|
transport: newNopTransporter(),
|
||||||
|
})
|
||||||
|
srv := &EtcdServer{lgMu: new(sync.RWMutex), lg: zaptest.NewLogger(t), r: *r}
|
||||||
|
|
||||||
|
leadershipC := make(chan struct{}, 1)
|
||||||
|
srv.r.start(&raftReadyHandler{
|
||||||
|
getLead: func() uint64 { return 0 },
|
||||||
|
updateLead: func(uint64) {},
|
||||||
|
updateLeadership: func(bool) {
|
||||||
|
leadershipC <- struct{}{}
|
||||||
|
},
|
||||||
|
})
|
||||||
|
defer srv.r.Stop()
|
||||||
|
|
||||||
|
n.readyc <- raft.Ready{
|
||||||
|
SoftState: &raft.SoftState{RaftState: raft.StateFollower},
|
||||||
|
Entries: []raftpb.Entry{{Type: raftpb.EntryConfChange}},
|
||||||
|
}
|
||||||
|
<-leadershipC // ensure the raft loop is already in progress of processing the event
|
||||||
|
|
||||||
|
if tc.drainApply {
|
||||||
|
_ = <-srv.r.applyc
|
||||||
|
}
|
||||||
|
|
||||||
|
if tc.stopped {
|
||||||
|
close(r.done)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := r.trySendDummyEvent(2 * time.Second)
|
||||||
|
assert.Equal(t, tc.expectBlocked, err != nil, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -1316,6 +1316,17 @@ func (s *EtcdServer) Stop() {
|
|||||||
s.HardStop()
|
s.HardStop()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsRaftLoopBlocked checks whether the raft loop has blocked for at least
|
||||||
|
// the duration specified by `timeout`, and it defaults to 2*ElectionTimeout,
|
||||||
|
// which is the maximum time to trigger a new leader election.
|
||||||
|
// If the returned error isn't nil, then it's blocked; otherwise not.
|
||||||
|
func (s *EtcdServer) IsRaftLoopBlocked(timeout time.Duration) error {
|
||||||
|
if timeout == 0 {
|
||||||
|
timeout = 2 * s.Cfg.ElectionTimeout()
|
||||||
|
}
|
||||||
|
return s.r.trySendDummyEvent(timeout)
|
||||||
|
}
|
||||||
|
|
||||||
// ReadyNotify returns a channel that will be closed when the server
|
// ReadyNotify returns a channel that will be closed when the server
|
||||||
// is ready to serve client requests
|
// is ready to serve client requests
|
||||||
func (s *EtcdServer) ReadyNotify() <-chan struct{} { return s.readych }
|
func (s *EtcdServer) ReadyNotify() <-chan struct{} { return s.readych }
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user