fix(grpcproxy): support grpc context propagation in grpc proxy;fix the problem that watch exits when recvLoop exits but sendLoop still running

Signed-off-by: ximenzaoshi <wanglu_bx@163.com>
This commit is contained in:
saiwl 2024-05-15 10:42:16 +08:00 committed by ximenzaoshi
parent 7b9013da46
commit 1282b04219
No known key found for this signature in database
GPG Key ID: 8FB607D82F7A671A
2 changed files with 20 additions and 3 deletions

View File

@ -40,6 +40,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/logutil"
@ -422,6 +423,20 @@ func mustListenCMux(lg *zap.Logger, tlsinfo *transport.TLSInfo) cmux.CMux {
return cmux.New(l)
}
func contextPropagationUnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
if md, ok := metadata.FromIncomingContext(ctx); ok {
ctx = metadata.NewOutgoingContext(ctx, md)
}
return handler(ctx, req)
}
}
func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server {
if grpcProxyEnableOrdering {
vf := ordering.NewOrderViolationSwitchEndpointClosure(client)
@ -467,6 +482,7 @@ func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server {
}
grpcChainUnaryList := []grpc.UnaryServerInterceptor{
grpc_prometheus.UnaryServerInterceptor,
contextPropagationUnaryServerInterceptor(),
}
if grpcProxyEnableLogging {
grpcChainStreamList = append(grpcChainStreamList,

View File

@ -123,7 +123,8 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
// post to stopc => terminate server stream; can't use a waitgroup
// since all goroutines will only terminate after Watch() exits.
stopc := make(chan struct{}, 3)
stopc := make(chan struct{}, 2)
leaderc := make(chan struct{}, 1)
go func() {
defer func() { stopc <- struct{}{} }()
wps.recvLoop()
@ -134,7 +135,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
}()
// tear down watch if leader goes down or entire watch proxy is terminated
go func() {
defer func() { stopc <- struct{}{} }()
defer func() { leaderc <- struct{}{} }()
select {
case <-lostLeaderC:
case <-ctx.Done():
@ -142,7 +143,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
}
}()
<-stopc
<-leaderc
cancel()
// recv/send may only shutdown after function exits;