// Copyright (c) 2013-2016 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

package peer

import (
	"bytes"
	"container/list"
	"fmt"
	"io"
	"math/rand"
	"net"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/pkg/errors"

	"github.com/kaspanet/kaspad/util/random"
	"github.com/kaspanet/kaspad/util/subnetworkid"

	"github.com/btcsuite/go-socks/socks"
	"github.com/davecgh/go-spew/spew"
	"github.com/kaspanet/kaspad/blockdag"
	"github.com/kaspanet/kaspad/dagconfig"
	"github.com/kaspanet/kaspad/logger"
	"github.com/kaspanet/kaspad/util/daghash"
	"github.com/kaspanet/kaspad/wire"
)

const (
	// MaxProtocolVersion is the max protocol version the peer supports.
	MaxProtocolVersion = wire.ProtocolVersion

	// minAcceptableProtocolVersion is the lowest protocol version that a
	// connected peer may support.
	minAcceptableProtocolVersion = wire.ProtocolVersion

	// outputBufferSize is the number of elements the output channels use.
	outputBufferSize = 50

	// maxInvTrickleSize is the maximum amount of inventory to send in a
	// single message when trickling inventory to remote peers.
	maxInvTrickleSize = 1000

	// maxKnownInventory is the maximum number of items to keep in the known
	// inventory cache.
	maxKnownInventory = 1000

	// pingInterval is the interval of time to wait in between sending ping
	// messages.
	pingInterval = 2 * time.Minute

	// negotiateTimeout is the duration of inactivity before we timeout a
	// peer that hasn't completed the initial version negotiation.
	negotiateTimeout = 30 * time.Second

	// idleTimeout is the duration of inactivity before we time out a peer.
	idleTimeout = 5 * time.Minute

	// stallTickInterval is the interval of time between each check for
	// stalled peers.
	stallTickInterval = 15 * time.Second

	// stallResponseTimeout is the base maximum amount of time messages that
	// expect a response will wait before disconnecting the peer for
	// stalling. The deadlines are adjusted for callback running times and
	// only checked on each stall tick interval.
	stallResponseTimeout = 30 * time.Second

	// trickleTimeout is the duration of the ticker which trickles down the
	// inventory to a peer.
	trickleTimeout = 100 * time.Millisecond
)

var (
	// nodeCount is the total number of peer connections made since startup
	// and is used to assign an id to a peer.
	nodeCount int32

	// sentNonces houses the unique nonces that are generated when pushing
	// version messages that are used to detect self connections.
	sentNonces = newMruNonceMap(50)

	// allowSelfConns is only used to allow the tests to bypass the self
	// connection detecting and disconnect logic since they intentionally
	// do so for testing purposes.
	allowSelfConns bool
)

// MessageListeners defines callback function pointers to invoke with message
// listeners for a peer. Any listener which is not set to a concrete callback
// during peer initialization is ignored. Execution of multiple message
// listeners occurs serially, so one callback blocks the execution of the next.
//
// NOTE: Unless otherwise documented, these listeners must NOT directly call any
// blocking calls (such as WaitForShutdown) on the peer instance since the input
// handler goroutine blocks until the callback has completed. Doing so will
// result in a deadlock.
type MessageListeners struct {
	// OnGetAddr is invoked when a peer receives a getaddr kaspa message.
	OnGetAddr func(p *Peer, msg *wire.MsgGetAddr)

	// OnAddr is invoked when a peer receives an addr kaspa message.
OnAddr func(p *Peer, msg *wire.MsgAddr) // OnPing is invoked when a peer receives a ping kaspa message. OnPing func(p *Peer, msg *wire.MsgPing) // OnPong is invoked when a peer receives a pong kaspa message. OnPong func(p *Peer, msg *wire.MsgPong) // OnTx is invoked when a peer receives a tx kaspa message. OnTx func(p *Peer, msg *wire.MsgTx) // OnBlock is invoked when a peer receives a block kaspa message. OnBlock func(p *Peer, msg *wire.MsgBlock, buf []byte) // OnInv is invoked when a peer receives an inv kaspa message. OnInv func(p *Peer, msg *wire.MsgInv) // OnGetBlockLocator is invoked when a peer receives a getlocator kaspa message. OnGetBlockLocator func(p *Peer, msg *wire.MsgGetBlockLocator) // OnBlockLocator is invoked when a peer receives a locator kaspa message. OnBlockLocator func(p *Peer, msg *wire.MsgBlockLocator) // OnNotFound is invoked when a peer receives a notfound kaspa // message. OnNotFound func(p *Peer, msg *wire.MsgNotFound) // OnGetData is invoked when a peer receives a getdata kaspa message. OnGetData func(p *Peer, msg *wire.MsgGetData) // OnGetBlockInvs is invoked when a peer receives a getblockinvs kaspa // message. OnGetBlockInvs func(p *Peer, msg *wire.MsgGetBlockInvs) // OnFeeFilter is invoked when a peer receives a feefilter kaspa message. OnFeeFilter func(p *Peer, msg *wire.MsgFeeFilter) // OnFilterAdd is invoked when a peer receives a filteradd kaspa message. OnFilterAdd func(p *Peer, msg *wire.MsgFilterAdd) // OnFilterClear is invoked when a peer receives a filterclear kaspa // message. OnFilterClear func(p *Peer, msg *wire.MsgFilterClear) // OnFilterLoad is invoked when a peer receives a filterload kaspa // message. OnFilterLoad func(p *Peer, msg *wire.MsgFilterLoad) // OnMerkleBlock is invoked when a peer receives a merkleblock kaspa // message. OnMerkleBlock func(p *Peer, msg *wire.MsgMerkleBlock) // OnVersion is invoked when a peer receives a version kaspa message. OnVersion func(p *Peer, msg *wire.MsgVersion) // OnVerAck is invoked when a peer receives a verack kaspa message. OnVerAck func(p *Peer, msg *wire.MsgVerAck) // OnReject is invoked when a peer receives a reject kaspa message. OnReject func(p *Peer, msg *wire.MsgReject) // OnGetSelectedTip is invoked when a peer receives a getSelectedTip kaspa // message. OnGetSelectedTip func() // OnSelectedTip is invoked when a peer receives a selectedTip kaspa // message. OnSelectedTip func(p *Peer, msg *wire.MsgSelectedTip) // OnRead is invoked when a peer receives a kaspa message. It // consists of the number of bytes read, the message, and whether or not // an error in the read occurred. Typically, callers will opt to use // the callbacks for the specific message types, however this can be // useful for circumstances such as keeping track of server-wide byte // counts or working with custom message types for which the peer does // not directly provide a callback. OnRead func(p *Peer, bytesRead int, msg wire.Message, err error) // OnWrite is invoked when we write a kaspa message to a peer. It // consists of the number of bytes written, the message, and whether or // not an error in the write occurred. This can be useful for // circumstances such as keeping track of server-wide byte counts. OnWrite func(p *Peer, bytesWritten int, msg wire.Message, err error) } // Config is the struct to hold configuration options useful to Peer. type Config struct { // SelectedTipHash specifies a callback which provides the selected tip // to the peer as needed. 
	SelectedTipHash func() *daghash.Hash

	// IsInDAG determines whether a block with the given hash exists in
	// the DAG.
	IsInDAG func(*daghash.Hash) bool

	// HostToNetAddress returns the netaddress for the given host. This can be
	// nil in which case the host will be parsed as an IP address.
	HostToNetAddress HostToNetAddrFunc

	// Proxy indicates a proxy is being used for connections. The only
	// effect this has is to prevent leaking the tor proxy address, so it
	// only needs to be specified if using a tor proxy.
	Proxy string

	// UserAgentName specifies the user agent name to advertise. It is
	// highly recommended to specify this value.
	UserAgentName string

	// UserAgentVersion specifies the user agent version to advertise. It
	// is highly recommended to specify this value and that it follows the
	// form "major.minor.revision" e.g. "2.6.41".
	UserAgentVersion string

	// UserAgentComments specify the user agent comments to advertise. These
	// values must not contain the illegal characters specified in BIP 14:
	// '/', ':', '(', ')'.
	UserAgentComments []string

	// DAGParams identifies which DAG parameters the peer is associated
	// with. It is highly recommended to specify this field, however it can
	// be omitted in which case the test network will be used.
	DAGParams *dagconfig.Params

	// Services specifies which services to advertise as supported by the
	// local peer. This field can be omitted in which case it will be 0
	// and therefore advertise no supported services.
	Services wire.ServiceFlag

	// ProtocolVersion specifies the maximum protocol version to use and
	// advertise. This field can be omitted in which case
	// peer.MaxProtocolVersion will be used.
	ProtocolVersion uint32

	// DisableRelayTx specifies if the remote peer should be informed to
	// not send inv messages for transactions.
	DisableRelayTx bool

	// Listeners houses callback functions to be invoked on receiving peer
	// messages.
	Listeners MessageListeners

	// SubnetworkID specifies which subnetwork the peer is associated with.
	// It is nil in full nodes.
	SubnetworkID *subnetworkid.SubnetworkID
}

// minUint32 is a helper function to return the minimum of two uint32s.
// This avoids a math import and the need to cast to floats.
func minUint32(a, b uint32) uint32 {
	if a < b {
		return a
	}
	return b
}

// newNetAddress attempts to extract the IP address and port from the passed
// net.Addr interface and create a kaspa NetAddress structure using that
// information.
func newNetAddress(addr net.Addr, services wire.ServiceFlag) (*wire.NetAddress, error) {
	// addr will be a net.TCPAddr when not using a proxy.
	if tcpAddr, ok := addr.(*net.TCPAddr); ok {
		ip := tcpAddr.IP
		port := uint16(tcpAddr.Port)
		na := wire.NewNetAddressIPPort(ip, port, services)
		return na, nil
	}

	// addr will be a socks.ProxiedAddr when using a proxy.
	if proxiedAddr, ok := addr.(*socks.ProxiedAddr); ok {
		ip := net.ParseIP(proxiedAddr.Host)
		if ip == nil {
			ip = net.ParseIP("0.0.0.0")
		}
		port := uint16(proxiedAddr.Port)
		na := wire.NewNetAddressIPPort(ip, port, services)
		return na, nil
	}

	// For the most part, addr should be one of the two above cases, but
	// to be safe, fall back to trying to parse the information from the
	// address string as a last resort.
	host, portStr, err := net.SplitHostPort(addr.String())
	if err != nil {
		return nil, err
	}
	ip := net.ParseIP(host)
	port, err := strconv.ParseUint(portStr, 10, 16)
	if err != nil {
		return nil, err
	}
	na := wire.NewNetAddressIPPort(ip, uint16(port), services)
	return na, nil
}

// outMsg is used to house a message to be sent along with a channel to signal
// when the message has been sent (or won't be sent due to things such as
// shutdown).
type outMsg struct {
	msg      wire.Message
	doneChan chan<- struct{}
}

// stallControlCmd represents the command of a stall control message.
type stallControlCmd uint8

// Constants for the command of a stall control message.
const (
	// sccSendMessage indicates a message is being sent to the remote peer.
	sccSendMessage stallControlCmd = iota

	// sccReceiveMessage indicates a message has been received from the
	// remote peer.
	sccReceiveMessage

	// sccHandlerStart indicates a callback handler is about to be invoked.
	sccHandlerStart

	// sccHandlerDone indicates a callback handler has completed.
	sccHandlerDone
)

// stallControlMsg is used to signal the stall handler about specific events
// so it can properly detect and handle stalled remote peers.
type stallControlMsg struct {
	command stallControlCmd
	message wire.Message
}

// StatsSnap is a snapshot of peer stats at a point in time.
type StatsSnap struct {
	ID              int32
	Addr            string
	Services        wire.ServiceFlag
	LastSend        time.Time
	LastRecv        time.Time
	BytesSent       uint64
	BytesRecv       uint64
	ConnTime        time.Time
	TimeOffset      int64
	Version         uint32
	UserAgent       string
	Inbound         bool
	SelectedTipHash *daghash.Hash
	LastPingNonce   uint64
	LastPingTime    time.Time
	LastPingMicros  int64
}

// HostToNetAddrFunc is a func which takes a host, port, services and returns
// the netaddress.
type HostToNetAddrFunc func(host string, port uint16,
	services wire.ServiceFlag) (*wire.NetAddress, error)

// NOTE: The overall data flow of a peer is split into 3 goroutines. Inbound
// messages are read via the inHandler goroutine and generally dispatched to
// their own handler. For inbound data-related messages such as blocks,
// transactions, and inventory, the data is handled by the corresponding
// message handlers. The data flow for outbound messages is split into 2
// goroutines, queueHandler and outHandler. The first, queueHandler, is used
// as a way for external entities to queue messages, by way of the QueueMessage
// function, quickly regardless of whether the peer is currently sending or not.
// It acts as the traffic cop between the external world and the actual
// goroutine which writes to the network socket.

// Peer provides a basic concurrent safe kaspa peer for handling kaspa
// communications via the peer-to-peer protocol. It provides full duplex
// reading and writing, automatic handling of the initial handshake process,
// querying of usage statistics and other information about the remote peer such
// as its address, user agent, and protocol version, output message queuing,
// inventory trickling, and the ability to dynamically register and unregister
// callbacks for handling kaspa protocol messages.
//
// Outbound messages are typically queued via QueueMessage or QueueInventory.
// QueueMessage is intended for all messages, including responses to data such
// as blocks and transactions. QueueInventory, on the other hand, is only
// intended for relaying inventory as it employs a trickling mechanism to batch
// the inventory together.
However, some helper functions for pushing messages // of specific types that typically require common special handling are // provided as a convenience. type Peer struct { // The following variables must only be used atomically. bytesReceived uint64 bytesSent uint64 lastRecv int64 lastSend int64 connected int32 disconnect int32 conn net.Conn // These fields are set at creation time and never modified, so they are // safe to read from concurrently without a mutex. addr string cfg Config inbound bool flagsMtx sync.Mutex // protects the peer flags below na *wire.NetAddress id int32 userAgent string services wire.ServiceFlag versionKnown bool advertisedProtoVer uint32 // protocol version advertised by remote protocolVersion uint32 // negotiated protocol version verAckReceived bool knownInventory *mruInventoryMap prevGetBlockInvsMtx sync.Mutex prevGetBlockInvsLow *daghash.Hash prevGetBlockInvsHigh *daghash.Hash // These fields keep track of statistics for the peer and are protected // by the statsMtx mutex. statsMtx sync.RWMutex timeOffset int64 timeConnected time.Time selectedTipHash *daghash.Hash lastPingNonce uint64 // Set to nonce if we have a pending ping. lastPingTime time.Time // Time we sent last ping. lastPingMicros int64 // Time for last ping to return. stallControl chan stallControlMsg outputQueue chan outMsg sendQueue chan outMsg sendDoneQueue chan struct{} outputInvChan chan *wire.InvVect inQuit chan struct{} queueQuit chan struct{} outQuit chan struct{} quit chan struct{} } // String returns the peer's address and directionality as a human-readable // string. // // This function is safe for concurrent access. func (p *Peer) String() string { return fmt.Sprintf("%s (%s)", p.addr, logger.DirectionString(p.inbound)) } // AddKnownInventory adds the passed inventory to the cache of known inventory // for the peer. // // This function is safe for concurrent access. func (p *Peer) AddKnownInventory(invVect *wire.InvVect) { p.knownInventory.Add(invVect) } // StatsSnapshot returns a snapshot of the current peer flags and statistics. // // This function is safe for concurrent access. func (p *Peer) StatsSnapshot() *StatsSnap { p.statsMtx.RLock() defer p.statsMtx.RUnlock() p.flagsMtx.Lock() defer p.flagsMtx.Unlock() id := p.id addr := p.addr userAgent := p.userAgent services := p.services protocolVersion := p.advertisedProtoVer // Get a copy of all relevant flags and stats. statsSnap := &StatsSnap{ ID: id, Addr: addr, UserAgent: userAgent, Services: services, LastSend: p.LastSend(), LastRecv: p.LastRecv(), BytesSent: p.BytesSent(), BytesRecv: p.BytesReceived(), ConnTime: p.timeConnected, TimeOffset: p.timeOffset, Version: protocolVersion, Inbound: p.inbound, SelectedTipHash: p.selectedTipHash, LastPingNonce: p.lastPingNonce, LastPingMicros: p.lastPingMicros, LastPingTime: p.lastPingTime, } return statsSnap } // ID returns the peer id. // // This function is safe for concurrent access. func (p *Peer) ID() int32 { p.flagsMtx.Lock() defer p.flagsMtx.Unlock() return p.id } // NA returns the peer network address. // // This function is safe for concurrent access. func (p *Peer) NA() *wire.NetAddress { p.flagsMtx.Lock() defer p.flagsMtx.Unlock() return p.na } // Addr returns the peer address. // // This function is safe for concurrent access. func (p *Peer) Addr() string { // The address doesn't change after initialization, therefore it is not // protected by a mutex. return p.addr } // Inbound returns whether the peer is inbound. // // This function is safe for concurrent access. 
func (p *Peer) Inbound() bool {
	return p.inbound
}

// Services returns the services flag of the remote peer.
//
// This function is safe for concurrent access.
func (p *Peer) Services() wire.ServiceFlag {
	p.flagsMtx.Lock()
	defer p.flagsMtx.Unlock()

	return p.services
}

// UserAgent returns the user agent of the remote peer.
//
// This function is safe for concurrent access.
func (p *Peer) UserAgent() string {
	p.flagsMtx.Lock()
	defer p.flagsMtx.Unlock()

	return p.userAgent
}

// SubnetworkID returns the subnetwork the peer is associated with.
//
// This function is safe for concurrent access.
func (p *Peer) SubnetworkID() *subnetworkid.SubnetworkID {
	p.flagsMtx.Lock()
	defer p.flagsMtx.Unlock()

	return p.cfg.SubnetworkID
}

// LastPingNonce returns the last ping nonce of the remote peer.
//
// This function is safe for concurrent access.
func (p *Peer) LastPingNonce() uint64 {
	p.statsMtx.RLock()
	defer p.statsMtx.RUnlock()

	return p.lastPingNonce
}

// LastPingTime returns the last ping time of the remote peer.
//
// This function is safe for concurrent access.
func (p *Peer) LastPingTime() time.Time {
	p.statsMtx.RLock()
	defer p.statsMtx.RUnlock()

	return p.lastPingTime
}

// LastPingMicros returns the last ping micros of the remote peer.
//
// This function is safe for concurrent access.
func (p *Peer) LastPingMicros() int64 {
	p.statsMtx.RLock()
	defer p.statsMtx.RUnlock()

	return p.lastPingMicros
}

// VersionKnown returns whether or not the version of a peer is known
// locally.
//
// This function is safe for concurrent access.
func (p *Peer) VersionKnown() bool {
	p.flagsMtx.Lock()
	defer p.flagsMtx.Unlock()

	return p.versionKnown
}

// VerAckReceived returns whether or not a verack message was received by the
// peer.
//
// This function is safe for concurrent access.
func (p *Peer) VerAckReceived() bool {
	p.flagsMtx.Lock()
	defer p.flagsMtx.Unlock()

	return p.verAckReceived
}

// ProtocolVersion returns the negotiated peer protocol version.
//
// This function is safe for concurrent access.
func (p *Peer) ProtocolVersion() uint32 {
	p.flagsMtx.Lock()
	defer p.flagsMtx.Unlock()

	return p.protocolVersion
}

// SelectedTipHash returns the selected tip of the peer.
//
// This function is safe for concurrent access.
func (p *Peer) SelectedTipHash() *daghash.Hash {
	p.statsMtx.RLock()
	defer p.statsMtx.RUnlock()

	return p.selectedTipHash
}

// SetSelectedTipHash sets the selected tip of the peer.
func (p *Peer) SetSelectedTipHash(selectedTipHash *daghash.Hash) {
	p.statsMtx.Lock()
	defer p.statsMtx.Unlock()

	p.selectedTipHash = selectedTipHash
}

// IsSelectedTipKnown returns whether or not this peer's selected
// tip is a known block.
//
// This function is safe for concurrent access.
func (p *Peer) IsSelectedTipKnown() bool {
	// Read the selected tip through its accessor so the read is protected
	// by statsMtx.
	return !p.cfg.IsInDAG(p.SelectedTipHash())
}

// LastSend returns the last send time of the peer.
//
// This function is safe for concurrent access.
func (p *Peer) LastSend() time.Time {
	return time.Unix(atomic.LoadInt64(&p.lastSend), 0)
}

// LastRecv returns the last recv time of the peer.
//
// This function is safe for concurrent access.
func (p *Peer) LastRecv() time.Time {
	return time.Unix(atomic.LoadInt64(&p.lastRecv), 0)
}

// BytesSent returns the total number of bytes sent by the peer.
//
// This function is safe for concurrent access.
func (p *Peer) BytesSent() uint64 {
	return atomic.LoadUint64(&p.bytesSent)
}

// BytesReceived returns the total number of bytes received by the peer.
//
// This function is safe for concurrent access.
func (p *Peer) BytesReceived() uint64 { return atomic.LoadUint64(&p.bytesReceived) } // TimeConnected returns the time at which the peer connected. // // This function is safe for concurrent access. func (p *Peer) TimeConnected() time.Time { p.statsMtx.RLock() defer p.statsMtx.RUnlock() return p.timeConnected } // TimeOffset returns the number of seconds the local time was offset from the // time the peer reported during the initial negotiation phase. Negative values // indicate the remote peer's time is before the local time. // // This function is safe for concurrent access. func (p *Peer) TimeOffset() int64 { p.statsMtx.RLock() defer p.statsMtx.RUnlock() return p.timeOffset } // localVersionMsg creates a version message that can be used to send to the // remote peer. func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) { selectedTipHash := p.cfg.SelectedTipHash() theirNA := p.na // If we are behind a proxy and the connection comes from the proxy then // we return an unroutable address as their address. This is to prevent // leaking the tor proxy address. if p.cfg.Proxy != "" { proxyaddress, _, err := net.SplitHostPort(p.cfg.Proxy) // invalid proxy means poorly configured, be on the safe side. if err != nil || p.na.IP.String() == proxyaddress { theirNA = wire.NewNetAddressIPPort(net.IP([]byte{0, 0, 0, 0}), 0, 0) } } // Create a wire.NetAddress with only the services set to use as the // "addrme" in the version message. // // Older nodes previously added the IP and port information to the // address manager which proved to be unreliable as an inbound // connection from a peer didn't necessarily mean the peer itself // accepted inbound connections. // // Also, the timestamp is unused in the version message. ourNA := &wire.NetAddress{ Services: p.cfg.Services, } // Generate a unique nonce for this peer so self connections can be // detected. This is accomplished by adding it to a size-limited map of // recently seen nonces. nonce := uint64(rand.Int63()) sentNonces.Add(nonce) subnetworkID := p.cfg.SubnetworkID // Version message. msg := wire.NewMsgVersion(ourNA, theirNA, nonce, selectedTipHash, subnetworkID) msg.AddUserAgent(p.cfg.UserAgentName, p.cfg.UserAgentVersion, p.cfg.UserAgentComments...) msg.AddrYou.Services = wire.SFNodeNetwork // Advertise the services flag msg.Services = p.cfg.Services // Advertise our max supported protocol version. msg.ProtocolVersion = int32(p.cfg.ProtocolVersion) // Advertise if inv messages for transactions are desired. msg.DisableRelayTx = p.cfg.DisableRelayTx return msg, nil } // PushAddrMsg sends an addr message to the connected peer using the provided // addresses. This function is useful over manually sending the message via // QueueMessage since it automatically limits the addresses to the maximum // number allowed by the message and randomizes the chosen addresses when there // are too many. It returns the addresses that were actually sent and no // message will be sent if there are no entries in the provided addresses slice. // // This function is safe for concurrent access. func (p *Peer) PushAddrMsg(addresses []*wire.NetAddress, subnetworkID *subnetworkid.SubnetworkID) ([]*wire.NetAddress, error) { addressCount := len(addresses) // Nothing to send. if addressCount == 0 { return nil, nil } msg := wire.NewMsgAddr(false, subnetworkID) msg.AddrList = make([]*wire.NetAddress, addressCount) copy(msg.AddrList, addresses) // Randomize the addresses sent if there are more than the maximum allowed. 
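	// Note: this is a partial Fisher-Yates shuffle. Only the first
	// wire.MaxAddrPerMsg positions need to be randomized since the list
	// is truncated to that length immediately afterwards.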
if addressCount > wire.MaxAddrPerMsg { // Shuffle the address list. for i := 0; i < wire.MaxAddrPerMsg; i++ { j := i + rand.Intn(addressCount-i) msg.AddrList[i], msg.AddrList[j] = msg.AddrList[j], msg.AddrList[i] } // Truncate it to the maximum size. msg.AddrList = msg.AddrList[:wire.MaxAddrPerMsg] } p.QueueMessage(msg, nil) return msg.AddrList, nil } // PushGetBlockLocatorMsg sends a getlocator message for the provided high // and low hash. // // This function is safe for concurrent access. func (p *Peer) PushGetBlockLocatorMsg(highHash, lowHash *daghash.Hash) { msg := wire.NewMsgGetBlockLocator(highHash, lowHash) p.QueueMessage(msg, nil) } func (p *Peer) isDuplicateGetBlockInvsMsg(lowHash, highHash *daghash.Hash) bool { p.prevGetBlockInvsMtx.Lock() defer p.prevGetBlockInvsMtx.Unlock() return p.prevGetBlockInvsHigh != nil && p.prevGetBlockInvsLow != nil && lowHash != nil && highHash.IsEqual(p.prevGetBlockInvsHigh) && lowHash.IsEqual(p.prevGetBlockInvsLow) } // PushGetBlockInvsMsg sends a getblockinvs message for the provided block locator // and high hash. It will ignore back-to-back duplicate requests. // // This function is safe for concurrent access. func (p *Peer) PushGetBlockInvsMsg(lowHash, highHash *daghash.Hash) error { // Filter duplicate getblockinvs requests. if p.isDuplicateGetBlockInvsMsg(lowHash, highHash) { log.Tracef("Filtering duplicate [getblockinvs] with low "+ "hash %s, high hash %s", lowHash, highHash) return nil } // Construct the getblockinvs request and queue it to be sent. msg := wire.NewMsgGetBlockInvs(lowHash, highHash) p.QueueMessage(msg, nil) // Update the previous getblockinvs request information for filtering // duplicates. p.prevGetBlockInvsMtx.Lock() defer p.prevGetBlockInvsMtx.Unlock() p.prevGetBlockInvsLow = lowHash p.prevGetBlockInvsHigh = highHash return nil } // PushBlockLocatorMsg sends a locator message for the provided block locator. // // This function is safe for concurrent access. func (p *Peer) PushBlockLocatorMsg(locator blockdag.BlockLocator) error { // Construct the locator request and queue it to be sent. msg := wire.NewMsgBlockLocator() for _, hash := range locator { err := msg.AddBlockLocatorHash(hash) if err != nil { return err } } p.QueueMessage(msg, nil) return nil } // PushRejectMsg sends a reject message for the provided command, reject code, // reject reason, and hash. The hash will only be used when the command is a tx // or block and should be nil in other cases. The wait parameter will cause the // function to block until the reject message has actually been sent. // // This function is safe for concurrent access. func (p *Peer) PushRejectMsg(command string, code wire.RejectCode, reason string, hash *daghash.Hash, wait bool) { msg := wire.NewMsgReject(command, code, reason) if command == wire.CmdTx || command == wire.CmdBlock { if hash == nil { log.Warnf("Sending a reject message for command "+ "type %s which should have specified a hash "+ "but does not", command) hash = &daghash.ZeroHash } msg.Hash = hash } // Send the message without waiting if the caller has not requested it. if !wait { p.QueueMessage(msg, nil) return } // Send the message and block until it has been sent before returning. doneChan := make(chan struct{}, 1) p.QueueMessage(msg, doneChan) <-doneChan } // handleRemoteVersionMsg is invoked when a version kaspa message is received // from the remote peer. It will return an error if the remote peer's version // is not compatible with ours. 
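// Incompatibility covers a protocol version below
// minAcceptableProtocolVersion, a detected self connection, and mismatched
// subnetworks.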
func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error {
	// Detect self connections.
	if !allowSelfConns && sentNonces.Exists(msg.Nonce) {
		return errors.New("disconnecting peer connected to self")
	}

	// Notify and disconnect clients that have a protocol version that is
	// too old.
	//
	// NOTE: If minAcceptableProtocolVersion is raised to be higher than
	// wire.RejectVersion, this should send a reject packet before
	// disconnecting.
	if uint32(msg.ProtocolVersion) < minAcceptableProtocolVersion {
		reason := fmt.Sprintf("protocol version must be %d or greater",
			minAcceptableProtocolVersion)
		return errors.New(reason)
	}

	// Disconnect if:
	// - we are a full node and the outbound connection we've initiated is
	//   to a partial node
	// - both nodes are partial and our subnetwork doesn't match theirs
	isLocalNodeFull := p.cfg.SubnetworkID == nil
	isRemoteNodeFull := msg.SubnetworkID == nil
	if (isLocalNodeFull && !isRemoteNodeFull && !p.inbound) ||
		(!isLocalNodeFull && !isRemoteNodeFull &&
			!msg.SubnetworkID.IsEqual(p.cfg.SubnetworkID)) {

		return errors.New("incompatible subnetworks")
	}

	p.updateStatsFromVersionMsg(msg)
	p.updateFlagsFromVersionMsg(msg)

	return nil
}

// updateStatsFromVersionMsg updates the statistics derived from the version
// message, namely the peer's selected tip and time offset.
func (p *Peer) updateStatsFromVersionMsg(msg *wire.MsgVersion) {
	p.statsMtx.Lock()
	defer p.statsMtx.Unlock()

	p.selectedTipHash = msg.SelectedTipHash
	p.timeOffset = msg.Timestamp.Unix() - time.Now().Unix()
}

func (p *Peer) updateFlagsFromVersionMsg(msg *wire.MsgVersion) {
	// Negotiate the protocol version.
	p.flagsMtx.Lock()
	defer p.flagsMtx.Unlock()

	p.advertisedProtoVer = uint32(msg.ProtocolVersion)
	p.protocolVersion = minUint32(p.protocolVersion, p.advertisedProtoVer)
	p.versionKnown = true
	log.Debugf("Negotiated protocol version %d for peer %s",
		p.protocolVersion, p)

	// Set the peer's ID.
	p.id = atomic.AddInt32(&nodeCount, 1)

	// Set the supported services for the peer to what the remote peer
	// advertised.
	p.services = msg.Services

	// Set the remote peer's user agent.
	p.userAgent = msg.UserAgent
}

// handlePingMsg is invoked when a peer receives a ping kaspa message. It
// replies with a pong message carrying the nonce from the ping.
func (p *Peer) handlePingMsg(msg *wire.MsgPing) {
	// Include nonce from ping so pong can be identified.
	p.QueueMessage(wire.NewMsgPong(msg.Nonce), nil)
}

// handlePongMsg is invoked when a peer receives a pong kaspa message. It
// updates the ping statistics.
func (p *Peer) handlePongMsg(msg *wire.MsgPong) {
	// Arguably we could use a buffered channel here sending data
	// in a fifo manner whenever we send a ping, or a list keeping track of
	// the times of each ping. For now we just make a best effort and
	// only record stats if it was for the last ping sent. Any preceding
	// and overlapping pings will be ignored. It is unlikely to occur
	// without large usage of the ping RPC call since we ping infrequently
	// enough that if they overlap we would have timed out the peer.
	p.statsMtx.Lock()
	defer p.statsMtx.Unlock()

	if p.lastPingNonce != 0 && msg.Nonce == p.lastPingNonce {
		p.lastPingMicros = time.Since(p.lastPingTime).Nanoseconds()
		p.lastPingMicros /= 1000 // convert to usec.
		p.lastPingNonce = 0
	}
}

// readMessage reads the next kaspa message from the peer with logging.
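// It also updates the peer's received byte counter and invokes the OnRead
// listener, if one is set, regardless of whether the read succeeded.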
func (p *Peer) readMessage() (wire.Message, []byte, error) { n, msg, buf, err := wire.ReadMessageN(p.conn, p.ProtocolVersion(), p.cfg.DAGParams.Net) atomic.AddUint64(&p.bytesReceived, uint64(n)) if p.cfg.Listeners.OnRead != nil { p.cfg.Listeners.OnRead(p, n, msg, err) } if err != nil { return nil, nil, err } // Use closures to log expensive operations so they are only run when // the logging level requires it. log.Debugf("%s", logger.NewLogClosure(func() string { // Debug summary of message. summary := messageSummary(msg) if len(summary) > 0 { summary = " (" + summary + ")" } return fmt.Sprintf("Received %s%s from %s", msg.Command(), summary, p) })) log.Tracef("%s", logger.NewLogClosure(func() string { return spew.Sdump(msg) })) log.Tracef("%s", logger.NewLogClosure(func() string { return spew.Sdump(buf) })) return msg, buf, nil } // writeMessage sends a kaspa message to the peer with logging. func (p *Peer) writeMessage(msg wire.Message) error { // Don't do anything if we're disconnecting. if atomic.LoadInt32(&p.disconnect) != 0 { return nil } // Use closures to log expensive operations so they are only run when // the logging level requires it. log.Debugf("%s", logger.NewLogClosure(func() string { // Debug summary of message. summary := messageSummary(msg) if len(summary) > 0 { summary = " (" + summary + ")" } return fmt.Sprintf("Sending %s%s to %s", msg.Command(), summary, p) })) log.Tracef("%s", logger.NewLogClosure(func() string { return spew.Sdump(msg) })) log.Tracef("%s", logger.NewLogClosure(func() string { var buf bytes.Buffer _, err := wire.WriteMessageN(&buf, msg, p.ProtocolVersion(), p.cfg.DAGParams.Net) if err != nil { return err.Error() } return spew.Sdump(buf.Bytes()) })) // Write the message to the peer. n, err := wire.WriteMessageN(p.conn, msg, p.ProtocolVersion(), p.cfg.DAGParams.Net) atomic.AddUint64(&p.bytesSent, uint64(n)) if p.cfg.Listeners.OnWrite != nil { p.cfg.Listeners.OnWrite(p, n, msg, err) } return err } // isAllowedReadError returns whether or not the passed error is allowed without // disconnecting the peer. In particular, regression tests need to be allowed // to send malformed messages without the peer being disconnected. func (p *Peer) isAllowedReadError(err error) bool { // Only allow read errors in regression test mode. if p.cfg.DAGParams.Net != wire.Regtest { return false } // Don't allow the error if it's not specifically a malformed message error. if msgErr := &(wire.MessageError{}); !errors.As(err, &msgErr) { return false } // Don't allow the error if it's not coming from localhost or the // hostname can't be determined for some reason. host, _, err := net.SplitHostPort(p.addr) if err != nil { return false } if host != "127.0.0.1" && host != "localhost" { return false } // Allowed if all checks passed. return true } // shouldHandleReadError returns whether or not the passed error, which is // expected to have come from reading from the remote peer in the inHandler, // should be logged and responded to with a reject message. func (p *Peer) shouldHandleReadError(err error) bool { // No logging or reject message when the peer is being forcibly // disconnected. if atomic.LoadInt32(&p.disconnect) != 0 { return false } // No logging or reject message when the remote peer has been // disconnected. 
	if err == io.EOF {
		return false
	}
	var opErr *net.OpError
	if ok := errors.As(err, &opErr); ok && !opErr.Temporary() {
		return false
	}

	return true
}

// maybeAddDeadline potentially adds a deadline for the appropriate expected
// response for the passed wire protocol command to the pending responses map.
func (p *Peer) maybeAddDeadline(pendingResponses map[string]time.Time, msgCmd string) {
	// Set up a deadline for each message being sent that expects a response.
	//
	// NOTE: Pings are intentionally ignored here since they are typically
	// sent asynchronously and as a result of a long backlog of messages,
	// such as is typical in the case of initial block download, the
	// response won't be received in time.
	deadline := time.Now().Add(stallResponseTimeout)
	switch msgCmd {
	case wire.CmdVersion:
		// Expects a verack message.
		pendingResponses[wire.CmdVerAck] = deadline

	case wire.CmdGetBlockInvs:
		// Expects an inv message.
		pendingResponses[wire.CmdInv] = deadline

	case wire.CmdGetData:
		// Expects a block, merkleblock, tx, or notfound message.
		pendingResponses[wire.CmdBlock] = deadline
		pendingResponses[wire.CmdMerkleBlock] = deadline
		pendingResponses[wire.CmdTx] = deadline
		pendingResponses[wire.CmdNotFound] = deadline

	case wire.CmdGetSelectedTip:
		// Expects a selected tip message.
		pendingResponses[wire.CmdSelectedTip] = deadline
	}
}

// stallHandler handles stall detection for the peer. This entails keeping
// track of expected responses and assigning them deadlines while accounting for
// the time spent in callbacks. It must be run as a goroutine.
func (p *Peer) stallHandler() {
	// These variables are used to adjust the deadline times forward by the
	// time it takes callbacks to execute. This is done because new
	// messages aren't read until the previous one is finished processing
	// (which includes callbacks), so the deadline for receiving a response
	// for a given message must account for the processing time as well.
	var handlerActive bool
	var handlersStartTime time.Time
	var deadlineOffset time.Duration

	// pendingResponses tracks the expected response deadline times.
	pendingResponses := make(map[string]time.Time)

	// stallTicker is used to periodically check pending responses that have
	// exceeded the expected deadline and disconnect the peer due to
	// stalling.
	stallTicker := time.NewTicker(stallTickInterval)
	defer stallTicker.Stop()

	// ioStopped is used to detect when both the input and output handler
	// goroutines are done.
	var ioStopped bool
out:
	for {
		select {
		case msg := <-p.stallControl:
			switch msg.command {
			case sccSendMessage:
				// Add a deadline for the expected response
				// message if needed.
				p.maybeAddDeadline(pendingResponses,
					msg.message.Command())

			case sccReceiveMessage:
				// Remove received messages from the expected
				// response map. Since certain commands expect
				// one of a group of responses, remove
				// everything in the expected group accordingly.
				switch msgCmd := msg.message.Command(); msgCmd {
				case wire.CmdBlock:
					fallthrough
				case wire.CmdMerkleBlock:
					fallthrough
				case wire.CmdTx:
					fallthrough
				case wire.CmdNotFound:
					delete(pendingResponses, wire.CmdBlock)
					delete(pendingResponses, wire.CmdMerkleBlock)
					delete(pendingResponses, wire.CmdTx)
					delete(pendingResponses, wire.CmdNotFound)

				default:
					delete(pendingResponses, msgCmd)
				}

			case sccHandlerStart:
				// Warn on unbalanced callback signalling.
if handlerActive { log.Warn("Received handler start " + "control command while a " + "handler is already active") continue } handlerActive = true handlersStartTime = time.Now() case sccHandlerDone: // Warn on unbalanced callback signalling. if !handlerActive { log.Warn("Received handler done " + "control command when a " + "handler is not already active") continue } // Extend active deadlines by the time it took // to execute the callback. duration := time.Since(handlersStartTime) deadlineOffset += duration handlerActive = false default: log.Warnf("Unsupported message command %d", msg.command) } case <-stallTicker.C: // Calculate the offset to apply to the deadline based // on how long the handlers have taken to execute since // the last tick. now := time.Now() offset := deadlineOffset if handlerActive { offset += now.Sub(handlersStartTime) } // Disconnect the peer if any of the pending responses // don't arrive by their adjusted deadline. for command, deadline := range pendingResponses { if now.Before(deadline.Add(offset)) { continue } log.Debugf("Peer %s appears to be stalled or "+ "misbehaving, %s timeout -- "+ "disconnecting", p, command) p.Disconnect() break } // Reset the deadline offset for the next tick. deadlineOffset = 0 case <-p.inQuit: // The stall handler can exit once both the input and // output handler goroutines are done. if ioStopped { break out } ioStopped = true case <-p.outQuit: // The stall handler can exit once both the input and // output handler goroutines are done. if ioStopped { break out } ioStopped = true } } // Drain any wait channels before going away so there is nothing left // waiting on this goroutine. cleanup: for { select { case <-p.stallControl: default: break cleanup } } log.Tracef("Peer stall handler done for %s", p) } // inHandler handles all incoming messages for the peer. It must be run as a // goroutine. func (p *Peer) inHandler() { // The timer is stopped when a new message is received and reset after it // is processed. idleTimer := spawnAfter(idleTimeout, func() { log.Warnf("Peer %s no answer for %s -- disconnecting", p, idleTimeout) p.Disconnect() }) out: for atomic.LoadInt32(&p.disconnect) == 0 { // Read a message and stop the idle timer as soon as the read // is done. The timer is reset below for the next iteration if // needed. rmsg, buf, err := p.readMessage() idleTimer.Stop() if err != nil { // In order to allow regression tests with malformed messages, don't // disconnect the peer when we're in regression test mode and the // error is one of the allowed errors. if p.isAllowedReadError(err) { log.Errorf("Allowed test error from %s: %s", p, err) idleTimer.Reset(idleTimeout) continue } // Only log the error and send reject message if the // local peer is not forcibly disconnecting and the // remote peer has not disconnected. if p.shouldHandleReadError(err) { errMsg := fmt.Sprintf("Can't read message from %s: %s", p, err) if err != io.ErrUnexpectedEOF { log.Errorf(errMsg) } // Push a reject message for the malformed message and wait for // the message to be sent before disconnecting. // // NOTE: Ideally this would include the command in the header if // at least that much of the message was valid, but that is not // currently exposed by wire, so just used malformed for the // command. p.PushRejectMsg("malformed", wire.RejectMalformed, errMsg, nil, true) } break out } atomic.StoreInt64(&p.lastRecv, time.Now().Unix()) p.stallControl <- stallControlMsg{sccReceiveMessage, rmsg} // Handle each supported message type. 
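		// Bracket the listener dispatch below between sccHandlerStart
		// and sccHandlerDone so the stall handler can discount callback
		// execution time from its response deadlines.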
p.stallControl <- stallControlMsg{sccHandlerStart, rmsg} switch msg := rmsg.(type) { case *wire.MsgVersion: p.PushRejectMsg(msg.Command(), wire.RejectDuplicate, "duplicate version message", nil, true) break out case *wire.MsgVerAck: // No read lock is necessary because verAckReceived is not written // to in any other goroutine. if p.verAckReceived { log.Infof("Already received 'verack' from peer %s -- "+ "disconnecting", p) break out } p.markVerAckReceived() if p.cfg.Listeners.OnVerAck != nil { p.cfg.Listeners.OnVerAck(p, msg) } case *wire.MsgGetAddr: if p.cfg.Listeners.OnGetAddr != nil { p.cfg.Listeners.OnGetAddr(p, msg) } case *wire.MsgAddr: if p.cfg.Listeners.OnAddr != nil { p.cfg.Listeners.OnAddr(p, msg) } case *wire.MsgPing: p.handlePingMsg(msg) if p.cfg.Listeners.OnPing != nil { p.cfg.Listeners.OnPing(p, msg) } case *wire.MsgPong: p.handlePongMsg(msg) if p.cfg.Listeners.OnPong != nil { p.cfg.Listeners.OnPong(p, msg) } case *wire.MsgTx: if p.cfg.Listeners.OnTx != nil { p.cfg.Listeners.OnTx(p, msg) } case *wire.MsgBlock: if p.cfg.Listeners.OnBlock != nil { p.cfg.Listeners.OnBlock(p, msg, buf) } case *wire.MsgInv: if p.cfg.Listeners.OnInv != nil { p.cfg.Listeners.OnInv(p, msg) } case *wire.MsgNotFound: if p.cfg.Listeners.OnNotFound != nil { p.cfg.Listeners.OnNotFound(p, msg) } case *wire.MsgGetData: if p.cfg.Listeners.OnGetData != nil { p.cfg.Listeners.OnGetData(p, msg) } case *wire.MsgGetBlockLocator: if p.cfg.Listeners.OnGetBlockLocator != nil { p.cfg.Listeners.OnGetBlockLocator(p, msg) } case *wire.MsgBlockLocator: if p.cfg.Listeners.OnBlockLocator != nil { p.cfg.Listeners.OnBlockLocator(p, msg) } case *wire.MsgGetBlockInvs: if p.cfg.Listeners.OnGetBlockInvs != nil { p.cfg.Listeners.OnGetBlockInvs(p, msg) } case *wire.MsgFeeFilter: if p.cfg.Listeners.OnFeeFilter != nil { p.cfg.Listeners.OnFeeFilter(p, msg) } case *wire.MsgFilterAdd: if p.cfg.Listeners.OnFilterAdd != nil { p.cfg.Listeners.OnFilterAdd(p, msg) } case *wire.MsgFilterClear: if p.cfg.Listeners.OnFilterClear != nil { p.cfg.Listeners.OnFilterClear(p, msg) } case *wire.MsgFilterLoad: if p.cfg.Listeners.OnFilterLoad != nil { p.cfg.Listeners.OnFilterLoad(p, msg) } case *wire.MsgMerkleBlock: if p.cfg.Listeners.OnMerkleBlock != nil { p.cfg.Listeners.OnMerkleBlock(p, msg) } case *wire.MsgReject: if p.cfg.Listeners.OnReject != nil { p.cfg.Listeners.OnReject(p, msg) } case *wire.MsgGetSelectedTip: if p.cfg.Listeners.OnGetSelectedTip != nil { p.cfg.Listeners.OnGetSelectedTip() } case *wire.MsgSelectedTip: if p.cfg.Listeners.OnSelectedTip != nil { p.cfg.Listeners.OnSelectedTip(p, msg) } default: log.Debugf("Received unhandled message of type %s "+ "from %s", rmsg.Command(), p) } p.stallControl <- stallControlMsg{sccHandlerDone, rmsg} // A message was received so reset the idle timer. idleTimer.Reset(idleTimeout) } // Ensure the idle timer is stopped to avoid leaking the resource. idleTimer.Stop() // Ensure connection is closed. p.Disconnect() close(p.inQuit) log.Tracef("Peer input handler done for %s", p) } func (p *Peer) markVerAckReceived() { p.flagsMtx.Lock() defer p.flagsMtx.Unlock() p.verAckReceived = true } // queueHandler handles the queuing of outgoing data for the peer. This runs as // a muxer for various sources of input so we can ensure that server and peer // handlers will not block on us sending a message. That data is then passed on // to outHandler to be actually written. 
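//
// Inventory queued via QueueInventory is accumulated here as well and
// trickled out in batches of at most maxInvTrickleSize items on each
// trickleTimeout tick, skipping anything the remote peer is already known
// to have.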
func (p *Peer) queueHandler() {
	pendingMsgs := list.New()
	invSendQueue := list.New()
	trickleTicker := time.NewTicker(trickleTimeout)
	defer trickleTicker.Stop()

	// We keep the waiting flag so that we know if we have a message queued
	// to the outHandler or not. We could use the presence of a head of
	// the list for this but then we have rather racy concerns about whether
	// it has gotten it at cleanup time - and thus who sends on the
	// message's done channel. To avoid such confusion we keep a different
	// flag and pendingMsgs only contains messages that we have not yet
	// passed to outHandler.
	waiting := false

	// To avoid duplication below.
	queuePacket := func(msg outMsg, list *list.List, waiting bool) bool {
		if !waiting {
			p.sendQueue <- msg
		} else {
			list.PushBack(msg)
		}
		// We are always waiting now.
		return true
	}
out:
	for {
		select {
		case msg := <-p.outputQueue:
			waiting = queuePacket(msg, pendingMsgs, waiting)

		// This channel is notified when a message has been sent across
		// the network socket.
		case <-p.sendDoneQueue:
			// No longer waiting if there are no more messages
			// in the pending messages queue.
			next := pendingMsgs.Front()
			if next == nil {
				waiting = false
				continue
			}

			// Notify the outHandler about the next item to
			// asynchronously send.
			val := pendingMsgs.Remove(next)
			p.sendQueue <- val.(outMsg)

		case iv := <-p.outputInvChan:
			// No handshake? They'll find out soon enough.
			if p.VersionKnown() {
				// If this is a new block, then we'll blast it
				// out immediately, skipping the inv trickle
				// queue.
				if iv.Type == wire.InvTypeBlock {
					invMsg := wire.NewMsgInvSizeHint(1)
					invMsg.AddInvVect(iv)
					waiting = queuePacket(outMsg{msg: invMsg},
						pendingMsgs, waiting)
				} else {
					invSendQueue.PushBack(iv)
				}
			}

		case <-trickleTicker.C:
			// Don't send anything if we're disconnecting or there
			// is no queued inventory.
			// version is known if send queue has any entries.
			if atomic.LoadInt32(&p.disconnect) != 0 ||
				invSendQueue.Len() == 0 {
				continue
			}

			// Create and send as many inv messages as needed to
			// drain the inventory send queue.
			invMsg := wire.NewMsgInvSizeHint(uint(invSendQueue.Len()))
			for e := invSendQueue.Front(); e != nil; e = invSendQueue.Front() {
				iv := invSendQueue.Remove(e).(*wire.InvVect)

				// Don't send inventory that became known after
				// the initial check.
				if p.knownInventory.Exists(iv) {
					continue
				}

				invMsg.AddInvVect(iv)
				if len(invMsg.InvList) >= maxInvTrickleSize {
					waiting = queuePacket(
						outMsg{msg: invMsg},
						pendingMsgs, waiting)
					invMsg = wire.NewMsgInvSizeHint(uint(invSendQueue.Len()))
				}

				// Add the inventory that is being relayed to
				// the known inventory for the peer.
				p.AddKnownInventory(iv)
			}
			if len(invMsg.InvList) > 0 {
				waiting = queuePacket(outMsg{msg: invMsg},
					pendingMsgs, waiting)
			}

		case <-p.quit:
			break out
		}
	}

	// Drain any wait channels before we go away so we don't leave something
	// waiting for us.
	for e := pendingMsgs.Front(); e != nil; e = pendingMsgs.Front() {
		val := pendingMsgs.Remove(e)
		msg := val.(outMsg)
		if msg.doneChan != nil {
			msg.doneChan <- struct{}{}
		}
	}
cleanup:
	for {
		select {
		case msg := <-p.outputQueue:
			if msg.doneChan != nil {
				msg.doneChan <- struct{}{}
			}
		case <-p.outputInvChan:
			// Just drain channel.
			// sendDoneQueue is buffered so doesn't need draining.
		default:
			break cleanup
		}
	}
	close(p.queueQuit)
	log.Tracef("Peer queue handler done for %s", p)
}

// outHandler handles all outgoing messages for the peer. It must be run as a
// goroutine. It uses a buffered channel to serialize output messages while
// allowing the sender to continue running asynchronously.
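//
// Before a ping message is written, its nonce and send time are recorded so
// that handlePongMsg can compute the round-trip time for the matching pong.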
func (p *Peer) outHandler() {
out:
	for {
		select {
		case msg := <-p.sendQueue:
			switch m := msg.msg.(type) {
			case *wire.MsgPing:
				func() {
					p.statsMtx.Lock()
					defer p.statsMtx.Unlock()

					p.lastPingNonce = m.Nonce
					p.lastPingTime = time.Now()
				}()
			}

			p.stallControl <- stallControlMsg{sccSendMessage, msg.msg}

			err := p.writeMessage(msg.msg)
			if err != nil {
				p.Disconnect()
				log.Errorf("Failed to send message to "+
					"%s: %s", p, err)
				if msg.doneChan != nil {
					msg.doneChan <- struct{}{}
				}
				continue
			}

			// At this point, the message was successfully sent, so
			// update the last send time, signal the sender of the
			// message that it has been sent (if requested), and
			// signal the send queue to deliver the next queued
			// message.
			atomic.StoreInt64(&p.lastSend, time.Now().Unix())
			if msg.doneChan != nil {
				msg.doneChan <- struct{}{}
			}
			p.sendDoneQueue <- struct{}{}

		case <-p.quit:
			break out
		}
	}

	<-p.queueQuit

	// Drain any wait channels before we go away so we don't leave something
	// waiting for us. We have waited on queueQuit and thus we can be sure
	// that we will not miss anything sent on sendQueue.
cleanup:
	for {
		select {
		case msg := <-p.sendQueue:
			if msg.doneChan != nil {
				msg.doneChan <- struct{}{}
			}
			// No need to send on sendDoneQueue since queueHandler
			// has been waited on and already exited.
		default:
			break cleanup
		}
	}
	close(p.outQuit)
	log.Tracef("Peer output handler done for %s", p)
}

// pingHandler periodically pings the peer. It must be run as a goroutine.
func (p *Peer) pingHandler() {
	pingTicker := time.NewTicker(pingInterval)
	defer pingTicker.Stop()

out:
	for {
		select {
		case <-pingTicker.C:
			nonce, err := random.Uint64()
			if err != nil {
				log.Errorf("Not sending ping to %s: %s", p, err)
				continue
			}
			p.QueueMessage(wire.NewMsgPing(nonce), nil)

		case <-p.quit:
			break out
		}
	}
}

// QueueMessage adds the passed kaspa message to the peer send queue.
//
// This function is safe for concurrent access.
func (p *Peer) QueueMessage(msg wire.Message, doneChan chan<- struct{}) {
	// Avoid risk of deadlock if goroutine already exited. The goroutine
	// we will be sending to hangs around until it knows for a fact that
	// it is marked as disconnected and *then* it drains the channels.
	if !p.Connected() {
		if doneChan != nil {
			spawn(func() {
				doneChan <- struct{}{}
			})
		}
		return
	}
	p.outputQueue <- outMsg{msg: msg, doneChan: doneChan}
}

// QueueInventory adds the passed inventory to the inventory send queue which
// might not be sent right away, rather it is trickled to the peer in batches.
// Inventory that the peer is already known to have is ignored.
//
// This function is safe for concurrent access.
func (p *Peer) QueueInventory(invVect *wire.InvVect) {
	// Don't add the inventory to the send queue if the peer is already
	// known to have it.
	if p.knownInventory.Exists(invVect) {
		return
	}

	// Avoid risk of deadlock if goroutine already exited. The goroutine
	// we will be sending to hangs around until it knows for a fact that
	// it is marked as disconnected and *then* it drains the channels.
	if !p.Connected() {
		return
	}

	p.outputInvChan <- invVect
}

// AssociateConnection associates the given conn to the peer. Calling this
// function when the peer is already connected will have no effect.
func (p *Peer) AssociateConnection(conn net.Conn) {
	// Already connected?
	if !atomic.CompareAndSwapInt32(&p.connected, 0, 1) {
		return
	}

	p.conn = conn
	p.timeConnected = time.Now()

	if p.inbound {
		p.addr = p.conn.RemoteAddr().String()

		// Set up a NetAddress for the peer to be used with AddrManager.
		// We only do this for inbound peers because outbound peers set
		// it up at connection time and there is no point recomputing it.
		na, err := newNetAddress(p.conn.RemoteAddr(), p.services)
		if err != nil {
			log.Errorf("Cannot create remote net address: %s", err)
			p.Disconnect()
			return
		}
		p.na = na
	}

	spawn(func() {
		if err := p.start(); err != nil {
			log.Debugf("Cannot start peer %s: %s", p, err)
			p.Disconnect()
		}
	})
}

// Connected returns whether or not the peer is currently connected.
//
// This function is safe for concurrent access.
func (p *Peer) Connected() bool {
	return atomic.LoadInt32(&p.connected) != 0 &&
		atomic.LoadInt32(&p.disconnect) == 0
}

// Disconnect disconnects the peer by closing the connection. Calling this
// function when the peer is already disconnected or in the process of
// disconnecting will have no effect.
func (p *Peer) Disconnect() {
	if atomic.AddInt32(&p.disconnect, 1) != 1 {
		return
	}

	log.Tracef("Disconnecting %s", p)
	if atomic.LoadInt32(&p.connected) != 0 {
		p.conn.Close()
	}
	close(p.quit)
}

// start begins processing input and output messages.
func (p *Peer) start() error {
	log.Tracef("Starting peer %s", p)

	negotiateErr := make(chan error, 1)
	spawn(func() {
		if p.inbound {
			negotiateErr <- p.negotiateInboundProtocol()
		} else {
			negotiateErr <- p.negotiateOutboundProtocol()
		}
	})

	// Negotiate the protocol within the specified negotiateTimeout.
	select {
	case err := <-negotiateErr:
		if err != nil {
			return err
		}
	case <-time.After(negotiateTimeout):
		return errors.New("protocol negotiation timeout")
	}
	log.Debugf("Connected to %s", p.Addr())

	// The protocol has been negotiated successfully so start processing input
	// and output messages.
	spawn(p.stallHandler)
	spawn(p.inHandler)
	spawn(p.queueHandler)
	spawn(p.outHandler)
	spawn(p.pingHandler)

	// Send our verack message now that the IO processing machinery has started.
	p.QueueMessage(wire.NewMsgVerAck(), nil)
	return nil
}

// WaitForDisconnect waits until the peer has completely disconnected and all
// resources are cleaned up. This will happen if either the local or remote
// side has been disconnected or the peer is forcibly disconnected via
// Disconnect.
func (p *Peer) WaitForDisconnect() {
	<-p.quit
}

// readRemoteVersionMsg waits for the next message to arrive from the remote
// peer. If the next message is not a version message or the version is not
// acceptable then return an error.
func (p *Peer) readRemoteVersionMsg() error {
	// Read their version message.
	msg, _, err := p.readMessage()
	if err != nil {
		return err
	}

	remoteVerMsg, ok := msg.(*wire.MsgVersion)
	if !ok {
		errStr := "A version message must precede all others"
		log.Errorf(errStr)

		rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectMalformed,
			errStr)
		return p.writeMessage(rejectMsg)
	}

	if err := p.handleRemoteVersionMsg(remoteVerMsg); err != nil {
		return err
	}

	if p.cfg.Listeners.OnVersion != nil {
		p.cfg.Listeners.OnVersion(p, remoteVerMsg)
	}
	return nil
}

// writeLocalVersionMsg writes our version message to the remote peer.
func (p *Peer) writeLocalVersionMsg() error {
	localVerMsg, err := p.localVersionMsg()
	if err != nil {
		return err
	}

	return p.writeMessage(localVerMsg)
}

// negotiateInboundProtocol waits to receive a version message from the peer
// then sends our version message. If the events do not occur in that order then
// it returns an error.
func (p *Peer) negotiateInboundProtocol() error {
	if err := p.readRemoteVersionMsg(); err != nil {
		return err
	}

	return p.writeLocalVersionMsg()
}

// negotiateOutboundProtocol sends our version message then waits to receive a
// version message from the peer. If the events do not occur in that order then
// it returns an error.
func (p *Peer) negotiateOutboundProtocol() error {
	if err := p.writeLocalVersionMsg(); err != nil {
		return err
	}

	return p.readRemoteVersionMsg()
}

// newPeerBase returns a new base kaspa peer based on the inbound flag. This
// is used by the NewInboundPeer and NewOutboundPeer functions to perform base
// setup needed by both types of peers.
func newPeerBase(origCfg *Config, inbound bool) *Peer {
	// Default to the max supported protocol version if not specified by the
	// caller.
	cfg := *origCfg // Copy to avoid mutating caller.
	if cfg.ProtocolVersion == 0 {
		cfg.ProtocolVersion = MaxProtocolVersion
	}

	// Set the DAG parameters to testnet if the caller did not specify any.
	if cfg.DAGParams == nil {
		cfg.DAGParams = &dagconfig.TestnetParams
	}

	p := Peer{
		inbound:         inbound,
		knownInventory:  newMruInventoryMap(maxKnownInventory),
		stallControl:    make(chan stallControlMsg, 1), // nonblocking sync
		outputQueue:     make(chan outMsg, outputBufferSize),
		sendQueue:       make(chan outMsg, 1),   // nonblocking sync
		sendDoneQueue:   make(chan struct{}, 1), // nonblocking sync
		outputInvChan:   make(chan *wire.InvVect, outputBufferSize),
		inQuit:          make(chan struct{}),
		queueQuit:       make(chan struct{}),
		outQuit:         make(chan struct{}),
		quit:            make(chan struct{}),
		cfg:             cfg, // Copy so caller can't mutate.
		services:        cfg.Services,
		protocolVersion: cfg.ProtocolVersion,
	}
	return &p
}

// NewInboundPeer returns a new inbound kaspa peer. Use AssociateConnection to
// begin processing incoming and outgoing messages.
func NewInboundPeer(cfg *Config) *Peer {
	return newPeerBase(cfg, true)
}

// NewOutboundPeer returns a new outbound kaspa peer.
func NewOutboundPeer(cfg *Config, addr string) (*Peer, error) {
	p := newPeerBase(cfg, false)
	p.addr = addr

	host, portStr, err := net.SplitHostPort(addr)
	if err != nil {
		return nil, err
	}

	port, err := strconv.ParseUint(portStr, 10, 16)
	if err != nil {
		return nil, err
	}

	if cfg.HostToNetAddress != nil {
		na, err := cfg.HostToNetAddress(host, uint16(port), cfg.Services)
		if err != nil {
			return nil, err
		}
		p.na = na
	} else {
		p.na = wire.NewNetAddressIPPort(net.ParseIP(host), uint16(port),
			cfg.Services)
	}

	return p, nil
}

func init() {
	rand.Seed(time.Now().UnixNano())
}
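
// The typical flow for wiring up an outbound peer looks roughly like the
// sketch below (illustrative only: the config values, listener body, dial
// logic, and address are placeholders, not part of this package):
//
//	verack := make(chan struct{})
//	p, err := peer.NewOutboundPeer(&peer.Config{
//		UserAgentName:    "peersample",
//		UserAgentVersion: "1.0.0",
//		DAGParams:        &dagconfig.TestnetParams,
//		SelectedTipHash:  func() *daghash.Hash { return &daghash.ZeroHash },
//		IsInDAG:          func(*daghash.Hash) bool { return false },
//		Listeners: peer.MessageListeners{
//			OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
//				verack <- struct{}{}
//			},
//		},
//	}, "127.0.0.1:16111")
//	if err != nil {
//		return err
//	}
//	conn, err := net.Dial("tcp", p.Addr())
//	if err != nil {
//		return err
//	}
//	p.AssociateConnection(conn)
//	<-verack
//	p.Disconnect()
//	p.WaitForDisconnect()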