From 715cb3b1ac765937faed119386350f697261a60f Mon Sep 17 00:00:00 2001 From: Michael Sutton Date: Tue, 2 Aug 2022 10:33:39 +0300 Subject: [PATCH 1/5] Fix a subtle lock sync issue in consensus insert block (#2121) * Manage the locks more tightly and fix a slight and rare sync issue * Extract virtualResolveChunk constant --- domain/consensus/consensus.go | 41 +++++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/domain/consensus/consensus.go b/domain/consensus/consensus.go index 8177389b7..20d02be9c 100644 --- a/domain/consensus/consensus.go +++ b/domain/consensus/consensus.go @@ -64,6 +64,12 @@ type consensus struct { virtualNotUpdated bool } +// In order to prevent a situation that the consensus lock is held for too much time, we +// release the lock each time we resolve 100 blocks. +// Note: `virtualResolveChunk` should be smaller than `params.FinalityDuration` in order to avoid a situation +// where UpdatePruningPointByVirtual skips a pruning point. +const virtualResolveChunk = 100 + func (s *consensus) ValidateAndInsertBlockWithTrustedData(block *externalapi.BlockWithTrustedData, validateUTXO bool) error { s.lock.Lock() defer s.lock.Unlock() @@ -198,15 +204,32 @@ func (s *consensus) ValidateAndInsertBlock(block *externalapi.DomainBlock, updat if updateVirtual { s.lock.Lock() if s.virtualNotUpdated { - s.lock.Unlock() - err := s.ResolveVirtual(nil) - if err != nil { - return err + // We enter the loop in locked state + for { + _, isCompletelyResolved, err := s.resolveVirtualChunkNoLock(virtualResolveChunk) + if err != nil { + s.lock.Unlock() + return err + } + if isCompletelyResolved { + // Make sure we enter the block insertion function w/o releasing the lock. + // Otherwise, we might actually enter it in `s.virtualNotUpdated == true` state + _, err = s.validateAndInsertBlockNoLock(block, updateVirtual) + // Finally, unlock for the last iteration and return + s.lock.Unlock() + if err != nil { + return err + } + return nil + } + // Unlock to allow other threads to enter consensus + s.lock.Unlock() + // Lock for the next iteration + s.lock.Lock() } - return s.validateAndInsertBlockWithLock(block, updateVirtual) } - defer s.lock.Unlock() _, err := s.validateAndInsertBlockNoLock(block, updateVirtual) + s.lock.Unlock() if err != nil { return err } @@ -912,11 +935,7 @@ func (s *consensus) ResolveVirtual(progressReportCallback func(uint64, uint64)) progressReportCallback(virtualDAAScoreStart, virtualDAAScore) } - // In order to prevent a situation that the consensus lock is held for too much time, we - // release the lock each time we resolve 100 blocks. - // Note: maxBlocksToResolve should be smaller than `params.FinalityDuration` in order to avoid a situation - // where UpdatePruningPointByVirtual skips a pruning point. 
- _, isCompletelyResolved, err := s.resolveVirtualChunkWithLock(100) + _, isCompletelyResolved, err := s.resolveVirtualChunkWithLock(virtualResolveChunk) if err != nil { return err } From 9ee409afaa6955b67e8c464af94c46aea9b77179 Mon Sep 17 00:00:00 2001 From: Michael Sutton Date: Tue, 9 Aug 2022 16:30:24 +0300 Subject: [PATCH 2/5] Fix RPC client memory/goroutine leak (#2122) * Showcase the RPC client memory leak * Fixes an RPC client goroutine leak by properly closing the underlying connection --- .../rpcclient/grpcclient/grpcclient.go | 8 ++++- infrastructure/network/rpcclient/rpcclient.go | 5 ++- testing/integration/rpc_test.go | 32 +++++++++++++++++++ 3 files changed, 43 insertions(+), 2 deletions(-) diff --git a/infrastructure/network/rpcclient/grpcclient/grpcclient.go b/infrastructure/network/rpcclient/grpcclient/grpcclient.go index e15731aa3..4042c6267 100644 --- a/infrastructure/network/rpcclient/grpcclient/grpcclient.go +++ b/infrastructure/network/rpcclient/grpcclient/grpcclient.go @@ -22,6 +22,7 @@ type OnDisconnectedHandler func() // GRPCClient is a gRPC-based RPC client type GRPCClient struct { stream protowire.RPC_MessageStreamClient + connection *grpc.ClientConn onErrorHandler OnErrorHandler onDisconnectedHandler OnDisconnectedHandler } @@ -43,7 +44,12 @@ func Connect(address string) (*GRPCClient, error) { if err != nil { return nil, errors.Wrapf(err, "error getting client stream for %s", address) } - return &GRPCClient{stream: stream}, nil + return &GRPCClient{stream: stream, connection: gRPCConnection}, nil +} + +// Close closes the underlying grpc connection +func (c *GRPCClient) Close() error { + return c.connection.Close() } // Disconnect disconnects from the RPC server diff --git a/infrastructure/network/rpcclient/rpcclient.go b/infrastructure/network/rpcclient/rpcclient.go index 7256f6c82..e4671c028 100644 --- a/infrastructure/network/rpcclient/rpcclient.go +++ b/infrastructure/network/rpcclient/rpcclient.go @@ -143,6 +143,9 @@ func (c *RPCClient) handleClientDisconnected() { } func (c *RPCClient) handleClientError(err error) { + if atomic.LoadUint32(&c.isClosed) == 1 { + return + } log.Warnf("Received error from client: %s", err) c.handleClientDisconnected() } @@ -159,7 +162,7 @@ func (c *RPCClient) Close() error { return errors.Errorf("Cannot close a client that had already been closed") } c.rpcRouter.router.Close() - return nil + return c.GRPCClient.Close() } // Address returns the address the RPC client connected to diff --git a/testing/integration/rpc_test.go b/testing/integration/rpc_test.go index 44fb1e40d..76caf1ff5 100644 --- a/testing/integration/rpc_test.go +++ b/testing/integration/rpc_test.go @@ -2,6 +2,7 @@ package integration import ( "github.com/kaspanet/kaspad/infrastructure/config" + "runtime" "testing" "time" @@ -26,6 +27,37 @@ func newTestRPCClient(rpcAddress string) (*testRPCClient, error) { }, nil } +func connectAndClose(rpcAddress string) error { + client, err := rpcclient.NewRPCClient(rpcAddress) + if err != nil { + return err + } + defer client.Close() + return nil +} + +func TestRPCClientGoroutineLeak(t *testing.T) { + _, teardown := setupHarness(t, &harnessParams{ + p2pAddress: p2pAddress1, + rpcAddress: rpcAddress1, + miningAddress: miningAddress1, + miningAddressPrivateKey: miningAddress1PrivateKey, + }) + defer teardown() + numGoroutinesBefore := runtime.NumGoroutine() + for i := 1; i < 100; i++ { + err := connectAndClose(rpcAddress1) + if err != nil { + t.Fatalf("Failed to set up an RPC client: %s", err) + } + time.Sleep(10 * 
time.Millisecond) + if runtime.NumGoroutine() > numGoroutinesBefore+10 { + t.Fatalf("Number of goroutines is increasing for each RPC client open (%d -> %d), which indicates a memory leak", + numGoroutinesBefore, runtime.NumGoroutine()) + } + } +} + func TestRPCMaxInboundConnections(t *testing.T) { harness, teardown := setupHarness(t, &harnessParams{ p2pAddress: p2pAddress1, From 266ec6c270342869f51adaaa117280ddbe6e0265 Mon Sep 17 00:00:00 2001 From: Michael Sutton Date: Wed, 17 Aug 2022 15:56:53 +0300 Subject: [PATCH 3/5] Calculate pruning point utxo set from acceptance data (#2123) * Calc new pruning point utxo diff through chain acceptance data * Fix daa score to chain block --- .../pruningmanager/pruningmanager.go | 97 ++++++------------- 1 file changed, 30 insertions(+), 67 deletions(-) diff --git a/domain/consensus/processes/pruningmanager/pruningmanager.go b/domain/consensus/processes/pruningmanager/pruningmanager.go index dd5798afd..e0b7ca4c3 100644 --- a/domain/consensus/processes/pruningmanager/pruningmanager.go +++ b/domain/consensus/processes/pruningmanager/pruningmanager.go @@ -772,78 +772,41 @@ func (pm *pruningManager) calculateDiffBetweenPreviousAndCurrentPruningPoints(st if err != nil { return nil, err } - currentPruningGhostDAG, err := pm.ghostdagDataStore.Get(pm.databaseContext, stagingArea, currentPruningHash, false) + + utxoDiff := utxo.NewMutableUTXODiff() + + iterator, err := pm.dagTraversalManager.SelectedChildIterator(stagingArea, currentPruningHash, previousPruningHash, false) if err != nil { return nil, err } - previousPruningGhostDAG, err := pm.ghostdagDataStore.Get(pm.databaseContext, stagingArea, previousPruningHash, false) - if err != nil { - return nil, err + defer iterator.Close() + + for ok := iterator.First(); ok; ok = iterator.Next() { + child, err := iterator.Get() + if err != nil { + return nil, err + } + chainBlockAcceptanceData, err := pm.acceptanceDataStore.Get(pm.databaseContext, stagingArea, child) + if err != nil { + return nil, err + } + chainBlockHeader, err := pm.blockHeaderStore.BlockHeader(pm.databaseContext, stagingArea, child) + if err != nil { + return nil, err + } + for _, blockAcceptanceData := range chainBlockAcceptanceData { + for _, transactionAcceptanceData := range blockAcceptanceData.TransactionAcceptanceData { + if transactionAcceptanceData.IsAccepted { + err = utxoDiff.AddTransaction(transactionAcceptanceData.Transaction, chainBlockHeader.DAAScore()) + if err != nil { + return nil, err + } + } + } + } } - currentPruningCurrentDiffChild := currentPruningHash - previousPruningCurrentDiffChild := previousPruningHash - // We need to use BlueWork because it's the only thing that's monotonic in the whole DAG - // We use the BlueWork to know which point is currently lower on the DAG so we can keep climbing its children, - // that way we keep climbing on the lowest point until they both reach the exact same descendant - currentPruningCurrentDiffChildBlueWork := currentPruningGhostDAG.BlueWork() - previousPruningCurrentDiffChildBlueWork := previousPruningGhostDAG.BlueWork() - - var diffHashesFromPrevious []*externalapi.DomainHash - var diffHashesFromCurrent []*externalapi.DomainHash - for { - // if currentPruningCurrentDiffChildBlueWork > previousPruningCurrentDiffChildBlueWork - if currentPruningCurrentDiffChildBlueWork.Cmp(previousPruningCurrentDiffChildBlueWork) == 1 { - diffHashesFromPrevious = append(diffHashesFromPrevious, previousPruningCurrentDiffChild) - previousPruningCurrentDiffChild, err = 
pm.utxoDiffStore.UTXODiffChild(pm.databaseContext, stagingArea, previousPruningCurrentDiffChild) - if err != nil { - return nil, err - } - diffChildGhostDag, err := pm.ghostdagDataStore.Get(pm.databaseContext, stagingArea, previousPruningCurrentDiffChild, false) - if err != nil { - return nil, err - } - previousPruningCurrentDiffChildBlueWork = diffChildGhostDag.BlueWork() - } else if currentPruningCurrentDiffChild.Equal(previousPruningCurrentDiffChild) { - break - } else { - diffHashesFromCurrent = append(diffHashesFromCurrent, currentPruningCurrentDiffChild) - currentPruningCurrentDiffChild, err = pm.utxoDiffStore.UTXODiffChild(pm.databaseContext, stagingArea, currentPruningCurrentDiffChild) - if err != nil { - return nil, err - } - diffChildGhostDag, err := pm.ghostdagDataStore.Get(pm.databaseContext, stagingArea, currentPruningCurrentDiffChild, false) - if err != nil { - return nil, err - } - currentPruningCurrentDiffChildBlueWork = diffChildGhostDag.BlueWork() - } - } - // The order in which we apply the diffs should be from top to bottom, but we traversed from bottom to top - // so we apply the diffs in reverse order. - oldDiff := utxo.NewMutableUTXODiff() - for i := len(diffHashesFromPrevious) - 1; i >= 0; i-- { - utxoDiff, err := pm.utxoDiffStore.UTXODiff(pm.databaseContext, stagingArea, diffHashesFromPrevious[i]) - if err != nil { - return nil, err - } - err = oldDiff.WithDiffInPlace(utxoDiff) - if err != nil { - return nil, err - } - } - newDiff := utxo.NewMutableUTXODiff() - for i := len(diffHashesFromCurrent) - 1; i >= 0; i-- { - utxoDiff, err := pm.utxoDiffStore.UTXODiff(pm.databaseContext, stagingArea, diffHashesFromCurrent[i]) - if err != nil { - return nil, err - } - err = newDiff.WithDiffInPlace(utxoDiff) - if err != nil { - return nil, err - } - } - return oldDiff.DiffFrom(newDiff.ToImmutable()) + return utxoDiff.ToImmutable(), err } // finalityScore is the number of finality intervals passed since From 3f80638c86d334379b43ffc69bbfad6ba6d0fe09 Mon Sep 17 00:00:00 2001 From: Michael Sutton Date: Sun, 21 Aug 2022 01:26:03 +0300 Subject: [PATCH 4/5] Add missing locks to notification listener modifications (#2124) --- app/rpc/manager.go | 13 +------ app/rpc/rpccontext/notificationmanager.go | 37 +++++++++++++++---- app/rpc/rpchandlers/notify_utxos_changed.go | 2 +- .../stop_notifying_utxos_changed.go | 2 +- 4 files changed, 33 insertions(+), 21 deletions(-) diff --git a/app/rpc/manager.go b/app/rpc/manager.go index 137fd6555..c9479666b 100644 --- a/app/rpc/manager.go +++ b/app/rpc/manager.go @@ -223,18 +223,9 @@ func (m *Manager) notifyVirtualSelectedParentChainChanged(virtualChangeSet *exte onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyVirtualSelectedParentChainChanged") defer onEnd() - listenersThatPropagateSelectedParentChanged := - m.context.NotificationManager.AllListenersThatPropagateVirtualSelectedParentChainChanged() - if len(listenersThatPropagateSelectedParentChanged) > 0 { - // Generating acceptedTransactionIDs is a heavy operation, so we check if it's needed by any listener. 
- includeAcceptedTransactionIDs := false - for _, listener := range listenersThatPropagateSelectedParentChanged { - if listener.IncludeAcceptedTransactionIDsInVirtualSelectedParentChainChangedNotifications() { - includeAcceptedTransactionIDs = true - break - } - } + hasListeners, includeAcceptedTransactionIDs := m.context.NotificationManager.HasListenersThatPropagateVirtualSelectedParentChainChanged() + if hasListeners { notification, err := m.context.ConvertVirtualSelectedParentChainChangesToChainChangedNotificationMessage( virtualChangeSet.VirtualSelectedParentChainChanges, includeAcceptedTransactionIDs) if err != nil { diff --git a/app/rpc/rpccontext/notificationmanager.go b/app/rpc/rpccontext/notificationmanager.go index 0ba132f17..bcd2db4d7 100644 --- a/app/rpc/rpccontext/notificationmanager.go +++ b/app/rpc/rpccontext/notificationmanager.go @@ -142,16 +142,29 @@ func (nm *NotificationManager) NotifyVirtualSelectedParentChainChanged( return nil } -// AllListenersThatPropagateVirtualSelectedParentChainChanged returns true if there's any listener that is -// subscribed to VirtualSelectedParentChainChanged notifications. -func (nm *NotificationManager) AllListenersThatPropagateVirtualSelectedParentChainChanged() []*NotificationListener { - var listenersThatPropagate []*NotificationListener +// HasListenersThatPropagateVirtualSelectedParentChainChanged returns whether there's any listener that is +// subscribed to VirtualSelectedParentChainChanged notifications as well as checks if any such listener requested +// to include AcceptedTransactionIDs. +func (nm *NotificationManager) HasListenersThatPropagateVirtualSelectedParentChainChanged() (hasListeners, hasListenersThatRequireAcceptedTransactionIDs bool) { + + nm.RLock() + defer nm.RUnlock() + + hasListeners = false + hasListenersThatRequireAcceptedTransactionIDs = false + for _, listener := range nm.listeners { if listener.propagateVirtualSelectedParentChainChangedNotifications { - listenersThatPropagate = append(listenersThatPropagate, listener) + hasListeners = true + // Generating acceptedTransactionIDs is a heavy operation, so we check if it's needed by any listener. + if listener.includeAcceptedTransactionIDsInVirtualSelectedParentChainChangedNotifications { + hasListenersThatRequireAcceptedTransactionIDs = true + break + } } } - return listenersThatPropagate + + return hasListeners, hasListenersThatRequireAcceptedTransactionIDs } // NotifyFinalityConflict notifies the notification manager that there's a finality conflict in the DAG @@ -338,7 +351,11 @@ func (nl *NotificationListener) PropagateFinalityConflictResolvedNotifications() // to the remote listener for the given addresses. Subsequent calls instruct the listener to // send UTXOs changed notifications for those addresses along with the old ones. Duplicate addresses // are ignored. 
-func (nl *NotificationListener) PropagateUTXOsChangedNotifications(addresses []*UTXOsChangedNotificationAddress) { +func (nm *NotificationManager) PropagateUTXOsChangedNotifications(nl *NotificationListener, addresses []*UTXOsChangedNotificationAddress) { + // Apply a write-lock since the internal listener address map is modified + nm.Lock() + defer nm.Unlock() + if !nl.propagateUTXOsChangedNotifications { nl.propagateUTXOsChangedNotifications = true nl.propagateUTXOsChangedNotificationAddresses = @@ -353,7 +370,11 @@ func (nl *NotificationListener) PropagateUTXOsChangedNotifications(addresses []* // StopPropagatingUTXOsChangedNotifications instructs the listener to stop sending UTXOs // changed notifications to the remote listener for the given addresses. Addresses for which // notifications are not currently sent are ignored. -func (nl *NotificationListener) StopPropagatingUTXOsChangedNotifications(addresses []*UTXOsChangedNotificationAddress) { +func (nm *NotificationManager) StopPropagatingUTXOsChangedNotifications(nl *NotificationListener, addresses []*UTXOsChangedNotificationAddress) { + // Apply a write-lock since the internal listener address map is modified + nm.Lock() + defer nm.Unlock() + if !nl.propagateUTXOsChangedNotifications { return } diff --git a/app/rpc/rpchandlers/notify_utxos_changed.go b/app/rpc/rpchandlers/notify_utxos_changed.go index 41ffe0dd3..e43f10204 100644 --- a/app/rpc/rpchandlers/notify_utxos_changed.go +++ b/app/rpc/rpchandlers/notify_utxos_changed.go @@ -26,7 +26,7 @@ func HandleNotifyUTXOsChanged(context *rpccontext.Context, router *router.Router if err != nil { return nil, err } - listener.PropagateUTXOsChangedNotifications(addresses) + context.NotificationManager.PropagateUTXOsChangedNotifications(listener, addresses) response := appmessage.NewNotifyUTXOsChangedResponseMessage() return response, nil diff --git a/app/rpc/rpchandlers/stop_notifying_utxos_changed.go b/app/rpc/rpchandlers/stop_notifying_utxos_changed.go index 0da89a9bf..2ef376397 100644 --- a/app/rpc/rpchandlers/stop_notifying_utxos_changed.go +++ b/app/rpc/rpchandlers/stop_notifying_utxos_changed.go @@ -26,7 +26,7 @@ func HandleStopNotifyingUTXOsChanged(context *rpccontext.Context, router *router if err != nil { return nil, err } - listener.StopPropagatingUTXOsChangedNotifications(addresses) + context.NotificationManager.StopPropagatingUTXOsChangedNotifications(listener, addresses) response := appmessage.NewStopNotifyingUTXOsChangedResponseMessage() return response, nil From d941c73701fc3eacd77dd9344cf0e471d74d1f3b Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Sun, 21 Aug 2022 02:32:40 +0300 Subject: [PATCH 5/5] Change testnet dnsseeder (#2126) Co-authored-by: Michael Sutton --- domain/dagconfig/params.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/domain/dagconfig/params.go b/domain/dagconfig/params.go index 52b3cfb6e..7ef89bd31 100644 --- a/domain/dagconfig/params.go +++ b/domain/dagconfig/params.go @@ -296,7 +296,7 @@ var TestnetParams = Params{ Net: appmessage.Testnet, RPCPort: "16210", DefaultPort: "16211", - DNSSeeds: []string{"testnet-9-dnsseed.daglabs-dev.com"}, + DNSSeeds: []string{"testnet-10-dnsseed.kas.pa"}, // DAG parameters GenesisBlock: &testnetGenesisBlock,
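
Illustrative note (not part of the patches above): the locking pattern introduced in PATCH 1/5 — resolve a bounded chunk of blocks, release the consensus lock so other goroutines can enter, re-acquire it, and keep the final chunk plus any follow-up work under a single lock acquisition — can be sketched in isolation as below. The `resolver` type, its `resolveChunk` method, and the `chunkSize` constant are hypothetical stand-ins for `consensus`, `resolveVirtualChunkNoLock`, and `virtualResolveChunk`; the sketch only demonstrates the lock-cycling structure, under the assumption that the real chunk size stays below `params.FinalityDuration` as the patch comment requires.

package main

import (
	"fmt"
	"sync"
)

// chunkSize mirrors the role of virtualResolveChunk: keep each locked
// stretch of work bounded so other goroutines can take the lock in between.
const chunkSize = 100

// resolver is a hypothetical stand-in for the consensus object.
type resolver struct {
	lock    sync.Mutex
	pending int // blocks still waiting to be resolved
}

// resolveChunk resolves up to max blocks and reports whether everything is
// now resolved. The caller must hold r.lock (mirrors resolveVirtualChunkNoLock).
func (r *resolver) resolveChunk(max int) (done bool) {
	if r.pending < max {
		r.pending = 0
	} else {
		r.pending -= max
	}
	return r.pending == 0
}

// resolveAll cycles the lock between chunks so it is never held for the whole
// resolution, yet the last chunk and any follow-up work happen under the same
// lock acquisition — avoiding the race where another goroutine observes a
// half-updated state between "fully resolved" and the follow-up step.
func (r *resolver) resolveAll() {
	r.lock.Lock()
	for {
		done := r.resolveChunk(chunkSize)
		if done {
			// Work that must see the fully resolved state goes here,
			// before the lock is released.
			r.lock.Unlock()
			return
		}
		// Let other goroutines in, then continue with the next chunk.
		r.lock.Unlock()
		r.lock.Lock()
	}
}

func main() {
	r := &resolver{pending: 1234}
	r.resolveAll()
	fmt.Println("pending after resolveAll:", r.pending)
}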