[NOD-1162] Fixes from integration test (#821)

* [NOD-1162] [FIX] Connection manager should run the moment it adds a request

* [NOD-1162] [FIX] Set peerID on handshake

* [NOD-1162] [FIX] Broadcast should send to outgoing route, not incoming

* [NOD-1162] [FIX] Add CmdInvRelayBlock to MakeEmptyMessage

* [NOD-1162] [FIX] Initialize Hash before decoding MsgInvRelayBlock

* [NOD-1162] [FIX] Invert condition

* [NOD-1162] [FIX] Fixes to encoding of MsgGetRelayBlocks

* [NOD-1162] [FIX] Add MsgGetRelayBlocks to MakeEmptyMessage

* [NOD-1162] Add comment
This commit is contained in:
Svarog
2020-07-26 11:44:16 +03:00
committed by GitHub
parent 2c9e5be816
commit 6a18b56587
11 changed files with 78 additions and 14 deletions

View File

@@ -100,6 +100,8 @@ func (c *ConnectionManager) AddConnectionRequest(address string, isPermanent boo
address: address,
isPermanent: isPermanent,
}
c.run()
})
}

View File

@@ -35,6 +35,9 @@ type ConnectionManager struct {
stop uint32
connectionRequestsLock sync.Mutex
resetLoopChan chan struct{}
loopTicker *time.Ticker
}
// New instantiates a new instance of a ConnectionManager
@@ -47,6 +50,8 @@ func New(cfg *config.Config, netAdapter *netadapter.NetAdapter, addressManager *
pendingRequested: map[string]*connectionRequest{},
activeOutgoing: map[string]struct{}{},
activeIncoming: map[string]struct{}{},
resetLoopChan: make(chan struct{}),
loopTicker: time.NewTicker(connectionsLoopInterval),
}
connectPeers := cfg.AddPeers
@@ -79,6 +84,12 @@ func (c *ConnectionManager) Stop() {
for _, connection := range c.netAdapter.Connections() {
_ = c.netAdapter.Disconnect(connection) // Ignore errors since connection might be in the midst of disconnecting
}
c.loopTicker.Stop()
}
func (c *ConnectionManager) run() {
c.resetLoopChan <- struct{}{}
}
func (c *ConnectionManager) initiateConnection(address string) error {
@@ -104,7 +115,7 @@ func (c *ConnectionManager) connectionsLoop() {
c.checkIncomingConnections(connSet)
<-time.Tick(connectionsLoopInterval)
c.waitTillNextIteration()
}
}
@@ -117,3 +128,12 @@ func (c *ConnectionManager) ConnectionCount() int {
func (c *ConnectionManager) Ban(netConnection *netadapter.NetConnection) {
c.netAdapter.Ban(netConnection)
}
func (c *ConnectionManager) waitTillNextIteration() {
select {
case <-c.resetLoopChan:
c.loopTicker.Stop()
c.loopTicker = time.NewTicker(connectionsLoopInterval)
case <-c.loopTicker.C:
}
}

View File

@@ -101,7 +101,7 @@ func (na *NetAdapter) ConnectionCount() int {
}
func (na *NetAdapter) onConnectedHandler(connection server.Connection) error {
netConnection := newNetConnection(connection, nil)
netConnection := newNetConnection(connection)
router, err := na.routerInitializer(netConnection)
if err != nil {
return err
@@ -144,7 +144,7 @@ func (na *NetAdapter) Broadcast(netConnections []*NetConnection, message wire.Me
defer na.RUnlock()
for _, netConnection := range netConnections {
router := na.connectionsToRouters[netConnection]
err := router.EnqueueIncomingMessage(message)
err := router.OutgoingRoute().Enqueue(message)
if err != nil {
if errors.Is(err, routerpkg.ErrRouteClosed) {
log.Debugf("Cannot enqueue message to %s: router is closed", netConnection)

View File

@@ -13,10 +13,9 @@ type NetConnection struct {
id *id.ID
}
func newNetConnection(connection server.Connection, id *id.ID) *NetConnection {
func newNetConnection(connection server.Connection) *NetConnection {
return &NetConnection{
connection: connection,
id: id,
}
}
@@ -29,6 +28,11 @@ func (c *NetConnection) ID() *id.ID {
return c.id
}
// SetID sets the ID associated with this connection
func (c *NetConnection) SetID(peerID *id.ID) {
c.id = peerID
}
// Address returns the address associated with this connection
func (c *NetConnection) Address() string {
return c.connection.Address().String()

View File

@@ -111,7 +111,7 @@ func (flow *handleRelayInvsFlow) requestBlocks(requestQueue *hashesQueueSet) err
var filteredHashesToRequest []*daghash.Hash
for _, hash := range hashesToRequest {
exists := flow.SharedRequestedBlocks().addIfNotExists(hash)
if !exists {
if exists {
continue
}

View File

@@ -8,7 +8,7 @@ type hashesQueueSet struct {
}
func (r *hashesQueueSet) enqueueIfNotExists(hash *daghash.Hash) {
if _, ok := r.set[*hash]; !ok {
if _, ok := r.set[*hash]; ok {
return
}
r.queue = append(r.queue, hash)

View File

@@ -1,13 +1,14 @@
package handshake
import (
"sync"
"sync/atomic"
"github.com/kaspanet/kaspad/addrmgr"
"github.com/kaspanet/kaspad/blockdag"
"github.com/kaspanet/kaspad/config"
"github.com/kaspanet/kaspad/netadapter"
"github.com/kaspanet/kaspad/protocol/common"
"sync"
"sync/atomic"
routerpkg "github.com/kaspanet/kaspad/netadapter/router"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"

View File

@@ -90,5 +90,8 @@ func (flow *receiveVersionFlow) start() (*wire.NetAddress, error) {
if err != nil {
return nil, err
}
flow.peer.Connection().SetID(msgVersion.ID)
return msgVersion.Address, nil
}

View File

@@ -182,6 +182,12 @@ func MakeEmptyMessage(command MessageCommand) (Message, error) {
case CmdIBDBlock:
msg = &MsgIBDBlock{}
case CmdInvRelayBlock:
msg = &MsgInvRelayBlock{}
case CmdGetRelayBlocks:
msg = &MsgGetRelayBlocks{}
default:
return nil, errors.Errorf("unhandled command [%s]", command)
}

View File

@@ -1,8 +1,9 @@
package wire
import (
"github.com/kaspanet/kaspad/util/daghash"
"io"
"github.com/kaspanet/kaspad/util/daghash"
)
// MsgGetRelayBlocksHashes is the maximum number of hashes that can
@@ -19,13 +20,38 @@ type MsgGetRelayBlocks struct {
// KaspaDecode decodes r using the kaspa protocol encoding into the receiver.
// This is part of the Message interface implementation.
func (msg *MsgGetRelayBlocks) KaspaDecode(r io.Reader, pver uint32) error {
return ReadElement(r, &msg.Hashes)
numHashes, err := ReadVarInt(r)
if err != nil {
return err
}
msg.Hashes = make([]*daghash.Hash, numHashes)
for i := uint64(0); i < numHashes; i++ {
msg.Hashes[i] = &daghash.Hash{}
err := ReadElement(r, msg.Hashes[i])
if err != nil {
return err
}
}
return nil
}
// KaspaEncode encodes the receiver to w using the kaspa protocol encoding.
// This is part of the Message interface implementation.
func (msg *MsgGetRelayBlocks) KaspaEncode(w io.Writer, pver uint32) error {
return WriteElement(w, msg.Hashes)
err := WriteVarInt(w, uint64(len(msg.Hashes)))
if err != nil {
return err
}
for _, hash := range msg.Hashes {
err := WriteElement(w, hash)
if err != nil {
return err
}
}
return nil
}
// Command returns the protocol command string for the message. This is part

View File

@@ -1,8 +1,9 @@
package wire
import (
"github.com/kaspanet/kaspad/util/daghash"
"io"
"github.com/kaspanet/kaspad/util/daghash"
)
// MsgInvRelayBlock implements the Message interface and represents a kaspa
@@ -15,7 +16,8 @@ type MsgInvRelayBlock struct {
// KaspaDecode decodes r using the kaspa protocol encoding into the receiver.
// This is part of the Message interface implementation.
func (msg *MsgInvRelayBlock) KaspaDecode(r io.Reader, pver uint32) error {
return ReadElement(r, &msg.Hash)
msg.Hash = &daghash.Hash{}
return ReadElement(r, msg.Hash)
}
// KaspaEncode encodes the receiver to w using the kaspa protocol encoding.