mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00

If the backend is closed, operations on the backend in the compaction goroutine will panic. So this PR waits for the compaction goroutine to exit before closing the backend. This also fixes the TestWorkflow failure.
48 lines
960 B
Go
48 lines
960 B
Go
package storage
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"time"
|
|
)
|
|
|
|
func (s *store) scheduleCompaction(compactMainRev int64, keep map[reversion]struct{}) {
|
|
defer s.wg.Done()
|
|
end := make([]byte, 8)
|
|
binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))
|
|
|
|
batchsize := int64(10000)
|
|
last := make([]byte, 8+1+8)
|
|
for {
|
|
var rev reversion
|
|
|
|
tx := s.b.BatchTx()
|
|
tx.Lock()
|
|
|
|
keys, _ := tx.UnsafeRange(keyBucketName, last, end, batchsize)
|
|
for _, key := range keys {
|
|
rev = bytesToRev(key)
|
|
if _, ok := keep[rev]; !ok {
|
|
tx.UnsafeDelete(keyBucketName, key)
|
|
}
|
|
}
|
|
|
|
if len(keys) == 0 {
|
|
rbytes := make([]byte, 8+1+8)
|
|
revToBytes(reversion{main: compactMainRev}, rbytes)
|
|
tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes)
|
|
tx.Unlock()
|
|
return
|
|
}
|
|
|
|
// update last
|
|
revToBytes(reversion{main: rev.main, sub: rev.sub + 1}, last)
|
|
tx.Unlock()
|
|
|
|
select {
|
|
case <-time.After(100 * time.Millisecond):
|
|
case <-s.stopc:
|
|
return
|
|
}
|
|
}
|
|
}
|