diff --git a/server/mvcc/backend/batch_tx.go b/server/mvcc/backend/batch_tx.go index 74107b445..d5980f796 100644 --- a/server/mvcc/backend/batch_tx.go +++ b/server/mvcc/backend/batch_tx.go @@ -254,7 +254,7 @@ func newBatchTxBuffered(backend *backend) *batchTxBuffered { batchTx: batchTx{backend: backend}, buf: txWriteBuffer{ txBuffer: txBuffer{make(map[string]*bucketBuffer)}, - seq: true, + seq: make(map[string]bool), }, } tx.Commit() diff --git a/server/mvcc/backend/tx_buffer.go b/server/mvcc/backend/tx_buffer.go index 4df6d0c59..db766219f 100644 --- a/server/mvcc/backend/tx_buffer.go +++ b/server/mvcc/backend/tx_buffer.go @@ -37,11 +37,12 @@ func (txb *txBuffer) reset() { // txWriteBuffer buffers writes of pending updates that have not yet committed. type txWriteBuffer struct { txBuffer - seq bool + seq map[string]bool } + func (txw *txWriteBuffer) put(bucket, k, v []byte) { - txw.seq = false + txw.seq[string(bucket)] = false txw.putSeq(bucket, k, v) } @@ -54,6 +55,18 @@ func (txw *txWriteBuffer) putSeq(bucket, k, v []byte) { b.add(k, v) } +func (txw *txWriteBuffer) reset() { + txw.txBuffer.reset() + for k := range txw.seq { + v, ok := txw.buckets[k] + if !ok { + delete(txw.seq, k) + } else if v.used == 0 { + txw.seq[k] = true + } + } +} + func (txw *txWriteBuffer) writeback(txr *txReadBuffer) { for k, wb := range txw.buckets { rb, ok := txr.buckets[k] @@ -62,7 +75,7 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) { txr.buckets[k] = wb continue } - if !txw.seq && wb.used > 1 { + if seq, ok := txw.seq[k]; ok && !seq && wb.used > 1 { // assume no duplicate keys sort.Sort(wb) }