From d72bcdc1561fd798821e915f314e89359dc22d0b Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 13 Apr 2016 19:59:26 -0700 Subject: [PATCH] storage: have Range on rev=0 work even if compacted to current revision --- etcdserver/etcdserverpb/rpc.proto | 6 ++---- integration/v3_grpc_test.go | 25 +++++++++++++++++++++++++ storage/kv.go | 1 + storage/kv_test.go | 4 +++- storage/kvstore.go | 2 +- storage/kvstore_test.go | 3 +-- 6 files changed, 33 insertions(+), 8 deletions(-) diff --git a/etcdserver/etcdserverpb/rpc.proto b/etcdserver/etcdserverpb/rpc.proto index f1a5ed96c..6f2b73d18 100644 --- a/etcdserver/etcdserverpb/rpc.proto +++ b/etcdserver/etcdserverpb/rpc.proto @@ -290,10 +290,8 @@ message TxnResponse { repeated ResponseUnion responses = 3; } -// Compaction compacts the kv store upto the given revision (including). -// It removes the old versions of a key. It keeps the newest version of -// the key even if its latest modification revision is smaller than the given -// revision. +// Compaction compacts the kv store upto a given revision. All superseded keys +// with a revision less than the compaction revision will be removed. message CompactionRequest { int64 revision = 1; // physical is set so the RPC will wait until the compaction is physically diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index dcd539bb0..6d8f285fb 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -74,6 +74,31 @@ func TestV3PutOverwrite(t *testing.T) { } } +// TestV3CompactCurrentRev ensures keys are present when compacting on current revision. +func TestV3CompactCurrentRev(t *testing.T) { + defer testutil.AfterTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + kvc := toGRPC(clus.RandClient()).KV + preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")} + for i := 0; i < 3; i++ { + if _, err := kvc.Put(context.Background(), preq); err != nil { + t.Fatalf("couldn't put key (%v)", err) + } + } + // compact on current revision + _, err := kvc.Compact(context.Background(), &pb.CompactionRequest{Revision: 4}) + if err != nil { + t.Fatalf("couldn't compact kv space (%v)", err) + } + // key still exists? + _, err = kvc.Range(context.Background(), &pb.RangeRequest{Key: []byte("foo")}) + if err != nil { + t.Fatalf("couldn't get key after compaction (%v)", err) + } +} + func TestV3TxnTooManyOps(t *testing.T) { defer testutil.AfterTest(t) clus := NewClusterV3(t, &ClusterConfig{Size: 3}) diff --git a/storage/kv.go b/storage/kv.go index 33d26e29e..0fb2377e7 100644 --- a/storage/kv.go +++ b/storage/kv.go @@ -67,6 +67,7 @@ type KV interface { TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) + // Compact frees all superseded keys with revisions less than rev. Compact(rev int64) (<-chan struct{}, error) // Hash retrieves the hash of KV state. diff --git a/storage/kv_test.go b/storage/kv_test.go index 5923ed6eb..0d6797260 100644 --- a/storage/kv_test.go +++ b/storage/kv_test.go @@ -194,9 +194,11 @@ func testKVRangeBadRev(t *testing.T, f rangeFunc) { rev int64 werr error }{ - {-1, ErrCompacted}, + {-1, nil}, // <= 0 is most recent store + {0, nil}, {1, ErrCompacted}, {2, ErrCompacted}, + {4, nil}, {5, ErrFutureRev}, {100, ErrFutureRev}, } diff --git a/storage/kvstore.go b/storage/kvstore.go index fc4beec7b..185ec11fb 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -425,7 +425,7 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage } else { rev = rangeRev } - if rev <= s.compactMainRev { + if rev < s.compactMainRev { return nil, 0, ErrCompacted } diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index 86ec99f7b..46aebbb06 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -440,12 +440,11 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) { // wait for scheduled compaction to be finished time.Sleep(100 * time.Millisecond) - if _, _, err := s1.Range([]byte("foo"), nil, 0, 2); err != ErrCompacted { + if _, _, err := s1.Range([]byte("foo"), nil, 0, 1); err != ErrCompacted { t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted) } // check the key in backend is deleted revbytes := newRevBytes() - // TODO: compact should delete main=2 key too revToBytes(revision{main: 1}, revbytes) // The disk compaction is done asynchronously and requires more time on slow disk.