Rename WrapApply to Apply.

This commit is contained in:
Piotr Tabor 2022-05-13 14:18:28 +02:00
parent 87b80f16ca
commit 85b18c9b3e
6 changed files with 19 additions and 18 deletions

View File

@ -52,7 +52,7 @@ type RaftStatusGetter interface {
Term() uint64 Term() uint64
} }
type ApplyResult struct { type Result struct {
Resp proto.Message Resp proto.Message
Err error Err error
// Physc signals the physical effect of the request has completed in addition // Physc signals the physical effect of the request has completed in addition
@ -62,12 +62,13 @@ type ApplyResult struct {
Trace *traceutil.Trace Trace *traceutil.Trace
} }
type ApplyFunc func(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *ApplyResult type applyFunc func(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *Result
// applierV3 is the interface for processing V3 raft messages // applierV3 is the interface for processing V3 raft messages
type applierV3 interface { type applierV3 interface {
WrapApply(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc ApplyFunc) *ApplyResult // Apply executes the generic portion of application logic for the current applier, but
//Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *ApplyResult // delegates the actual execution to the applyFunc method.
Apply(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc applyFunc) *Result
Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
@ -151,7 +152,7 @@ func newApplierV3Backend(
txnModeWriteWithSharedBuffer: txnModeWriteWithSharedBuffer} txnModeWriteWithSharedBuffer: txnModeWriteWithSharedBuffer}
} }
func (a *applierV3backend) WrapApply(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc ApplyFunc) *ApplyResult { func (a *applierV3backend) Apply(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc applyFunc) *Result {
return applyFunc(ctx, r, shouldApplyV3) return applyFunc(ctx, r, shouldApplyV3)
} }

View File

@ -43,7 +43,7 @@ func newAuthApplierV3(as auth.AuthStore, base applierV3, lessor lease.Lessor) *a
return &authApplierV3{applierV3: base, as: as, lessor: lessor} return &authApplierV3{applierV3: base, as: as, lessor: lessor}
} }
func (aa *authApplierV3) WrapApply(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc ApplyFunc) *ApplyResult { func (aa *authApplierV3) Apply(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc applyFunc) *Result {
aa.mu.Lock() aa.mu.Lock()
defer aa.mu.Unlock() defer aa.mu.Unlock()
if r.Header != nil { if r.Header != nil {
@ -56,10 +56,10 @@ func (aa *authApplierV3) WrapApply(ctx context.Context, r *pb.InternalRaftReques
if err := aa.as.IsAdminPermitted(&aa.authInfo); err != nil { if err := aa.as.IsAdminPermitted(&aa.authInfo); err != nil {
aa.authInfo.Username = "" aa.authInfo.Username = ""
aa.authInfo.Revision = 0 aa.authInfo.Revision = 0
return &ApplyResult{Err: err} return &Result{Err: err}
} }
} }
ret := aa.applierV3.WrapApply(ctx, r, shouldApplyV3, applyFunc) ret := aa.applierV3.Apply(ctx, r, shouldApplyV3, applyFunc)
aa.authInfo.Username = "" aa.authInfo.Username = ""
aa.authInfo.Revision = 0 aa.authInfo.Revision = 0
return ret return ret

View File

@ -31,7 +31,7 @@ import (
) )
type UberApplier interface { type UberApplier interface {
Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *ApplyResult Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *Result
} }
type uberApplier struct { type uberApplier struct {
@ -107,20 +107,20 @@ func (a *uberApplier) restoreAlarms() {
} }
} }
func (a *uberApplier) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *ApplyResult { func (a *uberApplier) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *Result {
// We first execute chain of WrapApply() calls down the hierarchy: // We first execute chain of Apply() calls down the hierarchy:
// (i.e. CorruptApplier -> CappedApplier -> Auth -> Quota -> Backend), // (i.e. CorruptApplier -> CappedApplier -> Auth -> Quota -> Backend),
// then dispatch() unpacks the request to a specific method (like Put), // then dispatch() unpacks the request to a specific method (like Put),
// that gets executed down the hierarchy again: // that gets executed down the hierarchy again:
// i.e. CorruptApplier.Put(CappedApplier.Put(...(BackendApplier.Put(...)))). // i.e. CorruptApplier.Put(CappedApplier.Put(...(BackendApplier.Put(...)))).
return a.applyV3.WrapApply(context.TODO(), r, shouldApplyV3, a.dispatch) return a.applyV3.Apply(context.TODO(), r, shouldApplyV3, a.dispatch)
} }
// dispatch translates the request (r) into appropriate call (like Put) on // dispatch translates the request (r) into appropriate call (like Put) on
// the underlying applyV3 object. // the underlying applyV3 object.
func (a *uberApplier) dispatch(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *ApplyResult { func (a *uberApplier) dispatch(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *Result {
op := "unknown" op := "unknown"
ar := &ApplyResult{} ar := &Result{}
defer func(start time.Time) { defer func(start time.Time) {
success := ar.Err == nil || ar.Err == mvcc.ErrCompacted success := ar.Err == nil || ar.Err == mvcc.ErrCompacted
txn.ApplySecObserve(v3Version, op, success, time.Since(start)) txn.ApplySecObserve(v3Version, op, success, time.Since(start))

View File

@ -1829,7 +1829,7 @@ func (s *EtcdServer) apply(
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
shouldApplyV3 := membership.ApplyV2storeOnly shouldApplyV3 := membership.ApplyV2storeOnly
applyV3Performed := false applyV3Performed := false
var ar *apply.ApplyResult var ar *apply.Result
index := s.consistIndex.ConsistentIndex() index := s.consistIndex.ConsistentIndex()
if e.Index > index { if e.Index > index {
// set the consistent index of current executing entry // set the consistent index of current executing entry

View File

@ -1479,7 +1479,7 @@ func TestPublishV3(t *testing.T) {
n := newNodeRecorder() n := newNodeRecorder()
ch := make(chan interface{}, 1) ch := make(chan interface{}, 1)
// simulate that request has gone through consensus // simulate that request has gone through consensus
ch <- &apply2.ApplyResult{} ch <- &apply2.Result{}
w := wait.NewWithResponse(ch) w := wait.NewWithResponse(ch)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
lg := zaptest.NewLogger(t) lg := zaptest.NewLogger(t)

View File

@ -656,7 +656,7 @@ func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) e
return nil return nil
} }
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*apply2.ApplyResult, error) { func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*apply2.Result, error) {
ai := s.getAppliedIndex() ai := s.getAppliedIndex()
ci := s.getCommittedIndex() ci := s.getCommittedIndex()
if ci > ai+maxGapBetweenApplyAndCommitIndex { if ci > ai+maxGapBetweenApplyAndCommitIndex {
@ -709,7 +709,7 @@ func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.In
select { select {
case x := <-ch: case x := <-ch:
return x.(*apply2.ApplyResult), nil return x.(*apply2.Result), nil
case <-cctx.Done(): case <-cctx.Done():
proposalsFailed.Inc() proposalsFailed.Inc()
s.w.Trigger(id, nil) // GC wait s.w.Trigger(id, nil) // GC wait