diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 1ecbd729d..e658c7ff5 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -604,7 +604,7 @@ func TestSync(t *testing.T) { }) srv.sync(10 * time.Second) timer.Stop() - testutil.ForceGosched() + testutil.WaitSchedule() action := n.Action() if len(action) != 1 { @@ -639,7 +639,7 @@ func TestSyncTimeout(t *testing.T) { timer.Stop() // give time for goroutine in sync to cancel - testutil.ForceGosched() + testutil.WaitSchedule() w := []testutil.Action{{Name: "Propose blocked"}} if g := n.Action(); !reflect.DeepEqual(g, w) { t.Errorf("action = %v, want %v", g, w) @@ -673,7 +673,7 @@ func TestSyncTrigger(t *testing.T) { } // trigger a sync request st <- time.Time{} - testutil.ForceGosched() + testutil.WaitSchedule() action := n.Action() if len(action) != 1 { @@ -707,7 +707,7 @@ func TestSnapshot(t *testing.T) { store: st, } srv.snapshot(1, raftpb.ConfState{Nodes: []uint64{1}}) - testutil.ForceGosched() + testutil.WaitSchedule() gaction := st.Action() if len(gaction) != 2 { t.Fatalf("len(action) = %d, want 1", len(gaction)) @@ -783,7 +783,7 @@ func TestRecvSnapshot(t *testing.T) { s.start() n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}}} // make goroutines move forward to receive snapshot - testutil.ForceGosched() + testutil.WaitSchedule() s.Stop() wactions := []testutil.Action{{Name: "Recovery"}} @@ -824,7 +824,7 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) { }, } // make goroutines move forward to receive snapshot - testutil.ForceGosched() + testutil.WaitSchedule() s.Stop() actions := st.Action() diff --git a/pkg/testutil/testutil.go b/pkg/testutil/testutil.go index 33523f54d..2eac84eb2 100644 --- a/pkg/testutil/testutil.go +++ b/pkg/testutil/testutil.go @@ -16,17 +16,13 @@ package testutil import ( "net/url" - "runtime" "testing" + "time" ) -// WARNING: This is a hack. -// Remove this when we are able to block/check the status of the go-routines. -func ForceGosched() { - // possibility enough to sched up to 10 go routines. - for i := 0; i < 10000; i++ { - runtime.Gosched() - } +// TODO: improve this when we are able to know the schedule or status of target go-routine. +func WaitSchedule() { + time.Sleep(3 * time.Millisecond) } func MustNewURLs(t *testing.T, urls []string) []url.URL { diff --git a/raft/node_test.go b/raft/node_test.go index b407136b5..f637971af 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -199,7 +199,7 @@ func TestBlockProposal(t *testing.T) { errc <- n.Propose(context.TODO(), []byte("somedata")) }() - testutil.ForceGosched() + testutil.WaitSchedule() select { case err := <-errc: t.Errorf("err = %v, want blocking", err) @@ -207,7 +207,7 @@ func TestBlockProposal(t *testing.T) { } n.Campaign(context.TODO()) - testutil.ForceGosched() + testutil.WaitSchedule() select { case err := <-errc: if err != nil { diff --git a/rafthttp/pipeline_test.go b/rafthttp/pipeline_test.go index e7da528ec..c8af450da 100644 --- a/rafthttp/pipeline_test.go +++ b/rafthttp/pipeline_test.go @@ -39,7 +39,7 @@ func TestPipelineSend(t *testing.T) { p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), fs, &fakeRaft{}, nil) p.msgc <- raftpb.Message{Type: raftpb.MsgApp} - testutil.ForceGosched() + testutil.WaitSchedule() p.stop() if tr.Request() == nil { @@ -60,7 +60,7 @@ func TestPipelineExceedMaximalServing(t *testing.T) { // keep the sender busy and make the buffer full // nothing can go out as we block the sender - testutil.ForceGosched() + testutil.WaitSchedule() for i := 0; i < connPerPipeline+pipelineBufSize; i++ { select { case p.msgc <- raftpb.Message{}: @@ -68,7 +68,7 @@ func TestPipelineExceedMaximalServing(t *testing.T) { t.Errorf("failed to send out message") } // force the sender to grab data - testutil.ForceGosched() + testutil.WaitSchedule() } // try to send a data when we are sure the buffer is full @@ -80,7 +80,7 @@ func TestPipelineExceedMaximalServing(t *testing.T) { // unblock the senders and force them to send out the data tr.unblock() - testutil.ForceGosched() + testutil.WaitSchedule() // It could send new data after previous ones succeed select { @@ -99,7 +99,7 @@ func TestPipelineSendFailed(t *testing.T) { p := newPipeline(newRespRoundTripper(0, errors.New("blah")), picker, types.ID(2), types.ID(1), types.ID(1), fs, &fakeRaft{}, nil) p.msgc <- raftpb.Message{Type: raftpb.MsgApp} - testutil.ForceGosched() + testutil.WaitSchedule() p.stop() fs.Lock() diff --git a/rafthttp/stream_test.go b/rafthttp/stream_test.go index f36064752..ad5d472cd 100644 --- a/rafthttp/stream_test.go +++ b/rafthttp/stream_test.go @@ -33,7 +33,7 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) { prevwfc := wfc wfc = &fakeWriteFlushCloser{} sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc}) - testutil.ForceGosched() + testutil.WaitSchedule() // previous attached connection should be closed if prevwfc != nil && prevwfc.closed != true { t.Errorf("#%d: close of previous connection = %v, want true", i, prevwfc.closed) @@ -44,7 +44,7 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) { } sw.msgc <- raftpb.Message{} - testutil.ForceGosched() + testutil.WaitSchedule() // still working if _, ok := sw.writec(); ok != true { t.Errorf("#%d: working status = %v, want true", i, ok) @@ -73,7 +73,7 @@ func TestStreamWriterAttachBadOutgoingConn(t *testing.T) { sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc}) sw.msgc <- raftpb.Message{} - testutil.ForceGosched() + testutil.WaitSchedule() // no longer working if _, ok := sw.writec(); ok != false { t.Errorf("working = %v, want false", ok) diff --git a/rafthttp/transport_test.go b/rafthttp/transport_test.go index c56c48b81..61043c69b 100644 --- a/rafthttp/transport_test.go +++ b/rafthttp/transport_test.go @@ -137,7 +137,7 @@ func TestTransportErrorc(t *testing.T) { } tr.peers[1].Send(raftpb.Message{}) - testutil.ForceGosched() + testutil.WaitSchedule() select { case <-errorc: default: