[NOD-447] fix deadlocks and hanging goroutines (#481)

* [NOD-447] Fix deadlocks and hanging goroutines

* [NOD-447] Add tests

* [NOD-447] Add unpatch to spawnPatch

* [NOD-447] Don't send to releaseWait if waitingCounter is zero

* [NOD-447] Change waitingCounter to boolean and rename to isReleaseWaitWaiting, change checkIfRunningSpawnsAreLeft to return only one function, and lock critical code related to wg.isReleaseWaitWaiting

* [NOD-447] Rename txConfirmations -> txConfirmationsNoLock, txConfirmationsWithLock -> txConfirmations

* [NOD-447] Add documentation and delete redundant spawn

* [NOD-447] Fix comments

* [NOD-447] Fix comments
This commit is contained in:
Ori Newman 2019-11-24 15:59:45 +02:00 committed by Dan Aharoni
parent 9a1c2e2641
commit 8e0e62f21a
4 changed files with 158 additions and 55 deletions

View File

@ -282,7 +282,7 @@ func buildGetBlockVerboseResult(s *Server, block *util.Block, isVerboseTx bool)
if err != nil {
return nil, err
}
txConfirmations, err := txConfirmations(s, tx.ID())
txConfirmations, err := txConfirmationsNoLock(s, tx.ID())
if err != nil {
return nil, err
}
@ -362,11 +362,13 @@ func hashesToGetBlockVerboseResults(s *Server, hashes []*daghash.Hash) ([]btcjso
return getBlockVerboseResults, nil
}
// txConfirmations returns the confirmations number for the given transaction
// txConfirmationsNoLock returns the confirmations number for the given transaction
// The confirmations number is defined as follows:
// If the transaction is in the mempool/in a red block/is a double spend -> 0
// Otherwise -> The confirmations number of the accepting block
func txConfirmations(s *Server, txID *daghash.TxID) (uint64, error) {
//
// This function MUST be called with the DAG state lock held (for reads).
func txConfirmationsNoLock(s *Server, txID *daghash.TxID) (uint64, error) {
if s.cfg.TxIndex == nil {
return 0, errors.New("transaction index must be enabled (--txindex)")
}
@ -379,10 +381,16 @@ func txConfirmations(s *Server, txID *daghash.TxID) (uint64, error) {
return 0, nil
}
confirmations, err := s.cfg.DAG.BlockConfirmationsByHash(acceptingBlock)
confirmations, err := s.cfg.DAG.BlockConfirmationsByHashNoLock(acceptingBlock)
if err != nil {
return 0, errors.Errorf("could not get confirmations for block that accepted tx %s: %s", txID, err)
}
return confirmations, nil
}
func txConfirmations(s *Server, txID *daghash.TxID) (uint64, error) {
s.cfg.DAG.RLock()
defer s.cfg.DAG.RUnlock()
return txConfirmationsNoLock(s, txID)
}

View File

@ -54,7 +54,7 @@ func (mtx *PriorityMutex) LowPriorityWriteUnlock() {
// HighPriorityWriteLock acquires a high-priority write lock.
func (mtx *PriorityMutex) HighPriorityWriteLock() {
mtx.highPriorityWaiting.add()
mtx.highPriorityWaiting.add(1)
mtx.dataMutex.Lock()
}
@ -67,7 +67,7 @@ func (mtx *PriorityMutex) HighPriorityWriteUnlock() {
// HighPriorityReadLock acquires a high-priority read
// lock.
func (mtx *PriorityMutex) HighPriorityReadLock() {
mtx.highPriorityWaiting.add()
mtx.highPriorityWaiting.add(1)
mtx.dataMutex.RLock()
}

View File

@ -5,21 +5,69 @@ import (
"sync/atomic"
)
// waitGroup is a type that implements the same API
// as sync.WaitGroup but allows concurrent calls to
// add() and wait().
//
// The wait group maintains a counter that can be
// increased by delta by using the waitGroup.add
// method, and decreased by 1 by using the
// waitGroup.done method.
// The general idea of the waitGroup.wait method
// is to block the current goroutine until the
// counter is set to 0. This is how it's implemented:
//
// 1) The waitGroup.wait method is checking if waitGroup.counter
// is 0. If it's the case the function returns. otherwise,
// it sets the flag waitGroup.isReleaseWaitWaiting to 1 so
// that there's a pending wait function, and waits for a signal
// from the channel waitGroup.releaseWait (waitGroup.isReleaseWaitWaiting
// is set to 1 wrapped with waitGroup.isReleaseWaitWaitingLock to
// synchronize with the reader from waitGroup.done).
//
// 2) When waitGroup.done is called, it checks if waitGroup.counter
// is 0.
//
// 3) If waitGroup.counter is 0, it also checks if there's any pending
// wait function by checking if wg.isReleaseWaitWaiting is 1, and if
// this is the case, it sends a signal to release the pending wait
// function, and then waits for a signal from waitGroup.releaseDone,
// and when the signal is received, the function returns.
// This step is wrapped with isReleaseWaitWaitingLock for two reasons:
// a) Prevent a situation where waitGroup.wait goroutine yields just
// before it sets wg.isReleaseWaitWaiting to 1, and then
// waitGroup.done will exit the function without sending the signal
// to waitGroup.wait.
// b) Prevent two waitGroup.done send concurrently a signal to the
// channel wg.releaseWait and making one of them hang forever.
//
// 4) After the waitGroup.wait is released, it sets
// waitGroup.isReleaseWaitWaiting to 0, and sends
// a signal to wg.releaseDone and go back to step 1.
//
// The waitGroup.wait is wrapped with waitGroup.mainWaitLock. It
// is used to enable multiple waits pending for the counter to be
// set to zero. This will cause a situation when one wait function
// will return, the other waits that are pending to waitGroup.mainWaitLock
// will immediately return as well. Without that lock, any call
// to waitGroup.wait will wait to its own signal from waitGroup.releaseWait
// which means that for n waits to be unblocked, the counter has to be set
// to 0 n times.
type waitGroup struct {
counter int64
waitLock sync.Mutex
syncChannel chan struct{}
counter, isReleaseWaitWaiting int64
mainWaitLock, isReleaseWaitWaitingLock sync.Mutex
releaseWait, releaseDone chan struct{}
}
func newWaitGroup() *waitGroup {
return &waitGroup{
waitLock: sync.Mutex{},
syncChannel: make(chan struct{}),
releaseWait: make(chan struct{}),
releaseDone: make(chan struct{}),
}
}
func (wg *waitGroup) add() {
atomic.AddInt64(&wg.counter, 1)
func (wg *waitGroup) add(delta int64) {
atomic.AddInt64(&wg.counter, delta)
}
func (wg *waitGroup) done() {
@ -27,17 +75,34 @@ func (wg *waitGroup) done() {
if counter < 0 {
panic("negative values for wg.counter are not allowed. This was likely caused by calling done() before add()")
}
// To avoid a situation where a struct is
// being sent to wg.releaseWait while there
// are no listeners to the channel (which will
// cause the goroutine to hang for eternity),
// we check wg.isReleaseWaitWaiting to see
// if there is a listener to wg.releaseWait.
if atomic.LoadInt64(&wg.counter) == 0 {
spawn(func() {
wg.syncChannel <- struct{}{}
})
wg.isReleaseWaitWaitingLock.Lock()
if atomic.LoadInt64(&wg.isReleaseWaitWaiting) == 1 {
wg.releaseWait <- struct{}{}
<-wg.releaseDone
}
wg.isReleaseWaitWaitingLock.Unlock()
}
}
func (wg *waitGroup) wait() {
wg.waitLock.Lock()
defer wg.waitLock.Unlock()
wg.mainWaitLock.Lock()
defer wg.mainWaitLock.Unlock()
wg.isReleaseWaitWaitingLock.Lock()
for atomic.LoadInt64(&wg.counter) != 0 {
<-wg.syncChannel
atomic.StoreInt64(&wg.isReleaseWaitWaiting, 1)
wg.isReleaseWaitWaitingLock.Unlock()
<-wg.releaseWait
atomic.StoreInt64(&wg.isReleaseWaitWaiting, 0)
wg.releaseDone <- struct{}{}
wg.isReleaseWaitWaitingLock.Lock()
}
wg.isReleaseWaitWaitingLock.Unlock()
}

View File

@ -7,47 +7,53 @@ package locks
import (
"sync/atomic"
"testing"
"time"
)
// All of the tests, except TestAddAfterWait, are copied
// All of the tests, except TestAddAfterWait and
// TestWaitAfterAddDoneCounterHasReset, are copied
// from the native sync/waitgroup_test.go (with some
// minor changes), to check that the new waitGroup
// behaves the same, except enabling the use of add()
// concurrently with wait()
// syncWaitGroupCompatible is a type that was created in order to
// make the use of waitGroup similar to the native one, so it'll
// be more convenient to use the same tests from sync/waitgroup_test.go
type syncWaitGroupCompatible struct {
*waitGroup
}
func (swg *syncWaitGroupCompatible) add(delta int) {
for i := 0; i < delta; i++ {
swg.waitGroup.add()
func spawnPatch(t *testing.T) (checkIfRunningSpawnsAreLeft func()) {
realSpawn := spawn
runningSpawns := int32(0)
spawn = func(f func()) {
atomic.AddInt32(&runningSpawns, 1)
realSpawn(func() {
f()
atomic.AddInt32(&runningSpawns, -1)
})
}
return func() {
defer func() {
spawn = realSpawn
}()
if runningSpawns != 0 {
time.Sleep(10 * time.Millisecond)
if runningSpawns != 0 {
t.Fatalf("%d running spawns left", runningSpawns)
}
}
}
}
func newSyncWgCompatible() *syncWaitGroupCompatible {
return &syncWaitGroupCompatible{
waitGroup: newWaitGroup(),
}
}
func testWaitGroup(t *testing.T, wg1 *syncWaitGroupCompatible, wg2 *syncWaitGroupCompatible) {
n := 16
func testWaitGroup(t *testing.T, wg1 *waitGroup, wg2 *waitGroup) {
n := int64(16)
wg1.add(n)
wg2.add(n)
exited := make(chan struct{}, n)
for i := 0; i != n; i++ {
go func(i int) {
for i := int64(0); i != n; i++ {
go func(i int64) {
wg1.done()
wg2.wait()
exited <- struct{}{}
}(i)
}
wg1.wait()
for i := 0; i != n; i++ {
for i := int64(0); i != n; i++ {
select {
case <-exited:
t.Fatal("waitGroup released group too soon")
@ -55,19 +61,22 @@ func testWaitGroup(t *testing.T, wg1 *syncWaitGroupCompatible, wg2 *syncWaitGrou
}
wg2.done()
}
for i := 0; i != n; i++ {
for i := int64(0); i != n; i++ {
<-exited // Will block if barrier fails to unlock someone.
}
}
func TestWaitGroup(t *testing.T) {
wg1 := newSyncWgCompatible()
wg2 := newSyncWgCompatible()
checkIfRunningSpawnsAreLeft := spawnPatch(t)
defer checkIfRunningSpawnsAreLeft()
wg1 := newWaitGroup()
wg2 := newWaitGroup()
// Run the same test a few times to ensure barrier is in a proper state.
for i := 0; i != 8; i++ {
for i := 0; i != 1000; i++ {
testWaitGroup(t, wg1, wg2)
}
}
func TestWaitGroupMisuse(t *testing.T) {
@ -77,7 +86,7 @@ func TestWaitGroupMisuse(t *testing.T) {
t.Fatalf("Unexpected panic: %#v", err)
}
}()
wg := newSyncWgCompatible()
wg := newWaitGroup()
wg.add(1)
wg.done()
wg.done()
@ -85,7 +94,9 @@ func TestWaitGroupMisuse(t *testing.T) {
}
func TestAddAfterWait(t *testing.T) {
wg := newSyncWgCompatible()
checkIfRunningSpawnsAreLeft := spawnPatch(t)
defer checkIfRunningSpawnsAreLeft()
wg := newWaitGroup()
wg.add(1)
syncChan := make(chan struct{})
go func() {
@ -98,12 +109,15 @@ func TestAddAfterWait(t *testing.T) {
wg.done()
wg.done()
<-syncChan
}
func TestWaitGroupRace(t *testing.T) {
checkIfRunningSpawnsAreLeft := spawnPatch(t)
defer checkIfRunningSpawnsAreLeft()
// Run this test for about 1ms.
for i := 0; i < 1000; i++ {
wg := newSyncWgCompatible()
wg := newWaitGroup()
n := new(int32)
// spawn goroutine 1
wg.add(1)
@ -123,29 +137,45 @@ func TestWaitGroupRace(t *testing.T) {
t.Fatal("Spurious wakeup from Wait")
}
}
}
func TestWaitGroupAlign(t *testing.T) {
checkIfRunningSpawnsAreLeft := spawnPatch(t)
defer checkIfRunningSpawnsAreLeft()
type X struct {
x byte
wg *syncWaitGroupCompatible
wg *waitGroup
}
x := X{wg: newSyncWgCompatible()}
x := X{wg: newWaitGroup()}
x.wg.add(1)
go func(x *X) {
x.wg.done()
}(&x)
x.wg.wait()
}
func TestWaitAfterAddDoneCounterHasReset(t *testing.T) {
checkIfRunningSpawnsAreLeft := spawnPatch(t)
defer checkIfRunningSpawnsAreLeft()
wg := newWaitGroup()
wg.add(1)
wg.done()
wg.add(1)
wg.done()
wg.wait()
}
func BenchmarkWaitGroupUncontended(b *testing.B) {
type PaddedWaitGroup struct {
*syncWaitGroupCompatible
*waitGroup
pad [128]uint8
}
b.RunParallel(func(pb *testing.PB) {
wg := PaddedWaitGroup{
syncWaitGroupCompatible: newSyncWgCompatible(),
waitGroup: newWaitGroup(),
}
for pb.Next() {
wg.add(1)
@ -156,7 +186,7 @@ func BenchmarkWaitGroupUncontended(b *testing.B) {
}
func benchmarkWaitGroupAdddone(b *testing.B, localWork int) {
wg := newSyncWgCompatible()
wg := newWaitGroup()
b.RunParallel(func(pb *testing.PB) {
foo := 0
for pb.Next() {
@ -180,7 +210,7 @@ func BenchmarkWaitGroupAddDoneWork(b *testing.B) {
}
func benchmarkWaitGroupwait(b *testing.B, localWork int) {
wg := newSyncWgCompatible()
wg := newWaitGroup()
b.RunParallel(func(pb *testing.PB) {
foo := 0
for pb.Next() {
@ -206,7 +236,7 @@ func BenchmarkWaitGroupActuallywait(b *testing.B) {
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
wg := newSyncWgCompatible()
wg := newWaitGroup()
wg.add(1)
go func() {
wg.done()