[NOD-1201] Panic if callbacks are not set (#856)

* [NOD-1201] Panic if necessary callback are not set in gRPCConnection and gRPCServer

* [NOD-1201] Fix comment and change return order

* [NOD-1201] Return nil instead of error on gRPCServer.Start

* [NOD-1201] Fix typo
This commit is contained in:
Ori Newman 2020-08-13 15:21:52 +03:00 committed by GitHub
parent 7e74fc0b2b
commit a2aa58c8a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 47 additions and 22 deletions

View File

@ -2,9 +2,9 @@ package netadapter
import (
"fmt"
"github.com/kaspanet/kaspad/domainmessage"
routerpkg "github.com/kaspanet/kaspad/netadapter/router"
"github.com/pkg/errors"
"github.com/kaspanet/kaspad/netadapter/id"
"github.com/kaspanet/kaspad/netadapter/server"
@ -15,23 +15,28 @@ type NetConnection struct {
connection server.Connection
id *id.ID
router *routerpkg.Router
invalidMessageChan chan error
onDisconnectedHandler server.OnDisconnectedHandler
isConnected uint32
}
func newNetConnection(connection server.Connection, routerInitializer RouterInitializer) *NetConnection {
router := routerpkg.NewRouter()
netConnection := &NetConnection{
connection: connection,
router: router,
connection: connection,
router: router,
invalidMessageChan: make(chan error),
}
netConnection.connection.SetOnDisconnectedHandler(func() {
router.Close()
close(netConnection.invalidMessageChan)
netConnection.onDisconnectedHandler()
})
if netConnection.onDisconnectedHandler != nil {
netConnection.onDisconnectedHandler()
}
netConnection.connection.SetOnInvalidMessageHandler(func(err error) {
netConnection.invalidMessageChan <- err
})
router.SetOnRouteCapacityReachedHandler(func() {
@ -44,6 +49,10 @@ func newNetConnection(connection server.Connection, routerInitializer RouterInit
}
func (c *NetConnection) start() {
if c.onDisconnectedHandler == nil {
panic(errors.New("onDisconnectedHandler is nil"))
}
c.connection.Start(c.router)
}
@ -76,12 +85,6 @@ func (c *NetConnection) NetAddress() *domainmessage.NetAddress {
return domainmessage.NewNetAddress(c.connection.Address(), 0)
}
// SetOnInvalidMessageHandler sets a handler function
// for invalid messages
func (c *NetConnection) SetOnInvalidMessageHandler(onInvalidMessageHandler server.OnInvalidMessageHandler) {
c.connection.SetOnInvalidMessageHandler(onInvalidMessageHandler)
}
func (c *NetConnection) setOnDisconnectedHandler(onDisconnectedHandler server.OnDisconnectedHandler) {
c.onDisconnectedHandler = onDisconnectedHandler
}
@ -90,3 +93,9 @@ func (c *NetConnection) setOnDisconnectedHandler(onDisconnectedHandler server.On
func (c *NetConnection) Disconnect() {
c.connection.Disconnect()
}
// DequeueInvalidMessage dequeues the next invalid message
func (c *NetConnection) DequeueInvalidMessage() (isOpen bool, err error) {
err, isOpen = <-c.invalidMessageChan
return isOpen, err
}

View File

@ -1,6 +1,7 @@
package grpcserver
import (
"github.com/pkg/errors"
"net"
"sync/atomic"
@ -40,6 +41,14 @@ func newConnection(server *gRPCServer, address *net.TCPAddr, isOutbound bool, st
}
func (c *gRPCConnection) Start(router *router.Router) {
if c.onDisconnectedHandler == nil {
panic(errors.New("onDisconnectedHandler is nil"))
}
if c.onInvalidMessageHandler == nil {
panic(errors.New("onInvalidMessageHandler is nil"))
}
c.router = router
spawn("gRPCConnection.Start-connectionLoops", func() {

View File

@ -37,6 +37,10 @@ func NewGRPCServer(listeningAddrs []string) (server.Server, error) {
}
func (s *gRPCServer) Start() error {
if s.onConnectedHandler == nil {
return errors.New("onConnectedHandler is nil")
}
for _, listenAddr := range s.listeningAddrs {
err := s.listenOn(listenAddr)
if err != nil {

View File

@ -49,9 +49,15 @@ func (m *Manager) routerInitializer(router *routerpkg.Router, netConnection *net
return
}
netConnection.SetOnInvalidMessageHandler(func(err error) {
if atomic.AddUint32(&isStopping, 1) == 1 {
errChan <- protocolerrors.Wrap(true, err, "received bad message")
spawn("Manager.routerInitializer-netConnection.DequeueInvalidMessage", func() {
for {
isOpen, err := netConnection.DequeueInvalidMessage()
if !isOpen {
return
}
if atomic.AddUint32(&isStopping, 1) == 1 {
errChan <- protocolerrors.Wrap(true, err, "received bad message")
}
}
})

View File

@ -18,8 +18,7 @@ func (e *ProtocolError) Unwrap() error {
}
// Errorf formats according to a format specifier and returns the string
// as a value that satisfies error.
// Errorf also records the stack trace at the point it was called.
// as a ProtocolError.
func Errorf(shouldBan bool, format string, args ...interface{}) error {
return &ProtocolError{
ShouldBan: shouldBan,
@ -27,7 +26,7 @@ func Errorf(shouldBan bool, format string, args ...interface{}) error {
}
}
// New returns an error with the supplied message.
// New returns a ProtocolError with the supplied message.
// New also records the stack trace at the point it was called.
func New(shouldBan bool, message string) error {
return &ProtocolError{
@ -36,8 +35,7 @@ func New(shouldBan bool, message string) error {
}
}
// Wrap returns an error annotating err with a stack trace
// at the point Wrap is called, and the supplied message.
// Wrap wraps the given error and returns it as a ProtocolError.
func Wrap(shouldBan bool, err error, message string) error {
return &ProtocolError{
ShouldBan: shouldBan,
@ -45,8 +43,7 @@ func Wrap(shouldBan bool, err error, message string) error {
}
}
// Wrapf returns an error annotating err with a stack trace
// at the point Wrapf is called, and the format specifier.
// Wrapf wraps the given error with the given format and returns it as a ProtocolError.
func Wrapf(shouldBan bool, err error, format string, args ...interface{}) error {
return &ProtocolError{
ShouldBan: shouldBan,