If the balancer's update notification loop starts with a downed connection and the endpoints are switched while the old connection is up, the balancer can wait forever for an up connection without refreshing the connections to reflect the current endpoints. Instead, fetch upc and downc together, handling only a single transition, either down->up or up->down, per iteration. A simple way to reproduce the failures: add time.Sleep(time.Second) to the beginning of the update notification loop.
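A minimal sketch of that reproduction patch (test-only, not part of the fix; it assumes a "time" import is added):

	func (b *simpleBalancer) updateNotifyLoop() {
		defer close(b.donec)
		// repro aid: widen the race window so endpoints can be switched
		// before the loop takes its first upc/downc snapshot
		time.Sleep(time.Second)
		for {
			// ... loop body unchanged ...
		}
	}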
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientv3

import (
	"net/url"
	"strings"
	"sync"

	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
)

// ErrNoAddrAvilable is returned by Get() when the balancer does not have
// any active connection to endpoints at the time.
// This error is returned only when opts.BlockingWait is true.
var ErrNoAddrAvilable = grpc.Errorf(codes.Unavailable, "there is no address available")

// simpleBalancer does the bare minimum to expose multiple eps
// to the grpc reconnection code path
type simpleBalancer struct {
	// addrs are the client's endpoints for grpc
	addrs []grpc.Address
	// notifyCh notifies grpc of the set of addresses for connecting
	notifyCh chan []grpc.Address

	// readyc closes once the first connection is up
	readyc    chan struct{}
	readyOnce sync.Once

	// mu protects addrs, upc, downc, pinAddr, host2ep, and closed
	mu sync.RWMutex

	// upc closes when pinAddr transitions from empty to non-empty or the balancer closes.
	upc chan struct{}

	// downc closes when grpc calls down() on pinAddr
	downc chan struct{}

	// stopc is closed to signal updateNotifyLoop should stop.
	stopc chan struct{}

	// donec closes when all goroutines have exited
	donec chan struct{}

	// updateAddrsC notifies updateNotifyLoop to update addrs.
	updateAddrsC chan struct{}

	// grpc issues TLS cert checks using the string passed into dial so
	// that string must be the host. To recover the full scheme://host URL,
	// have a map from hosts to the original endpoint.
	host2ep map[string]string

	// pinAddr is the currently pinned address; set to the empty string on
	// initialization and shutdown.
	pinAddr string

	closed bool
}

func newSimpleBalancer(eps []string) *simpleBalancer {
	notifyCh := make(chan []grpc.Address, 1)
	addrs := make([]grpc.Address, len(eps))
	for i := range eps {
		addrs[i].Addr = getHost(eps[i])
	}
	sb := &simpleBalancer{
		addrs:        addrs,
		notifyCh:     notifyCh,
		readyc:       make(chan struct{}),
		upc:          make(chan struct{}),
		stopc:        make(chan struct{}),
		downc:        make(chan struct{}),
		donec:        make(chan struct{}),
		updateAddrsC: make(chan struct{}, 1),
		host2ep:      getHost2ep(eps),
	}
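	// start in the "down" state: nothing is pinned yet, so closing downc
	// lets updateNotifyLoop's first iteration advertise all addresses
	// rather than wait for a connection that does not exist yet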
	close(sb.downc)
	go sb.updateNotifyLoop()
	return sb
}

func (b *simpleBalancer) Start(target string, config grpc.BalancerConfig) error { return nil }

func (b *simpleBalancer) ConnectNotify() <-chan struct{} {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.upc
}

func (b *simpleBalancer) getEndpoint(host string) string {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.host2ep[host]
}

func getHost2ep(eps []string) map[string]string {
	hm := make(map[string]string, len(eps))
	for i := range eps {
		_, host, _ := parseEndpoint(eps[i])
		hm[host] = eps[i]
	}
	return hm
}

func (b *simpleBalancer) updateAddrs(eps []string) {
	np := getHost2ep(eps)

	b.mu.Lock()

	match := len(np) == len(b.host2ep)
	for k, v := range np {
		if b.host2ep[k] != v {
			match = false
			break
		}
	}
	if match {
		// same endpoints, so no need to update address
		b.mu.Unlock()
		return
	}

	b.host2ep = np

	addrs := make([]grpc.Address, 0, len(eps))
	for i := range eps {
		addrs = append(addrs, grpc.Address{Addr: getHost(eps[i])})
	}
	b.addrs = addrs

	// updating notifyCh can trigger new connections,
	// only update addrs if all connections are down
	// or addrs does not include pinAddr.
	update := !hasAddr(addrs, b.pinAddr)
	b.mu.Unlock()

	if update {
		select {
		case b.updateAddrsC <- struct{}{}:
		case <-b.stopc:
		}
	}
}

func hasAddr(addrs []grpc.Address, targetAddr string) bool {
	for _, addr := range addrs {
		if targetAddr == addr.Addr {
			return true
		}
	}
	return false
}

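// updateNotifyLoop drives notifyCh. Each iteration snapshots upc, downc,
// and pinAddr together under the read lock and reacts to a single
// transition, either down->up or up->down, so a stale pairing of the two
// channels cannot leave the loop waiting forever on the wrong channel.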
func (b *simpleBalancer) updateNotifyLoop() {
	defer close(b.donec)

	for {
		b.mu.RLock()
		upc, downc, addr := b.upc, b.downc, b.pinAddr
		b.mu.RUnlock()
		// downc or upc should be closed
		select {
		case <-downc:
			downc = nil
		default:
		}
		select {
		case <-upc:
			upc = nil
		default:
		}
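		// after the drains above, a nil channel marks a closed one:
		// both nil means the snapshot is stale; only downc nil means the
		// balancer is down; only upc nil means an address is pinned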
		switch {
		case downc == nil && upc == nil:
			// stale
			select {
			case <-b.stopc:
				return
			default:
			}
		case downc == nil:
			b.notifyAddrs()
			select {
			case <-upc:
			case <-b.updateAddrsC:
				b.notifyAddrs()
			case <-b.stopc:
				return
			}
		case upc == nil:
			select {
			// close connections that are not the pinned address
			case b.notifyCh <- []grpc.Address{{Addr: addr}}:
			case <-downc:
			case <-b.stopc:
				return
			}
			select {
			case <-downc:
			case <-b.updateAddrsC:
			case <-b.stopc:
				return
			}
			b.notifyAddrs()
		}
	}
}

func (b *simpleBalancer) notifyAddrs() {
	b.mu.RLock()
	addrs := b.addrs
	b.mu.RUnlock()
	select {
	case b.notifyCh <- addrs:
	case <-b.stopc:
	}
}

func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
	b.mu.Lock()
	defer b.mu.Unlock()

	// gRPC might call Up after it called Close. We add this check
	// to "fix" it up at the application layer. Otherwise our
	// simpleBalancer might panic since b.upc is closed.
	if b.closed {
		return func(err error) {}
	}
	// gRPC might call Up on a stale address.
	// Prevent updating pinAddr with a stale address.
	if !hasAddr(b.addrs, addr.Addr) {
		return func(err error) {}
	}
	if b.pinAddr != "" {
		return func(err error) {}
	}
	// notify waiting Get()s and pin first connected address
	close(b.upc)
	b.downc = make(chan struct{})
	b.pinAddr = addr.Addr
	// notify client that a connection is up
	b.readyOnce.Do(func() { close(b.readyc) })
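	// the returned closure is the down() callback: gRPC invokes it once
	// when the connection to the pinned address is lost, flipping the
	// balancer back to the down state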
	return func(err error) {
		b.mu.Lock()
		b.upc = make(chan struct{})
		close(b.downc)
		b.pinAddr = ""
		b.mu.Unlock()
	}
}

func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) {
	var (
		addr   string
		closed bool
	)

	// If opts.BlockingWait is false (for fail-fast RPCs), it should return
	// an address it has notified via Notify immediately instead of blocking.
	if !opts.BlockingWait {
		b.mu.RLock()
		closed = b.closed
		addr = b.pinAddr
		b.mu.RUnlock()
		if closed {
			return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
		}
		if addr == "" {
			return grpc.Address{Addr: ""}, nil, ErrNoAddrAvilable
		}
		return grpc.Address{Addr: addr}, func() {}, nil
	}

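	// blocking wait: poll the current upc until an address is pinned,
	// the balancer closes, or the caller's context is canceled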
	for {
		b.mu.RLock()
		ch := b.upc
		b.mu.RUnlock()
		select {
		case <-ch:
		case <-b.donec:
			return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
		case <-ctx.Done():
			return grpc.Address{Addr: ""}, nil, ctx.Err()
		}
		b.mu.RLock()
		closed = b.closed
		addr = b.pinAddr
		b.mu.RUnlock()
		// Close(), which sets b.closed = true, can be called before Get();
		// Get() must exit if the balancer is closed.
		if closed {
			return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
		}
		if addr != "" {
			break
		}
	}
	return grpc.Address{Addr: addr}, func() {}, nil
}

func (b *simpleBalancer) Notify() <-chan []grpc.Address { return b.notifyCh }

func (b *simpleBalancer) Close() error {
	b.mu.Lock()
	// In case gRPC calls close twice. TODO: remove the check
	// when we are sure that gRPC won't call close twice.
	if b.closed {
		b.mu.Unlock()
		<-b.donec
		return nil
	}
	b.closed = true
	close(b.stopc)
	b.pinAddr = ""

	// In the following scenario:
	// 1. upc is not closed; no pinned address
	// 2. client issues an RPC, calling invoke(), which calls Get(), enters the for loop, blocks
	// 3. clientconn.Close() calls balancer.Close(); closed = true
	// 4. for loop in Get() never exits since ctx is the context passed in by the client and may not be canceled
	// we must close upc so Get() exits from blocking on upc
	select {
	case <-b.upc:
	default:
		// terminate all waiting Get()s
		close(b.upc)
	}

	b.mu.Unlock()

	// wait for updateNotifyLoop to finish
	<-b.donec
	close(b.notifyCh)

	return nil
}

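// getHost extracts the host portion of a URL endpoint; for example (an
// illustration, not from the original source), "https://10.0.0.1:2379"
// yields "10.0.0.1:2379", while a bare "10.0.0.1:2379" contains no "://"
// and is returned unchanged.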
func getHost(ep string) string {
	url, uerr := url.Parse(ep)
	if uerr != nil || !strings.Contains(ep, "://") {
		return ep
	}
	return url.Host
}