mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #16068 from CaojiamingAlan/release-3.5
[3.5] etcdserver: backport check scheduledCompactKeyName and finishedCompac…
This commit is contained in:
commit
8f4b6c9ed2
@ -227,7 +227,27 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) {
|
||||
return nil, compactMainRev, nil
|
||||
}
|
||||
|
||||
func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-chan struct{}, error) {
|
||||
// checkPrevCompactionCompleted checks whether the previous scheduled compaction is completed.
|
||||
func (s *store) checkPrevCompactionCompleted() bool {
|
||||
tx := s.b.ReadTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
_, scheduledCompactBytes := tx.UnsafeRange(buckets.Meta, scheduledCompactKeyName, nil, 0)
|
||||
scheduledCompact := int64(0)
|
||||
if len(scheduledCompactBytes) != 0 {
|
||||
scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
|
||||
}
|
||||
|
||||
_, finishedCompactBytes := tx.UnsafeRange(buckets.Meta, finishedCompactKeyName, nil, 0)
|
||||
finishedCompact := int64(0)
|
||||
if len(finishedCompactBytes) != 0 {
|
||||
finishedCompact = bytesToRev(finishedCompactBytes[0]).main
|
||||
|
||||
}
|
||||
return scheduledCompact == finishedCompact
|
||||
}
|
||||
|
||||
func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevCompactionCompleted bool) (<-chan struct{}, error) {
|
||||
ch := make(chan struct{})
|
||||
var j = func(ctx context.Context) {
|
||||
if ctx.Err() != nil {
|
||||
@ -240,7 +260,13 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch
|
||||
s.compactBarrier(context.TODO(), ch)
|
||||
return
|
||||
}
|
||||
s.hashes.Store(hash)
|
||||
// Only store the hash value if the previous compaction is completed, i.e. this compaction
|
||||
// hashes every revision from last compaction. For more details, see #15919.
|
||||
if prevCompactionCompleted {
|
||||
s.hashes.Store(hash)
|
||||
} else {
|
||||
s.lg.Info("previous compaction was interrupted, skip storing compaction hash value")
|
||||
}
|
||||
close(ch)
|
||||
}
|
||||
|
||||
@ -250,17 +276,19 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch
|
||||
}
|
||||
|
||||
func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
|
||||
prevCompactionCompleted := s.checkPrevCompactionCompleted()
|
||||
ch, prevCompactRev, err := s.updateCompactRev(rev)
|
||||
if err != nil {
|
||||
return ch, err
|
||||
}
|
||||
|
||||
return s.compact(traceutil.TODO(), rev, prevCompactRev)
|
||||
return s.compact(traceutil.TODO(), rev, prevCompactRev, prevCompactionCompleted)
|
||||
}
|
||||
|
||||
func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
|
||||
s.mu.Lock()
|
||||
|
||||
prevCompactionCompleted := s.checkPrevCompactionCompleted()
|
||||
ch, prevCompactRev, err := s.updateCompactRev(rev)
|
||||
trace.Step("check and update compact revision")
|
||||
if err != nil {
|
||||
@ -269,7 +297,7 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
return s.compact(trace, rev, prevCompactRev)
|
||||
return s.compact(trace, rev, prevCompactRev, prevCompactionCompleted)
|
||||
}
|
||||
|
||||
func (s *store) Commit() {
|
||||
|
@ -333,6 +333,8 @@ func TestStoreCompact(t *testing.T) {
|
||||
fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}}
|
||||
key1 := newTestKeyBytes(revision{1, 0}, false)
|
||||
key2 := newTestKeyBytes(revision{2, 0}, false)
|
||||
b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
|
||||
b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
|
||||
b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, [][]byte{[]byte("alice"), []byte("bob")}}
|
||||
|
||||
s.Compact(traceutil.TODO(), 3)
|
||||
@ -344,6 +346,8 @@ func TestStoreCompact(t *testing.T) {
|
||||
end := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(end, uint64(4))
|
||||
wact := []testutil.Action{
|
||||
{Name: "range", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, []uint8(nil), int64(0)}},
|
||||
{Name: "range", Params: []interface{}{buckets.Meta, finishedCompactKeyName, []uint8(nil), int64(0)}},
|
||||
{Name: "put", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, newTestRevBytes(revision{3, 0})}},
|
||||
{Name: "range", Params: []interface{}{buckets.Key, make([]byte, 17), end, int64(10000)}},
|
||||
{Name: "delete", Params: []interface{}{buckets.Key, key2}},
|
||||
|
@ -175,6 +175,7 @@ type etcdProcessClusterConfig struct {
|
||||
CompactHashCheckEnabled bool
|
||||
CompactHashCheckTime time.Duration
|
||||
WatchProcessNotifyInterval time.Duration
|
||||
CompactionBatchLimit int
|
||||
}
|
||||
|
||||
// newEtcdProcessCluster launches a new cluster from etcd processes, returning
|
||||
@ -339,6 +340,9 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []*
|
||||
if cfg.WatchProcessNotifyInterval != 0 {
|
||||
args = append(args, "--experimental-watch-progress-notify-interval", cfg.WatchProcessNotifyInterval.String())
|
||||
}
|
||||
if cfg.CompactionBatchLimit != 0 {
|
||||
args = append(args, "--experimental-compaction-batch-limit", fmt.Sprintf("%d", cfg.CompactionBatchLimit))
|
||||
}
|
||||
|
||||
etcdCfgs[i] = &etcdServerProcessConfig{
|
||||
lg: lg,
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
"go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/server/v3/datadir"
|
||||
@ -180,3 +181,61 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) {
|
||||
assert.NoError(t, err, "error on alarm list")
|
||||
assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: 0}}, alarmResponse.Alarms)
|
||||
}
|
||||
|
||||
func TestCompactHashCheckDetectCorruptionInterrupt(t *testing.T) {
|
||||
checkTime := time.Second
|
||||
BeforeTest(t)
|
||||
|
||||
slowCompactionNodeIndex := 1
|
||||
|
||||
// Start a new cluster, with compact hash check enabled.
|
||||
t.Log("creating a new cluster with 3 nodes...")
|
||||
|
||||
epc, err := newEtcdProcessCluster(t, &etcdProcessClusterConfig{
|
||||
clusterSize: 3,
|
||||
keepDataDir: true,
|
||||
CompactHashCheckEnabled: true,
|
||||
CompactHashCheckTime: checkTime,
|
||||
logLevel: "info",
|
||||
CompactionBatchLimit: 1,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
if errC := epc.Close(); errC != nil {
|
||||
t.Fatalf("error closing etcd processes (%v)", errC)
|
||||
}
|
||||
})
|
||||
|
||||
// Put 200 identical keys to the cluster, so that the compaction will drop some stale values.
|
||||
// We need a relatively big number here to make the compaction takes a non-trivial time, and we can interrupt it.
|
||||
t.Log("putting 200 values to the identical key...")
|
||||
cc := NewEtcdctl(epc.EndpointsV3(), clientNonTLS, false, false)
|
||||
|
||||
for i := 0; i < 200; i++ {
|
||||
err = cc.Put("key", fmt.Sprint(i))
|
||||
require.NoError(t, err, "error on put")
|
||||
}
|
||||
|
||||
t.Log("compaction started...")
|
||||
_, err = cc.Compact(200)
|
||||
|
||||
t.Logf("restart proc %d to interrupt its compaction...", slowCompactionNodeIndex)
|
||||
err = epc.procs[slowCompactionNodeIndex].Restart()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Wait until the node finished compaction.
|
||||
_, err = epc.procs[slowCompactionNodeIndex].Logs().Expect("finished scheduled compaction")
|
||||
require.NoError(t, err, "can't get log indicating finished scheduled compaction")
|
||||
|
||||
// Wait for compaction hash check
|
||||
time.Sleep(checkTime * 5)
|
||||
|
||||
alarmResponse, err := cc.AlarmList()
|
||||
require.NoError(t, err, "error on alarm list")
|
||||
for _, alarm := range alarmResponse.Alarms {
|
||||
if alarm.Alarm == etcdserverpb.AlarmType_CORRUPT {
|
||||
t.Fatal("there should be no corruption after resuming the compaction, but corruption detected")
|
||||
}
|
||||
}
|
||||
t.Log("no corruption detected.")
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user