kaspad/network/netadapter/netconnection.go
stasatdaglabs 8a4ece1101
[NOD-1223] Reorganize project (#868)
* [NOD-1223] Move all network stuff into a new network package.

* [NOD-1223] Delete the unused package testutil.

* [NOD-1223] Move infrastructure stuff into a new instrastructure package.

* [NOD-1223] Move domain stuff into a new domain package.
2020-08-13 17:27:25 +03:00

102 lines
2.7 KiB
Go

package netadapter
import (
"fmt"
"github.com/kaspanet/kaspad/network/domainmessage"
routerpkg "github.com/kaspanet/kaspad/network/netadapter/router"
"github.com/pkg/errors"
"github.com/kaspanet/kaspad/network/netadapter/id"
"github.com/kaspanet/kaspad/network/netadapter/server"
)
// NetConnection is a wrapper to a server connection for use by services external to NetAdapter
type NetConnection struct {
connection server.Connection
id *id.ID
router *routerpkg.Router
invalidMessageChan chan error
onDisconnectedHandler server.OnDisconnectedHandler
isConnected uint32
}
func newNetConnection(connection server.Connection, routerInitializer RouterInitializer) *NetConnection {
router := routerpkg.NewRouter()
netConnection := &NetConnection{
connection: connection,
router: router,
invalidMessageChan: make(chan error),
}
netConnection.connection.SetOnDisconnectedHandler(func() {
router.Close()
close(netConnection.invalidMessageChan)
netConnection.onDisconnectedHandler()
})
netConnection.connection.SetOnInvalidMessageHandler(func(err error) {
netConnection.invalidMessageChan <- err
})
router.SetOnRouteCapacityReachedHandler(func() {
netConnection.Disconnect()
})
routerInitializer(router, netConnection)
return netConnection
}
func (c *NetConnection) start() {
if c.onDisconnectedHandler == nil {
panic(errors.New("onDisconnectedHandler is nil"))
}
c.connection.Start(c.router)
}
func (c *NetConnection) String() string {
return fmt.Sprintf("<%s: %s>", c.id, c.connection)
}
// ID returns the ID associated with this connection
func (c *NetConnection) ID() *id.ID {
return c.id
}
// SetID sets the ID associated with this connection
func (c *NetConnection) SetID(peerID *id.ID) {
c.id = peerID
}
// Address returns the address associated with this connection
func (c *NetConnection) Address() string {
return c.connection.Address().String()
}
// IsOutbound returns whether the connection is outbound
func (c *NetConnection) IsOutbound() bool {
return c.connection.IsOutbound()
}
// NetAddress returns the NetAddress associated with this connection
func (c *NetConnection) NetAddress() *domainmessage.NetAddress {
return domainmessage.NewNetAddress(c.connection.Address(), 0)
}
func (c *NetConnection) setOnDisconnectedHandler(onDisconnectedHandler server.OnDisconnectedHandler) {
c.onDisconnectedHandler = onDisconnectedHandler
}
// Disconnect disconnects the given connection
func (c *NetConnection) Disconnect() {
c.connection.Disconnect()
}
// DequeueInvalidMessage dequeues the next invalid message
func (c *NetConnection) DequeueInvalidMessage() (isOpen bool, err error) {
err, isOpen = <-c.invalidMessageChan
return isOpen, err
}