server: Move raft and wal up the bootstrap hierarchy

This commit is contained in:
Marek Siarkowicz 2021-07-21 13:55:22 +02:00
parent 8b0d8ea2af
commit 5d044563a8

View File

@ -233,17 +233,57 @@ func bootstrapCluster(cfg config.ServerConfig, haveWAL bool, storage *bootstrapp
switch {
case !haveWAL && !cfg.NewCluster:
c, err = bootstrapExistingClusterNoWAL(cfg, prt, storage.st, storage.backend.be)
if err != nil {
return nil, err
}
c.wal = bootstrapNewWAL(cfg, c)
c.raft = bootstrapRaftFromCluster(cfg, c.cl, nil, c.wal)
c.cl.SetID(c.nodeID, c.cl.ID())
return c, nil
case !haveWAL && cfg.NewCluster:
c, err = bootstrapNewClusterNoWAL(cfg, prt, storage.st, storage.backend.be)
if err != nil {
return nil, err
}
c.wal = bootstrapNewWAL(cfg, c)
c.raft = bootstrapRaftFromCluster(cfg, c.cl, c.cl.MemberIDs(), c.wal)
c.cl.SetID(c.nodeID, c.cl.ID())
return c, nil
case haveWAL:
c, err = bootstrapClusterWithWAL(cfg, storage)
bwal, meta := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), storage.backend.snapshot, cfg.UnsafeNoFsync)
if cfg.ForceNewCluster {
// discard the previously uncommitted entries
bwal.ents = bwal.CommitedEntries()
entries := bwal.ConfigChangeEntries(meta)
// force commit config change entries
bwal.AppendAndCommitEntries(entries)
cfg.Logger.Info(
"forcing restart member",
zap.String("cluster-id", meta.clusterID.String()),
zap.String("local-member-id", meta.nodeID.String()),
zap.Uint64("commit-index", bwal.st.Commit),
)
} else {
cfg.Logger.Info(
"restarting local member",
zap.String("cluster-id", meta.clusterID.String()),
zap.String("local-member-id", meta.nodeID.String()),
zap.Uint64("commit-index", bwal.st.Commit),
)
}
c, err = bootstrapClusterWithWAL(cfg, storage, meta)
if err != nil {
return nil, err
}
if err := fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
}
c.raft = bootstrapRaftFromSnapshot(cfg, bwal, meta)
c.wal = bwal
return c, nil
default:
return nil, fmt.Errorf("unsupported bootstrap config")
}
if err != nil {
return nil, err
}
return c, nil
}
func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*bootstrapedCluster, error) {
@ -270,13 +310,8 @@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe
cl.SetStore(st)
cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be))
member := cl.MemberByName(cfg.Name)
bwal := bootstrapNewWAL(cfg, member, cl)
br := bootstrapRaftFromCluster(cfg, cl, nil, bwal)
cl.SetID(member.ID, existingCluster.ID())
return &bootstrapedCluster{
raft: br,
remotes: remotes,
wal: bwal,
cl: cl,
nodeID: member.ID,
}, nil
@ -315,58 +350,26 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st
cl.SetStore(st)
cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be))
member := cl.MemberByName(cfg.Name)
bwal := bootstrapNewWAL(cfg, member, cl)
br := bootstrapRaftFromCluster(cfg, cl, cl.MemberIDs(), bwal)
cl.SetID(member.ID, cl.ID())
return &bootstrapedCluster{
remotes: nil,
raft: br,
wal: bwal,
cl: cl,
nodeID: member.ID,
}, nil
}
func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStorage) (*bootstrapedCluster, error) {
func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStorage, meta *snapshotMetadata) (*bootstrapedCluster, error) {
if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
return nil, fmt.Errorf("cannot write to member directory: %v", err)
}
if err := fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
}
if cfg.ShouldDiscover() {
cfg.Logger.Warn(
"discovery token is ignored since cluster already initialized; valid logs are found",
zap.String("bwal-dir", cfg.WALDir()),
)
}
bwal, meta := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), storage.backend.snapshot, cfg.UnsafeNoFsync)
if cfg.ForceNewCluster {
// discard the previously uncommitted entries
bwal.ents = bwal.CommitedEntries()
entries := bwal.ConfigChangeEntries(meta)
// force commit config change entries
bwal.AppendAndCommitEntries(entries)
cfg.Logger.Info(
"forcing restart member",
zap.String("cluster-id", meta.clusterID.String()),
zap.String("local-member-id", meta.nodeID.String()),
zap.Uint64("commit-index", bwal.st.Commit),
)
} else {
cfg.Logger.Info(
"restarting local member",
zap.String("cluster-id", meta.clusterID.String()),
zap.String("local-member-id", meta.nodeID.String()),
zap.Uint64("commit-index", bwal.st.Commit),
)
}
cl := membership.NewCluster(cfg.Logger)
cl.SetID(meta.nodeID, meta.clusterID)
raft := bootstrapRaftFromSnapshot(cfg, bwal, meta)
cl.SetStore(storage.st)
cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, storage.backend.be))
cl.Recover(api.UpdateCapability)
@ -376,8 +379,6 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStora
return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
}
return &bootstrapedCluster{
raft: raft,
wal: bwal,
cl: cl,
nodeID: meta.nodeID,
}, nil
@ -566,11 +567,11 @@ type snapshotMetadata struct {
nodeID, clusterID types.ID
}
func bootstrapNewWAL(cfg config.ServerConfig, m *membership.Member, cl *membership.RaftCluster) *bootstrappedWAL {
func bootstrapNewWAL(cfg config.ServerConfig, cluster *bootstrapedCluster) *bootstrappedWAL {
metadata := pbutil.MustMarshal(
&etcdserverpb.Metadata{
NodeID: uint64(m.ID),
ClusterID: uint64(cl.ID()),
NodeID: uint64(cluster.nodeID),
ClusterID: uint64(cluster.cl.ID()),
},
)
w, err := wal.Create(cfg.Logger, cfg.WALDir(), metadata)