mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: Unify memory storage boostrap
This commit is contained in:
parent
a72d4462fe
commit
244e5c2cce
@ -33,7 +33,6 @@ import (
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
|
||||
"go.etcd.io/etcd/server/v3/wal/walpb"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
@ -437,8 +436,7 @@ func boostrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster
|
||||
zap.String("local-member-id", id.String()),
|
||||
zap.String("cluster-id", cl.ID().String()),
|
||||
)
|
||||
s := raft.NewMemoryStorage()
|
||||
|
||||
s := wal.MemoryStorage()
|
||||
return &boostrapRaft{
|
||||
lg: cfg.Logger,
|
||||
heartbeat: time.Duration(cfg.TickMs) * time.Millisecond,
|
||||
@ -451,11 +449,7 @@ func boostrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster
|
||||
}
|
||||
|
||||
func boostrapRaftFromWal(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *boostrapRaft {
|
||||
var walsnap walpb.Snapshot
|
||||
if snapshot != nil {
|
||||
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
|
||||
}
|
||||
wal := boostrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), walsnap, cfg.UnsafeNoFsync)
|
||||
wal := boostrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync)
|
||||
|
||||
cfg.Logger.Info(
|
||||
"restarting local member",
|
||||
@ -465,12 +459,7 @@ func boostrapRaftFromWal(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bo
|
||||
)
|
||||
cl := membership.NewCluster(cfg.Logger)
|
||||
cl.SetID(wal.id, wal.cid)
|
||||
s := raft.NewMemoryStorage()
|
||||
if snapshot != nil {
|
||||
s.ApplySnapshot(*snapshot)
|
||||
}
|
||||
s.SetHardState(*wal.st)
|
||||
s.Append(wal.ents)
|
||||
s := wal.MemoryStorage()
|
||||
return &boostrapRaft{
|
||||
lg: cfg.Logger,
|
||||
heartbeat: time.Duration(cfg.TickMs) * time.Millisecond,
|
||||
@ -482,11 +471,7 @@ func boostrapRaftFromWal(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bo
|
||||
}
|
||||
|
||||
func boostrapRaftFromWalStandalone(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *boostrapRaft {
|
||||
var walsnap walpb.Snapshot
|
||||
if snapshot != nil {
|
||||
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
|
||||
}
|
||||
wal := boostrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), walsnap, cfg.UnsafeNoFsync)
|
||||
wal := boostrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync)
|
||||
|
||||
// discard the previously uncommitted entries
|
||||
for i, ent := range wal.ents {
|
||||
@ -530,12 +515,7 @@ func boostrapRaftFromWalStandalone(cfg config.ServerConfig, snapshot *raftpb.Sna
|
||||
|
||||
cl := membership.NewCluster(cfg.Logger)
|
||||
cl.SetID(wal.id, wal.cid)
|
||||
s := raft.NewMemoryStorage()
|
||||
if snapshot != nil {
|
||||
s.ApplySnapshot(*snapshot)
|
||||
}
|
||||
s.SetHardState(*wal.st)
|
||||
s.Append(wal.ents)
|
||||
s := wal.MemoryStorage()
|
||||
return &boostrapRaft{
|
||||
lg: cfg.Logger,
|
||||
heartbeat: time.Duration(cfg.TickMs) * time.Millisecond,
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
"go.etcd.io/etcd/client/pkg/v3/types"
|
||||
"go.etcd.io/etcd/pkg/v3/pbutil"
|
||||
"go.etcd.io/etcd/raft/v3"
|
||||
"go.etcd.io/etcd/raft/v3/raftpb"
|
||||
"go.etcd.io/etcd/server/v3/config"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
|
||||
@ -84,10 +85,14 @@ func (st *storage) Release(snap raftpb.Snapshot) error {
|
||||
// boostrapWALFromSnapshot reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear
|
||||
// after the position of the given snap in the WAL.
|
||||
// The snap must have been previously saved to the WAL, or this call will panic.
|
||||
func boostrapWALFromSnapshot(lg *zap.Logger, waldir string, snap walpb.Snapshot, unsafeNoFsync bool) *boostrappedWAL {
|
||||
func boostrapWALFromSnapshot(lg *zap.Logger, waldir string, snapshot *raftpb.Snapshot, unsafeNoFsync bool) *boostrappedWAL {
|
||||
var walsnap walpb.Snapshot
|
||||
if snapshot != nil {
|
||||
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
|
||||
}
|
||||
repaired := false
|
||||
for {
|
||||
w, err := wal.Open(lg, waldir, snap)
|
||||
w, err := wal.Open(lg, waldir, walsnap)
|
||||
if err != nil {
|
||||
lg.Fatal("failed to open WAL", zap.Error(err))
|
||||
}
|
||||
@ -114,11 +119,12 @@ func boostrapWALFromSnapshot(lg *zap.Logger, waldir string, snap walpb.Snapshot,
|
||||
id := types.ID(metadata.NodeID)
|
||||
cid := types.ID(metadata.ClusterID)
|
||||
return &boostrappedWAL{
|
||||
w: w,
|
||||
id: id,
|
||||
cid: cid,
|
||||
st: &st,
|
||||
ents: ents,
|
||||
w: w,
|
||||
id: id,
|
||||
cid: cid,
|
||||
st: &st,
|
||||
ents: ents,
|
||||
snapshot: snapshot,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -145,8 +151,23 @@ func boostrapNewWal(cfg config.ServerConfig, nodeID, clusterID types.ID) *boostr
|
||||
}
|
||||
|
||||
type boostrappedWAL struct {
|
||||
w *wal.WAL
|
||||
id, cid types.ID
|
||||
st *raftpb.HardState
|
||||
ents []raftpb.Entry
|
||||
w *wal.WAL
|
||||
id, cid types.ID
|
||||
st *raftpb.HardState
|
||||
ents []raftpb.Entry
|
||||
snapshot *raftpb.Snapshot
|
||||
}
|
||||
|
||||
func (wal *boostrappedWAL) MemoryStorage() *raft.MemoryStorage {
|
||||
s := raft.NewMemoryStorage()
|
||||
if wal.snapshot != nil {
|
||||
s.ApplySnapshot(*wal.snapshot)
|
||||
}
|
||||
if wal.st != nil {
|
||||
s.SetHardState(*wal.st)
|
||||
}
|
||||
if len(wal.ents) != 0 {
|
||||
s.Append(wal.ents)
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user