mirror of
https://github.com/kaspanet/kaspad.git
synced 2025-03-30 15:08:33 +00:00
[NOD-1120] Separate registration of routes, and the starting of flows (#832)
* [NOD-1120] Separate flow registration and running * [NOD-1120] Extract executeFunc to separate function * [NOD-1120] Move the registration of flows out of goroutine * [NOD-1120] Return after handleError * [NOD-1120] Rename: addXXXFlow -> registerXXXFlow * Rename: stop -> errChan * [NOD-1120] Fix name of goroutine
This commit is contained in:
parent
a9f3bdf4ab
commit
211c4d05e8
@ -31,18 +31,9 @@ type HandleHandshakeContext interface {
|
||||
|
||||
// HandleHandshake sets up the handshake protocol - It sends a version message and waits for an incoming
|
||||
// version message, as well as a verack for the sent version
|
||||
func HandleHandshake(context HandleHandshakeContext, router *routerpkg.Router,
|
||||
netConnection *netadapter.NetConnection) (peer *peerpkg.Peer, closed bool, err error) {
|
||||
|
||||
receiveVersionRoute, err := router.AddIncomingRoute([]wire.MessageCommand{wire.CmdVersion})
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
sendVersionRoute, err := router.AddIncomingRoute([]wire.MessageCommand{wire.CmdVerAck})
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
func HandleHandshake(context HandleHandshakeContext, netConnection *netadapter.NetConnection,
|
||||
receiveVersionRoute *routerpkg.Route, sendVersionRoute *routerpkg.Route, outgoingRoute *routerpkg.Route,
|
||||
) (peer *peerpkg.Peer, err error) {
|
||||
|
||||
// For HandleHandshake to finish, we need to get from the other node
|
||||
// a version and verack messages, so we increase the wait group by 2
|
||||
@ -58,7 +49,7 @@ func HandleHandshake(context HandleHandshakeContext, router *routerpkg.Router,
|
||||
var peerAddress *wire.NetAddress
|
||||
spawn("HandleHandshake-ReceiveVersion", func() {
|
||||
defer wg.Done()
|
||||
address, err := ReceiveVersion(context, receiveVersionRoute, router.OutgoingRoute(), peer)
|
||||
address, err := ReceiveVersion(context, receiveVersionRoute, outgoingRoute, peer)
|
||||
if err != nil {
|
||||
context.HandleError(err, "SendVersion", &isStopping, errChan)
|
||||
return
|
||||
@ -68,7 +59,7 @@ func HandleHandshake(context HandleHandshakeContext, router *routerpkg.Router,
|
||||
|
||||
spawn("HandleHandshake-SendVersion", func() {
|
||||
defer wg.Done()
|
||||
err := SendVersion(context, sendVersionRoute, router.OutgoingRoute())
|
||||
err := SendVersion(context, sendVersionRoute, outgoingRoute)
|
||||
if err != nil {
|
||||
context.HandleError(err, "SendVersion", &isStopping, errChan)
|
||||
return
|
||||
@ -78,18 +69,18 @@ func HandleHandshake(context HandleHandshakeContext, router *routerpkg.Router,
|
||||
select {
|
||||
case err := <-errChan:
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
return nil, err
|
||||
}
|
||||
return nil, true, nil
|
||||
return nil, nil
|
||||
case <-locks.ReceiveFromChanWhenDone(func() { wg.Wait() }):
|
||||
}
|
||||
|
||||
err = context.AddToPeers(peer)
|
||||
if err != nil {
|
||||
if errors.As(err, &common.ErrPeerWithSameIDExists) {
|
||||
return nil, false, protocolerrors.Wrap(false, err, "peer already exists")
|
||||
return nil, protocolerrors.Wrap(false, err, "peer already exists")
|
||||
}
|
||||
return nil, false, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if peerAddress != nil {
|
||||
@ -100,10 +91,5 @@ func HandleHandshake(context HandleHandshakeContext, router *routerpkg.Router,
|
||||
|
||||
context.StartIBDIfRequired()
|
||||
|
||||
err = router.RemoveRoute([]wire.MessageCommand{wire.CmdVersion, wire.CmdVerAck})
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
return peer, false, nil
|
||||
return peer, nil
|
||||
}
|
||||
|
@ -1,13 +1,14 @@
|
||||
package ping
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/kaspanet/kaspad/netadapter/router"
|
||||
"github.com/kaspanet/kaspad/protocol/common"
|
||||
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
|
||||
"github.com/kaspanet/kaspad/protocol/protocolerrors"
|
||||
"github.com/kaspanet/kaspad/util/random"
|
||||
"github.com/kaspanet/kaspad/wire"
|
||||
"time"
|
||||
)
|
||||
|
||||
// SendPingsContext is the interface for the context needed for the SendPings flow.
|
||||
|
@ -1,6 +1,8 @@
|
||||
package protocol
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/kaspanet/kaspad/addressmanager"
|
||||
"github.com/kaspanet/kaspad/blockdag"
|
||||
"github.com/kaspanet/kaspad/config"
|
||||
@ -79,3 +81,14 @@ func (m *Manager) AddBlock(block *util.Block, flags blockdag.BehaviorFlags) erro
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) runFlows(flows []*flow, peer *peerpkg.Peer, errChan <-chan error) error {
|
||||
for _, flow := range flows {
|
||||
executeFunc := flow.executeFunc // extract to new variable so that it's not overwritten
|
||||
spawn(fmt.Sprintf("flow-%s", flow.name), func() {
|
||||
executeFunc(peer)
|
||||
})
|
||||
}
|
||||
|
||||
return <-errChan
|
||||
}
|
||||
|
@ -1,7 +1,6 @@
|
||||
package protocol
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/kaspanet/kaspad/addressmanager"
|
||||
@ -20,8 +19,27 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type flowInitializeFunc func(route *routerpkg.Route, peer *peerpkg.Peer) error
|
||||
type flowExecuteFunc func(peer *peerpkg.Peer)
|
||||
|
||||
type flow struct {
|
||||
name string
|
||||
executeFunc flowExecuteFunc
|
||||
}
|
||||
|
||||
func (m *Manager) routerInitializer(router *routerpkg.Router, netConnection *netadapter.NetConnection) {
|
||||
spawn("routerInitializer-startFlows", func() {
|
||||
// isStopping flag is raised the moment that the connection associated with this router is disconnected
|
||||
// errChan is used by the flow goroutines to return to runFlows when an error occurs.
|
||||
// They are both initialized here and passed to register flows.
|
||||
isStopping := uint32(0)
|
||||
errChan := make(chan error)
|
||||
|
||||
flows := m.registerFlows(router, errChan, &isStopping)
|
||||
receiveVersionRoute, sendVersionRoute := registerHandshakeRoutes(router)
|
||||
|
||||
// After flows were registered - spawn a new thread that will wait for connection to finish initializing
|
||||
// and start receiving messages
|
||||
spawn("routerInitializer-runFlows", func() {
|
||||
isBanned, err := m.context.ConnectionManager().IsBanned(netConnection)
|
||||
if err != nil && !errors.Is(err, addressmanager.ErrAddressNotFound) {
|
||||
panic(err)
|
||||
@ -31,194 +49,223 @@ func (m *Manager) routerInitializer(router *routerpkg.Router, netConnection *net
|
||||
return
|
||||
}
|
||||
|
||||
err = m.startFlows(netConnection, router)
|
||||
netConnection.SetOnInvalidMessageHandler(func(err error) {
|
||||
if atomic.AddUint32(&isStopping, 1) == 1 {
|
||||
errChan <- protocolerrors.Wrap(true, err, "received bad message")
|
||||
}
|
||||
})
|
||||
|
||||
peer, err := handshake.HandleHandshake(m.context, netConnection, receiveVersionRoute,
|
||||
sendVersionRoute, router.OutgoingRoute())
|
||||
if err != nil {
|
||||
if protocolErr := &(protocolerrors.ProtocolError{}); errors.As(err, &protocolErr) {
|
||||
if protocolErr.ShouldBan {
|
||||
err := m.context.ConnectionManager().Ban(netConnection)
|
||||
if err != nil && !errors.Is(err, addressmanager.ErrAddressNotFound) {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
netConnection.Disconnect()
|
||||
return
|
||||
}
|
||||
if errors.Is(err, routerpkg.ErrTimeout) {
|
||||
netConnection.Disconnect()
|
||||
return
|
||||
}
|
||||
if errors.Is(err, routerpkg.ErrRouteClosed) {
|
||||
return
|
||||
}
|
||||
panic(err)
|
||||
m.handleError(err, netConnection)
|
||||
return
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (m *Manager) startFlows(netConnection *netadapter.NetConnection, router *routerpkg.Router) error {
|
||||
stop := make(chan error)
|
||||
isStopping := uint32(0)
|
||||
removeHandshakeRoutes(router)
|
||||
|
||||
netConnection.SetOnInvalidMessageHandler(func(err error) {
|
||||
if atomic.AddUint32(&isStopping, 1) == 1 {
|
||||
stop <- protocolerrors.Wrap(true, err, "received bad message")
|
||||
}
|
||||
})
|
||||
|
||||
peer, closed, err := handshake.HandleHandshake(m.context, router, netConnection)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if closed {
|
||||
return nil
|
||||
}
|
||||
|
||||
m.addAddressFlows(router, &isStopping, stop, peer)
|
||||
m.addBlockRelayFlows(router, &isStopping, stop, peer)
|
||||
m.addPingFlows(router, &isStopping, stop, peer)
|
||||
m.addIBDFlows(router, &isStopping, stop, peer)
|
||||
m.addTransactionRelayFlow(router, &isStopping, stop)
|
||||
|
||||
err = <-stop
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *Manager) addAddressFlows(router *routerpkg.Router, isStopping *uint32, stop chan error,
|
||||
peer *peerpkg.Peer) {
|
||||
|
||||
outgoingRoute := router.OutgoingRoute()
|
||||
|
||||
m.addOneTimeFlow("SendAddresses", router, []wire.MessageCommand{wire.CmdGetAddresses}, isStopping, stop,
|
||||
func(incomingRoute *routerpkg.Route) error {
|
||||
return addressexchange.SendAddresses(m.context, incomingRoute, outgoingRoute)
|
||||
},
|
||||
)
|
||||
|
||||
m.addOneTimeFlow("ReceiveAddresses", router, []wire.MessageCommand{wire.CmdAddress}, isStopping, stop,
|
||||
func(incomingRoute *routerpkg.Route) error {
|
||||
return addressexchange.ReceiveAddresses(m.context, incomingRoute, outgoingRoute, peer)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func (m *Manager) addBlockRelayFlows(router *routerpkg.Router, isStopping *uint32, stop chan error, peer *peerpkg.Peer) {
|
||||
|
||||
outgoingRoute := router.OutgoingRoute()
|
||||
|
||||
m.addFlow("HandleRelayInvs", router, []wire.MessageCommand{wire.CmdInvRelayBlock, wire.CmdBlock}, isStopping, stop,
|
||||
func(incomingRoute *routerpkg.Route) error {
|
||||
return blockrelay.HandleRelayInvs(m.context, incomingRoute,
|
||||
outgoingRoute, peer)
|
||||
},
|
||||
)
|
||||
|
||||
m.addFlow("HandleRelayBlockRequests", router, []wire.MessageCommand{wire.CmdGetRelayBlocks}, isStopping, stop,
|
||||
func(incomingRoute *routerpkg.Route) error {
|
||||
return blockrelay.HandleRelayBlockRequests(m.context, incomingRoute, outgoingRoute, peer)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func (m *Manager) addPingFlows(router *routerpkg.Router, isStopping *uint32, stop chan error, peer *peerpkg.Peer) {
|
||||
outgoingRoute := router.OutgoingRoute()
|
||||
|
||||
m.addFlow("ReceivePings", router, []wire.MessageCommand{wire.CmdPing}, isStopping, stop,
|
||||
func(incomingRoute *routerpkg.Route) error {
|
||||
return ping.ReceivePings(m.context, incomingRoute, outgoingRoute)
|
||||
},
|
||||
)
|
||||
|
||||
m.addFlow("SendPings", router, []wire.MessageCommand{wire.CmdPong}, isStopping, stop,
|
||||
func(incomingRoute *routerpkg.Route) error {
|
||||
return ping.SendPings(m.context, incomingRoute, outgoingRoute, peer)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func (m *Manager) addIBDFlows(router *routerpkg.Router, isStopping *uint32, stop chan error,
|
||||
peer *peerpkg.Peer) {
|
||||
|
||||
outgoingRoute := router.OutgoingRoute()
|
||||
|
||||
m.addFlow("HandleIBD", router, []wire.MessageCommand{wire.CmdBlockLocator, wire.CmdIBDBlock}, isStopping, stop,
|
||||
func(incomingRoute *routerpkg.Route) error {
|
||||
return ibd.HandleIBD(m.context, incomingRoute, outgoingRoute, peer)
|
||||
},
|
||||
)
|
||||
|
||||
m.addFlow("RequestSelectedTip", router, []wire.MessageCommand{wire.CmdSelectedTip}, isStopping, stop,
|
||||
func(incomingRoute *routerpkg.Route) error {
|
||||
return selectedtip.RequestSelectedTip(m.context, incomingRoute, outgoingRoute, peer)
|
||||
},
|
||||
)
|
||||
|
||||
m.addFlow("HandleGetSelectedTip", router, []wire.MessageCommand{wire.CmdGetSelectedTip}, isStopping, stop,
|
||||
func(incomingRoute *routerpkg.Route) error {
|
||||
return selectedtip.HandleGetSelectedTip(m.context, incomingRoute, outgoingRoute)
|
||||
},
|
||||
)
|
||||
|
||||
m.addFlow("HandleGetBlockLocator", router, []wire.MessageCommand{wire.CmdGetBlockLocator}, isStopping, stop,
|
||||
func(incomingRoute *routerpkg.Route) error {
|
||||
return ibd.HandleGetBlockLocator(m.context, incomingRoute, outgoingRoute)
|
||||
},
|
||||
)
|
||||
|
||||
m.addFlow("HandleGetBlocks", router, []wire.MessageCommand{wire.CmdGetBlocks}, isStopping, stop,
|
||||
func(incomingRoute *routerpkg.Route) error {
|
||||
return ibd.HandleGetBlocks(m.context, incomingRoute, outgoingRoute)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func (m *Manager) addTransactionRelayFlow(router *routerpkg.Router, isStopping *uint32, stop chan error) {
|
||||
|
||||
outgoingRoute := router.OutgoingRoute()
|
||||
|
||||
m.addFlow("HandleRelayedTransactions", router, []wire.MessageCommand{wire.CmdInv, wire.CmdTx}, isStopping, stop,
|
||||
func(incomingRoute *routerpkg.Route) error {
|
||||
return relaytransactions.HandleRelayedTransactions(m.context, incomingRoute, outgoingRoute)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func (m *Manager) addFlow(name string, router *routerpkg.Router, messageTypes []wire.MessageCommand, isStopping *uint32,
|
||||
stopChan chan error, flow func(route *routerpkg.Route) error) {
|
||||
|
||||
route, err := router.AddIncomingRoute(messageTypes)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
spawn(fmt.Sprintf("addFlow-startFlow-%s", name), func() {
|
||||
err := flow(route)
|
||||
err = m.runFlows(flows, peer, errChan)
|
||||
if err != nil {
|
||||
m.context.HandleError(err, name, isStopping, stopChan)
|
||||
m.handleError(err, netConnection)
|
||||
return
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (m *Manager) addOneTimeFlow(name string, router *routerpkg.Router, messageTypes []wire.MessageCommand,
|
||||
isStopping *uint32, stopChan chan error, flow func(route *routerpkg.Route) error) {
|
||||
|
||||
route, err := router.AddIncomingRoute(messageTypes)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
spawn(fmt.Sprintf("addOneTimeFlow-startFlow-%s", name), func() {
|
||||
defer func() {
|
||||
err := router.RemoveRoute(messageTypes)
|
||||
if err != nil {
|
||||
func (m *Manager) handleError(err error, netConnection *netadapter.NetConnection) {
|
||||
if protocolErr := &(protocolerrors.ProtocolError{}); errors.As(err, &protocolErr) {
|
||||
if protocolErr.ShouldBan {
|
||||
err := m.context.ConnectionManager().Ban(netConnection)
|
||||
if err != nil && !errors.Is(err, addressmanager.ErrAddressNotFound) {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
|
||||
err := flow(route)
|
||||
if err != nil {
|
||||
m.context.HandleError(err, name, isStopping, stopChan)
|
||||
return
|
||||
}
|
||||
})
|
||||
netConnection.Disconnect()
|
||||
return
|
||||
}
|
||||
if errors.Is(err, routerpkg.ErrTimeout) {
|
||||
netConnection.Disconnect()
|
||||
return
|
||||
}
|
||||
if errors.Is(err, routerpkg.ErrRouteClosed) {
|
||||
return
|
||||
}
|
||||
panic(err)
|
||||
}
|
||||
|
||||
func (m *Manager) registerFlows(router *routerpkg.Router, errChan chan error, isStopping *uint32) (flows []*flow) {
|
||||
flows = m.registerAddressFlows(router, isStopping, errChan)
|
||||
flows = append(flows, m.registerBlockRelayFlows(router, isStopping, errChan)...)
|
||||
flows = append(flows, m.registerPingFlows(router, isStopping, errChan)...)
|
||||
flows = append(flows, m.registerIBDFlows(router, isStopping, errChan)...)
|
||||
flows = append(flows, m.registerTransactionRelayFlow(router, isStopping, errChan)...)
|
||||
|
||||
return flows
|
||||
}
|
||||
|
||||
func (m *Manager) registerAddressFlows(router *routerpkg.Router, isStopping *uint32, errChan chan error) []*flow {
|
||||
outgoingRoute := router.OutgoingRoute()
|
||||
|
||||
return []*flow{
|
||||
m.registerOneTimeFlow("SendAddresses", router, []wire.MessageCommand{wire.CmdGetAddresses}, isStopping, errChan,
|
||||
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
|
||||
return addressexchange.SendAddresses(m.context, incomingRoute, outgoingRoute)
|
||||
},
|
||||
),
|
||||
|
||||
m.registerOneTimeFlow("ReceiveAddresses", router, []wire.MessageCommand{wire.CmdAddress}, isStopping, errChan,
|
||||
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
|
||||
return addressexchange.ReceiveAddresses(m.context, incomingRoute, outgoingRoute, peer)
|
||||
},
|
||||
)}
|
||||
}
|
||||
|
||||
func (m *Manager) registerBlockRelayFlows(router *routerpkg.Router, isStopping *uint32, errChan chan error) []*flow {
|
||||
outgoingRoute := router.OutgoingRoute()
|
||||
|
||||
return []*flow{
|
||||
m.registerFlow("HandleRelayInvs", router, []wire.MessageCommand{wire.CmdInvRelayBlock, wire.CmdBlock}, isStopping, errChan,
|
||||
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
|
||||
return blockrelay.HandleRelayInvs(m.context, incomingRoute,
|
||||
outgoingRoute, peer)
|
||||
},
|
||||
),
|
||||
|
||||
m.registerFlow("HandleRelayBlockRequests", router, []wire.MessageCommand{wire.CmdGetRelayBlocks}, isStopping, errChan,
|
||||
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
|
||||
return blockrelay.HandleRelayBlockRequests(m.context, incomingRoute, outgoingRoute, peer)
|
||||
},
|
||||
)}
|
||||
}
|
||||
|
||||
func (m *Manager) registerPingFlows(router *routerpkg.Router, isStopping *uint32, errChan chan error) []*flow {
|
||||
outgoingRoute := router.OutgoingRoute()
|
||||
|
||||
return []*flow{
|
||||
m.registerFlow("ReceivePings", router, []wire.MessageCommand{wire.CmdPing}, isStopping, errChan,
|
||||
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
|
||||
return ping.ReceivePings(m.context, incomingRoute, outgoingRoute)
|
||||
},
|
||||
),
|
||||
|
||||
m.registerFlow("SendPings", router, []wire.MessageCommand{wire.CmdPong}, isStopping, errChan,
|
||||
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
|
||||
return ping.SendPings(m.context, incomingRoute, outgoingRoute, peer)
|
||||
},
|
||||
)}
|
||||
}
|
||||
|
||||
func (m *Manager) registerIBDFlows(router *routerpkg.Router, isStopping *uint32, errChan chan error) []*flow {
|
||||
outgoingRoute := router.OutgoingRoute()
|
||||
|
||||
return []*flow{
|
||||
m.registerFlow("HandleIBD", router, []wire.MessageCommand{wire.CmdBlockLocator, wire.CmdIBDBlock}, isStopping, errChan,
|
||||
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
|
||||
return ibd.HandleIBD(m.context, incomingRoute, outgoingRoute, peer)
|
||||
},
|
||||
),
|
||||
|
||||
m.registerFlow("RequestSelectedTip", router, []wire.MessageCommand{wire.CmdSelectedTip}, isStopping, errChan,
|
||||
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
|
||||
return selectedtip.RequestSelectedTip(m.context, incomingRoute, outgoingRoute, peer)
|
||||
},
|
||||
),
|
||||
|
||||
m.registerFlow("HandleGetSelectedTip", router, []wire.MessageCommand{wire.CmdGetSelectedTip}, isStopping, errChan,
|
||||
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
|
||||
return selectedtip.HandleGetSelectedTip(m.context, incomingRoute, outgoingRoute)
|
||||
},
|
||||
),
|
||||
|
||||
m.registerFlow("HandleGetBlockLocator", router, []wire.MessageCommand{wire.CmdGetBlockLocator}, isStopping, errChan,
|
||||
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
|
||||
return ibd.HandleGetBlockLocator(m.context, incomingRoute, outgoingRoute)
|
||||
},
|
||||
),
|
||||
|
||||
m.registerFlow("HandleGetBlocks", router, []wire.MessageCommand{wire.CmdGetBlocks}, isStopping, errChan,
|
||||
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
|
||||
return ibd.HandleGetBlocks(m.context, incomingRoute, outgoingRoute)
|
||||
},
|
||||
)}
|
||||
}
|
||||
|
||||
func (m *Manager) registerTransactionRelayFlow(router *routerpkg.Router, isStopping *uint32, errChan chan error) []*flow {
|
||||
outgoingRoute := router.OutgoingRoute()
|
||||
|
||||
return []*flow{
|
||||
m.registerFlow("HandleRelayedTransactions", router, []wire.MessageCommand{wire.CmdInv, wire.CmdTx}, isStopping, errChan,
|
||||
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
|
||||
return relaytransactions.HandleRelayedTransactions(m.context, incomingRoute, outgoingRoute)
|
||||
},
|
||||
)}
|
||||
}
|
||||
|
||||
func (m *Manager) registerFlow(name string, router *routerpkg.Router, messageTypes []wire.MessageCommand, isStopping *uint32,
|
||||
errChan chan error, initializeFunc flowInitializeFunc) *flow {
|
||||
|
||||
route, err := router.AddIncomingRoute(messageTypes)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return &flow{
|
||||
name: name,
|
||||
executeFunc: func(peer *peerpkg.Peer) {
|
||||
err := initializeFunc(route, peer)
|
||||
if err != nil {
|
||||
m.context.HandleError(err, name, isStopping, errChan)
|
||||
return
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) registerOneTimeFlow(name string, router *routerpkg.Router, messageTypes []wire.MessageCommand,
|
||||
isStopping *uint32, stopChan chan error, initializeFunc flowInitializeFunc) *flow {
|
||||
|
||||
route, err := router.AddIncomingRoute(messageTypes)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return &flow{
|
||||
name: name,
|
||||
executeFunc: func(peer *peerpkg.Peer) {
|
||||
defer func() {
|
||||
err := router.RemoveRoute(messageTypes)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
|
||||
err := initializeFunc(route, peer)
|
||||
if err != nil {
|
||||
m.context.HandleError(err, name, isStopping, stopChan)
|
||||
return
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func registerHandshakeRoutes(router *routerpkg.Router) (
|
||||
receiveVersionRoute *routerpkg.Route, sendVersionRoute *routerpkg.Route) {
|
||||
receiveVersionRoute, err := router.AddIncomingRoute([]wire.MessageCommand{wire.CmdVersion})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
sendVersionRoute, err = router.AddIncomingRoute([]wire.MessageCommand{wire.CmdVerAck})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return receiveVersionRoute, sendVersionRoute
|
||||
}
|
||||
|
||||
func removeHandshakeRoutes(router *routerpkg.Router) {
|
||||
err := router.RemoveRoute([]wire.MessageCommand{wire.CmdVersion, wire.CmdVerAck})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user