From 84d777305d2a0884ae5e9a64092a295dab771cb8 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Mon, 21 Dec 2015 05:37:40 -0800 Subject: [PATCH] storage: use only one mutex for store struct Mutex is a variable, which means there needs to be only one mutex value per scope. We don't need a separate mutex inside store struct, **if we assume that `TxnPut` and `TxnRange` are called ONLY ONCE per transaction (between `TxnBegin` and `TxnEnd`)**, as documented. --- storage/kvstore.go | 19 +++--------- storage/kvstore_bench_test.go | 57 +++++++++++++++++++++++++++++++++++ storage/kvstore_test.go | 57 ++++++++++++++++++++++++----------- 3 files changed, 101 insertions(+), 32 deletions(-) create mode 100644 storage/kvstore_bench_test.go diff --git a/storage/kvstore.go b/storage/kvstore.go index 042e7b80e..6dc11a4cc 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -49,7 +49,7 @@ var ( ) type store struct { - mu sync.RWMutex + mu sync.Mutex // guards the following b backend.Backend kvindex index @@ -59,8 +59,7 @@ type store struct { compactMainRev int64 tx backend.BatchTx - tmu sync.Mutex // protect the txnID field - txnID int64 // tracks the current txnID to verify txn operations + txnID int64 // tracks the current txnID to verify txn operations wg sync.WaitGroup stopc chan struct{} @@ -86,8 +85,8 @@ func newStore(path string) *store { } func (s *store) Rev() int64 { - s.mu.RLock() - defer s.mu.RUnlock() + s.mu.Lock() + defer s.mu.Unlock() return s.currentRev.main } @@ -128,8 +127,6 @@ func (s *store) TxnBegin() int64 { s.tx = s.b.BatchTx() s.tx.Lock() - s.tmu.Lock() - defer s.tmu.Unlock() s.txnID = rand.Int63() return s.txnID } @@ -147,8 +144,6 @@ func (s *store) TxnEnd(txnID int64) error { // txnEnd is used for unlocking an internal txn. It does // not increase the txnCounter. func (s *store) txnEnd(txnID int64) error { - s.tmu.Lock() - defer s.tmu.Unlock() if txnID != s.txnID { return ErrTxnIDMismatch } @@ -165,8 +160,6 @@ func (s *store) txnEnd(txnID int64) error { } func (s *store) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) { - s.tmu.Lock() - defer s.tmu.Unlock() if txnID != s.txnID { return nil, 0, ErrTxnIDMismatch } @@ -174,8 +167,6 @@ func (s *store) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (k } func (s *store) TxnPut(txnID int64, key, value []byte) (rev int64, err error) { - s.tmu.Lock() - defer s.tmu.Unlock() if txnID != s.txnID { return 0, ErrTxnIDMismatch } @@ -185,8 +176,6 @@ func (s *store) TxnPut(txnID int64, key, value []byte) (rev int64, err error) { } func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) { - s.tmu.Lock() - defer s.tmu.Unlock() if txnID != s.txnID { return 0, 0, ErrTxnIDMismatch } diff --git a/storage/kvstore_bench_test.go b/storage/kvstore_bench_test.go new file mode 100644 index 000000000..70874de5e --- /dev/null +++ b/storage/kvstore_bench_test.go @@ -0,0 +1,57 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package storage + +import ( + "log" + "os" + "testing" +) + +func BenchmarkStorePut(b *testing.B) { + s := newStore(tmpPath) + defer os.Remove(tmpPath) + + // arbitrary number of bytes + bytesN := 64 + keys := createBytesSlice(bytesN, b.N) + vals := createBytesSlice(bytesN, b.N) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.Put(keys[i], vals[i]) + } +} + +// BenchmarkStoreTxnPut benchmarks the Put operation +// with transaction begin and end, where transaction involves +// some synchronization operations, such as mutex locking. +func BenchmarkStoreTxnPut(b *testing.B) { + s := newStore(tmpPath) + defer cleanup(s, tmpPath) + + // arbitrary number of bytes + bytesN := 64 + keys := createBytesSlice(bytesN, b.N) + vals := createBytesSlice(bytesN, b.N) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + id := s.TxnBegin() + if _, err := s.TxnPut(id, keys[i], vals[i]); err != nil { + log.Fatalf("txn put error: %v", err) + } + s.TxnEnd(id) + } +} diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index 3ae16e645..c179147fc 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -18,6 +18,7 @@ import ( "crypto/rand" "encoding/binary" "math" + mrand "math/rand" "os" "reflect" "testing" @@ -665,6 +666,32 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) { t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes)) } +func TestTxnPut(t *testing.T) { + // assign arbitrary size + bytesN := 30 + sliceN := 100 + keys := createBytesSlice(bytesN, sliceN) + vals := createBytesSlice(bytesN, sliceN) + + s := newStore(tmpPath) + defer cleanup(s, tmpPath) + + for i := 0; i < sliceN; i++ { + id := s.TxnBegin() + base := int64(i + 1) + + rev, err := s.TxnPut(id, keys[i], vals[i]) + if err != nil { + t.Error("txn put error") + } + if rev != base { + t.Errorf("#%d: rev = %d, want %d", i, rev, base) + } + + s.TxnEnd(id) + } +} + func TestTxnBlockBackendForceCommit(t *testing.T) { s := newStore(tmpPath) defer os.Remove(tmpPath) @@ -691,23 +718,6 @@ func TestTxnBlockBackendForceCommit(t *testing.T) { } -func BenchmarkStorePut(b *testing.B) { - s := newStore(tmpPath) - defer os.Remove(tmpPath) - - // prepare keys - keys := make([][]byte, b.N) - for i := 0; i < b.N; i++ { - keys[i] = make([]byte, 64) - rand.Read(keys[i]) - } - - b.ResetTimer() - for i := 0; i < b.N; i++ { - s.Put(keys[i], []byte("foo")) - } -} - func newTestRevBytes(rev revision) []byte { bytes := newRevBytes() revToBytes(rev, bytes) @@ -831,3 +841,16 @@ func (i *fakeIndex) Compact(rev int64) map[revision]struct{} { return <-i.indexCompactRespc } func (i *fakeIndex) Equal(b index) bool { return false } + +func createBytesSlice(bytesN, sliceN int) [][]byte { + rs := [][]byte{} + for len(rs) != sliceN { + mrand.Seed(time.Now().UnixNano()) + v := make([]byte, bytesN) + if _, err := rand.Read(v); err != nil { + panic(err) + } + rs = append(rs, v) + } + return rs +}