mirror of
https://github.com/kaspanet/kaspad.git
synced 2025-06-13 01:26:43 +00:00
Batch transaction inv messages (#1788)
* Implement EnqueueTransactionIDsForPropagation. * Fix tests. * Fix TxRelayTest. * Add a log for transaction propagation. Co-authored-by: Ori Newman <orinewman1@gmail.com>
This commit is contained in:
parent
487fab0e2b
commit
a4d241c30a
@ -95,15 +95,7 @@ func (f *FlowContext) broadcastTransactionsAfterBlockAdded(
|
||||
for i, txID := range txIDsToRebroadcast {
|
||||
txIDsToBroadcast[offset+i] = txID
|
||||
}
|
||||
|
||||
if len(txIDsToBroadcast) == 0 {
|
||||
return nil
|
||||
}
|
||||
if len(txIDsToBroadcast) > appmessage.MaxInvPerTxInvMsg {
|
||||
txIDsToBroadcast = txIDsToBroadcast[:appmessage.MaxInvPerTxInvMsg]
|
||||
}
|
||||
inv := appmessage.NewMsgInvTransaction(txIDsToBroadcast)
|
||||
return f.Broadcast(inv)
|
||||
return f.EnqueueTransactionIDsForPropagation(txIDsToBroadcast)
|
||||
}
|
||||
|
||||
// SharedRequestedBlocks returns a *blockrelay.SharedRequestedBlocks for sharing
|
||||
|
@ -61,6 +61,10 @@ type FlowContext struct {
|
||||
orphans map[externalapi.DomainHash]*externalapi.DomainBlock
|
||||
orphansMutex sync.RWMutex
|
||||
|
||||
transactionIDsToPropagate []*externalapi.DomainTransactionID
|
||||
lastTransactionIDPropagationTime time.Time
|
||||
transactionIDPropagationLock sync.Mutex
|
||||
|
||||
shutdownChan chan struct{}
|
||||
}
|
||||
|
||||
@ -69,17 +73,19 @@ func New(cfg *config.Config, domain domain.Domain, addressManager *addressmanage
|
||||
netAdapter *netadapter.NetAdapter, connectionManager *connmanager.ConnectionManager) *FlowContext {
|
||||
|
||||
return &FlowContext{
|
||||
cfg: cfg,
|
||||
netAdapter: netAdapter,
|
||||
domain: domain,
|
||||
addressManager: addressManager,
|
||||
connectionManager: connectionManager,
|
||||
sharedRequestedTransactions: transactionrelay.NewSharedRequestedTransactions(),
|
||||
sharedRequestedBlocks: blockrelay.NewSharedRequestedBlocks(),
|
||||
peers: make(map[id.ID]*peerpkg.Peer),
|
||||
orphans: make(map[externalapi.DomainHash]*externalapi.DomainBlock),
|
||||
timeStarted: mstime.Now().UnixMilliseconds(),
|
||||
shutdownChan: make(chan struct{}),
|
||||
cfg: cfg,
|
||||
netAdapter: netAdapter,
|
||||
domain: domain,
|
||||
addressManager: addressManager,
|
||||
connectionManager: connectionManager,
|
||||
sharedRequestedTransactions: transactionrelay.NewSharedRequestedTransactions(),
|
||||
sharedRequestedBlocks: blockrelay.NewSharedRequestedBlocks(),
|
||||
peers: make(map[id.ID]*peerpkg.Peer),
|
||||
orphans: make(map[externalapi.DomainHash]*externalapi.DomainBlock),
|
||||
timeStarted: mstime.Now().UnixMilliseconds(),
|
||||
transactionIDsToPropagate: []*externalapi.DomainTransactionID{},
|
||||
lastTransactionIDPropagationTime: time.Now(),
|
||||
shutdownChan: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -9,6 +9,9 @@ import (
|
||||
"github.com/kaspanet/kaspad/domain/consensus/utils/consensushashing"
|
||||
)
|
||||
|
||||
// TransactionIDPropagationInterval is the interval between transaction IDs propagations
|
||||
const TransactionIDPropagationInterval = 10 * time.Second
|
||||
|
||||
// AddTransaction adds transaction to the mempool and propagates it.
|
||||
func (f *FlowContext) AddTransaction(tx *externalapi.DomainTransaction, allowOrphan bool) error {
|
||||
acceptedTransactions, err := f.Domain().MiningManager().ValidateAndInsertTransaction(tx, true, allowOrphan)
|
||||
@ -17,9 +20,7 @@ func (f *FlowContext) AddTransaction(tx *externalapi.DomainTransaction, allowOrp
|
||||
}
|
||||
|
||||
acceptedTransactionIDs := consensushashing.TransactionIDs(acceptedTransactions)
|
||||
inv := appmessage.NewMsgInvTransaction(acceptedTransactionIDs)
|
||||
|
||||
return f.Broadcast(inv)
|
||||
return f.EnqueueTransactionIDsForPropagation(acceptedTransactionIDs)
|
||||
}
|
||||
|
||||
func (f *FlowContext) shouldRebroadcastTransactions() bool {
|
||||
@ -40,3 +41,39 @@ func (f *FlowContext) OnTransactionAddedToMempool() {
|
||||
f.onTransactionAddedToMempoolHandler()
|
||||
}
|
||||
}
|
||||
|
||||
// EnqueueTransactionIDsForPropagation add the given transactions IDs to a set of IDs to
|
||||
// propagate. The IDs will be broadcast to all peers within a single transaction Inv message.
|
||||
// The broadcast itself may happen only during a subsequent call to this method
|
||||
func (f *FlowContext) EnqueueTransactionIDsForPropagation(transactionIDs []*externalapi.DomainTransactionID) error {
|
||||
if len(transactionIDs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
f.transactionIDPropagationLock.Lock()
|
||||
defer f.transactionIDPropagationLock.Unlock()
|
||||
|
||||
f.transactionIDsToPropagate = append(f.transactionIDsToPropagate, transactionIDs...)
|
||||
|
||||
if time.Since(f.lastTransactionIDPropagationTime) < TransactionIDPropagationInterval &&
|
||||
len(f.transactionIDsToPropagate) < appmessage.MaxInvPerTxInvMsg {
|
||||
return nil
|
||||
}
|
||||
f.lastTransactionIDPropagationTime = time.Now()
|
||||
|
||||
transactionIDsToBroadcast := f.transactionIDsToPropagate
|
||||
if len(transactionIDsToBroadcast) > appmessage.MaxInvPerTxInvMsg {
|
||||
transactionIDsToBroadcast = transactionIDsToBroadcast[:appmessage.MaxInvPerTxInvMsg]
|
||||
}
|
||||
|
||||
log.Infof("Transaction propagation: broadcasting %d transactions", len(transactionIDsToBroadcast))
|
||||
|
||||
inv := appmessage.NewMsgInvTransaction(transactionIDsToBroadcast)
|
||||
err := f.Broadcast(inv)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
f.transactionIDsToPropagate = f.transactionIDsToPropagate[len(transactionIDsToBroadcast):]
|
||||
return nil
|
||||
}
|
||||
|
@ -19,8 +19,8 @@ type TransactionsRelayContext interface {
|
||||
NetAdapter() *netadapter.NetAdapter
|
||||
Domain() domain.Domain
|
||||
SharedRequestedTransactions() *SharedRequestedTransactions
|
||||
Broadcast(message appmessage.Message) error
|
||||
OnTransactionAddedToMempool()
|
||||
EnqueueTransactionIDsForPropagation(transactionIDs []*externalapi.DomainTransactionID) error
|
||||
}
|
||||
|
||||
type handleRelayedTransactionsFlow struct {
|
||||
@ -119,8 +119,7 @@ func (flow *handleRelayedTransactionsFlow) readInv() (*appmessage.MsgInvTransact
|
||||
}
|
||||
|
||||
func (flow *handleRelayedTransactionsFlow) broadcastAcceptedTransactions(acceptedTxIDs []*externalapi.DomainTransactionID) error {
|
||||
inv := appmessage.NewMsgInvTransaction(acceptedTxIDs)
|
||||
return flow.Broadcast(inv)
|
||||
return flow.EnqueueTransactionIDsForPropagation(acceptedTxIDs)
|
||||
}
|
||||
|
||||
// readMsgTxOrNotFound returns the next msgTx or msgTransactionNotFound in incomingRoute,
|
||||
|
@ -39,7 +39,7 @@ func (m *mocTransactionsRelayContext) SharedRequestedTransactions() *transaction
|
||||
return m.sharedRequestedTransactions
|
||||
}
|
||||
|
||||
func (m *mocTransactionsRelayContext) Broadcast(appmessage.Message) error {
|
||||
func (m *mocTransactionsRelayContext) EnqueueTransactionIDsForPropagation(transactionIDs []*externalapi.DomainTransactionID) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -2,6 +2,7 @@ package integration
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"github.com/kaspanet/kaspad/app/protocol/flowcontext"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
@ -44,6 +45,10 @@ func TestTxRelay(t *testing.T) {
|
||||
waitForPayeeToReceiveBlock(t, payeeBlockAddedChan)
|
||||
}
|
||||
|
||||
// Sleep for `TransactionIDPropagationInterval` to make sure that our transaction will
|
||||
// be propagated
|
||||
time.Sleep(flowcontext.TransactionIDPropagationInterval)
|
||||
|
||||
msgTx := generateTx(t, secondBlock.Transactions[transactionhelper.CoinbaseTransactionIndex], payer, payee)
|
||||
domainTransaction := appmessage.MsgTxToDomainTransaction(msgTx)
|
||||
rpcTransaction := appmessage.DomainTransactionToRPCTransaction(domainTransaction)
|
||||
|
Loading…
x
Reference in New Issue
Block a user