[NOD-159] Wrap all goroutines to handle panics (#290)

* [NOD-159] Wrap all goroutines to handle panics

* [NOD-159] Fix gofmt errors

* [NOD-159] Add comment to HandlePanic

* [NOD-159] Merge panics and gowrapper packages

* [NOD-159] Added missing initialization
Ori Newman 2019-05-07 16:13:07 +03:00 committed by Svarog
parent 42109ec4d5
commit b7b41f1a94
38 changed files with 186 additions and 117 deletions
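
In short, each affected package gains a package-level spawn function, initialized from the new util/panics package introduced at the bottom of this commit, and every bare go statement is routed through it so that panics in goroutines are logged before the process exits. The sketch below illustrates the wiring pattern as it appears in the per-package log.go files changed here; the package name and the handleMessages worker are hypothetical stand-ins, while panics.GoroutineWrapperFunc, panics.HandlePanic, logger.Get, and logger.SubsystemTags come from this repository.

package example

import (
	"github.com/btcsuite/btclog"
	"github.com/daglabs/btcd/logger"
	"github.com/daglabs/btcd/util/panics"
)

var log btclog.Logger
var spawn func(func())

func init() {
	// Same wiring as the log.go files changed below.
	log, _ = logger.Get(logger.SubsystemTags.BTCD)
	spawn = panics.GoroutineWrapperFunc(log)
}

func start() {
	// Before this commit: go handleMessages()
	spawn(handleMessages)

	// Closures are wrapped the same way: go func() {...}() becomes spawn(func() {...}).
	spawn(func() {
		log.Info("worker started")
	})
}

// handleMessages is a hypothetical worker used only for illustration.
func handleMessages() {}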


@@ -724,7 +724,7 @@ func (a *AddrManager) Start() {
 // Start the address ticker to save addresses periodically.
 a.wg.Add(1)
-go a.addressHandler()
+spawn(a.addressHandler)
 }
 // Stop gracefully shuts down the address manager by stopping the main handler.


@@ -7,13 +7,16 @@ package addrmgr
 import (
 "github.com/btcsuite/btclog"
 "github.com/daglabs/btcd/logger"
+"github.com/daglabs/btcd/util/panics"
 )
 // log is a logger that is initialized with no output filters. This
 // means the package will not perform any logging by default until the caller
 // requests it.
 var log btclog.Logger
+var spawn func(func())
 func init() {
 log, _ = logger.Get(logger.SubsystemTags.ADXR)
+spawn = panics.GoroutineWrapperFunc(log)
 }


@@ -7,14 +7,17 @@ package indexers
 import (
 "github.com/btcsuite/btclog"
 "github.com/daglabs/btcd/logger"
+"github.com/daglabs/btcd/util/panics"
 )
 // log is a logger that is initialized with no output filters. This
 // means the package will not perform any logging by default until the caller
 // requests it.
 var log btclog.Logger
+var spawn func(func())
 // The default amount of logging is none.
 func init() {
 log, _ = logger.Get(logger.SubsystemTags.INDX)
+spawn = panics.GoroutineWrapperFunc(log)
 }


@@ -7,14 +7,17 @@ package blockdag
 import (
 "github.com/btcsuite/btclog"
 "github.com/daglabs/btcd/logger"
+"github.com/daglabs/btcd/util/panics"
 )
 // log is a logger that is initialized with no output filters. This
 // means the package will not perform any logging by default until the caller
 // requests it.
 var log btclog.Logger
+var spawn func(func())
 // The default amount of logging is none.
 func init() {
 log, _ = logger.Get(logger.SubsystemTags.CHAN)
+spawn = panics.GoroutineWrapperFunc(log)
 }


@@ -127,7 +127,7 @@ func (v *txValidator) Validate(items []*txValidateItem) error {
 // Start up validation handlers that are used to asynchronously
 // validate each transaction input.
 for i := 0; i < maxGoRoutines; i++ {
-go v.validateHandler()
+spawn(v.validateHandler)
 }
 // Validate each of the inputs. The quit channel is closed when any

btcd.go

@@ -21,10 +21,10 @@ import (
 "github.com/daglabs/btcd/database"
 _ "github.com/daglabs/btcd/database/ffldb"
 "github.com/daglabs/btcd/limits"
-"github.com/daglabs/btcd/logger"
 "github.com/daglabs/btcd/server"
 "github.com/daglabs/btcd/signal"
 "github.com/daglabs/btcd/util/fs"
+"github.com/daglabs/btcd/util/panics"
 "github.com/daglabs/btcd/version"
 )
@@ -56,11 +56,7 @@ func btcdMain(serverChan chan<- *server.Server) error {
 return err
 }
 cfg = config.MainConfig()
-defer func() {
-if logger.LogRotator != nil {
-logger.LogRotator.Close()
-}
-}()
+defer panics.HandlePanic(btcdLog)
 // Get a channel that will be closed when a shutdown signal has been
 // triggered either from an OS signal such as SIGINT (Ctrl+C) or from
@@ -73,14 +69,14 @@ func btcdMain(serverChan chan<- *server.Server) error {
 // Enable http profiling server if requested.
 if cfg.Profile != "" {
-go func() {
+spawn(func() {
 listenAddr := net.JoinHostPort("", cfg.Profile)
 btcdLog.Infof("Profile server listening on %s", listenAddr)
 profileRedirect := http.RedirectHandler("/debug/pprof",
 http.StatusSeeOther)
 http.Handle("/", profileRedirect)
 btcdLog.Errorf("%s", http.ListenAndServe(listenAddr, nil))
-}()
+})
 }
 // Write cpu profile if requested.


@@ -12,6 +12,7 @@ import (
 "github.com/btcsuite/btclog"
 "github.com/daglabs/btcd/database"
 "github.com/daglabs/btcd/limits"
+"github.com/daglabs/btcd/util/panics"
 )
 const (
@@ -20,8 +21,9 @@ const (
 )
 var (
 cfg *config
 log btclog.Logger
+spawn func(func())
 )
 // loadBlockDB opens the block database and returns a handle to it.
@@ -70,6 +72,7 @@ func realMain() error {
 backendLogger := btclog.NewBackend(os.Stdout)
 defer os.Stdout.Sync()
 log = backendLogger.Logger("MAIN")
+spawn = panics.GoroutineWrapperFunc(log)
 // Load the block database.
 db, err := loadBlockDB()


@@ -273,20 +273,22 @@ func (bi *blockImporter) Import() chan *importResults {
 // Start up the read and process handling goroutines. This setup allows
 // blocks to be read from disk in parallel while being processed.
 bi.wg.Add(2)
-go bi.readHandler()
-go bi.processHandler()
+spawn(bi.readHandler)
+spawn(bi.processHandler)
 // Wait for the import to finish in a separate goroutine and signal
 // the status handler when done.
-go func() {
+spawn(func() {
 bi.wg.Wait()
 bi.doneChan <- true
-}()
+})
 // Start the status handler and return the result channel that it will
 // send the results on when the import is done.
 resultChan := make(chan *importResults)
-go bi.statusHandler(resultChan)
+spawn(func() {
+bi.statusHandler(resultChan)
+})
 return resultChan
 }


@@ -2,7 +2,6 @@ package main
 import (
 "fmt"
-"runtime/debug"
 "sync/atomic"
 "github.com/daglabs/btcd/btcec"
@@ -11,6 +10,7 @@ import (
 "github.com/daglabs/btcd/signal"
 "github.com/daglabs/btcd/util"
 "github.com/daglabs/btcd/util/base58"
+"github.com/daglabs/btcd/util/panics"
 )
 var (
@@ -31,7 +31,7 @@ func privateKeyToP2pkhAddress(key *btcec.PrivateKey, net *dagconfig.Params) (uti
 }
 func main() {
-defer handlePanic()
+defer panics.HandlePanic(log)
 cfg, err := parseConfig()
 if err != nil {
@@ -75,11 +75,3 @@ func disconnect(clients []*rpcclient.Client) {
 client.Disconnect()
 }
 }
-func handlePanic() {
-err := recover()
-if err != nil {
-log.Errorf("Fatal error: %s", err)
-log.Errorf("Stack trace: %s", debug.Stack())
-}
-}


@@ -214,7 +214,7 @@ func (cm *ConnManager) handleFailedConn(c *ConnReq) {
 cm.NewConnReq()
 })
 } else {
-go cm.NewConnReq()
+spawn(cm.NewConnReq)
 }
 }
 }
@@ -507,7 +507,7 @@ func (cm *ConnManager) Start() {
 log.Trace("Connection manager started")
 cm.wg.Add(1)
-go cm.connHandler()
+spawn(cm.connHandler)
 // Start all the listeners so long as the caller requested them and
 // provided a callback to be invoked when connections are accepted.
@@ -519,7 +519,7 @@ func (cm *ConnManager) Start() {
 }
 for i := atomic.LoadUint64(&cm.connReqCount); i < uint64(cm.cfg.TargetOutbound); i++ {
-go cm.NewConnReq()
+spawn(cm.NewConnReq)
 }
 }


@@ -7,15 +7,17 @@ package connmgr
 import (
 "github.com/btcsuite/btclog"
 "github.com/daglabs/btcd/logger"
+"github.com/daglabs/btcd/util/panics"
 )
 // log is a logger that is initialized with no output filters. This
 // means the package will not perform any logging by default until the caller
 // requests it.
 var log btclog.Logger
+var spawn func(func())
 // The default amount of logging is none.
 func init() {
 log, _ = logger.Get(logger.SubsystemTags.CMGR)
+spawn = panics.GoroutineWrapperFunc(log)
 }


@@ -288,15 +288,15 @@ func (bi *blockImporter) Import() chan *importResults {
 // Start up the read and process handling goroutines. This setup allows
 // blocks to be read from disk in parallel while being processed.
 bi.wg.Add(2)
-go bi.readHandler()
-go bi.processHandler()
+spawn(bi.readHandler)
+spawn(bi.processHandler)
 // Wait for the import to finish in a separate goroutine and signal
 // the status handler when done.
-go func() {
+spawn(func() {
 bi.wg.Wait()
 bi.doneChan <- true
-}()
+})
 // Start the status handler and return the result channel that it will
 // send the results on when the import is done.
@@ -365,7 +365,7 @@ func (cmd *importCmd) Execute(args []string) error {
 // or from the main interrupt handler. This is necessary since the main
 // goroutine must be kept running long enough for the interrupt handler
 // goroutine to finish.
-go func() {
+spawn(func() {
 log.Info("Starting import")
 resultsChan := importer.Import()
 results := <-resultsChan
@@ -382,7 +382,7 @@ func (cmd *importCmd) Execute(args []string) error {
 results.blocksImported,
 results.blocksProcessed-results.blocksImported)
 shutdownChannel <- nil
-}()
+})
 // Wait for shutdown signal from either a normal completion or from the
 // interrupt handler.


@@ -13,6 +13,7 @@ import (
 "github.com/btcsuite/btclog"
 "github.com/daglabs/btcd/database"
 "github.com/daglabs/btcd/logger"
+"github.com/daglabs/btcd/util/panics"
 flags "github.com/jessevdk/go-flags"
 )
@@ -23,6 +24,7 @@ const (
 var (
 log btclog.Logger
+spawn = panics.GoroutineWrapperFunc(log)
 shutdownChannel = make(chan error)
 )


@@ -51,9 +51,9 @@ func mainInterruptHandler() {
 }
 // Signal the main goroutine to shutdown.
-go func() {
+spawn(func() {
 shutdownChannel <- nil
-}()
+})
 case handler := <-addHandlerChannel:
 // The shutdown signal has already been received, so
@@ -75,7 +75,7 @@ func addInterruptHandler(handler func()) {
 if interruptChannel == nil {
 interruptChannel = make(chan os.Signal, 1)
 signal.Notify(interruptChannel, os.Interrupt)
-go mainInterruptHandler()
+spawn(mainInterruptHandler)
 }
 addHandlerChannel <- handler


@@ -14,6 +14,7 @@ import (
 "time"
 "github.com/daglabs/btcd/dagconfig/daghash"
+"github.com/daglabs/btcd/util/panics"
 "github.com/daglabs/btcd/connmgr"
 "github.com/daglabs/btcd/peer"
@@ -155,6 +156,7 @@ func creep() {
 }
 func main() {
+defer panics.HandlePanic(log)
 cfg, err := loadConfig()
 if err != nil {
 fmt.Fprintf(os.Stderr, "loadConfig: %v\n", err)
@@ -193,11 +195,11 @@ func main() {
 }
 wg.Add(1)
-go creep()
+spawn(creep)
 dnsServer := NewDNSServer(cfg.Host, cfg.Nameserver, cfg.Listen)
 wg.Add(1)
-go dnsServer.Start()
+spawn(dnsServer.Start)
 defer func() {
 log.Infof("Gracefully shutting down the seeder...")


@@ -2,10 +2,12 @@ package main
 import (
 "fmt"
-"github.com/btcsuite/btclog"
-"github.com/jrick/logrotate/rotator"
 "os"
 "path/filepath"
+"github.com/btcsuite/btclog"
+"github.com/daglabs/btcd/util/panics"
+"github.com/jrick/logrotate/rotator"
 )
 type logWriter struct{}
@@ -22,6 +24,7 @@ var (
 backendLog = btclog.NewBackend(logWriter{})
 LogRotator *rotator.Rotator
 log = backendLog.Logger("SEED")
+spawn = panics.GoroutineWrapperFunc(log)
 initiated = false
 )


@@ -137,7 +137,7 @@ func NewManager(dataDir string) (*Manager, error) {
 }
 amgr.wg.Add(1)
-go amgr.addressHandler()
+spawn(amgr.addressHandler)
 return &amgr, nil
 }

log.go

@@ -7,7 +7,9 @@ package main
 import (
 "github.com/daglabs/btcd/logger"
+"github.com/daglabs/btcd/util/panics"
 )
 var btcdLog, _ = logger.Get(logger.SubsystemTags.BTCD)
+var spawn = panics.GoroutineWrapperFunc(btcdLog)
 var srvrLog, _ = logger.Get(logger.SubsystemTags.SRVR)


@@ -372,7 +372,9 @@ func (m *CPUMiner) miningWorkerController() {
 runningWorkers = append(runningWorkers, quit)
 m.workerWg.Add(1)
-go m.generateBlocks(quit)
+spawn(func() {
+m.generateBlocks(quit)
+})
 }
 }
@@ -437,8 +439,8 @@ func (m *CPUMiner) Start() {
 m.quit = make(chan struct{})
 m.speedMonitorQuit = make(chan struct{})
 m.wg.Add(2)
-go m.speedMonitor()
-go m.miningWorkerController()
+spawn(m.speedMonitor)
+spawn(m.miningWorkerController)
 m.started = true
 log.Infof("CPU miner started, number of workers %d", m.numWorkers)
@@ -552,7 +554,7 @@ func (m *CPUMiner) GenerateNBlocks(n uint32) ([]*daghash.Hash, error) {
 m.speedMonitorQuit = make(chan struct{})
 m.wg.Add(1)
-go m.speedMonitor()
+spawn(m.speedMonitor)
 m.Unlock()


@@ -7,13 +7,16 @@ package cpuminer
 import (
 "github.com/btcsuite/btclog"
 "github.com/daglabs/btcd/logger"
+"github.com/daglabs/btcd/util/panics"
 )
 // log is a logger that is initialized with no output filters. This
 // means the package will not perform any logging by default until the caller
 // requests it.
 var log btclog.Logger
+var spawn func(func())
 func init() {
 log, _ = logger.Get(logger.SubsystemTags.MINR)
+spawn = panics.GoroutineWrapperFunc(log)
 }


@@ -2,11 +2,13 @@ package main
 import (
 "fmt"
-"github.com/btcsuite/btclog"
-"github.com/daglabs/btcd/rpcclient"
-"github.com/jrick/logrotate/rotator"
 "os"
 "path/filepath"
+"github.com/btcsuite/btclog"
+"github.com/daglabs/btcd/rpcclient"
+"github.com/daglabs/btcd/util/panics"
+"github.com/jrick/logrotate/rotator"
 )
 type logWriter struct{}
@@ -23,6 +25,7 @@ var (
 backendLog = btclog.NewBackend(logWriter{})
 LogRotator *rotator.Rotator
 log = backendLog.Logger("MNSM")
+spawn = panics.GoroutineWrapperFunc(log)
 initiated = false
 )


@@ -3,13 +3,13 @@ package main
 import (
 "fmt"
 "os"
-"runtime/debug"
 "github.com/daglabs/btcd/signal"
+"github.com/daglabs/btcd/util/panics"
 )
 func main() {
-defer handlePanic()
+defer panics.HandlePanic(log)
 cfg, err := parseConfig()
 if err != nil {
 fmt.Fprintf(os.Stderr, "Error parsing command-line arguments: %s", err)
@@ -31,12 +31,12 @@ func main() {
 }
 defer disconnect(clients)
-go func() {
+spawn(func() {
 err = mineLoop(clients)
 if err != nil {
 panic(fmt.Errorf("Error in main loop: %s", err))
 }
-}()
+})
 interrupt := signal.InterruptListener()
 <-interrupt
@@ -47,11 +47,3 @@ func disconnect(clients []*simulatorClient) {
 client.Disconnect()
 }
 }
-func handlePanic() {
-err := recover()
-if err != nil {
-log.Errorf("Fatal error: %s", err)
-log.Errorf("Stack trace: %s", debug.Stack())
-}
-}


@@ -164,7 +164,7 @@ func mineLoop(clients []*simulatorClient) error {
 templateStopChan := make(chan struct{})
-go func() {
+spawn(func() {
 for {
 currentClient := getRandomClient(clients)
 currentClient.notifyForNewBlocks = true
@@ -182,7 +182,7 @@ func mineLoop(clients []*simulatorClient) error {
 return
 }
 }
-}()
+})
 err := <-errChan


@@ -7,13 +7,16 @@ package netsync
 import (
 "github.com/btcsuite/btclog"
 "github.com/daglabs/btcd/logger"
+"github.com/daglabs/btcd/util/panics"
 )
 // log is a logger that is initialized with no output filters. This
 // means the package will not perform any logging by default until the caller
 // requests it.
 var log btclog.Logger
+var spawn func(func())
 func init() {
 log, _ = logger.Get(logger.SubsystemTags.SYNC)
+spawn = panics.GoroutineWrapperFunc(log)
 }


@@ -1231,13 +1231,13 @@ func (sm *SyncManager) handleBlockDAGNotification(notification *blockdag.Notific
 // Update mempool
 ch := make(chan mempool.NewBlockMsg)
-go func() {
+spawn(func() {
 err := sm.txMemPool.HandleNewBlock(block, ch)
 close(ch)
 if err != nil {
 panic(fmt.Sprintf("HandleNewBlock failed to handle block %s", block.Hash()))
 }
-}()
+})
 for msg := range ch {
 sm.peerNotifier.TransactionConfirmed(msg.Tx)
 sm.peerNotifier.AnnounceNewTransactions(msg.AcceptedTxs)
@@ -1336,7 +1336,7 @@ func (sm *SyncManager) Start() {
 log.Trace("Starting sync manager")
 sm.wg.Add(1)
-go sm.blockHandler()
+spawn(sm.blockHandler)
 }
 // Stop gracefully shuts down the sync manager by stopping all asynchronous


@@ -13,6 +13,7 @@ import (
 "github.com/daglabs/btcd/dagconfig/daghash"
 "github.com/daglabs/btcd/logger"
 "github.com/daglabs/btcd/txscript"
+"github.com/daglabs/btcd/util/panics"
 "github.com/daglabs/btcd/wire"
 )
@@ -26,10 +27,12 @@ const (
 // means the package will not perform any logging by default until the caller
 // requests it.
 var log btclog.Logger
+var spawn func(func())
 // The default amount of logging is none.
 func init() {
 log, _ = logger.Get(logger.SubsystemTags.PEER)
+spawn = panics.GoroutineWrapperFunc(log)
 }
 // LogClosure is a closure that can be printed with %s to be used to


@@ -1868,9 +1868,9 @@ func (p *Peer) QueueMessage(msg wire.Message, doneChan chan<- struct{}) {
 // it is marked as disconnected and *then* it drains the channels.
 if !p.Connected() {
 if doneChan != nil {
-go func() {
+spawn(func() {
 doneChan <- struct{}{}
-}()
+})
 }
 return
 }
@@ -1925,12 +1925,12 @@ func (p *Peer) AssociateConnection(conn net.Conn) {
 p.na = na
 }
-go func() {
+spawn(func() {
 if err := p.start(); err != nil {
 log.Debugf("Cannot start peer %s: %s", p, err)
 p.Disconnect()
 }
-}()
+})
 }
 // Connected returns whether or not the peer is currently connected.
@@ -1961,13 +1961,13 @@ func (p *Peer) start() error {
 log.Tracef("Starting peer %s", p)
 negotiateErr := make(chan error, 1)
-go func() {
+spawn(func() {
 if p.inbound {
 negotiateErr <- p.negotiateInboundProtocol()
 } else {
 negotiateErr <- p.negotiateOutboundProtocol()
 }
-}()
+})
 // Negotiate the protocol within the specified negotiateTimeout.
 select {
@@ -1982,11 +1982,11 @@ func (p *Peer) start() error {
 // The protocol has been negotiated successfully so start processing input
 // and output messages.
-go p.stallHandler()
-go p.inHandler()
-go p.queueHandler()
-go p.outHandler()
-go p.pingHandler()
+spawn(p.stallHandler)
+spawn(p.inHandler)
+spawn(p.queueHandler)
+spawn(p.outHandler)
+spawn(p.pingHandler)
 // Send our verack message now that the IO processing machinery has started.
 p.QueueMessage(wire.NewMsgVerAck(), nil)


@@ -650,7 +650,7 @@ out:
 // Reissue pending requests in another goroutine since
 // the send can block.
-go c.resendRequests()
+spawn(c.resendRequests)
 // Break out of the reconnect loop back to wait for
 // disconnect again.
@@ -815,7 +815,7 @@ func (c *Client) sendRequest(data *jsonRequestData) chan *response {
 } else {
 jReq.responseChan = responseChan
 }
-go func() {
+spawn(func() {
 // Choose which marshal and send function to use depending on whether
 // the client running in HTTP POST mode or not. When running in HTTP
 // POST mode, the command is issued via an HTTP client. Otherwise,
@@ -844,16 +844,16 @@ func (c *Client) sendRequest(data *jsonRequestData) chan *response {
 }
 log.Tracef("Sending command [%s] with id %d", jReq.method, jReq.id)
 c.sendMessage(jReq.marshalledJSON)
-}()
+})
 if cancelOnTimeout {
-go func() {
+spawn(func() {
 select {
 case <-time.Tick(c.config.RequestTimeout):
 responseChan <- &response{err: ErrResponseTimedOut}
 case resp := <-jReq.responseChan:
 responseChan <- resp
 }
-}()
+})
 }
 return responseChan
 }
@@ -1021,19 +1021,19 @@ func (c *Client) start() {
 // in HTTP POST mode or the default websocket mode.
 if c.config.HTTPPostMode {
 c.wg.Add(1)
-go c.sendPostHandler()
+spawn(c.sendPostHandler)
 } else {
 c.wg.Add(3)
-go func() {
+spawn(func() {
 if c.ntfnHandlers != nil {
 if c.ntfnHandlers.OnClientConnected != nil {
 c.ntfnHandlers.OnClientConnected()
 }
 }
 c.wg.Done()
-}()
-go c.wsInHandler()
-go c.wsOutHandler()
+})
+spawn(c.wsInHandler)
+spawn(c.wsOutHandler)
 }
 }
@@ -1269,7 +1269,7 @@ func New(config *ConnConfig, ntfnHandlers *NotificationHandlers) (*Client, error
 client.start()
 if !client.config.HTTPPostMode && !client.config.DisableAutoReconnect {
 client.wg.Add(1)
-go client.wsReconnectHandler()
+spawn(client.wsReconnectHandler)
 }
 }
@@ -1324,7 +1324,7 @@ func (c *Client) Connect(tries int) error {
 c.start()
 if !c.config.DisableAutoReconnect {
 c.wg.Add(1)
-go c.wsReconnectHandler()
+spawn(c.wsReconnectHandler)
 }
 return nil
 }


@@ -6,12 +6,14 @@ package rpcclient
 import (
 "github.com/btcsuite/btclog"
+"github.com/daglabs/btcd/util/panics"
 )
 // log is a logger that is initialized with no output filters. This
 // means the package will not perform any logging by default until the caller
 // requests it.
 var log btclog.Logger
+var spawn func(func())
 // The default amount of logging is none.
 func init() {
@@ -22,11 +24,13 @@ func init() {
 // by default until UseLogger is called.
 func DisableLog() {
 log = btclog.Disabled
+spawn = panics.GoroutineWrapperFunc(log)
 }
 // UseLogger uses a specified Logger to output package logging info.
 func UseLogger(logger btclog.Logger) {
 log = logger
+spawn = panics.GoroutineWrapperFunc(log)
 }
 // LogClosure is a closure that can be printed with %s to be used to


@@ -7,12 +7,14 @@ package server
 import (
 "github.com/btcsuite/btclog"
 "github.com/daglabs/btcd/logger"
+"github.com/daglabs/btcd/util/panics"
 )
 // log is a logger that is initialized with no output filters. This
 // means the package will not perform any logging by default until the caller
 // requests it.
 var srvrLog, peerLog, txmpLog, indxLog, rpcsLog, amgrLog btclog.Logger
+var spawn func(func())
 func init() {
 srvrLog, _ = logger.Get(logger.SubsystemTags.SRVR)
@@ -21,4 +23,6 @@ func init() {
 indxLog, _ = logger.Get(logger.SubsystemTags.INDX)
 rpcsLog, _ = logger.Get(logger.SubsystemTags.RPCS)
 amgrLog, _ = logger.Get(logger.SubsystemTags.AMGR)
+spawn = panics.GoroutineWrapperFunc(srvrLog)
 }


@@ -7,16 +7,20 @@ package p2p
 import (
 "github.com/btcsuite/btclog"
 "github.com/daglabs/btcd/logger"
+"github.com/daglabs/btcd/util/panics"
 )
 // log is a logger that is initialized with no output filters. This
 // means the package will not perform any logging by default until the caller
 // requests it.
 var srvrLog, peerLog, txmpLog, indxLog, rpcsLog, amgrLog btclog.Logger
+var spawn func(func())
 func init() {
 srvrLog, _ = logger.Get(logger.SubsystemTags.SRVR)
 peerLog, _ = logger.Get(logger.SubsystemTags.PEER)
+spawn = panics.GoroutineWrapperFunc(peerLog)
 txmpLog, _ = logger.Get(logger.SubsystemTags.TXMP)
 indxLog, _ = logger.Get(logger.SubsystemTags.INDX)
 rpcsLog, _ = logger.Get(logger.SubsystemTags.RPCS)


@@ -1826,7 +1826,9 @@ func (s *Server) inboundPeerConnected(conn net.Conn) {
 sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())
 sp.Peer = peer.NewInboundPeer(newPeerConfig(sp))
 sp.AssociateConnection(conn)
-go s.peerDoneHandler(sp)
+spawn(func() {
+s.peerDoneHandler(sp)
+})
 }
 // outboundPeerConnected is invoked by the connection manager when a new
@@ -1845,7 +1847,9 @@ func (s *Server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) {
 sp.connReq = c
 sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())
 sp.AssociateConnection(conn)
-go s.peerDoneHandler(sp)
+spawn(func() {
+s.peerDoneHandler(sp)
+})
 s.addrManager.Attempt(sp.NA())
 }
@@ -1911,7 +1915,7 @@ func (s *Server) peerHandler() {
 seedFromSubNetwork(config.MainConfig().SubnetworkID)
 }
 }
-go s.connManager.Start()
+spawn(s.connManager.Start)
 out:
 for {
@@ -2109,11 +2113,11 @@ func (s *Server) Start() {
 // Start the peer handler which in turn starts the address and block
 // managers.
 s.wg.Add(1)
-go s.peerHandler()
+spawn(s.peerHandler)
 if s.nat != nil {
 s.wg.Add(1)
-go s.upnpUpdateThread()
+spawn(s.upnpUpdateThread)
 }
 cfg := config.MainConfig()
@@ -2123,7 +2127,7 @@ func (s *Server) Start() {
 // Start the rebroadcastHandler, which ensures user tx received by
 // the RPC server are rebroadcast until being included in a block.
-go s.rebroadcastHandler()
+spawn(s.rebroadcastHandler)
 }
 }
@@ -2158,7 +2162,7 @@ func (s *Server) ScheduleShutdown(duration time.Duration) {
 return
 }
 srvrLog.Warnf("Server shutdown in %s", duration)
-go func() {
+spawn(func() {
 remaining := duration
 tickDuration := dynamicTickDuration(remaining)
 done := time.After(remaining)
@@ -2186,7 +2190,7 @@ func (s *Server) ScheduleShutdown(duration time.Duration) {
 srvrLog.Warnf("Server shutdown in %s", remaining)
 }
 }
-}()
+})
 }
 // ParseListeners determines whether each listen address is IPv4 and IPv6 and


@@ -7,13 +7,16 @@ package rpc
 import (
 "github.com/btcsuite/btclog"
 "github.com/daglabs/btcd/logger"
+"github.com/daglabs/btcd/util/panics"
 )
 // log is a logger that is initialized with no output filters. This
 // means the package will not perform any logging by default until the caller
 // requests it.
 var log btclog.Logger
+var spawn func(func())
 func init() {
 log, _ = logger.Get(logger.SubsystemTags.RPCS)
+spawn = panics.GoroutineWrapperFunc(log)
 }


@@ -1503,12 +1503,12 @@ func (state *gbtWorkState) notifyLongPollers(tipHashes []*daghash.Hash, lastGene
 // clients with a new block template when their existing block template is
 // stale due to the newly added block.
 func (state *gbtWorkState) NotifyBlockAdded(tipHashes []*daghash.Hash) {
-go func() {
+spawn(func() {
 state.Lock()
 defer state.Unlock()
 state.notifyLongPollers(tipHashes, state.lastTxUpdate)
-}()
+})
 }
 // NotifyMempoolTx uses the new last updated time for the transaction memory
@@ -1516,7 +1516,7 @@ func (state *gbtWorkState) NotifyBlockAdded(tipHashes []*daghash.Hash) {
 // existing block template is stale due to enough time passing and the contents
 // of the memory pool changing.
 func (state *gbtWorkState) NotifyMempoolTx(lastUpdated time.Time) {
-go func() {
+spawn(func() {
 state.Lock()
 defer state.Unlock()
@@ -1531,7 +1531,7 @@ func (state *gbtWorkState) NotifyMempoolTx(lastUpdated time.Time) {
 state.notifyLongPollers(state.tipHashes, lastUpdated)
 }
-}()
+})
 }
 // templateUpdateChan returns a channel that will be closed once the block
@@ -3918,12 +3918,12 @@ func (s *Server) jsonRPCRead(w http.ResponseWriter, r *http.Request, isAdmin boo
 // Setup a close notifier. Since the connection is hijacked,
 // the CloseNotifer on the ResponseWriter is not available.
 closeChan := make(chan struct{}, 1)
-go func() {
+spawn(func() {
 _, err := conn.Read(make([]byte, 1))
 if err != nil {
 close(closeChan)
 }
-}()
+})
 // Check if the user is limited and set error if method unauthorized
 if !isAdmin {


@@ -812,8 +812,8 @@ func (m *wsNotificationManager) RemoveClient(wsc *wsClient) {
 // websocket client notifications.
 func (m *wsNotificationManager) Start() {
 m.wg.Add(2)
-go m.queueHandler()
-go m.notificationHandler()
+spawn(m.queueHandler)
+spawn(m.notificationHandler)
 }
 // WaitForShutdown blocks until all notification manager goroutines have
@@ -1078,10 +1078,10 @@ out:
 // read of the next request from the websocket client and allow
 // many requests to be waited on concurrently.
 c.serviceRequestSem.acquire()
-go func() {
+spawn(func() {
 c.serviceRequest(cmd)
 c.serviceRequestSem.release()
-}()
+})
 }
 // Ensure the connection is closed.
@@ -1303,9 +1303,9 @@ func (c *wsClient) Start() {
 // Start processing input and output.
 c.wg.Add(3)
-go c.inHandler()
-go c.notificationQueueHandler()
-go c.outHandler()
+spawn(c.inHandler)
+spawn(c.notificationQueueHandler)
+spawn(c.outHandler)
 }
 // WaitForShutdown blocks until the websocket client goroutines are stopped


@@ -132,10 +132,10 @@ func NewServer(listenAddrs []string, db database.DB, dagParams *dagconfig.Params
 }
 // Signal process shutdown when the RPC server requests it.
-go func() {
+spawn(func() {
 <-s.rpcServer.RequestedProcessShutdown()
 signal.ShutdownRequestChannel <- struct{}{}
-}()
+})
 }
 return s, nil


@@ -67,10 +67,10 @@ func (s *btcdService) Execute(args []string, r <-chan svc.ChangeRequest, changes
 // it is started so it can be gracefully stopped.
 doneChan := make(chan error)
 serverChan := make(chan *server.Server)
-go func() {
+spawn(func() {
 err := btcdMain(serverChan)
 doneChan <- err
-}()
+})
 // Service is now started.
 changes <- svc.Status{State: svc.Running, Accepts: cmdsAccepted}

util/panics/panics.go (new file)

@@ -0,0 +1,31 @@
package panics
import (
"os"
"runtime/debug"
"github.com/btcsuite/btclog"
"github.com/daglabs/btcd/logger"
)
// HandlePanic recovers panics, logs them, and then exits the process.
func HandlePanic(log btclog.Logger) {
if err := recover(); err != nil {
log.Criticalf("Fatal error: %s", err)
log.Criticalf("Stack trace: %s", debug.Stack())
if logger.LogRotator != nil {
logger.LogRotator.Close()
}
os.Exit(1)
}
}
// GoroutineWrapperFunc returns a goroutine wrapper function that handles panics and writes them to the log.
func GoroutineWrapperFunc(log btclog.Logger) func(func()) {
return func(f func()) {
go func() {
defer HandlePanic(log)
f()
}()
}
}
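
For reference, a minimal, hypothetical program using the new package might look like the following. The logger wiring mirrors the dnsseeder and mining-simulator log.go files above; the deliberate panic and the "EXMP" subsystem tag are illustration only, while btclog.NewBackend, panics.HandlePanic, and panics.GoroutineWrapperFunc are the real APIs used in this commit.

package main

import (
	"os"
	"time"

	"github.com/btcsuite/btclog"
	"github.com/daglabs/btcd/util/panics"
)

var (
	backendLog = btclog.NewBackend(os.Stdout)
	log        = backendLog.Logger("EXMP")
	spawn      = panics.GoroutineWrapperFunc(log)
)

func main() {
	// Replaces the hand-rolled handlePanic helpers removed above:
	// a panic on the main goroutine is logged before the process exits.
	defer panics.HandlePanic(log)

	spawn(func() {
		// A panic here is recovered by the wrapper, logged via log,
		// the log rotator (if any) is closed, and os.Exit(1) is called.
		panic("something went wrong")
	})

	time.Sleep(time.Second)
}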