server: Split wal, cluster and raft bootstrapping

Marek Siarkowicz 2021-07-21 14:37:23 +02:00
parent 5d044563a8
commit 138afa5be9


@@ -74,20 +74,20 @@ func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
 	}
 	haveWAL := wal.Exist(cfg.WALDir())
-	storage, err := bootstrapStorage(cfg, haveWAL, ss, prt)
+	s, err := bootstrapStorage(cfg, haveWAL, ss, prt)
 	if err != nil {
 		return nil, err
 	}
-	cluster, err := bootstrapCluster(cfg, haveWAL, storage, prt, ss)
+	cluster, err := bootstrapCluster(cfg, haveWAL, s, prt, ss)
 	if err != nil {
-		storage.backend.be.Close()
+		s.backend.be.Close()
 		return nil, err
 	}
 	return &bootstrappedServer{
 		prt:     prt,
 		ss:      ss,
-		storage: storage,
+		storage: s,
 		cluster: cluster,
 	}, nil
 }
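
The struct definitions behind this hunk are not part of the diff. Purely as a sketch inferred from the call sites above (anything not visible in the hunk is an assumption), the result type assembled by bootstrap() looks roughly like:

// Sketch only: fields inferred from the bootstrap() hunk above; the real
// definition in bootstrap.go may differ or carry additional fields.
type bootstrappedServer struct {
	storage *bootstrappedStorage // result of bootstrapStorage
	cluster *bootstrapedCluster  // result of bootstrapCluster
	prt     http.RoundTripper
	ss      *snap.Snapshotter
}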
@@ -230,60 +230,53 @@ func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error {
 }
 
 func bootstrapCluster(cfg config.ServerConfig, haveWAL bool, storage *bootstrappedStorage, prt http.RoundTripper, ss *snap.Snapshotter) (c *bootstrapedCluster, err error) {
+	c = &bootstrapedCluster{}
+	var (
+		meta *snapshotMetadata
+		bwal *bootstrappedWAL
+	)
+	if haveWAL {
+		if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
+			return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
+		}
+		bwal, meta = bootstrapWALFromSnapshot(cfg, storage.backend.snapshot)
+	}
 	switch {
 	case !haveWAL && !cfg.NewCluster:
 		c, err = bootstrapExistingClusterNoWAL(cfg, prt, storage.st, storage.backend.be)
 		if err != nil {
 			return nil, err
 		}
-		c.wal = bootstrapNewWAL(cfg, c)
-		c.raft = bootstrapRaftFromCluster(cfg, c.cl, nil, c.wal)
-		c.cl.SetID(c.nodeID, c.cl.ID())
-		return c, nil
+		c.wal = bootstrapNewWAL(cfg, c.cl, c.nodeID)
 	case !haveWAL && cfg.NewCluster:
 		c, err = bootstrapNewClusterNoWAL(cfg, prt, storage.st, storage.backend.be)
 		if err != nil {
 			return nil, err
 		}
-		c.wal = bootstrapNewWAL(cfg, c)
-		c.raft = bootstrapRaftFromCluster(cfg, c.cl, c.cl.MemberIDs(), c.wal)
-		c.cl.SetID(c.nodeID, c.cl.ID())
-		return c, nil
+		c.wal = bootstrapNewWAL(cfg, c.cl, c.nodeID)
 	case haveWAL:
-		bwal, meta := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), storage.backend.snapshot, cfg.UnsafeNoFsync)
-		if cfg.ForceNewCluster {
-			// discard the previously uncommitted entries
-			bwal.ents = bwal.CommitedEntries()
-			entries := bwal.ConfigChangeEntries(meta)
-			// force commit config change entries
-			bwal.AppendAndCommitEntries(entries)
-			cfg.Logger.Info(
-				"forcing restart member",
-				zap.String("cluster-id", meta.clusterID.String()),
-				zap.String("local-member-id", meta.nodeID.String()),
-				zap.Uint64("commit-index", bwal.st.Commit),
-			)
-		} else {
-			cfg.Logger.Info(
-				"restarting local member",
-				zap.String("cluster-id", meta.clusterID.String()),
-				zap.String("local-member-id", meta.nodeID.String()),
-				zap.Uint64("commit-index", bwal.st.Commit),
-			)
-		}
 		c, err = bootstrapClusterWithWAL(cfg, storage, meta)
 		if err != nil {
 			return nil, err
 		}
-		if err := fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
-			return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
-		}
-		c.raft = bootstrapRaftFromSnapshot(cfg, bwal, meta)
 		c.wal = bwal
-		return c, nil
 	default:
 		return nil, fmt.Errorf("unsupported bootstrap config")
 	}
+
+	switch {
+	case !haveWAL && !cfg.NewCluster:
+		c.raft = bootstrapRaftFromCluster(cfg, c.cl, nil, c.wal)
+		c.cl.SetID(c.nodeID, c.cl.ID())
+	case !haveWAL && cfg.NewCluster:
+		c.raft = bootstrapRaftFromCluster(cfg, c.cl, c.cl.MemberIDs(), c.wal)
+		c.cl.SetID(c.nodeID, c.cl.ID())
+	case haveWAL:
+		c.raft = bootstrapRaftFromSnapshot(cfg, c.wal, meta)
+	default:
+		return nil, fmt.Errorf("unsupported bootstrap config")
+	}
+	return c, nil
 }
 
 func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*bootstrapedCluster, error) {
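
Because the removals and additions above are interleaved, the final shape of bootstrapCluster is easier to see in a condensed, non-verbatim sketch (error handling, logging, and the SetID calls are elided; memberIDs stands in for the nil / c.cl.MemberIDs() argument):

// Condensed sketch of the rewritten flow shown in the hunk above.
if haveWAL {
	bwal, meta = bootstrapWALFromSnapshot(cfg, storage.backend.snapshot) // open the existing WAL first
}
switch { // pass 1: membership and WAL
case !haveWAL && !cfg.NewCluster:
	c, _ = bootstrapExistingClusterNoWAL(cfg, prt, storage.st, storage.backend.be)
	c.wal = bootstrapNewWAL(cfg, c.cl, c.nodeID)
case !haveWAL && cfg.NewCluster:
	c, _ = bootstrapNewClusterNoWAL(cfg, prt, storage.st, storage.backend.be)
	c.wal = bootstrapNewWAL(cfg, c.cl, c.nodeID)
case haveWAL:
	c, _ = bootstrapClusterWithWAL(cfg, storage, meta)
	c.wal = bwal
}
switch { // pass 2: raft, consuming whichever WAL pass 1 produced
case !haveWAL:
	c.raft = bootstrapRaftFromCluster(cfg, c.cl, memberIDs, c.wal)
case haveWAL:
	c.raft = bootstrapRaftFromSnapshot(cfg, c.wal, meta)
}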
@@ -516,21 +509,54 @@ func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter, wal *wal.WAL, cl *m
 	)
 }
 
 // bootstrapWALFromSnapshot reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear
+func bootstrapWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (*bootstrappedWAL, *snapshotMetadata) {
+	wal, st, ents, snap, meta := openWALFromSnapshot(cfg, snapshot)
+	bwal := &bootstrappedWAL{
+		lg:       cfg.Logger,
+		w:        wal,
+		st:       st,
+		ents:     ents,
+		snapshot: snap,
+	}
+	if cfg.ForceNewCluster {
+		// discard the previously uncommitted entries
+		bwal.ents = bwal.CommitedEntries()
+		entries := bwal.ConfigChangeEntries(meta)
+		// force commit config change entries
+		bwal.AppendAndCommitEntries(entries)
+		cfg.Logger.Info(
+			"forcing restart member",
+			zap.String("cluster-id", meta.clusterID.String()),
+			zap.String("local-member-id", meta.nodeID.String()),
+			zap.Uint64("commit-index", bwal.st.Commit),
+		)
+	} else {
+		cfg.Logger.Info(
+			"restarting local member",
+			zap.String("cluster-id", meta.clusterID.String()),
+			zap.String("local-member-id", meta.nodeID.String()),
+			zap.Uint64("commit-index", bwal.st.Commit),
+		)
+	}
+	return bwal, meta
+}
+
+// openWALFromSnapshot reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear
 // after the position of the given snap in the WAL.
 // The snap must have been previously saved to the WAL, or this call will panic.
-func bootstrapWALFromSnapshot(lg *zap.Logger, waldir string, snapshot *raftpb.Snapshot, unsafeNoFsync bool) (*bootstrappedWAL, *snapshotMetadata) {
+func openWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (*wal.WAL, *raftpb.HardState, []raftpb.Entry, *raftpb.Snapshot, *snapshotMetadata) {
 	var walsnap walpb.Snapshot
 	if snapshot != nil {
 		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
 	}
 	repaired := false
 	for {
-		w, err := wal.Open(lg, waldir, walsnap)
+		w, err := wal.Open(cfg.Logger, cfg.WALDir(), walsnap)
 		if err != nil {
-			lg.Fatal("failed to open WAL", zap.Error(err))
+			cfg.Logger.Fatal("failed to open WAL", zap.Error(err))
 		}
-		if unsafeNoFsync {
+		if cfg.UnsafeNoFsync {
 			w.SetUnsafeNoFsync()
 		}
 		wmetadata, st, ents, err := w.ReadAll()
@@ -538,12 +564,12 @@ func bootstrapWALFromSnapshot(lg *zap.Logger, waldir string, snapshot *raftpb.Sn
 			w.Close()
 			// we can only repair ErrUnexpectedEOF and we never repair twice.
 			if repaired || err != io.ErrUnexpectedEOF {
-				lg.Fatal("failed to read WAL, cannot be repaired", zap.Error(err))
+				cfg.Logger.Fatal("failed to read WAL, cannot be repaired", zap.Error(err))
 			}
-			if !wal.Repair(lg, waldir) {
-				lg.Fatal("failed to repair WAL", zap.Error(err))
+			if !wal.Repair(cfg.Logger, cfg.WALDir()) {
+				cfg.Logger.Fatal("failed to repair WAL", zap.Error(err))
 			} else {
-				lg.Info("repaired WAL", zap.Error(err))
+				cfg.Logger.Info("repaired WAL", zap.Error(err))
 				repaired = true
 			}
 			continue
@@ -553,13 +579,7 @@ func bootstrapWALFromSnapshot(lg *zap.Logger, waldir string, snapshot *raftpb.Sn
 		id := types.ID(metadata.NodeID)
 		cid := types.ID(metadata.ClusterID)
 		meta := &snapshotMetadata{clusterID: cid, nodeID: id}
-		return &bootstrappedWAL{
-			lg:       lg,
-			w:        w,
-			st:       &st,
-			ents:     ents,
-			snapshot: snapshot,
-		}, meta
+		return w, &st, ents, snapshot, meta
 	}
 }
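
The bootstrappedWAL type itself is outside this diff. Judging only from the composite literals above and from openWALFromSnapshot's return values, its shape is roughly the following sketch; the field types are inferred, and the methods used earlier (CommitedEntries, ConfigChangeEntries, AppendAndCommitEntries) are defined elsewhere and not shown here:

// Sketch only: inferred from the &bootstrappedWAL{...} literals and the
// openWALFromSnapshot signature above; the real definition may hold more.
type bootstrappedWAL struct {
	lg       *zap.Logger
	w        *wal.WAL
	st       *raftpb.HardState
	ents     []raftpb.Entry
	snapshot *raftpb.Snapshot
}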
@@ -567,11 +587,11 @@ type snapshotMetadata struct {
 	nodeID, clusterID types.ID
 }
 
-func bootstrapNewWAL(cfg config.ServerConfig, cluster *bootstrapedCluster) *bootstrappedWAL {
+func bootstrapNewWAL(cfg config.ServerConfig, cl *membership.RaftCluster, nodeID types.ID) *bootstrappedWAL {
 	metadata := pbutil.MustMarshal(
 		&etcdserverpb.Metadata{
-			NodeID:    uint64(cluster.nodeID),
-			ClusterID: uint64(cluster.cl.ID()),
+			NodeID:    uint64(nodeID),
+			ClusterID: uint64(cl.ID()),
 		},
 	)
 	w, err := wal.Create(cfg.Logger, cfg.WALDir(), metadata)