From 7567cd4cb90aacd717c35724c84d8c937df90727 Mon Sep 17 00:00:00 2001 From: Dan Aharoni Date: Thu, 13 Feb 2020 13:10:07 +0200 Subject: [PATCH] [NOD-744] Wrap go routines with spawn (#626) * [NOD-744] Wrap go routines with spawn * [NOD-747] Wrap some more go routines with spawn * [NOD-744] Some more missing go routines * [NOD-744] Break lines so make code more readable * [NOD-744] Declare a local scope variable so the func would use it. * [NOD-744] Fix type and update comment. * [NOD-744] Declare local var so go routine would use it * [NOD-744] Rename variable, use normal assignment; * [NOD-744] Rename variable. --- cmd/kaspaminer/mineloop.go | 12 +++++++++--- connmgr/connmanager.go | 17 +++++++++++++---- connmgr/seed.go | 4 ++-- database/cmd/dbtool/insecureimport.go | 4 +++- rpcclient/infrastructure.go | 4 +++- server/p2p/p2p.go | 16 ++++++++++------ server/rpc/rpcserver.go | 13 ++++++++----- 7 files changed, 48 insertions(+), 22 deletions(-) diff --git a/cmd/kaspaminer/mineloop.go b/cmd/kaspaminer/mineloop.go index 5d1795b08..c48d4986d 100644 --- a/cmd/kaspaminer/mineloop.go +++ b/cmd/kaspaminer/mineloop.go @@ -60,8 +60,12 @@ func mineLoop(client *minerClient, numberOfBlocks uint64, blockDelay uint64) err func mineNextBlock(client *minerClient, foundBlock chan *util.Block, templateStopChan chan struct{}, errChan chan error) { newTemplateChan := make(chan *rpcmodel.GetBlockTemplateResult) - go templatesLoop(client, newTemplateChan, errChan, templateStopChan) - go solveLoop(newTemplateChan, foundBlock, errChan) + spawn(func() { + templatesLoop(client, newTemplateChan, errChan, templateStopChan) + }) + spawn(func() { + solveLoop(newTemplateChan, foundBlock, errChan) + }) } func handleFoundBlock(client *minerClient, block *util.Block) error { @@ -193,7 +197,9 @@ func solveLoop(newTemplateChan chan *rpcmodel.GetBlockTemplateResult, foundBlock return } - go solveBlock(block, stopOldTemplateSolving, foundBlock) + spawn(func() { + solveBlock(block, stopOldTemplateSolving, foundBlock) + }) } if stopOldTemplateSolving != nil { close(stopOldTemplateSolving) diff --git a/connmgr/connmanager.go b/connmgr/connmanager.go index 38ef56959..a4cbd4b32 100644 --- a/connmgr/connmanager.go +++ b/connmgr/connmanager.go @@ -375,7 +375,9 @@ out: } if cm.cfg.OnDisconnection != nil { - go cm.cfg.OnDisconnection(connReq) + spawn(func() { + cm.cfg.OnDisconnection(connReq) + }, cm.handlePanic) } // All internal state has been cleaned up, if @@ -572,7 +574,9 @@ func (cm *ConnManager) listenHandler(listener net.Listener) { } continue } - go cm.cfg.OnAccept(conn) + spawn(func() { + cm.cfg.OnAccept(conn) + }, cm.handlePanic) } cm.wg.Done() @@ -593,9 +597,14 @@ func (cm *ConnManager) Start() { // Start all the listeners so long as the caller requested them and // provided a callback to be invoked when connections are accepted. if cm.cfg.OnAccept != nil { - for _, listner := range cm.cfg.Listeners { + for _, listener := range cm.cfg.Listeners { + // Declaring this variable is necessary as it needs be declared in the same + // scope of the anonymous function below it. + listenerCopy := listener cm.wg.Add(1) - go cm.listenHandler(listner) + spawn(func() { + cm.listenHandler(listenerCopy) + }, cm.handlePanic) } } diff --git a/connmgr/seed.go b/connmgr/seed.go index 94cc66379..c13961ca2 100644 --- a/connmgr/seed.go +++ b/connmgr/seed.go @@ -66,7 +66,7 @@ func SeedFromDNS(dagParams *dagconfig.Params, reqServices wire.ServiceFlag, incl } } - go func(host string) { + spawn(func() { randSource := mrand.New(mrand.NewSource(time.Now().UnixNano())) seedpeers, err := lookupFn(host) @@ -94,6 +94,6 @@ func SeedFromDNS(dagParams *dagconfig.Params, reqServices wire.ServiceFlag, incl } seedFn(addresses) - }(host) + }, nil) } } diff --git a/database/cmd/dbtool/insecureimport.go b/database/cmd/dbtool/insecureimport.go index 1f9ee64c8..952e8256c 100644 --- a/database/cmd/dbtool/insecureimport.go +++ b/database/cmd/dbtool/insecureimport.go @@ -301,7 +301,9 @@ func (bi *blockImporter) Import() chan *importResults { // Start the status handler and return the result channel that it will // send the results on when the import is done. resultChan := make(chan *importResults) - go bi.statusHandler(resultChan) + spawn(func() { + bi.statusHandler(resultChan) + }) return resultChan } diff --git a/rpcclient/infrastructure.go b/rpcclient/infrastructure.go index b3b1d6300..804226f5f 100644 --- a/rpcclient/infrastructure.go +++ b/rpcclient/infrastructure.go @@ -426,7 +426,9 @@ out: } break out } - go c.handleMessage(msg) + spawn(func() { + c.handleMessage(msg) + }) } // Ensure the connection is closed. diff --git a/server/p2p/p2p.go b/server/p2p/p2p.go index 8db6b50f1..c4b9d2633 100644 --- a/server/p2p/p2p.go +++ b/server/p2p/p2p.go @@ -923,9 +923,11 @@ func (s *Server) handleQuery(state *peerState, querymsg interface{}) { } // TODO: if too many, nuke a non-perm peer. - go s.connManager.Connect(&connmgr.ConnReq{ - Addr: netAddr, - Permanent: msg.Permanent, + spawn(func() { + s.connManager.Connect(&connmgr.ConnReq{ + Addr: netAddr, + Permanent: msg.Permanent, + }) }) msg.Reply <- nil case RemoveNodeMsg: @@ -1751,9 +1753,11 @@ func NewServer(listenAddrs []string, db database.DB, dagParams *dagconfig.Params return nil, err } - go s.connManager.Connect(&connmgr.ConnReq{ - Addr: netAddr, - Permanent: true, + spawn(func() { + s.connManager.Connect(&connmgr.ConnReq{ + Addr: netAddr, + Permanent: true, + }) }) } diff --git a/server/rpc/rpcserver.go b/server/rpc/rpcserver.go index b61f97c4f..4c30922ff 100644 --- a/server/rpc/rpcserver.go +++ b/server/rpc/rpcserver.go @@ -623,12 +623,15 @@ func (s *Server) Start() { for _, listener := range s.cfg.Listeners { s.wg.Add(1) - go func(listener net.Listener) { - log.Infof("RPC server listening on %s", listener.Addr()) - httpServer.Serve(listener) - log.Tracef("RPC listener done for %s", listener.Addr()) + // Declaring this variable is necessary as it needs be declared in the same + // scope of the anonymous function below it. + listenerCopy := listener + spawn(func() { + log.Infof("RPC server listening on %s", listenerCopy.Addr()) + httpServer.Serve(listenerCopy) + log.Tracef("RPC listener done for %s", listenerCopy.Addr()) s.wg.Done() - }(listener) + }) } s.ntfnMgr.Start()