Michael Sutton c81506220b
Send pruning point anticone in batches (#1973)
* add p2p v5 which is currently identical to v4

* set all internal imports to v5

* set default version to 5

* Send pruning point and its anticone in batches

* go lint

* Fix jsom format

* Use DequeueWithTimeout

* Assert that batch size < route capacity

* oops, this is a flow handler, by definition it needs to be w/o a timeout

* here however, a timeout is required

* Keep IDs of prev messages unmodified

* previous merge operation accidentally erased an important part of this pr

* Extend timeout of simple sync

Co-authored-by: Ori Newman <orinewman1@gmail.com>
2022-03-13 16:31:34 +02:00

37 lines
1.1 KiB
Go

package flowcontext
import (
"errors"
"sync/atomic"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
"github.com/kaspanet/kaspad/app/protocol/protocolerrors"
)
// HandleError handles an error from a flow,
// It sends the error to errChan if isStopping == 0 and increments isStopping
//
// If this is ErrRouteClosed - forward it to errChan
// If this is ProtocolError - logs the error, and forward it to errChan
// Otherwise - panics
func (*FlowContext) HandleError(err error, flowName string, isStopping *uint32, errChan chan<- error) {
isErrRouteClosed := errors.Is(err, router.ErrRouteClosed)
if !isErrRouteClosed {
if protocolErr := (protocolerrors.ProtocolError{}); !errors.As(err, &protocolErr) {
panic(err)
}
log.Errorf("error from %s: %+v", flowName, err)
}
if atomic.AddUint32(isStopping, 1) == 1 {
errChan <- err
}
}
// IsRecoverableError returns whether the error is recoverable
func (*FlowContext) IsRecoverableError(err error) bool {
return err == nil || errors.Is(err, router.ErrRouteClosed) || errors.As(err, &protocolerrors.ProtocolError{})
}