server: Move wal bootstrap up the hierarchy

This commit is contained in:
Marek Siarkowicz 2021-07-21 11:51:18 +02:00
parent aa0c050003
commit db06a4ab28
2 changed files with 52 additions and 70 deletions

View File

@ -101,6 +101,7 @@ type bootstrappedStorage struct {
type bootstrapedCluster struct { type bootstrapedCluster struct {
raft *bootstrappedRaft raft *bootstrappedRaft
remotes []*membership.Member remotes []*membership.Member
wal *bootstrappedWAL
} }
func bootstrapStorage(cfg config.ServerConfig, ss *snap.Snapshotter, prt http.RoundTripper) (b *bootstrappedStorage, err error) { func bootstrapStorage(cfg config.ServerConfig, ss *snap.Snapshotter, prt http.RoundTripper) (b *bootstrappedStorage, err error) {
@ -123,7 +124,7 @@ func bootstrapStorage(cfg config.ServerConfig, ss *snap.Snapshotter, prt http.Ro
case !haveWAL && cfg.NewCluster: case !haveWAL && cfg.NewCluster:
c, err = bootstrapNewClusterNoWAL(cfg, prt, st, be) c, err = bootstrapNewClusterNoWAL(cfg, prt, st, be)
case haveWAL: case haveWAL:
c, err = bootstrapWithWAL(cfg, st, be, ss, beExist, beHooks, ci) c, err = bootstrapClusterWithWAL(cfg, st, be, ss, beExist, beHooks, ci)
default: default:
be.Close() be.Close()
return nil, fmt.Errorf("unsupported bootstrap config") return nil, fmt.Errorf("unsupported bootstrap config")
@ -234,11 +235,13 @@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe
cl.SetID(types.ID(0), existingCluster.ID()) cl.SetID(types.ID(0), existingCluster.ID())
cl.SetStore(st) cl.SetStore(st)
cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be)) cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be))
br := bootstrapRaftFromCluster(cfg, cl, nil) bwal := bootstrapNewWAL(cfg, cl.MemberByName(cfg.Name).ID, cl.ID())
cl.SetID(br.wal.id, existingCluster.ID()) br := bootstrapRaftFromCluster(cfg, cl, nil, bwal)
cl.SetID(bwal.id, existingCluster.ID())
return &bootstrapedCluster{ return &bootstrapedCluster{
raft: br, raft: br,
remotes: remotes, remotes: remotes,
wal: bwal,
}, nil }, nil
} }
@ -274,15 +277,17 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st
} }
cl.SetStore(st) cl.SetStore(st)
cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be)) cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be))
br := bootstrapRaftFromCluster(cfg, cl, cl.MemberIDs()) bwal := bootstrapNewWAL(cfg, cl.MemberByName(cfg.Name).ID, cl.ID())
cl.SetID(br.wal.id, cl.ID()) br := bootstrapRaftFromCluster(cfg, cl, cl.MemberIDs(), bwal)
cl.SetID(bwal.id, cl.ID())
return &bootstrapedCluster{ return &bootstrapedCluster{
remotes: nil, remotes: nil,
raft: br, raft: br,
wal: bwal,
}, nil }, nil
} }
func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backend, ss *snap.Snapshotter, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer) (*bootstrapedCluster, error) { func bootstrapClusterWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backend, ss *snap.Snapshotter, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer) (*bootstrapedCluster, error) {
if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil { if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
return nil, fmt.Errorf("cannot write to member directory: %v", err) return nil, fmt.Errorf("cannot write to member directory: %v", err)
} }
@ -294,7 +299,7 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back
if cfg.ShouldDiscover() { if cfg.ShouldDiscover() {
cfg.Logger.Warn( cfg.Logger.Warn(
"discovery token is ignored since cluster already initialized; valid logs are found", "discovery token is ignored since cluster already initialized; valid logs are found",
zap.String("wal-dir", cfg.WALDir()), zap.String("bwal-dir", cfg.WALDir()),
) )
} }
@ -304,7 +309,7 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back
return nil, err return nil, err
} }
// snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding // snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding
// wal log entries // bwal log entries
snapshot, err := ss.LoadNewestAvailable(walSnaps) snapshot, err := ss.LoadNewestAvailable(walSnaps)
if err != nil && err != snap.ErrNoSnapshot { if err != nil && err != snap.ErrNoSnapshot {
return nil, err return nil, err
@ -355,12 +360,31 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back
cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!") cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!")
} }
b := &bootstrapedCluster{} bwal := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync)
if !cfg.ForceNewCluster { b := &bootstrapedCluster{
b.raft = bootstrapRaftFromWal(cfg, snapshot) wal: bwal,
} else {
b.raft = bootstrapRaftFromWalStandalone(cfg, snapshot)
} }
if cfg.ForceNewCluster {
// discard the previously uncommitted entries
bwal.ents = bwal.CommitedEntries()
entries := bwal.ConfigChangeEntries()
// force commit config change entries
bwal.AppendAndCommitEntries(entries)
cfg.Logger.Info(
"forcing restart member",
zap.String("cluster-id", bwal.cid.String()),
zap.String("local-member-id", bwal.id.String()),
zap.Uint64("commit-index", bwal.st.Commit),
)
} else {
cfg.Logger.Info(
"restarting local member",
zap.String("cluster-id", bwal.cid.String()),
zap.String("local-member-id", bwal.id.String()),
zap.Uint64("commit-index", bwal.st.Commit),
)
}
b.raft = bootstrapRaftFromWal(cfg, bwal)
b.raft.cl.SetStore(st) b.raft.cl.SetStore(st)
b.raft.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be)) b.raft.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be))
@ -373,10 +397,8 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back
return b, nil return b, nil
} }
func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) *bootstrappedRaft { func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID, bwal *bootstrappedWAL) *bootstrappedRaft {
member := cl.MemberByName(cfg.Name) member := cl.MemberByName(cfg.Name)
id := member.ID
bwal := bootstrapNewWAL(cfg, id, cl.ID())
peers := make([]raft.Peer, len(ids)) peers := make([]raft.Peer, len(ids))
for i, id := range ids { for i, id := range ids {
var ctx []byte var ctx []byte
@ -388,7 +410,7 @@ func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluste
} }
cfg.Logger.Info( cfg.Logger.Info(
"starting local member", "starting local member",
zap.String("local-member-id", id.String()), zap.String("local-member-id", member.ID.String()),
zap.String("cluster-id", cl.ID().String()), zap.String("cluster-id", cl.ID().String()),
) )
s := bwal.MemoryStorage() s := bwal.MemoryStorage()
@ -396,22 +418,13 @@ func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluste
lg: cfg.Logger, lg: cfg.Logger,
heartbeat: time.Duration(cfg.TickMs) * time.Millisecond, heartbeat: time.Duration(cfg.TickMs) * time.Millisecond,
cl: cl, cl: cl,
config: raftConfig(cfg, uint64(bwal.id), s), config: raftConfig(cfg, uint64(member.ID), s),
peers: peers, peers: peers,
storage: s, storage: s,
wal: bwal,
} }
} }
func bootstrapRaftFromWal(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bootstrappedRaft { func bootstrapRaftFromWal(cfg config.ServerConfig, bwal *bootstrappedWAL) *bootstrappedRaft {
bwal := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync)
cfg.Logger.Info(
"restarting local member",
zap.String("cluster-id", bwal.cid.String()),
zap.String("local-member-id", bwal.id.String()),
zap.Uint64("commit-index", bwal.st.Commit),
)
cl := membership.NewCluster(cfg.Logger) cl := membership.NewCluster(cfg.Logger)
cl.SetID(bwal.id, bwal.cid) cl.SetID(bwal.id, bwal.cid)
s := bwal.MemoryStorage() s := bwal.MemoryStorage()
@ -421,36 +434,6 @@ func bootstrapRaftFromWal(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *b
cl: cl, cl: cl,
config: raftConfig(cfg, uint64(bwal.id), s), config: raftConfig(cfg, uint64(bwal.id), s),
storage: s, storage: s,
wal: bwal,
}
}
func bootstrapRaftFromWalStandalone(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bootstrappedRaft {
bwal := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync)
// discard the previously uncommitted entries
bwal.ents = bwal.CommitedEntries()
entries := bwal.ConfigChangeEntries()
// force commit config change entries
bwal.AppendAndCommitEntries(entries)
cfg.Logger.Info(
"forcing restart member",
zap.String("cluster-id", bwal.cid.String()),
zap.String("local-member-id", bwal.id.String()),
zap.Uint64("commit-index", bwal.st.Commit),
)
cl := membership.NewCluster(cfg.Logger)
cl.SetID(bwal.id, bwal.cid)
s := bwal.MemoryStorage()
return &bootstrappedRaft{
lg: cfg.Logger,
heartbeat: time.Duration(cfg.TickMs) * time.Millisecond,
cl: cl,
config: raftConfig(cfg, uint64(bwal.id), s),
storage: s,
wal: bwal,
} }
} }
@ -476,10 +459,9 @@ type bootstrappedRaft struct {
config *raft.Config config *raft.Config
cl *membership.RaftCluster cl *membership.RaftCluster
storage *raft.MemoryStorage storage *raft.MemoryStorage
wal *bootstrappedWAL
} }
func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter) *raftNode { func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter, wal *wal.WAL) *raftNode {
var n raft.Node var n raft.Node
if len(b.peers) == 0 { if len(b.peers) == 0 {
n = raft.RestartNode(b.config) n = raft.RestartNode(b.config)
@ -496,7 +478,7 @@ func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter) *raftNode {
Node: n, Node: n,
heartbeat: b.heartbeat, heartbeat: b.heartbeat,
raftStorage: b.storage, raftStorage: b.storage,
storage: NewStorage(b.wal.w, ss), storage: NewStorage(wal, ss),
}, },
) )
} }

