From cbb8b9bb08fbaa01554f41fe88a39064590f67a9 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 27 May 2015 10:35:51 -0700 Subject: [PATCH] stroage: add tnx id --- storage/kvstore.go | 66 +++++++++++++++++++++++++++++++---------- storage/kvstore_test.go | 48 ++++++++++++++++++++++-------- 2 files changed, 87 insertions(+), 27 deletions(-) diff --git a/storage/kvstore.go b/storage/kvstore.go index b3672d5da..697c841bb 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -3,7 +3,9 @@ package storage import ( "bytes" "encoding/binary" + "errors" "log" + "math/rand" "sync" "time" @@ -15,6 +17,8 @@ var ( batchLimit = 10000 batchInterval = 100 * time.Millisecond keyBucketName = []byte("key") + + ErrTnxIDMismatch = errors.New("storage: tnx id mismatch") ) type store struct { @@ -24,8 +28,10 @@ type store struct { kvindex index currentIndex uint64 + subIndex uint32 // tracks next subIndex to put into backend - subIndex uint32 // tracks next subIndex to put into backend + tmu sync.Mutex // protect the tnxID field + tnxID int64 // tracks the current tnxID to verify tnx operations } func newStore(path string) *store { @@ -45,57 +51,87 @@ func newStore(path string) *store { } func (s *store) Put(key, value []byte) int64 { - s.TnxBegin() + id := s.TnxBegin() s.put(key, value, s.currentIndex+1) - s.TnxEnd() + s.TnxEnd(id) return int64(s.currentIndex) } func (s *store) Range(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) { - s.TnxBegin() + id := s.TnxBegin() kvs, index = s.rangeKeys(key, end, limit, rangeIndex) - s.TnxEnd() + s.TnxEnd(id) return kvs, index } func (s *store) DeleteRange(key, end []byte) (n, index int64) { - s.TnxBegin() + id := s.TnxBegin() n = s.deleteRange(key, end, s.currentIndex+1) - s.TnxEnd() + s.TnxEnd(id) return n, int64(s.currentIndex) } -func (s *store) TnxBegin() { +func (s *store) TnxBegin() int64 { s.mu.Lock() s.subIndex = 0 + + s.tmu.Lock() + defer s.tmu.Unlock() + s.tnxID = rand.Int63() + return s.tnxID } -func (s *store) TnxEnd() { +func (s *store) TnxEnd(tnxID int64) error { + s.tmu.Lock() + defer s.tmu.Unlock() + if tnxID != s.tnxID { + return ErrTnxIDMismatch + } + if s.subIndex != 0 { s.currentIndex += 1 } s.subIndex = 0 s.mu.Unlock() + return nil } -func (s *store) TnxRange(key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64) { - return s.rangeKeys(key, end, limit, rangeIndex) +func (s *store) TnxRange(tnxID int64, key, end []byte, limit, rangeIndex int64) (kvs []storagepb.KeyValue, index int64, err error) { + s.tmu.Lock() + defer s.tmu.Unlock() + if tnxID != s.tnxID { + return nil, 0, ErrTnxIDMismatch + } + kvs, index = s.rangeKeys(key, end, limit, rangeIndex) + return kvs, index, nil } -func (s *store) TnxPut(key, value []byte) int64 { +func (s *store) TnxPut(tnxID int64, key, value []byte) (index int64, err error) { + s.tmu.Lock() + defer s.tmu.Unlock() + if tnxID != s.tnxID { + return 0, ErrTnxIDMismatch + } + s.put(key, value, s.currentIndex+1) - return int64(s.currentIndex + 1) + return int64(s.currentIndex + 1), nil } -func (s *store) TnxDeleteRange(key, end []byte) (n, index int64) { +func (s *store) TnxDeleteRange(tnxID int64, key, end []byte) (n, index int64, err error) { + s.tmu.Lock() + defer s.tmu.Unlock() + if tnxID != s.tnxID { + return 0, 0, ErrTnxIDMismatch + } + n = s.deleteRange(key, end, s.currentIndex+1) if n != 0 || s.subIndex != 0 { index = int64(s.currentIndex + 1) } - return n, index + return n, index, nil } // range is a keyword in Go, add Keys suffix. diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index ebacf2614..5aacd165f 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -156,54 +156,78 @@ func TestOneTnx(t *testing.T) { s := newStore("test") defer os.Remove("test") - s.TnxBegin() + id := s.TnxBegin() for i := 0; i < 3; i++ { - s.TnxPut([]byte("foo"), []byte("bar")) - s.TnxPut([]byte("foo1"), []byte("bar1")) - s.TnxPut([]byte("foo2"), []byte("bar2")) + s.TnxPut(id, []byte("foo"), []byte("bar")) + s.TnxPut(id, []byte("foo1"), []byte("bar1")) + s.TnxPut(id, []byte("foo2"), []byte("bar2")) // remove foo - n, index := s.TnxDeleteRange([]byte("foo"), nil) + n, index, err := s.TnxDeleteRange(id, []byte("foo"), nil) + if err != nil { + t.Fatal(err) + } if n != 1 || index != 1 { t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 1) } - kvs, index := s.TnxRange([]byte("foo"), []byte("foo3"), 0, 0) + kvs, index, err := s.TnxRange(id, []byte("foo"), []byte("foo3"), 0, 0) + if err != nil { + t.Fatal(err) + } if len(kvs) != 2 { t.Fatalf("len(kvs) = %d, want %d", len(kvs), 2) } // remove again -> expect nothing - n, index = s.TnxDeleteRange([]byte("foo"), nil) + n, index, err = s.TnxDeleteRange(id, []byte("foo"), nil) + if err != nil { + t.Fatal(err) + } if n != 0 || index != 1 { t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 0, 1) } // remove foo1 - n, index = s.TnxDeleteRange([]byte("foo"), []byte("foo2")) + n, index, err = s.TnxDeleteRange(id, []byte("foo"), []byte("foo2")) + if err != nil { + t.Fatal(err) + } if n != 1 || index != 1 { t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 1) } // after removal foo1 - kvs, index = s.TnxRange([]byte("foo"), []byte("foo3"), 0, 0) + kvs, index, err = s.TnxRange(id, []byte("foo"), []byte("foo3"), 0, 0) + if err != nil { + t.Fatal(err) + } if len(kvs) != 1 { t.Fatalf("len(kvs) = %d, want %d", len(kvs), 1) } // remove foo2 - n, index = s.TnxDeleteRange([]byte("foo2"), []byte("foo3")) + n, index, err = s.TnxDeleteRange(id, []byte("foo2"), []byte("foo3")) + if err != nil { + t.Fatal(err) + } if n != 1 || index != 1 { t.Fatalf("n = %d, index = %d, want (%d, %d)", n, index, 1, 1) } // after removal foo2 - kvs, index = s.TnxRange([]byte("foo"), []byte("foo3"), 0, 0) + kvs, index, err = s.TnxRange(id, []byte("foo"), []byte("foo3"), 0, 0) + if err != nil { + t.Fatal(err) + } if len(kvs) != 0 { t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0) } } - s.TnxEnd() + err := s.TnxEnd(id) + if err != nil { + t.Fatal(err) + } // After tnx kvs, index := s.Range([]byte("foo"), []byte("foo3"), 0, 1)