functional-tester/etcd-tester: silent grpclog, check revs

This commit is contained in:
Gyu-Ho Lee 2016-02-05 10:53:09 -08:00
parent c3fd2f95f0
commit 09fc764552
2 changed files with 85 additions and 36 deletions

View File

@ -16,6 +16,8 @@ package main
import (
"fmt"
"io/ioutil"
"log"
"math/rand"
"net"
"net/http"
@ -24,10 +26,15 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/grpclog"
clientV2 "github.com/coreos/etcd/client"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
func init() {
grpclog.SetLogger(log.New(ioutil.Discard, "", 0))
}
type Stresser interface {
// Stress starts to stress the etcd cluster
Stress() error

View File

@ -15,7 +15,6 @@
package main
import (
"fmt"
"log"
"sync"
"time"
@ -85,22 +84,54 @@ func (tt *tester) runLoop() {
s.Cancel()
}
log.Printf("etcd-tester: [round#%d case#%d] waiting 5s for pending PUTs to be committed across cluster...", i, j)
time.Sleep(5 * time.Second)
log.Printf("etcd-tester: [round#%d case#%d] starting checking consistency...", i, j)
err := tt.cluster.checkConsistency()
if err != nil {
log.Printf("etcd-tester: [round#%d case#%d] checkConsistency error (%v)", i, j, err)
ok := false
for k := 0; k < 5; k++ {
time.Sleep(time.Second)
log.Printf("etcd-tester: [round#%d case#%d.%d] checking current revisions...", i, j, k)
revs, err := tt.cluster.getRevision()
if err != nil {
if e := tt.cleanup(i, j); e != nil {
log.Printf("etcd-tester: [round#%d case#%d.%d] cleanup error: %v", i, j, k, e)
return
}
log.Printf("etcd-tester: [round#%d case#%d.%d] failed to get revisions (%v)", i, j, k, err)
continue
}
if ok = isSameValueInMap(revs); ok {
log.Printf("etcd-tester: [round#%d case#%d] checking current revisions succeed!", i, j)
break
} else {
log.Printf("etcd-tester: [round#%d case#%d] current revisions %+v", i, j, revs)
}
}
if !ok {
log.Printf("etcd-tester: [round#%d case#%d] checking current revisions failure...", i, j)
if err := tt.cleanup(i, j); err != nil {
log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err)
return
}
} else {
log.Printf("etcd-tester: [round#%d case#%d] all members are consistent!", i, j)
log.Printf("etcd-tester: [round#%d case#%d] succeed!", i, j)
continue
}
log.Printf("etcd-tester: [round#%d case#%d] checking current storage hashes...", i, j)
hashes, err := tt.cluster.getKVHash()
if err != nil {
log.Printf("etcd-tester: [round#%d case#%d] getKVHash error (%v)", i, j, err)
if err := tt.cleanup(i, j); err != nil {
log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err)
return
}
}
if !isSameValueInMap(hashes) {
if err := tt.cleanup(i, j); err != nil {
log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err)
return
}
continue
}
log.Printf("etcd-tester: [round#%d case#%d] all members are consistent!", i, j)
log.Printf("etcd-tester: [round#%d case#%d] succeed!", i, j)
log.Printf("etcd-tester: [round#%d case#%d] restarting the stressers...", i, j)
for _, s := range tt.cluster.Stressers {
go s.Stress()
@ -152,44 +183,55 @@ func (s *Status) setCase(c int) {
s.Case = c
}
// checkConsistency stops the cluster for a moment and get the hashes of KV storages.
func (c *cluster) checkConsistency() error {
hashes := make(map[string]uint32)
func (c *cluster) getRevision() (map[string]int64, error) {
revs := make(map[string]int64)
for _, u := range c.GRPCURLs {
conn, err := grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
if err != nil {
return err
return nil, err
}
kvc := pb.NewKVClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
resp, err := kvc.Hash(ctx, &pb.HashRequest{})
hv := resp.Hash
if resp != nil && err != nil {
return err
resp, err := kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo")})
if err != nil {
return nil, err
}
cancel()
hashes[u] = hv
revs[u] = resp.Header.Revision
}
if !checkConsistency(hashes) {
return fmt.Errorf("check consistency fails: %v", hashes)
}
return nil
return revs, nil
}
// checkConsistency returns true if all nodes have the same KV hash values.
func checkConsistency(hashes map[string]uint32) bool {
var cv uint32
isConsistent := true
func (c *cluster) getKVHash() (map[string]int64, error) {
hashes := make(map[string]int64)
for _, u := range c.GRPCURLs {
conn, err := grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
if err != nil {
return nil, err
}
kvc := pb.NewKVClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
resp, err := kvc.Hash(ctx, &pb.HashRequest{})
if resp != nil && err != nil {
return nil, err
}
cancel()
hashes[u] = int64(resp.Hash)
}
return hashes, nil
}
func isSameValueInMap(hashes map[string]int64) bool {
var rv int64
ok := true
for _, v := range hashes {
if cv == 0 {
cv = v
if rv == 0 {
rv = v
}
if cv != v {
isConsistent = false
if rv != v {
ok = false
break
}
}
return isConsistent
return ok
}