View File

@ -308,8 +308,8 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
} }
}() }()
sstats := stats.NewServerStats(cfg.Name, b.storage.cluster.raft.wal.id.String()) sstats := stats.NewServerStats(cfg.Name, b.storage.cluster.wal.id.String())
lstats := stats.NewLeaderStats(cfg.Logger, b.storage.cluster.raft.wal.id.String()) lstats := stats.NewLeaderStats(cfg.Logger, b.storage.cluster.wal.id.String())
heartbeat := time.Duration(cfg.TickMs) * time.Millisecond heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
srv = &EtcdServer{ srv = &EtcdServer{
@ -320,21 +320,21 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
errorc: make(chan error, 1), errorc: make(chan error, 1),
v2store: b.storage.st, v2store: b.storage.st,
snapshotter: b.ss, snapshotter: b.ss,
r: *b.storage.cluster.raft.newRaftNode(b.ss), r: *b.storage.cluster.raft.newRaftNode(b.ss, b.storage.cluster.wal.w),
id: b.storage.cluster.raft.wal.id, id: b.storage.cluster.wal.id,
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
cluster: b.storage.cluster.raft.cl, cluster: b.storage.cluster.raft.cl,
stats: sstats, stats: sstats,
lstats: lstats, lstats: lstats,
SyncTicker: time.NewTicker(500 * time.Millisecond), SyncTicker: time.NewTicker(500 * time.Millisecond),
peerRt: b.prt, peerRt: b.prt,
reqIDGen: idutil.NewGenerator(uint16(b.storage.cluster.raft.wal.id), time.Now()), reqIDGen: idutil.NewGenerator(uint16(b.storage.cluster.wal.id), time.Now()),
AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist}, AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
consistIndex: b.storage.ci, consistIndex: b.storage.ci,
firstCommitInTerm: notify.NewNotifier(), firstCommitInTerm: notify.NewNotifier(),
clusterVersionChanged: notify.NewNotifier(), clusterVersionChanged: notify.NewNotifier(),
} }
serverID.With(prometheus.Labels{"server_id": b.storage.cluster.raft.wal.id.String()}).Set(1) serverID.With(prometheus.Labels{"server_id": b.storage.cluster.wal.id.String()}).Set(1)
srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged) srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged)
srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)
@ -403,7 +403,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
Logger: cfg.Logger, Logger: cfg.Logger,
TLSInfo: cfg.PeerTLSInfo, TLSInfo: cfg.PeerTLSInfo,
DialTimeout: cfg.PeerDialTimeout(), DialTimeout: cfg.PeerDialTimeout(),
ID: b.storage.cluster.raft.wal.id, ID: b.storage.cluster.wal.id,
URLs: cfg.PeerURLs, URLs: cfg.PeerURLs,
ClusterID: b.storage.cluster.raft.cl.ID(), ClusterID: b.storage.cluster.raft.cl.ID(),
Raft: srv, Raft: srv,
@ -417,12 +417,12 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
} }
// add all remotes into transport // add all remotes into transport
for _, m := range b.storage.cluster.remotes { for _, m := range b.storage.cluster.remotes {
if m.ID != b.storage.cluster.raft.wal.id { if m.ID != b.storage.cluster.wal.id {
tr.AddRemote(m.ID, m.PeerURLs) tr.AddRemote(m.ID, m.PeerURLs)
} }
} }
for _, m := range b.storage.cluster.raft.cl.Members() { for _, m := range b.storage.cluster.raft.cl.Members() {
if m.ID != b.storage.cluster.raft.wal.id { if m.ID != b.storage.cluster.wal.id {
tr.AddPeer(m.ID, m.PeerURLs) tr.AddPeer(m.ID, m.PeerURLs)
} }
} }