From 26a09d84794dcf9b720289d4fd458d0a65ab12c9 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 18 Jun 2015 09:34:51 -0700 Subject: [PATCH 1/3] storage: enhance TestRestore and kill TODO --- storage/kvstore_test.go | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index 25ee47ac5..a131620fa 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -3,6 +3,7 @@ package storage import ( "bytes" "crypto/rand" + "math" "os" "reflect" "testing" @@ -390,7 +391,6 @@ func TestCompaction(t *testing.T) { // TODO: test more complicated cases: // with unfinished compaction -// with removed keys func TestRestore(t *testing.T) { s0 := newStore("test") defer os.Remove("test") @@ -402,6 +402,19 @@ func TestRestore(t *testing.T) { s0.Put([]byte("foo1"), []byte("bar12")) s0.Put([]byte("foo2"), []byte("bar13")) s0.Put([]byte("foo1"), []byte("bar14")) + s0.Put([]byte("foo3"), []byte("bar3")) + s0.DeleteRange([]byte("foo3"), nil) + s0.Put([]byte("foo3"), []byte("bar31")) + s0.DeleteRange([]byte("foo3"), nil) + + mink := newRevBytes() + revToBytes(reversion{main: 0, sub: 0}, mink) + maxk := newRevBytes() + revToBytes(reversion{main: math.MaxInt64, sub: math.MaxInt64}, maxk) + s0kvs, _, err := s0.rangeKeys(mink, maxk, 0, 0) + if err != nil { + t.Fatalf("rangeKeys on s0 error (%v)", err) + } s0.Close() @@ -411,6 +424,13 @@ func TestRestore(t *testing.T) { if !s0.Equal(s1) { t.Errorf("not equal!") } + s1kvs, _, err := s1.rangeKeys(mink, maxk, 0, 0) + if err != nil { + t.Fatalf("rangeKeys on s1 error (%v)", err) + } + if !reflect.DeepEqual(s1kvs, s0kvs) { + t.Errorf("s1kvs = %+v, want %+v", s1kvs, s0kvs) + } } func BenchmarkStorePut(b *testing.B) { From 148394f66f1626c3c4119f90774a8eaa3d77b43d Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 18 Jun 2015 10:38:15 -0700 Subject: [PATCH 2/3] storage: fix schedule compaction bug in recover process It uses wrong schedule compaction reversion before. --- storage/kvstore.go | 2 +- storage/kvstore_test.go | 43 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/storage/kvstore.go b/storage/kvstore.go index 9399f8f80..83f2b135b 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -213,7 +213,7 @@ func (s *store) Restore() error { _, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0) if len(scheduledCompactBytes) != 0 { - scheduledCompact := bytesToRev(finishedCompactBytes[0]).main + scheduledCompact := bytesToRev(scheduledCompactBytes[0]).main if scheduledCompact > s.compactMainRev { log.Printf("storage: resume scheduled compaction at %d", scheduledCompact) go s.Compact(scheduledCompact) diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index a131620fa..60495f6d2 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -7,6 +7,7 @@ import ( "os" "reflect" "testing" + "time" "github.com/coreos/etcd/storage/storagepb" ) @@ -389,8 +390,6 @@ func TestCompaction(t *testing.T) { } } -// TODO: test more complicated cases: -// with unfinished compaction func TestRestore(t *testing.T) { s0 := newStore("test") defer os.Remove("test") @@ -433,6 +432,46 @@ func TestRestore(t *testing.T) { } } +func TestRestoreContinueUnfinishedCompaction(t *testing.T) { + s0 := newStore("test") + defer os.Remove("test") + + s0.Put([]byte("foo"), []byte("bar")) + s0.Put([]byte("foo"), []byte("bar1")) + s0.Put([]byte("foo"), []byte("bar2")) + + // write scheduled compaction, but not do compaction + rbytes := newRevBytes() + revToBytes(reversion{main: 2}, rbytes) + tx := s0.b.BatchTx() + tx.Lock() + tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes) + tx.Unlock() + + s0.Close() + + s1 := newStore("test") + s1.Restore() + + // wait for scheduled compaction to be finished + time.Sleep(100 * time.Millisecond) + + if _, _, err := s1.Range([]byte("foo"), nil, 0, 2); err != ErrCompacted { + t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted) + } + // check the key in backend is deleted + revbytes := newRevBytes() + // TODO: compact should delete main=2 key too + revToBytes(reversion{main: 1}, revbytes) + tx = s1.b.BatchTx() + tx.Lock() + ks, _ := tx.UnsafeRange(keyBucketName, revbytes, nil, 0) + if len(ks) != 0 { + t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes)) + } + tx.Unlock() +} + func BenchmarkStorePut(b *testing.B) { s := newStore("test") defer os.Remove("test") From 7cba42fb733bb058d5c25e9a27ed345d9cc9f8e1 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 18 Jun 2015 11:05:31 -0700 Subject: [PATCH 3/3] storage: wait for compact goroutine to exit before close backend If backend is closed, the operations on backend in compact goroutine will panic. So this PR waits for compact goroutine to exit before close backend. This fixes the TestWorkflow failure too. --- storage/kv_test.go | 3 +++ storage/kvstore.go | 8 ++++++++ storage/kvstore_compaction.go | 7 ++++++- 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/storage/kv_test.go b/storage/kv_test.go index 45c65389e..b39934d2f 100644 --- a/storage/kv_test.go +++ b/storage/kv_test.go @@ -44,6 +44,9 @@ func TestWorkflow(t *testing.T) { if err != nil { t.Errorf("#%d: range error (%v)", err) } + if len(kvs) != len(wkvs) { + t.Fatalf("#%d: len(kvs) = %d, want %d", i, len(kvs), len(wkvs)) + } for j, kv := range kvs { if !reflect.DeepEqual(kv.Key, wkvs[j].k) { t.Errorf("#%d: keys[%d] = %s, want %s", i, j, kv.Key, wkvs[j].k) diff --git a/storage/kvstore.go b/storage/kvstore.go index 83f2b135b..2c457482b 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -39,6 +39,9 @@ type store struct { tmu sync.Mutex // protect the tnxID field tnxID int64 // tracks the current tnxID to verify tnx operations + + wg sync.WaitGroup + stopc chan struct{} } func newStore(path string) *store { @@ -47,6 +50,7 @@ func newStore(path string) *store { kvindex: newTreeIndex(), currentRev: reversion{}, compactMainRev: -1, + stopc: make(chan struct{}), } tx := s.b.BatchTx() @@ -161,6 +165,7 @@ func (s *store) Compact(rev int64) error { keep := s.kvindex.Compact(rev) + s.wg.Add(1) go s.scheduleCompaction(rev, keep) return nil } @@ -226,6 +231,9 @@ func (s *store) Restore() error { } func (s *store) Close() error { + close(s.stopc) + s.wg.Wait() + s.b.ForceCommit() return s.b.Close() } diff --git a/storage/kvstore_compaction.go b/storage/kvstore_compaction.go index 466f58321..b6b1ba80f 100644 --- a/storage/kvstore_compaction.go +++ b/storage/kvstore_compaction.go @@ -6,6 +6,7 @@ import ( ) func (s *store) scheduleCompaction(compactMainRev int64, keep map[reversion]struct{}) { + defer s.wg.Done() end := make([]byte, 8) binary.BigEndian.PutUint64(end, uint64(compactMainRev+1)) @@ -37,6 +38,10 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[reversion]stru revToBytes(reversion{main: rev.main, sub: rev.sub + 1}, last) tx.Unlock() - time.Sleep(100 * time.Millisecond) + select { + case <-time.After(100 * time.Millisecond): + case <-s.stopc: + return + } } }