diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go
index ef47a9536..1faefabba 100644
--- a/server/etcdserver/server.go
+++ b/server/etcdserver/server.go
@@ -331,13 +331,8 @@ func (bh *backendHooks) SetConfState(confState *raftpb.ConfState) {
 }
 
 func bootstrap(cfg config.ServerConfig) (b *boostrapResult, err error) {
-	b = &boostrapResult{}
 	st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
 
-	var (
-		cl *membership.RaftCluster
-	)
-
 	if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
 		cfg.Logger.Warn(
 			"exceeded recommended request limit",
@@ -388,171 +383,25 @@ func bootstrap(cfg config.ServerConfig) (b *boostrapResult, err error) {
 	if err != nil {
 		return nil, err
 	}
-	var (
-		remotes []*membership.Member
-	)
 	switch {
 	case !haveWAL && !cfg.NewCluster:
-		if err = cfg.VerifyJoinExisting(); err != nil {
-			return nil, err
-		}
-		cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
-		if err != nil {
-			return nil, err
-		}
-		existingCluster, gerr := GetClusterFromRemotePeers(cfg.Logger, getRemotePeerURLs(cl, cfg.Name), prt)
-		if gerr != nil {
-			return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr)
-		}
-		if err = membership.ValidateClusterAndAssignIDs(cfg.Logger, cl, existingCluster); err != nil {
-			return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
-		}
-		if !isCompatibleWithCluster(cfg.Logger, cl, cl.MemberByName(cfg.Name).ID, prt) {
-			return nil, fmt.Errorf("incompatible with current running cluster")
-		}
-
-		remotes = existingCluster.Members()
-		cl.SetID(types.ID(0), existingCluster.ID())
-		cl.SetStore(st)
-		cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
-		b.id, b.n, b.s, b.w = startNode(cfg, cl, nil)
-		cl.SetID(b.id, existingCluster.ID())
-		b.cl = cl
-
+		b, err = bootstrapExistingClusterNoWAL(cfg, prt, st, be)
 	case !haveWAL && cfg.NewCluster:
-		if err = cfg.VerifyBootstrap(); err != nil {
-			return nil, err
-		}
-		cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
-		if err != nil {
-			return nil, err
-		}
-		m := cl.MemberByName(cfg.Name)
-		if isMemberBootstrapped(cfg.Logger, cl, cfg.Name, prt, cfg.BootstrapTimeoutEffective()) {
-			return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID)
-		}
-		if cfg.ShouldDiscover() {
-			var str string
-			str, err = v2discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String())
-			if err != nil {
-				return nil, &DiscoveryError{Op: "join", Err: err}
-			}
-			var urlsmap types.URLsMap
-			urlsmap, err = types.NewURLsMap(str)
-			if err != nil {
-				return nil, err
-			}
-			if config.CheckDuplicateURL(urlsmap) {
-				return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap)
-			}
-			if cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, urlsmap); err != nil {
-				return nil, err
-			}
-		}
-		cl.SetStore(st)
-		cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
-		b.id, b.n, b.s, b.w = startNode(cfg, cl, cl.MemberIDs())
-		cl.SetID(b.id, cl.ID())
-		b.cl = cl
-
+		b, err = boostrapNewClusterNoWAL(cfg, prt, st, be)
 	case haveWAL:
