Compare commits

..

5 Commits

Author SHA1 Message Date
Svarog
e68b242243 [NOD-489] Don't skip notification about transactions for orphan/non-current blocks (#511) 2019-12-03 14:18:32 +02:00
Ori Newman
9cc2a7260b [NOD-479] Separate max outbound connections and max inbound connections (#509)
* [NOD-479] Separate max outbound connections and max inbound connections

* [NOD-479] Fix merge

* [NOD-479] Renames and add function countinboundPeers

* [NOD-479] Remove redundant check on maximum outbound peers

* [NOD-479] Rename countinboundPeers -> countInboundPeers
2019-12-03 12:27:49 +02:00
Ori Newman
bcd73012de [NOD-428] Require RPC user and password, and do not create a default config file for btcctl if rpc login details were provided (#510)
* [NOD-428] Required RPC user and password, and do not create a default config file for btcctl if rpc login details were provided

* [NOD-428] Don't check rpc user and password if rpc is disabled

* [NOD-428] Fix error message
2019-12-03 11:18:28 +02:00
Dan Aharoni
1fea2a9421 [NOD-486] API Server TX posting: Forward error when RPC Error is received (#507)
* [NOD-486] Forward error when RPC Error is recieved

* [NOD-486] Rename variable

* [NOD-486] Rename variable

* [NOD-486] Rename Variable (again)
2019-12-02 18:44:39 +02:00
stasatdaglabs
bb7d68deda [NOD-484] Fix deadlock between p2p server and sync manager during shutdown (#508)
* [NOD-484] Fix deadlock between p2p server and sync manager during shutdown.

* [NOD-484] Fix quitWaitGroup.Wait() potentially not waiting in some scenarios.

* [NOD-484] Add a comment explaining quitWaitGroup.

* [NOD-484] Fix typo.

* [NOD-484] Add etc to comment.
2019-12-02 18:08:32 +02:00
6 changed files with 104 additions and 57 deletions

View File

@@ -278,12 +278,13 @@ func PostTransaction(requestBody []byte) error {
_, err = client.SendRawTransaction(tx, true)
if err != nil {
if rpcErr, ok := err.(*btcjson.RPCError); ok && rpcErr.Code == btcjson.ErrRPCVerify {
return httpserverutils.NewHandlerError(http.StatusInternalServerError, err)
switch err := errors.Cause(err).(type) {
case *btcjson.RPCError:
return httpserverutils.NewHandlerError(http.StatusUnprocessableEntity, err)
default:
return err
}
return err
}
return nil
}

View File

@@ -7,6 +7,7 @@ package main
import (
"fmt"
"github.com/daglabs/btcd/config"
"github.com/pkg/errors"
"io/ioutil"
"net"
"os"
@@ -198,13 +199,16 @@ func loadConfig() (*ConfigFlags, []string, error) {
os.Exit(0)
}
if _, err := os.Stat(preCfg.ConfigFile); os.IsNotExist(err) {
// Use config file for RPC server to create default btcctl config
serverConfigPath := filepath.Join(btcdHomeDir, "btcd.conf")
err := createDefaultConfigFile(preCfg.ConfigFile, serverConfigPath)
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating a default config file: %s\n", err)
// If no rpc user and password were configured, create
// a btcctl default config file based on the rpc login
// details written in the RPC server configuration file
if preCfg.RPCUser == "" && preCfg.RPCPassword == "" {
if _, err := os.Stat(preCfg.ConfigFile); os.IsNotExist(err) {
serverConfigPath := filepath.Join(btcdHomeDir, "btcd.conf")
err := createDefaultConfigFile(preCfg.ConfigFile, serverConfigPath)
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating a default config file: %s\n", err)
}
}
}
@@ -250,6 +254,9 @@ func loadConfig() (*ConfigFlags, []string, error) {
func createDefaultConfigFile(destinationPath, serverConfigPath string) error {
// Read the RPC server config
serverConfigFile, err := os.Open(serverConfigPath)
if os.IsNotExist(err) {
return errors.Errorf("the RPC server configuration file could not be found at %s", serverConfigPath)
}
if err != nil {
return err
}

View File

@@ -32,15 +32,16 @@ import (
)
const (
defaultConfigFilename = "btcd.conf"
defaultDataDirname = "data"
defaultLogLevel = "info"
defaultLogDirname = "logs"
defaultLogFilename = "btcd.log"
defaultErrLogFilename = "btcd_err.log"
defaultMaxPeers = 125
defaultBanDuration = time.Hour * 24
defaultBanThreshold = 100
defaultConfigFilename = "btcd.conf"
defaultDataDirname = "data"
defaultLogLevel = "info"
defaultLogDirname = "logs"
defaultLogFilename = "btcd.log"
defaultErrLogFilename = "btcd_err.log"
defaultTargetOutboundPeers = 8
defaultMaxInboundPeers = 117
defaultBanDuration = time.Hour * 24
defaultBanThreshold = 100
//DefaultConnectTimeout is the default connection timeout when dialing
DefaultConnectTimeout = time.Second * 30
defaultMaxRPCClients = 10
@@ -101,7 +102,8 @@ type Flags struct {
ConnectPeers []string `long:"connect" description:"Connect only to the specified peers at startup"`
DisableListen bool `long:"nolisten" description:"Disable listening for incoming connections -- NOTE: Listening is automatically disabled if the --connect or --proxy options are used without also specifying listen interfaces via --listen"`
Listeners []string `long:"listen" description:"Add an interface/port to listen for connections (default all interfaces port: 8333, testnet: 18333)"`
MaxPeers int `long:"maxpeers" description:"Max number of inbound and outbound peers"`
TargetOutboundPeers int `long:"outpeers" description:"Target number of outbound peers"`
MaxInboundPeers int `long:"maxinpeers" description:"Max number of inbound peers"`
DisableBanning bool `long:"nobanning" description:"Disable banning of misbehaving peers"`
BanDuration time.Duration `long:"banduration" description:"How long to ban misbehaving peers. Valid time units are {s, m, h}. Minimum 1 second"`
BanThreshold uint32 `long:"banthreshold" description:"Maximum allowed ban score before disconnecting and banning misbehaving peers."`
@@ -296,7 +298,8 @@ func loadConfig() (*Config, []string, error) {
cfgFlags := Flags{
ConfigFile: defaultConfigFile,
DebugLevel: defaultLogLevel,
MaxPeers: defaultMaxPeers,
TargetOutboundPeers: defaultTargetOutboundPeers,
MaxInboundPeers: defaultMaxInboundPeers,
BanDuration: defaultBanDuration,
BanThreshold: defaultBanThreshold,
RPCMaxClients: defaultMaxRPCClients,
@@ -417,6 +420,24 @@ func loadConfig() (*Config, []string, error) {
return nil, nil, err
}
if !activeConfig.DisableRPC {
if activeConfig.RPCUser == "" {
str := "%s: rpcuser cannot be empty"
err := errors.Errorf(str, funcName)
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err
}
if activeConfig.RPCPass == "" {
str := "%s: rpcpass cannot be empty"
err := errors.Errorf(str, funcName)
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err
}
}
err = activeConfig.ResolveNetwork(parser)
if err != nil {
return nil, nil, err

View File

@@ -40,9 +40,9 @@ var (
//ErrDialNil is used to indicate that Dial cannot be nil in the configuration.
ErrDialNil = errors.New("Config: Dial cannot be nil")
// ErrMaxPeers is an error that is thrown when the max amount of peers had
// ErrMaxOutboundPeers is an error that is thrown when the max amount of peers had
// been reached.
ErrMaxPeers = errors.New("max peers reached")
ErrMaxOutboundPeers = errors.New("max outbound peers reached")
// ErrAlreadyConnected is an error that is thrown if the peer is already
// connected.

View File

@@ -7,12 +7,13 @@ package netsync
import (
"container/list"
"fmt"
"github.com/pkg/errors"
"net"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/daglabs/btcd/blockdag"
"github.com/daglabs/btcd/dagconfig"
"github.com/daglabs/btcd/database"
@@ -1290,16 +1291,13 @@ func (sm *SyncManager) handleBlockDAGNotification(notification *blockdag.Notific
}
})
// Don't relay if we are not current or the block was just now unorphaned.
// Other peers that are current should already know about it
if !sm.current() || data.WasUnorphaned {
return
// Relay if we are current and the block was not just now unorphaned.
// Otherwise peers that are current should already know about it
if sm.current() && !data.WasUnorphaned {
iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
sm.peerNotifier.RelayInventory(iv, block.MsgBlock().Header)
}
// Generate the inventory vector and relay it.
iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
sm.peerNotifier.RelayInventory(iv, block.MsgBlock().Header)
for msg := range ch {
sm.peerNotifier.TransactionConfirmed(msg.Tx)
sm.peerNotifier.AnnounceNewTransactions(msg.AcceptedTxs)

View File

@@ -53,9 +53,6 @@ const (
// required to be supported by outbound peers.
defaultRequiredServices = wire.SFNodeNetwork
// defaultTargetOutbound is the default number of outbound peers to target.
defaultTargetOutbound = 8
// connectionRetryInterval is the base amount of time to wait in between
// retries when connecting to persistent peers. It is adjusted by the
// number of retries such that there is a retry backoff.
@@ -191,7 +188,15 @@ type peerState struct {
// Count returns the count of all known peers.
func (ps *peerState) Count() int {
return len(ps.inboundPeers) + len(ps.outboundPeers) +
return ps.countInboundPeers() + ps.countOutboundPeers()
}
func (ps *peerState) countInboundPeers() int {
return len(ps.inboundPeers)
}
func (ps *peerState) countOutboundPeers() int {
return len(ps.outboundPeers) +
len(ps.persistentPeers)
}
@@ -277,12 +282,17 @@ type Server struct {
relayInv chan relayMsg
broadcast chan broadcastMsg
wg sync.WaitGroup
quit chan struct{}
nat serverutils.NAT
db database.DB
TimeSource blockdag.MedianTimeSource
services wire.ServiceFlag
// We add to quitWaitGroup before every instance in which we wait for
// the quit channel so that all those instances finish before we shut
// down the managers (connManager, addrManager, etc),
quitWaitGroup sync.WaitGroup
quit chan struct{}
// The following fields are used for optional indexes. They will be nil
// if the associated index is not enabled. These fields are set during
// initial creation of the server and never changed afterwards, so they
@@ -688,12 +698,10 @@ func (s *Server) handleAddPeerMsg(state *peerState, sp *Peer) bool {
// TODO: Check for max peers from a single IP.
// Limit max number of total peers.
if state.Count() >= config.ActiveConfig().MaxPeers {
srvrLog.Infof("Max peers reached [%d] - disconnecting peer %s",
config.ActiveConfig().MaxPeers, sp)
if sp.Inbound() && len(state.inboundPeers) >= config.ActiveConfig().MaxInboundPeers {
srvrLog.Infof("Max inbound peers reached [%d] - disconnecting peer %s",
config.ActiveConfig().MaxInboundPeers, sp)
sp.Disconnect()
// TODO: how to handle permanent peers here?
// they should be rescheduled.
return false
}
@@ -939,8 +947,8 @@ func (s *Server) handleQuery(state *peerState, querymsg interface{}) {
case ConnectNodeMsg:
// TODO: duplicate oneshots?
// Limit max number of total peers.
if state.Count() >= config.ActiveConfig().MaxPeers {
msg.Reply <- connmgr.ErrMaxPeers
if state.countOutboundPeers() >= config.ActiveConfig().TargetOutboundPeers {
msg.Reply <- connmgr.ErrMaxOutboundPeers
return
}
for _, peer := range state.persistentPeers {
@@ -1167,6 +1175,8 @@ func (s *Server) peerHandler() {
s.addrManager.Start()
s.SyncManager.Start()
s.quitWaitGroup.Add(1)
srvrLog.Tracef("Starting peer handler")
state := &peerState{
@@ -1232,6 +1242,7 @@ out:
sp.Disconnect()
return true
})
s.quitWaitGroup.Done()
break out
case opcMsg := <-s.newOutboundConnection:
@@ -1239,6 +1250,10 @@ out:
}
}
// Wait for all p2p server quit jobs to finish before stopping the
// various managers
s.quitWaitGroup.Wait()
s.connManager.Stop()
s.SyncManager.Stop()
s.addrManager.Stop()
@@ -1341,6 +1356,8 @@ func (s *Server) rebroadcastHandler() {
timer := time.NewTimer(5 * time.Minute)
pendingInvs := make(map[wire.InvVect]interface{})
s.quitWaitGroup.Add(1)
out:
for {
select {
@@ -1388,6 +1405,7 @@ cleanup:
break cleanup
}
}
s.quitWaitGroup.Done()
s.wg.Done()
}
@@ -1525,6 +1543,9 @@ func (s *Server) upnpUpdateThread() {
timer := time.NewTimer(0 * time.Second)
lport, _ := strconv.ParseInt(config.ActiveConfig().NetParams().DefaultPort, 10, 16)
first := true
s.quitWaitGroup.Add(1)
out:
for {
select {
@@ -1570,6 +1591,7 @@ out:
srvrLog.Debugf("successfully disestablished UPnP port mapping")
}
s.quitWaitGroup.Done()
s.wg.Done()
}
@@ -1600,18 +1622,20 @@ func NewServer(listenAddrs []string, db database.DB, dagParams *dagconfig.Params
}
}
maxPeers := config.ActiveConfig().TargetOutboundPeers + config.ActiveConfig().MaxInboundPeers
s := Server{
DAGParams: dagParams,
addrManager: amgr,
newPeers: make(chan *Peer, config.ActiveConfig().MaxPeers),
donePeers: make(chan *Peer, config.ActiveConfig().MaxPeers),
banPeers: make(chan *Peer, config.ActiveConfig().MaxPeers),
newPeers: make(chan *Peer, maxPeers),
donePeers: make(chan *Peer, maxPeers),
banPeers: make(chan *Peer, maxPeers),
Query: make(chan interface{}),
relayInv: make(chan relayMsg, config.ActiveConfig().MaxPeers),
broadcast: make(chan broadcastMsg, config.ActiveConfig().MaxPeers),
relayInv: make(chan relayMsg, maxPeers),
broadcast: make(chan broadcastMsg, maxPeers),
quit: make(chan struct{}),
modifyRebroadcastInv: make(chan interface{}),
newOutboundConnection: make(chan *outboundPeerConnectedMsg, config.ActiveConfig().MaxPeers), // TODO: replace with target outbound
newOutboundConnection: make(chan *outboundPeerConnectedMsg, config.ActiveConfig().TargetOutboundPeers),
nat: nat,
db: db,
TimeSource: blockdag.NewMedianTime(),
@@ -1715,7 +1739,7 @@ func NewServer(listenAddrs []string, db database.DB, dagParams *dagconfig.Params
TxMemPool: s.TxMemPool,
ChainParams: s.DAGParams,
DisableCheckpoints: cfg.DisableCheckpoints,
MaxPeers: cfg.MaxPeers,
MaxPeers: maxPeers,
})
if err != nil {
return nil, err
@@ -1773,15 +1797,11 @@ func NewServer(listenAddrs []string, db database.DB, dagParams *dagconfig.Params
}
// Create a connection manager.
targetOutbound := defaultTargetOutbound
if config.ActiveConfig().MaxPeers < targetOutbound {
targetOutbound = config.ActiveConfig().MaxPeers
}
cmgr, err := connmgr.New(&connmgr.Config{
Listeners: listeners,
OnAccept: s.inboundPeerConnected,
RetryDuration: connectionRetryInterval,
TargetOutbound: uint32(targetOutbound),
TargetOutbound: uint32(config.ActiveConfig().TargetOutboundPeers),
Dial: serverutils.BTCDDial,
OnConnection: func(c *connmgr.ConnReq, conn net.Conn) {
s.newOutboundConnection <- &outboundPeerConnectedMsg{