server: Move cluster backend setting up the call hierarchy

This commit is contained in:
Marek Siarkowicz 2021-07-21 15:20:56 +02:00
parent 049e2d6ec0
commit d3abf774ea

View File

@ -247,19 +247,19 @@ func bootstrapCluster(cfg config.ServerConfig, haveWAL bool, storage *bootstrapp
switch { switch {
case !haveWAL && !cfg.NewCluster: case !haveWAL && !cfg.NewCluster:
c, err = bootstrapExistingClusterNoWAL(cfg, prt, storage.st, storage.backend.be) c, err = bootstrapExistingClusterNoWAL(cfg, prt)
if err != nil { if err != nil {
return nil, err return nil, err
} }
c.wal = bootstrapNewWAL(cfg, c.cl, c.nodeID) c.wal = bootstrapNewWAL(cfg, c.cl, c.nodeID)
case !haveWAL && cfg.NewCluster: case !haveWAL && cfg.NewCluster:
c, err = bootstrapNewClusterNoWAL(cfg, prt, storage.st, storage.backend.be) c, err = bootstrapNewClusterNoWAL(cfg, prt)
if err != nil { if err != nil {
return nil, err return nil, err
} }
c.wal = bootstrapNewWAL(cfg, c.cl, c.nodeID) c.wal = bootstrapNewWAL(cfg, c.cl, c.nodeID)
case haveWAL: case haveWAL:
c, err = bootstrapClusterWithWAL(cfg, storage, bwal.meta) c, err = bootstrapClusterWithWAL(cfg, bwal.meta)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -267,10 +267,20 @@ func bootstrapCluster(cfg config.ServerConfig, haveWAL bool, storage *bootstrapp
default: default:
return nil, fmt.Errorf("unsupported bootstrap config") return nil, fmt.Errorf("unsupported bootstrap config")
} }
c.cl.SetStore(storage.st)
c.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, storage.backend.be))
if haveWAL {
c.cl.Recover(api.UpdateCapability)
if c.cl.Version() != nil && !c.cl.Version().LessThan(semver.Version{Major: 3}) && !storage.backend.beExist {
bepath := cfg.BackendPath()
os.RemoveAll(bepath)
return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
}
}
return c, nil return c, nil
} }
func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*bootstrapedCluster, error) { func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper) (*bootstrapedCluster, error) {
if err := cfg.VerifyJoinExisting(); err != nil { if err := cfg.VerifyJoinExisting(); err != nil {
return nil, err return nil, err
} }
@ -291,8 +301,6 @@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe
remotes := existingCluster.Members() remotes := existingCluster.Members()
cl.SetID(types.ID(0), existingCluster.ID()) cl.SetID(types.ID(0), existingCluster.ID())
cl.SetStore(st)
cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be))
member := cl.MemberByName(cfg.Name) member := cl.MemberByName(cfg.Name)
return &bootstrapedCluster{ return &bootstrapedCluster{
remotes: remotes, remotes: remotes,
@ -301,7 +309,7 @@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe
}, nil }, nil
} }
func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*bootstrapedCluster, error) { func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper) (*bootstrapedCluster, error) {
if err := cfg.VerifyBootstrap(); err != nil { if err := cfg.VerifyBootstrap(); err != nil {
return nil, err return nil, err
} }
@ -331,17 +339,14 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st
return nil, err return nil, err
} }
} }
cl.SetStore(st)
cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be))
member := cl.MemberByName(cfg.Name)
return &bootstrapedCluster{ return &bootstrapedCluster{
remotes: nil, remotes: nil,
cl: cl, cl: cl,
nodeID: member.ID, nodeID: m.ID,
}, nil }, nil
} }
func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStorage, meta *snapshotMetadata) (*bootstrapedCluster, error) { func bootstrapClusterWithWAL(cfg config.ServerConfig, meta *snapshotMetadata) (*bootstrapedCluster, error) {
if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil { if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
return nil, fmt.Errorf("cannot write to member directory: %v", err) return nil, fmt.Errorf("cannot write to member directory: %v", err)
} }
@ -354,14 +359,6 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStora
} }
cl := membership.NewCluster(cfg.Logger) cl := membership.NewCluster(cfg.Logger)
cl.SetID(meta.nodeID, meta.clusterID) cl.SetID(meta.nodeID, meta.clusterID)
cl.SetStore(storage.st)
cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, storage.backend.be))
cl.Recover(api.UpdateCapability)
if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !storage.backend.beExist {
bepath := cfg.BackendPath()
os.RemoveAll(bepath)
return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
}
return &bootstrapedCluster{ return &bootstrapedCluster{
cl: cl, cl: cl,
nodeID: meta.nodeID, nodeID: meta.nodeID,