Compare commits

..

1 Commits

Author SHA1 Message Date
stasatdaglabs
8534d7f317 Empty commit. 2020-08-19 15:43:01 +03:00
12 changed files with 26 additions and 55 deletions

View File

@@ -74,4 +74,3 @@ The documentation is a work-in-progress. It is located in the [docs](https://git
## License
Kaspad is licensed under the copyfree [ISC License](https://choosealicense.com/licenses/isc/).

View File

@@ -2,7 +2,6 @@ package ibd
import (
"github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/app/protocol/blocklogger"
"github.com/kaspanet/kaspad/app/protocol/common"
peerpkg "github.com/kaspanet/kaspad/app/protocol/peer"
"github.com/kaspanet/kaspad/app/protocol/protocolerrors"
@@ -174,6 +173,7 @@ func (flow *handleIBDFlow) receiveIBDBlock() (msgIBDBlock *appmessage.MsgIBDBloc
}
func (flow *handleIBDFlow) processIBDBlock(msgIBDBlock *appmessage.MsgIBDBlock) error {
block := util.NewBlock(msgIBDBlock.MsgBlock)
if flow.DAG().IsInDAG(block.Hash()) {
return nil
@@ -194,9 +194,5 @@ func (flow *handleIBDFlow) processIBDBlock(msgIBDBlock *appmessage.MsgIBDBlock)
if err != nil {
return err
}
err = blocklogger.LogBlock(block)
if err != nil {
return err
}
return nil
}

View File

@@ -63,7 +63,6 @@ func (c *ConnectionManager) checkRequestedConnections(connSet connectionSet) {
}
// try to initiate connection
log.Debugf("Connecting to connection request %s", connReq.address)
err := c.initiateConnection(connReq.address)
if err != nil {
log.Infof("Couldn't connect to %s: %s", address, err)

View File

@@ -143,19 +143,3 @@ func (c *ConnectionManager) waitTillNextIteration() {
case <-c.loopTicker.C:
}
}
func (c *ConnectionManager) connectionExists(addressString string) bool {
if _, ok := c.activeRequested[addressString]; ok {
return true
}
if _, ok := c.activeOutgoing[addressString]; ok {
return true
}
if _, ok := c.activeIncoming[addressString]; ok {
return true
}
return false
}

View File

@@ -8,12 +8,8 @@ func (c *ConnectionManager) checkIncomingConnections(incomingConnectionSet conne
}
numConnectionsOverMax := len(incomingConnectionSet) - c.maxIncoming
log.Debugf("Got %d incoming connections while only %d are allowed. Disconnecting "+
"%d", len(incomingConnectionSet), c.maxIncoming, numConnectionsOverMax)
// randomly disconnect nodes until the number of incoming connections is smaller than maxIncoming
for _, connection := range incomingConnectionSet {
log.Debugf("Disconnecting %s due to exceeding incoming connections", connection)
connection.Disconnect()
numConnectionsOverMax--

View File

@@ -39,12 +39,6 @@ func (c *ConnectionManager) checkOutgoingConnections(connSet connectionSet) {
netAddress := address.NetAddress()
tcpAddress := netAddress.TCPAddress()
addressString := tcpAddress.String()
if c.connectionExists(addressString) {
log.Debugf("Fetched address %s from address manager but it's already connected. Skipping...", addressString)
continue
}
isBanned, err := c.addressManager.IsBanned(netAddress)
if err != nil {
log.Infof("Couldn't resolve whether %s is banned: %s", addressString, err)
@@ -55,8 +49,6 @@ func (c *ConnectionManager) checkOutgoingConnections(connSet connectionSet) {
}
c.addressManager.Attempt(netAddress)
log.Debugf("Connecting to %s because we have %d outgoing connections and the target is "+
"%d", addressString, len(c.activeOutgoing), c.targetOutgoing)
err = c.initiateConnection(addressString)
if err != nil {
log.Infof("Couldn't connect to %s: %s", addressString, err)

View File

@@ -5,7 +5,6 @@ import (
"github.com/kaspanet/kaspad/app/appmessage"
routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
"github.com/pkg/errors"
"sync/atomic"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/id"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/server"
@@ -18,7 +17,7 @@ type NetConnection struct {
router *routerpkg.Router
invalidMessageChan chan error
onDisconnectedHandler server.OnDisconnectedHandler
isRouterClosed uint32
isConnected uint32
}
func newNetConnection(connection server.Connection, routerInitializer RouterInitializer) *NetConnection {
@@ -31,12 +30,7 @@ func newNetConnection(connection server.Connection, routerInitializer RouterInit
}
netConnection.connection.SetOnDisconnectedHandler(func() {
// If the disconnection came because of a network error and not because of the application layer, we
// need to close the router as well.
if atomic.AddUint32(&netConnection.isRouterClosed, 1) == 1 {
netConnection.router.Close()
}
router.Close()
close(netConnection.invalidMessageChan)
netConnection.onDisconnectedHandler()
})
@@ -97,9 +91,7 @@ func (c *NetConnection) setOnDisconnectedHandler(onDisconnectedHandler server.On
// Disconnect disconnects the given connection
func (c *NetConnection) Disconnect() {
if atomic.AddUint32(&c.isRouterClosed, 1) == 1 {
c.router.Close()
}
c.connection.Disconnect()
}
// DequeueInvalidMessage dequeues the next invalid message

View File

@@ -100,3 +100,8 @@ func (r *Route) Close() {
r.closed = true
close(r.channel)
}
// IsEmpty returns true if the route doesn't have any pending messages
func (r *Route) IsEmpty() bool {
return len(r.channel) == 0
}

View File

@@ -33,7 +33,8 @@ func (c *gRPCConnection) connectionLoops() error {
func (c *gRPCConnection) sendLoop() error {
outgoingRoute := c.router.OutgoingRoute()
for c.IsConnected() {
// Once the connection is closed, the send loop empties the remaining messages in the channel.
for c.IsConnected() || !outgoingRoute.IsEmpty() {
message, err := outgoingRoute.Dequeue()
if err != nil {
if errors.Is(err, routerpkg.ErrRouteClosed) {
@@ -56,6 +57,7 @@ func (c *gRPCConnection) sendLoop() error {
if err != nil {
return err
}
}
return nil
}

View File

@@ -1,11 +1,13 @@
package grpcserver
import (
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/server/grpcserver/protowire"
"github.com/pkg/errors"
"net"
"sync/atomic"
"time"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/server/grpcserver/protowire"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/server"
"google.golang.org/grpc"
@@ -53,7 +55,7 @@ func (c *gRPCConnection) Start(router *router.Router) {
spawn("gRPCConnection.Start-connectionLoops", func() {
err := c.connectionLoops()
if err != nil {
log.Errorf("error from connectionLoops for %s: %s", c.address, err)
log.Errorf("error from connectionLoops for %s: %+v", c.address, err)
}
})
}
@@ -91,9 +93,14 @@ func (c *gRPCConnection) Disconnect() {
close(c.stopChan)
if c.isOutbound {
clientStream := c.stream.(protowire.P2P_MessageStreamClient)
_ = clientStream.CloseSend() // ignore error because we don't really know what's the status of the connection
log.Debugf("Disconnected from %s", c)
spawn("gRPCConnection.Disconnect-clientStream.CloseSend", func() {
// Wait a second before closing the stream, to let the send queue to get emptied.
const finishSendDuration = time.Second
time.Sleep(finishSendDuration)
clientStream := c.stream.(protowire.P2P_MessageStreamClient)
_ = clientStream.CloseSend() // ignore error because we don't really know what's the status of the connection
log.Debugf("Disconnected from %s", c)
})
}
log.Debugf("Disconnecting from %s", c)

View File

@@ -60,7 +60,6 @@ func TestTxRelay(t *testing.T) {
t.Fatalf("Error getting mempool entry: %+v", err)
}
close(txAddedToMempoolChan)
return
}
})

View File

@@ -11,7 +11,7 @@ const validCharacters = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrs
const (
appMajor uint = 0
appMinor uint = 6
appPatch uint = 5
appPatch uint = 4
)
// appBuild is defined as a variable so it can be overridden during the build