From 9db360387dea2471fba5a409ce55d1e2bea087b0 Mon Sep 17 00:00:00 2001
From: Xiang Li
Date: Fri, 22 May 2015 08:11:43 -0700
Subject: [PATCH] storage: support Range

---
 storage/index.go        |  38 ++++++++
 storage/kv.go           | 132 +--------------------------
 storage/kv_test.go      |  24 -----
 storage/kvstore.go      | 195 ++++++++++++++++++++++++++++++++++++++++
 storage/kvstore_test.go | 170 +++++++++++++++++++++++++++++++++++
 5 files changed, 404 insertions(+), 155 deletions(-)
 delete mode 100644 storage/kv_test.go
 create mode 100644 storage/kvstore.go
 create mode 100644 storage/kvstore_test.go

diff --git a/storage/index.go b/storage/index.go
index 37b17e7ff..8af0d17db 100644
--- a/storage/index.go
+++ b/storage/index.go
@@ -9,11 +9,17 @@ import (
 type index interface {
 	Get(key []byte, atIndex uint64) (index uint64, err error)
+	Range(key, end []byte, atIndex uint64) []kipair
 	Put(key []byte, index uint64)
 	Tombstone(key []byte, index uint64) error
 	Compact(index uint64) map[uint64]struct{}
 }
 
+type kipair struct {
+	index uint64
+	key   []byte
+}
+
 type treeIndex struct {
 	sync.RWMutex
 	tree *btree.BTree
 }
@@ -54,6 +60,38 @@ func (ti *treeIndex) Get(key []byte, atIndex uint64) (index uint64, err error) {
 	return keyi.get(atIndex)
 }
 
+func (ti *treeIndex) Range(key, end []byte, atIndex uint64) []kipair {
+	if end == nil {
+		index, err := ti.Get(key, atIndex)
+		if err != nil {
+			return nil
+		}
+		return []kipair{{key: key, index: index}}
+	}
+
+	keyi := &keyIndex{key: key}
+	endi := &keyIndex{key: end}
+	pairs := make([]kipair, 0)
+
+	ti.RLock()
+	defer ti.RUnlock()
+
+	ti.tree.AscendGreaterOrEqual(keyi, func(item btree.Item) bool {
+		if !item.Less(endi) {
+			return false
+		}
+		curKeyi := item.(*keyIndex)
+		index, err := curKeyi.get(atIndex)
+		if err != nil {
+			return true
+		}
+		pairs = append(pairs, kipair{index, curKeyi.key})
+		return true
+	})
+
+	return pairs
+}
+
 func (ti *treeIndex) Tombstone(key []byte, index uint64) error {
 	keyi := &keyIndex{key: key}
 
diff --git a/storage/kv.go b/storage/kv.go
index c26917073..6611107e9 100644
--- a/storage/kv.go
+++ b/storage/kv.go
@@ -1,20 +1,6 @@
 package storage
 
-import (
-	"encoding/binary"
-	"log"
-	"sync"
-	"time"
-
-	"github.com/coreos/etcd/storage/backend"
-	"github.com/coreos/etcd/storage/storagepb"
-)
-
-var (
-	batchLimit    = 10000
-	batchInterval = 100 * time.Millisecond
-	keyBucketName = []byte("key")
-)
+import "github.com/coreos/etcd/storage/storagepb"
 
 type KV interface {
 	// Range gets the keys in the range at rangeIndex.
@@ -45,119 +31,3 @@ type KV interface {
 	TnxPut(key, value []byte) (index int64)
 	TnxDeleteRange(key, end []byte) (n, index int64)
 }
-
-type store struct {
-	// read operation MUST hold read lock
-	// write opeartion MUST hold write lock
-	// tnx operation MUST hold write lock
-	sync.RWMutex
-
-	b       backend.Backend
-	kvindex index
-
-	currentIndex uint64
-	marshalBuf   []byte // buffer for marshal protobuf
-}
-
-func newStore(path string) *store {
-	s := &store{
-		b:            backend.New(path, batchInterval, batchLimit),
-		kvindex:      newTreeIndex(),
-		currentIndex: 0,
-		marshalBuf:   make([]byte, 1024*1024),
-	}
-
-	tx := s.b.BatchTx()
-	tx.Lock()
-	tx.UnsafeCreateBucket(keyBucketName)
-	tx.Unlock()
-	s.b.ForceCommit()
-
-	return s
-}
-
-func (s *store) Put(key, value []byte) {
-	s.Lock()
-	defer s.Unlock()
-
-	currentIndex := s.currentIndex + 1
-
-	ibytes := make([]byte, 8)
-	binary.BigEndian.PutUint64(ibytes, currentIndex)
-
-	tx := s.b.BatchTx()
-	tx.Lock()
-	defer tx.Unlock()
-	s.currentIndex = currentIndex
-
-	event := storagepb.Event{
-		Type: storagepb.PUT,
-		Kv: storagepb.KeyValue{
-			Key:   key,
-			Value: value,
-		},
-	}
-
-	var (
-		d   []byte
-		err error
-		n   int
-	)
-
-	if event.Size() < len(s.marshalBuf) {
-		n, err = event.MarshalTo(s.marshalBuf)
-		d = s.marshalBuf[:n]
-	} else {
-		d, err = event.Marshal()
-	}
-	if err != nil {
-		log.Fatalf("storage: cannot marshal event: %v", err)
-	}
-
-	tx.UnsafePut(keyBucketName, ibytes, d)
-
-	s.kvindex.Put(key, currentIndex)
-}
-
-func (s *store) Get(key []byte) []byte {
-	s.RLock()
-	defer s.RUnlock()
-
-	index, err := s.kvindex.Get(key, s.currentIndex)
-	if err != nil {
-		return nil
-	}
-
-	ibytes := make([]byte, 8)
-	binary.BigEndian.PutUint64(ibytes, index)
-	tx := s.b.BatchTx()
-	tx.Lock()
-	defer tx.Unlock()
-	vs := tx.UnsafeRange(keyBucketName, ibytes, nil, 0)
-	// TODO: the value will be an event type.
-	// TODO: copy out the bytes, decode it, return the value.
-	return vs[0]
-}
-
-func (s *store) Delete(key []byte) error {
-	s.Lock()
-	defer s.Unlock()
-
-	_, err := s.kvindex.Get(key, s.currentIndex)
-	if err != nil {
-		return nil
-	}
-
-	currentIndex := s.currentIndex + 1
-
-	ibytes := make([]byte, 8)
-	binary.BigEndian.PutUint64(ibytes, currentIndex)
-	tx := s.b.BatchTx()
-	tx.Lock()
-	defer tx.Unlock()
-	// TODO: the value will be an event type.
-	// A tombstone is simple a "Delete" type event.
-	tx.UnsafePut(keyBucketName, key, []byte("tombstone"))
-
-	return s.kvindex.Tombstone(key, currentIndex)
-}
diff --git a/storage/kv_test.go b/storage/kv_test.go
deleted file mode 100644
index a9e2cb8d4..000000000
--- a/storage/kv_test.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package storage
-
-import (
-	"crypto/rand"
-	"os"
-	"testing"
-)
-
-func BenchmarkStorePut(b *testing.B) {
-	s := newStore("test")
-	defer os.Remove("test")
-
-	// prepare keys
-	keys := make([][]byte, b.N)
-	for i := 0; i < b.N; i++ {
-		keys[i] = make([]byte, 64)
-		rand.Read(keys[i])
-	}
-
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		s.Put(keys[i], []byte("foo"))
-	}
-}
diff --git a/storage/kvstore.go b/storage/kvstore.go
new file mode 100644
index 000000000..73d154f7a
--- /dev/null
+++ b/storage/kvstore.go
@@ -0,0 +1,195 @@
+package storage
+
+import (
+	"bytes"
+	"encoding/binary"
+	"log"
+	"sync"
+	"time"
+
+	"github.com/coreos/etcd/storage/backend"
+	"github.com/coreos/etcd/storage/storagepb"
+)
+
+var (
+	batchLimit    = 10000
+	batchInterval = 100 * time.Millisecond
+	keyBucketName = []byte("key")
+)
+
+type store struct {
+	// read operation MUST hold read lock
+	// write operation MUST hold write lock
+	// tnx operation MUST hold write lock
+	sync.RWMutex
+
+	b       backend.Backend
+	kvindex index
+
+	currentIndex uint64
+}
+
+func newStore(path string) *store {
+	s := &store{
+		b:            backend.New(path, batchInterval, batchLimit),
+		kvindex:      newTreeIndex(),
+		currentIndex: 0,
+	}
+
+	tx := s.b.BatchTx()
+	tx.Lock()
+	tx.UnsafeCreateBucket(keyBucketName)
+	tx.Unlock()
+	s.b.ForceCommit()
+
+	return s
+}
+
+func (s *store) Put(key, value []byte) int64 {
+	s.Lock()
+	defer s.Unlock()
+
+	s.put(key, value, s.currentIndex+1, 0)
+	s.currentIndex = s.currentIndex + 1
+	return int64(s.currentIndex)
+}
+
+func (s *store) Range(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) {
+	s.RLock()
+	defer s.RUnlock()
+
+	if rangeIndex <= 0 {
+		index = int64(s.currentIndex)
+	} else {
+		index = rangeIndex
+	}
+
+	pairs := s.kvindex.Range(key, end, uint64(index))
+	if len(pairs) == 0 {
+		return nil, index
+	}
+	if limit > 0 && len(pairs) > int(limit) {
+		pairs = pairs[:limit]
+	}
+
+	tx := s.b.BatchTx()
+	tx.Lock()
+	defer tx.Unlock()
+
+	for _, pair := range pairs {
+		ibytes := make([]byte, 8)
+		endbytes := make([]byte, 8)
+		binary.BigEndian.PutUint64(ibytes, pair.index)
+		binary.BigEndian.PutUint64(endbytes, pair.index+1)
+
+		found := false
+
+		vs := tx.UnsafeRange(keyBucketName, ibytes, endbytes, 0)
+		for _, v := range vs {
+			var e storagepb.Event
+			err := e.Unmarshal(v)
+			if err != nil {
+				log.Fatalf("storage: range cannot unmarshal event: %v", err)
+			}
+			if bytes.Equal(e.Kv.Key, pair.key) {
+				if e.Type == storagepb.PUT {
+					kvs = append(kvs, e.Kv)
+				}
+				found = true
+				break
+			}
+		}
+
+		if !found {
+			log.Fatalf("storage: range cannot find key %s at index %d", string(pair.key), pair.index)
+		}
+	}
+	return kvs, index
+}
+
+func (s *store) DeleteRange(key, end []byte) (n, index int64) {
+	s.Lock()
+	defer s.Unlock()
+
+	index = int64(s.currentIndex) + 1
+
+	pairs := s.kvindex.Range(key, end, s.currentIndex)
+	if len(pairs) == 0 {
+		return 0, int64(s.currentIndex)
+	}
+
+	for i, pair := range pairs {
+		ok := s.delete(pair.key, uint64(index), uint32(i))
+		if ok {
+			n++
+		}
+	}
+	if n != 0 {
+		s.currentIndex = s.currentIndex + 1
+	}
+	return n, int64(s.currentIndex)
+}
+
+func (s *store) put(key, value []byte, index uint64, subindex uint32) {
+	ibytes := make([]byte, 8+1+4)
+	indexToBytes(index, subindex, ibytes)
+
+	event := storagepb.Event{
+		Type: storagepb.PUT,
+		Kv: storagepb.KeyValue{
+			Key:   key,
+			Value: value,
+		},
+	}
+
+	d, err := event.Marshal()
+	if err != nil {
+		log.Fatalf("storage: cannot marshal event: %v", err)
+	}
+
+	tx := s.b.BatchTx()
+	tx.Lock()
+	defer tx.Unlock()
+	tx.UnsafePut(keyBucketName, ibytes, d)
+	s.kvindex.Put(key, index)
+}
+
+func (s *store) delete(key []byte, index uint64, subindex uint32) bool {
+	_, err := s.kvindex.Get(key, index)
+	if err != nil {
+		// key does not exist
+		return false
+	}
+
+	ibytes := make([]byte, 8+1+4)
+	indexToBytes(index, subindex, ibytes)
+
+	event := storagepb.Event{
+		Type: storagepb.DELETE,
+		Kv: storagepb.KeyValue{
+			Key: key,
+		},
+	}
+
+	d, err := event.Marshal()
+	if err != nil {
+		log.Fatalf("storage: cannot marshal event: %v", err)
+	}
+
+	tx := s.b.BatchTx()
+	tx.Lock()
+	defer tx.Unlock()
+	tx.UnsafePut(keyBucketName, ibytes, d)
+	err = s.kvindex.Tombstone(key, index)
+	if err != nil {
+		log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err)
+	}
+
+	return true
+}
+
+func indexToBytes(index uint64, subindex uint32, bytes []byte) {
+	binary.BigEndian.PutUint64(bytes, index)
+	bytes[8] = '_'
+	binary.BigEndian.PutUint32(bytes[9:], subindex)
+}
diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go
new file mode 100644
index 000000000..2a70d229f
--- /dev/null
+++ b/storage/kvstore_test.go
@@ -0,0 +1,170 @@
+package storage
+
+import (
+	"crypto/rand"
+	"os"
+	"testing"
+)
+
+func TestRange(t *testing.T) {
+	s := newStore("test")
+	defer os.Remove("test")
+
+	s.Put([]byte("foo"), []byte("bar"))
+	s.Put([]byte("foo1"), []byte("bar1"))
+	s.Put([]byte("foo2"), []byte("bar2"))
+
+	tests := []struct {
+		key, end []byte
+		index    int64
+
+		windex int64
+		// TODO: change this to the actual kv
+		wN int64
+	}{
+		{
+			[]byte("foo"), []byte("foo3"), 0,
+			3, 3,
+		},
+		{
+			[]byte("foo"), []byte("foo1"), 0,
+			3, 1,
+		},
+		{
+			[]byte("foo"), []byte("foo3"), 1,
+			1, 1,
+		},
+		{
+			[]byte("foo"), []byte("foo3"), 2,
+			2, 2,
+		},
+	}
+
+	for i, tt := range tests {
+		kvs, index := s.Range(tt.key, tt.end, 0, tt.index)
+		if len(kvs) != int(tt.wN) {
+			t.Errorf("#%d: len(kvs) = %d, want %d", i, len(kvs), tt.wN)
+		}
+		if index != tt.windex {
+			t.Errorf("#%d: index = %d, want %d", i, index, tt.windex)
+		}
+	}
+}
+
+func TestSimpleDeleteRange(t *testing.T) {
+	tests := []struct {
+		key, end []byte
+
+		windex int64
+		wN     int64
+	}{
+		{
+			[]byte("foo"), []byte("foo1"),
+			4, 1,
+		},
+		{
+			[]byte("foo"), []byte("foo2"),
+			4, 2,
+		},
+		{
+			[]byte("foo"), []byte("foo3"),
+			4, 3,
+		},
+		{
+			[]byte("foo3"), []byte("foo8"),
+			3, 0,
+		},
+	}
+
+	for i, tt := range tests {
+		s := newStore("test")
+
+		s.Put([]byte("foo"), []byte("bar"))
+		s.Put([]byte("foo1"), []byte("bar1"))
+		s.Put([]byte("foo2"), []byte("bar2"))
+
+		n, index := s.DeleteRange(tt.key, tt.end)
+		if n != tt.wN {
+			t.Errorf("#%d: n = %d, want %d", i, n, tt.wN)
+		}
+		if index != tt.windex {
+			t.Errorf("#%d: index = %d, want %d", i, index, tt.windex)
+		}
+
+		os.Remove("test")
+	}
+}
+
+func TestRangeInSequence(t *testing.T) {
+	s := newStore("test")
+	defer os.Remove("test")
+
+	s.Put([]byte("foo"), []byte("bar"))
+	s.Put([]byte("foo1"), []byte("bar1"))
+	s.Put([]byte("foo2"), []byte("bar2"))
+
+	// remove foo
+	n, index := s.DeleteRange([]byte("foo"), nil)
+	if n != 1 || index != 4 {
+		t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 4)
+	}
+
+	// before the removal of foo
+	kvs, index := s.Range([]byte("foo"), []byte("foo3"), 0, 3)
+	if len(kvs) != 3 {
+		t.Fatalf("len(kvs) = %d, want %d", len(kvs), 3)
+	}
+
+	// after the removal of foo
+	kvs, index = s.Range([]byte("foo"), []byte("foo3"), 0, 4)
+	if len(kvs) != 2 {
+		t.Fatalf("len(kvs) = %d, want %d", len(kvs), 2)
+	}
+
+	// remove again -> expect nothing
+	n, index = s.DeleteRange([]byte("foo"), nil)
+	if n != 0 || index != 4 {
+		t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 0, 4)
+	}
+
+	// remove foo1
+	n, index = s.DeleteRange([]byte("foo"), []byte("foo2"))
+	if n != 1 || index != 5 {
+		t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 5)
+	}
+
+	// after the removal of foo1
+	kvs, index = s.Range([]byte("foo"), []byte("foo3"), 0, 5)
+	if len(kvs) != 1 {
+		t.Fatalf("len(kvs) = %d, want %d", len(kvs), 1)
+	}
+
+	// remove foo2
+	n, index = s.DeleteRange([]byte("foo2"), []byte("foo3"))
+	if n != 1 || index != 6 {
+		t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 6)
+	}
+
+	// after the removal of foo2
+	kvs, index = s.Range([]byte("foo"), []byte("foo3"), 0, 6)
+	if len(kvs) != 0 {
+		t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0)
+	}
+}
+
+func BenchmarkStorePut(b *testing.B) {
+	s := newStore("test")
+	defer os.Remove("test")
+
+	// prepare keys
+	keys := make([][]byte, b.N)
+	for i := 0; i < b.N; i++ {
+		keys[i] = make([]byte, 64)
+		rand.Read(keys[i])
+	}
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		s.Put(keys[i], []byte("foo"))
+	}
+}
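
For illustration, a minimal usage sketch of the new store API follows, mirroring kvstore_test.go above. It is not part of the patch; the ExampleRange name and the "example" backend path are assumptions made only for this sketch.

package storage

import (
	"fmt"
	"os"
)

// ExampleRange is an illustrative sketch (assumed name, not code from this
// change). It exercises Put, Range and DeleteRange the same way the tests do.
func ExampleRange() {
	s := newStore("example") // assumed on-disk path for the backend file
	defer os.Remove("example")

	s.Put([]byte("foo"), []byte("bar"))   // returns index 1
	s.Put([]byte("foo1"), []byte("bar1")) // returns index 2
	s.Put([]byte("foo2"), []byte("bar2")) // returns index 3

	// rangeIndex <= 0 means "read at the current index".
	kvs, index := s.Range([]byte("foo"), []byte("foo3"), 0, 0)
	fmt.Println(len(kvs), index) // 3 3

	// A historical read: at index 2 only foo and foo1 existed.
	kvs, index = s.Range([]byte("foo"), []byte("foo3"), 0, 2)
	fmt.Println(len(kvs), index) // 2 2

	// DeleteRange tombstones every key in [foo, foo2) at a single new index.
	n, index := s.DeleteRange([]byte("foo"), []byte("foo2"))
	fmt.Println(n, index) // 2 4
}

The sketch shows the two index semantics the patch introduces: a non-positive rangeIndex reads at the current index, a positive rangeIndex reads the state as of that index, and DeleteRange removes every matching key under one new index.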