Merge pull request #14120 from serathius/compact-check

server: Implement compaction hash checking
Marek Siarkowicz 2022-07-26 15:36:31 +02:00 committed by GitHub
commit a3b410cac7
14 changed files with 673 additions and 158 deletions


@ -147,8 +147,10 @@ type ServerConfig struct {
// InitialCorruptCheck is true to check data corruption on boot
// before serving any peer/client traffic.
InitialCorruptCheck bool
CorruptCheckTime time.Duration
InitialCorruptCheck bool
CorruptCheckTime time.Duration
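// CompactHashCheckEnabled enables the leader to periodically check followers' compaction hashes;
// CompactHashCheckTime is the interval between those checks.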
CompactHashCheckEnabled bool
CompactHashCheckTime time.Duration
// PreVote is true to enable Raft Pre-Vote.
PreVote bool


@ -320,8 +320,11 @@ type Config struct {
// AuthTokenTTL in seconds of the simple token
AuthTokenTTL uint `json:"auth-token-ttl"`
ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"`
ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"`
ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"`
ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"`
ExperimentalCompactHashCheckEnabled bool `json:"experimental-compact-hash-check-enabled"`
ExperimentalCompactHashCheckTime time.Duration `json:"experimental-compact-hash-check-time"`
// ExperimentalEnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
// ExperimentalEnableLeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
@ -521,6 +524,9 @@ func NewConfig() *Config {
ExperimentalTxnModeWriteWithSharedBuffer: true,
ExperimentalMaxLearners: membership.DefaultMaxLearners,
ExperimentalCompactHashCheckEnabled: false,
ExperimentalCompactHashCheckTime: time.Minute,
V2Deprecation: config.V2_DEPR_DEFAULT,
DiscoveryCfg: v3discovery.DiscoveryConfig{
@ -759,6 +765,10 @@ func (cfg *Config) Validate() error {
return fmt.Errorf("setting experimental-enable-lease-checkpoint-persist requires experimental-enable-lease-checkpoint")
}
if cfg.ExperimentalCompactHashCheckTime <= 0 {
return fmt.Errorf("--experimental-compact-hash-check-time must be >0 (set to %v)", cfg.ExperimentalCompactHashCheckTime)
}
return nil
}
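
The same switches are exposed programmatically through embed.Config. A minimal sketch of an embedded server with the compact hash check enabled, assuming the fields added above (the data directory and the two-minute interval are illustrative; the default interval is time.Minute):

package main

import (
	"log"
	"time"

	"go.etcd.io/etcd/server/v3/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd" // illustrative data directory
	// Fields added by this change.
	cfg.ExperimentalCompactHashCheckEnabled = true
	cfg.ExperimentalCompactHashCheckTime = 2 * time.Minute

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()
	<-e.Server.ReadyNotify() // wait until the member is serving
}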


@ -202,6 +202,8 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
HostWhitelist: cfg.HostWhitelist,
InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
CompactHashCheckEnabled: cfg.ExperimentalCompactHashCheckEnabled,
CompactHashCheckTime: cfg.ExperimentalCompactHashCheckTime,
PreVote: cfg.PreVote,
Logger: cfg.logger,
ForceNewCluster: cfg.ForceNewCluster,
@ -252,7 +254,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
// newly started member ("memberInitialized==false")
// does not need corruption check
if memberInitialized && srvcfg.InitialCorruptCheck {
if err = etcdserver.NewCorruptionMonitor(e.cfg.logger, e.Server).InitialCheck(); err != nil {
if err = e.Server.CorruptionChecker().InitialCheck(); err != nil {
// set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()"
// (nothing to close since rafthttp transports have not been started)
@ -344,6 +346,8 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized
zap.Bool("pre-vote", sc.PreVote),
zap.Bool("initial-corrupt-check", sc.InitialCorruptCheck),
zap.String("corrupt-check-time-interval", sc.CorruptCheckTime.String()),
zap.Bool("compact-check-time-enabled", sc.CompactHashCheckEnabled),
zap.Duration("compact-check-time-interval", sc.CompactHashCheckTime),
zap.String("auto-compaction-mode", sc.AutoCompactionMode),
zap.Duration("auto-compaction-retention", sc.AutoCompactionRetention),
zap.String("auto-compaction-interval", sc.AutoCompactionRetention.String()),


@ -259,6 +259,8 @@ func newConfig() *config {
// experimental
fs.BoolVar(&cfg.ec.ExperimentalInitialCorruptCheck, "experimental-initial-corrupt-check", cfg.ec.ExperimentalInitialCorruptCheck, "Enable to check data corruption before serving any client/peer traffic.")
fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.")
fs.BoolVar(&cfg.ec.ExperimentalCompactHashCheckEnabled, "experimental-compact-hash-check-enabled", cfg.ec.ExperimentalCompactHashCheckEnabled, "Enable leader to periodically check followers' compaction hashes.")
fs.DurationVar(&cfg.ec.ExperimentalCompactHashCheckTime, "experimental-compact-hash-check-time", cfg.ec.ExperimentalCompactHashCheckTime, "Duration of time between leader checks of followers' compaction hashes.")
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.")
// TODO: delete in v3.7
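
With these flags registered, the check can be switched on at startup, for example: etcd --experimental-compact-hash-check-enabled --experimental-compact-hash-check-time=2m (the 2m interval is illustrative; the default is one minute).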


@ -21,7 +21,9 @@ import (
"fmt"
"io"
"net/http"
"sort"
"strings"
"sync"
"time"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
@ -32,10 +34,19 @@ import (
"go.uber.org/zap"
)
type corruptionMonitor struct {
type CorruptionChecker interface {
InitialCheck() error
PeriodicCheck() error
CompactHashCheck()
}
type corruptionChecker struct {
lg *zap.Logger
hasher Hasher
mux sync.RWMutex
latestRevisionChecked int64
}
type Hasher interface {
@ -47,10 +58,10 @@ type Hasher interface {
TriggerCorruptAlarm(uint64)
}
func NewCorruptionMonitor(lg *zap.Logger, s *EtcdServer) *corruptionMonitor {
return &corruptionMonitor{
func newCorruptionChecker(lg *zap.Logger, s *EtcdServer, storage mvcc.HashStorage) *corruptionChecker {
return &corruptionChecker{
lg: lg,
hasher: hasherAdapter{s, s.KV().HashStorage()},
hasher: hasherAdapter{s, storage},
}
}
@ -74,7 +85,7 @@ func (h hasherAdapter) TriggerCorruptAlarm(memberID uint64) {
// InitialCheck compares initial hash values with its peers
// before serving any peer/client traffic. A mismatch is reported only when
// hashes differ at the requested revision while the compact revisions are the same.
func (cm *corruptionMonitor) InitialCheck() error {
func (cm *corruptionChecker) InitialCheck() error {
cm.lg.Info(
"starting initial corruption check",
@ -153,7 +164,7 @@ func (cm *corruptionMonitor) InitialCheck() error {
return nil
}
func (cm *corruptionMonitor) periodicCheck() error {
func (cm *corruptionChecker) PeriodicCheck() error {
h, rev, err := cm.hasher.HashByRev(0)
if err != nil {
return err
@ -241,6 +252,84 @@ func (cm *corruptionMonitor) periodicCheck() error {
return nil
}
func (cm *corruptionChecker) CompactHashCheck() {
cm.lg.Info("starting compact hash check",
zap.String("local-member-id", cm.hasher.MemberId().String()),
zap.Duration("timeout", cm.hasher.ReqTimeout()),
)
hashes := cm.uncheckedRevisions()
// Assume that revisions are ordered from largest to smallest
for i, hash := range hashes {
peers := cm.hasher.PeerHashByRev(hash.Revision)
if len(peers) == 0 {
continue
}
peersChecked := 0
for _, p := range peers {
if p.resp == nil || p.resp.CompactRevision != hash.CompactRevision {
continue
}
// The follower reported the same compact revision as the leader, so the hashes must match
if p.resp.Hash != hash.Hash {
cm.hasher.TriggerCorruptAlarm(uint64(p.id))
cm.lg.Error("failed compaction hash check",
zap.Int64("revision", hash.Revision),
zap.Int64("leader-compact-revision", hash.CompactRevision),
zap.Uint32("leader-hash", hash.Hash),
zap.Int64("follower-compact-revision", p.resp.CompactRevision),
zap.Uint32("follower-hash", p.resp.Hash),
zap.String("follower-peer-id", p.id.String()),
)
return
}
peersChecked++
cm.lg.Info("successfully checked hash on follower",
zap.Int64("revision", hash.Revision),
zap.String("peer-id", p.id.String()),
)
}
if len(peers) == peersChecked {
cm.lg.Info("successfully checked hash on whole cluster",
zap.Int("number-of-peers-checked", peersChecked),
zap.Int64("revision", hash.Revision),
)
cm.mux.Lock()
if hash.Revision > cm.latestRevisionChecked {
cm.latestRevisionChecked = hash.Revision
}
cm.mux.Unlock()
cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", i+1))
return
}
cm.lg.Warn("skipped revision in compaction hash check; was not able to check all peers",
zap.Int("number-of-peers-checked", peersChecked),
zap.Int("number-of-peers", len(peers)),
zap.Int64("revision", hash.Revision),
)
}
cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", len(hashes)))
return
}
func (cm *corruptionChecker) uncheckedRevisions() []mvcc.KeyValueHash {
cm.mux.RLock()
lastRevisionChecked := cm.latestRevisionChecked
cm.mux.RUnlock()
hashes := cm.hasher.Hashes()
// Sort in descending order
sort.Slice(hashes, func(i, j int) bool {
return hashes[i].Revision > hashes[j].Revision
})
for i, hash := range hashes {
if hash.Revision <= lastRevisionChecked {
return hashes[:i]
}
}
return hashes
}
func (s *EtcdServer) triggerCorruptAlarm(id uint64) {
a := &pb.AlarmRequest{
MemberID: id,


@ -88,7 +88,7 @@ func TestInitialCheck(t *testing.T) {
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
monitor := corruptionMonitor{
monitor := corruptionChecker{
lg: zaptest.NewLogger(t),
hasher: &tc.hasher,
}
@ -205,11 +205,11 @@ func TestPeriodicCheck(t *testing.T) {
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
monitor := corruptionMonitor{
monitor := corruptionChecker{
lg: zaptest.NewLogger(t),
hasher: &tc.hasher,
}
err := monitor.periodicCheck()
err := monitor.PeriodicCheck()
if gotError := err != nil; gotError != tc.expectError {
t.Errorf("Unexpected error, got: %v, expected?: %v", err, tc.expectError)
}
@ -221,11 +221,101 @@ func TestPeriodicCheck(t *testing.T) {
}
}
func TestCompactHashCheck(t *testing.T) {
tcs := []struct {
name string
hasher fakeHasher
lastRevisionChecked int64
expectError bool
expectCorrupt bool
expectActions []string
expectLastRevisionChecked int64
}{
{
name: "No hashes",
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()"},
},
{
name: "No peers, check new checked from largest to smallest",
hasher: fakeHasher{
hashes: []mvcc.KeyValueHash{{Revision: 1}, {Revision: 2}, {Revision: 3}, {Revision: 4}},
},
lastRevisionChecked: 2,
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(4)", "PeerHashByRev(3)"},
expectLastRevisionChecked: 2,
},
{
name: "Peer error",
hasher: fakeHasher{
hashes: []mvcc.KeyValueHash{{Revision: 1}, {Revision: 2}},
peerHashes: []*peerHashKVResp{{err: fmt.Errorf("failed getting hash")}},
},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "PeerHashByRev(1)"},
},
{
name: "Peer returned different compaction revision is skipped",
hasher: fakeHasher{
hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1}, {Revision: 2, CompactRevision: 2}},
peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{CompactRevision: 3}}},
},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "PeerHashByRev(1)"},
},
{
name: "Peer returned same compaction revision but different hash triggers alarm",
hasher: fakeHasher{
hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}},
peerHashes: []*peerHashKVResp{{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}}},
},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "TriggerCorruptAlarm(42)"},
expectCorrupt: true,
},
{
name: "Peer returned same hash bumps last revision checked",
hasher: fakeHasher{
hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 1}},
peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{MemberId: 42}, CompactRevision: 1, Hash: 1}}},
},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)"},
expectLastRevisionChecked: 2,
},
{
name: "Only one peer succeeded check",
hasher: fakeHasher{
hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}},
peerHashes: []*peerHashKVResp{
{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{MemberId: 42}, CompactRevision: 1, Hash: 1}},
{err: fmt.Errorf("failed getting hash")},
},
},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)"},
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
monitor := corruptionChecker{
latestRevisionChecked: tc.lastRevisionChecked,
lg: zaptest.NewLogger(t),
hasher: &tc.hasher,
}
monitor.CompactHashCheck()
if tc.hasher.alarmTriggered != tc.expectCorrupt {
t.Errorf("Unexpected corrupt triggered, got: %v, expected?: %v", tc.hasher.alarmTriggered, tc.expectCorrupt)
}
if tc.expectLastRevisionChecked != monitor.latestRevisionChecked {
t.Errorf("Unexpected last revision checked, got: %v, expected?: %v", monitor.latestRevisionChecked, tc.expectLastRevisionChecked)
}
assert.Equal(t, tc.expectActions, tc.hasher.actions)
})
}
}
type fakeHasher struct {
peerHashes []*peerHashKVResp
hashByRevIndex int
hashByRevResponses []hashByRev
linearizableReadNotify error
hashes []mvcc.KeyValueHash
alarmTriggered bool
actions []string
@ -251,8 +341,14 @@ func (f *fakeHasher) HashByRev(rev int64) (hash mvcc.KeyValueHash, revision int6
return hashByRev.hash, hashByRev.revision, hashByRev.err
}
func (f *fakeHasher) Store(valueHash mvcc.KeyValueHash) {
panic("not implemented")
func (f *fakeHasher) Store(hash mvcc.KeyValueHash) {
f.actions = append(f.actions, fmt.Sprintf("Store(%v)", hash))
f.hashes = append(f.hashes, hash)
}
func (f *fakeHasher) Hashes() []mvcc.KeyValueHash {
f.actions = append(f.actions, "Hashes()")
return f.hashes
}
func (f *fakeHasher) ReqTimeout() time.Duration {


@ -295,7 +295,8 @@ type EtcdServer struct {
*AccessController
// forceSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
// Should only be set within apply code path. Used to force snapshot after cluster version downgrade.
forceSnapshot bool
forceSnapshot bool
corruptionChecker CorruptionChecker
}
// NewServer creates a new EtcdServer from the supplied configuration. The
@ -371,6 +372,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
CompactionSleepInterval: cfg.CompactionSleepInterval,
}
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
srv.corruptionChecker = newCorruptionChecker(cfg.Logger, srv, srv.kv.HashStorage())
srv.authStore = auth.NewAuthStore(srv.Logger(), schema.NewAuthBackend(srv.Logger(), srv.be), tp, int(cfg.BcryptCost))
@ -530,6 +532,7 @@ func (s *EtcdServer) Start() {
s.GoAttach(s.monitorStorageVersion)
s.GoAttach(s.linearizableReadLoop)
s.GoAttach(s.monitorKVHash)
s.GoAttach(s.monitorCompactHash)
s.GoAttach(s.monitorDowngrade)
}
@ -2199,7 +2202,6 @@ func (s *EtcdServer) monitorKVHash() {
zap.String("local-member-id", s.MemberId().String()),
zap.Duration("interval", t),
)
monitor := NewCorruptionMonitor(lg, s)
for {
select {
case <-s.stopping:
@ -2209,12 +2211,30 @@ func (s *EtcdServer) monitorKVHash() {
if !s.isLeader() {
continue
}
if err := monitor.periodicCheck(); err != nil {
if err := s.corruptionChecker.PeriodicCheck(); err != nil {
lg.Warn("failed to check hash KV", zap.Error(err))
}
}
}
func (s *EtcdServer) monitorCompactHash() {
if !s.Cfg.CompactHashCheckEnabled {
return
}
t := s.Cfg.CompactHashCheckTime
for {
select {
case <-time.After(t):
case <-s.stopping:
return
}
if !s.isLeader() {
continue
}
s.corruptionChecker.CompactHashCheck()
}
}
func (s *EtcdServer) updateClusterVersionV2(ver string) {
lg := s.Logger()
@ -2416,3 +2436,7 @@ func (s *EtcdServer) getTxPostLockInsideApplyHook() func() {
}
}
}
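// CorruptionChecker exposes the server's corruption checker so that embed and
// tests can trigger InitialCheck, PeriodicCheck and CompactHashCheck directly.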
func (s *EtcdServer) CorruptionChecker() CorruptionChecker {
return s.corruptionChecker
}


@ -93,6 +93,9 @@ type HashStorage interface {
// Store adds a hash value to the local cache, allowing it to be returned by HashByRev.
Store(valueHash KeyValueHash)
// Hashes returns a list of up to `hashStorageMaxSize` newest previously stored hashes.
Hashes() []KeyValueHash
}
type hashStorage struct {
@ -146,3 +149,14 @@ func (s *hashStorage) Store(hash KeyValueHash) {
s.hashes = s.hashes[len(s.hashes)-hashStorageMaxSize:]
}
}
func (s *hashStorage) Hashes() []KeyValueHash {
s.hashMu.RLock()
// Copy the hashes under the read lock so callers get a stable snapshot
hashes := make([]KeyValueHash, 0, len(s.hashes))
for _, hash := range s.hashes {
hashes = append(hashes, hash)
}
s.hashMu.RUnlock()
return hashes
}


@ -16,10 +16,14 @@ package testutil
import (
"context"
"errors"
"fmt"
"os"
"testing"
"github.com/stretchr/testify/assert"
"go.etcd.io/bbolt"
"go.etcd.io/etcd/api/v3/mvccpb"
)
const (
@ -103,3 +107,40 @@ func PickKey(i int64) string {
panic("Can't count")
}
}
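// CorruptBBolt increments the first byte of every key and value stored in the
// "key" bucket of the given bbolt file, so the member's data no longer hashes
// the same as its peers'.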
func CorruptBBolt(fpath string) error {
db, derr := bbolt.Open(fpath, os.ModePerm, &bbolt.Options{})
if derr != nil {
return derr
}
defer db.Close()
return db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte("key"))
if b == nil {
return errors.New("got nil bucket for 'key'")
}
keys, vals := [][]byte{}, [][]byte{}
c := b.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
keys = append(keys, k)
var kv mvccpb.KeyValue
if uerr := kv.Unmarshal(v); uerr != nil {
return uerr
}
kv.Key[0]++
kv.Value[0]++
v2, v2err := kv.Marshal()
if v2err != nil {
return v2err
}
vals = append(vals, v2)
}
for i := range keys {
if perr := b.Put(keys[i], vals[i]); perr != nil {
return perr
}
}
return nil
})
}

tests/e2e/corrupt_test.go (new file, 182 lines)

@ -0,0 +1,182 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package e2e
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/server/v3/storage/mvcc/testutil"
"go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)
func TestEtcdCorruptHash(t *testing.T) {
// oldenv := os.Getenv("EXPECT_DEBUG")
// defer os.Setenv("EXPECT_DEBUG", oldenv)
// os.Setenv("EXPECT_DEBUG", "1")
cfg := e2e.NewConfigNoTLS()
// trigger a snapshot so that the restarted member can load peers from disk
cfg.SnapshotCount = 3
testCtl(t, corruptTest, withQuorum(),
withCfg(*cfg),
withInitialCorruptCheck(),
withCorruptFunc(testutil.CorruptBBolt),
)
}
func corruptTest(cx ctlCtx) {
cx.t.Log("putting 10 keys...")
for i := 0; i < 10; i++ {
if err := ctlV3Put(cx, fmt.Sprintf("foo%05d", i), fmt.Sprintf("v%05d", i), ""); err != nil {
if cx.dialTimeout > 0 && !isGRPCTimedout(err) {
cx.t.Fatalf("putTest ctlV3Put error (%v)", err)
}
}
}
// enough time for all nodes to sync on the same data
cx.t.Log("sleeping 3sec to let nodes sync...")
time.Sleep(3 * time.Second)
cx.t.Log("connecting clientv3...")
eps := cx.epc.EndpointsV3()
cli1, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[1]}, DialTimeout: 3 * time.Second})
if err != nil {
cx.t.Fatal(err)
}
defer cli1.Close()
sresp, err := cli1.Status(context.TODO(), eps[0])
cx.t.Logf("checked status sresp:%v err:%v", sresp, err)
if err != nil {
cx.t.Fatal(err)
}
id0 := sresp.Header.GetMemberId()
cx.t.Log("stopping etcd[0]...")
cx.epc.Procs[0].Stop()
// corrupting first member by modifying backend offline.
fp := datadir.ToBackendFileName(cx.epc.Procs[0].Config().DataDirPath)
cx.t.Logf("corrupting backend: %v", fp)
if err = cx.corruptFunc(fp); err != nil {
cx.t.Fatal(err)
}
cx.t.Log("restarting etcd[0]")
ep := cx.epc.Procs[0]
proc, err := e2e.SpawnCmd(append([]string{ep.Config().ExecPath}, ep.Config().Args...), cx.envMap)
if err != nil {
cx.t.Fatal(err)
}
defer proc.Stop()
cx.t.Log("waiting for etcd[0] failure...")
// restarting corrupted member should fail
e2e.WaitReadyExpectProc(proc, []string{fmt.Sprintf("etcdmain: %016x found data inconsistency with peers", id0)})
}
func TestPeriodicCheckDetectsCorruption(t *testing.T) {
checkTime := time.Second
e2e.BeforeTest(t)
epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{
ClusterSize: 3,
KeepDataDir: true,
CorruptCheckTime: time.Second,
})
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
t.Cleanup(func() {
if errC := epc.Close(); errC != nil {
t.Fatalf("error closing etcd processes (%v)", errC)
}
})
cc := e2e.NewEtcdctl(epc.Cfg, epc.EndpointsV3())
for i := 0; i < 10; i++ {
err := cc.Put(testutil.PickKey(int64(i)), fmt.Sprint(i), config.PutOptions{})
assert.NoError(t, err, "error on put")
}
epc.Procs[0].Stop()
err = testutil.CorruptBBolt(datadir.ToBackendFileName(epc.Procs[0].Config().DataDirPath))
assert.NoError(t, err)
err = epc.Procs[0].Restart()
assert.NoError(t, err)
time.Sleep(checkTime * 11 / 10)
alarmResponse, err := cc.AlarmList()
assert.NoError(t, err, "error on alarm list")
// TODO: Investigate why MemberID is 0?
assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: 0}}, alarmResponse.Alarms)
}
func TestCompactHashCheckDetectCorruption(t *testing.T) {
checkTime := time.Second
e2e.BeforeTest(t)
epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{
ClusterSize: 3,
KeepDataDir: true,
CompactHashCheckEnabled: true,
CompactHashCheckTime: checkTime,
})
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
t.Cleanup(func() {
if errC := epc.Close(); errC != nil {
t.Fatalf("error closing etcd processes (%v)", errC)
}
})
cc := e2e.NewEtcdctl(epc.Cfg, epc.EndpointsV3())
for i := 0; i < 10; i++ {
err := cc.Put(testutil.PickKey(int64(i)), fmt.Sprint(i), config.PutOptions{})
assert.NoError(t, err, "error on put")
}
members, err := cc.MemberList()
assert.NoError(t, err, "error on member list")
var memberID uint64
for _, m := range members.Members {
if m.Name == epc.Procs[0].Config().Name {
memberID = m.ID
}
}
epc.Procs[0].Stop()
err = testutil.CorruptBBolt(datadir.ToBackendFileName(epc.Procs[0].Config().DataDirPath))
assert.NoError(t, err)
err = epc.Procs[0].Restart()
assert.NoError(t, err)
_, err = cc.Compact(5, config.CompactOption{})
assert.NoError(t, err)
time.Sleep(checkTime * 11 / 10)
alarmResponse, err := cc.AlarmList()
assert.NoError(t, err, "error on alarm list")
assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: memberID}}, alarmResponse.Alarms)
}


@ -1,137 +0,0 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package e2e
import (
"context"
"errors"
"fmt"
"os"
"testing"
"time"
bolt "go.etcd.io/bbolt"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)
// TODO: test with embedded etcd in integration package
func TestEtcdCorruptHash(t *testing.T) {
// oldenv := os.Getenv("EXPECT_DEBUG")
// defer os.Setenv("EXPECT_DEBUG", oldenv)
// os.Setenv("EXPECT_DEBUG", "1")
cfg := e2e.NewConfigNoTLS()
// trigger snapshot so that restart member can load peers from disk
cfg.SnapshotCount = 3
testCtl(t, corruptTest, withQuorum(),
withCfg(*cfg),
withInitialCorruptCheck(),
withCorruptFunc(corruptHash),
)
}
func corruptTest(cx ctlCtx) {
cx.t.Log("putting 10 keys...")
for i := 0; i < 10; i++ {
if err := ctlV3Put(cx, fmt.Sprintf("foo%05d", i), fmt.Sprintf("v%05d", i), ""); err != nil {
if cx.dialTimeout > 0 && !isGRPCTimedout(err) {
cx.t.Fatalf("putTest ctlV3Put error (%v)", err)
}
}
}
// enough time for all nodes sync on the same data
cx.t.Log("sleeping 3sec to let nodes sync...")
time.Sleep(3 * time.Second)
cx.t.Log("connecting clientv3...")
eps := cx.epc.EndpointsV3()
cli1, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[1]}, DialTimeout: 3 * time.Second})
if err != nil {
cx.t.Fatal(err)
}
defer cli1.Close()
sresp, err := cli1.Status(context.TODO(), eps[0])
cx.t.Logf("checked status sresp:%v err:%v", sresp, err)
if err != nil {
cx.t.Fatal(err)
}
id0 := sresp.Header.GetMemberId()
cx.t.Log("stopping etcd[0]...")
cx.epc.Procs[0].Stop()
// corrupting first member by modifying backend offline.
fp := datadir.ToBackendFileName(cx.epc.Procs[0].Config().DataDirPath)
cx.t.Logf("corrupting backend: %v", fp)
if err = cx.corruptFunc(fp); err != nil {
cx.t.Fatal(err)
}
cx.t.Log("restarting etcd[0]")
ep := cx.epc.Procs[0]
proc, err := e2e.SpawnCmd(append([]string{ep.Config().ExecPath}, ep.Config().Args...), cx.envMap)
if err != nil {
cx.t.Fatal(err)
}
defer proc.Stop()
cx.t.Log("waiting for etcd[0] failure...")
// restarting corrupted member should fail
e2e.WaitReadyExpectProc(proc, []string{fmt.Sprintf("etcdmain: %016x found data inconsistency with peers", id0)})
}
func corruptHash(fpath string) error {
db, derr := bolt.Open(fpath, os.ModePerm, &bolt.Options{})
if derr != nil {
return derr
}
defer db.Close()
return db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("key"))
if b == nil {
return errors.New("got nil bucket for 'key'")
}
keys, vals := [][]byte{}, [][]byte{}
c := b.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
keys = append(keys, k)
var kv mvccpb.KeyValue
if uerr := kv.Unmarshal(v); uerr != nil {
return uerr
}
kv.Key[0]++
kv.Value[0]++
v2, v2err := kv.Marshal()
if v2err != nil {
return v2err
}
vals = append(vals, v2)
}
for i := range keys {
if perr := b.Put(keys[i], vals[i]); perr != nil {
return perr
}
}
return nil
})
}


@ -177,7 +177,10 @@ type EtcdProcessClusterConfig struct {
DiscoveryToken string
LogLevel string
MaxConcurrentStreams uint32 // default is math.MaxUint32
MaxConcurrentStreams uint32 // default is math.MaxUint32
CorruptCheckTime time.Duration
CompactHashCheckEnabled bool
CompactHashCheckTime time.Duration
}
// NewEtcdProcessCluster launches a new cluster from etcd processes, returning
@ -347,6 +350,16 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []*
args = append(args, "--max-concurrent-streams", fmt.Sprintf("%d", cfg.MaxConcurrentStreams))
}
if cfg.CorruptCheckTime != 0 {
args = append(args, "--experimental-corrupt-check-time", fmt.Sprintf("%s", cfg.CorruptCheckTime))
}
if cfg.CompactHashCheckEnabled {
args = append(args, "--experimental-compact-hash-check-enabled")
}
if cfg.CompactHashCheckTime != 0 {
args = append(args, "--experimental-compact-hash-check-time", cfg.CompactHashCheckTime.String())
}
etcdCfgs[i] = &EtcdServerProcessConfig{
lg: lg,
ExecPath: cfg.ExecPath,


@ -28,7 +28,6 @@ require (
github.com/spf13/cobra v1.4.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.2
go.etcd.io/bbolt v1.3.6
go.etcd.io/etcd/api/v3 v3.6.0-alpha.0
go.etcd.io/etcd/client/pkg/v3 v3.6.0-alpha.0
go.etcd.io/etcd/client/v2 v2.306.0-alpha.0
@ -80,6 +79,7 @@ require (
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
go.etcd.io/bbolt v1.3.6 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0 // indirect
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.7.0 // indirect


@ -0,0 +1,175 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package integration
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/api/v3/etcdserverpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/storage/mvcc/testutil"
"go.etcd.io/etcd/tests/v3/framework/integration"
)
func TestPeriodicCheck(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
cc, err := clus.ClusterClient()
require.NoError(t, err)
ctx := context.Background()
var totalRevisions int64 = 1210
var rev int64
for ; rev < totalRevisions; rev += testutil.CompactionCycle {
testPeriodicCheck(ctx, t, cc, clus, rev, rev+testutil.CompactionCycle)
}
testPeriodicCheck(ctx, t, cc, clus, rev, rev+totalRevisions)
alarmResponse, err := cc.AlarmList(ctx)
assert.NoError(t, err, "error on alarm list")
assert.Equal(t, []*etcdserverpb.AlarmMember(nil), alarmResponse.Alarms)
}
func testPeriodicCheck(ctx context.Context, t *testing.T, cc *clientv3.Client, clus *integration.Cluster, start, stop int64) {
for i := start; i <= stop; i++ {
if i%67 == 0 {
_, err := cc.Delete(ctx, testutil.PickKey(i+83))
assert.NoError(t, err, "error on delete")
} else {
_, err := cc.Put(ctx, testutil.PickKey(i), fmt.Sprint(i))
assert.NoError(t, err, "error on put")
}
}
err := clus.Members[0].Server.CorruptionChecker().PeriodicCheck()
assert.NoError(t, err, "error on periodic check (rev %v)", stop)
}
func TestPeriodicCheckDetectsCorruption(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
cc, err := clus.ClusterClient()
require.NoError(t, err)
ctx := context.Background()
for i := 0; i < 10; i++ {
_, err := cc.Put(ctx, testutil.PickKey(int64(i)), fmt.Sprint(i))
assert.NoError(t, err, "error on put")
}
err = clus.Members[0].Server.CorruptionChecker().PeriodicCheck()
assert.NoError(t, err, "error on periodic check")
clus.Members[0].Stop(t)
clus.WaitLeader(t)
err = testutil.CorruptBBolt(clus.Members[0].BackendPath())
assert.NoError(t, err)
err = clus.Members[0].Restart(t)
assert.NoError(t, err)
time.Sleep(50 * time.Millisecond)
leader := clus.WaitLeader(t)
err = clus.Members[leader].Server.CorruptionChecker().PeriodicCheck()
assert.NoError(t, err, "error on periodic check")
time.Sleep(50 * time.Millisecond)
alarmResponse, err := cc.AlarmList(ctx)
assert.NoError(t, err, "error on alarm list")
// TODO: Investigate why MemberID is 0?
assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: 0}}, alarmResponse.Alarms)
}
func TestCompactHashCheck(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
cc, err := clus.ClusterClient()
require.NoError(t, err)
ctx := context.Background()
var totalRevisions int64 = 1210
var rev int64
for ; rev < totalRevisions; rev += testutil.CompactionCycle {
testCompactionHash(ctx, t, cc, clus, rev, rev+testutil.CompactionCycle)
}
testCompactionHash(ctx, t, cc, clus, rev, rev+totalRevisions)
}
func testCompactionHash(ctx context.Context, t *testing.T, cc *clientv3.Client, clus *integration.Cluster, start, stop int64) {
for i := start; i <= stop; i++ {
if i%67 == 0 {
_, err := cc.Delete(ctx, testutil.PickKey(i+83))
assert.NoError(t, err, "error on delete")
} else {
_, err := cc.Put(ctx, testutil.PickKey(i), fmt.Sprint(i))
assert.NoError(t, err, "error on put")
}
}
_, err := cc.Compact(ctx, stop)
assert.NoError(t, err, "error on compact (rev %v)", stop)
// Wait for the compaction to be applied before checking hashes
time.Sleep(50 * time.Millisecond)
clus.Members[0].Server.CorruptionChecker().CompactHashCheck()
}
func TestCompactHashCheckDetectCorruption(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
cc, err := clus.ClusterClient()
require.NoError(t, err)
ctx := context.Background()
for i := 0; i < 10; i++ {
_, err := cc.Put(ctx, testutil.PickKey(int64(i)), fmt.Sprint(i))
assert.NoError(t, err, "error on put")
}
clus.Members[0].Server.CorruptionChecker().CompactHashCheck()
clus.Members[0].Stop(t)
clus.WaitLeader(t)
err = testutil.CorruptBBolt(clus.Members[0].BackendPath())
assert.NoError(t, err)
err = clus.Members[0].Restart(t)
assert.NoError(t, err)
_, err = cc.Compact(ctx, 5)
assert.NoError(t, err)
time.Sleep(50 * time.Millisecond)
leader := clus.WaitLeader(t)
clus.Members[leader].Server.CorruptionChecker().CompactHashCheck()
time.Sleep(50 * time.Millisecond)
alarmResponse, err := cc.AlarmList(ctx)
assert.NoError(t, err, "error on alarm list")
assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: uint64(clus.Members[0].ID())}}, alarmResponse.Alarms)
}