diff --git a/tools/functional-tester/etcd-tester/cluster.go b/tools/functional-tester/etcd-tester/cluster.go index d12c754e0..36a8ff4ca 100644 --- a/tools/functional-tester/etcd-tester/cluster.go +++ b/tools/functional-tester/etcd-tester/cluster.go @@ -322,6 +322,11 @@ func (c *cluster) compactKV(rev int64) error { conn *grpc.ClientConn err error ) + + if rev <= 0 { + return nil + } + for _, u := range c.GRPCURLs { conn, err = grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second)) if err != nil { @@ -329,7 +334,7 @@ func (c *cluster) compactKV(rev int64) error { } kvc := pb.NewKVClient(conn) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - _, err = kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev}) + _, err = kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev, Physical: true}) cancel() conn.Close() if err == nil { @@ -338,3 +343,33 @@ func (c *cluster) compactKV(rev int64) error { } return err } + +func (c *cluster) checkCompact(rev int64) error { + if rev == 0 { + return nil + } + for _, u := range c.GRPCURLs { + cli, err := clientv3.New(clientv3.Config{ + Endpoints: []string{u}, + DialTimeout: 5 * time.Second, + }) + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + wch := cli.Watch(ctx, "\x00", clientv3.WithFromKey(), clientv3.WithRev(rev-1)) + wr, ok := <-wch + cancel() + + cli.Close() + + if !ok { + return fmt.Errorf("watch channel terminated") + } + if wr.CompactRevision != rev { + return fmt.Errorf("got compact revision %v, wanted %v", wr.CompactRevision, rev) + } + } + return nil +} diff --git a/tools/functional-tester/etcd-tester/tester.go b/tools/functional-tester/etcd-tester/tester.go index 2284724f9..79d916798 100644 --- a/tools/functional-tester/etcd-tester/tester.go +++ b/tools/functional-tester/etcd-tester/tester.go @@ -149,8 +149,15 @@ func (tt *tester) runLoop() { } plog.Printf("[round#%d] compacted storage", i) - // TODO: make sure compaction is finished - time.Sleep(30 * time.Second) + plog.Printf("[round#%d] check compaction at %d", i, revToCompact) + if err := tt.cluster.checkCompact(revToCompact); err != nil { + plog.Printf("[round#%d] checkCompact error (%v)", i, err) + if err := tt.cleanup(i, 0); err != nil { + plog.Printf("[round#%d] cleanup error: %v", i, err) + return + } + } + plog.Printf("[round#%d] confirmed compaction at %d", i, revToCompact) } }