diff --git a/protocol/flows/blockrelay/handle_relay_invs.go b/protocol/flows/blockrelay/handle_relay_invs.go index 5bfe9481e..ff9a25dd0 100644 --- a/protocol/flows/blockrelay/handle_relay_invs.go +++ b/protocol/flows/blockrelay/handle_relay_invs.go @@ -26,28 +26,45 @@ type RelayInvsContext interface { Broadcast(message wire.Message) error } +type handleRelayInvsFlow struct { + RelayInvsContext + incomingRoute, outgoingRoute *router.Route + peer *peerpkg.Peer + invsQueue []*wire.MsgInvRelayBlock +} + // HandleRelayInvs listens to wire.MsgInvRelayBlock messages, requests their corresponding blocks if they // are missing, adds them to the DAG and propagates them to the rest of the network. func HandleRelayInvs(context RelayInvsContext, incomingRoute *router.Route, outgoingRoute *router.Route, peer *peerpkg.Peer) error { - invsQueue := make([]*wire.MsgInvRelayBlock, 0) + flow := &handleRelayInvsFlow{ + RelayInvsContext: context, + incomingRoute: incomingRoute, + outgoingRoute: outgoingRoute, + peer: peer, + invsQueue: make([]*wire.MsgInvRelayBlock, 0), + } + return flow.start() +} + +func (flow *handleRelayInvsFlow) start() error { for { - inv, err := readInv(incomingRoute, &invsQueue) + inv, err := flow.readInv() if err != nil { return err } - if context.DAG().IsKnownBlock(inv.Hash) { - if context.DAG().IsKnownInvalid(inv.Hash) { + if flow.DAG().IsKnownBlock(inv.Hash) { + if flow.DAG().IsKnownInvalid(inv.Hash) { return protocolerrors.Errorf(true, "sent inv of an invalid block %s", inv.Hash) } continue } - context.StartIBDIfRequired() - if context.IsInIBD() { + flow.StartIBDIfRequired() + if flow.IsInIBD() { // Block relay is disabled during IBD continue } @@ -56,7 +73,7 @@ func HandleRelayInvs(context RelayInvsContext, incomingRoute *router.Route, outg requestQueue.enqueueIfNotExists(inv.Hash) for requestQueue.len() > 0 { - err := requestBlocks(context, outgoingRoute, peer, incomingRoute, &invsQueue, requestQueue) + err := flow.requestBlocks(requestQueue) if err != nil { return err } @@ -64,30 +81,28 @@ func HandleRelayInvs(context RelayInvsContext, incomingRoute *router.Route, outg } } -func readInv(incomingRoute *router.Route, invsQueue *[]*wire.MsgInvRelayBlock) (*wire.MsgInvRelayBlock, error) { +func (flow *handleRelayInvsFlow) readInv() (*wire.MsgInvRelayBlock, error) { - if len(*invsQueue) > 0 { + if len(flow.invsQueue) > 0 { var inv *wire.MsgInvRelayBlock - inv, *invsQueue = (*invsQueue)[0], (*invsQueue)[1:] + inv, flow.invsQueue = flow.invsQueue[0], flow.invsQueue[1:] return inv, nil } - msg, err := incomingRoute.Dequeue() + msg, err := flow.incomingRoute.Dequeue() if err != nil { return nil, err } inv, ok := msg.(*wire.MsgInvRelayBlock) if !ok { - return nil, protocolerrors.Errorf(true, "unexpected %s message in the block relay flow while "+ + return nil, protocolerrors.Errorf(true, "unexpected %s message in the block relay handleRelayInvsFlow while "+ "expecting an inv message", msg.Command()) } return inv, nil } -func requestBlocks(context RelayInvsContext, outgoingRoute *router.Route, - peer *peerpkg.Peer, incomingRoute *router.Route, - invsQueue *[]*wire.MsgInvRelayBlock, requestQueue *hashesQueueSet) error { +func (flow *handleRelayInvsFlow) requestBlocks(requestQueue *hashesQueueSet) error { numHashesToRequest := mathUtil.MinInt(wire.MsgGetRelayBlocksHashes, requestQueue.len()) hashesToRequest := requestQueue.dequeue(numHashesToRequest) @@ -95,7 +110,7 @@ func requestBlocks(context RelayInvsContext, outgoingRoute *router.Route, pendingBlocks := map[daghash.Hash]struct{}{} var filteredHashesToRequest []*daghash.Hash for _, hash := range hashesToRequest { - exists := context.SharedRequestedBlocks().addIfNotExists(hash) + exists := flow.SharedRequestedBlocks().addIfNotExists(hash) if !exists { continue } @@ -106,16 +121,16 @@ func requestBlocks(context RelayInvsContext, outgoingRoute *router.Route, // In case the function returns earlier than expected, we want to make sure requestedBlocks is // clean from any pending blocks. - defer context.SharedRequestedBlocks().removeSet(pendingBlocks) + defer flow.SharedRequestedBlocks().removeSet(pendingBlocks) getRelayBlocksMsg := wire.NewMsgGetRelayBlocks(filteredHashesToRequest) - err := outgoingRoute.Enqueue(getRelayBlocksMsg) + err := flow.outgoingRoute.Enqueue(getRelayBlocksMsg) if err != nil { return err } for len(pendingBlocks) > 0 { - msgBlock, err := readMsgBlock(incomingRoute, invsQueue) + msgBlock, err := flow.readMsgBlock() if err != nil { return err } @@ -126,9 +141,9 @@ func requestBlocks(context RelayInvsContext, outgoingRoute *router.Route, return protocolerrors.Errorf(true, "got unrequested block %s", block.Hash()) } delete(pendingBlocks, *blockHash) - context.SharedRequestedBlocks().remove(blockHash) + flow.SharedRequestedBlocks().remove(blockHash) - err = processAndRelayBlock(context, peer, requestQueue, block) + err = flow.processAndRelayBlock(requestQueue, block) if err != nil { return err } @@ -139,18 +154,18 @@ func requestBlocks(context RelayInvsContext, outgoingRoute *router.Route, // readMsgBlock returns the next msgBlock in msgChan, and populates invsQueue with any inv messages that meanwhile arrive. // // Note: this function assumes msgChan can contain only wire.MsgInvRelayBlock and wire.MsgBlock messages. -func readMsgBlock(incomingRoute *router.Route, invsQueue *[]*wire.MsgInvRelayBlock) ( +func (flow *handleRelayInvsFlow) readMsgBlock() ( msgBlock *wire.MsgBlock, err error) { for { - message, err := incomingRoute.DequeueWithTimeout(common.DefaultTimeout) + message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout) if err != nil { return nil, err } switch message := message.(type) { case *wire.MsgInvRelayBlock: - *invsQueue = append(*invsQueue, message) + flow.invsQueue = append(flow.invsQueue, message) case *wire.MsgBlock: return message, nil default: @@ -159,11 +174,10 @@ func readMsgBlock(incomingRoute *router.Route, invsQueue *[]*wire.MsgInvRelayBlo } } -func processAndRelayBlock(context RelayInvsContext, peer *peerpkg.Peer, - requestQueue *hashesQueueSet, block *util.Block) error { +func (flow *handleRelayInvsFlow) processAndRelayBlock(requestQueue *hashesQueueSet, block *util.Block) error { blockHash := block.Hash() - isOrphan, isDelayed, err := context.DAG().ProcessBlock(block, blockdag.BFNone) + isOrphan, isDelayed, err := flow.DAG().ProcessBlock(block, blockdag.BFNone) if err != nil { // When the error is a rule error, it means the block was simply // rejected as opposed to something actually going wrong, so log @@ -173,7 +187,7 @@ func processAndRelayBlock(context RelayInvsContext, peer *peerpkg.Peer, blockHash)) } log.Infof("Rejected block %s from %s: %s", blockHash, - peer, err) + flow.peer, err) return protocolerrors.Wrap(true, err, "got invalid block") } @@ -190,7 +204,7 @@ func processAndRelayBlock(context RelayInvsContext, peer *peerpkg.Peer, } const maxOrphanBlueScoreDiff = 10000 - selectedTipBlueScore := context.DAG().SelectedTipBlueScore() + selectedTipBlueScore := flow.DAG().SelectedTipBlueScore() if blueScore > selectedTipBlueScore+maxOrphanBlueScoreDiff { log.Infof("Orphan block %s has blue score %d and the selected tip blue score is "+ "%d. Ignoring orphans with a blue score difference from the selected tip greater than %d", @@ -199,7 +213,7 @@ func processAndRelayBlock(context RelayInvsContext, peer *peerpkg.Peer, } // Request the parents for the orphan block from the peer that sent it. - missingAncestors := context.DAG().GetOrphanMissingAncestorHashes(blockHash) + missingAncestors := flow.DAG().GetOrphanMissingAncestorHashes(blockHash) for _, missingAncestor := range missingAncestors { requestQueue.enqueueIfNotExists(missingAncestor) } @@ -215,13 +229,13 @@ func processAndRelayBlock(context RelayInvsContext, peer *peerpkg.Peer, // sm.restartSyncIfNeeded() //// Clear the rejected transactions. //sm.rejectedTxns = make(map[daghash.TxID]struct{}) - err = context.Broadcast(wire.NewMsgInvBlock(blockHash)) + err = flow.Broadcast(wire.NewMsgInvBlock(blockHash)) if err != nil { return err } - context.StartIBDIfRequired() - err = context.OnNewBlock(block) + flow.StartIBDIfRequired() + err = flow.OnNewBlock(block) if err != nil { panic(err) } diff --git a/protocol/flows/handshake/receiveversion.go b/protocol/flows/handshake/receiveversion.go index 419000a9a..a17e5f69e 100644 --- a/protocol/flows/handshake/receiveversion.go +++ b/protocol/flows/handshake/receiveversion.go @@ -19,12 +19,29 @@ var ( minAcceptableProtocolVersion = wire.ProtocolVersion ) +type receiveVersionFlow struct { + HandleHandshakeContext + incomingRoute, outgoingRoute *router.Route + peer *peerpkg.Peer +} + // ReceiveVersion waits for the peer to send a version message, sends a // verack in response, and updates its info accordingly. func ReceiveVersion(context HandleHandshakeContext, incomingRoute *router.Route, outgoingRoute *router.Route, peer *peerpkg.Peer) (*wire.NetAddress, error) { - message, err := incomingRoute.DequeueWithTimeout(common.DefaultTimeout) + flow := &receiveVersionFlow{ + HandleHandshakeContext: context, + incomingRoute: incomingRoute, + outgoingRoute: outgoingRoute, + peer: peer, + } + + return flow.start() +} + +func (flow *receiveVersionFlow) start() (*wire.NetAddress, error) { + message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout) if err != nil { return nil, err } @@ -34,7 +51,7 @@ func ReceiveVersion(context HandleHandshakeContext, incomingRoute *router.Route, return nil, protocolerrors.New(true, "a version message must precede all others") } - if !allowSelfConnections && context.NetAdapter().ID().IsEqual(msgVersion.ID) { + if !allowSelfConnections && flow.NetAdapter().ID().IsEqual(msgVersion.ID) { return nil, protocolerrors.New(true, "connected to self") } @@ -51,7 +68,7 @@ func ReceiveVersion(context HandleHandshakeContext, incomingRoute *router.Route, } // Disconnect from partial nodes in networks that don't allow them - if !context.DAG().Params.EnableNonNativeSubnetworks && msgVersion.SubnetworkID != nil { + if !flow.DAG().Params.EnableNonNativeSubnetworks && msgVersion.SubnetworkID != nil { return nil, protocolerrors.New(true, "partial nodes are not allowed") } @@ -68,11 +85,10 @@ func ReceiveVersion(context HandleHandshakeContext, incomingRoute *router.Route, // return nil, false, errors.New("incompatible subnetworks") //} - peer.UpdateFieldsFromMsgVersion(msgVersion) - err = outgoingRoute.Enqueue(wire.NewMsgVerAck()) + flow.peer.UpdateFieldsFromMsgVersion(msgVersion) + err = flow.outgoingRoute.Enqueue(wire.NewMsgVerAck()) if err != nil { return nil, err } - // TODO(libp2p) Register peer ID return msgVersion.Address, nil } diff --git a/protocol/flows/handshake/sendversion.go b/protocol/flows/handshake/sendversion.go index 4eaa3a39c..0f0049eb7 100644 --- a/protocol/flows/handshake/sendversion.go +++ b/protocol/flows/handshake/sendversion.go @@ -25,19 +25,32 @@ var ( defaultRequiredServices = wire.SFNodeNetwork ) +type sendVersionFlow struct { + HandleHandshakeContext + incomingRoute, outgoingRoute *router.Route +} + // SendVersion sends a version to a peer and waits for verack. func SendVersion(context HandleHandshakeContext, incomingRoute *router.Route, outgoingRoute *router.Route) error { + flow := &sendVersionFlow{ + HandleHandshakeContext: context, + incomingRoute: incomingRoute, + outgoingRoute: outgoingRoute, + } + return flow.start() +} - selectedTipHash := context.DAG().SelectedTipHash() - subnetworkID := context.Config().SubnetworkID +func (flow *sendVersionFlow) start() error { + selectedTipHash := flow.DAG().SelectedTipHash() + subnetworkID := flow.Config().SubnetworkID // Version message. - localAddress, err := context.NetAdapter().GetBestLocalAddress() + localAddress, err := flow.NetAdapter().GetBestLocalAddress() if err != nil { panic(err) } - msg := wire.NewMsgVersion(localAddress, context.NetAdapter().ID(), selectedTipHash, subnetworkID) - msg.AddUserAgent(userAgentName, userAgentVersion, context.Config().UserAgentComments...) + msg := wire.NewMsgVersion(localAddress, flow.NetAdapter().ID(), selectedTipHash, subnetworkID) + msg.AddUserAgent(userAgentName, userAgentVersion, flow.Config().UserAgentComments...) // Advertise the services flag msg.Services = defaultServices @@ -46,15 +59,15 @@ func SendVersion(context HandleHandshakeContext, incomingRoute *router.Route, ou msg.ProtocolVersion = wire.ProtocolVersion // Advertise if inv messages for transactions are desired. - msg.DisableRelayTx = context.Config().BlocksOnly + msg.DisableRelayTx = flow.Config().BlocksOnly - err = outgoingRoute.Enqueue(msg) + err = flow.outgoingRoute.Enqueue(msg) if err != nil { return err } // Wait for verack - _, err = incomingRoute.DequeueWithTimeout(common.DefaultTimeout) + _, err = flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout) if err != nil { return err } diff --git a/protocol/flows/ibd/handle_get_block_locator.go b/protocol/flows/ibd/handle_get_block_locator.go index 324d3b9cc..c31f38adc 100644 --- a/protocol/flows/ibd/handle_get_block_locator.go +++ b/protocol/flows/ibd/handle_get_block_locator.go @@ -13,33 +13,47 @@ type GetBlockLocatorContext interface { DAG() *blockdag.BlockDAG } +type handleGetBlockLocatorFlow struct { + GetBlockLocatorContext + incomingRoute, outgoingRoute *router.Route +} + // HandleGetBlockLocator handles getBlockLocator messages func HandleGetBlockLocator(context GetBlockLocatorContext, incomingRoute *router.Route, outgoingRoute *router.Route) error { + flow := &handleGetBlockLocatorFlow{ + GetBlockLocatorContext: context, + incomingRoute: incomingRoute, + outgoingRoute: outgoingRoute, + } + return flow.start() +} + +func (flow *handleGetBlockLocatorFlow) start() error { for { - lowHash, highHash, err := receiveGetBlockLocator(incomingRoute) + lowHash, highHash, err := flow.receiveGetBlockLocator() if err != nil { return err } - locator, err := context.DAG().BlockLocatorFromHashes(highHash, lowHash) + locator, err := flow.DAG().BlockLocatorFromHashes(highHash, lowHash) if err != nil || len(locator) == 0 { return protocolerrors.Errorf(true, "couldn't build a block "+ "locator between blocks %s and %s", lowHash, highHash) } - err = sendBlockLocator(outgoingRoute, locator) + err = flow.sendBlockLocator(locator) if err != nil { return err } } } -func receiveGetBlockLocator(incomingRoute *router.Route) (lowHash *daghash.Hash, +func (flow *handleGetBlockLocatorFlow) receiveGetBlockLocator() (lowHash *daghash.Hash, highHash *daghash.Hash, err error) { - message, err := incomingRoute.Dequeue() + message, err := flow.incomingRoute.Dequeue() if err != nil { return nil, nil, err } @@ -48,9 +62,9 @@ func receiveGetBlockLocator(incomingRoute *router.Route) (lowHash *daghash.Hash, return msgGetBlockLocator.LowHash, msgGetBlockLocator.HighHash, nil } -func sendBlockLocator(outgoingRoute *router.Route, locator blockdag.BlockLocator) error { +func (flow *handleGetBlockLocatorFlow) sendBlockLocator(locator blockdag.BlockLocator) error { msgBlockLocator := wire.NewMsgBlockLocator(locator) - err := outgoingRoute.Enqueue(msgBlockLocator) + err := flow.outgoingRoute.Enqueue(msgBlockLocator) if err != nil { return err } diff --git a/protocol/flows/ibd/handle_get_blocks.go b/protocol/flows/ibd/handle_get_blocks.go index c37832ddd..575cb50f0 100644 --- a/protocol/flows/ibd/handle_get_blocks.go +++ b/protocol/flows/ibd/handle_get_blocks.go @@ -12,20 +12,34 @@ type GetBlocksContext interface { DAG() *blockdag.BlockDAG } +type handleGetBlocksFlow struct { + GetBlocksContext + incomingRoute, outgoingRoute *router.Route +} + // HandleGetBlocks handles getBlocks messages func HandleGetBlocks(context GetBlocksContext, incomingRoute *router.Route, outgoingRoute *router.Route) error { + flow := &handleGetBlocksFlow{ + GetBlocksContext: context, + incomingRoute: incomingRoute, + outgoingRoute: outgoingRoute, + } + return flow.start() +} + +func (flow *handleGetBlocksFlow) start() error { for { - lowHash, highHash, err := receiveGetBlocks(incomingRoute) + lowHash, highHash, err := receiveGetBlocks(flow.incomingRoute) if err != nil { return err } - msgIBDBlocks, err := buildMsgIBDBlocks(context, lowHash, highHash) + msgIBDBlocks, err := flow.buildMsgIBDBlocks(lowHash, highHash) if err != nil { return err } - err = sendMsgIBDBlocks(outgoingRoute, msgIBDBlocks) + err = flow.sendMsgIBDBlocks(msgIBDBlocks) if err != nil { return nil } @@ -44,18 +58,18 @@ func receiveGetBlocks(incomingRoute *router.Route) (lowHash *daghash.Hash, return msgGetBlocks.LowHash, msgGetBlocks.HighHash, nil } -func buildMsgIBDBlocks(context GetBlocksContext, lowHash *daghash.Hash, +func (flow *handleGetBlocksFlow) buildMsgIBDBlocks(lowHash *daghash.Hash, highHash *daghash.Hash) ([]*wire.MsgIBDBlock, error) { const maxHashesInMsgIBDBlocks = wire.MaxInvPerMsg - blockHashes, err := context.DAG().AntiPastHashesBetween(lowHash, highHash, maxHashesInMsgIBDBlocks) + blockHashes, err := flow.DAG().AntiPastHashesBetween(lowHash, highHash, maxHashesInMsgIBDBlocks) if err != nil { return nil, err } msgIBDBlocks := make([]*wire.MsgIBDBlock, len(blockHashes)) for i, blockHash := range blockHashes { - block, err := context.DAG().BlockByHash(blockHash) + block, err := flow.DAG().BlockByHash(blockHash) if err != nil { return nil, err } @@ -65,9 +79,9 @@ func buildMsgIBDBlocks(context GetBlocksContext, lowHash *daghash.Hash, return msgIBDBlocks, nil } -func sendMsgIBDBlocks(outgoingRoute *router.Route, msgIBDBlocks []*wire.MsgIBDBlock) error { +func (flow *handleGetBlocksFlow) sendMsgIBDBlocks(msgIBDBlocks []*wire.MsgIBDBlock) error { for _, msgIBDBlock := range msgIBDBlocks { - err := outgoingRoute.Enqueue(msgIBDBlock) + err := flow.outgoingRoute.Enqueue(msgIBDBlock) if err != nil { return err } diff --git a/protocol/flows/ibd/ibd.go b/protocol/flows/ibd/ibd.go index 9a665a53c..691c6a72c 100644 --- a/protocol/flows/ibd/ibd.go +++ b/protocol/flows/ibd/ibd.go @@ -19,49 +19,65 @@ type HandleIBDContext interface { FinishIBD() } -// HandleIBD waits for IBD start and handles it when IBD is triggered for this peer -func HandleIBD(context HandleIBDContext, incomingRoute *router.Route, outgoingRoute *router.Route, peer *peerpkg.Peer) error { +type handleIBDFlow struct { + HandleIBDContext + incomingRoute, outgoingRoute *router.Route + peer *peerpkg.Peer +} +// HandleIBD waits for IBD start and handles it when IBD is triggered for this peer +func HandleIBD(context HandleIBDContext, incomingRoute *router.Route, outgoingRoute *router.Route, + peer *peerpkg.Peer) error { + + flow := &handleIBDFlow{ + HandleIBDContext: context, + incomingRoute: incomingRoute, + outgoingRoute: outgoingRoute, + peer: peer, + } + return flow.start() +} + +func (flow *handleIBDFlow) start() error { for { - err := runIBD(context, incomingRoute, outgoingRoute, peer) + err := flow.runIBD() if err != nil { return err } } } -func runIBD(context HandleIBDContext, incomingRoute *router.Route, outgoingRoute *router.Route, peer *peerpkg.Peer) error { +func (flow *handleIBDFlow) runIBD() error { + flow.peer.WaitForIBDStart() + defer flow.FinishIBD() - peer.WaitForIBDStart() - defer context.FinishIBD() - - peerSelectedTipHash := peer.SelectedTipHash() - highestSharedBlockHash, err := findHighestSharedBlockHash(context, incomingRoute, outgoingRoute, peerSelectedTipHash) + peerSelectedTipHash := flow.peer.SelectedTipHash() + highestSharedBlockHash, err := flow.findHighestSharedBlockHash(peerSelectedTipHash) if err != nil { return err } - if context.DAG().IsKnownFinalizedBlock(highestSharedBlockHash) { + if flow.DAG().IsKnownFinalizedBlock(highestSharedBlockHash) { return protocolerrors.Errorf(false, "cannot initiate "+ "IBD with peer %s because the highest shared chain block (%s) is "+ - "below the finality point", peer, highestSharedBlockHash) + "below the finality point", flow.peer, highestSharedBlockHash) } - return downloadBlocks(context, incomingRoute, outgoingRoute, highestSharedBlockHash, peerSelectedTipHash) + return flow.downloadBlocks(highestSharedBlockHash, peerSelectedTipHash) } -func findHighestSharedBlockHash(context HandleIBDContext, incomingRoute *router.Route, outgoingRoute *router.Route, - peerSelectedTipHash *daghash.Hash) (lowHash *daghash.Hash, err error) { +func (flow *handleIBDFlow) findHighestSharedBlockHash(peerSelectedTipHash *daghash.Hash) (lowHash *daghash.Hash, + err error) { - lowHash = context.DAG().Params.GenesisHash + lowHash = flow.DAG().Params.GenesisHash highHash := peerSelectedTipHash for { - err := sendGetBlockLocator(outgoingRoute, lowHash, highHash) + err := flow.sendGetBlockLocator(lowHash, highHash) if err != nil { return nil, err } - blockLocatorHashes, err := receiveBlockLocator(incomingRoute) + blockLocatorHashes, err := flow.receiveBlockLocator() if err != nil { return nil, err } @@ -70,23 +86,22 @@ func findHighestSharedBlockHash(context HandleIBDContext, incomingRoute *router. // If it is, return it. If it isn't, we need to narrow our // getBlockLocator request and try again. locatorHighHash := blockLocatorHashes[0] - if context.DAG().IsInDAG(locatorHighHash) { + if flow.DAG().IsInDAG(locatorHighHash) { return locatorHighHash, nil } - highHash, lowHash = context.DAG().FindNextLocatorBoundaries(blockLocatorHashes) + highHash, lowHash = flow.DAG().FindNextLocatorBoundaries(blockLocatorHashes) } } -func sendGetBlockLocator(outgoingRoute *router.Route, lowHash *daghash.Hash, - highHash *daghash.Hash) error { +func (flow *handleIBDFlow) sendGetBlockLocator(lowHash *daghash.Hash, highHash *daghash.Hash) error { msgGetBlockLocator := wire.NewMsgGetBlockLocator(highHash, lowHash) - return outgoingRoute.Enqueue(msgGetBlockLocator) + return flow.outgoingRoute.Enqueue(msgGetBlockLocator) } -func receiveBlockLocator(incomingRoute *router.Route) (blockLocatorHashes []*daghash.Hash, err error) { - message, err := incomingRoute.DequeueWithTimeout(common.DefaultTimeout) +func (flow *handleIBDFlow) receiveBlockLocator() (blockLocatorHashes []*daghash.Hash, err error) { + message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout) if err != nil { return nil, err } @@ -99,21 +114,20 @@ func receiveBlockLocator(incomingRoute *router.Route) (blockLocatorHashes []*dag return msgBlockLocator.BlockLocatorHashes, nil } -func downloadBlocks(context HandleIBDContext, incomingRoute *router.Route, outgoingRoute *router.Route, - highestSharedBlockHash *daghash.Hash, +func (flow *handleIBDFlow) downloadBlocks(highestSharedBlockHash *daghash.Hash, peerSelectedTipHash *daghash.Hash) error { - err := sendGetBlocks(outgoingRoute, highestSharedBlockHash, peerSelectedTipHash) + err := flow.sendGetBlocks(highestSharedBlockHash, peerSelectedTipHash) if err != nil { return err } for { - msgIBDBlock, err := receiveIBDBlock(incomingRoute) + msgIBDBlock, err := flow.receiveIBDBlock() if err != nil { return err } - err = processIBDBlock(context, msgIBDBlock) + err = flow.processIBDBlock(msgIBDBlock) if err != nil { return err } @@ -123,15 +137,15 @@ func downloadBlocks(context HandleIBDContext, incomingRoute *router.Route, outgo } } -func sendGetBlocks(outgoingRoute *router.Route, highestSharedBlockHash *daghash.Hash, +func (flow *handleIBDFlow) sendGetBlocks(highestSharedBlockHash *daghash.Hash, peerSelectedTipHash *daghash.Hash) error { msgGetBlockInvs := wire.NewMsgGetBlocks(highestSharedBlockHash, peerSelectedTipHash) - return outgoingRoute.Enqueue(msgGetBlockInvs) + return flow.outgoingRoute.Enqueue(msgGetBlockInvs) } -func receiveIBDBlock(incomingRoute *router.Route) (msgIBDBlock *wire.MsgIBDBlock, err error) { - message, err := incomingRoute.DequeueWithTimeout(common.DefaultTimeout) +func (flow *handleIBDFlow) receiveIBDBlock() (msgIBDBlock *wire.MsgIBDBlock, err error) { + message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout) if err != nil { return nil, err } @@ -144,13 +158,13 @@ func receiveIBDBlock(incomingRoute *router.Route) (msgIBDBlock *wire.MsgIBDBlock return msgIBDBlock, nil } -func processIBDBlock(context HandleIBDContext, msgIBDBlock *wire.MsgIBDBlock) error { +func (flow *handleIBDFlow) processIBDBlock(msgIBDBlock *wire.MsgIBDBlock) error { block := util.NewBlock(&msgIBDBlock.MsgBlock) - if context.DAG().IsInDAG(block.Hash()) { + if flow.DAG().IsInDAG(block.Hash()) { return nil } - isOrphan, isDelayed, err := context.DAG().ProcessBlock(block, blockdag.BFNone) + isOrphan, isDelayed, err := flow.DAG().ProcessBlock(block, blockdag.BFNone) if err != nil { return err } @@ -162,7 +176,7 @@ func processIBDBlock(context HandleIBDContext, msgIBDBlock *wire.MsgIBDBlock) er return protocolerrors.Errorf(false, "received delayed block %s "+ "during IBD", block.Hash()) } - err = context.OnNewBlock(block) + err = flow.OnNewBlock(block) if err != nil { panic(err) } diff --git a/protocol/flows/ibd/selected_tip.go b/protocol/flows/ibd/selected_tip.go deleted file mode 100644 index d2b850721..000000000 --- a/protocol/flows/ibd/selected_tip.go +++ /dev/null @@ -1,104 +0,0 @@ -package ibd - -import ( - "github.com/kaspanet/kaspad/blockdag" - "github.com/kaspanet/kaspad/netadapter/router" - "github.com/kaspanet/kaspad/protocol/common" - peerpkg "github.com/kaspanet/kaspad/protocol/peer" - "github.com/kaspanet/kaspad/util/daghash" - "github.com/kaspanet/kaspad/wire" - "github.com/pkg/errors" -) - -// RequestSelectedTipContext is the interface for the context needed for the RequestSelectedTip flow. -type RequestSelectedTipContext interface { - DAG() *blockdag.BlockDAG - StartIBDIfRequired() -} - -// RequestSelectedTip waits for selected tip requests and handles them -func RequestSelectedTip(context RequestSelectedTipContext, incomingRoute *router.Route, - outgoingRoute *router.Route, peer *peerpkg.Peer) error { - for { - err := runSelectedTipRequest(context, incomingRoute, outgoingRoute, peer) - if err != nil { - return err - } - } -} - -func runSelectedTipRequest(context RequestSelectedTipContext, incomingRoute *router.Route, outgoingRoute *router.Route, - peer *peerpkg.Peer) error { - - peer.WaitForSelectedTipRequests() - defer peer.FinishRequestingSelectedTip() - - err := requestSelectedTip(outgoingRoute) - if err != nil { - return err - } - - peerSelectedTipHash, err := receiveSelectedTip(incomingRoute) - if err != nil { - return err - } - - peer.SetSelectedTipHash(peerSelectedTipHash) - context.StartIBDIfRequired() - return nil -} - -func requestSelectedTip(outgoingRoute *router.Route) error { - msgGetSelectedTip := wire.NewMsgGetSelectedTip() - return outgoingRoute.Enqueue(msgGetSelectedTip) -} - -func receiveSelectedTip(incomingRoute *router.Route) (selectedTipHash *daghash.Hash, err error) { - message, err := incomingRoute.DequeueWithTimeout(common.DefaultTimeout) - if err != nil { - return nil, err - } - msgSelectedTip := message.(*wire.MsgSelectedTip) - - return msgSelectedTip.SelectedTipHash, nil -} - -// GetSelectedTipContext is the interface for the context needed for the HandleGetSelectedTip flow. -type GetSelectedTipContext interface { - DAG() *blockdag.BlockDAG -} - -// HandleGetSelectedTip handles getSelectedTip messages -func HandleGetSelectedTip(context GetSelectedTipContext, incomingRoute *router.Route, outgoingRoute *router.Route) error { - for { - err := receiveGetSelectedTip(incomingRoute) - if err != nil { - return err - } - - selectedTipHash := context.DAG().SelectedTipHash() - err = sendSelectedTipHash(outgoingRoute, selectedTipHash) - if err != nil { - return err - } - } -} - -func receiveGetSelectedTip(incomingRoute *router.Route) error { - message, err := incomingRoute.Dequeue() - if err != nil { - return err - } - _, ok := message.(*wire.MsgGetSelectedTip) - if !ok { - panic(errors.Errorf("received unexpected message type. "+ - "expected: %s, got: %s", wire.CmdGetSelectedTip, message.Command())) - } - - return nil -} - -func sendSelectedTipHash(outgoingRoute *router.Route, selectedTipHash *daghash.Hash) error { - msgSelectedTip := wire.NewMsgSelectedTip(selectedTipHash) - return outgoingRoute.Enqueue(msgSelectedTip) -} diff --git a/protocol/flows/ibd/selectedtip/handle_get_selected_tip.go b/protocol/flows/ibd/selectedtip/handle_get_selected_tip.go new file mode 100644 index 000000000..3c13044f2 --- /dev/null +++ b/protocol/flows/ibd/selectedtip/handle_get_selected_tip.go @@ -0,0 +1,61 @@ +package selectedtip + +import ( + "github.com/kaspanet/kaspad/blockdag" + "github.com/kaspanet/kaspad/netadapter/router" + "github.com/kaspanet/kaspad/wire" + "github.com/pkg/errors" +) + +// GetSelectedTipContext is the interface for the context needed for the HandleGetSelectedTip flow. +type GetSelectedTipContext interface { + DAG() *blockdag.BlockDAG +} + +type handleGetSelectedTipFlow struct { + GetSelectedTipContext + incomingRoute, outgoingRoute *router.Route +} + +// HandleGetSelectedTip handles getSelectedTip messages +func HandleGetSelectedTip(context GetSelectedTipContext, incomingRoute *router.Route, outgoingRoute *router.Route) error { + flow := &handleGetSelectedTipFlow{ + GetSelectedTipContext: context, + incomingRoute: incomingRoute, + outgoingRoute: outgoingRoute, + } + return flow.start() +} + +func (flow *handleGetSelectedTipFlow) start() error { + for { + err := flow.receiveGetSelectedTip() + if err != nil { + return err + } + + err = flow.sendSelectedTipHash() + if err != nil { + return err + } + } +} + +func (flow *handleGetSelectedTipFlow) receiveGetSelectedTip() error { + message, err := flow.incomingRoute.Dequeue() + if err != nil { + return err + } + _, ok := message.(*wire.MsgGetSelectedTip) + if !ok { + panic(errors.Errorf("received unexpected message type. "+ + "expected: %s, got: %s", wire.CmdGetSelectedTip, message.Command())) + } + + return nil +} + +func (flow *handleGetSelectedTipFlow) sendSelectedTipHash() error { + msgSelectedTip := wire.NewMsgSelectedTip(flow.DAG().SelectedTipHash()) + return flow.outgoingRoute.Enqueue(msgSelectedTip) +} diff --git a/protocol/flows/ibd/selectedtip/request_selected_tip.go b/protocol/flows/ibd/selectedtip/request_selected_tip.go new file mode 100644 index 000000000..d75853894 --- /dev/null +++ b/protocol/flows/ibd/selectedtip/request_selected_tip.go @@ -0,0 +1,79 @@ +package selectedtip + +import ( + "github.com/kaspanet/kaspad/blockdag" + "github.com/kaspanet/kaspad/netadapter/router" + "github.com/kaspanet/kaspad/protocol/common" + peerpkg "github.com/kaspanet/kaspad/protocol/peer" + "github.com/kaspanet/kaspad/util/daghash" + "github.com/kaspanet/kaspad/wire" +) + +// RequestSelectedTipContext is the interface for the context needed for the RequestSelectedTip flow. +type RequestSelectedTipContext interface { + DAG() *blockdag.BlockDAG + StartIBDIfRequired() +} + +type requestSelectedTipFlow struct { + RequestSelectedTipContext + incomingRoute, outgoingRoute *router.Route + peer *peerpkg.Peer +} + +// RequestSelectedTip waits for selected tip requests and handles them +func RequestSelectedTip(context RequestSelectedTipContext, incomingRoute *router.Route, + outgoingRoute *router.Route, peer *peerpkg.Peer) error { + + flow := &requestSelectedTipFlow{ + RequestSelectedTipContext: context, + incomingRoute: incomingRoute, + outgoingRoute: outgoingRoute, + peer: peer, + } + return flow.start() +} + +func (flow *requestSelectedTipFlow) start() error { + for { + err := flow.runSelectedTipRequest() + if err != nil { + return err + } + } +} + +func (flow *requestSelectedTipFlow) runSelectedTipRequest() error { + + flow.peer.WaitForSelectedTipRequests() + defer flow.peer.FinishRequestingSelectedTip() + + err := flow.requestSelectedTip() + if err != nil { + return err + } + + peerSelectedTipHash, err := flow.receiveSelectedTip() + if err != nil { + return err + } + + flow.peer.SetSelectedTipHash(peerSelectedTipHash) + flow.StartIBDIfRequired() + return nil +} + +func (flow *requestSelectedTipFlow) requestSelectedTip() error { + msgGetSelectedTip := wire.NewMsgGetSelectedTip() + return flow.outgoingRoute.Enqueue(msgGetSelectedTip) +} + +func (flow *requestSelectedTipFlow) receiveSelectedTip() (selectedTipHash *daghash.Hash, err error) { + message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout) + if err != nil { + return nil, err + } + msgSelectedTip := message.(*wire.MsgSelectedTip) + + return msgSelectedTip.SelectedTipHash, nil +} diff --git a/protocol/flows/ping/receive.go b/protocol/flows/ping/receive.go new file mode 100644 index 000000000..63863b3dc --- /dev/null +++ b/protocol/flows/ping/receive.go @@ -0,0 +1,42 @@ +package ping + +import ( + "github.com/kaspanet/kaspad/netadapter/router" + "github.com/kaspanet/kaspad/wire" +) + +// ReceivePingsContext is the interface for the context needed for the ReceivePings flow. +type ReceivePingsContext interface { +} + +type receivePingsFlow struct { + ReceivePingsContext + incomingRoute, outgoingRoute *router.Route +} + +// ReceivePings handles all ping messages coming through incomingRoute. +// This function assumes that incomingRoute will only return MsgPing. +func ReceivePings(context ReceivePingsContext, incomingRoute *router.Route, outgoingRoute *router.Route) error { + flow := &receivePingsFlow{ + ReceivePingsContext: context, + incomingRoute: incomingRoute, + outgoingRoute: outgoingRoute, + } + return flow.start() +} + +func (flow *receivePingsFlow) start() error { + for { + message, err := flow.incomingRoute.Dequeue() + if err != nil { + return err + } + pingMessage := message.(*wire.MsgPing) + + pongMessage := wire.NewMsgPong(pingMessage.Nonce) + err = flow.outgoingRoute.Enqueue(pongMessage) + if err != nil { + return err + } + } +} diff --git a/protocol/flows/ping/ping.go b/protocol/flows/ping/send.go similarity index 54% rename from protocol/flows/ping/ping.go rename to protocol/flows/ping/send.go index 8c0daa16c..99462aac8 100644 --- a/protocol/flows/ping/ping.go +++ b/protocol/flows/ping/send.go @@ -1,46 +1,39 @@ package ping import ( - "github.com/kaspanet/kaspad/protocol/common" - "time" - "github.com/kaspanet/kaspad/netadapter/router" + "github.com/kaspanet/kaspad/protocol/common" peerpkg "github.com/kaspanet/kaspad/protocol/peer" "github.com/kaspanet/kaspad/protocol/protocolerrors" "github.com/kaspanet/kaspad/util/random" "github.com/kaspanet/kaspad/wire" + "time" ) -// ReceivePingsContext is the interface for the context needed for the ReceivePings flow. -type ReceivePingsContext interface { -} - -// ReceivePings handles all ping messages coming through incomingRoute. -// This function assumes that incomingRoute will only return MsgPing. -func ReceivePings(_ ReceivePingsContext, incomingRoute *router.Route, outgoingRoute *router.Route) error { - for { - message, err := incomingRoute.Dequeue() - if err != nil { - return err - } - pingMessage := message.(*wire.MsgPing) - - pongMessage := wire.NewMsgPong(pingMessage.Nonce) - err = outgoingRoute.Enqueue(pongMessage) - if err != nil { - return err - } - } -} - // SendPingsContext is the interface for the context needed for the SendPings flow. type SendPingsContext interface { } +type sendPingsFlow struct { + SendPingsContext + incomingRoute, outgoingRoute *router.Route + peer *peerpkg.Peer +} + // SendPings starts sending MsgPings every pingInterval seconds to the // given peer. // This function assumes that incomingRoute will only return MsgPong. -func SendPings(_ SendPingsContext, incomingRoute *router.Route, outgoingRoute *router.Route, peer *peerpkg.Peer) error { +func SendPings(context SendPingsContext, incomingRoute *router.Route, outgoingRoute *router.Route, peer *peerpkg.Peer) error { + flow := &sendPingsFlow{ + SendPingsContext: context, + incomingRoute: incomingRoute, + outgoingRoute: outgoingRoute, + peer: peer, + } + return flow.start() +} + +func (flow *sendPingsFlow) start() error { const pingInterval = 2 * time.Minute ticker := time.NewTicker(pingInterval) defer ticker.Stop() @@ -50,15 +43,15 @@ func SendPings(_ SendPingsContext, incomingRoute *router.Route, outgoingRoute *r if err != nil { return err } - peer.SetPingPending(nonce) + flow.peer.SetPingPending(nonce) pingMessage := wire.NewMsgPing(nonce) - err = outgoingRoute.Enqueue(pingMessage) + err = flow.outgoingRoute.Enqueue(pingMessage) if err != nil { return err } - message, err := incomingRoute.DequeueWithTimeout(common.DefaultTimeout) + message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout) if err != nil { return err } @@ -66,7 +59,7 @@ func SendPings(_ SendPingsContext, incomingRoute *router.Route, outgoingRoute *r if pongMessage.Nonce != pingMessage.Nonce { return protocolerrors.New(true, "nonce mismatch between ping and pong") } - peer.SetPingIdle() + flow.peer.SetPingIdle() } return nil } diff --git a/protocol/flows/relaytransactions/relaytransactions.go b/protocol/flows/relaytransactions/relaytransactions.go index 7833539be..5c9411073 100644 --- a/protocol/flows/relaytransactions/relaytransactions.go +++ b/protocol/flows/relaytransactions/relaytransactions.go @@ -22,38 +22,52 @@ type RelayedTransactionsContext interface { Broadcast(message wire.Message) error } +type handleRelayedTransactionsFlow struct { + RelayedTransactionsContext + incomingRoute, outgoingRoute *router.Route + invsQueue []*wire.MsgInvTransaction +} + // HandleRelayedTransactions listens to wire.MsgInvTransaction messages, requests their corresponding transactions if they // are missing, adds them to the mempool and propagates them to the rest of the network. func HandleRelayedTransactions(context RelayedTransactionsContext, incomingRoute *router.Route, outgoingRoute *router.Route) error { + flow := &handleRelayedTransactionsFlow{ + RelayedTransactionsContext: context, + incomingRoute: incomingRoute, + outgoingRoute: outgoingRoute, + invsQueue: make([]*wire.MsgInvTransaction, 0), + } + return flow.start() +} - invsQueue := make([]*wire.MsgInvTransaction, 0) +func (flow *handleRelayedTransactionsFlow) start() error { for { - inv, err := readInv(incomingRoute, &invsQueue) + inv, err := flow.readInv() if err != nil { return err } - requestedIDs, err := requestInvTransactions(context, outgoingRoute, inv) + requestedIDs, err := flow.requestInvTransactions(inv) if err != nil { return err } - err = receiveTransactions(context, requestedIDs, incomingRoute, &invsQueue) + err = flow.receiveTransactions(requestedIDs) if err != nil { return err } } } -func requestInvTransactions(context RelayedTransactionsContext, outgoingRoute *router.Route, +func (flow *handleRelayedTransactionsFlow) requestInvTransactions( inv *wire.MsgInvTransaction) (requestedIDs []*daghash.TxID, err error) { idsToRequest := make([]*daghash.TxID, 0, len(inv.TxIDS)) for _, txID := range inv.TxIDS { - if isKnownTransaction(context, txID) { + if flow.isKnownTransaction(txID) { continue } - exists := context.SharedRequestedTransactions().addIfNotExists(txID) + exists := flow.SharedRequestedTransactions().addIfNotExists(txID) if exists { continue } @@ -65,18 +79,18 @@ func requestInvTransactions(context RelayedTransactionsContext, outgoingRoute *r } msgGetTransactions := wire.NewMsgGetTransactions(idsToRequest) - err = outgoingRoute.Enqueue(msgGetTransactions) + err = flow.outgoingRoute.Enqueue(msgGetTransactions) if err != nil { - context.SharedRequestedTransactions().removeMany(idsToRequest) + flow.SharedRequestedTransactions().removeMany(idsToRequest) return nil, err } return idsToRequest, nil } -func isKnownTransaction(context RelayedTransactionsContext, txID *daghash.TxID) bool { +func (flow *handleRelayedTransactionsFlow) isKnownTransaction(txID *daghash.TxID) bool { // Ask the transaction memory pool if the transaction is known // to it in any form (main pool or orphan). - if context.TxPool().HaveTransaction(txID) { + if flow.TxPool().HaveTransaction(txID) { return true } @@ -91,7 +105,7 @@ func isKnownTransaction(context RelayedTransactionsContext, txID *daghash.TxID) prevOut := wire.Outpoint{TxID: *txID} for i := uint32(0); i < 2; i++ { prevOut.Index = i - _, ok := context.DAG().GetUTXOEntry(prevOut) + _, ok := flow.DAG().GetUTXOEntry(prevOut) if ok { return true } @@ -99,15 +113,15 @@ func isKnownTransaction(context RelayedTransactionsContext, txID *daghash.TxID) return false } -func readInv(incomingRoute *router.Route, invsQueue *[]*wire.MsgInvTransaction) (*wire.MsgInvTransaction, error) { +func (flow *handleRelayedTransactionsFlow) readInv() (*wire.MsgInvTransaction, error) { - if len(*invsQueue) > 0 { + if len(flow.invsQueue) > 0 { var inv *wire.MsgInvTransaction - inv, *invsQueue = (*invsQueue)[0], (*invsQueue)[1:] + inv, flow.invsQueue = flow.invsQueue[0], flow.invsQueue[1:] return inv, nil } - msg, err := incomingRoute.Dequeue() + msg, err := flow.incomingRoute.Dequeue() if err != nil { return nil, err } @@ -120,7 +134,7 @@ func readInv(incomingRoute *router.Route, invsQueue *[]*wire.MsgInvTransaction) return inv, nil } -func broadcastAcceptedTransactions(context RelayedTransactionsContext, acceptedTxs []*mempool.TxDesc) error { +func (flow *handleRelayedTransactionsFlow) broadcastAcceptedTransactions(acceptedTxs []*mempool.TxDesc) error { // TODO(libp2p) Add mechanism to avoid sending to other peers invs that are known to them (e.g. mruinvmap) // TODO(libp2p) Consider broadcasting in bulks idsToBroadcast := make([]*daghash.TxID, len(acceptedTxs)) @@ -128,24 +142,24 @@ func broadcastAcceptedTransactions(context RelayedTransactionsContext, acceptedT idsToBroadcast[i] = tx.Tx.ID() } inv := wire.NewMsgTxInv(idsToBroadcast) - return context.Broadcast(inv) + return flow.Broadcast(inv) } // readMsgTx returns the next msgTx in incomingRoute, and populates invsQueue with any inv messages that meanwhile arrive. // // Note: this function assumes msgChan can contain only wire.MsgInvTransaction and wire.MsgBlock messages. -func readMsgTx(incomingRoute *router.Route, invsQueue *[]*wire.MsgInvTransaction) ( +func (flow *handleRelayedTransactionsFlow) readMsgTx() ( msgTx *wire.MsgTx, err error) { for { - message, err := incomingRoute.DequeueWithTimeout(common.DefaultTimeout) + message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout) if err != nil { return nil, err } switch message := message.(type) { case *wire.MsgInvTransaction: - *invsQueue = append(*invsQueue, message) + flow.invsQueue = append(flow.invsQueue, message) case *wire.MsgTx: return message, nil default: @@ -154,14 +168,13 @@ func readMsgTx(incomingRoute *router.Route, invsQueue *[]*wire.MsgInvTransaction } } -func receiveTransactions(context RelayedTransactionsContext, requestedTransactions []*daghash.TxID, incomingRoute *router.Route, - invsQueue *[]*wire.MsgInvTransaction) error { +func (flow *handleRelayedTransactionsFlow) receiveTransactions(requestedTransactions []*daghash.TxID) error { // In case the function returns earlier than expected, we want to make sure sharedRequestedTransactions is // clean from any pending transactions. - defer context.SharedRequestedTransactions().removeMany(requestedTransactions) + defer flow.SharedRequestedTransactions().removeMany(requestedTransactions) for _, expectedID := range requestedTransactions { - msgTx, err := readMsgTx(incomingRoute, invsQueue) + msgTx, err := flow.readMsgTx() if err != nil { return err } @@ -170,7 +183,7 @@ func receiveTransactions(context RelayedTransactionsContext, requestedTransactio return protocolerrors.Errorf(true, "expected transaction %s", expectedID) } - acceptedTxs, err := context.TxPool().ProcessTransaction(tx, true, 0) // TODO(libp2p) Use the peer ID for the mempool tag + acceptedTxs, err := flow.TxPool().ProcessTransaction(tx, true, 0) // TODO(libp2p) Use the peer ID for the mempool tag if err != nil { // When the error is a rule error, it means the transaction was // simply rejected as opposed to something actually going wrong, @@ -196,7 +209,7 @@ func receiveTransactions(context RelayedTransactionsContext, requestedTransactio return protocolerrors.Errorf(true, "rejected transaction %s", tx.ID()) } - err = broadcastAcceptedTransactions(context, acceptedTxs) + err = flow.broadcastAcceptedTransactions(acceptedTxs) if err != nil { panic(err) } diff --git a/protocol/protocol.go b/protocol/protocol.go index cde666c8a..27399ed1f 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -3,6 +3,7 @@ package protocol import ( "fmt" "github.com/kaspanet/kaspad/netadapter" + "github.com/kaspanet/kaspad/protocol/flows/ibd/selectedtip" "sync/atomic" "github.com/kaspanet/kaspad/protocol/flows/handshake" @@ -139,13 +140,13 @@ func (m *Manager) addIBDFlows(router *routerpkg.Router, stopped *uint32, stop ch addFlow("RequestSelectedTip", router, []wire.MessageCommand{wire.CmdSelectedTip}, stopped, stop, func(incomingRoute *routerpkg.Route) error { - return ibd.RequestSelectedTip(m.context, incomingRoute, outgoingRoute, peer) + return selectedtip.RequestSelectedTip(m.context, incomingRoute, outgoingRoute, peer) }, ) addFlow("HandleGetSelectedTip", router, []wire.MessageCommand{wire.CmdGetSelectedTip}, stopped, stop, func(incomingRoute *routerpkg.Route) error { - return ibd.HandleGetSelectedTip(m.context, incomingRoute, outgoingRoute) + return selectedtip.HandleGetSelectedTip(m.context, incomingRoute, outgoingRoute) }, )