Merge pull request #11922 from tedyu/non-recursive-watch

clientv3: non-recursive Watch()
This commit is contained in:
Gyuho Lee 2020-05-20 20:36:09 -07:00 committed by GitHub
commit a4ada8cb1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -311,6 +311,8 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
ok := false
ctxKey := streamKeyFromCtx(ctx)
var closeCh chan WatchResponse
for {
// find or allocate appropriate grpc watch stream
w.mu.Lock()
if w.streams == nil {
@ -330,20 +332,24 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
w.mu.Unlock()
// couldn't create channel; return closed channel
closeCh := make(chan WatchResponse, 1)
if closeCh == nil {
closeCh = make(chan WatchResponse, 1)
}
// submit request
select {
case reqc <- wr:
ok = true
case <-wr.ctx.Done():
ok = false
case <-donec:
ok = false
if wgs.closeErr != nil {
closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
break
}
// retry; may have dropped stream from no ctxs
return w.Watch(ctx, key, opts...)
continue
}
// receive channel
@ -358,9 +364,11 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
break
}
// retry; may have dropped stream from no ctxs
return w.Watch(ctx, key, opts...)
continue
}
}
break
}
close(closeCh)
return closeCh