-		if err = fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
-			return nil, fmt.Errorf("cannot write to member directory: %v", err)
-		}
-
-		if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
-			return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
-		}
-
-		if cfg.ShouldDiscover() {
-			cfg.Logger.Warn(
-				"discovery token is ignored since cluster already initialized; valid logs are found",
-				zap.String("wal-dir", cfg.WALDir()),
-			)
-		}
-
-		// Find a snapshot to start/restart a raft node
-		walSnaps, err := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir())
-		if err != nil {
-			return nil, err
-		}
-		// snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding
-		// wal log entries
-		snapshot, err := ss.LoadNewestAvailable(walSnaps)
-		if err != nil && err != snap.ErrNoSnapshot {
-			return nil, err
-		}
-
-		if snapshot != nil {
-			if err = st.Recovery(snapshot.Data); err != nil {
-				cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err))
-			}
-
-			if err = assertNoV2StoreContent(cfg.Logger, st, cfg.V2Deprecation); err != nil {
-				cfg.Logger.Error("illegal v2store content", zap.Error(err))
-				return nil, err
-			}
-
-			cfg.Logger.Info(
-				"recovered v2 store from snapshot",
-				zap.Uint64("snapshot-index", snapshot.Metadata.Index),
-				zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))),
-			)
-
-			if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil {
-				cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
-			}
-			s1, s2 := be.Size(), be.SizeInUse()
-			cfg.Logger.Info(
-				"recovered v3 backend from snapshot",
-				zap.Int64("backend-size-bytes", s1),
-				zap.String("backend-size", humanize.Bytes(uint64(s1))),
-				zap.Int64("backend-size-in-use-bytes", s2),
-				zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))),
-			)
-			if beExist {
-				// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
-				// etcd from pre-3.0 release.
-				kvindex := ci.ConsistentIndex()
-				if kvindex < snapshot.Metadata.Index {
-					if kvindex != 0 {
-						return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", cfg.BackendPath(), kvindex, snapshot.Metadata.Index)
-					}
-					cfg.Logger.Warn(
-						"consistent index was never saved",
-						zap.Uint64("snapshot-index", snapshot.Metadata.Index),
-					)
-				}
-			}
-		} else {
-			cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!")
-		}
-
-		if !cfg.ForceNewCluster {
-			b.id, cl, b.n, b.s, b.w = restartNode(cfg, snapshot)
-		} else {
-			b.id, cl, b.n, b.s, b.w = restartAsStandaloneNode(cfg, snapshot)
-		}
-
-		cl.SetStore(st)
-		cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
-		cl.Recover(api.UpdateCapability)
-		if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
-			bepath := cfg.BackendPath()
-			os.RemoveAll(bepath)
-			return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
-		}
-		b.cl = cl
-
+		b, err = boostrapWithWAL(cfg, st, be, ss, beExist, beHooks, ci)
 	default:
+		be.Close()
 		return nil, fmt.Errorf("unsupported bootstrap config")
 	}
+	if err != nil {
+		return nil, err
+	}
 
 	if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
 		return nil, fmt.Errorf("cannot access member directory: %v", terr)
 	}
-	b.remotes = remotes
 	b.prt = prt
 	b.ci = ci
 	b.st = st
@@ -595,6 +444,176 @@ func boostrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.Con
 	return be, ci, beExist, beHooks, nil
 }
 
