From 6a33f0ffd56004d5a4490f3a0191146d69a57f73 Mon Sep 17 00:00:00 2001 From: Iwasaki Yudai Date: Fri, 14 Oct 2016 14:05:34 -0700 Subject: [PATCH] clientv3: make balancer respect FastFail The simpleBalancer.Get() blocks grpc.Invoke() even when the Invoke() is called with the FailFast option. Therefore currently any requests with the FastFail option actually doesn't fail fast. They get blocked when there is no endpoints available. Get() method needs to respect the BlockingWait option when picks up an endpoint address from the list and fail immediately when the option is enabled and no endpoint is available. --- clientv3/auth.go | 20 +++---- clientv3/balancer.go | 20 +++++++ clientv3/balancer_test.go | 106 ++++++++++++++++++++++++++++++++++++++ clientv3/cluster.go | 6 +-- clientv3/kv.go | 4 +- clientv3/lease.go | 6 +-- 6 files changed, 144 insertions(+), 18 deletions(-) create mode 100644 clientv3/balancer_test.go diff --git a/clientv3/auth.go b/clientv3/auth.go index 9d981cfb1..4d9711101 100644 --- a/clientv3/auth.go +++ b/clientv3/auth.go @@ -116,32 +116,32 @@ func NewAuth(c *Client) Auth { } func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) { - resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}) + resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, grpc.FailFast(false)) return (*AuthEnableResponse)(resp), toErr(ctx, err) } func (auth *auth) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) { - resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}) + resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}, grpc.FailFast(false)) return (*AuthDisableResponse)(resp), toErr(ctx, err) } func (auth *auth) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) { - resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}) + resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}, grpc.FailFast(false)) return (*AuthUserAddResponse)(resp), toErr(ctx, err) } func (auth *auth) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) { - resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}) + resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}, grpc.FailFast(false)) return (*AuthUserDeleteResponse)(resp), toErr(ctx, err) } func (auth *auth) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) { - resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}) + resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}, grpc.FailFast(false)) return (*AuthUserChangePasswordResponse)(resp), toErr(ctx, err) } func (auth *auth) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) { - resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}) + resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}, grpc.FailFast(false)) return (*AuthUserGrantRoleResponse)(resp), toErr(ctx, err) } @@ -156,12 +156,12 @@ func (auth *auth) UserList(ctx context.Context) (*AuthUserListResponse, error) { } func (auth *auth) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) { - resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}) + resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}, grpc.FailFast(false)) return (*AuthUserRevokeRoleResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) { - resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}) + resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}, grpc.FailFast(false)) return (*AuthRoleAddResponse)(resp), toErr(ctx, err) } @@ -186,12 +186,12 @@ func (auth *auth) RoleList(ctx context.Context) (*AuthRoleListResponse, error) { } func (auth *auth) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) { - resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd}) + resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd}, grpc.FailFast(false)) return (*AuthRoleRevokePermissionResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) { - resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}) + resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}, grpc.FailFast(false)) return (*AuthRoleDeleteResponse)(resp), toErr(ctx, err) } diff --git a/clientv3/balancer.go b/clientv3/balancer.go index b484b9756..2a55ca83a 100644 --- a/clientv3/balancer.go +++ b/clientv3/balancer.go @@ -15,6 +15,7 @@ package clientv3 import ( + "errors" "net/url" "strings" "sync" @@ -23,6 +24,11 @@ import ( "google.golang.org/grpc" ) +// ErrNoAddrAvilable is returned by Get() when the balancer does not have +// any active connection to endpoints at the time. +// This error is returned only when opts.BlockingWait is true. +var ErrNoAddrAvilable = errors.New("there is no address available") + // simpleBalancer does the bare minimum to expose multiple eps // to the grpc reconnection code path type simpleBalancer struct { @@ -162,6 +168,20 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) { func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) { var addr string + + // If opts.BlockingWait is false (for fail-fast RPCs), it should return + // an address it has notified via Notify immediately instead of blocking. + if !opts.BlockingWait { + b.mu.RLock() + addr = b.pinAddr + upEps := len(b.upEps) + b.mu.RUnlock() + if upEps == 0 { + return grpc.Address{Addr: ""}, nil, ErrNoAddrAvilable + } + return grpc.Address{Addr: addr}, func() {}, nil + } + for { b.mu.RLock() ch := b.upc diff --git a/clientv3/balancer_test.go b/clientv3/balancer_test.go new file mode 100644 index 000000000..1ac6154d1 --- /dev/null +++ b/clientv3/balancer_test.go @@ -0,0 +1,106 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3 + +import ( + "errors" + "testing" + "time" + + "golang.org/x/net/context" + "google.golang.org/grpc" +) + +var ( + endpoints = []string{"localhost:2379", "localhost:22379", "localhost:32379"} +) + +func TestBalancerGetUnblocking(t *testing.T) { + sb := newSimpleBalancer(endpoints) + unblockingOpts := grpc.BalancerGetOptions{BlockingWait: false} + + _, _, err := sb.Get(context.Background(), unblockingOpts) + if err != ErrNoAddrAvilable { + t.Errorf("Get() with no up endpoints should return ErrNoAddrAvailable, got: %v", err) + } + + down1 := sb.Up(grpc.Address{Addr: endpoints[1]}) + down2 := sb.Up(grpc.Address{Addr: endpoints[2]}) + addrFirst, putFun, err := sb.Get(context.Background(), unblockingOpts) + if err != nil { + t.Errorf("Get() with up endpoints should sucess, got %v", err) + } + if addrFirst.Addr != endpoints[1] && addrFirst.Addr != endpoints[2] { + t.Errorf("Get() didn't return expected address, got %v", addrFirst) + } + if putFun == nil { + t.Errorf("Get() returned unexpected nil put function") + } + addrSecond, _, _ := sb.Get(context.Background(), unblockingOpts) + if addrSecond.Addr != addrSecond.Addr { + t.Errorf("Get() didn't return the same address as previous call, got %v and %v", addrFirst, addrSecond) + } + + down1(errors.New("error")) + down2(errors.New("error")) + _, _, err = sb.Get(context.Background(), unblockingOpts) + if err != ErrNoAddrAvilable { + t.Errorf("Get() with no up endpoints should return ErrNoAddrAvailable, got: %v", err) + } +} + +func TestBalancerGetBlocking(t *testing.T) { + sb := newSimpleBalancer(endpoints) + blockingOpts := grpc.BalancerGetOptions{BlockingWait: true} + + ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*100) + _, _, err := sb.Get(ctx, blockingOpts) + if err != context.DeadlineExceeded { + t.Errorf("Get() with no up endpoints should timeout, got %v", err) + } + + downC := make(chan func(error), 1) + + go func() { + // ensure sb.Up() will be called after sb.Get() to see if Up() releases blocking Get() + time.Sleep(time.Millisecond * 100) + downC <- sb.Up(grpc.Address{Addr: endpoints[1]}) + }() + addrFirst, putFun, err := sb.Get(context.Background(), blockingOpts) + if err != nil { + t.Errorf("Get() with up endpoints should sucess, got %v", err) + } + if addrFirst.Addr != endpoints[1] { + t.Errorf("Get() didn't return expected address, got %v", addrFirst) + } + if putFun == nil { + t.Errorf("Get() returned unexpected nil put function") + } + down1 := <-downC + + down2 := sb.Up(grpc.Address{Addr: endpoints[2]}) + addrSecond, _, _ := sb.Get(context.Background(), blockingOpts) + if addrSecond.Addr != addrSecond.Addr { + t.Errorf("Get() didn't return the same address as previous call, got %v and %v", addrFirst, addrSecond) + } + + down1(errors.New("error")) + down2(errors.New("error")) + ctx, _ = context.WithTimeout(context.Background(), time.Millisecond*100) + _, _, err = sb.Get(ctx, blockingOpts) + if err != context.DeadlineExceeded { + t.Errorf("Get() with no up endpoints should timeout, got %v", err) + } +} diff --git a/clientv3/cluster.go b/clientv3/cluster.go index 8b981171b..d85f062b2 100644 --- a/clientv3/cluster.go +++ b/clientv3/cluster.go @@ -52,7 +52,7 @@ func NewCluster(c *Client) Cluster { func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) { r := &pb.MemberAddRequest{PeerURLs: peerAddrs} - resp, err := c.remote.MemberAdd(ctx, r) + resp, err := c.remote.MemberAdd(ctx, r, grpc.FailFast(false)) if err == nil { return (*MemberAddResponse)(resp), nil } @@ -64,7 +64,7 @@ func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAdd func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) { r := &pb.MemberRemoveRequest{ID: id} - resp, err := c.remote.MemberRemove(ctx, r) + resp, err := c.remote.MemberRemove(ctx, r, grpc.FailFast(false)) if err == nil { return (*MemberRemoveResponse)(resp), nil } @@ -78,7 +78,7 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin // it is safe to retry on update. for { r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs} - resp, err := c.remote.MemberUpdate(ctx, r) + resp, err := c.remote.MemberUpdate(ctx, r, grpc.FailFast(false)) if err == nil { return (*MemberUpdateResponse)(resp), nil } diff --git a/clientv3/kv.go b/clientv3/kv.go index 834b17d34..1faa8f690 100644 --- a/clientv3/kv.go +++ b/clientv3/kv.go @@ -148,14 +148,14 @@ func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) { case tPut: var resp *pb.PutResponse r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV} - resp, err = kv.remote.Put(ctx, r) + resp, err = kv.remote.Put(ctx, r, grpc.FailFast(false)) if err == nil { return OpResponse{put: (*PutResponse)(resp)}, nil } case tDeleteRange: var resp *pb.DeleteRangeResponse r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV} - resp, err = kv.remote.DeleteRange(ctx, r) + resp, err = kv.remote.DeleteRange(ctx, r, grpc.FailFast(false)) if err == nil { return OpResponse{del: (*DeleteResponse)(resp)}, nil } diff --git a/clientv3/lease.go b/clientv3/lease.go index ed8bb0a53..f1bc591f7 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -148,7 +148,7 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err for { r := &pb.LeaseGrantRequest{TTL: ttl} - resp, err := l.remote.LeaseGrant(cctx, r) + resp, err := l.remote.LeaseGrant(cctx, r, grpc.FailFast(false)) if err == nil { gresp := &LeaseGrantResponse{ ResponseHeader: resp.GetHeader(), @@ -174,7 +174,7 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, for { r := &pb.LeaseRevokeRequest{ID: int64(id)} - resp, err := l.remote.LeaseRevoke(cctx, r) + resp, err := l.remote.LeaseRevoke(cctx, r, grpc.FailFast(false)) if err == nil { return (*LeaseRevokeResponse)(resp), nil @@ -195,7 +195,7 @@ func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption for { r := toLeaseTimeToLiveRequest(id, opts...) - resp, err := l.remote.LeaseTimeToLive(cctx, r) + resp, err := l.remote.LeaseTimeToLive(cctx, r, grpc.FailFast(false)) if err == nil { gresp := &LeaseTimeToLiveResponse{ ResponseHeader: resp.GetHeader(),