diff --git a/messagemux/messagemux.go b/messagemux/messagemux.go deleted file mode 100644 index 7434b1cce..000000000 --- a/messagemux/messagemux.go +++ /dev/null @@ -1,8 +0,0 @@ -package messagemux - -import "github.com/kaspanet/kaspad/wire" - -// Mux represents a p2p message multiplexer. -type Mux interface { - AddFlow(msgTypes []string, ch chan<- wire.Message) -} diff --git a/netadapter/netadapter.go b/netadapter/netadapter.go new file mode 100644 index 000000000..95be0fa66 --- /dev/null +++ b/netadapter/netadapter.go @@ -0,0 +1,68 @@ +package netadapter + +import ( + "github.com/kaspanet/kaspad/netadapter/server" + "github.com/kaspanet/kaspad/netadapter/server/grpcserver" +) + +// RouterInitializer is a function that initializes a new +// router to be used with a newly connected peer +type RouterInitializer func(peer *Peer) (*Router, error) + +// NetAdapter is an abstraction layer over networking. +// This type expects a RouteInitializer function. This +// function weaves together the various "routes" (messages +// and message handlers) without exposing anything related +// to networking internals. +type NetAdapter struct { + server server.Server + routerInitializer RouterInitializer +} + +// NewNetAdapter creates and starts a new NetAdapter on the +// given listeningPort +func NewNetAdapter(listeningPort string) (*NetAdapter, error) { + server, err := grpcserver.NewGRPCServer(listeningPort) + if err != nil { + return nil, err + } + adapter := NetAdapter{ + server: server, + } + + peerConnectedHandler := adapter.newPeerConnectedHandler() + server.SetPeerConnectedHandler(peerConnectedHandler) + + return &adapter, nil +} + +func (na *NetAdapter) newPeerConnectedHandler() server.PeerConnectedHandler { + return func(connection server.Connection) { + peer := NewPeer(connection) + router, err := na.routerInitializer(peer) + if err != nil { + // TODO(libp2p): properly handle error + panic(err) + } + + for { + message, err := peer.connection.Receive() + if err != nil { + // TODO(libp2p): properly handle error + panic(err) + } + router.RouteMessage(message) + } + } +} + +// SetRouterInitializer sets the routerInitializer function +// for the net adapter +func (na *NetAdapter) SetRouterInitializer(routerInitializer RouterInitializer) { + na.routerInitializer = routerInitializer +} + +// Close safely closes the netAdapter +func (na *NetAdapter) Close() error { + return na.server.Close() +} diff --git a/netadapter/peer.go b/netadapter/peer.go new file mode 100644 index 000000000..acfb123b5 --- /dev/null +++ b/netadapter/peer.go @@ -0,0 +1,23 @@ +package netadapter + +import ( + "github.com/kaspanet/kaspad/netadapter/server" + "github.com/kaspanet/kaspad/wire" +) + +// Peer represents a remote peer in a network +type Peer struct { + connection server.Connection +} + +// NewPeer creates a new Peer wrapping the given connection +func NewPeer(connection server.Connection) *Peer { + return &Peer{ + connection: connection, + } +} + +// SendMessage sends the given message to the remote peer +func (p *Peer) SendMessage(message wire.Message) error { + return p.connection.Send(message) +} diff --git a/netadapter/router.go b/netadapter/router.go new file mode 100644 index 000000000..bffe63d41 --- /dev/null +++ b/netadapter/router.go @@ -0,0 +1,31 @@ +package netadapter + +import ( + "github.com/kaspanet/kaspad/wire" + "github.com/pkg/errors" +) + +// Router routes messages by type to their respective +// input channels +type Router struct { + routes map[string]chan<- wire.Message +} + +// AddRoute registers the messages of types `messageTypes` to +// be routed to the given `inChannel` +func (r *Router) AddRoute(messageTypes []string, inChannel chan<- wire.Message) error { + for _, messageType := range messageTypes { + if _, ok := r.routes[messageType]; ok { + return errors.Errorf("a route for '%s' already exists", messageType) + } + r.routes[messageType] = inChannel + } + return nil +} + +// RouteMessage sends the given message to the correct input +// channel as registered with AddRoute +func (r *Router) RouteMessage(message wire.Message) { + routeInChannel := r.routes[message.Command()] + routeInChannel <- message +} diff --git a/netadapter/server/grpcserver/grpcserver.go b/netadapter/server/grpcserver/grpcserver.go new file mode 100644 index 000000000..020e32d76 --- /dev/null +++ b/netadapter/server/grpcserver/grpcserver.go @@ -0,0 +1,67 @@ +package grpcserver + +import ( + "github.com/kaspanet/kaspad/netadapter/server" + "github.com/kaspanet/kaspad/wire" +) + +type gRPCServer struct { + peerConnectedHandler server.PeerConnectedHandler + connections []server.Connection +} + +// NewGRPCServer creates and starts a gRPC server with the given +// listening port +func NewGRPCServer(listeningPort string) (server.Server, error) { + // TODO(libp2p): unimplemented + panic("unimplemented") +} + +// SetPeerConnectedHandler sets the peer connected handler +// function for the server +func (s *gRPCServer) SetPeerConnectedHandler(peerConnectedHandler server.PeerConnectedHandler) { + s.peerConnectedHandler = peerConnectedHandler +} + +// Connect connects to the given address +// This is part of the Server interface +func (s *gRPCServer) Connect(address string) (server.Connection, error) { + // TODO(libp2p): unimplemented + panic("unimplemented") +} + +// Connections returns a slice of connections the server +// is currently connected to. +// This is part of the Server interface +func (s *gRPCServer) Connections() []server.Connection { + // TODO(libp2p): unimplemented + panic("unimplemented") +} + +func (s *gRPCServer) Close() error { + // TODO(libp2p): unimplemented + panic("unimplemented") +} + +type gRPCConnection struct{} + +// Send sends the given message through the connection +// This is part of the Connection interface +func (c *gRPCConnection) Send(message wire.Message) error { + // TODO(libp2p): unimplemented + panic("unimplemented") +} + +// Receive receives the next message from the connection +// This is part of the Connection interface +func (c *gRPCConnection) Receive() (wire.Message, error) { + // TODO(libp2p): unimplemented + panic("unimplemented") +} + +// Disconnect disconnects the connection +// This is part of the Connection interface +func (c *gRPCConnection) Disconnect() error { + // TODO(libp2p): unimplemented + panic("unimplemented") +} diff --git a/netadapter/server/server.go b/netadapter/server/server.go new file mode 100644 index 000000000..6c73702e6 --- /dev/null +++ b/netadapter/server/server.go @@ -0,0 +1,24 @@ +package server + +import ( + "github.com/kaspanet/kaspad/wire" +) + +// PeerConnectedHandler is a function that is to be called +// once a new Connection is successfully established. +type PeerConnectedHandler func(connection Connection) + +// Server represents a p2p server. +type Server interface { + SetPeerConnectedHandler(peerConnectedHandler PeerConnectedHandler) + Connect(address string) (Connection, error) + Connections() []Connection + Close() error +} + +// Connection represents a p2p server connection. +type Connection interface { + Send(message wire.Message) error + Receive() (wire.Message, error) + Disconnect() error +} diff --git a/p2pserver/p2pserver.go b/p2pserver/p2pserver.go deleted file mode 100644 index ce211f083..000000000 --- a/p2pserver/p2pserver.go +++ /dev/null @@ -1,16 +0,0 @@ -package p2pserver - -import "github.com/kaspanet/kaspad/wire" - -// Server represents a p2p server. -type Server interface { - Connect(address string) (Connection, error) - Connections() []Connection -} - -// Connection represents a p2p server connection. -type Connection interface { - Send(message wire.Message) error - Receive() (wire.Message, error) - Disconnect() error -} diff --git a/protocol/protocol.go b/protocol/protocol.go index 92d29f2e2..226c5be5c 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -2,19 +2,43 @@ package protocol import ( "github.com/kaspanet/kaspad/blockdag" - "github.com/kaspanet/kaspad/messagemux" - "github.com/kaspanet/kaspad/p2pserver" + "github.com/kaspanet/kaspad/netadapter" "github.com/kaspanet/kaspad/wire" ) -// StartProtocol starts the p2p protocol for a given connection -func StartProtocol(server p2pserver.Server, mux messagemux.Mux, connection p2pserver.Connection, - dag *blockdag.BlockDAG) { - - mux.AddFlow([]string{wire.CmdTx}, startDummy(server, connection, dag)) +// Protocol manages the p2p protocol +type Protocol struct { + netAdapter *netadapter.NetAdapter } -func startDummy(server p2pserver.Server, connection p2pserver.Connection, dag *blockdag.BlockDAG) chan<- wire.Message { +// Start starts the p2p protocol +func Start(listeningPort string, dag *blockdag.BlockDAG) (*Protocol, error) { + netAdapter, err := netadapter.NewNetAdapter(listeningPort) + if err != nil { + return nil, err + } + + routerInitializer := newRouterInitializer(netAdapter, dag) + netAdapter.SetRouterInitializer(routerInitializer) + + protocol := Protocol{ + netAdapter: netAdapter, + } + return &protocol, nil +} + +func newRouterInitializer(netAdapter *netadapter.NetAdapter, dag *blockdag.BlockDAG) netadapter.RouterInitializer { + return func(peer *netadapter.Peer) (*netadapter.Router, error) { + router := netadapter.Router{} + err := router.AddRoute([]string{wire.CmdTx}, startDummy(netAdapter, peer, dag)) + if err != nil { + return nil, err + } + return &router, nil + } +} + +func startDummy(netAdapter *netadapter.NetAdapter, peer *netadapter.Peer, dag *blockdag.BlockDAG) chan<- wire.Message { ch := make(chan wire.Message) spawn(func() { for range ch {