diff --git a/clientv3/ready_wait.go b/clientv3/ready_wait.go new file mode 100644 index 000000000..c6ef585b5 --- /dev/null +++ b/clientv3/ready_wait.go @@ -0,0 +1,30 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3 + +import "context" + +// TODO: remove this when "FailFast=false" is fixed. +// See https://github.com/grpc/grpc-go/issues/1532. +func readyWait(rpcCtx, clientCtx context.Context, ready <-chan struct{}) error { + select { + case <-ready: + return nil + case <-rpcCtx.Done(): + return rpcCtx.Err() + case <-clientCtx.Done(): + return clientCtx.Err() + } +} diff --git a/clientv3/retry.go b/clientv3/retry.go index 170738417..27bacf1d5 100644 --- a/clientv3/retry.go +++ b/clientv3/retry.go @@ -51,12 +51,8 @@ func isWriteStopError(err error) bool { func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRPCFunc { return func(rpcCtx context.Context, f rpcFunc) error { for { - select { - case <-c.balancer.ConnectNotify(): - case <-rpcCtx.Done(): - return rpcCtx.Err() - case <-c.ctx.Done(): - return c.ctx.Err() + if err := readyWait(rpcCtx, c.ctx, c.balancer.ConnectNotify()); err != nil { + return err } pinned := c.balancer.pinned() err := f(rpcCtx) @@ -68,20 +64,12 @@ func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRPCFunc { } // mark this before endpoint switch is triggered c.balancer.hostPortError(pinned, err) - notify := c.balancer.ConnectNotify() if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable { c.balancer.next() } if isStop(err) { return err } - select { - case <-notify: - case <-rpcCtx.Done(): - return rpcCtx.Err() - case <-c.ctx.Done(): - return c.ctx.Err() - } } } }