mirror of
https://github.com/kaspanet/kaspad.git
synced 2026-02-21 03:03:08 +00:00
Compare commits
5 Commits
v0.6.4-dev
...
v0.6.5-rc1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4dbd64478c | ||
|
|
7756baf9a9 | ||
|
|
c331293a2e | ||
|
|
fcae491e6d | ||
|
|
5a4cafe342 |
@@ -2,6 +2,7 @@ package ibd
|
||||
|
||||
import (
|
||||
"github.com/kaspanet/kaspad/app/appmessage"
|
||||
"github.com/kaspanet/kaspad/app/protocol/blocklogger"
|
||||
"github.com/kaspanet/kaspad/app/protocol/common"
|
||||
peerpkg "github.com/kaspanet/kaspad/app/protocol/peer"
|
||||
"github.com/kaspanet/kaspad/app/protocol/protocolerrors"
|
||||
@@ -173,7 +174,6 @@ func (flow *handleIBDFlow) receiveIBDBlock() (msgIBDBlock *appmessage.MsgIBDBloc
|
||||
}
|
||||
|
||||
func (flow *handleIBDFlow) processIBDBlock(msgIBDBlock *appmessage.MsgIBDBlock) error {
|
||||
|
||||
block := util.NewBlock(msgIBDBlock.MsgBlock)
|
||||
if flow.DAG().IsInDAG(block.Hash()) {
|
||||
return nil
|
||||
@@ -194,5 +194,9 @@ func (flow *handleIBDFlow) processIBDBlock(msgIBDBlock *appmessage.MsgIBDBlock)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = blocklogger.LogBlock(block)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -63,6 +63,7 @@ func (c *ConnectionManager) checkRequestedConnections(connSet connectionSet) {
|
||||
}
|
||||
|
||||
// try to initiate connection
|
||||
log.Debugf("Connecting to connection request %s", connReq.address)
|
||||
err := c.initiateConnection(connReq.address)
|
||||
if err != nil {
|
||||
log.Infof("Couldn't connect to %s: %s", address, err)
|
||||
|
||||
@@ -143,3 +143,19 @@ func (c *ConnectionManager) waitTillNextIteration() {
|
||||
case <-c.loopTicker.C:
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConnectionManager) connectionExists(addressString string) bool {
|
||||
if _, ok := c.activeRequested[addressString]; ok {
|
||||
return true
|
||||
}
|
||||
|
||||
if _, ok := c.activeOutgoing[addressString]; ok {
|
||||
return true
|
||||
}
|
||||
|
||||
if _, ok := c.activeIncoming[addressString]; ok {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -8,8 +8,12 @@ func (c *ConnectionManager) checkIncomingConnections(incomingConnectionSet conne
|
||||
}
|
||||
|
||||
numConnectionsOverMax := len(incomingConnectionSet) - c.maxIncoming
|
||||
log.Debugf("Got %d incoming connections while only %d are allowed. Disconnecting "+
|
||||
"%d", len(incomingConnectionSet), c.maxIncoming, numConnectionsOverMax)
|
||||
|
||||
// randomly disconnect nodes until the number of incoming connections is smaller than maxIncoming
|
||||
for _, connection := range incomingConnectionSet {
|
||||
log.Debugf("Disconnecting %s due to exceeding incoming connections", connection)
|
||||
connection.Disconnect()
|
||||
|
||||
numConnectionsOverMax--
|
||||
|
||||
@@ -39,6 +39,12 @@ func (c *ConnectionManager) checkOutgoingConnections(connSet connectionSet) {
|
||||
netAddress := address.NetAddress()
|
||||
tcpAddress := netAddress.TCPAddress()
|
||||
addressString := tcpAddress.String()
|
||||
|
||||
if c.connectionExists(addressString) {
|
||||
log.Debugf("Fetched address %s from address manager but it's already connected. Skipping...", addressString)
|
||||
continue
|
||||
}
|
||||
|
||||
isBanned, err := c.addressManager.IsBanned(netAddress)
|
||||
if err != nil {
|
||||
log.Infof("Couldn't resolve whether %s is banned: %s", addressString, err)
|
||||
@@ -49,6 +55,8 @@ func (c *ConnectionManager) checkOutgoingConnections(connSet connectionSet) {
|
||||
}
|
||||
|
||||
c.addressManager.Attempt(netAddress)
|
||||
log.Debugf("Connecting to %s because we have %d outgoing connections and the target is "+
|
||||
"%d", addressString, len(c.activeOutgoing), c.targetOutgoing)
|
||||
err = c.initiateConnection(addressString)
|
||||
if err != nil {
|
||||
log.Infof("Couldn't connect to %s: %s", addressString, err)
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"github.com/kaspanet/kaspad/app/appmessage"
|
||||
routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
|
||||
"github.com/pkg/errors"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/id"
|
||||
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/server"
|
||||
@@ -17,7 +18,7 @@ type NetConnection struct {
|
||||
router *routerpkg.Router
|
||||
invalidMessageChan chan error
|
||||
onDisconnectedHandler server.OnDisconnectedHandler
|
||||
isConnected uint32
|
||||
isRouterClosed uint32
|
||||
}
|
||||
|
||||
func newNetConnection(connection server.Connection, routerInitializer RouterInitializer) *NetConnection {
|
||||
@@ -30,7 +31,12 @@ func newNetConnection(connection server.Connection, routerInitializer RouterInit
|
||||
}
|
||||
|
||||
netConnection.connection.SetOnDisconnectedHandler(func() {
|
||||
router.Close()
|
||||
// If the disconnection came because of a network error and not because of the application layer, we
|
||||
// need to close the router as well.
|
||||
if atomic.AddUint32(&netConnection.isRouterClosed, 1) == 1 {
|
||||
netConnection.router.Close()
|
||||
}
|
||||
|
||||
close(netConnection.invalidMessageChan)
|
||||
netConnection.onDisconnectedHandler()
|
||||
})
|
||||
@@ -91,7 +97,9 @@ func (c *NetConnection) setOnDisconnectedHandler(onDisconnectedHandler server.On
|
||||
|
||||
// Disconnect disconnects the given connection
|
||||
func (c *NetConnection) Disconnect() {
|
||||
c.connection.Disconnect()
|
||||
if atomic.AddUint32(&c.isRouterClosed, 1) == 1 {
|
||||
c.router.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// DequeueInvalidMessage dequeues the next invalid message
|
||||
|
||||
@@ -100,8 +100,3 @@ func (r *Route) Close() {
|
||||
r.closed = true
|
||||
close(r.channel)
|
||||
}
|
||||
|
||||
// IsEmpty returns true if the route doesn't have any pending messages
|
||||
func (r *Route) IsEmpty() bool {
|
||||
return len(r.channel) == 0
|
||||
}
|
||||
|
||||
@@ -33,8 +33,7 @@ func (c *gRPCConnection) connectionLoops() error {
|
||||
|
||||
func (c *gRPCConnection) sendLoop() error {
|
||||
outgoingRoute := c.router.OutgoingRoute()
|
||||
// Once the connection is closed, the send loop empties the remaining messages in the channel.
|
||||
for c.IsConnected() || !outgoingRoute.IsEmpty() {
|
||||
for c.IsConnected() {
|
||||
message, err := outgoingRoute.Dequeue()
|
||||
if err != nil {
|
||||
if errors.Is(err, routerpkg.ErrRouteClosed) {
|
||||
@@ -57,7 +56,6 @@ func (c *gRPCConnection) sendLoop() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1,13 +1,11 @@
|
||||
package grpcserver
|
||||
|
||||
import (
|
||||
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
|
||||
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/server/grpcserver/protowire"
|
||||
"github.com/pkg/errors"
|
||||
"net"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
|
||||
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/server/grpcserver/protowire"
|
||||
|
||||
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/server"
|
||||
"google.golang.org/grpc"
|
||||
@@ -55,7 +53,7 @@ func (c *gRPCConnection) Start(router *router.Router) {
|
||||
spawn("gRPCConnection.Start-connectionLoops", func() {
|
||||
err := c.connectionLoops()
|
||||
if err != nil {
|
||||
log.Errorf("error from connectionLoops for %s: %+v", c.address, err)
|
||||
log.Errorf("error from connectionLoops for %s: %s", c.address, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -93,14 +91,9 @@ func (c *gRPCConnection) Disconnect() {
|
||||
close(c.stopChan)
|
||||
|
||||
if c.isOutbound {
|
||||
spawn("gRPCConnection.Disconnect-clientStream.CloseSend", func() {
|
||||
// Wait a second before closing the stream, to let the send queue to get emptied.
|
||||
const finishSendDuration = time.Second
|
||||
time.Sleep(finishSendDuration)
|
||||
clientStream := c.stream.(protowire.P2P_MessageStreamClient)
|
||||
_ = clientStream.CloseSend() // ignore error because we don't really know what's the status of the connection
|
||||
log.Debugf("Disconnected from %s", c)
|
||||
})
|
||||
clientStream := c.stream.(protowire.P2P_MessageStreamClient)
|
||||
_ = clientStream.CloseSend() // ignore error because we don't really know what's the status of the connection
|
||||
log.Debugf("Disconnected from %s", c)
|
||||
}
|
||||
|
||||
log.Debugf("Disconnecting from %s", c)
|
||||
|
||||
@@ -60,6 +60,7 @@ func TestTxRelay(t *testing.T) {
|
||||
t.Fatalf("Error getting mempool entry: %+v", err)
|
||||
}
|
||||
close(txAddedToMempoolChan)
|
||||
return
|
||||
}
|
||||
})
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ const validCharacters = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrs
|
||||
const (
|
||||
appMajor uint = 0
|
||||
appMinor uint = 6
|
||||
appPatch uint = 4
|
||||
appPatch uint = 5
|
||||
)
|
||||
|
||||
// appBuild is defined as a variable so it can be overridden during the build
|
||||
|
||||
Reference in New Issue
Block a user