[NOD-1124] Implement the Flow thread model and architecture (#787)

* [NOD-1124] Begin implementing netadapter.

* [NOD-1124] Implementing a stub gRPC server..

* [NOD-1124] Construct the server inside the netadapter.

* [NOD-1124] Rewrite protocol.go to fit with the new netAdapter model.

* [NOD-1124] Wrap a connection in Peer.

* [NOD-1124] Add a peerstate object.

* [NOD-1124] Remove the peerstate object.

* [NOD-1124] Remove router out of Peer.

* [NOD-1124] Tag a TODO.

* [NOD-1124] Return an error out of AddRoute if a route already exists for some message type.

* [NOD-1124] Rename the package grpc to grpcserver.

* [NOD-1124] Extracted newConnectionHandler into a type.

* [NOD-1124] Extract routerInitializer into a type.

* [NOD-1124] Panic/Add TODOs everywhere that isn't implemented.

* [NOD-1124] Improve the NetAdapter comment.

* [NOD-1124] Rename NewConnectionHandler to PeerConnectedHandler.

* [NOD-1124] Rename buildRouterInitializer to newRouterInitializer.

* [NOD-1124] Remove unreachable code.

* [NOD-1124] Make go vet happy.
This commit is contained in:
stasatdaglabs 2020-07-08 10:14:52 +03:00 committed by GitHub
parent 1a43cabfb9
commit eb2642ba90
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 245 additions and 32 deletions

View File

@ -1,8 +0,0 @@
package messagemux
import "github.com/kaspanet/kaspad/wire"
// Mux represents a p2p message multiplexer.
type Mux interface {
AddFlow(msgTypes []string, ch chan<- wire.Message)
}

68
netadapter/netadapter.go Normal file
View File

@ -0,0 +1,68 @@
package netadapter
import (
"github.com/kaspanet/kaspad/netadapter/server"
"github.com/kaspanet/kaspad/netadapter/server/grpcserver"
)
// RouterInitializer is a function that initializes a new
// router to be used with a newly connected peer
type RouterInitializer func(peer *Peer) (*Router, error)
// NetAdapter is an abstraction layer over networking.
// This type expects a RouteInitializer function. This
// function weaves together the various "routes" (messages
// and message handlers) without exposing anything related
// to networking internals.
type NetAdapter struct {
server server.Server
routerInitializer RouterInitializer
}
// NewNetAdapter creates and starts a new NetAdapter on the
// given listeningPort
func NewNetAdapter(listeningPort string) (*NetAdapter, error) {
server, err := grpcserver.NewGRPCServer(listeningPort)
if err != nil {
return nil, err
}
adapter := NetAdapter{
server: server,
}
peerConnectedHandler := adapter.newPeerConnectedHandler()
server.SetPeerConnectedHandler(peerConnectedHandler)
return &adapter, nil
}
func (na *NetAdapter) newPeerConnectedHandler() server.PeerConnectedHandler {
return func(connection server.Connection) {
peer := NewPeer(connection)
router, err := na.routerInitializer(peer)
if err != nil {
// TODO(libp2p): properly handle error
panic(err)
}
for {
message, err := peer.connection.Receive()
if err != nil {
// TODO(libp2p): properly handle error
panic(err)
}
router.RouteMessage(message)
}
}
}
// SetRouterInitializer sets the routerInitializer function
// for the net adapter
func (na *NetAdapter) SetRouterInitializer(routerInitializer RouterInitializer) {
na.routerInitializer = routerInitializer
}
// Close safely closes the netAdapter
func (na *NetAdapter) Close() error {
return na.server.Close()
}

23
netadapter/peer.go Normal file
View File

@ -0,0 +1,23 @@
package netadapter
import (
"github.com/kaspanet/kaspad/netadapter/server"
"github.com/kaspanet/kaspad/wire"
)
// Peer represents a remote peer in a network
type Peer struct {
connection server.Connection
}
// NewPeer creates a new Peer wrapping the given connection
func NewPeer(connection server.Connection) *Peer {
return &Peer{
connection: connection,
}
}
// SendMessage sends the given message to the remote peer
func (p *Peer) SendMessage(message wire.Message) error {
return p.connection.Send(message)
}

31
netadapter/router.go Normal file
View File

@ -0,0 +1,31 @@
package netadapter
import (
"github.com/kaspanet/kaspad/wire"
"github.com/pkg/errors"
)
// Router routes messages by type to their respective
// input channels
type Router struct {
routes map[string]chan<- wire.Message
}
// AddRoute registers the messages of types `messageTypes` to
// be routed to the given `inChannel`
func (r *Router) AddRoute(messageTypes []string, inChannel chan<- wire.Message) error {
for _, messageType := range messageTypes {
if _, ok := r.routes[messageType]; ok {
return errors.Errorf("a route for '%s' already exists", messageType)
}
r.routes[messageType] = inChannel
}
return nil
}
// RouteMessage sends the given message to the correct input
// channel as registered with AddRoute
func (r *Router) RouteMessage(message wire.Message) {
routeInChannel := r.routes[message.Command()]
routeInChannel <- message
}

View File

@ -0,0 +1,67 @@
package grpcserver
import (
"github.com/kaspanet/kaspad/netadapter/server"
"github.com/kaspanet/kaspad/wire"
)
type gRPCServer struct {
peerConnectedHandler server.PeerConnectedHandler
connections []server.Connection
}
// NewGRPCServer creates and starts a gRPC server with the given
// listening port
func NewGRPCServer(listeningPort string) (server.Server, error) {
// TODO(libp2p): unimplemented
panic("unimplemented")
}
// SetPeerConnectedHandler sets the peer connected handler
// function for the server
func (s *gRPCServer) SetPeerConnectedHandler(peerConnectedHandler server.PeerConnectedHandler) {
s.peerConnectedHandler = peerConnectedHandler
}
// Connect connects to the given address
// This is part of the Server interface
func (s *gRPCServer) Connect(address string) (server.Connection, error) {
// TODO(libp2p): unimplemented
panic("unimplemented")
}
// Connections returns a slice of connections the server
// is currently connected to.
// This is part of the Server interface
func (s *gRPCServer) Connections() []server.Connection {
// TODO(libp2p): unimplemented
panic("unimplemented")
}
func (s *gRPCServer) Close() error {
// TODO(libp2p): unimplemented
panic("unimplemented")
}
type gRPCConnection struct{}
// Send sends the given message through the connection
// This is part of the Connection interface
func (c *gRPCConnection) Send(message wire.Message) error {
// TODO(libp2p): unimplemented
panic("unimplemented")
}
// Receive receives the next message from the connection
// This is part of the Connection interface
func (c *gRPCConnection) Receive() (wire.Message, error) {
// TODO(libp2p): unimplemented
panic("unimplemented")
}
// Disconnect disconnects the connection
// This is part of the Connection interface
func (c *gRPCConnection) Disconnect() error {
// TODO(libp2p): unimplemented
panic("unimplemented")
}

View File

@ -0,0 +1,24 @@
package server
import (
"github.com/kaspanet/kaspad/wire"
)
// PeerConnectedHandler is a function that is to be called
// once a new Connection is successfully established.
type PeerConnectedHandler func(connection Connection)
// Server represents a p2p server.
type Server interface {
SetPeerConnectedHandler(peerConnectedHandler PeerConnectedHandler)
Connect(address string) (Connection, error)
Connections() []Connection
Close() error
}
// Connection represents a p2p server connection.
type Connection interface {
Send(message wire.Message) error
Receive() (wire.Message, error)
Disconnect() error
}

View File

@ -1,16 +0,0 @@
package p2pserver
import "github.com/kaspanet/kaspad/wire"
// Server represents a p2p server.
type Server interface {
Connect(address string) (Connection, error)
Connections() []Connection
}
// Connection represents a p2p server connection.
type Connection interface {
Send(message wire.Message) error
Receive() (wire.Message, error)
Disconnect() error
}

View File

@ -2,19 +2,43 @@ package protocol
import (
"github.com/kaspanet/kaspad/blockdag"
"github.com/kaspanet/kaspad/messagemux"
"github.com/kaspanet/kaspad/p2pserver"
"github.com/kaspanet/kaspad/netadapter"
"github.com/kaspanet/kaspad/wire"
)
// StartProtocol starts the p2p protocol for a given connection
func StartProtocol(server p2pserver.Server, mux messagemux.Mux, connection p2pserver.Connection,
dag *blockdag.BlockDAG) {
mux.AddFlow([]string{wire.CmdTx}, startDummy(server, connection, dag))
// Protocol manages the p2p protocol
type Protocol struct {
netAdapter *netadapter.NetAdapter
}
func startDummy(server p2pserver.Server, connection p2pserver.Connection, dag *blockdag.BlockDAG) chan<- wire.Message {
// Start starts the p2p protocol
func Start(listeningPort string, dag *blockdag.BlockDAG) (*Protocol, error) {
netAdapter, err := netadapter.NewNetAdapter(listeningPort)
if err != nil {
return nil, err
}
routerInitializer := newRouterInitializer(netAdapter, dag)
netAdapter.SetRouterInitializer(routerInitializer)
protocol := Protocol{
netAdapter: netAdapter,
}
return &protocol, nil
}
func newRouterInitializer(netAdapter *netadapter.NetAdapter, dag *blockdag.BlockDAG) netadapter.RouterInitializer {
return func(peer *netadapter.Peer) (*netadapter.Router, error) {
router := netadapter.Router{}
err := router.AddRoute([]string{wire.CmdTx}, startDummy(netAdapter, peer, dag))
if err != nil {
return nil, err
}
return &router, nil
}
}
func startDummy(netAdapter *netadapter.NetAdapter, peer *netadapter.Peer, dag *blockdag.BlockDAG) chan<- wire.Message {
ch := make(chan wire.Message)
spawn(func() {
for range ch {