etcdserver: Extract boostrapRaft struct

This commit is contained in:
Marek Siarkowicz 2021-07-07 17:29:13 +02:00
parent 880673c4a0
commit 554777bba4
2 changed files with 66 additions and 50 deletions

View File

@ -420,8 +420,7 @@ func (r *raftNode) advanceTicks(ticks int) {
}
}
func startNode(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
var err error
func startNode(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) *boostrapRaft {
member := cl.MemberByName(cfg.Name)
metadata := pbutil.MustMarshal(
&pb.Metadata{
@ -429,7 +428,8 @@ func startNode(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.
ClusterID: uint64(cl.ID()),
},
)
if w, err = wal.Create(cfg.Logger, cfg.WALDir(), metadata); err != nil {
w, err := wal.Create(cfg.Logger, cfg.WALDir(), metadata)
if err != nil {
cfg.Logger.Panic("failed to create WAL", zap.Error(err))
}
if cfg.UnsafeNoFsync {
@ -444,14 +444,15 @@ func startNode(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.
}
peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
}
id = member.ID
id := member.ID
cfg.Logger.Info(
"starting local member",
zap.String("local-member-id", id.String()),
zap.String("cluster-id", cl.ID().String()),
)
s = raft.NewMemoryStorage()
s := raft.NewMemoryStorage()
c := raftConfig(cfg, uint64(id), s)
var n raft.Node
if len(peers) == 0 {
n = raft.RestartNode(c)
} else {
@ -460,10 +461,17 @@ func startNode(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.
raftStatusMu.Lock()
raftStatus = n.Status
raftStatusMu.Unlock()
return id, n, s, w
return &boostrapRaft{
id: id,
cl: cl,
node: n,
storage: s,
wal: w,
}
}
func restartNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
func restartNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *boostrapRaft {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
@ -489,10 +497,16 @@ func restartNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (types.ID,
raftStatusMu.Lock()
raftStatus = n.Status
raftStatusMu.Unlock()
return id, cl, n, s, w
return &boostrapRaft{
id: id,
cl: cl,
node: n,
storage: s,
wal: w,
}
}
func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *boostrapRaft {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
@ -551,7 +565,13 @@ func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot)
n := raft.RestartNode(c)
raftStatus = n.Status
return id, cl, n, s, w
return &boostrapRaft{
id: id,
cl: cl,
node: n,
storage: s,
wal: w,
}
}
func raftConfig(cfg config.ServerConfig, id uint64, s *raft.MemoryStorage) *raft.Config {
@ -568,6 +588,14 @@ func raftConfig(cfg config.ServerConfig, id uint64, s *raft.MemoryStorage) *raft
}
}
type boostrapRaft struct {
id types.ID
cl *membership.RaftCluster
node raft.Node
storage *raft.MemoryStorage
wal *wal.WAL
}
// getIDs returns an ordered set of IDs included in the given snapshot and
// the entries. The given snapshot/entries can contain three kinds of
// ID-related entry:

View File

@ -393,12 +393,8 @@ func bootstrap(cfg config.ServerConfig) (b *boostrapResult, err error) {
}
type boostrapResult struct {
cl *membership.RaftCluster
raft *boostrapRaft
remotes []*membership.Member
w *wal.WAL
n raft.Node
s *raft.MemoryStorage
id types.ID
prt http.RoundTripper
ci cindex.ConsistentIndexer
st v2store.Store
@ -469,15 +465,11 @@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe
cl.SetID(types.ID(0), existingCluster.ID())
cl.SetStore(st)
cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
id, n, s, w := startNode(cfg, cl, nil)
cl.SetID(id, existingCluster.ID())
br := startNode(cfg, cl, nil)
cl.SetID(br.id, existingCluster.ID())
return &boostrapResult{
cl: cl,
raft: br,
remotes: remotes,
w: w,
n: n,
s: s,
id: id,
}, nil
}
@ -513,15 +505,11 @@ func boostrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st
}
cl.SetStore(st)
cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
id, n, s, w := startNode(cfg, cl, cl.MemberIDs())
cl.SetID(id, cl.ID())
br := startNode(cfg, cl, cl.MemberIDs())
cl.SetID(br.id, cl.ID())
return &boostrapResult{
cl: cl,
remotes: nil,
w: w,
n: n,
s: s,
id: id,
raft: br,
}, nil
}
@ -600,15 +588,15 @@ func boostrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backe
r := &boostrapResult{}
if !cfg.ForceNewCluster {
r.id, r.cl, r.n, r.s, r.w = restartNode(cfg, snapshot)
r.raft = restartNode(cfg, snapshot)
} else {
r.id, r.cl, r.n, r.s, r.w = restartAsStandaloneNode(cfg, snapshot)
r.raft = restartAsStandaloneNode(cfg, snapshot)
}
r.cl.SetStore(st)
r.cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
r.cl.Recover(api.UpdateCapability)
if r.cl.Version() != nil && !r.cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
r.raft.cl.SetStore(st)
r.raft.cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
r.raft.cl.Recover(api.UpdateCapability)
if r.raft.cl.Version() != nil && !r.raft.cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
bepath := cfg.BackendPath()
os.RemoveAll(bepath)
return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
@ -630,8 +618,8 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
}
}()
sstats := stats.NewServerStats(cfg.Name, b.id.String())
lstats := stats.NewLeaderStats(cfg.Logger, b.id.String())
sstats := stats.NewServerStats(cfg.Name, b.raft.id.String())
lstats := stats.NewLeaderStats(cfg.Logger, b.raft.id.String())
heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
srv = &EtcdServer{
@ -645,26 +633,26 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
r: *newRaftNode(
raftNodeConfig{
lg: cfg.Logger,
isIDRemoved: func(id uint64) bool { return b.cl.IsIDRemoved(types.ID(id)) },
Node: b.n,
isIDRemoved: func(id uint64) bool { return b.raft.cl.IsIDRemoved(types.ID(id)) },
Node: b.raft.node,
heartbeat: heartbeat,
raftStorage: b.s,
storage: NewStorage(b.w, b.ss),
raftStorage: b.raft.storage,
storage: NewStorage(b.raft.wal, b.ss),
},
),
id: b.id,
id: b.raft.id,
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
cluster: b.cl,
cluster: b.raft.cl,
stats: sstats,
lstats: lstats,
SyncTicker: time.NewTicker(500 * time.Millisecond),
peerRt: b.prt,
reqIDGen: idutil.NewGenerator(uint16(b.id), time.Now()),
reqIDGen: idutil.NewGenerator(uint16(b.raft.id), time.Now()),
AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
consistIndex: b.ci,
firstCommitInTermC: make(chan struct{}),
}
serverID.With(prometheus.Labels{"server_id": b.id.String()}).Set(1)
serverID.With(prometheus.Labels{"server_id": b.raft.id.String()}).Set(1)
srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)
@ -733,9 +721,9 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
Logger: cfg.Logger,
TLSInfo: cfg.PeerTLSInfo,
DialTimeout: cfg.PeerDialTimeout(),
ID: b.id,
ID: b.raft.id,
URLs: cfg.PeerURLs,
ClusterID: b.cl.ID(),
ClusterID: b.raft.cl.ID(),
Raft: srv,
Snapshotter: b.ss,
ServerStats: sstats,
@ -747,12 +735,12 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
}
// add all remotes into transport
for _, m := range b.remotes {
if m.ID != b.id {
if m.ID != b.raft.id {
tr.AddRemote(m.ID, m.PeerURLs)
}
}
for _, m := range b.cl.Members() {
if m.ID != b.id {
for _, m := range b.raft.cl.Members() {
if m.ID != b.raft.id {
tr.AddPeer(m.ID, m.PeerURLs)
}
}