verify consistent_index in snapshot must be equal to the snapshot index

Usually the consistent_index should be greater than the index of the
latest snapshot with suffix .snap. But for the snapshot coming from the
leader, the consistent_index should be equal to the snapshot index.
This commit is contained in:
ahrtr 2022-04-26 00:49:48 +08:00
parent 1597219ca7
commit fb2eeb9027
4 changed files with 38 additions and 3 deletions

View File

@ -37,20 +37,32 @@ func IsVerificationEnabled(verification VerificationType) bool {
return env == string(ENV_VERIFY_VALUE_ALL) || env == strings.ToLower(string(verification)) return env == string(ENV_VERIFY_VALUE_ALL) || env == strings.ToLower(string(verification))
} }
// EnableVerifications returns a function that can be used to bring the original settings. // EnableVerifications sets `ENV_VERIFY` and returns a function that
// can be used to bring the original settings.
func EnableVerifications(verification VerificationType) func() { func EnableVerifications(verification VerificationType) func() {
previousEnv := getEnvVerify() previousEnv := getEnvVerify()
os.Setenv(ENV_VERIFY, string(verification)) os.Setenv(ENV_VERIFY, string(verification))
return func() { return func() {
os.Setenv(ENV_VERIFY, string(previousEnv)) os.Setenv(ENV_VERIFY, previousEnv)
} }
} }
// EnableAllVerifications returns a function that can be used to bring the original settings. // EnableAllVerifications enables verification and returns a function
// that can be used to bring the original settings.
func EnableAllVerifications() func() { func EnableAllVerifications() func() {
return EnableVerifications(ENV_VERIFY_VALUE_ALL) return EnableVerifications(ENV_VERIFY_VALUE_ALL)
} }
// DisableVerifications unsets `ENV_VERIFY` and returns a function that
// can be used to bring the original settings.
func DisableVerifications() func() {
previousEnv := getEnvVerify()
os.Unsetenv(ENV_VERIFY)
return func() {
os.Setenv(ENV_VERIFY, previousEnv)
}
}
// Verify performs verification if the assertions are enabled. // Verify performs verification if the assertions are enabled.
// In the default setup running in tests and skipped in the production code. // In the default setup running in tests and skipped in the production code.
func Verify(f func()) { func Verify(f func()) {

View File

@ -41,6 +41,7 @@ import (
"go.etcd.io/etcd/api/v3/version" "go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/client/pkg/v3/fileutil" "go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/client/pkg/v3/verify"
"go.etcd.io/etcd/pkg/v3/idutil" "go.etcd.io/etcd/pkg/v3/idutil"
"go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/pkg/v3/runtime" "go.etcd.io/etcd/pkg/v3/runtime"
@ -971,6 +972,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
// Eventually the new consistent_index value coming from snapshot is overwritten // Eventually the new consistent_index value coming from snapshot is overwritten
// by the old value. // by the old value.
s.consistIndex.SetBackend(newbe) s.consistIndex.SetBackend(newbe)
verifySnapshotIndex(apply.snapshot, s.consistIndex.ConsistentIndex())
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases. // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers. // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
@ -1067,6 +1069,14 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
ep.confState = apply.snapshot.Metadata.ConfState ep.confState = apply.snapshot.Metadata.ConfState
} }
func verifySnapshotIndex(snapshot raftpb.Snapshot, cindex uint64) {
verify.Verify(func() {
if cindex != snapshot.Metadata.Index {
panic(fmt.Sprintf("consistent_index(%d) isn't equal to snapshot index (%d)", cindex, snapshot.Metadata.Index))
}
})
}
func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) { func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
if len(apply.entries) == 0 { if len(apply.entries) == 0 {
return return

View File

@ -34,6 +34,7 @@ import (
"go.etcd.io/etcd/client/pkg/v3/fileutil" "go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/client/pkg/v3/testutil" "go.etcd.io/etcd/client/pkg/v3/testutil"
"go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/client/pkg/v3/verify"
"go.etcd.io/etcd/pkg/v3/idutil" "go.etcd.io/etcd/pkg/v3/idutil"
"go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/pkg/v3/wait" "go.etcd.io/etcd/pkg/v3/wait"
@ -1077,6 +1078,11 @@ func TestSnapshot(t *testing.T) {
// TestSnapshotOrdering ensures raft persists snapshot onto disk before // TestSnapshotOrdering ensures raft persists snapshot onto disk before
// snapshot db is applied. // snapshot db is applied.
func TestSnapshotOrdering(t *testing.T) { func TestSnapshotOrdering(t *testing.T) {
// Ignore the snapshot index verification in unit test, because
// it doesn't follow the e2e applying logic.
revertFunc := verify.DisableVerifications()
defer revertFunc()
lg := zaptest.NewLogger(t) lg := zaptest.NewLogger(t)
n := newNopReadyNode() n := newNopReadyNode()
st := v2store.New() st := v2store.New()
@ -1229,6 +1235,11 @@ func TestTriggerSnap(t *testing.T) {
// TestConcurrentApplyAndSnapshotV3 will send out snapshots concurrently with // TestConcurrentApplyAndSnapshotV3 will send out snapshots concurrently with
// proposals. // proposals.
func TestConcurrentApplyAndSnapshotV3(t *testing.T) { func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
// Ignore the snapshot index verification in unit test, because
// it doesn't follow the e2e applying logic.
revertFunc := verify.DisableVerifications()
defer revertFunc()
lg := zaptest.NewLogger(t) lg := zaptest.NewLogger(t)
n := newNopReadyNode() n := newNopReadyNode()
st := v2store.New() st := v2store.New()

View File

@ -55,6 +55,8 @@ func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi
} }
m.Snapshot = snapshot m.Snapshot = snapshot
verifySnapshotIndex(snapshot, s.consistIndex.ConsistentIndex())
return *snap.NewMessage(m, rc, dbsnap.Size()) return *snap.NewMessage(m, rc, dbsnap.Size())
} }