Merge pull request #4660 from xiang90/shrink_db

backend: support shrink db
This commit is contained in:
Xiang Li 2016-03-02 20:28:51 -08:00
commit 0eeb663754
4 changed files with 163 additions and 1 deletions

View File

@ -22,6 +22,7 @@ import (
"log"
"os"
"path"
"sync"
"sync/atomic"
"time"
@ -32,6 +33,8 @@ var (
defaultBatchLimit = 10000
defaultBatchInterval = 100 * time.Millisecond
defragLimit = 10000
// InitialMmapSize is the initial size of the mmapped region. Setting this larger than
// the potential max db size can prevent writer from blocking reader.
// This only works for linux.
@ -44,6 +47,7 @@ type Backend interface {
Hash() (uint32, error)
// Size returns the current size of the backend.
Size() int64
Defrag() error
ForceCommit()
Close() error
}
@ -58,6 +62,7 @@ type Snapshot interface {
}
type backend struct {
mu sync.RWMutex
db *bolt.DB
batchInterval time.Duration
@ -114,9 +119,12 @@ func (b *backend) ForceCommit() {
func (b *backend) Snapshot() Snapshot {
b.batchTx.Commit()
b.mu.RLock()
defer b.mu.RUnlock()
tx, err := b.db.Begin(false)
if err != nil {
log.Fatalf("storage: cannot begin tx (%s)", err)
log.Fatalf("backend: cannot begin tx (%s)", err)
}
return &snapshot{tx}
}
@ -124,6 +132,8 @@ func (b *backend) Snapshot() Snapshot {
func (b *backend) Hash() (uint32, error) {
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
b.mu.RLock()
defer b.mu.RUnlock()
err := b.db.View(func(tx *bolt.Tx) error {
c := tx.Cursor()
for next, _ := c.First(); next != nil; next, _ = c.Next() {
@ -177,6 +187,113 @@ func (b *backend) Commits() int64 {
return atomic.LoadInt64(&b.commits)
}
func (b *backend) Defrag() error {
// TODO: make this non-blocking?
// lock batchTx to ensure nobody is using previous tx, and then
// close previous ongoing tx.
b.batchTx.Lock()
defer b.batchTx.Unlock()
// lock database after lock tx to avoid deadlock.
b.mu.Lock()
defer b.mu.Unlock()
b.batchTx.commit(true)
b.batchTx.tx = nil
tmpdb, err := bolt.Open(b.db.Path()+".tmp", 0600, boltOpenOptions)
if err != nil {
return err
}
err = defragdb(b.db, tmpdb, defragLimit)
if err != nil {
tmpdb.Close()
os.RemoveAll(tmpdb.Path())
return err
}
dbp := b.db.Path()
tdbp := tmpdb.Path()
err = b.db.Close()
if err != nil {
log.Fatalf("backend: cannot close database (%s)", err)
}
err = tmpdb.Close()
if err != nil {
log.Fatalf("backend: cannot close database (%s)", err)
}
err = os.Rename(tdbp, dbp)
if err != nil {
log.Fatalf("backend: cannot rename database (%s)", err)
}
b.db, err = bolt.Open(dbp, 0600, boltOpenOptions)
if err != nil {
log.Panicf("backend: cannot open database at %s (%v)", dbp, err)
}
b.batchTx.tx, err = b.db.Begin(true)
if err != nil {
log.Fatalf("backend: cannot begin tx (%s)", err)
}
return nil
}
func defragdb(odb, tmpdb *bolt.DB, limit int) error {
// open a tx on tmpdb for writes
tmptx, err := tmpdb.Begin(true)
if err != nil {
return err
}
// open a tx on old db for read
tx, err := odb.Begin(false)
if err != nil {
return err
}
defer tx.Rollback()
c := tx.Cursor()
count := 0
for next, _ := c.First(); next != nil; next, _ = c.Next() {
b := tx.Bucket(next)
if b == nil {
return fmt.Errorf("backend: cannot defrag bucket %s", string(next))
}
tmpb, berr := tmptx.CreateBucketIfNotExists(next)
if berr != nil {
return berr
}
b.ForEach(func(k, v []byte) error {
count++
if count > limit {
err = tmptx.Commit()
if err != nil {
return err
}
tmptx, err = tmpdb.Begin(true)
if err != nil {
return err
}
tmpb = tmptx.Bucket(next)
}
err = tmpb.Put(k, v)
if err != nil {
return err
}
return nil
})
}
return tmptx.Commit()
}
// NewTmpBackend creates a backend implementation for testing.
func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) {
dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")

View File

@ -15,6 +15,7 @@
package backend
import (
"fmt"
"io/ioutil"
"os"
"testing"
@ -115,6 +116,47 @@ func TestBackendBatchIntervalCommit(t *testing.T) {
})
}
func TestBackendDefrag(t *testing.T) {
b, tmpPath := NewDefaultTmpBackend()
defer cleanup(b, tmpPath)
tx := b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket([]byte("test"))
for i := 0; i < defragLimit+100; i++ {
tx.UnsafePut([]byte("test"), []byte(fmt.Sprintf("foo_%d", i)), []byte("bar"))
}
tx.Unlock()
b.ForceCommit()
// shrink and check hash
oh, err := b.Hash()
if err != nil {
t.Fatal(err)
}
err = b.Defrag()
if err != nil {
t.Fatal(err)
}
nh, err := b.Hash()
if err != nil {
t.Fatal(err)
}
if oh != nh {
t.Errorf("hash = %v, want %v", nh, oh)
}
// try put more keys after shrink.
tx = b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket([]byte("test"))
tx.UnsafePut([]byte("test"), []byte("more"), []byte("bar"))
tx.Unlock()
b.ForceCommit()
}
func cleanup(b Backend, path string) {
b.Close()
os.Remove(path)

View File

@ -149,6 +149,8 @@ func (t *batchTx) commit(stop bool) {
return
}
t.backend.mu.RLock()
defer t.backend.mu.RUnlock()
// begin a new tx
t.tx, err = t.backend.db.Begin(true)
if err != nil {

View File

@ -593,6 +593,7 @@ func (b *fakeBackend) Hash() (uint32, error) { return 0, nil }
func (b *fakeBackend) Size() int64 { return 0 }
func (b *fakeBackend) Snapshot() backend.Snapshot { return nil }
func (b *fakeBackend) ForceCommit() {}
func (b *fakeBackend) Defrag() error { return nil }
func (b *fakeBackend) Close() error { return nil }
type indexGetResp struct {