Compare commits

...

5 Commits

Author SHA1 Message Date
Ori Newman
4dbd64478c [NOD-1294] In TestTxRelay return after tx is found in the mempool (#885) 2020-08-20 19:05:53 +03:00
stasatdaglabs
7756baf9a9 [NOD-1290] Add blocklogger.LogBlock to IBD. (#884) 2020-08-20 12:29:11 +03:00
Ori Newman
c331293a2e [NOD-1289] Check if connection exists before establishing another one with the same address (#883) 2020-08-20 11:50:29 +03:00
Ori Newman
fcae491e6d [NOD-1286] Close router from netConnection.Disconnect (#881)
* [NOD-1286] Close router from netConnection.Disconnect

* [NOD-1286] Close router in grpc errors as well

* [NOD-1286] Fix typo

* [NOD-1286] Rename isConnected->isRouterClosed
2020-08-19 17:28:01 +03:00
stasatdaglabs
5a4cafe342 Update to version 0.6.5 2020-08-19 15:00:12 +03:00
11 changed files with 54 additions and 26 deletions

View File

@@ -2,6 +2,7 @@ package ibd
import (
"github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/app/protocol/blocklogger"
"github.com/kaspanet/kaspad/app/protocol/common"
peerpkg "github.com/kaspanet/kaspad/app/protocol/peer"
"github.com/kaspanet/kaspad/app/protocol/protocolerrors"
@@ -173,7 +174,6 @@ func (flow *handleIBDFlow) receiveIBDBlock() (msgIBDBlock *appmessage.MsgIBDBloc
}
func (flow *handleIBDFlow) processIBDBlock(msgIBDBlock *appmessage.MsgIBDBlock) error {
block := util.NewBlock(msgIBDBlock.MsgBlock)
if flow.DAG().IsInDAG(block.Hash()) {
return nil
@@ -194,5 +194,9 @@ func (flow *handleIBDFlow) processIBDBlock(msgIBDBlock *appmessage.MsgIBDBlock)
if err != nil {
return err
}
err = blocklogger.LogBlock(block)
if err != nil {
return err
}
return nil
}

View File

@@ -63,6 +63,7 @@ func (c *ConnectionManager) checkRequestedConnections(connSet connectionSet) {
}
// try to initiate connection
log.Debugf("Connecting to connection request %s", connReq.address)
err := c.initiateConnection(connReq.address)
if err != nil {
log.Infof("Couldn't connect to %s: %s", address, err)

View File

@@ -143,3 +143,19 @@ func (c *ConnectionManager) waitTillNextIteration() {
case <-c.loopTicker.C:
}
}
func (c *ConnectionManager) connectionExists(addressString string) bool {
if _, ok := c.activeRequested[addressString]; ok {
return true
}
if _, ok := c.activeOutgoing[addressString]; ok {
return true
}
if _, ok := c.activeIncoming[addressString]; ok {
return true
}
return false
}

View File

@@ -8,8 +8,12 @@ func (c *ConnectionManager) checkIncomingConnections(incomingConnectionSet conne
}
numConnectionsOverMax := len(incomingConnectionSet) - c.maxIncoming
log.Debugf("Got %d incoming connections while only %d are allowed. Disconnecting "+
"%d", len(incomingConnectionSet), c.maxIncoming, numConnectionsOverMax)
// randomly disconnect nodes until the number of incoming connections is smaller than maxIncoming
for _, connection := range incomingConnectionSet {
log.Debugf("Disconnecting %s due to exceeding incoming connections", connection)
connection.Disconnect()
numConnectionsOverMax--

View File

@@ -39,6 +39,12 @@ func (c *ConnectionManager) checkOutgoingConnections(connSet connectionSet) {
netAddress := address.NetAddress()
tcpAddress := netAddress.TCPAddress()
addressString := tcpAddress.String()
if c.connectionExists(addressString) {
log.Debugf("Fetched address %s from address manager but it's already connected. Skipping...", addressString)
continue
}
isBanned, err := c.addressManager.IsBanned(netAddress)
if err != nil {
log.Infof("Couldn't resolve whether %s is banned: %s", addressString, err)
@@ -49,6 +55,8 @@ func (c *ConnectionManager) checkOutgoingConnections(connSet connectionSet) {
}
c.addressManager.Attempt(netAddress)
log.Debugf("Connecting to %s because we have %d outgoing connections and the target is "+
"%d", addressString, len(c.activeOutgoing), c.targetOutgoing)
err = c.initiateConnection(addressString)
if err != nil {
log.Infof("Couldn't connect to %s: %s", addressString, err)

View File

@@ -5,6 +5,7 @@ import (
"github.com/kaspanet/kaspad/app/appmessage"
routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
"github.com/pkg/errors"
"sync/atomic"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/id"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/server"
@@ -17,7 +18,7 @@ type NetConnection struct {
router *routerpkg.Router
invalidMessageChan chan error
onDisconnectedHandler server.OnDisconnectedHandler
isConnected uint32
isRouterClosed uint32
}
func newNetConnection(connection server.Connection, routerInitializer RouterInitializer) *NetConnection {
@@ -30,7 +31,12 @@ func newNetConnection(connection server.Connection, routerInitializer RouterInit
}
netConnection.connection.SetOnDisconnectedHandler(func() {
router.Close()
// If the disconnection came because of a network error and not because of the application layer, we
// need to close the router as well.
if atomic.AddUint32(&netConnection.isRouterClosed, 1) == 1 {
netConnection.router.Close()
}
close(netConnection.invalidMessageChan)
netConnection.onDisconnectedHandler()
})
@@ -91,7 +97,9 @@ func (c *NetConnection) setOnDisconnectedHandler(onDisconnectedHandler server.On
// Disconnect disconnects the given connection
func (c *NetConnection) Disconnect() {
c.connection.Disconnect()
if atomic.AddUint32(&c.isRouterClosed, 1) == 1 {
c.router.Close()
}
}
// DequeueInvalidMessage dequeues the next invalid message

View File

@@ -100,8 +100,3 @@ func (r *Route) Close() {
r.closed = true
close(r.channel)
}
// IsEmpty returns true if the route doesn't have any pending messages
func (r *Route) IsEmpty() bool {
return len(r.channel) == 0
}

View File

@@ -33,8 +33,7 @@ func (c *gRPCConnection) connectionLoops() error {
func (c *gRPCConnection) sendLoop() error {
outgoingRoute := c.router.OutgoingRoute()
// Once the connection is closed, the send loop empties the remaining messages in the channel.
for c.IsConnected() || !outgoingRoute.IsEmpty() {
for c.IsConnected() {
message, err := outgoingRoute.Dequeue()
if err != nil {
if errors.Is(err, routerpkg.ErrRouteClosed) {
@@ -57,7 +56,6 @@ func (c *gRPCConnection) sendLoop() error {
if err != nil {
return err
}
}
return nil
}

View File

@@ -1,13 +1,11 @@
package grpcserver
import (
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/server/grpcserver/protowire"
"github.com/pkg/errors"
"net"
"sync/atomic"
"time"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/server/grpcserver/protowire"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/server"
"google.golang.org/grpc"
@@ -55,7 +53,7 @@ func (c *gRPCConnection) Start(router *router.Router) {
spawn("gRPCConnection.Start-connectionLoops", func() {
err := c.connectionLoops()
if err != nil {
log.Errorf("error from connectionLoops for %s: %+v", c.address, err)
log.Errorf("error from connectionLoops for %s: %s", c.address, err)
}
})
}
@@ -93,14 +91,9 @@ func (c *gRPCConnection) Disconnect() {
close(c.stopChan)
if c.isOutbound {
spawn("gRPCConnection.Disconnect-clientStream.CloseSend", func() {
// Wait a second before closing the stream, to let the send queue to get emptied.
const finishSendDuration = time.Second
time.Sleep(finishSendDuration)
clientStream := c.stream.(protowire.P2P_MessageStreamClient)
_ = clientStream.CloseSend() // ignore error because we don't really know what's the status of the connection
log.Debugf("Disconnected from %s", c)
})
clientStream := c.stream.(protowire.P2P_MessageStreamClient)
_ = clientStream.CloseSend() // ignore error because we don't really know what's the status of the connection
log.Debugf("Disconnected from %s", c)
}
log.Debugf("Disconnecting from %s", c)

View File

@@ -60,6 +60,7 @@ func TestTxRelay(t *testing.T) {
t.Fatalf("Error getting mempool entry: %+v", err)
}
close(txAddedToMempoolChan)
return
}
})

View File

@@ -11,7 +11,7 @@ const validCharacters = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrs
const (
appMajor uint = 0
appMinor uint = 6
appPatch uint = 4
appPatch uint = 5
)
// appBuild is defined as a variable so it can be overridden during the build