[NOD-1569] Stop using ReceiveFromChanWhenDone (#1151)

This commit is contained in:
Ori Newman 2020-11-25 01:30:07 -08:00 committed by GitHub
parent 45d9b63572
commit 3bad9ec1eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 25 additions and 26 deletions

View File

@ -1,7 +1,6 @@
package handshake package handshake
import ( import (
"sync"
"sync/atomic" "sync/atomic"
"github.com/kaspanet/kaspad/domain" "github.com/kaspanet/kaspad/domain"
@ -16,7 +15,6 @@ import (
"github.com/kaspanet/kaspad/app/appmessage" "github.com/kaspanet/kaspad/app/appmessage"
peerpkg "github.com/kaspanet/kaspad/app/protocol/peer" peerpkg "github.com/kaspanet/kaspad/app/protocol/peer"
routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router" routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
"github.com/kaspanet/kaspad/util/locks"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -38,10 +36,12 @@ func HandleHandshake(context HandleHandshakeContext, netConnection *netadapter.N
) (*peerpkg.Peer, error) { ) (*peerpkg.Peer, error) {
// For HandleHandshake to finish, we need to get from the other node // For HandleHandshake to finish, we need to get from the other node
// a version and verack messages, so we increase the wait group by 2 // a version and verack messages, so we set doneCount to 2, decrease it
// and block HandleHandshake with wg.Wait(). // when sending and receiving the version, and close the doneChan when
wg := sync.WaitGroup{} // it's 0. Then we wait for on select for a tick from doneChan or from
wg.Add(2) // errChan.
doneCount := int32(2)
doneChan := make(chan struct{})
isStopping := uint32(0) isStopping := uint32(0)
errChan := make(chan error) errChan := make(chan error)
@ -56,7 +56,9 @@ func HandleHandshake(context HandleHandshakeContext, netConnection *netadapter.N
return return
} }
peerAddress = address peerAddress = address
wg.Done() if atomic.AddInt32(&doneCount, -1) == 0 {
close(doneChan)
}
}) })
spawn("HandleHandshake-SendVersion", func() { spawn("HandleHandshake-SendVersion", func() {
@ -65,7 +67,9 @@ func HandleHandshake(context HandleHandshakeContext, netConnection *netadapter.N
handleError(err, "SendVersion", &isStopping, errChan) handleError(err, "SendVersion", &isStopping, errChan)
return return
} }
wg.Done() if atomic.AddInt32(&doneCount, -1) == 0 {
close(doneChan)
}
}) })
select { select {
@ -74,7 +78,7 @@ func HandleHandshake(context HandleHandshakeContext, netConnection *netadapter.N
return nil, err return nil, err
} }
return nil, nil return nil, nil
case <-locks.ReceiveFromChanWhenDone(func() { wg.Wait() }): case <-doneChan:
} }
err := context.AddToPeers(peer) err := context.AddToPeers(peer)

View File

@ -6,8 +6,6 @@ import (
"testing" "testing"
"time" "time"
"github.com/kaspanet/kaspad/util/locks"
"github.com/kaspanet/kaspad/app/appmessage" "github.com/kaspanet/kaspad/app/appmessage"
) )
@ -56,6 +54,16 @@ func Test64IncomingConnections(t *testing.T) {
select { select {
case <-time.After(defaultTimeout): case <-time.After(defaultTimeout):
t.Fatalf("Timeout waiting for block added notification from the bullies") t.Fatalf("Timeout waiting for block added notification from the bullies")
case <-locks.ReceiveFromChanWhenDone(func() { blockAddedWG.Wait() }): case <-ReceiveFromChanWhenDone(func() { blockAddedWG.Wait() }):
} }
} }
// ReceiveFromChanWhenDone takes a blocking function and returns a channel that sends an empty struct when the function is done.
func ReceiveFromChanWhenDone(callback func()) <-chan struct{} {
ch := make(chan struct{})
spawn("ReceiveFromChanWhenDone", func() {
callback()
close(ch)
})
return ch
}

View File

@ -5,8 +5,6 @@ import (
"testing" "testing"
"time" "time"
"github.com/kaspanet/kaspad/util/locks"
"github.com/kaspanet/kaspad/app/appmessage" "github.com/kaspanet/kaspad/app/appmessage"
) )
@ -33,7 +31,7 @@ func TestIBD(t *testing.T) {
select { select {
case <-time.After(defaultTimeout): case <-time.After(defaultTimeout):
t.Fatalf("Timeout waiting for IBD to finish. Received %d blocks out of %d", receivedBlocks, numBlocks) t.Fatalf("Timeout waiting for IBD to finish. Received %d blocks out of %d", receivedBlocks, numBlocks)
case <-locks.ReceiveFromChanWhenDone(func() { blockAddedWG.Wait() }): case <-ReceiveFromChanWhenDone(func() { blockAddedWG.Wait() }):
} }
tip1Hash, err := syncer.rpcClient.GetSelectedTipHash() tip1Hash, err := syncer.rpcClient.GetSelectedTipHash()

View File

@ -1,11 +0,0 @@
package locks
// ReceiveFromChanWhenDone takes a blocking function and returns a channel that sends an empty struct when the function is done.
func ReceiveFromChanWhenDone(callback func()) <-chan struct{} {
ch := make(chan struct{})
spawn("ReceiveFromChanWhenDone", func() {
callback()
close(ch)
})
return ch
}