mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #8714 from gyuho/aaa
clientv3: handle stale endpoints, clean up logging
This commit is contained in:
commit
c945b7b44a
@ -40,12 +40,12 @@ type balancer interface {
|
||||
grpc.Balancer
|
||||
ConnectNotify() <-chan struct{}
|
||||
|
||||
endpoint(host string) string
|
||||
endpoint(hostPort string) string
|
||||
endpoints() []string
|
||||
// pinned returns the current pinned endpoint.
|
||||
pinned() string
|
||||
// endpointError handles error from server-side.
|
||||
endpointError(addr string, err error)
|
||||
// hostPortError handles error from server-side.
|
||||
hostPortError(hostPort string, err error)
|
||||
|
||||
// up is Up but includes whether the balancer will use the connection.
|
||||
up(addr grpc.Address) (func(error), bool)
|
||||
@ -95,7 +95,7 @@ type simpleBalancer struct {
|
||||
// grpc issues TLS cert checks using the string passed into dial so
|
||||
// that string must be the host. To recover the full scheme://host URL,
|
||||
// have a map from hosts to the original endpoint.
|
||||
host2ep map[string]string
|
||||
hostPort2ep map[string]string
|
||||
|
||||
// pinAddr is the currently pinned address; set to the empty string on
|
||||
// initialization and shutdown.
|
||||
@ -117,7 +117,7 @@ func newSimpleBalancer(eps []string) *simpleBalancer {
|
||||
downc: make(chan struct{}),
|
||||
donec: make(chan struct{}),
|
||||
updateAddrsC: make(chan notifyMsg),
|
||||
host2ep: getHost2ep(eps),
|
||||
hostPort2ep: getHostPort2ep(eps),
|
||||
}
|
||||
close(sb.downc)
|
||||
go sb.updateNotifyLoop()
|
||||
@ -134,10 +134,10 @@ func (b *simpleBalancer) ConnectNotify() <-chan struct{} {
|
||||
|
||||
func (b *simpleBalancer) ready() <-chan struct{} { return b.readyc }
|
||||
|
||||
func (b *simpleBalancer) endpoint(host string) string {
|
||||
func (b *simpleBalancer) endpoint(hostPort string) string {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
return b.host2ep[host]
|
||||
return b.hostPort2ep[hostPort]
|
||||
}
|
||||
|
||||
func (b *simpleBalancer) endpoints() []string {
|
||||
@ -152,9 +152,9 @@ func (b *simpleBalancer) pinned() string {
|
||||
return b.pinAddr
|
||||
}
|
||||
|
||||
func (b *simpleBalancer) endpointError(addr string, err error) { return }
|
||||
func (b *simpleBalancer) hostPortError(hostPort string, err error) { return }
|
||||
|
||||
func getHost2ep(eps []string) map[string]string {
|
||||
func getHostPort2ep(eps []string) map[string]string {
|
||||
hm := make(map[string]string, len(eps))
|
||||
for i := range eps {
|
||||
_, host, _ := parseEndpoint(eps[i])
|
||||
@ -164,13 +164,13 @@ func getHost2ep(eps []string) map[string]string {
|
||||
}
|
||||
|
||||
func (b *simpleBalancer) updateAddrs(eps ...string) {
|
||||
np := getHost2ep(eps)
|
||||
np := getHostPort2ep(eps)
|
||||
|
||||
b.mu.Lock()
|
||||
|
||||
match := len(np) == len(b.host2ep)
|
||||
match := len(np) == len(b.hostPort2ep)
|
||||
for k, v := range np {
|
||||
if b.host2ep[k] != v {
|
||||
if b.hostPort2ep[k] != v {
|
||||
match = false
|
||||
break
|
||||
}
|
||||
@ -181,7 +181,7 @@ func (b *simpleBalancer) updateAddrs(eps ...string) {
|
||||
return
|
||||
}
|
||||
|
||||
b.host2ep = np
|
||||
b.hostPort2ep = np
|
||||
b.addrs, b.eps = eps2addrs(eps), eps
|
||||
|
||||
// updating notifyCh can trigger new connections,
|
||||
@ -316,7 +316,7 @@ func (b *simpleBalancer) up(addr grpc.Address) (func(error), bool) {
|
||||
}
|
||||
if b.pinAddr != "" {
|
||||
if logger.V(4) {
|
||||
logger.Infof("clientv3/balancer: %s is up but not pinned (already pinned %s)", addr.Addr, b.pinAddr)
|
||||
logger.Infof("clientv3/balancer: %q is up but not pinned (already pinned %q)", addr.Addr, b.pinAddr)
|
||||
}
|
||||
return func(err error) {}, false
|
||||
}
|
||||
@ -325,7 +325,7 @@ func (b *simpleBalancer) up(addr grpc.Address) (func(error), bool) {
|
||||
b.downc = make(chan struct{})
|
||||
b.pinAddr = addr.Addr
|
||||
if logger.V(4) {
|
||||
logger.Infof("clientv3/balancer: pin %s", addr.Addr)
|
||||
logger.Infof("clientv3/balancer: pin %q", addr.Addr)
|
||||
}
|
||||
// notify client that a connection is up
|
||||
b.readyOnce.Do(func() { close(b.readyc) })
|
||||
@ -336,7 +336,7 @@ func (b *simpleBalancer) up(addr grpc.Address) (func(error), bool) {
|
||||
b.pinAddr = ""
|
||||
b.mu.Unlock()
|
||||
if logger.V(4) {
|
||||
logger.Infof("clientv3/balancer: unpin %s (%v)", addr.Addr, err)
|
||||
logger.Infof("clientv3/balancer: unpin %q (%q)", addr.Addr, err.Error())
|
||||
}
|
||||
}, true
|
||||
}
|
||||
|
@ -54,7 +54,7 @@ type healthBalancer struct {
|
||||
stopc chan struct{}
|
||||
stopOnce sync.Once
|
||||
|
||||
host2ep map[string]string
|
||||
hostPort2ep map[string]string
|
||||
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
@ -65,7 +65,7 @@ func newHealthBalancer(b balancer, timeout time.Duration, hc healthCheckFunc) *h
|
||||
healthCheck: hc,
|
||||
eps: b.endpoints(),
|
||||
addrs: eps2addrs(b.endpoints()),
|
||||
host2ep: getHost2ep(b.endpoints()),
|
||||
hostPort2ep: getHostPort2ep(b.endpoints()),
|
||||
unhealthy: make(map[string]time.Time),
|
||||
stopc: make(chan struct{}),
|
||||
}
|
||||
@ -98,7 +98,7 @@ func (hb *healthBalancer) Up(addr grpc.Address) func(error) {
|
||||
hb.mu.Unlock()
|
||||
f(err)
|
||||
if logger.V(4) {
|
||||
logger.Infof("clientv3/health-balancer: %s becomes unhealthy (%v)", addr.Addr, err)
|
||||
logger.Infof("clientv3/health-balancer: %q becomes unhealthy (%q)", addr.Addr, err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -117,9 +117,9 @@ func (hb *healthBalancer) Close() error {
|
||||
}
|
||||
|
||||
func (hb *healthBalancer) updateAddrs(eps ...string) {
|
||||
addrs, host2ep := eps2addrs(eps), getHost2ep(eps)
|
||||
addrs, hostPort2ep := eps2addrs(eps), getHostPort2ep(eps)
|
||||
hb.mu.Lock()
|
||||
hb.addrs, hb.eps, hb.host2ep = addrs, eps, host2ep
|
||||
hb.addrs, hb.eps, hb.hostPort2ep = addrs, eps, hostPort2ep
|
||||
hb.unhealthy = make(map[string]time.Time)
|
||||
hb.mu.Unlock()
|
||||
hb.balancer.updateAddrs(eps...)
|
||||
@ -128,7 +128,7 @@ func (hb *healthBalancer) updateAddrs(eps ...string) {
|
||||
func (hb *healthBalancer) endpoint(host string) string {
|
||||
hb.mu.RLock()
|
||||
defer hb.mu.RUnlock()
|
||||
return hb.host2ep[host]
|
||||
return hb.hostPort2ep[host]
|
||||
}
|
||||
|
||||
func (hb *healthBalancer) endpoints() []string {
|
||||
@ -143,10 +143,17 @@ func (hb *healthBalancer) updateUnhealthy(timeout time.Duration) {
|
||||
case <-time.After(timeout):
|
||||
hb.mu.Lock()
|
||||
for k, v := range hb.unhealthy {
|
||||
if _, ok := hb.hostPort2ep[k]; !ok {
|
||||
delete(hb.unhealthy, k)
|
||||
if logger.V(4) {
|
||||
logger.Infof("clientv3/health-balancer: removes stale host:port %q from unhealthy", k)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if time.Since(v) > timeout {
|
||||
delete(hb.unhealthy, k)
|
||||
if logger.V(4) {
|
||||
logger.Infof("clientv3/health-balancer: removes %s from unhealthy after %v", k, timeout)
|
||||
logger.Infof("clientv3/health-balancer: removes %q from unhealthy after %v", k, timeout)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -178,17 +185,21 @@ func (hb *healthBalancer) liveAddrs() []grpc.Address {
|
||||
return addrs
|
||||
}
|
||||
|
||||
func (hb *healthBalancer) endpointError(addr string, err error) {
|
||||
func (hb *healthBalancer) hostPortError(hostPort string, err error) {
|
||||
hb.mu.Lock()
|
||||
hb.unhealthy[addr] = time.Now()
|
||||
hb.unhealthy[hostPort] = time.Now()
|
||||
hb.mu.Unlock()
|
||||
if logger.V(4) {
|
||||
logger.Infof("clientv3/health-balancer: marking %s as unhealthy (%v)", addr, err)
|
||||
logger.Infof("clientv3/health-balancer: marking %q as unhealthy (%q)", hostPort, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func (hb *healthBalancer) mayPin(addr grpc.Address) bool {
|
||||
hb.mu.RLock()
|
||||
if _, ok := hb.hostPort2ep[addr.Addr]; !ok { // stale host:port
|
||||
hb.mu.RUnlock()
|
||||
return false
|
||||
}
|
||||
skip := len(hb.addrs) == 1 || len(hb.unhealthy) == 0 || len(hb.addrs) == len(hb.unhealthy)
|
||||
failedTime, bad := hb.unhealthy[addr.Addr]
|
||||
dur := hb.healthCheckTimeout
|
||||
@ -203,7 +214,7 @@ func (hb *healthBalancer) mayPin(addr grpc.Address) bool {
|
||||
// instead, return before grpc-healthcheck if failed within healthcheck timeout
|
||||
if elapsed := time.Since(failedTime); elapsed < dur {
|
||||
if logger.V(4) {
|
||||
logger.Infof("clientv3/health-balancer: %s is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, dur)
|
||||
logger.Infof("clientv3/health-balancer: %q is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, dur)
|
||||
}
|
||||
return false
|
||||
}
|
||||
@ -212,7 +223,7 @@ func (hb *healthBalancer) mayPin(addr grpc.Address) bool {
|
||||
delete(hb.unhealthy, addr.Addr)
|
||||
hb.mu.Unlock()
|
||||
if logger.V(4) {
|
||||
logger.Infof("clientv3/health-balancer: %s is healthy (health check success)", addr.Addr)
|
||||
logger.Infof("clientv3/health-balancer: %q is healthy (health check success)", addr.Addr)
|
||||
}
|
||||
return true
|
||||
}
|
||||
@ -220,7 +231,7 @@ func (hb *healthBalancer) mayPin(addr grpc.Address) bool {
|
||||
hb.unhealthy[addr.Addr] = time.Now()
|
||||
hb.mu.Unlock()
|
||||
if logger.V(4) {
|
||||
logger.Infof("clientv3/health-balancer: %s becomes unhealthy (health check failed)", addr.Addr)
|
||||
logger.Infof("clientv3/health-balancer: %q becomes unhealthy (health check failed)", addr.Addr)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
@ -26,7 +26,7 @@ import (
|
||||
)
|
||||
|
||||
type rpcFunc func(ctx context.Context) error
|
||||
type retryRpcFunc func(context.Context, rpcFunc) error
|
||||
type retryRPCFunc func(context.Context, rpcFunc) error
|
||||
type retryStopErrFunc func(error) bool
|
||||
|
||||
func isReadStopError(err error) bool {
|
||||
@ -48,7 +48,7 @@ func isWriteStopError(err error) bool {
|
||||
return rpctypes.ErrorDesc(err) != "there is no address available"
|
||||
}
|
||||
|
||||
func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc {
|
||||
func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRPCFunc {
|
||||
return func(rpcCtx context.Context, f rpcFunc) error {
|
||||
for {
|
||||
select {
|
||||
@ -64,10 +64,10 @@ func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc {
|
||||
return nil
|
||||
}
|
||||
if logger.V(4) {
|
||||
logger.Infof("clientv3/retry: error %v on pinned endpoint %s", err, pinned)
|
||||
logger.Infof("clientv3/retry: error %q on pinned endpoint %q", err.Error(), pinned)
|
||||
}
|
||||
// mark this before endpoint switch is triggered
|
||||
c.balancer.endpointError(pinned, err)
|
||||
c.balancer.hostPortError(pinned, err)
|
||||
notify := c.balancer.ConnectNotify()
|
||||
if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
|
||||
c.balancer.next()
|
||||
@ -86,7 +86,7 @@ func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) newAuthRetryWrapper() retryRpcFunc {
|
||||
func (c *Client) newAuthRetryWrapper() retryRPCFunc {
|
||||
return func(rpcCtx context.Context, f rpcFunc) error {
|
||||
for {
|
||||
pinned := c.balancer.pinned()
|
||||
@ -95,12 +95,15 @@ func (c *Client) newAuthRetryWrapper() retryRpcFunc {
|
||||
return nil
|
||||
}
|
||||
if logger.V(4) {
|
||||
logger.Infof("clientv3/auth-retry: error %v on pinned endpoint %s", err, pinned)
|
||||
logger.Infof("clientv3/auth-retry: error %q on pinned endpoint %q", err.Error(), pinned)
|
||||
}
|
||||
// always stop retry on etcd errors other than invalid auth token
|
||||
if rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken {
|
||||
gterr := c.getToken(rpcCtx)
|
||||
if gterr != nil {
|
||||
if logger.V(4) {
|
||||
logger.Infof("clientv3/auth-retry: cannot retry due to error %q(%q) on pinned endpoint %q", err.Error(), gterr.Error(), pinned)
|
||||
}
|
||||
return err // return the original error for simplicity
|
||||
}
|
||||
continue
|
||||
@ -124,7 +127,7 @@ func RetryKVClient(c *Client) pb.KVClient {
|
||||
|
||||
type retryKVClient struct {
|
||||
*retryWriteKVClient
|
||||
readRetry retryRpcFunc
|
||||
readRetry retryRPCFunc
|
||||
}
|
||||
|
||||
func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (resp *pb.RangeResponse, err error) {
|
||||
@ -137,11 +140,11 @@ func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts .
|
||||
|
||||
type retryWriteKVClient struct {
|
||||
pb.KVClient
|
||||
retryf retryRpcFunc
|
||||
writeRetry retryRPCFunc
|
||||
}
|
||||
|
||||
func (rkv *retryWriteKVClient) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (resp *pb.PutResponse, err error) {
|
||||
err = rkv.retryf(ctx, func(rctx context.Context) error {
|
||||
err = rkv.writeRetry(ctx, func(rctx context.Context) error {
|
||||
resp, err = rkv.KVClient.Put(rctx, in, opts...)
|
||||
return err
|
||||
})
|
||||
@ -149,7 +152,7 @@ func (rkv *retryWriteKVClient) Put(ctx context.Context, in *pb.PutRequest, opts
|
||||
}
|
||||
|
||||
func (rkv *retryWriteKVClient) DeleteRange(ctx context.Context, in *pb.DeleteRangeRequest, opts ...grpc.CallOption) (resp *pb.DeleteRangeResponse, err error) {
|
||||
err = rkv.retryf(ctx, func(rctx context.Context) error {
|
||||
err = rkv.writeRetry(ctx, func(rctx context.Context) error {
|
||||
resp, err = rkv.KVClient.DeleteRange(rctx, in, opts...)
|
||||
return err
|
||||
})
|
||||
@ -157,7 +160,7 @@ func (rkv *retryWriteKVClient) DeleteRange(ctx context.Context, in *pb.DeleteRan
|
||||
}
|
||||
|
||||
func (rkv *retryWriteKVClient) Txn(ctx context.Context, in *pb.TxnRequest, opts ...grpc.CallOption) (resp *pb.TxnResponse, err error) {
|
||||
err = rkv.retryf(ctx, func(rctx context.Context) error {
|
||||
err = rkv.writeRetry(ctx, func(rctx context.Context) error {
|
||||
resp, err = rkv.KVClient.Txn(rctx, in, opts...)
|
||||
return err
|
||||
})
|
||||
@ -165,7 +168,7 @@ func (rkv *retryWriteKVClient) Txn(ctx context.Context, in *pb.TxnRequest, opts
|
||||
}
|
||||
|
||||
func (rkv *retryWriteKVClient) Compact(ctx context.Context, in *pb.CompactionRequest, opts ...grpc.CallOption) (resp *pb.CompactionResponse, err error) {
|
||||
err = rkv.retryf(ctx, func(rctx context.Context) error {
|
||||
err = rkv.writeRetry(ctx, func(rctx context.Context) error {
|
||||
resp, err = rkv.KVClient.Compact(rctx, in, opts...)
|
||||
return err
|
||||
})
|
||||
@ -174,7 +177,7 @@ func (rkv *retryWriteKVClient) Compact(ctx context.Context, in *pb.CompactionReq
|
||||
|
||||
type retryLeaseClient struct {
|
||||
pb.LeaseClient
|
||||
retryf retryRpcFunc
|
||||
readRetry retryRPCFunc
|
||||
}
|
||||
|
||||
// RetryLeaseClient implements a LeaseClient that uses the client's FailFast retry policy.
|
||||
@ -187,7 +190,7 @@ func RetryLeaseClient(c *Client) pb.LeaseClient {
|
||||
}
|
||||
|
||||
func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRequest, opts ...grpc.CallOption) (resp *pb.LeaseGrantResponse, err error) {
|
||||
err = rlc.retryf(ctx, func(rctx context.Context) error {
|
||||
err = rlc.readRetry(ctx, func(rctx context.Context) error {
|
||||
resp, err = rlc.LeaseClient.LeaseGrant(rctx, in, opts...)
|
||||
return err
|
||||
})
|
||||
@ -196,7 +199,7 @@ func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRe
|
||||
}
|
||||
|
||||
func (rlc *retryLeaseClient) LeaseRevoke(ctx context.Context, in *pb.LeaseRevokeRequest, opts ...grpc.CallOption) (resp *pb.LeaseRevokeResponse, err error) {
|
||||
err = rlc.retryf(ctx, func(rctx context.Context) error {
|
||||
err = rlc.readRetry(ctx, func(rctx context.Context) error {
|
||||
resp, err = rlc.LeaseClient.LeaseRevoke(rctx, in, opts...)
|
||||
return err
|
||||
})
|
||||
@ -205,7 +208,7 @@ func (rlc *retryLeaseClient) LeaseRevoke(ctx context.Context, in *pb.LeaseRevoke
|
||||
|
||||
type retryClusterClient struct {
|
||||
pb.ClusterClient
|
||||
retryf retryRpcFunc
|
||||
writeRetry retryRPCFunc
|
||||
}
|
||||
|
||||
// RetryClusterClient implements a ClusterClient that uses the client's FailFast retry policy.
|
||||
@ -214,7 +217,7 @@ func RetryClusterClient(c *Client) pb.ClusterClient {
|
||||
}
|
||||
|
||||
func (rcc *retryClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRequest, opts ...grpc.CallOption) (resp *pb.MemberAddResponse, err error) {
|
||||
err = rcc.retryf(ctx, func(rctx context.Context) error {
|
||||
err = rcc.writeRetry(ctx, func(rctx context.Context) error {
|
||||
resp, err = rcc.ClusterClient.MemberAdd(rctx, in, opts...)
|
||||
return err
|
||||
})
|
||||
@ -222,7 +225,7 @@ func (rcc *retryClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRe
|
||||
}
|
||||
|
||||
func (rcc *retryClusterClient) MemberRemove(ctx context.Context, in *pb.MemberRemoveRequest, opts ...grpc.CallOption) (resp *pb.MemberRemoveResponse, err error) {
|
||||
err = rcc.retryf(ctx, func(rctx context.Context) error {
|
||||
err = rcc.writeRetry(ctx, func(rctx context.Context) error {
|
||||
resp, err = rcc.ClusterClient.MemberRemove(rctx, in, opts...)
|
||||
return err
|
||||
})
|
||||
@ -230,7 +233,7 @@ func (rcc *retryClusterClient) MemberRemove(ctx context.Context, in *pb.MemberRe
|
||||
}
|
||||
|
||||
func (rcc *retryClusterClient) MemberUpdate(ctx context.Context, in *pb.MemberUpdateRequest, opts ...grpc.CallOption) (resp *pb.MemberUpdateResponse, err error) {
|
||||
err = rcc.retryf(ctx, func(rctx context.Context) error {
|
||||
err = rcc.writeRetry(ctx, func(rctx context.Context) error {
|
||||
resp, err = rcc.ClusterClient.MemberUpdate(rctx, in, opts...)
|
||||
return err
|
||||
})
|
||||
@ -239,7 +242,7 @@ func (rcc *retryClusterClient) MemberUpdate(ctx context.Context, in *pb.MemberUp
|
||||
|
||||
type retryAuthClient struct {
|
||||
pb.AuthClient
|
||||
retryf retryRpcFunc
|
||||
writeRetry retryRPCFunc
|
||||
}
|
||||
|
||||
// RetryAuthClient implements a AuthClient that uses the client's FailFast retry policy.
|
||||
@ -248,7 +251,7 @@ func RetryAuthClient(c *Client) pb.AuthClient {
|
||||
}
|
||||
|
||||
func (rac *retryAuthClient) AuthEnable(ctx context.Context, in *pb.AuthEnableRequest, opts ...grpc.CallOption) (resp *pb.AuthEnableResponse, err error) {
|
||||
err = rac.retryf(ctx, func(rctx context.Context) error {
|
||||
err = rac.writeRetry(ctx, func(rctx context.Context) error {
|
||||
resp, err = rac.AuthClient.AuthEnable(rctx, in, opts...)
|
||||
return err
|
||||
})
|
||||
@ -256,7 +259,7 @@ func (rac *retryAuthClient) AuthEnable(ctx context.Context, in *pb.AuthEnableReq
|
||||
}
|
||||
|
||||
func (rac *retryAuthClient) AuthDisable(ctx context.Context, in *pb.AuthDisableRequest, opts ...grpc.CallOption) (resp *pb.AuthDisableResponse, err error) {
|
||||
err = rac.retryf(ctx, func(rctx context.Context) error {
|
||||
err = rac.writeRetry(ctx, func(rctx context.Context) error {
|
||||
resp, err = rac.AuthClient.AuthDisable(rctx, in, opts...)
|
||||
return err
|
||||
})
|
||||
@ -264,7 +267,7 @@ func (rac *retryAuthClient) AuthDisable(ctx context.Context, in *pb.AuthDisableR
|
||||
}
|
||||
|
||||
func (rac *retryAuthClient) UserAdd(ctx context.Context, in *pb.AuthUserAddRequest, opts ...grpc.CallOption) (resp *pb.AuthUserAddResponse, err error) {
|
||||
err = rac.retryf(ctx, func(rctx context.Context) error {
|
||||
err = rac.writeRetry(ctx, func(rctx context.Context) error {
|
||||
resp, err = rac.AuthClient.UserAdd(rctx, in, opts...)
|
||||
return err
|
||||
})
|
||||
@ -272,7 +275,7 @@ func (rac *retryAuthClient) UserAdd(ctx context.Context, in *pb.AuthUserAddReque
|
||||
}
|
||||
|
||||
func (rac *retryAuthClient) UserDelete(ctx context.Context, in *pb.AuthUserDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthUserDeleteResponse, err error) {
|
||||
err = rac.retryf(ctx, func(rctx context.Context) error {
|
||||
err = rac.writeRetry(ctx, func(rctx context.Context) error {
|
||||
resp, err = rac.AuthClient.UserDelete(rctx, in, opts...)
|
||||
return err
|
||||
})
|
||||
@ -280,7 +283,7 @@ func (rac *retryAuthClient) UserDelete(ctx context.Context, in *pb.AuthUserDelet
|
||||
}
|
||||
|
||||
func (rac *retryAuthClient) UserChangePassword(ctx context.Context, in *pb.AuthUserChangePasswordRequest, opts ...grpc.CallOption) (resp *pb.AuthUserChangePasswordResponse, err error) {
|
||||
err = rac.retryf(ctx, func(rctx context.Context) error {
|
||||
err = rac.writeRetry(ctx, func(rctx context.Context) error {
|
||||
resp, err = rac.AuthClient.UserChangePassword(rctx, in, opts...)
|
||||
return err
|
||||
})
|
||||
@ -288,7 +291,7 @@ func (rac *retryAuthClient) UserChangePassword(ctx context.Context, in *pb.AuthU
|
||||
}
|
||||
|
||||
func (rac *retryAuthClient) UserGrantRole(ctx context.Context, in *pb.AuthUserGrantRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserGrantRoleResponse, err error) {
|
||||
err = rac.retryf(ctx, func(rctx context.Context) error {
|
||||
err = rac.writeRetry(ctx, func(rctx context.Context) error {
|
||||
resp, err = rac.AuthClient.UserGrantRole(rctx, in, opts...)
|
||||
return err
|
||||
})
|
||||
@ -296,7 +299,7 @@ func (rac *retryAuthClient) UserGrantRole(ctx context.Context, in *pb.AuthUserGr
|
||||
}
|
||||
|
||||
func (rac *retryAuthClient) UserRevokeRole(ctx context.Context, in *pb.AuthUserRevokeRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserRevokeRoleResponse, err error) {
|
||||
err = rac.retryf(ctx, func(rctx context.Context) error {
|
||||
err = rac.writeRetry(ctx, func(rctx context.Context) error {
|
||||
resp, err = rac.AuthClient.UserRevokeRole(rctx, in, opts...)
|
||||
return err
|
||||
})
|
||||
@ -304,7 +307,7 @@ func (rac *retryAuthClient) UserRevokeRole(ctx context.Context, in *pb.AuthUserR
|
||||
}
|
||||
|
||||
func (rac *retryAuthClient) RoleAdd(ctx context.Context, in *pb.AuthRoleAddRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleAddResponse, err error) {
|
||||
err = rac.retryf(ctx, func(rctx context.Context) error {
|
||||
err = rac.writeRetry(ctx, func(rctx context.Context) error {
|
||||
resp, err = rac.AuthClient.RoleAdd(rctx, in, opts...)
|
||||
return err
|
||||
})
|
||||
@ -312,7 +315,7 @@ func (rac *retryAuthClient) RoleAdd(ctx context.Context, in *pb.AuthRoleAddReque
|
||||
}
|
||||
|
||||
func (rac *retryAuthClient) RoleDelete(ctx context.Context, in *pb.AuthRoleDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleDeleteResponse, err error) {
|
||||
err = rac.retryf(ctx, func(rctx context.Context) error {
|
||||
err = rac.writeRetry(ctx, func(rctx context.Context) error {
|
||||
resp, err = rac.AuthClient.RoleDelete(rctx, in, opts...)
|
||||
return err
|
||||
})
|
||||
@ -320,7 +323,7 @@ func (rac *retryAuthClient) RoleDelete(ctx context.Context, in *pb.AuthRoleDelet
|
||||
}
|
||||
|
||||
func (rac *retryAuthClient) RoleGrantPermission(ctx context.Context, in *pb.AuthRoleGrantPermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleGrantPermissionResponse, err error) {
|
||||
err = rac.retryf(ctx, func(rctx context.Context) error {
|
||||
err = rac.writeRetry(ctx, func(rctx context.Context) error {
|
||||
resp, err = rac.AuthClient.RoleGrantPermission(rctx, in, opts...)
|
||||
return err
|
||||
})
|
||||
@ -328,7 +331,7 @@ func (rac *retryAuthClient) RoleGrantPermission(ctx context.Context, in *pb.Auth
|
||||
}
|
||||
|
||||
func (rac *retryAuthClient) RoleRevokePermission(ctx context.Context, in *pb.AuthRoleRevokePermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleRevokePermissionResponse, err error) {
|
||||
err = rac.retryf(ctx, func(rctx context.Context) error {
|
||||
err = rac.writeRetry(ctx, func(rctx context.Context) error {
|
||||
resp, err = rac.AuthClient.RoleRevokePermission(rctx, in, opts...)
|
||||
return err
|
||||
})
|
||||
|
Loading…
x
Reference in New Issue
Block a user