*: fix fmt tests, reenable "testEmbedEtcdGracefulStop"

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
This commit is contained in:
Gyuho Lee 2018-06-08 13:44:53 -07:00
parent 3b84117f54
commit a3032d3d0b
6 changed files with 38 additions and 39 deletions

View File

@ -49,7 +49,7 @@ type builder struct {
// Build is called initially when creating "ccBalancerWrapper". // Build is called initially when creating "ccBalancerWrapper".
// "grpc.Dial" is called to this client connection. // "grpc.Dial" is called to this client connection.
// Then, resolved addreses will be handled via "HandleResolvedAddrs". // Then, resolved addresses will be handled via "HandleResolvedAddrs".
func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
bb := &baseBalancer{ bb := &baseBalancer{
id: strconv.FormatInt(time.Now().UnixNano(), 36), id: strconv.FormatInt(time.Now().UnixNano(), 36),

View File

@ -24,9 +24,7 @@ import (
"google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver"
) )
const ( const scheme = "endpoint"
scheme = "endpoint"
)
var ( var (
targetPrefix = fmt.Sprintf("%s://", scheme) targetPrefix = fmt.Sprintf("%s://", scheme)
@ -42,8 +40,8 @@ func init() {
} }
type builder struct { type builder struct {
mu sync.RWMutex
resolverGroups map[string]*ResolverGroup resolverGroups map[string]*ResolverGroup
sync.RWMutex
} }
// NewResolverGroup creates a new ResolverGroup with the given id. // NewResolverGroup creates a new ResolverGroup with the given id.
@ -54,41 +52,41 @@ func NewResolverGroup(id string) (*ResolverGroup, error) {
// ResolverGroup keeps all endpoints of resolvers using a common endpoint://<id>/ target // ResolverGroup keeps all endpoints of resolvers using a common endpoint://<id>/ target
// up-to-date. // up-to-date.
type ResolverGroup struct { type ResolverGroup struct {
mu sync.RWMutex
id string id string
endpoints []string endpoints []string
resolvers []*Resolver resolvers []*Resolver
sync.RWMutex
} }
func (e *ResolverGroup) addResolver(r *Resolver) { func (e *ResolverGroup) addResolver(r *Resolver) {
e.Lock() e.mu.Lock()
addrs := epsToAddrs(e.endpoints...) addrs := epsToAddrs(e.endpoints...)
e.resolvers = append(e.resolvers, r) e.resolvers = append(e.resolvers, r)
e.Unlock() e.mu.Unlock()
r.cc.NewAddress(addrs) r.cc.NewAddress(addrs)
} }
func (e *ResolverGroup) removeResolver(r *Resolver) { func (e *ResolverGroup) removeResolver(r *Resolver) {
e.Lock() e.mu.Lock()
for i, er := range e.resolvers { for i, er := range e.resolvers {
if er == r { if er == r {
e.resolvers = append(e.resolvers[:i], e.resolvers[i+1:]...) e.resolvers = append(e.resolvers[:i], e.resolvers[i+1:]...)
break break
} }
} }
e.Unlock() e.mu.Unlock()
} }
// SetEndpoints updates the endpoints for ResolverGroup. All registered resolver are updated // SetEndpoints updates the endpoints for ResolverGroup. All registered resolver are updated
// immediately with the new endpoints. // immediately with the new endpoints.
func (e *ResolverGroup) SetEndpoints(endpoints []string) { func (e *ResolverGroup) SetEndpoints(endpoints []string) {
addrs := epsToAddrs(endpoints...) addrs := epsToAddrs(endpoints...)
e.Lock() e.mu.Lock()
e.endpoints = endpoints e.endpoints = endpoints
for _, r := range e.resolvers { for _, r := range e.resolvers {
r.cc.NewAddress(addrs) r.cc.NewAddress(addrs)
} }
e.Unlock() e.mu.Unlock()
} }
// Target constructs a endpoint target using the endpoint id of the ResolverGroup. // Target constructs a endpoint target using the endpoint id of the ResolverGroup.
@ -121,7 +119,7 @@ func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts res
return nil, fmt.Errorf("failed to build resolver: %v", err) return nil, fmt.Errorf("failed to build resolver: %v", err)
} }
r := &Resolver{ r := &Resolver{
endpointId: id, endpointID: id,
cc: cc, cc: cc,
} }
es.addResolver(r) es.addResolver(r)
@ -129,24 +127,24 @@ func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts res
} }
func (b *builder) newResolverGroup(id string) (*ResolverGroup, error) { func (b *builder) newResolverGroup(id string) (*ResolverGroup, error) {
b.RLock() b.mu.RLock()
es, ok := b.resolverGroups[id] _, ok := b.resolverGroups[id]
b.RUnlock() b.mu.RUnlock()
if !ok { if ok {
es = &ResolverGroup{id: id}
b.Lock()
b.resolverGroups[id] = es
b.Unlock()
} else {
return nil, fmt.Errorf("Endpoint already exists for id: %s", id) return nil, fmt.Errorf("Endpoint already exists for id: %s", id)
} }
es := &ResolverGroup{id: id}
b.mu.Lock()
b.resolverGroups[id] = es
b.mu.Unlock()
return es, nil return es, nil
} }
func (b *builder) getResolverGroup(id string) (*ResolverGroup, error) { func (b *builder) getResolverGroup(id string) (*ResolverGroup, error) {
b.RLock() b.mu.RLock()
es, ok := b.resolverGroups[id] es, ok := b.resolverGroups[id]
b.RUnlock() b.mu.RUnlock()
if !ok { if !ok {
return nil, fmt.Errorf("ResolverGroup not found for id: %s", id) return nil, fmt.Errorf("ResolverGroup not found for id: %s", id)
} }
@ -154,9 +152,9 @@ func (b *builder) getResolverGroup(id string) (*ResolverGroup, error) {
} }
func (b *builder) close(id string) { func (b *builder) close(id string) {
b.Lock() b.mu.Lock()
delete(b.resolverGroups, id) delete(b.resolverGroups, id)
b.Unlock() b.mu.Unlock()
} }
func (r *builder) Scheme() string { func (r *builder) Scheme() string {
@ -165,7 +163,7 @@ func (r *builder) Scheme() string {
// Resolver provides a resolver for a single etcd cluster, identified by name. // Resolver provides a resolver for a single etcd cluster, identified by name.
type Resolver struct { type Resolver struct {
endpointId string endpointID string
cc resolver.ClientConn cc resolver.ClientConn
sync.RWMutex sync.RWMutex
} }
@ -182,15 +180,18 @@ func epsToAddrs(eps ...string) (addrs []resolver.Address) {
func (*Resolver) ResolveNow(o resolver.ResolveNowOption) {} func (*Resolver) ResolveNow(o resolver.ResolveNowOption) {}
func (r *Resolver) Close() { func (r *Resolver) Close() {
es, err := bldr.getResolverGroup(r.endpointId) es, err := bldr.getResolverGroup(r.endpointID)
if err != nil { if err != nil {
return return
} }
es.removeResolver(r) es.removeResolver(r)
} }
// Parse endpoint parses a endpoint of the form (http|https)://<host>*|(unix|unixs)://<path>) and returns a // ParseEndpoint endpoint parses an endpoint of the form
// protocol ('tcp' or 'unix'), host (or filepath if a unix socket) and scheme (http, https, unix, unixs). // (http|https)://<host>*|(unix|unixs)://<path>)
// and returns a protocol ('tcp' or 'unix'),
// host (or filepath if a unix socket),
// scheme (http, https, unix, unixs).
func ParseEndpoint(endpoint string) (proto string, host string, scheme string) { func ParseEndpoint(endpoint string) (proto string, host string, scheme string) {
proto = "tcp" proto = "tcp"
host = endpoint host = endpoint

View File

@ -23,9 +23,9 @@ import (
var ( var (
// client-side handling retrying of request failures where data was not written to the wire or // client-side handling retrying of request failures where data was not written to the wire or
// where server indicates it did not process the data. gPRC default is default is "FailFast(true)" // where server indicates it did not process the data. gRPC default is default is "FailFast(true)"
// but for etcd we default to "FailFast(false)" to minimize client request error responses due to // but for etcd we default to "FailFast(false)" to minimize client request error responses due to
// transident failures. // transient failures.
defaultFailFast = grpc.FailFast(false) defaultFailFast = grpc.FailFast(false)
// client-side request send limit, gRPC default is math.MaxInt32 // client-side request send limit, gRPC default is math.MaxInt32

View File

@ -123,7 +123,7 @@ func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOp
type serverStreamingRetryingStream struct { type serverStreamingRetryingStream struct {
grpc.ClientStream grpc.ClientStream
client *Client client *Client
bufferedSends []interface{} // single messsage that the client can sen bufferedSends []interface{} // single message that the client can sen
receivedGood bool // indicates whether any prior receives were successful receivedGood bool // indicates whether any prior receives were successful
wasClosedSend bool // indicates that CloseSend was closed wasClosedSend bool // indicates that CloseSend was closed
ctx context.Context ctx context.Context
@ -294,7 +294,7 @@ func contextErrToGrpcErr(err error) error {
var ( var (
defaultOptions = &options{ defaultOptions = &options{
retryPolicy: nonRepeatable, retryPolicy: nonRepeatable,
max: 0, // disabed max: 0, // disable
backoffFunc: backoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10), backoffFunc: backoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10),
retryAuth: true, retryAuth: true,
} }

View File

@ -108,9 +108,8 @@ func TestEmbedEtcd(t *testing.T) {
} }
} }
// TODO: reenable func TestEmbedEtcdGracefulStopSecure(t *testing.T) { testEmbedEtcdGracefulStop(t, true) }
//func TestEmbedEtcdGracefulStopSecure(t *testing.T) { testEmbedEtcdGracefulStop(t, true) } func TestEmbedEtcdGracefulStopInsecure(t *testing.T) { testEmbedEtcdGracefulStop(t, false) }
//func TestEmbedEtcdGracefulStopInsecure(t *testing.T) { testEmbedEtcdGracefulStop(t, false) }
// testEmbedEtcdGracefulStop ensures embedded server stops // testEmbedEtcdGracefulStop ensures embedded server stops
// cutting existing transports. // cutting existing transports.

View File

@ -135,11 +135,10 @@ func (ms *MockServers) StartAt(idx int) (err error) {
pb.RegisterKVServer(svr, &mockKVServer{}) pb.RegisterKVServer(svr, &mockKVServer{})
ms.Servers[idx].GrpcServer = svr ms.Servers[idx].GrpcServer = svr
ms.wg.Add(1)
go func(svr *grpc.Server, l net.Listener) { go func(svr *grpc.Server, l net.Listener) {
ms.wg.Add(1)
svr.Serve(l) svr.Serve(l)
}(ms.Servers[idx].GrpcServer, ms.Servers[idx].ln) }(ms.Servers[idx].GrpcServer, ms.Servers[idx].ln)
return nil return nil
} }