vendor: Switch from boltdb v1.3.0 to coreos/bbolt v1.3.1-coreos.3

This commit is contained in:
Joe Betz 2017-11-02 14:17:11 -07:00
parent 232a81d804
commit b32ec69f9b
36 changed files with 558 additions and 210 deletions

View File

@ -5,3 +5,6 @@ const maxMapSize = 0x7FFFFFFF // 2GB
// maxAllocSize is the size used when creating array pointers.
const maxAllocSize = 0xFFFFFFF
// Are unaligned load/stores broken on this arch?
var brokenUnaligned = false

View File

@ -5,3 +5,6 @@ const maxMapSize = 0xFFFFFFFFFFFF // 256TB
// maxAllocSize is the size used when creating array pointers.
const maxAllocSize = 0x7FFFFFFF
// Are unaligned load/stores broken on this arch?
var brokenUnaligned = false

28
cmd/vendor/github.com/coreos/bbolt/bolt_arm.go generated vendored Normal file
View File

@ -0,0 +1,28 @@
package bolt
import "unsafe"
// maxMapSize represents the largest mmap size supported by Bolt.
const maxMapSize = 0x7FFFFFFF // 2GB
// maxAllocSize is the size used when creating array pointers.
const maxAllocSize = 0xFFFFFFF
// Are unaligned load/stores broken on this arch?
var brokenUnaligned bool
func init() {
// Simple check to see whether this arch handles unaligned load/stores
// correctly.
// ARM9 and older devices require load/stores to be from/to aligned
// addresses. If not, the lower 2 bits are cleared and that address is
// read in a jumbled up order.
// See http://infocenter.arm.com/help/index.jsp?topic=/com.arm.doc.faqs/ka15414.html
raw := [6]byte{0xfe, 0xef, 0x11, 0x22, 0x22, 0x11}
val := *(*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(&raw)) + 2))
brokenUnaligned = val != 0x11222211
}

View File

@ -7,3 +7,6 @@ const maxMapSize = 0xFFFFFFFFFFFF // 256TB
// maxAllocSize is the size used when creating array pointers.
const maxAllocSize = 0x7FFFFFFF
// Are unaligned load/stores broken on this arch?
var brokenUnaligned = false

12
cmd/vendor/github.com/coreos/bbolt/bolt_mips64x.go generated vendored Normal file
View File

@ -0,0 +1,12 @@
// +build mips64 mips64le
package bolt
// maxMapSize represents the largest mmap size supported by Bolt.
const maxMapSize = 0x8000000000 // 512GB
// maxAllocSize is the size used when creating array pointers.
const maxAllocSize = 0x7FFFFFFF
// Are unaligned load/stores broken on this arch?
var brokenUnaligned = false

View File

@ -1,7 +1,12 @@
// +build mips mipsle
package bolt
// maxMapSize represents the largest mmap size supported by Bolt.
const maxMapSize = 0x7FFFFFFF // 2GB
const maxMapSize = 0x40000000 // 1GB
// maxAllocSize is the size used when creating array pointers.
const maxAllocSize = 0xFFFFFFF
// Are unaligned load/stores broken on this arch?
var brokenUnaligned = false

View File

@ -7,3 +7,6 @@ const maxMapSize = 0xFFFFFFFFFFFF // 256TB
// maxAllocSize is the size used when creating array pointers.
const maxAllocSize = 0x7FFFFFFF
// Are unaligned load/stores broken on this arch?
var brokenUnaligned = false

View File

@ -7,3 +7,6 @@ const maxMapSize = 0xFFFFFFFFFFFF // 256TB
// maxAllocSize is the size used when creating array pointers.
const maxAllocSize = 0x7FFFFFFF
// Are unaligned load/stores broken on this arch?
var brokenUnaligned = false

View File

@ -7,3 +7,6 @@ const maxMapSize = 0xFFFFFFFFFFFF // 256TB
// maxAllocSize is the size used when creating array pointers.
const maxAllocSize = 0x7FFFFFFF
// Are unaligned load/stores broken on this arch?
var brokenUnaligned = false

View File

