diff --git a/server/p2p/on_addr.go b/server/p2p/on_addr.go new file mode 100644 index 000000000..59ca80357 --- /dev/null +++ b/server/p2p/on_addr.go @@ -0,0 +1,65 @@ +package p2p + +import ( + "github.com/daglabs/btcd/config" + "github.com/daglabs/btcd/peer" + "github.com/daglabs/btcd/wire" + "time" +) + +// OnAddr is invoked when a peer receives an addr bitcoin message and is +// used to notify the server about advertised addresses. +func (sp *Peer) OnAddr(_ *peer.Peer, msg *wire.MsgAddr) { + // Ignore addresses when running on the simulation test network. This + // helps prevent the network from becoming another public test network + // since it will not be able to learn about other peers that have not + // specifically been provided. + if config.MainConfig().SimNet { + return + } + + // A message that has no addresses is invalid. + if len(msg.AddrList) == 0 { + peerLog.Errorf("Command [%s] from %s does not contain any addresses", + msg.Command(), sp.Peer) + sp.Disconnect() + return + } + + if msg.IncludeAllSubnetworks { + peerLog.Errorf("Got unexpected IncludeAllSubnetworks=true in [%s] command from %s", + msg.Command(), sp.Peer) + sp.Disconnect() + return + } else if !msg.SubnetworkID.IsEqual(config.MainConfig().SubnetworkID) && msg.SubnetworkID != nil { + peerLog.Errorf("Only full nodes and %s subnetwork IDs are allowed in [%s] command, but got subnetwork ID %s from %s", + config.MainConfig().SubnetworkID, msg.Command(), msg.SubnetworkID, sp.Peer) + sp.Disconnect() + return + } + + for _, na := range msg.AddrList { + // Don't add more address if we're disconnecting. + if !sp.Connected() { + return + } + + // Set the timestamp to 5 days ago if it's more than 24 hours + // in the future so this address is one of the first to be + // removed when space is needed. + now := time.Now() + if na.Timestamp.After(now.Add(time.Minute * 10)) { + na.Timestamp = now.Add(-1 * time.Hour * 24 * 5) + } + + // Add address to known addresses for this peer. + sp.addKnownAddresses([]*wire.NetAddress{na}) + } + + // Add addresses to server address manager. The address manager handles + // the details of things such as preventing duplicate addresses, max + // addresses, and last seen updates. + // XXX bitcoind gives a 2 hour time penalty here, do we want to do the + // same? + sp.server.addrManager.AddAddresses(msg.AddrList, sp.NA(), msg.SubnetworkID) +} diff --git a/server/p2p/on_block.go b/server/p2p/on_block.go new file mode 100644 index 000000000..006ac0bfb --- /dev/null +++ b/server/p2p/on_block.go @@ -0,0 +1,33 @@ +package p2p + +import ( + "github.com/daglabs/btcd/peer" + "github.com/daglabs/btcd/util" + "github.com/daglabs/btcd/wire" +) + +// OnBlock is invoked when a peer receives a block bitcoin message. It +// blocks until the bitcoin block has been fully processed. +func (sp *Peer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) { + // Convert the raw MsgBlock to a util.Block which provides some + // convenience methods and things such as hash caching. + block := util.NewBlockFromBlockAndBytes(msg, buf) + + // Add the block to the known inventory for the peer. + iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash()) + sp.AddKnownInventory(iv) + + // Queue the block up to be handled by the block + // manager and intentionally block further receives + // until the bitcoin block is fully processed and known + // good or bad. This helps prevent a malicious peer + // from queuing up a bunch of bad blocks before + // disconnecting (or being disconnected) and wasting + // memory. Additionally, this behavior is depended on + // by at least the block acceptance test tool as the + // reference implementation processes blocks in the same + // thread and therefore blocks further messages until + // the bitcoin block has been fully processed. + sp.server.SyncManager.QueueBlock(block, sp.Peer, false, sp.blockProcessed) + <-sp.blockProcessed +} diff --git a/server/p2p/on_block_locator.go b/server/p2p/on_block_locator.go new file mode 100644 index 000000000..47631e53f --- /dev/null +++ b/server/p2p/on_block_locator.go @@ -0,0 +1,44 @@ +package p2p + +import ( + "github.com/daglabs/btcd/peer" + "github.com/daglabs/btcd/wire" +) + +// OnBlockLocator is invoked when a peer receives a locator bitcoin +// message. +func (sp *Peer) OnBlockLocator(_ *peer.Peer, msg *wire.MsgBlockLocator) { + // Find the highest known shared block between the peers, and asks + // the block and its future from the peer. If the block is not + // found, create a lower resolution block locator and send it to + // the peer in order to find it in the next iteration. + dag := sp.server.DAG + if len(msg.BlockLocatorHashes) == 0 { + peerLog.Warnf("Got empty block locator from peer %s", + sp) + return + } + // If the first hash of the block locator is known, it means we found + // the highest shared block. + firstHash := msg.BlockLocatorHashes[0] + if dag.BlockExists(firstHash) { + if dag.IsKnownFinalizedBlock(firstHash) { + peerLog.Warnf("Cannot sync with peer %s because the highest"+ + " shared chain block (%s) is below the finality point", sp, firstHash) + sp.Disconnect() + return + } + err := sp.server.SyncManager.PushGetBlockInvsOrHeaders(sp.Peer, firstHash) + if err != nil { + peerLog.Errorf("Failed pushing get blocks message for peer %s: %s", + sp, err) + return + } + return + } + startHash, stopHash := dag.FindNextLocatorBoundaries(msg.BlockLocatorHashes) + if startHash == nil { + panic("Couldn't find any unknown hashes in the block locator.") + } + sp.PushGetBlockLocatorMsg(startHash, stopHash) +} diff --git a/server/p2p/on_fee_filter.go b/server/p2p/on_fee_filter.go new file mode 100644 index 000000000..e753cc182 --- /dev/null +++ b/server/p2p/on_fee_filter.go @@ -0,0 +1,24 @@ +package p2p + +import ( + "github.com/daglabs/btcd/peer" + "github.com/daglabs/btcd/util" + "github.com/daglabs/btcd/wire" + "sync/atomic" +) + +// OnFeeFilter is invoked when a peer receives a feefilter bitcoin message and +// is used by remote peers to request that no transactions which have a fee rate +// lower than provided value are inventoried to them. The peer will be +// disconnected if an invalid fee filter value is provided. +func (sp *Peer) OnFeeFilter(_ *peer.Peer, msg *wire.MsgFeeFilter) { + // Check that the passed minimum fee is a valid amount. + if msg.MinFee < 0 || msg.MinFee > util.MaxSatoshi { + peerLog.Debugf("Peer %s sent an invalid feefilter '%s' -- "+ + "disconnecting", sp, util.Amount(msg.MinFee)) + sp.Disconnect() + return + } + + atomic.StoreInt64(&sp.FeeFilterInt, msg.MinFee) +} diff --git a/server/p2p/on_filter_add.go b/server/p2p/on_filter_add.go new file mode 100644 index 000000000..8f43a56fd --- /dev/null +++ b/server/p2p/on_filter_add.go @@ -0,0 +1,27 @@ +package p2p + +import ( + "github.com/daglabs/btcd/peer" + "github.com/daglabs/btcd/wire" +) + +// OnFilterAdd is invoked when a peer receives a filteradd bitcoin +// message and is used by remote peers to add data to an already loaded bloom +// filter. The peer will be disconnected if a filter is not loaded when this +// message is received or the server is not configured to allow bloom filters. +func (sp *Peer) OnFilterAdd(_ *peer.Peer, msg *wire.MsgFilterAdd) { + // Disconnect and/or ban depending on the node bloom services flag and + // negotiated protocol version. + if !sp.enforceNodeBloomFlag(msg.Command()) { + return + } + + if sp.filter.IsLoaded() { + peerLog.Debugf("%s sent a filteradd request with no filter "+ + "loaded -- disconnecting", sp) + sp.Disconnect() + return + } + + sp.filter.Add(msg.Data) +} diff --git a/server/p2p/on_filter_clear.go b/server/p2p/on_filter_clear.go new file mode 100644 index 000000000..36149779a --- /dev/null +++ b/server/p2p/on_filter_clear.go @@ -0,0 +1,27 @@ +package p2p + +import ( + "github.com/daglabs/btcd/peer" + "github.com/daglabs/btcd/wire" +) + +// OnFilterClear is invoked when a peer receives a filterclear bitcoin +// message and is used by remote peers to clear an already loaded bloom filter. +// The peer will be disconnected if a filter is not loaded when this message is +// received or the server is not configured to allow bloom filters. +func (sp *Peer) OnFilterClear(_ *peer.Peer, msg *wire.MsgFilterClear) { + // Disconnect and/or ban depending on the node bloom services flag and + // negotiated protocol version. + if !sp.enforceNodeBloomFlag(msg.Command()) { + return + } + + if !sp.filter.IsLoaded() { + peerLog.Debugf("%s sent a filterclear request with no "+ + "filter loaded -- disconnecting", sp) + sp.Disconnect() + return + } + + sp.filter.Unload() +} diff --git a/server/p2p/on_filter_load.go b/server/p2p/on_filter_load.go new file mode 100644 index 000000000..561a89ada --- /dev/null +++ b/server/p2p/on_filter_load.go @@ -0,0 +1,23 @@ +package p2p + +import ( + "github.com/daglabs/btcd/peer" + "github.com/daglabs/btcd/wire" +) + +// OnFilterLoad is invoked when a peer receives a filterload bitcoin +// message and it used to load a bloom filter that should be used for +// delivering merkle blocks and associated transactions that match the filter. +// The peer will be disconnected if the server is not configured to allow bloom +// filters. +func (sp *Peer) OnFilterLoad(_ *peer.Peer, msg *wire.MsgFilterLoad) { + // Disconnect and/or ban depending on the node bloom services flag and + // negotiated protocol version. + if !sp.enforceNodeBloomFlag(msg.Command()) { + return + } + + sp.setDisableRelayTx(false) + + sp.filter.Reload(msg) +} diff --git a/server/p2p/on_get_addr.go b/server/p2p/on_get_addr.go new file mode 100644 index 000000000..1961d186e --- /dev/null +++ b/server/p2p/on_get_addr.go @@ -0,0 +1,43 @@ +package p2p + +import ( + "github.com/daglabs/btcd/config" + "github.com/daglabs/btcd/peer" + "github.com/daglabs/btcd/wire" +) + +// OnGetAddr is invoked when a peer receives a getaddr bitcoin message +// and is used to provide the peer with known addresses from the address +// manager. +func (sp *Peer) OnGetAddr(_ *peer.Peer, msg *wire.MsgGetAddr) { + // Don't return any addresses when running on the simulation test + // network. This helps prevent the network from becoming another + // public test network since it will not be able to learn about other + // peers that have not specifically been provided. + if config.MainConfig().SimNet { + return + } + + // Do not accept getaddr requests from outbound peers. This reduces + // fingerprinting attacks. + if !sp.Inbound() { + peerLog.Debugf("Ignoring getaddr request from outbound peer ", + "%s", sp) + return + } + + // Only allow one getaddr request per connection to discourage + // address stamping of inv announcements. + if sp.sentAddrs { + peerLog.Debugf("Ignoring repeated getaddr request from peer ", + "%s", sp) + return + } + sp.sentAddrs = true + + // Get the current known addresses from the address manager. + addrCache := sp.server.addrManager.AddressCache(msg.IncludeAllSubnetworks, msg.SubnetworkID) + + // Push the addresses. + sp.pushAddrMsg(addrCache, sp.SubnetworkID()) +} diff --git a/server/p2p/on_get_block_invs.go b/server/p2p/on_get_block_invs.go new file mode 100644 index 000000000..28f235007 --- /dev/null +++ b/server/p2p/on_get_block_invs.go @@ -0,0 +1,37 @@ +package p2p + +import ( + "github.com/daglabs/btcd/peer" + "github.com/daglabs/btcd/wire" +) + +// OnGetBlockInvs is invoked when a peer receives a getblockinvs bitcoin +// message. +// It finds the blue future between msg.StartHash and msg.StopHash +// and send the invs to the requesting peer. +func (sp *Peer) OnGetBlockInvs(_ *peer.Peer, msg *wire.MsgGetBlockInvs) { + dag := sp.server.DAG + // We want to prevent a situation where the syncing peer needs + // to call getblocks once again, but the block we sent him + // won't affect his selected chain, so next time it'll try + // to find the highest shared chain block, it'll find the + // same one as before. + // To prevent that we use blockdag.FinalityInterval as maxHashes. + // This way, if one getblocks is not enough to get the peer + // synced, we can know for sure that its selected chain will + // change, so we'll have higher shared chain block. + hashList := dag.GetBlueBlocksHashesBetween(msg.StartHash, msg.StopHash, + wire.MaxInvPerMsg) + + // Generate inventory message. + invMsg := wire.NewMsgInv() + for i := range hashList { + iv := wire.NewInvVect(wire.InvTypeSyncBlock, hashList[i]) + invMsg.AddInvVect(iv) + } + + // Send the inventory message if there is anything to send. + if len(invMsg.InvList) > 0 { + sp.QueueMessage(invMsg, nil) + } +} diff --git a/server/p2p/on_get_block_locator.go b/server/p2p/on_get_block_locator.go new file mode 100644 index 000000000..e5b8d4ce4 --- /dev/null +++ b/server/p2p/on_get_block_locator.go @@ -0,0 +1,25 @@ +package p2p + +import ( + "github.com/daglabs/btcd/peer" + "github.com/daglabs/btcd/wire" +) + +// OnGetBlockLocator is invoked when a peer receives a getlocator bitcoin +// message. +func (sp *Peer) OnGetBlockLocator(_ *peer.Peer, msg *wire.MsgGetBlockLocator) { + locator := sp.server.DAG.BlockLocatorFromHashes(msg.StartHash, msg.StopHash) + + if len(locator) == 0 { + peerLog.Infof("Couldn't build a block locator between blocks %s and %s"+ + " that was requested from peer %s", + sp) + return + } + err := sp.PushBlockLocatorMsg(locator) + if err != nil { + peerLog.Errorf("Failed to send block locator message to peer %s: %s", + sp, err) + return + } +} diff --git a/server/p2p/on_get_c_filters.go b/server/p2p/on_get_c_filters.go new file mode 100644 index 000000000..962fc0685 --- /dev/null +++ b/server/p2p/on_get_c_filters.go @@ -0,0 +1,37 @@ +package p2p + +import ( + "github.com/daglabs/btcd/peer" + "github.com/daglabs/btcd/wire" +) + +// OnGetCFilters is invoked when a peer receives a getcfilters bitcoin message. +func (sp *Peer) OnGetCFilters(_ *peer.Peer, msg *wire.MsgGetCFilters) { + // Ignore getcfilters requests if not in sync. + if !sp.server.SyncManager.IsCurrent() { + return + } + + hashes, err := sp.server.DAG.HeightToHashRange(msg.StartHeight, + msg.StopHash, wire.MaxGetCFiltersReqRange) + if err != nil { + peerLog.Debugf("Invalid getcfilters request: %s", err) + return + } + + filters, err := sp.server.CfIndex.FiltersByBlockHashes(hashes, + msg.FilterType) + if err != nil { + peerLog.Errorf("Error retrieving cfilters: %s", err) + return + } + + for i, filterBytes := range filters { + if len(filterBytes) == 0 { + peerLog.Warnf("Could not obtain cfilter for %s", hashes[i]) + return + } + filterMsg := wire.NewMsgCFilter(msg.FilterType, hashes[i], filterBytes) + sp.QueueMessage(filterMsg, nil) + } +} diff --git a/server/p2p/on_get_cf_checkpt.go b/server/p2p/on_get_cf_checkpt.go new file mode 100644 index 000000000..7689927fe --- /dev/null +++ b/server/p2p/on_get_cf_checkpt.go @@ -0,0 +1,106 @@ +package p2p + +import ( + "github.com/daglabs/btcd/peer" + "github.com/daglabs/btcd/util/daghash" + "github.com/daglabs/btcd/wire" +) + +// OnGetCFCheckpt is invoked when a peer receives a getcfcheckpt bitcoin message. +func (sp *Peer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) { + // Ignore getcfcheckpt requests if not in sync. + if !sp.server.SyncManager.IsCurrent() { + return + } + + blockHashes, err := sp.server.DAG.IntervalBlockHashes(msg.StopHash, + wire.CFCheckptInterval) + if err != nil { + peerLog.Debugf("Invalid getcfilters request: %s", err) + return + } + + var updateCache bool + var checkptCache []cfHeaderKV + + if len(blockHashes) > len(checkptCache) { + // Update the cache if the checkpoint chain is longer than the cached + // one. This ensures that the cache is relatively stable and mostly + // overlaps with the best chain, since it follows the longest chain + // heuristic. + updateCache = true + + // Take write lock because we are going to update cache. + sp.server.cfCheckptCachesMtx.Lock() + defer sp.server.cfCheckptCachesMtx.Unlock() + + // Grow the checkptCache to be the length of blockHashes. + additionalLength := len(blockHashes) - len(checkptCache) + checkptCache = append(sp.server.cfCheckptCaches[msg.FilterType], + make([]cfHeaderKV, additionalLength)...) + } else { + updateCache = false + + // Take reader lock because we are not going to update cache. + sp.server.cfCheckptCachesMtx.RLock() + defer sp.server.cfCheckptCachesMtx.RUnlock() + + checkptCache = sp.server.cfCheckptCaches[msg.FilterType] + } + + // Iterate backwards until the block hash is found in the cache. + var forkIdx int + for forkIdx = len(checkptCache); forkIdx > 0; forkIdx-- { + if checkptCache[forkIdx-1].blockHash.IsEqual(blockHashes[forkIdx-1]) { + break + } + } + + // Populate results with cached checkpoints. + checkptMsg := wire.NewMsgCFCheckpt(msg.FilterType, msg.StopHash, + len(blockHashes)) + for i := 0; i < forkIdx; i++ { + checkptMsg.AddCFHeader(checkptCache[i].filterHeader) + } + + // Look up any filter headers that aren't cached. + blockHashPtrs := make([]*daghash.Hash, 0, len(blockHashes)-forkIdx) + for i := forkIdx; i < len(blockHashes); i++ { + blockHashPtrs = append(blockHashPtrs, blockHashes[i]) + } + + filterHeaders, err := sp.server.CfIndex.FilterHeadersByBlockHashes(blockHashPtrs, + msg.FilterType) + if err != nil { + peerLog.Errorf("Error retrieving cfilter headers: %s", err) + return + } + + for i, filterHeaderBytes := range filterHeaders { + if len(filterHeaderBytes) == 0 { + peerLog.Warnf("Could not obtain CF header for %s", blockHashPtrs[i]) + return + } + + filterHeader, err := daghash.NewHash(filterHeaderBytes) + if err != nil { + peerLog.Warnf("Committed filter header deserialize "+ + "failed: %s", err) + return + } + + checkptMsg.AddCFHeader(filterHeader) + if updateCache { + checkptCache[forkIdx+i] = cfHeaderKV{ + blockHash: blockHashes[forkIdx+i], + filterHeader: filterHeader, + } + } + } + + if updateCache { + sp.server.cfCheckptCaches[msg.FilterType] = checkptCache + } + + sp.QueueMessage(checkptMsg, nil) +} diff --git a/server/p2p/on_get_cf_headers.go b/server/p2p/on_get_cf_headers.go new file mode 100644 index 000000000..cb14a48b8 --- /dev/null +++ b/server/p2p/on_get_cf_headers.go @@ -0,0 +1,102 @@ +package p2p + +import ( + "github.com/daglabs/btcd/peer" + "github.com/daglabs/btcd/util/daghash" + "github.com/daglabs/btcd/wire" +) + +// OnGetCFHeaders is invoked when a peer receives a getcfheader bitcoin message. +func (sp *Peer) OnGetCFHeaders(_ *peer.Peer, msg *wire.MsgGetCFHeaders) { + // Ignore getcfilterheader requests if not in sync. + if !sp.server.SyncManager.IsCurrent() { + return + } + + startHeight := msg.StartHeight + maxResults := wire.MaxCFHeadersPerMsg + + // If StartHeight is positive, fetch the predecessor block hash so we can + // populate the PrevFilterHeader field. + if msg.StartHeight > 0 { + startHeight-- + maxResults++ + } + + // Fetch the hashes from the block index. + hashList, err := sp.server.DAG.HeightToHashRange(startHeight, + msg.StopHash, maxResults) + if err != nil { + peerLog.Debugf("Invalid getcfheaders request: %s", err) + } + + // This is possible if StartHeight is one greater that the height of + // StopHash, and we pull a valid range of hashes including the previous + // filter header. + if len(hashList) == 0 || (msg.StartHeight > 0 && len(hashList) == 1) { + peerLog.Debug("No results for getcfheaders request") + return + } + + // Fetch the raw filter hash bytes from the database for all blocks. + filterHashes, err := sp.server.CfIndex.FilterHashesByBlockHashes(hashList, + msg.FilterType) + if err != nil { + peerLog.Errorf("Error retrieving cfilter hashes: %s", err) + return + } + + // Generate cfheaders message and send it. + headersMsg := wire.NewMsgCFHeaders() + + // Populate the PrevFilterHeader field. + if msg.StartHeight > 0 { + parentHash := hashList[0] + + // Fetch the raw committed filter header bytes from the + // database. + headerBytes, err := sp.server.CfIndex.FilterHeaderByBlockHash( + parentHash, msg.FilterType) + if err != nil { + peerLog.Errorf("Error retrieving CF header: %s", err) + return + } + if len(headerBytes) == 0 { + peerLog.Warnf("Could not obtain CF header for %s", parentHash) + return + } + + // Deserialize the hash into PrevFilterHeader. + err = headersMsg.PrevFilterHeader.SetBytes(headerBytes) + if err != nil { + peerLog.Warnf("Committed filter header deserialize "+ + "failed: %s", err) + return + } + + hashList = hashList[1:] + filterHashes = filterHashes[1:] + } + + // Populate HeaderHashes. + for i, hashBytes := range filterHashes { + if len(hashBytes) == 0 { + peerLog.Warnf("Could not obtain CF hash for %s", hashList[i]) + return + } + + // Deserialize the hash. + filterHash, err := daghash.NewHash(hashBytes) + if err != nil { + peerLog.Warnf("Committed filter hash deserialize "+ + "failed: %s", err) + return + } + + headersMsg.AddCFHash(filterHash) + } + + headersMsg.FilterType = msg.FilterType + headersMsg.StopHash = msg.StopHash + sp.QueueMessage(headersMsg, nil) +} diff --git a/server/p2p/on_get_data.go b/server/p2p/on_get_data.go new file mode 100644 index 000000000..597135c69 --- /dev/null +++ b/server/p2p/on_get_data.go @@ -0,0 +1,83 @@ +package p2p + +import ( + "github.com/daglabs/btcd/peer" + "github.com/daglabs/btcd/util/daghash" + "github.com/daglabs/btcd/wire" +) + +// OnGetData is invoked when a peer receives a getdata bitcoin message and +// is used to deliver block and transaction information. +func (sp *Peer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) { + numAdded := 0 + notFound := wire.NewMsgNotFound() + + length := len(msg.InvList) + // A decaying ban score increase is applied to prevent exhausting resources + // with unusually large inventory queries. + // Requesting more than the maximum inventory vector length within a short + // period of time yields a score above the default ban threshold. Sustained + // bursts of small requests are not penalized as that would potentially ban + // peers performing IBD. + // This incremental score decays each minute to half of its value. + sp.addBanScore(0, uint32(length)*99/wire.MaxInvPerMsg, "getdata") + + // We wait on this wait channel periodically to prevent queuing + // far more data than we can send in a reasonable time, wasting memory. + // The waiting occurs after the database fetch for the next one to + // provide a little pipelining. + var waitChan chan struct{} + doneChan := make(chan struct{}, 1) + + for i, iv := range msg.InvList { + var c chan struct{} + // If this will be the last message we send. + if i == length-1 && len(notFound.InvList) == 0 { + c = doneChan + } else if (i+1)%3 == 0 { + // Buffered so as to not make the send goroutine block. + c = make(chan struct{}, 1) + } + var err error + switch iv.Type { + case wire.InvTypeTx: + err = sp.server.pushTxMsg(sp, (*daghash.TxID)(iv.Hash), c, waitChan) + case wire.InvTypeSyncBlock: + fallthrough + case wire.InvTypeBlock: + err = sp.server.pushBlockMsg(sp, iv.Hash, c, waitChan) + case wire.InvTypeFilteredBlock: + err = sp.server.pushMerkleBlockMsg(sp, iv.Hash, c, waitChan) + default: + peerLog.Warnf("Unknown type in inventory request %d", + iv.Type) + continue + } + if err != nil { + notFound.AddInvVect(iv) + + // When there is a failure fetching the final entry + // and the done channel was sent in due to there + // being no outstanding not found inventory, consume + // it here because there is now not found inventory + // that will use the channel momentarily. + if i == len(msg.InvList)-1 && c != nil { + <-c + } + } + numAdded++ + waitChan = c + } + if len(notFound.InvList) != 0 { + sp.QueueMessage(notFound, doneChan) + } + + // Wait for messages to be sent. We can send quite a lot of data at this + // point and this will keep the peer busy for a decent amount of time. + // We don't process anything else by them in this time so that we + // have an idea of when we should hear back from them - else the idle + // timeout could fire when we were only half done sending the blocks. + if numAdded > 0 { + <-doneChan + } +} diff --git a/server/p2p/on_get_headers.go b/server/p2p/on_get_headers.go new file mode 100644 index 000000000..0d3b82049 --- /dev/null +++ b/server/p2p/on_get_headers.go @@ -0,0 +1,33 @@ +package p2p + +import ( + "github.com/daglabs/btcd/peer" + "github.com/daglabs/btcd/wire" +) + +// OnGetHeaders is invoked when a peer receives a getheaders bitcoin +// message. +func (sp *Peer) OnGetHeaders(_ *peer.Peer, msg *wire.MsgGetHeaders) { + // Ignore getheaders requests if not in sync. + if !sp.server.SyncManager.IsCurrent() { + return + } + + // Find the most recent known block in the best chain based on the block + // locator and fetch all of the headers after it until either + // wire.MaxBlockHeadersPerMsg have been fetched or the provided stop + // hash is encountered. + // + // Use the block after the genesis block if no other blocks in the + // provided locator are known. This does mean the client will start + // over with the genesis block if unknown block locators are provided. + dag := sp.server.DAG + headers := dag.GetBlueBlocksHeadersBetween(msg.StartHash, msg.StopHash) + + // Send found headers to the requesting peer. + blockHeaders := make([]*wire.BlockHeader, len(headers)) + for i := range headers { + blockHeaders[i] = headers[i] + } + sp.QueueMessage(&wire.MsgHeaders{Headers: blockHeaders}, nil) +} diff --git a/server/p2p/on_headers.go b/server/p2p/on_headers.go new file mode 100644 index 000000000..e3f4c4aa5 --- /dev/null +++ b/server/p2p/on_headers.go @@ -0,0 +1,12 @@ +package p2p + +import ( + "github.com/daglabs/btcd/peer" + "github.com/daglabs/btcd/wire" +) + +// OnHeaders is invoked when a peer receives a headers bitcoin +// message. The message is passed down to the sync manager. +func (sp *Peer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) { + sp.server.SyncManager.QueueHeaders(msg, sp.Peer) +} diff --git a/server/p2p/on_inv.go b/server/p2p/on_inv.go new file mode 100644 index 000000000..d521744ab --- /dev/null +++ b/server/p2p/on_inv.go @@ -0,0 +1,41 @@ +package p2p + +import ( + "github.com/daglabs/btcd/config" + "github.com/daglabs/btcd/peer" + "github.com/daglabs/btcd/wire" +) + +// OnInv is invoked when a peer receives an inv bitcoin message and is +// used to examine the inventory being advertised by the remote peer and react +// accordingly. We pass the message down to blockmanager which will call +// QueueMessage with any appropriate responses. +func (sp *Peer) OnInv(_ *peer.Peer, msg *wire.MsgInv) { + if !config.MainConfig().BlocksOnly { + if len(msg.InvList) > 0 { + sp.server.SyncManager.QueueInv(msg, sp.Peer) + } + return + } + + newInv := wire.NewMsgInvSizeHint(uint(len(msg.InvList))) + for _, invVect := range msg.InvList { + if invVect.Type == wire.InvTypeTx { + peerLog.Tracef("Ignoring tx %s in inv from %s -- "+ + "blocksonly enabled", invVect.Hash, sp) + peerLog.Infof("Peer %s is announcing "+ + "transactions -- disconnecting", sp) + sp.Disconnect() + return + } + err := newInv.AddInvVect(invVect) + if err != nil { + peerLog.Errorf("Failed to add inventory vector: %s", err) + break + } + } + + if len(newInv.InvList) > 0 { + sp.server.SyncManager.QueueInv(newInv, sp.Peer) + } +} diff --git a/server/p2p/on_mempool.go b/server/p2p/on_mempool.go new file mode 100644 index 000000000..b389d6207 --- /dev/null +++ b/server/p2p/on_mempool.go @@ -0,0 +1,55 @@ +package p2p + +import ( + "github.com/daglabs/btcd/peer" + "github.com/daglabs/btcd/util/daghash" + "github.com/daglabs/btcd/wire" +) + +// OnMemPool is invoked when a peer receives a mempool bitcoin message. +// It creates and sends an inventory message with the contents of the memory +// pool up to the maximum inventory allowed per message. When the peer has a +// bloom filter loaded, the contents are filtered accordingly. +func (sp *Peer) OnMemPool(_ *peer.Peer, msg *wire.MsgMemPool) { + // Only allow mempool requests if the server has bloom filtering + // enabled. + if sp.server.services&wire.SFNodeBloom != wire.SFNodeBloom { + peerLog.Debugf("peer %s sent mempool request with bloom "+ + "filtering disabled -- disconnecting", sp) + sp.Disconnect() + return + } + + // A decaying ban score increase is applied to prevent flooding. + // The ban score accumulates and passes the ban threshold if a burst of + // mempool messages comes from a peer. The score decays each minute to + // half of its value. + sp.addBanScore(0, 33, "mempool") + + // Generate inventory message with the available transactions in the + // transaction memory pool. Limit it to the max allowed inventory + // per message. The NewMsgInvSizeHint function automatically limits + // the passed hint to the maximum allowed, so it's safe to pass it + // without double checking it here. + txMemPool := sp.server.TxMemPool + txDescs := txMemPool.TxDescs() + invMsg := wire.NewMsgInvSizeHint(uint(len(txDescs))) + + for _, txDesc := range txDescs { + // Either add all transactions when there is no bloom filter, + // or only the transactions that match the filter when there is + // one. + if !sp.filter.IsLoaded() || sp.filter.MatchTxAndUpdate(txDesc.Tx) { + iv := wire.NewInvVect(wire.InvTypeTx, (*daghash.Hash)(txDesc.Tx.ID())) + invMsg.AddInvVect(iv) + if len(invMsg.InvList)+1 > wire.MaxInvPerMsg { + break + } + } + } + + // Send the inventory message if there is anything to send. + if len(invMsg.InvList) > 0 { + sp.QueueMessage(invMsg, nil) + } +} diff --git a/server/p2p/on_tx.go b/server/p2p/on_tx.go new file mode 100644 index 000000000..7051a77be --- /dev/null +++ b/server/p2p/on_tx.go @@ -0,0 +1,36 @@ +package p2p + +import ( + "github.com/daglabs/btcd/config" + "github.com/daglabs/btcd/peer" + "github.com/daglabs/btcd/util" + "github.com/daglabs/btcd/util/daghash" + "github.com/daglabs/btcd/wire" +) + +// OnTx is invoked when a peer receives a tx bitcoin message. It blocks +// until the bitcoin transaction has been fully processed. Unlock the block +// handler this does not serialize all transactions through a single thread +// transactions don't rely on the previous one in a linear fashion like blocks. +func (sp *Peer) OnTx(_ *peer.Peer, msg *wire.MsgTx) { + if config.MainConfig().BlocksOnly { + peerLog.Tracef("Ignoring tx %s from %s - blocksonly enabled", + msg.TxID(), sp) + return + } + + // Add the transaction to the known inventory for the peer. + // Convert the raw MsgTx to a util.Tx which provides some convenience + // methods and things such as hash caching. + tx := util.NewTx(msg) + iv := wire.NewInvVect(wire.InvTypeTx, (*daghash.Hash)(tx.ID())) + sp.AddKnownInventory(iv) + + // Queue the transaction up to be handled by the sync manager and + // intentionally block further receives until the transaction is fully + // processed and known good or bad. This helps prevent a malicious peer + // from queuing up a bunch of bad transactions before disconnecting (or + // being disconnected) and wasting memory. + sp.server.SyncManager.QueueTx(tx, sp.Peer, sp.txProcessed) + <-sp.txProcessed +} diff --git a/server/p2p/on_version.go b/server/p2p/on_version.go new file mode 100644 index 000000000..84954bac1 --- /dev/null +++ b/server/p2p/on_version.go @@ -0,0 +1,64 @@ +package p2p + +import ( + "github.com/daglabs/btcd/addrmgr" + "github.com/daglabs/btcd/config" + "github.com/daglabs/btcd/peer" + "github.com/daglabs/btcd/wire" +) + +// OnVersion is invoked when a peer receives a version bitcoin message +// and is used to negotiate the protocol version details as well as kick start +// the communications. +func (sp *Peer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) { + // Add the remote peer time as a sample for creating an offset against + // the local clock to keep the network time in sync. + sp.server.TimeSource.AddTimeSample(sp.Addr(), msg.Timestamp) + + // Signal the sync manager this peer is a new sync candidate. + sp.server.SyncManager.NewPeer(sp.Peer) + + // Choose whether or not to relay transactions before a filter command + // is received. + sp.setDisableRelayTx(msg.DisableRelayTx) + + // Update the address manager and request known addresses from the + // remote peer for outbound connections. This is skipped when running + // on the simulation test network since it is only intended to connect + // to specified peers and actively avoids advertising and connecting to + // discovered peers. + if !config.MainConfig().SimNet { + addrManager := sp.server.addrManager + + // Outbound connections. + if !sp.Inbound() { + // TODO(davec): Only do this if not doing the initial block + // download and the local address is routable. + if !config.MainConfig().DisableListen /* && isCurrent? */ { + // Get address that best matches. + lna := addrManager.GetBestLocalAddress(sp.NA()) + if addrmgr.IsRoutable(lna) { + // Filter addresses the peer already knows about. + addresses := []*wire.NetAddress{lna} + sp.pushAddrMsg(addresses, sp.SubnetworkID()) + } + } + + // Request known addresses if the server address manager needs + // more. + if addrManager.NeedMoreAddresses() { + sp.QueueMessage(wire.NewMsgGetAddr(false, sp.SubnetworkID()), nil) + + if sp.SubnetworkID() != nil { + sp.QueueMessage(wire.NewMsgGetAddr(false, nil), nil) + } + } + + // Mark the address as a known good address. + addrManager.Good(sp.NA(), msg.SubnetworkID) + } + } + + // Add valid peer to the server. + sp.server.AddPeer(sp) +} diff --git a/server/p2p/p2p.go b/server/p2p/p2p.go index b7403f50c..dd4ce28a0 100644 --- a/server/p2p/p2p.go +++ b/server/p2p/p2p.go @@ -410,619 +410,6 @@ func (sp *Peer) addBanScore(persistent, transient uint32, reason string) { } } -// OnVersion is invoked when a peer receives a version bitcoin message -// and is used to negotiate the protocol version details as well as kick start -// the communications. -func (sp *Peer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) { - // Add the remote peer time as a sample for creating an offset against - // the local clock to keep the network time in sync. - sp.server.TimeSource.AddTimeSample(sp.Addr(), msg.Timestamp) - - // Signal the sync manager this peer is a new sync candidate. - sp.server.SyncManager.NewPeer(sp.Peer) - - // Choose whether or not to relay transactions before a filter command - // is received. - sp.setDisableRelayTx(msg.DisableRelayTx) - - // Update the address manager and request known addresses from the - // remote peer for outbound connections. This is skipped when running - // on the simulation test network since it is only intended to connect - // to specified peers and actively avoids advertising and connecting to - // discovered peers. - if !config.MainConfig().SimNet { - addrManager := sp.server.addrManager - - // Outbound connections. - if !sp.Inbound() { - // TODO(davec): Only do this if not doing the initial block - // download and the local address is routable. - if !config.MainConfig().DisableListen /* && isCurrent? */ { - // Get address that best matches. - lna := addrManager.GetBestLocalAddress(sp.NA()) - if addrmgr.IsRoutable(lna) { - // Filter addresses the peer already knows about. - addresses := []*wire.NetAddress{lna} - sp.pushAddrMsg(addresses, sp.SubnetworkID()) - } - } - - // Request known addresses if the server address manager needs - // more. - if addrManager.NeedMoreAddresses() { - sp.QueueMessage(wire.NewMsgGetAddr(false, sp.SubnetworkID()), nil) - - if sp.SubnetworkID() != nil { - sp.QueueMessage(wire.NewMsgGetAddr(false, nil), nil) - } - } - - // Mark the address as a known good address. - addrManager.Good(sp.NA(), msg.SubnetworkID) - } - } - - // Add valid peer to the server. - sp.server.AddPeer(sp) -} - -// OnMemPool is invoked when a peer receives a mempool bitcoin message. -// It creates and sends an inventory message with the contents of the memory -// pool up to the maximum inventory allowed per message. When the peer has a -// bloom filter loaded, the contents are filtered accordingly. -func (sp *Peer) OnMemPool(_ *peer.Peer, msg *wire.MsgMemPool) { - // Only allow mempool requests if the server has bloom filtering - // enabled. - if sp.server.services&wire.SFNodeBloom != wire.SFNodeBloom { - peerLog.Debugf("peer %s sent mempool request with bloom "+ - "filtering disabled -- disconnecting", sp) - sp.Disconnect() - return - } - - // A decaying ban score increase is applied to prevent flooding. - // The ban score accumulates and passes the ban threshold if a burst of - // mempool messages comes from a peer. The score decays each minute to - // half of its value. - sp.addBanScore(0, 33, "mempool") - - // Generate inventory message with the available transactions in the - // transaction memory pool. Limit it to the max allowed inventory - // per message. The NewMsgInvSizeHint function automatically limits - // the passed hint to the maximum allowed, so it's safe to pass it - // without double checking it here. - txMemPool := sp.server.TxMemPool - txDescs := txMemPool.TxDescs() - invMsg := wire.NewMsgInvSizeHint(uint(len(txDescs))) - - for _, txDesc := range txDescs { - // Either add all transactions when there is no bloom filter, - // or only the transactions that match the filter when there is - // one. - if !sp.filter.IsLoaded() || sp.filter.MatchTxAndUpdate(txDesc.Tx) { - iv := wire.NewInvVect(wire.InvTypeTx, (*daghash.Hash)(txDesc.Tx.ID())) - invMsg.AddInvVect(iv) - if len(invMsg.InvList)+1 > wire.MaxInvPerMsg { - break - } - } - } - - // Send the inventory message if there is anything to send. - if len(invMsg.InvList) > 0 { - sp.QueueMessage(invMsg, nil) - } -} - -// OnTx is invoked when a peer receives a tx bitcoin message. It blocks -// until the bitcoin transaction has been fully processed. Unlock the block -// handler this does not serialize all transactions through a single thread -// transactions don't rely on the previous one in a linear fashion like blocks. -func (sp *Peer) OnTx(_ *peer.Peer, msg *wire.MsgTx) { - if config.MainConfig().BlocksOnly { - peerLog.Tracef("Ignoring tx %s from %s - blocksonly enabled", - msg.TxID(), sp) - return - } - - // Add the transaction to the known inventory for the peer. - // Convert the raw MsgTx to a util.Tx which provides some convenience - // methods and things such as hash caching. - tx := util.NewTx(msg) - iv := wire.NewInvVect(wire.InvTypeTx, (*daghash.Hash)(tx.ID())) - sp.AddKnownInventory(iv) - - // Queue the transaction up to be handled by the sync manager and - // intentionally block further receives until the transaction is fully - // processed and known good or bad. This helps prevent a malicious peer - // from queuing up a bunch of bad transactions before disconnecting (or - // being disconnected) and wasting memory. - sp.server.SyncManager.QueueTx(tx, sp.Peer, sp.txProcessed) - <-sp.txProcessed -} - -// OnBlock is invoked when a peer receives a block bitcoin message. It -// blocks until the bitcoin block has been fully processed. -func (sp *Peer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) { - // Convert the raw MsgBlock to a util.Block which provides some - // convenience methods and things such as hash caching. - block := util.NewBlockFromBlockAndBytes(msg, buf) - - // Add the block to the known inventory for the peer. - iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash()) - sp.AddKnownInventory(iv) - - // Queue the block up to be handled by the block - // manager and intentionally block further receives - // until the bitcoin block is fully processed and known - // good or bad. This helps prevent a malicious peer - // from queuing up a bunch of bad blocks before - // disconnecting (or being disconnected) and wasting - // memory. Additionally, this behavior is depended on - // by at least the block acceptance test tool as the - // reference implementation processes blocks in the same - // thread and therefore blocks further messages until - // the bitcoin block has been fully processed. - sp.server.SyncManager.QueueBlock(block, sp.Peer, false, sp.blockProcessed) - <-sp.blockProcessed -} - -// OnInv is invoked when a peer receives an inv bitcoin message and is -// used to examine the inventory being advertised by the remote peer and react -// accordingly. We pass the message down to blockmanager which will call -// QueueMessage with any appropriate responses. -func (sp *Peer) OnInv(_ *peer.Peer, msg *wire.MsgInv) { - if !config.MainConfig().BlocksOnly { - if len(msg.InvList) > 0 { - sp.server.SyncManager.QueueInv(msg, sp.Peer) - } - return - } - - newInv := wire.NewMsgInvSizeHint(uint(len(msg.InvList))) - for _, invVect := range msg.InvList { - if invVect.Type == wire.InvTypeTx { - peerLog.Tracef("Ignoring tx %s in inv from %s -- "+ - "blocksonly enabled", invVect.Hash, sp) - peerLog.Infof("Peer %s is announcing "+ - "transactions -- disconnecting", sp) - sp.Disconnect() - return - } - err := newInv.AddInvVect(invVect) - if err != nil { - peerLog.Errorf("Failed to add inventory vector: %s", err) - break - } - } - - if len(newInv.InvList) > 0 { - sp.server.SyncManager.QueueInv(newInv, sp.Peer) - } -} - -// OnHeaders is invoked when a peer receives a headers bitcoin -// message. The message is passed down to the sync manager. -func (sp *Peer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) { - sp.server.SyncManager.QueueHeaders(msg, sp.Peer) -} - -// OnGetData is invoked when a peer receives a getdata bitcoin message and -// is used to deliver block and transaction information. -func (sp *Peer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) { - numAdded := 0 - notFound := wire.NewMsgNotFound() - - length := len(msg.InvList) - // A decaying ban score increase is applied to prevent exhausting resources - // with unusually large inventory queries. - // Requesting more than the maximum inventory vector length within a short - // period of time yields a score above the default ban threshold. Sustained - // bursts of small requests are not penalized as that would potentially ban - // peers performing IBD. - // This incremental score decays each minute to half of its value. - sp.addBanScore(0, uint32(length)*99/wire.MaxInvPerMsg, "getdata") - - // We wait on this wait channel periodically to prevent queuing - // far more data than we can send in a reasonable time, wasting memory. - // The waiting occurs after the database fetch for the next one to - // provide a little pipelining. - var waitChan chan struct{} - doneChan := make(chan struct{}, 1) - - for i, iv := range msg.InvList { - var c chan struct{} - // If this will be the last message we send. - if i == length-1 && len(notFound.InvList) == 0 { - c = doneChan - } else if (i+1)%3 == 0 { - // Buffered so as to not make the send goroutine block. - c = make(chan struct{}, 1) - } - var err error - switch iv.Type { - case wire.InvTypeTx: - err = sp.server.pushTxMsg(sp, (*daghash.TxID)(iv.Hash), c, waitChan) - case wire.InvTypeSyncBlock: - fallthrough - case wire.InvTypeBlock: - err = sp.server.pushBlockMsg(sp, iv.Hash, c, waitChan) - case wire.InvTypeFilteredBlock: - err = sp.server.pushMerkleBlockMsg(sp, iv.Hash, c, waitChan) - default: - peerLog.Warnf("Unknown type in inventory request %d", - iv.Type) - continue - } - if err != nil { - notFound.AddInvVect(iv) - - // When there is a failure fetching the final entry - // and the done channel was sent in due to there - // being no outstanding not found inventory, consume - // it here because there is now not found inventory - // that will use the channel momentarily. - if i == len(msg.InvList)-1 && c != nil { - <-c - } - } - numAdded++ - waitChan = c - } - if len(notFound.InvList) != 0 { - sp.QueueMessage(notFound, doneChan) - } - - // Wait for messages to be sent. We can send quite a lot of data at this - // point and this will keep the peer busy for a decent amount of time. - // We don't process anything else by them in this time so that we - // have an idea of when we should hear back from them - else the idle - // timeout could fire when we were only half done sending the blocks. - if numAdded > 0 { - <-doneChan - } -} - -// OnGetBlockLocator is invoked when a peer receives a getlocator bitcoin -// message. -func (sp *Peer) OnGetBlockLocator(_ *peer.Peer, msg *wire.MsgGetBlockLocator) { - locator := sp.server.DAG.BlockLocatorFromHashes(msg.StartHash, msg.StopHash) - - if len(locator) == 0 { - peerLog.Infof("Couldn't build a block locator between blocks %s and %s"+ - " that was requested from peer %s", - sp) - return - } - err := sp.PushBlockLocatorMsg(locator) - if err != nil { - peerLog.Errorf("Failed to send block locator message to peer %s: %s", - sp, err) - return - } -} - -// OnBlockLocator is invoked when a peer receives a locator bitcoin -// message. -func (sp *Peer) OnBlockLocator(_ *peer.Peer, msg *wire.MsgBlockLocator) { - // Find the highest known shared block between the peers, and asks - // the block and its future from the peer. If the block is not - // found, create a lower resolution block locator and send it to - // the peer in order to find it in the next iteration. - dag := sp.server.DAG - if len(msg.BlockLocatorHashes) == 0 { - peerLog.Warnf("Got empty block locator from peer %s", - sp) - return - } - // If the first hash of the block locator is known, it means we found - // the highest shared block. - firstHash := msg.BlockLocatorHashes[0] - if dag.BlockExists(firstHash) { - if dag.IsKnownFinalizedBlock(firstHash) { - peerLog.Warnf("Cannot sync with peer %s because the highest"+ - " shared chain block (%s) is below the finality point", sp, firstHash) - sp.Disconnect() - return - } - err := sp.server.SyncManager.PushGetBlockInvsOrHeaders(sp.Peer, firstHash) - if err != nil { - peerLog.Errorf("Failed pushing get blocks message for peer %s: %s", - sp, err) - return - } - return - } - startHash, stopHash := dag.FindNextLocatorBoundaries(msg.BlockLocatorHashes) - if startHash == nil { - panic("Couldn't find any unknown hashes in the block locator.") - } - sp.PushGetBlockLocatorMsg(startHash, stopHash) -} - -// OnGetBlockInvs is invoked when a peer receives a getblockinvs bitcoin -// message. -// It finds the blue future between msg.StartHash and msg.StopHash -// and send the invs to the requesting peer. -func (sp *Peer) OnGetBlockInvs(_ *peer.Peer, msg *wire.MsgGetBlockInvs) { - dag := sp.server.DAG - // We want to prevent a situation where the syncing peer needs - // to call getblocks once again, but the block we sent him - // won't affect his selected chain, so next time it'll try - // to find the highest shared chain block, it'll find the - // same one as before. - // To prevent that we use blockdag.FinalityInterval as maxHashes. - // This way, if one getblocks is not enough to get the peer - // synced, we can know for sure that its selected chain will - // change, so we'll have higher shared chain block. - hashList := dag.GetBlueBlocksHashesBetween(msg.StartHash, msg.StopHash, - wire.MaxInvPerMsg) - - // Generate inventory message. - invMsg := wire.NewMsgInv() - for i := range hashList { - iv := wire.NewInvVect(wire.InvTypeSyncBlock, hashList[i]) - invMsg.AddInvVect(iv) - } - - // Send the inventory message if there is anything to send. - if len(invMsg.InvList) > 0 { - sp.QueueMessage(invMsg, nil) - } -} - -// OnGetHeaders is invoked when a peer receives a getheaders bitcoin -// message. -func (sp *Peer) OnGetHeaders(_ *peer.Peer, msg *wire.MsgGetHeaders) { - // Ignore getheaders requests if not in sync. - if !sp.server.SyncManager.IsCurrent() { - return - } - - // Find the most recent known block in the best chain based on the block - // locator and fetch all of the headers after it until either - // wire.MaxBlockHeadersPerMsg have been fetched or the provided stop - // hash is encountered. - // - // Use the block after the genesis block if no other blocks in the - // provided locator are known. This does mean the client will start - // over with the genesis block if unknown block locators are provided. - dag := sp.server.DAG - headers := dag.GetBlueBlocksHeadersBetween(msg.StartHash, msg.StopHash) - - // Send found headers to the requesting peer. - blockHeaders := make([]*wire.BlockHeader, len(headers)) - for i := range headers { - blockHeaders[i] = headers[i] - } - sp.QueueMessage(&wire.MsgHeaders{Headers: blockHeaders}, nil) -} - -// OnGetCFilters is invoked when a peer receives a getcfilters bitcoin message. -func (sp *Peer) OnGetCFilters(_ *peer.Peer, msg *wire.MsgGetCFilters) { - // Ignore getcfilters requests if not in sync. - if !sp.server.SyncManager.IsCurrent() { - return - } - - hashes, err := sp.server.DAG.HeightToHashRange(msg.StartHeight, - msg.StopHash, wire.MaxGetCFiltersReqRange) - if err != nil { - peerLog.Debugf("Invalid getcfilters request: %s", err) - return - } - - filters, err := sp.server.CfIndex.FiltersByBlockHashes(hashes, - msg.FilterType) - if err != nil { - peerLog.Errorf("Error retrieving cfilters: %s", err) - return - } - - for i, filterBytes := range filters { - if len(filterBytes) == 0 { - peerLog.Warnf("Could not obtain cfilter for %s", hashes[i]) - return - } - filterMsg := wire.NewMsgCFilter(msg.FilterType, hashes[i], filterBytes) - sp.QueueMessage(filterMsg, nil) - } -} - -// OnGetCFHeaders is invoked when a peer receives a getcfheader bitcoin message. -func (sp *Peer) OnGetCFHeaders(_ *peer.Peer, msg *wire.MsgGetCFHeaders) { - // Ignore getcfilterheader requests if not in sync. - if !sp.server.SyncManager.IsCurrent() { - return - } - - startHeight := msg.StartHeight - maxResults := wire.MaxCFHeadersPerMsg - - // If StartHeight is positive, fetch the predecessor block hash so we can - // populate the PrevFilterHeader field. - if msg.StartHeight > 0 { - startHeight-- - maxResults++ - } - - // Fetch the hashes from the block index. - hashList, err := sp.server.DAG.HeightToHashRange(startHeight, - msg.StopHash, maxResults) - if err != nil { - peerLog.Debugf("Invalid getcfheaders request: %s", err) - } - - // This is possible if StartHeight is one greater that the height of - // StopHash, and we pull a valid range of hashes including the previous - // filter header. - if len(hashList) == 0 || (msg.StartHeight > 0 && len(hashList) == 1) { - peerLog.Debug("No results for getcfheaders request") - return - } - - // Fetch the raw filter hash bytes from the database for all blocks. - filterHashes, err := sp.server.CfIndex.FilterHashesByBlockHashes(hashList, - msg.FilterType) - if err != nil { - peerLog.Errorf("Error retrieving cfilter hashes: %s", err) - return - } - - // Generate cfheaders message and send it. - headersMsg := wire.NewMsgCFHeaders() - - // Populate the PrevFilterHeader field. - if msg.StartHeight > 0 { - parentHash := hashList[0] - - // Fetch the raw committed filter header bytes from the - // database. - headerBytes, err := sp.server.CfIndex.FilterHeaderByBlockHash( - parentHash, msg.FilterType) - if err != nil { - peerLog.Errorf("Error retrieving CF header: %s", err) - return - } - if len(headerBytes) == 0 { - peerLog.Warnf("Could not obtain CF header for %s", parentHash) - return - } - - // Deserialize the hash into PrevFilterHeader. - err = headersMsg.PrevFilterHeader.SetBytes(headerBytes) - if err != nil { - peerLog.Warnf("Committed filter header deserialize "+ - "failed: %s", err) - return - } - - hashList = hashList[1:] - filterHashes = filterHashes[1:] - } - - // Populate HeaderHashes. - for i, hashBytes := range filterHashes { - if len(hashBytes) == 0 { - peerLog.Warnf("Could not obtain CF hash for %s", hashList[i]) - return - } - - // Deserialize the hash. - filterHash, err := daghash.NewHash(hashBytes) - if err != nil { - peerLog.Warnf("Committed filter hash deserialize "+ - "failed: %s", err) - return - } - - headersMsg.AddCFHash(filterHash) - } - - headersMsg.FilterType = msg.FilterType - headersMsg.StopHash = msg.StopHash - sp.QueueMessage(headersMsg, nil) -} - -// OnGetCFCheckpt is invoked when a peer receives a getcfcheckpt bitcoin message. -func (sp *Peer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) { - // Ignore getcfcheckpt requests if not in sync. - if !sp.server.SyncManager.IsCurrent() { - return - } - - blockHashes, err := sp.server.DAG.IntervalBlockHashes(msg.StopHash, - wire.CFCheckptInterval) - if err != nil { - peerLog.Debugf("Invalid getcfilters request: %s", err) - return - } - - var updateCache bool - var checkptCache []cfHeaderKV - - if len(blockHashes) > len(checkptCache) { - // Update the cache if the checkpoint chain is longer than the cached - // one. This ensures that the cache is relatively stable and mostly - // overlaps with the best chain, since it follows the longest chain - // heuristic. - updateCache = true - - // Take write lock because we are going to update cache. - sp.server.cfCheckptCachesMtx.Lock() - defer sp.server.cfCheckptCachesMtx.Unlock() - - // Grow the checkptCache to be the length of blockHashes. - additionalLength := len(blockHashes) - len(checkptCache) - checkptCache = append(sp.server.cfCheckptCaches[msg.FilterType], - make([]cfHeaderKV, additionalLength)...) - } else { - updateCache = false - - // Take reader lock because we are not going to update cache. - sp.server.cfCheckptCachesMtx.RLock() - defer sp.server.cfCheckptCachesMtx.RUnlock() - - checkptCache = sp.server.cfCheckptCaches[msg.FilterType] - } - - // Iterate backwards until the block hash is found in the cache. - var forkIdx int - for forkIdx = len(checkptCache); forkIdx > 0; forkIdx-- { - if checkptCache[forkIdx-1].blockHash.IsEqual(blockHashes[forkIdx-1]) { - break - } - } - - // Populate results with cached checkpoints. - checkptMsg := wire.NewMsgCFCheckpt(msg.FilterType, msg.StopHash, - len(blockHashes)) - for i := 0; i < forkIdx; i++ { - checkptMsg.AddCFHeader(checkptCache[i].filterHeader) - } - - // Look up any filter headers that aren't cached. - blockHashPtrs := make([]*daghash.Hash, 0, len(blockHashes)-forkIdx) - for i := forkIdx; i < len(blockHashes); i++ { - blockHashPtrs = append(blockHashPtrs, blockHashes[i]) - } - - filterHeaders, err := sp.server.CfIndex.FilterHeadersByBlockHashes(blockHashPtrs, - msg.FilterType) - if err != nil { - peerLog.Errorf("Error retrieving cfilter headers: %s", err) - return - } - - for i, filterHeaderBytes := range filterHeaders { - if len(filterHeaderBytes) == 0 { - peerLog.Warnf("Could not obtain CF header for %s", blockHashPtrs[i]) - return - } - - filterHeader, err := daghash.NewHash(filterHeaderBytes) - if err != nil { - peerLog.Warnf("Committed filter header deserialize "+ - "failed: %s", err) - return - } - - checkptMsg.AddCFHeader(filterHeader) - if updateCache { - checkptCache[forkIdx+i] = cfHeaderKV{ - blockHash: blockHashes[forkIdx+i], - filterHeader: filterHeader, - } - } - } - - if updateCache { - sp.server.cfCheckptCaches[msg.FilterType] = checkptCache - } - - sp.QueueMessage(checkptMsg, nil) -} - // enforceNodeBloomFlag disconnects the peer if the server is not configured to // allow bloom filters. Additionally, if the peer has negotiated to a protocol // version that is high enough to observe the bloom filter service support bit, @@ -1053,174 +440,6 @@ func (sp *Peer) enforceNodeBloomFlag(cmd string) bool { return true } -// OnFeeFilter is invoked when a peer receives a feefilter bitcoin message and -// is used by remote peers to request that no transactions which have a fee rate -// lower than provided value are inventoried to them. The peer will be -// disconnected if an invalid fee filter value is provided. -func (sp *Peer) OnFeeFilter(_ *peer.Peer, msg *wire.MsgFeeFilter) { - // Check that the passed minimum fee is a valid amount. - if msg.MinFee < 0 || msg.MinFee > util.MaxSatoshi { - peerLog.Debugf("Peer %s sent an invalid feefilter '%s' -- "+ - "disconnecting", sp, util.Amount(msg.MinFee)) - sp.Disconnect() - return - } - - atomic.StoreInt64(&sp.FeeFilterInt, msg.MinFee) -} - -// OnFilterAdd is invoked when a peer receives a filteradd bitcoin -// message and is used by remote peers to add data to an already loaded bloom -// filter. The peer will be disconnected if a filter is not loaded when this -// message is received or the server is not configured to allow bloom filters. -func (sp *Peer) OnFilterAdd(_ *peer.Peer, msg *wire.MsgFilterAdd) { - // Disconnect and/or ban depending on the node bloom services flag and - // negotiated protocol version. - if !sp.enforceNodeBloomFlag(msg.Command()) { - return - } - - if sp.filter.IsLoaded() { - peerLog.Debugf("%s sent a filteradd request with no filter "+ - "loaded -- disconnecting", sp) - sp.Disconnect() - return - } - - sp.filter.Add(msg.Data) -} - -// OnFilterClear is invoked when a peer receives a filterclear bitcoin -// message and is used by remote peers to clear an already loaded bloom filter. -// The peer will be disconnected if a filter is not loaded when this message is -// received or the server is not configured to allow bloom filters. -func (sp *Peer) OnFilterClear(_ *peer.Peer, msg *wire.MsgFilterClear) { - // Disconnect and/or ban depending on the node bloom services flag and - // negotiated protocol version. - if !sp.enforceNodeBloomFlag(msg.Command()) { - return - } - - if !sp.filter.IsLoaded() { - peerLog.Debugf("%s sent a filterclear request with no "+ - "filter loaded -- disconnecting", sp) - sp.Disconnect() - return - } - - sp.filter.Unload() -} - -// OnFilterLoad is invoked when a peer receives a filterload bitcoin -// message and it used to load a bloom filter that should be used for -// delivering merkle blocks and associated transactions that match the filter. -// The peer will be disconnected if the server is not configured to allow bloom -// filters. -func (sp *Peer) OnFilterLoad(_ *peer.Peer, msg *wire.MsgFilterLoad) { - // Disconnect and/or ban depending on the node bloom services flag and - // negotiated protocol version. - if !sp.enforceNodeBloomFlag(msg.Command()) { - return - } - - sp.setDisableRelayTx(false) - - sp.filter.Reload(msg) -} - -// OnGetAddr is invoked when a peer receives a getaddr bitcoin message -// and is used to provide the peer with known addresses from the address -// manager. -func (sp *Peer) OnGetAddr(_ *peer.Peer, msg *wire.MsgGetAddr) { - // Don't return any addresses when running on the simulation test - // network. This helps prevent the network from becoming another - // public test network since it will not be able to learn about other - // peers that have not specifically been provided. - if config.MainConfig().SimNet { - return - } - - // Do not accept getaddr requests from outbound peers. This reduces - // fingerprinting attacks. - if !sp.Inbound() { - peerLog.Debugf("Ignoring getaddr request from outbound peer ", - "%s", sp) - return - } - - // Only allow one getaddr request per connection to discourage - // address stamping of inv announcements. - if sp.sentAddrs { - peerLog.Debugf("Ignoring repeated getaddr request from peer ", - "%s", sp) - return - } - sp.sentAddrs = true - - // Get the current known addresses from the address manager. - addrCache := sp.server.addrManager.AddressCache(msg.IncludeAllSubnetworks, msg.SubnetworkID) - - // Push the addresses. - sp.pushAddrMsg(addrCache, sp.SubnetworkID()) -} - -// OnAddr is invoked when a peer receives an addr bitcoin message and is -// used to notify the server about advertised addresses. -func (sp *Peer) OnAddr(_ *peer.Peer, msg *wire.MsgAddr) { - // Ignore addresses when running on the simulation test network. This - // helps prevent the network from becoming another public test network - // since it will not be able to learn about other peers that have not - // specifically been provided. - if config.MainConfig().SimNet { - return - } - - // A message that has no addresses is invalid. - if len(msg.AddrList) == 0 { - peerLog.Errorf("Command [%s] from %s does not contain any addresses", - msg.Command(), sp.Peer) - sp.Disconnect() - return - } - - if msg.IncludeAllSubnetworks { - peerLog.Errorf("Got unexpected IncludeAllSubnetworks=true in [%s] command from %s", - msg.Command(), sp.Peer) - sp.Disconnect() - return - } else if !msg.SubnetworkID.IsEqual(config.MainConfig().SubnetworkID) && msg.SubnetworkID != nil { - peerLog.Errorf("Only full nodes and %s subnetwork IDs are allowed in [%s] command, but got subnetwork ID %s from %s", - config.MainConfig().SubnetworkID, msg.Command(), msg.SubnetworkID, sp.Peer) - sp.Disconnect() - return - } - - for _, na := range msg.AddrList { - // Don't add more address if we're disconnecting. - if !sp.Connected() { - return - } - - // Set the timestamp to 5 days ago if it's more than 24 hours - // in the future so this address is one of the first to be - // removed when space is needed. - now := time.Now() - if na.Timestamp.After(now.Add(time.Minute * 10)) { - na.Timestamp = now.Add(-1 * time.Hour * 24 * 5) - } - - // Add address to known addresses for this peer. - sp.addKnownAddresses([]*wire.NetAddress{na}) - } - - // Add addresses to server address manager. The address manager handles - // the details of things such as preventing duplicate addresses, max - // addresses, and last seen updates. - // XXX bitcoind gives a 2 hour time penalty here, do we want to do the - // same? - sp.server.addrManager.AddAddresses(msg.AddrList, sp.NA(), msg.SubnetworkID) -} - // OnRead is invoked when a peer receives a message and it is used to update // the bytes received by the server. func (sp *Peer) OnRead(_ *peer.Peer, bytesRead int, msg wire.Message, err error) {