From c97dda766ea9623b41c36bd56143241726d8b4e1 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 29 Sep 2015 23:26:01 -0700 Subject: [PATCH] storage: hold batchTx lock during KV txn One txn is treated as atomic, and might contain multiple Put/Delete/Range operations. For now, between these operations, we might call forecCommit to sync the change to disk, or backend may commit it in background. Thus the snapshot state might contains an unfinished multiple objects transaction, which is dangerous if database is restored from the snapshot. This PR makes KV txn hold batchTx lock during the process and avoids commit to happen. --- storage/kvstore.go | 20 +++++++------------- storage/kvstore_test.go | 29 +++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 13 deletions(-) diff --git a/storage/kvstore.go b/storage/kvstore.go index 8e2a530d4..2f6d398e9 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -52,6 +52,7 @@ type store struct { // the main revision of the last compaction compactMainRev int64 + tx backend.BatchTx tmu sync.Mutex // protect the txnID field txnID int64 // tracks the current txnID to verify txn operations @@ -122,6 +123,8 @@ func (s *store) DeleteRange(key, end []byte) (n, rev int64) { func (s *store) TxnBegin() int64 { s.mu.Lock() s.currentRev.sub = 0 + s.tx = s.b.BatchTx() + s.tx.Lock() s.tmu.Lock() defer s.tmu.Unlock() @@ -148,6 +151,7 @@ func (s *store) txnEnd(txnID int64) error { return ErrTxnIDMismatch } + s.tx.Unlock() if s.currentRev.sub != 0 { s.currentRev.main += 1 } @@ -392,14 +396,11 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage return nil, rev, nil } - tx := s.b.BatchTx() - tx.Lock() - defer tx.Unlock() for _, revpair := range revpairs { revbytes := newRevBytes() revToBytes(revpair, revbytes) - _, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0) + _, vs := s.tx.UnsafeRange(keyBucketName, revbytes, nil, 0) if len(vs) != 1 { log.Fatalf("storage: range cannot find rev (%d,%d)", revpair.main, revpair.sub) } @@ -446,10 +447,7 @@ func (s *store) put(key, value []byte) { log.Fatalf("storage: cannot marshal event: %v", err) } - tx := s.b.BatchTx() - tx.Lock() - defer tx.Unlock() - tx.UnsafePut(keyBucketName, ibytes, d) + s.tx.UnsafePut(keyBucketName, ibytes, d) s.kvindex.Put(key, revision{main: rev, sub: s.currentRev.sub}) s.currentRev.sub += 1 } @@ -474,10 +472,6 @@ func (s *store) deleteRange(key, end []byte) int64 { func (s *store) delete(key []byte) { mainrev := s.currentRev.main + 1 - tx := s.b.BatchTx() - tx.Lock() - defer tx.Unlock() - ibytes := newRevBytes() revToBytes(revision{main: mainrev, sub: s.currentRev.sub}, ibytes) @@ -493,7 +487,7 @@ func (s *store) delete(key []byte) { log.Fatalf("storage: cannot marshal event: %v", err) } - tx.UnsafePut(keyBucketName, ibytes, d) + s.tx.UnsafePut(keyBucketName, ibytes, d) err = s.kvindex.Tombstone(key, revision{main: mainrev, sub: s.currentRev.sub}) if err != nil { log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err) diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index e74a5c2b8..1b3a23683 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -103,6 +103,7 @@ func TestStorePut(t *testing.T) { for i, tt := range tests { s, b, index := newFakeStore() s.currentRev = tt.rev + s.tx = b.BatchTx() index.indexGetRespc <- tt.r s.put([]byte("foo"), []byte("bar")) @@ -164,6 +165,7 @@ func TestStoreRange(t *testing.T) { for i, tt := range tests { s, b, index := newFakeStore() s.currentRev = currev + s.tx = b.BatchTx() b.tx.rangeRespc <- tt.r index.indexRangeRespc <- tt.idxr @@ -223,6 +225,7 @@ func TestStoreDeleteRange(t *testing.T) { for i, tt := range tests { s, b, index := newFakeStore() s.currentRev = tt.rev + s.tx = b.BatchTx() index.indexRangeRespc <- tt.r n := s.deleteRange([]byte("foo"), []byte("goo")) @@ -651,6 +654,32 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) { tx.Unlock() } +func TestTxnBlockBackendForceCommit(t *testing.T) { + s := newStore(tmpPath) + defer os.Remove(tmpPath) + + id := s.TxnBegin() + + done := make(chan struct{}) + go func() { + s.b.ForceCommit() + done <- struct{}{} + }() + select { + case <-done: + t.Fatalf("failed to block ForceCommit") + case <-time.After(100 * time.Millisecond): + } + + s.TxnEnd(id) + select { + case <-done: + case <-time.After(100 * time.Millisecond): + t.Fatalf("failed to execute ForceCommit") + } + +} + func BenchmarkStorePut(b *testing.B) { s := newStore(tmpPath) defer os.Remove(tmpPath)