[NOD-1176] Implement a struct for each flow to share flow data (#811)

* [NOD-1176] Implement a struct for each flow to share flow data

* [NOD-1178] Add empty contexts to flow structs for consistency
This commit is contained in:
Ori Newman 2020-07-22 15:12:54 +03:00 committed by GitHub
parent 63646c8c92
commit 83a3c30d01
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 432 additions and 262 deletions

View File

@ -26,28 +26,45 @@ type RelayInvsContext interface {
Broadcast(message wire.Message) error
}
type handleRelayInvsFlow struct {
RelayInvsContext
incomingRoute, outgoingRoute *router.Route
peer *peerpkg.Peer
invsQueue []*wire.MsgInvRelayBlock
}
// HandleRelayInvs listens to wire.MsgInvRelayBlock messages, requests their corresponding blocks if they
// are missing, adds them to the DAG and propagates them to the rest of the network.
func HandleRelayInvs(context RelayInvsContext, incomingRoute *router.Route, outgoingRoute *router.Route,
peer *peerpkg.Peer) error {
invsQueue := make([]*wire.MsgInvRelayBlock, 0)
flow := &handleRelayInvsFlow{
RelayInvsContext: context,
incomingRoute: incomingRoute,
outgoingRoute: outgoingRoute,
peer: peer,
invsQueue: make([]*wire.MsgInvRelayBlock, 0),
}
return flow.start()
}
func (flow *handleRelayInvsFlow) start() error {
for {
inv, err := readInv(incomingRoute, &invsQueue)
inv, err := flow.readInv()
if err != nil {
return err
}
if context.DAG().IsKnownBlock(inv.Hash) {
if context.DAG().IsKnownInvalid(inv.Hash) {
if flow.DAG().IsKnownBlock(inv.Hash) {
if flow.DAG().IsKnownInvalid(inv.Hash) {
return protocolerrors.Errorf(true, "sent inv of an invalid block %s",
inv.Hash)
}
continue
}
context.StartIBDIfRequired()
if context.IsInIBD() {
flow.StartIBDIfRequired()
if flow.IsInIBD() {
// Block relay is disabled during IBD
continue
}
@ -56,7 +73,7 @@ func HandleRelayInvs(context RelayInvsContext, incomingRoute *router.Route, outg
requestQueue.enqueueIfNotExists(inv.Hash)
for requestQueue.len() > 0 {
err := requestBlocks(context, outgoingRoute, peer, incomingRoute, &invsQueue, requestQueue)
err := flow.requestBlocks(requestQueue)
if err != nil {
return err
}
@ -64,30 +81,28 @@ func HandleRelayInvs(context RelayInvsContext, incomingRoute *router.Route, outg
}
}
func readInv(incomingRoute *router.Route, invsQueue *[]*wire.MsgInvRelayBlock) (*wire.MsgInvRelayBlock, error) {
func (flow *handleRelayInvsFlow) readInv() (*wire.MsgInvRelayBlock, error) {
if len(*invsQueue) > 0 {
if len(flow.invsQueue) > 0 {
var inv *wire.MsgInvRelayBlock
inv, *invsQueue = (*invsQueue)[0], (*invsQueue)[1:]
inv, flow.invsQueue = flow.invsQueue[0], flow.invsQueue[1:]
return inv, nil
}
msg, err := incomingRoute.Dequeue()
msg, err := flow.incomingRoute.Dequeue()
if err != nil {
return nil, err
}
inv, ok := msg.(*wire.MsgInvRelayBlock)
if !ok {
return nil, protocolerrors.Errorf(true, "unexpected %s message in the block relay flow while "+
return nil, protocolerrors.Errorf(true, "unexpected %s message in the block relay handleRelayInvsFlow while "+
"expecting an inv message", msg.Command())
}
return inv, nil
}
func requestBlocks(context RelayInvsContext, outgoingRoute *router.Route,
peer *peerpkg.Peer, incomingRoute *router.Route,
invsQueue *[]*wire.MsgInvRelayBlock, requestQueue *hashesQueueSet) error {
func (flow *handleRelayInvsFlow) requestBlocks(requestQueue *hashesQueueSet) error {
numHashesToRequest := mathUtil.MinInt(wire.MsgGetRelayBlocksHashes, requestQueue.len())
hashesToRequest := requestQueue.dequeue(numHashesToRequest)
@ -95,7 +110,7 @@ func requestBlocks(context RelayInvsContext, outgoingRoute *router.Route,
pendingBlocks := map[daghash.Hash]struct{}{}
var filteredHashesToRequest []*daghash.Hash
for _, hash := range hashesToRequest {
exists := context.SharedRequestedBlocks().addIfNotExists(hash)
exists := flow.SharedRequestedBlocks().addIfNotExists(hash)
if !exists {
continue
}
@ -106,16 +121,16 @@ func requestBlocks(context RelayInvsContext, outgoingRoute *router.Route,
// In case the function returns earlier than expected, we want to make sure requestedBlocks is
// clean from any pending blocks.
defer context.SharedRequestedBlocks().removeSet(pendingBlocks)
defer flow.SharedRequestedBlocks().removeSet(pendingBlocks)
getRelayBlocksMsg := wire.NewMsgGetRelayBlocks(filteredHashesToRequest)
err := outgoingRoute.Enqueue(getRelayBlocksMsg)
err := flow.outgoingRoute.Enqueue(getRelayBlocksMsg)
if err != nil {
return err
}
for len(pendingBlocks) > 0 {
msgBlock, err := readMsgBlock(incomingRoute, invsQueue)
msgBlock, err := flow.readMsgBlock()
if err != nil {
return err
}
@ -126,9 +141,9 @@ func requestBlocks(context RelayInvsContext, outgoingRoute *router.Route,
return protocolerrors.Errorf(true, "got unrequested block %s", block.Hash())
}
delete(pendingBlocks, *blockHash)
context.SharedRequestedBlocks().remove(blockHash)
flow.SharedRequestedBlocks().remove(blockHash)
err = processAndRelayBlock(context, peer, requestQueue, block)
err = flow.processAndRelayBlock(requestQueue, block)
if err != nil {
return err
}
@ -139,18 +154,18 @@ func requestBlocks(context RelayInvsContext, outgoingRoute *router.Route,
// readMsgBlock returns the next msgBlock in msgChan, and populates invsQueue with any inv messages that meanwhile arrive.
//
// Note: this function assumes msgChan can contain only wire.MsgInvRelayBlock and wire.MsgBlock messages.
func readMsgBlock(incomingRoute *router.Route, invsQueue *[]*wire.MsgInvRelayBlock) (
func (flow *handleRelayInvsFlow) readMsgBlock() (
msgBlock *wire.MsgBlock, err error) {
for {
message, err := incomingRoute.DequeueWithTimeout(common.DefaultTimeout)
message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout)
if err != nil {
return nil, err
}
switch message := message.(type) {
case *wire.MsgInvRelayBlock:
*invsQueue = append(*invsQueue, message)
flow.invsQueue = append(flow.invsQueue, message)
case *wire.MsgBlock:
return message, nil
default:
@ -159,11 +174,10 @@ func readMsgBlock(incomingRoute *router.Route, invsQueue *[]*wire.MsgInvRelayBlo
}
}
func processAndRelayBlock(context RelayInvsContext, peer *peerpkg.Peer,
requestQueue *hashesQueueSet, block *util.Block) error {
func (flow *handleRelayInvsFlow) processAndRelayBlock(requestQueue *hashesQueueSet, block *util.Block) error {
blockHash := block.Hash()
isOrphan, isDelayed, err := context.DAG().ProcessBlock(block, blockdag.BFNone)
isOrphan, isDelayed, err := flow.DAG().ProcessBlock(block, blockdag.BFNone)
if err != nil {
// When the error is a rule error, it means the block was simply
// rejected as opposed to something actually going wrong, so log
@ -173,7 +187,7 @@ func processAndRelayBlock(context RelayInvsContext, peer *peerpkg.Peer,
blockHash))
}
log.Infof("Rejected block %s from %s: %s", blockHash,
peer, err)
flow.peer, err)
return protocolerrors.Wrap(true, err, "got invalid block")
}
@ -190,7 +204,7 @@ func processAndRelayBlock(context RelayInvsContext, peer *peerpkg.Peer,
}
const maxOrphanBlueScoreDiff = 10000
selectedTipBlueScore := context.DAG().SelectedTipBlueScore()
selectedTipBlueScore := flow.DAG().SelectedTipBlueScore()
if blueScore > selectedTipBlueScore+maxOrphanBlueScoreDiff {
log.Infof("Orphan block %s has blue score %d and the selected tip blue score is "+
"%d. Ignoring orphans with a blue score difference from the selected tip greater than %d",
@ -199,7 +213,7 @@ func processAndRelayBlock(context RelayInvsContext, peer *peerpkg.Peer,
}
// Request the parents for the orphan block from the peer that sent it.
missingAncestors := context.DAG().GetOrphanMissingAncestorHashes(blockHash)
missingAncestors := flow.DAG().GetOrphanMissingAncestorHashes(blockHash)
for _, missingAncestor := range missingAncestors {
requestQueue.enqueueIfNotExists(missingAncestor)
}
@ -215,13 +229,13 @@ func processAndRelayBlock(context RelayInvsContext, peer *peerpkg.Peer,
// sm.restartSyncIfNeeded()
//// Clear the rejected transactions.
//sm.rejectedTxns = make(map[daghash.TxID]struct{})
err = context.Broadcast(wire.NewMsgInvBlock(blockHash))
err = flow.Broadcast(wire.NewMsgInvBlock(blockHash))
if err != nil {
return err
}
context.StartIBDIfRequired()
err = context.OnNewBlock(block)
flow.StartIBDIfRequired()
err = flow.OnNewBlock(block)
if err != nil {
panic(err)
}

View File

@ -19,12 +19,29 @@ var (
minAcceptableProtocolVersion = wire.ProtocolVersion
)
type receiveVersionFlow struct {
HandleHandshakeContext
incomingRoute, outgoingRoute *router.Route
peer *peerpkg.Peer
}
// ReceiveVersion waits for the peer to send a version message, sends a
// verack in response, and updates its info accordingly.
func ReceiveVersion(context HandleHandshakeContext, incomingRoute *router.Route, outgoingRoute *router.Route,
peer *peerpkg.Peer) (*wire.NetAddress, error) {
message, err := incomingRoute.DequeueWithTimeout(common.DefaultTimeout)
flow := &receiveVersionFlow{
HandleHandshakeContext: context,
incomingRoute: incomingRoute,
outgoingRoute: outgoingRoute,
peer: peer,
}
return flow.start()
}
func (flow *receiveVersionFlow) start() (*wire.NetAddress, error) {
message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout)
if err != nil {
return nil, err
}
@ -34,7 +51,7 @@ func ReceiveVersion(context HandleHandshakeContext, incomingRoute *router.Route,
return nil, protocolerrors.New(true, "a version message must precede all others")
}
if !allowSelfConnections && context.NetAdapter().ID().IsEqual(msgVersion.ID) {
if !allowSelfConnections && flow.NetAdapter().ID().IsEqual(msgVersion.ID) {
return nil, protocolerrors.New(true, "connected to self")
}
@ -51,7 +68,7 @@ func ReceiveVersion(context HandleHandshakeContext, incomingRoute *router.Route,
}
// Disconnect from partial nodes in networks that don't allow them
if !context.DAG().Params.EnableNonNativeSubnetworks && msgVersion.SubnetworkID != nil {
if !flow.DAG().Params.EnableNonNativeSubnetworks && msgVersion.SubnetworkID != nil {
return nil, protocolerrors.New(true, "partial nodes are not allowed")
}
@ -68,11 +85,10 @@ func ReceiveVersion(context HandleHandshakeContext, incomingRoute *router.Route,
// return nil, false, errors.New("incompatible subnetworks")
//}
peer.UpdateFieldsFromMsgVersion(msgVersion)
err = outgoingRoute.Enqueue(wire.NewMsgVerAck())
flow.peer.UpdateFieldsFromMsgVersion(msgVersion)
err = flow.outgoingRoute.Enqueue(wire.NewMsgVerAck())
if err != nil {
return nil, err
}
// TODO(libp2p) Register peer ID
return msgVersion.Address, nil
}

View File

@ -25,19 +25,32 @@ var (
defaultRequiredServices = wire.SFNodeNetwork
)
type sendVersionFlow struct {
HandleHandshakeContext
incomingRoute, outgoingRoute *router.Route
}
// SendVersion sends a version to a peer and waits for verack.
func SendVersion(context HandleHandshakeContext, incomingRoute *router.Route, outgoingRoute *router.Route) error {
flow := &sendVersionFlow{
HandleHandshakeContext: context,
incomingRoute: incomingRoute,
outgoingRoute: outgoingRoute,
}
return flow.start()
}
selectedTipHash := context.DAG().SelectedTipHash()
subnetworkID := context.Config().SubnetworkID
func (flow *sendVersionFlow) start() error {
selectedTipHash := flow.DAG().SelectedTipHash()
subnetworkID := flow.Config().SubnetworkID
// Version message.
localAddress, err := context.NetAdapter().GetBestLocalAddress()
localAddress, err := flow.NetAdapter().GetBestLocalAddress()
if err != nil {
panic(err)
}
msg := wire.NewMsgVersion(localAddress, context.NetAdapter().ID(), selectedTipHash, subnetworkID)
msg.AddUserAgent(userAgentName, userAgentVersion, context.Config().UserAgentComments...)
msg := wire.NewMsgVersion(localAddress, flow.NetAdapter().ID(), selectedTipHash, subnetworkID)
msg.AddUserAgent(userAgentName, userAgentVersion, flow.Config().UserAgentComments...)
// Advertise the services flag
msg.Services = defaultServices
@ -46,15 +59,15 @@ func SendVersion(context HandleHandshakeContext, incomingRoute *router.Route, ou
msg.ProtocolVersion = wire.ProtocolVersion
// Advertise if inv messages for transactions are desired.
msg.DisableRelayTx = context.Config().BlocksOnly
msg.DisableRelayTx = flow.Config().BlocksOnly
err = outgoingRoute.Enqueue(msg)
err = flow.outgoingRoute.Enqueue(msg)
if err != nil {
return err
}
// Wait for verack
_, err = incomingRoute.DequeueWithTimeout(common.DefaultTimeout)
_, err = flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout)
if err != nil {
return err
}

View File

@ -13,33 +13,47 @@ type GetBlockLocatorContext interface {
DAG() *blockdag.BlockDAG
}
type handleGetBlockLocatorFlow struct {
GetBlockLocatorContext
incomingRoute, outgoingRoute *router.Route
}
// HandleGetBlockLocator handles getBlockLocator messages
func HandleGetBlockLocator(context GetBlockLocatorContext, incomingRoute *router.Route,
outgoingRoute *router.Route) error {
flow := &handleGetBlockLocatorFlow{
GetBlockLocatorContext: context,
incomingRoute: incomingRoute,
outgoingRoute: outgoingRoute,
}
return flow.start()
}
func (flow *handleGetBlockLocatorFlow) start() error {
for {
lowHash, highHash, err := receiveGetBlockLocator(incomingRoute)
lowHash, highHash, err := flow.receiveGetBlockLocator()
if err != nil {
return err
}
locator, err := context.DAG().BlockLocatorFromHashes(highHash, lowHash)
locator, err := flow.DAG().BlockLocatorFromHashes(highHash, lowHash)
if err != nil || len(locator) == 0 {
return protocolerrors.Errorf(true, "couldn't build a block "+
"locator between blocks %s and %s", lowHash, highHash)
}
err = sendBlockLocator(outgoingRoute, locator)
err = flow.sendBlockLocator(locator)
if err != nil {
return err
}
}
}
func receiveGetBlockLocator(incomingRoute *router.Route) (lowHash *daghash.Hash,
func (flow *handleGetBlockLocatorFlow) receiveGetBlockLocator() (lowHash *daghash.Hash,
highHash *daghash.Hash, err error) {
message, err := incomingRoute.Dequeue()
message, err := flow.incomingRoute.Dequeue()
if err != nil {
return nil, nil, err
}
@ -48,9 +62,9 @@ func receiveGetBlockLocator(incomingRoute *router.Route) (lowHash *daghash.Hash,
return msgGetBlockLocator.LowHash, msgGetBlockLocator.HighHash, nil
}
func sendBlockLocator(outgoingRoute *router.Route, locator blockdag.BlockLocator) error {
func (flow *handleGetBlockLocatorFlow) sendBlockLocator(locator blockdag.BlockLocator) error {
msgBlockLocator := wire.NewMsgBlockLocator(locator)
err := outgoingRoute.Enqueue(msgBlockLocator)
err := flow.outgoingRoute.Enqueue(msgBlockLocator)
if err != nil {
return err
}

View File

@ -12,20 +12,34 @@ type GetBlocksContext interface {
DAG() *blockdag.BlockDAG
}
type handleGetBlocksFlow struct {
GetBlocksContext
incomingRoute, outgoingRoute *router.Route
}
// HandleGetBlocks handles getBlocks messages
func HandleGetBlocks(context GetBlocksContext, incomingRoute *router.Route, outgoingRoute *router.Route) error {
flow := &handleGetBlocksFlow{
GetBlocksContext: context,
incomingRoute: incomingRoute,
outgoingRoute: outgoingRoute,
}
return flow.start()
}
func (flow *handleGetBlocksFlow) start() error {
for {
lowHash, highHash, err := receiveGetBlocks(incomingRoute)
lowHash, highHash, err := receiveGetBlocks(flow.incomingRoute)
if err != nil {
return err
}
msgIBDBlocks, err := buildMsgIBDBlocks(context, lowHash, highHash)
msgIBDBlocks, err := flow.buildMsgIBDBlocks(lowHash, highHash)
if err != nil {
return err
}
err = sendMsgIBDBlocks(outgoingRoute, msgIBDBlocks)
err = flow.sendMsgIBDBlocks(msgIBDBlocks)
if err != nil {
return nil
}
@ -44,18 +58,18 @@ func receiveGetBlocks(incomingRoute *router.Route) (lowHash *daghash.Hash,
return msgGetBlocks.LowHash, msgGetBlocks.HighHash, nil
}
func buildMsgIBDBlocks(context GetBlocksContext, lowHash *daghash.Hash,
func (flow *handleGetBlocksFlow) buildMsgIBDBlocks(lowHash *daghash.Hash,
highHash *daghash.Hash) ([]*wire.MsgIBDBlock, error) {
const maxHashesInMsgIBDBlocks = wire.MaxInvPerMsg
blockHashes, err := context.DAG().AntiPastHashesBetween(lowHash, highHash, maxHashesInMsgIBDBlocks)
blockHashes, err := flow.DAG().AntiPastHashesBetween(lowHash, highHash, maxHashesInMsgIBDBlocks)
if err != nil {
return nil, err
}
msgIBDBlocks := make([]*wire.MsgIBDBlock, len(blockHashes))
for i, blockHash := range blockHashes {
block, err := context.DAG().BlockByHash(blockHash)
block, err := flow.DAG().BlockByHash(blockHash)
if err != nil {
return nil, err
}
@ -65,9 +79,9 @@ func buildMsgIBDBlocks(context GetBlocksContext, lowHash *daghash.Hash,
return msgIBDBlocks, nil
}
func sendMsgIBDBlocks(outgoingRoute *router.Route, msgIBDBlocks []*wire.MsgIBDBlock) error {
func (flow *handleGetBlocksFlow) sendMsgIBDBlocks(msgIBDBlocks []*wire.MsgIBDBlock) error {
for _, msgIBDBlock := range msgIBDBlocks {
err := outgoingRoute.Enqueue(msgIBDBlock)
err := flow.outgoingRoute.Enqueue(msgIBDBlock)
if err != nil {
return err
}

View File

@ -19,49 +19,65 @@ type HandleIBDContext interface {
FinishIBD()
}
// HandleIBD waits for IBD start and handles it when IBD is triggered for this peer
func HandleIBD(context HandleIBDContext, incomingRoute *router.Route, outgoingRoute *router.Route, peer *peerpkg.Peer) error {
type handleIBDFlow struct {
HandleIBDContext
incomingRoute, outgoingRoute *router.Route
peer *peerpkg.Peer
}
// HandleIBD waits for IBD start and handles it when IBD is triggered for this peer
func HandleIBD(context HandleIBDContext, incomingRoute *router.Route, outgoingRoute *router.Route,
peer *peerpkg.Peer) error {
flow := &handleIBDFlow{
HandleIBDContext: context,
incomingRoute: incomingRoute,
outgoingRoute: outgoingRoute,
peer: peer,
}
return flow.start()
}
func (flow *handleIBDFlow) start() error {
for {
err := runIBD(context, incomingRoute, outgoingRoute, peer)
err := flow.runIBD()
if err != nil {
return err
}
}
}
func runIBD(context HandleIBDContext, incomingRoute *router.Route, outgoingRoute *router.Route, peer *peerpkg.Peer) error {
func (flow *handleIBDFlow) runIBD() error {
flow.peer.WaitForIBDStart()
defer flow.FinishIBD()
peer.WaitForIBDStart()
defer context.FinishIBD()
peerSelectedTipHash := peer.SelectedTipHash()
highestSharedBlockHash, err := findHighestSharedBlockHash(context, incomingRoute, outgoingRoute, peerSelectedTipHash)
peerSelectedTipHash := flow.peer.SelectedTipHash()
highestSharedBlockHash, err := flow.findHighestSharedBlockHash(peerSelectedTipHash)
if err != nil {
return err
}
if context.DAG().IsKnownFinalizedBlock(highestSharedBlockHash) {
if flow.DAG().IsKnownFinalizedBlock(highestSharedBlockHash) {
return protocolerrors.Errorf(false, "cannot initiate "+
"IBD with peer %s because the highest shared chain block (%s) is "+
"below the finality point", peer, highestSharedBlockHash)
"below the finality point", flow.peer, highestSharedBlockHash)
}
return downloadBlocks(context, incomingRoute, outgoingRoute, highestSharedBlockHash, peerSelectedTipHash)
return flow.downloadBlocks(highestSharedBlockHash, peerSelectedTipHash)
}
func findHighestSharedBlockHash(context HandleIBDContext, incomingRoute *router.Route, outgoingRoute *router.Route,
peerSelectedTipHash *daghash.Hash) (lowHash *daghash.Hash, err error) {
func (flow *handleIBDFlow) findHighestSharedBlockHash(peerSelectedTipHash *daghash.Hash) (lowHash *daghash.Hash,
err error) {
lowHash = context.DAG().Params.GenesisHash
lowHash = flow.DAG().Params.GenesisHash
highHash := peerSelectedTipHash
for {
err := sendGetBlockLocator(outgoingRoute, lowHash, highHash)
err := flow.sendGetBlockLocator(lowHash, highHash)
if err != nil {
return nil, err
}
blockLocatorHashes, err := receiveBlockLocator(incomingRoute)
blockLocatorHashes, err := flow.receiveBlockLocator()
if err != nil {
return nil, err
}
@ -70,23 +86,22 @@ func findHighestSharedBlockHash(context HandleIBDContext, incomingRoute *router.
// If it is, return it. If it isn't, we need to narrow our
// getBlockLocator request and try again.
locatorHighHash := blockLocatorHashes[0]
if context.DAG().IsInDAG(locatorHighHash) {
if flow.DAG().IsInDAG(locatorHighHash) {
return locatorHighHash, nil
}
highHash, lowHash = context.DAG().FindNextLocatorBoundaries(blockLocatorHashes)
highHash, lowHash = flow.DAG().FindNextLocatorBoundaries(blockLocatorHashes)
}
}
func sendGetBlockLocator(outgoingRoute *router.Route, lowHash *daghash.Hash,
highHash *daghash.Hash) error {
func (flow *handleIBDFlow) sendGetBlockLocator(lowHash *daghash.Hash, highHash *daghash.Hash) error {
msgGetBlockLocator := wire.NewMsgGetBlockLocator(highHash, lowHash)
return outgoingRoute.Enqueue(msgGetBlockLocator)
return flow.outgoingRoute.Enqueue(msgGetBlockLocator)
}
func receiveBlockLocator(incomingRoute *router.Route) (blockLocatorHashes []*daghash.Hash, err error) {
message, err := incomingRoute.DequeueWithTimeout(common.DefaultTimeout)
func (flow *handleIBDFlow) receiveBlockLocator() (blockLocatorHashes []*daghash.Hash, err error) {
message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout)
if err != nil {
return nil, err
}
@ -99,21 +114,20 @@ func receiveBlockLocator(incomingRoute *router.Route) (blockLocatorHashes []*dag
return msgBlockLocator.BlockLocatorHashes, nil
}
func downloadBlocks(context HandleIBDContext, incomingRoute *router.Route, outgoingRoute *router.Route,
highestSharedBlockHash *daghash.Hash,
func (flow *handleIBDFlow) downloadBlocks(highestSharedBlockHash *daghash.Hash,
peerSelectedTipHash *daghash.Hash) error {
err := sendGetBlocks(outgoingRoute, highestSharedBlockHash, peerSelectedTipHash)
err := flow.sendGetBlocks(highestSharedBlockHash, peerSelectedTipHash)
if err != nil {
return err
}
for {
msgIBDBlock, err := receiveIBDBlock(incomingRoute)
msgIBDBlock, err := flow.receiveIBDBlock()
if err != nil {
return err
}
err = processIBDBlock(context, msgIBDBlock)
err = flow.processIBDBlock(msgIBDBlock)
if err != nil {
return err
}
@ -123,15 +137,15 @@ func downloadBlocks(context HandleIBDContext, incomingRoute *router.Route, outgo
}
}
func sendGetBlocks(outgoingRoute *router.Route, highestSharedBlockHash *daghash.Hash,
func (flow *handleIBDFlow) sendGetBlocks(highestSharedBlockHash *daghash.Hash,
peerSelectedTipHash *daghash.Hash) error {
msgGetBlockInvs := wire.NewMsgGetBlocks(highestSharedBlockHash, peerSelectedTipHash)
return outgoingRoute.Enqueue(msgGetBlockInvs)
return flow.outgoingRoute.Enqueue(msgGetBlockInvs)
}
func receiveIBDBlock(incomingRoute *router.Route) (msgIBDBlock *wire.MsgIBDBlock, err error) {
message, err := incomingRoute.DequeueWithTimeout(common.DefaultTimeout)
func (flow *handleIBDFlow) receiveIBDBlock() (msgIBDBlock *wire.MsgIBDBlock, err error) {
message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout)
if err != nil {
return nil, err
}
@ -144,13 +158,13 @@ func receiveIBDBlock(incomingRoute *router.Route) (msgIBDBlock *wire.MsgIBDBlock
return msgIBDBlock, nil
}
func processIBDBlock(context HandleIBDContext, msgIBDBlock *wire.MsgIBDBlock) error {
func (flow *handleIBDFlow) processIBDBlock(msgIBDBlock *wire.MsgIBDBlock) error {
block := util.NewBlock(&msgIBDBlock.MsgBlock)
if context.DAG().IsInDAG(block.Hash()) {
if flow.DAG().IsInDAG(block.Hash()) {
return nil
}
isOrphan, isDelayed, err := context.DAG().ProcessBlock(block, blockdag.BFNone)
isOrphan, isDelayed, err := flow.DAG().ProcessBlock(block, blockdag.BFNone)
if err != nil {
return err
}
@ -162,7 +176,7 @@ func processIBDBlock(context HandleIBDContext, msgIBDBlock *wire.MsgIBDBlock) er
return protocolerrors.Errorf(false, "received delayed block %s "+
"during IBD", block.Hash())
}
err = context.OnNewBlock(block)
err = flow.OnNewBlock(block)
if err != nil {
panic(err)
}

View File

@ -1,104 +0,0 @@
package ibd
import (
"github.com/kaspanet/kaspad/blockdag"
"github.com/kaspanet/kaspad/netadapter/router"
"github.com/kaspanet/kaspad/protocol/common"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
"github.com/kaspanet/kaspad/util/daghash"
"github.com/kaspanet/kaspad/wire"
"github.com/pkg/errors"
)
// RequestSelectedTipContext is the interface for the context needed for the RequestSelectedTip flow.
type RequestSelectedTipContext interface {
DAG() *blockdag.BlockDAG
StartIBDIfRequired()
}
// RequestSelectedTip waits for selected tip requests and handles them
func RequestSelectedTip(context RequestSelectedTipContext, incomingRoute *router.Route,
outgoingRoute *router.Route, peer *peerpkg.Peer) error {
for {
err := runSelectedTipRequest(context, incomingRoute, outgoingRoute, peer)
if err != nil {
return err
}
}
}
func runSelectedTipRequest(context RequestSelectedTipContext, incomingRoute *router.Route, outgoingRoute *router.Route,
peer *peerpkg.Peer) error {
peer.WaitForSelectedTipRequests()
defer peer.FinishRequestingSelectedTip()
err := requestSelectedTip(outgoingRoute)
if err != nil {
return err
}
peerSelectedTipHash, err := receiveSelectedTip(incomingRoute)
if err != nil {
return err
}
peer.SetSelectedTipHash(peerSelectedTipHash)
context.StartIBDIfRequired()
return nil
}
func requestSelectedTip(outgoingRoute *router.Route) error {
msgGetSelectedTip := wire.NewMsgGetSelectedTip()
return outgoingRoute.Enqueue(msgGetSelectedTip)
}
func receiveSelectedTip(incomingRoute *router.Route) (selectedTipHash *daghash.Hash, err error) {
message, err := incomingRoute.DequeueWithTimeout(common.DefaultTimeout)
if err != nil {
return nil, err
}
msgSelectedTip := message.(*wire.MsgSelectedTip)
return msgSelectedTip.SelectedTipHash, nil
}
// GetSelectedTipContext is the interface for the context needed for the HandleGetSelectedTip flow.
type GetSelectedTipContext interface {
DAG() *blockdag.BlockDAG
}
// HandleGetSelectedTip handles getSelectedTip messages
func HandleGetSelectedTip(context GetSelectedTipContext, incomingRoute *router.Route, outgoingRoute *router.Route) error {
for {
err := receiveGetSelectedTip(incomingRoute)
if err != nil {
return err
}
selectedTipHash := context.DAG().SelectedTipHash()
err = sendSelectedTipHash(outgoingRoute, selectedTipHash)
if err != nil {
return err
}
}
}
func receiveGetSelectedTip(incomingRoute *router.Route) error {
message, err := incomingRoute.Dequeue()
if err != nil {
return err
}
_, ok := message.(*wire.MsgGetSelectedTip)
if !ok {
panic(errors.Errorf("received unexpected message type. "+
"expected: %s, got: %s", wire.CmdGetSelectedTip, message.Command()))
}
return nil
}
func sendSelectedTipHash(outgoingRoute *router.Route, selectedTipHash *daghash.Hash) error {
msgSelectedTip := wire.NewMsgSelectedTip(selectedTipHash)
return outgoingRoute.Enqueue(msgSelectedTip)
}

View File

@ -0,0 +1,61 @@
package selectedtip
import (
"github.com/kaspanet/kaspad/blockdag"
"github.com/kaspanet/kaspad/netadapter/router"
"github.com/kaspanet/kaspad/wire"
"github.com/pkg/errors"
)
// GetSelectedTipContext is the interface for the context needed for the HandleGetSelectedTip flow.
type GetSelectedTipContext interface {
DAG() *blockdag.BlockDAG
}
type handleGetSelectedTipFlow struct {
GetSelectedTipContext
incomingRoute, outgoingRoute *router.Route
}
// HandleGetSelectedTip handles getSelectedTip messages
func HandleGetSelectedTip(context GetSelectedTipContext, incomingRoute *router.Route, outgoingRoute *router.Route) error {
flow := &handleGetSelectedTipFlow{
GetSelectedTipContext: context,
incomingRoute: incomingRoute,
outgoingRoute: outgoingRoute,
}
return flow.start()
}
func (flow *handleGetSelectedTipFlow) start() error {
for {
err := flow.receiveGetSelectedTip()
if err != nil {
return err
}
err = flow.sendSelectedTipHash()
if err != nil {
return err
}
}
}
func (flow *handleGetSelectedTipFlow) receiveGetSelectedTip() error {
message, err := flow.incomingRoute.Dequeue()
if err != nil {
return err
}
_, ok := message.(*wire.MsgGetSelectedTip)
if !ok {
panic(errors.Errorf("received unexpected message type. "+
"expected: %s, got: %s", wire.CmdGetSelectedTip, message.Command()))
}
return nil
}
func (flow *handleGetSelectedTipFlow) sendSelectedTipHash() error {
msgSelectedTip := wire.NewMsgSelectedTip(flow.DAG().SelectedTipHash())
return flow.outgoingRoute.Enqueue(msgSelectedTip)
}

View File

@ -0,0 +1,79 @@
package selectedtip
import (
"github.com/kaspanet/kaspad/blockdag"
"github.com/kaspanet/kaspad/netadapter/router"
"github.com/kaspanet/kaspad/protocol/common"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
"github.com/kaspanet/kaspad/util/daghash"
"github.com/kaspanet/kaspad/wire"
)
// RequestSelectedTipContext is the interface for the context needed for the RequestSelectedTip flow.
type RequestSelectedTipContext interface {
DAG() *blockdag.BlockDAG
StartIBDIfRequired()
}
type requestSelectedTipFlow struct {
RequestSelectedTipContext
incomingRoute, outgoingRoute *router.Route
peer *peerpkg.Peer
}
// RequestSelectedTip waits for selected tip requests and handles them
func RequestSelectedTip(context RequestSelectedTipContext, incomingRoute *router.Route,
outgoingRoute *router.Route, peer *peerpkg.Peer) error {
flow := &requestSelectedTipFlow{
RequestSelectedTipContext: context,
incomingRoute: incomingRoute,
outgoingRoute: outgoingRoute,
peer: peer,
}
return flow.start()
}
func (flow *requestSelectedTipFlow) start() error {
for {
err := flow.runSelectedTipRequest()
if err != nil {
return err
}
}
}
func (flow *requestSelectedTipFlow) runSelectedTipRequest() error {
flow.peer.WaitForSelectedTipRequests()
defer flow.peer.FinishRequestingSelectedTip()
err := flow.requestSelectedTip()
if err != nil {
return err
}
peerSelectedTipHash, err := flow.receiveSelectedTip()
if err != nil {
return err
}
flow.peer.SetSelectedTipHash(peerSelectedTipHash)
flow.StartIBDIfRequired()
return nil
}
func (flow *requestSelectedTipFlow) requestSelectedTip() error {
msgGetSelectedTip := wire.NewMsgGetSelectedTip()
return flow.outgoingRoute.Enqueue(msgGetSelectedTip)
}
func (flow *requestSelectedTipFlow) receiveSelectedTip() (selectedTipHash *daghash.Hash, err error) {
message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout)
if err != nil {
return nil, err
}
msgSelectedTip := message.(*wire.MsgSelectedTip)
return msgSelectedTip.SelectedTipHash, nil
}

View File

@ -0,0 +1,42 @@
package ping
import (
"github.com/kaspanet/kaspad/netadapter/router"
"github.com/kaspanet/kaspad/wire"
)
// ReceivePingsContext is the interface for the context needed for the ReceivePings flow.
type ReceivePingsContext interface {
}
type receivePingsFlow struct {
ReceivePingsContext
incomingRoute, outgoingRoute *router.Route
}
// ReceivePings handles all ping messages coming through incomingRoute.
// This function assumes that incomingRoute will only return MsgPing.
func ReceivePings(context ReceivePingsContext, incomingRoute *router.Route, outgoingRoute *router.Route) error {
flow := &receivePingsFlow{
ReceivePingsContext: context,
incomingRoute: incomingRoute,
outgoingRoute: outgoingRoute,
}
return flow.start()
}
func (flow *receivePingsFlow) start() error {
for {
message, err := flow.incomingRoute.Dequeue()
if err != nil {
return err
}
pingMessage := message.(*wire.MsgPing)
pongMessage := wire.NewMsgPong(pingMessage.Nonce)
err = flow.outgoingRoute.Enqueue(pongMessage)
if err != nil {
return err
}
}
}

View File

@ -1,46 +1,39 @@
package ping
import (
"github.com/kaspanet/kaspad/protocol/common"
"time"
"github.com/kaspanet/kaspad/netadapter/router"
"github.com/kaspanet/kaspad/protocol/common"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
"github.com/kaspanet/kaspad/protocol/protocolerrors"
"github.com/kaspanet/kaspad/util/random"
"github.com/kaspanet/kaspad/wire"
"time"
)
// ReceivePingsContext is the interface for the context needed for the ReceivePings flow.
type ReceivePingsContext interface {
}
// ReceivePings handles all ping messages coming through incomingRoute.
// This function assumes that incomingRoute will only return MsgPing.
func ReceivePings(_ ReceivePingsContext, incomingRoute *router.Route, outgoingRoute *router.Route) error {
for {
message, err := incomingRoute.Dequeue()
if err != nil {
return err
}
pingMessage := message.(*wire.MsgPing)
pongMessage := wire.NewMsgPong(pingMessage.Nonce)
err = outgoingRoute.Enqueue(pongMessage)
if err != nil {
return err
}
}
}
// SendPingsContext is the interface for the context needed for the SendPings flow.
type SendPingsContext interface {
}
type sendPingsFlow struct {
SendPingsContext
incomingRoute, outgoingRoute *router.Route
peer *peerpkg.Peer
}
// SendPings starts sending MsgPings every pingInterval seconds to the
// given peer.
// This function assumes that incomingRoute will only return MsgPong.
func SendPings(_ SendPingsContext, incomingRoute *router.Route, outgoingRoute *router.Route, peer *peerpkg.Peer) error {
func SendPings(context SendPingsContext, incomingRoute *router.Route, outgoingRoute *router.Route, peer *peerpkg.Peer) error {
flow := &sendPingsFlow{
SendPingsContext: context,
incomingRoute: incomingRoute,
outgoingRoute: outgoingRoute,
peer: peer,
}
return flow.start()
}
func (flow *sendPingsFlow) start() error {
const pingInterval = 2 * time.Minute
ticker := time.NewTicker(pingInterval)
defer ticker.Stop()
@ -50,15 +43,15 @@ func SendPings(_ SendPingsContext, incomingRoute *router.Route, outgoingRoute *r
if err != nil {
return err
}
peer.SetPingPending(nonce)
flow.peer.SetPingPending(nonce)
pingMessage := wire.NewMsgPing(nonce)
err = outgoingRoute.Enqueue(pingMessage)
err = flow.outgoingRoute.Enqueue(pingMessage)
if err != nil {
return err
}
message, err := incomingRoute.DequeueWithTimeout(common.DefaultTimeout)
message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout)
if err != nil {
return err
}
@ -66,7 +59,7 @@ func SendPings(_ SendPingsContext, incomingRoute *router.Route, outgoingRoute *r
if pongMessage.Nonce != pingMessage.Nonce {
return protocolerrors.New(true, "nonce mismatch between ping and pong")
}
peer.SetPingIdle()
flow.peer.SetPingIdle()
}
return nil
}

View File

@ -22,38 +22,52 @@ type RelayedTransactionsContext interface {
Broadcast(message wire.Message) error
}
type handleRelayedTransactionsFlow struct {
RelayedTransactionsContext
incomingRoute, outgoingRoute *router.Route
invsQueue []*wire.MsgInvTransaction
}
// HandleRelayedTransactions listens to wire.MsgInvTransaction messages, requests their corresponding transactions if they
// are missing, adds them to the mempool and propagates them to the rest of the network.
func HandleRelayedTransactions(context RelayedTransactionsContext, incomingRoute *router.Route, outgoingRoute *router.Route) error {
flow := &handleRelayedTransactionsFlow{
RelayedTransactionsContext: context,
incomingRoute: incomingRoute,
outgoingRoute: outgoingRoute,
invsQueue: make([]*wire.MsgInvTransaction, 0),
}
return flow.start()
}
invsQueue := make([]*wire.MsgInvTransaction, 0)
func (flow *handleRelayedTransactionsFlow) start() error {
for {
inv, err := readInv(incomingRoute, &invsQueue)
inv, err := flow.readInv()
if err != nil {
return err
}
requestedIDs, err := requestInvTransactions(context, outgoingRoute, inv)
requestedIDs, err := flow.requestInvTransactions(inv)
if err != nil {
return err
}
err = receiveTransactions(context, requestedIDs, incomingRoute, &invsQueue)
err = flow.receiveTransactions(requestedIDs)
if err != nil {
return err
}
}
}
func requestInvTransactions(context RelayedTransactionsContext, outgoingRoute *router.Route,
func (flow *handleRelayedTransactionsFlow) requestInvTransactions(
inv *wire.MsgInvTransaction) (requestedIDs []*daghash.TxID, err error) {
idsToRequest := make([]*daghash.TxID, 0, len(inv.TxIDS))
for _, txID := range inv.TxIDS {
if isKnownTransaction(context, txID) {
if flow.isKnownTransaction(txID) {
continue
}
exists := context.SharedRequestedTransactions().addIfNotExists(txID)
exists := flow.SharedRequestedTransactions().addIfNotExists(txID)
if exists {
continue
}
@ -65,18 +79,18 @@ func requestInvTransactions(context RelayedTransactionsContext, outgoingRoute *r
}
msgGetTransactions := wire.NewMsgGetTransactions(idsToRequest)
err = outgoingRoute.Enqueue(msgGetTransactions)
err = flow.outgoingRoute.Enqueue(msgGetTransactions)
if err != nil {
context.SharedRequestedTransactions().removeMany(idsToRequest)
flow.SharedRequestedTransactions().removeMany(idsToRequest)
return nil, err
}
return idsToRequest, nil
}
func isKnownTransaction(context RelayedTransactionsContext, txID *daghash.TxID) bool {
func (flow *handleRelayedTransactionsFlow) isKnownTransaction(txID *daghash.TxID) bool {
// Ask the transaction memory pool if the transaction is known
// to it in any form (main pool or orphan).
if context.TxPool().HaveTransaction(txID) {
if flow.TxPool().HaveTransaction(txID) {
return true
}
@ -91,7 +105,7 @@ func isKnownTransaction(context RelayedTransactionsContext, txID *daghash.TxID)
prevOut := wire.Outpoint{TxID: *txID}
for i := uint32(0); i < 2; i++ {
prevOut.Index = i
_, ok := context.DAG().GetUTXOEntry(prevOut)
_, ok := flow.DAG().GetUTXOEntry(prevOut)
if ok {
return true
}
@ -99,15 +113,15 @@ func isKnownTransaction(context RelayedTransactionsContext, txID *daghash.TxID)
return false
}
func readInv(incomingRoute *router.Route, invsQueue *[]*wire.MsgInvTransaction) (*wire.MsgInvTransaction, error) {
func (flow *handleRelayedTransactionsFlow) readInv() (*wire.MsgInvTransaction, error) {
if len(*invsQueue) > 0 {
if len(flow.invsQueue) > 0 {
var inv *wire.MsgInvTransaction
inv, *invsQueue = (*invsQueue)[0], (*invsQueue)[1:]
inv, flow.invsQueue = flow.invsQueue[0], flow.invsQueue[1:]
return inv, nil
}
msg, err := incomingRoute.Dequeue()
msg, err := flow.incomingRoute.Dequeue()
if err != nil {
return nil, err
}
@ -120,7 +134,7 @@ func readInv(incomingRoute *router.Route, invsQueue *[]*wire.MsgInvTransaction)
return inv, nil
}
func broadcastAcceptedTransactions(context RelayedTransactionsContext, acceptedTxs []*mempool.TxDesc) error {
func (flow *handleRelayedTransactionsFlow) broadcastAcceptedTransactions(acceptedTxs []*mempool.TxDesc) error {
// TODO(libp2p) Add mechanism to avoid sending to other peers invs that are known to them (e.g. mruinvmap)
// TODO(libp2p) Consider broadcasting in bulks
idsToBroadcast := make([]*daghash.TxID, len(acceptedTxs))
@ -128,24 +142,24 @@ func broadcastAcceptedTransactions(context RelayedTransactionsContext, acceptedT
idsToBroadcast[i] = tx.Tx.ID()
}
inv := wire.NewMsgTxInv(idsToBroadcast)
return context.Broadcast(inv)
return flow.Broadcast(inv)
}
// readMsgTx returns the next msgTx in incomingRoute, and populates invsQueue with any inv messages that meanwhile arrive.
//
// Note: this function assumes msgChan can contain only wire.MsgInvTransaction and wire.MsgBlock messages.
func readMsgTx(incomingRoute *router.Route, invsQueue *[]*wire.MsgInvTransaction) (
func (flow *handleRelayedTransactionsFlow) readMsgTx() (
msgTx *wire.MsgTx, err error) {
for {
message, err := incomingRoute.DequeueWithTimeout(common.DefaultTimeout)
message, err := flow.incomingRoute.DequeueWithTimeout(common.DefaultTimeout)
if err != nil {
return nil, err
}
switch message := message.(type) {
case *wire.MsgInvTransaction:
*invsQueue = append(*invsQueue, message)
flow.invsQueue = append(flow.invsQueue, message)
case *wire.MsgTx:
return message, nil
default:
@ -154,14 +168,13 @@ func readMsgTx(incomingRoute *router.Route, invsQueue *[]*wire.MsgInvTransaction
}
}
func receiveTransactions(context RelayedTransactionsContext, requestedTransactions []*daghash.TxID, incomingRoute *router.Route,
invsQueue *[]*wire.MsgInvTransaction) error {
func (flow *handleRelayedTransactionsFlow) receiveTransactions(requestedTransactions []*daghash.TxID) error {
// In case the function returns earlier than expected, we want to make sure sharedRequestedTransactions is
// clean from any pending transactions.
defer context.SharedRequestedTransactions().removeMany(requestedTransactions)
defer flow.SharedRequestedTransactions().removeMany(requestedTransactions)
for _, expectedID := range requestedTransactions {
msgTx, err := readMsgTx(incomingRoute, invsQueue)
msgTx, err := flow.readMsgTx()
if err != nil {
return err
}
@ -170,7 +183,7 @@ func receiveTransactions(context RelayedTransactionsContext, requestedTransactio
return protocolerrors.Errorf(true, "expected transaction %s", expectedID)
}
acceptedTxs, err := context.TxPool().ProcessTransaction(tx, true, 0) // TODO(libp2p) Use the peer ID for the mempool tag
acceptedTxs, err := flow.TxPool().ProcessTransaction(tx, true, 0) // TODO(libp2p) Use the peer ID for the mempool tag
if err != nil {
// When the error is a rule error, it means the transaction was
// simply rejected as opposed to something actually going wrong,
@ -196,7 +209,7 @@ func receiveTransactions(context RelayedTransactionsContext, requestedTransactio
return protocolerrors.Errorf(true, "rejected transaction %s", tx.ID())
}
err = broadcastAcceptedTransactions(context, acceptedTxs)
err = flow.broadcastAcceptedTransactions(acceptedTxs)
if err != nil {
panic(err)
}

View File

@ -3,6 +3,7 @@ package protocol
import (
"fmt"
"github.com/kaspanet/kaspad/netadapter"
"github.com/kaspanet/kaspad/protocol/flows/ibd/selectedtip"
"sync/atomic"
"github.com/kaspanet/kaspad/protocol/flows/handshake"
@ -139,13 +140,13 @@ func (m *Manager) addIBDFlows(router *routerpkg.Router, stopped *uint32, stop ch
addFlow("RequestSelectedTip", router, []wire.MessageCommand{wire.CmdSelectedTip}, stopped, stop,
func(incomingRoute *routerpkg.Route) error {
return ibd.RequestSelectedTip(m.context, incomingRoute, outgoingRoute, peer)
return selectedtip.RequestSelectedTip(m.context, incomingRoute, outgoingRoute, peer)
},
)
addFlow("HandleGetSelectedTip", router, []wire.MessageCommand{wire.CmdGetSelectedTip}, stopped, stop,
func(incomingRoute *routerpkg.Route) error {
return ibd.HandleGetSelectedTip(m.context, incomingRoute, outgoingRoute)
return selectedtip.HandleGetSelectedTip(m.context, incomingRoute, outgoingRoute)
},
)