diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index e6ee52cfa..37f50cf48 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -330,9 +330,7 @@ func (bh *backendHooks) SetConfState(confState *raftpb.ConfState) { bh.confStateDirty = true } -// NewServer creates a new EtcdServer from the supplied configuration. The -// configuration is considered static for the lifetime of the EtcdServer. -func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { +func bootstrap(cfg config.ServerConfig) (b *boostrapResult, err error) { st := v2store.New(StoreClusterPrefix, StoreKeysPrefix) var ( @@ -408,7 +406,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { return nil, err } var ( - remotes []*membership.Member + remotes []*membership.Member ) switch { @@ -531,7 +529,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { if beExist { // TODO: remove kvindex != 0 checking when we do not expect users to upgrade // etcd from pre-3.0 release. - if kvindex < snapshot.Metadata.Index { + if kvindex < snapshot.Metadata.Index { if kvindex != 0 { return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", bepath, kvindex, snapshot.Metadata.Index) } @@ -566,9 +564,53 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil { return nil, fmt.Errorf("cannot access member directory: %v", terr) } + return &boostrapResult{ + cl: cl, + remotes: remotes, + w: w, + n: n, + s: s, + id: id, + prt: prt, + ci: ci, + st: st, + be: be, + ss: ss, + beHooks: beHooks, + }, nil +} - sstats := stats.NewServerStats(cfg.Name, id.String()) - lstats := stats.NewLeaderStats(cfg.Logger, id.String()) +type boostrapResult struct { + cl *membership.RaftCluster + remotes []*membership.Member + w *wal.WAL + n raft.Node + s *raft.MemoryStorage + id types.ID + prt http.RoundTripper + ci cindex.ConsistentIndexer + st v2store.Store + be backend.Backend + ss *snap.Snapshotter + beHooks *backendHooks +} + +// NewServer creates a new EtcdServer from the supplied configuration. The +// configuration is considered static for the lifetime of the EtcdServer. +func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { + b, err := bootstrap(cfg) + if err != nil { + return nil, err + } + + defer func() { + if err != nil { + b.be.Close() + } + }() + + sstats := stats.NewServerStats(cfg.Name, b.id.String()) + lstats := stats.NewLeaderStats(cfg.Logger, b.id.String()) heartbeat := time.Duration(cfg.TickMs) * time.Millisecond srv = &EtcdServer{ @@ -577,36 +619,36 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { lgMu: new(sync.RWMutex), lg: cfg.Logger, errorc: make(chan error, 1), - v2store: st, - snapshotter: ss, + v2store: b.st, + snapshotter: b.ss, r: *newRaftNode( raftNodeConfig{ lg: cfg.Logger, - isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) }, - Node: n, + isIDRemoved: func(id uint64) bool { return b.cl.IsIDRemoved(types.ID(id)) }, + Node: b.n, heartbeat: heartbeat, - raftStorage: s, - storage: NewStorage(w, ss), + raftStorage: b.s, + storage: NewStorage(b.w, b.ss), }, ), - id: id, + id: b.id, attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, - cluster: cl, + cluster: b.cl, stats: sstats, lstats: lstats, SyncTicker: time.NewTicker(500 * time.Millisecond), - peerRt: prt, - reqIDGen: idutil.NewGenerator(uint16(id), time.Now()), + peerRt: b.prt, + reqIDGen: idutil.NewGenerator(uint16(b.id), time.Now()), AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist}, - consistIndex: ci, + consistIndex: b.ci, firstCommitInTermC: make(chan struct{}), } - serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1) + serverID.With(prometheus.Labels{"server_id": b.id.String()}).Set(1) srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) - srv.be = be - srv.beHooks = beHooks + srv.be = b.be + srv.beHooks = b.beHooks minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases. @@ -634,7 +676,6 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig) - srv.authStore = auth.NewAuthStore(srv.Logger(), srv.be, tp, int(cfg.BcryptCost)) newSrv := srv // since srv == nil in defer if srv is returned as nil @@ -671,11 +712,11 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { Logger: cfg.Logger, TLSInfo: cfg.PeerTLSInfo, DialTimeout: cfg.PeerDialTimeout(), - ID: id, + ID: b.id, URLs: cfg.PeerURLs, - ClusterID: cl.ID(), + ClusterID: b.cl.ID(), Raft: srv, - Snapshotter: ss, + Snapshotter: b.ss, ServerStats: sstats, LeaderStats: lstats, ErrorC: srv.errorc, @@ -684,13 +725,13 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { return nil, err } // add all remotes into transport - for _, m := range remotes { - if m.ID != id { + for _, m := range b.remotes { + if m.ID != b.id { tr.AddRemote(m.ID, m.PeerURLs) } } - for _, m := range cl.Members() { - if m.ID != id { + for _, m := range b.cl.Members() { + if m.ID != b.id { tr.AddPeer(m.ID, m.PeerURLs) } }