From 946b3cce1d2097c8f07368716b621d47a25e2cc6 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 6 Jul 2016 16:57:30 -0700 Subject: [PATCH 1/2] client: make set/delete one shot operations Old behavior would retry set and delete even if there's an error. This can lead to the client returning an error for deleting twice, instead of returning an error for an interdeterminate state. Fixes #5832 --- client/client.go | 11 +++++++++++ client/keys.go | 9 +++++++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/client/client.go b/client/client.go index 17b63d3d9..4c4d41eb9 100644 --- a/client/client.go +++ b/client/client.go @@ -37,6 +37,10 @@ var ( ErrClusterUnavailable = errors.New("client: etcd cluster is unavailable or misconfigured") ErrNoLeaderEndpoint = errors.New("client: no leader endpoint available") errTooManyRedirectChecks = errors.New("client: too many redirect checks") + + // oneShotCtxValue is set on a context using WithValue(&oneShotValue) so + // that Do() will not retry a request + oneShotCtxValue interface{} ) var DefaultRequestTimeout = 5 * time.Second @@ -335,6 +339,7 @@ func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Respo var body []byte var err error cerr := &ClusterError{} + isOneShot := ctx.Value(&oneShotCtxValue) != nil for i := pinned; i < leps+pinned; i++ { k := i % leps @@ -348,6 +353,9 @@ func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Respo if err == context.Canceled || err == context.DeadlineExceeded { return nil, nil, err } + if isOneShot { + return nil, nil, err + } continue } if resp.StatusCode/100 == 5 { @@ -358,6 +366,9 @@ func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Respo default: cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s returns server error [%s]", eps[k].String(), http.StatusText(resp.StatusCode))) } + if isOneShot { + return nil, nil, cerr.Errors[0] + } continue } if k != pinned { diff --git a/client/keys.go b/client/keys.go index 5c7a3d57d..62d5d506e 100644 --- a/client/keys.go +++ b/client/keys.go @@ -337,7 +337,11 @@ func (k *httpKeysAPI) Set(ctx context.Context, key, val string, opts *SetOptions act.Dir = opts.Dir } - resp, body, err := k.client.Do(ctx, act) + doCtx := ctx + if act.PrevExist == PrevNoExist { + doCtx = context.WithValue(doCtx, &oneShotCtxValue, &oneShotCtxValue) + } + resp, body, err := k.client.Do(doCtx, act) if err != nil { return nil, err } @@ -385,7 +389,8 @@ func (k *httpKeysAPI) Delete(ctx context.Context, key string, opts *DeleteOption act.Recursive = opts.Recursive } - resp, body, err := k.client.Do(ctx, act) + doCtx := context.WithValue(ctx, &oneShotCtxValue, &oneShotCtxValue) + resp, body, err := k.client.Do(doCtx, act) if err != nil { return nil, err } From c30f89f1d089a14c4dae0888a8ebfaae0166ae6f Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Thu, 7 Jul 2016 15:22:03 -0700 Subject: [PATCH 2/2] client/integration: test v2 client one shot operations --- client/integration/client_test.go | 134 ++++++++++++++++++++++++++++++ client/integration/main_test.go | 20 +++++ integration/cluster.go | 26 +++--- integration/cluster_test.go | 16 ++-- integration/member_test.go | 4 +- test | 1 + 6 files changed, 178 insertions(+), 23 deletions(-) create mode 100644 client/integration/client_test.go create mode 100644 client/integration/main_test.go diff --git a/client/integration/client_test.go b/client/integration/client_test.go new file mode 100644 index 000000000..ba4cb8b2b --- /dev/null +++ b/client/integration/client_test.go @@ -0,0 +1,134 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package integration + +import ( + "fmt" + "net/http" + "net/http/httptest" + "os" + "strings" + "sync/atomic" + "testing" + + "golang.org/x/net/context" + + "github.com/coreos/etcd/client" + "github.com/coreos/etcd/integration" + "github.com/coreos/etcd/pkg/testutil" +) + +// TestV2NoRetryEOF tests destructive api calls won't retry on a disconnection. +func TestV2NoRetryEOF(t *testing.T) { + defer testutil.AfterTest(t) + // generate an EOF response; specify address so appears first in sorted ep list + lEOF := integration.NewListenerWithAddr(t, fmt.Sprintf("eof:123.%d.sock", os.Getpid())) + defer lEOF.Close() + tries := uint32(0) + go func() { + for { + conn, err := lEOF.Accept() + if err != nil { + return + } + atomic.AddUint32(&tries, 1) + conn.Close() + } + }() + eofURL := integration.UrlScheme + "://" + lEOF.Addr().String() + cli := integration.MustNewHTTPClient(t, []string{eofURL, eofURL}, nil) + kapi := client.NewKeysAPI(cli) + for i, f := range noRetryList(kapi) { + startTries := atomic.LoadUint32(&tries) + if err := f(); err == nil { + t.Errorf("#%d: expected EOF error, got nil", i) + } + endTries := atomic.LoadUint32(&tries) + if startTries+1 != endTries { + t.Errorf("#%d: expected 1 try, got %d", i, endTries-startTries) + } + } +} + +// TestV2NoRetryNoLeader tests destructive api calls won't retry if given an error code. +func TestV2NoRetryNoLeader(t *testing.T) { + defer testutil.AfterTest(t) + + lHttp := integration.NewListenerWithAddr(t, fmt.Sprintf("errHttp:123.%d.sock", os.Getpid())) + eh := &errHandler{errCode: http.StatusServiceUnavailable} + srv := httptest.NewUnstartedServer(eh) + defer lHttp.Close() + defer srv.Close() + srv.Listener = lHttp + go srv.Start() + lHttpURL := integration.UrlScheme + "://" + lHttp.Addr().String() + + cli := integration.MustNewHTTPClient(t, []string{lHttpURL, lHttpURL}, nil) + kapi := client.NewKeysAPI(cli) + // test error code + for i, f := range noRetryList(kapi) { + reqs := eh.reqs + if err := f(); err == nil || !strings.Contains(err.Error(), "no leader") { + t.Errorf("#%d: expected \"no leader\", got %v", i, err) + } + if eh.reqs != reqs+1 { + t.Errorf("#%d: expected 1 request, got %d", i, eh.reqs-reqs) + } + } +} + +// TestV2RetryRefuse tests destructive api calls will retry if a connection is refused. +func TestV2RetryRefuse(t *testing.T) { + defer testutil.AfterTest(t) + cl := integration.NewCluster(t, 1) + cl.Launch(t) + defer cl.Terminate(t) + // test connection refused; expect no error failover + cli := integration.MustNewHTTPClient(t, []string{integration.UrlScheme + "://refuseconn:123", cl.URL(0)}, nil) + kapi := client.NewKeysAPI(cli) + if _, err := kapi.Set(context.Background(), "/delkey", "def", nil); err != nil { + t.Fatal(err) + } + for i, f := range noRetryList(kapi) { + if err := f(); err != nil { + t.Errorf("#%d: unexpected retry failure (%v)", i, err) + } + } +} + +type errHandler struct { + errCode int + reqs int +} + +func (eh *errHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + req.Body.Close() + eh.reqs++ + w.WriteHeader(eh.errCode) +} + +func noRetryList(kapi client.KeysAPI) []func() error { + return []func() error{ + func() error { + opts := &client.SetOptions{PrevExist: client.PrevNoExist} + _, err := kapi.Set(context.Background(), "/setkey", "bar", opts) + return err + }, + func() error { + _, err := kapi.Delete(context.Background(), "/delkey", nil) + return err + }, + } +} diff --git a/client/integration/main_test.go b/client/integration/main_test.go new file mode 100644 index 000000000..2913ce511 --- /dev/null +++ b/client/integration/main_test.go @@ -0,0 +1,20 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package integration + +import ( + "os" + "testing" + + "github.com/coreos/etcd/pkg/testutil" +) + +func TestMain(m *testing.M) { + v := m.Run() + if v == 0 && testutil.CheckLeakedGoroutine() { + os.Exit(1) + } + os.Exit(v) +} diff --git a/integration/cluster.go b/integration/cluster.go index 8f15f42cb..4b34bf494 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -54,8 +54,8 @@ const ( requestTimeout = 20 * time.Second basePort = 21000 - urlScheme = "unix" - urlSchemeTLS = "unixs" + UrlScheme = "unix" + UrlSchemeTLS = "unixs" ) var ( @@ -96,9 +96,9 @@ func init() { func schemeFromTLSInfo(tls *transport.TLSInfo) string { if tls == nil { - return urlScheme + return UrlScheme } - return urlSchemeTLS + return UrlSchemeTLS } func (c *cluster) fillClusterForMembers() error { @@ -257,7 +257,7 @@ func (c *cluster) addMember(t *testing.T) { } func (c *cluster) addMemberByURL(t *testing.T, clientURL, peerURL string) error { - cc := mustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS) + cc := MustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS) ma := client.NewMembersAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) if _, err := ma.Add(ctx, peerURL); err != nil { @@ -277,7 +277,7 @@ func (c *cluster) AddMember(t *testing.T) { func (c *cluster) RemoveMember(t *testing.T, id uint64) { // send remove request to the cluster - cc := mustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS) + cc := MustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS) ma := client.NewMembersAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) if err := ma.Remove(ctx, types.ID(id).String()); err != nil { @@ -312,7 +312,7 @@ func (c *cluster) Terminate(t *testing.T) { func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) { for _, u := range c.URLs() { - cc := mustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS) + cc := MustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS) ma := client.NewMembersAPI(cc) for { ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) @@ -391,10 +391,10 @@ func isMembersEqual(membs []client.Member, wmembs []client.Member) bool { func newLocalListener(t *testing.T) net.Listener { c := atomic.AddInt64(&localListenCount, 1) addr := fmt.Sprintf("127.0.0.1:%d.%d.sock", c+basePort, os.Getpid()) - return newListenerWithAddr(t, addr) + return NewListenerWithAddr(t, addr) } -func newListenerWithAddr(t *testing.T, addr string) net.Listener { +func NewListenerWithAddr(t *testing.T, addr string) net.Listener { l, err := transport.NewUnixListener(addr) if err != nil { t.Fatal(err) @@ -614,7 +614,7 @@ func (m *member) Launch() error { } func (m *member) WaitOK(t *testing.T) { - cc := mustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo) + cc := MustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo) kapi := client.NewKeysAPI(cc) for { ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) @@ -678,12 +678,12 @@ func (m *member) Restart(t *testing.T) error { plog.Printf("restarting %s (%s)", m.Name, m.grpcAddr) newPeerListeners := make([]net.Listener, 0) for _, ln := range m.PeerListeners { - newPeerListeners = append(newPeerListeners, newListenerWithAddr(t, ln.Addr().String())) + newPeerListeners = append(newPeerListeners, NewListenerWithAddr(t, ln.Addr().String())) } m.PeerListeners = newPeerListeners newClientListeners := make([]net.Listener, 0) for _, ln := range m.ClientListeners { - newClientListeners = append(newClientListeners, newListenerWithAddr(t, ln.Addr().String())) + newClientListeners = append(newClientListeners, NewListenerWithAddr(t, ln.Addr().String())) } m.ClientListeners = newClientListeners @@ -708,7 +708,7 @@ func (m *member) Terminate(t *testing.T) { plog.Printf("terminated %s (%s)", m.Name, m.grpcAddr) } -func mustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client { +func MustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client { cfgtls := transport.TLSInfo{} if tls != nil { cfgtls = *tls diff --git a/integration/cluster_test.go b/integration/cluster_test.go index e11ffe454..1ecc3cd79 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -67,7 +67,7 @@ func testClusterUsingDiscovery(t *testing.T, size int) { dc.Launch(t) defer dc.Terminate(t) // init discovery token space - dcc := mustNewHTTPClient(t, dc.URLs(), nil) + dcc := MustNewHTTPClient(t, dc.URLs(), nil) dkapi := client.NewKeysAPI(dcc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", size)); err != nil { @@ -90,7 +90,7 @@ func TestTLSClusterOf3UsingDiscovery(t *testing.T) { dc.Launch(t) defer dc.Terminate(t) // init discovery token space - dcc := mustNewHTTPClient(t, dc.URLs(), nil) + dcc := MustNewHTTPClient(t, dc.URLs(), nil) dkapi := client.NewKeysAPI(dcc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", 3)); err != nil { @@ -157,7 +157,7 @@ func testDecreaseClusterSize(t *testing.T, size int) { func TestForceNewCluster(t *testing.T) { c := NewCluster(t, 3) c.Launch(t) - cc := mustNewHTTPClient(t, []string{c.Members[0].URL()}, nil) + cc := MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil) kapi := client.NewKeysAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) resp, err := kapi.Create(ctx, "/foo", "bar") @@ -184,7 +184,7 @@ func TestForceNewCluster(t *testing.T) { c.waitLeader(t, c.Members[:1]) // use new http client to init new connection - cc = mustNewHTTPClient(t, []string{c.Members[0].URL()}, nil) + cc = MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil) kapi = client.NewKeysAPI(cc) // ensure force restart keep the old data, and new cluster can make progress ctx, cancel = context.WithTimeout(context.Background(), requestTimeout) @@ -273,7 +273,7 @@ func TestIssue2904(t *testing.T) { c.Members[1].Stop(t) // send remove member-1 request to the cluster. - cc := mustNewHTTPClient(t, c.URLs(), nil) + cc := MustNewHTTPClient(t, c.URLs(), nil) ma := client.NewMembersAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) // the proposal is not committed because member 1 is stopped, but the @@ -337,7 +337,7 @@ func TestIssue3699(t *testing.T) { c.waitLeader(t, c.Members) // try to participate in cluster - cc := mustNewHTTPClient(t, []string{c.URL(0)}, c.cfg.ClientTLS) + cc := MustNewHTTPClient(t, []string{c.URL(0)}, c.cfg.ClientTLS) kapi := client.NewKeysAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) if _, err := kapi.Set(ctx, "/foo", "bar", nil); err != nil { @@ -350,7 +350,7 @@ func TestIssue3699(t *testing.T) { // a random key first, and check the new key could be got from all client urls // of the cluster. func clusterMustProgress(t *testing.T, membs []*member) { - cc := mustNewHTTPClient(t, []string{membs[0].URL()}, nil) + cc := MustNewHTTPClient(t, []string{membs[0].URL()}, nil) kapi := client.NewKeysAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) key := fmt.Sprintf("foo%d", rand.Int()) @@ -362,7 +362,7 @@ func clusterMustProgress(t *testing.T, membs []*member) { for i, m := range membs { u := m.URL() - mcc := mustNewHTTPClient(t, []string{u}, nil) + mcc := MustNewHTTPClient(t, []string{u}, nil) mkapi := client.NewKeysAPI(mcc) mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout) if _, err := mkapi.Watcher(key, &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(mctx); err != nil { diff --git a/integration/member_test.go b/integration/member_test.go index 19c711007..8ac8f772b 100644 --- a/integration/member_test.go +++ b/integration/member_test.go @@ -93,7 +93,7 @@ func TestSnapshotAndRestartMember(t *testing.T) { resps := make([]*client.Response, 120) var err error for i := 0; i < 120; i++ { - cc := mustNewHTTPClient(t, []string{m.URL()}, nil) + cc := MustNewHTTPClient(t, []string{m.URL()}, nil) kapi := client.NewKeysAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) key := fmt.Sprintf("foo%d", i) @@ -108,7 +108,7 @@ func TestSnapshotAndRestartMember(t *testing.T) { m.WaitOK(t) for i := 0; i < 120; i++ { - cc := mustNewHTTPClient(t, []string{m.URL()}, nil) + cc := MustNewHTTPClient(t, []string{m.URL()}, nil) kapi := client.NewKeysAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) key := fmt.Sprintf("foo%d", i) diff --git a/test b/test index 5c0496874..12340a63e 100755 --- a/test +++ b/test @@ -64,6 +64,7 @@ function integration_tests { intpid="$!" wait $e2epid wait $intpid + go test -timeout 1m -v ${RACE} -cpu 1,2,4 $@ ${REPO_PATH}/client/integration go test -timeout 10m -v ${RACE} -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration go test -timeout 1m -v -cpu 1,2,4 $@ ${REPO_PATH}/contrib/raftexample go test -timeout 1m -v ${RACE} -cpu 1,2,4 -run=Example $@ ${TEST}