diff --git a/tools/functional-tester/etcd-agent/agent.go b/tools/functional-tester/etcd-agent/agent.go index 7302dc275..faa067f45 100644 --- a/tools/functional-tester/etcd-agent/agent.go +++ b/tools/functional-tester/etcd-agent/agent.go @@ -157,7 +157,7 @@ func (a *Agent) cleanup() error { // https://github.com/torvalds/linux/blob/master/fs/drop_caches.c cmd := exec.Command("/bin/sh", "-c", `echo "echo 1 > /proc/sys/vm/drop_caches" | sudo sh`) if err := cmd.Run(); err != nil { - plog.Printf("error when cleaning page cache (%v)", err) + plog.Infof("error when cleaning page cache (%v)", err) } return nil } diff --git a/tools/functional-tester/etcd-tester/checks.go b/tools/functional-tester/etcd-tester/checks.go index a648d53bf..183e8e619 100644 --- a/tools/functional-tester/etcd-tester/checks.go +++ b/tools/functional-tester/etcd-tester/checks.go @@ -18,6 +18,9 @@ import ( "fmt" "time" + "google.golang.org/grpc" + + "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" @@ -77,20 +80,31 @@ func (hc *hashChecker) Check() error { return hc.checkRevAndHashes() } -type leaseChecker struct{ ls *leaseStresser } +type leaseChecker struct { + ls *leaseStresser + leaseClient pb.LeaseClient + kvc pb.KVClient +} func (lc *leaseChecker) Check() error { - plog.Infof("checking revoked leases %v", lc.ls.revokedLeases.leases) + conn, err := grpc.Dial(lc.ls.endpoint, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(1)) + if err != nil { + return fmt.Errorf("%v (%s)", err, lc.ls.endpoint) + } + defer func() { + if conn != nil { + conn.Close() + } + }() + lc.kvc = pb.NewKVClient(conn) + lc.leaseClient = pb.NewLeaseClient(conn) if err := lc.check(true, lc.ls.revokedLeases.leases); err != nil { return err } - plog.Infof("checking alive leases %v", lc.ls.aliveLeases.leases) if err := lc.check(false, lc.ls.aliveLeases.leases); err != nil { return err } - plog.Infof("checking short lived leases %v", lc.ls.shortLivedLeases.leases) return lc.checkShortLivedLeases() - } // checkShortLivedLeases ensures leases expire. @@ -117,19 +131,19 @@ func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64) // retry in case of transient failure or lease is expired but not yet revoked due to the fact that etcd cluster didn't have enought time to delete it. var resp *pb.LeaseTimeToLiveResponse for i := 0; i < retries; i++ { - resp, err = lc.ls.getLeaseByID(ctx, leaseID) + resp, err = lc.getLeaseByID(ctx, leaseID) if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound { return nil } if err != nil { - plog.Warningf("retry %d. failed to retrieve lease %v error (%v)", i, leaseID, err) + plog.Debugf("retry %d. failed to retrieve lease %v error (%v)", i, leaseID, err) continue } if resp.TTL > 0 { - plog.Warningf("lease %v is not expired. sleep for %d until it expires.", leaseID, resp.TTL) + plog.Debugf("lease %v is not expired. sleep for %d until it expires.", leaseID, resp.TTL) time.Sleep(time.Duration(resp.TTL) * time.Second) } else { - plog.Warningf("retry %d. lease %v is expired but not yet revoked", i, leaseID) + plog.Debugf("retry %d. lease %v is expired but not yet revoked", i, leaseID) time.Sleep(time.Second) } if err = lc.checkLease(ctx, false, leaseID); err != nil { @@ -141,12 +155,12 @@ func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64) } func (lc *leaseChecker) checkLease(ctx context.Context, expired bool, leaseID int64) error { - keysExpired, err := lc.ls.hasKeysAttachedToLeaseExpired(ctx, leaseID) + keysExpired, err := lc.hasKeysAttachedToLeaseExpired(ctx, leaseID) if err != nil { plog.Errorf("hasKeysAttachedToLeaseExpired error: (%v)", err) return err } - leaseExpired, err := lc.ls.hasLeaseExpired(ctx, leaseID) + leaseExpired, err := lc.hasLeaseExpired(ctx, leaseID) if err != nil { plog.Errorf("hasLeaseExpired error: (%v)", err) return err @@ -171,6 +185,42 @@ func (lc *leaseChecker) check(expired bool, leases map[int64]time.Time) error { return nil } +func (lc *leaseChecker) getLeaseByID(ctx context.Context, leaseID int64) (*pb.LeaseTimeToLiveResponse, error) { + ltl := &pb.LeaseTimeToLiveRequest{ID: leaseID, Keys: true} + return lc.leaseClient.LeaseTimeToLive(ctx, ltl, grpc.FailFast(false)) +} + +func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (bool, error) { + // keep retrying until lease's state is known or ctx is being canceled + for ctx.Err() == nil { + resp, err := lc.getLeaseByID(ctx, leaseID) + if err == nil { + return false, nil + } + if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound { + return true, nil + } + plog.Warningf("hasLeaseExpired %v resp %v error (%v)", leaseID, resp, err) + } + return false, ctx.Err() +} + +// The keys attached to the lease has the format of "_" where idx is the ordering key creation +// Since the format of keys contains about leaseID, finding keys base on "" prefix +// determines whether the attached keys for a given leaseID has been deleted or not +func (lc *leaseChecker) hasKeysAttachedToLeaseExpired(ctx context.Context, leaseID int64) (bool, error) { + resp, err := lc.kvc.Range(ctx, &pb.RangeRequest{ + Key: []byte(fmt.Sprintf("%d", leaseID)), + RangeEnd: []byte(clientv3.GetPrefixRangeEnd(fmt.Sprintf("%d", leaseID))), + }, grpc.FailFast(false)) + plog.Debugf("hasKeysAttachedToLeaseExpired %v resp %v error (%v)", leaseID, resp, err) + if err != nil { + plog.Errorf("retriving keys attached to lease %v error: (%v)", leaseID, err) + return false, err + } + return len(resp.Kvs) == 0, nil +} + // compositeChecker implements a checker that runs a slice of Checkers concurrently. type compositeChecker struct{ checkers []Checker } diff --git a/tools/functional-tester/etcd-tester/lease_stresser.go b/tools/functional-tester/etcd-tester/lease_stresser.go index 1b80347d7..963a902a8 100644 --- a/tools/functional-tester/etcd-tester/lease_stresser.go +++ b/tools/functional-tester/etcd-tester/lease_stresser.go @@ -22,7 +22,6 @@ import ( "time" - "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" @@ -114,14 +113,6 @@ func (ls *leaseStresser) setupOnce() error { panic("expect keysPerLease to be set") } - conn, err := grpc.Dial(ls.endpoint, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(1*time.Second)) - if err != nil { - return fmt.Errorf("%v (%s)", err, ls.endpoint) - } - ls.conn = conn - ls.kvc = pb.NewKVClient(conn) - ls.lc = pb.NewLeaseClient(conn) - ls.aliveLeases = &atomicLeases{leases: make(map[int64]time.Time)} return nil @@ -132,6 +123,14 @@ func (ls *leaseStresser) Stress() error { if err := ls.setupOnce(); err != nil { return err } + + conn, err := grpc.Dial(ls.endpoint, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(1*time.Second)) + if err != nil { + return fmt.Errorf("%v (%s)", err, ls.endpoint) + } + ls.conn = conn + ls.kvc = pb.NewKVClient(conn) + ls.lc = pb.NewLeaseClient(conn) ls.revokedLeases = &atomicLeases{leases: make(map[int64]time.Time)} ls.shortLivedLeases = &atomicLeases{leases: make(map[int64]time.Time)} @@ -255,42 +254,6 @@ func (ls *leaseStresser) randomlyDropLeases() { wg.Wait() } -func (ls *leaseStresser) getLeaseByID(ctx context.Context, leaseID int64) (*pb.LeaseTimeToLiveResponse, error) { - ltl := &pb.LeaseTimeToLiveRequest{ID: leaseID, Keys: true} - return ls.lc.LeaseTimeToLive(ctx, ltl, grpc.FailFast(false)) -} - -func (ls *leaseStresser) hasLeaseExpired(ctx context.Context, leaseID int64) (bool, error) { - // keep retrying until lease's state is known or ctx is being canceled - for ctx.Err() == nil { - resp, err := ls.getLeaseByID(ctx, leaseID) - if err == nil { - return false, nil - } - if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound { - return true, nil - } - plog.Warningf("hasLeaseExpired %v resp %v error (%v)", leaseID, resp, err) - } - return false, ctx.Err() -} - -// The keys attached to the lease has the format of "_" where idx is the ordering key creation -// Since the format of keys contains about leaseID, finding keys base on "" prefix -// determines whether the attached keys for a given leaseID has been deleted or not -func (ls *leaseStresser) hasKeysAttachedToLeaseExpired(ctx context.Context, leaseID int64) (bool, error) { - resp, err := ls.kvc.Range(ctx, &pb.RangeRequest{ - Key: []byte(fmt.Sprintf("%d", leaseID)), - RangeEnd: []byte(clientv3.GetPrefixRangeEnd(fmt.Sprintf("%d", leaseID))), - }, grpc.FailFast(false)) - plog.Debugf("hasKeysAttachedToLeaseExpired %v resp %v error (%v)", leaseID, resp, err) - if err != nil { - plog.Errorf("retriving keys attached to lease %v error: (%v)", leaseID, err) - return false, err - } - return len(resp.Kvs) == 0, nil -} - func (ls *leaseStresser) createLease(ttl int64) (int64, error) { resp, err := ls.lc.LeaseGrant(ls.ctx, &pb.LeaseGrantRequest{TTL: ttl}) if err != nil { @@ -402,6 +365,7 @@ func (ls *leaseStresser) Cancel() { ls.cancel() ls.runWg.Wait() ls.aliveWg.Wait() + ls.conn.Close() plog.Infof("lease stresser %q is canceled", ls.endpoint) } @@ -409,4 +373,4 @@ func (ls *leaseStresser) ModifiedKeys() int64 { return atomic.LoadInt64(&ls.atomicModifiedKey) } -func (ls *leaseStresser) Checker() Checker { return &leaseChecker{ls} } +func (ls *leaseStresser) Checker() Checker { return &leaseChecker{ls: ls} } diff --git a/tools/functional-tester/etcd-tester/tester.go b/tools/functional-tester/etcd-tester/tester.go index 62b3ec9bc..5eecf292d 100644 --- a/tools/functional-tester/etcd-tester/tester.go +++ b/tools/functional-tester/etcd-tester/tester.go @@ -75,7 +75,7 @@ func (tt *tester) runLoop() { preModifiedKey = currentModifiedKey timeout := 10 * time.Second timeout += time.Duration(modifiedKey/compactQPS) * time.Second - plog.Printf("%s compacting %d modifications (timeout %v)", tt.logPrefix(), modifiedKey, timeout) + plog.Infof("%s compacting %d modifications (timeout %v)", tt.logPrefix(), modifiedKey, timeout) if err := tt.compact(revToCompact, timeout); err != nil { plog.Warningf("%s functional-tester compact got error (%v)", tt.logPrefix(), err) if tt.cleanup() != nil { @@ -92,7 +92,7 @@ func (tt *tester) runLoop() { } } - plog.Printf("%s functional-tester is finished", tt.logPrefix()) + plog.Infof("%s functional-tester is finished", tt.logPrefix()) } func (tt *tester) doRound(round int) error { @@ -103,26 +103,31 @@ func (tt *tester) doRound(round int) error { if err := tt.cluster.WaitHealth(); err != nil { return fmt.Errorf("wait full health error: %v", err) } - plog.Printf("%s injecting failure %q", tt.logPrefix(), f.Desc()) + plog.Infof("%s injecting failure %q", tt.logPrefix(), f.Desc()) if err := f.Inject(tt.cluster, round); err != nil { return fmt.Errorf("injection error: %v", err) } - plog.Printf("%s injected failure", tt.logPrefix()) + plog.Infof("%s injected failure", tt.logPrefix()) - plog.Printf("%s recovering failure %q", tt.logPrefix(), f.Desc()) + plog.Infof("%s recovering failure %q", tt.logPrefix(), f.Desc()) if err := f.Recover(tt.cluster, round); err != nil { return fmt.Errorf("recovery error: %v", err) } - plog.Printf("%s recovered failure", tt.logPrefix()) + plog.Infof("%s recovered failure", tt.logPrefix()) tt.cancelStresser() - plog.Printf("%s wait until cluster is healthy", tt.logPrefix()) + plog.Infof("%s wait until cluster is healthy", tt.logPrefix()) if err := tt.cluster.WaitHealth(); err != nil { return fmt.Errorf("wait full health error: %v", err) } + plog.Infof("%s cluster is healthy", tt.logPrefix()) + + plog.Infof("%s checking consistency and invariant of cluster", tt.logPrefix()) if err := tt.checkConsistency(); err != nil { return fmt.Errorf("tt.checkConsistency error (%v)", err) } - plog.Printf("%s succeed!", tt.logPrefix()) + plog.Infof("%s checking consistency and invariant of cluster done", tt.logPrefix()) + + plog.Infof("%s succeed!", tt.logPrefix()) } return nil } @@ -134,7 +139,7 @@ func (tt *tester) updateRevision() error { break // just need get one of the current revisions } - plog.Printf("%s updated current revision to %d", tt.logPrefix(), tt.currentRevision) + plog.Infof("%s updated current revision to %d", tt.logPrefix(), tt.currentRevision) return err } @@ -150,7 +155,7 @@ func (tt *tester) checkConsistency() (err error) { err = tt.startStresser() }() if err = tt.checker.Check(); err != nil { - plog.Printf("%s %v", tt.logPrefix(), err) + plog.Infof("%s %v", tt.logPrefix(), err) } return err } @@ -163,24 +168,24 @@ func (tt *tester) compact(rev int64, timeout time.Duration) (err error) { } }() - plog.Printf("%s compacting storage (current revision %d, compact revision %d)", tt.logPrefix(), tt.currentRevision, rev) + plog.Infof("%s compacting storage (current revision %d, compact revision %d)", tt.logPrefix(), tt.currentRevision, rev) if err = tt.cluster.compactKV(rev, timeout); err != nil { return err } - plog.Printf("%s compacted storage (compact revision %d)", tt.logPrefix(), rev) + plog.Infof("%s compacted storage (compact revision %d)", tt.logPrefix(), rev) - plog.Printf("%s checking compaction (compact revision %d)", tt.logPrefix(), rev) + plog.Infof("%s checking compaction (compact revision %d)", tt.logPrefix(), rev) if err = tt.cluster.checkCompact(rev); err != nil { plog.Warningf("%s checkCompact error (%v)", tt.logPrefix(), err) return err } - plog.Printf("%s confirmed compaction (compact revision %d)", tt.logPrefix(), rev) + plog.Infof("%s confirmed compaction (compact revision %d)", tt.logPrefix(), rev) return nil } func (tt *tester) defrag() error { - plog.Printf("%s defragmenting...", tt.logPrefix()) + plog.Infof("%s defragmenting...", tt.logPrefix()) if err := tt.cluster.defrag(); err != nil { plog.Warningf("%s defrag error (%v)", tt.logPrefix(), err) if cerr := tt.cleanup(); cerr != nil { @@ -188,7 +193,7 @@ func (tt *tester) defrag() error { } return err } - plog.Printf("%s defragmented...", tt.logPrefix()) + plog.Infof("%s defragmented...", tt.logPrefix()) return nil } @@ -225,15 +230,15 @@ func (tt *tester) cleanup() error { } func (tt *tester) cancelStresser() { - plog.Printf("%s canceling the stressers...", tt.logPrefix()) + plog.Infof("%s canceling the stressers...", tt.logPrefix()) tt.stresser.Cancel() - plog.Printf("%s canceled stressers", tt.logPrefix()) + plog.Infof("%s canceled stressers", tt.logPrefix()) } func (tt *tester) startStresser() (err error) { - plog.Printf("%s starting the stressers...", tt.logPrefix()) + plog.Infof("%s starting the stressers...", tt.logPrefix()) err = tt.stresser.Stress() - plog.Printf("%s started stressers", tt.logPrefix()) + plog.Infof("%s started stressers", tt.logPrefix()) return err }