diff --git a/clientv3/example_watch_test.go b/clientv3/example_watch_test.go index 7c7ba1579..8c49821ac 100644 --- a/clientv3/example_watch_test.go +++ b/clientv3/example_watch_test.go @@ -35,7 +35,7 @@ func ExampleWatcher_watch() { wc := clientv3.NewWatcher(cli) defer wc.Close() - rch := wc.Watch(context.Background(), "foo", 0) + rch := wc.Watch(context.Background(), "foo") for wresp := range rch { for _, ev := range wresp.Events { fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) @@ -57,7 +57,7 @@ func ExampleWatcher_watchPrefix() { wc := clientv3.NewWatcher(cli) defer wc.Close() - rch := wc.WatchPrefix(context.Background(), "foo", 0) + rch := wc.Watch(context.Background(), "foo", clientv3.WithPrefix()) for wresp := range rch { for _, ev := range wresp.Events { fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go index 91779ec69..24b430f4d 100644 --- a/clientv3/integration/kv_test.go +++ b/clientv3/integration/kv_test.go @@ -350,7 +350,7 @@ func TestKVCompact(t *testing.T) { wc := clientv3.NewWatcher(clus.RandClient()) defer wc.Close() - wchan := wc.Watch(ctx, "foo", 3) + wchan := wc.Watch(ctx, "foo", clientv3.WithRev(3)) if wr := <-wchan; wr.CompactRevision != 7 { t.Fatalf("wchan CompactRevision got %v, want 7", wr.CompactRevision) diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index cf5b7f493..9939eafd4 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -73,7 +73,7 @@ func testWatchMultiWatcher(t *testing.T, wctx *watchctx) { for _, k := range keys { // key watcher go func(key string) { - ch := wctx.w.Watch(context.TODO(), key, 0) + ch := wctx.w.Watch(context.TODO(), key) if ch == nil { t.Fatalf("expected watcher channel, got nil") } @@ -94,7 +94,7 @@ func testWatchMultiWatcher(t *testing.T, wctx *watchctx) { } // prefix watcher on "b" (bar and baz) go func() { - prefixc := wctx.w.WatchPrefix(context.TODO(), "b", 0) + prefixc := wctx.w.Watch(context.TODO(), "b", clientv3.WithPrefix()) if prefixc == nil { t.Fatalf("expected watcher channel, got nil") } @@ -181,7 +181,7 @@ func testWatchReconnRequest(t *testing.T, wctx *watchctx) { } }() // should reconnect when requesting watch - if wctx.ch = wctx.w.Watch(context.TODO(), "a", 0); wctx.ch == nil { + if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil { t.Fatalf("expected non-nil channel") } @@ -200,7 +200,7 @@ func TestWatchReconnInit(t *testing.T) { } func testWatchReconnInit(t *testing.T, wctx *watchctx) { - if wctx.ch = wctx.w.Watch(context.TODO(), "a", 0); wctx.ch == nil { + if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil { t.Fatalf("expected non-nil channel") } // take down watcher connection @@ -216,7 +216,7 @@ func TestWatchReconnRunning(t *testing.T) { } func testWatchReconnRunning(t *testing.T, wctx *watchctx) { - if wctx.ch = wctx.w.Watch(context.TODO(), "a", 0); wctx.ch == nil { + if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil { t.Fatalf("expected non-nil channel") } putAndWatch(t, wctx, "a", "a") @@ -233,7 +233,7 @@ func TestWatchCancelInit(t *testing.T) { func testWatchCancelInit(t *testing.T, wctx *watchctx) { ctx, cancel := context.WithCancel(context.Background()) - if wctx.ch = wctx.w.Watch(ctx, "a", 0); wctx.ch == nil { + if wctx.ch = wctx.w.Watch(ctx, "a"); wctx.ch == nil { t.Fatalf("expected non-nil watcher channel") } cancel() @@ -254,7 +254,7 @@ func TestWatchCancelRunning(t *testing.T) { func testWatchCancelRunning(t *testing.T, wctx *watchctx) { ctx, cancel := context.WithCancel(context.Background()) - if wctx.ch = wctx.w.Watch(ctx, "a", 0); wctx.ch == nil { + if wctx.ch = wctx.w.Watch(ctx, "a"); wctx.ch == nil { t.Fatalf("expected non-nil watcher channel") } if _, err := wctx.kv.Put(ctx, "a", "a"); err != nil { diff --git a/clientv3/op.go b/clientv3/op.go index 7a5549720..9f76e2d39 100644 --- a/clientv3/op.go +++ b/clientv3/op.go @@ -15,6 +15,8 @@ package clientv3 import ( + "reflect" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/lease" ) @@ -36,10 +38,12 @@ type Op struct { // for range limit int64 - rev int64 sort *SortOption serializable bool + // for range, watch + rev int64 + // for put val []byte leaseID lease.LeaseID @@ -65,6 +69,27 @@ func (op Op) toRequestUnion() *pb.RequestUnion { } } +func (op Op) toWatchRequest() *watchRequest { + switch op.t { + case tRange: + key := string(op.key) + prefix := "" + if op.end != nil { + prefix = key + key = "" + } + wr := &watchRequest{ + key: key, + prefix: prefix, + rev: op.rev, + } + return wr + + default: + panic("Only for tRange") + } +} + func (op Op) isWrite() bool { return op.t != tRange } @@ -111,6 +136,24 @@ func OpPut(key, val string, opts ...OpOption) Op { return ret } +func opWatch(key string, opts ...OpOption) Op { + ret := Op{t: tRange, key: []byte(key)} + ret.applyOpts(opts) + switch { + case ret.end != nil && !reflect.DeepEqual(ret.end, getPrefix(ret.key)): + panic("only supports single keys or prefixes") + case ret.leaseID != 0: + panic("unexpected lease in watch") + case ret.limit != 0: + panic("unexpected limit in watch") + case ret.sort != nil: + panic("unexpected sort in watch") + case ret.serializable != false: + panic("unexpected serializable in watch") + } + return ret +} + func (op *Op) applyOpts(opts []OpOption) { for _, opt := range opts { opt(op) @@ -129,8 +172,7 @@ func WithLease(leaseID lease.LeaseID) OpOption { func WithLimit(n int64) OpOption { return func(op *Op) { op.limit = n } } // WithRev specifies the store revision for 'Get' request. -// -// TODO: support Watch API +// Or the start revision of 'Watch' request. func WithRev(rev int64) OpOption { return func(op *Op) { op.rev = rev } } // WithSort specifies the ordering in 'Get' request. It requires @@ -143,25 +185,28 @@ func WithSort(target SortTarget, order SortOrder) OpOption { } } -// WithPrefix enables 'Get' or 'Delete' requests to operate on the -// keys with matching prefix. For example, 'Get(foo, WithPrefix())' +func getPrefix(key []byte) []byte { + end := make([]byte, len(key)) + copy(end, key) + for i := len(end) - 1; i >= 0; i-- { + if end[i] < 0xff { + end[i] = end[i] + 1 + end = end[:i+1] + return end + } + } + // next prefix does not exist (e.g., 0xffff); + // default to WithFromKey policy + end = []byte{0} + return end +} + +// WithPrefix enables 'Get', 'Delete', or 'Watch' requests to operate +// on the keys with matching prefix. For example, 'Get(foo, WithPrefix())' // can return 'foo1', 'foo2', and so on. -// -// TODO: support Watch API func WithPrefix() OpOption { return func(op *Op) { - op.end = make([]byte, len(op.key)) - copy(op.end, op.key) - for i := len(op.end) - 1; i >= 0; i-- { - if op.end[i] < 0xff { - op.end[i] = op.end[i] + 1 - op.end = op.end[:i+1] - return - } - } - // next prefix does not exist (e.g., 0xffff); - // default to WithFromKey policy - op.end = []byte{0} + op.end = getPrefix(op.key) } } diff --git a/clientv3/sync/syncer.go b/clientv3/sync/syncer.go index d416c660c..59bf95cbd 100644 --- a/clientv3/sync/syncer.go +++ b/clientv3/sync/syncer.go @@ -116,7 +116,7 @@ func (s *syncer) SyncUpdates(ctx context.Context) clientv3.WatchChan { // get all events since revision (or get non-compacted revision, if // rev is too far behind) - wch := wapi.WatchPrefix(ctx, s.prefix, s.rev) + wch := wapi.Watch(ctx, s.prefix, clientv3.WithPrefix(), clientv3.WithRev(s.rev)) for wr := range wch { respchan <- wr } diff --git a/clientv3/watch.go b/clientv3/watch.go index cbc7df456..149497896 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -27,17 +27,12 @@ import ( type WatchChan <-chan WatchResponse type Watcher interface { - // Watch watches on a single key. The watched events will be returned + // Watch watches on a key or prefix. The watched events will be returned // through the returned channel. // If the watch is slow or the required rev is compacted, the watch request // might be canceled from the server-side and the chan will be closed. - Watch(ctx context.Context, key string, rev int64) WatchChan - - // WatchPrefix watches on a prefix. The watched events will be returned - // through the returned channel. - // If the watch is slow or the required rev is compacted, the watch request - // might be canceled from the server-side and the chan will be closed. - WatchPrefix(ctx context.Context, prefix string, rev int64) WatchChan + // 'opts' can be: 'WithRev' and/or 'WitchPrefix'. + Watch(ctx context.Context, key string, opts ...OpOption) WatchChan // Close closes the watcher and cancels all watch requests. Close() error @@ -127,27 +122,16 @@ func NewWatcher(c *Client) Watcher { return w } -func (w *watcher) Watch(ctx context.Context, key string, rev int64) WatchChan { - return w.watch(ctx, key, "", rev) -} +// Watch posts a watch request to run() and waits for a new watcher channel +func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan { + ow := opWatch(key, opts...) -func (w *watcher) WatchPrefix(ctx context.Context, prefix string, rev int64) WatchChan { - return w.watch(ctx, "", prefix, rev) -} + wr := ow.toWatchRequest() + wr.ctx = ctx -func (w *watcher) Close() error { - select { - case w.stopc <- struct{}{}: - case <-w.donec: - } - <-w.donec - return <-w.errc -} - -// watch posts a watch request to run() and waits for a new watcher channel -func (w *watcher) watch(ctx context.Context, key, prefix string, rev int64) WatchChan { retc := make(chan chan WatchResponse, 1) - wr := &watchRequest{ctx: ctx, key: key, prefix: prefix, rev: rev, retc: retc} + wr.retc = retc + // submit request select { case w.reqc <- wr: @@ -167,6 +151,15 @@ func (w *watcher) watch(ctx context.Context, key, prefix string, rev int64) Watc } } +func (w *watcher) Close() error { + select { + case w.stopc <- struct{}{}: + case <-w.donec: + } + <-w.donec + return <-w.errc +} + func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) { if pendingReq == nil { // no pending request; ignore diff --git a/contrib/recipes/watch.go b/contrib/recipes/watch.go index 7a869736c..508582d6f 100644 --- a/contrib/recipes/watch.go +++ b/contrib/recipes/watch.go @@ -23,7 +23,7 @@ import ( // WaitEvents waits on a key until it observes the given events and returns the final one. func WaitEvents(c *clientv3.Client, key string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) { w := clientv3.NewWatcher(c) - wc := w.Watch(context.Background(), key, rev) + wc := w.Watch(context.Background(), key, clientv3.WithRev(rev)) if wc == nil { w.Close() return nil, ErrNoWatcher @@ -33,7 +33,7 @@ func WaitEvents(c *clientv3.Client, key string, rev int64, evs []storagepb.Event func WaitPrefixEvents(c *clientv3.Client, prefix string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) { w := clientv3.NewWatcher(c) - wc := w.WatchPrefix(context.Background(), prefix, rev) + wc := w.Watch(context.Background(), prefix, clientv3.WithPrefix(), clientv3.WithRev(rev)) if wc == nil { w.Close() return nil, ErrNoWatcher diff --git a/etcdctlv3/command/snapshot_command.go b/etcdctlv3/command/snapshot_command.go index 4ea4446bb..3746630e3 100644 --- a/etcdctlv3/command/snapshot_command.go +++ b/etcdctlv3/command/snapshot_command.go @@ -54,7 +54,7 @@ func snapshotToStdout(c *clientv3.Client) { // must explicitly fetch first revision since no retry on stdout wapi := clientv3.NewWatcher(c) defer wapi.Close() - wr := <-wapi.WatchPrefix(context.TODO(), "", 1) + wr := <-wapi.Watch(context.TODO(), "", clientv3.WithPrefix(), clientv3.WithRev(1)) if len(wr.Events) > 0 { wr.CompactRevision = 1 } diff --git a/etcdctlv3/command/watch_command.go b/etcdctlv3/command/watch_command.go index 58defbefa..332345008 100644 --- a/etcdctlv3/command/watch_command.go +++ b/etcdctlv3/command/watch_command.go @@ -60,12 +60,11 @@ func watchCommandFunc(cmd *cobra.Command, args []string) { c := mustClientFromCmd(cmd) w := clientv3.NewWatcher(c) - var wc clientv3.WatchChan - if !watchPrefix { - wc = w.Watch(context.TODO(), args[0], watchRev) - } else { - wc = w.Watch(context.TODO(), args[0], watchRev) + opts := []clientv3.OpOption{clientv3.WithRev(watchRev)} + if watchPrefix { + opts = append(opts, clientv3.WithPrefix()) } + wc := w.Watch(context.TODO(), args[0], opts...) printWatchCh(wc) err := w.Close() if err == nil { @@ -114,12 +113,11 @@ func watchInteractiveFunc(cmd *cobra.Command, args []string) { if err != nil { key = moreargs[0] } - var ch clientv3.WatchChan + opts := []clientv3.OpOption{clientv3.WithRev(watchRev)} if watchPrefix { - ch = w.WatchPrefix(context.TODO(), key, watchRev) - } else { - ch = w.Watch(context.TODO(), key, watchRev) + opts = append(opts, clientv3.WithPrefix()) } + ch := w.Watch(context.TODO(), key, opts...) go printWatchCh(ch) } }