Merge pull request #14765 from serathius/linearizability-compact

Linearizability compact
This commit is contained in:
Benjamin Wang 2022-11-16 06:23:14 +08:00 committed by GitHub
commit 47e71d925a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 62 additions and 19 deletions

View File

@ -18,7 +18,7 @@ GO_BUILD_ENV=("CGO_ENABLED=0" "GO_BUILD_FLAGS=${GO_BUILD_FLAGS}" "GOOS=${GOOS}"
toggle_failpoints() {
mode="$1"
if command -v gofail >/dev/null 2>&1; then
run gofail "$mode" server/etcdserver/ server/storage/backend/
run gofail "$mode" server/etcdserver/ server/storage/backend/ server/storage/mvcc/
(
cd ./server
run go get go.etcd.io/gofail/runtime

View File

@ -216,7 +216,9 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) {
SetScheduledCompact(s.b.BatchTx(), rev)
// ensure that desired compaction is persisted
// gofail: var compactBeforeCommitScheduledCompact struct{}
s.b.ForceCommit()
// gofail: var compactAfterCommitScheduledCompact struct{}
s.revMu.Unlock()

View File

@ -59,8 +59,10 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal
}
if len(keys) < batchNum {
// gofail: var compactBeforeSetFinishedCompact struct{}
UnsafeSetFinishedCompact(tx, compactMainRev)
tx.Unlock()
// gofail: var compactAfterSetFinishedCompact struct{}
hash := h.Hash()
s.lg.Info(
"finished scheduled compaction",
@ -75,7 +77,9 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal
// update last
revToBytes(revision{main: rev.main, sub: rev.sub + 1}, last)
// Immediately commit the compaction deletes instead of letting them accumulate in the write buffer
// gofail: var compactBeforeCommitBatch struct{}
s.b.ForceCommit()
// gofail: var compactAfterCommitBatch struct{}
dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
select {

View File

@ -174,6 +174,7 @@ type EtcdProcessClusterConfig struct {
CompactHashCheckEnabled bool
CompactHashCheckTime time.Duration
GoFailEnabled bool
CompactionBatchLimit int
}
func DefaultConfig() *EtcdProcessClusterConfig {
@ -521,6 +522,9 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
if cfg.CompactHashCheckTime != 0 {
args = append(args, "--experimental-compact-hash-check-time", cfg.CompactHashCheckTime.String())
}
if cfg.CompactionBatchLimit != 0 {
args = append(args, "--experimental-compaction-batch-limit", fmt.Sprintf("%d", cfg.CompactionBatchLimit))
}
envVars := map[string]string{}
for key, value := range cfg.EnvVars {
envVars[key] = value

View File

@ -31,25 +31,34 @@ import (
)
var (
KillFailpoint Failpoint = killFailpoint{}
DefragBeforeCopyPanic Failpoint = goFailpoint{"backend/defragBeforeCopy", "panic", triggerDefrag}
DefragBeforeRenamePanic Failpoint = goFailpoint{"backend/defragBeforeRename", "panic", triggerDefrag}
BeforeCommitPanic Failpoint = goFailpoint{"backend/beforeCommit", "panic", nil}
AfterCommitPanic Failpoint = goFailpoint{"backend/afterCommit", "panic", nil}
RaftBeforeSavePanic Failpoint = goFailpoint{"etcdserver/raftBeforeSave", "panic", nil}
RaftAfterSavePanic Failpoint = goFailpoint{"etcdserver/raftAfterSave", "panic", nil}
BackendBeforePreCommitHookPanic Failpoint = goFailpoint{"backend/commitBeforePreCommitHook", "panic", nil}
BackendAfterPreCommitHookPanic Failpoint = goFailpoint{"backend/commitAfterPreCommitHook", "panic", nil}
BackendBeforeStartDBTxnPanic Failpoint = goFailpoint{"backend/beforeStartDBTxn", "panic", nil}
BackendAfterStartDBTxnPanic Failpoint = goFailpoint{"backend/afterStartDBTxn", "panic", nil}
BackendBeforeWritebackBufPanic Failpoint = goFailpoint{"backend/beforeWritebackBuf", "panic", nil}
BackendAfterWritebackBufPanic Failpoint = goFailpoint{"backend/afterWritebackBuf", "panic", nil}
RandomFailpoint Failpoint = randomFailpoint{[]Failpoint{
KillFailpoint Failpoint = killFailpoint{}
DefragBeforeCopyPanic Failpoint = goFailpoint{"backend/defragBeforeCopy", "panic", triggerDefrag}
DefragBeforeRenamePanic Failpoint = goFailpoint{"backend/defragBeforeRename", "panic", triggerDefrag}
BeforeCommitPanic Failpoint = goFailpoint{"backend/beforeCommit", "panic", nil}
AfterCommitPanic Failpoint = goFailpoint{"backend/afterCommit", "panic", nil}
RaftBeforeSavePanic Failpoint = goFailpoint{"etcdserver/raftBeforeSave", "panic", nil}
RaftAfterSavePanic Failpoint = goFailpoint{"etcdserver/raftAfterSave", "panic", nil}
BackendBeforePreCommitHookPanic Failpoint = goFailpoint{"backend/commitBeforePreCommitHook", "panic", nil}
BackendAfterPreCommitHookPanic Failpoint = goFailpoint{"backend/commitAfterPreCommitHook", "panic", nil}
BackendBeforeStartDBTxnPanic Failpoint = goFailpoint{"backend/beforeStartDBTxn", "panic", nil}
BackendAfterStartDBTxnPanic Failpoint = goFailpoint{"backend/afterStartDBTxn", "panic", nil}
BackendBeforeWritebackBufPanic Failpoint = goFailpoint{"backend/beforeWritebackBuf", "panic", nil}
BackendAfterWritebackBufPanic Failpoint = goFailpoint{"backend/afterWritebackBuf", "panic", nil}
CompactBeforeCommitScheduledCompactPanic Failpoint = goFailpoint{"mvcc/compactBeforeCommitScheduledCompact", "panic", triggerCompact}
CompactAfterCommitScheduledCompactPanic Failpoint = goFailpoint{"mvcc/compactAfterCommitScheduledCompact", "panic", triggerCompact}
CompactBeforeSetFinishedCompactPanic Failpoint = goFailpoint{"mvcc/compactBeforeSetFinishedCompact", "panic", triggerCompact}
CompactAfterSetFinishedCompactPanic Failpoint = goFailpoint{"mvcc/compactAfterSetFinishedCompact", "panic", triggerCompact}
CompactBeforeCommitBatchPanic Failpoint = goFailpoint{"mvcc/compactBeforeCommitBatch", "panic", triggerCompact}
CompactAfterCommitBatchPanic Failpoint = goFailpoint{"mvcc/compactAfterCommitBatch", "panic", triggerCompact}
RandomFailpoint Failpoint = randomFailpoint{[]Failpoint{
KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic,
RaftAfterSavePanic, DefragBeforeCopyPanic, DefragBeforeRenamePanic,
BackendBeforePreCommitHookPanic, BackendAfterPreCommitHookPanic,
BackendBeforeStartDBTxnPanic, BackendAfterStartDBTxnPanic,
BackendBeforeWritebackBufPanic, BackendAfterWritebackBufPanic,
CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic,
CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic,
CompactBeforeCommitBatchPanic, CompactAfterCommitBatchPanic,
}}
// TODO: Figure out how to reliably trigger below failpoints and add them to RandomFailpoint
raftBeforeLeaderSendPanic Failpoint = goFailpoint{"etcdserver/raftBeforeLeaderSend", "panic", nil}
@ -162,6 +171,28 @@ func triggerDefrag(ctx context.Context, member e2e.EtcdProcess) error {
return nil
}
func triggerCompact(ctx context.Context, member e2e.EtcdProcess) error {
cc, err := clientv3.New(clientv3.Config{
Endpoints: member.EndpointsV3(),
Logger: zap.NewNop(),
DialKeepAliveTime: 1 * time.Millisecond,
DialKeepAliveTimeout: 5 * time.Millisecond,
})
if err != nil {
return fmt.Errorf("failed creating client: %w", err)
}
defer cc.Close()
resp, err := cc.Get(ctx, "/")
if err != nil {
return err
}
_, err = cc.Compact(ctx, resp.Header.Revision)
if err != nil && !strings.Contains(err.Error(), "error reading from server: EOF") {
return err
}
return nil
}
var httpClient = http.Client{
Timeout: 10 * time.Millisecond,
}

View File

@ -51,16 +51,18 @@ func TestLinearizability(t *testing.T) {
name: "ClusterOfSize1",
failpoint: RandomFailpoint,
config: e2e.EtcdProcessClusterConfig{
ClusterSize: 1,
GoFailEnabled: true,
ClusterSize: 1,
GoFailEnabled: true,
CompactionBatchLimit: 100, // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints
},
},
{
name: "ClusterOfSize3",
failpoint: RandomFailpoint,
config: e2e.EtcdProcessClusterConfig{
ClusterSize: 3,
GoFailEnabled: true,
ClusterSize: 3,
GoFailEnabled: true,
CompactionBatchLimit: 100, // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints
},
},
{