mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: Add more hierarchy bootstap introducing a separate storage bootstrap step
This commit is contained in:
parent
6c8a4fdcc5
commit
aa0c050003
@ -49,7 +49,6 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
|
func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
|
||||||
st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
|
|
||||||
|
|
||||||
if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
|
if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
|
||||||
cfg.Logger.Warn(
|
cfg.Logger.Warn(
|
||||||
@ -64,9 +63,49 @@ func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
|
|||||||
if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil {
|
if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil {
|
||||||
return nil, fmt.Errorf("cannot access data directory: %v", terr)
|
return nil, fmt.Errorf("cannot access data directory: %v", terr)
|
||||||
}
|
}
|
||||||
|
|
||||||
haveWAL := wal.Exist(cfg.WALDir())
|
|
||||||
ss := bootstrapSnapshot(cfg)
|
ss := bootstrapSnapshot(cfg)
|
||||||
|
prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.PeerDialTimeout())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
|
||||||
|
return nil, fmt.Errorf("cannot access member directory: %v", terr)
|
||||||
|
}
|
||||||
|
|
||||||
|
storage, err := bootstrapStorage(cfg, ss, prt)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &bootstrappedServer{
|
||||||
|
prt: prt,
|
||||||
|
ss: ss,
|
||||||
|
storage: storage,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type bootstrappedServer struct {
|
||||||
|
storage *bootstrappedStorage
|
||||||
|
prt http.RoundTripper
|
||||||
|
ss *snap.Snapshotter
|
||||||
|
}
|
||||||
|
|
||||||
|
type bootstrappedStorage struct {
|
||||||
|
cluster *bootstrapedCluster
|
||||||
|
beHooks *serverstorage.BackendHooks
|
||||||
|
st v2store.Store
|
||||||
|
be backend.Backend
|
||||||
|
ci cindex.ConsistentIndexer
|
||||||
|
}
|
||||||
|
|
||||||
|
type bootstrapedCluster struct {
|
||||||
|
raft *bootstrappedRaft
|
||||||
|
remotes []*membership.Member
|
||||||
|
}
|
||||||
|
|
||||||
|
func bootstrapStorage(cfg config.ServerConfig, ss *snap.Snapshotter, prt http.RoundTripper) (b *bootstrappedStorage, err error) {
|
||||||
|
st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
|
||||||
|
haveWAL := wal.Exist(cfg.WALDir())
|
||||||
|
|
||||||
be, ci, beExist, beHooks, err := bootstrapBackend(cfg)
|
be, ci, beExist, beHooks, err := bootstrapBackend(cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -77,19 +116,14 @@ func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
|
|||||||
be.Close()
|
be.Close()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
var c *bootstrapedCluster
|
||||||
prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.PeerDialTimeout())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case !haveWAL && !cfg.NewCluster:
|
case !haveWAL && !cfg.NewCluster:
|
||||||
b, err = bootstrapExistingClusterNoWAL(cfg, prt, st, be)
|
c, err = bootstrapExistingClusterNoWAL(cfg, prt, st, be)
|
||||||
case !haveWAL && cfg.NewCluster:
|
case !haveWAL && cfg.NewCluster:
|
||||||
b, err = bootstrapNewClusterNoWAL(cfg, prt, st, be)
|
c, err = bootstrapNewClusterNoWAL(cfg, prt, st, be)
|
||||||
case haveWAL:
|
case haveWAL:
|
||||||
b, err = bootstrapWithWAL(cfg, st, be, ss, beExist, beHooks, ci)
|
c, err = bootstrapWithWAL(cfg, st, be, ss, beExist, beHooks, ci)
|
||||||
default:
|
default:
|
||||||
be.Close()
|
be.Close()
|
||||||
return nil, fmt.Errorf("unsupported bootstrap config")
|
return nil, fmt.Errorf("unsupported bootstrap config")
|
||||||
@ -97,28 +131,13 @@ func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
return &bootstrappedStorage{
|
||||||
if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
|
cluster: c,
|
||||||
return nil, fmt.Errorf("cannot access member directory: %v", terr)
|
beHooks: beHooks,
|
||||||
}
|
st: st,
|
||||||
b.prt = prt
|
be: be,
|
||||||
b.ci = ci
|
ci: ci,
|
||||||
b.st = st
|
}, nil
|
||||||
b.be = be
|
|
||||||
b.ss = ss
|
|
||||||
b.beHooks = beHooks
|
|
||||||
return b, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type bootstrappedServer struct {
|
|
||||||
raft *bootstrappedRaft
|
|
||||||
remotes []*membership.Member
|
|
||||||
prt http.RoundTripper
|
|
||||||
ci cindex.ConsistentIndexer
|
|
||||||
st v2store.Store
|
|
||||||
be backend.Backend
|
|
||||||
ss *snap.Snapshotter
|
|
||||||
beHooks *serverstorage.BackendHooks
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func bootstrapSnapshot(cfg config.ServerConfig) *snap.Snapshotter {
|
func bootstrapSnapshot(cfg config.ServerConfig) *snap.Snapshotter {
|
||||||
@ -192,7 +211,7 @@ func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error {
|
|||||||
return be.Defrag()
|
return be.Defrag()
|
||||||
}
|
}
|
||||||
|
|
||||||
func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*bootstrappedServer, error) {
|
func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*bootstrapedCluster, error) {
|
||||||
if err := cfg.VerifyJoinExisting(); err != nil {
|
if err := cfg.VerifyJoinExisting(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -217,13 +236,13 @@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe
|
|||||||
cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be))
|
cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be))
|
||||||
br := bootstrapRaftFromCluster(cfg, cl, nil)
|
br := bootstrapRaftFromCluster(cfg, cl, nil)
|
||||||
cl.SetID(br.wal.id, existingCluster.ID())
|
cl.SetID(br.wal.id, existingCluster.ID())
|
||||||
return &bootstrappedServer{
|
return &bootstrapedCluster{
|
||||||
raft: br,
|
raft: br,
|
||||||
remotes: remotes,
|
remotes: remotes,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*bootstrappedServer, error) {
|
func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*bootstrapedCluster, error) {
|
||||||
if err := cfg.VerifyBootstrap(); err != nil {
|
if err := cfg.VerifyBootstrap(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -257,13 +276,13 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st
|
|||||||
cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be))
|
cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be))
|
||||||
br := bootstrapRaftFromCluster(cfg, cl, cl.MemberIDs())
|
br := bootstrapRaftFromCluster(cfg, cl, cl.MemberIDs())
|
||||||
cl.SetID(br.wal.id, cl.ID())
|
cl.SetID(br.wal.id, cl.ID())
|
||||||
return &bootstrappedServer{
|
return &bootstrapedCluster{
|
||||||
remotes: nil,
|
remotes: nil,
|
||||||
raft: br,
|
raft: br,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backend, ss *snap.Snapshotter, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer) (*bootstrappedServer, error) {
|
func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backend, ss *snap.Snapshotter, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer) (*bootstrapedCluster, error) {
|
||||||
if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
|
if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
|
||||||
return nil, fmt.Errorf("cannot write to member directory: %v", err)
|
return nil, fmt.Errorf("cannot write to member directory: %v", err)
|
||||||
}
|
}
|
||||||
@ -336,22 +355,22 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back
|
|||||||
cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!")
|
cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!")
|
||||||
}
|
}
|
||||||
|
|
||||||
r := &bootstrappedServer{}
|
b := &bootstrapedCluster{}
|
||||||
if !cfg.ForceNewCluster {
|
if !cfg.ForceNewCluster {
|
||||||
r.raft = bootstrapRaftFromWal(cfg, snapshot)
|
b.raft = bootstrapRaftFromWal(cfg, snapshot)
|
||||||
} else {
|
} else {
|
||||||
r.raft = bootstrapRaftFromWalStandalone(cfg, snapshot)
|
b.raft = bootstrapRaftFromWalStandalone(cfg, snapshot)
|
||||||
}
|
}
|
||||||
|
|
||||||
r.raft.cl.SetStore(st)
|
b.raft.cl.SetStore(st)
|
||||||
r.raft.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be))
|
b.raft.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be))
|
||||||
r.raft.cl.Recover(api.UpdateCapability)
|
b.raft.cl.Recover(api.UpdateCapability)
|
||||||
if r.raft.cl.Version() != nil && !r.raft.cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
|
if b.raft.cl.Version() != nil && !b.raft.cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
|
||||||
bepath := cfg.BackendPath()
|
bepath := cfg.BackendPath()
|
||||||
os.RemoveAll(bepath)
|
os.RemoveAll(bepath)
|
||||||
return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
|
return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
|
||||||
}
|
}
|
||||||
return r, nil
|
return b, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) *bootstrappedRaft {
|
func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) *bootstrappedRaft {
|
||||||
|
@ -304,12 +304,12 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
|||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.be.Close()
|
b.storage.be.Close()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
sstats := stats.NewServerStats(cfg.Name, b.raft.wal.id.String())
|
sstats := stats.NewServerStats(cfg.Name, b.storage.cluster.raft.wal.id.String())
|
||||||
lstats := stats.NewLeaderStats(cfg.Logger, b.raft.wal.id.String())
|
lstats := stats.NewLeaderStats(cfg.Logger, b.storage.cluster.raft.wal.id.String())
|
||||||
|
|
||||||
heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
|
heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
|
||||||
srv = &EtcdServer{
|
srv = &EtcdServer{
|
||||||
@ -318,28 +318,28 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
|||||||
lgMu: new(sync.RWMutex),
|
lgMu: new(sync.RWMutex),
|
||||||
lg: cfg.Logger,
|
lg: cfg.Logger,
|
||||||
errorc: make(chan error, 1),
|
errorc: make(chan error, 1),
|
||||||
v2store: b.st,
|
v2store: b.storage.st,
|
||||||
snapshotter: b.ss,
|
snapshotter: b.ss,
|
||||||
r: *b.raft.newRaftNode(b.ss),
|
r: *b.storage.cluster.raft.newRaftNode(b.ss),
|
||||||
id: b.raft.wal.id,
|
id: b.storage.cluster.raft.wal.id,
|
||||||
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
|
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
|
||||||
cluster: b.raft.cl,
|
cluster: b.storage.cluster.raft.cl,
|
||||||
stats: sstats,
|
stats: sstats,
|
||||||
lstats: lstats,
|
lstats: lstats,
|
||||||
SyncTicker: time.NewTicker(500 * time.Millisecond),
|
SyncTicker: time.NewTicker(500 * time.Millisecond),
|
||||||
peerRt: b.prt,
|
peerRt: b.prt,
|
||||||
reqIDGen: idutil.NewGenerator(uint16(b.raft.wal.id), time.Now()),
|
reqIDGen: idutil.NewGenerator(uint16(b.storage.cluster.raft.wal.id), time.Now()),
|
||||||
AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
|
AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
|
||||||
consistIndex: b.ci,
|
consistIndex: b.storage.ci,
|
||||||
firstCommitInTerm: notify.NewNotifier(),
|
firstCommitInTerm: notify.NewNotifier(),
|
||||||
clusterVersionChanged: notify.NewNotifier(),
|
clusterVersionChanged: notify.NewNotifier(),
|
||||||
}
|
}
|
||||||
serverID.With(prometheus.Labels{"server_id": b.raft.wal.id.String()}).Set(1)
|
serverID.With(prometheus.Labels{"server_id": b.storage.cluster.raft.wal.id.String()}).Set(1)
|
||||||
srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged)
|
srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged)
|
||||||
srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)
|
srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)
|
||||||
|
|
||||||
srv.be = b.be
|
srv.be = b.storage.be
|
||||||
srv.beHooks = b.beHooks
|
srv.beHooks = b.storage.beHooks
|
||||||
minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat
|
minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat
|
||||||
|
|
||||||
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
|
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
|
||||||
@ -403,9 +403,9 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
|||||||
Logger: cfg.Logger,
|
Logger: cfg.Logger,
|
||||||
TLSInfo: cfg.PeerTLSInfo,
|
TLSInfo: cfg.PeerTLSInfo,
|
||||||
DialTimeout: cfg.PeerDialTimeout(),
|
DialTimeout: cfg.PeerDialTimeout(),
|
||||||
ID: b.raft.wal.id,
|
ID: b.storage.cluster.raft.wal.id,
|
||||||
URLs: cfg.PeerURLs,
|
URLs: cfg.PeerURLs,
|
||||||
ClusterID: b.raft.cl.ID(),
|
ClusterID: b.storage.cluster.raft.cl.ID(),
|
||||||
Raft: srv,
|
Raft: srv,
|
||||||
Snapshotter: b.ss,
|
Snapshotter: b.ss,
|
||||||
ServerStats: sstats,
|
ServerStats: sstats,
|
||||||
@ -416,13 +416,13 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// add all remotes into transport
|
// add all remotes into transport
|
||||||
for _, m := range b.remotes {
|
for _, m := range b.storage.cluster.remotes {
|
||||||
if m.ID != b.raft.wal.id {
|
if m.ID != b.storage.cluster.raft.wal.id {
|
||||||
tr.AddRemote(m.ID, m.PeerURLs)
|
tr.AddRemote(m.ID, m.PeerURLs)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, m := range b.raft.cl.Members() {
|
for _, m := range b.storage.cluster.raft.cl.Members() {
|
||||||
if m.ID != b.raft.wal.id {
|
if m.ID != b.storage.cluster.raft.wal.id {
|
||||||
tr.AddPeer(m.ID, m.PeerURLs)
|
tr.AddPeer(m.ID, m.PeerURLs)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user