mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Fix ETCDCTL_API=2 etcdctl backup --with-v3 consistent index consistency
Prior to this CL, `ETCDCTL_API=2 etcdctl backup --with-v3` was readacting WAL log (by removal of some entries), but was NOT updating consistent_index in the backend. Also the WAL editing logic was buggy, as it didn't took in consideration the fact that when TERM changes, there can be entries with duplicated indexes in the log. So its NOT sufficient to subtract number of removed entries to get accurate log indexes. The PR replaces removing and shifting of WAL entries with replacing them with an no-op entries. Thanks to this consistent-index references are staying up to date. The PR also: - updates 'verification' logic to check whether consistent_index does not lag befor last snapshot - env-gated execution of verification framework in `etcdctl backup`. Tested with: ``` (./build.sh && cd tests && EXPECT_DEBUG=TRUE 'env' 'go' 'test' '-timeout=300m' 'go.etcd.io/etcd/tests/v3/e2e' -run=TestCtlV2Backup --count=1000 2>&1 | tee TestCtlV2BackupV3.log) ```
This commit is contained in:
@@ -23,9 +23,9 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
metaBucketName = []byte("meta")
|
||||
MetaBucketName = []byte("meta")
|
||||
|
||||
consistentIndexKeyName = []byte("consistent_index")
|
||||
ConsistentIndexKeyName = []byte("consistent_index")
|
||||
)
|
||||
|
||||
// ConsistentIndexer is an interface that wraps the Get/Set/Save method for consistentIndex.
|
||||
@@ -52,14 +52,11 @@ type consistentIndex struct {
|
||||
// it caches the "consistent_index" key's value. Accessed
|
||||
// through atomics so must be 64-bit aligned.
|
||||
consistentIndex uint64
|
||||
// bytesBuf8 is a byte slice of length 8
|
||||
// to avoid a repetitive allocation in saveIndex.
|
||||
bytesBuf8 []byte
|
||||
mutex sync.Mutex
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
func NewConsistentIndex(tx backend.BatchTx) ConsistentIndexer {
|
||||
return &consistentIndex{tx: tx, bytesBuf8: make([]byte, 8)}
|
||||
return &consistentIndex{tx: tx}
|
||||
}
|
||||
|
||||
func (ci *consistentIndex) ConsistentIndex() uint64 {
|
||||
@@ -69,14 +66,7 @@ func (ci *consistentIndex) ConsistentIndex() uint64 {
|
||||
}
|
||||
ci.mutex.Lock()
|
||||
defer ci.mutex.Unlock()
|
||||
ci.tx.Lock()
|
||||
defer ci.tx.Unlock()
|
||||
_, vs := ci.tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
|
||||
if len(vs) == 0 {
|
||||
return 0
|
||||
}
|
||||
v := binary.BigEndian.Uint64(vs[0])
|
||||
atomic.StoreUint64(&ci.consistentIndex, v)
|
||||
v := ReadConsistentIndex(ci.tx)
|
||||
return v
|
||||
}
|
||||
|
||||
@@ -85,11 +75,16 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64) {
|
||||
}
|
||||
|
||||
func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
|
||||
bs := ci.bytesBuf8
|
||||
binary.BigEndian.PutUint64(bs, ci.consistentIndex)
|
||||
index := atomic.LoadUint64(&ci.consistentIndex)
|
||||
if index == 0 {
|
||||
// Never save 0 as it means that we didn't loaded the real index yet.
|
||||
return
|
||||
}
|
||||
bs := make([]byte, 8) // this is kept on stack (not heap) so its quick.
|
||||
binary.BigEndian.PutUint64(bs, index)
|
||||
// put the index into the underlying backend
|
||||
// tx has been locked in TxnBegin, so there is no need to lock it again
|
||||
tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
|
||||
tx.UnsafePut(MetaBucketName, ConsistentIndexKeyName, bs)
|
||||
}
|
||||
|
||||
func (ci *consistentIndex) SetBatchTx(tx backend.BatchTx) {
|
||||
@@ -112,3 +107,26 @@ func (f *fakeConsistentIndex) SetConsistentIndex(index uint64) {
|
||||
|
||||
func (f *fakeConsistentIndex) UnsafeSave(tx backend.BatchTx) {}
|
||||
func (f *fakeConsistentIndex) SetBatchTx(tx backend.BatchTx) {}
|
||||
|
||||
func UnsafeCreateMetaBucket(tx backend.BatchTx) {
|
||||
tx.UnsafeCreateBucket(MetaBucketName)
|
||||
}
|
||||
|
||||
// unsafeGetConsistentIndex loads consistent index from given transaction.
|
||||
// returns 0 if the data are not found.
|
||||
func unsafeReadConsistentIndex(tx backend.ReadTx) uint64 {
|
||||
_, vs := tx.UnsafeRange(MetaBucketName, ConsistentIndexKeyName, nil, 0)
|
||||
if len(vs) == 0 {
|
||||
return 0
|
||||
}
|
||||
v := binary.BigEndian.Uint64(vs[0])
|
||||
return v
|
||||
}
|
||||
|
||||
// ReadConsistentIndex loads consistent index from given transaction.
|
||||
// returns 0 if the data are not found.
|
||||
func ReadConsistentIndex(tx backend.ReadTx) uint64 {
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
return unsafeReadConsistentIndex(tx)
|
||||
}
|
||||
|
||||
@@ -34,7 +34,7 @@ func TestConsistentIndex(t *testing.T) {
|
||||
t.Fatal("batch tx is nil")
|
||||
}
|
||||
tx.Lock()
|
||||
tx.UnsafeCreateBucket(metaBucketName)
|
||||
UnsafeCreateMetaBucket(tx)
|
||||
tx.Unlock()
|
||||
be.ForceCommit()
|
||||
r := rand.Uint64()
|
||||
@@ -50,6 +50,7 @@ func TestConsistentIndex(t *testing.T) {
|
||||
be.Close()
|
||||
|
||||
b := backend.NewDefaultBackend(tmpPath)
|
||||
defer b.Close()
|
||||
ci.SetConsistentIndex(0)
|
||||
ci.SetBatchTx(b.BatchTx())
|
||||
index = ci.ConsistentIndex()
|
||||
@@ -62,8 +63,6 @@ func TestConsistentIndex(t *testing.T) {
|
||||
if index != r {
|
||||
t.Errorf("expected %d,got %d", r, index)
|
||||
}
|
||||
b.Close()
|
||||
|
||||
}
|
||||
|
||||
func TestFakeConsistentIndex(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user