mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
server: set multiple concurrentReadTx instances share one txReadBuffer.
This commit is contained in:
parent
46b49a6ecf
commit
9c82e8c72b
@ -79,6 +79,12 @@ type Snapshot interface {
|
||||
Close() error
|
||||
}
|
||||
|
||||
// txReadBufferCache pairs a cached *txReadBuffer with the buffer version it
// was stored under, so that ConcurrentReadTx can reuse the cached copy when
// that version still matches readTx.buf.bufVersion.
type txReadBufferCache struct {
|
||||
// mu is held by ConcurrentReadTx while it reads and updates buf and bufVersion.
mu sync.Mutex
|
||||
// buf is the cached buffer copy; it is nil until ConcurrentReadTx first stores one.
buf *txReadBuffer
|
||||
// bufVersion is the value of readTx.buf.bufVersion observed when buf was stored.
bufVersion uint64
|
||||
}
|
||||
|
||||
type backend struct {
|
||||
// size and commits are used with atomic operations so they must be
|
||||
// 64-bit aligned, otherwise 32-bit tests will crash
|
||||
@ -102,6 +108,11 @@ type backend struct {
|
||||
batchTx *batchTxBuffered
|
||||
|
||||
readTx *readTx
|
||||
// txReadBufferCache mirrors "txReadBuffer" within "readTx" -- readTx.baseReadTx.buf.
|
||||
// When creating "concurrentReadTx":
|
||||
// - if the cache is up-to-date, "readTx.baseReadTx.buf" copy can be skipped
|
||||
// - if the cache is empty or outdated, "readTx.baseReadTx.buf" copy is required
|
||||
txReadBufferCache txReadBufferCache
|
||||
|
||||
stopc chan struct{}
|
||||
donec chan struct{}
|
||||
@ -183,19 +194,26 @@ func newBackend(bcfg BackendConfig) *backend {
|
||||
readTx: &readTx{
|
||||
baseReadTx: baseReadTx{
|
||||
buf: txReadBuffer{
|
||||
txBuffer: txBuffer{make(map[BucketID]*bucketBuffer)},
|
||||
txBuffer: txBuffer{make(map[BucketID]*bucketBuffer)},
|
||||
bufVersion: 0,
|
||||
},
|
||||
buckets: make(map[BucketID]*bolt.Bucket),
|
||||
txWg: new(sync.WaitGroup),
|
||||
txMu: new(sync.RWMutex),
|
||||
},
|
||||
},
|
||||
txReadBufferCache: txReadBufferCache{
|
||||
mu: sync.Mutex{},
|
||||
bufVersion: 0,
|
||||
buf: nil,
|
||||
},
|
||||
|
||||
stopc: make(chan struct{}),
|
||||
donec: make(chan struct{}),
|
||||
|
||||
lg: bcfg.Logger,
|
||||
}
|
||||
|
||||
b.batchTx = newBatchTxBuffered(b)
|
||||
// We set it after newBatchTxBuffered to skip the 'empty' commit.
|
||||
b.hooks = bcfg.Hooks
|
||||
@ -221,10 +239,68 @@ func (b *backend) ConcurrentReadTx() ReadTx {
|
||||
defer b.readTx.RUnlock()
|
||||
// prevent boltdb read Tx from being rolled back until store read Tx is done. Needs to be called when holding readTx.RLock().
|
||||
b.readTx.txWg.Add(1)
|
||||
|
||||
// TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval.
|
||||
|
||||
// inspect/update cache recency iff there's no ongoing update to the cache
|
||||
// this falls through if there's no cache update
|
||||
|
||||
// by this line, "ConcurrentReadTx" code path is already protected against concurrent "writeback" operations
|
||||
// which requires write lock to update "readTx.baseReadTx.buf".
|
||||
// Which means setting "buf *txReadBuffer" with "readTx.buf.unsafeCopy()" is guaranteed to be up-to-date,
|
||||
// whereas "txReadBufferCache.buf" may be stale from concurrent "writeback" operations.
|
||||
// We only update "txReadBufferCache.buf" if we know "buf *txReadBuffer" is up-to-date.
|
||||
// The update to "txReadBufferCache.buf" will benefit the following "ConcurrentReadTx" creation
|
||||
// by avoiding copying "readTx.baseReadTx.buf".
|
||||
b.txReadBufferCache.mu.Lock()
|
||||
|
||||
curCache := b.txReadBufferCache.buf
|
||||
curCacheVer := b.txReadBufferCache.bufVersion
|
||||
curBufVer := b.readTx.buf.bufVersion
|
||||
|
||||
isEmptyCache := curCache == nil
|
||||
isStaleCache := curCacheVer != curBufVer
|
||||
|
||||
var buf *txReadBuffer
|
||||
switch {
|
||||
case isEmptyCache:
|
||||
// perform safe copy of buffer while holding "b.txReadBufferCache.mu.Lock"
|
||||
// this is only supposed to run once so there won't be much overhead
|
||||
curBuf := b.readTx.buf.unsafeCopy()
|
||||
buf = &curBuf
|
||||
case isStaleCache:
|
||||
// to maximize the concurrency, try unsafe copy of buffer
|
||||
// release the lock while copying buffer -- cache may become stale again and
|
||||
// get overwritten by someone else.
|
||||
// therefore, we need to check the readTx buffer version again
|
||||
b.txReadBufferCache.mu.Unlock()
|
||||
curBuf := b.readTx.buf.unsafeCopy()
|
||||
b.txReadBufferCache.mu.Lock()
|
||||
buf = &curBuf
|
||||
default:
|
||||
// neither empty nor stale cache, just use the current buffer
|
||||
buf = curCache
|
||||
}
|
||||
// txReadBufferCache.bufVersion can be modified while we are doing an unsafeCopy()
|
||||
// as a result, curCacheVer could be no longer the same as
|
||||
// txReadBufferCache.bufVersion
|
||||
// if !isEmptyCache && curCacheVer != b.txReadBufferCache.bufVersion
|
||||
// then the cache became stale while copying "readTx.baseReadTx.buf".
|
||||
// It is safe to not update "txReadBufferCache.buf", because the next
|
||||
// "ConcurrentReadTx" creation will trigger a new "readTx.baseReadTx.buf" copy
|
||||
// and "buf" is still used for the current "concurrentReadTx.baseReadTx.buf".
|
||||
if isEmptyCache || curCacheVer == b.txReadBufferCache.bufVersion {
|
||||
// continue if the cache is never set or no one has modified the cache
|
||||
b.txReadBufferCache.buf = buf
|
||||
b.txReadBufferCache.bufVersion = curBufVer
|
||||
}
|
||||
|
||||
b.txReadBufferCache.mu.Unlock()
|
||||
|
||||
// concurrentReadTx is not supposed to write to its txReadBuffer
|
||||
return &concurrentReadTx{
|
||||
baseReadTx: baseReadTx{
|
||||
buf: b.readTx.buf.unsafeCopy(),
|
||||
buf: *buf,
|
||||
txMu: b.readTx.txMu,
|
||||
tx: b.readTx.tx,
|
||||
buckets: b.readTx.buckets,
|
||||
|
@ -19,6 +19,8 @@ import (
|
||||
"sort"
|
||||
)
|
||||
|
||||
const bucketBufferInitialSize = 512
|
||||
|
||||
// txBuffer handles functionality shared between txWriteBuffer and txReadBuffer.
|
||||
type txBuffer struct {
|
||||
buckets map[BucketID]*bucketBuffer
|
||||
@ -88,10 +90,16 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
|
||||
rb.merge(wb)
|
||||
}
|
||||
txw.reset()
|
||||
// increase the buffer version
|
||||
txr.bufVersion++
|
||||
}
|
||||
|
||||
// txReadBuffer accesses buffered updates.
|
||||
type txReadBuffer struct{ txBuffer }
|
||||
type txReadBuffer struct {
|
||||
txBuffer
|
||||
// bufVersion is incremented on every writeback; comparing it against a saved value tells whether the buffer has changed since
|
||||
bufVersion uint64
|
||||
}
|
||||
|
||||
func (txr *txReadBuffer) Range(bucket Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
|
||||
if b := txr.buckets[bucket.ID()]; b != nil {
|
||||
@ -113,6 +121,7 @@ func (txr *txReadBuffer) unsafeCopy() txReadBuffer {
|
||||
txBuffer: txBuffer{
|
||||
buckets: make(map[BucketID]*bucketBuffer, len(txr.txBuffer.buckets)),
|
||||
},
|
||||
bufVersion: 0,
|
||||
}
|
||||
for bucketName, bucket := range txr.txBuffer.buckets {
|
||||
txrCopy.txBuffer.buckets[bucketName] = bucket.Copy()
|
||||
@ -133,7 +142,7 @@ type bucketBuffer struct {
|
||||
}
|
||||
|
||||
// newBucketBuffer returns a bucketBuffer whose buf is a kv slice of length 512
// (the value of bucketBufferInitialSize) and whose used count is 0.
func newBucketBuffer() *bucketBuffer {
|
||||
return &bucketBuffer{buf: make([]kv, 512), used: 0}
|
||||
return &bucketBuffer{buf: make([]kv, bucketBufferInitialSize), used: 0}
|
||||
}
|
||||
|
||||
func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user