tests: Add support for lazyfs

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
Author: Marek Siarkowicz, 2022-11-04 15:44:01 +01:00
parent 6981e4820f
commit eb32d9cccc
15 changed files with 353 additions and 115 deletions

View File

@@ -12,14 +12,14 @@ jobs:
     uses: ./.github/workflows/robustness-template.yaml
     with:
       etcdBranch: main
-      count: 100
+      count: 80
       testTimeout: 200m
       artifactName: main
   main-arm64:
     uses: ./.github/workflows/robustness-template-arm64.yaml
     with:
       etcdBranch: main
-      count: 100
+      count: 80
       testTimeout: 200m
       artifactName: main-arm64
       runs-on: "['self-hosted', 'Linux', 'ARM64']"
@@ -27,7 +27,7 @@ jobs:
     uses: ./.github/workflows/robustness-template.yaml
     with:
       etcdBranch: release-3.5
-      count: 100
+      count: 80
       testTimeout: 200m
       artifactName: release-35
   release-35-arm64:
@@ -41,6 +41,6 @@ jobs:
     uses: ./.github/workflows/robustness-template.yaml
     with:
       etcdBranch: release-3.4
-      count: 100
+      count: 80
       testTimeout: 200m
       artifactName: release-34

View File

@@ -39,6 +39,10 @@ jobs:
           set -euo pipefail
           go clean -testcache
+          # Build LazyFS
+          sudo apt-get -y install cmake libfuse3-dev libfuse3-3 fuse3
+          make install-lazyfs
+
           # Use --failfast to avoid overriding report generated by failed test
           GO_TEST_FLAGS="-v --count ${{ inputs.count }} --timeout ${{ inputs.testTimeout }} --failfast --run TestRobustness"
          case "${ETCD_BRANCH}" in

View File

@@ -7,6 +7,6 @@ jobs:
     uses: ./.github/workflows/robustness-template.yaml
     with:
       etcdBranch: main
-      count: 15
+      count: 12
       testTimeout: 30m
       artifactName: main

View File

@@ -149,6 +149,19 @@ ifeq (, $(shell which yamlfmt))
 endif
 	yamlfmt -conf tools/.yamlfmt .
 
+# Tools
+
+.PHONY: install-lazyfs
+install-lazyfs: bin/lazyfs
+
+bin/lazyfs:
+	rm /tmp/lazyfs -rf
+	git clone --depth 1 --branch 0.2.0 https://github.com/dsrhaslab/lazyfs /tmp/lazyfs
+	cd /tmp/lazyfs/libs/libpcache; ./build.sh
+	cd /tmp/lazyfs/lazyfs; ./build.sh
+	mkdir -p ./bin
+	cp /tmp/lazyfs/lazyfs/build/lazyfs ./bin/lazyfs
+
 # Cleanup
 clean:
@@ -156,6 +169,7 @@ clean:
 	rm -rf ./covdir
 	rm -f ./bin/Dockerfile-release
 	rm -rf ./bin/etcd*
+	rm -rf ./bin/lazyfs
 	rm -rf ./default.etcd
 	rm -rf ./tests/e2e/default.etcd
 	rm -rf ./release

View File

@@ -185,6 +185,7 @@ type EtcdProcessClusterConfig struct {
 	CompactHashCheckEnabled bool
 	CompactHashCheckTime    time.Duration
 	GoFailEnabled           bool
+	LazyFSEnabled           bool
 
 	CompactionBatchLimit    int
 	CompactionSleepInterval time.Duration
@@ -344,6 +345,10 @@ func WithGoFailEnabled(enabled bool) EPClusterOption {
 	return func(c *EtcdProcessClusterConfig) { c.GoFailEnabled = enabled }
 }
 
+func WithLazyFSEnabled(enabled bool) EPClusterOption {
+	return func(c *EtcdProcessClusterConfig) { c.LazyFSEnabled = enabled }
+}
+
 func WithWarningUnaryRequestDuration(time time.Duration) EPClusterOption {
 	return func(c *EtcdProcessClusterConfig) { c.WarningUnaryRequestDuration = time }
 }
@@ -407,7 +412,7 @@ func InitEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdP
 	// launch etcd processes
 	for i := range etcdCfgs {
-		proc, err := NewEtcdProcess(etcdCfgs[i])
+		proc, err := NewEtcdProcess(t, etcdCfgs[i])
 		if err != nil {
 			epc.Close()
 			return nil, fmt.Errorf("cannot configure: %v", err)
@@ -659,6 +664,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
 		InitialToken:  cfg.InitialToken,
 		GoFailPort:    gofailPort,
 		Proxy:         proxyCfg,
+		LazyFSEnabled: cfg.LazyFSEnabled,
 	}
 }
@@ -826,7 +832,7 @@ func (epc *EtcdProcessCluster) StartNewProc(ctx context.Context, cfg *EtcdProces
 	// Then start process
 	tb.Log("start new member")
-	proc, err := NewEtcdProcess(serverCfg)
+	proc, err := NewEtcdProcess(tb, serverCfg)
 	if err != nil {
 		epc.Close()
 		return 0, fmt.Errorf("cannot configure: %v", err)
@@ -855,7 +861,7 @@ func (epc *EtcdProcessCluster) UpdateProcOptions(i int, tb testing.TB, opts ...E
 	}
 	epc.Cfg.SetInitialOrDiscovery(serverCfg, initialCluster, "new")
-	proc, err := NewEtcdProcess(serverCfg)
+	proc, err := NewEtcdProcess(tb, serverCfg)
 	if err != nil {
 		return err
 	}
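
The new option composes with the existing ones. A minimal sketch of how a test could opt in, assuming the framework's NewEtcdProcessCluster helper and the usual e2e test plumbing (illustrative only, not part of this commit):

	// Each member's data dir is mounted through LazyFS, so the test can
	// later drop un-fsynced writes via member.LazyFS().ClearCache(ctx).
	clus, err := e2e.NewEtcdProcessCluster(ctx, t,
		e2e.WithClusterSize(1),
		e2e.WithGoFailEnabled(true),
		e2e.WithLazyFSEnabled(true),
	)
	if err != nil {
		t.Fatal(err)
	}
	defer clus.Close()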

View File

@@ -16,6 +16,8 @@
 
 package e2e
 
-func NewEtcdProcess(cfg *EtcdServerProcessConfig) (EtcdProcess, error) {
-	return NewEtcdServerProcess(cfg)
+import "testing"
+
+func NewEtcdProcess(t testing.TB, cfg *EtcdServerProcessConfig) (EtcdProcess, error) {
+	return NewEtcdServerProcess(t, cfg)
 }

View File

@@ -24,6 +24,7 @@ import (
 	"path"
 	"strconv"
 	"strings"
+	"testing"
 
 	"go.uber.org/zap"
@@ -38,12 +39,12 @@ type proxyEtcdProcess struct {
 	proxyV3 *proxyV3Proc
 }
 
-func NewEtcdProcess(cfg *EtcdServerProcessConfig) (EtcdProcess, error) {
-	return NewProxyEtcdProcess(cfg)
+func NewEtcdProcess(t testing.TB, cfg *EtcdServerProcessConfig) (EtcdProcess, error) {
+	return NewProxyEtcdProcess(t, cfg)
 }
 
-func NewProxyEtcdProcess(cfg *EtcdServerProcessConfig) (*proxyEtcdProcess, error) {
-	ep, err := NewEtcdServerProcess(cfg)
+func NewProxyEtcdProcess(t testing.TB, cfg *EtcdServerProcessConfig) (*proxyEtcdProcess, error) {
+	ep, err := NewEtcdServerProcess(t, cfg)
 	if err != nil {
 		return nil, err
 	}

View File

@@ -56,6 +56,7 @@ type EtcdProcess interface {
 	Config() *EtcdServerProcessConfig
 	PeerProxy() proxy.Server
 	Failpoints() *BinaryFailpoints
+	LazyFS() *LazyFS
 	Logs() LogsExpect
 	Kill() error
 }
@@ -70,6 +71,7 @@ type EtcdServerProcess struct {
 	cfg        *EtcdServerProcessConfig
 	proc       *expect.ExpectProcess
 	proxy      proxy.Server
+	lazyfs     *LazyFS
 	failpoints *BinaryFailpoints
 	donec      chan struct{} // closed when Interact() terminates
 }
@@ -96,10 +98,11 @@ type EtcdServerProcessConfig struct {
 	InitialCluster string
 	GoFailPort     int
+	LazyFSEnabled  bool
 
 	Proxy *proxy.ServerConfig
 }
 
-func NewEtcdServerProcess(cfg *EtcdServerProcessConfig) (*EtcdServerProcess, error) {
+func NewEtcdServerProcess(t testing.TB, cfg *EtcdServerProcessConfig) (*EtcdServerProcess, error) {
 	if !fileutil.Exist(cfg.ExecPath) {
 		return nil, fmt.Errorf("could not find etcd binary: %s", cfg.ExecPath)
 	}
@@ -107,11 +110,17 @@ func NewEtcdServerProcess(cfg *EtcdServerProcessConfig) (*EtcdServerProcess, err
 		if err := os.RemoveAll(cfg.DataDirPath); err != nil {
 			return nil, err
 		}
+		if err := os.Mkdir(cfg.DataDirPath, 0700); err != nil {
+			return nil, err
+		}
 	}
 	ep := &EtcdServerProcess{cfg: cfg, donec: make(chan struct{})}
 	if cfg.GoFailPort != 0 {
 		ep.failpoints = &BinaryFailpoints{member: ep}
 	}
+	if cfg.LazyFSEnabled {
+		ep.lazyfs = newLazyFS(cfg.lg, cfg.DataDirPath, t)
+	}
 	return ep, nil
 }
@@ -146,6 +155,14 @@ func (ep *EtcdServerProcess) Start(ctx context.Context) error {
 			return err
 		}
 	}
+	if ep.lazyfs != nil {
+		ep.cfg.lg.Info("starting lazyfs...", zap.String("name", ep.cfg.Name))
+		err := ep.lazyfs.Start(ctx)
+		if err != nil {
+			return err
+		}
+	}
 	ep.cfg.lg.Info("starting server...", zap.String("name", ep.cfg.Name))
 	proc, err := SpawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.ExecPath}, ep.cfg.Args...), ep.cfg.EnvVars, ep.cfg.Name)
 	if err != nil {
@@ -205,6 +222,14 @@ func (ep *EtcdServerProcess) Stop() (err error) {
 			return err
 		}
 	}
+	if ep.lazyfs != nil {
+		ep.cfg.lg.Info("stopping lazyfs...", zap.String("name", ep.cfg.Name))
+		err = ep.lazyfs.Stop()
+		ep.lazyfs = nil
+		if err != nil {
+			return err
+		}
+	}
 	return nil
 }
@@ -298,6 +323,10 @@ func (ep *EtcdServerProcess) PeerProxy() proxy.Server {
 	return ep.proxy
 }
 
+func (ep *EtcdServerProcess) LazyFS() *LazyFS {
+	return ep.lazyfs
+}
+
 func (ep *EtcdServerProcess) Failpoints() *BinaryFailpoints {
 	return ep.failpoints
 }

View File

@@ -48,6 +48,18 @@ type binPath struct {
 	EtcdLastRelease string
 	Etcdctl         string
 	Etcdutl         string
+	LazyFS          string
+}
+
+func (bp *binPath) LazyFSAvailable() bool {
+	_, err := os.Stat(bp.LazyFS)
+	if err != nil {
+		if !os.IsNotExist(err) {
+			panic(err)
+		}
+		return false
+	}
+	return true
 }
 
 func InitFlags() {
@@ -65,6 +77,7 @@ func InitFlags() {
 		EtcdLastRelease: *binDir + "/etcd-last-release",
 		Etcdctl:         *binDir + "/etcdctl",
 		Etcdutl:         *binDir + "/etcdutl",
+		LazyFS:          *binDir + "/lazyfs",
 	}
 	CertPath = CertDir + "/server.crt"
 	PrivateKeyPath = CertDir + "/server.key.insecure"
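
Because LazyFS is an optional, locally built dependency, tests probe for the binary instead of assuming it exists. A sketch of the resulting guard pattern (hypothetical, assuming a test that imports the e2e framework):

	// Skip rather than fail on machines where `make install-lazyfs` was not run.
	if !e2e.BinPath.LazyFSAvailable() {
		t.Skip("lazyfs binary not found in the bin directory")
	}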

View File

@@ -0,0 +1,114 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import (
	"context"
	"fmt"
	"os"
	"path/filepath"

	"go.uber.org/zap"

	"go.etcd.io/etcd/pkg/v3/expect"
)

func newLazyFS(lg *zap.Logger, dataDir string, tmp TempDirProvider) *LazyFS {
	return &LazyFS{
		lg:        lg,
		DataDir:   dataDir,
		LazyFSDir: tmp.TempDir(),
	}
}

type TempDirProvider interface {
	TempDir() string
}

type LazyFS struct {
	lg *zap.Logger

	DataDir   string
	LazyFSDir string

	ep *expect.ExpectProcess
}

func (fs *LazyFS) Start(ctx context.Context) (err error) {
	if fs.ep != nil {
		return nil
	}
	err = os.WriteFile(fs.configPath(), fs.config(), 0666)
	if err != nil {
		return err
	}
	dataPath := filepath.Join(fs.LazyFSDir, "data")
	err = os.Mkdir(dataPath, 0700)
	if err != nil {
		return err
	}
	flags := []string{fs.DataDir, "--config-path", fs.configPath(), "-o", "modules=subdir", "-o", "subdir=" + dataPath, "-f"}
	fs.lg.Info("Started lazyfs", zap.Strings("flags", flags))
	fs.ep, err = expect.NewExpect(BinPath.LazyFS, flags...)
	if err != nil {
		return err
	}
	_, err = fs.ep.ExpectWithContext(ctx, "waiting for fault commands")
	return err
}

func (fs *LazyFS) configPath() string {
	return filepath.Join(fs.LazyFSDir, "config.toml")
}

func (fs *LazyFS) socketPath() string {
	return filepath.Join(fs.LazyFSDir, "sock.fifo")
}

func (fs *LazyFS) config() []byte {
	return []byte(fmt.Sprintf(`[faults]
fifo_path=%q

[cache]
apply_eviction=false

[cache.simple]
custom_size="1gb"
blocks_per_page=1

[filesystem]
log_all_operations=false
`, fs.socketPath()))
}

func (fs *LazyFS) Stop() error {
	if fs.ep == nil {
		return nil
	}
	defer func() { fs.ep = nil }()
	err := fs.ep.Stop()
	if err != nil {
		return err
	}
	return fs.ep.Close()
}

func (fs *LazyFS) ClearCache(ctx context.Context) error {
	err := os.WriteFile(fs.socketPath(), []byte("lazyfs::clear-cache\n"), 0666)
	if err != nil {
		return err
	}
	// TODO: Wait for a response on the socket instead of reading logs to detect command completion.
	// Set the `fifo_path_completed` config option so LazyFS creates a separate socket it writes to when it completes a command.
	_, err = fs.ep.ExpectWithContext(ctx, "cache is cleared")
	return err
}
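
LazyFS is a FUSE filesystem that keeps written pages in its own cache until the application fsyncs them; writing `lazyfs::clear-cache` to the faults FIFO discards everything that was never synced, which is what lets the robustness tests emulate power loss. A condensed usage sketch inside the e2e package (illustrative only; testing.TB satisfies TempDirProvider through its TempDir method):

	func exampleLazyFSPowerLoss(ctx context.Context, t testing.TB, dataDir string) error {
		fs := newLazyFS(zap.NewExample(), dataDir, t)
		if err := fs.Start(ctx); err != nil { // writes config.toml and mounts dataDir over FUSE
			return err
		}
		defer fs.Stop()
		// ... run a workload against dataDir, interrupt it mid-write ...
		return fs.ClearCache(ctx) // drop every page that was never fsynced
	}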

View File

@@ -208,7 +208,13 @@ func (f killFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger,
 			return fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err)
 		}
 	}
-
+	if lazyfs := member.LazyFS(); lazyfs != nil {
+		lg.Info("Removing data that was not fsynced")
+		err := lazyfs.ClearCache(ctx)
+		if err != nil {
+			return err
+		}
+	}
 	err := member.Start(ctx)
 	if err != nil {
 		return err
@@ -278,6 +284,14 @@ func (f goPanicFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logg
 		lg.Info("Member exited as expected", zap.String("member", member.Config().Name))
 	}
 
+	if lazyfs := member.LazyFS(); lazyfs != nil {
+		lg.Info("Removing data that was not fsynced")
+		err := lazyfs.ClearCache(ctx)
+		if err != nil {
+			return err
+		}
+	}
+
 	return member.Start(ctx)
 }
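
Condensed, both failpoints now perform the same power-loss sequence (member and lg are in scope inside Inject; the wait-for-exit step differs between the two failpoints and is elided here):

	member.Kill() // or trigger a gofail panic
	// ... wait for the etcd process to exit ...
	if lazyfs := member.LazyFS(); lazyfs != nil {
		lg.Info("Removing data that was not fsynced")
		if err := lazyfs.ClearCache(ctx); err != nil {
			return err
		}
	}
	return member.Start(ctx) // etcd must recover from fsynced state alone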

View File

@@ -16,6 +16,7 @@ package robustness
 import (
 	"context"
+	"path/filepath"
 	"testing"
 	"time"
@@ -34,39 +35,71 @@
 	"go.etcd.io/etcd/tests/v3/robustness/validate"
 )
 
+type TrafficProfile struct {
+	Traffic traffic.Traffic
+	Profile traffic.Profile
+}
+
+var trafficProfiles = []TrafficProfile{
+	{
+		Traffic: traffic.EtcdPut,
+		Profile: traffic.HighTrafficProfile,
+	},
+	{
+		Traffic: traffic.EtcdPutDeleteLease,
+		Profile: traffic.LowTraffic,
+	},
+	{
+		Traffic: traffic.Kubernetes,
+		Profile: traffic.HighTrafficProfile,
+	},
+	{
+		Traffic: traffic.Kubernetes,
+		Profile: traffic.LowTraffic,
+	},
+}
+
 func TestRobustness(t *testing.T) {
 	testRunner.BeforeTest(t)
 	v, err := e2e.GetVersionFromBinary(e2e.BinPath.Etcd)
 	if err != nil {
 		t.Fatalf("Failed checking etcd version binary, binary: %q, err: %v", e2e.BinPath.Etcd, err)
 	}
-	scenarios := []testScenario{}
-	for _, traffic := range []traffic.Config{traffic.LowTraffic, traffic.HighTraffic, traffic.KubernetesTraffic} {
-		scenarios = append(scenarios, testScenario{
-			name:    traffic.Name + "/ClusterOfSize1",
-			traffic: traffic,
-			cluster: *e2e.NewConfig(
-				e2e.WithClusterSize(1),
-				e2e.WithSnapshotCount(100),
-				e2e.WithGoFailEnabled(true),
-				e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints
-				e2e.WithWatchProcessNotifyInterval(100*time.Millisecond),
-			),
-		})
-		clusterOfSize3Options := []e2e.EPClusterOption{
-			e2e.WithIsPeerTLS(true),
-			e2e.WithSnapshotCount(100),
-			e2e.WithPeerProxy(true),
-			e2e.WithGoFailEnabled(true),
-			e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints
-			e2e.WithWatchProcessNotifyInterval(100 * time.Millisecond),
-		}
+	enableLazyFS := e2e.BinPath.LazyFSAvailable()
+	baseOptions := []e2e.EPClusterOption{
+		e2e.WithSnapshotCount(100),
+		e2e.WithGoFailEnabled(true),
+		e2e.WithCompactionBatchLimit(100),
+		e2e.WithWatchProcessNotifyInterval(100 * time.Millisecond),
+	}
+	scenarios := []testScenario{}
+	for _, tp := range trafficProfiles {
+		name := filepath.Join(tp.Traffic.Name(), tp.Profile.Name, "ClusterOfSize1")
+		clusterOfSize1Options := baseOptions
+		clusterOfSize1Options = append(clusterOfSize1Options, e2e.WithClusterSize(1))
+		// Add LazyFS only for traffic profiles with lower QPS, as LazyFS uses a lot of CPU and could prevent reaching the minimal QPS.
+		if enableLazyFS && tp.Profile.MinimalQPS <= 100 {
+			clusterOfSize1Options = append(clusterOfSize1Options, e2e.WithLazyFSEnabled(true))
+			name = filepath.Join(name, "LazyFS")
+		}
+		scenarios = append(scenarios, testScenario{
+			name:    name,
+			traffic: tp.Traffic,
+			cluster: *e2e.NewConfig(clusterOfSize1Options...),
+		})
+	}
+	for _, tp := range trafficProfiles {
+		name := filepath.Join(tp.Traffic.Name(), tp.Profile.Name, "ClusterOfSize3")
+		clusterOfSize3Options := baseOptions
+		clusterOfSize3Options = append(clusterOfSize3Options, e2e.WithIsPeerTLS(true))
+		clusterOfSize3Options = append(clusterOfSize3Options, e2e.WithPeerProxy(true))
 		if !v.LessThan(version.V3_6) {
 			clusterOfSize3Options = append(clusterOfSize3Options, e2e.WithSnapshotCatchUpEntries(100))
 		}
 		scenarios = append(scenarios, testScenario{
-			name:    traffic.Name + "/ClusterOfSize3",
-			traffic: traffic,
+			name:    name,
+			traffic: tp.Traffic,
 			cluster: *e2e.NewConfig(clusterOfSize3Options...),
 		})
 	}
@@ -89,7 +122,8 @@ func TestRobustness(t *testing.T) {
 	scenarios = append(scenarios, testScenario{
 		name:      "Issue13766",
 		failpoint: KillFailpoint,
-		traffic:   traffic.HighTraffic,
+		profile:   traffic.HighTrafficProfile,
+		traffic:   traffic.EtcdPut,
 		cluster: *e2e.NewConfig(
 			e2e.WithSnapshotCount(100),
 		),
@@ -108,7 +142,8 @@ func TestRobustness(t *testing.T) {
 	scenarios = append(scenarios, testScenario{
 		name:      "Issue15271",
 		failpoint: BlackholeUntilSnapshot,
-		traffic:   traffic.HighTraffic,
+		profile:   traffic.HighTrafficProfile,
+		traffic:   traffic.EtcdPut,
 		cluster: *e2e.NewConfig(
 			e2e.WithSnapshotCatchUpEntries(100),
 			e2e.WithSnapshotCount(100),
@@ -118,8 +153,11 @@ func TestRobustness(t *testing.T) {
 		})
 	}
 	for _, scenario := range scenarios {
-		if scenario.traffic == (traffic.Config{}) {
-			scenario.traffic = traffic.LowTraffic
+		if scenario.traffic == nil {
+			scenario.traffic = traffic.EtcdPutDeleteLease
+		}
+		if scenario.profile == (traffic.Profile{}) {
+			scenario.profile = traffic.LowTraffic
 		}
 		t.Run(scenario.name, func(t *testing.T) {
@@ -135,7 +173,8 @@ type testScenario struct {
 	name      string
 	failpoint Failpoint
 	cluster   e2e.EtcdProcessClusterConfig
-	traffic   traffic.Config
+	traffic   traffic.Traffic
+	profile   traffic.Profile
 	watch     watchConfig
 }
@@ -169,7 +208,7 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce
 	watchProgressNotifyEnabled := report.Cluster.Cfg.WatchProcessNotifyInterval != 0
 	validateGotAtLeastOneProgressNotify(t, report.Client, s.watch.requestProgress || watchProgressNotifyEnabled)
-	validateConfig := validate.Config{ExpectRevisionUnique: s.traffic.Traffic.ExpectUniqueRevision()}
+	validateConfig := validate.Config{ExpectRevisionUnique: s.traffic.ExpectUniqueRevision()}
 	report.Visualize = validate.ValidateAndReturnVisualize(t, lg, validateConfig, report.Client)
 
 	panicked = false
@@ -193,7 +232,7 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu
 	maxRevisionChan := make(chan int64, 1)
 	g.Go(func() error {
 		defer close(maxRevisionChan)
-		operationReport = traffic.SimulateTraffic(ctx, t, lg, clus, s.traffic, finishTraffic, baseTime, ids)
+		operationReport = traffic.SimulateTraffic(ctx, t, lg, clus, s.profile, s.traffic, finishTraffic, baseTime, ids)
 		maxRevisionChan <- operationsMaxRevision(operationReport)
 		return nil
 	})
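
Scenario names are now derived as Traffic.Name()/Profile.Name/ClusterOfSize{1,3}, with a trailing /LazyFS segment when it is enabled, so covering another combination is a one-line change (hypothetical sketch using the types declared above):

	trafficProfiles = append(trafficProfiles, TrafficProfile{
		Traffic: traffic.EtcdPut,
		Profile: traffic.LowTraffic, // MinimalQPS is 100, so LazyFS is added when available
	})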

View File

@@ -28,13 +28,7 @@ import (
 )
 
 var (
-	LowTraffic = Config{
-		Name:                           "LowTraffic",
-		minimalQPS:                     100,
-		maximalQPS:                     200,
-		clientCount:                    8,
-		maxNonUniqueRequestConcurrency: 3,
-		Traffic: etcdTraffic{
+	EtcdPutDeleteLease = etcdTraffic{
 		keyCount:     10,
 		leaseTTL:     DefaultLeaseTTL,
 		largePutSize: 32769,
@@ -51,15 +45,8 @@ var (
 			{choice: LeaseRevoke, weight: 5},
 			{choice: CompareAndSet, weight: 5},
 		},
-		},
 	}
-	HighTraffic = Config{
-		Name:                           "HighTraffic",
-		minimalQPS:                     200,
-		maximalQPS:                     1000,
-		clientCount:                    12,
-		maxNonUniqueRequestConcurrency: 3,
-		Traffic: etcdTraffic{
+	EtcdPut = etcdTraffic{
 		keyCount:     10,
 		largePutSize: 32769,
 		leaseTTL:     DefaultLeaseTTL,
@@ -72,7 +59,6 @@ var (
 			{choice: MultiOpTxn, weight: 5},
 			{choice: LargePut, weight: 5},
 		},
-		},
 	}
 )
@@ -104,6 +90,10 @@ const (
 	Defragment etcdRequestType = "defragment"
 )
 
+func (t etcdTraffic) Name() string {
+	return "Etcd"
+}
+
 func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) {
 	lastOperationSucceeded := true
 	var lastRev int64

View File

@@ -31,13 +31,7 @@ import (
 )
 
 var (
-	KubernetesTraffic = Config{
-		Name:                           "Kubernetes",
-		minimalQPS:                     200,
-		maximalQPS:                     1000,
-		clientCount:                    12,
-		maxNonUniqueRequestConcurrency: 3,
-		Traffic: kubernetesTraffic{
+	Kubernetes = kubernetesTraffic{
 		averageKeyCount: 10,
 		resource:        "pods",
 		namespace:       "default",
@@ -46,7 +40,6 @@ var (
 			{choice: KubernetesDelete, weight: 5},
 			{choice: KubernetesCreate, weight: 5},
 		},
-		},
 	}
 )
@@ -61,6 +54,10 @@ func (t kubernetesTraffic) ExpectUniqueRevision() bool {
 	return true
 }
 
+func (t kubernetesTraffic) Name() string {
+	return "Kubernetes"
+}
+
 func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) {
 	kc := &kubernetesClient{client: c}
 	s := newStorage()

View File

@@ -34,15 +34,30 @@ var (
 	RequestTimeout    = 40 * time.Millisecond
 	WatchTimeout      = 400 * time.Millisecond
 	MultiOpTxnOpCount = 4
+
+	LowTraffic = Profile{
+		Name:                           "LowTraffic",
+		MinimalQPS:                     100,
+		MaximalQPS:                     200,
+		ClientCount:                    8,
+		MaxNonUniqueRequestConcurrency: 3,
+	}
+	HighTrafficProfile = Profile{
+		Name:                           "HighTraffic",
+		MinimalQPS:                     200,
+		MaximalQPS:                     1000,
+		ClientCount:                    12,
+		MaxNonUniqueRequestConcurrency: 3,
+	}
 )
 
-func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, config Config, finish <-chan struct{}, baseTime time.Time, ids identity.Provider) []report.ClientReport {
+func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, profile Profile, traffic Traffic, finish <-chan struct{}, baseTime time.Time, ids identity.Provider) []report.ClientReport {
 	mux := sync.Mutex{}
 	endpoints := clus.EndpointsGRPC()
 
 	lm := identity.NewLeaseIdStorage()
 	reports := []report.ClientReport{}
-	limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200)
+	limiter := rate.NewLimiter(rate.Limit(profile.MaximalQPS), 200)
 
 	startTime := time.Now()
 	cc, err := NewClient(endpoints, ids, baseTime)
@@ -51,8 +66,8 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
 	}
 	defer cc.Close()
 	wg := sync.WaitGroup{}
-	nonUniqueWriteLimiter := NewConcurrencyLimiter(config.maxNonUniqueRequestConcurrency)
-	for i := 0; i < config.clientCount; i++ {
+	nonUniqueWriteLimiter := NewConcurrencyLimiter(profile.MaxNonUniqueRequestConcurrency)
+	for i := 0; i < profile.ClientCount; i++ {
 		wg.Add(1)
 		c, err := NewClient([]string{endpoints[i%len(endpoints)]}, ids, baseTime)
 		if err != nil {
@@ -62,7 +77,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
 			defer wg.Done()
 			defer c.Close()
 
-			config.Traffic.Run(ctx, c, limiter, ids, lm, nonUniqueWriteLimiter, finish)
+			traffic.Run(ctx, c, limiter, ids, lm, nonUniqueWriteLimiter, finish)
 			mux.Lock()
 			reports = append(reports, c.Report())
 			mux.Unlock()
@@ -87,22 +102,22 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
 	qps := float64(operationCount) / float64(endTime.Sub(startTime)) * float64(time.Second)
 	lg.Info("Average traffic", zap.Float64("qps", qps))
-	if qps < config.minimalQPS {
-		t.Errorf("Requiring minimal %f qps for test results to be reliable, got %f qps", config.minimalQPS, qps)
+	if qps < profile.MinimalQPS {
+		t.Errorf("Requiring minimal %f qps for test results to be reliable, got %f qps", profile.MinimalQPS, qps)
 	}
 	return reports
 }
 
-type Config struct {
-	Name                           string
-	minimalQPS                     float64
-	maximalQPS                     float64
-	maxNonUniqueRequestConcurrency int
-	clientCount                    int
-	Traffic                        Traffic
+type Profile struct {
+	Name                           string
+	MinimalQPS                     float64
+	MaximalQPS                     float64
+	MaxNonUniqueRequestConcurrency int
+	ClientCount                    int
 }
 
 type Traffic interface {
 	Run(ctx context.Context, c *RecordingClient, qpsLimiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{})
 	ExpectUniqueRevision() bool
+	Name() string
 }
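
With this split, a scenario passes one value of each: the Profile sets load bounds and client count, the Traffic decides the request mix. A call sketch, with ctx, t, lg, clus, finishTraffic, baseTime, and ids assumed from the caller (as in testScenario.run above):

	reports := traffic.SimulateTraffic(ctx, t, lg, clus,
		traffic.LowTraffic,         // Profile: QPS bounds, client count, concurrency cap
		traffic.EtcdPutDeleteLease, // Traffic: which requests to generate
		finishTraffic, baseTime, ids)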
} }