diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go
index 26f478583..4b1859487 100644
--- a/server/etcdserver/bootstrap.go
+++ b/server/etcdserver/bootstrap.go
@@ -77,25 +77,33 @@ func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
 	if err != nil {
 		return nil, err
 	}
+
+	cluster, err := bootstrapCluster(cfg, storage, prt, ss)
+	if err != nil {
+		storage.be.Close()
+		return nil, err
+	}
 	return &bootstrappedServer{
 		prt:     prt,
 		ss:      ss,
 		storage: storage,
+		cluster: cluster,
 	}, nil
 }
 
 type bootstrappedServer struct {
 	storage *bootstrappedStorage
+	cluster *bootstrapedCluster
 	prt     http.RoundTripper
 	ss      *snap.Snapshotter
 }
 
 type bootstrappedStorage struct {
-	cluster *bootstrapedCluster
 	beHooks *serverstorage.BackendHooks
 	st      v2store.Store
 	be      backend.Backend
 	ci      cindex.ConsistentIndexer
+	beExist bool
 }
 
 type bootstrapedCluster struct {
@@ -117,38 +125,17 @@ type bootstrappedRaft struct {
 
 func bootstrapStorage(cfg config.ServerConfig, ss *snap.Snapshotter, prt http.RoundTripper) (b *bootstrappedStorage, err error) {
 	st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
-	haveWAL := wal.Exist(cfg.WALDir())
 	be, ci, beExist, beHooks, err := bootstrapBackend(cfg)
 	if err != nil {
 		return nil, err
 	}
-	defer func() {
-		if err != nil {
-			be.Close()
-		}
-	}()
-	var c *bootstrapedCluster
-	switch {
-	case !haveWAL && !cfg.NewCluster:
-		c, err = bootstrapExistingClusterNoWAL(cfg, prt, st, be)
-	case !haveWAL && cfg.NewCluster:
-		c, err = bootstrapNewClusterNoWAL(cfg, prt, st, be)
-	case haveWAL:
-		c, err = bootstrapClusterWithWAL(cfg, st, be, ss, beExist, beHooks, ci)
-	default:
-		be.Close()
-		return nil, fmt.Errorf("unsupported bootstrap config")
-	}
-	if err != nil {
-		return nil, err
-	}
 	return &bootstrappedStorage{
-		cluster: c,
 		beHooks: beHooks,
 		st:      st,
 		be:      be,
 		ci:      ci,
+		beExist: beExist,
 	}, nil
 }
 
@@ -223,6 +210,24 @@ func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error {
 	return be.Defrag()
 }
 
+func bootstrapCluster(cfg config.ServerConfig, storage *bootstrappedStorage, prt http.RoundTripper, ss *snap.Snapshotter) (c *bootstrapedCluster, err error) {
+	haveWAL := wal.Exist(cfg.WALDir())
+	switch {
+	case !haveWAL && !cfg.NewCluster:
+		c, err = bootstrapExistingClusterNoWAL(cfg, prt, storage.st, storage.be)
+	case !haveWAL && cfg.NewCluster:
+		c, err = bootstrapNewClusterNoWAL(cfg, prt, storage.st, storage.be)
+	case haveWAL:
+		c, err = bootstrapClusterWithWAL(cfg, storage, ss)
+	default:
+		return nil, fmt.Errorf("unsupported bootstrap config")
+	}
+	if err != nil {
+		return nil, err
+	}
+	return c, nil
+}
+
 func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*bootstrapedCluster, error) {
 	if err := cfg.VerifyJoinExisting(); err != nil {
 		return nil, err
@@ -304,7 +309,7 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st
 	}, nil
 }
 
-func bootstrapClusterWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backend, ss *snap.Snapshotter, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer) (*bootstrapedCluster, error) {
+func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStorage, ss *snap.Snapshotter) (*bootstrapedCluster, error) {
 	if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
 		return nil, fmt.Errorf("cannot write to member directory: %v", err)
 	}
@@ -333,11 +338,11 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, st v2store.Store, be backe
 	}
 
 	if snapshot != nil {
-		if err = st.Recovery(snapshot.Data); err != nil {
+		if err = storage.st.Recovery(snapshot.Data); err != nil {
 			cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err))
 		}
 
-		if err = serverstorage.AssertNoV2StoreContent(cfg.Logger, st, cfg.V2Deprecation); err != nil {
+		if err = serverstorage.AssertNoV2StoreContent(cfg.Logger, storage.st, cfg.V2Deprecation); err != nil {
 			cfg.Logger.Error("illegal v2store content", zap.Error(err))
 			return nil, err
 		}
@@ -348,10 +353,10 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, st v2store.Store, be backe
 			zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))),
 		)
 
-		if be, err = serverstorage.RecoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil {
+		if storage.be, err = serverstorage.RecoverSnapshotBackend(cfg, storage.be, *snapshot, storage.beExist, storage.beHooks); err != nil {
 			cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
 		}
-		s1, s2 := be.Size(), be.SizeInUse()
+		s1, s2 := storage.be.Size(), storage.be.SizeInUse()
 		cfg.Logger.Info(
 			"recovered v3 backend from snapshot",
 			zap.Int64("backend-size-bytes", s1),
@@ -359,10 +364,10 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, st v2store.Store, be backe
 			zap.Int64("backend-size-in-use-bytes", s2),
 			zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))),
 		)
-		if beExist {
+		if storage.beExist {
 			// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
 			// etcd from pre-3.0 release.
-			kvindex := ci.ConsistentIndex()
+			kvindex := storage.ci.ConsistentIndex()
 			if kvindex < snapshot.Metadata.Index {
 				if kvindex != 0 {
 					return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", cfg.BackendPath(), kvindex, snapshot.Metadata.Index)
@@ -406,10 +411,10 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, st v2store.Store, be backe
 	}
 
 	b.raft = bootstrapRaftFromSnapshot(cfg, bwal, meta)
-	b.raft.cl.SetStore(st)
-	b.raft.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be))
+	b.raft.cl.SetStore(storage.st)
+	b.raft.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, storage.be))
 	b.raft.cl.Recover(api.UpdateCapability)
-	if b.raft.cl.Version() != nil && !b.raft.cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
+	if b.raft.cl.Version() != nil && !b.raft.cl.Version().LessThan(semver.Version{Major: 3}) && !storage.beExist {
 		bepath := cfg.BackendPath()
 		os.RemoveAll(bepath)
 		return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go
index 8923fd835..d99491c05 100644
--- a/server/etcdserver/server.go
+++ b/server/etcdserver/server.go
@@ -308,8 +308,8 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 		}
 	}()
 
-	sstats := stats.NewServerStats(cfg.Name, b.storage.cluster.nodeID.String())
-	lstats := stats.NewLeaderStats(cfg.Logger, b.storage.cluster.nodeID.String())
+	sstats := stats.NewServerStats(cfg.Name, b.cluster.nodeID.String())
+	lstats := stats.NewLeaderStats(cfg.Logger, b.cluster.nodeID.String())
 
 	heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
 	srv = &EtcdServer{
@@ -320,21 +320,21 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 		errorc:                make(chan error, 1),
 		v2store:               b.storage.st,
 		snapshotter:           b.ss,
-		r:                     *b.storage.cluster.raft.newRaftNode(b.ss, b.storage.cluster.wal.w),
-		id:                    b.storage.cluster.nodeID,
+		r:                     *b.cluster.raft.newRaftNode(b.ss, b.cluster.wal.w),
+		id:                    b.cluster.nodeID,
 		attributes:            membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
-		cluster:               b.storage.cluster.raft.cl,
+		cluster:               b.cluster.raft.cl,
 		stats:                 sstats,
 		lstats:                lstats,
 		SyncTicker:            time.NewTicker(500 * time.Millisecond),
 		peerRt:                b.prt,
-		reqIDGen:              idutil.NewGenerator(uint16(b.storage.cluster.nodeID), time.Now()),
+		reqIDGen:              idutil.NewGenerator(uint16(b.cluster.nodeID), time.Now()),
 		AccessController:      &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
 		consistIndex:          b.storage.ci,
 		firstCommitInTerm:     notify.NewNotifier(),
 		clusterVersionChanged: notify.NewNotifier(),
 	}
-	serverID.With(prometheus.Labels{"server_id": b.storage.cluster.nodeID.String()}).Set(1)
+	serverID.With(prometheus.Labels{"server_id": b.cluster.nodeID.String()}).Set(1)
 
 	srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged)
 
 	srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)
@@ -403,9 +403,9 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 		Logger:      cfg.Logger,
 		TLSInfo:     cfg.PeerTLSInfo,
 		DialTimeout: cfg.PeerDialTimeout(),
-		ID:          b.storage.cluster.nodeID,
+		ID:          b.cluster.nodeID,
 		URLs:        cfg.PeerURLs,
-		ClusterID:   b.storage.cluster.raft.cl.ID(),
+		ClusterID:   b.cluster.raft.cl.ID(),
 		Raft:        srv,
 		Snapshotter: b.ss,
 		ServerStats: sstats,
@@ -416,13 +416,13 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 		return nil, err
 	}
 	// add all remotes into transport
-	for _, m := range b.storage.cluster.remotes {
-		if m.ID != b.storage.cluster.nodeID {
+	for _, m := range b.cluster.remotes {
+		if m.ID != b.cluster.nodeID {
 			tr.AddRemote(m.ID, m.PeerURLs)
 		}
 	}
-	for _, m := range b.storage.cluster.raft.cl.Members() {
-		if m.ID != b.storage.cluster.nodeID {
+	for _, m := range b.cluster.raft.cl.Members() {
+		if m.ID != b.cluster.nodeID {
 			tr.AddPeer(m.ID, m.PeerURLs)
 		}
 	}
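
For orientation, the sketch below condenses the bootstrap flow this patch produces. It uses only names that appear in the diff above and assumes the surrounding etcdserver package context (the snapshotter and peer round-tripper setup that precede these calls are taken as given); bootstrapSketch is a hypothetical name, and the condensed wiring is illustrative rather than a drop-in copy of bootstrap.go.

// Post-patch bootstrap flow (sketch, not the literal code): bootstrapCluster
// is now a sibling step of bootstrapStorage instead of being nested inside
// it, so storage no longer owns the cluster.
func bootstrapSketch(cfg config.ServerConfig, prt http.RoundTripper, ss *snap.Snapshotter) (*bootstrappedServer, error) {
	// Storage only wires the v2 store, backend, consistent indexer,
	// and the beExist flag that was promoted to a struct field.
	storage, err := bootstrapStorage(cfg, ss, prt)
	if err != nil {
		return nil, err
	}
	// The WAL/no-WAL dispatch moved here from bootstrapStorage.
	cluster, err := bootstrapCluster(cfg, storage, prt, ss)
	if err != nil {
		storage.be.Close() // replaces the defer deleted from bootstrapStorage
		return nil, err
	}
	return &bootstrappedServer{prt: prt, ss: ss, storage: storage, cluster: cluster}, nil
}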