From 42c6e08f22b7b93166c25f093b0cd0364ed3dbaa Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Thu, 28 Apr 2022 16:56:23 +0200 Subject: [PATCH] Rename the txn, so as not to be the same as the package name. --- server/etcdserver/txn/txn.go | 82 ++++++++++++++++++------------------ 1 file changed, 41 insertions(+), 41 deletions(-) diff --git a/server/etcdserver/txn/txn.go b/server/etcdserver/txn/txn.go index b3b5f2c4e..36782d34b 100644 --- a/server/etcdserver/txn/txn.go +++ b/server/etcdserver/txn/txn.go @@ -29,7 +29,7 @@ import ( "go.uber.org/zap" ) -func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) { +func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, txnWrite mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) { resp = &pb.PutResponse{} resp.Header = &pb.ResponseHeader{} trace = traceutil.Get(ctx) @@ -42,20 +42,20 @@ func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, t ) } val, leaseID := p.Value, lease.LeaseID(p.Lease) - if txn == nil { + if txnWrite == nil { if leaseID != lease.NoLease { if l := lessor.Lookup(leaseID); l == nil { return nil, nil, lease.ErrLeaseNotFound } } - txn = kv.Write(trace) - defer txn.End() + txnWrite = kv.Write(trace) + defer txnWrite.End() } var rr *mvcc.RangeResult if p.IgnoreValue || p.IgnoreLease || p.PrevKv { trace.StepWithFunction(func() { - rr, err = txn.Range(context.TODO(), p.Key, nil, mvcc.RangeOptions{}) + rr, err = txnWrite.Range(context.TODO(), p.Key, nil, mvcc.RangeOptions{}) }, "get previous kv pair") if err != nil { @@ -80,23 +80,23 @@ func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, t } } - resp.Header.Revision = txn.Put(p.Key, val, leaseID) + resp.Header.Revision = txnWrite.Put(p.Key, val, leaseID) trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision}) return resp, trace, nil } -func DeleteRange(kv mvcc.KV, txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { +func DeleteRange(kv mvcc.KV, txnWrite mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { resp := &pb.DeleteRangeResponse{} resp.Header = &pb.ResponseHeader{} end := mkGteRange(dr.RangeEnd) - if txn == nil { - txn = kv.Write(traceutil.TODO()) - defer txn.End() + if txnWrite == nil { + txnWrite = kv.Write(traceutil.TODO()) + defer txnWrite.End() } if dr.PrevKv { - rr, err := txn.Range(context.TODO(), dr.Key, end, mvcc.RangeOptions{}) + rr, err := txnWrite.Range(context.TODO(), dr.Key, end, mvcc.RangeOptions{}) if err != nil { return nil, err } @@ -108,19 +108,19 @@ func DeleteRange(kv mvcc.KV, txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb. } } - resp.Deleted, resp.Header.Revision = txn.DeleteRange(dr.Key, end) + resp.Deleted, resp.Header.Revision = txnWrite.DeleteRange(dr.Key, end) return resp, nil } -func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { +func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, txnRead mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { trace := traceutil.Get(ctx) resp := &pb.RangeResponse{} resp.Header = &pb.ResponseHeader{} - if txn == nil { - txn = kv.Read(mvcc.ConcurrentReadTxMode, trace) - defer txn.End() + if txnRead == nil { + txnRead = kv.Read(mvcc.ConcurrentReadTxMode, trace) + defer txnRead.End() } limit := r.Limit @@ -141,7 +141,7 @@ func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, txn mvcc.TxnRead, r Count: r.CountOnly, } - rr, err := txn.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro) + rr, err := txnRead.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro) if err != nil { return nil, err } @@ -227,50 +227,50 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit // When the transaction contains write operations, we use ReadTx instead of // ConcurrentReadTx to avoid extra overhead of copying buffer. - var txn mvcc.TxnWrite + var txnWrite mvcc.TxnWrite if isWrite && txnModeWriteWithSharedBuffer /*a.s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer*/ { - txn = mvcc.NewReadOnlyTxnWrite(kv.Read(mvcc.SharedBufReadTxMode, trace)) + txnWrite = mvcc.NewReadOnlyTxnWrite(kv.Read(mvcc.SharedBufReadTxMode, trace)) } else { - txn = mvcc.NewReadOnlyTxnWrite(kv.Read(mvcc.ConcurrentReadTxMode, trace)) + txnWrite = mvcc.NewReadOnlyTxnWrite(kv.Read(mvcc.ConcurrentReadTxMode, trace)) } var txnPath []bool trace.StepWithFunction( func() { - txnPath = compareToPath(txn, rt) + txnPath = compareToPath(txnWrite, rt) }, "compare", ) if isWrite { trace.AddField(traceutil.Field{Key: "read_only", Value: false}) - if _, err := checkRequests(txn, rt, txnPath, + if _, err := checkRequests(txnWrite, rt, txnPath, func(rv mvcc.ReadView, ro *pb.RequestOp) error { return checkRequestPut(rv, lessor, ro) }); err != nil { - txn.End() + txnWrite.End() return nil, nil, err } } - if _, err := checkRequests(txn, rt, txnPath, checkRequestRange); err != nil { - txn.End() + if _, err := checkRequests(txnWrite, rt, txnPath, checkRequestRange); err != nil { + txnWrite.End() return nil, nil, err } trace.Step("check requests") txnResp, _ := newTxnResp(rt, txnPath) - // When executing mutable txn ops, etcd must hold the txn lock so + // When executing mutable txnWrite ops, etcd must hold the txnWrite lock so // readers do not see any intermediate results. Since writes are // serialized on the raft loop, the revision in the read view will - // be the revision of the write txn. + // be the revision of the write txnWrite. if isWrite { - txn.End() - txn = kv.Write(trace) + txnWrite.End() + txnWrite = kv.Write(trace) } - applyTxn(ctx, lg, kv, lessor, txn, rt, txnPath, txnResp) - rev := txn.Rev() - if len(txn.Changes()) != 0 { + applyTxn(ctx, lg, kv, lessor, txnWrite, rt, txnPath, txnResp) + rev := txnWrite.Rev() + if len(txnWrite.Changes()) != 0 { rev++ } - txn.End() + txnWrite.End() txnResp.Header.Revision = rev trace.AddField( @@ -311,7 +311,7 @@ func newTxnResp(rt *pb.TxnRequest, txnPath []bool) (txnResp *pb.TxnResponse, txn return txnResp, txnCount } -func applyTxn(ctx context.Context, lg *zap.Logger, kv mvcc.KV, lessor lease.Lessor, txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int) { +func applyTxn(ctx context.Context, lg *zap.Logger, kv mvcc.KV, lessor lease.Lessor, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int) { trace := traceutil.Get(ctx) reqs := rt.Success if !txnPath[0] { @@ -326,9 +326,9 @@ func applyTxn(ctx context.Context, lg *zap.Logger, kv mvcc.KV, lessor lease.Less traceutil.Field{Key: "req_type", Value: "range"}, traceutil.Field{Key: "range_begin", Value: string(tv.RequestRange.Key)}, traceutil.Field{Key: "range_end", Value: string(tv.RequestRange.RangeEnd)}) - resp, err := Range(ctx, lg, kv, txn, tv.RequestRange) + resp, err := Range(ctx, lg, kv, txnWrite, tv.RequestRange) if err != nil { - lg.Panic("unexpected error during txn", zap.Error(err)) + lg.Panic("unexpected error during txnWrite", zap.Error(err)) } respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp trace.StopSubTrace() @@ -337,21 +337,21 @@ func applyTxn(ctx context.Context, lg *zap.Logger, kv mvcc.KV, lessor lease.Less traceutil.Field{Key: "req_type", Value: "put"}, traceutil.Field{Key: "key", Value: string(tv.RequestPut.Key)}, traceutil.Field{Key: "req_size", Value: tv.RequestPut.Size()}) - resp, _, err := Put(ctx, lg, lessor, kv, txn, tv.RequestPut) + resp, _, err := Put(ctx, lg, lessor, kv, txnWrite, tv.RequestPut) if err != nil { - lg.Panic("unexpected error during txn", zap.Error(err)) + lg.Panic("unexpected error during txnWrite", zap.Error(err)) } respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp trace.StopSubTrace() case *pb.RequestOp_RequestDeleteRange: - resp, err := DeleteRange(kv, txn, tv.RequestDeleteRange) + resp, err := DeleteRange(kv, txnWrite, tv.RequestDeleteRange) if err != nil { - lg.Panic("unexpected error during txn", zap.Error(err)) + lg.Panic("unexpected error during txnWrite", zap.Error(err)) } respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp case *pb.RequestOp_RequestTxn: resp := respi.(*pb.ResponseOp_ResponseTxn).ResponseTxn - applyTxns := applyTxn(ctx, lg, kv, lessor, txn, tv.RequestTxn, txnPath[1:], resp) + applyTxns := applyTxn(ctx, lg, kv, lessor, txnWrite, tv.RequestTxn, txnPath[1:], resp) txns += applyTxns + 1 txnPath = txnPath[applyTxns+1:] default: