From 09fc764552f2de7b1c69fca31bc89637391075e9 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Fri, 5 Feb 2016 10:53:09 -0800 Subject: [PATCH] functional-tester/etcd-tester: silent grpclog, check revs --- .../functional-tester/etcd-tester/stresser.go | 7 ++ tools/functional-tester/etcd-tester/tester.go | 114 ++++++++++++------ 2 files changed, 85 insertions(+), 36 deletions(-) diff --git a/tools/functional-tester/etcd-tester/stresser.go b/tools/functional-tester/etcd-tester/stresser.go index f2aa60c97..205188b50 100644 --- a/tools/functional-tester/etcd-tester/stresser.go +++ b/tools/functional-tester/etcd-tester/stresser.go @@ -16,6 +16,8 @@ package main import ( "fmt" + "io/ioutil" + "log" "math/rand" "net" "net/http" @@ -24,10 +26,15 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/grpclog" clientV2 "github.com/coreos/etcd/client" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" ) +func init() { + grpclog.SetLogger(log.New(ioutil.Discard, "", 0)) +} + type Stresser interface { // Stress starts to stress the etcd cluster Stress() error diff --git a/tools/functional-tester/etcd-tester/tester.go b/tools/functional-tester/etcd-tester/tester.go index a64abab05..ed18e1b9b 100644 --- a/tools/functional-tester/etcd-tester/tester.go +++ b/tools/functional-tester/etcd-tester/tester.go @@ -15,7 +15,6 @@ package main import ( - "fmt" "log" "sync" "time" @@ -85,22 +84,54 @@ func (tt *tester) runLoop() { s.Cancel() } - log.Printf("etcd-tester: [round#%d case#%d] waiting 5s for pending PUTs to be committed across cluster...", i, j) - time.Sleep(5 * time.Second) - - log.Printf("etcd-tester: [round#%d case#%d] starting checking consistency...", i, j) - err := tt.cluster.checkConsistency() - if err != nil { - log.Printf("etcd-tester: [round#%d case#%d] checkConsistency error (%v)", i, j, err) + ok := false + for k := 0; k < 5; k++ { + time.Sleep(time.Second) + log.Printf("etcd-tester: [round#%d case#%d.%d] checking current revisions...", i, j, k) + revs, err := tt.cluster.getRevision() + if err != nil { + if e := tt.cleanup(i, j); e != nil { + log.Printf("etcd-tester: [round#%d case#%d.%d] cleanup error: %v", i, j, k, e) + return + } + log.Printf("etcd-tester: [round#%d case#%d.%d] failed to get revisions (%v)", i, j, k, err) + continue + } + if ok = isSameValueInMap(revs); ok { + log.Printf("etcd-tester: [round#%d case#%d] checking current revisions succeed!", i, j) + break + } else { + log.Printf("etcd-tester: [round#%d case#%d] current revisions %+v", i, j, revs) + } + } + if !ok { + log.Printf("etcd-tester: [round#%d case#%d] checking current revisions failure...", i, j) if err := tt.cleanup(i, j); err != nil { log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err) return } - } else { - log.Printf("etcd-tester: [round#%d case#%d] all members are consistent!", i, j) - log.Printf("etcd-tester: [round#%d case#%d] succeed!", i, j) + continue } + log.Printf("etcd-tester: [round#%d case#%d] checking current storage hashes...", i, j) + hashes, err := tt.cluster.getKVHash() + if err != nil { + log.Printf("etcd-tester: [round#%d case#%d] getKVHash error (%v)", i, j, err) + if err := tt.cleanup(i, j); err != nil { + log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err) + return + } + } + if !isSameValueInMap(hashes) { + if err := tt.cleanup(i, j); err != nil { + log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err) + return + } + continue + } + log.Printf("etcd-tester: [round#%d case#%d] all members are consistent!", i, j) + log.Printf("etcd-tester: [round#%d case#%d] succeed!", i, j) + log.Printf("etcd-tester: [round#%d case#%d] restarting the stressers...", i, j) for _, s := range tt.cluster.Stressers { go s.Stress() @@ -152,44 +183,55 @@ func (s *Status) setCase(c int) { s.Case = c } -// checkConsistency stops the cluster for a moment and get the hashes of KV storages. -func (c *cluster) checkConsistency() error { - hashes := make(map[string]uint32) +func (c *cluster) getRevision() (map[string]int64, error) { + revs := make(map[string]int64) for _, u := range c.GRPCURLs { conn, err := grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second)) if err != nil { - return err + return nil, err } kvc := pb.NewKVClient(conn) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - resp, err := kvc.Hash(ctx, &pb.HashRequest{}) - hv := resp.Hash - if resp != nil && err != nil { - return err + resp, err := kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo")}) + if err != nil { + return nil, err } cancel() - - hashes[u] = hv + revs[u] = resp.Header.Revision } - - if !checkConsistency(hashes) { - return fmt.Errorf("check consistency fails: %v", hashes) - } - return nil + return revs, nil } -// checkConsistency returns true if all nodes have the same KV hash values. -func checkConsistency(hashes map[string]uint32) bool { - var cv uint32 - isConsistent := true +func (c *cluster) getKVHash() (map[string]int64, error) { + hashes := make(map[string]int64) + for _, u := range c.GRPCURLs { + conn, err := grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second)) + if err != nil { + return nil, err + } + kvc := pb.NewKVClient(conn) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + resp, err := kvc.Hash(ctx, &pb.HashRequest{}) + if resp != nil && err != nil { + return nil, err + } + cancel() + hashes[u] = int64(resp.Hash) + } + return hashes, nil +} + +func isSameValueInMap(hashes map[string]int64) bool { + var rv int64 + ok := true for _, v := range hashes { - if cv == 0 { - cv = v + if rv == 0 { + rv = v } - if cv != v { - isConsistent = false + if rv != v { + ok = false + break } } - return isConsistent + return ok }