diff --git a/storage/kv_test.go b/storage/kv_test.go index 45c65389e..b39934d2f 100644 --- a/storage/kv_test.go +++ b/storage/kv_test.go @@ -44,6 +44,9 @@ func TestWorkflow(t *testing.T) { if err != nil { t.Errorf("#%d: range error (%v)", err) } + if len(kvs) != len(wkvs) { + t.Fatalf("#%d: len(kvs) = %d, want %d", i, len(kvs), len(wkvs)) + } for j, kv := range kvs { if !reflect.DeepEqual(kv.Key, wkvs[j].k) { t.Errorf("#%d: keys[%d] = %s, want %s", i, j, kv.Key, wkvs[j].k) diff --git a/storage/kvstore.go b/storage/kvstore.go index 9399f8f80..2c457482b 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -39,6 +39,9 @@ type store struct { tmu sync.Mutex // protect the tnxID field tnxID int64 // tracks the current tnxID to verify tnx operations + + wg sync.WaitGroup + stopc chan struct{} } func newStore(path string) *store { @@ -47,6 +50,7 @@ func newStore(path string) *store { kvindex: newTreeIndex(), currentRev: reversion{}, compactMainRev: -1, + stopc: make(chan struct{}), } tx := s.b.BatchTx() @@ -161,6 +165,7 @@ func (s *store) Compact(rev int64) error { keep := s.kvindex.Compact(rev) + s.wg.Add(1) go s.scheduleCompaction(rev, keep) return nil } @@ -213,7 +218,7 @@ func (s *store) Restore() error { _, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0) if len(scheduledCompactBytes) != 0 { - scheduledCompact := bytesToRev(finishedCompactBytes[0]).main + scheduledCompact := bytesToRev(scheduledCompactBytes[0]).main if scheduledCompact > s.compactMainRev { log.Printf("storage: resume scheduled compaction at %d", scheduledCompact) go s.Compact(scheduledCompact) @@ -226,6 +231,9 @@ func (s *store) Restore() error { } func (s *store) Close() error { + close(s.stopc) + s.wg.Wait() + s.b.ForceCommit() return s.b.Close() } diff --git a/storage/kvstore_compaction.go b/storage/kvstore_compaction.go index 466f58321..b6b1ba80f 100644 --- a/storage/kvstore_compaction.go +++ b/storage/kvstore_compaction.go @@ -6,6 +6,7 @@ import ( ) func (s *store) scheduleCompaction(compactMainRev int64, keep map[reversion]struct{}) { + defer s.wg.Done() end := make([]byte, 8) binary.BigEndian.PutUint64(end, uint64(compactMainRev+1)) @@ -37,6 +38,10 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[reversion]stru revToBytes(reversion{main: rev.main, sub: rev.sub + 1}, last) tx.Unlock() - time.Sleep(100 * time.Millisecond) + select { + case <-time.After(100 * time.Millisecond): + case <-s.stopc: + return + } } } diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index 25ee47ac5..60495f6d2 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -3,9 +3,11 @@ package storage import ( "bytes" "crypto/rand" + "math" "os" "reflect" "testing" + "time" "github.com/coreos/etcd/storage/storagepb" ) @@ -388,9 +390,6 @@ func TestCompaction(t *testing.T) { } } -// TODO: test more complicated cases: -// with unfinished compaction -// with removed keys func TestRestore(t *testing.T) { s0 := newStore("test") defer os.Remove("test") @@ -402,6 +401,19 @@ func TestRestore(t *testing.T) { s0.Put([]byte("foo1"), []byte("bar12")) s0.Put([]byte("foo2"), []byte("bar13")) s0.Put([]byte("foo1"), []byte("bar14")) + s0.Put([]byte("foo3"), []byte("bar3")) + s0.DeleteRange([]byte("foo3"), nil) + s0.Put([]byte("foo3"), []byte("bar31")) + s0.DeleteRange([]byte("foo3"), nil) + + mink := newRevBytes() + revToBytes(reversion{main: 0, sub: 0}, mink) + maxk := newRevBytes() + revToBytes(reversion{main: math.MaxInt64, sub: math.MaxInt64}, maxk) + s0kvs, _, err := s0.rangeKeys(mink, maxk, 0, 0) + if err != nil { + t.Fatalf("rangeKeys on s0 error (%v)", err) + } s0.Close() @@ -411,6 +423,53 @@ func TestRestore(t *testing.T) { if !s0.Equal(s1) { t.Errorf("not equal!") } + s1kvs, _, err := s1.rangeKeys(mink, maxk, 0, 0) + if err != nil { + t.Fatalf("rangeKeys on s1 error (%v)", err) + } + if !reflect.DeepEqual(s1kvs, s0kvs) { + t.Errorf("s1kvs = %+v, want %+v", s1kvs, s0kvs) + } +} + +func TestRestoreContinueUnfinishedCompaction(t *testing.T) { + s0 := newStore("test") + defer os.Remove("test") + + s0.Put([]byte("foo"), []byte("bar")) + s0.Put([]byte("foo"), []byte("bar1")) + s0.Put([]byte("foo"), []byte("bar2")) + + // write scheduled compaction, but not do compaction + rbytes := newRevBytes() + revToBytes(reversion{main: 2}, rbytes) + tx := s0.b.BatchTx() + tx.Lock() + tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes) + tx.Unlock() + + s0.Close() + + s1 := newStore("test") + s1.Restore() + + // wait for scheduled compaction to be finished + time.Sleep(100 * time.Millisecond) + + if _, _, err := s1.Range([]byte("foo"), nil, 0, 2); err != ErrCompacted { + t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted) + } + // check the key in backend is deleted + revbytes := newRevBytes() + // TODO: compact should delete main=2 key too + revToBytes(reversion{main: 1}, revbytes) + tx = s1.b.BatchTx() + tx.Lock() + ks, _ := tx.UnsafeRange(keyBucketName, revbytes, nil, 0) + if len(ks) != 0 { + t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes)) + } + tx.Unlock() } func BenchmarkStorePut(b *testing.B) {