From 8e1958c20b54defda73dffb7d776915ba7f832f6 Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Tue, 21 Jul 2020 18:02:33 +0300 Subject: [PATCH] [NOD-1168] Add context interfaces for flows (#808) * [NOD-1168] Add context interfaces to flows * [NOD-1168] Move IBD state to protocol manager * [NOD-1168] Move ready peers to protocol manager * [NOD-1168] Add comments * [NOD-1168] Separate context interfaces for send and receive pings * [NOD-1168] Add protocol shared state to FlowContext * [NOD-1168] Fix comment * [NOD-1168] Rename Context->HandleHandshakeContext * [NOD-1168] Initialize readyPeers and transactionsToRebroadcast * [NOD-1168] Rename readyPeers -> peers --- protocol/common/common.go | 4 + protocol/flowcontext/addresses.go | 8 ++ protocol/{ => flowcontext}/blocks.go | 25 +++-- protocol/flowcontext/config.go | 8 ++ protocol/flowcontext/consensus.go | 8 ++ protocol/flowcontext/flow_context.go | 57 ++++++++++ protocol/flowcontext/ibd.go | 70 ++++++++++++ protocol/flowcontext/network.go | 46 ++++++++ protocol/flowcontext/transactions.go | 70 ++++++++++++ .../flows/addressexchange/receiveaddresses.go | 20 ++-- .../flows/addressexchange/sendaddresses.go | 11 +- .../blockrelay/handle_relay_block_requests.go | 13 ++- .../flows/blockrelay/handle_relay_invs.go | 61 ++++++----- .../blockrelay/shared_requested_blocks.go | 17 +-- protocol/flows/handshake/handshake.go | 31 ++++-- protocol/flows/handshake/receiveversion.go | 10 +- protocol/flows/handshake/sendversion.go | 18 ++-- .../flows/ibd/handle_get_block_locator.go | 11 +- protocol/flows/ibd/handle_get_blocks.go | 17 +-- protocol/flows/ibd/ibd.go | 100 +++++------------- protocol/flows/ibd/selected_tip.go | 43 +++----- protocol/flows/ping/ping.go | 12 ++- .../relaytransactions/relaytransactions.go | 53 +++++----- protocol/manager.go | 30 +----- protocol/peer/peer.go | 44 -------- protocol/protocol.go | 33 +++--- protocol/transactions.go | 58 ---------- 27 files changed, 509 insertions(+), 369 deletions(-) create mode 100644 protocol/flowcontext/addresses.go rename protocol/{ => flowcontext}/blocks.go (58%) create mode 100644 protocol/flowcontext/config.go create mode 100644 protocol/flowcontext/consensus.go create mode 100644 protocol/flowcontext/flow_context.go create mode 100644 protocol/flowcontext/ibd.go create mode 100644 protocol/flowcontext/network.go create mode 100644 protocol/flowcontext/transactions.go delete mode 100644 protocol/transactions.go diff --git a/protocol/common/common.go b/protocol/common/common.go index e5ed3812a..5e7d28da7 100644 --- a/protocol/common/common.go +++ b/protocol/common/common.go @@ -1,9 +1,13 @@ package common import ( + "github.com/pkg/errors" "time" ) // DefaultTimeout is the default duration to wait for enqueuing/dequeuing // to/from routes. const DefaultTimeout = 30 * time.Second + +// ErrPeerWithSameIDExists signifies that a peer with the same ID already exist. +var ErrPeerWithSameIDExists = errors.New("ready with the same ID already exists") diff --git a/protocol/flowcontext/addresses.go b/protocol/flowcontext/addresses.go new file mode 100644 index 000000000..3e27b3978 --- /dev/null +++ b/protocol/flowcontext/addresses.go @@ -0,0 +1,8 @@ +package flowcontext + +import "github.com/kaspanet/kaspad/addrmgr" + +// AddressManager returns the address manager associated to the flow context. +func (f *FlowContext) AddressManager() *addrmgr.AddrManager { + return f.addressManager +} diff --git a/protocol/blocks.go b/protocol/flowcontext/blocks.go similarity index 58% rename from protocol/blocks.go rename to protocol/flowcontext/blocks.go index f2dfae521..4c2f26222 100644 --- a/protocol/blocks.go +++ b/protocol/flowcontext/blocks.go @@ -1,7 +1,7 @@ -package protocol +package flowcontext import ( - peerpkg "github.com/kaspanet/kaspad/protocol/peer" + "github.com/kaspanet/kaspad/protocol/flows/blockrelay" "github.com/kaspanet/kaspad/util" "github.com/kaspanet/kaspad/util/daghash" "github.com/kaspanet/kaspad/wire" @@ -11,24 +11,23 @@ import ( // OnNewBlock updates the mempool after a new block arrival, and // relays newly unorphaned transactions and possibly rebroadcast // manually added transactions when not in IBD. -// TODO(libp2p) Call this function from IBD as well. -func (m *Manager) OnNewBlock(block *util.Block) error { - transactionsAcceptedToMempool, err := m.txPool.HandleNewBlock(block) +func (f *FlowContext) OnNewBlock(block *util.Block) error { + transactionsAcceptedToMempool, err := f.txPool.HandleNewBlock(block) if err != nil { return err } // TODO(libp2p) Notify transactionsAcceptedToMempool to RPC - m.updateTransactionsToRebroadcast(block) + f.updateTransactionsToRebroadcast(block) // Don't relay transactions when in IBD. - if atomic.LoadUint32(&m.isInIBD) != 0 { + if atomic.LoadUint32(&f.isInIBD) != 0 { return nil } var txIDsToRebroadcast []*daghash.TxID - if m.shouldRebroadcastTransactions() { - txIDsToRebroadcast = m.txIDsToRebroadcast() + if f.shouldRebroadcastTransactions() { + txIDsToRebroadcast = f.txIDsToRebroadcast() } txIDsToBroadcast := make([]*daghash.TxID, len(transactionsAcceptedToMempool)+len(txIDsToRebroadcast)) @@ -39,5 +38,11 @@ func (m *Manager) OnNewBlock(block *util.Block) error { copy(txIDsToBroadcast[len(transactionsAcceptedToMempool):], txIDsToBroadcast) txIDsToBroadcast = txIDsToBroadcast[:wire.MaxInvPerTxInvMsg] inv := wire.NewMsgTxInv(txIDsToBroadcast) - return m.netAdapter.Broadcast(peerpkg.ReadyPeerIDs(), inv) + return f.Broadcast(inv) +} + +// SharedRequestedBlocks returns a *blockrelay.SharedRequestedBlocks for sharing +// data about requested blocks between different peers. +func (f *FlowContext) SharedRequestedBlocks() *blockrelay.SharedRequestedBlocks { + return f.sharedRequestedBlocks } diff --git a/protocol/flowcontext/config.go b/protocol/flowcontext/config.go new file mode 100644 index 000000000..65aeddfb4 --- /dev/null +++ b/protocol/flowcontext/config.go @@ -0,0 +1,8 @@ +package flowcontext + +import "github.com/kaspanet/kaspad/config" + +// Config returns an instance of *config.Config associated to the flow context. +func (f *FlowContext) Config() *config.Config { + return f.cfg +} diff --git a/protocol/flowcontext/consensus.go b/protocol/flowcontext/consensus.go new file mode 100644 index 000000000..75d2d2a20 --- /dev/null +++ b/protocol/flowcontext/consensus.go @@ -0,0 +1,8 @@ +package flowcontext + +import "github.com/kaspanet/kaspad/blockdag" + +// DAG returns the DAG associated to the flow context. +func (f *FlowContext) DAG() *blockdag.BlockDAG { + return f.dag +} diff --git a/protocol/flowcontext/flow_context.go b/protocol/flowcontext/flow_context.go new file mode 100644 index 000000000..a7cdf31e2 --- /dev/null +++ b/protocol/flowcontext/flow_context.go @@ -0,0 +1,57 @@ +package flowcontext + +import ( + "github.com/kaspanet/kaspad/addrmgr" + "github.com/kaspanet/kaspad/blockdag" + "github.com/kaspanet/kaspad/config" + "github.com/kaspanet/kaspad/mempool" + "github.com/kaspanet/kaspad/netadapter" + "github.com/kaspanet/kaspad/netadapter/id" + "github.com/kaspanet/kaspad/protocol/flows/blockrelay" + "github.com/kaspanet/kaspad/protocol/flows/relaytransactions" + peerpkg "github.com/kaspanet/kaspad/protocol/peer" + "github.com/kaspanet/kaspad/util" + "github.com/kaspanet/kaspad/util/daghash" + "sync" + "time" +) + +// FlowContext holds state that is relevant to more than one flow or one peer, and allows communication between +// different flows that can be associated to different peers. +type FlowContext struct { + cfg *config.Config + netAdapter *netadapter.NetAdapter + txPool *mempool.TxPool + addedTransactions []*util.Tx + dag *blockdag.BlockDAG + addressManager *addrmgr.AddrManager + + transactionsToRebroadcastLock sync.Mutex + transactionsToRebroadcast map[daghash.TxID]*util.Tx + lastRebroadcastTime time.Time + sharedRequestedTransactions *relaytransactions.SharedRequestedTransactions + + sharedRequestedBlocks *blockrelay.SharedRequestedBlocks + + isInIBD uint32 + startIBDMutex sync.Mutex + + peers map[*id.ID]*peerpkg.Peer + peersMutex sync.RWMutex +} + +// New returns a new instance of FlowContext. +func New(cfg *config.Config, dag *blockdag.BlockDAG, + addressManager *addrmgr.AddrManager, txPool *mempool.TxPool, netAdapter *netadapter.NetAdapter) *FlowContext { + return &FlowContext{ + cfg: cfg, + netAdapter: netAdapter, + dag: dag, + addressManager: addressManager, + txPool: txPool, + sharedRequestedTransactions: relaytransactions.NewSharedRequestedTransactions(), + sharedRequestedBlocks: blockrelay.NewSharedRequestedBlocks(), + peers: make(map[*id.ID]*peerpkg.Peer), + transactionsToRebroadcast: make(map[daghash.TxID]*util.Tx), + } +} diff --git a/protocol/flowcontext/ibd.go b/protocol/flowcontext/ibd.go new file mode 100644 index 000000000..ef1b3e056 --- /dev/null +++ b/protocol/flowcontext/ibd.go @@ -0,0 +1,70 @@ +package flowcontext + +import ( + "github.com/kaspanet/kaspad/blockdag" + peerpkg "github.com/kaspanet/kaspad/protocol/peer" + "sync/atomic" + "time" +) + +// StartIBDIfRequired selects a peer and starts IBD against it +// if required +func (f *FlowContext) StartIBDIfRequired() { + f.startIBDMutex.Lock() + defer f.startIBDMutex.Unlock() + + if f.IsInIBD() { + return + } + + peer := f.selectPeerForIBD(f.dag) + if peer == nil { + f.requestSelectedTipsIfRequired() + return + } + + atomic.StoreUint32(&f.isInIBD, 1) + peer.StartIBD() +} + +// IsInIBD is true if IBD is currently running +func (f *FlowContext) IsInIBD() bool { + return atomic.LoadUint32(&f.isInIBD) != 0 +} + +// selectPeerForIBD returns the first peer whose selected tip +// hash is not in our DAG +func (f *FlowContext) selectPeerForIBD(dag *blockdag.BlockDAG) *peerpkg.Peer { + for _, peer := range f.peers { + peerSelectedTipHash := peer.SelectedTipHash() + if !dag.IsInDAG(peerSelectedTipHash) { + return peer + } + } + return nil +} + +func (f *FlowContext) requestSelectedTipsIfRequired() { + if f.isDAGTimeCurrent() { + return + } + f.requestSelectedTips() +} + +func (f *FlowContext) isDAGTimeCurrent() bool { + const minDurationToRequestSelectedTips = time.Minute + return f.dag.Now().Sub(f.dag.SelectedTipHeader().Timestamp) > minDurationToRequestSelectedTips +} + +func (f *FlowContext) requestSelectedTips() { + for _, peer := range f.peers { + peer.RequestSelectedTipIfRequired() + } +} + +// FinishIBD finishes the current IBD flow and starts a new one if required. +func (f *FlowContext) FinishIBD() { + atomic.StoreUint32(&f.isInIBD, 0) + + f.StartIBDIfRequired() +} diff --git a/protocol/flowcontext/network.go b/protocol/flowcontext/network.go new file mode 100644 index 000000000..252d58a52 --- /dev/null +++ b/protocol/flowcontext/network.go @@ -0,0 +1,46 @@ +package flowcontext + +import ( + "github.com/kaspanet/kaspad/netadapter" + "github.com/kaspanet/kaspad/netadapter/id" + "github.com/kaspanet/kaspad/protocol/common" + peerpkg "github.com/kaspanet/kaspad/protocol/peer" + "github.com/kaspanet/kaspad/wire" + "github.com/pkg/errors" +) + +// NetAdapter returns the net adapter that is associated to the flow context. +func (f *FlowContext) NetAdapter() *netadapter.NetAdapter { + return f.netAdapter +} + +// AddToPeers marks this peer as ready and adds it to the ready peers list. +func (f *FlowContext) AddToPeers(peer *peerpkg.Peer) error { + f.peersMutex.RLock() + defer f.peersMutex.RUnlock() + + if _, ok := f.peers[peer.ID()]; ok { + return errors.Wrapf(common.ErrPeerWithSameIDExists, "peer with ID %s already exists", peer.ID()) + } + + f.peers[peer.ID()] = peer + return nil +} + +// readyPeerIDs returns the peer IDs of all the ready peers. +func (f *FlowContext) readyPeerIDs() []*id.ID { + f.peersMutex.RLock() + defer f.peersMutex.RUnlock() + peerIDs := make([]*id.ID, len(f.peers)) + i := 0 + for peerID := range f.peers { + peerIDs[i] = peerID + i++ + } + return peerIDs +} + +// Broadcast broadcast the given message to all the ready peers. +func (f *FlowContext) Broadcast(message wire.Message) error { + return f.netAdapter.Broadcast(f.readyPeerIDs(), message) +} diff --git a/protocol/flowcontext/transactions.go b/protocol/flowcontext/transactions.go new file mode 100644 index 000000000..80a3750d0 --- /dev/null +++ b/protocol/flowcontext/transactions.go @@ -0,0 +1,70 @@ +package flowcontext + +import ( + "github.com/kaspanet/kaspad/mempool" + "github.com/kaspanet/kaspad/protocol/flows/relaytransactions" + "github.com/kaspanet/kaspad/util" + "github.com/kaspanet/kaspad/util/daghash" + "github.com/kaspanet/kaspad/wire" + "github.com/pkg/errors" + "time" +) + +// AddTransaction adds transaction to the mempool and propagates it. +func (f *FlowContext) AddTransaction(tx *util.Tx) error { + f.transactionsToRebroadcastLock.Lock() + defer f.transactionsToRebroadcastLock.Unlock() + + transactionsAcceptedToMempool, err := f.txPool.ProcessTransaction(tx, false, 0) + if err != nil { + return err + } + + if len(transactionsAcceptedToMempool) > 1 { + panic(errors.New("got more than one accepted transactions when no orphans were allowed")) + } + + f.transactionsToRebroadcast[*tx.ID()] = tx + inv := wire.NewMsgTxInv([]*daghash.TxID{tx.ID()}) + return f.Broadcast(inv) +} + +func (f *FlowContext) updateTransactionsToRebroadcast(block *util.Block) { + f.transactionsToRebroadcastLock.Lock() + defer f.transactionsToRebroadcastLock.Unlock() + // Note: if the block is red, its transactions won't be rebroadcasted + // anymore, although they are not included in the UTXO set. + // This is probably ok, since red blocks are quite rare. + for _, tx := range block.Transactions() { + delete(f.transactionsToRebroadcast, *tx.ID()) + } +} + +func (f *FlowContext) shouldRebroadcastTransactions() bool { + const rebroadcastInterval = 30 * time.Second + return time.Since(f.lastRebroadcastTime) > rebroadcastInterval +} + +func (f *FlowContext) txIDsToRebroadcast() []*daghash.TxID { + f.transactionsToRebroadcastLock.Lock() + defer f.transactionsToRebroadcastLock.Unlock() + + txIDs := make([]*daghash.TxID, len(f.transactionsToRebroadcast)) + i := 0 + for _, tx := range f.transactionsToRebroadcast { + txIDs[i] = tx.ID() + i++ + } + return txIDs +} + +// SharedRequestedTransactions returns a *relaytransactions.SharedRequestedTransactions for sharing +// data about requested transactions between different peers. +func (f *FlowContext) SharedRequestedTransactions() *relaytransactions.SharedRequestedTransactions { + return f.sharedRequestedTransactions +} + +// TxPool returns the transaction pool associated to the manager. +func (f *FlowContext) TxPool() *mempool.TxPool { + return f.txPool +} diff --git a/protocol/flows/addressexchange/receiveaddresses.go b/protocol/flows/addressexchange/receiveaddresses.go index 7e4a42d99..a9b2e602a 100644 --- a/protocol/flows/addressexchange/receiveaddresses.go +++ b/protocol/flows/addressexchange/receiveaddresses.go @@ -10,11 +10,17 @@ import ( "github.com/kaspanet/kaspad/wire" ) -// ReceiveAddresses asks a peer for more addresses if needed. -func ReceiveAddresses(incomingRoute *router.Route, outgoingRoute *router.Route, cfg *config.Config, peer *peerpkg.Peer, - addressManager *addrmgr.AddrManager) error { +// ReceiveAddressesContext is the interface for the context needed for the ReceiveAddresses flow. +type ReceiveAddressesContext interface { + Config() *config.Config + AddressManager() *addrmgr.AddrManager +} - if !addressManager.NeedMoreAddresses() { +// ReceiveAddresses asks a peer for more addresses if needed. +func ReceiveAddresses(context ReceiveAddressesContext, incomingRoute *router.Route, outgoingRoute *router.Route, + peer *peerpkg.Peer) error { + + if !context.AddressManager().NeedMoreAddresses() { return nil } @@ -39,15 +45,15 @@ func ReceiveAddresses(incomingRoute *router.Route, outgoingRoute *router.Route, return protocolerrors.Errorf(true, "got unexpected "+ "IncludeAllSubnetworks=true in [%s] command", msgAddresses.Command()) } - if !msgAddresses.SubnetworkID.IsEqual(cfg.SubnetworkID) && msgAddresses.SubnetworkID != nil { + if !msgAddresses.SubnetworkID.IsEqual(context.Config().SubnetworkID) && msgAddresses.SubnetworkID != nil { return protocolerrors.Errorf(false, "only full nodes and %s subnetwork IDs "+ "are allowed in [%s] command, but got subnetwork ID %s", - cfg.SubnetworkID, msgAddresses.Command(), msgAddresses.SubnetworkID) + context.Config().SubnetworkID, msgAddresses.Command(), msgAddresses.SubnetworkID) } // TODO(libp2p) Consider adding to peer known addresses set // TODO(libp2p) Replace with real peer IP fakeSourceAddress := new(wire.NetAddress) - addressManager.AddAddresses(msgAddresses.AddrList, fakeSourceAddress, msgAddresses.SubnetworkID) + context.AddressManager().AddAddresses(msgAddresses.AddrList, fakeSourceAddress, msgAddresses.SubnetworkID) return nil } diff --git a/protocol/flows/addressexchange/sendaddresses.go b/protocol/flows/addressexchange/sendaddresses.go index b2575b0d6..6215f478d 100644 --- a/protocol/flows/addressexchange/sendaddresses.go +++ b/protocol/flows/addressexchange/sendaddresses.go @@ -7,9 +7,13 @@ import ( "math/rand" ) +// SendAddressesContext is the interface for the context needed for the SendAddresses flow. +type SendAddressesContext interface { + AddressManager() *addrmgr.AddrManager +} + // SendAddresses sends addresses to a peer that requests it. -func SendAddresses(incomingRoute *router.Route, outgoingRoute *router.Route, - addressManager *addrmgr.AddrManager) error { +func SendAddresses(context SendAddressesContext, incomingRoute *router.Route, outgoingRoute *router.Route) error { message, err := incomingRoute.Dequeue() if err != nil { @@ -17,7 +21,8 @@ func SendAddresses(incomingRoute *router.Route, outgoingRoute *router.Route, } msgGetAddresses := message.(*wire.MsgGetAddresses) - addresses := addressManager.AddressCache(msgGetAddresses.IncludeAllSubnetworks, msgGetAddresses.SubnetworkID) + addresses := context.AddressManager().AddressCache(msgGetAddresses.IncludeAllSubnetworks, + msgGetAddresses.SubnetworkID) msgAddresses := wire.NewMsgAddresses(msgGetAddresses.IncludeAllSubnetworks, msgGetAddresses.SubnetworkID) err = msgAddresses.AddAddresses(shuffleAddresses(addresses)...) if err != nil { diff --git a/protocol/flows/blockrelay/handle_relay_block_requests.go b/protocol/flows/blockrelay/handle_relay_block_requests.go index a7b160641..62afd5435 100644 --- a/protocol/flows/blockrelay/handle_relay_block_requests.go +++ b/protocol/flows/blockrelay/handle_relay_block_requests.go @@ -9,10 +9,15 @@ import ( "github.com/pkg/errors" ) +// RelayBlockRequestsContext is the interface for the context needed for the HandleRelayBlockRequests flow. +type RelayBlockRequestsContext interface { + DAG() *blockdag.BlockDAG +} + // HandleRelayBlockRequests listens to wire.MsgGetRelayBlocks messages and sends // their corresponding blocks to the requesting peer. -func HandleRelayBlockRequests(incomingRoute *router.Route, outgoingRoute *router.Route, - peer *peerpkg.Peer, dag *blockdag.BlockDAG) error { +func HandleRelayBlockRequests(context RelayBlockRequestsContext, incomingRoute *router.Route, + outgoingRoute *router.Route, peer *peerpkg.Peer) error { for { message, err := incomingRoute.Dequeue() @@ -22,7 +27,7 @@ func HandleRelayBlockRequests(incomingRoute *router.Route, outgoingRoute *router getRelayBlocksMessage := message.(*wire.MsgGetRelayBlocks) for _, hash := range getRelayBlocksMessage.Hashes { // Fetch the block from the database. - block, err := dag.BlockByHash(hash) + block, err := context.DAG().BlockByHash(hash) if blockdag.IsNotInDAGErr(err) { return protocolerrors.Errorf(true, "block %s not found", hash) } else if err != nil { @@ -32,7 +37,7 @@ func HandleRelayBlockRequests(incomingRoute *router.Route, outgoingRoute *router // If we are a full node and the peer is a partial node, we must convert // the block to a partial block. - nodeSubnetworkID := dag.SubnetworkID() + nodeSubnetworkID := context.DAG().SubnetworkID() peerSubnetworkID := peer.SubnetworkID() isNodeFull := nodeSubnetworkID == nil diff --git a/protocol/flows/blockrelay/handle_relay_invs.go b/protocol/flows/blockrelay/handle_relay_invs.go index 7435ace42..5bfe9481e 100644 --- a/protocol/flows/blockrelay/handle_relay_invs.go +++ b/protocol/flows/blockrelay/handle_relay_invs.go @@ -6,7 +6,6 @@ import ( "github.com/kaspanet/kaspad/netadapter/router" "github.com/kaspanet/kaspad/protocol/blocklogger" "github.com/kaspanet/kaspad/protocol/common" - "github.com/kaspanet/kaspad/protocol/flows/ibd" peerpkg "github.com/kaspanet/kaspad/protocol/peer" "github.com/kaspanet/kaspad/protocol/protocolerrors" "github.com/kaspanet/kaspad/util" @@ -16,14 +15,21 @@ import ( "github.com/pkg/errors" ) -// NewBlockHandler is a function that is to be -// called when a new block is successfully processed. -type NewBlockHandler func(block *util.Block) error +// RelayInvsContext is the interface for the context needed for the HandleRelayInvs flow. +type RelayInvsContext interface { + NetAdapter() *netadapter.NetAdapter + DAG() *blockdag.BlockDAG + OnNewBlock(block *util.Block) error + SharedRequestedBlocks() *SharedRequestedBlocks + StartIBDIfRequired() + IsInIBD() bool + Broadcast(message wire.Message) error +} // HandleRelayInvs listens to wire.MsgInvRelayBlock messages, requests their corresponding blocks if they // are missing, adds them to the DAG and propagates them to the rest of the network. -func HandleRelayInvs(incomingRoute *router.Route, outgoingRoute *router.Route, - peer *peerpkg.Peer, netAdapter *netadapter.NetAdapter, dag *blockdag.BlockDAG, newBlockHandler NewBlockHandler) error { +func HandleRelayInvs(context RelayInvsContext, incomingRoute *router.Route, outgoingRoute *router.Route, + peer *peerpkg.Peer) error { invsQueue := make([]*wire.MsgInvRelayBlock, 0) for { @@ -32,16 +38,16 @@ func HandleRelayInvs(incomingRoute *router.Route, outgoingRoute *router.Route, return err } - if dag.IsKnownBlock(inv.Hash) { - if dag.IsKnownInvalid(inv.Hash) { + if context.DAG().IsKnownBlock(inv.Hash) { + if context.DAG().IsKnownInvalid(inv.Hash) { return protocolerrors.Errorf(true, "sent inv of an invalid block %s", inv.Hash) } continue } - ibd.StartIBDIfRequired(dag) - if ibd.IsInIBD() { + context.StartIBDIfRequired() + if context.IsInIBD() { // Block relay is disabled during IBD continue } @@ -50,8 +56,7 @@ func HandleRelayInvs(incomingRoute *router.Route, outgoingRoute *router.Route, requestQueue.enqueueIfNotExists(inv.Hash) for requestQueue.len() > 0 { - err := requestBlocks(netAdapter, outgoingRoute, peer, incomingRoute, dag, &invsQueue, - requestQueue, newBlockHandler) + err := requestBlocks(context, outgoingRoute, peer, incomingRoute, &invsQueue, requestQueue) if err != nil { return err } @@ -80,10 +85,9 @@ func readInv(incomingRoute *router.Route, invsQueue *[]*wire.MsgInvRelayBlock) ( return inv, nil } -func requestBlocks(netAdapater *netadapter.NetAdapter, outgoingRoute *router.Route, - peer *peerpkg.Peer, incomingRoute *router.Route, dag *blockdag.BlockDAG, - invsQueue *[]*wire.MsgInvRelayBlock, requestQueue *hashesQueueSet, - newBlockHandler NewBlockHandler) error { +func requestBlocks(context RelayInvsContext, outgoingRoute *router.Route, + peer *peerpkg.Peer, incomingRoute *router.Route, + invsQueue *[]*wire.MsgInvRelayBlock, requestQueue *hashesQueueSet) error { numHashesToRequest := mathUtil.MinInt(wire.MsgGetRelayBlocksHashes, requestQueue.len()) hashesToRequest := requestQueue.dequeue(numHashesToRequest) @@ -91,7 +95,7 @@ func requestBlocks(netAdapater *netadapter.NetAdapter, outgoingRoute *router.Rou pendingBlocks := map[daghash.Hash]struct{}{} var filteredHashesToRequest []*daghash.Hash for _, hash := range hashesToRequest { - exists := requestedBlocks.addIfNotExists(hash) + exists := context.SharedRequestedBlocks().addIfNotExists(hash) if !exists { continue } @@ -102,7 +106,7 @@ func requestBlocks(netAdapater *netadapter.NetAdapter, outgoingRoute *router.Rou // In case the function returns earlier than expected, we want to make sure requestedBlocks is // clean from any pending blocks. - defer requestedBlocks.removeSet(pendingBlocks) + defer context.SharedRequestedBlocks().removeSet(pendingBlocks) getRelayBlocksMsg := wire.NewMsgGetRelayBlocks(filteredHashesToRequest) err := outgoingRoute.Enqueue(getRelayBlocksMsg) @@ -122,9 +126,9 @@ func requestBlocks(netAdapater *netadapter.NetAdapter, outgoingRoute *router.Rou return protocolerrors.Errorf(true, "got unrequested block %s", block.Hash()) } delete(pendingBlocks, *blockHash) - requestedBlocks.remove(blockHash) + context.SharedRequestedBlocks().remove(blockHash) - err = processAndRelayBlock(netAdapater, peer, dag, requestQueue, block, newBlockHandler) + err = processAndRelayBlock(context, peer, requestQueue, block) if err != nil { return err } @@ -155,12 +159,11 @@ func readMsgBlock(incomingRoute *router.Route, invsQueue *[]*wire.MsgInvRelayBlo } } -func processAndRelayBlock(netAdapter *netadapter.NetAdapter, peer *peerpkg.Peer, - dag *blockdag.BlockDAG, requestQueue *hashesQueueSet, block *util.Block, - newBlockHandler NewBlockHandler) error { +func processAndRelayBlock(context RelayInvsContext, peer *peerpkg.Peer, + requestQueue *hashesQueueSet, block *util.Block) error { blockHash := block.Hash() - isOrphan, isDelayed, err := dag.ProcessBlock(block, blockdag.BFNone) + isOrphan, isDelayed, err := context.DAG().ProcessBlock(block, blockdag.BFNone) if err != nil { // When the error is a rule error, it means the block was simply // rejected as opposed to something actually going wrong, so log @@ -187,7 +190,7 @@ func processAndRelayBlock(netAdapter *netadapter.NetAdapter, peer *peerpkg.Peer, } const maxOrphanBlueScoreDiff = 10000 - selectedTipBlueScore := dag.SelectedTipBlueScore() + selectedTipBlueScore := context.DAG().SelectedTipBlueScore() if blueScore > selectedTipBlueScore+maxOrphanBlueScoreDiff { log.Infof("Orphan block %s has blue score %d and the selected tip blue score is "+ "%d. Ignoring orphans with a blue score difference from the selected tip greater than %d", @@ -196,7 +199,7 @@ func processAndRelayBlock(netAdapter *netadapter.NetAdapter, peer *peerpkg.Peer, } // Request the parents for the orphan block from the peer that sent it. - missingAncestors := dag.GetOrphanMissingAncestorHashes(blockHash) + missingAncestors := context.DAG().GetOrphanMissingAncestorHashes(blockHash) for _, missingAncestor := range missingAncestors { requestQueue.enqueueIfNotExists(missingAncestor) } @@ -212,13 +215,13 @@ func processAndRelayBlock(netAdapter *netadapter.NetAdapter, peer *peerpkg.Peer, // sm.restartSyncIfNeeded() //// Clear the rejected transactions. //sm.rejectedTxns = make(map[daghash.TxID]struct{}) - err = netAdapter.Broadcast(peerpkg.ReadyPeerIDs(), wire.NewMsgInvBlock(blockHash)) + err = context.Broadcast(wire.NewMsgInvBlock(blockHash)) if err != nil { return err } - ibd.StartIBDIfRequired(dag) - err = newBlockHandler(block) + context.StartIBDIfRequired() + err = context.OnNewBlock(block) if err != nil { panic(err) } diff --git a/protocol/flows/blockrelay/shared_requested_blocks.go b/protocol/flows/blockrelay/shared_requested_blocks.go index c32c0d953..19eabac98 100644 --- a/protocol/flows/blockrelay/shared_requested_blocks.go +++ b/protocol/flows/blockrelay/shared_requested_blocks.go @@ -6,18 +6,20 @@ import ( "github.com/kaspanet/kaspad/util/daghash" ) -type sharedRequestedBlocks struct { +// SharedRequestedBlocks is a data structure that is shared between peers that +// holds the hashes of all the requested blocks to prevent redundant requests. +type SharedRequestedBlocks struct { blocks map[daghash.Hash]struct{} sync.Mutex } -func (s *sharedRequestedBlocks) remove(hash *daghash.Hash) { +func (s *SharedRequestedBlocks) remove(hash *daghash.Hash) { s.Lock() defer s.Unlock() delete(s.blocks, *hash) } -func (s *sharedRequestedBlocks) removeSet(blockHashes map[daghash.Hash]struct{}) { +func (s *SharedRequestedBlocks) removeSet(blockHashes map[daghash.Hash]struct{}) { s.Lock() defer s.Unlock() for hash := range blockHashes { @@ -25,7 +27,7 @@ func (s *sharedRequestedBlocks) removeSet(blockHashes map[daghash.Hash]struct{}) } } -func (s *sharedRequestedBlocks) addIfNotExists(hash *daghash.Hash) (exists bool) { +func (s *SharedRequestedBlocks) addIfNotExists(hash *daghash.Hash) (exists bool) { s.Lock() defer s.Unlock() _, ok := s.blocks[*hash] @@ -36,6 +38,9 @@ func (s *sharedRequestedBlocks) addIfNotExists(hash *daghash.Hash) (exists bool) return false } -var requestedBlocks = &sharedRequestedBlocks{ - blocks: make(map[daghash.Hash]struct{}), +// NewSharedRequestedBlocks returns a new instance of SharedRequestedBlocks. +func NewSharedRequestedBlocks() *SharedRequestedBlocks { + return &SharedRequestedBlocks{ + blocks: make(map[daghash.Hash]struct{}), + } } diff --git a/protocol/flows/handshake/handshake.go b/protocol/flows/handshake/handshake.go index 814bc7522..eab28a907 100644 --- a/protocol/flows/handshake/handshake.go +++ b/protocol/flows/handshake/handshake.go @@ -5,21 +5,30 @@ import ( "github.com/kaspanet/kaspad/blockdag" "github.com/kaspanet/kaspad/config" "github.com/kaspanet/kaspad/netadapter" + "github.com/kaspanet/kaspad/protocol/common" "sync" "sync/atomic" routerpkg "github.com/kaspanet/kaspad/netadapter/router" - "github.com/kaspanet/kaspad/protocol/flows/ibd" peerpkg "github.com/kaspanet/kaspad/protocol/peer" "github.com/kaspanet/kaspad/util/locks" "github.com/kaspanet/kaspad/wire" "github.com/pkg/errors" ) +// HandleHandshakeContext is the interface for the context needed for the HandleHandshake flow. +type HandleHandshakeContext interface { + Config() *config.Config + NetAdapter() *netadapter.NetAdapter + DAG() *blockdag.BlockDAG + AddressManager() *addrmgr.AddrManager + StartIBDIfRequired() + AddToPeers(peer *peerpkg.Peer) error +} + // HandleHandshake sets up the handshake protocol - It sends a version message and waits for an incoming // version message, as well as a verack for the sent version -func HandleHandshake(cfg *config.Config, router *routerpkg.Router, netAdapter *netadapter.NetAdapter, - dag *blockdag.BlockDAG, addressManager *addrmgr.AddrManager) (peer *peerpkg.Peer, closed bool, err error) { +func HandleHandshake(context HandleHandshakeContext, router *routerpkg.Router) (peer *peerpkg.Peer, closed bool, err error) { receiveVersionRoute, err := router.AddIncomingRoute([]wire.MessageCommand{wire.CmdVersion}) if err != nil { @@ -45,7 +54,7 @@ func HandleHandshake(cfg *config.Config, router *routerpkg.Router, netAdapter *n var peerAddress *wire.NetAddress spawn("HandleHandshake-ReceiveVersion", func() { defer wg.Done() - address, err := ReceiveVersion(receiveVersionRoute, router.OutgoingRoute(), netAdapter, peer, dag) + address, err := ReceiveVersion(context, receiveVersionRoute, router.OutgoingRoute(), peer) if err != nil { log.Errorf("error from ReceiveVersion: %s", err) } @@ -60,7 +69,7 @@ func HandleHandshake(cfg *config.Config, router *routerpkg.Router, netAdapter *n spawn("HandleHandshake-SendVersion", func() { defer wg.Done() - err := SendVersion(cfg, sendVersionRoute, router.OutgoingRoute(), netAdapter, dag) + err := SendVersion(context, sendVersionRoute, router.OutgoingRoute()) if err != nil { log.Errorf("error from SendVersion: %s", err) } @@ -81,27 +90,27 @@ func HandleHandshake(cfg *config.Config, router *routerpkg.Router, netAdapter *n case <-locks.ReceiveFromChanWhenDone(func() { wg.Wait() }): } - err = peerpkg.AddToReadyPeers(peer) + err = context.AddToPeers(peer) if err != nil { - if errors.Is(err, peerpkg.ErrPeerWithSameIDExists) { + if errors.Is(err, common.ErrPeerWithSameIDExists) { return nil, false, err } panic(err) } peerID := peer.ID() - err = netAdapter.AssociateRouterID(router, peerID) + err = context.NetAdapter().AssociateRouterID(router, peerID) if err != nil { panic(err) } if peerAddress != nil { subnetworkID := peer.SubnetworkID() - addressManager.AddAddress(peerAddress, peerAddress, subnetworkID) - addressManager.Good(peerAddress, subnetworkID) + context.AddressManager().AddAddress(peerAddress, peerAddress, subnetworkID) + context.AddressManager().Good(peerAddress, subnetworkID) } - ibd.StartIBDIfRequired(dag) + context.StartIBDIfRequired() err = router.RemoveRoute([]wire.MessageCommand{wire.CmdVersion, wire.CmdVerAck}) if err != nil { diff --git a/protocol/flows/handshake/receiveversion.go b/protocol/flows/handshake/receiveversion.go index cc94783b0..419000a9a 100644 --- a/protocol/flows/handshake/receiveversion.go +++ b/protocol/flows/handshake/receiveversion.go @@ -1,8 +1,6 @@ package handshake import ( - "github.com/kaspanet/kaspad/blockdag" - "github.com/kaspanet/kaspad/netadapter" "github.com/kaspanet/kaspad/netadapter/router" "github.com/kaspanet/kaspad/protocol/common" peerpkg "github.com/kaspanet/kaspad/protocol/peer" @@ -23,8 +21,8 @@ var ( // ReceiveVersion waits for the peer to send a version message, sends a // verack in response, and updates its info accordingly. -func ReceiveVersion(incomingRoute *router.Route, outgoingRoute *router.Route, netAdapter *netadapter.NetAdapter, - peer *peerpkg.Peer, dag *blockdag.BlockDAG) (*wire.NetAddress, error) { +func ReceiveVersion(context HandleHandshakeContext, incomingRoute *router.Route, outgoingRoute *router.Route, + peer *peerpkg.Peer) (*wire.NetAddress, error) { message, err := incomingRoute.DequeueWithTimeout(common.DefaultTimeout) if err != nil { @@ -36,7 +34,7 @@ func ReceiveVersion(incomingRoute *router.Route, outgoingRoute *router.Route, ne return nil, protocolerrors.New(true, "a version message must precede all others") } - if !allowSelfConnections && netAdapter.ID().IsEqual(msgVersion.ID) { + if !allowSelfConnections && context.NetAdapter().ID().IsEqual(msgVersion.ID) { return nil, protocolerrors.New(true, "connected to self") } @@ -53,7 +51,7 @@ func ReceiveVersion(incomingRoute *router.Route, outgoingRoute *router.Route, ne } // Disconnect from partial nodes in networks that don't allow them - if !dag.Params.EnableNonNativeSubnetworks && msgVersion.SubnetworkID != nil { + if !context.DAG().Params.EnableNonNativeSubnetworks && msgVersion.SubnetworkID != nil { return nil, protocolerrors.New(true, "partial nodes are not allowed") } diff --git a/protocol/flows/handshake/sendversion.go b/protocol/flows/handshake/sendversion.go index b35c4ef22..4eaa3a39c 100644 --- a/protocol/flows/handshake/sendversion.go +++ b/protocol/flows/handshake/sendversion.go @@ -1,9 +1,6 @@ package handshake import ( - "github.com/kaspanet/kaspad/blockdag" - "github.com/kaspanet/kaspad/config" - "github.com/kaspanet/kaspad/netadapter" "github.com/kaspanet/kaspad/netadapter/router" "github.com/kaspanet/kaspad/protocol/common" "github.com/kaspanet/kaspad/version" @@ -29,19 +26,18 @@ var ( ) // SendVersion sends a version to a peer and waits for verack. -func SendVersion(cfg *config.Config, incomingRoute *router.Route, outgoingRoute *router.Route, - netAdapter *netadapter.NetAdapter, dag *blockdag.BlockDAG) error { +func SendVersion(context HandleHandshakeContext, incomingRoute *router.Route, outgoingRoute *router.Route) error { - selectedTipHash := dag.SelectedTipHash() - subnetworkID := cfg.SubnetworkID + selectedTipHash := context.DAG().SelectedTipHash() + subnetworkID := context.Config().SubnetworkID // Version message. - localAddress, err := netAdapter.GetBestLocalAddress() + localAddress, err := context.NetAdapter().GetBestLocalAddress() if err != nil { panic(err) } - msg := wire.NewMsgVersion(localAddress, netAdapter.ID(), selectedTipHash, subnetworkID) - msg.AddUserAgent(userAgentName, userAgentVersion, cfg.UserAgentComments...) + msg := wire.NewMsgVersion(localAddress, context.NetAdapter().ID(), selectedTipHash, subnetworkID) + msg.AddUserAgent(userAgentName, userAgentVersion, context.Config().UserAgentComments...) // Advertise the services flag msg.Services = defaultServices @@ -50,7 +46,7 @@ func SendVersion(cfg *config.Config, incomingRoute *router.Route, outgoingRoute msg.ProtocolVersion = wire.ProtocolVersion // Advertise if inv messages for transactions are desired. - msg.DisableRelayTx = cfg.BlocksOnly + msg.DisableRelayTx = context.Config().BlocksOnly err = outgoingRoute.Enqueue(msg) if err != nil { diff --git a/protocol/flows/ibd/handle_get_block_locator.go b/protocol/flows/ibd/handle_get_block_locator.go index d1dcfd0a6..324d3b9cc 100644 --- a/protocol/flows/ibd/handle_get_block_locator.go +++ b/protocol/flows/ibd/handle_get_block_locator.go @@ -8,15 +8,22 @@ import ( "github.com/kaspanet/kaspad/wire" ) +// GetBlockLocatorContext is the interface for the context needed for the HandleGetBlockLocator flow. +type GetBlockLocatorContext interface { + DAG() *blockdag.BlockDAG +} + // HandleGetBlockLocator handles getBlockLocator messages -func HandleGetBlockLocator(incomingRoute *router.Route, outgoingRoute *router.Route, dag *blockdag.BlockDAG) error { +func HandleGetBlockLocator(context GetBlockLocatorContext, incomingRoute *router.Route, + outgoingRoute *router.Route) error { + for { lowHash, highHash, err := receiveGetBlockLocator(incomingRoute) if err != nil { return err } - locator, err := dag.BlockLocatorFromHashes(highHash, lowHash) + locator, err := context.DAG().BlockLocatorFromHashes(highHash, lowHash) if err != nil || len(locator) == 0 { return protocolerrors.Errorf(true, "couldn't build a block "+ "locator between blocks %s and %s", lowHash, highHash) diff --git a/protocol/flows/ibd/handle_get_blocks.go b/protocol/flows/ibd/handle_get_blocks.go index 144a1b187..c37832ddd 100644 --- a/protocol/flows/ibd/handle_get_blocks.go +++ b/protocol/flows/ibd/handle_get_blocks.go @@ -7,15 +7,20 @@ import ( "github.com/kaspanet/kaspad/wire" ) +// GetBlocksContext is the interface for the context needed for the HandleGetBlocks flow. +type GetBlocksContext interface { + DAG() *blockdag.BlockDAG +} + // HandleGetBlocks handles getBlocks messages -func HandleGetBlocks(incomingRoute *router.Route, outgoingRoute *router.Route, dag *blockdag.BlockDAG) error { +func HandleGetBlocks(context GetBlocksContext, incomingRoute *router.Route, outgoingRoute *router.Route) error { for { lowHash, highHash, err := receiveGetBlocks(incomingRoute) if err != nil { return err } - msgIBDBlocks, err := buildMsgIBDBlocks(lowHash, highHash, dag) + msgIBDBlocks, err := buildMsgIBDBlocks(context, lowHash, highHash) if err != nil { return err } @@ -39,18 +44,18 @@ func receiveGetBlocks(incomingRoute *router.Route) (lowHash *daghash.Hash, return msgGetBlocks.LowHash, msgGetBlocks.HighHash, nil } -func buildMsgIBDBlocks(lowHash *daghash.Hash, highHash *daghash.Hash, - dag *blockdag.BlockDAG) ([]*wire.MsgIBDBlock, error) { +func buildMsgIBDBlocks(context GetBlocksContext, lowHash *daghash.Hash, + highHash *daghash.Hash) ([]*wire.MsgIBDBlock, error) { const maxHashesInMsgIBDBlocks = wire.MaxInvPerMsg - blockHashes, err := dag.AntiPastHashesBetween(lowHash, highHash, maxHashesInMsgIBDBlocks) + blockHashes, err := context.DAG().AntiPastHashesBetween(lowHash, highHash, maxHashesInMsgIBDBlocks) if err != nil { return nil, err } msgIBDBlocks := make([]*wire.MsgIBDBlock, len(blockHashes)) for i, blockHash := range blockHashes { - block, err := dag.BlockByHash(blockHash) + block, err := context.DAG().BlockByHash(blockHash) if err != nil { return nil, err } diff --git a/protocol/flows/ibd/ibd.go b/protocol/flows/ibd/ibd.go index 76de12d6b..9a665a53c 100644 --- a/protocol/flows/ibd/ibd.go +++ b/protocol/flows/ibd/ibd.go @@ -9,93 +9,50 @@ import ( "github.com/kaspanet/kaspad/util" "github.com/kaspanet/kaspad/util/daghash" "github.com/kaspanet/kaspad/wire" - "sync" - "sync/atomic" ) -var ( - isIBDRunning uint32 - startIBDMutex sync.Mutex -) - -// NewBlockHandler is a function that is to be -// called when a new block is successfully processed. -type NewBlockHandler func(block *util.Block) error - -// StartIBDIfRequired selects a peer and starts IBD against it -// if required -func StartIBDIfRequired(dag *blockdag.BlockDAG) { - startIBDMutex.Lock() - defer startIBDMutex.Unlock() - - if IsInIBD() { - return - } - - peer := selectPeerForIBD(dag) - if peer == nil { - requestSelectedTipsIfRequired(dag) - return - } - - atomic.StoreUint32(&isIBDRunning, 1) - peer.StartIBD() -} - -// IsInIBD is true if IBD is currently running -func IsInIBD() bool { - return atomic.LoadUint32(&isIBDRunning) != 0 -} - -// selectPeerForIBD returns the first peer whose selected tip -// hash is not in our DAG -func selectPeerForIBD(dag *blockdag.BlockDAG) *peerpkg.Peer { - for _, peer := range peerpkg.ReadyPeers() { - peerSelectedTipHash := peer.SelectedTipHash() - if !dag.IsInDAG(peerSelectedTipHash) { - return peer - } - } - return nil +// HandleIBDContext is the interface for the context needed for the HandleIBD flow. +type HandleIBDContext interface { + DAG() *blockdag.BlockDAG + OnNewBlock(block *util.Block) error + StartIBDIfRequired() + FinishIBD() } // HandleIBD waits for IBD start and handles it when IBD is triggered for this peer -func HandleIBD(incomingRoute *router.Route, outgoingRoute *router.Route, - peer *peerpkg.Peer, dag *blockdag.BlockDAG, newBlockHandler NewBlockHandler) error { +func HandleIBD(context HandleIBDContext, incomingRoute *router.Route, outgoingRoute *router.Route, peer *peerpkg.Peer) error { for { - err := runIBD(incomingRoute, outgoingRoute, peer, dag, newBlockHandler) + err := runIBD(context, incomingRoute, outgoingRoute, peer) if err != nil { return err } } } -func runIBD(incomingRoute *router.Route, outgoingRoute *router.Route, - peer *peerpkg.Peer, dag *blockdag.BlockDAG, newBlockHandler NewBlockHandler) error { +func runIBD(context HandleIBDContext, incomingRoute *router.Route, outgoingRoute *router.Route, peer *peerpkg.Peer) error { peer.WaitForIBDStart() - defer finishIBD(dag) + defer context.FinishIBD() peerSelectedTipHash := peer.SelectedTipHash() - highestSharedBlockHash, err := findHighestSharedBlockHash(incomingRoute, outgoingRoute, dag, peerSelectedTipHash) + highestSharedBlockHash, err := findHighestSharedBlockHash(context, incomingRoute, outgoingRoute, peerSelectedTipHash) if err != nil { return err } - if dag.IsKnownFinalizedBlock(highestSharedBlockHash) { + if context.DAG().IsKnownFinalizedBlock(highestSharedBlockHash) { return protocolerrors.Errorf(false, "cannot initiate "+ "IBD with peer %s because the highest shared chain block (%s) is "+ "below the finality point", peer, highestSharedBlockHash) } - return downloadBlocks(incomingRoute, outgoingRoute, dag, highestSharedBlockHash, peerSelectedTipHash, - newBlockHandler) + return downloadBlocks(context, incomingRoute, outgoingRoute, highestSharedBlockHash, peerSelectedTipHash) } -func findHighestSharedBlockHash(incomingRoute *router.Route, outgoingRoute *router.Route, dag *blockdag.BlockDAG, +func findHighestSharedBlockHash(context HandleIBDContext, incomingRoute *router.Route, outgoingRoute *router.Route, peerSelectedTipHash *daghash.Hash) (lowHash *daghash.Hash, err error) { - lowHash = dag.Params.GenesisHash + lowHash = context.DAG().Params.GenesisHash highHash := peerSelectedTipHash for { @@ -113,11 +70,11 @@ func findHighestSharedBlockHash(incomingRoute *router.Route, outgoingRoute *rout // If it is, return it. If it isn't, we need to narrow our // getBlockLocator request and try again. locatorHighHash := blockLocatorHashes[0] - if dag.IsInDAG(locatorHighHash) { + if context.DAG().IsInDAG(locatorHighHash) { return locatorHighHash, nil } - highHash, lowHash = dag.FindNextLocatorBoundaries(blockLocatorHashes) + highHash, lowHash = context.DAG().FindNextLocatorBoundaries(blockLocatorHashes) } } @@ -142,9 +99,9 @@ func receiveBlockLocator(incomingRoute *router.Route) (blockLocatorHashes []*dag return msgBlockLocator.BlockLocatorHashes, nil } -func downloadBlocks(incomingRoute *router.Route, outgoingRoute *router.Route, - dag *blockdag.BlockDAG, highestSharedBlockHash *daghash.Hash, - peerSelectedTipHash *daghash.Hash, newBlockHandler NewBlockHandler) error { +func downloadBlocks(context HandleIBDContext, incomingRoute *router.Route, outgoingRoute *router.Route, + highestSharedBlockHash *daghash.Hash, + peerSelectedTipHash *daghash.Hash) error { err := sendGetBlocks(outgoingRoute, highestSharedBlockHash, peerSelectedTipHash) if err != nil { @@ -156,7 +113,7 @@ func downloadBlocks(incomingRoute *router.Route, outgoingRoute *router.Route, if err != nil { return err } - err = processIBDBlock(dag, msgIBDBlock, newBlockHandler) + err = processIBDBlock(context, msgIBDBlock) if err != nil { return err } @@ -187,14 +144,13 @@ func receiveIBDBlock(incomingRoute *router.Route) (msgIBDBlock *wire.MsgIBDBlock return msgIBDBlock, nil } -func processIBDBlock(dag *blockdag.BlockDAG, msgIBDBlock *wire.MsgIBDBlock, - newBlockHandler NewBlockHandler) error { +func processIBDBlock(context HandleIBDContext, msgIBDBlock *wire.MsgIBDBlock) error { block := util.NewBlock(&msgIBDBlock.MsgBlock) - if dag.IsInDAG(block.Hash()) { + if context.DAG().IsInDAG(block.Hash()) { return nil } - isOrphan, isDelayed, err := dag.ProcessBlock(block, blockdag.BFNone) + isOrphan, isDelayed, err := context.DAG().ProcessBlock(block, blockdag.BFNone) if err != nil { return err } @@ -206,15 +162,9 @@ func processIBDBlock(dag *blockdag.BlockDAG, msgIBDBlock *wire.MsgIBDBlock, return protocolerrors.Errorf(false, "received delayed block %s "+ "during IBD", block.Hash()) } - err = newBlockHandler(block) + err = context.OnNewBlock(block) if err != nil { panic(err) } return nil } - -func finishIBD(dag *blockdag.BlockDAG) { - atomic.StoreUint32(&isIBDRunning, 0) - - StartIBDIfRequired(dag) -} diff --git a/protocol/flows/ibd/selected_tip.go b/protocol/flows/ibd/selected_tip.go index 3f2614074..d2b850721 100644 --- a/protocol/flows/ibd/selected_tip.go +++ b/protocol/flows/ibd/selected_tip.go @@ -8,41 +8,27 @@ import ( "github.com/kaspanet/kaspad/util/daghash" "github.com/kaspanet/kaspad/wire" "github.com/pkg/errors" - "time" ) -const minDurationToRequestSelectedTips = time.Minute - -func requestSelectedTipsIfRequired(dag *blockdag.BlockDAG) { - if isDAGTimeCurrent(dag) { - return - } - requestSelectedTips() -} - -func isDAGTimeCurrent(dag *blockdag.BlockDAG) bool { - return dag.Now().Sub(dag.SelectedTipHeader().Timestamp) > minDurationToRequestSelectedTips -} - -func requestSelectedTips() { - for _, peer := range peerpkg.ReadyPeers() { - peer.RequestSelectedTipIfRequired() - } +// RequestSelectedTipContext is the interface for the context needed for the RequestSelectedTip flow. +type RequestSelectedTipContext interface { + DAG() *blockdag.BlockDAG + StartIBDIfRequired() } // RequestSelectedTip waits for selected tip requests and handles them -func RequestSelectedTip(incomingRoute *router.Route, - outgoingRoute *router.Route, peer *peerpkg.Peer, dag *blockdag.BlockDAG) error { +func RequestSelectedTip(context RequestSelectedTipContext, incomingRoute *router.Route, + outgoingRoute *router.Route, peer *peerpkg.Peer) error { for { - err := runSelectedTipRequest(incomingRoute, outgoingRoute, peer, dag) + err := runSelectedTipRequest(context, incomingRoute, outgoingRoute, peer) if err != nil { return err } } } -func runSelectedTipRequest(incomingRoute *router.Route, outgoingRoute *router.Route, - peer *peerpkg.Peer, dag *blockdag.BlockDAG) error { +func runSelectedTipRequest(context RequestSelectedTipContext, incomingRoute *router.Route, outgoingRoute *router.Route, + peer *peerpkg.Peer) error { peer.WaitForSelectedTipRequests() defer peer.FinishRequestingSelectedTip() @@ -58,7 +44,7 @@ func runSelectedTipRequest(incomingRoute *router.Route, outgoingRoute *router.Ro } peer.SetSelectedTipHash(peerSelectedTipHash) - StartIBDIfRequired(dag) + context.StartIBDIfRequired() return nil } @@ -77,15 +63,20 @@ func receiveSelectedTip(incomingRoute *router.Route) (selectedTipHash *daghash.H return msgSelectedTip.SelectedTipHash, nil } +// GetSelectedTipContext is the interface for the context needed for the HandleGetSelectedTip flow. +type GetSelectedTipContext interface { + DAG() *blockdag.BlockDAG +} + // HandleGetSelectedTip handles getSelectedTip messages -func HandleGetSelectedTip(incomingRoute *router.Route, outgoingRoute *router.Route, dag *blockdag.BlockDAG) error { +func HandleGetSelectedTip(context GetSelectedTipContext, incomingRoute *router.Route, outgoingRoute *router.Route) error { for { err := receiveGetSelectedTip(incomingRoute) if err != nil { return err } - selectedTipHash := dag.SelectedTipHash() + selectedTipHash := context.DAG().SelectedTipHash() err = sendSelectedTipHash(outgoingRoute, selectedTipHash) if err != nil { return err diff --git a/protocol/flows/ping/ping.go b/protocol/flows/ping/ping.go index dffa0bda4..8c0daa16c 100644 --- a/protocol/flows/ping/ping.go +++ b/protocol/flows/ping/ping.go @@ -11,9 +11,13 @@ import ( "github.com/kaspanet/kaspad/wire" ) +// ReceivePingsContext is the interface for the context needed for the ReceivePings flow. +type ReceivePingsContext interface { +} + // ReceivePings handles all ping messages coming through incomingRoute. // This function assumes that incomingRoute will only return MsgPing. -func ReceivePings(incomingRoute *router.Route, outgoingRoute *router.Route) error { +func ReceivePings(_ ReceivePingsContext, incomingRoute *router.Route, outgoingRoute *router.Route) error { for { message, err := incomingRoute.Dequeue() if err != nil { @@ -29,10 +33,14 @@ func ReceivePings(incomingRoute *router.Route, outgoingRoute *router.Route) erro } } +// SendPingsContext is the interface for the context needed for the SendPings flow. +type SendPingsContext interface { +} + // SendPings starts sending MsgPings every pingInterval seconds to the // given peer. // This function assumes that incomingRoute will only return MsgPong. -func SendPings(incomingRoute *router.Route, outgoingRoute *router.Route, peer *peerpkg.Peer) error { +func SendPings(_ SendPingsContext, incomingRoute *router.Route, outgoingRoute *router.Route, peer *peerpkg.Peer) error { const pingInterval = 2 * time.Minute ticker := time.NewTicker(pingInterval) defer ticker.Stop() diff --git a/protocol/flows/relaytransactions/relaytransactions.go b/protocol/flows/relaytransactions/relaytransactions.go index eefaebf4e..7833539be 100644 --- a/protocol/flows/relaytransactions/relaytransactions.go +++ b/protocol/flows/relaytransactions/relaytransactions.go @@ -6,7 +6,6 @@ import ( "github.com/kaspanet/kaspad/netadapter" "github.com/kaspanet/kaspad/netadapter/router" "github.com/kaspanet/kaspad/protocol/common" - peerpkg "github.com/kaspanet/kaspad/protocol/peer" "github.com/kaspanet/kaspad/protocol/protocolerrors" "github.com/kaspanet/kaspad/util" "github.com/kaspanet/kaspad/util/daghash" @@ -14,15 +13,18 @@ import ( "github.com/pkg/errors" ) -// NewBlockHandler is a function that is to be -// called when a new block is successfully processed. -type NewBlockHandler func(block *util.Block) error +// RelayedTransactionsContext is the interface for the context needed for the HandleRelayedTransactions flow. +type RelayedTransactionsContext interface { + NetAdapter() *netadapter.NetAdapter + DAG() *blockdag.BlockDAG + SharedRequestedTransactions() *SharedRequestedTransactions + TxPool() *mempool.TxPool + Broadcast(message wire.Message) error +} // HandleRelayedTransactions listens to wire.MsgInvTransaction messages, requests their corresponding transactions if they // are missing, adds them to the mempool and propagates them to the rest of the network. -func HandleRelayedTransactions(incomingRoute *router.Route, outgoingRoute *router.Route, - netAdapter *netadapter.NetAdapter, dag *blockdag.BlockDAG, txPool *mempool.TxPool, - sharedRequestedTransactions *SharedRequestedTransactions) error { +func HandleRelayedTransactions(context RelayedTransactionsContext, incomingRoute *router.Route, outgoingRoute *router.Route) error { invsQueue := make([]*wire.MsgInvTransaction, 0) for { @@ -31,29 +33,27 @@ func HandleRelayedTransactions(incomingRoute *router.Route, outgoingRoute *route return err } - requestedIDs, err := requestInvTransactions(outgoingRoute, txPool, dag, sharedRequestedTransactions, inv) + requestedIDs, err := requestInvTransactions(context, outgoingRoute, inv) if err != nil { return err } - err = receiveTransactions(requestedIDs, incomingRoute, &invsQueue, txPool, netAdapter, - sharedRequestedTransactions) + err = receiveTransactions(context, requestedIDs, incomingRoute, &invsQueue) if err != nil { return err } } } -func requestInvTransactions(outgoingRoute *router.Route, txPool *mempool.TxPool, dag *blockdag.BlockDAG, - sharedRequestedTransactions *SharedRequestedTransactions, inv *wire.MsgInvTransaction) (requestedIDs []*daghash.TxID, - err error) { +func requestInvTransactions(context RelayedTransactionsContext, outgoingRoute *router.Route, + inv *wire.MsgInvTransaction) (requestedIDs []*daghash.TxID, err error) { idsToRequest := make([]*daghash.TxID, 0, len(inv.TxIDS)) for _, txID := range inv.TxIDS { - if isKnownTransaction(txPool, dag, txID) { + if isKnownTransaction(context, txID) { continue } - exists := sharedRequestedTransactions.addIfNotExists(txID) + exists := context.SharedRequestedTransactions().addIfNotExists(txID) if exists { continue } @@ -67,16 +67,16 @@ func requestInvTransactions(outgoingRoute *router.Route, txPool *mempool.TxPool, msgGetTransactions := wire.NewMsgGetTransactions(idsToRequest) err = outgoingRoute.Enqueue(msgGetTransactions) if err != nil { - sharedRequestedTransactions.removeMany(idsToRequest) + context.SharedRequestedTransactions().removeMany(idsToRequest) return nil, err } return idsToRequest, nil } -func isKnownTransaction(txPool *mempool.TxPool, dag *blockdag.BlockDAG, txID *daghash.TxID) bool { +func isKnownTransaction(context RelayedTransactionsContext, txID *daghash.TxID) bool { // Ask the transaction memory pool if the transaction is known // to it in any form (main pool or orphan). - if txPool.HaveTransaction(txID) { + if context.TxPool().HaveTransaction(txID) { return true } @@ -91,7 +91,7 @@ func isKnownTransaction(txPool *mempool.TxPool, dag *blockdag.BlockDAG, txID *da prevOut := wire.Outpoint{TxID: *txID} for i := uint32(0); i < 2; i++ { prevOut.Index = i - _, ok := dag.GetUTXOEntry(prevOut) + _, ok := context.DAG().GetUTXOEntry(prevOut) if ok { return true } @@ -120,7 +120,7 @@ func readInv(incomingRoute *router.Route, invsQueue *[]*wire.MsgInvTransaction) return inv, nil } -func broadcastAcceptedTransactions(netAdapter *netadapter.NetAdapter, acceptedTxs []*mempool.TxDesc) error { +func broadcastAcceptedTransactions(context RelayedTransactionsContext, acceptedTxs []*mempool.TxDesc) error { // TODO(libp2p) Add mechanism to avoid sending to other peers invs that are known to them (e.g. mruinvmap) // TODO(libp2p) Consider broadcasting in bulks idsToBroadcast := make([]*daghash.TxID, len(acceptedTxs)) @@ -128,7 +128,7 @@ func broadcastAcceptedTransactions(netAdapter *netadapter.NetAdapter, acceptedTx idsToBroadcast[i] = tx.Tx.ID() } inv := wire.NewMsgTxInv(idsToBroadcast) - return netAdapter.Broadcast(peerpkg.ReadyPeerIDs(), inv) + return context.Broadcast(inv) } // readMsgTx returns the next msgTx in incomingRoute, and populates invsQueue with any inv messages that meanwhile arrive. @@ -154,13 +154,12 @@ func readMsgTx(incomingRoute *router.Route, invsQueue *[]*wire.MsgInvTransaction } } -func receiveTransactions(requestedTransactions []*daghash.TxID, incomingRoute *router.Route, - invsQueue *[]*wire.MsgInvTransaction, txPool *mempool.TxPool, netAdapter *netadapter.NetAdapter, - sharedRequestedTransactions *SharedRequestedTransactions) error { +func receiveTransactions(context RelayedTransactionsContext, requestedTransactions []*daghash.TxID, incomingRoute *router.Route, + invsQueue *[]*wire.MsgInvTransaction) error { // In case the function returns earlier than expected, we want to make sure sharedRequestedTransactions is // clean from any pending transactions. - defer sharedRequestedTransactions.removeMany(requestedTransactions) + defer context.SharedRequestedTransactions().removeMany(requestedTransactions) for _, expectedID := range requestedTransactions { msgTx, err := readMsgTx(incomingRoute, invsQueue) if err != nil { @@ -171,7 +170,7 @@ func receiveTransactions(requestedTransactions []*daghash.TxID, incomingRoute *r return protocolerrors.Errorf(true, "expected transaction %s", expectedID) } - acceptedTxs, err := txPool.ProcessTransaction(tx, true, 0) // TODO(libp2p) Use the peer ID for the mempool tag + acceptedTxs, err := context.TxPool().ProcessTransaction(tx, true, 0) // TODO(libp2p) Use the peer ID for the mempool tag if err != nil { // When the error is a rule error, it means the transaction was // simply rejected as opposed to something actually going wrong, @@ -197,7 +196,7 @@ func receiveTransactions(requestedTransactions []*daghash.TxID, incomingRoute *r return protocolerrors.Errorf(true, "rejected transaction %s", tx.ID()) } - err = broadcastAcceptedTransactions(netAdapter, acceptedTxs) + err = broadcastAcceptedTransactions(context, acceptedTxs) if err != nil { panic(err) } diff --git a/protocol/manager.go b/protocol/manager.go index aec18de74..6a32a2e9b 100644 --- a/protocol/manager.go +++ b/protocol/manager.go @@ -6,28 +6,12 @@ import ( "github.com/kaspanet/kaspad/config" "github.com/kaspanet/kaspad/mempool" "github.com/kaspanet/kaspad/netadapter" - "github.com/kaspanet/kaspad/protocol/flows/relaytransactions" - "github.com/kaspanet/kaspad/util" - "github.com/kaspanet/kaspad/util/daghash" - "sync" - "time" + "github.com/kaspanet/kaspad/protocol/flowcontext" ) // Manager manages the p2p protocol type Manager struct { - cfg *config.Config - netAdapter *netadapter.NetAdapter - txPool *mempool.TxPool - addedTransactions []*util.Tx - dag *blockdag.BlockDAG - addressManager *addrmgr.AddrManager - - transactionsToRebroadcastLock sync.Mutex - transactionsToRebroadcast map[daghash.TxID]*util.Tx - lastRebroadcastTime time.Time - sharedRequestedTransactions *relaytransactions.SharedRequestedTransactions - - isInIBD uint32 // TODO(libp2p) populate this var + context *flowcontext.FlowContext } // NewManager creates a new instance of the p2p protocol manager @@ -40,11 +24,7 @@ func NewManager(cfg *config.Config, dag *blockdag.BlockDAG, } manager := Manager{ - netAdapter: netAdapter, - dag: dag, - addressManager: addressManager, - txPool: txPool, - sharedRequestedTransactions: relaytransactions.NewSharedRequestedTransactions(), + context: flowcontext.New(cfg, dag, addressManager, txPool, netAdapter), } netAdapter.SetRouterInitializer(manager.routerInitializer) return &manager, nil @@ -52,10 +32,10 @@ func NewManager(cfg *config.Config, dag *blockdag.BlockDAG, // Start starts the p2p protocol func (m *Manager) Start() error { - return m.netAdapter.Start() + return m.context.NetAdapter().Start() } // Stop stops the p2p protocol func (m *Manager) Stop() error { - return m.netAdapter.Stop() + return m.context.NetAdapter().Stop() } diff --git a/protocol/peer/peer.go b/protocol/peer/peer.go index ed85256a7..d7669c17a 100644 --- a/protocol/peer/peer.go +++ b/protocol/peer/peer.go @@ -11,7 +11,6 @@ import ( "github.com/kaspanet/kaspad/util/mstime" "github.com/kaspanet/kaspad/util/subnetworkid" "github.com/kaspanet/kaspad/wire" - "github.com/pkg/errors" ) // Peer holds data about a peer. @@ -118,49 +117,6 @@ func (p *Peer) String() string { panic("unimplemented") } -var ( - readyPeers = make(map[*id.ID]*Peer, 0) - readyPeersMutex sync.RWMutex -) - -// ErrPeerWithSameIDExists signifies that a peer with the same ID already exist. -var ErrPeerWithSameIDExists = errors.New("ready with the same ID already exists") - -// AddToReadyPeers marks this peer as ready and adds it to the ready peers list. -func AddToReadyPeers(peer *Peer) error { - readyPeersMutex.RLock() - defer readyPeersMutex.RUnlock() - - if _, ok := readyPeers[peer.id]; ok { - return errors.Wrapf(ErrPeerWithSameIDExists, "peer with ID %s already exists", peer.id) - } - - readyPeers[peer.id] = peer - return nil -} - -// ReadyPeerIDs returns the peer IDs of all the ready peers. -func ReadyPeerIDs() []*id.ID { - readyPeersMutex.RLock() - defer readyPeersMutex.RUnlock() - peerIDs := make([]*id.ID, len(readyPeers)) - i := 0 - for peerID := range readyPeers { - peerIDs[i] = peerID - i++ - } - return peerIDs -} - -// ReadyPeers returns a copy of the currently ready peers -func ReadyPeers() []*Peer { - peers := make([]*Peer, 0, len(readyPeers)) - for _, readyPeer := range readyPeers { - peers = append(peers, readyPeer) - } - return peers -} - // RequestSelectedTipIfRequired notifies the peer that requesting // a selected tip is required. This triggers the selected tip // request flow. diff --git a/protocol/protocol.go b/protocol/protocol.go index 1f6e8da7c..68c0443e7 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -30,14 +30,14 @@ func (m *Manager) routerInitializer() (*routerpkg.Router, error) { // TODO(libp2p) Ban peer panic("unimplemented") } - err = m.netAdapter.DisconnectAssociatedConnection(router) + err = m.context.NetAdapter().DisconnectAssociatedConnection(router) if err != nil { panic(err) } return } if errors.Is(err, routerpkg.ErrTimeout) { - err = m.netAdapter.DisconnectAssociatedConnection(router) + err = m.context.NetAdapter().DisconnectAssociatedConnection(router) if err != nil { panic(err) } @@ -57,7 +57,7 @@ func (m *Manager) startFlows(router *routerpkg.Router) error { stop := make(chan error) stopped := uint32(0) - peer, closed, err := handshake.HandleHandshake(m.cfg, router, m.netAdapter, m.dag, m.addressManager) + peer, closed, err := handshake.HandleHandshake(m.context, router) if err != nil { return err } @@ -82,13 +82,13 @@ func (m *Manager) addAddressFlows(router *routerpkg.Router, stopped *uint32, sto addOneTimeFlow("SendAddresses", router, []wire.MessageCommand{wire.CmdGetAddresses}, stopped, stop, func(incomingRoute *routerpkg.Route) error { - return addressexchange.SendAddresses(incomingRoute, outgoingRoute, m.addressManager) + return addressexchange.SendAddresses(m.context, incomingRoute, outgoingRoute) }, ) addOneTimeFlow("ReceiveAddresses", router, []wire.MessageCommand{wire.CmdAddress}, stopped, stop, func(incomingRoute *routerpkg.Route) error { - return addressexchange.ReceiveAddresses(incomingRoute, outgoingRoute, m.cfg, peer, m.addressManager) + return addressexchange.ReceiveAddresses(m.context, incomingRoute, outgoingRoute, peer) }, ) } @@ -99,14 +99,14 @@ func (m *Manager) addBlockRelayFlows(router *routerpkg.Router, stopped *uint32, addFlow("HandleRelayInvs", router, []wire.MessageCommand{wire.CmdInvRelayBlock, wire.CmdBlock}, stopped, stop, func(incomingRoute *routerpkg.Route) error { - return blockrelay.HandleRelayInvs(incomingRoute, - outgoingRoute, peer, m.netAdapter, m.dag, m.OnNewBlock) + return blockrelay.HandleRelayInvs(m.context, incomingRoute, + outgoingRoute, peer) }, ) addFlow("HandleRelayBlockRequests", router, []wire.MessageCommand{wire.CmdGetRelayBlocks}, stopped, stop, func(incomingRoute *routerpkg.Route) error { - return blockrelay.HandleRelayBlockRequests(incomingRoute, outgoingRoute, peer, m.dag) + return blockrelay.HandleRelayBlockRequests(m.context, incomingRoute, outgoingRoute, peer) }, ) } @@ -116,13 +116,13 @@ func (m *Manager) addPingFlows(router *routerpkg.Router, stopped *uint32, stop c addFlow("ReceivePings", router, []wire.MessageCommand{wire.CmdPing}, stopped, stop, func(incomingRoute *routerpkg.Route) error { - return ping.ReceivePings(incomingRoute, outgoingRoute) + return ping.ReceivePings(m.context, incomingRoute, outgoingRoute) }, ) addFlow("SendPings", router, []wire.MessageCommand{wire.CmdPong}, stopped, stop, func(incomingRoute *routerpkg.Route) error { - return ping.SendPings(incomingRoute, outgoingRoute, peer) + return ping.SendPings(m.context, incomingRoute, outgoingRoute, peer) }, ) } @@ -134,31 +134,31 @@ func (m *Manager) addIBDFlows(router *routerpkg.Router, stopped *uint32, stop ch addFlow("HandleIBD", router, []wire.MessageCommand{wire.CmdBlockLocator, wire.CmdIBDBlock}, stopped, stop, func(incomingRoute *routerpkg.Route) error { - return ibd.HandleIBD(incomingRoute, outgoingRoute, peer, m.dag, m.OnNewBlock) + return ibd.HandleIBD(m.context, incomingRoute, outgoingRoute, peer) }, ) addFlow("RequestSelectedTip", router, []wire.MessageCommand{wire.CmdSelectedTip}, stopped, stop, func(incomingRoute *routerpkg.Route) error { - return ibd.RequestSelectedTip(incomingRoute, outgoingRoute, peer, m.dag) + return ibd.RequestSelectedTip(m.context, incomingRoute, outgoingRoute, peer) }, ) addFlow("HandleGetSelectedTip", router, []wire.MessageCommand{wire.CmdGetSelectedTip}, stopped, stop, func(incomingRoute *routerpkg.Route) error { - return ibd.HandleGetSelectedTip(incomingRoute, outgoingRoute, m.dag) + return ibd.HandleGetSelectedTip(m.context, incomingRoute, outgoingRoute) }, ) addFlow("HandleGetBlockLocator", router, []wire.MessageCommand{wire.CmdGetBlockLocator}, stopped, stop, func(incomingRoute *routerpkg.Route) error { - return ibd.HandleGetBlockLocator(incomingRoute, outgoingRoute, m.dag) + return ibd.HandleGetBlockLocator(m.context, incomingRoute, outgoingRoute) }, ) addFlow("HandleGetBlocks", router, []wire.MessageCommand{wire.CmdGetBlocks}, stopped, stop, func(incomingRoute *routerpkg.Route) error { - return ibd.HandleGetBlocks(incomingRoute, outgoingRoute, m.dag) + return ibd.HandleGetBlocks(m.context, incomingRoute, outgoingRoute) }, ) } @@ -169,8 +169,7 @@ func (m *Manager) addTransactionRelayFlow(router *routerpkg.Router, stopped *uin addFlow("HandleRelayedTransactions", router, []wire.MessageCommand{wire.CmdInv, wire.CmdTx}, stopped, stop, func(incomingRoute *routerpkg.Route) error { - return relaytransactions.HandleRelayedTransactions(incomingRoute, outgoingRoute, m.netAdapter, m.dag, - m.txPool, m.sharedRequestedTransactions) + return relaytransactions.HandleRelayedTransactions(m.context, incomingRoute, outgoingRoute) }, ) } diff --git a/protocol/transactions.go b/protocol/transactions.go deleted file mode 100644 index 99f69aaa2..000000000 --- a/protocol/transactions.go +++ /dev/null @@ -1,58 +0,0 @@ -package protocol - -import ( - "github.com/kaspanet/kaspad/protocol/peer" - "github.com/kaspanet/kaspad/util" - "github.com/kaspanet/kaspad/util/daghash" - "github.com/kaspanet/kaspad/wire" - "github.com/pkg/errors" - "time" -) - -// AddTransaction adds transaction to the mempool and propagates it. -func (m *Manager) AddTransaction(tx *util.Tx) error { - m.transactionsToRebroadcastLock.Lock() - defer m.transactionsToRebroadcastLock.Unlock() - - transactionsAcceptedToMempool, err := m.txPool.ProcessTransaction(tx, false, 0) - if err != nil { - return err - } - - if len(transactionsAcceptedToMempool) > 1 { - panic(errors.New("got more than one accepted transactions when no orphans were allowed")) - } - - m.transactionsToRebroadcast[*tx.ID()] = tx - inv := wire.NewMsgTxInv([]*daghash.TxID{tx.ID()}) - return m.netAdapter.Broadcast(peer.ReadyPeerIDs(), inv) -} - -func (m *Manager) updateTransactionsToRebroadcast(block *util.Block) { - m.transactionsToRebroadcastLock.Lock() - defer m.transactionsToRebroadcastLock.Unlock() - // Note: if the block is red, its transactions won't be rebroadcasted - // anymore, although they are not included in the UTXO set. - // This is probably ok, since red blocks are quite rare. - for _, tx := range block.Transactions() { - delete(m.transactionsToRebroadcast, *tx.ID()) - } -} - -func (m *Manager) shouldRebroadcastTransactions() bool { - const rebroadcastInterval = 30 * time.Second - return time.Since(m.lastRebroadcastTime) > rebroadcastInterval -} - -func (m *Manager) txIDsToRebroadcast() []*daghash.TxID { - m.transactionsToRebroadcastLock.Lock() - defer m.transactionsToRebroadcastLock.Unlock() - - txIDs := make([]*daghash.TxID, len(m.transactionsToRebroadcast)) - i := 0 - for _, tx := range m.transactionsToRebroadcast { - txIDs[i] = tx.ID() - i++ - } - return txIDs -}