etcdctl: exec_watch use etcd/client

This commit is contained in:
Xiang Li 2015-06-05 17:06:24 -07:00 committed by Yicheng Qin
parent 5b01b3877f
commit 759c156e3e

View File

@ -23,7 +23,8 @@ import (
"os/signal" "os/signal"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/client"
) )
// NewExecWatchCommand returns the CLI command for "exec-watch". // NewExecWatchCommand returns the CLI command for "exec-watch".
@ -36,24 +37,22 @@ func NewExecWatchCommand() cli.Command {
cli.BoolFlag{Name: "recursive", Usage: "watch all values for key and child keys"}, cli.BoolFlag{Name: "recursive", Usage: "watch all values for key and child keys"},
}, },
Action: func(c *cli.Context) { Action: func(c *cli.Context) {
handleKey(c, execWatchCommandFunc) execWatchCommandFunc(c, mustNewKeyAPI(c))
}, },
} }
} }
// execWatchCommandFunc executes the "exec-watch" command. // execWatchCommandFunc executes the "exec-watch" command.
func execWatchCommandFunc(c *cli.Context, client *etcd.Client) (*etcd.Response, error) { func execWatchCommandFunc(c *cli.Context, ki client.KeysAPI) {
_ = io.Copy
_ = exec.Command
args := c.Args() args := c.Args()
argsLen := len(args) argslen := len(args)
if argsLen < 2 { if argslen < 2 {
return nil, errors.New("Key and command to exec required") handleError(ExitBadArgs, errors.New("key and command to exec required"))
} }
key := args[argsLen-1] key := args[argslen-1]
cmdArgs := args[:argsLen-1] cmdArgs := args[:argslen-1]
index := 0 index := 0
if c.Int("after-index") != 0 { if c.Int("after-index") != 0 {
@ -70,19 +69,24 @@ func execWatchCommandFunc(c *cli.Context, client *etcd.Client) (*etcd.Response,
sigch := make(chan os.Signal, 1) sigch := make(chan os.Signal, 1)
signal.Notify(sigch, os.Interrupt) signal.Notify(sigch, os.Interrupt)
stop := make(chan bool)
go func() { go func() {
<-sigch <-sigch
stop <- true
os.Exit(0) os.Exit(0)
}() }()
receiver := make(chan *etcd.Response) w := ki.Watcher(key, &client.WatcherOptions{AfterIndex: uint64(index), Recursive: recursive})
go client.Watch(key, uint64(index), recursive, receiver, stop)
for { for {
resp := <-receiver resp, err := w.Next(context.TODO())
if err != nil {
handleError(ExitServerError, err)
}
if resp.Node.Dir {
fmt.Fprintf(os.Stderr, "Ignored dir %s change", resp.Node.Key)
continue
}
cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...) cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...)
cmd.Env = environResponse(resp, os.Environ()) cmd.Env = environResponse(resp, os.Environ())
@ -96,18 +100,21 @@ func execWatchCommandFunc(c *cli.Context, client *etcd.Client) (*etcd.Response,
fmt.Fprintf(os.Stderr, err.Error()) fmt.Fprintf(os.Stderr, err.Error())
os.Exit(1) os.Exit(1)
} }
err = cmd.Start()
if err != nil { go func() {
fmt.Fprintf(os.Stderr, err.Error()) err := cmd.Start()
os.Exit(1) if err != nil {
} fmt.Fprintf(os.Stderr, err.Error())
go io.Copy(os.Stdout, stdout) os.Exit(1)
go io.Copy(os.Stderr, stderr) }
cmd.Wait() go io.Copy(os.Stdout, stdout)
go io.Copy(os.Stderr, stderr)
cmd.Wait()
}()
} }
} }
func environResponse(resp *etcd.Response, env []string) []string { func environResponse(resp *client.Response, env []string) []string {
env = append(env, "ETCD_WATCH_ACTION="+resp.Action) env = append(env, "ETCD_WATCH_ACTION="+resp.Action)
env = append(env, "ETCD_WATCH_MODIFIED_INDEX="+fmt.Sprintf("%d", resp.Node.ModifiedIndex)) env = append(env, "ETCD_WATCH_MODIFIED_INDEX="+fmt.Sprintf("%d", resp.Node.ModifiedIndex))
env = append(env, "ETCD_WATCH_KEY="+resp.Node.Key) env = append(env, "ETCD_WATCH_KEY="+resp.Node.Key)