From 8d8d0377a2d8dea95bc0782e5a489a2b67555c3c Mon Sep 17 00:00:00 2001 From: Wilson Wang Date: Sun, 25 Apr 2021 18:06:40 -0700 Subject: [PATCH] server: applier uses ReadTx instead of ConcurrentTx --- server/etcdserver/apply.go | 12 ++++++++++-- server/mvcc/kv.go | 11 ++++++++++- server/mvcc/kv_test.go | 2 +- server/mvcc/kv_view.go | 6 +++--- server/mvcc/kvstore_test.go | 6 +++--- server/mvcc/kvstore_txn.go | 10 ++++++++-- 6 files changed, 35 insertions(+), 12 deletions(-) diff --git a/server/etcdserver/apply.go b/server/etcdserver/apply.go index 74a257507..46da09a0b 100644 --- a/server/etcdserver/apply.go +++ b/server/etcdserver/apply.go @@ -336,7 +336,7 @@ func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.Ra resp.Header = &pb.ResponseHeader{} if txn == nil { - txn = a.s.kv.Read(trace) + txn = a.s.kv.Read(mvcc.ConcurrentReadTxMode, trace) defer txn.End() } @@ -434,7 +434,15 @@ func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnR ctx = context.WithValue(ctx, traceutil.TraceKey, trace) } isWrite := !isTxnReadonly(rt) - txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(trace)) + + // When the transaction contains write operations, we use ReadTx instead of + // ConcurrentReadTx to avoid the extra overhead of copying the buffer.
+ var txn mvcc.TxnWrite + if isWrite { + txn = mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(mvcc.SharedBufReadTxMode, trace)) + } else { + txn = mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(mvcc.ConcurrentReadTxMode, trace)) + } var txnPath []bool trace.StepWithFunction( diff --git a/server/mvcc/kv.go b/server/mvcc/kv.go index b8cd982da..35f108bfc 100644 --- a/server/mvcc/kv.go +++ b/server/mvcc/kv.go @@ -100,12 +100,21 @@ func (trw *txnReadWrite) Changes() []mvccpb.KeyValue { return nil } func NewReadOnlyTxnWrite(txn TxnRead) TxnWrite { return &txnReadWrite{txn} } +type ReadTxMode uint32 + +const ( + // Use ConcurrentReadTx and the txReadBuffer is copied + ConcurrentReadTxMode = ReadTxMode(1) + // Use backend ReadTx and txReadBuffer is not copied + SharedBufReadTxMode = ReadTxMode(2) +) + type KV interface { ReadView WriteView // Read creates a read transaction. - Read(trace *traceutil.Trace) TxnRead + Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead // Write creates a write transaction. Write(trace *traceutil.Trace) TxnWrite diff --git a/server/mvcc/kv_test.go b/server/mvcc/kv_test.go index 19d6539d8..51f688db7 100644 --- a/server/mvcc/kv_test.go +++ b/server/mvcc/kv_test.go @@ -50,7 +50,7 @@ var ( return kv.Range(context.TODO(), key, end, ro) } txnRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) { - txn := kv.Read(traceutil.TODO()) + txn := kv.Read(ConcurrentReadTxMode, traceutil.TODO()) defer txn.End() return txn.Range(context.TODO(), key, end, ro) } diff --git a/server/mvcc/kv_view.go b/server/mvcc/kv_view.go index 29464c50e..56260e759 100644 --- a/server/mvcc/kv_view.go +++ b/server/mvcc/kv_view.go @@ -24,19 +24,19 @@ import ( type readView struct{ kv KV } func (rv *readView) FirstRev() int64 { - tr := rv.kv.Read(traceutil.TODO()) + tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO()) defer tr.End() return tr.FirstRev() } func (rv *readView) Rev() int64 { - tr := rv.kv.Read(traceutil.TODO()) + tr := rv.kv.Read(ConcurrentReadTxMode, 
traceutil.TODO()) defer tr.End() return tr.Rev() } func (rv *readView) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) { - tr := rv.kv.Read(traceutil.TODO()) + tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO()) defer tr.End() return tr.Range(ctx, key, end, ro) } diff --git a/server/mvcc/kvstore_test.go b/server/mvcc/kvstore_test.go index 1bb3fae24..f6f6313f8 100644 --- a/server/mvcc/kvstore_test.go +++ b/server/mvcc/kvstore_test.go @@ -658,7 +658,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) { s.Put([]byte("foo"), []byte("bar"), lease.NoLease) // readTx simulates a long read request - readTx1 := s.Read(traceutil.TODO()) + readTx1 := s.Read(ConcurrentReadTxMode, traceutil.TODO()) // write should not be blocked by reads done := make(chan struct{}, 1) @@ -673,7 +673,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) { } // readTx2 simulates a short read request - readTx2 := s.Read(traceutil.TODO()) + readTx2 := s.Read(ConcurrentReadTxMode, traceutil.TODO()) ro := RangeOptions{Limit: 1, Rev: 0, Count: false} ret, err := readTx2.Range(context.TODO(), []byte("foo"), nil, ro) if err != nil { @@ -756,7 +756,7 @@ func TestConcurrentReadTxAndWrite(t *testing.T) { mu.Lock() wKVs := make(kvs, len(committedKVs)) copy(wKVs, committedKVs) - tx := s.Read(traceutil.TODO()) + tx := s.Read(ConcurrentReadTxMode, traceutil.TODO()) mu.Unlock() // get all keys in backend store, and compare with wKVs ret, err := tx.Range(context.TODO(), []byte("\x00000000"), []byte("\xffffffff"), RangeOptions{}) diff --git a/server/mvcc/kvstore_txn.go b/server/mvcc/kvstore_txn.go index aaa93d9ab..7b170ff01 100644 --- a/server/mvcc/kvstore_txn.go +++ b/server/mvcc/kvstore_txn.go @@ -34,12 +34,18 @@ type storeTxnRead struct { trace *traceutil.Trace } -func (s *store) Read(trace *traceutil.Trace) TxnRead { +func (s *store) Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead { + var tx backend.ReadTx s.mu.RLock() s.revMu.RLock() // backend 
holds b.readTx.RLock() only when creating the concurrentReadTx. After // ConcurrentReadTx is created, it will not block write transaction. - tx := s.b.ConcurrentReadTx() + if mode == ConcurrentReadTxMode { + tx = s.b.ConcurrentReadTx() + } else { + tx = s.b.ReadTx() + } + tx.RLock() // RLock is a no-op only for concurrentReadTx; the shared-buffer ReadTx takes a real read lock here. firstRev, rev := s.compactMainRev, s.currentRev s.revMu.RUnlock()