diff --git a/proxy/grpcproxy/adapter/chan_stream.go b/proxy/grpcproxy/adapter/chan_stream.go index 82e341193..1af514b1f 100644 --- a/proxy/grpcproxy/adapter/chan_stream.go +++ b/proxy/grpcproxy/adapter/chan_stream.go @@ -18,7 +18,9 @@ import ( "context" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" ) // chanServerStream implements grpc.ServerStream with a chanStream @@ -120,7 +122,7 @@ func (s *chanStream) RecvMsg(m interface{}) error { select { case msg, ok := <-s.recvc: if !ok { - return grpc.ErrClientConnClosing + return status.Error(codes.Canceled, "the client connection is closing") } if err, ok := msg.(error); ok { return err diff --git a/proxy/grpcproxy/lease.go b/proxy/grpcproxy/lease.go index a688d429a..a6e5515ae 100644 --- a/proxy/grpcproxy/lease.go +++ b/proxy/grpcproxy/lease.go @@ -26,7 +26,9 @@ import ( pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" ) type leaseProxy struct { @@ -214,7 +216,7 @@ func (lp *leaseProxy) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error case <-lostLeaderC: return rpctypes.ErrNoLeader case <-lp.leader.disconnectNotify(): - return grpc.ErrClientConnClosing + return status.Error(codes.Canceled, "the client connection is closing") default: if err != nil { return err diff --git a/proxy/grpcproxy/watch.go b/proxy/grpcproxy/watch.go index 639bf8e2d..8b0c4c003 100644 --- a/proxy/grpcproxy/watch.go +++ b/proxy/grpcproxy/watch.go @@ -23,8 +23,9 @@ import ( "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" pb "go.etcd.io/etcd/etcdserver/etcdserverpb" - "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" ) type watchProxy struct { @@ -80,7 +81,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) { wp.mu.Unlock() select { case <-wp.leader.disconnectNotify(): - return grpc.ErrClientConnClosing + return status.Error(codes.Canceled, "the client connection is closing") default: return wp.ctx.Err() } @@ -153,7 +154,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) { case <-lostLeaderC: return rpctypes.ErrNoLeader case <-wp.leader.disconnectNotify(): - return grpc.ErrClientConnClosing + return status.Error(codes.Canceled, "the client connection is closing") default: return wps.ctx.Err() }