Merge pull request #13036 from gyuho/cherry-pick

[release-3.5] server: let multiple concurrentReadTx instances share one txReadBuffer.

commit dd22bd747e
--- a/server/mvcc/backend/backend.go
+++ b/server/mvcc/backend/backend.go
@@ -79,6 +79,12 @@ type Snapshot interface {
 	Close() error
 }
 
+type txReadBufferCache struct {
+	mu         sync.Mutex
+	buf        *txReadBuffer
+	bufVersion uint64
+}
+
 type backend struct {
 	// size and commits are used with atomic operations so they must be
 	// 64-bit aligned, otherwise 32-bit tests will crash
@@ -102,6 +108,11 @@ type backend struct {
 	batchTx *batchTxBuffered
 
 	readTx *readTx
+	// txReadBufferCache mirrors "txReadBuffer" within "readTx" -- readTx.baseReadTx.buf.
+	// When creating "concurrentReadTx":
+	// - if the cache is up-to-date, the "readTx.baseReadTx.buf" copy can be skipped
+	// - if the cache is empty or outdated, a "readTx.baseReadTx.buf" copy is required
+	txReadBufferCache txReadBufferCache
 
 	stopc chan struct{}
 	donec chan struct{}
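As an aside, the contract these comments describe boils down to a single predicate: the cache may be reused only when it is non-empty and its version stamp matches the live buffer's. A minimal standalone sketch, assuming simplified stand-in types (readBuffer and readBufferCache are illustrative names, not the etcd types):

package main

import (
	"fmt"
	"sync"
)

// readBuffer stands in for txReadBuffer; only the version matters here.
type readBuffer struct{ bufVersion uint64 }

// readBufferCache stands in for txReadBufferCache.
type readBufferCache struct {
	mu         sync.Mutex
	buf        *readBuffer
	bufVersion uint64
}

// usable reports whether the cached copy may be handed out as-is:
// it must exist and carry the same version as the live read buffer.
func (c *readBufferCache) usable(live *readBuffer) bool {
	return c.buf != nil && c.bufVersion == live.bufVersion
}

func main() {
	live := &readBuffer{bufVersion: 3}
	cache := &readBufferCache{}
	fmt.Println(cache.usable(live)) // false: cache is empty

	cache.buf, cache.bufVersion = &readBuffer{bufVersion: 3}, 3
	fmt.Println(cache.usable(live)) // true: up-to-date, copy can be skipped

	live.bufVersion++               // a writeback happened
	fmt.Println(cache.usable(live)) // false: outdated, copy is required
}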
@@ -184,18 +195,25 @@ func newBackend(bcfg BackendConfig) *backend {
 			baseReadTx: baseReadTx{
 				buf: txReadBuffer{
 					txBuffer: txBuffer{make(map[string]*bucketBuffer)},
+					bufVersion: 0,
 				},
 				buckets: make(map[string]*bolt.Bucket),
 				txWg:    new(sync.WaitGroup),
 				txMu:    new(sync.RWMutex),
 			},
 		},
+		txReadBufferCache: txReadBufferCache{
+			mu:         sync.Mutex{},
+			bufVersion: 0,
+			buf:        nil,
+		},
 
 		stopc: make(chan struct{}),
 		donec: make(chan struct{}),
 
 		lg: bcfg.Logger,
 	}
 
 	b.batchTx = newBatchTxBuffered(b)
 	// We set it after newBatchTxBuffered to skip the 'empty' commit.
 	b.hooks = bcfg.Hooks
@@ -221,10 +239,68 @@ func (b *backend) ConcurrentReadTx() ReadTx {
 	defer b.readTx.RUnlock()
 	// prevent boltdb read Tx from being rolled back until store read Tx is done. Needs to be called when holding readTx.RLock().
 	b.readTx.txWg.Add(1)
 
 	// TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval.
 
+	// inspect/update cache recency iff there's no ongoing update to the cache
+	// this falls through if there's no cache update
+
+	// by this line, the "ConcurrentReadTx" code path is already protected against concurrent "writeback" operations,
+	// which require a write lock to update "readTx.baseReadTx.buf".
+	// This means setting "buf *txReadBuffer" with "readTx.buf.unsafeCopy()" is guaranteed to be up-to-date,
+	// whereas "txReadBufferCache.buf" may be stale from concurrent "writeback" operations.
+	// We only update "txReadBufferCache.buf" if we know "buf *txReadBuffer" is up-to-date.
+	// The update to "txReadBufferCache.buf" will benefit the following "ConcurrentReadTx" creation
+	// by avoiding copying "readTx.baseReadTx.buf".
+	b.txReadBufferCache.mu.Lock()
+
+	curCache := b.txReadBufferCache.buf
+	curCacheVer := b.txReadBufferCache.bufVersion
+	curBufVer := b.readTx.buf.bufVersion
+
+	isEmptyCache := curCache == nil
+	isStaleCache := curCacheVer != curBufVer
+
+	var buf *txReadBuffer
+	switch {
+	case isEmptyCache:
+		// perform a safe copy of the buffer while holding "b.txReadBufferCache.mu.Lock"
+		// this is only supposed to run once so there won't be much overhead
+		curBuf := b.readTx.buf.unsafeCopy()
+		buf = &curBuf
+	case isStaleCache:
+		// to maximize concurrency, try an unsafe copy of the buffer:
+		// release the lock while copying the buffer -- the cache may become stale again and
+		// get overwritten by someone else.
+		// therefore, we need to check the readTx buffer version again
+		b.txReadBufferCache.mu.Unlock()
+		curBuf := b.readTx.buf.unsafeCopy()
+		b.txReadBufferCache.mu.Lock()
+		buf = &curBuf
+	default:
+		// neither empty nor stale cache, just use the current buffer
+		buf = curCache
+	}
+	// txReadBufferCache.bufVersion can be modified while we are doing an unsafeCopy();
+	// as a result, curCacheVer may no longer be the same as
+	// txReadBufferCache.bufVersion.
+	// if !isEmptyCache && curCacheVer != b.txReadBufferCache.bufVersion,
+	// then the cache became stale while copying "readTx.baseReadTx.buf".
+	// It is safe to not update "txReadBufferCache.buf", because the next
+	// "ConcurrentReadTx" creation will trigger a new "readTx.baseReadTx.buf" copy,
+	// and "buf" is still used for the current "concurrentReadTx.baseReadTx.buf".
+	if isEmptyCache || curCacheVer == b.txReadBufferCache.bufVersion {
+		// continue if the cache was never set or no one has modified the cache
+		b.txReadBufferCache.buf = buf
+		b.txReadBufferCache.bufVersion = curBufVer
+	}
+
+	b.txReadBufferCache.mu.Unlock()
+
+	// concurrentReadTx is not supposed to write to its txReadBuffer
 	return &concurrentReadTx{
 		baseReadTx: baseReadTx{
-			buf:     b.readTx.buf.unsafeCopy(),
+			buf:     *buf,
 			txMu:    b.readTx.txMu,
 			tx:      b.readTx.tx,
 			buckets: b.readTx.buckets,
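The block above is a version-checked, double-checked cache: the expensive buffer copy runs outside the cache mutex, and the result is only published if no one refreshed the cache in the meantime. A standalone sketch of the same pattern under simplified assumptions (snapshot, source, and snapCache are illustrative stand-ins, not the etcd API; the bolt transaction plumbing is omitted):

package main

import (
	"fmt"
	"sync"
)

// snapshot stands in for a copied txReadBuffer.
type snapshot struct{ data map[string]string }

// source mimics the live read buffer: mutable, version bumped on each writeback.
type source struct {
	mu      sync.RWMutex
	version uint64
	data    map[string]string
}

func (s *source) currentVersion() uint64 {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.version
}

// copySnapshot plays the role of unsafeCopy (made safe here by its own lock).
func (s *source) copySnapshot() *snapshot {
	s.mu.RLock()
	defer s.mu.RUnlock()
	c := make(map[string]string, len(s.data))
	for k, v := range s.data {
		c[k] = v
	}
	return &snapshot{data: c}
}

// snapCache mimics txReadBufferCache.
type snapCache struct {
	mu      sync.Mutex
	version uint64
	snap    *snapshot
}

// get returns a shared snapshot, copying from src only when the cache is
// empty or stale; the stale-path copy runs outside the cache lock.
func (c *snapCache) get(src *source) *snapshot {
	c.mu.Lock()
	curVer := src.currentVersion()
	cached, cachedVer := c.snap, c.version

	var snap *snapshot
	switch {
	case cached == nil:
		// first use: copy while holding the cache lock (runs once)
		snap = src.copySnapshot()
	case cachedVer != curVer:
		// stale: drop the lock so other readers are not serialized
		// behind the copy, then re-validate after re-locking
		c.mu.Unlock()
		snap = src.copySnapshot()
		c.mu.Lock()
	default:
		snap = cached
	}
	// Publish only if nobody refreshed the cache while we copied;
	// if somebody did, our copy still serves the current caller.
	if cached == nil || cachedVer == c.version {
		c.snap, c.version = snap, curVer
	}
	c.mu.Unlock()
	return snap
}

func main() {
	src := &source{data: map[string]string{"k": "v"}}
	cache := &snapCache{}
	s1, s2 := cache.get(src), cache.get(src)
	fmt.Println("second call reused cache:", s1 == s2) // true
}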
--- a/server/mvcc/backend/tx_buffer.go
+++ b/server/mvcc/backend/tx_buffer.go
@@ -19,6 +19,8 @@ import (
 	"sort"
 )
 
+const bucketBufferInitialSize = 512
+
 // txBuffer handles functionality shared between txWriteBuffer and txReadBuffer.
 type txBuffer struct {
 	buckets map[string]*bucketBuffer
@@ -69,10 +71,16 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
 		rb.merge(wb)
 	}
 	txw.reset()
+	// increase the buffer version
+	txr.bufVersion++
 }
 
 // txReadBuffer accesses buffered updates.
-type txReadBuffer struct{ txBuffer }
+type txReadBuffer struct {
+	txBuffer
+	// bufVersion is used to check if the buffer has been modified recently
+	bufVersion uint64
+}
 
 func (txr *txReadBuffer) Range(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
 	if b := txr.buckets[string(bucketName)]; b != nil {
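Note that the version bump sits inside writeback(), which the write path runs under the buffer's write lock, so the contents and the version always change together and a copy taken earlier is detectably stale. A minimal sketch of that invariant (simplified stand-in types, not the etcd API):

package main

import (
	"fmt"
	"sync"
)

// readBuffer stands in for txReadBuffer.
type readBuffer struct {
	data       map[string]string
	bufVersion uint64
}

// store pairs pending writes with the read buffer, loosely like batchTx/readTx.
type store struct {
	mu      sync.RWMutex // guards readBuf, like readTx's lock
	pending map[string]string
	readBuf readBuffer
}

// writeback merges pending writes into the read buffer and bumps its
// version in the same critical section, so any cached copy taken before
// this call compares unequal afterwards.
func (s *store) writeback() {
	s.mu.Lock()
	defer s.mu.Unlock()
	for k, v := range s.pending {
		s.readBuf.data[k] = v
	}
	s.pending = map[string]string{}
	s.readBuf.bufVersion++ // increase the buffer version
}

func main() {
	s := &store{
		pending: map[string]string{"a": "1"},
		readBuf: readBuffer{data: map[string]string{}},
	}
	cachedVer := s.readBuf.bufVersion // version recorded at copy time
	s.writeback()
	fmt.Println("cache stale:", cachedVer != s.readBuf.bufVersion) // true
}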
@@ -94,6 +102,7 @@ func (txr *txReadBuffer) unsafeCopy() txReadBuffer {
 		txBuffer: txBuffer{
 			buckets: make(map[string]*bucketBuffer, len(txr.txBuffer.buckets)),
 		},
+		bufVersion: 0,
 	}
 	for bucketName, bucket := range txr.txBuffer.buckets {
 		txrCopy.txBuffer.buckets[bucketName] = bucket.Copy()
@@ -114,7 +123,7 @@ type bucketBuffer struct {
 }
 
 func newBucketBuffer() *bucketBuffer {
-	return &bucketBuffer{buf: make([]kv, 512), used: 0}
+	return &bucketBuffer{buf: make([]kv, bucketBufferInitialSize), used: 0}
 }
 
 func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {