mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
server: Separate internal functions for recursion and have public function create transaction and trace
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
parent
9ad5189d3d
commit
fa21c07baa
@ -71,9 +71,9 @@ type applierV3 interface {
|
||||
// delegates the actual execution to the applyFunc method.
|
||||
Apply(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc applyFunc) *Result
|
||||
|
||||
Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
|
||||
Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
|
||||
DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
|
||||
Put(ctx context.Context, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
|
||||
Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error)
|
||||
DeleteRange(dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
|
||||
Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error)
|
||||
Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error)
|
||||
|
||||
@ -157,16 +157,16 @@ func (a *applierV3backend) Apply(ctx context.Context, r *pb.InternalRaftRequest,
|
||||
return applyFunc(ctx, r, shouldApplyV3)
|
||||
}
|
||||
|
||||
func (a *applierV3backend) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
|
||||
return mvcctxn.Put(ctx, a.lg, a.lessor, a.kv, txn, p)
|
||||
func (a *applierV3backend) Put(ctx context.Context, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
|
||||
return mvcctxn.Put(ctx, a.lg, a.lessor, a.kv, p)
|
||||
}
|
||||
|
||||
func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
|
||||
return mvcctxn.DeleteRange(a.kv, txn, dr)
|
||||
func (a *applierV3backend) DeleteRange(dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
|
||||
return mvcctxn.DeleteRange(a.kv, dr)
|
||||
}
|
||||
|
||||
func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||
return mvcctxn.Range(ctx, a.lg, a.kv, txn, r)
|
||||
func (a *applierV3backend) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||
return mvcctxn.Range(ctx, a.lg, a.kv, r)
|
||||
}
|
||||
|
||||
func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
|
||||
@ -255,7 +255,7 @@ type applierV3Capped struct {
|
||||
// with Puts so that the number of keys in the store is capped.
|
||||
func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} }
|
||||
|
||||
func (a *applierV3Capped) Put(_ context.Context, _ mvcc.TxnWrite, _ *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
|
||||
func (a *applierV3Capped) Put(_ context.Context, _ *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
|
||||
return nil, nil, errors.ErrNoSpace
|
||||
}
|
||||
|
||||
@ -447,9 +447,9 @@ func newQuotaApplierV3(lg *zap.Logger, quotaBackendBytesCfg int64, be backend.Ba
|
||||
return "aApplierV3{app, serverstorage.NewBackendQuota(lg, quotaBackendBytesCfg, be, "v3-applier")}
|
||||
}
|
||||
|
||||
func (a *quotaApplierV3) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
|
||||
func (a *quotaApplierV3) Put(ctx context.Context, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
|
||||
ok := a.q.Available(p)
|
||||
resp, trace, err := a.applierV3.Put(ctx, txn, p)
|
||||
resp, trace, err := a.applierV3.Put(ctx, p)
|
||||
if err == nil && !ok {
|
||||
err = errors.ErrNoSpace
|
||||
}
|
||||
|
@ -24,7 +24,6 @@ import (
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/txn"
|
||||
"go.etcd.io/etcd/server/v3/lease"
|
||||
"go.etcd.io/etcd/server/v3/storage/mvcc"
|
||||
)
|
||||
|
||||
type authApplierV3 struct {
|
||||
@ -65,7 +64,7 @@ func (aa *authApplierV3) Apply(ctx context.Context, r *pb.InternalRaftRequest, s
|
||||
return ret
|
||||
}
|
||||
|
||||
func (aa *authApplierV3) Put(ctx context.Context, txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
|
||||
func (aa *authApplierV3) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
|
||||
if err := aa.as.IsPutPermitted(&aa.authInfo, r.Key); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
@ -84,17 +83,17 @@ func (aa *authApplierV3) Put(ctx context.Context, txn mvcc.TxnWrite, r *pb.PutRe
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
return aa.applierV3.Put(ctx, txn, r)
|
||||
return aa.applierV3.Put(ctx, r)
|
||||
}
|
||||
|
||||
func (aa *authApplierV3) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||
func (aa *authApplierV3) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||
if err := aa.as.IsRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return aa.applierV3.Range(ctx, txn, r)
|
||||
return aa.applierV3.Range(ctx, r)
|
||||
}
|
||||
|
||||
func (aa *authApplierV3) DeleteRange(txn mvcc.TxnWrite, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
|
||||
func (aa *authApplierV3) DeleteRange(r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
|
||||
if err := aa.as.IsDeleteRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -105,7 +104,7 @@ func (aa *authApplierV3) DeleteRange(txn mvcc.TxnWrite, r *pb.DeleteRangeRequest
|
||||
}
|
||||
}
|
||||
|
||||
return aa.applierV3.DeleteRange(txn, r)
|
||||
return aa.applierV3.DeleteRange(r)
|
||||
}
|
||||
|
||||
func (aa *authApplierV3) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
|
||||
|
@ -445,7 +445,7 @@ func TestAuthApplierV3_Put(t *testing.T) {
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
setAuthInfo(authApplier, tc.userName)
|
||||
_, _, err := authApplier.Put(ctx, nil, tc.request)
|
||||
_, _, err := authApplier.Put(ctx, tc.request)
|
||||
require.Equalf(t, tc.expectError, err, "Put returned unexpected error (or lack thereof), expected: %v, got: %v", tc.expectError, err)
|
||||
})
|
||||
}
|
||||
@ -466,7 +466,7 @@ func TestAuthApplierV3_LeasePut(t *testing.T) {
|
||||
|
||||
// The user should be able to put the key
|
||||
setAuthInfo(authApplier, userWriteOnly)
|
||||
_, _, err = authApplier.Put(ctx, nil, &pb.PutRequest{
|
||||
_, _, err = authApplier.Put(ctx, &pb.PutRequest{
|
||||
Key: []byte(key),
|
||||
Value: []byte("1"),
|
||||
Lease: LeaseId,
|
||||
@ -475,7 +475,7 @@ func TestAuthApplierV3_LeasePut(t *testing.T) {
|
||||
|
||||
// Put a key under the lease outside user's key range
|
||||
setAuthInfo(authApplier, userRoot)
|
||||
_, _, err = authApplier.Put(ctx, nil, &pb.PutRequest{
|
||||
_, _, err = authApplier.Put(ctx, &pb.PutRequest{
|
||||
Key: []byte(keyOutsideRange),
|
||||
Value: []byte("1"),
|
||||
Lease: LeaseId,
|
||||
@ -484,7 +484,7 @@ func TestAuthApplierV3_LeasePut(t *testing.T) {
|
||||
|
||||
// The user should not be able to put the key anymore
|
||||
setAuthInfo(authApplier, userWriteOnly)
|
||||
_, _, err = authApplier.Put(ctx, nil, &pb.PutRequest{
|
||||
_, _, err = authApplier.Put(ctx, &pb.PutRequest{
|
||||
Key: []byte(key),
|
||||
Value: []byte("1"),
|
||||
Lease: LeaseId,
|
||||
@ -532,7 +532,7 @@ func TestAuthApplierV3_Range(t *testing.T) {
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
setAuthInfo(authApplier, tc.userName)
|
||||
_, err := authApplier.Range(ctx, nil, tc.request)
|
||||
_, err := authApplier.Range(ctx, tc.request)
|
||||
require.Equalf(t, tc.expectError, err, "Range returned unexpected error (or lack thereof), expected: %v, got: %v", tc.expectError, err)
|
||||
})
|
||||
}
|
||||
@ -596,7 +596,7 @@ func TestAuthApplierV3_DeleteRange(t *testing.T) {
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
setAuthInfo(authApplier, tc.userName)
|
||||
_, err := authApplier.DeleteRange(nil, tc.request)
|
||||
_, err := authApplier.DeleteRange(tc.request)
|
||||
require.Equalf(t, tc.expectError, err, "Range returned unexpected error (or lack thereof), expected: %v, got: %v", tc.expectError, err)
|
||||
})
|
||||
}
|
||||
@ -703,7 +703,7 @@ func TestAuthApplierV3_LeaseRevoke(t *testing.T) {
|
||||
|
||||
// Put a key under the lease outside user's key range
|
||||
setAuthInfo(authApplier, userRoot)
|
||||
_, _, err = authApplier.Put(ctx, nil, &pb.PutRequest{
|
||||
_, _, err = authApplier.Put(ctx, &pb.PutRequest{
|
||||
Key: []byte(keyOutsideRange),
|
||||
Value: []byte("1"),
|
||||
Lease: LeaseId,
|
||||
|
@ -20,7 +20,6 @@ import (
|
||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
"go.etcd.io/etcd/pkg/v3/traceutil"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/errors"
|
||||
"go.etcd.io/etcd/server/v3/storage/mvcc"
|
||||
)
|
||||
|
||||
type applierV3Corrupt struct {
|
||||
@ -29,15 +28,15 @@ type applierV3Corrupt struct {
|
||||
|
||||
func newApplierV3Corrupt(a applierV3) *applierV3Corrupt { return &applierV3Corrupt{a} }
|
||||
|
||||
func (a *applierV3Corrupt) Put(_ context.Context, _ mvcc.TxnWrite, _ *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
|
||||
func (a *applierV3Corrupt) Put(_ context.Context, _ *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
|
||||
return nil, nil, errors.ErrCorrupt
|
||||
}
|
||||
|
||||
func (a *applierV3Corrupt) Range(_ context.Context, _ mvcc.TxnRead, _ *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||
func (a *applierV3Corrupt) Range(_ context.Context, _ *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||
return nil, errors.ErrCorrupt
|
||||
}
|
||||
|
||||
func (a *applierV3Corrupt) DeleteRange(_ mvcc.TxnWrite, _ *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
|
||||
func (a *applierV3Corrupt) DeleteRange(_ *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
|
||||
return nil, errors.ErrCorrupt
|
||||
}
|
||||
|
||||
|
@ -153,13 +153,13 @@ func (a *uberApplier) dispatch(ctx context.Context, r *pb.InternalRaftRequest, s
|
||||
switch {
|
||||
case r.Range != nil:
|
||||
op = "Range"
|
||||
ar.Resp, ar.Err = a.applyV3.Range(ctx, nil, r.Range)
|
||||
ar.Resp, ar.Err = a.applyV3.Range(ctx, r.Range)
|
||||
case r.Put != nil:
|
||||
op = "Put"
|
||||
ar.Resp, ar.Trace, ar.Err = a.applyV3.Put(ctx, nil, r.Put)
|
||||
ar.Resp, ar.Trace, ar.Err = a.applyV3.Put(ctx, r.Put)
|
||||
case r.DeleteRange != nil:
|
||||
op = "DeleteRange"
|
||||
ar.Resp, ar.Err = a.applyV3.DeleteRange(nil, r.DeleteRange)
|
||||
ar.Resp, ar.Err = a.applyV3.DeleteRange(r.DeleteRange)
|
||||
case r.Txn != nil:
|
||||
op = "Txn"
|
||||
ar.Resp, ar.Trace, ar.Err = a.applyV3.Txn(ctx, r.Txn)
|
||||
|
@ -31,9 +31,7 @@ import (
|
||||
"go.etcd.io/etcd/server/v3/storage/mvcc"
|
||||
)
|
||||
|
||||
func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, txnWrite mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
|
||||
resp = &pb.PutResponse{}
|
||||
resp.Header = &pb.ResponseHeader{}
|
||||
func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
|
||||
trace = traceutil.Get(ctx)
|
||||
// create put tracing if the trace in context is empty
|
||||
if trace.IsEmpty() {
|
||||
@ -42,17 +40,25 @@ func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, t
|
||||
traceutil.Field{Key: "key", Value: string(p.Key)},
|
||||
traceutil.Field{Key: "req_size", Value: p.Size()},
|
||||
)
|
||||
ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
|
||||
}
|
||||
val, leaseID := p.Value, lease.LeaseID(p.Lease)
|
||||
if txnWrite == nil {
|
||||
if leaseID != lease.NoLease {
|
||||
if l := lessor.Lookup(leaseID); l == nil {
|
||||
return nil, nil, lease.ErrLeaseNotFound
|
||||
}
|
||||
leaseID := lease.LeaseID(p.Lease)
|
||||
if leaseID != lease.NoLease {
|
||||
if l := lessor.Lookup(leaseID); l == nil {
|
||||
return nil, nil, lease.ErrLeaseNotFound
|
||||
}
|
||||
txnWrite = kv.Write(trace)
|
||||
defer txnWrite.End()
|
||||
}
|
||||
txnWrite := kv.Write(trace)
|
||||
defer txnWrite.End()
|
||||
resp, err = put(ctx, txnWrite, p)
|
||||
return resp, trace, err
|
||||
}
|
||||
|
||||
func put(ctx context.Context, txnWrite mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, err error) {
|
||||
trace := traceutil.Get(ctx)
|
||||
resp = &pb.PutResponse{}
|
||||
resp.Header = &pb.ResponseHeader{}
|
||||
val, leaseID := p.Value, lease.LeaseID(p.Lease)
|
||||
|
||||
var rr *mvcc.RangeResult
|
||||
if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
|
||||
@ -61,13 +67,13 @@ func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, t
|
||||
}, "get previous kv pair")
|
||||
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if p.IgnoreValue || p.IgnoreLease {
|
||||
if rr == nil || len(rr.KVs) == 0 {
|
||||
// ignore_{lease,value} flag expects previous key-value pair
|
||||
return nil, nil, errors.ErrKeyNotFound
|
||||
return nil, errors.ErrKeyNotFound
|
||||
}
|
||||
}
|
||||
if p.IgnoreValue {
|
||||
@ -84,19 +90,21 @@ func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, t
|
||||
|
||||
resp.Header.Revision = txnWrite.Put(p.Key, val, leaseID)
|
||||
trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision})
|
||||
return resp, trace, nil
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func DeleteRange(kv mvcc.KV, txnWrite mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
|
||||
func DeleteRange(kv mvcc.KV, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
|
||||
txnWrite := kv.Write(traceutil.TODO())
|
||||
defer txnWrite.End()
|
||||
resp, err := deleteRange(txnWrite, dr)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func deleteRange(txnWrite mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
|
||||
resp := &pb.DeleteRangeResponse{}
|
||||
resp.Header = &pb.ResponseHeader{}
|
||||
end := mkGteRange(dr.RangeEnd)
|
||||
|
||||
if txnWrite == nil {
|
||||
txnWrite = kv.Write(traceutil.TODO())
|
||||
defer txnWrite.End()
|
||||
}
|
||||
|
||||
if dr.PrevKv {
|
||||
rr, err := txnWrite.Range(context.TODO(), dr.Key, end, mvcc.RangeOptions{})
|
||||
if err != nil {
|
||||
@ -114,17 +122,23 @@ func DeleteRange(kv mvcc.KV, txnWrite mvcc.TxnWrite, dr *pb.DeleteRangeRequest)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, txnRead mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||
func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||
trace := traceutil.Get(ctx)
|
||||
if trace.IsEmpty() {
|
||||
trace = traceutil.New("range", lg)
|
||||
ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
|
||||
}
|
||||
txnRead := kv.Read(mvcc.ConcurrentReadTxMode, trace)
|
||||
defer txnRead.End()
|
||||
return executeRange(ctx, lg, txnRead, r)
|
||||
}
|
||||
|
||||
func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
|
||||
trace := traceutil.Get(ctx)
|
||||
|
||||
resp := &pb.RangeResponse{}
|
||||
resp.Header = &pb.ResponseHeader{}
|
||||
|
||||
if txnRead == nil {
|
||||
txnRead = kv.Read(mvcc.ConcurrentReadTxMode, trace)
|
||||
defer txnRead.End()
|
||||
}
|
||||
|
||||
limit := r.Limit
|
||||
if r.SortOrder != pb.RangeRequest_NONE ||
|
||||
r.MinModRevision != 0 || r.MaxModRevision != 0 ||
|
||||
@ -226,7 +240,6 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
|
||||
ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
|
||||
}
|
||||
isWrite := !IsTxnReadonly(rt)
|
||||
|
||||
// When the transaction contains write operations, we use ReadTx instead of
|
||||
// ConcurrentReadTx to avoid extra overhead of copying buffer.
|
||||
var txnWrite mvcc.TxnWrite
|
||||
@ -235,7 +248,6 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
|
||||
} else {
|
||||
txnWrite = mvcc.NewReadOnlyTxnWrite(kv.Read(mvcc.ConcurrentReadTxMode, trace))
|
||||
}
|
||||
|
||||
var txnPath []bool
|
||||
trace.StepWithFunction(
|
||||
func() {
|
||||
@ -243,22 +255,11 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
|
||||
},
|
||||
"compare",
|
||||
)
|
||||
|
||||
if isWrite {
|
||||
trace.AddField(traceutil.Field{Key: "read_only", Value: false})
|
||||
if _, err := checkRequests(txnWrite, rt, txnPath,
|
||||
func(rv mvcc.ReadView, ro *pb.RequestOp) error { return checkRequestPut(rv, lessor, ro) }); err != nil {
|
||||
txnWrite.End()
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
if _, err := checkRequests(txnWrite, rt, txnPath, checkRequestRange); err != nil {
|
||||
err := checkTxn(ctx, txnWrite, rt, isWrite, lessor, txnPath)
|
||||
if err != nil {
|
||||
txnWrite.End()
|
||||
return nil, nil, err
|
||||
}
|
||||
trace.Step("check requests")
|
||||
txnResp, _ := newTxnResp(rt, txnPath)
|
||||
|
||||
// When executing mutable txnWrite ops, etcd must hold the txnWrite lock so
|
||||
// readers do not see any intermediate results. Since writes are
|
||||
// serialized on the raft loop, the revision in the read view will
|
||||
@ -267,7 +268,35 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
|
||||
txnWrite.End()
|
||||
txnWrite = kv.Write(trace)
|
||||
}
|
||||
_, err := applyTxn(ctx, lg, kv, lessor, txnWrite, rt, txnPath, txnResp)
|
||||
txnResp, err := txn(ctx, lg, txnWrite, rt, isWrite, txnPath)
|
||||
txnWrite.End()
|
||||
|
||||
trace.AddField(
|
||||
traceutil.Field{Key: "number_of_response", Value: len(txnResp.Responses)},
|
||||
traceutil.Field{Key: "response_revision", Value: txnResp.Header.Revision},
|
||||
)
|
||||
return txnResp, trace, err
|
||||
}
|
||||
|
||||
func checkTxn(ctx context.Context, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, isWrite bool, lessor lease.Lessor, txnPath []bool) error {
|
||||
trace := traceutil.Get(ctx)
|
||||
if isWrite {
|
||||
trace.AddField(traceutil.Field{Key: "read_only", Value: false})
|
||||
if _, err := checkRequests(txnWrite, rt, txnPath,
|
||||
func(rv mvcc.ReadView, ro *pb.RequestOp) error { return checkRequestPut(rv, lessor, ro) }); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if _, err := checkRequests(txnWrite, rt, txnPath, checkRequestRange); err != nil {
|
||||
return err
|
||||
}
|
||||
trace.Step("check requests")
|
||||
return nil
|
||||
}
|
||||
|
||||
func txn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, isWrite bool, txnPath []bool) (*pb.TxnResponse, error) {
|
||||
txnResp, _ := newTxnResp(rt, txnPath)
|
||||
_, err := executeTxn(ctx, lg, txnWrite, rt, txnPath, txnResp)
|
||||
if err != nil {
|
||||
if isWrite {
|
||||
// end txn to release locks before panic
|
||||
@ -283,14 +312,8 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
|
||||
if len(txnWrite.Changes()) != 0 {
|
||||
rev++
|
||||
}
|
||||
txnWrite.End()
|
||||
|
||||
txnResp.Header.Revision = rev
|
||||
trace.AddField(
|
||||
traceutil.Field{Key: "number_of_response", Value: len(txnResp.Responses)},
|
||||
traceutil.Field{Key: "response_revision", Value: txnResp.Header.Revision},
|
||||
)
|
||||
return txnResp, trace, err
|
||||
return txnResp, err
|
||||
}
|
||||
|
||||
// newTxnResp allocates a txn response for a txn request given a path.
|
||||
@ -324,7 +347,7 @@ func newTxnResp(rt *pb.TxnRequest, txnPath []bool) (txnResp *pb.TxnResponse, txn
|
||||
return txnResp, txnCount
|
||||
}
|
||||
|
||||
func applyTxn(ctx context.Context, lg *zap.Logger, kv mvcc.KV, lessor lease.Lessor, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int, err error) {
|
||||
func executeTxn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int, err error) {
|
||||
trace := traceutil.Get(ctx)
|
||||
reqs := rt.Success
|
||||
if !txnPath[0] {
|
||||
@ -339,7 +362,7 @@ func applyTxn(ctx context.Context, lg *zap.Logger, kv mvcc.KV, lessor lease.Less
|
||||
traceutil.Field{Key: "req_type", Value: "range"},
|
||||
traceutil.Field{Key: "range_begin", Value: string(tv.RequestRange.Key)},
|
||||
traceutil.Field{Key: "range_end", Value: string(tv.RequestRange.RangeEnd)})
|
||||
resp, err := Range(ctx, lg, kv, txnWrite, tv.RequestRange)
|
||||
resp, err := executeRange(ctx, lg, txnWrite, tv.RequestRange)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("applyTxn: failed Range: %w", err)
|
||||
}
|
||||
@ -350,21 +373,21 @@ func applyTxn(ctx context.Context, lg *zap.Logger, kv mvcc.KV, lessor lease.Less
|
||||
traceutil.Field{Key: "req_type", Value: "put"},
|
||||
traceutil.Field{Key: "key", Value: string(tv.RequestPut.Key)},
|
||||
traceutil.Field{Key: "req_size", Value: tv.RequestPut.Size()})
|
||||
resp, _, err := Put(ctx, lg, lessor, kv, txnWrite, tv.RequestPut)
|
||||
resp, err := put(ctx, txnWrite, tv.RequestPut)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("applyTxn: failed Put: %w", err)
|
||||
}
|
||||
respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp
|
||||
trace.StopSubTrace()
|
||||
case *pb.RequestOp_RequestDeleteRange:
|
||||
resp, err := DeleteRange(kv, txnWrite, tv.RequestDeleteRange)
|
||||
resp, err := deleteRange(txnWrite, tv.RequestDeleteRange)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("applyTxn: failed DeleteRange: %w", err)
|
||||
}
|
||||
respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp
|
||||
case *pb.RequestOp_RequestTxn:
|
||||
resp := respi.(*pb.ResponseOp_ResponseTxn).ResponseTxn
|
||||
applyTxns, err := applyTxn(ctx, lg, kv, lessor, txnWrite, tv.RequestTxn, txnPath[1:], resp)
|
||||
applyTxns, err := executeTxn(ctx, lg, txnWrite, tv.RequestTxn, txnPath[1:], resp)
|
||||
if err != nil {
|
||||
// don't wrap the error. It's a recursive call and err should be already wrapped
|
||||
return 0, err
|
||||
|
@ -131,7 +131,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
|
||||
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
|
||||
}
|
||||
|
||||
get := func() { resp, err = txn.Range(ctx, s.Logger(), s.KV(), nil, r) }
|
||||
get := func() { resp, err = txn.Range(ctx, s.Logger(), s.KV(), r) }
|
||||
if serr := s.doSerialize(ctx, chk, get); serr != nil {
|
||||
err = serr
|
||||
return nil, err
|
||||
|
Loading…
x
Reference in New Issue
Block a user