Merge pull request #13230 from serathius/storage

Move storage bootstrap to its package
Piotr Tabor authored on 2021-09-25 17:34:30 +02:00, committed by GitHub
commit 183cc52bc3
4 changed files with 286 additions and 203 deletions
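
For orientation, here is a condensed sketch of the new bootstrap flow this change introduces. It is an illustrative toy model only: the stub types below stand in for the real etcd structs (which carry the actual backend, WAL, cluster, and raft state), and all error handling and real work are elided.

```go
package main

import "fmt"

// Stub types mirroring the structs introduced in this change; the real ones
// live in server/etcdserver and hold backend, WAL, membership, and raft state.
type (
	bootstrappedBackend struct{}
	bootstrappedWAL     struct{ haveWAL bool }
	bootstrappedStorage struct {
		backend *bootstrappedBackend
		wal     *bootstrappedWAL
	}
	bootstrapedCluster struct{} // spelled with a single "p" in the diff
	bootstrappedRaft   struct{}
	bootstrappedServer struct {
		storage *bootstrappedStorage
		cluster *bootstrapedCluster
		raft    *bootstrappedRaft
	}
)

// bootstrap mirrors the new orchestration order: each stage returns a small
// struct and the pieces are assembled at the end, instead of one large switch
// populating a flat bootstrappedServer.
func bootstrap(haveWAL bool) *bootstrappedServer {
	backend := &bootstrappedBackend{} // bootstrapBackend: open backend, consistent index, optional snapshot recovery
	var bwal *bootstrappedWAL
	if haveWAL {
		bwal = &bootstrappedWAL{haveWAL: true} // bootstrapWALFromSnapshot: reopen the existing WAL
	}
	cluster := &bootstrapedCluster{}                             // bootstrapCluster: existing / new / from WAL metadata
	storage := &bootstrappedStorage{backend: backend, wal: bwal} // bootstrapStorage: creates a fresh WAL when bwal == nil
	// cluster.Finalize(cfg, storage) would attach the v2 store/backend and recover membership here.
	raft := &bootstrappedRaft{} // bootstrapRaft: restart from the WAL or start from peers
	return &bootstrappedServer{storage: storage, cluster: cluster, raft: raft}
}

func main() {
	fmt.Printf("%+v\n", bootstrap(false))
}
```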

View File

@@ -49,7 +49,6 @@ import (
 )
 
 func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
-	st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
 	if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
 		cfg.Logger.Warn(
@@ -65,60 +64,118 @@ func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
 		return nil, fmt.Errorf("cannot access data directory: %v", terr)
 	}
-	haveWAL := wal.Exist(cfg.WALDir())
+	if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
+		return nil, fmt.Errorf("cannot access member directory: %v", terr)
+	}
 	ss := bootstrapSnapshot(cfg)
-	be, ci, beExist, beHooks, err := bootstrapBackend(cfg)
-	if err != nil {
-		return nil, err
-	}
-	defer func() {
-		if err != nil {
-			be.Close()
-		}
-	}()
 	prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.PeerDialTimeout())
 	if err != nil {
 		return nil, err
 	}
-	switch {
-	case !haveWAL && !cfg.NewCluster:
-		b, err = bootstrapExistingClusterNoWAL(cfg, prt, st, be)
-	case !haveWAL && cfg.NewCluster:
-		b, err = bootstrapNewClusterNoWAL(cfg, prt, st, be)
-	case haveWAL:
-		b, err = bootstrapWithWAL(cfg, st, be, ss, beExist, beHooks, ci)
-	default:
-		be.Close()
-		return nil, fmt.Errorf("unsupported bootstrap config")
-	}
+	haveWAL := wal.Exist(cfg.WALDir())
+	st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
+	backend, err := bootstrapBackend(cfg, haveWAL, st, ss)
 	if err != nil {
 		return nil, err
 	}
-	if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
-		return nil, fmt.Errorf("cannot access member directory: %v", terr)
-	}
-	b.prt = prt
-	b.ci = ci
-	b.st = st
-	b.be = be
-	b.ss = ss
-	b.beHooks = beHooks
-	return b, nil
+	var (
+		bwal *bootstrappedWAL
+	)
+	if haveWAL {
+		if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
+			return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
+		}
+		bwal = bootstrapWALFromSnapshot(cfg, backend.snapshot)
+	}
+	cluster, err := bootstrapCluster(cfg, bwal, prt)
+	if err != nil {
+		backend.Close()
+		return nil, err
+	}
+	s, err := bootstrapStorage(cfg, st, backend, bwal, cluster)
+	if err != nil {
+		backend.Close()
+		return nil, err
+	}
+	err = cluster.Finalize(cfg, s)
+	if err != nil {
+		backend.Close()
+		return nil, err
+	}
+	raft := bootstrapRaft(cfg, cluster, s.wal)
+	return &bootstrappedServer{
+		prt:     prt,
+		ss:      ss,
+		storage: s,
+		cluster: cluster,
+		raft:    raft,
+	}, nil
 }
 
 type bootstrappedServer struct {
+	storage *bootstrappedStorage
+	cluster *bootstrapedCluster
 	raft    *bootstrappedRaft
-	remotes []*membership.Member
 	prt     http.RoundTripper
-	ci      cindex.ConsistentIndexer
-	st      v2store.Store
-	be      backend.Backend
 	ss      *snap.Snapshotter
-	beHooks *serverstorage.BackendHooks
 }
+
+func (s *bootstrappedServer) Close() {
+	s.storage.Close()
+}
+
+type bootstrappedStorage struct {
+	backend *bootstrappedBackend
+	wal     *bootstrappedWAL
+	st      v2store.Store
+}
+
+func (s *bootstrappedStorage) Close() {
+	s.backend.Close()
+}
+
+type bootstrappedBackend struct {
+	beHooks  *serverstorage.BackendHooks
+	be       backend.Backend
+	ci       cindex.ConsistentIndexer
+	beExist  bool
+	snapshot *raftpb.Snapshot
+}
+
+func (s *bootstrappedBackend) Close() {
+	s.be.Close()
+}
+
+type bootstrapedCluster struct {
+	remotes []*membership.Member
+	cl      *membership.RaftCluster
+	nodeID  types.ID
+}
+
+type bootstrappedRaft struct {
+	lg        *zap.Logger
+	heartbeat time.Duration
+	peers     []raft.Peer
+	config    *raft.Config
+	storage   *raft.MemoryStorage
+}
+
+func bootstrapStorage(cfg config.ServerConfig, st v2store.Store, be *bootstrappedBackend, wal *bootstrappedWAL, cl *bootstrapedCluster) (b *bootstrappedStorage, err error) {
+	if wal == nil {
+		wal = bootstrapNewWAL(cfg, cl)
+	}
+	return &bootstrappedStorage{
+		backend: be,
+		st:      st,
+		wal:     wal,
+	}, nil
+}
 
 func bootstrapSnapshot(cfg config.ServerConfig) *snap.Snapshotter {
@@ -142,11 +199,11 @@ func bootstrapSnapshot(cfg config.ServerConfig) *snap.Snapshotter {
 	return snap.New(cfg.Logger, cfg.SnapDir())
 }
 
-func bootstrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.ConsistentIndexer, beExist bool, beHooks *serverstorage.BackendHooks, err error) {
-	beExist = fileutil.Exist(cfg.BackendPath())
-	ci = cindex.NewConsistentIndex(nil)
-	beHooks = serverstorage.NewBackendHooks(cfg.Logger, ci)
-	be = serverstorage.OpenBackend(cfg, beHooks)
+func bootstrapBackend(cfg config.ServerConfig, haveWAL bool, st v2store.Store, ss *snap.Snapshotter) (backend *bootstrappedBackend, err error) {
+	beExist := fileutil.Exist(cfg.BackendPath())
+	ci := cindex.NewConsistentIndex(nil)
+	beHooks := serverstorage.NewBackendHooks(cfg.Logger, ci)
+	be := serverstorage.OpenBackend(cfg, beHooks)
 	defer func() {
 		if err != nil && be != nil {
 			be.Close()
@@ -157,20 +214,35 @@ func bootstrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.Co
 	if cfg.ExperimentalBootstrapDefragThresholdMegabytes != 0 {
 		err = maybeDefragBackend(cfg, be)
 		if err != nil {
-			return nil, nil, false, nil, err
+			return nil, err
 		}
 	}
 	cfg.Logger.Debug("restore consistentIndex", zap.Uint64("index", ci.ConsistentIndex()))
 	// TODO(serathius): Implement schema setup in fresh storage
+	var (
+		snapshot *raftpb.Snapshot
+	)
+	if haveWAL {
+		snapshot, be, err = recoverSnapshot(cfg, st, be, beExist, beHooks, ci, ss)
+		if err != nil {
+			return nil, err
+		}
+	}
 	if beExist {
 		err = schema.Validate(cfg.Logger, be.BatchTx())
 		if err != nil {
 			cfg.Logger.Error("Failed to validate schema", zap.Error(err))
-			return nil, nil, false, nil, err
+			return nil, err
 		}
 	}
-	return be, ci, beExist, beHooks, nil
+	return &bootstrappedBackend{
+		beHooks:  beHooks,
+		be:       be,
+		ci:       ci,
+		beExist:  beExist,
+		snapshot: snapshot,
+	}, nil
 }
 
 func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error {
@@ -192,7 +264,24 @@ func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error {
 	return be.Defrag()
 }
 
-func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*bootstrappedServer, error) {
+func bootstrapCluster(cfg config.ServerConfig, bwal *bootstrappedWAL, prt http.RoundTripper) (c *bootstrapedCluster, err error) {
+	switch {
+	case bwal == nil && !cfg.NewCluster:
+		c, err = bootstrapExistingClusterNoWAL(cfg, prt)
+	case bwal == nil && cfg.NewCluster:
+		c, err = bootstrapNewClusterNoWAL(cfg, prt)
+	case bwal != nil && bwal.haveWAL:
+		c, err = bootstrapClusterWithWAL(cfg, bwal.meta)
+	default:
+		return nil, fmt.Errorf("unsupported bootstrap config")
+	}
+	if err != nil {
+		return nil, err
+	}
+	return c, nil
+}
+
+func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper) (*bootstrapedCluster, error) {
 	if err := cfg.VerifyJoinExisting(); err != nil {
 		return nil, err
 	}
@@ -213,17 +302,15 @@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe
 	remotes := existingCluster.Members()
 	cl.SetID(types.ID(0), existingCluster.ID())
-	cl.SetStore(st)
-	cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be))
-	br := bootstrapRaftFromCluster(cfg, cl, nil)
-	cl.SetID(br.wal.id, existingCluster.ID())
-	return &bootstrappedServer{
-		raft:    br,
+	member := cl.MemberByName(cfg.Name)
+	return &bootstrapedCluster{
 		remotes: remotes,
+		cl:      cl,
+		nodeID:  member.ID,
 	}, nil
 }
 
-func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*bootstrappedServer, error) {
+func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper) (*bootstrapedCluster, error) {
 	if err := cfg.VerifyBootstrap(); err != nil {
 		return nil, err
 	}
@@ -253,42 +340,43 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st
 			return nil, err
 		}
 	}
-	cl.SetStore(st)
-	cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be))
-	br := bootstrapRaftFromCluster(cfg, cl, cl.MemberIDs())
-	cl.SetID(br.wal.id, cl.ID())
-	return &bootstrappedServer{
+	return &bootstrapedCluster{
 		remotes: nil,
-		raft:    br,
+		cl:      cl,
+		nodeID:  m.ID,
 	}, nil
 }
 
-func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backend, ss *snap.Snapshotter, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer) (*bootstrappedServer, error) {
+func bootstrapClusterWithWAL(cfg config.ServerConfig, meta *snapshotMetadata) (*bootstrapedCluster, error) {
 	if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
 		return nil, fmt.Errorf("cannot write to member directory: %v", err)
 	}
-	if err := fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
-		return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
-	}
 	if cfg.ShouldDiscover() {
 		cfg.Logger.Warn(
 			"discovery token is ignored since cluster already initialized; valid logs are found",
 			zap.String("wal-dir", cfg.WALDir()),
 		)
 	}
+	cl := membership.NewCluster(cfg.Logger)
+	cl.SetID(meta.nodeID, meta.clusterID)
+	return &bootstrapedCluster{
+		cl:     cl,
+		nodeID: meta.nodeID,
+	}, nil
+}
+
+func recoverSnapshot(cfg config.ServerConfig, st v2store.Store, be backend.Backend, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer, ss *snap.Snapshotter) (*raftpb.Snapshot, backend.Backend, error) {
 	// Find a snapshot to start/restart a raft node
 	walSnaps, err := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir())
 	if err != nil {
-		return nil, err
+		return nil, be, err
 	}
 	// snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding
-	// wal log entries
+	// bwal log entries
 	snapshot, err := ss.LoadNewestAvailable(walSnaps)
 	if err != nil && err != snap.ErrNoSnapshot {
-		return nil, err
+		return nil, be, err
 	}
 
 	if snapshot != nil {
@@ -298,7 +386,7 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back
 		if err = serverstorage.AssertNoV2StoreContent(cfg.Logger, st, cfg.V2Deprecation); err != nil {
 			cfg.Logger.Error("illegal v2store content", zap.Error(err))
-			return nil, err
+			return nil, be, err
 		}
 
 		cfg.Logger.Info(
@@ -324,7 +412,7 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back
 		kvindex := ci.ConsistentIndex()
 		if kvindex < snapshot.Metadata.Index {
 			if kvindex != 0 {
-				return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", cfg.BackendPath(), kvindex, snapshot.Metadata.Index)
+				return nil, be, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", cfg.BackendPath(), kvindex, snapshot.Metadata.Index)
 			}
 			cfg.Logger.Warn(
 				"consistent index was never saved",
@@ -335,29 +423,47 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back
 	} else {
 		cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!")
 	}
-
-	r := &bootstrappedServer{}
-	if !cfg.ForceNewCluster {
-		r.raft = bootstrapRaftFromWal(cfg, snapshot)
-	} else {
-		r.raft = bootstrapRaftFromWalStandalone(cfg, snapshot)
-	}
-	r.raft.cl.SetStore(st)
-	r.raft.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be))
-	r.raft.cl.Recover(api.UpdateCapability)
-	if r.raft.cl.Version() != nil && !r.raft.cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
-		bepath := cfg.BackendPath()
-		os.RemoveAll(bepath)
-		return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
-	}
-	return r, nil
+	return snapshot, be, nil
+}
+
+func (c *bootstrapedCluster) Finalize(cfg config.ServerConfig, s *bootstrappedStorage) error {
+	if !s.wal.haveWAL {
+		c.cl.SetID(c.nodeID, c.cl.ID())
+	}
+	c.cl.SetStore(s.st)
+	c.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, s.backend.be))
+	if s.wal.haveWAL {
+		c.cl.Recover(api.UpdateCapability)
+		if c.databaseFileMissing(s) {
+			bepath := cfg.BackendPath()
+			os.RemoveAll(bepath)
+			return fmt.Errorf("database file (%v) of the backend is missing", bepath)
+		}
+	}
+	return nil
 }
 
-func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) *bootstrappedRaft {
+func (c *bootstrapedCluster) databaseFileMissing(s *bootstrappedStorage) bool {
+	v3Cluster := c.cl.Version() != nil && !c.cl.Version().LessThan(semver.Version{Major: 3})
+	return v3Cluster && !s.backend.beExist
+}
+
+func bootstrapRaft(cfg config.ServerConfig, cluster *bootstrapedCluster, bwal *bootstrappedWAL) *bootstrappedRaft {
+	switch {
+	case !bwal.haveWAL && !cfg.NewCluster:
+		return bootstrapRaftFromCluster(cfg, cluster.cl, nil, bwal)
+	case !bwal.haveWAL && cfg.NewCluster:
+		return bootstrapRaftFromCluster(cfg, cluster.cl, cluster.cl.MemberIDs(), bwal)
+	case bwal.haveWAL:
+		return bootstrapRaftFromWAL(cfg, bwal)
+	default:
+		cfg.Logger.Panic("unsupported bootstrap config")
+		return nil
+	}
+}
+
+func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID, bwal *bootstrappedWAL) *bootstrappedRaft {
 	member := cl.MemberByName(cfg.Name)
-	id := member.ID
-	wal := bootstrapNewWAL(cfg, id, cl.ID())
 	peers := make([]raft.Peer, len(ids))
 	for i, id := range ids {
 		var ctx []byte
@@ -369,69 +475,26 @@ func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluste
 	}
 	cfg.Logger.Info(
 		"starting local member",
-		zap.String("local-member-id", id.String()),
+		zap.String("local-member-id", member.ID.String()),
 		zap.String("cluster-id", cl.ID().String()),
 	)
-	s := wal.MemoryStorage()
+	s := bwal.MemoryStorage()
 	return &bootstrappedRaft{
 		lg:        cfg.Logger,
 		heartbeat: time.Duration(cfg.TickMs) * time.Millisecond,
-		cl:        cl,
-		config:    raftConfig(cfg, uint64(wal.id), s),
+		config:    raftConfig(cfg, uint64(member.ID), s),
 		peers:     peers,
 		storage:   s,
-		wal:       wal,
 	}
 }
 
-func bootstrapRaftFromWal(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bootstrappedRaft {
-	wal := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync)
-	cfg.Logger.Info(
-		"restarting local member",
-		zap.String("cluster-id", wal.cid.String()),
-		zap.String("local-member-id", wal.id.String()),
-		zap.Uint64("commit-index", wal.st.Commit),
-	)
-	cl := membership.NewCluster(cfg.Logger)
-	cl.SetID(wal.id, wal.cid)
-	s := wal.MemoryStorage()
+func bootstrapRaftFromWAL(cfg config.ServerConfig, bwal *bootstrappedWAL) *bootstrappedRaft {
+	s := bwal.MemoryStorage()
 	return &bootstrappedRaft{
 		lg:        cfg.Logger,
 		heartbeat: time.Duration(cfg.TickMs) * time.Millisecond,
-		cl:        cl,
-		config:    raftConfig(cfg, uint64(wal.id), s),
+		config:    raftConfig(cfg, uint64(bwal.meta.nodeID), s),
 		storage:   s,
-		wal:       wal,
-	}
-}
-
-func bootstrapRaftFromWalStandalone(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bootstrappedRaft {
-	wal := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync)
-	// discard the previously uncommitted entries
-	wal.ents = wal.CommitedEntries()
-	entries := wal.ConfigChangeEntries()
-	// force commit config change entries
-	wal.AppendAndCommitEntries(entries)
-	cfg.Logger.Info(
-		"forcing restart member",
-		zap.String("cluster-id", wal.cid.String()),
-		zap.String("local-member-id", wal.id.String()),
-		zap.Uint64("commit-index", wal.st.Commit),
-	)
-	cl := membership.NewCluster(cfg.Logger)
-	cl.SetID(wal.id, wal.cid)
-	s := wal.MemoryStorage()
-	return &bootstrappedRaft{
-		lg:        cfg.Logger,
-		heartbeat: time.Duration(cfg.TickMs) * time.Millisecond,
-		cl:        cl,
-		config:    raftConfig(cfg, uint64(wal.id), s),
-		storage:   s,
-		wal:       wal,
 	}
 }
@@ -449,18 +512,7 @@ func raftConfig(cfg config.ServerConfig, id uint64, s *raft.MemoryStorage) *raft
 	}
 }
 
-type bootstrappedRaft struct {
-	lg        *zap.Logger
-	heartbeat time.Duration
-
-	peers   []raft.Peer
-	config  *raft.Config
-	cl      *membership.RaftCluster
-	storage *raft.MemoryStorage
-	wal     *bootstrappedWAL
-}
-
-func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter) *raftNode {
+func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter, wal *wal.WAL, cl *membership.RaftCluster) *raftNode {
 	var n raft.Node
 	if len(b.peers) == 0 {
 		n = raft.RestartNode(b.config)
@@ -473,30 +525,65 @@ func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter) *raftNode {
 	return newRaftNode(
 		raftNodeConfig{
 			lg:          b.lg,
-			isIDRemoved: func(id uint64) bool { return b.cl.IsIDRemoved(types.ID(id)) },
+			isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
 			Node:        n,
 			heartbeat:   b.heartbeat,
 			raftStorage: b.storage,
-			storage:     NewStorage(b.wal.w, ss),
+			storage:     NewStorage(wal, ss),
 		},
 	)
 }
 
-// bootstrapWALFromSnapshot reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear
+func bootstrapWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bootstrappedWAL {
+	wal, st, ents, snap, meta := openWALFromSnapshot(cfg, snapshot)
+	bwal := &bootstrappedWAL{
+		lg:       cfg.Logger,
+		w:        wal,
+		st:       st,
+		ents:     ents,
+		snapshot: snap,
+		meta:     meta,
+		haveWAL:  true,
+	}
+	if cfg.ForceNewCluster {
+		// discard the previously uncommitted entries
+		bwal.ents = bwal.CommitedEntries()
+		entries := bwal.NewConfigChangeEntries()
+		// force commit config change entries
+		bwal.AppendAndCommitEntries(entries)
+		cfg.Logger.Info(
+			"forcing restart member",
+			zap.String("cluster-id", meta.clusterID.String()),
+			zap.String("local-member-id", meta.nodeID.String()),
+			zap.Uint64("commit-index", bwal.st.Commit),
+		)
+	} else {
+		cfg.Logger.Info(
+			"restarting local member",
+			zap.String("cluster-id", meta.clusterID.String()),
+			zap.String("local-member-id", meta.nodeID.String()),
+			zap.Uint64("commit-index", bwal.st.Commit),
+		)
+	}
+	return bwal
+}
+
+// openWALFromSnapshot reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear
 // after the position of the given snap in the WAL.
 // The snap must have been previously saved to the WAL, or this call will panic.
-func bootstrapWALFromSnapshot(lg *zap.Logger, waldir string, snapshot *raftpb.Snapshot, unsafeNoFsync bool) *bootstrappedWAL {
+func openWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (*wal.WAL, *raftpb.HardState, []raftpb.Entry, *raftpb.Snapshot, *snapshotMetadata) {
 	var walsnap walpb.Snapshot
 	if snapshot != nil {
 		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
 	}
 	repaired := false
 	for {
-		w, err := wal.Open(lg, waldir, walsnap)
+		w, err := wal.Open(cfg.Logger, cfg.WALDir(), walsnap)
 		if err != nil {
-			lg.Fatal("failed to open WAL", zap.Error(err))
+			cfg.Logger.Fatal("failed to open WAL", zap.Error(err))
 		}
-		if unsafeNoFsync {
+		if cfg.UnsafeNoFsync {
 			w.SetUnsafeNoFsync()
 		}
 		wmetadata, st, ents, err := w.ReadAll()
@@ -504,12 +591,12 @@ func bootstrapWALFromSnapshot(lg *zap.Logger, waldir string, snapshot *raftpb.Sn
 			w.Close()
 			// we can only repair ErrUnexpectedEOF and we never repair twice.
 			if repaired || err != io.ErrUnexpectedEOF {
-				lg.Fatal("failed to read WAL, cannot be repaired", zap.Error(err))
+				cfg.Logger.Fatal("failed to read WAL, cannot be repaired", zap.Error(err))
 			}
-			if !wal.Repair(lg, waldir) {
-				lg.Fatal("failed to repair WAL", zap.Error(err))
+			if !wal.Repair(cfg.Logger, cfg.WALDir()) {
+				cfg.Logger.Fatal("failed to repair WAL", zap.Error(err))
 			} else {
-				lg.Info("repaired WAL", zap.Error(err))
+				cfg.Logger.Info("repaired WAL", zap.Error(err))
 				repaired = true
 			}
 			continue
@@ -518,23 +605,20 @@ func bootstrapWALFromSnapshot(lg *zap.Logger, waldir string, snapshot *raftpb.Sn
 		pbutil.MustUnmarshal(&metadata, wmetadata)
 		id := types.ID(metadata.NodeID)
 		cid := types.ID(metadata.ClusterID)
-		return &bootstrappedWAL{
-			lg:       lg,
-			w:        w,
-			id:       id,
-			cid:      cid,
-			st:       &st,
-			ents:     ents,
-			snapshot: snapshot,
-		}
+		meta := &snapshotMetadata{clusterID: cid, nodeID: id}
+		return w, &st, ents, snapshot, meta
 	}
 }
 
-func bootstrapNewWAL(cfg config.ServerConfig, nodeID, clusterID types.ID) *bootstrappedWAL {
+type snapshotMetadata struct {
+	nodeID, clusterID types.ID
+}
+
+func bootstrapNewWAL(cfg config.ServerConfig, cl *bootstrapedCluster) *bootstrappedWAL {
 	metadata := pbutil.MustMarshal(
 		&etcdserverpb.Metadata{
-			NodeID:    uint64(nodeID),
-			ClusterID: uint64(clusterID),
+			NodeID:    uint64(cl.nodeID),
+			ClusterID: uint64(cl.cl.ID()),
 		},
 	)
 	w, err := wal.Create(cfg.Logger, cfg.WALDir(), metadata)
@@ -547,19 +631,18 @@ func bootstrapNewWAL(cfg config.ServerConfig, nodeID, clusterID types.ID) *boots
 	return &bootstrappedWAL{
 		lg: cfg.Logger,
 		w:  w,
-		id:  nodeID,
-		cid: clusterID,
 	}
 }
 
 type bootstrappedWAL struct {
 	lg *zap.Logger
 
+	haveWAL  bool
 	w        *wal.WAL
-	id, cid  types.ID
 	st       *raftpb.HardState
 	ents     []raftpb.Entry
 	snapshot *raftpb.Snapshot
+	meta     *snapshotMetadata
 }
 
 func (wal *bootstrappedWAL) MemoryStorage() *raft.MemoryStorage {
@@ -591,11 +674,11 @@ func (wal *bootstrappedWAL) CommitedEntries() []raftpb.Entry {
 	return wal.ents
 }
 
-func (wal *bootstrappedWAL) ConfigChangeEntries() []raftpb.Entry {
+func (wal *bootstrappedWAL) NewConfigChangeEntries() []raftpb.Entry {
 	return serverstorage.CreateConfigChangeEnts(
 		wal.lg,
-		serverstorage.GetIDs(wal.lg, wal.snapshot, wal.ents),
-		uint64(wal.id),
+		serverstorage.GetEffectiveNodeIDsFromWalEntries(wal.lg, wal.snapshot, wal.ents),
+		uint64(wal.meta.nodeID),
 		wal.st.Term,
 		wal.st.Commit,
 	)

View File

@@ -67,7 +67,7 @@ func TestGetIDs(t *testing.T) {
 		if tt.confState != nil {
 			snap.Metadata.ConfState = *tt.confState
 		}
-		idSet := serverstorage.GetIDs(testLogger, &snap, tt.ents)
+		idSet := serverstorage.GetEffectiveNodeIDsFromWalEntries(testLogger, &snap, tt.ents)
 		if !reflect.DeepEqual(idSet, tt.widSet) {
 			t.Errorf("#%d: idset = %#v, want %#v", i, idSet, tt.widSet)
 		}

View File

@@ -304,12 +304,12 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 	defer func() {
 		if err != nil {
-			b.be.Close()
+			b.Close()
 		}
 	}()
 
-	sstats := stats.NewServerStats(cfg.Name, b.raft.wal.id.String())
-	lstats := stats.NewLeaderStats(cfg.Logger, b.raft.wal.id.String())
+	sstats := stats.NewServerStats(cfg.Name, b.cluster.cl.String())
+	lstats := stats.NewLeaderStats(cfg.Logger, b.cluster.nodeID.String())
 
 	heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
 	srv = &EtcdServer{
@@ -318,28 +318,28 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 		lgMu:        new(sync.RWMutex),
 		lg:          cfg.Logger,
 		errorc:      make(chan error, 1),
-		v2store:     b.st,
+		v2store:     b.storage.st,
 		snapshotter: b.ss,
-		r:           *b.raft.newRaftNode(b.ss),
-		id:          b.raft.wal.id,
+		r:           *b.raft.newRaftNode(b.ss, b.storage.wal.w, b.cluster.cl),
+		id:          b.cluster.nodeID,
 		attributes:  membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
-		cluster:     b.raft.cl,
+		cluster:     b.cluster.cl,
 		stats:       sstats,
 		lstats:      lstats,
 		SyncTicker:  time.NewTicker(500 * time.Millisecond),
 		peerRt:      b.prt,
-		reqIDGen:    idutil.NewGenerator(uint16(b.raft.wal.id), time.Now()),
+		reqIDGen:    idutil.NewGenerator(uint16(b.cluster.nodeID), time.Now()),
 		AccessController:     &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
-		consistIndex:          b.ci,
+		consistIndex:          b.storage.backend.ci,
 		firstCommitInTerm:     notify.NewNotifier(),
 		clusterVersionChanged: notify.NewNotifier(),
 	}
-	serverID.With(prometheus.Labels{"server_id": b.raft.wal.id.String()}).Set(1)
+	serverID.With(prometheus.Labels{"server_id": b.cluster.nodeID.String()}).Set(1)
 	srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged)
 
 	srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)
 
-	srv.be = b.be
-	srv.beHooks = b.beHooks
+	srv.be = b.storage.backend.be
+	srv.beHooks = b.storage.backend.beHooks
 	minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat
 
 	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
@@ -403,9 +403,9 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 		Logger:      cfg.Logger,
 		TLSInfo:     cfg.PeerTLSInfo,
 		DialTimeout: cfg.PeerDialTimeout(),
-		ID:          b.raft.wal.id,
+		ID:          b.cluster.nodeID,
 		URLs:        cfg.PeerURLs,
-		ClusterID:   b.raft.cl.ID(),
+		ClusterID:   b.cluster.cl.ID(),
 		Raft:        srv,
 		Snapshotter: b.ss,
 		ServerStats: sstats,
@@ -416,13 +416,13 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 		return nil, err
 	}
 	// add all remotes into transport
-	for _, m := range b.remotes {
-		if m.ID != b.raft.wal.id {
+	for _, m := range b.cluster.remotes {
+		if m.ID != b.cluster.nodeID {
 			tr.AddRemote(m.ID, m.PeerURLs)
 		}
 	}
-	for _, m := range b.raft.cl.Members() {
-		if m.ID != b.raft.wal.id {
+	for _, m := range b.cluster.cl.Members() {
+		if m.ID != b.cluster.nodeID {
 			tr.AddPeer(m.ID, m.PeerURLs)
 		}
 	}

View File

@@ -109,13 +109,13 @@ func CreateConfigChangeEnts(lg *zap.Logger, ids []uint64, self uint64, term, ind
 	return ents
 }
 
-// GetIDs returns an ordered set of IDs included in the given snapshot and
+// GetEffectiveNodeIDsFromWalEntries returns an ordered set of IDs included in the given snapshot and
 // the entries. The given snapshot/entries can contain three kinds of
 // ID-related entry:
 // - ConfChangeAddNode, in which case the contained ID will Be added into the set.
 // - ConfChangeRemoveNode, in which case the contained ID will Be removed from the set.
 // - ConfChangeAddLearnerNode, in which the contained ID will Be added into the set.
-func GetIDs(lg *zap.Logger, snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
+func GetEffectiveNodeIDsFromWalEntries(lg *zap.Logger, snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
 	ids := make(map[uint64]bool)
 	if snap != nil {
 		for _, id := range snap.Metadata.ConfState.Voters {
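
The renamed helper keeps GetIDs' behavior: start from the snapshot's voters, then replay conf-change entries on top. A minimal sketch of calling it is below; the import paths for the server storage and raftpb packages are my assumption from the repository layout, not taken from this diff.

```go
package main

import (
	"fmt"

	"go.etcd.io/etcd/raft/v3/raftpb"
	serverstorage "go.etcd.io/etcd/server/v3/storage"
	"go.uber.org/zap"
)

func main() {
	lg := zap.NewExample()

	// Snapshot state: nodes 1 and 2 are voters.
	snap := &raftpb.Snapshot{}
	snap.Metadata.ConfState.Voters = []uint64{1, 2}

	// A WAL entry after the snapshot removes node 2.
	cc := raftpb.ConfChange{Type: raftpb.ConfChangeRemoveNode, NodeID: 2}
	data, err := cc.Marshal()
	if err != nil {
		panic(err)
	}
	ents := []raftpb.Entry{{
		Type:  raftpb.EntryConfChange,
		Term:  1,
		Index: snap.Metadata.Index + 1,
		Data:  data,
	}}

	// Effective membership after replaying the entries on top of the snapshot.
	ids := serverstorage.GetEffectiveNodeIDsFromWalEntries(lg, snap, ents)
	fmt.Println(ids) // expected: [1]
}
```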