From e8f40b04128b39a6e05f6636cd267be934407585 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 25 Aug 2015 10:57:23 -0700 Subject: [PATCH] storage/backend: add commitAndStop After the upgrade of boltdb, db.Close waits for all txn to finish. CommitAndStop commits the current txn and stop creating new ones. --- storage/backend/backend.go | 2 +- storage/backend/batch_tx.go | 22 +++++++++++++++++----- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/storage/backend/backend.go b/storage/backend/backend.go index 9cb995db0..9ce9098c6 100644 --- a/storage/backend/backend.go +++ b/storage/backend/backend.go @@ -80,7 +80,7 @@ func (b *backend) run() { select { case <-time.After(b.batchInterval): case <-b.stopc: - b.batchTx.Commit() + b.batchTx.CommitAndStop() return } b.batchTx.Commit() diff --git a/storage/backend/batch_tx.go b/storage/backend/batch_tx.go index 8cdcf23b2..d01df3906 100644 --- a/storage/backend/batch_tx.go +++ b/storage/backend/batch_tx.go @@ -16,6 +16,7 @@ type BatchTx interface { UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) UnsafeDelete(bucketName []byte, key []byte) Commit() + CommitAndStop() } type batchTx struct { @@ -43,7 +44,7 @@ func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) { } t.pending++ if t.pending > t.backend.batchLimit { - t.commit() + t.commit(false) t.pending = 0 } } @@ -84,19 +85,26 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) { } t.pending++ if t.pending > t.backend.batchLimit { - t.commit() + t.commit(false) t.pending = 0 } } -// commitAndBegin commits a previous tx and begins a new writable one. +// Commit commits a previous tx and begins a new writable one. func (t *batchTx) Commit() { t.Lock() defer t.Unlock() - t.commit() + t.commit(false) } -func (t *batchTx) commit() { +// CommitAndStop commits the previous tx and do not create a new one. +func (t *batchTx) CommitAndStop() { + t.Lock() + defer t.Unlock() + t.commit(true) +} + +func (t *batchTx) commit(stop bool) { var err error // commit the last tx if t.tx != nil { @@ -106,6 +114,10 @@ func (t *batchTx) commit() { } } + if stop { + return + } + // begin a new tx t.tx, err = t.backend.db.Begin(true) if err != nil {