From 93ecf368557c75a5b4de93429dcea7c3e01a1b81 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 22 May 2015 13:35:43 -0700 Subject: [PATCH] storage: support tnx --- storage/kv.go | 1 + storage/kvstore.go | 126 +++++++++++++++++++++++++++------------- storage/kvstore_test.go | 63 ++++++++++++++++++++ 3 files changed, 151 insertions(+), 39 deletions(-) diff --git a/storage/kv.go b/storage/kv.go index 6611107e9..f5a19c90c 100644 --- a/storage/kv.go +++ b/storage/kv.go @@ -26,6 +26,7 @@ type KV interface { // until tnx ends. Only one on-going tnx is allowed. TnxBegin() // TnxEnd ends the on-going tnx. + // TODO: generate and verify tnx id for safty. TnxEnd() TnxRange(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) TnxPut(key, value []byte) (index int64) diff --git a/storage/kvstore.go b/storage/kvstore.go index 73d154f7a..75bc9cd9a 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -18,15 +18,14 @@ var ( ) type store struct { - // read operation MUST hold read lock - // write opeartion MUST hold write lock - // tnx operation MUST hold write lock - sync.RWMutex + mu sync.RWMutex b backend.Backend kvindex index currentIndex uint64 + + subIndex uint32 // tracks next subIndex to put into backend } func newStore(path string) *store { @@ -46,20 +45,48 @@ func newStore(path string) *store { } func (s *store) Put(key, value []byte) int64 { - s.Lock() - defer s.Unlock() + s.TnxBegin() + s.put(key, value, s.currentIndex+1) + s.TnxEnd() - s.put(key, value, s.currentIndex+1, 0) - s.currentIndex = s.currentIndex + 1 return int64(s.currentIndex) } func (s *store) Range(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) { - s.RLock() - defer s.RUnlock() + s.TnxBegin() + kvs, index = s.TnxRange(key, end, limit, rangeIndex) + s.TnxEnd() + return kvs, index +} + +func (s *store) DeleteRange(key, end []byte) (n, index int64) { + s.TnxBegin() + n = s.deleteRange(key, end, s.currentIndex+1) + s.TnxEnd() + + return n, int64(s.currentIndex) +} + +func (s *store) TnxBegin() { + s.mu.Lock() + s.subIndex = 0 +} + +func (s *store) TnxEnd() { + if s.subIndex != 0 { + s.currentIndex += 1 + } + s.subIndex = 0 + s.mu.Unlock() +} + +func (s *store) TnxRange(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) { if rangeIndex <= 0 { index = int64(s.currentIndex) + if s.subIndex > 0 { + index += 1 + } } else { index = rangeIndex } @@ -83,6 +110,7 @@ func (s *store) Range(key, end []byte, limit, rangeIndex int64) (kvs []storagepb binary.BigEndian.PutUint64(endbytes, pair.index+1) found := false + var kv *storagepb.KeyValue vs := tx.UnsafeRange(keyBucketName, ibytes, endbytes, 0) for _, v := range vs { @@ -93,46 +121,40 @@ func (s *store) Range(key, end []byte, limit, rangeIndex int64) (kvs []storagepb } if bytes.Equal(e.Kv.Key, pair.key) { if e.Type == storagepb.PUT { - kvs = append(kvs, e.Kv) + kv = &e.Kv + } else { + kv = nil } found = true - break } } if !found { log.Fatalf("storage: range cannot find key %s at index %d", string(pair.key), pair.index) } + if kv != nil { + kvs = append(kvs, *kv) + } } return kvs, index } -func (s *store) DeleteRange(key, end []byte) (n, index int64) { - s.Lock() - defer s.Unlock() - - index = int64(s.currentIndex) + 1 - - pairs := s.kvindex.Range(key, end, s.currentIndex) - if len(pairs) == 0 { - return 0, int64(s.currentIndex) - } - - for i, pair := range pairs { - ok := s.delete(pair.key, uint64(index), uint32(i)) - if ok { - n++ - } - } - if n != 0 { - s.currentIndex = s.currentIndex + 1 - } - return n, int64(s.currentIndex) +func (s *store) TnxPut(key, value []byte) int64 { + s.put(key, value, s.currentIndex+1) + return int64(s.currentIndex + 1) } -func (s *store) put(key, value []byte, index uint64, subindex uint32) { +func (s *store) TnxDeleteRange(key, end []byte) (n, index int64) { + n = s.deleteRange(key, end, s.currentIndex+1) + if n != 0 || s.subIndex != 0 { + index = int64(s.currentIndex + 1) + } + return n, index +} + +func (s *store) put(key, value []byte, index uint64) { ibytes := make([]byte, 8+1+4) - indexToBytes(index, subindex, ibytes) + indexToBytes(index, s.subIndex, ibytes) event := storagepb.Event{ Type: storagepb.PUT, @@ -152,17 +174,43 @@ func (s *store) put(key, value []byte, index uint64, subindex uint32) { defer tx.Unlock() tx.UnsafePut(keyBucketName, ibytes, d) s.kvindex.Put(key, index) + s.subIndex += 1 } -func (s *store) delete(key []byte, index uint64, subindex uint32) bool { - _, err := s.kvindex.Get(key, index) +func (s *store) deleteRange(key, end []byte, index uint64) int64 { + var n int64 + rindex := index + if s.subIndex > 0 { + rindex += 1 + } + pairs := s.kvindex.Range(key, end, rindex) + + if len(pairs) == 0 { + return 0 + } + + for _, pair := range pairs { + ok := s.delete(pair.key, index) + if ok { + n++ + } + } + return n +} + +func (s *store) delete(key []byte, index uint64) bool { + gindex := index + if s.subIndex > 0 { + gindex += 1 + } + _, err := s.kvindex.Get(key, gindex) if err != nil { // key not exist return false } ibytes := make([]byte, 8+1+4) - indexToBytes(index, subindex, ibytes) + indexToBytes(index, s.subIndex, ibytes) event := storagepb.Event{ Type: storagepb.DELETE, @@ -184,7 +232,7 @@ func (s *store) delete(key []byte, index uint64, subindex uint32) bool { if err != nil { log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err) } - + s.subIndex += 1 return true } diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index 2a70d229f..ebacf2614 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -152,6 +152,69 @@ func TestRangeInSequence(t *testing.T) { } } +func TestOneTnx(t *testing.T) { + s := newStore("test") + defer os.Remove("test") + + s.TnxBegin() + for i := 0; i < 3; i++ { + s.TnxPut([]byte("foo"), []byte("bar")) + s.TnxPut([]byte("foo1"), []byte("bar1")) + s.TnxPut([]byte("foo2"), []byte("bar2")) + + // remove foo + n, index := s.TnxDeleteRange([]byte("foo"), nil) + if n != 1 || index != 1 { + t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 1) + } + + kvs, index := s.TnxRange([]byte("foo"), []byte("foo3"), 0, 0) + if len(kvs) != 2 { + t.Fatalf("len(kvs) = %d, want %d", len(kvs), 2) + } + + // remove again -> expect nothing + n, index = s.TnxDeleteRange([]byte("foo"), nil) + if n != 0 || index != 1 { + t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 0, 1) + } + + // remove foo1 + n, index = s.TnxDeleteRange([]byte("foo"), []byte("foo2")) + if n != 1 || index != 1 { + t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 1) + } + + // after removal foo1 + kvs, index = s.TnxRange([]byte("foo"), []byte("foo3"), 0, 0) + if len(kvs) != 1 { + t.Fatalf("len(kvs) = %d, want %d", len(kvs), 1) + } + + // remove foo2 + n, index = s.TnxDeleteRange([]byte("foo2"), []byte("foo3")) + if n != 1 || index != 1 { + t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 1) + } + + // after removal foo2 + kvs, index = s.TnxRange([]byte("foo"), []byte("foo3"), 0, 0) + if len(kvs) != 0 { + t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0) + } + } + s.TnxEnd() + + // After tnx + kvs, index := s.Range([]byte("foo"), []byte("foo3"), 0, 1) + if len(kvs) != 0 { + t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0) + } + if index != 1 { + t.Fatalf("index = %d, want %d", index, 1) + } +} + func BenchmarkStorePut(b *testing.B) { s := newStore("test") defer os.Remove("test")