mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
externalize snapshot catchup entries to etcd flag
Signed-off-by: Chao Chen <chaochn@amazon.com>
This commit is contained in:
parent
e73f55d4e9
commit
2c46b2b299
@ -67,6 +67,7 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).
|
||||
- Add [Protection on maintenance request when auth is enabled](https://github.com/etcd-io/etcd/pull/14663).
|
||||
- Graduated [`--experimental-warning-unary-request-duration` to `--warning-unary-request-duration`](https://github.com/etcd-io/etcd/pull/14414). Note the experimental flag is deprecated and will be decommissioned in v3.7.
|
||||
- Add [field `hash_revision` into `HashKVResponse`](https://github.com/etcd-io/etcd/pull/14537).
|
||||
- Add [`etcd --experimental-snapshot-catch-up-entries`](https://github.com/etcd-io/etcd/pull/15033) flag to configure number of entries for a slow follower to catch up after compacting the raft storage entries and defaults to 5k.
|
||||
|
||||
### etcd grpc-proxy
|
||||
|
||||
|
@ -56,7 +56,6 @@ type ServerConfig struct {
|
||||
// We expect the follower has a millisecond level latency with the leader.
|
||||
// The max throughput is around 10K. Keep a 5K entries is enough for helping
|
||||
// follower to catch up.
|
||||
// WARNING: only change this for tests. Always use "DefaultSnapshotCatchUpEntries"
|
||||
SnapshotCatchUpEntries uint64
|
||||
|
||||
MaxSnapFiles uint
|
||||
|
@ -156,9 +156,7 @@ type Config struct {
|
||||
// We expect the follower has a millisecond level latency with the leader.
|
||||
// The max throughput is around 10K. Keep a 5K entries is enough for helping
|
||||
// follower to catch up.
|
||||
// WARNING: only change this for tests.
|
||||
// Always use "DefaultSnapshotCatchUpEntries"
|
||||
SnapshotCatchUpEntries uint64
|
||||
SnapshotCatchUpEntries uint64 `json:"experimental-snapshot-catch-up-entries"`
|
||||
|
||||
MaxSnapFiles uint `json:"max-snapshots"`
|
||||
MaxWalFiles uint `json:"max-wals"`
|
||||
|
@ -279,6 +279,7 @@ func newConfig() *config {
|
||||
fs.UintVar(&cfg.ec.ExperimentalBootstrapDefragThresholdMegabytes, "experimental-bootstrap-defrag-threshold-megabytes", 0, "Enable the defrag during etcd server bootstrap on condition that it will free at least the provided threshold of disk space. Needs to be set to non-zero value to take effect.")
|
||||
fs.IntVar(&cfg.ec.ExperimentalMaxLearners, "experimental-max-learners", membership.DefaultMaxLearners, "Sets the maximum number of learners that can be available in the cluster membership.")
|
||||
fs.DurationVar(&cfg.ec.ExperimentalWaitClusterReadyTimeout, "experimental-wait-cluster-ready-timeout", cfg.ec.ExperimentalWaitClusterReadyTimeout, "Maximum duration to wait for the cluster to be ready.")
|
||||
fs.Uint64Var(&cfg.ec.SnapshotCatchUpEntries, "experimental-snapshot-catchup-entries", cfg.ec.SnapshotCatchUpEntries, "Number of entries for a slow follower to catch up after compacting the raft storage entries.")
|
||||
|
||||
// unsafe
|
||||
fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.")
|
||||
|
@ -34,6 +34,7 @@ func TestConfigParsingMemberFlags(t *testing.T) {
|
||||
"-max-wals=10",
|
||||
"-max-snapshots=10",
|
||||
"-snapshot-count=10",
|
||||
"-experimental-snapshot-catchup-entries=1000",
|
||||
"-listen-peer-urls=http://localhost:8000,https://localhost:8001",
|
||||
"-listen-client-urls=http://localhost:7000,https://localhost:7001",
|
||||
// it should be set if -listen-client-urls is set
|
||||
@ -51,20 +52,22 @@ func TestConfigParsingMemberFlags(t *testing.T) {
|
||||
|
||||
func TestConfigFileMemberFields(t *testing.T) {
|
||||
yc := struct {
|
||||
Dir string `json:"data-dir"`
|
||||
MaxSnapFiles uint `json:"max-snapshots"`
|
||||
MaxWalFiles uint `json:"max-wals"`
|
||||
Name string `json:"name"`
|
||||
SnapshotCount uint64 `json:"snapshot-count"`
|
||||
LPUrls string `json:"listen-peer-urls"`
|
||||
LCUrls string `json:"listen-client-urls"`
|
||||
AcurlsCfgFile string `json:"advertise-client-urls"`
|
||||
Dir string `json:"data-dir"`
|
||||
MaxSnapFiles uint `json:"max-snapshots"`
|
||||
MaxWalFiles uint `json:"max-wals"`
|
||||
Name string `json:"name"`
|
||||
SnapshotCount uint64 `json:"snapshot-count"`
|
||||
SnapshotCatchUpEntries uint64 `json:"experimental-snapshot-catch-up-entries"`
|
||||
LPUrls string `json:"listen-peer-urls"`
|
||||
LCUrls string `json:"listen-client-urls"`
|
||||
AcurlsCfgFile string `json:"advertise-client-urls"`
|
||||
}{
|
||||
"testdir",
|
||||
10,
|
||||
10,
|
||||
"testname",
|
||||
10,
|
||||
1000,
|
||||
"http://localhost:8000,https://localhost:8001",
|
||||
"http://localhost:7000,https://localhost:7001",
|
||||
"http://localhost:7000,https://localhost:7001",
|
||||
@ -392,13 +395,14 @@ func mustCreateCfgFile(t *testing.T, b []byte) *os.File {
|
||||
|
||||
func validateMemberFlags(t *testing.T, cfg *config) {
|
||||
wcfg := &embed.Config{
|
||||
Dir: "testdir",
|
||||
LPUrls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}},
|
||||
LCUrls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}},
|
||||
MaxSnapFiles: 10,
|
||||
MaxWalFiles: 10,
|
||||
Name: "testname",
|
||||
SnapshotCount: 10,
|
||||
Dir: "testdir",
|
||||
LPUrls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}},
|
||||
LCUrls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}},
|
||||
MaxSnapFiles: 10,
|
||||
MaxWalFiles: 10,
|
||||
Name: "testname",
|
||||
SnapshotCount: 10,
|
||||
SnapshotCatchUpEntries: 1000,
|
||||
}
|
||||
|
||||
if cfg.ec.Dir != wcfg.Dir {
|
||||
@ -416,6 +420,9 @@ func validateMemberFlags(t *testing.T, cfg *config) {
|
||||
if cfg.ec.SnapshotCount != wcfg.SnapshotCount {
|
||||
t.Errorf("snapcount = %v, want %v", cfg.ec.SnapshotCount, wcfg.SnapshotCount)
|
||||
}
|
||||
if cfg.ec.SnapshotCatchUpEntries != wcfg.SnapshotCatchUpEntries {
|
||||
t.Errorf("snapshot catch up entries = %v, want %v", cfg.ec.SnapshotCatchUpEntries, wcfg.SnapshotCatchUpEntries)
|
||||
}
|
||||
if !reflect.DeepEqual(cfg.ec.LPUrls, wcfg.LPUrls) {
|
||||
t.Errorf("listen-peer-urls = %v, want %v", cfg.ec.LPUrls, wcfg.LPUrls)
|
||||
}
|
||||
|
@ -269,6 +269,8 @@ Experimental feature:
|
||||
Set the max number of learner members allowed in the cluster membership.
|
||||
--experimental-wait-cluster-ready-timeout '5s'
|
||||
Set the maximum time duration to wait for the cluster to be ready.
|
||||
--experimental-snapshot-catch-up-entries '5000'
|
||||
Number of entries for a slow follower to catch up after compacting the raft storage entries.
|
||||
|
||||
Unsafe feature:
|
||||
--force-new-cluster 'false'
|
||||
|
@ -20,10 +20,14 @@ import (
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"go.etcd.io/etcd/pkg/v3/expect"
|
||||
"go.etcd.io/etcd/tests/v3/framework/config"
|
||||
"go.etcd.io/etcd/tests/v3/framework/e2e"
|
||||
)
|
||||
|
||||
@ -359,4 +363,64 @@ func TestBootstrapDefragFlag(t *testing.T) {
|
||||
if err = proc.Stop(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// wait for the process to exit, otherwise test will have leaked goroutine
|
||||
if err := proc.Close(); err != nil {
|
||||
t.Logf("etcd process closed with error %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSnapshotCatchupEntriesFlag(t *testing.T) {
|
||||
e2e.SkipInShortMode(t)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
proc, err := e2e.SpawnCmd([]string{e2e.BinPath.Etcd, "--experimental-snapshot-catchup-entries", "1000"}, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, e2e.WaitReadyExpectProc(ctx, proc, []string{"\"snapshot-catchup-entries\":1000"}))
|
||||
require.NoError(t, e2e.WaitReadyExpectProc(ctx, proc, []string{"serving client traffic"}))
|
||||
require.NoError(t, proc.Stop())
|
||||
|
||||
// wait for the process to exit, otherwise test will have leaked goroutine
|
||||
if err := proc.Close(); err != nil {
|
||||
t.Logf("etcd process closed with error %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEtcdHealthyWithTinySnapshotCatchupEntries ensures multi-node etcd cluster remains healthy with 1 snapshot catch up entry
|
||||
func TestEtcdHealthyWithTinySnapshotCatchupEntries(t *testing.T) {
|
||||
e2e.BeforeTest(t)
|
||||
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t,
|
||||
e2e.WithClusterSize(3),
|
||||
e2e.WithSnapshotCount(1),
|
||||
e2e.WithSnapshotCatchUpEntries(1),
|
||||
)
|
||||
require.NoErrorf(t, err, "could not start etcd process cluster (%v)", err)
|
||||
t.Cleanup(func() {
|
||||
if errC := epc.Close(); errC != nil {
|
||||
t.Fatalf("error closing etcd processes (%v)", errC)
|
||||
}
|
||||
})
|
||||
|
||||
// simulate 10 clients keep writing to etcd in parallel with no error
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
for i := 0; i < 10; i++ {
|
||||
clientId := i
|
||||
g.Go(func() error {
|
||||
cc, err := e2e.NewEtcdctl(epc.Cfg.Client, epc.EndpointsV3())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for j := 0; j < 100; j++ {
|
||||
if err = cc.Put(ctx, "foo", fmt.Sprintf("bar%d", clientId), config.PutOptions{}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
require.NoError(t, g.Wait())
|
||||
}
|
||||
|
@ -147,7 +147,8 @@ type EtcdProcessClusterConfig struct {
|
||||
|
||||
MetricsURLScheme string
|
||||
|
||||
SnapshotCount int // default is 10000
|
||||
SnapshotCount int // default is 100000
|
||||
SnapshotCatchUpEntries int // default is 5000
|
||||
|
||||
Client ClientConfig
|
||||
IsPeerTLS bool
|
||||
@ -223,6 +224,10 @@ func WithSnapshotCount(count int) EPClusterOption {
|
||||
return func(c *EtcdProcessClusterConfig) { c.SnapshotCount = count }
|
||||
}
|
||||
|
||||
func WithSnapshotCatchUpEntries(count int) EPClusterOption {
|
||||
return func(c *EtcdProcessClusterConfig) { c.SnapshotCatchUpEntries = count }
|
||||
}
|
||||
|
||||
func WithClusterSize(size int) EPClusterOption {
|
||||
return func(c *EtcdProcessClusterConfig) { c.ClusterSize = size }
|
||||
}
|
||||
@ -548,6 +553,9 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
|
||||
if cfg.ExperimentalWarningUnaryRequestDuration != 0 {
|
||||
args = append(args, "--experimental-warning-unary-request-duration", cfg.ExperimentalWarningUnaryRequestDuration.String())
|
||||
}
|
||||
if cfg.SnapshotCatchUpEntries > 0 {
|
||||
args = append(args, "--experimental-snapshot-catchup-entries", fmt.Sprintf("%d", cfg.SnapshotCatchUpEntries))
|
||||
}
|
||||
envVars := map[string]string{}
|
||||
for key, value := range cfg.EnvVars {
|
||||
envVars[key] = value
|
||||
|
Loading…
x
Reference in New Issue
Block a user