mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: backport check scheduledCompactKeyName and finishedCompactKeyName before writing hash to release-3.5.
Fix #15919. Check ScheduledCompactKeyName and FinishedCompactKeyName before writing hash to hashstore. If they do not match, then it means this compaction has once been interrupted and its hash value is invalid. In such cases, we won't write the hash values to the hashstore, and avoids the incorrect corruption alarm. Signed-off-by: caojiamingalan <alan.c.19971111@gmail.com>
This commit is contained in:
parent
9ac1d7378a
commit
6ac9d94d67
@ -227,7 +227,27 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) {
|
||||
return nil, compactMainRev, nil
|
||||
}
|
||||
|
||||
func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-chan struct{}, error) {
|
||||
// checkPrevCompactionCompleted checks whether the previous scheduled compaction is completed.
|
||||
func (s *store) checkPrevCompactionCompleted() bool {
|
||||
tx := s.b.ReadTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
_, scheduledCompactBytes := tx.UnsafeRange(buckets.Meta, scheduledCompactKeyName, nil, 0)
|
||||
scheduledCompact := int64(0)
|
||||
if len(scheduledCompactBytes) != 0 {
|
||||
scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
|
||||
}
|
||||
|
||||
_, finishedCompactBytes := tx.UnsafeRange(buckets.Meta, finishedCompactKeyName, nil, 0)
|
||||
finishedCompact := int64(0)
|
||||
if len(finishedCompactBytes) != 0 {
|
||||
finishedCompact = bytesToRev(finishedCompactBytes[0]).main
|
||||
|
||||
}
|
||||
return scheduledCompact == finishedCompact
|
||||
}
|
||||
|
||||
func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevCompactionCompleted bool) (<-chan struct{}, error) {
|
||||
ch := make(chan struct{})
|
||||
var j = func(ctx context.Context) {
|
||||
if ctx.Err() != nil {
|
||||
@ -240,7 +260,13 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch
|
||||
s.compactBarrier(context.TODO(), ch)
|
||||
return
|
||||
}
|
||||
s.hashes.Store(hash)
|
||||
// Only store the hash value if the previous hash is completed, i.e. this compaction
|
||||
// hashes every revision from last compaction. For more details, see #15919.
|
||||
if prevCompactionCompleted {
|
||||
s.hashes.Store(hash)
|
||||
} else {
|
||||
s.lg.Info("previous compaction was interrupted, skip storing compaction hash value")
|
||||
}
|
||||
close(ch)
|
||||
}
|
||||
|
||||
@ -250,17 +276,19 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch
|
||||
}
|
||||
|
||||
func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
|
||||
prevCompactionCompleted := s.checkPrevCompactionCompleted()
|
||||
ch, prevCompactRev, err := s.updateCompactRev(rev)
|
||||
if err != nil {
|
||||
return ch, err
|
||||
}
|
||||
|
||||
return s.compact(traceutil.TODO(), rev, prevCompactRev)
|
||||
return s.compact(traceutil.TODO(), rev, prevCompactRev, prevCompactionCompleted)
|
||||
}
|
||||
|
||||
func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
|
||||
s.mu.Lock()
|
||||
|
||||
prevCompactionCompleted := s.checkPrevCompactionCompleted()
|
||||
ch, prevCompactRev, err := s.updateCompactRev(rev)
|
||||
trace.Step("check and update compact revision")
|
||||
if err != nil {
|
||||
@ -269,7 +297,7 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
return s.compact(trace, rev, prevCompactRev)
|
||||
return s.compact(trace, rev, prevCompactRev, prevCompactionCompleted)
|
||||
}
|
||||
|
||||
func (s *store) Commit() {
|
||||
|
@ -333,6 +333,8 @@ func TestStoreCompact(t *testing.T) {
|
||||
fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}}
|
||||
key1 := newTestKeyBytes(revision{1, 0}, false)
|
||||
key2 := newTestKeyBytes(revision{2, 0}, false)
|
||||
b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
|
||||
b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
|
||||
b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, [][]byte{[]byte("alice"), []byte("bob")}}
|
||||
|
||||
s.Compact(traceutil.TODO(), 3)
|
||||
@ -344,6 +346,8 @@ func TestStoreCompact(t *testing.T) {
|
||||
end := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(end, uint64(4))
|
||||
wact := []testutil.Action{
|
||||
{Name: "range", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, []uint8(nil), int64(0)}},
|
||||
{Name: "range", Params: []interface{}{buckets.Meta, finishedCompactKeyName, []uint8(nil), int64(0)}},
|
||||
{Name: "put", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, newTestRevBytes(revision{3, 0})}},
|
||||
{Name: "range", Params: []interface{}{buckets.Key, make([]byte, 17), end, int64(10000)}},
|
||||
{Name: "delete", Params: []interface{}{buckets.Key, key2}},
|
||||
|
@ -175,6 +175,7 @@ type etcdProcessClusterConfig struct {
|
||||
CompactHashCheckEnabled bool
|
||||
CompactHashCheckTime time.Duration
|
||||
WatchProcessNotifyInterval time.Duration
|
||||
CompactionBatchLimit int
|
||||
}
|
||||
|
||||
// newEtcdProcessCluster launches a new cluster from etcd processes, returning
|
||||
@ -339,6 +340,9 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []*
|
||||
if cfg.WatchProcessNotifyInterval != 0 {
|
||||
args = append(args, "--experimental-watch-progress-notify-interval", cfg.WatchProcessNotifyInterval.String())
|
||||
}
|
||||
if cfg.CompactionBatchLimit != 0 {
|
||||
args = append(args, "--experimental-compaction-batch-limit", fmt.Sprintf("%d", cfg.CompactionBatchLimit))
|
||||
}
|
||||
|
||||
etcdCfgs[i] = &etcdServerProcessConfig{
|
||||
lg: lg,
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
"go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/server/v3/datadir"
|
||||
@ -180,3 +181,61 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) {
|
||||
assert.NoError(t, err, "error on alarm list")
|
||||
assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: 0}}, alarmResponse.Alarms)
|
||||
}
|
||||
|
||||
func TestCompactHashCheckDetectCorruptionInterrupt(t *testing.T) {
|
||||
checkTime := time.Second
|
||||
BeforeTest(t)
|
||||
|
||||
slowCompactionNodeIndex := 1
|
||||
|
||||
// Start a new cluster, with compact hash check enabled.
|
||||
t.Log("creating a new cluster with 3 nodes...")
|
||||
|
||||
epc, err := newEtcdProcessCluster(t, &etcdProcessClusterConfig{
|
||||
clusterSize: 3,
|
||||
keepDataDir: true,
|
||||
CompactHashCheckEnabled: true,
|
||||
CompactHashCheckTime: checkTime,
|
||||
logLevel: "info",
|
||||
CompactionBatchLimit: 1,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
if errC := epc.Close(); errC != nil {
|
||||
t.Fatalf("error closing etcd processes (%v)", errC)
|
||||
}
|
||||
})
|
||||
|
||||
// Put 200 identical keys to the cluster, so that the compaction will drop some stale values.
|
||||
// We need a relatively big number here to make the compaction takes a non-trivial time, and we can interrupt it.
|
||||
t.Log("putting 200 values to the identical key...")
|
||||
cc := NewEtcdctl(epc.EndpointsV3(), clientNonTLS, false, false)
|
||||
|
||||
for i := 0; i < 200; i++ {
|
||||
err = cc.Put("key", fmt.Sprint(i))
|
||||
require.NoError(t, err, "error on put")
|
||||
}
|
||||
|
||||
t.Log("compaction started...")
|
||||
_, err = cc.Compact(200)
|
||||
|
||||
t.Logf("restart proc %d to interrupt its compaction...", slowCompactionNodeIndex)
|
||||
err = epc.procs[slowCompactionNodeIndex].Restart()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Wait until the node finished compaction.
|
||||
_, err = epc.procs[slowCompactionNodeIndex].Logs().Expect("finished scheduled compaction")
|
||||
require.NoError(t, err, "can't get log indicating finished scheduled compaction")
|
||||
|
||||
// Wait for compaction hash check
|
||||
time.Sleep(checkTime * 5)
|
||||
|
||||
alarmResponse, err := cc.AlarmList()
|
||||
require.NoError(t, err, "error on alarm list")
|
||||
for _, alarm := range alarmResponse.Alarms {
|
||||
if alarm.Alarm == etcdserverpb.AlarmType_CORRUPT {
|
||||
t.Fatal("there should be no corruption after resuming the compaction, but corruption detected")
|
||||
}
|
||||
}
|
||||
t.Log("no corruption detected.")
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user