mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
clientv3: add cluster version checking
This commit is contained in:
parent
c9452c6ad4
commit
4d2aa80ecf
@ -20,6 +20,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -35,6 +36,7 @@ import (
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints")
|
ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints")
|
||||||
|
ErrOldCluster = errors.New("etcdclient: old cluster version")
|
||||||
)
|
)
|
||||||
|
|
||||||
// Client provides and manages an etcd v3 client session.
|
// Client provides and manages an etcd v3 client session.
|
||||||
@ -358,10 +360,57 @@ func newClient(cfg *Config) (*Client, error) {
|
|||||||
client.Auth = NewAuth(client)
|
client.Auth = NewAuth(client)
|
||||||
client.Maintenance = NewMaintenance(client)
|
client.Maintenance = NewMaintenance(client)
|
||||||
|
|
||||||
|
if cfg.RejectOldCluster {
|
||||||
|
if err := client.checkVersion(); err != nil {
|
||||||
|
client.Close()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
go client.autoSync()
|
go client.autoSync()
|
||||||
return client, nil
|
return client, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Client) checkVersion() (err error) {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
errc := make(chan error, len(c.cfg.Endpoints))
|
||||||
|
ctx, cancel := context.WithCancel(c.ctx)
|
||||||
|
if c.cfg.DialTimeout > 0 {
|
||||||
|
ctx, _ = context.WithTimeout(ctx, c.cfg.DialTimeout)
|
||||||
|
}
|
||||||
|
wg.Add(len(c.cfg.Endpoints))
|
||||||
|
for _, ep := range c.cfg.Endpoints {
|
||||||
|
// if cluster is current, any endpoint gives a recent version
|
||||||
|
go func(e string) {
|
||||||
|
defer wg.Done()
|
||||||
|
resp, rerr := c.Status(ctx, e)
|
||||||
|
if rerr != nil {
|
||||||
|
errc <- rerr
|
||||||
|
return
|
||||||
|
}
|
||||||
|
vs := strings.Split(resp.Version, ".")
|
||||||
|
maj, min := 0, 0
|
||||||
|
if len(vs) >= 2 {
|
||||||
|
maj, rerr = strconv.Atoi(vs[0])
|
||||||
|
min, rerr = strconv.Atoi(vs[1])
|
||||||
|
}
|
||||||
|
if maj < 3 || (maj == 3 && min < 2) {
|
||||||
|
rerr = ErrOldCluster
|
||||||
|
}
|
||||||
|
errc <- rerr
|
||||||
|
}(ep)
|
||||||
|
}
|
||||||
|
// wait for success
|
||||||
|
for i := 0; i < len(c.cfg.Endpoints); i++ {
|
||||||
|
if err = <-errc; err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cancel()
|
||||||
|
wg.Wait()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// ActiveConnection returns the current in-use connection
|
// ActiveConnection returns the current in-use connection
|
||||||
func (c *Client) ActiveConnection() *grpc.ClientConn { return c.conn }
|
func (c *Client) ActiveConnection() *grpc.ClientConn { return c.conn }
|
||||||
|
|
||||||
|
@ -42,6 +42,9 @@ type Config struct {
|
|||||||
// Password is a password for authentication.
|
// Password is a password for authentication.
|
||||||
Password string `json:"password"`
|
Password string `json:"password"`
|
||||||
|
|
||||||
|
// RejectOldCluster when set will refuse to create a client against an outdated cluster.
|
||||||
|
RejectOldCluster bool `json:"reject-old-cluster"`
|
||||||
|
|
||||||
// DialOptions is a list of dial options for the grpc client (e.g., for interceptors).
|
// DialOptions is a list of dial options for the grpc client (e.g., for interceptors).
|
||||||
DialOptions []grpc.DialOption
|
DialOptions []grpc.DialOption
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user