From 5dd3f91903d548af8a3e06e9954d3563d883b369 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 5 Jan 2016 19:45:18 -0800 Subject: [PATCH] *: make backend outside kv KV and lease will share the same backend. Thus we need to make backend outside KV. --- etcdserver/server.go | 7 +- etcdserver/server_test.go | 10 +- storage/backend/backend.go | 26 +++++ storage/backend/backend_test.go | 21 +--- storage/backend/batch_tx_test.go | 13 +-- storage/consistent_watchable_store.go | 14 +-- storage/consistent_watchable_store_test.go | 16 ++- storage/kv_test.go | 108 +++++++++++---------- storage/kvstore.go | 14 +-- storage/kvstore_bench_test.go | 13 ++- storage/kvstore_compaction_test.go | 7 +- storage/kvstore_test.go | 16 +-- storage/watchable_store.go | 5 +- storage/watchable_store_bench_test.go | 17 +++- storage/watchable_store_test.go | 21 +++- storage/watcher_bench_test.go | 8 +- storage/watcher_test.go | 16 ++- tools/benchmark/cmd/storage.go | 4 +- 18 files changed, 204 insertions(+), 132 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index 3ad9a6dd5..aec1f22e7 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -47,6 +47,7 @@ import ( "github.com/coreos/etcd/rafthttp" "github.com/coreos/etcd/snap" dstorage "github.com/coreos/etcd/storage" + "github.com/coreos/etcd/storage/backend" "github.com/coreos/etcd/store" "github.com/coreos/etcd/version" "github.com/coreos/etcd/wal" @@ -358,7 +359,8 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { } if cfg.V3demo { - srv.kv = dstorage.New(path.Join(cfg.SnapDir(), databaseFilename), &srv.consistIndex) + be := backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename)) + srv.kv = dstorage.New(be, &srv.consistIndex) if err := srv.kv.Restore(); err != nil { plog.Fatalf("v3 storage restore error: %v", err) } @@ -583,7 +585,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { plog.Panicf("rename snapshot file error: %v", err) } - newKV := dstorage.New(fn, &s.consistIndex) + newbe := backend.NewDefaultBackend(fn) + newKV := dstorage.New(newbe, &s.consistIndex) if err := newKV.Restore(); err != nil { plog.Panicf("restore KV error: %v", err) } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index a6743cc16..c89a6395d 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -36,6 +36,7 @@ import ( "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/rafthttp" dstorage "github.com/coreos/etcd/storage" + "github.com/coreos/etcd/storage/backend" "github.com/coreos/etcd/store" ) @@ -864,9 +865,12 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), } - s.kv = dstorage.New( - path.Join(testdir, "testdb.db"), - &s.consistIndex) + be, tmpPath := backend.NewDefaultTmpBackend() + defer func() { + be.Close() + os.RemoveAll(tmpPath) + }() + s.kv = dstorage.New(be, &s.consistIndex) s.start() defer s.Stop() diff --git a/storage/backend/backend.go b/storage/backend/backend.go index 608752452..9f5ba33a7 100644 --- a/storage/backend/backend.go +++ b/storage/backend/backend.go @@ -18,13 +18,21 @@ import ( "fmt" "hash/crc32" "io" + "io/ioutil" "log" + "os" + "path" "sync/atomic" "time" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/boltdb/bolt" ) +var ( + defaultBatchLimit = 10000 + defaultBatchInterval = 100 * time.Millisecond +) + type Backend interface { BatchTx() BatchTx Snapshot() Snapshot @@ -60,6 +68,10 @@ func New(path string, d time.Duration, limit int) Backend { return newBackend(path, d, limit) } +func NewDefaultBackend(path string) Backend { + return newBackend(path, defaultBatchInterval, defaultBatchLimit) +} + func newBackend(path string, d time.Duration, limit int) *backend { db, err := bolt.Open(path, 0600, boltOpenOptions) if err != nil { @@ -151,6 +163,20 @@ func (b *backend) Close() error { return b.db.Close() } +// NewTmpBackend creates a backend implementation for testing. +func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) { + dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test") + if err != nil { + log.Fatal(err) + } + tmpPath := path.Join(dir, "database") + return newBackend(tmpPath, batchInterval, batchLimit), tmpPath +} + +func NewDefaultTmpBackend() (*backend, string) { + return NewTmpBackend(defaultBatchInterval, defaultBatchLimit) +} + type snapshot struct { *bolt.Tx } diff --git a/storage/backend/backend_test.go b/storage/backend/backend_test.go index 9a3fcb112..039ab4a66 100644 --- a/storage/backend/backend_test.go +++ b/storage/backend/backend_test.go @@ -16,9 +16,7 @@ package backend import ( "io/ioutil" - "log" "os" - "path" "testing" "time" @@ -26,18 +24,8 @@ import ( "github.com/coreos/etcd/pkg/testutil" ) -var tmpPath string - -func init() { - dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test") - if err != nil { - log.Fatal(err) - } - tmpPath = path.Join(dir, "database") -} - func TestBackendClose(t *testing.T) { - b := newBackend(tmpPath, time.Hour, 10000) + b, tmpPath := NewTmpBackend(time.Hour, 10000) defer os.Remove(tmpPath) // check close could work @@ -57,7 +45,7 @@ func TestBackendClose(t *testing.T) { } func TestBackendSnapshot(t *testing.T) { - b := New(tmpPath, time.Hour, 10000) + b, tmpPath := NewTmpBackend(time.Hour, 10000) defer cleanup(b, tmpPath) tx := b.BatchTx() @@ -93,8 +81,9 @@ func TestBackendSnapshot(t *testing.T) { } func TestBackendBatchIntervalCommit(t *testing.T) { - // start backend with super short batch interval - b := newBackend(tmpPath, time.Nanosecond, 10000) + // start backend with super short batch interval so + // we do not need to wait long before commit to happen. + b, tmpPath := NewTmpBackend(time.Nanosecond, 10000) defer cleanup(b, tmpPath) tx := b.BatchTx() diff --git a/storage/backend/batch_tx_test.go b/storage/backend/batch_tx_test.go index 56a8b385f..402aeaf2e 100644 --- a/storage/backend/batch_tx_test.go +++ b/storage/backend/batch_tx_test.go @@ -23,7 +23,7 @@ import ( ) func TestBatchTxPut(t *testing.T) { - b := newBackend(tmpPath, time.Hour, 10000) + b, tmpPath := NewTmpBackend(time.Hour, 10000) defer cleanup(b, tmpPath) tx := b.batchTx @@ -48,7 +48,7 @@ func TestBatchTxPut(t *testing.T) { } func TestBatchTxRange(t *testing.T) { - b := newBackend(tmpPath, time.Hour, 10000) + b, tmpPath := NewTmpBackend(time.Hour, 10000) defer cleanup(b, tmpPath) tx := b.batchTx @@ -119,7 +119,7 @@ func TestBatchTxRange(t *testing.T) { } func TestBatchTxDelete(t *testing.T) { - b := newBackend(tmpPath, time.Hour, 10000) + b, tmpPath := NewTmpBackend(time.Hour, 10000) defer cleanup(b, tmpPath) tx := b.batchTx @@ -142,7 +142,7 @@ func TestBatchTxDelete(t *testing.T) { } func TestBatchTxCommit(t *testing.T) { - b := newBackend(tmpPath, time.Hour, 10000) + b, tmpPath := NewTmpBackend(time.Hour, 10000) defer cleanup(b, tmpPath) tx := b.batchTx @@ -169,8 +169,9 @@ func TestBatchTxCommit(t *testing.T) { } func TestBatchTxBatchLimitCommit(t *testing.T) { - // start backend with batch limit 1 - b := newBackend(tmpPath, time.Hour, 1) + // start backend with batch limit 1 so one write can + // trigger a commit + b, tmpPath := NewTmpBackend(time.Hour, 1) defer cleanup(b, tmpPath) tx := b.batchTx diff --git a/storage/consistent_watchable_store.go b/storage/consistent_watchable_store.go index 688959bff..56c7342c8 100644 --- a/storage/consistent_watchable_store.go +++ b/storage/consistent_watchable_store.go @@ -19,6 +19,7 @@ import ( "log" "github.com/coreos/etcd/lease" + "github.com/coreos/etcd/storage/backend" "github.com/coreos/etcd/storage/storagepb" ) @@ -46,16 +47,15 @@ type consistentWatchableStore struct { skip bool // indicate whether or not to skip an operation } -func New(path string, ig ConsistentIndexGetter) ConsistentWatchableKV { - return newConsistentWatchableStore(path, ig) +func New(b backend.Backend, ig ConsistentIndexGetter) ConsistentWatchableKV { + return newConsistentWatchableStore(b, ig) } -// newConsistentWatchableStore creates a new consistentWatchableStore -// using the file at the given path. -// If the file at the given path does not exist then it will be created automatically. -func newConsistentWatchableStore(path string, ig ConsistentIndexGetter) *consistentWatchableStore { +// newConsistentWatchableStore creates a new consistentWatchableStore with the give +// backend. +func newConsistentWatchableStore(b backend.Backend, ig ConsistentIndexGetter) *consistentWatchableStore { return &consistentWatchableStore{ - watchableStore: newWatchableStore(path), + watchableStore: newWatchableStore(b), ig: ig, } } diff --git a/storage/consistent_watchable_store_test.go b/storage/consistent_watchable_store_test.go index d4a01b940..7602594e0 100644 --- a/storage/consistent_watchable_store_test.go +++ b/storage/consistent_watchable_store_test.go @@ -14,7 +14,11 @@ package storage -import "testing" +import ( + "testing" + + "github.com/coreos/etcd/storage/backend" +) type indexVal uint64 @@ -22,8 +26,9 @@ func (v *indexVal) ConsistentIndex() uint64 { return uint64(*v) } func TestConsistentWatchableStoreConsistentIndex(t *testing.T) { var idx indexVal - s := newConsistentWatchableStore(tmpPath, &idx) - defer cleanup(s, tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s := newConsistentWatchableStore(b, &idx) + defer cleanup(s, b, tmpPath) tests := []uint64{1, 2, 3, 5, 10} for i, tt := range tests { @@ -41,8 +46,9 @@ func TestConsistentWatchableStoreConsistentIndex(t *testing.T) { func TestConsistentWatchableStoreSkip(t *testing.T) { idx := indexVal(5) - s := newConsistentWatchableStore(tmpPath, &idx) - defer cleanup(s, tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s := newConsistentWatchableStore(b, &idx) + defer cleanup(s, b, tmpPath) s.Put([]byte("foo"), []byte("bar"), NoLease) diff --git a/storage/kv_test.go b/storage/kv_test.go index ec367f993..9f18d1d0f 100644 --- a/storage/kv_test.go +++ b/storage/kv_test.go @@ -15,16 +15,14 @@ package storage import ( - "io/ioutil" - "log" "os" - "path" "reflect" "testing" "time" "github.com/coreos/etcd/lease" "github.com/coreos/etcd/pkg/testutil" + "github.com/coreos/etcd/storage/backend" "github.com/coreos/etcd/storage/storagepb" ) @@ -74,24 +72,15 @@ var ( } return n, rev } - - tmpPath string ) -func init() { - tmpDir, err := ioutil.TempDir(os.TempDir(), "etcd_test_storage") - if err != nil { - log.Fatal(err) - } - tmpPath = path.Join(tmpDir, "database") -} - func TestKVRange(t *testing.T) { testKVRange(t, normalRangeFunc) } func TestKVTxnRange(t *testing.T) { testKVRange(t, txnRangeFunc) } func testKVRange(t *testing.T, f rangeFunc) { - s := newDefaultStore(tmpPath) - defer cleanup(s, tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(b) + defer cleanup(s, b, tmpPath) s.Put([]byte("foo"), []byte("bar"), 1) s.Put([]byte("foo1"), []byte("bar1"), 2) @@ -157,8 +146,9 @@ func TestKVRangeRev(t *testing.T) { testKVRangeRev(t, normalRangeFunc) } func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, normalRangeFunc) } func testKVRangeRev(t *testing.T, f rangeFunc) { - s := newDefaultStore(tmpPath) - defer cleanup(s, tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(b) + defer cleanup(s, b, tmpPath) s.Put([]byte("foo"), []byte("bar"), 1) s.Put([]byte("foo1"), []byte("bar1"), 2) @@ -199,8 +189,9 @@ func TestKVRangeBadRev(t *testing.T) { testKVRangeBadRev(t, normalRangeFunc) func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, normalRangeFunc) } func testKVRangeBadRev(t *testing.T, f rangeFunc) { - s := newDefaultStore(tmpPath) - defer cleanup(s, tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(b) + defer cleanup(s, b, tmpPath) s.Put([]byte("foo"), []byte("bar"), NoLease) s.Put([]byte("foo1"), []byte("bar1"), NoLease) @@ -231,8 +222,9 @@ func TestKVRangeLimit(t *testing.T) { testKVRangeLimit(t, normalRangeFunc) } func TestKVTxnRangeLimit(t *testing.T) { testKVRangeLimit(t, txnRangeFunc) } func testKVRangeLimit(t *testing.T, f rangeFunc) { - s := newDefaultStore(tmpPath) - defer cleanup(s, tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(b) + defer cleanup(s, b, tmpPath) s.Put([]byte("foo"), []byte("bar"), 1) s.Put([]byte("foo1"), []byte("bar1"), 2) @@ -275,8 +267,9 @@ func TestKVPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, normalP func TestKVTxnPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, txnPutFunc) } func testKVPutMultipleTimes(t *testing.T, f putFunc) { - s := newDefaultStore(tmpPath) - defer cleanup(s, tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(b) + defer cleanup(s, b, tmpPath) for i := 0; i < 10; i++ { base := int64(i + 1) @@ -336,7 +329,8 @@ func testKVDeleteRange(t *testing.T, f deleteRangeFunc) { } for i, tt := range tests { - s := newDefaultStore(tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(b) s.Put([]byte("foo"), []byte("bar"), NoLease) s.Put([]byte("foo1"), []byte("bar1"), NoLease) @@ -347,7 +341,7 @@ func testKVDeleteRange(t *testing.T, f deleteRangeFunc) { t.Errorf("#%d: n = %d, rev = %d, want (%d, %d)", i, n, rev, tt.wN, tt.wrev) } - cleanup(s, tmpPath) + cleanup(s, b, tmpPath) } } @@ -355,8 +349,9 @@ func TestKVDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, n func TestKVTxnDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, txnDeleteRangeFunc) } func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) { - s := newDefaultStore(tmpPath) - defer cleanup(s, tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(b) + defer cleanup(s, b, tmpPath) s.Put([]byte("foo"), []byte("bar"), NoLease) @@ -375,8 +370,9 @@ func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) { // test that range, put, delete on single key in sequence repeatedly works correctly. func TestKVOperationInSequence(t *testing.T) { - s := newDefaultStore(tmpPath) - defer cleanup(s, tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(b) + defer cleanup(s, b, tmpPath) for i := 0; i < 10; i++ { base := int64(i * 2) @@ -421,8 +417,9 @@ func TestKVOperationInSequence(t *testing.T) { } func TestKVTxnBlockNonTnxOperations(t *testing.T) { - s := newDefaultStore(tmpPath) - defer cleanup(s, tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(b) + defer cleanup(s, b, tmpPath) tests := []func(){ func() { s.Range([]byte("foo"), nil, 0, 0) }, @@ -452,8 +449,9 @@ func TestKVTxnBlockNonTnxOperations(t *testing.T) { } func TestKVTxnWrongID(t *testing.T) { - s := newDefaultStore(tmpPath) - defer cleanup(s, tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(b) + defer cleanup(s, b, tmpPath) id := s.TxnBegin() wrongid := id + 1 @@ -488,8 +486,9 @@ func TestKVTxnWrongID(t *testing.T) { // test that txn range, put, delete on single key in sequence repeatedly works correctly. func TestKVTnxOperationInSequence(t *testing.T) { - s := newDefaultStore(tmpPath) - defer cleanup(s, tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(b) + defer cleanup(s, b, tmpPath) for i := 0; i < 10; i++ { id := s.TxnBegin() @@ -543,8 +542,9 @@ func TestKVTnxOperationInSequence(t *testing.T) { } func TestKVCompactReserveLastValue(t *testing.T) { - s := newDefaultStore(tmpPath) - defer cleanup(s, tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(b) + defer cleanup(s, b, tmpPath) s.Put([]byte("foo"), []byte("bar0"), 1) s.Put([]byte("foo"), []byte("bar1"), 2) @@ -596,8 +596,9 @@ func TestKVCompactReserveLastValue(t *testing.T) { } func TestKVCompactBad(t *testing.T) { - s := newDefaultStore(tmpPath) - defer cleanup(s, tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(b) + defer cleanup(s, b, tmpPath) s.Put([]byte("foo"), []byte("bar0"), NoLease) s.Put([]byte("foo"), []byte("bar1"), NoLease) @@ -628,14 +629,15 @@ func TestKVHash(t *testing.T) { for i := 0; i < len(hashes); i++ { var err error - kv := newDefaultStore(tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + kv := NewStore(b) kv.Put([]byte("foo0"), []byte("bar0"), NoLease) kv.Put([]byte("foo1"), []byte("bar0"), NoLease) hashes[i], err = kv.Hash() if err != nil { t.Fatalf("failed to get hash: %v", err) } - cleanup(kv, tmpPath) + cleanup(kv, b, tmpPath) } for i := 1; i < len(hashes); i++ { @@ -664,7 +666,8 @@ func TestKVRestore(t *testing.T) { }, } for i, tt := range tests { - s := newDefaultStore(tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(b) tt(s) var kvss [][]storagepb.KeyValue for k := int64(0); k < 10; k++ { @@ -673,7 +676,7 @@ func TestKVRestore(t *testing.T) { } s.Close() - ns := newDefaultStore(tmpPath) + ns := NewStore(b) ns.Restore() // wait for possible compaction to finish testutil.WaitSchedule() @@ -682,7 +685,7 @@ func TestKVRestore(t *testing.T) { nkvs, _, _ := ns.Range([]byte("a"), []byte("z"), 0, k) nkvss = append(nkvss, nkvs) } - cleanup(ns, tmpPath) + cleanup(ns, b, tmpPath) if !reflect.DeepEqual(nkvss, kvss) { t.Errorf("#%d: kvs history = %+v, want %+v", i, nkvss, kvss) @@ -691,8 +694,9 @@ func TestKVRestore(t *testing.T) { } func TestKVSnapshot(t *testing.T) { - s := newDefaultStore(tmpPath) - defer cleanup(s, tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(b) + defer cleanup(s, b, tmpPath) s.Put([]byte("foo"), []byte("bar"), 1) s.Put([]byte("foo1"), []byte("bar1"), 2) @@ -715,8 +719,8 @@ func TestKVSnapshot(t *testing.T) { } f.Close() - ns := newDefaultStore("new_test") - defer cleanup(ns, "new_test") + ns := NewStore(b) + defer ns.Close() ns.Restore() kvs, rev, err := ns.Range([]byte("a"), []byte("z"), 0, 0) if err != nil { @@ -731,8 +735,9 @@ func TestKVSnapshot(t *testing.T) { } func TestWatchableKVWatch(t *testing.T) { - s := WatchableKV(newWatchableStore(tmpPath)) - defer cleanup(s, tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s := WatchableKV(newWatchableStore(b)) + defer cleanup(s, b, tmpPath) w := s.NewWatchStream() defer w.Close() @@ -842,7 +847,8 @@ func TestWatchableKVWatch(t *testing.T) { } } -func cleanup(s KV, path string) { +func cleanup(s KV, b backend.Backend, path string) { s.Close() + b.Close() os.Remove(path) } diff --git a/storage/kvstore.go b/storage/kvstore.go index d2c34b862..87b9aacbc 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -28,8 +28,6 @@ import ( ) var ( - batchLimit = 10000 - batchInterval = 100 * time.Millisecond keyBucketName = []byte("key") metaBucketName = []byte("meta") @@ -68,9 +66,11 @@ type store struct { stopc chan struct{} } -func NewStore(path string, bachInterval time.Duration, batchLimit int) KV { +// NewStore returns a new store. It is useful to create a store inside +// storage pkg. It should only be used for testing externally. +func NewStore(b backend.Backend) *store { s := &store{ - b: backend.New(path, batchInterval, batchLimit), + b: b, kvindex: newTreeIndex(), currentRev: revision{}, compactMainRev: -1, @@ -87,10 +87,6 @@ func NewStore(path string, bachInterval time.Duration, batchLimit int) KV { return s } -func newDefaultStore(path string) *store { - return (NewStore(path, batchInterval, batchLimit)).(*store) -} - func (s *store) Rev() int64 { s.mu.Lock() defer s.mu.Unlock() @@ -297,7 +293,7 @@ func (s *store) Restore() error { func (s *store) Close() error { close(s.stopc) s.wg.Wait() - return s.b.Close() + return nil } func (a *store) Equal(b *store) bool { diff --git a/storage/kvstore_bench_test.go b/storage/kvstore_bench_test.go index a4e58cfeb..1921edbf4 100644 --- a/storage/kvstore_bench_test.go +++ b/storage/kvstore_bench_test.go @@ -15,13 +15,15 @@ package storage import ( "log" - "os" "testing" + + "github.com/coreos/etcd/storage/backend" ) func BenchmarkStorePut(b *testing.B) { - s := newDefaultStore(tmpPath) - defer os.Remove(tmpPath) + be, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(be) + defer cleanup(s, be, tmpPath) // arbitrary number of bytes bytesN := 64 @@ -38,8 +40,9 @@ func BenchmarkStorePut(b *testing.B) { // with transaction begin and end, where transaction involves // some synchronization operations, such as mutex locking. func BenchmarkStoreTxnPut(b *testing.B) { - s := newDefaultStore(tmpPath) - defer cleanup(s, tmpPath) + be, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(be) + defer cleanup(s, be, tmpPath) // arbitrary number of bytes bytesN := 64 diff --git a/storage/kvstore_compaction_test.go b/storage/kvstore_compaction_test.go index c7e44210d..f4e613e40 100644 --- a/storage/kvstore_compaction_test.go +++ b/storage/kvstore_compaction_test.go @@ -17,6 +17,8 @@ package storage import ( "reflect" "testing" + + "github.com/coreos/etcd/storage/backend" ) func TestScheduleCompaction(t *testing.T) { @@ -58,7 +60,8 @@ func TestScheduleCompaction(t *testing.T) { }, } for i, tt := range tests { - s := newDefaultStore(tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(b) tx := s.b.BatchTx() tx.Lock() @@ -88,6 +91,6 @@ func TestScheduleCompaction(t *testing.T) { } tx.Unlock() - cleanup(s, tmpPath) + cleanup(s, b, tmpPath) } } diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index e8d47bc45..bf906c429 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -30,7 +30,8 @@ import ( ) func TestStoreRev(t *testing.T) { - s := newDefaultStore(tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(b) defer os.Remove(tmpPath) for i := 0; i < 3; i++ { @@ -358,7 +359,8 @@ func TestStoreRestore(t *testing.T) { } func TestRestoreContinueUnfinishedCompaction(t *testing.T) { - s0 := newDefaultStore(tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s0 := NewStore(b) defer os.Remove(tmpPath) s0.Put([]byte("foo"), []byte("bar"), NoLease) @@ -375,7 +377,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) { s0.Close() - s1 := newDefaultStore(tmpPath) + s1 := NewStore(b) s1.Restore() // wait for scheduled compaction to be finished @@ -413,8 +415,9 @@ func TestTxnPut(t *testing.T) { keys := createBytesSlice(bytesN, sliceN) vals := createBytesSlice(bytesN, sliceN) - s := newDefaultStore(tmpPath) - defer cleanup(s, tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(b) + defer cleanup(s, b, tmpPath) for i := 0; i < sliceN; i++ { id := s.TxnBegin() @@ -433,7 +436,8 @@ func TestTxnPut(t *testing.T) { } func TestTxnBlockBackendForceCommit(t *testing.T) { - s := newDefaultStore(tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(b) defer os.Remove(tmpPath) id := s.TxnBegin() diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 9b0ae2912..09f7ec39b 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -22,6 +22,7 @@ import ( "time" "github.com/coreos/etcd/lease" + "github.com/coreos/etcd/storage/backend" "github.com/coreos/etcd/storage/storagepb" ) @@ -58,9 +59,9 @@ type watchableStore struct { // cancel operations. type cancelFunc func() -func newWatchableStore(path string) *watchableStore { +func newWatchableStore(b backend.Backend) *watchableStore { s := &watchableStore{ - store: newDefaultStore(path), + store: NewStore(b), unsynced: make(map[*watcher]struct{}), synced: make(map[string]map[*watcher]struct{}), stopc: make(chan struct{}), diff --git a/storage/watchable_store_bench_test.go b/storage/watchable_store_bench_test.go index 3efee2267..2aeed52b8 100644 --- a/storage/watchable_store_bench_test.go +++ b/storage/watchable_store_bench_test.go @@ -18,6 +18,8 @@ import ( "math/rand" "os" "testing" + + "github.com/coreos/etcd/storage/backend" ) // Benchmarks on cancel function performance for unsynced watchers @@ -28,12 +30,15 @@ import ( // TODO: k is an arbitrary constant. We need to figure out what factor // we should put to simulate the real-world use cases. func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { + be, tmpPath := backend.NewDefaultTmpBackend() + s := NewStore(be) + // manually create watchableStore instead of newWatchableStore // because newWatchableStore periodically calls syncWatchersLoop // method to sync watchers in unsynced map. We want to keep watchers // in unsynced for this benchmark. - s := &watchableStore{ - store: newDefaultStore(tmpPath), + ws := &watchableStore{ + store: s, unsynced: make(map[*watcher]struct{}), // to make the test not crash from assigning to nil map. @@ -42,7 +47,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { } defer func() { - s.store.Close() + ws.store.Close() os.Remove(tmpPath) }() @@ -54,7 +59,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { testValue := []byte("bar") s.Put(testKey, testValue, NoLease) - w := s.NewWatchStream() + w := ws.NewWatchStream() const k int = 2 benchSampleN := b.N @@ -82,7 +87,9 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { } func BenchmarkWatchableStoreSyncedCancel(b *testing.B) { - s := newWatchableStore(tmpPath) + be, tmpPath := backend.NewDefaultTmpBackend() + s := newWatchableStore(be) + defer func() { s.store.Close() os.Remove(tmpPath) diff --git a/storage/watchable_store_test.go b/storage/watchable_store_test.go index 9b81a9275..0ea1e97cd 100644 --- a/storage/watchable_store_test.go +++ b/storage/watchable_store_test.go @@ -20,15 +20,19 @@ import ( "reflect" "testing" + "github.com/coreos/etcd/storage/backend" "github.com/coreos/etcd/storage/storagepb" ) func TestWatch(t *testing.T) { - s := newWatchableStore(tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s := newWatchableStore(b) + defer func() { s.store.Close() os.Remove(tmpPath) }() + testKey := []byte("foo") testValue := []byte("bar") s.Put(testKey, testValue, NoLease) @@ -43,7 +47,9 @@ func TestWatch(t *testing.T) { } func TestNewWatcherCancel(t *testing.T) { - s := newWatchableStore(tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s := newWatchableStore(b) + defer func() { s.store.Close() os.Remove(tmpPath) @@ -67,12 +73,14 @@ func TestNewWatcherCancel(t *testing.T) { // TestCancelUnsynced tests if running CancelFunc removes watchers from unsynced. func TestCancelUnsynced(t *testing.T) { + b, tmpPath := backend.NewDefaultTmpBackend() + // manually create watchableStore instead of newWatchableStore // because newWatchableStore automatically calls syncWatchers // method to sync watchers in unsynced map. We want to keep watchers // in unsynced to test if syncWatchers works as expected. s := &watchableStore{ - store: newDefaultStore(tmpPath), + store: NewStore(b), unsynced: make(map[*watcher]struct{}), // to make the test not crash from assigning to nil map. @@ -124,8 +132,10 @@ func TestCancelUnsynced(t *testing.T) { // method to see if it correctly sends events to channel of unsynced watchers // and moves these watchers to synced. func TestSyncWatchers(t *testing.T) { + b, tmpPath := backend.NewDefaultTmpBackend() + s := &watchableStore{ - store: newDefaultStore(tmpPath), + store: NewStore(b), unsynced: make(map[*watcher]struct{}), synced: make(map[string]map[*watcher]struct{}), } @@ -205,7 +215,8 @@ func TestSyncWatchers(t *testing.T) { } func TestUnsafeAddWatcher(t *testing.T) { - s := newWatchableStore(tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s := newWatchableStore(b) defer func() { s.store.Close() os.Remove(tmpPath) diff --git a/storage/watcher_bench_test.go b/storage/watcher_bench_test.go index 7b3cd9cbc..318ab7ea1 100644 --- a/storage/watcher_bench_test.go +++ b/storage/watcher_bench_test.go @@ -17,11 +17,15 @@ package storage import ( "fmt" "testing" + + "github.com/coreos/etcd/storage/backend" ) func BenchmarkKVWatcherMemoryUsage(b *testing.B) { - watchable := newWatchableStore(tmpPath) - defer cleanup(watchable, tmpPath) + be, tmpPath := backend.NewDefaultTmpBackend() + watchable := newWatchableStore(be) + + defer cleanup(watchable, be, tmpPath) w := watchable.NewWatchStream() diff --git a/storage/watcher_test.go b/storage/watcher_test.go index 71e409ef1..0a26add1e 100644 --- a/storage/watcher_test.go +++ b/storage/watcher_test.go @@ -14,13 +14,18 @@ package storage -import "testing" +import ( + "testing" + + "github.com/coreos/etcd/storage/backend" +) // TestWatcherWatchID tests that each watcher provides unique watchID, // and the watched event attaches the correct watchID. func TestWatcherWatchID(t *testing.T) { - s := WatchableKV(newWatchableStore(tmpPath)) - defer cleanup(s, tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s := WatchableKV(newWatchableStore(b)) + defer cleanup(s, b, tmpPath) w := s.NewWatchStream() defer w.Close() @@ -70,8 +75,9 @@ func TestWatcherWatchID(t *testing.T) { // TestWatchStreamCancel ensures cancel calls the cancel func of the watcher // with given id inside watchStream. func TestWatchStreamCancelWatcherByID(t *testing.T) { - s := WatchableKV(newWatchableStore(tmpPath)) - defer cleanup(s, tmpPath) + b, tmpPath := backend.NewDefaultTmpBackend() + s := WatchableKV(newWatchableStore(b)) + defer cleanup(s, b, tmpPath) w := s.NewWatchStream() defer w.Close() diff --git a/tools/benchmark/cmd/storage.go b/tools/benchmark/cmd/storage.go index 813e6ba65..5421dc82b 100644 --- a/tools/benchmark/cmd/storage.go +++ b/tools/benchmark/cmd/storage.go @@ -20,6 +20,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra" "github.com/coreos/etcd/storage" + "github.com/coreos/etcd/storage/backend" ) var ( @@ -30,7 +31,8 @@ var ( ) func initStorage() { - s = storage.NewStore("storage-bench", time.Duration(batchInterval), batchLimit) + be := backend.New("storage-bench", time.Duration(batchInterval), batchLimit) + s = storage.NewStore(be) os.Remove("storage-bench") // boltDB has an opened fd, so removing the file is ok }