diff --git a/logger/logger.go b/logger/logger.go index 2fcef731c..055dc417d 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -53,6 +53,7 @@ var ( ntarLog = BackendLog.Logger("NTAR") dnssLog = BackendLog.Logger("DNSS") snvrLog = BackendLog.Logger("SNVR") + ibdsLog = BackendLog.Logger("IBDS") ) // SubsystemTags is an enum of all sub system tags @@ -81,7 +82,8 @@ var SubsystemTags = struct { P2PS, NTAR, DNSS, - SNVR string + SNVR, + IBDS string }{ ADXR: "ADXR", AMGR: "AMGR", @@ -108,6 +110,7 @@ var SubsystemTags = struct { NTAR: "NTAR", DNSS: "DNSS", SNVR: "SNVR", + IBDS: "IBDS", } // subsystemLoggers maps each subsystem identifier to its associated logger. @@ -137,6 +140,7 @@ var subsystemLoggers = map[string]*logs.Logger{ SubsystemTags.NTAR: ntarLog, SubsystemTags.DNSS: dnssLog, SubsystemTags.SNVR: snvrLog, + SubsystemTags.IBDS: ibdsLog, } // InitLog attaches log file and error log file to the backend log. diff --git a/netadapter/router/route.go b/netadapter/router/route.go index 9df7e0b33..5692ca3b2 100644 --- a/netadapter/router/route.go +++ b/netadapter/router/route.go @@ -9,7 +9,7 @@ import ( ) const ( - maxMessages = 100 + defaultMaxMessages = 100 ) // ErrTimeout signifies that one of the router functions had a timeout. @@ -32,8 +32,12 @@ type Route struct { // NewRoute create a new Route func NewRoute() *Route { + return newRouteWithCapacity(defaultMaxMessages) +} + +func newRouteWithCapacity(capacity int) *Route { return &Route{ - channel: make(chan wire.Message, maxMessages), + channel: make(chan wire.Message, capacity), closed: false, } } @@ -46,33 +50,13 @@ func (r *Route) Enqueue(message wire.Message) (isOpen bool) { if r.closed { return false } - if len(r.channel) == maxMessages { + if len(r.channel) == defaultMaxMessages { r.onCapacityReachedHandler() } r.channel <- message return true } -// EnqueueWithTimeout attempts to enqueue a message to the Route -// and returns an error if the given timeout expires first. -func (r *Route) EnqueueWithTimeout(message wire.Message, timeout time.Duration) (isOpen bool, err error) { - r.closeLock.Lock() - defer r.closeLock.Unlock() - - if r.closed { - return false, nil - } - if len(r.channel) == maxMessages { - r.onCapacityReachedHandler() - } - select { - case <-time.After(timeout): - return false, errors.Wrapf(ErrTimeout, "got timeout after %s", timeout) - case r.channel <- message: - return true, nil - } -} - // Dequeue dequeues a message from the Route func (r *Route) Dequeue() (message wire.Message, isOpen bool) { message, isOpen = <-r.channel diff --git a/netadapter/router/router.go b/netadapter/router/router.go index 72e19b389..9a6a6063d 100644 --- a/netadapter/router/router.go +++ b/netadapter/router/router.go @@ -5,6 +5,8 @@ import ( "github.com/pkg/errors" ) +const outgoingRouteMaxMessages = wire.MaxInvPerMsg + defaultMaxMessages + // OnRouteCapacityReachedHandler is a function that is to // be called when one of the routes reaches capacity. type OnRouteCapacityReachedHandler func() @@ -22,7 +24,7 @@ type Router struct { func NewRouter() *Router { router := Router{ incomingRoutes: make(map[wire.MessageCommand]*Route), - outgoingRoute: NewRoute(), + outgoingRoute: newRouteWithCapacity(outgoingRouteMaxMessages), } router.outgoingRoute.setOnCapacityReachedHandler(func() { router.onRouteCapacityReachedHandler() diff --git a/peer/message_logging.go b/peer/message_logging.go index 84cd2b536..39c2eb8ab 100644 --- a/peer/message_logging.go +++ b/peer/message_logging.go @@ -131,7 +131,7 @@ func messageSummary(msg wire.Message) string { case *wire.MsgGetData: return invSummary(msg.InvList) - case *wire.MsgGetBlockInvs: + case *wire.MsgGetBlocks: return fmt.Sprintf("low hash %s, high hash %s", msg.LowHash, msg.HighHash) diff --git a/peer/peer.go b/peer/peer.go index 1550ae52e..d1171b848 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -138,7 +138,7 @@ type MessageListeners struct { // OnGetBlockInvs is invoked when a peer receives a getblockinvs kaspa // message. - OnGetBlockInvs func(p *Peer, msg *wire.MsgGetBlockInvs) + OnGetBlockInvs func(p *Peer, msg *wire.MsgGetBlocks) // OnFeeFilter is invoked when a peer receives a feefilter kaspa message. OnFeeFilter func(p *Peer, msg *wire.MsgFeeFilter) @@ -803,7 +803,7 @@ func (p *Peer) PushGetBlockInvsMsg(lowHash, highHash *daghash.Hash) error { } // Construct the getblockinvs request and queue it to be sent. - msg := wire.NewMsgGetBlockInvs(lowHash, highHash) + msg := wire.NewMsgGetBlocks(lowHash, highHash) p.QueueMessage(msg, nil) // Update the previous getblockinvs request information for filtering @@ -820,13 +820,7 @@ func (p *Peer) PushGetBlockInvsMsg(lowHash, highHash *daghash.Hash) error { // This function is safe for concurrent access. func (p *Peer) PushBlockLocatorMsg(locator blockdag.BlockLocator) error { // Construct the locator request and queue it to be sent. - msg := wire.NewMsgBlockLocator() - for _, hash := range locator { - err := msg.AddBlockLocatorHash(hash) - if err != nil { - return err - } - } + msg := wire.NewMsgBlockLocator(locator) p.QueueMessage(msg, nil) return nil } @@ -1107,7 +1101,7 @@ func (p *Peer) maybeAddDeadline(pendingResponses map[wire.MessageCommand]time.Ti // Expects a verack message. pendingResponses[wire.CmdVerAck] = deadline - case wire.CmdGetBlockInvs: + case wire.CmdGetBlocks: // Expects an inv message. pendingResponses[wire.CmdInv] = deadline @@ -1401,7 +1395,7 @@ out: p.cfg.Listeners.OnBlockLocator(p, msg) } - case *wire.MsgGetBlockInvs: + case *wire.MsgGetBlocks: if p.cfg.Listeners.OnGetBlockInvs != nil { p.cfg.Listeners.OnGetBlockInvs(p, msg) } diff --git a/protocol/common/common.go b/protocol/common/common.go new file mode 100644 index 000000000..8a766b6b2 --- /dev/null +++ b/protocol/common/common.go @@ -0,0 +1,7 @@ +package common + +import "time" + +// DefaultTimeout is the default duration to wait for enqueuing/dequeuing +// to/from routes. +const DefaultTimeout = 30 * time.Second diff --git a/protocol/flows/addressexchange/receiveaddresses.go b/protocol/flows/addressexchange/receiveaddresses.go index e26dffef6..8839693b3 100644 --- a/protocol/flows/addressexchange/receiveaddresses.go +++ b/protocol/flows/addressexchange/receiveaddresses.go @@ -17,20 +17,13 @@ const timeout = 30 * time.Second func ReceiveAddresses(incomingRoute *router.Route, outgoingRoute *router.Route, peer *peerpkg.Peer, addressManager *addrmgr.AddrManager) (routeClosed bool, err error) { - subnetworkID, err := peer.SubnetworkID() - if err != nil { - panic(err) - } - if !addressManager.NeedMoreAddresses() { return false, nil } + subnetworkID := peer.SubnetworkID() msgGetAddresses := wire.NewMsgGetAddresses(false, subnetworkID) - isOpen, err := outgoingRoute.EnqueueWithTimeout(msgGetAddresses, timeout) - if err != nil { - return false, err - } + isOpen := outgoingRoute.Enqueue(msgGetAddresses) if !isOpen { return true, nil } diff --git a/protocol/flows/addressexchange/sendaddresses.go b/protocol/flows/addressexchange/sendaddresses.go index e7513fa95..fd144b0e0 100644 --- a/protocol/flows/addressexchange/sendaddresses.go +++ b/protocol/flows/addressexchange/sendaddresses.go @@ -1,12 +1,10 @@ package addressexchange import ( - "math/rand" - "time" - "github.com/kaspanet/kaspad/addrmgr" "github.com/kaspanet/kaspad/netadapter/router" "github.com/kaspanet/kaspad/wire" + "math/rand" ) // SendAddresses sends addresses to a peer that requests it. @@ -26,11 +24,7 @@ func SendAddresses(incomingRoute *router.Route, outgoingRoute *router.Route, panic(err) } - const timeout = 30 * time.Second - isOpen, err = outgoingRoute.EnqueueWithTimeout(msgAddresses, timeout) - if err != nil { - return false, err - } + isOpen = outgoingRoute.Enqueue(msgAddresses) if !isOpen { return true, nil } diff --git a/protocol/flows/blockrelay/handle_relay_block_requests.go b/protocol/flows/blockrelay/handle_relay_block_requests.go index 313d89366..0cb254e0d 100644 --- a/protocol/flows/blockrelay/handle_relay_block_requests.go +++ b/protocol/flows/blockrelay/handle_relay_block_requests.go @@ -33,10 +33,7 @@ func HandleRelayBlockRequests(incomingRoute *router.Route, outgoingRoute *router // If we are a full node and the peer is a partial node, we must convert // the block to a partial block. nodeSubnetworkID := dag.SubnetworkID() - peerSubnetworkID, err := peer.SubnetworkID() - if err != nil { - return err - } + peerSubnetworkID := peer.SubnetworkID() isNodeFull := nodeSubnetworkID == nil isPeerFull := peerSubnetworkID == nil diff --git a/protocol/flows/blockrelay/handle_relay_invs.go b/protocol/flows/blockrelay/handle_relay_invs.go index 5d4d60cd8..e9cc33063 100644 --- a/protocol/flows/blockrelay/handle_relay_invs.go +++ b/protocol/flows/blockrelay/handle_relay_invs.go @@ -7,6 +7,7 @@ import ( "github.com/kaspanet/kaspad/netadapter" "github.com/kaspanet/kaspad/netadapter/router" "github.com/kaspanet/kaspad/protocol/blocklogger" + "github.com/kaspanet/kaspad/protocol/flows/ibd" peerpkg "github.com/kaspanet/kaspad/protocol/peer" "github.com/kaspanet/kaspad/protocol/protocolerrors" "github.com/kaspanet/kaspad/util" @@ -41,6 +42,12 @@ func HandleRelayInvs(incomingRoute *router.Route, outgoingRoute *router.Route, continue } + ibd.StartIBDIfRequired(dag) + if ibd.IsInIBD() { + // Block relay is disabled during IBD + continue + } + requestQueue := newHashesQueueSet() requestQueue.enqueueIfNotExists(inv.Hash) @@ -102,10 +109,7 @@ func requestBlocks(netAdapater *netadapter.NetAdapter, outgoingRoute *router.Rou defer requestedBlocks.removeSet(pendingBlocks) getRelayBlocksMsg := wire.NewMsgGetRelayBlocks(filteredHashesToRequest) - isOpen, err := outgoingRoute.EnqueueWithTimeout(getRelayBlocksMsg, timeout) - if err != nil { - return false, err - } + isOpen := outgoingRoute.Enqueue(getRelayBlocksMsg) if !isOpen { return true, nil } @@ -224,5 +228,8 @@ func processAndRelayBlock(netAdapter *netadapter.NetAdapter, peer *peerpkg.Peer, if err != nil { return false, err } + + ibd.StartIBDIfRequired(dag) + return false, nil } diff --git a/protocol/flows/handshake/handshake.go b/protocol/flows/handshake/handshake.go index 478d5bb45..8e7561c81 100644 --- a/protocol/flows/handshake/handshake.go +++ b/protocol/flows/handshake/handshake.go @@ -8,6 +8,7 @@ import ( "github.com/kaspanet/kaspad/blockdag" "github.com/kaspanet/kaspad/netadapter" routerpkg "github.com/kaspanet/kaspad/netadapter/router" + "github.com/kaspanet/kaspad/protocol/flows/ibd" peerpkg "github.com/kaspanet/kaspad/protocol/peer" "github.com/kaspanet/kaspad/util/locks" "github.com/kaspanet/kaspad/wire" @@ -16,8 +17,8 @@ import ( // HandleHandshake sets up the handshake protocol - It sends a version message and waits for an incoming // version message, as well as a verack for the sent version -func HandleHandshake(router *routerpkg.Router, netAdapter *netadapter.NetAdapter, peer *peerpkg.Peer, - dag *blockdag.BlockDAG, addressManager *addrmgr.AddrManager) (closed bool, err error) { +func HandleHandshake(router *routerpkg.Router, netAdapter *netadapter.NetAdapter, + dag *blockdag.BlockDAG, addressManager *addrmgr.AddrManager) (peer *peerpkg.Peer, closed bool, err error) { receiveVersionRoute, err := router.AddIncomingRoute([]wire.MessageCommand{wire.CmdVersion}) if err != nil { @@ -71,42 +72,39 @@ func HandleHandshake(router *routerpkg.Router, netAdapter *netadapter.NetAdapter select { case err := <-errChan: if err != nil { - return false, err + return nil, false, err } - return true, nil + return nil, true, nil case <-locks.ReceiveFromChanWhenDone(func() { wg.Wait() }): } + peer = peerpkg.New() err = peerpkg.AddToReadyPeers(peer) if err != nil { if errors.Is(err, peerpkg.ErrPeerWithSameIDExists) { - return false, err + return nil, false, err } panic(err) } - peerID, err := peer.ID() - if err != nil { - panic(err) - } - + peerID := peer.ID() err = netAdapter.AssociateRouterID(router, peerID) if err != nil { panic(err) } if peerAddress != nil { - subnetworkID, err := peer.SubnetworkID() - if err != nil { - panic(err) - } + subnetworkID := peer.SubnetworkID() addressManager.AddAddress(peerAddress, peerAddress, subnetworkID) addressManager.Good(peerAddress, subnetworkID) } + ibd.StartIBDIfRequired(dag) + err = router.RemoveRoute([]wire.MessageCommand{wire.CmdVersion, wire.CmdVerAck}) if err != nil { panic(err) } - return false, nil + + return peer, false, nil } diff --git a/protocol/flows/handshake/receiveversion.go b/protocol/flows/handshake/receiveversion.go index 2eedde27c..af1edf2f1 100644 --- a/protocol/flows/handshake/receiveversion.go +++ b/protocol/flows/handshake/receiveversion.go @@ -77,10 +77,7 @@ func ReceiveVersion(incomingRoute *router.Route, outgoingRoute *router.Route, ne //} peer.UpdateFieldsFromMsgVersion(msgVersion) - isOpen, err = outgoingRoute.EnqueueWithTimeout(wire.NewMsgVerAck(), timeout) - if err != nil { - return nil, false, err - } + isOpen = outgoingRoute.Enqueue(wire.NewMsgVerAck()) if !isOpen { return nil, true, nil } diff --git a/protocol/flows/handshake/sendversion.go b/protocol/flows/handshake/sendversion.go index e6da4e057..fa0463f46 100644 --- a/protocol/flows/handshake/sendversion.go +++ b/protocol/flows/handshake/sendversion.go @@ -51,10 +51,7 @@ func SendVersion(incomingRoute *router.Route, outgoingRoute *router.Route, netAd // Advertise if inv messages for transactions are desired. msg.DisableRelayTx = config.ActiveConfig().BlocksOnly - isOpen, err := outgoingRoute.EnqueueWithTimeout(msg, timeout) - if err != nil { - return false, err - } + isOpen := outgoingRoute.Enqueue(msg) if !isOpen { return true, nil } diff --git a/protocol/flows/ibd/handle_get_block_locator.go b/protocol/flows/ibd/handle_get_block_locator.go new file mode 100644 index 000000000..6156b6bea --- /dev/null +++ b/protocol/flows/ibd/handle_get_block_locator.go @@ -0,0 +1,51 @@ +package ibd + +import ( + "github.com/kaspanet/kaspad/blockdag" + "github.com/kaspanet/kaspad/netadapter/router" + "github.com/kaspanet/kaspad/protocol/protocolerrors" + "github.com/kaspanet/kaspad/util/daghash" + "github.com/kaspanet/kaspad/wire" +) + +// HandleGetBlockLocator handles getBlockLocator messages +func HandleGetBlockLocator(incomingRoute *router.Route, outgoingRoute *router.Route, dag *blockdag.BlockDAG) error { + for { + lowHash, highHash, shouldStop, err := receiveGetBlockLocator(incomingRoute) + if err != nil { + return err + } + if shouldStop { + return nil + } + + locator, err := dag.BlockLocatorFromHashes(highHash, lowHash) + if err != nil || len(locator) == 0 { + return protocolerrors.Errorf(true, "couldn't build a block "+ + "locator between blocks %s and %s", lowHash, highHash) + } + + shouldStop = sendBlockLocator(outgoingRoute, locator) + if shouldStop { + return nil + } + } +} + +func receiveGetBlockLocator(incomingRoute *router.Route) (lowHash *daghash.Hash, + highHash *daghash.Hash, shouldStop bool, err error) { + + message, isOpen := incomingRoute.Dequeue() + if !isOpen { + return nil, nil, true, nil + } + msgGetBlockLocator := message.(*wire.MsgGetBlockLocator) + + return msgGetBlockLocator.LowHash, msgGetBlockLocator.HighHash, false, nil +} + +func sendBlockLocator(outgoingRoute *router.Route, locator blockdag.BlockLocator) (shouldStop bool) { + msgBlockLocator := wire.NewMsgBlockLocator(locator) + isOpen := outgoingRoute.Enqueue(msgBlockLocator) + return !isOpen +} diff --git a/protocol/flows/ibd/handle_get_blocks.go b/protocol/flows/ibd/handle_get_blocks.go new file mode 100644 index 000000000..95da52cc7 --- /dev/null +++ b/protocol/flows/ibd/handle_get_blocks.go @@ -0,0 +1,74 @@ +package ibd + +import ( + "github.com/kaspanet/kaspad/blockdag" + "github.com/kaspanet/kaspad/netadapter/router" + "github.com/kaspanet/kaspad/util/daghash" + "github.com/kaspanet/kaspad/wire" +) + +// HandleGetBlocks handles getBlocks messages +func HandleGetBlocks(incomingRoute *router.Route, outgoingRoute *router.Route, dag *blockdag.BlockDAG) error { + for { + lowHash, highHash, shouldStop, err := receiveGetBlocks(incomingRoute) + if err != nil { + return err + } + if shouldStop { + return nil + } + + msgIBDBlocks, err := buildMsgIBDBlocks(lowHash, highHash, dag) + if err != nil { + return err + } + + shouldStop = sendMsgIBDBlocks(outgoingRoute, msgIBDBlocks) + if shouldStop { + return nil + } + } +} + +func receiveGetBlocks(incomingRoute *router.Route) (lowHash *daghash.Hash, + highHash *daghash.Hash, shouldStop bool, err error) { + + message, isOpen := incomingRoute.Dequeue() + if !isOpen { + return nil, nil, true, nil + } + msgGetBlocks := message.(*wire.MsgGetBlocks) + + return msgGetBlocks.LowHash, msgGetBlocks.HighHash, false, nil +} + +func buildMsgIBDBlocks(lowHash *daghash.Hash, highHash *daghash.Hash, + dag *blockdag.BlockDAG) ([]*wire.MsgIBDBlock, error) { + + const maxHashesInMsgIBDBlocks = wire.MaxInvPerMsg + blockHashes, err := dag.AntiPastHashesBetween(lowHash, highHash, maxHashesInMsgIBDBlocks) + if err != nil { + return nil, err + } + + msgIBDBlocks := make([]*wire.MsgIBDBlock, len(blockHashes)) + for i, blockHash := range blockHashes { + block, err := dag.BlockByHash(blockHash) + if err != nil { + return nil, err + } + msgIBDBlocks[i] = wire.NewMsgIBDBlock(block.MsgBlock()) + } + + return msgIBDBlocks, nil +} + +func sendMsgIBDBlocks(outgoingRoute *router.Route, msgIBDBlocks []*wire.MsgIBDBlock) (shouldStop bool) { + for _, msgIBDBlock := range msgIBDBlocks { + isOpen := outgoingRoute.Enqueue(msgIBDBlock) + if !isOpen { + return true + } + } + return false +} diff --git a/protocol/flows/ibd/ibd.go b/protocol/flows/ibd/ibd.go new file mode 100644 index 000000000..f859e2039 --- /dev/null +++ b/protocol/flows/ibd/ibd.go @@ -0,0 +1,237 @@ +package ibd + +import ( + "github.com/kaspanet/kaspad/blockdag" + "github.com/kaspanet/kaspad/netadapter/router" + "github.com/kaspanet/kaspad/protocol/common" + peerpkg "github.com/kaspanet/kaspad/protocol/peer" + "github.com/kaspanet/kaspad/protocol/protocolerrors" + "github.com/kaspanet/kaspad/util" + "github.com/kaspanet/kaspad/util/daghash" + "github.com/kaspanet/kaspad/wire" + "sync" + "sync/atomic" +) + +var ( + isIBDRunning uint32 + startIBDMutex sync.Mutex +) + +// StartIBDIfRequired selects a peer and starts IBD against it +// if required +func StartIBDIfRequired(dag *blockdag.BlockDAG) { + startIBDMutex.Lock() + defer startIBDMutex.Unlock() + + if IsInIBD() { + return + } + + peer := selectPeerForIBD(dag) + if peer == nil { + requestSelectedTipsIfRequired(dag) + return + } + + atomic.StoreUint32(&isIBDRunning, 1) + peer.StartIBD() +} + +// IsInIBD is true if IBD is currently running +func IsInIBD() bool { + return atomic.LoadUint32(&isIBDRunning) != 0 +} + +// selectPeerForIBD returns the first peer whose selected tip +// hash is not in our DAG +func selectPeerForIBD(dag *blockdag.BlockDAG) *peerpkg.Peer { + for _, peer := range peerpkg.ReadyPeers() { + peerSelectedTipHash := peer.SelectedTipHash() + if !dag.IsInDAG(peerSelectedTipHash) { + return peer + } + } + return nil +} + +// HandleIBD waits for IBD start and handles it when IBD is triggered for this peer +func HandleIBD(incomingRoute *router.Route, outgoingRoute *router.Route, + peer *peerpkg.Peer, dag *blockdag.BlockDAG) error { + + for { + shouldStop, err := runIBD(incomingRoute, outgoingRoute, peer, dag) + if err != nil { + return err + } + if shouldStop { + return nil + } + } +} + +func runIBD(incomingRoute *router.Route, outgoingRoute *router.Route, + peer *peerpkg.Peer, dag *blockdag.BlockDAG) (shouldStop bool, err error) { + + peer.WaitForIBDStart() + defer finishIBD(dag) + + peerSelectedTipHash := peer.SelectedTipHash() + highestSharedBlockHash, shouldStop, err := findHighestSharedBlockHash(incomingRoute, outgoingRoute, dag, peerSelectedTipHash) + if err != nil { + return false, err + } + if shouldStop { + return true, nil + } + if dag.IsKnownFinalizedBlock(highestSharedBlockHash) { + return false, protocolerrors.Errorf(false, "cannot initiate "+ + "IBD with peer %s because the highest shared chain block (%s) is "+ + "below the finality point", peer, highestSharedBlockHash) + } + + shouldStop, err = downloadBlocks(incomingRoute, outgoingRoute, dag, highestSharedBlockHash, peerSelectedTipHash) + if err != nil { + return false, err + } + return shouldStop, nil +} + +func findHighestSharedBlockHash(incomingRoute *router.Route, outgoingRoute *router.Route, dag *blockdag.BlockDAG, + peerSelectedTipHash *daghash.Hash) (lowHash *daghash.Hash, shouldStop bool, err error) { + + lowHash = dag.Params.GenesisHash + highHash := peerSelectedTipHash + + for { + shouldStop = sendGetBlockLocator(outgoingRoute, lowHash, highHash) + if shouldStop { + return nil, true, nil + } + + blockLocatorHashes, shouldStop, err := receiveBlockLocator(incomingRoute) + if err != nil { + return nil, false, err + } + if shouldStop { + return nil, true, nil + } + + // We check whether the locator's highest hash is in the local DAG. + // If it is, return it. If it isn't, we need to narrow our + // getBlockLocator request and try again. + locatorHighHash := blockLocatorHashes[0] + if dag.IsInDAG(locatorHighHash) { + return locatorHighHash, false, nil + } + + highHash, lowHash = dag.FindNextLocatorBoundaries(blockLocatorHashes) + } +} + +func sendGetBlockLocator(outgoingRoute *router.Route, lowHash *daghash.Hash, + highHash *daghash.Hash) (shouldStop bool) { + + msgGetBlockLocator := wire.NewMsgGetBlockLocator(highHash, lowHash) + isOpen := outgoingRoute.Enqueue(msgGetBlockLocator) + return !isOpen +} + +func receiveBlockLocator(incomingRoute *router.Route) (blockLocatorHashes []*daghash.Hash, + shouldStop bool, err error) { + + message, isOpen, err := incomingRoute.DequeueWithTimeout(common.DefaultTimeout) + if err != nil { + return nil, false, err + } + if !isOpen { + return nil, true, nil + } + msgBlockLocator, ok := message.(*wire.MsgBlockLocator) + if !ok { + return nil, false, + protocolerrors.Errorf(true, "received unexpected message type. "+ + "expected: %s, got: %s", wire.CmdBlockLocator, message.Command()) + } + return msgBlockLocator.BlockLocatorHashes, false, nil +} + +func downloadBlocks(incomingRoute *router.Route, outgoingRoute *router.Route, + dag *blockdag.BlockDAG, highestSharedBlockHash *daghash.Hash, peerSelectedTipHash *daghash.Hash) (shouldStop bool, err error) { + + shouldStop = sendGetBlocks(outgoingRoute, highestSharedBlockHash, peerSelectedTipHash) + if shouldStop { + return true, nil + } + + for { + msgIBDBlock, shouldStop, err := receiveIBDBlock(incomingRoute) + if err != nil { + return false, err + } + if shouldStop { + return true, nil + } + shouldStop, err = processIBDBlock(dag, msgIBDBlock) + if err != nil { + return false, err + } + if shouldStop { + return true, nil + } + if msgIBDBlock.BlockHash().IsEqual(peerSelectedTipHash) { + return true, nil + } + } +} + +func sendGetBlocks(outgoingRoute *router.Route, highestSharedBlockHash *daghash.Hash, + peerSelectedTipHash *daghash.Hash) (shouldStop bool) { + + msgGetBlockInvs := wire.NewMsgGetBlocks(highestSharedBlockHash, peerSelectedTipHash) + isOpen := outgoingRoute.Enqueue(msgGetBlockInvs) + return !isOpen +} + +func receiveIBDBlock(incomingRoute *router.Route) (msgIBDBlock *wire.MsgIBDBlock, shouldStop bool, err error) { + message, isOpen, err := incomingRoute.DequeueWithTimeout(common.DefaultTimeout) + if err != nil { + return nil, false, err + } + if !isOpen { + return nil, true, nil + } + msgIBDBlock, ok := message.(*wire.MsgIBDBlock) + if !ok { + return nil, false, + protocolerrors.Errorf(true, "received unexpected message type. "+ + "expected: %s, got: %s", wire.CmdIBDBlock, message.Command()) + } + return msgIBDBlock, false, nil +} + +func processIBDBlock(dag *blockdag.BlockDAG, msgIBDBlock *wire.MsgIBDBlock) (shouldStop bool, err error) { + block := util.NewBlock(&msgIBDBlock.MsgBlock) + if dag.IsInDAG(block.Hash()) { + return false, nil + } + isOrphan, isDelayed, err := dag.ProcessBlock(block, blockdag.BFNone) + if err != nil { + return false, err + } + if isOrphan { + return false, protocolerrors.Errorf(true, "received orphan block %s "+ + "during IBD", block.Hash()) + } + if isDelayed { + return false, protocolerrors.Errorf(false, "received delayed block %s "+ + "during IBD", block.Hash()) + } + return false, nil +} + +func finishIBD(dag *blockdag.BlockDAG) { + atomic.StoreUint32(&isIBDRunning, 0) + + StartIBDIfRequired(dag) +} diff --git a/protocol/flows/ibd/log.go b/protocol/flows/ibd/log.go new file mode 100644 index 000000000..570740c06 --- /dev/null +++ b/protocol/flows/ibd/log.go @@ -0,0 +1,9 @@ +package ibd + +import ( + "github.com/kaspanet/kaspad/logger" + "github.com/kaspanet/kaspad/util/panics" +) + +var log, _ = logger.Get(logger.SubsystemTags.IBDS) +var spawn = panics.GoroutineWrapperFunc(log) diff --git a/protocol/flows/ibd/selected_tip.go b/protocol/flows/ibd/selected_tip.go new file mode 100644 index 000000000..363f4f9dc --- /dev/null +++ b/protocol/flows/ibd/selected_tip.go @@ -0,0 +1,127 @@ +package ibd + +import ( + "github.com/kaspanet/kaspad/blockdag" + "github.com/kaspanet/kaspad/netadapter/router" + "github.com/kaspanet/kaspad/protocol/common" + peerpkg "github.com/kaspanet/kaspad/protocol/peer" + "github.com/kaspanet/kaspad/util/daghash" + "github.com/kaspanet/kaspad/wire" + "github.com/pkg/errors" + "time" +) + +const minDurationToRequestSelectedTips = time.Minute + +func requestSelectedTipsIfRequired(dag *blockdag.BlockDAG) { + if isDAGTimeCurrent(dag) { + return + } + requestSelectedTips() +} + +func isDAGTimeCurrent(dag *blockdag.BlockDAG) bool { + return dag.Now().Sub(dag.SelectedTipHeader().Timestamp) > minDurationToRequestSelectedTips +} + +func requestSelectedTips() { + for _, peer := range peerpkg.ReadyPeers() { + peer.RequestSelectedTipIfRequired() + } +} + +// RequestSelectedTip waits for selected tip requests and handles them +func RequestSelectedTip(incomingRoute *router.Route, + outgoingRoute *router.Route, peer *peerpkg.Peer, dag *blockdag.BlockDAG) error { + for { + shouldStop, err := runSelectedTipRequest(incomingRoute, outgoingRoute, peer, dag) + if err != nil { + return err + } + if shouldStop { + return nil + } + } +} + +func runSelectedTipRequest(incomingRoute *router.Route, outgoingRoute *router.Route, + peer *peerpkg.Peer, dag *blockdag.BlockDAG) (shouldStop bool, err error) { + + peer.WaitForSelectedTipRequests() + defer peer.FinishRequestingSelectedTip() + + shouldStop = requestSelectedTip(outgoingRoute) + if shouldStop { + return true, nil + } + + peerSelectedTipHash, shouldStop, err := receiveSelectedTip(incomingRoute) + if err != nil { + return false, err + } + if shouldStop { + return true, nil + } + + peer.SetSelectedTipHash(peerSelectedTipHash) + StartIBDIfRequired(dag) + return false, nil +} + +func requestSelectedTip(outgoingRoute *router.Route) (shouldStop bool) { + msgGetSelectedTip := wire.NewMsgGetSelectedTip() + isOpen := outgoingRoute.Enqueue(msgGetSelectedTip) + return !isOpen +} + +func receiveSelectedTip(incomingRoute *router.Route) (selectedTipHash *daghash.Hash, shouldStop bool, err error) { + message, isOpen, err := incomingRoute.DequeueWithTimeout(common.DefaultTimeout) + if err != nil { + return nil, false, err + } + if !isOpen { + return nil, true, nil + } + msgSelectedTip := message.(*wire.MsgSelectedTip) + + return msgSelectedTip.SelectedTipHash, false, nil +} + +// HandleGetSelectedTip handles getSelectedTip messages +func HandleGetSelectedTip(incomingRoute *router.Route, outgoingRoute *router.Route, dag *blockdag.BlockDAG) error { + for { + shouldStop, err := receiveGetSelectedTip(incomingRoute) + if err != nil { + return err + } + if shouldStop { + return nil + } + + selectedTipHash := dag.SelectedTipHash() + shouldStop = sendSelectedTipHash(outgoingRoute, selectedTipHash) + if shouldStop { + return nil + } + } +} + +func receiveGetSelectedTip(incomingRoute *router.Route) (shouldStop bool, err error) { + message, isOpen := incomingRoute.Dequeue() + if !isOpen { + return true, nil + } + _, ok := message.(*wire.MsgGetSelectedTip) + if !ok { + panic(errors.Errorf("received unexpected message type. "+ + "expected: %s, got: %s", wire.CmdGetSelectedTip, message.Command())) + } + + return false, nil +} + +func sendSelectedTipHash(outgoingRoute *router.Route, selectedTipHash *daghash.Hash) (shouldStop bool) { + msgSelectedTip := wire.NewMsgSelectedTip(selectedTipHash) + isOpen := outgoingRoute.Enqueue(msgSelectedTip) + return !isOpen +} diff --git a/protocol/flows/ping/ping.go b/protocol/flows/ping/ping.go index e215c46a8..35824182c 100644 --- a/protocol/flows/ping/ping.go +++ b/protocol/flows/ping/ping.go @@ -23,10 +23,7 @@ func ReceivePings(incomingRoute *router.Route, outgoingRoute *router.Route) erro pingMessage := message.(*wire.MsgPing) pongMessage := wire.NewMsgPong(pingMessage.Nonce) - isOpen, err := outgoingRoute.EnqueueWithTimeout(pongMessage, pingTimeout) - if err != nil { - return err - } + isOpen = outgoingRoute.Enqueue(pongMessage) if !isOpen { return nil } @@ -49,10 +46,7 @@ func SendPings(incomingRoute *router.Route, outgoingRoute *router.Route, peer *p peer.SetPingPending(nonce) pingMessage := wire.NewMsgPing(nonce) - isOpen, err := outgoingRoute.EnqueueWithTimeout(pingMessage, pingTimeout) - if err != nil { - return err - } + isOpen := outgoingRoute.Enqueue(pingMessage) if !isOpen { return nil } diff --git a/protocol/peer/peer.go b/protocol/peer/peer.go index 82e17719d..134cf7de3 100644 --- a/protocol/peer/peer.go +++ b/protocol/peer/peer.go @@ -8,6 +8,7 @@ import ( "github.com/kaspanet/kaspad/netadapter/id" "github.com/kaspanet/kaspad/util/daghash" mathUtil "github.com/kaspanet/kaspad/util/math" + "github.com/kaspanet/kaspad/util/mstime" "github.com/kaspanet/kaspad/util/subnetworkid" "github.com/kaspanet/kaspad/wire" "github.com/pkg/errors" @@ -15,8 +16,6 @@ import ( // Peer holds data about a peer. type Peer struct { - ready uint32 - selectedTipHashMtx sync.RWMutex selectedTipHash *daghash.Hash @@ -32,52 +31,45 @@ type Peer struct { lastPingNonce uint64 // The nonce of the last ping we sent lastPingTime time.Time // Time we sent last ping lastPingDuration time.Duration // Time for last ping to return + + isSelectedTipRequested uint32 + selectedTipRequestChan chan struct{} + lastSelectedTipRequest mstime.Time + + ibdStartChan chan struct{} +} + +// New returns a new Peer +func New() *Peer { + return &Peer{ + selectedTipRequestChan: make(chan struct{}), + ibdStartChan: make(chan struct{}), + } } // SelectedTipHash returns the selected tip of the peer. -func (p *Peer) SelectedTipHash() (*daghash.Hash, error) { - if atomic.LoadUint32(&p.ready) == 0 { - return nil, errors.New("peer is not ready yet") - } +func (p *Peer) SelectedTipHash() *daghash.Hash { p.selectedTipHashMtx.RLock() defer p.selectedTipHashMtx.RUnlock() - return p.selectedTipHash, nil + return p.selectedTipHash } // SetSelectedTipHash sets the selected tip of the peer. -func (p *Peer) SetSelectedTipHash(hash *daghash.Hash) error { - if atomic.LoadUint32(&p.ready) == 0 { - return errors.New("peer is not ready yet") - } +func (p *Peer) SetSelectedTipHash(hash *daghash.Hash) { p.selectedTipHashMtx.Lock() defer p.selectedTipHashMtx.Unlock() p.selectedTipHash = hash - return nil } // SubnetworkID returns the subnetwork the peer is associated with. // It is nil in full nodes. -func (p *Peer) SubnetworkID() (*subnetworkid.SubnetworkID, error) { - if atomic.LoadUint32(&p.ready) == 0 { - return nil, errors.New("peer is not ready yet") - } - return p.subnetworkID, nil +func (p *Peer) SubnetworkID() *subnetworkid.SubnetworkID { + return p.subnetworkID } // ID returns the peer ID. -func (p *Peer) ID() (*id.ID, error) { - if atomic.LoadUint32(&p.ready) == 0 { - return nil, errors.New("peer is not ready yet") - } - return p.id, nil -} - -// MarkAsReady marks the peer as ready. -func (p *Peer) MarkAsReady() error { - if atomic.AddUint32(&p.ready, 1) != 1 { - return errors.New("peer is already ready") - } - return nil +func (p *Peer) ID() *id.ID { + return p.id } // UpdateFieldsFromMsgVersion updates the peer with the data from the version message. @@ -143,11 +135,6 @@ func AddToReadyPeers(peer *Peer) error { return errors.Wrapf(ErrPeerWithSameIDExists, "peer with ID %s already exists", peer.id) } - err := peer.MarkAsReady() - if err != nil { - return err - } - readyPeers[peer.id] = peer return nil } @@ -165,8 +152,51 @@ func GetReadyPeerIDs() []*id.ID { return peerIDs } -// IDExists returns whether there's a peer with the given ID. -func IDExists(peerID *id.ID) bool { - _, ok := readyPeers[peerID] - return ok +// ReadyPeers returns a copy of the currently ready peers +func ReadyPeers() []*Peer { + peers := make([]*Peer, 0, len(readyPeers)) + for _, readyPeer := range readyPeers { + peers = append(peers, readyPeer) + } + return peers +} + +// RequestSelectedTipIfRequired notifies the peer that requesting +// a selected tip is required. This triggers the selected tip +// request flow. +func (p *Peer) RequestSelectedTipIfRequired() { + if atomic.SwapUint32(&p.isSelectedTipRequested, 1) != 0 { + return + } + + const minGetSelectedTipInterval = time.Minute + if mstime.Since(p.lastSelectedTipRequest) < minGetSelectedTipInterval { + return + } + + p.lastSelectedTipRequest = mstime.Now() + p.selectedTipRequestChan <- struct{}{} +} + +// WaitForSelectedTipRequests blocks the current thread until +// a selected tip is requested from this peer +func (p *Peer) WaitForSelectedTipRequests() { + <-p.selectedTipRequestChan +} + +// FinishRequestingSelectedTip finishes requesting the selected +// tip from this peer +func (p *Peer) FinishRequestingSelectedTip() { + atomic.StoreUint32(&p.isSelectedTipRequested, 0) +} + +// StartIBD starts the IBD process for this peer +func (p *Peer) StartIBD() { + p.ibdStartChan <- struct{}{} +} + +// WaitForIBDStart blocks the current thread until +// IBD start is requested from this peer +func (p *Peer) WaitForIBDStart() { + <-p.ibdStartChan } diff --git a/protocol/protocol.go b/protocol/protocol.go index c47a1e3a9..1855f8328 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -12,6 +12,7 @@ import ( "github.com/kaspanet/kaspad/blockdag" "github.com/kaspanet/kaspad/netadapter" routerpkg "github.com/kaspanet/kaspad/netadapter/router" + "github.com/kaspanet/kaspad/protocol/flows/ibd" "github.com/kaspanet/kaspad/protocol/flows/ping" peerpkg "github.com/kaspanet/kaspad/protocol/peer" "github.com/kaspanet/kaspad/protocol/protocolerrors" @@ -58,15 +59,13 @@ func newRouterInitializer(netAdapter *netadapter.NetAdapter, } } -func startFlows(netAdapter *netadapter.NetAdapter, router *routerpkg.Router, dag *blockdag.BlockDAG, - addressManager *addrmgr.AddrManager) error { +func startFlows(netAdapter *netadapter.NetAdapter, router *routerpkg.Router, + dag *blockdag.BlockDAG, addressManager *addrmgr.AddrManager) error { + stop := make(chan error) stopped := uint32(0) - outgoingRoute := router.OutgoingRoute() - peer := new(peerpkg.Peer) - - closed, err := handshake.HandleHandshake(router, netAdapter, peer, dag, addressManager) + peer, closed, err := handshake.HandleHandshake(router, netAdapter, dag, addressManager) if err != nil { return err } @@ -74,44 +73,101 @@ func startFlows(netAdapter *netadapter.NetAdapter, router *routerpkg.Router, dag return nil } - addOneTimeFlow("SendAddresses", router, []wire.MessageCommand{wire.CmdGetAddresses}, &stopped, stop, + addAddressFlows(router, &stopped, stop, peer, addressManager) + addBlockRelayFlows(netAdapter, router, &stopped, stop, peer, dag) + addPingFlows(router, &stopped, stop, peer) + addIBDFlows(router, &stopped, stop, peer, dag) + + err = <-stop + return err +} + +func addAddressFlows(router *routerpkg.Router, stopped *uint32, stop chan error, + peer *peerpkg.Peer, addressManager *addrmgr.AddrManager) { + + outgoingRoute := router.OutgoingRoute() + + addOneTimeFlow("SendAddresses", router, []wire.MessageCommand{wire.CmdGetAddresses}, stopped, stop, func(incomingRoute *routerpkg.Route) (routeClosed bool, err error) { return addressexchange.SendAddresses(incomingRoute, outgoingRoute, addressManager) }, ) - addOneTimeFlow("ReceiveAddresses", router, []wire.MessageCommand{wire.CmdAddress}, &stopped, stop, + addOneTimeFlow("ReceiveAddresses", router, []wire.MessageCommand{wire.CmdAddress}, stopped, stop, func(incomingRoute *routerpkg.Route) (routeClosed bool, err error) { return addressexchange.ReceiveAddresses(incomingRoute, outgoingRoute, peer, addressManager) }, ) +} - addFlow("HandleRelayInvs", router, []wire.MessageCommand{wire.CmdInvRelayBlock, wire.CmdBlock}, &stopped, stop, +func addBlockRelayFlows(netAdapter *netadapter.NetAdapter, router *routerpkg.Router, + stopped *uint32, stop chan error, peer *peerpkg.Peer, dag *blockdag.BlockDAG) { + + outgoingRoute := router.OutgoingRoute() + + addFlow("HandleRelayInvs", router, []wire.MessageCommand{wire.CmdInvRelayBlock, wire.CmdBlock}, stopped, stop, func(incomingRoute *routerpkg.Route) error { return blockrelay.HandleRelayInvs(incomingRoute, outgoingRoute, peer, netAdapter, dag) }, ) - addFlow("HandleRelayBlockRequests", router, []wire.MessageCommand{wire.CmdGetRelayBlocks}, &stopped, stop, + addFlow("HandleRelayBlockRequests", router, []wire.MessageCommand{wire.CmdGetRelayBlocks}, stopped, stop, func(incomingRoute *routerpkg.Route) error { return blockrelay.HandleRelayBlockRequests(incomingRoute, outgoingRoute, peer, dag) }, ) +} - addFlow("ReceivePings", router, []wire.MessageCommand{wire.CmdPing}, &stopped, stop, +func addPingFlows(router *routerpkg.Router, stopped *uint32, stop chan error, peer *peerpkg.Peer) { + outgoingRoute := router.OutgoingRoute() + + addFlow("ReceivePings", router, []wire.MessageCommand{wire.CmdPing}, stopped, stop, func(incomingRoute *routerpkg.Route) error { return ping.ReceivePings(incomingRoute, outgoingRoute) }, ) - addFlow("SendPings", router, []wire.MessageCommand{wire.CmdPong}, &stopped, stop, + addFlow("SendPings", router, []wire.MessageCommand{wire.CmdPong}, stopped, stop, func(incomingRoute *routerpkg.Route) error { return ping.SendPings(incomingRoute, outgoingRoute, peer) }, ) +} - err = <-stop - return err +func addIBDFlows(router *routerpkg.Router, stopped *uint32, stop chan error, + peer *peerpkg.Peer, dag *blockdag.BlockDAG) { + + outgoingRoute := router.OutgoingRoute() + + addFlow("HandleIBD", router, []wire.MessageCommand{wire.CmdBlockLocator, wire.CmdIBDBlock}, stopped, stop, + func(incomingRoute *routerpkg.Route) error { + return ibd.HandleIBD(incomingRoute, outgoingRoute, peer, dag) + }, + ) + + addFlow("RequestSelectedTip", router, []wire.MessageCommand{wire.CmdSelectedTip}, stopped, stop, + func(incomingRoute *routerpkg.Route) error { + return ibd.RequestSelectedTip(incomingRoute, outgoingRoute, peer, dag) + }, + ) + + addFlow("HandleGetSelectedTip", router, []wire.MessageCommand{wire.CmdGetSelectedTip}, stopped, stop, + func(incomingRoute *routerpkg.Route) error { + return ibd.HandleGetSelectedTip(incomingRoute, outgoingRoute, dag) + }, + ) + + addFlow("HandleGetBlockLocator", router, []wire.MessageCommand{wire.CmdGetBlockLocator}, stopped, stop, + func(incomingRoute *routerpkg.Route) error { + return ibd.HandleGetBlockLocator(incomingRoute, outgoingRoute, dag) + }, + ) + + addFlow("HandleGetBlocks", router, []wire.MessageCommand{wire.CmdGetBlocks}, stopped, stop, + func(incomingRoute *routerpkg.Route) error { + return ibd.HandleGetBlocks(incomingRoute, outgoingRoute, dag) + }, + ) } func addFlow(name string, router *routerpkg.Router, messageTypes []wire.MessageCommand, stopped *uint32, diff --git a/server/p2p/on_get_block_invs.go b/server/p2p/on_get_block_invs.go index 1fdd0b7e7..828c39170 100644 --- a/server/p2p/on_get_block_invs.go +++ b/server/p2p/on_get_block_invs.go @@ -10,7 +10,7 @@ import ( // message. // It finds the blue future between msg.LowHash and msg.HighHash // and send the invs to the requesting peer. -func (sp *Peer) OnGetBlockInvs(_ *peer.Peer, msg *wire.MsgGetBlockInvs) { +func (sp *Peer) OnGetBlockInvs(_ *peer.Peer, msg *wire.MsgGetBlocks) { dag := sp.server.DAG // We want to prevent a situation where the syncing peer needs // to call getblocks once again, but the block we sent it @@ -24,7 +24,7 @@ func (sp *Peer) OnGetBlockInvs(_ *peer.Peer, msg *wire.MsgGetBlockInvs) { hashList, err := dag.AntiPastHashesBetween(msg.LowHash, msg.HighHash, wire.MaxInvPerMsg) if err != nil { - sp.AddBanScoreAndPushRejectMsg(wire.CmdGetBlockInvs, wire.RejectInvalid, nil, + sp.AddBanScoreAndPushRejectMsg(wire.CmdGetBlocks, wire.RejectInvalid, nil, peer.BanScoreInvalidMsgGetBlockInvs, 0, fmt.Sprintf("error getting antiPast hashes between %s and %s: %s", msg.LowHash, msg.HighHash, err)) return diff --git a/wire/bench_test.go b/wire/bench_test.go index 39fe235a8..72ae71fd6 100644 --- a/wire/bench_test.go +++ b/wire/bench_test.go @@ -378,19 +378,19 @@ func BenchmarkWriteBlockHeader(b *testing.B) { // decode a getblockinvs message. func BenchmarkDecodeGetBlockInvs(b *testing.B) { pver := ProtocolVersion - var m MsgGetBlockInvs + var m MsgGetBlocks m.LowHash = &daghash.Hash{1} m.HighHash = &daghash.Hash{1} // Serialize it so the bytes are available to test the decode below. var bb bytes.Buffer if err := m.KaspaEncode(&bb, pver); err != nil { - b.Fatalf("MsgGetBlockInvs.KaspaEncode: unexpected error: %v", err) + b.Fatalf("MsgGetBlocks.KaspaEncode: unexpected error: %v", err) } buf := bb.Bytes() r := bytes.NewReader(buf) - var msg MsgGetBlockInvs + var msg MsgGetBlocks b.ResetTimer() for i := 0; i < b.N; i++ { r.Seek(0, 0) diff --git a/wire/message.go b/wire/message.go index 76bad9af0..592eee18d 100644 --- a/wire/message.go +++ b/wire/message.go @@ -40,7 +40,7 @@ const ( CmdVerAck MessageCommand = 1 CmdGetAddresses MessageCommand = 2 CmdAddress MessageCommand = 3 - CmdGetBlockInvs MessageCommand = 4 + CmdGetBlocks MessageCommand = 4 CmdInv MessageCommand = 5 CmdGetData MessageCommand = 6 CmdNotFound MessageCommand = 7 @@ -61,6 +61,7 @@ const ( CmdInvRelayBlock MessageCommand = 22 CmdGetRelayBlocks MessageCommand = 23 CmdRejectMalformed MessageCommand = 24 // Used only for reject message + CmdIBDBlock MessageCommand = 25 ) var messageCommandToString = map[MessageCommand]string{ @@ -68,7 +69,7 @@ var messageCommandToString = map[MessageCommand]string{ CmdVerAck: "VerAck", CmdGetAddresses: "GetAddress", CmdAddress: "Address", - CmdGetBlockInvs: "GetBlockInvs", + CmdGetBlocks: "GetBlocks", CmdInv: "Inv", CmdGetData: "GetData", CmdNotFound: "NotFound", @@ -89,6 +90,7 @@ var messageCommandToString = map[MessageCommand]string{ CmdInvRelayBlock: "InvRelayBlock", CmdGetRelayBlocks: "GetRelayBlocks", CmdRejectMalformed: "RejectMalformed", + CmdIBDBlock: "IBDBlock", } // Message is an interface that describes a kaspa message. A type that @@ -119,8 +121,8 @@ func MakeEmptyMessage(command MessageCommand) (Message, error) { case CmdAddress: msg = &MsgAddresses{} - case CmdGetBlockInvs: - msg = &MsgGetBlockInvs{} + case CmdGetBlocks: + msg = &MsgGetBlocks{} case CmdBlock: msg = &MsgBlock{} @@ -173,6 +175,9 @@ func MakeEmptyMessage(command MessageCommand) (Message, error) { case CmdSelectedTip: msg = &MsgSelectedTip{} + case CmdIBDBlock: + msg = &MsgIBDBlock{} + default: return nil, errors.Errorf("unhandled command [%s]", command) } diff --git a/wire/message_test.go b/wire/message_test.go index 639314c41..54b325a03 100644 --- a/wire/message_test.go +++ b/wire/message_test.go @@ -57,7 +57,7 @@ func TestMessage(t *testing.T) { msgVerack := NewMsgVerAck() msgGetAddresses := NewMsgGetAddresses(false, nil) msgAddresses := NewMsgAddresses(false, nil) - msgGetBlockInvs := NewMsgGetBlockInvs(&daghash.Hash{}, &daghash.Hash{}) + msgGetBlockInvs := NewMsgGetBlocks(&daghash.Hash{}, &daghash.Hash{}) msgBlock := &blockOne msgInv := NewMsgInv() msgGetData := NewMsgGetData() @@ -66,7 +66,7 @@ func TestMessage(t *testing.T) { msgPing := NewMsgPing(123123) msgPong := NewMsgPong(123123) msgGetBlockLocator := NewMsgGetBlockLocator(&daghash.ZeroHash, &daghash.ZeroHash) - msgBlockLocator := NewMsgBlockLocator() + msgBlockLocator := NewMsgBlockLocator([]*daghash.Hash{}) msgFeeFilter := NewMsgFeeFilter(123456) msgFilterAdd := NewMsgFilterAdd([]byte{0x01}) msgFilterClear := NewMsgFilterClear() diff --git a/wire/msgblocklocator.go b/wire/msgblocklocator.go index 0ac85a7d7..0b354df55 100644 --- a/wire/msgblocklocator.go +++ b/wire/msgblocklocator.go @@ -18,18 +18,6 @@ type MsgBlockLocator struct { BlockLocatorHashes []*daghash.Hash } -// AddBlockLocatorHash adds a new block locator hash to the message. -func (msg *MsgBlockLocator) AddBlockLocatorHash(hash *daghash.Hash) error { - if len(msg.BlockLocatorHashes) >= MaxBlockLocatorsPerMsg { - str := fmt.Sprintf("too many block locator hashes for message [max %d]", - MaxBlockLocatorsPerMsg) - return messageError("MsgBlockLocator.AddBlockLocatorHash", str) - } - - msg.BlockLocatorHashes = append(msg.BlockLocatorHashes, hash) - return nil -} - // KaspaDecode decodes r using the kaspa protocol encoding into the receiver. // This is part of the Message interface implementation. func (msg *MsgBlockLocator) KaspaDecode(r io.Reader, pver uint32) error { @@ -54,7 +42,8 @@ func (msg *MsgBlockLocator) KaspaDecode(r io.Reader, pver uint32) error { if err != nil { return err } - err = msg.AddBlockLocatorHash(hash) + msg.BlockLocatorHashes = append(msg.BlockLocatorHashes, hash) + if err != nil { return err } @@ -104,9 +93,8 @@ func (msg *MsgBlockLocator) MaxPayloadLength(pver uint32) uint32 { // NewMsgBlockLocator returns a new kaspa locator message that conforms to // the Message interface. See MsgBlockLocator for details. -func NewMsgBlockLocator() *MsgBlockLocator { +func NewMsgBlockLocator(locatorHashes []*daghash.Hash) *MsgBlockLocator { return &MsgBlockLocator{ - BlockLocatorHashes: make([]*daghash.Hash, 0, - MaxBlockLocatorsPerMsg), + BlockLocatorHashes: locatorHashes, } } diff --git a/wire/msgblocklocator_test.go b/wire/msgblocklocator_test.go index b89fec1f7..d7b2a184c 100644 --- a/wire/msgblocklocator_test.go +++ b/wire/msgblocklocator_test.go @@ -21,7 +21,7 @@ func TestBlockLocator(t *testing.T) { t.Errorf("NewHashFromStr: %v", err) } - msg := NewMsgBlockLocator() + msg := NewMsgBlockLocator([]*daghash.Hash{locatorHash}) // Ensure the command is expected value. wantCmd := MessageCommand(19) @@ -42,26 +42,12 @@ func TestBlockLocator(t *testing.T) { } // Ensure block locator hashes are added properly. - err = msg.AddBlockLocatorHash(locatorHash) - if err != nil { - t.Errorf("AddBlockLocatorHash: %v", err) - } if msg.BlockLocatorHashes[0] != locatorHash { t.Errorf("AddBlockLocatorHash: wrong block locator added - "+ "got %v, want %v", spew.Sprint(msg.BlockLocatorHashes[0]), spew.Sprint(locatorHash)) } - - // Ensure adding more than the max allowed block locator hashes per - // message returns an error. - for i := 0; i < MaxBlockLocatorsPerMsg; i++ { - err = msg.AddBlockLocatorHash(locatorHash) - } - if err == nil { - t.Errorf("AddBlockLocatorHash: expected error on too many " + - "block locator hashes not received") - } } // TestBlockLocatorWire tests the MsgBlockLocator wire encode and decode for various @@ -80,15 +66,13 @@ func TestBlockLocatorWire(t *testing.T) { } // MsgBlockLocator message with no block locators. - noLocators := NewMsgBlockLocator() + noLocators := NewMsgBlockLocator([]*daghash.Hash{}) noLocatorsEncoded := []byte{ 0x00, // Varint for number of block locator hashes } // MsgBlockLocator message with multiple block locators. - multiLocators := NewMsgBlockLocator() - multiLocators.AddBlockLocatorHash(hashLocator2) - multiLocators.AddBlockLocatorHash(hashLocator) + multiLocators := NewMsgBlockLocator([]*daghash.Hash{hashLocator2, hashLocator}) multiLocatorsEncoded := []byte{ 0x02, // Varint for number of block locator hashes 0xe0, 0xde, 0x06, 0x44, 0x68, 0x13, 0x2c, 0x63, @@ -177,9 +161,7 @@ func TestBlockLocatorWireErrors(t *testing.T) { } // MsgBlockLocator message with multiple block locators and a low hash. - baseGetBlocks := NewMsgBlockLocator() - baseGetBlocks.AddBlockLocatorHash(hashLocator2) - baseGetBlocks.AddBlockLocatorHash(hashLocator) + baseGetBlocks := NewMsgBlockLocator([]*daghash.Hash{hashLocator2, hashLocator}) baseGetBlocksEncoded := []byte{ 0x02, // Varint for number of block locator hashes 0xe0, 0xde, 0x06, 0x44, 0x68, 0x13, 0x2c, 0x63, @@ -194,10 +176,11 @@ func TestBlockLocatorWireErrors(t *testing.T) { // Message that forces an error by having more than the max allowed // block locator hashes. - maxGetBlocks := NewMsgBlockLocator() + maxLocaterHashesSlice := make([]*daghash.Hash, MaxBlockLocatorsPerMsg) for i := 0; i < MaxBlockLocatorsPerMsg; i++ { - maxGetBlocks.AddBlockLocatorHash(mainnetGenesisHash) + maxLocaterHashesSlice[i] = mainnetGenesisHash } + maxGetBlocks := NewMsgBlockLocator(maxLocaterHashesSlice) maxGetBlocks.BlockLocatorHashes = append(maxGetBlocks.BlockLocatorHashes, mainnetGenesisHash) maxGetBlocksEncoded := []byte{ diff --git a/wire/msggetblockinvs.go b/wire/msggetblocks.go similarity index 66% rename from wire/msggetblockinvs.go rename to wire/msggetblocks.go index 7dc2607d0..3474f234a 100644 --- a/wire/msggetblockinvs.go +++ b/wire/msggetblocks.go @@ -10,17 +10,17 @@ import ( "github.com/kaspanet/kaspad/util/daghash" ) -// MsgGetBlockInvs implements the Message interface and represents a kaspa -// getblockinvs message. It is used to request a list of blocks starting after the +// MsgGetBlocks implements the Message interface and represents a kaspa +// getblocks message. It is used to request a list of blocks starting after the // low hash and until the high hash. -type MsgGetBlockInvs struct { +type MsgGetBlocks struct { LowHash *daghash.Hash HighHash *daghash.Hash } // KaspaDecode decodes r using the kaspa protocol encoding into the receiver. // This is part of the Message interface implementation. -func (msg *MsgGetBlockInvs) KaspaDecode(r io.Reader, pver uint32) error { +func (msg *MsgGetBlocks) KaspaDecode(r io.Reader, pver uint32) error { msg.LowHash = &daghash.Hash{} err := ReadElement(r, msg.LowHash) if err != nil { @@ -33,7 +33,7 @@ func (msg *MsgGetBlockInvs) KaspaDecode(r io.Reader, pver uint32) error { // KaspaEncode encodes the receiver to w using the kaspa protocol encoding. // This is part of the Message interface implementation. -func (msg *MsgGetBlockInvs) KaspaEncode(w io.Writer, pver uint32) error { +func (msg *MsgGetBlocks) KaspaEncode(w io.Writer, pver uint32) error { err := WriteElement(w, msg.LowHash) if err != nil { return err @@ -44,22 +44,22 @@ func (msg *MsgGetBlockInvs) KaspaEncode(w io.Writer, pver uint32) error { // Command returns the protocol command string for the message. This is part // of the Message interface implementation. -func (msg *MsgGetBlockInvs) Command() MessageCommand { - return CmdGetBlockInvs +func (msg *MsgGetBlocks) Command() MessageCommand { + return CmdGetBlocks } // MaxPayloadLength returns the maximum length the payload can be for the // receiver. This is part of the Message interface implementation. -func (msg *MsgGetBlockInvs) MaxPayloadLength(pver uint32) uint32 { +func (msg *MsgGetBlocks) MaxPayloadLength(pver uint32) uint32 { // low hash + high hash. return 2 * daghash.HashSize } -// NewMsgGetBlockInvs returns a new kaspa getblockinvs message that conforms to the +// NewMsgGetBlocks returns a new kaspa getblocks message that conforms to the // Message interface using the passed parameters and defaults for the remaining // fields. -func NewMsgGetBlockInvs(lowHash, highHash *daghash.Hash) *MsgGetBlockInvs { - return &MsgGetBlockInvs{ +func NewMsgGetBlocks(lowHash, highHash *daghash.Hash) *MsgGetBlocks { + return &MsgGetBlocks{ LowHash: lowHash, HighHash: highHash, } diff --git a/wire/msggetblockinvs_test.go b/wire/msggetblocks_test.go similarity index 80% rename from wire/msggetblockinvs_test.go rename to wire/msggetblocks_test.go index 64b0f3ed3..293f3bd01 100644 --- a/wire/msggetblockinvs_test.go +++ b/wire/msggetblocks_test.go @@ -15,8 +15,8 @@ import ( "github.com/kaspanet/kaspad/util/daghash" ) -// TestGetBlockInvs tests the MsgGetBlockInvs API. -func TestGetBlockInvs(t *testing.T) { +// TestGetBlocks tests the MsgGetBlocks API. +func TestGetBlocks(t *testing.T) { pver := ProtocolVersion hashStr := "000000000002e7ad7b9eef9479e4aabc65cb831269cc20d2632c13684406dee0" @@ -32,16 +32,16 @@ func TestGetBlockInvs(t *testing.T) { } // Ensure we get the same data back out. - msg := NewMsgGetBlockInvs(lowHash, highHash) + msg := NewMsgGetBlocks(lowHash, highHash) if !msg.HighHash.IsEqual(highHash) { - t.Errorf("NewMsgGetBlockInvs: wrong high hash - got %v, want %v", + t.Errorf("NewMsgGetBlocks: wrong high hash - got %v, want %v", msg.HighHash, highHash) } // Ensure the command is expected value. wantCmd := MessageCommand(4) if cmd := msg.Command(); cmd != wantCmd { - t.Errorf("NewMsgGetBlockInvs: wrong command - got %v want %v", + t.Errorf("NewMsgGetBlocks: wrong command - got %v want %v", cmd, wantCmd) } @@ -55,9 +55,9 @@ func TestGetBlockInvs(t *testing.T) { } } -// TestGetBlockInvsWire tests the MsgGetBlockInvs wire encode and decode for various +// TestGetBlocksWire tests the MsgGetBlocks wire encode and decode for various // numbers of block locator hashes and protocol versions. -func TestGetBlockInvsWire(t *testing.T) { +func TestGetBlocksWire(t *testing.T) { hashStr := "2710f40c87ec93d010a6fd95f42c59a2cbacc60b18cf6b7957535" lowHash, err := daghash.NewHashFromStr(hashStr) if err != nil { @@ -71,7 +71,7 @@ func TestGetBlockInvsWire(t *testing.T) { } // MsgGetBlocks message with no start or high hash. - noStartOrStop := NewMsgGetBlockInvs(&daghash.Hash{}, &daghash.Hash{}) + noStartOrStop := NewMsgGetBlocks(&daghash.Hash{}, &daghash.Hash{}) noStartOrStopEncoded := []byte{ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, @@ -83,8 +83,8 @@ func TestGetBlockInvsWire(t *testing.T) { 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // High hash } - // MsgGetBlockInvs message with a low hash and a high hash. - withLowAndHighHash := NewMsgGetBlockInvs(lowHash, highHash) + // MsgGetBlocks message with a low hash and a high hash. + withLowAndHighHash := NewMsgGetBlocks(lowHash, highHash) withLowAndHighHashEncoded := []byte{ 0x35, 0x75, 0x95, 0xb7, 0xf6, 0x8c, 0xb1, 0x60, 0xcc, 0xba, 0x2c, 0x9a, 0xc5, 0x42, 0x5f, 0xd9, @@ -97,10 +97,10 @@ func TestGetBlockInvsWire(t *testing.T) { } tests := []struct { - in *MsgGetBlockInvs // Message to encode - out *MsgGetBlockInvs // Expected decoded message - buf []byte // Wire encoding - pver uint32 // Protocol version for wire encoding + in *MsgGetBlocks // Message to encode + out *MsgGetBlocks // Expected decoded message + buf []byte // Wire encoding + pver uint32 // Protocol version for wire encoding }{ // Latest protocol version with no block locators. { @@ -135,7 +135,7 @@ func TestGetBlockInvsWire(t *testing.T) { } // Decode the message from wire format. - var msg MsgGetBlockInvs + var msg MsgGetBlocks rbuf := bytes.NewReader(test.buf) err = msg.KaspaDecode(rbuf, test.pver) if err != nil { @@ -150,9 +150,9 @@ func TestGetBlockInvsWire(t *testing.T) { } } -// TestGetBlockInvsWireErrors performs negative tests against wire encode and -// decode of MsgGetBlockInvs to confirm error paths work correctly. -func TestGetBlockInvsWireErrors(t *testing.T) { +// TestGetBlocksWireErrors performs negative tests against wire encode and +// decode of MsgGetBlocks to confirm error paths work correctly. +func TestGetBlocksWireErrors(t *testing.T) { // Set protocol inside getheaders message. pver := ProtocolVersion @@ -168,8 +168,8 @@ func TestGetBlockInvsWireErrors(t *testing.T) { t.Errorf("NewHashFromStr: %v", err) } - // MsgGetBlockInvs message with multiple block locators and a high hash. - baseGetBlocks := NewMsgGetBlockInvs(lowHash, highHash) + // MsgGetBlocks message with multiple block locators and a high hash. + baseGetBlocks := NewMsgGetBlocks(lowHash, highHash) baseGetBlocksEncoded := []byte{ 0x35, 0x75, 0x95, 0xb7, 0xf6, 0x8c, 0xb1, 0x60, 0xcc, 0xba, 0x2c, 0x9a, 0xc5, 0x42, 0x5f, 0xd9, @@ -182,12 +182,12 @@ func TestGetBlockInvsWireErrors(t *testing.T) { } tests := []struct { - in *MsgGetBlockInvs // Value to encode - buf []byte // Wire encoding - pver uint32 // Protocol version for wire encoding - max int // Max size of fixed buffer to induce errors - writeErr error // Expected write error - readErr error // Expected read error + in *MsgGetBlocks // Value to encode + buf []byte // Wire encoding + pver uint32 // Protocol version for wire encoding + max int // Max size of fixed buffer to induce errors + writeErr error // Expected write error + readErr error // Expected read error }{ // Force error in low hash. {baseGetBlocks, baseGetBlocksEncoded, pver, 0, io.ErrShortWrite, io.EOF}, @@ -218,7 +218,7 @@ func TestGetBlockInvsWireErrors(t *testing.T) { } // Decode from wire format. - var msg MsgGetBlockInvs + var msg MsgGetBlocks r := newFixedReader(test.max, test.buf) err = msg.KaspaDecode(r, test.pver) if reflect.TypeOf(err) != reflect.TypeOf(test.readErr) { diff --git a/wire/msgibdblock.go b/wire/msgibdblock.go new file mode 100644 index 000000000..15fba8e4f --- /dev/null +++ b/wire/msgibdblock.go @@ -0,0 +1,44 @@ +// Copyright (c) 2013-2016 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package wire + +import "io" + +// MsgIBDBlock implements the Message interface and represents a kaspa +// ibdblock message. It is used to deliver block and transaction information in +// response to a getblocks message (MsgGetBlocks). +type MsgIBDBlock struct { + MsgBlock +} + +// KaspaDecode decodes r using the kaspa protocol encoding into the receiver. +// This is part of the Message interface implementation. +func (msg *MsgIBDBlock) KaspaDecode(r io.Reader, pver uint32) error { + return msg.MsgBlock.KaspaDecode(r, pver) +} + +// KaspaEncode encodes the receiver to w using the kaspa protocol encoding. +// This is part of the Message interface implementation. +func (msg *MsgIBDBlock) KaspaEncode(w io.Writer, pver uint32) error { + return msg.MsgBlock.KaspaEncode(w, pver) +} + +// Command returns the protocol command string for the message. This is part +// of the Message interface implementation. +func (msg *MsgIBDBlock) Command() MessageCommand { + return CmdIBDBlock +} + +// MaxPayloadLength returns the maximum length the payload can be for the +// receiver. This is part of the Message interface implementation. +func (msg *MsgIBDBlock) MaxPayloadLength(pver uint32) uint32 { + return MaxMessagePayload +} + +// NewMsgIBDBlock returns a new kaspa ibdblock message that conforms to the +// Message interface. See MsgIBDBlock for details. +func NewMsgIBDBlock(msgBlock *MsgBlock) *MsgIBDBlock { + return &MsgIBDBlock{*msgBlock} +} diff --git a/wire/msgibdblock_test.go b/wire/msgibdblock_test.go new file mode 100644 index 000000000..b09153706 --- /dev/null +++ b/wire/msgibdblock_test.go @@ -0,0 +1,117 @@ +// Copyright (c) 2013-2016 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package wire + +import ( + "bytes" + "reflect" + "testing" + + "github.com/davecgh/go-spew/spew" +) + +// TestIBDBlock tests the MsgIBDBlock API. +func TestIBDBlock(t *testing.T) { + pver := ProtocolVersion + + // Block 1 header. + parentHashes := blockOne.Header.ParentHashes + hashMerkleRoot := blockOne.Header.HashMerkleRoot + acceptedIDMerkleRoot := blockOne.Header.AcceptedIDMerkleRoot + utxoCommitment := blockOne.Header.UTXOCommitment + bits := blockOne.Header.Bits + nonce := blockOne.Header.Nonce + bh := NewBlockHeader(1, parentHashes, hashMerkleRoot, acceptedIDMerkleRoot, utxoCommitment, bits, nonce) + + // Ensure the command is expected value. + wantCmd := MessageCommand(25) + msg := NewMsgIBDBlock(NewMsgBlock(bh)) + if cmd := msg.Command(); cmd != wantCmd { + t.Errorf("NewMsgIBDBlock: wrong command - got %v want %v", + cmd, wantCmd) + } + + // Ensure max payload is expected value for latest protocol version. + wantPayload := uint32(1024 * 1024 * 32) + maxPayload := msg.MaxPayloadLength(pver) + if maxPayload != wantPayload { + t.Errorf("MaxPayloadLength: wrong max payload length for "+ + "protocol version %d - got %v, want %v", pver, + maxPayload, wantPayload) + } + + // Ensure we get the same block header data back out. + if !reflect.DeepEqual(&msg.Header, bh) { + t.Errorf("NewMsgIBDBlock: wrong block header - got %v, want %v", + spew.Sdump(&msg.Header), spew.Sdump(bh)) + } + + // Ensure transactions are added properly. + tx := blockOne.Transactions[0].Copy() + msg.AddTransaction(tx) + if !reflect.DeepEqual(msg.Transactions, blockOne.Transactions) { + t.Errorf("AddTransaction: wrong transactions - got %v, want %v", + spew.Sdump(msg.Transactions), + spew.Sdump(blockOne.Transactions)) + } + + // Ensure transactions are properly cleared. + msg.ClearTransactions() + if len(msg.Transactions) != 0 { + t.Errorf("ClearTransactions: wrong transactions - got %v, want %v", + len(msg.Transactions), 0) + } +} + +// TestIBDBlockWire tests the MsgIBDBlock wire encode and decode for various numbers +// of transaction inputs and outputs and protocol versions. +func TestIBDBlockWire(t *testing.T) { + tests := []struct { + in *MsgIBDBlock // Message to encode + out *MsgIBDBlock // Expected decoded message + buf []byte // Wire encoding + txLocs []TxLoc // Expected transaction locations + pver uint32 // Protocol version for wire encoding + }{ + // Latest protocol version. + { + &MsgIBDBlock{blockOne}, + &MsgIBDBlock{blockOne}, + blockOneBytes, + blockOneTxLocs, + ProtocolVersion, + }, + } + + t.Logf("Running %d tests", len(tests)) + for i, test := range tests { + // Encode the message to wire format. + var buf bytes.Buffer + err := test.in.KaspaEncode(&buf, test.pver) + if err != nil { + t.Errorf("KaspaEncode #%d error %v", i, err) + continue + } + if !bytes.Equal(buf.Bytes(), test.buf) { + t.Errorf("KaspaEncode #%d\n got: %s want: %s", i, + spew.Sdump(buf.Bytes()), spew.Sdump(test.buf)) + continue + } + + // Decode the message from wire format. + var msg MsgIBDBlock + rbuf := bytes.NewReader(test.buf) + err = msg.KaspaDecode(rbuf, test.pver) + if err != nil { + t.Errorf("KaspaDecode #%d error %v", i, err) + continue + } + if !reflect.DeepEqual(&msg, test.out) { + t.Errorf("KaspaDecode #%d\n got: %s want: %s", i, + spew.Sdump(&msg), spew.Sdump(test.out)) + continue + } + } +}