diff --git a/etcdserver/snapshot_store.go b/etcdserver/snapshot_store.go index ac8ae5364..8f0e752b6 100644 --- a/etcdserver/snapshot_store.go +++ b/etcdserver/snapshot_store.go @@ -28,16 +28,17 @@ import ( ) type snapshot struct { - r raftpb.Snapshot + r raftpb.Snapshot + kv dstorage.Snapshot } func (s *snapshot) raft() raftpb.Snapshot { return s.r } -func (s *snapshot) size() uint64 { return 0 } +func (s *snapshot) size() int64 { return s.kv.Size() } -func (s *snapshot) writeTo(w io.Writer) (n int64, err error) { return 0, nil } +func (s *snapshot) writeTo(w io.Writer) (n int64, err error) { return s.kv.WriteTo(w) } -func (s *snapshot) close() {} +func (s *snapshot) close() error { return s.kv.Close() } type snapshotStore struct { // dir to save snapshot data @@ -72,10 +73,12 @@ func (ss *snapshotStore) getSnap() (*snapshot, error) { // ask to generate v2 snapshot ss.reqsnapc <- struct{}{} - // TODO: generate v3 snapshot at here + // generate KV snapshot + kvsnap := ss.kv.Snapshot() raftsnap := <-ss.raftsnapc ss.snap = &snapshot{ - r: raftsnap, + r: raftsnap, + kv: kvsnap, } return ss.snap, nil } diff --git a/storage/backend/backend.go b/storage/backend/backend.go index 1ce9907af..d4f054b32 100644 --- a/storage/backend/backend.go +++ b/storage/backend/backend.go @@ -26,12 +26,21 @@ import ( type Backend interface { BatchTx() BatchTx - Snapshot(w io.Writer) (n int64, err error) + Snapshot() Snapshot Hash() (uint32, error) ForceCommit() Close() error } +type Snapshot interface { + // Size gets the size of the snapshot. + Size() int64 + // WriteTo writes the snapshot into the given writter. + WriteTo(w io.Writer) (n int64, err error) + // Close closes the snapshot. + Close() error +} + type backend struct { db *bolt.DB @@ -79,12 +88,12 @@ func (b *backend) ForceCommit() { b.batchTx.Commit() } -func (b *backend) Snapshot(w io.Writer) (n int64, err error) { - b.db.View(func(tx *bolt.Tx) error { - n, err = tx.WriteTo(w) - return nil - }) - return n, err +func (b *backend) Snapshot() Snapshot { + tx, err := b.db.Begin(false) + if err != nil { + log.Fatalf("storage: cannot begin tx (%s)", err) + } + return &snapshot{tx} } func (b *backend) Hash() (uint32, error) { @@ -133,3 +142,9 @@ func (b *backend) Close() error { <-b.donec return b.db.Close() } + +type snapshot struct { + *bolt.Tx +} + +func (s *snapshot) Close() error { return s.Tx.Rollback() } diff --git a/storage/backend/backend_test.go b/storage/backend/backend_test.go index 96ed973a1..9a3fcb112 100644 --- a/storage/backend/backend_test.go +++ b/storage/backend/backend_test.go @@ -72,8 +72,9 @@ func TestBackendSnapshot(t *testing.T) { if err != nil { t.Fatal(err) } - _, err = b.Snapshot(f) - if err != nil { + snap := b.Snapshot() + defer snap.Close() + if _, err := snap.WriteTo(f); err != nil { t.Fatal(err) } f.Close() diff --git a/storage/kv.go b/storage/kv.go index 3e026251b..e8250f081 100644 --- a/storage/kv.go +++ b/storage/kv.go @@ -15,8 +15,7 @@ package storage import ( - "io" - + "github.com/coreos/etcd/storage/backend" "github.com/coreos/etcd/storage/storagepb" ) @@ -24,6 +23,8 @@ import ( // wait for the work to stop. type CancelFunc func() +type Snapshot backend.Snapshot + type KV interface { // Rev returns the current revision of the KV. Rev() int64 @@ -65,8 +66,8 @@ type KV interface { // This method is designed for consistency checking purpose. Hash() (uint32, error) - // Write a snapshot to the given io writer - Snapshot(w io.Writer) (int64, error) + // Snapshot snapshots the full KV store. + Snapshot() Snapshot Restore() error Close() error diff --git a/storage/kv_test.go b/storage/kv_test.go index bc7e12a91..cfb4bb6e8 100644 --- a/storage/kv_test.go +++ b/storage/kv_test.go @@ -706,7 +706,9 @@ func TestKVSnapshot(t *testing.T) { if err != nil { t.Fatal(err) } - _, err = s.Snapshot(f) + snap := s.Snapshot() + defer snap.Close() + _, err = snap.WriteTo(f) if err != nil { t.Fatal(err) } diff --git a/storage/kvstore.go b/storage/kvstore.go index 8e2a530d4..628348801 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -16,7 +16,6 @@ package storage import ( "errors" - "io" "log" "math" "math/rand" @@ -292,9 +291,9 @@ func (s *store) Hash() (uint32, error) { return s.b.Hash() } -func (s *store) Snapshot(w io.Writer) (int64, error) { +func (s *store) Snapshot() Snapshot { s.b.ForceCommit() - return s.b.Snapshot(w) + return s.b.Snapshot() } func (s *store) Restore() error { diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index e74a5c2b8..c1bc0d195 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -17,8 +17,6 @@ package storage import ( "crypto/rand" "encoding/binary" - "errors" - "io" "math" "os" "reflect" @@ -721,11 +719,11 @@ type fakeBackend struct { tx *fakeBatchTx } -func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx } -func (b *fakeBackend) Hash() (uint32, error) { return 0, nil } -func (b *fakeBackend) Snapshot(w io.Writer) (n int64, err error) { return 0, errors.New("unsupported") } -func (b *fakeBackend) ForceCommit() {} -func (b *fakeBackend) Close() error { return nil } +func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx } +func (b *fakeBackend) Hash() (uint32, error) { return 0, nil } +func (b *fakeBackend) Snapshot() backend.Snapshot { return nil } +func (b *fakeBackend) ForceCommit() {} +func (b *fakeBackend) Close() error { return nil } type indexGetResp struct { rev revision