etcdserver: Create boostrap wal functions

This commit is contained in:
Marek Siarkowicz 2021-07-07 22:40:45 +02:00
parent e75dfde4cb
commit a72d4462fe
3 changed files with 94 additions and 81 deletions

View File

@ -23,7 +23,6 @@ import (
"sync" "sync"
"time" "time"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/logutil" "go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/pkg/v3/contention" "go.etcd.io/etcd/pkg/v3/contention"
@ -34,7 +33,6 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap" "go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/wal"
"go.etcd.io/etcd/server/v3/wal/walpb" "go.etcd.io/etcd/server/v3/wal/walpb"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -423,29 +421,17 @@ func (r *raftNode) advanceTicks(ticks int) {
func boostrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) *boostrapRaft { func boostrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) *boostrapRaft {
member := cl.MemberByName(cfg.Name) member := cl.MemberByName(cfg.Name)
metadata := pbutil.MustMarshal( id := member.ID
&pb.Metadata{ wal := boostrapNewWal(cfg, id, cl.ID())
NodeID: uint64(member.ID),
ClusterID: uint64(cl.ID()),
},
)
w, err := wal.Create(cfg.Logger, cfg.WALDir(), metadata)
if err != nil {
cfg.Logger.Panic("failed to create WAL", zap.Error(err))
}
if cfg.UnsafeNoFsync {
w.SetUnsafeNoFsync()
}
peers := make([]raft.Peer, len(ids)) peers := make([]raft.Peer, len(ids))
for i, id := range ids { for i, id := range ids {
var ctx []byte var ctx []byte
ctx, err = json.Marshal((*cl).Member(id)) ctx, err := json.Marshal((*cl).Member(id))
if err != nil { if err != nil {
cfg.Logger.Panic("failed to marshal member", zap.Error(err)) cfg.Logger.Panic("failed to marshal member", zap.Error(err))
} }
peers[i] = raft.Peer{ID: uint64(id), Context: ctx} peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
} }
id := member.ID
cfg.Logger.Info( cfg.Logger.Info(
"starting local member", "starting local member",
zap.String("local-member-id", id.String()), zap.String("local-member-id", id.String()),
@ -456,12 +442,11 @@ func boostrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster
return &boostrapRaft{ return &boostrapRaft{
lg: cfg.Logger, lg: cfg.Logger,
heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond,
id: id,
cl: cl, cl: cl,
config: raftConfig(cfg, uint64(id), s), config: raftConfig(cfg, uint64(wal.id), s),
peers: peers, peers: peers,
storage: s, storage: s,
wal: w, wal: wal,
} }
} }
@ -470,30 +455,29 @@ func boostrapRaftFromWal(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bo
if snapshot != nil { if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
} }
w, id, cid, st, ents := readWAL(cfg.Logger, cfg.WALDir(), walsnap, cfg.UnsafeNoFsync) wal := boostrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), walsnap, cfg.UnsafeNoFsync)
cfg.Logger.Info( cfg.Logger.Info(
"restarting local member", "restarting local member",
zap.String("cluster-id", cid.String()), zap.String("cluster-id", wal.cid.String()),
zap.String("local-member-id", id.String()), zap.String("local-member-id", wal.id.String()),
zap.Uint64("commit-index", st.Commit), zap.Uint64("commit-index", wal.st.Commit),
) )
cl := membership.NewCluster(cfg.Logger) cl := membership.NewCluster(cfg.Logger)
cl.SetID(id, cid) cl.SetID(wal.id, wal.cid)
s := raft.NewMemoryStorage() s := raft.NewMemoryStorage()
if snapshot != nil { if snapshot != nil {
s.ApplySnapshot(*snapshot) s.ApplySnapshot(*snapshot)
} }
s.SetHardState(st) s.SetHardState(*wal.st)
s.Append(ents) s.Append(wal.ents)
return &boostrapRaft{ return &boostrapRaft{
lg: cfg.Logger, lg: cfg.Logger,
heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond,
id: id,
cl: cl, cl: cl,
config: raftConfig(cfg, uint64(id), s), config: raftConfig(cfg, uint64(wal.id), s),
storage: s, storage: s,
wal: w, wal: wal,
} }
} }
@ -502,18 +486,18 @@ func boostrapRaftFromWalStandalone(cfg config.ServerConfig, snapshot *raftpb.Sna
if snapshot != nil { if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
} }
w, id, cid, st, ents := readWAL(cfg.Logger, cfg.WALDir(), walsnap, cfg.UnsafeNoFsync) wal := boostrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), walsnap, cfg.UnsafeNoFsync)
// discard the previously uncommitted entries // discard the previously uncommitted entries
for i, ent := range ents { for i, ent := range wal.ents {
if ent.Index > st.Commit { if ent.Index > wal.st.Commit {
cfg.Logger.Info( cfg.Logger.Info(
"discarding uncommitted WAL entries", "discarding uncommitted WAL entries",
zap.Uint64("entry-index", ent.Index), zap.Uint64("entry-index", ent.Index),
zap.Uint64("commit-index-from-wal", st.Commit), zap.Uint64("commit-index-from-wal", wal.st.Commit),
zap.Int("number-of-discarded-entries", len(ents)-i), zap.Int("number-of-discarded-entries", len(wal.ents)-i),
) )
ents = ents[:i] wal.ents = wal.ents[:i]
break break
} }
} }
@ -521,45 +505,44 @@ func boostrapRaftFromWalStandalone(cfg config.ServerConfig, snapshot *raftpb.Sna
// force append the configuration change entries // force append the configuration change entries
toAppEnts := createConfigChangeEnts( toAppEnts := createConfigChangeEnts(
cfg.Logger, cfg.Logger,
getIDs(cfg.Logger, snapshot, ents), getIDs(cfg.Logger, snapshot, wal.ents),
uint64(id), uint64(wal.id),
st.Term, wal.st.Term,
st.Commit, wal.st.Commit,
) )
ents = append(ents, toAppEnts...) wal.ents = append(wal.ents, toAppEnts...)
// force commit newly appended entries // force commit newly appended entries
err := w.Save(raftpb.HardState{}, toAppEnts) err := wal.w.Save(raftpb.HardState{}, toAppEnts)
if err != nil { if err != nil {
cfg.Logger.Fatal("failed to save hard state and entries", zap.Error(err)) cfg.Logger.Fatal("failed to save hard state and entries", zap.Error(err))
} }
if len(ents) != 0 { if len(wal.ents) != 0 {
st.Commit = ents[len(ents)-1].Index wal.st.Commit = wal.ents[len(wal.ents)-1].Index
} }
cfg.Logger.Info( cfg.Logger.Info(
"forcing restart member", "forcing restart member",
zap.String("cluster-id", cid.String()), zap.String("cluster-id", wal.cid.String()),
zap.String("local-member-id", id.String()), zap.String("local-member-id", wal.id.String()),
zap.Uint64("commit-index", st.Commit), zap.Uint64("commit-index", wal.st.Commit),
) )
cl := membership.NewCluster(cfg.Logger) cl := membership.NewCluster(cfg.Logger)
cl.SetID(id, cid) cl.SetID(wal.id, wal.cid)
s := raft.NewMemoryStorage() s := raft.NewMemoryStorage()
if snapshot != nil { if snapshot != nil {
s.ApplySnapshot(*snapshot) s.ApplySnapshot(*snapshot)
} }
s.SetHardState(st) s.SetHardState(*wal.st)
s.Append(ents) s.Append(wal.ents)
return &boostrapRaft{ return &boostrapRaft{
lg: cfg.Logger, lg: cfg.Logger,
heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond,
id: id,
cl: cl, cl: cl,
config: raftConfig(cfg, uint64(id), s), config: raftConfig(cfg, uint64(wal.id), s),
storage: s, storage: s,
wal: w, wal: wal,
} }
} }
@ -583,10 +566,9 @@ type boostrapRaft struct {
peers []raft.Peer peers []raft.Peer
config *raft.Config config *raft.Config
id types.ID
cl *membership.RaftCluster cl *membership.RaftCluster
storage *raft.MemoryStorage storage *raft.MemoryStorage
wal *wal.WAL wal *boostrappedWAL
} }
func (b *boostrapRaft) newRaftNode(ss *snap.Snapshotter) *raftNode { func (b *boostrapRaft) newRaftNode(ss *snap.Snapshotter) *raftNode {
@ -606,7 +588,7 @@ func (b *boostrapRaft) newRaftNode(ss *snap.Snapshotter) *raftNode {
Node: n, Node: n,
heartbeat: b.heartbeat, heartbeat: b.heartbeat,
raftStorage: b.storage, raftStorage: b.storage,
storage: NewStorage(b.wal, ss), storage: NewStorage(b.wal.w, ss),
}, },
) )
} }

