mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
clientv3: handle non -1 watch ID on cancellation
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
This commit is contained in:
parent
33c732b97c
commit
10522f88f5
@ -367,7 +367,8 @@ func (w *watcher) closeStream(wgs *watchGrpcStream) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
|
func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
|
||||||
if resp.WatchId == -1 {
|
// check watch ID for backward compatibility (<= v3.3)
|
||||||
|
if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") {
|
||||||
// failed; no channel
|
// failed; no channel
|
||||||
close(ws.recvc)
|
close(ws.recvc)
|
||||||
return
|
return
|
||||||
@ -453,6 +454,7 @@ func (w *watchGrpcStream) run() {
|
|||||||
// Watch() requested
|
// Watch() requested
|
||||||
case wreq := <-w.reqc:
|
case wreq := <-w.reqc:
|
||||||
outc := make(chan WatchResponse, 1)
|
outc := make(chan WatchResponse, 1)
|
||||||
|
// TODO: pass custom watch ID?
|
||||||
ws := &watcherStream{
|
ws := &watcherStream{
|
||||||
initReq: *wreq,
|
initReq: *wreq,
|
||||||
id: -1,
|
id: -1,
|
||||||
@ -553,6 +555,7 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
|
|||||||
for i, ev := range pbresp.Events {
|
for i, ev := range pbresp.Events {
|
||||||
events[i] = (*Event)(ev)
|
events[i] = (*Event)(ev)
|
||||||
}
|
}
|
||||||
|
// TODO: return watch ID?
|
||||||
wr := &WatchResponse{
|
wr := &WatchResponse{
|
||||||
Header: *pbresp.Header,
|
Header: *pbresp.Header,
|
||||||
Events: events,
|
Events: events,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user