From 6c8a4fdcc5f84022782893c89cc009c03323badb Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Thu, 5 Aug 2021 10:51:31 +0200 Subject: [PATCH 01/18] server: Rename bootstrappedWal variables to bwal to separate it from wal package --- server/etcdserver/bootstrap.go | 46 +++++++++++++++++----------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index da9cc6da0..2a1b4b92a 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -357,7 +357,7 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) *bootstrappedRaft { member := cl.MemberByName(cfg.Name) id := member.ID - wal := bootstrapNewWAL(cfg, id, cl.ID()) + bwal := bootstrapNewWAL(cfg, id, cl.ID()) peers := make([]raft.Peer, len(ids)) for i, id := range ids { var ctx []byte @@ -372,66 +372,66 @@ func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluste zap.String("local-member-id", id.String()), zap.String("cluster-id", cl.ID().String()), ) - s := wal.MemoryStorage() + s := bwal.MemoryStorage() return &bootstrappedRaft{ lg: cfg.Logger, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, cl: cl, - config: raftConfig(cfg, uint64(wal.id), s), + config: raftConfig(cfg, uint64(bwal.id), s), peers: peers, storage: s, - wal: wal, + wal: bwal, } } func bootstrapRaftFromWal(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bootstrappedRaft { - wal := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync) + bwal := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync) cfg.Logger.Info( "restarting local member", - zap.String("cluster-id", wal.cid.String()), - zap.String("local-member-id", wal.id.String()), - zap.Uint64("commit-index", wal.st.Commit), + zap.String("cluster-id", bwal.cid.String()), + zap.String("local-member-id", bwal.id.String()), + zap.Uint64("commit-index", bwal.st.Commit), ) cl := membership.NewCluster(cfg.Logger) - cl.SetID(wal.id, wal.cid) - s := wal.MemoryStorage() + cl.SetID(bwal.id, bwal.cid) + s := bwal.MemoryStorage() return &bootstrappedRaft{ lg: cfg.Logger, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, cl: cl, - config: raftConfig(cfg, uint64(wal.id), s), + config: raftConfig(cfg, uint64(bwal.id), s), storage: s, - wal: wal, + wal: bwal, } } func bootstrapRaftFromWalStandalone(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bootstrappedRaft { - wal := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync) + bwal := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync) // discard the previously uncommitted entries - wal.ents = wal.CommitedEntries() - entries := wal.ConfigChangeEntries() + bwal.ents = bwal.CommitedEntries() + entries := bwal.ConfigChangeEntries() // force commit config change entries - wal.AppendAndCommitEntries(entries) + bwal.AppendAndCommitEntries(entries) cfg.Logger.Info( "forcing restart member", - zap.String("cluster-id", wal.cid.String()), - zap.String("local-member-id", wal.id.String()), - zap.Uint64("commit-index", wal.st.Commit), + zap.String("cluster-id", bwal.cid.String()), + zap.String("local-member-id", bwal.id.String()), + zap.Uint64("commit-index", bwal.st.Commit), ) cl := membership.NewCluster(cfg.Logger) - cl.SetID(wal.id, wal.cid) - s := wal.MemoryStorage() + cl.SetID(bwal.id, bwal.cid) + s := 
bwal.MemoryStorage() return &bootstrappedRaft{ lg: cfg.Logger, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, cl: cl, - config: raftConfig(cfg, uint64(wal.id), s), + config: raftConfig(cfg, uint64(bwal.id), s), storage: s, - wal: wal, + wal: bwal, } } From aa0c05000319ff6db6519d0cc0f98373f97b3322 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 21 Jul 2021 10:54:51 +0200 Subject: [PATCH 02/18] etcdserver: Add more hierarchy bootstap introducing a separate storage bootstrap step --- server/etcdserver/bootstrap.go | 113 +++++++++++++++++++-------------- server/etcdserver/server.go | 36 +++++------ 2 files changed, 84 insertions(+), 65 deletions(-) diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index 2a1b4b92a..60ffc5fc6 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -49,7 +49,6 @@ import ( ) func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) { - st := v2store.New(StoreClusterPrefix, StoreKeysPrefix) if cfg.MaxRequestBytes > recommendedMaxRequestBytes { cfg.Logger.Warn( @@ -64,9 +63,49 @@ func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) { if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil { return nil, fmt.Errorf("cannot access data directory: %v", terr) } - - haveWAL := wal.Exist(cfg.WALDir()) ss := bootstrapSnapshot(cfg) + prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.PeerDialTimeout()) + if err != nil { + return nil, err + } + + if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil { + return nil, fmt.Errorf("cannot access member directory: %v", terr) + } + + storage, err := bootstrapStorage(cfg, ss, prt) + if err != nil { + return nil, err + } + return &bootstrappedServer{ + prt: prt, + ss: ss, + storage: storage, + }, nil +} + +type bootstrappedServer struct { + storage *bootstrappedStorage + prt http.RoundTripper + ss *snap.Snapshotter +} + +type bootstrappedStorage struct { + cluster *bootstrapedCluster + beHooks *serverstorage.BackendHooks + st v2store.Store + be backend.Backend + ci cindex.ConsistentIndexer +} + +type bootstrapedCluster struct { + raft *bootstrappedRaft + remotes []*membership.Member +} + +func bootstrapStorage(cfg config.ServerConfig, ss *snap.Snapshotter, prt http.RoundTripper) (b *bootstrappedStorage, err error) { + st := v2store.New(StoreClusterPrefix, StoreKeysPrefix) + haveWAL := wal.Exist(cfg.WALDir()) be, ci, beExist, beHooks, err := bootstrapBackend(cfg) if err != nil { @@ -77,19 +116,14 @@ func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) { be.Close() } }() - - prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.PeerDialTimeout()) - if err != nil { - return nil, err - } - + var c *bootstrapedCluster switch { case !haveWAL && !cfg.NewCluster: - b, err = bootstrapExistingClusterNoWAL(cfg, prt, st, be) + c, err = bootstrapExistingClusterNoWAL(cfg, prt, st, be) case !haveWAL && cfg.NewCluster: - b, err = bootstrapNewClusterNoWAL(cfg, prt, st, be) + c, err = bootstrapNewClusterNoWAL(cfg, prt, st, be) case haveWAL: - b, err = bootstrapWithWAL(cfg, st, be, ss, beExist, beHooks, ci) + c, err = bootstrapWithWAL(cfg, st, be, ss, beExist, beHooks, ci) default: be.Close() return nil, fmt.Errorf("unsupported bootstrap config") @@ -97,28 +131,13 @@ func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) { if err != nil { return nil, err } - - if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil { - return nil, fmt.Errorf("cannot access member 
directory: %v", terr) - } - b.prt = prt - b.ci = ci - b.st = st - b.be = be - b.ss = ss - b.beHooks = beHooks - return b, nil -} - -type bootstrappedServer struct { - raft *bootstrappedRaft - remotes []*membership.Member - prt http.RoundTripper - ci cindex.ConsistentIndexer - st v2store.Store - be backend.Backend - ss *snap.Snapshotter - beHooks *serverstorage.BackendHooks + return &bootstrappedStorage{ + cluster: c, + beHooks: beHooks, + st: st, + be: be, + ci: ci, + }, nil } func bootstrapSnapshot(cfg config.ServerConfig) *snap.Snapshotter { @@ -192,7 +211,7 @@ func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error { return be.Defrag() } -func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*bootstrappedServer, error) { +func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*bootstrapedCluster, error) { if err := cfg.VerifyJoinExisting(); err != nil { return nil, err } @@ -217,13 +236,13 @@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be)) br := bootstrapRaftFromCluster(cfg, cl, nil) cl.SetID(br.wal.id, existingCluster.ID()) - return &bootstrappedServer{ + return &bootstrapedCluster{ raft: br, remotes: remotes, }, nil } -func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*bootstrappedServer, error) { +func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*bootstrapedCluster, error) { if err := cfg.VerifyBootstrap(); err != nil { return nil, err } @@ -257,13 +276,13 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be)) br := bootstrapRaftFromCluster(cfg, cl, cl.MemberIDs()) cl.SetID(br.wal.id, cl.ID()) - return &bootstrappedServer{ + return &bootstrapedCluster{ remotes: nil, raft: br, }, nil } -func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backend, ss *snap.Snapshotter, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer) (*bootstrappedServer, error) { +func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backend, ss *snap.Snapshotter, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer) (*bootstrapedCluster, error) { if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil { return nil, fmt.Errorf("cannot write to member directory: %v", err) } @@ -336,22 +355,22 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back cfg.Logger.Info("No snapshot found. 
Recovering WAL from scratch!") } - r := &bootstrappedServer{} + b := &bootstrapedCluster{} if !cfg.ForceNewCluster { - r.raft = bootstrapRaftFromWal(cfg, snapshot) + b.raft = bootstrapRaftFromWal(cfg, snapshot) } else { - r.raft = bootstrapRaftFromWalStandalone(cfg, snapshot) + b.raft = bootstrapRaftFromWalStandalone(cfg, snapshot) } - r.raft.cl.SetStore(st) - r.raft.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be)) - r.raft.cl.Recover(api.UpdateCapability) - if r.raft.cl.Version() != nil && !r.raft.cl.Version().LessThan(semver.Version{Major: 3}) && !beExist { + b.raft.cl.SetStore(st) + b.raft.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be)) + b.raft.cl.Recover(api.UpdateCapability) + if b.raft.cl.Version() != nil && !b.raft.cl.Version().LessThan(semver.Version{Major: 3}) && !beExist { bepath := cfg.BackendPath() os.RemoveAll(bepath) return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath) } - return r, nil + return b, nil } func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) *bootstrappedRaft { diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 4a9d55efa..2453329f8 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -304,12 +304,12 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { defer func() { if err != nil { - b.be.Close() + b.storage.be.Close() } }() - sstats := stats.NewServerStats(cfg.Name, b.raft.wal.id.String()) - lstats := stats.NewLeaderStats(cfg.Logger, b.raft.wal.id.String()) + sstats := stats.NewServerStats(cfg.Name, b.storage.cluster.raft.wal.id.String()) + lstats := stats.NewLeaderStats(cfg.Logger, b.storage.cluster.raft.wal.id.String()) heartbeat := time.Duration(cfg.TickMs) * time.Millisecond srv = &EtcdServer{ @@ -318,28 +318,28 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { lgMu: new(sync.RWMutex), lg: cfg.Logger, errorc: make(chan error, 1), - v2store: b.st, + v2store: b.storage.st, snapshotter: b.ss, - r: *b.raft.newRaftNode(b.ss), - id: b.raft.wal.id, + r: *b.storage.cluster.raft.newRaftNode(b.ss), + id: b.storage.cluster.raft.wal.id, attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, - cluster: b.raft.cl, + cluster: b.storage.cluster.raft.cl, stats: sstats, lstats: lstats, SyncTicker: time.NewTicker(500 * time.Millisecond), peerRt: b.prt, - reqIDGen: idutil.NewGenerator(uint16(b.raft.wal.id), time.Now()), + reqIDGen: idutil.NewGenerator(uint16(b.storage.cluster.raft.wal.id), time.Now()), AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist}, - consistIndex: b.ci, + consistIndex: b.storage.ci, firstCommitInTerm: notify.NewNotifier(), clusterVersionChanged: notify.NewNotifier(), } - serverID.With(prometheus.Labels{"server_id": b.raft.wal.id.String()}).Set(1) + serverID.With(prometheus.Labels{"server_id": b.storage.cluster.raft.wal.id.String()}).Set(1) srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged) srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) - srv.be = b.be - srv.beHooks = b.beHooks + srv.be = b.storage.be + srv.beHooks = b.storage.beHooks minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases. 
@@ -403,9 +403,9 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { Logger: cfg.Logger, TLSInfo: cfg.PeerTLSInfo, DialTimeout: cfg.PeerDialTimeout(), - ID: b.raft.wal.id, + ID: b.storage.cluster.raft.wal.id, URLs: cfg.PeerURLs, - ClusterID: b.raft.cl.ID(), + ClusterID: b.storage.cluster.raft.cl.ID(), Raft: srv, Snapshotter: b.ss, ServerStats: sstats, @@ -416,13 +416,13 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { return nil, err } // add all remotes into transport - for _, m := range b.remotes { - if m.ID != b.raft.wal.id { + for _, m := range b.storage.cluster.remotes { + if m.ID != b.storage.cluster.raft.wal.id { tr.AddRemote(m.ID, m.PeerURLs) } } - for _, m := range b.raft.cl.Members() { - if m.ID != b.raft.wal.id { + for _, m := range b.storage.cluster.raft.cl.Members() { + if m.ID != b.storage.cluster.raft.wal.id { tr.AddPeer(m.ID, m.PeerURLs) } } From db06a4ab28e23170f6cefc70e7b40e2c9410d11d Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 21 Jul 2021 11:51:18 +0200 Subject: [PATCH 03/18] server: Move wal bootstrap up the hierarchy --- server/etcdserver/bootstrap.go | 104 ++++++++++++++------------------- server/etcdserver/server.go | 18 +++--- 2 files changed, 52 insertions(+), 70 deletions(-) diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index 60ffc5fc6..aafb37969 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -101,6 +101,7 @@ type bootstrappedStorage struct { type bootstrapedCluster struct { raft *bootstrappedRaft remotes []*membership.Member + wal *bootstrappedWAL } func bootstrapStorage(cfg config.ServerConfig, ss *snap.Snapshotter, prt http.RoundTripper) (b *bootstrappedStorage, err error) { @@ -123,7 +124,7 @@ func bootstrapStorage(cfg config.ServerConfig, ss *snap.Snapshotter, prt http.Ro case !haveWAL && cfg.NewCluster: c, err = bootstrapNewClusterNoWAL(cfg, prt, st, be) case haveWAL: - c, err = bootstrapWithWAL(cfg, st, be, ss, beExist, beHooks, ci) + c, err = bootstrapClusterWithWAL(cfg, st, be, ss, beExist, beHooks, ci) default: be.Close() return nil, fmt.Errorf("unsupported bootstrap config") @@ -234,11 +235,13 @@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe cl.SetID(types.ID(0), existingCluster.ID()) cl.SetStore(st) cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be)) - br := bootstrapRaftFromCluster(cfg, cl, nil) - cl.SetID(br.wal.id, existingCluster.ID()) + bwal := bootstrapNewWAL(cfg, cl.MemberByName(cfg.Name).ID, cl.ID()) + br := bootstrapRaftFromCluster(cfg, cl, nil, bwal) + cl.SetID(bwal.id, existingCluster.ID()) return &bootstrapedCluster{ raft: br, remotes: remotes, + wal: bwal, }, nil } @@ -274,15 +277,17 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st } cl.SetStore(st) cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be)) - br := bootstrapRaftFromCluster(cfg, cl, cl.MemberIDs()) - cl.SetID(br.wal.id, cl.ID()) + bwal := bootstrapNewWAL(cfg, cl.MemberByName(cfg.Name).ID, cl.ID()) + br := bootstrapRaftFromCluster(cfg, cl, cl.MemberIDs(), bwal) + cl.SetID(bwal.id, cl.ID()) return &bootstrapedCluster{ remotes: nil, raft: br, + wal: bwal, }, nil } -func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backend, ss *snap.Snapshotter, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer) (*bootstrapedCluster, error) { +func bootstrapClusterWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backend, ss 
*snap.Snapshotter, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer) (*bootstrapedCluster, error) { if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil { return nil, fmt.Errorf("cannot write to member directory: %v", err) } @@ -294,7 +299,7 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back if cfg.ShouldDiscover() { cfg.Logger.Warn( "discovery token is ignored since cluster already initialized; valid logs are found", - zap.String("wal-dir", cfg.WALDir()), + zap.String("bwal-dir", cfg.WALDir()), ) } @@ -304,7 +309,7 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back return nil, err } // snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding - // wal log entries + // bwal log entries snapshot, err := ss.LoadNewestAvailable(walSnaps) if err != nil && err != snap.ErrNoSnapshot { return nil, err @@ -355,12 +360,31 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!") } - b := &bootstrapedCluster{} - if !cfg.ForceNewCluster { - b.raft = bootstrapRaftFromWal(cfg, snapshot) - } else { - b.raft = bootstrapRaftFromWalStandalone(cfg, snapshot) + bwal := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync) + b := &bootstrapedCluster{ + wal: bwal, } + if cfg.ForceNewCluster { + // discard the previously uncommitted entries + bwal.ents = bwal.CommitedEntries() + entries := bwal.ConfigChangeEntries() + // force commit config change entries + bwal.AppendAndCommitEntries(entries) + cfg.Logger.Info( + "forcing restart member", + zap.String("cluster-id", bwal.cid.String()), + zap.String("local-member-id", bwal.id.String()), + zap.Uint64("commit-index", bwal.st.Commit), + ) + } else { + cfg.Logger.Info( + "restarting local member", + zap.String("cluster-id", bwal.cid.String()), + zap.String("local-member-id", bwal.id.String()), + zap.Uint64("commit-index", bwal.st.Commit), + ) + } + b.raft = bootstrapRaftFromWal(cfg, bwal) b.raft.cl.SetStore(st) b.raft.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be)) @@ -373,10 +397,8 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back return b, nil } -func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) *bootstrappedRaft { +func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID, bwal *bootstrappedWAL) *bootstrappedRaft { member := cl.MemberByName(cfg.Name) - id := member.ID - bwal := bootstrapNewWAL(cfg, id, cl.ID()) peers := make([]raft.Peer, len(ids)) for i, id := range ids { var ctx []byte @@ -388,7 +410,7 @@ func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluste } cfg.Logger.Info( "starting local member", - zap.String("local-member-id", id.String()), + zap.String("local-member-id", member.ID.String()), zap.String("cluster-id", cl.ID().String()), ) s := bwal.MemoryStorage() @@ -396,22 +418,13 @@ func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluste lg: cfg.Logger, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, cl: cl, - config: raftConfig(cfg, uint64(bwal.id), s), + config: raftConfig(cfg, uint64(member.ID), s), peers: peers, storage: s, - wal: bwal, } } -func bootstrapRaftFromWal(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bootstrappedRaft { - bwal := bootstrapWALFromSnapshot(cfg.Logger, 
cfg.WALDir(), snapshot, cfg.UnsafeNoFsync) - - cfg.Logger.Info( - "restarting local member", - zap.String("cluster-id", bwal.cid.String()), - zap.String("local-member-id", bwal.id.String()), - zap.Uint64("commit-index", bwal.st.Commit), - ) +func bootstrapRaftFromWal(cfg config.ServerConfig, bwal *bootstrappedWAL) *bootstrappedRaft { cl := membership.NewCluster(cfg.Logger) cl.SetID(bwal.id, bwal.cid) s := bwal.MemoryStorage() @@ -421,36 +434,6 @@ func bootstrapRaftFromWal(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *b cl: cl, config: raftConfig(cfg, uint64(bwal.id), s), storage: s, - wal: bwal, - } -} - -func bootstrapRaftFromWalStandalone(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bootstrappedRaft { - bwal := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync) - - // discard the previously uncommitted entries - bwal.ents = bwal.CommitedEntries() - entries := bwal.ConfigChangeEntries() - // force commit config change entries - bwal.AppendAndCommitEntries(entries) - - cfg.Logger.Info( - "forcing restart member", - zap.String("cluster-id", bwal.cid.String()), - zap.String("local-member-id", bwal.id.String()), - zap.Uint64("commit-index", bwal.st.Commit), - ) - - cl := membership.NewCluster(cfg.Logger) - cl.SetID(bwal.id, bwal.cid) - s := bwal.MemoryStorage() - return &bootstrappedRaft{ - lg: cfg.Logger, - heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, - cl: cl, - config: raftConfig(cfg, uint64(bwal.id), s), - storage: s, - wal: bwal, } } @@ -476,10 +459,9 @@ type bootstrappedRaft struct { config *raft.Config cl *membership.RaftCluster storage *raft.MemoryStorage - wal *bootstrappedWAL } -func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter) *raftNode { +func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter, wal *wal.WAL) *raftNode { var n raft.Node if len(b.peers) == 0 { n = raft.RestartNode(b.config) @@ -496,7 +478,7 @@ func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter) *raftNode { Node: n, heartbeat: b.heartbeat, raftStorage: b.storage, - storage: NewStorage(b.wal.w, ss), + storage: NewStorage(wal, ss), }, ) } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 2453329f8..89d043ece 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -308,8 +308,8 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } }() - sstats := stats.NewServerStats(cfg.Name, b.storage.cluster.raft.wal.id.String()) - lstats := stats.NewLeaderStats(cfg.Logger, b.storage.cluster.raft.wal.id.String()) + sstats := stats.NewServerStats(cfg.Name, b.storage.cluster.wal.id.String()) + lstats := stats.NewLeaderStats(cfg.Logger, b.storage.cluster.wal.id.String()) heartbeat := time.Duration(cfg.TickMs) * time.Millisecond srv = &EtcdServer{ @@ -320,21 +320,21 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { errorc: make(chan error, 1), v2store: b.storage.st, snapshotter: b.ss, - r: *b.storage.cluster.raft.newRaftNode(b.ss), - id: b.storage.cluster.raft.wal.id, + r: *b.storage.cluster.raft.newRaftNode(b.ss, b.storage.cluster.wal.w), + id: b.storage.cluster.wal.id, attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, cluster: b.storage.cluster.raft.cl, stats: sstats, lstats: lstats, SyncTicker: time.NewTicker(500 * time.Millisecond), peerRt: b.prt, - reqIDGen: idutil.NewGenerator(uint16(b.storage.cluster.raft.wal.id), time.Now()), + reqIDGen: idutil.NewGenerator(uint16(b.storage.cluster.wal.id), time.Now()), 
AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist}, consistIndex: b.storage.ci, firstCommitInTerm: notify.NewNotifier(), clusterVersionChanged: notify.NewNotifier(), } - serverID.With(prometheus.Labels{"server_id": b.storage.cluster.raft.wal.id.String()}).Set(1) + serverID.With(prometheus.Labels{"server_id": b.storage.cluster.wal.id.String()}).Set(1) srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged) srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) @@ -403,7 +403,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { Logger: cfg.Logger, TLSInfo: cfg.PeerTLSInfo, DialTimeout: cfg.PeerDialTimeout(), - ID: b.storage.cluster.raft.wal.id, + ID: b.storage.cluster.wal.id, URLs: cfg.PeerURLs, ClusterID: b.storage.cluster.raft.cl.ID(), Raft: srv, @@ -417,12 +417,12 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } // add all remotes into transport for _, m := range b.storage.cluster.remotes { - if m.ID != b.storage.cluster.raft.wal.id { + if m.ID != b.storage.cluster.wal.id { tr.AddRemote(m.ID, m.PeerURLs) } } for _, m := range b.storage.cluster.raft.cl.Members() { - if m.ID != b.storage.cluster.raft.wal.id { + if m.ID != b.storage.cluster.wal.id { tr.AddPeer(m.ID, m.PeerURLs) } } From 6a4ea70aef49db1cd193570048e63b0b694c026f Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 21 Jul 2021 12:19:35 +0200 Subject: [PATCH 04/18] server: Move clusterID and nodeID up the bootstrap hierarchy --- server/etcdserver/bootstrap.go | 86 +++++++++++++++++++--------------- server/etcdserver/server.go | 16 +++---- 2 files changed, 56 insertions(+), 46 deletions(-) diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index aafb37969..cd63b44a9 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -99,9 +99,10 @@ type bootstrappedStorage struct { } type bootstrapedCluster struct { - raft *bootstrappedRaft - remotes []*membership.Member - wal *bootstrappedWAL + raft *bootstrappedRaft + remotes []*membership.Member + wal *bootstrappedWAL + clusterID, nodeID types.ID } func bootstrapStorage(cfg config.ServerConfig, ss *snap.Snapshotter, prt http.RoundTripper) (b *bootstrappedStorage, err error) { @@ -235,13 +236,16 @@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe cl.SetID(types.ID(0), existingCluster.ID()) cl.SetStore(st) cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be)) - bwal := bootstrapNewWAL(cfg, cl.MemberByName(cfg.Name).ID, cl.ID()) + member := cl.MemberByName(cfg.Name) + bwal := bootstrapNewWAL(cfg, member, cl) br := bootstrapRaftFromCluster(cfg, cl, nil, bwal) - cl.SetID(bwal.id, existingCluster.ID()) + cl.SetID(member.ID, existingCluster.ID()) return &bootstrapedCluster{ - raft: br, - remotes: remotes, - wal: bwal, + raft: br, + remotes: remotes, + wal: bwal, + clusterID: cl.ID(), + nodeID: member.ID, }, nil } @@ -277,13 +281,16 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st } cl.SetStore(st) cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be)) - bwal := bootstrapNewWAL(cfg, cl.MemberByName(cfg.Name).ID, cl.ID()) + member := cl.MemberByName(cfg.Name) + bwal := bootstrapNewWAL(cfg, member, cl) br := bootstrapRaftFromCluster(cfg, cl, cl.MemberIDs(), bwal) - cl.SetID(bwal.id, cl.ID()) + cl.SetID(member.ID, cl.ID()) return &bootstrapedCluster{ - remotes: nil, - raft: br, - wal: bwal, + remotes: nil, + raft: br, + wal: bwal, + clusterID: cl.ID(), + nodeID: 
member.ID, }, nil } @@ -360,31 +367,34 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, st v2store.Store, be backe cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!") } - bwal := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync) + bwal, meta := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync) + b := &bootstrapedCluster{ - wal: bwal, + wal: bwal, + clusterID: meta.clusterID, + nodeID: meta.nodeID, } if cfg.ForceNewCluster { // discard the previously uncommitted entries bwal.ents = bwal.CommitedEntries() - entries := bwal.ConfigChangeEntries() + entries := bwal.ConfigChangeEntries(meta) // force commit config change entries bwal.AppendAndCommitEntries(entries) cfg.Logger.Info( "forcing restart member", - zap.String("cluster-id", bwal.cid.String()), - zap.String("local-member-id", bwal.id.String()), + zap.String("cluster-id", meta.clusterID.String()), + zap.String("local-member-id", meta.nodeID.String()), zap.Uint64("commit-index", bwal.st.Commit), ) } else { cfg.Logger.Info( "restarting local member", - zap.String("cluster-id", bwal.cid.String()), - zap.String("local-member-id", bwal.id.String()), + zap.String("cluster-id", meta.clusterID.String()), + zap.String("local-member-id", meta.nodeID.String()), zap.Uint64("commit-index", bwal.st.Commit), ) } - b.raft = bootstrapRaftFromWal(cfg, bwal) + b.raft = bootstrapRaftFromSnapshot(cfg, bwal, meta) b.raft.cl.SetStore(st) b.raft.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be)) @@ -424,15 +434,15 @@ func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluste } } -func bootstrapRaftFromWal(cfg config.ServerConfig, bwal *bootstrappedWAL) *bootstrappedRaft { +func bootstrapRaftFromSnapshot(cfg config.ServerConfig, bwal *bootstrappedWAL, meta *snapshotMetadata) *bootstrappedRaft { cl := membership.NewCluster(cfg.Logger) - cl.SetID(bwal.id, bwal.cid) + cl.SetID(meta.nodeID, meta.clusterID) s := bwal.MemoryStorage() return &bootstrappedRaft{ lg: cfg.Logger, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, cl: cl, - config: raftConfig(cfg, uint64(bwal.id), s), + config: raftConfig(cfg, uint64(meta.nodeID), s), storage: s, } } @@ -486,7 +496,7 @@ func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter, wal *wal.WAL) *raft // bootstrapWALFromSnapshot reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear // after the position of the given snap in the WAL. // The snap must have been previously saved to the WAL, or this call will panic. 
-func bootstrapWALFromSnapshot(lg *zap.Logger, waldir string, snapshot *raftpb.Snapshot, unsafeNoFsync bool) *bootstrappedWAL { +func bootstrapWALFromSnapshot(lg *zap.Logger, waldir string, snapshot *raftpb.Snapshot, unsafeNoFsync bool) (*bootstrappedWAL, *snapshotMetadata) { var walsnap walpb.Snapshot if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term @@ -519,23 +529,26 @@ func bootstrapWALFromSnapshot(lg *zap.Logger, waldir string, snapshot *raftpb.Sn pbutil.MustUnmarshal(&metadata, wmetadata) id := types.ID(metadata.NodeID) cid := types.ID(metadata.ClusterID) + meta := &snapshotMetadata{clusterID: cid, nodeID: id} return &bootstrappedWAL{ lg: lg, w: w, - id: id, - cid: cid, st: &st, ents: ents, snapshot: snapshot, - } + }, meta } } -func bootstrapNewWAL(cfg config.ServerConfig, nodeID, clusterID types.ID) *bootstrappedWAL { +type snapshotMetadata struct { + nodeID, clusterID types.ID +} + +func bootstrapNewWAL(cfg config.ServerConfig, m *membership.Member, cl *membership.RaftCluster) *bootstrappedWAL { metadata := pbutil.MustMarshal( &etcdserverpb.Metadata{ - NodeID: uint64(nodeID), - ClusterID: uint64(clusterID), + NodeID: uint64(m.ID), + ClusterID: uint64(cl.ID()), }, ) w, err := wal.Create(cfg.Logger, cfg.WALDir(), metadata) @@ -546,10 +559,8 @@ func bootstrapNewWAL(cfg config.ServerConfig, nodeID, clusterID types.ID) *boots w.SetUnsafeNoFsync() } return &bootstrappedWAL{ - lg: cfg.Logger, - w: w, - id: nodeID, - cid: clusterID, + lg: cfg.Logger, + w: w, } } @@ -557,7 +568,6 @@ type bootstrappedWAL struct { lg *zap.Logger w *wal.WAL - id, cid types.ID st *raftpb.HardState ents []raftpb.Entry snapshot *raftpb.Snapshot @@ -592,11 +602,11 @@ func (wal *bootstrappedWAL) CommitedEntries() []raftpb.Entry { return wal.ents } -func (wal *bootstrappedWAL) ConfigChangeEntries() []raftpb.Entry { +func (wal *bootstrappedWAL) ConfigChangeEntries(meta *snapshotMetadata) []raftpb.Entry { return serverstorage.CreateConfigChangeEnts( wal.lg, serverstorage.GetIDs(wal.lg, wal.snapshot, wal.ents), - uint64(wal.id), + uint64(meta.nodeID), wal.st.Term, wal.st.Commit, ) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 89d043ece..8923fd835 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -308,8 +308,8 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } }() - sstats := stats.NewServerStats(cfg.Name, b.storage.cluster.wal.id.String()) - lstats := stats.NewLeaderStats(cfg.Logger, b.storage.cluster.wal.id.String()) + sstats := stats.NewServerStats(cfg.Name, b.storage.cluster.nodeID.String()) + lstats := stats.NewLeaderStats(cfg.Logger, b.storage.cluster.nodeID.String()) heartbeat := time.Duration(cfg.TickMs) * time.Millisecond srv = &EtcdServer{ @@ -321,20 +321,20 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { v2store: b.storage.st, snapshotter: b.ss, r: *b.storage.cluster.raft.newRaftNode(b.ss, b.storage.cluster.wal.w), - id: b.storage.cluster.wal.id, + id: b.storage.cluster.nodeID, attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, cluster: b.storage.cluster.raft.cl, stats: sstats, lstats: lstats, SyncTicker: time.NewTicker(500 * time.Millisecond), peerRt: b.prt, - reqIDGen: idutil.NewGenerator(uint16(b.storage.cluster.wal.id), time.Now()), + reqIDGen: idutil.NewGenerator(uint16(b.storage.cluster.nodeID), time.Now()), AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist}, 
consistIndex: b.storage.ci, firstCommitInTerm: notify.NewNotifier(), clusterVersionChanged: notify.NewNotifier(), } - serverID.With(prometheus.Labels{"server_id": b.storage.cluster.wal.id.String()}).Set(1) + serverID.With(prometheus.Labels{"server_id": b.storage.cluster.nodeID.String()}).Set(1) srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged) srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) @@ -403,7 +403,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { Logger: cfg.Logger, TLSInfo: cfg.PeerTLSInfo, DialTimeout: cfg.PeerDialTimeout(), - ID: b.storage.cluster.wal.id, + ID: b.storage.cluster.nodeID, URLs: cfg.PeerURLs, ClusterID: b.storage.cluster.raft.cl.ID(), Raft: srv, @@ -417,12 +417,12 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } // add all remotes into transport for _, m := range b.storage.cluster.remotes { - if m.ID != b.storage.cluster.wal.id { + if m.ID != b.storage.cluster.nodeID { tr.AddRemote(m.ID, m.PeerURLs) } } for _, m := range b.storage.cluster.raft.cl.Members() { - if m.ID != b.storage.cluster.wal.id { + if m.ID != b.storage.cluster.nodeID { tr.AddPeer(m.ID, m.PeerURLs) } } From 648bac833feb1011cd571685b19bf1fd3da3b6d9 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 21 Jul 2021 12:23:23 +0200 Subject: [PATCH 05/18] server: Move bootstrappedRaft up in file --- server/etcdserver/bootstrap.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index cd63b44a9..26f478583 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -105,6 +105,16 @@ type bootstrapedCluster struct { clusterID, nodeID types.ID } +type bootstrappedRaft struct { + lg *zap.Logger + heartbeat time.Duration + + peers []raft.Peer + config *raft.Config + cl *membership.RaftCluster + storage *raft.MemoryStorage +} + func bootstrapStorage(cfg config.ServerConfig, ss *snap.Snapshotter, prt http.RoundTripper) (b *bootstrappedStorage, err error) { st := v2store.New(StoreClusterPrefix, StoreKeysPrefix) haveWAL := wal.Exist(cfg.WALDir()) @@ -461,16 +471,6 @@ func raftConfig(cfg config.ServerConfig, id uint64, s *raft.MemoryStorage) *raft } } -type bootstrappedRaft struct { - lg *zap.Logger - heartbeat time.Duration - - peers []raft.Peer - config *raft.Config - cl *membership.RaftCluster - storage *raft.MemoryStorage -} - func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter, wal *wal.WAL) *raftNode { var n raft.Node if len(b.peers) == 0 { From c97ab8f5e01b8ccb228b85bc23b357ed23a0984e Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 21 Jul 2021 12:53:42 +0200 Subject: [PATCH 06/18] server: Move cluster up the bootstrap hierarchy --- server/etcdserver/bootstrap.go | 71 ++++++++++++++++++---------------- server/etcdserver/server.go | 26 ++++++------- 2 files changed, 51 insertions(+), 46 deletions(-) diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index 26f478583..4b1859487 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -77,25 +77,33 @@ func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) { if err != nil { return nil, err } + + cluster, err := bootstrapCluster(cfg, storage, prt, ss) + if err != nil { + storage.be.Close() + return nil, err + } return &bootstrappedServer{ prt: prt, ss: ss, storage: storage, + cluster: cluster, }, nil } type bootstrappedServer struct { storage 
*bootstrappedStorage + cluster *bootstrapedCluster prt http.RoundTripper ss *snap.Snapshotter } type bootstrappedStorage struct { - cluster *bootstrapedCluster beHooks *serverstorage.BackendHooks st v2store.Store be backend.Backend ci cindex.ConsistentIndexer + beExist bool } type bootstrapedCluster struct { @@ -117,38 +125,17 @@ type bootstrappedRaft struct { func bootstrapStorage(cfg config.ServerConfig, ss *snap.Snapshotter, prt http.RoundTripper) (b *bootstrappedStorage, err error) { st := v2store.New(StoreClusterPrefix, StoreKeysPrefix) - haveWAL := wal.Exist(cfg.WALDir()) be, ci, beExist, beHooks, err := bootstrapBackend(cfg) if err != nil { return nil, err } - defer func() { - if err != nil { - be.Close() - } - }() - var c *bootstrapedCluster - switch { - case !haveWAL && !cfg.NewCluster: - c, err = bootstrapExistingClusterNoWAL(cfg, prt, st, be) - case !haveWAL && cfg.NewCluster: - c, err = bootstrapNewClusterNoWAL(cfg, prt, st, be) - case haveWAL: - c, err = bootstrapClusterWithWAL(cfg, st, be, ss, beExist, beHooks, ci) - default: - be.Close() - return nil, fmt.Errorf("unsupported bootstrap config") - } - if err != nil { - return nil, err - } return &bootstrappedStorage{ - cluster: c, beHooks: beHooks, st: st, be: be, ci: ci, + beExist: beExist, }, nil } @@ -223,6 +210,24 @@ func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error { return be.Defrag() } +func bootstrapCluster(cfg config.ServerConfig, storage *bootstrappedStorage, prt http.RoundTripper, ss *snap.Snapshotter) (c *bootstrapedCluster, err error) { + haveWAL := wal.Exist(cfg.WALDir()) + switch { + case !haveWAL && !cfg.NewCluster: + c, err = bootstrapExistingClusterNoWAL(cfg, prt, storage.st, storage.be) + case !haveWAL && cfg.NewCluster: + c, err = bootstrapNewClusterNoWAL(cfg, prt, storage.st, storage.be) + case haveWAL: + c, err = bootstrapClusterWithWAL(cfg, storage, ss) + default: + return nil, fmt.Errorf("unsupported bootstrap config") + } + if err != nil { + return nil, err + } + return c, nil +} + func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*bootstrapedCluster, error) { if err := cfg.VerifyJoinExisting(); err != nil { return nil, err @@ -304,7 +309,7 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st }, nil } -func bootstrapClusterWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backend, ss *snap.Snapshotter, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer) (*bootstrapedCluster, error) { +func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStorage, ss *snap.Snapshotter) (*bootstrapedCluster, error) { if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil { return nil, fmt.Errorf("cannot write to member directory: %v", err) } @@ -333,11 +338,11 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, st v2store.Store, be backe } if snapshot != nil { - if err = st.Recovery(snapshot.Data); err != nil { + if err = storage.st.Recovery(snapshot.Data); err != nil { cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err)) } - if err = serverstorage.AssertNoV2StoreContent(cfg.Logger, st, cfg.V2Deprecation); err != nil { + if err = serverstorage.AssertNoV2StoreContent(cfg.Logger, storage.st, cfg.V2Deprecation); err != nil { cfg.Logger.Error("illegal v2store content", zap.Error(err)) return nil, err } @@ -348,10 +353,10 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, st v2store.Store, be backe 
zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))), ) - if be, err = serverstorage.RecoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil { + if storage.be, err = serverstorage.RecoverSnapshotBackend(cfg, storage.be, *snapshot, storage.beExist, storage.beHooks); err != nil { cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err)) } - s1, s2 := be.Size(), be.SizeInUse() + s1, s2 := storage.be.Size(), storage.be.SizeInUse() cfg.Logger.Info( "recovered v3 backend from snapshot", zap.Int64("backend-size-bytes", s1), @@ -359,10 +364,10 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, st v2store.Store, be backe zap.Int64("backend-size-in-use-bytes", s2), zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))), ) - if beExist { + if storage.beExist { // TODO: remove kvindex != 0 checking when we do not expect users to upgrade // etcd from pre-3.0 release. - kvindex := ci.ConsistentIndex() + kvindex := storage.ci.ConsistentIndex() if kvindex < snapshot.Metadata.Index { if kvindex != 0 { return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", cfg.BackendPath(), kvindex, snapshot.Metadata.Index) @@ -406,10 +411,10 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, st v2store.Store, be backe } b.raft = bootstrapRaftFromSnapshot(cfg, bwal, meta) - b.raft.cl.SetStore(st) - b.raft.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be)) + b.raft.cl.SetStore(storage.st) + b.raft.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, storage.be)) b.raft.cl.Recover(api.UpdateCapability) - if b.raft.cl.Version() != nil && !b.raft.cl.Version().LessThan(semver.Version{Major: 3}) && !beExist { + if b.raft.cl.Version() != nil && !b.raft.cl.Version().LessThan(semver.Version{Major: 3}) && !storage.beExist { bepath := cfg.BackendPath() os.RemoveAll(bepath) return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 8923fd835..d99491c05 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -308,8 +308,8 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } }() - sstats := stats.NewServerStats(cfg.Name, b.storage.cluster.nodeID.String()) - lstats := stats.NewLeaderStats(cfg.Logger, b.storage.cluster.nodeID.String()) + sstats := stats.NewServerStats(cfg.Name, b.cluster.nodeID.String()) + lstats := stats.NewLeaderStats(cfg.Logger, b.cluster.nodeID.String()) heartbeat := time.Duration(cfg.TickMs) * time.Millisecond srv = &EtcdServer{ @@ -320,21 +320,21 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { errorc: make(chan error, 1), v2store: b.storage.st, snapshotter: b.ss, - r: *b.storage.cluster.raft.newRaftNode(b.ss, b.storage.cluster.wal.w), - id: b.storage.cluster.nodeID, + r: *b.cluster.raft.newRaftNode(b.ss, b.cluster.wal.w), + id: b.cluster.nodeID, attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, - cluster: b.storage.cluster.raft.cl, + cluster: b.cluster.raft.cl, stats: sstats, lstats: lstats, SyncTicker: time.NewTicker(500 * time.Millisecond), peerRt: b.prt, - reqIDGen: idutil.NewGenerator(uint16(b.storage.cluster.nodeID), time.Now()), + reqIDGen: idutil.NewGenerator(uint16(b.cluster.nodeID), time.Now()), AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist}, consistIndex: b.storage.ci, firstCommitInTerm: notify.NewNotifier(), clusterVersionChanged: 
notify.NewNotifier(), } - serverID.With(prometheus.Labels{"server_id": b.storage.cluster.nodeID.String()}).Set(1) + serverID.With(prometheus.Labels{"server_id": b.cluster.nodeID.String()}).Set(1) srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged) srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) @@ -403,9 +403,9 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { Logger: cfg.Logger, TLSInfo: cfg.PeerTLSInfo, DialTimeout: cfg.PeerDialTimeout(), - ID: b.storage.cluster.nodeID, + ID: b.cluster.nodeID, URLs: cfg.PeerURLs, - ClusterID: b.storage.cluster.raft.cl.ID(), + ClusterID: b.cluster.raft.cl.ID(), Raft: srv, Snapshotter: b.ss, ServerStats: sstats, @@ -416,13 +416,13 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { return nil, err } // add all remotes into transport - for _, m := range b.storage.cluster.remotes { - if m.ID != b.storage.cluster.nodeID { + for _, m := range b.cluster.remotes { + if m.ID != b.cluster.nodeID { tr.AddRemote(m.ID, m.PeerURLs) } } - for _, m := range b.storage.cluster.raft.cl.Members() { - if m.ID != b.storage.cluster.nodeID { + for _, m := range b.cluster.raft.cl.Members() { + if m.ID != b.cluster.nodeID { tr.AddPeer(m.ID, m.PeerURLs) } } From 0211f5a2e80e585761829c93e5012750c9dc0689 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 6 Aug 2021 17:53:23 +0200 Subject: [PATCH 07/18] server: Move snapshot recovery to separate function --- server/etcdserver/bootstrap.go | 116 +++++++++++++++++---------------- 1 file changed, 61 insertions(+), 55 deletions(-) diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index 4b1859487..d37af2190 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -324,64 +324,10 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStora zap.String("bwal-dir", cfg.WALDir()), ) } - - // Find a snapshot to start/restart a raft node - walSnaps, err := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir()) + snapshot, err := recoverSnapshot(cfg, storage, ss) if err != nil { return nil, err } - // snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding - // bwal log entries - snapshot, err := ss.LoadNewestAvailable(walSnaps) - if err != nil && err != snap.ErrNoSnapshot { - return nil, err - } - - if snapshot != nil { - if err = storage.st.Recovery(snapshot.Data); err != nil { - cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err)) - } - - if err = serverstorage.AssertNoV2StoreContent(cfg.Logger, storage.st, cfg.V2Deprecation); err != nil { - cfg.Logger.Error("illegal v2store content", zap.Error(err)) - return nil, err - } - - cfg.Logger.Info( - "recovered v2 store from snapshot", - zap.Uint64("snapshot-index", snapshot.Metadata.Index), - zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))), - ) - - if storage.be, err = serverstorage.RecoverSnapshotBackend(cfg, storage.be, *snapshot, storage.beExist, storage.beHooks); err != nil { - cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err)) - } - s1, s2 := storage.be.Size(), storage.be.SizeInUse() - cfg.Logger.Info( - "recovered v3 backend from snapshot", - zap.Int64("backend-size-bytes", s1), - zap.String("backend-size", humanize.Bytes(uint64(s1))), - zap.Int64("backend-size-in-use-bytes", s2), - zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))), - ) - if storage.beExist { - // TODO: remove kvindex != 0 checking when we do not 
expect users to upgrade - // etcd from pre-3.0 release. - kvindex := storage.ci.ConsistentIndex() - if kvindex < snapshot.Metadata.Index { - if kvindex != 0 { - return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", cfg.BackendPath(), kvindex, snapshot.Metadata.Index) - } - cfg.Logger.Warn( - "consistent index was never saved", - zap.Uint64("snapshot-index", snapshot.Metadata.Index), - ) - } - } - } else { - cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!") - } - bwal, meta := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync) b := &bootstrapedCluster{ @@ -422,6 +368,66 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStora return b, nil } +func recoverSnapshot(cfg config.ServerConfig, storage *bootstrappedStorage, ss *snap.Snapshotter) (*raftpb.Snapshot, error) { + // Find a snapshot to start/restart a raft node + walSnaps, err := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir()) + if err != nil { + return nil, err + } + // snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding + // bwal log entries + snapshot, err = ss.LoadNewestAvailable(walSnaps) + if err != nil && err != snap.ErrNoSnapshot { + return nil, err + } + + if snapshot != nil { + if err = storage.st.Recovery(snapshot.Data); err != nil { + cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err)) + } + + if err = serverstorage.AssertNoV2StoreContent(cfg.Logger, storage.st, cfg.V2Deprecation); err != nil { + cfg.Logger.Error("illegal v2store content", zap.Error(err)) + return nil, err + } + + cfg.Logger.Info( + "recovered v2 store from snapshot", + zap.Uint64("snapshot-index", snapshot.Metadata.Index), + zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))), + ) + + if storage.be, err = serverstorage.RecoverSnapshotBackend(cfg, storage.be, *snapshot, storage.beExist, storage.beHooks); err != nil { + cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err)) + } + s1, s2 := be.Size(), be.SizeInUse() + cfg.Logger.Info( + "recovered v3 backend from snapshot", + zap.Int64("backend-size-bytes", s1), + zap.String("backend-size", humanize.Bytes(uint64(s1))), + zap.Int64("backend-size-in-use-bytes", s2), + zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))), + ) + if storage.beExist { + // TODO: remove kvindex != 0 checking when we do not expect users to upgrade + // etcd from pre-3.0 release. + kvindex := storage.ci.ConsistentIndex() + if kvindex < snapshot.Metadata.Index { + if kvindex != 0 { + return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", cfg.BackendPath(), kvindex, snapshot.Metadata.Index) + } + cfg.Logger.Warn( + "consistent index was never saved", + zap.Uint64("snapshot-index", snapshot.Metadata.Index), + ) + } + } + } else { + cfg.Logger.Info("No snapshot found. 
Recovering WAL from scratch!") + } + return snapshot, nil +} + func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID, bwal *bootstrappedWAL) *bootstrappedRaft { member := cl.MemberByName(cfg.Name) peers := make([]raft.Peer, len(ids)) From 7c8f7166e738eac7e5f0a1003cc910f66ffe198a Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 21 Jul 2021 13:14:57 +0200 Subject: [PATCH 08/18] server: Move bootstraping backend from snapshot to bootstrapBackend --- server/etcdserver/bootstrap.go | 106 +++++++++++++++++++-------------- server/etcdserver/server.go | 8 +-- 2 files changed, 64 insertions(+), 50 deletions(-) diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index d37af2190..6e6563aee 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -73,14 +73,15 @@ func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) { return nil, fmt.Errorf("cannot access member directory: %v", terr) } - storage, err := bootstrapStorage(cfg, ss, prt) + haveWAL := wal.Exist(cfg.WALDir()) + storage, err := bootstrapStorage(cfg, haveWAL, ss, prt) if err != nil { return nil, err } - cluster, err := bootstrapCluster(cfg, storage, prt, ss) + cluster, err := bootstrapCluster(cfg, haveWAL, storage, prt, ss) if err != nil { - storage.be.Close() + storage.backend.be.Close() return nil, err } return &bootstrappedServer{ @@ -99,11 +100,16 @@ type bootstrappedServer struct { } type bootstrappedStorage struct { - beHooks *serverstorage.BackendHooks + backend *bootstrappedBackend st v2store.Store - be backend.Backend - ci cindex.ConsistentIndexer - beExist bool +} + +type bootstrappedBackend struct { + beHooks *serverstorage.BackendHooks + be backend.Backend + ci cindex.ConsistentIndexer + beExist bool + snapshot *raftpb.Snapshot } type bootstrapedCluster struct { @@ -123,19 +129,17 @@ type bootstrappedRaft struct { storage *raft.MemoryStorage } -func bootstrapStorage(cfg config.ServerConfig, ss *snap.Snapshotter, prt http.RoundTripper) (b *bootstrappedStorage, err error) { +func bootstrapStorage(cfg config.ServerConfig, haveWAL bool, ss *snap.Snapshotter, prt http.RoundTripper) (b *bootstrappedStorage, err error) { st := v2store.New(StoreClusterPrefix, StoreKeysPrefix) - be, ci, beExist, beHooks, err := bootstrapBackend(cfg) + backend, err := bootstrapBackend(cfg, haveWAL, st, ss) if err != nil { return nil, err } + return &bootstrappedStorage{ - beHooks: beHooks, + backend: backend, st: st, - be: be, - ci: ci, - beExist: beExist, }, nil } @@ -160,11 +164,11 @@ func bootstrapSnapshot(cfg config.ServerConfig) *snap.Snapshotter { return snap.New(cfg.Logger, cfg.SnapDir()) } -func bootstrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.ConsistentIndexer, beExist bool, beHooks *serverstorage.BackendHooks, err error) { - beExist = fileutil.Exist(cfg.BackendPath()) - ci = cindex.NewConsistentIndex(nil) - beHooks = serverstorage.NewBackendHooks(cfg.Logger, ci) - be = serverstorage.OpenBackend(cfg, beHooks) +func bootstrapBackend(cfg config.ServerConfig, haveWAL bool, st v2store.Store, ss *snap.Snapshotter) (backend *bootstrappedBackend, err error) { + beExist := fileutil.Exist(cfg.BackendPath()) + ci := cindex.NewConsistentIndex(nil) + beHooks := serverstorage.NewBackendHooks(cfg.Logger, ci) + be := serverstorage.OpenBackend(cfg, beHooks) defer func() { if err != nil && be != nil { be.Close() @@ -175,20 +179,35 @@ func bootstrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.Co if 
cfg.ExperimentalBootstrapDefragThresholdMegabytes != 0 { err = maybeDefragBackend(cfg, be) if err != nil { - return nil, nil, false, nil, err + return nil, err } } cfg.Logger.Debug("restore consistentIndex", zap.Uint64("index", ci.ConsistentIndex())) // TODO(serathius): Implement schema setup in fresh storage + var ( + snapshot *raftpb.Snapshot + ) + if haveWAL { + snapshot, be, err = recoverSnapshot(cfg, st, be, beExist, beHooks, ci, ss) + if err != nil { + return nil, err + } + } if beExist { err = schema.Validate(cfg.Logger, be.BatchTx()) if err != nil { cfg.Logger.Error("Failed to validate schema", zap.Error(err)) - return nil, nil, false, nil, err + return nil, err } } - return be, ci, beExist, beHooks, nil + return &bootstrappedBackend{ + beHooks: beHooks, + be: be, + ci: ci, + beExist: beExist, + snapshot: snapshot, + }, nil } func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error { @@ -210,15 +229,14 @@ func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error { return be.Defrag() } -func bootstrapCluster(cfg config.ServerConfig, storage *bootstrappedStorage, prt http.RoundTripper, ss *snap.Snapshotter) (c *bootstrapedCluster, err error) { - haveWAL := wal.Exist(cfg.WALDir()) +func bootstrapCluster(cfg config.ServerConfig, haveWAL bool, storage *bootstrappedStorage, prt http.RoundTripper, ss *snap.Snapshotter) (c *bootstrapedCluster, err error) { switch { case !haveWAL && !cfg.NewCluster: - c, err = bootstrapExistingClusterNoWAL(cfg, prt, storage.st, storage.be) + c, err = bootstrapExistingClusterNoWAL(cfg, prt, storage.st, storage.backend.be) case !haveWAL && cfg.NewCluster: - c, err = bootstrapNewClusterNoWAL(cfg, prt, storage.st, storage.be) + c, err = bootstrapNewClusterNoWAL(cfg, prt, storage.st, storage.backend.be) case haveWAL: - c, err = bootstrapClusterWithWAL(cfg, storage, ss) + c, err = bootstrapClusterWithWAL(cfg, storage) default: return nil, fmt.Errorf("unsupported bootstrap config") } @@ -309,7 +327,7 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st }, nil } -func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStorage, ss *snap.Snapshotter) (*bootstrapedCluster, error) { +func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStorage) (*bootstrapedCluster, error) { if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil { return nil, fmt.Errorf("cannot write to member directory: %v", err) } @@ -324,11 +342,7 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStora zap.String("bwal-dir", cfg.WALDir()), ) } - snapshot, err := recoverSnapshot(cfg, storage, ss) - if err != nil { - return nil, err - } - bwal, meta := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync) + bwal, meta := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), storage.backend.snapshot, cfg.UnsafeNoFsync) b := &bootstrapedCluster{ wal: bwal, @@ -358,9 +372,9 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStora b.raft = bootstrapRaftFromSnapshot(cfg, bwal, meta) b.raft.cl.SetStore(storage.st) - b.raft.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, storage.be)) + b.raft.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, storage.backend.be)) b.raft.cl.Recover(api.UpdateCapability) - if b.raft.cl.Version() != nil && !b.raft.cl.Version().LessThan(semver.Version{Major: 3}) && !storage.beExist { + if b.raft.cl.Version() != nil && !b.raft.cl.Version().LessThan(semver.Version{Major: 3}) && 
!storage.backend.beExist { bepath := cfg.BackendPath() os.RemoveAll(bepath) return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath) @@ -368,27 +382,27 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStora return b, nil } -func recoverSnapshot(cfg config.ServerConfig, storage *bootstrappedStorage, ss *snap.Snapshotter) (*raftpb.Snapshot, error) { +func recoverSnapshot(cfg config.ServerConfig, st v2store.Store, be backend.Backend, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer, ss *snap.Snapshotter) (*raftpb.Snapshot, backend.Backend, error) { // Find a snapshot to start/restart a raft node walSnaps, err := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir()) if err != nil { - return nil, err + return nil, be, err } // snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding // bwal log entries - snapshot, err = ss.LoadNewestAvailable(walSnaps) + snapshot, err := ss.LoadNewestAvailable(walSnaps) if err != nil && err != snap.ErrNoSnapshot { - return nil, err + return nil, be, err } if snapshot != nil { - if err = storage.st.Recovery(snapshot.Data); err != nil { + if err = st.Recovery(snapshot.Data); err != nil { cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err)) } - if err = serverstorage.AssertNoV2StoreContent(cfg.Logger, storage.st, cfg.V2Deprecation); err != nil { + if err = serverstorage.AssertNoV2StoreContent(cfg.Logger, st, cfg.V2Deprecation); err != nil { cfg.Logger.Error("illegal v2store content", zap.Error(err)) - return nil, err + return nil, be, err } cfg.Logger.Info( @@ -397,7 +411,7 @@ func recoverSnapshot(cfg config.ServerConfig, storage *bootstrappedStorage, ss * zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))), ) - if storage.be, err = serverstorage.RecoverSnapshotBackend(cfg, storage.be, *snapshot, storage.beExist, storage.beHooks); err != nil { + if be, err = serverstorage.RecoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil { cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err)) } s1, s2 := be.Size(), be.SizeInUse() @@ -408,13 +422,13 @@ func recoverSnapshot(cfg config.ServerConfig, storage *bootstrappedStorage, ss * zap.Int64("backend-size-in-use-bytes", s2), zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))), ) - if storage.beExist { + if beExist { // TODO: remove kvindex != 0 checking when we do not expect users to upgrade // etcd from pre-3.0 release. - kvindex := storage.ci.ConsistentIndex() + kvindex := ci.ConsistentIndex() if kvindex < snapshot.Metadata.Index { if kvindex != 0 { - return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", cfg.BackendPath(), kvindex, snapshot.Metadata.Index) + return nil, be, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", cfg.BackendPath(), kvindex, snapshot.Metadata.Index) } cfg.Logger.Warn( "consistent index was never saved", @@ -425,7 +439,7 @@ func recoverSnapshot(cfg config.ServerConfig, storage *bootstrappedStorage, ss * } else { cfg.Logger.Info("No snapshot found. 
Recovering WAL from scratch!") } - return snapshot, nil + return snapshot, be, nil } func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID, bwal *bootstrappedWAL) *bootstrappedRaft { diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index d99491c05..c13af0cc7 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -304,7 +304,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { defer func() { if err != nil { - b.storage.be.Close() + b.storage.backend.be.Close() } }() @@ -330,7 +330,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { peerRt: b.prt, reqIDGen: idutil.NewGenerator(uint16(b.cluster.nodeID), time.Now()), AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist}, - consistIndex: b.storage.ci, + consistIndex: b.storage.backend.ci, firstCommitInTerm: notify.NewNotifier(), clusterVersionChanged: notify.NewNotifier(), } @@ -338,8 +338,8 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged) srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) - srv.be = b.storage.be - srv.beHooks = b.storage.beHooks + srv.be = b.storage.backend.be + srv.beHooks = b.storage.backend.beHooks minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases. From 8b0d8ea2afd737a40737b69699d38d9d98acb045 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 21 Jul 2021 13:39:29 +0200 Subject: [PATCH 09/18] server: Move cluster up the bootstrap hierarchy --- server/etcdserver/bootstrap.go | 63 ++++++++++++++++------------------ server/etcdserver/server.go | 10 +++--- 2 files changed, 35 insertions(+), 38 deletions(-) diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index 6e6563aee..fe7c64ea4 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -113,10 +113,11 @@ type bootstrappedBackend struct { } type bootstrapedCluster struct { - raft *bootstrappedRaft - remotes []*membership.Member - wal *bootstrappedWAL - clusterID, nodeID types.ID + raft *bootstrappedRaft + remotes []*membership.Member + wal *bootstrappedWAL + cl *membership.RaftCluster + nodeID types.ID } type bootstrappedRaft struct { @@ -125,7 +126,6 @@ type bootstrappedRaft struct { peers []raft.Peer config *raft.Config - cl *membership.RaftCluster storage *raft.MemoryStorage } @@ -274,11 +274,11 @@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe br := bootstrapRaftFromCluster(cfg, cl, nil, bwal) cl.SetID(member.ID, existingCluster.ID()) return &bootstrapedCluster{ - raft: br, - remotes: remotes, - wal: bwal, - clusterID: cl.ID(), - nodeID: member.ID, + raft: br, + remotes: remotes, + wal: bwal, + cl: cl, + nodeID: member.ID, }, nil } @@ -319,11 +319,11 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st br := bootstrapRaftFromCluster(cfg, cl, cl.MemberIDs(), bwal) cl.SetID(member.ID, cl.ID()) return &bootstrapedCluster{ - remotes: nil, - raft: br, - wal: bwal, - clusterID: cl.ID(), - nodeID: member.ID, + remotes: nil, + raft: br, + wal: bwal, + cl: cl, + nodeID: member.ID, }, nil } @@ -343,12 +343,6 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStora ) } bwal, meta := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), 
storage.backend.snapshot, cfg.UnsafeNoFsync) - - b := &bootstrapedCluster{ - wal: bwal, - clusterID: meta.clusterID, - nodeID: meta.nodeID, - } if cfg.ForceNewCluster { // discard the previously uncommitted entries bwal.ents = bwal.CommitedEntries() @@ -369,17 +363,24 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStora zap.Uint64("commit-index", bwal.st.Commit), ) } - b.raft = bootstrapRaftFromSnapshot(cfg, bwal, meta) + cl := membership.NewCluster(cfg.Logger) + cl.SetID(meta.nodeID, meta.clusterID) + raft := bootstrapRaftFromSnapshot(cfg, bwal, meta) - b.raft.cl.SetStore(storage.st) - b.raft.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, storage.backend.be)) - b.raft.cl.Recover(api.UpdateCapability) - if b.raft.cl.Version() != nil && !b.raft.cl.Version().LessThan(semver.Version{Major: 3}) && !storage.backend.beExist { + cl.SetStore(storage.st) + cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, storage.backend.be)) + cl.Recover(api.UpdateCapability) + if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !storage.backend.beExist { bepath := cfg.BackendPath() os.RemoveAll(bepath) return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath) } - return b, nil + return &bootstrapedCluster{ + raft: raft, + wal: bwal, + cl: cl, + nodeID: meta.nodeID, + }, nil } func recoverSnapshot(cfg config.ServerConfig, st v2store.Store, be backend.Backend, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer, ss *snap.Snapshotter) (*raftpb.Snapshot, backend.Backend, error) { @@ -462,7 +463,6 @@ func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluste return &bootstrappedRaft{ lg: cfg.Logger, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, - cl: cl, config: raftConfig(cfg, uint64(member.ID), s), peers: peers, storage: s, @@ -470,13 +470,10 @@ func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluste } func bootstrapRaftFromSnapshot(cfg config.ServerConfig, bwal *bootstrappedWAL, meta *snapshotMetadata) *bootstrappedRaft { - cl := membership.NewCluster(cfg.Logger) - cl.SetID(meta.nodeID, meta.clusterID) s := bwal.MemoryStorage() return &bootstrappedRaft{ lg: cfg.Logger, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, - cl: cl, config: raftConfig(cfg, uint64(meta.nodeID), s), storage: s, } @@ -496,7 +493,7 @@ func raftConfig(cfg config.ServerConfig, id uint64, s *raft.MemoryStorage) *raft } } -func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter, wal *wal.WAL) *raftNode { +func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter, wal *wal.WAL, cl *membership.RaftCluster) *raftNode { var n raft.Node if len(b.peers) == 0 { n = raft.RestartNode(b.config) @@ -509,7 +506,7 @@ func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter, wal *wal.WAL) *raft return newRaftNode( raftNodeConfig{ lg: b.lg, - isIDRemoved: func(id uint64) bool { return b.cl.IsIDRemoved(types.ID(id)) }, + isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) }, Node: n, heartbeat: b.heartbeat, raftStorage: b.storage, diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index c13af0cc7..421f7d059 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -308,7 +308,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } }() - sstats := stats.NewServerStats(cfg.Name, b.cluster.nodeID.String()) + sstats := stats.NewServerStats(cfg.Name, b.cluster.cl.String()) lstats := 
stats.NewLeaderStats(cfg.Logger, b.cluster.nodeID.String()) heartbeat := time.Duration(cfg.TickMs) * time.Millisecond @@ -320,10 +320,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { errorc: make(chan error, 1), v2store: b.storage.st, snapshotter: b.ss, - r: *b.cluster.raft.newRaftNode(b.ss, b.cluster.wal.w), + r: *b.cluster.raft.newRaftNode(b.ss, b.cluster.wal.w, b.cluster.cl), id: b.cluster.nodeID, attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, - cluster: b.cluster.raft.cl, + cluster: b.cluster.cl, stats: sstats, lstats: lstats, SyncTicker: time.NewTicker(500 * time.Millisecond), @@ -405,7 +405,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { DialTimeout: cfg.PeerDialTimeout(), ID: b.cluster.nodeID, URLs: cfg.PeerURLs, - ClusterID: b.cluster.raft.cl.ID(), + ClusterID: b.cluster.cl.ID(), Raft: srv, Snapshotter: b.ss, ServerStats: sstats, @@ -421,7 +421,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { tr.AddRemote(m.ID, m.PeerURLs) } } - for _, m := range b.cluster.raft.cl.Members() { + for _, m := range b.cluster.cl.Members() { if m.ID != b.cluster.nodeID { tr.AddPeer(m.ID, m.PeerURLs) } From 5d044563a8cca851d773d58e8905af6ab4cff340 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 21 Jul 2021 13:55:22 +0200 Subject: [PATCH 10/18] server: Move raft and wal up the bootstrap hierarchy --- server/etcdserver/bootstrap.go | 97 +++++++++++++++++----------------- 1 file changed, 49 insertions(+), 48 deletions(-) diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index fe7c64ea4..d923a595b 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -233,17 +233,57 @@ func bootstrapCluster(cfg config.ServerConfig, haveWAL bool, storage *bootstrapp switch { case !haveWAL && !cfg.NewCluster: c, err = bootstrapExistingClusterNoWAL(cfg, prt, storage.st, storage.backend.be) + if err != nil { + return nil, err + } + c.wal = bootstrapNewWAL(cfg, c) + c.raft = bootstrapRaftFromCluster(cfg, c.cl, nil, c.wal) + c.cl.SetID(c.nodeID, c.cl.ID()) + return c, nil case !haveWAL && cfg.NewCluster: c, err = bootstrapNewClusterNoWAL(cfg, prt, storage.st, storage.backend.be) + if err != nil { + return nil, err + } + c.wal = bootstrapNewWAL(cfg, c) + c.raft = bootstrapRaftFromCluster(cfg, c.cl, c.cl.MemberIDs(), c.wal) + c.cl.SetID(c.nodeID, c.cl.ID()) + return c, nil case haveWAL: - c, err = bootstrapClusterWithWAL(cfg, storage) + bwal, meta := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), storage.backend.snapshot, cfg.UnsafeNoFsync) + if cfg.ForceNewCluster { + // discard the previously uncommitted entries + bwal.ents = bwal.CommitedEntries() + entries := bwal.ConfigChangeEntries(meta) + // force commit config change entries + bwal.AppendAndCommitEntries(entries) + cfg.Logger.Info( + "forcing restart member", + zap.String("cluster-id", meta.clusterID.String()), + zap.String("local-member-id", meta.nodeID.String()), + zap.Uint64("commit-index", bwal.st.Commit), + ) + } else { + cfg.Logger.Info( + "restarting local member", + zap.String("cluster-id", meta.clusterID.String()), + zap.String("local-member-id", meta.nodeID.String()), + zap.Uint64("commit-index", bwal.st.Commit), + ) + } + c, err = bootstrapClusterWithWAL(cfg, storage, meta) + if err != nil { + return nil, err + } + if err := fileutil.IsDirWriteable(cfg.WALDir()); err != nil { + return nil, fmt.Errorf("cannot write to WAL directory: %v", err) + } + c.raft = 
bootstrapRaftFromSnapshot(cfg, bwal, meta) + c.wal = bwal + return c, nil default: return nil, fmt.Errorf("unsupported bootstrap config") } - if err != nil { - return nil, err - } - return c, nil } func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*bootstrapedCluster, error) { @@ -270,13 +310,8 @@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe cl.SetStore(st) cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be)) member := cl.MemberByName(cfg.Name) - bwal := bootstrapNewWAL(cfg, member, cl) - br := bootstrapRaftFromCluster(cfg, cl, nil, bwal) - cl.SetID(member.ID, existingCluster.ID()) return &bootstrapedCluster{ - raft: br, remotes: remotes, - wal: bwal, cl: cl, nodeID: member.ID, }, nil @@ -315,58 +350,26 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st cl.SetStore(st) cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be)) member := cl.MemberByName(cfg.Name) - bwal := bootstrapNewWAL(cfg, member, cl) - br := bootstrapRaftFromCluster(cfg, cl, cl.MemberIDs(), bwal) - cl.SetID(member.ID, cl.ID()) return &bootstrapedCluster{ remotes: nil, - raft: br, - wal: bwal, cl: cl, nodeID: member.ID, }, nil } -func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStorage) (*bootstrapedCluster, error) { +func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStorage, meta *snapshotMetadata) (*bootstrapedCluster, error) { if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil { return nil, fmt.Errorf("cannot write to member directory: %v", err) } - if err := fileutil.IsDirWriteable(cfg.WALDir()); err != nil { - return nil, fmt.Errorf("cannot write to WAL directory: %v", err) - } - if cfg.ShouldDiscover() { cfg.Logger.Warn( "discovery token is ignored since cluster already initialized; valid logs are found", zap.String("bwal-dir", cfg.WALDir()), ) } - bwal, meta := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), storage.backend.snapshot, cfg.UnsafeNoFsync) - if cfg.ForceNewCluster { - // discard the previously uncommitted entries - bwal.ents = bwal.CommitedEntries() - entries := bwal.ConfigChangeEntries(meta) - // force commit config change entries - bwal.AppendAndCommitEntries(entries) - cfg.Logger.Info( - "forcing restart member", - zap.String("cluster-id", meta.clusterID.String()), - zap.String("local-member-id", meta.nodeID.String()), - zap.Uint64("commit-index", bwal.st.Commit), - ) - } else { - cfg.Logger.Info( - "restarting local member", - zap.String("cluster-id", meta.clusterID.String()), - zap.String("local-member-id", meta.nodeID.String()), - zap.Uint64("commit-index", bwal.st.Commit), - ) - } cl := membership.NewCluster(cfg.Logger) cl.SetID(meta.nodeID, meta.clusterID) - raft := bootstrapRaftFromSnapshot(cfg, bwal, meta) - cl.SetStore(storage.st) cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, storage.backend.be)) cl.Recover(api.UpdateCapability) @@ -376,8 +379,6 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStora return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath) } return &bootstrapedCluster{ - raft: raft, - wal: bwal, cl: cl, nodeID: meta.nodeID, }, nil @@ -566,11 +567,11 @@ type snapshotMetadata struct { nodeID, clusterID types.ID } -func bootstrapNewWAL(cfg config.ServerConfig, m *membership.Member, cl *membership.RaftCluster) *bootstrappedWAL { +func bootstrapNewWAL(cfg config.ServerConfig, cluster *bootstrapedCluster) 
*bootstrappedWAL {
 	metadata := pbutil.MustMarshal(
 		&etcdserverpb.Metadata{
-			NodeID:    uint64(m.ID),
-			ClusterID: uint64(cl.ID()),
+			NodeID:    uint64(cluster.nodeID),
+			ClusterID: uint64(cluster.cl.ID()),
 		},
 	)
 	w, err := wal.Create(cfg.Logger, cfg.WALDir(), metadata)

From 138afa5be9609a2f3c7cdfbc8c3dc2cd5d24781f Mon Sep 17 00:00:00 2001
From: Marek Siarkowicz
Date: Wed, 21 Jul 2021 14:37:23 +0200
Subject: [PATCH 11/18] server: Split wal, cluster and raft bootstrapping

---
 server/etcdserver/bootstrap.go | 134 +++++++++++++++++++--------------
 1 file changed, 77 insertions(+), 57 deletions(-)

diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go
index d923a595b..9d905e61f 100644
--- a/server/etcdserver/bootstrap.go
+++ b/server/etcdserver/bootstrap.go
@@ -74,20 +74,20 @@ func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
 	}
 
 	haveWAL := wal.Exist(cfg.WALDir())
-	storage, err := bootstrapStorage(cfg, haveWAL, ss, prt)
+	s, err := bootstrapStorage(cfg, haveWAL, ss, prt)
 	if err != nil {
 		return nil, err
 	}
 
-	cluster, err := bootstrapCluster(cfg, haveWAL, storage, prt, ss)
+	cluster, err := bootstrapCluster(cfg, haveWAL, s, prt, ss)
 	if err != nil {
-		storage.backend.be.Close()
+		s.backend.be.Close()
 		return nil, err
 	}
 	return &bootstrappedServer{
 		prt:     prt,
 		ss:      ss,
-		storage: storage,
+		storage: s,
 		cluster: cluster,
 	}, nil
 }
@@ -230,60 +230,53 @@ func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error {
 }
 
 func bootstrapCluster(cfg config.ServerConfig, haveWAL bool, storage *bootstrappedStorage, prt http.RoundTripper, ss *snap.Snapshotter) (c *bootstrapedCluster, err error) {
+	c = &bootstrapedCluster{}
+	var (
+		meta *snapshotMetadata
+		bwal *bootstrappedWAL
+	)
+	if haveWAL {
+		if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
+			return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
+		}
+		bwal, meta = bootstrapWALFromSnapshot(cfg, storage.backend.snapshot)
+	}
+
 	switch {
 	case !haveWAL && !cfg.NewCluster:
 		c, err = bootstrapExistingClusterNoWAL(cfg, prt, storage.st, storage.backend.be)
 		if err != nil {
 			return nil, err
 		}
-		c.wal = bootstrapNewWAL(cfg, c)
-		c.raft = bootstrapRaftFromCluster(cfg, c.cl, nil, c.wal)
-		c.cl.SetID(c.nodeID, c.cl.ID())
-		return c, nil
+		c.wal = bootstrapNewWAL(cfg, c.cl, c.nodeID)
 	case !haveWAL && cfg.NewCluster:
 		c, err = bootstrapNewClusterNoWAL(cfg, prt, storage.st, storage.backend.be)
 		if err != nil {
 			return nil, err
 		}
-		c.wal = bootstrapNewWAL(cfg, c)
-		c.raft = bootstrapRaftFromCluster(cfg, c.cl, c.cl.MemberIDs(), c.wal)
-		c.cl.SetID(c.nodeID, c.cl.ID())
-		return c, nil
+		c.wal = bootstrapNewWAL(cfg, c.cl, c.nodeID)
 	case haveWAL:
-		bwal, meta := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), storage.backend.snapshot, cfg.UnsafeNoFsync)
-		if cfg.ForceNewCluster {
-			// discard the previously uncommitted entries
-			bwal.ents = bwal.CommitedEntries()
-			entries := bwal.ConfigChangeEntries(meta)
-			// force commit config change entries
-			bwal.AppendAndCommitEntries(entries)
-			cfg.Logger.Info(
-				"forcing restart member",
-				zap.String("cluster-id", meta.clusterID.String()),
-				zap.String("local-member-id", meta.nodeID.String()),
-				zap.Uint64("commit-index", bwal.st.Commit),
-			)
-		} else {
-			cfg.Logger.Info(
-				"restarting local member",
-				zap.String("cluster-id", meta.clusterID.String()),
-				zap.String("local-member-id", meta.nodeID.String()),
-				zap.Uint64("commit-index", bwal.st.Commit),
-			)
-		}
 		c, err = bootstrapClusterWithWAL(cfg, storage, meta)
 		if err != nil {
 			return nil, err
 		}
-		if err :=
fileutil.IsDirWriteable(cfg.WALDir()); err != nil { - return nil, fmt.Errorf("cannot write to WAL directory: %v", err) - } - c.raft = bootstrapRaftFromSnapshot(cfg, bwal, meta) c.wal = bwal - return c, nil default: return nil, fmt.Errorf("unsupported bootstrap config") } + switch { + case !haveWAL && !cfg.NewCluster: + c.raft = bootstrapRaftFromCluster(cfg, c.cl, nil, c.wal) + c.cl.SetID(c.nodeID, c.cl.ID()) + case !haveWAL && cfg.NewCluster: + c.raft = bootstrapRaftFromCluster(cfg, c.cl, c.cl.MemberIDs(), c.wal) + c.cl.SetID(c.nodeID, c.cl.ID()) + case haveWAL: + c.raft = bootstrapRaftFromSnapshot(cfg, c.wal, meta) + default: + return nil, fmt.Errorf("unsupported bootstrap config") + } + return c, nil } func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*bootstrapedCluster, error) { @@ -516,21 +509,54 @@ func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter, wal *wal.WAL, cl *m ) } -// bootstrapWALFromSnapshot reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear +func bootstrapWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (*bootstrappedWAL, *snapshotMetadata) { + wal, st, ents, snap, meta := openWALFromSnapshot(cfg, snapshot) + bwal := &bootstrappedWAL{ + lg: cfg.Logger, + w: wal, + st: st, + ents: ents, + snapshot: snap, + } + + if cfg.ForceNewCluster { + // discard the previously uncommitted entries + bwal.ents = bwal.CommitedEntries() + entries := bwal.ConfigChangeEntries(meta) + // force commit config change entries + bwal.AppendAndCommitEntries(entries) + cfg.Logger.Info( + "forcing restart member", + zap.String("cluster-id", meta.clusterID.String()), + zap.String("local-member-id", meta.nodeID.String()), + zap.Uint64("commit-index", bwal.st.Commit), + ) + } else { + cfg.Logger.Info( + "restarting local member", + zap.String("cluster-id", meta.clusterID.String()), + zap.String("local-member-id", meta.nodeID.String()), + zap.Uint64("commit-index", bwal.st.Commit), + ) + } + return bwal, meta +} + +// openWALFromSnapshot reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear // after the position of the given snap in the WAL. // The snap must have been previously saved to the WAL, or this call will panic. -func bootstrapWALFromSnapshot(lg *zap.Logger, waldir string, snapshot *raftpb.Snapshot, unsafeNoFsync bool) (*bootstrappedWAL, *snapshotMetadata) { +func openWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (*wal.WAL, *raftpb.HardState, []raftpb.Entry, *raftpb.Snapshot, *snapshotMetadata) { var walsnap walpb.Snapshot if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term } repaired := false for { - w, err := wal.Open(lg, waldir, walsnap) + w, err := wal.Open(cfg.Logger, cfg.WALDir(), walsnap) if err != nil { - lg.Fatal("failed to open WAL", zap.Error(err)) + cfg.Logger.Fatal("failed to open WAL", zap.Error(err)) } - if unsafeNoFsync { + if cfg.UnsafeNoFsync { w.SetUnsafeNoFsync() } wmetadata, st, ents, err := w.ReadAll() @@ -538,12 +564,12 @@ func bootstrapWALFromSnapshot(lg *zap.Logger, waldir string, snapshot *raftpb.Sn w.Close() // we can only repair ErrUnexpectedEOF and we never repair twice. 
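// (Any other error, or a second unexpected EOF after one repair attempt, is treated as unrecoverable corruption: the server fails fast below instead of risking replay of a silently truncated log.)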
if repaired || err != io.ErrUnexpectedEOF { - lg.Fatal("failed to read WAL, cannot be repaired", zap.Error(err)) + cfg.Logger.Fatal("failed to read WAL, cannot be repaired", zap.Error(err)) } - if !wal.Repair(lg, waldir) { - lg.Fatal("failed to repair WAL", zap.Error(err)) + if !wal.Repair(cfg.Logger, cfg.WALDir()) { + cfg.Logger.Fatal("failed to repair WAL", zap.Error(err)) } else { - lg.Info("repaired WAL", zap.Error(err)) + cfg.Logger.Info("repaired WAL", zap.Error(err)) repaired = true } continue @@ -553,13 +579,7 @@ func bootstrapWALFromSnapshot(lg *zap.Logger, waldir string, snapshot *raftpb.Sn id := types.ID(metadata.NodeID) cid := types.ID(metadata.ClusterID) meta := &snapshotMetadata{clusterID: cid, nodeID: id} - return &bootstrappedWAL{ - lg: lg, - w: w, - st: &st, - ents: ents, - snapshot: snapshot, - }, meta + return w, &st, ents, snapshot, meta } } @@ -567,11 +587,11 @@ type snapshotMetadata struct { nodeID, clusterID types.ID } -func bootstrapNewWAL(cfg config.ServerConfig, cluster *bootstrapedCluster) *bootstrappedWAL { +func bootstrapNewWAL(cfg config.ServerConfig, cl *membership.RaftCluster, nodeID types.ID) *bootstrappedWAL { metadata := pbutil.MustMarshal( &etcdserverpb.Metadata{ - NodeID: uint64(cluster.nodeID), - ClusterID: uint64(cluster.cl.ID()), + NodeID: uint64(nodeID), + ClusterID: uint64(cl.ID()), }, ) w, err := wal.Create(cfg.Logger, cfg.WALDir(), metadata) From 049e2d6ec0951f6b4e6be0f7d6c2cc8ebda65c97 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 21 Jul 2021 15:04:28 +0200 Subject: [PATCH 12/18] server: Move raft up the bootstrap hierarchy --- server/etcdserver/bootstrap.go | 55 +++++++++++++++++++--------------- server/etcdserver/server.go | 2 +- 2 files changed, 32 insertions(+), 25 deletions(-) diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index 9d905e61f..9bb3e8155 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -72,7 +72,6 @@ func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) { if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil { return nil, fmt.Errorf("cannot access member directory: %v", terr) } - haveWAL := wal.Exist(cfg.WALDir()) s, err := bootstrapStorage(cfg, haveWAL, ss, prt) if err != nil { @@ -84,17 +83,23 @@ func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) { s.backend.be.Close() return nil, err } + raft := bootstrapRaft(cfg, haveWAL, cluster.cl, cluster.wal) + if !haveWAL { + cluster.cl.SetID(cluster.nodeID, cluster.cl.ID()) + } return &bootstrappedServer{ prt: prt, ss: ss, storage: s, cluster: cluster, + raft: raft, }, nil } type bootstrappedServer struct { storage *bootstrappedStorage cluster *bootstrapedCluster + raft *bootstrappedRaft prt http.RoundTripper ss *snap.Snapshotter } @@ -113,7 +118,6 @@ type bootstrappedBackend struct { } type bootstrapedCluster struct { - raft *bootstrappedRaft remotes []*membership.Member wal *bootstrappedWAL cl *membership.RaftCluster @@ -232,14 +236,13 @@ func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error { func bootstrapCluster(cfg config.ServerConfig, haveWAL bool, storage *bootstrappedStorage, prt http.RoundTripper, ss *snap.Snapshotter) (c *bootstrapedCluster, err error) { c = &bootstrapedCluster{} var ( - meta *snapshotMetadata bwal *bootstrappedWAL ) if haveWAL { if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil { return nil, fmt.Errorf("cannot write to WAL directory: %v", err) } - bwal, meta = bootstrapWALFromSnapshot(cfg, 
storage.backend.snapshot) + bwal = bootstrapWALFromSnapshot(cfg, storage.backend.snapshot) } switch { @@ -256,7 +259,7 @@ func bootstrapCluster(cfg config.ServerConfig, haveWAL bool, storage *bootstrapp } c.wal = bootstrapNewWAL(cfg, c.cl, c.nodeID) case haveWAL: - c, err = bootstrapClusterWithWAL(cfg, storage, meta) + c, err = bootstrapClusterWithWAL(cfg, storage, bwal.meta) if err != nil { return nil, err } @@ -264,18 +267,6 @@ func bootstrapCluster(cfg config.ServerConfig, haveWAL bool, storage *bootstrapp default: return nil, fmt.Errorf("unsupported bootstrap config") } - switch { - case !haveWAL && !cfg.NewCluster: - c.raft = bootstrapRaftFromCluster(cfg, c.cl, nil, c.wal) - c.cl.SetID(c.nodeID, c.cl.ID()) - case !haveWAL && cfg.NewCluster: - c.raft = bootstrapRaftFromCluster(cfg, c.cl, c.cl.MemberIDs(), c.wal) - c.cl.SetID(c.nodeID, c.cl.ID()) - case haveWAL: - c.raft = bootstrapRaftFromSnapshot(cfg, c.wal, meta) - default: - return nil, fmt.Errorf("unsupported bootstrap config") - } return c, nil } @@ -437,6 +428,20 @@ func recoverSnapshot(cfg config.ServerConfig, st v2store.Store, be backend.Backe return snapshot, be, nil } +func bootstrapRaft(cfg config.ServerConfig, haveWAL bool, cl *membership.RaftCluster, bwal *bootstrappedWAL) *bootstrappedRaft { + switch { + case !haveWAL && !cfg.NewCluster: + return bootstrapRaftFromCluster(cfg, cl, nil, bwal) + case !haveWAL && cfg.NewCluster: + return bootstrapRaftFromCluster(cfg, cl, cl.MemberIDs(), bwal) + case haveWAL: + return bootstrapRaftFromWAL(cfg, bwal) + default: + cfg.Logger.Panic("unsupported bootstrap config") + return nil + } +} + func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID, bwal *bootstrappedWAL) *bootstrappedRaft { member := cl.MemberByName(cfg.Name) peers := make([]raft.Peer, len(ids)) @@ -463,12 +468,12 @@ func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluste } } -func bootstrapRaftFromSnapshot(cfg config.ServerConfig, bwal *bootstrappedWAL, meta *snapshotMetadata) *bootstrappedRaft { +func bootstrapRaftFromWAL(cfg config.ServerConfig, bwal *bootstrappedWAL) *bootstrappedRaft { s := bwal.MemoryStorage() return &bootstrappedRaft{ lg: cfg.Logger, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, - config: raftConfig(cfg, uint64(meta.nodeID), s), + config: raftConfig(cfg, uint64(bwal.meta.nodeID), s), storage: s, } } @@ -509,7 +514,7 @@ func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter, wal *wal.WAL, cl *m ) } -func bootstrapWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (*bootstrappedWAL, *snapshotMetadata) { +func bootstrapWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bootstrappedWAL { wal, st, ents, snap, meta := openWALFromSnapshot(cfg, snapshot) bwal := &bootstrappedWAL{ lg: cfg.Logger, @@ -517,12 +522,13 @@ func bootstrapWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot st: st, ents: ents, snapshot: snap, + meta: meta, } if cfg.ForceNewCluster { // discard the previously uncommitted entries bwal.ents = bwal.CommitedEntries() - entries := bwal.ConfigChangeEntries(meta) + entries := bwal.ConfigChangeEntries() // force commit config change entries bwal.AppendAndCommitEntries(entries) cfg.Logger.Info( @@ -539,7 +545,7 @@ func bootstrapWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot zap.Uint64("commit-index", bwal.st.Commit), ) } - return bwal, meta + return bwal } // openWALFromSnapshot reads the WAL at the given snap and returns the wal, its 
latest HardState and cluster ID, and all entries that appear @@ -614,6 +620,7 @@ type bootstrappedWAL struct { st *raftpb.HardState ents []raftpb.Entry snapshot *raftpb.Snapshot + meta *snapshotMetadata } func (wal *bootstrappedWAL) MemoryStorage() *raft.MemoryStorage { @@ -645,11 +652,11 @@ func (wal *bootstrappedWAL) CommitedEntries() []raftpb.Entry { return wal.ents } -func (wal *bootstrappedWAL) ConfigChangeEntries(meta *snapshotMetadata) []raftpb.Entry { +func (wal *bootstrappedWAL) ConfigChangeEntries() []raftpb.Entry { return serverstorage.CreateConfigChangeEnts( wal.lg, serverstorage.GetIDs(wal.lg, wal.snapshot, wal.ents), - uint64(meta.nodeID), + uint64(wal.meta.nodeID), wal.st.Term, wal.st.Commit, ) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 421f7d059..8d55295e1 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -320,7 +320,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { errorc: make(chan error, 1), v2store: b.storage.st, snapshotter: b.ss, - r: *b.cluster.raft.newRaftNode(b.ss, b.cluster.wal.w, b.cluster.cl), + r: *b.raft.newRaftNode(b.ss, b.cluster.wal.w, b.cluster.cl), id: b.cluster.nodeID, attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, cluster: b.cluster.cl, From d3abf774eaeffafc2e9b1e881d23e70a65570b2b Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 21 Jul 2021 15:20:56 +0200 Subject: [PATCH 13/18] server: Move cluster backend setting up the call hierarchy --- server/etcdserver/bootstrap.go | 37 ++++++++++++++++------------------ 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index 9bb3e8155..e081e6bd2 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -247,19 +247,19 @@ func bootstrapCluster(cfg config.ServerConfig, haveWAL bool, storage *bootstrapp switch { case !haveWAL && !cfg.NewCluster: - c, err = bootstrapExistingClusterNoWAL(cfg, prt, storage.st, storage.backend.be) + c, err = bootstrapExistingClusterNoWAL(cfg, prt) if err != nil { return nil, err } c.wal = bootstrapNewWAL(cfg, c.cl, c.nodeID) case !haveWAL && cfg.NewCluster: - c, err = bootstrapNewClusterNoWAL(cfg, prt, storage.st, storage.backend.be) + c, err = bootstrapNewClusterNoWAL(cfg, prt) if err != nil { return nil, err } c.wal = bootstrapNewWAL(cfg, c.cl, c.nodeID) case haveWAL: - c, err = bootstrapClusterWithWAL(cfg, storage, bwal.meta) + c, err = bootstrapClusterWithWAL(cfg, bwal.meta) if err != nil { return nil, err } @@ -267,10 +267,20 @@ func bootstrapCluster(cfg config.ServerConfig, haveWAL bool, storage *bootstrapp default: return nil, fmt.Errorf("unsupported bootstrap config") } + c.cl.SetStore(storage.st) + c.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, storage.backend.be)) + if haveWAL { + c.cl.Recover(api.UpdateCapability) + if c.cl.Version() != nil && !c.cl.Version().LessThan(semver.Version{Major: 3}) && !storage.backend.beExist { + bepath := cfg.BackendPath() + os.RemoveAll(bepath) + return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath) + } + } return c, nil } -func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*bootstrapedCluster, error) { +func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper) (*bootstrapedCluster, error) { if err := cfg.VerifyJoinExisting(); err != nil { return nil, err } @@ -291,8 +301,6 
@@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe remotes := existingCluster.Members() cl.SetID(types.ID(0), existingCluster.ID()) - cl.SetStore(st) - cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be)) member := cl.MemberByName(cfg.Name) return &bootstrapedCluster{ remotes: remotes, @@ -301,7 +309,7 @@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe }, nil } -func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*bootstrapedCluster, error) { +func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper) (*bootstrapedCluster, error) { if err := cfg.VerifyBootstrap(); err != nil { return nil, err } @@ -331,17 +339,14 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st return nil, err } } - cl.SetStore(st) - cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be)) - member := cl.MemberByName(cfg.Name) return &bootstrapedCluster{ remotes: nil, cl: cl, - nodeID: member.ID, + nodeID: m.ID, }, nil } -func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStorage, meta *snapshotMetadata) (*bootstrapedCluster, error) { +func bootstrapClusterWithWAL(cfg config.ServerConfig, meta *snapshotMetadata) (*bootstrapedCluster, error) { if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil { return nil, fmt.Errorf("cannot write to member directory: %v", err) } @@ -354,14 +359,6 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStora } cl := membership.NewCluster(cfg.Logger) cl.SetID(meta.nodeID, meta.clusterID) - cl.SetStore(storage.st) - cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, storage.backend.be)) - cl.Recover(api.UpdateCapability) - if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !storage.backend.beExist { - bepath := cfg.BackendPath() - os.RemoveAll(bepath) - return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath) - } return &bootstrapedCluster{ cl: cl, nodeID: meta.nodeID, From 4884e7d8cf92d8a6d42b94b20807907ace677738 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 21 Jul 2021 15:57:53 +0200 Subject: [PATCH 14/18] server: Move wal bootstrap from cluster to storage --- server/etcdserver/bootstrap.go | 128 +++++++++++++++++---------------- server/etcdserver/server.go | 2 +- 2 files changed, 69 insertions(+), 61 deletions(-) diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index e081e6bd2..0cd73ddb1 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -72,21 +72,42 @@ func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) { if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil { return nil, fmt.Errorf("cannot access member directory: %v", terr) } + haveWAL := wal.Exist(cfg.WALDir()) - s, err := bootstrapStorage(cfg, haveWAL, ss, prt) + st := v2store.New(StoreClusterPrefix, StoreKeysPrefix) + backend, err := bootstrapBackend(cfg, haveWAL, st, ss) if err != nil { return nil, err } + var ( + bwal *bootstrappedWAL + ) + + if haveWAL { + if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil { + return nil, fmt.Errorf("cannot write to WAL directory: %v", err) + } + bwal = bootstrapWALFromSnapshot(cfg, backend.snapshot) + } + + cluster, err := bootstrapCluster(cfg, bwal, prt) + if err != nil { + backend.be.Close() + return nil, err + } - cluster, err := bootstrapCluster(cfg, haveWAL, s, prt, ss) + s, err := 
bootstrapStorage(cfg, st, backend, bwal, cluster) if err != nil { - s.backend.be.Close() + backend.be.Close() return nil, err } - raft := bootstrapRaft(cfg, haveWAL, cluster.cl, cluster.wal) - if !haveWAL { - cluster.cl.SetID(cluster.nodeID, cluster.cl.ID()) + + err = cluster.Finalize(cfg, s) + if err != nil { + backend.be.Close() + return nil, err } + raft := bootstrapRaft(cfg, cluster, s.wal) return &bootstrappedServer{ prt: prt, ss: ss, @@ -106,6 +127,7 @@ type bootstrappedServer struct { type bootstrappedStorage struct { backend *bootstrappedBackend + wal *bootstrappedWAL st v2store.Store } @@ -119,7 +141,6 @@ type bootstrappedBackend struct { type bootstrapedCluster struct { remotes []*membership.Member - wal *bootstrappedWAL cl *membership.RaftCluster nodeID types.ID } @@ -133,17 +154,15 @@ type bootstrappedRaft struct { storage *raft.MemoryStorage } -func bootstrapStorage(cfg config.ServerConfig, haveWAL bool, ss *snap.Snapshotter, prt http.RoundTripper) (b *bootstrappedStorage, err error) { - st := v2store.New(StoreClusterPrefix, StoreKeysPrefix) - - backend, err := bootstrapBackend(cfg, haveWAL, st, ss) - if err != nil { - return nil, err +func bootstrapStorage(cfg config.ServerConfig, st v2store.Store, be *bootstrappedBackend, wal *bootstrappedWAL, cl *bootstrapedCluster) (b *bootstrappedStorage, err error) { + if wal == nil { + wal = bootstrapNewWAL(cfg, cl) } return &bootstrappedStorage{ - backend: backend, + backend: be, st: st, + wal: wal, }, nil } @@ -233,49 +252,19 @@ func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error { return be.Defrag() } -func bootstrapCluster(cfg config.ServerConfig, haveWAL bool, storage *bootstrappedStorage, prt http.RoundTripper, ss *snap.Snapshotter) (c *bootstrapedCluster, err error) { - c = &bootstrapedCluster{} - var ( - bwal *bootstrappedWAL - ) - if haveWAL { - if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil { - return nil, fmt.Errorf("cannot write to WAL directory: %v", err) - } - bwal = bootstrapWALFromSnapshot(cfg, storage.backend.snapshot) - } - +func bootstrapCluster(cfg config.ServerConfig, bwal *bootstrappedWAL, prt http.RoundTripper) (c *bootstrapedCluster, err error) { switch { - case !haveWAL && !cfg.NewCluster: + case bwal == nil && !cfg.NewCluster: c, err = bootstrapExistingClusterNoWAL(cfg, prt) - if err != nil { - return nil, err - } - c.wal = bootstrapNewWAL(cfg, c.cl, c.nodeID) - case !haveWAL && cfg.NewCluster: + case bwal == nil && cfg.NewCluster: c, err = bootstrapNewClusterNoWAL(cfg, prt) - if err != nil { - return nil, err - } - c.wal = bootstrapNewWAL(cfg, c.cl, c.nodeID) - case haveWAL: + case bwal != nil && bwal.haveWAL: c, err = bootstrapClusterWithWAL(cfg, bwal.meta) - if err != nil { - return nil, err - } - c.wal = bwal default: return nil, fmt.Errorf("unsupported bootstrap config") } - c.cl.SetStore(storage.st) - c.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, storage.backend.be)) - if haveWAL { - c.cl.Recover(api.UpdateCapability) - if c.cl.Version() != nil && !c.cl.Version().LessThan(semver.Version{Major: 3}) && !storage.backend.beExist { - bepath := cfg.BackendPath() - os.RemoveAll(bepath) - return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath) - } + if err != nil { + return nil, err } return c, nil } @@ -425,13 +414,30 @@ func recoverSnapshot(cfg config.ServerConfig, st v2store.Store, be backend.Backe return snapshot, be, nil } -func bootstrapRaft(cfg config.ServerConfig, haveWAL bool, cl *membership.RaftCluster, bwal *bootstrappedWAL) 
*bootstrappedRaft { - switch { - case !haveWAL && !cfg.NewCluster: - return bootstrapRaftFromCluster(cfg, cl, nil, bwal) - case !haveWAL && cfg.NewCluster: - return bootstrapRaftFromCluster(cfg, cl, cl.MemberIDs(), bwal) - case haveWAL: +func (c *bootstrapedCluster) Finalize(cfg config.ServerConfig, s *bootstrappedStorage) error { + if !s.wal.haveWAL { + c.cl.SetID(c.nodeID, c.cl.ID()) + } + c.cl.SetStore(s.st) + c.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, s.backend.be)) + if s.wal.haveWAL { + c.cl.Recover(api.UpdateCapability) + if c.cl.Version() != nil && !c.cl.Version().LessThan(semver.Version{Major: 3}) && !s.backend.beExist { + bepath := cfg.BackendPath() + os.RemoveAll(bepath) + return fmt.Errorf("database file (%v) of the backend is missing", bepath) + } + } + return nil +} + +func bootstrapRaft(cfg config.ServerConfig, cluster *bootstrapedCluster, bwal *bootstrappedWAL) *bootstrappedRaft { + switch { + case !bwal.haveWAL && !cfg.NewCluster: + return bootstrapRaftFromCluster(cfg, cluster.cl, nil, bwal) + case !bwal.haveWAL && cfg.NewCluster: + return bootstrapRaftFromCluster(cfg, cluster.cl, cluster.cl.MemberIDs(), bwal) + case bwal.haveWAL: return bootstrapRaftFromWAL(cfg, bwal) default: cfg.Logger.Panic("unsupported bootstrap config") @@ -520,6 +526,7 @@ func bootstrapWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot ents: ents, snapshot: snap, meta: meta, + haveWAL: true, } if cfg.ForceNewCluster { @@ -590,11 +597,11 @@ type snapshotMetadata struct { nodeID, clusterID types.ID } -func bootstrapNewWAL(cfg config.ServerConfig, cl *membership.RaftCluster, nodeID types.ID) *bootstrappedWAL { +func bootstrapNewWAL(cfg config.ServerConfig, cl *bootstrapedCluster) *bootstrappedWAL { metadata := pbutil.MustMarshal( &etcdserverpb.Metadata{ - NodeID: uint64(nodeID), - ClusterID: uint64(cl.ID()), + NodeID: uint64(cl.nodeID), + ClusterID: uint64(cl.cl.ID()), }, ) w, err := wal.Create(cfg.Logger, cfg.WALDir(), metadata) @@ -613,6 +620,7 @@ func bootstrapNewWAL(cfg config.ServerConfig, cl *membership.RaftCluster, nodeID type bootstrappedWAL struct { lg *zap.Logger + haveWAL bool w *wal.WAL st *raftpb.HardState ents []raftpb.Entry diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 8d55295e1..c0c5aa1a0 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -320,7 +320,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { errorc: make(chan error, 1), v2store: b.storage.st, snapshotter: b.ss, - r: *b.raft.newRaftNode(b.ss, b.cluster.wal.w, b.cluster.cl), + r: *b.raft.newRaftNode(b.ss, b.storage.wal.w, b.cluster.cl), id: b.cluster.nodeID, attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, cluster: b.cluster.cl, From a450dc7f91cde674197331c87dd4cfdba24b6947 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Thu, 5 Aug 2021 11:53:47 +0200 Subject: [PATCH 15/18] server: Rename function to NewConfigChangeEntries indicating we are not reading it from wal --- server/etcdserver/bootstrap.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index 0cd73ddb1..31d6f6408 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -532,7 +532,7 @@ func bootstrapWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot if cfg.ForceNewCluster { // discard the previously uncommitted entries bwal.ents = bwal.CommitedEntries() - entries := 
bwal.ConfigChangeEntries()
+		entries := bwal.NewConfigChangeEntries()
 		// force commit config change entries
 		bwal.AppendAndCommitEntries(entries)
 		cfg.Logger.Info(
@@ -657,7 +657,7 @@ func (wal *bootstrappedWAL) CommitedEntries() []raftpb.Entry {
 	return wal.ents
 }
 
-func (wal *bootstrappedWAL) ConfigChangeEntries() []raftpb.Entry {
+func (wal *bootstrappedWAL) NewConfigChangeEntries() []raftpb.Entry {
 	return serverstorage.CreateConfigChangeEnts(
 		wal.lg,
 		serverstorage.GetIDs(wal.lg, wal.snapshot, wal.ents),
 		uint64(wal.meta.nodeID),
 		wal.st.Term,
 		wal.st.Commit,
 	)

From a206ad2c9636fe850775c86bc6ef63e7fe845168 Mon Sep 17 00:00:00 2001
From: Marek Siarkowicz
Date: Thu, 5 Aug 2021 11:55:08 +0200
Subject: [PATCH 16/18] server: Rename GetIDs to GetEffectiveNodeIDsFromWalEntries

---
 server/etcdserver/bootstrap.go | 2 +-
 server/etcdserver/raft_test.go | 2 +-
 server/storage/util.go | 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go
index 31d6f6408..318f1f8e3 100644
--- a/server/etcdserver/bootstrap.go
+++ b/server/etcdserver/bootstrap.go
@@ -660,7 +660,7 @@ func (wal *bootstrappedWAL) CommitedEntries() []raftpb.Entry {
 func (wal *bootstrappedWAL) NewConfigChangeEntries() []raftpb.Entry {
 	return serverstorage.CreateConfigChangeEnts(
 		wal.lg,
-		serverstorage.GetIDs(wal.lg, wal.snapshot, wal.ents),
+		serverstorage.GetEffectiveNodeIDsFromWalEntries(wal.lg, wal.snapshot, wal.ents),
 		uint64(wal.meta.nodeID),
 		wal.st.Term,
 		wal.st.Commit,
 	)

diff --git a/server/etcdserver/raft_test.go b/server/etcdserver/raft_test.go
index 49de844b5..f552f8180 100644
--- a/server/etcdserver/raft_test.go
+++ b/server/etcdserver/raft_test.go
@@ -67,7 +67,7 @@ func TestGetIDs(t *testing.T) {
 		if tt.confState != nil {
 			snap.Metadata.ConfState = *tt.confState
 		}
-		idSet := serverstorage.GetIDs(testLogger, &snap, tt.ents)
+		idSet := serverstorage.GetEffectiveNodeIDsFromWalEntries(testLogger, &snap, tt.ents)
 		if !reflect.DeepEqual(idSet, tt.widSet) {
 			t.Errorf("#%d: idset = %#v, want %#v", i, idSet, tt.widSet)
 		}

diff --git a/server/storage/util.go b/server/storage/util.go
index bdac72ec1..252e74f92 100644
--- a/server/storage/util.go
+++ b/server/storage/util.go
@@ -109,13 +109,13 @@ func CreateConfigChangeEnts(lg *zap.Logger, ids []uint64, self uint64, term, ind
 	return ents
 }
 
-// GetIDs returns an ordered set of IDs included in the given snapshot and
+// GetEffectiveNodeIDsFromWalEntries returns an ordered set of IDs included in the given snapshot and
 // the entries. The given snapshot/entries can contain three kinds of
 // ID-related entry:
 // - ConfChangeAddNode, in which case the contained ID will be added into the set.
 // - ConfChangeRemoveNode, in which case the contained ID will be removed from the set.
 // - ConfChangeAddLearnerNode, in which the contained ID will be added into the set.
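// Note: a later ConfChangeRemoveNode cancels an earlier addition of the same ID, which is why the renamed function describes its result as the effective node set.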
-func GetIDs(lg *zap.Logger, snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 { +func GetEffectiveNodeIDsFromWalEntries(lg *zap.Logger, snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 { ids := make(map[uint64]bool) if snap != nil { for _, id := range snap.Metadata.ConfState.Voters { From 39f92a32ca0f32be80bf9845637bf70d2529c015 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 6 Aug 2021 17:21:24 +0200 Subject: [PATCH 17/18] server: Move member dir creation up and introduce Close method to bootstrap structs --- server/etcdserver/bootstrap.go | 28 ++++++++++++++++++++-------- server/etcdserver/server.go | 2 +- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index 318f1f8e3..b25684db8 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -63,16 +63,16 @@ func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) { if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil { return nil, fmt.Errorf("cannot access data directory: %v", terr) } + + if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil { + return nil, fmt.Errorf("cannot access member directory: %v", terr) + } ss := bootstrapSnapshot(cfg) prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.PeerDialTimeout()) if err != nil { return nil, err } - if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil { - return nil, fmt.Errorf("cannot access member directory: %v", terr) - } - haveWAL := wal.Exist(cfg.WALDir()) st := v2store.New(StoreClusterPrefix, StoreKeysPrefix) backend, err := bootstrapBackend(cfg, haveWAL, st, ss) @@ -92,19 +92,19 @@ func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) { cluster, err := bootstrapCluster(cfg, bwal, prt) if err != nil { - backend.be.Close() + backend.Close() return nil, err } s, err := bootstrapStorage(cfg, st, backend, bwal, cluster) if err != nil { - backend.be.Close() + backend.Close() return nil, err } err = cluster.Finalize(cfg, s) if err != nil { - backend.be.Close() + backend.Close() return nil, err } raft := bootstrapRaft(cfg, cluster, s.wal) @@ -125,12 +125,20 @@ type bootstrappedServer struct { ss *snap.Snapshotter } +func (s *bootstrappedServer) Close() { + s.storage.Close() +} + type bootstrappedStorage struct { backend *bootstrappedBackend wal *bootstrappedWAL st v2store.Store } +func (s *bootstrappedStorage) Close() { + s.backend.Close() +} + type bootstrappedBackend struct { beHooks *serverstorage.BackendHooks be backend.Backend @@ -139,6 +147,10 @@ type bootstrappedBackend struct { snapshot *raftpb.Snapshot } +func (s *bootstrappedBackend) Close() { + s.be.Close() +} + type bootstrapedCluster struct { remotes []*membership.Member cl *membership.RaftCluster @@ -343,7 +355,7 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, meta *snapshotMetadata) (* if cfg.ShouldDiscover() { cfg.Logger.Warn( "discovery token is ignored since cluster already initialized; valid logs are found", - zap.String("bwal-dir", cfg.WALDir()), + zap.String("wal-dir", cfg.WALDir()), ) } cl := membership.NewCluster(cfg.Logger) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index c0c5aa1a0..310f436e5 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -304,7 +304,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { defer func() { if err != nil { - b.storage.backend.be.Close() + b.Close() } }() From 35db0a581723d240da756468d3a70e283aaf2db6 Mon Sep 17 
00:00:00 2001
From: Marek Siarkowicz
Date: Thu, 19 Aug 2021 16:39:33 +0200
Subject: [PATCH 18/18] server: Refactor databaseFileMissing function

---
 server/etcdserver/bootstrap.go | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go
index b25684db8..f1f74fea9 100644
--- a/server/etcdserver/bootstrap.go
+++ b/server/etcdserver/bootstrap.go
@@ -434,7 +434,7 @@ func (c *bootstrapedCluster) Finalize(cfg config.ServerConfig, s *bootstrappedSt
 	c.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, s.backend.be))
 	if s.wal.haveWAL {
 		c.cl.Recover(api.UpdateCapability)
-		if c.cl.Version() != nil && !c.cl.Version().LessThan(semver.Version{Major: 3}) && !s.backend.beExist {
+		if c.databaseFileMissing(s) {
 			bepath := cfg.BackendPath()
 			os.RemoveAll(bepath)
 			return fmt.Errorf("database file (%v) of the backend is missing", bepath)
@@ -443,6 +443,11 @@ func (c *bootstrapedCluster) Finalize(cfg config.ServerConfig, s *bootstrappedSt
 	return nil
 }
 
+func (c *bootstrapedCluster) databaseFileMissing(s *bootstrappedStorage) bool {
+	v3Cluster := c.cl.Version() != nil && !c.cl.Version().LessThan(semver.Version{Major: 3})
+	return v3Cluster && !s.backend.beExist
+}
+
 func bootstrapRaft(cfg config.ServerConfig, cluster *bootstrapedCluster, bwal *bootstrappedWAL) *bootstrappedRaft {
 	switch {
 	case !bwal.haveWAL && !cfg.NewCluster:
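		// (bootstrapRaft branches on the same three conditions as bootstrapCluster; only the WAL signal now comes from bwal.haveWAL.)

A minimal, self-contained Go sketch of the three-way decision this series keeps converging on; bootstrapMode and its return strings are illustrative stand-ins, not etcd's API:

	package main

	import "fmt"

	// bootstrapMode distills the switch shared by bootstrapCluster and
	// bootstrapRaft; the real functions return bootstrap structs, not strings.
	func bootstrapMode(haveWAL, newCluster bool) (string, error) {
		switch {
		case !haveWAL && !newCluster:
			return "join an existing cluster (no WAL on disk)", nil
		case !haveWAL && newCluster:
			return "start a new cluster (no WAL on disk)", nil
		case haveWAL:
			return "restart the member from its existing WAL", nil
		default:
			return "", fmt.Errorf("unsupported bootstrap config")
		}
	}

	func main() {
		mode, err := bootstrapMode(true, false)
		if err != nil {
			panic(err)
		}
		fmt.Println(mode) // restart the member from its existing WAL
	}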