+func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*boostrapResult, error) {
+	if err := cfg.VerifyJoinExisting(); err != nil {
+		return nil, err
+	}
+	cl, err := membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
+	if err != nil {
+		return nil, err
+	}
+	existingCluster, gerr := GetClusterFromRemotePeers(cfg.Logger, getRemotePeerURLs(cl, cfg.Name), prt)
+	if gerr != nil {
+		return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr)
+	}
+	if err := membership.ValidateClusterAndAssignIDs(cfg.Logger, cl, existingCluster); err != nil {
+		return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
+	}
+	if !isCompatibleWithCluster(cfg.Logger, cl, cl.MemberByName(cfg.Name).ID, prt) {
+		return nil, fmt.Errorf("incompatible with current running cluster")
+	}
+
+	remotes := existingCluster.Members()
+	cl.SetID(types.ID(0), existingCluster.ID())
+	cl.SetStore(st)
+	cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
+	id, n, s, w := startNode(cfg, cl, nil)
+	cl.SetID(id, existingCluster.ID())
+	return &boostrapResult{
+		cl:      cl,
+		remotes: remotes,
+		w:       w,
+		n:       n,
+		s:       s,
+		id:      id,
+	}, nil
+}
+
+func boostrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*boostrapResult, error) {
+	if err := cfg.VerifyBootstrap(); err != nil {
+		return nil, err
+	}
+	cl, err := membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
+	if err != nil {
+		return nil, err
+	}
+	m := cl.MemberByName(cfg.Name)
+	if isMemberBootstrapped(cfg.Logger, cl, cfg.Name, prt, cfg.BootstrapTimeoutEffective()) {
+		return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID)
+	}
+	if cfg.ShouldDiscover() {
+		var str string
+		str, err = v2discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String())
+		if err != nil {
+			return nil, &DiscoveryError{Op: "join", Err: err}
+		}
+		var urlsmap types.URLsMap
+		urlsmap, err = types.NewURLsMap(str)
+		if err != nil {
+			return nil, err
+		}
+		if config.CheckDuplicateURL(urlsmap) {
+			return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap)
+		}
+		if cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, urlsmap); err != nil {
+			return nil, err
+		}
+	}
+	cl.SetStore(st)
+	cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
+	id, n, s, w := startNode(cfg, cl, cl.MemberIDs())
+	cl.SetID(id, cl.ID())
+	return &boostrapResult{
+		cl:      cl,
+		remotes: nil,
+		w:       w,
+		n:       n,
+		s:       s,
+		id:      id,
+	}, nil
+}
+
+func boostrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backend, ss *snap.Snapshotter, beExist bool, beHooks *backendHooks, ci cindex.ConsistentIndexer) (*boostrapResult, error) {
+	if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
+		return nil, fmt.Errorf("cannot write to member directory: %v", err)
+	}
+
+	if err := fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
+		return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
+	}
+
+	if cfg.ShouldDiscover() {
+		cfg.Logger.Warn(
+			"discovery token is ignored since cluster already initialized; valid logs are found",
+			zap.String("wal-dir", cfg.WALDir()),
+		)
+	}
+
+	// Find a snapshot to start/restart a raft node
+	walSnaps, err := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir())
+	if err != nil {
+		return nil, err
+	}
+	// snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding
+	// wal log entries
+	snapshot, err := ss.LoadNewestAvailable(walSnaps)
+	if err != nil && err != snap.ErrNoSnapshot {
+		return nil, err
+	}
+
+	if snapshot != nil {
+		if err = st.Recovery(snapshot.Data); err != nil {
+			cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err))
+		}
+
+		if err = assertNoV2StoreContent(cfg.Logger, st, cfg.V2Deprecation); err != nil {
+			cfg.Logger.Error("illegal v2store content", zap.Error(err))
+			return nil, err
+		}
+
+		cfg.Logger.Info(
+			"recovered v2 store from snapshot",
+			zap.Uint64("snapshot-index", snapshot.Metadata.Index),
+			zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))),
+		)
+
+		if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil {
+			cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
+		}
+		s1, s2 := be.Size(), be.SizeInUse()
+		cfg.Logger.Info(
+			"recovered v3 backend from snapshot",
+			zap.Int64("backend-size-bytes", s1),
+			zap.String("backend-size", humanize.Bytes(uint64(s1))),
+			zap.Int64("backend-size-in-use-bytes", s2),
+			zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))),
+		)
+		if beExist {
+			// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
+			// etcd from pre-3.0 release.
+			kvindex := ci.ConsistentIndex()
+			if kvindex < snapshot.Metadata.Index {
+				if kvindex != 0 {
+					return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", cfg.BackendPath(), kvindex, snapshot.Metadata.Index)
+				}
+				cfg.Logger.Warn(
+					"consistent index was never saved",
+					zap.Uint64("snapshot-index", snapshot.Metadata.Index),
+				)
+			}
+		}
+	} else {
+		cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!")
+	}
+
+	r := &boostrapResult{}
+	if !cfg.ForceNewCluster {
+		r.id, r.cl, r.n, r.s, r.w = restartNode(cfg, snapshot)
+	} else {
+		r.id, r.cl, r.n, r.s, r.w = restartAsStandaloneNode(cfg, snapshot)
+	}
+
+	r.cl.SetStore(st)
+	r.cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
+	r.cl.Recover(api.UpdateCapability)
+	if r.cl.Version() != nil && !r.cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
+		bepath := cfg.BackendPath()
+		os.RemoveAll(bepath)
+		return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
+	}
+	return r, nil
+}
+
 // NewServer creates a new EtcdServer from the supplied configuration. The
 // configuration is considered static for the lifetime of the EtcdServer.
 func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {