[NOD-1126] implement block relay flow (#786)

* [NOD-1126] Implement block relay flow

* [NOD-1126] Add StartGetRelayBlocksListener

* [NOD-1126] Implement block relay flow

* [NOD-1126] Integrate with new interface

* [NOD-1126] Fix comments

* [NOD-1126] Refactor protocol.go

* [NOD-1126] Split long lines

* [NOD-1126] Fix comment

* [NOD-1126] move sharedRequestedBlocks to a separate file

* [NOD-1126] Fix error message

* [NOD-1126] Move handleInv to StartBlockRelay

* [NOD-1126] Create hashesQueueSet type

* [NOD-1126] Make deleteFromRequestedBlocks a method

* [NOD-1126] Fix comment

* [NOD-1126] Add block logger

* [NOD-1126] Rename advertisedProtoVer->advertisedProtocolVer

* [NOD-1126] Fix comment and an error message

* [NOD-1126] Remove redundant loop

* [NOD-1126] Move requestBlocks upper

* [NOD-1126] Remove exiting blocks in requestedBlocks from hashesToRequest

* [NOD-1126] Change comment

* [NOD-1126] Rename stallResponseTimeout->timeout

* [NOD-1126] Use switch inside readMsgBlock

* [NOD-1126] Fix error message and remove redundant log

* [NOD-1126] Rename pacakge names

* [NOD-1126] Fix comment

* [NOD-1126] Change file names

* [NOD-1126] Convert block to partial if needed

* [NOD-1126] Remove function redeclaration

* [NOD-1126] continue instead of return

* [NOD-1126] Rename LogBlockBlueScore->LogBlock

* [NOD-1126] Add minimum functions to utils

* [NOD-1126] Flip condition on readInv

* [NOD-1126] Rename utilMath->mathUtil

* [NOD-1126] Fix comment
This commit is contained in:
Ori Newman 2020-07-12 16:11:42 +03:00 committed by GitHub
parent 4a4dca1926
commit 433cdb6006
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 788 additions and 88 deletions

View File

@ -222,7 +222,7 @@ func (dag *BlockDAG) IsKnownInvalid(hash *daghash.Hash) bool {
// GetOrphanMissingAncestorHashes returns all of the missing parents in the orphan's sub-DAG
//
// This function is safe for concurrent access.
func (dag *BlockDAG) GetOrphanMissingAncestorHashes(orphanHash *daghash.Hash) ([]*daghash.Hash, error) {
func (dag *BlockDAG) GetOrphanMissingAncestorHashes(orphanHash *daghash.Hash) []*daghash.Hash {
// Protect concurrent access. Using a read lock only so multiple
// readers can query without blocking each other.
dag.orphanLock.RLock()
@ -247,7 +247,7 @@ func (dag *BlockDAG) GetOrphanMissingAncestorHashes(orphanHash *daghash.Hash) ([
}
}
}
return missingAncestorsHashes, nil
return missingAncestorsHashes
}
// removeOrphanBlock removes the passed orphan block from the orphan pool and
@ -1584,7 +1584,7 @@ func (dag *BlockDAG) IsInSelectedParentChain(blockHash *daghash.Hash) (bool, err
blockNode, ok := dag.index.LookupNode(blockHash)
if !ok {
str := fmt.Sprintf("block %s is not in the DAG", blockHash)
return false, errNotInDAG(str)
return false, ErrNotInDAG(str)
}
return dag.virtual.selectedParentChainSet.contains(blockNode), nil
}
@ -1697,7 +1697,7 @@ func (dag *BlockDAG) ChildHashesByHash(hash *daghash.Hash) ([]*daghash.Hash, err
node, ok := dag.index.LookupNode(hash)
if !ok {
str := fmt.Sprintf("block %s is not in the DAG", hash)
return nil, errNotInDAG(str)
return nil, ErrNotInDAG(str)
}
@ -1712,7 +1712,7 @@ func (dag *BlockDAG) SelectedParentHash(blockHash *daghash.Hash) (*daghash.Hash,
node, ok := dag.index.LookupNode(blockHash)
if !ok {
str := fmt.Sprintf("block %s is not in the DAG", blockHash)
return nil, errNotInDAG(str)
return nil, ErrNotInDAG(str)
}

View File

@ -28,19 +28,19 @@ var (
byteOrder = binary.LittleEndian
)
// errNotInDAG signifies that a block hash or height that is not in the
// ErrNotInDAG signifies that a block hash that is not in the
// DAG was requested.
type errNotInDAG string
type ErrNotInDAG string
// Error implements the error interface.
func (e errNotInDAG) Error() string {
func (e ErrNotInDAG) Error() string {
return string(e)
}
// isNotInDAGErr returns whether or not the passed error is an
// errNotInDAG error.
func isNotInDAGErr(err error) bool {
var notInDAGErr errNotInDAG
// IsNotInDAGErr returns whether or not the passed error is an
// ErrNotInDAG error.
func IsNotInDAGErr(err error) bool {
var notInDAGErr ErrNotInDAG
return errors.As(err, &notInDAGErr)
}
@ -607,7 +607,7 @@ func (dag *BlockDAG) BlockByHash(hash *daghash.Hash) (*util.Block, error) {
node, ok := dag.index.LookupNode(hash)
if !ok {
str := fmt.Sprintf("block %s is not in the DAG", hash)
return nil, errNotInDAG(str)
return nil, ErrNotInDAG(str)
}
block, err := fetchBlockByHash(dbaccess.NoTx(), node.hash)

View File

@ -14,25 +14,25 @@ import (
"github.com/kaspanet/kaspad/util/daghash"
)
// TestErrNotInDAG ensures the functions related to errNotInDAG work
// TestErrNotInDAG ensures the functions related to ErrNotInDAG work
// as expected.
func TestErrNotInDAG(t *testing.T) {
errStr := "no block at height 1 exists"
err := error(errNotInDAG(errStr))
err := error(ErrNotInDAG(errStr))
// Ensure the stringized output for the error is as expected.
if err.Error() != errStr {
t.Fatalf("errNotInDAG retuned unexpected error string - "+
t.Fatalf("ErrNotInDAG retuned unexpected error string - "+
"got %q, want %q", err.Error(), errStr)
}
// Ensure error is detected as the correct type.
if !isNotInDAGErr(err) {
t.Fatalf("isNotInDAGErr did not detect as expected type")
if !IsNotInDAGErr(err) {
t.Fatalf("IsNotInDAGErr did not detect as expected type")
}
err = errors.New("something else")
if isNotInDAGErr(err) {
t.Fatalf("isNotInDAGErr detected incorrect type")
if IsNotInDAGErr(err) {
t.Fatalf("IsNotInDAGErr detected incorrect type")
}
}

View File

@ -51,6 +51,9 @@ var (
grpcLog = BackendLog.Logger("GRPC")
p2psLog = BackendLog.Logger("P2PS")
ntarLog = BackendLog.Logger("NTAR")
blkrLog = BackendLog.Logger("BLKR")
gbrlLog = BackendLog.Logger("GBRL")
blprLog = BackendLog.Logger("BLPR")
)
// SubsystemTags is an enum of all sub system tags
@ -77,7 +80,10 @@ var SubsystemTags = struct {
MUXX,
GRPC,
P2PS,
NTAR string
BLKR,
NTAR,
GBRL,
BLPR string
}{
ADXR: "ADXR",
AMGR: "AMGR",
@ -101,7 +107,10 @@ var SubsystemTags = struct {
MUXX: "MUXX",
GRPC: "GRPC",
P2PS: "P2PS",
BLKR: "BLKR",
GBRL: "GBRL",
NTAR: "NTAR",
BLPR: "BLPR",
}
// subsystemLoggers maps each subsystem identifier to its associated logger.
@ -128,7 +137,10 @@ var subsystemLoggers = map[string]*logs.Logger{
SubsystemTags.MUXX: muxxLog,
SubsystemTags.GRPC: grpcLog,
SubsystemTags.P2PS: p2psLog,
SubsystemTags.BLKR: blkrLog,
SubsystemTags.GBRL: gbrlLog,
SubsystemTags.NTAR: ntarLog,
SubsystemTags.BLPR: blprLog,
}
// InitLog attaches log file and error log file to the backend log.

View File

@ -1,6 +1,7 @@
package netadapter
import (
"sync"
"sync/atomic"
"github.com/kaspanet/kaspad/config"
@ -28,6 +29,7 @@ type NetAdapter struct {
connectionIDs map[server.Connection]*ID
idsToConnections map[*ID]server.Connection
idsToRouters map[*ID]*Router
sync.RWMutex
}
// NewNetAdapter creates and starts a new NetAdapter on the
@ -131,7 +133,7 @@ func (na *NetAdapter) startReceiveLoop(connection server.Connection, router *Rou
log.Warnf("Failed to receive from %s: %s", connection, err)
break
}
err = router.RouteInputMessage(message)
err = router.RouteIncomingMessage(message)
if err != nil {
// TODO(libp2p): This should never happen, do something more severe
log.Warnf("Failed to route input message from %s: %s", connection, err)
@ -149,7 +151,7 @@ func (na *NetAdapter) startReceiveLoop(connection server.Connection, router *Rou
func (na *NetAdapter) startSendLoop(connection server.Connection, router *Router) {
for atomic.LoadUint32(&na.stop) == 0 {
message := router.TakeOutputMessage()
message := router.ReadOutgoingMessage()
err := connection.Send(message)
if err != nil {
log.Warnf("Failed to send to %s: %s", connection, err)
@ -178,16 +180,15 @@ func (na *NetAdapter) ID() *ID {
// Broadcast sends the given `message` to every peer corresponding
// to each ID in `ids`
func (na *NetAdapter) Broadcast(ids []*ID, message wire.Message) error {
func (na *NetAdapter) Broadcast(ids []*ID, message wire.Message) {
na.RLock()
defer na.RUnlock()
for _, id := range ids {
router, ok := na.idsToRouters[id]
if !ok {
return errors.Errorf("id %s is not registered", id)
}
err := router.RouteInputMessage(message)
if err != nil {
return err
log.Warnf("id %s is not registered", id)
continue
}
router.WriteOutgoingMessage(message)
}
return nil
}

View File

@ -12,16 +12,16 @@ type OnIDReceivedHandler func(id *ID)
// Router routes messages by type to their respective
// input channels
type Router struct {
inputRoutes map[string]chan wire.Message
outputRoute chan wire.Message
incomingRoutes map[string]chan<- wire.Message
outgoingRoute chan wire.Message
onIDReceivedHandler OnIDReceivedHandler
}
// NewRouter creates a new empty router
func NewRouter() *Router {
return &Router{
inputRoutes: make(map[string]chan wire.Message),
outputRoute: make(chan wire.Message),
incomingRoutes: make(map[string]chan<- wire.Message),
outgoingRoute: make(chan wire.Message),
}
}
@ -33,12 +33,12 @@ func (r *Router) SetOnIDReceivedHandler(onIDReceivedHandler OnIDReceivedHandler)
// AddRoute registers the messages of types `messageTypes` to
// be routed to the given `inputChannel`
func (r *Router) AddRoute(messageTypes []string, inputChannel chan wire.Message) error {
func (r *Router) AddRoute(messageTypes []string, inputChannel chan<- wire.Message) error {
for _, messageType := range messageTypes {
if _, ok := r.inputRoutes[messageType]; ok {
if _, ok := r.incomingRoutes[messageType]; ok {
return errors.Errorf("a route for '%s' already exists", messageType)
}
r.inputRoutes[messageType] = inputChannel
r.incomingRoutes[messageType] = inputChannel
}
return nil
}
@ -47,18 +47,18 @@ func (r *Router) AddRoute(messageTypes []string, inputChannel chan wire.Message)
// the router
func (r *Router) RemoveRoute(messageTypes []string) error {
for _, messageType := range messageTypes {
if _, ok := r.inputRoutes[messageType]; !ok {
if _, ok := r.incomingRoutes[messageType]; !ok {
return errors.Errorf("a route for '%s' does not exist", messageType)
}
delete(r.inputRoutes, messageType)
delete(r.incomingRoutes, messageType)
}
return nil
}
// RouteInputMessage sends the given message to the correct input
// RouteIncomingMessage sends the given message to the correct input
// channel as registered with AddRoute
func (r *Router) RouteInputMessage(message wire.Message) error {
routeInChannel, ok := r.inputRoutes[message.Command()]
func (r *Router) RouteIncomingMessage(message wire.Message) error {
routeInChannel, ok := r.incomingRoutes[message.Command()]
if !ok {
return errors.Errorf("a route for '%s' does not exist", message.Command())
}
@ -66,15 +66,15 @@ func (r *Router) RouteInputMessage(message wire.Message) error {
return nil
}
// TakeOutputMessage takes the next output message from
// ReadOutgoingMessage takes the next output message from
// the output channel
func (r *Router) TakeOutputMessage() wire.Message {
return <-r.outputRoute
func (r *Router) ReadOutgoingMessage() wire.Message {
return <-r.outgoingRoute
}
// WriteOutgoingMessage pushes the given message to the output route
func (r *Router) WriteOutgoingMessage(message wire.Message) {
r.outputRoute <- message
r.outgoingRoute <- message
}
// RegisterID registers the remote connection's ID
@ -86,7 +86,7 @@ func (r *Router) RegisterID(id *ID) {
// input channels
func (r *Router) Close() error {
inChannels := make(map[chan<- wire.Message]struct{})
for _, inChannel := range r.inputRoutes {
for _, inChannel := range r.incomingRoutes {
inChannels[inChannel] = struct{}{}
}
for inChannel := range inChannels {

View File

@ -553,12 +553,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
}
// Request the parents for the orphan block from the peer that sent it.
missingAncestors, err := sm.dag.GetOrphanMissingAncestorHashes(blockHash)
if err != nil {
log.Errorf("Failed to find missing ancestors for block %s: %s",
blockHash, err)
return
}
missingAncestors := sm.dag.GetOrphanMissingAncestorHashes(blockHash)
sm.addBlocksToRequestQueue(state, missingAncestors, wire.InvTypeMissingAncestor)
} else {
// When the block is not an orphan, log information about it and
@ -767,12 +762,7 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
sm.RemoveFromSyncCandidates(peer)
return
}
missingAncestors, err := sm.dag.GetOrphanMissingAncestorHashes(iv.Hash)
if err != nil {
log.Errorf("Failed to find missing ancestors for block %s: %s",
iv.Hash, err)
return
}
missingAncestors := sm.dag.GetOrphanMissingAncestorHashes(iv.Hash)
sm.addBlocksToRequestQueue(state, missingAncestors, wire.InvTypeMissingAncestor)
continue
}

View File

@ -8,6 +8,8 @@ const (
BanScoreOrphanInvAsPartOfNetsync = 100
BanScoreMalformedBlueScoreInOrphan = 100
BanScoreRequestNonExistingBlock = 10
BanScoreUnrequestedSelectedTip = 20
BanScoreUnrequestedTx = 20
BanScoreInvalidTx = 100
@ -35,4 +37,6 @@ const (
BanScoreNodeBloomFlagViolation = 100
BanScoreStallTimeout = 1
BanScoreUnrequestedMessage = 100
)

View File

@ -8,6 +8,7 @@ import (
"bytes"
"container/list"
"fmt"
mathUtil "github.com/kaspanet/kaspad/util/math"
"github.com/kaspanet/kaspad/util/mstime"
"io"
"math/rand"
@ -258,15 +259,6 @@ type Config struct {
SubnetworkID *subnetworkid.SubnetworkID
}
// minUint32 is a helper function to return the minimum of two uint32s.
// This avoids a math import and the need to cast to floats.
func minUint32(a, b uint32) uint32 {
if a < b {
return a
}
return b
}
// newNetAddress attempts to extract the IP address and port from the passed
// net.Addr interface and create a kaspa NetAddress structure using that
// information.
@ -767,7 +759,7 @@ func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) {
msg.Services = p.cfg.Services
// Advertise our max supported protocol version.
msg.ProtocolVersion = int32(p.cfg.ProtocolVersion)
msg.ProtocolVersion = p.cfg.ProtocolVersion
// Advertise if inv messages for transactions are desired.
msg.DisableRelayTx = p.cfg.DisableRelayTx
@ -951,8 +943,8 @@ func (p *Peer) updateFlagsFromVersionMsg(msg *wire.MsgVersion) {
p.flagsMtx.Lock()
defer p.flagsMtx.Unlock()
p.advertisedProtoVer = uint32(msg.ProtocolVersion)
p.protocolVersion = minUint32(p.protocolVersion, p.advertisedProtoVer)
p.advertisedProtoVer = msg.ProtocolVersion
p.protocolVersion = mathUtil.MinUint32(p.protocolVersion, p.advertisedProtoVer)
p.versionKnown = true
log.Debugf("Negotiated protocol version %d for peer %s",
p.protocolVersion, p)

View File

@ -0,0 +1,64 @@
// Copyright (c) 2015-2017 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package blocklogger
import (
"github.com/kaspanet/kaspad/util/mstime"
"sync"
"time"
"github.com/kaspanet/kaspad/util"
)
var (
receivedLogBlocks int64
receivedLogTx int64
lastBlockLogTime = mstime.Now()
mtx sync.Mutex
)
// LogBlock logs a new block blue score as an information message
// to show progress to the user. In order to prevent spam, it limits logging to
// one message every 10 seconds with duration and totals included.
func LogBlock(block *util.Block) error {
mtx.Lock()
defer mtx.Unlock()
receivedLogBlocks++
receivedLogTx += int64(len(block.MsgBlock().Transactions))
now := mstime.Now()
duration := now.Sub(lastBlockLogTime)
if duration < time.Second*10 {
return nil
}
// Truncate the duration to 10s of milliseconds.
tDuration := duration.Round(10 * time.Millisecond)
// Log information about new block blue score.
blockStr := "blocks"
if receivedLogBlocks == 1 {
blockStr = "block"
}
txStr := "transactions"
if receivedLogTx == 1 {
txStr = "transaction"
}
blueScore, err := block.BlueScore()
if err != nil {
return err
}
log.Infof("Processed %d %s in the last %s (%d %s, blue score %d, %s)",
receivedLogBlocks, blockStr, tDuration, receivedLogTx,
txStr, blueScore, block.MsgBlock().Header.Timestamp)
receivedLogBlocks = 0
receivedLogTx = 0
lastBlockLogTime = now
return nil
}

View File

@ -0,0 +1,11 @@
// Copyright (c) 2017 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package blocklogger
import (
"github.com/kaspanet/kaspad/logger"
)
var log, _ = logger.Get(logger.SubsystemTags.BLPR)

View File

@ -0,0 +1,46 @@
package handlerelayblockrequests
import (
"github.com/kaspanet/kaspad/blockdag"
"github.com/kaspanet/kaspad/netadapter"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
"github.com/kaspanet/kaspad/wire"
"github.com/pkg/errors"
)
// HandleRelayBlockRequests listens to wire.MsgGetRelayBlocks messages and sends
// their corresponding blocks to the requesting peer.
func HandleRelayBlockRequests(msgChan <-chan wire.Message, peer *peerpkg.Peer, router *netadapter.Router,
dag *blockdag.BlockDAG) error {
for msg := range msgChan {
getRelayBlocksMsg := msg.(*wire.MsgGetRelayBlocks)
for _, hash := range getRelayBlocksMsg.Hashes {
// Fetch the block from the database.
block, err := dag.BlockByHash(hash)
if blockdag.IsNotInDAGErr(err) {
return errors.Errorf("block %s not found", hash)
} else if err != nil {
panic(errors.Wrapf(err, "unable to fetch requested block hash %s", hash))
}
msgBlock := block.MsgBlock()
// If we are a full node and the peer is a partial node, we must convert
// the block to a partial block.
nodeSubnetworkID := dag.SubnetworkID()
peerSubnetworkID, err := peer.SubnetworkID()
if err != nil {
panic(err)
}
isNodeFull := nodeSubnetworkID == nil
isPeerFull := peerSubnetworkID == nil
if isNodeFull && !isPeerFull {
msgBlock.ConvertToPartial(peerSubnetworkID)
}
router.WriteOutgoingMessage(msgBlock)
}
}
return nil
}

View File

@ -0,0 +1,9 @@
package handlerelayblockrequests
import (
"github.com/kaspanet/kaspad/logger"
"github.com/kaspanet/kaspad/util/panics"
)
var log, _ = logger.Get(logger.SubsystemTags.GBRL)
var spawn = panics.GoroutineWrapperFunc(log)

View File

@ -0,0 +1,214 @@
package handlerelayinvs
import (
"github.com/kaspanet/kaspad/blockdag"
"github.com/kaspanet/kaspad/netadapter"
"github.com/kaspanet/kaspad/protocol/blocklogger"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
"github.com/kaspanet/kaspad/util"
"github.com/kaspanet/kaspad/util/daghash"
mathUtil "github.com/kaspanet/kaspad/util/math"
"github.com/kaspanet/kaspad/wire"
"github.com/pkg/errors"
"time"
)
// HandleRelayInvs listens to wire.MsgInvRelayBlock messages, requests their corresponding blocks if they
// are missing, adds them to the DAG and propagates them to the rest of the network.
func HandleRelayInvs(msgChan <-chan wire.Message, peer *peerpkg.Peer, netAdapter *netadapter.NetAdapter, router *netadapter.Router,
dag *blockdag.BlockDAG) error {
invsQueue := make([]*wire.MsgInvRelayBlock, 0)
for {
inv, shouldStop, err := readInv(msgChan, &invsQueue)
if err != nil {
return err
}
if shouldStop {
return nil
}
if dag.IsKnownBlock(inv.Hash) {
if dag.IsKnownInvalid(inv.Hash) {
return errors.Errorf("sent inv of an invalid block %s",
inv.Hash)
}
continue
}
requestQueue := newHashesQueueSet()
requestQueue.enqueueIfNotExists(inv.Hash)
for requestQueue.len() > 0 {
shouldStop, err := requestBlocks(netAdapter, router, peer, msgChan, dag, &invsQueue,
requestQueue)
if err != nil {
return err
}
if shouldStop {
return nil
}
}
}
}
func readInv(msgChan <-chan wire.Message,
invsQueue *[]*wire.MsgInvRelayBlock) (inv *wire.MsgInvRelayBlock, shouldStop bool, err error) {
if len(*invsQueue) > 0 {
inv, *invsQueue = (*invsQueue)[0], (*invsQueue)[1:]
return inv, false, nil
}
msg, isOpen := <-msgChan
if !isOpen {
return nil, true, nil
}
inv, ok := msg.(*wire.MsgInvRelayBlock)
if !ok {
return nil, false, errors.Errorf("unexpected %s message in the block relay flow while "+
"expecting an inv message", msg.Command())
}
return inv, false, nil
}
func requestBlocks(netAdapater *netadapter.NetAdapter, router *netadapter.Router, peer *peerpkg.Peer, msgChan <-chan wire.Message,
dag *blockdag.BlockDAG, invsQueue *[]*wire.MsgInvRelayBlock, requestQueue *hashesQueueSet) (shouldStop bool, err error) {
numHashesToRequest := mathUtil.MinInt(wire.MsgGetRelayBlocksHashes, requestQueue.len())
hashesToRequest := requestQueue.dequeue(numHashesToRequest)
pendingBlocks := map[daghash.Hash]struct{}{}
var filteredHashesToRequest []*daghash.Hash
for _, hash := range hashesToRequest {
exists := requestedBlocks.addIfNotExists(hash)
if !exists {
continue
}
pendingBlocks[*hash] = struct{}{}
filteredHashesToRequest = append(filteredHashesToRequest, hash)
}
// In case the function returns earlier than expected, we want to make sure requestedBlocks is
// clean from any pending blocks.
defer requestedBlocks.removeSet(pendingBlocks)
getRelayBlocksMsg := wire.NewMsgGetRelayBlocks(filteredHashesToRequest)
router.WriteOutgoingMessage(getRelayBlocksMsg)
for len(pendingBlocks) > 0 {
msgBlock, shouldStop, err := readMsgBlock(msgChan, invsQueue)
if err != nil {
return false, err
}
if shouldStop {
return true, nil
}
block := util.NewBlock(msgBlock)
blockHash := block.Hash()
if _, ok := pendingBlocks[*blockHash]; !ok {
return false, errors.Errorf("got unrequested block %s", block.Hash())
}
delete(pendingBlocks, *blockHash)
requestedBlocks.remove(blockHash)
shouldStop, err = processAndRelayBlock(netAdapater, peer, dag, requestQueue, block)
if err != nil {
return false, err
}
if shouldStop {
return true, nil
}
}
return false, nil
}
// readMsgBlock returns the next msgBlock in msgChan, and populates invsQueue with any inv messages that meanwhile arrive.
//
// Note: this function assumes msgChan can contain only wire.MsgInvRelayBlock and wire.MsgBlock messages.
func readMsgBlock(msgChan <-chan wire.Message,
invsQueue *[]*wire.MsgInvRelayBlock) (msgBlock *wire.MsgBlock, shouldStop bool, err error) {
for {
const timeout = 30 * time.Second
select {
case <-time.After(timeout):
return nil, false, errors.Errorf("stalled for %s", timeout)
case msg, isOpen := <-msgChan:
if !isOpen {
return nil, true, nil
}
switch msg := msg.(type) {
case *wire.MsgInvRelayBlock:
*invsQueue = append(*invsQueue, msg)
case *wire.MsgBlock:
return msg, false, nil
default:
panic(errors.Errorf("unexpected message %s", msg.Command()))
}
}
}
}
func processAndRelayBlock(netAdapter *netadapter.NetAdapter, peer *peerpkg.Peer,
dag *blockdag.BlockDAG, requestQueue *hashesQueueSet, block *util.Block) (shouldStop bool, err error) {
blockHash := block.Hash()
isOrphan, isDelayed, err := dag.ProcessBlock(block, blockdag.BFNone)
if err != nil {
// When the error is a rule error, it means the block was simply
// rejected as opposed to something actually going wrong, so log
// it as such. Otherwise, something really did go wrong, so panic.
if !errors.As(err, &blockdag.RuleError{}) {
panic(errors.Wrapf(err, "failed to process block %s",
blockHash))
}
log.Infof("Rejected block %s from %s: %s", blockHash,
peer, err)
return false, errors.Wrap(err, "got invalid block: %s")
}
if isDelayed {
return false, nil
}
if isOrphan {
blueScore, err := block.BlueScore()
if err != nil {
return false, errors.Errorf("received an orphan block %s with malformed blue score", blockHash)
}
const maxOrphanBlueScoreDiff = 10000
selectedTipBlueScore := dag.SelectedTipBlueScore()
if blueScore > selectedTipBlueScore+maxOrphanBlueScoreDiff {
log.Infof("Orphan block %s has blue score %d and the selected tip blue score is "+
"%d. Ignoring orphans with a blue score difference from the selected tip greater than %d",
blockHash, blueScore, selectedTipBlueScore, maxOrphanBlueScoreDiff)
return false, nil
}
// Request the parents for the orphan block from the peer that sent it.
missingAncestors := dag.GetOrphanMissingAncestorHashes(blockHash)
for _, missingAncestor := range missingAncestors {
requestQueue.enqueueIfNotExists(missingAncestor)
}
return false, nil
}
err = blocklogger.LogBlock(block)
if err != nil {
panic(err)
}
//TODO(libp2p)
//// When the block is not an orphan, log information about it and
//// update the DAG state.
// sm.restartSyncIfNeeded()
//// Clear the rejected transactions.
//sm.rejectedTxns = make(map[daghash.TxID]struct{})
netAdapter.Broadcast(peerpkg.GetReadyPeerIDs(), block.MsgBlock())
return false, nil
}

View File

@ -0,0 +1,35 @@
package handlerelayinvs
import "github.com/kaspanet/kaspad/util/daghash"
type hashesQueueSet struct {
queue []*daghash.Hash
set map[daghash.Hash]struct{}
}
func (r *hashesQueueSet) enqueueIfNotExists(hash *daghash.Hash) {
if _, ok := r.set[*hash]; !ok {
return
}
r.queue = append(r.queue, hash)
r.set[*hash] = struct{}{}
}
func (r *hashesQueueSet) dequeue(numItems int) []*daghash.Hash {
var hashes []*daghash.Hash
hashes, r.queue = r.queue[:numItems], r.queue[numItems:]
for _, hash := range hashes {
delete(r.set, *hash)
}
return hashes
}
func (r *hashesQueueSet) len() int {
return len(r.queue)
}
func newHashesQueueSet() *hashesQueueSet {
return &hashesQueueSet{
set: make(map[daghash.Hash]struct{}),
}
}

View File

@ -0,0 +1,9 @@
package handlerelayinvs
import (
"github.com/kaspanet/kaspad/logger"
"github.com/kaspanet/kaspad/util/panics"
)
var log, _ = logger.Get(logger.SubsystemTags.BLKR)
var spawn = panics.GoroutineWrapperFunc(log)

View File

@ -0,0 +1,38 @@
package handlerelayinvs
import (
"github.com/kaspanet/kaspad/util/daghash"
"sync"
)
type sharedRequestedBlocks struct {
blocks map[daghash.Hash]struct{}
sync.Mutex
}
func (s *sharedRequestedBlocks) remove(hash *daghash.Hash) {
s.Lock()
defer s.Unlock()
delete(s.blocks, *hash)
}
func (s *sharedRequestedBlocks) removeSet(blockHashes map[daghash.Hash]struct{}) {
for hash := range blockHashes {
delete(s.blocks, hash)
}
}
func (s *sharedRequestedBlocks) addIfNotExists(hash *daghash.Hash) (exists bool) {
s.Lock()
defer s.Unlock()
_, ok := s.blocks[*hash]
if ok {
return true
}
s.blocks[*hash] = struct{}{}
return false
}
var requestedBlocks = &sharedRequestedBlocks{
blocks: make(map[daghash.Hash]struct{}),
}

9
protocol/peer/log.go Normal file
View File

@ -0,0 +1,9 @@
package peer
import (
"github.com/kaspanet/kaspad/logger"
"github.com/kaspanet/kaspad/util/panics"
)
var log, _ = logger.Get(logger.SubsystemTags.PEER)
var spawn = panics.GoroutineWrapperFunc(log)

108
protocol/peer/peer.go Normal file
View File

@ -0,0 +1,108 @@
package peer
import (
"github.com/kaspanet/kaspad/netadapter"
"github.com/kaspanet/kaspad/util/daghash"
"github.com/kaspanet/kaspad/util/subnetworkid"
"github.com/kaspanet/kaspad/wire"
"github.com/pkg/errors"
"sync"
"sync/atomic"
)
// Peer holds data about a peer.
type Peer struct {
ready uint32
selectedTipHashMtx sync.RWMutex
selectedTipHash *daghash.Hash
id uint32
userAgent string
services wire.ServiceFlag
advertisedProtocolVer uint32 // protocol version advertised by remote
protocolVersion uint32 // negotiated protocol version
disableRelayTx bool
subnetworkID *subnetworkid.SubnetworkID
}
// SelectedTipHash returns the selected tip of the peer.
func (p *Peer) SelectedTipHash() (*daghash.Hash, error) {
if atomic.LoadUint32(&p.ready) == 0 {
return nil, errors.New("peer is not ready yet")
}
p.selectedTipHashMtx.RLock()
defer p.selectedTipHashMtx.RUnlock()
return p.selectedTipHash, nil
}
// SetSelectedTipHash sets the selected tip of the peer.
func (p *Peer) SetSelectedTipHash(hash *daghash.Hash) error {
if atomic.LoadUint32(&p.ready) == 0 {
return errors.New("peer is not ready yet")
}
p.selectedTipHashMtx.Lock()
defer p.selectedTipHashMtx.Unlock()
p.selectedTipHash = hash
return nil
}
// SubnetworkID returns the subnetwork the peer is associated with.
// It is nil in full nodes.
func (p *Peer) SubnetworkID() (*subnetworkid.SubnetworkID, error) {
if atomic.LoadUint32(&p.ready) == 0 {
return nil, errors.New("peer is not ready yet")
}
return p.subnetworkID, nil
}
// MarkAsReady marks the peer as ready.
func (p *Peer) MarkAsReady() error {
if atomic.AddUint32(&p.ready, 1) != 1 {
return errors.New("peer is already ready")
}
return nil
}
// UpdateFieldsFromMsgVersion updates the peer with the data from the version message.
func (p *Peer) UpdateFieldsFromMsgVersion(msg *wire.MsgVersion, peerID uint32) {
// Negotiate the protocol version.
p.advertisedProtocolVer = msg.ProtocolVersion
p.protocolVersion = minUint32(p.protocolVersion, p.advertisedProtocolVer)
log.Debugf("Negotiated protocol version %d for peer %s",
p.protocolVersion, p)
// Set the peer's ID.
p.id = peerID
// Set the supported services for the peer to what the remote peer
// advertised.
p.services = msg.Services
// Set the remote peer's user agent.
p.userAgent = msg.UserAgent
p.disableRelayTx = msg.DisableRelayTx
p.selectedTipHash = msg.SelectedTipHash
p.subnetworkID = msg.SubnetworkID
}
func (p *Peer) String() string {
//TODO(libp2p)
panic("unimplemented")
}
// minUint32 is a helper function to return the minimum of two uint32s.
// This avoids a math import and the need to cast to floats.
func minUint32(a, b uint32) uint32 {
if a < b {
return a
}
return b
}
// GetReadyPeerIDs returns the peer IDs of all the ready peers.
func GetReadyPeerIDs() []*netadapter.ID {
// TODO(libp2p)
panic("unimplemented")
}

View File

@ -3,7 +3,11 @@ package protocol
import (
"github.com/kaspanet/kaspad/blockdag"
"github.com/kaspanet/kaspad/netadapter"
"github.com/kaspanet/kaspad/protocol/handlerelayblockrequests"
"github.com/kaspanet/kaspad/protocol/handlerelayinvs"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
"github.com/kaspanet/kaspad/wire"
"sync/atomic"
)
// Manager manages the p2p protocol
@ -40,27 +44,68 @@ func (p *Manager) Stop() error {
func newRouterInitializer(netAdapter *netadapter.NetAdapter, dag *blockdag.BlockDAG) netadapter.RouterInitializer {
return func() (*netadapter.Router, error) {
router := netadapter.NewRouter()
err := router.AddRoute([]string{wire.CmdPing, wire.CmdPong}, startPing(netAdapter, router, dag))
if err != nil {
return nil, err
}
spawn(func() {
err := startFlows(netAdapter, router, dag)
if err != nil {
// TODO(libp2p) Ban peer
}
})
return router, nil
}
}
// TODO(libp2p): Remove this and change it with a real Ping-Pong flow.
func startPing(netAdapter *netadapter.NetAdapter, router *netadapter.Router,
dag *blockdag.BlockDAG) chan wire.Message {
func startFlows(netAdapter *netadapter.NetAdapter, router *netadapter.Router, dag *blockdag.BlockDAG) error {
stop := make(chan error)
stopped := uint32(0)
peer := new(peerpkg.Peer)
addFlow("HandleRelayInvs", router, []string{wire.CmdInvRelayBlock, wire.CmdBlock}, &stopped, stop,
func(ch chan wire.Message) error {
return handlerelayinvs.HandleRelayInvs(ch, peer, netAdapter, router, dag)
},
)
addFlow("HandleRelayBlockRequests", router, []string{wire.CmdGetRelayBlocks}, &stopped, stop,
func(ch chan wire.Message) error {
return handlerelayblockrequests.HandleRelayBlockRequests(ch, peer, router, dag)
},
)
// TODO(libp2p): Remove this and change it with a real Ping-Pong flow.
addFlow("PingPong", router, []string{wire.CmdPing, wire.CmdPong}, &stopped, stop,
func(ch chan wire.Message) error {
router.WriteOutgoingMessage(wire.NewMsgPing(666))
for message := range ch {
log.Infof("Got message: %+v", message.Command())
if message.Command() == "ping" {
router.WriteOutgoingMessage(wire.NewMsgPong(666))
}
}
return nil
},
)
err := <-stop
return err
}
func addFlow(name string, router *netadapter.Router, messageTypes []string, stopped *uint32,
stopChan chan error, flow func(ch chan wire.Message) error) {
ch := make(chan wire.Message)
err := router.AddRoute(messageTypes, ch)
if err != nil {
panic(err)
}
spawn(func() {
router.WriteOutgoingMessage(wire.NewMsgPing(666))
for message := range ch {
log.Infof("Got message: %+v", message.Command())
if message.Command() == "ping" {
router.WriteOutgoingMessage(wire.NewMsgPong(666))
}
err := flow(ch)
if err != nil {
log.Errorf("error from %s flow: %s", name, err)
}
if atomic.AddUint32(stopped, 1) == 1 {
stopChan <- err
}
})
return ch
}

17
util/math/min.go Normal file
View File

@ -0,0 +1,17 @@
package math
// MinInt returns the smaller of x or y.
func MinInt(x, y int) int {
if x < y {
return x
}
return y
}
// MinUint32 returns the smaller of x or y.
func MinUint32(x, y uint32) uint32 {
if x < y {
return x
}
return y
}

View File

@ -52,6 +52,8 @@ const (
CmdBlockLocator = "locator"
CmdSelectedTip = "selectedtip"
CmdGetSelectedTip = "getseltip"
CmdInvRelayBlock = "invrelblk"
CmdGetRelayBlocks = "getrelblks"
)
// Message is an interface that describes a kaspa message. A type that

49
wire/msggetrelayblocks.go Normal file
View File

@ -0,0 +1,49 @@
package wire
import (
"github.com/kaspanet/kaspad/util/daghash"
"io"
)
// MsgGetRelayBlocksHashes is the maximum number of hashes that can
// be in a single getrelblks message.
const MsgGetRelayBlocksHashes = MaxInvPerMsg
// MsgGetRelayBlocks implements the Message interface and represents a kaspa
// getrelblks message. It is used to request blocks as part of the block
// relay protocol.
type MsgGetRelayBlocks struct {
Hashes []*daghash.Hash
}
// KaspaDecode decodes r using the kaspa protocol encoding into the receiver.
// This is part of the Message interface implementation.
func (msg *MsgGetRelayBlocks) KaspaDecode(r io.Reader, pver uint32) error {
return ReadElement(r, &msg.Hashes)
}
// KaspaEncode encodes the receiver to w using the kaspa protocol encoding.
// This is part of the Message interface implementation.
func (msg *MsgGetRelayBlocks) KaspaEncode(w io.Writer, pver uint32) error {
return WriteElement(w, msg.Hashes)
}
// Command returns the protocol command string for the message. This is part
// of the Message interface implementation.
func (msg *MsgGetRelayBlocks) Command() string {
return CmdGetRelayBlocks
}
// MaxPayloadLength returns the maximum length the payload can be for the
// receiver. This is part of the Message interface implementation.
func (msg *MsgGetRelayBlocks) MaxPayloadLength(pver uint32) uint32 {
return daghash.HashSize
}
// NewMsgGetRelayBlocks returns a new kaspa getrelblks message that conforms to
// the Message interface. See MsgGetRelayBlocks for details.
func NewMsgGetRelayBlocks(hashes []*daghash.Hash) *MsgGetRelayBlocks {
return &MsgGetRelayBlocks{
Hashes: hashes,
}
}

45
wire/msginvrelayblock.go Normal file
View File

@ -0,0 +1,45 @@
package wire
import (
"github.com/kaspanet/kaspad/util/daghash"
"io"
)
// MsgInvRelayBlock implements the Message interface and represents a kaspa
// block inventory message. It is used to notify the network about new block
// by sending their hash, and let the receiving node decide if it needs it.
type MsgInvRelayBlock struct {
Hash *daghash.Hash
}
// KaspaDecode decodes r using the kaspa protocol encoding into the receiver.
// This is part of the Message interface implementation.
func (msg *MsgInvRelayBlock) KaspaDecode(r io.Reader, pver uint32) error {
return ReadElement(r, &msg.Hash)
}
// KaspaEncode encodes the receiver to w using the kaspa protocol encoding.
// This is part of the Message interface implementation.
func (msg *MsgInvRelayBlock) KaspaEncode(w io.Writer, pver uint32) error {
return WriteElement(w, msg.Hash)
}
// Command returns the protocol command string for the message. This is part
// of the Message interface implementation.
func (msg *MsgInvRelayBlock) Command() string {
return CmdInvRelayBlock
}
// MaxPayloadLength returns the maximum length the payload can be for the
// receiver. This is part of the Message interface implementation.
func (msg *MsgInvRelayBlock) MaxPayloadLength(pver uint32) uint32 {
return daghash.HashSize
}
// NewMsgInvBlock returns a new kaspa invrelblk message that conforms to
// the Message interface. See MsgInvRelayBlock for details.
func NewMsgInvBlock(hash *daghash.Hash) *MsgInvRelayBlock {
return &MsgInvRelayBlock{
Hash: hash,
}
}

View File

@ -33,7 +33,7 @@ var DefaultUserAgent = fmt.Sprintf("/kaspad:%s/", version.Version())
// communication is allowed to proceed.
type MsgVersion struct {
// Version of the protocol the node is using.
ProtocolVersion int32
ProtocolVersion uint32
// Bitfield which identifies the enabled services.
Services ServiceFlag
@ -240,7 +240,7 @@ func NewMsgVersion(me *NetAddress, you *NetAddress, nonce uint64,
// Limit the timestamp to one millisecond precision since the protocol
// doesn't support better.
return &MsgVersion{
ProtocolVersion: int32(ProtocolVersion),
ProtocolVersion: ProtocolVersion,
Services: 0,
Timestamp: mstime.Now(),
AddrYou: *you,

View File

@ -35,7 +35,7 @@ func TestVersion(t *testing.T) {
// Ensure we get the correct data back out.
msg := NewMsgVersion(me, you, nonce, selectedTipHash, nil)
if msg.ProtocolVersion != int32(pver) {
if msg.ProtocolVersion != pver {
t.Errorf("NewMsgVersion: wrong protocol version - got %v, want %v",
msg.ProtocolVersion, pver)
}