client: pass around options as pointers

This commit is contained in:
Brian Waldon 2015-01-23 10:54:52 -08:00 committed by Yicheng Qin
parent 0a7e0875d5
commit 3d53e9bfaa
4 changed files with 44 additions and 33 deletions

View File

@ -61,16 +61,16 @@ func NewDiscoveryKeysAPI(c HTTPClient) KeysAPI {
} }
type KeysAPI interface { type KeysAPI interface {
Set(ctx context.Context, key, value string, opts SetOptions) (*Response, error) Set(ctx context.Context, key, value string, opts *SetOptions) (*Response, error)
Create(ctx context.Context, key, value string) (*Response, error) Create(ctx context.Context, key, value string) (*Response, error)
Update(ctx context.Context, key, value string) (*Response, error) Update(ctx context.Context, key, value string) (*Response, error)
Delete(ctx context.Context, key string, opts DeleteOptions) (*Response, error) Delete(ctx context.Context, key string, opts *DeleteOptions) (*Response, error)
Get(ctx context.Context, key string) (*Response, error) Get(ctx context.Context, key string) (*Response, error)
RGet(ctx context.Context, key string) (*Response, error) RGet(ctx context.Context, key string) (*Response, error)
Watcher(key string, opts WatcherOptions) Watcher Watcher(key string, opts *WatcherOptions) Watcher
} }
type WatcherOptions struct { type WatcherOptions struct {
@ -119,14 +119,17 @@ type httpKeysAPI struct {
prefix string prefix string
} }
func (k *httpKeysAPI) Set(ctx context.Context, key, val string, opts SetOptions) (*Response, error) { func (k *httpKeysAPI) Set(ctx context.Context, key, val string, opts *SetOptions) (*Response, error) {
act := &setAction{ act := &setAction{
Prefix: k.prefix, Prefix: k.prefix,
Key: key, Key: key,
Value: val, Value: val,
PrevValue: opts.PrevValue, }
PrevIndex: opts.PrevIndex,
PrevExist: opts.PrevExist, if opts != nil {
act.PrevValue = opts.PrevValue
act.PrevIndex = opts.PrevIndex
act.PrevExist = opts.PrevExist
} }
resp, body, err := k.client.Do(ctx, act) resp, body, err := k.client.Do(ctx, act)
@ -138,20 +141,23 @@ func (k *httpKeysAPI) Set(ctx context.Context, key, val string, opts SetOptions)
} }
func (k *httpKeysAPI) Create(ctx context.Context, key, val string) (*Response, error) { func (k *httpKeysAPI) Create(ctx context.Context, key, val string) (*Response, error) {
return k.Set(ctx, key, val, SetOptions{PrevExist: PrevNoExist}) return k.Set(ctx, key, val, &SetOptions{PrevExist: PrevNoExist})
} }
func (k *httpKeysAPI) Update(ctx context.Context, key, val string) (*Response, error) { func (k *httpKeysAPI) Update(ctx context.Context, key, val string) (*Response, error) {
return k.Set(ctx, key, val, SetOptions{PrevExist: PrevExist}) return k.Set(ctx, key, val, &SetOptions{PrevExist: PrevExist})
} }
func (k *httpKeysAPI) Delete(ctx context.Context, key string, opts DeleteOptions) (*Response, error) { func (k *httpKeysAPI) Delete(ctx context.Context, key string, opts *DeleteOptions) (*Response, error) {
act := &deleteAction{ act := &deleteAction{
Prefix: k.prefix, Prefix: k.prefix,
Key: key, Key: key,
PrevValue: opts.PrevValue, }
PrevIndex: opts.PrevIndex,
Recursive: opts.Recursive, if opts != nil {
act.PrevValue = opts.PrevValue
act.PrevIndex = opts.PrevIndex
act.Recursive = opts.Recursive
} }
resp, body, err := k.client.Do(ctx, act) resp, body, err := k.client.Do(ctx, act)
@ -192,15 +198,20 @@ func (k *httpKeysAPI) RGet(ctx context.Context, key string) (*Response, error) {
return unmarshalHTTPResponse(resp.StatusCode, resp.Header, body) return unmarshalHTTPResponse(resp.StatusCode, resp.Header, body)
} }
func (k *httpKeysAPI) Watcher(key string, opts WatcherOptions) Watcher { func (k *httpKeysAPI) Watcher(key string, opts *WatcherOptions) Watcher {
act := waitAction{
Prefix: k.prefix,
Key: key,
}
if opts != nil {
act.WaitIndex = opts.WaitIndex
act.Recursive = opts.Recursive
}
return &httpWatcher{ return &httpWatcher{
client: k.client, client: k.client,
nextWait: waitAction{ nextWait: act,
Prefix: k.prefix,
Key: key,
WaitIndex: opts.WaitIndex,
Recursive: opts.Recursive,
},
} }
} }

View File

@ -186,7 +186,7 @@ func (d *discovery) createSelf(contents string) error {
} }
// ensure self appears on the server we connected to // ensure self appears on the server we connected to
w := d.c.Watcher(d.selfKey(), client.WatcherOptions{WaitIndex: resp.Node.CreatedIndex}) w := d.c.Watcher(d.selfKey(), &client.WatcherOptions{WaitIndex: resp.Node.CreatedIndex})
_, err = w.Next(context.Background()) _, err = w.Next(context.Background())
return err return err
} }
@ -275,7 +275,7 @@ func (d *discovery) waitNodes(nodes client.Nodes, size int, index uint64) (clien
nodes = nodes[:size] nodes = nodes[:size]
} }
// watch from the next index // watch from the next index
w := d.c.Watcher(d.cluster, client.WatcherOptions{WaitIndex: index + 1, Recursive: true}) w := d.c.Watcher(d.cluster, &client.WatcherOptions{WaitIndex: index + 1, Recursive: true})
all := make(client.Nodes, len(nodes)) all := make(client.Nodes, len(nodes))
copy(all, nodes) copy(all, nodes)
for _, n := range all { for _, n := range all {

View File

@ -431,7 +431,7 @@ func (c *clientWithResp) Get(ctx context.Context, key string) (*client.Response,
return r, nil return r, nil
} }
func (c *clientWithResp) Watcher(key string, opts client.WatcherOptions) client.Watcher { func (c *clientWithResp) Watcher(key string, opts *client.WatcherOptions) client.Watcher {
return c.w return c.w
} }
@ -449,7 +449,7 @@ func (c *clientWithErr) Get(ctx context.Context, key string) (*client.Response,
return &client.Response{}, c.err return &client.Response{}, c.err
} }
func (c *clientWithErr) Watcher(key string, opts client.WatcherOptions) client.Watcher { func (c *clientWithErr) Watcher(key string, opts *client.WatcherOptions) client.Watcher {
return c.w return c.w
} }

