Merge pull request #16844 from chaochn47/release-3.4-replace-balancer

[3.4] Backport #12671 clientv3: Replace balancer with upstream grpc solution
This commit is contained in:
Benjamin Wang 2023-10-31 09:36:12 +00:00 committed by GitHub
commit 0fb0045780
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 359 additions and 1432 deletions

View File

@ -98,15 +98,6 @@
}
]
},
{
"project": "github.com/google/uuid",
"licenses": [
{
"type": "BSD 3-clause \"New\" or \"Revised\" License",
"confidence": 0.9663865546218487
}
]
},
{
"project": "github.com/gorilla/websocket",
"licenses": [

View File

@ -1,293 +0,0 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package balancer implements client balancer.
package balancer
import (
"strconv"
"sync"
"time"
"go.etcd.io/etcd/clientv3/balancer/connectivity"
"go.etcd.io/etcd/clientv3/balancer/picker"
"go.uber.org/zap"
"google.golang.org/grpc/balancer"
grpcconnectivity "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
_ "google.golang.org/grpc/resolver/dns" // register DNS resolver
_ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
)
// Config defines balancer configurations.
type Config struct {
// Policy configures balancer policy.
Policy picker.Policy
// Picker implements gRPC picker.
// Leave empty if "Policy" field is not custom.
// TODO: currently custom policy is not supported.
// Picker picker.Picker
// Name defines an additional name for balancer.
// Useful for balancer testing to avoid register conflicts.
// If empty, defaults to policy name.
Name string
// Logger configures balancer logging.
// If nil, logs are discarded.
Logger *zap.Logger
}
// RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it
// must be invoked at initialization time.
func RegisterBuilder(cfg Config) {
bb := &builder{cfg}
balancer.Register(bb)
bb.cfg.Logger.Debug(
"registered balancer",
zap.String("policy", bb.cfg.Policy.String()),
zap.String("name", bb.cfg.Name),
)
}
type builder struct {
cfg Config
}
// Build is called initially when creating "ccBalancerWrapper".
// "grpc.Dial" is called to this client connection.
// Then, resolved addresses will be handled via "HandleResolvedAddrs".
func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
bb := &baseBalancer{
id: strconv.FormatInt(time.Now().UnixNano(), 36),
policy: b.cfg.Policy,
name: b.cfg.Name,
lg: b.cfg.Logger,
addrToSc: make(map[resolver.Address]balancer.SubConn),
scToAddr: make(map[balancer.SubConn]resolver.Address),
scToSt: make(map[balancer.SubConn]grpcconnectivity.State),
currentConn: nil,
connectivityRecorder: connectivity.New(b.cfg.Logger),
// initialize picker always returns "ErrNoSubConnAvailable"
picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
}
// TODO: support multiple connections
bb.mu.Lock()
bb.currentConn = cc
bb.mu.Unlock()
bb.lg.Info(
"built balancer",
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
zap.String("resolver-target", cc.Target()),
)
return bb
}
// Name implements "grpc/balancer.Builder" interface.
func (b *builder) Name() string { return b.cfg.Name }
// Balancer defines client balancer interface.
type Balancer interface {
// Balancer is called on specified client connection. Client initiates gRPC
// connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved
// addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs".
// For each resolved address, balancer calls "balancer.ClientConn.NewSubConn".
// "grpc/balancer.Balancer.HandleSubConnStateChange" is called when connectivity state
// changes, thus requires failover logic in this method.
balancer.Balancer
// Picker calls "Pick" for every client request.
picker.Picker
}
type baseBalancer struct {
id string
policy picker.Policy
name string
lg *zap.Logger
mu sync.RWMutex
addrToSc map[resolver.Address]balancer.SubConn
scToAddr map[balancer.SubConn]resolver.Address
scToSt map[balancer.SubConn]grpcconnectivity.State
currentConn balancer.ClientConn
connectivityRecorder connectivity.Recorder
picker picker.Picker
}
// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
// gRPC sends initial or updated resolved addresses from "Build".
func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
if err != nil {
bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
return
}
bb.lg.Info("resolved",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.Strings("addresses", addrsToStrings(addrs)),
)
bb.mu.Lock()
defer bb.mu.Unlock()
resolved := make(map[resolver.Address]struct{})
for _, addr := range addrs {
resolved[addr] = struct{}{}
if _, ok := bb.addrToSc[addr]; !ok {
sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
if err != nil {
bb.lg.Warn("NewSubConn failed", zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
continue
}
bb.lg.Info("created subconn", zap.String("address", addr.Addr))
bb.addrToSc[addr] = sc
bb.scToAddr[sc] = addr
bb.scToSt[sc] = grpcconnectivity.Idle
sc.Connect()
}
}
for addr, sc := range bb.addrToSc {
if _, ok := resolved[addr]; !ok {
// was removed by resolver or failed to create subconn
bb.currentConn.RemoveSubConn(sc)
delete(bb.addrToSc, addr)
bb.lg.Info(
"removed subconn",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("address", addr.Addr),
zap.String("subconn", scToString(sc)),
)
// Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown.
// The entry will be deleted in HandleSubConnStateChange.
// (DO NOT) delete(bb.scToAddr, sc)
// (DO NOT) delete(bb.scToSt, sc)
}
}
}
// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s grpcconnectivity.State) {
bb.mu.Lock()
defer bb.mu.Unlock()
old, ok := bb.scToSt[sc]
if !ok {
bb.lg.Warn(
"state change for an unknown subconn",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("subconn", scToString(sc)),
zap.Int("subconn-size", len(bb.scToAddr)),
zap.String("state", s.String()),
)
return
}
bb.lg.Info(
"state changed",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.Bool("connected", s == grpcconnectivity.Ready),
zap.String("subconn", scToString(sc)),
zap.Int("subconn-size", len(bb.scToAddr)),
zap.String("address", bb.scToAddr[sc].Addr),
zap.String("old-state", old.String()),
zap.String("new-state", s.String()),
)
bb.scToSt[sc] = s
switch s {
case grpcconnectivity.Idle:
sc.Connect()
case grpcconnectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scToSt. Remove state for this sc here.
delete(bb.scToAddr, sc)
delete(bb.scToSt, sc)
}
oldAggrState := bb.connectivityRecorder.GetCurrentState()
bb.connectivityRecorder.RecordTransition(old, s)
// Update balancer picker when one of the following happens:
// - this sc became ready from not-ready
// - this sc became not-ready from ready
// - the aggregated state of balancer became TransientFailure from non-TransientFailure
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
if (s == grpcconnectivity.Ready) != (old == grpcconnectivity.Ready) ||
(bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure) != (oldAggrState == grpcconnectivity.TransientFailure) {
bb.updatePicker()
}
bb.currentConn.UpdateBalancerState(bb.connectivityRecorder.GetCurrentState(), bb.picker)
}
func (bb *baseBalancer) updatePicker() {
if bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure {
bb.picker = picker.NewErr(balancer.ErrTransientFailure)
bb.lg.Info(
"updated picker to transient error picker",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
)
return
}
// only pass ready subconns to picker
scToAddr := make(map[balancer.SubConn]resolver.Address)
for addr, sc := range bb.addrToSc {
if st, ok := bb.scToSt[sc]; ok && st == grpcconnectivity.Ready {
scToAddr[sc] = addr
}
}
bb.picker = picker.New(picker.Config{
Policy: bb.policy,
Logger: bb.lg,
SubConnToResolverAddress: scToAddr,
})
bb.lg.Info(
"updated picker",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
zap.Strings("subconn-ready", scsToStrings(scToAddr)),
zap.Int("subconn-size", len(scToAddr)),
)
}
// Close implements "grpc/balancer.Balancer" interface.
// Close is a nop because base balancer doesn't have internal state to clean up,
// and it doesn't need to call RemoveSubConn for the SubConns.
func (bb *baseBalancer) Close() {
// TODO
}

