Compare commits

...

4 Commits

Author SHA1 Message Date
Ori Newman
53706c2f9f bignet debug 2020-09-02 15:43:00 +03:00
Ori Newman
917fa11706 bignet debug 2020-09-02 14:53:38 +03:00
Ori Newman
a32a9011c7 [NOD-1305] Close client connection on disconnect (#909) 2020-08-31 15:57:11 +03:00
stasatdaglabs
5da957f16e Update to version 0.6.8 2020-08-30 11:31:43 +03:00
14 changed files with 62 additions and 22 deletions

View File

@@ -22,7 +22,7 @@ func (*FlowContext) HandleError(err error, flowName string, isStopping *uint32,
panic(err) panic(err)
} }
log.Errorf("error from %s: %+v", flowName, err) log.Errorf("error from %s: %s", flowName, err)
} }
if atomic.AddUint32(isStopping, 1) == 1 { if atomic.AddUint32(isStopping, 1) == 1 {

View File

@@ -26,6 +26,7 @@ func (f *FlowContext) AddTransaction(tx *util.Tx) error {
f.transactionsToRebroadcast[*tx.ID()] = tx f.transactionsToRebroadcast[*tx.ID()] = tx
inv := appmessage.NewMsgInvTransaction([]*daghash.TxID{tx.ID()}) inv := appmessage.NewMsgInvTransaction([]*daghash.TxID{tx.ID()})
log.Criticalf("~~~~~ FlowContext.AddTransaction() broadcasting %s", tx.ID())
return f.Broadcast(inv) return f.Broadcast(inv)
} }

View File

@@ -1,10 +1,10 @@
package ping package ping
import ( import (
"github.com/kaspanet/kaspad/app/protocol/common"
"time" "time"
"github.com/kaspanet/kaspad/app/appmessage" "github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/app/protocol/common"
peerpkg "github.com/kaspanet/kaspad/app/protocol/peer" peerpkg "github.com/kaspanet/kaspad/app/protocol/peer"
"github.com/kaspanet/kaspad/app/protocol/protocolerrors" "github.com/kaspanet/kaspad/app/protocol/protocolerrors"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/router" "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"

View File

@@ -48,6 +48,10 @@ func (flow *handleRelayedTransactionsFlow) start() error {
return err return err
} }
for _, txID := range inv.TxIDs {
log.Criticalf("~~~~~ handleRelayedTransactionsFlow.start() got %s", txID)
}
requestedIDs, err := flow.requestInvTransactions(inv) requestedIDs, err := flow.requestInvTransactions(inv)
if err != nil { if err != nil {
return err return err
@@ -137,6 +141,7 @@ func (flow *handleRelayedTransactionsFlow) readInv() (*appmessage.MsgInvTransact
func (flow *handleRelayedTransactionsFlow) broadcastAcceptedTransactions(acceptedTxs []*mempool.TxDesc) error { func (flow *handleRelayedTransactionsFlow) broadcastAcceptedTransactions(acceptedTxs []*mempool.TxDesc) error {
idsToBroadcast := make([]*daghash.TxID, len(acceptedTxs)) idsToBroadcast := make([]*daghash.TxID, len(acceptedTxs))
for i, tx := range acceptedTxs { for i, tx := range acceptedTxs {
log.Criticalf("~~~~~ broadcastAcceptedTransactions() broadcasting %s", tx.Tx.ID())
idsToBroadcast[i] = tx.Tx.ID() idsToBroadcast[i] = tx.Tx.ID()
} }
inv := appmessage.NewMsgInvTransaction(idsToBroadcast) inv := appmessage.NewMsgInvTransaction(idsToBroadcast)
@@ -187,6 +192,7 @@ func (flow *handleRelayedTransactionsFlow) receiveTransactions(requestedTransact
continue continue
} }
tx := util.NewTx(msgTx) tx := util.NewTx(msgTx)
log.Criticalf("~~~~~ receiveTransactions() got %s", tx.ID())
if !tx.ID().IsEqual(expectedID) { if !tx.ID().IsEqual(expectedID) {
return protocolerrors.Errorf(true, "expected transaction %s, but got %s", return protocolerrors.Errorf(true, "expected transaction %s, but got %s",
expectedID, tx.ID()) expectedID, tx.ID())

View File

@@ -30,6 +30,7 @@ func (flow *handleRequestedTransactionsFlow) start() error {
} }
for _, transactionID := range msgRequestTransactions.IDs { for _, transactionID := range msgRequestTransactions.IDs {
log.Criticalf("~~~~~ handleRequestedTransactionsFlow.start() tx %s was requested", transactionID)
tx, ok := flow.TxPool().FetchTransaction(transactionID) tx, ok := flow.TxPool().FetchTransaction(transactionID)
if !ok { if !ok {

View File

@@ -0,0 +1,9 @@
package relaytransactions
import (
"github.com/kaspanet/kaspad/infrastructure/logger"
"github.com/kaspanet/kaspad/util/panics"
)
var log, _ = logger.Get(logger.SubsystemTags.PROT)
var spawn = panics.GoroutineWrapperFunc(log)

View File

@@ -33,6 +33,10 @@ func (dag *BlockDAG) processBlockNoLock(block *util.Block, flags BehaviorFlags)
blockHash := block.Hash() blockHash := block.Hash()
log.Tracef("Processing block %s", blockHash) log.Tracef("Processing block %s", blockHash)
for _, tx := range block.Transactions() {
log.Criticalf("~~~~~ processBlockNoLock block %s tx %s", block.Hash(), tx.ID())
}
err = dag.checkDuplicateBlock(blockHash, flags) err = dag.checkDuplicateBlock(blockHash, flags)
if err != nil { if err != nil {
return false, false, err return false, false, err

View File

@@ -7,6 +7,7 @@ package mempool
import ( import (
"container/list" "container/list"
"fmt" "fmt"
"runtime/debug"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@@ -498,6 +499,7 @@ func (mp *TxPool) removeTransactionWithDiff(tx *util.Tx, diff *blockdag.UTXODiff
txDesc, _ := mp.fetchTxDesc(txID) txDesc, _ := mp.fetchTxDesc(txID)
if txDesc.depCount == 0 { if txDesc.depCount == 0 {
log.Criticalf("~~~~~ removeTransactionWithDiff delete %s stack %s", txID, debug.Stack())
delete(mp.pool, *txID) delete(mp.pool, *txID)
} else { } else {
delete(mp.depends, *txID) delete(mp.depends, *txID)
@@ -571,6 +573,7 @@ func (mp *TxPool) processRemovedTransactionDependencies(tx *util.Tx) {
if _, ok := mp.depends[*txD.Tx.ID()]; ok { if _, ok := mp.depends[*txD.Tx.ID()]; ok {
delete(mp.depends, *txD.Tx.ID()) delete(mp.depends, *txD.Tx.ID())
mp.pool[*txD.Tx.ID()] = txD mp.pool[*txD.Tx.ID()] = txD
log.Criticalf("~~~~~ processRemovedTransactionDependencies adds %s stack %s", txD.Tx.ID(), debug.Stack())
} }
} }
} }
@@ -652,6 +655,7 @@ func (mp *TxPool) addTransaction(tx *util.Tx, fee uint64, parentsInPool []*appme
} }
if len(parentsInPool) == 0 { if len(parentsInPool) == 0 {
log.Criticalf("~~~~~ addTransaction adds %s stack %s", tx.ID(), debug.Stack())
mp.pool[*tx.ID()] = txD mp.pool[*tx.ID()] = txD
} else { } else {
mp.depends[*tx.ID()] = txD mp.depends[*tx.ID()] = txD

View File

@@ -13,11 +13,11 @@ import (
) )
type gRPCConnection struct { type gRPCConnection struct {
server *gRPCServer server *gRPCServer
address *net.TCPAddr address *net.TCPAddr
isOutbound bool stream grpcStream
stream grpcStream router *router.Router
router *router.Router lowLevelClientConnection *grpc.ClientConn
// streamLock protects concurrent access to stream. // streamLock protects concurrent access to stream.
// Note that it's an RWMutex. Despite what the name // Note that it's an RWMutex. Despite what the name
@@ -34,14 +34,16 @@ type gRPCConnection struct {
isConnected uint32 isConnected uint32
} }
func newConnection(server *gRPCServer, address *net.TCPAddr, isOutbound bool, stream grpcStream) *gRPCConnection { func newConnection(server *gRPCServer, address *net.TCPAddr, stream grpcStream,
lowLevelClientConnection *grpc.ClientConn) *gRPCConnection {
connection := &gRPCConnection{ connection := &gRPCConnection{
server: server, server: server,
address: address, address: address,
isOutbound: isOutbound, stream: stream,
stream: stream, stopChan: make(chan struct{}),
stopChan: make(chan struct{}), isConnected: 1,
isConnected: 1, lowLevelClientConnection: lowLevelClientConnection,
} }
return connection return connection
@@ -83,7 +85,7 @@ func (c *gRPCConnection) SetOnInvalidMessageHandler(onInvalidMessageHandler serv
} }
func (c *gRPCConnection) IsOutbound() bool { func (c *gRPCConnection) IsOutbound() bool {
return c.isOutbound return c.lowLevelClientConnection != nil
} }
// Disconnect disconnects the connection // Disconnect disconnects the connection
@@ -98,7 +100,7 @@ func (c *gRPCConnection) Disconnect() {
close(c.stopChan) close(c.stopChan)
if c.isOutbound { if c.IsOutbound() {
c.closeSend() c.closeSend()
log.Debugf("Disconnected from %s", c) log.Debugf("Disconnected from %s", c)
} }
@@ -138,5 +140,8 @@ func (c *gRPCConnection) closeSend() {
defer c.streamLock.Unlock() defer c.streamLock.Unlock()
clientStream := c.stream.(protowire.P2P_MessageStreamClient) clientStream := c.stream.(protowire.P2P_MessageStreamClient)
_ = clientStream.CloseSend() // ignore error because we don't really know what's the status of the connection
// ignore error because we don't really know what's the status of the connection
_ = clientStream.CloseSend()
_ = c.lowLevelClientConnection.Close()
} }

View File

@@ -90,12 +90,12 @@ func (s *gRPCServer) Connect(address string) (server.Connection, error) {
ctx, cancel := context.WithTimeout(context.Background(), dialTimeout) ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
defer cancel() defer cancel()
gRPCConnection, err := grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithBlock()) gRPCClientConnection, err := grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithBlock())
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "error connecting to %s", address) return nil, errors.Wrapf(err, "error connecting to %s", address)
} }
client := protowire.NewP2PClient(gRPCConnection) client := protowire.NewP2PClient(gRPCClientConnection)
stream, err := client.MessageStream(context.Background(), grpc.UseCompressor(gzip.Name)) stream, err := client.MessageStream(context.Background(), grpc.UseCompressor(gzip.Name))
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "error getting client stream for %s", address) return nil, errors.Wrapf(err, "error getting client stream for %s", address)
@@ -110,7 +110,7 @@ func (s *gRPCServer) Connect(address string) (server.Connection, error) {
return nil, errors.Errorf("non-tcp addresses are not supported") return nil, errors.Errorf("non-tcp addresses are not supported")
} }
connection := newConnection(s, tcpAddress, true, stream) connection := newConnection(s, tcpAddress, stream, gRPCClientConnection)
err = s.onConnectedHandler(connection) err = s.onConnectedHandler(connection)
if err != nil { if err != nil {

View File

@@ -29,7 +29,7 @@ func (p *p2pServer) MessageStream(stream protowire.P2P_MessageStreamServer) erro
return errors.Errorf("non-tcp connections are not supported") return errors.Errorf("non-tcp connections are not supported")
} }
connection := newConnection(p.server, tcpAddress, false, stream) connection := newConnection(p.server, tcpAddress, stream, nil)
err := p.server.onConnectedHandler(connection) err := p.server.onConnectedHandler(connection)
if err != nil { if err != nil {

View File

@@ -29,6 +29,7 @@ func handleSendRawTransaction(s *Server, cmd interface{}, closeChan <-chan struc
} }
tx := util.NewTx(&msgTx) tx := util.NewTx(&msgTx)
log.Criticalf("~~~~~ handleSendRawTransaction got %s", tx.ID())
err = s.protocolManager.AddTransaction(tx) err = s.protocolManager.AddTransaction(tx)
if err != nil { if err != nil {
if !errors.As(err, &mempool.RuleError{}) { if !errors.As(err, &mempool.RuleError{}) {

9
util/bigintpool/log.go Normal file
View File

@@ -0,0 +1,9 @@
package bigintpool
import (
"github.com/kaspanet/kaspad/infrastructure/logger"
"github.com/kaspanet/kaspad/util/panics"
)
var log, _ = logger.Get(logger.SubsystemTags.UTIL)
var spawn = panics.GoroutineWrapperFunc(log)

View File

@@ -11,7 +11,7 @@ const validCharacters = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrs
const ( const (
appMajor uint = 0 appMajor uint = 0
appMinor uint = 6 appMinor uint = 6
appPatch uint = 7 appPatch uint = 8
) )
// appBuild is defined as a variable so it can be overridden during the build // appBuild is defined as a variable so it can be overridden during the build