Mirror of https://github.com/etcd-io/etcd.git (synced 2024-09-27 06:25:44 +00:00)
*: make backend outside kv
KV and lease will share the same backend, so the backend needs to be created outside of KV.
parent 1e61243fd7
commit 5dd3f91903
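
In short: the caller now constructs a backend.Backend first and hands it to the storage layer, instead of the KV opening its own bolt file from a path. A minimal sketch of the new wiring (not part of the commit; the helper name openV3Storage and the "db" filename are illustrative, and it assumes imports of "path", dstorage "github.com/coreos/etcd/storage", and "github.com/coreos/etcd/storage/backend"):

	// openV3Storage mirrors the NewServer change below: the backend is
	// created by the caller and passed into the KV, so the same backend
	// can later be shared with the lease package.
	func openV3Storage(snapDir string, ci dstorage.ConsistentIndexGetter) (dstorage.ConsistentWatchableKV, backend.Backend, error) {
		be := backend.NewDefaultBackend(path.Join(snapDir, "db")) // filename is illustrative
		kv := dstorage.New(be, ci)
		if err := kv.Restore(); err != nil {
			be.Close()
			return nil, nil, err
		}
		return kv, be, nil
	}

Since the store no longer owns the backend, the caller is also responsible for closing it; see the note after the store Close change further down.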
@@ -47,6 +47,7 @@ import (
 	"github.com/coreos/etcd/rafthttp"
 	"github.com/coreos/etcd/snap"
 	dstorage "github.com/coreos/etcd/storage"
+	"github.com/coreos/etcd/storage/backend"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/version"
 	"github.com/coreos/etcd/wal"
@@ -358,7 +359,8 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 	}

 	if cfg.V3demo {
-		srv.kv = dstorage.New(path.Join(cfg.SnapDir(), databaseFilename), &srv.consistIndex)
+		be := backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename))
+		srv.kv = dstorage.New(be, &srv.consistIndex)
 		if err := srv.kv.Restore(); err != nil {
 			plog.Fatalf("v3 storage restore error: %v", err)
 		}
@@ -583,7 +585,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 		plog.Panicf("rename snapshot file error: %v", err)
 	}

-	newKV := dstorage.New(fn, &s.consistIndex)
+	newbe := backend.NewDefaultBackend(fn)
+	newKV := dstorage.New(newbe, &s.consistIndex)
 	if err := newKV.Restore(); err != nil {
 		plog.Panicf("restore KV error: %v", err)
 	}
@@ -36,6 +36,7 @@ import (
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/rafthttp"
 	dstorage "github.com/coreos/etcd/storage"
+	"github.com/coreos/etcd/storage/backend"
 	"github.com/coreos/etcd/store"
 )
@@ -864,9 +865,12 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
 		msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
 	}

-	s.kv = dstorage.New(
-		path.Join(testdir, "testdb.db"),
-		&s.consistIndex)
+	be, tmpPath := backend.NewDefaultTmpBackend()
+	defer func() {
+		be.Close()
+		os.RemoveAll(tmpPath)
+	}()
+	s.kv = dstorage.New(be, &s.consistIndex)

 	s.start()
 	defer s.Stop()
@@ -18,13 +18,21 @@ import (
 	"fmt"
 	"hash/crc32"
 	"io"
+	"io/ioutil"
 	"log"
+	"os"
+	"path"
 	"sync/atomic"
 	"time"

 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/boltdb/bolt"
 )

+var (
+	defaultBatchLimit    = 10000
+	defaultBatchInterval = 100 * time.Millisecond
+)
+
 type Backend interface {
 	BatchTx() BatchTx
 	Snapshot() Snapshot
@@ -60,6 +68,10 @@ func New(path string, d time.Duration, limit int) Backend {
 	return newBackend(path, d, limit)
 }

+func NewDefaultBackend(path string) Backend {
+	return newBackend(path, defaultBatchInterval, defaultBatchLimit)
+}
+
 func newBackend(path string, d time.Duration, limit int) *backend {
 	db, err := bolt.Open(path, 0600, boltOpenOptions)
 	if err != nil {
@@ -151,6 +163,20 @@ func (b *backend) Close() error {
 	return b.db.Close()
 }

+// NewTmpBackend creates a backend implementation for testing.
+func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) {
+	dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")
+	if err != nil {
+		log.Fatal(err)
+	}
+	tmpPath := path.Join(dir, "database")
+	return newBackend(tmpPath, batchInterval, batchLimit), tmpPath
+}
+
+func NewDefaultTmpBackend() (*backend, string) {
+	return NewTmpBackend(defaultBatchInterval, defaultBatchLimit)
+}
+
 type snapshot struct {
 	*bolt.Tx
 }
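The two helpers added above give a test a throwaway bolt-backed backend plus the file path to delete afterwards. A sketch of the intended pattern (hypothetical test name; the test conversions below follow the same shape):

	func TestSomething(t *testing.T) {
		b, tmpPath := NewTmpBackend(time.Hour, 10000) // or NewDefaultTmpBackend()
		defer func() {
			b.Close()
			os.Remove(tmpPath)
		}()

		tx := b.BatchTx()
		tx.Lock()
		// ... exercise the backend under test ...
		tx.Unlock()
	}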
@@ -16,9 +16,7 @@ package backend

 import (
-	"io/ioutil"
-	"log"
 	"os"
 	"path"
 	"testing"
 	"time"

@@ -26,18 +24,8 @@ import (
 	"github.com/coreos/etcd/pkg/testutil"
 )

-var tmpPath string
-
-func init() {
-	dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")
-	if err != nil {
-		log.Fatal(err)
-	}
-	tmpPath = path.Join(dir, "database")
-}
-
 func TestBackendClose(t *testing.T) {
-	b := newBackend(tmpPath, time.Hour, 10000)
+	b, tmpPath := NewTmpBackend(time.Hour, 10000)
 	defer os.Remove(tmpPath)

 	// check close could work
@@ -57,7 +45,7 @@ func TestBackendClose(t *testing.T) {
 }

 func TestBackendSnapshot(t *testing.T) {
-	b := New(tmpPath, time.Hour, 10000)
+	b, tmpPath := NewTmpBackend(time.Hour, 10000)
 	defer cleanup(b, tmpPath)

 	tx := b.BatchTx()
@@ -93,8 +81,9 @@ func TestBackendSnapshot(t *testing.T) {
 }

 func TestBackendBatchIntervalCommit(t *testing.T) {
-	// start backend with super short batch interval
-	b := newBackend(tmpPath, time.Nanosecond, 10000)
+	// start backend with super short batch interval so
+	// we do not need to wait long before commit to happen.
+	b, tmpPath := NewTmpBackend(time.Nanosecond, 10000)
 	defer cleanup(b, tmpPath)

 	tx := b.BatchTx()
@@ -23,7 +23,7 @@ import (
 )

 func TestBatchTxPut(t *testing.T) {
-	b := newBackend(tmpPath, time.Hour, 10000)
+	b, tmpPath := NewTmpBackend(time.Hour, 10000)
 	defer cleanup(b, tmpPath)

 	tx := b.batchTx
@@ -48,7 +48,7 @@ func TestBatchTxPut(t *testing.T) {
 }

 func TestBatchTxRange(t *testing.T) {
-	b := newBackend(tmpPath, time.Hour, 10000)
+	b, tmpPath := NewTmpBackend(time.Hour, 10000)
 	defer cleanup(b, tmpPath)

 	tx := b.batchTx
@@ -119,7 +119,7 @@ func TestBatchTxRange(t *testing.T) {
 }

 func TestBatchTxDelete(t *testing.T) {
-	b := newBackend(tmpPath, time.Hour, 10000)
+	b, tmpPath := NewTmpBackend(time.Hour, 10000)
 	defer cleanup(b, tmpPath)

 	tx := b.batchTx
@@ -142,7 +142,7 @@ func TestBatchTxDelete(t *testing.T) {
 }

 func TestBatchTxCommit(t *testing.T) {
-	b := newBackend(tmpPath, time.Hour, 10000)
+	b, tmpPath := NewTmpBackend(time.Hour, 10000)
 	defer cleanup(b, tmpPath)

 	tx := b.batchTx
@@ -169,8 +169,9 @@ func TestBatchTxCommit(t *testing.T) {
 }

 func TestBatchTxBatchLimitCommit(t *testing.T) {
-	// start backend with batch limit 1
-	b := newBackend(tmpPath, time.Hour, 1)
+	// start backend with batch limit 1 so one write can
+	// trigger a commit
+	b, tmpPath := NewTmpBackend(time.Hour, 1)
 	defer cleanup(b, tmpPath)

 	tx := b.batchTx
@@ -19,6 +19,7 @@ import (
 	"log"

 	"github.com/coreos/etcd/lease"
+	"github.com/coreos/etcd/storage/backend"
 	"github.com/coreos/etcd/storage/storagepb"
 )

@@ -46,16 +47,15 @@ type consistentWatchableStore struct {
 	skip bool // indicate whether or not to skip an operation
 }

-func New(path string, ig ConsistentIndexGetter) ConsistentWatchableKV {
-	return newConsistentWatchableStore(path, ig)
+func New(b backend.Backend, ig ConsistentIndexGetter) ConsistentWatchableKV {
+	return newConsistentWatchableStore(b, ig)
 }

-// newConsistentWatchableStore creates a new consistentWatchableStore
-// using the file at the given path.
-// If the file at the given path does not exist then it will be created automatically.
-func newConsistentWatchableStore(path string, ig ConsistentIndexGetter) *consistentWatchableStore {
+// newConsistentWatchableStore creates a new consistentWatchableStore with the give
+// backend.
+func newConsistentWatchableStore(b backend.Backend, ig ConsistentIndexGetter) *consistentWatchableStore {
 	return &consistentWatchableStore{
-		watchableStore: newWatchableStore(path),
+		watchableStore: newWatchableStore(b),
 		ig:             ig,
 	}
 }
@@ -14,7 +14,11 @@

 package storage

-import "testing"
+import (
+	"testing"
+
+	"github.com/coreos/etcd/storage/backend"
+)

 type indexVal uint64

@@ -22,8 +26,9 @@ func (v *indexVal) ConsistentIndex() uint64 { return uint64(*v) }

 func TestConsistentWatchableStoreConsistentIndex(t *testing.T) {
 	var idx indexVal
-	s := newConsistentWatchableStore(tmpPath, &idx)
-	defer cleanup(s, tmpPath)
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := newConsistentWatchableStore(b, &idx)
+	defer cleanup(s, b, tmpPath)

 	tests := []uint64{1, 2, 3, 5, 10}
 	for i, tt := range tests {
@@ -41,8 +46,9 @@ func TestConsistentWatchableStoreConsistentIndex(t *testing.T) {

 func TestConsistentWatchableStoreSkip(t *testing.T) {
 	idx := indexVal(5)
-	s := newConsistentWatchableStore(tmpPath, &idx)
-	defer cleanup(s, tmpPath)
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := newConsistentWatchableStore(b, &idx)
+	defer cleanup(s, b, tmpPath)

 	s.Put([]byte("foo"), []byte("bar"), NoLease)
@@ -15,16 +15,14 @@
 package storage

 import (
-	"io/ioutil"
-	"log"
 	"os"
-	"path"
 	"reflect"
 	"testing"
 	"time"

 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/pkg/testutil"
+	"github.com/coreos/etcd/storage/backend"
 	"github.com/coreos/etcd/storage/storagepb"
 )
@@ -74,24 +72,15 @@
 		}
 		return n, rev
 	}
-
-	tmpPath string
 )

-func init() {
-	tmpDir, err := ioutil.TempDir(os.TempDir(), "etcd_test_storage")
-	if err != nil {
-		log.Fatal(err)
-	}
-	tmpPath = path.Join(tmpDir, "database")
-}
-
 func TestKVRange(t *testing.T)    { testKVRange(t, normalRangeFunc) }
 func TestKVTxnRange(t *testing.T) { testKVRange(t, txnRangeFunc) }

 func testKVRange(t *testing.T, f rangeFunc) {
-	s := newDefaultStore(tmpPath)
-	defer cleanup(s, tmpPath)
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := NewStore(b)
+	defer cleanup(s, b, tmpPath)

 	s.Put([]byte("foo"), []byte("bar"), 1)
 	s.Put([]byte("foo1"), []byte("bar1"), 2)
@@ -157,8 +146,9 @@ func TestKVRangeRev(t *testing.T) { testKVRangeRev(t, normalRangeFunc) }
 func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, normalRangeFunc) }

 func testKVRangeRev(t *testing.T, f rangeFunc) {
-	s := newDefaultStore(tmpPath)
-	defer cleanup(s, tmpPath)
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := NewStore(b)
+	defer cleanup(s, b, tmpPath)

 	s.Put([]byte("foo"), []byte("bar"), 1)
 	s.Put([]byte("foo1"), []byte("bar1"), 2)
@@ -199,8 +189,9 @@ func TestKVRangeBadRev(t *testing.T) { testKVRangeBadRev(t, normalRangeFunc) }
 func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, normalRangeFunc) }

 func testKVRangeBadRev(t *testing.T, f rangeFunc) {
-	s := newDefaultStore(tmpPath)
-	defer cleanup(s, tmpPath)
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := NewStore(b)
+	defer cleanup(s, b, tmpPath)

 	s.Put([]byte("foo"), []byte("bar"), NoLease)
 	s.Put([]byte("foo1"), []byte("bar1"), NoLease)
@@ -231,8 +222,9 @@ func TestKVRangeLimit(t *testing.T) { testKVRangeLimit(t, normalRangeFunc) }
 func TestKVTxnRangeLimit(t *testing.T) { testKVRangeLimit(t, txnRangeFunc) }

 func testKVRangeLimit(t *testing.T, f rangeFunc) {
-	s := newDefaultStore(tmpPath)
-	defer cleanup(s, tmpPath)
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := NewStore(b)
+	defer cleanup(s, b, tmpPath)

 	s.Put([]byte("foo"), []byte("bar"), 1)
 	s.Put([]byte("foo1"), []byte("bar1"), 2)
@@ -275,8 +267,9 @@ func TestKVPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, normalPutFunc) }
 func TestKVTxnPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, txnPutFunc) }

 func testKVPutMultipleTimes(t *testing.T, f putFunc) {
-	s := newDefaultStore(tmpPath)
-	defer cleanup(s, tmpPath)
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := NewStore(b)
+	defer cleanup(s, b, tmpPath)

 	for i := 0; i < 10; i++ {
 		base := int64(i + 1)
@@ -336,7 +329,8 @@ func testKVDeleteRange(t *testing.T, f deleteRangeFunc) {
 	}

 	for i, tt := range tests {
-		s := newDefaultStore(tmpPath)
+		b, tmpPath := backend.NewDefaultTmpBackend()
+		s := NewStore(b)

 		s.Put([]byte("foo"), []byte("bar"), NoLease)
 		s.Put([]byte("foo1"), []byte("bar1"), NoLease)
@@ -347,7 +341,7 @@ func testKVDeleteRange(t *testing.T, f deleteRangeFunc) {
 			t.Errorf("#%d: n = %d, rev = %d, want (%d, %d)", i, n, rev, tt.wN, tt.wrev)
 		}

-		cleanup(s, tmpPath)
+		cleanup(s, b, tmpPath)
 	}
 }
@@ -355,8 +349,9 @@ func TestKVDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, normalDeleteRangeFunc) }
 func TestKVTxnDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, txnDeleteRangeFunc) }

 func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
-	s := newDefaultStore(tmpPath)
-	defer cleanup(s, tmpPath)
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := NewStore(b)
+	defer cleanup(s, b, tmpPath)

 	s.Put([]byte("foo"), []byte("bar"), NoLease)

@@ -375,8 +370,9 @@ func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {

 // test that range, put, delete on single key in sequence repeatedly works correctly.
 func TestKVOperationInSequence(t *testing.T) {
-	s := newDefaultStore(tmpPath)
-	defer cleanup(s, tmpPath)
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := NewStore(b)
+	defer cleanup(s, b, tmpPath)

 	for i := 0; i < 10; i++ {
 		base := int64(i * 2)
@@ -421,8 +417,9 @@ func TestKVOperationInSequence(t *testing.T) {
 }

 func TestKVTxnBlockNonTnxOperations(t *testing.T) {
-	s := newDefaultStore(tmpPath)
-	defer cleanup(s, tmpPath)
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := NewStore(b)
+	defer cleanup(s, b, tmpPath)

 	tests := []func(){
 		func() { s.Range([]byte("foo"), nil, 0, 0) },
@@ -452,8 +449,9 @@ func TestKVTxnBlockNonTnxOperations(t *testing.T) {
 }

 func TestKVTxnWrongID(t *testing.T) {
-	s := newDefaultStore(tmpPath)
-	defer cleanup(s, tmpPath)
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := NewStore(b)
+	defer cleanup(s, b, tmpPath)

 	id := s.TxnBegin()
 	wrongid := id + 1
@@ -488,8 +486,9 @@ func TestKVTxnWrongID(t *testing.T) {

 // test that txn range, put, delete on single key in sequence repeatedly works correctly.
 func TestKVTnxOperationInSequence(t *testing.T) {
-	s := newDefaultStore(tmpPath)
-	defer cleanup(s, tmpPath)
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := NewStore(b)
+	defer cleanup(s, b, tmpPath)

 	for i := 0; i < 10; i++ {
 		id := s.TxnBegin()
@@ -543,8 +542,9 @@ func TestKVTnxOperationInSequence(t *testing.T) {
 }

 func TestKVCompactReserveLastValue(t *testing.T) {
-	s := newDefaultStore(tmpPath)
-	defer cleanup(s, tmpPath)
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := NewStore(b)
+	defer cleanup(s, b, tmpPath)

 	s.Put([]byte("foo"), []byte("bar0"), 1)
 	s.Put([]byte("foo"), []byte("bar1"), 2)
@@ -596,8 +596,9 @@ func TestKVCompactReserveLastValue(t *testing.T) {
 }

 func TestKVCompactBad(t *testing.T) {
-	s := newDefaultStore(tmpPath)
-	defer cleanup(s, tmpPath)
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := NewStore(b)
+	defer cleanup(s, b, tmpPath)

 	s.Put([]byte("foo"), []byte("bar0"), NoLease)
 	s.Put([]byte("foo"), []byte("bar1"), NoLease)
@@ -628,14 +629,15 @@ func TestKVHash(t *testing.T) {

 	for i := 0; i < len(hashes); i++ {
 		var err error
-		kv := newDefaultStore(tmpPath)
+		b, tmpPath := backend.NewDefaultTmpBackend()
+		kv := NewStore(b)
 		kv.Put([]byte("foo0"), []byte("bar0"), NoLease)
 		kv.Put([]byte("foo1"), []byte("bar0"), NoLease)
 		hashes[i], err = kv.Hash()
 		if err != nil {
 			t.Fatalf("failed to get hash: %v", err)
 		}
-		cleanup(kv, tmpPath)
+		cleanup(kv, b, tmpPath)
 	}

 	for i := 1; i < len(hashes); i++ {
@@ -664,7 +666,8 @@ func TestKVRestore(t *testing.T) {
 		},
 	}
 	for i, tt := range tests {
-		s := newDefaultStore(tmpPath)
+		b, tmpPath := backend.NewDefaultTmpBackend()
+		s := NewStore(b)
 		tt(s)
 		var kvss [][]storagepb.KeyValue
 		for k := int64(0); k < 10; k++ {
@@ -673,7 +676,7 @@ func TestKVRestore(t *testing.T) {
 		}
 		s.Close()

-		ns := newDefaultStore(tmpPath)
+		ns := NewStore(b)
 		ns.Restore()
 		// wait for possible compaction to finish
 		testutil.WaitSchedule()
@@ -682,7 +685,7 @@ func TestKVRestore(t *testing.T) {
 			nkvs, _, _ := ns.Range([]byte("a"), []byte("z"), 0, k)
 			nkvss = append(nkvss, nkvs)
 		}
-		cleanup(ns, tmpPath)
+		cleanup(ns, b, tmpPath)

 		if !reflect.DeepEqual(nkvss, kvss) {
 			t.Errorf("#%d: kvs history = %+v, want %+v", i, nkvss, kvss)
@@ -691,8 +694,9 @@ func TestKVRestore(t *testing.T) {
 }

 func TestKVSnapshot(t *testing.T) {
-	s := newDefaultStore(tmpPath)
-	defer cleanup(s, tmpPath)
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := NewStore(b)
+	defer cleanup(s, b, tmpPath)

 	s.Put([]byte("foo"), []byte("bar"), 1)
 	s.Put([]byte("foo1"), []byte("bar1"), 2)
@@ -715,8 +719,8 @@ func TestKVSnapshot(t *testing.T) {
 	}
 	f.Close()

-	ns := newDefaultStore("new_test")
-	defer cleanup(ns, "new_test")
+	ns := NewStore(b)
+	defer ns.Close()
 	ns.Restore()
 	kvs, rev, err := ns.Range([]byte("a"), []byte("z"), 0, 0)
 	if err != nil {
@@ -731,8 +735,9 @@ func TestKVSnapshot(t *testing.T) {
 }

 func TestWatchableKVWatch(t *testing.T) {
-	s := WatchableKV(newWatchableStore(tmpPath))
-	defer cleanup(s, tmpPath)
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := WatchableKV(newWatchableStore(b))
+	defer cleanup(s, b, tmpPath)

 	w := s.NewWatchStream()
 	defer w.Close()
@@ -842,7 +847,8 @@ func TestWatchableKVWatch(t *testing.T) {
 	}
 }

-func cleanup(s KV, path string) {
+func cleanup(s KV, b backend.Backend, path string) {
 	s.Close()
+	b.Close()
 	os.Remove(path)
 }
@@ -28,8 +28,6 @@ import (
 )

 var (
-	batchLimit     = 10000
-	batchInterval  = 100 * time.Millisecond
 	keyBucketName  = []byte("key")
 	metaBucketName = []byte("meta")

@@ -68,9 +66,11 @@ type store struct {
 	stopc chan struct{}
 }

-func NewStore(path string, bachInterval time.Duration, batchLimit int) KV {
+// NewStore returns a new store. It is useful to create a store inside
+// storage pkg. It should only be used for testing externally.
+func NewStore(b backend.Backend) *store {
 	s := &store{
-		b:       backend.New(path, batchInterval, batchLimit),
+		b:       b,
 		kvindex: newTreeIndex(),
 		currentRev:     revision{},
 		compactMainRev: -1,
@@ -87,10 +87,6 @@ func NewStore(path string, bachInterval time.Duration, batchLimit int) KV {
 	return s
 }

-func newDefaultStore(path string) *store {
-	return (NewStore(path, batchInterval, batchLimit)).(*store)
-}
-
 func (s *store) Rev() int64 {
 	s.mu.Lock()
 	defer s.mu.Unlock()
@@ -297,7 +293,7 @@ func (s *store) Restore() error {
 func (s *store) Close() error {
 	close(s.stopc)
 	s.wg.Wait()
-	return s.b.Close()
+	return nil
 }

 func (a *store) Equal(b *store) bool {
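One behavioral consequence of the hunks above: store.Close now stops only the store's own goroutines and returns nil; it no longer closes the bolt database, since the store does not own the backend it was given. Shutdown order therefore becomes the caller's job, as the reworked cleanup helper in the storage tests illustrates (a sketch of that order):

	s.Close()          // stop the store; the shared backend stays open
	b.Close()          // then close the backend explicitly
	os.Remove(tmpPath) // tests finally delete the database file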
@@ -15,13 +15,15 @@ package storage

 import (
 	"log"
 	"os"
 	"testing"

+	"github.com/coreos/etcd/storage/backend"
 )

 func BenchmarkStorePut(b *testing.B) {
-	s := newDefaultStore(tmpPath)
-	defer os.Remove(tmpPath)
+	be, tmpPath := backend.NewDefaultTmpBackend()
+	s := NewStore(be)
+	defer cleanup(s, be, tmpPath)

 	// arbitrary number of bytes
 	bytesN := 64
@@ -38,8 +40,9 @@ func BenchmarkStorePut(b *testing.B) {
 // with transaction begin and end, where transaction involves
 // some synchronization operations, such as mutex locking.
 func BenchmarkStoreTxnPut(b *testing.B) {
-	s := newDefaultStore(tmpPath)
-	defer cleanup(s, tmpPath)
+	be, tmpPath := backend.NewDefaultTmpBackend()
+	s := NewStore(be)
+	defer cleanup(s, be, tmpPath)

 	// arbitrary number of bytes
 	bytesN := 64
@@ -17,6 +17,8 @@ package storage

 import (
 	"reflect"
 	"testing"
+
+	"github.com/coreos/etcd/storage/backend"
 )

 func TestScheduleCompaction(t *testing.T) {
@@ -58,7 +60,8 @@ func TestScheduleCompaction(t *testing.T) {
 		},
 	}
 	for i, tt := range tests {
-		s := newDefaultStore(tmpPath)
+		b, tmpPath := backend.NewDefaultTmpBackend()
+		s := NewStore(b)
 		tx := s.b.BatchTx()

 		tx.Lock()
@@ -88,6 +91,6 @@ func TestScheduleCompaction(t *testing.T) {
 		}
 		tx.Unlock()

-		cleanup(s, tmpPath)
+		cleanup(s, b, tmpPath)
 	}
 }
@@ -30,7 +30,8 @@ import (
 )

 func TestStoreRev(t *testing.T) {
-	s := newDefaultStore(tmpPath)
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := NewStore(b)
 	defer os.Remove(tmpPath)

 	for i := 0; i < 3; i++ {
@@ -358,7 +359,8 @@ func TestStoreRestore(t *testing.T) {
 }

 func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
-	s0 := newDefaultStore(tmpPath)
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s0 := NewStore(b)
 	defer os.Remove(tmpPath)

 	s0.Put([]byte("foo"), []byte("bar"), NoLease)
@@ -375,7 +377,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {

 	s0.Close()

-	s1 := newDefaultStore(tmpPath)
+	s1 := NewStore(b)
 	s1.Restore()

 	// wait for scheduled compaction to be finished
@@ -413,8 +415,9 @@ func TestTxnPut(t *testing.T) {
 	keys := createBytesSlice(bytesN, sliceN)
 	vals := createBytesSlice(bytesN, sliceN)

-	s := newDefaultStore(tmpPath)
-	defer cleanup(s, tmpPath)
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := NewStore(b)
+	defer cleanup(s, b, tmpPath)

 	for i := 0; i < sliceN; i++ {
 		id := s.TxnBegin()
@@ -433,7 +436,8 @@ func TestTxnPut(t *testing.T) {
 }

 func TestTxnBlockBackendForceCommit(t *testing.T) {
-	s := newDefaultStore(tmpPath)
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := NewStore(b)
 	defer os.Remove(tmpPath)

 	id := s.TxnBegin()
@@ -22,6 +22,7 @@ import (
 	"time"

 	"github.com/coreos/etcd/lease"
+	"github.com/coreos/etcd/storage/backend"
 	"github.com/coreos/etcd/storage/storagepb"
 )

@@ -58,9 +59,9 @@ type watchableStore struct {
 // cancel operations.
 type cancelFunc func()

-func newWatchableStore(path string) *watchableStore {
+func newWatchableStore(b backend.Backend) *watchableStore {
 	s := &watchableStore{
-		store:    newDefaultStore(path),
+		store:    NewStore(b),
 		unsynced: make(map[*watcher]struct{}),
 		synced:   make(map[string]map[*watcher]struct{}),
 		stopc:    make(chan struct{}),
@@ -18,6 +18,8 @@ import (
 	"math/rand"
 	"os"
 	"testing"
+
+	"github.com/coreos/etcd/storage/backend"
 )

 // Benchmarks on cancel function performance for unsynced watchers
@@ -28,12 +30,15 @@
 // TODO: k is an arbitrary constant. We need to figure out what factor
 // we should put to simulate the real-world use cases.
 func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
+	be, tmpPath := backend.NewDefaultTmpBackend()
+	s := NewStore(be)
+
 	// manually create watchableStore instead of newWatchableStore
 	// because newWatchableStore periodically calls syncWatchersLoop
 	// method to sync watchers in unsynced map. We want to keep watchers
 	// in unsynced for this benchmark.
-	s := &watchableStore{
-		store:    newDefaultStore(tmpPath),
+	ws := &watchableStore{
+		store:    s,
 		unsynced: make(map[*watcher]struct{}),

 		// to make the test not crash from assigning to nil map.
@@ -42,7 +47,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 	}

 	defer func() {
-		s.store.Close()
+		ws.store.Close()
 		os.Remove(tmpPath)
 	}()

@@ -54,7 +59,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 	testValue := []byte("bar")
 	s.Put(testKey, testValue, NoLease)

-	w := s.NewWatchStream()
+	w := ws.NewWatchStream()

 	const k int = 2
 	benchSampleN := b.N
@@ -82,7 +87,9 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
 }

 func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
-	s := newWatchableStore(tmpPath)
+	be, tmpPath := backend.NewDefaultTmpBackend()
+	s := newWatchableStore(be)

 	defer func() {
 		s.store.Close()
 		os.Remove(tmpPath)
@@ -20,15 +20,19 @@ import (
 	"reflect"
 	"testing"

+	"github.com/coreos/etcd/storage/backend"
 	"github.com/coreos/etcd/storage/storagepb"
 )

 func TestWatch(t *testing.T) {
-	s := newWatchableStore(tmpPath)
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := newWatchableStore(b)

 	defer func() {
 		s.store.Close()
 		os.Remove(tmpPath)
 	}()

 	testKey := []byte("foo")
 	testValue := []byte("bar")
 	s.Put(testKey, testValue, NoLease)
@@ -43,7 +47,9 @@ func TestWatch(t *testing.T) {
 }

 func TestNewWatcherCancel(t *testing.T) {
-	s := newWatchableStore(tmpPath)
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := newWatchableStore(b)

 	defer func() {
 		s.store.Close()
 		os.Remove(tmpPath)
@@ -67,12 +73,14 @@ func TestNewWatcherCancel(t *testing.T) {

 // TestCancelUnsynced tests if running CancelFunc removes watchers from unsynced.
 func TestCancelUnsynced(t *testing.T) {
+	b, tmpPath := backend.NewDefaultTmpBackend()
+
 	// manually create watchableStore instead of newWatchableStore
 	// because newWatchableStore automatically calls syncWatchers
 	// method to sync watchers in unsynced map. We want to keep watchers
 	// in unsynced to test if syncWatchers works as expected.
 	s := &watchableStore{
-		store:    newDefaultStore(tmpPath),
+		store:    NewStore(b),
 		unsynced: make(map[*watcher]struct{}),

 		// to make the test not crash from assigning to nil map.
@@ -124,8 +132,10 @@ func TestCancelUnsynced(t *testing.T) {
 // method to see if it correctly sends events to channel of unsynced watchers
 // and moves these watchers to synced.
 func TestSyncWatchers(t *testing.T) {
+	b, tmpPath := backend.NewDefaultTmpBackend()
+
 	s := &watchableStore{
-		store:    newDefaultStore(tmpPath),
+		store:    NewStore(b),
 		unsynced: make(map[*watcher]struct{}),
 		synced:   make(map[string]map[*watcher]struct{}),
 	}
@@ -205,7 +215,8 @@ func TestSyncWatchers(t *testing.T) {
 }

 func TestUnsafeAddWatcher(t *testing.T) {
-	s := newWatchableStore(tmpPath)
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := newWatchableStore(b)
 	defer func() {
 		s.store.Close()
 		os.Remove(tmpPath)
@@ -17,11 +17,15 @@ package storage

 import (
 	"fmt"
 	"testing"
+
+	"github.com/coreos/etcd/storage/backend"
 )

 func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
-	watchable := newWatchableStore(tmpPath)
-	defer cleanup(watchable, tmpPath)
+	be, tmpPath := backend.NewDefaultTmpBackend()
+	watchable := newWatchableStore(be)
+
+	defer cleanup(watchable, be, tmpPath)

 	w := watchable.NewWatchStream()
@@ -14,13 +14,18 @@

 package storage

-import "testing"
+import (
+	"testing"
+
+	"github.com/coreos/etcd/storage/backend"
+)

 // TestWatcherWatchID tests that each watcher provides unique watchID,
 // and the watched event attaches the correct watchID.
 func TestWatcherWatchID(t *testing.T) {
-	s := WatchableKV(newWatchableStore(tmpPath))
-	defer cleanup(s, tmpPath)
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := WatchableKV(newWatchableStore(b))
+	defer cleanup(s, b, tmpPath)

 	w := s.NewWatchStream()
 	defer w.Close()
@@ -70,8 +75,9 @@ func TestWatcherWatchID(t *testing.T) {
 // TestWatchStreamCancel ensures cancel calls the cancel func of the watcher
 // with given id inside watchStream.
 func TestWatchStreamCancelWatcherByID(t *testing.T) {
-	s := WatchableKV(newWatchableStore(tmpPath))
-	defer cleanup(s, tmpPath)
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := WatchableKV(newWatchableStore(b))
+	defer cleanup(s, b, tmpPath)

 	w := s.NewWatchStream()
 	defer w.Close()
@@ -20,6 +20,7 @@ import (

 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
 	"github.com/coreos/etcd/storage"
+	"github.com/coreos/etcd/storage/backend"
 )

 var (
@@ -30,7 +31,8 @@ var (
 )

 func initStorage() {
-	s = storage.NewStore("storage-bench", time.Duration(batchInterval), batchLimit)
+	be := backend.New("storage-bench", time.Duration(batchInterval), batchLimit)
+	s = storage.NewStore(be)
 	os.Remove("storage-bench") // boltDB has an opened fd, so removing the file is ok
 }