diff --git a/CHANGELOG-3.4.md b/CHANGELOG-3.4.md
index b09da3c65..003128727 100644
--- a/CHANGELOG-3.4.md
+++ b/CHANGELOG-3.4.md
@@ -58,6 +58,8 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.3.0...v3.4.0) and [
   - e.g. exit with error on `ETCD_INITIAL_CLUSTER_TOKEN=abc etcd --initial-cluster-token=def`.
   - e.g. exit with error on `ETCDCTL_ENDPOINTS=abc.com ETCDCTL_API=3 etcdctl endpoint health --endpoints=def.com`.
 - Change [`etcdserverpb.AuthRoleRevokePermissionRequest/key,range_end` fields type from `string` to `bytes`](https://github.com/coreos/etcd/pull/9433).
+- Rename `etcdserver.ServerConfig.SnapCount` field to `etcdserver.ServerConfig.SnapshotCount`, to be consistent with the flag name `etcd --snapshot-count`.
+- Rename `embed.Config.SnapCount` field to [`embed.Config.SnapshotCount`](https://github.com/coreos/etcd/pull/9745), to be consistent with the flag name `etcd --snapshot-count`.
 - Change [`embed.Config.CorsInfo` in `*cors.CORSInfo` type to `embed.Config.CORS` in `map[string]struct{}` type](https://github.com/coreos/etcd/pull/9490).
 - Remove [`embed.Config.SetupLogging`](https://github.com/coreos/etcd/pull/9572).
   - Now logger is set up automatically based on [`embed.Config.Logger`, `embed.Config.LogOutputs`, `embed.Config.Debug` fields](https://github.com/coreos/etcd/pull/9572).
@@ -231,6 +233,7 @@ Note: **v3.5 will deprecate `etcd --log-package-levels` flag for `capnslog`**; `
 - Remove [`embed.Config.SetupLogging`](https://github.com/coreos/etcd/pull/9572).
   - Now logger is set up automatically based on [`embed.Config.Logger`, `embed.Config.LogOutputs`, `embed.Config.Debug` fields](https://github.com/coreos/etcd/pull/9572).
 - Add [`embed.Config.Logger`](https://github.com/coreos/etcd/pull/9518) to support [structured logger `zap`](https://github.com/uber-go/zap) in server-side.
+- Rename `embed.Config.SnapCount` field to [`embed.Config.SnapshotCount`](https://github.com/coreos/etcd/pull/9745), to be consistent with the flag name `etcd --snapshot-count`.
 - Rename [**`embed.Config.LogOutput`** to **`embed.Config.LogOutputs`**](https://github.com/coreos/etcd/pull/9624) to support multiple log outputs.
 - Change [**`embed.Config.LogOutputs`** type from `string` to `[]string`](https://github.com/coreos/etcd/pull/9579) to support multiple log outputs.
diff --git a/Documentation/upgrades/upgrade_3_4.md b/Documentation/upgrades/upgrade_3_4.md
index 90f24da10..f40a4a8bb 100644
--- a/Documentation/upgrades/upgrade_3_4.md
+++ b/Documentation/upgrades/upgrade_3_4.md
@@ -90,6 +90,30 @@ if err != nil {
 }
 ```
 
+#### Changed `embed.Config.SnapCount` to `embed.Config.SnapshotCount`
+
+To be consistent with the flag name `etcd --snapshot-count`, the `embed.Config.SnapCount` field has been renamed to `embed.Config.SnapshotCount`:
+
+```diff
+import "github.com/coreos/etcd/embed"
+
+cfg := embed.NewConfig()
+-cfg.SnapCount = 100000
++cfg.SnapshotCount = 100000
+```
+
+#### Changed `etcdserver.ServerConfig.SnapCount` to `etcdserver.ServerConfig.SnapshotCount`
+
+To be consistent with the flag name `etcd --snapshot-count`, the `etcdserver.ServerConfig.SnapCount` field has been renamed to `etcdserver.ServerConfig.SnapshotCount`:
+
+```diff
+import "github.com/coreos/etcd/etcdserver"
+
+srvcfg := etcdserver.ServerConfig{
+-	SnapCount: 100000,
++	SnapshotCount: 100000,
+}
+```
+
 #### Changed function signature in package `wal`
 
 Changed `wal` function signatures to support structured logger.
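Aside (not part of the patch): for embedded-etcd users the rename is mechanical. A minimal sketch of a v3.4-style embedded server using the renamed field; the data directory, timeout, and log messages here are illustrative, not taken from this patch:

```go
package main

import (
	"log"
	"time"

	"github.com/coreos/etcd/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd" // illustrative data directory
	// Formerly cfg.SnapCount; renamed in v3.4 to match `etcd --snapshot-count`.
	cfg.SnapshotCount = 100000

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()

	select {
	case <-e.Server.ReadyNotify():
		log.Println("embedded etcd is ready")
	case <-time.After(60 * time.Second):
		e.Server.Stop() // trigger a shutdown
		log.Fatal("embedded etcd took too long to start")
	}
}
```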
diff --git a/contrib/raftexample/raft.go b/contrib/raftexample/raft.go
index 0774ceed7..18c720d83 100644
--- a/contrib/raftexample/raft.go
+++ b/contrib/raftexample/raft.go
@@ -71,7 +71,7 @@ type raftNode struct {
 	httpdonec chan struct{} // signals http server shutdown complete
 }
 
-var defaultSnapCount uint64 = 10000
+var defaultSnapshotCount uint64 = 10000
 
 // newRaftNode initiates a raft instance and returns a committed log entry
 // channel and error channel. Proposals for log updates are sent over the
@@ -95,7 +95,7 @@ func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte,
 		waldir:      fmt.Sprintf("raftexample-%d", id),
 		snapdir:     fmt.Sprintf("raftexample-%d-snap", id),
 		getSnapshot: getSnapshot,
-		snapCount:   defaultSnapCount,
+		snapCount:   defaultSnapshotCount,
 		stopc:       make(chan struct{}),
 		httpstopc:   make(chan struct{}),
 		httpdonec:   make(chan struct{}),
diff --git a/embed/config.go b/embed/config.go
index 4289b615b..597b9cca8 100644
--- a/embed/config.go
+++ b/embed/config.go
@@ -111,12 +111,23 @@ func init() {
 
 // Config holds the arguments for configuring an etcd server.
 type Config struct {
-	Name         string `json:"name"`
-	Dir          string `json:"data-dir"`
-	WalDir       string `json:"wal-dir"`
-	SnapCount    uint64 `json:"snapshot-count"`
-	MaxSnapFiles uint   `json:"max-snapshots"`
-	MaxWalFiles  uint   `json:"max-wals"`
+	Name   string `json:"name"`
+	Dir    string `json:"data-dir"`
+	WalDir string `json:"wal-dir"`
+
+	SnapshotCount uint64 `json:"snapshot-count"`
+
+	// SnapshotCatchUpEntries is the number of entries for a slow follower
+	// to catch up after compacting the raft storage entries.
+	// We expect the follower to have a millisecond-level latency with the leader.
+	// The max throughput is around 10K. Keeping 5K entries is enough to help
+	// a follower catch up.
+	// WARNING: only change this for tests.
+	// Always use "DefaultSnapshotCatchUpEntries".
+	SnapshotCatchUpEntries uint64
+
+	MaxSnapFiles uint `json:"max-snapshots"`
+	MaxWalFiles  uint `json:"max-wals"`
 
 	// TickMs is the number of milliseconds between heartbeat ticks.
 	// TODO: decouple tickMs and heartbeat tick (current heartbeat tick = 1).
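Aside (not part of the patch): only the Go field name changes here; the `json:"snapshot-count"` tag is untouched, so existing config files keep working. A standalone sketch of that mechanism, using plain `encoding/json` to stay dependency-free; the `cfg` struct is a hypothetical stand-in for `embed.Config`, whose YAML config files are decoded through these same JSON tags:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// cfg stands in for embed.Config: the Go field is renamed,
// but the tag, and therefore the config-file key, stays "snapshot-count".
type cfg struct {
	SnapshotCount uint64 `json:"snapshot-count"` // was: SnapCount
}

func main() {
	raw := []byte(`{"snapshot-count": 10000}`)
	var c cfg
	if err := json.Unmarshal(raw, &c); err != nil {
		panic(err)
	}
	fmt.Println(c.SnapshotCount) // 10000: old config files keep working
}
```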
@@ -342,7 +353,9 @@ func NewConfig() *Config {
 		Name: DefaultName,
 
-		SnapCount: etcdserver.DefaultSnapCount,
+		SnapshotCount:          etcdserver.DefaultSnapshotCount,
+		SnapshotCatchUpEntries: etcdserver.DefaultSnapshotCatchUpEntries,
+
 		MaxTxnOps:       DefaultMaxTxnOps,
 		MaxRequestBytes: DefaultMaxRequestBytes,
diff --git a/embed/etcd.go b/embed/etcd.go
index 50c8f283e..1626a2155 100644
--- a/embed/etcd.go
+++ b/embed/etcd.go
@@ -163,7 +163,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 		PeerURLs:           cfg.APUrls,
 		DataDir:            cfg.Dir,
 		DedicatedWALDir:    cfg.WalDir,
-		SnapCount:          cfg.SnapCount,
+		SnapshotCount:      cfg.SnapshotCount,
 		MaxSnapFiles:       cfg.MaxSnapFiles,
 		MaxWALFiles:        cfg.MaxWalFiles,
 		InitialPeerURLsMap: urlsmap,
diff --git a/etcdmain/config.go b/etcdmain/config.go
index c969e8a16..43b1d094d 100644
--- a/etcdmain/config.go
+++ b/etcdmain/config.go
@@ -150,7 +150,7 @@ func newConfig() *config {
 	fs.UintVar(&cfg.ec.MaxSnapFiles, "max-snapshots", cfg.ec.MaxSnapFiles, "Maximum number of snapshot files to retain (0 is unlimited).")
 	fs.UintVar(&cfg.ec.MaxWalFiles, "max-wals", cfg.ec.MaxWalFiles, "Maximum number of wal files to retain (0 is unlimited).")
 	fs.StringVar(&cfg.ec.Name, "name", cfg.ec.Name, "Human-readable name for this member.")
-	fs.Uint64Var(&cfg.ec.SnapCount, "snapshot-count", cfg.ec.SnapCount, "Number of committed transactions to trigger a snapshot to disk.")
+	fs.Uint64Var(&cfg.ec.SnapshotCount, "snapshot-count", cfg.ec.SnapshotCount, "Number of committed transactions to trigger a snapshot to disk.")
 	fs.UintVar(&cfg.ec.TickMs, "heartbeat-interval", cfg.ec.TickMs, "Time (in milliseconds) of a heartbeat interval.")
 	fs.UintVar(&cfg.ec.ElectionMs, "election-timeout", cfg.ec.ElectionMs, "Time (in milliseconds) for an election to timeout.")
 	fs.BoolVar(&cfg.ec.InitialElectionTickAdvance, "initial-election-tick-advance", cfg.ec.InitialElectionTickAdvance, "Whether to fast-forward initial election ticks on boot for faster election.")
diff --git a/etcdmain/config_test.go b/etcdmain/config_test.go
index ef202a417..56d3f60dc 100644
--- a/etcdmain/config_test.go
+++ b/etcdmain/config_test.go
@@ -55,7 +55,7 @@ func TestConfigFileMemberFields(t *testing.T) {
 		MaxSnapFiles  uint   `json:"max-snapshots"`
 		MaxWalFiles   uint   `json:"max-wals"`
 		Name          string `json:"name"`
-		SnapCount     uint64 `json:"snapshot-count"`
+		SnapshotCount uint64 `json:"snapshot-count"`
 		LPUrls        string `json:"listen-peer-urls"`
 		LCUrls        string `json:"listen-client-urls"`
 		AcurlsCfgFile string `json:"advertise-client-urls"`
@@ -513,13 +513,13 @@ func mustCreateCfgFile(t *testing.T, b []byte) *os.File {
 
 func validateMemberFlags(t *testing.T, cfg *config) {
 	wcfg := &embed.Config{
-		Dir:          "testdir",
-		LPUrls:       []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}},
-		LCUrls:       []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}},
-		MaxSnapFiles: 10,
-		MaxWalFiles:  10,
-		Name:         "testname",
-		SnapCount:    10,
+		Dir:           "testdir",
+		LPUrls:        []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}},
+		LCUrls:        []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}},
+		MaxSnapFiles:  10,
+		MaxWalFiles:   10,
+		Name:          "testname",
+		SnapshotCount: 10,
 	}
 
 	if cfg.ec.Dir != wcfg.Dir {
@@ -534,8 +534,8 @@ func validateMemberFlags(t *testing.T, cfg *config) {
 	if cfg.ec.Name != wcfg.Name {
 		t.Errorf("name = %v, want %v", cfg.ec.Name, wcfg.Name)
 	}
-	if cfg.ec.SnapCount != wcfg.SnapCount {
-		t.Errorf("snapcount = %v, want %v", cfg.ec.SnapCount, wcfg.SnapCount)
+	if cfg.ec.SnapshotCount != wcfg.SnapshotCount {
+		t.Errorf("snapcount = %v, want %v", cfg.ec.SnapshotCount, wcfg.SnapshotCount)
 	}
 	if !reflect.DeepEqual(cfg.ec.LPUrls, wcfg.LPUrls) {
 		t.Errorf("listen-peer-urls = %v, want %v", cfg.ec.LPUrls, wcfg.LPUrls)
diff --git a/etcdserver/config.go b/etcdserver/config.go
index 767b6c3a0..0da9bc29f 100644
--- a/etcdserver/config.go
+++ b/etcdserver/config.go
@@ -40,10 +40,21 @@ type ServerConfig struct {
 	DataDir string
 	// DedicatedWALDir config will make the etcd to write the WAL to the WALDir
 	// rather than the dataDir/member/wal.
-	DedicatedWALDir string
-	SnapCount       uint64
-	MaxSnapFiles    uint
-	MaxWALFiles     uint
+	DedicatedWALDir string
+
+	SnapshotCount uint64
+
+	// SnapshotCatchUpEntries is the number of entries for a slow follower
+	// to catch up after compacting the raft storage entries.
+	// We expect the follower to have a millisecond-level latency with the leader.
+	// The max throughput is around 10K. Keeping 5K entries is enough to help
+	// a follower catch up.
+	// WARNING: only change this for tests. Always use "DefaultSnapshotCatchUpEntries".
+	SnapshotCatchUpEntries uint64
+
+	MaxSnapFiles uint
+	MaxWALFiles  uint
+
 	InitialPeerURLsMap  types.URLsMap
 	InitialClusterToken string
 	NewCluster          bool
@@ -273,7 +284,7 @@ func (c *ServerConfig) print(initial bool) {
 	}
 	plog.Infof("heartbeat = %dms", c.TickMs)
 	plog.Infof("election = %dms", c.ElectionTicks*int(c.TickMs))
-	plog.Infof("snapshot count = %d", c.SnapCount)
+	plog.Infof("snapshot count = %d", c.SnapshotCount)
 	if len(c.DiscoveryURL) != 0 {
 		plog.Infof("discovery URL= %s", c.DiscoveryURL)
 		if len(c.DiscoveryProxy) != 0 {
@@ -302,7 +313,8 @@ func (c *ServerConfig) print(initial bool) {
 			zap.Int("election-tick-ms", c.ElectionTicks),
 			zap.String("election-timeout", fmt.Sprintf("%v", time.Duration(c.ElectionTicks*int(c.TickMs))*time.Millisecond)),
 			zap.Bool("initial-election-tick-advance", c.InitialElectionTickAdvance),
-			zap.Uint64("snapshot-count", c.SnapCount),
+			zap.Uint64("snapshot-count", c.SnapshotCount),
+			zap.Uint64("snapshot-catchup-entries", c.SnapshotCatchUpEntries),
 			zap.Strings("advertise-client-urls", c.getACURLs()),
 			zap.Strings("initial-advertise-peer-urls", c.getAPURLs()),
 			zap.Bool("initial", initial),
diff --git a/etcdserver/raft.go b/etcdserver/raft.go
index 4b3ad2808..f10190221 100644
--- a/etcdserver/raft.go
+++ b/etcdserver/raft.go
@@ -39,13 +39,6 @@ import (
 )
 
 const (
-	// Number of entries for slow follower to catch-up after compacting
-	// the raft storage entries.
-	// We expect the follower has a millisecond level latency with the leader.
-	// The max throughput is around 10K. Keep a 5K entries is enough for helping
-	// follower to catch up.
-	numberOfCatchUpEntries = 5000
-
 	// The max throughput of etcd will not exceed 100MB/s (100K * 1KB value).
 	// Assuming the RTT is around 10ms, 1MB max size is large enough.
 	maxSizePerMsg = 1 * 1024 * 1024
diff --git a/etcdserver/server.go b/etcdserver/server.go
index 12682b29b..fe9e188a8 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -64,7 +64,14 @@ import (
 )
 
 const (
-	DefaultSnapCount = 100000
+	DefaultSnapshotCount = 100000
+
+	// DefaultSnapshotCatchUpEntries is the number of entries for a slow follower
+	// to catch up after compacting the raft storage entries.
+	// We expect the follower to have a millisecond-level latency with the leader.
+	// The max throughput is around 10K. Keeping 5K entries is enough to help
+	// a follower catch up.
+	DefaultSnapshotCatchUpEntries uint64 = 5000
 
 	StoreClusterPrefix = "/0"
 	StoreKeysPrefix    = "/1"
@@ -703,14 +710,30 @@ func (s *EtcdServer) Start() {
 // This function is just used for testing.
 func (s *EtcdServer) start() {
 	lg := s.getLogger()
 
-	if s.Cfg.SnapCount == 0 {
-		if lg != nil {
+	if s.Cfg.SnapshotCount == 0 {
+		if lg != nil {
+			lg.Info(
+				"updating snapshot-count to default",
+				zap.Uint64("given-snapshot-count", s.Cfg.SnapshotCount),
+				zap.Uint64("updated-snapshot-count", DefaultSnapshotCount),
+			)
 		} else {
-			plog.Infof("set snapshot count to default %d", DefaultSnapCount)
+			plog.Infof("set snapshot count to default %d", DefaultSnapshotCount)
 		}
-		s.Cfg.SnapCount = DefaultSnapCount
+		s.Cfg.SnapshotCount = DefaultSnapshotCount
 	}
+	if s.Cfg.SnapshotCatchUpEntries == 0 {
+		if lg != nil {
+			lg.Info(
+				"updating snapshot catch-up entries to default",
+				zap.Uint64("given-snapshot-catchup-entries", s.Cfg.SnapshotCatchUpEntries),
+				zap.Uint64("updated-snapshot-catchup-entries", DefaultSnapshotCatchUpEntries),
+			)
+		}
+		s.Cfg.SnapshotCatchUpEntries = DefaultSnapshotCatchUpEntries
+	}
+
 	s.w = wait.New()
 	s.applyWait = wait.NewTimeList()
 	s.done = make(chan struct{})
@@ -743,6 +766,7 @@ func (s *EtcdServer) start() {
 			plog.Infof("starting server... [version: %v, cluster version: to_be_decided]", version.Version)
 		}
 	}
+
 	// TODO: if this is an empty log, writes all peer infos
 	// into the first entry
 	go s.run()
@@ -1058,7 +1082,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 			"applying snapshot",
 			zap.Uint64("current-snapshot-index", ep.snapi),
 			zap.Uint64("current-applied-index", ep.appliedi),
-			zap.Uint64("incoming-snapshot-index", apply.snapshot.Metadata.Index),
+			zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
+			zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
 		)
 	} else {
 		plog.Infof("applying snapshot at index %d...", ep.snapi)
@@ -1069,7 +1094,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 			"applied snapshot",
 			zap.Uint64("current-snapshot-index", ep.snapi),
 			zap.Uint64("current-applied-index", ep.appliedi),
-			zap.Uint64("incoming-snapshot-index", apply.snapshot.Metadata.Index),
+			zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
+			zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
 		)
 	} else {
 		plog.Infof("finished applying incoming snapshot at index %d", ep.snapi)
@@ -1083,6 +1109,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 			zap.Uint64("current-snapshot-index", ep.snapi),
 			zap.Uint64("current-applied-index", ep.appliedi),
 			zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
+			zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
 		)
 	} else {
 		plog.Panicf("snapshot index [%d] should > appliedi[%d] + 1",
@@ -1304,7 +1331,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
 }
 
 func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
-	if ep.appliedi-ep.snapi <= s.Cfg.SnapCount {
+	if ep.appliedi-ep.snapi <= s.Cfg.SnapshotCount {
 		return
 	}
 
@@ -1314,7 +1341,7 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
 			zap.String("local-member-id", s.ID().String()),
 			zap.Uint64("local-member-applied-index", ep.appliedi),
 			zap.Uint64("local-member-snapshot-index", ep.snapi),
-			zap.Uint64("local-member-snapshot-count", s.Cfg.SnapCount),
+			zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
 		)
 	} else {
 		plog.Infof("start to snapshot (applied: %d, lastsnap: %d)", ep.appliedi, ep.snapi)
@@ -2132,9 +2159,10 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 	// keep some in memory log entries for slow followers.
 	compacti := uint64(1)
-	if snapi > numberOfCatchUpEntries {
-		compacti = snapi - numberOfCatchUpEntries
+	if snapi > s.Cfg.SnapshotCatchUpEntries {
+		compacti = snapi - s.Cfg.SnapshotCatchUpEntries
 	}
+
 	err = s.r.raftStorage.Compact(compacti)
 	if err != nil {
 		// the compaction was done asynchronously with the progress of raft.
diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go
index f22b64b1a..a99aaed9a 100644
--- a/etcdserver/server_test.go
+++ b/etcdserver/server_test.go
@@ -714,7 +714,7 @@ func TestDoProposal(t *testing.T) {
 		srv := &EtcdServer{
 			lgMu: new(sync.RWMutex),
 			lg:   zap.NewExample(),
-			Cfg:  ServerConfig{Logger: zap.NewExample(), TickMs: 1},
+			Cfg:  ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 			r:    *r,
 			v2store:  st,
 			reqIDGen: idutil.NewGenerator(0, time.Time{}),
@@ -745,7 +745,7 @@ func TestDoProposalCancelled(t *testing.T) {
 	srv := &EtcdServer{
 		lgMu: new(sync.RWMutex),
 		lg:   zap.NewExample(),
-		Cfg:  ServerConfig{Logger: zap.NewExample(), TickMs: 1},
+		Cfg:  ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:    *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
 		w:    wt,
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
@@ -769,7 +769,7 @@ func TestDoProposalTimeout(t *testing.T) {
 	srv := &EtcdServer{
 		lgMu: new(sync.RWMutex),
 		lg:   zap.NewExample(),
-		Cfg:  ServerConfig{Logger: zap.NewExample(), TickMs: 1},
+		Cfg:  ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:    *newRaftNode(raftNodeConfig{Node: newNodeNop()}),
 		w:    mockwait.NewNop(),
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
@@ -788,7 +788,7 @@ func TestDoProposalStopped(t *testing.T) {
 	srv := &EtcdServer{
 		lgMu: new(sync.RWMutex),
 		lg:   zap.NewExample(),
-		Cfg:  ServerConfig{Logger: zap.NewExample(), TickMs: 1},
+		Cfg:  ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:    *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: newNodeNop()}),
 		w:    mockwait.NewNop(),
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
@@ -899,7 +899,7 @@ func TestSyncTrigger(t *testing.T) {
 	srv := &EtcdServer{
 		lgMu: new(sync.RWMutex),
 		lg:   zap.NewExample(),
-		Cfg:  ServerConfig{Logger: zap.NewExample(), TickMs: 1},
+		Cfg:  ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:    *r,
 		v2store:    mockstore.NewNop(),
 		SyncTicker: tk,
@@ -1033,7 +1033,7 @@ func TestSnapshotOrdering(t *testing.T) {
 	s := &EtcdServer{
 		lgMu: new(sync.RWMutex),
 		lg:   zap.NewExample(),
-		Cfg:  ServerConfig{Logger: zap.NewExample(), DataDir: testdir},
+		Cfg:  ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:    *r,
 		v2store:     st,
 		snapshotter: raftsnap.New(zap.NewExample(), snapdir),
@@ -1077,7 +1077,7 @@ func TestSnapshotOrdering(t *testing.T) {
 	}
 }
 
-// Applied > SnapCount should trigger a SaveSnap event
+// Applied > SnapshotCount should trigger a SaveSnap event
 func TestTriggerSnap(t *testing.T) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
 	defer func() {
@@ -1097,7 +1097,7 @@ func TestTriggerSnap(t *testing.T) {
 	srv := &EtcdServer{
 		lgMu: new(sync.RWMutex),
 		lg:   zap.NewExample(),
-		Cfg:  ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapCount: uint64(snapc)},
+		Cfg:  ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCount: uint64(snapc), SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:    *r,
 		v2store:  st,
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
@@ -1116,7 +1116,7 @@ func TestTriggerSnap(t *testing.T) {
 	gaction, _ := p.Wait(wcnt)
 
 	// each operation is recorded as a Save
-	// (SnapCount+1) * Puts + SaveSnap = (SnapCount+1) * Save + SaveSnap
+	// (SnapshotCount+1) * Puts + SaveSnap = (SnapshotCount+1) * Save + SaveSnap
 	if len(gaction) != wcnt {
 		t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt)
 	}
@@ -1164,7 +1164,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
 	s := &EtcdServer{
 		lgMu: new(sync.RWMutex),
 		lg:   zap.NewExample(),
-		Cfg:  ServerConfig{Logger: zap.NewExample(), DataDir: testdir},
+		Cfg:  ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:    *r,
 		v2store:     st,
 		snapshotter: raftsnap.New(zap.NewExample(), testdir),
@@ -1375,7 +1375,7 @@ func TestPublish(t *testing.T) {
 		lgMu:    new(sync.RWMutex),
 		lg:      zap.NewExample(),
 		readych: make(chan struct{}),
-		Cfg:     ServerConfig{Logger: zap.NewExample(), TickMs: 1},
+		Cfg:     ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		id:      1,
 		r:       *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
 		attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
@@ -1428,7 +1428,7 @@ func TestPublishStopped(t *testing.T) {
 	srv := &EtcdServer{
 		lgMu: new(sync.RWMutex),
 		lg:   zap.NewExample(),
-		Cfg:  ServerConfig{Logger: zap.NewExample(), TickMs: 1},
+		Cfg:  ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:    *r,
 		cluster: &membership.RaftCluster{},
 		w:       mockwait.NewNop(),
@@ -1452,7 +1452,7 @@ func TestPublishRetry(t *testing.T) {
 	srv := &EtcdServer{
 		lgMu: new(sync.RWMutex),
 		lg:   zap.NewExample(),
-		Cfg:  ServerConfig{Logger: zap.NewExample(), TickMs: 1},
+		Cfg:  ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:    *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
 		w:    mockwait.NewNop(),
 		stopping: make(chan struct{}),
@@ -1495,7 +1495,7 @@ func TestUpdateVersion(t *testing.T) {
 		lgMu: new(sync.RWMutex),
 		lg:   zap.NewExample(),
 		id:   1,
-		Cfg:  ServerConfig{Logger: zap.NewExample(), TickMs: 1},
+		Cfg:  ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries},
 		r:    *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}),
 		attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}},
 		cluster:    &membership.RaftCluster{},
diff --git a/functional/rpcpb/etcd_config.go b/functional/rpcpb/etcd_config.go
index e84a6e6cc..bc3f8b34b 100644
--- a/functional/rpcpb/etcd_config.go
+++ b/functional/rpcpb/etcd_config.go
@@ -160,7 +160,7 @@ func (e *Etcd) EmbedConfig() (cfg *embed.Config, err error) {
 	cfg.ClusterState = e.InitialClusterState
 	cfg.InitialClusterToken = e.InitialClusterToken
-	cfg.SnapCount = uint64(e.SnapshotCount)
+	cfg.SnapshotCount = uint64(e.SnapshotCount)
 	cfg.QuotaBackendBytes = e.QuotaBackendBytes
 	cfg.PreVote = e.PreVote
 
diff --git a/functional/rpcpb/etcd_config_test.go b/functional/rpcpb/etcd_config_test.go
index 337b31353..480098e41 100644
--- a/functional/rpcpb/etcd_config_test.go
+++ b/functional/rpcpb/etcd_config_test.go
@@ -128,7 +128,7 @@ func TestEtcd(t *testing.T) {
 	expc.InitialCluster = "s1=https://127.0.0.1:13800,s2=https://127.0.0.1:23800,s3=https://127.0.0.1:33800"
"s1=https://127.0.0.1:13800,s2=https://127.0.0.1:23800,s3=https://127.0.0.1:33800" expc.ClusterState = "new" expc.InitialClusterToken = "tkn" - expc.SnapCount = 10000 + expc.SnapshotCount = 10000 expc.QuotaBackendBytes = 10740000000 expc.PreVote = true expc.ExperimentalInitialCorruptCheck = true diff --git a/integration/cluster.go b/integration/cluster.go index ccd232f6f..b32895b96 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -117,17 +117,25 @@ func init() { } type ClusterConfig struct { - Size int - PeerTLS *transport.TLSInfo - ClientTLS *transport.TLSInfo - DiscoveryURL string - UseGRPC bool - QuotaBackendBytes int64 - MaxTxnOps uint - MaxRequestBytes uint + Size int + PeerTLS *transport.TLSInfo + ClientTLS *transport.TLSInfo + + DiscoveryURL string + + UseGRPC bool + + QuotaBackendBytes int64 + + MaxTxnOps uint + MaxRequestBytes uint + SnapshotCount uint64 + SnapshotCatchUpEntries uint64 + GRPCKeepAliveMinTime time.Duration GRPCKeepAliveInterval time.Duration GRPCKeepAliveTimeout time.Duration + // SkipCreatingClient to skip creating clients for each member. SkipCreatingClient bool @@ -269,6 +277,8 @@ func (c *cluster) mustNewMember(t *testing.T) *member { quotaBackendBytes: c.cfg.QuotaBackendBytes, maxTxnOps: c.cfg.MaxTxnOps, maxRequestBytes: c.cfg.MaxRequestBytes, + snapshotCount: c.cfg.SnapshotCount, + snapshotCatchUpEntries: c.cfg.SnapshotCatchUpEntries, grpcKeepAliveMinTime: c.cfg.GRPCKeepAliveMinTime, grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval, grpcKeepAliveTimeout: c.cfg.GRPCKeepAliveTimeout, @@ -550,6 +560,8 @@ type memberConfig struct { quotaBackendBytes int64 maxTxnOps uint maxRequestBytes uint + snapshotCount uint64 + snapshotCatchUpEntries uint64 grpcKeepAliveMinTime time.Duration grpcKeepAliveInterval time.Duration grpcKeepAliveTimeout time.Duration @@ -612,6 +624,14 @@ func mustNewMember(t *testing.T, mcfg memberConfig) *member { if m.MaxRequestBytes == 0 { m.MaxRequestBytes = embed.DefaultMaxRequestBytes } + m.SnapshotCount = etcdserver.DefaultSnapshotCount + if mcfg.snapshotCount != 0 { + m.SnapshotCount = mcfg.snapshotCount + } + m.SnapshotCatchUpEntries = etcdserver.DefaultSnapshotCatchUpEntries + if mcfg.snapshotCatchUpEntries != 0 { + m.SnapshotCatchUpEntries = mcfg.snapshotCatchUpEntries + } m.AuthToken = "simple" // for the purpose of integration testing, simple token is enough m.BcryptCost = uint(bcrypt.MinCost) // use min bcrypt cost to speedy up integration testing diff --git a/integration/cluster_test.go b/integration/cluster_test.go index a706f0dd4..2e68c3286 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -251,7 +251,7 @@ func testIssue2746(t *testing.T, members int) { c := NewCluster(t, members) for _, m := range c.Members { - m.SnapCount = 10 + m.SnapshotCount = 10 } c.Launch(t) diff --git a/integration/member_test.go b/integration/member_test.go index a56dd4be8..97c84bb13 100644 --- a/integration/member_test.go +++ b/integration/member_test.go @@ -86,7 +86,7 @@ func TestLaunchDuplicateMemberShouldFail(t *testing.T) { func TestSnapshotAndRestartMember(t *testing.T) { defer testutil.AfterTest(t) m := mustNewMember(t, memberConfig{name: "snapAndRestartTest"}) - m.SnapCount = 100 + m.SnapshotCount = 100 m.Launch() defer m.Terminate(t) m.WaitOK(t) diff --git a/integration/v3_watch_test.go b/integration/v3_watch_test.go index c91f4df65..2487e41d3 100644 --- a/integration/v3_watch_test.go +++ b/integration/v3_watch_test.go @@ -352,6 +352,84 @@ func TestV3WatchFutureRevision(t *testing.T) { } } +// 
+// TestV3WatchRestoreSnapshotUnsync tests whether a slow follower can restore
+// from a leader snapshot, and still notify on watchers from an old revision
+// that were originally created in the synced watcher group.
+func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
+	clus := NewClusterV3(t, &ClusterConfig{
+		Size:                   3,
+		SnapshotCount:          10,
+		SnapshotCatchUpEntries: 5,
+	})
+	defer clus.Terminate(t)
+
+	// spawn a watcher before shutdown, and put it in the synced watcher group
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+	wStream, errW := toGRPC(clus.Client(0)).Watch.Watch(ctx)
+	if errW != nil {
+		t.Fatal(errW)
+	}
+	if err := wStream.Send(&pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
+		CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: 5}}}); err != nil {
+		t.Fatalf("wStream.Send error: %v", err)
+	}
+	wresp, errR := wStream.Recv()
+	if errR != nil {
+		t.Errorf("wStream.Recv error: %v", errR)
+	}
+	if !wresp.Created {
+		t.Errorf("wresp.Created got = %v, want = true", wresp.Created)
+	}
+
+	clus.Members[0].InjectPartition(t, clus.Members[1:]...)
+	clus.waitLeader(t, clus.Members[1:])
+	time.Sleep(2 * time.Second)
+
+	kvc := toGRPC(clus.Client(1)).KV
+
+	// write enough keys to trigger a snapshot from the leader to the partitioned follower
+	for i := 0; i < 15; i++ {
+		_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
+		if err != nil {
+			t.Errorf("#%d: couldn't put key (%v)", i, err)
+		}
+	}
+
+	// trigger snapshot send from leader to this slow follower,
+	// which then calls watchable store Restore
+	clus.Members[0].RecoverPartition(t, clus.Members[1:]...)
+	clus.WaitLeader(t)
+	time.Sleep(2 * time.Second)
+
+	// The slow follower now applies the leader snapshot, and should be able
+	// to notify on old-revision watchers in unsynced. Make sure the restore
+	// watch operation correctly moves watchers between the synced and
+	// unsynced watcher groups.
+	errc := make(chan error)
+	go func() {
+		cresp, cerr := wStream.Recv()
+		if cerr != nil {
+			errc <- cerr
+			return
+		}
+		// from start revision 5 to latest revision 16
+		if len(cresp.Events) != 12 {
+			errc <- fmt.Errorf("expected 12 events, got %+v", cresp.Events)
+			return
+		}
+		errc <- nil
+	}()
+	select {
+	case <-time.After(10 * time.Second):
+		t.Fatal("took too long to receive events from restored watcher")
+	case err := <-errc:
+		if err != nil {
+			t.Fatalf("wStream.Recv error: %v", err)
+		}
+	}
+}
+
 // TestV3WatchWrongRange tests wrong range does not create watchers.
 func TestV3WatchWrongRange(t *testing.T) {
 	defer testutil.AfterTest(t)
diff --git a/tests/e2e/cluster_test.go b/tests/e2e/cluster_test.go
index 65be68980..133b3ee35 100644
--- a/tests/e2e/cluster_test.go
+++ b/tests/e2e/cluster_test.go
@@ -108,7 +108,7 @@ type etcdProcessClusterConfig struct {
 
 	metricsURLScheme string
 
-	snapCount int // default is 10000
+	snapshotCount int // default is 10000
 
 	clientTLS             clientConnType
 	clientCertAuthEnabled bool
@@ -175,8 +175,8 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
 	if cfg.execPath == "" {
 		cfg.execPath = binPath
 	}
-	if cfg.snapCount == 0 {
-		cfg.snapCount = etcdserver.DefaultSnapCount
+	if cfg.snapshotCount == 0 {
+		cfg.snapshotCount = etcdserver.DefaultSnapshotCount
 	}
 
 	etcdCfgs := make([]*etcdServerProcessConfig, cfg.clusterSize)
@@ -217,7 +217,7 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerPro
 			"--initial-advertise-peer-urls", purl.String(),
 			"--initial-cluster-token", cfg.initialToken,
 			"--data-dir", dataDirPath,
-			"--snapshot-count", fmt.Sprintf("%d", cfg.snapCount),
+			"--snapshot-count", fmt.Sprintf("%d", cfg.snapshotCount),
 		}
 		args = addV2Args(args)
 		if cfg.forceNewCluster {
diff --git a/tests/e2e/ctl_v2_test.go b/tests/e2e/ctl_v2_test.go
index 8af4425ee..709f607e4 100644
--- a/tests/e2e/ctl_v2_test.go
+++ b/tests/e2e/ctl_v2_test.go
@@ -242,7 +242,7 @@ func testCtlV2Backup(t *testing.T, snapCount int, v3 bool) {
 	defer os.RemoveAll(backupDir)
 
 	etcdCfg := configNoTLS
-	etcdCfg.snapCount = snapCount
+	etcdCfg.snapshotCount = snapCount
 	epc1 := setupEtcdctlTest(t, &etcdCfg, false)
 
 	// v3 put before v2 set so snapshot happens after v3 operations to confirm
diff --git a/tests/e2e/etcd_corrupt_test.go b/tests/e2e/etcd_corrupt_test.go
index 2e3030947..d72d08792 100644
--- a/tests/e2e/etcd_corrupt_test.go
+++ b/tests/e2e/etcd_corrupt_test.go
@@ -39,7 +39,7 @@ func TestEtcdCorruptHash(t *testing.T) {
 	cfg := configNoTLS
 
 	// trigger snapshot so that restart member can load peers from disk
-	cfg.snapCount = 3
+	cfg.snapshotCount = 3
 
 	testCtl(t, corruptTest, withQuorum(),
 		withCfg(cfg),
diff --git a/tests/e2e/etcd_release_upgrade_test.go b/tests/e2e/etcd_release_upgrade_test.go
index 6b1d42323..136b62fd1 100644
--- a/tests/e2e/etcd_release_upgrade_test.go
+++ b/tests/e2e/etcd_release_upgrade_test.go
@@ -38,7 +38,7 @@ func TestReleaseUpgrade(t *testing.T) {
 
 	copiedCfg := configNoTLS
 	copiedCfg.execPath = lastReleaseBinary
-	copiedCfg.snapCount = 3
+	copiedCfg.snapshotCount = 3
 	copiedCfg.baseScheme = "unix" // to avoid port conflict
 
 	epc, err := newEtcdProcessCluster(&copiedCfg)
@@ -113,7 +113,7 @@ func TestReleaseUpgradeWithRestart(t *testing.T) {
 
 	copiedCfg := configNoTLS
 	copiedCfg.execPath = lastReleaseBinary
-	copiedCfg.snapCount = 10
+	copiedCfg.snapshotCount = 10
 	copiedCfg.baseScheme = "unix"
 
 	epc, err := newEtcdProcessCluster(&copiedCfg)
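Aside (not part of the patch): the `SnapshotCatchUpEntries` value threaded through this patch feeds the compaction step in `etcdserver.(*EtcdServer).snapshot` above. A standalone sketch of that arithmetic, for intuition; the function name is illustrative, not an etcd API:

```go
package main

import "fmt"

// compactIndex mirrors the compaction logic in EtcdServer.snapshot: after
// snapshotting at index snapi, compact the in-memory raft log but keep the
// last catchUpEntries entries so a slow follower can catch up without
// needing a full snapshot. Raft log compaction cannot go below index 1.
func compactIndex(snapi, catchUpEntries uint64) uint64 {
	compacti := uint64(1)
	if snapi > catchUpEntries {
		compacti = snapi - catchUpEntries
	}
	return compacti
}

func main() {
	// With DefaultSnapshotCatchUpEntries = 5000:
	fmt.Println(compactIndex(100000, 5000)) // 95000
	fmt.Println(compactIndex(3000, 5000))   // 1 (log shorter than the catch-up window)
}
```

Per the comments added in `etcdserver/config.go`, a larger value keeps more raft entries in memory, trading memory for fewer leader-to-follower snapshot transfers; the integration tests above shrink it to 5 precisely to force such a transfer.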