View File

@ -142,7 +142,7 @@ func TestForceNewCluster(t *testing.T) {
cancel() cancel()
// ensure create has been applied in this machine // ensure create has been applied in this machine
ctx, cancel = context.WithTimeout(context.Background(), requestTimeout) ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
if _, err := kapi.Watcher("/foo", client.WatcherOptions{WaitIndex: resp.Node.ModifiedIndex}).Next(ctx); err != nil { if _, err := kapi.Watcher("/foo", &client.WatcherOptions{WaitIndex: resp.Node.ModifiedIndex}).Next(ctx); err != nil {
t.Fatalf("unexpected watch error: %v", err) t.Fatalf("unexpected watch error: %v", err)
} }
cancel() cancel()
@ -163,7 +163,7 @@ func TestForceNewCluster(t *testing.T) {
kapi = client.NewKeysAPI(cc) kapi = client.NewKeysAPI(cc)
// ensure force restart keep the old data, and new cluster can make progress // ensure force restart keep the old data, and new cluster can make progress
ctx, cancel = context.WithTimeout(context.Background(), requestTimeout) ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
if _, err := kapi.Watcher("/foo", client.WatcherOptions{WaitIndex: resp.Node.ModifiedIndex}).Next(ctx); err != nil { if _, err := kapi.Watcher("/foo", &client.WatcherOptions{WaitIndex: resp.Node.ModifiedIndex}).Next(ctx); err != nil {
t.Fatalf("unexpected watch error: %v", err) t.Fatalf("unexpected watch error: %v", err)
} }
cancel() cancel()
@ -189,7 +189,7 @@ func clusterMustProgress(t *testing.T, membs []*member) {
mcc := mustNewHTTPClient(t, []string{u}) mcc := mustNewHTTPClient(t, []string{u})
mkapi := client.NewKeysAPI(mcc) mkapi := client.NewKeysAPI(mcc)
mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout) mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout)
if _, err := mkapi.Watcher(key, client.WatcherOptions{WaitIndex: resp.Node.ModifiedIndex}).Next(mctx); err != nil { if _, err := mkapi.Watcher(key, &client.WatcherOptions{WaitIndex: resp.Node.ModifiedIndex}).Next(mctx); err != nil {
t.Fatalf("#%d: watch on %s error: %v", i, u, err) t.Fatalf("#%d: watch on %s error: %v", i, u, err)
} }
mcancel() mcancel()