From 7ed929fb3d365eb09661745b12f7d933debe3f49 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 27 Aug 2015 22:40:04 -0700 Subject: [PATCH 1/6] storage/backend: fix limit doesn't effect in range --- storage/backend/batch_tx.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/storage/backend/batch_tx.go b/storage/backend/batch_tx.go index d01df3906..fe68ccd1e 100644 --- a/storage/backend/batch_tx.go +++ b/storage/backend/batch_tx.go @@ -68,6 +68,9 @@ func (t *batchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64 for ck, cv := c.Seek(key); ck != nil && bytes.Compare(ck, endKey) < 0; ck, cv = c.Next() { vs = append(vs, cv) keys = append(keys, ck) + if limit > 0 && limit == int64(len(keys)) { + break + } } return keys, vs From f04884f74d966b769a5e2024835020032717eae2 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 27 Aug 2015 22:47:24 -0700 Subject: [PATCH 2/6] storage/backend: fix off-by-one error for pending var Or it may commit until batchLimit + 1. --- storage/backend/batch_tx.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/storage/backend/batch_tx.go b/storage/backend/batch_tx.go index fe68ccd1e..92aced332 100644 --- a/storage/backend/batch_tx.go +++ b/storage/backend/batch_tx.go @@ -43,7 +43,7 @@ func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) { log.Fatalf("storage: cannot put key into bucket (%v)", err) } t.pending++ - if t.pending > t.backend.batchLimit { + if t.pending >= t.backend.batchLimit { t.commit(false) t.pending = 0 } @@ -87,7 +87,7 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) { log.Fatalf("storage: cannot delete key from bucket (%v)", err) } t.pending++ - if t.pending > t.backend.batchLimit { + if t.pending >= t.backend.batchLimit { t.commit(false) t.pending = 0 } From 054fab84ee9eb84d512524b988ca9f0d4681bfb0 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 27 Aug 2015 22:59:14 -0700 Subject: [PATCH 3/6] storage/backend: remove startc var This makes start logic cleaner. --- storage/backend/backend.go | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/storage/backend/backend.go b/storage/backend/backend.go index 9ce9098c6..9389d308d 100644 --- a/storage/backend/backend.go +++ b/storage/backend/backend.go @@ -22,9 +22,8 @@ type backend struct { batchLimit int batchTx *batchTx - stopc chan struct{} - startc chan struct{} - donec chan struct{} + stopc chan struct{} + donec chan struct{} } func New(path string, d time.Duration, limit int) Backend { @@ -40,13 +39,12 @@ func New(path string, d time.Duration, limit int) Backend { batchLimit: limit, batchTx: &batchTx{}, - stopc: make(chan struct{}), - startc: make(chan struct{}), - donec: make(chan struct{}), + stopc: make(chan struct{}), + donec: make(chan struct{}), } b.batchTx.backend = b + b.batchTx.Commit() go b.run() - <-b.startc return b } @@ -73,9 +71,6 @@ func (b *backend) Snapshot(w io.Writer) (n int64, err error) { func (b *backend) run() { defer close(b.donec) - b.batchTx.Commit() - b.startc <- struct{}{} - for { select { case <-time.After(b.batchInterval): From d2cb732c7b9186302b2f0af89c9be456567aca82 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 27 Aug 2015 23:00:05 -0700 Subject: [PATCH 4/6] test: activate test on storage/backend --- test | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test b/test index 89d97cbbd..e34a0b08c 100755 --- a/test +++ b/test @@ -16,7 +16,7 @@ COVER=${COVER:-"-cover"} source ./build # Hack: gofmt ./ will recursively check the .git directory. So use *.go for gofmt. -TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/auth etcdserver/etcdhttp etcdserver/etcdhttp/httptypes pkg/fileutil pkg/flags pkg/idutil pkg/ioutil pkg/netutil pkg/osutil pkg/pbutil pkg/types pkg/transport pkg/wait proxy raft snap storage store version wal" +TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/auth etcdserver/etcdhttp etcdserver/etcdhttp/httptypes pkg/fileutil pkg/flags pkg/idutil pkg/ioutil pkg/netutil pkg/osutil pkg/pbutil pkg/types pkg/transport pkg/wait proxy raft snap storage storage/backend store version wal" # TODO: add it to race testing when the issue is resolved # https://github.com/golang/go/issues/9946 NO_RACE_TESTABLE="rafthttp" From 4b9b0cbcc1814e3956307d7003f1e76877802dae Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 28 Aug 2015 22:03:32 -0700 Subject: [PATCH 5/6] storage: add newBackend and newBatchTx This is for ease of testing. --- storage/backend/backend.go | 8 +++++--- storage/backend/batch_tx.go | 6 ++++++ 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/storage/backend/backend.go b/storage/backend/backend.go index 9389d308d..0fb9f02eb 100644 --- a/storage/backend/backend.go +++ b/storage/backend/backend.go @@ -27,6 +27,10 @@ type backend struct { } func New(path string, d time.Duration, limit int) Backend { + return newBackend(path, d, limit) +} + +func newBackend(path string, d time.Duration, limit int) *backend { db, err := bolt.Open(path, 0600, nil) if err != nil { log.Panicf("backend: cannot open database at %s (%v)", path, err) @@ -37,13 +41,11 @@ func New(path string, d time.Duration, limit int) Backend { batchInterval: d, batchLimit: limit, - batchTx: &batchTx{}, stopc: make(chan struct{}), donec: make(chan struct{}), } - b.batchTx.backend = b - b.batchTx.Commit() + b.batchTx = newBatchTx(b) go b.run() return b } diff --git a/storage/backend/batch_tx.go b/storage/backend/batch_tx.go index 92aced332..56797b85c 100644 --- a/storage/backend/batch_tx.go +++ b/storage/backend/batch_tx.go @@ -26,6 +26,12 @@ type batchTx struct { pending int } +func newBatchTx(backend *backend) *batchTx { + tx := &batchTx{backend: backend} + tx.Commit() + return tx +} + func (t *batchTx) UnsafeCreateBucket(name []byte) { _, err := t.tx.CreateBucket(name) if err != nil && err != bolt.ErrBucketExists { From 44fd734038088d69c53f3cee2a94124afe6d38a7 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 28 Aug 2015 22:04:19 -0700 Subject: [PATCH 6/6] storage/backend: add unit tests for backend and batchTx --- storage/backend/backend_test.go | 120 +++++++++++++++++--- storage/backend/batch_tx_test.go | 182 +++++++++++++++++++++++++++++++ 2 files changed, 284 insertions(+), 18 deletions(-) create mode 100644 storage/backend/batch_tx_test.go diff --git a/storage/backend/backend_test.go b/storage/backend/backend_test.go index 55d9549e2..d5d8cd9da 100644 --- a/storage/backend/backend_test.go +++ b/storage/backend/backend_test.go @@ -1,29 +1,113 @@ package backend import ( + "io/ioutil" + "log" "os" - "reflect" + "path" "testing" "time" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/boltdb/bolt" + "github.com/coreos/etcd/pkg/testutil" ) -func TestBackendPut(t *testing.T) { - backend := New("test", 10*time.Second, 10000) - defer backend.Close() - defer os.Remove("test") +var tmpPath string - v := []byte("foo") - - batchTx := backend.BatchTx() - batchTx.Lock() - - batchTx.UnsafeCreateBucket([]byte("test")) - - batchTx.UnsafePut([]byte("test"), []byte("foo"), v) - _, gv := batchTx.UnsafeRange([]byte("test"), v, nil, -1) - if !reflect.DeepEqual(gv[0], v) { - t.Errorf("v = %s, want %s", string(gv[0]), string(v)) +func init() { + dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test") + if err != nil { + log.Fatal(err) } - - batchTx.Unlock() + tmpPath = path.Join(dir, "database") +} + +func TestBackendClose(t *testing.T) { + b := newBackend(tmpPath, time.Hour, 10000) + defer os.Remove(tmpPath) + + // check close could work + done := make(chan struct{}) + go func() { + err := b.Close() + if err != nil { + t.Errorf("close error = %v, want nil", err) + } + done <- struct{}{} + }() + select { + case <-done: + case <-time.After(time.Second): + t.Errorf("failed to close database in 1s") + } +} + +func TestBackendSnapshot(t *testing.T) { + b := New(tmpPath, time.Hour, 10000) + defer cleanup(b, tmpPath) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket([]byte("test")) + tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar")) + tx.Unlock() + b.ForceCommit() + + // write snapshot to a new file + f, err := ioutil.TempFile(os.TempDir(), "etcd_backend_test") + if err != nil { + t.Fatal(err) + } + _, err = b.Snapshot(f) + if err != nil { + t.Fatal(err) + } + f.Close() + + // bootstrap new backend from the snapshot + nb := New(f.Name(), time.Hour, 10000) + defer cleanup(nb, f.Name()) + + newTx := b.BatchTx() + newTx.Lock() + ks, _ := newTx.UnsafeRange([]byte("test"), []byte("foo"), []byte("goo"), 0) + if len(ks) != 1 { + t.Errorf("len(kvs) = %d, want 1", len(ks)) + } + newTx.Unlock() +} + +func TestBackendBatchIntervalCommit(t *testing.T) { + // start backend with super short batch interval + b := newBackend(tmpPath, time.Nanosecond, 10000) + defer cleanup(b, tmpPath) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket([]byte("test")) + tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar")) + tx.Unlock() + + // give time for batch interval commit to happen + time.Sleep(time.Nanosecond) + testutil.WaitSchedule() + + // check whether put happens via db view + b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte("test")) + if bucket == nil { + t.Errorf("bucket test does not exit") + return nil + } + v := bucket.Get([]byte("foo")) + if v == nil { + t.Errorf("foo key failed to written in backend") + } + return nil + }) +} + +func cleanup(b Backend, path string) { + b.Close() + os.Remove(path) } diff --git a/storage/backend/batch_tx_test.go b/storage/backend/batch_tx_test.go new file mode 100644 index 000000000..9328b45d7 --- /dev/null +++ b/storage/backend/batch_tx_test.go @@ -0,0 +1,182 @@ +package backend + +import ( + "reflect" + "testing" + "time" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/boltdb/bolt" +) + +func TestBatchTxPut(t *testing.T) { + b := newBackend(tmpPath, time.Hour, 10000) + defer cleanup(b, tmpPath) + + tx := b.batchTx + tx.Lock() + defer tx.Unlock() + + // create bucket + tx.UnsafeCreateBucket([]byte("test")) + + // put + v := []byte("bar") + tx.UnsafePut([]byte("test"), []byte("foo"), v) + + // check put result before and after tx is committed + for k := 0; k < 2; k++ { + _, gv := tx.UnsafeRange([]byte("test"), []byte("foo"), nil, 0) + if !reflect.DeepEqual(gv[0], v) { + t.Errorf("v = %s, want %s", string(gv[0]), string(v)) + } + tx.commit(false) + } +} + +func TestBatchTxRange(t *testing.T) { + b := newBackend(tmpPath, time.Hour, 10000) + defer cleanup(b, tmpPath) + + tx := b.batchTx + tx.Lock() + defer tx.Unlock() + + tx.UnsafeCreateBucket([]byte("test")) + // put keys + allKeys := [][]byte{[]byte("foo"), []byte("foo1"), []byte("foo2")} + allVals := [][]byte{[]byte("bar"), []byte("bar1"), []byte("bar2")} + for i := range allKeys { + tx.UnsafePut([]byte("test"), allKeys[i], allVals[i]) + } + + tests := []struct { + key []byte + endKey []byte + limit int64 + + wkeys [][]byte + wvals [][]byte + }{ + // single key + { + []byte("foo"), nil, 0, + allKeys[:1], allVals[:1], + }, + // single key, bad + { + []byte("doo"), nil, 0, + nil, nil, + }, + // key range + { + []byte("foo"), []byte("foo1"), 0, + allKeys[:1], allVals[:1], + }, + // key range, get all keys + { + []byte("foo"), []byte("foo3"), 0, + allKeys, allVals, + }, + // key range, bad + { + []byte("goo"), []byte("goo3"), 0, + nil, nil, + }, + // key range with effective limit + { + []byte("foo"), []byte("foo3"), 1, + allKeys[:1], allVals[:1], + }, + // key range with limit + { + []byte("foo"), []byte("foo3"), 4, + allKeys, allVals, + }, + } + for i, tt := range tests { + keys, vals := tx.UnsafeRange([]byte("test"), tt.key, tt.endKey, tt.limit) + if !reflect.DeepEqual(keys, tt.wkeys) { + t.Errorf("#%d: keys = %+v, want %+v", i, keys, tt.wkeys) + } + if !reflect.DeepEqual(vals, tt.wvals) { + t.Errorf("#%d: vals = %+v, want %+v", i, vals, tt.wvals) + } + } +} + +func TestBatchTxDelete(t *testing.T) { + b := newBackend(tmpPath, time.Hour, 10000) + defer cleanup(b, tmpPath) + + tx := b.batchTx + tx.Lock() + defer tx.Unlock() + + tx.UnsafeCreateBucket([]byte("test")) + tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar")) + + tx.UnsafeDelete([]byte("test"), []byte("foo")) + + // check put result before and after tx is committed + for k := 0; k < 2; k++ { + ks, _ := tx.UnsafeRange([]byte("test"), []byte("foo"), nil, 0) + if len(ks) != 0 { + t.Errorf("keys on foo = %v, want nil", ks) + } + tx.commit(false) + } +} + +func TestBatchTxCommit(t *testing.T) { + b := newBackend(tmpPath, time.Hour, 10000) + defer cleanup(b, tmpPath) + + tx := b.batchTx + tx.Lock() + tx.UnsafeCreateBucket([]byte("test")) + tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar")) + tx.Unlock() + + tx.Commit() + + // check whether put happens via db view + b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte("test")) + if bucket == nil { + t.Errorf("bucket test does not exit") + return nil + } + v := bucket.Get([]byte("foo")) + if v == nil { + t.Errorf("foo key failed to written in backend") + } + return nil + }) +} + +func TestBatchTxBatchLimitCommit(t *testing.T) { + // start backend with batch limit 1 + b := newBackend(tmpPath, time.Hour, 1) + defer cleanup(b, tmpPath) + + tx := b.batchTx + tx.Lock() + tx.UnsafeCreateBucket([]byte("test")) + tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar")) + tx.Unlock() + + // batch limit commit should have been triggered + // check whether put happens via db view + b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte("test")) + if bucket == nil { + t.Errorf("bucket test does not exit") + return nil + } + v := bucket.Get([]byte("foo")) + if v == nil { + t.Errorf("foo key failed to written in backend") + } + return nil + }) +}