@ -13,29 +13,32 @@ import (
// flock acquires an advisory lock on a file descriptor.
func flock(db *DB, mode os.FileMode, exclusive bool, timeout time.Duration) error {
var t time.Time
if timeout != 0 {
t = time.Now()
}
fd := db.file.Fd()
flag := syscall.LOCK_NB
if exclusive {
flag |= syscall.LOCK_EX
} else {
flag |= syscall.LOCK_SH
}
for {
// If we're beyond our timeout then return an error.
// This can only occur after we've attempted a flock once.
if t.IsZero() {
t = time.Now()
} else if timeout > 0 && time.Since(t) > timeout {
return ErrTimeout
}
flag := syscall.LOCK_SH
if exclusive {
flag = syscall.LOCK_EX
}
// Otherwise attempt to obtain an exclusive lock.
err := syscall.Flock(int(db.file.Fd()), flag|syscall.LOCK_NB)
// Attempt to obtain an exclusive lock.
err := syscall.Flock(int(fd), flag)
if err == nil {
return nil
} else if err != syscall.EWOULDBLOCK {
return err
}
// If we timed out then return an error.
if timeout != 0 && time.Since(t) > timeout-flockRetryTimeout {
return ErrTimeout
}
// Wait for a bit and try again.
time.Sleep(50 * time.Millisecond)
time.Sleep(flockRetryTimeout)
}
}

View File

@ -13,34 +13,33 @@ import (
// flock acquires an advisory lock on a file descriptor.
func flock(db *DB, mode os.FileMode, exclusive bool, timeout time.Duration) error {
var t time.Time
if timeout != 0 {
t = time.Now()
}
fd := db.file.Fd()
var lockType int16
if exclusive {
lockType = syscall.F_WRLCK
} else {
lockType = syscall.F_RDLCK
}
for {
// If we're beyond our timeout then return an error.
// This can only occur after we've attempted a flock once.
if t.IsZero() {
t = time.Now()
} else if timeout > 0 && time.Since(t) > timeout {
return ErrTimeout
}
var lock syscall.Flock_t
lock.Start = 0
lock.Len = 0
lock.Pid = 0
lock.Whence = 0
lock.Pid = 0
if exclusive {
lock.Type = syscall.F_WRLCK
} else {
lock.Type = syscall.F_RDLCK
}
err := syscall.FcntlFlock(db.file.Fd(), syscall.F_SETLK, &lock)
// Attempt to obtain an exclusive lock.
lock := syscall.Flock_t{Type: lockType}
err := syscall.FcntlFlock(fd, syscall.F_SETLK, &lock)
if err == nil {
return nil
} else if err != syscall.EAGAIN {
return err
}
// If we timed out then return an error.
if timeout != 0 && time.Since(t) > timeout-flockRetryTimeout {
return ErrTimeout
}
// Wait for a bit and try again.
time.Sleep(50 * time.Millisecond)
time.Sleep(flockRetryTimeout)
}
}

View File

@ -59,29 +59,30 @@ func flock(db *DB, mode os.FileMode, exclusive bool, timeout time.Duration) erro
db.lockfile = f
var t time.Time
if timeout != 0 {
t = time.Now()
}
fd := f.Fd()
var flag uint32 = flagLockFailImmediately
if exclusive {
flag |= flagLockExclusive
}
for {
// If we're beyond our timeout then return an error.
// This can only occur after we've attempted a flock once.
if t.IsZero() {
t = time.Now()
} else if timeout > 0 && time.Since(t) > timeout {
return ErrTimeout
}
var flag uint32 = flagLockFailImmediately
if exclusive {
flag |= flagLockExclusive
}
err := lockFileEx(syscall.Handle(db.lockfile.Fd()), flag, 0, 1, 0, &syscall.Overlapped{})
// Attempt to obtain an exclusive lock.
err := lockFileEx(syscall.Handle(fd), flag, 0, 1, 0, &syscall.Overlapped{})
if err == nil {
return nil
} else if err != errLockViolation {
return err
}
// If we timed oumercit then return an error.
if timeout != 0 && time.Since(t) > timeout-flockRetryTimeout {
return ErrTimeout
}
// Wait for a bit and try again.
time.Sleep(50 * time.Millisecond)
time.Sleep(flockRetryTimeout)
}
}
@ -89,7 +90,7 @@ func flock(db *DB, mode os.FileMode, exclusive bool, timeout time.Duration) erro
func funlock(db *DB) error {
err := unlockFileEx(syscall.Handle(db.lockfile.Fd()), 0, 1, 0, &syscall.Overlapped{})
db.lockfile.Close()
os.Remove(db.path+lockExt)
os.Remove(db.path + lockExt)
return err
}

View File

@ -14,13 +14,6 @@ const (
MaxValueSize = (1 << 31) - 2
)
const (
maxUint = ^uint(0)
minUint = 0
maxInt = int(^uint(0) >> 1)
minInt = -maxInt - 1
)
const bucketHeaderSize = int(unsafe.Sizeof(bucket{}))
const (
@ -130,9 +123,17 @@ func (b *Bucket) Bucket(name []byte) *Bucket {
func (b *Bucket) openBucket(value []byte) *Bucket {
var child = newBucket(b.tx)
// If unaligned load/stores are broken on this arch and value is
// unaligned simply clone to an aligned byte array.
unaligned := brokenUnaligned && uintptr(unsafe.Pointer(&value[0]))&3 != 0
if unaligned {
value = cloneBytes(value)
}
// If this is a writable transaction then we need to copy the bucket entry.
// Read-only transactions can point directly at the mmap entry.
if b.tx.writable {
if b.tx.writable && !unaligned {
child.bucket = &bucket{}
*child.bucket = *(*bucket)(unsafe.Pointer(&value[0]))
} else {
@ -167,9 +168,8 @@ func (b *Bucket) CreateBucket(key []byte) (*Bucket, error) {
if bytes.Equal(key, k) {
if (flags & bucketLeafFlag) != 0 {
return nil, ErrBucketExists
} else {
return nil, ErrIncompatibleValue
}
return nil, ErrIncompatibleValue
}
// Create empty, inline bucket.
@ -316,7 +316,12 @@ func (b *Bucket) Delete(key []byte) error {
// Move cursor to correct position.
c := b.Cursor()
_, _, flags := c.seek(key)
k, _, flags := c.seek(key)
// Return nil if the key doesn't exist.
if !bytes.Equal(key, k) {
return nil
}
// Return an error if there is already existing bucket value.
if (flags & bucketLeafFlag) != 0 {
@ -329,6 +334,28 @@ func (b *Bucket) Delete(key []byte) error {
return nil
}
// Sequence returns the current integer for the bucket without incrementing it.
func (b *Bucket) Sequence() uint64 { return b.bucket.sequence }
// SetSequence updates the sequence number for the bucket.
func (b *Bucket) SetSequence(v uint64) error {
if b.tx.db == nil {
return ErrTxClosed
} else if !b.Writable() {
return ErrTxNotWritable
}
// Materialize the root node if it hasn't been already so that the
// bucket will be saved during commit.
if b.rootNode == nil {
_ = b.node(b.root, nil)
}
// Increment and return the sequence.
b.bucket.sequence = v
return nil
}
// NextSequence returns an autoincrementing integer for the bucket.
func (b *Bucket) NextSequence() (uint64, error) {
if b.tx.db == nil {

View File

@ -7,8 +7,7 @@ import (
"log"
"os"
"runtime"
"runtime/debug"
"strings"
"sort"
"sync"
"time"
"unsafe"
@ -23,6 +22,8 @@ const version = 2
// Represents a marker value to indicate that a file is a Bolt DB.
const magic uint32 = 0xED0CDAED
const pgidNoFreelist pgid = 0xffffffffffffffff
// IgnoreNoSync specifies whether the NoSync field of a DB is ignored when
// syncing changes to a file. This is required as some operating systems,
// such as OpenBSD, do not have a unified buffer cache (UBC) and writes
@ -39,6 +40,9 @@ const (
// default page size for db is set to the OS page size.
var defaultPageSize = os.Getpagesize()
// The time elapsed between consecutive file locking attempts.
const flockRetryTimeout = 50 * time.Millisecond
// DB represents a collection of buckets persisted to a file on disk.
// All data access is performed through transactions which can be obtained through the DB.
// All the functions on DB will return a ErrDatabaseNotOpen if accessed before Open() is called.
@ -61,6 +65,11 @@ type DB struct {
// THIS IS UNSAFE. PLEASE USE WITH CAUTION.
NoSync bool
// When true, skips syncing freelist to disk. This improves the database
// write performance under normal operation, but requires a full database
// re-sync during recovery.
NoFreelistSync bool
// When true, skips the truncate call when growing the database.
// Setting this to true is only safe on non-ext3/ext4 systems.
// Skipping truncation avoids preallocation of hard drive space and
@ -107,9 +116,11 @@ type DB struct {
opened bool
rwtx *Tx
txs []*Tx
freelist *freelist
stats Stats
freelist *freelist
freelistLoad sync.Once
pagePool sync.Pool
batchMu sync.Mutex
@ -148,14 +159,17 @@ func (db *DB) String() string {
// If the file does not exist then it will be created automatically.
// Passing in nil options will cause Bolt to open the database with the default options.
func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
var db = &DB{opened: true}
db := &DB{
opened: true,
}
// Set default options if no options are provided.
if options == nil {
options = DefaultOptions
}
db.NoSync = options.NoSync
db.NoGrowSync = options.NoGrowSync
db.MmapFlags = options.MmapFlags
db.NoFreelistSync = options.NoFreelistSync
// Set default values for later DB operations.
db.MaxBatchSize = DefaultMaxBatchSize
@ -184,6 +198,7 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
// The database file is locked using the shared lock (more than one process may
// hold a lock at the same time) otherwise (options.ReadOnly is set).
if err := flock(db, mode, !db.readOnly, options.Timeout); err != nil {
db.lockfile = nil // make 'unused' happy. TODO: rework locks
_ = db.close()
return nil, err
}
@ -191,6 +206,11 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
// Default values for test hooks
db.ops.writeAt = db.file.WriteAt
if db.pageSize = options.PageSize; db.pageSize == 0 {
// Set the default page size to the OS page size.
db.pageSize = defaultPageSize
}
// Initialize the database if it doesn't exist.
if info, err := db.file.Stat(); err != nil {
return nil, err
@ -202,20 +222,21 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
} else {
// Read the first meta page to determine the page size.
var buf [0x1000]byte
if _, err := db.file.ReadAt(buf[:], 0); err == nil {
m := db.pageInBuffer(buf[:], 0).meta()
if err := m.validate(); err != nil {
// If we can't read the page size, we can assume it's the same
// as the OS -- since that's how the page size was chosen in the
// first place.
//
// If the first page is invalid and this OS uses a different
// page size than what the database was created with then we
// are out of luck and cannot access the database.
db.pageSize = os.Getpagesize()
} else {
// If we can't read the page size, but can read a page, assume
// it's the same as the OS or one given -- since that's how the
// page size was chosen in the first place.
//
// If the first page is invalid and this OS uses a different
// page size than what the database was created with then we
// are out of luck and cannot access the database.
//
// TODO: scan for next page
if bw, err := db.file.ReadAt(buf[:], 0); err == nil && bw == len(buf) {
if m := db.pageInBuffer(buf[:], 0).meta(); m.validate() == nil {
db.pageSize = int(m.pageSize)
}
} else {
return nil, ErrInvalid
}
}
@ -232,14 +253,50 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
return nil, err
}
// Read in the freelist.
db.freelist = newFreelist()
db.freelist.read(db.page(db.meta().freelist))
if db.readOnly {
return db, nil
}
db.loadFreelist()
// Flush freelist when transitioning from no sync to sync so
// NoFreelistSync unaware boltdb can open the db later.
if !db.NoFreelistSync && !db.hasSyncedFreelist() {
tx, err := db.Begin(true)
if tx != nil {
err = tx.Commit()
}
if err != nil {
_ = db.close()
return nil, err
}
}
// Mark the database as opened and return.
return db, nil
}
// loadFreelist reads the freelist if it is synced, or reconstructs it
// by scanning the DB if it is not synced. It assumes there are no
// concurrent accesses being made to the freelist.
func (db *DB) loadFreelist() {
db.freelistLoad.Do(func() {
db.freelist = newFreelist()
if !db.hasSyncedFreelist() {
// Reconstruct free list by scanning the DB.
db.freelist.readIDs(db.freepages())
} else {
// Read free list from freelist page.
db.freelist.read(db.page(db.meta().freelist))
}
db.stats.FreePageN = len(db.freelist.ids)
})
}
func (db *DB) hasSyncedFreelist() bool {
return db.meta().freelist != pgidNoFreelist
}
// mmap opens the underlying memory-mapped file and initializes the meta references.
// minsz is the minimum size that the new mmap can be.
func (db *DB) mmap(minsz int) error {
@ -341,9 +398,6 @@ func (db *DB) mmapSize(size int) (int, error) {
// init creates a new database file and initializes its meta pages.
func (db *DB) init() error {
// Set the page size to the OS page size.
db.pageSize = os.Getpagesize()
// Create two meta pages on a buffer.
buf := make([]byte, db.pageSize*4)
for i := 0; i < 2; i++ {
@ -526,21 +580,36 @@ func (db *DB) beginRWTx() (*Tx, error) {
t := &Tx{writable: true}
t.init(db)
db.rwtx = t
db.freePages()
return t, nil
}
// Free any pages associated with closed read-only transactions.
var minid txid = 0xFFFFFFFFFFFFFFFF
for _, t := range db.txs {
if t.meta.txid < minid {
minid = t.meta.txid
}
// freePages releases any pages associated with closed read-only transactions.
func (db *DB) freePages() {
// Free all pending pages prior to earliest open transaction.
sort.Sort(txsById(db.txs))
minid := txid(0xFFFFFFFFFFFFFFFF)
if len(db.txs) > 0 {
minid = db.txs[0].meta.txid
}
if minid > 0 {
db.freelist.release(minid - 1)
}
return t, nil
// Release unused txid extents.
for _, t := range db.txs {
db.freelist.releaseRange(minid, t.meta.txid-1)
minid = t.meta.txid + 1
}
db.freelist.releaseRange(minid, txid(0xFFFFFFFFFFFFFFFF))
// Any page both allocated and freed in an extent is safe to release.
}
type txsById []*Tx
func (t txsById) Len() int { return len(t) }
func (t txsById) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
func (t txsById) Less(i, j int) bool { return t[i].meta.txid < t[j].meta.txid }
// removeTx removes a transaction from the database.
func (db *DB) removeTx(tx *Tx) {
// Release the read lock on the mmap.
@ -552,7 +621,10 @@ func (db *DB) removeTx(tx *Tx) {
// Remove the transaction.
for i, t := range db.txs {
if t == tx {
db.txs = append(db.txs[:i], db.txs[i+1:]...)
last := len(db.txs) - 1
db.txs[i] = db.txs[last]
db.txs[last] = nil
db.txs = db.txs[:last]
break
}
}
@ -630,11 +702,7 @@ func (db *DB) View(fn func(*Tx) error) error {
return err
}
if err := t.Rollback(); err != nil {
return err
}
return nil
return t.Rollback()
}
// Batch calls fn as part of a batch. It behaves similar to Update,
@ -823,7 +891,7 @@ func (db *DB) meta() *meta {
}
// allocate returns a contiguous block of memory starting at a given page.
func (db *DB) allocate(count int) (*page, error) {
func (db *DB) allocate(txid txid, count int) (*page, error) {
// Allocate a temporary buffer for the page.
var buf []byte
if count == 1 {
@ -835,7 +903,7 @@ func (db *DB) allocate(count int) (*page, error) {
p.overflow = uint32(count - 1)
// Use pages from the freelist if they are available.
if p.id = db.freelist.allocate(count); p.id != 0 {
if p.id = db.freelist.allocate(txid, count); p.id != 0 {
return p, nil
}
@ -890,6 +958,38 @@ func (db *DB) IsReadOnly() bool {
return db.readOnly
}
func (db *DB) freepages() []pgid {
tx, err := db.beginTx()
defer func() {
err = tx.Rollback()
if err != nil {
panic("freepages: failed to rollback tx")
}
}()
if err != nil {
panic("freepages: failed to open read only tx")
}
reachable := make(map[pgid]*page)
nofreed := make(map[pgid]bool)
ech := make(chan error)
go func() {
for e := range ech {
panic(fmt.Sprintf("freepages: failed to get all reachable pages (%v)", e))
}
}()
tx.checkBucket(&tx.root, reachable, nofreed, ech)
close(ech)
var fids []pgid
for i := pgid(2); i < db.meta().pgid; i++ {
if _, ok := reachable[i]; !ok {
fids = append(fids, i)
}
}
return fids
}
// Options represents the options that can be set when opening a database.
type Options struct {
// Timeout is the amount of time to wait to obtain a file lock.
@ -900,6 +1000,10 @@ type Options struct {
// Sets the DB.NoGrowSync flag before memory mapping the file.
NoGrowSync bool
// Do not sync freelist to disk. This improves the database write performance
// under normal operation, but requires a full database re-sync during recovery.
NoFreelistSync bool
// Open database in read-only mode. Uses flock(..., LOCK_SH |LOCK_NB) to
// grab a shared lock (UNIX).
ReadOnly bool
@ -916,6 +1020,14 @@ type Options struct {
// If initialMmapSize is smaller than the previous database size,
// it takes no effect.
InitialMmapSize int
// PageSize overrides the default OS page size.
PageSize int
// NoSync sets the initial value of DB.NoSync. Normally this can just be
// set directly on the DB itself when returned from Open(), but this option
// is useful in APIs which expose Options but not the underlying DB.
NoSync bool
}
// DefaultOptions represent the options used if nil options are passed into Open().
@ -952,15 +1064,11 @@ func (s *Stats) Sub(other *Stats) Stats {
diff.PendingPageN = s.PendingPageN
diff.FreeAlloc = s.FreeAlloc
diff.FreelistInuse = s.FreelistInuse
diff.TxN = other.TxN - s.TxN
diff.TxN = s.TxN - other.TxN
diff.TxStats = s.TxStats.Sub(&other.TxStats)
return diff
}
func (s *Stats) add(other *Stats) {
s.TxStats.add(&other.TxStats)
}
type Info struct {
Data uintptr
PageSize int
@ -999,7 +1107,8 @@ func (m *meta) copy(dest *meta) {
func (m *meta) write(p *page) {
if m.root.root >= m.pgid {
panic(fmt.Sprintf("root bucket pgid (%d) above high water mark (%d)", m.root.root, m.pgid))
} else if m.freelist >= m.pgid {
} else if m.freelist >= m.pgid && m.freelist != pgidNoFreelist {
// TODO: reject pgidNoFreeList if !NoFreelistSync
panic(fmt.Sprintf("freelist pgid (%d) above high water mark (%d)", m.freelist, m.pgid))
}
@ -1026,11 +1135,3 @@ func _assert(condition bool, msg string, v ...interface{}) {
panic(fmt.Sprintf("assertion failed: "+msg, v...))
}
}
func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) }
func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) }
func printstack() {
stack := strings.Join(strings.Split(string(debug.Stack()), "\n")[2:], "\n")
fmt.Fprintln(os.Stderr, stack)
}

View File

@ -6,25 +6,40 @@ import (
"unsafe"
)
// txPending holds a list of pgids and corresponding allocation txns
// that are pending to be freed.
type txPending struct {
ids []pgid
alloctx []txid // txids allocating the ids
lastReleaseBegin txid // beginning txid of last matching releaseRange
}
// freelist represents a list of all pages that are available for allocation.
// It also tracks pages that have been freed but are still in use by open transactions.
type freelist struct {
ids []pgid // all free and available free page ids.
pending map[txid][]pgid // mapping of soon-to-be free page ids by tx.
cache map[pgid]bool // fast lookup of all free and pending page ids.
ids []pgid // all free and available free page ids.
allocs map[pgid]txid // mapping of txid that allocated a pgid.
pending map[txid]*txPending // mapping of soon-to-be free page ids by tx.
cache map[pgid]bool // fast lookup of all free and pending page ids.
}
// newFreelist returns an empty, initialized freelist.
func newFreelist() *freelist {
return &freelist{
pending: make(map[txid][]pgid),
allocs: make(map[pgid]txid),
pending: make(map[txid]*txPending),
cache: make(map[pgid]bool),
}
}
// size returns the size of the page after serialization.
func (f *freelist) size() int {
return pageHeaderSize + (int(unsafe.Sizeof(pgid(0))) * f.count())
n := f.count()
if n >= 0xFFFF {
// The first element will be used to store the count. See freelist.write.
n++
}
return pageHeaderSize + (int(unsafe.Sizeof(pgid(0))) * n)
}
// count returns count of pages on the freelist
@ -40,27 +55,26 @@ func (f *freelist) free_count() int {
// pending_count returns count of pending pages
func (f *freelist) pending_count() int {
var count int
for _, list := range f.pending {
count += len(list)
for _, txp := range f.pending {
count += len(txp.ids)
}
return count
}
// all returns a list of all free ids and all pending ids in one sorted list.
func (f *freelist) all() []pgid {
m := make(pgids, 0)
for _, list := range f.pending {
m = append(m, list...)
// copyall copies into dst a list of all free ids and all pending ids in one sorted list.
// f.count returns the minimum length required for dst.
func (f *freelist) copyall(dst []pgid) {
m := make(pgids, 0, f.pending_count())
for _, txp := range f.pending {
m = append(m, txp.ids...)
}
sort.Sort(m)
return pgids(f.ids).merge(m)
mergepgids(dst, f.ids, m)
}
// allocate returns the starting page id of a contiguous list of pages of a given size.
// If a contiguous block cannot be found then 0 is returned.
func (f *freelist) allocate(n int) pgid {
func (f *freelist) allocate(txid txid, n int) pgid {
if len(f.ids) == 0 {
return 0
}
@ -93,7 +107,7 @@ func (f *freelist) allocate(n int) pgid {
for i := pgid(0); i < pgid(n); i++ {
delete(f.cache, initial+i)
}
f.allocs[initial] = txid
return initial
}
@ -110,28 +124,73 @@ func (f *freelist) free(txid txid, p *page) {
}
// Free page and all its overflow pages.
var ids = f.pending[txid]
txp := f.pending[txid]
if txp == nil {
txp = &txPending{}
f.pending[txid] = txp
}
allocTxid, ok := f.allocs[p.id]
if ok {
delete(f.allocs, p.id)
} else if (p.flags & (freelistPageFlag | metaPageFlag)) != 0 {
// Safe to claim txid as allocating since these types are private to txid.
allocTxid = txid
}
for id := p.id; id <= p.id+pgid(p.overflow); id++ {
// Verify that page is not already free.
if f.cache[id] {
panic(fmt.Sprintf("page %d already freed", id))
}
// Add to the freelist and cache.
ids = append(ids, id)
txp.ids = append(txp.ids, id)
txp.alloctx = append(txp.alloctx, allocTxid)
f.cache[id] = true
}
f.pending[txid] = ids
}
// release moves all page ids for a transaction id (or older) to the freelist.
func (f *freelist) release(txid txid) {
m := make(pgids, 0)
for tid, ids := range f.pending {
for tid, txp := range f.pending {
if tid <= txid {
// Move transaction's pending pages to the available freelist.
// Don't remove from the cache since the page is still free.
m = append(m, ids...)
m = append(m, txp.ids...)
delete(f.pending, tid)
}
}
sort.Sort(m)
f.ids = pgids(f.ids).merge(m)
}
// releaseRange moves pending pages allocated within an extent [begin,end] to the free list.
func (f *freelist) releaseRange(begin, end txid) {
if begin > end {
return
}
var m pgids
for tid, txp := range f.pending {
if tid < begin || tid > end {
continue
}
// Don't recompute freed pages if ranges haven't updated.
if txp.lastReleaseBegin == begin {
continue
}
for i := 0; i < len(txp.ids); i++ {
if atx := txp.alloctx[i]; atx < begin || atx > end {
continue
}
m = append(m, txp.ids[i])
txp.ids[i] = txp.ids[len(txp.ids)-1]
txp.ids = txp.ids[:len(txp.ids)-1]
txp.alloctx[i] = txp.alloctx[len(txp.alloctx)-1]
txp.alloctx = txp.alloctx[:len(txp.alloctx)-1]
i--
}
txp.lastReleaseBegin = begin
if len(txp.ids) == 0 {
delete(f.pending, tid)
}
}
@ -142,12 +201,29 @@ func (f *freelist) release(txid txid) {
// rollback removes the pages from a given pending tx.
func (f *freelist) rollback(txid txid) {
// Remove page ids from cache.
for _, id := range f.pending[txid] {
delete(f.cache, id)
txp := f.pending[txid]
if txp == nil {
return
}
// Remove pages from pending list.
var m pgids
for i, pgid := range txp.ids {
delete(f.cache, pgid)
tx := txp.alloctx[i]
if tx == 0 {
continue
}
if tx != txid {
// Pending free aborted; restore page back to alloc list.
f.allocs[pgid] = tx
} else {
// Freed page was allocated by this txn; OK to throw away.
m = append(m, pgid)
}
}
// Remove pages from pending list and mark as free if allocated by txid.
delete(f.pending, txid)
sort.Sort(m)
f.ids = pgids(f.ids).merge(m)
}
// freed returns whether a given page is in the free list.
@ -169,7 +245,7 @@ func (f *freelist) read(p *page) {
if count == 0 {
f.ids = nil
} else {
ids := ((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[idx:count]
ids := ((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[idx : idx+count]
f.ids = make([]pgid, len(ids))
copy(f.ids, ids)
@ -181,27 +257,33 @@ func (f *freelist) read(p *page) {
f.reindex()
}
// read initializes the freelist from a given list of ids.
func (f *freelist) readIDs(ids []pgid) {
f.ids = ids
f.reindex()
}
// write writes the page ids onto a freelist page. All free and pending ids are
// saved to disk since in the event of a program crash, all pending ids will
// become free.
func (f *freelist) write(p *page) error {
// Combine the old free pgids and pgids waiting on an open transaction.
ids := f.all()
// Update the header flag.
p.flags |= freelistPageFlag
// The page.count can only hold up to 64k elements so if we overflow that
// number then we handle it by putting the size in the first element.
if len(ids) == 0 {
p.count = uint16(len(ids))
} else if len(ids) < 0xFFFF {
p.count = uint16(len(ids))
copy(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[:], ids)
lenids := f.count()
if lenids == 0 {
p.count = uint16(lenids)
} else if lenids < 0xFFFF {
p.count = uint16(lenids)
f.copyall(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[:])
} else {
p.count = 0xFFFF
((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[0] = pgid(len(ids))
copy(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[1:], ids)
((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[0] = pgid(lenids)
f.copyall(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[1:])
}
return nil
@ -213,8 +295,8 @@ func (f *freelist) reload(p *page) {
// Build a cache of only pending pages.
pcache := make(map[pgid]bool)
for _, pendingIDs := range f.pending {
for _, pendingID := range pendingIDs {
for _, txp := range f.pending {
for _, pendingID := range txp.ids {
pcache[pendingID] = true
}
}
@ -236,12 +318,12 @@ func (f *freelist) reload(p *page) {
// reindex rebuilds the free cache based on available and pending free lists.
func (f *freelist) reindex() {
f.cache = make(map[pgid]bool)
f.cache = make(map[pgid]bool, len(f.ids))
for _, id := range f.ids {
f.cache[id] = true
}
for _, pendingIDs := range f.pending {
for _, pendingID := range pendingIDs {
for _, txp := range f.pending {
for _, pendingID := range txp.ids {
f.cache[pendingID] = true
}
}

View File

@ -365,7 +365,7 @@ func (n *node) spill() error {
}
// Allocate contiguous space for the node.
p, err := tx.allocate((node.size() / tx.db.pageSize) + 1)
p, err := tx.allocate((node.size() + tx.db.pageSize - 1) / tx.db.pageSize)
if err != nil {
return err
}

View File

@ -145,12 +145,33 @@ func (a pgids) merge(b pgids) pgids {
// Return the opposite slice if one is nil.
if len(a) == 0 {
return b
} else if len(b) == 0 {
}
if len(b) == 0 {
return a
}
merged := make(pgids, len(a)+len(b))
mergepgids(merged, a, b)
return merged
}
// Create a list to hold all elements from both lists.
merged := make(pgids, 0, len(a)+len(b))
// mergepgids copies the sorted union of a and b into dst.
// If dst is too small, it panics.
func mergepgids(dst, a, b pgids) {
if len(dst) < len(a)+len(b) {
panic(fmt.Errorf("mergepgids bad len %d < %d + %d", len(dst), len(a), len(b)))
}
// Copy in the opposite slice if one is nil.
if len(a) == 0 {
copy(dst, b)
return
}
if len(b) == 0 {
copy(dst, a)
return
}
// Merged will hold all elements from both lists.
merged := dst[:0]
// Assign lead to the slice with a lower starting value, follow to the higher value.
lead, follow := a, b
@ -172,7 +193,5 @@ func (a pgids) merge(b pgids) pgids {
}
// Append what's left in follow.
merged = append(merged, follow...)
return merged
_ = append(merged, follow...)
}

View File

@ -126,10 +126,7 @@ func (tx *Tx) DeleteBucket(name []byte) error {
// the error is returned to the caller.
func (tx *Tx) ForEach(fn func(name []byte, b *Bucket) error) error {
return tx.root.ForEach(func(k, v []byte) error {
if err := fn(k, tx.root.Bucket(k)); err != nil {
return err
}
return nil
return fn(k, tx.root.Bucket(k))
})
}
@ -169,28 +166,18 @@ func (tx *Tx) Commit() error {
// Free the old root bucket.
tx.meta.root.root = tx.root.root
opgid := tx.meta.pgid
// Free the freelist and allocate new pages for it. This will overestimate
// the size of the freelist but not underestimate the size (which would be bad).
tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist))
p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
if err != nil {
tx.rollback()
return err
// Free the old freelist because commit writes out a fresh freelist.
if tx.meta.freelist != pgidNoFreelist {
tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist))
}
if err := tx.db.freelist.write(p); err != nil {
tx.rollback()
return err
}
tx.meta.freelist = p.id
// If the high water mark has moved up then attempt to grow the database.
if tx.meta.pgid > opgid {
if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil {
tx.rollback()
if !tx.db.NoFreelistSync {
err := tx.commitFreelist()
if err != nil {
return err
}
} else {
tx.meta.freelist = pgidNoFreelist
}
// Write dirty pages to disk.
@ -235,6 +222,31 @@ func (tx *Tx) Commit() error {
return nil
}
func (tx *Tx) commitFreelist() error {
// Allocate new pages for the new free list. This will overestimate
// the size of the freelist but not underestimate the size (which would be bad).
opgid := tx.meta.pgid
p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
if err != nil {
tx.rollback()
return err
}
if err := tx.db.freelist.write(p); err != nil {
tx.rollback()
return err
}
tx.meta.freelist = p.id
// If the high water mark has moved up then attempt to grow the database.
if tx.meta.pgid > opgid {
if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil {
tx.rollback()
return err
}
}
return nil
}
// Rollback closes the transaction and ignores all previous updates. Read-only
// transactions must be rolled back and not committed.
func (tx *Tx) Rollback() error {
@ -305,7 +317,11 @@ func (tx *Tx) WriteTo(w io.Writer) (n int64, err error) {
if err != nil {
return 0, err
}
defer func() { _ = f.Close() }()
defer func() {
if cerr := f.Close(); err == nil {
err = cerr
}
}()
// Generate a meta page. We use the same page data for both meta pages.
buf := make([]byte, tx.db.pageSize)
@ -333,7 +349,7 @@ func (tx *Tx) WriteTo(w io.Writer) (n int64, err error) {
}
// Move past the meta pages in the file.
if _, err := f.Seek(int64(tx.db.pageSize*2), os.SEEK_SET); err != nil {
if _, err := f.Seek(int64(tx.db.pageSize*2), io.SeekStart); err != nil {
return n, fmt.Errorf("seek: %s", err)
}
@ -344,7 +360,7 @@ func (tx *Tx) WriteTo(w io.Writer) (n int64, err error) {
return n, err
}
return n, f.Close()
return n, nil
}
// CopyFile copies the entire database to file at the given path.
@ -379,9 +395,14 @@ func (tx *Tx) Check() <-chan error {
}
func (tx *Tx) check(ch chan error) {
// Force loading free list if opened in ReadOnly mode.
tx.db.loadFreelist()
// Check if any pages are double freed.
freed := make(map[pgid]bool)
for _, id := range tx.db.freelist.all() {
all := make([]pgid, tx.db.freelist.count())
tx.db.freelist.copyall(all)
for _, id := range all {
if freed[id] {
ch <- fmt.Errorf("page %d: already freed", id)
}
@ -392,8 +413,10 @@ func (tx *Tx) check(ch chan error) {
reachable := make(map[pgid]*page)
reachable[0] = tx.page(0) // meta0
reachable[1] = tx.page(1) // meta1
for i := uint32(0); i <= tx.page(tx.meta.freelist).overflow; i++ {
reachable[tx.meta.freelist+pgid(i)] = tx.page(tx.meta.freelist)
if tx.meta.freelist != pgidNoFreelist {
for i := uint32(0); i <= tx.page(tx.meta.freelist).overflow; i++ {
reachable[tx.meta.freelist+pgid(i)] = tx.page(tx.meta.freelist)
}
}
// Recursively check buckets.
@ -451,7 +474,7 @@ func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, freed map[pgid]bo
// allocate returns a contiguous block of memory starting at a given page.
func (tx *Tx) allocate(count int) (*page, error) {
p, err := tx.db.allocate(count)
p, err := tx.db.allocate(tx.meta.txid, count)
if err != nil {
return nil, err
}

View File

@ -27,7 +27,7 @@ import (
"reflect"
"strings"
"github.com/boltdb/bolt"
bolt "github.com/coreos/bbolt"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"

10
glide.lock generated
View File

@ -1,5 +1,5 @@
hash: ca3c895fa60c9ca9f53408202fb7643705f9960212d342967ed0da8e93606cc4
updated: 2017-01-18T10:26:48.990115455-08:00
hash: f93ce836ad95fff4294b4b774ef1465d243ab4d6b20d96f27fabf9f003e71332
updated: 2017-11-21T11:33:17.843967822-08:00
imports:
- name: github.com/beorn7/perks
version: 4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9
@ -7,10 +7,10 @@ imports:
- quantile
- name: github.com/bgentry/speakeasy
version: 36e9cfdd690967f4f690c6edcc9ffacd006014a0
- name: github.com/boltdb/bolt
version: 583e8937c61f1af6513608ccc75c97b6abdf4ff9
- name: github.com/cockroachdb/cmux
version: 112f0506e7743d64a6eb8fedbcff13d9979bbf92
- name: github.com/coreos/bbolt
version: 3c6cbfb299c11444eb2f8c9d48f0d2ce09157423
- name: github.com/coreos/go-semver
version: 568e959cd89871e61434c1143528d9162da89ef2
subpackages:
@ -74,7 +74,7 @@ imports:
subpackages:
- prometheus
- name: github.com/prometheus/client_model
version: fa8ad6fec33561be4280a8f0514318c79d7f6cb6
version: 6f3806018612930941127f2a7c6c453ba2c527d2
subpackages:
- go
- name: github.com/prometheus/common

View File

@ -2,8 +2,8 @@ package: github.com/coreos/etcd
import:
- package: github.com/bgentry/speakeasy
version: 36e9cfdd690967f4f690c6edcc9ffacd006014a0
- package: github.com/boltdb/bolt
version: v1.3.0
- package: github.com/coreos/bbolt
version: v1.3.1-coreos.3
- package: github.com/cockroachdb/cmux
version: 112f0506e7743d64a6eb8fedbcff13d9979bbf92
- package: github.com/coreos/go-semver
@ -48,9 +48,21 @@ import:
- package: github.com/olekukonko/tablewriter
version: cca8bbc0798408af109aaaa239cbd2634846b340
- package: github.com/prometheus/client_golang
version: v0.8.0
version: c5b7fccd204277076155f10851dad72b76a49317
subpackages:
- prometheus
- package: github.com/prometheus/client_model
version: 6f3806018612930941127f2a7c6c453ba2c527d2
subpackages:
- go
- package: github.com/prometheus/common
version: 195bde7883f7c39ea62b0d92ab7359b5327065cb
subpackages:
- expfmt
- internal/bitbucket.org/ww/goautoneg
- model
- package: github.com/prometheus/procfs
version: fcdb11ccb4389efb1b210b7ffb623ab71c5fdd60
- package: github.com/spf13/cobra
version: 1c44ec8d3f1552cac48999f9306da23c4d8a288b
- package: github.com/spf13/pflag
@ -102,4 +114,22 @@ import:
subpackages:
- assert
- package: github.com/karlseguin/ccache
version: v2.0.2
version: a2d62155777b39595c825ed3824279e642a5db3c
- package: github.com/beorn7/perks
version: 4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9
- package: github.com/cpuguy83/go-md2man
version: a65d4d2de4d5f7c74868dfa9b202a3c8be315aaa
- package: github.com/matttproud/golang_protobuf_extensions
version: c12348ce28de40eed0136aa2b644d0ee0650e56c
subpackages:
- pbutil
- package: github.com/russross/blackfriday
version: 5f33e7b7878355cd2b7e6b8eefc48a5472c69f70
- package: github.com/mattn/go-runewidth
version: 737072b4e32b7a5018b4a7125da8d12de90e8045
- package: github.com/shurcooL/sanitized_anchor_name
version: 1dba4b3954bc059efc3991ec364f9f9a35f597d2
- package: golang.org/x/sys
version: 478fcf54317e52ab69f40bb4c7a1520288d7f7ea
subpackages:
- unix

View File

@ -25,7 +25,7 @@ import (
"sync/atomic"
"time"
"github.com/boltdb/bolt"
bolt "github.com/coreos/bbolt"
"github.com/coreos/pkg/capnslog"
)

View File

@ -21,7 +21,7 @@ import (
"testing"
"time"
"github.com/boltdb/bolt"
bolt "github.com/coreos/bbolt"
)
func TestBackendClose(t *testing.T) {

View File

@ -20,7 +20,7 @@ import (
"sync/atomic"
"time"
"github.com/boltdb/bolt"
bolt "github.com/coreos/bbolt"
)
type BatchTx interface {

View File

@ -19,7 +19,7 @@ import (
"testing"
"time"
"github.com/boltdb/bolt"
bolt "github.com/coreos/bbolt"
)
func TestBatchTxPut(t *testing.T) {

View File

@ -16,6 +16,6 @@
package backend
import "github.com/boltdb/bolt"
import bolt "github.com/coreos/bbolt"
var boltOpenOptions *bolt.Options = nil

View File

@ -17,7 +17,7 @@ package backend
import (
"syscall"
"github.com/boltdb/bolt"
bolt "github.com/coreos/bbolt"
)
// syscall.MAP_POPULATE on linux 2.6.23+ does sequential read-ahead

View File

@ -18,7 +18,7 @@ import (
"fmt"
"path/filepath"
"github.com/boltdb/bolt"
bolt "github.com/coreos/bbolt"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/mvcc/backend"
)