diff --git a/client/keys.go b/client/keys.go index 7de7a709b..68f15f56d 100644 --- a/client/keys.go +++ b/client/keys.go @@ -61,16 +61,16 @@ func NewDiscoveryKeysAPI(c HTTPClient) KeysAPI { } type KeysAPI interface { - Set(ctx context.Context, key, value string, opts SetOptions) (*Response, error) + Set(ctx context.Context, key, value string, opts *SetOptions) (*Response, error) Create(ctx context.Context, key, value string) (*Response, error) Update(ctx context.Context, key, value string) (*Response, error) - Delete(ctx context.Context, key string, opts DeleteOptions) (*Response, error) + Delete(ctx context.Context, key string, opts *DeleteOptions) (*Response, error) Get(ctx context.Context, key string) (*Response, error) RGet(ctx context.Context, key string) (*Response, error) - Watcher(key string, opts WatcherOptions) Watcher + Watcher(key string, opts *WatcherOptions) Watcher } type WatcherOptions struct { @@ -119,14 +119,17 @@ type httpKeysAPI struct { prefix string } -func (k *httpKeysAPI) Set(ctx context.Context, key, val string, opts SetOptions) (*Response, error) { +func (k *httpKeysAPI) Set(ctx context.Context, key, val string, opts *SetOptions) (*Response, error) { act := &setAction{ - Prefix: k.prefix, - Key: key, - Value: val, - PrevValue: opts.PrevValue, - PrevIndex: opts.PrevIndex, - PrevExist: opts.PrevExist, + Prefix: k.prefix, + Key: key, + Value: val, + } + + if opts != nil { + act.PrevValue = opts.PrevValue + act.PrevIndex = opts.PrevIndex + act.PrevExist = opts.PrevExist } resp, body, err := k.client.Do(ctx, act) @@ -138,20 +141,23 @@ func (k *httpKeysAPI) Set(ctx context.Context, key, val string, opts SetOptions) } func (k *httpKeysAPI) Create(ctx context.Context, key, val string) (*Response, error) { - return k.Set(ctx, key, val, SetOptions{PrevExist: PrevNoExist}) + return k.Set(ctx, key, val, &SetOptions{PrevExist: PrevNoExist}) } func (k *httpKeysAPI) Update(ctx context.Context, key, val string) (*Response, error) { - return k.Set(ctx, key, val, SetOptions{PrevExist: PrevExist}) + return k.Set(ctx, key, val, &SetOptions{PrevExist: PrevExist}) } -func (k *httpKeysAPI) Delete(ctx context.Context, key string, opts DeleteOptions) (*Response, error) { +func (k *httpKeysAPI) Delete(ctx context.Context, key string, opts *DeleteOptions) (*Response, error) { act := &deleteAction{ - Prefix: k.prefix, - Key: key, - PrevValue: opts.PrevValue, - PrevIndex: opts.PrevIndex, - Recursive: opts.Recursive, + Prefix: k.prefix, + Key: key, + } + + if opts != nil { + act.PrevValue = opts.PrevValue + act.PrevIndex = opts.PrevIndex + act.Recursive = opts.Recursive } resp, body, err := k.client.Do(ctx, act) @@ -192,15 +198,20 @@ func (k *httpKeysAPI) RGet(ctx context.Context, key string) (*Response, error) { return unmarshalHTTPResponse(resp.StatusCode, resp.Header, body) } -func (k *httpKeysAPI) Watcher(key string, opts WatcherOptions) Watcher { +func (k *httpKeysAPI) Watcher(key string, opts *WatcherOptions) Watcher { + act := waitAction{ + Prefix: k.prefix, + Key: key, + } + + if opts != nil { + act.WaitIndex = opts.WaitIndex + act.Recursive = opts.Recursive + } + return &httpWatcher{ - client: k.client, - nextWait: waitAction{ - Prefix: k.prefix, - Key: key, - WaitIndex: opts.WaitIndex, - Recursive: opts.Recursive, - }, + client: k.client, + nextWait: act, } } diff --git a/discovery/discovery.go b/discovery/discovery.go index ec3a9aea7..989db529f 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -186,7 +186,7 @@ func (d *discovery) createSelf(contents string) error { } // ensure self appears on the server we connected to - w := d.c.Watcher(d.selfKey(), client.WatcherOptions{WaitIndex: resp.Node.CreatedIndex}) + w := d.c.Watcher(d.selfKey(), &client.WatcherOptions{WaitIndex: resp.Node.CreatedIndex}) _, err = w.Next(context.Background()) return err } @@ -275,7 +275,7 @@ func (d *discovery) waitNodes(nodes client.Nodes, size int, index uint64) (clien nodes = nodes[:size] } // watch from the next index - w := d.c.Watcher(d.cluster, client.WatcherOptions{WaitIndex: index + 1, Recursive: true}) + w := d.c.Watcher(d.cluster, &client.WatcherOptions{WaitIndex: index + 1, Recursive: true}) all := make(client.Nodes, len(nodes)) copy(all, nodes) for _, n := range all { diff --git a/discovery/discovery_test.go b/discovery/discovery_test.go index 6d14d20b9..96941a41c 100644 --- a/discovery/discovery_test.go +++ b/discovery/discovery_test.go @@ -431,7 +431,7 @@ func (c *clientWithResp) Get(ctx context.Context, key string) (*client.Response, return r, nil } -func (c *clientWithResp) Watcher(key string, opts client.WatcherOptions) client.Watcher { +func (c *clientWithResp) Watcher(key string, opts *client.WatcherOptions) client.Watcher { return c.w } @@ -449,7 +449,7 @@ func (c *clientWithErr) Get(ctx context.Context, key string) (*client.Response, return &client.Response{}, c.err } -func (c *clientWithErr) Watcher(key string, opts client.WatcherOptions) client.Watcher { +func (c *clientWithErr) Watcher(key string, opts *client.WatcherOptions) client.Watcher { return c.w } diff --git a/integration/cluster_test.go b/integration/cluster_test.go index b5e82d09c..da13661a3 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -142,7 +142,7 @@ func TestForceNewCluster(t *testing.T) { cancel() // ensure create has been applied in this machine ctx, cancel = context.WithTimeout(context.Background(), requestTimeout) - if _, err := kapi.Watcher("/foo", client.WatcherOptions{WaitIndex: resp.Node.ModifiedIndex}).Next(ctx); err != nil { + if _, err := kapi.Watcher("/foo", &client.WatcherOptions{WaitIndex: resp.Node.ModifiedIndex}).Next(ctx); err != nil { t.Fatalf("unexpected watch error: %v", err) } cancel() @@ -163,7 +163,7 @@ func TestForceNewCluster(t *testing.T) { kapi = client.NewKeysAPI(cc) // ensure force restart keep the old data, and new cluster can make progress ctx, cancel = context.WithTimeout(context.Background(), requestTimeout) - if _, err := kapi.Watcher("/foo", client.WatcherOptions{WaitIndex: resp.Node.ModifiedIndex}).Next(ctx); err != nil { + if _, err := kapi.Watcher("/foo", &client.WatcherOptions{WaitIndex: resp.Node.ModifiedIndex}).Next(ctx); err != nil { t.Fatalf("unexpected watch error: %v", err) } cancel() @@ -189,7 +189,7 @@ func clusterMustProgress(t *testing.T, membs []*member) { mcc := mustNewHTTPClient(t, []string{u}) mkapi := client.NewKeysAPI(mcc) mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout) - if _, err := mkapi.Watcher(key, client.WatcherOptions{WaitIndex: resp.Node.ModifiedIndex}).Next(mctx); err != nil { + if _, err := mkapi.Watcher(key, &client.WatcherOptions{WaitIndex: resp.Node.ModifiedIndex}).Next(mctx); err != nil { t.Fatalf("#%d: watch on %s error: %v", i, u, err) } mcancel()