From d02b1c982f9421ad13fed8bb69908dfab1d18332 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Fri, 26 Feb 2016 12:03:38 -0800 Subject: [PATCH] clientv3: return closed channel on Watch() cancel was returning nil; difficult to use correctly Fixes #4626 --- clientv3/integration/watch_test.go | 20 +++++++++++++++++++ clientv3/watch.go | 31 +++++++++++++++++++----------- 2 files changed, 40 insertions(+), 11 deletions(-) diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index 5ca6160d8..0aac17255 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -226,6 +226,26 @@ func testWatchReconnRunning(t *testing.T, wctx *watchctx) { putAndWatch(t, wctx, "a", "b") } +// TestWatchCancelImmediate ensures a closed channel is returned +// if the context is cancelled. +func TestWatchCancelImmediate(t *testing.T) { + runWatchTest(t, testWatchCancelImmediate) +} + +func testWatchCancelImmediate(t *testing.T, wctx *watchctx) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + wch := wctx.w.Watch(ctx, "a") + select { + case wresp, ok := <-wch: + if ok { + t.Fatalf("read wch got %v; expected closed channel", wresp) + } + default: + t.Fatalf("closed watcher channel should not block") + } +} + // TestWatchCancelInit tests watcher closes correctly after no events. func TestWatchCancelInit(t *testing.T) { runWatchTest(t, testWatchCancelInit) diff --git a/clientv3/watch.go b/clientv3/watch.go index dd3dd7cc5..2a1119de5 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -135,23 +135,30 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch retc := make(chan chan WatchResponse, 1) wr.retc = retc + ok := false + // submit request select { case w.reqc <- wr: + ok = true case <-wr.ctx.Done(): - return nil case <-w.donec: - return nil } + // receive channel - select { - case ret := <-retc: - return ret - case <-ctx.Done(): - return nil - case <-w.donec: - return nil + if ok { + select { + case ret := <-retc: + return ret + case <-ctx.Done(): + case <-w.donec: + } } + + // couldn't create channel; return closed channel + ch := make(chan WatchResponse) + close(ch) + return ch } func (w *watcher) Close() error { @@ -179,13 +186,15 @@ func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) { pendingReq.retc <- ret return } + + ret := make(chan WatchResponse) if resp.WatchId == -1 { // failed; no channel - pendingReq.retc <- nil + close(ret) + pendingReq.retc <- ret return } - ret := make(chan WatchResponse) ws := &watcherStream{ initReq: *pendingReq, id: resp.WatchId,