[NOD-115] add timeout to rpcclient requests (#252)

* [NOD-115] Add timeout to rpcclient requests

* [NOD-115] Add timeout of half a second to mining simulator requests

* [NOD-115] Remove redundant allocation of responseChan
This commit is contained in:
Ori Newman 2019-04-17 17:51:50 +03:00 committed by Evgeny Khirin
parent 89ca293dc1
commit e22bc9af8f
4 changed files with 81 additions and 45 deletions

View File

@ -35,11 +35,12 @@ func connectToServers(cfg *config, addressList []string) ([]*simulatorClient, er
}, },
} }
connCfg := &rpcclient.ConnConfig{ connCfg := &rpcclient.ConnConfig{
Host: address, Host: address,
Endpoint: "ws", Endpoint: "ws",
User: "user", User: "user",
Pass: "pass", Pass: "pass",
DisableTLS: cfg.DisableTLS, DisableTLS: cfg.DisableTLS,
RequestTimeout: time.Second / 2,
} }
if !cfg.DisableTLS { if !cfg.DisableTLS {

View File

@ -12,6 +12,7 @@ import (
"github.com/daglabs/btcd/blockdag" "github.com/daglabs/btcd/blockdag"
"github.com/daglabs/btcd/btcjson" "github.com/daglabs/btcd/btcjson"
"github.com/daglabs/btcd/dagconfig/daghash" "github.com/daglabs/btcd/dagconfig/daghash"
"github.com/daglabs/btcd/rpcclient"
"github.com/daglabs/btcd/util" "github.com/daglabs/btcd/util"
"github.com/daglabs/btcd/wire" "github.com/daglabs/btcd/wire"
) )
@ -79,7 +80,10 @@ func templatesLoop(client *simulatorClient, newTemplateChan chan *btcjson.GetBlo
longPollID := "" longPollID := ""
getBlockTemplateLongPoll := func() { getBlockTemplateLongPoll := func() {
template, err := getBlockTemplate(client, longPollID) template, err := getBlockTemplate(client, longPollID)
if err != nil { if err == rpcclient.ErrResponseTimedOut {
log.Printf("Got timeout while requesting template '%s' from %s", longPollID, client.Host())
return
} else if err != nil {
errChan <- fmt.Errorf("Error getting block template: %s", err) errChan <- fmt.Errorf("Error getting block template: %s", err)
return return
} }

View File

@ -69,6 +69,10 @@ var (
// client having already connected to the RPC server. // client having already connected to the RPC server.
ErrClientAlreadyConnected = errors.New("websocket client has already " + ErrClientAlreadyConnected = errors.New("websocket client has already " +
"connected") "connected")
// ErrResponseTimedOut is an error to describe the condition where
// a response hasn't arrived before the expected timeout.
ErrResponseTimedOut = errors.New("no response was receieved until the timeout")
) )
const ( const (
@ -96,11 +100,15 @@ type sendPostDetails struct {
// jsonRequest holds information about a json request that is used to properly // jsonRequest holds information about a json request that is used to properly
// detect, interpret, and deliver a reply to it. // detect, interpret, and deliver a reply to it.
type jsonRequest struct { type jsonRequest struct {
*jsonRequestData
responseChan chan *response
}
type jsonRequestData struct {
id uint64 id uint64
method string method string
cmd interface{} cmd interface{}
marshalledJSON []byte marshalledJSON []byte
responseChan chan *response
} }
// Client represents a Bitcoin RPC client which allows easy access to the // Client represents a Bitcoin RPC client which allows easy access to the
@ -834,35 +842,58 @@ func (c *Client) sendPost(jReq *jsonRequest) {
// sendRequest sends the passed json request to the associated server using the // sendRequest sends the passed json request to the associated server using the
// provided response channel for the reply. It handles both websocket and HTTP // provided response channel for the reply. It handles both websocket and HTTP
// POST mode depending on the configuration of the client. // POST mode depending on the configuration of the client.
func (c *Client) sendRequest(jReq *jsonRequest) { func (c *Client) sendRequest(data *jsonRequestData) chan *response {
// Choose which marshal and send function to use depending on whether jReq := &jsonRequest{
// the client running in HTTP POST mode or not. When running in HTTP jsonRequestData: data,
// POST mode, the command is issued via an HTTP client. Otherwise,
// the command is issued via the asynchronous websocket channels.
if c.config.HTTPPostMode {
c.sendPost(jReq)
return
} }
responseChan := make(chan *response, 1)
cancelOnTimeout := c.config.RequestTimeout != 0 && !c.config.HTTPPostMode
if cancelOnTimeout {
jReq.responseChan = make(chan *response, 1)
} else {
jReq.responseChan = responseChan
}
go func() {
// Choose which marshal and send function to use depending on whether
// the client running in HTTP POST mode or not. When running in HTTP
// POST mode, the command is issued via an HTTP client. Otherwise,
// the command is issued via the asynchronous websocket channels.
if c.config.HTTPPostMode {
c.sendPost(jReq)
return
}
// Check whether the websocket connection has never been established, // Check whether the websocket connection has never been established,
// in which case the handler goroutines are not running. // in which case the handler goroutines are not running.
select { select {
case <-c.connEstablished: case <-c.connEstablished:
default: default:
jReq.responseChan <- &response{err: ErrClientNotConnected} jReq.responseChan <- &response{err: ErrClientNotConnected}
return return
} }
// Add the request to the internal tracking map so the response from the // Add the request to the internal tracking map so the response from the
// remote server can be properly detected and routed to the response // remote server can be properly detected and routed to the response
// channel. Then send the marshalled request via the websocket // channel. Then send the marshalled request via the websocket
// connection. // connection.
if err := c.addRequest(jReq); err != nil { if err := c.addRequest(jReq); err != nil {
jReq.responseChan <- &response{err: err} jReq.responseChan <- &response{err: err}
return return
}
log.Tracef("Sending command [%s] with id %d", jReq.method, jReq.id)
c.sendMessage(jReq.marshalledJSON)
}()
if cancelOnTimeout {
go func() {
select {
case <-time.Tick(c.config.RequestTimeout):
responseChan <- &response{err: ErrResponseTimedOut}
case resp := <-jReq.responseChan:
responseChan <- resp
}
}()
} }
log.Tracef("Sending command [%s] with id %d", jReq.method, jReq.id) return responseChan
c.sendMessage(jReq.marshalledJSON)
} }
// sendCmd sends the passed command to the associated server and returns a // sendCmd sends the passed command to the associated server and returns a
@ -883,18 +914,15 @@ func (c *Client) sendCmd(cmd interface{}) chan *response {
return newFutureError(err) return newFutureError(err)
} }
// Generate the request and send it along with a channel to respond on. // Generate the request.
responseChan := make(chan *response, 1) jReqData := &jsonRequestData{
jReq := &jsonRequest{
id: id, id: id,
method: method, method: method,
cmd: cmd, cmd: cmd,
marshalledJSON: marshalledJSON, marshalledJSON: marshalledJSON,
responseChan: responseChan,
} }
c.sendRequest(jReq) // Send the request and return its response channel
return c.sendRequest(jReqData)
return responseChan
} }
// sendCmdAndWait sends the passed command to the associated server, waits // sendCmdAndWait sends the passed command to the associated server, waits
@ -1113,6 +1141,10 @@ type ConnConfig struct {
// flag can be set to true to use basic HTTP POST requests instead. // flag can be set to true to use basic HTTP POST requests instead.
HTTPPostMode bool HTTPPostMode bool
// RequestTimeout is the time it'll take for a request to timeout if
// it doesn't get a response.
RequestTimeout time.Duration
// EnableBCInfoHacks is an option provided to enable compatibility hacks // EnableBCInfoHacks is an option provided to enable compatibility hacks
// when connecting to blockchain.info RPC server // when connecting to blockchain.info RPC server
EnableBCInfoHacks bool EnableBCInfoHacks bool
@ -1148,6 +1180,7 @@ func newHTTPClient(config *ConnConfig) (*http.Client, error) {
Proxy: proxyFunc, Proxy: proxyFunc,
TLSClientConfig: tlsConfig, TLSClientConfig: tlsConfig,
}, },
Timeout: config.RequestTimeout,
} }
return &client, nil return &client, nil

View File

@ -54,18 +54,16 @@ func (c *Client) RawRequestAsync(method string, params []json.RawMessage) Future
return newFutureError(err) return newFutureError(err)
} }
// Generate the request and send it along with a channel to respond on. // Generate the request.
responseChan := make(chan *response, 1) jReqData := &jsonRequestData{
jReq := &jsonRequest{
id: id, id: id,
method: method, method: method,
cmd: nil, cmd: nil,
marshalledJSON: marshalledJSON, marshalledJSON: marshalledJSON,
responseChan: responseChan,
} }
c.sendRequest(jReq)
return responseChan // Send the request and return its response channel
return c.sendRequest(jReqData)
} }
// RawRequest allows the caller to send a raw or custom request to the server. // RawRequest allows the caller to send a raw or custom request to the server.