server: Move cluster up the bootstrap hierarchy

Marek Siarkowicz 2021-07-21 12:53:42 +02:00
parent 648bac833f
commit c97ab8f5e0
2 changed files with 51 additions and 46 deletions
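
In outline, this change moves the cluster bootstrap result one level up the hierarchy: bootstrapStorage() no longer builds a *bootstrapedCluster, bootstrap() calls a new bootstrapCluster() step once storage is ready, and the result is stored on bootstrappedServer next to storage instead of inside it. Below is a standalone Go sketch of the new shape; the types and signatures are simplified placeholders, not the real etcd definitions, and only illustrate the restructuring:

	// Simplified, self-contained illustration of the new bootstrap hierarchy.
	package main

	import "fmt"

	type bootstrappedStorage struct{ beExist bool } // placeholder fields only
	type bootstrapedCluster struct{}                // placeholder

	type bootstrappedServer struct {
		storage *bootstrappedStorage
		cluster *bootstrapedCluster // moved up from bootstrappedStorage
	}

	func bootstrapStorage() (*bootstrappedStorage, error) {
		// v2 store, backend, consistent index, etc. come up here
		return &bootstrappedStorage{beExist: true}, nil
	}

	func bootstrapCluster(storage *bootstrappedStorage) (*bootstrapedCluster, error) {
		// the real code branches here on WAL existence and cfg.NewCluster
		return &bootstrapedCluster{}, nil
	}

	func bootstrap() (*bootstrappedServer, error) {
		storage, err := bootstrapStorage()
		if err != nil {
			return nil, err
		}
		cluster, err := bootstrapCluster(storage) // cluster is now bootstrapped by the caller
		if err != nil {
			return nil, err
		}
		return &bootstrappedServer{storage: storage, cluster: cluster}, nil
	}

	func main() {
		b, err := bootstrap()
		fmt.Println(b != nil && b.cluster != nil, err)
	}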

File 1 of 2:

@@ -77,25 +77,33 @@ func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
 	if err != nil {
 		return nil, err
 	}
+	cluster, err := bootstrapCluster(cfg, storage, prt, ss)
+	if err != nil {
+		storage.be.Close()
+		return nil, err
+	}
 	return &bootstrappedServer{
 		prt:     prt,
 		ss:      ss,
 		storage: storage,
+		cluster: cluster,
 	}, nil
 }
 
 type bootstrappedServer struct {
 	storage *bootstrappedStorage
+	cluster *bootstrapedCluster
 	prt     http.RoundTripper
 	ss      *snap.Snapshotter
 }
 
 type bootstrappedStorage struct {
-	cluster *bootstrapedCluster
 	beHooks *serverstorage.BackendHooks
 	st      v2store.Store
 	be      backend.Backend
 	ci      cindex.ConsistentIndexer
+	beExist bool
 }
 
 type bootstrapedCluster struct {
@@ -117,38 +125,17 @@ type bootstrappedRaft struct {
 func bootstrapStorage(cfg config.ServerConfig, ss *snap.Snapshotter, prt http.RoundTripper) (b *bootstrappedStorage, err error) {
 	st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
-	haveWAL := wal.Exist(cfg.WALDir())
 	be, ci, beExist, beHooks, err := bootstrapBackend(cfg)
 	if err != nil {
 		return nil, err
 	}
-	defer func() {
-		if err != nil {
-			be.Close()
-		}
-	}()
-	var c *bootstrapedCluster
-	switch {
-	case !haveWAL && !cfg.NewCluster:
-		c, err = bootstrapExistingClusterNoWAL(cfg, prt, st, be)
-	case !haveWAL && cfg.NewCluster:
-		c, err = bootstrapNewClusterNoWAL(cfg, prt, st, be)
-	case haveWAL:
-		c, err = bootstrapClusterWithWAL(cfg, st, be, ss, beExist, beHooks, ci)
-	default:
-		be.Close()
-		return nil, fmt.Errorf("unsupported bootstrap config")
-	}
-	if err != nil {
-		return nil, err
-	}
 	return &bootstrappedStorage{
-		cluster: c,
 		beHooks: beHooks,
 		st:      st,
 		be:      be,
 		ci:      ci,
+		beExist: beExist,
 	}, nil
 }
@@ -223,6 +210,24 @@ func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error {
 	return be.Defrag()
 }
 
+func bootstrapCluster(cfg config.ServerConfig, storage *bootstrappedStorage, prt http.RoundTripper, ss *snap.Snapshotter) (c *bootstrapedCluster, err error) {
+	haveWAL := wal.Exist(cfg.WALDir())
+	switch {
+	case !haveWAL && !cfg.NewCluster:
+		c, err = bootstrapExistingClusterNoWAL(cfg, prt, storage.st, storage.be)
+	case !haveWAL && cfg.NewCluster:
+		c, err = bootstrapNewClusterNoWAL(cfg, prt, storage.st, storage.be)
+	case haveWAL:
+		c, err = bootstrapClusterWithWAL(cfg, storage, ss)
+	default:
+		return nil, fmt.Errorf("unsupported bootstrap config")
+	}
+	if err != nil {
+		return nil, err
+	}
+	return c, nil
+}
+
 func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*bootstrapedCluster, error) {
 	if err := cfg.VerifyJoinExisting(); err != nil {
 		return nil, err
@@ -304,7 +309,7 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st
 	}, nil
 }
 
-func bootstrapClusterWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backend, ss *snap.Snapshotter, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer) (*bootstrapedCluster, error) {
+func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStorage, ss *snap.Snapshotter) (*bootstrapedCluster, error) {
 	if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
 		return nil, fmt.Errorf("cannot write to member directory: %v", err)
 	}
@@ -333,11 +338,11 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, st v2store.Store, be backe
 	}
 	if snapshot != nil {
-		if err = st.Recovery(snapshot.Data); err != nil {
+		if err = storage.st.Recovery(snapshot.Data); err != nil {
 			cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err))
 		}
 
-		if err = serverstorage.AssertNoV2StoreContent(cfg.Logger, st, cfg.V2Deprecation); err != nil {
+		if err = serverstorage.AssertNoV2StoreContent(cfg.Logger, storage.st, cfg.V2Deprecation); err != nil {
 			cfg.Logger.Error("illegal v2store content", zap.Error(err))
 			return nil, err
 		}
@@ -348,10 +353,10 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, st v2store.Store, be backe
 			zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))),
 		)
 
-		if be, err = serverstorage.RecoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil {
+		if storage.be, err = serverstorage.RecoverSnapshotBackend(cfg, storage.be, *snapshot, storage.beExist, storage.beHooks); err != nil {
			cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
 		}
-		s1, s2 := be.Size(), be.SizeInUse()
+		s1, s2 := storage.be.Size(), storage.be.SizeInUse()
 		cfg.Logger.Info(
 			"recovered v3 backend from snapshot",
 			zap.Int64("backend-size-bytes", s1),
@@ -359,10 +364,10 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, st v2store.Store, be backe
 			zap.Int64("backend-size-in-use-bytes", s2),
 			zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))),
 		)
-		if beExist {
+		if storage.beExist {
 			// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
 			// etcd from pre-3.0 release.
-			kvindex := ci.ConsistentIndex()
+			kvindex := storage.ci.ConsistentIndex()
 			if kvindex < snapshot.Metadata.Index {
 				if kvindex != 0 {
 					return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", cfg.BackendPath(), kvindex, snapshot.Metadata.Index)
@@ -406,10 +411,10 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, st v2store.Store, be backe
 	}
 	b.raft = bootstrapRaftFromSnapshot(cfg, bwal, meta)
 
-	b.raft.cl.SetStore(st)
-	b.raft.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be))
+	b.raft.cl.SetStore(storage.st)
+	b.raft.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, storage.be))
 	b.raft.cl.Recover(api.UpdateCapability)
-	if b.raft.cl.Version() != nil && !b.raft.cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
+	if b.raft.cl.Version() != nil && !b.raft.cl.Version().LessThan(semver.Version{Major: 3}) && !storage.beExist {
 		bepath := cfg.BackendPath()
 		os.RemoveAll(bepath)
 		return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)

File 2 of 2:

@@ -308,8 +308,8 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 		}
 	}()
 
-	sstats := stats.NewServerStats(cfg.Name, b.storage.cluster.nodeID.String())
-	lstats := stats.NewLeaderStats(cfg.Logger, b.storage.cluster.nodeID.String())
+	sstats := stats.NewServerStats(cfg.Name, b.cluster.nodeID.String())
+	lstats := stats.NewLeaderStats(cfg.Logger, b.cluster.nodeID.String())
 
 	heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
 	srv = &EtcdServer{
@@ -320,21 +320,21 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 		errorc:                make(chan error, 1),
 		v2store:               b.storage.st,
 		snapshotter:           b.ss,
-		r:                     *b.storage.cluster.raft.newRaftNode(b.ss, b.storage.cluster.wal.w),
-		id:                    b.storage.cluster.nodeID,
+		r:                     *b.cluster.raft.newRaftNode(b.ss, b.cluster.wal.w),
+		id:                    b.cluster.nodeID,
 		attributes:            membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
-		cluster:               b.storage.cluster.raft.cl,
+		cluster:               b.cluster.raft.cl,
 		stats:                 sstats,
 		lstats:                lstats,
 		SyncTicker:            time.NewTicker(500 * time.Millisecond),
 		peerRt:                b.prt,
-		reqIDGen:              idutil.NewGenerator(uint16(b.storage.cluster.nodeID), time.Now()),
+		reqIDGen:              idutil.NewGenerator(uint16(b.cluster.nodeID), time.Now()),
 		AccessController:      &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
 		consistIndex:          b.storage.ci,
 		firstCommitInTerm:     notify.NewNotifier(),
 		clusterVersionChanged: notify.NewNotifier(),
 	}
-	serverID.With(prometheus.Labels{"server_id": b.storage.cluster.nodeID.String()}).Set(1)
+	serverID.With(prometheus.Labels{"server_id": b.cluster.nodeID.String()}).Set(1)
 
 	srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged)
 	srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)
@@ -403,9 +403,9 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 		Logger:      cfg.Logger,
 		TLSInfo:     cfg.PeerTLSInfo,
 		DialTimeout: cfg.PeerDialTimeout(),
-		ID:          b.storage.cluster.nodeID,
+		ID:          b.cluster.nodeID,
 		URLs:        cfg.PeerURLs,
-		ClusterID:   b.storage.cluster.raft.cl.ID(),
+		ClusterID:   b.cluster.raft.cl.ID(),
 		Raft:        srv,
 		Snapshotter: b.ss,
 		ServerStats: sstats,
@@ -416,13 +416,13 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 		return nil, err
 	}
 	// add all remotes into transport
-	for _, m := range b.storage.cluster.remotes {
-		if m.ID != b.storage.cluster.nodeID {
+	for _, m := range b.cluster.remotes {
+		if m.ID != b.cluster.nodeID {
 			tr.AddRemote(m.ID, m.PeerURLs)
 		}
 	}
-	for _, m := range b.storage.cluster.raft.cl.Members() {
-		if m.ID != b.storage.cluster.nodeID {
+	for _, m := range b.cluster.raft.cl.Members() {
+		if m.ID != b.cluster.nodeID {
 			tr.AddPeer(m.ID, m.PeerURLs)
 		}
 	}