clientv3: non-recursive watch

Signed-off-by: Ted Yu <yuzhihong@gmail.com>
This commit is contained in:
Ted Yu 2020-05-20 15:44:41 -07:00
parent 732df43cf8
commit f976138186

View File

@ -311,55 +311,63 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
ok := false ok := false
ctxKey := streamKeyFromCtx(ctx) ctxKey := streamKeyFromCtx(ctx)
// find or allocate appropriate grpc watch stream var closeCh chan WatchResponse
w.mu.Lock() for {
if w.streams == nil { // find or allocate appropriate grpc watch stream
// closed w.mu.Lock()
w.mu.Unlock() if w.streams == nil {
ch := make(chan WatchResponse) // closed
close(ch) w.mu.Unlock()
return ch ch := make(chan WatchResponse)
} close(ch)
wgs := w.streams[ctxKey] return ch
if wgs == nil {
wgs = w.newWatcherGrpcStream(ctx)
w.streams[ctxKey] = wgs
}
donec := wgs.donec
reqc := wgs.reqc
w.mu.Unlock()
// couldn't create channel; return closed channel
closeCh := make(chan WatchResponse, 1)
// submit request
select {
case reqc <- wr:
ok = true
case <-wr.ctx.Done():
case <-donec:
if wgs.closeErr != nil {
closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
break
} }
// retry; may have dropped stream from no ctxs wgs := w.streams[ctxKey]
return w.Watch(ctx, key, opts...) if wgs == nil {
} wgs = w.newWatcherGrpcStream(ctx)
w.streams[ctxKey] = wgs
}
donec := wgs.donec
reqc := wgs.reqc
w.mu.Unlock()
// receive channel // couldn't create channel; return closed channel
if ok { if closeCh == nil {
closeCh = make(chan WatchResponse, 1)
}
// submit request
select { select {
case ret := <-wr.retc: case reqc <- wr:
return ret ok = true
case <-ctx.Done(): case <-wr.ctx.Done():
ok = false
case <-donec: case <-donec:
ok = false
if wgs.closeErr != nil { if wgs.closeErr != nil {
closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr} closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
break break
} }
// retry; may have dropped stream from no ctxs // retry; may have dropped stream from no ctxs
return w.Watch(ctx, key, opts...) continue
} }
// receive channel
if ok {
select {
case ret := <-wr.retc:
return ret
case <-ctx.Done():
case <-donec:
if wgs.closeErr != nil {
closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
break
}
// retry; may have dropped stream from no ctxs
continue
}
}
break
} }
close(closeCh) close(closeCh)