mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #13963 from ptabor/20220412-verify-assert
Add verification consistent index is (nearly) never decreasing
This commit is contained in:
commit
887f95d0d3
@ -21,16 +21,13 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.etcd.io/etcd/client/pkg/v3/verify"
|
||||
)
|
||||
|
||||
// These are constants from "go.etcd.io/etcd/server/v3/verify",
|
||||
// but we don't want to take dependency.
|
||||
const ENV_VERIFY = "ETCD_VERIFY"
|
||||
const ENV_VERIFY_ALL_VALUE = "all"
|
||||
|
||||
func BeforeTest(t testing.TB) {
|
||||
RegisterLeakDetection(t)
|
||||
os.Setenv(ENV_VERIFY, ENV_VERIFY_ALL_VALUE)
|
||||
|
||||
revertVerifyFunc := verify.EnableAllVerifications()
|
||||
|
||||
path, err := os.Getwd()
|
||||
assert.NoError(t, err)
|
||||
@ -38,7 +35,10 @@ func BeforeTest(t testing.TB) {
|
||||
assert.NoError(t, os.Chdir(tempDir))
|
||||
t.Logf("Changing working directory to: %s", tempDir)
|
||||
|
||||
t.Cleanup(func() { assert.NoError(t, os.Chdir(path)) })
|
||||
t.Cleanup(func() {
|
||||
revertVerifyFunc()
|
||||
assert.NoError(t, os.Chdir(path))
|
||||
})
|
||||
}
|
||||
|
||||
func BeforeIntegrationExamples(*testing.M) func() {
|
||||
|
60
client/pkg/verify/verify.go
Normal file
60
client/pkg/verify/verify.go
Normal file
@ -0,0 +1,60 @@
|
||||
// Copyright 2022 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package verify
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const ENV_VERIFY = "ETCD_VERIFY"
|
||||
|
||||
type VerificationType string
|
||||
|
||||
const (
|
||||
ENV_VERIFY_VALUE_ALL VerificationType = "all"
|
||||
ENV_VERIFY_VALUE_ASSERT VerificationType = "assert"
|
||||
)
|
||||
|
||||
func getEnvVerify() string {
|
||||
return strings.ToLower(os.Getenv(ENV_VERIFY))
|
||||
}
|
||||
|
||||
func IsVerificationEnabled(verification VerificationType) bool {
|
||||
env := getEnvVerify()
|
||||
return env == string(ENV_VERIFY_VALUE_ALL) || env == strings.ToLower(string(verification))
|
||||
}
|
||||
|
||||
// EnableVerifications returns a function that can be used to bring the original settings.
|
||||
func EnableVerifications(verification VerificationType) func() {
|
||||
previousEnv := getEnvVerify()
|
||||
os.Setenv(ENV_VERIFY, string(verification))
|
||||
return func() {
|
||||
os.Setenv(ENV_VERIFY, string(previousEnv))
|
||||
}
|
||||
}
|
||||
|
||||
// EnableAllVerifications returns a function that can be used to bring the original settings.
|
||||
func EnableAllVerifications() func() {
|
||||
return EnableVerifications(ENV_VERIFY_VALUE_ALL)
|
||||
}
|
||||
|
||||
// Verify performs verification if the assertions are enabled.
|
||||
// In the default setup running in tests and skipped in the production code.
|
||||
func Verify(f func()) {
|
||||
if IsVerificationEnabled(ENV_VERIFY_VALUE_ASSERT) {
|
||||
f()
|
||||
}
|
||||
}
|
@ -325,7 +325,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir
|
||||
tx.LockOutsideApply()
|
||||
defer tx.Unlock()
|
||||
schema.UnsafeCreateMetaBucket(tx)
|
||||
schema.UnsafeUpdateConsistentIndex(tx, idx, term)
|
||||
schema.UnsafeUpdateConsistentIndexForce(tx, idx, term)
|
||||
} else {
|
||||
// Thanks to translateWAL not moving entries, but just replacing them with
|
||||
// 'empty', there is no need to update the consistency index.
|
||||
|
@ -483,6 +483,6 @@ func (s *v3Manager) updateCIndex(commit uint64, term uint64) error {
|
||||
be := backend.NewDefaultBackend(s.lg, s.outDbPath())
|
||||
defer be.Close()
|
||||
|
||||
cindex.UpdateConsistentIndex(be.BatchTx(), commit, term)
|
||||
cindex.UpdateConsistentIndexForce(be.BatchTx(), commit, term)
|
||||
return nil
|
||||
}
|
||||
|
@ -169,8 +169,8 @@ func (f *fakeConsistentIndex) SetConsistentApplyingIndex(index uint64, term uint
|
||||
func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
|
||||
func (f *fakeConsistentIndex) SetBackend(_ Backend) {}
|
||||
|
||||
func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
|
||||
func UpdateConsistentIndexForce(tx backend.BatchTx, index uint64, term uint64) {
|
||||
tx.LockOutsideApply()
|
||||
defer tx.Unlock()
|
||||
schema.UnsafeUpdateConsistentIndex(tx, index, term)
|
||||
schema.UnsafeUpdateConsistentIndexForce(tx, index, term)
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.etcd.io/etcd/client/pkg/v3/testutil"
|
||||
"go.etcd.io/etcd/server/v3/storage/backend"
|
||||
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
|
||||
"go.etcd.io/etcd/server/v3/storage/schema"
|
||||
@ -66,28 +67,33 @@ func TestConsistentIndex(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestConsistentIndexDecrease(t *testing.T) {
|
||||
testutil.BeforeTest(t)
|
||||
initIndex := uint64(100)
|
||||
initTerm := uint64(10)
|
||||
|
||||
tcs := []struct {
|
||||
name string
|
||||
index uint64
|
||||
term uint64
|
||||
name string
|
||||
index uint64
|
||||
term uint64
|
||||
panicExpected bool
|
||||
}{
|
||||
{
|
||||
name: "Decrease term",
|
||||
index: initIndex + 1,
|
||||
term: initTerm - 1,
|
||||
name: "Decrease term",
|
||||
index: initIndex + 1,
|
||||
term: initTerm - 1,
|
||||
panicExpected: false, // TODO: Change in v3.7
|
||||
},
|
||||
{
|
||||
name: "Decrease CI",
|
||||
index: initIndex - 1,
|
||||
term: initTerm + 1,
|
||||
name: "Decrease CI",
|
||||
index: initIndex - 1,
|
||||
term: initTerm + 1,
|
||||
panicExpected: true,
|
||||
},
|
||||
{
|
||||
name: "Decrease CI and term",
|
||||
index: initIndex - 1,
|
||||
term: initTerm - 1,
|
||||
name: "Decrease CI and term",
|
||||
index: initIndex - 1,
|
||||
term: initTerm - 1,
|
||||
panicExpected: true,
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
@ -106,13 +112,21 @@ func TestConsistentIndexDecrease(t *testing.T) {
|
||||
ci := NewConsistentIndex(be)
|
||||
ci.SetConsistentIndex(tc.index, tc.term)
|
||||
tx = be.BatchTx()
|
||||
tx.Lock()
|
||||
ci.UnsafeSave(tx)
|
||||
tx.Unlock()
|
||||
assert.Equal(t, tc.index, ci.ConsistentIndex())
|
||||
func() {
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
if tc.panicExpected {
|
||||
assert.Panics(t, func() { ci.UnsafeSave(tx) }, "Should refuse to decrease cindex")
|
||||
return
|
||||
}
|
||||
ci.UnsafeSave(tx)
|
||||
}()
|
||||
if !tc.panicExpected {
|
||||
assert.Equal(t, tc.index, ci.ConsistentIndex())
|
||||
|
||||
ci = NewConsistentIndex(be)
|
||||
assert.Equal(t, tc.index, ci.ConsistentIndex())
|
||||
ci = NewConsistentIndex(be)
|
||||
assert.Equal(t, tc.index, ci.ConsistentIndex())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -15,17 +15,15 @@
|
||||
package backend
|
||||
|
||||
import (
|
||||
"os"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
|
||||
"go.etcd.io/etcd/client/pkg/v3/verify"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
ENV_VERIFY = "ETCD_VERIFY"
|
||||
ENV_VERIFY_ALL_VALUE = "all"
|
||||
ENV_VERIFY_LOCK = "lock"
|
||||
ENV_VERIFY_VALUE_LOCK verify.VerificationType = "lock"
|
||||
)
|
||||
|
||||
func ValidateCalledInsideApply(lg *zap.Logger) {
|
||||
@ -56,7 +54,7 @@ func ValidateCalledInsideUnittest(lg *zap.Logger) {
|
||||
}
|
||||
|
||||
func verifyLockEnabled() bool {
|
||||
return os.Getenv(ENV_VERIFY) == ENV_VERIFY_ALL_VALUE || os.Getenv(ENV_VERIFY) == ENV_VERIFY_LOCK
|
||||
return verify.IsVerificationEnabled(ENV_VERIFY_VALUE_LOCK)
|
||||
}
|
||||
|
||||
func insideApply() bool {
|
||||
|
@ -15,10 +15,10 @@
|
||||
package backend_test
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/client/pkg/v3/verify"
|
||||
"go.etcd.io/etcd/server/v3/storage/backend"
|
||||
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
|
||||
)
|
||||
@ -69,11 +69,8 @@ func TestLockVerify(t *testing.T) {
|
||||
expectPanic: false,
|
||||
},
|
||||
}
|
||||
env := os.Getenv("ETCD_VERIFY")
|
||||
os.Setenv("ETCD_VERIFY", "lock")
|
||||
defer func() {
|
||||
os.Setenv("ETCD_VERIFY", env)
|
||||
}()
|
||||
revertVerifyFunc := verify.EnableVerifications(backend.ENV_VERIFY_VALUE_LOCK)
|
||||
defer revertVerifyFunc()
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
|
||||
|
@ -16,16 +16,18 @@ package schema
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
|
||||
"go.etcd.io/etcd/client/pkg/v3/verify"
|
||||
"go.etcd.io/etcd/server/v3/storage/backend"
|
||||
)
|
||||
|
||||
// UnsafeCreateMetaBucket creates the `meta` bucket (if it does not exists yet).
|
||||
// UnsafeCreateMetaBucket creates the `meta` bucket (if it does not exist yet).
|
||||
func UnsafeCreateMetaBucket(tx backend.BatchTx) {
|
||||
tx.UnsafeCreateBucket(Meta)
|
||||
}
|
||||
|
||||
// CreateMetaBucket creates the `meta` bucket (if it does not exists yet).
|
||||
// CreateMetaBucket creates the `meta` bucket (if it does not exist yet).
|
||||
func CreateMetaBucket(tx backend.BatchTx) {
|
||||
tx.LockOutsideApply()
|
||||
defer tx.Unlock()
|
||||
@ -57,13 +59,31 @@ func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
|
||||
return UnsafeReadConsistentIndex(tx)
|
||||
}
|
||||
|
||||
func UnsafeUpdateConsistentIndexForce(tx backend.BatchTx, index uint64, term uint64) {
|
||||
unsafeUpdateConsistentIndex(tx, index, term, true)
|
||||
}
|
||||
|
||||
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
|
||||
unsafeUpdateConsistentIndex(tx, index, term, false)
|
||||
}
|
||||
|
||||
func unsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, allowDecreasing bool) {
|
||||
if index == 0 {
|
||||
// Never save 0 as it means that we didn't load the real index yet.
|
||||
return
|
||||
}
|
||||
bs1 := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(bs1, index)
|
||||
|
||||
if !allowDecreasing {
|
||||
verify.Verify(func() {
|
||||
previousIndex, _ := UnsafeReadConsistentIndex(tx)
|
||||
if index < previousIndex {
|
||||
panic(fmt.Errorf("update of consistent index not advancing: previous: %v new: %v", previousIndex, index))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// put the index into the underlying backend
|
||||
// tx has been locked in TxnBegin, so there is no need to lock it again
|
||||
tx.UnsafePut(Meta, MetaConsistentIndexKeyName, bs1)
|
||||
|
@ -16,8 +16,8 @@ package verify
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"go.etcd.io/etcd/client/pkg/v3/verify"
|
||||
"go.etcd.io/etcd/raft/v3/raftpb"
|
||||
"go.etcd.io/etcd/server/v3/storage/backend"
|
||||
"go.etcd.io/etcd/server/v3/storage/datadir"
|
||||
@ -27,8 +27,7 @@ import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const ENV_VERIFY = "ETCD_VERIFY"
|
||||
const ENV_VERIFY_ALL_VALUE = "all"
|
||||
const ENV_VERIFY_VALUE_STORAGE_WAL verify.VerificationType = "storage_wal"
|
||||
|
||||
type Config struct {
|
||||
// DataDir is a root directory where the data being verified are stored.
|
||||
@ -90,7 +89,7 @@ func Verify(cfg Config) error {
|
||||
// VerifyIfEnabled performs verification according to ETCD_VERIFY env settings.
|
||||
// See Verify for more information.
|
||||
func VerifyIfEnabled(cfg Config) error {
|
||||
if os.Getenv(ENV_VERIFY) == ENV_VERIFY_ALL_VALUE {
|
||||
if verify.IsVerificationEnabled(ENV_VERIFY_VALUE_STORAGE_WAL) {
|
||||
return Verify(cfg)
|
||||
}
|
||||
return nil
|
||||
|
@ -21,9 +21,9 @@ import (
|
||||
|
||||
grpc_logsettable "github.com/grpc-ecosystem/go-grpc-middleware/logging/settable"
|
||||
"go.etcd.io/etcd/client/pkg/v3/testutil"
|
||||
"go.etcd.io/etcd/client/pkg/v3/verify"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/server/v3/embed"
|
||||
"go.etcd.io/etcd/server/v3/verify"
|
||||
"go.uber.org/zap/zapcore"
|
||||
"go.uber.org/zap/zapgrpc"
|
||||
"go.uber.org/zap/zaptest"
|
||||
@ -90,18 +90,20 @@ func BeforeTest(t testutil.TB, opts ...TestOption) {
|
||||
}
|
||||
previousInsideTestContext := insideTestContext
|
||||
|
||||
// Integration tests should verify written state as much as possible.
|
||||
revertFunc := verify.EnableAllVerifications()
|
||||
|
||||
// Registering cleanup early, such it will get executed even if the helper fails.
|
||||
t.Cleanup(func() {
|
||||
grpc_logger.Reset()
|
||||
insideTestContext = previousInsideTestContext
|
||||
os.Chdir(previousWD)
|
||||
revertFunc()
|
||||
})
|
||||
|
||||
grpc_logger.Set(zapgrpc.NewLogger(zaptest.NewLogger(t).Named("grpc")))
|
||||
insideTestContext = true
|
||||
|
||||
// Integration tests should verify written state as much as possible.
|
||||
os.Setenv(verify.ENV_VERIFY, verify.ENV_VERIFY_ALL_VALUE)
|
||||
os.Chdir(t.TempDir())
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user