View File

@ -1,323 +0,0 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package balancer
import (
"context"
"fmt"
"strings"
"testing"
"go.etcd.io/etcd/clientv3/balancer/picker"
"go.etcd.io/etcd/clientv3/balancer/resolver/endpoint"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/pkg/mock/mockserver"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
)
// TestRoundRobinBalancedResolvableNoFailover ensures that
// requests to a resolvable endpoint can be balanced between
// multiple, if any, nodes. And there needs be no failover.
func TestRoundRobinBalancedResolvableNoFailover(t *testing.T) {
testCases := []struct {
name string
serverCount int
reqN int
network string
}{
{name: "rrBalanced_1", serverCount: 1, reqN: 5, network: "tcp"},
{name: "rrBalanced_1_unix_sockets", serverCount: 1, reqN: 5, network: "unix"},
{name: "rrBalanced_3", serverCount: 3, reqN: 7, network: "tcp"},
{name: "rrBalanced_5", serverCount: 5, reqN: 10, network: "tcp"},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ms, err := mockserver.StartMockServersOnNetwork(tc.serverCount, tc.network)
if err != nil {
t.Fatalf("failed to start mock servers: %v", err)
}
defer ms.Stop()
var eps []string
for _, svr := range ms.Servers {
eps = append(eps, svr.ResolverAddress().Addr)
}
rsv, err := endpoint.NewResolverGroup("nofailover")
if err != nil {
t.Fatal(err)
}
defer rsv.Close()
rsv.SetEndpoints(eps)
name := genName()
cfg := Config{
Policy: picker.RoundrobinBalanced,
Name: name,
Logger: zap.NewExample(),
}
RegisterBuilder(cfg)
conn, err := grpc.Dial(fmt.Sprintf("endpoint://nofailover/*"), grpc.WithInsecure(), grpc.WithBalancerName(name))
if err != nil {
t.Fatalf("failed to dial mock server: %v", err)
}
defer conn.Close()
cli := pb.NewKVClient(conn)
reqFunc := func(ctx context.Context) (picked string, err error) {
var p peer.Peer
_, err = cli.Range(ctx, &pb.RangeRequest{Key: []byte("/x")}, grpc.Peer(&p))
if p.Addr != nil {
picked = p.Addr.String()
}
return picked, err
}
_, picked, err := warmupConnections(reqFunc, tc.serverCount, "")
if err != nil {
t.Fatalf("Unexpected failure %v", err)
}
// verify that we round robin
prev, switches := picked, 0
for i := 0; i < tc.reqN; i++ {
picked, err = reqFunc(context.Background())
if err != nil {
t.Fatalf("#%d: unexpected failure %v", i, err)
}
if prev != picked {
switches++
}
prev = picked
}
if tc.serverCount > 1 && switches != tc.reqN {
t.Fatalf("expected balanced loads for %d requests, got switches %d", tc.reqN, switches)
}
})
}
}
// TestRoundRobinBalancedResolvableFailoverFromServerFail ensures that
// loads be rebalanced while one server goes down and comes back.
func TestRoundRobinBalancedResolvableFailoverFromServerFail(t *testing.T) {
serverCount := 5
ms, err := mockserver.StartMockServers(serverCount)
if err != nil {
t.Fatalf("failed to start mock servers: %s", err)
}
defer ms.Stop()
var eps []string
for _, svr := range ms.Servers {
eps = append(eps, svr.ResolverAddress().Addr)
}
rsv, err := endpoint.NewResolverGroup("serverfail")
if err != nil {
t.Fatal(err)
}
defer rsv.Close()
rsv.SetEndpoints(eps)
name := genName()
cfg := Config{
Policy: picker.RoundrobinBalanced,
Name: name,
Logger: zap.NewExample(),
}
RegisterBuilder(cfg)
conn, err := grpc.Dial(fmt.Sprintf("endpoint://serverfail/mock.server"), grpc.WithInsecure(), grpc.WithBalancerName(name))
if err != nil {
t.Fatalf("failed to dial mock server: %s", err)
}
defer conn.Close()
cli := pb.NewKVClient(conn)
reqFunc := func(ctx context.Context) (picked string, err error) {
var p peer.Peer
_, err = cli.Range(ctx, &pb.RangeRequest{Key: []byte("/x")}, grpc.Peer(&p))
if p.Addr != nil {
picked = p.Addr.String()
}
return picked, err
}
// stop first server, loads should be redistributed
ms.StopAt(0)
// stopped server will be transitioned into TRANSIENT_FAILURE state
// but it doesn't happen instantaneously and it can still be picked for a short period of time
// we ignore "transport is closing" in such case
available, picked, err := warmupConnections(reqFunc, serverCount-1, "transport is closing")
if err != nil {
t.Fatalf("Unexpected failure %v", err)
}
reqN := 10
prev, switches := picked, 0
for i := 0; i < reqN; i++ {
picked, err = reqFunc(context.Background())
if err != nil {
t.Fatalf("#%d: unexpected failure %v", i, err)
}
if _, ok := available[picked]; !ok {
t.Fatalf("picked unavailable address %q (available %v)", picked, available)
}
if prev != picked {
switches++
}
prev = picked
}
if switches != reqN {
t.Fatalf("expected balanced loads for %d requests, got switches %d", reqN, switches)
}
// now failed server comes back
ms.StartAt(0)
available, picked, err = warmupConnections(reqFunc, serverCount, "")
if err != nil {
t.Fatalf("Unexpected failure %v", err)
}
prev, switches = picked, 0
recoveredAddr, recovered := eps[0], 0
available[recoveredAddr] = struct{}{}
for i := 0; i < 2*reqN; i++ {
picked, err := reqFunc(context.Background())
if err != nil {
t.Fatalf("#%d: unexpected failure %v", i, err)
}
if _, ok := available[picked]; !ok {
t.Fatalf("#%d: picked unavailable address %q (available %v)", i, picked, available)
}
if prev != picked {
switches++
}
if picked == recoveredAddr {
recovered++
}
prev = picked
}
if switches != 2*reqN {
t.Fatalf("expected balanced loads for %d requests, got switches %d", reqN, switches)
}
if recovered != 2*reqN/serverCount {
t.Fatalf("recovered server %q got only %d requests", recoveredAddr, recovered)
}
}
// TestRoundRobinBalancedResolvableFailoverFromRequestFail ensures that
// loads be rebalanced while some requests are failed.
func TestRoundRobinBalancedResolvableFailoverFromRequestFail(t *testing.T) {
serverCount := 5
ms, err := mockserver.StartMockServers(serverCount)
if err != nil {
t.Fatalf("failed to start mock servers: %s", err)
}
defer ms.Stop()
var eps []string
for _, svr := range ms.Servers {
eps = append(eps, svr.ResolverAddress().Addr)
}
rsv, err := endpoint.NewResolverGroup("requestfail")
if err != nil {
t.Fatal(err)
}
defer rsv.Close()
rsv.SetEndpoints(eps)
name := genName()
cfg := Config{
Policy: picker.RoundrobinBalanced,
Name: name,
Logger: zap.NewExample(),
}
RegisterBuilder(cfg)
conn, err := grpc.Dial(fmt.Sprintf("endpoint://requestfail/mock.server"), grpc.WithInsecure(), grpc.WithBalancerName(name))
if err != nil {
t.Fatalf("failed to dial mock server: %s", err)
}
defer conn.Close()
cli := pb.NewKVClient(conn)
reqFunc := func(ctx context.Context) (picked string, err error) {
var p peer.Peer
_, err = cli.Range(ctx, &pb.RangeRequest{Key: []byte("/x")}, grpc.Peer(&p))
if p.Addr != nil {
picked = p.Addr.String()
}
return picked, err
}
available, picked, err := warmupConnections(reqFunc, serverCount, "")
if err != nil {
t.Fatalf("Unexpected failure %v", err)
}
reqN := 20
prev, switches := "", 0
for i := 0; i < reqN; i++ {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if i%2 == 0 {
cancel()
}
picked, err = reqFunc(ctx)
if i%2 == 0 {
if s, ok := status.FromError(err); ok && s.Code() != codes.Canceled {
t.Fatalf("#%d: expected %v, got %v", i, context.Canceled, err)
}
continue
}
if _, ok := available[picked]; !ok {
t.Fatalf("#%d: picked unavailable address %q (available %v)", i, picked, available)
}
if prev != picked {
switches++
}
prev = picked
}
if switches != reqN/2 {
t.Fatalf("expected balanced loads for %d requests, got switches %d", reqN, switches)
}
}
type reqFuncT = func(ctx context.Context) (picked string, err error)
func warmupConnections(reqFunc reqFuncT, serverCount int, ignoreErr string) (map[string]struct{}, string, error) {
var picked string
var err error
available := make(map[string]struct{})
// cycle through all peers to indirectly verify that balancer subconn list is fully loaded
// otherwise we can't reliably count switches between 'picked' peers in the test assert phase
for len(available) < serverCount {
picked, err = reqFunc(context.Background())
if err != nil {
if ignoreErr != "" && strings.Contains(err.Error(), ignoreErr) {
// skip ignored errors
continue
}
return available, picked, err
}
available[picked] = struct{}{}
}
return available, picked, err
}

