Merge pull request #4448 from gyuho/f0

etcd-tester: add compactKV every case, clean up logs
This commit is contained in:
Gyu-Ho Lee 2016-02-06 15:14:41 -08:00
commit 2be7f7c2fb
2 changed files with 56 additions and 11 deletions

View File

@ -79,7 +79,6 @@ func (s *stresser) Stress() error {
})
putcancel()
if grpc.ErrorDesc(err) == context.Canceled.Error() {
log.Printf("etcd-tester: stresser#%d is cancelled", i)
return
}
s.mu.Lock()

View File

@ -55,6 +55,7 @@ func (tt *tester) runLoop() {
continue
}
log.Printf("etcd-tester: [round#%d case#%d] start failure %s", i, j, f.Desc())
log.Printf("etcd-tester: [round#%d case#%d] start injecting failure...", i, j)
if err := f.Inject(tt.cluster, i); err != nil {
log.Printf("etcd-tester: [round#%d case#%d] injection error: %v", i, j, err)
@ -64,6 +65,8 @@ func (tt *tester) runLoop() {
}
continue
}
log.Printf("etcd-tester: [round#%d case#%d] injected failure", i, j)
log.Printf("etcd-tester: [round#%d case#%d] start recovering failure...", i, j)
if err := f.Recover(tt.cluster, i); err != nil {
log.Printf("etcd-tester: [round#%d case#%d] recovery error: %v", i, j, err)
@ -73,6 +76,7 @@ func (tt *tester) runLoop() {
}
continue
}
log.Printf("etcd-tester: [round#%d case#%d] recovered failure", i, j)
if tt.cluster.v2Only {
log.Printf("etcd-tester: [round#%d case#%d] succeed!", i, j)
@ -83,25 +87,26 @@ func (tt *tester) runLoop() {
for _, s := range tt.cluster.Stressers {
s.Cancel()
}
log.Printf("etcd-tester: [round#%d case#%d] canceled stressers", i, j)
log.Printf("etcd-tester: [round#%d case#%d] checking current revisions...", i, j)
ok := false
var currentRevision int64
for k := 0; k < 5; k++ {
time.Sleep(time.Second)
log.Printf("etcd-tester: [round#%d case#%d.%d] checking current revisions...", i, j, k)
revs, err := tt.cluster.getRevision()
if err != nil {
if e := tt.cleanup(i, j); e != nil {
log.Printf("etcd-tester: [round#%d case#%d.%d] cleanup error: %v", i, j, k, e)
return
}
log.Printf("etcd-tester: [round#%d case#%d.%d] failed to get revisions (%v)", i, j, k, err)
log.Printf("etcd-tester: [round#%d case#%d.%d] failed to get current revisions (%v)", i, j, k, err)
continue
}
if ok = isSameValueInMap(revs); ok {
log.Printf("etcd-tester: [round#%d case#%d.%d] checking current revisions succeed!", i, j, k)
if currentRevision, ok = getSameValue(revs); ok {
break
} else {
log.Printf("etcd-tester: [round#%d case#%d.%d] current revisions %+v", i, j, k, revs)
log.Printf("etcd-tester: [round#%d case#%d.%d] inconsistent current revisions %+v", i, j, k, revs)
}
}
if !ok {
@ -112,6 +117,7 @@ func (tt *tester) runLoop() {
}
continue
}
log.Printf("etcd-tester: [round#%d case#%d] all members are consistent with current revisions", i, j)
log.Printf("etcd-tester: [round#%d case#%d] checking current storage hashes...", i, j)
hashes, err := tt.cluster.getKVHash()
@ -122,20 +128,32 @@ func (tt *tester) runLoop() {
return
}
}
if !isSameValueInMap(hashes) {
if _, ok = getSameValue(hashes); !ok {
if err := tt.cleanup(i, j); err != nil {
log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err)
return
}
continue
}
log.Printf("etcd-tester: [round#%d case#%d] all members are consistent!", i, j)
log.Printf("etcd-tester: [round#%d case#%d] succeed!", i, j)
log.Printf("etcd-tester: [round#%d case#%d] all members are consistent with storage hashes", i, j)
revToCompact := max(0, currentRevision-10000)
log.Printf("etcd-tester: [round#%d case#%d] compacting storage at %d (current revision %d)", i, j, revToCompact, currentRevision)
if err := tt.cluster.compactKV(revToCompact); err != nil {
log.Printf("etcd-tester: [round#%d case#%d] compactKV error (%v)", i, j, err)
if err := tt.cleanup(i, j); err != nil {
log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err)
return
}
}
log.Printf("etcd-tester: [round#%d case#%d] compacted storage", i, j)
log.Printf("etcd-tester: [round#%d case#%d] restarting the stressers...", i, j)
for _, s := range tt.cluster.Stressers {
go s.Stress()
}
log.Printf("etcd-tester: [round#%d case#%d] succeed!", i, j)
}
}
}
@ -221,7 +239,7 @@ func (c *cluster) getKVHash() (map[string]int64, error) {
return hashes, nil
}
func isSameValueInMap(hashes map[string]int64) bool {
func getSameValue(hashes map[string]int64) (int64, bool) {
var rv int64
ok := true
for _, v := range hashes {
@ -233,5 +251,33 @@ func isSameValueInMap(hashes map[string]int64) bool {
break
}
}
return ok
return rv, ok
}
// max returns the larger of n1 and n2. When the two are equal, n2 is
// returned (which is the same value).
func max(n1, n2 int64) int64 {
	larger := n2
	if n1 > n2 {
		larger = n1
	}
	return larger
}
// compactKV asks the cluster to compact its key-value storage at revision rev.
//
// It walks c.GRPCURLs in order, dialing each endpoint (5s dial timeout) and
// issuing a Compact request (5s request timeout). It returns nil as soon as
// one endpoint accepts the request. If every endpoint fails, the error from
// the last attempt (dial or Compact) is returned. If c.GRPCURLs is empty,
// no request is sent and nil is returned.
func (c *cluster) compactKV(rev int64) error {
	var err error
	for _, u := range c.GRPCURLs {
		var conn *grpc.ClientConn
		conn, err = grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
		if err != nil {
			// Could not reach this member; try the next one.
			continue
		}
		kvc := pb.NewKVClient(conn)
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		_, err = kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev})
		cancel()
		// Close the connection explicitly instead of deferring: a defer inside
		// the loop would keep every dialed connection open until return, and
		// not closing at all leaks one connection per attempted endpoint.
		// The close error is ignored on purpose; the Compact result is what
		// callers care about.
		conn.Close()
		if err == nil {
			return nil
		}
	}
	return err
}