From 554777bba48d9cec938c4bf43eda5e3310299777 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 7 Jul 2021 17:29:13 +0200 Subject: [PATCH] etcdserver: Extract boostrapRaft struct --- server/etcdserver/raft.go | 48 ++++++++++++++++++++------ server/etcdserver/server.go | 68 +++++++++++++++---------------------- 2 files changed, 66 insertions(+), 50 deletions(-) diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go index 48440ec63..39857522e 100644 --- a/server/etcdserver/raft.go +++ b/server/etcdserver/raft.go @@ -420,8 +420,7 @@ func (r *raftNode) advanceTicks(ticks int) { } } -func startNode(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) { - var err error +func startNode(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) *boostrapRaft { member := cl.MemberByName(cfg.Name) metadata := pbutil.MustMarshal( &pb.Metadata{ @@ -429,7 +428,8 @@ func startNode(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types. ClusterID: uint64(cl.ID()), }, ) - if w, err = wal.Create(cfg.Logger, cfg.WALDir(), metadata); err != nil { + w, err := wal.Create(cfg.Logger, cfg.WALDir(), metadata) + if err != nil { cfg.Logger.Panic("failed to create WAL", zap.Error(err)) } if cfg.UnsafeNoFsync { @@ -444,14 +444,15 @@ func startNode(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types. } peers[i] = raft.Peer{ID: uint64(id), Context: ctx} } - id = member.ID + id := member.ID cfg.Logger.Info( "starting local member", zap.String("local-member-id", id.String()), zap.String("cluster-id", cl.ID().String()), ) - s = raft.NewMemoryStorage() + s := raft.NewMemoryStorage() c := raftConfig(cfg, uint64(id), s) + var n raft.Node if len(peers) == 0 { n = raft.RestartNode(c) } else { @@ -460,10 +461,17 @@ func startNode(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types. raftStatusMu.Lock() raftStatus = n.Status raftStatusMu.Unlock() - return id, n, s, w + + return &boostrapRaft{ + id: id, + cl: cl, + node: n, + storage: s, + wal: w, + } } -func restartNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { +func restartNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *boostrapRaft { var walsnap walpb.Snapshot if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term @@ -489,10 +497,16 @@ func restartNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raftStatusMu.Lock() raftStatus = n.Status raftStatusMu.Unlock() - return id, cl, n, s, w + return &boostrapRaft{ + id: id, + cl: cl, + node: n, + storage: s, + wal: w, + } } -func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { +func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *boostrapRaft { var walsnap walpb.Snapshot if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term @@ -551,7 +565,13 @@ func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) n := raft.RestartNode(c) raftStatus = n.Status - return id, cl, n, s, w + return &boostrapRaft{ + id: id, + cl: cl, + node: n, + storage: s, + wal: w, + } } func raftConfig(cfg config.ServerConfig, id uint64, s *raft.MemoryStorage) *raft.Config { @@ -568,6 +588,14 @@ func raftConfig(cfg config.ServerConfig, id uint64, s *raft.MemoryStorage) *raft } } +type boostrapRaft struct { + id types.ID + cl *membership.RaftCluster + node raft.Node + storage *raft.MemoryStorage + wal *wal.WAL +} + // getIDs returns an ordered set of IDs included in the given snapshot and // the entries. The given snapshot/entries can contain three kinds of // ID-related entry: diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 1a18bffc5..a53d34bb5 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -393,12 +393,8 @@ func bootstrap(cfg config.ServerConfig) (b *boostrapResult, err error) { } type boostrapResult struct { - cl *membership.RaftCluster + raft *boostrapRaft remotes []*membership.Member - w *wal.WAL - n raft.Node - s *raft.MemoryStorage - id types.ID prt http.RoundTripper ci cindex.ConsistentIndexer st v2store.Store @@ -469,15 +465,11 @@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe cl.SetID(types.ID(0), existingCluster.ID()) cl.SetStore(st) cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) - id, n, s, w := startNode(cfg, cl, nil) - cl.SetID(id, existingCluster.ID()) + br := startNode(cfg, cl, nil) + cl.SetID(br.id, existingCluster.ID()) return &boostrapResult{ - cl: cl, + raft: br, remotes: remotes, - w: w, - n: n, - s: s, - id: id, }, nil } @@ -513,15 +505,11 @@ func boostrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st } cl.SetStore(st) cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) - id, n, s, w := startNode(cfg, cl, cl.MemberIDs()) - cl.SetID(id, cl.ID()) + br := startNode(cfg, cl, cl.MemberIDs()) + cl.SetID(br.id, cl.ID()) return &boostrapResult{ - cl: cl, remotes: nil, - w: w, - n: n, - s: s, - id: id, + raft: br, }, nil } @@ -600,15 +588,15 @@ func boostrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backe r := &boostrapResult{} if !cfg.ForceNewCluster { - r.id, r.cl, r.n, r.s, r.w = restartNode(cfg, snapshot) + r.raft = restartNode(cfg, snapshot) } else { - r.id, r.cl, r.n, r.s, r.w = restartAsStandaloneNode(cfg, snapshot) + r.raft = restartAsStandaloneNode(cfg, snapshot) } - r.cl.SetStore(st) - r.cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) - r.cl.Recover(api.UpdateCapability) - if r.cl.Version() != nil && !r.cl.Version().LessThan(semver.Version{Major: 3}) && !beExist { + r.raft.cl.SetStore(st) + r.raft.cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) + r.raft.cl.Recover(api.UpdateCapability) + if r.raft.cl.Version() != nil && !r.raft.cl.Version().LessThan(semver.Version{Major: 3}) && !beExist { bepath := cfg.BackendPath() os.RemoveAll(bepath) return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath) @@ -630,8 +618,8 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } }() - sstats := stats.NewServerStats(cfg.Name, b.id.String()) - lstats := stats.NewLeaderStats(cfg.Logger, b.id.String()) + sstats := stats.NewServerStats(cfg.Name, b.raft.id.String()) + lstats := stats.NewLeaderStats(cfg.Logger, b.raft.id.String()) heartbeat := time.Duration(cfg.TickMs) * time.Millisecond srv = &EtcdServer{ @@ -645,26 +633,26 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { r: *newRaftNode( raftNodeConfig{ lg: cfg.Logger, - isIDRemoved: func(id uint64) bool { return b.cl.IsIDRemoved(types.ID(id)) }, - Node: b.n, + isIDRemoved: func(id uint64) bool { return b.raft.cl.IsIDRemoved(types.ID(id)) }, + Node: b.raft.node, heartbeat: heartbeat, - raftStorage: b.s, - storage: NewStorage(b.w, b.ss), + raftStorage: b.raft.storage, + storage: NewStorage(b.raft.wal, b.ss), }, ), - id: b.id, + id: b.raft.id, attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, - cluster: b.cl, + cluster: b.raft.cl, stats: sstats, lstats: lstats, SyncTicker: time.NewTicker(500 * time.Millisecond), peerRt: b.prt, - reqIDGen: idutil.NewGenerator(uint16(b.id), time.Now()), + reqIDGen: idutil.NewGenerator(uint16(b.raft.id), time.Now()), AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist}, consistIndex: b.ci, firstCommitInTermC: make(chan struct{}), } - serverID.With(prometheus.Labels{"server_id": b.id.String()}).Set(1) + serverID.With(prometheus.Labels{"server_id": b.raft.id.String()}).Set(1) srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) @@ -733,9 +721,9 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { Logger: cfg.Logger, TLSInfo: cfg.PeerTLSInfo, DialTimeout: cfg.PeerDialTimeout(), - ID: b.id, + ID: b.raft.id, URLs: cfg.PeerURLs, - ClusterID: b.cl.ID(), + ClusterID: b.raft.cl.ID(), Raft: srv, Snapshotter: b.ss, ServerStats: sstats, @@ -747,12 +735,12 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } // add all remotes into transport for _, m := range b.remotes { - if m.ID != b.id { + if m.ID != b.raft.id { tr.AddRemote(m.ID, m.PeerURLs) } } - for _, m := range b.cl.Members() { - if m.ID != b.id { + for _, m := range b.raft.cl.Members() { + if m.ID != b.raft.id { tr.AddPeer(m.ID, m.PeerURLs) } }