mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Verification framework and check whether cindex is not decreasing.
This commit is contained in:
parent
52662ccd06
commit
d69e07dd3a
@ -21,16 +21,13 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"go.etcd.io/etcd/client/pkg/v3/verify"
|
||||||
)
|
)
|
||||||
|
|
||||||
// These are constants from "go.etcd.io/etcd/server/v3/verify",
|
|
||||||
// but we don't want to take dependency.
|
|
||||||
const ENV_VERIFY = "ETCD_VERIFY"
|
|
||||||
const ENV_VERIFY_ALL_VALUE = "all"
|
|
||||||
|
|
||||||
func BeforeTest(t testing.TB) {
|
func BeforeTest(t testing.TB) {
|
||||||
RegisterLeakDetection(t)
|
RegisterLeakDetection(t)
|
||||||
os.Setenv(ENV_VERIFY, ENV_VERIFY_ALL_VALUE)
|
|
||||||
|
revertVerifyFunc := verify.EnableAllVerifications()
|
||||||
|
|
||||||
path, err := os.Getwd()
|
path, err := os.Getwd()
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
@ -38,7 +35,10 @@ func BeforeTest(t testing.TB) {
|
|||||||
assert.NoError(t, os.Chdir(tempDir))
|
assert.NoError(t, os.Chdir(tempDir))
|
||||||
t.Logf("Changing working directory to: %s", tempDir)
|
t.Logf("Changing working directory to: %s", tempDir)
|
||||||
|
|
||||||
t.Cleanup(func() { assert.NoError(t, os.Chdir(path)) })
|
t.Cleanup(func() {
|
||||||
|
revertVerifyFunc()
|
||||||
|
assert.NoError(t, os.Chdir(path))
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func BeforeIntegrationExamples(*testing.M) func() {
|
func BeforeIntegrationExamples(*testing.M) func() {
|
||||||
|
60
client/pkg/verify/verify.go
Normal file
60
client/pkg/verify/verify.go
Normal file
@ -0,0 +1,60 @@
|
|||||||
|
// Copyright 2022 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package verify
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
const ENV_VERIFY = "ETCD_VERIFY"
|
||||||
|
|
||||||
|
type VerificationType string
|
||||||
|
|
||||||
|
const (
|
||||||
|
ENV_VERIFY_VALUE_ALL VerificationType = "all"
|
||||||
|
ENV_VERIFY_VALUE_ASSERT VerificationType = "assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func getEnvVerify() string {
|
||||||
|
return strings.ToLower(os.Getenv(ENV_VERIFY))
|
||||||
|
}
|
||||||
|
|
||||||
|
func IsVerificationEnabled(verification VerificationType) bool {
|
||||||
|
env := getEnvVerify()
|
||||||
|
return env == string(ENV_VERIFY_VALUE_ALL) || env == strings.ToLower(string(verification))
|
||||||
|
}
|
||||||
|
|
||||||
|
// EnableVerifications returns a function that can be used to bring the original settings.
|
||||||
|
func EnableVerifications(verification VerificationType) func() {
|
||||||
|
previousEnv := getEnvVerify()
|
||||||
|
os.Setenv(ENV_VERIFY, string(verification))
|
||||||
|
return func() {
|
||||||
|
os.Setenv(ENV_VERIFY, string(previousEnv))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// EnableAllVerifications returns a function that can be used to bring the original settings.
|
||||||
|
func EnableAllVerifications() func() {
|
||||||
|
return EnableVerifications(ENV_VERIFY_VALUE_ALL)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify performs verification if the assertions are enabled.
|
||||||
|
// In the default setup running in tests and skipped in the production code.
|
||||||
|
func Verify(f func()) {
|
||||||
|
if IsVerificationEnabled(ENV_VERIFY_VALUE_ASSERT) {
|
||||||
|
f()
|
||||||
|
}
|
||||||
|
}
|
@ -325,7 +325,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir
|
|||||||
tx.LockOutsideApply()
|
tx.LockOutsideApply()
|
||||||
defer tx.Unlock()
|
defer tx.Unlock()
|
||||||
schema.UnsafeCreateMetaBucket(tx)
|
schema.UnsafeCreateMetaBucket(tx)
|
||||||
schema.UnsafeUpdateConsistentIndex(tx, idx, term)
|
schema.UnsafeUpdateConsistentIndexForce(tx, idx, term)
|
||||||
} else {
|
} else {
|
||||||
// Thanks to translateWAL not moving entries, but just replacing them with
|
// Thanks to translateWAL not moving entries, but just replacing them with
|
||||||
// 'empty', there is no need to update the consistency index.
|
// 'empty', there is no need to update the consistency index.
|
||||||
|
@ -483,6 +483,6 @@ func (s *v3Manager) updateCIndex(commit uint64, term uint64) error {
|
|||||||
be := backend.NewDefaultBackend(s.lg, s.outDbPath())
|
be := backend.NewDefaultBackend(s.lg, s.outDbPath())
|
||||||
defer be.Close()
|
defer be.Close()
|
||||||
|
|
||||||
cindex.UpdateConsistentIndex(be.BatchTx(), commit, term)
|
cindex.UpdateConsistentIndexForce(be.BatchTx(), commit, term)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -169,8 +169,8 @@ func (f *fakeConsistentIndex) SetConsistentApplyingIndex(index uint64, term uint
|
|||||||
func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
|
func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
|
||||||
func (f *fakeConsistentIndex) SetBackend(_ Backend) {}
|
func (f *fakeConsistentIndex) SetBackend(_ Backend) {}
|
||||||
|
|
||||||
func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
|
func UpdateConsistentIndexForce(tx backend.BatchTx, index uint64, term uint64) {
|
||||||
tx.LockOutsideApply()
|
tx.LockOutsideApply()
|
||||||
defer tx.Unlock()
|
defer tx.Unlock()
|
||||||
schema.UnsafeUpdateConsistentIndex(tx, index, term)
|
schema.UnsafeUpdateConsistentIndexForce(tx, index, term)
|
||||||
}
|
}
|
||||||
|
@ -20,6 +20,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"go.etcd.io/etcd/client/pkg/v3/testutil"
|
||||||
"go.etcd.io/etcd/server/v3/storage/backend"
|
"go.etcd.io/etcd/server/v3/storage/backend"
|
||||||
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
|
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
|
||||||
"go.etcd.io/etcd/server/v3/storage/schema"
|
"go.etcd.io/etcd/server/v3/storage/schema"
|
||||||
@ -66,6 +67,7 @@ func TestConsistentIndex(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestConsistentIndexDecrease(t *testing.T) {
|
func TestConsistentIndexDecrease(t *testing.T) {
|
||||||
|
testutil.BeforeTest(t)
|
||||||
initIndex := uint64(100)
|
initIndex := uint64(100)
|
||||||
initTerm := uint64(10)
|
initTerm := uint64(10)
|
||||||
|
|
||||||
@ -73,21 +75,25 @@ func TestConsistentIndexDecrease(t *testing.T) {
|
|||||||
name string
|
name string
|
||||||
index uint64
|
index uint64
|
||||||
term uint64
|
term uint64
|
||||||
|
panicExpected bool
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "Decrease term",
|
name: "Decrease term",
|
||||||
index: initIndex + 1,
|
index: initIndex + 1,
|
||||||
term: initTerm - 1,
|
term: initTerm - 1,
|
||||||
|
panicExpected: false, // TODO: Change in v3.7
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Decrease CI",
|
name: "Decrease CI",
|
||||||
index: initIndex - 1,
|
index: initIndex - 1,
|
||||||
term: initTerm + 1,
|
term: initTerm + 1,
|
||||||
|
panicExpected: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Decrease CI and term",
|
name: "Decrease CI and term",
|
||||||
index: initIndex - 1,
|
index: initIndex - 1,
|
||||||
term: initTerm - 1,
|
term: initTerm - 1,
|
||||||
|
panicExpected: true,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, tc := range tcs {
|
for _, tc := range tcs {
|
||||||
@ -106,13 +112,21 @@ func TestConsistentIndexDecrease(t *testing.T) {
|
|||||||
ci := NewConsistentIndex(be)
|
ci := NewConsistentIndex(be)
|
||||||
ci.SetConsistentIndex(tc.index, tc.term)
|
ci.SetConsistentIndex(tc.index, tc.term)
|
||||||
tx = be.BatchTx()
|
tx = be.BatchTx()
|
||||||
|
func() {
|
||||||
tx.Lock()
|
tx.Lock()
|
||||||
|
defer tx.Unlock()
|
||||||
|
if tc.panicExpected {
|
||||||
|
assert.Panics(t, func() { ci.UnsafeSave(tx) }, "Should refuse to decrease cindex")
|
||||||
|
return
|
||||||
|
}
|
||||||
ci.UnsafeSave(tx)
|
ci.UnsafeSave(tx)
|
||||||
tx.Unlock()
|
}()
|
||||||
|
if !tc.panicExpected {
|
||||||
assert.Equal(t, tc.index, ci.ConsistentIndex())
|
assert.Equal(t, tc.index, ci.ConsistentIndex())
|
||||||
|
|
||||||
ci = NewConsistentIndex(be)
|
ci = NewConsistentIndex(be)
|
||||||
assert.Equal(t, tc.index, ci.ConsistentIndex())
|
assert.Equal(t, tc.index, ci.ConsistentIndex())
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -15,17 +15,15 @@
|
|||||||
package backend
|
package backend
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"go.etcd.io/etcd/client/pkg/v3/verify"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
ENV_VERIFY = "ETCD_VERIFY"
|
ENV_VERIFY_VALUE_LOCK verify.VerificationType = "lock"
|
||||||
ENV_VERIFY_ALL_VALUE = "all"
|
|
||||||
ENV_VERIFY_LOCK = "lock"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func ValidateCalledInsideApply(lg *zap.Logger) {
|
func ValidateCalledInsideApply(lg *zap.Logger) {
|
||||||
@ -56,7 +54,7 @@ func ValidateCalledInsideUnittest(lg *zap.Logger) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func verifyLockEnabled() bool {
|
func verifyLockEnabled() bool {
|
||||||
return os.Getenv(ENV_VERIFY) == ENV_VERIFY_ALL_VALUE || os.Getenv(ENV_VERIFY) == ENV_VERIFY_LOCK
|
return verify.IsVerificationEnabled(ENV_VERIFY_VALUE_LOCK)
|
||||||
}
|
}
|
||||||
|
|
||||||
func insideApply() bool {
|
func insideApply() bool {
|
||||||
|
@ -15,10 +15,10 @@
|
|||||||
package backend_test
|
package backend_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"go.etcd.io/etcd/client/pkg/v3/verify"
|
||||||
"go.etcd.io/etcd/server/v3/storage/backend"
|
"go.etcd.io/etcd/server/v3/storage/backend"
|
||||||
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
|
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
|
||||||
)
|
)
|
||||||
@ -69,11 +69,8 @@ func TestLockVerify(t *testing.T) {
|
|||||||
expectPanic: false,
|
expectPanic: false,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
env := os.Getenv("ETCD_VERIFY")
|
revertVerifyFunc := verify.EnableVerifications(backend.ENV_VERIFY_VALUE_LOCK)
|
||||||
os.Setenv("ETCD_VERIFY", "lock")
|
defer revertVerifyFunc()
|
||||||
defer func() {
|
|
||||||
os.Setenv("ETCD_VERIFY", env)
|
|
||||||
}()
|
|
||||||
for _, tc := range tcs {
|
for _, tc := range tcs {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
|
||||||
|
@ -16,16 +16,18 @@ package schema
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"go.etcd.io/etcd/client/pkg/v3/verify"
|
||||||
"go.etcd.io/etcd/server/v3/storage/backend"
|
"go.etcd.io/etcd/server/v3/storage/backend"
|
||||||
)
|
)
|
||||||
|
|
||||||
// UnsafeCreateMetaBucket creates the `meta` bucket (if it does not exists yet).
|
// UnsafeCreateMetaBucket creates the `meta` bucket (if it does not exist yet).
|
||||||
func UnsafeCreateMetaBucket(tx backend.BatchTx) {
|
func UnsafeCreateMetaBucket(tx backend.BatchTx) {
|
||||||
tx.UnsafeCreateBucket(Meta)
|
tx.UnsafeCreateBucket(Meta)
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateMetaBucket creates the `meta` bucket (if it does not exists yet).
|
// CreateMetaBucket creates the `meta` bucket (if it does not exist yet).
|
||||||
func CreateMetaBucket(tx backend.BatchTx) {
|
func CreateMetaBucket(tx backend.BatchTx) {
|
||||||
tx.LockOutsideApply()
|
tx.LockOutsideApply()
|
||||||
defer tx.Unlock()
|
defer tx.Unlock()
|
||||||
@ -57,13 +59,31 @@ func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
|
|||||||
return UnsafeReadConsistentIndex(tx)
|
return UnsafeReadConsistentIndex(tx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func UnsafeUpdateConsistentIndexForce(tx backend.BatchTx, index uint64, term uint64) {
|
||||||
|
unsafeUpdateConsistentIndex(tx, index, term, true)
|
||||||
|
}
|
||||||
|
|
||||||
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
|
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
|
||||||
|
unsafeUpdateConsistentIndex(tx, index, term, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func unsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, allowDecreasing bool) {
|
||||||
if index == 0 {
|
if index == 0 {
|
||||||
// Never save 0 as it means that we didn't load the real index yet.
|
// Never save 0 as it means that we didn't load the real index yet.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
bs1 := make([]byte, 8)
|
bs1 := make([]byte, 8)
|
||||||
binary.BigEndian.PutUint64(bs1, index)
|
binary.BigEndian.PutUint64(bs1, index)
|
||||||
|
|
||||||
|
if !allowDecreasing {
|
||||||
|
verify.Verify(func() {
|
||||||
|
previousIndex, _ := UnsafeReadConsistentIndex(tx)
|
||||||
|
if index < previousIndex {
|
||||||
|
panic(fmt.Errorf("update of consistent index not advancing: previous: %v new: %v", previousIndex, index))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// put the index into the underlying backend
|
// put the index into the underlying backend
|
||||||
// tx has been locked in TxnBegin, so there is no need to lock it again
|
// tx has been locked in TxnBegin, so there is no need to lock it again
|
||||||
tx.UnsafePut(Meta, MetaConsistentIndexKeyName, bs1)
|
tx.UnsafePut(Meta, MetaConsistentIndexKeyName, bs1)
|
||||||
|
@ -16,8 +16,8 @@ package verify
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
|
||||||
|
|
||||||
|
"go.etcd.io/etcd/client/pkg/v3/verify"
|
||||||
"go.etcd.io/etcd/raft/v3/raftpb"
|
"go.etcd.io/etcd/raft/v3/raftpb"
|
||||||
"go.etcd.io/etcd/server/v3/storage/backend"
|
"go.etcd.io/etcd/server/v3/storage/backend"
|
||||||
"go.etcd.io/etcd/server/v3/storage/datadir"
|
"go.etcd.io/etcd/server/v3/storage/datadir"
|
||||||
@ -27,8 +27,7 @@ import (
|
|||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
const ENV_VERIFY = "ETCD_VERIFY"
|
const ENV_VERIFY_VALUE_STORAGE_WAL verify.VerificationType = "storage_wal"
|
||||||
const ENV_VERIFY_ALL_VALUE = "all"
|
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
// DataDir is a root directory where the data being verified are stored.
|
// DataDir is a root directory where the data being verified are stored.
|
||||||
@ -90,7 +89,7 @@ func Verify(cfg Config) error {
|
|||||||
// VerifyIfEnabled performs verification according to ETCD_VERIFY env settings.
|
// VerifyIfEnabled performs verification according to ETCD_VERIFY env settings.
|
||||||
// See Verify for more information.
|
// See Verify for more information.
|
||||||
func VerifyIfEnabled(cfg Config) error {
|
func VerifyIfEnabled(cfg Config) error {
|
||||||
if os.Getenv(ENV_VERIFY) == ENV_VERIFY_ALL_VALUE {
|
if verify.IsVerificationEnabled(ENV_VERIFY_VALUE_STORAGE_WAL) {
|
||||||
return Verify(cfg)
|
return Verify(cfg)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -21,9 +21,9 @@ import (
|
|||||||
|
|
||||||
grpc_logsettable "github.com/grpc-ecosystem/go-grpc-middleware/logging/settable"
|
grpc_logsettable "github.com/grpc-ecosystem/go-grpc-middleware/logging/settable"
|
||||||
"go.etcd.io/etcd/client/pkg/v3/testutil"
|
"go.etcd.io/etcd/client/pkg/v3/testutil"
|
||||||
|
"go.etcd.io/etcd/client/pkg/v3/verify"
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
"go.etcd.io/etcd/server/v3/embed"
|
"go.etcd.io/etcd/server/v3/embed"
|
||||||
"go.etcd.io/etcd/server/v3/verify"
|
|
||||||
"go.uber.org/zap/zapcore"
|
"go.uber.org/zap/zapcore"
|
||||||
"go.uber.org/zap/zapgrpc"
|
"go.uber.org/zap/zapgrpc"
|
||||||
"go.uber.org/zap/zaptest"
|
"go.uber.org/zap/zaptest"
|
||||||
@ -90,18 +90,20 @@ func BeforeTest(t testutil.TB, opts ...TestOption) {
|
|||||||
}
|
}
|
||||||
previousInsideTestContext := insideTestContext
|
previousInsideTestContext := insideTestContext
|
||||||
|
|
||||||
|
// Integration tests should verify written state as much as possible.
|
||||||
|
revertFunc := verify.EnableAllVerifications()
|
||||||
|
|
||||||
// Registering cleanup early, such it will get executed even if the helper fails.
|
// Registering cleanup early, such it will get executed even if the helper fails.
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
grpc_logger.Reset()
|
grpc_logger.Reset()
|
||||||
insideTestContext = previousInsideTestContext
|
insideTestContext = previousInsideTestContext
|
||||||
os.Chdir(previousWD)
|
os.Chdir(previousWD)
|
||||||
|
revertFunc()
|
||||||
})
|
})
|
||||||
|
|
||||||
grpc_logger.Set(zapgrpc.NewLogger(zaptest.NewLogger(t).Named("grpc")))
|
grpc_logger.Set(zapgrpc.NewLogger(zaptest.NewLogger(t).Named("grpc")))
|
||||||
insideTestContext = true
|
insideTestContext = true
|
||||||
|
|
||||||
// Integration tests should verify written state as much as possible.
|
|
||||||
os.Setenv(verify.ENV_VERIFY, verify.ENV_VERIFY_ALL_VALUE)
|
|
||||||
os.Chdir(t.TempDir())
|
os.Chdir(t.TempDir())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user