From d430c7baf70404e4df6d8a1697a41df118411c24 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 24 Feb 2016 22:39:43 -0800 Subject: [PATCH] clientv3: use default client watcher --- clientv3/concurrency/key.go | 4 +--- clientv3/example_watch_test.go | 10 ++-------- clientv3/mirror/syncer.go | 18 +----------------- contrib/recipes/watch.go | 12 ++++-------- etcdctlv3/command/snapshot_command.go | 4 +--- etcdctlv3/command/watch_command.go | 11 ++++------- 6 files changed, 13 insertions(+), 46 deletions(-) diff --git a/clientv3/concurrency/key.go b/clientv3/concurrency/key.go index b741f97b4..e16de51b4 100644 --- a/clientv3/concurrency/key.go +++ b/clientv3/concurrency/key.go @@ -40,9 +40,7 @@ func NewUniqueKey(ctx context.Context, kv v3.KV, pfx string, opts ...v3.OpOption } func waitUpdate(ctx context.Context, client *v3.Client, key string, opts ...v3.OpOption) error { - w := v3.NewWatcher(client) - defer w.Close() - wc := w.Watch(ctx, key, opts...) + wc := client.Watch(ctx, key, opts...) if wc == nil { return ctx.Err() } diff --git a/clientv3/example_watch_test.go b/clientv3/example_watch_test.go index 8c49821ac..4cee69d78 100644 --- a/clientv3/example_watch_test.go +++ b/clientv3/example_watch_test.go @@ -32,10 +32,7 @@ func ExampleWatcher_watch() { } defer cli.Close() - wc := clientv3.NewWatcher(cli) - defer wc.Close() - - rch := wc.Watch(context.Background(), "foo") + rch := cli.Watch(context.Background(), "foo") for wresp := range rch { for _, ev := range wresp.Events { fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) @@ -54,10 +51,7 @@ func ExampleWatcher_watchPrefix() { } defer cli.Close() - wc := clientv3.NewWatcher(cli) - defer wc.Close() - - rch := wc.Watch(context.Background(), "foo", clientv3.WithPrefix()) + rch := cli.Watch(context.Background(), "foo", clientv3.WithPrefix()) for wresp := range rch { for _, ev := range wresp.Events { fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) diff --git a/clientv3/mirror/syncer.go b/clientv3/mirror/syncer.go index f9faaed3d..a07a40f6f 100644 --- a/clientv3/mirror/syncer.go +++ b/clientv3/mirror/syncer.go @@ -106,21 +106,5 @@ func (s *syncer) SyncUpdates(ctx context.Context) clientv3.WatchChan { if s.rev == 0 { panic("unexpected revision = 0. Calling SyncUpdates before SyncBase finishes?") } - - respchan := make(chan clientv3.WatchResponse, 1024) - - go func() { - wapi := clientv3.NewWatcher(s.c) - defer wapi.Close() - defer close(respchan) - - // get all events since revision (or get non-compacted revision, if - // rev is too far behind) - wch := wapi.Watch(ctx, s.prefix, clientv3.WithPrefix(), clientv3.WithRev(s.rev)) - for wr := range wch { - respchan <- wr - } - }() - - return respchan + return s.c.Watch(ctx, s.prefix, clientv3.WithPrefix(), clientv3.WithRev(s.rev)) } diff --git a/contrib/recipes/watch.go b/contrib/recipes/watch.go index 508582d6f..9fe70c795 100644 --- a/contrib/recipes/watch.go +++ b/contrib/recipes/watch.go @@ -22,23 +22,19 @@ import ( // WaitEvents waits on a key until it observes the given events and returns the final one. func WaitEvents(c *clientv3.Client, key string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) { - w := clientv3.NewWatcher(c) - wc := w.Watch(context.Background(), key, clientv3.WithRev(rev)) + wc := c.Watch(context.Background(), key, clientv3.WithRev(rev)) if wc == nil { - w.Close() return nil, ErrNoWatcher } - return waitEvents(wc, evs), w.Close() + return waitEvents(wc, evs), nil } func WaitPrefixEvents(c *clientv3.Client, prefix string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) { - w := clientv3.NewWatcher(c) - wc := w.Watch(context.Background(), prefix, clientv3.WithPrefix(), clientv3.WithRev(rev)) + wc := c.Watch(context.Background(), prefix, clientv3.WithPrefix(), clientv3.WithRev(rev)) if wc == nil { - w.Close() return nil, ErrNoWatcher } - return waitEvents(wc, evs), w.Close() + return waitEvents(wc, evs), nil } func waitEvents(wc clientv3.WatchChan, evs []storagepb.Event_EventType) *storagepb.Event { diff --git a/etcdctlv3/command/snapshot_command.go b/etcdctlv3/command/snapshot_command.go index 0dcc868cb..7931d8adb 100644 --- a/etcdctlv3/command/snapshot_command.go +++ b/etcdctlv3/command/snapshot_command.go @@ -52,9 +52,7 @@ func snapshotCommandFunc(cmd *cobra.Command, args []string) { // snapshotToStdout streams a snapshot over stdout func snapshotToStdout(c *clientv3.Client) { // must explicitly fetch first revision since no retry on stdout - wapi := clientv3.NewWatcher(c) - defer wapi.Close() - wr := <-wapi.Watch(context.TODO(), "", clientv3.WithPrefix(), clientv3.WithRev(1)) + wr := <-c.Watch(context.TODO(), "", clientv3.WithPrefix(), clientv3.WithRev(1)) if len(wr.Events) > 0 { wr.CompactRevision = 1 } diff --git a/etcdctlv3/command/watch_command.go b/etcdctlv3/command/watch_command.go index 332345008..ba8b7c419 100644 --- a/etcdctlv3/command/watch_command.go +++ b/etcdctlv3/command/watch_command.go @@ -57,16 +57,14 @@ func watchCommandFunc(cmd *cobra.Command, args []string) { ExitWithError(ExitBadArgs, fmt.Errorf("watch in non-interactive mode requires an argument as key or prefix")) } - c := mustClientFromCmd(cmd) - w := clientv3.NewWatcher(c) - opts := []clientv3.OpOption{clientv3.WithRev(watchRev)} if watchPrefix { opts = append(opts, clientv3.WithPrefix()) } - wc := w.Watch(context.TODO(), args[0], opts...) + c := mustClientFromCmd(cmd) + wc := c.Watch(context.TODO(), args[0], opts...) printWatchCh(wc) - err := w.Close() + err := c.Close() if err == nil { ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server")) } @@ -75,7 +73,6 @@ func watchCommandFunc(cmd *cobra.Command, args []string) { func watchInteractiveFunc(cmd *cobra.Command, args []string) { c := mustClientFromCmd(cmd) - w := clientv3.NewWatcher(c) reader := bufio.NewReader(os.Stdin) @@ -117,7 +114,7 @@ func watchInteractiveFunc(cmd *cobra.Command, args []string) { if watchPrefix { opts = append(opts, clientv3.WithPrefix()) } - ch := w.Watch(context.TODO(), key, opts...) + ch := c.Watch(context.TODO(), key, opts...) go printWatchCh(ch) } }