Merge pull request #13500 from ahrtr/reset_ci_after_reload_db
Set the backend again after recovering v3 backend from snapshot
This commit is contained in commit 5b0bb07cb0.
@@ -38,6 +38,7 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).

- Fix [exclude the same alarm type activated by multiple peers](https://github.com/etcd-io/etcd/pull/13467).
- Fix [Provide a better liveness probe for when etcd runs as a Kubernetes pod](https://github.com/etcd-io/etcd/pull/13399).
- Fix [Lease checkpoints don't prevent to reset ttl on leader change](https://github.com/etcd-io/etcd/pull/13508).
- Fix [assertion failed due to tx closed when recovering v3 backend from a snapshot db](https://github.com/etcd-io/etcd/pull/13500).

### tools/benchmark

@@ -408,6 +408,10 @@ func recoverSnapshot(cfg config.ServerConfig, st v2store.Store, be backend.Backe
	if be, err = serverstorage.RecoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil {
		cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
	}
	// A snapshot db may have already been recovered, and the old db should have
	// already been closed in this case, so we should set the backend again.
	ci.SetBackend(be)

	s1, s2 := be.Size(), be.SizeInUse()
	cfg.Logger.Info(
		"recovered v3 backend from snapshot",
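The key line is ci.SetBackend(be): RecoverSnapshotBackend may open a fresh backend from the snapshot db and close the old one, so the consistent-index tracker, which caches a backend reference, has to be re-pointed at the new backend, otherwise its next write hits a closed transaction (the "tx closed" assertion this PR fixes). Below is a minimal, self-contained Go sketch of that re-pointing pattern; the types and helpers (backendDB, consistentIndexer, recoverFromSnapshot) are illustrative stand-ins, not etcd's actual cindex API.

package main

import "fmt"

// backendDB stands in for the bolt-backed storage backend (hypothetical type).
type backendDB struct {
	name   string
	closed bool
}

func (b *backendDB) write(key, value string) error {
	if b.closed {
		return fmt.Errorf("tx closed: backend %q was already closed", b.name)
	}
	fmt.Printf("wrote %s=%s to %s\n", key, value, b.name)
	return nil
}

// consistentIndexer caches a pointer to the backend it persists the index to,
// loosely modelled on the SetBackend idea in the hunk above (names illustrative).
type consistentIndexer struct {
	be *backendDB
}

func (ci *consistentIndexer) SetBackend(be *backendDB) { ci.be = be }

func (ci *consistentIndexer) persist(index uint64) error {
	return ci.be.write("consistent_index", fmt.Sprint(index))
}

// recoverFromSnapshot mimics the recovery step: the old backend is closed and a
// new one, built from the snapshot db, is returned in its place.
func recoverFromSnapshot(old *backendDB) *backendDB {
	old.closed = true
	return &backendDB{name: "recovered-from-snapshot"}
}

func main() {
	be := &backendDB{name: "original"}
	ci := &consistentIndexer{}
	ci.SetBackend(be)

	be = recoverFromSnapshot(be)
	ci.SetBackend(be) // without this, persist below would fail with "tx closed"

	if err := ci.persist(5); err != nil {
		panic(err)
	}
}
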
@@ -19,16 +19,28 @@ package etcdserver

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
	"path/filepath"
	"strings"
	"testing"

	bolt "go.etcd.io/bbolt"
	"go.etcd.io/etcd/api/v3/etcdserverpb"
	"go.etcd.io/etcd/api/v3/version"
	"go.etcd.io/etcd/client/pkg/v3/types"
	"go.etcd.io/etcd/raft/v3/raftpb"
	"go.etcd.io/etcd/server/v3/config"
	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
	"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
	"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
	serverstorage "go.etcd.io/etcd/server/v3/storage"
	"go.etcd.io/etcd/server/v3/storage/datadir"
	"go.etcd.io/etcd/server/v3/storage/schema"
	"go.etcd.io/etcd/server/v3/storage/wal"
	"go.etcd.io/etcd/server/v3/storage/wal/walpb"
	"go.uber.org/zap"
)

@@ -138,3 +150,156 @@ func mockMembersJSON(m []etcdserverpb.Member) string {
	members, _ := json.Marshal(m)
	return string(members)
}

func TestBootstrapBackend(t *testing.T) {
	tests := []struct {
		name                  string
		prepareData           func(config.ServerConfig) error
		expectedConsistentIdx uint64
		expectedError         error
	}{
		{
			name:                  "bootstrap backend success: no data files",
			prepareData:           nil,
			expectedConsistentIdx: 0,
			expectedError:         nil,
		},
		{
			name:                  "bootstrap backend success: have data files and snapshot db file",
			prepareData:           prepareData,
			expectedConsistentIdx: 5,
			expectedError:         nil,
		},
		// TODO(ahrtr): add more test cases
		// https://github.com/etcd-io/etcd/issues/13507
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			dataDir, err := createDataDir(t)
			if err != nil {
				t.Fatalf("Failed to create the data dir, unexpected error: %v", err)
			}

			cfg := config.ServerConfig{
				Name:                "demoNode",
				DataDir:             dataDir,
				BackendFreelistType: bolt.FreelistArrayType,
				Logger:              zap.NewExample(),
			}

			if tt.prepareData != nil {
				if err := tt.prepareData(cfg); err != nil {
					t.Fatalf("failed to prepare data, unexpected error: %v", err)
				}
			}

			haveWAL := wal.Exist(cfg.WALDir())
			st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
			ss := snap.New(cfg.Logger, cfg.SnapDir())
			backend, err := bootstrapBackend(cfg, haveWAL, st, ss)

			hasError := err != nil
			expectedHasError := tt.expectedError != nil
			if hasError != expectedHasError {
				t.Errorf("expected error: %v got: %v", expectedHasError, err)
			}
			if hasError && !strings.Contains(err.Error(), tt.expectedError.Error()) {
				t.Fatalf("expected error to contain: %q, got: %q", tt.expectedError.Error(), err.Error())
			}

			if backend.ci.ConsistentIndex() != tt.expectedConsistentIdx {
				t.Errorf("expected consistent index: %d, got: %d", tt.expectedConsistentIdx, backend.ci.ConsistentIndex())
			}
		})
	}
}

func createDataDir(t *testing.T) (dataDir string, err error) {
	// create the temporary data dir
	dataDir = t.TempDir()

	// create ${dataDir}/member/snap
	if err = os.MkdirAll(datadir.ToSnapDir(dataDir), 0700); err != nil {
		return
	}

	// create ${dataDir}/member/wal
	err = os.MkdirAll(datadir.ToWalDir(dataDir), 0700)

	return
}

// prepareData prepares the on-disk state for the test case: a WAL file that
// contains a snapshot record, plus the matching snapshot file and snapshot db file.
func prepareData(cfg config.ServerConfig) (err error) {
	var snapshotTerm, snapshotIndex uint64 = 2, 5

	if err = createWALFileWithSnapshotRecord(cfg, snapshotTerm, snapshotIndex); err != nil {
		return
	}

	return createSnapshotAndBackendDB(cfg, snapshotTerm, snapshotIndex)
}

func createWALFileWithSnapshotRecord(cfg config.ServerConfig, snapshotTerm, snapshotIndex uint64) (err error) {
	var w *wal.WAL
	if w, err = wal.Create(cfg.Logger, cfg.WALDir(), []byte("somedata")); err != nil {
		return
	}

	defer func() {
		// don't let a successful Close overwrite an earlier error
		if cerr := w.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()

	walSnap := walpb.Snapshot{
		Index: snapshotIndex,
		Term:  snapshotTerm,
		ConfState: &raftpb.ConfState{
			Voters:    []uint64{0x00ffca74},
			AutoLeave: false,
		},
	}

	if err = w.SaveSnapshot(walSnap); err != nil {
		return
	}

	return w.Save(raftpb.HardState{Term: snapshotTerm, Vote: 3, Commit: snapshotIndex}, nil)
}

func createSnapshotAndBackendDB(cfg config.ServerConfig, snapshotTerm, snapshotIndex uint64) (err error) {
	confState := raftpb.ConfState{
		Voters: []uint64{1, 2, 3},
	}

	// create snapshot file
	ss := snap.New(cfg.Logger, cfg.SnapDir())
	if err = ss.SaveSnap(raftpb.Snapshot{
		Data: []byte("{}"),
		Metadata: raftpb.SnapshotMetadata{
			ConfState: confState,
			Index:     snapshotIndex,
			Term:      snapshotTerm,
		},
	}); err != nil {
		return
	}

	// create snapshot db file: "%016x.snap.db"
	be := serverstorage.OpenBackend(cfg, nil)
	schema.CreateMetaBucket(be.BatchTx())
	schema.UnsafeUpdateConsistentIndex(be.BatchTx(), snapshotIndex, snapshotTerm, false)
	schema.MustUnsafeSaveConfStateToBackend(cfg.Logger, be.BatchTx(), &confState)
	if err = be.Close(); err != nil {
		return
	}
	sdb := filepath.Join(cfg.SnapDir(), fmt.Sprintf("%016x.snap.db", snapshotIndex))
	if err = os.Rename(cfg.BackendPath(), sdb); err != nil {
		return
	}

	// create backend db file
	be = serverstorage.OpenBackend(cfg, nil)
	schema.CreateMetaBucket(be.BatchTx())
	schema.UnsafeUpdateConsistentIndex(be.BatchTx(), 1, 1, false)
	return be.Close()
}