From a3032d3d0bedff89687ee972645bc89b0acbe31d Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Fri, 8 Jun 2018 13:44:53 -0700 Subject: [PATCH] *: fix fmt tests, reenable "testEmbedEtcdGracefulStop" Signed-off-by: Gyuho Lee --- clientv3/balancer/balancer.go | 2 +- .../balancer/resolver/endpoint/endpoint.go | 59 ++++++++++--------- clientv3/options.go | 4 +- clientv3/retry_interceptor.go | 4 +- integration/embed_test.go | 5 +- pkg/mock/mockserver/mockserver.go | 3 +- 6 files changed, 38 insertions(+), 39 deletions(-) diff --git a/clientv3/balancer/balancer.go b/clientv3/balancer/balancer.go index 8356aaf00..6ecc5b5f8 100644 --- a/clientv3/balancer/balancer.go +++ b/clientv3/balancer/balancer.go @@ -49,7 +49,7 @@ type builder struct { // Build is called initially when creating "ccBalancerWrapper". // "grpc.Dial" is called to this client connection. -// Then, resolved addreses will be handled via "HandleResolvedAddrs". +// Then, resolved addresses will be handled via "HandleResolvedAddrs". func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { bb := &baseBalancer{ id: strconv.FormatInt(time.Now().UnixNano(), 36), diff --git a/clientv3/balancer/resolver/endpoint/endpoint.go b/clientv3/balancer/resolver/endpoint/endpoint.go index 679f92e9a..104ec773c 100644 --- a/clientv3/balancer/resolver/endpoint/endpoint.go +++ b/clientv3/balancer/resolver/endpoint/endpoint.go @@ -24,9 +24,7 @@ import ( "google.golang.org/grpc/resolver" ) -const ( - scheme = "endpoint" -) +const scheme = "endpoint" var ( targetPrefix = fmt.Sprintf("%s://", scheme) @@ -42,8 +40,8 @@ func init() { } type builder struct { + mu sync.RWMutex resolverGroups map[string]*ResolverGroup - sync.RWMutex } // NewResolverGroup creates a new ResolverGroup with the given id. @@ -54,41 +52,41 @@ func NewResolverGroup(id string) (*ResolverGroup, error) { // ResolverGroup keeps all endpoints of resolvers using a common endpoint:/// target // up-to-date. type ResolverGroup struct { + mu sync.RWMutex id string endpoints []string resolvers []*Resolver - sync.RWMutex } func (e *ResolverGroup) addResolver(r *Resolver) { - e.Lock() + e.mu.Lock() addrs := epsToAddrs(e.endpoints...) e.resolvers = append(e.resolvers, r) - e.Unlock() + e.mu.Unlock() r.cc.NewAddress(addrs) } func (e *ResolverGroup) removeResolver(r *Resolver) { - e.Lock() + e.mu.Lock() for i, er := range e.resolvers { if er == r { e.resolvers = append(e.resolvers[:i], e.resolvers[i+1:]...) break } } - e.Unlock() + e.mu.Unlock() } // SetEndpoints updates the endpoints for ResolverGroup. All registered resolver are updated // immediately with the new endpoints. func (e *ResolverGroup) SetEndpoints(endpoints []string) { addrs := epsToAddrs(endpoints...) - e.Lock() + e.mu.Lock() e.endpoints = endpoints for _, r := range e.resolvers { r.cc.NewAddress(addrs) } - e.Unlock() + e.mu.Unlock() } // Target constructs a endpoint target using the endpoint id of the ResolverGroup. @@ -121,7 +119,7 @@ func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts res return nil, fmt.Errorf("failed to build resolver: %v", err) } r := &Resolver{ - endpointId: id, + endpointID: id, cc: cc, } es.addResolver(r) @@ -129,24 +127,24 @@ func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts res } func (b *builder) newResolverGroup(id string) (*ResolverGroup, error) { - b.RLock() - es, ok := b.resolverGroups[id] - b.RUnlock() - if !ok { - es = &ResolverGroup{id: id} - b.Lock() - b.resolverGroups[id] = es - b.Unlock() - } else { + b.mu.RLock() + _, ok := b.resolverGroups[id] + b.mu.RUnlock() + if ok { return nil, fmt.Errorf("Endpoint already exists for id: %s", id) } + + es := &ResolverGroup{id: id} + b.mu.Lock() + b.resolverGroups[id] = es + b.mu.Unlock() return es, nil } func (b *builder) getResolverGroup(id string) (*ResolverGroup, error) { - b.RLock() + b.mu.RLock() es, ok := b.resolverGroups[id] - b.RUnlock() + b.mu.RUnlock() if !ok { return nil, fmt.Errorf("ResolverGroup not found for id: %s", id) } @@ -154,9 +152,9 @@ func (b *builder) getResolverGroup(id string) (*ResolverGroup, error) { } func (b *builder) close(id string) { - b.Lock() + b.mu.Lock() delete(b.resolverGroups, id) - b.Unlock() + b.mu.Unlock() } func (r *builder) Scheme() string { @@ -165,7 +163,7 @@ func (r *builder) Scheme() string { // Resolver provides a resolver for a single etcd cluster, identified by name. type Resolver struct { - endpointId string + endpointID string cc resolver.ClientConn sync.RWMutex } @@ -182,15 +180,18 @@ func epsToAddrs(eps ...string) (addrs []resolver.Address) { func (*Resolver) ResolveNow(o resolver.ResolveNowOption) {} func (r *Resolver) Close() { - es, err := bldr.getResolverGroup(r.endpointId) + es, err := bldr.getResolverGroup(r.endpointID) if err != nil { return } es.removeResolver(r) } -// Parse endpoint parses a endpoint of the form (http|https)://*|(unix|unixs)://) and returns a -// protocol ('tcp' or 'unix'), host (or filepath if a unix socket) and scheme (http, https, unix, unixs). +// ParseEndpoint endpoint parses an endpoint of the form +// (http|https)://*|(unix|unixs)://) +// and returns a protocol ('tcp' or 'unix'), +// host (or filepath if a unix socket), +// scheme (http, https, unix, unixs). func ParseEndpoint(endpoint string) (proto string, host string, scheme string) { proto = "tcp" host = endpoint diff --git a/clientv3/options.go b/clientv3/options.go index e158be60e..b82b7554d 100644 --- a/clientv3/options.go +++ b/clientv3/options.go @@ -23,9 +23,9 @@ import ( var ( // client-side handling retrying of request failures where data was not written to the wire or - // where server indicates it did not process the data. gPRC default is default is "FailFast(true)" + // where server indicates it did not process the data. gRPC default is default is "FailFast(true)" // but for etcd we default to "FailFast(false)" to minimize client request error responses due to - // transident failures. + // transient failures. defaultFailFast = grpc.FailFast(false) // client-side request send limit, gRPC default is math.MaxInt32 diff --git a/clientv3/retry_interceptor.go b/clientv3/retry_interceptor.go index 3ace7f0f9..c63047d38 100644 --- a/clientv3/retry_interceptor.go +++ b/clientv3/retry_interceptor.go @@ -123,7 +123,7 @@ func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOp type serverStreamingRetryingStream struct { grpc.ClientStream client *Client - bufferedSends []interface{} // single messsage that the client can sen + bufferedSends []interface{} // single message that the client can sen receivedGood bool // indicates whether any prior receives were successful wasClosedSend bool // indicates that CloseSend was closed ctx context.Context @@ -294,7 +294,7 @@ func contextErrToGrpcErr(err error) error { var ( defaultOptions = &options{ retryPolicy: nonRepeatable, - max: 0, // disabed + max: 0, // disable backoffFunc: backoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10), retryAuth: true, } diff --git a/integration/embed_test.go b/integration/embed_test.go index 32e614ff7..6af58ea1b 100644 --- a/integration/embed_test.go +++ b/integration/embed_test.go @@ -108,9 +108,8 @@ func TestEmbedEtcd(t *testing.T) { } } -// TODO: reenable -//func TestEmbedEtcdGracefulStopSecure(t *testing.T) { testEmbedEtcdGracefulStop(t, true) } -//func TestEmbedEtcdGracefulStopInsecure(t *testing.T) { testEmbedEtcdGracefulStop(t, false) } +func TestEmbedEtcdGracefulStopSecure(t *testing.T) { testEmbedEtcdGracefulStop(t, true) } +func TestEmbedEtcdGracefulStopInsecure(t *testing.T) { testEmbedEtcdGracefulStop(t, false) } // testEmbedEtcdGracefulStop ensures embedded server stops // cutting existing transports. diff --git a/pkg/mock/mockserver/mockserver.go b/pkg/mock/mockserver/mockserver.go index e1ed10559..d72b40b45 100644 --- a/pkg/mock/mockserver/mockserver.go +++ b/pkg/mock/mockserver/mockserver.go @@ -135,11 +135,10 @@ func (ms *MockServers) StartAt(idx int) (err error) { pb.RegisterKVServer(svr, &mockKVServer{}) ms.Servers[idx].GrpcServer = svr + ms.wg.Add(1) go func(svr *grpc.Server, l net.Listener) { - ms.wg.Add(1) svr.Serve(l) }(ms.Servers[idx].GrpcServer, ms.Servers[idx].ln) - return nil }