Merge pull request #3196 from eyakubovich/fix-watch-timeout

client: handle watch timing out elegantly
This commit is contained in:
Yicheng Qin 2015-08-04 13:52:42 -07:00
commit 0650170a1b
2 changed files with 26 additions and 10 deletions

View File

@ -356,6 +356,13 @@ So the first watch after the get should be:
curl 'http://127.0.0.1:2379/v2/keys/foo?wait=true&waitIndex=2008' curl 'http://127.0.0.1:2379/v2/keys/foo?wait=true&waitIndex=2008'
``` ```
#### Connection being closed prematurely
The server may close a long polling connection before emitting any events.
This can happend due to a timeout or the server being shutdown.
Since the HTTP header is sent immediately upon accepting the connection, the response will be seen as empty: `200 OK` and empty body.
The clients should be prepared to deal with this scenario and retry the watch.
### Atomically Creating In-Order Keys ### Atomically Creating In-Order Keys
Using `POST` on a directory, you can create keys with key names that are created in-order. Using `POST` on a directory, you can create keys with key names that are created in-order.

View File

@ -63,6 +63,7 @@ func (e Error) Error() string {
var ( var (
ErrInvalidJSON = errors.New("client: response is invalid json. The endpoint is probably not valid etcd cluster endpoint.") ErrInvalidJSON = errors.New("client: response is invalid json. The endpoint is probably not valid etcd cluster endpoint.")
ErrEmptyBody = errors.New("client: response body is empty")
) )
// PrevExistType is used to define an existence condition when setting // PrevExistType is used to define an existence condition when setting
@ -419,18 +420,23 @@ type httpWatcher struct {
} }
func (hw *httpWatcher) Next(ctx context.Context) (*Response, error) { func (hw *httpWatcher) Next(ctx context.Context) (*Response, error) {
httpresp, body, err := hw.client.Do(ctx, &hw.nextWait) for {
if err != nil { httpresp, body, err := hw.client.Do(ctx, &hw.nextWait)
return nil, err if err != nil {
} return nil, err
}
resp, err := unmarshalHTTPResponse(httpresp.StatusCode, httpresp.Header, body) resp, err := unmarshalHTTPResponse(httpresp.StatusCode, httpresp.Header, body)
if err != nil { if err != nil {
return nil, err if err == ErrEmptyBody {
} continue
}
return nil, err
}
hw.nextWait.WaitIndex = resp.Node.ModifiedIndex + 1 hw.nextWait.WaitIndex = resp.Node.ModifiedIndex + 1
return resp, nil return resp, nil
}
} }
// v2KeysURL forms a URL representing the location of a key. // v2KeysURL forms a URL representing the location of a key.
@ -590,6 +596,9 @@ func (a *createInOrderAction) HTTPRequest(ep url.URL) *http.Request {
func unmarshalHTTPResponse(code int, header http.Header, body []byte) (res *Response, err error) { func unmarshalHTTPResponse(code int, header http.Header, body []byte) (res *Response, err error) {
switch code { switch code {
case http.StatusOK, http.StatusCreated: case http.StatusOK, http.StatusCreated:
if len(body) == 0 {
return nil, ErrEmptyBody
}
res, err = unmarshalSuccessfulKeysResponse(header, body) res, err = unmarshalSuccessfulKeysResponse(header, body)
default: default:
err = unmarshalFailedKeysResponse(body) err = unmarshalFailedKeysResponse(body)