diff --git a/mvcc/backend/backend.go b/mvcc/backend/backend.go
index 0cc54b7e8..eaf2740b4 100644
--- a/mvcc/backend/backend.go
+++ b/mvcc/backend/backend.go
@@ -247,7 +247,11 @@ func (b *backend) defrag() error {
 	b.mu.Lock()
 	defer b.mu.Unlock()
 
-	b.batchTx.commit(true)
+	// block concurrent read requests while resetting tx
+	b.readTx.mu.Lock()
+	defer b.readTx.mu.Unlock()
+
+	b.batchTx.unsafeCommit(true)
 	b.batchTx.tx = nil
 
 	tmpdb, err := bolt.Open(b.db.Path()+".tmp", 0600, boltOpenOptions)
@@ -288,6 +292,10 @@ func (b *backend) defrag() error {
 		plog.Fatalf("cannot begin tx (%s)", err)
 	}
 
+	b.readTx.buf.reset()
+	b.readTx.tx = b.unsafeBegin(false)
+	atomic.StoreInt64(&b.size, b.readTx.tx.Size())
+
 	return nil
 }
 
@@ -345,12 +353,17 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
 
 func (b *backend) begin(write bool) *bolt.Tx {
 	b.mu.RLock()
+	tx := b.unsafeBegin(write)
+	b.mu.RUnlock()
+	atomic.StoreInt64(&b.size, tx.Size())
+	return tx
+}
+
+func (b *backend) unsafeBegin(write bool) *bolt.Tx {
 	tx, err := b.db.Begin(write)
 	if err != nil {
 		plog.Fatalf("cannot begin tx (%s)", err)
 	}
-	b.mu.RUnlock()
-	atomic.StoreInt64(&b.size, tx.Size())
 	return tx
 }
 
diff --git a/mvcc/backend/batch_tx.go b/mvcc/backend/batch_tx.go
index 1c248de59..2cafb9f7e 100644
--- a/mvcc/backend/batch_tx.go
+++ b/mvcc/backend/batch_tx.go
@@ -244,6 +244,10 @@ func (t *batchTxBuffered) commit(stop bool) {
 	// all read txs must be closed to acquire boltdb commit rwlock
 	t.backend.readTx.mu.Lock()
 	defer t.backend.readTx.mu.Unlock()
+	t.unsafeCommit(stop)
+}
+
+func (t *batchTxBuffered) unsafeCommit(stop bool) {
 	if t.backend.readTx.tx != nil {
 		if err := t.backend.readTx.tx.Rollback(); err != nil {
 			plog.Fatalf("cannot rollback tx (%s)", err)
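
The diff above splits commit/begin into a locking wrapper and an unsafe* body so that defrag, which already holds readTx.mu, can reuse the same logic without deadlocking. The following is a minimal standalone sketch of that lock-split pattern; the types, fields, and method bodies are simplified placeholders, not the actual etcd code.

// Sketch of the pattern: the exported method acquires readTx.mu and delegates
// to an unsafe* variant that assumes the lock is already held, so defrag can
// hold the lock across commit, file swap, and re-begin of the read tx.
package main

import "sync"

type readTx struct {
	mu sync.RWMutex
	// buffered read state and the open read transaction would live here
}

type backend struct {
	readTx readTx
}

// commit is the normal path: it serializes with readers on its own.
func (b *backend) commit() {
	b.readTx.mu.Lock()
	defer b.readTx.mu.Unlock()
	b.unsafeCommit()
}

// unsafeCommit assumes the caller already holds b.readTx.mu.
func (b *backend) unsafeCommit() {
	// roll back the read tx and commit the write tx here
}

// defrag holds readTx.mu for the whole operation, so it must call the unsafe
// variant; calling commit() here would self-deadlock, since sync.RWMutex is
// not reentrant.
func (b *backend) defrag() {
	b.readTx.mu.Lock()
	defer b.readTx.mu.Unlock()

	b.unsafeCommit()
	// ... copy into the .tmp db, swap files, reopen, then re-begin a fresh
	// read tx before the lock is released
}

func main() {
	b := &backend{}
	b.commit()
	b.defrag()
}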