mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
server: Move raft up the bootstrap hierarchy
This commit is contained in:
parent
138afa5be9
commit
049e2d6ec0
@ -72,7 +72,6 @@ func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
|
|||||||
if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
|
if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
|
||||||
return nil, fmt.Errorf("cannot access member directory: %v", terr)
|
return nil, fmt.Errorf("cannot access member directory: %v", terr)
|
||||||
}
|
}
|
||||||
|
|
||||||
haveWAL := wal.Exist(cfg.WALDir())
|
haveWAL := wal.Exist(cfg.WALDir())
|
||||||
s, err := bootstrapStorage(cfg, haveWAL, ss, prt)
|
s, err := bootstrapStorage(cfg, haveWAL, ss, prt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -84,17 +83,23 @@ func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
|
|||||||
s.backend.be.Close()
|
s.backend.be.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
raft := bootstrapRaft(cfg, haveWAL, cluster.cl, cluster.wal)
|
||||||
|
if !haveWAL {
|
||||||
|
cluster.cl.SetID(cluster.nodeID, cluster.cl.ID())
|
||||||
|
}
|
||||||
return &bootstrappedServer{
|
return &bootstrappedServer{
|
||||||
prt: prt,
|
prt: prt,
|
||||||
ss: ss,
|
ss: ss,
|
||||||
storage: s,
|
storage: s,
|
||||||
cluster: cluster,
|
cluster: cluster,
|
||||||
|
raft: raft,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type bootstrappedServer struct {
|
type bootstrappedServer struct {
|
||||||
storage *bootstrappedStorage
|
storage *bootstrappedStorage
|
||||||
cluster *bootstrapedCluster
|
cluster *bootstrapedCluster
|
||||||
|
raft *bootstrappedRaft
|
||||||
prt http.RoundTripper
|
prt http.RoundTripper
|
||||||
ss *snap.Snapshotter
|
ss *snap.Snapshotter
|
||||||
}
|
}
|
||||||
@ -113,7 +118,6 @@ type bootstrappedBackend struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type bootstrapedCluster struct {
|
type bootstrapedCluster struct {
|
||||||
raft *bootstrappedRaft
|
|
||||||
remotes []*membership.Member
|
remotes []*membership.Member
|
||||||
wal *bootstrappedWAL
|
wal *bootstrappedWAL
|
||||||
cl *membership.RaftCluster
|
cl *membership.RaftCluster
|
||||||
@ -232,14 +236,13 @@ func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error {
|
|||||||
func bootstrapCluster(cfg config.ServerConfig, haveWAL bool, storage *bootstrappedStorage, prt http.RoundTripper, ss *snap.Snapshotter) (c *bootstrapedCluster, err error) {
|
func bootstrapCluster(cfg config.ServerConfig, haveWAL bool, storage *bootstrappedStorage, prt http.RoundTripper, ss *snap.Snapshotter) (c *bootstrapedCluster, err error) {
|
||||||
c = &bootstrapedCluster{}
|
c = &bootstrapedCluster{}
|
||||||
var (
|
var (
|
||||||
meta *snapshotMetadata
|
|
||||||
bwal *bootstrappedWAL
|
bwal *bootstrappedWAL
|
||||||
)
|
)
|
||||||
if haveWAL {
|
if haveWAL {
|
||||||
if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
|
if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
|
||||||
return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
|
return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
|
||||||
}
|
}
|
||||||
bwal, meta = bootstrapWALFromSnapshot(cfg, storage.backend.snapshot)
|
bwal = bootstrapWALFromSnapshot(cfg, storage.backend.snapshot)
|
||||||
}
|
}
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
@ -256,7 +259,7 @@ func bootstrapCluster(cfg config.ServerConfig, haveWAL bool, storage *bootstrapp
|
|||||||
}
|
}
|
||||||
c.wal = bootstrapNewWAL(cfg, c.cl, c.nodeID)
|
c.wal = bootstrapNewWAL(cfg, c.cl, c.nodeID)
|
||||||
case haveWAL:
|
case haveWAL:
|
||||||
c, err = bootstrapClusterWithWAL(cfg, storage, meta)
|
c, err = bootstrapClusterWithWAL(cfg, storage, bwal.meta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -264,18 +267,6 @@ func bootstrapCluster(cfg config.ServerConfig, haveWAL bool, storage *bootstrapp
|
|||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("unsupported bootstrap config")
|
return nil, fmt.Errorf("unsupported bootstrap config")
|
||||||
}
|
}
|
||||||
switch {
|
|
||||||
case !haveWAL && !cfg.NewCluster:
|
|
||||||
c.raft = bootstrapRaftFromCluster(cfg, c.cl, nil, c.wal)
|
|
||||||
c.cl.SetID(c.nodeID, c.cl.ID())
|
|
||||||
case !haveWAL && cfg.NewCluster:
|
|
||||||
c.raft = bootstrapRaftFromCluster(cfg, c.cl, c.cl.MemberIDs(), c.wal)
|
|
||||||
c.cl.SetID(c.nodeID, c.cl.ID())
|
|
||||||
case haveWAL:
|
|
||||||
c.raft = bootstrapRaftFromSnapshot(cfg, c.wal, meta)
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("unsupported bootstrap config")
|
|
||||||
}
|
|
||||||
return c, nil
|
return c, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -437,6 +428,20 @@ func recoverSnapshot(cfg config.ServerConfig, st v2store.Store, be backend.Backe
|
|||||||
return snapshot, be, nil
|
return snapshot, be, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func bootstrapRaft(cfg config.ServerConfig, haveWAL bool, cl *membership.RaftCluster, bwal *bootstrappedWAL) *bootstrappedRaft {
|
||||||
|
switch {
|
||||||
|
case !haveWAL && !cfg.NewCluster:
|
||||||
|
return bootstrapRaftFromCluster(cfg, cl, nil, bwal)
|
||||||
|
case !haveWAL && cfg.NewCluster:
|
||||||
|
return bootstrapRaftFromCluster(cfg, cl, cl.MemberIDs(), bwal)
|
||||||
|
case haveWAL:
|
||||||
|
return bootstrapRaftFromWAL(cfg, bwal)
|
||||||
|
default:
|
||||||
|
cfg.Logger.Panic("unsupported bootstrap config")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID, bwal *bootstrappedWAL) *bootstrappedRaft {
|
func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID, bwal *bootstrappedWAL) *bootstrappedRaft {
|
||||||
member := cl.MemberByName(cfg.Name)
|
member := cl.MemberByName(cfg.Name)
|
||||||
peers := make([]raft.Peer, len(ids))
|
peers := make([]raft.Peer, len(ids))
|
||||||
@ -463,12 +468,12 @@ func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluste
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func bootstrapRaftFromSnapshot(cfg config.ServerConfig, bwal *bootstrappedWAL, meta *snapshotMetadata) *bootstrappedRaft {
|
func bootstrapRaftFromWAL(cfg config.ServerConfig, bwal *bootstrappedWAL) *bootstrappedRaft {
|
||||||
s := bwal.MemoryStorage()
|
s := bwal.MemoryStorage()
|
||||||
return &bootstrappedRaft{
|
return &bootstrappedRaft{
|
||||||
lg: cfg.Logger,
|
lg: cfg.Logger,
|
||||||
heartbeat: time.Duration(cfg.TickMs) * time.Millisecond,
|
heartbeat: time.Duration(cfg.TickMs) * time.Millisecond,
|
||||||
config: raftConfig(cfg, uint64(meta.nodeID), s),
|
config: raftConfig(cfg, uint64(bwal.meta.nodeID), s),
|
||||||
storage: s,
|
storage: s,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -509,7 +514,7 @@ func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter, wal *wal.WAL, cl *m
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func bootstrapWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (*bootstrappedWAL, *snapshotMetadata) {
|
func bootstrapWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bootstrappedWAL {
|
||||||
wal, st, ents, snap, meta := openWALFromSnapshot(cfg, snapshot)
|
wal, st, ents, snap, meta := openWALFromSnapshot(cfg, snapshot)
|
||||||
bwal := &bootstrappedWAL{
|
bwal := &bootstrappedWAL{
|
||||||
lg: cfg.Logger,
|
lg: cfg.Logger,
|
||||||
@ -517,12 +522,13 @@ func bootstrapWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot
|
|||||||
st: st,
|
st: st,
|
||||||
ents: ents,
|
ents: ents,
|
||||||
snapshot: snap,
|
snapshot: snap,
|
||||||
|
meta: meta,
|
||||||
}
|
}
|
||||||
|
|
||||||
if cfg.ForceNewCluster {
|
if cfg.ForceNewCluster {
|
||||||
// discard the previously uncommitted entries
|
// discard the previously uncommitted entries
|
||||||
bwal.ents = bwal.CommitedEntries()
|
bwal.ents = bwal.CommitedEntries()
|
||||||
entries := bwal.ConfigChangeEntries(meta)
|
entries := bwal.ConfigChangeEntries()
|
||||||
// force commit config change entries
|
// force commit config change entries
|
||||||
bwal.AppendAndCommitEntries(entries)
|
bwal.AppendAndCommitEntries(entries)
|
||||||
cfg.Logger.Info(
|
cfg.Logger.Info(
|
||||||
@ -539,7 +545,7 @@ func bootstrapWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot
|
|||||||
zap.Uint64("commit-index", bwal.st.Commit),
|
zap.Uint64("commit-index", bwal.st.Commit),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
return bwal, meta
|
return bwal
|
||||||
}
|
}
|
||||||
|
|
||||||
// openWALFromSnapshot reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear
|
// openWALFromSnapshot reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear
|
||||||
@ -614,6 +620,7 @@ type bootstrappedWAL struct {
|
|||||||
st *raftpb.HardState
|
st *raftpb.HardState
|
||||||
ents []raftpb.Entry
|
ents []raftpb.Entry
|
||||||
snapshot *raftpb.Snapshot
|
snapshot *raftpb.Snapshot
|
||||||
|
meta *snapshotMetadata
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wal *bootstrappedWAL) MemoryStorage() *raft.MemoryStorage {
|
func (wal *bootstrappedWAL) MemoryStorage() *raft.MemoryStorage {
|
||||||
@ -645,11 +652,11 @@ func (wal *bootstrappedWAL) CommitedEntries() []raftpb.Entry {
|
|||||||
return wal.ents
|
return wal.ents
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wal *bootstrappedWAL) ConfigChangeEntries(meta *snapshotMetadata) []raftpb.Entry {
|
func (wal *bootstrappedWAL) ConfigChangeEntries() []raftpb.Entry {
|
||||||
return serverstorage.CreateConfigChangeEnts(
|
return serverstorage.CreateConfigChangeEnts(
|
||||||
wal.lg,
|
wal.lg,
|
||||||
serverstorage.GetIDs(wal.lg, wal.snapshot, wal.ents),
|
serverstorage.GetIDs(wal.lg, wal.snapshot, wal.ents),
|
||||||
uint64(meta.nodeID),
|
uint64(wal.meta.nodeID),
|
||||||
wal.st.Term,
|
wal.st.Term,
|
||||||
wal.st.Commit,
|
wal.st.Commit,
|
||||||
)
|
)
|
||||||
|
@ -320,7 +320,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
|||||||
errorc: make(chan error, 1),
|
errorc: make(chan error, 1),
|
||||||
v2store: b.storage.st,
|
v2store: b.storage.st,
|
||||||
snapshotter: b.ss,
|
snapshotter: b.ss,
|
||||||
r: *b.cluster.raft.newRaftNode(b.ss, b.cluster.wal.w, b.cluster.cl),
|
r: *b.raft.newRaftNode(b.ss, b.cluster.wal.w, b.cluster.cl),
|
||||||
id: b.cluster.nodeID,
|
id: b.cluster.nodeID,
|
||||||
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
|
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
|
||||||
cluster: b.cluster.cl,
|
cluster: b.cluster.cl,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user