Commit bbolt transaction if there are any pending delete operations

Signed-off-by: Siyuan Zhang <sizhang@google.com>
This commit is contained in:
Siyuan Zhang 2024-01-12 09:16:49 -08:00
parent c3af9427ed
commit b70684b93d

View File

@ -258,7 +258,8 @@ func (t *batchTx) commit(stop bool) {
// batchTxBuffered is a batchTx that additionally carries a write buffer and
// a count of the delete operations issued since the last commit.
type batchTxBuffered struct {
	batchTx
	// buf is the write buffer; Unlock writes it back into the backend's
	// read transaction buffer.
	buf txWriteBuffer
	// pendingDeleteOperations is incremented by UnsafeDelete and reset to
	// zero by unsafeCommit. A non-zero value makes Unlock commit right away,
	// even when the batch limit has not been reached.
	pendingDeleteOperations int
}
func newBatchTxBuffered(backend *backend) *batchTxBuffered {
@ -279,7 +280,27 @@ func (t *batchTxBuffered) Unlock() {
// gofail: var beforeWritebackBuf struct{}
t.buf.writeback(&t.backend.readTx.buf)
t.backend.readTx.Unlock()
if t.pending >= t.backend.batchLimit {
// We commit the transaction when the number of pending operations
// reaches the configured limit (batchLimit) to prevent it from
// becoming excessively large.
//
// But we also need to commit the transaction immediately if there
// is any pending delete operation. Otherwise etcd might not have
// finished committing the data to the backend storage when it applies
// the next request (note: etcd commits bbolt transactions periodically
// rather than on each request). As a result, etcd could still read
// stale data from bbolt while processing the next request, which
// breaks linearizability.
//
// Note that we don't need to commit the transaction for put requests
// as long as the batch limit isn't exceeded, because there is a buffer
// on top of bbolt: each time etcd reads data from the backend storage,
// it reads from both bbolt and the buffer. There is no such buffer
// for delete requests.
//
// Please also refer to
// https://github.com/etcd-io/etcd/pull/17119#issuecomment-1857547158
if t.pending >= t.backend.batchLimit || t.pendingDeleteOperations > 0 {
t.commit(false)
}
}
@ -323,6 +344,7 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) {
}
t.batchTx.commit(stop)
t.pendingDeleteOperations = 0
if !stop {
t.backend.readTx.tx = t.backend.begin(false)
@ -338,3 +360,8 @@ func (t *batchTxBuffered) UnsafeSeqPut(bucketName []byte, key []byte, value []by
t.batchTx.UnsafeSeqPut(bucketName, key, value)
t.buf.putSeq(bucketName, key, value)
}
// UnsafeDelete forwards the deletion of key in bucketName to the embedded
// batchTx and then bumps pendingDeleteOperations, recording that this
// transaction now holds a delete that has not been committed yet.
func (t *batchTxBuffered) UnsafeDelete(bucketName, key []byte) {
	// Delegate first: the counter is only advanced once the underlying
	// delete call has returned.
	t.batchTx.UnsafeDelete(bucketName, key)
	t.pendingDeleteOperations++
}