mirror of
https://github.com/kaspanet/kaspad.git
synced 2026-02-27 05:33:18 +00:00
Compare commits
16 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e68b242243 | ||
|
|
9cc2a7260b | ||
|
|
bcd73012de | ||
|
|
1fea2a9421 | ||
|
|
bb7d68deda | ||
|
|
3ab861227d | ||
|
|
8f0d98ef9b | ||
|
|
dbd8bf3d2c | ||
|
|
1b6b02e0d2 | ||
|
|
2402bae1ff | ||
|
|
3dcf8d88b8 | ||
|
|
dbf9c09a2e | ||
|
|
5e9fc2defc | ||
|
|
bdc3cbceaa | ||
|
|
a71528fefb | ||
|
|
6725742d2c |
@@ -30,6 +30,7 @@ func ActiveConfig() *Config {
|
||||
// Config defines the configuration options for the API server.
|
||||
type Config struct {
|
||||
LogDir string `long:"logdir" description:"Directory to log output."`
|
||||
DebugLevel string `short:"d" long:"debuglevel" description:"Set log level {trace, debug, info, warn, error, critical}"`
|
||||
RPCUser string `short:"u" long:"rpcuser" description:"RPC username"`
|
||||
RPCPassword string `short:"P" long:"rpcpass" default-mask:"-" description:"RPC password"`
|
||||
RPCServer string `short:"s" long:"rpcserver" description:"RPC server to connect to"`
|
||||
@@ -48,7 +49,7 @@ type Config struct {
|
||||
}
|
||||
|
||||
// Parse parses the CLI arguments and returns a config struct.
|
||||
func Parse() (*Config, error) {
|
||||
func Parse() error {
|
||||
activeConfig = &Config{
|
||||
LogDir: defaultLogDir,
|
||||
DBAddress: defaultDBAddress,
|
||||
@@ -57,42 +58,49 @@ func Parse() (*Config, error) {
|
||||
parser := flags.NewParser(activeConfig, flags.PrintErrors|flags.HelpFlag)
|
||||
_, err := parser.Parse()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
if !activeConfig.Migrate {
|
||||
if activeConfig.RPCUser == "" {
|
||||
return nil, errors.New("--rpcuser is required if --migrate flag is not used")
|
||||
return errors.New("--rpcuser is required if --migrate flag is not used")
|
||||
}
|
||||
if activeConfig.RPCPassword == "" {
|
||||
return nil, errors.New("--rpcpass is required if --migrate flag is not used")
|
||||
return errors.New("--rpcpass is required if --migrate flag is not used")
|
||||
}
|
||||
if activeConfig.RPCServer == "" {
|
||||
return nil, errors.New("--rpcserver is required if --migrate flag is not used")
|
||||
return errors.New("--rpcserver is required if --migrate flag is not used")
|
||||
}
|
||||
}
|
||||
|
||||
if activeConfig.RPCCert == "" && !activeConfig.DisableTLS {
|
||||
return nil, errors.New("--notls has to be disabled if --cert is used")
|
||||
return errors.New("--notls has to be disabled if --cert is used")
|
||||
}
|
||||
|
||||
if activeConfig.RPCCert != "" && activeConfig.DisableTLS {
|
||||
return nil, errors.New("--cert should be omitted if --notls is used")
|
||||
return errors.New("--cert should be omitted if --notls is used")
|
||||
}
|
||||
|
||||
if (activeConfig.MQTTBrokerAddress != "" || activeConfig.MQTTUser != "" || activeConfig.MQTTPassword != "") &&
|
||||
(activeConfig.MQTTBrokerAddress == "" || activeConfig.MQTTUser == "" || activeConfig.MQTTPassword == "") {
|
||||
return nil, errors.New("--mqttaddress, --mqttuser, and --mqttpass must be passed all together")
|
||||
return errors.New("--mqttaddress, --mqttuser, and --mqttpass must be passed all together")
|
||||
}
|
||||
|
||||
err = activeConfig.ResolveNetwork(parser)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
logFile := filepath.Join(activeConfig.LogDir, defaultLogFilename)
|
||||
errLogFile := filepath.Join(activeConfig.LogDir, defaultErrLogFilename)
|
||||
logger.InitLog(logFile, errLogFile)
|
||||
|
||||
return activeConfig, nil
|
||||
if activeConfig.DebugLevel != "" {
|
||||
err := logger.SetLogLevels(activeConfig.DebugLevel)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -54,8 +54,9 @@ func GetBlockByHashHandler(blockHash string) (interface{}, error) {
|
||||
|
||||
// GetBlocksHandler searches for all blocks
|
||||
func GetBlocksHandler(order string, skip uint64, limit uint64) (interface{}, error) {
|
||||
if limit > maxGetBlocksLimit {
|
||||
return nil, httpserverutils.NewHandlerError(http.StatusUnprocessableEntity, errors.Errorf("The maximum allowed value for the limit is %d", maxGetBlocksLimit))
|
||||
if limit < 1 || limit > maxGetBlocksLimit {
|
||||
return nil, httpserverutils.NewHandlerError(http.StatusBadRequest,
|
||||
errors.Errorf("Limit higher than %d or lower than 1 was requested.", maxGetTransactionsLimit))
|
||||
}
|
||||
blocks := []*dbmodels.Block{}
|
||||
db, err := database.DB()
|
||||
@@ -82,7 +83,7 @@ func GetBlocksHandler(order string, skip uint64, limit uint64) (interface{}, err
|
||||
}
|
||||
|
||||
// GetAcceptedTransactionIDsByBlockHashHandler returns an array of transaction IDs for a given block hash
|
||||
func GetAcceptedTransactionIDsByBlockHashHandler(blockHash *daghash.Hash) ([]string, error) {
|
||||
func GetAcceptedTransactionIDsByBlockHashHandler(blockHash string) ([]string, error) {
|
||||
db, err := database.DB()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -79,9 +79,9 @@ func GetTransactionByHashHandler(txHash string) (interface{}, error) {
|
||||
// GetTransactionsByAddressHandler searches for all transactions
|
||||
// where the given address is either an input or an output.
|
||||
func GetTransactionsByAddressHandler(address string, skip uint64, limit uint64) (interface{}, error) {
|
||||
if limit > maxGetTransactionsLimit {
|
||||
return nil, httpserverutils.NewHandlerError(http.StatusUnprocessableEntity,
|
||||
errors.Errorf("The maximum allowed value for the limit is %d", maxGetTransactionsLimit))
|
||||
if limit < 1 || limit > maxGetTransactionsLimit {
|
||||
return nil, httpserverutils.NewHandlerError(http.StatusBadRequest,
|
||||
errors.Errorf("Limit higher than %d or lower than 1 was requested.", maxGetTransactionsLimit))
|
||||
}
|
||||
|
||||
db, err := database.DB()
|
||||
@@ -278,12 +278,13 @@ func PostTransaction(requestBody []byte) error {
|
||||
|
||||
_, err = client.SendRawTransaction(tx, true)
|
||||
if err != nil {
|
||||
if rpcErr, ok := err.(*btcjson.RPCError); ok && rpcErr.Code == btcjson.ErrRPCVerify {
|
||||
return httpserverutils.NewHandlerError(http.StatusInternalServerError, err)
|
||||
switch err := errors.Cause(err).(type) {
|
||||
case *btcjson.RPCError:
|
||||
return httpserverutils.NewHandlerError(http.StatusUnprocessableEntity, err)
|
||||
default:
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,6 @@ import "github.com/daglabs/btcd/util/panics"
|
||||
import "github.com/daglabs/btcd/apiserver/logger"
|
||||
|
||||
var (
|
||||
log = logger.BackendLog.Logger("DTBS")
|
||||
log = logger.Logger("DTBS")
|
||||
spawn = panics.GoroutineWrapperFunc(log, logger.BackendLog)
|
||||
)
|
||||
|
||||
@@ -71,7 +71,7 @@ func Connect() error {
|
||||
User: cfg.RPCUser,
|
||||
Pass: cfg.RPCPassword,
|
||||
DisableTLS: cfg.DisableTLS,
|
||||
RequestTimeout: time.Second * 5,
|
||||
RequestTimeout: time.Second * 60,
|
||||
}
|
||||
|
||||
if !cfg.DisableTLS {
|
||||
|
||||
16
apiserver/jsonrpc/log.go
Normal file
16
apiserver/jsonrpc/log.go
Normal file
@@ -0,0 +1,16 @@
|
||||
package jsonrpc
|
||||
|
||||
import (
|
||||
"github.com/daglabs/btcd/apiserver/logger"
|
||||
"github.com/daglabs/btcd/rpcclient"
|
||||
"github.com/daglabs/btcd/util/panics"
|
||||
)
|
||||
|
||||
var (
|
||||
log = logger.BackendLog.Logger("RPCC")
|
||||
spawn = panics.GoroutineWrapperFunc(log, logger.BackendLog)
|
||||
)
|
||||
|
||||
func init() {
|
||||
rpcclient.UseLogger(log, logger.BackendLog)
|
||||
}
|
||||
@@ -6,6 +6,6 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
log = logger.BackendLog.Logger("APIS")
|
||||
log = logger.Logger("APIS")
|
||||
spawn = panics.GoroutineWrapperFunc(log, logger.BackendLog)
|
||||
)
|
||||
|
||||
@@ -3,11 +3,13 @@ package logger
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/daglabs/btcd/logs"
|
||||
"github.com/pkg/errors"
|
||||
"os"
|
||||
)
|
||||
|
||||
// BackendLog is the logging backend used to create all subsystem loggers.
|
||||
var BackendLog = logs.NewBackend()
|
||||
var loggers []logs.Logger
|
||||
|
||||
// InitLog attaches log file and error log file to the backend log.
|
||||
func InitLog(logFile, errLogFile string) {
|
||||
@@ -22,3 +24,24 @@ func InitLog(logFile, errLogFile string) {
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
// Logger returns a new logger for a particular subsystem that writes to
|
||||
// BackendLog, and add it to a slice so it will be possible to access it
|
||||
// later and change its log level
|
||||
func Logger(subsystemTag string) logs.Logger {
|
||||
logger := BackendLog.Logger(subsystemTag)
|
||||
loggers = append(loggers, logger)
|
||||
return logger
|
||||
}
|
||||
|
||||
// SetLogLevels sets the logging level for all of the subsystems in the API server.
|
||||
func SetLogLevels(level string) error {
|
||||
lvl, ok := logs.LevelFromString(level)
|
||||
if !ok {
|
||||
return errors.Errorf("Invalid log level %s", level)
|
||||
}
|
||||
for _, logger := range loggers {
|
||||
logger.SetLevel(lvl)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -21,7 +21,7 @@ import (
|
||||
func main() {
|
||||
defer panics.HandlePanic(log, logger.BackendLog, nil)
|
||||
|
||||
cfg, err := config.Parse()
|
||||
err := config.Parse()
|
||||
if err != nil {
|
||||
errString := fmt.Sprintf("Error parsing command-line arguments: %s", err)
|
||||
_, fErr := fmt.Fprintf(os.Stderr, errString)
|
||||
@@ -31,7 +31,7 @@ func main() {
|
||||
return
|
||||
}
|
||||
|
||||
if cfg.Migrate {
|
||||
if config.ActiveConfig().Migrate {
|
||||
err := database.Migrate()
|
||||
if err != nil {
|
||||
panic(errors.Errorf("Error migrating database: %s", err))
|
||||
|
||||
@@ -4,6 +4,6 @@ import "github.com/daglabs/btcd/util/panics"
|
||||
import "github.com/daglabs/btcd/apiserver/logger"
|
||||
|
||||
var (
|
||||
log = logger.BackendLog.Logger("MQTT")
|
||||
log = logger.Logger("MQTT")
|
||||
spawn = panics.GoroutineWrapperFunc(log, logger.BackendLog)
|
||||
)
|
||||
|
||||
@@ -96,7 +96,7 @@ func PublishAcceptedTransactionsNotifications(addedChainBlocks []*rpcclient.Chai
|
||||
// PublishUnacceptedTransactionsNotifications publishes notification for each unaccepted transaction of the given chain-block
|
||||
func PublishUnacceptedTransactionsNotifications(removedChainHashes []*daghash.Hash) error {
|
||||
for _, removedHash := range removedChainHashes {
|
||||
transactionIDs, err := controllers.GetAcceptedTransactionIDsByBlockHashHandler(removedHash)
|
||||
transactionIDs, err := controllers.GetAcceptedTransactionIDsByBlockHashHandler(removedHash.String())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -4,6 +4,6 @@ import "github.com/daglabs/btcd/util/panics"
|
||||
import "github.com/daglabs/btcd/apiserver/logger"
|
||||
|
||||
var (
|
||||
log = logger.BackendLog.Logger("REST")
|
||||
log = logger.Logger("REST")
|
||||
spawn = panics.GoroutineWrapperFunc(log, logger.BackendLog)
|
||||
)
|
||||
|
||||
@@ -86,7 +86,11 @@ func convertQueryParamToInt(queryParams map[string]string, param string, default
|
||||
if _, ok := queryParams[param]; ok {
|
||||
intValue, err := strconv.Atoi(queryParams[param])
|
||||
if err != nil {
|
||||
return 0, httpserverutils.NewHandlerError(http.StatusUnprocessableEntity, errors.Wrap(err, fmt.Sprintf("Couldn't parse the '%s' query parameter", param)))
|
||||
errorMessage := fmt.Sprintf("Couldn't parse the '%s' query parameter", param)
|
||||
return 0, httpserverutils.NewHandlerErrorWithCustomClientMessage(
|
||||
http.StatusUnprocessableEntity,
|
||||
errors.Wrap(err, errorMessage),
|
||||
errorMessage)
|
||||
}
|
||||
return intValue, nil
|
||||
}
|
||||
@@ -159,7 +163,8 @@ func getBlocksHandler(_ *httpserverutils.ServerContext, _ *http.Request, _ map[s
|
||||
order := defaultGetBlocksOrder
|
||||
if orderParamValue, ok := queryParams[queryParamOrder]; ok {
|
||||
if orderParamValue != controllers.OrderAscending && orderParamValue != controllers.OrderDescending {
|
||||
return nil, httpserverutils.NewHandlerError(http.StatusUnprocessableEntity, errors.Errorf("'%s' is not a valid value for the '%s' query parameter", orderParamValue, queryParamLimit))
|
||||
return nil, httpserverutils.NewHandlerError(http.StatusUnprocessableEntity, errors.Errorf(
|
||||
"Couldn't parse the '%s' query parameter", queryParamOrder))
|
||||
}
|
||||
order = orderParamValue
|
||||
}
|
||||
|
||||
@@ -24,24 +24,8 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
var (
|
||||
// pendingBlockAddedMsgs holds blockAddedMsgs in order of arrival
|
||||
pendingBlockAddedMsgs []*jsonrpc.BlockAddedMsg
|
||||
|
||||
// pendingChainChangedMsgs holds chainChangedMsgs in order of arrival
|
||||
pendingChainChangedMsgs []*jsonrpc.ChainChangedMsg
|
||||
|
||||
// missingBlocks is a map between missing block hashes and the time
|
||||
// they were first found to be missing. If a block is still missing
|
||||
// after blockMissingTimeout then it gets re-requested from the node.
|
||||
missingBlocks = make(map[string]time.Time)
|
||||
)
|
||||
|
||||
const (
|
||||
// blockMissingTimeout is the amount of time after which a missing block
|
||||
// gets re-requested from the node.
|
||||
blockMissingTimeout = time.Second * 10
|
||||
)
|
||||
// pendingChainChangedMsgs holds chainChangedMsgs in order of arrival
|
||||
var pendingChainChangedMsgs []*jsonrpc.ChainChangedMsg
|
||||
|
||||
// startSync keeps the node and the API server in sync. On start, it downloads
|
||||
// all data that's missing from the API server, and once it's done it keeps
|
||||
@@ -57,7 +41,6 @@ func startSync(doneChan chan struct{}) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("Finished syncing past data")
|
||||
|
||||
// Keep the node and the API server in sync
|
||||
return sync(client, doneChan)
|
||||
@@ -66,14 +49,17 @@ func startSync(doneChan chan struct{}) error {
|
||||
// fetchInitialData downloads all data that's currently missing from
|
||||
// the database.
|
||||
func fetchInitialData(client *jsonrpc.Client) error {
|
||||
log.Infof("Syncing past blocks")
|
||||
err := syncBlocks(client)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("Syncing past selected parent chain")
|
||||
err = syncSelectedParentChain(client)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("Finished syncing past data")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -83,8 +69,7 @@ func sync(client *jsonrpc.Client, doneChan chan struct{}) error {
|
||||
for {
|
||||
select {
|
||||
case blockAdded := <-client.OnBlockAdded:
|
||||
enqueueBlockAddedMsg(blockAdded)
|
||||
err := processBlockAddedMsgs(client)
|
||||
err := handleBlockAddedMsg(client, blockAdded)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -101,6 +86,13 @@ func sync(client *jsonrpc.Client, doneChan chan struct{}) error {
|
||||
}
|
||||
}
|
||||
|
||||
func stringPointerToString(str *string) string {
|
||||
if str == nil {
|
||||
return "<nil>"
|
||||
}
|
||||
return *str
|
||||
}
|
||||
|
||||
// syncBlocks attempts to download all DAG blocks starting with
|
||||
// the bluest block, and then inserts them into the database.
|
||||
func syncBlocks(client *jsonrpc.Client) error {
|
||||
@@ -115,6 +107,7 @@ func syncBlocks(client *jsonrpc.Client) error {
|
||||
var rawBlocks []string
|
||||
var verboseBlocks []btcjson.GetBlockVerboseResult
|
||||
for {
|
||||
log.Debugf("Calling getBlocks with start hash %v", stringPointerToString(startHash))
|
||||
blocksResult, err := client.GetBlocks(true, true, startHash)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -142,6 +135,7 @@ func syncSelectedParentChain(client *jsonrpc.Client) error {
|
||||
}
|
||||
|
||||
for {
|
||||
log.Debugf("Calling getChainFromBlock with start hash %s", stringPointerToString(startHash))
|
||||
chainFromBlockResult, err := client.GetChainFromBlock(false, startHash)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -190,23 +184,27 @@ func findHashOfBluestBlock(mustBeChainBlock bool) (*string, error) {
|
||||
// fetchBlock downloads the serialized block and raw block data of
|
||||
// the block with hash blockHash.
|
||||
func fetchBlock(client *jsonrpc.Client, blockHash *daghash.Hash) (
|
||||
rawBlock string, verboseBlock *btcjson.GetBlockVerboseResult, err error) {
|
||||
*rawAndVerboseBlock, error) {
|
||||
log.Debugf("Getting block %s from the RPC server", blockHash)
|
||||
msgBlock, err := client.GetBlock(blockHash, nil)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
return nil, err
|
||||
}
|
||||
writer := bytes.NewBuffer(make([]byte, 0, msgBlock.SerializeSize()))
|
||||
err = msgBlock.Serialize(writer)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
return nil, err
|
||||
}
|
||||
rawBlock = hex.EncodeToString(writer.Bytes())
|
||||
rawBlock := hex.EncodeToString(writer.Bytes())
|
||||
|
||||
verboseBlock, err = client.GetBlockVerboseTx(blockHash, nil)
|
||||
verboseBlock, err := client.GetBlockVerboseTx(blockHash, nil)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
return nil, err
|
||||
}
|
||||
return rawBlock, verboseBlock, nil
|
||||
return &rawAndVerboseBlock{
|
||||
rawBlock: rawBlock,
|
||||
verboseBlock: verboseBlock,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// addBlocks inserts data in the given rawBlocks and verboseBlocks pairwise
|
||||
@@ -308,12 +306,6 @@ func addBlock(client *jsonrpc.Client, rawBlock string, verboseBlock btcjson.GetB
|
||||
}
|
||||
|
||||
dbTx.Commit()
|
||||
|
||||
// If the block was previously missing, remove it from
|
||||
// the missing blocks collection.
|
||||
if _, ok := missingBlocks[verboseBlock.Hash]; ok {
|
||||
delete(missingBlocks, verboseBlock.Hash)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -926,126 +918,106 @@ func updateAddedChainBlocks(dbTx *gorm.DB, addedBlock *btcjson.ChainBlock) error
|
||||
return nil
|
||||
}
|
||||
|
||||
// enqueueBlockAddedMsg enqueues onBlockAdded messages to be handled later
|
||||
func enqueueBlockAddedMsg(blockAdded *jsonrpc.BlockAddedMsg) {
|
||||
pendingBlockAddedMsgs = append(pendingBlockAddedMsgs, blockAdded)
|
||||
type rawAndVerboseBlock struct {
|
||||
rawBlock string
|
||||
verboseBlock *btcjson.GetBlockVerboseResult
|
||||
}
|
||||
|
||||
// processBlockAddedMsgs processes all pending onBlockAdded messages.
|
||||
// Messages that cannot yet be processed are re-enqueued.
|
||||
func processBlockAddedMsgs(client *jsonrpc.Client) error {
|
||||
var unprocessedBlockAddedMsgs []*jsonrpc.BlockAddedMsg
|
||||
for _, blockAdded := range pendingBlockAddedMsgs {
|
||||
missingHashes, err := missingParentHashes(blockAdded)
|
||||
if err != nil {
|
||||
return errors.Errorf("Could not resolve missing parents: %s", err)
|
||||
}
|
||||
for _, missingHash := range missingHashes {
|
||||
err := handleMissingParent(client, missingHash)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to handle missing parent block %s: %s",
|
||||
missingHash, err)
|
||||
}
|
||||
}
|
||||
if len(missingHashes) > 0 {
|
||||
unprocessedBlockAddedMsgs = append(unprocessedBlockAddedMsgs, blockAdded)
|
||||
continue
|
||||
}
|
||||
func (r *rawAndVerboseBlock) String() string {
|
||||
return r.verboseBlock.Hash
|
||||
}
|
||||
|
||||
handleBlockAddedMsg(client, blockAdded)
|
||||
func handleBlockAddedMsg(client *jsonrpc.Client, blockAdded *jsonrpc.BlockAddedMsg) error {
|
||||
blocks, err := fetchBlockAndMissingAncestors(client, blockAdded.Header.BlockHash())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, block := range blocks {
|
||||
err = addBlock(client, block.rawBlock, *block.verboseBlock)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("Added block %s", block.verboseBlock.Hash)
|
||||
}
|
||||
pendingBlockAddedMsgs = unprocessedBlockAddedMsgs
|
||||
return nil
|
||||
}
|
||||
|
||||
func handleBlockAddedMsg(client *jsonrpc.Client, blockAdded *jsonrpc.BlockAddedMsg) {
|
||||
hash := blockAdded.Header.BlockHash()
|
||||
log.Debugf("Getting block %s from the RPC server", hash)
|
||||
rawBlock, verboseBlock, err := fetchBlock(client, hash)
|
||||
if err != nil {
|
||||
log.Warnf("Could not fetch block %s: %s", hash, err)
|
||||
return
|
||||
}
|
||||
err = addBlock(client, rawBlock, *verboseBlock)
|
||||
if err != nil {
|
||||
log.Errorf("Could not insert block %s: %s", hash, err)
|
||||
return
|
||||
}
|
||||
log.Infof("Added block %s", hash)
|
||||
}
|
||||
|
||||
func missingParentHashes(blockAdded *jsonrpc.BlockAddedMsg) ([]string, error) {
|
||||
db, err := database.DB()
|
||||
func fetchBlockAndMissingAncestors(client *jsonrpc.Client, blockHash *daghash.Hash) ([]*rawAndVerboseBlock, error) {
|
||||
block, err := fetchBlock(client, blockHash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pendingBlocks := []*rawAndVerboseBlock{block}
|
||||
blocksToAdd := make([]*rawAndVerboseBlock, 0)
|
||||
blocksToAddSet := make(map[string]struct{})
|
||||
for len(pendingBlocks) > 0 {
|
||||
var currentBlock *rawAndVerboseBlock
|
||||
currentBlock, pendingBlocks = pendingBlocks[0], pendingBlocks[1:]
|
||||
missingHashes, err := missingParentHashes(currentBlock.verboseBlock.ParentHashes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
blocksToPrependToPending := make([]*rawAndVerboseBlock, 0, len(missingHashes))
|
||||
for _, missingHash := range missingHashes {
|
||||
if _, ok := blocksToAddSet[missingHash]; ok {
|
||||
continue
|
||||
}
|
||||
hash, err := daghash.NewHashFromStr(missingHash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
block, err := fetchBlock(client, hash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
blocksToPrependToPending = append(blocksToPrependToPending, block)
|
||||
}
|
||||
if len(blocksToPrependToPending) == 0 {
|
||||
blocksToAddSet[currentBlock.verboseBlock.Hash] = struct{}{}
|
||||
blocksToAdd = append(blocksToAdd, currentBlock)
|
||||
continue
|
||||
}
|
||||
log.Debugf("Found %s missing parents for block %s and fetched them", blocksToPrependToPending, currentBlock)
|
||||
blocksToPrependToPending = append(blocksToPrependToPending, currentBlock)
|
||||
pendingBlocks = append(blocksToPrependToPending, pendingBlocks...)
|
||||
}
|
||||
return blocksToAdd, nil
|
||||
}
|
||||
|
||||
// Collect all referenced parent hashes
|
||||
hashesIn := make([]string, 0, len(blockAdded.Header.ParentHashes))
|
||||
for _, hash := range blockAdded.Header.ParentHashes {
|
||||
hashesIn = append(hashesIn, hash.String())
|
||||
func missingParentHashes(parentHashes []string) ([]string, error) {
|
||||
db, err := database.DB()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Make sure that all the parent hashes exist in the database
|
||||
var dbParentBlocks []dbmodels.Block
|
||||
dbResult := db.
|
||||
Model(&dbmodels.Block{}).
|
||||
Where("block_hash in (?)", hashesIn).
|
||||
Where("block_hash in (?)", parentHashes).
|
||||
Find(&dbParentBlocks)
|
||||
dbErrors := dbResult.GetErrors()
|
||||
if httpserverutils.HasDBError(dbErrors) {
|
||||
return nil, httpserverutils.NewErrorFromDBErrors("failed to find parent blocks: ", dbErrors)
|
||||
}
|
||||
if len(hashesIn) != len(dbParentBlocks) {
|
||||
if len(parentHashes) != len(dbParentBlocks) {
|
||||
// Some parent hashes are missing. Collect and return them
|
||||
var missingParentHashes []string
|
||||
var missingHashes []string
|
||||
outerLoop:
|
||||
for _, hash := range hashesIn {
|
||||
for _, hash := range parentHashes {
|
||||
for _, dbParentBlock := range dbParentBlocks {
|
||||
if dbParentBlock.BlockHash == hash {
|
||||
continue outerLoop
|
||||
}
|
||||
}
|
||||
missingParentHashes = append(missingParentHashes, hash)
|
||||
missingHashes = append(missingHashes, hash)
|
||||
}
|
||||
return missingParentHashes, nil
|
||||
return missingHashes, nil
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// handleMissingParent handles missing parent block hashes as follows:
|
||||
// a. If it's the first time we've encountered this block hash, add it to
|
||||
// the missing blocks collection with time = now
|
||||
// b. Otherwise, if time + blockMissingTimeout < now (that is to say,
|
||||
// blockMissingTimeout had elapsed) then request the block from the
|
||||
// node
|
||||
func handleMissingParent(client *jsonrpc.Client, missingParentHash string) error {
|
||||
firstMissingTime, ok := missingBlocks[missingParentHash]
|
||||
if !ok {
|
||||
log.Infof("Parent block %s is missing", missingParentHash)
|
||||
missingBlocks[missingParentHash] = time.Now()
|
||||
return nil
|
||||
}
|
||||
if firstMissingTime.Add(blockMissingTimeout).Before(time.Now()) {
|
||||
hash, err := daghash.NewHashFromStr(missingParentHash)
|
||||
if err != nil {
|
||||
return errors.Errorf("Could not create hash: %s", err)
|
||||
}
|
||||
rawBlock, verboseBlock, err := fetchBlock(client, hash)
|
||||
if err != nil {
|
||||
return errors.Errorf("Could not fetch block %s: %s", hash, err)
|
||||
}
|
||||
err = addBlock(client, rawBlock, *verboseBlock)
|
||||
if err != nil {
|
||||
return errors.Errorf("Could not insert block %s: %s", hash, err)
|
||||
}
|
||||
log.Infof("Parent block %s was fetched after missing for over %s",
|
||||
missingParentHash, blockMissingTimeout)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// enqueueChainChangedMsg enqueues onChainChanged messages to be handled later
|
||||
func enqueueChainChangedMsg(chainChanged *jsonrpc.ChainChangedMsg) {
|
||||
pendingChainChangedMsgs = append(pendingChainChangedMsgs, chainChanged)
|
||||
|
||||
@@ -195,7 +195,7 @@ func (dag *BlockDAG) IsKnownOrphan(hash *daghash.Hash) bool {
|
||||
// GetOrphanMissingAncestorHashes returns all of the missing parents in the orphan's sub-DAG
|
||||
//
|
||||
// This function is safe for concurrent access.
|
||||
func (dag *BlockDAG) GetOrphanMissingAncestorHashes(hash *daghash.Hash) ([]*daghash.Hash, error) {
|
||||
func (dag *BlockDAG) GetOrphanMissingAncestorHashes(orphanHash *daghash.Hash) ([]*daghash.Hash, error) {
|
||||
// Protect concurrent access. Using a read lock only so multiple
|
||||
// readers can query without blocking each other.
|
||||
dag.orphanLock.RLock()
|
||||
@@ -204,7 +204,7 @@ func (dag *BlockDAG) GetOrphanMissingAncestorHashes(hash *daghash.Hash) ([]*dagh
|
||||
missingAncestorsHashes := make([]*daghash.Hash, 0)
|
||||
|
||||
visited := make(map[daghash.Hash]bool)
|
||||
queue := []*daghash.Hash{hash}
|
||||
queue := []*daghash.Hash{orphanHash}
|
||||
for len(queue) > 0 {
|
||||
var current *daghash.Hash
|
||||
current, queue = queue[0], queue[1:]
|
||||
@@ -216,7 +216,7 @@ func (dag *BlockDAG) GetOrphanMissingAncestorHashes(hash *daghash.Hash) ([]*dagh
|
||||
queue = append(queue, parentHash)
|
||||
}
|
||||
} else {
|
||||
if !dag.BlockExists(current) {
|
||||
if !dag.BlockExists(current) && current != orphanHash {
|
||||
missingAncestorsHashes = append(missingAncestorsHashes, current)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ package main
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/daglabs/btcd/config"
|
||||
"github.com/pkg/errors"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
@@ -198,13 +199,16 @@ func loadConfig() (*ConfigFlags, []string, error) {
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
if _, err := os.Stat(preCfg.ConfigFile); os.IsNotExist(err) {
|
||||
// Use config file for RPC server to create default btcctl config
|
||||
serverConfigPath := filepath.Join(btcdHomeDir, "btcd.conf")
|
||||
|
||||
err := createDefaultConfigFile(preCfg.ConfigFile, serverConfigPath)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error creating a default config file: %s\n", err)
|
||||
// If no rpc user and password were configured, create
|
||||
// a btcctl default config file based on the rpc login
|
||||
// details written in the RPC server configuration file
|
||||
if preCfg.RPCUser == "" && preCfg.RPCPassword == "" {
|
||||
if _, err := os.Stat(preCfg.ConfigFile); os.IsNotExist(err) {
|
||||
serverConfigPath := filepath.Join(btcdHomeDir, "btcd.conf")
|
||||
err := createDefaultConfigFile(preCfg.ConfigFile, serverConfigPath)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error creating a default config file: %s\n", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -250,6 +254,9 @@ func loadConfig() (*ConfigFlags, []string, error) {
|
||||
func createDefaultConfigFile(destinationPath, serverConfigPath string) error {
|
||||
// Read the RPC server config
|
||||
serverConfigFile, err := os.Open(serverConfigPath)
|
||||
if os.IsNotExist(err) {
|
||||
return errors.Errorf("the RPC server configuration file could not be found at %s", serverConfigPath)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -32,15 +32,16 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
defaultConfigFilename = "btcd.conf"
|
||||
defaultDataDirname = "data"
|
||||
defaultLogLevel = "info"
|
||||
defaultLogDirname = "logs"
|
||||
defaultLogFilename = "btcd.log"
|
||||
defaultErrLogFilename = "btcd_err.log"
|
||||
defaultMaxPeers = 125
|
||||
defaultBanDuration = time.Hour * 24
|
||||
defaultBanThreshold = 100
|
||||
defaultConfigFilename = "btcd.conf"
|
||||
defaultDataDirname = "data"
|
||||
defaultLogLevel = "info"
|
||||
defaultLogDirname = "logs"
|
||||
defaultLogFilename = "btcd.log"
|
||||
defaultErrLogFilename = "btcd_err.log"
|
||||
defaultTargetOutboundPeers = 8
|
||||
defaultMaxInboundPeers = 117
|
||||
defaultBanDuration = time.Hour * 24
|
||||
defaultBanThreshold = 100
|
||||
//DefaultConnectTimeout is the default connection timeout when dialing
|
||||
DefaultConnectTimeout = time.Second * 30
|
||||
defaultMaxRPCClients = 10
|
||||
@@ -101,7 +102,8 @@ type Flags struct {
|
||||
ConnectPeers []string `long:"connect" description:"Connect only to the specified peers at startup"`
|
||||
DisableListen bool `long:"nolisten" description:"Disable listening for incoming connections -- NOTE: Listening is automatically disabled if the --connect or --proxy options are used without also specifying listen interfaces via --listen"`
|
||||
Listeners []string `long:"listen" description:"Add an interface/port to listen for connections (default all interfaces port: 8333, testnet: 18333)"`
|
||||
MaxPeers int `long:"maxpeers" description:"Max number of inbound and outbound peers"`
|
||||
TargetOutboundPeers int `long:"outpeers" description:"Target number of outbound peers"`
|
||||
MaxInboundPeers int `long:"maxinpeers" description:"Max number of inbound peers"`
|
||||
DisableBanning bool `long:"nobanning" description:"Disable banning of misbehaving peers"`
|
||||
BanDuration time.Duration `long:"banduration" description:"How long to ban misbehaving peers. Valid time units are {s, m, h}. Minimum 1 second"`
|
||||
BanThreshold uint32 `long:"banthreshold" description:"Maximum allowed ban score before disconnecting and banning misbehaving peers."`
|
||||
@@ -296,7 +298,8 @@ func loadConfig() (*Config, []string, error) {
|
||||
cfgFlags := Flags{
|
||||
ConfigFile: defaultConfigFile,
|
||||
DebugLevel: defaultLogLevel,
|
||||
MaxPeers: defaultMaxPeers,
|
||||
TargetOutboundPeers: defaultTargetOutboundPeers,
|
||||
MaxInboundPeers: defaultMaxInboundPeers,
|
||||
BanDuration: defaultBanDuration,
|
||||
BanThreshold: defaultBanThreshold,
|
||||
RPCMaxClients: defaultMaxRPCClients,
|
||||
@@ -417,6 +420,24 @@ func loadConfig() (*Config, []string, error) {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if !activeConfig.DisableRPC {
|
||||
if activeConfig.RPCUser == "" {
|
||||
str := "%s: rpcuser cannot be empty"
|
||||
err := errors.Errorf(str, funcName)
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
fmt.Fprintln(os.Stderr, usageMessage)
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if activeConfig.RPCPass == "" {
|
||||
str := "%s: rpcpass cannot be empty"
|
||||
err := errors.Errorf(str, funcName)
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
fmt.Fprintln(os.Stderr, usageMessage)
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
err = activeConfig.ResolveNetwork(parser)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
|
||||
@@ -7,11 +7,12 @@ package connmgr
|
||||
import (
|
||||
nativeerrors "errors"
|
||||
"fmt"
|
||||
"github.com/pkg/errors"
|
||||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// maxFailedAttempts is the maximum number of successive failed connection
|
||||
@@ -39,9 +40,9 @@ var (
|
||||
//ErrDialNil is used to indicate that Dial cannot be nil in the configuration.
|
||||
ErrDialNil = errors.New("Config: Dial cannot be nil")
|
||||
|
||||
// ErrMaxPeers is an error that is thrown when the max amount of peers had
|
||||
// ErrMaxOutboundPeers is an error that is thrown when the max amount of peers had
|
||||
// been reached.
|
||||
ErrMaxPeers = errors.New("max peers reached")
|
||||
ErrMaxOutboundPeers = errors.New("max outbound peers reached")
|
||||
|
||||
// ErrAlreadyConnected is an error that is thrown if the peer is already
|
||||
// connected.
|
||||
|
||||
@@ -7,12 +7,13 @@ package netsync
|
||||
import (
|
||||
"container/list"
|
||||
"fmt"
|
||||
"github.com/pkg/errors"
|
||||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/daglabs/btcd/blockdag"
|
||||
"github.com/daglabs/btcd/dagconfig"
|
||||
"github.com/daglabs/btcd/database"
|
||||
@@ -1290,16 +1291,13 @@ func (sm *SyncManager) handleBlockDAGNotification(notification *blockdag.Notific
|
||||
}
|
||||
})
|
||||
|
||||
// Don't relay if we are not current or the block was just now unorphaned.
|
||||
// Other peers that are current should already know about it
|
||||
if !sm.current() || data.WasUnorphaned {
|
||||
return
|
||||
// Relay if we are current and the block was not just now unorphaned.
|
||||
// Otherwise peers that are current should already know about it
|
||||
if sm.current() && !data.WasUnorphaned {
|
||||
iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
|
||||
sm.peerNotifier.RelayInventory(iv, block.MsgBlock().Header)
|
||||
}
|
||||
|
||||
// Generate the inventory vector and relay it.
|
||||
iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
|
||||
sm.peerNotifier.RelayInventory(iv, block.MsgBlock().Header)
|
||||
|
||||
for msg := range ch {
|
||||
sm.peerNotifier.TransactionConfirmed(msg.Tx)
|
||||
sm.peerNotifier.AnnounceNewTransactions(msg.AcceptedTxs)
|
||||
|
||||
@@ -782,7 +782,11 @@ func newFutureError(err error) chan *response {
|
||||
func receiveFuture(f chan *response) ([]byte, error) {
|
||||
// Wait for a response on the returned channel.
|
||||
r := <-f
|
||||
return r.result, r.err
|
||||
var err error
|
||||
if r.err != nil {
|
||||
err = errors.Wrap(r.err, "got error from response channel")
|
||||
}
|
||||
return r.result, err
|
||||
}
|
||||
|
||||
// sendPost sends the passed request to the server by issuing an HTTP POST
|
||||
|
||||
@@ -10,7 +10,6 @@ import (
|
||||
"crypto/rand"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"github.com/pkg/errors"
|
||||
"math"
|
||||
"net"
|
||||
"runtime"
|
||||
@@ -21,6 +20,8 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/daglabs/btcd/util/subnetworkid"
|
||||
|
||||
"github.com/daglabs/btcd/addrmgr"
|
||||
@@ -52,9 +53,6 @@ const (
|
||||
// required to be supported by outbound peers.
|
||||
defaultRequiredServices = wire.SFNodeNetwork
|
||||
|
||||
// defaultTargetOutbound is the default number of outbound peers to target.
|
||||
defaultTargetOutbound = 8
|
||||
|
||||
// connectionRetryInterval is the base amount of time to wait in between
|
||||
// retries when connecting to persistent peers. It is adjusted by the
|
||||
// number of retries such that there is a retry backoff.
|
||||
@@ -190,7 +188,15 @@ type peerState struct {
|
||||
|
||||
// Count returns the count of all known peers.
|
||||
func (ps *peerState) Count() int {
|
||||
return len(ps.inboundPeers) + len(ps.outboundPeers) +
|
||||
return ps.countInboundPeers() + ps.countOutboundPeers()
|
||||
}
|
||||
|
||||
func (ps *peerState) countInboundPeers() int {
|
||||
return len(ps.inboundPeers)
|
||||
}
|
||||
|
||||
func (ps *peerState) countOutboundPeers() int {
|
||||
return len(ps.outboundPeers) +
|
||||
len(ps.persistentPeers)
|
||||
}
|
||||
|
||||
@@ -276,12 +282,17 @@ type Server struct {
|
||||
relayInv chan relayMsg
|
||||
broadcast chan broadcastMsg
|
||||
wg sync.WaitGroup
|
||||
quit chan struct{}
|
||||
nat serverutils.NAT
|
||||
db database.DB
|
||||
TimeSource blockdag.MedianTimeSource
|
||||
services wire.ServiceFlag
|
||||
|
||||
// We add to quitWaitGroup before every instance in which we wait for
|
||||
// the quit channel so that all those instances finish before we shut
|
||||
// down the managers (connManager, addrManager, etc),
|
||||
quitWaitGroup sync.WaitGroup
|
||||
quit chan struct{}
|
||||
|
||||
// The following fields are used for optional indexes. They will be nil
|
||||
// if the associated index is not enabled. These fields are set during
|
||||
// initial creation of the server and never changed afterwards, so they
|
||||
@@ -687,12 +698,10 @@ func (s *Server) handleAddPeerMsg(state *peerState, sp *Peer) bool {
|
||||
// TODO: Check for max peers from a single IP.
|
||||
|
||||
// Limit max number of total peers.
|
||||
if state.Count() >= config.ActiveConfig().MaxPeers {
|
||||
srvrLog.Infof("Max peers reached [%d] - disconnecting peer %s",
|
||||
config.ActiveConfig().MaxPeers, sp)
|
||||
if sp.Inbound() && len(state.inboundPeers) >= config.ActiveConfig().MaxInboundPeers {
|
||||
srvrLog.Infof("Max inbound peers reached [%d] - disconnecting peer %s",
|
||||
config.ActiveConfig().MaxInboundPeers, sp)
|
||||
sp.Disconnect()
|
||||
// TODO: how to handle permanent peers here?
|
||||
// they should be rescheduled.
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -938,8 +947,8 @@ func (s *Server) handleQuery(state *peerState, querymsg interface{}) {
|
||||
case ConnectNodeMsg:
|
||||
// TODO: duplicate oneshots?
|
||||
// Limit max number of total peers.
|
||||
if state.Count() >= config.ActiveConfig().MaxPeers {
|
||||
msg.Reply <- connmgr.ErrMaxPeers
|
||||
if state.countOutboundPeers() >= config.ActiveConfig().TargetOutboundPeers {
|
||||
msg.Reply <- connmgr.ErrMaxOutboundPeers
|
||||
return
|
||||
}
|
||||
for _, peer := range state.persistentPeers {
|
||||
@@ -1166,6 +1175,8 @@ func (s *Server) peerHandler() {
|
||||
s.addrManager.Start()
|
||||
s.SyncManager.Start()
|
||||
|
||||
s.quitWaitGroup.Add(1)
|
||||
|
||||
srvrLog.Tracef("Starting peer handler")
|
||||
|
||||
state := &peerState{
|
||||
@@ -1231,6 +1242,7 @@ out:
|
||||
sp.Disconnect()
|
||||
return true
|
||||
})
|
||||
s.quitWaitGroup.Done()
|
||||
break out
|
||||
|
||||
case opcMsg := <-s.newOutboundConnection:
|
||||
@@ -1238,6 +1250,10 @@ out:
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for all p2p server quit jobs to finish before stopping the
|
||||
// various managers
|
||||
s.quitWaitGroup.Wait()
|
||||
|
||||
s.connManager.Stop()
|
||||
s.SyncManager.Stop()
|
||||
s.addrManager.Stop()
|
||||
@@ -1340,6 +1356,8 @@ func (s *Server) rebroadcastHandler() {
|
||||
timer := time.NewTimer(5 * time.Minute)
|
||||
pendingInvs := make(map[wire.InvVect]interface{})
|
||||
|
||||
s.quitWaitGroup.Add(1)
|
||||
|
||||
out:
|
||||
for {
|
||||
select {
|
||||
@@ -1387,6 +1405,7 @@ cleanup:
|
||||
break cleanup
|
||||
}
|
||||
}
|
||||
s.quitWaitGroup.Done()
|
||||
s.wg.Done()
|
||||
}
|
||||
|
||||
@@ -1524,6 +1543,9 @@ func (s *Server) upnpUpdateThread() {
|
||||
timer := time.NewTimer(0 * time.Second)
|
||||
lport, _ := strconv.ParseInt(config.ActiveConfig().NetParams().DefaultPort, 10, 16)
|
||||
first := true
|
||||
|
||||
s.quitWaitGroup.Add(1)
|
||||
|
||||
out:
|
||||
for {
|
||||
select {
|
||||
@@ -1569,6 +1591,7 @@ out:
|
||||
srvrLog.Debugf("successfully disestablished UPnP port mapping")
|
||||
}
|
||||
|
||||
s.quitWaitGroup.Done()
|
||||
s.wg.Done()
|
||||
}
|
||||
|
||||
@@ -1599,18 +1622,20 @@ func NewServer(listenAddrs []string, db database.DB, dagParams *dagconfig.Params
|
||||
}
|
||||
}
|
||||
|
||||
maxPeers := config.ActiveConfig().TargetOutboundPeers + config.ActiveConfig().MaxInboundPeers
|
||||
|
||||
s := Server{
|
||||
DAGParams: dagParams,
|
||||
addrManager: amgr,
|
||||
newPeers: make(chan *Peer, config.ActiveConfig().MaxPeers),
|
||||
donePeers: make(chan *Peer, config.ActiveConfig().MaxPeers),
|
||||
banPeers: make(chan *Peer, config.ActiveConfig().MaxPeers),
|
||||
newPeers: make(chan *Peer, maxPeers),
|
||||
donePeers: make(chan *Peer, maxPeers),
|
||||
banPeers: make(chan *Peer, maxPeers),
|
||||
Query: make(chan interface{}),
|
||||
relayInv: make(chan relayMsg, config.ActiveConfig().MaxPeers),
|
||||
broadcast: make(chan broadcastMsg, config.ActiveConfig().MaxPeers),
|
||||
relayInv: make(chan relayMsg, maxPeers),
|
||||
broadcast: make(chan broadcastMsg, maxPeers),
|
||||
quit: make(chan struct{}),
|
||||
modifyRebroadcastInv: make(chan interface{}),
|
||||
newOutboundConnection: make(chan *outboundPeerConnectedMsg),
|
||||
newOutboundConnection: make(chan *outboundPeerConnectedMsg, config.ActiveConfig().TargetOutboundPeers),
|
||||
nat: nat,
|
||||
db: db,
|
||||
TimeSource: blockdag.NewMedianTime(),
|
||||
@@ -1714,7 +1739,7 @@ func NewServer(listenAddrs []string, db database.DB, dagParams *dagconfig.Params
|
||||
TxMemPool: s.TxMemPool,
|
||||
ChainParams: s.DAGParams,
|
||||
DisableCheckpoints: cfg.DisableCheckpoints,
|
||||
MaxPeers: cfg.MaxPeers,
|
||||
MaxPeers: maxPeers,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -1772,15 +1797,11 @@ func NewServer(listenAddrs []string, db database.DB, dagParams *dagconfig.Params
|
||||
}
|
||||
|
||||
// Create a connection manager.
|
||||
targetOutbound := defaultTargetOutbound
|
||||
if config.ActiveConfig().MaxPeers < targetOutbound {
|
||||
targetOutbound = config.ActiveConfig().MaxPeers
|
||||
}
|
||||
cmgr, err := connmgr.New(&connmgr.Config{
|
||||
Listeners: listeners,
|
||||
OnAccept: s.inboundPeerConnected,
|
||||
RetryDuration: connectionRetryInterval,
|
||||
TargetOutbound: uint32(targetOutbound),
|
||||
TargetOutbound: uint32(config.ActiveConfig().TargetOutboundPeers),
|
||||
Dial: serverutils.BTCDDial,
|
||||
OnConnection: func(c *connmgr.ConnReq, conn net.Conn) {
|
||||
s.newOutboundConnection <- &outboundPeerConnectedMsg{
|
||||
|
||||
@@ -275,21 +275,8 @@ func buildGetBlockVerboseResult(s *Server, block *util.Block, isVerboseTx bool)
|
||||
txns := block.Transactions()
|
||||
rawTxns := make([]btcjson.TxRawResult, len(txns))
|
||||
for i, tx := range txns {
|
||||
var acceptingBlock *daghash.Hash
|
||||
var confirmations *uint64
|
||||
if s.cfg.TxIndex != nil {
|
||||
acceptingBlock, err = s.cfg.TxIndex.BlockThatAcceptedTx(s.cfg.DAG, tx.ID())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
txConfirmations, err := txConfirmationsNoLock(s, tx.ID())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
confirmations = &txConfirmations
|
||||
}
|
||||
rawTxn, err := createTxRawResult(params, tx.MsgTx(), tx.ID().String(),
|
||||
&blockHeader, hash.String(), acceptingBlock, confirmations, false)
|
||||
&blockHeader, hash.String(), nil, nil, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -1,9 +1,5 @@
|
||||
package testutil
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// AreErrorsEqual returns whether errors have the same type
|
||||
// and same error string from .Error().
|
||||
func AreErrorsEqual(err1, err2 error) bool {
|
||||
@@ -13,8 +9,5 @@ func AreErrorsEqual(err1, err2 error) bool {
|
||||
if err1 == nil && err2 != nil {
|
||||
return false
|
||||
}
|
||||
if fmt.Sprintf("%T", err1) != fmt.Sprintf("%T", err2) {
|
||||
return false
|
||||
}
|
||||
return err1.Error() == err2.Error()
|
||||
}
|
||||
|
||||
@@ -191,7 +191,7 @@ func Decode(dst *Hash, src string) error {
|
||||
var reversedHash Hash
|
||||
_, err := hex.Decode(reversedHash[HashSize-hex.DecodedLen(len(srcBytes)):], srcBytes)
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
// Reverse copy from the temporary hash to destination. Because the
|
||||
|
||||
@@ -7,6 +7,7 @@ package daghash
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/hex"
|
||||
"github.com/daglabs/btcd/testutil"
|
||||
"math/big"
|
||||
"reflect"
|
||||
"testing"
|
||||
@@ -214,7 +215,7 @@ func TestNewHashFromStr(t *testing.T) {
|
||||
t.Logf("Running %d tests", len(tests))
|
||||
for i, test := range tests {
|
||||
result, err := NewHashFromStr(test.in)
|
||||
if err != test.err {
|
||||
if !testutil.AreErrorsEqual(err, test.err) {
|
||||
t.Errorf(unexpectedErrStr, i, err, test.err)
|
||||
continue
|
||||
} else if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user