kaspad/protocol/protocol.go
Ori Newman 9939671ccc
[NOD-1147] Implement address exchange (#795)
* [NOD-1147] Implement address exchange

* [NOD-1147] Put placeholder for source address

* [NOD-1147] Fix tests

* [NOD-1147] Add comment

* [NOD-1147] Remove needAddresses from MsgGetAddr

* [NOD-1147] Use rand.Shuffle

* [NOD-1147] Remove redundant const

* [NOD-1147] Move defer to its correct place

* [NOD-1147] Fix typo

* [NOD-1147] Use EnqueueWithTimeout for outgoingRoute

* [NOD-1147] Rename MsgGetAddr->MsgGetAddresses

* [NOD-1147] Rename MsgGetAddr->MsgGetAddresses

* [NOD-1147] Rename MsgAddr->MsgAddresses

* [NOD-1147] Rename fakeSrcAddr->fakeSourceAddress

* [NOD-1147] Remove redundant files

* [NOD-1147] CmdAddr -> CmdAddress

* [NOD-1147] Rename addr to address in protocol package
2020-07-15 17:19:46 +03:00

186 lines
5.0 KiB
Go

package protocol
import (
"github.com/kaspanet/kaspad/addrmgr"
"github.com/kaspanet/kaspad/blockdag"
"github.com/kaspanet/kaspad/netadapter"
routerpkg "github.com/kaspanet/kaspad/netadapter/router"
"github.com/kaspanet/kaspad/protocol/handlerelayblockrequests"
"github.com/kaspanet/kaspad/protocol/handlerelayinvs"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
"github.com/kaspanet/kaspad/protocol/ping"
"github.com/kaspanet/kaspad/protocol/protocolerrors"
"github.com/kaspanet/kaspad/protocol/receiveaddresses"
"github.com/kaspanet/kaspad/protocol/sendaddresses"
"github.com/kaspanet/kaspad/wire"
"github.com/pkg/errors"
"sync/atomic"
)
// Manager manages the p2p protocol
type Manager struct {
// netAdapter is the network adapter that Start and Stop delegate to.
// It is created and wired with a router initializer in NewManager.
netAdapter *netadapter.NetAdapter
}
// NewManager creates a new instance of the p2p protocol manager.
//
// It builds a NetAdapter for listeningAddresses and installs on it a router
// initializer that closes over the adapter, addressManager and dag. If the
// adapter cannot be created, that error is returned and no Manager is built.
func NewManager(listeningAddresses []string, dag *blockdag.BlockDAG,
	addressManager *addrmgr.AddrManager) (*Manager, error) {

	adapter, err := netadapter.NewNetAdapter(listeningAddresses)
	if err != nil {
		return nil, err
	}

	adapter.SetRouterInitializer(newRouterInitializer(adapter, addressManager, dag))

	return &Manager{netAdapter: adapter}, nil
}
// Start starts the p2p protocol by starting the underlying network adapter.
// Whatever error the adapter reports is returned unchanged.
func (p *Manager) Start() error {
	adapter := p.netAdapter
	return adapter.Start()
}
// Stop stops the p2p protocol by stopping the underlying network adapter.
// Whatever error the adapter reports is returned unchanged.
func (p *Manager) Stop() error {
	adapter := p.netAdapter
	return adapter.Stop()
}
// newRouterInitializer returns the RouterInitializer that NewManager installs
// on the net adapter.
//
// Each invocation of the returned function creates a fresh router, hands it to
// startFlows via spawn (a helper defined elsewhere in this package; presumably
// it runs the function asynchronously - confirm), and returns the router
// immediately with a nil error.
//
// An error coming back from startFlows is handled as follows:
//   - a ProtocolError with ShouldBan set panics, since banning is not
//     implemented yet;
//   - any other ProtocolError, or a router timeout, disconnects the
//     connection associated with the router (panicking if that fails);
//   - every other error panics.
func newRouterInitializer(netAdapter *netadapter.NetAdapter,
	addressManager *addrmgr.AddrManager, dag *blockdag.BlockDAG) netadapter.RouterInitializer {

	return func() (*routerpkg.Router, error) {
		router := routerpkg.NewRouter()

		spawn(func() {
			flowsErr := startFlows(netAdapter, router, dag, addressManager)
			if flowsErr == nil {
				return
			}

			// disconnect drops the connection tied to this router and
			// treats a failure to do so as fatal.
			disconnect := func() {
				disconnectErr := netAdapter.DisconnectAssociatedConnection(router)
				if disconnectErr != nil {
					panic(disconnectErr)
				}
			}

			protocolErr := &protocolerrors.ProtocolError{}
			switch {
			case errors.As(flowsErr, &protocolErr):
				if protocolErr.ShouldBan {
					// TODO(libp2p) Ban peer
					panic("unimplemented")
				}
				disconnect()
			case errors.Is(flowsErr, routerpkg.ErrTimeout):
				disconnect()
			default:
				panic(flowsErr)
			}
		})

		return router, nil
	}
}
// startFlows performs the handshake over the given router and then registers
// every protocol flow on it.
//
// If the handshake fails its error is returned; if it reports that the route
// was closed, nil is returned and no flows are registered. Otherwise the
// function blocks until the first flow signals a stop and returns the error
// (possibly nil) that flow reported.
func startFlows(netAdapter *netadapter.NetAdapter, router *routerpkg.Router, dag *blockdag.BlockDAG,
	addressManager *addrmgr.AddrManager) error {

	outgoingRoute := router.OutgoingRoute()
	peer := &peerpkg.Peer{}

	closed, err := handshake(router, netAdapter, peer, dag, addressManager)
	switch {
	case err != nil:
		return err
	case closed:
		return nil
	}

	// stopped is shared by all flows so that only the first one to finish
	// writes to stop; stop carries that flow's error back to this function.
	var stopped uint32
	stop := make(chan error)

	// Flows that are expected to run once and then have their route removed.
	oneTimeFlows := []struct {
		name         string
		messageTypes []string
		run          func(*routerpkg.Route) (bool, error)
	}{
		{
			name:         "SendAddresses",
			messageTypes: []string{wire.CmdGetAddresses},
			run: func(incoming *routerpkg.Route) (bool, error) {
				return sendaddresses.SendAddresses(incoming, outgoingRoute, addressManager)
			},
		},
		{
			name:         "ReceiveAddresses",
			messageTypes: []string{wire.CmdAddress},
			run: func(incoming *routerpkg.Route) (bool, error) {
				return receiveaddresses.ReceiveAddresses(incoming, outgoingRoute, peer, addressManager)
			},
		},
	}
	for _, flow := range oneTimeFlows {
		addOneTimeFlow(flow.name, router, flow.messageTypes, &stopped, stop, flow.run)
	}

	// Flows whose return, with or without an error, always counts as a stop.
	flows := []struct {
		name         string
		messageTypes []string
		run          func(*routerpkg.Route) error
	}{
		{
			name:         "HandleRelayInvs",
			messageTypes: []string{wire.CmdInvRelayBlock, wire.CmdBlock},
			run: func(incoming *routerpkg.Route) error {
				return handlerelayinvs.HandleRelayInvs(incoming, outgoingRoute, peer, netAdapter, dag)
			},
		},
		{
			name:         "HandleRelayBlockRequests",
			messageTypes: []string{wire.CmdGetRelayBlocks},
			run: func(incoming *routerpkg.Route) error {
				return handlerelayblockrequests.HandleRelayBlockRequests(incoming, outgoingRoute, peer, dag)
			},
		},
		{
			name:         "ReceivePings",
			messageTypes: []string{wire.CmdPing},
			run: func(incoming *routerpkg.Route) error {
				return ping.ReceivePings(incoming, outgoingRoute)
			},
		},
		{
			name:         "SendPings",
			messageTypes: []string{wire.CmdPong},
			run: func(incoming *routerpkg.Route) error {
				return ping.SendPings(incoming, outgoingRoute, peer)
			},
		},
	}
	for _, flow := range flows {
		addFlow(flow.name, router, flow.messageTypes, &stopped, stop, flow.run)
	}

	return <-stop
}
// addFlow registers an incoming route for messageTypes on router and runs flow
// on that route via spawn.
//
// Registration failure panics. When flow returns, a non-nil error is logged
// under the flow's name, and the shared stopped counter is incremented; only
// the caller that moves the counter to 1 forwards its result (nil included)
// on stopChan, so at most one value is ever sent per counter.
func addFlow(name string, router *routerpkg.Router, messageTypes []string, stopped *uint32,
	stopChan chan error, flow func(route *routerpkg.Route) error) {

	incomingRoute, err := router.AddIncomingRoute(messageTypes)
	if err != nil {
		panic(err)
	}

	spawn(func() {
		flowErr := flow(incomingRoute)
		if flowErr != nil {
			log.Errorf("error from %s flow: %s", name, flowErr)
		}

		// Only the first flow to finish reports back.
		firstToStop := atomic.AddUint32(stopped, 1) == 1
		if !firstToStop {
			return
		}
		stopChan <- flowErr
	})
}
// addOneTimeFlow registers an incoming route for messageTypes on router, runs
// flow on it via spawn, and removes the route again once flow has returned.
//
// Failure to register or to remove the route panics. A non-nil error from flow
// is logged under the flow's name. Unlike addFlow, a clean return (no error,
// route not closed) does not touch the stopped counter; only an error or a
// closed route increments it, and only the caller that moves it to 1 sends its
// error on stopChan.
func addOneTimeFlow(name string, router *routerpkg.Router, messageTypes []string, stopped *uint32,
	stopChan chan error, flow func(route *routerpkg.Route) (routeClosed bool, err error)) {

	incomingRoute, err := router.AddIncomingRoute(messageTypes)
	if err != nil {
		panic(err)
	}

	spawn(func() {
		// The route is removed last, after any stop notification was sent.
		defer func() {
			if removeErr := router.RemoveRoute(messageTypes); removeErr != nil {
				panic(removeErr)
			}
		}()

		routeClosed, flowErr := flow(incomingRoute)
		if flowErr != nil {
			log.Errorf("error from %s flow: %s", name, flowErr)
		}

		// A flow that completed normally is not a reason to stop.
		if flowErr == nil && !routeClosed {
			return
		}

		if atomic.AddUint32(stopped, 1) == 1 {
			stopChan <- flowErr
		}
	})
}