kaspad/protocol/protocol.go
stasatdaglabs 6d591dde74
[NOD-1124] Implement the Flow thread model and architecture (#789)
* [NOD-1124] Rename Peer to Connection (because Peer is a business logic term)

* [NOD-1124] Implement Close for Router.

* [NOD-1124] Add SetPeerDisconnectedHandler.

* [NOD-1124] Remove mentions of "peer" from the netadapter package.

* [NOD-1124] Handle errors/stopping in netadapter.

* [NOD-1124] Remove netadapter.Connection.

* [NOD-1124] Add startSendLoop.

* [NOD-1124] Implement network IDs.

* [NOD-1124] Implement a map between IDs and routes.

* [NOD-1124] Implement Broadcast.

* [NOD-1124] Fix rename error.

* [NOD-1124] Fix copy+paste error.

* [NOD-1124] Change the type of NetAdapter.stop to uint32.

* [NOD-1124] If NetAdapter is stopped more than once, return an error.

* [NOD-1124] Add an error case to RouteInputMessage.

* [NOD-1124] Rename CreateID to NewID.

* [NOD-1124] Spawn from outside startReceiveLoop and startSendLoop.

* [NOD-1124] Fix a comment.

* [NOD-1124] Replace break with for condition.

* [NOD-1124] Don't disconnect from disconnected peers.

* [NOD-1124] Fix a for condition.

* [NOD-1124] Handle an error.
2020-07-09 09:34:28 +03:00

61 lines
1.4 KiB
Go

package protocol
import (
"github.com/kaspanet/kaspad/blockdag"
"github.com/kaspanet/kaspad/netadapter"
"github.com/kaspanet/kaspad/wire"
)
// Manager manages the p2p protocol
type Manager struct {
netAdapter *netadapter.NetAdapter
}
// NewManager creates a new instance of the p2p protocol manager
func NewManager(listeningAddrs []string, dag *blockdag.BlockDAG) (*Manager, error) {
netAdapter, err := netadapter.NewNetAdapter(listeningAddrs)
if err != nil {
return nil, err
}
routerInitializer := newRouterInitializer(netAdapter, dag)
netAdapter.SetRouterInitializer(routerInitializer)
manager := Manager{
netAdapter: netAdapter,
}
return &manager, nil
}
// Start starts the p2p protocol
func (p *Manager) Start() error {
return p.netAdapter.Start()
}
// Stop stops the p2p protocol
func (p *Manager) Stop() error {
return p.netAdapter.Stop()
}
func newRouterInitializer(netAdapter *netadapter.NetAdapter, dag *blockdag.BlockDAG) netadapter.RouterInitializer {
return func() (*netadapter.Router, error) {
router := netadapter.NewRouter()
err := router.AddRoute([]string{wire.CmdTx}, startDummy(netAdapter, router, dag))
if err != nil {
return nil, err
}
return router, nil
}
}
func startDummy(netAdapter *netadapter.NetAdapter, router *netadapter.Router,
dag *blockdag.BlockDAG) chan wire.Message {
ch := make(chan wire.Message)
spawn(func() {
for range ch {
}
})
return ch
}