diff --git a/tools/functional-tester/etcd-tester/main.go b/tools/functional-tester/etcd-tester/main.go index 708b221d5..f0854de46 100644 --- a/tools/functional-tester/etcd-tester/main.go +++ b/tools/functional-tester/etcd-tester/main.go @@ -33,10 +33,24 @@ func main() { } defer c.Terminate() + stressers := make([]Stresser, len(c.ClientURLs)) + for i, u := range c.ClientURLs { + s := &stresser{ + Endpoint: u, + N: 200, + } + go s.Stress() + stressers[i] = s + } + t := &tester{ failures: []failure{newFailureBase(), newFailureKillAll()}, cluster: c, limit: *limit, } t.runLoop() + + for _, s := range stressers { + s.Cancel() + } } diff --git a/tools/functional-tester/etcd-tester/stresser.go b/tools/functional-tester/etcd-tester/stresser.go new file mode 100644 index 000000000..680d47337 --- /dev/null +++ b/tools/functional-tester/etcd-tester/stresser.go @@ -0,0 +1,87 @@ +package main + +import ( + "net" + "net/http" + "sync" + "time" + + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/client" +) + +type Stresser interface { + // Stress starts to stress the etcd cluster + Stress() error + // Cancel cancels the stress test on the etcd cluster + Cancel() + // Report reports the success and failure of the stress test + Report() (success int, failure int) +} + +type stresser struct { + Endpoint string + // TODO: not implemented + SuffixRange int + + N int + // TODO: not implemented + Interval time.Duration + + mu sync.Mutex + failure int + success int + + cancel func() +} + +func (s *stresser) Stress() error { + cfg := client.Config{ + Endpoints: []string{s.Endpoint}, + Transport: &http.Transport{ + Dial: (&net.Dialer{ + Timeout: time.Second, + KeepAlive: 30 * time.Second, + }).Dial, + MaxIdleConnsPerHost: s.N, + }, + } + c, err := client.New(cfg) + if err != nil { + return err + } + + kv := client.NewKeysAPI(c) + ctx, cancel := context.WithCancel(context.Background()) + s.cancel = cancel + + for i := 0; i < s.N; i++ { + go func() { + for { + _, err := kv.Set(ctx, "foo", "bar", nil) + if err == context.Canceled { + return + } + s.mu.Lock() + if err != nil { + s.failure++ + } + s.success++ + s.mu.Unlock() + } + }() + } + + <-ctx.Done() + return nil +} + +func (s *stresser) Cancel() { + s.cancel() +} + +func (s *stresser) Report() (success int, failure int) { + s.mu.Lock() + defer s.mu.Unlock() + return s.success, s.failure +}