etcdserver: Extract cluster boostrap functions
parent 16b2a8b420
commit af0439490c
@@ -331,13 +331,8 @@ func (bh *backendHooks) SetConfState(confState *raftpb.ConfState) {
}

func bootstrap(cfg config.ServerConfig) (b *boostrapResult, err error) {
	b = &boostrapResult{}
	st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)

	var (
		cl *membership.RaftCluster
	)

	if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
		cfg.Logger.Warn(
			"exceeded recommended request limit",
@@ -388,171 +383,25 @@ func bootstrap(cfg config.ServerConfig) (b *boostrapResult, err error) {
	if err != nil {
		return nil, err
	}
	var (
		remotes []*membership.Member
	)

	switch {
	case !haveWAL && !cfg.NewCluster:
		if err = cfg.VerifyJoinExisting(); err != nil {
			return nil, err
		}
		cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
		if err != nil {
			return nil, err
		}
		existingCluster, gerr := GetClusterFromRemotePeers(cfg.Logger, getRemotePeerURLs(cl, cfg.Name), prt)
		if gerr != nil {
			return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr)
		}
		if err = membership.ValidateClusterAndAssignIDs(cfg.Logger, cl, existingCluster); err != nil {
			return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
		}
		if !isCompatibleWithCluster(cfg.Logger, cl, cl.MemberByName(cfg.Name).ID, prt) {
			return nil, fmt.Errorf("incompatible with current running cluster")
		}

		remotes = existingCluster.Members()
		cl.SetID(types.ID(0), existingCluster.ID())
		cl.SetStore(st)
		cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
		b.id, b.n, b.s, b.w = startNode(cfg, cl, nil)
		cl.SetID(b.id, existingCluster.ID())
		b.cl = cl

		b, err = bootstrapExistingClusterNoWAL(cfg, prt, st, be)
	case !haveWAL && cfg.NewCluster:
		if err = cfg.VerifyBootstrap(); err != nil {
			return nil, err
		}
		cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
		if err != nil {
			return nil, err
		}
		m := cl.MemberByName(cfg.Name)
		if isMemberBootstrapped(cfg.Logger, cl, cfg.Name, prt, cfg.BootstrapTimeoutEffective()) {
			return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID)
		}
		if cfg.ShouldDiscover() {
			var str string
			str, err = v2discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String())
			if err != nil {
				return nil, &DiscoveryError{Op: "join", Err: err}
			}
			var urlsmap types.URLsMap
			urlsmap, err = types.NewURLsMap(str)
			if err != nil {
				return nil, err
			}
			if config.CheckDuplicateURL(urlsmap) {
				return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap)
			}
			if cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, urlsmap); err != nil {
				return nil, err
			}
		}
		cl.SetStore(st)
		cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
		b.id, b.n, b.s, b.w = startNode(cfg, cl, cl.MemberIDs())
		cl.SetID(b.id, cl.ID())
		b.cl = cl

		b, err = boostrapNewClusterNoWAL(cfg, prt, st, be)
	case haveWAL:
		if err = fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
			return nil, fmt.Errorf("cannot write to member directory: %v", err)
		}

		if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
			return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
		}

		if cfg.ShouldDiscover() {
			cfg.Logger.Warn(
				"discovery token is ignored since cluster already initialized; valid logs are found",
				zap.String("wal-dir", cfg.WALDir()),
			)
		}

		// Find a snapshot to start/restart a raft node
		walSnaps, err := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir())
		if err != nil {
			return nil, err
		}
		// snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding
		// wal log entries
		snapshot, err := ss.LoadNewestAvailable(walSnaps)
		if err != nil && err != snap.ErrNoSnapshot {
			return nil, err
		}

		if snapshot != nil {
			if err = st.Recovery(snapshot.Data); err != nil {
				cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err))
			}

			if err = assertNoV2StoreContent(cfg.Logger, st, cfg.V2Deprecation); err != nil {
				cfg.Logger.Error("illegal v2store content", zap.Error(err))
				return nil, err
			}

			cfg.Logger.Info(
				"recovered v2 store from snapshot",
				zap.Uint64("snapshot-index", snapshot.Metadata.Index),
				zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))),
			)

			if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil {
				cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
			}
			s1, s2 := be.Size(), be.SizeInUse()
			cfg.Logger.Info(
				"recovered v3 backend from snapshot",
				zap.Int64("backend-size-bytes", s1),
				zap.String("backend-size", humanize.Bytes(uint64(s1))),
				zap.Int64("backend-size-in-use-bytes", s2),
				zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))),
			)
			if beExist {
				// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
				// etcd from pre-3.0 release.
				kvindex := ci.ConsistentIndex()
				if kvindex < snapshot.Metadata.Index {
					if kvindex != 0 {
						return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", cfg.BackendPath(), kvindex, snapshot.Metadata.Index)
					}
					cfg.Logger.Warn(
						"consistent index was never saved",
						zap.Uint64("snapshot-index", snapshot.Metadata.Index),
					)
				}
			}
		} else {
			cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!")
		}

		if !cfg.ForceNewCluster {
			b.id, cl, b.n, b.s, b.w = restartNode(cfg, snapshot)
		} else {
			b.id, cl, b.n, b.s, b.w = restartAsStandaloneNode(cfg, snapshot)
		}

		cl.SetStore(st)
		cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
		cl.Recover(api.UpdateCapability)
		if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
			bepath := cfg.BackendPath()
			os.RemoveAll(bepath)
			return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
		}
		b.cl = cl

		b, err = boostrapWithWAL(cfg, st, be, ss, beExist, beHooks, ci)
	default:
		be.Close()
		return nil, fmt.Errorf("unsupported bootstrap config")
	}
	if err != nil {
		return nil, err
	}

	if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
		return nil, fmt.Errorf("cannot access member directory: %v", terr)
	}
	b.remotes = remotes
	b.prt = prt
	b.ci = ci
	b.st = st
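The boostrapResult struct itself falls outside this diff's hunks. From the fields assigned above and returned by the helpers below, it plausibly looks like the following sketch; the field types are inferred from the call sites (startNode, GetClusterFromRemotePeers, the helper signatures), so treat them as assumptions rather than the committed definition:

type boostrapResult struct {
	cl      *membership.RaftCluster  // cluster view built during bootstrap
	remotes []*membership.Member     // members of an existing cluster being joined
	w       *wal.WAL                 // WAL handle returned by startNode/restartNode
	n       raft.Node                // raft node
	s       *raft.MemoryStorage      // raft storage
	id      types.ID                 // local member ID
	prt     http.RoundTripper        // peer round tripper
	ci      cindex.ConsistentIndexer // consistent index accessor
	st      v2store.Store            // v2 store
}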
@@ -595,6 +444,176 @@ func boostrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.Con
	return be, ci, beExist, beHooks, nil
}

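// bootstrapExistingClusterNoWAL bootstraps a member that joins an existing
// cluster without a WAL: it validates the join configuration, builds the
// local cluster view from the initial peer URLs, fetches and validates the
// remote cluster through the peers, checks version compatibility, and starts
// a fresh raft node with no initial peers.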
func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*boostrapResult, error) {
	if err := cfg.VerifyJoinExisting(); err != nil {
		return nil, err
	}
	cl, err := membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
	if err != nil {
		return nil, err
	}
	existingCluster, gerr := GetClusterFromRemotePeers(cfg.Logger, getRemotePeerURLs(cl, cfg.Name), prt)
	if gerr != nil {
		return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr)
	}
	if err := membership.ValidateClusterAndAssignIDs(cfg.Logger, cl, existingCluster); err != nil {
		return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
	}
	if !isCompatibleWithCluster(cfg.Logger, cl, cl.MemberByName(cfg.Name).ID, prt) {
		return nil, fmt.Errorf("incompatible with current running cluster")
	}

	remotes := existingCluster.Members()
	cl.SetID(types.ID(0), existingCluster.ID())
	cl.SetStore(st)
	cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
	id, n, s, w := startNode(cfg, cl, nil)
	cl.SetID(id, existingCluster.ID())
	return &boostrapResult{
		cl:      cl,
		remotes: remotes,
		w:       w,
		n:       n,
		s:       s,
		id:      id,
	}, nil
}

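// boostrapNewClusterNoWAL bootstraps a brand-new cluster without a WAL: it
// verifies the bootstrap configuration, rejects members that were already
// bootstrapped, optionally resolves the final member list through v2
// discovery (rejecting duplicate URLs), and starts a raft node with all
// initial members as peers.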
func boostrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*boostrapResult, error) {
	if err := cfg.VerifyBootstrap(); err != nil {
		return nil, err
	}
	cl, err := membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
	if err != nil {
		return nil, err
	}
	m := cl.MemberByName(cfg.Name)
	if isMemberBootstrapped(cfg.Logger, cl, cfg.Name, prt, cfg.BootstrapTimeoutEffective()) {
		return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID)
	}
	if cfg.ShouldDiscover() {
		var str string
		str, err = v2discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String())
		if err != nil {
			return nil, &DiscoveryError{Op: "join", Err: err}
		}
		var urlsmap types.URLsMap
		urlsmap, err = types.NewURLsMap(str)
		if err != nil {
			return nil, err
		}
		if config.CheckDuplicateURL(urlsmap) {
			return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap)
		}
		if cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, urlsmap); err != nil {
			return nil, err
		}
	}
	cl.SetStore(st)
	cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
	id, n, s, w := startNode(cfg, cl, cl.MemberIDs())
	cl.SetID(id, cl.ID())
	return &boostrapResult{
		cl:      cl,
		remotes: nil,
		w:       w,
		n:       n,
		s:       s,
		id:      id,
	}, nil
}

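// boostrapWithWAL restarts a member that already has a WAL: it checks that
// the member and WAL directories are writable, recovers the v2 store and v3
// backend from the newest snapshot consistent with the WAL (if any),
// validates the consistent index against the snapshot, and restarts the raft
// node, as a standalone node when ForceNewCluster is set.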
func boostrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backend, ss *snap.Snapshotter, beExist bool, beHooks *backendHooks, ci cindex.ConsistentIndexer) (*boostrapResult, error) {
	if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
		return nil, fmt.Errorf("cannot write to member directory: %v", err)
	}

	if err := fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
		return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
	}

	if cfg.ShouldDiscover() {
		cfg.Logger.Warn(
			"discovery token is ignored since cluster already initialized; valid logs are found",
			zap.String("wal-dir", cfg.WALDir()),
		)
	}

	// Find a snapshot to start/restart a raft node
	walSnaps, err := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir())
	if err != nil {
		return nil, err
	}
	// snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding
	// wal log entries
	snapshot, err := ss.LoadNewestAvailable(walSnaps)
	if err != nil && err != snap.ErrNoSnapshot {
		return nil, err
	}

	if snapshot != nil {
		if err = st.Recovery(snapshot.Data); err != nil {
			cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err))
		}

		if err = assertNoV2StoreContent(cfg.Logger, st, cfg.V2Deprecation); err != nil {
			cfg.Logger.Error("illegal v2store content", zap.Error(err))
			return nil, err
		}

		cfg.Logger.Info(
			"recovered v2 store from snapshot",
			zap.Uint64("snapshot-index", snapshot.Metadata.Index),
			zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))),
		)

		if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil {
			cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
		}
		s1, s2 := be.Size(), be.SizeInUse()
		cfg.Logger.Info(
			"recovered v3 backend from snapshot",
			zap.Int64("backend-size-bytes", s1),
			zap.String("backend-size", humanize.Bytes(uint64(s1))),
			zap.Int64("backend-size-in-use-bytes", s2),
			zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))),
		)
		if beExist {
			// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
			// etcd from pre-3.0 release.
			kvindex := ci.ConsistentIndex()
			if kvindex < snapshot.Metadata.Index {
				if kvindex != 0 {
					return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", cfg.BackendPath(), kvindex, snapshot.Metadata.Index)
				}
				cfg.Logger.Warn(
					"consistent index was never saved",
					zap.Uint64("snapshot-index", snapshot.Metadata.Index),
				)
			}
		}
	} else {
		cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!")
	}

	r := &boostrapResult{}
	if !cfg.ForceNewCluster {
		r.id, r.cl, r.n, r.s, r.w = restartNode(cfg, snapshot)
	} else {
		r.id, r.cl, r.n, r.s, r.w = restartAsStandaloneNode(cfg, snapshot)
	}

	r.cl.SetStore(st)
	r.cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
	r.cl.Recover(api.UpdateCapability)
	if r.cl.Version() != nil && !r.cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
		bepath := cfg.BackendPath()
		os.RemoveAll(bepath)
		return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
	}
	return r, nil
}

// NewServer creates a new EtcdServer from the supplied configuration. The
// configuration is considered static for the lifetime of the EtcdServer.
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
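	// The diff is truncated here. For orientation only: bootstrap's result is
	// presumably consumed near the top of NewServer along these lines (an
	// assumption, not code shown in this commit):
	//
	//	b, err := bootstrap(cfg)
	//	if err != nil {
	//		return nil, err
	//	}
	//
	// b.cl, b.n, b.s, b.w, b.id, b.remotes, b.prt, b.ci and b.st would then
	// be used to assemble the returned EtcdServer.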