From ed682c9f08a517d2f89e513586b70d3ecbb1c593 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 3 Feb 2016 14:13:43 -0800 Subject: [PATCH 1/2] tools/functional-tester: minor cleanup --- tools/functional-tester/etcd-agent/agent_test.go | 12 +++++++++--- tools/functional-tester/etcd-agent/rpc.go | 1 + tools/functional-tester/etcd-agent/rpc_test.go | 8 ++++---- tools/functional-tester/etcd-tester/cluster.go | 8 ++++---- 4 files changed, 18 insertions(+), 11 deletions(-) diff --git a/tools/functional-tester/etcd-agent/agent_test.go b/tools/functional-tester/etcd-agent/agent_test.go index 031c14241..4d8ea24f6 100644 --- a/tools/functional-tester/etcd-agent/agent_test.go +++ b/tools/functional-tester/etcd-agent/agent_test.go @@ -23,20 +23,24 @@ import ( const etcdPath = "./etcd" func TestAgentStart(t *testing.T) { + defer os.Remove("etcd.log") + a, dir := newTestAgent(t) defer a.terminate() - err := a.start("-data-dir", dir) + err := a.start("--data-dir", dir) if err != nil { t.Fatal(err) } } func TestAgentRestart(t *testing.T) { + defer os.Remove("etcd.log") + a, dir := newTestAgent(t) defer a.terminate() - err := a.start("-data-dir", dir) + err := a.start("--data-dir", dir) if err != nil { t.Fatal(err) } @@ -52,9 +56,11 @@ func TestAgentRestart(t *testing.T) { } func TestAgentTerminate(t *testing.T) { + defer os.Remove("etcd.log") + a, dir := newTestAgent(t) - err := a.start("-data-dir", dir) + err := a.start("--data-dir", dir) if err != nil { t.Fatal(err) } diff --git a/tools/functional-tester/etcd-agent/rpc.go b/tools/functional-tester/etcd-agent/rpc.go index 32b857587..195f023de 100644 --- a/tools/functional-tester/etcd-agent/rpc.go +++ b/tools/functional-tester/etcd-agent/rpc.go @@ -30,6 +30,7 @@ func (a *Agent) serveRPC() { if e != nil { log.Fatal("agent:", e) } + log.Println("agent listening on :9027") go http.Serve(l, nil) } diff --git a/tools/functional-tester/etcd-agent/rpc_test.go b/tools/functional-tester/etcd-agent/rpc_test.go index bda72aa9c..4563c5310 100644 --- a/tools/functional-tester/etcd-agent/rpc_test.go +++ b/tools/functional-tester/etcd-agent/rpc_test.go @@ -43,7 +43,7 @@ func TestRPCStart(t *testing.T) { t.Fatal(err) } var pid int - err = c.Call("Agent.RPCStart", []string{"-data-dir", dir}, &pid) + err = c.Call("Agent.RPCStart", []string{"--data-dir", dir}, &pid) if err != nil { t.Fatal(err) } @@ -66,7 +66,7 @@ func TestRPCRestart(t *testing.T) { t.Fatal(err) } var pid int - err = c.Call("Agent.RPCStart", []string{"-data-dir", dir}, &pid) + err = c.Call("Agent.RPCStart", []string{"--data-dir", dir}, &pid) if err != nil { t.Fatal(err) } @@ -111,7 +111,7 @@ func TestRPCTerminate(t *testing.T) { t.Fatal(err) } var pid int - err = c.Call("Agent.RPCStart", []string{"-data-dir", dir}, &pid) + err = c.Call("Agent.RPCStart", []string{"--data-dir", dir}, &pid) if err != nil { t.Fatal(err) } @@ -146,7 +146,7 @@ func TestRPCStatus(t *testing.T) { t.Fatal(err) } var pid int - err = c.Call("Agent.RPCStart", []string{"-data-dir", dir}, &pid) + err = c.Call("Agent.RPCStart", []string{"--data-dir", dir}, &pid) if err != nil { t.Fatal(err) } diff --git a/tools/functional-tester/etcd-tester/cluster.go b/tools/functional-tester/etcd-tester/cluster.go index 8109fa4f7..ce40b33e1 100644 --- a/tools/functional-tester/etcd-tester/cluster.go +++ b/tools/functional-tester/etcd-tester/cluster.go @@ -23,7 +23,7 @@ import ( "time" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" - etcdclient "github.com/coreos/etcd/client" + clientv2 "github.com/coreos/etcd/client" "github.com/coreos/etcd/tools/functional-tester/etcd-agent/client" ) @@ -198,15 +198,15 @@ func (c *cluster) Status() ClusterStatus { // setHealthKey sets health key on all given urls. func setHealthKey(us []string) error { for _, u := range us { - cfg := etcdclient.Config{ + cfg := clientv2.Config{ Endpoints: []string{u}, } - c, err := etcdclient.New(cfg) + c, err := clientv2.New(cfg) if err != nil { return err } ctx, cancel := context.WithTimeout(context.Background(), time.Second) - kapi := etcdclient.NewKeysAPI(c) + kapi := clientv2.NewKeysAPI(c) _, err = kapi.Set(ctx, "health", "good", nil) cancel() if err != nil { From 7a3426a23123155e0199d7161c96c3d0c9d981f1 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 3 Feb 2016 14:30:05 -0800 Subject: [PATCH 2/2] tools/functional-tester/etcd-tester: support v3 kv storage --- .../functional-tester/etcd-tester/cluster.go | 109 +++++++++++++----- tools/functional-tester/etcd-tester/main.go | 13 ++- .../functional-tester/etcd-tester/stresser.go | 80 +++++++++++-- 3 files changed, 161 insertions(+), 41 deletions(-) diff --git a/tools/functional-tester/etcd-tester/cluster.go b/tools/functional-tester/etcd-tester/cluster.go index ce40b33e1..3e1a69543 100644 --- a/tools/functional-tester/etcd-tester/cluster.go +++ b/tools/functional-tester/etcd-tester/cluster.go @@ -23,13 +23,18 @@ import ( "time" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" - clientv2 "github.com/coreos/etcd/client" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" + + clientV2 "github.com/coreos/etcd/client" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/tools/functional-tester/etcd-agent/client" ) const peerURLPort = 2380 type cluster struct { + v2Only bool // to be deprecated + agentEndpoints []string datadir string stressKeySize int @@ -39,6 +44,7 @@ type cluster struct { Agents []client.Agent Stressers []Stresser Names []string + GRPCURLs []string ClientURLs []string } @@ -47,8 +53,9 @@ type ClusterStatus struct { } // newCluster starts and returns a new cluster. The caller should call Terminate when finished, to shut it down. -func newCluster(agentEndpoints []string, datadir string, stressKeySize, stressKeySuffixRange int) (*cluster, error) { +func newCluster(agentEndpoints []string, datadir string, stressKeySize, stressKeySuffixRange int, isV2Only bool) (*cluster, error) { c := &cluster{ + v2Only: isV2Only, agentEndpoints: agentEndpoints, datadir: datadir, stressKeySize: stressKeySize, @@ -65,6 +72,7 @@ func (c *cluster) Bootstrap() error { agents := make([]client.Agent, size) names := make([]string, size) + grpcURLs := make([]string, size) clientURLs := make([]string, size) peerURLs := make([]string, size) members := make([]string, size) @@ -90,18 +98,28 @@ func (c *cluster) Bootstrap() error { token := fmt.Sprint(rand.Int()) for i, a := range agents { - _, err := a.Start( - "-name", names[i], - "-data-dir", c.datadir, - "-advertise-client-urls", clientURLs[i], - "-listen-client-urls", clientURLs[i], - "-initial-advertise-peer-urls", peerURLs[i], - "-listen-peer-urls", peerURLs[i], - "-initial-cluster-token", token, - "-initial-cluster", clusterStr, - "-initial-cluster-state", "new", - ) - if err != nil { + flags := []string{ + "--name", names[i], + "--data-dir", c.datadir, + + "--listen-client-urls", clientURLs[i], + "--advertise-client-urls", clientURLs[i], + + "--listen-peer-urls", peerURLs[i], + "--initial-advertise-peer-urls", peerURLs[i], + + "--initial-cluster-token", token, + "--initial-cluster", clusterStr, + "--initial-cluster-state", "new", + } + if !c.v2Only { + flags = append(flags, + "--experimental-v3demo", + "--experimental-gRPC-addr", grpcURLs[i], + ) + } + + if _, err := a.Start(flags...); err != nil { // cleanup for j := 0; j < i; j++ { agents[j].Terminate() @@ -110,22 +128,36 @@ func (c *cluster) Bootstrap() error { } } - stressers := make([]Stresser, len(clientURLs)) - for i, u := range clientURLs { - s := &stresser{ - Endpoint: u, - KeySize: c.stressKeySize, - KeySuffixRange: c.stressKeySuffixRange, - N: 200, + var stressers []Stresser + if c.v2Only { + for _, u := range clientURLs { + s := &stresserV2{ + Endpoint: u, + KeySize: c.stressKeySize, + KeySuffixRange: c.stressKeySuffixRange, + N: 200, + } + go s.Stress() + stressers = append(stressers, s) + } + } else { + for _, u := range grpcURLs { + s := &stresser{ + Endpoint: u, + KeySize: c.stressKeySize, + KeySuffixRange: c.stressKeySuffixRange, + N: 200, + } + go s.Stress() + stressers = append(stressers, s) } - go s.Stress() - stressers[i] = s } c.Size = size c.Agents = agents c.Stressers = stressers c.Names = names + c.GRPCURLs = grpcURLs c.ClientURLs = clientURLs return nil } @@ -136,8 +168,13 @@ func (c *cluster) WaitHealth() error { // TODO: set it to a reasonable value. It is set that high because // follower may use long time to catch up the leader when reboot under // reasonable workload (https://github.com/coreos/etcd/issues/2698) + healthFunc, urls := setHealthKey, c.GRPCURLs + if c.v2Only { + healthFunc = setHealthKeyV2 + urls = c.ClientURLs + } for i := 0; i < 60; i++ { - err = setHealthKey(c.ClientURLs) + err = healthFunc(urls) if err == nil { return nil } @@ -198,15 +235,33 @@ func (c *cluster) Status() ClusterStatus { // setHealthKey sets health key on all given urls. func setHealthKey(us []string) error { for _, u := range us { - cfg := clientv2.Config{ + conn, err := grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second)) + if err != nil { + return fmt.Errorf("no connection available for %s (%v)", u, err) + } + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + kvc := pb.NewKVClient(conn) + _, err = kvc.Put(ctx, &pb.PutRequest{Key: []byte("health"), Value: []byte("good")}) + cancel() + if err != nil { + return err + } + } + return nil +} + +// setHealthKeyV2 sets health key on all given urls. +func setHealthKeyV2(us []string) error { + for _, u := range us { + cfg := clientV2.Config{ Endpoints: []string{u}, } - c, err := clientv2.New(cfg) + c, err := clientV2.New(cfg) if err != nil { return err } ctx, cancel := context.WithTimeout(context.Background(), time.Second) - kapi := clientv2.NewKeysAPI(c) + kapi := clientV2.NewKeysAPI(c) _, err = kapi.Set(ctx, "health", "good", nil) cancel() if err != nil { diff --git a/tools/functional-tester/etcd-tester/main.go b/tools/functional-tester/etcd-tester/main.go index 41a3e568f..acd918fa2 100644 --- a/tools/functional-tester/etcd-tester/main.go +++ b/tools/functional-tester/etcd-tester/main.go @@ -22,15 +22,16 @@ import ( ) func main() { - endpointStr := flag.String("agent-endpoints", ":9027", "HTTP RPC endpoints of agents") - datadir := flag.String("data-dir", "agent.etcd", "etcd data directory location on agent machine") - stressKeySize := flag.Int("stress-key-size", 100, "the size of each key written into etcd") - stressKeySuffixRange := flag.Int("stress-key-count", 250000, "the count of key range written into etcd") - limit := flag.Int("limit", 3, "the limit of rounds to run failure set") + endpointStr := flag.String("agent-endpoints", "localhost:9027", "HTTP RPC endpoints of agents. Do not specify the schema.") + datadir := flag.String("data-dir", "agent.etcd", "etcd data directory location on agent machine.") + stressKeySize := flag.Int("stress-key-size", 100, "the size of each key written into etcd.") + stressKeySuffixRange := flag.Int("stress-key-count", 250000, "the count of key range written into etcd.") + limit := flag.Int("limit", 3, "the limit of rounds to run failure set.") + isV2Only := flag.Bool("v2-only", false, "'true' to run V2 only tester.") flag.Parse() endpoints := strings.Split(*endpointStr, ",") - c, err := newCluster(endpoints, *datadir, *stressKeySize, *stressKeySuffixRange) + c, err := newCluster(endpoints, *datadir, *stressKeySize, *stressKeySuffixRange, *isV2Only) if err != nil { log.Fatal(err) } diff --git a/tools/functional-tester/etcd-tester/stresser.go b/tools/functional-tester/etcd-tester/stresser.go index 2026dc967..467ffe1c9 100644 --- a/tools/functional-tester/etcd-tester/stresser.go +++ b/tools/functional-tester/etcd-tester/stresser.go @@ -23,7 +23,9 @@ import ( "time" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" - "github.com/coreos/etcd/client" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" + clientV2 "github.com/coreos/etcd/client" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" ) type Stresser interface { @@ -41,6 +43,68 @@ type stresser struct { KeySize int KeySuffixRange int + N int + + mu sync.Mutex + failure int + success int + + cancel func() +} + +func (s *stresser) Stress() error { + conn, err := grpc.Dial(s.Endpoint, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second)) + if err != nil { + return fmt.Errorf("no connection available for %s (%v)", s.Endpoint, err) + } + kvc := pb.NewKVClient(conn) + + ctx, cancel := context.WithCancel(context.Background()) + s.cancel = cancel + + for i := 0; i < s.N; i++ { + go func(i int) { + for { + putctx, putcancel := context.WithTimeout(ctx, 5*time.Second) + _, err := kvc.Put(putctx, &pb.PutRequest{ + Key: []byte(fmt.Sprintf("foo%d", rand.Intn(s.KeySuffixRange))), + Value: []byte(randStr(s.KeySize)), + }) + putcancel() + if err == context.Canceled { + return + } + s.mu.Lock() + if err != nil { + s.failure++ + } else { + s.success++ + } + s.mu.Unlock() + } + }(i) + } + + <-ctx.Done() + return nil +} + +func (s *stresser) Cancel() { + s.cancel() +} + +func (s *stresser) Report() (success int, failure int) { + s.mu.Lock() + defer s.mu.Unlock() + return s.success, s.failure +} + +type stresserV2 struct { + Endpoint string + + KeySize int + KeySuffixRange int + N int // TODO: not implemented Interval time.Duration @@ -52,8 +116,8 @@ type stresser struct { cancel func() } -func (s *stresser) Stress() error { - cfg := client.Config{ +func (s *stresserV2) Stress() error { + cfg := clientV2.Config{ Endpoints: []string{s.Endpoint}, Transport: &http.Transport{ Dial: (&net.Dialer{ @@ -63,19 +127,19 @@ func (s *stresser) Stress() error { MaxIdleConnsPerHost: s.N, }, } - c, err := client.New(cfg) + c, err := clientV2.New(cfg) if err != nil { return err } - kv := client.NewKeysAPI(c) + kv := clientV2.NewKeysAPI(c) ctx, cancel := context.WithCancel(context.Background()) s.cancel = cancel for i := 0; i < s.N; i++ { go func() { for { - setctx, setcancel := context.WithTimeout(ctx, client.DefaultRequestTimeout) + setctx, setcancel := context.WithTimeout(ctx, clientV2.DefaultRequestTimeout) key := fmt.Sprintf("foo%d", rand.Intn(s.KeySuffixRange)) _, err := kv.Set(setctx, key, randStr(s.KeySize), nil) setcancel() @@ -97,11 +161,11 @@ func (s *stresser) Stress() error { return nil } -func (s *stresser) Cancel() { +func (s *stresserV2) Cancel() { s.cancel() } -func (s *stresser) Report() (success int, failure int) { +func (s *stresserV2) Report() (success int, failure int) { s.mu.Lock() defer s.mu.Unlock() return s.success, s.failure