etcdserver: Extract etcdserver boostrap function

This commit is contained in:
Marek Siarkowicz 2021-07-07 15:40:54 +02:00
parent 2db193fda1
commit 120cd5abe2

View File

@ -330,9 +330,7 @@ func (bh *backendHooks) SetConfState(confState *raftpb.ConfState) {
bh.confStateDirty = true bh.confStateDirty = true
} }
// NewServer creates a new EtcdServer from the supplied configuration. The func bootstrap(cfg config.ServerConfig) (b *boostrapResult, err error) {
// configuration is considered static for the lifetime of the EtcdServer.
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
st := v2store.New(StoreClusterPrefix, StoreKeysPrefix) st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
var ( var (
@ -566,9 +564,53 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil { if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
return nil, fmt.Errorf("cannot access member directory: %v", terr) return nil, fmt.Errorf("cannot access member directory: %v", terr)
} }
return &boostrapResult{
cl: cl,
remotes: remotes,
w: w,
n: n,
s: s,
id: id,
prt: prt,
ci: ci,
st: st,
be: be,
ss: ss,
beHooks: beHooks,
}, nil
}
sstats := stats.NewServerStats(cfg.Name, id.String()) type boostrapResult struct {
lstats := stats.NewLeaderStats(cfg.Logger, id.String()) cl *membership.RaftCluster
remotes []*membership.Member
w *wal.WAL
n raft.Node
s *raft.MemoryStorage
id types.ID
prt http.RoundTripper
ci cindex.ConsistentIndexer
st v2store.Store
be backend.Backend
ss *snap.Snapshotter
beHooks *backendHooks
}
// NewServer creates a new EtcdServer from the supplied configuration. The
// configuration is considered static for the lifetime of the EtcdServer.
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
b, err := bootstrap(cfg)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
b.be.Close()
}
}()
sstats := stats.NewServerStats(cfg.Name, b.id.String())
lstats := stats.NewLeaderStats(cfg.Logger, b.id.String())
heartbeat := time.Duration(cfg.TickMs) * time.Millisecond heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
srv = &EtcdServer{ srv = &EtcdServer{
@ -577,36 +619,36 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
lgMu: new(sync.RWMutex), lgMu: new(sync.RWMutex),
lg: cfg.Logger, lg: cfg.Logger,
errorc: make(chan error, 1), errorc: make(chan error, 1),
v2store: st, v2store: b.st,
snapshotter: ss, snapshotter: b.ss,
r: *newRaftNode( r: *newRaftNode(
raftNodeConfig{ raftNodeConfig{
lg: cfg.Logger, lg: cfg.Logger,
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) }, isIDRemoved: func(id uint64) bool { return b.cl.IsIDRemoved(types.ID(id)) },
Node: n, Node: b.n,
heartbeat: heartbeat, heartbeat: heartbeat,
raftStorage: s, raftStorage: b.s,
storage: NewStorage(w, ss), storage: NewStorage(b.w, b.ss),
}, },
), ),
id: id, id: b.id,
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
cluster: cl, cluster: b.cl,
stats: sstats, stats: sstats,
lstats: lstats, lstats: lstats,
SyncTicker: time.NewTicker(500 * time.Millisecond), SyncTicker: time.NewTicker(500 * time.Millisecond),
peerRt: prt, peerRt: b.prt,
reqIDGen: idutil.NewGenerator(uint16(id), time.Now()), reqIDGen: idutil.NewGenerator(uint16(b.id), time.Now()),
AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist}, AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
consistIndex: ci, consistIndex: b.ci,
firstCommitInTermC: make(chan struct{}), firstCommitInTermC: make(chan struct{}),
} }
serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1) serverID.With(prometheus.Labels{"server_id": b.id.String()}).Set(1)
srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)
srv.be = be srv.be = b.be
srv.beHooks = beHooks srv.beHooks = b.beHooks
minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases. // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
@ -634,7 +676,6 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
} }
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig) srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
srv.authStore = auth.NewAuthStore(srv.Logger(), srv.be, tp, int(cfg.BcryptCost)) srv.authStore = auth.NewAuthStore(srv.Logger(), srv.be, tp, int(cfg.BcryptCost))
newSrv := srv // since srv == nil in defer if srv is returned as nil newSrv := srv // since srv == nil in defer if srv is returned as nil
@ -671,11 +712,11 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
Logger: cfg.Logger, Logger: cfg.Logger,
TLSInfo: cfg.PeerTLSInfo, TLSInfo: cfg.PeerTLSInfo,
DialTimeout: cfg.PeerDialTimeout(), DialTimeout: cfg.PeerDialTimeout(),
ID: id, ID: b.id,
URLs: cfg.PeerURLs, URLs: cfg.PeerURLs,
ClusterID: cl.ID(), ClusterID: b.cl.ID(),
Raft: srv, Raft: srv,
Snapshotter: ss, Snapshotter: b.ss,
ServerStats: sstats, ServerStats: sstats,
LeaderStats: lstats, LeaderStats: lstats,
ErrorC: srv.errorc, ErrorC: srv.errorc,
@ -684,13 +725,13 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
return nil, err return nil, err
} }
// add all remotes into transport // add all remotes into transport
for _, m := range remotes { for _, m := range b.remotes {
if m.ID != id { if m.ID != b.id {
tr.AddRemote(m.ID, m.PeerURLs) tr.AddRemote(m.ID, m.PeerURLs)
} }
} }
for _, m := range cl.Members() { for _, m := range b.cl.Members() {
if m.ID != id { if m.ID != b.id {
tr.AddPeer(m.ID, m.PeerURLs) tr.AddPeer(m.ID, m.PeerURLs)
} }
} }