View File

@ -1,93 +0,0 @@
// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package connectivity implements client connectivity operations.
package connectivity
import (
"sync"
"go.uber.org/zap"
"google.golang.org/grpc/connectivity"
)
// Recorder records gRPC connectivity.
type Recorder interface {
GetCurrentState() connectivity.State
RecordTransition(oldState, newState connectivity.State)
}
// New returns a new Recorder.
func New(lg *zap.Logger) Recorder {
return &recorder{lg: lg}
}
// recorder takes the connectivity states of multiple SubConns
// and returns one aggregated connectivity state.
// ref. https://github.com/grpc/grpc-go/blob/master/balancer/balancer.go
type recorder struct {
lg *zap.Logger
mu sync.RWMutex
cur connectivity.State
numReady uint64 // Number of addrConns in ready state.
numConnecting uint64 // Number of addrConns in connecting state.
numTransientFailure uint64 // Number of addrConns in transientFailure.
}
func (rc *recorder) GetCurrentState() (state connectivity.State) {
rc.mu.RLock()
defer rc.mu.RUnlock()
return rc.cur
}
// RecordTransition records state change happening in subConn and based on that
// it evaluates what aggregated state should be.
//
// - If at least one SubConn in Ready, the aggregated state is Ready;
// - Else if at least one SubConn in Connecting, the aggregated state is Connecting;
// - Else the aggregated state is TransientFailure.
//
// Idle and Shutdown are not considered.
//
// ref. https://github.com/grpc/grpc-go/blob/master/balancer/balancer.go
func (rc *recorder) RecordTransition(oldState, newState connectivity.State) {
rc.mu.Lock()
defer rc.mu.Unlock()
for idx, state := range []connectivity.State{oldState, newState} {
updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
switch state {
case connectivity.Ready:
rc.numReady += updateVal
case connectivity.Connecting:
rc.numConnecting += updateVal
case connectivity.TransientFailure:
rc.numTransientFailure += updateVal
default:
rc.lg.Warn("connectivity recorder received unknown state", zap.String("connectivity-state", state.String()))
}
}
switch { // must be exclusive, no overlap
case rc.numReady > 0:
rc.cur = connectivity.Ready
case rc.numConnecting > 0:
rc.cur = connectivity.Connecting
default:
rc.cur = connectivity.TransientFailure
}
}

View File

@ -1,16 +0,0 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package picker defines/implements client balancer picker policy.
package picker

View File

@ -1,39 +0,0 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package picker
import (
"context"
"google.golang.org/grpc/balancer"
)
// NewErr returns a picker that always returns err on "Pick".
func NewErr(err error) Picker {
return &errPicker{p: Error, err: err}
}
type errPicker struct {
p Policy
err error
}
func (ep *errPicker) String() string {
return ep.p.String()
}
func (ep *errPicker) Pick(context.Context, balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
return nil, nil, ep.err
}

View File

