Merge 1282b0421923bbdcc6f11153d1336ff7bc392dfd into c86c93ca2951338115159dcdd20711603044e1f1

This commit is contained in:
ximenzaoshi 2024-09-26 22:00:05 +01:00 committed by GitHub
commit bab7917c8b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 20 additions and 3 deletions

View File

@ -40,6 +40,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/logutil"
@ -422,6 +423,20 @@ func mustListenCMux(lg *zap.Logger, tlsinfo *transport.TLSInfo) cmux.CMux {
return cmux.New(l)
}
func contextPropagationUnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
if md, ok := metadata.FromIncomingContext(ctx); ok {
ctx = metadata.NewOutgoingContext(ctx, md)
}
return handler(ctx, req)
}
}
func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server {
if grpcProxyEnableOrdering {
vf := ordering.NewOrderViolationSwitchEndpointClosure(client)
@ -467,6 +482,7 @@ func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server {
}
grpcChainUnaryList := []grpc.UnaryServerInterceptor{
grpc_prometheus.UnaryServerInterceptor,
contextPropagationUnaryServerInterceptor(),
}
if grpcProxyEnableLogging {
grpcChainStreamList = append(grpcChainStreamList,

View File

@ -123,7 +123,8 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
// post to stopc => terminate server stream; can't use a waitgroup
// since all goroutines will only terminate after Watch() exits.
stopc := make(chan struct{}, 3)
stopc := make(chan struct{}, 2)
leaderc := make(chan struct{}, 1)
go func() {
defer func() { stopc <- struct{}{} }()
wps.recvLoop()
@ -134,7 +135,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
}()
// tear down watch if leader goes down or entire watch proxy is terminated
go func() {
defer func() { stopc <- struct{}{} }()
defer func() { leaderc <- struct{}{} }()
select {
case <-lostLeaderC:
case <-ctx.Done():
@ -142,7 +143,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
}
}()
<-stopc
<-leaderc
cancel()
// recv/send may only shutdown after function exits;