mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
[3.4] Backport #12671 clientv3: Replace balancer with upstream grpc solution
Signed-off-by: Chao Chen <chaochn@amazon.com>
This commit is contained in:
parent
e1430f2a1b
commit
83da5ff575
@ -98,15 +98,6 @@
|
|||||||
}
|
}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
{
|
|
||||||
"project": "github.com/google/uuid",
|
|
||||||
"licenses": [
|
|
||||||
{
|
|
||||||
"type": "BSD 3-clause \"New\" or \"Revised\" License",
|
|
||||||
"confidence": 0.9663865546218487
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
"project": "github.com/gorilla/websocket",
|
"project": "github.com/gorilla/websocket",
|
||||||
"licenses": [
|
"licenses": [
|
||||||
|
@ -1,293 +0,0 @@
|
|||||||
// Copyright 2018 The etcd Authors
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
// Package balancer implements client balancer.
|
|
||||||
package balancer
|
|
||||||
|
|
||||||
import (
|
|
||||||
"strconv"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"go.etcd.io/etcd/clientv3/balancer/connectivity"
|
|
||||||
"go.etcd.io/etcd/clientv3/balancer/picker"
|
|
||||||
|
|
||||||
"go.uber.org/zap"
|
|
||||||
"google.golang.org/grpc/balancer"
|
|
||||||
grpcconnectivity "google.golang.org/grpc/connectivity"
|
|
||||||
"google.golang.org/grpc/resolver"
|
|
||||||
_ "google.golang.org/grpc/resolver/dns" // register DNS resolver
|
|
||||||
_ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
|
|
||||||
)
|
|
||||||
|
|
||||||
// Config defines balancer configurations.
|
|
||||||
type Config struct {
|
|
||||||
// Policy configures balancer policy.
|
|
||||||
Policy picker.Policy
|
|
||||||
|
|
||||||
// Picker implements gRPC picker.
|
|
||||||
// Leave empty if "Policy" field is not custom.
|
|
||||||
// TODO: currently custom policy is not supported.
|
|
||||||
// Picker picker.Picker
|
|
||||||
|
|
||||||
// Name defines an additional name for balancer.
|
|
||||||
// Useful for balancer testing to avoid register conflicts.
|
|
||||||
// If empty, defaults to policy name.
|
|
||||||
Name string
|
|
||||||
|
|
||||||
// Logger configures balancer logging.
|
|
||||||
// If nil, logs are discarded.
|
|
||||||
Logger *zap.Logger
|
|
||||||
}
|
|
||||||
|
|
||||||
// RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it
|
|
||||||
// must be invoked at initialization time.
|
|
||||||
func RegisterBuilder(cfg Config) {
|
|
||||||
bb := &builder{cfg}
|
|
||||||
balancer.Register(bb)
|
|
||||||
|
|
||||||
bb.cfg.Logger.Debug(
|
|
||||||
"registered balancer",
|
|
||||||
zap.String("policy", bb.cfg.Policy.String()),
|
|
||||||
zap.String("name", bb.cfg.Name),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
type builder struct {
|
|
||||||
cfg Config
|
|
||||||
}
|
|
||||||
|
|
||||||
// Build is called initially when creating "ccBalancerWrapper".
|
|
||||||
// "grpc.Dial" is called to this client connection.
|
|
||||||
// Then, resolved addresses will be handled via "HandleResolvedAddrs".
|
|
||||||
func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
|
|
||||||
bb := &baseBalancer{
|
|
||||||
id: strconv.FormatInt(time.Now().UnixNano(), 36),
|
|
||||||
policy: b.cfg.Policy,
|
|
||||||
name: b.cfg.Name,
|
|
||||||
lg: b.cfg.Logger,
|
|
||||||
|
|
||||||
addrToSc: make(map[resolver.Address]balancer.SubConn),
|
|
||||||
scToAddr: make(map[balancer.SubConn]resolver.Address),
|
|
||||||
scToSt: make(map[balancer.SubConn]grpcconnectivity.State),
|
|
||||||
|
|
||||||
currentConn: nil,
|
|
||||||
connectivityRecorder: connectivity.New(b.cfg.Logger),
|
|
||||||
|
|
||||||
// initialize picker always returns "ErrNoSubConnAvailable"
|
|
||||||
picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: support multiple connections
|
|
||||||
bb.mu.Lock()
|
|
||||||
bb.currentConn = cc
|
|
||||||
bb.mu.Unlock()
|
|
||||||
|
|
||||||
bb.lg.Info(
|
|
||||||
"built balancer",
|
|
||||||
zap.String("balancer-id", bb.id),
|
|
||||||
zap.String("policy", bb.policy.String()),
|
|
||||||
zap.String("resolver-target", cc.Target()),
|
|
||||||
)
|
|
||||||
return bb
|
|
||||||
}
|
|
||||||
|
|
||||||
// Name implements "grpc/balancer.Builder" interface.
|
|
||||||
func (b *builder) Name() string { return b.cfg.Name }
|
|
||||||
|
|
||||||
// Balancer defines client balancer interface.
|
|
||||||
type Balancer interface {
|
|
||||||
// Balancer is called on specified client connection. Client initiates gRPC
|
|
||||||
// connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved
|
|
||||||
// addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs".
|
|
||||||
// For each resolved address, balancer calls "balancer.ClientConn.NewSubConn".
|
|
||||||
// "grpc/balancer.Balancer.HandleSubConnStateChange" is called when connectivity state
|
|
||||||
// changes, thus requires failover logic in this method.
|
|
||||||
balancer.Balancer
|
|
||||||
|
|
||||||
// Picker calls "Pick" for every client request.
|
|
||||||
picker.Picker
|
|
||||||
}
|
|
||||||
|
|
||||||
type baseBalancer struct {
|
|
||||||
id string
|
|
||||||
policy picker.Policy
|
|
||||||
name string
|
|
||||||
lg *zap.Logger
|
|
||||||
|
|
||||||
mu sync.RWMutex
|
|
||||||
|
|
||||||
addrToSc map[resolver.Address]balancer.SubConn
|
|
||||||
scToAddr map[balancer.SubConn]resolver.Address
|
|
||||||
scToSt map[balancer.SubConn]grpcconnectivity.State
|
|
||||||
|
|
||||||
currentConn balancer.ClientConn
|
|
||||||
connectivityRecorder connectivity.Recorder
|
|
||||||
|
|
||||||
picker picker.Picker
|
|
||||||
}
|
|
||||||
|
|
||||||
// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
|
|
||||||
// gRPC sends initial or updated resolved addresses from "Build".
|
|
||||||
func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
|
|
||||||
if err != nil {
|
|
||||||
bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
bb.lg.Info("resolved",
|
|
||||||
zap.String("picker", bb.picker.String()),
|
|
||||||
zap.String("balancer-id", bb.id),
|
|
||||||
zap.Strings("addresses", addrsToStrings(addrs)),
|
|
||||||
)
|
|
||||||
|
|
||||||
bb.mu.Lock()
|
|
||||||
defer bb.mu.Unlock()
|
|
||||||
|
|
||||||
resolved := make(map[resolver.Address]struct{})
|
|
||||||
for _, addr := range addrs {
|
|
||||||
resolved[addr] = struct{}{}
|
|
||||||
if _, ok := bb.addrToSc[addr]; !ok {
|
|
||||||
sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
|
|
||||||
if err != nil {
|
|
||||||
bb.lg.Warn("NewSubConn failed", zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
bb.lg.Info("created subconn", zap.String("address", addr.Addr))
|
|
||||||
bb.addrToSc[addr] = sc
|
|
||||||
bb.scToAddr[sc] = addr
|
|
||||||
bb.scToSt[sc] = grpcconnectivity.Idle
|
|
||||||
sc.Connect()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for addr, sc := range bb.addrToSc {
|
|
||||||
if _, ok := resolved[addr]; !ok {
|
|
||||||
// was removed by resolver or failed to create subconn
|
|
||||||
bb.currentConn.RemoveSubConn(sc)
|
|
||||||
delete(bb.addrToSc, addr)
|
|
||||||
|
|
||||||
bb.lg.Info(
|
|
||||||
"removed subconn",
|
|
||||||
zap.String("picker", bb.picker.String()),
|
|
||||||
zap.String("balancer-id", bb.id),
|
|
||||||
zap.String("address", addr.Addr),
|
|
||||||
zap.String("subconn", scToString(sc)),
|
|
||||||
)
|
|
||||||
|
|
||||||
// Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown.
|
|
||||||
// The entry will be deleted in HandleSubConnStateChange.
|
|
||||||
// (DO NOT) delete(bb.scToAddr, sc)
|
|
||||||
// (DO NOT) delete(bb.scToSt, sc)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
|
|
||||||
func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s grpcconnectivity.State) {
|
|
||||||
bb.mu.Lock()
|
|
||||||
defer bb.mu.Unlock()
|
|
||||||
|
|
||||||
old, ok := bb.scToSt[sc]
|
|
||||||
if !ok {
|
|
||||||
bb.lg.Warn(
|
|
||||||
"state change for an unknown subconn",
|
|
||||||
zap.String("picker", bb.picker.String()),
|
|
||||||
zap.String("balancer-id", bb.id),
|
|
||||||
zap.String("subconn", scToString(sc)),
|
|
||||||
zap.Int("subconn-size", len(bb.scToAddr)),
|
|
||||||
zap.String("state", s.String()),
|
|
||||||
)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
bb.lg.Info(
|
|
||||||
"state changed",
|
|
||||||
zap.String("picker", bb.picker.String()),
|
|
||||||
zap.String("balancer-id", bb.id),
|
|
||||||
zap.Bool("connected", s == grpcconnectivity.Ready),
|
|
||||||
zap.String("subconn", scToString(sc)),
|
|
||||||
zap.Int("subconn-size", len(bb.scToAddr)),
|
|
||||||
zap.String("address", bb.scToAddr[sc].Addr),
|
|
||||||
zap.String("old-state", old.String()),
|
|
||||||
zap.String("new-state", s.String()),
|
|
||||||
)
|
|
||||||
|
|
||||||
bb.scToSt[sc] = s
|
|
||||||
switch s {
|
|
||||||
case grpcconnectivity.Idle:
|
|
||||||
sc.Connect()
|
|
||||||
case grpcconnectivity.Shutdown:
|
|
||||||
// When an address was removed by resolver, b called RemoveSubConn but
|
|
||||||
// kept the sc's state in scToSt. Remove state for this sc here.
|
|
||||||
delete(bb.scToAddr, sc)
|
|
||||||
delete(bb.scToSt, sc)
|
|
||||||
}
|
|
||||||
|
|
||||||
oldAggrState := bb.connectivityRecorder.GetCurrentState()
|
|
||||||
bb.connectivityRecorder.RecordTransition(old, s)
|
|
||||||
|
|
||||||
// Update balancer picker when one of the following happens:
|
|
||||||
// - this sc became ready from not-ready
|
|
||||||
// - this sc became not-ready from ready
|
|
||||||
// - the aggregated state of balancer became TransientFailure from non-TransientFailure
|
|
||||||
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
|
|
||||||
if (s == grpcconnectivity.Ready) != (old == grpcconnectivity.Ready) ||
|
|
||||||
(bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure) != (oldAggrState == grpcconnectivity.TransientFailure) {
|
|
||||||
bb.updatePicker()
|
|
||||||
}
|
|
||||||
|
|
||||||
bb.currentConn.UpdateBalancerState(bb.connectivityRecorder.GetCurrentState(), bb.picker)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bb *baseBalancer) updatePicker() {
|
|
||||||
if bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure {
|
|
||||||
bb.picker = picker.NewErr(balancer.ErrTransientFailure)
|
|
||||||
bb.lg.Info(
|
|
||||||
"updated picker to transient error picker",
|
|
||||||
zap.String("picker", bb.picker.String()),
|
|
||||||
zap.String("balancer-id", bb.id),
|
|
||||||
zap.String("policy", bb.policy.String()),
|
|
||||||
)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// only pass ready subconns to picker
|
|
||||||
scToAddr := make(map[balancer.SubConn]resolver.Address)
|
|
||||||
for addr, sc := range bb.addrToSc {
|
|
||||||
if st, ok := bb.scToSt[sc]; ok && st == grpcconnectivity.Ready {
|
|
||||||
scToAddr[sc] = addr
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
bb.picker = picker.New(picker.Config{
|
|
||||||
Policy: bb.policy,
|
|
||||||
Logger: bb.lg,
|
|
||||||
SubConnToResolverAddress: scToAddr,
|
|
||||||
})
|
|
||||||
bb.lg.Info(
|
|
||||||
"updated picker",
|
|
||||||
zap.String("picker", bb.picker.String()),
|
|
||||||
zap.String("balancer-id", bb.id),
|
|
||||||
zap.String("policy", bb.policy.String()),
|
|
||||||
zap.Strings("subconn-ready", scsToStrings(scToAddr)),
|
|
||||||
zap.Int("subconn-size", len(scToAddr)),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close implements "grpc/balancer.Balancer" interface.
|
|
||||||
// Close is a nop because base balancer doesn't have internal state to clean up,
|
|
||||||
// and it doesn't need to call RemoveSubConn for the SubConns.
|
|
||||||
func (bb *baseBalancer) Close() {
|
|
||||||
// TODO
|
|
||||||
}
|
|
@ -1,323 +0,0 @@
|
|||||||
// Copyright 2018 The etcd Authors
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package balancer
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"strings"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"go.etcd.io/etcd/clientv3/balancer/picker"
|
|
||||||
"go.etcd.io/etcd/clientv3/balancer/resolver/endpoint"
|
|
||||||
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
|
|
||||||
"go.etcd.io/etcd/pkg/mock/mockserver"
|
|
||||||
|
|
||||||
"go.uber.org/zap"
|
|
||||||
"google.golang.org/grpc"
|
|
||||||
"google.golang.org/grpc/codes"
|
|
||||||
"google.golang.org/grpc/peer"
|
|
||||||
"google.golang.org/grpc/status"
|
|
||||||
)
|
|
||||||
|
|
||||||
// TestRoundRobinBalancedResolvableNoFailover ensures that
|
|
||||||
// requests to a resolvable endpoint can be balanced between
|
|
||||||
// multiple, if any, nodes. And there needs be no failover.
|
|
||||||
func TestRoundRobinBalancedResolvableNoFailover(t *testing.T) {
|
|
||||||
testCases := []struct {
|
|
||||||
name string
|
|
||||||
serverCount int
|
|
||||||
reqN int
|
|
||||||
network string
|
|
||||||
}{
|
|
||||||
{name: "rrBalanced_1", serverCount: 1, reqN: 5, network: "tcp"},
|
|
||||||
{name: "rrBalanced_1_unix_sockets", serverCount: 1, reqN: 5, network: "unix"},
|
|
||||||
{name: "rrBalanced_3", serverCount: 3, reqN: 7, network: "tcp"},
|
|
||||||
{name: "rrBalanced_5", serverCount: 5, reqN: 10, network: "tcp"},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tc := range testCases {
|
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
|
||||||
ms, err := mockserver.StartMockServersOnNetwork(tc.serverCount, tc.network)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to start mock servers: %v", err)
|
|
||||||
}
|
|
||||||
defer ms.Stop()
|
|
||||||
|
|
||||||
var eps []string
|
|
||||||
for _, svr := range ms.Servers {
|
|
||||||
eps = append(eps, svr.ResolverAddress().Addr)
|
|
||||||
}
|
|
||||||
|
|
||||||
rsv, err := endpoint.NewResolverGroup("nofailover")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer rsv.Close()
|
|
||||||
rsv.SetEndpoints(eps)
|
|
||||||
|
|
||||||
name := genName()
|
|
||||||
cfg := Config{
|
|
||||||
Policy: picker.RoundrobinBalanced,
|
|
||||||
Name: name,
|
|
||||||
Logger: zap.NewExample(),
|
|
||||||
}
|
|
||||||
RegisterBuilder(cfg)
|
|
||||||
conn, err := grpc.Dial(fmt.Sprintf("endpoint://nofailover/*"), grpc.WithInsecure(), grpc.WithBalancerName(name))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to dial mock server: %v", err)
|
|
||||||
}
|
|
||||||
defer conn.Close()
|
|
||||||
cli := pb.NewKVClient(conn)
|
|
||||||
|
|
||||||
reqFunc := func(ctx context.Context) (picked string, err error) {
|
|
||||||
var p peer.Peer
|
|
||||||
_, err = cli.Range(ctx, &pb.RangeRequest{Key: []byte("/x")}, grpc.Peer(&p))
|
|
||||||
if p.Addr != nil {
|
|
||||||
picked = p.Addr.String()
|
|
||||||
}
|
|
||||||
return picked, err
|
|
||||||
}
|
|
||||||
|
|
||||||
_, picked, err := warmupConnections(reqFunc, tc.serverCount, "")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Unexpected failure %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// verify that we round robin
|
|
||||||
prev, switches := picked, 0
|
|
||||||
for i := 0; i < tc.reqN; i++ {
|
|
||||||
picked, err = reqFunc(context.Background())
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("#%d: unexpected failure %v", i, err)
|
|
||||||
}
|
|
||||||
if prev != picked {
|
|
||||||
switches++
|
|
||||||
}
|
|
||||||
prev = picked
|
|
||||||
}
|
|
||||||
if tc.serverCount > 1 && switches != tc.reqN {
|
|
||||||
t.Fatalf("expected balanced loads for %d requests, got switches %d", tc.reqN, switches)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestRoundRobinBalancedResolvableFailoverFromServerFail ensures that
|
|
||||||
// loads be rebalanced while one server goes down and comes back.
|
|
||||||
func TestRoundRobinBalancedResolvableFailoverFromServerFail(t *testing.T) {
|
|
||||||
serverCount := 5
|
|
||||||
ms, err := mockserver.StartMockServers(serverCount)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to start mock servers: %s", err)
|
|
||||||
}
|
|
||||||
defer ms.Stop()
|
|
||||||
var eps []string
|
|
||||||
for _, svr := range ms.Servers {
|
|
||||||
eps = append(eps, svr.ResolverAddress().Addr)
|
|
||||||
}
|
|
||||||
|
|
||||||
rsv, err := endpoint.NewResolverGroup("serverfail")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer rsv.Close()
|
|
||||||
rsv.SetEndpoints(eps)
|
|
||||||
|
|
||||||
name := genName()
|
|
||||||
cfg := Config{
|
|
||||||
Policy: picker.RoundrobinBalanced,
|
|
||||||
Name: name,
|
|
||||||
Logger: zap.NewExample(),
|
|
||||||
}
|
|
||||||
RegisterBuilder(cfg)
|
|
||||||
conn, err := grpc.Dial(fmt.Sprintf("endpoint://serverfail/mock.server"), grpc.WithInsecure(), grpc.WithBalancerName(name))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to dial mock server: %s", err)
|
|
||||||
}
|
|
||||||
defer conn.Close()
|
|
||||||
cli := pb.NewKVClient(conn)
|
|
||||||
|
|
||||||
reqFunc := func(ctx context.Context) (picked string, err error) {
|
|
||||||
var p peer.Peer
|
|
||||||
_, err = cli.Range(ctx, &pb.RangeRequest{Key: []byte("/x")}, grpc.Peer(&p))
|
|
||||||
if p.Addr != nil {
|
|
||||||
picked = p.Addr.String()
|
|
||||||
}
|
|
||||||
return picked, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// stop first server, loads should be redistributed
|
|
||||||
ms.StopAt(0)
|
|
||||||
// stopped server will be transitioned into TRANSIENT_FAILURE state
|
|
||||||
// but it doesn't happen instantaneously and it can still be picked for a short period of time
|
|
||||||
// we ignore "transport is closing" in such case
|
|
||||||
available, picked, err := warmupConnections(reqFunc, serverCount-1, "transport is closing")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Unexpected failure %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
reqN := 10
|
|
||||||
prev, switches := picked, 0
|
|
||||||
for i := 0; i < reqN; i++ {
|
|
||||||
picked, err = reqFunc(context.Background())
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("#%d: unexpected failure %v", i, err)
|
|
||||||
}
|
|
||||||
if _, ok := available[picked]; !ok {
|
|
||||||
t.Fatalf("picked unavailable address %q (available %v)", picked, available)
|
|
||||||
}
|
|
||||||
if prev != picked {
|
|
||||||
switches++
|
|
||||||
}
|
|
||||||
prev = picked
|
|
||||||
}
|
|
||||||
if switches != reqN {
|
|
||||||
t.Fatalf("expected balanced loads for %d requests, got switches %d", reqN, switches)
|
|
||||||
}
|
|
||||||
|
|
||||||
// now failed server comes back
|
|
||||||
ms.StartAt(0)
|
|
||||||
available, picked, err = warmupConnections(reqFunc, serverCount, "")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Unexpected failure %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
prev, switches = picked, 0
|
|
||||||
recoveredAddr, recovered := eps[0], 0
|
|
||||||
available[recoveredAddr] = struct{}{}
|
|
||||||
|
|
||||||
for i := 0; i < 2*reqN; i++ {
|
|
||||||
picked, err := reqFunc(context.Background())
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("#%d: unexpected failure %v", i, err)
|
|
||||||
}
|
|
||||||
if _, ok := available[picked]; !ok {
|
|
||||||
t.Fatalf("#%d: picked unavailable address %q (available %v)", i, picked, available)
|
|
||||||
}
|
|
||||||
if prev != picked {
|
|
||||||
switches++
|
|
||||||
}
|
|
||||||
if picked == recoveredAddr {
|
|
||||||
recovered++
|
|
||||||
}
|
|
||||||
prev = picked
|
|
||||||
}
|
|
||||||
if switches != 2*reqN {
|
|
||||||
t.Fatalf("expected balanced loads for %d requests, got switches %d", reqN, switches)
|
|
||||||
}
|
|
||||||
if recovered != 2*reqN/serverCount {
|
|
||||||
t.Fatalf("recovered server %q got only %d requests", recoveredAddr, recovered)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestRoundRobinBalancedResolvableFailoverFromRequestFail ensures that
|
|
||||||
// loads be rebalanced while some requests are failed.
|
|
||||||
func TestRoundRobinBalancedResolvableFailoverFromRequestFail(t *testing.T) {
|
|
||||||
serverCount := 5
|
|
||||||
ms, err := mockserver.StartMockServers(serverCount)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to start mock servers: %s", err)
|
|
||||||
}
|
|
||||||
defer ms.Stop()
|
|
||||||
var eps []string
|
|
||||||
for _, svr := range ms.Servers {
|
|
||||||
eps = append(eps, svr.ResolverAddress().Addr)
|
|
||||||
}
|
|
||||||
|
|
||||||
rsv, err := endpoint.NewResolverGroup("requestfail")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer rsv.Close()
|
|
||||||
rsv.SetEndpoints(eps)
|
|
||||||
|
|
||||||
name := genName()
|
|
||||||
cfg := Config{
|
|
||||||
Policy: picker.RoundrobinBalanced,
|
|
||||||
Name: name,
|
|
||||||
Logger: zap.NewExample(),
|
|
||||||
}
|
|
||||||
RegisterBuilder(cfg)
|
|
||||||
conn, err := grpc.Dial(fmt.Sprintf("endpoint://requestfail/mock.server"), grpc.WithInsecure(), grpc.WithBalancerName(name))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to dial mock server: %s", err)
|
|
||||||
}
|
|
||||||
defer conn.Close()
|
|
||||||
cli := pb.NewKVClient(conn)
|
|
||||||
|
|
||||||
reqFunc := func(ctx context.Context) (picked string, err error) {
|
|
||||||
var p peer.Peer
|
|
||||||
_, err = cli.Range(ctx, &pb.RangeRequest{Key: []byte("/x")}, grpc.Peer(&p))
|
|
||||||
if p.Addr != nil {
|
|
||||||
picked = p.Addr.String()
|
|
||||||
}
|
|
||||||
return picked, err
|
|
||||||
}
|
|
||||||
|
|
||||||
available, picked, err := warmupConnections(reqFunc, serverCount, "")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Unexpected failure %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
reqN := 20
|
|
||||||
prev, switches := "", 0
|
|
||||||
for i := 0; i < reqN; i++ {
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
defer cancel()
|
|
||||||
if i%2 == 0 {
|
|
||||||
cancel()
|
|
||||||
}
|
|
||||||
picked, err = reqFunc(ctx)
|
|
||||||
if i%2 == 0 {
|
|
||||||
if s, ok := status.FromError(err); ok && s.Code() != codes.Canceled {
|
|
||||||
t.Fatalf("#%d: expected %v, got %v", i, context.Canceled, err)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if _, ok := available[picked]; !ok {
|
|
||||||
t.Fatalf("#%d: picked unavailable address %q (available %v)", i, picked, available)
|
|
||||||
}
|
|
||||||
if prev != picked {
|
|
||||||
switches++
|
|
||||||
}
|
|
||||||
prev = picked
|
|
||||||
}
|
|
||||||
if switches != reqN/2 {
|
|
||||||
t.Fatalf("expected balanced loads for %d requests, got switches %d", reqN, switches)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type reqFuncT = func(ctx context.Context) (picked string, err error)
|
|
||||||
|
|
||||||
func warmupConnections(reqFunc reqFuncT, serverCount int, ignoreErr string) (map[string]struct{}, string, error) {
|
|
||||||
var picked string
|
|
||||||
var err error
|
|
||||||
available := make(map[string]struct{})
|
|
||||||
// cycle through all peers to indirectly verify that balancer subconn list is fully loaded
|
|
||||||
// otherwise we can't reliably count switches between 'picked' peers in the test assert phase
|
|
||||||
for len(available) < serverCount {
|
|
||||||
picked, err = reqFunc(context.Background())
|
|
||||||
if err != nil {
|
|
||||||
if ignoreErr != "" && strings.Contains(err.Error(), ignoreErr) {
|
|
||||||
// skip ignored errors
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
return available, picked, err
|
|
||||||
}
|
|
||||||
available[picked] = struct{}{}
|
|
||||||
}
|
|
||||||
return available, picked, err
|
|
||||||
}
|
|
@ -1,93 +0,0 @@
|
|||||||
// Copyright 2019 The etcd Authors
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
// Package connectivity implements client connectivity operations.
|
|
||||||
package connectivity
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"go.uber.org/zap"
|
|
||||||
"google.golang.org/grpc/connectivity"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Recorder records gRPC connectivity.
|
|
||||||
type Recorder interface {
|
|
||||||
GetCurrentState() connectivity.State
|
|
||||||
RecordTransition(oldState, newState connectivity.State)
|
|
||||||
}
|
|
||||||
|
|
||||||
// New returns a new Recorder.
|
|
||||||
func New(lg *zap.Logger) Recorder {
|
|
||||||
return &recorder{lg: lg}
|
|
||||||
}
|
|
||||||
|
|
||||||
// recorder takes the connectivity states of multiple SubConns
|
|
||||||
// and returns one aggregated connectivity state.
|
|
||||||
// ref. https://github.com/grpc/grpc-go/blob/master/balancer/balancer.go
|
|
||||||
type recorder struct {
|
|
||||||
lg *zap.Logger
|
|
||||||
|
|
||||||
mu sync.RWMutex
|
|
||||||
|
|
||||||
cur connectivity.State
|
|
||||||
|
|
||||||
numReady uint64 // Number of addrConns in ready state.
|
|
||||||
numConnecting uint64 // Number of addrConns in connecting state.
|
|
||||||
numTransientFailure uint64 // Number of addrConns in transientFailure.
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rc *recorder) GetCurrentState() (state connectivity.State) {
|
|
||||||
rc.mu.RLock()
|
|
||||||
defer rc.mu.RUnlock()
|
|
||||||
return rc.cur
|
|
||||||
}
|
|
||||||
|
|
||||||
// RecordTransition records state change happening in subConn and based on that
|
|
||||||
// it evaluates what aggregated state should be.
|
|
||||||
//
|
|
||||||
// - If at least one SubConn in Ready, the aggregated state is Ready;
|
|
||||||
// - Else if at least one SubConn in Connecting, the aggregated state is Connecting;
|
|
||||||
// - Else the aggregated state is TransientFailure.
|
|
||||||
//
|
|
||||||
// Idle and Shutdown are not considered.
|
|
||||||
//
|
|
||||||
// ref. https://github.com/grpc/grpc-go/blob/master/balancer/balancer.go
|
|
||||||
func (rc *recorder) RecordTransition(oldState, newState connectivity.State) {
|
|
||||||
rc.mu.Lock()
|
|
||||||
defer rc.mu.Unlock()
|
|
||||||
|
|
||||||
for idx, state := range []connectivity.State{oldState, newState} {
|
|
||||||
updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
|
|
||||||
switch state {
|
|
||||||
case connectivity.Ready:
|
|
||||||
rc.numReady += updateVal
|
|
||||||
case connectivity.Connecting:
|
|
||||||
rc.numConnecting += updateVal
|
|
||||||
case connectivity.TransientFailure:
|
|
||||||
rc.numTransientFailure += updateVal
|
|
||||||
default:
|
|
||||||
rc.lg.Warn("connectivity recorder received unknown state", zap.String("connectivity-state", state.String()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
switch { // must be exclusive, no overlap
|
|
||||||
case rc.numReady > 0:
|
|
||||||
rc.cur = connectivity.Ready
|
|
||||||
case rc.numConnecting > 0:
|
|
||||||
rc.cur = connectivity.Connecting
|
|
||||||
default:
|
|
||||||
rc.cur = connectivity.TransientFailure
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,16 +0,0 @@
|
|||||||
// Copyright 2018 The etcd Authors
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
// Package picker defines/implements client balancer picker policy.
|
|
||||||
package picker
|
|
@ -1,39 +0,0 @@
|
|||||||
// Copyright 2018 The etcd Authors
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package picker
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"google.golang.org/grpc/balancer"
|
|
||||||
)
|
|
||||||
|
|
||||||
// NewErr returns a picker that always returns err on "Pick".
|
|
||||||
func NewErr(err error) Picker {
|
|
||||||
return &errPicker{p: Error, err: err}
|
|
||||||
}
|
|
||||||
|
|
||||||
type errPicker struct {
|
|
||||||
p Policy
|
|
||||||
err error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ep *errPicker) String() string {
|
|
||||||
return ep.p.String()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ep *errPicker) Pick(context.Context, balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
|
|
||||||
return nil, nil, ep.err
|
|
||||||
}
|
|
@ -1,91 +0,0 @@
|
|||||||
// Copyright 2018 The etcd Authors
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package picker
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"go.uber.org/zap"
|
|
||||||
"google.golang.org/grpc/balancer"
|
|
||||||
"google.golang.org/grpc/resolver"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Picker defines balancer Picker methods.
|
|
||||||
type Picker interface {
|
|
||||||
balancer.Picker
|
|
||||||
String() string
|
|
||||||
}
|
|
||||||
|
|
||||||
// Config defines picker configuration.
|
|
||||||
type Config struct {
|
|
||||||
// Policy specifies etcd clientv3's built in balancer policy.
|
|
||||||
Policy Policy
|
|
||||||
|
|
||||||
// Logger defines picker logging object.
|
|
||||||
Logger *zap.Logger
|
|
||||||
|
|
||||||
// SubConnToResolverAddress maps each gRPC sub-connection to an address.
|
|
||||||
// Basically, it is a list of addresses that the Picker can pick from.
|
|
||||||
SubConnToResolverAddress map[balancer.SubConn]resolver.Address
|
|
||||||
}
|
|
||||||
|
|
||||||
// Policy defines balancer picker policy.
|
|
||||||
type Policy uint8
|
|
||||||
|
|
||||||
const (
|
|
||||||
// Error is error picker policy.
|
|
||||||
Error Policy = iota
|
|
||||||
|
|
||||||
// RoundrobinBalanced balances loads over multiple endpoints
|
|
||||||
// and implements failover in roundrobin fashion.
|
|
||||||
RoundrobinBalanced
|
|
||||||
|
|
||||||
// Custom defines custom balancer picker.
|
|
||||||
// TODO: custom picker is not supported yet.
|
|
||||||
Custom
|
|
||||||
)
|
|
||||||
|
|
||||||
func (p Policy) String() string {
|
|
||||||
switch p {
|
|
||||||
case Error:
|
|
||||||
return "picker-error"
|
|
||||||
|
|
||||||
case RoundrobinBalanced:
|
|
||||||
return "picker-roundrobin-balanced"
|
|
||||||
|
|
||||||
case Custom:
|
|
||||||
panic("'custom' picker policy is not supported yet")
|
|
||||||
|
|
||||||
default:
|
|
||||||
panic(fmt.Errorf("invalid balancer picker policy (%d)", p))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// New creates a new Picker.
|
|
||||||
func New(cfg Config) Picker {
|
|
||||||
switch cfg.Policy {
|
|
||||||
case Error:
|
|
||||||
panic("'error' picker policy is not supported here; use 'picker.NewErr'")
|
|
||||||
|
|
||||||
case RoundrobinBalanced:
|
|
||||||
return newRoundrobinBalanced(cfg)
|
|
||||||
|
|
||||||
case Custom:
|
|
||||||
panic("'custom' picker policy is not supported yet")
|
|
||||||
|
|
||||||
default:
|
|
||||||
panic(fmt.Errorf("invalid balancer picker policy (%d)", cfg.Policy))
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,95 +0,0 @@
|
|||||||
// Copyright 2018 The etcd Authors
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package picker
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"go.uber.org/zap"
|
|
||||||
"go.uber.org/zap/zapcore"
|
|
||||||
"google.golang.org/grpc/balancer"
|
|
||||||
"google.golang.org/grpc/resolver"
|
|
||||||
)
|
|
||||||
|
|
||||||
// newRoundrobinBalanced returns a new roundrobin balanced picker.
|
|
||||||
func newRoundrobinBalanced(cfg Config) Picker {
|
|
||||||
scs := make([]balancer.SubConn, 0, len(cfg.SubConnToResolverAddress))
|
|
||||||
for sc := range cfg.SubConnToResolverAddress {
|
|
||||||
scs = append(scs, sc)
|
|
||||||
}
|
|
||||||
return &rrBalanced{
|
|
||||||
p: RoundrobinBalanced,
|
|
||||||
lg: cfg.Logger,
|
|
||||||
scs: scs,
|
|
||||||
scToAddr: cfg.SubConnToResolverAddress,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type rrBalanced struct {
|
|
||||||
p Policy
|
|
||||||
|
|
||||||
lg *zap.Logger
|
|
||||||
|
|
||||||
mu sync.RWMutex
|
|
||||||
next int
|
|
||||||
scs []balancer.SubConn
|
|
||||||
scToAddr map[balancer.SubConn]resolver.Address
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rb *rrBalanced) String() string { return rb.p.String() }
|
|
||||||
|
|
||||||
// Pick is called for every client request.
|
|
||||||
func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
|
|
||||||
rb.mu.RLock()
|
|
||||||
n := len(rb.scs)
|
|
||||||
rb.mu.RUnlock()
|
|
||||||
if n == 0 {
|
|
||||||
return nil, nil, balancer.ErrNoSubConnAvailable
|
|
||||||
}
|
|
||||||
|
|
||||||
rb.mu.Lock()
|
|
||||||
cur := rb.next
|
|
||||||
sc := rb.scs[cur]
|
|
||||||
picked := rb.scToAddr[sc].Addr
|
|
||||||
rb.next = (rb.next + 1) % len(rb.scs)
|
|
||||||
rb.mu.Unlock()
|
|
||||||
|
|
||||||
rb.lg.Debug(
|
|
||||||
"picked",
|
|
||||||
zap.String("picker", rb.p.String()),
|
|
||||||
zap.String("address", picked),
|
|
||||||
zap.Int("subconn-index", cur),
|
|
||||||
zap.Int("subconn-size", n),
|
|
||||||
)
|
|
||||||
|
|
||||||
doneFunc := func(info balancer.DoneInfo) {
|
|
||||||
// TODO: error handling?
|
|
||||||
fss := []zapcore.Field{
|
|
||||||
zap.Error(info.Err),
|
|
||||||
zap.String("picker", rb.p.String()),
|
|
||||||
zap.String("address", picked),
|
|
||||||
zap.Bool("success", info.Err == nil),
|
|
||||||
zap.Bool("bytes-sent", info.BytesSent),
|
|
||||||
zap.Bool("bytes-received", info.BytesReceived),
|
|
||||||
}
|
|
||||||
if info.Err == nil {
|
|
||||||
rb.lg.Debug("balancer done", fss...)
|
|
||||||
} else {
|
|
||||||
rb.lg.Warn("balancer failed", fss...)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return sc, doneFunc, nil
|
|
||||||
}
|
|
@ -1,248 +0,0 @@
|
|||||||
// Copyright 2018 The etcd Authors
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
// Package endpoint resolves etcd entpoints using grpc targets of the form 'endpoint://<id>/<endpoint>'.
|
|
||||||
package endpoint
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"net"
|
|
||||||
"net/url"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"google.golang.org/grpc/resolver"
|
|
||||||
)
|
|
||||||
|
|
||||||
const scheme = "endpoint"
|
|
||||||
|
|
||||||
var (
|
|
||||||
targetPrefix = fmt.Sprintf("%s://", scheme)
|
|
||||||
|
|
||||||
bldr *builder
|
|
||||||
)
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
bldr = &builder{
|
|
||||||
resolverGroups: make(map[string]*ResolverGroup),
|
|
||||||
}
|
|
||||||
resolver.Register(bldr)
|
|
||||||
}
|
|
||||||
|
|
||||||
type builder struct {
|
|
||||||
mu sync.RWMutex
|
|
||||||
resolverGroups map[string]*ResolverGroup
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewResolverGroup creates a new ResolverGroup with the given id.
|
|
||||||
func NewResolverGroup(id string) (*ResolverGroup, error) {
|
|
||||||
return bldr.newResolverGroup(id)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ResolverGroup keeps all endpoints of resolvers using a common endpoint://<id>/ target
|
|
||||||
// up-to-date.
|
|
||||||
type ResolverGroup struct {
|
|
||||||
mu sync.RWMutex
|
|
||||||
id string
|
|
||||||
endpoints []string
|
|
||||||
resolvers []*Resolver
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *ResolverGroup) addResolver(r *Resolver) {
|
|
||||||
e.mu.Lock()
|
|
||||||
addrs := epsToAddrs(e.endpoints...)
|
|
||||||
e.resolvers = append(e.resolvers, r)
|
|
||||||
e.mu.Unlock()
|
|
||||||
r.cc.NewAddress(addrs)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *ResolverGroup) removeResolver(r *Resolver) {
|
|
||||||
e.mu.Lock()
|
|
||||||
for i, er := range e.resolvers {
|
|
||||||
if er == r {
|
|
||||||
e.resolvers = append(e.resolvers[:i], e.resolvers[i+1:]...)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
e.mu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetEndpoints updates the endpoints for ResolverGroup. All registered resolver are updated
|
|
||||||
// immediately with the new endpoints.
|
|
||||||
func (e *ResolverGroup) SetEndpoints(endpoints []string) {
|
|
||||||
addrs := epsToAddrs(endpoints...)
|
|
||||||
e.mu.Lock()
|
|
||||||
e.endpoints = endpoints
|
|
||||||
for _, r := range e.resolvers {
|
|
||||||
r.cc.NewAddress(addrs)
|
|
||||||
}
|
|
||||||
e.mu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Target constructs a endpoint target using the endpoint id of the ResolverGroup.
|
|
||||||
func (e *ResolverGroup) Target(endpoint string) string {
|
|
||||||
return Target(e.id, endpoint)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Target constructs a endpoint resolver target.
|
|
||||||
func Target(id, endpoint string) string {
|
|
||||||
return fmt.Sprintf("%s://%s/%s", scheme, id, endpoint)
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsTarget checks if a given target string in an endpoint resolver target.
|
|
||||||
func IsTarget(target string) bool {
|
|
||||||
return strings.HasPrefix(target, "endpoint://")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *ResolverGroup) Close() {
|
|
||||||
bldr.close(e.id)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Build creates or reuses an etcd resolver for the etcd cluster name identified by the authority part of the target.
|
|
||||||
func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
|
|
||||||
if len(target.Authority) < 1 {
|
|
||||||
return nil, fmt.Errorf("'etcd' target scheme requires non-empty authority identifying etcd cluster being routed to")
|
|
||||||
}
|
|
||||||
id := target.Authority
|
|
||||||
es, err := b.getResolverGroup(id)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to build resolver: %v", err)
|
|
||||||
}
|
|
||||||
r := &Resolver{
|
|
||||||
endpointID: id,
|
|
||||||
cc: cc,
|
|
||||||
}
|
|
||||||
es.addResolver(r)
|
|
||||||
return r, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *builder) newResolverGroup(id string) (*ResolverGroup, error) {
|
|
||||||
b.mu.RLock()
|
|
||||||
_, ok := b.resolverGroups[id]
|
|
||||||
b.mu.RUnlock()
|
|
||||||
if ok {
|
|
||||||
return nil, fmt.Errorf("Endpoint already exists for id: %s", id)
|
|
||||||
}
|
|
||||||
|
|
||||||
es := &ResolverGroup{id: id}
|
|
||||||
b.mu.Lock()
|
|
||||||
b.resolverGroups[id] = es
|
|
||||||
b.mu.Unlock()
|
|
||||||
return es, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *builder) getResolverGroup(id string) (*ResolverGroup, error) {
|
|
||||||
b.mu.RLock()
|
|
||||||
es, ok := b.resolverGroups[id]
|
|
||||||
b.mu.RUnlock()
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("ResolverGroup not found for id: %s", id)
|
|
||||||
}
|
|
||||||
return es, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *builder) close(id string) {
|
|
||||||
b.mu.Lock()
|
|
||||||
delete(b.resolverGroups, id)
|
|
||||||
b.mu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *builder) Scheme() string {
|
|
||||||
return scheme
|
|
||||||
}
|
|
||||||
|
|
||||||
// Resolver provides a resolver for a single etcd cluster, identified by name.
|
|
||||||
type Resolver struct {
|
|
||||||
endpointID string
|
|
||||||
cc resolver.ClientConn
|
|
||||||
sync.RWMutex
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: use balancer.epsToAddrs
|
|
||||||
func epsToAddrs(eps ...string) (addrs []resolver.Address) {
|
|
||||||
addrs = make([]resolver.Address, 0, len(eps))
|
|
||||||
for _, ep := range eps {
|
|
||||||
_, host, _ := ParseEndpoint(ep)
|
|
||||||
addrs = append(addrs, resolver.Address{Addr: ep, ServerName: host})
|
|
||||||
}
|
|
||||||
return addrs
|
|
||||||
}
|
|
||||||
|
|
||||||
func (*Resolver) ResolveNow(o resolver.ResolveNowOptions) {}
|
|
||||||
|
|
||||||
func (r *Resolver) Close() {
|
|
||||||
es, err := bldr.getResolverGroup(r.endpointID)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
es.removeResolver(r)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ParseEndpoint endpoint parses an endpoint of the form
|
|
||||||
// (http|https)://<host>*|(unix|unixs)://<path>)
|
|
||||||
// and returns a protocol ('tcp' or 'unix'),
|
|
||||||
// host (or filepath if a unix socket),
|
|
||||||
// scheme (http, https, unix, unixs).
|
|
||||||
func ParseEndpoint(endpoint string) (proto string, host string, scheme string) {
|
|
||||||
proto = "tcp"
|
|
||||||
host = endpoint
|
|
||||||
url, uerr := url.Parse(endpoint)
|
|
||||||
if uerr != nil || !strings.Contains(endpoint, "://") {
|
|
||||||
return proto, host, scheme
|
|
||||||
}
|
|
||||||
scheme = url.Scheme
|
|
||||||
|
|
||||||
// strip scheme:// prefix since grpc dials by host
|
|
||||||
host = url.Host
|
|
||||||
switch url.Scheme {
|
|
||||||
case "http", "https":
|
|
||||||
case "unix", "unixs":
|
|
||||||
proto = "unix"
|
|
||||||
host = url.Host + url.Path
|
|
||||||
default:
|
|
||||||
proto, host = "", ""
|
|
||||||
}
|
|
||||||
return proto, host, scheme
|
|
||||||
}
|
|
||||||
|
|
||||||
// ParseTarget parses a endpoint://<id>/<endpoint> string and returns the parsed id and endpoint.
|
|
||||||
// If the target is malformed, an error is returned.
|
|
||||||
func ParseTarget(target string) (string, string, error) {
|
|
||||||
noPrefix := strings.TrimPrefix(target, targetPrefix)
|
|
||||||
if noPrefix == target {
|
|
||||||
return "", "", fmt.Errorf("malformed target, %s prefix is required: %s", targetPrefix, target)
|
|
||||||
}
|
|
||||||
parts := strings.SplitN(noPrefix, "/", 2)
|
|
||||||
if len(parts) != 2 {
|
|
||||||
return "", "", fmt.Errorf("malformed target, expected %s://<id>/<endpoint>, but got %s", scheme, target)
|
|
||||||
}
|
|
||||||
return parts[0], parts[1], nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Dialer dials a endpoint using net.Dialer.
|
|
||||||
// Context cancelation and timeout are supported.
|
|
||||||
func Dialer(ctx context.Context, dialEp string) (net.Conn, error) {
|
|
||||||
proto, host, _ := ParseEndpoint(dialEp)
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return nil, ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
dialer := &net.Dialer{}
|
|
||||||
if deadline, ok := ctx.Deadline(); ok {
|
|
||||||
dialer.Deadline = deadline
|
|
||||||
}
|
|
||||||
return dialer.DialContext(ctx, proto, host)
|
|
||||||
}
|
|
@ -1,68 +0,0 @@
|
|||||||
// Copyright 2018 The etcd Authors
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package balancer
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"net/url"
|
|
||||||
"sort"
|
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"google.golang.org/grpc/balancer"
|
|
||||||
"google.golang.org/grpc/resolver"
|
|
||||||
)
|
|
||||||
|
|
||||||
func scToString(sc balancer.SubConn) string {
|
|
||||||
return fmt.Sprintf("%p", sc)
|
|
||||||
}
|
|
||||||
|
|
||||||
func scsToStrings(scs map[balancer.SubConn]resolver.Address) (ss []string) {
|
|
||||||
ss = make([]string, 0, len(scs))
|
|
||||||
for sc, a := range scs {
|
|
||||||
ss = append(ss, fmt.Sprintf("%s (%s)", a.Addr, scToString(sc)))
|
|
||||||
}
|
|
||||||
sort.Strings(ss)
|
|
||||||
return ss
|
|
||||||
}
|
|
||||||
|
|
||||||
func addrsToStrings(addrs []resolver.Address) (ss []string) {
|
|
||||||
ss = make([]string, len(addrs))
|
|
||||||
for i := range addrs {
|
|
||||||
ss[i] = addrs[i].Addr
|
|
||||||
}
|
|
||||||
sort.Strings(ss)
|
|
||||||
return ss
|
|
||||||
}
|
|
||||||
|
|
||||||
func epsToAddrs(eps ...string) (addrs []resolver.Address) {
|
|
||||||
addrs = make([]resolver.Address, 0, len(eps))
|
|
||||||
for _, ep := range eps {
|
|
||||||
u, err := url.Parse(ep)
|
|
||||||
if err != nil {
|
|
||||||
addrs = append(addrs, resolver.Address{Addr: ep, Type: resolver.Backend})
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
addrs = append(addrs, resolver.Address{Addr: u.Host, Type: resolver.Backend})
|
|
||||||
}
|
|
||||||
return addrs
|
|
||||||
}
|
|
||||||
|
|
||||||
var genN = new(uint32)
|
|
||||||
|
|
||||||
func genName() string {
|
|
||||||
now := time.Now().UnixNano()
|
|
||||||
return fmt.Sprintf("%X%X", now, atomic.AddUint32(genN, 1))
|
|
||||||
}
|
|
@ -1,34 +0,0 @@
|
|||||||
// Copyright 2018 The etcd Authors
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package balancer
|
|
||||||
|
|
||||||
import (
|
|
||||||
"reflect"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"google.golang.org/grpc/resolver"
|
|
||||||
)
|
|
||||||
|
|
||||||
func Test_epsToAddrs(t *testing.T) {
|
|
||||||
eps := []string{"https://example.com:2379", "127.0.0.1:2379"}
|
|
||||||
exp := []resolver.Address{
|
|
||||||
{Addr: "example.com:2379", Type: resolver.Backend},
|
|
||||||
{Addr: "127.0.0.1:2379", Type: resolver.Backend},
|
|
||||||
}
|
|
||||||
rs := epsToAddrs(eps...)
|
|
||||||
if !reflect.DeepEqual(rs, exp) {
|
|
||||||
t.Fatalf("expected %v, got %v", exp, rs)
|
|
||||||
}
|
|
||||||
}
|
|
@ -18,14 +18,11 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
|
||||||
"os"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
@ -33,10 +30,9 @@ import (
|
|||||||
"google.golang.org/grpc/keepalive"
|
"google.golang.org/grpc/keepalive"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
|
|
||||||
"go.etcd.io/etcd/clientv3/balancer"
|
|
||||||
"go.etcd.io/etcd/clientv3/balancer/picker"
|
|
||||||
"go.etcd.io/etcd/clientv3/balancer/resolver/endpoint"
|
|
||||||
"go.etcd.io/etcd/clientv3/credentials"
|
"go.etcd.io/etcd/clientv3/credentials"
|
||||||
|
"go.etcd.io/etcd/clientv3/internal/endpoint"
|
||||||
|
"go.etcd.io/etcd/clientv3/internal/resolver"
|
||||||
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
|
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||||
"go.etcd.io/etcd/pkg/logutil"
|
"go.etcd.io/etcd/pkg/logutil"
|
||||||
)
|
)
|
||||||
@ -44,31 +40,8 @@ import (
|
|||||||
var (
|
var (
|
||||||
ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints")
|
ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints")
|
||||||
ErrOldCluster = errors.New("etcdclient: old cluster version")
|
ErrOldCluster = errors.New("etcdclient: old cluster version")
|
||||||
|
|
||||||
roundRobinBalancerName = fmt.Sprintf("etcd-%s", picker.RoundrobinBalanced.String())
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
|
||||||
lg := zap.NewNop()
|
|
||||||
if os.Getenv("ETCD_CLIENT_DEBUG") != "" {
|
|
||||||
lcfg := logutil.DefaultZapLoggerConfig
|
|
||||||
lcfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
|
|
||||||
|
|
||||||
var err error
|
|
||||||
lg, err = lcfg.Build() // info level logging
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: support custom balancer
|
|
||||||
balancer.RegisterBuilder(balancer.Config{
|
|
||||||
Policy: picker.RoundrobinBalanced,
|
|
||||||
Name: roundRobinBalancerName,
|
|
||||||
Logger: lg,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Client provides and manages an etcd v3 client session.
|
// Client provides and manages an etcd v3 client session.
|
||||||
type Client struct {
|
type Client struct {
|
||||||
Cluster
|
Cluster
|
||||||
@ -80,10 +53,10 @@ type Client struct {
|
|||||||
|
|
||||||
conn *grpc.ClientConn
|
conn *grpc.ClientConn
|
||||||
|
|
||||||
cfg Config
|
cfg Config
|
||||||
creds grpccredentials.TransportCredentials
|
creds grpccredentials.TransportCredentials
|
||||||
resolverGroup *endpoint.ResolverGroup
|
resolver *resolver.EtcdManualResolver
|
||||||
mu *sync.RWMutex
|
mu *sync.RWMutex
|
||||||
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
@ -153,9 +126,6 @@ func (c *Client) Close() error {
|
|||||||
if c.Lease != nil {
|
if c.Lease != nil {
|
||||||
c.Lease.Close()
|
c.Lease.Close()
|
||||||
}
|
}
|
||||||
if c.resolverGroup != nil {
|
|
||||||
c.resolverGroup.Close()
|
|
||||||
}
|
|
||||||
if c.conn != nil {
|
if c.conn != nil {
|
||||||
return toErr(c.ctx, c.conn.Close())
|
return toErr(c.ctx, c.conn.Close())
|
||||||
}
|
}
|
||||||
@ -182,7 +152,8 @@ func (c *Client) SetEndpoints(eps ...string) {
|
|||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
c.cfg.Endpoints = eps
|
c.cfg.Endpoints = eps
|
||||||
c.resolverGroup.SetEndpoints(eps)
|
|
||||||
|
c.resolver.SetEndpoints(eps)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
|
// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
|
||||||
@ -215,29 +186,12 @@ func (c *Client) autoSync() {
|
|||||||
err := c.Sync(ctx)
|
err := c.Sync(ctx)
|
||||||
cancel()
|
cancel()
|
||||||
if err != nil && err != c.ctx.Err() {
|
if err != nil && err != c.ctx.Err() {
|
||||||
lg.Lvl(4).Infof("Auto sync endpoints failed: %v", err)
|
c.lg.Info("Auto sync endpoints failed.", zap.Error(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) processCreds(scheme string) (creds grpccredentials.TransportCredentials) {
|
|
||||||
creds = c.creds
|
|
||||||
switch scheme {
|
|
||||||
case "unix":
|
|
||||||
case "http":
|
|
||||||
creds = nil
|
|
||||||
case "https", "unixs":
|
|
||||||
if creds != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
creds = credentials.NewBundle(credentials.Config{}).TransportCredentials()
|
|
||||||
default:
|
|
||||||
creds = nil
|
|
||||||
}
|
|
||||||
return creds
|
|
||||||
}
|
|
||||||
|
|
||||||
// dialSetupOpts gives the dial opts prior to any authentication.
|
// dialSetupOpts gives the dial opts prior to any authentication.
|
||||||
func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (opts []grpc.DialOption, err error) {
|
func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (opts []grpc.DialOption, err error) {
|
||||||
if c.cfg.DialKeepAliveTime > 0 {
|
if c.cfg.DialKeepAliveTime > 0 {
|
||||||
@ -250,13 +204,12 @@ func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts
|
|||||||
}
|
}
|
||||||
opts = append(opts, dopts...)
|
opts = append(opts, dopts...)
|
||||||
|
|
||||||
dialer := endpoint.Dialer
|
|
||||||
if creds != nil {
|
if creds != nil {
|
||||||
opts = append(opts, grpc.WithTransportCredentials(creds))
|
opts = append(opts, grpc.WithTransportCredentials(creds))
|
||||||
} else {
|
} else {
|
||||||
opts = append(opts, grpc.WithInsecure())
|
opts = append(opts, grpc.WithInsecure())
|
||||||
}
|
}
|
||||||
opts = append(opts, grpc.WithContextDialer(dialer))
|
grpc.WithDisableRetry()
|
||||||
|
|
||||||
// Interceptor retry and backoff.
|
// Interceptor retry and backoff.
|
||||||
// TODO: Replace all of clientv3/retry.go with interceptor based retry, or with
|
// TODO: Replace all of clientv3/retry.go with interceptor based retry, or with
|
||||||
@ -275,15 +228,11 @@ func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts
|
|||||||
|
|
||||||
// Dial connects to a single endpoint using the client's config.
|
// Dial connects to a single endpoint using the client's config.
|
||||||
func (c *Client) Dial(ep string) (*grpc.ClientConn, error) {
|
func (c *Client) Dial(ep string) (*grpc.ClientConn, error) {
|
||||||
creds, err := c.directDialCreds(ep)
|
creds := c.credentialsForEndpoint(ep)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
// Using ad-hoc created resolver, to guarantee only explicitly given
|
||||||
}
|
// endpoint is used.
|
||||||
// Use the grpc passthrough resolver to directly dial a single endpoint.
|
return c.dial(creds, grpc.WithResolvers(resolver.New(ep)))
|
||||||
// This resolver passes through the 'unix' and 'unixs' endpoints schemes used
|
|
||||||
// by etcd without modification, allowing us to directly dial endpoints and
|
|
||||||
// using the same dial functions that we use for load balancer dialing.
|
|
||||||
return c.dial(fmt.Sprintf("passthrough:///%s", ep), creds)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) getToken(ctx context.Context) error {
|
func (c *Client) getToken(ctx context.Context) error {
|
||||||
@ -307,19 +256,17 @@ func (c *Client) getToken(ctx context.Context) error {
|
|||||||
// dialWithBalancer dials the client's current load balanced resolver group. The scheme of the host
|
// dialWithBalancer dials the client's current load balanced resolver group. The scheme of the host
|
||||||
// of the provided endpoint determines the scheme used for all endpoints of the client connection.
|
// of the provided endpoint determines the scheme used for all endpoints of the client connection.
|
||||||
func (c *Client) dialWithBalancer(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
|
func (c *Client) dialWithBalancer(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
|
||||||
_, host, _ := endpoint.ParseEndpoint(ep)
|
creds := c.credentialsForEndpoint(ep)
|
||||||
target := c.resolverGroup.Target(host)
|
opts := append(dopts, grpc.WithResolvers(c.resolver))
|
||||||
creds := c.dialWithBalancerCreds(ep)
|
return c.dial(creds, opts...)
|
||||||
return c.dial(target, creds, dopts...)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// dial configures and dials any grpc balancer target.
|
// dial configures and dials any grpc balancer target.
|
||||||
func (c *Client) dial(target string, creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
|
func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
|
||||||
opts, err := c.dialSetupOpts(creds, dopts...)
|
opts, err := c.dialSetupOpts(creds, dopts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to configure dialer: %v", err)
|
return nil, fmt.Errorf("failed to configure dialer: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.Username != "" && c.Password != "" {
|
if c.Username != "" && c.Password != "" {
|
||||||
c.authTokenBundle = credentials.NewBundle(credentials.Config{})
|
c.authTokenBundle = credentials.NewBundle(credentials.Config{})
|
||||||
opts = append(opts, grpc.WithPerRPCCredentials(c.authTokenBundle.PerRPCCredentials()))
|
opts = append(opts, grpc.WithPerRPCCredentials(c.authTokenBundle.PerRPCCredentials()))
|
||||||
@ -334,43 +281,21 @@ func (c *Client) dial(target string, creds grpccredentials.TransportCredentials,
|
|||||||
defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options?
|
defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options?
|
||||||
}
|
}
|
||||||
|
|
||||||
conn, err := grpc.DialContext(dctx, target, opts...)
|
conn, err := grpc.DialContext(dctx, c.resolver.Scheme()+":///", opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return conn, nil
|
return conn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) directDialCreds(ep string) (grpccredentials.TransportCredentials, error) {
|
func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials {
|
||||||
_, host, scheme := endpoint.ParseEndpoint(ep)
|
if c.creds != nil {
|
||||||
creds := c.creds
|
return c.creds
|
||||||
if len(scheme) != 0 {
|
|
||||||
creds = c.processCreds(scheme)
|
|
||||||
if creds != nil {
|
|
||||||
clone := creds.Clone()
|
|
||||||
// Set the server name must to the endpoint hostname without port since grpc
|
|
||||||
// otherwise attempts to check if x509 cert is valid for the full endpoint
|
|
||||||
// including the scheme and port, which fails.
|
|
||||||
overrideServerName, _, err := net.SplitHostPort(host)
|
|
||||||
if err != nil {
|
|
||||||
// Either the host didn't have a port or the host could not be parsed. Either way, continue with the
|
|
||||||
// original host string.
|
|
||||||
overrideServerName = host
|
|
||||||
}
|
|
||||||
clone.OverrideServerName(overrideServerName)
|
|
||||||
creds = clone
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return creds, nil
|
if endpoint.RequiresCredentials(ep) {
|
||||||
}
|
return credentials.NewBundle(credentials.Config{}).TransportCredentials()
|
||||||
|
|
||||||
func (c *Client) dialWithBalancerCreds(ep string) grpccredentials.TransportCredentials {
|
|
||||||
_, _, scheme := endpoint.ParseEndpoint(ep)
|
|
||||||
creds := c.creds
|
|
||||||
if len(scheme) != 0 {
|
|
||||||
creds = c.processCreds(scheme)
|
|
||||||
}
|
}
|
||||||
return creds
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newClient(cfg *Config) (*Client, error) {
|
func newClient(cfg *Config) (*Client, error) {
|
||||||
@ -432,14 +357,7 @@ func newClient(cfg *Config) (*Client, error) {
|
|||||||
client.callOpts = callOpts
|
client.callOpts = callOpts
|
||||||
}
|
}
|
||||||
|
|
||||||
// Prepare a 'endpoint://<unique-client-id>/' resolver for the client and create a endpoint target to pass
|
client.resolver = resolver.New(cfg.Endpoints...)
|
||||||
// to dial so the client knows to use this resolver.
|
|
||||||
client.resolverGroup, err = endpoint.NewResolverGroup(fmt.Sprintf("client-%s", uuid.New().String()))
|
|
||||||
if err != nil {
|
|
||||||
client.cancel()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
client.resolverGroup.SetEndpoints(cfg.Endpoints)
|
|
||||||
|
|
||||||
if len(cfg.Endpoints) < 1 {
|
if len(cfg.Endpoints) < 1 {
|
||||||
return nil, fmt.Errorf("at least one Endpoint must is required in client config")
|
return nil, fmt.Errorf("at least one Endpoint must is required in client config")
|
||||||
@ -448,10 +366,10 @@ func newClient(cfg *Config) (*Client, error) {
|
|||||||
|
|
||||||
// Use a provided endpoint target so that for https:// without any tls config given, then
|
// Use a provided endpoint target so that for https:// without any tls config given, then
|
||||||
// grpc will assume the certificate server name is the endpoint host.
|
// grpc will assume the certificate server name is the endpoint host.
|
||||||
conn, err := client.dialWithBalancer(dialEndpoint, grpc.WithBalancerName(roundRobinBalancerName))
|
conn, err := client.dialWithBalancer(dialEndpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
client.cancel()
|
client.cancel()
|
||||||
client.resolverGroup.Close()
|
client.resolver.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// TODO: With the old grpc balancer interface, we waited until the dial timeout
|
// TODO: With the old grpc balancer interface, we waited until the dial timeout
|
||||||
|
@ -22,8 +22,9 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
|
|
||||||
grpccredentials "google.golang.org/grpc/credentials"
|
grpccredentials "google.golang.org/grpc/credentials"
|
||||||
|
|
||||||
|
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Config defines gRPC credential configuration.
|
// Config defines gRPC credential configuration.
|
||||||
|
68
clientv3/internal/endpoint/endpoint.go
Normal file
68
clientv3/internal/endpoint/endpoint.go
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
// Copyright 2021 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package endpoint
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/url"
|
||||||
|
"regexp"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
STRIP_PORT_REGEXP = regexp.MustCompile("(.*):([0-9]+)")
|
||||||
|
)
|
||||||
|
|
||||||
|
func stripPort(ep string) string {
|
||||||
|
return STRIP_PORT_REGEXP.ReplaceAllString(ep, "$1")
|
||||||
|
}
|
||||||
|
|
||||||
|
func translateEndpoint(ep string) (addr string, serverName string, requireCreds bool) {
|
||||||
|
url, err := url.Parse(ep)
|
||||||
|
if err != nil {
|
||||||
|
return ep, stripPort(ep), false
|
||||||
|
}
|
||||||
|
switch url.Scheme {
|
||||||
|
case "http", "https":
|
||||||
|
return url.Host, url.Hostname(), url.Scheme == "https"
|
||||||
|
case "unix", "unixs":
|
||||||
|
requireCreds = url.Scheme == "unixs"
|
||||||
|
if url.Opaque != "" {
|
||||||
|
return "unix:" + url.Opaque, stripPort(url.Opaque), requireCreds
|
||||||
|
} else if url.Path != "" {
|
||||||
|
return "unix://" + url.Host + url.Path, url.Host + url.Path, requireCreds
|
||||||
|
} else {
|
||||||
|
return "unix:" + url.Host, url.Hostname(), requireCreds
|
||||||
|
}
|
||||||
|
case "":
|
||||||
|
return url.Host + url.Path, url.Host + url.Path, false
|
||||||
|
default:
|
||||||
|
return ep, stripPort(ep), false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RequiresCredentials returns whether given endpoint requires
|
||||||
|
// credentials/certificates for connection.
|
||||||
|
func RequiresCredentials(ep string) bool {
|
||||||
|
_, _, requireCreds := translateEndpoint(ep)
|
||||||
|
return requireCreds
|
||||||
|
}
|
||||||
|
|
||||||
|
// Interpret endpoint parses an endpoint of the form
|
||||||
|
// (http|https)://<host>*|(unix|unixs)://<path>)
|
||||||
|
// and returns low-level address (supported by 'net') to connect to,
|
||||||
|
// and a server name used for x509 certificate matching.
|
||||||
|
func Interpret(ep string) (address string, serverName string) {
|
||||||
|
addr, serverName, _ := translateEndpoint(ep)
|
||||||
|
return addr, serverName
|
||||||
|
}
|
65
clientv3/internal/endpoint/endpoint_test.go
Normal file
65
clientv3/internal/endpoint/endpoint_test.go
Normal file
@ -0,0 +1,65 @@
|
|||||||
|
// Copyright 2021 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package endpoint
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestInterpret(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
endpoint string
|
||||||
|
wantAddress string
|
||||||
|
wantServerName string
|
||||||
|
}{
|
||||||
|
{"127.0.0.1", "127.0.0.1", "127.0.0.1"},
|
||||||
|
{"localhost", "localhost", "localhost"},
|
||||||
|
{"localhost:8080", "localhost:8080", "localhost"},
|
||||||
|
|
||||||
|
{"unix:127.0.0.1", "unix:127.0.0.1", "127.0.0.1"},
|
||||||
|
{"unix:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"},
|
||||||
|
|
||||||
|
{"unix://127.0.0.1", "unix:127.0.0.1", "127.0.0.1"},
|
||||||
|
{"unix://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"},
|
||||||
|
|
||||||
|
{"unixs:127.0.0.1", "unix:127.0.0.1", "127.0.0.1"},
|
||||||
|
{"unixs:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"},
|
||||||
|
{"unixs://127.0.0.1", "unix:127.0.0.1", "127.0.0.1"},
|
||||||
|
{"unixs://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"},
|
||||||
|
|
||||||
|
{"http://127.0.0.1", "127.0.0.1", "127.0.0.1"},
|
||||||
|
{"http://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1"},
|
||||||
|
{"https://127.0.0.1", "127.0.0.1", "127.0.0.1"},
|
||||||
|
{"https://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1"},
|
||||||
|
{"https://localhost:20000", "localhost:20000", "localhost"},
|
||||||
|
|
||||||
|
{"unix:///tmp/abc", "unix:///tmp/abc", "/tmp/abc"},
|
||||||
|
{"unixs:///tmp/abc", "unix:///tmp/abc", "/tmp/abc"},
|
||||||
|
{"etcd.io", "etcd.io", "etcd.io"},
|
||||||
|
{"http://etcd.io/abc", "etcd.io", "etcd.io"},
|
||||||
|
{"dns://something-other", "dns://something-other", "dns://something-other"},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.endpoint, func(t *testing.T) {
|
||||||
|
gotAddress, gotServerName := Interpret(tt.endpoint)
|
||||||
|
if gotAddress != tt.wantAddress {
|
||||||
|
t.Errorf("Interpret() gotAddress = %v, want %v", gotAddress, tt.wantAddress)
|
||||||
|
}
|
||||||
|
if gotServerName != tt.wantServerName {
|
||||||
|
t.Errorf("Interpret() gotServerName = %v, want %v", gotServerName, tt.wantServerName)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
71
clientv3/internal/resolver/resolver.go
Normal file
71
clientv3/internal/resolver/resolver.go
Normal file
@ -0,0 +1,71 @@
|
|||||||
|
// Copyright 2021 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package resolver
|
||||||
|
|
||||||
|
import (
|
||||||
|
"go.etcd.io/etcd/clientv3/internal/endpoint"
|
||||||
|
|
||||||
|
"google.golang.org/grpc/resolver"
|
||||||
|
"google.golang.org/grpc/resolver/manual"
|
||||||
|
"google.golang.org/grpc/serviceconfig"
|
||||||
|
)
|
||||||
|
|
||||||
|
// EtcdManualResolver is a Resolver (and resolver.Builder) that can be updated
|
||||||
|
// using SetEndpoints.
|
||||||
|
type EtcdManualResolver struct {
|
||||||
|
*manual.Resolver
|
||||||
|
endpoints []string
|
||||||
|
serviceConfig *serviceconfig.ParseResult
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(endpoints ...string) *EtcdManualResolver {
|
||||||
|
r := manual.NewBuilderWithScheme("etcd-endpoints")
|
||||||
|
return &EtcdManualResolver{Resolver: r, endpoints: endpoints, serviceConfig: nil}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build returns itself for Resolver, because it's both a builder and a resolver.
|
||||||
|
func (r *EtcdManualResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
|
||||||
|
r.serviceConfig = cc.ParseServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
|
||||||
|
if r.serviceConfig.Err != nil {
|
||||||
|
return nil, r.serviceConfig.Err
|
||||||
|
}
|
||||||
|
res, err := r.Resolver.Build(target, cc, opts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// Populates endpoints stored in r into ClientConn (cc).
|
||||||
|
r.updateState()
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *EtcdManualResolver) SetEndpoints(endpoints []string) {
|
||||||
|
r.endpoints = endpoints
|
||||||
|
r.updateState()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r EtcdManualResolver) updateState() {
|
||||||
|
if r.CC != nil {
|
||||||
|
addresses := make([]resolver.Address, len(r.endpoints))
|
||||||
|
for i, ep := range r.endpoints {
|
||||||
|
addr, serverName := endpoint.Interpret(ep)
|
||||||
|
addresses[i] = resolver.Address{Addr: addr, ServerName: serverName}
|
||||||
|
}
|
||||||
|
state := resolver.State{
|
||||||
|
Addresses: addresses,
|
||||||
|
ServiceConfig: r.serviceConfig,
|
||||||
|
}
|
||||||
|
r.UpdateState(state)
|
||||||
|
}
|
||||||
|
}
|
1
go.mod
1
go.mod
@ -15,7 +15,6 @@ require (
|
|||||||
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903
|
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903
|
||||||
github.com/golang/protobuf v1.4.3
|
github.com/golang/protobuf v1.4.3
|
||||||
github.com/google/btree v1.0.0
|
github.com/google/btree v1.0.0
|
||||||
github.com/google/uuid v1.0.0
|
|
||||||
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4
|
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4
|
||||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
||||||
github.com/grpc-ecosystem/grpc-gateway v1.11.0
|
github.com/grpc-ecosystem/grpc-gateway v1.11.0
|
||||||
|
2
go.sum
2
go.sum
@ -76,8 +76,6 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
|
|||||||
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
|
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
|
||||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||||
github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA=
|
|
||||||
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
|
||||||
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
|
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
|
||||||
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||||
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 h1:z53tR0945TRRQO/fLEVPI6SMv7ZflF0TEaTAoU7tOzg=
|
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 h1:z53tR0945TRRQO/fLEVPI6SMv7ZflF0TEaTAoU7tOzg=
|
||||||
|
Loading…
x
Reference in New Issue
Block a user