@ -1,91 +0,0 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package picker
import (
"fmt"
"go.uber.org/zap"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
)
// Picker defines balancer Picker methods.
type Picker interface {
balancer.Picker
String() string
}
// Config defines picker configuration.
type Config struct {
// Policy specifies etcd clientv3's built in balancer policy.
Policy Policy
// Logger defines picker logging object.
Logger *zap.Logger
// SubConnToResolverAddress maps each gRPC sub-connection to an address.
// Basically, it is a list of addresses that the Picker can pick from.
SubConnToResolverAddress map[balancer.SubConn]resolver.Address
}
// Policy defines balancer picker policy.
type Policy uint8
const (
// Error is error picker policy.
Error Policy = iota
// RoundrobinBalanced balances loads over multiple endpoints
// and implements failover in roundrobin fashion.
RoundrobinBalanced
// Custom defines custom balancer picker.
// TODO: custom picker is not supported yet.
Custom
)
func (p Policy) String() string {
switch p {
case Error:
return "picker-error"
case RoundrobinBalanced:
return "picker-roundrobin-balanced"
case Custom:
panic("'custom' picker policy is not supported yet")
default:
panic(fmt.Errorf("invalid balancer picker policy (%d)", p))
}
}
// New creates a new Picker.
func New(cfg Config) Picker {
switch cfg.Policy {
case Error:
panic("'error' picker policy is not supported here; use 'picker.NewErr'")
case RoundrobinBalanced:
return newRoundrobinBalanced(cfg)
case Custom:
panic("'custom' picker policy is not supported yet")
default:
panic(fmt.Errorf("invalid balancer picker policy (%d)", cfg.Policy))
}
}

View File

@ -1,95 +0,0 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package picker
import (
"context"
"sync"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
)
// newRoundrobinBalanced returns a new roundrobin balanced picker.
func newRoundrobinBalanced(cfg Config) Picker {
scs := make([]balancer.SubConn, 0, len(cfg.SubConnToResolverAddress))
for sc := range cfg.SubConnToResolverAddress {
scs = append(scs, sc)
}
return &rrBalanced{
p: RoundrobinBalanced,
lg: cfg.Logger,
scs: scs,
scToAddr: cfg.SubConnToResolverAddress,
}
}
type rrBalanced struct {
p Policy
lg *zap.Logger
mu sync.RWMutex
next int
scs []balancer.SubConn
scToAddr map[balancer.SubConn]resolver.Address
}
func (rb *rrBalanced) String() string { return rb.p.String() }
// Pick is called for every client request.
func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
rb.mu.RLock()
n := len(rb.scs)
rb.mu.RUnlock()
if n == 0 {
return nil, nil, balancer.ErrNoSubConnAvailable
}
rb.mu.Lock()
cur := rb.next
sc := rb.scs[cur]
picked := rb.scToAddr[sc].Addr
rb.next = (rb.next + 1) % len(rb.scs)
rb.mu.Unlock()
rb.lg.Debug(
"picked",
zap.String("picker", rb.p.String()),
zap.String("address", picked),
zap.Int("subconn-index", cur),
zap.Int("subconn-size", n),
)
doneFunc := func(info balancer.DoneInfo) {
// TODO: error handling?
fss := []zapcore.Field{
zap.Error(info.Err),
zap.String("picker", rb.p.String()),
zap.String("address", picked),
zap.Bool("success", info.Err == nil),
zap.Bool("bytes-sent", info.BytesSent),
zap.Bool("bytes-received", info.BytesReceived),
}
if info.Err == nil {
rb.lg.Debug("balancer done", fss...)
} else {
rb.lg.Warn("balancer failed", fss...)
}
}
return sc, doneFunc, nil
}

View File

@ -1,248 +0,0 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package endpoint resolves etcd entpoints using grpc targets of the form 'endpoint://<id>/<endpoint>'.
package endpoint
import (
"context"
"fmt"
"net"
"net/url"
"strings"
"sync"
"google.golang.org/grpc/resolver"
)
const scheme = "endpoint"
var (
targetPrefix = fmt.Sprintf("%s://", scheme)
bldr *builder
)
func init() {
bldr = &builder{
resolverGroups: make(map[string]*ResolverGroup),
}
resolver.Register(bldr)
}
type builder struct {
mu sync.RWMutex
resolverGroups map[string]*ResolverGroup
}
// NewResolverGroup creates a new ResolverGroup with the given id.
func NewResolverGroup(id string) (*ResolverGroup, error) {
return bldr.newResolverGroup(id)
}
// ResolverGroup keeps all endpoints of resolvers using a common endpoint://<id>/ target
// up-to-date.
type ResolverGroup struct {
mu sync.RWMutex
id string
endpoints []string
resolvers []*Resolver
}
func (e *ResolverGroup) addResolver(r *Resolver) {
e.mu.Lock()
addrs := epsToAddrs(e.endpoints...)
e.resolvers = append(e.resolvers, r)
e.mu.Unlock()
r.cc.NewAddress(addrs)
}
func (e *ResolverGroup) removeResolver(r *Resolver) {
e.mu.Lock()
for i, er := range e.resolvers {
if er == r {
e.resolvers = append(e.resolvers[:i], e.resolvers[i+1:]...)
break
}
}
e.mu.Unlock()
}
// SetEndpoints updates the endpoints for ResolverGroup. All registered resolver are updated
// immediately with the new endpoints.
func (e *ResolverGroup) SetEndpoints(endpoints []string) {
addrs := epsToAddrs(endpoints...)
e.mu.Lock()
e.endpoints = endpoints
for _, r := range e.resolvers {
r.cc.NewAddress(addrs)
}
e.mu.Unlock()
}
// Target constructs a endpoint target using the endpoint id of the ResolverGroup.
func (e *ResolverGroup) Target(endpoint string) string {
return Target(e.id, endpoint)
}
// Target constructs a endpoint resolver target.
func Target(id, endpoint string) string {
return fmt.Sprintf("%s://%s/%s", scheme, id, endpoint)
}
// IsTarget checks if a given target string in an endpoint resolver target.
func IsTarget(target string) bool {
return strings.HasPrefix(target, "endpoint://")
}
func (e *ResolverGroup) Close() {
bldr.close(e.id)
}
// Build creates or reuses an etcd resolver for the etcd cluster name identified by the authority part of the target.
func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
if len(target.Authority) < 1 {
return nil, fmt.Errorf("'etcd' target scheme requires non-empty authority identifying etcd cluster being routed to")
}
id := target.Authority
es, err := b.getResolverGroup(id)
if err != nil {
return nil, fmt.Errorf("failed to build resolver: %v", err)
}
r := &Resolver{
endpointID: id,
cc: cc,
}
es.addResolver(r)
return r, nil
}
func (b *builder) newResolverGroup(id string) (*ResolverGroup, error) {
b.mu.RLock()
_, ok := b.resolverGroups[id]
b.mu.RUnlock()
if ok {
return nil, fmt.Errorf("Endpoint already exists for id: %s", id)
}
es := &ResolverGroup{id: id}
b.mu.Lock()
b.resolverGroups[id] = es
b.mu.Unlock()
return es, nil
}
func (b *builder) getResolverGroup(id string) (*ResolverGroup, error) {
b.mu.RLock()
es, ok := b.resolverGroups[id]
b.mu.RUnlock()
if !ok {
return nil, fmt.Errorf("ResolverGroup not found for id: %s", id)
}
return es, nil
}
func (b *builder) close(id string) {
b.mu.Lock()
delete(b.resolverGroups, id)
b.mu.Unlock()
}
func (b *builder) Scheme() string {
return scheme
}
// Resolver provides a resolver for a single etcd cluster, identified by name.
type Resolver struct {
endpointID string
cc resolver.ClientConn
sync.RWMutex
}
// TODO: use balancer.epsToAddrs
func epsToAddrs(eps ...string) (addrs []resolver.Address) {
addrs = make([]resolver.Address, 0, len(eps))
for _, ep := range eps {
_, host, _ := ParseEndpoint(ep)
addrs = append(addrs, resolver.Address{Addr: ep, ServerName: host})
}
return addrs
}
func (*Resolver) ResolveNow(o resolver.ResolveNowOptions) {}
func (r *Resolver) Close() {
es, err := bldr.getResolverGroup(r.endpointID)
if err != nil {
return
}
es.removeResolver(r)
}
// ParseEndpoint endpoint parses an endpoint of the form
// (http|https)://<host>*|(unix|unixs)://<path>)
// and returns a protocol ('tcp' or 'unix'),
// host (or filepath if a unix socket),
// scheme (http, https, unix, unixs).
func ParseEndpoint(endpoint string) (proto string, host string, scheme string) {
proto = "tcp"
host = endpoint
url, uerr := url.Parse(endpoint)
if uerr != nil || !strings.Contains(endpoint, "://") {
return proto, host, scheme
}
scheme = url.Scheme
// strip scheme:// prefix since grpc dials by host
host = url.Host
switch url.Scheme {
case "http", "https":
case "unix", "unixs":
proto = "unix"
host = url.Host + url.Path
default:
proto, host = "", ""
}
return proto, host, scheme
}
// ParseTarget parses a endpoint://<id>/<endpoint> string and returns the parsed id and endpoint.
// If the target is malformed, an error is returned.
func ParseTarget(target string) (string, string, error) {
noPrefix := strings.TrimPrefix(target, targetPrefix)
if noPrefix == target {
return "", "", fmt.Errorf("malformed target, %s prefix is required: %s", targetPrefix, target)
}
parts := strings.SplitN(noPrefix, "/", 2)
if len(parts) != 2 {
return "", "", fmt.Errorf("malformed target, expected %s://<id>/<endpoint>, but got %s", scheme, target)
}
return parts[0], parts[1], nil
}
// Dialer dials a endpoint using net.Dialer.
// Context cancelation and timeout are supported.
func Dialer(ctx context.Context, dialEp string) (net.Conn, error) {
proto, host, _ := ParseEndpoint(dialEp)
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
dialer := &net.Dialer{}
if deadline, ok := ctx.Deadline(); ok {
dialer.Deadline = deadline
}
return dialer.DialContext(ctx, proto, host)
}

