diff --git a/clientv3/balancer/balancer.go b/clientv3/balancer/balancer.go new file mode 100644 index 000000000..1bb324028 --- /dev/null +++ b/clientv3/balancer/balancer.go @@ -0,0 +1,275 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package balancer + +import ( + "fmt" + "sync" + + "github.com/coreos/etcd/clientv3/balancer/picker" + + "go.uber.org/zap" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/resolver" + _ "google.golang.org/grpc/resolver/dns" // register DNS resolver + _ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver +) + +// Balancer defines client balancer interface. +type Balancer interface { + // Builder is called at the beginning to initialize sub-connection states and picker. + balancer.Builder + + // Balancer is called on specified client connection. Client initiates gRPC + // connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved + // addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs". + // For each resolved address, balancer calls "balancer.ClientConn.NewSubConn". + // "grpc/balancer.Balancer.HandleSubConnStateChange" is called when connectivity state + // changes, thus requires failover logic in this method. + balancer.Balancer + + // Picker calls "Pick" for every client request. + picker.Picker + + // SetEndpoints updates client's endpoints. + SetEndpoints(eps ...string) +} + +type baseBalancer struct { + policy picker.Policy + name string + lg *zap.Logger + + mu sync.RWMutex + + eps []string + + addrToSc map[resolver.Address]balancer.SubConn + scToAddr map[balancer.SubConn]resolver.Address + scToSt map[balancer.SubConn]connectivity.State + + currrentConn balancer.ClientConn + currentState connectivity.State + csEvltr *connectivityStateEvaluator + + picker.Picker +} + +// New returns a new balancer from specified picker policy. +func New(cfg Config) Balancer { + bb := &baseBalancer{ + policy: cfg.Policy, + name: cfg.Policy.String(), + lg: cfg.Logger, + + eps: cfg.Endpoints, + + addrToSc: make(map[resolver.Address]balancer.SubConn), + scToAddr: make(map[balancer.SubConn]resolver.Address), + scToSt: make(map[balancer.SubConn]connectivity.State), + + currrentConn: nil, + csEvltr: &connectivityStateEvaluator{}, + + // initialize picker always returns "ErrNoSubConnAvailable" + Picker: picker.NewErr(balancer.ErrNoSubConnAvailable), + } + if cfg.Name != "" { + bb.name = cfg.Name + } + if bb.lg == nil { + bb.lg = zap.NewNop() + } + + balancer.Register(bb) + bb.lg.Info( + "registered balancer", + zap.String("policy", bb.policy.String()), + zap.String("name", bb.name), + ) + return bb +} + +// Name implements "grpc/balancer.Builder" interface. +func (bb *baseBalancer) Name() string { return bb.name } + +// Build implements "grpc/balancer.Builder" interface. +// Build is called initially when creating "ccBalancerWrapper". +// "grpc.Dial" is called to this client connection. +// Then, resolved addreses will be handled via "HandleResolvedAddrs". +func (bb *baseBalancer) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { + // TODO: support multiple connections + bb.mu.Lock() + bb.currrentConn = cc + bb.mu.Unlock() + + bb.lg.Info( + "built balancer", + zap.String("policy", bb.policy.String()), + zap.String("resolver-target", cc.Target()), + ) + return bb +} + +// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface. +// gRPC sends initial or updated resolved addresses from "Build". +func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { + if err != nil { + bb.lg.Warn("HandleResolvedAddrs called with error", zap.Error(err)) + return + } + bb.lg.Info("resolved", zap.Strings("addresses", addrsToStrings(addrs))) + + bb.mu.Lock() + defer bb.mu.Unlock() + + resolved := make(map[resolver.Address]struct{}) + for _, addr := range addrs { + resolved[addr] = struct{}{} + if _, ok := bb.addrToSc[addr]; !ok { + sc, err := bb.currrentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{}) + if err != nil { + bb.lg.Warn("NewSubConn failed", zap.Error(err), zap.String("address", addr.Addr)) + continue + } + bb.addrToSc[addr] = sc + bb.scToAddr[sc] = addr + bb.scToSt[sc] = connectivity.Idle + sc.Connect() + } + } + + for addr, sc := range bb.addrToSc { + if _, ok := resolved[addr]; !ok { + // was removed by resolver or failed to create subconn + bb.currrentConn.RemoveSubConn(sc) + delete(bb.addrToSc, addr) + + bb.lg.Info( + "removed subconn", + zap.String("address", addr.Addr), + zap.String("subconn", scToString(sc)), + ) + + // Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown. + // The entry will be deleted in HandleSubConnStateChange. + // (DO NOT) delete(bb.scToAddr, sc) + // (DO NOT) delete(bb.scToSt, sc) + } + } +} + +// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface. +func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { + bb.mu.Lock() + defer bb.mu.Unlock() + + old, ok := bb.scToSt[sc] + if !ok { + bb.lg.Warn( + "state change for an unknown subconn", + zap.String("subconn", scToString(sc)), + zap.String("state", s.String()), + ) + return + } + + bb.lg.Info( + "state changed", + zap.Bool("connected", s == connectivity.Ready), + zap.String("subconn", scToString(sc)), + zap.String("address", bb.scToAddr[sc].Addr), + zap.String("old-state", old.String()), + zap.String("new-state", s.String()), + ) + + bb.scToSt[sc] = s + switch s { + case connectivity.Idle: + sc.Connect() + case connectivity.Shutdown: + // When an address was removed by resolver, b called RemoveSubConn but + // kept the sc's state in scToSt. Remove state for this sc here. + delete(bb.scToAddr, sc) + delete(bb.scToSt, sc) + } + + oldAggrState := bb.currentState + bb.currentState = bb.csEvltr.recordTransition(old, s) + + // Regenerate picker when one of the following happens: + // - this sc became ready from not-ready + // - this sc became not-ready from ready + // - the aggregated state of balancer became TransientFailure from non-TransientFailure + // - the aggregated state of balancer became non-TransientFailure from TransientFailure + if (s == connectivity.Ready) != (old == connectivity.Ready) || + (bb.currentState == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) { + bb.regeneratePicker() + } + + bb.currrentConn.UpdateBalancerState(bb.currentState, bb.Picker) + return +} + +func (bb *baseBalancer) regeneratePicker() { + if bb.currentState == connectivity.TransientFailure { + bb.Picker = picker.NewErr(balancer.ErrTransientFailure) + return + } + + // only pass ready subconns to picker + scs := make([]balancer.SubConn, 0) + addrToSc := make(map[resolver.Address]balancer.SubConn) + scToAddr := make(map[balancer.SubConn]resolver.Address) + for addr, sc := range bb.addrToSc { + if st, ok := bb.scToSt[sc]; ok && st == connectivity.Ready { + scs = append(scs, sc) + addrToSc[addr] = sc + scToAddr[sc] = addr + } + } + + switch bb.policy { + case picker.RoundrobinBalanced: + bb.Picker = picker.NewRoundrobinBalanced(bb.lg, scs, addrToSc, scToAddr) + + default: + panic(fmt.Errorf("invalid balancer picker policy (%d)", bb.policy)) + } + + bb.lg.Info( + "generated picker", + zap.String("policy", bb.policy.String()), + zap.Strings("subconn-ready", scsToStrings(addrToSc)), + zap.Int("subconn-size", len(addrToSc)), + ) +} + +// SetEndpoints updates client's endpoints. +// TODO: implement this +func (bb *baseBalancer) SetEndpoints(eps ...string) { + addrs := epsToAddrs(eps...) + bb.mu.Lock() + bb.Picker.UpdateAddrs(addrs) + bb.mu.Unlock() +} + +// Close implements "grpc/balancer.Balancer" interface. +// Close is a nop because base balancer doesn't have internal state to clean up, +// and it doesn't need to call RemoveSubConn for the SubConns. +func (bb *baseBalancer) Close() { + // TODO +} diff --git a/clientv3/balancer/config.go b/clientv3/balancer/config.go new file mode 100644 index 000000000..5c649e220 --- /dev/null +++ b/clientv3/balancer/config.go @@ -0,0 +1,39 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package balancer + +import ( + "github.com/coreos/etcd/clientv3/balancer/picker" + + "go.uber.org/zap" +) + +// Config defines balancer configurations. +type Config struct { + // Policy configures balancer policy. + Policy picker.Policy + + // Name defines an additional name for balancer. + // Useful for balancer testing to avoid register conflicts. + // If empty, defaults to policy name. + Name string + + // Logger configures balancer logging. + // If nil, logs are discarded. + Logger *zap.Logger + + // Endpoints is a list of server endpoints. + Endpoints []string +} diff --git a/clientv3/balancer/connectivity.go b/clientv3/balancer/connectivity.go new file mode 100644 index 000000000..6cdeb3fa3 --- /dev/null +++ b/clientv3/balancer/connectivity.go @@ -0,0 +1,58 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package balancer + +import "google.golang.org/grpc/connectivity" + +// connectivityStateEvaluator gets updated by addrConns when their +// states transition, based on which it evaluates the state of +// ClientConn. +type connectivityStateEvaluator struct { + numReady uint64 // Number of addrConns in ready state. + numConnecting uint64 // Number of addrConns in connecting state. + numTransientFailure uint64 // Number of addrConns in transientFailure. +} + +// recordTransition records state change happening in every subConn and based on +// that it evaluates what aggregated state should be. +// It can only transition between Ready, Connecting and TransientFailure. Other states, +// Idle and Shutdown are transitioned into by ClientConn; in the beginning of the connection +// before any subConn is created ClientConn is in idle state. In the end when ClientConn +// closes it is in Shutdown state. +// +// recordTransition should only be called synchronously from the same goroutine. +func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) connectivity.State { + // Update counters. + for idx, state := range []connectivity.State{oldState, newState} { + updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new. + switch state { + case connectivity.Ready: + cse.numReady += updateVal + case connectivity.Connecting: + cse.numConnecting += updateVal + case connectivity.TransientFailure: + cse.numTransientFailure += updateVal + } + } + + // Evaluate. + if cse.numReady > 0 { + return connectivity.Ready + } + if cse.numConnecting > 0 { + return connectivity.Connecting + } + return connectivity.TransientFailure +} diff --git a/clientv3/balancer/grpc1.7-health_test.go b/clientv3/balancer/grpc1.7-health_test.go index bf139a5b1..1671f5338 100644 --- a/clientv3/balancer/grpc1.7-health_test.go +++ b/clientv3/balancer/grpc1.7-health_test.go @@ -30,7 +30,7 @@ import ( var endpoints = []string{"localhost:2379", "localhost:22379", "localhost:32379"} -func TestBalancerGetUnblocking(t *testing.T) { +func TestOldHealthBalancerGetUnblocking(t *testing.T) { hb := NewGRPC17Health(endpoints, minHealthRetryDuration, func(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) { return nil, nil }) defer hb.Close() if addrs := <-hb.Notify(); len(addrs) != len(endpoints) { @@ -74,7 +74,7 @@ func TestBalancerGetUnblocking(t *testing.T) { } } -func TestBalancerGetBlocking(t *testing.T) { +func TestOldHealthBalancerGetBlocking(t *testing.T) { hb := NewGRPC17Health(endpoints, minHealthRetryDuration, func(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) { return nil, nil }) defer hb.Close() if addrs := <-hb.Notify(); len(addrs) != len(endpoints) { @@ -131,9 +131,9 @@ func TestBalancerGetBlocking(t *testing.T) { } } -// TestHealthBalancerGraylist checks one endpoint is tried after the other +// TestOldHealthBalancerGraylist checks one endpoint is tried after the other // due to gray listing. -func TestHealthBalancerGraylist(t *testing.T) { +func TestOldHealthBalancerGraylist(t *testing.T) { var wg sync.WaitGroup // Use 3 endpoints so gray list doesn't fallback to all connections // after failing on 2 endpoints. @@ -192,7 +192,7 @@ func TestHealthBalancerGraylist(t *testing.T) { // TestBalancerDoNotBlockOnClose ensures that balancer and grpc don't deadlock each other // due to rapid open/close conn. The deadlock causes balancer.Close() to block forever. // See issue: https://github.com/coreos/etcd/issues/7283 for more detail. -func TestBalancerDoNotBlockOnClose(t *testing.T) { +func TestOldHealthBalancerDoNotBlockOnClose(t *testing.T) { defer testutil.AfterTest(t) kcl := newKillConnListener(t, 3) diff --git a/clientv3/balancer/picker/doc.go b/clientv3/balancer/picker/doc.go new file mode 100644 index 000000000..35dabf553 --- /dev/null +++ b/clientv3/balancer/picker/doc.go @@ -0,0 +1,16 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package picker defines/implements client balancer picker policy. +package picker diff --git a/clientv3/balancer/picker/err.go b/clientv3/balancer/picker/err.go new file mode 100644 index 000000000..281f453fc --- /dev/null +++ b/clientv3/balancer/picker/err.go @@ -0,0 +1,39 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package picker + +import ( + "context" + + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/resolver" +) + +// NewErr returns a picker that always returns err on "Pick". +func NewErr(err error) Picker { + return &errPicker{err: err} +} + +type errPicker struct { + err error +} + +func (p *errPicker) Pick(context.Context, balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) { + return nil, nil, p.err +} + +func (p *errPicker) UpdateAddrs(addrs []resolver.Address) { + return +} diff --git a/clientv3/balancer/picker/picker.go b/clientv3/balancer/picker/picker.go new file mode 100644 index 000000000..93412e2af --- /dev/null +++ b/clientv3/balancer/picker/picker.go @@ -0,0 +1,31 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package picker + +import ( + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/resolver" +) + +// Picker defines balancer Picker methods. +type Picker interface { + balancer.Picker + + // UpdateAddrs updates current endpoints in picker. + // Used when endpoints are updated manually. + // TODO: handle resolver target change + // TODO: handle resolved addresses change + UpdateAddrs(addrs []resolver.Address) +} diff --git a/clientv3/balancer/picker/picker_policy.go b/clientv3/balancer/picker/picker_policy.go new file mode 100644 index 000000000..463ddc2a5 --- /dev/null +++ b/clientv3/balancer/picker/picker_policy.go @@ -0,0 +1,49 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package picker + +import "fmt" + +// Policy defines balancer picker policy. +type Policy uint8 + +const ( + // TODO: custom picker is not supported yet. + // custom defines custom balancer picker. + custom Policy = iota + + // RoundrobinBalanced balance loads over multiple endpoints + // and implements failover in roundrobin fashion. + RoundrobinBalanced Policy = iota + + // TODO: only send loads to pinned address "RoundrobinFailover" + // just like how 3.3 client works + // + // TODO: priotize leader + // TODO: health-check + // TODO: weighted roundrobin + // TODO: power of two random choice +) + +func (p Policy) String() string { + switch p { + case custom: + panic("'custom' picker policy is not supported yet") + case RoundrobinBalanced: + return "etcd-client-roundrobin-balanced" + default: + panic(fmt.Errorf("invalid balancer picker policy (%d)", p)) + } +} diff --git a/clientv3/balancer/picker/roundrobin_balanced.go b/clientv3/balancer/picker/roundrobin_balanced.go new file mode 100644 index 000000000..9175562a2 --- /dev/null +++ b/clientv3/balancer/picker/roundrobin_balanced.go @@ -0,0 +1,105 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package picker + +import ( + "context" + "sync" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/resolver" +) + +// NewRoundrobinBalanced returns a new roundrobin balanced picker. +func NewRoundrobinBalanced( + lg *zap.Logger, + scs []balancer.SubConn, + addrToSc map[resolver.Address]balancer.SubConn, + scToAddr map[balancer.SubConn]resolver.Address, +) Picker { + return &rrBalanced{ + lg: lg, + scs: scs, + addrToSc: addrToSc, + scToAddr: scToAddr, + } +} + +type rrBalanced struct { + lg *zap.Logger + + mu sync.RWMutex + next int + scs []balancer.SubConn + + addrToSc map[resolver.Address]balancer.SubConn + scToAddr map[balancer.SubConn]resolver.Address + + updateAddrs func(addrs []resolver.Address) +} + +// Pick is called for every client request. +func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) { + rb.mu.RLock() + n := len(rb.scs) + rb.mu.RUnlock() + if n == 0 { + return nil, nil, balancer.ErrNoSubConnAvailable + } + + rb.mu.Lock() + cur := rb.next + sc := rb.scs[cur] + picked := rb.scToAddr[sc].Addr + rb.next = (rb.next + 1) % len(rb.scs) + rb.mu.Unlock() + + rb.lg.Debug( + "picked", + zap.String("address", picked), + zap.Int("subconn-index", cur), + zap.Int("subconn-size", n), + ) + + doneFunc := func(info balancer.DoneInfo) { + // TODO: error handling? + fss := []zapcore.Field{ + zap.Error(info.Err), + zap.String("address", picked), + zap.Bool("success", info.Err == nil), + zap.Bool("bytes-sent", info.BytesSent), + zap.Bool("bytes-received", info.BytesReceived), + } + if info.Err == nil { + rb.lg.Debug("balancer done", fss...) + } else { + rb.lg.Warn("balancer failed", fss...) + } + } + return sc, doneFunc, nil +} + +// UpdateAddrs +// TODO: implement this +func (rb *rrBalanced) UpdateAddrs(addrs []resolver.Address) { + rb.mu.Lock() + // close all resolved sub-connections first + for _, sc := range rb.scs { + sc.UpdateAddresses([]resolver.Address{}) + } + rb.mu.Unlock() +} diff --git a/clientv3/balancer/utils.go b/clientv3/balancer/utils.go new file mode 100644 index 000000000..e54d203c1 --- /dev/null +++ b/clientv3/balancer/utils.go @@ -0,0 +1,45 @@ +package balancer + +import ( + "fmt" + "net/url" + "sort" + + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/resolver" +) + +func scToString(sc balancer.SubConn) string { + return fmt.Sprintf("%p", sc) +} + +func scsToStrings(scs map[resolver.Address]balancer.SubConn) (ss []string) { + ss = make([]string, 0, len(scs)) + for a, sc := range scs { + ss = append(ss, fmt.Sprintf("%s (%s)", a.Addr, scToString(sc))) + } + sort.Strings(ss) + return ss +} + +func addrsToStrings(addrs []resolver.Address) (ss []string) { + ss = make([]string, len(addrs)) + for i := range addrs { + ss[i] = addrs[i].Addr + } + sort.Strings(ss) + return ss +} + +func epsToAddrs(eps ...string) (addrs []resolver.Address) { + addrs = make([]resolver.Address, 0, len(eps)) + for _, ep := range eps { + u, err := url.Parse(ep) + if err != nil { + addrs = append(addrs, resolver.Address{Addr: ep, Type: resolver.Backend}) + continue + } + addrs = append(addrs, resolver.Address{Addr: u.Host, Type: resolver.Backend}) + } + return addrs +} diff --git a/clientv3/balancer/utils_test.go b/clientv3/balancer/utils_test.go new file mode 100644 index 000000000..0ef69c864 --- /dev/null +++ b/clientv3/balancer/utils_test.go @@ -0,0 +1,20 @@ +package balancer + +import ( + "reflect" + "testing" + + "google.golang.org/grpc/resolver" +) + +func Test_epsToAddrs(t *testing.T) { + eps := []string{"https://example.com:2379", "127.0.0.1:2379"} + exp := []resolver.Address{ + {Addr: "example.com:2379", Type: resolver.Backend}, + {Addr: "127.0.0.1:2379", Type: resolver.Backend}, + } + rs := epsToAddrs(eps...) + if !reflect.DeepEqual(rs, exp) { + t.Fatalf("expected %v, got %v", exp, rs) + } +}