Compare commits

...

6 Commits

Author SHA1 Message Date
Ori Newman
3ab861227d [NOD-443] Fix API server unorphaning (#501)
* [NOD-443] Immediately request missing parents in API server sync

* [NOD-443] Add rpc client log to api server

* [NOD-443] Fix wrong ordering of pendingHeaders queue

* [NOD-443] Fix error comparison at TestNewHashFromStr

* [NOD-443] Make a separate handleMissingParentHeaders function

* [NOD-443] Change log level

* [NOD-443] Put handleMissingParentHeaders next to handleBlockHeader

* [NOD-443] Make handleBlockAddedMsg function

* [NOD-443] Make reusable missingParentsHashesStr string

* [NOD-443] Remove redundant 's'

* [NOD-443] Refactor to first get all blocks and then add them to database

* [NOD-443] Rename variables and functions, and remove redundant logs

* [NOD-443] Make fetchBlockAndMissingAncestors use block hash as an argument

* [NOD-443] Add log only for first orphan block

* [NOD-443] Fix wrong order of adding blocks to pendingBlocks

* [NOD-443] Write logs for all orphans

* [NOD-443] Log only missing parents that are not already fetched

* [NOD-443] Rename rawVerboseBlockTuple -> rawVerboseBlock

* [NOD-443] Make fetchBlock return *rawVerboseBlock

* [NOD-443] Rename rawVerboseBlock -> rawAndVerboseBlock
2019-12-02 13:29:28 +02:00
stasatdaglabs
8f0d98ef9b [NOD-464] Fix error messages in GetBlocks. (#506) 2019-12-02 13:05:56 +02:00
Svarog
dbd8bf3d2c [NOD-478] Add buffer to newOutboundConnection channel (#502) 2019-12-01 17:58:05 +02:00
Dan Aharoni
1b6b02e0d2 [NOD-475] Return error when requesting limit=0 (#505) 2019-12-01 17:25:27 +02:00
Ori Newman
2402bae1ff [NOD-410] Add log level CLI argument to API server (#503)
* [NOD-410] Add log level CLI argument to API server

* [NOD-410] Add comments

* [NOD-410] Remove pre-allocation of one item
2019-12-01 17:24:12 +02:00
Dan Aharoni
3dcf8d88b8 [NOD-481] Fix /blocks limit error message for api server (#504) 2019-12-01 16:19:14 +02:00
17 changed files with 156 additions and 151 deletions

View File

@@ -30,6 +30,7 @@ func ActiveConfig() *Config {
// Config defines the configuration options for the API server.
type Config struct {
LogDir string `long:"logdir" description:"Directory to log output."`
DebugLevel string `short:"d" long:"debuglevel" description:"Set log level {trace, debug, info, warn, error, critical}"`
RPCUser string `short:"u" long:"rpcuser" description:"RPC username"`
RPCPassword string `short:"P" long:"rpcpass" default-mask:"-" description:"RPC password"`
RPCServer string `short:"s" long:"rpcserver" description:"RPC server to connect to"`
@@ -48,7 +49,7 @@ type Config struct {
}
// Parse parses the CLI arguments and returns a config struct.
func Parse() (*Config, error) {
func Parse() error {
activeConfig = &Config{
LogDir: defaultLogDir,
DBAddress: defaultDBAddress,
@@ -57,42 +58,49 @@ func Parse() (*Config, error) {
parser := flags.NewParser(activeConfig, flags.PrintErrors|flags.HelpFlag)
_, err := parser.Parse()
if err != nil {
return nil, err
return err
}
if !activeConfig.Migrate {
if activeConfig.RPCUser == "" {
return nil, errors.New("--rpcuser is required if --migrate flag is not used")
return errors.New("--rpcuser is required if --migrate flag is not used")
}
if activeConfig.RPCPassword == "" {
return nil, errors.New("--rpcpass is required if --migrate flag is not used")
return errors.New("--rpcpass is required if --migrate flag is not used")
}
if activeConfig.RPCServer == "" {
return nil, errors.New("--rpcserver is required if --migrate flag is not used")
return errors.New("--rpcserver is required if --migrate flag is not used")
}
}
if activeConfig.RPCCert == "" && !activeConfig.DisableTLS {
return nil, errors.New("--notls has to be disabled if --cert is used")
return errors.New("--notls has to be disabled if --cert is used")
}
if activeConfig.RPCCert != "" && activeConfig.DisableTLS {
return nil, errors.New("--cert should be omitted if --notls is used")
return errors.New("--cert should be omitted if --notls is used")
}
if (activeConfig.MQTTBrokerAddress != "" || activeConfig.MQTTUser != "" || activeConfig.MQTTPassword != "") &&
(activeConfig.MQTTBrokerAddress == "" || activeConfig.MQTTUser == "" || activeConfig.MQTTPassword == "") {
return nil, errors.New("--mqttaddress, --mqttuser, and --mqttpass must be passed all together")
return errors.New("--mqttaddress, --mqttuser, and --mqttpass must be passed all together")
}
err = activeConfig.ResolveNetwork(parser)
if err != nil {
return nil, err
return err
}
logFile := filepath.Join(activeConfig.LogDir, defaultLogFilename)
errLogFile := filepath.Join(activeConfig.LogDir, defaultErrLogFilename)
logger.InitLog(logFile, errLogFile)
return activeConfig, nil
if activeConfig.DebugLevel != "" {
err := logger.SetLogLevels(activeConfig.DebugLevel)
if err != nil {
return err
}
}
return nil
}

View File

@@ -54,8 +54,9 @@ func GetBlockByHashHandler(blockHash string) (interface{}, error) {
// GetBlocksHandler searches for all blocks
func GetBlocksHandler(order string, skip uint64, limit uint64) (interface{}, error) {
if limit > maxGetBlocksLimit {
return nil, httpserverutils.NewHandlerError(http.StatusUnprocessableEntity, errors.Errorf("The maximum allowed value for the limit is %d", maxGetBlocksLimit))
if limit < 1 || limit > maxGetBlocksLimit {
return nil, httpserverutils.NewHandlerError(http.StatusBadRequest,
errors.Errorf("Limit higher than %d or lower than 1 was requested.", maxGetTransactionsLimit))
}
blocks := []*dbmodels.Block{}
db, err := database.DB()

View File

@@ -79,9 +79,9 @@ func GetTransactionByHashHandler(txHash string) (interface{}, error) {
// GetTransactionsByAddressHandler searches for all transactions
// where the given address is either an input or an output.
func GetTransactionsByAddressHandler(address string, skip uint64, limit uint64) (interface{}, error) {
if limit > maxGetTransactionsLimit {
if limit < 1 || limit > maxGetTransactionsLimit {
return nil, httpserverutils.NewHandlerError(http.StatusBadRequest,
errors.Errorf("Limit higher than %d or lower than 0 was requested.", maxGetTransactionsLimit))
errors.Errorf("Limit higher than %d or lower than 1 was requested.", maxGetTransactionsLimit))
}
db, err := database.DB()

View File

@@ -4,6 +4,6 @@ import "github.com/daglabs/btcd/util/panics"
import "github.com/daglabs/btcd/apiserver/logger"
var (
log = logger.BackendLog.Logger("DTBS")
log = logger.Logger("DTBS")
spawn = panics.GoroutineWrapperFunc(log, logger.BackendLog)
)

16
apiserver/jsonrpc/log.go Normal file
View File

@@ -0,0 +1,16 @@
package jsonrpc
import (
"github.com/daglabs/btcd/apiserver/logger"
"github.com/daglabs/btcd/rpcclient"
"github.com/daglabs/btcd/util/panics"
)
var (
log = logger.BackendLog.Logger("RPCC")
spawn = panics.GoroutineWrapperFunc(log, logger.BackendLog)
)
func init() {
rpcclient.UseLogger(log, logger.BackendLog)
}

View File

@@ -6,6 +6,6 @@ import (
)
var (
log = logger.BackendLog.Logger("APIS")
log = logger.Logger("APIS")
spawn = panics.GoroutineWrapperFunc(log, logger.BackendLog)
)

View File

@@ -3,11 +3,13 @@ package logger
import (
"fmt"
"github.com/daglabs/btcd/logs"
"github.com/pkg/errors"
"os"
)
// BackendLog is the logging backend used to create all subsystem loggers.
var BackendLog = logs.NewBackend()
var loggers []logs.Logger
// InitLog attaches log file and error log file to the backend log.
func InitLog(logFile, errLogFile string) {
@@ -22,3 +24,24 @@ func InitLog(logFile, errLogFile string) {
os.Exit(1)
}
}
// Logger returns a new logger for a particular subsystem that writes to
// BackendLog, and add it to a slice so it will be possible to access it
// later and change its log level
func Logger(subsystemTag string) logs.Logger {
logger := BackendLog.Logger(subsystemTag)
loggers = append(loggers, logger)
return logger
}
// SetLogLevels sets the logging level for all of the subsystems in the API server.
func SetLogLevels(level string) error {
lvl, ok := logs.LevelFromString(level)
if !ok {
return errors.Errorf("Invalid log level %s", level)
}
for _, logger := range loggers {
logger.SetLevel(lvl)
}
return nil
}

View File

@@ -21,7 +21,7 @@ import (
func main() {
defer panics.HandlePanic(log, logger.BackendLog, nil)
cfg, err := config.Parse()
err := config.Parse()
if err != nil {
errString := fmt.Sprintf("Error parsing command-line arguments: %s", err)
_, fErr := fmt.Fprintf(os.Stderr, errString)
@@ -31,7 +31,7 @@ func main() {
return
}
if cfg.Migrate {
if config.ActiveConfig().Migrate {
err := database.Migrate()
if err != nil {
panic(errors.Errorf("Error migrating database: %s", err))

View File

@@ -4,6 +4,6 @@ import "github.com/daglabs/btcd/util/panics"
import "github.com/daglabs/btcd/apiserver/logger"
var (
log = logger.BackendLog.Logger("MQTT")
log = logger.Logger("MQTT")
spawn = panics.GoroutineWrapperFunc(log, logger.BackendLog)
)

View File

@@ -4,6 +4,6 @@ import "github.com/daglabs/btcd/util/panics"
import "github.com/daglabs/btcd/apiserver/logger"
var (
log = logger.BackendLog.Logger("REST")
log = logger.Logger("REST")
spawn = panics.GoroutineWrapperFunc(log, logger.BackendLog)
)

View File

@@ -164,7 +164,7 @@ func getBlocksHandler(_ *httpserverutils.ServerContext, _ *http.Request, _ map[s
if orderParamValue, ok := queryParams[queryParamOrder]; ok {
if orderParamValue != controllers.OrderAscending && orderParamValue != controllers.OrderDescending {
return nil, httpserverutils.NewHandlerError(http.StatusUnprocessableEntity, errors.Errorf(
"Couldn't parse the '%s' query parameter", queryParamLimit))
"Couldn't parse the '%s' query parameter", queryParamOrder))
}
order = orderParamValue
}

View File

@@ -24,24 +24,8 @@ import (
"github.com/pkg/errors"
)
var (
// pendingBlockAddedMsgs holds blockAddedMsgs in order of arrival
pendingBlockAddedMsgs []*jsonrpc.BlockAddedMsg
// pendingChainChangedMsgs holds chainChangedMsgs in order of arrival
pendingChainChangedMsgs []*jsonrpc.ChainChangedMsg
// missingBlocks is a map between missing block hashes and the time
// they were first found to be missing. If a block is still missing
// after blockMissingTimeout then it gets re-requested from the node.
missingBlocks = make(map[string]time.Time)
)
const (
// blockMissingTimeout is the amount of time after which a missing block
// gets re-requested from the node.
blockMissingTimeout = time.Second * 10
)
// pendingChainChangedMsgs holds chainChangedMsgs in order of arrival
var pendingChainChangedMsgs []*jsonrpc.ChainChangedMsg
// startSync keeps the node and the API server in sync. On start, it downloads
// all data that's missing from the API server, and once it's done it keeps
@@ -85,8 +69,7 @@ func sync(client *jsonrpc.Client, doneChan chan struct{}) error {
for {
select {
case blockAdded := <-client.OnBlockAdded:
enqueueBlockAddedMsg(blockAdded)
err := processBlockAddedMsgs(client)
err := handleBlockAddedMsg(client, blockAdded)
if err != nil {
return err
}
@@ -201,23 +184,27 @@ func findHashOfBluestBlock(mustBeChainBlock bool) (*string, error) {
// fetchBlock downloads the serialized block and raw block data of
// the block with hash blockHash.
func fetchBlock(client *jsonrpc.Client, blockHash *daghash.Hash) (
rawBlock string, verboseBlock *btcjson.GetBlockVerboseResult, err error) {
*rawAndVerboseBlock, error) {
log.Debugf("Getting block %s from the RPC server", blockHash)
msgBlock, err := client.GetBlock(blockHash, nil)
if err != nil {
return "", nil, err
return nil, err
}
writer := bytes.NewBuffer(make([]byte, 0, msgBlock.SerializeSize()))
err = msgBlock.Serialize(writer)
if err != nil {
return "", nil, err
return nil, err
}
rawBlock = hex.EncodeToString(writer.Bytes())
rawBlock := hex.EncodeToString(writer.Bytes())
verboseBlock, err = client.GetBlockVerboseTx(blockHash, nil)
verboseBlock, err := client.GetBlockVerboseTx(blockHash, nil)
if err != nil {
return "", nil, err
return nil, err
}
return rawBlock, verboseBlock, nil
return &rawAndVerboseBlock{
rawBlock: rawBlock,
verboseBlock: verboseBlock,
}, nil
}
// addBlocks inserts data in the given rawBlocks and verboseBlocks pairwise
@@ -319,12 +306,6 @@ func addBlock(client *jsonrpc.Client, rawBlock string, verboseBlock btcjson.GetB
}
dbTx.Commit()
// If the block was previously missing, remove it from
// the missing blocks collection.
if _, ok := missingBlocks[verboseBlock.Hash]; ok {
delete(missingBlocks, verboseBlock.Hash)
}
return nil
}
@@ -937,126 +918,106 @@ func updateAddedChainBlocks(dbTx *gorm.DB, addedBlock *btcjson.ChainBlock) error
return nil
}
// enqueueBlockAddedMsg enqueues onBlockAdded messages to be handled later
func enqueueBlockAddedMsg(blockAdded *jsonrpc.BlockAddedMsg) {
pendingBlockAddedMsgs = append(pendingBlockAddedMsgs, blockAdded)
type rawAndVerboseBlock struct {
rawBlock string
verboseBlock *btcjson.GetBlockVerboseResult
}
// processBlockAddedMsgs processes all pending onBlockAdded messages.
// Messages that cannot yet be processed are re-enqueued.
func processBlockAddedMsgs(client *jsonrpc.Client) error {
var unprocessedBlockAddedMsgs []*jsonrpc.BlockAddedMsg
for _, blockAdded := range pendingBlockAddedMsgs {
missingHashes, err := missingParentHashes(blockAdded)
if err != nil {
return errors.Errorf("Could not resolve missing parents: %s", err)
}
for _, missingHash := range missingHashes {
err := handleMissingParent(client, missingHash)
if err != nil {
log.Errorf("Failed to handle missing parent block %s: %s",
missingHash, err)
}
}
if len(missingHashes) > 0 {
unprocessedBlockAddedMsgs = append(unprocessedBlockAddedMsgs, blockAdded)
continue
}
func (r *rawAndVerboseBlock) String() string {
return r.verboseBlock.Hash
}
handleBlockAddedMsg(client, blockAdded)
func handleBlockAddedMsg(client *jsonrpc.Client, blockAdded *jsonrpc.BlockAddedMsg) error {
blocks, err := fetchBlockAndMissingAncestors(client, blockAdded.Header.BlockHash())
if err != nil {
return err
}
for _, block := range blocks {
err = addBlock(client, block.rawBlock, *block.verboseBlock)
if err != nil {
return err
}
log.Infof("Added block %s", block.verboseBlock.Hash)
}
pendingBlockAddedMsgs = unprocessedBlockAddedMsgs
return nil
}
func handleBlockAddedMsg(client *jsonrpc.Client, blockAdded *jsonrpc.BlockAddedMsg) {
hash := blockAdded.Header.BlockHash()
log.Debugf("Getting block %s from the RPC server", hash)
rawBlock, verboseBlock, err := fetchBlock(client, hash)
if err != nil {
log.Warnf("Could not fetch block %s: %s", hash, err)
return
}
err = addBlock(client, rawBlock, *verboseBlock)
if err != nil {
log.Errorf("Could not insert block %s: %s", hash, err)
return
}
log.Infof("Added block %s", hash)
}
func missingParentHashes(blockAdded *jsonrpc.BlockAddedMsg) ([]string, error) {
db, err := database.DB()
func fetchBlockAndMissingAncestors(client *jsonrpc.Client, blockHash *daghash.Hash) ([]*rawAndVerboseBlock, error) {
block, err := fetchBlock(client, blockHash)
if err != nil {
return nil, err
}
pendingBlocks := []*rawAndVerboseBlock{block}
blocksToAdd := make([]*rawAndVerboseBlock, 0)
blocksToAddSet := make(map[string]struct{})
for len(pendingBlocks) > 0 {
var currentBlock *rawAndVerboseBlock
currentBlock, pendingBlocks = pendingBlocks[0], pendingBlocks[1:]
missingHashes, err := missingParentHashes(currentBlock.verboseBlock.ParentHashes)
if err != nil {
return nil, err
}
blocksToPrependToPending := make([]*rawAndVerboseBlock, 0, len(missingHashes))
for _, missingHash := range missingHashes {
if _, ok := blocksToAddSet[missingHash]; ok {
continue
}
hash, err := daghash.NewHashFromStr(missingHash)
if err != nil {
return nil, err
}
block, err := fetchBlock(client, hash)
if err != nil {
return nil, err
}
blocksToPrependToPending = append(blocksToPrependToPending, block)
}
if len(blocksToPrependToPending) == 0 {
blocksToAddSet[currentBlock.verboseBlock.Hash] = struct{}{}
blocksToAdd = append(blocksToAdd, currentBlock)
continue
}
log.Debugf("Found %s missing parents for block %s and fetched them", blocksToPrependToPending, currentBlock)
blocksToPrependToPending = append(blocksToPrependToPending, currentBlock)
pendingBlocks = append(blocksToPrependToPending, pendingBlocks...)
}
return blocksToAdd, nil
}
// Collect all referenced parent hashes
hashesIn := make([]string, 0, len(blockAdded.Header.ParentHashes))
for _, hash := range blockAdded.Header.ParentHashes {
hashesIn = append(hashesIn, hash.String())
func missingParentHashes(parentHashes []string) ([]string, error) {
db, err := database.DB()
if err != nil {
return nil, err
}
// Make sure that all the parent hashes exist in the database
var dbParentBlocks []dbmodels.Block
dbResult := db.
Model(&dbmodels.Block{}).
Where("block_hash in (?)", hashesIn).
Where("block_hash in (?)", parentHashes).
Find(&dbParentBlocks)
dbErrors := dbResult.GetErrors()
if httpserverutils.HasDBError(dbErrors) {
return nil, httpserverutils.NewErrorFromDBErrors("failed to find parent blocks: ", dbErrors)
}
if len(hashesIn) != len(dbParentBlocks) {
if len(parentHashes) != len(dbParentBlocks) {
// Some parent hashes are missing. Collect and return them
var missingParentHashes []string
var missingHashes []string
outerLoop:
for _, hash := range hashesIn {
for _, hash := range parentHashes {
for _, dbParentBlock := range dbParentBlocks {
if dbParentBlock.BlockHash == hash {
continue outerLoop
}
}
missingParentHashes = append(missingParentHashes, hash)
missingHashes = append(missingHashes, hash)
}
return missingParentHashes, nil
return missingHashes, nil
}
return nil, nil
}
// handleMissingParent handles missing parent block hashes as follows:
// a. If it's the first time we've encountered this block hash, add it to
// the missing blocks collection with time = now
// b. Otherwise, if time + blockMissingTimeout < now (that is to say,
// blockMissingTimeout had elapsed) then request the block from the
// node
func handleMissingParent(client *jsonrpc.Client, missingParentHash string) error {
firstMissingTime, ok := missingBlocks[missingParentHash]
if !ok {
log.Infof("Parent block %s is missing", missingParentHash)
missingBlocks[missingParentHash] = time.Now()
return nil
}
if firstMissingTime.Add(blockMissingTimeout).Before(time.Now()) {
hash, err := daghash.NewHashFromStr(missingParentHash)
if err != nil {
return errors.Errorf("Could not create hash: %s", err)
}
rawBlock, verboseBlock, err := fetchBlock(client, hash)
if err != nil {
return errors.Errorf("Could not fetch block %s: %s", hash, err)
}
err = addBlock(client, rawBlock, *verboseBlock)
if err != nil {
return errors.Errorf("Could not insert block %s: %s", hash, err)
}
log.Infof("Parent block %s was fetched after missing for over %s",
missingParentHash, blockMissingTimeout)
}
return nil
}
// enqueueChainChangedMsg enqueues onChainChanged messages to be handled later
func enqueueChainChangedMsg(chainChanged *jsonrpc.ChainChangedMsg) {
pendingChainChangedMsgs = append(pendingChainChangedMsgs, chainChanged)

View File

@@ -7,11 +7,12 @@ package connmgr
import (
nativeerrors "errors"
"fmt"
"github.com/pkg/errors"
"net"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
)
// maxFailedAttempts is the maximum number of successive failed connection

View File

@@ -10,7 +10,6 @@ import (
"crypto/rand"
"encoding/binary"
"fmt"
"github.com/pkg/errors"
"math"
"net"
"runtime"
@@ -21,6 +20,8 @@ import (
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/daglabs/btcd/util/subnetworkid"
"github.com/daglabs/btcd/addrmgr"
@@ -1610,7 +1611,7 @@ func NewServer(listenAddrs []string, db database.DB, dagParams *dagconfig.Params
broadcast: make(chan broadcastMsg, config.ActiveConfig().MaxPeers),
quit: make(chan struct{}),
modifyRebroadcastInv: make(chan interface{}),
newOutboundConnection: make(chan *outboundPeerConnectedMsg),
newOutboundConnection: make(chan *outboundPeerConnectedMsg, config.ActiveConfig().MaxPeers), // TODO: replace with target outbound
nat: nat,
db: db,
TimeSource: blockdag.NewMedianTime(),

View File

@@ -1,9 +1,5 @@
package testutil
import (
"fmt"
)
// AreErrorsEqual returns whether errors have the same type
// and same error string from .Error().
func AreErrorsEqual(err1, err2 error) bool {
@@ -13,8 +9,5 @@ func AreErrorsEqual(err1, err2 error) bool {
if err1 == nil && err2 != nil {
return false
}
if fmt.Sprintf("%T", err1) != fmt.Sprintf("%T", err2) {
return false
}
return err1.Error() == err2.Error()
}

View File

@@ -191,7 +191,7 @@ func Decode(dst *Hash, src string) error {
var reversedHash Hash
_, err := hex.Decode(reversedHash[HashSize-hex.DecodedLen(len(srcBytes)):], srcBytes)
if err != nil {
return err
return errors.WithStack(err)
}
// Reverse copy from the temporary hash to destination. Because the

View File

@@ -7,6 +7,7 @@ package daghash
import (
"bytes"
"encoding/hex"
"github.com/daglabs/btcd/testutil"
"math/big"
"reflect"
"testing"
@@ -214,7 +215,7 @@ func TestNewHashFromStr(t *testing.T) {
t.Logf("Running %d tests", len(tests))
for i, test := range tests {
result, err := NewHashFromStr(test.in)
if err != test.err {
if !testutil.AreErrorsEqual(err, test.err) {
t.Errorf(unexpectedErrStr, i, err, test.err)
continue
} else if err != nil {