diff --git a/storage/kvstore.go b/storage/kvstore.go index 9399f8f80..83f2b135b 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -213,7 +213,7 @@ func (s *store) Restore() error { _, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0) if len(scheduledCompactBytes) != 0 { - scheduledCompact := bytesToRev(finishedCompactBytes[0]).main + scheduledCompact := bytesToRev(scheduledCompactBytes[0]).main if scheduledCompact > s.compactMainRev { log.Printf("storage: resume scheduled compaction at %d", scheduledCompact) go s.Compact(scheduledCompact) diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index a131620fa..60495f6d2 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -7,6 +7,7 @@ import ( "os" "reflect" "testing" + "time" "github.com/coreos/etcd/storage/storagepb" ) @@ -389,8 +390,6 @@ func TestCompaction(t *testing.T) { } } -// TODO: test more complicated cases: -// with unfinished compaction func TestRestore(t *testing.T) { s0 := newStore("test") defer os.Remove("test") @@ -433,6 +432,46 @@ func TestRestore(t *testing.T) { } } +func TestRestoreContinueUnfinishedCompaction(t *testing.T) { + s0 := newStore("test") + defer os.Remove("test") + + s0.Put([]byte("foo"), []byte("bar")) + s0.Put([]byte("foo"), []byte("bar1")) + s0.Put([]byte("foo"), []byte("bar2")) + + // write scheduled compaction, but not do compaction + rbytes := newRevBytes() + revToBytes(reversion{main: 2}, rbytes) + tx := s0.b.BatchTx() + tx.Lock() + tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes) + tx.Unlock() + + s0.Close() + + s1 := newStore("test") + s1.Restore() + + // wait for scheduled compaction to be finished + time.Sleep(100 * time.Millisecond) + + if _, _, err := s1.Range([]byte("foo"), nil, 0, 2); err != ErrCompacted { + t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted) + } + // check the key in backend is deleted + revbytes := newRevBytes() + // TODO: compact should delete main=2 key too + revToBytes(reversion{main: 1}, revbytes) + tx = s1.b.BatchTx() + tx.Lock() + ks, _ := tx.UnsafeRange(keyBucketName, revbytes, nil, 0) + if len(ks) != 0 { + t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes)) + } + tx.Unlock() +} + func BenchmarkStorePut(b *testing.B) { s := newStore("test") defer os.Remove("test")