diff --git a/clientv3/client.go b/clientv3/client.go index 3139a047e..630d9800b 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -15,6 +15,8 @@ package clientv3 import ( + "net" + "net/url" "sync" "time" @@ -65,12 +67,7 @@ func New(cfg Config) (*Client, error) { if cfg.RetryDialer == nil { cfg.RetryDialer = dialEndpointList } - // use a temporary skeleton client to bootstrap first connection - conn, err := cfg.RetryDialer(&Client{cfg: cfg}) - if err != nil { - return nil, err - } - return newClient(conn, &cfg) + return newClient(&cfg) } // NewFromURL creates a new etcdv3 client from a URL. @@ -78,12 +75,6 @@ func NewFromURL(url string) (*Client, error) { return New(Config{Endpoints: []string{url}}) } -// NewFromConn creates a new etcdv3 client from an established grpc Connection. -func NewFromConn(conn *grpc.ClientConn) *Client { return mustNewClient(conn, nil) } - -// Clone creates a copy of client with the old connection and new API clients. -func (c *Client) Clone() *Client { return mustNewClient(c.conn, &c.cfg) } - // Close shuts down the client's etcd connections. func (c *Client) Close() error { return c.conn.Close() @@ -112,6 +103,15 @@ func (c *Client) Dial(endpoint string) (*grpc.ClientConn, error) { } else { opts = append(opts, grpc.WithInsecure()) } + if url, uerr := url.Parse(endpoint); uerr == nil && url.Scheme == "unix" { + f := func(a string, t time.Duration) (net.Conn, error) { + return net.DialTimeout("unix", a, t) + } + // strip unix:// prefix so certs work + endpoint = url.Host + opts = append(opts, grpc.WithDialer(f)) + } + conn, err := grpc.Dial(endpoint, opts...) if err != nil { return nil, err @@ -119,15 +119,7 @@ func (c *Client) Dial(endpoint string) (*grpc.ClientConn, error) { return conn, nil } -func mustNewClient(conn *grpc.ClientConn, cfg *Config) *Client { - c, err := newClient(conn, cfg) - if err != nil { - panic("expected no error") - } - return c -} - -func newClient(conn *grpc.ClientConn, cfg *Config) (*Client, error) { +func newClient(cfg *Config) (*Client, error) { if cfg == nil { cfg = &Config{RetryDialer: dialEndpointList} } @@ -140,6 +132,11 @@ func newClient(conn *grpc.ClientConn, cfg *Config) (*Client, error) { c := credentials.NewTLS(tlscfg) creds = &c } + // use a temporary skeleton client to bootstrap first connection + conn, err := cfg.RetryDialer(&Client{cfg: *cfg, creds: creds}) + if err != nil { + return nil, err + } return &Client{ KV: pb.NewKVClient(conn), Lease: pb.NewLeaseClient(conn), diff --git a/clientv3/lease.go b/clientv3/lease.go index 2be2abc12..071f6f7b8 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -57,9 +57,11 @@ type Lease interface { type lessor struct { c *Client - mu sync.Mutex // guards all fields - conn *grpc.ClientConn // conn in-use - initedc chan bool + mu sync.Mutex // guards all fields + conn *grpc.ClientConn // conn in-use + + // donec is closed when recvKeepAliveLoop stops + donec chan struct{} remote pb.LeaseClient @@ -78,8 +80,7 @@ func NewLease(c *Client) Lease { c: c, conn: c.ActiveConnection(), - initedc: make(chan bool, 1), - + donec: make(chan struct{}), keepAlives: make(map[lease.LeaseID]chan *LeaseKeepAliveResponse), deadlines: make(map[lease.LeaseID]time.Time), } @@ -87,10 +88,7 @@ func NewLease(c *Client) Lease { l.remote = pb.NewLeaseClient(l.conn) l.stopCtx, l.stopCancel = context.WithCancel(context.Background()) - l.initedc <- false - go l.recvKeepAliveLoop() - go l.sendKeepAliveLoop() return l } @@ -181,11 +179,8 @@ func (l *lessor) KeepAliveOnce(ctx context.Context, id lease.LeaseID) (*LeaseKee } func (l *lessor) Close() error { - l.mu.Lock() - defer l.mu.Unlock() - l.stopCancel() - l.stream = nil + <-l.donec return nil } @@ -208,56 +203,66 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id lease.LeaseID) (*LeaseKee } func (l *lessor) recvKeepAliveLoop() { - if !l.initStream() { - l.Close() - return - } + defer func() { + l.stopCancel() + close(l.donec) + }() + stream, serr := l.resetRecv() for { - stream := l.getKeepAliveStream() - resp, err := stream.Recv() if err != nil { - err = l.switchRemoteAndStream(err) - if err != nil { - l.Close() + if stream, serr = l.resetRecv(); serr != nil { return } continue } - - l.mu.Lock() - lch, ok := l.keepAlives[lease.LeaseID(resp.ID)] - if !ok { - l.mu.Unlock() - continue - } - - if resp.TTL <= 0 { - close(lch) - delete(l.deadlines, lease.LeaseID(resp.ID)) - delete(l.keepAlives, lease.LeaseID(resp.ID)) - } else { - select { - case lch <- (*LeaseKeepAliveResponse)(resp): - l.deadlines[lease.LeaseID(resp.ID)] = - time.Now().Add(1 + time.Duration(resp.TTL/3)*time.Second) - default: - } - } - l.mu.Unlock() + l.recvKeepAlive(resp) } } -func (l *lessor) sendKeepAliveLoop() { - if !l.initStream() { - l.Close() +// resetRecv opens a new lease stream and starts sending LeaseKeepAliveRequests +func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) { + if err := l.switchRemoteAndStream(nil); err != nil { + return nil, err + } + stream := l.getKeepAliveStream() + go l.sendKeepAliveLoop(stream) + return stream, nil +} + +// recvKeepAlive updates a lease based on its LeaseKeepAliveResponse +func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) { + l.mu.Lock() + defer l.mu.Unlock() + + lch, ok := l.keepAlives[lease.LeaseID(resp.ID)] + if !ok { return } + if resp.TTL <= 0 { + close(lch) + delete(l.deadlines, lease.LeaseID(resp.ID)) + delete(l.keepAlives, lease.LeaseID(resp.ID)) + return + } + + select { + case lch <- (*LeaseKeepAliveResponse)(resp): + l.deadlines[lease.LeaseID(resp.ID)] = + time.Now().Add(1 + time.Duration(resp.TTL/3)*time.Second) + default: + } +} + +// sendKeepAliveLoop sends LeaseKeepAliveRequests for the lifetime of a lease stream +func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) { for { select { case <-time.After(500 * time.Millisecond): + case <-l.donec: + return case <-l.stopCtx.Done(): return } @@ -273,21 +278,10 @@ func (l *lessor) sendKeepAliveLoop() { } l.mu.Unlock() - stream := l.getKeepAliveStream() - - var err error for _, id := range tosend { r := &pb.LeaseKeepAliveRequest{ID: int64(id)} - err = stream.Send(r) - if err != nil { - break - } - } - - if err != nil { - err = l.switchRemoteAndStream(err) - if err != nil { - l.Close() + if err := stream.Send(r); err != nil { + // TODO do something with this error? return } } @@ -359,21 +353,6 @@ func (l *lessor) newStream() error { return nil } -func (l *lessor) initStream() bool { - ok := <-l.initedc - if ok { - return true - } - - err := l.switchRemoteAndStream(nil) - if err == nil { - l.initedc <- true - return true - } - l.initedc <- false - return false -} - // cancelWhenStop calls cancel when the given stopc fires. It returns a done chan. done // should be closed when the work is finished. When done fires, cancelWhenStop will release // its internal resource. diff --git a/integration/cluster.go b/integration/cluster.go index 7ba01d8ee..75c18f3b1 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -31,7 +31,6 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" - "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/credentials" "github.com/coreos/etcd/client" "github.com/coreos/etcd/clientv3" @@ -445,6 +444,7 @@ func (m *member) listenGRPC() error { if err != nil { return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err) } + m.grpcAddr = "unix://" + m.grpcAddr m.grpcListener = l return nil } @@ -454,29 +454,12 @@ func NewClientV3(m *member) (*clientv3.Client, error) { if m.grpcAddr == "" { return nil, fmt.Errorf("member not configured for grpc") } - f := func(a string, t time.Duration) (net.Conn, error) { - return net.Dial("unix", a) + cfg := clientv3.Config{ + Endpoints: []string{m.grpcAddr}, + DialTimeout: 5 * time.Second, + TLS: m.ClientTLSInfo, } - unixdialer := grpc.WithDialer(f) - opts := []grpc.DialOption{ - unixdialer, - grpc.WithBlock(), - grpc.WithTimeout(5 * time.Second)} - if m.ClientTLSInfo != nil { - tlscfg, err := m.ClientTLSInfo.ClientConfig() - if err != nil { - return nil, err - } - creds := credentials.NewTLS(tlscfg) - opts = append(opts, grpc.WithTransportCredentials(creds)) - } else { - opts = append(opts, grpc.WithInsecure()) - } - conn, err := grpc.Dial(m.grpcAddr, opts...) - if err != nil { - return nil, err - } - return clientv3.NewFromConn(conn), nil + return clientv3.New(cfg) } // Clone returns a member with the same server configuration. The returned diff --git a/tools/benchmark/cmd/util.go b/tools/benchmark/cmd/util.go index 782c1efa1..b22270fe9 100644 --- a/tools/benchmark/cmd/util.go +++ b/tools/benchmark/cmd/util.go @@ -58,7 +58,7 @@ func mustCreateClients(totalClients, totalConns uint) []*clientv3.Client { clients := make([]*clientv3.Client, totalClients) for i := range clients { - clients[i] = conns[i%int(totalConns)].Clone() + clients[i] = conns[i%int(totalConns)] } return clients }