[NOD-276] Extracted p2p onXXX methods to separate files. (#418)

stasatdaglabs 2019-09-22 17:40:10 +03:00 committed by Svarog
parent 40342eb45a
commit c5108a4abd
21 changed files with 917 additions and 781 deletions

server/p2p/on_addr.go Normal file

@ -0,0 +1,65 @@
package p2p
import (
"github.com/daglabs/btcd/config"
"github.com/daglabs/btcd/peer"
"github.com/daglabs/btcd/wire"
"time"
)
// OnAddr is invoked when a peer receives an addr bitcoin message and is
// used to notify the server about advertised addresses.
func (sp *Peer) OnAddr(_ *peer.Peer, msg *wire.MsgAddr) {
// Ignore addresses when running on the simulation test network. This
// helps prevent the network from becoming another public test network
// since it will not be able to learn about other peers that have not
// specifically been provided.
if config.MainConfig().SimNet {
return
}
// A message that has no addresses is invalid.
if len(msg.AddrList) == 0 {
peerLog.Errorf("Command [%s] from %s does not contain any addresses",
msg.Command(), sp.Peer)
sp.Disconnect()
return
}
if msg.IncludeAllSubnetworks {
peerLog.Errorf("Got unexpected IncludeAllSubnetworks=true in [%s] command from %s",
msg.Command(), sp.Peer)
sp.Disconnect()
return
} else if msg.SubnetworkID != nil && !msg.SubnetworkID.IsEqual(config.MainConfig().SubnetworkID) {
peerLog.Errorf("Only full nodes and %s subnetwork IDs are allowed in [%s] command, but got subnetwork ID %s from %s",
config.MainConfig().SubnetworkID, msg.Command(), msg.SubnetworkID, sp.Peer)
sp.Disconnect()
return
}
for _, na := range msg.AddrList {
// Don't add more addresses if we're disconnecting.
if !sp.Connected() {
return
}
// Set the timestamp to 5 days ago if it's more than 10 minutes
// in the future so this address is one of the first to be
// removed when space is needed.
now := time.Now()
if na.Timestamp.After(now.Add(time.Minute * 10)) {
na.Timestamp = now.Add(-1 * time.Hour * 24 * 5)
}
// Add address to known addresses for this peer.
sp.addKnownAddresses([]*wire.NetAddress{na})
}
// Add addresses to server address manager. The address manager handles
// the details of things such as preventing duplicate addresses, max
// addresses, and last seen updates.
// XXX bitcoind gives a 2 hour time penalty here, do we want to do the
// same?
sp.server.addrManager.AddAddresses(msg.AddrList, sp.NA(), msg.SubnetworkID)
}
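
The timestamp clamp inside the loop above is easy to miss, so here is a minimal, self-contained sketch of the same rule. sanitizeAddrTimestamp is a hypothetical name; this snippet is illustrative and not part of the commit.
package main
import (
"fmt"
"time"
)
// sanitizeAddrTimestamp mirrors the clamp in OnAddr: a timestamp more than
// 10 minutes in the future is rewritten to 5 days in the past so the address
// manager evicts that address first when space is needed.
func sanitizeAddrTimestamp(ts, now time.Time) time.Time {
if ts.After(now.Add(10 * time.Minute)) {
return now.Add(-5 * 24 * time.Hour)
}
return ts
}
func main() {
now := time.Now()
fmt.Println(sanitizeAddrTimestamp(now.Add(time.Hour), now))  // clamped to 5 days ago
fmt.Println(sanitizeAddrTimestamp(now.Add(-time.Hour), now)) // left unchanged
}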

server/p2p/on_block.go Normal file

@ -0,0 +1,33 @@
package p2p
import (
"github.com/daglabs/btcd/peer"
"github.com/daglabs/btcd/util"
"github.com/daglabs/btcd/wire"
)
// OnBlock is invoked when a peer receives a block bitcoin message. It
// blocks until the bitcoin block has been fully processed.
func (sp *Peer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) {
// Convert the raw MsgBlock to a util.Block which provides some
// convenience methods and things such as hash caching.
block := util.NewBlockFromBlockAndBytes(msg, buf)
// Add the block to the known inventory for the peer.
iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
sp.AddKnownInventory(iv)
// Queue the block up to be handled by the block
// manager and intentionally block further receives
// until the bitcoin block is fully processed and known
// good or bad. This helps prevent a malicious peer
// from queuing up a bunch of bad blocks before
// disconnecting (or being disconnected) and wasting
// memory. Additionally, this behavior is depended on
// by at least the block acceptance test tool as the
// reference implementation processes blocks in the same
// thread and therefore blocks further messages until
// the bitcoin block has been fully processed.
sp.server.SyncManager.QueueBlock(block, sp.Peer, false, sp.blockProcessed)
<-sp.blockProcessed
}
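
The QueueBlock call followed by the receive on sp.blockProcessed is a simple backpressure pattern: hand the block to another goroutine and refuse to read further peer messages until it signals completion. A stripped-down sketch of that pattern with illustrative names only, independent of this codebase:
package main
import "fmt"
// worker simulates a sync manager that handles one queued item at a time and
// signals completion on done, mirroring QueueBlock plus <-sp.blockProcessed.
func worker(items <-chan string, done chan<- struct{}) {
for item := range items {
fmt.Println("fully processed", item)
done <- struct{}{}
}
}
func main() {
items := make(chan string)
done := make(chan struct{}, 1)
go worker(items, done)
for _, b := range []string{"block-1", "block-2"} {
items <- b
// Block further "receives" until the previous item is known good or bad.
<-done
}
close(items)
}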


@ -0,0 +1,44 @@
package p2p
import (
"github.com/daglabs/btcd/peer"
"github.com/daglabs/btcd/wire"
)
// OnBlockLocator is invoked when a peer receives a locator bitcoin
// message.
func (sp *Peer) OnBlockLocator(_ *peer.Peer, msg *wire.MsgBlockLocator) {
// Find the highest known shared block between the peers, and ask
// the peer for that block and its future. If the block is not
// found, create a lower-resolution block locator and send it to
// the peer in order to find it in the next iteration.
dag := sp.server.DAG
if len(msg.BlockLocatorHashes) == 0 {
peerLog.Warnf("Got empty block locator from peer %s",
sp)
return
}
// If the first hash of the block locator is known, it means we found
// the highest shared block.
firstHash := msg.BlockLocatorHashes[0]
if dag.BlockExists(firstHash) {
if dag.IsKnownFinalizedBlock(firstHash) {
peerLog.Warnf("Cannot sync with peer %s because the highest"+
" shared chain block (%s) is below the finality point", sp, firstHash)
sp.Disconnect()
return
}
err := sp.server.SyncManager.PushGetBlockInvsOrHeaders(sp.Peer, firstHash)
if err != nil {
peerLog.Errorf("Failed pushing get blocks message for peer %s: %s",
sp, err)
return
}
return
}
startHash, stopHash := dag.FindNextLocatorBoundaries(msg.BlockLocatorHashes)
if startHash == nil {
panic("Couldn't find any unknown hashes in the block locator.")
}
sp.PushGetBlockLocatorMsg(startHash, stopHash)
}


@ -0,0 +1,24 @@
package p2p
import (
"github.com/daglabs/btcd/peer"
"github.com/daglabs/btcd/util"
"github.com/daglabs/btcd/wire"
"sync/atomic"
)
// OnFeeFilter is invoked when a peer receives a feefilter bitcoin message and
// is used by remote peers to request that no transactions which have a fee rate
// lower than provided value are inventoried to them. The peer will be
// disconnected if an invalid fee filter value is provided.
func (sp *Peer) OnFeeFilter(_ *peer.Peer, msg *wire.MsgFeeFilter) {
// Check that the passed minimum fee is a valid amount.
if msg.MinFee < 0 || msg.MinFee > util.MaxSatoshi {
peerLog.Debugf("Peer %s sent an invalid feefilter '%s' -- "+
"disconnecting", sp, util.Amount(msg.MinFee))
sp.Disconnect()
return
}
atomic.StoreInt64(&sp.FeeFilterInt, msg.MinFee)
}
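
The threshold is written with an atomic store because relay code reads it from another goroutine. A minimal sketch of that store/load pairing with illustrative names; the actual reader lives elsewhere in the server and is not shown in this diff.
package main
import (
"fmt"
"sync/atomic"
)
// feeFilter is an illustrative stand-in for the peer's fee filter field:
// the message handler writes it with StoreInt64 while relay code reads it
// concurrently with LoadInt64.
type feeFilter struct{ minFee int64 }
func (f *feeFilter) set(minFee int64) { atomic.StoreInt64(&f.minFee, minFee) }
func (f *feeFilter) get() int64       { return atomic.LoadInt64(&f.minFee) }
func main() {
var f feeFilter
f.set(1000)
fmt.Println("relay a tx to this peer only if its fee rate >=", f.get())
}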


@ -0,0 +1,27 @@
package p2p
import (
"github.com/daglabs/btcd/peer"
"github.com/daglabs/btcd/wire"
)
// OnFilterAdd is invoked when a peer receives a filteradd bitcoin
// message and is used by remote peers to add data to an already loaded bloom
// filter. The peer will be disconnected if a filter is not loaded when this
// message is received or the server is not configured to allow bloom filters.
func (sp *Peer) OnFilterAdd(_ *peer.Peer, msg *wire.MsgFilterAdd) {
// Disconnect and/or ban depending on the node bloom services flag and
// negotiated protocol version.
if !sp.enforceNodeBloomFlag(msg.Command()) {
return
}
if !sp.filter.IsLoaded() {
peerLog.Debugf("%s sent a filteradd request with no filter "+
"loaded -- disconnecting", sp)
sp.Disconnect()
return
}
sp.filter.Add(msg.Data)
}


@ -0,0 +1,27 @@
package p2p
import (
"github.com/daglabs/btcd/peer"
"github.com/daglabs/btcd/wire"
)
// OnFilterClear is invoked when a peer receives a filterclear bitcoin
// message and is used by remote peers to clear an already loaded bloom filter.
// The peer will be disconnected if a filter is not loaded when this message is
// received or the server is not configured to allow bloom filters.
func (sp *Peer) OnFilterClear(_ *peer.Peer, msg *wire.MsgFilterClear) {
// Disconnect and/or ban depending on the node bloom services flag and
// negotiated protocol version.
if !sp.enforceNodeBloomFlag(msg.Command()) {
return
}
if !sp.filter.IsLoaded() {
peerLog.Debugf("%s sent a filterclear request with no "+
"filter loaded -- disconnecting", sp)
sp.Disconnect()
return
}
sp.filter.Unload()
}


@ -0,0 +1,23 @@
package p2p
import (
"github.com/daglabs/btcd/peer"
"github.com/daglabs/btcd/wire"
)
// OnFilterLoad is invoked when a peer receives a filterload bitcoin
// message and is used to load a bloom filter that should be used for
// delivering merkle blocks and associated transactions that match the filter.
// The peer will be disconnected if the server is not configured to allow bloom
// filters.
func (sp *Peer) OnFilterLoad(_ *peer.Peer, msg *wire.MsgFilterLoad) {
// Disconnect and/or ban depending on the node bloom services flag and
// negotiated protocol version.
if !sp.enforceNodeBloomFlag(msg.Command()) {
return
}
sp.setDisableRelayTx(false)
sp.filter.Reload(msg)
}

server/p2p/on_get_addr.go Normal file

@ -0,0 +1,43 @@
package p2p
import (
"github.com/daglabs/btcd/config"
"github.com/daglabs/btcd/peer"
"github.com/daglabs/btcd/wire"
)
// OnGetAddr is invoked when a peer receives a getaddr bitcoin message
// and is used to provide the peer with known addresses from the address
// manager.
func (sp *Peer) OnGetAddr(_ *peer.Peer, msg *wire.MsgGetAddr) {
// Don't return any addresses when running on the simulation test
// network. This helps prevent the network from becoming another
// public test network since it will not be able to learn about other
// peers that have not specifically been provided.
if config.MainConfig().SimNet {
return
}
// Do not accept getaddr requests from outbound peers. This reduces
// fingerprinting attacks.
if !sp.Inbound() {
peerLog.Debugf("Ignoring getaddr request from outbound peer ",
"%s", sp)
return
}
// Only allow one getaddr request per connection to discourage
// address stamping of inv announcements.
if sp.sentAddrs {
peerLog.Debugf("Ignoring repeated getaddr request from peer ",
"%s", sp)
return
}
sp.sentAddrs = true
// Get the current known addresses from the address manager.
addrCache := sp.server.addrManager.AddressCache(msg.IncludeAllSubnetworks, msg.SubnetworkID)
// Push the addresses.
sp.pushAddrMsg(addrCache, sp.SubnetworkID())
}


@ -0,0 +1,37 @@
package p2p
import (
"github.com/daglabs/btcd/peer"
"github.com/daglabs/btcd/wire"
)
// OnGetBlockInvs is invoked when a peer receives a getblockinvs bitcoin
// message.
// It finds the blue future between msg.StartHash and msg.StopHash
// and sends the invs to the requesting peer.
func (sp *Peer) OnGetBlockInvs(_ *peer.Peer, msg *wire.MsgGetBlockInvs) {
dag := sp.server.DAG
// We want to prevent a situation where the syncing peer needs
// to call getblocks once again, but the blocks we sent it
// won't affect its selected chain, so the next time it tries
// to find the highest shared chain block, it'll find the
// same one as before.
// To prevent that we use blockdag.FinalityInterval as maxHashes.
// This way, if one getblocks is not enough to get the peer
// synced, we know for sure that its selected chain will
// change, so we'll have a higher shared chain block.
hashList := dag.GetBlueBlocksHashesBetween(msg.StartHash, msg.StopHash,
wire.MaxInvPerMsg)
// Generate inventory message.
invMsg := wire.NewMsgInv()
for i := range hashList {
iv := wire.NewInvVect(wire.InvTypeSyncBlock, hashList[i])
invMsg.AddInvVect(iv)
}
// Send the inventory message if there is anything to send.
if len(invMsg.InvList) > 0 {
sp.QueueMessage(invMsg, nil)
}
}


@ -0,0 +1,25 @@
package p2p
import (
"github.com/daglabs/btcd/peer"
"github.com/daglabs/btcd/wire"
)
// OnGetBlockLocator is invoked when a peer receives a getlocator bitcoin
// message.
func (sp *Peer) OnGetBlockLocator(_ *peer.Peer, msg *wire.MsgGetBlockLocator) {
locator := sp.server.DAG.BlockLocatorFromHashes(msg.StartHash, msg.StopHash)
if len(locator) == 0 {
peerLog.Infof("Couldn't build a block locator between blocks %s and %s"+
" that was requested from peer %s",
sp)
return
}
err := sp.PushBlockLocatorMsg(locator)
if err != nil {
peerLog.Errorf("Failed to send block locator message to peer %s: %s",
sp, err)
return
}
}


@ -0,0 +1,37 @@
package p2p
import (
"github.com/daglabs/btcd/peer"
"github.com/daglabs/btcd/wire"
)
// OnGetCFilters is invoked when a peer receives a getcfilters bitcoin message.
func (sp *Peer) OnGetCFilters(_ *peer.Peer, msg *wire.MsgGetCFilters) {
// Ignore getcfilters requests if not in sync.
if !sp.server.SyncManager.IsCurrent() {
return
}
hashes, err := sp.server.DAG.HeightToHashRange(msg.StartHeight,
msg.StopHash, wire.MaxGetCFiltersReqRange)
if err != nil {
peerLog.Debugf("Invalid getcfilters request: %s", err)
return
}
filters, err := sp.server.CfIndex.FiltersByBlockHashes(hashes,
msg.FilterType)
if err != nil {
peerLog.Errorf("Error retrieving cfilters: %s", err)
return
}
for i, filterBytes := range filters {
if len(filterBytes) == 0 {
peerLog.Warnf("Could not obtain cfilter for %s", hashes[i])
return
}
filterMsg := wire.NewMsgCFilter(msg.FilterType, hashes[i], filterBytes)
sp.QueueMessage(filterMsg, nil)
}
}


@ -0,0 +1,106 @@
package p2p
import (
"github.com/daglabs/btcd/peer"
"github.com/daglabs/btcd/util/daghash"
"github.com/daglabs/btcd/wire"
)
// OnGetCFCheckpt is invoked when a peer receives a getcfcheckpt bitcoin message.
func (sp *Peer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
// Ignore getcfcheckpt requests if not in sync.
if !sp.server.SyncManager.IsCurrent() {
return
}
blockHashes, err := sp.server.DAG.IntervalBlockHashes(msg.StopHash,
wire.CFCheckptInterval)
if err != nil {
peerLog.Debugf("Invalid getcfilters request: %s", err)
return
}
var updateCache bool
var checkptCache []cfHeaderKV
if len(blockHashes) > len(checkptCache) {
// Update the cache if the checkpoint chain is longer than the cached
// one. This ensures that the cache is relatively stable and mostly
// overlaps with the best chain, since it follows the longest chain
// heuristic.
updateCache = true
// Take write lock because we are going to update cache.
sp.server.cfCheckptCachesMtx.Lock()
defer sp.server.cfCheckptCachesMtx.Unlock()
// Grow the checkptCache to be the length of blockHashes.
additionalLength := len(blockHashes) - len(checkptCache)
checkptCache = append(sp.server.cfCheckptCaches[msg.FilterType],
make([]cfHeaderKV, additionalLength)...)
} else {
updateCache = false
// Take reader lock because we are not going to update cache.
sp.server.cfCheckptCachesMtx.RLock()
defer sp.server.cfCheckptCachesMtx.RUnlock()
checkptCache = sp.server.cfCheckptCaches[msg.FilterType]
}
// Iterate backwards until the block hash is found in the cache.
var forkIdx int
for forkIdx = len(checkptCache); forkIdx > 0; forkIdx-- {
if checkptCache[forkIdx-1].blockHash.IsEqual(blockHashes[forkIdx-1]) {
break
}
}
// Populate results with cached checkpoints.
checkptMsg := wire.NewMsgCFCheckpt(msg.FilterType, msg.StopHash,
len(blockHashes))
for i := 0; i < forkIdx; i++ {
checkptMsg.AddCFHeader(checkptCache[i].filterHeader)
}
// Look up any filter headers that aren't cached.
blockHashPtrs := make([]*daghash.Hash, 0, len(blockHashes)-forkIdx)
for i := forkIdx; i < len(blockHashes); i++ {
blockHashPtrs = append(blockHashPtrs, blockHashes[i])
}
filterHeaders, err := sp.server.CfIndex.FilterHeadersByBlockHashes(blockHashPtrs,
msg.FilterType)
if err != nil {
peerLog.Errorf("Error retrieving cfilter headers: %s", err)
return
}
for i, filterHeaderBytes := range filterHeaders {
if len(filterHeaderBytes) == 0 {
peerLog.Warnf("Could not obtain CF header for %s", blockHashPtrs[i])
return
}
filterHeader, err := daghash.NewHash(filterHeaderBytes)
if err != nil {
peerLog.Warnf("Committed filter header deserialize "+
"failed: %s", err)
return
}
checkptMsg.AddCFHeader(filterHeader)
if updateCache {
checkptCache[forkIdx+i] = cfHeaderKV{
blockHash: blockHashes[forkIdx+i],
filterHeader: filterHeader,
}
}
}
if updateCache {
sp.server.cfCheckptCaches[msg.FilterType] = checkptCache
}
sp.QueueMessage(checkptMsg, nil)
}


@ -0,0 +1,102 @@
package p2p
import (
"github.com/daglabs/btcd/peer"
"github.com/daglabs/btcd/util/daghash"
"github.com/daglabs/btcd/wire"
)
// OnGetCFHeaders is invoked when a peer receives a getcfheaders bitcoin message.
func (sp *Peer) OnGetCFHeaders(_ *peer.Peer, msg *wire.MsgGetCFHeaders) {
// Ignore getcfheaders requests if not in sync.
if !sp.server.SyncManager.IsCurrent() {
return
}
startHeight := msg.StartHeight
maxResults := wire.MaxCFHeadersPerMsg
// If StartHeight is positive, fetch the predecessor block hash so we can
// populate the PrevFilterHeader field.
if msg.StartHeight > 0 {
startHeight--
maxResults++
}
// Fetch the hashes from the block index.
hashList, err := sp.server.DAG.HeightToHashRange(startHeight,
msg.StopHash, maxResults)
if err != nil {
peerLog.Debugf("Invalid getcfheaders request: %s", err)
}
// This is possible if StartHeight is one greater than the height of
// StopHash, and we pull a valid range of hashes including the previous
// filter header.
if len(hashList) == 0 || (msg.StartHeight > 0 && len(hashList) == 1) {
peerLog.Debug("No results for getcfheaders request")
return
}
// Fetch the raw filter hash bytes from the database for all blocks.
filterHashes, err := sp.server.CfIndex.FilterHashesByBlockHashes(hashList,
msg.FilterType)
if err != nil {
peerLog.Errorf("Error retrieving cfilter hashes: %s", err)
return
}
// Generate cfheaders message and send it.
headersMsg := wire.NewMsgCFHeaders()
// Populate the PrevFilterHeader field.
if msg.StartHeight > 0 {
parentHash := hashList[0]
// Fetch the raw committed filter header bytes from the
// database.
headerBytes, err := sp.server.CfIndex.FilterHeaderByBlockHash(
parentHash, msg.FilterType)
if err != nil {
peerLog.Errorf("Error retrieving CF header: %s", err)
return
}
if len(headerBytes) == 0 {
peerLog.Warnf("Could not obtain CF header for %s", parentHash)
return
}
// Deserialize the hash into PrevFilterHeader.
err = headersMsg.PrevFilterHeader.SetBytes(headerBytes)
if err != nil {
peerLog.Warnf("Committed filter header deserialize "+
"failed: %s", err)
return
}
hashList = hashList[1:]
filterHashes = filterHashes[1:]
}
// Populate HeaderHashes.
for i, hashBytes := range filterHashes {
if len(hashBytes) == 0 {
peerLog.Warnf("Could not obtain CF hash for %s", hashList[i])
return
}
// Deserialize the hash.
filterHash, err := daghash.NewHash(hashBytes)
if err != nil {
peerLog.Warnf("Committed filter hash deserialize "+
"failed: %s", err)
return
}
headersMsg.AddCFHash(filterHash)
}
headersMsg.FilterType = msg.FilterType
headersMsg.StopHash = msg.StopHash
sp.QueueMessage(headersMsg, nil)
}
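
The StartHeight adjustment above widens the requested range by one block so the predecessor's filter header can seed PrevFilterHeader before being stripped from the results. A tiny worked sketch of that off-by-one bookkeeping; the function name and the numbers are illustrative, not taken from this diff.
package main
import "fmt"
// adjustCFHeadersRange mirrors the bookkeeping in OnGetCFHeaders: when
// startHeight > 0 the fetched range is extended one block back so the extra
// leading hash can seed PrevFilterHeader and then be dropped from the payload.
func adjustCFHeadersRange(startHeight, maxResults int32) (int32, int32) {
if startHeight > 0 {
return startHeight - 1, maxResults + 1
}
return startHeight, maxResults
}
func main() {
start, max := adjustCFHeadersRange(5, 2000)
fmt.Printf("fetch hashes starting at height %d, up to %d of them\n", start, max)
// The first hash (height 4) only seeds PrevFilterHeader; heights 5 and up
// become the HeaderHashes payload.
}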

server/p2p/on_get_data.go Normal file

@ -0,0 +1,83 @@
package p2p
import (
"github.com/daglabs/btcd/peer"
"github.com/daglabs/btcd/util/daghash"
"github.com/daglabs/btcd/wire"
)
// OnGetData is invoked when a peer receives a getdata bitcoin message and
// is used to deliver block and transaction information.
func (sp *Peer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) {
numAdded := 0
notFound := wire.NewMsgNotFound()
length := len(msg.InvList)
// A decaying ban score increase is applied to prevent exhausting resources
// with unusually large inventory queries.
// Requesting more than the maximum inventory vector length within a short
// period of time yields a score above the default ban threshold. Sustained
// bursts of small requests are not penalized as that would potentially ban
// peers performing IBD.
// This incremental score decays each minute to half of its value.
sp.addBanScore(0, uint32(length)*99/wire.MaxInvPerMsg, "getdata")
// We wait on this wait channel periodically to prevent queuing
// far more data than we can send in a reasonable time, wasting memory.
// The waiting occurs after the database fetch for the next one to
// provide a little pipelining.
var waitChan chan struct{}
doneChan := make(chan struct{}, 1)
for i, iv := range msg.InvList {
var c chan struct{}
// If this will be the last message we send.
if i == length-1 && len(notFound.InvList) == 0 {
c = doneChan
} else if (i+1)%3 == 0 {
// Buffered so as to not make the send goroutine block.
c = make(chan struct{}, 1)
}
var err error
switch iv.Type {
case wire.InvTypeTx:
err = sp.server.pushTxMsg(sp, (*daghash.TxID)(iv.Hash), c, waitChan)
case wire.InvTypeSyncBlock:
fallthrough
case wire.InvTypeBlock:
err = sp.server.pushBlockMsg(sp, iv.Hash, c, waitChan)
case wire.InvTypeFilteredBlock:
err = sp.server.pushMerkleBlockMsg(sp, iv.Hash, c, waitChan)
default:
peerLog.Warnf("Unknown type in inventory request %d",
iv.Type)
continue
}
if err != nil {
notFound.AddInvVect(iv)
// When there is a failure fetching the final entry
// and the done channel was sent in due to there
// being no outstanding not found inventory, consume
// it here because there is now not found inventory
// that will use the channel momentarily.
if i == len(msg.InvList)-1 && c != nil {
<-c
}
}
numAdded++
waitChan = c
}
if len(notFound.InvList) != 0 {
sp.QueueMessage(notFound, doneChan)
}
// Wait for messages to be sent. We can send quite a lot of data at this
// point and this will keep the peer busy for a decent amount of time.
// We don't process anything else from them in this time so that we
// have an idea of when we should hear back from them - else the idle
// timeout could fire when we were only half done sending the blocks.
if numAdded > 0 {
<-doneChan
}
}
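
The transient ban score above scales linearly with the request size, so only near-maximum getdata requests are expensive. A small worked example of that arithmetic, assuming wire.MaxInvPerMsg is 50000 as in upstream btcd and a default ban threshold of 100; both values are assumptions, not taken from this diff.
package main
import "fmt"
// maxInvPerMsg is assumed to be 50000 (upstream btcd's wire.MaxInvPerMsg);
// the exact constant in this fork is not shown in the diff.
const maxInvPerMsg = 50000
// transientBanScore reproduces the formula used in OnGetData.
func transientBanScore(invLen uint32) uint32 {
return invLen * 99 / maxInvPerMsg
}
func main() {
for _, n := range []uint32{10, 500, 25000, 50000} {
fmt.Printf("getdata with %5d entries -> +%2d transient score\n", n, transientBanScore(n))
}
// Only a request near the maximum (50000 -> +99) approaches a ban
// threshold of 100 in a single message; small bursts stay cheap.
}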


@ -0,0 +1,33 @@
package p2p
import (
"github.com/daglabs/btcd/peer"
"github.com/daglabs/btcd/wire"
)
// OnGetHeaders is invoked when a peer receives a getheaders bitcoin
// message.
func (sp *Peer) OnGetHeaders(_ *peer.Peer, msg *wire.MsgGetHeaders) {
// Ignore getheaders requests if not in sync.
if !sp.server.SyncManager.IsCurrent() {
return
}
// Find the most recent known block in the best chain based on the block
// locator and fetch all of the headers after it until either
// wire.MaxBlockHeadersPerMsg have been fetched or the provided stop
// hash is encountered.
//
// Use the block after the genesis block if no other blocks in the
// provided locator are known. This does mean the client will start
// over with the genesis block if unknown block locators are provided.
dag := sp.server.DAG
headers := dag.GetBlueBlocksHeadersBetween(msg.StartHash, msg.StopHash)
// Send found headers to the requesting peer.
blockHeaders := make([]*wire.BlockHeader, len(headers))
for i := range headers {
blockHeaders[i] = headers[i]
}
sp.QueueMessage(&wire.MsgHeaders{Headers: blockHeaders}, nil)
}

server/p2p/on_headers.go Normal file

@ -0,0 +1,12 @@
package p2p
import (
"github.com/daglabs/btcd/peer"
"github.com/daglabs/btcd/wire"
)
// OnHeaders is invoked when a peer receives a headers bitcoin
// message. The message is passed down to the sync manager.
func (sp *Peer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) {
sp.server.SyncManager.QueueHeaders(msg, sp.Peer)
}

server/p2p/on_inv.go Normal file

@ -0,0 +1,41 @@
package p2p
import (
"github.com/daglabs/btcd/config"
"github.com/daglabs/btcd/peer"
"github.com/daglabs/btcd/wire"
)
// OnInv is invoked when a peer receives an inv bitcoin message and is
// used to examine the inventory being advertised by the remote peer and react
// accordingly. We pass the message down to blockmanager which will call
// QueueMessage with any appropriate responses.
func (sp *Peer) OnInv(_ *peer.Peer, msg *wire.MsgInv) {
if !config.MainConfig().BlocksOnly {
if len(msg.InvList) > 0 {
sp.server.SyncManager.QueueInv(msg, sp.Peer)
}
return
}
newInv := wire.NewMsgInvSizeHint(uint(len(msg.InvList)))
for _, invVect := range msg.InvList {
if invVect.Type == wire.InvTypeTx {
peerLog.Tracef("Ignoring tx %s in inv from %s -- "+
"blocksonly enabled", invVect.Hash, sp)
peerLog.Infof("Peer %s is announcing "+
"transactions -- disconnecting", sp)
sp.Disconnect()
return
}
err := newInv.AddInvVect(invVect)
if err != nil {
peerLog.Errorf("Failed to add inventory vector: %s", err)
break
}
}
if len(newInv.InvList) > 0 {
sp.server.SyncManager.QueueInv(newInv, sp.Peer)
}
}

server/p2p/on_mempool.go Normal file

@ -0,0 +1,55 @@
package p2p
import (
"github.com/daglabs/btcd/peer"
"github.com/daglabs/btcd/util/daghash"
"github.com/daglabs/btcd/wire"
)
// OnMemPool is invoked when a peer receives a mempool bitcoin message.
// It creates and sends an inventory message with the contents of the memory
// pool up to the maximum inventory allowed per message. When the peer has a
// bloom filter loaded, the contents are filtered accordingly.
func (sp *Peer) OnMemPool(_ *peer.Peer, msg *wire.MsgMemPool) {
// Only allow mempool requests if the server has bloom filtering
// enabled.
if sp.server.services&wire.SFNodeBloom != wire.SFNodeBloom {
peerLog.Debugf("peer %s sent mempool request with bloom "+
"filtering disabled -- disconnecting", sp)
sp.Disconnect()
return
}
// A decaying ban score increase is applied to prevent flooding.
// The ban score accumulates and passes the ban threshold if a burst of
// mempool messages comes from a peer. The score decays each minute to
// half of its value.
sp.addBanScore(0, 33, "mempool")
// Generate inventory message with the available transactions in the
// transaction memory pool. Limit it to the max allowed inventory
// per message. The NewMsgInvSizeHint function automatically limits
// the passed hint to the maximum allowed, so it's safe to pass it
// without double checking it here.
txMemPool := sp.server.TxMemPool
txDescs := txMemPool.TxDescs()
invMsg := wire.NewMsgInvSizeHint(uint(len(txDescs)))
for _, txDesc := range txDescs {
// Either add all transactions when there is no bloom filter,
// or only the transactions that match the filter when there is
// one.
if !sp.filter.IsLoaded() || sp.filter.MatchTxAndUpdate(txDesc.Tx) {
iv := wire.NewInvVect(wire.InvTypeTx, (*daghash.Hash)(txDesc.Tx.ID()))
invMsg.AddInvVect(iv)
if len(invMsg.InvList)+1 > wire.MaxInvPerMsg {
break
}
}
}
// Send the inventory message if there is anything to send.
if len(invMsg.InvList) > 0 {
sp.QueueMessage(invMsg, nil)
}
}

server/p2p/on_tx.go Normal file

@ -0,0 +1,36 @@
package p2p
import (
"github.com/daglabs/btcd/config"
"github.com/daglabs/btcd/peer"
"github.com/daglabs/btcd/util"
"github.com/daglabs/btcd/util/daghash"
"github.com/daglabs/btcd/wire"
)
// OnTx is invoked when a peer receives a tx bitcoin message. It blocks
// until the bitcoin transaction has been fully processed. Unlike the block
// handler, this does not serialize all transactions through a single thread;
// transactions don't rely on the previous one in a linear fashion like blocks.
func (sp *Peer) OnTx(_ *peer.Peer, msg *wire.MsgTx) {
if config.MainConfig().BlocksOnly {
peerLog.Tracef("Ignoring tx %s from %s - blocksonly enabled",
msg.TxID(), sp)
return
}
// Add the transaction to the known inventory for the peer.
// Convert the raw MsgTx to a util.Tx which provides some convenience
// methods and things such as hash caching.
tx := util.NewTx(msg)
iv := wire.NewInvVect(wire.InvTypeTx, (*daghash.Hash)(tx.ID()))
sp.AddKnownInventory(iv)
// Queue the transaction up to be handled by the sync manager and
// intentionally block further receives until the transaction is fully
// processed and known good or bad. This helps prevent a malicious peer
// from queuing up a bunch of bad transactions before disconnecting (or
// being disconnected) and wasting memory.
sp.server.SyncManager.QueueTx(tx, sp.Peer, sp.txProcessed)
<-sp.txProcessed
}

server/p2p/on_version.go Normal file

@ -0,0 +1,64 @@
package p2p
import (
"github.com/daglabs/btcd/addrmgr"
"github.com/daglabs/btcd/config"
"github.com/daglabs/btcd/peer"
"github.com/daglabs/btcd/wire"
)
// OnVersion is invoked when a peer receives a version bitcoin message
// and is used to negotiate the protocol version details as well as kick start
// the communications.
func (sp *Peer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) {
// Add the remote peer time as a sample for creating an offset against
// the local clock to keep the network time in sync.
sp.server.TimeSource.AddTimeSample(sp.Addr(), msg.Timestamp)
// Signal the sync manager this peer is a new sync candidate.
sp.server.SyncManager.NewPeer(sp.Peer)
// Choose whether or not to relay transactions before a filter command
// is received.
sp.setDisableRelayTx(msg.DisableRelayTx)
// Update the address manager and request known addresses from the
// remote peer for outbound connections. This is skipped when running
// on the simulation test network since it is only intended to connect
// to specified peers and actively avoids advertising and connecting to
// discovered peers.
if !config.MainConfig().SimNet {
addrManager := sp.server.addrManager
// Outbound connections.
if !sp.Inbound() {
// TODO(davec): Only do this if not doing the initial block
// download and the local address is routable.
if !config.MainConfig().DisableListen /* && isCurrent? */ {
// Get address that best matches.
lna := addrManager.GetBestLocalAddress(sp.NA())
if addrmgr.IsRoutable(lna) {
// Filter addresses the peer already knows about.
addresses := []*wire.NetAddress{lna}
sp.pushAddrMsg(addresses, sp.SubnetworkID())
}
}
// Request known addresses if the server address manager needs
// more.
if addrManager.NeedMoreAddresses() {
sp.QueueMessage(wire.NewMsgGetAddr(false, sp.SubnetworkID()), nil)
if sp.SubnetworkID() != nil {
sp.QueueMessage(wire.NewMsgGetAddr(false, nil), nil)
}
}
// Mark the address as a known good address.
addrManager.Good(sp.NA(), msg.SubnetworkID)
}
}
// Add valid peer to the server.
sp.server.AddPeer(sp)
}
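
The files above only define the handler bodies; they are registered with the peer layer elsewhere in the package, outside this diff. A self-contained mock of that registration pattern, using stand-in types rather than the real peer and wire packages:
package main
import "fmt"
// Peer, MsgAddr, and MessageListeners below are illustrative stand-ins that
// only show the listener-registration pattern these OnXXX methods plug into.
type Peer struct{ addr string }
type MsgAddr struct{ AddrList []string }
type MessageListeners struct {
OnAddr func(p *Peer, msg *MsgAddr)
}
// dispatch routes an incoming message to the registered handler, the way a
// peer implementation invokes the server's OnXXX callbacks.
func dispatch(l MessageListeners, p *Peer, msg interface{}) {
switch m := msg.(type) {
case *MsgAddr:
if l.OnAddr != nil {
l.OnAddr(p, m)
}
}
}
func main() {
listeners := MessageListeners{
OnAddr: func(p *Peer, msg *MsgAddr) {
fmt.Printf("peer %s advertised %d addresses\n", p.addr, len(msg.AddrList))
},
}
dispatch(listeners, &Peer{addr: "203.0.113.7:16111"}, &MsgAddr{AddrList: []string{"a", "b"}})
}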


@ -410,619 +410,6 @@ func (sp *Peer) addBanScore(persistent, transient uint32, reason string) {
}
}
// OnVersion is invoked when a peer receives a version bitcoin message
// and is used to negotiate the protocol version details as well as kick start
// the communications.
func (sp *Peer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) {
// Add the remote peer time as a sample for creating an offset against
// the local clock to keep the network time in sync.
sp.server.TimeSource.AddTimeSample(sp.Addr(), msg.Timestamp)
// Signal the sync manager this peer is a new sync candidate.
sp.server.SyncManager.NewPeer(sp.Peer)
// Choose whether or not to relay transactions before a filter command
// is received.
sp.setDisableRelayTx(msg.DisableRelayTx)
// Update the address manager and request known addresses from the
// remote peer for outbound connections. This is skipped when running
// on the simulation test network since it is only intended to connect
// to specified peers and actively avoids advertising and connecting to
// discovered peers.
if !config.MainConfig().SimNet {
addrManager := sp.server.addrManager
// Outbound connections.
if !sp.Inbound() {
// TODO(davec): Only do this if not doing the initial block
// download and the local address is routable.
if !config.MainConfig().DisableListen /* && isCurrent? */ {
// Get address that best matches.
lna := addrManager.GetBestLocalAddress(sp.NA())
if addrmgr.IsRoutable(lna) {
// Filter addresses the peer already knows about.
addresses := []*wire.NetAddress{lna}
sp.pushAddrMsg(addresses, sp.SubnetworkID())
}
}
// Request known addresses if the server address manager needs
// more.
if addrManager.NeedMoreAddresses() {
sp.QueueMessage(wire.NewMsgGetAddr(false, sp.SubnetworkID()), nil)
if sp.SubnetworkID() != nil {
sp.QueueMessage(wire.NewMsgGetAddr(false, nil), nil)
}
}
// Mark the address as a known good address.
addrManager.Good(sp.NA(), msg.SubnetworkID)
}
}
// Add valid peer to the server.
sp.server.AddPeer(sp)
}
// OnMemPool is invoked when a peer receives a mempool bitcoin message.
// It creates and sends an inventory message with the contents of the memory
// pool up to the maximum inventory allowed per message. When the peer has a
// bloom filter loaded, the contents are filtered accordingly.
func (sp *Peer) OnMemPool(_ *peer.Peer, msg *wire.MsgMemPool) {
// Only allow mempool requests if the server has bloom filtering
// enabled.
if sp.server.services&wire.SFNodeBloom != wire.SFNodeBloom {
peerLog.Debugf("peer %s sent mempool request with bloom "+
"filtering disabled -- disconnecting", sp)
sp.Disconnect()
return
}
// A decaying ban score increase is applied to prevent flooding.
// The ban score accumulates and passes the ban threshold if a burst of
// mempool messages comes from a peer. The score decays each minute to
// half of its value.
sp.addBanScore(0, 33, "mempool")
// Generate inventory message with the available transactions in the
// transaction memory pool. Limit it to the max allowed inventory
// per message. The NewMsgInvSizeHint function automatically limits
// the passed hint to the maximum allowed, so it's safe to pass it
// without double checking it here.
txMemPool := sp.server.TxMemPool
txDescs := txMemPool.TxDescs()
invMsg := wire.NewMsgInvSizeHint(uint(len(txDescs)))
for _, txDesc := range txDescs {
// Either add all transactions when there is no bloom filter,
// or only the transactions that match the filter when there is
// one.
if !sp.filter.IsLoaded() || sp.filter.MatchTxAndUpdate(txDesc.Tx) {
iv := wire.NewInvVect(wire.InvTypeTx, (*daghash.Hash)(txDesc.Tx.ID()))
invMsg.AddInvVect(iv)
if len(invMsg.InvList)+1 > wire.MaxInvPerMsg {
break
}
}
}
// Send the inventory message if there is anything to send.
if len(invMsg.InvList) > 0 {
sp.QueueMessage(invMsg, nil)
}
}
// OnTx is invoked when a peer receives a tx bitcoin message. It blocks
// until the bitcoin transaction has been fully processed. Unlike the block
// handler, this does not serialize all transactions through a single thread;
// transactions don't rely on the previous one in a linear fashion like blocks.
func (sp *Peer) OnTx(_ *peer.Peer, msg *wire.MsgTx) {
if config.MainConfig().BlocksOnly {
peerLog.Tracef("Ignoring tx %s from %s - blocksonly enabled",
msg.TxID(), sp)
return
}
// Add the transaction to the known inventory for the peer.
// Convert the raw MsgTx to a util.Tx which provides some convenience
// methods and things such as hash caching.
tx := util.NewTx(msg)
iv := wire.NewInvVect(wire.InvTypeTx, (*daghash.Hash)(tx.ID()))
sp.AddKnownInventory(iv)
// Queue the transaction up to be handled by the sync manager and
// intentionally block further receives until the transaction is fully
// processed and known good or bad. This helps prevent a malicious peer
// from queuing up a bunch of bad transactions before disconnecting (or
// being disconnected) and wasting memory.
sp.server.SyncManager.QueueTx(tx, sp.Peer, sp.txProcessed)
<-sp.txProcessed
}
// OnBlock is invoked when a peer receives a block bitcoin message. It
// blocks until the bitcoin block has been fully processed.
func (sp *Peer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) {
// Convert the raw MsgBlock to a util.Block which provides some
// convenience methods and things such as hash caching.
block := util.NewBlockFromBlockAndBytes(msg, buf)
// Add the block to the known inventory for the peer.
iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
sp.AddKnownInventory(iv)
// Queue the block up to be handled by the block
// manager and intentionally block further receives
// until the bitcoin block is fully processed and known
// good or bad. This helps prevent a malicious peer
// from queuing up a bunch of bad blocks before
// disconnecting (or being disconnected) and wasting
// memory. Additionally, this behavior is depended on
// by at least the block acceptance test tool as the
// reference implementation processes blocks in the same
// thread and therefore blocks further messages until
// the bitcoin block has been fully processed.
sp.server.SyncManager.QueueBlock(block, sp.Peer, false, sp.blockProcessed)
<-sp.blockProcessed
}
// OnInv is invoked when a peer receives an inv bitcoin message and is
// used to examine the inventory being advertised by the remote peer and react
// accordingly. We pass the message down to blockmanager which will call
// QueueMessage with any appropriate responses.
func (sp *Peer) OnInv(_ *peer.Peer, msg *wire.MsgInv) {
if !config.MainConfig().BlocksOnly {
if len(msg.InvList) > 0 {
sp.server.SyncManager.QueueInv(msg, sp.Peer)
}
return
}
newInv := wire.NewMsgInvSizeHint(uint(len(msg.InvList)))
for _, invVect := range msg.InvList {
if invVect.Type == wire.InvTypeTx {
peerLog.Tracef("Ignoring tx %s in inv from %s -- "+
"blocksonly enabled", invVect.Hash, sp)
peerLog.Infof("Peer %s is announcing "+
"transactions -- disconnecting", sp)
sp.Disconnect()
return
}
err := newInv.AddInvVect(invVect)
if err != nil {
peerLog.Errorf("Failed to add inventory vector: %s", err)
break
}
}
if len(newInv.InvList) > 0 {
sp.server.SyncManager.QueueInv(newInv, sp.Peer)
}
}
// OnHeaders is invoked when a peer receives a headers bitcoin
// message. The message is passed down to the sync manager.
func (sp *Peer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) {
sp.server.SyncManager.QueueHeaders(msg, sp.Peer)
}
// OnGetData is invoked when a peer receives a getdata bitcoin message and
// is used to deliver block and transaction information.
func (sp *Peer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) {
numAdded := 0
notFound := wire.NewMsgNotFound()
length := len(msg.InvList)
// A decaying ban score increase is applied to prevent exhausting resources
// with unusually large inventory queries.
// Requesting more than the maximum inventory vector length within a short
// period of time yields a score above the default ban threshold. Sustained
// bursts of small requests are not penalized as that would potentially ban
// peers performing IBD.
// This incremental score decays each minute to half of its value.
sp.addBanScore(0, uint32(length)*99/wire.MaxInvPerMsg, "getdata")
// We wait on this wait channel periodically to prevent queuing
// far more data than we can send in a reasonable time, wasting memory.
// The waiting occurs after the database fetch for the next one to
// provide a little pipelining.
var waitChan chan struct{}
doneChan := make(chan struct{}, 1)
for i, iv := range msg.InvList {
var c chan struct{}
// If this will be the last message we send.
if i == length-1 && len(notFound.InvList) == 0 {
c = doneChan
} else if (i+1)%3 == 0 {
// Buffered so as to not make the send goroutine block.
c = make(chan struct{}, 1)
}
var err error
switch iv.Type {
case wire.InvTypeTx:
err = sp.server.pushTxMsg(sp, (*daghash.TxID)(iv.Hash), c, waitChan)
case wire.InvTypeSyncBlock:
fallthrough
case wire.InvTypeBlock:
err = sp.server.pushBlockMsg(sp, iv.Hash, c, waitChan)
case wire.InvTypeFilteredBlock:
err = sp.server.pushMerkleBlockMsg(sp, iv.Hash, c, waitChan)
default:
peerLog.Warnf("Unknown type in inventory request %d",
iv.Type)
continue
}
if err != nil {
notFound.AddInvVect(iv)
// When there is a failure fetching the final entry
// and the done channel was sent in due to there
// being no outstanding not found inventory, consume
// it here because there is now not found inventory
// that will use the channel momentarily.
if i == len(msg.InvList)-1 && c != nil {
<-c
}
}
numAdded++
waitChan = c
}
if len(notFound.InvList) != 0 {
sp.QueueMessage(notFound, doneChan)
}
// Wait for messages to be sent. We can send quite a lot of data at this
// point and this will keep the peer busy for a decent amount of time.
// We don't process anything else from them in this time so that we
// have an idea of when we should hear back from them - else the idle
// timeout could fire when we were only half done sending the blocks.
if numAdded > 0 {
<-doneChan
}
}
// OnGetBlockLocator is invoked when a peer receives a getlocator bitcoin
// message.
func (sp *Peer) OnGetBlockLocator(_ *peer.Peer, msg *wire.MsgGetBlockLocator) {
locator := sp.server.DAG.BlockLocatorFromHashes(msg.StartHash, msg.StopHash)
if len(locator) == 0 {
peerLog.Infof("Couldn't build a block locator between blocks %s and %s"+
" that was requested from peer %s",
sp)
return
}
err := sp.PushBlockLocatorMsg(locator)
if err != nil {
peerLog.Errorf("Failed to send block locator message to peer %s: %s",
sp, err)
return
}
}
// OnBlockLocator is invoked when a peer receives a locator bitcoin
// message.
func (sp *Peer) OnBlockLocator(_ *peer.Peer, msg *wire.MsgBlockLocator) {
// Find the highest known shared block between the peers, and ask
// the peer for that block and its future. If the block is not
// found, create a lower-resolution block locator and send it to
// the peer in order to find it in the next iteration.
dag := sp.server.DAG
if len(msg.BlockLocatorHashes) == 0 {
peerLog.Warnf("Got empty block locator from peer %s",
sp)
return
}
// If the first hash of the block locator is known, it means we found
// the highest shared block.
firstHash := msg.BlockLocatorHashes[0]
if dag.BlockExists(firstHash) {
if dag.IsKnownFinalizedBlock(firstHash) {
peerLog.Warnf("Cannot sync with peer %s because the highest"+
" shared chain block (%s) is below the finality point", sp, firstHash)
sp.Disconnect()
return
}
err := sp.server.SyncManager.PushGetBlockInvsOrHeaders(sp.Peer, firstHash)
if err != nil {
peerLog.Errorf("Failed pushing get blocks message for peer %s: %s",
sp, err)
return
}
return
}
startHash, stopHash := dag.FindNextLocatorBoundaries(msg.BlockLocatorHashes)
if startHash == nil {
panic("Couldn't find any unknown hashes in the block locator.")
}
sp.PushGetBlockLocatorMsg(startHash, stopHash)
}
// OnGetBlockInvs is invoked when a peer receives a getblockinvs bitcoin
// message.
// It finds the blue future between msg.StartHash and msg.StopHash
// and sends the invs to the requesting peer.
func (sp *Peer) OnGetBlockInvs(_ *peer.Peer, msg *wire.MsgGetBlockInvs) {
dag := sp.server.DAG
// We want to prevent a situation where the syncing peer needs
// to call getblocks once again, but the blocks we sent it
// won't affect its selected chain, so the next time it tries
// to find the highest shared chain block, it'll find the
// same one as before.
// To prevent that we use blockdag.FinalityInterval as maxHashes.
// This way, if one getblocks is not enough to get the peer
// synced, we know for sure that its selected chain will
// change, so we'll have a higher shared chain block.
hashList := dag.GetBlueBlocksHashesBetween(msg.StartHash, msg.StopHash,
wire.MaxInvPerMsg)
// Generate inventory message.
invMsg := wire.NewMsgInv()
for i := range hashList {
iv := wire.NewInvVect(wire.InvTypeSyncBlock, hashList[i])
invMsg.AddInvVect(iv)
}
// Send the inventory message if there is anything to send.
if len(invMsg.InvList) > 0 {
sp.QueueMessage(invMsg, nil)
}
}
// OnGetHeaders is invoked when a peer receives a getheaders bitcoin
// message.
func (sp *Peer) OnGetHeaders(_ *peer.Peer, msg *wire.MsgGetHeaders) {
// Ignore getheaders requests if not in sync.
if !sp.server.SyncManager.IsCurrent() {
return
}
// Find the most recent known block in the best chain based on the block
// locator and fetch all of the headers after it until either
// wire.MaxBlockHeadersPerMsg have been fetched or the provided stop
// hash is encountered.
//
// Use the block after the genesis block if no other blocks in the
// provided locator are known. This does mean the client will start
// over with the genesis block if unknown block locators are provided.
dag := sp.server.DAG
headers := dag.GetBlueBlocksHeadersBetween(msg.StartHash, msg.StopHash)
// Send found headers to the requesting peer.
blockHeaders := make([]*wire.BlockHeader, len(headers))
for i := range headers {
blockHeaders[i] = headers[i]
}
sp.QueueMessage(&wire.MsgHeaders{Headers: blockHeaders}, nil)
}
// OnGetCFilters is invoked when a peer receives a getcfilters bitcoin message.
func (sp *Peer) OnGetCFilters(_ *peer.Peer, msg *wire.MsgGetCFilters) {
// Ignore getcfilters requests if not in sync.
if !sp.server.SyncManager.IsCurrent() {
return
}
hashes, err := sp.server.DAG.HeightToHashRange(msg.StartHeight,
msg.StopHash, wire.MaxGetCFiltersReqRange)
if err != nil {
peerLog.Debugf("Invalid getcfilters request: %s", err)
return
}
filters, err := sp.server.CfIndex.FiltersByBlockHashes(hashes,
msg.FilterType)
if err != nil {
peerLog.Errorf("Error retrieving cfilters: %s", err)
return
}
for i, filterBytes := range filters {
if len(filterBytes) == 0 {
peerLog.Warnf("Could not obtain cfilter for %s", hashes[i])
return
}
filterMsg := wire.NewMsgCFilter(msg.FilterType, hashes[i], filterBytes)
sp.QueueMessage(filterMsg, nil)
}
}
// OnGetCFHeaders is invoked when a peer receives a getcfheaders bitcoin message.
func (sp *Peer) OnGetCFHeaders(_ *peer.Peer, msg *wire.MsgGetCFHeaders) {
// Ignore getcfheaders requests if not in sync.
if !sp.server.SyncManager.IsCurrent() {
return
}
startHeight := msg.StartHeight
maxResults := wire.MaxCFHeadersPerMsg
// If StartHeight is positive, fetch the predecessor block hash so we can
// populate the PrevFilterHeader field.
if msg.StartHeight > 0 {
startHeight--
maxResults++
}
// Fetch the hashes from the block index.
hashList, err := sp.server.DAG.HeightToHashRange(startHeight,
msg.StopHash, maxResults)
if err != nil {
peerLog.Debugf("Invalid getcfheaders request: %s", err)
}
// This is possible if StartHeight is one greater than the height of
// StopHash, and we pull a valid range of hashes including the previous
// filter header.
if len(hashList) == 0 || (msg.StartHeight > 0 && len(hashList) == 1) {
peerLog.Debug("No results for getcfheaders request")
return
}
// Fetch the raw filter hash bytes from the database for all blocks.
filterHashes, err := sp.server.CfIndex.FilterHashesByBlockHashes(hashList,
msg.FilterType)
if err != nil {
peerLog.Errorf("Error retrieving cfilter hashes: %s", err)
return
}
// Generate cfheaders message and send it.
headersMsg := wire.NewMsgCFHeaders()
// Populate the PrevFilterHeader field.
if msg.StartHeight > 0 {
parentHash := hashList[0]
// Fetch the raw committed filter header bytes from the
// database.
headerBytes, err := sp.server.CfIndex.FilterHeaderByBlockHash(
parentHash, msg.FilterType)
if err != nil {
peerLog.Errorf("Error retrieving CF header: %s", err)
return
}
if len(headerBytes) == 0 {
peerLog.Warnf("Could not obtain CF header for %s", parentHash)
return
}
// Deserialize the hash into PrevFilterHeader.
err = headersMsg.PrevFilterHeader.SetBytes(headerBytes)
if err != nil {
peerLog.Warnf("Committed filter header deserialize "+
"failed: %s", err)
return
}
hashList = hashList[1:]
filterHashes = filterHashes[1:]
}
// Populate HeaderHashes.
for i, hashBytes := range filterHashes {
if len(hashBytes) == 0 {
peerLog.Warnf("Could not obtain CF hash for %s", hashList[i])
return
}
// Deserialize the hash.
filterHash, err := daghash.NewHash(hashBytes)
if err != nil {
peerLog.Warnf("Committed filter hash deserialize "+
"failed: %s", err)
return
}
headersMsg.AddCFHash(filterHash)
}
headersMsg.FilterType = msg.FilterType
headersMsg.StopHash = msg.StopHash
sp.QueueMessage(headersMsg, nil)
}
// OnGetCFCheckpt is invoked when a peer receives a getcfcheckpt bitcoin message.
func (sp *Peer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
// Ignore getcfcheckpt requests if not in sync.
if !sp.server.SyncManager.IsCurrent() {
return
}
blockHashes, err := sp.server.DAG.IntervalBlockHashes(msg.StopHash,
wire.CFCheckptInterval)
if err != nil {
peerLog.Debugf("Invalid getcfilters request: %s", err)
return
}
var updateCache bool
var checkptCache []cfHeaderKV
if len(blockHashes) > len(checkptCache) {
// Update the cache if the checkpoint chain is longer than the cached
// one. This ensures that the cache is relatively stable and mostly
// overlaps with the best chain, since it follows the longest chain
// heuristic.
updateCache = true
// Take write lock because we are going to update cache.
sp.server.cfCheckptCachesMtx.Lock()
defer sp.server.cfCheckptCachesMtx.Unlock()
// Grow the checkptCache to be the length of blockHashes.
additionalLength := len(blockHashes) - len(checkptCache)
checkptCache = append(sp.server.cfCheckptCaches[msg.FilterType],
make([]cfHeaderKV, additionalLength)...)
} else {
updateCache = false
// Take reader lock because we are not going to update cache.
sp.server.cfCheckptCachesMtx.RLock()
defer sp.server.cfCheckptCachesMtx.RUnlock()
checkptCache = sp.server.cfCheckptCaches[msg.FilterType]
}
// Iterate backwards until the block hash is found in the cache.
var forkIdx int
for forkIdx = len(checkptCache); forkIdx > 0; forkIdx-- {
if checkptCache[forkIdx-1].blockHash.IsEqual(blockHashes[forkIdx-1]) {
break
}
}
// Populate results with cached checkpoints.
checkptMsg := wire.NewMsgCFCheckpt(msg.FilterType, msg.StopHash,
len(blockHashes))
for i := 0; i < forkIdx; i++ {
checkptMsg.AddCFHeader(checkptCache[i].filterHeader)
}
// Look up any filter headers that aren't cached.
blockHashPtrs := make([]*daghash.Hash, 0, len(blockHashes)-forkIdx)
for i := forkIdx; i < len(blockHashes); i++ {
blockHashPtrs = append(blockHashPtrs, blockHashes[i])
}
filterHeaders, err := sp.server.CfIndex.FilterHeadersByBlockHashes(blockHashPtrs,
msg.FilterType)
if err != nil {
peerLog.Errorf("Error retrieving cfilter headers: %s", err)
return
}
for i, filterHeaderBytes := range filterHeaders {
if len(filterHeaderBytes) == 0 {
peerLog.Warnf("Could not obtain CF header for %s", blockHashPtrs[i])
return
}
filterHeader, err := daghash.NewHash(filterHeaderBytes)
if err != nil {
peerLog.Warnf("Committed filter header deserialize "+
"failed: %s", err)
return
}
checkptMsg.AddCFHeader(filterHeader)
if updateCache {
checkptCache[forkIdx+i] = cfHeaderKV{
blockHash: blockHashes[forkIdx+i],
filterHeader: filterHeader,
}
}
}
if updateCache {
sp.server.cfCheckptCaches[msg.FilterType] = checkptCache
}
sp.QueueMessage(checkptMsg, nil)
}
// enforceNodeBloomFlag disconnects the peer if the server is not configured to
// allow bloom filters. Additionally, if the peer has negotiated to a protocol
// version that is high enough to observe the bloom filter service support bit,
@ -1053,174 +440,6 @@ func (sp *Peer) enforceNodeBloomFlag(cmd string) bool {
return true
}
// OnFeeFilter is invoked when a peer receives a feefilter bitcoin message and
// is used by remote peers to request that no transactions which have a fee rate
// lower than provided value are inventoried to them. The peer will be
// disconnected if an invalid fee filter value is provided.
func (sp *Peer) OnFeeFilter(_ *peer.Peer, msg *wire.MsgFeeFilter) {
// Check that the passed minimum fee is a valid amount.
if msg.MinFee < 0 || msg.MinFee > util.MaxSatoshi {
peerLog.Debugf("Peer %s sent an invalid feefilter '%s' -- "+
"disconnecting", sp, util.Amount(msg.MinFee))
sp.Disconnect()
return
}
atomic.StoreInt64(&sp.FeeFilterInt, msg.MinFee)
}
// OnFilterAdd is invoked when a peer receives a filteradd bitcoin
// message and is used by remote peers to add data to an already loaded bloom
// filter. The peer will be disconnected if a filter is not loaded when this
// message is received or the server is not configured to allow bloom filters.
func (sp *Peer) OnFilterAdd(_ *peer.Peer, msg *wire.MsgFilterAdd) {
// Disconnect and/or ban depending on the node bloom services flag and
// negotiated protocol version.
if !sp.enforceNodeBloomFlag(msg.Command()) {
return
}
if !sp.filter.IsLoaded() {
peerLog.Debugf("%s sent a filteradd request with no filter "+
"loaded -- disconnecting", sp)
sp.Disconnect()
return
}
sp.filter.Add(msg.Data)
}
// OnFilterClear is invoked when a peer receives a filterclear bitcoin
// message and is used by remote peers to clear an already loaded bloom filter.
// The peer will be disconnected if a filter is not loaded when this message is
// received or the server is not configured to allow bloom filters.
func (sp *Peer) OnFilterClear(_ *peer.Peer, msg *wire.MsgFilterClear) {
// Disconnect and/or ban depending on the node bloom services flag and
// negotiated protocol version.
if !sp.enforceNodeBloomFlag(msg.Command()) {
return
}
if !sp.filter.IsLoaded() {
peerLog.Debugf("%s sent a filterclear request with no "+
"filter loaded -- disconnecting", sp)
sp.Disconnect()
return
}
sp.filter.Unload()
}
// OnFilterLoad is invoked when a peer receives a filterload bitcoin
// message and is used to load a bloom filter that should be used for
// delivering merkle blocks and associated transactions that match the filter.
// The peer will be disconnected if the server is not configured to allow bloom
// filters.
func (sp *Peer) OnFilterLoad(_ *peer.Peer, msg *wire.MsgFilterLoad) {
// Disconnect and/or ban depending on the node bloom services flag and
// negotiated protocol version.
if !sp.enforceNodeBloomFlag(msg.Command()) {
return
}
sp.setDisableRelayTx(false)
sp.filter.Reload(msg)
}
// OnGetAddr is invoked when a peer receives a getaddr bitcoin message
// and is used to provide the peer with known addresses from the address
// manager.
func (sp *Peer) OnGetAddr(_ *peer.Peer, msg *wire.MsgGetAddr) {
// Don't return any addresses when running on the simulation test
// network. This helps prevent the network from becoming another
// public test network since it will not be able to learn about other
// peers that have not specifically been provided.
if config.MainConfig().SimNet {
return
}
// Do not accept getaddr requests from outbound peers. This reduces
// fingerprinting attacks.
if !sp.Inbound() {
peerLog.Debugf("Ignoring getaddr request from outbound peer ",
"%s", sp)
return
}
// Only allow one getaddr request per connection to discourage
// address stamping of inv announcements.
if sp.sentAddrs {
peerLog.Debugf("Ignoring repeated getaddr request from peer ",
"%s", sp)
return
}
sp.sentAddrs = true
// Get the current known addresses from the address manager.
addrCache := sp.server.addrManager.AddressCache(msg.IncludeAllSubnetworks, msg.SubnetworkID)
// Push the addresses.
sp.pushAddrMsg(addrCache, sp.SubnetworkID())
}
// OnAddr is invoked when a peer receives an addr bitcoin message and is
// used to notify the server about advertised addresses.
func (sp *Peer) OnAddr(_ *peer.Peer, msg *wire.MsgAddr) {
// Ignore addresses when running on the simulation test network. This
// helps prevent the network from becoming another public test network
// since it will not be able to learn about other peers that have not
// specifically been provided.
if config.MainConfig().SimNet {
return
}
// A message that has no addresses is invalid.
if len(msg.AddrList) == 0 {
peerLog.Errorf("Command [%s] from %s does not contain any addresses",
msg.Command(), sp.Peer)
sp.Disconnect()
return
}
if msg.IncludeAllSubnetworks {
peerLog.Errorf("Got unexpected IncludeAllSubnetworks=true in [%s] command from %s",
msg.Command(), sp.Peer)
sp.Disconnect()
return
} else if msg.SubnetworkID != nil && !msg.SubnetworkID.IsEqual(config.MainConfig().SubnetworkID) {
peerLog.Errorf("Only full nodes and %s subnetwork IDs are allowed in [%s] command, but got subnetwork ID %s from %s",
config.MainConfig().SubnetworkID, msg.Command(), msg.SubnetworkID, sp.Peer)
sp.Disconnect()
return
}
for _, na := range msg.AddrList {
// Don't add more addresses if we're disconnecting.
if !sp.Connected() {
return
}
// Set the timestamp to 5 days ago if it's more than 10 minutes
// in the future so this address is one of the first to be
// removed when space is needed.
now := time.Now()
if na.Timestamp.After(now.Add(time.Minute * 10)) {
na.Timestamp = now.Add(-1 * time.Hour * 24 * 5)
}
// Add address to known addresses for this peer.
sp.addKnownAddresses([]*wire.NetAddress{na})
}
// Add addresses to server address manager. The address manager handles
// the details of things such as preventing duplicate addresses, max
// addresses, and last seen updates.
// XXX bitcoind gives a 2 hour time penalty here, do we want to do the
// same?
sp.server.addrManager.AddAddresses(msg.AddrList, sp.NA(), msg.SubnetworkID)
}
// OnRead is invoked when a peer receives a message and it is used to update
// the bytes received by the server.
func (sp *Peer) OnRead(_ *peer.Peer, bytesRead int, msg wire.Message, err error) {