mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
raft: block Stop() on n.done, support idempotency
This commit is contained in:
parent
b271e88c20
commit
2cedf127d4
12
raft/node.go
12
raft/node.go
@ -210,8 +210,15 @@ func newNode() node {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (n *node) Stop() {
|
func (n *node) Stop() {
|
||||||
n.stop <- struct{}{}
|
select {
|
||||||
<-n.stop
|
case n.stop <- struct{}{}:
|
||||||
|
// Not already stopped, so trigger it
|
||||||
|
case <-n.done:
|
||||||
|
// Node has already been stopped - no need to do anything
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Block until the stop has been acknowledged by run()
|
||||||
|
<-n.done
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *node) run(r *raft) {
|
func (n *node) run(r *raft) {
|
||||||
@ -306,7 +313,6 @@ func (n *node) run(r *raft) {
|
|||||||
r.raftLog.stableTo(prevLastUnstablei)
|
r.raftLog.stableTo(prevLastUnstablei)
|
||||||
advancec = nil
|
advancec = nil
|
||||||
case <-n.stop:
|
case <-n.stop:
|
||||||
n.stop <- struct{}{}
|
|
||||||
close(n.done)
|
close(n.done)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -141,7 +141,7 @@ func TestBlockProposal(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TestNodeTick ensures that node.Tick() will increase the
|
// TestNodeTick ensures that node.Tick() will increase the
|
||||||
// elapsed of the underly raft state machine.
|
// elapsed of the underlying raft state machine.
|
||||||
func TestNodeTick(t *testing.T) {
|
func TestNodeTick(t *testing.T) {
|
||||||
n := newNode()
|
n := newNode()
|
||||||
r := newRaft(1, []uint64{1}, 10, 1)
|
r := newRaft(1, []uint64{1}, 10, 1)
|
||||||
@ -154,6 +154,40 @@ func TestNodeTick(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestNodeStop ensures that node.Stop() blocks until the node has stopped
|
||||||
|
// processing, and that it is idempotent
|
||||||
|
func TestNodeStop(t *testing.T) {
|
||||||
|
n := newNode()
|
||||||
|
r := newRaft(1, []uint64{1}, 10, 1)
|
||||||
|
donec := make(chan struct{})
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
n.run(r)
|
||||||
|
close(donec)
|
||||||
|
}()
|
||||||
|
|
||||||
|
elapsed := r.elapsed
|
||||||
|
n.Tick()
|
||||||
|
n.Stop()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-donec:
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatalf("timed out waiting for node to stop!")
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.elapsed != elapsed+1 {
|
||||||
|
t.Errorf("elapsed = %d, want %d", r.elapsed, elapsed+1)
|
||||||
|
}
|
||||||
|
// Further ticks should have no effect, the node is stopped.
|
||||||
|
n.Tick()
|
||||||
|
if r.elapsed != elapsed+1 {
|
||||||
|
t.Errorf("elapsed = %d, want %d", r.elapsed, elapsed+1)
|
||||||
|
}
|
||||||
|
// Subsequent Stops should have no effect.
|
||||||
|
n.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
func TestReadyContainUpdates(t *testing.T) {
|
func TestReadyContainUpdates(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
rd Ready
|
rd Ready
|
||||||
|
Loading…
x
Reference in New Issue
Block a user