From 8d438c2939468a8539fdd8928e73f3d92584ce34 Mon Sep 17 00:00:00 2001
From: Anthony Romano
Date: Thu, 5 Jan 2017 02:07:50 -0800
Subject: [PATCH] backend: readtx

ReadTxs are designed for read-only access to the backend using a
read-only boltdb transaction. Since BatchTxs are long-running
transactions, all writes to a BatchTx are written back to the ReadTx
buffer, overlaying the base read-only transaction.
---
 mvcc/backend/backend.go      |  27 +++++-
 mvcc/backend/backend_test.go |  75 +++++++++++++++
 mvcc/backend/batch_tx.go     | 150 ++++++++++++++++++++----
 mvcc/backend/read_tx.go      |  92 ++++++++++++++++++
 mvcc/backend/tx_buffer.go    | 181 +++++++++++++++++++++++++++++++++++
 5 files changed, 480 insertions(+), 45 deletions(-)
 create mode 100644 mvcc/backend/read_tx.go
 create mode 100644 mvcc/backend/tx_buffer.go

diff --git a/mvcc/backend/backend.go b/mvcc/backend/backend.go
index 1be8b6697..f65e6c963 100644
--- a/mvcc/backend/backend.go
+++ b/mvcc/backend/backend.go
@@ -53,7 +53,9 @@ const (
 )
 
 type Backend interface {
+	ReadTx() ReadTx
 	BatchTx() BatchTx
+
 	Snapshot() Snapshot
 	Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
 	// Size returns the current size of the backend.
@@ -86,7 +88,9 @@ type backend struct {
 
 	batchInterval time.Duration
 	batchLimit    int
-	batchTx       *batchTx
+	batchTx       *batchTxBuffered
+
+	readTx *readTx
 
 	stopc chan struct{}
 	donec chan struct{}
@@ -106,16 +110,22 @@ func newBackend(path string, d time.Duration, limit int) *backend {
 		plog.Panicf("cannot open database at %s (%v)", path, err)
 	}
 
+	// In the future, we may want to make buffering optional for low-concurrency
+	// systems or dynamically swap between buffered/non-buffered depending on workload.
 	b := &backend{
 		db: db,
 
 		batchInterval: d,
 		batchLimit:    limit,
 
+		readTx: &readTx{buf: txReadBuffer{
+			txBuffer: txBuffer{make(map[string]*bucketBuffer)}},
+		},
+
 		stopc: make(chan struct{}),
 		donec: make(chan struct{}),
 	}
-	b.batchTx = newBatchTx(b)
+	b.batchTx = newBatchTxBuffered(b)
 	go b.run()
 	return b
 }
@@ -127,6 +137,8 @@ func (b *backend) BatchTx() BatchTx {
 	return b.batchTx
 }
 
+func (b *backend) ReadTx() ReadTx { return b.readTx }
+
 // ForceCommit forces the current batching tx to commit.
 func (b *backend) ForceCommit() {
 	b.batchTx.Commit()
@@ -328,6 +340,17 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
 	return tmptx.Commit()
 }
 
+func (b *backend) begin(write bool) *bolt.Tx {
+	b.mu.RLock()
+	tx, err := b.db.Begin(write)
+	if err != nil {
+		plog.Fatalf("cannot begin tx (%s)", err)
+	}
+	b.mu.RUnlock()
+	atomic.StoreInt64(&b.size, tx.Size())
+	return tx
+}
+
 // NewTmpBackend creates a backend implementation for testing.
 func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) {
 	dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")
diff --git a/mvcc/backend/backend_test.go b/mvcc/backend/backend_test.go
index 06d908db5..68d0b1959 100644
--- a/mvcc/backend/backend_test.go
+++ b/mvcc/backend/backend_test.go
@@ -18,6 +18,7 @@ import (
 	"fmt"
 	"io/ioutil"
 	"os"
+	"reflect"
 	"testing"
 	"time"
 
@@ -173,6 +174,80 @@ func TestBackendDefrag(t *testing.T) {
 	b.ForceCommit()
 }
 
+// TestBackendWriteback ensures writes are stored to the read txn on write txn unlock.
+func TestBackendWriteback(t *testing.T) { + b, tmpPath := NewDefaultTmpBackend() + defer cleanup(b, tmpPath) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket([]byte("key")) + tx.UnsafePut([]byte("key"), []byte("abc"), []byte("bar")) + tx.UnsafePut([]byte("key"), []byte("def"), []byte("baz")) + tx.UnsafePut([]byte("key"), []byte("overwrite"), []byte("1")) + tx.Unlock() + + // overwrites should be propagated too + tx.Lock() + tx.UnsafePut([]byte("key"), []byte("overwrite"), []byte("2")) + tx.Unlock() + + keys := []struct { + key []byte + end []byte + limit int64 + + wkey [][]byte + wval [][]byte + }{ + { + key: []byte("abc"), + end: nil, + + wkey: [][]byte{[]byte("abc")}, + wval: [][]byte{[]byte("bar")}, + }, + { + key: []byte("abc"), + end: []byte("def"), + + wkey: [][]byte{[]byte("abc")}, + wval: [][]byte{[]byte("bar")}, + }, + { + key: []byte("abc"), + end: []byte("deg"), + + wkey: [][]byte{[]byte("abc"), []byte("def")}, + wval: [][]byte{[]byte("bar"), []byte("baz")}, + }, + { + key: []byte("abc"), + end: []byte("\xff"), + limit: 1, + + wkey: [][]byte{[]byte("abc")}, + wval: [][]byte{[]byte("bar")}, + }, + { + key: []byte("abc"), + end: []byte("\xff"), + + wkey: [][]byte{[]byte("abc"), []byte("def"), []byte("overwrite")}, + wval: [][]byte{[]byte("bar"), []byte("baz"), []byte("2")}, + }, + } + rtx := b.ReadTx() + for i, tt := range keys { + rtx.Lock() + k, v := rtx.UnsafeRange([]byte("key"), tt.key, tt.end, tt.limit) + rtx.Unlock() + if !reflect.DeepEqual(tt.wkey, k) || !reflect.DeepEqual(tt.wval, v) { + t.Errorf("#%d: want k=%+v, v=%+v; got k=%+v, v=%+v", i, tt.wkey, tt.wval, k, v) + } + } +} + func cleanup(b Backend, path string) { b.Close() os.Remove(path) diff --git a/mvcc/backend/batch_tx.go b/mvcc/backend/batch_tx.go index 04fea1e94..1c248de59 100644 --- a/mvcc/backend/batch_tx.go +++ b/mvcc/backend/batch_tx.go @@ -16,6 +16,8 @@ package backend import ( "bytes" + "fmt" + "math" "sync" "sync/atomic" "time" @@ -24,15 +26,14 @@ import ( ) type BatchTx interface { - Lock() - Unlock() + ReadTx UnsafeCreateBucket(name []byte) UnsafePut(bucketName []byte, key []byte, value []byte) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) - UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) UnsafeDelete(bucketName []byte, key []byte) - UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error + // Commit commits a previous tx and begins a new writable one. Commit() + // CommitAndStop commits the previous tx and does not create a new one. CommitAndStop() } @@ -40,13 +41,8 @@ type batchTx struct { sync.Mutex tx *bolt.Tx backend *backend - pending int -} -func newBatchTx(backend *backend) *batchTx { - tx := &batchTx{backend: backend} - tx.Commit() - return tx + pending int } func (t *batchTx) UnsafeCreateBucket(name []byte) { @@ -84,30 +80,37 @@ func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq boo } // UnsafeRange must be called holding the lock on the tx. 
-func (t *batchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
-	bucket := t.tx.Bucket(bucketName)
+func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
+	k, v, err := unsafeRange(t.tx, bucketName, key, endKey, limit)
+	if err != nil {
+		plog.Fatal(err)
+	}
+	return k, v
+}
+
+func unsafeRange(tx *bolt.Tx, bucketName, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte, err error) {
+	bucket := tx.Bucket(bucketName)
 	if bucket == nil {
-		plog.Fatalf("bucket %s does not exist", bucketName)
+		return nil, nil, fmt.Errorf("bucket %s does not exist", bucketName)
 	}
 	if len(endKey) == 0 {
-		if v := bucket.Get(key); v == nil {
-			return keys, vs
-		} else {
-			return append(keys, key), append(vs, v)
+		if v := bucket.Get(key); v != nil {
+			return append(keys, key), append(vs, v), nil
 		}
+		return nil, nil, nil
+	}
+	if limit <= 0 {
+		limit = math.MaxInt64
 	}
 	c := bucket.Cursor()
 	for ck, cv := c.Seek(key); ck != nil && bytes.Compare(ck, endKey) < 0; ck, cv = c.Next() {
 		vs = append(vs, cv)
 		keys = append(keys, ck)
-		if limit > 0 && limit == int64(len(keys)) {
+		if limit == int64(len(keys)) {
 			break
 		}
 	}
-
-	return keys, vs
+	return keys, vs, nil
 }
 
 // UnsafeDelete must be called holding the lock on the tx.
@@ -125,12 +128,14 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
 
 // UnsafeForEach must be called holding the lock on the tx.
 func (t *batchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
-	b := t.tx.Bucket(bucketName)
-	if b == nil {
-		// bucket does not exist
-		return nil
+	return unsafeForEach(t.tx, bucketName, visitor)
+}
+
+func unsafeForEach(tx *bolt.Tx, bucket []byte, visitor func(k, v []byte) error) error {
+	if b := tx.Bucket(bucket); b != nil {
+		return b.ForEach(visitor)
 	}
-	return b.ForEach(visitor)
+	return nil
 }
 
 // Commit commits a previous tx and begins a new writable one.
@@ -140,7 +145,7 @@ func (t *batchTx) Commit() {
 	t.commit(false)
 }
 
-// CommitAndStop commits the previous tx and do not create a new one.
+// CommitAndStop commits the previous tx and does not create a new one.
func (t *batchTx) CommitAndStop() { t.Lock() defer t.Unlock() @@ -150,13 +155,11 @@ func (t *batchTx) CommitAndStop() { func (t *batchTx) Unlock() { if t.pending >= t.backend.batchLimit { t.commit(false) - t.pending = 0 } t.Mutex.Unlock() } func (t *batchTx) commit(stop bool) { - var err error // commit the last tx if t.tx != nil { if t.pending == 0 && !stop { @@ -178,9 +181,10 @@ func (t *batchTx) commit(stop bool) { } return } + start := time.Now() // gofail: var beforeCommit struct{} - err = t.tx.Commit() + err := t.tx.Commit() // gofail: var afterCommit struct{} commitDurations.Observe(time.Since(start).Seconds()) atomic.AddInt64(&t.backend.commits, 1) @@ -190,17 +194,77 @@ func (t *batchTx) commit(stop bool) { plog.Fatalf("cannot commit tx (%s)", err) } } - - if stop { - return + if !stop { + t.tx = t.backend.begin(true) } - - t.backend.mu.RLock() - defer t.backend.mu.RUnlock() - // begin a new tx - t.tx, err = t.backend.db.Begin(true) - if err != nil { - plog.Fatalf("cannot begin tx (%s)", err) - } - atomic.StoreInt64(&t.backend.size, t.tx.Size()) +} + +type batchTxBuffered struct { + batchTx + buf txWriteBuffer +} + +func newBatchTxBuffered(backend *backend) *batchTxBuffered { + tx := &batchTxBuffered{ + batchTx: batchTx{backend: backend}, + buf: txWriteBuffer{ + txBuffer: txBuffer{make(map[string]*bucketBuffer)}, + seq: true, + }, + } + tx.Commit() + return tx +} + +func (t *batchTxBuffered) Unlock() { + if t.pending != 0 { + t.backend.readTx.mu.Lock() + t.buf.writeback(&t.backend.readTx.buf) + t.backend.readTx.mu.Unlock() + if t.pending >= t.backend.batchLimit { + t.commit(false) + } + } + t.batchTx.Unlock() +} + +func (t *batchTxBuffered) Commit() { + t.Lock() + defer t.Unlock() + t.commit(false) +} + +func (t *batchTxBuffered) CommitAndStop() { + t.Lock() + defer t.Unlock() + t.commit(true) +} + +func (t *batchTxBuffered) commit(stop bool) { + // all read txs must be closed to acquire boltdb commit rwlock + t.backend.readTx.mu.Lock() + defer t.backend.readTx.mu.Unlock() + if t.backend.readTx.tx != nil { + if err := t.backend.readTx.tx.Rollback(); err != nil { + plog.Fatalf("cannot rollback tx (%s)", err) + } + t.backend.readTx.buf.reset() + t.backend.readTx.tx = nil + } + + t.batchTx.commit(stop) + + if !stop { + t.backend.readTx.tx = t.backend.begin(false) + } +} + +func (t *batchTxBuffered) UnsafePut(bucketName []byte, key []byte, value []byte) { + t.batchTx.UnsafePut(bucketName, key, value) + t.buf.put(bucketName, key, value) +} + +func (t *batchTxBuffered) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) { + t.batchTx.UnsafeSeqPut(bucketName, key, value) + t.buf.putSeq(bucketName, key, value) } diff --git a/mvcc/backend/read_tx.go b/mvcc/backend/read_tx.go new file mode 100644 index 000000000..51596ffdf --- /dev/null +++ b/mvcc/backend/read_tx.go @@ -0,0 +1,92 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package backend + +import ( + "bytes" + "math" + "sync" + + "github.com/boltdb/bolt" +) + +// safeRangeBucket is a hack to avoid inadvertently reading duplicate keys; +// overwrites on a bucket should only fetch with limit=1, but safeRangeBucket +// is known to never overwrite any key so range is safe. +var safeRangeBucket = []byte("key") + +type ReadTx interface { + Lock() + Unlock() + + UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) + UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error +} + +type readTx struct { + // mu protects accesses to the txReadBuffer + mu sync.RWMutex + buf txReadBuffer + + // txmu protects accesses to the Tx on Range requests + txmu sync.Mutex + tx *bolt.Tx +} + +func (rt *readTx) Lock() { rt.mu.RLock() } +func (rt *readTx) Unlock() { rt.mu.RUnlock() } + +func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) { + if endKey == nil { + // forbid duplicates for single keys + limit = 1 + } + if limit <= 0 { + limit = math.MaxInt64 + } + if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) { + panic("do not use unsafeRange on non-keys bucket") + } + keys, vals := rt.buf.Range(bucketName, key, endKey, limit) + if int64(len(keys)) == limit { + return keys, vals + } + rt.txmu.Lock() + // ignore error since bucket may have been created in this batch + k2, v2, _ := unsafeRange(rt.tx, bucketName, key, endKey, limit-int64(len(keys))) + rt.txmu.Unlock() + return append(k2, keys...), append(v2, vals...) +} + +func (rt *readTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error { + dups := make(map[string]struct{}) + f1 := func(k, v []byte) error { + dups[string(k)] = struct{}{} + return visitor(k, v) + } + f2 := func(k, v []byte) error { + if _, ok := dups[string(k)]; ok { + return nil + } + return visitor(k, v) + } + if err := rt.buf.ForEach(bucketName, f1); err != nil { + return err + } + rt.txmu.Lock() + err := unsafeForEach(rt.tx, bucketName, f2) + rt.txmu.Unlock() + return err +} diff --git a/mvcc/backend/tx_buffer.go b/mvcc/backend/tx_buffer.go new file mode 100644 index 000000000..56e885dbf --- /dev/null +++ b/mvcc/backend/tx_buffer.go @@ -0,0 +1,181 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package backend + +import ( + "bytes" + "sort" +) + +// txBuffer handles functionality shared between txWriteBuffer and txReadBuffer. +type txBuffer struct { + buckets map[string]*bucketBuffer +} + +func (txb *txBuffer) reset() { + for k, v := range txb.buckets { + if v.used == 0 { + // demote + delete(txb.buckets, k) + } + v.used = 0 + } +} + +// txWriteBuffer buffers writes of pending updates that have not yet committed. 
+type txWriteBuffer struct {
+	txBuffer
+	seq bool
+}
+
+func (txw *txWriteBuffer) put(bucket, k, v []byte) {
+	txw.seq = false
+	txw.putSeq(bucket, k, v)
+}
+
+func (txw *txWriteBuffer) putSeq(bucket, k, v []byte) {
+	b, ok := txw.buckets[string(bucket)]
+	if !ok {
+		b = newBucketBuffer()
+		txw.buckets[string(bucket)] = b
+	}
+	b.add(k, v)
+}
+
+func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
+	for k, wb := range txw.buckets {
+		rb, ok := txr.buckets[k]
+		if !ok {
+			delete(txw.buckets, k)
+			txr.buckets[k] = wb
+			continue
+		}
+		if !txw.seq && wb.used > 1 {
+			// assume no duplicate keys
+			sort.Sort(wb)
+		}
+		rb.merge(wb)
+	}
+	txw.reset()
+}
+
+// txReadBuffer accesses buffered updates.
+type txReadBuffer struct{ txBuffer }
+
+func (txr *txReadBuffer) Range(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
+	if b := txr.buckets[string(bucketName)]; b != nil {
+		return b.Range(key, endKey, limit)
+	}
+	return nil, nil
+}
+
+func (txr *txReadBuffer) ForEach(bucketName []byte, visitor func(k, v []byte) error) error {
+	if b := txr.buckets[string(bucketName)]; b != nil {
+		return b.ForEach(visitor)
+	}
+	return nil
+}
+
+type kv struct {
+	key []byte
+	val []byte
+}
+
+// bucketBuffer buffers key-value pairs that are pending commit.
+type bucketBuffer struct {
+	buf []kv
+	// used tracks number of elements in use so buf can be reused without reallocation.
+	used int
+}
+
+func newBucketBuffer() *bucketBuffer {
+	return &bucketBuffer{buf: make([]kv, 512), used: 0}
+}
+
+func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
+	f := func(i int) bool { return bytes.Compare(bb.buf[i].key, key) >= 0 }
+	idx := sort.Search(bb.used, f)
+	if idx < 0 {
+		return nil, nil
+	}
+	if len(endKey) == 0 {
+		if bytes.Equal(key, bb.buf[idx].key) {
+			keys = append(keys, bb.buf[idx].key)
+			vals = append(vals, bb.buf[idx].val)
+		}
+		return keys, vals
+	}
+	if bytes.Compare(endKey, bb.buf[idx].key) <= 0 {
+		return nil, nil
+	}
+	for i := idx; i < bb.used && int64(len(keys)) < limit; i++ {
+		if bytes.Compare(endKey, bb.buf[i].key) <= 0 {
+			break
+		}
+		keys = append(keys, bb.buf[i].key)
+		vals = append(vals, bb.buf[i].val)
+	}
+	return keys, vals
+}
+
+func (bb *bucketBuffer) ForEach(visitor func(k, v []byte) error) error {
+	for i := 0; i < bb.used; i++ {
+		if err := visitor(bb.buf[i].key, bb.buf[i].val); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (bb *bucketBuffer) add(k, v []byte) {
+	bb.buf[bb.used].key, bb.buf[bb.used].val = k, v
+	bb.used++
+	if bb.used == len(bb.buf) {
+		buf := make([]kv, (3*len(bb.buf))/2)
+		copy(buf, bb.buf)
+		bb.buf = buf
+	}
+}
+
+// merge merges data from bbsrc into bb.
+func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) {
+	for i := 0; i < bbsrc.used; i++ {
+		bb.add(bbsrc.buf[i].key, bbsrc.buf[i].val)
+	}
+	if bb.used == bbsrc.used {
+		return
+	}
+	if bytes.Compare(bb.buf[(bb.used-bbsrc.used)-1].key, bbsrc.buf[0].key) < 0 {
+		return
+	}
+
+	sort.Stable(bb)
+
+	// remove duplicates, using only newest update
+	widx := 0
+	for ridx := 1; ridx < bb.used; ridx++ {
+		if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) {
+			widx++
+		}
+		bb.buf[widx] = bb.buf[ridx]
+	}
+	bb.used = widx + 1
+}
+
+func (bb *bucketBuffer) Len() int { return bb.used }
+func (bb *bucketBuffer) Less(i, j int) bool {
+	return bytes.Compare(bb.buf[i].key, bb.buf[j].key) < 0
+}
+func (bb *bucketBuffer) Swap(i, j int) { bb.buf[i], bb.buf[j] = bb.buf[j], bb.buf[i] }
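
Reviewer note: the writeback described in the commit message means a put through
the buffered BatchTx becomes visible to a ReadTx as soon as the write lock is
released, before any boltdb commit. A minimal sketch of that property, written
as a hypothetical in-package test (the test name and key/value choices are
illustrative, not part of the patch):

package backend

import "testing"

// Sketch: a buffered put should be readable through ReadTx right after
// Unlock, even though the batch tx has not yet committed to boltdb.
func TestReadTxSeesBufferedPut(t *testing.T) {
	b, tmpPath := NewDefaultTmpBackend()
	defer cleanup(b, tmpPath)

	wtx := b.BatchTx()
	wtx.Lock()
	wtx.UnsafeCreateBucket([]byte("key"))
	wtx.UnsafePut([]byte("key"), []byte("foo"), []byte("bar"))
	wtx.Unlock() // batchTxBuffered.Unlock writes back to the read buffer here

	rtx := b.ReadTx()
	rtx.Lock()
	keys, vals := rtx.UnsafeRange([]byte("key"), []byte("foo"), nil, 0)
	rtx.Unlock()

	if len(keys) != 1 || string(vals[0]) != "bar" {
		t.Fatalf("want foo->bar from the read buffer, got keys=%q vals=%q", keys, vals)
	}
}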
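Similarly, readTx.UnsafeForEach visits buffered keys first and records them so
the pass over the underlying boltdb tx skips duplicates. A sketch of that
single-visit guarantee under the same assumptions (hypothetical test name;
ForceCommit pushes the first value into boltdb so the key exists in both
places):

package backend

import "testing"

// Sketch: a key committed to boltdb and then overwritten in the write
// buffer should be visited exactly once by UnsafeForEach.
func TestReadTxForEachDedup(t *testing.T) {
	b, tmpPath := NewDefaultTmpBackend()
	defer cleanup(b, tmpPath)

	tx := b.BatchTx()
	tx.Lock()
	tx.UnsafeCreateBucket([]byte("key"))
	tx.UnsafePut([]byte("key"), []byte("foo"), []byte("old"))
	tx.Unlock()
	b.ForceCommit() // "foo" now lives in the committed boltdb state

	tx.Lock()
	tx.UnsafePut([]byte("key"), []byte("foo"), []byte("new"))
	tx.Unlock() // "foo" now also lives in the read buffer

	visits := 0
	rtx := b.ReadTx()
	rtx.Lock()
	err := rtx.UnsafeForEach([]byte("key"), func(k, v []byte) error {
		visits++
		return nil
	})
	rtx.Unlock()

	if err != nil || visits != 1 {
		t.Fatalf("want exactly one visit, got %d (err=%v)", visits, err)
	}
}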
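The subtle case in tx_buffer.go is bucketBuffer.merge: when a key exists in
both the read buffer and the written-back overlay, the dedup loop keeps only
the newest update. A sketch of that newest-wins behavior (hypothetical test
name; math.MaxInt64 mirrors how readTx normalizes limit before calling Range):

package backend

import (
	"math"
	"testing"
)

// Sketch: merging an overlay that rewrites key "a" should keep the
// overlay's value after sort.Stable plus deduplication.
func TestBucketBufferMergeNewestWins(t *testing.T) {
	base, overlay := newBucketBuffer(), newBucketBuffer()
	base.add([]byte("a"), []byte("1"))
	base.add([]byte("b"), []byte("2"))
	overlay.add([]byte("a"), []byte("3")) // overwrite of "a"

	base.merge(overlay) // sorts, then drops the stale a->1 entry

	keys, vals := base.Range([]byte("a"), []byte("c"), math.MaxInt64)
	if len(keys) != 2 || string(vals[0]) != "3" || string(vals[1]) != "2" {
		t.Fatalf("want a->3, b->2; got keys=%q vals=%q", keys, vals)
	}
}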