mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #12030 from tangcong/fix-grpc-proxy-hang
proxy/grpcproxy: fix grpc proxy hang when broadcast failed to cancel a watcher
This commit is contained in:
commit
8f19fecb82
@ -182,6 +182,7 @@ Note that any `etcd_debugging_*` metrics are experimental and subject to change.
|
||||
|
||||
- Fix [`panic on error`](https://github.com/etcd-io/etcd/pull/11694) for metrics handler.
|
||||
- Add [gRPC keepalive related flags](https://github.com/etcd-io/etcd/pull/11711) `grpc-keepalive-min-time`, `grpc-keepalive-interval` and `grpc-keepalive-timeout`.
|
||||
- [Fix gRPC watch proxy hang when it fails to cancel a watcher](https://github.com/etcd-io/etcd/pull/12030).
|
||||
|
||||
### Auth
|
||||
|
||||
|
@ -357,7 +357,7 @@ func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server {
|
||||
}
|
||||
|
||||
kvp, _ := grpcproxy.NewKvProxy(client)
|
||||
watchp, _ := grpcproxy.NewWatchProxy(client)
|
||||
watchp, _ := grpcproxy.NewWatchProxy(lg, client)
|
||||
if grpcProxyResolverPrefix != "" {
|
||||
grpcproxy.Register(lg, client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL)
|
||||
}
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"go.etcd.io/etcd/v3/etcdserver/api/v3rpc/rpctypes"
|
||||
pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
@ -44,9 +45,10 @@ type watchProxy struct {
|
||||
|
||||
// kv is used for permission checking
|
||||
kv clientv3.KV
|
||||
lg *zap.Logger
|
||||
}
|
||||
|
||||
func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) {
|
||||
func NewWatchProxy(lg *zap.Logger, c *clientv3.Client) (pb.WatchServer, <-chan struct{}) {
|
||||
cctx, cancel := context.WithCancel(c.Ctx())
|
||||
wp := &watchProxy{
|
||||
cw: c.Watcher,
|
||||
@ -54,6 +56,7 @@ func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) {
|
||||
leader: newLeader(c.Ctx(), c.Watcher),
|
||||
|
||||
kv: c.KV, // for permission checking
|
||||
lg: lg,
|
||||
}
|
||||
wp.ranges = newWatchRanges(wp)
|
||||
ch := make(chan struct{})
|
||||
@ -99,6 +102,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
kv: wp.kv,
|
||||
lg: wp.lg,
|
||||
}
|
||||
|
||||
var lostLeaderC <-chan struct{}
|
||||
@ -181,6 +185,7 @@ type watchProxyStream struct {
|
||||
|
||||
// kv is used for permission checking
|
||||
kv clientv3.KV
|
||||
lg *zap.Logger
|
||||
}
|
||||
|
||||
func (wps *watchProxyStream) close() {
|
||||
@ -262,8 +267,10 @@ func (wps *watchProxyStream) recvLoop() error {
|
||||
wps.watchers[w.id] = w
|
||||
wps.ranges.add(w)
|
||||
wps.mu.Unlock()
|
||||
wps.lg.Debug("create watcher", zap.String("key", w.wr.key), zap.String("end", w.wr.end), zap.Int64("watcherId", wps.nextWatcherID))
|
||||
case *pb.WatchRequest_CancelRequest:
|
||||
wps.delete(uv.CancelRequest.WatchId)
|
||||
wps.lg.Debug("cancel watcher", zap.Int64("watcherId", uv.CancelRequest.WatchId))
|
||||
default:
|
||||
panic("not implemented")
|
||||
}
|
||||
|
@ -17,9 +17,12 @@ package grpcproxy
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/v3/clientv3"
|
||||
pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// watchBroadcast broadcasts a server watcher to many client watchers.
|
||||
@ -36,15 +39,17 @@ type watchBroadcast struct {
|
||||
receivers map[*watcher]struct{}
|
||||
// responses counts the number of responses
|
||||
responses int
|
||||
lg *zap.Logger
|
||||
}
|
||||
|
||||
func newWatchBroadcast(wp *watchProxy, w *watcher, update func(*watchBroadcast)) *watchBroadcast {
|
||||
func newWatchBroadcast(lg *zap.Logger, wp *watchProxy, w *watcher, update func(*watchBroadcast)) *watchBroadcast {
|
||||
cctx, cancel := context.WithCancel(wp.ctx)
|
||||
wb := &watchBroadcast{
|
||||
cancel: cancel,
|
||||
nextrev: w.nextrev,
|
||||
receivers: make(map[*watcher]struct{}),
|
||||
donec: make(chan struct{}),
|
||||
lg: lg,
|
||||
}
|
||||
wb.add(w)
|
||||
go func() {
|
||||
@ -61,6 +66,7 @@ func newWatchBroadcast(wp *watchProxy, w *watcher, update func(*watchBroadcast))
|
||||
cctx = withClientAuthToken(cctx, w.wps.stream.Context())
|
||||
|
||||
wch := wp.cw.Watch(cctx, w.wr.key, opts...)
|
||||
wp.lg.Debug("watch", zap.String("key", w.wr.key))
|
||||
|
||||
for wr := range wch {
|
||||
wb.bcast(wr)
|
||||
@ -148,5 +154,13 @@ func (wb *watchBroadcast) stop() {
|
||||
}
|
||||
|
||||
wb.cancel()
|
||||
<-wb.donec
|
||||
|
||||
select {
|
||||
case <-wb.donec:
|
||||
// If the client fails to cancel the etcd watcher, waiting on donec without a timeout would leave watchProxyStream holding the global watchRanges mutex indefinitely,
|
||||
// which would stop the watch proxy from working.
|
||||
// See https://github.com/etcd-io/etcd/pull/12030 for more details.
|
||||
case <-time.After(time.Second):
|
||||
wb.lg.Error("failed to cancel etcd watcher")
|
||||
}
|
||||
}
|
||||
|
@ -93,7 +93,7 @@ func (wbs *watchBroadcasts) add(w *watcher) {
|
||||
}
|
||||
}
|
||||
// no fit; create a bcast
|
||||
wb := newWatchBroadcast(wbs.wp, w, wbs.update)
|
||||
wb := newWatchBroadcast(wbs.wp.lg, wbs.wp, w, wbs.update)
|
||||
wbs.watchers[w] = wb
|
||||
wbs.bcasts[wb] = struct{}{}
|
||||
}
|
||||
|
@ -123,6 +123,7 @@ func (w *watcher) post(wr *pb.WatchResponse) bool {
|
||||
case w.wps.watchCh <- wr:
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
w.wps.cancel()
|
||||
w.wps.lg.Error("failed to put a watch response on the watcher's proxy stream channel,err is timeout")
|
||||
return false
|
||||
}
|
||||
return true
|
||||
|
Loading…
x
Reference in New Issue
Block a user