From 63b2f63cc1a2348409b7b6e11d2c5bfdadec69b6 Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Thu, 5 May 2022 10:48:26 +0200 Subject: [PATCH] Rename package alising "apply2" -> apply. --- server/etcdserver/raft.go | 18 ++++----- server/etcdserver/raft_test.go | 8 ++-- server/etcdserver/server.go | 66 ++++++++++++++++---------------- server/etcdserver/server_test.go | 2 +- server/etcdserver/v3_server.go | 12 +++--- 5 files changed, 53 insertions(+), 53 deletions(-) diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go index 69e6a8c21..3db8a57c3 100644 --- a/server/etcdserver/raft.go +++ b/server/etcdserver/raft.go @@ -61,11 +61,11 @@ func init() { })) } -// apply contains entries, snapshot to be applied. Once -// an apply is consumed, the entries will be persisted to +// toApply contains entries, snapshot to be applied. Once +// an toApply is consumed, the entries will be persisted to // to raft storage concurrently; the application must read // raftDone before assuming the raft messages are stable. -type apply struct { +type toApply struct { entries []raftpb.Entry snapshot raftpb.Snapshot // notifyc synchronizes etcd server applies with the raft node @@ -82,7 +82,7 @@ type raftNode struct { msgSnapC chan raftpb.Message // a chan to send out apply - applyc chan apply + applyc chan toApply // a chan to send out readState readStateC chan raft.ReadState @@ -134,7 +134,7 @@ func newRaftNode(cfg raftNodeConfig) *raftNode { td: contention.NewTimeoutDetector(2 * cfg.heartbeat), readStateC: make(chan raft.ReadState, 1), msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), - applyc: make(chan apply), + applyc: make(chan toApply), stopped: make(chan struct{}), done: make(chan struct{}), } @@ -201,7 +201,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { } notifyc := make(chan struct{}, 1) - ap := apply{ + ap := toApply{ entries: rd.CommittedEntries, snapshot: rd.Snapshot, notifyc: notifyc, @@ -278,7 +278,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { // changes to be applied before sending messages. // Otherwise we might incorrectly count votes (e.g. votes from removed members). // Also slow machine's follower raft-layer could proceed to become the leader - // on its own single-node cluster, before apply-layer applies the config change. + // on its own single-node cluster, before toApply-layer applies the config change. // We simply wait for ALL pending entries to be applied for now. // We might improve this later on if it causes unnecessary long blocking issues. waitApply := false @@ -314,7 +314,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { }() } -func updateCommittedIndex(ap *apply, rh *raftReadyHandler) { +func updateCommittedIndex(ap *toApply, rh *raftReadyHandler) { var ci uint64 if len(ap.entries) != 0 { ci = ap.entries[len(ap.entries)-1].Index @@ -372,7 +372,7 @@ func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message { return ms } -func (r *raftNode) apply() chan apply { +func (r *raftNode) apply() chan toApply { return r.applyc } diff --git a/server/etcdserver/raft_test.go b/server/etcdserver/raft_test.go index f34548553..6644d557c 100644 --- a/server/etcdserver/raft_test.go +++ b/server/etcdserver/raft_test.go @@ -171,7 +171,7 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) { select { case <-srv.r.applyc: case <-time.After(time.Second): - t.Fatalf("failed to receive apply struct") + t.Fatalf("failed to receive toApply struct") } srv.r.stopped <- struct{}{} @@ -182,7 +182,7 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) { } } -// TestConfigChangeBlocksApply ensures apply blocks if committed entries contain config-change. +// TestConfigChangeBlocksApply ensures toApply blocks if committed entries contain config-change. func TestConfigChangeBlocksApply(t *testing.T) { n := newNopReadyNode() @@ -217,11 +217,11 @@ func TestConfigChangeBlocksApply(t *testing.T) { select { case <-continueC: - t.Fatalf("unexpected execution: raft routine should block waiting for apply") + t.Fatalf("unexpected execution: raft routine should block waiting for toApply") case <-time.After(time.Second): } - // finish apply, unblock raft routine + // finish toApply, unblock raft routine <-ap.notifyc select { diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 5f0ddb69a..4ddf983b6 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -35,7 +35,7 @@ import ( "go.etcd.io/etcd/pkg/v3/notify" "go.etcd.io/etcd/pkg/v3/runtime" "go.etcd.io/etcd/server/v3/config" - apply2 "go.etcd.io/etcd/server/v3/etcdserver/apply" + "go.etcd.io/etcd/server/v3/etcdserver/apply" "go.etcd.io/etcd/server/v3/etcdserver/etcderrors" "go.uber.org/zap" @@ -150,7 +150,7 @@ type ServerV2 interface { type ServerV3 interface { Server - apply2.RaftStatusGetter + apply.RaftStatusGetter } func (s *EtcdServer) ClientCertAuthEnabled() bool { return s.Cfg.ClientCertAuthEnabled } @@ -252,7 +252,7 @@ type EtcdServer struct { applyV2 ApplierV2 - uberApply *apply2.UberApplier + uberApply *apply.UberApplier applyWait wait.WaitTime @@ -727,7 +727,7 @@ type etcdProgress struct { // raftReadyHandler contains a set of EtcdServer operations to be called by raftNode, // and helps decouple state machine logic from Raft algorithms. -// TODO: add a state machine interface to apply the commit entries and do snapshot/recover +// TODO: add a state machine interface to toApply the commit entries and do snapshot/recover type raftReadyHandler struct { getLead func() (lead uint64) updateLead func(lead uint64) @@ -743,7 +743,7 @@ func (s *EtcdServer) run() { lg.Panic("failed to get snapshot from Raft storage", zap.Error(err)) } - // asynchronously accept apply packets, dispatch progress in-order + // asynchronously accept toApply packets, dispatch progress in-order sched := schedule.NewFIFOScheduler() var ( @@ -905,7 +905,7 @@ func (s *EtcdServer) Cleanup() { } } -func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) { +func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) { s.applySnapshot(ep, apply) s.applyEntries(ep, apply) @@ -914,7 +914,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) { // wait for the raft routine to finish the disk writes before triggering a // snapshot. or applied index might be greater than the last index in raft - // storage, since the raft routine might be slower than apply routine. + // storage, since the raft routine might be slower than toApply routine. <-apply.notifyc s.triggerSnapshot(ep) @@ -927,8 +927,8 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) { } } -func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { - if raft.IsEmptySnap(apply.snapshot) { +func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) { + if raft.IsEmptySnap(toApply.snapshot) { return } applySnapshotInProgress.Inc() @@ -938,34 +938,34 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { "applying snapshot", zap.Uint64("current-snapshot-index", ep.snapi), zap.Uint64("current-applied-index", ep.appliedi), - zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index), - zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term), + zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index), + zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term), ) defer func() { lg.Info( "applied snapshot", zap.Uint64("current-snapshot-index", ep.snapi), zap.Uint64("current-applied-index", ep.appliedi), - zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index), - zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term), + zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index), + zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term), ) applySnapshotInProgress.Dec() }() - if apply.snapshot.Metadata.Index <= ep.appliedi { + if toApply.snapshot.Metadata.Index <= ep.appliedi { lg.Panic( "unexpected leader snapshot from outdated index", zap.Uint64("current-snapshot-index", ep.snapi), zap.Uint64("current-applied-index", ep.appliedi), - zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index), - zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term), + zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index), + zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term), ) } // wait for raftNode to persist snapshot onto the disk - <-apply.notifyc + <-toApply.notifyc - newbe, err := serverstorage.OpenSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot, s.beHooks) + newbe, err := serverstorage.OpenSnapshotBackend(s.Cfg, s.snapshotter, toApply.snapshot, s.beHooks) if err != nil { lg.Panic("failed to open snapshot backend", zap.Error(err)) } @@ -1033,7 +1033,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { } lg.Info("restoring v2 store") - if err := s.v2store.Recovery(apply.snapshot.Data); err != nil { + if err := s.v2store.Recovery(toApply.snapshot.Data); err != nil { lg.Panic("failed to restore v2 store", zap.Error(err)) } @@ -1067,18 +1067,18 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { lg.Info("added peers from new cluster configuration") - ep.appliedt = apply.snapshot.Metadata.Term - ep.appliedi = apply.snapshot.Metadata.Index + ep.appliedt = toApply.snapshot.Metadata.Term + ep.appliedi = toApply.snapshot.Metadata.Index ep.snapi = ep.appliedi - ep.confState = apply.snapshot.Metadata.ConfState + ep.confState = toApply.snapshot.Metadata.ConfState // As backends and implementations like alarmsStore changed, we need // to re-bootstrap Appliers. s.uberApply = s.NewUberApplier() } -func (s *EtcdServer) NewUberApplier() *apply2.UberApplier { - return apply2.NewUberApplier(s.lg, s.be, s.KV(), s.alarmStore, s.authStore, s.lessor, s.cluster, s, s, s.consistIndex, +func (s *EtcdServer) NewUberApplier() *apply.UberApplier { + return apply.NewUberApplier(s.lg, s.be, s.KV(), s.alarmStore, s.authStore, s.lessor, s.cluster, s, s, s.consistIndex, s.Cfg.WarningApplyDuration, s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer, s.Cfg.QuotaBackendBytes) } @@ -1090,7 +1090,7 @@ func verifySnapshotIndex(snapshot raftpb.Snapshot, cindex uint64) { }) } -func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) { +func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *toApply) { if len(apply.entries) == 0 { return } @@ -1277,7 +1277,7 @@ func (s *EtcdServer) checkMembershipOperationPermission(ctx context.Context) err // Note that this permission check is done in the API layer, // so TOCTOU problem can be caused potentially in a schedule like this: - // update membership with user A -> revoke root role of A -> apply membership change + // update membership with user A -> revoke root role of A -> toApply membership change // in the state machine layer // However, both of membership change and role management requires the root privilege. // So careful operation by admins can prevent the problem. @@ -1469,7 +1469,7 @@ func (s *EtcdServer) mayPromoteMember(id types.ID) error { // check whether the learner catches up with leader or not. // Note: it will return nil if member is not found in cluster or if member is not learner. -// These two conditions will be checked before apply phase later. +// These two conditions will be checked before toApply phase later. func (s *EtcdServer) isLearnerReady(id uint64) error { rs := s.raftStatus() @@ -1774,7 +1774,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) { }) } -// apply takes entries received from Raft (after it has been committed) and +// toApply takes entries received from Raft (after it has been committed) and // applies them to the current state of the EtcdServer. // The given entries should not be empty. func (s *EtcdServer) apply( @@ -1795,7 +1795,7 @@ func (s *EtcdServer) apply( s.setTerm(e.Term) case raftpb.EntryConfChange: - // We need to apply all WAL entries on top of v2store + // We need to toApply all WAL entries on top of v2store // and only 'unapplied' (e.Index>backend.ConsistentIndex) on the backend. shouldApplyV3 := membership.ApplyV2storeOnly @@ -1829,7 +1829,7 @@ func (s *EtcdServer) apply( func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { shouldApplyV3 := membership.ApplyV2storeOnly applyV3Performed := false - var ar *apply2.ApplyResult + var ar *apply.ApplyResult index := s.consistIndex.ConsistentIndex() if e.Index > index { // set the consistent index of current executing entry @@ -1843,7 +1843,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { } }() } - s.lg.Debug("apply entry normal", + s.lg.Debug("toApply entry normal", zap.Uint64("consistent-index", index), zap.Uint64("entry-index", e.Index), zap.Bool("should-applyV3", bool(shouldApplyV3))) @@ -1895,7 +1895,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { ar = s.uberApply.Apply(&raftReq, shouldApplyV3) } - // do not re-apply applied entries. + // do not re-toApply applied entries. if !shouldApplyV3 { return } @@ -2039,7 +2039,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { // KV().commit() updates the consistent index in backend. // All operations that update consistent index must be called sequentially // from applyAll function. - // So KV().Commit() cannot run in parallel with apply. It has to be called outside + // So KV().Commit() cannot run in parallel with toApply. It has to be called outside // the go routine created below. s.KV().Commit() diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 6e344b079..fe770ec09 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -723,7 +723,7 @@ func realisticRaftNode(lg *zap.Logger) *raftNode { return r } -// TestApplyMultiConfChangeShouldStop ensures that apply will return shouldStop +// TestApplyMultiConfChangeShouldStop ensures that toApply will return shouldStop // if the local member is removed along with other conf updates. func TestApplyMultiConfChangeShouldStop(t *testing.T) { lg := zaptest.NewLogger(t) diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index 59113f1b9..da6542299 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -43,7 +43,7 @@ import ( const ( // In the health case, there might be a small gap (10s of entries) between // the applied index and committed index. - // However, if the committed entries are very heavy to apply, the gap might grow. + // However, if the committed entries are very heavy to toApply, the gap might grow. // We should stop accepting new proposals if the gap growing to a certain point. maxGapBetweenApplyAndCommitIndex = 5000 traceThreshold = 100 * time.Millisecond @@ -63,9 +63,9 @@ type RaftKV interface { } type Lessor interface { - // LeaseGrant sends LeaseGrant request to raft and apply it after committed. + // LeaseGrant sends LeaseGrant request to raft and toApply it after committed. LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) - // LeaseRevoke sends LeaseRevoke request to raft and apply it after committed. + // LeaseRevoke sends LeaseRevoke request to raft and toApply it after committed. LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) // LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error @@ -223,7 +223,7 @@ func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb. s.bemu.RLock() s.be.ForceCommit() s.bemu.RUnlock() - trace.Step("physically apply compaction") + trace.Step("physically toApply compaction") } if err != nil { return nil, err @@ -617,9 +617,9 @@ func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftReque } if startTime, ok := ctx.Value(traceutil.StartTimeKey).(time.Time); ok && result.Trace != nil { applyStart := result.Trace.GetStartTime() - // The trace object is created in apply. Here reset the start time to trace + // The trace object is created in toApply. Here reset the start time to trace // the raft request time by the difference between the request start time - // and apply start time + // and toApply start time result.Trace.SetStartTime(startTime) result.Trace.InsertStep(0, applyStart, "process raft request") result.Trace.LogIfLong(traceThreshold)