etcdctlv3: make watch interactive mode better

This commit is contained in:
Xiang Li 2016-02-20 20:54:13 -08:00
parent c48bafae85
commit a2902c08a9
4 changed files with 69 additions and 76 deletions

View File

@ -189,11 +189,19 @@ Watch watches events stream on keys or prefixes. The watch command runs until it
- rev -- the revision to start watching. Specifying a revision is useful for observing past events.
#### Input Format
Input is only accepted for interactive mode.
```
watch [options] <key or prefix>\n
```
#### Return value
Simple reply
##### Simple reply
- \<event\>\<key\>\n\<value\>\n\<event\>\<next_key\>\n\<next_value\>...
- \<event\>\n\<key\>\n\<value\>\n\<event\>\n\<next_key\>\n\<next_value\>\n...
- Additional error string if WATCH failed. Exit code is non-zero.
@ -201,6 +209,8 @@ TODO: probably json and binary encoded proto
#### Examples
##### Non-interactive
``` bash
./etcdctl watch foo
PUT
@ -208,9 +218,19 @@ foo
bar
```
#### Notes
##### Interactive
TODO: doc interactive mode
``` bash
./etcdctl watch -i
watch foo
watch foo
PUT
foo
bar
PUT
foo
bar
```
## Utility Commands

View File

@ -18,7 +18,6 @@ import (
"bufio"
"fmt"
"os"
"regexp"
"strconv"
"strings"
@ -117,11 +116,6 @@ func readOps(r *bufio.Reader) (ops []clientv3.Op) {
return ops
}
func argify(s string) []string {
r := regexp.MustCompile("'.+'|\".+\"|\\S+")
return r.FindAllString(s, -1)
}
func parseRequestUnion(line string) (*clientv3.Op, error) {
args := argify(line)
if len(args) < 2 {

View File

@ -17,6 +17,7 @@ package command
import (
"encoding/hex"
"fmt"
"regexp"
pb "github.com/coreos/etcd/storage/storagepb"
)
@ -41,3 +42,8 @@ func addHexPrefix(s string) string {
}
return string(ns)
}
func argify(s string) []string {
r := regexp.MustCompile("'.+'|\".+\"|\\S+")
return r.FindAllString(s, -1)
}

View File

@ -17,15 +17,12 @@ package command
import (
"bufio"
"fmt"
"io"
"os"
"strconv"
"strings"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
var (
@ -71,12 +68,7 @@ func watchCommandFunc(cmd *cobra.Command, args []string) {
} else {
wc = w.Watch(context.TODO(), args[0], watchRev)
}
for resp := range wc {
for _, e := range resp.Events {
fmt.Println(e.Type)
printKV(watchHex, e.Kv)
}
}
printWatchCh(wc, watchHex)
err := w.Close()
if err == nil {
ExitWithError(ExitInterrupted, fmt.Errorf("watch is canceled by the server"))
@ -85,12 +77,8 @@ func watchCommandFunc(cmd *cobra.Command, args []string) {
}
func watchInteractiveFunc(cmd *cobra.Command, args []string) {
wStream, err := mustClientFromCmd(cmd).Watch.Watch(context.TODO())
if err != nil {
ExitWithError(ExitBadConnection, err)
}
go recvLoop(wStream)
c := mustClientFromCmd(cmd)
w := clientv3.NewWatcher(c)
reader := bufio.NewReader(os.Stdin)
@ -101,67 +89,52 @@ func watchInteractiveFunc(cmd *cobra.Command, args []string) {
}
l = strings.TrimSuffix(l, "\n")
// TODO: support start and end revision
segs := strings.Split(l, " ")
if len(segs) != 2 {
fmt.Fprintf(os.Stderr, "Invalid watch request format: use \"watch [key]\", \"watchprefix [prefix]\" or \"cancel [watcher ID]\"\n")
args := argify(l)
if len(args) < 2 {
fmt.Fprintf(os.Stderr, "Invalid command %s (command type or key is not provided)\n", l)
continue
}
var r *pb.WatchRequest
switch segs[0] {
case "watch":
r = &pb.WatchRequest{
RequestUnion: &pb.WatchRequest_CreateRequest{
CreateRequest: &pb.WatchCreateRequest{
Key: []byte(segs[1])}}}
case "watchprefix":
r = &pb.WatchRequest{
RequestUnion: &pb.WatchRequest_CreateRequest{
CreateRequest: &pb.WatchCreateRequest{
Prefix: []byte(segs[1])}}}
case "cancel":
id, perr := strconv.ParseInt(segs[1], 10, 64)
if perr != nil {
fmt.Fprintf(os.Stderr, "Invalid cancel ID (%v)\n", perr)
continue
}
r = &pb.WatchRequest{
RequestUnion: &pb.WatchRequest_CancelRequest{
CancelRequest: &pb.WatchCancelRequest{
WatchId: id}}}
default:
fmt.Fprintf(os.Stderr, "Invalid watch request type: use watch, watchprefix or cancel\n")
if args[0] != "watch" {
fmt.Fprintf(os.Stderr, "Invalid command %s (only support watch)\n", l)
continue
}
err = wStream.Send(r)
flagset := NewWatchCommand().Flags()
err = flagset.Parse(args[1:])
if err != nil {
fmt.Fprintf(os.Stderr, "Error sending request to server: %v\n", err)
fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err)
continue
}
moreargs := flagset.Args()
if len(moreargs) != 1 {
fmt.Fprintf(os.Stderr, "Invalid command %s (Too many arguments)\n", l)
continue
}
var key string
_, err = fmt.Sscanf(moreargs[0], "%q", &key)
if err != nil {
key = moreargs[0]
}
var ch clientv3.WatchChan
if watchPrefix {
ch = w.WatchPrefix(context.TODO(), key, watchRev)
} else {
ch = w.Watch(context.TODO(), key, watchRev)
}
go printWatchCh(ch, watchHex)
}
}
func recvLoop(wStream pb.Watch_WatchClient) {
for {
resp, err := wStream.Recv()
if err == io.EOF {
os.Exit(ExitSuccess)
}
if err != nil {
ExitWithError(ExitError, err)
}
switch {
// TODO: handle canceled/compacted and other control response types
case resp.Created:
fmt.Printf("watcher created: id %08x\n", resp.WatchId)
case resp.Canceled:
fmt.Printf("watcher canceled: id %08x\n", resp.WatchId)
default:
for _, ev := range resp.Events {
fmt.Printf("%s: %s %s\n", ev.Type, string(ev.Kv.Key), string(ev.Kv.Value))
}
}
func printWatchCh(ch clientv3.WatchChan, hex bool) {
for resp := range ch {
printWatchResponse(resp, hex)
}
}
func printWatchResponse(resp clientv3.WatchResponse, hex bool) {
for _, e := range resp.Events {
fmt.Println(e.Type)
printKV(hex, e.Kv)
}
}