View File

@ -1,68 +0,0 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package balancer
import (
"fmt"
"net/url"
"sort"
"sync/atomic"
"time"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
)
func scToString(sc balancer.SubConn) string {
return fmt.Sprintf("%p", sc)
}
func scsToStrings(scs map[balancer.SubConn]resolver.Address) (ss []string) {
ss = make([]string, 0, len(scs))
for sc, a := range scs {
ss = append(ss, fmt.Sprintf("%s (%s)", a.Addr, scToString(sc)))
}
sort.Strings(ss)
return ss
}
func addrsToStrings(addrs []resolver.Address) (ss []string) {
ss = make([]string, len(addrs))
for i := range addrs {
ss[i] = addrs[i].Addr
}
sort.Strings(ss)
return ss
}
func epsToAddrs(eps ...string) (addrs []resolver.Address) {
addrs = make([]resolver.Address, 0, len(eps))
for _, ep := range eps {
u, err := url.Parse(ep)
if err != nil {
addrs = append(addrs, resolver.Address{Addr: ep, Type: resolver.Backend})
continue
}
addrs = append(addrs, resolver.Address{Addr: u.Host, Type: resolver.Backend})
}
return addrs
}
var genN = new(uint32)
func genName() string {
now := time.Now().UnixNano()
return fmt.Sprintf("%X%X", now, atomic.AddUint32(genN, 1))
}

View File

@ -1,34 +0,0 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package balancer
import (
"reflect"
"testing"
"google.golang.org/grpc/resolver"
)
func Test_epsToAddrs(t *testing.T) {
eps := []string{"https://example.com:2379", "127.0.0.1:2379"}
exp := []resolver.Address{
{Addr: "example.com:2379", Type: resolver.Backend},
{Addr: "127.0.0.1:2379", Type: resolver.Backend},
}
rs := epsToAddrs(eps...)
if !reflect.DeepEqual(rs, exp) {
t.Fatalf("expected %v, got %v", exp, rs)
}
}

View File

