From a2902c08a9b63d52b2d2d4df2a31e2b1be5a8035 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 20 Feb 2016 20:54:13 -0800 Subject: [PATCH] etcdctlv3: make watch interactive mode better --- etcdctlv3/README.md | 28 ++++++-- etcdctlv3/command/txn_command.go | 6 -- etcdctlv3/command/util.go | 6 ++ etcdctlv3/command/watch_command.go | 105 +++++++++++------------------ 4 files changed, 69 insertions(+), 76 deletions(-) diff --git a/etcdctlv3/README.md b/etcdctlv3/README.md index 27920a692..1a6a5e0ce 100644 --- a/etcdctlv3/README.md +++ b/etcdctlv3/README.md @@ -189,11 +189,19 @@ Watch watches events stream on keys or prefixes. The watch command runs until it - rev -- the revision to start watching. Specifying a revision is useful for observing past events. +#### Input Format + +Input is only accepted for interactive mode. + +``` +watch [options] \n +``` + #### Return value -Simple reply +##### Simple reply -- \\\n\\n\\\n\... +- \\n\\n\\n\\n\\n\\n... - Additional error string if WATCH failed. Exit code is non-zero. @@ -201,6 +209,8 @@ TODO: probably json and binary encoded proto #### Examples +##### Non-interactive + ``` bash ./etcdctl watch foo PUT @@ -208,9 +218,19 @@ foo bar ``` -#### Notes +##### Interactive -TODO: doc interactive mode +``` bash +./etcdctl watch -i +watch foo +watch foo +PUT +foo +bar +PUT +foo +bar +``` ## Utility Commands diff --git a/etcdctlv3/command/txn_command.go b/etcdctlv3/command/txn_command.go index 91acf99a7..37c24aab5 100644 --- a/etcdctlv3/command/txn_command.go +++ b/etcdctlv3/command/txn_command.go @@ -18,7 +18,6 @@ import ( "bufio" "fmt" "os" - "regexp" "strconv" "strings" @@ -117,11 +116,6 @@ func readOps(r *bufio.Reader) (ops []clientv3.Op) { return ops } -func argify(s string) []string { - r := regexp.MustCompile("'.+'|\".+\"|\\S+") - return r.FindAllString(s, -1) -} - func parseRequestUnion(line string) (*clientv3.Op, error) { args := argify(line) if len(args) < 2 { diff --git a/etcdctlv3/command/util.go b/etcdctlv3/command/util.go index 0f7799c9a..c74bbe02d 100644 --- a/etcdctlv3/command/util.go +++ b/etcdctlv3/command/util.go @@ -17,6 +17,7 @@ package command import ( "encoding/hex" "fmt" + "regexp" pb "github.com/coreos/etcd/storage/storagepb" ) @@ -41,3 +42,8 @@ func addHexPrefix(s string) string { } return string(ns) } + +func argify(s string) []string { + r := regexp.MustCompile("'.+'|\".+\"|\\S+") + return r.FindAllString(s, -1) +} diff --git a/etcdctlv3/command/watch_command.go b/etcdctlv3/command/watch_command.go index 98668c73e..686574fed 100644 --- a/etcdctlv3/command/watch_command.go +++ b/etcdctlv3/command/watch_command.go @@ -17,15 +17,12 @@ package command import ( "bufio" "fmt" - "io" "os" - "strconv" "strings" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/clientv3" - pb "github.com/coreos/etcd/etcdserver/etcdserverpb" ) var ( @@ -71,12 +68,7 @@ func watchCommandFunc(cmd *cobra.Command, args []string) { } else { wc = w.Watch(context.TODO(), args[0], watchRev) } - for resp := range wc { - for _, e := range resp.Events { - fmt.Println(e.Type) - printKV(watchHex, e.Kv) - } - } + printWatchCh(wc, watchHex) err := w.Close() if err == nil { ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server")) @@ -85,12 +77,8 @@ func watchCommandFunc(cmd *cobra.Command, args []string) { } func watchInteractiveFunc(cmd *cobra.Command, args []string) { - wStream, err := mustClientFromCmd(cmd).Watch.Watch(context.TODO()) - if err != nil { - ExitWithError(ExitBadConnection, err) - } - - go recvLoop(wStream) + c := mustClientFromCmd(cmd) + w := clientv3.NewWatcher(c) reader := bufio.NewReader(os.Stdin) @@ -101,67 +89,52 @@ func watchInteractiveFunc(cmd *cobra.Command, args []string) { } l = strings.TrimSuffix(l, "\n") - // TODO: support start and end revision - segs := strings.Split(l, " ") - if len(segs) != 2 { - fmt.Fprintf(os.Stderr, "Invalid watch request format: use \"watch [key]\", \"watchprefix [prefix]\" or \"cancel [watcher ID]\"\n") + args := argify(l) + if len(args) < 2 { + fmt.Fprintf(os.Stderr, "Invalid command %s (command type or key is not provided)\n", l) continue } - var r *pb.WatchRequest - switch segs[0] { - case "watch": - r = &pb.WatchRequest{ - RequestUnion: &pb.WatchRequest_CreateRequest{ - CreateRequest: &pb.WatchCreateRequest{ - Key: []byte(segs[1])}}} - case "watchprefix": - r = &pb.WatchRequest{ - RequestUnion: &pb.WatchRequest_CreateRequest{ - CreateRequest: &pb.WatchCreateRequest{ - Prefix: []byte(segs[1])}}} - case "cancel": - id, perr := strconv.ParseInt(segs[1], 10, 64) - if perr != nil { - fmt.Fprintf(os.Stderr, "Invalid cancel ID (%v)\n", perr) - continue - } - r = &pb.WatchRequest{ - RequestUnion: &pb.WatchRequest_CancelRequest{ - CancelRequest: &pb.WatchCancelRequest{ - WatchId: id}}} - default: - fmt.Fprintf(os.Stderr, "Invalid watch request type: use watch, watchprefix or cancel\n") + if args[0] != "watch" { + fmt.Fprintf(os.Stderr, "Invalid command %s (only support watch)\n", l) continue } - err = wStream.Send(r) + flagset := NewWatchCommand().Flags() + err = flagset.Parse(args[1:]) if err != nil { - fmt.Fprintf(os.Stderr, "Error sending request to server: %v\n", err) + fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err) + continue } + moreargs := flagset.Args() + if len(moreargs) != 1 { + fmt.Fprintf(os.Stderr, "Invalid command %s (Too many arguments)\n", l) + continue + } + var key string + _, err = fmt.Sscanf(moreargs[0], "%q", &key) + if err != nil { + key = moreargs[0] + } + var ch clientv3.WatchChan + if watchPrefix { + ch = w.WatchPrefix(context.TODO(), key, watchRev) + } else { + ch = w.Watch(context.TODO(), key, watchRev) + } + go printWatchCh(ch, watchHex) } } -func recvLoop(wStream pb.Watch_WatchClient) { - for { - resp, err := wStream.Recv() - if err == io.EOF { - os.Exit(ExitSuccess) - } - if err != nil { - ExitWithError(ExitError, err) - } - - switch { - // TODO: handle canceled/compacted and other control response types - case resp.Created: - fmt.Printf("watcher created: id %08x\n", resp.WatchId) - case resp.Canceled: - fmt.Printf("watcher canceled: id %08x\n", resp.WatchId) - default: - for _, ev := range resp.Events { - fmt.Printf("%s: %s %s\n", ev.Type, string(ev.Kv.Key), string(ev.Kv.Value)) - } - } +func printWatchCh(ch clientv3.WatchChan, hex bool) { + for resp := range ch { + printWatchResponse(resp, hex) + } +} + +func printWatchResponse(resp clientv3.WatchResponse, hex bool) { + for _, e := range resp.Events { + fmt.Println(e.Type) + printKV(hex, e.Kv) } }