From ba9a46aa02fd8c1562a64e3f76c226a1a02a68dc Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 8 Jun 2015 09:26:56 -0700 Subject: [PATCH] storage: initial snapshot and restore Snapshot takes an io.Writer and writes the entire backend data to the given writer. Snapshot writes a consistent view and does not block other storage operations. Restore restores the in-memory states (index and book keeping) of the storage from the backend data. --- storage/backend/backend.go | 11 +++++ storage/index.go | 23 ++++++++++ storage/key_index.go | 36 ++++++++++++++++ storage/kv.go | 12 +++++- storage/kvstore.go | 88 +++++++++++++++++++++++++++++++++++--- storage/kvstore_test.go | 25 +++++++++++ storage/reversion.go | 4 ++ 7 files changed, 192 insertions(+), 7 deletions(-) diff --git a/storage/backend/backend.go b/storage/backend/backend.go index b2b85b63d..56b3bad8f 100644 --- a/storage/backend/backend.go +++ b/storage/backend/backend.go @@ -1,6 +1,7 @@ package backend import ( + "io" "log" "time" @@ -9,6 +10,7 @@ import ( type Backend interface { BatchTx() BatchTx + Snapshot(w io.Writer) (n int64, err error) ForceCommit() Close() error } @@ -60,6 +62,14 @@ func (b *backend) ForceCommit() { b.batchTx.Commit() } +func (b *backend) Snapshot(w io.Writer) (n int64, err error) { + b.db.View(func(tx *bolt.Tx) error { + n, err = tx.WriteTo(w) + return nil + }) + return n, err +} + func (b *backend) run() { defer close(b.donec) @@ -70,6 +80,7 @@ func (b *backend) run() { select { case <-time.After(b.batchInterval): case <-b.stopc: + b.batchTx.Commit() return } b.batchTx.Commit() diff --git a/storage/index.go b/storage/index.go index af8a40d27..1c576ca8e 100644 --- a/storage/index.go +++ b/storage/index.go @@ -13,6 +13,7 @@ type index interface { Put(key []byte, rev reversion) Tombstone(key []byte, rev reversion) error Compact(rev int64) map[reversion]struct{} + Equal(b index) bool } type treeIndex struct { @@ -130,3 +131,25 @@ func compactIndex(rev int64, available map[reversion]struct{}, emptyki *[]*keyIn return true } } + +func (a *treeIndex) Equal(bi index) bool { + b := bi.(*treeIndex) + + if a.tree.Len() != b.tree.Len() { + return false + } + + equal := true + + a.tree.Ascend(func(item btree.Item) bool { + aki := item.(*keyIndex) + bki := b.tree.Get(item).(*keyIndex) + if !aki.equal(bki) { + equal = false + return false + } + return true + }) + + return equal +} diff --git a/storage/key_index.go b/storage/key_index.go index e0235bbfd..af717f1e1 100644 --- a/storage/key_index.go +++ b/storage/key_index.go @@ -187,6 +187,25 @@ func (a *keyIndex) Less(b btree.Item) bool { return bytes.Compare(a.key, b.(*keyIndex).key) == -1 } +func (a *keyIndex) equal(b *keyIndex) bool { + if !bytes.Equal(a.key, b.key) { + return false + } + if a.rev != b.rev { + return false + } + if len(a.generations) != len(b.generations) { + return false + } + for i := range a.generations { + ag, bg := a.generations[i], b.generations[i] + if !ag.equal(bg) { + return false + } + } + return true +} + func (ki *keyIndex) String() string { var s string for _, g := range ki.generations { @@ -221,3 +240,20 @@ func (g *generation) walk(f func(rev reversion) bool) int { func (g *generation) String() string { return fmt.Sprintf("g: ver[%d], revs %#v\n", g.ver, g.revs) } + +func (a generation) equal(b generation) bool { + if a.ver != b.ver { + return false + } + if len(a.revs) != len(b.revs) { + return false + } + + for i := range a.revs { + ar, br := a.revs[i], b.revs[i] + if ar != br { + return false + } + } + return true +} diff --git a/storage/kv.go b/storage/kv.go index 436e64465..a041d1e05 100644 --- a/storage/kv.go +++ b/storage/kv.go @@ -1,6 +1,10 @@ package storage -import "github.com/coreos/etcd/storage/storagepb" +import ( + "io" + + "github.com/coreos/etcd/storage/storagepb" +) type KV interface { // Range gets the keys in the range at rangeRev. @@ -35,4 +39,10 @@ type KV interface { TnxDeleteRange(tnxID int64, key, end []byte) (n, rev int64, err error) Compact(rev int64) error + + // Write a snapshot to the given io writer + Snapshot(w io.Writer) (int64, error) + + Restore() error + Close() error } diff --git a/storage/kvstore.go b/storage/kvstore.go index 6d93ebd87..2bbdcd625 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -2,7 +2,9 @@ package storage import ( "errors" + "io" "log" + "math" "math/rand" "sync" "time" @@ -37,7 +39,7 @@ type store struct { tnxID int64 // tracks the current tnxID to verify tnx operations } -func newStore(path string) KV { +func newStore(path string) *store { s := &store{ b: backend.New(path, batchInterval, batchLimit), kvindex: newTreeIndex(), @@ -146,7 +148,7 @@ func (s *store) Compact(rev int64) error { s.compactMainRev = rev - rbytes := make([]byte, 8+1+8) + rbytes := newRevBytes() revToBytes(reversion{main: rev}, rbytes) tx := s.b.BatchTx() @@ -160,6 +162,80 @@ func (s *store) Compact(rev int64) error { return nil } +func (s *store) Snapshot(w io.Writer) (int64, error) { + s.b.ForceCommit() + return s.b.Snapshot(w) +} + +func (s *store) Restore() error { + s.mu.Lock() + defer s.mu.Unlock() + + min, max := newRevBytes(), newRevBytes() + revToBytes(reversion{}, min) + revToBytes(reversion{main: math.MaxInt64, sub: math.MaxInt64}, max) + + // restore index + tx := s.b.BatchTx() + tx.Lock() + _, finishedCompactBytes := tx.UnsafeRange(keyBucketName, finishedCompactKeyName, nil, 0) + if len(finishedCompactBytes) != 0 { + s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main + log.Printf("storage: restore compact to %d", s.compactMainRev) + } + + // TODO: limit N to reduce max memory usage + keys, vals := tx.UnsafeRange(keyBucketName, min, max, 0) + for i, key := range keys { + e := &storagepb.Event{} + if err := e.Unmarshal(vals[i]); err != nil { + log.Fatalf("storage: cannot unmarshal event: %v", err) + } + + rev := bytesToRev(key) + + // restore index + switch e.Type { + case storagepb.PUT: + s.kvindex.Put(e.Kv.Key, rev) + case storagepb.DELETE: + s.kvindex.Tombstone(e.Kv.Key, rev) + default: + log.Panicf("storage: unexpected event type %s", e.Type) + } + + // update reversion + s.currentRev = rev + } + + _, scheduledCompactBytes := tx.UnsafeRange(keyBucketName, scheduledCompactKeyName, nil, 0) + if len(scheduledCompactBytes) != 0 { + scheduledCompact := bytesToRev(finishedCompactBytes[0]).main + if scheduledCompact > s.compactMainRev { + log.Printf("storage: resume scheduled compaction at %d", scheduledCompact) + go s.Compact(scheduledCompact) + } + } + + tx.Unlock() + + return nil +} + +func (s *store) Close() error { + return s.b.Close() +} + +func (a *store) Equal(b *store) bool { + if a.currentRev != b.currentRev { + return false + } + if a.compactMainRev != b.compactMainRev { + return false + } + return a.kvindex.Equal(b.kvindex) +} + // range is a keyword in Go, add Keys suffix. func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) { if rangeRev <= 0 { @@ -186,7 +262,7 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage tx.Lock() defer tx.Unlock() for _, revpair := range revpairs { - revbytes := make([]byte, 8+1+8) + revbytes := newRevBytes() revToBytes(revpair, revbytes) _, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0) @@ -206,7 +282,7 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage } func (s *store) put(key, value []byte, rev int64) { - ibytes := make([]byte, 8+1+8) + ibytes := newRevBytes() revToBytes(reversion{main: rev, sub: s.currentRev.sub}, ibytes) event := storagepb.Event{ @@ -266,7 +342,7 @@ func (s *store) delete(key []byte, mainrev int64) bool { tx.Lock() defer tx.Unlock() - revbytes := make([]byte, 8+1+8) + revbytes := newRevBytes() revToBytes(rev, revbytes) _, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0) @@ -282,7 +358,7 @@ func (s *store) delete(key []byte, mainrev int64) bool { return false } - ibytes := make([]byte, 8+1+8) + ibytes := newRevBytes() revToBytes(reversion{main: mainrev, sub: s.currentRev.sub}, ibytes) event := storagepb.Event{ diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index d3ef5ab9c..87c48bde6 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -320,6 +320,31 @@ func TestCompaction(t *testing.T) { } } +// TODO: test more complicated cases: +// with unfinished compaction +// with removed keys +func TestRestore(t *testing.T) { + s0 := newStore("test") + defer os.Remove("test") + + s0.Put([]byte("foo"), []byte("bar")) + s0.Put([]byte("foo1"), []byte("bar1")) + s0.Put([]byte("foo2"), []byte("bar2")) + s0.Put([]byte("foo"), []byte("bar11")) + s0.Put([]byte("foo1"), []byte("bar12")) + s0.Put([]byte("foo2"), []byte("bar13")) + s0.Put([]byte("foo1"), []byte("bar14")) + + s0.Close() + + s1 := newStore("test") + s1.Restore() + + if !s0.Equal(s1) { + t.Errorf("not equal!") + } +} + func BenchmarkStorePut(b *testing.B) { s := newStore("test") defer os.Remove("test") diff --git a/storage/reversion.go b/storage/reversion.go index 581c713d0..bb346c743 100644 --- a/storage/reversion.go +++ b/storage/reversion.go @@ -7,6 +7,10 @@ type reversion struct { sub int64 } +func newRevBytes() []byte { + return make([]byte, 8+1+8) +} + func revToBytes(rev reversion, bytes []byte) { binary.BigEndian.PutUint64(bytes, uint64(rev.main)) bytes[8] = '_'