diff --git a/server/mvcc/backend/backend.go b/server/mvcc/backend/backend.go index 055aedaff..dde78b603 100644 --- a/server/mvcc/backend/backend.go +++ b/server/mvcc/backend/backend.go @@ -511,7 +511,7 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error { if berr != nil { return berr } - tmpb.FillPercent = 0.9 // for seq write in for each + tmpb.FillPercent = 0.9 // for sequential write in for each if err = b.ForEach(func(k, v []byte) error { count++ @@ -525,7 +525,7 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error { return err } tmpb = tmptx.Bucket(next) - tmpb.FillPercent = 0.9 // for seq write in for each + tmpb.FillPercent = 0.9 // for sequential write in for each count = 0 } diff --git a/server/mvcc/backend/batch_tx.go b/server/mvcc/backend/batch_tx.go index 74107b445..358b47615 100644 --- a/server/mvcc/backend/batch_tx.go +++ b/server/mvcc/backend/batch_tx.go @@ -253,8 +253,8 @@ func newBatchTxBuffered(backend *backend) *batchTxBuffered { tx := &batchTxBuffered{ batchTx: batchTx{backend: backend}, buf: txWriteBuffer{ - txBuffer: txBuffer{make(map[string]*bucketBuffer)}, - seq: true, + txBuffer: txBuffer{make(map[string]*bucketBuffer)}, + bucket2seq: make(map[string]bool), }, } tx.Commit() diff --git a/server/mvcc/backend/tx_buffer.go b/server/mvcc/backend/tx_buffer.go index 4df6d0c59..78844e94a 100644 --- a/server/mvcc/backend/tx_buffer.go +++ b/server/mvcc/backend/tx_buffer.go @@ -37,23 +37,44 @@ func (txb *txBuffer) reset() { // txWriteBuffer buffers writes of pending updates that have not yet committed. type txWriteBuffer struct { txBuffer - seq bool + // Maps a bucket name to whether that bucket is written sequentially + // (i.e. its keys are growing monotonically). + bucket2seq map[string]bool } +// TODO: Passing bucket as an (int) enum would avoid a lot of []byte->string->hash conversions. 
func (txw *txWriteBuffer) put(bucket, k, v []byte) { - txw.seq = false - txw.putSeq(bucket, k, v) + bucketstr := string(bucket) + txw.bucket2seq[bucketstr] = false + txw.putInternal(bucketstr, k, v) } func (txw *txWriteBuffer) putSeq(bucket, k, v []byte) { - b, ok := txw.buckets[string(bucket)] + // TODO: Add (in tests?) verification that k is greater than the last key already in the bucket's buffer + txw.putInternal(string(bucket), k, v) +} + +func (txw *txWriteBuffer) putInternal(bucket string, k, v []byte) { + b, ok := txw.buckets[bucket] if !ok { b = newBucketBuffer() - txw.buckets[string(bucket)] = b + txw.buckets[bucket] = b } b.add(k, v) } +func (txw *txWriteBuffer) reset() { + txw.txBuffer.reset() + for k := range txw.bucket2seq { + v, ok := txw.buckets[k] + if !ok { + delete(txw.bucket2seq, k) + } else if v.used == 0 { + txw.bucket2seq[k] = true + } + } +} + func (txw *txWriteBuffer) writeback(txr *txReadBuffer) { for k, wb := range txw.buckets { rb, ok := txr.buckets[k] @@ -62,7 +83,7 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) { txr.buckets[k] = wb continue } - if !txw.seq && wb.used > 1 { + if seq, ok := txw.bucket2seq[k]; ok && !seq && wb.used > 1 { // assume no duplicate keys sort.Sort(wb) }