From 558640d91eb2d00fadb2fbb5879263f42cf460b9 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 2 Mar 2016 12:00:32 -0800 Subject: [PATCH] backend: support shrink db --- storage/backend/backend.go | 119 +++++++++++++++++++++++++++++++- storage/backend/backend_test.go | 42 +++++++++++ storage/backend/batch_tx.go | 2 + storage/kvstore_test.go | 1 + 4 files changed, 163 insertions(+), 1 deletion(-) diff --git a/storage/backend/backend.go b/storage/backend/backend.go index bf6f3b35e..155cdac74 100644 --- a/storage/backend/backend.go +++ b/storage/backend/backend.go @@ -22,6 +22,7 @@ import ( "log" "os" "path" + "sync" "sync/atomic" "time" @@ -32,6 +33,8 @@ var ( defaultBatchLimit = 10000 defaultBatchInterval = 100 * time.Millisecond + defragLimit = 10000 + // InitialMmapSize is the initial size of the mmapped region. Setting this larger than // the potential max db size can prevent writer from blocking reader. // This only works for linux. @@ -44,6 +47,7 @@ type Backend interface { Hash() (uint32, error) // Size returns the current size of the backend. Size() int64 + Defrag() error ForceCommit() Close() error } @@ -58,6 +62,7 @@ type Snapshot interface { } type backend struct { + mu sync.RWMutex db *bolt.DB batchInterval time.Duration @@ -114,9 +119,12 @@ func (b *backend) ForceCommit() { func (b *backend) Snapshot() Snapshot { b.batchTx.Commit() + + b.mu.RLock() + defer b.mu.RUnlock() tx, err := b.db.Begin(false) if err != nil { - log.Fatalf("storage: cannot begin tx (%s)", err) + log.Fatalf("backend: cannot begin tx (%s)", err) } return &snapshot{tx} } @@ -124,6 +132,8 @@ func (b *backend) Snapshot() Snapshot { func (b *backend) Hash() (uint32, error) { h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) + b.mu.RLock() + defer b.mu.RUnlock() err := b.db.View(func(tx *bolt.Tx) error { c := tx.Cursor() for next, _ := c.First(); next != nil; next, _ = c.Next() { @@ -177,6 +187,113 @@ func (b *backend) Commits() int64 { return atomic.LoadInt64(&b.commits) } +func (b *backend) Defrag() error { + // TODO: make this non-blocking? + // lock batchTx to ensure nobody is using previous tx, and then + // close previous ongoing tx. + b.batchTx.Lock() + defer b.batchTx.Unlock() + + // lock database after lock tx to avoid deadlock. + b.mu.Lock() + defer b.mu.Unlock() + + b.batchTx.commit(true) + b.batchTx.tx = nil + + tmpdb, err := bolt.Open(b.db.Path()+".tmp", 0600, boltOpenOptions) + if err != nil { + return err + } + + err = defragdb(b.db, tmpdb, defragLimit) + + if err != nil { + tmpdb.Close() + os.RemoveAll(tmpdb.Path()) + return err + } + + dbp := b.db.Path() + tdbp := tmpdb.Path() + + err = b.db.Close() + if err != nil { + log.Fatalf("backend: cannot close database (%s)", err) + } + err = tmpdb.Close() + if err != nil { + log.Fatalf("backend: cannot close database (%s)", err) + } + err = os.Rename(tdbp, dbp) + if err != nil { + log.Fatalf("backend: cannot rename database (%s)", err) + } + + b.db, err = bolt.Open(dbp, 0600, boltOpenOptions) + if err != nil { + log.Panicf("backend: cannot open database at %s (%v)", dbp, err) + } + b.batchTx.tx, err = b.db.Begin(true) + if err != nil { + log.Fatalf("backend: cannot begin tx (%s)", err) + } + + return nil +} + +func defragdb(odb, tmpdb *bolt.DB, limit int) error { + // open a tx on tmpdb for writes + tmptx, err := tmpdb.Begin(true) + if err != nil { + return err + } + + // open a tx on old db for read + tx, err := odb.Begin(false) + if err != nil { + return err + } + defer tx.Rollback() + + c := tx.Cursor() + + count := 0 + for next, _ := c.First(); next != nil; next, _ = c.Next() { + b := tx.Bucket(next) + if b == nil { + return fmt.Errorf("backend: cannot defrag bucket %s", string(next)) + } + + tmpb, berr := tmptx.CreateBucketIfNotExists(next) + if berr != nil { + return berr + } + + b.ForEach(func(k, v []byte) error { + count++ + if count > limit { + err = tmptx.Commit() + if err != nil { + return err + } + tmptx, err = tmpdb.Begin(true) + if err != nil { + return err + } + tmpb = tmptx.Bucket(next) + } + err = tmpb.Put(k, v) + if err != nil { + return err + } + return nil + }) + } + + return tmptx.Commit() +} + // NewTmpBackend creates a backend implementation for testing. func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) { dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test") diff --git a/storage/backend/backend_test.go b/storage/backend/backend_test.go index 3482cb7af..623c6ca6a 100644 --- a/storage/backend/backend_test.go +++ b/storage/backend/backend_test.go @@ -15,6 +15,7 @@ package backend import ( + "fmt" "io/ioutil" "os" "testing" @@ -115,6 +116,47 @@ func TestBackendBatchIntervalCommit(t *testing.T) { }) } +func TestBackendDefrag(t *testing.T) { + b, tmpPath := NewDefaultTmpBackend() + defer cleanup(b, tmpPath) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket([]byte("test")) + for i := 0; i < defragLimit+100; i++ { + tx.UnsafePut([]byte("test"), []byte(fmt.Sprintf("foo_%d", i)), []byte("bar")) + } + tx.Unlock() + b.ForceCommit() + + // shrink and check hash + oh, err := b.Hash() + if err != nil { + t.Fatal(err) + } + + err = b.Defrag() + if err != nil { + t.Fatal(err) + } + + nh, err := b.Hash() + if err != nil { + t.Fatal(err) + } + if oh != nh { + t.Errorf("hash = %v, want %v", nh, oh) + } + + // try put more keys after shrink. + tx = b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket([]byte("test")) + tx.UnsafePut([]byte("test"), []byte("more"), []byte("bar")) + tx.Unlock() + b.ForceCommit() +} + func cleanup(b Backend, path string) { b.Close() os.Remove(path) diff --git a/storage/backend/batch_tx.go b/storage/backend/batch_tx.go index d4439a878..be2cf4cb9 100644 --- a/storage/backend/batch_tx.go +++ b/storage/backend/batch_tx.go @@ -149,6 +149,8 @@ func (t *batchTx) commit(stop bool) { return } + t.backend.mu.RLock() + defer t.backend.mu.RUnlock() // begin a new tx t.tx, err = t.backend.db.Begin(true) if err != nil { diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index a6dce9a11..e9d449ec7 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -593,6 +593,7 @@ func (b *fakeBackend) Hash() (uint32, error) { return 0, nil } func (b *fakeBackend) Size() int64 { return 0 } func (b *fakeBackend) Snapshot() backend.Snapshot { return nil } func (b *fakeBackend) ForceCommit() {} +func (b *fakeBackend) Defrag() error { return nil } func (b *fakeBackend) Close() error { return nil } type indexGetResp struct {