diff --git a/clientv3/client.go b/clientv3/client.go index e5bc4cc00..dec664605 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -33,6 +33,7 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" ) var ( @@ -478,14 +479,14 @@ func isHaltErr(ctx context.Context, err error) bool { if err == nil { return false } - code := grpc.Code(err) + ev, _ := status.FromError(err) // Unavailable codes mean the system will be right back. // (e.g., can't connect, lost leader) // Treat Internal codes as if something failed, leaving the // system in an inconsistent state, but retrying could make progress. // (e.g., failed in middle of send, corrupted frame) // TODO: are permanent Internal errors possible from grpc? - return code != codes.Unavailable && code != codes.Internal + return ev.Code() != codes.Unavailable && ev.Code() != codes.Internal } func toErr(ctx context.Context, err error) error { @@ -496,7 +497,8 @@ func toErr(ctx context.Context, err error) error { if _, ok := err.(rpctypes.EtcdError); ok { return err } - code := grpc.Code(err) + ev, _ := status.FromError(err) + code := ev.Code() switch code { case codes.DeadlineExceeded: fallthrough diff --git a/clientv3/leasing/kv.go b/clientv3/leasing/kv.go index 39b38bd40..d44182b10 100644 --- a/clientv3/leasing/kv.go +++ b/clientv3/leasing/kv.go @@ -26,8 +26,8 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/mvcc/mvccpb" - "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type leasingKV struct { @@ -282,7 +282,10 @@ func (lkv *leasingKV) acquire(ctx context.Context, key string, op v3.Op) (*v3.Tx return resp, nil } // retry if transient error - if _, ok := err.(rpctypes.EtcdError); ok || grpc.Code(err) != codes.Unavailable { + if _, ok := err.(rpctypes.EtcdError); ok { + return nil, err + } + if ev, _ := status.FromError(err); ev.Code() != codes.Unavailable { return nil, err } } diff --git a/clientv3/retry.go b/clientv3/retry.go index a5d748c24..272b62b92 100644 --- a/clientv3/retry.go +++ b/clientv3/retry.go @@ -22,6 +22,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type rpcFunc func(ctx context.Context) error @@ -35,12 +36,16 @@ func isReadStopError(err error) bool { return true } // only retry if unavailable - return grpc.Code(err) != codes.Unavailable + ev, _ := status.FromError(err) + return ev.Code() != codes.Unavailable } func isWriteStopError(err error) bool { - return grpc.Code(err) != codes.Unavailable || - grpc.ErrorDesc(err) != "there is no address available" + ev, _ := status.FromError(err) + if ev.Code() != codes.Unavailable { + return true + } + return rpctypes.ErrorDesc(err) != "there is no address available" } func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc { diff --git a/integration/v3_grpc_inflight_test.go b/integration/v3_grpc_inflight_test.go index 0c9f892a3..dd0d180cc 100644 --- a/integration/v3_grpc_inflight_test.go +++ b/integration/v3_grpc_inflight_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/testutil" @@ -79,8 +80,10 @@ func TestV3KVInflightRangeRequests(t *testing.T) { go func() { defer wg.Done() _, err := kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo"), Serializable: true}, grpc.FailFast(false)) - if err != nil && grpc.ErrorDesc(err) != context.Canceled.Error() { - t.Fatalf("inflight request should be canceld with %v, got %v", context.Canceled, err) + if err != nil { + if err != nil && rpctypes.ErrorDesc(err) != context.Canceled.Error() { + t.Fatalf("inflight request should be canceld with %v, got %v", context.Canceled, err) + } } }() } diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index 345ce762e..b063124f9 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -1778,7 +1778,7 @@ func TestGRPCRequireLeader(t *testing.T) { md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) ctx := metadata.NewOutgoingContext(context.Background(), md) reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")} - if _, err := toGRPC(client).KV.Put(ctx, reqput); grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() { + if _, err := toGRPC(client).KV.Put(ctx, reqput); rpctypes.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() { t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader) } } @@ -1809,7 +1809,7 @@ func TestGRPCStreamRequireLeader(t *testing.T) { // existing stream should be rejected _, err = wStream.Recv() - if grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() { + if rpctypes.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() { t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader) } @@ -1819,7 +1819,7 @@ func TestGRPCStreamRequireLeader(t *testing.T) { t.Fatalf("wAPI.Watch error: %v", err) } _, err = wStream.Recv() - if grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() { + if rpctypes.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() { t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader) } diff --git a/integration/v3_lease_test.go b/integration/v3_lease_test.go index 39df1cabe..82f9be24f 100644 --- a/integration/v3_lease_test.go +++ b/integration/v3_lease_test.go @@ -25,7 +25,6 @@ import ( "github.com/coreos/etcd/mvcc/mvccpb" "github.com/coreos/etcd/pkg/testutil" - "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) @@ -560,7 +559,7 @@ func TestV3LeaseRequireLeader(t *testing.T) { if err == nil { t.Fatalf("got response %+v, expected error", resp) } - if grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() { + if rpctypes.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() { t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader) } }() diff --git a/proxy/grpcproxy/cluster.go b/proxy/grpcproxy/cluster.go index 2ee91ae10..6e8d3c85b 100644 --- a/proxy/grpcproxy/cluster.go +++ b/proxy/grpcproxy/cluster.go @@ -22,10 +22,10 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/naming" + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/time/rate" - "google.golang.org/grpc" gnaming "google.golang.org/grpc/naming" ) @@ -89,7 +89,7 @@ func (cp *clusterProxy) monitor(wa gnaming.Watcher) { ups, err := wa.Next() if err != nil { plog.Warningf("clusterProxy watcher error (%v)", err) - if grpc.ErrorDesc(err) == naming.ErrWatcherClosed.Error() { + if rpctypes.ErrorDesc(err) == naming.ErrWatcherClosed.Error() { return } } diff --git a/proxy/grpcproxy/leader.go b/proxy/grpcproxy/leader.go index 7dde221c4..042c949b7 100644 --- a/proxy/grpcproxy/leader.go +++ b/proxy/grpcproxy/leader.go @@ -20,6 +20,7 @@ import ( "sync" "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "golang.org/x/time/rate" "google.golang.org/grpc" @@ -68,7 +69,7 @@ func (l *leader) recvLoop() { } if cresp.Err() != nil { l.loseLeader() - if grpc.ErrorDesc(cresp.Err()) == grpc.ErrClientConnClosing.Error() { + if rpctypes.ErrorDesc(cresp.Err()) == grpc.ErrClientConnClosing.Error() { close(l.disconnc) return } diff --git a/tools/functional-tester/etcd-runner/command/lease_renewer_command.go b/tools/functional-tester/etcd-runner/command/lease_renewer_command.go index 1e95958ce..2df7c1c17 100644 --- a/tools/functional-tester/etcd-runner/command/lease_renewer_command.go +++ b/tools/functional-tester/etcd-runner/command/lease_renewer_command.go @@ -24,8 +24,8 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/spf13/cobra" - "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) var ( @@ -68,7 +68,8 @@ func runLeaseRenewerFunc(cmd *cobra.Command, args []string) { for { lk, err = c.Lease.KeepAliveOnce(ctx, l.ID) - if grpc.Code(err) == codes.NotFound { + ev, _ := status.FromError(err) + if ev.Code() == codes.NotFound { if time.Since(expire) < 0 { log.Fatalf("bad renew! exceeded: %v", time.Since(expire)) for { diff --git a/tools/functional-tester/etcd-tester/key_stresser.go b/tools/functional-tester/etcd-tester/key_stresser.go index d94a7c7bf..3b29fb199 100644 --- a/tools/functional-tester/etcd-tester/key_stresser.go +++ b/tools/functional-tester/etcd-tester/key_stresser.go @@ -106,7 +106,7 @@ func (s *keyStresser) run(ctx context.Context) { continue } - switch grpc.ErrorDesc(err) { + switch rpctypes.ErrorDesc(err) { case context.DeadlineExceeded.Error(): // This retries when request is triggered at the same time as // leader failure. When we terminate the leader, the request to