@ -18,14 +18,11 @@ import (
"context"
"errors"
"fmt"
"net"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/google/uuid"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@ -33,10 +30,9 @@ import (
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
"go.etcd.io/etcd/clientv3/balancer"
"go.etcd.io/etcd/clientv3/balancer/picker"
"go.etcd.io/etcd/clientv3/balancer/resolver/endpoint"
"go.etcd.io/etcd/clientv3/credentials"
"go.etcd.io/etcd/clientv3/internal/endpoint"
"go.etcd.io/etcd/clientv3/internal/resolver"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.etcd.io/etcd/pkg/logutil"
)
@ -44,31 +40,8 @@ import (
var (
ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints")
ErrOldCluster = errors.New("etcdclient: old cluster version")
roundRobinBalancerName = fmt.Sprintf("etcd-%s", picker.RoundrobinBalanced.String())
)
func init() {
lg := zap.NewNop()
if os.Getenv("ETCD_CLIENT_DEBUG") != "" {
lcfg := logutil.DefaultZapLoggerConfig
lcfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
var err error
lg, err = lcfg.Build() // info level logging
if err != nil {
panic(err)
}
}
// TODO: support custom balancer
balancer.RegisterBuilder(balancer.Config{
Policy: picker.RoundrobinBalanced,
Name: roundRobinBalancerName,
Logger: lg,
})
}
// Client provides and manages an etcd v3 client session.
type Client struct {
Cluster
@ -80,10 +53,10 @@ type Client struct {
conn *grpc.ClientConn
cfg Config
creds grpccredentials.TransportCredentials
resolverGroup *endpoint.ResolverGroup
mu *sync.RWMutex
cfg Config
creds grpccredentials.TransportCredentials
resolver *resolver.EtcdManualResolver
mu *sync.RWMutex
ctx context.Context
cancel context.CancelFunc
@ -153,9 +126,6 @@ func (c *Client) Close() error {
if c.Lease != nil {
c.Lease.Close()
}
if c.resolverGroup != nil {
c.resolverGroup.Close()
}
if c.conn != nil {
return toErr(c.ctx, c.conn.Close())
}
@ -182,7 +152,8 @@ func (c *Client) SetEndpoints(eps ...string) {
c.mu.Lock()
defer c.mu.Unlock()
c.cfg.Endpoints = eps
c.resolverGroup.SetEndpoints(eps)
c.resolver.SetEndpoints(eps)
}
// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
@ -215,29 +186,12 @@ func (c *Client) autoSync() {
err := c.Sync(ctx)
cancel()
if err != nil && err != c.ctx.Err() {
lg.Lvl(4).Infof("Auto sync endpoints failed: %v", err)
c.lg.Info("Auto sync endpoints failed.", zap.Error(err))
}
}
}
}
func (c *Client) processCreds(scheme string) (creds grpccredentials.TransportCredentials) {
creds = c.creds
switch scheme {
case "unix":
case "http":
creds = nil
case "https", "unixs":
if creds != nil {
break
}
creds = credentials.NewBundle(credentials.Config{}).TransportCredentials()
default:
creds = nil
}
return creds
}
// dialSetupOpts gives the dial opts prior to any authentication.
func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (opts []grpc.DialOption, err error) {
if c.cfg.DialKeepAliveTime > 0 {
@ -250,18 +204,15 @@ func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts
}
opts = append(opts, dopts...)
dialer := endpoint.Dialer
if creds != nil {
opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
opts = append(opts, grpc.WithInsecure())
}
opts = append(opts, grpc.WithContextDialer(dialer))
// Interceptor retry and backoff.
// TODO: Replace all of clientv3/retry.go with interceptor based retry, or with
// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy
// once it is available.
// TODO: Replace all of clientv3/retry.go with RetryPolicy:
// https://github.com/grpc/grpc-proto/blob/cdd9ed5c3d3f87aef62f373b93361cf7bddc620d/grpc/service_config/service_config.proto#L130
rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction))
opts = append(opts,
// Disable stream retry by default since go-grpc-middleware/retry does not support client streams.
@ -275,15 +226,11 @@ func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts
// Dial connects to a single endpoint using the client's config.
func (c *Client) Dial(ep string) (*grpc.ClientConn, error) {
creds, err := c.directDialCreds(ep)
if err != nil {
return nil, err
}
// Use the grpc passthrough resolver to directly dial a single endpoint.
// This resolver passes through the 'unix' and 'unixs' endpoints schemes used
// by etcd without modification, allowing us to directly dial endpoints and
// using the same dial functions that we use for load balancer dialing.
return c.dial(fmt.Sprintf("passthrough:///%s", ep), creds)
creds := c.credentialsForEndpoint(ep)
// Using ad-hoc created resolver, to guarantee only explicitly given
// endpoint is used.
return c.dial(creds, grpc.WithResolvers(resolver.New(ep)))
}
func (c *Client) getToken(ctx context.Context) error {
@ -306,20 +253,18 @@ func (c *Client) getToken(ctx context.Context) error {
// dialWithBalancer dials the client's current load balanced resolver group. The scheme of the host
// of the provided endpoint determines the scheme used for all endpoints of the client connection.
func (c *Client) dialWithBalancer(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
_, host, _ := endpoint.ParseEndpoint(ep)
target := c.resolverGroup.Target(host)
creds := c.dialWithBalancerCreds(ep)
return c.dial(target, creds, dopts...)
func (c *Client) dialWithBalancer(dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
creds := c.credentialsForEndpoint(c.Endpoints()[0])
opts := append(dopts, grpc.WithResolvers(c.resolver))
return c.dial(creds, opts...)
}
// dial configures and dials any grpc balancer target.
func (c *Client) dial(target string, creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
opts, err := c.dialSetupOpts(creds, dopts...)
if err != nil {
return nil, fmt.Errorf("failed to configure dialer: %v", err)
}
if c.Username != "" && c.Password != "" {
c.authTokenBundle = credentials.NewBundle(credentials.Config{})
opts = append(opts, grpc.WithPerRPCCredentials(c.authTokenBundle.PerRPCCredentials()))
@ -334,6 +279,8 @@ func (c *Client) dial(target string, creds grpccredentials.TransportCredentials,
defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options?
}
initialEndpoints := strings.Join(c.cfg.Endpoints, ";")
target := fmt.Sprintf("%s://%p/#initially=[%s]", resolver.Schema, c, initialEndpoints)
conn, err := grpc.DialContext(dctx, target, opts...)
if err != nil {
return nil, err
@ -341,36 +288,21 @@ func (c *Client) dial(target string, creds grpccredentials.TransportCredentials,
return conn, nil
}
func (c *Client) directDialCreds(ep string) (grpccredentials.TransportCredentials, error) {
_, host, scheme := endpoint.ParseEndpoint(ep)
creds := c.creds
if len(scheme) != 0 {
creds = c.processCreds(scheme)
if creds != nil {
clone := creds.Clone()
// Set the server name must to the endpoint hostname without port since grpc
// otherwise attempts to check if x509 cert is valid for the full endpoint
// including the scheme and port, which fails.
overrideServerName, _, err := net.SplitHostPort(host)
if err != nil {
// Either the host didn't have a port or the host could not be parsed. Either way, continue with the
// original host string.
overrideServerName = host
}
clone.OverrideServerName(overrideServerName)
creds = clone
func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials {
r := endpoint.RequiresCredentials(ep)
switch r {
case endpoint.CREDS_DROP:
return nil
case endpoint.CREDS_OPTIONAL:
return c.creds
case endpoint.CREDS_REQUIRE:
if c.creds != nil {
return c.creds
}
return credentials.NewBundle(credentials.Config{}).TransportCredentials()
default:
panic(fmt.Errorf("Unsupported CredsRequirement: %v", r))
}
return creds, nil
}
func (c *Client) dialWithBalancerCreds(ep string) grpccredentials.TransportCredentials {
_, _, scheme := endpoint.ParseEndpoint(ep)
creds := c.creds
if len(scheme) != 0 {
creds = c.processCreds(scheme)
}
return creds
}
func newClient(cfg *Config) (*Client, error) {
@ -432,30 +364,21 @@ func newClient(cfg *Config) (*Client, error) {
client.callOpts = callOpts
}
// Prepare a 'endpoint://<unique-client-id>/' resolver for the client and create a endpoint target to pass
// to dial so the client knows to use this resolver.
client.resolverGroup, err = endpoint.NewResolverGroup(fmt.Sprintf("client-%s", uuid.New().String()))
if err != nil {
client.cancel()
return nil, err
}
client.resolverGroup.SetEndpoints(cfg.Endpoints)
client.resolver = resolver.New(cfg.Endpoints...)
if len(cfg.Endpoints) < 1 {
return nil, fmt.Errorf("at least one Endpoint must is required in client config")
}
dialEndpoint := cfg.Endpoints[0]
// Use a provided endpoint target so that for https:// without any tls config given, then
// grpc will assume the certificate server name is the endpoint host.
conn, err := client.dialWithBalancer(dialEndpoint, grpc.WithBalancerName(roundRobinBalancerName))
conn, err := client.dialWithBalancer()
if err != nil {
client.cancel()
client.resolverGroup.Close()
client.resolver.Close()
// TODO: Error like `fmt.Errorf(dialing [%s] failed: %v, strings.Join(cfg.Endpoints, ";"), err)` would help with debugging a lot.
return nil, err
}
// TODO: With the old grpc balancer interface, we waited until the dial timeout
// for the balancer to be ready. Is there an equivalent wait we should do with the new grpc balancer interface?
client.conn = conn
client.Cluster = NewCluster(client)
@ -474,6 +397,7 @@ func newClient(cfg *Config) (*Client, error) {
if err != nil {
client.Close()
cancel()
//TODO: Consider fmt.Errorf("communicating with [%s] failed: %v", strings.Join(cfg.Endpoints, ";"), err)
return nil, err
}
cancel()

View File

@ -85,6 +85,8 @@ func TestDialCancel(t *testing.T) {
func TestDialTimeout(t *testing.T) {
defer testutil.AfterTest(t)
wantError := context.DeadlineExceeded
// grpc.WithBlock to block until connection up or timeout
testCfgs := []Config{
{
@ -124,8 +126,8 @@ func TestDialTimeout(t *testing.T) {
case <-time.After(5 * time.Second):
t.Errorf("#%d: failed to timeout dial on time", i)
case err := <-donec:
if err != context.DeadlineExceeded {
t.Errorf("#%d: unexpected error %v, want %v", i, err, context.DeadlineExceeded)
if err.Error() != wantError.Error() {
t.Errorf("#%d: unexpected error '%v', want '%v'", i, err, wantError)
}
}
}

View File

@ -22,8 +22,9 @@ import (
"net"
"sync"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
grpccredentials "google.golang.org/grpc/credentials"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
)
// Config defines gRPC credential configuration.

View File

@ -0,0 +1,138 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package endpoint
import (
"fmt"
"net"
"net/url"
"path"
"strings"
)
type CredsRequirement int
const (
// CREDS_REQUIRE - Credentials/certificate required for thi type of connection.
CREDS_REQUIRE CredsRequirement = iota
// CREDS_DROP - Credentials/certificate not needed and should get ignored.
CREDS_DROP
// CREDS_OPTIONAL - Credentials/certificate might be used if supplied
CREDS_OPTIONAL
)
func extractHostFromHostPort(ep string) string {
host, _, err := net.SplitHostPort(ep)
if err != nil {
return ep
}
return host
}
func extractHostFromPath(pathStr string) string {
return extractHostFromHostPort(path.Base(pathStr))
}
// mustSplit2 returns the values from strings.SplitN(s, sep, 2).
// If sep is not found, it returns ("", "", false) instead.
func mustSplit2(s, sep string) (string, string) {
spl := strings.SplitN(s, sep, 2)
if len(spl) < 2 {
panic(fmt.Errorf("Token '%v' expected to have separator sep: `%v`", s, sep))
}
return spl[0], spl[1]
}
func schemeToCredsRequirement(schema string) CredsRequirement {
switch schema {
case "https", "unixs":
return CREDS_REQUIRE
case "http":
return CREDS_DROP
case "unix":
// Preserving previous behavior from:
// https://github.com/etcd-io/etcd/blob/dae29bb719dd69dc119146fc297a0628fcc1ccf8/client/v3/client.go#L212
// that likely was a bug due to missing 'fallthrough'.
// At the same time it seems legit to let the users decide whether they
// want credential control or not (and 'unixs' schema is not a standard thing).
return CREDS_OPTIONAL
case "":
return CREDS_OPTIONAL
default:
return CREDS_OPTIONAL
}
}
// This function translates endpoints names supported by etcd server into
// endpoints as supported by grpc with additional information
// (server_name for cert validation, requireCreds - whether certs are needed).
// The main differences:
// - etcd supports unixs & https names as opposed to unix & http to
// distinguish need to configure certificates.
// - etcd support http(s) names as opposed to tcp supported by grpc/dial method.
// - etcd supports unix(s)://local-file naming schema
// (as opposed to unix:local-file canonical name used by grpc for current dir files).
// - Within the unix(s) schemas, the last segment (filename) without 'port' (content after colon)
// is considered serverName - to allow local testing of cert-protected communication.
//
// See more:
// - https://github.com/grpc/grpc-go/blob/26c143bd5f59344a4b8a1e491e0f5e18aa97abc7/internal/grpcutil/target.go#L47
// - https://golang.org/pkg/net/#Dial
// - https://github.com/grpc/grpc/blob/master/doc/naming.md
func translateEndpoint(ep string) (addr string, serverName string, requireCreds CredsRequirement) {
if strings.HasPrefix(ep, "unix:") || strings.HasPrefix(ep, "unixs:") {
if strings.HasPrefix(ep, "unix:///") || strings.HasPrefix(ep, "unixs:///") {
// absolute path case
schema, absolutePath := mustSplit2(ep, "://")
return "unix://" + absolutePath, extractHostFromPath(absolutePath), schemeToCredsRequirement(schema)
}
if strings.HasPrefix(ep, "unix://") || strings.HasPrefix(ep, "unixs://") {
// legacy etcd local path
schema, localPath := mustSplit2(ep, "://")
return "unix:" + localPath, extractHostFromPath(localPath), schemeToCredsRequirement(schema)
}
schema, localPath := mustSplit2(ep, ":")
return "unix:" + localPath, extractHostFromPath(localPath), schemeToCredsRequirement(schema)
}
if strings.Contains(ep, "://") {
url, err := url.Parse(ep)
if err != nil {
return ep, extractHostFromHostPort(ep), CREDS_OPTIONAL
}
if url.Scheme == "http" || url.Scheme == "https" {
return url.Host, url.Hostname(), schemeToCredsRequirement(url.Scheme)
}
return ep, url.Hostname(), schemeToCredsRequirement(url.Scheme)
}
// Handles plain addresses like 10.0.0.44:437.
return ep, extractHostFromHostPort(ep), CREDS_OPTIONAL
}
// RequiresCredentials returns whether given endpoint requires
// credentials/certificates for connection.
func RequiresCredentials(ep string) CredsRequirement {
_, _, requireCreds := translateEndpoint(ep)
return requireCreds
}
// Interpret endpoint parses an endpoint of the form
// (http|https)://<host>*|(unix|unixs)://<path>)
// and returns low-level address (supported by 'net') to connect to,
// and a server name used for x509 certificate matching.
func Interpret(ep string) (address string, serverName string) {
addr, serverName, _ := translateEndpoint(ep)
return addr, serverName
}

View File

@ -0,0 +1,99 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package endpoint
import (
"testing"
)
func Test_interpret(t *testing.T) {
tests := []struct {
endpoint string
wantAddress string
wantServerName string
wantRequiresCreds CredsRequirement
}{
{"127.0.0.1", "127.0.0.1", "127.0.0.1", CREDS_OPTIONAL},
{"localhost", "localhost", "localhost", CREDS_OPTIONAL},
{"localhost:8080", "localhost:8080", "localhost", CREDS_OPTIONAL},
{"unix:127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_OPTIONAL},
{"unix:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1", CREDS_OPTIONAL},
{"unix://127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_OPTIONAL},
{"unix://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1", CREDS_OPTIONAL},
{"unixs:127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_REQUIRE},
{"unixs:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1", CREDS_REQUIRE},
{"unixs://127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_REQUIRE},
{"unixs://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1", CREDS_REQUIRE},
{"http://127.0.0.1", "127.0.0.1", "127.0.0.1", CREDS_DROP},
{"http://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1", CREDS_DROP},
{"https://127.0.0.1", "127.0.0.1", "127.0.0.1", CREDS_REQUIRE},
{"https://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1", CREDS_REQUIRE},
{"https://localhost:20000", "localhost:20000", "localhost", CREDS_REQUIRE},
{"unix:///tmp/abc", "unix:///tmp/abc", "abc", CREDS_OPTIONAL},
{"unixs:///tmp/abc", "unix:///tmp/abc", "abc", CREDS_REQUIRE},
{"unix:///tmp/abc:1234", "unix:///tmp/abc:1234", "abc", CREDS_OPTIONAL},
{"unixs:///tmp/abc:1234", "unix:///tmp/abc:1234", "abc", CREDS_REQUIRE},
{"etcd.io", "etcd.io", "etcd.io", CREDS_OPTIONAL},
{"http://etcd.io/abc", "etcd.io", "etcd.io", CREDS_DROP},
{"dns://something-other", "dns://something-other", "something-other", CREDS_OPTIONAL},
{"http://[2001:db8:1f70::999:de8:7648:6e8]:100/", "[2001:db8:1f70::999:de8:7648:6e8]:100", "2001:db8:1f70::999:de8:7648:6e8", CREDS_DROP},
{"[2001:db8:1f70::999:de8:7648:6e8]:100", "[2001:db8:1f70::999:de8:7648:6e8]:100", "2001:db8:1f70::999:de8:7648:6e8", CREDS_OPTIONAL},
{"unix:unexpected-file_name#123$456", "unix:unexpected-file_name#123$456", "unexpected-file_name#123$456", CREDS_OPTIONAL},
}
for _, tt := range tests {
t.Run("Interpret_"+tt.endpoint, func(t *testing.T) {
gotAddress, gotServerName := Interpret(tt.endpoint)
if gotAddress != tt.wantAddress {
t.Errorf("Interpret() gotAddress = %v, want %v", gotAddress, tt.wantAddress)
}
if gotServerName != tt.wantServerName {
t.Errorf("Interpret() gotServerName = %v, want %v", gotServerName, tt.wantServerName)
}
})
t.Run("RequiresCredentials_"+tt.endpoint, func(t *testing.T) {
requiresCreds := RequiresCredentials(tt.endpoint)
if requiresCreds != tt.wantRequiresCreds {
t.Errorf("RequiresCredentials() got = %v, want %v", requiresCreds, tt.wantRequiresCreds)
}
})
}
}
func Test_extractHostFromHostPort(t *testing.T) {
tests := []struct {
ep string
want string
}{
{ep: "localhost", want: "localhost"},
{ep: "localhost:8080", want: "localhost"},
{ep: "192.158.7.14:8080", want: "192.158.7.14"},
{ep: "192.158.7.14:8080", want: "192.158.7.14"},
{ep: "[2001:db8:1f70::999:de8:7648:6e8]", want: "[2001:db8:1f70::999:de8:7648:6e8]"},
{ep: "[2001:db8:1f70::999:de8:7648:6e8]:100", want: "2001:db8:1f70::999:de8:7648:6e8"},
}
for _, tt := range tests {
t.Run(tt.ep, func(t *testing.T) {
if got := extractHostFromHostPort(tt.ep); got != tt.want {
t.Errorf("extractHostFromHostPort() = %v, want %v", got, tt.want)
}
})
}
}

View File

@ -0,0 +1,75 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package resolver
import (
"go.etcd.io/etcd/clientv3/internal/endpoint"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
)
const (
Schema = "etcd-endpoints"
)
// EtcdManualResolver is a Resolver (and resolver.Builder) that can be updated
// using SetEndpoints.
type EtcdManualResolver struct {
*manual.Resolver
endpoints []string
serviceConfig *serviceconfig.ParseResult
}
func New(endpoints ...string) *EtcdManualResolver {
r := manual.NewBuilderWithScheme(Schema)
return &EtcdManualResolver{Resolver: r, endpoints: endpoints, serviceConfig: nil}
}
// Build returns itself for Resolver, because it's both a builder and a resolver.
func (r *EtcdManualResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
r.serviceConfig = cc.ParseServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
if r.serviceConfig.Err != nil {
return nil, r.serviceConfig.Err
}
res, err := r.Resolver.Build(target, cc, opts)
if err != nil {
return nil, err
}
// Populates endpoints stored in r into ClientConn (cc).
r.updateState()
return res, nil
}
func (r *EtcdManualResolver) SetEndpoints(endpoints []string) {
r.endpoints = endpoints
r.updateState()
}
func (r EtcdManualResolver) updateState() {
if r.CC != nil {
addresses := make([]resolver.Address, len(r.endpoints))
for i, ep := range r.endpoints {
addr, serverName := endpoint.Interpret(ep)
addresses[i] = resolver.Address{Addr: addr, ServerName: serverName}
}
state := resolver.State{
Addresses: addresses,
ServiceConfig: r.serviceConfig,
}
r.UpdateState(state)
}
}

1
go.mod
View File

@ -15,7 +15,6 @@ require (
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903
github.com/golang/protobuf v1.4.3
github.com/google/btree v1.0.0
github.com/google/uuid v1.0.0
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway v1.11.0

2
go.sum
View File

@ -76,8 +76,6 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 h1:z53tR0945TRRQO/fLEVPI6SMv7ZflF0TEaTAoU7tOzg=