From 7bb388ed5248bd1937edc0df94a6e18bb7ad3aee Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 21 May 2015 12:03:58 -0700 Subject: [PATCH 1/8] storage: initial kv api --- storage/kv.go | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/storage/kv.go b/storage/kv.go index 2cb1587cb..c26917073 100644 --- a/storage/kv.go +++ b/storage/kv.go @@ -16,9 +16,40 @@ var ( keyBucketName = []byte("key") ) +type KV interface { + // Range gets the keys in the range at rangeIndex. + // If rangeIndex <=0, range gets the keys at currentIndex. + // If `end` is nil, the request returns the key. + // If `end` is not nil, it gets the keys in range [key, range_end). + // Limit limits the number of keys returned. + Range(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) + + // Put puts the given key,value into the store. + // A put also increases the index of the store, and generates one event in the event history. + Put(key, value []byte) (index int64) + + // DeleteRange deletes the given range from the store. + // A deleteRange increases the index of the store if any key in the range exists. + // The number of key deleted will be returned. + // It also generates one event for each key delete in the event history. + // if the `end` is nil, deleteRange deletes the key. + // if the `end` is not nil, deleteRange deletes the keys in range [key, range_end). + DeleteRange(key, end []byte) (n, index int64) + + // TnxBegin begins a tnx. Only Tnx prefixed operation can be executed, others will be blocked + // until tnx ends. Only one on-going tnx is allowed. + TnxBegin() + // TnxEnd ends the on-going tnx. 
+ TnxEnd() + TnxRange(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) + TnxPut(key, value []byte) (index int64) + TnxDeleteRange(key, end []byte) (n, index int64) +} + type store struct { // read operation MUST hold read lock // write opeartion MUST hold write lock + // tnx operation MUST hold write lock sync.RWMutex b backend.Backend From 9db360387dea2471fba5a409ce55d1e2bea087b0 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 22 May 2015 08:11:43 -0700 Subject: [PATCH 2/8] storage: support Range --- storage/index.go | 38 ++++++++ storage/kv.go | 132 +-------------------------- storage/kv_test.go | 24 ----- storage/kvstore.go | 195 ++++++++++++++++++++++++++++++++++++++++ storage/kvstore_test.go | 170 +++++++++++++++++++++++++++++++++++ 5 files changed, 404 insertions(+), 155 deletions(-) delete mode 100644 storage/kv_test.go create mode 100644 storage/kvstore.go create mode 100644 storage/kvstore_test.go diff --git a/storage/index.go b/storage/index.go index 37b17e7ff..8af0d17db 100644 --- a/storage/index.go +++ b/storage/index.go @@ -9,11 +9,17 @@ import ( type index interface { Get(key []byte, atIndex uint64) (index uint64, err error) + Range(key, end []byte, atIndex uint64) []kipair Put(key []byte, index uint64) Tombstone(key []byte, index uint64) error Compact(index uint64) map[uint64]struct{} } +type kipair struct { + index uint64 + key []byte +} + type treeIndex struct { sync.RWMutex tree *btree.BTree @@ -54,6 +60,38 @@ func (ti *treeIndex) Get(key []byte, atIndex uint64) (index uint64, err error) { return keyi.get(atIndex) } +func (ti *treeIndex) Range(key, end []byte, atIndex uint64) []kipair { + if end == nil { + index, err := ti.Get(key, atIndex) + if err != nil { + return nil + } + return []kipair{{key: key, index: index}} + } + + keyi := &keyIndex{key: key} + endi := &keyIndex{key: end} + pairs := make([]kipair, 0) + + ti.RLock() + defer ti.RUnlock() + + ti.tree.AscendGreaterOrEqual(keyi, func(item btree.Item) bool 
{ + if !item.Less(endi) { + return false + } + curKeyi := item.(*keyIndex) + index, err := curKeyi.get(atIndex) + if err != nil { + return true + } + pairs = append(pairs, kipair{index, curKeyi.key}) + return true + }) + + return pairs +} + func (ti *treeIndex) Tombstone(key []byte, index uint64) error { keyi := &keyIndex{key: key} diff --git a/storage/kv.go b/storage/kv.go index c26917073..6611107e9 100644 --- a/storage/kv.go +++ b/storage/kv.go @@ -1,20 +1,6 @@ package storage -import ( - "encoding/binary" - "log" - "sync" - "time" - - "github.com/coreos/etcd/storage/backend" - "github.com/coreos/etcd/storage/storagepb" -) - -var ( - batchLimit = 10000 - batchInterval = 100 * time.Millisecond - keyBucketName = []byte("key") -) +import "github.com/coreos/etcd/storage/storagepb" type KV interface { // Range gets the keys in the range at rangeIndex. @@ -45,119 +31,3 @@ type KV interface { TnxPut(key, value []byte) (index int64) TnxDeleteRange(key, end []byte) (n, index int64) } - -type store struct { - // read operation MUST hold read lock - // write opeartion MUST hold write lock - // tnx operation MUST hold write lock - sync.RWMutex - - b backend.Backend - kvindex index - - currentIndex uint64 - marshalBuf []byte // buffer for marshal protobuf -} - -func newStore(path string) *store { - s := &store{ - b: backend.New(path, batchInterval, batchLimit), - kvindex: newTreeIndex(), - currentIndex: 0, - marshalBuf: make([]byte, 1024*1024), - } - - tx := s.b.BatchTx() - tx.Lock() - tx.UnsafeCreateBucket(keyBucketName) - tx.Unlock() - s.b.ForceCommit() - - return s -} - -func (s *store) Put(key, value []byte) { - s.Lock() - defer s.Unlock() - - currentIndex := s.currentIndex + 1 - - ibytes := make([]byte, 8) - binary.BigEndian.PutUint64(ibytes, currentIndex) - - tx := s.b.BatchTx() - tx.Lock() - defer tx.Unlock() - s.currentIndex = currentIndex - - event := storagepb.Event{ - Type: storagepb.PUT, - Kv: storagepb.KeyValue{ - Key: key, - Value: value, - }, - } - - var ( - d 
[]byte - err error - n int - ) - - if event.Size() < len(s.marshalBuf) { - n, err = event.MarshalTo(s.marshalBuf) - d = s.marshalBuf[:n] - } else { - d, err = event.Marshal() - } - if err != nil { - log.Fatalf("storage: cannot marshal event: %v", err) - } - - tx.UnsafePut(keyBucketName, ibytes, d) - - s.kvindex.Put(key, currentIndex) -} - -func (s *store) Get(key []byte) []byte { - s.RLock() - defer s.RUnlock() - - index, err := s.kvindex.Get(key, s.currentIndex) - if err != nil { - return nil - } - - ibytes := make([]byte, 8) - binary.BigEndian.PutUint64(ibytes, index) - tx := s.b.BatchTx() - tx.Lock() - defer tx.Unlock() - vs := tx.UnsafeRange(keyBucketName, ibytes, nil, 0) - // TODO: the value will be an event type. - // TODO: copy out the bytes, decode it, return the value. - return vs[0] -} - -func (s *store) Delete(key []byte) error { - s.Lock() - defer s.Unlock() - - _, err := s.kvindex.Get(key, s.currentIndex) - if err != nil { - return nil - } - - currentIndex := s.currentIndex + 1 - - ibytes := make([]byte, 8) - binary.BigEndian.PutUint64(ibytes, currentIndex) - tx := s.b.BatchTx() - tx.Lock() - defer tx.Unlock() - // TODO: the value will be an event type. - // A tombstone is simple a "Delete" type event. 
- tx.UnsafePut(keyBucketName, key, []byte("tombstone")) - - return s.kvindex.Tombstone(key, currentIndex) -} diff --git a/storage/kv_test.go b/storage/kv_test.go deleted file mode 100644 index a9e2cb8d4..000000000 --- a/storage/kv_test.go +++ /dev/null @@ -1,24 +0,0 @@ -package storage - -import ( - "crypto/rand" - "os" - "testing" -) - -func BenchmarkStorePut(b *testing.B) { - s := newStore("test") - defer os.Remove("test") - - // prepare keys - keys := make([][]byte, b.N) - for i := 0; i < b.N; i++ { - keys[i] = make([]byte, 64) - rand.Read(keys[i]) - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - s.Put(keys[i], []byte("foo")) - } -} diff --git a/storage/kvstore.go b/storage/kvstore.go new file mode 100644 index 000000000..73d154f7a --- /dev/null +++ b/storage/kvstore.go @@ -0,0 +1,195 @@ +package storage + +import ( + "bytes" + "encoding/binary" + "log" + "sync" + "time" + + "github.com/coreos/etcd/storage/backend" + "github.com/coreos/etcd/storage/storagepb" +) + +var ( + batchLimit = 10000 + batchInterval = 100 * time.Millisecond + keyBucketName = []byte("key") +) + +type store struct { + // read operation MUST hold read lock + // write opeartion MUST hold write lock + // tnx operation MUST hold write lock + sync.RWMutex + + b backend.Backend + kvindex index + + currentIndex uint64 +} + +func newStore(path string) *store { + s := &store{ + b: backend.New(path, batchInterval, batchLimit), + kvindex: newTreeIndex(), + currentIndex: 0, + } + + tx := s.b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(keyBucketName) + tx.Unlock() + s.b.ForceCommit() + + return s +} + +func (s *store) Put(key, value []byte) int64 { + s.Lock() + defer s.Unlock() + + s.put(key, value, s.currentIndex+1, 0) + s.currentIndex = s.currentIndex + 1 + return int64(s.currentIndex) +} + +func (s *store) Range(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) { + s.RLock() + defer s.RUnlock() + + if rangeIndex <= 0 { + index = int64(s.currentIndex) + } else { 
+ index = rangeIndex + } + + pairs := s.kvindex.Range(key, end, uint64(index)) + if len(pairs) == 0 { + return nil, index + } + if limit > 0 && len(pairs) > int(limit) { + pairs = pairs[:limit] + } + + tx := s.b.BatchTx() + tx.Lock() + defer tx.Unlock() + + for _, pair := range pairs { + ibytes := make([]byte, 8) + endbytes := make([]byte, 8) + binary.BigEndian.PutUint64(ibytes, pair.index) + binary.BigEndian.PutUint64(endbytes, pair.index+1) + + found := false + + vs := tx.UnsafeRange(keyBucketName, ibytes, endbytes, 0) + for _, v := range vs { + var e storagepb.Event + err := e.Unmarshal(v) + if err != nil { + log.Fatalf("storage: range cannot unmarshal event: %v", err) + } + if bytes.Equal(e.Kv.Key, pair.key) { + if e.Type == storagepb.PUT { + kvs = append(kvs, e.Kv) + } + found = true + break + } + } + + if !found { + log.Fatalf("storage: range cannot find key %s at index %d", string(pair.key), pair.index) + } + } + return kvs, index +} + +func (s *store) DeleteRange(key, end []byte) (n, index int64) { + s.Lock() + defer s.Unlock() + + index = int64(s.currentIndex) + 1 + + pairs := s.kvindex.Range(key, end, s.currentIndex) + if len(pairs) == 0 { + return 0, int64(s.currentIndex) + } + + for i, pair := range pairs { + ok := s.delete(pair.key, uint64(index), uint32(i)) + if ok { + n++ + } + } + if n != 0 { + s.currentIndex = s.currentIndex + 1 + } + return n, int64(s.currentIndex) +} + +func (s *store) put(key, value []byte, index uint64, subindex uint32) { + ibytes := make([]byte, 8+1+4) + indexToBytes(index, subindex, ibytes) + + event := storagepb.Event{ + Type: storagepb.PUT, + Kv: storagepb.KeyValue{ + Key: key, + Value: value, + }, + } + + d, err := event.Marshal() + if err != nil { + log.Fatalf("storage: cannot marshal event: %v", err) + } + + tx := s.b.BatchTx() + tx.Lock() + defer tx.Unlock() + tx.UnsafePut(keyBucketName, ibytes, d) + s.kvindex.Put(key, index) +} + +func (s *store) delete(key []byte, index uint64, subindex uint32) bool { + _, err := 
s.kvindex.Get(key, index) + if err != nil { + // key not exist + return false + } + + ibytes := make([]byte, 8+1+4) + indexToBytes(index, subindex, ibytes) + + event := storagepb.Event{ + Type: storagepb.DELETE, + Kv: storagepb.KeyValue{ + Key: key, + }, + } + + d, err := event.Marshal() + if err != nil { + log.Fatalf("storage: cannot marshal event: %v", err) + } + + tx := s.b.BatchTx() + tx.Lock() + defer tx.Unlock() + tx.UnsafePut(keyBucketName, ibytes, d) + err = s.kvindex.Tombstone(key, index) + if err != nil { + log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err) + } + + return true +} + +func indexToBytes(index uint64, subindex uint32, bytes []byte) { + binary.BigEndian.PutUint64(bytes, index) + bytes[8] = '_' + binary.BigEndian.PutUint32(bytes[9:], subindex) +} diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go new file mode 100644 index 000000000..2a70d229f --- /dev/null +++ b/storage/kvstore_test.go @@ -0,0 +1,170 @@ +package storage + +import ( + "crypto/rand" + "os" + "testing" +) + +func TestRange(t *testing.T) { + s := newStore("test") + defer os.Remove("test") + + s.Put([]byte("foo"), []byte("bar")) + s.Put([]byte("foo1"), []byte("bar1")) + s.Put([]byte("foo2"), []byte("bar2")) + + tests := []struct { + key, end []byte + index int64 + + windex int64 + // TODO: change this to the actual kv + wN int64 + }{ + { + []byte("foo"), []byte("foo3"), 0, + 3, 3, + }, + { + []byte("foo"), []byte("foo1"), 0, + 3, 1, + }, + { + []byte("foo"), []byte("foo3"), 1, + 1, 1, + }, + { + []byte("foo"), []byte("foo3"), 2, + 2, 2, + }, + } + + for i, tt := range tests { + kvs, index := s.Range(tt.key, tt.end, 0, tt.index) + if len(kvs) != int(tt.wN) { + t.Errorf("#%d: len(kvs) = %d, want %d", i, len(kvs), tt.wN) + } + if index != tt.windex { + t.Errorf("#%d: index = %d, wang %d", i, tt.index, tt.windex) + } + } +} + +func TestSimpleDeleteRange(t *testing.T) { + tests := []struct { + key, end []byte + + windex int64 + wN int64 + }{ + 
{ + []byte("foo"), []byte("foo1"), + 4, 1, + }, + { + []byte("foo"), []byte("foo2"), + 4, 2, + }, + { + []byte("foo"), []byte("foo3"), + 4, 3, + }, + { + []byte("foo3"), []byte("foo8"), + 3, 0, + }, + } + + for i, tt := range tests { + s := newStore("test") + + s.Put([]byte("foo"), []byte("bar")) + s.Put([]byte("foo1"), []byte("bar1")) + s.Put([]byte("foo2"), []byte("bar2")) + + n, index := s.DeleteRange(tt.key, tt.end) + if n != tt.wN { + t.Errorf("#%d: n = %d, want %d", i, n, tt.wN) + } + if index != tt.windex { + t.Errorf("#%d: index = %d, wang %d", i, index, tt.windex) + } + + os.Remove("test") + } +} + +func TestRangeInSequence(t *testing.T) { + s := newStore("test") + defer os.Remove("test") + + s.Put([]byte("foo"), []byte("bar")) + s.Put([]byte("foo1"), []byte("bar1")) + s.Put([]byte("foo2"), []byte("bar2")) + + // remove foo + n, index := s.DeleteRange([]byte("foo"), nil) + if n != 1 || index != 4 { + t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 4) + } + + // before removal foo + kvs, index := s.Range([]byte("foo"), []byte("foo3"), 0, 3) + if len(kvs) != 3 { + t.Fatalf("len(kvs) = %d, want %d", len(kvs), 3) + } + + // after removal foo + kvs, index = s.Range([]byte("foo"), []byte("foo3"), 0, 4) + if len(kvs) != 2 { + t.Fatalf("len(kvs) = %d, want %d", len(kvs), 2) + } + + // remove again -> expect nothing + n, index = s.DeleteRange([]byte("foo"), nil) + if n != 0 || index != 4 { + t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 0, 4) + } + + // remove foo1 + n, index = s.DeleteRange([]byte("foo"), []byte("foo2")) + if n != 1 || index != 5 { + t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 5) + } + + // after removal foo1 + kvs, index = s.Range([]byte("foo"), []byte("foo3"), 0, 5) + if len(kvs) != 1 { + t.Fatalf("len(kvs) = %d, want %d", len(kvs), 1) + } + + // remove foo2 + n, index = s.DeleteRange([]byte("foo2"), []byte("foo3")) + if n != 1 || index != 6 { + t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 6) + } 
+ + // after removal foo2 + kvs, index = s.Range([]byte("foo"), []byte("foo3"), 0, 6) + if len(kvs) != 0 { + t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0) + } +} + +func BenchmarkStorePut(b *testing.B) { + s := newStore("test") + defer os.Remove("test") + + // prepare keys + keys := make([][]byte, b.N) + for i := 0; i < b.N; i++ { + keys[i] = make([]byte, 64) + rand.Read(keys[i]) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.Put(keys[i], []byte("foo")) + } +} From 93ecf368557c75a5b4de93429dcea7c3e01a1b81 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 22 May 2015 13:35:43 -0700 Subject: [PATCH 3/8] storage: support tnx --- storage/kv.go | 1 + storage/kvstore.go | 126 +++++++++++++++++++++++++++------------- storage/kvstore_test.go | 63 ++++++++++++++++++++ 3 files changed, 151 insertions(+), 39 deletions(-) diff --git a/storage/kv.go b/storage/kv.go index 6611107e9..f5a19c90c 100644 --- a/storage/kv.go +++ b/storage/kv.go @@ -26,6 +26,7 @@ type KV interface { // until tnx ends. Only one on-going tnx is allowed. TnxBegin() // TnxEnd ends the on-going tnx. + // TODO: generate and verify tnx id for safty. 
TnxEnd() TnxRange(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) TnxPut(key, value []byte) (index int64) diff --git a/storage/kvstore.go b/storage/kvstore.go index 73d154f7a..75bc9cd9a 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -18,15 +18,14 @@ var ( ) type store struct { - // read operation MUST hold read lock - // write opeartion MUST hold write lock - // tnx operation MUST hold write lock - sync.RWMutex + mu sync.RWMutex b backend.Backend kvindex index currentIndex uint64 + + subIndex uint32 // tracks next subIndex to put into backend } func newStore(path string) *store { @@ -46,20 +45,48 @@ func newStore(path string) *store { } func (s *store) Put(key, value []byte) int64 { - s.Lock() - defer s.Unlock() + s.TnxBegin() + s.put(key, value, s.currentIndex+1) + s.TnxEnd() - s.put(key, value, s.currentIndex+1, 0) - s.currentIndex = s.currentIndex + 1 return int64(s.currentIndex) } func (s *store) Range(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) { - s.RLock() - defer s.RUnlock() + s.TnxBegin() + kvs, index = s.TnxRange(key, end, limit, rangeIndex) + s.TnxEnd() + return kvs, index +} + +func (s *store) DeleteRange(key, end []byte) (n, index int64) { + s.TnxBegin() + n = s.deleteRange(key, end, s.currentIndex+1) + s.TnxEnd() + + return n, int64(s.currentIndex) +} + +func (s *store) TnxBegin() { + s.mu.Lock() + s.subIndex = 0 +} + +func (s *store) TnxEnd() { + if s.subIndex != 0 { + s.currentIndex += 1 + } + s.subIndex = 0 + s.mu.Unlock() +} + +func (s *store) TnxRange(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) { if rangeIndex <= 0 { index = int64(s.currentIndex) + if s.subIndex > 0 { + index += 1 + } } else { index = rangeIndex } @@ -83,6 +110,7 @@ func (s *store) Range(key, end []byte, limit, rangeIndex int64) (kvs []storagepb binary.BigEndian.PutUint64(endbytes, pair.index+1) found := false + var kv *storagepb.KeyValue vs := 
tx.UnsafeRange(keyBucketName, ibytes, endbytes, 0) for _, v := range vs { @@ -93,46 +121,40 @@ func (s *store) Range(key, end []byte, limit, rangeIndex int64) (kvs []storagepb } if bytes.Equal(e.Kv.Key, pair.key) { if e.Type == storagepb.PUT { - kvs = append(kvs, e.Kv) + kv = &e.Kv + } else { + kv = nil } found = true - break } } if !found { log.Fatalf("storage: range cannot find key %s at index %d", string(pair.key), pair.index) } + if kv != nil { + kvs = append(kvs, *kv) + } } return kvs, index } -func (s *store) DeleteRange(key, end []byte) (n, index int64) { - s.Lock() - defer s.Unlock() - - index = int64(s.currentIndex) + 1 - - pairs := s.kvindex.Range(key, end, s.currentIndex) - if len(pairs) == 0 { - return 0, int64(s.currentIndex) - } - - for i, pair := range pairs { - ok := s.delete(pair.key, uint64(index), uint32(i)) - if ok { - n++ - } - } - if n != 0 { - s.currentIndex = s.currentIndex + 1 - } - return n, int64(s.currentIndex) +func (s *store) TnxPut(key, value []byte) int64 { + s.put(key, value, s.currentIndex+1) + return int64(s.currentIndex + 1) } -func (s *store) put(key, value []byte, index uint64, subindex uint32) { +func (s *store) TnxDeleteRange(key, end []byte) (n, index int64) { + n = s.deleteRange(key, end, s.currentIndex+1) + if n != 0 || s.subIndex != 0 { + index = int64(s.currentIndex + 1) + } + return n, index +} + +func (s *store) put(key, value []byte, index uint64) { ibytes := make([]byte, 8+1+4) - indexToBytes(index, subindex, ibytes) + indexToBytes(index, s.subIndex, ibytes) event := storagepb.Event{ Type: storagepb.PUT, @@ -152,17 +174,43 @@ func (s *store) put(key, value []byte, index uint64, subindex uint32) { defer tx.Unlock() tx.UnsafePut(keyBucketName, ibytes, d) s.kvindex.Put(key, index) + s.subIndex += 1 } -func (s *store) delete(key []byte, index uint64, subindex uint32) bool { - _, err := s.kvindex.Get(key, index) +func (s *store) deleteRange(key, end []byte, index uint64) int64 { + var n int64 + rindex := index + if 
s.subIndex > 0 { + rindex += 1 + } + pairs := s.kvindex.Range(key, end, rindex) + + if len(pairs) == 0 { + return 0 + } + + for _, pair := range pairs { + ok := s.delete(pair.key, index) + if ok { + n++ + } + } + return n +} + +func (s *store) delete(key []byte, index uint64) bool { + gindex := index + if s.subIndex > 0 { + gindex += 1 + } + _, err := s.kvindex.Get(key, gindex) if err != nil { // key not exist return false } ibytes := make([]byte, 8+1+4) - indexToBytes(index, subindex, ibytes) + indexToBytes(index, s.subIndex, ibytes) event := storagepb.Event{ Type: storagepb.DELETE, @@ -184,7 +232,7 @@ func (s *store) delete(key []byte, index uint64, subindex uint32) bool { if err != nil { log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err) } - + s.subIndex += 1 return true } diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index 2a70d229f..ebacf2614 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -152,6 +152,69 @@ func TestRangeInSequence(t *testing.T) { } } +func TestOneTnx(t *testing.T) { + s := newStore("test") + defer os.Remove("test") + + s.TnxBegin() + for i := 0; i < 3; i++ { + s.TnxPut([]byte("foo"), []byte("bar")) + s.TnxPut([]byte("foo1"), []byte("bar1")) + s.TnxPut([]byte("foo2"), []byte("bar2")) + + // remove foo + n, index := s.TnxDeleteRange([]byte("foo"), nil) + if n != 1 || index != 1 { + t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 1) + } + + kvs, index := s.TnxRange([]byte("foo"), []byte("foo3"), 0, 0) + if len(kvs) != 2 { + t.Fatalf("len(kvs) = %d, want %d", len(kvs), 2) + } + + // remove again -> expect nothing + n, index = s.TnxDeleteRange([]byte("foo"), nil) + if n != 0 || index != 1 { + t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 0, 1) + } + + // remove foo1 + n, index = s.TnxDeleteRange([]byte("foo"), []byte("foo2")) + if n != 1 || index != 1 { + t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 1) + } + + // after removal foo1 + kvs, 
index = s.TnxRange([]byte("foo"), []byte("foo3"), 0, 0) + if len(kvs) != 1 { + t.Fatalf("len(kvs) = %d, want %d", len(kvs), 1) + } + + // remove foo2 + n, index = s.TnxDeleteRange([]byte("foo2"), []byte("foo3")) + if n != 1 || index != 1 { + t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 1) + } + + // after removal foo2 + kvs, index = s.TnxRange([]byte("foo"), []byte("foo3"), 0, 0) + if len(kvs) != 0 { + t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0) + } + } + s.TnxEnd() + + // After tnx + kvs, index := s.Range([]byte("foo"), []byte("foo3"), 0, 1) + if len(kvs) != 0 { + t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0) + } + if index != 1 { + t.Fatalf("index = %d, want %d", index, 1) + } +} + func BenchmarkStorePut(b *testing.B) { s := newStore("test") defer os.Remove("test") From fb12a4e412fee79c8c59277f7e49d5cca5ed901f Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 22 May 2015 13:58:26 -0700 Subject: [PATCH 4/8] storage: fix a deadlock in batch tx --- storage/backend/batch_tx.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/storage/backend/batch_tx.go b/storage/backend/batch_tx.go index 5df76f22e..ca8cc4532 100644 --- a/storage/backend/batch_tx.go +++ b/storage/backend/batch_tx.go @@ -43,7 +43,7 @@ func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) { } t.pending++ if t.pending > t.backend.batchLimit { - t.Commit() + t.commit() t.pending = 0 } } @@ -85,7 +85,7 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) { } t.pending++ if t.pending > t.backend.batchLimit { - t.Commit() + t.commit() t.pending = 0 } } @@ -94,7 +94,10 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) { func (t *batchTx) Commit() { t.Lock() defer t.Unlock() + t.commit() +} +func (t *batchTx) commit() { var err error // commit the last tx if t.tx != nil { From 9c1aec68779bde475d9399163d9ff339591658bb Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 27 May 2015 09:58:21 -0700 Subject: [PATCH 5/8] 
storage: add rangeKeys func --- storage/kvstore.go | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/storage/kvstore.go b/storage/kvstore.go index 75bc9cd9a..b3672d5da 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -54,7 +54,7 @@ func (s *store) Put(key, value []byte) int64 { func (s *store) Range(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) { s.TnxBegin() - kvs, index = s.TnxRange(key, end, limit, rangeIndex) + kvs, index = s.rangeKeys(key, end, limit, rangeIndex) s.TnxEnd() return kvs, index @@ -82,6 +82,24 @@ func (s *store) TnxEnd() { } func (s *store) TnxRange(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) { + return s.rangeKeys(key, end, limit, rangeIndex) +} + +func (s *store) TnxPut(key, value []byte) int64 { + s.put(key, value, s.currentIndex+1) + return int64(s.currentIndex + 1) +} + +func (s *store) TnxDeleteRange(key, end []byte) (n, index int64) { + n = s.deleteRange(key, end, s.currentIndex+1) + if n != 0 || s.subIndex != 0 { + index = int64(s.currentIndex + 1) + } + return n, index +} + +// range is a keyword in Go, add Keys suffix. 
+func (s *store) rangeKeys(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) { if rangeIndex <= 0 { index = int64(s.currentIndex) if s.subIndex > 0 { @@ -139,19 +157,6 @@ func (s *store) TnxRange(key, end []byte, limit, rangeIndex int64) (kvs []storag return kvs, index } -func (s *store) TnxPut(key, value []byte) int64 { - s.put(key, value, s.currentIndex+1) - return int64(s.currentIndex + 1) -} - -func (s *store) TnxDeleteRange(key, end []byte) (n, index int64) { - n = s.deleteRange(key, end, s.currentIndex+1) - if n != 0 || s.subIndex != 0 { - index = int64(s.currentIndex + 1) - } - return n, index -} - func (s *store) put(key, value []byte, index uint64) { ibytes := make([]byte, 8+1+4) indexToBytes(index, s.subIndex, ibytes) From cbb8b9bb08fbaa01554f41fe88a39064590f67a9 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 27 May 2015 10:35:51 -0700 Subject: [PATCH 6/8] storage: add tnx id --- storage/kvstore.go | 66 +++++++++++++++++++++++++++++++---------- storage/kvstore_test.go | 48 ++++++++++++++++++++++-------- 2 files changed, 87 insertions(+), 27 deletions(-) diff --git a/storage/kvstore.go b/storage/kvstore.go index b3672d5da..697c841bb 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -3,7 +3,9 @@ package storage import ( "bytes" "encoding/binary" + "errors" "log" + "math/rand" "sync" "time" @@ -15,6 +17,8 @@ var ( batchLimit = 10000 batchInterval = 100 * time.Millisecond keyBucketName = []byte("key") + + ErrTnxIDMismatch = errors.New("storage: tnx id mismatch") ) type store struct { @@ -24,8 +28,10 @@ type store struct { kvindex index currentIndex uint64 + subIndex uint32 // tracks next subIndex to put into backend - subIndex uint32 // tracks next subIndex to put into backend + tmu sync.Mutex // protect the tnxID field + tnxID int64 // tracks the current tnxID to verify tnx operations } func newStore(path string) *store { @@ -45,57 +51,87 @@ func newStore(path string) *store { } func (s *store) Put(key, value 
[]byte) int64 { - s.TnxBegin() + id := s.TnxBegin() s.put(key, value, s.currentIndex+1) - s.TnxEnd() + s.TnxEnd(id) return int64(s.currentIndex) } func (s *store) Range(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) { - s.TnxBegin() + id := s.TnxBegin() kvs, index = s.rangeKeys(key, end, limit, rangeIndex) - s.TnxEnd() + s.TnxEnd(id) return kvs, index } func (s *store) DeleteRange(key, end []byte) (n, index int64) { - s.TnxBegin() + id := s.TnxBegin() n = s.deleteRange(key, end, s.currentIndex+1) - s.TnxEnd() + s.TnxEnd(id) return n, int64(s.currentIndex) } -func (s *store) TnxBegin() { +func (s *store) TnxBegin() int64 { s.mu.Lock() s.subIndex = 0 + + s.tmu.Lock() + defer s.tmu.Unlock() + s.tnxID = rand.Int63() + return s.tnxID } -func (s *store) TnxEnd() { +func (s *store) TnxEnd(tnxID int64) error { + s.tmu.Lock() + defer s.tmu.Unlock() + if tnxID != s.tnxID { + return ErrTnxIDMismatch + } + if s.subIndex != 0 { s.currentIndex += 1 } s.subIndex = 0 s.mu.Unlock() + return nil } -func (s *store) TnxRange(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) { - return s.rangeKeys(key, end, limit, rangeIndex) +func (s *store) TnxRange(tnxID int64, key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64, err error) { + s.tmu.Lock() + defer s.tmu.Unlock() + if tnxID != s.tnxID { + return nil, 0, ErrTnxIDMismatch + } + kvs, index = s.rangeKeys(key, end, limit, rangeIndex) + return kvs, index, nil } -func (s *store) TnxPut(key, value []byte) int64 { +func (s *store) TnxPut(tnxID int64, key, value []byte) (index int64, err error) { + s.tmu.Lock() + defer s.tmu.Unlock() + if tnxID != s.tnxID { + return 0, ErrTnxIDMismatch + } + s.put(key, value, s.currentIndex+1) - return int64(s.currentIndex + 1) + return int64(s.currentIndex + 1), nil } -func (s *store) TnxDeleteRange(key, end []byte) (n, index int64) { +func (s *store) TnxDeleteRange(tnxID int64, key, end []byte) (n, index int64, 
err error) { + s.tmu.Lock() + defer s.tmu.Unlock() + if tnxID != s.tnxID { + return 0, 0, ErrTnxIDMismatch + } + n = s.deleteRange(key, end, s.currentIndex+1) if n != 0 || s.subIndex != 0 { index = int64(s.currentIndex + 1) } - return n, index + return n, index, nil } // range is a keyword in Go, add Keys suffix. diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index ebacf2614..5aacd165f 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -156,54 +156,78 @@ func TestOneTnx(t *testing.T) { s := newStore("test") defer os.Remove("test") - s.TnxBegin() + id := s.TnxBegin() for i := 0; i < 3; i++ { - s.TnxPut([]byte("foo"), []byte("bar")) - s.TnxPut([]byte("foo1"), []byte("bar1")) - s.TnxPut([]byte("foo2"), []byte("bar2")) + s.TnxPut(id, []byte("foo"), []byte("bar")) + s.TnxPut(id, []byte("foo1"), []byte("bar1")) + s.TnxPut(id, []byte("foo2"), []byte("bar2")) // remove foo - n, index := s.TnxDeleteRange([]byte("foo"), nil) + n, index, err := s.TnxDeleteRange(id, []byte("foo"), nil) + if err != nil { + t.Fatal(err) + } if n != 1 || index != 1 { t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 1) } - kvs, index := s.TnxRange([]byte("foo"), []byte("foo3"), 0, 0) + kvs, index, err := s.TnxRange(id, []byte("foo"), []byte("foo3"), 0, 0) + if err != nil { + t.Fatal(err) + } if len(kvs) != 2 { t.Fatalf("len(kvs) = %d, want %d", len(kvs), 2) } // remove again -> expect nothing - n, index = s.TnxDeleteRange([]byte("foo"), nil) + n, index, err = s.TnxDeleteRange(id, []byte("foo"), nil) + if err != nil { + t.Fatal(err) + } if n != 0 || index != 1 { t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 0, 1) } // remove foo1 - n, index = s.TnxDeleteRange([]byte("foo"), []byte("foo2")) + n, index, err = s.TnxDeleteRange(id, []byte("foo"), []byte("foo2")) + if err != nil { + t.Fatal(err) + } if n != 1 || index != 1 { t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 1) } // after removal foo1 - kvs, index = s.TnxRange([]byte("foo"), 
[]byte("foo3"), 0, 0) + kvs, index, err = s.TnxRange(id, []byte("foo"), []byte("foo3"), 0, 0) + if err != nil { + t.Fatal(err) + } if len(kvs) != 1 { t.Fatalf("len(kvs) = %d, want %d", len(kvs), 1) } // remove foo2 - n, index = s.TnxDeleteRange([]byte("foo2"), []byte("foo3")) + n, index, err = s.TnxDeleteRange(id, []byte("foo2"), []byte("foo3")) + if err != nil { + t.Fatal(err) + } if n != 1 || index != 1 { t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 1) } // after removal foo2 - kvs, index = s.TnxRange([]byte("foo"), []byte("foo3"), 0, 0) + kvs, index, err = s.TnxRange(id, []byte("foo"), []byte("foo3"), 0, 0) + if err != nil { + t.Fatal(err) + } if len(kvs) != 0 { t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0) } } - s.TnxEnd() + err := s.TnxEnd(id) + if err != nil { + t.Fatal(err) + } // After tnx kvs, index := s.Range([]byte("foo"), []byte("foo3"), 0, 1) From 69d02410cfa989f7be613e60679e9c357d752049 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 27 May 2015 14:24:23 -0700 Subject: [PATCH 7/8] storage: adopt KV interface --- storage/kv.go | 10 +++++----- storage/kvstore.go | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/storage/kv.go b/storage/kv.go index f5a19c90c..566cf71c4 100644 --- a/storage/kv.go +++ b/storage/kv.go @@ -24,11 +24,11 @@ type KV interface { // TnxBegin begins a tnx. Only Tnx prefixed operation can be executed, others will be blocked // until tnx ends. Only one on-going tnx is allowed. - TnxBegin() + TnxBegin() int64 // TnxEnd ends the on-going tnx. // TODO: generate and verify tnx id for safty. 
- TnxEnd() - TnxRange(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) - TnxPut(key, value []byte) (index int64) - TnxDeleteRange(key, end []byte) (n, index int64) + TnxEnd(tnxID int64) error + TnxRange(tnxID int64, key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64, err error) + TnxPut(tnxID int64, key, value []byte) (index int64, err error) + TnxDeleteRange(tnxID int64, key, end []byte) (n, index int64, err error) } diff --git a/storage/kvstore.go b/storage/kvstore.go index 697c841bb..d4237b293 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -34,7 +34,7 @@ type store struct { tnxID int64 // tracks the current tnxID to verify tnx operations } -func newStore(path string) *store { +func newStore(path string) KV { s := &store{ b: backend.New(path, batchInterval, batchLimit), kvindex: newTreeIndex(), From 6c207b9277f874a7f2c839dc19145afc1ff39797 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 27 May 2015 14:46:59 -0700 Subject: [PATCH 8/8] storage: kill todo --- storage/kv.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/storage/kv.go b/storage/kv.go index 566cf71c4..d99934be3 100644 --- a/storage/kv.go +++ b/storage/kv.go @@ -24,9 +24,10 @@ type KV interface { // TnxBegin begins a tnx. Only Tnx prefixed operation can be executed, others will be blocked // until tnx ends. Only one on-going tnx is allowed. + // TnxBegin returns an int64 tnx ID. + // All tnx prefixed operations with same tnx ID will be done with the same index. TnxBegin() int64 - // TnxEnd ends the on-going tnx. - // TODO: generate and verify tnx id for safty. + // TnxEnd ends the on-going tnx with tnx ID. If the on-going tnx ID is not matched, error is returned. TnxEnd(tnxID int64) error TnxRange(tnxID int64, key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64, err error) TnxPut(tnxID int64, key, value []byte) (index int64, err error)