From e22bc9af8fd3601f4419e94e5f2075e5a80db351 Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Wed, 17 Apr 2019 17:51:50 +0300 Subject: [PATCH] [NOD-115] add timeout to rpcclient requests (#252) * [NOD-115] Add timeout to rpcclient requests * [NOD-115] Add timeout of half a second to mining simulator requests * [NOD-115] Remove redundant allocation of responseChan --- mining/simulator/connect.go | 11 ++-- mining/simulator/mineloop.go | 6 ++- rpcclient/infrastructure.go | 99 ++++++++++++++++++++++++------------ rpcclient/rawrequest.go | 10 ++-- 4 files changed, 81 insertions(+), 45 deletions(-) diff --git a/mining/simulator/connect.go b/mining/simulator/connect.go index 79d2df158..09abea96c 100644 --- a/mining/simulator/connect.go +++ b/mining/simulator/connect.go @@ -35,11 +35,12 @@ func connectToServers(cfg *config, addressList []string) ([]*simulatorClient, er }, } connCfg := &rpcclient.ConnConfig{ - Host: address, - Endpoint: "ws", - User: "user", - Pass: "pass", - DisableTLS: cfg.DisableTLS, + Host: address, + Endpoint: "ws", + User: "user", + Pass: "pass", + DisableTLS: cfg.DisableTLS, + RequestTimeout: time.Second / 2, } if !cfg.DisableTLS { diff --git a/mining/simulator/mineloop.go b/mining/simulator/mineloop.go index 3e7043d92..0beb6ea74 100644 --- a/mining/simulator/mineloop.go +++ b/mining/simulator/mineloop.go @@ -12,6 +12,7 @@ import ( "github.com/daglabs/btcd/blockdag" "github.com/daglabs/btcd/btcjson" "github.com/daglabs/btcd/dagconfig/daghash" + "github.com/daglabs/btcd/rpcclient" "github.com/daglabs/btcd/util" "github.com/daglabs/btcd/wire" ) @@ -79,7 +80,10 @@ func templatesLoop(client *simulatorClient, newTemplateChan chan *btcjson.GetBlo longPollID := "" getBlockTemplateLongPoll := func() { template, err := getBlockTemplate(client, longPollID) - if err != nil { + if err == rpcclient.ErrResponseTimedOut { + log.Printf("Got timeout while requesting template '%s' from %s", longPollID, client.Host()) + return + } else if err != nil { errChan <- fmt.Errorf("Error getting block template: %s", err) return } diff --git a/rpcclient/infrastructure.go b/rpcclient/infrastructure.go index c31a2d4f8..3f8398de3 100644 --- a/rpcclient/infrastructure.go +++ b/rpcclient/infrastructure.go @@ -69,6 +69,10 @@ var ( // client having already connected to the RPC server. ErrClientAlreadyConnected = errors.New("websocket client has already " + "connected") + + // ErrResponseTimedOut is an error to describe the condition where + // a response hasn't arrived before the expected timeout. + ErrResponseTimedOut = errors.New("no response was receieved until the timeout") ) const ( @@ -96,11 +100,15 @@ type sendPostDetails struct { // jsonRequest holds information about a json request that is used to properly // detect, interpret, and deliver a reply to it. type jsonRequest struct { + *jsonRequestData + responseChan chan *response +} + +type jsonRequestData struct { id uint64 method string cmd interface{} marshalledJSON []byte - responseChan chan *response } // Client represents a Bitcoin RPC client which allows easy access to the @@ -834,35 +842,58 @@ func (c *Client) sendPost(jReq *jsonRequest) { // sendRequest sends the passed json request to the associated server using the // provided response channel for the reply. It handles both websocket and HTTP // POST mode depending on the configuration of the client. -func (c *Client) sendRequest(jReq *jsonRequest) { - // Choose which marshal and send function to use depending on whether - // the client running in HTTP POST mode or not. When running in HTTP - // POST mode, the command is issued via an HTTP client. Otherwise, - // the command is issued via the asynchronous websocket channels. - if c.config.HTTPPostMode { - c.sendPost(jReq) - return +func (c *Client) sendRequest(data *jsonRequestData) chan *response { + jReq := &jsonRequest{ + jsonRequestData: data, } + responseChan := make(chan *response, 1) + cancelOnTimeout := c.config.RequestTimeout != 0 && !c.config.HTTPPostMode + if cancelOnTimeout { + jReq.responseChan = make(chan *response, 1) + } else { + jReq.responseChan = responseChan + } + go func() { + // Choose which marshal and send function to use depending on whether + // the client running in HTTP POST mode or not. When running in HTTP + // POST mode, the command is issued via an HTTP client. Otherwise, + // the command is issued via the asynchronous websocket channels. + if c.config.HTTPPostMode { + c.sendPost(jReq) + return + } - // Check whether the websocket connection has never been established, - // in which case the handler goroutines are not running. - select { - case <-c.connEstablished: - default: - jReq.responseChan <- &response{err: ErrClientNotConnected} - return - } + // Check whether the websocket connection has never been established, + // in which case the handler goroutines are not running. + select { + case <-c.connEstablished: + default: + jReq.responseChan <- &response{err: ErrClientNotConnected} + return + } - // Add the request to the internal tracking map so the response from the - // remote server can be properly detected and routed to the response - // channel. Then send the marshalled request via the websocket - // connection. - if err := c.addRequest(jReq); err != nil { - jReq.responseChan <- &response{err: err} - return + // Add the request to the internal tracking map so the response from the + // remote server can be properly detected and routed to the response + // channel. Then send the marshalled request via the websocket + // connection. + if err := c.addRequest(jReq); err != nil { + jReq.responseChan <- &response{err: err} + return + } + log.Tracef("Sending command [%s] with id %d", jReq.method, jReq.id) + c.sendMessage(jReq.marshalledJSON) + }() + if cancelOnTimeout { + go func() { + select { + case <-time.Tick(c.config.RequestTimeout): + responseChan <- &response{err: ErrResponseTimedOut} + case resp := <-jReq.responseChan: + responseChan <- resp + } + }() } - log.Tracef("Sending command [%s] with id %d", jReq.method, jReq.id) - c.sendMessage(jReq.marshalledJSON) + return responseChan } // sendCmd sends the passed command to the associated server and returns a @@ -883,18 +914,15 @@ func (c *Client) sendCmd(cmd interface{}) chan *response { return newFutureError(err) } - // Generate the request and send it along with a channel to respond on. - responseChan := make(chan *response, 1) - jReq := &jsonRequest{ + // Generate the request. + jReqData := &jsonRequestData{ id: id, method: method, cmd: cmd, marshalledJSON: marshalledJSON, - responseChan: responseChan, } - c.sendRequest(jReq) - - return responseChan + // Send the request and return its response channel + return c.sendRequest(jReqData) } // sendCmdAndWait sends the passed command to the associated server, waits @@ -1113,6 +1141,10 @@ type ConnConfig struct { // flag can be set to true to use basic HTTP POST requests instead. HTTPPostMode bool + // RequestTimeout is the time it'll take for a request to timeout if + // it doesn't get a response. + RequestTimeout time.Duration + // EnableBCInfoHacks is an option provided to enable compatibility hacks // when connecting to blockchain.info RPC server EnableBCInfoHacks bool @@ -1148,6 +1180,7 @@ func newHTTPClient(config *ConnConfig) (*http.Client, error) { Proxy: proxyFunc, TLSClientConfig: tlsConfig, }, + Timeout: config.RequestTimeout, } return &client, nil diff --git a/rpcclient/rawrequest.go b/rpcclient/rawrequest.go index f79b4494d..89fbe8a33 100644 --- a/rpcclient/rawrequest.go +++ b/rpcclient/rawrequest.go @@ -54,18 +54,16 @@ func (c *Client) RawRequestAsync(method string, params []json.RawMessage) Future return newFutureError(err) } - // Generate the request and send it along with a channel to respond on. - responseChan := make(chan *response, 1) - jReq := &jsonRequest{ + // Generate the request. + jReqData := &jsonRequestData{ id: id, method: method, cmd: nil, marshalledJSON: marshalledJSON, - responseChan: responseChan, } - c.sendRequest(jReq) - return responseChan + // Send the request and return its response channel + return c.sendRequest(jReqData) } // RawRequest allows the caller to send a raw or custom request to the server.