mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: Refactor standalone boostrap
This commit is contained in:
parent
244e5c2cce
commit
e1fa356fac
@ -474,37 +474,10 @@ func boostrapRaftFromWalStandalone(cfg config.ServerConfig, snapshot *raftpb.Sna
|
||||
wal := boostrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync)
|
||||
|
||||
// discard the previously uncommitted entries
|
||||
for i, ent := range wal.ents {
|
||||
if ent.Index > wal.st.Commit {
|
||||
cfg.Logger.Info(
|
||||
"discarding uncommitted WAL entries",
|
||||
zap.Uint64("entry-index", ent.Index),
|
||||
zap.Uint64("commit-index-from-wal", wal.st.Commit),
|
||||
zap.Int("number-of-discarded-entries", len(wal.ents)-i),
|
||||
)
|
||||
wal.ents = wal.ents[:i]
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// force append the configuration change entries
|
||||
toAppEnts := createConfigChangeEnts(
|
||||
cfg.Logger,
|
||||
getIDs(cfg.Logger, snapshot, wal.ents),
|
||||
uint64(wal.id),
|
||||
wal.st.Term,
|
||||
wal.st.Commit,
|
||||
)
|
||||
wal.ents = append(wal.ents, toAppEnts...)
|
||||
|
||||
// force commit newly appended entries
|
||||
err := wal.w.Save(raftpb.HardState{}, toAppEnts)
|
||||
if err != nil {
|
||||
cfg.Logger.Fatal("failed to save hard state and entries", zap.Error(err))
|
||||
}
|
||||
if len(wal.ents) != 0 {
|
||||
wal.st.Commit = wal.ents[len(wal.ents)-1].Index
|
||||
}
|
||||
wal.ents = wal.CommitedEntries()
|
||||
entries := wal.ConfigChangeEntries()
|
||||
// force commit config change entries
|
||||
wal.AppendAndCommitEntries(entries)
|
||||
|
||||
cfg.Logger.Info(
|
||||
"forcing restart member",
|
||||
|
@ -119,6 +119,7 @@ func boostrapWALFromSnapshot(lg *zap.Logger, waldir string, snapshot *raftpb.Sna
|
||||
id := types.ID(metadata.NodeID)
|
||||
cid := types.ID(metadata.ClusterID)
|
||||
return &boostrappedWAL{
|
||||
lg: lg,
|
||||
w: w,
|
||||
id: id,
|
||||
cid: cid,
|
||||
@ -144,6 +145,7 @@ func boostrapNewWal(cfg config.ServerConfig, nodeID, clusterID types.ID) *boostr
|
||||
w.SetUnsafeNoFsync()
|
||||
}
|
||||
return &boostrappedWAL{
|
||||
lg: cfg.Logger,
|
||||
w: w,
|
||||
id: nodeID,
|
||||
cid: clusterID,
|
||||
@ -151,6 +153,8 @@ func boostrapNewWal(cfg config.ServerConfig, nodeID, clusterID types.ID) *boostr
|
||||
}
|
||||
|
||||
type boostrappedWAL struct {
|
||||
lg *zap.Logger
|
||||
|
||||
w *wal.WAL
|
||||
id, cid types.ID
|
||||
st *raftpb.HardState
|
||||
@ -171,3 +175,39 @@ func (wal *boostrappedWAL) MemoryStorage() *raft.MemoryStorage {
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (wal *boostrappedWAL) CommitedEntries() []raftpb.Entry {
|
||||
for i, ent := range wal.ents {
|
||||
if ent.Index > wal.st.Commit {
|
||||
wal.lg.Info(
|
||||
"discarding uncommitted WAL entries",
|
||||
zap.Uint64("entry-index", ent.Index),
|
||||
zap.Uint64("commit-index-from-wal", wal.st.Commit),
|
||||
zap.Int("number-of-discarded-entries", len(wal.ents)-i),
|
||||
)
|
||||
return wal.ents[:i]
|
||||
}
|
||||
}
|
||||
return wal.ents
|
||||
}
|
||||
|
||||
func (wal *boostrappedWAL) ConfigChangeEntries() []raftpb.Entry {
|
||||
return createConfigChangeEnts(
|
||||
wal.lg,
|
||||
getIDs(wal.lg, wal.snapshot, wal.ents),
|
||||
uint64(wal.id),
|
||||
wal.st.Term,
|
||||
wal.st.Commit,
|
||||
)
|
||||
}
|
||||
|
||||
func (wal *boostrappedWAL) AppendAndCommitEntries(ents []raftpb.Entry) {
|
||||
wal.ents = append(wal.ents, ents...)
|
||||
err := wal.w.Save(raftpb.HardState{}, ents)
|
||||
if err != nil {
|
||||
wal.lg.Fatal("failed to save hard state and entries", zap.Error(err))
|
||||
}
|
||||
if len(wal.ents) != 0 {
|
||||
wal.st.Commit = wal.ents[len(wal.ents)-1].Index
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user