mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
server: Refactor checkTxn into single function handling all request types
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
parent
d09b8d474a
commit
bd7f0dab3c
@ -267,11 +267,15 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
|
||||
},
|
||||
"compare",
|
||||
)
|
||||
err := checkTxn(ctx, txnRead, rt, isWrite, lessor, txnPath)
|
||||
if isWrite {
|
||||
trace.AddField(traceutil.Field{Key: "read_only", Value: false})
|
||||
}
|
||||
_, err := checkTxn(txnRead, rt, lessor, txnPath)
|
||||
if err != nil {
|
||||
txnRead.End()
|
||||
return nil, nil, err
|
||||
}
|
||||
trace.Step("check requests")
|
||||
// When executing mutable txnWrite ops, etcd must hold the txnWrite lock so
|
||||
// readers do not see any intermediate results. Since writes are
|
||||
// serialized on the raft loop, the revision in the read view will
|
||||
@ -293,22 +297,6 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
|
||||
return txnResp, trace, err
|
||||
}
|
||||
|
||||
func checkTxn(ctx context.Context, txnRead mvcc.TxnRead, rt *pb.TxnRequest, isWrite bool, lessor lease.Lessor, txnPath []bool) error {
|
||||
trace := traceutil.Get(ctx)
|
||||
if isWrite {
|
||||
trace.AddField(traceutil.Field{Key: "read_only", Value: false})
|
||||
if _, err := checkRequests(txnRead, rt, txnPath,
|
||||
func(rv mvcc.ReadView, ro *pb.RequestOp) error { return checkRequestPut(rv, lessor, ro) }); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if _, err := checkRequests(txnRead, rt, txnPath, checkRequestRange); err != nil {
|
||||
return err
|
||||
}
|
||||
trace.Step("check requests")
|
||||
return nil
|
||||
}
|
||||
|
||||
func txn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, isWrite bool, txnPath []bool) (*pb.TxnResponse, error) {
|
||||
txnResp, _ := newTxnResp(rt, txnPath)
|
||||
_, err := executeTxn(ctx, lg, txnWrite, rt, txnPath, txnResp)
|
||||
@ -416,16 +404,7 @@ func executeTxn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt
|
||||
return txns, nil
|
||||
}
|
||||
|
||||
//---------------------------------------------------------
|
||||
|
||||
type checkReqFunc func(mvcc.ReadView, *pb.RequestOp) error
|
||||
|
||||
func checkRequestPut(rv mvcc.ReadView, lessor lease.Lessor, reqOp *pb.RequestOp) error {
|
||||
tv, ok := reqOp.Request.(*pb.RequestOp_RequestPut)
|
||||
if !ok || tv.RequestPut == nil {
|
||||
return nil
|
||||
}
|
||||
req := tv.RequestPut
|
||||
func checkPut(rv mvcc.ReadView, lessor lease.Lessor, req *pb.PutRequest) error {
|
||||
if req.IgnoreValue || req.IgnoreLease {
|
||||
// expects previous key-value, error if not exist
|
||||
rr, err := rv.Range(context.TODO(), req.Key, nil, mvcc.RangeOptions{})
|
||||
@ -444,12 +423,7 @@ func checkRequestPut(rv mvcc.ReadView, lessor lease.Lessor, reqOp *pb.RequestOp)
|
||||
return nil
|
||||
}
|
||||
|
||||
func checkRequestRange(rv mvcc.ReadView, reqOp *pb.RequestOp) error {
|
||||
tv, ok := reqOp.Request.(*pb.RequestOp_RequestRange)
|
||||
if !ok || tv.RequestRange == nil {
|
||||
return nil
|
||||
}
|
||||
req := tv.RequestRange
|
||||
func checkRange(rv mvcc.ReadView, req *pb.RangeRequest) error {
|
||||
switch {
|
||||
case req.Revision == 0:
|
||||
return nil
|
||||
@ -461,23 +435,29 @@ func checkRequestRange(rv mvcc.ReadView, reqOp *pb.RequestOp) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func checkRequests(rv mvcc.ReadView, rt *pb.TxnRequest, txnPath []bool, f checkReqFunc) (int, error) {
|
||||
func checkTxn(rv mvcc.ReadView, rt *pb.TxnRequest, lessor lease.Lessor, txnPath []bool) (int, error) {
|
||||
txnCount := 0
|
||||
reqs := rt.Success
|
||||
if !txnPath[0] {
|
||||
reqs = rt.Failure
|
||||
}
|
||||
for _, req := range reqs {
|
||||
if tv, ok := req.Request.(*pb.RequestOp_RequestTxn); ok && tv.RequestTxn != nil {
|
||||
txns, err := checkRequests(rv, tv.RequestTxn, txnPath[1:], f)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
var err error
|
||||
var txns int
|
||||
switch tv := req.Request.(type) {
|
||||
case *pb.RequestOp_RequestRange:
|
||||
err = checkRange(rv, tv.RequestRange)
|
||||
case *pb.RequestOp_RequestPut:
|
||||
err = checkPut(rv, lessor, tv.RequestPut)
|
||||
case *pb.RequestOp_RequestDeleteRange:
|
||||
case *pb.RequestOp_RequestTxn:
|
||||
txns, err = checkTxn(rv, tv.RequestTxn, lessor, txnPath[1:])
|
||||
txnCount += txns + 1
|
||||
txnPath = txnPath[txns+1:]
|
||||
continue
|
||||
default:
|
||||
// empty union
|
||||
}
|
||||
if err := f(rv, req); err != nil {
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user