mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
server: applier uses ReadTx instead of ConcurrentReadTx
This commit is contained in:
parent
9a3aff6d8f
commit
8d8d0377a2
@ -336,7 +336,7 @@ func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.Ra
|
||||
resp.Header = &pb.ResponseHeader{}
|
||||
|
||||
if txn == nil {
|
||||
txn = a.s.kv.Read(trace)
|
||||
txn = a.s.kv.Read(mvcc.ConcurrentReadTxMode, trace)
|
||||
defer txn.End()
|
||||
}
|
||||
|
||||
@ -434,7 +434,15 @@ func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnR
|
||||
ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
|
||||
}
|
||||
isWrite := !isTxnReadonly(rt)
|
||||
txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(trace))
|
||||
|
||||
// When the transaction contains write operations, we use ReadTx instead of
|
||||
// ConcurrentReadTx to avoid extra overhead of copying buffer.
|
||||
var txn mvcc.TxnWrite
|
||||
if isWrite {
|
||||
txn = mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(mvcc.SharedBufReadTxMode, trace))
|
||||
} else {
|
||||
txn = mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(mvcc.ConcurrentReadTxMode, trace))
|
||||
}
|
||||
|
||||
var txnPath []bool
|
||||
trace.StepWithFunction(
|
||||
|
@ -100,12 +100,21 @@ func (trw *txnReadWrite) Changes() []mvccpb.KeyValue { return nil }
|
||||
|
||||
// NewReadOnlyTxnWrite wraps a TxnRead in a txnReadWrite so that a
// read-only transaction can be passed where a TxnWrite is expected
// (txnReadWrite's write-side methods are stubs; e.g. Changes returns nil).
func NewReadOnlyTxnWrite(txn TxnRead) TxnWrite {
	return &txnReadWrite{txn}
}
|
||||
|
||||
// ReadTxMode selects which kind of backend read transaction a read
// operation uses.
type ReadTxMode uint32

const (
	// ConcurrentReadTxMode uses ConcurrentReadTx: the txReadBuffer is
	// copied when the transaction is created, so the read does not block
	// subsequent write transactions.
	ConcurrentReadTxMode = ReadTxMode(1)
	// SharedBufReadTxMode uses the backend ReadTx directly; the
	// txReadBuffer is not copied, avoiding the extra buffer-copy overhead.
	SharedBufReadTxMode = ReadTxMode(2)
)
|
||||
|
||||
type KV interface {
|
||||
ReadView
|
||||
WriteView
|
||||
|
||||
// Read creates a read transaction.
|
||||
Read(trace *traceutil.Trace) TxnRead
|
||||
Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead
|
||||
|
||||
// Write creates a write transaction.
|
||||
Write(trace *traceutil.Trace) TxnWrite
|
||||
|
@ -50,7 +50,7 @@ var (
|
||||
return kv.Range(context.TODO(), key, end, ro)
|
||||
}
|
||||
// txnRangeFunc performs a Range through an explicit read transaction.
// It uses ConcurrentReadTxMode so the read buffer is copied and the
// range does not block concurrent writes; the transaction is always
// ended before returning.
txnRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) {
	txn := kv.Read(ConcurrentReadTxMode, traceutil.TODO())
	defer txn.End()
	return txn.Range(context.TODO(), key, end, ro)
}
|
||||
|
@ -24,19 +24,19 @@ import (
|
||||
type readView struct{ kv KV }
|
||||
|
||||
func (rv *readView) FirstRev() int64 {
|
||||
tr := rv.kv.Read(traceutil.TODO())
|
||||
tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO())
|
||||
defer tr.End()
|
||||
return tr.FirstRev()
|
||||
}
|
||||
|
||||
func (rv *readView) Rev() int64 {
|
||||
tr := rv.kv.Read(traceutil.TODO())
|
||||
tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO())
|
||||
defer tr.End()
|
||||
return tr.Rev()
|
||||
}
|
||||
|
||||
func (rv *readView) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
|
||||
tr := rv.kv.Read(traceutil.TODO())
|
||||
tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO())
|
||||
defer tr.End()
|
||||
return tr.Range(ctx, key, end, ro)
|
||||
}
|
||||
|
@ -658,7 +658,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) {
|
||||
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
|
||||
|
||||
// readTx simulates a long read request
|
||||
readTx1 := s.Read(traceutil.TODO())
|
||||
readTx1 := s.Read(ConcurrentReadTxMode, traceutil.TODO())
|
||||
|
||||
// write should not be blocked by reads
|
||||
done := make(chan struct{}, 1)
|
||||
@ -673,7 +673,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) {
|
||||
}
|
||||
|
||||
// readTx2 simulates a short read request
|
||||
readTx2 := s.Read(traceutil.TODO())
|
||||
readTx2 := s.Read(ConcurrentReadTxMode, traceutil.TODO())
|
||||
ro := RangeOptions{Limit: 1, Rev: 0, Count: false}
|
||||
ret, err := readTx2.Range(context.TODO(), []byte("foo"), nil, ro)
|
||||
if err != nil {
|
||||
@ -756,7 +756,7 @@ func TestConcurrentReadTxAndWrite(t *testing.T) {
|
||||
mu.Lock()
|
||||
wKVs := make(kvs, len(committedKVs))
|
||||
copy(wKVs, committedKVs)
|
||||
tx := s.Read(traceutil.TODO())
|
||||
tx := s.Read(ConcurrentReadTxMode, traceutil.TODO())
|
||||
mu.Unlock()
|
||||
// get all keys in backend store, and compare with wKVs
|
||||
ret, err := tx.Range(context.TODO(), []byte("\x00000000"), []byte("\xffffffff"), RangeOptions{})
|
||||
|
@ -34,12 +34,18 @@ type storeTxnRead struct {
|
||||
trace *traceutil.Trace
|
||||
}
|
||||
|
||||
func (s *store) Read(trace *traceutil.Trace) TxnRead {
|
||||
func (s *store) Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead {
|
||||
var tx backend.ReadTx
|
||||
s.mu.RLock()
|
||||
s.revMu.RLock()
|
||||
// backend holds b.readTx.RLock() only when creating the concurrentReadTx. After
|
||||
// ConcurrentReadTx is created, it will not block write transaction.
|
||||
tx := s.b.ConcurrentReadTx()
|
||||
if mode == ConcurrentReadTxMode {
|
||||
tx = s.b.ConcurrentReadTx()
|
||||
} else {
|
||||
tx = s.b.ReadTx()
|
||||
}
|
||||
|
||||
tx.RLock() // RLock is no-op. concurrentReadTx does not need to be locked after it is created.
|
||||
firstRev, rev := s.compactMainRev, s.currentRev
|
||||
s.revMu.RUnlock()
|
||||
|
Loading…
x
Reference in New Issue
Block a user