[NOD-1145] Normalize panics in flows (#819)

* [NOD-1145] Remove panics from regular flows.

* [NOD-1145] Remove panics from the handshake flow.

* [NOD-1045] Fix merge errors.

* [NOD-1045] Remove a comment.

* [NOD-1045] Handle errors properly in AddTransaction and AddBlock.

* [NOD-1045] Remove a comment.

* [NOD-1045] Wrap ErrPeerWithSameIDExists with ProtocolError.

* [NOD-1145] Add TODOs.
This commit is contained in:
stasatdaglabs 2020-07-26 13:46:59 +03:00 committed by GitHub
parent 683ceda3a7
commit 6cf589dc9b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 56 additions and 33 deletions

View File

@ -21,7 +21,7 @@ func (f *FlowContext) AddTransaction(tx *util.Tx) error {
}
if len(transactionsAcceptedToMempool) > 1 {
panic(errors.New("got more than one accepted transactions when no orphans were allowed"))
return errors.New("got more than one accepted transactions when no orphans were allowed")
}
f.transactionsToRebroadcast[*tx.ID()] = tx

View File

@ -26,7 +26,7 @@ func SendAddresses(context SendAddressesContext, incomingRoute *router.Route, ou
msgAddresses := wire.NewMsgAddresses(msgGetAddresses.IncludeAllSubnetworks, msgGetAddresses.SubnetworkID)
err = msgAddresses.AddAddresses(shuffleAddresses(addresses)...)
if err != nil {
panic(err)
return err
}
return outgoingRoute.Enqueue(msgAddresses)

View File

@ -31,7 +31,7 @@ func HandleRelayBlockRequests(context RelayBlockRequestsContext, incomingRoute *
if blockdag.IsNotInDAGErr(err) {
return protocolerrors.Errorf(true, "block %s not found", hash)
} else if err != nil {
panic(errors.Wrapf(err, "unable to fetch requested block hash %s", hash))
return errors.Wrapf(err, "unable to fetch requested block hash %s", hash)
}
msgBlock := block.MsgBlock()

View File

@ -169,25 +169,19 @@ func (flow *handleRelayInvsFlow) readMsgBlock() (
case *wire.MsgBlock:
return message, nil
default:
panic(errors.Errorf("unexpected message %s", message.Command()))
return nil, errors.Errorf("unexpected message %s", message.Command())
}
}
}
func (flow *handleRelayInvsFlow) processAndRelayBlock(requestQueue *hashesQueueSet, block *util.Block) error {
blockHash := block.Hash()
isOrphan, isDelayed, err := flow.DAG().ProcessBlock(block, blockdag.BFNone)
if err != nil {
// When the error is a rule error, it means the block was simply
// rejected as opposed to something actually going wrong, so log
// it as such. Otherwise, something really did go wrong, so panic.
if !errors.As(err, &blockdag.RuleError{}) {
panic(errors.Wrapf(err, "failed to process block %s",
blockHash))
return errors.Wrapf(err, "failed to process block %s", blockHash)
}
log.Infof("Rejected block %s from %s: %s", blockHash,
flow.peer, err)
log.Infof("Rejected block %s from %s: %s", blockHash, flow.peer, err)
return protocolerrors.Wrap(true, err, "got invalid block")
}
@ -237,7 +231,7 @@ func (flow *handleRelayInvsFlow) processAndRelayBlock(requestQueue *hashesQueueS
flow.StartIBDIfRequired()
err = flow.OnNewBlock(block)
if err != nil {
panic(err)
return err
}
return nil

View File

@ -1,6 +1,8 @@
package handshake
import (
"github.com/kaspanet/kaspad/protocol/common"
"github.com/kaspanet/kaspad/protocol/protocolerrors"
"sync"
"sync/atomic"
@ -8,7 +10,6 @@ import (
"github.com/kaspanet/kaspad/blockdag"
"github.com/kaspanet/kaspad/config"
"github.com/kaspanet/kaspad/netadapter"
"github.com/kaspanet/kaspad/protocol/common"
routerpkg "github.com/kaspanet/kaspad/netadapter/router"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
@ -34,12 +35,12 @@ func HandleHandshake(context HandleHandshakeContext, router *routerpkg.Router,
receiveVersionRoute, err := router.AddIncomingRoute([]wire.MessageCommand{wire.CmdVersion})
if err != nil {
panic(err)
return nil, false, err
}
sendVersionRoute, err := router.AddIncomingRoute([]wire.MessageCommand{wire.CmdVerAck})
if err != nil {
panic(err)
return nil, false, err
}
// For HandleHandshake to finish, we need to get from the other node
@ -59,6 +60,9 @@ func HandleHandshake(context HandleHandshakeContext, router *routerpkg.Router,
address, err := ReceiveVersion(context, receiveVersionRoute, router.OutgoingRoute(), peer)
if err != nil {
log.Errorf("error from ReceiveVersion: %s", err)
if protocolErr := &(protocolerrors.ProtocolError{}); !errors.As(err, &protocolErr) {
panic(err)
}
}
if err != nil {
if atomic.AddUint32(&errChanUsed, 1) != 1 {
@ -74,6 +78,9 @@ func HandleHandshake(context HandleHandshakeContext, router *routerpkg.Router,
err := SendVersion(context, sendVersionRoute, router.OutgoingRoute())
if err != nil {
log.Errorf("error from SendVersion: %s", err)
if protocolErr := &(protocolerrors.ProtocolError{}); !errors.As(err, &protocolErr) {
panic(err)
}
}
if err != nil {
if atomic.AddUint32(&errChanUsed, 1) != 1 {
@ -94,10 +101,10 @@ func HandleHandshake(context HandleHandshakeContext, router *routerpkg.Router,
err = context.AddToPeers(peer)
if err != nil {
if errors.Is(err, common.ErrPeerWithSameIDExists) {
return nil, false, err
if errors.As(err, &common.ErrPeerWithSameIDExists) {
return nil, false, protocolerrors.Wrap(false, err, "peer already exists")
}
panic(err)
return nil, false, err
}
if peerAddress != nil {
@ -110,7 +117,7 @@ func HandleHandshake(context HandleHandshakeContext, router *routerpkg.Router,
err = router.RemoveRoute([]wire.MessageCommand{wire.CmdVersion, wire.CmdVerAck})
if err != nil {
panic(err)
return nil, false, err
}
return peer, false, nil

View File

@ -47,7 +47,7 @@ func (flow *sendVersionFlow) start() error {
// Version message.
localAddress, err := flow.NetAdapter().GetBestLocalAddress()
if err != nil {
panic(err)
return err
}
msg := wire.NewMsgVersion(localAddress, flow.NetAdapter().ID(), selectedTipHash, subnetworkID)
msg.AddUserAgent(userAgentName, userAgentVersion, flow.Config().UserAgentComments...)

View File

@ -178,7 +178,7 @@ func (flow *handleIBDFlow) processIBDBlock(msgIBDBlock *wire.MsgIBDBlock) error
}
err = flow.OnNewBlock(block)
if err != nil {
panic(err)
return err
}
return nil
}

View File

@ -48,8 +48,8 @@ func (flow *handleGetSelectedTipFlow) receiveGetSelectedTip() error {
}
_, ok := message.(*wire.MsgGetSelectedTip)
if !ok {
panic(errors.Errorf("received unexpected message type. "+
"expected: %s, got: %s", wire.CmdGetSelectedTip, message.Command()))
return errors.Errorf("received unexpected message type. "+
"expected: %s, got: %s", wire.CmdGetSelectedTip, message.Command())
}
return nil

View File

@ -163,7 +163,7 @@ func (flow *handleRelayedTransactionsFlow) readMsgTx() (
case *wire.MsgTx:
return message, nil
default:
panic(errors.Errorf("unexpected message %s", message.Command()))
return nil, errors.Errorf("unexpected message %s", message.Command())
}
}
}
@ -185,13 +185,9 @@ func (flow *handleRelayedTransactionsFlow) receiveTransactions(requestedTransact
acceptedTxs, err := flow.TxPool().ProcessTransaction(tx, true, 0) // TODO(libp2p) Use the peer ID for the mempool tag
if err != nil {
// When the error is a rule error, it means the transaction was
// simply rejected as opposed to something actually going wrong,
// so log it as such. Otherwise, something really did go wrong,
// so panic.
ruleErr := &mempool.RuleError{}
if !errors.As(err, ruleErr) {
panic(errors.Wrapf(err, "failed to process transaction %s", tx.ID()))
return errors.Wrapf(err, "failed to process transaction %s", tx.ID())
}
shouldBan := false
@ -211,7 +207,7 @@ func (flow *handleRelayedTransactionsFlow) receiveTransactions(requestedTransact
}
err = flow.broadcastAcceptedTransactions(acceptedTxs)
if err != nil {
panic(err)
return err
}
// TODO(libp2p) Notify transactionsAcceptedToMempool to RPC
}

View File

@ -9,7 +9,9 @@ import (
"github.com/kaspanet/kaspad/netadapter"
"github.com/kaspanet/kaspad/protocol/flowcontext"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
"github.com/kaspanet/kaspad/protocol/protocolerrors"
"github.com/kaspanet/kaspad/util"
"github.com/pkg/errors"
)
// Manager manages the p2p protocol
@ -57,10 +59,28 @@ func (m *Manager) IBDPeer() *peerpkg.Peer {
// AddTransaction adds transaction to the mempool and propagates it.
func (m *Manager) AddTransaction(tx *util.Tx) error {
return m.context.AddTransaction(tx)
err := m.context.AddTransaction(tx)
if err != nil {
if protocolErr := &(protocolerrors.ProtocolError{}); errors.As(err, &protocolErr) {
return err
}
// TODO(libp2p): Remove panic once RPC is integrated into protocol architecture
panic(err)
}
return nil
}
// AddBlock adds the given block to the DAG and propagates it.
func (m *Manager) AddBlock(block *util.Block, flags blockdag.BehaviorFlags) error {
return m.context.AddBlock(block, flags)
err := m.context.AddBlock(block, flags)
if err != nil {
if protocolErr := &(protocolerrors.ProtocolError{}); errors.As(err, &protocolErr) {
return err
}
// TODO(libp2p): Remove panic once RPC is integrated into protocol architecture
panic(err)
}
return nil
}

View File

@ -199,6 +199,9 @@ func addFlow(name string, router *routerpkg.Router, messageTypes []wire.MessageC
err := flow(route)
if err != nil {
log.Errorf("error from %s flow: %s", name, err)
if protocolErr := &(protocolerrors.ProtocolError{}); !errors.As(err, &protocolErr) {
panic(err)
}
}
if atomic.AddUint32(stopped, 1) == 1 {
stopChan <- err
@ -225,6 +228,9 @@ func addOneTimeFlow(name string, router *routerpkg.Router, messageTypes []wire.M
err := flow(route)
if err != nil {
log.Errorf("error from %s flow: %s", name, err)
if protocolErr := &(protocolerrors.ProtocolError{}); !errors.As(err, &protocolErr) {
panic(err)
}
}
if err != nil && atomic.AddUint32(stopped, 1) == 1 {
stopChan <- err