mirror of
https://github.com/kaspanet/kaspad.git
synced 2026-02-28 14:13:21 +00:00
* [NOD-1162] Separate kaspad to it's own package, so that I can use it out of integration test * [NOD-1162] Begin integration tests * [NOD-1162] [FIX] Assign cfg to RPCServer * [NOD-1162] Basic integration test ready * [NOD-1162] Wait for connection for real * [NOD-1162] [FIX] Connection manager should run the moment it adds a request * [NOD-1162] Make connect something that can be invoked in middle of test * [NOD-1162] Complete first integration test * [NOD-1162] Undo refactor error * [NOD-1162] Rename Kaspad to App * [NOD-1162] Convert checking connection to polling * [NOD-1162] [FIX] Set peerID on handshake * [NOD-1162] [FIX] Broadcast should send to outgoing route, not incoming * [NOD-1162] [FIX] Add CmdInvRelayBlock to MakeEmptyMessage * [NOD-1162] [FIX] Initialize Hash before decoding MsgInvRelayBlock * [NOD-1162] [FIX] Invert condition * [NOD-1162] [FIX] Fixes to encoding of MsgGetRelayBlocks * [NOD-1162] [FIX] Add MsgGetRelayBlocks to MakeEmptyMessage * [NOD-1162] [FIX] Connection manager should run the moment it adds a request * [NOD-1162] [FIX] Set peerID on handshake * [NOD-1162] [FIX] Broadcast should send to outgoing route, not incoming * [NOD-1162] [FIX] Add CmdInvRelayBlock to MakeEmptyMessage * [NOD-1162] [FIX] Initialize Hash before decoding MsgInvRelayBlock * [NOD-1162] [FIX] Invert condition * [NOD-1162] [FIX] Fixes to encoding of MsgGetRelayBlocks * [NOD-1162] [FIX] Add MsgGetRelayBlocks to MakeEmptyMessage * [NOD-1162] Add comment * [NOD-1162] Added support for 3 nodes and clients in integration tests * [NOD-1162] Add third node to integration test * [NOD-1192] Use lock-less functions in TxPool.HandleNewBlock * [NOD-1192] Broadcast transactions only if there's more then 0 * [NOD-1162] Removed double waitTillNextIteration * [NOD-1192] Rename: broadcastTransactions -> broadcastTransactionsAfterBlockAdded * [NOD-1162] Call NotifyBlocks on client3 as well * [NOD-1162] ErrTimeout and ErrRouteClosed should be ProtocolErrors * [NOD-1162] Added comment 
and removed redundant type PeerAddedCallback * [NOD-1162] Revert overly eager rename * [NOD-1162] Move DisalbeTLS to common config + minimize call for ioutil.TempDir() * [NOD-1162] Add some clarifications in code * [NOD-1193] Skip closed connections in NetAdapter.Broadcast * [NOD-1193] Make sure to protect connectionsToRouters from concurrent access * [NOD-1162] Add _test to all files in integration package * [NOD-1162] Introduced appHarness to better encapsulate a single node * [NOD-1162] Removed onChainChanged handler * [NOD-1162] Remove redundant closure * [NOD-1162] Correctly mark integration_test config as Simnet * [NOD-1162] Rename app.ID -> app.P2PNodeID * [NOD-1162] Move TestIntegrationBasicSync to basic_sync_test.go * [NOD-1210] Made it possible to setup any number of harnesses needed * [NOD-1210] Rename appHarness1/2 to incoming/outgoing in connect function * [NOD-1210] Add the 117-incoming-connections integration test * [NOD-1210] Delete 117-incoming-connections test because it opens too much files * [NOD-1210] Added function to notify of blocks conveniently * [NOD-1210] Added function to mine a block from-A-to-Z * [NOD-1210] Added IBD integration test * [NOD-1210] Finish test for IBD and fix bug where requestSelectedTipsIfRequired ran in handshake's goroutine * [NOD-1210] Set log level to debug * [NOD-1211] Add test for transaction relay * [NOD-1211] Compare fix incorrect comaprison in KaspadMessage_RequestTransactions.fromWireMessage * [NOD-1211] Return ok instead of err from FetchTxDesc and FetchTransaction * [NOD-1211] Added MsgTransactionNotFound type * [NOD-1211] Added HandlRequestedTransactions flow * [NOD-1211] Wait for blocks to be accepted before moving forward * [NOD-1211] Rename CmdNotFound to CmdTransactionNotFound * [NOD-1211] Rename: requestAndSolveTemplate -> mineNextBlock * [NOD-1211] Renamed incoming/outgoing to appHarness1/appHarness2 in isConnected * [NOD-1211] Move check of Hash == nil to outside wireHashToProto * [NOD-1211] 
Instantiate payloadHash before *x
287 lines
9.8 KiB
Go
287 lines
9.8 KiB
Go
package protocol
|
|
|
|
import (
|
|
"sync/atomic"
|
|
|
|
"github.com/kaspanet/kaspad/addressmanager"
|
|
"github.com/kaspanet/kaspad/netadapter"
|
|
routerpkg "github.com/kaspanet/kaspad/netadapter/router"
|
|
"github.com/kaspanet/kaspad/protocol/flows/addressexchange"
|
|
"github.com/kaspanet/kaspad/protocol/flows/blockrelay"
|
|
"github.com/kaspanet/kaspad/protocol/flows/handshake"
|
|
"github.com/kaspanet/kaspad/protocol/flows/ibd"
|
|
"github.com/kaspanet/kaspad/protocol/flows/ibd/selectedtip"
|
|
"github.com/kaspanet/kaspad/protocol/flows/ping"
|
|
"github.com/kaspanet/kaspad/protocol/flows/relaytransactions"
|
|
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
|
|
"github.com/kaspanet/kaspad/protocol/protocolerrors"
|
|
"github.com/kaspanet/kaspad/wire"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
// flowInitializeFunc is the body of a flow. It is called with the incoming
// route that was registered for the flow and with the peer the flow runs for,
// and the error it returns is handed to the flow's error handling.
type flowInitializeFunc func(route *routerpkg.Route, peer *peerpkg.Peer) error
|
|
// flowExecuteFunc runs a registered flow for the given peer. In this file it
// is always a closure that calls a flowInitializeFunc and reports any error
// it returns.
type flowExecuteFunc func(peer *peerpkg.Peer)
|
|
|
|
// flow couples the name a flow was registered under with the function that
// executes it for a peer.
type flow struct {
|
|
// name is the label the flow was registered with; it is also passed along
// when the flow reports an error.
name string
|
|
// executeFunc runs the flow for a peer.
executeFunc flowExecuteFunc
|
|
}
|
|
|
|
func (m *Manager) routerInitializer(router *routerpkg.Router, netConnection *netadapter.NetConnection) {
|
|
// isStopping flag is raised the moment that the connection associated with this router is disconnected
|
|
// errChan is used by the flow goroutines to return to runFlows when an error occurs.
|
|
// They are both initialized here and passed to register flows.
|
|
isStopping := uint32(0)
|
|
errChan := make(chan error)
|
|
|
|
flows := m.registerFlows(router, errChan, &isStopping)
|
|
receiveVersionRoute, sendVersionRoute := registerHandshakeRoutes(router)
|
|
|
|
// After flows were registered - spawn a new thread that will wait for connection to finish initializing
|
|
// and start receiving messages
|
|
spawn("routerInitializer-runFlows", func() {
|
|
isBanned, err := m.context.ConnectionManager().IsBanned(netConnection)
|
|
if err != nil && !errors.Is(err, addressmanager.ErrAddressNotFound) {
|
|
panic(err)
|
|
}
|
|
if isBanned {
|
|
netConnection.Disconnect()
|
|
return
|
|
}
|
|
|
|
netConnection.SetOnInvalidMessageHandler(func(err error) {
|
|
if atomic.AddUint32(&isStopping, 1) == 1 {
|
|
errChan <- protocolerrors.Wrap(true, err, "received bad message")
|
|
}
|
|
})
|
|
|
|
peer, err := handshake.HandleHandshake(m.context, netConnection, receiveVersionRoute,
|
|
sendVersionRoute, router.OutgoingRoute())
|
|
if err != nil {
|
|
m.handleError(err, netConnection)
|
|
return
|
|
}
|
|
|
|
removeHandshakeRoutes(router)
|
|
|
|
err = m.runFlows(flows, peer, errChan)
|
|
if err != nil {
|
|
m.handleError(err, netConnection)
|
|
return
|
|
}
|
|
})
|
|
}
|
|
|
|
func (m *Manager) handleError(err error, netConnection *netadapter.NetConnection) {
|
|
if protocolErr := &(protocolerrors.ProtocolError{}); errors.As(err, &protocolErr) {
|
|
if protocolErr.ShouldBan {
|
|
log.Warnf("Banning %s (reason: %s)", netConnection, protocolErr.Cause)
|
|
err := m.context.ConnectionManager().Ban(netConnection)
|
|
if err != nil && !errors.Is(err, addressmanager.ErrAddressNotFound) {
|
|
panic(err)
|
|
}
|
|
}
|
|
netConnection.Disconnect()
|
|
return
|
|
}
|
|
if errors.Is(err, routerpkg.ErrTimeout) {
|
|
log.Warnf("Got timeout from %s. Disconnecting...", netConnection)
|
|
netConnection.Disconnect()
|
|
return
|
|
}
|
|
if errors.Is(err, routerpkg.ErrRouteClosed) {
|
|
return
|
|
}
|
|
panic(err)
|
|
}
|
|
|
|
func (m *Manager) registerFlows(router *routerpkg.Router, errChan chan error, isStopping *uint32) (flows []*flow) {
|
|
flows = m.registerAddressFlows(router, isStopping, errChan)
|
|
flows = append(flows, m.registerBlockRelayFlows(router, isStopping, errChan)...)
|
|
flows = append(flows, m.registerPingFlows(router, isStopping, errChan)...)
|
|
flows = append(flows, m.registerIBDFlows(router, isStopping, errChan)...)
|
|
flows = append(flows, m.registerTransactionRelayFlow(router, isStopping, errChan)...)
|
|
|
|
return flows
|
|
}
|
|
|
|
func (m *Manager) registerAddressFlows(router *routerpkg.Router, isStopping *uint32, errChan chan error) []*flow {
|
|
outgoingRoute := router.OutgoingRoute()
|
|
|
|
return []*flow{
|
|
m.registerOneTimeFlow("SendAddresses", router, []wire.MessageCommand{wire.CmdRequestAddresses}, isStopping, errChan,
|
|
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
|
|
return addressexchange.SendAddresses(m.context, incomingRoute, outgoingRoute)
|
|
},
|
|
),
|
|
|
|
m.registerOneTimeFlow("ReceiveAddresses", router, []wire.MessageCommand{wire.CmdAddresses}, isStopping, errChan,
|
|
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
|
|
return addressexchange.ReceiveAddresses(m.context, incomingRoute, outgoingRoute, peer)
|
|
},
|
|
),
|
|
}
|
|
}
|
|
|
|
func (m *Manager) registerBlockRelayFlows(router *routerpkg.Router, isStopping *uint32, errChan chan error) []*flow {
|
|
outgoingRoute := router.OutgoingRoute()
|
|
|
|
return []*flow{
|
|
m.registerFlow("HandleRelayInvs", router, []wire.MessageCommand{wire.CmdInvRelayBlock, wire.CmdBlock}, isStopping, errChan,
|
|
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
|
|
return blockrelay.HandleRelayInvs(m.context, incomingRoute,
|
|
outgoingRoute, peer)
|
|
},
|
|
),
|
|
|
|
m.registerFlow("HandleRelayBlockRequests", router, []wire.MessageCommand{wire.CmdRequestRelayBlocks}, isStopping, errChan,
|
|
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
|
|
return blockrelay.HandleRelayBlockRequests(m.context, incomingRoute, outgoingRoute, peer)
|
|
},
|
|
),
|
|
}
|
|
}
|
|
|
|
func (m *Manager) registerPingFlows(router *routerpkg.Router, isStopping *uint32, errChan chan error) []*flow {
|
|
outgoingRoute := router.OutgoingRoute()
|
|
|
|
return []*flow{
|
|
m.registerFlow("ReceivePings", router, []wire.MessageCommand{wire.CmdPing}, isStopping, errChan,
|
|
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
|
|
return ping.ReceivePings(m.context, incomingRoute, outgoingRoute)
|
|
},
|
|
),
|
|
|
|
m.registerFlow("SendPings", router, []wire.MessageCommand{wire.CmdPong}, isStopping, errChan,
|
|
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
|
|
return ping.SendPings(m.context, incomingRoute, outgoingRoute, peer)
|
|
},
|
|
),
|
|
}
|
|
}
|
|
|
|
func (m *Manager) registerIBDFlows(router *routerpkg.Router, isStopping *uint32, errChan chan error) []*flow {
|
|
outgoingRoute := router.OutgoingRoute()
|
|
|
|
return []*flow{
|
|
m.registerFlow("HandleIBD", router, []wire.MessageCommand{wire.CmdBlockLocator, wire.CmdIBDBlock,
|
|
wire.CmdDoneIBDBlocks}, isStopping, errChan,
|
|
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
|
|
return ibd.HandleIBD(m.context, incomingRoute, outgoingRoute, peer)
|
|
},
|
|
),
|
|
|
|
m.registerFlow("RequestSelectedTip", router, []wire.MessageCommand{wire.CmdSelectedTip}, isStopping, errChan,
|
|
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
|
|
return selectedtip.RequestSelectedTip(m.context, incomingRoute, outgoingRoute, peer)
|
|
},
|
|
),
|
|
|
|
m.registerFlow("HandleRequestSelectedTip", router, []wire.MessageCommand{wire.CmdRequestSelectedTip}, isStopping, errChan,
|
|
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
|
|
return selectedtip.HandleRequestSelectedTip(m.context, incomingRoute, outgoingRoute)
|
|
},
|
|
),
|
|
|
|
m.registerFlow("HandleRequestBlockLocator", router, []wire.MessageCommand{wire.CmdRequestBlockLocator}, isStopping, errChan,
|
|
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
|
|
return ibd.HandleRequestBlockLocator(m.context, incomingRoute, outgoingRoute)
|
|
},
|
|
),
|
|
|
|
m.registerFlow("HandleRequestIBDBlocks", router, []wire.MessageCommand{wire.CmdRequestIBDBlocks, wire.CmdRequestNextIBDBlocks}, isStopping, errChan,
|
|
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
|
|
return ibd.HandleRequestIBDBlocks(m.context, incomingRoute, outgoingRoute)
|
|
},
|
|
),
|
|
}
|
|
}
|
|
|
|
func (m *Manager) registerTransactionRelayFlow(router *routerpkg.Router, isStopping *uint32, errChan chan error) []*flow {
|
|
outgoingRoute := router.OutgoingRoute()
|
|
|
|
return []*flow{
|
|
m.registerFlow("HandleRelayedTransactions", router,
|
|
[]wire.MessageCommand{wire.CmdInvTransaction, wire.CmdTx, wire.CmdTransactionNotFound}, isStopping, errChan,
|
|
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
|
|
return relaytransactions.HandleRelayedTransactions(m.context, incomingRoute, outgoingRoute)
|
|
},
|
|
),
|
|
m.registerFlow("HandleRequestTransactions", router,
|
|
[]wire.MessageCommand{wire.CmdRequestTransactions}, isStopping, errChan,
|
|
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
|
|
return relaytransactions.HandleRequestedTransactions(m.context, incomingRoute, outgoingRoute)
|
|
},
|
|
),
|
|
}
|
|
}
|
|
|
|
func (m *Manager) registerFlow(name string, router *routerpkg.Router, messageTypes []wire.MessageCommand, isStopping *uint32,
|
|
errChan chan error, initializeFunc flowInitializeFunc) *flow {
|
|
|
|
route, err := router.AddIncomingRoute(messageTypes)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
return &flow{
|
|
name: name,
|
|
executeFunc: func(peer *peerpkg.Peer) {
|
|
err := initializeFunc(route, peer)
|
|
if err != nil {
|
|
m.context.HandleError(err, name, isStopping, errChan)
|
|
return
|
|
}
|
|
},
|
|
}
|
|
}
|
|
|
|
func (m *Manager) registerOneTimeFlow(name string, router *routerpkg.Router, messageTypes []wire.MessageCommand,
|
|
isStopping *uint32, stopChan chan error, initializeFunc flowInitializeFunc) *flow {
|
|
|
|
route, err := router.AddIncomingRoute(messageTypes)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
return &flow{
|
|
name: name,
|
|
executeFunc: func(peer *peerpkg.Peer) {
|
|
defer func() {
|
|
err := router.RemoveRoute(messageTypes)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}()
|
|
|
|
err := initializeFunc(route, peer)
|
|
if err != nil {
|
|
m.context.HandleError(err, name, isStopping, stopChan)
|
|
return
|
|
}
|
|
},
|
|
}
|
|
}
|
|
|
|
func registerHandshakeRoutes(router *routerpkg.Router) (
|
|
receiveVersionRoute *routerpkg.Route, sendVersionRoute *routerpkg.Route) {
|
|
receiveVersionRoute, err := router.AddIncomingRoute([]wire.MessageCommand{wire.CmdVersion})
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
sendVersionRoute, err = router.AddIncomingRoute([]wire.MessageCommand{wire.CmdVerAck})
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
return receiveVersionRoute, sendVersionRoute
|
|
}
|
|
|
|
func removeHandshakeRoutes(router *routerpkg.Router) {
|
|
err := router.RemoveRoute([]wire.MessageCommand{wire.CmdVersion, wire.CmdVerAck})
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|