diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index f5e6deda6..0bb9ebb32 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -18,7 +18,6 @@ import ( "fmt" "reflect" "sort" - "sync/atomic" "testing" "time" @@ -381,11 +380,11 @@ func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) { defer testutil.AfterTest(t) // accelerate report interval so test terminates quickly - oldpi := v3rpc.ProgressReportIntervalMilliseconds + oldpi := v3rpc.GetProgressReportInterval() // using atomics to avoid race warnings - atomic.StoreInt32(&v3rpc.ProgressReportIntervalMilliseconds, 3*1000) + v3rpc.SetProgressReportInterval(3 * time.Second) pi := 3 * time.Second - defer func() { v3rpc.ProgressReportIntervalMilliseconds = oldpi }() + defer func() { v3rpc.SetProgressReportInterval(oldpi) }() clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index d7f4ea842..2f380a58d 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -42,12 +42,25 @@ func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer { } var ( - // expose for testing purpose. External test can change this to a - // small value to finish fast. The type is int32 instead of time.Duration - // in order to placate the race detector by setting the value with atomic stores. - ProgressReportIntervalMilliseconds = int32(10 * 60 * 1000) // 10 minutes + // External test can read this with GetProgressReportInterval() + // and change this to a small value to finish fast with + // SetProgressReportInterval(). + progressReportInterval = 10 * time.Minute + progressReportIntervalMu sync.RWMutex ) +func GetProgressReportInterval() time.Duration { + progressReportIntervalMu.RLock() + defer progressReportIntervalMu.RUnlock() + return progressReportInterval +} + +func SetProgressReportInterval(newTimeout time.Duration) { + progressReportIntervalMu.Lock() + defer progressReportIntervalMu.Unlock() + progressReportInterval = newTimeout +} + const ( // We send ctrl response inside the read loop. We do not want // send to block read, but we still want ctrl response we sent to @@ -166,7 +179,7 @@ func (sws *serverWatchStream) sendLoop() { // watch responses pending on a watch id creation message pending := make(map[storage.WatchID][]*pb.WatchResponse) - interval := time.Duration(ProgressReportIntervalMilliseconds) * time.Millisecond + interval := GetProgressReportInterval() progressTicker := time.NewTicker(interval) defer progressTicker.Stop() diff --git a/integration/v3_watch_test.go b/integration/v3_watch_test.go index 083926012..50f6cab3a 100644 --- a/integration/v3_watch_test.go +++ b/integration/v3_watch_test.go @@ -20,7 +20,6 @@ import ( "reflect" "sort" "sync" - "sync/atomic" "testing" "time" @@ -924,11 +923,11 @@ func waitResponse(wc pb.Watch_WatchClient, timeout time.Duration) (bool, *pb.Wat func TestWatchWithProgressNotify(t *testing.T) { // accelerate report interval so test terminates quickly - oldpi := v3rpc.ProgressReportIntervalMilliseconds + oldpi := v3rpc.GetProgressReportInterval() // using atomics to avoid race warnings - atomic.StoreInt32(&v3rpc.ProgressReportIntervalMilliseconds, 3*1000) + v3rpc.SetProgressReportInterval(3 * time.Second) testInterval := 3 * time.Second - defer func() { v3rpc.ProgressReportIntervalMilliseconds = oldpi }() + defer func() { v3rpc.SetProgressReportInterval(oldpi) }() defer testutil.AfterTest(t) clus := NewClusterV3(t, &ClusterConfig{Size: 3})