From 2db193fda11616d983a12523dcddffd82f363093 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 7 Jul 2021 13:29:45 +0200 Subject: [PATCH 01/14] etcdserver: Fix snapshot always nil --- server/etcdserver/server.go | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 19916f800..e6ee52cfa 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -387,6 +387,8 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { be := openBackend(cfg, beHooks) ci.SetBackend(be) buckets.CreateMetaBucket(be.BatchTx()) + kvindex := ci.ConsistentIndex() + cfg.Logger.Debug("restore consistentIndex", zap.Uint64("index", kvindex)) if cfg.ExperimentalBootstrapDefragThresholdMegabytes != 0 { err := maybeDefragBackend(cfg, be) @@ -407,7 +409,6 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } var ( remotes []*membership.Member - snapshot *raftpb.Snapshot ) switch { @@ -527,6 +528,19 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { zap.Int64("backend-size-in-use-bytes", s2), zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))), ) + if beExist { + // TODO: remove kvindex != 0 checking when we do not expect users to upgrade + // etcd from pre-3.0 release. + if kvindex < snapshot.Metadata.Index { + if kvindex != 0 { + return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", bepath, kvindex, snapshot.Metadata.Index) + } + cfg.Logger.Warn( + "consistent index was never saved", + zap.Uint64("snapshot-index", snapshot.Metadata.Index), + ) + } + } } else { cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!") } @@ -620,22 +634,6 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig) - kvindex := ci.ConsistentIndex() - srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex)) - - if beExist { - // TODO: remove kvindex != 0 checking when we do not expect users to upgrade - // etcd from pre-3.0 release. - if snapshot != nil && kvindex < snapshot.Metadata.Index { - if kvindex != 0 { - return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", bepath, kvindex, snapshot.Metadata.Index) - } - cfg.Logger.Warn( - "consistent index was never saved", - zap.Uint64("snapshot-index", snapshot.Metadata.Index), - ) - } - } srv.authStore = auth.NewAuthStore(srv.Logger(), srv.be, tp, int(cfg.BcryptCost)) From 120cd5abe2f4a12f96309d55d7ab0dc98419ce00 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 7 Jul 2021 15:40:54 +0200 Subject: [PATCH 02/14] etcdserver: Extract etcdserver boostrap function --- server/etcdserver/server.go | 99 ++++++++++++++++++++++++++----------- 1 file changed, 70 insertions(+), 29 deletions(-) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index e6ee52cfa..37f50cf48 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -330,9 +330,7 @@ func (bh *backendHooks) SetConfState(confState *raftpb.ConfState) { bh.confStateDirty = true } -// NewServer creates a new EtcdServer from the supplied configuration. The -// configuration is considered static for the lifetime of the EtcdServer. -func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { +func bootstrap(cfg config.ServerConfig) (b *boostrapResult, err error) { st := v2store.New(StoreClusterPrefix, StoreKeysPrefix) var ( @@ -408,7 +406,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { return nil, err } var ( - remotes []*membership.Member + remotes []*membership.Member ) switch { @@ -531,7 +529,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { if beExist { // TODO: remove kvindex != 0 checking when we do not expect users to upgrade // etcd from pre-3.0 release. - if kvindex < snapshot.Metadata.Index { + if kvindex < snapshot.Metadata.Index { if kvindex != 0 { return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", bepath, kvindex, snapshot.Metadata.Index) } @@ -566,9 +564,53 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil { return nil, fmt.Errorf("cannot access member directory: %v", terr) } + return &boostrapResult{ + cl: cl, + remotes: remotes, + w: w, + n: n, + s: s, + id: id, + prt: prt, + ci: ci, + st: st, + be: be, + ss: ss, + beHooks: beHooks, + }, nil +} - sstats := stats.NewServerStats(cfg.Name, id.String()) - lstats := stats.NewLeaderStats(cfg.Logger, id.String()) +type boostrapResult struct { + cl *membership.RaftCluster + remotes []*membership.Member + w *wal.WAL + n raft.Node + s *raft.MemoryStorage + id types.ID + prt http.RoundTripper + ci cindex.ConsistentIndexer + st v2store.Store + be backend.Backend + ss *snap.Snapshotter + beHooks *backendHooks +} + +// NewServer creates a new EtcdServer from the supplied configuration. The +// configuration is considered static for the lifetime of the EtcdServer. +func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { + b, err := bootstrap(cfg) + if err != nil { + return nil, err + } + + defer func() { + if err != nil { + b.be.Close() + } + }() + + sstats := stats.NewServerStats(cfg.Name, b.id.String()) + lstats := stats.NewLeaderStats(cfg.Logger, b.id.String()) heartbeat := time.Duration(cfg.TickMs) * time.Millisecond srv = &EtcdServer{ @@ -577,36 +619,36 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { lgMu: new(sync.RWMutex), lg: cfg.Logger, errorc: make(chan error, 1), - v2store: st, - snapshotter: ss, + v2store: b.st, + snapshotter: b.ss, r: *newRaftNode( raftNodeConfig{ lg: cfg.Logger, - isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) }, - Node: n, + isIDRemoved: func(id uint64) bool { return b.cl.IsIDRemoved(types.ID(id)) }, + Node: b.n, heartbeat: heartbeat, - raftStorage: s, - storage: NewStorage(w, ss), + raftStorage: b.s, + storage: NewStorage(b.w, b.ss), }, ), - id: id, + id: b.id, attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, - cluster: cl, + cluster: b.cl, stats: sstats, lstats: lstats, SyncTicker: time.NewTicker(500 * time.Millisecond), - peerRt: prt, - reqIDGen: idutil.NewGenerator(uint16(id), time.Now()), + peerRt: b.prt, + reqIDGen: idutil.NewGenerator(uint16(b.id), time.Now()), AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist}, - consistIndex: ci, + consistIndex: b.ci, firstCommitInTermC: make(chan struct{}), } - serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1) + serverID.With(prometheus.Labels{"server_id": b.id.String()}).Set(1) srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) - srv.be = be - srv.beHooks = beHooks + srv.be = b.be + srv.beHooks = b.beHooks minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases. @@ -634,7 +676,6 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig) - srv.authStore = auth.NewAuthStore(srv.Logger(), srv.be, tp, int(cfg.BcryptCost)) newSrv := srv // since srv == nil in defer if srv is returned as nil @@ -671,11 +712,11 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { Logger: cfg.Logger, TLSInfo: cfg.PeerTLSInfo, DialTimeout: cfg.PeerDialTimeout(), - ID: id, + ID: b.id, URLs: cfg.PeerURLs, - ClusterID: cl.ID(), + ClusterID: b.cl.ID(), Raft: srv, - Snapshotter: ss, + Snapshotter: b.ss, ServerStats: sstats, LeaderStats: lstats, ErrorC: srv.errorc, @@ -684,13 +725,13 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { return nil, err } // add all remotes into transport - for _, m := range remotes { - if m.ID != id { + for _, m := range b.remotes { + if m.ID != b.id { tr.AddRemote(m.ID, m.PeerURLs) } } - for _, m := range cl.Members() { - if m.ID != id { + for _, m := range b.cl.Members() { + if m.ID != b.id { tr.AddPeer(m.ID, m.PeerURLs) } } From 4dd9424d11e82cbf9b9b11d67086b4b93c0e2c85 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 7 Jul 2021 15:46:38 +0200 Subject: [PATCH 03/14] etcdserver: Extract boostrapBackend function --- server/etcdserver/server.go | 42 +++++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 37f50cf48..4414e7b83 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -377,24 +377,10 @@ func bootstrap(cfg config.ServerConfig) (b *boostrapResult, err error) { ss := snap.New(cfg.Logger, cfg.SnapDir()) - bepath := cfg.BackendPath() - beExist := fileutil.Exist(bepath) - - ci := cindex.NewConsistentIndex(nil) - beHooks := &backendHooks{lg: cfg.Logger, indexer: ci} - be := openBackend(cfg, beHooks) - ci.SetBackend(be) - buckets.CreateMetaBucket(be.BatchTx()) - kvindex := ci.ConsistentIndex() - cfg.Logger.Debug("restore consistentIndex", zap.Uint64("index", kvindex)) - - if cfg.ExperimentalBootstrapDefragThresholdMegabytes != 0 { - err := maybeDefragBackend(cfg, be) - if err != nil { - return nil, err - } + be, ci, beExist, beHooks, err := boostrapBackend(cfg) + if err != nil { + return nil, err } - defer func() { if err != nil { be.Close() @@ -529,9 +515,10 @@ func bootstrap(cfg config.ServerConfig) (b *boostrapResult, err error) { if beExist { // TODO: remove kvindex != 0 checking when we do not expect users to upgrade // etcd from pre-3.0 release. + kvindex := ci.ConsistentIndex() if kvindex < snapshot.Metadata.Index { if kvindex != 0 { - return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", bepath, kvindex, snapshot.Metadata.Index) + return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", cfg.BackendPath(), kvindex, snapshot.Metadata.Index) } cfg.Logger.Warn( "consistent index was never saved", @@ -553,6 +540,7 @@ func bootstrap(cfg config.ServerConfig) (b *boostrapResult, err error) { cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) cl.Recover(api.UpdateCapability) if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist { + bepath := cfg.BackendPath() os.RemoveAll(bepath) return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath) } @@ -595,6 +583,24 @@ type boostrapResult struct { beHooks *backendHooks } +func boostrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.ConsistentIndexer, beExist bool, beHooks *backendHooks, err error) { + beExist = fileutil.Exist(cfg.BackendPath()) + ci = cindex.NewConsistentIndex(nil) + beHooks = &backendHooks{lg: cfg.Logger, indexer: ci} + be = openBackend(cfg, beHooks) + ci.SetBackend(be) + buckets.CreateMetaBucket(be.BatchTx()) + if cfg.ExperimentalBootstrapDefragThresholdMegabytes != 0 { + err := maybeDefragBackend(cfg, be) + if err != nil { + be.Close() + return nil, nil, false, nil, err + } + } + cfg.Logger.Debug("restore consistentIndex", zap.Uint64("index", ci.ConsistentIndex())) + return be, ci, beExist, beHooks, nil +} + // NewServer creates a new EtcdServer from the supplied configuration. The // configuration is considered static for the lifetime of the EtcdServer. func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { From 16b2a8b42088d5a2e00361620d848ab6f2d28be8 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 7 Jul 2021 15:56:43 +0200 Subject: [PATCH 04/14] etcdserver: Prepare boostrap to split cluster setup --- server/etcdserver/server.go | 42 ++++++++++++++++--------------------- 1 file changed, 18 insertions(+), 24 deletions(-) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 4414e7b83..ef47a9536 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -331,13 +331,10 @@ func (bh *backendHooks) SetConfState(confState *raftpb.ConfState) { } func bootstrap(cfg config.ServerConfig) (b *boostrapResult, err error) { + b = &boostrapResult{} st := v2store.New(StoreClusterPrefix, StoreKeysPrefix) var ( - w *wal.WAL - n raft.Node - s *raft.MemoryStorage - id types.ID cl *membership.RaftCluster ) @@ -419,8 +416,9 @@ func bootstrap(cfg config.ServerConfig) (b *boostrapResult, err error) { cl.SetID(types.ID(0), existingCluster.ID()) cl.SetStore(st) cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) - id, n, s, w = startNode(cfg, cl, nil) - cl.SetID(id, existingCluster.ID()) + b.id, b.n, b.s, b.w = startNode(cfg, cl, nil) + cl.SetID(b.id, existingCluster.ID()) + b.cl = cl case !haveWAL && cfg.NewCluster: if err = cfg.VerifyBootstrap(); err != nil { @@ -454,8 +452,9 @@ func bootstrap(cfg config.ServerConfig) (b *boostrapResult, err error) { } cl.SetStore(st) cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) - id, n, s, w = startNode(cfg, cl, cl.MemberIDs()) - cl.SetID(id, cl.ID()) + b.id, b.n, b.s, b.w = startNode(cfg, cl, cl.MemberIDs()) + cl.SetID(b.id, cl.ID()) + b.cl = cl case haveWAL: if err = fileutil.IsDirWriteable(cfg.MemberDir()); err != nil { @@ -531,9 +530,9 @@ func bootstrap(cfg config.ServerConfig) (b *boostrapResult, err error) { } if !cfg.ForceNewCluster { - id, cl, n, s, w = restartNode(cfg, snapshot) + b.id, cl, b.n, b.s, b.w = restartNode(cfg, snapshot) } else { - id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot) + b.id, cl, b.n, b.s, b.w = restartAsStandaloneNode(cfg, snapshot) } cl.SetStore(st) @@ -544,6 +543,7 @@ func bootstrap(cfg config.ServerConfig) (b *boostrapResult, err error) { os.RemoveAll(bepath) return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath) } + b.cl = cl default: return nil, fmt.Errorf("unsupported bootstrap config") @@ -552,20 +552,14 @@ func bootstrap(cfg config.ServerConfig) (b *boostrapResult, err error) { if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil { return nil, fmt.Errorf("cannot access member directory: %v", terr) } - return &boostrapResult{ - cl: cl, - remotes: remotes, - w: w, - n: n, - s: s, - id: id, - prt: prt, - ci: ci, - st: st, - be: be, - ss: ss, - beHooks: beHooks, - }, nil + b.remotes = remotes + b.prt = prt + b.ci = ci + b.st = st + b.be = be + b.ss = ss + b.beHooks = beHooks + return b, nil } type boostrapResult struct { From af0439490c237db5d677a37c53f8b98ceaf29bec Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 7 Jul 2021 16:04:52 +0200 Subject: [PATCH 05/14] etcdserver: Extract cluster boostrap functions --- server/etcdserver/server.go | 335 +++++++++++++++++++----------------- 1 file changed, 177 insertions(+), 158 deletions(-) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index ef47a9536..1faefabba 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -331,13 +331,8 @@ func (bh *backendHooks) SetConfState(confState *raftpb.ConfState) { } func bootstrap(cfg config.ServerConfig) (b *boostrapResult, err error) { - b = &boostrapResult{} st := v2store.New(StoreClusterPrefix, StoreKeysPrefix) - var ( - cl *membership.RaftCluster - ) - if cfg.MaxRequestBytes > recommendedMaxRequestBytes { cfg.Logger.Warn( "exceeded recommended request limit", @@ -388,171 +383,25 @@ func bootstrap(cfg config.ServerConfig) (b *boostrapResult, err error) { if err != nil { return nil, err } - var ( - remotes []*membership.Member - ) switch { case !haveWAL && !cfg.NewCluster: - if err = cfg.VerifyJoinExisting(); err != nil { - return nil, err - } - cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap) - if err != nil { - return nil, err - } - existingCluster, gerr := GetClusterFromRemotePeers(cfg.Logger, getRemotePeerURLs(cl, cfg.Name), prt) - if gerr != nil { - return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr) - } - if err = membership.ValidateClusterAndAssignIDs(cfg.Logger, cl, existingCluster); err != nil { - return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err) - } - if !isCompatibleWithCluster(cfg.Logger, cl, cl.MemberByName(cfg.Name).ID, prt) { - return nil, fmt.Errorf("incompatible with current running cluster") - } - - remotes = existingCluster.Members() - cl.SetID(types.ID(0), existingCluster.ID()) - cl.SetStore(st) - cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) - b.id, b.n, b.s, b.w = startNode(cfg, cl, nil) - cl.SetID(b.id, existingCluster.ID()) - b.cl = cl - + b, err = bootstrapExistingClusterNoWAL(cfg, prt, st, be) case !haveWAL && cfg.NewCluster: - if err = cfg.VerifyBootstrap(); err != nil { - return nil, err - } - cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap) - if err != nil { - return nil, err - } - m := cl.MemberByName(cfg.Name) - if isMemberBootstrapped(cfg.Logger, cl, cfg.Name, prt, cfg.BootstrapTimeoutEffective()) { - return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID) - } - if cfg.ShouldDiscover() { - var str string - str, err = v2discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String()) - if err != nil { - return nil, &DiscoveryError{Op: "join", Err: err} - } - var urlsmap types.URLsMap - urlsmap, err = types.NewURLsMap(str) - if err != nil { - return nil, err - } - if config.CheckDuplicateURL(urlsmap) { - return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap) - } - if cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, urlsmap); err != nil { - return nil, err - } - } - cl.SetStore(st) - cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) - b.id, b.n, b.s, b.w = startNode(cfg, cl, cl.MemberIDs()) - cl.SetID(b.id, cl.ID()) - b.cl = cl - + b, err = boostrapNewClusterNoWAL(cfg, prt, st, be) case haveWAL: - if err = fileutil.IsDirWriteable(cfg.MemberDir()); err != nil { - return nil, fmt.Errorf("cannot write to member directory: %v", err) - } - - if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil { - return nil, fmt.Errorf("cannot write to WAL directory: %v", err) - } - - if cfg.ShouldDiscover() { - cfg.Logger.Warn( - "discovery token is ignored since cluster already initialized; valid logs are found", - zap.String("wal-dir", cfg.WALDir()), - ) - } - - // Find a snapshot to start/restart a raft node - walSnaps, err := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir()) - if err != nil { - return nil, err - } - // snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding - // wal log entries - snapshot, err := ss.LoadNewestAvailable(walSnaps) - if err != nil && err != snap.ErrNoSnapshot { - return nil, err - } - - if snapshot != nil { - if err = st.Recovery(snapshot.Data); err != nil { - cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err)) - } - - if err = assertNoV2StoreContent(cfg.Logger, st, cfg.V2Deprecation); err != nil { - cfg.Logger.Error("illegal v2store content", zap.Error(err)) - return nil, err - } - - cfg.Logger.Info( - "recovered v2 store from snapshot", - zap.Uint64("snapshot-index", snapshot.Metadata.Index), - zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))), - ) - - if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil { - cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err)) - } - s1, s2 := be.Size(), be.SizeInUse() - cfg.Logger.Info( - "recovered v3 backend from snapshot", - zap.Int64("backend-size-bytes", s1), - zap.String("backend-size", humanize.Bytes(uint64(s1))), - zap.Int64("backend-size-in-use-bytes", s2), - zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))), - ) - if beExist { - // TODO: remove kvindex != 0 checking when we do not expect users to upgrade - // etcd from pre-3.0 release. - kvindex := ci.ConsistentIndex() - if kvindex < snapshot.Metadata.Index { - if kvindex != 0 { - return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", cfg.BackendPath(), kvindex, snapshot.Metadata.Index) - } - cfg.Logger.Warn( - "consistent index was never saved", - zap.Uint64("snapshot-index", snapshot.Metadata.Index), - ) - } - } - } else { - cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!") - } - - if !cfg.ForceNewCluster { - b.id, cl, b.n, b.s, b.w = restartNode(cfg, snapshot) - } else { - b.id, cl, b.n, b.s, b.w = restartAsStandaloneNode(cfg, snapshot) - } - - cl.SetStore(st) - cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) - cl.Recover(api.UpdateCapability) - if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist { - bepath := cfg.BackendPath() - os.RemoveAll(bepath) - return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath) - } - b.cl = cl - + b, err = boostrapWithWAL(cfg, st, be, ss, beExist, beHooks, ci) default: + be.Close() return nil, fmt.Errorf("unsupported bootstrap config") } + if err != nil { + return nil, err + } if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil { return nil, fmt.Errorf("cannot access member directory: %v", terr) } - b.remotes = remotes b.prt = prt b.ci = ci b.st = st @@ -595,6 +444,176 @@ func boostrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.Con return be, ci, beExist, beHooks, nil } +func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*boostrapResult, error) { + if err := cfg.VerifyJoinExisting(); err != nil { + return nil, err + } + cl, err := membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap) + if err != nil { + return nil, err + } + existingCluster, gerr := GetClusterFromRemotePeers(cfg.Logger, getRemotePeerURLs(cl, cfg.Name), prt) + if gerr != nil { + return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr) + } + if err := membership.ValidateClusterAndAssignIDs(cfg.Logger, cl, existingCluster); err != nil { + return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err) + } + if !isCompatibleWithCluster(cfg.Logger, cl, cl.MemberByName(cfg.Name).ID, prt) { + return nil, fmt.Errorf("incompatible with current running cluster") + } + + remotes := existingCluster.Members() + cl.SetID(types.ID(0), existingCluster.ID()) + cl.SetStore(st) + cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) + id, n, s, w := startNode(cfg, cl, nil) + cl.SetID(id, existingCluster.ID()) + return &boostrapResult{ + cl: cl, + remotes: remotes, + w: w, + n: n, + s: s, + id: id, + }, nil +} + +func boostrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*boostrapResult, error) { + if err := cfg.VerifyBootstrap(); err != nil { + return nil, err + } + cl, err := membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap) + if err != nil { + return nil, err + } + m := cl.MemberByName(cfg.Name) + if isMemberBootstrapped(cfg.Logger, cl, cfg.Name, prt, cfg.BootstrapTimeoutEffective()) { + return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID) + } + if cfg.ShouldDiscover() { + var str string + str, err = v2discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String()) + if err != nil { + return nil, &DiscoveryError{Op: "join", Err: err} + } + var urlsmap types.URLsMap + urlsmap, err = types.NewURLsMap(str) + if err != nil { + return nil, err + } + if config.CheckDuplicateURL(urlsmap) { + return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap) + } + if cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, urlsmap); err != nil { + return nil, err + } + } + cl.SetStore(st) + cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) + id, n, s, w := startNode(cfg, cl, cl.MemberIDs()) + cl.SetID(id, cl.ID()) + return &boostrapResult{ + cl: cl, + remotes: nil, + w: w, + n: n, + s: s, + id: id, + }, nil +} + +func boostrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backend, ss *snap.Snapshotter, beExist bool, beHooks *backendHooks, ci cindex.ConsistentIndexer) (*boostrapResult, error) { + if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil { + return nil, fmt.Errorf("cannot write to member directory: %v", err) + } + + if err := fileutil.IsDirWriteable(cfg.WALDir()); err != nil { + return nil, fmt.Errorf("cannot write to WAL directory: %v", err) + } + + if cfg.ShouldDiscover() { + cfg.Logger.Warn( + "discovery token is ignored since cluster already initialized; valid logs are found", + zap.String("wal-dir", cfg.WALDir()), + ) + } + + // Find a snapshot to start/restart a raft node + walSnaps, err := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir()) + if err != nil { + return nil, err + } + // snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding + // wal log entries + snapshot, err := ss.LoadNewestAvailable(walSnaps) + if err != nil && err != snap.ErrNoSnapshot { + return nil, err + } + + if snapshot != nil { + if err = st.Recovery(snapshot.Data); err != nil { + cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err)) + } + + if err = assertNoV2StoreContent(cfg.Logger, st, cfg.V2Deprecation); err != nil { + cfg.Logger.Error("illegal v2store content", zap.Error(err)) + return nil, err + } + + cfg.Logger.Info( + "recovered v2 store from snapshot", + zap.Uint64("snapshot-index", snapshot.Metadata.Index), + zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))), + ) + + if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil { + cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err)) + } + s1, s2 := be.Size(), be.SizeInUse() + cfg.Logger.Info( + "recovered v3 backend from snapshot", + zap.Int64("backend-size-bytes", s1), + zap.String("backend-size", humanize.Bytes(uint64(s1))), + zap.Int64("backend-size-in-use-bytes", s2), + zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))), + ) + if beExist { + // TODO: remove kvindex != 0 checking when we do not expect users to upgrade + // etcd from pre-3.0 release. + kvindex := ci.ConsistentIndex() + if kvindex < snapshot.Metadata.Index { + if kvindex != 0 { + return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", cfg.BackendPath(), kvindex, snapshot.Metadata.Index) + } + cfg.Logger.Warn( + "consistent index was never saved", + zap.Uint64("snapshot-index", snapshot.Metadata.Index), + ) + } + } + } else { + cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!") + } + + r := &boostrapResult{} + if !cfg.ForceNewCluster { + r.id, r.cl, r.n, r.s, r.w = restartNode(cfg, snapshot) + } else { + r.id, r.cl, r.n, r.s, r.w = restartAsStandaloneNode(cfg, snapshot) + } + + r.cl.SetStore(st) + r.cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) + r.cl.Recover(api.UpdateCapability) + if r.cl.Version() != nil && !r.cl.Version().LessThan(semver.Version{Major: 3}) && !beExist { + bepath := cfg.BackendPath() + os.RemoveAll(bepath) + return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath) + } + return r, nil +} + // NewServer creates a new EtcdServer from the supplied configuration. The // configuration is considered static for the lifetime of the EtcdServer. func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { From 7d39c3c655a24f0415d7bf8757ecb601b961e188 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 7 Jul 2021 16:14:19 +0200 Subject: [PATCH 06/14] etcdserver: Extract boostrapSnapshotter function --- server/etcdserver/server.go | 42 +++++++++++++++++++------------------ 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 1faefabba..1a18bffc5 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -348,26 +348,7 @@ func bootstrap(cfg config.ServerConfig) (b *boostrapResult, err error) { } haveWAL := wal.Exist(cfg.WALDir()) - - if err = fileutil.TouchDirAll(cfg.SnapDir()); err != nil { - cfg.Logger.Fatal( - "failed to create snapshot directory", - zap.String("path", cfg.SnapDir()), - zap.Error(err), - ) - } - - if err = fileutil.RemoveMatchFile(cfg.Logger, cfg.SnapDir(), func(fileName string) bool { - return strings.HasPrefix(fileName, "tmp") - }); err != nil { - cfg.Logger.Error( - "failed to remove temp file(s) in snapshot directory", - zap.String("path", cfg.SnapDir()), - zap.Error(err), - ) - } - - ss := snap.New(cfg.Logger, cfg.SnapDir()) + ss := boostrapSnapshotter(cfg) be, ci, beExist, beHooks, err := boostrapBackend(cfg) if err != nil { @@ -426,6 +407,27 @@ type boostrapResult struct { beHooks *backendHooks } +func boostrapSnapshotter(cfg config.ServerConfig) *snap.Snapshotter { + if err := fileutil.TouchDirAll(cfg.SnapDir()); err != nil { + cfg.Logger.Fatal( + "failed to create snapshot directory", + zap.String("path", cfg.SnapDir()), + zap.Error(err), + ) + } + + if err := fileutil.RemoveMatchFile(cfg.Logger, cfg.SnapDir(), func(fileName string) bool { + return strings.HasPrefix(fileName, "tmp") + }); err != nil { + cfg.Logger.Error( + "failed to remove temp file(s) in snapshot directory", + zap.String("path", cfg.SnapDir()), + zap.Error(err), + ) + } + return snap.New(cfg.Logger, cfg.SnapDir()) +} + func boostrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.ConsistentIndexer, beExist bool, beHooks *backendHooks, err error) { beExist = fileutil.Exist(cfg.BackendPath()) ci = cindex.NewConsistentIndex(nil) From 880673c4a08403664f6f2a7e7a34736b1739d352 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 7 Jul 2021 17:09:17 +0200 Subject: [PATCH 07/14] etcdserver: Extract raftConfig function --- server/etcdserver/raft.go | 41 ++++++++++++--------------------------- 1 file changed, 12 insertions(+), 29 deletions(-) diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go index 8b9600d39..48440ec63 100644 --- a/server/etcdserver/raft.go +++ b/server/etcdserver/raft.go @@ -451,17 +451,7 @@ func startNode(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types. zap.String("cluster-id", cl.ID().String()), ) s = raft.NewMemoryStorage() - c := &raft.Config{ - ID: uint64(id), - ElectionTick: cfg.ElectionTicks, - HeartbeatTick: 1, - Storage: s, - MaxSizePerMsg: maxSizePerMsg, - MaxInflightMsgs: maxInflightMsgs, - CheckQuorum: true, - PreVote: cfg.PreVote, - Logger: NewRaftLoggerZap(cfg.Logger.Named("raft")), - } + c := raftConfig(cfg, uint64(id), s) if len(peers) == 0 { n = raft.RestartNode(c) } else { @@ -494,18 +484,7 @@ func restartNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (types.ID, } s.SetHardState(st) s.Append(ents) - c := &raft.Config{ - ID: uint64(id), - ElectionTick: cfg.ElectionTicks, - HeartbeatTick: 1, - Storage: s, - MaxSizePerMsg: maxSizePerMsg, - MaxInflightMsgs: maxInflightMsgs, - CheckQuorum: true, - PreVote: cfg.PreVote, - Logger: NewRaftLoggerZap(cfg.Logger.Named("raft")), - } - + c := raftConfig(cfg, uint64(id), s) n := raft.RestartNode(c) raftStatusMu.Lock() raftStatus = n.Status @@ -568,8 +547,16 @@ func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) } s.SetHardState(st) s.Append(ents) - c := &raft.Config{ - ID: uint64(id), + c := raftConfig(cfg, uint64(id), s) + + n := raft.RestartNode(c) + raftStatus = n.Status + return id, cl, n, s, w +} + +func raftConfig(cfg config.ServerConfig, id uint64, s *raft.MemoryStorage) *raft.Config { + return &raft.Config{ + ID: id, ElectionTick: cfg.ElectionTicks, HeartbeatTick: 1, Storage: s, @@ -579,10 +566,6 @@ func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) PreVote: cfg.PreVote, Logger: NewRaftLoggerZap(cfg.Logger.Named("raft")), } - - n := raft.RestartNode(c) - raftStatus = n.Status - return id, cl, n, s, w } // getIDs returns an ordered set of IDs included in the given snapshot and From 554777bba48d9cec938c4bf43eda5e3310299777 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 7 Jul 2021 17:29:13 +0200 Subject: [PATCH 08/14] etcdserver: Extract boostrapRaft struct --- server/etcdserver/raft.go | 48 ++++++++++++++++++++------ server/etcdserver/server.go | 68 +++++++++++++++---------------------- 2 files changed, 66 insertions(+), 50 deletions(-) diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go index 48440ec63..39857522e 100644 --- a/server/etcdserver/raft.go +++ b/server/etcdserver/raft.go @@ -420,8 +420,7 @@ func (r *raftNode) advanceTicks(ticks int) { } } -func startNode(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) { - var err error +func startNode(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) *boostrapRaft { member := cl.MemberByName(cfg.Name) metadata := pbutil.MustMarshal( &pb.Metadata{ @@ -429,7 +428,8 @@ func startNode(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types. ClusterID: uint64(cl.ID()), }, ) - if w, err = wal.Create(cfg.Logger, cfg.WALDir(), metadata); err != nil { + w, err := wal.Create(cfg.Logger, cfg.WALDir(), metadata) + if err != nil { cfg.Logger.Panic("failed to create WAL", zap.Error(err)) } if cfg.UnsafeNoFsync { @@ -444,14 +444,15 @@ func startNode(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types. } peers[i] = raft.Peer{ID: uint64(id), Context: ctx} } - id = member.ID + id := member.ID cfg.Logger.Info( "starting local member", zap.String("local-member-id", id.String()), zap.String("cluster-id", cl.ID().String()), ) - s = raft.NewMemoryStorage() + s := raft.NewMemoryStorage() c := raftConfig(cfg, uint64(id), s) + var n raft.Node if len(peers) == 0 { n = raft.RestartNode(c) } else { @@ -460,10 +461,17 @@ func startNode(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types. raftStatusMu.Lock() raftStatus = n.Status raftStatusMu.Unlock() - return id, n, s, w + + return &boostrapRaft{ + id: id, + cl: cl, + node: n, + storage: s, + wal: w, + } } -func restartNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { +func restartNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *boostrapRaft { var walsnap walpb.Snapshot if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term @@ -489,10 +497,16 @@ func restartNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raftStatusMu.Lock() raftStatus = n.Status raftStatusMu.Unlock() - return id, cl, n, s, w + return &boostrapRaft{ + id: id, + cl: cl, + node: n, + storage: s, + wal: w, + } } -func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { +func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *boostrapRaft { var walsnap walpb.Snapshot if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term @@ -551,7 +565,13 @@ func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) n := raft.RestartNode(c) raftStatus = n.Status - return id, cl, n, s, w + return &boostrapRaft{ + id: id, + cl: cl, + node: n, + storage: s, + wal: w, + } } func raftConfig(cfg config.ServerConfig, id uint64, s *raft.MemoryStorage) *raft.Config { @@ -568,6 +588,14 @@ func raftConfig(cfg config.ServerConfig, id uint64, s *raft.MemoryStorage) *raft } } +type boostrapRaft struct { + id types.ID + cl *membership.RaftCluster + node raft.Node + storage *raft.MemoryStorage + wal *wal.WAL +} + // getIDs returns an ordered set of IDs included in the given snapshot and // the entries. The given snapshot/entries can contain three kinds of // ID-related entry: diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 1a18bffc5..a53d34bb5 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -393,12 +393,8 @@ func bootstrap(cfg config.ServerConfig) (b *boostrapResult, err error) { } type boostrapResult struct { - cl *membership.RaftCluster + raft *boostrapRaft remotes []*membership.Member - w *wal.WAL - n raft.Node - s *raft.MemoryStorage - id types.ID prt http.RoundTripper ci cindex.ConsistentIndexer st v2store.Store @@ -469,15 +465,11 @@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe cl.SetID(types.ID(0), existingCluster.ID()) cl.SetStore(st) cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) - id, n, s, w := startNode(cfg, cl, nil) - cl.SetID(id, existingCluster.ID()) + br := startNode(cfg, cl, nil) + cl.SetID(br.id, existingCluster.ID()) return &boostrapResult{ - cl: cl, + raft: br, remotes: remotes, - w: w, - n: n, - s: s, - id: id, }, nil } @@ -513,15 +505,11 @@ func boostrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st } cl.SetStore(st) cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) - id, n, s, w := startNode(cfg, cl, cl.MemberIDs()) - cl.SetID(id, cl.ID()) + br := startNode(cfg, cl, cl.MemberIDs()) + cl.SetID(br.id, cl.ID()) return &boostrapResult{ - cl: cl, remotes: nil, - w: w, - n: n, - s: s, - id: id, + raft: br, }, nil } @@ -600,15 +588,15 @@ func boostrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backe r := &boostrapResult{} if !cfg.ForceNewCluster { - r.id, r.cl, r.n, r.s, r.w = restartNode(cfg, snapshot) + r.raft = restartNode(cfg, snapshot) } else { - r.id, r.cl, r.n, r.s, r.w = restartAsStandaloneNode(cfg, snapshot) + r.raft = restartAsStandaloneNode(cfg, snapshot) } - r.cl.SetStore(st) - r.cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) - r.cl.Recover(api.UpdateCapability) - if r.cl.Version() != nil && !r.cl.Version().LessThan(semver.Version{Major: 3}) && !beExist { + r.raft.cl.SetStore(st) + r.raft.cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) + r.raft.cl.Recover(api.UpdateCapability) + if r.raft.cl.Version() != nil && !r.raft.cl.Version().LessThan(semver.Version{Major: 3}) && !beExist { bepath := cfg.BackendPath() os.RemoveAll(bepath) return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath) @@ -630,8 +618,8 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } }() - sstats := stats.NewServerStats(cfg.Name, b.id.String()) - lstats := stats.NewLeaderStats(cfg.Logger, b.id.String()) + sstats := stats.NewServerStats(cfg.Name, b.raft.id.String()) + lstats := stats.NewLeaderStats(cfg.Logger, b.raft.id.String()) heartbeat := time.Duration(cfg.TickMs) * time.Millisecond srv = &EtcdServer{ @@ -645,26 +633,26 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { r: *newRaftNode( raftNodeConfig{ lg: cfg.Logger, - isIDRemoved: func(id uint64) bool { return b.cl.IsIDRemoved(types.ID(id)) }, - Node: b.n, + isIDRemoved: func(id uint64) bool { return b.raft.cl.IsIDRemoved(types.ID(id)) }, + Node: b.raft.node, heartbeat: heartbeat, - raftStorage: b.s, - storage: NewStorage(b.w, b.ss), + raftStorage: b.raft.storage, + storage: NewStorage(b.raft.wal, b.ss), }, ), - id: b.id, + id: b.raft.id, attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, - cluster: b.cl, + cluster: b.raft.cl, stats: sstats, lstats: lstats, SyncTicker: time.NewTicker(500 * time.Millisecond), peerRt: b.prt, - reqIDGen: idutil.NewGenerator(uint16(b.id), time.Now()), + reqIDGen: idutil.NewGenerator(uint16(b.raft.id), time.Now()), AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist}, consistIndex: b.ci, firstCommitInTermC: make(chan struct{}), } - serverID.With(prometheus.Labels{"server_id": b.id.String()}).Set(1) + serverID.With(prometheus.Labels{"server_id": b.raft.id.String()}).Set(1) srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) @@ -733,9 +721,9 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { Logger: cfg.Logger, TLSInfo: cfg.PeerTLSInfo, DialTimeout: cfg.PeerDialTimeout(), - ID: b.id, + ID: b.raft.id, URLs: cfg.PeerURLs, - ClusterID: b.cl.ID(), + ClusterID: b.raft.cl.ID(), Raft: srv, Snapshotter: b.ss, ServerStats: sstats, @@ -747,12 +735,12 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } // add all remotes into transport for _, m := range b.remotes { - if m.ID != b.id { + if m.ID != b.raft.id { tr.AddRemote(m.ID, m.PeerURLs) } } - for _, m := range b.cl.Members() { - if m.ID != b.id { + for _, m := range b.raft.cl.Members() { + if m.ID != b.raft.id { tr.AddPeer(m.ID, m.PeerURLs) } } From 08935247a85dc8fe87ba4306a63eeb89989bc4b4 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 7 Jul 2021 17:38:56 +0200 Subject: [PATCH 09/14] etcdserver: Create raftnode based on boostrapRaft struct --- server/etcdserver/raft.go | 53 ++++++++++++++++++++++++++----------- server/etcdserver/server.go | 25 ++++++----------- 2 files changed, 46 insertions(+), 32 deletions(-) diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go index 39857522e..41144587f 100644 --- a/server/etcdserver/raft.go +++ b/server/etcdserver/raft.go @@ -33,6 +33,7 @@ import ( "go.etcd.io/etcd/server/v3/config" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" + "go.etcd.io/etcd/server/v3/etcdserver/api/snap" "go.etcd.io/etcd/server/v3/wal" "go.etcd.io/etcd/server/v3/wal/walpb" "go.uber.org/zap" @@ -463,11 +464,13 @@ func startNode(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types. raftStatusMu.Unlock() return &boostrapRaft{ - id: id, - cl: cl, - node: n, - storage: s, - wal: w, + lg: cfg.Logger, + heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, + id: id, + cl: cl, + node: n, + storage: s, + wal: w, } } @@ -498,11 +501,13 @@ func restartNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *boostrapRa raftStatus = n.Status raftStatusMu.Unlock() return &boostrapRaft{ - id: id, - cl: cl, - node: n, - storage: s, - wal: w, + lg: cfg.Logger, + heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, + id: id, + cl: cl, + node: n, + storage: s, + wal: w, } } @@ -566,11 +571,13 @@ func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) n := raft.RestartNode(c) raftStatus = n.Status return &boostrapRaft{ - id: id, - cl: cl, - node: n, - storage: s, - wal: w, + lg: cfg.Logger, + heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, + id: id, + cl: cl, + node: n, + storage: s, + wal: w, } } @@ -589,6 +596,9 @@ func raftConfig(cfg config.ServerConfig, id uint64, s *raft.MemoryStorage) *raft } type boostrapRaft struct { + lg *zap.Logger + heartbeat time.Duration + id types.ID cl *membership.RaftCluster node raft.Node @@ -596,6 +606,19 @@ type boostrapRaft struct { wal *wal.WAL } +func (b *boostrapRaft) newRaftNode(ss *snap.Snapshotter) *raftNode { + return newRaftNode( + raftNodeConfig{ + lg: b.lg, + isIDRemoved: func(id uint64) bool { return b.cl.IsIDRemoved(types.ID(id)) }, + Node: b.node, + heartbeat: b.heartbeat, + raftStorage: b.storage, + storage: NewStorage(b.wal, ss), + }, + ) +} + // getIDs returns an ordered set of IDs included in the given snapshot and // the entries. The given snapshot/entries can contain three kinds of // ID-related entry: diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index a53d34bb5..a08419096 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -623,23 +623,14 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { heartbeat := time.Duration(cfg.TickMs) * time.Millisecond srv = &EtcdServer{ - readych: make(chan struct{}), - Cfg: cfg, - lgMu: new(sync.RWMutex), - lg: cfg.Logger, - errorc: make(chan error, 1), - v2store: b.st, - snapshotter: b.ss, - r: *newRaftNode( - raftNodeConfig{ - lg: cfg.Logger, - isIDRemoved: func(id uint64) bool { return b.raft.cl.IsIDRemoved(types.ID(id)) }, - Node: b.raft.node, - heartbeat: heartbeat, - raftStorage: b.raft.storage, - storage: NewStorage(b.raft.wal, b.ss), - }, - ), + readych: make(chan struct{}), + Cfg: cfg, + lgMu: new(sync.RWMutex), + lg: cfg.Logger, + errorc: make(chan error, 1), + v2store: b.st, + snapshotter: b.ss, + r: *b.raft.newRaftNode(b.ss), id: b.raft.id, attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, cluster: b.raft.cl, From e75dfde4cb3a6aeb6bef00b8d7f80c8ab66811de Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 7 Jul 2021 21:16:38 +0200 Subject: [PATCH 10/14] etcdserver: Move raft node start to just before newRaftNode --- server/etcdserver/raft.go | 46 +++++++++++++++---------------------- server/etcdserver/server.go | 8 +++---- 2 files changed, 23 insertions(+), 31 deletions(-) diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go index 41144587f..74715de2d 100644 --- a/server/etcdserver/raft.go +++ b/server/etcdserver/raft.go @@ -421,7 +421,7 @@ func (r *raftNode) advanceTicks(ticks int) { } } -func startNode(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) *boostrapRaft { +func boostrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) *boostrapRaft { member := cl.MemberByName(cfg.Name) metadata := pbutil.MustMarshal( &pb.Metadata{ @@ -452,29 +452,20 @@ func startNode(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types. zap.String("cluster-id", cl.ID().String()), ) s := raft.NewMemoryStorage() - c := raftConfig(cfg, uint64(id), s) - var n raft.Node - if len(peers) == 0 { - n = raft.RestartNode(c) - } else { - n = raft.StartNode(c, peers) - } - raftStatusMu.Lock() - raftStatus = n.Status - raftStatusMu.Unlock() return &boostrapRaft{ lg: cfg.Logger, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, id: id, cl: cl, - node: n, + config: raftConfig(cfg, uint64(id), s), + peers: peers, storage: s, wal: w, } } -func restartNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *boostrapRaft { +func boostrapRaftFromWal(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *boostrapRaft { var walsnap walpb.Snapshot if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term @@ -495,23 +486,18 @@ func restartNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *boostrapRa } s.SetHardState(st) s.Append(ents) - c := raftConfig(cfg, uint64(id), s) - n := raft.RestartNode(c) - raftStatusMu.Lock() - raftStatus = n.Status - raftStatusMu.Unlock() return &boostrapRaft{ lg: cfg.Logger, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, id: id, cl: cl, - node: n, + config: raftConfig(cfg, uint64(id), s), storage: s, wal: w, } } -func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *boostrapRaft { +func boostrapRaftFromWalStandalone(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *boostrapRaft { var walsnap walpb.Snapshot if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term @@ -566,16 +552,12 @@ func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) } s.SetHardState(st) s.Append(ents) - c := raftConfig(cfg, uint64(id), s) - - n := raft.RestartNode(c) - raftStatus = n.Status return &boostrapRaft{ lg: cfg.Logger, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, id: id, cl: cl, - node: n, + config: raftConfig(cfg, uint64(id), s), storage: s, wal: w, } @@ -599,19 +581,29 @@ type boostrapRaft struct { lg *zap.Logger heartbeat time.Duration + peers []raft.Peer + config *raft.Config id types.ID cl *membership.RaftCluster - node raft.Node storage *raft.MemoryStorage wal *wal.WAL } func (b *boostrapRaft) newRaftNode(ss *snap.Snapshotter) *raftNode { + var n raft.Node + if len(b.peers) == 0 { + n = raft.RestartNode(b.config) + } else { + n = raft.StartNode(b.config, b.peers) + } + raftStatusMu.Lock() + raftStatus = n.Status + raftStatusMu.Unlock() return newRaftNode( raftNodeConfig{ lg: b.lg, isIDRemoved: func(id uint64) bool { return b.cl.IsIDRemoved(types.ID(id)) }, - Node: b.node, + Node: n, heartbeat: b.heartbeat, raftStorage: b.storage, storage: NewStorage(b.wal, ss), diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index a08419096..aebe8554a 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -465,7 +465,7 @@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe cl.SetID(types.ID(0), existingCluster.ID()) cl.SetStore(st) cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) - br := startNode(cfg, cl, nil) + br := boostrapRaftFromCluster(cfg, cl, nil) cl.SetID(br.id, existingCluster.ID()) return &boostrapResult{ raft: br, @@ -505,7 +505,7 @@ func boostrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st } cl.SetStore(st) cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) - br := startNode(cfg, cl, cl.MemberIDs()) + br := boostrapRaftFromCluster(cfg, cl, cl.MemberIDs()) cl.SetID(br.id, cl.ID()) return &boostrapResult{ remotes: nil, @@ -588,9 +588,9 @@ func boostrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backe r := &boostrapResult{} if !cfg.ForceNewCluster { - r.raft = restartNode(cfg, snapshot) + r.raft = boostrapRaftFromWal(cfg, snapshot) } else { - r.raft = restartAsStandaloneNode(cfg, snapshot) + r.raft = boostrapRaftFromWalStandalone(cfg, snapshot) } r.raft.cl.SetStore(st) From a72d4462fee227af1f98d827ef2282c3ace43fed Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 7 Jul 2021 22:40:45 +0200 Subject: [PATCH 11/14] etcdserver: Create boostrap wal functions --- server/etcdserver/raft.go | 94 +++++++++++++++--------------------- server/etcdserver/server.go | 20 ++++---- server/etcdserver/storage.go | 61 +++++++++++++++++------ 3 files changed, 94 insertions(+), 81 deletions(-) diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go index 74715de2d..6d443384b 100644 --- a/server/etcdserver/raft.go +++ b/server/etcdserver/raft.go @@ -23,7 +23,6 @@ import ( "sync" "time" - pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/client/pkg/v3/logutil" "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/pkg/v3/contention" @@ -34,7 +33,6 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" - "go.etcd.io/etcd/server/v3/wal" "go.etcd.io/etcd/server/v3/wal/walpb" "go.uber.org/zap" ) @@ -423,29 +421,17 @@ func (r *raftNode) advanceTicks(ticks int) { func boostrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) *boostrapRaft { member := cl.MemberByName(cfg.Name) - metadata := pbutil.MustMarshal( - &pb.Metadata{ - NodeID: uint64(member.ID), - ClusterID: uint64(cl.ID()), - }, - ) - w, err := wal.Create(cfg.Logger, cfg.WALDir(), metadata) - if err != nil { - cfg.Logger.Panic("failed to create WAL", zap.Error(err)) - } - if cfg.UnsafeNoFsync { - w.SetUnsafeNoFsync() - } + id := member.ID + wal := boostrapNewWal(cfg, id, cl.ID()) peers := make([]raft.Peer, len(ids)) for i, id := range ids { var ctx []byte - ctx, err = json.Marshal((*cl).Member(id)) + ctx, err := json.Marshal((*cl).Member(id)) if err != nil { cfg.Logger.Panic("failed to marshal member", zap.Error(err)) } peers[i] = raft.Peer{ID: uint64(id), Context: ctx} } - id := member.ID cfg.Logger.Info( "starting local member", zap.String("local-member-id", id.String()), @@ -456,12 +442,11 @@ func boostrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster return &boostrapRaft{ lg: cfg.Logger, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, - id: id, cl: cl, - config: raftConfig(cfg, uint64(id), s), + config: raftConfig(cfg, uint64(wal.id), s), peers: peers, storage: s, - wal: w, + wal: wal, } } @@ -470,30 +455,29 @@ func boostrapRaftFromWal(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bo if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term } - w, id, cid, st, ents := readWAL(cfg.Logger, cfg.WALDir(), walsnap, cfg.UnsafeNoFsync) + wal := boostrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), walsnap, cfg.UnsafeNoFsync) cfg.Logger.Info( "restarting local member", - zap.String("cluster-id", cid.String()), - zap.String("local-member-id", id.String()), - zap.Uint64("commit-index", st.Commit), + zap.String("cluster-id", wal.cid.String()), + zap.String("local-member-id", wal.id.String()), + zap.Uint64("commit-index", wal.st.Commit), ) cl := membership.NewCluster(cfg.Logger) - cl.SetID(id, cid) + cl.SetID(wal.id, wal.cid) s := raft.NewMemoryStorage() if snapshot != nil { s.ApplySnapshot(*snapshot) } - s.SetHardState(st) - s.Append(ents) + s.SetHardState(*wal.st) + s.Append(wal.ents) return &boostrapRaft{ lg: cfg.Logger, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, - id: id, cl: cl, - config: raftConfig(cfg, uint64(id), s), + config: raftConfig(cfg, uint64(wal.id), s), storage: s, - wal: w, + wal: wal, } } @@ -502,18 +486,18 @@ func boostrapRaftFromWalStandalone(cfg config.ServerConfig, snapshot *raftpb.Sna if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term } - w, id, cid, st, ents := readWAL(cfg.Logger, cfg.WALDir(), walsnap, cfg.UnsafeNoFsync) + wal := boostrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), walsnap, cfg.UnsafeNoFsync) // discard the previously uncommitted entries - for i, ent := range ents { - if ent.Index > st.Commit { + for i, ent := range wal.ents { + if ent.Index > wal.st.Commit { cfg.Logger.Info( "discarding uncommitted WAL entries", zap.Uint64("entry-index", ent.Index), - zap.Uint64("commit-index-from-wal", st.Commit), - zap.Int("number-of-discarded-entries", len(ents)-i), + zap.Uint64("commit-index-from-wal", wal.st.Commit), + zap.Int("number-of-discarded-entries", len(wal.ents)-i), ) - ents = ents[:i] + wal.ents = wal.ents[:i] break } } @@ -521,45 +505,44 @@ func boostrapRaftFromWalStandalone(cfg config.ServerConfig, snapshot *raftpb.Sna // force append the configuration change entries toAppEnts := createConfigChangeEnts( cfg.Logger, - getIDs(cfg.Logger, snapshot, ents), - uint64(id), - st.Term, - st.Commit, + getIDs(cfg.Logger, snapshot, wal.ents), + uint64(wal.id), + wal.st.Term, + wal.st.Commit, ) - ents = append(ents, toAppEnts...) + wal.ents = append(wal.ents, toAppEnts...) // force commit newly appended entries - err := w.Save(raftpb.HardState{}, toAppEnts) + err := wal.w.Save(raftpb.HardState{}, toAppEnts) if err != nil { cfg.Logger.Fatal("failed to save hard state and entries", zap.Error(err)) } - if len(ents) != 0 { - st.Commit = ents[len(ents)-1].Index + if len(wal.ents) != 0 { + wal.st.Commit = wal.ents[len(wal.ents)-1].Index } cfg.Logger.Info( "forcing restart member", - zap.String("cluster-id", cid.String()), - zap.String("local-member-id", id.String()), - zap.Uint64("commit-index", st.Commit), + zap.String("cluster-id", wal.cid.String()), + zap.String("local-member-id", wal.id.String()), + zap.Uint64("commit-index", wal.st.Commit), ) cl := membership.NewCluster(cfg.Logger) - cl.SetID(id, cid) + cl.SetID(wal.id, wal.cid) s := raft.NewMemoryStorage() if snapshot != nil { s.ApplySnapshot(*snapshot) } - s.SetHardState(st) - s.Append(ents) + s.SetHardState(*wal.st) + s.Append(wal.ents) return &boostrapRaft{ lg: cfg.Logger, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, - id: id, cl: cl, - config: raftConfig(cfg, uint64(id), s), + config: raftConfig(cfg, uint64(wal.id), s), storage: s, - wal: w, + wal: wal, } } @@ -583,10 +566,9 @@ type boostrapRaft struct { peers []raft.Peer config *raft.Config - id types.ID cl *membership.RaftCluster storage *raft.MemoryStorage - wal *wal.WAL + wal *boostrappedWAL } func (b *boostrapRaft) newRaftNode(ss *snap.Snapshotter) *raftNode { @@ -606,7 +588,7 @@ func (b *boostrapRaft) newRaftNode(ss *snap.Snapshotter) *raftNode { Node: n, heartbeat: b.heartbeat, raftStorage: b.storage, - storage: NewStorage(b.wal, ss), + storage: NewStorage(b.wal.w, ss), }, ) } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index aebe8554a..5c56ebe24 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -466,7 +466,7 @@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe cl.SetStore(st) cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) br := boostrapRaftFromCluster(cfg, cl, nil) - cl.SetID(br.id, existingCluster.ID()) + cl.SetID(br.wal.id, existingCluster.ID()) return &boostrapResult{ raft: br, remotes: remotes, @@ -506,7 +506,7 @@ func boostrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st cl.SetStore(st) cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) br := boostrapRaftFromCluster(cfg, cl, cl.MemberIDs()) - cl.SetID(br.id, cl.ID()) + cl.SetID(br.wal.id, cl.ID()) return &boostrapResult{ remotes: nil, raft: br, @@ -618,8 +618,8 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } }() - sstats := stats.NewServerStats(cfg.Name, b.raft.id.String()) - lstats := stats.NewLeaderStats(cfg.Logger, b.raft.id.String()) + sstats := stats.NewServerStats(cfg.Name, b.raft.wal.id.String()) + lstats := stats.NewLeaderStats(cfg.Logger, b.raft.wal.id.String()) heartbeat := time.Duration(cfg.TickMs) * time.Millisecond srv = &EtcdServer{ @@ -631,19 +631,19 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { v2store: b.st, snapshotter: b.ss, r: *b.raft.newRaftNode(b.ss), - id: b.raft.id, + id: b.raft.wal.id, attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, cluster: b.raft.cl, stats: sstats, lstats: lstats, SyncTicker: time.NewTicker(500 * time.Millisecond), peerRt: b.prt, - reqIDGen: idutil.NewGenerator(uint16(b.raft.id), time.Now()), + reqIDGen: idutil.NewGenerator(uint16(b.raft.wal.id), time.Now()), AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist}, consistIndex: b.ci, firstCommitInTermC: make(chan struct{}), } - serverID.With(prometheus.Labels{"server_id": b.raft.id.String()}).Set(1) + serverID.With(prometheus.Labels{"server_id": b.raft.wal.id.String()}).Set(1) srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) @@ -712,7 +712,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { Logger: cfg.Logger, TLSInfo: cfg.PeerTLSInfo, DialTimeout: cfg.PeerDialTimeout(), - ID: b.raft.id, + ID: b.raft.wal.id, URLs: cfg.PeerURLs, ClusterID: b.raft.cl.ID(), Raft: srv, @@ -726,12 +726,12 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } // add all remotes into transport for _, m := range b.remotes { - if m.ID != b.raft.id { + if m.ID != b.raft.wal.id { tr.AddRemote(m.ID, m.PeerURLs) } } for _, m := range b.raft.cl.Members() { - if m.ID != b.raft.id { + if m.ID != b.raft.wal.id { tr.AddPeer(m.ID, m.PeerURLs) } } diff --git a/server/etcdserver/storage.go b/server/etcdserver/storage.go index e662537d3..df516cb16 100644 --- a/server/etcdserver/storage.go +++ b/server/etcdserver/storage.go @@ -21,6 +21,7 @@ import ( "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/raft/v3/raftpb" + "go.etcd.io/etcd/server/v3/config" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" "go.etcd.io/etcd/server/v3/wal" "go.etcd.io/etcd/server/v3/wal/walpb" @@ -80,24 +81,21 @@ func (st *storage) Release(snap raftpb.Snapshot) error { return st.Snapshotter.ReleaseSnapDBs(snap) } -// readWAL reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear +// boostrapWALFromSnapshot reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear // after the position of the given snap in the WAL. // The snap must have been previously saved to the WAL, or this call will panic. -func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot, unsafeNoFsync bool) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) { - var ( - err error - wmetadata []byte - ) - +func boostrapWALFromSnapshot(lg *zap.Logger, waldir string, snap walpb.Snapshot, unsafeNoFsync bool) *boostrappedWAL { repaired := false for { - if w, err = wal.Open(lg, waldir, snap); err != nil { + w, err := wal.Open(lg, waldir, snap) + if err != nil { lg.Fatal("failed to open WAL", zap.Error(err)) } if unsafeNoFsync { w.SetUnsafeNoFsync() } - if wmetadata, st, ents, err = w.ReadAll(); err != nil { + wmetadata, st, ents, err := w.ReadAll() + if err != nil { w.Close() // we can only repair ErrUnexpectedEOF and we never repair twice. if repaired || err != io.ErrUnexpectedEOF { @@ -111,11 +109,44 @@ func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot, unsafeNoFsync b } continue } - break + var metadata pb.Metadata + pbutil.MustUnmarshal(&metadata, wmetadata) + id := types.ID(metadata.NodeID) + cid := types.ID(metadata.ClusterID) + return &boostrappedWAL{ + w: w, + id: id, + cid: cid, + st: &st, + ents: ents, + } } - var metadata pb.Metadata - pbutil.MustUnmarshal(&metadata, wmetadata) - id = types.ID(metadata.NodeID) - cid = types.ID(metadata.ClusterID) - return w, id, cid, st, ents +} + +func boostrapNewWal(cfg config.ServerConfig, nodeID, clusterID types.ID) *boostrappedWAL { + metadata := pbutil.MustMarshal( + &pb.Metadata{ + NodeID: uint64(nodeID), + ClusterID: uint64(clusterID), + }, + ) + w, err := wal.Create(cfg.Logger, cfg.WALDir(), metadata) + if err != nil { + cfg.Logger.Panic("failed to create WAL", zap.Error(err)) + } + if cfg.UnsafeNoFsync { + w.SetUnsafeNoFsync() + } + return &boostrappedWAL{ + w: w, + id: nodeID, + cid: clusterID, + } +} + +type boostrappedWAL struct { + w *wal.WAL + id, cid types.ID + st *raftpb.HardState + ents []raftpb.Entry } From 244e5c2cce6c8e93b7521390fa6b80e3155598d5 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 7 Jul 2021 22:53:25 +0200 Subject: [PATCH 12/14] etcdserver: Unify memory storage boostrap --- server/etcdserver/raft.go | 30 +++++-------------------- server/etcdserver/storage.go | 43 +++++++++++++++++++++++++++--------- 2 files changed, 37 insertions(+), 36 deletions(-) diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go index 6d443384b..1a5c4a424 100644 --- a/server/etcdserver/raft.go +++ b/server/etcdserver/raft.go @@ -33,7 +33,6 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" - "go.etcd.io/etcd/server/v3/wal/walpb" "go.uber.org/zap" ) @@ -437,8 +436,7 @@ func boostrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster zap.String("local-member-id", id.String()), zap.String("cluster-id", cl.ID().String()), ) - s := raft.NewMemoryStorage() - + s := wal.MemoryStorage() return &boostrapRaft{ lg: cfg.Logger, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, @@ -451,11 +449,7 @@ func boostrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster } func boostrapRaftFromWal(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *boostrapRaft { - var walsnap walpb.Snapshot - if snapshot != nil { - walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term - } - wal := boostrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), walsnap, cfg.UnsafeNoFsync) + wal := boostrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync) cfg.Logger.Info( "restarting local member", @@ -465,12 +459,7 @@ func boostrapRaftFromWal(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bo ) cl := membership.NewCluster(cfg.Logger) cl.SetID(wal.id, wal.cid) - s := raft.NewMemoryStorage() - if snapshot != nil { - s.ApplySnapshot(*snapshot) - } - s.SetHardState(*wal.st) - s.Append(wal.ents) + s := wal.MemoryStorage() return &boostrapRaft{ lg: cfg.Logger, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, @@ -482,11 +471,7 @@ func boostrapRaftFromWal(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bo } func boostrapRaftFromWalStandalone(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *boostrapRaft { - var walsnap walpb.Snapshot - if snapshot != nil { - walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term - } - wal := boostrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), walsnap, cfg.UnsafeNoFsync) + wal := boostrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync) // discard the previously uncommitted entries for i, ent := range wal.ents { @@ -530,12 +515,7 @@ func boostrapRaftFromWalStandalone(cfg config.ServerConfig, snapshot *raftpb.Sna cl := membership.NewCluster(cfg.Logger) cl.SetID(wal.id, wal.cid) - s := raft.NewMemoryStorage() - if snapshot != nil { - s.ApplySnapshot(*snapshot) - } - s.SetHardState(*wal.st) - s.Append(wal.ents) + s := wal.MemoryStorage() return &boostrapRaft{ lg: cfg.Logger, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, diff --git a/server/etcdserver/storage.go b/server/etcdserver/storage.go index df516cb16..2db3b40b1 100644 --- a/server/etcdserver/storage.go +++ b/server/etcdserver/storage.go @@ -20,6 +20,7 @@ import ( pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/pkg/v3/pbutil" + "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/config" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" @@ -84,10 +85,14 @@ func (st *storage) Release(snap raftpb.Snapshot) error { // boostrapWALFromSnapshot reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear // after the position of the given snap in the WAL. // The snap must have been previously saved to the WAL, or this call will panic. -func boostrapWALFromSnapshot(lg *zap.Logger, waldir string, snap walpb.Snapshot, unsafeNoFsync bool) *boostrappedWAL { +func boostrapWALFromSnapshot(lg *zap.Logger, waldir string, snapshot *raftpb.Snapshot, unsafeNoFsync bool) *boostrappedWAL { + var walsnap walpb.Snapshot + if snapshot != nil { + walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term + } repaired := false for { - w, err := wal.Open(lg, waldir, snap) + w, err := wal.Open(lg, waldir, walsnap) if err != nil { lg.Fatal("failed to open WAL", zap.Error(err)) } @@ -114,11 +119,12 @@ func boostrapWALFromSnapshot(lg *zap.Logger, waldir string, snap walpb.Snapshot, id := types.ID(metadata.NodeID) cid := types.ID(metadata.ClusterID) return &boostrappedWAL{ - w: w, - id: id, - cid: cid, - st: &st, - ents: ents, + w: w, + id: id, + cid: cid, + st: &st, + ents: ents, + snapshot: snapshot, } } } @@ -145,8 +151,23 @@ func boostrapNewWal(cfg config.ServerConfig, nodeID, clusterID types.ID) *boostr } type boostrappedWAL struct { - w *wal.WAL - id, cid types.ID - st *raftpb.HardState - ents []raftpb.Entry + w *wal.WAL + id, cid types.ID + st *raftpb.HardState + ents []raftpb.Entry + snapshot *raftpb.Snapshot +} + +func (wal *boostrappedWAL) MemoryStorage() *raft.MemoryStorage { + s := raft.NewMemoryStorage() + if wal.snapshot != nil { + s.ApplySnapshot(*wal.snapshot) + } + if wal.st != nil { + s.SetHardState(*wal.st) + } + if len(wal.ents) != 0 { + s.Append(wal.ents) + } + return s } From e1fa356facf3f2c8141fee7d086cf56fb4cfa6e9 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 7 Jul 2021 23:14:09 +0200 Subject: [PATCH 13/14] etcdserver: Refactor standalone boostrap --- server/etcdserver/raft.go | 35 ++++--------------------------- server/etcdserver/storage.go | 40 ++++++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 31 deletions(-) diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go index 1a5c4a424..42bb4a08b 100644 --- a/server/etcdserver/raft.go +++ b/server/etcdserver/raft.go @@ -474,37 +474,10 @@ func boostrapRaftFromWalStandalone(cfg config.ServerConfig, snapshot *raftpb.Sna wal := boostrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync) // discard the previously uncommitted entries - for i, ent := range wal.ents { - if ent.Index > wal.st.Commit { - cfg.Logger.Info( - "discarding uncommitted WAL entries", - zap.Uint64("entry-index", ent.Index), - zap.Uint64("commit-index-from-wal", wal.st.Commit), - zap.Int("number-of-discarded-entries", len(wal.ents)-i), - ) - wal.ents = wal.ents[:i] - break - } - } - - // force append the configuration change entries - toAppEnts := createConfigChangeEnts( - cfg.Logger, - getIDs(cfg.Logger, snapshot, wal.ents), - uint64(wal.id), - wal.st.Term, - wal.st.Commit, - ) - wal.ents = append(wal.ents, toAppEnts...) - - // force commit newly appended entries - err := wal.w.Save(raftpb.HardState{}, toAppEnts) - if err != nil { - cfg.Logger.Fatal("failed to save hard state and entries", zap.Error(err)) - } - if len(wal.ents) != 0 { - wal.st.Commit = wal.ents[len(wal.ents)-1].Index - } + wal.ents = wal.CommitedEntries() + entries := wal.ConfigChangeEntries() + // force commit config change entries + wal.AppendAndCommitEntries(entries) cfg.Logger.Info( "forcing restart member", diff --git a/server/etcdserver/storage.go b/server/etcdserver/storage.go index 2db3b40b1..4b0049196 100644 --- a/server/etcdserver/storage.go +++ b/server/etcdserver/storage.go @@ -119,6 +119,7 @@ func boostrapWALFromSnapshot(lg *zap.Logger, waldir string, snapshot *raftpb.Sna id := types.ID(metadata.NodeID) cid := types.ID(metadata.ClusterID) return &boostrappedWAL{ + lg: lg, w: w, id: id, cid: cid, @@ -144,6 +145,7 @@ func boostrapNewWal(cfg config.ServerConfig, nodeID, clusterID types.ID) *boostr w.SetUnsafeNoFsync() } return &boostrappedWAL{ + lg: cfg.Logger, w: w, id: nodeID, cid: clusterID, @@ -151,6 +153,8 @@ func boostrapNewWal(cfg config.ServerConfig, nodeID, clusterID types.ID) *boostr } type boostrappedWAL struct { + lg *zap.Logger + w *wal.WAL id, cid types.ID st *raftpb.HardState @@ -171,3 +175,39 @@ func (wal *boostrappedWAL) MemoryStorage() *raft.MemoryStorage { } return s } + +func (wal *boostrappedWAL) CommitedEntries() []raftpb.Entry { + for i, ent := range wal.ents { + if ent.Index > wal.st.Commit { + wal.lg.Info( + "discarding uncommitted WAL entries", + zap.Uint64("entry-index", ent.Index), + zap.Uint64("commit-index-from-wal", wal.st.Commit), + zap.Int("number-of-discarded-entries", len(wal.ents)-i), + ) + return wal.ents[:i] + } + } + return wal.ents +} + +func (wal *boostrappedWAL) ConfigChangeEntries() []raftpb.Entry { + return createConfigChangeEnts( + wal.lg, + getIDs(wal.lg, wal.snapshot, wal.ents), + uint64(wal.id), + wal.st.Term, + wal.st.Commit, + ) +} + +func (wal *boostrappedWAL) AppendAndCommitEntries(ents []raftpb.Entry) { + wal.ents = append(wal.ents, ents...) + err := wal.w.Save(raftpb.HardState{}, ents) + if err != nil { + wal.lg.Fatal("failed to save hard state and entries", zap.Error(err)) + } + if len(wal.ents) != 0 { + wal.st.Commit = wal.ents[len(wal.ents)-1].Index + } +} From 9824cc96ed28ca27643ede2c80b2ab6095e503c5 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Thu, 8 Jul 2021 00:35:59 +0200 Subject: [PATCH 14/14] etcdserver: Fix typos in bootstrap --- server/etcdserver/raft.go | 24 +++++++++++------------ server/etcdserver/server.go | 38 ++++++++++++++++++------------------ server/etcdserver/storage.go | 20 +++++++++---------- 3 files changed, 41 insertions(+), 41 deletions(-) diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go index 42bb4a08b..6acacf8b4 100644 --- a/server/etcdserver/raft.go +++ b/server/etcdserver/raft.go @@ -418,10 +418,10 @@ func (r *raftNode) advanceTicks(ticks int) { } } -func boostrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) *boostrapRaft { +func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) *bootstrappedRaft { member := cl.MemberByName(cfg.Name) id := member.ID - wal := boostrapNewWal(cfg, id, cl.ID()) + wal := bootstrapNewWAL(cfg, id, cl.ID()) peers := make([]raft.Peer, len(ids)) for i, id := range ids { var ctx []byte @@ -437,7 +437,7 @@ func boostrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster zap.String("cluster-id", cl.ID().String()), ) s := wal.MemoryStorage() - return &boostrapRaft{ + return &bootstrappedRaft{ lg: cfg.Logger, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, cl: cl, @@ -448,8 +448,8 @@ func boostrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster } } -func boostrapRaftFromWal(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *boostrapRaft { - wal := boostrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync) +func bootstrapRaftFromWal(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bootstrappedRaft { + wal := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync) cfg.Logger.Info( "restarting local member", @@ -460,7 +460,7 @@ func boostrapRaftFromWal(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bo cl := membership.NewCluster(cfg.Logger) cl.SetID(wal.id, wal.cid) s := wal.MemoryStorage() - return &boostrapRaft{ + return &bootstrappedRaft{ lg: cfg.Logger, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, cl: cl, @@ -470,8 +470,8 @@ func boostrapRaftFromWal(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bo } } -func boostrapRaftFromWalStandalone(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *boostrapRaft { - wal := boostrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync) +func bootstrapRaftFromWalStandalone(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bootstrappedRaft { + wal := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync) // discard the previously uncommitted entries wal.ents = wal.CommitedEntries() @@ -489,7 +489,7 @@ func boostrapRaftFromWalStandalone(cfg config.ServerConfig, snapshot *raftpb.Sna cl := membership.NewCluster(cfg.Logger) cl.SetID(wal.id, wal.cid) s := wal.MemoryStorage() - return &boostrapRaft{ + return &bootstrappedRaft{ lg: cfg.Logger, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, cl: cl, @@ -513,7 +513,7 @@ func raftConfig(cfg config.ServerConfig, id uint64, s *raft.MemoryStorage) *raft } } -type boostrapRaft struct { +type bootstrappedRaft struct { lg *zap.Logger heartbeat time.Duration @@ -521,10 +521,10 @@ type boostrapRaft struct { config *raft.Config cl *membership.RaftCluster storage *raft.MemoryStorage - wal *boostrappedWAL + wal *bootstrappedWAL } -func (b *boostrapRaft) newRaftNode(ss *snap.Snapshotter) *raftNode { +func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter) *raftNode { var n raft.Node if len(b.peers) == 0 { n = raft.RestartNode(b.config) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 5c56ebe24..156213644 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -330,7 +330,7 @@ func (bh *backendHooks) SetConfState(confState *raftpb.ConfState) { bh.confStateDirty = true } -func bootstrap(cfg config.ServerConfig) (b *boostrapResult, err error) { +func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) { st := v2store.New(StoreClusterPrefix, StoreKeysPrefix) if cfg.MaxRequestBytes > recommendedMaxRequestBytes { @@ -348,9 +348,9 @@ func bootstrap(cfg config.ServerConfig) (b *boostrapResult, err error) { } haveWAL := wal.Exist(cfg.WALDir()) - ss := boostrapSnapshotter(cfg) + ss := bootstrapSnapshot(cfg) - be, ci, beExist, beHooks, err := boostrapBackend(cfg) + be, ci, beExist, beHooks, err := bootstrapBackend(cfg) if err != nil { return nil, err } @@ -369,9 +369,9 @@ func bootstrap(cfg config.ServerConfig) (b *boostrapResult, err error) { case !haveWAL && !cfg.NewCluster: b, err = bootstrapExistingClusterNoWAL(cfg, prt, st, be) case !haveWAL && cfg.NewCluster: - b, err = boostrapNewClusterNoWAL(cfg, prt, st, be) + b, err = bootstrapNewClusterNoWAL(cfg, prt, st, be) case haveWAL: - b, err = boostrapWithWAL(cfg, st, be, ss, beExist, beHooks, ci) + b, err = bootstrapWithWAL(cfg, st, be, ss, beExist, beHooks, ci) default: be.Close() return nil, fmt.Errorf("unsupported bootstrap config") @@ -392,8 +392,8 @@ func bootstrap(cfg config.ServerConfig) (b *boostrapResult, err error) { return b, nil } -type boostrapResult struct { - raft *boostrapRaft +type bootstrappedServer struct { + raft *bootstrappedRaft remotes []*membership.Member prt http.RoundTripper ci cindex.ConsistentIndexer @@ -403,7 +403,7 @@ type boostrapResult struct { beHooks *backendHooks } -func boostrapSnapshotter(cfg config.ServerConfig) *snap.Snapshotter { +func bootstrapSnapshot(cfg config.ServerConfig) *snap.Snapshotter { if err := fileutil.TouchDirAll(cfg.SnapDir()); err != nil { cfg.Logger.Fatal( "failed to create snapshot directory", @@ -424,7 +424,7 @@ func boostrapSnapshotter(cfg config.ServerConfig) *snap.Snapshotter { return snap.New(cfg.Logger, cfg.SnapDir()) } -func boostrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.ConsistentIndexer, beExist bool, beHooks *backendHooks, err error) { +func bootstrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.ConsistentIndexer, beExist bool, beHooks *backendHooks, err error) { beExist = fileutil.Exist(cfg.BackendPath()) ci = cindex.NewConsistentIndex(nil) beHooks = &backendHooks{lg: cfg.Logger, indexer: ci} @@ -442,7 +442,7 @@ func boostrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.Con return be, ci, beExist, beHooks, nil } -func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*boostrapResult, error) { +func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*bootstrappedServer, error) { if err := cfg.VerifyJoinExisting(); err != nil { return nil, err } @@ -465,15 +465,15 @@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe cl.SetID(types.ID(0), existingCluster.ID()) cl.SetStore(st) cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) - br := boostrapRaftFromCluster(cfg, cl, nil) + br := bootstrapRaftFromCluster(cfg, cl, nil) cl.SetID(br.wal.id, existingCluster.ID()) - return &boostrapResult{ + return &bootstrappedServer{ raft: br, remotes: remotes, }, nil } -func boostrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*boostrapResult, error) { +func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*bootstrappedServer, error) { if err := cfg.VerifyBootstrap(); err != nil { return nil, err } @@ -505,15 +505,15 @@ func boostrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st } cl.SetStore(st) cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) - br := boostrapRaftFromCluster(cfg, cl, cl.MemberIDs()) + br := bootstrapRaftFromCluster(cfg, cl, cl.MemberIDs()) cl.SetID(br.wal.id, cl.ID()) - return &boostrapResult{ + return &bootstrappedServer{ remotes: nil, raft: br, }, nil } -func boostrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backend, ss *snap.Snapshotter, beExist bool, beHooks *backendHooks, ci cindex.ConsistentIndexer) (*boostrapResult, error) { +func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backend, ss *snap.Snapshotter, beExist bool, beHooks *backendHooks, ci cindex.ConsistentIndexer) (*bootstrappedServer, error) { if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil { return nil, fmt.Errorf("cannot write to member directory: %v", err) } @@ -586,11 +586,11 @@ func boostrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backe cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!") } - r := &boostrapResult{} + r := &bootstrappedServer{} if !cfg.ForceNewCluster { - r.raft = boostrapRaftFromWal(cfg, snapshot) + r.raft = bootstrapRaftFromWal(cfg, snapshot) } else { - r.raft = boostrapRaftFromWalStandalone(cfg, snapshot) + r.raft = bootstrapRaftFromWalStandalone(cfg, snapshot) } r.raft.cl.SetStore(st) diff --git a/server/etcdserver/storage.go b/server/etcdserver/storage.go index 4b0049196..080fe3f1f 100644 --- a/server/etcdserver/storage.go +++ b/server/etcdserver/storage.go @@ -82,10 +82,10 @@ func (st *storage) Release(snap raftpb.Snapshot) error { return st.Snapshotter.ReleaseSnapDBs(snap) } -// boostrapWALFromSnapshot reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear +// bootstrapWALFromSnapshot reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear // after the position of the given snap in the WAL. // The snap must have been previously saved to the WAL, or this call will panic. -func boostrapWALFromSnapshot(lg *zap.Logger, waldir string, snapshot *raftpb.Snapshot, unsafeNoFsync bool) *boostrappedWAL { +func bootstrapWALFromSnapshot(lg *zap.Logger, waldir string, snapshot *raftpb.Snapshot, unsafeNoFsync bool) *bootstrappedWAL { var walsnap walpb.Snapshot if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term @@ -118,7 +118,7 @@ func boostrapWALFromSnapshot(lg *zap.Logger, waldir string, snapshot *raftpb.Sna pbutil.MustUnmarshal(&metadata, wmetadata) id := types.ID(metadata.NodeID) cid := types.ID(metadata.ClusterID) - return &boostrappedWAL{ + return &bootstrappedWAL{ lg: lg, w: w, id: id, @@ -130,7 +130,7 @@ func boostrapWALFromSnapshot(lg *zap.Logger, waldir string, snapshot *raftpb.Sna } } -func boostrapNewWal(cfg config.ServerConfig, nodeID, clusterID types.ID) *boostrappedWAL { +func bootstrapNewWAL(cfg config.ServerConfig, nodeID, clusterID types.ID) *bootstrappedWAL { metadata := pbutil.MustMarshal( &pb.Metadata{ NodeID: uint64(nodeID), @@ -144,7 +144,7 @@ func boostrapNewWal(cfg config.ServerConfig, nodeID, clusterID types.ID) *boostr if cfg.UnsafeNoFsync { w.SetUnsafeNoFsync() } - return &boostrappedWAL{ + return &bootstrappedWAL{ lg: cfg.Logger, w: w, id: nodeID, @@ -152,7 +152,7 @@ func boostrapNewWal(cfg config.ServerConfig, nodeID, clusterID types.ID) *boostr } } -type boostrappedWAL struct { +type bootstrappedWAL struct { lg *zap.Logger w *wal.WAL @@ -162,7 +162,7 @@ type boostrappedWAL struct { snapshot *raftpb.Snapshot } -func (wal *boostrappedWAL) MemoryStorage() *raft.MemoryStorage { +func (wal *bootstrappedWAL) MemoryStorage() *raft.MemoryStorage { s := raft.NewMemoryStorage() if wal.snapshot != nil { s.ApplySnapshot(*wal.snapshot) @@ -176,7 +176,7 @@ func (wal *boostrappedWAL) MemoryStorage() *raft.MemoryStorage { return s } -func (wal *boostrappedWAL) CommitedEntries() []raftpb.Entry { +func (wal *bootstrappedWAL) CommitedEntries() []raftpb.Entry { for i, ent := range wal.ents { if ent.Index > wal.st.Commit { wal.lg.Info( @@ -191,7 +191,7 @@ func (wal *boostrappedWAL) CommitedEntries() []raftpb.Entry { return wal.ents } -func (wal *boostrappedWAL) ConfigChangeEntries() []raftpb.Entry { +func (wal *bootstrappedWAL) ConfigChangeEntries() []raftpb.Entry { return createConfigChangeEnts( wal.lg, getIDs(wal.lg, wal.snapshot, wal.ents), @@ -201,7 +201,7 @@ func (wal *boostrappedWAL) ConfigChangeEntries() []raftpb.Entry { ) } -func (wal *boostrappedWAL) AppendAndCommitEntries(ents []raftpb.Entry) { +func (wal *bootstrappedWAL) AppendAndCommitEntries(ents []raftpb.Entry) { wal.ents = append(wal.ents, ents...) err := wal.w.Save(raftpb.HardState{}, ents) if err != nil {