mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
clientv3: rework reconnection logic
Avoids go routine flood for tight loops with a dead connection. Now uses request ctx when reconnecting for immediate retry.
This commit is contained in:
parent
3bcd2b5b9f
commit
cdc8f99658
@ -50,6 +50,14 @@ type Client struct {
|
||||
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
||||
// fields below are managed by connMonitor
|
||||
|
||||
// reconnc accepts writes which signal the client should reconnect
|
||||
reconnc chan error
|
||||
// newconnc is closed on successful connect and set to a fresh channel
|
||||
newconnc chan struct{}
|
||||
lastConnErr error
|
||||
}
|
||||
|
||||
// New creates a new etcdv3 client from a given configuration.
|
||||
@ -87,10 +95,13 @@ func (c *Client) Close() error {
|
||||
}
|
||||
c.cancel()
|
||||
c.cancel = nil
|
||||
err := c.conn.Close()
|
||||
connc := c.newconnc
|
||||
c.mu.Unlock()
|
||||
c.Watcher.Close()
|
||||
c.Lease.Close()
|
||||
return c.conn.Close()
|
||||
<-connc
|
||||
return err
|
||||
}
|
||||
|
||||
// Ctx is a context for "out of band" messages (e.g., for sending
|
||||
@ -161,12 +172,17 @@ func newClient(cfg *Config) (*Client, error) {
|
||||
return nil, err
|
||||
}
|
||||
client := &Client{
|
||||
conn: conn,
|
||||
cfg: *cfg,
|
||||
creds: creds,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
conn: conn,
|
||||
cfg: *cfg,
|
||||
creds: creds,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
reconnc: make(chan error),
|
||||
newconnc: make(chan struct{}),
|
||||
}
|
||||
|
||||
go client.connMonitor()
|
||||
|
||||
client.Cluster = NewCluster(client)
|
||||
client.KV = NewKV(client)
|
||||
client.Lease = NewLease(client)
|
||||
@ -191,7 +207,7 @@ func (c *Client) ActiveConnection() *grpc.ClientConn {
|
||||
}
|
||||
|
||||
// retryConnection establishes a new connection
|
||||
func (c *Client) retryConnection(oldConn *grpc.ClientConn, err error) (*grpc.ClientConn, error) {
|
||||
func (c *Client) retryConnection(err error) (newConn *grpc.ClientConn, dialErr error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if err != nil {
|
||||
@ -200,24 +216,66 @@ func (c *Client) retryConnection(oldConn *grpc.ClientConn, err error) (*grpc.Cli
|
||||
if c.cancel == nil {
|
||||
return nil, c.ctx.Err()
|
||||
}
|
||||
if oldConn != c.conn {
|
||||
// conn has already been updated
|
||||
return c.conn, nil
|
||||
if c.conn != nil {
|
||||
c.conn.Close()
|
||||
if st, _ := c.conn.State(); st != grpc.Shutdown {
|
||||
// wait so grpc doesn't leak sleeping goroutines
|
||||
c.conn.WaitForStateChange(c.ctx, st)
|
||||
}
|
||||
}
|
||||
|
||||
oldConn.Close()
|
||||
if st, _ := oldConn.State(); st != grpc.Shutdown {
|
||||
// wait for shutdown so grpc doesn't leak sleeping goroutines
|
||||
oldConn.WaitForStateChange(c.ctx, st)
|
||||
}
|
||||
|
||||
conn, dialErr := c.cfg.RetryDialer(c)
|
||||
c.conn, dialErr = c.cfg.RetryDialer(c)
|
||||
if dialErr != nil {
|
||||
c.errors = append(c.errors, dialErr)
|
||||
return nil, dialErr
|
||||
}
|
||||
c.conn = conn
|
||||
return c.conn, nil
|
||||
return c.conn, dialErr
|
||||
}
|
||||
|
||||
// connStartRetry schedules a reconnect if one is not already running
|
||||
func (c *Client) connStartRetry(err error) {
|
||||
select {
|
||||
case c.reconnc <- err:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// connWait waits for a reconnect to be processed
|
||||
func (c *Client) connWait(ctx context.Context, err error) (*grpc.ClientConn, error) {
|
||||
c.mu.Lock()
|
||||
ch := c.newconnc
|
||||
c.mu.Unlock()
|
||||
c.connStartRetry(err)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case <-ch:
|
||||
}
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
return c.conn, c.lastConnErr
|
||||
}
|
||||
|
||||
// connMonitor monitors the connection and handles retries
|
||||
func (c *Client) connMonitor() {
|
||||
var err error
|
||||
for {
|
||||
select {
|
||||
case err = <-c.reconnc:
|
||||
case <-c.ctx.Done():
|
||||
c.mu.Lock()
|
||||
c.lastConnErr = c.ctx.Err()
|
||||
close(c.newconnc)
|
||||
c.mu.Unlock()
|
||||
return
|
||||
}
|
||||
conn, connErr := c.retryConnection(err)
|
||||
c.mu.Lock()
|
||||
c.lastConnErr = connErr
|
||||
c.conn = conn
|
||||
close(c.newconnc)
|
||||
c.newconnc = make(chan struct{})
|
||||
c.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// dialEndpointList attempts to connect to each endpoint in order until a
|
||||
|
@ -15,8 +15,6 @@
|
||||
package clientv3
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"golang.org/x/net/context"
|
||||
@ -46,22 +44,15 @@ type Cluster interface {
|
||||
}
|
||||
|
||||
type cluster struct {
|
||||
c *Client
|
||||
|
||||
mu sync.Mutex
|
||||
conn *grpc.ClientConn // conn in-use
|
||||
rc *remoteClient
|
||||
remote pb.ClusterClient
|
||||
}
|
||||
|
||||
func NewCluster(c *Client) Cluster {
|
||||
conn := c.ActiveConnection()
|
||||
|
||||
return &cluster{
|
||||
c: c,
|
||||
|
||||
conn: conn,
|
||||
remote: pb.NewClusterClient(conn),
|
||||
}
|
||||
ret := &cluster{}
|
||||
f := func(conn *grpc.ClientConn) { ret.remote = pb.NewClusterClient(conn) }
|
||||
ret.rc = newRemoteClient(c, f)
|
||||
return ret
|
||||
}
|
||||
|
||||
func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
|
||||
@ -75,7 +66,7 @@ func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAdd
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
go c.switchRemote(err)
|
||||
c.rc.reconnect(err)
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
@ -90,7 +81,7 @@ func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveRes
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
go c.switchRemote(err)
|
||||
c.rc.reconnect(err)
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
@ -107,8 +98,7 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
err = c.switchRemote(err)
|
||||
if err != nil {
|
||||
if err = c.rc.reconnectWait(ctx, err); err != nil {
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
}
|
||||
@ -126,30 +116,14 @@ func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
err = c.switchRemote(err)
|
||||
if err != nil {
|
||||
if err = c.rc.reconnectWait(ctx, err); err != nil {
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cluster) getRemote() pb.ClusterClient {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
c.rc.mu.Lock()
|
||||
defer c.rc.mu.Unlock()
|
||||
return c.remote
|
||||
}
|
||||
|
||||
func (c *cluster) switchRemote(prevErr error) error {
|
||||
newConn, err := c.c.retryConnection(c.conn, prevErr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
c.conn = newConn
|
||||
c.remote = pb.NewClusterClient(c.conn)
|
||||
return nil
|
||||
}
|
||||
|
@ -15,8 +15,6 @@
|
||||
package clientv3
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"golang.org/x/net/context"
|
||||
@ -76,23 +74,15 @@ type OpResponse struct {
|
||||
}
|
||||
|
||||
type kv struct {
|
||||
c *Client
|
||||
|
||||
mu sync.Mutex // guards all fields
|
||||
conn *grpc.ClientConn // conn in-use
|
||||
rc *remoteClient
|
||||
remote pb.KVClient
|
||||
}
|
||||
|
||||
func NewKV(c *Client) KV {
|
||||
conn := c.ActiveConnection()
|
||||
remote := pb.NewKVClient(conn)
|
||||
|
||||
return &kv{
|
||||
conn: c.ActiveConnection(),
|
||||
remote: remote,
|
||||
|
||||
c: c,
|
||||
}
|
||||
ret := &kv{}
|
||||
f := func(conn *grpc.ClientConn) { ret.remote = pb.NewKVClient(conn) }
|
||||
ret.rc = newRemoteClient(c, f)
|
||||
return ret
|
||||
}
|
||||
|
||||
func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
|
||||
@ -111,17 +101,14 @@ func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*Delete
|
||||
}
|
||||
|
||||
func (kv *kv) Compact(ctx context.Context, rev int64) error {
|
||||
remote := kv.getRemote()
|
||||
_, err := remote.Compact(ctx, &pb.CompactionRequest{Revision: rev})
|
||||
_, err := kv.getRemote().Compact(ctx, &pb.CompactionRequest{Revision: rev})
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if isHaltErr(ctx, err) {
|
||||
return rpctypes.Error(err)
|
||||
}
|
||||
|
||||
go kv.switchRemote(remote, err)
|
||||
kv.rc.reconnect(err)
|
||||
return rpctypes.Error(err)
|
||||
}
|
||||
|
||||
@ -174,36 +161,18 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
|
||||
|
||||
// do not retry on modifications
|
||||
if op.isWrite() {
|
||||
go kv.switchRemote(remote, err)
|
||||
kv.rc.reconnect(err)
|
||||
return OpResponse{}, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
if nerr := kv.switchRemote(remote, err); nerr != nil {
|
||||
if nerr := kv.rc.reconnectWait(ctx, err); nerr != nil {
|
||||
return OpResponse{}, nerr
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (kv *kv) switchRemote(remote pb.KVClient, prevErr error) error {
|
||||
kv.mu.Lock()
|
||||
oldRemote := kv.remote
|
||||
conn := kv.conn
|
||||
kv.mu.Unlock()
|
||||
if remote != oldRemote {
|
||||
return nil
|
||||
}
|
||||
newConn, err := kv.c.retryConnection(conn, prevErr)
|
||||
kv.mu.Lock()
|
||||
defer kv.mu.Unlock()
|
||||
if err == nil {
|
||||
kv.conn = newConn
|
||||
kv.remote = pb.NewKVClient(kv.conn)
|
||||
}
|
||||
return rpctypes.Error(err)
|
||||
}
|
||||
|
||||
func (kv *kv) getRemote() pb.KVClient {
|
||||
kv.mu.Lock()
|
||||
defer kv.mu.Unlock()
|
||||
kv.rc.mu.Lock()
|
||||
defer kv.rc.mu.Unlock()
|
||||
return kv.remote
|
||||
}
|
||||
|
@ -71,14 +71,12 @@ type Lease interface {
|
||||
}
|
||||
|
||||
type lessor struct {
|
||||
c *Client
|
||||
|
||||
mu sync.Mutex // guards all fields
|
||||
conn *grpc.ClientConn // conn in-use
|
||||
mu sync.Mutex // guards all fields
|
||||
|
||||
// donec is closed when recvKeepAliveLoop stops
|
||||
donec chan struct{}
|
||||
|
||||
rc *remoteClient
|
||||
remote pb.LeaseClient
|
||||
|
||||
stream pb.Lease_LeaseKeepAliveClient
|
||||
@ -102,14 +100,12 @@ type keepAlive struct {
|
||||
|
||||
func NewLease(c *Client) Lease {
|
||||
l := &lessor{
|
||||
c: c,
|
||||
conn: c.ActiveConnection(),
|
||||
|
||||
donec: make(chan struct{}),
|
||||
keepAlives: make(map[LeaseID]*keepAlive),
|
||||
}
|
||||
f := func(conn *grpc.ClientConn) { l.remote = pb.NewLeaseClient(conn) }
|
||||
l.rc = newRemoteClient(c, f)
|
||||
|
||||
l.remote = pb.NewLeaseClient(l.conn)
|
||||
l.stopCtx, l.stopCancel = context.WithCancel(context.Background())
|
||||
|
||||
go l.recvKeepAliveLoop()
|
||||
@ -386,8 +382,8 @@ func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
|
||||
}
|
||||
|
||||
func (l *lessor) getRemote() pb.LeaseClient {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
l.rc.mu.Lock()
|
||||
defer l.rc.mu.Unlock()
|
||||
return l.remote
|
||||
}
|
||||
|
||||
@ -399,36 +395,15 @@ func (l *lessor) getKeepAliveStream() pb.Lease_LeaseKeepAliveClient {
|
||||
|
||||
func (l *lessor) switchRemoteAndStream(prevErr error) error {
|
||||
for {
|
||||
l.mu.Lock()
|
||||
conn := l.conn
|
||||
l.mu.Unlock()
|
||||
|
||||
var (
|
||||
err error
|
||||
newConn *grpc.ClientConn
|
||||
)
|
||||
|
||||
if prevErr != nil {
|
||||
conn.Close()
|
||||
newConn, err = l.c.retryConnection(conn, prevErr)
|
||||
err := l.rc.reconnectWait(l.stopCtx, prevErr)
|
||||
if err != nil {
|
||||
return rpctypes.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
l.mu.Lock()
|
||||
if newConn != nil {
|
||||
l.conn = newConn
|
||||
if prevErr = l.newStream(); prevErr == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
l.remote = pb.NewLeaseClient(l.conn)
|
||||
l.mu.Unlock()
|
||||
|
||||
prevErr = l.newStream()
|
||||
if prevErr != nil {
|
||||
continue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -16,7 +16,6 @@ package clientv3
|
||||
|
||||
import (
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
@ -57,18 +56,15 @@ type Maintenance interface {
|
||||
type maintenance struct {
|
||||
c *Client
|
||||
|
||||
mu sync.Mutex
|
||||
conn *grpc.ClientConn // conn in-use
|
||||
rc *remoteClient
|
||||
remote pb.MaintenanceClient
|
||||
}
|
||||
|
||||
func NewMaintenance(c *Client) Maintenance {
|
||||
conn := c.ActiveConnection()
|
||||
return &maintenance{
|
||||
c: c,
|
||||
conn: conn,
|
||||
remote: pb.NewMaintenanceClient(conn),
|
||||
}
|
||||
ret := &maintenance{c: c}
|
||||
f := func(conn *grpc.ClientConn) { ret.remote = pb.NewMaintenanceClient(conn) }
|
||||
ret.rc = newRemoteClient(c, f)
|
||||
return ret
|
||||
}
|
||||
|
||||
func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
|
||||
@ -85,7 +81,7 @@ func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
|
||||
if isHaltErr(ctx, err) {
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
if err = m.switchRemote(err); err != nil {
|
||||
if err = m.rc.reconnectWait(ctx, err); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
@ -118,8 +114,8 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR
|
||||
if err == nil {
|
||||
return (*AlarmResponse)(resp), nil
|
||||
}
|
||||
if isHaltErr(ctx, err) {
|
||||
go m.switchRemote(err)
|
||||
if !isHaltErr(ctx, err) {
|
||||
m.rc.reconnect(err)
|
||||
}
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
@ -178,19 +174,7 @@ func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
|
||||
}
|
||||
|
||||
func (m *maintenance) getRemote() pb.MaintenanceClient {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.rc.mu.Lock()
|
||||
defer m.rc.mu.Unlock()
|
||||
return m.remote
|
||||
}
|
||||
|
||||
func (m *maintenance) switchRemote(prevErr error) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
newConn, err := m.c.retryConnection(m.conn, prevErr)
|
||||
if err != nil {
|
||||
return rpctypes.Error(err)
|
||||
}
|
||||
m.conn = newConn
|
||||
m.remote = pb.NewMaintenanceClient(m.conn)
|
||||
return nil
|
||||
}
|
||||
|
79
clientv3/remote_client.go
Normal file
79
clientv3/remote_client.go
Normal file
@ -0,0 +1,79 @@
|
||||
// Copyright 2016 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package clientv3
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type remoteClient struct {
|
||||
client *Client
|
||||
conn *grpc.ClientConn
|
||||
updateConn func(*grpc.ClientConn)
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func newRemoteClient(client *Client, update func(*grpc.ClientConn)) *remoteClient {
|
||||
ret := &remoteClient{
|
||||
client: client,
|
||||
conn: client.ActiveConnection(),
|
||||
updateConn: update,
|
||||
}
|
||||
ret.mu.Lock()
|
||||
defer ret.mu.Unlock()
|
||||
ret.updateConn(ret.conn)
|
||||
return ret
|
||||
}
|
||||
|
||||
// reconnectWait reconnects the client, returning when connection establishes/fails.
|
||||
func (r *remoteClient) reconnectWait(ctx context.Context, prevErr error) error {
|
||||
r.mu.Lock()
|
||||
updated := r.tryUpdate()
|
||||
r.mu.Unlock()
|
||||
if updated {
|
||||
return nil
|
||||
}
|
||||
conn, err := r.client.connWait(ctx, prevErr)
|
||||
if err == nil {
|
||||
r.mu.Lock()
|
||||
r.conn = conn
|
||||
r.updateConn(conn)
|
||||
r.mu.Unlock()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// reconnect will reconnect the client without waiting
|
||||
func (r *remoteClient) reconnect(err error) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
if r.tryUpdate() {
|
||||
return
|
||||
}
|
||||
r.client.connStartRetry(err)
|
||||
}
|
||||
|
||||
func (r *remoteClient) tryUpdate() bool {
|
||||
activeConn := r.client.ActiveConnection()
|
||||
if activeConn == nil || activeConn == r.conn {
|
||||
return false
|
||||
}
|
||||
r.conn = activeConn
|
||||
r.updateConn(activeConn)
|
||||
return true
|
||||
}
|
@ -141,9 +141,8 @@ func (txn *txn) Commit() (*TxnResponse, error) {
|
||||
kv := txn.kv
|
||||
|
||||
for {
|
||||
remote := kv.getRemote()
|
||||
r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}
|
||||
resp, err := remote.Txn(txn.ctx, r)
|
||||
resp, err := kv.getRemote().Txn(txn.ctx, r)
|
||||
if err == nil {
|
||||
return (*TxnResponse)(resp), nil
|
||||
}
|
||||
@ -153,11 +152,11 @@ func (txn *txn) Commit() (*TxnResponse, error) {
|
||||
}
|
||||
|
||||
if txn.isWrite {
|
||||
go kv.switchRemote(remote, err)
|
||||
kv.rc.reconnect(err)
|
||||
return nil, rpctypes.Error(err)
|
||||
}
|
||||
|
||||
if nerr := kv.switchRemote(remote, err); nerr != nil {
|
||||
if nerr := kv.rc.reconnectWait(txn.ctx, err); nerr != nil {
|
||||
return nil, nerr
|
||||
}
|
||||
}
|
||||
|
@ -87,8 +87,7 @@ func (wr *WatchResponse) IsProgressNotify() bool {
|
||||
|
||||
// watcher implements the Watcher interface
|
||||
type watcher struct {
|
||||
c *Client
|
||||
conn *grpc.ClientConn
|
||||
rc *remoteClient
|
||||
remote pb.WatchClient
|
||||
|
||||
// ctx controls internal remote.Watch requests
|
||||
@ -142,13 +141,7 @@ type watcherStream struct {
|
||||
|
||||
func NewWatcher(c *Client) Watcher {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
conn := c.ActiveConnection()
|
||||
|
||||
w := &watcher{
|
||||
c: c,
|
||||
conn: conn,
|
||||
remote: pb.NewWatchClient(conn),
|
||||
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
streams: make(map[int64]*watcherStream),
|
||||
@ -159,6 +152,10 @@ func NewWatcher(c *Client) Watcher {
|
||||
donec: make(chan struct{}),
|
||||
errc: make(chan error, 1),
|
||||
}
|
||||
|
||||
f := func(conn *grpc.ClientConn) { w.remote = pb.NewWatchClient(conn) }
|
||||
w.rc = newRemoteClient(c, f)
|
||||
|
||||
go w.run()
|
||||
return w
|
||||
}
|
||||
@ -508,12 +505,9 @@ func (w *watcher) openWatchClient() (ws pb.Watch_WatchClient, err error) {
|
||||
} else if isHaltErr(w.ctx, err) {
|
||||
return nil, v3rpc.Error(err)
|
||||
}
|
||||
newConn, nerr := w.c.retryConnection(w.conn, nil)
|
||||
if nerr != nil {
|
||||
if nerr := w.remoteConn.reconnectWait(w.ctx, nil); nerr != nil {
|
||||
return nil, nerr
|
||||
}
|
||||
w.conn = newConn
|
||||
w.remote = pb.NewWatchClient(w.conn)
|
||||
}
|
||||
return ws, nil
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user