diff --git a/server/etcdmain/grpc_proxy.go b/server/etcdmain/grpc_proxy.go index 5946361b8..7471afe07 100644 --- a/server/etcdmain/grpc_proxy.go +++ b/server/etcdmain/grpc_proxy.go @@ -40,6 +40,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/metadata" pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/client/pkg/v3/logutil" @@ -422,6 +423,20 @@ func mustListenCMux(lg *zap.Logger, tlsinfo *transport.TLSInfo) cmux.CMux { return cmux.New(l) } +func contextPropagationUnaryServerInterceptor() grpc.UnaryServerInterceptor { + return func( + ctx context.Context, + req interface{}, + info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, + ) (interface{}, error) { + if md, ok := metadata.FromIncomingContext(ctx); ok { + ctx = metadata.NewOutgoingContext(ctx, md) + } + return handler(ctx, req) + } +} + func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server { if grpcProxyEnableOrdering { vf := ordering.NewOrderViolationSwitchEndpointClosure(client) @@ -467,6 +482,7 @@ func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server { } grpcChainUnaryList := []grpc.UnaryServerInterceptor{ grpc_prometheus.UnaryServerInterceptor, + contextPropagationUnaryServerInterceptor(), } if grpcProxyEnableLogging { grpcChainStreamList = append(grpcChainStreamList, diff --git a/server/proxy/grpcproxy/watch.go b/server/proxy/grpcproxy/watch.go index 90eb21d4a..ca1ff85f8 100644 --- a/server/proxy/grpcproxy/watch.go +++ b/server/proxy/grpcproxy/watch.go @@ -123,7 +123,8 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) { // post to stopc => terminate server stream; can't use a waitgroup // since all goroutines will only terminate after Watch() exits. - stopc := make(chan struct{}, 3) + stopc := make(chan struct{}, 2) + leaderc := make(chan struct{}, 1) go func() { defer func() { stopc <- struct{}{} }() wps.recvLoop() @@ -134,7 +135,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) { }() // tear down watch if leader goes down or entire watch proxy is terminated go func() { - defer func() { stopc <- struct{}{} }() + defer func() { leaderc <- struct{}{} }() select { case <-lostLeaderC: case <-ctx.Done(): @@ -142,7 +143,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) { } }() - <-stopc + <-leaderc cancel() // recv/send may only shutdown after function exits;