View File

@ -466,7 +466,7 @@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe
cl.SetStore(st) cl.SetStore(st)
cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
br := boostrapRaftFromCluster(cfg, cl, nil) br := boostrapRaftFromCluster(cfg, cl, nil)
cl.SetID(br.id, existingCluster.ID()) cl.SetID(br.wal.id, existingCluster.ID())
return &boostrapResult{ return &boostrapResult{
raft: br, raft: br,
remotes: remotes, remotes: remotes,
@ -506,7 +506,7 @@ func boostrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st
cl.SetStore(st) cl.SetStore(st)
cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
br := boostrapRaftFromCluster(cfg, cl, cl.MemberIDs()) br := boostrapRaftFromCluster(cfg, cl, cl.MemberIDs())
cl.SetID(br.id, cl.ID()) cl.SetID(br.wal.id, cl.ID())
return &boostrapResult{ return &boostrapResult{
remotes: nil, remotes: nil,
raft: br, raft: br,
@ -618,8 +618,8 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
} }
}() }()
sstats := stats.NewServerStats(cfg.Name, b.raft.id.String()) sstats := stats.NewServerStats(cfg.Name, b.raft.wal.id.String())
lstats := stats.NewLeaderStats(cfg.Logger, b.raft.id.String()) lstats := stats.NewLeaderStats(cfg.Logger, b.raft.wal.id.String())
heartbeat := time.Duration(cfg.TickMs) * time.Millisecond heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
srv = &EtcdServer{ srv = &EtcdServer{
@ -631,19 +631,19 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
v2store: b.st, v2store: b.st,
snapshotter: b.ss, snapshotter: b.ss,
r: *b.raft.newRaftNode(b.ss), r: *b.raft.newRaftNode(b.ss),
id: b.raft.id, id: b.raft.wal.id,
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
cluster: b.raft.cl, cluster: b.raft.cl,
stats: sstats, stats: sstats,
lstats: lstats, lstats: lstats,
SyncTicker: time.NewTicker(500 * time.Millisecond), SyncTicker: time.NewTicker(500 * time.Millisecond),
peerRt: b.prt, peerRt: b.prt,
reqIDGen: idutil.NewGenerator(uint16(b.raft.id), time.Now()), reqIDGen: idutil.NewGenerator(uint16(b.raft.wal.id), time.Now()),
AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist}, AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
consistIndex: b.ci, consistIndex: b.ci,
firstCommitInTermC: make(chan struct{}), firstCommitInTermC: make(chan struct{}),
} }
serverID.With(prometheus.Labels{"server_id": b.raft.id.String()}).Set(1) serverID.With(prometheus.Labels{"server_id": b.raft.wal.id.String()}).Set(1)
srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)
@ -712,7 +712,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
Logger: cfg.Logger, Logger: cfg.Logger,
TLSInfo: cfg.PeerTLSInfo, TLSInfo: cfg.PeerTLSInfo,
DialTimeout: cfg.PeerDialTimeout(), DialTimeout: cfg.PeerDialTimeout(),
ID: b.raft.id, ID: b.raft.wal.id,
URLs: cfg.PeerURLs, URLs: cfg.PeerURLs,
ClusterID: b.raft.cl.ID(), ClusterID: b.raft.cl.ID(),
Raft: srv, Raft: srv,
@ -726,12 +726,12 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
} }
// add all remotes into transport // add all remotes into transport
for _, m := range b.remotes { for _, m := range b.remotes {
if m.ID != b.raft.id { if m.ID != b.raft.wal.id {
tr.AddRemote(m.ID, m.PeerURLs) tr.AddRemote(m.ID, m.PeerURLs)
} }
} }
for _, m := range b.raft.cl.Members() { for _, m := range b.raft.cl.Members() {
if m.ID != b.raft.id { if m.ID != b.raft.wal.id {
tr.AddPeer(m.ID, m.PeerURLs) tr.AddPeer(m.ID, m.PeerURLs)
} }
} }

