diff --git a/clientv3/client.go b/clientv3/client.go index 5821fd494..3a7e69a49 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -27,6 +27,7 @@ import ( "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "golang.org/x/net/context" + "golang.org/x/time/rate" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" @@ -34,6 +35,9 @@ import ( var ( ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints") + + // minConnRetryWait is the minimum time between reconnects to avoid flooding + minConnRetryWait = time.Second ) // Client provides and manages an etcd v3 client session. @@ -191,7 +195,7 @@ func newClient(cfg *Config) (*Client, error) { creds: creds, ctx: ctx, cancel: cancel, - reconnc: make(chan error), + reconnc: make(chan error, 1), newconnc: make(chan struct{}), } @@ -248,8 +252,11 @@ func (c *Client) retryConnection(err error) (newConn *grpc.ClientConn, dialErr e // connStartRetry schedules a reconnect if one is not already running func (c *Client) connStartRetry(err error) { + c.mu.Lock() + ch := c.reconnc + defer c.mu.Unlock() select { - case c.reconnc <- err: + case ch <- err: default: } } @@ -273,15 +280,20 @@ func (c *Client) connWait(ctx context.Context, err error) (*grpc.ClientConn, err // connMonitor monitors the connection and handles retries func (c *Client) connMonitor() { var err error - for { + + defer func() { + _, err = c.retryConnection(c.ctx.Err()) + c.mu.Lock() + c.lastConnErr = err + close(c.newconnc) + c.mu.Unlock() + }() + + limiter := rate.NewLimiter(rate.Every(minConnRetryWait), 1) + for limiter.Wait(c.ctx) == nil { select { case err = <-c.reconnc: case <-c.ctx.Done(): - _, err = c.retryConnection(c.ctx.Err()) - c.mu.Lock() - c.lastConnErr = err - close(c.newconnc) - c.mu.Unlock() return } conn, connErr := c.retryConnection(err) @@ -290,6 +302,7 @@ func (c *Client) connMonitor() { c.conn = conn close(c.newconnc) c.newconnc = make(chan struct{}) + c.reconnc = make(chan error, 1) c.mu.Unlock() } }