etcd/server/etcdserver/bootstrap_test.go
Wei Fu 32ee8b877a etcdserver: drain leaky goroutines before test completed
If pending changes aren't committed before the test completes, it might cause
a data race when we don't drain all the background goroutines.

```bash
$ cd server
$ go test -race -v -run TestApplyRepeat ./etcdserver
...
panic: Log in goroutine after TestApplyRepeat has completed: 2024-02-03T17:06:13.262+0800       DEBUG   bbolt   Committing transaction 2

goroutine 81 [running]:
testing.(*common).logDepth(0xc000502820, {0xc0001b0460, 0x41}, 0x3)
        /usr/local/go/src/testing/testing.go:1022 +0x6d4
testing.(*common).log(...)
        /usr/local/go/src/testing/testing.go:1004
testing.(*common).Logf(0xc000502820, {0x1421ad7, 0x2}, {0xc000603520, 0x1, 0x1})
        /usr/local/go/src/testing/testing.go:1055 +0xa5
go.uber.org/zap/zaptest.testingWriter.Write({{0x15f1f90?, 0xc000502820?}, 0xda?}, {0xc000119800, 0x42, 0x400})
        /home/fuwei/go/pkg/mod/go.uber.org/zap@v1.26.0/zaptest/logger.go:130 +0x11e
go.uber.org/zap/zapcore.(*ioCore).Write(0xc0000b55c0, {0xff, {0xc1679e614f9fd7a4, 0x73a3657, 0x1cc2400}, {0x1422b2d, 0x5}, {0xc0001a0330, 0x18}, {0x0, ...}, ...}, ...)
        /home/fuwei/go/pkg/mod/go.uber.org/zap@v1.26.0/zapcore/core.go:99 +0x193
go.uber.org/zap/zapcore.(*CheckedEntry).Write(0xc000115930, {0x0, 0x0, 0x0})
        /home/fuwei/go/pkg/mod/go.uber.org/zap@v1.26.0/zapcore/entry.go:253 +0x2f0
go.uber.org/zap.(*SugaredLogger).log(0xc0001960f8, 0xff, {0x1437885, 0x19}, {0xc0006034e0, 0x1, 0x1}, {0x0, 0x0, 0x0})
        /home/fuwei/go/pkg/mod/go.uber.org/zap@v1.26.0/sugar.go:316 +0x130
go.uber.org/zap.(*SugaredLogger).Debugf(...)
        /home/fuwei/go/pkg/mod/go.uber.org/zap@v1.26.0/sugar.go:171
go.etcd.io/bbolt.(*Tx).Commit(0xc0001aa9a0)
        /home/fuwei/go/pkg/mod/go.etcd.io/bbolt@v1.4.0-alpha.0/tx.go:173 +0x206
go.etcd.io/etcd/server/v3/storage/backend.(*batchTx).commit(0xc00019b180, 0x0)
        /home/fuwei/go/src/go.etcd.io/etcd/server/storage/backend/batch_tx.go:269 +0xdf
go.etcd.io/etcd/server/v3/storage/backend.(*batchTxBuffered).unsafeCommit(0xc00019b180, 0x0)
        /home/fuwei/go/src/go.etcd.io/etcd/server/storage/backend/batch_tx.go:378 +0x425
go.etcd.io/etcd/server/v3/storage/backend.(*batchTxBuffered).commit(0xc00019b180, 0x80?)
        /home/fuwei/go/src/go.etcd.io/etcd/server/storage/backend/batch_tx.go:355 +0x78
go.etcd.io/etcd/server/v3/storage/backend.(*batchTxBuffered).Commit(0xc00019b180)
        /home/fuwei/go/src/go.etcd.io/etcd/server/storage/backend/batch_tx.go:342 +0x35
go.etcd.io/etcd/server/v3/storage/backend.(*backend).run(0xc000478180)
        /home/fuwei/go/src/go.etcd.io/etcd/server/storage/backend/backend.go:426 +0x2c7
created by go.etcd.io/etcd/server/v3/storage/backend.newBackend in goroutine 80
        /home/fuwei/go/src/go.etcd.io/etcd/server/storage/backend/backend.go:227 +0xbfd
FAIL    go.etcd.io/etcd/server/v3/etcdserver    0.129s
FAIL
```

This patch also drains goroutines related to raftNode and watch store.

Signed-off-by: Wei Fu <fuweid89@gmail.com>
2024-02-03 18:58:17 +08:00

317 lines
9.3 KiB
Go

// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file tests the etcdserver bootstrap helpers: bootstrapping against an
// existing cluster without a WAL, and bootstrapping the storage backend.
package etcdserver
import (
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"testing"
"go.uber.org/zap/zaptest"
bolt "go.etcd.io/bbolt"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
serverstorage "go.etcd.io/etcd/server/v3/storage"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
"go.etcd.io/raft/v3/raftpb"
)
// TestBootstrapExistingClusterNoWALMaxLearner checks how
// bootstrapExistingClusterNoWAL reacts to the number of learners reported by
// the (mocked) remote peers relative to ExperimentalMaxLearners: it must
// succeed when the learner count is at or below the limit and fail with
// membership.ErrTooManyLearners when the limit is exceeded.
func TestBootstrapExistingClusterNoWALMaxLearner(t *testing.T) {
	tests := []struct {
		name          string
		members       []etcdserverpb.Member
		maxLearner    int
		hasError      bool
		expectedError error
	}{
		{
			name: "bootstrap success: maxLearner gt learner count",
			members: []etcdserverpb.Member{
				{ID: 4512484362714696085, PeerURLs: []string{"http://localhost:2380"}},
				{ID: 5321713336100798248, PeerURLs: []string{"http://localhost:2381"}},
				{ID: 5670219998796287055, PeerURLs: []string{"http://localhost:2382"}},
			},
			maxLearner:    1,
			hasError:      false,
			expectedError: nil,
		},
		{
			name: "bootstrap success: maxLearner eq learner count",
			members: []etcdserverpb.Member{
				{ID: 4512484362714696085, PeerURLs: []string{"http://localhost:2380"}, IsLearner: true},
				{ID: 5321713336100798248, PeerURLs: []string{"http://localhost:2381"}},
				{ID: 5670219998796287055, PeerURLs: []string{"http://localhost:2382"}, IsLearner: true},
			},
			maxLearner:    2,
			hasError:      false,
			expectedError: nil,
		},
		{
			name: "bootstrap fail: maxLearner lt learner count",
			members: []etcdserverpb.Member{
				{ID: 4512484362714696085, PeerURLs: []string{"http://localhost:2380"}},
				{ID: 5321713336100798248, PeerURLs: []string{"http://localhost:2381"}, IsLearner: true},
				{ID: 5670219998796287055, PeerURLs: []string{"http://localhost:2382"}, IsLearner: true},
			},
			maxLearner:    1,
			hasError:      true,
			expectedError: membership.ErrTooManyLearners,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			cluster, err := types.NewURLsMap("node0=http://localhost:2380,node1=http://localhost:2381,node2=http://localhost:2382")
			if err != nil {
				t.Fatalf("unexpected error: %v", err)
			}

			cfg := config.ServerConfig{
				Name:                    "node0",
				InitialPeerURLsMap:      cluster,
				Logger:                  zaptest.NewLogger(t),
				ExperimentalMaxLearners: tt.maxLearner,
			}

			_, err = bootstrapExistingClusterNoWAL(cfg, mockBootstrapRoundTrip(tt.members))
			hasError := err != nil
			if hasError != tt.hasError {
				// Stop here: on an unexpected error tt.expectedError is nil and
				// the message comparison below would panic instead of reporting
				// a test failure.
				t.Fatalf("expected error: %v got: %v", tt.hasError, err)
			}
			// The nil guard protects against a table entry that sets hasError
			// without providing expectedError.
			if hasError && tt.expectedError != nil && !strings.Contains(err.Error(), tt.expectedError.Error()) {
				t.Fatalf("expected error to contain: %q, got: %q", tt.expectedError.Error(), err.Error())
			}
		})
	}
}
// roundTripFunc adapts an ordinary function to the http.RoundTripper
// interface so tests can supply canned HTTP responses.
type roundTripFunc func(r *http.Request) (*http.Response, error)

