diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index a72b00323..1ccbf31d3 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -1227,7 +1227,7 @@ func TestPublishStopped(t *testing.T) { // TestPublishRetry tests that publish will keep retry until success. func TestPublishRetry(t *testing.T) { - n := newNodeRecorder() + n := newNodeRecorderStream() srv := &EtcdServer{ Cfg: &ServerConfig{TickMs: 1}, r: raftNode{Node: n}, @@ -1235,15 +1235,27 @@ func TestPublishRetry(t *testing.T) { stopping: make(chan struct{}), reqIDGen: idutil.NewGenerator(0, time.Time{}), } - // TODO: use fakeClockwork - time.AfterFunc(10*time.Millisecond, func() { close(srv.stopping) }) + // expect multiple proposals from retrying + ch := make(chan struct{}) + go func() { + defer close(ch) + if action, err := n.Wait(2); err != nil { + t.Errorf("len(action) = %d, want >= 2 (%v)", len(action), err) + } + close(srv.stopping) + // drain remaing actions, if any, so publish can terminate + for { + select { + case <-ch: + return + default: + n.Action() + } + } + }() srv.publish(10 * time.Nanosecond) - - action := n.Action() - // multiple Proposes - if cnt := len(action); cnt < 2 { - t.Errorf("len(action) = %d, want >= 2", cnt) - } + ch <- struct{}{} + <-ch } func TestUpdateVersion(t *testing.T) { @@ -1350,8 +1362,9 @@ func TestGetOtherPeerURLs(t *testing.T) { type nodeRecorder struct{ testutil.Recorder } -func newNodeRecorder() *nodeRecorder { return &nodeRecorder{&testutil.RecorderBuffered{}} } -func newNodeNop() raft.Node { return newNodeRecorder() } +func newNodeRecorder() *nodeRecorder { return &nodeRecorder{&testutil.RecorderBuffered{}} } +func newNodeRecorderStream() *nodeRecorder { return &nodeRecorder{testutil.NewRecorderStream()} } +func newNodeNop() raft.Node { return newNodeRecorder() } func (n *nodeRecorder) Tick() { n.Record(testutil.Action{Name: "Tick"}) } func (n *nodeRecorder) Campaign(ctx context.Context) error {