Files
kaspad/protocol/peer/peer.go
Ori Newman 04b578cee1 [NOD-1137] Implement handshake protocol (#792)
* [NOD-1126] Implement block relay flow

* [NOD-1126] Implement block relay flow

* [NOD-1126] Add StartGetRelayBlocksListener

* [NOD-1126] Integrate with new interface

* [NOD-1126] Fix comments

* [NOD-1126] Refactor protocol.go

* [NOD-1126] Split long lines

* [NOD-1126] Fix comment

* [NOD-1126] move sharedRequestedBlocks to a separate file

* [NOD-1126] Fix error message

* [NOD-1126] Move handleInv to StartBlockRelay

* [NOD-1126] Create hashesQueueSet type

* [NOD-1126] Make deleteFromRequestedBlocks a method

* [NOD-1126] Fix comment

* [NOD-1126] Add block logger

* [NOD-1126] Rename advertisedProtoVer->advertisedProtocolVer

* [NOD-1126] Fix comment and an error message

* [NOD-1126] Remove redundant loop

* [NOD-1126] Move requestBlocks upper

* [NOD-1126] Remove existing blocks in requestedBlocks from hashesToRequest

* [NOD-1126] Change comment

* [NOD-1126] Rename stallResponseTimeout->timeout

* [NOD-1126] Use switch inside readMsgBlock

* [NOD-1126] Fix error message and remove redundant log

* [NOD-1126] Rename package names

* [NOD-1126] Fix comment

* [NOD-1126] Change file names

* [NOD-1126] Convert block to partial if needed

* [NOD-1126] Remove function redeclaration

* [NOD-1126] continue instead of return

* [NOD-1126] Rename LogBlockBlueScore->LogBlock

* [NOD-1126] Add minimum functions to utils

* [NOD-1126] Flip condition on readInv

* [NOD-1126] Rename utilMath->mathUtil

* [NOD-1126] Fix comment

* [NOD-1137] Implement handshake

* [NOD-1137] Replace version's nonce with ID

* [NOD-1137] Remove redundant function

* [NOD-1137] Move handshake to a separate file

* [NOD-1137] Add todo

* [NOD-1137] Replace peer internal id with global peer ID

* [NOD-1137] Add serializer/deserializer to ID

* [NOD-1137] Remove validation from AddUserAgent

* [NOD-1137] Add missing id package

* [NOD-1137] Rename variables

* [NOD-1137] Add comment

* [NOD-1137] Implement GetBestLocalAddress

* [NOD-1137] Implement TODOs

* [NOD-1137] Rename variables

* [NOD-1137] Move errors.Is inside err!=nil branch

* [NOD-1137] Fix erroneous condition on Dequeue

* [NOD-1137] Fix bug in GetReadyPeerIDs

* [NOD-1137] Handle external IP on GetBestLocalAddress

* [NOD-1137] Remove version and verack message types when handshake is over

* [NOD-1137] Add FromBytes to id package

* [NOD-1137] Add protocol error

* [NOD-1137] Add ErrTimeout

* [NOD-1137] Log error only if exists

* [NOD-1137] Replace idFromBytes->id.FromBytes

* [NOD-1137] Add comments

* [NOD-1137] Remove ErrTimeout

* [NOD-1137] Unremove ErrTimeout

* [NOD-1137] Change comment

* [NOD-1137] Use EnqueueWithTimeout everywhere in protocol
2020-07-14 17:20:29 +03:00

172 lines
4.5 KiB
Go

package peer
import (
"github.com/kaspanet/kaspad/netadapter/id"
"github.com/kaspanet/kaspad/util/daghash"
mathUtil "github.com/kaspanet/kaspad/util/math"
"github.com/kaspanet/kaspad/util/subnetworkid"
"github.com/kaspanet/kaspad/wire"
"github.com/pkg/errors"
"sync"
"sync/atomic"
"time"
)
// Peer holds data about a peer.
//
// Most fields are filled in by UpdateFieldsFromMsgVersion from the remote's
// version message. SelectedTipHash, SetSelectedTipHash, SubnetworkID and ID
// return an error until MarkAsReady has been called.
type Peer struct {
// ready is 0 until MarkAsReady is called. In this file it is only accessed
// through sync/atomic.
ready uint32
// selectedTipHashMtx is held by SelectedTipHash and SetSelectedTipHash
// while they access selectedTipHash.
selectedTipHashMtx sync.RWMutex
selectedTipHash *daghash.Hash
// id, userAgent and services are copied from the version message.
id *id.ID
userAgent string
services wire.ServiceFlag
advertisedProtocolVer uint32 // protocol version advertised by remote
protocolVersion uint32 // negotiated protocol version
disableRelayTx bool
subnetworkID *subnetworkid.SubnetworkID
// pingLock is held by SetPingPending and SetPingIdle while they update the
// lastPing* fields below.
pingLock sync.RWMutex
lastPingNonce uint64 // The nonce of the last ping we sent
lastPingTime time.Time // Time we sent last ping
lastPingDuration time.Duration // Time for last ping to return
}
// SelectedTipHash reports the selected tip hash currently recorded for the
// peer. It returns an error if the peer has not been marked as ready yet.
func (p *Peer) SelectedTipHash() (*daghash.Hash, error) {
	isReady := atomic.LoadUint32(&p.ready) != 0
	if !isReady {
		return nil, errors.New("peer is not ready yet")
	}

	// Take a snapshot of the pointer under the read lock.
	p.selectedTipHashMtx.RLock()
	tip := p.selectedTipHash
	p.selectedTipHashMtx.RUnlock()
	return tip, nil
}
// SetSelectedTipHash replaces the selected tip hash recorded for the peer.
// It returns an error, and stores nothing, if the peer has not been marked
// as ready yet.
func (p *Peer) SetSelectedTipHash(hash *daghash.Hash) error {
	if ready := atomic.LoadUint32(&p.ready); ready == 0 {
		return errors.New("peer is not ready yet")
	}

	// The write lock keeps the update exclusive with SelectedTipHash readers.
	p.selectedTipHashMtx.Lock()
	p.selectedTipHash = hash
	p.selectedTipHashMtx.Unlock()
	return nil
}
// SubnetworkID returns the subnetwork the peer is associated with.
// It is nil in full nodes. An error is returned if the peer has not been
// marked as ready yet.
func (p *Peer) SubnetworkID() (*subnetworkid.SubnetworkID, error) {
	switch atomic.LoadUint32(&p.ready) {
	case 0:
		return nil, errors.New("peer is not ready yet")
	default:
		return p.subnetworkID, nil
	}
}
// ID returns the peer ID, or an error if the peer has not been marked as
// ready yet.
func (p *Peer) ID() (*id.ID, error) {
	if atomic.LoadUint32(&p.ready) != 0 {
		return p.id, nil
	}
	return nil, errors.New("peer is not ready yet")
}
// MarkAsReady marks the peer as ready by flipping its ready flag from 0 to 1.
//
// It returns an error if the peer was already marked as ready. The flag is
// updated with a compare-and-swap so that it stays at exactly 1 no matter how
// many times the method is called, rather than being incremented on every
// failed attempt.
func (p *Peer) MarkAsReady() error {
	if !atomic.CompareAndSwapUint32(&p.ready, 0, 1) {
		return errors.New("peer is already ready")
	}
	return nil
}
// UpdateFieldsFromMsgVersion copies the data carried by the given version
// message into the peer and negotiates the protocol version.
func (p *Peer) UpdateFieldsFromMsgVersion(msg *wire.MsgVersion) {
	// Negotiate the protocol version: combine our current version with the
	// one the remote advertised through mathUtil.MinUint32.
	remoteVersion := msg.ProtocolVersion
	p.advertisedProtocolVer = remoteVersion
	p.protocolVersion = mathUtil.MinUint32(p.protocolVersion, remoteVersion)
	log.Debugf("Negotiated protocol version %d for peer %s",
		p.protocolVersion, p)

	// Identity and capabilities, exactly as advertised by the remote peer.
	p.id = msg.ID
	p.services = msg.Services
	p.userAgent = msg.UserAgent
	p.disableRelayTx = msg.DisableRelayTx

	// Selected tip and subnetwork reported in the version message.
	p.selectedTipHash = msg.SelectedTipHash
	p.subnetworkID = msg.SubnetworkID
}
// SetPingPending sets the ping state of the peer to 'pending': it records the
// given nonce as the last ping nonce and stamps the last ping time with the
// current time.
func (p *Peer) SetPingPending(nonce uint64) {
	p.pingLock.Lock()
	p.lastPingNonce, p.lastPingTime = nonce, time.Now()
	p.pingLock.Unlock()
}
// SetPingIdle sets the ping state of the peer to 'idle': it stores the time
// elapsed since the last ping was sent and resets the last ping nonce to 0.
func (p *Peer) SetPingIdle() {
	p.pingLock.Lock()
	p.lastPingDuration = time.Since(p.lastPingTime)
	p.lastPingNonce = 0
	p.pingLock.Unlock()
}
// String has the fmt.Stringer signature but is not implemented yet: calling
// it panics with "unimplemented".
func (p *Peer) String() string {
//TODO(libp2p)
panic("unimplemented")
}
// readyPeers holds every peer registered through AddToReadyPeers, keyed by
// its ID pointer.
var readyPeers = make(map[*id.ID]*Peer)

// readyPeersMutex is the lock AddToReadyPeers and GetReadyPeerIDs take around
// their access to readyPeers.
var readyPeersMutex sync.RWMutex
// ErrPeerWithSameIDExists signifies that a ready peer with the same ID
// already exists.
var ErrPeerWithSameIDExists = errors.New("ready peer with the same ID already exists")
// AddToReadyPeers marks this peer as ready and adds it to the ready peers list.
//
// It returns an error wrapping ErrPeerWithSameIDExists when the peer's ID is
// already registered, or the error from MarkAsReady when the peer was already
// marked as ready. In both cases the ready peers list is left untouched.
//
// The write lock is held for the whole operation because the map is mutated;
// holding only the read lock would let two callers write to it concurrently.
//
// NOTE(review): readyPeers is keyed by *id.ID, so the duplicate check compares
// pointers rather than ID values - confirm that equal IDs always share a
// pointer.
func AddToReadyPeers(peer *Peer) error {
	readyPeersMutex.Lock()
	defer readyPeersMutex.Unlock()

	if _, ok := readyPeers[peer.id]; ok {
		return errors.Wrapf(ErrPeerWithSameIDExists, "peer with ID %s already exists", peer.id)
	}

	if err := peer.MarkAsReady(); err != nil {
		return err
	}

	readyPeers[peer.id] = peer
	return nil
}
// GetReadyPeerIDs returns the peer IDs of all the ready peers. The slice is
// freshly allocated on every call and its order follows map iteration, so it
// is not stable between calls.
func GetReadyPeerIDs() []*id.ID {
	readyPeersMutex.RLock()
	defer readyPeersMutex.RUnlock()

	ids := make([]*id.ID, 0, len(readyPeers))
	for readyID := range readyPeers {
		ids = append(ids, readyID)
	}
	return ids
}
// IDExists returns whether there's a ready peer registered under the given ID.
//
// The lookup is done under the read lock so that it cannot race with
// AddToReadyPeers inserting into the same map.
func IDExists(peerID *id.ID) bool {
	readyPeersMutex.RLock()
	defer readyPeersMutex.RUnlock()

	_, ok := readyPeers[peerID]
	return ok
}