From 4b0d9f69c76e94da1e6e13ef02f7a57e76422a90 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 14 May 2015 20:25:52 -0700 Subject: [PATCH] storage: add a simple backend and kv example --- storage/backend/backend.go | 107 ++++++++++++++++++++++++++ storage/backend/backend_bench_test.go | 36 +++++++++ storage/backend/backend_test.go | 61 +++++++++++++++ storage/backend/batch_tx.go | 98 +++++++++++++++++++++++ storage/kv.go | 66 ++++++++++++++++ storage/kv_test.go | 24 ++++++ 6 files changed, 392 insertions(+) create mode 100644 storage/backend/backend.go create mode 100644 storage/backend/backend_bench_test.go create mode 100644 storage/backend/backend_test.go create mode 100644 storage/backend/batch_tx.go create mode 100644 storage/kv.go create mode 100644 storage/kv_test.go diff --git a/storage/backend/backend.go b/storage/backend/backend.go new file mode 100644 index 000000000..d040a563e --- /dev/null +++ b/storage/backend/backend.go @@ -0,0 +1,107 @@ +package backend + +import ( + "log" + "time" + + "github.com/boltdb/bolt" +) + +type Backend interface { + BatchTx() BatchTx + ForceCommit() + Close() error +} + +type backend struct { + db *bolt.DB + + batchInterval time.Duration + batchLimit int + batchTx *batchTx + + stopc chan struct{} + startc chan struct{} + donec chan struct{} +} + +func New(path string, d time.Duration, limit int) Backend { + db, err := bolt.Open(path, 0600, nil) + if err != nil { + log.Panicf("backend: cannot open database at %s (%v)", path, err) + } + + b := &backend{ + db: db, + + batchInterval: d, + batchLimit: limit, + batchTx: &batchTx{}, + + stopc: make(chan struct{}), + startc: make(chan struct{}), + donec: make(chan struct{}), + } + b.batchTx.backend = b + go b.run() + <-b.startc + return b +} + +// BatchTnx returns the current batch tx in coalescer. The tx can be used for read and +// write operations. The write result can be retrieved within the same tx immediately. +// The write result is isolated with other txs until the current one get committed. +func (b *backend) BatchTx() BatchTx { + return b.batchTx +} + +// force commit the current batching tx. +func (b *backend) ForceCommit() { + b.batchTx.Lock() + b.commitAndBegin() + b.batchTx.Unlock() +} + +func (b *backend) run() { + defer close(b.donec) + + b.batchTx.Lock() + b.commitAndBegin() + b.batchTx.Unlock() + b.startc <- struct{}{} + + for { + select { + case <-time.After(b.batchInterval): + case <-b.stopc: + return + } + b.batchTx.Lock() + b.commitAndBegin() + b.batchTx.Unlock() + } +} + +func (b *backend) Close() error { + close(b.stopc) + <-b.donec + return b.db.Close() +} + +// commitAndBegin commits a previous tx and begins a new writable one. +func (b *backend) commitAndBegin() { + var err error + // commit the last batchTx + if b.batchTx.tx != nil { + err = b.batchTx.tx.Commit() + if err != nil { + log.Fatalf("storage: cannot commit tx (%s)", err) + } + } + + // begin a new tx + b.batchTx.tx, err = b.db.Begin(true) + if err != nil { + log.Fatalf("storage: cannot begin tx (%s)", err) + } +} diff --git a/storage/backend/backend_bench_test.go b/storage/backend/backend_bench_test.go new file mode 100644 index 000000000..ab4130959 --- /dev/null +++ b/storage/backend/backend_bench_test.go @@ -0,0 +1,36 @@ +package backend + +import ( + "crypto/rand" + "os" + "testing" + "time" +) + +func BenchmarkBackendPut(b *testing.B) { + backend := New("test", 100*time.Millisecond, 10000) + defer backend.Close() + defer os.Remove("test") + + // prepare keys + keys := make([][]byte, b.N) + for i := 0; i < b.N; i++ { + keys[i] = make([]byte, 64) + rand.Read(keys[i]) + } + value := make([]byte, 128) + rand.Read(value) + + batchTx := backend.BatchTx() + + batchTx.Lock() + batchTx.UnsafeCreateBucket([]byte("test")) + batchTx.Unlock() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + batchTx.Lock() + batchTx.UnsafePut([]byte("test"), keys[i], value) + batchTx.Unlock() + } +} diff --git a/storage/backend/backend_test.go b/storage/backend/backend_test.go new file mode 100644 index 000000000..3d4983ec1 --- /dev/null +++ b/storage/backend/backend_test.go @@ -0,0 +1,61 @@ +package backend + +import ( + "os" + "reflect" + "testing" + "time" +) + +func TestBackendPut(t *testing.T) { + backend := New("test", 10*time.Second, 10000) + defer backend.Close() + defer os.Remove("test") + + v := []byte("foo") + + batchTx := backend.BatchTx() + batchTx.Lock() + + batchTx.UnsafeCreateBucket([]byte("test")) + + batchTx.UnsafePut([]byte("test"), []byte("foo"), v) + gv := batchTx.UnsafeRange([]byte("test"), v, nil, -1) + if !reflect.DeepEqual(gv[0], v) { + t.Errorf("v = %s, want %s", string(gv[0]), string(v)) + } + + batchTx.Unlock() +} + +func TestBackendForceCommit(t *testing.T) { + backend := New("test", 10*time.Second, 10000) + defer backend.Close() + defer os.Remove("test") + + v := []byte("foo") + batchTx := backend.BatchTx() + + batchTx.Lock() + + batchTx.UnsafeCreateBucket([]byte("test")) + batchTx.UnsafePut([]byte("test"), []byte("foo"), v) + + batchTx.Unlock() + + // expect to see nothing that the batch tx created + tx := backend.ReadTnx() + gbucket := tx.Bucket([]byte("test")) + if gbucket != nil { + t.Errorf("readtx.bu = %p, want nil", gbucket) + } + tx.Commit() + + // commit batch tx + backend.ForceCommit() + tx = backend.ReadTnx() + gbucket = tx.Bucket([]byte("test")) + if gbucket == nil { + t.Errorf("readtx.bu = nil, want not nil") + } +} diff --git a/storage/backend/batch_tx.go b/storage/backend/batch_tx.go new file mode 100644 index 000000000..2bbe16064 --- /dev/null +++ b/storage/backend/batch_tx.go @@ -0,0 +1,98 @@ +package backend + +import ( + "bytes" + "log" + "sync" + + "github.com/boltdb/bolt" +) + +type BatchTx interface { + Lock() + Unlock() + UnsafeCreateBucket(name []byte) + UnsafePut(bucketName []byte, key []byte, value []byte) + UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) [][]byte + UnsafeDelete(bucketName []byte, key []byte) +} + +type batchTx struct { + mu sync.Mutex + tx *bolt.Tx + backend *backend + pending int +} + +func (t *batchTx) Lock() { + t.mu.Lock() +} + +func (t *batchTx) Unlock() { + t.mu.Unlock() +} + +func (t *batchTx) UnsafeCreateBucket(name []byte) { + _, err := t.tx.CreateBucket(name) + if err != nil && err != bolt.ErrBucketExists { + log.Fatalf("storage: cannot create bucket %s (%v)", string(name), err) + } +} + +// before calling unsafePut, the caller MUST hold the lock on tnx. +func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) { + bucket := t.tx.Bucket(bucketName) + if bucket == nil { + log.Fatalf("storage: bucket %s does not exist", string(bucketName)) + } + if err := bucket.Put(key, value); err != nil { + log.Fatalf("storage: cannot put key into bucket (%v)", err) + } + t.pending++ + if t.pending > t.backend.batchLimit { + t.backend.commitAndBegin() + t.pending = 0 + } +} + +// before calling unsafeRange, the caller MUST hold the lock on tnx. +func (t *batchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) [][]byte { + bucket := t.tx.Bucket(bucketName) + if bucket == nil { + log.Fatalf("storage: bucket %s does not exist", string(bucketName)) + } + + var vs [][]byte + + if len(endKey) == 0 { + if v := bucket.Get(key); v == nil { + return vs + } else { + return append(vs, v) + } + } + + c := bucket.Cursor() + for ck, cv := c.Seek(key); ck != nil && bytes.Compare(ck, endKey) < 0; ck, cv = c.Next() { + vs = append(vs, cv) + } + + return vs +} + +// before calling unsafeDelete, the caller MUST hold the lock on tnx. +func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) { + bucket := t.tx.Bucket(bucketName) + if bucket == nil { + log.Fatalf("storage: bucket %s does not exist", string(bucketName)) + } + err := bucket.Delete(key) + if err != nil { + log.Fatalf("storage: cannot delete key from bucket (%v)", err) + } + t.pending++ + if t.pending > t.backend.batchLimit { + t.backend.commitAndBegin() + t.pending = 0 + } +} diff --git a/storage/kv.go b/storage/kv.go new file mode 100644 index 000000000..cadcbebef --- /dev/null +++ b/storage/kv.go @@ -0,0 +1,66 @@ +package storage + +import ( + "encoding/binary" + "time" + + "github.com/coreos/etcd/storage/backend" +) + +var ( + batchLimit = 10000 + batchInterval = 100 * time.Millisecond + keyBucketName = []byte("key") +) + +type store struct { + b backend.Backend + kvindex index + + now uint64 // current index of the store +} + +func newStore(path string) *store { + s := &store{ + b: backend.New(path, batchInterval, batchLimit), + kvindex: newTreeIndex(), + now: 0, + } + + tx := s.b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(keyBucketName) + tx.Unlock() + s.b.ForceCommit() + + return s +} + +func (s *store) Put(key, value []byte) { + now := s.now + 1 + + s.kvindex.Put(key, now) + ibytes := make([]byte, 8) + binary.BigEndian.PutUint64(ibytes, now) + + tx := s.b.BatchTx() + tx.Lock() + defer tx.Unlock() + s.now = now + tx.UnsafePut(keyBucketName, ibytes, value) +} + +func (s *store) Get(key []byte) []byte { + index, err := s.kvindex.Get(key, s.now) + if err != nil { + return nil + } + + ibytes := make([]byte, 8) + binary.BigEndian.PutUint64(ibytes, index) + tx := s.b.BatchTx() + tx.Lock() + defer tx.Unlock() + vs := tx.UnsafeRange(keyBucketName, ibytes, nil, 0) + return vs[0] +} diff --git a/storage/kv_test.go b/storage/kv_test.go new file mode 100644 index 000000000..a9e2cb8d4 --- /dev/null +++ b/storage/kv_test.go @@ -0,0 +1,24 @@ +package storage + +import ( + "crypto/rand" + "os" + "testing" +) + +func BenchmarkStorePut(b *testing.B) { + s := newStore("test") + defer os.Remove("test") + + // prepare keys + keys := make([][]byte, b.N) + for i := 0; i < b.N; i++ { + keys[i] = make([]byte, 64) + rand.Read(keys[i]) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.Put(keys[i], []byte("foo")) + } +}