From b98f67095e3c3a6f5fefeff2b20a82e4c57e5ca9 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Thu, 31 Mar 2016 11:02:01 -0700 Subject: [PATCH] e2e: add basic v3 watch test --- e2e/etcdctlv3_test.go | 213 +++++++++++++++++++++++++++++++----------- 1 file changed, 159 insertions(+), 54 deletions(-) diff --git a/e2e/etcdctlv3_test.go b/e2e/etcdctlv3_test.go index 55c9c5d42..959f0ec94 100644 --- a/e2e/etcdctlv3_test.go +++ b/e2e/etcdctlv3_test.go @@ -17,6 +17,7 @@ package e2e import ( "fmt" "os" + "strconv" "strings" "testing" "time" @@ -24,78 +25,163 @@ import ( "github.com/coreos/etcd/pkg/testutil" ) -func TestCtlV3Set(t *testing.T) { - testCtlV3Set(t, &configNoTLS, 3*time.Second, false) +func TestCtlV3Put(t *testing.T) { testCtl(t, putTest) } +func TestCtlV3PutTimeout(t *testing.T) { testCtl(t, putTest, withDialTimeout(0)) } +func TestCtlV3PutTimeoutQuorum(t *testing.T) { testCtl(t, putTest, withDialTimeout(0), withQuorum()) } +func TestCtlV3PutAutoTLS(t *testing.T) { testCtl(t, putTest, withCfg(configAutoTLS)) } +func TestCtlV3PutPeerTLS(t *testing.T) { testCtl(t, putTest, withCfg(configPeerTLS)) } +func TestCtlV3PutClientTLS(t *testing.T) { testCtl(t, putTest, withCfg(configClientTLS)) } + +func TestCtlV3Watch(t *testing.T) { testCtl(t, watchTest) } +func TestCtlV3WatchAutoTLS(t *testing.T) { testCtl(t, watchTest, withCfg(configAutoTLS)) } +func TestCtlV3WatchPeerTLS(t *testing.T) { testCtl(t, watchTest, withCfg(configPeerTLS)) } + +// TODO: Watch with client TLS is not working +// func TestCtlV3WatchClientTLS(t *testing.T) { +// testCtl(t, watchTest, withCfg(configClientTLS)) +// } + +func TestCtlV3WatchInteractive(t *testing.T) { testCtl(t, watchTest, withInteractive()) } +func TestCtlV3WatchInteractiveAutoTLS(t *testing.T) { + testCtl(t, watchTest, withInteractive(), withCfg(configAutoTLS)) +} +func TestCtlV3WatchInteractivePeerTLS(t *testing.T) { + testCtl(t, watchTest, withInteractive(), withCfg(configPeerTLS)) } -func TestCtlV3SetZeroTimeout(t *testing.T) { - testCtlV3Set(t, &configNoTLS, 0, false) +// TODO: Watch with client TLS is not working +// func TestCtlV3WatchInteractiveClientTLS(t *testing.T) { +// testCtl(t, watchTest, withInteractive(), withCfg(configClientTLS)) +// } + +type ctlCtx struct { + t *testing.T + cfg etcdProcessClusterConfig + epc *etcdProcessCluster + + errc chan error + dialTimeout time.Duration + + quorum bool + interactive bool + watchRevision int } -func TestCtlV3SetTimeout(t *testing.T) { - testCtlV3Set(t, &configNoTLS, time.Nanosecond, false) +type ctlOption func(*ctlCtx) + +func (cx *ctlCtx) applyOpts(opts []ctlOption) { + for _, opt := range opts { + opt(cx) + } } -func TestCtlV3SetPeerTLS(t *testing.T) { - testCtlV3Set(t, &configPeerTLS, 3*time.Second, false) +func withCfg(cfg etcdProcessClusterConfig) ctlOption { + return func(cx *ctlCtx) { cx.cfg = cfg } } -func TestCtlV3SetQuorum(t *testing.T) { - testCtlV3Set(t, &configNoTLS, 3*time.Second, true) +func withDialTimeout(timeout time.Duration) ctlOption { + return func(cx *ctlCtx) { cx.dialTimeout = timeout } } -func TestCtlV3SetQuorumZeroTimeout(t *testing.T) { - testCtlV3Set(t, &configNoTLS, 0, true) +func withQuorum() ctlOption { + return func(cx *ctlCtx) { cx.quorum = true } } -func TestCtlV3SetQuorumTimeout(t *testing.T) { - testCtlV3Set(t, &configNoTLS, time.Nanosecond, true) +func withInteractive() ctlOption { + return func(cx *ctlCtx) { cx.interactive = true } } -func TestCtlV3SetPeerTLSQuorum(t *testing.T) { - testCtlV3Set(t, &configPeerTLS, 3*time.Second, true) +func withWatchRevision(rev int) ctlOption { + return func(cx *ctlCtx) { cx.watchRevision = rev } } -func testCtlV3Set(t *testing.T, cfg *etcdProcessClusterConfig, dialTimeout time.Duration, quorum bool) { +func setupCtlV3Test(t *testing.T, cfg etcdProcessClusterConfig, quorum bool) *etcdProcessCluster { + mustEtcdctl(t) + if !quorum { + cfg = *configStandalone(cfg) + } + epc, err := newEtcdProcessCluster(&cfg) + if err != nil { + t.Fatalf("could not start etcd process cluster (%v)", err) + } + return epc +} + +func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) { defer testutil.AfterTest(t) + var ( + defaultDialTimeout = 7 * time.Second + defaultWatchRevision = 1 + ) + ret := ctlCtx{ + t: t, + cfg: configNoTLS, + errc: make(chan error, 1), + dialTimeout: defaultDialTimeout, + watchRevision: defaultWatchRevision, + } + ret.applyOpts(opts) + os.Setenv("ETCDCTL_API", "3") - epc := setupCtlV3Test(t, cfg, quorum) + ret.epc = setupCtlV3Test(ret.t, ret.cfg, ret.quorum) + defer func() { os.Unsetenv("ETCDCTL_API") - if errC := epc.Close(); errC != nil { + if errC := ret.epc.Close(); errC != nil { t.Fatalf("error closing etcd processes (%v)", errC) } }() + go testFunc(ret) + + select { + case <-time.After(2*ret.dialTimeout + time.Second): + if ret.dialTimeout > 0 { + t.Fatalf("test timed out for %v", ret.dialTimeout) + } + case err := <-ret.errc: + if err != nil { + t.Fatal(err) + } + } + return +} + +func putTest(cx ctlCtx) { key, value := "foo", "bar" - errc := make(chan error, 1) - expectTimeout := dialTimeout > 0 && dialTimeout <= time.Nanosecond - go func() { - defer close(errc) - if err := ctlV3Put(epc, key, value, dialTimeout); err != nil { - if expectTimeout && isGRPCTimedout(err) { - errc <- fmt.Errorf("put error (%v)", err) - return - } + defer close(cx.errc) + + if err := ctlV3Put(cx, key, value); err != nil { + if cx.dialTimeout > 0 && isGRPCTimedout(err) { + cx.errc <- fmt.Errorf("put error (%v)", err) + return } - if err := ctlV3Get(epc, key, value, dialTimeout, quorum); err != nil { - if expectTimeout && isGRPCTimedout(err) { - errc <- fmt.Errorf("get error (%v)", err) - return - } + } + if err := ctlV3Get(cx, key, value); err != nil { + if cx.dialTimeout > 0 && isGRPCTimedout(err) { + cx.errc <- fmt.Errorf("get error (%v)", err) + return + } + } +} + +func watchTest(cx ctlCtx) { + key, value := "foo", "bar" + + defer close(cx.errc) + + go func() { + if err := ctlV3Put(cx, key, value); err != nil { + cx.t.Fatal(err) } }() - select { - case <-time.After(2*dialTimeout + time.Second): - if dialTimeout > 0 { - t.Fatalf("test timed out for %v", dialTimeout) - } - case err := <-errc: - if err != nil { - t.Fatal(err) + if err := ctlV3Watch(cx, key, value); err != nil { + if cx.dialTimeout > 0 && isGRPCTimedout(err) { + cx.errc <- fmt.Errorf("watch error (%v)", err) + return } } } @@ -120,30 +206,49 @@ func ctlV3PrefixArgs(clus *etcdProcessCluster, dialTimeout time.Duration) []stri return cmdArgs } -func ctlV3Put(clus *etcdProcessCluster, key, value string, dialTimeout time.Duration) error { - cmdArgs := append(ctlV3PrefixArgs(clus, dialTimeout), "put", key, value) +func ctlV3Put(cx ctlCtx, key, value string) error { + cmdArgs := append(ctlV3PrefixArgs(cx.epc, cx.dialTimeout), "put", key, value) return spawnWithExpect(cmdArgs, "OK") } -func ctlV3Get(clus *etcdProcessCluster, key, value string, dialTimeout time.Duration, quorum bool) error { - cmdArgs := append(ctlV3PrefixArgs(clus, dialTimeout), "get", key) - if !quorum { +func ctlV3Get(cx ctlCtx, key, value string) error { + cmdArgs := append(ctlV3PrefixArgs(cx.epc, cx.dialTimeout), "get", key) + if !cx.quorum { cmdArgs = append(cmdArgs, "--consistency", "s") } return spawnWithExpects(cmdArgs, key, value) } -func setupCtlV3Test(t *testing.T, cfg *etcdProcessClusterConfig, quorum bool) *etcdProcessCluster { - mustEtcdctl(t) - if !quorum { - cfg = configStandalone(*cfg) +func ctlV3Watch(cx ctlCtx, key, value string) error { + cmdArgs := append(ctlV3PrefixArgs(cx.epc, cx.dialTimeout), "watch") + if !cx.interactive { + if cx.watchRevision > 0 { + cmdArgs = append(cmdArgs, "--rev", strconv.Itoa(cx.watchRevision)) + } + cmdArgs = append(cmdArgs, key) + return spawnWithExpects(cmdArgs, key, value) } - copied := *cfg - epc, err := newEtcdProcessCluster(&copied) + cmdArgs = append(cmdArgs, "--interactive") + proc, err := spawnCmd(cmdArgs) if err != nil { - t.Fatalf("could not start etcd process cluster (%v)", err) + return err } - return epc + watchLine := fmt.Sprintf("watch %s", key) + if cx.watchRevision > 0 { + watchLine = fmt.Sprintf("watch %s --rev %d", key, cx.watchRevision) + } + if err = proc.SendLine(watchLine); err != nil { + return err + } + _, err = proc.Expect(key) + if err != nil { + return err + } + _, err = proc.Expect(value) + if err != nil { + return err + } + return proc.Close() } func isGRPCTimedout(err error) bool {