View File

@ -21,6 +21,7 @@ import (
"go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap" "go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/wal" "go.etcd.io/etcd/server/v3/wal"
"go.etcd.io/etcd/server/v3/wal/walpb" "go.etcd.io/etcd/server/v3/wal/walpb"
@ -80,24 +81,21 @@ func (st *storage) Release(snap raftpb.Snapshot) error {
return st.Snapshotter.ReleaseSnapDBs(snap) return st.Snapshotter.ReleaseSnapDBs(snap)
} }
// readWAL reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear // boostrapWALFromSnapshot reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear
// after the position of the given snap in the WAL. // after the position of the given snap in the WAL.
// The snap must have been previously saved to the WAL, or this call will panic. // The snap must have been previously saved to the WAL, or this call will panic.
func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot, unsafeNoFsync bool) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) { func boostrapWALFromSnapshot(lg *zap.Logger, waldir string, snap walpb.Snapshot, unsafeNoFsync bool) *boostrappedWAL {
var (
err error
wmetadata []byte
)
repaired := false repaired := false
for { for {
if w, err = wal.Open(lg, waldir, snap); err != nil { w, err := wal.Open(lg, waldir, snap)
if err != nil {
lg.Fatal("failed to open WAL", zap.Error(err)) lg.Fatal("failed to open WAL", zap.Error(err))
} }
if unsafeNoFsync { if unsafeNoFsync {
w.SetUnsafeNoFsync() w.SetUnsafeNoFsync()
} }
if wmetadata, st, ents, err = w.ReadAll(); err != nil { wmetadata, st, ents, err := w.ReadAll()
if err != nil {
w.Close() w.Close()
// we can only repair ErrUnexpectedEOF and we never repair twice. // we can only repair ErrUnexpectedEOF and we never repair twice.
if repaired || err != io.ErrUnexpectedEOF { if repaired || err != io.ErrUnexpectedEOF {
@ -111,11 +109,44 @@ func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot, unsafeNoFsync b
} }
continue continue
} }
break var metadata pb.Metadata
pbutil.MustUnmarshal(&metadata, wmetadata)
id := types.ID(metadata.NodeID)
cid := types.ID(metadata.ClusterID)
return &boostrappedWAL{
w: w,
id: id,
cid: cid,
st: &st,
ents: ents,
}
} }
var metadata pb.Metadata }
pbutil.MustUnmarshal(&metadata, wmetadata)
id = types.ID(metadata.NodeID) func boostrapNewWal(cfg config.ServerConfig, nodeID, clusterID types.ID) *boostrappedWAL {
cid = types.ID(metadata.ClusterID) metadata := pbutil.MustMarshal(
return w, id, cid, st, ents &pb.Metadata{
NodeID: uint64(nodeID),
ClusterID: uint64(clusterID),
},
)
w, err := wal.Create(cfg.Logger, cfg.WALDir(), metadata)
if err != nil {
cfg.Logger.Panic("failed to create WAL", zap.Error(err))
}
if cfg.UnsafeNoFsync {
w.SetUnsafeNoFsync()
}
return &boostrappedWAL{
w: w,
id: nodeID,
cid: clusterID,
}
}
// boostrappedWAL bundles a WAL handle with the node/cluster identity and the
// raft state that was obtained while bootstrapping it.
type boostrappedWAL struct {
// w is the underlying WAL, as returned by wal.Create (new WAL) or wal.Open
// (existing WAL).
w *wal.WAL
// id is the node ID and cid the cluster ID. For an existing WAL they are
// decoded from the WAL metadata; for a new WAL they are the values passed
// to boostrapNewWal.
id, cid types.ID
// st points at the HardState returned by WAL.ReadAll. boostrapNewWal does
// not set it, so it is nil for a freshly created WAL.
st *raftpb.HardState
// ents holds the entries returned by WAL.ReadAll. boostrapNewWal does not
// set it, so it is nil for a freshly created WAL.
ents []raftpb.Entry
}