Merge branch 'dev' into replace-daglabs-dnsseeder

This commit is contained in:
Michael Sutton 2022-08-21 02:33:01 +03:00 committed by GitHub
commit 5dbbdef0eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 137 additions and 102 deletions

View File

@ -223,18 +223,9 @@ func (m *Manager) notifyVirtualSelectedParentChainChanged(virtualChangeSet *exte
onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyVirtualSelectedParentChainChanged") onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyVirtualSelectedParentChainChanged")
defer onEnd() defer onEnd()
listenersThatPropagateSelectedParentChanged := hasListeners, includeAcceptedTransactionIDs := m.context.NotificationManager.HasListenersThatPropagateVirtualSelectedParentChainChanged()
m.context.NotificationManager.AllListenersThatPropagateVirtualSelectedParentChainChanged()
if len(listenersThatPropagateSelectedParentChanged) > 0 {
// Generating acceptedTransactionIDs is a heavy operation, so we check if it's needed by any listener.
includeAcceptedTransactionIDs := false
for _, listener := range listenersThatPropagateSelectedParentChanged {
if listener.IncludeAcceptedTransactionIDsInVirtualSelectedParentChainChangedNotifications() {
includeAcceptedTransactionIDs = true
break
}
}
if hasListeners {
notification, err := m.context.ConvertVirtualSelectedParentChainChangesToChainChangedNotificationMessage( notification, err := m.context.ConvertVirtualSelectedParentChainChangesToChainChangedNotificationMessage(
virtualChangeSet.VirtualSelectedParentChainChanges, includeAcceptedTransactionIDs) virtualChangeSet.VirtualSelectedParentChainChanges, includeAcceptedTransactionIDs)
if err != nil { if err != nil {

View File

@ -142,16 +142,29 @@ func (nm *NotificationManager) NotifyVirtualSelectedParentChainChanged(
return nil return nil
} }
// AllListenersThatPropagateVirtualSelectedParentChainChanged returns true if there's any listener that is // HasListenersThatPropagateVirtualSelectedParentChainChanged returns whether there's any listener that is
// subscribed to VirtualSelectedParentChainChanged notifications. // subscribed to VirtualSelectedParentChainChanged notifications as well as checks if any such listener requested
func (nm *NotificationManager) AllListenersThatPropagateVirtualSelectedParentChainChanged() []*NotificationListener { // to include AcceptedTransactionIDs.
var listenersThatPropagate []*NotificationListener func (nm *NotificationManager) HasListenersThatPropagateVirtualSelectedParentChainChanged() (hasListeners, hasListenersThatRequireAcceptedTransactionIDs bool) {
nm.RLock()
defer nm.RUnlock()
hasListeners = false
hasListenersThatRequireAcceptedTransactionIDs = false
for _, listener := range nm.listeners { for _, listener := range nm.listeners {
if listener.propagateVirtualSelectedParentChainChangedNotifications { if listener.propagateVirtualSelectedParentChainChangedNotifications {
listenersThatPropagate = append(listenersThatPropagate, listener) hasListeners = true
// Generating acceptedTransactionIDs is a heavy operation, so we check if it's needed by any listener.
if listener.includeAcceptedTransactionIDsInVirtualSelectedParentChainChangedNotifications {
hasListenersThatRequireAcceptedTransactionIDs = true
break
}
} }
} }
return listenersThatPropagate
return hasListeners, hasListenersThatRequireAcceptedTransactionIDs
} }
// NotifyFinalityConflict notifies the notification manager that there's a finality conflict in the DAG // NotifyFinalityConflict notifies the notification manager that there's a finality conflict in the DAG
@ -338,7 +351,11 @@ func (nl *NotificationListener) PropagateFinalityConflictResolvedNotifications()
// to the remote listener for the given addresses. Subsequent calls instruct the listener to // to the remote listener for the given addresses. Subsequent calls instruct the listener to
// send UTXOs changed notifications for those addresses along with the old ones. Duplicate addresses // send UTXOs changed notifications for those addresses along with the old ones. Duplicate addresses
// are ignored. // are ignored.
func (nl *NotificationListener) PropagateUTXOsChangedNotifications(addresses []*UTXOsChangedNotificationAddress) { func (nm *NotificationManager) PropagateUTXOsChangedNotifications(nl *NotificationListener, addresses []*UTXOsChangedNotificationAddress) {
// Apply a write-lock since the internal listener address map is modified
nm.Lock()
defer nm.Unlock()
if !nl.propagateUTXOsChangedNotifications { if !nl.propagateUTXOsChangedNotifications {
nl.propagateUTXOsChangedNotifications = true nl.propagateUTXOsChangedNotifications = true
nl.propagateUTXOsChangedNotificationAddresses = nl.propagateUTXOsChangedNotificationAddresses =
@ -353,7 +370,11 @@ func (nl *NotificationListener) PropagateUTXOsChangedNotifications(addresses []*
// StopPropagatingUTXOsChangedNotifications instructs the listener to stop sending UTXOs // StopPropagatingUTXOsChangedNotifications instructs the listener to stop sending UTXOs
// changed notifications to the remote listener for the given addresses. Addresses for which // changed notifications to the remote listener for the given addresses. Addresses for which
// notifications are not currently sent are ignored. // notifications are not currently sent are ignored.
func (nl *NotificationListener) StopPropagatingUTXOsChangedNotifications(addresses []*UTXOsChangedNotificationAddress) { func (nm *NotificationManager) StopPropagatingUTXOsChangedNotifications(nl *NotificationListener, addresses []*UTXOsChangedNotificationAddress) {
// Apply a write-lock since the internal listener address map is modified
nm.Lock()
defer nm.Unlock()
if !nl.propagateUTXOsChangedNotifications { if !nl.propagateUTXOsChangedNotifications {
return return
} }

View File

@ -26,7 +26,7 @@ func HandleNotifyUTXOsChanged(context *rpccontext.Context, router *router.Router
if err != nil { if err != nil {
return nil, err return nil, err
} }
listener.PropagateUTXOsChangedNotifications(addresses) context.NotificationManager.PropagateUTXOsChangedNotifications(listener, addresses)
response := appmessage.NewNotifyUTXOsChangedResponseMessage() response := appmessage.NewNotifyUTXOsChangedResponseMessage()
return response, nil return response, nil

View File

@ -26,7 +26,7 @@ func HandleStopNotifyingUTXOsChanged(context *rpccontext.Context, router *router
if err != nil { if err != nil {
return nil, err return nil, err
} }
listener.StopPropagatingUTXOsChangedNotifications(addresses) context.NotificationManager.StopPropagatingUTXOsChangedNotifications(listener, addresses)
response := appmessage.NewStopNotifyingUTXOsChangedResponseMessage() response := appmessage.NewStopNotifyingUTXOsChangedResponseMessage()
return response, nil return response, nil

View File

@ -64,6 +64,12 @@ type consensus struct {
virtualNotUpdated bool virtualNotUpdated bool
} }
// In order to prevent a situation that the consensus lock is held for too much time, we
// release the lock each time we resolve 100 blocks.
// Note: `virtualResolveChunk` should be smaller than `params.FinalityDuration` in order to avoid a situation
// where UpdatePruningPointByVirtual skips a pruning point.
const virtualResolveChunk = 100
func (s *consensus) ValidateAndInsertBlockWithTrustedData(block *externalapi.BlockWithTrustedData, validateUTXO bool) error { func (s *consensus) ValidateAndInsertBlockWithTrustedData(block *externalapi.BlockWithTrustedData, validateUTXO bool) error {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
@ -198,15 +204,32 @@ func (s *consensus) ValidateAndInsertBlock(block *externalapi.DomainBlock, updat
if updateVirtual { if updateVirtual {
s.lock.Lock() s.lock.Lock()
if s.virtualNotUpdated { if s.virtualNotUpdated {
s.lock.Unlock() // We enter the loop in locked state
err := s.ResolveVirtual(nil) for {
if err != nil { _, isCompletelyResolved, err := s.resolveVirtualChunkNoLock(virtualResolveChunk)
return err if err != nil {
s.lock.Unlock()
return err
}
if isCompletelyResolved {
// Make sure we enter the block insertion function w/o releasing the lock.
// Otherwise, we might actually enter it in `s.virtualNotUpdated == true` state
_, err = s.validateAndInsertBlockNoLock(block, updateVirtual)
// Finally, unlock for the last iteration and return
s.lock.Unlock()
if err != nil {
return err
}
return nil
}
// Unlock to allow other threads to enter consensus
s.lock.Unlock()
// Lock for the next iteration
s.lock.Lock()
} }
return s.validateAndInsertBlockWithLock(block, updateVirtual)
} }
defer s.lock.Unlock()
_, err := s.validateAndInsertBlockNoLock(block, updateVirtual) _, err := s.validateAndInsertBlockNoLock(block, updateVirtual)
s.lock.Unlock()
if err != nil { if err != nil {
return err return err
} }
@ -912,11 +935,7 @@ func (s *consensus) ResolveVirtual(progressReportCallback func(uint64, uint64))
progressReportCallback(virtualDAAScoreStart, virtualDAAScore) progressReportCallback(virtualDAAScoreStart, virtualDAAScore)
} }
// In order to prevent a situation that the consensus lock is held for too much time, we _, isCompletelyResolved, err := s.resolveVirtualChunkWithLock(virtualResolveChunk)
// release the lock each time we resolve 100 blocks.
// Note: maxBlocksToResolve should be smaller than `params.FinalityDuration` in order to avoid a situation
// where UpdatePruningPointByVirtual skips a pruning point.
_, isCompletelyResolved, err := s.resolveVirtualChunkWithLock(100)
if err != nil { if err != nil {
return err return err
} }

View File

@ -772,78 +772,41 @@ func (pm *pruningManager) calculateDiffBetweenPreviousAndCurrentPruningPoints(st
if err != nil { if err != nil {
return nil, err return nil, err
} }
currentPruningGhostDAG, err := pm.ghostdagDataStore.Get(pm.databaseContext, stagingArea, currentPruningHash, false)
utxoDiff := utxo.NewMutableUTXODiff()
iterator, err := pm.dagTraversalManager.SelectedChildIterator(stagingArea, currentPruningHash, previousPruningHash, false)
if err != nil { if err != nil {
return nil, err return nil, err
} }
previousPruningGhostDAG, err := pm.ghostdagDataStore.Get(pm.databaseContext, stagingArea, previousPruningHash, false) defer iterator.Close()
if err != nil {
return nil, err for ok := iterator.First(); ok; ok = iterator.Next() {
child, err := iterator.Get()
if err != nil {
return nil, err
}
chainBlockAcceptanceData, err := pm.acceptanceDataStore.Get(pm.databaseContext, stagingArea, child)
if err != nil {
return nil, err
}
chainBlockHeader, err := pm.blockHeaderStore.BlockHeader(pm.databaseContext, stagingArea, child)
if err != nil {
return nil, err
}
for _, blockAcceptanceData := range chainBlockAcceptanceData {
for _, transactionAcceptanceData := range blockAcceptanceData.TransactionAcceptanceData {
if transactionAcceptanceData.IsAccepted {
err = utxoDiff.AddTransaction(transactionAcceptanceData.Transaction, chainBlockHeader.DAAScore())
if err != nil {
return nil, err
}
}
}
}
} }
currentPruningCurrentDiffChild := currentPruningHash return utxoDiff.ToImmutable(), err
previousPruningCurrentDiffChild := previousPruningHash
// We need to use BlueWork because it's the only thing that's monotonic in the whole DAG
// We use the BlueWork to know which point is currently lower on the DAG so we can keep climbing its children,
// that way we keep climbing on the lowest point until they both reach the exact same descendant
currentPruningCurrentDiffChildBlueWork := currentPruningGhostDAG.BlueWork()
previousPruningCurrentDiffChildBlueWork := previousPruningGhostDAG.BlueWork()
var diffHashesFromPrevious []*externalapi.DomainHash
var diffHashesFromCurrent []*externalapi.DomainHash
for {
// if currentPruningCurrentDiffChildBlueWork > previousPruningCurrentDiffChildBlueWork
if currentPruningCurrentDiffChildBlueWork.Cmp(previousPruningCurrentDiffChildBlueWork) == 1 {
diffHashesFromPrevious = append(diffHashesFromPrevious, previousPruningCurrentDiffChild)
previousPruningCurrentDiffChild, err = pm.utxoDiffStore.UTXODiffChild(pm.databaseContext, stagingArea, previousPruningCurrentDiffChild)
if err != nil {
return nil, err
}
diffChildGhostDag, err := pm.ghostdagDataStore.Get(pm.databaseContext, stagingArea, previousPruningCurrentDiffChild, false)
if err != nil {
return nil, err
}
previousPruningCurrentDiffChildBlueWork = diffChildGhostDag.BlueWork()
} else if currentPruningCurrentDiffChild.Equal(previousPruningCurrentDiffChild) {
break
} else {
diffHashesFromCurrent = append(diffHashesFromCurrent, currentPruningCurrentDiffChild)
currentPruningCurrentDiffChild, err = pm.utxoDiffStore.UTXODiffChild(pm.databaseContext, stagingArea, currentPruningCurrentDiffChild)
if err != nil {
return nil, err
}
diffChildGhostDag, err := pm.ghostdagDataStore.Get(pm.databaseContext, stagingArea, currentPruningCurrentDiffChild, false)
if err != nil {
return nil, err
}
currentPruningCurrentDiffChildBlueWork = diffChildGhostDag.BlueWork()
}
}
// The order in which we apply the diffs should be from top to bottom, but we traversed from bottom to top
// so we apply the diffs in reverse order.
oldDiff := utxo.NewMutableUTXODiff()
for i := len(diffHashesFromPrevious) - 1; i >= 0; i-- {
utxoDiff, err := pm.utxoDiffStore.UTXODiff(pm.databaseContext, stagingArea, diffHashesFromPrevious[i])
if err != nil {
return nil, err
}
err = oldDiff.WithDiffInPlace(utxoDiff)
if err != nil {
return nil, err
}
}
newDiff := utxo.NewMutableUTXODiff()
for i := len(diffHashesFromCurrent) - 1; i >= 0; i-- {
utxoDiff, err := pm.utxoDiffStore.UTXODiff(pm.databaseContext, stagingArea, diffHashesFromCurrent[i])
if err != nil {
return nil, err
}
err = newDiff.WithDiffInPlace(utxoDiff)
if err != nil {
return nil, err
}
}
return oldDiff.DiffFrom(newDiff.ToImmutable())
} }
// finalityScore is the number of finality intervals passed since // finalityScore is the number of finality intervals passed since

View File

@ -297,7 +297,7 @@ var TestnetParams = Params{
Net: appmessage.Testnet, Net: appmessage.Testnet,
RPCPort: "16210", RPCPort: "16210",
DefaultPort: "16211", DefaultPort: "16211",
DNSSeeds: []string{"testnet-9-dnsseed.daglabs-dev.com"}, DNSSeeds: []string{"testnet-10-dnsseed.kas.pa"},
// DAG parameters // DAG parameters
GenesisBlock: &testnetGenesisBlock, GenesisBlock: &testnetGenesisBlock,

View File

@ -22,6 +22,7 @@ type OnDisconnectedHandler func()
// GRPCClient is a gRPC-based RPC client // GRPCClient is a gRPC-based RPC client
type GRPCClient struct { type GRPCClient struct {
stream protowire.RPC_MessageStreamClient stream protowire.RPC_MessageStreamClient
connection *grpc.ClientConn
onErrorHandler OnErrorHandler onErrorHandler OnErrorHandler
onDisconnectedHandler OnDisconnectedHandler onDisconnectedHandler OnDisconnectedHandler
} }
@ -43,7 +44,12 @@ func Connect(address string) (*GRPCClient, error) {
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "error getting client stream for %s", address) return nil, errors.Wrapf(err, "error getting client stream for %s", address)
} }
return &GRPCClient{stream: stream}, nil return &GRPCClient{stream: stream, connection: gRPCConnection}, nil
}
// Close closes the underlying grpc connection
func (c *GRPCClient) Close() error {
return c.connection.Close()
} }
// Disconnect disconnects from the RPC server // Disconnect disconnects from the RPC server

View File

@ -143,6 +143,9 @@ func (c *RPCClient) handleClientDisconnected() {
} }
func (c *RPCClient) handleClientError(err error) { func (c *RPCClient) handleClientError(err error) {
if atomic.LoadUint32(&c.isClosed) == 1 {
return
}
log.Warnf("Received error from client: %s", err) log.Warnf("Received error from client: %s", err)
c.handleClientDisconnected() c.handleClientDisconnected()
} }
@ -159,7 +162,7 @@ func (c *RPCClient) Close() error {
return errors.Errorf("Cannot close a client that had already been closed") return errors.Errorf("Cannot close a client that had already been closed")
} }
c.rpcRouter.router.Close() c.rpcRouter.router.Close()
return nil return c.GRPCClient.Close()
} }
// Address returns the address the RPC client connected to // Address returns the address the RPC client connected to

View File

@ -2,6 +2,7 @@ package integration
import ( import (
"github.com/kaspanet/kaspad/infrastructure/config" "github.com/kaspanet/kaspad/infrastructure/config"
"runtime"
"testing" "testing"
"time" "time"
@ -26,6 +27,37 @@ func newTestRPCClient(rpcAddress string) (*testRPCClient, error) {
}, nil }, nil
} }
func connectAndClose(rpcAddress string) error {
client, err := rpcclient.NewRPCClient(rpcAddress)
if err != nil {
return err
}
defer client.Close()
return nil
}
func TestRPCClientGoroutineLeak(t *testing.T) {
_, teardown := setupHarness(t, &harnessParams{
p2pAddress: p2pAddress1,
rpcAddress: rpcAddress1,
miningAddress: miningAddress1,
miningAddressPrivateKey: miningAddress1PrivateKey,
})
defer teardown()
numGoroutinesBefore := runtime.NumGoroutine()
for i := 1; i < 100; i++ {
err := connectAndClose(rpcAddress1)
if err != nil {
t.Fatalf("Failed to set up an RPC client: %s", err)
}
time.Sleep(10 * time.Millisecond)
if runtime.NumGoroutine() > numGoroutinesBefore+10 {
t.Fatalf("Number of goroutines is increasing for each RPC client open (%d -> %d), which indicates a memory leak",
numGoroutinesBefore, runtime.NumGoroutine())
}
}
}
func TestRPCMaxInboundConnections(t *testing.T) { func TestRPCMaxInboundConnections(t *testing.T) {
harness, teardown := setupHarness(t, &harnessParams{ harness, teardown := setupHarness(t, &harnessParams{
p2pAddress: p2pAddress1, p2pAddress: p2pAddress1,