mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcd-tester: add compactKV
It compacts storage for every case. For https://github.com/coreos/etcd/issues/4380.
This commit is contained in:
parent
e7a5899582
commit
08dbabdb5f
@ -79,7 +79,6 @@ func (s *stresser) Stress() error {
|
|||||||
})
|
})
|
||||||
putcancel()
|
putcancel()
|
||||||
if grpc.ErrorDesc(err) == context.Canceled.Error() {
|
if grpc.ErrorDesc(err) == context.Canceled.Error() {
|
||||||
log.Printf("etcd-tester: stresser#%d is cancelled", i)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
|
@ -55,6 +55,7 @@ func (tt *tester) runLoop() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
log.Printf("etcd-tester: [round#%d case#%d] start failure %s", i, j, f.Desc())
|
log.Printf("etcd-tester: [round#%d case#%d] start failure %s", i, j, f.Desc())
|
||||||
|
|
||||||
log.Printf("etcd-tester: [round#%d case#%d] start injecting failure...", i, j)
|
log.Printf("etcd-tester: [round#%d case#%d] start injecting failure...", i, j)
|
||||||
if err := f.Inject(tt.cluster, i); err != nil {
|
if err := f.Inject(tt.cluster, i); err != nil {
|
||||||
log.Printf("etcd-tester: [round#%d case#%d] injection error: %v", i, j, err)
|
log.Printf("etcd-tester: [round#%d case#%d] injection error: %v", i, j, err)
|
||||||
@ -64,6 +65,8 @@ func (tt *tester) runLoop() {
|
|||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
log.Printf("etcd-tester: [round#%d case#%d] injected failure", i, j)
|
||||||
|
|
||||||
log.Printf("etcd-tester: [round#%d case#%d] start recovering failure...", i, j)
|
log.Printf("etcd-tester: [round#%d case#%d] start recovering failure...", i, j)
|
||||||
if err := f.Recover(tt.cluster, i); err != nil {
|
if err := f.Recover(tt.cluster, i); err != nil {
|
||||||
log.Printf("etcd-tester: [round#%d case#%d] recovery error: %v", i, j, err)
|
log.Printf("etcd-tester: [round#%d case#%d] recovery error: %v", i, j, err)
|
||||||
@ -73,6 +76,7 @@ func (tt *tester) runLoop() {
|
|||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
log.Printf("etcd-tester: [round#%d case#%d] recovered failure", i, j)
|
||||||
|
|
||||||
if tt.cluster.v2Only {
|
if tt.cluster.v2Only {
|
||||||
log.Printf("etcd-tester: [round#%d case#%d] succeed!", i, j)
|
log.Printf("etcd-tester: [round#%d case#%d] succeed!", i, j)
|
||||||
@ -83,25 +87,26 @@ func (tt *tester) runLoop() {
|
|||||||
for _, s := range tt.cluster.Stressers {
|
for _, s := range tt.cluster.Stressers {
|
||||||
s.Cancel()
|
s.Cancel()
|
||||||
}
|
}
|
||||||
|
log.Printf("etcd-tester: [round#%d case#%d] canceled stressers", i, j)
|
||||||
|
|
||||||
|
log.Printf("etcd-tester: [round#%d case#%d] checking current revisions...", i, j)
|
||||||
ok := false
|
ok := false
|
||||||
|
var currentRevision int64
|
||||||
for k := 0; k < 5; k++ {
|
for k := 0; k < 5; k++ {
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
log.Printf("etcd-tester: [round#%d case#%d.%d] checking current revisions...", i, j, k)
|
|
||||||
revs, err := tt.cluster.getRevision()
|
revs, err := tt.cluster.getRevision()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if e := tt.cleanup(i, j); e != nil {
|
if e := tt.cleanup(i, j); e != nil {
|
||||||
log.Printf("etcd-tester: [round#%d case#%d.%d] cleanup error: %v", i, j, k, e)
|
log.Printf("etcd-tester: [round#%d case#%d.%d] cleanup error: %v", i, j, k, e)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.Printf("etcd-tester: [round#%d case#%d.%d] failed to get revisions (%v)", i, j, k, err)
|
log.Printf("etcd-tester: [round#%d case#%d.%d] failed to get current revisions (%v)", i, j, k, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if ok = isSameValueInMap(revs); ok {
|
if currentRevision, ok = getSameValue(revs); ok {
|
||||||
log.Printf("etcd-tester: [round#%d case#%d.%d] checking current revisions succeed!", i, j, k)
|
|
||||||
break
|
break
|
||||||
} else {
|
} else {
|
||||||
log.Printf("etcd-tester: [round#%d case#%d.%d] current revisions %+v", i, j, k, revs)
|
log.Printf("etcd-tester: [round#%d case#%d.%d] inconsistent current revisions %+v", i, j, k, revs)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -112,6 +117,7 @@ func (tt *tester) runLoop() {
|
|||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
log.Printf("etcd-tester: [round#%d case#%d] all members are consistent with current revisions", i, j)
|
||||||
|
|
||||||
log.Printf("etcd-tester: [round#%d case#%d] checking current storage hashes...", i, j)
|
log.Printf("etcd-tester: [round#%d case#%d] checking current storage hashes...", i, j)
|
||||||
hashes, err := tt.cluster.getKVHash()
|
hashes, err := tt.cluster.getKVHash()
|
||||||
@ -122,20 +128,32 @@ func (tt *tester) runLoop() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !isSameValueInMap(hashes) {
|
if _, ok = getSameValue(hashes); !ok {
|
||||||
if err := tt.cleanup(i, j); err != nil {
|
if err := tt.cleanup(i, j); err != nil {
|
||||||
log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err)
|
log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
log.Printf("etcd-tester: [round#%d case#%d] all members are consistent!", i, j)
|
log.Printf("etcd-tester: [round#%d case#%d] all members are consistent with storage hashes", i, j)
|
||||||
log.Printf("etcd-tester: [round#%d case#%d] succeed!", i, j)
|
|
||||||
|
revToCompact := max(0, currentRevision-10000)
|
||||||
|
log.Printf("etcd-tester: [round#%d case#%d] compacting storage at %d (current revision %d)", i, j, revToCompact, currentRevision)
|
||||||
|
if err := tt.cluster.compactKV(revToCompact); err != nil {
|
||||||
|
log.Printf("etcd-tester: [round#%d case#%d] compactKV error (%v)", i, j, err)
|
||||||
|
if err := tt.cleanup(i, j); err != nil {
|
||||||
|
log.Printf("etcd-tester: [round#%d case#%d] cleanup error: %v", i, j, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.Printf("etcd-tester: [round#%d case#%d] compacted storage", i, j)
|
||||||
|
|
||||||
log.Printf("etcd-tester: [round#%d case#%d] restarting the stressers...", i, j)
|
log.Printf("etcd-tester: [round#%d case#%d] restarting the stressers...", i, j)
|
||||||
for _, s := range tt.cluster.Stressers {
|
for _, s := range tt.cluster.Stressers {
|
||||||
go s.Stress()
|
go s.Stress()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Printf("etcd-tester: [round#%d case#%d] succeed!", i, j)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -221,7 +239,7 @@ func (c *cluster) getKVHash() (map[string]int64, error) {
|
|||||||
return hashes, nil
|
return hashes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func isSameValueInMap(hashes map[string]int64) bool {
|
func getSameValue(hashes map[string]int64) (int64, bool) {
|
||||||
var rv int64
|
var rv int64
|
||||||
ok := true
|
ok := true
|
||||||
for _, v := range hashes {
|
for _, v := range hashes {
|
||||||
@ -233,5 +251,33 @@ func isSameValueInMap(hashes map[string]int64) bool {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ok
|
return rv, ok
|
||||||
|
}
|
||||||
|
|
||||||
|
func max(n1, n2 int64) int64 {
|
||||||
|
if n1 > n2 {
|
||||||
|
return n1
|
||||||
|
}
|
||||||
|
return n2
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cluster) compactKV(rev int64) error {
|
||||||
|
var (
|
||||||
|
conn *grpc.ClientConn
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
for _, u := range c.GRPCURLs {
|
||||||
|
conn, err = grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
kvc := pb.NewKVClient(conn)
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
_, err = kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev})
|
||||||
|
cancel()
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user