From 8738a88fae8c3929187629ee3e014f78b74859e3 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 25 Aug 2015 10:39:29 -0700 Subject: [PATCH 1/2] Godeps: update bolt dependency --- Godeps/Godeps.json | 4 +- .../src/github.com/boltdb/bolt/.gitignore | 1 + .../src/github.com/boltdb/bolt/README.md | 32 ++++- .../src/github.com/boltdb/bolt/batch.go | 3 + .../src/github.com/boltdb/bolt/bolt_unix.go | 36 +++-- .../github.com/boltdb/bolt/bolt_windows.go | 10 +- .../src/github.com/boltdb/bolt/bucket_test.go | 16 +++ .../github.com/boltdb/bolt/cmd/bolt/main.go | 6 +- .../src/github.com/boltdb/bolt/db.go | 82 ++++++++++-- .../src/github.com/boltdb/bolt/db_test.go | 125 +++++++++++++++++- .../src/github.com/boltdb/bolt/errors.go | 4 + .../src/github.com/boltdb/bolt/freelist.go | 15 ++- .../github.com/boltdb/bolt/freelist_test.go | 27 ++++ .../src/github.com/boltdb/bolt/node.go | 13 +- .../src/github.com/boltdb/bolt/page.go | 44 +++++- .../src/github.com/boltdb/bolt/page_test.go | 43 ++++++ .../src/github.com/boltdb/bolt/tx.go | 42 ++++-- .../src/github.com/boltdb/bolt/tx_test.go | 32 +++++ 18 files changed, 485 insertions(+), 50 deletions(-) diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 967fd49b7..34948e373 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -20,8 +20,8 @@ }, { "ImportPath": "github.com/boltdb/bolt", - "Comment": "v1.0-71-g71f28ea", - "Rev": "71f28eaecbebd00604d87bb1de0dae8fcfa54bbd" + "Comment": "v1.0-119-g90fef38", + "Rev": "90fef389f98027ca55594edd7dbd6e7f3926fdad" }, { "ImportPath": "github.com/bradfitz/http2", diff --git a/Godeps/_workspace/src/github.com/boltdb/bolt/.gitignore b/Godeps/_workspace/src/github.com/boltdb/bolt/.gitignore index b2bb382bd..c7bd2b7a5 100644 --- a/Godeps/_workspace/src/github.com/boltdb/bolt/.gitignore +++ b/Godeps/_workspace/src/github.com/boltdb/bolt/.gitignore @@ -1,3 +1,4 @@ *.prof *.test +*.swp /bin/ diff --git a/Godeps/_workspace/src/github.com/boltdb/bolt/README.md b/Godeps/_workspace/src/github.com/boltdb/bolt/README.md index 401a757c3..00fad6afb 100644 --- a/Godeps/_workspace/src/github.com/boltdb/bolt/README.md +++ b/Godeps/_workspace/src/github.com/boltdb/bolt/README.md @@ -87,6 +87,11 @@ are not thread safe. To work with data in multiple goroutines you must start a transaction for each one or use locking to ensure only one goroutine accesses a transaction at a time. Creating transaction from the `DB` is thread safe. +Read-only transactions and read-write transactions should not depend on one +another and generally shouldn't be opened simultaneously in the same goroutine. +This can cause a deadlock as the read-write transaction needs to periodically +re-map the data file but it cannot do so while a read-only transaction is open. + #### Read-write transactions @@ -446,6 +451,21 @@ It's also useful to pipe these stats to a service such as statsd for monitoring or to provide an HTTP endpoint that will perform a fixed-length sample. +### Read-Only Mode + +Sometimes it is useful to create a shared, read-only Bolt database. To this, +set the `Options.ReadOnly` flag when opening your database. Read-only mode +uses a shared lock to allow multiple processes to read from the database but +it will block any processes from opening the database in read-write mode. 
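A process that opens the same file read-write will block on the exclusive lock for as long as any read-only process holds the shared lock. Setting `Options.Timeout` lets the writer fail fast instead; a minimal sketch of that writer side (file name and duration are illustrative):

```go
// Give up after one second if another process already holds the lock.
db, err := bolt.Open("my.db", 0666, &bolt.Options{Timeout: 1 * time.Second})
if err == bolt.ErrTimeout {
	log.Fatal("my.db is locked by another process")
} else if err != nil {
	log.Fatal(err)
}
defer db.Close()
```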
+ +```go +db, err := bolt.Open("my.db", 0666, &bolt.Options{ReadOnly: true}) +if err != nil { + log.Fatal(err) +} +``` + + ## Resources For more information on getting started with Bolt, check out the following articles: @@ -550,6 +570,11 @@ Here are a few things to note when evaluating and using Bolt: However, this is expected and the OS will release memory as needed. Bolt can handle databases much larger than the available physical RAM. +* The data structures in the Bolt database are memory mapped so the data file + will be endian specific. This means that you cannot copy a Bolt file from a + little endian machine to a big endian machine and have it work. For most + users this is not a concern since most modern CPUs are little endian. + * Because of the way pages are laid out on disk, Bolt cannot truncate data files and return free pages back to the disk. Instead, Bolt maintains a free list of unused pages within its data file. These free pages can be reused by later @@ -567,7 +592,7 @@ Here are a few things to note when evaluating and using Bolt: Below is a list of public, open source projects that use Bolt: * [Operation Go: A Routine Mission](http://gocode.io) - An online programming game for Golang using Bolt for user accounts and a leaderboard. -* [Bazil](https://github.com/bazillion/bazil) - A file system that lets your data reside where it is most convenient for it to reside. +* [Bazil](https://bazil.org/) - A file system that lets your data reside where it is most convenient for it to reside. * [DVID](https://github.com/janelia-flyem/dvid) - Added Bolt as optional storage engine and testing it against Basho-tuned leveldb. * [Skybox Analytics](https://github.com/skybox/skybox) - A standalone funnel analysis tool for web analytics. * [Scuttlebutt](https://github.com/benbjohnson/scuttlebutt) - Uses Bolt to store and process all Twitter mentions of GitHub projects. @@ -587,5 +612,10 @@ Below is a list of public, open source projects that use Bolt: * [SkyDB](https://github.com/skydb/sky) - Behavioral analytics database. * [Seaweed File System](https://github.com/chrislusf/weed-fs) - Highly scalable distributed key~file system with O(1) disk read. * [InfluxDB](http://influxdb.com) - Scalable datastore for metrics, events, and real-time analytics. +* [Freehold](http://tshannon.bitbucket.org/freehold/) - An open, secure, and lightweight platform for your files and data. +* [Prometheus Annotation Server](https://github.com/oliver006/prom_annotation_server) - Annotation server for PromDash & Prometheus service monitoring system. +* [Consul](https://github.com/hashicorp/consul) - Consul is service discovery and configuration made easy. Distributed, highly available, and datacenter-aware. +* [Kala](https://github.com/ajvb/kala) - Kala is a modern job scheduler optimized to run on a single node. It is persistant, JSON over HTTP API, ISO 8601 duration notation, and dependent jobs. +* [drive](https://github.com/odeke-em/drive) - drive is an unofficial Google Drive command line client for \*NIX operating systems. If you are using Bolt in a project please send a pull request to add it to the list. diff --git a/Godeps/_workspace/src/github.com/boltdb/bolt/batch.go b/Godeps/_workspace/src/github.com/boltdb/bolt/batch.go index bef1f4aa2..84acae6bb 100644 --- a/Godeps/_workspace/src/github.com/boltdb/bolt/batch.go +++ b/Godeps/_workspace/src/github.com/boltdb/bolt/batch.go @@ -20,6 +20,9 @@ import ( // take permanent effect only after a successful return is seen in // caller. 
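As a usage sketch of the contract described in this comment, many goroutines can funnel writes through `DB.Batch` so their commits are coalesced; each function must be safe to retry. Bucket and key names here are illustrative:

```go
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
	wg.Add(1)
	go func(i int) {
		defer wg.Done()
		err := db.Batch(func(tx *bolt.Tx) error {
			// May run more than once if another call in the same batch fails.
			b, err := tx.CreateBucketIfNotExists([]byte("events"))
			if err != nil {
				return err
			}
			return b.Put([]byte(fmt.Sprintf("event-%04d", i)), []byte("payload"))
		})
		if err != nil {
			log.Println(err)
		}
	}(i)
}
wg.Wait()
```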
// +// The maximum batch size and delay can be adjusted with DB.MaxBatchSize +// and DB.MaxBatchDelay, respectively. +// // Batch is only useful when there are multiple goroutines calling it. func (db *DB) Batch(fn func(*Tx) error) error { errCh := make(chan error, 1) diff --git a/Godeps/_workspace/src/github.com/boltdb/bolt/bolt_unix.go b/Godeps/_workspace/src/github.com/boltdb/bolt/bolt_unix.go index e222cfdcc..6eef6b220 100644 --- a/Godeps/_workspace/src/github.com/boltdb/bolt/bolt_unix.go +++ b/Godeps/_workspace/src/github.com/boltdb/bolt/bolt_unix.go @@ -1,4 +1,4 @@ -// +build !windows,!plan9 +// +build !windows,!plan9,!solaris package bolt @@ -11,7 +11,7 @@ import ( ) // flock acquires an advisory lock on a file descriptor. -func flock(f *os.File, timeout time.Duration) error { +func flock(f *os.File, exclusive bool, timeout time.Duration) error { var t time.Time for { // If we're beyond our timeout then return an error. @@ -21,9 +21,13 @@ func flock(f *os.File, timeout time.Duration) error { } else if timeout > 0 && time.Since(t) > timeout { return ErrTimeout } + flag := syscall.LOCK_SH + if exclusive { + flag = syscall.LOCK_EX + } // Otherwise attempt to obtain an exclusive lock. - err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) + err := syscall.Flock(int(f.Fd()), flag|syscall.LOCK_NB) if err == nil { return nil } else if err != syscall.EWOULDBLOCK { @@ -44,11 +48,13 @@ func funlock(f *os.File) error { func mmap(db *DB, sz int) error { // Truncate and fsync to ensure file size metadata is flushed. // https://github.com/boltdb/bolt/issues/284 - if err := db.file.Truncate(int64(sz)); err != nil { - return fmt.Errorf("file resize error: %s", err) - } - if err := db.file.Sync(); err != nil { - return fmt.Errorf("file sync error: %s", err) + if !db.NoGrowSync && !db.readOnly { + if err := db.file.Truncate(int64(sz)); err != nil { + return fmt.Errorf("file resize error: %s", err) + } + if err := db.file.Sync(); err != nil { + return fmt.Errorf("file sync error: %s", err) + } } // Map the data file to memory. @@ -57,6 +63,11 @@ func mmap(db *DB, sz int) error { return err } + // Advise the kernel that the mmap is accessed randomly. + if err := madvise(b, syscall.MADV_RANDOM); err != nil { + return fmt.Errorf("madvise: %s", err) + } + // Save the original byte slice and convert to a byte array pointer. db.dataref = b db.data = (*[maxMapSize]byte)(unsafe.Pointer(&b[0])) @@ -78,3 +89,12 @@ func munmap(db *DB) error { db.datasz = 0 return err } + +// NOTE: This function is copied from stdlib because it is not available on darwin. +func madvise(b []byte, advice int) (err error) { + _, _, e1 := syscall.Syscall(syscall.SYS_MADVISE, uintptr(unsafe.Pointer(&b[0])), uintptr(len(b)), uintptr(advice)) + if e1 != 0 { + err = e1 + } + return +} diff --git a/Godeps/_workspace/src/github.com/boltdb/bolt/bolt_windows.go b/Godeps/_workspace/src/github.com/boltdb/bolt/bolt_windows.go index c8539d40b..8b782be5f 100644 --- a/Godeps/_workspace/src/github.com/boltdb/bolt/bolt_windows.go +++ b/Godeps/_workspace/src/github.com/boltdb/bolt/bolt_windows.go @@ -16,7 +16,7 @@ func fdatasync(db *DB) error { } // flock acquires an advisory lock on a file descriptor. -func flock(f *os.File, _ time.Duration) error { +func flock(f *os.File, _ bool, _ time.Duration) error { return nil } @@ -28,9 +28,11 @@ func funlock(f *os.File) error { // mmap memory maps a DB's data file. 
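The Unix remap path above now skips the truncate and fsync when the database is read-only or `NoGrowSync` is set. A sketch of opting in from the caller's side, assuming the `Options` field that `Open()` copies into `db.NoGrowSync` later in this patch:

```go
// Skip truncate()/fsync() when the data file grows; only safe on
// filesystems that do not need the space preallocated (not ext3/ext4).
db, err := bolt.Open("my.db", 0666, &bolt.Options{NoGrowSync: true})
if err != nil {
	log.Fatal(err)
}
defer db.Close()
```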
// Based on: https://github.com/edsrzf/mmap-go func mmap(db *DB, sz int) error { - // Truncate the database to the size of the mmap. - if err := db.file.Truncate(int64(sz)); err != nil { - return fmt.Errorf("truncate: %s", err) + if !db.readOnly { + // Truncate the database to the size of the mmap. + if err := db.file.Truncate(int64(sz)); err != nil { + return fmt.Errorf("truncate: %s", err) + } } // Open a file mapping handle. diff --git a/Godeps/_workspace/src/github.com/boltdb/bolt/bucket_test.go b/Godeps/_workspace/src/github.com/boltdb/bolt/bucket_test.go index 7ceb6f536..99292b458 100644 --- a/Godeps/_workspace/src/github.com/boltdb/bolt/bucket_test.go +++ b/Godeps/_workspace/src/github.com/boltdb/bolt/bucket_test.go @@ -640,6 +640,22 @@ func TestBucket_Put_KeyTooLarge(t *testing.T) { }) } +// Ensure that an error is returned when inserting a value that's too large. +func TestBucket_Put_ValueTooLarge(t *testing.T) { + if os.Getenv("DRONE") == "true" { + t.Skip("not enough RAM for test") + } + + db := NewTestDB() + defer db.Close() + db.Update(func(tx *bolt.Tx) error { + tx.CreateBucket([]byte("widgets")) + err := tx.Bucket([]byte("widgets")).Put([]byte("foo"), make([]byte, bolt.MaxValueSize+1)) + equals(t, err, bolt.ErrValueTooLarge) + return nil + }) +} + // Ensure a bucket can calculate stats. func TestBucket_Stats(t *testing.T) { db := NewTestDB() diff --git a/Godeps/_workspace/src/github.com/boltdb/bolt/cmd/bolt/main.go b/Godeps/_workspace/src/github.com/boltdb/bolt/cmd/bolt/main.go index 5a7ae8e4e..c0a836296 100644 --- a/Godeps/_workspace/src/github.com/boltdb/bolt/cmd/bolt/main.go +++ b/Godeps/_workspace/src/github.com/boltdb/bolt/cmd/bolt/main.go @@ -344,7 +344,7 @@ func (cmd *DumpCommand) Run(args ...string) error { for i, pageID := range pageIDs { // Print a separator. if i > 0 { - fmt.Fprintln(cmd.Stdout, "===============================================\n") + fmt.Fprintln(cmd.Stdout, "===============================================") } // Print page to stdout. @@ -465,7 +465,7 @@ func (cmd *PageCommand) Run(args ...string) error { for i, pageID := range pageIDs { // Print a separator. if i > 0 { - fmt.Fprintln(cmd.Stdout, "===============================================\n") + fmt.Fprintln(cmd.Stdout, "===============================================") } // Retrieve page info and page size. @@ -917,7 +917,7 @@ func (cmd *BenchCommand) Run(args ...string) error { // Write to the database. var results BenchResults if err := cmd.runWrites(db, options, &results); err != nil { - return fmt.Errorf("write: ", err) + return fmt.Errorf("write: %v", err) } // Read from the database. diff --git a/Godeps/_workspace/src/github.com/boltdb/bolt/db.go b/Godeps/_workspace/src/github.com/boltdb/bolt/db.go index 8f0e90b55..d39c4aa9c 100644 --- a/Godeps/_workspace/src/github.com/boltdb/bolt/db.go +++ b/Godeps/_workspace/src/github.com/boltdb/bolt/db.go @@ -55,6 +55,14 @@ type DB struct { // THIS IS UNSAFE. PLEASE USE WITH CAUTION. NoSync bool + // When true, skips the truncate call when growing the database. + // Setting this to true is only safe on non-ext3/ext4 systems. + // Skipping truncation avoids preallocation of hard drive space and + // bypasses a truncate() and fsync() syscall on remapping. + // + // https://github.com/boltdb/bolt/issues/284 + NoGrowSync bool + // MaxBatchSize is the maximum size of a batch. Default value is // copied from DefaultMaxBatchSize in Open. 
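These batch knobs (`MaxBatchSize` and `MaxBatchDelay`) are plain fields on `DB` and can be tuned after `Open`; a short sketch (the values are illustrative, not recommendations):

```go
db, err := bolt.Open("my.db", 0666, nil)
if err != nil {
	log.Fatal(err)
}
// Flush batches sooner and keep them smaller than the defaults.
db.MaxBatchSize = 500
db.MaxBatchDelay = 5 * time.Millisecond
```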
// @@ -96,6 +104,10 @@ type DB struct { ops struct { writeAt func(b []byte, off int64) (n int, err error) } + + // Read only mode. + // When true, Update() and Begin(true) return ErrDatabaseReadOnly immediately. + readOnly bool } // Path returns the path to currently open database file. @@ -123,24 +135,34 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) { if options == nil { options = DefaultOptions } + db.NoGrowSync = options.NoGrowSync // Set default values for later DB operations. db.MaxBatchSize = DefaultMaxBatchSize db.MaxBatchDelay = DefaultMaxBatchDelay + flag := os.O_RDWR + if options.ReadOnly { + flag = os.O_RDONLY + db.readOnly = true + } + // Open data file and separate sync handler for metadata writes. db.path = path - var err error - if db.file, err = os.OpenFile(db.path, os.O_RDWR|os.O_CREATE, mode); err != nil { + if db.file, err = os.OpenFile(db.path, flag|os.O_CREATE, mode); err != nil { _ = db.close() return nil, err } - // Lock file so that other processes using Bolt cannot use the database - // at the same time. This would cause corruption since the two processes - // would write meta pages and free pages separately. - if err := flock(db.file, options.Timeout); err != nil { + // Lock file so that other processes using Bolt in read-write mode cannot + // use the database at the same time. This would cause corruption since + // the two processes would write meta pages and free pages separately. + // The database file is locked exclusively (only one process can grab the lock) + // if !options.ReadOnly. + // The database file is locked using the shared lock (more than one process may + // hold a lock at the same time) otherwise (options.ReadOnly is set). + if err := flock(db.file, !db.readOnly, options.Timeout); err != nil { _ = db.close() return nil, err } @@ -247,8 +269,8 @@ func (db *DB) munmap() error { // of the database. The minimum size is 1MB and doubles until it reaches 1GB. // Returns an error if the new mmap size is greater than the max allowed. func (db *DB) mmapSize(size int) (int, error) { - // Double the size from 1MB until 1GB. - for i := uint(20); i <= 30; i++ { + // Double the size from 32KB until 1GB. + for i := uint(15); i <= 30; i++ { if size <= 1< 0 { + // Merge largest prefix of lead that is ahead of follow[0]. + n := sort.Search(len(lead), func(i int) bool { return lead[i] > follow[0] }) + merged = append(merged, lead[:n]...) + if n >= len(lead) { + break + } + + // Swap lead and follow. + lead, follow = follow, lead[n:] + } + + // Append what's left in follow. + merged = append(merged, follow...) + + return merged +} diff --git a/Godeps/_workspace/src/github.com/boltdb/bolt/page_test.go b/Godeps/_workspace/src/github.com/boltdb/bolt/page_test.go index 7a4d327fe..59f4a30ed 100644 --- a/Godeps/_workspace/src/github.com/boltdb/bolt/page_test.go +++ b/Godeps/_workspace/src/github.com/boltdb/bolt/page_test.go @@ -1,7 +1,10 @@ package bolt import ( + "reflect" + "sort" "testing" + "testing/quick" ) // Ensure that the page type can be returned in human readable format. 
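To illustrate the read-only plumbing added to `db.go` above: a write attempted on a handle opened with `ReadOnly` set fails immediately with `ErrDatabaseReadOnly` rather than blocking. A minimal sketch:

```go
ro, err := bolt.Open("my.db", 0666, &bolt.Options{ReadOnly: true})
if err != nil {
	log.Fatal(err)
}
defer ro.Close()

// Update (and Begin(true)) are rejected on a read-only handle.
if err := ro.Update(func(tx *bolt.Tx) error { return nil }); err == bolt.ErrDatabaseReadOnly {
	log.Println("writes rejected as expected")
}
```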
@@ -27,3 +30,43 @@ func TestPage_typ(t *testing.T) { func TestPage_dump(t *testing.T) { (&page{id: 256}).hexdump(16) } + +func TestPgids_merge(t *testing.T) { + a := pgids{4, 5, 6, 10, 11, 12, 13, 27} + b := pgids{1, 3, 8, 9, 25, 30} + c := a.merge(b) + if !reflect.DeepEqual(c, pgids{1, 3, 4, 5, 6, 8, 9, 10, 11, 12, 13, 25, 27, 30}) { + t.Errorf("mismatch: %v", c) + } + + a = pgids{4, 5, 6, 10, 11, 12, 13, 27, 35, 36} + b = pgids{8, 9, 25, 30} + c = a.merge(b) + if !reflect.DeepEqual(c, pgids{4, 5, 6, 8, 9, 10, 11, 12, 13, 25, 27, 30, 35, 36}) { + t.Errorf("mismatch: %v", c) + } +} + +func TestPgids_merge_quick(t *testing.T) { + if err := quick.Check(func(a, b pgids) bool { + // Sort incoming lists. + sort.Sort(a) + sort.Sort(b) + + // Merge the two lists together. + got := a.merge(b) + + // The expected value should be the two lists combined and sorted. + exp := append(a, b...) + sort.Sort(exp) + + if !reflect.DeepEqual(exp, got) { + t.Errorf("\nexp=%+v\ngot=%+v\n", exp, got) + return false + } + + return true + }, nil); err != nil { + t.Fatal(err) + } +} diff --git a/Godeps/_workspace/src/github.com/boltdb/bolt/tx.go b/Godeps/_workspace/src/github.com/boltdb/bolt/tx.go index fda6a210a..6b52b2c89 100644 --- a/Godeps/_workspace/src/github.com/boltdb/bolt/tx.go +++ b/Godeps/_workspace/src/github.com/boltdb/bolt/tx.go @@ -127,7 +127,8 @@ func (tx *Tx) OnCommit(fn func()) { } // Commit writes all changes to disk and updates the meta page. -// Returns an error if a disk write error occurs. +// Returns an error if a disk write error occurs, or if Commit is +// called on a read-only transaction. func (tx *Tx) Commit() error { _assert(!tx.managed, "managed tx commit not allowed") if tx.db == nil { @@ -203,7 +204,8 @@ func (tx *Tx) Commit() error { return nil } -// Rollback closes the transaction and ignores all previous updates. +// Rollback closes the transaction and ignores all previous updates. Read-only +// transactions must be rolled back and not committed. func (tx *Tx) Rollback() error { _assert(!tx.managed, "managed tx rollback not allowed") if tx.db == nil { @@ -421,15 +423,39 @@ func (tx *Tx) write() error { // Write pages to disk in order. for _, p := range pages { size := (int(p.overflow) + 1) * tx.db.pageSize - buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:size] offset := int64(p.id) * int64(tx.db.pageSize) - if _, err := tx.db.ops.writeAt(buf, offset); err != nil { - return err - } - // Update statistics. - tx.stats.Write++ + // Write out page in "max allocation" sized chunks. + ptr := (*[maxAllocSize]byte)(unsafe.Pointer(p)) + for { + // Limit our write to our max allocation size. + sz := size + if sz > maxAllocSize-1 { + sz = maxAllocSize - 1 + } + + // Write chunk to disk. + buf := ptr[:sz] + if _, err := tx.db.ops.writeAt(buf, offset); err != nil { + return err + } + + // Update statistics. + tx.stats.Write++ + + // Exit inner for loop if we've written all the chunks. + size -= sz + if size == 0 { + break + } + + // Otherwise move offset forward and move pointer to next chunk. + offset += int64(sz) + ptr = (*[maxAllocSize]byte)(unsafe.Pointer(&ptr[sz])) + } } + + // Ignore file sync if flag is set on DB. 
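The write loop above splits each page into chunks of at most `maxAllocSize-1` bytes before handing them to `writeAt`. The same pattern, pulled out as a stand-alone helper purely for illustration (it is not part of the patch and assumes `import "io"`):

```go
// writeChunked writes buf at off in chunks of at most max bytes.
func writeChunked(w io.WriterAt, buf []byte, off int64, max int) error {
	for len(buf) > 0 {
		n := len(buf)
		if n > max {
			n = max
		}
		if _, err := w.WriteAt(buf[:n], off); err != nil {
			return err
		}
		off += int64(n)
		buf = buf[n:]
	}
	return nil
}
```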
if !tx.db.NoSync || IgnoreNoSync { if err := fdatasync(tx.db); err != nil { return err diff --git a/Godeps/_workspace/src/github.com/boltdb/bolt/tx_test.go b/Godeps/_workspace/src/github.com/boltdb/bolt/tx_test.go index 9612f336a..61bcc0eea 100644 --- a/Godeps/_workspace/src/github.com/boltdb/bolt/tx_test.go +++ b/Godeps/_workspace/src/github.com/boltdb/bolt/tx_test.go @@ -252,6 +252,38 @@ func TestTx_DeleteBucket_NotFound(t *testing.T) { }) } +// Ensure that no error is returned when a tx.ForEach function does not return +// an error. +func TestTx_ForEach_NoError(t *testing.T) { + db := NewTestDB() + defer db.Close() + db.Update(func(tx *bolt.Tx) error { + tx.CreateBucket([]byte("widgets")) + tx.Bucket([]byte("widgets")).Put([]byte("foo"), []byte("bar")) + + equals(t, nil, tx.ForEach(func(name []byte, b *bolt.Bucket) error { + return nil + })) + return nil + }) +} + +// Ensure that an error is returned when a tx.ForEach function returns an error. +func TestTx_ForEach_WithError(t *testing.T) { + db := NewTestDB() + defer db.Close() + db.Update(func(tx *bolt.Tx) error { + tx.CreateBucket([]byte("widgets")) + tx.Bucket([]byte("widgets")).Put([]byte("foo"), []byte("bar")) + + err := errors.New("foo") + equals(t, err, tx.ForEach(func(name []byte, b *bolt.Bucket) error { + return err + })) + return nil + }) +} + // Ensure that Tx commit handlers are called after a transaction successfully commits. func TestTx_OnCommit(t *testing.T) { var x int From e8f40b04128b39a6e05f6636cd267be934407585 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 25 Aug 2015 10:57:23 -0700 Subject: [PATCH 2/2] storage/backend: add commitAndStop After the upgrade of boltdb, db.Close waits for all txn to finish. CommitAndStop commits the current txn and stop creating new ones. --- storage/backend/backend.go | 2 +- storage/backend/batch_tx.go | 22 +++++++++++++++++----- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/storage/backend/backend.go b/storage/backend/backend.go index 9cb995db0..9ce9098c6 100644 --- a/storage/backend/backend.go +++ b/storage/backend/backend.go @@ -80,7 +80,7 @@ func (b *backend) run() { select { case <-time.After(b.batchInterval): case <-b.stopc: - b.batchTx.Commit() + b.batchTx.CommitAndStop() return } b.batchTx.Commit() diff --git a/storage/backend/batch_tx.go b/storage/backend/batch_tx.go index 8cdcf23b2..d01df3906 100644 --- a/storage/backend/batch_tx.go +++ b/storage/backend/batch_tx.go @@ -16,6 +16,7 @@ type BatchTx interface { UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) UnsafeDelete(bucketName []byte, key []byte) Commit() + CommitAndStop() } type batchTx struct { @@ -43,7 +44,7 @@ func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) { } t.pending++ if t.pending > t.backend.batchLimit { - t.commit() + t.commit(false) t.pending = 0 } } @@ -84,19 +85,26 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) { } t.pending++ if t.pending > t.backend.batchLimit { - t.commit() + t.commit(false) t.pending = 0 } } -// commitAndBegin commits a previous tx and begins a new writable one. +// Commit commits a previous tx and begins a new writable one. func (t *batchTx) Commit() { t.Lock() defer t.Unlock() - t.commit() + t.commit(false) } -func (t *batchTx) commit() { +// CommitAndStop commits the previous tx and do not create a new one. 
+func (t *batchTx) CommitAndStop() { + t.Lock() + defer t.Unlock() + t.commit(true) +} + +func (t *batchTx) commit(stop bool) { var err error // commit the last tx if t.tx != nil { @@ -106,6 +114,10 @@ func (t *batchTx) commit() { } } + if stop { + return + } + // begin a new tx t.tx, err = t.backend.db.Begin(true) if err != nil {
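For context on why a separate `CommitAndStop` is needed: `Commit()` immediately begins a new writable transaction, and the upgraded bolt's `db.Close()` waits for every open transaction to finish, so committing normally on shutdown would leave `Close()` blocked forever. A sketch of the intended shutdown ordering; the `Close` method and the wait step are assumed for illustration, only `stopc`, `batchTx`, and `db` appear in this patch:

```go
// Hypothetical backend.Close, shown only to illustrate the ordering.
func (b *backend) Close() error {
	close(b.stopc) // run() wakes up and calls b.batchTx.CommitAndStop()
	// ...wait for run() to return (e.g. via a done channel, not shown)...
	return b.db.Close() // nothing left open, so this no longer blocks
}
```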