// RoundTrip invokes the wrapped function with req and returns its results
// unchanged.
func (fn roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
	resp, err := fn(req)
	return resp, err
}
// mockBootstrapRoundTrip returns a round tripper that answers the requests
// issued while bootstrapping against an existing cluster with canned 200
// responses, selected by substring match on the request URL:
//
//   - "/members": the JSON encoding of members, with an X-Etcd-Cluster-Id header
//   - "/version": the JSON produced by mockVersionJSON
//   - DowngradeEnabledPath: the literal body `true`
//
// Any other URL yields an error. A RoundTripper must not return a nil
// response together with a nil error, and naming the unexpected URL makes a
// failing test easier to diagnose.
func mockBootstrapRoundTrip(members []etcdserverpb.Member) roundTripFunc {
	return func(r *http.Request) (*http.Response, error) {
		target := r.URL.String()
		switch {
		case strings.Contains(target, "/members"):
			return &http.Response{
				StatusCode: http.StatusOK,
				Body:       io.NopCloser(strings.NewReader(mockMembersJSON(members))),
				Header:     http.Header{"X-Etcd-Cluster-Id": []string{"f4588138892a16b0"}},
			}, nil
		case strings.Contains(target, "/version"):
			return &http.Response{
				StatusCode: http.StatusOK,
				Body:       io.NopCloser(strings.NewReader(mockVersionJSON())),
			}, nil
		case strings.Contains(target, DowngradeEnabledPath):
			return &http.Response{
				StatusCode: http.StatusOK,
				Body:       io.NopCloser(strings.NewReader(`true`)),
			}, nil
		}
		return nil, fmt.Errorf("mockBootstrapRoundTrip: unexpected request URL %q", target)
	}
}
// mockVersionJSON returns the JSON encoding of a version.Versions value whose
// Server and Cluster fields are both "3.7.0".
func mockVersionJSON() string {
	// The marshalling error is intentionally discarded.
	// NOTE(review): presumably Marshal cannot fail for this value — confirm.
	encoded, _ := json.Marshal(version.Versions{Server: "3.7.0", Cluster: "3.7.0"})
	return string(encoded)
}
// mockMembersJSON returns the JSON encoding of the given member list, used as
// the body of the mocked "/members" response.
func mockMembersJSON(m []etcdserverpb.Member) string {
	// The marshalling error is intentionally discarded.
	// NOTE(review): presumably Marshal cannot fail for these members — confirm.
	encoded, _ := json.Marshal(m)
	return string(encoded)
}
// TestBootstrapBackend runs bootstrapBackend against a freshly created data
// directory, optionally pre-populated by prepareData, and checks the returned
// error and the consistent index reported by the bootstrapped backend.
func TestBootstrapBackend(t *testing.T) {
	tests := []struct {
		name                  string
		prepareData           func(config.ServerConfig) error
		expectedConsistentIdx uint64
		expectedError         error
	}{
		{
			name:                  "bootstrap backend success: no data files",
			prepareData:           nil,
			expectedConsistentIdx: 0,
			expectedError:         nil,
		},
		{
			name:                  "bootstrap backend success: have data files and snapshot db file",
			prepareData:           prepareData,
			expectedConsistentIdx: 5,
			expectedError:         nil,
		},
		// TODO(ahrtr): add more test cases
		// https://github.com/etcd-io/etcd/issues/13507
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			dataDir, err := createDataDir(t)
			if err != nil {
				t.Fatalf("Failed to create the data dir, unexpected error: %v", err)
			}

			cfg := config.ServerConfig{
				Name:                "demoNode",
				DataDir:             dataDir,
				BackendFreelistType: bolt.FreelistArrayType,
				Logger:              zaptest.NewLogger(t),
			}

			if tt.prepareData != nil {
				if err = tt.prepareData(cfg); err != nil {
					t.Fatalf("failed to prepare data, unexpected error: %v", err)
				}
			}

			haveWAL := wal.Exist(cfg.WALDir())
			st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
			ss := snap.New(cfg.Logger, cfg.SnapDir())
			backend, err := bootstrapBackend(cfg, haveWAL, st, ss)
			if backend != nil {
				// Close the backend when the subtest finishes so its background
				// work is stopped before the test logger goes away. Registering
				// only for a non-nil backend avoids a nil dereference in the
				// cleanup when bootstrapping failed.
				t.Cleanup(func() {
					backend.Close()
				})
			}

			hasError := err != nil
			expectedHasError := tt.expectedError != nil
			if hasError != expectedHasError {
				// Stop here: continuing would dereference either a nil
				// tt.expectedError or a nil backend below.
				t.Fatalf("expected error: %v got: %v", expectedHasError, err)
			}
			if hasError {
				if !strings.Contains(err.Error(), tt.expectedError.Error()) {
					t.Fatalf("expected error to contain: %q, got: %q", tt.expectedError.Error(), err.Error())
				}
				// An expected failure leaves no backend to inspect.
				return
			}

			if backend == nil {
				t.Fatal("bootstrapBackend returned a nil backend without an error")
			}
			if idx := backend.ci.ConsistentIndex(); idx != tt.expectedConsistentIdx {
				t.Errorf("expected consistent index: %d, got: %d", tt.expectedConsistentIdx, idx)
			}
		})
	}
}
// createDataDir creates a temporary data directory for the test, together with
// the snapshot (${dataDir}/member/snap) and WAL (${dataDir}/member/wal)
// sub-directories, and returns the data directory path. The directory comes
// from t.TempDir.
func createDataDir(t *testing.T) (string, error) {
	dataDir := t.TempDir()

	// Snapshot directory first, then the WAL directory.
	subDirs := []string{
		datadir.ToSnapDir(dataDir),
		datadir.ToWalDir(dataDir),
	}
	for _, dir := range subDirs {
		if err := os.MkdirAll(dir, 0700); err != nil {
			return "", err
		}
	}
	return dataDir, nil
}
// prepareData populates the data directory described by cfg for the test
// case: a WAL containing a snapshot record, followed by a snapshot file and
// backend db files. Both steps use snapshot term 2 and index 5.
func prepareData(cfg config.ServerConfig) error {
	const (
		term  uint64 = 2
		index uint64 = 5
	)

	err := createWALFileWithSnapshotRecord(cfg, term, index)
	if err != nil {
		return err
	}
	return createSnapshotAndBackendDB(cfg, term, index)
}
// createWALFileWithSnapshotRecord creates a WAL in cfg.WALDir() and writes a
// snapshot record for (snapshotTerm, snapshotIndex) followed by a HardState
// whose Term and Commit match the snapshot.
//
// It returns the first error encountered. An error from closing the WAL is
// reported only when every earlier step succeeded, so a failed write is never
// masked by a successful Close.
func createWALFileWithSnapshotRecord(cfg config.ServerConfig, snapshotTerm, snapshotIndex uint64) (err error) {
	var w *wal.WAL
	if w, err = wal.Create(cfg.Logger, cfg.WALDir(), []byte("somedata")); err != nil {
		return err
	}

	defer func() {
		// Always close the WAL, but keep the earlier error if there is one.
		if closeErr := w.Close(); err == nil {
			err = closeErr
		}
	}()

	walSnap := walpb.Snapshot{
		Index: snapshotIndex,
		Term:  snapshotTerm,
		ConfState: &raftpb.ConfState{
			Voters:    []uint64{0x00ffca74},
			AutoLeave: false,
		},
	}

	if err = w.SaveSnapshot(walSnap); err != nil {
		return err
	}

	return w.Save(raftpb.HardState{Term: snapshotTerm, Vote: 3, Commit: snapshotIndex}, nil)
}
// createSnapshotAndBackendDB lays out the on-disk state for the test in three
// steps:
//
//  1. save a raft snapshot file at (snapshotTerm, snapshotIndex) with voters
//     {1, 2, 3};
//  2. build a backend db carrying that consistent index/term and conf state,
//     close it, and move it into the snap dir as "%016x.snap.db";
//  3. build a fresh backend db at cfg.BackendPath() with consistent index 1
//     and term 1.
//
// The first error encountered is returned.
func createSnapshotAndBackendDB(cfg config.ServerConfig, snapshotTerm, snapshotIndex uint64) error {
	voters := raftpb.ConfState{
		Voters: []uint64{1, 2, 3},
	}

	// Step 1: the snapshot file.
	snapshot := raftpb.Snapshot{
		Data: []byte("{}"),
		Metadata: raftpb.SnapshotMetadata{
			ConfState: voters,
			Index:     snapshotIndex,
			Term:      snapshotTerm,
		},
	}
	if err := snap.New(cfg.Logger, cfg.SnapDir()).SaveSnap(snapshot); err != nil {
		return err
	}

	// Step 2: the snapshot db file, written at the regular backend path and
	// then renamed into the snap dir.
	// NOTE(review): the Unsafe* helpers are called without a visible lock on
	// the batch tx — confirm this is acceptable for single-goroutine test setup.
	snapBackend := serverstorage.OpenBackend(cfg, nil)
	schema.CreateMetaBucket(snapBackend.BatchTx())
	schema.UnsafeUpdateConsistentIndex(snapBackend.BatchTx(), snapshotIndex, snapshotTerm)
	schema.MustUnsafeSaveConfStateToBackend(cfg.Logger, snapBackend.BatchTx(), &voters)
	if err := snapBackend.Close(); err != nil {
		return err
	}

	snapDBPath := filepath.Join(cfg.SnapDir(), fmt.Sprintf("%016x.snap.db", snapshotIndex))
	if err := os.Rename(cfg.BackendPath(), snapDBPath); err != nil {
		return err
	}

	// Step 3: the backend db file itself.
	liveBackend := serverstorage.OpenBackend(cfg, nil)
	schema.CreateMetaBucket(liveBackend.BatchTx())
	schema.UnsafeUpdateConsistentIndex(liveBackend.BatchTx(), 1, 1)
	return liveBackend.Close()
}