client: handle empty watch responses elegantly

Even though current etcd does not time out
watches, the client could be running against
an old etcd version or the server may close
polling connection for other reasons.
This patch ignores successful (as in 200)
responses with emtpy bodies instead
of producing JSON errors.
This commit is contained in:
Eugene Yakubovich 2015-07-29 18:52:13 -07:00
parent 219ed1695b
commit 6312e22b1d
2 changed files with 26 additions and 10 deletions

View File

@ -356,6 +356,13 @@ So the first watch after the get should be:
curl 'http://127.0.0.1:2379/v2/keys/foo?wait=true&waitIndex=2008' curl 'http://127.0.0.1:2379/v2/keys/foo?wait=true&waitIndex=2008'
``` ```
#### Connection being closed prematurely
The server may close a long polling connection before emitting any events.
This can happend due to a timeout or the server being shutdown.
Since the HTTP header is sent immediately upon accepting the connection, the response will be seen as empty: `200 OK` and empty body.
The clients should be prepared to deal with this scenario and retry the watch.
### Atomically Creating In-Order Keys ### Atomically Creating In-Order Keys
Using `POST` on a directory, you can create keys with key names that are created in-order. Using `POST` on a directory, you can create keys with key names that are created in-order.

View File

@ -63,6 +63,7 @@ func (e Error) Error() string {
var ( var (
ErrInvalidJSON = errors.New("client: response is invalid json. The endpoint is probably not valid etcd cluster endpoint.") ErrInvalidJSON = errors.New("client: response is invalid json. The endpoint is probably not valid etcd cluster endpoint.")
ErrEmptyBody = errors.New("client: response body is empty")
) )
// PrevExistType is used to define an existence condition when setting // PrevExistType is used to define an existence condition when setting
@ -419,6 +420,7 @@ type httpWatcher struct {
} }
func (hw *httpWatcher) Next(ctx context.Context) (*Response, error) { func (hw *httpWatcher) Next(ctx context.Context) (*Response, error) {
for {
httpresp, body, err := hw.client.Do(ctx, &hw.nextWait) httpresp, body, err := hw.client.Do(ctx, &hw.nextWait)
if err != nil { if err != nil {
return nil, err return nil, err
@ -426,12 +428,16 @@ func (hw *httpWatcher) Next(ctx context.Context) (*Response, error) {
resp, err := unmarshalHTTPResponse(httpresp.StatusCode, httpresp.Header, body) resp, err := unmarshalHTTPResponse(httpresp.StatusCode, httpresp.Header, body)
if err != nil { if err != nil {
if err == ErrEmptyBody {
continue
}
return nil, err return nil, err
} }
hw.nextWait.WaitIndex = resp.Node.ModifiedIndex + 1 hw.nextWait.WaitIndex = resp.Node.ModifiedIndex + 1
return resp, nil return resp, nil
} }
}
// v2KeysURL forms a URL representing the location of a key. // v2KeysURL forms a URL representing the location of a key.
// The endpoint argument represents the base URL of an etcd // The endpoint argument represents the base URL of an etcd
@ -590,6 +596,9 @@ func (a *createInOrderAction) HTTPRequest(ep url.URL) *http.Request {
func unmarshalHTTPResponse(code int, header http.Header, body []byte) (res *Response, err error) { func unmarshalHTTPResponse(code int, header http.Header, body []byte) (res *Response, err error) {
switch code { switch code {
case http.StatusOK, http.StatusCreated: case http.StatusOK, http.StatusCreated:
if len(body) == 0 {
return nil, ErrEmptyBody
}
res, err = unmarshalSuccessfulKeysResponse(header, body) res, err = unmarshalSuccessfulKeysResponse(header, body)
default: default:
err = unmarshalFailedKeysResponse(body) err = unmarshalFailedKeysResponse(body)