diff --git a/clientv3/client.go b/clientv3/client.go index 30b6929d7..a9d63a7b3 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -15,10 +15,13 @@ package clientv3 import ( + "sync" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" ) +// Client provides and manages an etcd v3 client session. type Client struct { // KV is the keyvalue API for the client's connection. KV pb.KVClient @@ -29,26 +32,36 @@ type Client struct { // Cluster is the cluster API for the client's connection. Cluster pb.ClusterClient - conn *grpc.ClientConn - cfg Config + conn *grpc.ClientConn + cfg Config + mu sync.RWMutex // protects connection selection and error list + errors []error // errors passed to retryConnection } +// EndpointDialer is a policy for choosing which endpoint to dial next +type EndpointDialer func(*Client) (*grpc.ClientConn, error) + type Config struct { // Endpoints is a list of URLs Endpoints []string + // RetryDialer chooses the next endpoint to use + RetryDialer EndpointDialer + // TODO TLS options } // New creates a new etcdv3 client from a given configuration. func New(cfg Config) (*Client, error) { - conn, err := cfg.dialEndpoints() + if cfg.RetryDialer == nil { + cfg.RetryDialer = dialEndpointList + } + // use a temporary skeleton client to bootstrap first connection + conn, err := cfg.RetryDialer(&Client{cfg: cfg}) if err != nil { return nil, err } - client := newClient(conn) - client.cfg = cfg - return client, nil + return newClient(conn, &cfg), nil } // NewFromURL creates a new etcdv3 client from a URL. @@ -57,37 +70,80 @@ func NewFromURL(url string) (*Client, error) { } // NewFromConn creates a new etcdv3 client from an established grpc Connection. -func NewFromConn(conn *grpc.ClientConn) *Client { - return newClient(conn) -} +func NewFromConn(conn *grpc.ClientConn) *Client { return newClient(conn, nil) } // Clone creates a copy of client with the old connection and new API clients. -func (c *Client) Clone() *Client { - cl := newClient(c.conn) - cl.cfg = c.cfg - return cl -} +func (c *Client) Clone() *Client { return newClient(c.conn, &c.cfg) } // Close shuts down the client's etcd connections. func (c *Client) Close() error { return c.conn.Close() } -func newClient(conn *grpc.ClientConn) *Client { +// Endpoints lists the registered endpoints for the client. +func (c *Client) Endpoints() []string { return c.cfg.Endpoints } + +// Errors returns all errors that have been observed since called last. +func (c *Client) Errors() (errs []error) { + c.mu.Lock() + defer c.mu.Unlock() + errs = c.errors + c.errors = nil + return errs +} + +// Dial establishes a connection for a given endpoint using the client's config +func (c *Client) Dial(endpoint string) (*grpc.ClientConn, error) { + // TODO: enable grpc.WithTransportCredentials(creds) + return grpc.Dial(endpoint, grpc.WithInsecure()) +} + +func newClient(conn *grpc.ClientConn, cfg *Config) *Client { + if cfg == nil { + cfg = &Config{RetryDialer: dialEndpointList} + } return &Client{ KV: pb.NewKVClient(conn), Lease: pb.NewLeaseClient(conn), Watch: pb.NewWatchClient(conn), Cluster: pb.NewClusterClient(conn), conn: conn, + cfg: *cfg, } } -func (cfg *Config) dialEndpoints() (*grpc.ClientConn, error) { +// activeConnection returns the current in-use connection +func (c *Client) activeConnection() *grpc.ClientConn { + c.mu.RLock() + defer c.mu.RUnlock() + return c.conn +} + +// refreshConnection establishes a new connection +func (c *Client) retryConnection(oldConn *grpc.ClientConn, err error) *grpc.ClientConn { + c.mu.Lock() + defer c.mu.Unlock() + if err != nil { + c.errors = append(c.errors, err) + } + if oldConn != c.conn { + // conn has already been updated + return c.conn + } + conn, dialErr := c.cfg.RetryDialer(c) + if dialErr != nil { + c.errors = append(c.errors, dialErr) + } + c.conn = conn + return c.conn +} + +// dialEndpoints attempts to connect to each endpoint in order until a +// connection is established. +func dialEndpointList(c *Client) (*grpc.ClientConn, error) { var err error - for _, ep := range cfg.Endpoints { - // TODO: enable grpc.WithTransportCredentials(creds) - conn, curErr := grpc.Dial(ep, grpc.WithInsecure()) + for _, ep := range c.Endpoints() { + conn, curErr := c.Dial(ep) if err != nil { err = curErr } else {