Compare commits

...

16 Commits

Author SHA1 Message Date
Svarog
e68b242243 [NOD-489] Don't skip notification about transactions for orphan/non-current blocks (#511) 2019-12-03 14:18:32 +02:00
Ori Newman
9cc2a7260b [NOD-479] Separate max outbound connections and max inbound connections (#509)
* [NOD-479] Separate max outbound connections and max inbound connections

* [NOD-479] Fix merge

* [NOD-479] Renames and add function countinboundPeers

* [NOD-479] Remove redundant check on maximum outbound peers

* [NOD-479] Rename countinboundPeers -> countInboundPeers
2019-12-03 12:27:49 +02:00
Ori Newman
bcd73012de [NOD-428] Require RPC user and password, and do not create a default config file for btcctl if rpc login details were provided (#510)
* [NOD-428] Required RPC user and password, and do not create a default config file for btcctl if rpc login details were provided

* [NOD-428] Don't check rpc user and password if rpc is disabled

* [NOD-428] Fix error message
2019-12-03 11:18:28 +02:00
Dan Aharoni
1fea2a9421 [NOD-486] API Server TX posting: Forward error when RPC Error is received (#507)
* [NOD-486] Forward error when RPC Error is received

* [NOD-486] Rename variable

* [NOD-486] Rename variable

* [NOD-486] Rename Variable (again)
2019-12-02 18:44:39 +02:00
stasatdaglabs
bb7d68deda [NOD-484] Fix deadlock between p2p server and sync manager during shutdown (#508)
* [NOD-484] Fix deadlock between p2p server and sync manager during shutdown.

* [NOD-484] Fix quitWaitGroup.Wait() potentially not waiting in some scenarios.

* [NOD-484] Add a comment explaining quitWaitGroup.

* [NOD-484] Fix typo.

* [NOD-484] Add etc to comment.
2019-12-02 18:08:32 +02:00
Ori Newman
3ab861227d [NOD-443] Fix API server unorphaning (#501)
* [NOD-443] Immediately request missing parents in API server sync

* [NOD-443] Add rpc client log to api server

* [NOD-443] Fix wrong ordering of pendingHeaders queue

* [NOD-443] Fix error comparison at TestNewHashFromStr

* [NOD-443] Make a separate handleMissingParentHeaders function

* [NOD-443] Change log level

* [NOD-443] Put handleMissingParentHeaders next to handleBlockHeader

* [NOD-443] Make handleBlockAddedMsg function

* [NOD-443] Make reusable missingParentsHashesStr string

* [NOD-443] Remove redundant 's'

* [NOD-443] Refactor to first get all blocks and then add them to database

* [NOD-443] Rename variables and functions, and remove redundant logs

* [NOD-443] Make fetchBlockAndMissingAncestors use block hash as an argument

* [NOD-443] Add log only for first orphan block

* [NOD-443] Fix wrong order of adding blocks to pendingBlocks

* [NOD-443] Write logs for all orphans

* [NOD-443] Log only missing parents that are not already fetched

* [NOD-443] Rename rawVerboseBlockTuple -> rawVerboseBlock

* [NOD-443] Make fetchBlock return *rawVerboseBlock

* [NOD-443] Rename rawVerboseBlock -> rawAndVerboseBlock
2019-12-02 13:29:28 +02:00
stasatdaglabs
8f0d98ef9b [NOD-464] Fix error messages in GetBlocks. (#506) 2019-12-02 13:05:56 +02:00
Svarog
dbd8bf3d2c [NOD-478] Add buffer to newOutboundConnection channel (#502) 2019-12-01 17:58:05 +02:00
Dan Aharoni
1b6b02e0d2 [NOD-475] Return error when requesting limit=0 (#505) 2019-12-01 17:25:27 +02:00
Ori Newman
2402bae1ff [NOD-410] Add log level CLI argument to API server (#503)
* [NOD-410] Add log level CLI argument to API server

* [NOD-410] Add comments

* [NOD-410] Remove pre-allocation of one item
2019-12-01 17:24:12 +02:00
Dan Aharoni
3dcf8d88b8 [NOD-481] Fix /blocks limit error message for api server (#504) 2019-12-01 16:19:14 +02:00
stasatdaglabs
dbf9c09a2e [NOD-461] Fix error code and message in GetTransactionsByAddressHandler. (#499) 2019-11-28 17:32:34 +02:00
stasatdaglabs
5e9fc2defc [NOD-464] Fix error messages in GetBlocks. (#500) 2019-11-28 17:31:19 +02:00
Ori Newman
bdc3cbceaa [NOD-472] Don't fetch accepting block and tx confirmations for getBlocks (#498)
* [NOD-472] Don't fetch accepting block and tx confirmations for getBlocks

* [NOD-472] Don't fetch accepting block and tx confirmations in any block verbose result

* [NOD-472] Add stringPointerToString function
2019-11-28 13:04:03 +02:00
stasatdaglabs
a71528fefb [NOD-450] Fix netsync clogging its own request queue with orphans that it had just now processed (#497)
* [NOD-450] Fix netsync clogging its own request queue with orphans that it had just now processed.

* [NOD-450] Rename hash to orphanHash.
2019-11-28 11:30:50 +02:00
Dan Aharoni
6725742d2c [NOD-470] Pass string instead of hash to controller (#496) 2019-11-27 17:08:23 +02:00
25 changed files with 290 additions and 231 deletions

View File

@@ -30,6 +30,7 @@ func ActiveConfig() *Config {
// Config defines the configuration options for the API server.
type Config struct {
LogDir string `long:"logdir" description:"Directory to log output."`
DebugLevel string `short:"d" long:"debuglevel" description:"Set log level {trace, debug, info, warn, error, critical}"`
RPCUser string `short:"u" long:"rpcuser" description:"RPC username"`
RPCPassword string `short:"P" long:"rpcpass" default-mask:"-" description:"RPC password"`
RPCServer string `short:"s" long:"rpcserver" description:"RPC server to connect to"`
@@ -48,7 +49,7 @@ type Config struct {
}
// Parse parses the CLI arguments and returns a config struct.
func Parse() (*Config, error) {
func Parse() error {
activeConfig = &Config{
LogDir: defaultLogDir,
DBAddress: defaultDBAddress,
@@ -57,42 +58,49 @@ func Parse() (*Config, error) {
parser := flags.NewParser(activeConfig, flags.PrintErrors|flags.HelpFlag)
_, err := parser.Parse()
if err != nil {
return nil, err
return err
}
if !activeConfig.Migrate {
if activeConfig.RPCUser == "" {
return nil, errors.New("--rpcuser is required if --migrate flag is not used")
return errors.New("--rpcuser is required if --migrate flag is not used")
}
if activeConfig.RPCPassword == "" {
return nil, errors.New("--rpcpass is required if --migrate flag is not used")
return errors.New("--rpcpass is required if --migrate flag is not used")
}
if activeConfig.RPCServer == "" {
return nil, errors.New("--rpcserver is required if --migrate flag is not used")
return errors.New("--rpcserver is required if --migrate flag is not used")
}
}
if activeConfig.RPCCert == "" && !activeConfig.DisableTLS {
return nil, errors.New("--notls has to be disabled if --cert is used")
return errors.New("--notls has to be disabled if --cert is used")
}
if activeConfig.RPCCert != "" && activeConfig.DisableTLS {
return nil, errors.New("--cert should be omitted if --notls is used")
return errors.New("--cert should be omitted if --notls is used")
}
if (activeConfig.MQTTBrokerAddress != "" || activeConfig.MQTTUser != "" || activeConfig.MQTTPassword != "") &&
(activeConfig.MQTTBrokerAddress == "" || activeConfig.MQTTUser == "" || activeConfig.MQTTPassword == "") {
return nil, errors.New("--mqttaddress, --mqttuser, and --mqttpass must be passed all together")
return errors.New("--mqttaddress, --mqttuser, and --mqttpass must be passed all together")
}
err = activeConfig.ResolveNetwork(parser)
if err != nil {
return nil, err
return err
}
logFile := filepath.Join(activeConfig.LogDir, defaultLogFilename)
errLogFile := filepath.Join(activeConfig.LogDir, defaultErrLogFilename)
logger.InitLog(logFile, errLogFile)
return activeConfig, nil
if activeConfig.DebugLevel != "" {
err := logger.SetLogLevels(activeConfig.DebugLevel)
if err != nil {
return err
}
}
return nil
}

View File

@@ -54,8 +54,9 @@ func GetBlockByHashHandler(blockHash string) (interface{}, error) {
// GetBlocksHandler searches for all blocks
func GetBlocksHandler(order string, skip uint64, limit uint64) (interface{}, error) {
if limit > maxGetBlocksLimit {
return nil, httpserverutils.NewHandlerError(http.StatusUnprocessableEntity, errors.Errorf("The maximum allowed value for the limit is %d", maxGetBlocksLimit))
if limit < 1 || limit > maxGetBlocksLimit {
return nil, httpserverutils.NewHandlerError(http.StatusBadRequest,
errors.Errorf("Limit higher than %d or lower than 1 was requested.", maxGetTransactionsLimit))
}
blocks := []*dbmodels.Block{}
db, err := database.DB()
@@ -82,7 +83,7 @@ func GetBlocksHandler(order string, skip uint64, limit uint64) (interface{}, err
}
// GetAcceptedTransactionIDsByBlockHashHandler returns an array of transaction IDs for a given block hash
func GetAcceptedTransactionIDsByBlockHashHandler(blockHash *daghash.Hash) ([]string, error) {
func GetAcceptedTransactionIDsByBlockHashHandler(blockHash string) ([]string, error) {
db, err := database.DB()
if err != nil {
return nil, err

View File

@@ -79,9 +79,9 @@ func GetTransactionByHashHandler(txHash string) (interface{}, error) {
// GetTransactionsByAddressHandler searches for all transactions
// where the given address is either an input or an output.
func GetTransactionsByAddressHandler(address string, skip uint64, limit uint64) (interface{}, error) {
if limit > maxGetTransactionsLimit {
return nil, httpserverutils.NewHandlerError(http.StatusUnprocessableEntity,
errors.Errorf("The maximum allowed value for the limit is %d", maxGetTransactionsLimit))
if limit < 1 || limit > maxGetTransactionsLimit {
return nil, httpserverutils.NewHandlerError(http.StatusBadRequest,
errors.Errorf("Limit higher than %d or lower than 1 was requested.", maxGetTransactionsLimit))
}
db, err := database.DB()
@@ -278,12 +278,13 @@ func PostTransaction(requestBody []byte) error {
_, err = client.SendRawTransaction(tx, true)
if err != nil {
if rpcErr, ok := err.(*btcjson.RPCError); ok && rpcErr.Code == btcjson.ErrRPCVerify {
return httpserverutils.NewHandlerError(http.StatusInternalServerError, err)
switch err := errors.Cause(err).(type) {
case *btcjson.RPCError:
return httpserverutils.NewHandlerError(http.StatusUnprocessableEntity, err)
default:
return err
}
return err
}
return nil
}

View File

@@ -4,6 +4,6 @@ import "github.com/daglabs/btcd/util/panics"
import "github.com/daglabs/btcd/apiserver/logger"
var (
log = logger.BackendLog.Logger("DTBS")
log = logger.Logger("DTBS")
spawn = panics.GoroutineWrapperFunc(log, logger.BackendLog)
)

View File

@@ -71,7 +71,7 @@ func Connect() error {
User: cfg.RPCUser,
Pass: cfg.RPCPassword,
DisableTLS: cfg.DisableTLS,
RequestTimeout: time.Second * 5,
RequestTimeout: time.Second * 60,
}
if !cfg.DisableTLS {

16
apiserver/jsonrpc/log.go Normal file
View File

@@ -0,0 +1,16 @@
package jsonrpc
import (
"github.com/daglabs/btcd/apiserver/logger"
"github.com/daglabs/btcd/rpcclient"
"github.com/daglabs/btcd/util/panics"
)
var (
log = logger.BackendLog.Logger("RPCC")
spawn = panics.GoroutineWrapperFunc(log, logger.BackendLog)
)
func init() {
rpcclient.UseLogger(log, logger.BackendLog)
}

View File

@@ -6,6 +6,6 @@ import (
)
var (
log = logger.BackendLog.Logger("APIS")
log = logger.Logger("APIS")
spawn = panics.GoroutineWrapperFunc(log, logger.BackendLog)
)

View File

@@ -3,11 +3,13 @@ package logger
import (
"fmt"
"github.com/daglabs/btcd/logs"
"github.com/pkg/errors"
"os"
)
// BackendLog is the logging backend used to create all subsystem loggers.
var BackendLog = logs.NewBackend()
var loggers []logs.Logger
// InitLog attaches log file and error log file to the backend log.
func InitLog(logFile, errLogFile string) {
@@ -22,3 +24,24 @@ func InitLog(logFile, errLogFile string) {
os.Exit(1)
}
}
// Logger returns a new logger for a particular subsystem that writes to
// BackendLog, and add it to a slice so it will be possible to access it
// later and change its log level
func Logger(subsystemTag string) logs.Logger {
logger := BackendLog.Logger(subsystemTag)
loggers = append(loggers, logger)
return logger
}
// SetLogLevels sets the logging level for all of the subsystems in the API server.
func SetLogLevels(level string) error {
lvl, ok := logs.LevelFromString(level)
if !ok {
return errors.Errorf("Invalid log level %s", level)
}
for _, logger := range loggers {
logger.SetLevel(lvl)
}
return nil
}

View File

@@ -21,7 +21,7 @@ import (
func main() {
defer panics.HandlePanic(log, logger.BackendLog, nil)
cfg, err := config.Parse()
err := config.Parse()
if err != nil {
errString := fmt.Sprintf("Error parsing command-line arguments: %s", err)
_, fErr := fmt.Fprintf(os.Stderr, errString)
@@ -31,7 +31,7 @@ func main() {
return
}
if cfg.Migrate {
if config.ActiveConfig().Migrate {
err := database.Migrate()
if err != nil {
panic(errors.Errorf("Error migrating database: %s", err))

View File

@@ -4,6 +4,6 @@ import "github.com/daglabs/btcd/util/panics"
import "github.com/daglabs/btcd/apiserver/logger"
var (
log = logger.BackendLog.Logger("MQTT")
log = logger.Logger("MQTT")
spawn = panics.GoroutineWrapperFunc(log, logger.BackendLog)
)

View File

@@ -96,7 +96,7 @@ func PublishAcceptedTransactionsNotifications(addedChainBlocks []*rpcclient.Chai
// PublishUnacceptedTransactionsNotifications publishes notification for each unaccepted transaction of the given chain-block
func PublishUnacceptedTransactionsNotifications(removedChainHashes []*daghash.Hash) error {
for _, removedHash := range removedChainHashes {
transactionIDs, err := controllers.GetAcceptedTransactionIDsByBlockHashHandler(removedHash)
transactionIDs, err := controllers.GetAcceptedTransactionIDsByBlockHashHandler(removedHash.String())
if err != nil {
return err
}

View File

@@ -4,6 +4,6 @@ import "github.com/daglabs/btcd/util/panics"
import "github.com/daglabs/btcd/apiserver/logger"
var (
log = logger.BackendLog.Logger("REST")
log = logger.Logger("REST")
spawn = panics.GoroutineWrapperFunc(log, logger.BackendLog)
)

View File

@@ -86,7 +86,11 @@ func convertQueryParamToInt(queryParams map[string]string, param string, default
if _, ok := queryParams[param]; ok {
intValue, err := strconv.Atoi(queryParams[param])
if err != nil {
return 0, httpserverutils.NewHandlerError(http.StatusUnprocessableEntity, errors.Wrap(err, fmt.Sprintf("Couldn't parse the '%s' query parameter", param)))
errorMessage := fmt.Sprintf("Couldn't parse the '%s' query parameter", param)
return 0, httpserverutils.NewHandlerErrorWithCustomClientMessage(
http.StatusUnprocessableEntity,
errors.Wrap(err, errorMessage),
errorMessage)
}
return intValue, nil
}
@@ -159,7 +163,8 @@ func getBlocksHandler(_ *httpserverutils.ServerContext, _ *http.Request, _ map[s
order := defaultGetBlocksOrder
if orderParamValue, ok := queryParams[queryParamOrder]; ok {
if orderParamValue != controllers.OrderAscending && orderParamValue != controllers.OrderDescending {
return nil, httpserverutils.NewHandlerError(http.StatusUnprocessableEntity, errors.Errorf("'%s' is not a valid value for the '%s' query parameter", orderParamValue, queryParamLimit))
return nil, httpserverutils.NewHandlerError(http.StatusUnprocessableEntity, errors.Errorf(
"Couldn't parse the '%s' query parameter", queryParamOrder))
}
order = orderParamValue
}

View File

@@ -24,24 +24,8 @@ import (
"github.com/pkg/errors"
)
var (
// pendingBlockAddedMsgs holds blockAddedMsgs in order of arrival
pendingBlockAddedMsgs []*jsonrpc.BlockAddedMsg
// pendingChainChangedMsgs holds chainChangedMsgs in order of arrival
pendingChainChangedMsgs []*jsonrpc.ChainChangedMsg
// missingBlocks is a map between missing block hashes and the time
// they were first found to be missing. If a block is still missing
// after blockMissingTimeout then it gets re-requested from the node.
missingBlocks = make(map[string]time.Time)
)
const (
// blockMissingTimeout is the amount of time after which a missing block
// gets re-requested from the node.
blockMissingTimeout = time.Second * 10
)
// pendingChainChangedMsgs holds chainChangedMsgs in order of arrival
var pendingChainChangedMsgs []*jsonrpc.ChainChangedMsg
// startSync keeps the node and the API server in sync. On start, it downloads
// all data that's missing from the API server, and once it's done it keeps
@@ -57,7 +41,6 @@ func startSync(doneChan chan struct{}) error {
if err != nil {
return err
}
log.Infof("Finished syncing past data")
// Keep the node and the API server in sync
return sync(client, doneChan)
@@ -66,14 +49,17 @@ func startSync(doneChan chan struct{}) error {
// fetchInitialData downloads all data that's currently missing from
// the database.
func fetchInitialData(client *jsonrpc.Client) error {
log.Infof("Syncing past blocks")
err := syncBlocks(client)
if err != nil {
return err
}
log.Infof("Syncing past selected parent chain")
err = syncSelectedParentChain(client)
if err != nil {
return err
}
log.Infof("Finished syncing past data")
return nil
}
@@ -83,8 +69,7 @@ func sync(client *jsonrpc.Client, doneChan chan struct{}) error {
for {
select {
case blockAdded := <-client.OnBlockAdded:
enqueueBlockAddedMsg(blockAdded)
err := processBlockAddedMsgs(client)
err := handleBlockAddedMsg(client, blockAdded)
if err != nil {
return err
}
@@ -101,6 +86,13 @@ func sync(client *jsonrpc.Client, doneChan chan struct{}) error {
}
}
func stringPointerToString(str *string) string {
if str == nil {
return "<nil>"
}
return *str
}
// syncBlocks attempts to download all DAG blocks starting with
// the bluest block, and then inserts them into the database.
func syncBlocks(client *jsonrpc.Client) error {
@@ -115,6 +107,7 @@ func syncBlocks(client *jsonrpc.Client) error {
var rawBlocks []string
var verboseBlocks []btcjson.GetBlockVerboseResult
for {
log.Debugf("Calling getBlocks with start hash %v", stringPointerToString(startHash))
blocksResult, err := client.GetBlocks(true, true, startHash)
if err != nil {
return err
@@ -142,6 +135,7 @@ func syncSelectedParentChain(client *jsonrpc.Client) error {
}
for {
log.Debugf("Calling getChainFromBlock with start hash %s", stringPointerToString(startHash))
chainFromBlockResult, err := client.GetChainFromBlock(false, startHash)
if err != nil {
return err
@@ -190,23 +184,27 @@ func findHashOfBluestBlock(mustBeChainBlock bool) (*string, error) {
// fetchBlock downloads the serialized block and raw block data of
// the block with hash blockHash.
func fetchBlock(client *jsonrpc.Client, blockHash *daghash.Hash) (
rawBlock string, verboseBlock *btcjson.GetBlockVerboseResult, err error) {
*rawAndVerboseBlock, error) {
log.Debugf("Getting block %s from the RPC server", blockHash)
msgBlock, err := client.GetBlock(blockHash, nil)
if err != nil {
return "", nil, err
return nil, err
}
writer := bytes.NewBuffer(make([]byte, 0, msgBlock.SerializeSize()))
err = msgBlock.Serialize(writer)
if err != nil {
return "", nil, err
return nil, err
}
rawBlock = hex.EncodeToString(writer.Bytes())
rawBlock := hex.EncodeToString(writer.Bytes())
verboseBlock, err = client.GetBlockVerboseTx(blockHash, nil)
verboseBlock, err := client.GetBlockVerboseTx(blockHash, nil)
if err != nil {
return "", nil, err
return nil, err
}
return rawBlock, verboseBlock, nil
return &rawAndVerboseBlock{
rawBlock: rawBlock,
verboseBlock: verboseBlock,
}, nil
}
// addBlocks inserts data in the given rawBlocks and verboseBlocks pairwise
@@ -308,12 +306,6 @@ func addBlock(client *jsonrpc.Client, rawBlock string, verboseBlock btcjson.GetB
}
dbTx.Commit()
// If the block was previously missing, remove it from
// the missing blocks collection.
if _, ok := missingBlocks[verboseBlock.Hash]; ok {
delete(missingBlocks, verboseBlock.Hash)
}
return nil
}
@@ -926,126 +918,106 @@ func updateAddedChainBlocks(dbTx *gorm.DB, addedBlock *btcjson.ChainBlock) error
return nil
}
// enqueueBlockAddedMsg enqueues onBlockAdded messages to be handled later
func enqueueBlockAddedMsg(blockAdded *jsonrpc.BlockAddedMsg) {
pendingBlockAddedMsgs = append(pendingBlockAddedMsgs, blockAdded)
type rawAndVerboseBlock struct {
rawBlock string
verboseBlock *btcjson.GetBlockVerboseResult
}
// processBlockAddedMsgs processes all pending onBlockAdded messages.
// Messages that cannot yet be processed are re-enqueued.
func processBlockAddedMsgs(client *jsonrpc.Client) error {
var unprocessedBlockAddedMsgs []*jsonrpc.BlockAddedMsg
for _, blockAdded := range pendingBlockAddedMsgs {
missingHashes, err := missingParentHashes(blockAdded)
if err != nil {
return errors.Errorf("Could not resolve missing parents: %s", err)
}
for _, missingHash := range missingHashes {
err := handleMissingParent(client, missingHash)
if err != nil {
log.Errorf("Failed to handle missing parent block %s: %s",
missingHash, err)
}
}
if len(missingHashes) > 0 {
unprocessedBlockAddedMsgs = append(unprocessedBlockAddedMsgs, blockAdded)
continue
}
func (r *rawAndVerboseBlock) String() string {
return r.verboseBlock.Hash
}
handleBlockAddedMsg(client, blockAdded)
func handleBlockAddedMsg(client *jsonrpc.Client, blockAdded *jsonrpc.BlockAddedMsg) error {
blocks, err := fetchBlockAndMissingAncestors(client, blockAdded.Header.BlockHash())
if err != nil {
return err
}
for _, block := range blocks {
err = addBlock(client, block.rawBlock, *block.verboseBlock)
if err != nil {
return err
}
log.Infof("Added block %s", block.verboseBlock.Hash)
}
pendingBlockAddedMsgs = unprocessedBlockAddedMsgs
return nil
}
func handleBlockAddedMsg(client *jsonrpc.Client, blockAdded *jsonrpc.BlockAddedMsg) {
hash := blockAdded.Header.BlockHash()
log.Debugf("Getting block %s from the RPC server", hash)
rawBlock, verboseBlock, err := fetchBlock(client, hash)
if err != nil {
log.Warnf("Could not fetch block %s: %s", hash, err)
return
}
err = addBlock(client, rawBlock, *verboseBlock)
if err != nil {
log.Errorf("Could not insert block %s: %s", hash, err)
return
}
log.Infof("Added block %s", hash)
}
func missingParentHashes(blockAdded *jsonrpc.BlockAddedMsg) ([]string, error) {
db, err := database.DB()
func fetchBlockAndMissingAncestors(client *jsonrpc.Client, blockHash *daghash.Hash) ([]*rawAndVerboseBlock, error) {
block, err := fetchBlock(client, blockHash)
if err != nil {
return nil, err
}
pendingBlocks := []*rawAndVerboseBlock{block}
blocksToAdd := make([]*rawAndVerboseBlock, 0)
blocksToAddSet := make(map[string]struct{})
for len(pendingBlocks) > 0 {
var currentBlock *rawAndVerboseBlock
currentBlock, pendingBlocks = pendingBlocks[0], pendingBlocks[1:]
missingHashes, err := missingParentHashes(currentBlock.verboseBlock.ParentHashes)
if err != nil {
return nil, err
}
blocksToPrependToPending := make([]*rawAndVerboseBlock, 0, len(missingHashes))
for _, missingHash := range missingHashes {
if _, ok := blocksToAddSet[missingHash]; ok {
continue
}
hash, err := daghash.NewHashFromStr(missingHash)
if err != nil {
return nil, err
}
block, err := fetchBlock(client, hash)
if err != nil {
return nil, err
}
blocksToPrependToPending = append(blocksToPrependToPending, block)
}
if len(blocksToPrependToPending) == 0 {
blocksToAddSet[currentBlock.verboseBlock.Hash] = struct{}{}
blocksToAdd = append(blocksToAdd, currentBlock)
continue
}
log.Debugf("Found %s missing parents for block %s and fetched them", blocksToPrependToPending, currentBlock)
blocksToPrependToPending = append(blocksToPrependToPending, currentBlock)
pendingBlocks = append(blocksToPrependToPending, pendingBlocks...)
}
return blocksToAdd, nil
}
// Collect all referenced parent hashes
hashesIn := make([]string, 0, len(blockAdded.Header.ParentHashes))
for _, hash := range blockAdded.Header.ParentHashes {
hashesIn = append(hashesIn, hash.String())
func missingParentHashes(parentHashes []string) ([]string, error) {
db, err := database.DB()
if err != nil {
return nil, err
}
// Make sure that all the parent hashes exist in the database
var dbParentBlocks []dbmodels.Block
dbResult := db.
Model(&dbmodels.Block{}).
Where("block_hash in (?)", hashesIn).
Where("block_hash in (?)", parentHashes).
Find(&dbParentBlocks)
dbErrors := dbResult.GetErrors()
if httpserverutils.HasDBError(dbErrors) {
return nil, httpserverutils.NewErrorFromDBErrors("failed to find parent blocks: ", dbErrors)
}
if len(hashesIn) != len(dbParentBlocks) {
if len(parentHashes) != len(dbParentBlocks) {
// Some parent hashes are missing. Collect and return them
var missingParentHashes []string
var missingHashes []string
outerLoop:
for _, hash := range hashesIn {
for _, hash := range parentHashes {
for _, dbParentBlock := range dbParentBlocks {
if dbParentBlock.BlockHash == hash {
continue outerLoop
}
}
missingParentHashes = append(missingParentHashes, hash)
missingHashes = append(missingHashes, hash)
}
return missingParentHashes, nil
return missingHashes, nil
}
return nil, nil
}
// handleMissingParent handles missing parent block hashes as follows:
// a. If it's the first time we've encountered this block hash, add it to
// the missing blocks collection with time = now
// b. Otherwise, if time + blockMissingTimeout < now (that is to say,
// blockMissingTimeout had elapsed) then request the block from the
// node
func handleMissingParent(client *jsonrpc.Client, missingParentHash string) error {
firstMissingTime, ok := missingBlocks[missingParentHash]
if !ok {
log.Infof("Parent block %s is missing", missingParentHash)
missingBlocks[missingParentHash] = time.Now()
return nil
}
if firstMissingTime.Add(blockMissingTimeout).Before(time.Now()) {
hash, err := daghash.NewHashFromStr(missingParentHash)
if err != nil {
return errors.Errorf("Could not create hash: %s", err)
}
rawBlock, verboseBlock, err := fetchBlock(client, hash)
if err != nil {
return errors.Errorf("Could not fetch block %s: %s", hash, err)
}
err = addBlock(client, rawBlock, *verboseBlock)
if err != nil {
return errors.Errorf("Could not insert block %s: %s", hash, err)
}
log.Infof("Parent block %s was fetched after missing for over %s",
missingParentHash, blockMissingTimeout)
}
return nil
}
// enqueueChainChangedMsg enqueues onChainChanged messages to be handled later
func enqueueChainChangedMsg(chainChanged *jsonrpc.ChainChangedMsg) {
pendingChainChangedMsgs = append(pendingChainChangedMsgs, chainChanged)

View File

@@ -195,7 +195,7 @@ func (dag *BlockDAG) IsKnownOrphan(hash *daghash.Hash) bool {
// GetOrphanMissingAncestorHashes returns all of the missing parents in the orphan's sub-DAG
//
// This function is safe for concurrent access.
func (dag *BlockDAG) GetOrphanMissingAncestorHashes(hash *daghash.Hash) ([]*daghash.Hash, error) {
func (dag *BlockDAG) GetOrphanMissingAncestorHashes(orphanHash *daghash.Hash) ([]*daghash.Hash, error) {
// Protect concurrent access. Using a read lock only so multiple
// readers can query without blocking each other.
dag.orphanLock.RLock()
@@ -204,7 +204,7 @@ func (dag *BlockDAG) GetOrphanMissingAncestorHashes(hash *daghash.Hash) ([]*dagh
missingAncestorsHashes := make([]*daghash.Hash, 0)
visited := make(map[daghash.Hash]bool)
queue := []*daghash.Hash{hash}
queue := []*daghash.Hash{orphanHash}
for len(queue) > 0 {
var current *daghash.Hash
current, queue = queue[0], queue[1:]
@@ -216,7 +216,7 @@ func (dag *BlockDAG) GetOrphanMissingAncestorHashes(hash *daghash.Hash) ([]*dagh
queue = append(queue, parentHash)
}
} else {
if !dag.BlockExists(current) {
if !dag.BlockExists(current) && current != orphanHash {
missingAncestorsHashes = append(missingAncestorsHashes, current)
}
}

View File

@@ -7,6 +7,7 @@ package main
import (
"fmt"
"github.com/daglabs/btcd/config"
"github.com/pkg/errors"
"io/ioutil"
"net"
"os"
@@ -198,13 +199,16 @@ func loadConfig() (*ConfigFlags, []string, error) {
os.Exit(0)
}
if _, err := os.Stat(preCfg.ConfigFile); os.IsNotExist(err) {
// Use config file for RPC server to create default btcctl config
serverConfigPath := filepath.Join(btcdHomeDir, "btcd.conf")
err := createDefaultConfigFile(preCfg.ConfigFile, serverConfigPath)
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating a default config file: %s\n", err)
// If no rpc user and password were configured, create
// a btcctl default config file based on the rpc login
// details written in the RPC server configuration file
if preCfg.RPCUser == "" && preCfg.RPCPassword == "" {
if _, err := os.Stat(preCfg.ConfigFile); os.IsNotExist(err) {
serverConfigPath := filepath.Join(btcdHomeDir, "btcd.conf")
err := createDefaultConfigFile(preCfg.ConfigFile, serverConfigPath)
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating a default config file: %s\n", err)
}
}
}
@@ -250,6 +254,9 @@ func loadConfig() (*ConfigFlags, []string, error) {
func createDefaultConfigFile(destinationPath, serverConfigPath string) error {
// Read the RPC server config
serverConfigFile, err := os.Open(serverConfigPath)
if os.IsNotExist(err) {
return errors.Errorf("the RPC server configuration file could not be found at %s", serverConfigPath)
}
if err != nil {
return err
}

View File

@@ -32,15 +32,16 @@ import (
)
const (
defaultConfigFilename = "btcd.conf"
defaultDataDirname = "data"
defaultLogLevel = "info"
defaultLogDirname = "logs"
defaultLogFilename = "btcd.log"
defaultErrLogFilename = "btcd_err.log"
defaultMaxPeers = 125
defaultBanDuration = time.Hour * 24
defaultBanThreshold = 100
defaultConfigFilename = "btcd.conf"
defaultDataDirname = "data"
defaultLogLevel = "info"
defaultLogDirname = "logs"
defaultLogFilename = "btcd.log"
defaultErrLogFilename = "btcd_err.log"
defaultTargetOutboundPeers = 8
defaultMaxInboundPeers = 117
defaultBanDuration = time.Hour * 24
defaultBanThreshold = 100
//DefaultConnectTimeout is the default connection timeout when dialing
DefaultConnectTimeout = time.Second * 30
defaultMaxRPCClients = 10
@@ -101,7 +102,8 @@ type Flags struct {
ConnectPeers []string `long:"connect" description:"Connect only to the specified peers at startup"`
DisableListen bool `long:"nolisten" description:"Disable listening for incoming connections -- NOTE: Listening is automatically disabled if the --connect or --proxy options are used without also specifying listen interfaces via --listen"`
Listeners []string `long:"listen" description:"Add an interface/port to listen for connections (default all interfaces port: 8333, testnet: 18333)"`
MaxPeers int `long:"maxpeers" description:"Max number of inbound and outbound peers"`
TargetOutboundPeers int `long:"outpeers" description:"Target number of outbound peers"`
MaxInboundPeers int `long:"maxinpeers" description:"Max number of inbound peers"`
DisableBanning bool `long:"nobanning" description:"Disable banning of misbehaving peers"`
BanDuration time.Duration `long:"banduration" description:"How long to ban misbehaving peers. Valid time units are {s, m, h}. Minimum 1 second"`
BanThreshold uint32 `long:"banthreshold" description:"Maximum allowed ban score before disconnecting and banning misbehaving peers."`
@@ -296,7 +298,8 @@ func loadConfig() (*Config, []string, error) {
cfgFlags := Flags{
ConfigFile: defaultConfigFile,
DebugLevel: defaultLogLevel,
MaxPeers: defaultMaxPeers,
TargetOutboundPeers: defaultTargetOutboundPeers,
MaxInboundPeers: defaultMaxInboundPeers,
BanDuration: defaultBanDuration,
BanThreshold: defaultBanThreshold,
RPCMaxClients: defaultMaxRPCClients,
@@ -417,6 +420,24 @@ func loadConfig() (*Config, []string, error) {
return nil, nil, err
}
if !activeConfig.DisableRPC {
if activeConfig.RPCUser == "" {
str := "%s: rpcuser cannot be empty"
err := errors.Errorf(str, funcName)
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err
}
if activeConfig.RPCPass == "" {
str := "%s: rpcpass cannot be empty"
err := errors.Errorf(str, funcName)
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err
}
}
err = activeConfig.ResolveNetwork(parser)
if err != nil {
return nil, nil, err

View File

@@ -7,11 +7,12 @@ package connmgr
import (
nativeerrors "errors"
"fmt"
"github.com/pkg/errors"
"net"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
)
// maxFailedAttempts is the maximum number of successive failed connection
@@ -39,9 +40,9 @@ var (
//ErrDialNil is used to indicate that Dial cannot be nil in the configuration.
ErrDialNil = errors.New("Config: Dial cannot be nil")
// ErrMaxPeers is an error that is thrown when the max amount of peers had
// ErrMaxOutboundPeers is an error that is thrown when the max amount of peers had
// been reached.
ErrMaxPeers = errors.New("max peers reached")
ErrMaxOutboundPeers = errors.New("max outbound peers reached")
// ErrAlreadyConnected is an error that is thrown if the peer is already
// connected.

View File

@@ -7,12 +7,13 @@ package netsync
import (
"container/list"
"fmt"
"github.com/pkg/errors"
"net"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/daglabs/btcd/blockdag"
"github.com/daglabs/btcd/dagconfig"
"github.com/daglabs/btcd/database"
@@ -1290,16 +1291,13 @@ func (sm *SyncManager) handleBlockDAGNotification(notification *blockdag.Notific
}
})
// Don't relay if we are not current or the block was just now unorphaned.
// Other peers that are current should already know about it
if !sm.current() || data.WasUnorphaned {
return
// Relay if we are current and the block was not just now unorphaned.
// Otherwise peers that are current should already know about it
if sm.current() && !data.WasUnorphaned {
iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
sm.peerNotifier.RelayInventory(iv, block.MsgBlock().Header)
}
// Generate the inventory vector and relay it.
iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
sm.peerNotifier.RelayInventory(iv, block.MsgBlock().Header)
for msg := range ch {
sm.peerNotifier.TransactionConfirmed(msg.Tx)
sm.peerNotifier.AnnounceNewTransactions(msg.AcceptedTxs)

View File

@@ -782,7 +782,11 @@ func newFutureError(err error) chan *response {
func receiveFuture(f chan *response) ([]byte, error) {
// Wait for a response on the returned channel.
r := <-f
return r.result, r.err
var err error
if r.err != nil {
err = errors.Wrap(r.err, "got error from response channel")
}
return r.result, err
}
// sendPost sends the passed request to the server by issuing an HTTP POST

View File

@@ -10,7 +10,6 @@ import (
"crypto/rand"
"encoding/binary"
"fmt"
"github.com/pkg/errors"
"math"
"net"
"runtime"
@@ -21,6 +20,8 @@ import (
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/daglabs/btcd/util/subnetworkid"
"github.com/daglabs/btcd/addrmgr"
@@ -52,9 +53,6 @@ const (
// required to be supported by outbound peers.
defaultRequiredServices = wire.SFNodeNetwork
// defaultTargetOutbound is the default number of outbound peers to target.
defaultTargetOutbound = 8
// connectionRetryInterval is the base amount of time to wait in between
// retries when connecting to persistent peers. It is adjusted by the
// number of retries such that there is a retry backoff.
@@ -190,7 +188,15 @@ type peerState struct {
// Count returns the count of all known peers.
func (ps *peerState) Count() int {
return len(ps.inboundPeers) + len(ps.outboundPeers) +
return ps.countInboundPeers() + ps.countOutboundPeers()
}
func (ps *peerState) countInboundPeers() int {
return len(ps.inboundPeers)
}
func (ps *peerState) countOutboundPeers() int {
return len(ps.outboundPeers) +
len(ps.persistentPeers)
}
@@ -276,12 +282,17 @@ type Server struct {
relayInv chan relayMsg
broadcast chan broadcastMsg
wg sync.WaitGroup
quit chan struct{}
nat serverutils.NAT
db database.DB
TimeSource blockdag.MedianTimeSource
services wire.ServiceFlag
// We add to quitWaitGroup before every instance in which we wait for
// the quit channel so that all those instances finish before we shut
// down the managers (connManager, addrManager, etc),
quitWaitGroup sync.WaitGroup
quit chan struct{}
// The following fields are used for optional indexes. They will be nil
// if the associated index is not enabled. These fields are set during
// initial creation of the server and never changed afterwards, so they
@@ -687,12 +698,10 @@ func (s *Server) handleAddPeerMsg(state *peerState, sp *Peer) bool {
// TODO: Check for max peers from a single IP.
// Limit max number of total peers.
if state.Count() >= config.ActiveConfig().MaxPeers {
srvrLog.Infof("Max peers reached [%d] - disconnecting peer %s",
config.ActiveConfig().MaxPeers, sp)
if sp.Inbound() && len(state.inboundPeers) >= config.ActiveConfig().MaxInboundPeers {
srvrLog.Infof("Max inbound peers reached [%d] - disconnecting peer %s",
config.ActiveConfig().MaxInboundPeers, sp)
sp.Disconnect()
// TODO: how to handle permanent peers here?
// they should be rescheduled.
return false
}
@@ -938,8 +947,8 @@ func (s *Server) handleQuery(state *peerState, querymsg interface{}) {
case ConnectNodeMsg:
// TODO: duplicate oneshots?
// Limit max number of total peers.
if state.Count() >= config.ActiveConfig().MaxPeers {
msg.Reply <- connmgr.ErrMaxPeers
if state.countOutboundPeers() >= config.ActiveConfig().TargetOutboundPeers {
msg.Reply <- connmgr.ErrMaxOutboundPeers
return
}
for _, peer := range state.persistentPeers {
@@ -1166,6 +1175,8 @@ func (s *Server) peerHandler() {
s.addrManager.Start()
s.SyncManager.Start()
s.quitWaitGroup.Add(1)
srvrLog.Tracef("Starting peer handler")
state := &peerState{
@@ -1231,6 +1242,7 @@ out:
sp.Disconnect()
return true
})
s.quitWaitGroup.Done()
break out
case opcMsg := <-s.newOutboundConnection:
@@ -1238,6 +1250,10 @@ out:
}
}
// Wait for all p2p server quit jobs to finish before stopping the
// various managers
s.quitWaitGroup.Wait()
s.connManager.Stop()
s.SyncManager.Stop()
s.addrManager.Stop()
@@ -1340,6 +1356,8 @@ func (s *Server) rebroadcastHandler() {
timer := time.NewTimer(5 * time.Minute)
pendingInvs := make(map[wire.InvVect]interface{})
s.quitWaitGroup.Add(1)
out:
for {
select {
@@ -1387,6 +1405,7 @@ cleanup:
break cleanup
}
}
s.quitWaitGroup.Done()
s.wg.Done()
}
@@ -1524,6 +1543,9 @@ func (s *Server) upnpUpdateThread() {
timer := time.NewTimer(0 * time.Second)
lport, _ := strconv.ParseInt(config.ActiveConfig().NetParams().DefaultPort, 10, 16)
first := true
s.quitWaitGroup.Add(1)
out:
for {
select {
@@ -1569,6 +1591,7 @@ out:
srvrLog.Debugf("successfully disestablished UPnP port mapping")
}
s.quitWaitGroup.Done()
s.wg.Done()
}
@@ -1599,18 +1622,20 @@ func NewServer(listenAddrs []string, db database.DB, dagParams *dagconfig.Params
}
}
maxPeers := config.ActiveConfig().TargetOutboundPeers + config.ActiveConfig().MaxInboundPeers
s := Server{
DAGParams: dagParams,
addrManager: amgr,
newPeers: make(chan *Peer, config.ActiveConfig().MaxPeers),
donePeers: make(chan *Peer, config.ActiveConfig().MaxPeers),
banPeers: make(chan *Peer, config.ActiveConfig().MaxPeers),
newPeers: make(chan *Peer, maxPeers),
donePeers: make(chan *Peer, maxPeers),
banPeers: make(chan *Peer, maxPeers),
Query: make(chan interface{}),
relayInv: make(chan relayMsg, config.ActiveConfig().MaxPeers),
broadcast: make(chan broadcastMsg, config.ActiveConfig().MaxPeers),
relayInv: make(chan relayMsg, maxPeers),
broadcast: make(chan broadcastMsg, maxPeers),
quit: make(chan struct{}),
modifyRebroadcastInv: make(chan interface{}),
newOutboundConnection: make(chan *outboundPeerConnectedMsg),
newOutboundConnection: make(chan *outboundPeerConnectedMsg, config.ActiveConfig().TargetOutboundPeers),
nat: nat,
db: db,
TimeSource: blockdag.NewMedianTime(),
@@ -1714,7 +1739,7 @@ func NewServer(listenAddrs []string, db database.DB, dagParams *dagconfig.Params
TxMemPool: s.TxMemPool,
ChainParams: s.DAGParams,
DisableCheckpoints: cfg.DisableCheckpoints,
MaxPeers: cfg.MaxPeers,
MaxPeers: maxPeers,
})
if err != nil {
return nil, err
@@ -1772,15 +1797,11 @@ func NewServer(listenAddrs []string, db database.DB, dagParams *dagconfig.Params
}
// Create a connection manager.
targetOutbound := defaultTargetOutbound
if config.ActiveConfig().MaxPeers < targetOutbound {
targetOutbound = config.ActiveConfig().MaxPeers
}
cmgr, err := connmgr.New(&connmgr.Config{
Listeners: listeners,
OnAccept: s.inboundPeerConnected,
RetryDuration: connectionRetryInterval,
TargetOutbound: uint32(targetOutbound),
TargetOutbound: uint32(config.ActiveConfig().TargetOutboundPeers),
Dial: serverutils.BTCDDial,
OnConnection: func(c *connmgr.ConnReq, conn net.Conn) {
s.newOutboundConnection <- &outboundPeerConnectedMsg{

View File

@@ -275,21 +275,8 @@ func buildGetBlockVerboseResult(s *Server, block *util.Block, isVerboseTx bool)
txns := block.Transactions()
rawTxns := make([]btcjson.TxRawResult, len(txns))
for i, tx := range txns {
var acceptingBlock *daghash.Hash
var confirmations *uint64
if s.cfg.TxIndex != nil {
acceptingBlock, err = s.cfg.TxIndex.BlockThatAcceptedTx(s.cfg.DAG, tx.ID())
if err != nil {
return nil, err
}
txConfirmations, err := txConfirmationsNoLock(s, tx.ID())
if err != nil {
return nil, err
}
confirmations = &txConfirmations
}
rawTxn, err := createTxRawResult(params, tx.MsgTx(), tx.ID().String(),
&blockHeader, hash.String(), acceptingBlock, confirmations, false)
&blockHeader, hash.String(), nil, nil, false)
if err != nil {
return nil, err
}

View File

@@ -1,9 +1,5 @@
package testutil
import (
"fmt"
)
// AreErrorsEqual returns whether errors have the same type
// and same error string from .Error().
func AreErrorsEqual(err1, err2 error) bool {
@@ -13,8 +9,5 @@ func AreErrorsEqual(err1, err2 error) bool {
if err1 == nil && err2 != nil {
return false
}
if fmt.Sprintf("%T", err1) != fmt.Sprintf("%T", err2) {
return false
}
return err1.Error() == err2.Error()
}

View File

@@ -191,7 +191,7 @@ func Decode(dst *Hash, src string) error {
var reversedHash Hash
_, err := hex.Decode(reversedHash[HashSize-hex.DecodedLen(len(srcBytes)):], srcBytes)
if err != nil {
return err
return errors.WithStack(err)
}
// Reverse copy from the temporary hash to destination. Because the

View File

@@ -7,6 +7,7 @@ package daghash
import (
"bytes"
"encoding/hex"
"github.com/daglabs/btcd/testutil"
"math/big"
"reflect"
"testing"
@@ -214,7 +215,7 @@ func TestNewHashFromStr(t *testing.T) {
t.Logf("Running %d tests", len(tests))
for i, test := range tests {
result, err := NewHashFromStr(test.in)
if err != test.err {
if !testutil.AreErrorsEqual(err, test.err) {
t.Errorf(unexpectedErrStr, i, err, test.err)
continue
} else if err != nil {