diff --git a/client/client.go b/client/client.go index 116a1a4b2..31134251d 100644 --- a/client/client.go +++ b/client/client.go @@ -122,6 +122,22 @@ type Client interface { // Sync updates the internal cache of the etcd cluster's membership. Sync(context.Context) error + // AutoSync periodically calls Sync() every given interval. + // The recommended sync interval is 10 seconds to 1 minute, which does + // not bring too much overhead to server and makes client catch up the + // cluster change in time. + // + // The example to use it: + // + // for { + // err := client.AutoSync(ctx, 10*time.Second) + // if err == context.DeadlineExceeded || err == context.Canceled { + // break + // } + // log.Print(err) + // } + AutoSync(context.Context, time.Duration) error + // Endpoints returns a copy of the current set of API endpoints used // by Client to resolve HTTP requests. If Sync has ever been called, // this may differ from the initial Endpoints provided in the Config. @@ -290,6 +306,22 @@ func (c *httpClusterClient) Sync(ctx context.Context) error { return c.reset(eps) } +func (c *httpClusterClient) AutoSync(ctx context.Context, interval time.Duration) error { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + err := c.Sync(ctx) + if err != nil { + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + } +} + type roundTripResponse struct { resp *http.Response err error diff --git a/client/client_test.go b/client/client_test.go index ecd6f56d0..bf31d328a 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -757,6 +757,51 @@ func TestHTTPClusterClientSyncFail(t *testing.T) { } } +func TestHTTPClusterClientAutoSyncCancelContext(t *testing.T) { + cf := newStaticHTTPClientFactory([]staticHTTPResponse{ + staticHTTPResponse{ + resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}}, + body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`), + }, + }) + + hc := &httpClusterClient{ + clientFactory: cf, + rand: rand.New(rand.NewSource(0)), + } + err := hc.reset([]string{"http://127.0.0.1:2379"}) + if err != nil { + t.Fatalf("unexpected error during setup: %#v", err) + } + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err = hc.AutoSync(ctx, time.Hour) + if err != context.Canceled { + t.Fatalf("incorrect error value: want=%v got=%v", context.Canceled, err) + } +} + +func TestHTTPClusterClientAutoSyncFail(t *testing.T) { + cf := newStaticHTTPClientFactory([]staticHTTPResponse{ + staticHTTPResponse{err: errors.New("fail!")}, + }) + + hc := &httpClusterClient{ + clientFactory: cf, + rand: rand.New(rand.NewSource(0)), + } + err := hc.reset([]string{"http://127.0.0.1:2379"}) + if err != nil { + t.Fatalf("unexpected error during setup: %#v", err) + } + + err = hc.AutoSync(context.Background(), time.Hour) + if err.Error() != ErrClusterUnavailable.Error() { + t.Fatalf("incorrect error value: want=%v got=%v", ErrClusterUnavailable, err) + } +} + func TestHTTPClusterClientResetFail(t *testing.T) { tests := [][]string{ // need at least one endpoint