mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
server: Move wal bootstrap from cluster to storage
This commit is contained in:
parent
d3abf774ea
commit
4884e7d8cf
@ -72,21 +72,42 @@ func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
|
|||||||
if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
|
if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
|
||||||
return nil, fmt.Errorf("cannot access member directory: %v", terr)
|
return nil, fmt.Errorf("cannot access member directory: %v", terr)
|
||||||
}
|
}
|
||||||
|
|
||||||
haveWAL := wal.Exist(cfg.WALDir())
|
haveWAL := wal.Exist(cfg.WALDir())
|
||||||
s, err := bootstrapStorage(cfg, haveWAL, ss, prt)
|
st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
|
||||||
|
backend, err := bootstrapBackend(cfg, haveWAL, st, ss)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
var (
|
||||||
|
bwal *bootstrappedWAL
|
||||||
|
)
|
||||||
|
|
||||||
|
if haveWAL {
|
||||||
|
if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
|
||||||
|
}
|
||||||
|
bwal = bootstrapWALFromSnapshot(cfg, backend.snapshot)
|
||||||
|
}
|
||||||
|
|
||||||
|
cluster, err := bootstrapCluster(cfg, bwal, prt)
|
||||||
|
if err != nil {
|
||||||
|
backend.be.Close()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
cluster, err := bootstrapCluster(cfg, haveWAL, s, prt, ss)
|
s, err := bootstrapStorage(cfg, st, backend, bwal, cluster)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.backend.be.Close()
|
backend.be.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
raft := bootstrapRaft(cfg, haveWAL, cluster.cl, cluster.wal)
|
|
||||||
if !haveWAL {
|
err = cluster.Finalize(cfg, s)
|
||||||
cluster.cl.SetID(cluster.nodeID, cluster.cl.ID())
|
if err != nil {
|
||||||
|
backend.be.Close()
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
raft := bootstrapRaft(cfg, cluster, s.wal)
|
||||||
return &bootstrappedServer{
|
return &bootstrappedServer{
|
||||||
prt: prt,
|
prt: prt,
|
||||||
ss: ss,
|
ss: ss,
|
||||||
@ -106,6 +127,7 @@ type bootstrappedServer struct {
|
|||||||
|
|
||||||
type bootstrappedStorage struct {
|
type bootstrappedStorage struct {
|
||||||
backend *bootstrappedBackend
|
backend *bootstrappedBackend
|
||||||
|
wal *bootstrappedWAL
|
||||||
st v2store.Store
|
st v2store.Store
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -119,7 +141,6 @@ type bootstrappedBackend struct {
|
|||||||
|
|
||||||
type bootstrapedCluster struct {
|
type bootstrapedCluster struct {
|
||||||
remotes []*membership.Member
|
remotes []*membership.Member
|
||||||
wal *bootstrappedWAL
|
|
||||||
cl *membership.RaftCluster
|
cl *membership.RaftCluster
|
||||||
nodeID types.ID
|
nodeID types.ID
|
||||||
}
|
}
|
||||||
@ -133,17 +154,15 @@ type bootstrappedRaft struct {
|
|||||||
storage *raft.MemoryStorage
|
storage *raft.MemoryStorage
|
||||||
}
|
}
|
||||||
|
|
||||||
func bootstrapStorage(cfg config.ServerConfig, haveWAL bool, ss *snap.Snapshotter, prt http.RoundTripper) (b *bootstrappedStorage, err error) {
|
func bootstrapStorage(cfg config.ServerConfig, st v2store.Store, be *bootstrappedBackend, wal *bootstrappedWAL, cl *bootstrapedCluster) (b *bootstrappedStorage, err error) {
|
||||||
st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
|
if wal == nil {
|
||||||
|
wal = bootstrapNewWAL(cfg, cl)
|
||||||
backend, err := bootstrapBackend(cfg, haveWAL, st, ss)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return &bootstrappedStorage{
|
return &bootstrappedStorage{
|
||||||
backend: backend,
|
backend: be,
|
||||||
st: st,
|
st: st,
|
||||||
|
wal: wal,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -233,49 +252,19 @@ func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error {
|
|||||||
return be.Defrag()
|
return be.Defrag()
|
||||||
}
|
}
|
||||||
|
|
||||||
func bootstrapCluster(cfg config.ServerConfig, haveWAL bool, storage *bootstrappedStorage, prt http.RoundTripper, ss *snap.Snapshotter) (c *bootstrapedCluster, err error) {
|
func bootstrapCluster(cfg config.ServerConfig, bwal *bootstrappedWAL, prt http.RoundTripper) (c *bootstrapedCluster, err error) {
|
||||||
c = &bootstrapedCluster{}
|
|
||||||
var (
|
|
||||||
bwal *bootstrappedWAL
|
|
||||||
)
|
|
||||||
if haveWAL {
|
|
||||||
if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
|
|
||||||
return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
|
|
||||||
}
|
|
||||||
bwal = bootstrapWALFromSnapshot(cfg, storage.backend.snapshot)
|
|
||||||
}
|
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case !haveWAL && !cfg.NewCluster:
|
case bwal == nil && !cfg.NewCluster:
|
||||||
c, err = bootstrapExistingClusterNoWAL(cfg, prt)
|
c, err = bootstrapExistingClusterNoWAL(cfg, prt)
|
||||||
if err != nil {
|
case bwal == nil && cfg.NewCluster:
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
c.wal = bootstrapNewWAL(cfg, c.cl, c.nodeID)
|
|
||||||
case !haveWAL && cfg.NewCluster:
|
|
||||||
c, err = bootstrapNewClusterNoWAL(cfg, prt)
|
c, err = bootstrapNewClusterNoWAL(cfg, prt)
|
||||||
if err != nil {
|
case bwal != nil && bwal.haveWAL:
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
c.wal = bootstrapNewWAL(cfg, c.cl, c.nodeID)
|
|
||||||
case haveWAL:
|
|
||||||
c, err = bootstrapClusterWithWAL(cfg, bwal.meta)
|
c, err = bootstrapClusterWithWAL(cfg, bwal.meta)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
c.wal = bwal
|
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("unsupported bootstrap config")
|
return nil, fmt.Errorf("unsupported bootstrap config")
|
||||||
}
|
}
|
||||||
c.cl.SetStore(storage.st)
|
if err != nil {
|
||||||
c.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, storage.backend.be))
|
return nil, err
|
||||||
if haveWAL {
|
|
||||||
c.cl.Recover(api.UpdateCapability)
|
|
||||||
if c.cl.Version() != nil && !c.cl.Version().LessThan(semver.Version{Major: 3}) && !storage.backend.beExist {
|
|
||||||
bepath := cfg.BackendPath()
|
|
||||||
os.RemoveAll(bepath)
|
|
||||||
return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return c, nil
|
return c, nil
|
||||||
}
|
}
|
||||||
@ -425,13 +414,30 @@ func recoverSnapshot(cfg config.ServerConfig, st v2store.Store, be backend.Backe
|
|||||||
return snapshot, be, nil
|
return snapshot, be, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func bootstrapRaft(cfg config.ServerConfig, haveWAL bool, cl *membership.RaftCluster, bwal *bootstrappedWAL) *bootstrappedRaft {
|
func (c *bootstrapedCluster) Finalize(cfg config.ServerConfig, s *bootstrappedStorage) error {
|
||||||
switch {
|
if !s.wal.haveWAL {
|
||||||
case !haveWAL && !cfg.NewCluster:
|
c.cl.SetID(c.nodeID, c.cl.ID())
|
||||||
return bootstrapRaftFromCluster(cfg, cl, nil, bwal)
|
}
|
||||||
case !haveWAL && cfg.NewCluster:
|
c.cl.SetStore(s.st)
|
||||||
return bootstrapRaftFromCluster(cfg, cl, cl.MemberIDs(), bwal)
|
c.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, s.backend.be))
|
||||||
case haveWAL:
|
if s.wal.haveWAL {
|
||||||
|
c.cl.Recover(api.UpdateCapability)
|
||||||
|
if c.cl.Version() != nil && !c.cl.Version().LessThan(semver.Version{Major: 3}) && !s.backend.beExist {
|
||||||
|
bepath := cfg.BackendPath()
|
||||||
|
os.RemoveAll(bepath)
|
||||||
|
return fmt.Errorf("database file (%v) of the backend is missing", bepath)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func bootstrapRaft(cfg config.ServerConfig, cluster *bootstrapedCluster, bwal *bootstrappedWAL) *bootstrappedRaft {
|
||||||
|
switch {
|
||||||
|
case !bwal.haveWAL && !cfg.NewCluster:
|
||||||
|
return bootstrapRaftFromCluster(cfg, cluster.cl, nil, bwal)
|
||||||
|
case !bwal.haveWAL && cfg.NewCluster:
|
||||||
|
return bootstrapRaftFromCluster(cfg, cluster.cl, cluster.cl.MemberIDs(), bwal)
|
||||||
|
case bwal.haveWAL:
|
||||||
return bootstrapRaftFromWAL(cfg, bwal)
|
return bootstrapRaftFromWAL(cfg, bwal)
|
||||||
default:
|
default:
|
||||||
cfg.Logger.Panic("unsupported bootstrap config")
|
cfg.Logger.Panic("unsupported bootstrap config")
|
||||||
@ -520,6 +526,7 @@ func bootstrapWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot
|
|||||||
ents: ents,
|
ents: ents,
|
||||||
snapshot: snap,
|
snapshot: snap,
|
||||||
meta: meta,
|
meta: meta,
|
||||||
|
haveWAL: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
if cfg.ForceNewCluster {
|
if cfg.ForceNewCluster {
|
||||||
@ -590,11 +597,11 @@ type snapshotMetadata struct {
|
|||||||
nodeID, clusterID types.ID
|
nodeID, clusterID types.ID
|
||||||
}
|
}
|
||||||
|
|
||||||
func bootstrapNewWAL(cfg config.ServerConfig, cl *membership.RaftCluster, nodeID types.ID) *bootstrappedWAL {
|
func bootstrapNewWAL(cfg config.ServerConfig, cl *bootstrapedCluster) *bootstrappedWAL {
|
||||||
metadata := pbutil.MustMarshal(
|
metadata := pbutil.MustMarshal(
|
||||||
&etcdserverpb.Metadata{
|
&etcdserverpb.Metadata{
|
||||||
NodeID: uint64(nodeID),
|
NodeID: uint64(cl.nodeID),
|
||||||
ClusterID: uint64(cl.ID()),
|
ClusterID: uint64(cl.cl.ID()),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
w, err := wal.Create(cfg.Logger, cfg.WALDir(), metadata)
|
w, err := wal.Create(cfg.Logger, cfg.WALDir(), metadata)
|
||||||
@ -613,6 +620,7 @@ func bootstrapNewWAL(cfg config.ServerConfig, cl *membership.RaftCluster, nodeID
|
|||||||
type bootstrappedWAL struct {
|
type bootstrappedWAL struct {
|
||||||
lg *zap.Logger
|
lg *zap.Logger
|
||||||
|
|
||||||
|
haveWAL bool
|
||||||
w *wal.WAL
|
w *wal.WAL
|
||||||
st *raftpb.HardState
|
st *raftpb.HardState
|
||||||
ents []raftpb.Entry
|
ents []raftpb.Entry
|
||||||
|
@ -320,7 +320,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
|||||||
errorc: make(chan error, 1),
|
errorc: make(chan error, 1),
|
||||||
v2store: b.storage.st,
|
v2store: b.storage.st,
|
||||||
snapshotter: b.ss,
|
snapshotter: b.ss,
|
||||||
r: *b.raft.newRaftNode(b.ss, b.cluster.wal.w, b.cluster.cl),
|
r: *b.raft.newRaftNode(b.ss, b.storage.wal.w, b.cluster.cl),
|
||||||
id: b.cluster.nodeID,
|
id: b.cluster.nodeID,
|
||||||
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
|
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
|
||||||
cluster: b.cluster.cl,
|
cluster: b.cluster.cl,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user