diff --git a/etcdserver/config.go b/etcdserver/config.go index 767b6c3a0..0da9bc29f 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -40,10 +40,21 @@ type ServerConfig struct { DataDir string // DedicatedWALDir config will make the etcd to write the WAL to the WALDir // rather than the dataDir/member/wal. - DedicatedWALDir string - SnapCount uint64 - MaxSnapFiles uint - MaxWALFiles uint + DedicatedWALDir string + + SnapshotCount uint64 + + // SnapshotCatchUpEntries is the number of entries for a slow follower + // to catch-up after compacting the raft storage entries. + // We expect the follower has a millisecond level latency with the leader. + // The max throughput is around 10K. Keep a 5K entries is enough for helping + // follower to catch up. + // WARNING: only change this for tests. Always use "DefaultSnapshotCatchUpEntries" + SnapshotCatchUpEntries uint64 + + MaxSnapFiles uint + MaxWALFiles uint + InitialPeerURLsMap types.URLsMap InitialClusterToken string NewCluster bool @@ -273,7 +284,7 @@ func (c *ServerConfig) print(initial bool) { } plog.Infof("heartbeat = %dms", c.TickMs) plog.Infof("election = %dms", c.ElectionTicks*int(c.TickMs)) - plog.Infof("snapshot count = %d", c.SnapCount) + plog.Infof("snapshot count = %d", c.SnapshotCount) if len(c.DiscoveryURL) != 0 { plog.Infof("discovery URL= %s", c.DiscoveryURL) if len(c.DiscoveryProxy) != 0 { @@ -302,7 +313,8 @@ func (c *ServerConfig) print(initial bool) { zap.Int("election-tick-ms", c.ElectionTicks), zap.String("election-timeout", fmt.Sprintf("%v", time.Duration(c.ElectionTicks*int(c.TickMs))*time.Millisecond)), zap.Bool("initial-election-tick-advance", c.InitialElectionTickAdvance), - zap.Uint64("snapshot-count", c.SnapCount), + zap.Uint64("snapshot-count", c.SnapshotCount), + zap.Uint64("snapshot-catchup-entries", c.SnapshotCatchUpEntries), zap.Strings("advertise-client-urls", c.getACURLs()), zap.Strings("initial-advertise-peer-urls", c.getAPURLs()), zap.Bool("initial", initial), diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 4b3ad2808..f10190221 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -39,13 +39,6 @@ import ( ) const ( - // Number of entries for slow follower to catch-up after compacting - // the raft storage entries. - // We expect the follower has a millisecond level latency with the leader. - // The max throughput is around 10K. Keep a 5K entries is enough for helping - // follower to catch up. - numberOfCatchUpEntries = 5000 - // The max throughput of etcd will not exceed 100MB/s (100K * 1KB value). // Assuming the RTT is around 10ms, 1MB max size is large enough. maxSizePerMsg = 1 * 1024 * 1024 diff --git a/etcdserver/server.go b/etcdserver/server.go index 12682b29b..fe9e188a8 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -64,7 +64,14 @@ import ( ) const ( - DefaultSnapCount = 100000 + DefaultSnapshotCount = 100000 + + // DefaultSnapshotCatchUpEntries is the number of entries for a slow follower + // to catch-up after compacting the raft storage entries. + // We expect the follower has a millisecond level latency with the leader. + // The max throughput is around 10K. Keep a 5K entries is enough for helping + // follower to catch up. + DefaultSnapshotCatchUpEntries uint64 = 5000 StoreClusterPrefix = "/0" StoreKeysPrefix = "/1" @@ -703,14 +710,30 @@ func (s *EtcdServer) Start() { // This function is just used for testing. func (s *EtcdServer) start() { lg := s.getLogger() - if s.Cfg.SnapCount == 0 { - if lg != nil { + if s.Cfg.SnapshotCount == 0 { + if lg != nil { + lg.Info( + "updating snapshot-count to default", + zap.Uint64("given-snapshot-count", s.Cfg.SnapshotCount), + zap.Uint64("updated-snapshot-count", DefaultSnapshotCount), + ) } else { - plog.Infof("set snapshot count to default %d", DefaultSnapCount) + plog.Infof("set snapshot count to default %d", DefaultSnapshotCount) } - s.Cfg.SnapCount = DefaultSnapCount + s.Cfg.SnapshotCount = DefaultSnapshotCount } + if s.Cfg.SnapshotCatchUpEntries == 0 { + if lg != nil { + lg.Info( + "updating snapshot catch-up entries to default", + zap.Uint64("given-snapshot-catchup-entries", s.Cfg.SnapshotCatchUpEntries), + zap.Uint64("updated-snapshot-catchup-entries", DefaultSnapshotCatchUpEntries), + ) + } + s.Cfg.SnapshotCatchUpEntries = DefaultSnapshotCatchUpEntries + } + s.w = wait.New() s.applyWait = wait.NewTimeList() s.done = make(chan struct{}) @@ -743,6 +766,7 @@ func (s *EtcdServer) start() { plog.Infof("starting server... [version: %v, cluster version: to_be_decided]", version.Version) } } + // TODO: if this is an empty log, writes all peer infos // into the first entry go s.run() @@ -1058,7 +1082,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { "applying snapshot", zap.Uint64("current-snapshot-index", ep.snapi), zap.Uint64("current-applied-index", ep.appliedi), - zap.Uint64("incoming-snapshot-index", apply.snapshot.Metadata.Index), + zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index), + zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term), ) } else { plog.Infof("applying snapshot at index %d...", ep.snapi) @@ -1069,7 +1094,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { "applied snapshot", zap.Uint64("current-snapshot-index", ep.snapi), zap.Uint64("current-applied-index", ep.appliedi), - zap.Uint64("incoming-snapshot-index", apply.snapshot.Metadata.Index), + zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index), + zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term), ) } else { plog.Infof("finished applying incoming snapshot at index %d", ep.snapi) @@ -1083,6 +1109,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { zap.Uint64("current-snapshot-index", ep.snapi), zap.Uint64("current-applied-index", ep.appliedi), zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index), + zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term), ) } else { plog.Panicf("snapshot index [%d] should > appliedi[%d] + 1", @@ -1304,7 +1331,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) { } func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) { - if ep.appliedi-ep.snapi <= s.Cfg.SnapCount { + if ep.appliedi-ep.snapi <= s.Cfg.SnapshotCount { return } @@ -1314,7 +1341,7 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) { zap.String("local-member-id", s.ID().String()), zap.Uint64("local-member-applied-index", ep.appliedi), zap.Uint64("local-member-snapshot-index", ep.snapi), - zap.Uint64("local-member-snapshot-count", s.Cfg.SnapCount), + zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount), ) } else { plog.Infof("start to snapshot (applied: %d, lastsnap: %d)", ep.appliedi, ep.snapi) @@ -2132,9 +2159,10 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { // keep some in memory log entries for slow followers. compacti := uint64(1) - if snapi > numberOfCatchUpEntries { - compacti = snapi - numberOfCatchUpEntries + if snapi > s.Cfg.SnapshotCatchUpEntries { + compacti = snapi - s.Cfg.SnapshotCatchUpEntries } + err = s.r.raftStorage.Compact(compacti) if err != nil { // the compaction was done asynchronously with the progress of raft. diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index f22b64b1a..a99aaed9a 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -714,7 +714,7 @@ func TestDoProposal(t *testing.T) { srv := &EtcdServer{ lgMu: new(sync.RWMutex), lg: zap.NewExample(), - Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1}, + Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, r: *r, v2store: st, reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -745,7 +745,7 @@ func TestDoProposalCancelled(t *testing.T) { srv := &EtcdServer{ lgMu: new(sync.RWMutex), lg: zap.NewExample(), - Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1}, + Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}), w: wt, reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -769,7 +769,7 @@ func TestDoProposalTimeout(t *testing.T) { srv := &EtcdServer{ lgMu: new(sync.RWMutex), lg: zap.NewExample(), - Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1}, + Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, r: *newRaftNode(raftNodeConfig{Node: newNodeNop()}), w: mockwait.NewNop(), reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -788,7 +788,7 @@ func TestDoProposalStopped(t *testing.T) { srv := &EtcdServer{ lgMu: new(sync.RWMutex), lg: zap.NewExample(), - Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1}, + Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: newNodeNop()}), w: mockwait.NewNop(), reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -899,7 +899,7 @@ func TestSyncTrigger(t *testing.T) { srv := &EtcdServer{ lgMu: new(sync.RWMutex), lg: zap.NewExample(), - Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1}, + Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, r: *r, v2store: mockstore.NewNop(), SyncTicker: tk, @@ -1033,7 +1033,7 @@ func TestSnapshotOrdering(t *testing.T) { s := &EtcdServer{ lgMu: new(sync.RWMutex), lg: zap.NewExample(), - Cfg: ServerConfig{Logger: zap.NewExample(), DataDir: testdir}, + Cfg: ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, r: *r, v2store: st, snapshotter: raftsnap.New(zap.NewExample(), snapdir), @@ -1077,7 +1077,7 @@ func TestSnapshotOrdering(t *testing.T) { } } -// Applied > SnapCount should trigger a SaveSnap event +// Applied > SnapshotCount should trigger a SaveSnap event func TestTriggerSnap(t *testing.T) { be, tmpPath := backend.NewDefaultTmpBackend() defer func() { @@ -1097,7 +1097,7 @@ func TestTriggerSnap(t *testing.T) { srv := &EtcdServer{ lgMu: new(sync.RWMutex), lg: zap.NewExample(), - Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapCount: uint64(snapc)}, + Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCount: uint64(snapc), SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, r: *r, v2store: st, reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -1116,7 +1116,7 @@ func TestTriggerSnap(t *testing.T) { gaction, _ := p.Wait(wcnt) // each operation is recorded as a Save - // (SnapCount+1) * Puts + SaveSnap = (SnapCount+1) * Save + SaveSnap + // (SnapshotCount+1) * Puts + SaveSnap = (SnapshotCount+1) * Save + SaveSnap if len(gaction) != wcnt { t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt) } @@ -1164,7 +1164,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { s := &EtcdServer{ lgMu: new(sync.RWMutex), lg: zap.NewExample(), - Cfg: ServerConfig{Logger: zap.NewExample(), DataDir: testdir}, + Cfg: ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, r: *r, v2store: st, snapshotter: raftsnap.New(zap.NewExample(), testdir), @@ -1375,7 +1375,7 @@ func TestPublish(t *testing.T) { lgMu: new(sync.RWMutex), lg: zap.NewExample(), readych: make(chan struct{}), - Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1}, + Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, id: 1, r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}), attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}, @@ -1428,7 +1428,7 @@ func TestPublishStopped(t *testing.T) { srv := &EtcdServer{ lgMu: new(sync.RWMutex), lg: zap.NewExample(), - Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1}, + Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, r: *r, cluster: &membership.RaftCluster{}, w: mockwait.NewNop(), @@ -1452,7 +1452,7 @@ func TestPublishRetry(t *testing.T) { srv := &EtcdServer{ lgMu: new(sync.RWMutex), lg: zap.NewExample(), - Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1}, + Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}), w: mockwait.NewNop(), stopping: make(chan struct{}), @@ -1495,7 +1495,7 @@ func TestUpdateVersion(t *testing.T) { lgMu: new(sync.RWMutex), lg: zap.NewExample(), id: 1, - Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1}, + Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: n}), attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}}, cluster: &membership.RaftCluster{},