mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
mvcc/backend: hold 'readTx.Lock' until completing bolt.Tx reset
Fix https://github.com/coreos/etcd/issues/7526. When resetting `bolt.Tx` in `defrag` and `batchTxBuffered.commit` operation, we do not hold `readTx` lock, so the inflight range requests can trigger panic in `mvcc.Range` paths. This fixes by moving mutexes out and hold it while resetting the `readTx`. Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
This commit is contained in:
parent
049ca8746a
commit
26abd25cd3
@ -247,7 +247,11 @@ func (b *backend) defrag() error {
|
|||||||
b.mu.Lock()
|
b.mu.Lock()
|
||||||
defer b.mu.Unlock()
|
defer b.mu.Unlock()
|
||||||
|
|
||||||
b.batchTx.commit(true)
|
// block concurrent read requests while resetting tx
|
||||||
|
b.readTx.mu.Lock()
|
||||||
|
defer b.readTx.mu.Unlock()
|
||||||
|
|
||||||
|
b.batchTx.unsafeCommit(true)
|
||||||
b.batchTx.tx = nil
|
b.batchTx.tx = nil
|
||||||
|
|
||||||
tmpdb, err := bolt.Open(b.db.Path()+".tmp", 0600, boltOpenOptions)
|
tmpdb, err := bolt.Open(b.db.Path()+".tmp", 0600, boltOpenOptions)
|
||||||
@ -288,6 +292,10 @@ func (b *backend) defrag() error {
|
|||||||
plog.Fatalf("cannot begin tx (%s)", err)
|
plog.Fatalf("cannot begin tx (%s)", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
b.readTx.buf.reset()
|
||||||
|
b.readTx.tx = b.unsafeBegin(false)
|
||||||
|
atomic.StoreInt64(&b.size, b.readTx.tx.Size())
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -345,12 +353,17 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
|
|||||||
|
|
||||||
func (b *backend) begin(write bool) *bolt.Tx {
|
func (b *backend) begin(write bool) *bolt.Tx {
|
||||||
b.mu.RLock()
|
b.mu.RLock()
|
||||||
|
tx := b.unsafeBegin(write)
|
||||||
|
b.mu.RUnlock()
|
||||||
|
atomic.StoreInt64(&b.size, tx.Size())
|
||||||
|
return tx
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *backend) unsafeBegin(write bool) *bolt.Tx {
|
||||||
tx, err := b.db.Begin(write)
|
tx, err := b.db.Begin(write)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
plog.Fatalf("cannot begin tx (%s)", err)
|
plog.Fatalf("cannot begin tx (%s)", err)
|
||||||
}
|
}
|
||||||
b.mu.RUnlock()
|
|
||||||
atomic.StoreInt64(&b.size, tx.Size())
|
|
||||||
return tx
|
return tx
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -244,6 +244,10 @@ func (t *batchTxBuffered) commit(stop bool) {
|
|||||||
// all read txs must be closed to acquire boltdb commit rwlock
|
// all read txs must be closed to acquire boltdb commit rwlock
|
||||||
t.backend.readTx.mu.Lock()
|
t.backend.readTx.mu.Lock()
|
||||||
defer t.backend.readTx.mu.Unlock()
|
defer t.backend.readTx.mu.Unlock()
|
||||||
|
t.unsafeCommit(stop)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *batchTxBuffered) unsafeCommit(stop bool) {
|
||||||
if t.backend.readTx.tx != nil {
|
if t.backend.readTx.tx != nil {
|
||||||
if err := t.backend.readTx.tx.Rollback(); err != nil {
|
if err := t.backend.readTx.tx.Rollback(); err != nil {
|
||||||
plog.Fatalf("cannot rollback tx (%s)", err)
|
plog.Fatalf("cannot rollback tx (%s)", err)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user