diff --git a/client/keys.go b/client/keys.go index e21bdaf8a..e3ea7e18f 100644 --- a/client/keys.go +++ b/client/keys.go @@ -335,8 +335,10 @@ func (k *httpKeysAPI) Watcher(key string, opts *WatcherOptions) Watcher { } if opts != nil { - act.AfterIndex = opts.AfterIndex act.Recursive = opts.Recursive + if opts.AfterIndex > 0 { + act.WaitIndex = opts.AfterIndex + 1 + } } return &httpWatcher{ @@ -361,7 +363,7 @@ func (hw *httpWatcher) Next(ctx context.Context) (*Response, error) { return nil, err } - hw.nextWait.AfterIndex = resp.Node.ModifiedIndex + 1 + hw.nextWait.WaitIndex = resp.Node.ModifiedIndex + 1 return resp, nil } @@ -395,10 +397,10 @@ func (g *getAction) HTTPRequest(ep url.URL) *http.Request { } type waitAction struct { - Prefix string - Key string - AfterIndex uint64 - Recursive bool + Prefix string + Key string + WaitIndex uint64 + Recursive bool } func (w *waitAction) HTTPRequest(ep url.URL) *http.Request { @@ -406,7 +408,7 @@ func (w *waitAction) HTTPRequest(ep url.URL) *http.Request { params := u.Query() params.Set("wait", "true") - params.Set("waitIndex", strconv.FormatUint(w.AfterIndex, 10)) + params.Set("waitIndex", strconv.FormatUint(w.WaitIndex, 10)) params.Set("recursive", strconv.FormatBool(w.Recursive)) u.RawQuery = params.Encode() diff --git a/client/keys_test.go b/client/keys_test.go index a5f32a601..edc34ab2b 100644 --- a/client/keys_test.go +++ b/client/keys_test.go @@ -154,32 +154,32 @@ func TestWaitAction(t *testing.T) { wantHeader := http.Header{} tests := []struct { - afterIndex uint64 - recursive bool - wantQuery string + waitIndex uint64 + recursive bool + wantQuery string }{ { - recursive: false, - afterIndex: uint64(0), - wantQuery: "recursive=false&wait=true&waitIndex=0", + recursive: false, + waitIndex: uint64(0), + wantQuery: "recursive=false&wait=true&waitIndex=0", }, { - recursive: false, - afterIndex: uint64(12), - wantQuery: "recursive=false&wait=true&waitIndex=12", + recursive: false, + waitIndex: uint64(12), + wantQuery: "recursive=false&wait=true&waitIndex=12", }, { - recursive: true, - afterIndex: uint64(12), - wantQuery: "recursive=true&wait=true&waitIndex=12", + recursive: true, + waitIndex: uint64(12), + wantQuery: "recursive=true&wait=true&waitIndex=12", }, } for i, tt := range tests { f := waitAction{ - Key: "/foo/bar", - AfterIndex: tt.afterIndex, - Recursive: tt.recursive, + Key: "/foo/bar", + WaitIndex: tt.waitIndex, + Recursive: tt.recursive, } got := *f.HTTPRequest(ep) @@ -188,7 +188,7 @@ func TestWaitAction(t *testing.T) { err := assertRequest(got, "GET", wantURL, wantHeader, nil) if err != nil { - t.Errorf("#%d: %v", i, err) + t.Errorf("#%d: unexpected error: %#v", i, err) } } } @@ -628,10 +628,10 @@ func TestUnmarshalFailedKeysResponseBadJSON(t *testing.T) { func TestHTTPWatcherNextWaitAction(t *testing.T) { initAction := waitAction{ - Prefix: "/pants", - Key: "/foo/bar", - Recursive: true, - AfterIndex: 19, + Prefix: "/pants", + Key: "/foo/bar", + Recursive: true, + WaitIndex: 19, } client := &actionAssertingHTTPClient{ @@ -652,10 +652,10 @@ func TestHTTPWatcherNextWaitAction(t *testing.T) { } wantNextWait := waitAction{ - Prefix: "/pants", - Key: "/foo/bar", - Recursive: true, - AfterIndex: 22, + Prefix: "/pants", + Key: "/foo/bar", + Recursive: true, + WaitIndex: 22, } watcher := &httpWatcher{ @@ -702,10 +702,10 @@ func TestHTTPWatcherNextFail(t *testing.T) { for i, tt := range tests { act := waitAction{ - Prefix: "/pants", - Key: "/foo/bar", - Recursive: true, - AfterIndex: 19, + Prefix: "/pants", + Key: "/foo/bar", + Recursive: true, + WaitIndex: 19, } watcher := &httpWatcher{ diff --git a/discovery/discovery.go b/discovery/discovery.go index e7bdf5ff4..bceb5bb17 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -190,7 +190,7 @@ func (d *discovery) createSelf(contents string) error { } // ensure self appears on the server we connected to - w := d.c.Watcher(d.selfKey(), &client.WatcherOptions{AfterIndex: resp.Node.CreatedIndex}) + w := d.c.Watcher(d.selfKey(), &client.WatcherOptions{AfterIndex: resp.Node.CreatedIndex - 1}) _, err = w.Next(context.Background()) return err } @@ -279,7 +279,7 @@ func (d *discovery) waitNodes(nodes []*client.Node, size int, index uint64) ([]* nodes = nodes[:size] } // watch from the next index - w := d.c.Watcher(d.cluster, &client.WatcherOptions{AfterIndex: index + 1, Recursive: true}) + w := d.c.Watcher(d.cluster, &client.WatcherOptions{AfterIndex: index, Recursive: true}) all := make([]*client.Node, len(nodes)) copy(all, nodes) for _, n := range all { diff --git a/integration/cluster_test.go b/integration/cluster_test.go index 6e8ab0725..59d8ac38c 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -141,7 +141,7 @@ func TestForceNewCluster(t *testing.T) { cancel() // ensure create has been applied in this machine ctx, cancel = context.WithTimeout(context.Background(), requestTimeout) - if _, err := kapi.Watcher("/foo", &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex}).Next(ctx); err != nil { + if _, err := kapi.Watcher("/foo", &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(ctx); err != nil { t.Fatalf("unexpected watch error: %v", err) } cancel() @@ -162,7 +162,7 @@ func TestForceNewCluster(t *testing.T) { kapi = client.NewKeysAPI(cc) // ensure force restart keep the old data, and new cluster can make progress ctx, cancel = context.WithTimeout(context.Background(), requestTimeout) - if _, err := kapi.Watcher("/foo", &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex}).Next(ctx); err != nil { + if _, err := kapi.Watcher("/foo", &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(ctx); err != nil { t.Fatalf("unexpected watch error: %v", err) } cancel() @@ -188,7 +188,7 @@ func clusterMustProgress(t *testing.T, membs []*member) { mcc := mustNewHTTPClient(t, []string{u}) mkapi := client.NewKeysAPI(mcc) mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout) - if _, err := mkapi.Watcher(key, &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex}).Next(mctx); err != nil { + if _, err := mkapi.Watcher(key, &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(mctx); err != nil { t.Fatalf("#%d: watch on %s error: %v", i, u, err) } mcancel()