Merge pull request #4630 from heyitsanthony/clientv3-watcher-closecancel

clientv3: return closed channel on Watch() cancel
This commit is contained in:
Hongchao Deng 2016-02-26 13:39:17 -08:00
commit 2ec138b160
2 changed files with 40 additions and 11 deletions

View File

@ -226,6 +226,26 @@ func testWatchReconnRunning(t *testing.T, wctx *watchctx) {
putAndWatch(t, wctx, "a", "b") putAndWatch(t, wctx, "a", "b")
} }
// TestWatchCancelImmediate ensures a closed channel is returned
// if the context is cancelled.
func TestWatchCancelImmediate(t *testing.T) {
runWatchTest(t, testWatchCancelImmediate)
}
func testWatchCancelImmediate(t *testing.T, wctx *watchctx) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
wch := wctx.w.Watch(ctx, "a")
select {
case wresp, ok := <-wch:
if ok {
t.Fatalf("read wch got %v; expected closed channel", wresp)
}
default:
t.Fatalf("closed watcher channel should not block")
}
}
// TestWatchCancelInit tests watcher closes correctly after no events. // TestWatchCancelInit tests watcher closes correctly after no events.
func TestWatchCancelInit(t *testing.T) { func TestWatchCancelInit(t *testing.T) {
runWatchTest(t, testWatchCancelInit) runWatchTest(t, testWatchCancelInit)

View File

@ -135,23 +135,30 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
retc := make(chan chan WatchResponse, 1) retc := make(chan chan WatchResponse, 1)
wr.retc = retc wr.retc = retc
ok := false
// submit request // submit request
select { select {
case w.reqc <- wr: case w.reqc <- wr:
ok = true
case <-wr.ctx.Done(): case <-wr.ctx.Done():
return nil
case <-w.donec: case <-w.donec:
return nil
} }
// receive channel // receive channel
select { if ok {
case ret := <-retc: select {
return ret case ret := <-retc:
case <-ctx.Done(): return ret
return nil case <-ctx.Done():
case <-w.donec: case <-w.donec:
return nil }
} }
// couldn't create channel; return closed channel
ch := make(chan WatchResponse)
close(ch)
return ch
} }
func (w *watcher) Close() error { func (w *watcher) Close() error {
@ -179,13 +186,15 @@ func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
pendingReq.retc <- ret pendingReq.retc <- ret
return return
} }
ret := make(chan WatchResponse)
if resp.WatchId == -1 { if resp.WatchId == -1 {
// failed; no channel // failed; no channel
pendingReq.retc <- nil close(ret)
pendingReq.retc <- ret
return return
} }
ret := make(chan WatchResponse)
ws := &watcherStream{ ws := &watcherStream{
initReq: *pendingReq, initReq: *pendingReq,
id: resp.WatchId, id: resp.WatchId,