Rename the txn, so as not to be the same as the package name.

This commit is contained in:
Piotr Tabor 2022-04-28 16:56:23 +02:00
parent 0da0cf4795
commit 42c6e08f22

View File

@ -29,7 +29,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) { func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, txnWrite mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
resp = &pb.PutResponse{} resp = &pb.PutResponse{}
resp.Header = &pb.ResponseHeader{} resp.Header = &pb.ResponseHeader{}
trace = traceutil.Get(ctx) trace = traceutil.Get(ctx)
@ -42,20 +42,20 @@ func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, t
) )
} }
val, leaseID := p.Value, lease.LeaseID(p.Lease) val, leaseID := p.Value, lease.LeaseID(p.Lease)
if txn == nil { if txnWrite == nil {
if leaseID != lease.NoLease { if leaseID != lease.NoLease {
if l := lessor.Lookup(leaseID); l == nil { if l := lessor.Lookup(leaseID); l == nil {
return nil, nil, lease.ErrLeaseNotFound return nil, nil, lease.ErrLeaseNotFound
} }
} }
txn = kv.Write(trace) txnWrite = kv.Write(trace)
defer txn.End() defer txnWrite.End()
} }
var rr *mvcc.RangeResult var rr *mvcc.RangeResult
if p.IgnoreValue || p.IgnoreLease || p.PrevKv { if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
trace.StepWithFunction(func() { trace.StepWithFunction(func() {
rr, err = txn.Range(context.TODO(), p.Key, nil, mvcc.RangeOptions{}) rr, err = txnWrite.Range(context.TODO(), p.Key, nil, mvcc.RangeOptions{})
}, "get previous kv pair") }, "get previous kv pair")
if err != nil { if err != nil {
@ -80,23 +80,23 @@ func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, t
} }
} }
resp.Header.Revision = txn.Put(p.Key, val, leaseID) resp.Header.Revision = txnWrite.Put(p.Key, val, leaseID)
trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision}) trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision})
return resp, trace, nil return resp, trace, nil
} }
func DeleteRange(kv mvcc.KV, txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { func DeleteRange(kv mvcc.KV, txnWrite mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
resp := &pb.DeleteRangeResponse{} resp := &pb.DeleteRangeResponse{}
resp.Header = &pb.ResponseHeader{} resp.Header = &pb.ResponseHeader{}
end := mkGteRange(dr.RangeEnd) end := mkGteRange(dr.RangeEnd)
if txn == nil { if txnWrite == nil {
txn = kv.Write(traceutil.TODO()) txnWrite = kv.Write(traceutil.TODO())
defer txn.End() defer txnWrite.End()
} }
if dr.PrevKv { if dr.PrevKv {
rr, err := txn.Range(context.TODO(), dr.Key, end, mvcc.RangeOptions{}) rr, err := txnWrite.Range(context.TODO(), dr.Key, end, mvcc.RangeOptions{})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -108,19 +108,19 @@ func DeleteRange(kv mvcc.KV, txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.
} }
} }
resp.Deleted, resp.Header.Revision = txn.DeleteRange(dr.Key, end) resp.Deleted, resp.Header.Revision = txnWrite.DeleteRange(dr.Key, end)
return resp, nil return resp, nil
} }
func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, txnRead mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
trace := traceutil.Get(ctx) trace := traceutil.Get(ctx)
resp := &pb.RangeResponse{} resp := &pb.RangeResponse{}
resp.Header = &pb.ResponseHeader{} resp.Header = &pb.ResponseHeader{}
if txn == nil { if txnRead == nil {
txn = kv.Read(mvcc.ConcurrentReadTxMode, trace) txnRead = kv.Read(mvcc.ConcurrentReadTxMode, trace)
defer txn.End() defer txnRead.End()
} }
limit := r.Limit limit := r.Limit
@ -141,7 +141,7 @@ func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, txn mvcc.TxnRead, r
Count: r.CountOnly, Count: r.CountOnly,
} }
rr, err := txn.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro) rr, err := txnRead.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -227,50 +227,50 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
// When the transaction contains write operations, we use ReadTx instead of // When the transaction contains write operations, we use ReadTx instead of
// ConcurrentReadTx to avoid extra overhead of copying buffer. // ConcurrentReadTx to avoid extra overhead of copying buffer.
var txn mvcc.TxnWrite var txnWrite mvcc.TxnWrite
if isWrite && txnModeWriteWithSharedBuffer /*a.s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer*/ { if isWrite && txnModeWriteWithSharedBuffer /*a.s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer*/ {
txn = mvcc.NewReadOnlyTxnWrite(kv.Read(mvcc.SharedBufReadTxMode, trace)) txnWrite = mvcc.NewReadOnlyTxnWrite(kv.Read(mvcc.SharedBufReadTxMode, trace))
} else { } else {
txn = mvcc.NewReadOnlyTxnWrite(kv.Read(mvcc.ConcurrentReadTxMode, trace)) txnWrite = mvcc.NewReadOnlyTxnWrite(kv.Read(mvcc.ConcurrentReadTxMode, trace))
} }
var txnPath []bool var txnPath []bool
trace.StepWithFunction( trace.StepWithFunction(
func() { func() {
txnPath = compareToPath(txn, rt) txnPath = compareToPath(txnWrite, rt)
}, },
"compare", "compare",
) )
if isWrite { if isWrite {
trace.AddField(traceutil.Field{Key: "read_only", Value: false}) trace.AddField(traceutil.Field{Key: "read_only", Value: false})
if _, err := checkRequests(txn, rt, txnPath, if _, err := checkRequests(txnWrite, rt, txnPath,
func(rv mvcc.ReadView, ro *pb.RequestOp) error { return checkRequestPut(rv, lessor, ro) }); err != nil { func(rv mvcc.ReadView, ro *pb.RequestOp) error { return checkRequestPut(rv, lessor, ro) }); err != nil {
txn.End() txnWrite.End()
return nil, nil, err return nil, nil, err
} }
} }
if _, err := checkRequests(txn, rt, txnPath, checkRequestRange); err != nil { if _, err := checkRequests(txnWrite, rt, txnPath, checkRequestRange); err != nil {
txn.End() txnWrite.End()
return nil, nil, err return nil, nil, err
} }
trace.Step("check requests") trace.Step("check requests")
txnResp, _ := newTxnResp(rt, txnPath) txnResp, _ := newTxnResp(rt, txnPath)
// When executing mutable txn ops, etcd must hold the txn lock so // When executing mutable txnWrite ops, etcd must hold the txnWrite lock so
// readers do not see any intermediate results. Since writes are // readers do not see any intermediate results. Since writes are
// serialized on the raft loop, the revision in the read view will // serialized on the raft loop, the revision in the read view will
// be the revision of the write txn. // be the revision of the write txnWrite.
if isWrite { if isWrite {
txn.End() txnWrite.End()
txn = kv.Write(trace) txnWrite = kv.Write(trace)
} }
applyTxn(ctx, lg, kv, lessor, txn, rt, txnPath, txnResp) applyTxn(ctx, lg, kv, lessor, txnWrite, rt, txnPath, txnResp)
rev := txn.Rev() rev := txnWrite.Rev()
if len(txn.Changes()) != 0 { if len(txnWrite.Changes()) != 0 {
rev++ rev++
} }
txn.End() txnWrite.End()
txnResp.Header.Revision = rev txnResp.Header.Revision = rev
trace.AddField( trace.AddField(
@ -311,7 +311,7 @@ func newTxnResp(rt *pb.TxnRequest, txnPath []bool) (txnResp *pb.TxnResponse, txn
return txnResp, txnCount return txnResp, txnCount
} }
func applyTxn(ctx context.Context, lg *zap.Logger, kv mvcc.KV, lessor lease.Lessor, txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int) { func applyTxn(ctx context.Context, lg *zap.Logger, kv mvcc.KV, lessor lease.Lessor, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int) {
trace := traceutil.Get(ctx) trace := traceutil.Get(ctx)
reqs := rt.Success reqs := rt.Success
if !txnPath[0] { if !txnPath[0] {
@ -326,9 +326,9 @@ func applyTxn(ctx context.Context, lg *zap.Logger, kv mvcc.KV, lessor lease.Less
traceutil.Field{Key: "req_type", Value: "range"}, traceutil.Field{Key: "req_type", Value: "range"},
traceutil.Field{Key: "range_begin", Value: string(tv.RequestRange.Key)}, traceutil.Field{Key: "range_begin", Value: string(tv.RequestRange.Key)},
traceutil.Field{Key: "range_end", Value: string(tv.RequestRange.RangeEnd)}) traceutil.Field{Key: "range_end", Value: string(tv.RequestRange.RangeEnd)})
resp, err := Range(ctx, lg, kv, txn, tv.RequestRange) resp, err := Range(ctx, lg, kv, txnWrite, tv.RequestRange)
if err != nil { if err != nil {
lg.Panic("unexpected error during txn", zap.Error(err)) lg.Panic("unexpected error during txnWrite", zap.Error(err))
} }
respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp
trace.StopSubTrace() trace.StopSubTrace()
@ -337,21 +337,21 @@ func applyTxn(ctx context.Context, lg *zap.Logger, kv mvcc.KV, lessor lease.Less
traceutil.Field{Key: "req_type", Value: "put"}, traceutil.Field{Key: "req_type", Value: "put"},
traceutil.Field{Key: "key", Value: string(tv.RequestPut.Key)}, traceutil.Field{Key: "key", Value: string(tv.RequestPut.Key)},
traceutil.Field{Key: "req_size", Value: tv.RequestPut.Size()}) traceutil.Field{Key: "req_size", Value: tv.RequestPut.Size()})
resp, _, err := Put(ctx, lg, lessor, kv, txn, tv.RequestPut) resp, _, err := Put(ctx, lg, lessor, kv, txnWrite, tv.RequestPut)
if err != nil { if err != nil {
lg.Panic("unexpected error during txn", zap.Error(err)) lg.Panic("unexpected error during txnWrite", zap.Error(err))
} }
respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp
trace.StopSubTrace() trace.StopSubTrace()
case *pb.RequestOp_RequestDeleteRange: case *pb.RequestOp_RequestDeleteRange:
resp, err := DeleteRange(kv, txn, tv.RequestDeleteRange) resp, err := DeleteRange(kv, txnWrite, tv.RequestDeleteRange)
if err != nil { if err != nil {
lg.Panic("unexpected error during txn", zap.Error(err)) lg.Panic("unexpected error during txnWrite", zap.Error(err))
} }
respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp
case *pb.RequestOp_RequestTxn: case *pb.RequestOp_RequestTxn:
resp := respi.(*pb.ResponseOp_ResponseTxn).ResponseTxn resp := respi.(*pb.ResponseOp_ResponseTxn).ResponseTxn
applyTxns := applyTxn(ctx, lg, kv, lessor, txn, tv.RequestTxn, txnPath[1:], resp) applyTxns := applyTxn(ctx, lg, kv, lessor, txnWrite, tv.RequestTxn, txnPath[1:], resp)
txns += applyTxns + 1 txns += applyTxns + 1
txnPath = txnPath[applyTxns+1:] txnPath = txnPath[applyTxns+1:]
default: default: