mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #3181 from xiang90/2.2-client-error
client: return cluster error if the etcd cluster is not avaliable
This commit is contained in:
commit
ff945c7404
@ -31,6 +31,7 @@ import (
|
||||
var (
|
||||
ErrNoEndpoints = errors.New("client: no endpoints available")
|
||||
ErrTooManyRedirects = errors.New("client: too many redirects")
|
||||
ErrClusterUnavailable = errors.New("client: etcd cluster is unavailable or misconfigured")
|
||||
errTooManyRedirectChecks = errors.New("client: too many redirect checks")
|
||||
)
|
||||
|
||||
@ -230,18 +231,22 @@ func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Respo
|
||||
var resp *http.Response
|
||||
var body []byte
|
||||
var err error
|
||||
cerr := &ClusterError{}
|
||||
|
||||
for i := pinned; i < leps+pinned; i++ {
|
||||
k := i % leps
|
||||
hc := c.clientFactory(eps[k])
|
||||
resp, body, err = hc.Do(ctx, action)
|
||||
if err != nil {
|
||||
cerr.Errors = append(cerr.Errors, err)
|
||||
if err == context.DeadlineExceeded || err == context.Canceled {
|
||||
return nil, nil, err
|
||||
return nil, nil, cerr
|
||||
}
|
||||
continue
|
||||
}
|
||||
if resp.StatusCode/100 == 5 {
|
||||
// TODO: make sure this is a no leader response
|
||||
cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s has no leader", eps[k].String()))
|
||||
continue
|
||||
}
|
||||
if k != pinned {
|
||||
@ -249,10 +254,10 @@ func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Respo
|
||||
c.pinned = k
|
||||
c.Unlock()
|
||||
}
|
||||
break
|
||||
return resp, body, nil
|
||||
}
|
||||
|
||||
return resp, body, err
|
||||
return nil, nil, cerr
|
||||
}
|
||||
|
||||
func (c *httpClusterClient) Endpoints() []string {
|
||||
|
@ -349,7 +349,7 @@ func TestHTTPClusterClientDo(t *testing.T) {
|
||||
),
|
||||
rand: rand.New(rand.NewSource(0)),
|
||||
},
|
||||
wantErr: context.DeadlineExceeded,
|
||||
wantErr: &ClusterError{Errors: []error{context.DeadlineExceeded}},
|
||||
},
|
||||
|
||||
// context.Canceled short-circuits Do
|
||||
@ -364,7 +364,7 @@ func TestHTTPClusterClientDo(t *testing.T) {
|
||||
),
|
||||
rand: rand.New(rand.NewSource(0)),
|
||||
},
|
||||
wantErr: context.Canceled,
|
||||
wantErr: &ClusterError{Errors: []error{context.Canceled}},
|
||||
},
|
||||
|
||||
// return err if there are no endpoints
|
||||
@ -389,7 +389,7 @@ func TestHTTPClusterClientDo(t *testing.T) {
|
||||
),
|
||||
rand: rand.New(rand.NewSource(0)),
|
||||
},
|
||||
wantErr: fakeErr,
|
||||
wantErr: &ClusterError{Errors: []error{fakeErr, fakeErr}},
|
||||
},
|
||||
|
||||
// 500-level errors cause Do to fallthrough to next endpoint
|
||||
|
33
client/cluster_error.go
Normal file
33
client/cluster_error.go
Normal file
@ -0,0 +1,33 @@
|
||||
// Copyright 2015 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package client
|
||||
|
||||
import "fmt"
|
||||
|
||||
type ClusterError struct {
|
||||
Errors []error
|
||||
}
|
||||
|
||||
func (ce *ClusterError) Error() string {
|
||||
return ErrClusterUnavailable.Error()
|
||||
}
|
||||
|
||||
func (ce *ClusterError) Detail() string {
|
||||
s := ""
|
||||
for i, e := range ce.Errors {
|
||||
s += fmt.Sprintf("error #%d: %s\n", i, e)
|
||||
}
|
||||
return s
|
||||
}
|
@ -221,7 +221,8 @@ func (d *discovery) checkCluster() ([]*client.Node, int, uint64, error) {
|
||||
if err == client.ErrInvalidJSON {
|
||||
return nil, 0, 0, ErrBadDiscoveryEndpoint
|
||||
}
|
||||
if err == context.DeadlineExceeded {
|
||||
if ce, ok := err.(*client.ClusterError); ok {
|
||||
plog.Error(ce.Detail())
|
||||
return d.checkClusterRetry()
|
||||
}
|
||||
return nil, 0, 0, err
|
||||
@ -235,7 +236,8 @@ func (d *discovery) checkCluster() ([]*client.Node, int, uint64, error) {
|
||||
resp, err = d.c.Get(ctx, d.cluster, nil)
|
||||
cancel()
|
||||
if err != nil {
|
||||
if err == context.DeadlineExceeded {
|
||||
if ce, ok := err.(*client.ClusterError); ok {
|
||||
plog.Error(ce.Detail())
|
||||
return d.checkClusterRetry()
|
||||
}
|
||||
return nil, 0, 0, err
|
||||
@ -266,7 +268,7 @@ func (d *discovery) checkCluster() ([]*client.Node, int, uint64, error) {
|
||||
func (d *discovery) logAndBackoffForRetry(step string) {
|
||||
d.retries++
|
||||
retryTime := time.Second * (0x1 << d.retries)
|
||||
plog.Infof("%s: connection to %s timed out, retrying in %s", step, d.url, retryTime)
|
||||
plog.Infof("%s: error connecting to %s, retrying in %s", step, d.url, retryTime)
|
||||
d.clock.Sleep(retryTime)
|
||||
}
|
||||
|
||||
@ -311,7 +313,8 @@ func (d *discovery) waitNodes(nodes []*client.Node, size int, index uint64) ([]*
|
||||
plog.Noticef("found %d peer(s), waiting for %d more", len(all), size-len(all))
|
||||
resp, err := w.Next(context.Background())
|
||||
if err != nil {
|
||||
if err == context.DeadlineExceeded {
|
||||
if ce, ok := err.(*client.ClusterError); ok {
|
||||
plog.Error(ce.Detail())
|
||||
return d.waitNodesRetry()
|
||||
}
|
||||
return nil, err
|
||||
|
@ -524,7 +524,7 @@ type clientWithRetry struct {
|
||||
func (c *clientWithRetry) Create(ctx context.Context, key string, value string) (*client.Response, error) {
|
||||
if c.failCount < c.failTimes {
|
||||
c.failCount++
|
||||
return nil, context.DeadlineExceeded
|
||||
return nil, &client.ClusterError{Errors: []error{context.DeadlineExceeded}}
|
||||
}
|
||||
return c.clientWithResp.Create(ctx, key, value)
|
||||
}
|
||||
@ -532,7 +532,7 @@ func (c *clientWithRetry) Create(ctx context.Context, key string, value string)
|
||||
func (c *clientWithRetry) Get(ctx context.Context, key string, opts *client.GetOptions) (*client.Response, error) {
|
||||
if c.failCount < c.failTimes {
|
||||
c.failCount++
|
||||
return nil, context.DeadlineExceeded
|
||||
return nil, &client.ClusterError{Errors: []error{context.DeadlineExceeded}}
|
||||
}
|
||||
return c.clientWithResp.Get(ctx, key, opts)
|
||||
}
|
||||
@ -547,7 +547,7 @@ type watcherWithRetry struct {
|
||||
func (w *watcherWithRetry) Next(context.Context) (*client.Response, error) {
|
||||
if w.failCount < w.failTimes {
|
||||
w.failCount++
|
||||
return nil, context.DeadlineExceeded
|
||||
return nil, &client.ClusterError{Errors: []error{context.DeadlineExceeded}}
|
||||
}
|
||||
if len(w.rs) == 0 {
|
||||
return &client.Response{}, nil
|
||||
|
Loading…
x
Reference in New Issue
Block a user