client: clarify relationship of AfterIndex and waitIndex

This commit is contained in:
Brian Waldon 2015-01-29 14:27:07 -08:00 committed by Yicheng Qin
parent 09017af35e
commit cd85451971
4 changed files with 42 additions and 40 deletions

View File

@ -335,8 +335,10 @@ func (k *httpKeysAPI) Watcher(key string, opts *WatcherOptions) Watcher {
}
if opts != nil {
act.AfterIndex = opts.AfterIndex
act.Recursive = opts.Recursive
if opts.AfterIndex > 0 {
act.WaitIndex = opts.AfterIndex + 1
}
}
return &httpWatcher{
@ -361,7 +363,7 @@ func (hw *httpWatcher) Next(ctx context.Context) (*Response, error) {
return nil, err
}
hw.nextWait.AfterIndex = resp.Node.ModifiedIndex + 1
hw.nextWait.WaitIndex = resp.Node.ModifiedIndex + 1
return resp, nil
}
@ -395,10 +397,10 @@ func (g *getAction) HTTPRequest(ep url.URL) *http.Request {
}
type waitAction struct {
Prefix string
Key string
AfterIndex uint64
Recursive bool
Prefix string
Key string
WaitIndex uint64
Recursive bool
}
func (w *waitAction) HTTPRequest(ep url.URL) *http.Request {
@ -406,7 +408,7 @@ func (w *waitAction) HTTPRequest(ep url.URL) *http.Request {
params := u.Query()
params.Set("wait", "true")
params.Set("waitIndex", strconv.FormatUint(w.AfterIndex, 10))
params.Set("waitIndex", strconv.FormatUint(w.WaitIndex, 10))
params.Set("recursive", strconv.FormatBool(w.Recursive))
u.RawQuery = params.Encode()

View File

@ -154,32 +154,32 @@ func TestWaitAction(t *testing.T) {
wantHeader := http.Header{}
tests := []struct {
afterIndex uint64
recursive bool
wantQuery string
waitIndex uint64
recursive bool
wantQuery string
}{
{
recursive: false,
afterIndex: uint64(0),
wantQuery: "recursive=false&wait=true&waitIndex=0",
recursive: false,
waitIndex: uint64(0),
wantQuery: "recursive=false&wait=true&waitIndex=0",
},
{
recursive: false,
afterIndex: uint64(12),
wantQuery: "recursive=false&wait=true&waitIndex=12",
recursive: false,
waitIndex: uint64(12),
wantQuery: "recursive=false&wait=true&waitIndex=12",
},
{
recursive: true,
afterIndex: uint64(12),
wantQuery: "recursive=true&wait=true&waitIndex=12",
recursive: true,
waitIndex: uint64(12),
wantQuery: "recursive=true&wait=true&waitIndex=12",
},
}
for i, tt := range tests {
f := waitAction{
Key: "/foo/bar",
AfterIndex: tt.afterIndex,
Recursive: tt.recursive,
Key: "/foo/bar",
WaitIndex: tt.waitIndex,
Recursive: tt.recursive,
}
got := *f.HTTPRequest(ep)
@ -188,7 +188,7 @@ func TestWaitAction(t *testing.T) {
err := assertRequest(got, "GET", wantURL, wantHeader, nil)
if err != nil {
t.Errorf("#%d: %v", i, err)
t.Errorf("#%d: unexpected error: %#v", i, err)
}
}
}
@ -628,10 +628,10 @@ func TestUnmarshalFailedKeysResponseBadJSON(t *testing.T) {
func TestHTTPWatcherNextWaitAction(t *testing.T) {
initAction := waitAction{
Prefix: "/pants",
Key: "/foo/bar",
Recursive: true,
AfterIndex: 19,
Prefix: "/pants",
Key: "/foo/bar",
Recursive: true,
WaitIndex: 19,
}
client := &actionAssertingHTTPClient{
@ -652,10 +652,10 @@ func TestHTTPWatcherNextWaitAction(t *testing.T) {
}
wantNextWait := waitAction{
Prefix: "/pants",
Key: "/foo/bar",
Recursive: true,
AfterIndex: 22,
Prefix: "/pants",
Key: "/foo/bar",
Recursive: true,
WaitIndex: 22,
}
watcher := &httpWatcher{
@ -702,10 +702,10 @@ func TestHTTPWatcherNextFail(t *testing.T) {
for i, tt := range tests {
act := waitAction{
Prefix: "/pants",
Key: "/foo/bar",
Recursive: true,
AfterIndex: 19,
Prefix: "/pants",
Key: "/foo/bar",
Recursive: true,
WaitIndex: 19,
}
watcher := &httpWatcher{

View File

@ -190,7 +190,7 @@ func (d *discovery) createSelf(contents string) error {
}
// ensure self appears on the server we connected to
w := d.c.Watcher(d.selfKey(), &client.WatcherOptions{AfterIndex: resp.Node.CreatedIndex})
w := d.c.Watcher(d.selfKey(), &client.WatcherOptions{AfterIndex: resp.Node.CreatedIndex - 1})
_, err = w.Next(context.Background())
return err
}
@ -279,7 +279,7 @@ func (d *discovery) waitNodes(nodes []*client.Node, size int, index uint64) ([]*
nodes = nodes[:size]
}
// watch from the next index
w := d.c.Watcher(d.cluster, &client.WatcherOptions{AfterIndex: index + 1, Recursive: true})
w := d.c.Watcher(d.cluster, &client.WatcherOptions{AfterIndex: index, Recursive: true})
all := make([]*client.Node, len(nodes))
copy(all, nodes)
for _, n := range all {

View File

@ -141,7 +141,7 @@ func TestForceNewCluster(t *testing.T) {
cancel()
// ensure create has been applied in this machine
ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
if _, err := kapi.Watcher("/foo", &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex}).Next(ctx); err != nil {
if _, err := kapi.Watcher("/foo", &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(ctx); err != nil {
t.Fatalf("unexpected watch error: %v", err)
}
cancel()
@ -162,7 +162,7 @@ func TestForceNewCluster(t *testing.T) {
kapi = client.NewKeysAPI(cc)
// ensure force restart keep the old data, and new cluster can make progress
ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
if _, err := kapi.Watcher("/foo", &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex}).Next(ctx); err != nil {
if _, err := kapi.Watcher("/foo", &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(ctx); err != nil {
t.Fatalf("unexpected watch error: %v", err)
}
cancel()
@ -188,7 +188,7 @@ func clusterMustProgress(t *testing.T, membs []*member) {
mcc := mustNewHTTPClient(t, []string{u})
mkapi := client.NewKeysAPI(mcc)
mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout)
if _, err := mkapi.Watcher(key, &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex}).Next(mctx); err != nil {
if _, err := mkapi.Watcher(key, &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(mctx); err != nil {
t.Fatalf("#%d: watch on %s error: %v", i, u, err)
}
mcancel()