mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #7579 from gyuho/fix-defrage
*: fix panic during defrag operation
This commit is contained in:
commit
161c7f6bdf
@ -47,3 +47,31 @@ func TestV3MaintenanceHashInflight(t *testing.T) {
|
||||
|
||||
<-donec
|
||||
}
|
||||
|
||||
// TestV3MaintenanceDefragmentInflightRange ensures inflight range requests
|
||||
// does not panic the mvcc backend while defragment is running.
|
||||
func TestV3MaintenanceDefragmentInflightRange(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
cli := clus.RandClient()
|
||||
kvc := toGRPC(cli).KV
|
||||
if _, err := kvc.Put(context.Background(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
|
||||
donec := make(chan struct{})
|
||||
go func() {
|
||||
defer close(donec)
|
||||
kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo")})
|
||||
}()
|
||||
|
||||
mvc := toGRPC(cli).Maintenance
|
||||
mvc.Defragment(context.Background(), &pb.DefragmentRequest{})
|
||||
cancel()
|
||||
|
||||
<-donec
|
||||
}
|
||||
|
@ -265,7 +265,11 @@ func (b *backend) defrag() error {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
b.batchTx.commit(true)
|
||||
// block concurrent read requests while resetting tx
|
||||
b.readTx.mu.Lock()
|
||||
defer b.readTx.mu.Unlock()
|
||||
|
||||
b.batchTx.unsafeCommit(true)
|
||||
b.batchTx.tx = nil
|
||||
|
||||
tmpdb, err := bolt.Open(b.db.Path()+".tmp", 0600, boltOpenOptions)
|
||||
@ -306,6 +310,10 @@ func (b *backend) defrag() error {
|
||||
plog.Fatalf("cannot begin tx (%s)", err)
|
||||
}
|
||||
|
||||
b.readTx.buf.reset()
|
||||
b.readTx.tx = b.unsafeBegin(false)
|
||||
atomic.StoreInt64(&b.size, b.readTx.tx.Size())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -363,12 +371,17 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
|
||||
|
||||
func (b *backend) begin(write bool) *bolt.Tx {
|
||||
b.mu.RLock()
|
||||
tx := b.unsafeBegin(write)
|
||||
b.mu.RUnlock()
|
||||
atomic.StoreInt64(&b.size, tx.Size())
|
||||
return tx
|
||||
}
|
||||
|
||||
func (b *backend) unsafeBegin(write bool) *bolt.Tx {
|
||||
tx, err := b.db.Begin(write)
|
||||
if err != nil {
|
||||
plog.Fatalf("cannot begin tx (%s)", err)
|
||||
}
|
||||
b.mu.RUnlock()
|
||||
atomic.StoreInt64(&b.size, tx.Size())
|
||||
return tx
|
||||
}
|
||||
|
||||
|
@ -244,6 +244,10 @@ func (t *batchTxBuffered) commit(stop bool) {
|
||||
// all read txs must be closed to acquire boltdb commit rwlock
|
||||
t.backend.readTx.mu.Lock()
|
||||
defer t.backend.readTx.mu.Unlock()
|
||||
t.unsafeCommit(stop)
|
||||
}
|
||||
|
||||
func (t *batchTxBuffered) unsafeCommit(stop bool) {
|
||||
if t.backend.readTx.tx != nil {
|
||||
if err := t.backend.readTx.tx.Rollback(); err != nil {
|
||||
plog.Fatalf("cannot rollback tx (%s)", err)
|
||||
|
Loading…
x
Reference in New Issue
Block a user