mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #8716 from gyuho/ready-wait
clientv3: separate readyWait for ConnectNotify
This commit is contained in:
commit
a2c61cf04f
30
clientv3/ready_wait.go
Normal file
30
clientv3/ready_wait.go
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
// Copyright 2017 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package clientv3
|
||||||
|
|
||||||
|
import "context"
|
||||||
|
|
||||||
|
// TODO: remove this when "FailFast=false" is fixed.
|
||||||
|
// See https://github.com/grpc/grpc-go/issues/1532.
|
||||||
|
func readyWait(rpcCtx, clientCtx context.Context, ready <-chan struct{}) error {
|
||||||
|
select {
|
||||||
|
case <-ready:
|
||||||
|
return nil
|
||||||
|
case <-rpcCtx.Done():
|
||||||
|
return rpcCtx.Err()
|
||||||
|
case <-clientCtx.Done():
|
||||||
|
return clientCtx.Err()
|
||||||
|
}
|
||||||
|
}
|
@ -51,12 +51,8 @@ func isWriteStopError(err error) bool {
|
|||||||
func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRPCFunc {
|
func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRPCFunc {
|
||||||
return func(rpcCtx context.Context, f rpcFunc) error {
|
return func(rpcCtx context.Context, f rpcFunc) error {
|
||||||
for {
|
for {
|
||||||
select {
|
if err := readyWait(rpcCtx, c.ctx, c.balancer.ConnectNotify()); err != nil {
|
||||||
case <-c.balancer.ConnectNotify():
|
return err
|
||||||
case <-rpcCtx.Done():
|
|
||||||
return rpcCtx.Err()
|
|
||||||
case <-c.ctx.Done():
|
|
||||||
return c.ctx.Err()
|
|
||||||
}
|
}
|
||||||
pinned := c.balancer.pinned()
|
pinned := c.balancer.pinned()
|
||||||
err := f(rpcCtx)
|
err := f(rpcCtx)
|
||||||
@ -68,20 +64,12 @@ func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRPCFunc {
|
|||||||
}
|
}
|
||||||
// mark this before endpoint switch is triggered
|
// mark this before endpoint switch is triggered
|
||||||
c.balancer.hostPortError(pinned, err)
|
c.balancer.hostPortError(pinned, err)
|
||||||
notify := c.balancer.ConnectNotify()
|
|
||||||
if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
|
if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
|
||||||
c.balancer.next()
|
c.balancer.next()
|
||||||
}
|
}
|
||||||
if isStop(err) {
|
if isStop(err) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
select {
|
|
||||||
case <-notify:
|
|
||||||
case <-rpcCtx.Done():
|
|
||||||
return rpcCtx.Err()
|
|
||||||
case <-c.ctx.Done():
|
|
||||||
return c.ctx.Err()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user