Merge pull request #13194 from serathius/bootstrap

Refactor NewServer function
Marek Siarkowicz 2021-07-08 14:20:23 +02:00 committed by GitHub
commit 1e32a0830b
3 changed files with 460 additions and 368 deletions

View File

@@ -23,7 +23,6 @@ import (
     "sync"
     "time"

-    pb "go.etcd.io/etcd/api/v3/etcdserverpb"
     "go.etcd.io/etcd/client/pkg/v3/logutil"
     "go.etcd.io/etcd/client/pkg/v3/types"
     "go.etcd.io/etcd/pkg/v3/contention"
@@ -33,8 +32,7 @@ import (
     "go.etcd.io/etcd/server/v3/config"
     "go.etcd.io/etcd/server/v3/etcdserver/api/membership"
     "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
-    "go.etcd.io/etcd/server/v3/wal"
-    "go.etcd.io/etcd/server/v3/wal/walpb"
+    "go.etcd.io/etcd/server/v3/etcdserver/api/snap"
     "go.uber.org/zap"
 )
@@ -420,156 +418,90 @@ func (r *raftNode) advanceTicks(ticks int) {
     }
 }

-func startNode(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
-    var err error
+func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) *bootstrappedRaft {
     member := cl.MemberByName(cfg.Name)
-    metadata := pbutil.MustMarshal(
-        &pb.Metadata{
-            NodeID:    uint64(member.ID),
-            ClusterID: uint64(cl.ID()),
-        },
-    )
-    if w, err = wal.Create(cfg.Logger, cfg.WALDir(), metadata); err != nil {
-        cfg.Logger.Panic("failed to create WAL", zap.Error(err))
-    }
-    if cfg.UnsafeNoFsync {
-        w.SetUnsafeNoFsync()
-    }
+    id := member.ID
+    wal := bootstrapNewWAL(cfg, id, cl.ID())
     peers := make([]raft.Peer, len(ids))
     for i, id := range ids {
         var ctx []byte
-        ctx, err = json.Marshal((*cl).Member(id))
+        ctx, err := json.Marshal((*cl).Member(id))
         if err != nil {
             cfg.Logger.Panic("failed to marshal member", zap.Error(err))
         }
         peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
     }
-    id = member.ID
     cfg.Logger.Info(
         "starting local member",
         zap.String("local-member-id", id.String()),
         zap.String("cluster-id", cl.ID().String()),
     )
-    s = raft.NewMemoryStorage()
-    c := &raft.Config{
-        ID:              uint64(id),
-        ElectionTick:    cfg.ElectionTicks,
-        HeartbeatTick:   1,
-        Storage:         s,
-        MaxSizePerMsg:   maxSizePerMsg,
-        MaxInflightMsgs: maxInflightMsgs,
-        CheckQuorum:     true,
-        PreVote:         cfg.PreVote,
-        Logger:          NewRaftLoggerZap(cfg.Logger.Named("raft")),
-    }
-    if len(peers) == 0 {
-        n = raft.RestartNode(c)
-    } else {
-        n = raft.StartNode(c, peers)
-    }
-    raftStatusMu.Lock()
-    raftStatus = n.Status
-    raftStatusMu.Unlock()
-    return id, n, s, w
+    s := wal.MemoryStorage()
+    return &bootstrappedRaft{
+        lg:        cfg.Logger,
+        heartbeat: time.Duration(cfg.TickMs) * time.Millisecond,
+        cl:        cl,
+        config:    raftConfig(cfg, uint64(wal.id), s),
+        peers:     peers,
+        storage:   s,
+        wal:       wal,
+    }
 }
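The peer-construction loop that survives the refactor is the part that matters for a brand-new cluster: each raft.Peer carries the JSON-encoded member as its Context, and a non-empty peer list is what later makes newRaftNode call raft.StartNode rather than raft.RestartNode. A minimal standalone sketch of that pattern against the public go.etcd.io/etcd/raft/v3 API (the member type, IDs and URLs here are made up for illustration, not etcd's own types):

package main

import (
	"encoding/json"
	"log"

	"go.etcd.io/etcd/raft/v3"
)

// fakeMember stands in for membership.Member; only the JSON payload matters here.
type fakeMember struct {
	ID       uint64   `json:"id"`
	PeerURLs []string `json:"peerURLs"`
}

func main() {
	members := []fakeMember{
		{ID: 1, PeerURLs: []string{"http://127.0.0.1:2380"}},
		{ID: 2, PeerURLs: []string{"http://127.0.0.1:12380"}},
	}

	// Mirror of the loop kept in bootstrapRaftFromCluster: the marshaled member
	// becomes the peer Context that raft hands back in the bootstrap ConfChange.
	peers := make([]raft.Peer, len(members))
	for i, m := range members {
		ctx, err := json.Marshal(m)
		if err != nil {
			log.Fatalf("failed to marshal member: %v", err)
		}
		peers[i] = raft.Peer{ID: m.ID, Context: ctx}
	}

	storage := raft.NewMemoryStorage()
	c := &raft.Config{
		ID:              1,
		ElectionTick:    10,
		HeartbeatTick:   1,
		Storage:         storage,
		MaxSizePerMsg:   1024 * 1024,
		MaxInflightMsgs: 256,
	}
	// A non-empty peer list means "bootstrap a new cluster".
	n := raft.StartNode(c, peers)
	defer n.Stop()
}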
-func restartNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
-    var walsnap walpb.Snapshot
-    if snapshot != nil {
-        walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
-    }
-    w, id, cid, st, ents := readWAL(cfg.Logger, cfg.WALDir(), walsnap, cfg.UnsafeNoFsync)
+func bootstrapRaftFromWal(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bootstrappedRaft {
+    wal := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync)
     cfg.Logger.Info(
         "restarting local member",
-        zap.String("cluster-id", cid.String()),
-        zap.String("local-member-id", id.String()),
-        zap.Uint64("commit-index", st.Commit),
+        zap.String("cluster-id", wal.cid.String()),
+        zap.String("local-member-id", wal.id.String()),
+        zap.Uint64("commit-index", wal.st.Commit),
     )
     cl := membership.NewCluster(cfg.Logger)
-    cl.SetID(id, cid)
-    s := raft.NewMemoryStorage()
-    if snapshot != nil {
-        s.ApplySnapshot(*snapshot)
-    }
-    s.SetHardState(st)
-    s.Append(ents)
-    c := &raft.Config{
-        ID:              uint64(id),
-        ElectionTick:    cfg.ElectionTicks,
-        HeartbeatTick:   1,
-        Storage:         s,
-        MaxSizePerMsg:   maxSizePerMsg,
-        MaxInflightMsgs: maxInflightMsgs,
-        CheckQuorum:     true,
-        PreVote:         cfg.PreVote,
-        Logger:          NewRaftLoggerZap(cfg.Logger.Named("raft")),
-    }
+    cl.SetID(wal.id, wal.cid)
+    s := wal.MemoryStorage()
+    return &bootstrappedRaft{
+        lg:        cfg.Logger,
+        heartbeat: time.Duration(cfg.TickMs) * time.Millisecond,
+        cl:        cl,
+        config:    raftConfig(cfg, uint64(wal.id), s),
+        storage:   s,
+        wal:       wal,
+    }
 }
-    n := raft.RestartNode(c)
-    raftStatusMu.Lock()
-    raftStatus = n.Status
-    raftStatusMu.Unlock()
-    return id, cl, n, s, w
-}
-
-func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
-    var walsnap walpb.Snapshot
-    if snapshot != nil {
-        walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
-    }
-    w, id, cid, st, ents := readWAL(cfg.Logger, cfg.WALDir(), walsnap, cfg.UnsafeNoFsync)
+func bootstrapRaftFromWalStandalone(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bootstrappedRaft {
+    wal := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync)
     // discard the previously uncommitted entries
-    for i, ent := range ents {
-        if ent.Index > st.Commit {
-            cfg.Logger.Info(
-                "discarding uncommitted WAL entries",
-                zap.Uint64("entry-index", ent.Index),
-                zap.Uint64("commit-index-from-wal", st.Commit),
-                zap.Int("number-of-discarded-entries", len(ents)-i),
-            )
-            ents = ents[:i]
-            break
-        }
-    }
-    // force append the configuration change entries
-    toAppEnts := createConfigChangeEnts(
-        cfg.Logger,
-        getIDs(cfg.Logger, snapshot, ents),
-        uint64(id),
-        st.Term,
-        st.Commit,
-    )
-    ents = append(ents, toAppEnts...)
-    // force commit newly appended entries
-    err := w.Save(raftpb.HardState{}, toAppEnts)
-    if err != nil {
-        cfg.Logger.Fatal("failed to save hard state and entries", zap.Error(err))
-    }
-    if len(ents) != 0 {
-        st.Commit = ents[len(ents)-1].Index
-    }
+    wal.ents = wal.CommitedEntries()
+    entries := wal.ConfigChangeEntries()
+    // force commit config change entries
+    wal.AppendAndCommitEntries(entries)
     cfg.Logger.Info(
         "forcing restart member",
-        zap.String("cluster-id", cid.String()),
-        zap.String("local-member-id", id.String()),
-        zap.Uint64("commit-index", st.Commit),
+        zap.String("cluster-id", wal.cid.String()),
+        zap.String("local-member-id", wal.id.String()),
+        zap.Uint64("commit-index", wal.st.Commit),
     )
     cl := membership.NewCluster(cfg.Logger)
-    cl.SetID(id, cid)
-    s := raft.NewMemoryStorage()
-    if snapshot != nil {
-        s.ApplySnapshot(*snapshot)
-    }
-    s.SetHardState(st)
-    s.Append(ents)
+    cl.SetID(wal.id, wal.cid)
+    s := wal.MemoryStorage()
+    return &bootstrappedRaft{
+        lg:        cfg.Logger,
+        heartbeat: time.Duration(cfg.TickMs) * time.Millisecond,
+        cl:        cl,
+        config:    raftConfig(cfg, uint64(wal.id), s),
+        storage:   s,
+        wal:       wal,
+    }
+}
+
+func raftConfig(cfg config.ServerConfig, id uint64, s *raft.MemoryStorage) *raft.Config {
-    c := &raft.Config{
-        ID:              uint64(id),
+    return &raft.Config{
+        ID:              id,
         ElectionTick:    cfg.ElectionTicks,
         HeartbeatTick:   1,
         Storage:         s,
@@ -579,10 +511,39 @@ func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot)
         PreVote:         cfg.PreVote,
         Logger:          NewRaftLoggerZap(cfg.Logger.Named("raft")),
     }
+}
-    n := raft.RestartNode(c)
+
+type bootstrappedRaft struct {
+    lg        *zap.Logger
+    heartbeat time.Duration
+    peers     []raft.Peer
+    config    *raft.Config
+    cl        *membership.RaftCluster
+    storage   *raft.MemoryStorage
+    wal       *bootstrappedWAL
+}
+
+func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter) *raftNode {
+    var n raft.Node
+    if len(b.peers) == 0 {
+        n = raft.RestartNode(b.config)
+    } else {
+        n = raft.StartNode(b.config, b.peers)
+    }
     raftStatusMu.Lock()
     raftStatus = n.Status
     raftStatusMu.Unlock()
-    return id, cl, n, s, w
+    return newRaftNode(
+        raftNodeConfig{
+            lg:          b.lg,
+            isIDRemoved: func(id uint64) bool { return b.cl.IsIDRemoved(types.ID(id)) },
+            Node:        n,
+            heartbeat:   b.heartbeat,
+            raftStorage: b.storage,
+            storage:     NewStorage(b.wal.w, ss),
+        },
+    )
 }

 // getIDs returns an ordered set of IDs included in the given snapshot and
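newRaftNode is now the single place where the start-versus-restart decision is made: an empty peer list (the WAL paths and the join-existing path, which passes nil ids) means raft.RestartNode, while a non-empty list (a brand-new cluster) means raft.StartNode. A rough sketch of that branch in isolation, again over the public raft API and with placeholder values:

package main

import (
	"go.etcd.io/etcd/raft/v3"
	"go.etcd.io/etcd/raft/v3/raftpb"
)

// startOrRestart mimics the branch in (*bootstrappedRaft).newRaftNode:
// no peers -> the node's state already lives in WAL/snapshot, so restart it;
// peers present -> this is a fresh cluster bootstrap.
func startOrRestart(c *raft.Config, peers []raft.Peer) raft.Node {
	if len(peers) == 0 {
		return raft.RestartNode(c)
	}
	return raft.StartNode(c, peers)
}

func main() {
	storage := raft.NewMemoryStorage()
	// Pretend a previous run left a hard state behind (the restart case).
	storage.SetHardState(raftpb.HardState{Term: 2, Vote: 1, Commit: 0})

	c := &raft.Config{
		ID:              1,
		ElectionTick:    10,
		HeartbeatTick:   1,
		Storage:         storage,
		MaxSizePerMsg:   1024 * 1024,
		MaxInflightMsgs: 256,
	}
	n := startOrRestart(c, nil) // nil peers -> RestartNode
	n.Stop()
}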

View File

@@ -330,19 +330,9 @@ func (bh *backendHooks) SetConfState(confState *raftpb.ConfState) {
     bh.confStateDirty = true
 }

-// NewServer creates a new EtcdServer from the supplied configuration. The
-// configuration is considered static for the lifetime of the EtcdServer.
-func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
+func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
     st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
-    var (
-        w  *wal.WAL
-        n  raft.Node
-        s  *raft.MemoryStorage
-        id types.ID
-        cl *membership.RaftCluster
-    )

     if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
         cfg.Logger.Warn(
             "exceeded recommended request limit",
@@ -358,43 +348,12 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
     }

     haveWAL := wal.Exist(cfg.WALDir())
+    ss := bootstrapSnapshot(cfg)

-    if err = fileutil.TouchDirAll(cfg.SnapDir()); err != nil {
-        cfg.Logger.Fatal(
-            "failed to create snapshot directory",
-            zap.String("path", cfg.SnapDir()),
-            zap.Error(err),
-        )
-    }
-    if err = fileutil.RemoveMatchFile(cfg.Logger, cfg.SnapDir(), func(fileName string) bool {
-        return strings.HasPrefix(fileName, "tmp")
-    }); err != nil {
-        cfg.Logger.Error(
-            "failed to remove temp file(s) in snapshot directory",
-            zap.String("path", cfg.SnapDir()),
-            zap.Error(err),
-        )
-    }
-    ss := snap.New(cfg.Logger, cfg.SnapDir())
-
-    bepath := cfg.BackendPath()
-    beExist := fileutil.Exist(bepath)
-
-    ci := cindex.NewConsistentIndex(nil)
-    beHooks := &backendHooks{lg: cfg.Logger, indexer: ci}
-    be := openBackend(cfg, beHooks)
-    ci.SetBackend(be)
-    buckets.CreateMetaBucket(be.BatchTx())
-
-    if cfg.ExperimentalBootstrapDefragThresholdMegabytes != 0 {
-        err := maybeDefragBackend(cfg, be)
+    be, ci, beExist, beHooks, err := bootstrapBackend(cfg)
     if err != nil {
         return nil, err
     }
-    }

     defer func() {
         if err != nil {
             be.Close()
@@ -405,17 +364,89 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
     if err != nil {
         return nil, err
     }
-    var (
-        remotes  []*membership.Member
-        snapshot *raftpb.Snapshot
-    )

     switch {
     case !haveWAL && !cfg.NewCluster:
-        if err = cfg.VerifyJoinExisting(); err != nil {
+        b, err = bootstrapExistingClusterNoWAL(cfg, prt, st, be)
+    case !haveWAL && cfg.NewCluster:
+        b, err = bootstrapNewClusterNoWAL(cfg, prt, st, be)
+    case haveWAL:
+        b, err = bootstrapWithWAL(cfg, st, be, ss, beExist, beHooks, ci)
+    default:
+        be.Close()
+        return nil, fmt.Errorf("unsupported bootstrap config")
+    }
+    if err != nil {
         return nil, err
     }
-        cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)

+    if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
+        return nil, fmt.Errorf("cannot access member directory: %v", terr)
+    }
+    b.prt = prt
+    b.ci = ci
+    b.st = st
+    b.be = be
+    b.ss = ss
+    b.beHooks = beHooks
+    return b, nil
+}
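The three bootstrap paths now dispatch on exactly two booleans: whether a WAL directory already exists and whether the member is configured to start a new cluster. A sketch of that decision table, using the real wal.Exist helper but a hypothetical data directory:

package main

import (
	"fmt"

	"go.etcd.io/etcd/server/v3/wal"
)

// bootstrapPath reproduces the switch in bootstrap(): the same two flags pick
// one of the three helpers (existing-cluster join, new cluster, or WAL replay).
func bootstrapPath(haveWAL, newCluster bool) string {
	switch {
	case !haveWAL && !newCluster:
		return "bootstrapExistingClusterNoWAL" // join an existing cluster
	case !haveWAL && newCluster:
		return "bootstrapNewClusterNoWAL" // first start of a brand-new cluster
	case haveWAL:
		return "bootstrapWithWAL" // restart: replay snapshot + WAL
	default:
		return "unsupported bootstrap config"
	}
}

func main() {
	// Hypothetical directory; wal.Exist only checks for *.wal files in it.
	haveWAL := wal.Exist("/var/lib/etcd/member/wal")
	fmt.Println(bootstrapPath(haveWAL, false))
}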
+
+type bootstrappedServer struct {
+    raft    *bootstrappedRaft
+    remotes []*membership.Member
+    prt     http.RoundTripper
+    ci      cindex.ConsistentIndexer
+    st      v2store.Store
+    be      backend.Backend
+    ss      *snap.Snapshotter
+    beHooks *backendHooks
+}
+
+func bootstrapSnapshot(cfg config.ServerConfig) *snap.Snapshotter {
+    if err := fileutil.TouchDirAll(cfg.SnapDir()); err != nil {
+        cfg.Logger.Fatal(
+            "failed to create snapshot directory",
+            zap.String("path", cfg.SnapDir()),
+            zap.Error(err),
+        )
+    }
+    if err := fileutil.RemoveMatchFile(cfg.Logger, cfg.SnapDir(), func(fileName string) bool {
+        return strings.HasPrefix(fileName, "tmp")
+    }); err != nil {
+        cfg.Logger.Error(
+            "failed to remove temp file(s) in snapshot directory",
+            zap.String("path", cfg.SnapDir()),
+            zap.Error(err),
+        )
+    }
+    return snap.New(cfg.Logger, cfg.SnapDir())
+}
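bootstrapSnapshot only prepares the snapshot directory and hands back a snap.Snapshotter; loading the latest snapshot still happens later, in the WAL path. For orientation, a small sketch of the Snapshotter it returns; the directory path is hypothetical, and the Load/ErrNoSnapshot handling is my assumption about typical usage of that package, not something shown in this diff:

package main

import (
	"errors"
	"fmt"

	"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
	"go.uber.org/zap"
)

func main() {
	lg := zap.NewExample()

	// Same constructor bootstrapSnapshot ends with; the directory must already exist.
	ss := snap.New(lg, "/var/lib/etcd/member/snap")

	s, err := ss.Load()
	switch {
	case err == nil:
		fmt.Printf("latest snapshot: index=%d term=%d\n", s.Metadata.Index, s.Metadata.Term)
	case errors.Is(err, snap.ErrNoSnapshot):
		fmt.Println("no snapshot yet; recovering WAL from scratch")
	default:
		lg.Fatal("failed to load snapshot", zap.Error(err))
	}
}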
+
+func bootstrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.ConsistentIndexer, beExist bool, beHooks *backendHooks, err error) {
+    beExist = fileutil.Exist(cfg.BackendPath())
+    ci = cindex.NewConsistentIndex(nil)
+    beHooks = &backendHooks{lg: cfg.Logger, indexer: ci}
+    be = openBackend(cfg, beHooks)
+    ci.SetBackend(be)
+    buckets.CreateMetaBucket(be.BatchTx())
+    if cfg.ExperimentalBootstrapDefragThresholdMegabytes != 0 {
+        err := maybeDefragBackend(cfg, be)
+        if err != nil {
+            be.Close()
+            return nil, nil, false, nil, err
+        }
+    }
+    cfg.Logger.Debug("restore consistentIndex", zap.Uint64("index", ci.ConsistentIndex()))
+    return be, ci, beExist, beHooks, nil
+}
+
+func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*bootstrappedServer, error) {
+    if err := cfg.VerifyJoinExisting(); err != nil {
+        return nil, err
+    }
+    cl, err := membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
     if err != nil {
         return nil, err
     }
@@ -423,25 +454,30 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
     if gerr != nil {
         return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr)
     }
-    if err = membership.ValidateClusterAndAssignIDs(cfg.Logger, cl, existingCluster); err != nil {
+    if err := membership.ValidateClusterAndAssignIDs(cfg.Logger, cl, existingCluster); err != nil {
         return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
     }
     if !isCompatibleWithCluster(cfg.Logger, cl, cl.MemberByName(cfg.Name).ID, prt) {
         return nil, fmt.Errorf("incompatible with current running cluster")
     }
-    remotes = existingCluster.Members()
+    remotes := existingCluster.Members()
     cl.SetID(types.ID(0), existingCluster.ID())
     cl.SetStore(st)
     cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
-    id, n, s, w = startNode(cfg, cl, nil)
-    cl.SetID(id, existingCluster.ID())
+    br := bootstrapRaftFromCluster(cfg, cl, nil)
+    cl.SetID(br.wal.id, existingCluster.ID())
+    return &bootstrappedServer{
+        raft:    br,
+        remotes: remotes,
+    }, nil
+}
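Both no-WAL paths begin the same way: parse the initial-cluster string into a URLs map and build a RaftCluster from it; the join path then validates that result against what the existing peers report. A sketch of just the parsing step, with made-up member names and URLs (types.NewURLsMap is the parser behind cfg.InitialPeerURLsMap):

package main

import (
	"fmt"
	"log"

	"go.etcd.io/etcd/client/pkg/v3/types"
)

func main() {
	// Same format as etcd's --initial-cluster flag (values are illustrative).
	initialCluster := "infra0=http://10.0.0.1:2380,infra1=http://10.0.0.2:2380,infra2=http://10.0.0.3:2380"

	// The resulting map is what the bootstrap helpers hand to
	// membership.NewClusterFromURLsMap together with the cluster token.
	urlsMap, err := types.NewURLsMap(initialCluster)
	if err != nil {
		log.Fatalf("invalid initial cluster: %v", err)
	}
	for name, urls := range urlsMap {
		fmt.Printf("%s -> %s\n", name, urls.String())
	}
}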
-    case !haveWAL && cfg.NewCluster:
-        if err = cfg.VerifyBootstrap(); err != nil {
+
+func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*bootstrappedServer, error) {
+    if err := cfg.VerifyBootstrap(); err != nil {
         return nil, err
     }
-        cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
+    cl, err := membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
     if err != nil {
         return nil, err
     }
@@ -469,15 +505,20 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
     }
     cl.SetStore(st)
     cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
-    id, n, s, w = startNode(cfg, cl, cl.MemberIDs())
-    cl.SetID(id, cl.ID())
+    br := bootstrapRaftFromCluster(cfg, cl, cl.MemberIDs())
+    cl.SetID(br.wal.id, cl.ID())
+    return &bootstrappedServer{
+        remotes: nil,
+        raft:    br,
+    }, nil
+}
-    case haveWAL:
-        if err = fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
+
+func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backend, ss *snap.Snapshotter, beExist bool, beHooks *backendHooks, ci cindex.ConsistentIndexer) (*bootstrappedServer, error) {
+    if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
         return nil, fmt.Errorf("cannot write to member directory: %v", err)
     }
-        if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
+    if err := fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
         return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
     }
@@ -527,34 +568,58 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
             zap.Int64("backend-size-in-use-bytes", s2),
             zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))),
         )
+        if beExist {
+            // TODO: remove kvindex != 0 checking when we do not expect users to upgrade
+            // etcd from pre-3.0 release.
+            kvindex := ci.ConsistentIndex()
+            if kvindex < snapshot.Metadata.Index {
+                if kvindex != 0 {
+                    return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", cfg.BackendPath(), kvindex, snapshot.Metadata.Index)
+                }
+                cfg.Logger.Warn(
+                    "consistent index was never saved",
+                    zap.Uint64("snapshot-index", snapshot.Metadata.Index),
+                )
+            }
+        }
     } else {
         cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!")
     }
+    r := &bootstrappedServer{}
     if !cfg.ForceNewCluster {
-        id, cl, n, s, w = restartNode(cfg, snapshot)
+        r.raft = bootstrapRaftFromWal(cfg, snapshot)
     } else {
-        id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot)
+        r.raft = bootstrapRaftFromWalStandalone(cfg, snapshot)
     }
-    cl.SetStore(st)
-    cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
-    cl.Recover(api.UpdateCapability)
-    if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
+    r.raft.cl.SetStore(st)
+    r.raft.cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
+    r.raft.cl.Recover(api.UpdateCapability)
+    if r.raft.cl.Version() != nil && !r.raft.cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
+        bepath := cfg.BackendPath()
         os.RemoveAll(bepath)
         return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
     }
-    default:
-        return nil, fmt.Errorf("unsupported bootstrap config")
+    return r, nil
     }
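The consistent-index check that used to sit near the end of NewServer now runs inside bootstrapWithWAL, right after the snapshot is loaded. The rule itself is unchanged: if the backend file already existed, its stored consistent index must not lag behind the snapshot index, except for the legacy case where no index was ever persisted (index 0). A distilled version of that guard as a plain function, with made-up numbers in main:

package main

import (
	"fmt"
	"log"
)

// validateConsistentIndex mirrors the check moved into bootstrapWithWAL:
// an existing backend whose consistent index is behind the snapshot index is
// mismatched, unless the index was never persisted at all (kvindex == 0).
func validateConsistentIndex(beExist bool, kvindex, snapshotIndex uint64) error {
	if !beExist {
		return nil
	}
	if kvindex < snapshotIndex {
		if kvindex != 0 {
			return fmt.Errorf("database file (index %d) does not match with snapshot (index %d)",
				kvindex, snapshotIndex)
		}
		log.Printf("warning: consistent index was never saved (snapshot-index %d)", snapshotIndex)
	}
	return nil
}

func main() {
	// Illustrative values only.
	if err := validateConsistentIndex(true, 95, 100); err != nil {
		fmt.Println("refusing to start:", err) // 95 < 100 and non-zero -> error
	}
	fmt.Println(validateConsistentIndex(true, 0, 100)) // legacy case: warn only, returns <nil>
}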
-    if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
-        return nil, fmt.Errorf("cannot access member directory: %v", terr)
-    }
-
-    sstats := stats.NewServerStats(cfg.Name, id.String())
-    lstats := stats.NewLeaderStats(cfg.Logger, id.String())
+
+// NewServer creates a new EtcdServer from the supplied configuration. The
+// configuration is considered static for the lifetime of the EtcdServer.
+func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
+    b, err := bootstrap(cfg)
+    if err != nil {
+        return nil, err
+    }
+    defer func() {
+        if err != nil {
+            b.be.Close()
+        }
+    }()
+
+    sstats := stats.NewServerStats(cfg.Name, b.raft.wal.id.String())
+    lstats := stats.NewLeaderStats(cfg.Logger, b.raft.wal.id.String())

     heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
     srv = &EtcdServer{
@@ -563,36 +628,27 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
         lgMu:   new(sync.RWMutex),
         lg:     cfg.Logger,
         errorc: make(chan error, 1),
-        v2store:     st,
-        snapshotter: ss,
-        r: *newRaftNode(
-            raftNodeConfig{
-                lg:          cfg.Logger,
-                isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
-                Node:        n,
-                heartbeat:   heartbeat,
-                raftStorage: s,
-                storage:     NewStorage(w, ss),
-            },
-        ),
-        id:         id,
+        v2store:     b.st,
+        snapshotter: b.ss,
+        r:           *b.raft.newRaftNode(b.ss),
+        id:          b.raft.wal.id,
         attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
-        cluster:    cl,
+        cluster:    b.raft.cl,
         stats:      sstats,
         lstats:     lstats,
         SyncTicker: time.NewTicker(500 * time.Millisecond),
-        peerRt:     prt,
-        reqIDGen:   idutil.NewGenerator(uint16(id), time.Now()),
+        peerRt:     b.prt,
+        reqIDGen:   idutil.NewGenerator(uint16(b.raft.wal.id), time.Now()),
         AccessController:   &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
-        consistIndex:       ci,
+        consistIndex:       b.ci,
         firstCommitInTermC: make(chan struct{}),
     }
-    serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1)
+    serverID.With(prometheus.Labels{"server_id": b.raft.wal.id.String()}).Set(1)

     srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)

-    srv.be = be
-    srv.beHooks = beHooks
+    srv.be = b.be
+    srv.beHooks = b.beHooks
     minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat

     // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
@@ -620,23 +676,6 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
     }
     srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)

-    kvindex := ci.ConsistentIndex()
-    srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex))
-    if beExist {
-        // TODO: remove kvindex != 0 checking when we do not expect users to upgrade
-        // etcd from pre-3.0 release.
-        if snapshot != nil && kvindex < snapshot.Metadata.Index {
-            if kvindex != 0 {
-                return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", bepath, kvindex, snapshot.Metadata.Index)
-            }
-            cfg.Logger.Warn(
-                "consistent index was never saved",
-                zap.Uint64("snapshot-index", snapshot.Metadata.Index),
-            )
-        }
-    }
-
     srv.authStore = auth.NewAuthStore(srv.Logger(), srv.be, tp, int(cfg.BcryptCost))

     newSrv := srv // since srv == nil in defer if srv is returned as nil
@@ -673,11 +712,11 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
         Logger:      cfg.Logger,
         TLSInfo:     cfg.PeerTLSInfo,
         DialTimeout: cfg.PeerDialTimeout(),
-        ID:          id,
+        ID:          b.raft.wal.id,
         URLs:        cfg.PeerURLs,
-        ClusterID:   cl.ID(),
+        ClusterID:   b.raft.cl.ID(),
         Raft:        srv,
-        Snapshotter: ss,
+        Snapshotter: b.ss,
         ServerStats: sstats,
         LeaderStats: lstats,
         ErrorC:      srv.errorc,
@@ -686,13 +725,13 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
         return nil, err
     }
     // add all remotes into transport
-    for _, m := range remotes {
-        if m.ID != id {
+    for _, m := range b.remotes {
+        if m.ID != b.raft.wal.id {
             tr.AddRemote(m.ID, m.PeerURLs)
         }
     }
-    for _, m := range cl.Members() {
-        if m.ID != id {
+    for _, m := range b.raft.cl.Members() {
+        if m.ID != b.raft.wal.id {
             tr.AddPeer(m.ID, m.PeerURLs)
         }
     }

View File

@@ -20,7 +20,9 @@ import (
     pb "go.etcd.io/etcd/api/v3/etcdserverpb"
     "go.etcd.io/etcd/client/pkg/v3/types"
     "go.etcd.io/etcd/pkg/v3/pbutil"
+    "go.etcd.io/etcd/raft/v3"
     "go.etcd.io/etcd/raft/v3/raftpb"
+    "go.etcd.io/etcd/server/v3/config"
     "go.etcd.io/etcd/server/v3/etcdserver/api/snap"
     "go.etcd.io/etcd/server/v3/wal"
     "go.etcd.io/etcd/server/v3/wal/walpb"
@@ -80,24 +82,25 @@ func (st *storage) Release(snap raftpb.Snapshot) error {
     return st.Snapshotter.ReleaseSnapDBs(snap)
 }

-// readWAL reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear
+// bootstrapWALFromSnapshot reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear
 // after the position of the given snap in the WAL.
 // The snap must have been previously saved to the WAL, or this call will panic.
-func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot, unsafeNoFsync bool) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
-    var (
-        err       error
-        wmetadata []byte
-    )
+func bootstrapWALFromSnapshot(lg *zap.Logger, waldir string, snapshot *raftpb.Snapshot, unsafeNoFsync bool) *bootstrappedWAL {
+    var walsnap walpb.Snapshot
+    if snapshot != nil {
+        walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
+    }
     repaired := false
     for {
-        if w, err = wal.Open(lg, waldir, snap); err != nil {
+        w, err := wal.Open(lg, waldir, walsnap)
+        if err != nil {
             lg.Fatal("failed to open WAL", zap.Error(err))
         }
         if unsafeNoFsync {
             w.SetUnsafeNoFsync()
         }
-        if wmetadata, st, ents, err = w.ReadAll(); err != nil {
+        wmetadata, st, ents, err := w.ReadAll()
+        if err != nil {
             w.Close()
             // we can only repair ErrUnexpectedEOF and we never repair twice.
             if repaired || err != io.ErrUnexpectedEOF {
@@ -111,11 +114,100 @@ func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot, unsafeNoFsync b
             }
             continue
         }
-        break
-    }
-    var metadata pb.Metadata
-    pbutil.MustUnmarshal(&metadata, wmetadata)
-    id = types.ID(metadata.NodeID)
-    cid = types.ID(metadata.ClusterID)
-    return w, id, cid, st, ents
+        var metadata pb.Metadata
+        pbutil.MustUnmarshal(&metadata, wmetadata)
+        id := types.ID(metadata.NodeID)
+        cid := types.ID(metadata.ClusterID)
+        return &bootstrappedWAL{
+            lg:       lg,
+            w:        w,
+            id:       id,
+            cid:      cid,
+            st:       &st,
+            ents:     ents,
+            snapshot: snapshot,
+        }
+    }
 }
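bootstrapWALFromSnapshot keeps readWAL's open/read/repair loop but now returns everything bundled in a bootstrappedWAL. The essential shape of that loop against the public wal package, trimmed of etcd's structured logging (the directory path is hypothetical, and wal.Repair is the same helper the original readWAL used in the repair branch elided from this hunk):

package main

import (
	"io"
	"log"

	"go.etcd.io/etcd/raft/v3/raftpb"
	"go.etcd.io/etcd/server/v3/wal"
	"go.etcd.io/etcd/server/v3/wal/walpb"
	"go.uber.org/zap"
)

// readWALFrom sketches the open/read/repair loop kept by bootstrapWALFromSnapshot.
// snap marks the position the read should start from.
func readWALFrom(lg *zap.Logger, waldir string, snap walpb.Snapshot) (*wal.WAL, raftpb.HardState, []raftpb.Entry) {
	repaired := false
	for {
		w, err := wal.Open(lg, waldir, snap)
		if err != nil {
			log.Fatalf("failed to open WAL: %v", err)
		}
		_, st, ents, err := w.ReadAll() // metadata is ignored in this sketch
		if err != nil {
			w.Close()
			// Only a torn tail (io.ErrUnexpectedEOF) is repairable, and only once.
			if repaired || err != io.ErrUnexpectedEOF {
				log.Fatalf("failed to read WAL, cannot be repaired: %v", err)
			}
			if !wal.Repair(lg, waldir) {
				log.Fatalf("failed to repair WAL: %v", err)
			}
			repaired = true
			continue
		}
		return w, st, ents
	}
}

func main() {
	lg := zap.NewExample()
	w, st, ents := readWALFrom(lg, "/var/lib/etcd/member/wal", walpb.Snapshot{})
	defer w.Close()
	lg.Info("WAL read", zap.Uint64("commit", st.Commit), zap.Int("entries", len(ents)))
}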
+
+func bootstrapNewWAL(cfg config.ServerConfig, nodeID, clusterID types.ID) *bootstrappedWAL {
+    metadata := pbutil.MustMarshal(
+        &pb.Metadata{
+            NodeID:    uint64(nodeID),
+            ClusterID: uint64(clusterID),
+        },
+    )
+    w, err := wal.Create(cfg.Logger, cfg.WALDir(), metadata)
+    if err != nil {
+        cfg.Logger.Panic("failed to create WAL", zap.Error(err))
+    }
+    if cfg.UnsafeNoFsync {
+        w.SetUnsafeNoFsync()
+    }
+    return &bootstrappedWAL{
+        lg:  cfg.Logger,
+        w:   w,
+        id:  nodeID,
+        cid: clusterID,
+    }
+}
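For a fresh member, the WAL is created with a pb.Metadata record (node ID plus cluster ID) as its metadata block, which is exactly what bootstrapWALFromSnapshot unmarshals again on restart. A standalone sketch with placeholder IDs and a temporary directory:

package main

import (
	"log"
	"os"
	"path/filepath"

	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
	"go.etcd.io/etcd/pkg/v3/pbutil"
	"go.etcd.io/etcd/server/v3/wal"
	"go.uber.org/zap"
)

func main() {
	lg := zap.NewExample()

	dir, err := os.MkdirTemp("", "wal-demo")
	if err != nil {
		log.Fatal(err)
	}
	defer os.RemoveAll(dir)
	waldir := filepath.Join(dir, "wal")

	// Placeholder node/cluster IDs; in etcd these come from the membership layer.
	metadata := pbutil.MustMarshal(&pb.Metadata{NodeID: 0x1, ClusterID: 0x1000})

	// Same call bootstrapNewWAL makes; the metadata is recorded in the WAL files.
	w, err := wal.Create(lg, waldir, metadata)
	if err != nil {
		lg.Panic("failed to create WAL", zap.Error(err))
	}
	defer w.Close()
	lg.Info("created WAL", zap.String("dir", waldir))
}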
+
+type bootstrappedWAL struct {
+    lg       *zap.Logger
+    w        *wal.WAL
+    id, cid  types.ID
+    st       *raftpb.HardState
+    ents     []raftpb.Entry
+    snapshot *raftpb.Snapshot
+}
+
+func (wal *bootstrappedWAL) MemoryStorage() *raft.MemoryStorage {
+    s := raft.NewMemoryStorage()
+    if wal.snapshot != nil {
+        s.ApplySnapshot(*wal.snapshot)
+    }
+    if wal.st != nil {
+        s.SetHardState(*wal.st)
+    }
+    if len(wal.ents) != 0 {
+        s.Append(wal.ents)
+    }
+    return s
+}
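MemoryStorage() centralizes what the three old functions each did by hand: apply the snapshot first (if any), then the hard state, then append the tail of WAL entries. The order matters, because applying the snapshot resets the storage's first index so the following entries line up behind it. A self-contained sketch with fabricated snapshot and entries:

package main

import (
	"fmt"

	"go.etcd.io/etcd/raft/v3"
	"go.etcd.io/etcd/raft/v3/raftpb"
)

// rebuildStorage mirrors (*bootstrappedWAL).MemoryStorage: snapshot, then
// hard state, then the entries read back from the WAL.
func rebuildStorage(snap *raftpb.Snapshot, st *raftpb.HardState, ents []raftpb.Entry) *raft.MemoryStorage {
	s := raft.NewMemoryStorage()
	if snap != nil {
		s.ApplySnapshot(*snap)
	}
	if st != nil {
		s.SetHardState(*st)
	}
	if len(ents) != 0 {
		s.Append(ents)
	}
	return s
}

func main() {
	// Fabricated state: a snapshot at index 10 and two entries after it.
	snap := &raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 10, Term: 2}}
	st := &raftpb.HardState{Term: 2, Commit: 12}
	ents := []raftpb.Entry{
		{Index: 11, Term: 2},
		{Index: 12, Term: 2},
	}

	s := rebuildStorage(snap, st, ents)
	first, _ := s.FirstIndex()
	last, _ := s.LastIndex()
	fmt.Println(first, last) // 11 12
}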
+
+func (wal *bootstrappedWAL) CommitedEntries() []raftpb.Entry {
+    for i, ent := range wal.ents {
+        if ent.Index > wal.st.Commit {
+            wal.lg.Info(
+                "discarding uncommitted WAL entries",
+                zap.Uint64("entry-index", ent.Index),
+                zap.Uint64("commit-index-from-wal", wal.st.Commit),
+                zap.Int("number-of-discarded-entries", len(wal.ents)-i),
+            )
+            return wal.ents[:i]
+        }
+    }
+    return wal.ents
+}
+
+func (wal *bootstrappedWAL) ConfigChangeEntries() []raftpb.Entry {
+    return createConfigChangeEnts(
+        wal.lg,
+        getIDs(wal.lg, wal.snapshot, wal.ents),
+        uint64(wal.id),
+        wal.st.Term,
+        wal.st.Commit,
+    )
+}
+
+func (wal *bootstrappedWAL) AppendAndCommitEntries(ents []raftpb.Entry) {
+    wal.ents = append(wal.ents, ents...)
+    err := wal.w.Save(raftpb.HardState{}, ents)
+    if err != nil {
+        wal.lg.Fatal("failed to save hard state and entries", zap.Error(err))
+    }
+    if len(wal.ents) != 0 {
+        wal.st.Commit = wal.ents[len(wal.ents)-1].Index
+    }
+}
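Taken together, CommitedEntries, ConfigChangeEntries and AppendAndCommitEntries reproduce the old restartAsStandaloneNode sequence: drop anything past the WAL's commit index, then append, and treat as committed, the synthesized config-change entries that force-new-cluster uses to shrink membership down to this one member. The truncation step is simple enough to show on its own, with fabricated entries:

package main

import (
	"fmt"

	"go.etcd.io/etcd/raft/v3/raftpb"
)

// committedOnly mirrors (*bootstrappedWAL).CommitedEntries: keep only entries
// at or below the commit index recorded in the WAL's hard state.
func committedOnly(ents []raftpb.Entry, commit uint64) []raftpb.Entry {
	for i, ent := range ents {
		if ent.Index > commit {
			return ents[:i]
		}
	}
	return ents
}

func main() {
	ents := []raftpb.Entry{
		{Index: 5, Term: 1},
		{Index: 6, Term: 1},
		{Index: 7, Term: 2}, // uncommitted tail
		{Index: 8, Term: 2},
	}
	kept := committedOnly(ents, 6)
	fmt.Println(len(kept)) // 2: entries 7 and 8 are discarded
}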