diff --git a/client/client.go b/client/client.go index 17b63d3d9..4c4d41eb9 100644 --- a/client/client.go +++ b/client/client.go @@ -37,6 +37,10 @@ var ( ErrClusterUnavailable = errors.New("client: etcd cluster is unavailable or misconfigured") ErrNoLeaderEndpoint = errors.New("client: no leader endpoint available") errTooManyRedirectChecks = errors.New("client: too many redirect checks") + + // oneShotCtxValue is set on a context using WithValue(&oneShotValue) so + // that Do() will not retry a request + oneShotCtxValue interface{} ) var DefaultRequestTimeout = 5 * time.Second @@ -335,6 +339,7 @@ func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Respo var body []byte var err error cerr := &ClusterError{} + isOneShot := ctx.Value(&oneShotCtxValue) != nil for i := pinned; i < leps+pinned; i++ { k := i % leps @@ -348,6 +353,9 @@ func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Respo if err == context.Canceled || err == context.DeadlineExceeded { return nil, nil, err } + if isOneShot { + return nil, nil, err + } continue } if resp.StatusCode/100 == 5 { @@ -358,6 +366,9 @@ func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Respo default: cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s returns server error [%s]", eps[k].String(), http.StatusText(resp.StatusCode))) } + if isOneShot { + return nil, nil, cerr.Errors[0] + } continue } if k != pinned { diff --git a/client/integration/client_test.go b/client/integration/client_test.go new file mode 100644 index 000000000..ba4cb8b2b --- /dev/null +++ b/client/integration/client_test.go @@ -0,0 +1,134 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package integration + +import ( + "fmt" + "net/http" + "net/http/httptest" + "os" + "strings" + "sync/atomic" + "testing" + + "golang.org/x/net/context" + + "github.com/coreos/etcd/client" + "github.com/coreos/etcd/integration" + "github.com/coreos/etcd/pkg/testutil" +) + +// TestV2NoRetryEOF tests destructive api calls won't retry on a disconnection. +func TestV2NoRetryEOF(t *testing.T) { + defer testutil.AfterTest(t) + // generate an EOF response; specify address so appears first in sorted ep list + lEOF := integration.NewListenerWithAddr(t, fmt.Sprintf("eof:123.%d.sock", os.Getpid())) + defer lEOF.Close() + tries := uint32(0) + go func() { + for { + conn, err := lEOF.Accept() + if err != nil { + return + } + atomic.AddUint32(&tries, 1) + conn.Close() + } + }() + eofURL := integration.UrlScheme + "://" + lEOF.Addr().String() + cli := integration.MustNewHTTPClient(t, []string{eofURL, eofURL}, nil) + kapi := client.NewKeysAPI(cli) + for i, f := range noRetryList(kapi) { + startTries := atomic.LoadUint32(&tries) + if err := f(); err == nil { + t.Errorf("#%d: expected EOF error, got nil", i) + } + endTries := atomic.LoadUint32(&tries) + if startTries+1 != endTries { + t.Errorf("#%d: expected 1 try, got %d", i, endTries-startTries) + } + } +} + +// TestV2NoRetryNoLeader tests destructive api calls won't retry if given an error code. +func TestV2NoRetryNoLeader(t *testing.T) { + defer testutil.AfterTest(t) + + lHttp := integration.NewListenerWithAddr(t, fmt.Sprintf("errHttp:123.%d.sock", os.Getpid())) + eh := &errHandler{errCode: http.StatusServiceUnavailable} + srv := httptest.NewUnstartedServer(eh) + defer lHttp.Close() + defer srv.Close() + srv.Listener = lHttp + go srv.Start() + lHttpURL := integration.UrlScheme + "://" + lHttp.Addr().String() + + cli := integration.MustNewHTTPClient(t, []string{lHttpURL, lHttpURL}, nil) + kapi := client.NewKeysAPI(cli) + // test error code + for i, f := range noRetryList(kapi) { + reqs := eh.reqs + if err := f(); err == nil || !strings.Contains(err.Error(), "no leader") { + t.Errorf("#%d: expected \"no leader\", got %v", i, err) + } + if eh.reqs != reqs+1 { + t.Errorf("#%d: expected 1 request, got %d", i, eh.reqs-reqs) + } + } +} + +// TestV2RetryRefuse tests destructive api calls will retry if a connection is refused. +func TestV2RetryRefuse(t *testing.T) { + defer testutil.AfterTest(t) + cl := integration.NewCluster(t, 1) + cl.Launch(t) + defer cl.Terminate(t) + // test connection refused; expect no error failover + cli := integration.MustNewHTTPClient(t, []string{integration.UrlScheme + "://refuseconn:123", cl.URL(0)}, nil) + kapi := client.NewKeysAPI(cli) + if _, err := kapi.Set(context.Background(), "/delkey", "def", nil); err != nil { + t.Fatal(err) + } + for i, f := range noRetryList(kapi) { + if err := f(); err != nil { + t.Errorf("#%d: unexpected retry failure (%v)", i, err) + } + } +} + +type errHandler struct { + errCode int + reqs int +} + +func (eh *errHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + req.Body.Close() + eh.reqs++ + w.WriteHeader(eh.errCode) +} + +func noRetryList(kapi client.KeysAPI) []func() error { + return []func() error{ + func() error { + opts := &client.SetOptions{PrevExist: client.PrevNoExist} + _, err := kapi.Set(context.Background(), "/setkey", "bar", opts) + return err + }, + func() error { + _, err := kapi.Delete(context.Background(), "/delkey", nil) + return err + }, + } +} diff --git a/client/integration/main_test.go b/client/integration/main_test.go new file mode 100644 index 000000000..2913ce511 --- /dev/null +++ b/client/integration/main_test.go @@ -0,0 +1,20 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package integration + +import ( + "os" + "testing" + + "github.com/coreos/etcd/pkg/testutil" +) + +func TestMain(m *testing.M) { + v := m.Run() + if v == 0 && testutil.CheckLeakedGoroutine() { + os.Exit(1) + } + os.Exit(v) +} diff --git a/client/keys.go b/client/keys.go index 5c7a3d57d..62d5d506e 100644 --- a/client/keys.go +++ b/client/keys.go @@ -337,7 +337,11 @@ func (k *httpKeysAPI) Set(ctx context.Context, key, val string, opts *SetOptions act.Dir = opts.Dir } - resp, body, err := k.client.Do(ctx, act) + doCtx := ctx + if act.PrevExist == PrevNoExist { + doCtx = context.WithValue(doCtx, &oneShotCtxValue, &oneShotCtxValue) + } + resp, body, err := k.client.Do(doCtx, act) if err != nil { return nil, err } @@ -385,7 +389,8 @@ func (k *httpKeysAPI) Delete(ctx context.Context, key string, opts *DeleteOption act.Recursive = opts.Recursive } - resp, body, err := k.client.Do(ctx, act) + doCtx := context.WithValue(ctx, &oneShotCtxValue, &oneShotCtxValue) + resp, body, err := k.client.Do(doCtx, act) if err != nil { return nil, err } diff --git a/integration/cluster.go b/integration/cluster.go index 8f15f42cb..4b34bf494 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -54,8 +54,8 @@ const ( requestTimeout = 20 * time.Second basePort = 21000 - urlScheme = "unix" - urlSchemeTLS = "unixs" + UrlScheme = "unix" + UrlSchemeTLS = "unixs" ) var ( @@ -96,9 +96,9 @@ func init() { func schemeFromTLSInfo(tls *transport.TLSInfo) string { if tls == nil { - return urlScheme + return UrlScheme } - return urlSchemeTLS + return UrlSchemeTLS } func (c *cluster) fillClusterForMembers() error { @@ -257,7 +257,7 @@ func (c *cluster) addMember(t *testing.T) { } func (c *cluster) addMemberByURL(t *testing.T, clientURL, peerURL string) error { - cc := mustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS) + cc := MustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS) ma := client.NewMembersAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) if _, err := ma.Add(ctx, peerURL); err != nil { @@ -277,7 +277,7 @@ func (c *cluster) AddMember(t *testing.T) { func (c *cluster) RemoveMember(t *testing.T, id uint64) { // send remove request to the cluster - cc := mustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS) + cc := MustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS) ma := client.NewMembersAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) if err := ma.Remove(ctx, types.ID(id).String()); err != nil { @@ -312,7 +312,7 @@ func (c *cluster) Terminate(t *testing.T) { func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) { for _, u := range c.URLs() { - cc := mustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS) + cc := MustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS) ma := client.NewMembersAPI(cc) for { ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) @@ -391,10 +391,10 @@ func isMembersEqual(membs []client.Member, wmembs []client.Member) bool { func newLocalListener(t *testing.T) net.Listener { c := atomic.AddInt64(&localListenCount, 1) addr := fmt.Sprintf("127.0.0.1:%d.%d.sock", c+basePort, os.Getpid()) - return newListenerWithAddr(t, addr) + return NewListenerWithAddr(t, addr) } -func newListenerWithAddr(t *testing.T, addr string) net.Listener { +func NewListenerWithAddr(t *testing.T, addr string) net.Listener { l, err := transport.NewUnixListener(addr) if err != nil { t.Fatal(err) @@ -614,7 +614,7 @@ func (m *member) Launch() error { } func (m *member) WaitOK(t *testing.T) { - cc := mustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo) + cc := MustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo) kapi := client.NewKeysAPI(cc) for { ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) @@ -678,12 +678,12 @@ func (m *member) Restart(t *testing.T) error { plog.Printf("restarting %s (%s)", m.Name, m.grpcAddr) newPeerListeners := make([]net.Listener, 0) for _, ln := range m.PeerListeners { - newPeerListeners = append(newPeerListeners, newListenerWithAddr(t, ln.Addr().String())) + newPeerListeners = append(newPeerListeners, NewListenerWithAddr(t, ln.Addr().String())) } m.PeerListeners = newPeerListeners newClientListeners := make([]net.Listener, 0) for _, ln := range m.ClientListeners { - newClientListeners = append(newClientListeners, newListenerWithAddr(t, ln.Addr().String())) + newClientListeners = append(newClientListeners, NewListenerWithAddr(t, ln.Addr().String())) } m.ClientListeners = newClientListeners @@ -708,7 +708,7 @@ func (m *member) Terminate(t *testing.T) { plog.Printf("terminated %s (%s)", m.Name, m.grpcAddr) } -func mustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client { +func MustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client { cfgtls := transport.TLSInfo{} if tls != nil { cfgtls = *tls diff --git a/integration/cluster_test.go b/integration/cluster_test.go index e11ffe454..1ecc3cd79 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -67,7 +67,7 @@ func testClusterUsingDiscovery(t *testing.T, size int) { dc.Launch(t) defer dc.Terminate(t) // init discovery token space - dcc := mustNewHTTPClient(t, dc.URLs(), nil) + dcc := MustNewHTTPClient(t, dc.URLs(), nil) dkapi := client.NewKeysAPI(dcc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", size)); err != nil { @@ -90,7 +90,7 @@ func TestTLSClusterOf3UsingDiscovery(t *testing.T) { dc.Launch(t) defer dc.Terminate(t) // init discovery token space - dcc := mustNewHTTPClient(t, dc.URLs(), nil) + dcc := MustNewHTTPClient(t, dc.URLs(), nil) dkapi := client.NewKeysAPI(dcc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", 3)); err != nil { @@ -157,7 +157,7 @@ func testDecreaseClusterSize(t *testing.T, size int) { func TestForceNewCluster(t *testing.T) { c := NewCluster(t, 3) c.Launch(t) - cc := mustNewHTTPClient(t, []string{c.Members[0].URL()}, nil) + cc := MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil) kapi := client.NewKeysAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) resp, err := kapi.Create(ctx, "/foo", "bar") @@ -184,7 +184,7 @@ func TestForceNewCluster(t *testing.T) { c.waitLeader(t, c.Members[:1]) // use new http client to init new connection - cc = mustNewHTTPClient(t, []string{c.Members[0].URL()}, nil) + cc = MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil) kapi = client.NewKeysAPI(cc) // ensure force restart keep the old data, and new cluster can make progress ctx, cancel = context.WithTimeout(context.Background(), requestTimeout) @@ -273,7 +273,7 @@ func TestIssue2904(t *testing.T) { c.Members[1].Stop(t) // send remove member-1 request to the cluster. - cc := mustNewHTTPClient(t, c.URLs(), nil) + cc := MustNewHTTPClient(t, c.URLs(), nil) ma := client.NewMembersAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) // the proposal is not committed because member 1 is stopped, but the @@ -337,7 +337,7 @@ func TestIssue3699(t *testing.T) { c.waitLeader(t, c.Members) // try to participate in cluster - cc := mustNewHTTPClient(t, []string{c.URL(0)}, c.cfg.ClientTLS) + cc := MustNewHTTPClient(t, []string{c.URL(0)}, c.cfg.ClientTLS) kapi := client.NewKeysAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) if _, err := kapi.Set(ctx, "/foo", "bar", nil); err != nil { @@ -350,7 +350,7 @@ func TestIssue3699(t *testing.T) { // a random key first, and check the new key could be got from all client urls // of the cluster. func clusterMustProgress(t *testing.T, membs []*member) { - cc := mustNewHTTPClient(t, []string{membs[0].URL()}, nil) + cc := MustNewHTTPClient(t, []string{membs[0].URL()}, nil) kapi := client.NewKeysAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) key := fmt.Sprintf("foo%d", rand.Int()) @@ -362,7 +362,7 @@ func clusterMustProgress(t *testing.T, membs []*member) { for i, m := range membs { u := m.URL() - mcc := mustNewHTTPClient(t, []string{u}, nil) + mcc := MustNewHTTPClient(t, []string{u}, nil) mkapi := client.NewKeysAPI(mcc) mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout) if _, err := mkapi.Watcher(key, &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(mctx); err != nil { diff --git a/integration/member_test.go b/integration/member_test.go index 19c711007..8ac8f772b 100644 --- a/integration/member_test.go +++ b/integration/member_test.go @@ -93,7 +93,7 @@ func TestSnapshotAndRestartMember(t *testing.T) { resps := make([]*client.Response, 120) var err error for i := 0; i < 120; i++ { - cc := mustNewHTTPClient(t, []string{m.URL()}, nil) + cc := MustNewHTTPClient(t, []string{m.URL()}, nil) kapi := client.NewKeysAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) key := fmt.Sprintf("foo%d", i) @@ -108,7 +108,7 @@ func TestSnapshotAndRestartMember(t *testing.T) { m.WaitOK(t) for i := 0; i < 120; i++ { - cc := mustNewHTTPClient(t, []string{m.URL()}, nil) + cc := MustNewHTTPClient(t, []string{m.URL()}, nil) kapi := client.NewKeysAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) key := fmt.Sprintf("foo%d", i) diff --git a/test b/test index 5c0496874..12340a63e 100755 --- a/test +++ b/test @@ -64,6 +64,7 @@ function integration_tests { intpid="$!" wait $e2epid wait $intpid + go test -timeout 1m -v ${RACE} -cpu 1,2,4 $@ ${REPO_PATH}/client/integration go test -timeout 10m -v ${RACE} -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration go test -timeout 1m -v -cpu 1,2,4 $@ ${REPO_PATH}/contrib/raftexample go test -timeout 1m -v ${RACE} -cpu 1,2,4 -run=Example $@ ${TEST}