[NOD-1168] Add context interfaces for flows (#808)

* [NOD-1168] Add context interfaces to flows

* [NOD-1168] Move IBD state to protocol manager

* [NOD-1168] Move ready peers to protocol manager

* [NOD-1168] Add comments

* [NOD-1168] Separate context interfaces for send and receive pings

* [NOD-1168] Add protocol shared state to FlowContext

* [NOD-1168] Fix comment

* [NOD-1168] Rename Context->HandleHandshakeContext

* [NOD-1168] Initialize readyPeers and transactionsToRebroadcast

* [NOD-1168] Rename readyPeers -> peers
This commit is contained in:
Ori Newman 2020-07-21 18:02:33 +03:00 committed by GitHub
parent 3e6c1792ef
commit 8e1958c20b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 509 additions and 369 deletions

View File

@ -1,9 +1,13 @@
package common
import (
"github.com/pkg/errors"
"time"
)
// DefaultTimeout is the default duration to wait for enqueuing/dequeuing
// to/from routes.
const DefaultTimeout = 30 * time.Second
// ErrPeerWithSameIDExists signifies that a peer with the same ID already exist.
var ErrPeerWithSameIDExists = errors.New("ready with the same ID already exists")

View File

@ -0,0 +1,8 @@
package flowcontext
import "github.com/kaspanet/kaspad/addrmgr"
// AddressManager returns the address manager associated to the flow context.
func (f *FlowContext) AddressManager() *addrmgr.AddrManager {
return f.addressManager
}

View File

@ -1,7 +1,7 @@
package protocol
package flowcontext
import (
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
"github.com/kaspanet/kaspad/protocol/flows/blockrelay"
"github.com/kaspanet/kaspad/util"
"github.com/kaspanet/kaspad/util/daghash"
"github.com/kaspanet/kaspad/wire"
@ -11,24 +11,23 @@ import (
// OnNewBlock updates the mempool after a new block arrival, and
// relays newly unorphaned transactions and possibly rebroadcast
// manually added transactions when not in IBD.
// TODO(libp2p) Call this function from IBD as well.
func (m *Manager) OnNewBlock(block *util.Block) error {
transactionsAcceptedToMempool, err := m.txPool.HandleNewBlock(block)
func (f *FlowContext) OnNewBlock(block *util.Block) error {
transactionsAcceptedToMempool, err := f.txPool.HandleNewBlock(block)
if err != nil {
return err
}
// TODO(libp2p) Notify transactionsAcceptedToMempool to RPC
m.updateTransactionsToRebroadcast(block)
f.updateTransactionsToRebroadcast(block)
// Don't relay transactions when in IBD.
if atomic.LoadUint32(&m.isInIBD) != 0 {
if atomic.LoadUint32(&f.isInIBD) != 0 {
return nil
}
var txIDsToRebroadcast []*daghash.TxID
if m.shouldRebroadcastTransactions() {
txIDsToRebroadcast = m.txIDsToRebroadcast()
if f.shouldRebroadcastTransactions() {
txIDsToRebroadcast = f.txIDsToRebroadcast()
}
txIDsToBroadcast := make([]*daghash.TxID, len(transactionsAcceptedToMempool)+len(txIDsToRebroadcast))
@ -39,5 +38,11 @@ func (m *Manager) OnNewBlock(block *util.Block) error {
copy(txIDsToBroadcast[len(transactionsAcceptedToMempool):], txIDsToBroadcast)
txIDsToBroadcast = txIDsToBroadcast[:wire.MaxInvPerTxInvMsg]
inv := wire.NewMsgTxInv(txIDsToBroadcast)
return m.netAdapter.Broadcast(peerpkg.ReadyPeerIDs(), inv)
return f.Broadcast(inv)
}
// SharedRequestedBlocks returns a *blockrelay.SharedRequestedBlocks for sharing
// data about requested blocks between different peers.
func (f *FlowContext) SharedRequestedBlocks() *blockrelay.SharedRequestedBlocks {
return f.sharedRequestedBlocks
}

View File

@ -0,0 +1,8 @@
package flowcontext
import "github.com/kaspanet/kaspad/config"
// Config returns an instance of *config.Config associated to the flow context.
func (f *FlowContext) Config() *config.Config {
return f.cfg
}

View File

@ -0,0 +1,8 @@
package flowcontext
import "github.com/kaspanet/kaspad/blockdag"
// DAG returns the DAG associated to the flow context.
func (f *FlowContext) DAG() *blockdag.BlockDAG {
return f.dag
}

View File

@ -0,0 +1,57 @@
package flowcontext
import (
"github.com/kaspanet/kaspad/addrmgr"
"github.com/kaspanet/kaspad/blockdag"
"github.com/kaspanet/kaspad/config"
"github.com/kaspanet/kaspad/mempool"
"github.com/kaspanet/kaspad/netadapter"
"github.com/kaspanet/kaspad/netadapter/id"
"github.com/kaspanet/kaspad/protocol/flows/blockrelay"
"github.com/kaspanet/kaspad/protocol/flows/relaytransactions"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
"github.com/kaspanet/kaspad/util"
"github.com/kaspanet/kaspad/util/daghash"
"sync"
"time"
)
// FlowContext holds state that is relevant to more than one flow or one peer, and allows communication between
// different flows that can be associated to different peers.
type FlowContext struct {
cfg *config.Config
netAdapter *netadapter.NetAdapter
txPool *mempool.TxPool
addedTransactions []*util.Tx
dag *blockdag.BlockDAG
addressManager *addrmgr.AddrManager
transactionsToRebroadcastLock sync.Mutex
transactionsToRebroadcast map[daghash.TxID]*util.Tx
lastRebroadcastTime time.Time
sharedRequestedTransactions *relaytransactions.SharedRequestedTransactions
sharedRequestedBlocks *blockrelay.SharedRequestedBlocks
isInIBD uint32
startIBDMutex sync.Mutex
peers map[*id.ID]*peerpkg.Peer
peersMutex sync.RWMutex
}
// New returns a new instance of FlowContext.
func New(cfg *config.Config, dag *blockdag.BlockDAG,
addressManager *addrmgr.AddrManager, txPool *mempool.TxPool, netAdapter *netadapter.NetAdapter) *FlowContext {
return &FlowContext{
cfg: cfg,
netAdapter: netAdapter,
dag: dag,
addressManager: addressManager,
txPool: txPool,
sharedRequestedTransactions: relaytransactions.NewSharedRequestedTransactions(),
sharedRequestedBlocks: blockrelay.NewSharedRequestedBlocks(),
peers: make(map[*id.ID]*peerpkg.Peer),
transactionsToRebroadcast: make(map[daghash.TxID]*util.Tx),
}
}

View File

@ -0,0 +1,70 @@
package flowcontext
import (
"github.com/kaspanet/kaspad/blockdag"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
"sync/atomic"
"time"
)
// StartIBDIfRequired selects a peer and starts IBD against it
// if required
func (f *FlowContext) StartIBDIfRequired() {
f.startIBDMutex.Lock()
defer f.startIBDMutex.Unlock()
if f.IsInIBD() {
return
}
peer := f.selectPeerForIBD(f.dag)
if peer == nil {
f.requestSelectedTipsIfRequired()
return
}
atomic.StoreUint32(&f.isInIBD, 1)
peer.StartIBD()
}
// IsInIBD is true if IBD is currently running
func (f *FlowContext) IsInIBD() bool {
return atomic.LoadUint32(&f.isInIBD) != 0
}
// selectPeerForIBD returns the first peer whose selected tip
// hash is not in our DAG
func (f *FlowContext) selectPeerForIBD(dag *blockdag.BlockDAG) *peerpkg.Peer {
for _, peer := range f.peers {
peerSelectedTipHash := peer.SelectedTipHash()
if !dag.IsInDAG(peerSelectedTipHash) {
return peer
}
}
return nil
}
func (f *FlowContext) requestSelectedTipsIfRequired() {
if f.isDAGTimeCurrent() {
return
}
f.requestSelectedTips()
}
func (f *FlowContext) isDAGTimeCurrent() bool {
const minDurationToRequestSelectedTips = time.Minute
return f.dag.Now().Sub(f.dag.SelectedTipHeader().Timestamp) > minDurationToRequestSelectedTips
}
func (f *FlowContext) requestSelectedTips() {
for _, peer := range f.peers {
peer.RequestSelectedTipIfRequired()
}
}
// FinishIBD finishes the current IBD flow and starts a new one if required.
func (f *FlowContext) FinishIBD() {
atomic.StoreUint32(&f.isInIBD, 0)
f.StartIBDIfRequired()
}

View File

@ -0,0 +1,46 @@
package flowcontext
import (
"github.com/kaspanet/kaspad/netadapter"
"github.com/kaspanet/kaspad/netadapter/id"
"github.com/kaspanet/kaspad/protocol/common"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
"github.com/kaspanet/kaspad/wire"
"github.com/pkg/errors"
)
// NetAdapter returns the net adapter that is associated to the flow context.
func (f *FlowContext) NetAdapter() *netadapter.NetAdapter {
return f.netAdapter
}
// AddToPeers marks this peer as ready and adds it to the ready peers list.
func (f *FlowContext) AddToPeers(peer *peerpkg.Peer) error {
f.peersMutex.RLock()
defer f.peersMutex.RUnlock()
if _, ok := f.peers[peer.ID()]; ok {
return errors.Wrapf(common.ErrPeerWithSameIDExists, "peer with ID %s already exists", peer.ID())
}
f.peers[peer.ID()] = peer
return nil
}
// readyPeerIDs returns the peer IDs of all the ready peers.
func (f *FlowContext) readyPeerIDs() []*id.ID {
f.peersMutex.RLock()
defer f.peersMutex.RUnlock()
peerIDs := make([]*id.ID, len(f.peers))
i := 0
for peerID := range f.peers {
peerIDs[i] = peerID
i++
}
return peerIDs
}
// Broadcast broadcast the given message to all the ready peers.
func (f *FlowContext) Broadcast(message wire.Message) error {
return f.netAdapter.Broadcast(f.readyPeerIDs(), message)
}

View File

@ -0,0 +1,70 @@
package flowcontext
import (
"github.com/kaspanet/kaspad/mempool"
"github.com/kaspanet/kaspad/protocol/flows/relaytransactions"
"github.com/kaspanet/kaspad/util"
"github.com/kaspanet/kaspad/util/daghash"
"github.com/kaspanet/kaspad/wire"
"github.com/pkg/errors"
"time"
)
// AddTransaction adds transaction to the mempool and propagates it.
func (f *FlowContext) AddTransaction(tx *util.Tx) error {
f.transactionsToRebroadcastLock.Lock()
defer f.transactionsToRebroadcastLock.Unlock()
transactionsAcceptedToMempool, err := f.txPool.ProcessTransaction(tx, false, 0)
if err != nil {
return err
}
if len(transactionsAcceptedToMempool) > 1 {
panic(errors.New("got more than one accepted transactions when no orphans were allowed"))
}
f.transactionsToRebroadcast[*tx.ID()] = tx
inv := wire.NewMsgTxInv([]*daghash.TxID{tx.ID()})
return f.Broadcast(inv)
}
func (f *FlowContext) updateTransactionsToRebroadcast(block *util.Block) {
f.transactionsToRebroadcastLock.Lock()
defer f.transactionsToRebroadcastLock.Unlock()
// Note: if the block is red, its transactions won't be rebroadcasted
// anymore, although they are not included in the UTXO set.
// This is probably ok, since red blocks are quite rare.
for _, tx := range block.Transactions() {
delete(f.transactionsToRebroadcast, *tx.ID())
}
}
func (f *FlowContext) shouldRebroadcastTransactions() bool {
const rebroadcastInterval = 30 * time.Second
return time.Since(f.lastRebroadcastTime) > rebroadcastInterval
}
func (f *FlowContext) txIDsToRebroadcast() []*daghash.TxID {
f.transactionsToRebroadcastLock.Lock()
defer f.transactionsToRebroadcastLock.Unlock()
txIDs := make([]*daghash.TxID, len(f.transactionsToRebroadcast))
i := 0
for _, tx := range f.transactionsToRebroadcast {
txIDs[i] = tx.ID()
i++
}
return txIDs
}
// SharedRequestedTransactions returns a *relaytransactions.SharedRequestedTransactions for sharing
// data about requested transactions between different peers.
func (f *FlowContext) SharedRequestedTransactions() *relaytransactions.SharedRequestedTransactions {
return f.sharedRequestedTransactions
}
// TxPool returns the transaction pool associated to the manager.
func (f *FlowContext) TxPool() *mempool.TxPool {
return f.txPool
}

View File

@ -10,11 +10,17 @@ import (
"github.com/kaspanet/kaspad/wire"
)
// ReceiveAddresses asks a peer for more addresses if needed.
func ReceiveAddresses(incomingRoute *router.Route, outgoingRoute *router.Route, cfg *config.Config, peer *peerpkg.Peer,
addressManager *addrmgr.AddrManager) error {
// ReceiveAddressesContext is the interface for the context needed for the ReceiveAddresses flow.
type ReceiveAddressesContext interface {
Config() *config.Config
AddressManager() *addrmgr.AddrManager
}
if !addressManager.NeedMoreAddresses() {
// ReceiveAddresses asks a peer for more addresses if needed.
func ReceiveAddresses(context ReceiveAddressesContext, incomingRoute *router.Route, outgoingRoute *router.Route,
peer *peerpkg.Peer) error {
if !context.AddressManager().NeedMoreAddresses() {
return nil
}
@ -39,15 +45,15 @@ func ReceiveAddresses(incomingRoute *router.Route, outgoingRoute *router.Route,
return protocolerrors.Errorf(true, "got unexpected "+
"IncludeAllSubnetworks=true in [%s] command", msgAddresses.Command())
}
if !msgAddresses.SubnetworkID.IsEqual(cfg.SubnetworkID) && msgAddresses.SubnetworkID != nil {
if !msgAddresses.SubnetworkID.IsEqual(context.Config().SubnetworkID) && msgAddresses.SubnetworkID != nil {
return protocolerrors.Errorf(false, "only full nodes and %s subnetwork IDs "+
"are allowed in [%s] command, but got subnetwork ID %s",
cfg.SubnetworkID, msgAddresses.Command(), msgAddresses.SubnetworkID)
context.Config().SubnetworkID, msgAddresses.Command(), msgAddresses.SubnetworkID)
}
// TODO(libp2p) Consider adding to peer known addresses set
// TODO(libp2p) Replace with real peer IP
fakeSourceAddress := new(wire.NetAddress)
addressManager.AddAddresses(msgAddresses.AddrList, fakeSourceAddress, msgAddresses.SubnetworkID)
context.AddressManager().AddAddresses(msgAddresses.AddrList, fakeSourceAddress, msgAddresses.SubnetworkID)
return nil
}

View File

@ -7,9 +7,13 @@ import (
"math/rand"
)
// SendAddressesContext is the interface for the context needed for the SendAddresses flow.
type SendAddressesContext interface {
AddressManager() *addrmgr.AddrManager
}
// SendAddresses sends addresses to a peer that requests it.
func SendAddresses(incomingRoute *router.Route, outgoingRoute *router.Route,
addressManager *addrmgr.AddrManager) error {
func SendAddresses(context SendAddressesContext, incomingRoute *router.Route, outgoingRoute *router.Route) error {
message, err := incomingRoute.Dequeue()
if err != nil {
@ -17,7 +21,8 @@ func SendAddresses(incomingRoute *router.Route, outgoingRoute *router.Route,
}
msgGetAddresses := message.(*wire.MsgGetAddresses)
addresses := addressManager.AddressCache(msgGetAddresses.IncludeAllSubnetworks, msgGetAddresses.SubnetworkID)
addresses := context.AddressManager().AddressCache(msgGetAddresses.IncludeAllSubnetworks,
msgGetAddresses.SubnetworkID)
msgAddresses := wire.NewMsgAddresses(msgGetAddresses.IncludeAllSubnetworks, msgGetAddresses.SubnetworkID)
err = msgAddresses.AddAddresses(shuffleAddresses(addresses)...)
if err != nil {

View File

@ -9,10 +9,15 @@ import (
"github.com/pkg/errors"
)
// RelayBlockRequestsContext is the interface for the context needed for the HandleRelayBlockRequests flow.
type RelayBlockRequestsContext interface {
DAG() *blockdag.BlockDAG
}
// HandleRelayBlockRequests listens to wire.MsgGetRelayBlocks messages and sends
// their corresponding blocks to the requesting peer.
func HandleRelayBlockRequests(incomingRoute *router.Route, outgoingRoute *router.Route,
peer *peerpkg.Peer, dag *blockdag.BlockDAG) error {
func HandleRelayBlockRequests(context RelayBlockRequestsContext, incomingRoute *router.Route,
outgoingRoute *router.Route, peer *peerpkg.Peer) error {
for {
message, err := incomingRoute.Dequeue()
@ -22,7 +27,7 @@ func HandleRelayBlockRequests(incomingRoute *router.Route, outgoingRoute *router
getRelayBlocksMessage := message.(*wire.MsgGetRelayBlocks)
for _, hash := range getRelayBlocksMessage.Hashes {
// Fetch the block from the database.
block, err := dag.BlockByHash(hash)
block, err := context.DAG().BlockByHash(hash)
if blockdag.IsNotInDAGErr(err) {
return protocolerrors.Errorf(true, "block %s not found", hash)
} else if err != nil {
@ -32,7 +37,7 @@ func HandleRelayBlockRequests(incomingRoute *router.Route, outgoingRoute *router
// If we are a full node and the peer is a partial node, we must convert
// the block to a partial block.
nodeSubnetworkID := dag.SubnetworkID()
nodeSubnetworkID := context.DAG().SubnetworkID()
peerSubnetworkID := peer.SubnetworkID()
isNodeFull := nodeSubnetworkID == nil

View File

@ -6,7 +6,6 @@ import (
"github.com/kaspanet/kaspad/netadapter/router"
"github.com/kaspanet/kaspad/protocol/blocklogger"
"github.com/kaspanet/kaspad/protocol/common"
"github.com/kaspanet/kaspad/protocol/flows/ibd"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
"github.com/kaspanet/kaspad/protocol/protocolerrors"
"github.com/kaspanet/kaspad/util"
@ -16,14 +15,21 @@ import (
"github.com/pkg/errors"
)
// NewBlockHandler is a function that is to be
// called when a new block is successfully processed.
type NewBlockHandler func(block *util.Block) error
// RelayInvsContext is the interface for the context needed for the HandleRelayInvs flow.
type RelayInvsContext interface {
NetAdapter() *netadapter.NetAdapter
DAG() *blockdag.BlockDAG
OnNewBlock(block *util.Block) error
SharedRequestedBlocks() *SharedRequestedBlocks
StartIBDIfRequired()
IsInIBD() bool
Broadcast(message wire.Message) error
}
// HandleRelayInvs listens to wire.MsgInvRelayBlock messages, requests their corresponding blocks if they
// are missing, adds them to the DAG and propagates them to the rest of the network.
func HandleRelayInvs(incomingRoute *router.Route, outgoingRoute *router.Route,
peer *peerpkg.Peer, netAdapter *netadapter.NetAdapter, dag *blockdag.BlockDAG, newBlockHandler NewBlockHandler) error {
func HandleRelayInvs(context RelayInvsContext, incomingRoute *router.Route, outgoingRoute *router.Route,
peer *peerpkg.Peer) error {
invsQueue := make([]*wire.MsgInvRelayBlock, 0)
for {
@ -32,16 +38,16 @@ func HandleRelayInvs(incomingRoute *router.Route, outgoingRoute *router.Route,
return err
}
if dag.IsKnownBlock(inv.Hash) {
if dag.IsKnownInvalid(inv.Hash) {
if context.DAG().IsKnownBlock(inv.Hash) {
if context.DAG().IsKnownInvalid(inv.Hash) {
return protocolerrors.Errorf(true, "sent inv of an invalid block %s",
inv.Hash)
}
continue
}
ibd.StartIBDIfRequired(dag)
if ibd.IsInIBD() {
context.StartIBDIfRequired()
if context.IsInIBD() {
// Block relay is disabled during IBD
continue
}
@ -50,8 +56,7 @@ func HandleRelayInvs(incomingRoute *router.Route, outgoingRoute *router.Route,
requestQueue.enqueueIfNotExists(inv.Hash)
for requestQueue.len() > 0 {
err := requestBlocks(netAdapter, outgoingRoute, peer, incomingRoute, dag, &invsQueue,
requestQueue, newBlockHandler)
err := requestBlocks(context, outgoingRoute, peer, incomingRoute, &invsQueue, requestQueue)
if err != nil {
return err
}
@ -80,10 +85,9 @@ func readInv(incomingRoute *router.Route, invsQueue *[]*wire.MsgInvRelayBlock) (
return inv, nil
}
func requestBlocks(netAdapater *netadapter.NetAdapter, outgoingRoute *router.Route,
peer *peerpkg.Peer, incomingRoute *router.Route, dag *blockdag.BlockDAG,
invsQueue *[]*wire.MsgInvRelayBlock, requestQueue *hashesQueueSet,
newBlockHandler NewBlockHandler) error {
func requestBlocks(context RelayInvsContext, outgoingRoute *router.Route,
peer *peerpkg.Peer, incomingRoute *router.Route,
invsQueue *[]*wire.MsgInvRelayBlock, requestQueue *hashesQueueSet) error {
numHashesToRequest := mathUtil.MinInt(wire.MsgGetRelayBlocksHashes, requestQueue.len())
hashesToRequest := requestQueue.dequeue(numHashesToRequest)
@ -91,7 +95,7 @@ func requestBlocks(netAdapater *netadapter.NetAdapter, outgoingRoute *router.Rou
pendingBlocks := map[daghash.Hash]struct{}{}
var filteredHashesToRequest []*daghash.Hash
for _, hash := range hashesToRequest {
exists := requestedBlocks.addIfNotExists(hash)
exists := context.SharedRequestedBlocks().addIfNotExists(hash)
if !exists {
continue
}
@ -102,7 +106,7 @@ func requestBlocks(netAdapater *netadapter.NetAdapter, outgoingRoute *router.Rou
// In case the function returns earlier than expected, we want to make sure requestedBlocks is
// clean from any pending blocks.
defer requestedBlocks.removeSet(pendingBlocks)
defer context.SharedRequestedBlocks().removeSet(pendingBlocks)
getRelayBlocksMsg := wire.NewMsgGetRelayBlocks(filteredHashesToRequest)
err := outgoingRoute.Enqueue(getRelayBlocksMsg)
@ -122,9 +126,9 @@ func requestBlocks(netAdapater *netadapter.NetAdapter, outgoingRoute *router.Rou
return protocolerrors.Errorf(true, "got unrequested block %s", block.Hash())
}
delete(pendingBlocks, *blockHash)
requestedBlocks.remove(blockHash)
context.SharedRequestedBlocks().remove(blockHash)
err = processAndRelayBlock(netAdapater, peer, dag, requestQueue, block, newBlockHandler)
err = processAndRelayBlock(context, peer, requestQueue, block)
if err != nil {
return err
}
@ -155,12 +159,11 @@ func readMsgBlock(incomingRoute *router.Route, invsQueue *[]*wire.MsgInvRelayBlo
}
}
func processAndRelayBlock(netAdapter *netadapter.NetAdapter, peer *peerpkg.Peer,
dag *blockdag.BlockDAG, requestQueue *hashesQueueSet, block *util.Block,
newBlockHandler NewBlockHandler) error {
func processAndRelayBlock(context RelayInvsContext, peer *peerpkg.Peer,
requestQueue *hashesQueueSet, block *util.Block) error {
blockHash := block.Hash()
isOrphan, isDelayed, err := dag.ProcessBlock(block, blockdag.BFNone)
isOrphan, isDelayed, err := context.DAG().ProcessBlock(block, blockdag.BFNone)
if err != nil {
// When the error is a rule error, it means the block was simply
// rejected as opposed to something actually going wrong, so log
@ -187,7 +190,7 @@ func processAndRelayBlock(netAdapter *netadapter.NetAdapter, peer *peerpkg.Peer,
}
const maxOrphanBlueScoreDiff = 10000
selectedTipBlueScore := dag.SelectedTipBlueScore()
selectedTipBlueScore := context.DAG().SelectedTipBlueScore()
if blueScore > selectedTipBlueScore+maxOrphanBlueScoreDiff {
log.Infof("Orphan block %s has blue score %d and the selected tip blue score is "+
"%d. Ignoring orphans with a blue score difference from the selected tip greater than %d",
@ -196,7 +199,7 @@ func processAndRelayBlock(netAdapter *netadapter.NetAdapter, peer *peerpkg.Peer,
}
// Request the parents for the orphan block from the peer that sent it.
missingAncestors := dag.GetOrphanMissingAncestorHashes(blockHash)
missingAncestors := context.DAG().GetOrphanMissingAncestorHashes(blockHash)
for _, missingAncestor := range missingAncestors {
requestQueue.enqueueIfNotExists(missingAncestor)
}
@ -212,13 +215,13 @@ func processAndRelayBlock(netAdapter *netadapter.NetAdapter, peer *peerpkg.Peer,
// sm.restartSyncIfNeeded()
//// Clear the rejected transactions.
//sm.rejectedTxns = make(map[daghash.TxID]struct{})
err = netAdapter.Broadcast(peerpkg.ReadyPeerIDs(), wire.NewMsgInvBlock(blockHash))
err = context.Broadcast(wire.NewMsgInvBlock(blockHash))
if err != nil {
return err
}
ibd.StartIBDIfRequired(dag)
err = newBlockHandler(block)
context.StartIBDIfRequired()
err = context.OnNewBlock(block)
if err != nil {
panic(err)
}

View File

@ -6,18 +6,20 @@ import (
"github.com/kaspanet/kaspad/util/daghash"
)
type sharedRequestedBlocks struct {
// SharedRequestedBlocks is a data structure that is shared between peers that
// holds the hashes of all the requested blocks to prevent redundant requests.
type SharedRequestedBlocks struct {
blocks map[daghash.Hash]struct{}
sync.Mutex
}
func (s *sharedRequestedBlocks) remove(hash *daghash.Hash) {
func (s *SharedRequestedBlocks) remove(hash *daghash.Hash) {
s.Lock()
defer s.Unlock()
delete(s.blocks, *hash)
}
func (s *sharedRequestedBlocks) removeSet(blockHashes map[daghash.Hash]struct{}) {
func (s *SharedRequestedBlocks) removeSet(blockHashes map[daghash.Hash]struct{}) {
s.Lock()
defer s.Unlock()
for hash := range blockHashes {
@ -25,7 +27,7 @@ func (s *sharedRequestedBlocks) removeSet(blockHashes map[daghash.Hash]struct{})
}
}
func (s *sharedRequestedBlocks) addIfNotExists(hash *daghash.Hash) (exists bool) {
func (s *SharedRequestedBlocks) addIfNotExists(hash *daghash.Hash) (exists bool) {
s.Lock()
defer s.Unlock()
_, ok := s.blocks[*hash]
@ -36,6 +38,9 @@ func (s *sharedRequestedBlocks) addIfNotExists(hash *daghash.Hash) (exists bool)
return false
}
var requestedBlocks = &sharedRequestedBlocks{
blocks: make(map[daghash.Hash]struct{}),
// NewSharedRequestedBlocks returns a new instance of SharedRequestedBlocks.
func NewSharedRequestedBlocks() *SharedRequestedBlocks {
return &SharedRequestedBlocks{
blocks: make(map[daghash.Hash]struct{}),
}
}

View File

@ -5,21 +5,30 @@ import (
"github.com/kaspanet/kaspad/blockdag"
"github.com/kaspanet/kaspad/config"
"github.com/kaspanet/kaspad/netadapter"
"github.com/kaspanet/kaspad/protocol/common"
"sync"
"sync/atomic"
routerpkg "github.com/kaspanet/kaspad/netadapter/router"
"github.com/kaspanet/kaspad/protocol/flows/ibd"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
"github.com/kaspanet/kaspad/util/locks"
"github.com/kaspanet/kaspad/wire"
"github.com/pkg/errors"
)
// HandleHandshakeContext is the interface for the context needed for the HandleHandshake flow.
type HandleHandshakeContext interface {
Config() *config.Config
NetAdapter() *netadapter.NetAdapter
DAG() *blockdag.BlockDAG
AddressManager() *addrmgr.AddrManager
StartIBDIfRequired()
AddToPeers(peer *peerpkg.Peer) error
}
// HandleHandshake sets up the handshake protocol - It sends a version message and waits for an incoming
// version message, as well as a verack for the sent version
func HandleHandshake(cfg *config.Config, router *routerpkg.Router, netAdapter *netadapter.NetAdapter,
dag *blockdag.BlockDAG, addressManager *addrmgr.AddrManager) (peer *peerpkg.Peer, closed bool, err error) {
func HandleHandshake(context HandleHandshakeContext, router *routerpkg.Router) (peer *peerpkg.Peer, closed bool, err error) {
receiveVersionRoute, err := router.AddIncomingRoute([]wire.MessageCommand{wire.CmdVersion})
if err != nil {
@ -45,7 +54,7 @@ func HandleHandshake(cfg *config.Config, router *routerpkg.Router, netAdapter *n
var peerAddress *wire.NetAddress
spawn("HandleHandshake-ReceiveVersion", func() {
defer wg.Done()
address, err := ReceiveVersion(receiveVersionRoute, router.OutgoingRoute(), netAdapter, peer, dag)
address, err := ReceiveVersion(context, receiveVersionRoute, router.OutgoingRoute(), peer)
if err != nil {
log.Errorf("error from ReceiveVersion: %s", err)
}
@ -60,7 +69,7 @@ func HandleHandshake(cfg *config.Config, router *routerpkg.Router, netAdapter *n
spawn("HandleHandshake-SendVersion", func() {
defer wg.Done()
err := SendVersion(cfg, sendVersionRoute, router.OutgoingRoute(), netAdapter, dag)
err := SendVersion(context, sendVersionRoute, router.OutgoingRoute())
if err != nil {
log.Errorf("error from SendVersion: %s", err)
}
@ -81,27 +90,27 @@ func HandleHandshake(cfg *config.Config, router *routerpkg.Router, netAdapter *n
case <-locks.ReceiveFromChanWhenDone(func() { wg.Wait() }):
}
err = peerpkg.AddToReadyPeers(peer)
err = context.AddToPeers(peer)
if err != nil {
if errors.Is(err, peerpkg.ErrPeerWithSameIDExists) {
if errors.Is(err, common.ErrPeerWithSameIDExists) {
return nil, false, err
}
panic(err)
}
peerID := peer.ID()
err = netAdapter.AssociateRouterID(router, peerID)
err = context.NetAdapter().AssociateRouterID(router, peerID)
if err != nil {
panic(err)
}
if peerAddress != nil {
subnetworkID := peer.SubnetworkID()
addressManager.AddAddress(peerAddress, peerAddress, subnetworkID)
addressManager.Good(peerAddress, subnetworkID)
context.AddressManager().AddAddress(peerAddress, peerAddress, subnetworkID)
context.AddressManager().Good(peerAddress, subnetworkID)
}
ibd.StartIBDIfRequired(dag)
context.StartIBDIfRequired()
err = router.RemoveRoute([]wire.MessageCommand{wire.CmdVersion, wire.CmdVerAck})
if err != nil {

View File

@ -1,8 +1,6 @@
package handshake
import (
"github.com/kaspanet/kaspad/blockdag"
"github.com/kaspanet/kaspad/netadapter"
"github.com/kaspanet/kaspad/netadapter/router"
"github.com/kaspanet/kaspad/protocol/common"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
@ -23,8 +21,8 @@ var (
// ReceiveVersion waits for the peer to send a version message, sends a
// verack in response, and updates its info accordingly.
func ReceiveVersion(incomingRoute *router.Route, outgoingRoute *router.Route, netAdapter *netadapter.NetAdapter,
peer *peerpkg.Peer, dag *blockdag.BlockDAG) (*wire.NetAddress, error) {
func ReceiveVersion(context HandleHandshakeContext, incomingRoute *router.Route, outgoingRoute *router.Route,
peer *peerpkg.Peer) (*wire.NetAddress, error) {
message, err := incomingRoute.DequeueWithTimeout(common.DefaultTimeout)
if err != nil {
@ -36,7 +34,7 @@ func ReceiveVersion(incomingRoute *router.Route, outgoingRoute *router.Route, ne
return nil, protocolerrors.New(true, "a version message must precede all others")
}
if !allowSelfConnections && netAdapter.ID().IsEqual(msgVersion.ID) {
if !allowSelfConnections && context.NetAdapter().ID().IsEqual(msgVersion.ID) {
return nil, protocolerrors.New(true, "connected to self")
}
@ -53,7 +51,7 @@ func ReceiveVersion(incomingRoute *router.Route, outgoingRoute *router.Route, ne
}
// Disconnect from partial nodes in networks that don't allow them
if !dag.Params.EnableNonNativeSubnetworks && msgVersion.SubnetworkID != nil {
if !context.DAG().Params.EnableNonNativeSubnetworks && msgVersion.SubnetworkID != nil {
return nil, protocolerrors.New(true, "partial nodes are not allowed")
}

View File

@ -1,9 +1,6 @@
package handshake
import (
"github.com/kaspanet/kaspad/blockdag"
"github.com/kaspanet/kaspad/config"
"github.com/kaspanet/kaspad/netadapter"
"github.com/kaspanet/kaspad/netadapter/router"
"github.com/kaspanet/kaspad/protocol/common"
"github.com/kaspanet/kaspad/version"
@ -29,19 +26,18 @@ var (
)
// SendVersion sends a version to a peer and waits for verack.
func SendVersion(cfg *config.Config, incomingRoute *router.Route, outgoingRoute *router.Route,
netAdapter *netadapter.NetAdapter, dag *blockdag.BlockDAG) error {
func SendVersion(context HandleHandshakeContext, incomingRoute *router.Route, outgoingRoute *router.Route) error {
selectedTipHash := dag.SelectedTipHash()
subnetworkID := cfg.SubnetworkID
selectedTipHash := context.DAG().SelectedTipHash()
subnetworkID := context.Config().SubnetworkID
// Version message.
localAddress, err := netAdapter.GetBestLocalAddress()
localAddress, err := context.NetAdapter().GetBestLocalAddress()
if err != nil {
panic(err)
}
msg := wire.NewMsgVersion(localAddress, netAdapter.ID(), selectedTipHash, subnetworkID)
msg.AddUserAgent(userAgentName, userAgentVersion, cfg.UserAgentComments...)
msg := wire.NewMsgVersion(localAddress, context.NetAdapter().ID(), selectedTipHash, subnetworkID)
msg.AddUserAgent(userAgentName, userAgentVersion, context.Config().UserAgentComments...)
// Advertise the services flag
msg.Services = defaultServices
@ -50,7 +46,7 @@ func SendVersion(cfg *config.Config, incomingRoute *router.Route, outgoingRoute
msg.ProtocolVersion = wire.ProtocolVersion
// Advertise if inv messages for transactions are desired.
msg.DisableRelayTx = cfg.BlocksOnly
msg.DisableRelayTx = context.Config().BlocksOnly
err = outgoingRoute.Enqueue(msg)
if err != nil {

View File

@ -8,15 +8,22 @@ import (
"github.com/kaspanet/kaspad/wire"
)
// GetBlockLocatorContext is the interface for the context needed for the HandleGetBlockLocator flow.
type GetBlockLocatorContext interface {
DAG() *blockdag.BlockDAG
}
// HandleGetBlockLocator handles getBlockLocator messages
func HandleGetBlockLocator(incomingRoute *router.Route, outgoingRoute *router.Route, dag *blockdag.BlockDAG) error {
func HandleGetBlockLocator(context GetBlockLocatorContext, incomingRoute *router.Route,
outgoingRoute *router.Route) error {
for {
lowHash, highHash, err := receiveGetBlockLocator(incomingRoute)
if err != nil {
return err
}
locator, err := dag.BlockLocatorFromHashes(highHash, lowHash)
locator, err := context.DAG().BlockLocatorFromHashes(highHash, lowHash)
if err != nil || len(locator) == 0 {
return protocolerrors.Errorf(true, "couldn't build a block "+
"locator between blocks %s and %s", lowHash, highHash)

View File

@ -7,15 +7,20 @@ import (
"github.com/kaspanet/kaspad/wire"
)
// GetBlocksContext is the interface for the context needed for the HandleGetBlocks flow.
type GetBlocksContext interface {
DAG() *blockdag.BlockDAG
}
// HandleGetBlocks handles getBlocks messages
func HandleGetBlocks(incomingRoute *router.Route, outgoingRoute *router.Route, dag *blockdag.BlockDAG) error {
func HandleGetBlocks(context GetBlocksContext, incomingRoute *router.Route, outgoingRoute *router.Route) error {
for {
lowHash, highHash, err := receiveGetBlocks(incomingRoute)
if err != nil {
return err
}
msgIBDBlocks, err := buildMsgIBDBlocks(lowHash, highHash, dag)
msgIBDBlocks, err := buildMsgIBDBlocks(context, lowHash, highHash)
if err != nil {
return err
}
@ -39,18 +44,18 @@ func receiveGetBlocks(incomingRoute *router.Route) (lowHash *daghash.Hash,
return msgGetBlocks.LowHash, msgGetBlocks.HighHash, nil
}
func buildMsgIBDBlocks(lowHash *daghash.Hash, highHash *daghash.Hash,
dag *blockdag.BlockDAG) ([]*wire.MsgIBDBlock, error) {
func buildMsgIBDBlocks(context GetBlocksContext, lowHash *daghash.Hash,
highHash *daghash.Hash) ([]*wire.MsgIBDBlock, error) {
const maxHashesInMsgIBDBlocks = wire.MaxInvPerMsg
blockHashes, err := dag.AntiPastHashesBetween(lowHash, highHash, maxHashesInMsgIBDBlocks)
blockHashes, err := context.DAG().AntiPastHashesBetween(lowHash, highHash, maxHashesInMsgIBDBlocks)
if err != nil {
return nil, err
}
msgIBDBlocks := make([]*wire.MsgIBDBlock, len(blockHashes))
for i, blockHash := range blockHashes {
block, err := dag.BlockByHash(blockHash)
block, err := context.DAG().BlockByHash(blockHash)
if err != nil {
return nil, err
}

View File

@ -9,93 +9,50 @@ import (
"github.com/kaspanet/kaspad/util"
"github.com/kaspanet/kaspad/util/daghash"
"github.com/kaspanet/kaspad/wire"
"sync"
"sync/atomic"
)
var (
isIBDRunning uint32
startIBDMutex sync.Mutex
)
// NewBlockHandler is a function that is to be
// called when a new block is successfully processed.
type NewBlockHandler func(block *util.Block) error
// StartIBDIfRequired selects a peer and starts IBD against it
// if required
func StartIBDIfRequired(dag *blockdag.BlockDAG) {
startIBDMutex.Lock()
defer startIBDMutex.Unlock()
if IsInIBD() {
return
}
peer := selectPeerForIBD(dag)
if peer == nil {
requestSelectedTipsIfRequired(dag)
return
}
atomic.StoreUint32(&isIBDRunning, 1)
peer.StartIBD()
}
// IsInIBD is true if IBD is currently running
func IsInIBD() bool {
return atomic.LoadUint32(&isIBDRunning) != 0
}
// selectPeerForIBD returns the first peer whose selected tip
// hash is not in our DAG
func selectPeerForIBD(dag *blockdag.BlockDAG) *peerpkg.Peer {
for _, peer := range peerpkg.ReadyPeers() {
peerSelectedTipHash := peer.SelectedTipHash()
if !dag.IsInDAG(peerSelectedTipHash) {
return peer
}
}
return nil
// HandleIBDContext is the interface for the context needed for the HandleIBD flow.
type HandleIBDContext interface {
DAG() *blockdag.BlockDAG
OnNewBlock(block *util.Block) error
StartIBDIfRequired()
FinishIBD()
}
// HandleIBD waits for IBD start and handles it when IBD is triggered for this peer
func HandleIBD(incomingRoute *router.Route, outgoingRoute *router.Route,
peer *peerpkg.Peer, dag *blockdag.BlockDAG, newBlockHandler NewBlockHandler) error {
func HandleIBD(context HandleIBDContext, incomingRoute *router.Route, outgoingRoute *router.Route, peer *peerpkg.Peer) error {
for {
err := runIBD(incomingRoute, outgoingRoute, peer, dag, newBlockHandler)
err := runIBD(context, incomingRoute, outgoingRoute, peer)
if err != nil {
return err
}
}
}
func runIBD(incomingRoute *router.Route, outgoingRoute *router.Route,
peer *peerpkg.Peer, dag *blockdag.BlockDAG, newBlockHandler NewBlockHandler) error {
func runIBD(context HandleIBDContext, incomingRoute *router.Route, outgoingRoute *router.Route, peer *peerpkg.Peer) error {
peer.WaitForIBDStart()
defer finishIBD(dag)
defer context.FinishIBD()
peerSelectedTipHash := peer.SelectedTipHash()
highestSharedBlockHash, err := findHighestSharedBlockHash(incomingRoute, outgoingRoute, dag, peerSelectedTipHash)
highestSharedBlockHash, err := findHighestSharedBlockHash(context, incomingRoute, outgoingRoute, peerSelectedTipHash)
if err != nil {
return err
}
if dag.IsKnownFinalizedBlock(highestSharedBlockHash) {
if context.DAG().IsKnownFinalizedBlock(highestSharedBlockHash) {
return protocolerrors.Errorf(false, "cannot initiate "+
"IBD with peer %s because the highest shared chain block (%s) is "+
"below the finality point", peer, highestSharedBlockHash)
}
return downloadBlocks(incomingRoute, outgoingRoute, dag, highestSharedBlockHash, peerSelectedTipHash,
newBlockHandler)
return downloadBlocks(context, incomingRoute, outgoingRoute, highestSharedBlockHash, peerSelectedTipHash)
}
func findHighestSharedBlockHash(incomingRoute *router.Route, outgoingRoute *router.Route, dag *blockdag.BlockDAG,
func findHighestSharedBlockHash(context HandleIBDContext, incomingRoute *router.Route, outgoingRoute *router.Route,
peerSelectedTipHash *daghash.Hash) (lowHash *daghash.Hash, err error) {
lowHash = dag.Params.GenesisHash
lowHash = context.DAG().Params.GenesisHash
highHash := peerSelectedTipHash
for {
@ -113,11 +70,11 @@ func findHighestSharedBlockHash(incomingRoute *router.Route, outgoingRoute *rout
// If it is, return it. If it isn't, we need to narrow our
// getBlockLocator request and try again.
locatorHighHash := blockLocatorHashes[0]
if dag.IsInDAG(locatorHighHash) {
if context.DAG().IsInDAG(locatorHighHash) {
return locatorHighHash, nil
}
highHash, lowHash = dag.FindNextLocatorBoundaries(blockLocatorHashes)
highHash, lowHash = context.DAG().FindNextLocatorBoundaries(blockLocatorHashes)
}
}
@ -142,9 +99,9 @@ func receiveBlockLocator(incomingRoute *router.Route) (blockLocatorHashes []*dag
return msgBlockLocator.BlockLocatorHashes, nil
}
func downloadBlocks(incomingRoute *router.Route, outgoingRoute *router.Route,
dag *blockdag.BlockDAG, highestSharedBlockHash *daghash.Hash,
peerSelectedTipHash *daghash.Hash, newBlockHandler NewBlockHandler) error {
func downloadBlocks(context HandleIBDContext, incomingRoute *router.Route, outgoingRoute *router.Route,
highestSharedBlockHash *daghash.Hash,
peerSelectedTipHash *daghash.Hash) error {
err := sendGetBlocks(outgoingRoute, highestSharedBlockHash, peerSelectedTipHash)
if err != nil {
@ -156,7 +113,7 @@ func downloadBlocks(incomingRoute *router.Route, outgoingRoute *router.Route,
if err != nil {
return err
}
err = processIBDBlock(dag, msgIBDBlock, newBlockHandler)
err = processIBDBlock(context, msgIBDBlock)
if err != nil {
return err
}
@ -187,14 +144,13 @@ func receiveIBDBlock(incomingRoute *router.Route) (msgIBDBlock *wire.MsgIBDBlock
return msgIBDBlock, nil
}
func processIBDBlock(dag *blockdag.BlockDAG, msgIBDBlock *wire.MsgIBDBlock,
newBlockHandler NewBlockHandler) error {
func processIBDBlock(context HandleIBDContext, msgIBDBlock *wire.MsgIBDBlock) error {
block := util.NewBlock(&msgIBDBlock.MsgBlock)
if dag.IsInDAG(block.Hash()) {
if context.DAG().IsInDAG(block.Hash()) {
return nil
}
isOrphan, isDelayed, err := dag.ProcessBlock(block, blockdag.BFNone)
isOrphan, isDelayed, err := context.DAG().ProcessBlock(block, blockdag.BFNone)
if err != nil {
return err
}
@ -206,15 +162,9 @@ func processIBDBlock(dag *blockdag.BlockDAG, msgIBDBlock *wire.MsgIBDBlock,
return protocolerrors.Errorf(false, "received delayed block %s "+
"during IBD", block.Hash())
}
err = newBlockHandler(block)
err = context.OnNewBlock(block)
if err != nil {
panic(err)
}
return nil
}
func finishIBD(dag *blockdag.BlockDAG) {
atomic.StoreUint32(&isIBDRunning, 0)
StartIBDIfRequired(dag)
}

View File

@ -8,41 +8,27 @@ import (
"github.com/kaspanet/kaspad/util/daghash"
"github.com/kaspanet/kaspad/wire"
"github.com/pkg/errors"
"time"
)
const minDurationToRequestSelectedTips = time.Minute
func requestSelectedTipsIfRequired(dag *blockdag.BlockDAG) {
if isDAGTimeCurrent(dag) {
return
}
requestSelectedTips()
}
func isDAGTimeCurrent(dag *blockdag.BlockDAG) bool {
return dag.Now().Sub(dag.SelectedTipHeader().Timestamp) > minDurationToRequestSelectedTips
}
func requestSelectedTips() {
for _, peer := range peerpkg.ReadyPeers() {
peer.RequestSelectedTipIfRequired()
}
// RequestSelectedTipContext is the interface for the context needed for the RequestSelectedTip flow.
type RequestSelectedTipContext interface {
DAG() *blockdag.BlockDAG
StartIBDIfRequired()
}
// RequestSelectedTip waits for selected tip requests and handles them
func RequestSelectedTip(incomingRoute *router.Route,
outgoingRoute *router.Route, peer *peerpkg.Peer, dag *blockdag.BlockDAG) error {
func RequestSelectedTip(context RequestSelectedTipContext, incomingRoute *router.Route,
outgoingRoute *router.Route, peer *peerpkg.Peer) error {
for {
err := runSelectedTipRequest(incomingRoute, outgoingRoute, peer, dag)
err := runSelectedTipRequest(context, incomingRoute, outgoingRoute, peer)
if err != nil {
return err
}
}
}
func runSelectedTipRequest(incomingRoute *router.Route, outgoingRoute *router.Route,
peer *peerpkg.Peer, dag *blockdag.BlockDAG) error {
func runSelectedTipRequest(context RequestSelectedTipContext, incomingRoute *router.Route, outgoingRoute *router.Route,
peer *peerpkg.Peer) error {
peer.WaitForSelectedTipRequests()
defer peer.FinishRequestingSelectedTip()
@ -58,7 +44,7 @@ func runSelectedTipRequest(incomingRoute *router.Route, outgoingRoute *router.Ro
}
peer.SetSelectedTipHash(peerSelectedTipHash)
StartIBDIfRequired(dag)
context.StartIBDIfRequired()
return nil
}
@ -77,15 +63,20 @@ func receiveSelectedTip(incomingRoute *router.Route) (selectedTipHash *daghash.H
return msgSelectedTip.SelectedTipHash, nil
}
// GetSelectedTipContext is the interface for the context needed for the HandleGetSelectedTip flow.
type GetSelectedTipContext interface {
DAG() *blockdag.BlockDAG
}
// HandleGetSelectedTip handles getSelectedTip messages
func HandleGetSelectedTip(incomingRoute *router.Route, outgoingRoute *router.Route, dag *blockdag.BlockDAG) error {
func HandleGetSelectedTip(context GetSelectedTipContext, incomingRoute *router.Route, outgoingRoute *router.Route) error {
for {
err := receiveGetSelectedTip(incomingRoute)
if err != nil {
return err
}
selectedTipHash := dag.SelectedTipHash()
selectedTipHash := context.DAG().SelectedTipHash()
err = sendSelectedTipHash(outgoingRoute, selectedTipHash)
if err != nil {
return err

View File

@ -11,9 +11,13 @@ import (
"github.com/kaspanet/kaspad/wire"
)
// ReceivePingsContext is the interface for the context needed for the ReceivePings flow.
type ReceivePingsContext interface {
}
// ReceivePings handles all ping messages coming through incomingRoute.
// This function assumes that incomingRoute will only return MsgPing.
func ReceivePings(incomingRoute *router.Route, outgoingRoute *router.Route) error {
func ReceivePings(_ ReceivePingsContext, incomingRoute *router.Route, outgoingRoute *router.Route) error {
for {
message, err := incomingRoute.Dequeue()
if err != nil {
@ -29,10 +33,14 @@ func ReceivePings(incomingRoute *router.Route, outgoingRoute *router.Route) erro
}
}
// SendPingsContext is the interface for the context needed for the SendPings flow.
type SendPingsContext interface {
}
// SendPings starts sending MsgPings every pingInterval seconds to the
// given peer.
// This function assumes that incomingRoute will only return MsgPong.
func SendPings(incomingRoute *router.Route, outgoingRoute *router.Route, peer *peerpkg.Peer) error {
func SendPings(_ SendPingsContext, incomingRoute *router.Route, outgoingRoute *router.Route, peer *peerpkg.Peer) error {
const pingInterval = 2 * time.Minute
ticker := time.NewTicker(pingInterval)
defer ticker.Stop()

View File

@ -6,7 +6,6 @@ import (
"github.com/kaspanet/kaspad/netadapter"
"github.com/kaspanet/kaspad/netadapter/router"
"github.com/kaspanet/kaspad/protocol/common"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
"github.com/kaspanet/kaspad/protocol/protocolerrors"
"github.com/kaspanet/kaspad/util"
"github.com/kaspanet/kaspad/util/daghash"
@ -14,15 +13,18 @@ import (
"github.com/pkg/errors"
)
// NewBlockHandler is a function that is to be
// called when a new block is successfully processed.
type NewBlockHandler func(block *util.Block) error
// RelayedTransactionsContext is the interface for the context needed for the HandleRelayedTransactions flow.
type RelayedTransactionsContext interface {
NetAdapter() *netadapter.NetAdapter
DAG() *blockdag.BlockDAG
SharedRequestedTransactions() *SharedRequestedTransactions
TxPool() *mempool.TxPool
Broadcast(message wire.Message) error
}
// HandleRelayedTransactions listens to wire.MsgInvTransaction messages, requests their corresponding transactions if they
// are missing, adds them to the mempool and propagates them to the rest of the network.
func HandleRelayedTransactions(incomingRoute *router.Route, outgoingRoute *router.Route,
netAdapter *netadapter.NetAdapter, dag *blockdag.BlockDAG, txPool *mempool.TxPool,
sharedRequestedTransactions *SharedRequestedTransactions) error {
func HandleRelayedTransactions(context RelayedTransactionsContext, incomingRoute *router.Route, outgoingRoute *router.Route) error {
invsQueue := make([]*wire.MsgInvTransaction, 0)
for {
@ -31,29 +33,27 @@ func HandleRelayedTransactions(incomingRoute *router.Route, outgoingRoute *route
return err
}
requestedIDs, err := requestInvTransactions(outgoingRoute, txPool, dag, sharedRequestedTransactions, inv)
requestedIDs, err := requestInvTransactions(context, outgoingRoute, inv)
if err != nil {
return err
}
err = receiveTransactions(requestedIDs, incomingRoute, &invsQueue, txPool, netAdapter,
sharedRequestedTransactions)
err = receiveTransactions(context, requestedIDs, incomingRoute, &invsQueue)
if err != nil {
return err
}
}
}
func requestInvTransactions(outgoingRoute *router.Route, txPool *mempool.TxPool, dag *blockdag.BlockDAG,
sharedRequestedTransactions *SharedRequestedTransactions, inv *wire.MsgInvTransaction) (requestedIDs []*daghash.TxID,
err error) {
func requestInvTransactions(context RelayedTransactionsContext, outgoingRoute *router.Route,
inv *wire.MsgInvTransaction) (requestedIDs []*daghash.TxID, err error) {
idsToRequest := make([]*daghash.TxID, 0, len(inv.TxIDS))
for _, txID := range inv.TxIDS {
if isKnownTransaction(txPool, dag, txID) {
if isKnownTransaction(context, txID) {
continue
}
exists := sharedRequestedTransactions.addIfNotExists(txID)
exists := context.SharedRequestedTransactions().addIfNotExists(txID)
if exists {
continue
}
@ -67,16 +67,16 @@ func requestInvTransactions(outgoingRoute *router.Route, txPool *mempool.TxPool,
msgGetTransactions := wire.NewMsgGetTransactions(idsToRequest)
err = outgoingRoute.Enqueue(msgGetTransactions)
if err != nil {
sharedRequestedTransactions.removeMany(idsToRequest)
context.SharedRequestedTransactions().removeMany(idsToRequest)
return nil, err
}
return idsToRequest, nil
}
func isKnownTransaction(txPool *mempool.TxPool, dag *blockdag.BlockDAG, txID *daghash.TxID) bool {
func isKnownTransaction(context RelayedTransactionsContext, txID *daghash.TxID) bool {
// Ask the transaction memory pool if the transaction is known
// to it in any form (main pool or orphan).
if txPool.HaveTransaction(txID) {
if context.TxPool().HaveTransaction(txID) {
return true
}
@ -91,7 +91,7 @@ func isKnownTransaction(txPool *mempool.TxPool, dag *blockdag.BlockDAG, txID *da
prevOut := wire.Outpoint{TxID: *txID}
for i := uint32(0); i < 2; i++ {
prevOut.Index = i
_, ok := dag.GetUTXOEntry(prevOut)
_, ok := context.DAG().GetUTXOEntry(prevOut)
if ok {
return true
}
@ -120,7 +120,7 @@ func readInv(incomingRoute *router.Route, invsQueue *[]*wire.MsgInvTransaction)
return inv, nil
}
func broadcastAcceptedTransactions(netAdapter *netadapter.NetAdapter, acceptedTxs []*mempool.TxDesc) error {
func broadcastAcceptedTransactions(context RelayedTransactionsContext, acceptedTxs []*mempool.TxDesc) error {
// TODO(libp2p) Add mechanism to avoid sending to other peers invs that are known to them (e.g. mruinvmap)
// TODO(libp2p) Consider broadcasting in bulks
idsToBroadcast := make([]*daghash.TxID, len(acceptedTxs))
@ -128,7 +128,7 @@ func broadcastAcceptedTransactions(netAdapter *netadapter.NetAdapter, acceptedTx
idsToBroadcast[i] = tx.Tx.ID()
}
inv := wire.NewMsgTxInv(idsToBroadcast)
return netAdapter.Broadcast(peerpkg.ReadyPeerIDs(), inv)
return context.Broadcast(inv)
}
// readMsgTx returns the next msgTx in incomingRoute, and populates invsQueue with any inv messages that meanwhile arrive.
@ -154,13 +154,12 @@ func readMsgTx(incomingRoute *router.Route, invsQueue *[]*wire.MsgInvTransaction
}
}
func receiveTransactions(requestedTransactions []*daghash.TxID, incomingRoute *router.Route,
invsQueue *[]*wire.MsgInvTransaction, txPool *mempool.TxPool, netAdapter *netadapter.NetAdapter,
sharedRequestedTransactions *SharedRequestedTransactions) error {
func receiveTransactions(context RelayedTransactionsContext, requestedTransactions []*daghash.TxID, incomingRoute *router.Route,
invsQueue *[]*wire.MsgInvTransaction) error {
// In case the function returns earlier than expected, we want to make sure sharedRequestedTransactions is
// clean from any pending transactions.
defer sharedRequestedTransactions.removeMany(requestedTransactions)
defer context.SharedRequestedTransactions().removeMany(requestedTransactions)
for _, expectedID := range requestedTransactions {
msgTx, err := readMsgTx(incomingRoute, invsQueue)
if err != nil {
@ -171,7 +170,7 @@ func receiveTransactions(requestedTransactions []*daghash.TxID, incomingRoute *r
return protocolerrors.Errorf(true, "expected transaction %s", expectedID)
}
acceptedTxs, err := txPool.ProcessTransaction(tx, true, 0) // TODO(libp2p) Use the peer ID for the mempool tag
acceptedTxs, err := context.TxPool().ProcessTransaction(tx, true, 0) // TODO(libp2p) Use the peer ID for the mempool tag
if err != nil {
// When the error is a rule error, it means the transaction was
// simply rejected as opposed to something actually going wrong,
@ -197,7 +196,7 @@ func receiveTransactions(requestedTransactions []*daghash.TxID, incomingRoute *r
return protocolerrors.Errorf(true, "rejected transaction %s", tx.ID())
}
err = broadcastAcceptedTransactions(netAdapter, acceptedTxs)
err = broadcastAcceptedTransactions(context, acceptedTxs)
if err != nil {
panic(err)
}

View File

@ -6,28 +6,12 @@ import (
"github.com/kaspanet/kaspad/config"
"github.com/kaspanet/kaspad/mempool"
"github.com/kaspanet/kaspad/netadapter"
"github.com/kaspanet/kaspad/protocol/flows/relaytransactions"
"github.com/kaspanet/kaspad/util"
"github.com/kaspanet/kaspad/util/daghash"
"sync"
"time"
"github.com/kaspanet/kaspad/protocol/flowcontext"
)
// Manager manages the p2p protocol
type Manager struct {
cfg *config.Config
netAdapter *netadapter.NetAdapter
txPool *mempool.TxPool
addedTransactions []*util.Tx
dag *blockdag.BlockDAG
addressManager *addrmgr.AddrManager
transactionsToRebroadcastLock sync.Mutex
transactionsToRebroadcast map[daghash.TxID]*util.Tx
lastRebroadcastTime time.Time
sharedRequestedTransactions *relaytransactions.SharedRequestedTransactions
isInIBD uint32 // TODO(libp2p) populate this var
context *flowcontext.FlowContext
}
// NewManager creates a new instance of the p2p protocol manager
@ -40,11 +24,7 @@ func NewManager(cfg *config.Config, dag *blockdag.BlockDAG,
}
manager := Manager{
netAdapter: netAdapter,
dag: dag,
addressManager: addressManager,
txPool: txPool,
sharedRequestedTransactions: relaytransactions.NewSharedRequestedTransactions(),
context: flowcontext.New(cfg, dag, addressManager, txPool, netAdapter),
}
netAdapter.SetRouterInitializer(manager.routerInitializer)
return &manager, nil
@ -52,10 +32,10 @@ func NewManager(cfg *config.Config, dag *blockdag.BlockDAG,
// Start starts the p2p protocol
func (m *Manager) Start() error {
return m.netAdapter.Start()
return m.context.NetAdapter().Start()
}
// Stop stops the p2p protocol
func (m *Manager) Stop() error {
return m.netAdapter.Stop()
return m.context.NetAdapter().Stop()
}

View File

@ -11,7 +11,6 @@ import (
"github.com/kaspanet/kaspad/util/mstime"
"github.com/kaspanet/kaspad/util/subnetworkid"
"github.com/kaspanet/kaspad/wire"
"github.com/pkg/errors"
)
// Peer holds data about a peer.
@ -118,49 +117,6 @@ func (p *Peer) String() string {
panic("unimplemented")
}
var (
readyPeers = make(map[*id.ID]*Peer, 0)
readyPeersMutex sync.RWMutex
)
// ErrPeerWithSameIDExists signifies that a peer with the same ID already exist.
var ErrPeerWithSameIDExists = errors.New("ready with the same ID already exists")
// AddToReadyPeers marks this peer as ready and adds it to the ready peers list.
func AddToReadyPeers(peer *Peer) error {
readyPeersMutex.RLock()
defer readyPeersMutex.RUnlock()
if _, ok := readyPeers[peer.id]; ok {
return errors.Wrapf(ErrPeerWithSameIDExists, "peer with ID %s already exists", peer.id)
}
readyPeers[peer.id] = peer
return nil
}
// ReadyPeerIDs returns the peer IDs of all the ready peers.
func ReadyPeerIDs() []*id.ID {
readyPeersMutex.RLock()
defer readyPeersMutex.RUnlock()
peerIDs := make([]*id.ID, len(readyPeers))
i := 0
for peerID := range readyPeers {
peerIDs[i] = peerID
i++
}
return peerIDs
}
// ReadyPeers returns a copy of the currently ready peers
func ReadyPeers() []*Peer {
peers := make([]*Peer, 0, len(readyPeers))
for _, readyPeer := range readyPeers {
peers = append(peers, readyPeer)
}
return peers
}
// RequestSelectedTipIfRequired notifies the peer that requesting
// a selected tip is required. This triggers the selected tip
// request flow.

View File

@ -30,14 +30,14 @@ func (m *Manager) routerInitializer() (*routerpkg.Router, error) {
// TODO(libp2p) Ban peer
panic("unimplemented")
}
err = m.netAdapter.DisconnectAssociatedConnection(router)
err = m.context.NetAdapter().DisconnectAssociatedConnection(router)
if err != nil {
panic(err)
}
return
}
if errors.Is(err, routerpkg.ErrTimeout) {
err = m.netAdapter.DisconnectAssociatedConnection(router)
err = m.context.NetAdapter().DisconnectAssociatedConnection(router)
if err != nil {
panic(err)
}
@ -57,7 +57,7 @@ func (m *Manager) startFlows(router *routerpkg.Router) error {
stop := make(chan error)
stopped := uint32(0)
peer, closed, err := handshake.HandleHandshake(m.cfg, router, m.netAdapter, m.dag, m.addressManager)
peer, closed, err := handshake.HandleHandshake(m.context, router)
if err != nil {
return err
}
@ -82,13 +82,13 @@ func (m *Manager) addAddressFlows(router *routerpkg.Router, stopped *uint32, sto
addOneTimeFlow("SendAddresses", router, []wire.MessageCommand{wire.CmdGetAddresses}, stopped, stop,
func(incomingRoute *routerpkg.Route) error {
return addressexchange.SendAddresses(incomingRoute, outgoingRoute, m.addressManager)
return addressexchange.SendAddresses(m.context, incomingRoute, outgoingRoute)
},
)
addOneTimeFlow("ReceiveAddresses", router, []wire.MessageCommand{wire.CmdAddress}, stopped, stop,
func(incomingRoute *routerpkg.Route) error {
return addressexchange.ReceiveAddresses(incomingRoute, outgoingRoute, m.cfg, peer, m.addressManager)
return addressexchange.ReceiveAddresses(m.context, incomingRoute, outgoingRoute, peer)
},
)
}
@ -99,14 +99,14 @@ func (m *Manager) addBlockRelayFlows(router *routerpkg.Router, stopped *uint32,
addFlow("HandleRelayInvs", router, []wire.MessageCommand{wire.CmdInvRelayBlock, wire.CmdBlock}, stopped, stop,
func(incomingRoute *routerpkg.Route) error {
return blockrelay.HandleRelayInvs(incomingRoute,
outgoingRoute, peer, m.netAdapter, m.dag, m.OnNewBlock)
return blockrelay.HandleRelayInvs(m.context, incomingRoute,
outgoingRoute, peer)
},
)
addFlow("HandleRelayBlockRequests", router, []wire.MessageCommand{wire.CmdGetRelayBlocks}, stopped, stop,
func(incomingRoute *routerpkg.Route) error {
return blockrelay.HandleRelayBlockRequests(incomingRoute, outgoingRoute, peer, m.dag)
return blockrelay.HandleRelayBlockRequests(m.context, incomingRoute, outgoingRoute, peer)
},
)
}
@ -116,13 +116,13 @@ func (m *Manager) addPingFlows(router *routerpkg.Router, stopped *uint32, stop c
addFlow("ReceivePings", router, []wire.MessageCommand{wire.CmdPing}, stopped, stop,
func(incomingRoute *routerpkg.Route) error {
return ping.ReceivePings(incomingRoute, outgoingRoute)
return ping.ReceivePings(m.context, incomingRoute, outgoingRoute)
},
)
addFlow("SendPings", router, []wire.MessageCommand{wire.CmdPong}, stopped, stop,
func(incomingRoute *routerpkg.Route) error {
return ping.SendPings(incomingRoute, outgoingRoute, peer)
return ping.SendPings(m.context, incomingRoute, outgoingRoute, peer)
},
)
}
@ -134,31 +134,31 @@ func (m *Manager) addIBDFlows(router *routerpkg.Router, stopped *uint32, stop ch
addFlow("HandleIBD", router, []wire.MessageCommand{wire.CmdBlockLocator, wire.CmdIBDBlock}, stopped, stop,
func(incomingRoute *routerpkg.Route) error {
return ibd.HandleIBD(incomingRoute, outgoingRoute, peer, m.dag, m.OnNewBlock)
return ibd.HandleIBD(m.context, incomingRoute, outgoingRoute, peer)
},
)
addFlow("RequestSelectedTip", router, []wire.MessageCommand{wire.CmdSelectedTip}, stopped, stop,
func(incomingRoute *routerpkg.Route) error {
return ibd.RequestSelectedTip(incomingRoute, outgoingRoute, peer, m.dag)
return ibd.RequestSelectedTip(m.context, incomingRoute, outgoingRoute, peer)
},
)
addFlow("HandleGetSelectedTip", router, []wire.MessageCommand{wire.CmdGetSelectedTip}, stopped, stop,
func(incomingRoute *routerpkg.Route) error {
return ibd.HandleGetSelectedTip(incomingRoute, outgoingRoute, m.dag)
return ibd.HandleGetSelectedTip(m.context, incomingRoute, outgoingRoute)
},
)
addFlow("HandleGetBlockLocator", router, []wire.MessageCommand{wire.CmdGetBlockLocator}, stopped, stop,
func(incomingRoute *routerpkg.Route) error {
return ibd.HandleGetBlockLocator(incomingRoute, outgoingRoute, m.dag)
return ibd.HandleGetBlockLocator(m.context, incomingRoute, outgoingRoute)
},
)
addFlow("HandleGetBlocks", router, []wire.MessageCommand{wire.CmdGetBlocks}, stopped, stop,
func(incomingRoute *routerpkg.Route) error {
return ibd.HandleGetBlocks(incomingRoute, outgoingRoute, m.dag)
return ibd.HandleGetBlocks(m.context, incomingRoute, outgoingRoute)
},
)
}
@ -169,8 +169,7 @@ func (m *Manager) addTransactionRelayFlow(router *routerpkg.Router, stopped *uin
addFlow("HandleRelayedTransactions", router, []wire.MessageCommand{wire.CmdInv, wire.CmdTx}, stopped, stop,
func(incomingRoute *routerpkg.Route) error {
return relaytransactions.HandleRelayedTransactions(incomingRoute, outgoingRoute, m.netAdapter, m.dag,
m.txPool, m.sharedRequestedTransactions)
return relaytransactions.HandleRelayedTransactions(m.context, incomingRoute, outgoingRoute)
},
)
}

View File

@ -1,58 +0,0 @@
package protocol
import (
"github.com/kaspanet/kaspad/protocol/peer"
"github.com/kaspanet/kaspad/util"
"github.com/kaspanet/kaspad/util/daghash"
"github.com/kaspanet/kaspad/wire"
"github.com/pkg/errors"
"time"
)
// AddTransaction adds transaction to the mempool and propagates it.
func (m *Manager) AddTransaction(tx *util.Tx) error {
m.transactionsToRebroadcastLock.Lock()
defer m.transactionsToRebroadcastLock.Unlock()
transactionsAcceptedToMempool, err := m.txPool.ProcessTransaction(tx, false, 0)
if err != nil {
return err
}
if len(transactionsAcceptedToMempool) > 1 {
panic(errors.New("got more than one accepted transactions when no orphans were allowed"))
}
m.transactionsToRebroadcast[*tx.ID()] = tx
inv := wire.NewMsgTxInv([]*daghash.TxID{tx.ID()})
return m.netAdapter.Broadcast(peer.ReadyPeerIDs(), inv)
}
func (m *Manager) updateTransactionsToRebroadcast(block *util.Block) {
m.transactionsToRebroadcastLock.Lock()
defer m.transactionsToRebroadcastLock.Unlock()
// Note: if the block is red, its transactions won't be rebroadcasted
// anymore, although they are not included in the UTXO set.
// This is probably ok, since red blocks are quite rare.
for _, tx := range block.Transactions() {
delete(m.transactionsToRebroadcast, *tx.ID())
}
}
func (m *Manager) shouldRebroadcastTransactions() bool {
const rebroadcastInterval = 30 * time.Second
return time.Since(m.lastRebroadcastTime) > rebroadcastInterval
}
func (m *Manager) txIDsToRebroadcast() []*daghash.TxID {
m.transactionsToRebroadcastLock.Lock()
defer m.transactionsToRebroadcastLock.Unlock()
txIDs := make([]*daghash.TxID, len(m.transactionsToRebroadcast))
i := 0
for _, tx := range m.transactionsToRebroadcast {
txIDs[i] = tx.ID()
i++
}
return txIDs
}