From 62f8ec25c0cfc45efaf2aa5a32eeed0399cdf38c Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 7 Jun 2016 17:03:48 -0700 Subject: [PATCH] clientv3: use grpc reconnection logic --- clientv3/auth.go | 29 ++-- clientv3/client.go | 230 ++++--------------------- clientv3/cluster.go | 50 ++---- clientv3/config.go | 8 - clientv3/integration/kv_test.go | 33 ++-- clientv3/integration/lease_test.go | 13 +- clientv3/integration/txn_test.go | 42 ++--- clientv3/integration/watch_test.go | 41 +++-- clientv3/kv.go | 59 ++----- clientv3/lease.go | 61 ++----- clientv3/maintenance.go | 49 ++---- clientv3/remote_client.go | 108 ------------ clientv3/txn.go | 17 +- clientv3/watch.go | 23 +-- etcdserver/api/v3rpc/rpctypes/error.go | 2 - 15 files changed, 171 insertions(+), 594 deletions(-) delete mode 100644 clientv3/remote_client.go diff --git a/clientv3/auth.go b/clientv3/auth.go index 8d013400e..a1fb49250 100644 --- a/clientv3/auth.go +++ b/clientv3/auth.go @@ -19,7 +19,6 @@ import ( "strings" "github.com/coreos/etcd/auth/authpb" - "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" "google.golang.org/grpc" @@ -109,47 +108,47 @@ func NewAuth(c *Client) Auth { func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) { resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}) - return (*AuthEnableResponse)(resp), rpctypes.Error(err) + return (*AuthEnableResponse)(resp), toErr(ctx, err) } func (auth *auth) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) { resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}) - return (*AuthDisableResponse)(resp), rpctypes.Error(err) + return (*AuthDisableResponse)(resp), toErr(ctx, err) } func (auth *auth) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) { resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}) - return (*AuthUserAddResponse)(resp), rpctypes.Error(err) + return (*AuthUserAddResponse)(resp), toErr(ctx, err) } func (auth *auth) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) { resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}) - return (*AuthUserDeleteResponse)(resp), rpctypes.Error(err) + return (*AuthUserDeleteResponse)(resp), toErr(ctx, err) } func (auth *auth) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) { resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}) - return (*AuthUserChangePasswordResponse)(resp), rpctypes.Error(err) + return (*AuthUserChangePasswordResponse)(resp), toErr(ctx, err) } func (auth *auth) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) { resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}) - return (*AuthUserGrantRoleResponse)(resp), rpctypes.Error(err) + return (*AuthUserGrantRoleResponse)(resp), toErr(ctx, err) } func (auth *auth) UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error) { resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}) - return (*AuthUserGetResponse)(resp), rpctypes.Error(err) + return (*AuthUserGetResponse)(resp), toErr(ctx, err) } func (auth *auth) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) { resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}) - return (*AuthUserRevokeRoleResponse)(resp), rpctypes.Error(err) + return (*AuthUserRevokeRoleResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) { resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}) - return (*AuthRoleAddResponse)(resp), rpctypes.Error(err) + return (*AuthRoleAddResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleGrantPermission(ctx context.Context, name string, key string, permType PermissionType) (*AuthRoleGrantPermissionResponse, error) { @@ -158,22 +157,22 @@ func (auth *auth) RoleGrantPermission(ctx context.Context, name string, key stri PermType: authpb.Permission_Type(permType), } resp, err := auth.remote.RoleGrantPermission(ctx, &pb.AuthRoleGrantPermissionRequest{Name: name, Perm: perm}) - return (*AuthRoleGrantPermissionResponse)(resp), rpctypes.Error(err) + return (*AuthRoleGrantPermissionResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error) { resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}) - return (*AuthRoleGetResponse)(resp), rpctypes.Error(err) + return (*AuthRoleGetResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleRevokePermission(ctx context.Context, role string, key string) (*AuthRoleRevokePermissionResponse, error) { resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key}) - return (*AuthRoleRevokePermissionResponse)(resp), rpctypes.Error(err) + return (*AuthRoleRevokePermissionResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) { resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}) - return (*AuthRoleDeleteResponse)(resp), rpctypes.Error(err) + return (*AuthRoleDeleteResponse)(resp), toErr(ctx, err) } func StrToPermissionType(s string) (PermissionType, error) { @@ -191,7 +190,7 @@ type authenticator struct { func (auth *authenticator) authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) { resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}) - return (*AuthenticateResponse)(resp), rpctypes.Error(err) + return (*AuthenticateResponse)(resp), toErr(ctx, err) } func (auth *authenticator) close() { diff --git a/clientv3/client.go b/clientv3/client.go index 349014d14..b1abeb503 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -23,13 +23,11 @@ import ( "net" "net/url" "strings" - "sync" "time" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "golang.org/x/net/context" - "golang.org/x/time/rate" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" @@ -37,9 +35,6 @@ import ( var ( ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints") - - // minConnRetryWait is the minimum time between reconnects to avoid flooding - minConnRetryWait = time.Second ) // Client provides and manages an etcd v3 client session. @@ -54,20 +49,10 @@ type Client struct { conn *grpc.ClientConn cfg Config creds *credentials.TransportAuthenticator - mu sync.RWMutex // protects connection selection and error list - errors []error // errors passed to retryConnection ctx context.Context cancel context.CancelFunc - // fields below are managed by connMonitor - - // reconnc accepts writes which signal the client should reconnect - reconnc chan error - // newconnc is closed on successful connect and set to a fresh channel - newconnc chan struct{} - lastConnErr error - // Username is a username for authentication Username string // Password is a password for authentication @@ -76,9 +61,6 @@ type Client struct { // New creates a new etcdv3 client from a given configuration. func New(cfg Config) (*Client, error) { - if cfg.retryDialer == nil { - cfg.retryDialer = dialEndpointList - } if len(cfg.Endpoints) == 0 { return nil, ErrNoAvailableEndpoints } @@ -101,39 +83,9 @@ func NewFromConfigFile(path string) (*Client, error) { } // Close shuts down the client's etcd connections. -func (c *Client) Close() (err error) { - c.mu.Lock() - defer c.mu.Unlock() - - // acquire the cancel - if c.cancel == nil { - // already canceled - if c.lastConnErr != c.ctx.Err() { - err = c.lastConnErr - } - return - } - cancel := c.cancel - c.cancel = nil - c.mu.Unlock() - - // close watcher and lease before terminating connection - // so they don't retry on a closed client - c.Watcher.Close() - c.Lease.Close() - - // cancel reconnection loop - cancel() - c.mu.Lock() - connc := c.newconnc - c.mu.Unlock() - // connc on cancel() is left closed - <-connc - c.mu.Lock() - if c.lastConnErr != c.ctx.Err() { - err = c.lastConnErr - } - return +func (c *Client) Close() error { + c.cancel() + return toErr(c.ctx, c.conn.Close()) } // Ctx is a context for "out of band" messages (e.g., for sending @@ -144,15 +96,6 @@ func (c *Client) Ctx() context.Context { return c.ctx } // Endpoints lists the registered endpoints for the client. func (c *Client) Endpoints() []string { return c.cfg.Endpoints } -// Errors returns all errors that have been observed since called last. -func (c *Client) Errors() (errs []error) { - c.mu.Lock() - defer c.mu.Unlock() - errs = c.errors - c.errors = nil - return errs -} - type authTokenCredential struct { token string } @@ -241,7 +184,7 @@ func WithRequireLeader(ctx context.Context) context.Context { func newClient(cfg *Config) (*Client, error) { if cfg == nil { - cfg = &Config{retryDialer: dialEndpointList} + cfg = &Config{} } var creds *credentials.TransportAuthenticator if cfg.TLS != nil { @@ -251,26 +194,23 @@ func newClient(cfg *Config) (*Client, error) { // use a temporary skeleton client to bootstrap first connection ctx, cancel := context.WithCancel(context.TODO()) - conn, err := cfg.retryDialer(&Client{cfg: *cfg, creds: creds, ctx: ctx, Username: cfg.Username, Password: cfg.Password}) - if err != nil { - return nil, err - } client := &Client{ - conn: conn, - cfg: *cfg, - creds: creds, - ctx: ctx, - cancel: cancel, - reconnc: make(chan error, 1), - newconnc: make(chan struct{}), + conn: nil, + cfg: *cfg, + creds: creds, + ctx: ctx, + cancel: cancel, } - if cfg.Username != "" && cfg.Password != "" { client.Username = cfg.Username client.Password = cfg.Password } - - go client.connMonitor() + // TODO: use grpc balancer + conn, err := client.Dial(cfg.Endpoints[0]) + if err != nil { + return nil, err + } + client.conn = conn client.Cluster = NewCluster(client) client.KV = NewKV(client) @@ -289,126 +229,30 @@ func newClient(cfg *Config) (*Client, error) { } // ActiveConnection returns the current in-use connection -func (c *Client) ActiveConnection() *grpc.ClientConn { - c.mu.RLock() - defer c.mu.RUnlock() - if c.conn == nil { - panic("trying to return nil active connection") - } - return c.conn -} - -// retryConnection establishes a new connection -func (c *Client) retryConnection(err error) { - oldconn := c.conn - - // return holding lock so old connection can be cleaned up in this defer - defer func() { - if oldconn != nil { - oldconn.Close() - if st, _ := oldconn.State(); st != grpc.Shutdown { - // wait so grpc doesn't leak sleeping goroutines - oldconn.WaitForStateChange(context.Background(), st) - } - } - c.mu.Unlock() - }() - - c.mu.Lock() - if err != nil { - c.errors = append(c.errors, err) - } - if c.cancel == nil { - // client has called Close() so don't try to dial out - return - } - c.mu.Unlock() - - nc, dialErr := c.cfg.retryDialer(c) - - c.mu.Lock() - if nc != nil { - c.conn = nc - } - if dialErr != nil { - c.errors = append(c.errors, dialErr) - } - c.lastConnErr = dialErr -} - -// connStartRetry schedules a reconnect if one is not already running -func (c *Client) connStartRetry(err error) { - c.mu.Lock() - ch := c.reconnc - defer c.mu.Unlock() - select { - case ch <- err: - default: - } -} - -// connWait waits for a reconnect to be processed -func (c *Client) connWait(ctx context.Context, err error) (*grpc.ClientConn, error) { - c.mu.RLock() - ch := c.newconnc - c.mu.RUnlock() - c.connStartRetry(err) - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-ch: - } - c.mu.RLock() - defer c.mu.RUnlock() - if c.cancel == nil { - return c.conn, rpctypes.ErrConnClosed - } - return c.conn, c.lastConnErr -} - -// connMonitor monitors the connection and handles retries -func (c *Client) connMonitor() { - var err error - - defer func() { - c.retryConnection(c.ctx.Err()) - close(c.newconnc) - }() - - limiter := rate.NewLimiter(rate.Every(minConnRetryWait), 1) - for limiter.Wait(c.ctx) == nil { - select { - case err = <-c.reconnc: - case <-c.ctx.Done(): - return - } - c.retryConnection(err) - c.mu.Lock() - close(c.newconnc) - c.newconnc = make(chan struct{}) - c.reconnc = make(chan error, 1) - c.mu.Unlock() - } -} - -// dialEndpointList attempts to connect to each endpoint in order until a -// connection is established. -func dialEndpointList(c *Client) (*grpc.ClientConn, error) { - var err error - for _, ep := range c.Endpoints() { - conn, curErr := c.Dial(ep) - if curErr != nil { - err = curErr - } else { - return conn, nil - } - } - return nil, err -} +func (c *Client) ActiveConnection() *grpc.ClientConn { return c.conn } // isHaltErr returns true if the given error and context indicate no forward // progress can be made, even after reconnecting. func isHaltErr(ctx context.Context, err error) bool { - isRPCError := strings.HasPrefix(grpc.ErrorDesc(err), "etcdserver: ") - return isRPCError || ctx.Err() != nil || err == rpctypes.ErrConnClosed + if ctx != nil && ctx.Err() != nil { + return true + } + if err == nil { + return false + } + return strings.HasPrefix(grpc.ErrorDesc(err), "etcdserver: ") || + strings.Contains(err.Error(), grpc.ErrClientConnClosing.Error()) +} + +func toErr(ctx context.Context, err error) error { + if err == nil { + return nil + } + err = rpctypes.Error(err) + if ctx.Err() != nil && strings.Contains(err.Error(), "context") { + err = ctx.Err() + } else if strings.Contains(err.Error(), grpc.ErrClientConnClosing.Error()) { + err = grpc.ErrClientConnClosing + } + return err } diff --git a/clientv3/cluster.go b/clientv3/cluster.go index 22a17843c..b981e0310 100644 --- a/clientv3/cluster.go +++ b/clientv3/cluster.go @@ -15,10 +15,8 @@ package clientv3 import ( - "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" - "google.golang.org/grpc" ) type ( @@ -44,62 +42,47 @@ type Cluster interface { } type cluster struct { - rc *remoteClient remote pb.ClusterClient } func NewCluster(c *Client) Cluster { - ret := &cluster{} - f := func(conn *grpc.ClientConn) { ret.remote = pb.NewClusterClient(conn) } - ret.rc = newRemoteClient(c, f) - return ret + return &cluster{remote: pb.NewClusterClient(c.conn)} } func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) { r := &pb.MemberAddRequest{PeerURLs: peerAddrs} - resp, err := c.getRemote().MemberAdd(ctx, r) + resp, err := c.remote.MemberAdd(ctx, r) if err == nil { return (*MemberAddResponse)(resp), nil } - if isHaltErr(ctx, err) { - return nil, rpctypes.Error(err) + return nil, toErr(ctx, err) } - - c.rc.reconnect(err) - return nil, rpctypes.Error(err) + return nil, toErr(ctx, err) } func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) { r := &pb.MemberRemoveRequest{ID: id} - resp, err := c.getRemote().MemberRemove(ctx, r) + resp, err := c.remote.MemberRemove(ctx, r) if err == nil { return (*MemberRemoveResponse)(resp), nil } - if isHaltErr(ctx, err) { - return nil, rpctypes.Error(err) + return nil, toErr(ctx, err) } - - c.rc.reconnect(err) - return nil, rpctypes.Error(err) + return nil, toErr(ctx, err) } func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) { // it is safe to retry on update. for { r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs} - resp, err := c.getRemote().MemberUpdate(ctx, r) + resp, err := c.remote.MemberUpdate(ctx, r) if err == nil { return (*MemberUpdateResponse)(resp), nil } - if isHaltErr(ctx, err) { - return nil, rpctypes.Error(err) - } - - if err = c.rc.reconnectWait(ctx, err); err != nil { - return nil, rpctypes.Error(err) + return nil, toErr(ctx, err) } } } @@ -107,23 +90,12 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) { // it is safe to retry on list. for { - resp, err := c.getRemote().MemberList(ctx, &pb.MemberListRequest{}) + resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{}) if err == nil { return (*MemberListResponse)(resp), nil } - if isHaltErr(ctx, err) { - return nil, rpctypes.Error(err) - } - - if err = c.rc.reconnectWait(ctx, err); err != nil { - return nil, rpctypes.Error(err) + return nil, toErr(ctx, err) } } } - -func (c *cluster) getRemote() pb.ClusterClient { - c.rc.mu.Lock() - defer c.rc.mu.Unlock() - return c.remote -} diff --git a/clientv3/config.go b/clientv3/config.go index 050ce1ce6..066b41ece 100644 --- a/clientv3/config.go +++ b/clientv3/config.go @@ -22,20 +22,12 @@ import ( "github.com/coreos/etcd/pkg/tlsutil" "github.com/ghodss/yaml" - "google.golang.org/grpc" ) -// EndpointDialer is a policy for choosing which endpoint to dial next -type EndpointDialer func(*Client) (*grpc.ClientConn, error) - type Config struct { // Endpoints is a list of URLs Endpoints []string - // retryDialer chooses the next endpoint to use - // keep private until the grpc rebalancer is sorted out - retryDialer EndpointDialer - // DialTimeout is the timeout for failing to establish a connection. DialTimeout time.Duration diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go index e9b320756..e5a65ab80 100644 --- a/clientv3/integration/kv_test.go +++ b/clientv3/integration/kv_test.go @@ -27,6 +27,7 @@ import ( "github.com/coreos/etcd/mvcc/mvccpb" "github.com/coreos/etcd/pkg/testutil" "golang.org/x/net/context" + "google.golang.org/grpc" ) func TestKVPutError(t *testing.T) { @@ -299,8 +300,8 @@ func TestKVGetErrConnClosed(t *testing.T) { go func() { defer close(donec) _, err := kv.Get(context.TODO(), "foo") - if err != nil && err != rpctypes.ErrConnClosed { - t.Fatalf("expected %v, got %v", rpctypes.ErrConnClosed, err) + if err != nil && err != grpc.ErrClientConnClosing { + t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err) } }() @@ -331,8 +332,8 @@ func TestKVNewAfterClose(t *testing.T) { donec := make(chan struct{}) go func() { kv := clientv3.NewKV(cli) - if _, err := kv.Get(context.TODO(), "foo"); err != rpctypes.ErrConnClosed { - t.Fatalf("expected %v, got %v", rpctypes.ErrConnClosed, err) + if _, err := kv.Get(context.TODO(), "foo"); err != grpc.ErrClientConnClosing { + t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err) } close(donec) }() @@ -579,11 +580,10 @@ func TestKVPutFailGetRetry(t *testing.T) { defer clus.Terminate(t) kv := clientv3.NewKV(clus.Client(0)) - ctx := context.TODO() - clus.Members[0].Stop(t) - <-clus.Members[0].StopNotify() + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) + defer cancel() _, err := kv.Put(ctx, "foo", "bar") if err == nil { t.Fatalf("got success on disconnected put, wanted error") @@ -592,7 +592,7 @@ func TestKVPutFailGetRetry(t *testing.T) { donec := make(chan struct{}) go func() { // Get will fail, but reconnect will trigger - gresp, gerr := kv.Get(ctx, "foo") + gresp, gerr := kv.Get(context.TODO(), "foo") if gerr != nil { t.Fatal(gerr) } @@ -642,20 +642,11 @@ func TestKVPutStoppedServerAndClose(t *testing.T) { defer clus.Terminate(t) cli := clus.Client(0) clus.Members[0].Stop(t) + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) // this Put fails and triggers an asynchronous connection retry - _, err := cli.Put(context.TODO(), "abc", "123") - if err == nil || - (!strings.Contains(err.Error(), "connection is closing") && - !strings.Contains(err.Error(), "transport is closing")) { - t.Fatal(err) - } - - // wait some so the client closes with the retry in-flight - time.Sleep(time.Second) - - // get the timeout - clus.TakeClient(0) - if err := cli.Close(); err == nil || !strings.Contains(err.Error(), "timed out") { + _, err := cli.Put(ctx, "abc", "123") + cancel() + if !strings.Contains(err.Error(), "context deadline") { t.Fatal(err) } } diff --git a/clientv3/integration/lease_test.go b/clientv3/integration/lease_test.go index 11af4a05c..6f781b0e0 100644 --- a/clientv3/integration/lease_test.go +++ b/clientv3/integration/lease_test.go @@ -23,6 +23,7 @@ import ( "github.com/coreos/etcd/integration" "github.com/coreos/etcd/pkg/testutil" "golang.org/x/net/context" + "google.golang.org/grpc" ) func TestLeaseNotFoundError(t *testing.T) { @@ -262,8 +263,8 @@ func TestLeaseGrantErrConnClosed(t *testing.T) { go func() { defer close(donec) _, err := le.Grant(context.TODO(), 5) - if err != nil && err != rpctypes.ErrConnClosed { - t.Fatalf("expected %v, got %v", rpctypes.ErrConnClosed, err) + if err != nil && err != grpc.ErrClientConnClosing { + t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err) } }() @@ -294,8 +295,8 @@ func TestLeaseGrantNewAfterClose(t *testing.T) { donec := make(chan struct{}) go func() { le := clientv3.NewLease(cli) - if _, err := le.Grant(context.TODO(), 5); err != rpctypes.ErrConnClosed { - t.Fatalf("expected %v, got %v", rpctypes.ErrConnClosed, err) + if _, err := le.Grant(context.TODO(), 5); err != grpc.ErrClientConnClosing { + t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err) } close(donec) }() @@ -327,8 +328,8 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) { donec := make(chan struct{}) go func() { - if _, err := le.Revoke(context.TODO(), leaseID); err != rpctypes.ErrConnClosed { - t.Fatalf("expected %v, got %v", rpctypes.ErrConnClosed, err) + if _, err := le.Revoke(context.TODO(), leaseID); err != grpc.ErrClientConnClosing { + t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err) } close(donec) }() diff --git a/clientv3/integration/txn_test.go b/clientv3/integration/txn_test.go index 68633e6fa..3969c8bf0 100644 --- a/clientv3/integration/txn_test.go +++ b/clientv3/integration/txn_test.go @@ -58,57 +58,45 @@ func TestTxnWriteFail(t *testing.T) { defer clus.Terminate(t) kv := clientv3.NewKV(clus.Client(0)) - ctx := context.TODO() clus.Members[0].Stop(t) - <-clus.Members[0].StopNotify() - donec := make(chan struct{}) + txnc, getc := make(chan struct{}), make(chan struct{}) go func() { + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) + defer cancel() resp, err := kv.Txn(ctx).Then(clientv3.OpPut("foo", "bar")).Commit() if err == nil { t.Fatalf("expected error, got response %v", resp) } - donec <- struct{}{} + close(txnc) }() - dialTimeout := 5 * time.Second - select { - case <-time.After(dialTimeout + time.Second): - t.Fatalf("timed out waiting for txn to fail") - case <-donec: - // don't restart cluster until txn errors out - } - go func() { - // reconnect so terminate doesn't complain about double-close - clus.Members[0].Restart(t) - // wait for etcdserver to get established (CI races and get req times out) - time.Sleep(2 * time.Second) - donec <- struct{}{} - + select { + case <-time.After(5 * time.Second): + t.Fatalf("timed out waiting for txn fail") + case <-txnc: + } // and ensure the put didn't take - gresp, gerr := kv.Get(ctx, "foo") + gresp, gerr := clus.Client(1).Get(context.TODO(), "foo") if gerr != nil { t.Fatal(gerr) } if len(gresp.Kvs) != 0 { t.Fatalf("expected no keys, got %v", gresp.Kvs) } - donec <- struct{}{} + close(getc) }() select { case <-time.After(5 * time.Second): - t.Fatalf("timed out waiting for restart") - case <-donec: + t.Fatalf("timed out waiting for get") + case <-getc: } - select { - case <-time.After(5 * time.Second): - t.Fatalf("timed out waiting for get") - case <-donec: - } + // reconnect so terminate doesn't complain about double-close + clus.Members[0].Restart(t) } func TestTxnReadRetry(t *testing.T) { diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index 30f6b7265..e16fb421e 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -16,6 +16,7 @@ package integration import ( "fmt" + "math/rand" "reflect" "sort" "testing" @@ -28,16 +29,19 @@ import ( mvccpb "github.com/coreos/etcd/mvcc/mvccpb" "github.com/coreos/etcd/pkg/testutil" "golang.org/x/net/context" + "google.golang.org/grpc" ) type watcherTest func(*testing.T, *watchctx) type watchctx struct { - clus *integration.ClusterV3 - w clientv3.Watcher - wclient *clientv3.Client - kv clientv3.KV - ch clientv3.WatchChan + clus *integration.ClusterV3 + w clientv3.Watcher + wclient *clientv3.Client + kv clientv3.KV + wclientMember int + kvMember int + ch clientv3.WatchChan } func runWatchTest(t *testing.T, f watcherTest) { @@ -46,18 +50,20 @@ func runWatchTest(t *testing.T, f watcherTest) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) - wclient := clus.RandClient() + wclientMember := rand.Intn(3) + wclient := clus.Client(wclientMember) w := clientv3.NewWatcher(wclient) defer w.Close() // select a different client from wclient so puts succeed if // a test knocks out the watcher client - kvclient := clus.RandClient() - for kvclient == wclient { - kvclient = clus.RandClient() + kvMember := rand.Intn(3) + for kvMember == wclientMember { + kvMember = rand.Intn(3) } + kvclient := clus.Client(kvMember) kv := clientv3.NewKV(kvclient) - wctx := &watchctx{clus, w, wclient, kv, nil} + wctx := &watchctx{clus, w, wclient, kv, wclientMember, kvMember, nil} f(t, wctx) } @@ -185,7 +191,7 @@ func testWatchReconnRequest(t *testing.T, wctx *watchctx) { defer close(donec) // take down watcher connection for { - wctx.wclient.ActiveConnection().Close() + wctx.clus.Members[wctx.wclientMember].DropConnections() select { case <-timer: // spinning on close may live lock reconnection @@ -219,8 +225,7 @@ func testWatchReconnInit(t *testing.T, wctx *watchctx) { if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil { t.Fatalf("expected non-nil channel") } - // take down watcher connection - wctx.wclient.ActiveConnection().Close() + wctx.clus.Members[wctx.wclientMember].DropConnections() // watcher should recover putAndWatch(t, wctx, "a", "a") } @@ -237,7 +242,7 @@ func testWatchReconnRunning(t *testing.T, wctx *watchctx) { } putAndWatch(t, wctx, "a", "a") // take down watcher connection - wctx.wclient.ActiveConnection().Close() + wctx.clus.Members[wctx.wclientMember].DropConnections() // watcher should recover putAndWatch(t, wctx, "a", "b") } @@ -572,8 +577,8 @@ func TestWatchErrConnClosed(t *testing.T) { go func() { defer close(donec) wc.Watch(context.TODO(), "foo") - if err := wc.Close(); err != nil && err != rpctypes.ErrConnClosed { - t.Fatalf("expected %v, got %v", rpctypes.ErrConnClosed, err) + if err := wc.Close(); err != nil && err != grpc.ErrClientConnClosing { + t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err) } }() @@ -605,8 +610,8 @@ func TestWatchAfterClose(t *testing.T) { go func() { wc := clientv3.NewWatcher(cli) wc.Watch(context.TODO(), "foo") - if err := wc.Close(); err != nil && err != rpctypes.ErrConnClosed { - t.Fatalf("expected %v, got %v", rpctypes.ErrConnClosed, err) + if err := wc.Close(); err != nil && err != grpc.ErrClientConnClosing { + t.Fatalf("expected %v, got %v", grpc.ErrClientConnClosing, err) } close(donec) }() diff --git a/clientv3/kv.go b/clientv3/kv.go index 41ac5113f..82d4ed159 100644 --- a/clientv3/kv.go +++ b/clientv3/kv.go @@ -15,10 +15,8 @@ package clientv3 import ( - "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" - "google.golang.org/grpc" ) type ( @@ -78,47 +76,33 @@ func (op OpResponse) Get() *GetResponse { return op.get } func (op OpResponse) Del() *DeleteResponse { return op.del } type kv struct { - rc *remoteClient remote pb.KVClient } func NewKV(c *Client) KV { - ret := &kv{} - f := func(conn *grpc.ClientConn) { ret.remote = pb.NewKVClient(conn) } - ret.rc = newRemoteClient(c, f) - return ret + return &kv{remote: pb.NewKVClient(c.conn)} } func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) { r, err := kv.Do(ctx, OpPut(key, val, opts...)) - return r.put, rpctypes.Error(err) + return r.put, toErr(ctx, err) } func (kv *kv) Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error) { r, err := kv.Do(ctx, OpGet(key, opts...)) - return r.get, rpctypes.Error(err) + return r.get, toErr(ctx, err) } func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error) { r, err := kv.Do(ctx, OpDelete(key, opts...)) - return r.del, rpctypes.Error(err) + return r.del, toErr(ctx, err) } func (kv *kv) Compact(ctx context.Context, rev int64) error { - remote, err := kv.getRemote(ctx) - if err != nil { - return rpctypes.Error(err) + if _, err := kv.remote.Compact(ctx, &pb.CompactionRequest{Revision: rev}); err != nil { + return toErr(ctx, err) } - defer kv.rc.release() - _, err = remote.Compact(ctx, &pb.CompactionRequest{Revision: rev}) - if err == nil { - return nil - } - if isHaltErr(ctx, err) { - return rpctypes.Error(err) - } - kv.rc.reconnect(err) - return rpctypes.Error(err) + return nil } func (kv *kv) Txn(ctx context.Context) Txn { @@ -135,26 +119,17 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) { return resp, nil } if isHaltErr(ctx, err) { - return resp, rpctypes.Error(err) + return resp, toErr(ctx, err) } // do not retry on modifications if op.isWrite() { - kv.rc.reconnect(err) - return resp, rpctypes.Error(err) - } - if nerr := kv.rc.reconnectWait(ctx, err); nerr != nil { - return resp, rpctypes.Error(nerr) + return resp, toErr(ctx, err) } } } func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) { - remote, err := kv.getRemote(ctx) - if err != nil { - return OpResponse{}, err - } - defer kv.rc.release() - + var err error switch op.t { // TODO: handle other ops case tRange: @@ -165,21 +140,21 @@ func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) { r.SortTarget = pb.RangeRequest_SortTarget(op.sort.Target) } - resp, err = remote.Range(ctx, r) + resp, err = kv.remote.Range(ctx, r) if err == nil { return OpResponse{get: (*GetResponse)(resp)}, nil } case tPut: var resp *pb.PutResponse r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)} - resp, err = remote.Put(ctx, r) + resp, err = kv.remote.Put(ctx, r) if err == nil { return OpResponse{put: (*PutResponse)(resp)}, nil } case tDeleteRange: var resp *pb.DeleteRangeResponse r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end} - resp, err = remote.DeleteRange(ctx, r) + resp, err = kv.remote.DeleteRange(ctx, r) if err == nil { return OpResponse{del: (*DeleteResponse)(resp)}, nil } @@ -188,11 +163,3 @@ func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) { } return OpResponse{}, err } - -// getRemote must be followed by kv.rc.release() call. -func (kv *kv) getRemote(ctx context.Context) (pb.KVClient, error) { - if err := kv.rc.acquire(ctx); err != nil { - return nil, err - } - return kv.remote, nil -} diff --git a/clientv3/lease.go b/clientv3/lease.go index 53ceba627..83763cae3 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -21,7 +21,6 @@ import ( "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" - "google.golang.org/grpc" ) type ( @@ -76,7 +75,6 @@ type lessor struct { // donec is closed when recvKeepAliveLoop stops donec chan struct{} - rc *remoteClient remote pb.LeaseClient stream pb.Lease_LeaseKeepAliveClient @@ -102,14 +100,10 @@ func NewLease(c *Client) Lease { l := &lessor{ donec: make(chan struct{}), keepAlives: make(map[LeaseID]*keepAlive), + remote: pb.NewLeaseClient(c.conn), } - f := func(conn *grpc.ClientConn) { l.remote = pb.NewLeaseClient(conn) } - l.rc = newRemoteClient(c, f) - l.stopCtx, l.stopCancel = context.WithCancel(context.Background()) - go l.recvKeepAliveLoop() - return l } @@ -120,7 +114,7 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err for { r := &pb.LeaseGrantRequest{TTL: ttl} - resp, err := l.getRemote().LeaseGrant(cctx, r) + resp, err := l.remote.LeaseGrant(cctx, r) if err == nil { gresp := &LeaseGrantResponse{ ResponseHeader: resp.GetHeader(), @@ -131,10 +125,9 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err return gresp, nil } if isHaltErr(cctx, err) { - return nil, rpctypes.Error(err) + return nil, toErr(ctx, err) } - - if nerr := l.switchRemoteAndStream(err); nerr != nil { + if nerr := l.newStream(); nerr != nil { return nil, nerr } } @@ -147,16 +140,15 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, for { r := &pb.LeaseRevokeRequest{ID: int64(id)} - resp, err := l.getRemote().LeaseRevoke(cctx, r) + resp, err := l.remote.LeaseRevoke(cctx, r) if err == nil { return (*LeaseRevokeResponse)(resp), nil } if isHaltErr(ctx, err) { - return nil, rpctypes.Error(err) + return nil, toErr(ctx, err) } - - if nerr := l.switchRemoteAndStream(err); nerr != nil { + if nerr := l.newStream(); nerr != nil { return nil, nerr } } @@ -202,11 +194,10 @@ func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive return resp, err } if isHaltErr(ctx, err) { - return nil, rpctypes.Error(err) + return nil, toErr(ctx, err) } - nerr := l.switchRemoteAndStream(err) - if nerr != nil { + if nerr := l.newStream(); nerr != nil { return nil, nerr } } @@ -254,19 +245,19 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive cctx, cancel := context.WithCancel(ctx) defer cancel() - stream, err := l.getRemote().LeaseKeepAlive(cctx) + stream, err := l.remote.LeaseKeepAlive(cctx) if err != nil { - return nil, rpctypes.Error(err) + return nil, toErr(ctx, err) } err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)}) if err != nil { - return nil, rpctypes.Error(err) + return nil, toErr(ctx, err) } resp, rerr := stream.Recv() if rerr != nil { - return nil, rpctypes.Error(rerr) + return nil, toErr(ctx, rerr) } karesp := &LeaseKeepAliveResponse{ @@ -304,7 +295,7 @@ func (l *lessor) recvKeepAliveLoop() { // resetRecv opens a new lease stream and starts sending LeaseKeepAliveRequests func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) { - if err := l.switchRemoteAndStream(nil); err != nil { + if err := l.newStream(); err != nil { return nil, err } stream := l.getKeepAliveStream() @@ -380,38 +371,18 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) { } } -func (l *lessor) getRemote() pb.LeaseClient { - l.rc.mu.Lock() - defer l.rc.mu.Unlock() - return l.remote -} - func (l *lessor) getKeepAliveStream() pb.Lease_LeaseKeepAliveClient { l.mu.Lock() defer l.mu.Unlock() return l.stream } -func (l *lessor) switchRemoteAndStream(prevErr error) error { - for { - if prevErr != nil { - err := l.rc.reconnectWait(l.stopCtx, prevErr) - if err != nil { - return rpctypes.Error(err) - } - } - if prevErr = l.newStream(); prevErr == nil { - return nil - } - } -} - func (l *lessor) newStream() error { sctx, cancel := context.WithCancel(l.stopCtx) - stream, err := l.getRemote().LeaseKeepAlive(sctx) + stream, err := l.remote.LeaseKeepAlive(sctx) if err != nil { cancel() - return rpctypes.Error(err) + return toErr(sctx, err) } l.mu.Lock() diff --git a/clientv3/maintenance.go b/clientv3/maintenance.go index 2e0a10fe3..1c9647ec1 100644 --- a/clientv3/maintenance.go +++ b/clientv3/maintenance.go @@ -17,10 +17,8 @@ package clientv3 import ( "io" - "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" - "google.golang.org/grpc" ) type ( @@ -54,17 +52,12 @@ type Maintenance interface { } type maintenance struct { - c *Client - - rc *remoteClient + c *Client remote pb.MaintenanceClient } func NewMaintenance(c *Client) Maintenance { - ret := &maintenance{c: c} - f := func(conn *grpc.ClientConn) { ret.remote = pb.NewMaintenanceClient(conn) } - ret.rc = newRemoteClient(c, f) - return ret + return &maintenance{c: c, remote: pb.NewMaintenanceClient(c.conn)} } func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) { @@ -74,15 +67,12 @@ func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) { Alarm: pb.AlarmType_NONE, // all } for { - resp, err := m.getRemote().Alarm(ctx, req) + resp, err := m.remote.Alarm(ctx, req) if err == nil { return (*AlarmResponse)(resp), nil } if isHaltErr(ctx, err) { - return nil, rpctypes.Error(err) - } - if err = m.rc.reconnectWait(ctx, err); err != nil { - return nil, err + return nil, toErr(ctx, err) } } } @@ -97,38 +87,36 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR if req.MemberID == 0 && req.Alarm == pb.AlarmType_NONE { ar, err := m.AlarmList(ctx) if err != nil { - return nil, rpctypes.Error(err) + return nil, toErr(ctx, err) } ret := AlarmResponse{} for _, am := range ar.Alarms { dresp, derr := m.AlarmDisarm(ctx, (*AlarmMember)(am)) if derr != nil { - return nil, rpctypes.Error(derr) + return nil, toErr(ctx, derr) } ret.Alarms = append(ret.Alarms, dresp.Alarms...) } return &ret, nil } - resp, err := m.getRemote().Alarm(ctx, req) + resp, err := m.remote.Alarm(ctx, req) if err == nil { return (*AlarmResponse)(resp), nil } - if !isHaltErr(ctx, err) { - m.rc.reconnect(err) - } - return nil, rpctypes.Error(err) + return nil, toErr(ctx, err) } func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error) { conn, err := m.c.Dial(endpoint) if err != nil { - return nil, rpctypes.Error(err) + return nil, toErr(ctx, err) } + defer conn.Close() remote := pb.NewMaintenanceClient(conn) resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}) if err != nil { - return nil, rpctypes.Error(err) + return nil, toErr(ctx, err) } return (*DefragmentResponse)(resp), nil } @@ -136,20 +124,21 @@ func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*Defragm func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusResponse, error) { conn, err := m.c.Dial(endpoint) if err != nil { - return nil, rpctypes.Error(err) + return nil, toErr(ctx, err) } + defer conn.Close() remote := pb.NewMaintenanceClient(conn) resp, err := remote.Status(ctx, &pb.StatusRequest{}) if err != nil { - return nil, rpctypes.Error(err) + return nil, toErr(ctx, err) } return (*StatusResponse)(resp), nil } func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) { - ss, err := m.getRemote().Snapshot(ctx, &pb.SnapshotRequest{}) + ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}) if err != nil { - return nil, rpctypes.Error(err) + return nil, toErr(ctx, err) } pr, pw := io.Pipe() @@ -172,9 +161,3 @@ func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) { }() return pr, nil } - -func (m *maintenance) getRemote() pb.MaintenanceClient { - m.rc.mu.Lock() - defer m.rc.mu.Unlock() - return m.remote -} diff --git a/clientv3/remote_client.go b/clientv3/remote_client.go deleted file mode 100644 index 3bc3484c2..000000000 --- a/clientv3/remote_client.go +++ /dev/null @@ -1,108 +0,0 @@ -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package clientv3 - -import ( - "sync" - - "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" - - "golang.org/x/net/context" - "google.golang.org/grpc" -) - -type remoteClient struct { - client *Client - conn *grpc.ClientConn - updateConn func(*grpc.ClientConn) - mu sync.Mutex -} - -func newRemoteClient(client *Client, update func(*grpc.ClientConn)) *remoteClient { - ret := &remoteClient{ - client: client, - conn: client.ActiveConnection(), - updateConn: update, - } - ret.mu.Lock() - defer ret.mu.Unlock() - ret.updateConn(ret.conn) - return ret -} - -// reconnectWait reconnects the client, returning when connection establishes/fails. -func (r *remoteClient) reconnectWait(ctx context.Context, prevErr error) error { - r.mu.Lock() - updated := r.tryUpdate() - r.mu.Unlock() - if updated { - return nil - } - conn, err := r.client.connWait(ctx, prevErr) - if err == nil { - r.mu.Lock() - r.conn = conn - r.updateConn(conn) - r.mu.Unlock() - } - return err -} - -// reconnect will reconnect the client without waiting -func (r *remoteClient) reconnect(err error) { - r.mu.Lock() - defer r.mu.Unlock() - if r.tryUpdate() { - return - } - r.client.connStartRetry(err) -} - -func (r *remoteClient) tryUpdate() bool { - activeConn := r.client.ActiveConnection() - if activeConn == nil || activeConn == r.conn { - return false - } - r.conn = activeConn - r.updateConn(activeConn) - return true -} - -// acquire gets the client read lock on an established connection or -// returns an error without holding the lock. -func (r *remoteClient) acquire(ctx context.Context) error { - for { - r.mu.Lock() - r.client.mu.RLock() - closed := r.client.cancel == nil - c := r.client.conn - lastConnErr := r.client.lastConnErr - match := r.conn == c - r.mu.Unlock() - if lastConnErr == nil && match { - // new connection already - return nil - } - r.client.mu.RUnlock() - if closed { - return rpctypes.ErrConnClosed - } - if err := r.reconnectWait(ctx, nil); err != nil { - return err - } - } -} - -func (r *remoteClient) release() { r.client.mu.RUnlock() } diff --git a/clientv3/txn.go b/clientv3/txn.go index 77e7642ca..a451e33ac 100644 --- a/clientv3/txn.go +++ b/clientv3/txn.go @@ -17,7 +17,6 @@ package clientv3 import ( "sync" - "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" ) @@ -143,27 +142,17 @@ func (txn *txn) Commit() (*TxnResponse, error) { return resp, err } if isHaltErr(txn.ctx, err) { - return nil, rpctypes.Error(err) + return nil, toErr(txn.ctx, err) } if txn.isWrite { - txn.kv.rc.reconnect(err) - return nil, rpctypes.Error(err) - } - if nerr := txn.kv.rc.reconnectWait(txn.ctx, err); nerr != nil { - return nil, nerr + return nil, toErr(txn.ctx, err) } } } func (txn *txn) commit() (*TxnResponse, error) { - rem, rerr := txn.kv.getRemote(txn.ctx) - if rerr != nil { - return nil, rerr - } - defer txn.kv.rc.release() - r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas} - resp, err := rem.Txn(txn.ctx, r) + resp, err := txn.kv.remote.Txn(txn.ctx, r) if err != nil { return nil, err } diff --git a/clientv3/watch.go b/clientv3/watch.go index f3a8d7bbb..64656578b 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -22,7 +22,6 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" mvccpb "github.com/coreos/etcd/mvcc/mvccpb" "golang.org/x/net/context" - "google.golang.org/grpc" ) const ( @@ -87,7 +86,6 @@ func (wr *WatchResponse) IsProgressNotify() bool { // watcher implements the Watcher interface type watcher struct { - rc *remoteClient remote pb.WatchClient // ctx controls internal remote.Watch requests @@ -142,6 +140,7 @@ type watcherStream struct { func NewWatcher(c *Client) Watcher { ctx, cancel := context.WithCancel(context.Background()) w := &watcher{ + remote: pb.NewWatchClient(c.conn), ctx: ctx, cancel: cancel, streams: make(map[int64]*watcherStream), @@ -152,10 +151,6 @@ func NewWatcher(c *Client) Watcher { donec: make(chan struct{}), errc: make(chan error, 1), } - - f := func(conn *grpc.ClientConn) { w.remote = pb.NewWatchClient(conn) } - w.rc = newRemoteClient(c, f) - go w.run() return w } @@ -203,7 +198,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch func (w *watcher) Close() error { close(w.stopc) <-w.donec - return v3rpc.Error(<-w.errc) + return toErr(w.ctx, <-w.errc) } func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) { @@ -500,29 +495,19 @@ func (w *watcher) resume() (ws pb.Watch_WatchClient, err error) { // openWatchClient retries opening a watchclient until retryConnection fails func (w *watcher) openWatchClient() (ws pb.Watch_WatchClient, err error) { for { - if err = w.rc.acquire(w.ctx); err != nil { - return nil, err - } - select { case <-w.stopc: if err == nil { err = context.Canceled } - w.rc.release() return nil, err default: } if ws, err = w.remote.Watch(w.ctx); ws != nil && err == nil { - w.rc.release() break - } else if isHaltErr(w.ctx, err) { - w.rc.release() - return nil, v3rpc.Error(err) } - w.rc.release() - if nerr := w.rc.reconnectWait(w.ctx, err); nerr != nil { - return nil, v3rpc.Error(nerr) + if isHaltErr(w.ctx, err) { + return nil, v3rpc.Error(err) } } return ws, nil diff --git a/etcdserver/api/v3rpc/rpctypes/error.go b/etcdserver/api/v3rpc/rpctypes/error.go index 1c53c0753..5023d5f26 100644 --- a/etcdserver/api/v3rpc/rpctypes/error.go +++ b/etcdserver/api/v3rpc/rpctypes/error.go @@ -110,8 +110,6 @@ var ( ErrNoLeader = Error(ErrGRPCNoLeader) ErrNotCapable = Error(ErrGRPCNotCapable) - - ErrConnClosed = EtcdError{code: codes.Unavailable, desc: "clientv3: connection closed"} ) // EtcdError defines gRPC server errors.