Mirror of https://github.com/etcd-io/etcd.git, synced 2024-09-27 06:25:44 +00:00
Merge pull request #12906 from ptabor/20210429-etcdctl-v2-backup-cindex-fix
20210429 etcdctl v2 backup cindex fix
Commit 835643e6e2
@@ -110,8 +110,12 @@ func BeforeTest(t TB) {
// It will detect common goroutine leaks, retrying in case there are goroutines
// not synchronously torn down, and fail the test if any goroutines are stuck.
func AfterTest(t TB) {
if err := CheckAfterTest(1 * time.Second); err != nil {
t.Errorf("Test %v", err)
// If the test failed, the leaked goroutines list is hiding the real
// source of the problem.
if !t.Failed() {
if err := CheckAfterTest(1 * time.Second); err != nil {
t.Errorf("Test %v", err)
}
}
}
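A minimal usage sketch of the leak check above, assuming the helper is imported from "go.etcd.io/etcd/client/pkg/v3/testutil" as elsewhere in this diff; the test name and body are hypothetical.

package example

import (
	"testing"

	"go.etcd.io/etcd/client/pkg/v3/testutil"
)

func TestSomething(t *testing.T) {
	// Runs the goroutine-leak check when the test finishes. With this change,
	// the check is skipped if the test already failed, so the real failure is
	// not buried under a goroutine dump.
	defer testutil.AfterTest(t)

	// ... test body ...
}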
@@ -53,8 +53,9 @@ func MustNewURL(t *testing.T, s string) *url.URL {
func FatalStack(t *testing.T, s string) {
stackTrace := make([]byte, 1024*1024)
n := runtime.Stack(stackTrace, true)
t.Errorf("---> Test failed: %s", s)
t.Error(string(stackTrace[:n]))
t.Fatalf(s)
t.Fatal(s)
}

// ConditionFunc returns true when a condition is met.
@@ -15,7 +15,6 @@
package command

import (
"log"
"os"
"path"
"regexp"
@@ -33,6 +32,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/verify"
"go.etcd.io/etcd/server/v3/wal"
"go.etcd.io/etcd/server/v3/wal/walpb"
@@ -90,7 +90,10 @@ func handleBackup(c *cli.Context) error {
var srcWAL string
var destWAL string

lg := zap.NewExample()
lg, err := zap.NewProduction()
if err != nil {
return err
}

withV3 := c.Bool("with-v3")
srcDir := c.String("data-dir")
@@ -112,28 +115,35 @@ func handleBackup(c *cli.Context) error {
}

if err := fileutil.CreateDirAll(destSnap); err != nil {
log.Fatalf("failed creating backup snapshot dir %v: %v", destSnap, err)
lg.Fatal("failed creating backup snapshot dir", zap.String("dest-snap", destSnap), zap.Error(err))
}

destDbPath := datadir.ToBackendFileName(destDir)
srcDbPath := datadir.ToBackendFileName(srcDir)
desired := newDesiredCluster()

walsnap := saveSnap(lg, destSnap, srcSnap, &desired)
metadata, state, ents := loadWAL(srcWAL, walsnap, withV3)
destDbPath := datadir.ToBackendFileName(destDir)
saveDB(lg, destDbPath, datadir.ToBackendFileName(srcDir), state.Commit, &desired, withV3)
metadata, state, ents := translateWAL(lg, srcWAL, walsnap, withV3)
saveDB(lg, destDbPath, srcDbPath, state.Commit, &desired, withV3)

neww, err := wal.Create(zap.NewExample(), destWAL, pbutil.MustMarshal(&metadata))
neww, err := wal.Create(lg, destWAL, pbutil.MustMarshal(&metadata))
if err != nil {
log.Fatal(err)
lg.Fatal("wal.Create failed", zap.Error(err))
}
defer neww.Close()
if err := neww.Save(state, ents); err != nil {
log.Fatal(err)
lg.Fatal("wal.Save failed ", zap.Error(err))
}
if err := neww.SaveSnapshot(walsnap); err != nil {
log.Fatal(err)
lg.Fatal("SaveSnapshot", zap.Error(err))
}

verify.MustVerifyIfEnabled(verify.Config{
Logger: lg,
DataDir: destDir,
ExactIndex: false,
})

return nil
}
@@ -141,7 +151,7 @@ func saveSnap(lg *zap.Logger, destSnap, srcSnap string, desired *desiredCluster)
ss := snap.New(lg, srcSnap)
snapshot, err := ss.Load()
if err != nil && err != snap.ErrNoSnapshot {
log.Fatal(err)
lg.Fatal("saveSnap(Snapshoter.Load) failed", zap.Error(err))
}
if snapshot != nil {
walsnap.Index, walsnap.Term, walsnap.ConfState = snapshot.Metadata.Index, snapshot.Metadata.Term, &desired.confState
@@ -149,7 +159,7 @@ func saveSnap(lg *zap.Logger, destSnap, srcSnap string, desired *desiredCluster)
snapshot.Metadata.ConfState = desired.confState
snapshot.Data = mustTranslateV2store(lg, snapshot.Data, desired)
if err = newss.SaveSnap(*snapshot); err != nil {
log.Fatal(err)
lg.Fatal("saveSnap(Snapshoter.SaveSnap) failed", zap.Error(err))
}
}
return walsnap
@@ -175,37 +185,36 @@ func mustTranslateV2store(lg *zap.Logger, storeData []byte, desired *desiredClus
return outputData
}

func loadWAL(srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metadata, raftpb.HardState, []raftpb.Entry) {
w, err := wal.OpenForRead(zap.NewExample(), srcWAL, walsnap)
func translateWAL(lg *zap.Logger, srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metadata, raftpb.HardState, []raftpb.Entry) {
w, err := wal.OpenForRead(lg, srcWAL, walsnap)
if err != nil {
log.Fatal(err)
lg.Fatal("wal.OpenForRead failed", zap.Error(err))
}
defer w.Close()
wmetadata, state, ents, err := w.ReadAll()
switch err {
case nil:
case wal.ErrSnapshotNotFound:
log.Printf("Failed to find the match snapshot record %+v in wal %v.", walsnap, srcWAL)
log.Printf("etcdctl will add it back. Start auto fixing...")
lg.Warn("failed to find the match snapshot record", zap.Any("walsnap", walsnap), zap.String("srcWAL", srcWAL))
lg.Warn("etcdctl will add it back. Start auto fixing...")
default:
log.Fatal(err)
lg.Fatal("unexpected error while reading WAL", zap.Error(err))
}

re := path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes")
memberAttrRE := regexp.MustCompile(re)

removed := uint64(0)
i := 0
remove := func() {
ents = append(ents[:i], ents[i+1:]...)
removed++
i--
}
for i = 0; i < len(ents); i++ {
ents[i].Index -= removed
for i := 0; i < len(ents); i++ {

// Replacing WAL entries with 'dummy' entries allows avoiding
// complicated entries shifting and the risk of other data (like consistent_index)
// running out of sync.
// Also, moving entries and computing offsets would get complicated if
// the TERM changes (so there are superfluous entries from the previous term).

if ents[i].Type == raftpb.EntryConfChange {
log.Println("ignoring EntryConfChange raft entry")
remove()
lg.Info("ignoring EntryConfChange raft entry")
raftEntryToNoOp(&ents[i])
continue
}
@@ -219,33 +228,42 @@ func loadWAL(srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metad
}

if v2Req != nil && v2Req.Method == "PUT" && memberAttrRE.MatchString(v2Req.Path) {
log.Println("ignoring member attribute update on", v2Req.Path)
remove()
lg.Info("ignoring member attribute update on",
zap.Stringer("entry", &ents[i]),
zap.String("v2Req.Path", v2Req.Path))
raftEntryToNoOp(&ents[i])
continue
}

if v2Req != nil {
continue
lg.Debug("preserving log entry", zap.Stringer("entry", &ents[i]))
}

if raftReq.ClusterMemberAttrSet != nil {
log.Println("ignoring cluster_member_attr_set")
remove()
lg.Info("ignoring cluster_member_attr_set")
raftEntryToNoOp(&ents[i])
continue
}

if v3 || raftReq.Header == nil {
lg.Debug("preserving log entry", zap.Stringer("entry", &ents[i]))
continue
}
log.Println("ignoring v3 raft entry")
remove()
lg.Info("ignoring v3 raft entry")
raftEntryToNoOp(&ents[i])
}
state.Commit -= removed
var metadata etcdserverpb.Metadata
pbutil.MustUnmarshal(&metadata, wmetadata)
return metadata, state, ents
}

func raftEntryToNoOp(entry *raftpb.Entry) {
// Empty (dummy) entries are sent by RAFT when a new leader is getting elected.
// They do not carry any change to the data-model, so it's safe to replace
// entries to be ignored with them.
*entry = raftpb.Entry{Term: entry.Term, Index: entry.Index, Type: raftpb.EntryNormal, Data: nil}
}
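A small illustration of the "replace, don't shift" idea from the comments above: a filtered-out entry keeps its Term and Index, only its payload and type change, so later indexes and the stored consistent_index stay valid. This is a sketch, not part of the change; the raftpb import path is assumed from this repository's module layout.

package main

import (
	"fmt"

	"go.etcd.io/etcd/raft/v3/raftpb"
)

func raftEntryToNoOp(entry *raftpb.Entry) {
	*entry = raftpb.Entry{Term: entry.Term, Index: entry.Index, Type: raftpb.EntryNormal, Data: nil}
}

func main() {
	ent := raftpb.Entry{Term: 7, Index: 42, Type: raftpb.EntryConfChange, Data: []byte("conf change payload")}
	raftEntryToNoOp(&ent)
	// Term and Index are preserved, so the WAL keeps a contiguous index range.
	fmt.Println(ent.Term, ent.Index, ent.Type, len(ent.Data)) // 7 42 EntryNormal 0
}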
// saveDB copies the v3 backend and strips cluster information.
func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCluster, v3 bool) {
@@ -256,34 +274,34 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCl
go func() {
db, err := bolt.Open(srcDB, 0444, &bolt.Options{ReadOnly: true})
if err != nil {
log.Fatal(err)
lg.Fatal("bolt.Open FAILED", zap.Error(err))
}
ch <- db
}()
select {
case src = <-ch:
case <-time.After(time.Second):
log.Println("waiting to acquire lock on", srcDB)
lg.Fatal("timed out waiting to acquire lock on", zap.String("srcDB", srcDB))
src = <-ch
}
defer src.Close()

tx, err := src.Begin(false)
if err != nil {
log.Fatal(err)
lg.Fatal("bbolt.BeginTx failed", zap.Error(err))
}

// copy srcDB to destDB
dest, err := os.Create(destDB)
if err != nil {
log.Fatal(err)
lg.Fatal("creation of destination file failed", zap.String("dest", destDB), zap.Error(err))
}
if _, err := tx.WriteTo(dest); err != nil {
log.Fatal(err)
lg.Fatal("bbolt write to destination file failed", zap.String("dest", destDB), zap.Error(err))
}
dest.Close()
if err := tx.Rollback(); err != nil {
log.Fatal(err)
lg.Fatal("bbolt tx.Rollback failed", zap.String("dest", destDB), zap.Error(err))
}
}
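The goroutine/select pattern above exists because bolt.Open blocks while another process holds the file lock; after this change the tool fails fast instead of logging and waiting indefinitely. A standalone sketch of the same pattern that returns an error instead of calling Fatal (the helper name is illustrative; bbolt's own Options.Timeout could likely achieve a similar effect natively):

package example

import (
	"fmt"
	"time"

	bolt "go.etcd.io/bbolt"
)

// openReadOnly opens a bbolt database read-only, giving up if the file
// lock cannot be acquired within the given timeout.
func openReadOnly(path string, timeout time.Duration) (*bolt.DB, error) {
	dbc := make(chan *bolt.DB, 1)
	errc := make(chan error, 1)
	go func() {
		db, err := bolt.Open(path, 0444, &bolt.Options{ReadOnly: true})
		if err != nil {
			errc <- err
			return
		}
		dbc <- db
	}()
	select {
	case db := <-dbc:
		return db, nil
	case err := <-errc:
		return nil, err
	case <-time.After(timeout):
		// If Open succeeds later, that handle is leaked; acceptable for a
		// one-shot CLI such as the backup command above.
		return nil, fmt.Errorf("timed out waiting to acquire lock on %q", path)
	}
}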
@@ -291,7 +309,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCl
defer be.Close()

if err := membership.TrimClusterFromBackend(be); err != nil {
log.Fatal(err)
lg.Fatal("bbolt tx.Membership failed", zap.Error(err))
}

raftCluster := membership.NewClusterFromMembers(lg, desired.clusterId, desired.members)
@@ -303,10 +321,13 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCl
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafeCreateBucket([]byte("meta"))
cindex.UnsafeCreateMetaBucket(tx)
ci := cindex.NewConsistentIndex(tx)
ci.SetConsistentIndex(idx)
ci.UnsafeSave(tx)
} else {
// Thanks to translateWAL not moving entries, but just replacing them with
// 'empty' ones, there is no need to update the consistency index.
}

}
@@ -36,6 +36,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2error"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/mvcc"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/wal"
@@ -91,7 +92,7 @@ func migrateCommandFunc(cmd *cobra.Command, args []string) {
}()

readKeys(reader, be)
mvcc.UpdateConsistentIndex(be, index)
cindex.UpdateConsistentIndex(be.BatchTx(), index)
err := <-errc
if err != nil {
fmt.Println("failed to transform keys")
@@ -147,6 +147,8 @@ func (ep *ExpectProcess) Signal(sig os.Signal) error {
}

// Close waits for the expect process to exit.
// Close currently does not return an error if the process exited with a non-zero status.
// TODO: Close should expose underlying process failure by default.
func (ep *ExpectProcess) Close() error { return ep.close(false) }

func (ep *ExpectProcess) close(kill bool) error {
@@ -162,7 +164,6 @@ func (ep *ExpectProcess) close(kill bool) error {
ep.wg.Wait()

if err != nil {
ep.err = err
if !kill && strings.Contains(err.Error(), "exit status") {
// non-zero exit code
err = nil
@@ -170,6 +171,7 @@ func (ep *ExpectProcess) close(kill bool) error {
err = nil
}
}

ep.cmd = nil
return err
}
@@ -178,3 +180,12 @@ func (ep *ExpectProcess) Send(command string) error {
_, err := io.WriteString(ep.fpty, command)
return err
}

func (ep *ExpectProcess) ProcessError() error {
if strings.Contains(ep.err.Error(), "input/output error") {
// TODO: The expect library should not return
// `/dev/ptmx: input/output error` when the process just exits.
return nil
}
return ep.err
}
@@ -229,6 +229,9 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
if err = e.Server.CheckInitialHashKV(); err != nil {
// set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()"
// (nothing to close since rafthttp transports have not been started)

e.cfg.logger.Error("checkInitialHashKV failed", zap.Error(err))
e.Server.Cleanup()
e.Server = nil
return e, err
}
@@ -23,9 +23,9 @@ import (
)

var (
metaBucketName = []byte("meta")
MetaBucketName = []byte("meta")

consistentIndexKeyName = []byte("consistent_index")
ConsistentIndexKeyName = []byte("consistent_index")
)

// ConsistentIndexer is an interface that wraps the Get/Set/Save method for consistentIndex.
@@ -52,14 +52,11 @@ type consistentIndex struct {
// it caches the "consistent_index" key's value. Accessed
// through atomics so must be 64-bit aligned.
consistentIndex uint64
// bytesBuf8 is a byte slice of length 8
// to avoid a repetitive allocation in saveIndex.
bytesBuf8 []byte
mutex sync.Mutex
mutex sync.Mutex
}

func NewConsistentIndex(tx backend.BatchTx) ConsistentIndexer {
return &consistentIndex{tx: tx, bytesBuf8: make([]byte, 8)}
return &consistentIndex{tx: tx}
}

func (ci *consistentIndex) ConsistentIndex() uint64 {
@@ -69,14 +66,7 @@ func (ci *consistentIndex) ConsistentIndex() uint64 {
}
ci.mutex.Lock()
defer ci.mutex.Unlock()
ci.tx.Lock()
defer ci.tx.Unlock()
_, vs := ci.tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
if len(vs) == 0 {
return 0
}
v := binary.BigEndian.Uint64(vs[0])
atomic.StoreUint64(&ci.consistentIndex, v)
v := ReadConsistentIndex(ci.tx)
return v
}
@@ -85,11 +75,12 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64) {
}

func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
bs := ci.bytesBuf8
binary.BigEndian.PutUint64(bs, ci.consistentIndex)
// put the index into the underlying backend
// tx has been locked in TxnBegin, so there is no need to lock it again
tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
index := atomic.LoadUint64(&ci.consistentIndex)
if index == 0 {
// Never save 0, as it means that we haven't loaded the real index yet.
return
}
unsafeUpdateConsistentIndex(tx, index)
}

func (ci *consistentIndex) SetBatchTx(tx backend.BatchTx) {
@@ -112,3 +103,45 @@ func (f *fakeConsistentIndex) SetConsistentIndex(index uint64) {

func (f *fakeConsistentIndex) UnsafeSave(tx backend.BatchTx) {}
func (f *fakeConsistentIndex) SetBatchTx(tx backend.BatchTx) {}

func UnsafeCreateMetaBucket(tx backend.BatchTx) {
tx.UnsafeCreateBucket(MetaBucketName)
}

// unsafeReadConsistentIndex loads the consistent index from the given transaction.
// Returns 0 if the data is not found.
func unsafeReadConsistentIndex(tx backend.ReadTx) uint64 {
_, vs := tx.UnsafeRange(MetaBucketName, ConsistentIndexKeyName, nil, 0)
if len(vs) == 0 {
return 0
}
v := binary.BigEndian.Uint64(vs[0])
return v
}

// ReadConsistentIndex loads the consistent index from the given transaction.
// Returns 0 if the data is not found.
func ReadConsistentIndex(tx backend.ReadTx) uint64 {
tx.Lock()
defer tx.Unlock()
return unsafeReadConsistentIndex(tx)
}

func unsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64) {
bs := make([]byte, 8) // this is kept on the stack (not heap), so it's quick.
binary.BigEndian.PutUint64(bs, index)
// put the index into the underlying backend
// tx has been locked in TxnBegin, so there is no need to lock it again
tx.UnsafePut(MetaBucketName, ConsistentIndexKeyName, bs)
}

func UpdateConsistentIndex(tx backend.BatchTx, index uint64) {
tx.Lock()
defer tx.Unlock()

oldi := unsafeReadConsistentIndex(tx)
if index <= oldi {
return
}
unsafeUpdateConsistentIndex(tx, index)
}
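A sketch of how these newly exported helpers fit together, mirroring their use in the migrate command and the v2 backup path elsewhere in this PR; the function name and the backendPath/index arguments are placeholders.

package example

import (
	"go.etcd.io/etcd/server/v3/etcdserver/cindex"
	"go.etcd.io/etcd/server/v3/mvcc/backend"
)

// bumpConsistentIndex opens a backend (bbolt) file, makes sure the "meta"
// bucket exists, and moves consistent_index forward to the given value.
func bumpConsistentIndex(backendPath string, index uint64) uint64 {
	be := backend.NewDefaultBackend(backendPath)
	defer be.Close()

	tx := be.BatchTx()
	tx.Lock()
	cindex.UnsafeCreateMetaBucket(tx)
	tx.Unlock()

	// UpdateConsistentIndex locks the tx itself and never moves the value backwards.
	cindex.UpdateConsistentIndex(be.BatchTx(), index)
	be.ForceCommit()

	// ReadConsistentIndex returns 0 when the key is absent.
	return cindex.ReadConsistentIndex(be.BatchTx())
}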
@@ -34,7 +34,7 @@ func TestConsistentIndex(t *testing.T) {
t.Fatal("batch tx is nil")
}
tx.Lock()
tx.UnsafeCreateBucket(metaBucketName)
UnsafeCreateMetaBucket(tx)
tx.Unlock()
be.ForceCommit()
r := rand.Uint64()
@@ -50,6 +50,7 @@ func TestConsistentIndex(t *testing.T) {
be.Close()

b := backend.NewDefaultBackend(tmpPath)
defer b.Close()
ci.SetConsistentIndex(0)
ci.SetBatchTx(b.BatchTx())
index = ci.ConsistentIndex()
@@ -62,8 +63,6 @@ func TestConsistentIndex(t *testing.T) {
if index != r {
t.Errorf("expected %d,got %d", r, index)
}
b.Close()

}

func TestFakeConsistentIndex(t *testing.T) {
@@ -1028,7 +1028,6 @@ func (s *EtcdServer) run() {
close(s.stopping)
s.wgMu.Unlock()
s.cancel()

sched.Stop()

// wait for goroutines before closing raft so wal stays open
@@ -1040,23 +1039,8 @@ func (s *EtcdServer) run() {
// by adding a peer after raft stops the transport
s.r.stop()

// kv, lessor and backend can be nil if running without v3 enabled
// or running unit tests.
if s.lessor != nil {
s.lessor.Stop()
}
if s.kv != nil {
s.kv.Close()
}
if s.authStore != nil {
s.authStore.Close()
}
if s.be != nil {
s.be.Close()
}
if s.compactor != nil {
s.compactor.Stop()
}
s.Cleanup()

close(s.done)
}()
@@ -1112,6 +1096,28 @@ func (s *EtcdServer) run() {
}
}

// Cleanup removes objects allocated by EtcdServer.NewServer in
// the situation that EtcdServer::Start was not called (which takes care of cleanup).
func (s *EtcdServer) Cleanup() {
// kv, lessor and backend can be nil if running without v3 enabled
// or running unit tests.
if s.lessor != nil {
s.lessor.Stop()
}
if s.kv != nil {
s.kv.Close()
}
if s.authStore != nil {
s.authStore.Close()
}
if s.be != nil {
s.be.Close()
}
if s.compactor != nil {
s.compactor.Stop()
}
}

func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
s.applySnapshot(ep, apply)
s.applyEntries(ep, apply)
@@ -2256,6 +2262,9 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
clone := s.v2store.Clone()
// commit kv to write metadata (for example: consistent index) to disk.
//
// This guarantees that Backend's consistent_index is >= index of last snapshot.
//
// KV().commit() updates the consistent index in backend.
// All operations that update consistent index must be called sequentially
// from applyAll function.
@@ -35,9 +35,8 @@ import (

var (
keyBucketName = []byte("key")
metaBucketName = []byte("meta")
metaBucketName = cindex.MetaBucketName

consistentIndexKeyName = []byte("consistent_index")
scheduledCompactKeyName = []byte("scheduledCompactRev")
finishedCompactKeyName = []byte("finishedCompactRev")

@@ -128,7 +127,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.Cons
tx := s.b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(keyBucketName)
tx.UnsafeCreateBucket(metaBucketName)
cindex.UnsafeCreateMetaBucket(tx)
tx.Unlock()
s.b.ForceCommit()

@@ -308,7 +307,7 @@ func init() {
DefaultIgnores = map[backend.IgnoreKey]struct{}{
// consistent index might be changed due to v2 internal sync, which
// is not controllable by the user.
{Bucket: string(metaBucketName), Key: string(consistentIndexKeyName)}: {},
{Bucket: string(metaBucketName), Key: string(cindex.ConsistentIndexKeyName)}: {},
}
}
@@ -15,33 +15,12 @@
package mvcc

import (
"encoding/binary"
"fmt"

"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/server/v3/mvcc/backend"
)

func UpdateConsistentIndex(be backend.Backend, index uint64) {
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()

var oldi uint64
_, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
if len(vs) != 0 {
oldi = binary.BigEndian.Uint64(vs[0])
}

if index <= oldi {
return
}

bs := make([]byte, 8)
binary.BigEndian.PutUint64(bs, index)
tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
}

func WriteKV(be backend.Backend, kv mvccpb.KeyValue) {
ibytes := newRevBytes()
revToBytes(revision{main: kv.ModRevision}, ibytes)
@@ -76,7 +76,7 @@ func Verify(cfg Config) error {
be := backend.New(beConfig)
defer be.Close()

_, hardstate, err := validateWal(cfg)
snapshot, hardstate, err := validateWal(cfg)
if err != nil {
return err
}
@@ -84,7 +84,7 @@ func Verify(cfg Config) error {
// TODO: Perform validation of consistency of membership between
// backend/members & WAL confstate (and maybe storev2 if still exists).

return validateConsistentIndex(cfg, hardstate, be)
return validateConsistentIndex(cfg, hardstate, snapshot, be)
}

// VerifyIfEnabled performs verification according to ETCD_VERIFY env settings.
@@ -101,22 +101,25 @@ func VerifyIfEnabled(cfg Config) error {
// See Verify for more information.
func MustVerifyIfEnabled(cfg Config) {
if err := VerifyIfEnabled(cfg); err != nil {
cfg.Logger.Panic("Verification failed",
cfg.Logger.Fatal("Verification failed",
zap.String("data-dir", cfg.DataDir),
zap.Error(err))
}
}

func validateConsistentIndex(cfg Config, hardstate *raftpb.HardState, be backend.Backend) error {
func validateConsistentIndex(cfg Config, hardstate *raftpb.HardState, snapshot *walpb.Snapshot, be backend.Backend) error {
tx := be.BatchTx()
ci := cindex.NewConsistentIndex(tx)
index := ci.ConsistentIndex()
index := cindex.ReadConsistentIndex(tx)
if cfg.ExactIndex && index != hardstate.Commit {
return fmt.Errorf("backend.ConsistentIndex (%v) expected == WAL.HardState.commit (%v)", index, hardstate.Commit)
}
if index > hardstate.Commit {
return fmt.Errorf("backend.ConsistentIndex (%v) must be <= WAL.HardState.commit (%v)", index, hardstate.Commit)
}
if index < snapshot.Index {
return fmt.Errorf("backend.ConsistentIndex (%v) must be >= last snapshot index (%v)", index, snapshot.Index)
}

cfg.Logger.Info("verification: consistentIndex OK", zap.Uint64("backend-consistent-index", index), zap.Uint64("hardstate-commit", hardstate.Commit))
return nil
}
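The checks added here pin the backend's consistent_index between two WAL-derived bounds. Restated as a small pure predicate, just to make the invariant explicit (the function and parameter names are ours, not part of the change):

package example

// consistentIndexInBounds restates the invariant enforced above:
// lastSnapshotIndex <= consistentIndex <= hardStateCommit,
// and exactly equal to hardStateCommit when ExactIndex is requested.
func consistentIndexInBounds(consistentIndex, hardStateCommit, lastSnapshotIndex uint64, exact bool) bool {
	if exact && consistentIndex != hardStateCommit {
		return false
	}
	return consistentIndex >= lastSnapshotIndex && consistentIndex <= hardStateCommit
}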
@@ -20,17 +20,14 @@ import (
"strings"
"testing"
"time"

"go.etcd.io/etcd/client/pkg/v3/testutil"
)

func BeforeTestV2(t testing.TB) {
skipInShortMode(t)
BeforeTest(t)
os.Setenv("ETCDCTL_API", "2")
t.Cleanup(func() {
os.Unsetenv("ETCDCTL_API")
})
testutil.BeforeTest(t)
}

func TestCtlV2Set(t *testing.T) { testCtlV2Set(t, newConfigNoTLS(), false) }
@@ -493,7 +490,11 @@ func etcdctlBackup(t testing.TB, clus *etcdProcessCluster, dataDir, backupDir st
if err != nil {
return err
}
return proc.Close()
err = proc.Close()
if err != nil {
return err
}
return proc.ProcessError()
}

func setupEtcdctlTest(t *testing.T, cfg *etcdProcessClusterConfig, quorum bool) *etcdProcessCluster {
@@ -23,10 +23,13 @@ import (

"go.etcd.io/etcd/client/pkg/v3/testutil"
"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/verify"
)

func BeforeTest(t testing.TB) {
skipInShortMode(t)
testutil.BeforeTest(t)
os.Setenv(verify.ENV_VERIFY, verify.ENV_VERIFY_ALL_VALUE)
}

func TestCtlV3Migrate(t *testing.T) {
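With BeforeTest setting verify.ENV_VERIFY for every e2e test, the data directories written by those tests can be verified on demand. A sketch of what that opt-in looks like when invoked directly, using only names that appear in this diff (the logger construction and data directory are illustrative):

package example

import (
	"os"

	"go.uber.org/zap"

	"go.etcd.io/etcd/server/v3/verify"
)

func verifyDataDir(dataDir string) {
	// Opt in exactly as the e2e BeforeTest helper does.
	os.Setenv(verify.ENV_VERIFY, verify.ENV_VERIFY_ALL_VALUE)

	lg, _ := zap.NewProduction()

	// No-op unless ETCD_VERIFY requests it; with this PR it calls
	// Logger.Fatal (instead of Panic) when a violation is found.
	verify.MustVerifyIfEnabled(verify.Config{
		Logger:     lg,
		DataDir:    dataDir,
		ExactIndex: false,
	})
}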
@@ -250,6 +250,7 @@ func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout))
case <-donec:
}
t.Log("---Test logic DONE")
}

func (cx *ctlCtx) prefixArgs(eps []string) []string {
@@ -19,14 +19,13 @@ import (
"errors"
"fmt"
"os"
"path/filepath"
"testing"
"time"

bolt "go.etcd.io/bbolt"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/client/v3"

bolt "go.etcd.io/bbolt"
"go.etcd.io/etcd/server/v3/datadir"
)

// TODO: test with embedded etcd in integration package
@@ -49,6 +48,7 @@ func TestEtcdCorruptHash(t *testing.T) {
}

func corruptTest(cx ctlCtx) {
cx.t.Log("putting 10 keys...")
for i := 0; i < 10; i++ {
if err := ctlV3Put(cx, fmt.Sprintf("foo%05d", i), fmt.Sprintf("v%05d", i), ""); err != nil {
if cx.dialTimeout > 0 && !isGRPCTimedout(err) {
@@ -57,8 +57,10 @@ func corruptTest(cx ctlCtx) {
}
}
// enough time for all nodes to sync on the same data
cx.t.Log("sleeping 3sec to let nodes sync...")
time.Sleep(3 * time.Second)

cx.t.Log("connecting clientv3...")
eps := cx.epc.EndpointsV3()
cli1, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[1]}, DialTimeout: 3 * time.Second})
if err != nil {
@@ -67,19 +69,23 @@ func corruptTest(cx ctlCtx) {
defer cli1.Close()

sresp, err := cli1.Status(context.TODO(), eps[0])
cx.t.Logf("checked status sresp:%v err:%v", sresp, err)
if err != nil {
cx.t.Fatal(err)
}
id0 := sresp.Header.GetMemberId()

cx.t.Log("stopping etcd[0]...")
cx.epc.procs[0].Stop()

// corrupt first member by modifying backend offline.
fp := filepath.Join(cx.epc.procs[0].Config().dataDirPath, "member", "snap", "db")
// corrupting first member by modifying backend offline.
fp := datadir.ToBackendFileName(cx.epc.procs[0].Config().dataDirPath)
cx.t.Logf("corrupting backend: %v", fp)
if err = cx.corruptFunc(fp); err != nil {
cx.t.Fatal(err)
}

cx.t.Log("restarting etcd[0]")
ep := cx.epc.procs[0]
proc, err := spawnCmd(append([]string{ep.Config().execPath}, ep.Config().args...))
if err != nil {
@@ -87,6 +93,7 @@ func corruptTest(cx ctlCtx) {
}
defer proc.Stop()

cx.t.Log("waiting for etcd[0] failure...")
// restarting corrupted member should fail
waitReadyExpectProc(proc, []string{fmt.Sprintf("etcdmain: %016x found data inconsistency with peers", id0)})
}