diff --git a/proxy/grpcproxy/watch.go b/proxy/grpcproxy/watch.go index 42d196ca2..b7e64daf3 100644 --- a/proxy/grpcproxy/watch.go +++ b/proxy/grpcproxy/watch.go @@ -52,7 +52,7 @@ const ( func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) { wp := &watchProxy{ cw: c.Watcher, - ctx: clientv3.WithRequireLeader(c.Ctx()), + ctx: c.Ctx(), retryLimiter: rate.NewLimiter(rate.Limit(retryPerSecond), retryPerSecond), leaderc: make(chan struct{}), } @@ -63,8 +63,9 @@ func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) { // a new streams without opening any watchers won't catch // a lost leader event, so have a special watch to monitor it rev := int64((uint64(1) << 63) - 2) + lctx := clientv3.WithRequireLeader(wp.ctx) for wp.ctx.Err() == nil { - wch := wp.cw.Watch(wp.ctx, lostLeaderKey, clientv3.WithRev(rev)) + wch := wp.cw.Watch(lctx, lostLeaderKey, clientv3.WithRev(rev)) for range wch { } wp.mu.Lock() diff --git a/proxy/grpcproxy/watch_broadcast.go b/proxy/grpcproxy/watch_broadcast.go index 5529fb5a2..5e750bdb0 100644 --- a/proxy/grpcproxy/watch_broadcast.go +++ b/proxy/grpcproxy/watch_broadcast.go @@ -50,27 +50,20 @@ func newWatchBroadcast(wp *watchProxy, w *watcher, update func(*watchBroadcast)) wb.add(w) go func() { defer close(wb.donec) - // loop because leader loss will close channel - for cctx.Err() == nil { - opts := []clientv3.OpOption{ - clientv3.WithRange(w.wr.end), - clientv3.WithProgressNotify(), - clientv3.WithRev(wb.nextrev), - clientv3.WithPrevKV(), - } - // The create notification should be the first response; - // if the watch is recreated following leader loss, it - // shouldn't post a second create response to the client. - if wb.responses == 0 { - opts = append(opts, clientv3.WithCreatedNotify()) - } - wch := wp.cw.Watch(cctx, w.wr.key, opts...) - for wr := range wch { - wb.bcast(wr) - update(wb) - } - wp.retryLimiter.Wait(cctx) + opts := []clientv3.OpOption{ + clientv3.WithRange(w.wr.end), + clientv3.WithProgressNotify(), + clientv3.WithRev(wb.nextrev), + clientv3.WithPrevKV(), + clientv3.WithCreatedNotify(), + } + + wch := wp.cw.Watch(cctx, w.wr.key, opts...) + + for wr := range wch { + wb.bcast(wr) + update(wb) } }() return wb diff --git a/test b/test index 376b2fa6d..134258e57 100755 --- a/test +++ b/test @@ -134,6 +134,7 @@ function integration_e2e_pass { function grpcproxy_pass { go test -timeout 15m -v ${RACE} -tags cluster_proxy -cpu 1,2,4 $@ ${REPO_PATH}/integration + go test -timeout 15m -v ${RACE} -tags cluster_proxy -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration } function release_pass {