mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
server: Move cluster up the bootstrap hierarchy
This commit is contained in:
@@ -113,10 +113,11 @@ type bootstrappedBackend struct {
|
||||
}
|
||||
|
||||
type bootstrapedCluster struct {
|
||||
raft *bootstrappedRaft
|
||||
remotes []*membership.Member
|
||||
wal *bootstrappedWAL
|
||||
clusterID, nodeID types.ID
|
||||
raft *bootstrappedRaft
|
||||
remotes []*membership.Member
|
||||
wal *bootstrappedWAL
|
||||
cl *membership.RaftCluster
|
||||
nodeID types.ID
|
||||
}
|
||||
|
||||
type bootstrappedRaft struct {
|
||||
@@ -125,7 +126,6 @@ type bootstrappedRaft struct {
|
||||
|
||||
peers []raft.Peer
|
||||
config *raft.Config
|
||||
cl *membership.RaftCluster
|
||||
storage *raft.MemoryStorage
|
||||
}
|
||||
|
||||
@@ -274,11 +274,11 @@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe
|
||||
br := bootstrapRaftFromCluster(cfg, cl, nil, bwal)
|
||||
cl.SetID(member.ID, existingCluster.ID())
|
||||
return &bootstrapedCluster{
|
||||
raft: br,
|
||||
remotes: remotes,
|
||||
wal: bwal,
|
||||
clusterID: cl.ID(),
|
||||
nodeID: member.ID,
|
||||
raft: br,
|
||||
remotes: remotes,
|
||||
wal: bwal,
|
||||
cl: cl,
|
||||
nodeID: member.ID,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -319,11 +319,11 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st
|
||||
br := bootstrapRaftFromCluster(cfg, cl, cl.MemberIDs(), bwal)
|
||||
cl.SetID(member.ID, cl.ID())
|
||||
return &bootstrapedCluster{
|
||||
remotes: nil,
|
||||
raft: br,
|
||||
wal: bwal,
|
||||
clusterID: cl.ID(),
|
||||
nodeID: member.ID,
|
||||
remotes: nil,
|
||||
raft: br,
|
||||
wal: bwal,
|
||||
cl: cl,
|
||||
nodeID: member.ID,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -343,12 +343,6 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStora
|
||||
)
|
||||
}
|
||||
bwal, meta := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), storage.backend.snapshot, cfg.UnsafeNoFsync)
|
||||
|
||||
b := &bootstrapedCluster{
|
||||
wal: bwal,
|
||||
clusterID: meta.clusterID,
|
||||
nodeID: meta.nodeID,
|
||||
}
|
||||
if cfg.ForceNewCluster {
|
||||
// discard the previously uncommitted entries
|
||||
bwal.ents = bwal.CommitedEntries()
|
||||
@@ -369,17 +363,24 @@ func bootstrapClusterWithWAL(cfg config.ServerConfig, storage *bootstrappedStora
|
||||
zap.Uint64("commit-index", bwal.st.Commit),
|
||||
)
|
||||
}
|
||||
b.raft = bootstrapRaftFromSnapshot(cfg, bwal, meta)
|
||||
cl := membership.NewCluster(cfg.Logger)
|
||||
cl.SetID(meta.nodeID, meta.clusterID)
|
||||
raft := bootstrapRaftFromSnapshot(cfg, bwal, meta)
|
||||
|
||||
b.raft.cl.SetStore(storage.st)
|
||||
b.raft.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, storage.backend.be))
|
||||
b.raft.cl.Recover(api.UpdateCapability)
|
||||
if b.raft.cl.Version() != nil && !b.raft.cl.Version().LessThan(semver.Version{Major: 3}) && !storage.backend.beExist {
|
||||
cl.SetStore(storage.st)
|
||||
cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, storage.backend.be))
|
||||
cl.Recover(api.UpdateCapability)
|
||||
if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !storage.backend.beExist {
|
||||
bepath := cfg.BackendPath()
|
||||
os.RemoveAll(bepath)
|
||||
return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
|
||||
}
|
||||
return b, nil
|
||||
return &bootstrapedCluster{
|
||||
raft: raft,
|
||||
wal: bwal,
|
||||
cl: cl,
|
||||
nodeID: meta.nodeID,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func recoverSnapshot(cfg config.ServerConfig, st v2store.Store, be backend.Backend, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer, ss *snap.Snapshotter) (*raftpb.Snapshot, backend.Backend, error) {
|
||||
@@ -462,7 +463,6 @@ func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluste
|
||||
return &bootstrappedRaft{
|
||||
lg: cfg.Logger,
|
||||
heartbeat: time.Duration(cfg.TickMs) * time.Millisecond,
|
||||
cl: cl,
|
||||
config: raftConfig(cfg, uint64(member.ID), s),
|
||||
peers: peers,
|
||||
storage: s,
|
||||
@@ -470,13 +470,10 @@ func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluste
|
||||
}
|
||||
|
||||
func bootstrapRaftFromSnapshot(cfg config.ServerConfig, bwal *bootstrappedWAL, meta *snapshotMetadata) *bootstrappedRaft {
|
||||
cl := membership.NewCluster(cfg.Logger)
|
||||
cl.SetID(meta.nodeID, meta.clusterID)
|
||||
s := bwal.MemoryStorage()
|
||||
return &bootstrappedRaft{
|
||||
lg: cfg.Logger,
|
||||
heartbeat: time.Duration(cfg.TickMs) * time.Millisecond,
|
||||
cl: cl,
|
||||
config: raftConfig(cfg, uint64(meta.nodeID), s),
|
||||
storage: s,
|
||||
}
|
||||
@@ -496,7 +493,7 @@ func raftConfig(cfg config.ServerConfig, id uint64, s *raft.MemoryStorage) *raft
|
||||
}
|
||||
}
|
||||
|
||||
func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter, wal *wal.WAL) *raftNode {
|
||||
func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter, wal *wal.WAL, cl *membership.RaftCluster) *raftNode {
|
||||
var n raft.Node
|
||||
if len(b.peers) == 0 {
|
||||
n = raft.RestartNode(b.config)
|
||||
@@ -509,7 +506,7 @@ func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter, wal *wal.WAL) *raft
|
||||
return newRaftNode(
|
||||
raftNodeConfig{
|
||||
lg: b.lg,
|
||||
isIDRemoved: func(id uint64) bool { return b.cl.IsIDRemoved(types.ID(id)) },
|
||||
isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
|
||||
Node: n,
|
||||
heartbeat: b.heartbeat,
|
||||
raftStorage: b.storage,
|
||||
|
||||
@@ -308,7 +308,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
||||
}
|
||||
}()
|
||||
|
||||
sstats := stats.NewServerStats(cfg.Name, b.cluster.nodeID.String())
|
||||
sstats := stats.NewServerStats(cfg.Name, b.cluster.cl.String())
|
||||
lstats := stats.NewLeaderStats(cfg.Logger, b.cluster.nodeID.String())
|
||||
|
||||
heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
|
||||
@@ -320,10 +320,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
||||
errorc: make(chan error, 1),
|
||||
v2store: b.storage.st,
|
||||
snapshotter: b.ss,
|
||||
r: *b.cluster.raft.newRaftNode(b.ss, b.cluster.wal.w),
|
||||
r: *b.cluster.raft.newRaftNode(b.ss, b.cluster.wal.w, b.cluster.cl),
|
||||
id: b.cluster.nodeID,
|
||||
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
|
||||
cluster: b.cluster.raft.cl,
|
||||
cluster: b.cluster.cl,
|
||||
stats: sstats,
|
||||
lstats: lstats,
|
||||
SyncTicker: time.NewTicker(500 * time.Millisecond),
|
||||
@@ -405,7 +405,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
||||
DialTimeout: cfg.PeerDialTimeout(),
|
||||
ID: b.cluster.nodeID,
|
||||
URLs: cfg.PeerURLs,
|
||||
ClusterID: b.cluster.raft.cl.ID(),
|
||||
ClusterID: b.cluster.cl.ID(),
|
||||
Raft: srv,
|
||||
Snapshotter: b.ss,
|
||||
ServerStats: sstats,
|
||||
@@ -421,7 +421,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
||||
tr.AddRemote(m.ID, m.PeerURLs)
|
||||
}
|
||||
}
|
||||
for _, m := range b.cluster.raft.cl.Members() {
|
||||
for _, m := range b.cluster.cl.Members() {
|
||||
if m.ID != b.cluster.nodeID {
|
||||
tr.AddPeer(m.ID, m.PeerURLs)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user