From b7b41f1a941b09c666055cbf5d7df71a31d6d026 Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Tue, 7 May 2019 16:13:07 +0300 Subject: [PATCH] [NOD-159] Wrap all goroutines to handle panics (#290) * [NOD-159] Wrap all goroutines to handle panics * [NOD-159] Fix gofmt errors * [NOD-159] Add comment to HandlePanic * [NOD-159] Merge panics and gowrapper packages * [NOD-159] Added missing initialization --- addrmgr/addrmanager.go | 2 +- addrmgr/log.go | 3 +++ blockdag/indexers/log.go | 3 +++ blockdag/log.go | 3 +++ blockdag/scriptval.go | 2 +- btcd.go | 12 ++++------- cmd/addblock/addblock.go | 7 ++++-- cmd/addblock/import.go | 12 ++++++----- cmd/txgen/main.go | 12 ++--------- connmgr/connmanager.go | 6 +++--- connmgr/log.go | 4 +++- database/cmd/dbtool/insecureimport.go | 12 +++++------ database/cmd/dbtool/main.go | 2 ++ database/cmd/dbtool/signal.go | 6 +++--- dnsseeder/dnsseed.go | 6 ++++-- dnsseeder/log.go | 7 ++++-- dnsseeder/manager.go | 2 +- log.go | 2 ++ mining/cpuminer/cpuminer.go | 10 +++++---- mining/cpuminer/log.go | 3 +++ mining/simulator/log.go | 9 +++++--- mining/simulator/main.go | 16 ++++---------- mining/simulator/mineloop.go | 4 ++-- netsync/log.go | 3 +++ netsync/manager.go | 6 +++--- peer/log.go | 3 +++ peer/peer.go | 22 +++++++++---------- rpcclient/infrastructure.go | 24 ++++++++++----------- rpcclient/log.go | 4 ++++ server/log.go | 4 ++++ server/p2p/log.go | 4 ++++ server/p2p/p2p.go | 20 ++++++++++------- server/rpc/log.go | 3 +++ server/rpc/rpcserver.go | 12 +++++------ server/rpc/rpcwebsocket.go | 14 ++++++------ server/server.go | 4 ++-- service_windows.go | 4 ++-- util/panics/panics.go | 31 +++++++++++++++++++++++++++ 38 files changed, 186 insertions(+), 117 deletions(-) create mode 100644 util/panics/panics.go diff --git a/addrmgr/addrmanager.go b/addrmgr/addrmanager.go index 82d4e60f3..6bd848b0c 100644 --- a/addrmgr/addrmanager.go +++ b/addrmgr/addrmanager.go @@ -724,7 +724,7 @@ func (a *AddrManager) Start() { // Start the address ticker to save addresses periodically. a.wg.Add(1) - go a.addressHandler() + spawn(a.addressHandler) } // Stop gracefully shuts down the address manager by stopping the main handler. diff --git a/addrmgr/log.go b/addrmgr/log.go index af1db624d..19ec5ea94 100644 --- a/addrmgr/log.go +++ b/addrmgr/log.go @@ -7,13 +7,16 @@ package addrmgr import ( "github.com/btcsuite/btclog" "github.com/daglabs/btcd/logger" + "github.com/daglabs/btcd/util/panics" ) // log is a logger that is initialized with no output filters. This // means the package will not perform any logging by default until the caller // requests it. var log btclog.Logger +var spawn func(func()) func init() { log, _ = logger.Get(logger.SubsystemTags.ADXR) + spawn = panics.GoroutineWrapperFunc(log) } diff --git a/blockdag/indexers/log.go b/blockdag/indexers/log.go index 2a09287be..23d33cce7 100644 --- a/blockdag/indexers/log.go +++ b/blockdag/indexers/log.go @@ -7,14 +7,17 @@ package indexers import ( "github.com/btcsuite/btclog" "github.com/daglabs/btcd/logger" + "github.com/daglabs/btcd/util/panics" ) // log is a logger that is initialized with no output filters. This // means the package will not perform any logging by default until the caller // requests it. var log btclog.Logger +var spawn func(func()) // The default amount of logging is none. func init() { log, _ = logger.Get(logger.SubsystemTags.INDX) + spawn = panics.GoroutineWrapperFunc(log) } diff --git a/blockdag/log.go b/blockdag/log.go index a550d95c5..8a3c8c831 100644 --- a/blockdag/log.go +++ b/blockdag/log.go @@ -7,14 +7,17 @@ package blockdag import ( "github.com/btcsuite/btclog" "github.com/daglabs/btcd/logger" + "github.com/daglabs/btcd/util/panics" ) // log is a logger that is initialized with no output filters. This // means the package will not perform any logging by default until the caller // requests it. var log btclog.Logger +var spawn func(func()) // The default amount of logging is none. func init() { log, _ = logger.Get(logger.SubsystemTags.CHAN) + spawn = panics.GoroutineWrapperFunc(log) } diff --git a/blockdag/scriptval.go b/blockdag/scriptval.go index ead9d149d..522bd6477 100644 --- a/blockdag/scriptval.go +++ b/blockdag/scriptval.go @@ -127,7 +127,7 @@ func (v *txValidator) Validate(items []*txValidateItem) error { // Start up validation handlers that are used to asynchronously // validate each transaction input. for i := 0; i < maxGoRoutines; i++ { - go v.validateHandler() + spawn(v.validateHandler) } // Validate each of the inputs. The quit channel is closed when any diff --git a/btcd.go b/btcd.go index 26df72531..3ad9d4962 100644 --- a/btcd.go +++ b/btcd.go @@ -21,10 +21,10 @@ import ( "github.com/daglabs/btcd/database" _ "github.com/daglabs/btcd/database/ffldb" "github.com/daglabs/btcd/limits" - "github.com/daglabs/btcd/logger" "github.com/daglabs/btcd/server" "github.com/daglabs/btcd/signal" "github.com/daglabs/btcd/util/fs" + "github.com/daglabs/btcd/util/panics" "github.com/daglabs/btcd/version" ) @@ -56,11 +56,7 @@ func btcdMain(serverChan chan<- *server.Server) error { return err } cfg = config.MainConfig() - defer func() { - if logger.LogRotator != nil { - logger.LogRotator.Close() - } - }() + defer panics.HandlePanic(btcdLog) // Get a channel that will be closed when a shutdown signal has been // triggered either from an OS signal such as SIGINT (Ctrl+C) or from @@ -73,14 +69,14 @@ func btcdMain(serverChan chan<- *server.Server) error { // Enable http profiling server if requested. if cfg.Profile != "" { - go func() { + spawn(func() { listenAddr := net.JoinHostPort("", cfg.Profile) btcdLog.Infof("Profile server listening on %s", listenAddr) profileRedirect := http.RedirectHandler("/debug/pprof", http.StatusSeeOther) http.Handle("/", profileRedirect) btcdLog.Errorf("%s", http.ListenAndServe(listenAddr, nil)) - }() + }) } // Write cpu profile if requested. diff --git a/cmd/addblock/addblock.go b/cmd/addblock/addblock.go index c2ac9a9a3..7a5d3f61f 100644 --- a/cmd/addblock/addblock.go +++ b/cmd/addblock/addblock.go @@ -12,6 +12,7 @@ import ( "github.com/btcsuite/btclog" "github.com/daglabs/btcd/database" "github.com/daglabs/btcd/limits" + "github.com/daglabs/btcd/util/panics" ) const ( @@ -20,8 +21,9 @@ const ( ) var ( - cfg *config - log btclog.Logger + cfg *config + log btclog.Logger + spawn func(func()) ) // loadBlockDB opens the block database and returns a handle to it. @@ -70,6 +72,7 @@ func realMain() error { backendLogger := btclog.NewBackend(os.Stdout) defer os.Stdout.Sync() log = backendLogger.Logger("MAIN") + spawn = panics.GoroutineWrapperFunc(log) // Load the block database. db, err := loadBlockDB() diff --git a/cmd/addblock/import.go b/cmd/addblock/import.go index cb76c7d23..136f08698 100644 --- a/cmd/addblock/import.go +++ b/cmd/addblock/import.go @@ -273,20 +273,22 @@ func (bi *blockImporter) Import() chan *importResults { // Start up the read and process handling goroutines. This setup allows // blocks to be read from disk in parallel while being processed. bi.wg.Add(2) - go bi.readHandler() - go bi.processHandler() + spawn(bi.readHandler) + spawn(bi.processHandler) // Wait for the import to finish in a separate goroutine and signal // the status handler when done. - go func() { + spawn(func() { bi.wg.Wait() bi.doneChan <- true - }() + }) // Start the status handler and return the result channel that it will // send the results on when the import is done. resultChan := make(chan *importResults) - go bi.statusHandler(resultChan) + spawn(func() { + bi.statusHandler(resultChan) + }) return resultChan } diff --git a/cmd/txgen/main.go b/cmd/txgen/main.go index 380aaf00c..a216e6825 100644 --- a/cmd/txgen/main.go +++ b/cmd/txgen/main.go @@ -2,7 +2,6 @@ package main import ( "fmt" - "runtime/debug" "sync/atomic" "github.com/daglabs/btcd/btcec" @@ -11,6 +10,7 @@ import ( "github.com/daglabs/btcd/signal" "github.com/daglabs/btcd/util" "github.com/daglabs/btcd/util/base58" + "github.com/daglabs/btcd/util/panics" ) var ( @@ -31,7 +31,7 @@ func privateKeyToP2pkhAddress(key *btcec.PrivateKey, net *dagconfig.Params) (uti } func main() { - defer handlePanic() + defer panics.HandlePanic(log) cfg, err := parseConfig() if err != nil { @@ -75,11 +75,3 @@ func disconnect(clients []*rpcclient.Client) { client.Disconnect() } } - -func handlePanic() { - err := recover() - if err != nil { - log.Errorf("Fatal error: %s", err) - log.Errorf("Stack trace: %s", debug.Stack()) - } -} diff --git a/connmgr/connmanager.go b/connmgr/connmanager.go index 761e849cc..0bd5b221d 100644 --- a/connmgr/connmanager.go +++ b/connmgr/connmanager.go @@ -214,7 +214,7 @@ func (cm *ConnManager) handleFailedConn(c *ConnReq) { cm.NewConnReq() }) } else { - go cm.NewConnReq() + spawn(cm.NewConnReq) } } } @@ -507,7 +507,7 @@ func (cm *ConnManager) Start() { log.Trace("Connection manager started") cm.wg.Add(1) - go cm.connHandler() + spawn(cm.connHandler) // Start all the listeners so long as the caller requested them and // provided a callback to be invoked when connections are accepted. @@ -519,7 +519,7 @@ func (cm *ConnManager) Start() { } for i := atomic.LoadUint64(&cm.connReqCount); i < uint64(cm.cfg.TargetOutbound); i++ { - go cm.NewConnReq() + spawn(cm.NewConnReq) } } diff --git a/connmgr/log.go b/connmgr/log.go index c0125ef8e..bdcb9f243 100644 --- a/connmgr/log.go +++ b/connmgr/log.go @@ -7,15 +7,17 @@ package connmgr import ( "github.com/btcsuite/btclog" "github.com/daglabs/btcd/logger" + "github.com/daglabs/btcd/util/panics" ) // log is a logger that is initialized with no output filters. This // means the package will not perform any logging by default until the caller // requests it. var log btclog.Logger +var spawn func(func()) // The default amount of logging is none. func init() { log, _ = logger.Get(logger.SubsystemTags.CMGR) - + spawn = panics.GoroutineWrapperFunc(log) } diff --git a/database/cmd/dbtool/insecureimport.go b/database/cmd/dbtool/insecureimport.go index 843fc977a..43d0cc878 100644 --- a/database/cmd/dbtool/insecureimport.go +++ b/database/cmd/dbtool/insecureimport.go @@ -288,15 +288,15 @@ func (bi *blockImporter) Import() chan *importResults { // Start up the read and process handling goroutines. This setup allows // blocks to be read from disk in parallel while being processed. bi.wg.Add(2) - go bi.readHandler() - go bi.processHandler() + spawn(bi.readHandler) + spawn(bi.processHandler) // Wait for the import to finish in a separate goroutine and signal // the status handler when done. - go func() { + spawn(func() { bi.wg.Wait() bi.doneChan <- true - }() + }) // Start the status handler and return the result channel that it will // send the results on when the import is done. @@ -365,7 +365,7 @@ func (cmd *importCmd) Execute(args []string) error { // or from the main interrupt handler. This is necessary since the main // goroutine must be kept running long enough for the interrupt handler // goroutine to finish. - go func() { + spawn(func() { log.Info("Starting import") resultsChan := importer.Import() results := <-resultsChan @@ -382,7 +382,7 @@ func (cmd *importCmd) Execute(args []string) error { results.blocksImported, results.blocksProcessed-results.blocksImported) shutdownChannel <- nil - }() + }) // Wait for shutdown signal from either a normal completion or from the // interrupt handler. diff --git a/database/cmd/dbtool/main.go b/database/cmd/dbtool/main.go index 20424b3f5..d94c839f1 100644 --- a/database/cmd/dbtool/main.go +++ b/database/cmd/dbtool/main.go @@ -13,6 +13,7 @@ import ( "github.com/btcsuite/btclog" "github.com/daglabs/btcd/database" "github.com/daglabs/btcd/logger" + "github.com/daglabs/btcd/util/panics" flags "github.com/jessevdk/go-flags" ) @@ -23,6 +24,7 @@ const ( var ( log btclog.Logger + spawn = panics.GoroutineWrapperFunc(log) shutdownChannel = make(chan error) ) diff --git a/database/cmd/dbtool/signal.go b/database/cmd/dbtool/signal.go index a32f1b376..0966cacf5 100644 --- a/database/cmd/dbtool/signal.go +++ b/database/cmd/dbtool/signal.go @@ -51,9 +51,9 @@ func mainInterruptHandler() { } // Signal the main goroutine to shutdown. - go func() { + spawn(func() { shutdownChannel <- nil - }() + }) case handler := <-addHandlerChannel: // The shutdown signal has already been received, so @@ -75,7 +75,7 @@ func addInterruptHandler(handler func()) { if interruptChannel == nil { interruptChannel = make(chan os.Signal, 1) signal.Notify(interruptChannel, os.Interrupt) - go mainInterruptHandler() + spawn(mainInterruptHandler) } addHandlerChannel <- handler diff --git a/dnsseeder/dnsseed.go b/dnsseeder/dnsseed.go index c61a6d051..b241305b0 100644 --- a/dnsseeder/dnsseed.go +++ b/dnsseeder/dnsseed.go @@ -14,6 +14,7 @@ import ( "time" "github.com/daglabs/btcd/dagconfig/daghash" + "github.com/daglabs/btcd/util/panics" "github.com/daglabs/btcd/connmgr" "github.com/daglabs/btcd/peer" @@ -155,6 +156,7 @@ func creep() { } func main() { + defer panics.HandlePanic(log) cfg, err := loadConfig() if err != nil { fmt.Fprintf(os.Stderr, "loadConfig: %v\n", err) @@ -193,11 +195,11 @@ func main() { } wg.Add(1) - go creep() + spawn(creep) dnsServer := NewDNSServer(cfg.Host, cfg.Nameserver, cfg.Listen) wg.Add(1) - go dnsServer.Start() + spawn(dnsServer.Start) defer func() { log.Infof("Gracefully shutting down the seeder...") diff --git a/dnsseeder/log.go b/dnsseeder/log.go index 45b75d901..01e7ded95 100644 --- a/dnsseeder/log.go +++ b/dnsseeder/log.go @@ -2,10 +2,12 @@ package main import ( "fmt" - "github.com/btcsuite/btclog" - "github.com/jrick/logrotate/rotator" "os" "path/filepath" + + "github.com/btcsuite/btclog" + "github.com/daglabs/btcd/util/panics" + "github.com/jrick/logrotate/rotator" ) type logWriter struct{} @@ -22,6 +24,7 @@ var ( backendLog = btclog.NewBackend(logWriter{}) LogRotator *rotator.Rotator log = backendLog.Logger("SEED") + spawn = panics.GoroutineWrapperFunc(log) initiated = false ) diff --git a/dnsseeder/manager.go b/dnsseeder/manager.go index 96dfde95f..43b951e29 100644 --- a/dnsseeder/manager.go +++ b/dnsseeder/manager.go @@ -137,7 +137,7 @@ func NewManager(dataDir string) (*Manager, error) { } amgr.wg.Add(1) - go amgr.addressHandler() + spawn(amgr.addressHandler) return &amgr, nil } diff --git a/log.go b/log.go index fcd4fdc9d..af04d13ac 100644 --- a/log.go +++ b/log.go @@ -7,7 +7,9 @@ package main import ( "github.com/daglabs/btcd/logger" + "github.com/daglabs/btcd/util/panics" ) var btcdLog, _ = logger.Get(logger.SubsystemTags.BTCD) +var spawn = panics.GoroutineWrapperFunc(btcdLog) var srvrLog, _ = logger.Get(logger.SubsystemTags.SRVR) diff --git a/mining/cpuminer/cpuminer.go b/mining/cpuminer/cpuminer.go index 4d97d8775..4b4899b8d 100644 --- a/mining/cpuminer/cpuminer.go +++ b/mining/cpuminer/cpuminer.go @@ -372,7 +372,9 @@ func (m *CPUMiner) miningWorkerController() { runningWorkers = append(runningWorkers, quit) m.workerWg.Add(1) - go m.generateBlocks(quit) + spawn(func() { + m.generateBlocks(quit) + }) } } @@ -437,8 +439,8 @@ func (m *CPUMiner) Start() { m.quit = make(chan struct{}) m.speedMonitorQuit = make(chan struct{}) m.wg.Add(2) - go m.speedMonitor() - go m.miningWorkerController() + spawn(m.speedMonitor) + spawn(m.miningWorkerController) m.started = true log.Infof("CPU miner started, number of workers %d", m.numWorkers) @@ -552,7 +554,7 @@ func (m *CPUMiner) GenerateNBlocks(n uint32) ([]*daghash.Hash, error) { m.speedMonitorQuit = make(chan struct{}) m.wg.Add(1) - go m.speedMonitor() + spawn(m.speedMonitor) m.Unlock() diff --git a/mining/cpuminer/log.go b/mining/cpuminer/log.go index 76b0158e6..dd4a84b10 100644 --- a/mining/cpuminer/log.go +++ b/mining/cpuminer/log.go @@ -7,13 +7,16 @@ package cpuminer import ( "github.com/btcsuite/btclog" "github.com/daglabs/btcd/logger" + "github.com/daglabs/btcd/util/panics" ) // log is a logger that is initialized with no output filters. This // means the package will not perform any logging by default until the caller // requests it. var log btclog.Logger +var spawn func(func()) func init() { log, _ = logger.Get(logger.SubsystemTags.MINR) + spawn = panics.GoroutineWrapperFunc(log) } diff --git a/mining/simulator/log.go b/mining/simulator/log.go index bafec10a3..7d31ece7a 100644 --- a/mining/simulator/log.go +++ b/mining/simulator/log.go @@ -2,11 +2,13 @@ package main import ( "fmt" - "github.com/btcsuite/btclog" - "github.com/daglabs/btcd/rpcclient" - "github.com/jrick/logrotate/rotator" "os" "path/filepath" + + "github.com/btcsuite/btclog" + "github.com/daglabs/btcd/rpcclient" + "github.com/daglabs/btcd/util/panics" + "github.com/jrick/logrotate/rotator" ) type logWriter struct{} @@ -23,6 +25,7 @@ var ( backendLog = btclog.NewBackend(logWriter{}) LogRotator *rotator.Rotator log = backendLog.Logger("MNSM") + spawn = panics.GoroutineWrapperFunc(log) initiated = false ) diff --git a/mining/simulator/main.go b/mining/simulator/main.go index c25bacaf8..127c0f41f 100644 --- a/mining/simulator/main.go +++ b/mining/simulator/main.go @@ -3,13 +3,13 @@ package main import ( "fmt" "os" - "runtime/debug" "github.com/daglabs/btcd/signal" + "github.com/daglabs/btcd/util/panics" ) func main() { - defer handlePanic() + defer panics.HandlePanic(log) cfg, err := parseConfig() if err != nil { fmt.Fprintf(os.Stderr, "Error parsing command-line arguments: %s", err) @@ -31,12 +31,12 @@ func main() { } defer disconnect(clients) - go func() { + spawn(func() { err = mineLoop(clients) if err != nil { panic(fmt.Errorf("Error in main loop: %s", err)) } - }() + }) interrupt := signal.InterruptListener() <-interrupt @@ -47,11 +47,3 @@ func disconnect(clients []*simulatorClient) { client.Disconnect() } } - -func handlePanic() { - err := recover() - if err != nil { - log.Errorf("Fatal error: %s", err) - log.Errorf("Stack trace: %s", debug.Stack()) - } -} diff --git a/mining/simulator/mineloop.go b/mining/simulator/mineloop.go index a82bfe1a8..026de203f 100644 --- a/mining/simulator/mineloop.go +++ b/mining/simulator/mineloop.go @@ -164,7 +164,7 @@ func mineLoop(clients []*simulatorClient) error { templateStopChan := make(chan struct{}) - go func() { + spawn(func() { for { currentClient := getRandomClient(clients) currentClient.notifyForNewBlocks = true @@ -182,7 +182,7 @@ func mineLoop(clients []*simulatorClient) error { return } } - }() + }) err := <-errChan diff --git a/netsync/log.go b/netsync/log.go index 1e7f28a6c..716dc6d2f 100644 --- a/netsync/log.go +++ b/netsync/log.go @@ -7,13 +7,16 @@ package netsync import ( "github.com/btcsuite/btclog" "github.com/daglabs/btcd/logger" + "github.com/daglabs/btcd/util/panics" ) // log is a logger that is initialized with no output filters. This // means the package will not perform any logging by default until the caller // requests it. var log btclog.Logger +var spawn func(func()) func init() { log, _ = logger.Get(logger.SubsystemTags.SYNC) + spawn = panics.GoroutineWrapperFunc(log) } diff --git a/netsync/manager.go b/netsync/manager.go index adce4c2eb..a06eaf924 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -1231,13 +1231,13 @@ func (sm *SyncManager) handleBlockDAGNotification(notification *blockdag.Notific // Update mempool ch := make(chan mempool.NewBlockMsg) - go func() { + spawn(func() { err := sm.txMemPool.HandleNewBlock(block, ch) close(ch) if err != nil { panic(fmt.Sprintf("HandleNewBlock failed to handle block %s", block.Hash())) } - }() + }) for msg := range ch { sm.peerNotifier.TransactionConfirmed(msg.Tx) sm.peerNotifier.AnnounceNewTransactions(msg.AcceptedTxs) @@ -1336,7 +1336,7 @@ func (sm *SyncManager) Start() { log.Trace("Starting sync manager") sm.wg.Add(1) - go sm.blockHandler() + spawn(sm.blockHandler) } // Stop gracefully shuts down the sync manager by stopping all asynchronous diff --git a/peer/log.go b/peer/log.go index caa850fe9..537203d2a 100644 --- a/peer/log.go +++ b/peer/log.go @@ -13,6 +13,7 @@ import ( "github.com/daglabs/btcd/dagconfig/daghash" "github.com/daglabs/btcd/logger" "github.com/daglabs/btcd/txscript" + "github.com/daglabs/btcd/util/panics" "github.com/daglabs/btcd/wire" ) @@ -26,10 +27,12 @@ const ( // means the package will not perform any logging by default until the caller // requests it. var log btclog.Logger +var spawn func(func()) // The default amount of logging is none. func init() { log, _ = logger.Get(logger.SubsystemTags.PEER) + spawn = panics.GoroutineWrapperFunc(log) } // LogClosure is a closure that can be printed with %s to be used to diff --git a/peer/peer.go b/peer/peer.go index c1ac7a282..b73f7cbf9 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -1868,9 +1868,9 @@ func (p *Peer) QueueMessage(msg wire.Message, doneChan chan<- struct{}) { // it is marked as disconnected and *then* it drains the channels. if !p.Connected() { if doneChan != nil { - go func() { + spawn(func() { doneChan <- struct{}{} - }() + }) } return } @@ -1925,12 +1925,12 @@ func (p *Peer) AssociateConnection(conn net.Conn) { p.na = na } - go func() { + spawn(func() { if err := p.start(); err != nil { log.Debugf("Cannot start peer %s: %s", p, err) p.Disconnect() } - }() + }) } // Connected returns whether or not the peer is currently connected. @@ -1961,13 +1961,13 @@ func (p *Peer) start() error { log.Tracef("Starting peer %s", p) negotiateErr := make(chan error, 1) - go func() { + spawn(func() { if p.inbound { negotiateErr <- p.negotiateInboundProtocol() } else { negotiateErr <- p.negotiateOutboundProtocol() } - }() + }) // Negotiate the protocol within the specified negotiateTimeout. select { @@ -1982,11 +1982,11 @@ func (p *Peer) start() error { // The protocol has been negotiated successfully so start processing input // and output messages. - go p.stallHandler() - go p.inHandler() - go p.queueHandler() - go p.outHandler() - go p.pingHandler() + spawn(p.stallHandler) + spawn(p.inHandler) + spawn(p.queueHandler) + spawn(p.outHandler) + spawn(p.pingHandler) // Send our verack message now that the IO processing machinery has started. p.QueueMessage(wire.NewMsgVerAck(), nil) diff --git a/rpcclient/infrastructure.go b/rpcclient/infrastructure.go index fe5600d4f..8f64306a7 100644 --- a/rpcclient/infrastructure.go +++ b/rpcclient/infrastructure.go @@ -650,7 +650,7 @@ out: // Reissue pending requests in another goroutine since // the send can block. - go c.resendRequests() + spawn(c.resendRequests) // Break out of the reconnect loop back to wait for // disconnect again. @@ -815,7 +815,7 @@ func (c *Client) sendRequest(data *jsonRequestData) chan *response { } else { jReq.responseChan = responseChan } - go func() { + spawn(func() { // Choose which marshal and send function to use depending on whether // the client running in HTTP POST mode or not. When running in HTTP // POST mode, the command is issued via an HTTP client. Otherwise, @@ -844,16 +844,16 @@ func (c *Client) sendRequest(data *jsonRequestData) chan *response { } log.Tracef("Sending command [%s] with id %d", jReq.method, jReq.id) c.sendMessage(jReq.marshalledJSON) - }() + }) if cancelOnTimeout { - go func() { + spawn(func() { select { case <-time.Tick(c.config.RequestTimeout): responseChan <- &response{err: ErrResponseTimedOut} case resp := <-jReq.responseChan: responseChan <- resp } - }() + }) } return responseChan } @@ -1021,19 +1021,19 @@ func (c *Client) start() { // in HTTP POST mode or the default websocket mode. if c.config.HTTPPostMode { c.wg.Add(1) - go c.sendPostHandler() + spawn(c.sendPostHandler) } else { c.wg.Add(3) - go func() { + spawn(func() { if c.ntfnHandlers != nil { if c.ntfnHandlers.OnClientConnected != nil { c.ntfnHandlers.OnClientConnected() } } c.wg.Done() - }() - go c.wsInHandler() - go c.wsOutHandler() + }) + spawn(c.wsInHandler) + spawn(c.wsOutHandler) } } @@ -1269,7 +1269,7 @@ func New(config *ConnConfig, ntfnHandlers *NotificationHandlers) (*Client, error client.start() if !client.config.HTTPPostMode && !client.config.DisableAutoReconnect { client.wg.Add(1) - go client.wsReconnectHandler() + spawn(client.wsReconnectHandler) } } @@ -1324,7 +1324,7 @@ func (c *Client) Connect(tries int) error { c.start() if !c.config.DisableAutoReconnect { c.wg.Add(1) - go c.wsReconnectHandler() + spawn(c.wsReconnectHandler) } return nil } diff --git a/rpcclient/log.go b/rpcclient/log.go index 976483f7d..ab4599f4f 100644 --- a/rpcclient/log.go +++ b/rpcclient/log.go @@ -6,12 +6,14 @@ package rpcclient import ( "github.com/btcsuite/btclog" + "github.com/daglabs/btcd/util/panics" ) // log is a logger that is initialized with no output filters. This // means the package will not perform any logging by default until the caller // requests it. var log btclog.Logger +var spawn func(func()) // The default amount of logging is none. func init() { @@ -22,11 +24,13 @@ func init() { // by default until UseLogger is called. func DisableLog() { log = btclog.Disabled + spawn = panics.GoroutineWrapperFunc(log) } // UseLogger uses a specified Logger to output package logging info. func UseLogger(logger btclog.Logger) { log = logger + spawn = panics.GoroutineWrapperFunc(log) } // LogClosure is a closure that can be printed with %s to be used to diff --git a/server/log.go b/server/log.go index 389e89b0e..b2094a4cd 100644 --- a/server/log.go +++ b/server/log.go @@ -7,12 +7,14 @@ package server import ( "github.com/btcsuite/btclog" "github.com/daglabs/btcd/logger" + "github.com/daglabs/btcd/util/panics" ) // log is a logger that is initialized with no output filters. This // means the package will not perform any logging by default until the caller // requests it. var srvrLog, peerLog, txmpLog, indxLog, rpcsLog, amgrLog btclog.Logger +var spawn func(func()) func init() { srvrLog, _ = logger.Get(logger.SubsystemTags.SRVR) @@ -21,4 +23,6 @@ func init() { indxLog, _ = logger.Get(logger.SubsystemTags.INDX) rpcsLog, _ = logger.Get(logger.SubsystemTags.RPCS) amgrLog, _ = logger.Get(logger.SubsystemTags.AMGR) + + spawn = panics.GoroutineWrapperFunc(srvrLog) } diff --git a/server/p2p/log.go b/server/p2p/log.go index e5bd3399b..778d8ab98 100644 --- a/server/p2p/log.go +++ b/server/p2p/log.go @@ -7,16 +7,20 @@ package p2p import ( "github.com/btcsuite/btclog" "github.com/daglabs/btcd/logger" + "github.com/daglabs/btcd/util/panics" ) // log is a logger that is initialized with no output filters. This // means the package will not perform any logging by default until the caller // requests it. var srvrLog, peerLog, txmpLog, indxLog, rpcsLog, amgrLog btclog.Logger +var spawn func(func()) func init() { srvrLog, _ = logger.Get(logger.SubsystemTags.SRVR) peerLog, _ = logger.Get(logger.SubsystemTags.PEER) + spawn = panics.GoroutineWrapperFunc(peerLog) + txmpLog, _ = logger.Get(logger.SubsystemTags.TXMP) indxLog, _ = logger.Get(logger.SubsystemTags.INDX) rpcsLog, _ = logger.Get(logger.SubsystemTags.RPCS) diff --git a/server/p2p/p2p.go b/server/p2p/p2p.go index cdc814d2d..0a749feaf 100644 --- a/server/p2p/p2p.go +++ b/server/p2p/p2p.go @@ -1826,7 +1826,9 @@ func (s *Server) inboundPeerConnected(conn net.Conn) { sp.isWhitelisted = isWhitelisted(conn.RemoteAddr()) sp.Peer = peer.NewInboundPeer(newPeerConfig(sp)) sp.AssociateConnection(conn) - go s.peerDoneHandler(sp) + spawn(func() { + s.peerDoneHandler(sp) + }) } // outboundPeerConnected is invoked by the connection manager when a new @@ -1845,7 +1847,9 @@ func (s *Server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) { sp.connReq = c sp.isWhitelisted = isWhitelisted(conn.RemoteAddr()) sp.AssociateConnection(conn) - go s.peerDoneHandler(sp) + spawn(func() { + s.peerDoneHandler(sp) + }) s.addrManager.Attempt(sp.NA()) } @@ -1911,7 +1915,7 @@ func (s *Server) peerHandler() { seedFromSubNetwork(config.MainConfig().SubnetworkID) } } - go s.connManager.Start() + spawn(s.connManager.Start) out: for { @@ -2109,11 +2113,11 @@ func (s *Server) Start() { // Start the peer handler which in turn starts the address and block // managers. s.wg.Add(1) - go s.peerHandler() + spawn(s.peerHandler) if s.nat != nil { s.wg.Add(1) - go s.upnpUpdateThread() + spawn(s.upnpUpdateThread) } cfg := config.MainConfig() @@ -2123,7 +2127,7 @@ func (s *Server) Start() { // Start the rebroadcastHandler, which ensures user tx received by // the RPC server are rebroadcast until being included in a block. - go s.rebroadcastHandler() + spawn(s.rebroadcastHandler) } } @@ -2158,7 +2162,7 @@ func (s *Server) ScheduleShutdown(duration time.Duration) { return } srvrLog.Warnf("Server shutdown in %s", duration) - go func() { + spawn(func() { remaining := duration tickDuration := dynamicTickDuration(remaining) done := time.After(remaining) @@ -2186,7 +2190,7 @@ func (s *Server) ScheduleShutdown(duration time.Duration) { srvrLog.Warnf("Server shutdown in %s", remaining) } } - }() + }) } // ParseListeners determines whether each listen address is IPv4 and IPv6 and diff --git a/server/rpc/log.go b/server/rpc/log.go index 1ef85a565..a92778af9 100644 --- a/server/rpc/log.go +++ b/server/rpc/log.go @@ -7,13 +7,16 @@ package rpc import ( "github.com/btcsuite/btclog" "github.com/daglabs/btcd/logger" + "github.com/daglabs/btcd/util/panics" ) // log is a logger that is initialized with no output filters. This // means the package will not perform any logging by default until the caller // requests it. var log btclog.Logger +var spawn func(func()) func init() { log, _ = logger.Get(logger.SubsystemTags.RPCS) + spawn = panics.GoroutineWrapperFunc(log) } diff --git a/server/rpc/rpcserver.go b/server/rpc/rpcserver.go index 12dce8d8f..d62c43636 100644 --- a/server/rpc/rpcserver.go +++ b/server/rpc/rpcserver.go @@ -1503,12 +1503,12 @@ func (state *gbtWorkState) notifyLongPollers(tipHashes []*daghash.Hash, lastGene // clients with a new block template when their existing block template is // stale due to the newly added block. func (state *gbtWorkState) NotifyBlockAdded(tipHashes []*daghash.Hash) { - go func() { + spawn(func() { state.Lock() defer state.Unlock() state.notifyLongPollers(tipHashes, state.lastTxUpdate) - }() + }) } // NotifyMempoolTx uses the new last updated time for the transaction memory @@ -1516,7 +1516,7 @@ func (state *gbtWorkState) NotifyBlockAdded(tipHashes []*daghash.Hash) { // existing block template is stale due to enough time passing and the contents // of the memory pool changing. func (state *gbtWorkState) NotifyMempoolTx(lastUpdated time.Time) { - go func() { + spawn(func() { state.Lock() defer state.Unlock() @@ -1531,7 +1531,7 @@ func (state *gbtWorkState) NotifyMempoolTx(lastUpdated time.Time) { state.notifyLongPollers(state.tipHashes, lastUpdated) } - }() + }) } // templateUpdateChan returns a channel that will be closed once the block @@ -3918,12 +3918,12 @@ func (s *Server) jsonRPCRead(w http.ResponseWriter, r *http.Request, isAdmin boo // Setup a close notifier. Since the connection is hijacked, // the CloseNotifer on the ResponseWriter is not available. closeChan := make(chan struct{}, 1) - go func() { + spawn(func() { _, err := conn.Read(make([]byte, 1)) if err != nil { close(closeChan) } - }() + }) // Check if the user is limited and set error if method unauthorized if !isAdmin { diff --git a/server/rpc/rpcwebsocket.go b/server/rpc/rpcwebsocket.go index 5c2089a80..88a86d9c2 100644 --- a/server/rpc/rpcwebsocket.go +++ b/server/rpc/rpcwebsocket.go @@ -812,8 +812,8 @@ func (m *wsNotificationManager) RemoveClient(wsc *wsClient) { // websocket client notifications. func (m *wsNotificationManager) Start() { m.wg.Add(2) - go m.queueHandler() - go m.notificationHandler() + spawn(m.queueHandler) + spawn(m.notificationHandler) } // WaitForShutdown blocks until all notification manager goroutines have @@ -1078,10 +1078,10 @@ out: // read of the next request from the websocket client and allow // many requests to be waited on concurrently. c.serviceRequestSem.acquire() - go func() { + spawn(func() { c.serviceRequest(cmd) c.serviceRequestSem.release() - }() + }) } // Ensure the connection is closed. @@ -1303,9 +1303,9 @@ func (c *wsClient) Start() { // Start processing input and output. c.wg.Add(3) - go c.inHandler() - go c.notificationQueueHandler() - go c.outHandler() + spawn(c.inHandler) + spawn(c.notificationQueueHandler) + spawn(c.outHandler) } // WaitForShutdown blocks until the websocket client goroutines are stopped diff --git a/server/server.go b/server/server.go index 742778cd9..26451f9ec 100644 --- a/server/server.go +++ b/server/server.go @@ -132,10 +132,10 @@ func NewServer(listenAddrs []string, db database.DB, dagParams *dagconfig.Params } // Signal process shutdown when the RPC server requests it. - go func() { + spawn(func() { <-s.rpcServer.RequestedProcessShutdown() signal.ShutdownRequestChannel <- struct{}{} - }() + }) } return s, nil diff --git a/service_windows.go b/service_windows.go index 92db6750b..58d78ab46 100644 --- a/service_windows.go +++ b/service_windows.go @@ -67,10 +67,10 @@ func (s *btcdService) Execute(args []string, r <-chan svc.ChangeRequest, changes // it is started so it can be gracefully stopped. doneChan := make(chan error) serverChan := make(chan *server.Server) - go func() { + spawn(func() { err := btcdMain(serverChan) doneChan <- err - }() + }) // Service is now started. changes <- svc.Status{State: svc.Running, Accepts: cmdsAccepted} diff --git a/util/panics/panics.go b/util/panics/panics.go new file mode 100644 index 000000000..960ce45cd --- /dev/null +++ b/util/panics/panics.go @@ -0,0 +1,31 @@ +package panics + +import ( + "os" + "runtime/debug" + + "github.com/btcsuite/btclog" + "github.com/daglabs/btcd/logger" +) + +// HandlePanic recovers panics, log them, and then exits the process. +func HandlePanic(log btclog.Logger) { + if err := recover(); err != nil { + log.Criticalf("Fatal error: %s", err) + log.Criticalf("Stack trace: %s", debug.Stack()) + if logger.LogRotator != nil { + logger.LogRotator.Close() + } + os.Exit(1) + } +} + +// GoroutineWrapperFunc returns a goroutine wrapper function that handles panics and write them to the log. +func GoroutineWrapperFunc(log btclog.Logger) func(func()) { + return func(f func()) { + go func() { + defer HandlePanic(log) + f() + }() + } +}