From b70684b93dab2a83951428a7c107d943c8da7399 Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Fri, 12 Jan 2024 09:16:49 -0800 Subject: [PATCH] commit bbolt transaction if there is any pending deleting operations Signed-off-by: Siyuan Zhang --- mvcc/backend/batch_tx.go | 31 +++++++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/mvcc/backend/batch_tx.go b/mvcc/backend/batch_tx.go index adebe7d14..2931c04e8 100644 --- a/mvcc/backend/batch_tx.go +++ b/mvcc/backend/batch_tx.go @@ -258,7 +258,8 @@ func (t *batchTx) commit(stop bool) { type batchTxBuffered struct { batchTx - buf txWriteBuffer + buf txWriteBuffer + pendingDeleteOperations int } func newBatchTxBuffered(backend *backend) *batchTxBuffered { @@ -279,7 +280,27 @@ func (t *batchTxBuffered) Unlock() { // gofail: var beforeWritebackBuf struct{} t.buf.writeback(&t.backend.readTx.buf) t.backend.readTx.Unlock() - if t.pending >= t.backend.batchLimit { + // We commit the transaction when the number of pending operations + // reaches the configured limit(batchLimit) to prevent it from + // becoming excessively large. + // + // But we also need to commit the transaction immediately if there + // is any pending deleting operation, otherwise etcd might run into + // a situation that it haven't finished committing the data into backend + // storage (note: etcd periodically commits the bbolt transactions + // instead of on each request) when it applies next request. Accordingly, + // etcd may still read the stale data from bbolt when processing next + // request. So it breaks the linearizability. + // + // Note we don't need to commit the transaction for put requests if + // it doesn't exceed the batch limit, because there is a buffer on top + // of the bbolt. Each time when etcd reads data from backend storage, + // it will read data from both bbolt and the buffer. But there is no + // such a buffer for delete requests. + // + // Please also refer to + // https://github.com/etcd-io/etcd/pull/17119#issuecomment-1857547158 + if t.pending >= t.backend.batchLimit || t.pendingDeleteOperations > 0 { t.commit(false) } } @@ -323,6 +344,7 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) { } t.batchTx.commit(stop) + t.pendingDeleteOperations = 0 if !stop { t.backend.readTx.tx = t.backend.begin(false) @@ -338,3 +360,8 @@ func (t *batchTxBuffered) UnsafeSeqPut(bucketName []byte, key []byte, value []by t.batchTx.UnsafeSeqPut(bucketName, key, value) t.buf.putSeq(bucketName, key, value) } + +func (t *batchTxBuffered) UnsafeDelete(bucketName []byte, key []byte) { + t.batchTx.UnsafeDelete(bucketName, key) + t.pendingDeleteOperations++ +}