diff --git a/server/storage/mvcc/hash_test.go b/server/storage/mvcc/hash_test.go index 2c7a35f9a..f0d516276 100644 --- a/server/storage/mvcc/hash_test.go +++ b/server/storage/mvcc/hash_test.go @@ -32,8 +32,9 @@ import ( // output which would have catastrophic consequences. Expected output is just // hardcoded, so please regenerate it every time you change input parameters. func TestHashByRevValue(t *testing.T) { - b, _ := betesting.NewDefaultTmpBackend(t) + b, tmpPath := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + defer cleanup(s, b, tmpPath) var totalRevisions int64 = 1210 assert.Less(t, int64(s.cfg.CompactionBatchLimit), totalRevisions) @@ -72,8 +73,9 @@ func TestHashByRevValue(t *testing.T) { } func TestHashByRevValueLastRevision(t *testing.T) { - b, _ := betesting.NewDefaultTmpBackend(t) + b, tmpPath := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + defer cleanup(s, b, tmpPath) var totalRevisions int64 = 1210 assert.Less(t, int64(s.cfg.CompactionBatchLimit), totalRevisions) @@ -131,8 +133,9 @@ func testHashByRev(t *testing.T, s *store, rev int64) KeyValueHash { // TestCompactionHash tests compaction hash // TODO: Change this to fuzz test func TestCompactionHash(t *testing.T) { - b, _ := betesting.NewDefaultTmpBackend(t) + b, tmpPath := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + defer cleanup(s, b, tmpPath) testutil.TestCompactionHash(context.Background(), t, hashTestCase{s}, s.cfg.CompactionBatchLimit) } @@ -176,6 +179,8 @@ func (tc hashTestCase) Compact(ctx context.Context, rev int64) error { func TestHasherStore(t *testing.T) { lg := zaptest.NewLogger(t) s := newHashStorage(lg, newFakeStore(lg)) + defer s.store.Close() + var hashes []KeyValueHash for i := 0; i < hashStorageMaxSize; i++ { hash := KeyValueHash{Hash: uint32(i), Revision: int64(i) + 10, CompactRevision: int64(i) + 100} @@ -203,6 +208,8 @@ func TestHasherStore(t *testing.T) { func TestHasherStoreFull(t *testing.T) { lg := zaptest.NewLogger(t) s := newHashStorage(lg, newFakeStore(lg)) + defer s.store.Close() + var minRevision int64 = 100 var maxRevision = minRevision + hashStorageMaxSize for i := 0; i < hashStorageMaxSize; i++ { diff --git a/server/storage/mvcc/kvstore_compaction_test.go b/server/storage/mvcc/kvstore_compaction_test.go index 2f8fac83c..b9b782273 100644 --- a/server/storage/mvcc/kvstore_compaction_test.go +++ b/server/storage/mvcc/kvstore_compaction_test.go @@ -110,7 +110,10 @@ func TestScheduleCompaction(t *testing.T) { func TestCompactAllAndRestore(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) s0 := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer os.Remove(tmpPath) + defer func() { + b.Close() + os.Remove(tmpPath) + }() s0.Put([]byte("foo"), []byte("bar"), lease.NoLease) s0.Put([]byte("foo"), []byte("bar1"), lease.NoLease) @@ -143,4 +146,8 @@ func TestCompactAllAndRestore(t *testing.T) { if err != nil { t.Errorf("unexpect range error %v", err) } + err = s1.Close() + if err != nil { + t.Fatal(err) + } } diff --git a/server/storage/mvcc/kvstore_test.go b/server/storage/mvcc/kvstore_test.go index c755827ce..eb7c6acbc 100644 --- a/server/storage/mvcc/kvstore_test.go +++ b/server/storage/mvcc/kvstore_test.go @@ -22,7 +22,6 @@ import ( "fmt" "math" mrand "math/rand" - "os" "reflect" "sort" "strconv" @@ -324,6 +323,7 @@ func TestStoreDeleteRange(t *testing.T) { if s.currentRev != tt.wrev.main { t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev) } + s.Close() } } @@ -370,6 +370,7 @@ func TestStoreRestore(t *testing.T) { s := newFakeStore(lg) b := s.b.(*fakeBackend) fi := s.kvindex.(*fakeIndex) + defer s.Close() putkey := newTestKeyBytes(lg, revision{3, 0}, false) putkv := mvccpb.KeyValue{ @@ -435,6 +436,7 @@ func TestRestoreDelete(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + defer b.Close() keys := make(map[string]struct{}) for i := 0; i < 20; i++ { @@ -480,7 +482,7 @@ func TestRestoreDelete(t *testing.T) { func TestRestoreContinueUnfinishedCompaction(t *testing.T) { tests := []string{"recreate", "restore"} for _, test := range tests { - b, _ := betesting.NewDefaultTmpBackend(t) + b, tmpPath := betesting.NewDefaultTmpBackend(t) s0 := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) s0.Put([]byte("foo"), []byte("bar"), lease.NoLease) @@ -527,9 +529,10 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) { time.Sleep(100 * time.Millisecond) continue } + // FIXME(fuweid): it doesn't test restore one? return } - + cleanup(s, b, tmpPath) t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes)) } } @@ -705,7 +708,7 @@ func TestTxnPut(t *testing.T) { func TestConcurrentReadNotBlockingWrite(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer os.Remove(tmpPath) + defer cleanup(s, b, tmpPath) // write something to read later s.Put([]byte("foo"), []byte("bar"), lease.NoLease) @@ -774,9 +777,7 @@ func TestConcurrentReadTxAndWrite(t *testing.T) { ) b, tmpPath := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer b.Close() - defer s.Close() - defer os.Remove(tmpPath) + defer cleanup(s, b, tmpPath) var wg sync.WaitGroup wg.Add(numOfWrites) diff --git a/server/storage/mvcc/watchable_store_test.go b/server/storage/mvcc/watchable_store_test.go index a36c3ee14..b7d6868f8 100644 --- a/server/storage/mvcc/watchable_store_test.go +++ b/server/storage/mvcc/watchable_store_test.go @@ -17,7 +17,6 @@ package mvcc import ( "bytes" "fmt" - "os" "reflect" "sync" "testing" @@ -34,20 +33,16 @@ import ( func TestWatch(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - - defer func() { - b.Close() - s.Close() - os.Remove(tmpPath) - }() + defer cleanup(s, b, tmpPath) testKey := []byte("foo") testValue := []byte("bar") s.Put(testKey, testValue, lease.NoLease) w := s.NewWatchStream() - w.Watch(0, testKey, nil, 0) + defer w.Close() + w.Watch(0, testKey, nil, 0) if !s.synced.contains(string(testKey)) { // the key must have had an entry in synced t.Errorf("existence = false, want true") @@ -57,18 +52,16 @@ func TestWatch(t *testing.T) { func TestNewWatcherCancel(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + defer cleanup(s, b, tmpPath) - defer func() { - s.store.Close() - os.Remove(tmpPath) - }() testKey := []byte("foo") testValue := []byte("bar") s.Put(testKey, testValue, lease.NoLease) w := s.NewWatchStream() - wt, _ := w.Watch(0, testKey, nil, 0) + defer w.Close() + wt, _ := w.Watch(0, testKey, nil, 0) if err := w.Cancel(wt); err != nil { t.Error(err) } @@ -94,12 +87,10 @@ func TestCancelUnsynced(t *testing.T) { // to make the test not crash from assigning to nil map. // 'synced' doesn't get populated in this test. synced: newWatcherGroup(), + stopc: make(chan struct{}), } - defer func() { - s.store.Close() - os.Remove(tmpPath) - }() + defer cleanup(s, b, tmpPath) // Put a key so that we can spawn watchers on that key. // (testKey in this test). This increases the rev to 1, @@ -110,6 +101,7 @@ func TestCancelUnsynced(t *testing.T) { s.Put(testKey, testValue, lease.NoLease) w := s.NewWatchStream() + defer w.Close() // arbitrary number for watchers watcherN := 100 @@ -146,18 +138,17 @@ func TestSyncWatchers(t *testing.T) { store: NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}), unsynced: newWatcherGroup(), synced: newWatcherGroup(), + stopc: make(chan struct{}), } - defer func() { - s.store.Close() - os.Remove(tmpPath) - }() + defer cleanup(s, b, tmpPath) testKey := []byte("foo") testValue := []byte("bar") s.Put(testKey, testValue, lease.NoLease) w := s.NewWatchStream() + defer w.Close() // arbitrary number for watchers watcherN := 100 @@ -227,11 +218,8 @@ func TestSyncWatchers(t *testing.T) { func TestWatchCompacted(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + defer cleanup(s, b, tmpPath) - defer func() { - s.store.Close() - os.Remove(tmpPath) - }() testKey := []byte("foo") testValue := []byte("bar") @@ -246,8 +234,9 @@ func TestWatchCompacted(t *testing.T) { } w := s.NewWatchStream() - wt, _ := w.Watch(0, testKey, nil, compactRev-1) + defer w.Close() + wt, _ := w.Watch(0, testKey, nil, compactRev-1) select { case resp := <-w.Chan(): if resp.WatchID != wt { @@ -264,17 +253,14 @@ func TestWatchCompacted(t *testing.T) { func TestWatchFutureRev(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - - defer func() { - b.Close() - s.Close() - os.Remove(tmpPath) - }() + defer cleanup(s, b, tmpPath) testKey := []byte("foo") testValue := []byte("bar") w := s.NewWatchStream() + defer w.Close() + wrev := int64(10) w.Watch(0, testKey, nil, wrev) @@ -317,6 +303,8 @@ func TestWatchRestore(t *testing.T) { defer cleanup(newStore, newBackend, newPath) w := newStore.NewWatchStream() + defer w.Close() + w.Watch(0, testKey, nil, rev-1) time.Sleep(delay) @@ -365,6 +353,8 @@ func TestWatchRestoreSyncedWatcher(t *testing.T) { // create a watcher with a future revision // add to "synced" watcher group (startRev > s.store.currentRev) w1 := s1.NewWatchStream() + defer w1.Close() + w1.Watch(0, testKey, nil, startRev) // make "s2" ends up with a higher last revision @@ -407,8 +397,7 @@ func TestWatchBatchUnsynced(t *testing.T) { oldMaxRevs := watchBatchMaxRevs defer func() { watchBatchMaxRevs = oldMaxRevs - s.store.Close() - os.Remove(tmpPath) + cleanup(s, b, tmpPath) }() batches := 3 watchBatchMaxRevs = 4 @@ -419,6 +408,8 @@ func TestWatchBatchUnsynced(t *testing.T) { } w := s.NewWatchStream() + defer w.Close() + w.Watch(0, v, nil, 1) for i := 0; i < batches; i++ { if resp := <-w.Chan(); len(resp.Events) != watchBatchMaxRevs { @@ -539,9 +530,7 @@ func TestWatchVictims(t *testing.T) { s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) defer func() { - b.Close() - s.Close() - os.Remove(tmpPath) + cleanup(s, b, tmpPath) chanBufLen, maxWatchersPerSync = oldChanBufLen, oldMaxWatchersPerSync }() @@ -616,12 +605,7 @@ func TestWatchVictims(t *testing.T) { func TestStressWatchCancelClose(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - - defer func() { - b.Close() - s.Close() - os.Remove(tmpPath) - }() + defer cleanup(s, b, tmpPath) testKey, testValue := []byte("foo"), []byte("bar") var wg sync.WaitGroup