mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
server: Separate txnRead from txnWrite
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
parent
524fddc426
commit
81ecac11cb
@ -242,31 +242,35 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
|
||||
isWrite := !IsTxnReadonly(rt)
|
||||
// When the transaction contains write operations, we use ReadTx instead of
|
||||
// ConcurrentReadTx to avoid extra overhead of copying buffer.
|
||||
var txnWrite mvcc.TxnWrite
|
||||
var mode mvcc.ReadTxMode
|
||||
if isWrite && txnModeWriteWithSharedBuffer /*a.s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer*/ {
|
||||
txnWrite = mvcc.NewReadOnlyTxnWrite(kv.Read(mvcc.SharedBufReadTxMode, trace))
|
||||
mode = mvcc.SharedBufReadTxMode
|
||||
} else {
|
||||
txnWrite = mvcc.NewReadOnlyTxnWrite(kv.Read(mvcc.ConcurrentReadTxMode, trace))
|
||||
mode = mvcc.ConcurrentReadTxMode
|
||||
}
|
||||
txnRead := kv.Read(mode, trace)
|
||||
var txnPath []bool
|
||||
trace.StepWithFunction(
|
||||
func() {
|
||||
txnPath = compareToPath(txnWrite, rt)
|
||||
txnPath = compareToPath(txnRead, rt)
|
||||
},
|
||||
"compare",
|
||||
)
|
||||
err := checkTxn(ctx, txnWrite, rt, isWrite, lessor, txnPath)
|
||||
err := checkTxn(ctx, txnRead, rt, isWrite, lessor, txnPath)
|
||||
if err != nil {
|
||||
txnWrite.End()
|
||||
txnRead.End()
|
||||
return nil, nil, err
|
||||
}
|
||||
// When executing mutable txnWrite ops, etcd must hold the txnWrite lock so
|
||||
// readers do not see any intermediate results. Since writes are
|
||||
// serialized on the raft loop, the revision in the read view will
|
||||
// be the revision of the write txnWrite.
|
||||
var txnWrite mvcc.TxnWrite
|
||||
if isWrite {
|
||||
txnWrite.End()
|
||||
txnRead.End()
|
||||
txnWrite = kv.Write(trace)
|
||||
} else {
|
||||
txnWrite = mvcc.NewReadOnlyTxnWrite(txnRead)
|
||||
}
|
||||
txnResp, err := txn(ctx, lg, txnWrite, rt, isWrite, txnPath)
|
||||
txnWrite.End()
|
||||
@ -278,16 +282,16 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit
|
||||
return txnResp, trace, err
|
||||
}
|
||||
|
||||
func checkTxn(ctx context.Context, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, isWrite bool, lessor lease.Lessor, txnPath []bool) error {
|
||||
func checkTxn(ctx context.Context, txnRead mvcc.TxnRead, rt *pb.TxnRequest, isWrite bool, lessor lease.Lessor, txnPath []bool) error {
|
||||
trace := traceutil.Get(ctx)
|
||||
if isWrite {
|
||||
trace.AddField(traceutil.Field{Key: "read_only", Value: false})
|
||||
if _, err := checkRequests(txnWrite, rt, txnPath,
|
||||
if _, err := checkRequests(txnRead, rt, txnPath,
|
||||
func(rv mvcc.ReadView, ro *pb.RequestOp) error { return checkRequestPut(rv, lessor, ro) }); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if _, err := checkRequests(txnWrite, rt, txnPath, checkRequestRange); err != nil {
|
||||
if _, err := checkRequests(txnRead, rt, txnPath, checkRequestRange); err != nil {
|
||||
return err
|
||||
}
|
||||
trace.Step("check requests")
|
||||
|
Loading…
x
Reference in New Issue
Block a user