From 91dc6b29a6e000375ba47181db1bfcf8ba92daad Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Sun, 27 Mar 2016 02:39:46 -0700 Subject: [PATCH] clientv3/integration: fix race when setting progress report interval --- clientv3/integration/watch_test.go | 13 ++++++++----- etcdserver/api/v3rpc/watch.go | 8 +++++--- integration/v3_watch_test.go | 9 ++++++--- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index e6081ef66..c24249344 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -18,6 +18,7 @@ import ( "fmt" "reflect" "sort" + "sync/atomic" "testing" "time" @@ -379,17 +380,19 @@ func TestWatchWithProgressNotifyNoEvent(t *testing.T) { testWatchWithProgressNot func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) { defer testutil.AfterTest(t) + // accelerate report interval so test terminates quickly + oldpi := v3rpc.ProgressReportIntervalMilliseconds + // using atomics to avoid race warnings + atomic.StoreInt32(&v3rpc.ProgressReportIntervalMilliseconds, 3*1000) + pi := 3 * time.Second + defer func() { v3rpc.ProgressReportIntervalMilliseconds = oldpi }() + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) wc := clientv3.NewWatcher(clus.RandClient()) defer wc.Close() - testInterval := 3 * time.Second - pi := v3rpc.ProgressReportInterval - v3rpc.ProgressReportInterval = testInterval - defer func() { v3rpc.ProgressReportInterval = pi }() - opts := []clientv3.OpOption{clientv3.WithProgressNotify()} if watchOnPut { opts = append(opts, clientv3.WithPrefix()) diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index f89be8fe5..383925b16 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -42,8 +42,9 @@ func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer { var ( // expose for testing purpose. External test can change this to a - // small value to finish fast. - ProgressReportInterval = 10 * time.Minute + // small value to finish fast. The type is int32 instead of time.Duration + // in order to placate the race detector by setting the value with atomic stores. + ProgressReportIntervalMilliseconds = int32(10 * 60 * 1000) // 10 minutes ) const ( @@ -160,7 +161,8 @@ func (sws *serverWatchStream) sendLoop() { // watch responses pending on a watch id creation message pending := make(map[storage.WatchID][]*pb.WatchResponse) - progressTicker := time.NewTicker(ProgressReportInterval) + interval := time.Duration(ProgressReportIntervalMilliseconds) * time.Millisecond + progressTicker := time.NewTicker(interval) defer progressTicker.Stop() for { diff --git a/integration/v3_watch_test.go b/integration/v3_watch_test.go index e48a4f151..79da449ad 100644 --- a/integration/v3_watch_test.go +++ b/integration/v3_watch_test.go @@ -20,6 +20,7 @@ import ( "reflect" "sort" "sync" + "sync/atomic" "testing" "time" @@ -922,10 +923,12 @@ func waitResponse(wc pb.Watch_WatchClient, timeout time.Duration) (bool, *pb.Wat } func TestWatchWithProgressNotify(t *testing.T) { + // accelerate report interval so test terminates quickly + oldpi := v3rpc.ProgressReportIntervalMilliseconds + // using atomics to avoid race warnings + atomic.StoreInt32(&v3rpc.ProgressReportIntervalMilliseconds, 3*1000) testInterval := 3 * time.Second - pi := v3rpc.ProgressReportInterval - v3rpc.ProgressReportInterval = testInterval - defer func() { v3rpc.ProgressReportInterval = pi }() + defer func() { v3rpc.ProgressReportIntervalMilliseconds = oldpi }() defer testutil.AfterTest(t) clus := NewClusterV3(t, &ClusterConfig{Size: 3})