etcdserver: add e2e test to reproduce the incorrect hash issue when resuming scheduled compaction.

Check ScheduledCompactKeyName and FinishedCompactKeyName
before writing the hash to the hashstore. If they do not match, this compaction was interrupted at some point and its hash value is invalid. In such cases, skip writing the hash value to the hashstore, which avoids raising a false corruption alarm.

Signed-off-by: caojiamingalan <alan.c.19971111@gmail.com>
Author: caojiamingalan
Date:   2023-05-29 15:47:05 -05:00
Parent: bd5d0a5bbb
Commit: b9e30bf878
5 changed files with 141 additions and 6 deletions
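
For context, the guard added by this commit can be sketched as follows. This is a simplified illustration assembled from the changes below, not a drop-in snippet; s, tx, and hash stand for the mvcc store, a backend read transaction, and the hash computed by the scheduled compaction:

// Before scheduling a new compaction, remember whether the previous one finished:
// the scheduled and finished compact markers in the meta bucket must agree.
scheduledCompact, scheduledFound := UnsafeReadScheduledCompact(tx)
finishedCompact, finishedFound := UnsafeReadFinishedCompact(tx)
prevCompactionCompleted := scheduledCompact == finishedCompact && scheduledFound == finishedFound

// After the compaction job has computed its hash:
if prevCompactionCompleted {
	s.hashes.Store(hash) // the hash covers every revision since the last compaction
} else {
	// The previous compaction was interrupted, so this run only hashed the revisions
	// it had left to do; feeding that hash to the corruption checker could raise a
	// false CORRUPT alarm, so it is dropped.
	s.lg.Info("previous compaction was interrupted, skip storing compaction hash value")
}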


@ -268,12 +268,13 @@ func (cm *corruptionChecker) CompactHashCheck() {
)
hashes := cm.uncheckedRevisions()
// Assume that revisions are ordered from largest to smallest
for _, hash := range hashes {
for i, hash := range hashes {
peers := cm.hasher.PeerHashByRev(hash.Revision)
if len(peers) == 0 {
continue
}
if cm.checkPeerHashes(hash, peers) {
cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", i+1))
return
}
}


@ -225,7 +225,17 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) {
return nil, compactMainRev, nil
}
func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-chan struct{}, error) {
// checkPrevCompactionCompleted checks whether the previous scheduled compaction is completed.
func (s *store) checkPrevCompactionCompleted() bool {
tx := s.b.ReadTx()
tx.Lock()
defer tx.Unlock()
scheduledCompact, scheduledCompactFound := UnsafeReadScheduledCompact(tx)
finishedCompact, finishedCompactFound := UnsafeReadFinishedCompact(tx)
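// The previous compaction completed if both markers agree: either neither key exists yet
// (no compaction has ever been scheduled) or both exist and record the same revision.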
return scheduledCompact == finishedCompact && scheduledCompactFound == finishedCompactFound
}
func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevCompactionCompleted bool) (<-chan struct{}, error) {
ch := make(chan struct{})
j := schedule.NewJob("kvstore_compact", func(ctx context.Context) {
if ctx.Err() != nil {
@ -238,7 +248,13 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch
s.compactBarrier(context.TODO(), ch)
return
}
s.hashes.Store(hash)
// Only store the hash value if the previous compaction completed, i.e. this compaction
// hashes every revision since the last compaction. For more details, see #15919.
if prevCompactionCompleted {
s.hashes.Store(hash)
} else {
s.lg.Info("previous compaction was interrupted, skip storing compaction hash value")
}
close(ch)
})
@ -248,17 +264,18 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch
}
func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
prevCompactionCompleted := s.checkPrevCompactionCompleted()
ch, prevCompactRev, err := s.updateCompactRev(rev)
if err != nil {
return ch, err
}
return s.compact(traceutil.TODO(), rev, prevCompactRev)
return s.compact(traceutil.TODO(), rev, prevCompactRev, prevCompactionCompleted)
}
func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
s.mu.Lock()
prevCompactionCompleted := s.checkPrevCompactionCompleted()
ch, prevCompactRev, err := s.updateCompactRev(rev)
trace.Step("check and update compact revision")
if err != nil {
@ -267,7 +284,7 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err
}
s.mu.Unlock()
return s.compact(trace, rev, prevCompactRev)
return s.compact(trace, rev, prevCompactRev, prevCompactionCompleted)
}
func (s *store) Commit() {


@ -338,6 +338,8 @@ func TestStoreCompact(t *testing.T) {
fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}}
key1 := newTestKeyBytes(lg, revision{1, 0}, false)
key2 := newTestKeyBytes(lg, revision{2, 0}, false)
b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, [][]byte{[]byte("alice"), []byte("bob")}}
s.Compact(traceutil.TODO(), 3)
@ -349,6 +351,8 @@ func TestStoreCompact(t *testing.T) {
end := make([]byte, 8)
binary.BigEndian.PutUint64(end, uint64(4))
wact := []testutil.Action{
{Name: "range", Params: []interface{}{schema.Meta, schema.ScheduledCompactKeyName, []uint8(nil), int64(0)}},
{Name: "range", Params: []interface{}{schema.Meta, schema.FinishedCompactKeyName, []uint8(nil), int64(0)}},
{Name: "put", Params: []interface{}{schema.Meta, schema.ScheduledCompactKeyName, newTestRevBytes(revision{3, 0})}},
{Name: "range", Params: []interface{}{schema.Key, make([]byte, 17), end, int64(10000)}},
{Name: "delete", Params: []interface{}{schema.Key, key2}},


@ -21,6 +21,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/api/v3/etcdserverpb"
clientv3 "go.etcd.io/etcd/client/v3"
@ -193,3 +194,81 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) {
assert.NoError(t, err, "error on alarm list")
assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: memberID}}, alarmResponse.Alarms)
}
func TestCompactHashCheckDetectCorruptionInterrupt(t *testing.T) {
checkTime := time.Second
e2e.BeforeTest(t)
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
slowCompactionNodeIndex := 1
// Start a new cluster, with compact hash check enabled.
t.Log("creating a new cluster with 3 nodes...")
dataDirPath := t.TempDir()
cfg := e2e.NewConfig(
e2e.WithKeepDataDir(true),
e2e.WithCompactHashCheckEnabled(true),
e2e.WithCompactHashCheckTime(checkTime),
e2e.WithClusterSize(3),
e2e.WithDataDirPath(dataDirPath),
e2e.WithLogLevel("info"),
)
epc, err := e2e.InitEtcdProcessCluster(t, cfg)
require.NoError(t, err)
// Assign a node a very slow compaction speed, so that its compaction can be interrupted.
err = epc.UpdateProcOptions(slowCompactionNodeIndex, t,
e2e.WithCompactionBatchLimit(1),
e2e.WithCompactionSleepInterval(1*time.Hour),
)
require.NoError(t, err)
epc, err = e2e.StartEtcdProcessCluster(ctx, epc, cfg)
require.NoError(t, err)
t.Cleanup(func() {
if errC := epc.Close(); errC != nil {
t.Fatalf("error closing etcd processes (%v)", errC)
}
})
// Put 10 values to the same key, so that the compaction will drop some stale revisions.
t.Log("putting 10 values to the same key...")
cc, err := e2e.NewEtcdctl(epc.Cfg.Client, epc.EndpointsGRPC())
require.NoError(t, err)
for i := 0; i < 10; i++ {
err := cc.Put(ctx, "key", fmt.Sprint(i), config.PutOptions{})
require.NoError(t, err, "error on put")
}
t.Log("compaction started...")
_, err = cc.Compact(ctx, 5, config.CompactOption{})
require.NoError(t, err, "error on compact")
err = epc.Procs[slowCompactionNodeIndex].Close()
require.NoError(t, err)
err = epc.UpdateProcOptions(slowCompactionNodeIndex, t)
require.NoError(t, err)
t.Logf("restart proc %d to interrupt its compaction...", slowCompactionNodeIndex)
err = epc.Procs[slowCompactionNodeIndex].Restart(ctx)
require.NoError(t, err)
// Wait until the node has finished its compaction and the leader has finished the compaction hash check.
_, err = epc.Procs[slowCompactionNodeIndex].Logs().ExpectWithContext(ctx, "finished scheduled compaction")
require.NoError(t, err, "can't get log indicating finished scheduled compaction")
leaderIndex := epc.WaitLeader(t)
_, err = epc.Procs[leaderIndex].Logs().ExpectWithContext(ctx, "finished compaction hash check")
require.NoError(t, err, "can't get log indicating finished compaction hash check")
alarmResponse, err := cc.AlarmList(ctx)
require.NoError(t, err, "error on alarm list")
for _, alarm := range alarmResponse.Alarms {
if alarm.Alarm == etcdserverpb.AlarmType_CORRUPT {
t.Fatal("there should be no corruption after resuming the compaction, but corruption detected")
}
}
t.Log("no corruption detected.")
}


@ -182,6 +182,7 @@ type EtcdProcessClusterConfig struct {
CompactHashCheckTime time.Duration
GoFailEnabled bool
CompactionBatchLimit int
CompactionSleepInterval time.Duration
WarningUnaryRequestDuration time.Duration
ExperimentalWarningUnaryRequestDuration time.Duration
@ -341,6 +342,10 @@ func WithCompactionBatchLimit(limit int) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.CompactionBatchLimit = limit }
}
func WithCompactionSleepInterval(time time.Duration) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.CompactionSleepInterval = time }
}
func WithWatchProcessNotifyInterval(interval time.Duration) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.WatchProcessNotifyInterval = interval }
}
@ -582,6 +587,9 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
if cfg.CompactionBatchLimit != 0 {
args = append(args, "--experimental-compaction-batch-limit", fmt.Sprintf("%d", cfg.CompactionBatchLimit))
}
if cfg.CompactionSleepInterval != 0 {
args = append(args, "--experimental-compaction-sleep-interval", cfg.CompactionSleepInterval.String())
}
if cfg.WarningUnaryRequestDuration != 0 {
args = append(args, "--warning-unary-request-duration", cfg.WarningUnaryRequestDuration.String())
}
@ -813,6 +821,32 @@ func (epc *EtcdProcessCluster) StartNewProc(ctx context.Context, cfg *EtcdProces
return proc.Start(ctx)
}
// UpdateProcOptions updates the options for a specific process. If no opts are given,
// the resulting config is identical to the cluster's.
func (epc *EtcdProcessCluster) UpdateProcOptions(i int, tb testing.TB, opts ...EPClusterOption) error {
if epc.Procs[i].IsRunning() {
return fmt.Errorf("process %d is still running, please close it before updating its options", i)
}
cfg := *epc.Cfg
for _, opt := range opts {
opt(&cfg)
}
serverCfg := cfg.EtcdServerProcessConfig(tb, i)
var initialCluster []string
for _, p := range epc.Procs {
initialCluster = append(initialCluster, fmt.Sprintf("%s=%s", p.Config().Name, p.Config().PeerURL.String()))
}
epc.Cfg.SetInitialOrDiscovery(serverCfg, initialCluster, "new")
proc, err := NewEtcdProcess(serverCfg)
if err != nil {
return err
}
epc.Procs[i] = proc
return nil
}
func (epc *EtcdProcessCluster) Start(ctx context.Context) error {
return epc.start(func(ep EtcdProcess) error { return ep.Start(ctx) })
}
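
UpdateProcOptions only rebuilds the member's process configuration; the process itself has to be stopped before the call and started again afterwards, roughly as the new e2e test above does. A minimal usage sketch, reusing the helpers shown in this commit (i, t, ctx, and epc as in the test):

// Stop the member, swap in new options, then bring it back up.
require.NoError(t, epc.Procs[i].Close())
require.NoError(t, epc.UpdateProcOptions(i, t,
	e2e.WithCompactionBatchLimit(1),
	e2e.WithCompactionSleepInterval(time.Hour),
))
require.NoError(t, epc.Procs[i].Restart(ctx))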