Godeps: update bolt dependency

Xiang Li 2015-08-25 10:39:29 -07:00
parent 2d06f6b371
commit 8738a88fae
18 changed files with 485 additions and 50 deletions

Godeps/Godeps.json

@@ -20,8 +20,8 @@
 		},
 		{
 			"ImportPath": "github.com/boltdb/bolt",
-			"Comment": "v1.0-71-g71f28ea",
-			"Rev": "71f28eaecbebd00604d87bb1de0dae8fcfa54bbd"
+			"Comment": "v1.0-119-g90fef38",
+			"Rev": "90fef389f98027ca55594edd7dbd6e7f3926fdad"
 		},
 		{
 			"ImportPath": "github.com/bradfitz/http2",


@@ -1,3 +1,4 @@
 *.prof
 *.test
+*.swp
 /bin/


@@ -87,6 +87,11 @@ are not thread safe. To work with data in multiple goroutines you must start
 a transaction for each one or use locking to ensure only one goroutine accesses
 a transaction at a time. Creating transaction from the `DB` is thread safe.

+Read-only transactions and read-write transactions should not depend on one
+another and generally shouldn't be opened simultaneously in the same goroutine.
+This can cause a deadlock as the read-write transaction needs to periodically
+re-map the data file but it cannot do so while a read-only transaction is open.
+
 #### Read-write transactions
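The README paragraph added above names a specific failure mode. As an illustrative aside (not part of the diff), a minimal sketch of the pattern it warns against might look like this; the file name and bucket/key names are made up:

```go
package main

import (
	"log"

	"github.com/boltdb/bolt"
)

func main() {
	db, err := bolt.Open("my.db", 0666, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Anti-pattern: a read-only transaction held open in this goroutine...
	tx, err := db.Begin(false)
	if err != nil {
		log.Fatal(err)
	}
	defer tx.Rollback()

	// ...while the same goroutine starts a read-write transaction. If the
	// writer needs to grow and re-mmap the data file, it blocks waiting for
	// the open read transaction, which this goroutine will never release.
	_ = db.Update(func(wtx *bolt.Tx) error {
		b, err := wtx.CreateBucketIfNotExists([]byte("widgets"))
		if err != nil {
			return err
		}
		return b.Put([]byte("k"), []byte("v"))
	})
}
```

Keeping the read transaction scoped (for example inside `db.View`) or running the writer in a separate goroutine avoids holding the old mapping open while the writer tries to grow the file.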
@@ -446,6 +451,21 @@ It's also useful to pipe these stats to a service such as statsd for monitoring
 or to provide an HTTP endpoint that will perform a fixed-length sample.

+### Read-Only Mode
+
+Sometimes it is useful to create a shared, read-only Bolt database. To this,
+set the `Options.ReadOnly` flag when opening your database. Read-only mode
+uses a shared lock to allow multiple processes to read from the database but
+it will block any processes from opening the database in read-write mode.
+
+```go
+db, err := bolt.Open("my.db", 0666, &bolt.Options{ReadOnly: true})
+if err != nil {
+	log.Fatal(err)
+}
+```
+
 ## Resources

 For more information on getting started with Bolt, check out the following articles:
@@ -550,6 +570,11 @@ Here are a few things to note when evaluating and using Bolt:
   However, this is expected and the OS will release memory as needed. Bolt can
   handle databases much larger than the available physical RAM.

+* The data structures in the Bolt database are memory mapped so the data file
+  will be endian specific. This means that you cannot copy a Bolt file from a
+  little endian machine to a big endian machine and have it work. For most
+  users this is not a concern since most modern CPUs are little endian.
+
 * Because of the way pages are laid out on disk, Bolt cannot truncate data files
   and return free pages back to the disk. Instead, Bolt maintains a free list
   of unused pages within its data file. These free pages can be reused by later
@@ -567,7 +592,7 @@ Here are a few things to note when evaluating and using Bolt:
 Below is a list of public, open source projects that use Bolt:

 * [Operation Go: A Routine Mission](http://gocode.io) - An online programming game for Golang using Bolt for user accounts and a leaderboard.
-* [Bazil](https://github.com/bazillion/bazil) - A file system that lets your data reside where it is most convenient for it to reside.
+* [Bazil](https://bazil.org/) - A file system that lets your data reside where it is most convenient for it to reside.
 * [DVID](https://github.com/janelia-flyem/dvid) - Added Bolt as optional storage engine and testing it against Basho-tuned leveldb.
 * [Skybox Analytics](https://github.com/skybox/skybox) - A standalone funnel analysis tool for web analytics.
 * [Scuttlebutt](https://github.com/benbjohnson/scuttlebutt) - Uses Bolt to store and process all Twitter mentions of GitHub projects.
@@ -587,5 +612,10 @@ Below is a list of public, open source projects that use Bolt:
 * [SkyDB](https://github.com/skydb/sky) - Behavioral analytics database.
 * [Seaweed File System](https://github.com/chrislusf/weed-fs) - Highly scalable distributed key~file system with O(1) disk read.
 * [InfluxDB](http://influxdb.com) - Scalable datastore for metrics, events, and real-time analytics.
+* [Freehold](http://tshannon.bitbucket.org/freehold/) - An open, secure, and lightweight platform for your files and data.
+* [Prometheus Annotation Server](https://github.com/oliver006/prom_annotation_server) - Annotation server for PromDash & Prometheus service monitoring system.
+* [Consul](https://github.com/hashicorp/consul) - Consul is service discovery and configuration made easy. Distributed, highly available, and datacenter-aware.
+* [Kala](https://github.com/ajvb/kala) - Kala is a modern job scheduler optimized to run on a single node. It is persistant, JSON over HTTP API, ISO 8601 duration notation, and dependent jobs.
+* [drive](https://github.com/odeke-em/drive) - drive is an unofficial Google Drive command line client for \*NIX operating systems.

 If you are using Bolt in a project please send a pull request to add it to the list.


@@ -20,6 +20,9 @@ import (
 // take permanent effect only after a successful return is seen in
 // caller.
 //
+// The maximum batch size and delay can be adjusted with DB.MaxBatchSize
+// and DB.MaxBatchDelay, respectively.
+//
 // Batch is only useful when there are multiple goroutines calling it.
 func (db *DB) Batch(fn func(*Tx) error) error {
 	errCh := make(chan error, 1)
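The new doc comment points at the two batching knobs. A hedged usage sketch (not part of the commit; bucket and key names are invented, values arbitrary) of how a caller might tune them alongside DB.Batch:

```go
package main

import (
	"log"
	"time"

	"github.com/boltdb/bolt"
)

func main() {
	db, err := bolt.Open("my.db", 0666, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Tune the batching knobs documented above.
	db.MaxBatchSize = 500                   // coalesce at most 500 calls per batch
	db.MaxBatchDelay = 5 * time.Millisecond // or flush after 5ms, whichever comes first

	// Batch functions may be retried, so they must be idempotent.
	if err := db.Batch(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists([]byte("events"))
		if err != nil {
			return err
		}
		return b.Put([]byte("id-1"), []byte("payload"))
	}); err != nil {
		log.Fatal(err)
	}
}
```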


@ -1,4 +1,4 @@
// +build !windows,!plan9 // +build !windows,!plan9,!solaris
package bolt package bolt
@ -11,7 +11,7 @@ import (
) )
// flock acquires an advisory lock on a file descriptor. // flock acquires an advisory lock on a file descriptor.
func flock(f *os.File, timeout time.Duration) error { func flock(f *os.File, exclusive bool, timeout time.Duration) error {
var t time.Time var t time.Time
for { for {
// If we're beyond our timeout then return an error. // If we're beyond our timeout then return an error.
@ -21,9 +21,13 @@ func flock(f *os.File, timeout time.Duration) error {
} else if timeout > 0 && time.Since(t) > timeout { } else if timeout > 0 && time.Since(t) > timeout {
return ErrTimeout return ErrTimeout
} }
flag := syscall.LOCK_SH
if exclusive {
flag = syscall.LOCK_EX
}
// Otherwise attempt to obtain an exclusive lock. // Otherwise attempt to obtain an exclusive lock.
err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) err := syscall.Flock(int(f.Fd()), flag|syscall.LOCK_NB)
if err == nil { if err == nil {
return nil return nil
} else if err != syscall.EWOULDBLOCK { } else if err != syscall.EWOULDBLOCK {
@ -44,11 +48,13 @@ func funlock(f *os.File) error {
func mmap(db *DB, sz int) error { func mmap(db *DB, sz int) error {
// Truncate and fsync to ensure file size metadata is flushed. // Truncate and fsync to ensure file size metadata is flushed.
// https://github.com/boltdb/bolt/issues/284 // https://github.com/boltdb/bolt/issues/284
if err := db.file.Truncate(int64(sz)); err != nil { if !db.NoGrowSync && !db.readOnly {
return fmt.Errorf("file resize error: %s", err) if err := db.file.Truncate(int64(sz)); err != nil {
} return fmt.Errorf("file resize error: %s", err)
if err := db.file.Sync(); err != nil { }
return fmt.Errorf("file sync error: %s", err) if err := db.file.Sync(); err != nil {
return fmt.Errorf("file sync error: %s", err)
}
} }
// Map the data file to memory. // Map the data file to memory.
@ -57,6 +63,11 @@ func mmap(db *DB, sz int) error {
return err return err
} }
// Advise the kernel that the mmap is accessed randomly.
if err := madvise(b, syscall.MADV_RANDOM); err != nil {
return fmt.Errorf("madvise: %s", err)
}
// Save the original byte slice and convert to a byte array pointer. // Save the original byte slice and convert to a byte array pointer.
db.dataref = b db.dataref = b
db.data = (*[maxMapSize]byte)(unsafe.Pointer(&b[0])) db.data = (*[maxMapSize]byte)(unsafe.Pointer(&b[0]))
@ -78,3 +89,12 @@ func munmap(db *DB) error {
db.datasz = 0 db.datasz = 0
return err return err
} }
// NOTE: This function is copied from stdlib because it is not available on darwin.
func madvise(b []byte, advice int) (err error) {
_, _, e1 := syscall.Syscall(syscall.SYS_MADVISE, uintptr(unsafe.Pointer(&b[0])), uintptr(len(b)), uintptr(advice))
if e1 != 0 {
err = e1
}
return
}


@@ -16,7 +16,7 @@ func fdatasync(db *DB) error {
 }

 // flock acquires an advisory lock on a file descriptor.
-func flock(f *os.File, _ time.Duration) error {
+func flock(f *os.File, _ bool, _ time.Duration) error {
 	return nil
 }
@@ -28,9 +28,11 @@ func funlock(f *os.File) error {
 // mmap memory maps a DB's data file.
 // Based on: https://github.com/edsrzf/mmap-go
 func mmap(db *DB, sz int) error {
-	// Truncate the database to the size of the mmap.
-	if err := db.file.Truncate(int64(sz)); err != nil {
-		return fmt.Errorf("truncate: %s", err)
+	if !db.readOnly {
+		// Truncate the database to the size of the mmap.
+		if err := db.file.Truncate(int64(sz)); err != nil {
+			return fmt.Errorf("truncate: %s", err)
+		}
 	}

 	// Open a file mapping handle.

@@ -640,6 +640,22 @@ func TestBucket_Put_KeyTooLarge(t *testing.T) {
 	})
 }

+// Ensure that an error is returned when inserting a value that's too large.
+func TestBucket_Put_ValueTooLarge(t *testing.T) {
+	if os.Getenv("DRONE") == "true" {
+		t.Skip("not enough RAM for test")
+	}
+
+	db := NewTestDB()
+	defer db.Close()
+	db.Update(func(tx *bolt.Tx) error {
+		tx.CreateBucket([]byte("widgets"))
+		err := tx.Bucket([]byte("widgets")).Put([]byte("foo"), make([]byte, bolt.MaxValueSize+1))
+		equals(t, err, bolt.ErrValueTooLarge)
+		return nil
+	})
+}
+
 // Ensure a bucket can calculate stats.
 func TestBucket_Stats(t *testing.T) {
 	db := NewTestDB()


@@ -344,7 +344,7 @@ func (cmd *DumpCommand) Run(args ...string) error {
 	for i, pageID := range pageIDs {
 		// Print a separator.
 		if i > 0 {
-			fmt.Fprintln(cmd.Stdout, "===============================================\n")
+			fmt.Fprintln(cmd.Stdout, "===============================================")
 		}

 		// Print page to stdout.
@@ -465,7 +465,7 @@ func (cmd *PageCommand) Run(args ...string) error {
 	for i, pageID := range pageIDs {
 		// Print a separator.
 		if i > 0 {
-			fmt.Fprintln(cmd.Stdout, "===============================================\n")
+			fmt.Fprintln(cmd.Stdout, "===============================================")
 		}

 		// Retrieve page info and page size.
@@ -917,7 +917,7 @@ func (cmd *BenchCommand) Run(args ...string) error {
 	// Write to the database.
 	var results BenchResults
 	if err := cmd.runWrites(db, options, &results); err != nil {
-		return fmt.Errorf("write: ", err)
+		return fmt.Errorf("write: %v", err)
 	}

 	// Read from the database.


@@ -55,6 +55,14 @@ type DB struct {
 	// THIS IS UNSAFE. PLEASE USE WITH CAUTION.
 	NoSync bool

+	// When true, skips the truncate call when growing the database.
+	// Setting this to true is only safe on non-ext3/ext4 systems.
+	// Skipping truncation avoids preallocation of hard drive space and
+	// bypasses a truncate() and fsync() syscall on remapping.
+	//
+	// https://github.com/boltdb/bolt/issues/284
+	NoGrowSync bool
+
 	// MaxBatchSize is the maximum size of a batch. Default value is
 	// copied from DefaultMaxBatchSize in Open.
 	//
@@ -96,6 +104,10 @@ type DB struct {
 	ops struct {
 		writeAt func(b []byte, off int64) (n int, err error)
 	}
+
+	// Read only mode.
+	// When true, Update() and Begin(true) return ErrDatabaseReadOnly immediately.
+	readOnly bool
 }

 // Path returns the path to currently open database file.
@@ -123,24 +135,34 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
 	if options == nil {
 		options = DefaultOptions
 	}
+	db.NoGrowSync = options.NoGrowSync

 	// Set default values for later DB operations.
 	db.MaxBatchSize = DefaultMaxBatchSize
 	db.MaxBatchDelay = DefaultMaxBatchDelay

+	flag := os.O_RDWR
+	if options.ReadOnly {
+		flag = os.O_RDONLY
+		db.readOnly = true
+	}
+
 	// Open data file and separate sync handler for metadata writes.
 	db.path = path
 	var err error
-	if db.file, err = os.OpenFile(db.path, os.O_RDWR|os.O_CREATE, mode); err != nil {
+	if db.file, err = os.OpenFile(db.path, flag|os.O_CREATE, mode); err != nil {
 		_ = db.close()
 		return nil, err
 	}

-	// Lock file so that other processes using Bolt cannot use the database
-	// at the same time. This would cause corruption since the two processes
-	// would write meta pages and free pages separately.
-	if err := flock(db.file, options.Timeout); err != nil {
+	// Lock file so that other processes using Bolt in read-write mode cannot
+	// use the database at the same time. This would cause corruption since
+	// the two processes would write meta pages and free pages separately.
+	// The database file is locked exclusively (only one process can grab the lock)
+	// if !options.ReadOnly.
+	// The database file is locked using the shared lock (more than one process may
+	// hold a lock at the same time) otherwise (options.ReadOnly is set).
+	if err := flock(db.file, !db.readOnly, options.Timeout); err != nil {
 		_ = db.close()
 		return nil, err
 	}
@@ -247,8 +269,8 @@ func (db *DB) munmap() error {
 // of the database. The minimum size is 1MB and doubles until it reaches 1GB.
 // Returns an error if the new mmap size is greater than the max allowed.
 func (db *DB) mmapSize(size int) (int, error) {
-	// Double the size from 1MB until 1GB.
-	for i := uint(20); i <= 30; i++ {
+	// Double the size from 32KB until 1GB.
+	for i := uint(15); i <= 30; i++ {
 		if size <= 1<<i {
 			return 1 << i, nil
 		}
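For context on the constant change above (uint(20) to uint(15)), here is a standalone re-implementation of the doubling rule for illustration only, not the library's internal function: requested sizes are rounded up to the next power of two, now starting at 32KB instead of 1MB.

```go
package main

import "fmt"

// mmapGrowth mirrors the doubling loop shown in the hunk above.
func mmapGrowth(size int) int {
	for i := uint(15); i <= 30; i++ {
		if size <= 1<<i {
			return 1 << i
		}
	}
	return 1 << 30 // sizes past 1GB are handled by code outside this hunk
}

func main() {
	fmt.Println(mmapGrowth(20 * 1024))   // 32768: small files now map 32KB, not 1MB
	fmt.Println(mmapGrowth(50 * 1024))   // 65536
	fmt.Println(mmapGrowth(1536 * 1024)) // 2097152
}
```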
@@ -329,8 +351,15 @@ func (db *DB) init() error {
 // Close releases all database resources.
 // All transactions must be closed before closing the database.
 func (db *DB) Close() error {
+	db.rwlock.Lock()
+	defer db.rwlock.Unlock()
+
 	db.metalock.Lock()
 	defer db.metalock.Unlock()
+
+	db.mmaplock.RLock()
+	defer db.mmaplock.RUnlock()
+
 	return db.close()
 }
@@ -350,8 +379,11 @@ func (db *DB) close() error {
 	// Close file handles.
 	if db.file != nil {
-		// Unlock the file.
-		_ = funlock(db.file)
+		// No need to unlock read-only file.
+		if !db.readOnly {
+			// Unlock the file.
+			_ = funlock(db.file)
+		}

 		// Close the file descriptor.
 		if err := db.file.Close(); err != nil {
@@ -369,6 +401,11 @@ func (db *DB) close() error {
 // will cause the calls to block and be serialized until the current write
 // transaction finishes.
 //
+// Transactions should not be depedent on one another. Opening a read
+// transaction and a write transaction in the same goroutine can cause the
+// writer to deadlock because the database periodically needs to re-mmap itself
+// as it grows and it cannot do that while a read transaction is open.
+//
 // IMPORTANT: You must close read-only transactions after you are finished or
 // else the database will not reclaim old pages.
 func (db *DB) Begin(writable bool) (*Tx, error) {
@@ -417,6 +454,11 @@ func (db *DB) beginTx() (*Tx, error) {
 }

 func (db *DB) beginRWTx() (*Tx, error) {
+	// If the database was opened with Options.ReadOnly, return an error.
+	if db.readOnly {
+		return nil, ErrDatabaseReadOnly
+	}
+
 	// Obtain writer lock. This is released by the transaction when it closes.
 	// This enforces only one writer transaction at a time.
 	db.rwlock.Lock()
@@ -547,6 +589,12 @@ func (db *DB) View(fn func(*Tx) error) error {
 	return nil
 }

+// Sync executes fdatasync() against the database file handle.
+//
+// This is not necessary under normal operation, however, if you use NoSync
+// then it allows you to force the database file to sync against the disk.
+func (db *DB) Sync() error { return fdatasync(db) }
+
 // Stats retrieves ongoing performance stats for the database.
 // This is only updated when a transaction closes.
 func (db *DB) Stats() Stats {
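The new DB.Sync helper pairs naturally with the existing NoSync flag. A hedged usage sketch, not from the commit (file, bucket, and key names are invented):

```go
package main

import (
	"fmt"
	"log"

	"github.com/boltdb/bolt"
)

func main() {
	db, err := bolt.Open("bulk.db", 0666, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Trade durability for speed during a bulk load, then force one fsync
	// at the end with the new DB.Sync helper.
	db.NoSync = true
	for i := 0; i < 1000; i++ {
		if err := db.Update(func(tx *bolt.Tx) error {
			b, err := tx.CreateBucketIfNotExists([]byte("bulk"))
			if err != nil {
				return err
			}
			return b.Put([]byte(fmt.Sprintf("key-%d", i)), []byte("value"))
		}); err != nil {
			log.Fatal(err)
		}
	}
	db.NoSync = false
	if err := db.Sync(); err != nil { // flush everything to disk once
		log.Fatal(err)
	}
}
```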
@@ -607,18 +655,30 @@ func (db *DB) allocate(count int) (*page, error) {
 	return p, nil
 }

+func (db *DB) IsReadOnly() bool {
+	return db.readOnly
+}
+
 // Options represents the options that can be set when opening a database.
 type Options struct {
 	// Timeout is the amount of time to wait to obtain a file lock.
 	// When set to zero it will wait indefinitely. This option is only
 	// available on Darwin and Linux.
 	Timeout time.Duration
+
+	// Sets the DB.NoGrowSync flag before memory mapping the file.
+	NoGrowSync bool
+
+	// Open database in read-only mode. Uses flock(..., LOCK_SH |LOCK_NB) to
+	// grab a shared lock (UNIX).
+	ReadOnly bool
 }

 // DefaultOptions represent the options used if nil options are passed into Open().
 // No timeout is used which will cause Bolt to wait indefinitely for a lock.
 var DefaultOptions = &Options{
 	Timeout:    0,
+	NoGrowSync: false,
 }

 // Stats represents statistics about the database.
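Putting the new Options fields together, a hedged example of opening a database with them might look like the following; it is not part of the diff and assumes my.db already exists and was initialized by an earlier read-write open:

```go
package main

import (
	"log"
	"time"

	"github.com/boltdb/bolt"
)

func main() {
	// Values here are arbitrary; ReadOnly and NoGrowSync are the fields
	// introduced by this dependency bump.
	opts := &bolt.Options{
		Timeout:    time.Second, // give up on the file lock after 1s
		NoGrowSync: false,       // keep the default truncate+fsync on growth
		ReadOnly:   true,        // shared flock; writes return ErrDatabaseReadOnly
	}

	db, err := bolt.Open("my.db", 0666, opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	log.Println("read-only?", db.IsReadOnly())
}
```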


@@ -42,6 +42,9 @@ func TestOpen_Timeout(t *testing.T) {
 	if runtime.GOOS == "windows" {
 		t.Skip("timeout not supported on windows")
 	}
+	if runtime.GOOS == "solaris" {
+		t.Skip("solaris fcntl locks don't support intra-process locking")
+	}

 	path := tempfile()
 	defer os.Remove(path)
@@ -66,6 +69,9 @@ func TestOpen_Wait(t *testing.T) {
 	if runtime.GOOS == "windows" {
 		t.Skip("timeout not supported on windows")
 	}
+	if runtime.GOOS == "solaris" {
+		t.Skip("solaris fcntl locks don't support intra-process locking")
+	}

 	path := tempfile()
 	defer os.Remove(path)
@@ -224,6 +230,80 @@ func TestDB_Open_FileTooSmall(t *testing.T) {
 	equals(t, errors.New("file size too small"), err)
 }

+// Ensure that a database can be opened in read-only mode by multiple processes
+// and that a database can not be opened in read-write mode and in read-only
+// mode at the same time.
+func TestOpen_ReadOnly(t *testing.T) {
+	if runtime.GOOS == "solaris" {
+		t.Skip("solaris fcntl locks don't support intra-process locking")
+	}
+
+	bucket, key, value := []byte(`bucket`), []byte(`key`), []byte(`value`)
+
+	path := tempfile()
+	defer os.Remove(path)
+
+	// Open in read-write mode.
+	db, err := bolt.Open(path, 0666, nil)
+	ok(t, db.Update(func(tx *bolt.Tx) error {
+		b, err := tx.CreateBucket(bucket)
+		if err != nil {
+			return err
+		}
+		return b.Put(key, value)
+	}))
+	assert(t, db != nil, "")
+	assert(t, !db.IsReadOnly(), "")
+	ok(t, err)
+	ok(t, db.Close())
+
+	// Open in read-only mode.
+	db0, err := bolt.Open(path, 0666, &bolt.Options{ReadOnly: true})
+	ok(t, err)
+	defer db0.Close()
+
+	// Opening in read-write mode should return an error.
+	_, err = bolt.Open(path, 0666, &bolt.Options{Timeout: time.Millisecond * 100})
+	assert(t, err != nil, "")
+
+	// And again (in read-only mode).
+	db1, err := bolt.Open(path, 0666, &bolt.Options{ReadOnly: true})
+	ok(t, err)
+	defer db1.Close()
+
+	// Verify both read-only databases are accessible.
+	for _, db := range []*bolt.DB{db0, db1} {
+		// Verify is is in read only mode indeed.
+		assert(t, db.IsReadOnly(), "")
+
+		// Read-only databases should not allow updates.
+		assert(t,
+			bolt.ErrDatabaseReadOnly == db.Update(func(*bolt.Tx) error {
+				panic(`should never get here`)
+			}),
+			"")
+
+		// Read-only databases should not allow beginning writable txns.
+		_, err = db.Begin(true)
+		assert(t, bolt.ErrDatabaseReadOnly == err, "")
+
+		// Verify the data.
+		ok(t, db.View(func(tx *bolt.Tx) error {
+			b := tx.Bucket(bucket)
+			if b == nil {
+				return fmt.Errorf("expected bucket `%s`", string(bucket))
+			}
+
+			got := string(b.Get(key))
+			expected := string(value)
+			if got != expected {
+				return fmt.Errorf("expected `%s`, got `%s`", expected, got)
+			}
+			return nil
+		}))
+	}
+}
+
 // TODO(benbjohnson): Test corruption at every byte of the first two pages.

 // Ensure that a database cannot open a transaction when it's not open.
@@ -254,6 +334,49 @@ func TestDB_BeginRW_Closed(t *testing.T) {
 	assert(t, tx == nil, "")
 }

+func TestDB_Close_PendingTx_RW(t *testing.T) { testDB_Close_PendingTx(t, true) }
+func TestDB_Close_PendingTx_RO(t *testing.T) { testDB_Close_PendingTx(t, false) }
+
+// Ensure that a database cannot close while transactions are open.
+func testDB_Close_PendingTx(t *testing.T, writable bool) {
+	db := NewTestDB()
+	defer db.Close()
+
+	// Start transaction.
+	tx, err := db.Begin(true)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Open update in separate goroutine.
+	done := make(chan struct{})
+	go func() {
+		db.Close()
+		close(done)
+	}()
+
+	// Ensure database hasn't closed.
+	time.Sleep(100 * time.Millisecond)
+	select {
+	case <-done:
+		t.Fatal("database closed too early")
+	default:
+	}
+
+	// Commit transaction.
+	if err := tx.Commit(); err != nil {
+		t.Fatal(err)
+	}
+
+	// Ensure database closed now.
+	time.Sleep(100 * time.Millisecond)
+	select {
+	case <-done:
+	default:
+		t.Fatal("database did not close")
+	}
+}
+
 // Ensure a database can provide a transactional block.
 func TestDB_Update(t *testing.T) {
 	db := NewTestDB()
@@ -678,7 +801,7 @@ func (db *TestDB) PrintStats() {
 // MustCheck runs a consistency check on the database and panics if any errors are found.
 func (db *TestDB) MustCheck() {
-	db.View(func(tx *bolt.Tx) error {
+	db.Update(func(tx *bolt.Tx) error {
 		// Collect all the errors.
 		var errors []error
 		for err := range tx.Check() {


@@ -36,6 +36,10 @@ var (
 	// ErrTxClosed is returned when committing or rolling back a transaction
 	// that has already been committed or rolled back.
 	ErrTxClosed = errors.New("tx closed")
+
+	// ErrDatabaseReadOnly is returned when a mutating transaction is started on a
+	// read-only database.
+	ErrDatabaseReadOnly = errors.New("database is in read-only mode")
 )

 // These errors can occur when putting or deleting a value or a bucket.
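A hedged sketch of how a caller might react to the new sentinel error; this is editorial illustration, not part of the commit, and it assumes my.db already exists:

```go
package main

import (
	"log"

	"github.com/boltdb/bolt"
)

func main() {
	// Opening read-only takes a shared lock, as described above.
	db, err := bolt.Open("my.db", 0666, &bolt.Options{ReadOnly: true})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Any mutating call on a read-only database fails with the new sentinel
	// error, which callers can compare against directly.
	err = db.Update(func(tx *bolt.Tx) error { return nil })
	if err == bolt.ErrDatabaseReadOnly {
		log.Println("database was opened read-only; writes are rejected")
	} else if err != nil {
		log.Fatal(err)
	}
}
```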


@@ -48,15 +48,14 @@ func (f *freelist) pending_count() int {
 // all returns a list of all free ids and all pending ids in one sorted list.
 func (f *freelist) all() []pgid {
-	ids := make([]pgid, len(f.ids))
-	copy(ids, f.ids)
+	m := make(pgids, 0)

 	for _, list := range f.pending {
-		ids = append(ids, list...)
+		m = append(m, list...)
 	}

-	sort.Sort(pgids(ids))
-	return ids
+	sort.Sort(m)
+	return pgids(f.ids).merge(m)
 }

 // allocate returns the starting page id of a contiguous list of pages of a given size.
@@ -127,15 +126,17 @@ func (f *freelist) free(txid txid, p *page) {
 // release moves all page ids for a transaction id (or older) to the freelist.
 func (f *freelist) release(txid txid) {
+	m := make(pgids, 0)
 	for tid, ids := range f.pending {
 		if tid <= txid {
 			// Move transaction's pending pages to the available freelist.
 			// Don't remove from the cache since the page is still free.
-			f.ids = append(f.ids, ids...)
+			m = append(m, ids...)
 			delete(f.pending, tid)
 		}
 	}
-	sort.Sort(pgids(f.ids))
+	sort.Sort(m)
+	f.ids = pgids(f.ids).merge(m)
 }

 // rollback removes the pages from a given pending tx.


@@ -1,7 +1,9 @@
 package bolt

 import (
+	"math/rand"
 	"reflect"
+	"sort"
 	"testing"
 	"unsafe"
 )
@@ -127,3 +129,28 @@ func TestFreelist_write(t *testing.T) {
 		t.Fatalf("exp=%v; got=%v", exp, f2.ids)
 	}
 }
+
+func Benchmark_FreelistRelease10K(b *testing.B)    { benchmark_FreelistRelease(b, 10000) }
+func Benchmark_FreelistRelease100K(b *testing.B)   { benchmark_FreelistRelease(b, 100000) }
+func Benchmark_FreelistRelease1000K(b *testing.B)  { benchmark_FreelistRelease(b, 1000000) }
+func Benchmark_FreelistRelease10000K(b *testing.B) { benchmark_FreelistRelease(b, 10000000) }
+
+func benchmark_FreelistRelease(b *testing.B, size int) {
+	ids := randomPgids(size)
+	pending := randomPgids(len(ids) / 400)
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		f := &freelist{ids: ids, pending: map[txid][]pgid{1: pending}}
+		f.release(1)
+	}
+}
+
+func randomPgids(n int) []pgid {
+	rand.Seed(42)
+	pgids := make(pgids, n)
+	for i := range pgids {
+		pgids[i] = pgid(rand.Int63())
+	}
+	sort.Sort(pgids)
+	return pgids
+}


@@ -221,11 +221,20 @@ func (n *node) write(p *page) {
 			_assert(elem.pgid != p.id, "write: circular dependency occurred")
 		}

+		// If the length of key+value is larger than the max allocation size
+		// then we need to reallocate the byte array pointer.
+		//
+		// See: https://github.com/boltdb/bolt/pull/335
+		klen, vlen := len(item.key), len(item.value)
+		if len(b) < klen+vlen {
+			b = (*[maxAllocSize]byte)(unsafe.Pointer(&b[0]))[:]
+		}
+
 		// Write data for the element to the end of the page.
 		copy(b[0:], item.key)
-		b = b[len(item.key):]
+		b = b[klen:]
 		copy(b[0:], item.value)
-		b = b[len(item.value):]
+		b = b[vlen:]
 	}

 	// DEBUG ONLY: n.dump()


@@ -3,6 +3,7 @@ package bolt
 import (
 	"fmt"
 	"os"
+	"sort"
 	"unsafe"
 )
@@ -96,7 +97,7 @@ type branchPageElement struct {
 // key returns a byte slice of the node key.
 func (n *branchPageElement) key() []byte {
 	buf := (*[maxAllocSize]byte)(unsafe.Pointer(n))
-	return buf[n.pos : n.pos+n.ksize]
+	return (*[maxAllocSize]byte)(unsafe.Pointer(&buf[n.pos]))[:n.ksize]
 }

 // leafPageElement represents a node on a leaf page.
@@ -110,13 +111,13 @@ type leafPageElement struct {
 // key returns a byte slice of the node key.
 func (n *leafPageElement) key() []byte {
 	buf := (*[maxAllocSize]byte)(unsafe.Pointer(n))
-	return buf[n.pos : n.pos+n.ksize]
+	return (*[maxAllocSize]byte)(unsafe.Pointer(&buf[n.pos]))[:n.ksize]
 }

 // value returns a byte slice of the node value.
 func (n *leafPageElement) value() []byte {
 	buf := (*[maxAllocSize]byte)(unsafe.Pointer(n))
-	return buf[n.pos+n.ksize : n.pos+n.ksize+n.vsize]
+	return (*[maxAllocSize]byte)(unsafe.Pointer(&buf[n.pos+n.ksize]))[:n.vsize]
 }

 // PageInfo represents human readable information about a page.
@@ -132,3 +133,40 @@ type pgids []pgid
 func (s pgids) Len() int           { return len(s) }
 func (s pgids) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
 func (s pgids) Less(i, j int) bool { return s[i] < s[j] }
+
+// merge returns the sorted union of a and b.
+func (a pgids) merge(b pgids) pgids {
+	// Return the opposite slice if one is nil.
+	if len(a) == 0 {
+		return b
+	} else if len(b) == 0 {
+		return a
+	}
+
+	// Create a list to hold all elements from both lists.
+	merged := make(pgids, 0, len(a)+len(b))
+
+	// Assign lead to the slice with a lower starting value, follow to the higher value.
+	lead, follow := a, b
+	if b[0] < a[0] {
+		lead, follow = b, a
+	}
+
+	// Continue while there are elements in the lead.
+	for len(lead) > 0 {
+		// Merge largest prefix of lead that is ahead of follow[0].
+		n := sort.Search(len(lead), func(i int) bool { return lead[i] > follow[0] })
+		merged = append(merged, lead[:n]...)
+		if n >= len(lead) {
+			break
+		}
+
+		// Swap lead and follow.
+		lead, follow = follow, lead[n:]
+	}
+
+	// Append what's left in follow.
+	merged = append(merged, follow...)
+
+	return merged
+}


@@ -1,7 +1,10 @@
 package bolt

 import (
+	"reflect"
+	"sort"
 	"testing"
+	"testing/quick"
 )

 // Ensure that the page type can be returned in human readable format.
@@ -27,3 +30,43 @@ func TestPage_typ(t *testing.T) {
 func TestPage_dump(t *testing.T) {
 	(&page{id: 256}).hexdump(16)
 }
+
+func TestPgids_merge(t *testing.T) {
+	a := pgids{4, 5, 6, 10, 11, 12, 13, 27}
+	b := pgids{1, 3, 8, 9, 25, 30}
+	c := a.merge(b)
+	if !reflect.DeepEqual(c, pgids{1, 3, 4, 5, 6, 8, 9, 10, 11, 12, 13, 25, 27, 30}) {
+		t.Errorf("mismatch: %v", c)
+	}
+
+	a = pgids{4, 5, 6, 10, 11, 12, 13, 27, 35, 36}
+	b = pgids{8, 9, 25, 30}
+	c = a.merge(b)
+	if !reflect.DeepEqual(c, pgids{4, 5, 6, 8, 9, 10, 11, 12, 13, 25, 27, 30, 35, 36}) {
+		t.Errorf("mismatch: %v", c)
+	}
+}
+
+func TestPgids_merge_quick(t *testing.T) {
+	if err := quick.Check(func(a, b pgids) bool {
+		// Sort incoming lists.
+		sort.Sort(a)
+		sort.Sort(b)
+
+		// Merge the two lists together.
+		got := a.merge(b)
+
+		// The expected value should be the two lists combined and sorted.
+		exp := append(a, b...)
+		sort.Sort(exp)
+
+		if !reflect.DeepEqual(exp, got) {
+			t.Errorf("\nexp=%+v\ngot=%+v\n", exp, got)
+			return false
+		}
+
+		return true
+	}, nil); err != nil {
+		t.Fatal(err)
+	}
+}


@@ -127,7 +127,8 @@ func (tx *Tx) OnCommit(fn func()) {
 }

 // Commit writes all changes to disk and updates the meta page.
-// Returns an error if a disk write error occurs.
+// Returns an error if a disk write error occurs, or if Commit is
+// called on a read-only transaction.
 func (tx *Tx) Commit() error {
 	_assert(!tx.managed, "managed tx commit not allowed")
 	if tx.db == nil {
@@ -203,7 +204,8 @@ func (tx *Tx) Commit() error {
 	return nil
 }

-// Rollback closes the transaction and ignores all previous updates.
+// Rollback closes the transaction and ignores all previous updates. Read-only
+// transactions must be rolled back and not committed.
 func (tx *Tx) Rollback() error {
 	_assert(!tx.managed, "managed tx rollback not allowed")
 	if tx.db == nil {
@@ -421,15 +423,39 @@ func (tx *Tx) write() error {
 	// Write pages to disk in order.
 	for _, p := range pages {
 		size := (int(p.overflow) + 1) * tx.db.pageSize
-		buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:size]
 		offset := int64(p.id) * int64(tx.db.pageSize)
-		if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
-			return err
-		}

-		// Update statistics.
-		tx.stats.Write++
+		// Write out page in "max allocation" sized chunks.
+		ptr := (*[maxAllocSize]byte)(unsafe.Pointer(p))
+		for {
+			// Limit our write to our max allocation size.
+			sz := size
+			if sz > maxAllocSize-1 {
+				sz = maxAllocSize - 1
+			}
+
+			// Write chunk to disk.
+			buf := ptr[:sz]
+			if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
+				return err
+			}
+
+			// Update statistics.
+			tx.stats.Write++
+
+			// Exit inner for loop if we've written all the chunks.
+			size -= sz
+			if size == 0 {
+				break
+			}
+
+			// Otherwise move offset forward and move pointer to next chunk.
+			offset += int64(sz)
+			ptr = (*[maxAllocSize]byte)(unsafe.Pointer(&ptr[sz]))
+		}
 	}

+	// Ignore file sync if flag is set on DB.
 	if !tx.db.NoSync || IgnoreNoSync {
 		if err := fdatasync(tx.db); err != nil {
 			return err
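The chunking loop above splits each page write into pieces no larger than the allocation limit. As an editorial aside (not part of the diff), a standalone sketch of the same arithmetic with a small pretend limit makes the behavior easy to check by hand:

```go
package main

import "fmt"

// chunks mirrors the splitting arithmetic in the hunk above: a write of
// `size` bytes is issued in pieces of at most maxChunk-1 bytes, advancing
// the file offset after each piece. Illustrative only; the real code
// operates on an mmap'd page and the library's maxAllocSize constant.
func chunks(size, maxChunk int, offset int64) {
	for {
		sz := size
		if sz > maxChunk-1 {
			sz = maxChunk - 1
		}
		fmt.Printf("write %d bytes at offset %d\n", sz, offset)

		size -= sz
		if size == 0 {
			return
		}
		offset += int64(sz)
	}
}

func main() {
	// With a pretend limit of 1024, a 2500-byte page becomes three writes:
	// 1023 + 1023 + 454 bytes.
	chunks(2500, 1024, 4096)
}
```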


@@ -252,6 +252,38 @@ func TestTx_DeleteBucket_NotFound(t *testing.T) {
 	})
 }

+// Ensure that no error is returned when a tx.ForEach function does not return
+// an error.
+func TestTx_ForEach_NoError(t *testing.T) {
+	db := NewTestDB()
+	defer db.Close()
+	db.Update(func(tx *bolt.Tx) error {
+		tx.CreateBucket([]byte("widgets"))
+		tx.Bucket([]byte("widgets")).Put([]byte("foo"), []byte("bar"))
+
+		equals(t, nil, tx.ForEach(func(name []byte, b *bolt.Bucket) error {
+			return nil
+		}))
+		return nil
+	})
+}
+
+// Ensure that an error is returned when a tx.ForEach function returns an error.
+func TestTx_ForEach_WithError(t *testing.T) {
+	db := NewTestDB()
+	defer db.Close()
+	db.Update(func(tx *bolt.Tx) error {
+		tx.CreateBucket([]byte("widgets"))
+		tx.Bucket([]byte("widgets")).Put([]byte("foo"), []byte("bar"))

+		err := errors.New("foo")
+		equals(t, err, tx.ForEach(func(name []byte, b *bolt.Bucket) error {
+			return err
+		}))
+		return nil
+	})
+}
+
 // Ensure that Tx commit handlers are called after a transaction successfully commits.
 func TestTx_OnCommit(t *testing.T) {
 	var x int