From db06a4ab28e23170f6cefc70e7b40e2c9410d11d Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 21 Jul 2021 11:51:18 +0200 Subject: [PATCH] server: Move wal bootstrap up the hierarchy --- server/etcdserver/bootstrap.go | 104 ++++++++++++++------------------- server/etcdserver/server.go | 18 +++--- 2 files changed, 52 insertions(+), 70 deletions(-) diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index 60ffc5fc6..aafb37969 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -101,6 +101,7 @@ type bootstrappedStorage struct { type bootstrapedCluster struct { raft *bootstrappedRaft remotes []*membership.Member + wal *bootstrappedWAL } func bootstrapStorage(cfg config.ServerConfig, ss *snap.Snapshotter, prt http.RoundTripper) (b *bootstrappedStorage, err error) { @@ -123,7 +124,7 @@ func bootstrapStorage(cfg config.ServerConfig, ss *snap.Snapshotter, prt http.Ro case !haveWAL && cfg.NewCluster: c, err = bootstrapNewClusterNoWAL(cfg, prt, st, be) case haveWAL: - c, err = bootstrapWithWAL(cfg, st, be, ss, beExist, beHooks, ci) + c, err = bootstrapClusterWithWAL(cfg, st, be, ss, beExist, beHooks, ci) default: be.Close() return nil, fmt.Errorf("unsupported bootstrap config") @@ -234,11 +235,13 @@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe cl.SetID(types.ID(0), existingCluster.ID()) cl.SetStore(st) cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be)) - br := bootstrapRaftFromCluster(cfg, cl, nil) - cl.SetID(br.wal.id, existingCluster.ID()) + bwal := bootstrapNewWAL(cfg, cl.MemberByName(cfg.Name).ID, cl.ID()) + br := bootstrapRaftFromCluster(cfg, cl, nil, bwal) + cl.SetID(bwal.id, existingCluster.ID()) return &bootstrapedCluster{ raft: br, remotes: remotes, + wal: bwal, }, nil } @@ -274,15 +277,17 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st } cl.SetStore(st) cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be)) - br := 
bootstrapRaftFromCluster(cfg, cl, cl.MemberIDs()) - cl.SetID(br.wal.id, cl.ID()) + bwal := bootstrapNewWAL(cfg, cl.MemberByName(cfg.Name).ID, cl.ID()) + br := bootstrapRaftFromCluster(cfg, cl, cl.MemberIDs(), bwal) + cl.SetID(bwal.id, cl.ID()) return &bootstrapedCluster{ remotes: nil, raft: br, + wal: bwal, }, nil } -func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backend, ss *snap.Snapshotter, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer) (*bootstrapedCluster, error) { +func bootstrapClusterWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backend, ss *snap.Snapshotter, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer) (*bootstrapedCluster, error) { if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil { return nil, fmt.Errorf("cannot write to member directory: %v", err) } @@ -294,7 +299,7 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back if cfg.ShouldDiscover() { cfg.Logger.Warn( "discovery token is ignored since cluster already initialized; valid logs are found", - zap.String("wal-dir", cfg.WALDir()), + zap.String("bwal-dir", cfg.WALDir()), ) } @@ -304,7 +309,7 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back return nil, err } // snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding - // wal log entries + // wal log entries snapshot, err := ss.LoadNewestAvailable(walSnaps) if err != nil && err != snap.ErrNoSnapshot { return nil, err @@ -355,12 +360,31 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back cfg.Logger.Info("No snapshot found. 
Recovering WAL from scratch!") } - b := &bootstrapedCluster{} - if !cfg.ForceNewCluster { - b.raft = bootstrapRaftFromWal(cfg, snapshot) - } else { - b.raft = bootstrapRaftFromWalStandalone(cfg, snapshot) + bwal := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync) + b := &bootstrapedCluster{ + wal: bwal, } + if cfg.ForceNewCluster { + // discard the previously uncommitted entries + bwal.ents = bwal.CommitedEntries() + entries := bwal.ConfigChangeEntries() + // force commit config change entries + bwal.AppendAndCommitEntries(entries) + cfg.Logger.Info( + "forcing restart member", + zap.String("cluster-id", bwal.cid.String()), + zap.String("local-member-id", bwal.id.String()), + zap.Uint64("commit-index", bwal.st.Commit), + ) + } else { + cfg.Logger.Info( + "restarting local member", + zap.String("cluster-id", bwal.cid.String()), + zap.String("local-member-id", bwal.id.String()), + zap.Uint64("commit-index", bwal.st.Commit), + ) + } + b.raft = bootstrapRaftFromWal(cfg, bwal) b.raft.cl.SetStore(st) b.raft.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be)) @@ -373,10 +397,8 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back return b, nil } -func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) *bootstrappedRaft { +func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID, bwal *bootstrappedWAL) *bootstrappedRaft { member := cl.MemberByName(cfg.Name) - id := member.ID - bwal := bootstrapNewWAL(cfg, id, cl.ID()) peers := make([]raft.Peer, len(ids)) for i, id := range ids { var ctx []byte @@ -388,7 +410,7 @@ func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluste } cfg.Logger.Info( "starting local member", - zap.String("local-member-id", id.String()), + zap.String("local-member-id", member.ID.String()), zap.String("cluster-id", cl.ID().String()), ) s := bwal.MemoryStorage() @@ -396,22 +418,13 
@@ func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluste lg: cfg.Logger, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, cl: cl, - config: raftConfig(cfg, uint64(bwal.id), s), + config: raftConfig(cfg, uint64(member.ID), s), peers: peers, storage: s, - wal: bwal, } } -func bootstrapRaftFromWal(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bootstrappedRaft { - bwal := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync) - - cfg.Logger.Info( - "restarting local member", - zap.String("cluster-id", bwal.cid.String()), - zap.String("local-member-id", bwal.id.String()), - zap.Uint64("commit-index", bwal.st.Commit), - ) +func bootstrapRaftFromWal(cfg config.ServerConfig, bwal *bootstrappedWAL) *bootstrappedRaft { cl := membership.NewCluster(cfg.Logger) cl.SetID(bwal.id, bwal.cid) s := bwal.MemoryStorage() @@ -421,36 +434,6 @@ func bootstrapRaftFromWal(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *b cl: cl, config: raftConfig(cfg, uint64(bwal.id), s), storage: s, - wal: bwal, - } -} - -func bootstrapRaftFromWalStandalone(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bootstrappedRaft { - bwal := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync) - - // discard the previously uncommitted entries - bwal.ents = bwal.CommitedEntries() - entries := bwal.ConfigChangeEntries() - // force commit config change entries - bwal.AppendAndCommitEntries(entries) - - cfg.Logger.Info( - "forcing restart member", - zap.String("cluster-id", bwal.cid.String()), - zap.String("local-member-id", bwal.id.String()), - zap.Uint64("commit-index", bwal.st.Commit), - ) - - cl := membership.NewCluster(cfg.Logger) - cl.SetID(bwal.id, bwal.cid) - s := bwal.MemoryStorage() - return &bootstrappedRaft{ - lg: cfg.Logger, - heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, - cl: cl, - config: raftConfig(cfg, uint64(bwal.id), s), - storage: s, - wal: bwal, } } @@ -476,10 +459,9 @@ type 
bootstrappedRaft struct { config *raft.Config cl *membership.RaftCluster storage *raft.MemoryStorage - wal *bootstrappedWAL } -func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter) *raftNode { +func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter, wal *wal.WAL) *raftNode { var n raft.Node if len(b.peers) == 0 { n = raft.RestartNode(b.config) @@ -496,7 +478,7 @@ func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter) *raftNode { Node: n, heartbeat: b.heartbeat, raftStorage: b.storage, - storage: NewStorage(b.wal.w, ss), + storage: NewStorage(wal, ss), }, ) } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 2453329f8..89d043ece 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -308,8 +308,8 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } }() - sstats := stats.NewServerStats(cfg.Name, b.storage.cluster.raft.wal.id.String()) - lstats := stats.NewLeaderStats(cfg.Logger, b.storage.cluster.raft.wal.id.String()) + sstats := stats.NewServerStats(cfg.Name, b.storage.cluster.wal.id.String()) + lstats := stats.NewLeaderStats(cfg.Logger, b.storage.cluster.wal.id.String()) heartbeat := time.Duration(cfg.TickMs) * time.Millisecond srv = &EtcdServer{ @@ -320,21 +320,21 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { errorc: make(chan error, 1), v2store: b.storage.st, snapshotter: b.ss, - r: *b.storage.cluster.raft.newRaftNode(b.ss), - id: b.storage.cluster.raft.wal.id, + r: *b.storage.cluster.raft.newRaftNode(b.ss, b.storage.cluster.wal.w), + id: b.storage.cluster.wal.id, attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, cluster: b.storage.cluster.raft.cl, stats: sstats, lstats: lstats, SyncTicker: time.NewTicker(500 * time.Millisecond), peerRt: b.prt, - reqIDGen: idutil.NewGenerator(uint16(b.storage.cluster.raft.wal.id), time.Now()), + reqIDGen: idutil.NewGenerator(uint16(b.storage.cluster.wal.id), time.Now()), 
AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist}, consistIndex: b.storage.ci, firstCommitInTerm: notify.NewNotifier(), clusterVersionChanged: notify.NewNotifier(), } - serverID.With(prometheus.Labels{"server_id": b.storage.cluster.raft.wal.id.String()}).Set(1) + serverID.With(prometheus.Labels{"server_id": b.storage.cluster.wal.id.String()}).Set(1) srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged) srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) @@ -403,7 +403,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { Logger: cfg.Logger, TLSInfo: cfg.PeerTLSInfo, DialTimeout: cfg.PeerDialTimeout(), - ID: b.storage.cluster.raft.wal.id, + ID: b.storage.cluster.wal.id, URLs: cfg.PeerURLs, ClusterID: b.storage.cluster.raft.cl.ID(), Raft: srv, @@ -417,12 +417,12 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } // add all remotes into transport for _, m := range b.storage.cluster.remotes { - if m.ID != b.storage.cluster.raft.wal.id { + if m.ID != b.storage.cluster.wal.id { tr.AddRemote(m.ID, m.PeerURLs) } } for _, m := range b.storage.cluster.raft.cl.Members() { - if m.ID != b.storage.cluster.raft.wal.id { + if m.ID != b.storage.cluster.wal.id { tr.AddPeer(m.ID, m.PeerURLs) } }