Rename package alias "apply2" -> "apply".

Piotr Tabor 2022-05-05 10:48:26 +02:00
parent c62f01e5fe
commit 63b2f63cc1
5 changed files with 53 additions and 53 deletions

View File

@@ -61,11 +61,11 @@ func init() {
 	}))
 }
 
-// apply contains entries, snapshot to be applied. Once
-// an apply is consumed, the entries will be persisted to
+// toApply contains entries, snapshot to be applied. Once
+// an toApply is consumed, the entries will be persisted to
 // to raft storage concurrently; the application must read
 // raftDone before assuming the raft messages are stable.
-type apply struct {
+type toApply struct {
 	entries  []raftpb.Entry
 	snapshot raftpb.Snapshot
 	// notifyc synchronizes etcd server applies with the raft node
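Aside: a minimal runnable sketch of the handshake this struct implements. The types below (entry, toApplyDemo) are placeholders for the real raftpb types, not etcd code; the point is the notifyc pattern described in the comment above.

package main

import "fmt"

type entry struct{ index uint64 }

// toApplyDemo mirrors the shape of the renamed toApply struct.
type toApplyDemo struct {
	entries []entry
	notifyc chan struct{}
}

func main() {
	applyc := make(chan toApplyDemo)

	// Raft-side goroutine: hand off a batch, persist it, then notify.
	go func() {
		notifyc := make(chan struct{}, 1)
		applyc <- toApplyDemo{entries: []entry{{index: 1}}, notifyc: notifyc}
		// ... entries would be written to raft storage here ...
		notifyc <- struct{}{} // raft messages are now stable
	}()

	// Apply-side: consume the batch, but wait for durability before
	// assuming the raft messages are stable.
	ap := <-applyc
	<-ap.notifyc
	fmt.Println("applied", len(ap.entries), "entries")
}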
@@ -82,7 +82,7 @@ type raftNode struct {
 	msgSnapC chan raftpb.Message
 
 	// a chan to send out apply
-	applyc chan apply
+	applyc chan toApply
 
 	// a chan to send out readState
 	readStateC chan raft.ReadState
@@ -134,7 +134,7 @@ func newRaftNode(cfg raftNodeConfig) *raftNode {
 		td:         contention.NewTimeoutDetector(2 * cfg.heartbeat),
 		readStateC: make(chan raft.ReadState, 1),
 		msgSnapC:   make(chan raftpb.Message, maxInFlightMsgSnap),
-		applyc:     make(chan apply),
+		applyc:     make(chan toApply),
 		stopped:    make(chan struct{}),
 		done:       make(chan struct{}),
 	}
@@ -201,7 +201,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 			}
 
 			notifyc := make(chan struct{}, 1)
-			ap := apply{
+			ap := toApply{
 				entries:  rd.CommittedEntries,
 				snapshot: rd.Snapshot,
 				notifyc:  notifyc,
@@ -278,7 +278,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 			// changes to be applied before sending messages.
 			// Otherwise we might incorrectly count votes (e.g. votes from removed members).
 			// Also slow machine's follower raft-layer could proceed to become the leader
-			// on its own single-node cluster, before apply-layer applies the config change.
+			// on its own single-node cluster, before toApply-layer applies the config change.
 			// We simply wait for ALL pending entries to be applied for now.
 			// We might improve this later on if it causes unnecessary long blocking issues.
 			waitApply := false
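For context, the wait this comment promises is implemented just below the hunk; paraphrased (roughly, not verbatim from the file), it scans the committed entries for a config change and, if one is found, sends a second token on the size-1 notifyc buffer, which blocks until the apply loop has drained the toApply batch sent above:

waitApply := false
for _, ent := range rd.CommittedEntries {
	if ent.Type == raftpb.EntryConfChange {
		waitApply = true
		break
	}
}
if waitApply {
	// blocks until the toApply batch is consumed, or until the node stops
	select {
	case notifyc <- struct{}{}:
	case <-r.stopped:
		return
	}
}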
@@ -314,7 +314,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 	}()
 }
 
-func updateCommittedIndex(ap *apply, rh *raftReadyHandler) {
+func updateCommittedIndex(ap *toApply, rh *raftReadyHandler) {
 	var ci uint64
 	if len(ap.entries) != 0 {
 		ci = ap.entries[len(ap.entries)-1].Index
@@ -372,7 +372,7 @@ func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
 	return ms
 }
 
-func (r *raftNode) apply() chan apply {
+func (r *raftNode) apply() chan toApply {
 	return r.applyc
 }
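On the consuming side, EtcdServer.run drains this channel and feeds applyAll; condensed and simplified from server.go (the real select also handles expired leases, errors, and sync events):

for {
	select {
	case ap := <-s.r.apply():
		f := func(context.Context) { s.applyAll(&ep, &ap) }
		sched.Schedule(f) // FIFO scheduler keeps progress dispatch in order
	case <-s.stop:
		return
	}
}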

View File

@@ -171,7 +171,7 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
 	select {
 	case <-srv.r.applyc:
 	case <-time.After(time.Second):
-		t.Fatalf("failed to receive apply struct")
+		t.Fatalf("failed to receive toApply struct")
 	}
 
 	srv.r.stopped <- struct{}{}
@@ -182,7 +182,7 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
 	}
 }
 
-// TestConfigChangeBlocksApply ensures apply blocks if committed entries contain config-change.
+// TestConfigChangeBlocksApply ensures toApply blocks if committed entries contain config-change.
 func TestConfigChangeBlocksApply(t *testing.T) {
 	n := newNopReadyNode()
@@ -217,11 +217,11 @@ func TestConfigChangeBlocksApply(t *testing.T) {
 	select {
 	case <-continueC:
-		t.Fatalf("unexpected execution: raft routine should block waiting for apply")
+		t.Fatalf("unexpected execution: raft routine should block waiting for toApply")
 	case <-time.After(time.Second):
 	}
 
-	// finish apply, unblock raft routine
+	// finish toApply, unblock raft routine
 	<-ap.notifyc
 
 	select {

View File

@@ -35,7 +35,7 @@ import (
 	"go.etcd.io/etcd/pkg/v3/notify"
 	"go.etcd.io/etcd/pkg/v3/runtime"
 	"go.etcd.io/etcd/server/v3/config"
-	apply2 "go.etcd.io/etcd/server/v3/etcdserver/apply"
+	"go.etcd.io/etcd/server/v3/etcdserver/apply"
 	"go.etcd.io/etcd/server/v3/etcdserver/etcderrors"
 
 	"go.uber.org/zap"
@@ -150,7 +150,7 @@ type ServerV2 interface {
 type ServerV3 interface {
 	Server
-	apply2.RaftStatusGetter
+	apply.RaftStatusGetter
 }
 
 func (s *EtcdServer) ClientCertAuthEnabled() bool { return s.Cfg.ClientCertAuthEnabled }
@@ -252,7 +252,7 @@ type EtcdServer struct {
 	applyV2 ApplierV2
 
-	uberApply *apply2.UberApplier
+	uberApply *apply.UberApplier
 
 	applyWait wait.WaitTime
@@ -727,7 +727,7 @@ type etcdProgress struct {
 // raftReadyHandler contains a set of EtcdServer operations to be called by raftNode,
 // and helps decouple state machine logic from Raft algorithms.
-// TODO: add a state machine interface to apply the commit entries and do snapshot/recover
+// TODO: add a state machine interface to toApply the commit entries and do snapshot/recover
 type raftReadyHandler struct {
 	getLead    func() (lead uint64)
 	updateLead func(lead uint64)
@@ -743,7 +743,7 @@ func (s *EtcdServer) run() {
 		lg.Panic("failed to get snapshot from Raft storage", zap.Error(err))
 	}
 
-	// asynchronously accept apply packets, dispatch progress in-order
+	// asynchronously accept toApply packets, dispatch progress in-order
 	sched := schedule.NewFIFOScheduler()
 
 	var (
@@ -905,7 +905,7 @@ func (s *EtcdServer) Cleanup() {
 	}
 }
 
-func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
+func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) {
 	s.applySnapshot(ep, apply)
 	s.applyEntries(ep, apply)
@@ -914,7 +914,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
 	// wait for the raft routine to finish the disk writes before triggering a
 	// snapshot. or applied index might be greater than the last index in raft
-	// storage, since the raft routine might be slower than apply routine.
+	// storage, since the raft routine might be slower than toApply routine.
 	<-apply.notifyc
 
 	s.triggerSnapshot(ep)
@@ -927,8 +927,8 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
 	}
 }
 
-func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
-	if raft.IsEmptySnap(apply.snapshot) {
+func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
+	if raft.IsEmptySnap(toApply.snapshot) {
 		return
 	}
 	applySnapshotInProgress.Inc()
@@ -938,34 +938,34 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 		"applying snapshot",
 		zap.Uint64("current-snapshot-index", ep.snapi),
 		zap.Uint64("current-applied-index", ep.appliedi),
-		zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
-		zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
+		zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
+		zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
 	)
 	defer func() {
 		lg.Info(
 			"applied snapshot",
 			zap.Uint64("current-snapshot-index", ep.snapi),
 			zap.Uint64("current-applied-index", ep.appliedi),
-			zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
-			zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
+			zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
+			zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
 		)
 		applySnapshotInProgress.Dec()
 	}()
 
-	if apply.snapshot.Metadata.Index <= ep.appliedi {
+	if toApply.snapshot.Metadata.Index <= ep.appliedi {
 		lg.Panic(
 			"unexpected leader snapshot from outdated index",
 			zap.Uint64("current-snapshot-index", ep.snapi),
 			zap.Uint64("current-applied-index", ep.appliedi),
-			zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
-			zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
+			zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index),
+			zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term),
 		)
 	}
 
 	// wait for raftNode to persist snapshot onto the disk
-	<-apply.notifyc
+	<-toApply.notifyc
 
-	newbe, err := serverstorage.OpenSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot, s.beHooks)
+	newbe, err := serverstorage.OpenSnapshotBackend(s.Cfg, s.snapshotter, toApply.snapshot, s.beHooks)
 	if err != nil {
 		lg.Panic("failed to open snapshot backend", zap.Error(err))
 	}
@@ -1033,7 +1033,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 	}
 
 	lg.Info("restoring v2 store")
-	if err := s.v2store.Recovery(apply.snapshot.Data); err != nil {
+	if err := s.v2store.Recovery(toApply.snapshot.Data); err != nil {
 		lg.Panic("failed to restore v2 store", zap.Error(err))
 	}
@@ -1067,18 +1067,18 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 	lg.Info("added peers from new cluster configuration")
 
-	ep.appliedt = apply.snapshot.Metadata.Term
-	ep.appliedi = apply.snapshot.Metadata.Index
+	ep.appliedt = toApply.snapshot.Metadata.Term
+	ep.appliedi = toApply.snapshot.Metadata.Index
 	ep.snapi = ep.appliedi
-	ep.confState = apply.snapshot.Metadata.ConfState
+	ep.confState = toApply.snapshot.Metadata.ConfState
 
 	// As backends and implementations like alarmsStore changed, we need
 	// to re-bootstrap Appliers.
 	s.uberApply = s.NewUberApplier()
 }
 
-func (s *EtcdServer) NewUberApplier() *apply2.UberApplier {
-	return apply2.NewUberApplier(s.lg, s.be, s.KV(), s.alarmStore, s.authStore, s.lessor, s.cluster, s, s, s.consistIndex,
+func (s *EtcdServer) NewUberApplier() *apply.UberApplier {
+	return apply.NewUberApplier(s.lg, s.be, s.KV(), s.alarmStore, s.authStore, s.lessor, s.cluster, s, s, s.consistIndex,
 		s.Cfg.WarningApplyDuration, s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer, s.Cfg.QuotaBackendBytes)
 }
@@ -1090,7 +1090,7 @@ func verifySnapshotIndex(snapshot raftpb.Snapshot, cindex uint64) {
 	})
 }
 
-func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
+func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *toApply) {
 	if len(apply.entries) == 0 {
 		return
 	}
@@ -1277,7 +1277,7 @@ func (s *EtcdServer) checkMembershipOperationPermission(ctx context.Context) err
 	// Note that this permission check is done in the API layer,
 	// so TOCTOU problem can be caused potentially in a schedule like this:
-	// update membership with user A -> revoke root role of A -> apply membership change
+	// update membership with user A -> revoke root role of A -> toApply membership change
 	// in the state machine layer
 	// However, both of membership change and role management requires the root privilege.
 	// So careful operation by admins can prevent the problem.
@@ -1469,7 +1469,7 @@ func (s *EtcdServer) mayPromoteMember(id types.ID) error {
 // check whether the learner catches up with leader or not.
 // Note: it will return nil if member is not found in cluster or if member is not learner.
-// These two conditions will be checked before apply phase later.
+// These two conditions will be checked before toApply phase later.
 func (s *EtcdServer) isLearnerReady(id uint64) error {
 	rs := s.raftStatus()
@@ -1774,7 +1774,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
 	})
 }
 
-// apply takes entries received from Raft (after it has been committed) and
+// toApply takes entries received from Raft (after it has been committed) and
 // applies them to the current state of the EtcdServer.
 // The given entries should not be empty.
 func (s *EtcdServer) apply(
@@ -1795,7 +1795,7 @@ func (s *EtcdServer) apply(
 			s.setTerm(e.Term)
 
 		case raftpb.EntryConfChange:
-			// We need to apply all WAL entries on top of v2store
+			// We need to toApply all WAL entries on top of v2store
 			// and only 'unapplied' (e.Index>backend.ConsistentIndex) on the backend.
 			shouldApplyV3 := membership.ApplyV2storeOnly
@@ -1829,7 +1829,7 @@
 func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 	shouldApplyV3 := membership.ApplyV2storeOnly
 	applyV3Performed := false
-	var ar *apply2.ApplyResult
+	var ar *apply.ApplyResult
 	index := s.consistIndex.ConsistentIndex()
 	if e.Index > index {
 		// set the consistent index of current executing entry
@@ -1843,7 +1843,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 			}
 		}()
 	}
-	s.lg.Debug("apply entry normal",
+	s.lg.Debug("toApply entry normal",
 		zap.Uint64("consistent-index", index),
 		zap.Uint64("entry-index", e.Index),
 		zap.Bool("should-applyV3", bool(shouldApplyV3)))
@@ -1895,7 +1895,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 		ar = s.uberApply.Apply(&raftReq, shouldApplyV3)
 	}
 
-	// do not re-apply applied entries.
+	// do not re-toApply applied entries.
 	if !shouldApplyV3 {
 		return
 	}
@@ -2039,7 +2039,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 	// KV().commit() updates the consistent index in backend.
 	// All operations that update consistent index must be called sequentially
 	// from applyAll function.
-	// So KV().Commit() cannot run in parallel with apply. It has to be called outside
+	// So KV().Commit() cannot run in parallel with toApply. It has to be called outside
 	// the go routine created below.
 	s.KV().Commit()

View File

@@ -723,7 +723,7 @@ func realisticRaftNode(lg *zap.Logger) *raftNode {
 	return r
 }
 
-// TestApplyMultiConfChangeShouldStop ensures that apply will return shouldStop
+// TestApplyMultiConfChangeShouldStop ensures that toApply will return shouldStop
 // if the local member is removed along with other conf updates.
 func TestApplyMultiConfChangeShouldStop(t *testing.T) {
 	lg := zaptest.NewLogger(t)

View File

@@ -43,7 +43,7 @@ import (
 const (
 	// In the health case, there might be a small gap (10s of entries) between
 	// the applied index and committed index.
-	// However, if the committed entries are very heavy to apply, the gap might grow.
+	// However, if the committed entries are very heavy to toApply, the gap might grow.
 	// We should stop accepting new proposals if the gap growing to a certain point.
 	maxGapBetweenApplyAndCommitIndex = 5000
 	traceThreshold                   = 100 * time.Millisecond
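For reference, this constant backs the admission check that guards new proposals; roughly (condensed, not verbatim from v3_server.go):

ai := s.getAppliedIndex()
ci := s.getCommittedIndex()
if ci > ai+maxGapBetweenApplyAndCommitIndex {
	// the apply layer has fallen too far behind; reject new
	// proposals instead of letting the backlog grow unboundedly
	return nil, etcderrors.ErrTooManyRequests
}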
@@ -63,9 +63,9 @@ type RaftKV interface {
 }
 
 type Lessor interface {
-	// LeaseGrant sends LeaseGrant request to raft and apply it after committed.
+	// LeaseGrant sends LeaseGrant request to raft and toApply it after committed.
 	LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
-	// LeaseRevoke sends LeaseRevoke request to raft and apply it after committed.
+	// LeaseRevoke sends LeaseRevoke request to raft and toApply it after committed.
 	LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
 
 	// LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error
@@ -223,7 +223,7 @@ func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.
 		s.bemu.RLock()
 		s.be.ForceCommit()
 		s.bemu.RUnlock()
-		trace.Step("physically apply compaction")
+		trace.Step("physically toApply compaction")
 	}
 	if err != nil {
 		return nil, err
@@ -617,9 +617,9 @@ func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftReque
 	}
 	if startTime, ok := ctx.Value(traceutil.StartTimeKey).(time.Time); ok && result.Trace != nil {
 		applyStart := result.Trace.GetStartTime()
-		// The trace object is created in apply. Here reset the start time to trace
+		// The trace object is created in toApply. Here reset the start time to trace
 		// the raft request time by the difference between the request start time
-		// and apply start time
+		// and toApply start time
 		result.Trace.SetStartTime(startTime)
 		result.Trace.InsertStep(0, applyStart, "process raft request")
 		result.Trace.LogIfLong(traceThreshold)