From a72d4462fee227af1f98d827ef2282c3ace43fed Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 7 Jul 2021 22:40:45 +0200 Subject: [PATCH] etcdserver: Create boostrap wal functions --- server/etcdserver/raft.go | 94 +++++++++++++++--------------------- server/etcdserver/server.go | 20 ++++---- server/etcdserver/storage.go | 61 +++++++++++++++++------ 3 files changed, 94 insertions(+), 81 deletions(-) diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go index 74715de2d..6d443384b 100644 --- a/server/etcdserver/raft.go +++ b/server/etcdserver/raft.go @@ -23,7 +23,6 @@ import ( "sync" "time" - pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/client/pkg/v3/logutil" "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/pkg/v3/contention" @@ -34,7 +33,6 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" - "go.etcd.io/etcd/server/v3/wal" "go.etcd.io/etcd/server/v3/wal/walpb" "go.uber.org/zap" ) @@ -423,29 +421,17 @@ func (r *raftNode) advanceTicks(ticks int) { func boostrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) *boostrapRaft { member := cl.MemberByName(cfg.Name) - metadata := pbutil.MustMarshal( - &pb.Metadata{ - NodeID: uint64(member.ID), - ClusterID: uint64(cl.ID()), - }, - ) - w, err := wal.Create(cfg.Logger, cfg.WALDir(), metadata) - if err != nil { - cfg.Logger.Panic("failed to create WAL", zap.Error(err)) - } - if cfg.UnsafeNoFsync { - w.SetUnsafeNoFsync() - } + id := member.ID + wal := boostrapNewWal(cfg, id, cl.ID()) peers := make([]raft.Peer, len(ids)) for i, id := range ids { var ctx []byte - ctx, err = json.Marshal((*cl).Member(id)) + ctx, err := json.Marshal((*cl).Member(id)) if err != nil { cfg.Logger.Panic("failed to marshal member", zap.Error(err)) } peers[i] = raft.Peer{ID: uint64(id), Context: ctx} } - id := member.ID cfg.Logger.Info( "starting local member", zap.String("local-member-id", id.String()), @@ -456,12 +442,11 @@ func boostrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster return &boostrapRaft{ lg: cfg.Logger, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, - id: id, cl: cl, - config: raftConfig(cfg, uint64(id), s), + config: raftConfig(cfg, uint64(wal.id), s), peers: peers, storage: s, - wal: w, + wal: wal, } } @@ -470,30 +455,29 @@ func boostrapRaftFromWal(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bo if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term } - w, id, cid, st, ents := readWAL(cfg.Logger, cfg.WALDir(), walsnap, cfg.UnsafeNoFsync) + wal := boostrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), walsnap, cfg.UnsafeNoFsync) cfg.Logger.Info( "restarting local member", - zap.String("cluster-id", cid.String()), - zap.String("local-member-id", id.String()), - zap.Uint64("commit-index", st.Commit), + zap.String("cluster-id", wal.cid.String()), + zap.String("local-member-id", wal.id.String()), + zap.Uint64("commit-index", wal.st.Commit), ) cl := membership.NewCluster(cfg.Logger) - cl.SetID(id, cid) + cl.SetID(wal.id, wal.cid) s := raft.NewMemoryStorage() if snapshot != nil { s.ApplySnapshot(*snapshot) } - s.SetHardState(st) - s.Append(ents) + s.SetHardState(*wal.st) + s.Append(wal.ents) return &boostrapRaft{ lg: cfg.Logger, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, - id: id, cl: cl, - config: raftConfig(cfg, uint64(id), s), + config: raftConfig(cfg, uint64(wal.id), s), storage: s, - wal: w, + wal: wal, } } @@ -502,18 +486,18 @@ func boostrapRaftFromWalStandalone(cfg config.ServerConfig, snapshot *raftpb.Sna if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term } - w, id, cid, st, ents := readWAL(cfg.Logger, cfg.WALDir(), walsnap, cfg.UnsafeNoFsync) + wal := boostrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), walsnap, cfg.UnsafeNoFsync) // discard the previously uncommitted entries - for i, ent := range ents { - if ent.Index > st.Commit { + for i, ent := range wal.ents { + if ent.Index > wal.st.Commit { cfg.Logger.Info( "discarding uncommitted WAL entries", zap.Uint64("entry-index", ent.Index), - zap.Uint64("commit-index-from-wal", st.Commit), - zap.Int("number-of-discarded-entries", len(ents)-i), + zap.Uint64("commit-index-from-wal", wal.st.Commit), + zap.Int("number-of-discarded-entries", len(wal.ents)-i), ) - ents = ents[:i] + wal.ents = wal.ents[:i] break } } @@ -521,45 +505,44 @@ func boostrapRaftFromWalStandalone(cfg config.ServerConfig, snapshot *raftpb.Sna // force append the configuration change entries toAppEnts := createConfigChangeEnts( cfg.Logger, - getIDs(cfg.Logger, snapshot, ents), - uint64(id), - st.Term, - st.Commit, + getIDs(cfg.Logger, snapshot, wal.ents), + uint64(wal.id), + wal.st.Term, + wal.st.Commit, ) - ents = append(ents, toAppEnts...) + wal.ents = append(wal.ents, toAppEnts...) // force commit newly appended entries - err := w.Save(raftpb.HardState{}, toAppEnts) + err := wal.w.Save(raftpb.HardState{}, toAppEnts) if err != nil { cfg.Logger.Fatal("failed to save hard state and entries", zap.Error(err)) } - if len(ents) != 0 { - st.Commit = ents[len(ents)-1].Index + if len(wal.ents) != 0 { + wal.st.Commit = wal.ents[len(wal.ents)-1].Index } cfg.Logger.Info( "forcing restart member", - zap.String("cluster-id", cid.String()), - zap.String("local-member-id", id.String()), - zap.Uint64("commit-index", st.Commit), + zap.String("cluster-id", wal.cid.String()), + zap.String("local-member-id", wal.id.String()), + zap.Uint64("commit-index", wal.st.Commit), ) cl := membership.NewCluster(cfg.Logger) - cl.SetID(id, cid) + cl.SetID(wal.id, wal.cid) s := raft.NewMemoryStorage() if snapshot != nil { s.ApplySnapshot(*snapshot) } - s.SetHardState(st) - s.Append(ents) + s.SetHardState(*wal.st) + s.Append(wal.ents) return &boostrapRaft{ lg: cfg.Logger, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, - id: id, cl: cl, - config: raftConfig(cfg, uint64(id), s), + config: raftConfig(cfg, uint64(wal.id), s), storage: s, - wal: w, + wal: wal, } } @@ -583,10 +566,9 @@ type boostrapRaft struct { peers []raft.Peer config *raft.Config - id types.ID cl *membership.RaftCluster storage *raft.MemoryStorage - wal *wal.WAL + wal *boostrappedWAL } func (b *boostrapRaft) newRaftNode(ss *snap.Snapshotter) *raftNode { @@ -606,7 +588,7 @@ func (b *boostrapRaft) newRaftNode(ss *snap.Snapshotter) *raftNode { Node: n, heartbeat: b.heartbeat, raftStorage: b.storage, - storage: NewStorage(b.wal, ss), + storage: NewStorage(b.wal.w, ss), }, ) } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index aebe8554a..5c56ebe24 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -466,7 +466,7 @@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe cl.SetStore(st) cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) br := boostrapRaftFromCluster(cfg, cl, nil) - cl.SetID(br.id, existingCluster.ID()) + cl.SetID(br.wal.id, existingCluster.ID()) return &boostrapResult{ raft: br, remotes: remotes, @@ -506,7 +506,7 @@ func boostrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st cl.SetStore(st) cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) br := boostrapRaftFromCluster(cfg, cl, cl.MemberIDs()) - cl.SetID(br.id, cl.ID()) + cl.SetID(br.wal.id, cl.ID()) return &boostrapResult{ remotes: nil, raft: br, @@ -618,8 +618,8 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } }() - sstats := stats.NewServerStats(cfg.Name, b.raft.id.String()) - lstats := stats.NewLeaderStats(cfg.Logger, b.raft.id.String()) + sstats := stats.NewServerStats(cfg.Name, b.raft.wal.id.String()) + lstats := stats.NewLeaderStats(cfg.Logger, b.raft.wal.id.String()) heartbeat := time.Duration(cfg.TickMs) * time.Millisecond srv = &EtcdServer{ @@ -631,19 +631,19 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { v2store: b.st, snapshotter: b.ss, r: *b.raft.newRaftNode(b.ss), - id: b.raft.id, + id: b.raft.wal.id, attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, cluster: b.raft.cl, stats: sstats, lstats: lstats, SyncTicker: time.NewTicker(500 * time.Millisecond), peerRt: b.prt, - reqIDGen: idutil.NewGenerator(uint16(b.raft.id), time.Now()), + reqIDGen: idutil.NewGenerator(uint16(b.raft.wal.id), time.Now()), AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist}, consistIndex: b.ci, firstCommitInTermC: make(chan struct{}), } - serverID.With(prometheus.Labels{"server_id": b.raft.id.String()}).Set(1) + serverID.With(prometheus.Labels{"server_id": b.raft.wal.id.String()}).Set(1) srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) @@ -712,7 +712,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { Logger: cfg.Logger, TLSInfo: cfg.PeerTLSInfo, DialTimeout: cfg.PeerDialTimeout(), - ID: b.raft.id, + ID: b.raft.wal.id, URLs: cfg.PeerURLs, ClusterID: b.raft.cl.ID(), Raft: srv, @@ -726,12 +726,12 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } // add all remotes into transport for _, m := range b.remotes { - if m.ID != b.raft.id { + if m.ID != b.raft.wal.id { tr.AddRemote(m.ID, m.PeerURLs) } } for _, m := range b.raft.cl.Members() { - if m.ID != b.raft.id { + if m.ID != b.raft.wal.id { tr.AddPeer(m.ID, m.PeerURLs) } } diff --git a/server/etcdserver/storage.go b/server/etcdserver/storage.go index e662537d3..df516cb16 100644 --- a/server/etcdserver/storage.go +++ b/server/etcdserver/storage.go @@ -21,6 +21,7 @@ import ( "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/raft/v3/raftpb" + "go.etcd.io/etcd/server/v3/config" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" "go.etcd.io/etcd/server/v3/wal" "go.etcd.io/etcd/server/v3/wal/walpb" @@ -80,24 +81,21 @@ func (st *storage) Release(snap raftpb.Snapshot) error { return st.Snapshotter.ReleaseSnapDBs(snap) } -// readWAL reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear +// boostrapWALFromSnapshot reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear // after the position of the given snap in the WAL. // The snap must have been previously saved to the WAL, or this call will panic. -func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot, unsafeNoFsync bool) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) { - var ( - err error - wmetadata []byte - ) - +func boostrapWALFromSnapshot(lg *zap.Logger, waldir string, snap walpb.Snapshot, unsafeNoFsync bool) *boostrappedWAL { repaired := false for { - if w, err = wal.Open(lg, waldir, snap); err != nil { + w, err := wal.Open(lg, waldir, snap) + if err != nil { lg.Fatal("failed to open WAL", zap.Error(err)) } if unsafeNoFsync { w.SetUnsafeNoFsync() } - if wmetadata, st, ents, err = w.ReadAll(); err != nil { + wmetadata, st, ents, err := w.ReadAll() + if err != nil { w.Close() // we can only repair ErrUnexpectedEOF and we never repair twice. if repaired || err != io.ErrUnexpectedEOF { @@ -111,11 +109,44 @@ func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot, unsafeNoFsync b } continue } - break + var metadata pb.Metadata + pbutil.MustUnmarshal(&metadata, wmetadata) + id := types.ID(metadata.NodeID) + cid := types.ID(metadata.ClusterID) + return &boostrappedWAL{ + w: w, + id: id, + cid: cid, + st: &st, + ents: ents, + } } - var metadata pb.Metadata - pbutil.MustUnmarshal(&metadata, wmetadata) - id = types.ID(metadata.NodeID) - cid = types.ID(metadata.ClusterID) - return w, id, cid, st, ents +} + +func boostrapNewWal(cfg config.ServerConfig, nodeID, clusterID types.ID) *boostrappedWAL { + metadata := pbutil.MustMarshal( + &pb.Metadata{ + NodeID: uint64(nodeID), + ClusterID: uint64(clusterID), + }, + ) + w, err := wal.Create(cfg.Logger, cfg.WALDir(), metadata) + if err != nil { + cfg.Logger.Panic("failed to create WAL", zap.Error(err)) + } + if cfg.UnsafeNoFsync { + w.SetUnsafeNoFsync() + } + return &boostrappedWAL{ + w: w, + id: nodeID, + cid: clusterID, + } +} + +type boostrappedWAL struct { + w *wal.WAL + id, cid types.ID + st *raftpb.HardState + ents []raftpb.Entry }