From 32ee8b877a00f7c51f95056c72dc3768e88a5c34 Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Sat, 3 Feb 2024 17:06:55 +0800 Subject: [PATCH] etcdserver: drain leaky goroutines before test completed If pending changes aren't committed before test completed, it might cause data race when we don't drain all the background goroutines. ```bash $ cd server $ go test -race -v -run TestApplyRepeat ./etcdserver ... panic: Log in goroutine after TestApplyRepeat has completed: 2024-02-03T17:06:13.262+0800 DEBUG bbolt Committing transaction 2 goroutine 81 [running]: testing.(*common).logDepth(0xc000502820, {0xc0001b0460, 0x41}, 0x3) /usr/local/go/src/testing/testing.go:1022 +0x6d4 testing.(*common).log(...) /usr/local/go/src/testing/testing.go:1004 testing.(*common).Logf(0xc000502820, {0x1421ad7, 0x2}, {0xc000603520, 0x1, 0x1}) /usr/local/go/src/testing/testing.go:1055 +0xa5 go.uber.org/zap/zaptest.testingWriter.Write({{0x15f1f90?, 0xc000502820?}, 0xda?}, {0xc000119800, 0x42, 0x400}) /home/fuwei/go/pkg/mod/go.uber.org/zap@v1.26.0/zaptest/logger.go:130 +0x11e go.uber.org/zap/zapcore.(*ioCore).Write(0xc0000b55c0, {0xff, {0xc1679e614f9fd7a4, 0x73a3657, 0x1cc2400}, {0x1422b2d, 0x5}, {0xc0001a0330, 0x18}, {0x0, ...}, ...}, ...) /home/fuwei/go/pkg/mod/go.uber.org/zap@v1.26.0/zapcore/core.go:99 +0x193 go.uber.org/zap/zapcore.(*CheckedEntry).Write(0xc000115930, {0x0, 0x0, 0x0}) /home/fuwei/go/pkg/mod/go.uber.org/zap@v1.26.0/zapcore/entry.go:253 +0x2f0 go.uber.org/zap.(*SugaredLogger).log(0xc0001960f8, 0xff, {0x1437885, 0x19}, {0xc0006034e0, 0x1, 0x1}, {0x0, 0x0, 0x0}) /home/fuwei/go/pkg/mod/go.uber.org/zap@v1.26.0/sugar.go:316 +0x130 go.uber.org/zap.(*SugaredLogger).Debugf(...) /home/fuwei/go/pkg/mod/go.uber.org/zap@v1.26.0/sugar.go:171 go.etcd.io/bbolt.(*Tx).Commit(0xc0001aa9a0) /home/fuwei/go/pkg/mod/go.etcd.io/bbolt@v1.4.0-alpha.0/tx.go:173 +0x206 go.etcd.io/etcd/server/v3/storage/backend.(*batchTx).commit(0xc00019b180, 0x0) /home/fuwei/go/src/go.etcd.io/etcd/server/storage/backend/batch_tx.go:269 +0xdf go.etcd.io/etcd/server/v3/storage/backend.(*batchTxBuffered).unsafeCommit(0xc00019b180, 0x0) /home/fuwei/go/src/go.etcd.io/etcd/server/storage/backend/batch_tx.go:378 +0x425 go.etcd.io/etcd/server/v3/storage/backend.(*batchTxBuffered).commit(0xc00019b180, 0x80?) /home/fuwei/go/src/go.etcd.io/etcd/server/storage/backend/batch_tx.go:355 +0x78 go.etcd.io/etcd/server/v3/storage/backend.(*batchTxBuffered).Commit(0xc00019b180) /home/fuwei/go/src/go.etcd.io/etcd/server/storage/backend/batch_tx.go:342 +0x35 go.etcd.io/etcd/server/v3/storage/backend.(*backend).run(0xc000478180) /home/fuwei/go/src/go.etcd.io/etcd/server/storage/backend/backend.go:426 +0x2c7 created by go.etcd.io/etcd/server/v3/storage/backend.newBackend in goroutine 80 /home/fuwei/go/src/go.etcd.io/etcd/server/storage/backend/backend.go:227 +0xbfd FAIL go.etcd.io/etcd/server/v3/etcdserver 0.129s FAIL ``` This patch also drains goroutines related to raftNode and watch store. Signed-off-by: Wei Fu --- server/etcdserver/bootstrap_test.go | 3 +++ server/etcdserver/corrupt_test.go | 3 +++ server/etcdserver/raft_test.go | 2 +- server/etcdserver/server_test.go | 13 +++++++++++++ 4 files changed, 20 insertions(+), 1 deletion(-) diff --git a/server/etcdserver/bootstrap_test.go b/server/etcdserver/bootstrap_test.go index 66dc52439..724a1df6a 100644 --- a/server/etcdserver/bootstrap_test.go +++ b/server/etcdserver/bootstrap_test.go @@ -198,6 +198,9 @@ func TestBootstrapBackend(t *testing.T) { st := v2store.New(StoreClusterPrefix, StoreKeysPrefix) ss := snap.New(cfg.Logger, cfg.SnapDir()) backend, err := bootstrapBackend(cfg, haveWAL, st, ss) + defer t.Cleanup(func() { + backend.Close() + }) hasError := err != nil expectedHasError := tt.expectedError != nil diff --git a/server/etcdserver/corrupt_test.go b/server/etcdserver/corrupt_test.go index 3ddf22c83..d5a579618 100644 --- a/server/etcdserver/corrupt_test.go +++ b/server/etcdserver/corrupt_test.go @@ -517,6 +517,9 @@ func TestHashKVHandler(t *testing.T) { be, _ := betesting.NewDefaultTmpBackend(t) defer betesting.Close(t, be) etcdSrv.kv = mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.StoreConfig{}) + defer func() { + assert.NoError(t, etcdSrv.kv.Close()) + }() ph := &hashKVHandler{ lg: zap.NewNop(), server: etcdSrv, diff --git a/server/etcdserver/raft_test.go b/server/etcdserver/raft_test.go index 079aa13e2..7845d24ba 100644 --- a/server/etcdserver/raft_test.go +++ b/server/etcdserver/raft_test.go @@ -207,7 +207,7 @@ func TestConfigChangeBlocksApply(t *testing.T) { updateLead: func(uint64) {}, updateLeadership: func(bool) {}, }) - defer srv.r.Stop() + defer srv.r.stop() n.readyc <- raft.Ready{ SoftState: &raft.SoftState{RaftState: raft.StateFollower}, diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 4b8a12f01..a7a7d9f3c 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -77,6 +77,7 @@ func TestApplyRepeat(t *testing.T) { st := v2store.New() cl.SetStore(v2store.New()) be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) cl.SetBackend(schema.NewMembershipBackend(lg, be)) cl.AddMember(&membership.Member{ID: 1234}, true) @@ -201,6 +202,7 @@ func TestV2SetClusterVersion(t *testing.T) { func TestApplyConfStateWithRestart(t *testing.T) { n := newNodeRecorder() srv := newServer(t, n) + defer srv.Cleanup() assert.Equal(t, srv.consistIndex.ConsistentIndex(), uint64(0)) @@ -476,6 +478,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { consistIndex: ci, beHooks: serverstorage.NewBackendHooks(lg, ci), } + defer srv.r.raftNodeConfig.Stop() // create EntryConfChange entry now := time.Now() @@ -547,6 +550,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) { cl := membership.NewCluster(lg) cl.SetStore(v2store.New()) be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) cl.SetBackend(schema.NewMembershipBackend(lg, be)) for i := 1; i <= 5; i++ { @@ -596,6 +600,7 @@ func TestSnapshot(t *testing.T) { defer revertFunc() be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) s := raft.NewMemoryStorage() s.Append([]raftpb.Entry{{Index: 1}}) @@ -615,6 +620,9 @@ func TestSnapshot(t *testing.T) { consistIndex: cindex.NewConsistentIndex(be), } srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{}) + defer func() { + assert.NoError(t, srv.kv.Close()) + }() srv.be = be cl := membership.NewCluster(zaptest.NewLogger(t)) @@ -921,6 +929,7 @@ func TestProcessIgnoreMismatchMessage(t *testing.T) { }, }, }) + defer r.raftNodeConfig.Stop() s := &EtcdServer{ lgMu: new(sync.RWMutex), lg: lg, @@ -961,6 +970,7 @@ func TestRemoveMember(t *testing.T) { st := v2store.New() cl.SetStore(v2store.New()) be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) cl.SetBackend(schema.NewMembershipBackend(lg, be)) cl.AddMember(&membership.Member{ID: 1234}, true) @@ -1060,6 +1070,7 @@ func TestPublishV3(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) lg := zaptest.NewLogger(t) be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) srv := &EtcdServer{ lgMu: new(sync.RWMutex), lg: lg, @@ -1130,6 +1141,7 @@ func TestPublishV3Retry(t *testing.T) { lg := zaptest.NewLogger(t) be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) srv := &EtcdServer{ lgMu: new(sync.RWMutex), lg: lg, @@ -1181,6 +1193,7 @@ func TestUpdateVersionV3(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) lg := zaptest.NewLogger(t) be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) srv := &EtcdServer{ lgMu: new(sync.RWMutex), lg: zaptest.NewLogger(t),