mirror of
https://github.com/kaspanet/kaspad.git
synced 2025-09-13 13:00:10 +00:00

* [NOD-1162] Separate kaspad to it's own package, so that I can use it out of integration test * [NOD-1162] Begin integration tests * [NOD-1162] [FIX] Assign cfg to RPCServer * [NOD-1162] Basic integration test ready * [NOD-1162] Wait for connection for real * [NOD-1162] [FIX] Connection manager should run the moment it adds a request * [NOD-1162] Make connect something that can be invoked in middle of test * [NOD-1162] Complete first integration test * [NOD-1162] Undo refactor error * [NOD-1162] Rename Kaspad to App * [NOD-1162] Convert checking connection to polling * [NOD-1162] [FIX] Set peerID on handshake * [NOD-1162] [FIX] Broadcast should send to outgoing route, not incoming * [NOD-1162] [FIX] Add CmdInvRelayBlock to MakeEmptyMessage * [NOD-1162] [FIX] Initialize Hash before decoding MsgInvRelayBlock * [NOD-1162] [FIX] Invert condition * [NOD-1162] [FIX] Fixes to encoding of MsgGetRelayBlocks * [NOD-1162] [FIX] Add MsgGetRelayBlocks to MakeEmptyMessage * [NOD-1162] [FIX] Connection manager should run the moment it adds a request * [NOD-1162] [FIX] Set peerID on handshake * [NOD-1162] [FIX] Broadcast should send to outgoing route, not incoming * [NOD-1162] [FIX] Add CmdInvRelayBlock to MakeEmptyMessage * [NOD-1162] [FIX] Initialize Hash before decoding MsgInvRelayBlock * [NOD-1162] [FIX] Invert condition * [NOD-1162] [FIX] Fixes to encoding of MsgGetRelayBlocks * [NOD-1162] [FIX] Add MsgGetRelayBlocks to MakeEmptyMessage * [NOD-1162] Add comment * [NOD-1162] Added support for 3 nodes and clients in integration tests * [NOD-1162] Add third node to integration test * [NOD-1192] Use lock-less functions in TxPool.HandleNewBlock * [NOD-1192] Broadcast transactions only if there's more then 0 * [NOD-1162] Removed double waitTillNextIteration * [NOD-1192] Rename: broadcastTransactions -> broadcastTransactionsAfterBlockAdded * [NOD-1162] Call NotifyBlocks on client3 as well * [NOD-1162] ErrTimeout and ErrRouteClosed should be ProtocolErrors * [NOD-1162] Added comment and removed redundant type PeerAddedCallback * [NOD-1162] Revert overly eager rename * [NOD-1162] Move DisalbeTLS to common config + minimize call for ioutil.TempDir() * [NOD-1162] Add some clarifications in code * [NOD-1193] Skip closed connections in NetAdapter.Broadcast * [NOD-1193] Make sure to protect connectionsToRouters from concurrent access * [NOD-1162] Add _test to all files in integration package * [NOD-1162] Introduced appHarness to better encapsulate a single node * [NOD-1162] Removed onChainChanged handler * [NOD-1162] Remove redundant closure * [NOD-1162] Correctly mark integration_test config as Simnet * [NOD-1162] Rename app.ID -> app.P2PNodeID * [NOD-1162] Move TestIntegrationBasicSync to basic_sync_test.go * [NOD-1210] Made it possible to setup any number of harnesses needed * [NOD-1210] Rename appHarness1/2 to incoming/outgoing in connect function * [NOD-1210] Add the 117-incoming-connections integration test * [NOD-1210] Delete 117-incoming-connections test because it opens too much files * [NOD-1210] Added function to notify of blocks conveniently * [NOD-1210] Added function to mine a block from-A-to-Z * [NOD-1210] Added IBD integration test * [NOD-1210] Finish test for IBD and fix bug where requestSelectedTipsIfRequired ran in handshake's goroutine * [NOD-1210] Set log level to debug * [NOD-1211] Add test for transaction relay * [NOD-1211] Compare fix incorrect comaprison in KaspadMessage_RequestTransactions.fromWireMessage * [NOD-1211] Return ok instead of err from FetchTxDesc and FetchTransaction * [NOD-1211] Added MsgTransactionNotFound type * [NOD-1211] Added HandlRequestedTransactions flow * [NOD-1211] Wait for blocks to be accepted before moving forward * [NOD-1211] Rename CmdNotFound to CmdTransactionNotFound * [NOD-1211] Rename: requestAndSolveTemplate -> mineNextBlock * [NOD-1211] Renamed incoming/outgoing to appHarness1/appHarness2 in isConnected * [NOD-1211] Move check of Hash == nil to outside wireHashToProto * [NOD-1211] Instantiate payloadHash before *x
227 lines
7.0 KiB
Go
227 lines
7.0 KiB
Go
package relaytransactions
|
|
|
|
import (
|
|
"github.com/kaspanet/kaspad/blockdag"
|
|
"github.com/kaspanet/kaspad/mempool"
|
|
"github.com/kaspanet/kaspad/netadapter"
|
|
"github.com/kaspanet/kaspad/netadapter/router"
|
|
"github.com/kaspanet/kaspad/protocol/common"
|
|
"github.com/kaspanet/kaspad/protocol/protocolerrors"
|
|
"github.com/kaspanet/kaspad/util"
|
|
"github.com/kaspanet/kaspad/util/daghash"
|
|
"github.com/kaspanet/kaspad/wire"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
// TransactionsRelayContext is the interface for the context needed for the
|
|
// HandleRelayedTransactions and HandleRequestedTransactions flows.
|
|
type TransactionsRelayContext interface {
|
|
NetAdapter() *netadapter.NetAdapter
|
|
DAG() *blockdag.BlockDAG
|
|
SharedRequestedTransactions() *SharedRequestedTransactions
|
|
TxPool() *mempool.TxPool
|
|
Broadcast(message wire.Message) error
|
|
}
|
|
|
|
type handleRelayedTransactionsFlow struct {
|
|
TransactionsRelayContext
|
|
incomingRoute, outgoingRoute *router.Route
|
|
invsQueue []*wire.MsgInvTransaction
|
|
}
|
|
|
|
// HandleRelayedTransactions listens to wire.MsgInvTransaction messages, requests their corresponding transactions if they
|
|
// are missing, adds them to the mempool and propagates them to the rest of the network.
|
|
func HandleRelayedTransactions(context TransactionsRelayContext, incomingRoute *router.Route, outgoingRoute *router.Route) error {
|
|
flow := &handleRelayedTransactionsFlow{
|
|
TransactionsRelayContext: context,
|
|
incomingRoute: incomingRoute,
|
|
outgoingRoute: outgoingRoute,
|
|
invsQueue: make([]*wire.MsgInvTransaction, 0),
|
|
}
|
|
return flow.start()
|
|
}
|
|
|
|
func (flow *handleRelayedTransactionsFlow) start() error {
|
|
for {
|
|
inv, err := flow.readInv()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
requestedIDs, err := flow.requestInvTransactions(inv)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = flow.receiveTransactions(requestedIDs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
func (flow *handleRelayedTransactionsFlow) requestInvTransactions(
|
|
inv *wire.MsgInvTransaction) (requestedIDs []*daghash.TxID, err error) {
|
|
|
|
idsToRequest := make([]*daghash.TxID, 0, len(inv.TxIDs))
|
|
for _, txID := range inv.TxIDs {
|
|
if flow.isKnownTransaction(txID) {
|
|
continue
|
|
}
|
|
exists := flow.SharedRequestedTransactions().addIfNotExists(txID)
|
|
if exists {
|
|
continue
|
|
}
|
|
idsToRequest = append(idsToRequest, txID)
|
|
}
|
|
|
|
if len(idsToRequest) == 0 {
|
|
return idsToRequest, nil
|
|
}
|
|
|
|
msgGetTransactions := wire.NewMsgRequestTransactions(idsToRequest)
|
|
err = flow.outgoingRoute.Enqueue(msgGetTransactions)
|
|
if err != nil {
|
|
flow.SharedRequestedTransactions().removeMany(idsToRequest)
|
|
return nil, err
|
|
}
|
|
return idsToRequest, nil
|
|
}
|
|
|
|
func (flow *handleRelayedTransactionsFlow) isKnownTransaction(txID *daghash.TxID) bool {
|
|
// Ask the transaction memory pool if the transaction is known
|
|
// to it in any form (main pool or orphan).
|
|
if flow.TxPool().HaveTransaction(txID) {
|
|
return true
|
|
}
|
|
|
|
// Check if the transaction exists from the point of view of the
|
|
// DAG's virtual block. Note that this is only a best effort
|
|
// since it is expensive to check existence of every output and
|
|
// the only purpose of this check is to avoid downloading
|
|
// already known transactions. Only the first two outputs are
|
|
// checked because the vast majority of transactions consist of
|
|
// two outputs where one is some form of "pay-to-somebody-else"
|
|
// and the other is a change output.
|
|
prevOut := wire.Outpoint{TxID: *txID}
|
|
for i := uint32(0); i < 2; i++ {
|
|
prevOut.Index = i
|
|
_, ok := flow.DAG().GetUTXOEntry(prevOut)
|
|
if ok {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (flow *handleRelayedTransactionsFlow) readInv() (*wire.MsgInvTransaction, error) {
|
|
if len(flow.invsQueue) > 0 {
|
|
var inv *wire.MsgInvTransaction
|
|
inv, flow.invsQueue = flow.invsQueue[0], flow.invsQueue[1:]
|
|
return inv, nil
|
|
}
|
|
|
|
msg, err := flow.incomingRoute.Dequeue()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
inv, ok := msg.(*wire.MsgInvTransaction)
|
|
if !ok {
|
|
return nil, protocolerrors.Errorf(true, "unexpected %s message in the block relay flow while "+
|
|
"expecting an inv message", msg.Command())
|
|
}
|
|
return inv, nil
|
|
}
|
|
|
|
func (flow *handleRelayedTransactionsFlow) broadcastAcceptedTransactions(acceptedTxs []*mempool.TxDesc) error {
|
|
// TODO(libp2p) Add mechanism to avoid sending to other peers invs that are known to them (e.g. mruinvmap)
|
|
// TODO(libp2p) Consider broadcasting in bulks
|
|
idsToBroadcast := make([]*daghash.TxID, len(acceptedTxs))
|
|
for i, tx := range acceptedTxs {
|
|
idsToBroadcast[i] = tx.Tx.ID()
|
|
}
|
|
inv := wire.NewMsgInvTransaction(idsToBroadcast)
|
|
return flow.Broadcast(inv)
|
|
}
|
|
|
|
// readMsgTxOrNotFound returns the next msgTx or msgTransactionNotFound in incomingRoute,
|
|
// returning only one of the message types at a time.
|
|
//
|
|
// and populates invsQueue with any inv messages that meanwhile arrive.
|
|
func (flow *handleRelayedTransactionsFlow) readMsgTxOrNotFound() (
|
|
msgTx *wire.MsgTx, msgNotFound *wire.MsgTransactionNotFound, err error) {
|
|
|
|
for {
|
|
message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
switch message := message.(type) {
|
|
case *wire.MsgInvTransaction:
|
|
flow.invsQueue = append(flow.invsQueue, message)
|
|
case *wire.MsgTx:
|
|
return message, nil, nil
|
|
case *wire.MsgTransactionNotFound:
|
|
return nil, message, nil
|
|
default:
|
|
return nil, nil, errors.Errorf("unexpected message %s", message.Command())
|
|
}
|
|
}
|
|
}
|
|
|
|
func (flow *handleRelayedTransactionsFlow) receiveTransactions(requestedTransactions []*daghash.TxID) error {
|
|
// In case the function returns earlier than expected, we want to make sure sharedRequestedTransactions is
|
|
// clean from any pending transactions.
|
|
defer flow.SharedRequestedTransactions().removeMany(requestedTransactions)
|
|
for _, expectedID := range requestedTransactions {
|
|
msgTx, msgTxNotFound, err := flow.readMsgTxOrNotFound()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if msgTxNotFound != nil {
|
|
if !msgTxNotFound.ID.IsEqual(expectedID) {
|
|
return protocolerrors.Errorf(true, "expected transaction %s, but got %s",
|
|
expectedID, msgTxNotFound.ID)
|
|
}
|
|
|
|
continue
|
|
}
|
|
tx := util.NewTx(msgTx)
|
|
if !tx.ID().IsEqual(expectedID) {
|
|
return protocolerrors.Errorf(true, "expected transaction %s, but got %s",
|
|
expectedID, tx.ID())
|
|
}
|
|
|
|
acceptedTxs, err := flow.TxPool().ProcessTransaction(tx, true, 0) // TODO(libp2p) Use the peer ID for the mempool tag
|
|
if err != nil {
|
|
ruleErr := &mempool.RuleError{}
|
|
if !errors.As(err, ruleErr) {
|
|
return errors.Wrapf(err, "failed to process transaction %s", tx.ID())
|
|
}
|
|
|
|
shouldBan := false
|
|
if txRuleErr := (&mempool.TxRuleError{}); errors.As(ruleErr.Err, txRuleErr) {
|
|
if txRuleErr.RejectCode == mempool.RejectInvalid {
|
|
shouldBan = true
|
|
}
|
|
} else if dagRuleErr := (&blockdag.RuleError{}); errors.As(ruleErr.Err, dagRuleErr) {
|
|
shouldBan = true
|
|
}
|
|
|
|
if !shouldBan {
|
|
continue
|
|
}
|
|
|
|
return protocolerrors.Errorf(true, "rejected transaction %s", tx.ID())
|
|
}
|
|
err = flow.broadcastAcceptedTransactions(acceptedTxs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// TODO(libp2p) Notify transactionsAcceptedToMempool to RPC
|
|
}
|
|
return nil
|
|
}
|