mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #6474 from gyuho/auto-sync
clientv3: add 'Sync' method
This commit is contained in:
commit
d743b8b866
@ -96,10 +96,24 @@ func getHost2ep(eps []string) map[string]string {
|
||||
}
|
||||
|
||||
func (b *simpleBalancer) updateAddrs(eps []string) {
|
||||
np := getHost2ep(eps)
|
||||
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
b.host2ep = getHost2ep(eps)
|
||||
match := len(np) == len(b.host2ep)
|
||||
for k, v := range np {
|
||||
if b.host2ep[k] != v {
|
||||
match = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if match {
|
||||
// same endpoints, so no need to update address
|
||||
return
|
||||
}
|
||||
|
||||
b.host2ep = np
|
||||
|
||||
addrs := make([]grpc.Address, 0, len(eps))
|
||||
for i := range eps {
|
||||
|
@ -105,6 +105,38 @@ func (c *Client) SetEndpoints(eps ...string) {
|
||||
c.balancer.updateAddrs(eps)
|
||||
}
|
||||
|
||||
// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
|
||||
func (c *Client) Sync(ctx context.Context) error {
|
||||
mresp, err := c.MemberList(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var eps []string
|
||||
for _, m := range mresp.Members {
|
||||
eps = append(eps, m.ClientURLs...)
|
||||
}
|
||||
c.SetEndpoints(eps...)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) autoSync() {
|
||||
if c.cfg.AutoSyncInterval == time.Duration(0) {
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
return
|
||||
case <-time.After(c.cfg.AutoSyncInterval):
|
||||
ctx, _ := context.WithTimeout(c.ctx, 5*time.Second)
|
||||
if err := c.Sync(ctx); err != nil && err != c.ctx.Err() {
|
||||
logger.Println("Auto sync endpoints failed:", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type authTokenCredential struct {
|
||||
token string
|
||||
}
|
||||
@ -292,6 +324,7 @@ func newClient(cfg *Config) (*Client, error) {
|
||||
logger.Set(log.New(ioutil.Discard, "", 0))
|
||||
}
|
||||
|
||||
go client.autoSync()
|
||||
return client, nil
|
||||
}
|
||||
|
||||
|
@ -28,6 +28,10 @@ type Config struct {
|
||||
// Endpoints is a list of URLs
|
||||
Endpoints []string
|
||||
|
||||
// AutoSyncInterval is the interval to update endpoints with its latest members.
|
||||
// 0 disables auto-sync. By default auto-sync is disabled.
|
||||
AutoSyncInterval time.Duration
|
||||
|
||||
// DialTimeout is the timeout for failing to establish a connection.
|
||||
DialTimeout time.Duration
|
||||
|
||||
@ -46,6 +50,7 @@ type Config struct {
|
||||
|
||||
type yamlConfig struct {
|
||||
Endpoints []string `json:"endpoints"`
|
||||
AutoSyncInterval time.Duration `json:"auto-sync-interval"`
|
||||
DialTimeout time.Duration `json:"dial-timeout"`
|
||||
InsecureTransport bool `json:"insecure-transport"`
|
||||
InsecureSkipTLSVerify bool `json:"insecure-skip-tls-verify"`
|
||||
@ -68,8 +73,9 @@ func configFromFile(fpath string) (*Config, error) {
|
||||
}
|
||||
|
||||
cfg := &Config{
|
||||
Endpoints: yc.Endpoints,
|
||||
DialTimeout: yc.DialTimeout,
|
||||
Endpoints: yc.Endpoints,
|
||||
AutoSyncInterval: yc.AutoSyncInterval,
|
||||
DialTimeout: yc.DialTimeout,
|
||||
}
|
||||
|
||||
if yc.InsecureTransport {
|
||||
|
Loading…
x
Reference in New Issue
Block a user