client: WaitIndex -> AfterIndex

This commit is contained in:
Brian Waldon 2015-01-28 14:29:03 -08:00 committed by Yicheng Qin
parent a834f297f9
commit 1c03df62a5
6 changed files with 39 additions and 35 deletions

View File

@ -82,14 +82,14 @@ type KeysAPI interface {
} }
type WatcherOptions struct { type WatcherOptions struct {
// WaitIndex defines the index after-which the Watcher should // AfterIndex defines the index after-which the Watcher should
// start emitting events. For example, if a value of 5 is // start emitting events. For example, if a value of 5 is
// provided, the first event will have an index >= 6. // provided, the first event will have an index >= 6.
// //
// Setting WaitIndex to 0 (default) means that the Watcher // Setting AfterIndex to 0 (default) means that the Watcher
// should start watching for events starting at the current // should start watching for events starting at the current
// index, whatever that may be. // index, whatever that may be.
WaitIndex uint64 AfterIndex uint64
// Recursive specifices whether or not the Watcher should emit // Recursive specifices whether or not the Watcher should emit
// events that occur in children of the given keyspace. If set // events that occur in children of the given keyspace. If set
@ -276,7 +276,7 @@ func (k *httpKeysAPI) Watcher(key string, opts *WatcherOptions) Watcher {
} }
if opts != nil { if opts != nil {
act.WaitIndex = opts.WaitIndex act.AfterIndex = opts.AfterIndex
act.Recursive = opts.Recursive act.Recursive = opts.Recursive
} }
@ -302,7 +302,7 @@ func (hw *httpWatcher) Next(ctx context.Context) (*Response, error) {
return nil, err return nil, err
} }
hw.nextWait.WaitIndex = resp.Node.ModifiedIndex + 1 hw.nextWait.AfterIndex = resp.Node.ModifiedIndex + 1
return resp, nil return resp, nil
} }
@ -336,10 +336,10 @@ func (g *getAction) HTTPRequest(ep url.URL) *http.Request {
} }
type waitAction struct { type waitAction struct {
Prefix string Prefix string
Key string Key string
WaitIndex uint64 AfterIndex uint64
Recursive bool Recursive bool
} }
func (w *waitAction) HTTPRequest(ep url.URL) *http.Request { func (w *waitAction) HTTPRequest(ep url.URL) *http.Request {
@ -347,7 +347,7 @@ func (w *waitAction) HTTPRequest(ep url.URL) *http.Request {
params := u.Query() params := u.Query()
params.Set("wait", "true") params.Set("wait", "true")
params.Set("waitIndex", strconv.FormatUint(w.WaitIndex, 10)) params.Set("waitIndex", strconv.FormatUint(w.AfterIndex, 10))
params.Set("recursive", strconv.FormatBool(w.Recursive)) params.Set("recursive", strconv.FormatBool(w.Recursive))
u.RawQuery = params.Encode() u.RawQuery = params.Encode()

View File

@ -152,32 +152,32 @@ func TestWaitAction(t *testing.T) {
wantHeader := http.Header{} wantHeader := http.Header{}
tests := []struct { tests := []struct {
waitIndex uint64 afterIndex uint64
recursive bool recursive bool
wantQuery string wantQuery string
}{ }{
{ {
recursive: false, recursive: false,
waitIndex: uint64(0), afterIndex: uint64(0),
wantQuery: "recursive=false&wait=true&waitIndex=0", wantQuery: "recursive=false&wait=true&waitIndex=0",
}, },
{ {
recursive: false, recursive: false,
waitIndex: uint64(12), afterIndex: uint64(12),
wantQuery: "recursive=false&wait=true&waitIndex=12", wantQuery: "recursive=false&wait=true&waitIndex=12",
}, },
{ {
recursive: true, recursive: true,
waitIndex: uint64(12), afterIndex: uint64(12),
wantQuery: "recursive=true&wait=true&waitIndex=12", wantQuery: "recursive=true&wait=true&waitIndex=12",
}, },
} }
for i, tt := range tests { for i, tt := range tests {
f := waitAction{ f := waitAction{
Key: "/foo/bar", Key: "/foo/bar",
WaitIndex: tt.waitIndex, AfterIndex: tt.afterIndex,
Recursive: tt.recursive, Recursive: tt.recursive,
} }
got := *f.HTTPRequest(ep) got := *f.HTTPRequest(ep)

View File

@ -190,7 +190,7 @@ func (d *discovery) createSelf(contents string) error {
} }
// ensure self appears on the server we connected to // ensure self appears on the server we connected to
w := d.c.Watcher(d.selfKey(), &client.WatcherOptions{WaitIndex: resp.Node.CreatedIndex}) w := d.c.Watcher(d.selfKey(), &client.WatcherOptions{AfterIndex: resp.Node.CreatedIndex})
_, err = w.Next(context.Background()) _, err = w.Next(context.Background())
return err return err
} }
@ -279,7 +279,7 @@ func (d *discovery) waitNodes(nodes []*client.Node, size int, index uint64) ([]*
nodes = nodes[:size] nodes = nodes[:size]
} }
// watch from the next index // watch from the next index
w := d.c.Watcher(d.cluster, &client.WatcherOptions{WaitIndex: index + 1, Recursive: true}) w := d.c.Watcher(d.cluster, &client.WatcherOptions{AfterIndex: index + 1, Recursive: true})
all := make([]*client.Node, len(nodes)) all := make([]*client.Node, len(nodes))
copy(all, nodes) copy(all, nodes)
for _, n := range all { for _, n := range all {

View File

@ -141,7 +141,7 @@ func TestForceNewCluster(t *testing.T) {
cancel() cancel()
// ensure create has been applied in this machine // ensure create has been applied in this machine
ctx, cancel = context.WithTimeout(context.Background(), requestTimeout) ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
if _, err := kapi.Watcher("/foo", &client.WatcherOptions{WaitIndex: resp.Node.ModifiedIndex}).Next(ctx); err != nil { if _, err := kapi.Watcher("/foo", &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex}).Next(ctx); err != nil {
t.Fatalf("unexpected watch error: %v", err) t.Fatalf("unexpected watch error: %v", err)
} }
cancel() cancel()
@ -162,7 +162,7 @@ func TestForceNewCluster(t *testing.T) {
kapi = client.NewKeysAPI(cc) kapi = client.NewKeysAPI(cc)
// ensure force restart keep the old data, and new cluster can make progress // ensure force restart keep the old data, and new cluster can make progress
ctx, cancel = context.WithTimeout(context.Background(), requestTimeout) ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
if _, err := kapi.Watcher("/foo", &client.WatcherOptions{WaitIndex: resp.Node.ModifiedIndex}).Next(ctx); err != nil { if _, err := kapi.Watcher("/foo", &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex}).Next(ctx); err != nil {
t.Fatalf("unexpected watch error: %v", err) t.Fatalf("unexpected watch error: %v", err)
} }
cancel() cancel()
@ -188,7 +188,7 @@ func clusterMustProgress(t *testing.T, membs []*member) {
mcc := mustNewHTTPClient(t, []string{u}) mcc := mustNewHTTPClient(t, []string{u})
mkapi := client.NewKeysAPI(mcc) mkapi := client.NewKeysAPI(mcc)
mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout) mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout)
if _, err := mkapi.Watcher(key, &client.WatcherOptions{WaitIndex: resp.Node.ModifiedIndex}).Next(mctx); err != nil { if _, err := mkapi.Watcher(key, &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex}).Next(mctx); err != nil {
t.Fatalf("#%d: watch on %s error: %v", i, u, err) t.Fatalf("#%d: watch on %s error: %v", i, u, err)
} }
mcancel() mcancel()
@ -551,7 +551,7 @@ func (m *member) WaitOK(t *testing.T) {
kapi := client.NewKeysAPI(cc) kapi := client.NewKeysAPI(cc)
for { for {
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err := kapi.Get(ctx, "/") _, err := kapi.Get(ctx, "/", nil)
if err != nil { if err != nil {
time.Sleep(tickDuration) time.Sleep(tickDuration)
continue continue

View File

@ -109,7 +109,7 @@ func TestSnapshotAndRestartMember(t *testing.T) {
kapi := client.NewKeysAPI(cc) kapi := client.NewKeysAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
key := fmt.Sprintf("foo%d", i) key := fmt.Sprintf("foo%d", i)
resp, err := kapi.Get(ctx, "/"+key) resp, err := kapi.Get(ctx, "/"+key, nil)
if err != nil { if err != nil {
t.Fatalf("#%d: get on %s error: %v", i, m.URL(), err) t.Fatalf("#%d: get on %s error: %v", i, m.URL(), err)
} }

View File

@ -269,14 +269,18 @@ func getPeersFromDiscoveryURL(discoverURL string) ([]string, error) {
} }
token := u.Path token := u.Path
u.Path = "" u.Path = ""
c, err := client.NewHTTPClient(&http.Transport{}, []string{u.String()}) cfg := client.Config{
Transport: &http.Transport{},
Endpoints: []string{u.String()},
}
c, err := client.New(cfg)
if err != nil { if err != nil {
return nil, err return nil, err
} }
dc := client.NewDiscoveryKeysAPI(c) dc := client.NewKeysAPIWithPrefix(c, "")
ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout) ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
resp, err := dc.Get(ctx, token) resp, err := dc.Get(ctx, token, nil)
cancel() cancel()
if err != nil { if err != nil {
return nil, err return nil, err