diff --git a/storage/kvstore.go b/storage/kvstore.go index 8e2a530d4..2f6d398e9 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -52,6 +52,7 @@ type store struct { // the main revision of the last compaction compactMainRev int64 + tx backend.BatchTx tmu sync.Mutex // protect the txnID field txnID int64 // tracks the current txnID to verify txn operations @@ -122,6 +123,8 @@ func (s *store) DeleteRange(key, end []byte) (n, rev int64) { func (s *store) TxnBegin() int64 { s.mu.Lock() s.currentRev.sub = 0 + s.tx = s.b.BatchTx() + s.tx.Lock() s.tmu.Lock() defer s.tmu.Unlock() @@ -148,6 +151,7 @@ func (s *store) txnEnd(txnID int64) error { return ErrTxnIDMismatch } + s.tx.Unlock() if s.currentRev.sub != 0 { s.currentRev.main += 1 } @@ -392,14 +396,11 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage return nil, rev, nil } - tx := s.b.BatchTx() - tx.Lock() - defer tx.Unlock() for _, revpair := range revpairs { revbytes := newRevBytes() revToBytes(revpair, revbytes) - _, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0) + _, vs := s.tx.UnsafeRange(keyBucketName, revbytes, nil, 0) if len(vs) != 1 { log.Fatalf("storage: range cannot find rev (%d,%d)", revpair.main, revpair.sub) } @@ -446,10 +447,7 @@ func (s *store) put(key, value []byte) { log.Fatalf("storage: cannot marshal event: %v", err) } - tx := s.b.BatchTx() - tx.Lock() - defer tx.Unlock() - tx.UnsafePut(keyBucketName, ibytes, d) + s.tx.UnsafePut(keyBucketName, ibytes, d) s.kvindex.Put(key, revision{main: rev, sub: s.currentRev.sub}) s.currentRev.sub += 1 } @@ -474,10 +472,6 @@ func (s *store) deleteRange(key, end []byte) int64 { func (s *store) delete(key []byte) { mainrev := s.currentRev.main + 1 - tx := s.b.BatchTx() - tx.Lock() - defer tx.Unlock() - ibytes := newRevBytes() revToBytes(revision{main: mainrev, sub: s.currentRev.sub}, ibytes) @@ -493,7 +487,7 @@ func (s *store) delete(key []byte) { log.Fatalf("storage: cannot marshal event: %v", err) } - tx.UnsafePut(keyBucketName, ibytes, d) + s.tx.UnsafePut(keyBucketName, ibytes, d) err = s.kvindex.Tombstone(key, revision{main: mainrev, sub: s.currentRev.sub}) if err != nil { log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err) diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index e74a5c2b8..1b3a23683 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -103,6 +103,7 @@ func TestStorePut(t *testing.T) { for i, tt := range tests { s, b, index := newFakeStore() s.currentRev = tt.rev + s.tx = b.BatchTx() index.indexGetRespc <- tt.r s.put([]byte("foo"), []byte("bar")) @@ -164,6 +165,7 @@ func TestStoreRange(t *testing.T) { for i, tt := range tests { s, b, index := newFakeStore() s.currentRev = currev + s.tx = b.BatchTx() b.tx.rangeRespc <- tt.r index.indexRangeRespc <- tt.idxr @@ -223,6 +225,7 @@ func TestStoreDeleteRange(t *testing.T) { for i, tt := range tests { s, b, index := newFakeStore() s.currentRev = tt.rev + s.tx = b.BatchTx() index.indexRangeRespc <- tt.r n := s.deleteRange([]byte("foo"), []byte("goo")) @@ -651,6 +654,32 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) { tx.Unlock() } +func TestTxnBlockBackendForceCommit(t *testing.T) { + s := newStore(tmpPath) + defer os.Remove(tmpPath) + + id := s.TxnBegin() + + done := make(chan struct{}) + go func() { + s.b.ForceCommit() + done <- struct{}{} + }() + select { + case <-done: + t.Fatalf("failed to block ForceCommit") + case <-time.After(100 * time.Millisecond): + } + + s.TxnEnd(id) + select { + case <-done: + case <-time.After(100 * time.Millisecond): + t.Fatalf("failed to execute ForceCommit") + } + +} + func BenchmarkStorePut(b *testing.B) { s := newStore(tmpPath) defer os.Remove(tmpPath)