test: fix TestRestoreContinueUnfinishedCompaction

The original test case uses a `return` statement that skips the
`restore` case. Run each case in a `t.Run` subtest so that the
`restore` case is actually exercised.

Signed-off-by: Wei Fu <fuweid89@gmail.com>
Wei Fu 2023-03-16 22:19:21 +08:00
parent d200f72d2d
commit 830d9e9eaa
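
In outline, the bug described in the commit message and the fix in the diff below reduce to the following sketch (a minimal standalone example, not the etcd code; `compacted` is a hypothetical stand-in for the backend check that the compacted revision is gone):

package example

import "testing"

// compacted stands in for the real backend check; hypothetical helper
// that reports whether the compacted revision has been removed.
func compacted() bool { return true }

// The bug in miniature: the retry loop's success path returns from the
// whole test function, so the second case ("restore") never runs.
func TestBuggy(t *testing.T) {
	for _, test := range []string{"recreate", "restore"} {
		for i := 0; i < 5; i++ {
			if compacted() {
				return // exits TestBuggy itself after "recreate"
			}
		}
		t.Errorf("%s: key for compacted rev still exists", test)
	}
}

// The fix: wrap each case in t.Run. The same `return` now ends only
// the subtest closure, and the outer loop proceeds to "restore".
func TestFixed(t *testing.T) {
	for _, test := range []string{"recreate", "restore"} {
		test := test // give the closure its own copy of the range variable
		t.Run(test, func(t *testing.T) {
			for i := 0; i < 5; i++ {
				if compacted() {
					return // ends only this subtest
				}
			}
			t.Errorf("%s: key for compacted rev still exists", test)
		})
	}
}

The real diff additionally moves cleanup into a defer inside the subtest, so the backend and temp path are released on every exit path, including the early return.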


@@ -482,58 +482,63 @@ func TestRestoreDelete(t *testing.T) {
 func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
 	tests := []string{"recreate", "restore"}
 	for _, test := range tests {
-		b, tmpPath := betesting.NewDefaultTmpBackend(t)
-		s0 := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
-
-		s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
-		s0.Put([]byte("foo"), []byte("bar1"), lease.NoLease)
-		s0.Put([]byte("foo"), []byte("bar2"), lease.NoLease)
-
-		// write scheduled compaction, but not do compaction
-		rbytes := newRevBytes()
-		revToBytes(revision{main: 2}, rbytes)
-		tx := s0.b.BatchTx()
-		tx.Lock()
-		UnsafeSetScheduledCompact(tx, 2)
-		tx.Unlock()
-
-		s0.Close()
-
-		var s *store
-		switch test {
-		case "recreate":
-			s = NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
-		case "restore":
-			s0.Restore(b)
-			s = s0
-		}
-
-		// wait for scheduled compaction to be finished
-		time.Sleep(100 * time.Millisecond)
-
-		if _, err := s.Range(context.TODO(), []byte("foo"), nil, RangeOptions{Rev: 1}); err != ErrCompacted {
-			t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted)
-		}
-		// check the key in backend is deleted
-		revbytes := newRevBytes()
-		revToBytes(revision{main: 1}, revbytes)
-
-		// The disk compaction is done asynchronously and requires more time on slow disk.
-		// try 5 times for CI with slow IO.
-		for i := 0; i < 5; i++ {
-			tx := s.b.BatchTx()
-			tx.Lock()
-			ks, _ := tx.UnsafeRange(schema.Key, revbytes, nil, 0)
-			tx.Unlock()
-			if len(ks) != 0 {
-				time.Sleep(100 * time.Millisecond)
-				continue
-			}
-			// FIXME(fuweid): it doesn't test restore one?
-			return
-		}
-
-		cleanup(s, b, tmpPath)
-		t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes))
+		test := test
+		t.Run(test, func(t *testing.T) {
+			b, tmpPath := betesting.NewDefaultTmpBackend(t)
+			s0 := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
+
+			s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
+			s0.Put([]byte("foo"), []byte("bar1"), lease.NoLease)
+			s0.Put([]byte("foo"), []byte("bar2"), lease.NoLease)
+
+			// write scheduled compaction, but not do compaction
+			rbytes := newRevBytes()
+			revToBytes(revision{main: 2}, rbytes)
+			tx := s0.b.BatchTx()
+			tx.Lock()
+			UnsafeSetScheduledCompact(tx, 2)
+			tx.Unlock()
+
+			var s *store
+			switch test {
+			case "recreate":
+				s0.Close()
+
+				s = NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
+			case "restore":
+				// TODO(fuweid): store doesn't support to restore
+				// from a closed status because there is no lock
+				// for `Close` or action to mark it is closed.
+				s0.Restore(b)
+				s = s0
+			}
+			defer cleanup(s, b, tmpPath)
+
+			// wait for scheduled compaction to be finished
+			time.Sleep(100 * time.Millisecond)
+
+			if _, err := s.Range(context.TODO(), []byte("foo"), nil, RangeOptions{Rev: 1}); err != ErrCompacted {
+				t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted)
+			}
+			// check the key in backend is deleted
+			revbytes := newRevBytes()
+			revToBytes(revision{main: 1}, revbytes)
+
+			// The disk compaction is done asynchronously and requires more time on slow disk.
+			// try 5 times for CI with slow IO.
+			for i := 0; i < 5; i++ {
+				tx := s.b.BatchTx()
+				tx.Lock()
+				ks, _ := tx.UnsafeRange(schema.Key, revbytes, nil, 0)
+				tx.Unlock()
+				if len(ks) != 0 {
+					time.Sleep(100 * time.Millisecond)
+					continue
+				}
+				return
+			}
+
+			t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes))
+		})
 	}
 }
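
A note on `test := test` in the new version: the subtests here run sequentially, so the rebinding is defensive, but before Go 1.22 a range variable was a single variable reused across iterations, and any closure that outlived its iteration (a t.Parallel() subtest, a goroutine) could observe a later value. A minimal illustration of that hazard (hypothetical, not from this commit):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for _, test := range []string{"recreate", "restore"} {
		test := test // drop this line and, before Go 1.22, both goroutines may print "restore"
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(test) // each goroutine sees its own copy
		}()
	}
	wg.Wait()
}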