Merge pull request #3608 from yichengq/storage-snapshot

storage: update KV.Snapshot function
This commit is contained in:
Yicheng Qin 2015-10-03 15:32:14 -07:00
commit 581cc5cff4
7 changed files with 49 additions and 30 deletions

View File

@ -28,16 +28,17 @@ import (
) )
type snapshot struct { type snapshot struct {
r raftpb.Snapshot r raftpb.Snapshot
kv dstorage.Snapshot
} }
func (s *snapshot) raft() raftpb.Snapshot { return s.r } func (s *snapshot) raft() raftpb.Snapshot { return s.r }
func (s *snapshot) size() uint64 { return 0 } func (s *snapshot) size() int64 { return s.kv.Size() }
func (s *snapshot) writeTo(w io.Writer) (n int64, err error) { return 0, nil } func (s *snapshot) writeTo(w io.Writer) (n int64, err error) { return s.kv.WriteTo(w) }
func (s *snapshot) close() {} func (s *snapshot) close() error { return s.kv.Close() }
type snapshotStore struct { type snapshotStore struct {
// dir to save snapshot data // dir to save snapshot data
@ -72,10 +73,12 @@ func (ss *snapshotStore) getSnap() (*snapshot, error) {
// ask to generate v2 snapshot // ask to generate v2 snapshot
ss.reqsnapc <- struct{}{} ss.reqsnapc <- struct{}{}
// TODO: generate v3 snapshot at here // generate KV snapshot
kvsnap := ss.kv.Snapshot()
raftsnap := <-ss.raftsnapc raftsnap := <-ss.raftsnapc
ss.snap = &snapshot{ ss.snap = &snapshot{
r: raftsnap, r: raftsnap,
kv: kvsnap,
} }
return ss.snap, nil return ss.snap, nil
} }

View File

@ -26,12 +26,21 @@ import (
type Backend interface { type Backend interface {
BatchTx() BatchTx BatchTx() BatchTx
Snapshot(w io.Writer) (n int64, err error) Snapshot() Snapshot
Hash() (uint32, error) Hash() (uint32, error)
ForceCommit() ForceCommit()
Close() error Close() error
} }
type Snapshot interface {
// Size gets the size of the snapshot.
Size() int64
// WriteTo writes the snapshot into the given writter.
WriteTo(w io.Writer) (n int64, err error)
// Close closes the snapshot.
Close() error
}
type backend struct { type backend struct {
db *bolt.DB db *bolt.DB
@ -79,12 +88,12 @@ func (b *backend) ForceCommit() {
b.batchTx.Commit() b.batchTx.Commit()
} }
func (b *backend) Snapshot(w io.Writer) (n int64, err error) { func (b *backend) Snapshot() Snapshot {
b.db.View(func(tx *bolt.Tx) error { tx, err := b.db.Begin(false)
n, err = tx.WriteTo(w) if err != nil {
return nil log.Fatalf("storage: cannot begin tx (%s)", err)
}) }
return n, err return &snapshot{tx}
} }
func (b *backend) Hash() (uint32, error) { func (b *backend) Hash() (uint32, error) {
@ -133,3 +142,9 @@ func (b *backend) Close() error {
<-b.donec <-b.donec
return b.db.Close() return b.db.Close()
} }
type snapshot struct {
*bolt.Tx
}
func (s *snapshot) Close() error { return s.Tx.Rollback() }

View File

@ -72,8 +72,9 @@ func TestBackendSnapshot(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
_, err = b.Snapshot(f) snap := b.Snapshot()
if err != nil { defer snap.Close()
if _, err := snap.WriteTo(f); err != nil {
t.Fatal(err) t.Fatal(err)
} }
f.Close() f.Close()

View File

@ -15,8 +15,7 @@
package storage package storage
import ( import (
"io" "github.com/coreos/etcd/storage/backend"
"github.com/coreos/etcd/storage/storagepb" "github.com/coreos/etcd/storage/storagepb"
) )
@ -24,6 +23,8 @@ import (
// wait for the work to stop. // wait for the work to stop.
type CancelFunc func() type CancelFunc func()
type Snapshot backend.Snapshot
type KV interface { type KV interface {
// Rev returns the current revision of the KV. // Rev returns the current revision of the KV.
Rev() int64 Rev() int64
@ -65,8 +66,8 @@ type KV interface {
// This method is designed for consistency checking purpose. // This method is designed for consistency checking purpose.
Hash() (uint32, error) Hash() (uint32, error)
// Write a snapshot to the given io writer // Snapshot snapshots the full KV store.
Snapshot(w io.Writer) (int64, error) Snapshot() Snapshot
Restore() error Restore() error
Close() error Close() error

View File

@ -706,7 +706,9 @@ func TestKVSnapshot(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
_, err = s.Snapshot(f) snap := s.Snapshot()
defer snap.Close()
_, err = snap.WriteTo(f)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -16,7 +16,6 @@ package storage
import ( import (
"errors" "errors"
"io"
"log" "log"
"math" "math"
"math/rand" "math/rand"
@ -292,9 +291,9 @@ func (s *store) Hash() (uint32, error) {
return s.b.Hash() return s.b.Hash()
} }
func (s *store) Snapshot(w io.Writer) (int64, error) { func (s *store) Snapshot() Snapshot {
s.b.ForceCommit() s.b.ForceCommit()
return s.b.Snapshot(w) return s.b.Snapshot()
} }
func (s *store) Restore() error { func (s *store) Restore() error {

View File

@ -17,8 +17,6 @@ package storage
import ( import (
"crypto/rand" "crypto/rand"
"encoding/binary" "encoding/binary"
"errors"
"io"
"math" "math"
"os" "os"
"reflect" "reflect"
@ -721,11 +719,11 @@ type fakeBackend struct {
tx *fakeBatchTx tx *fakeBatchTx
} }
func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx } func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx }
func (b *fakeBackend) Hash() (uint32, error) { return 0, nil } func (b *fakeBackend) Hash() (uint32, error) { return 0, nil }
func (b *fakeBackend) Snapshot(w io.Writer) (n int64, err error) { return 0, errors.New("unsupported") } func (b *fakeBackend) Snapshot() backend.Snapshot { return nil }
func (b *fakeBackend) ForceCommit() {} func (b *fakeBackend) ForceCommit() {}
func (b *fakeBackend) Close() error { return nil } func (b *fakeBackend) Close() error { return nil }
type indexGetResp struct { type indexGetResp struct {
rev revision rev revision