From 45b1e6b470eec2eaf899469b9d4e98c6c32f1f43 Mon Sep 17 00:00:00 2001
From: Piotr Tabor
Date: Thu, 25 Feb 2021 17:16:07 +0100
Subject: [PATCH] ClientV3: Ordering: Fix the ordering test such that it does
 not fail.

The test depended on very subtle timing semantics and on properties of
'copied' clients.

https://travis-ci.com/github/etcd-io/etcd/jobs/486191449

Example failure:
```
{"level":"warn","ts":"2021-02-25T12:34:47.894Z","caller":"v3/retry_interceptor.go:62","msg":"retrying of unary invoker failed","target":"etcd-endpoints://0xc0000d6fc0/#initially=[unix://localhost:86269902489114839060]","attempt":1,"error":"rpc error: code = Unavailable desc = etcdserver: rpc not supported for learner"}
{"level":"warn","ts":"2021-02-25T12:34:48.163Z","caller":"v3/retry_interceptor.go:62","msg":"retrying of unary invoker failed","target":"etcd-endpoints://0xc00035a000/#initially=[unix://localhost:78285857058450835940]","attempt":0,"error":"rpc error: code = FailedPrecondition desc = etcdserver: not leader"}
{"level":"info","ts":"2021-02-25T12:34:48.255Z","caller":"v3/maintenance.go:211","msg":"opened snapshot stream; downloading"}
{"level":"warn","ts":"2021-02-25T12:34:48.255Z","caller":"v3/maintenance.go:221","msg":"failed to receive from snapshot stream; closing","error":"rpc error: code = Canceled desc = context canceled"}
{"level":"info","ts":"2021-02-25T12:34:48.255Z","caller":"v3/maintenance.go:211","msg":"opened snapshot stream; downloading"}
{"level":"info","ts":"2021-02-25T12:34:50.255Z","caller":"v3/maintenance.go:219","msg":"completed snapshot read; closing"}
{"level":"info","ts":"2021-02-25T12:34:51.717Z","caller":"v3/maintenance.go:211","msg":"opened snapshot stream; downloading"}
{"level":"warn","ts":"2021-02-25T12:34:52.017Z","caller":"v3/maintenance.go:221","msg":"failed to receive from snapshot stream; closing","error":"rpc error: code = Canceled desc = context canceled"}
{"level":"info","ts":"2021-02-25T12:34:52.018Z","caller":"v3/maintenance.go:211","msg":"opened snapshot stream; downloading"}
{"level":"warn","ts":"2021-02-25T12:34:53.018Z","caller":"v3/maintenance.go:221","msg":"failed to receive from snapshot stream; closing","error":"rpc error: code = DeadlineExceeded desc = context deadline exceeded"}
--- FAIL: TestEndpointSwitchResolvesViolation (10.12s)
    ordering_util_test.go:81: failed to resolve order violation etcdclient: no cluster members have a revision higher than the previously received revision
```
---
 client/v3/ordering/util.go         | 31 +++++++------------
 server/etcdmain/grpc_proxy.go      |  2 +-
 .../clientv3/ordering_util_test.go | 21 +++++--------
 3 files changed, 19 insertions(+), 35 deletions(-)

diff --git a/client/v3/ordering/util.go b/client/v3/ordering/util.go
index bd9d31f5c..f8f65c4c9 100644
--- a/client/v3/ordering/util.go
+++ b/client/v3/ordering/util.go
@@ -16,8 +16,7 @@ package ordering
 
 import (
 	"errors"
-	"sync"
-	"time"
+	"sync/atomic"
 
 	"go.etcd.io/etcd/client/v3"
 )
@@ -26,26 +25,18 @@ type OrderViolationFunc func(op clientv3.Op, resp clientv3.OpResponse, prevRev i
 
 var ErrNoGreaterRev = errors.New("etcdclient: no cluster members have a revision higher than the previously received revision")
 
-func NewOrderViolationSwitchEndpointClosure(c clientv3.Client) OrderViolationFunc {
-	var mu sync.Mutex
-	violationCount := 0
-	return func(op clientv3.Op, resp clientv3.OpResponse, prevRev int64) error {
-		if violationCount > len(c.Endpoints()) {
+func NewOrderViolationSwitchEndpointClosure(c *clientv3.Client) OrderViolationFunc {
+	violationCount := int32(0)
+	return func(_ clientv3.Op, _ clientv3.OpResponse, _ int64) error {
+		// Each request is assigned by the round-robin load-balancer's picker to a different
+		// endpoint. If we have cycled through them 5 times (even with some concurrency),
+		// with high probability no endpoint points to a member with fresh data.
+		// TODO: Ideally we should track members (resp.opp.Header) that returned a
+		// stale result and temporarily disable them in the 'picker'.
+		if atomic.LoadInt32(&violationCount) > int32(5*len(c.Endpoints())) {
 			return ErrNoGreaterRev
 		}
-		mu.Lock()
-		defer mu.Unlock()
-		eps := c.Endpoints()
-		// force client to connect to given endpoint by limiting to a single endpoint
-		c.SetEndpoints(eps[violationCount%len(eps)])
-		// give enough time for operation
-		time.Sleep(1 * time.Second)
-		// set available endpoints back to all endpoints in to ensure
-		// the client has access to all the endpoints.
-		c.SetEndpoints(eps...)
-		// give enough time for operation
-		time.Sleep(1 * time.Second)
-		violationCount++
+		atomic.AddInt32(&violationCount, 1)
 		return nil
 	}
 }
diff --git a/server/etcdmain/grpc_proxy.go b/server/etcdmain/grpc_proxy.go
index 0c1e551bb..859d99f4c 100644
--- a/server/etcdmain/grpc_proxy.go
+++ b/server/etcdmain/grpc_proxy.go
@@ -386,7 +386,7 @@ func mustListenCMux(lg *zap.Logger, tlsinfo *transport.TLSInfo) cmux.CMux {
 
 func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server {
 	if grpcProxyEnableOrdering {
-		vf := ordering.NewOrderViolationSwitchEndpointClosure(*client)
+		vf := ordering.NewOrderViolationSwitchEndpointClosure(client)
 		client.KV = ordering.NewKV(client.KV, vf)
 		lg.Info("waiting for linearized read from cluster to recover ordering")
 		for {
diff --git a/tests/integration/clientv3/ordering_util_test.go b/tests/integration/clientv3/ordering_util_test.go
index efce2bb0d..5ba239e66 100644
--- a/tests/integration/clientv3/ordering_util_test.go
+++ b/tests/integration/clientv3/ordering_util_test.go
@@ -62,23 +62,20 @@ func TestEndpointSwitchResolvesViolation(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	// reset client endpoints to all members such that the copy of cli sent to
-	// NewOrderViolationSwitchEndpointClosure will be able to
-	// access the full list of endpoints.
 	cli.SetEndpoints(eps...)
-	OrderingKv := ordering.NewKV(cli.KV, ordering.NewOrderViolationSwitchEndpointClosure(*cli))
+	orderingKv := ordering.NewKV(cli.KV, ordering.NewOrderViolationSwitchEndpointClosure(cli))
 	// set prevRev to the second member's revision of "foo" such that
 	// the revision is higher than the third member's revision of "foo"
-	_, err = OrderingKv.Get(ctx, "foo")
+	_, err = orderingKv.Get(ctx, "foo")
 	if err != nil {
 		t.Fatal(err)
 	}
 
+	t.Logf("Reconfigure client to speak only to the 'partitioned' member")
 	cli.SetEndpoints(clus.Members[2].GRPCAddr())
-	time.Sleep(1 * time.Second) // give enough time for operation
-	_, err = OrderingKv.Get(ctx, "foo", clientv3.WithSerializable())
-	if err != nil {
-		t.Fatalf("failed to resolve order violation %v", err)
+	_, err = orderingKv.Get(ctx, "foo", clientv3.WithSerializable())
+	if err != ordering.ErrNoGreaterRev {
+		t.Fatal("While speaking to the partitioned leader, we should get an ErrNoGreaterRev error")
 	}
 }
@@ -121,12 +118,9 @@ func TestUnresolvableOrderViolation(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	// reset client endpoints to all members such that the copy of cli sent to
-	// NewOrderViolationSwitchEndpointClosure will be able to
-	// access the full list of endpoints.
 	cli.SetEndpoints(eps...)
-	time.Sleep(1 * time.Second) // give enough time for operation
-	OrderingKv := ordering.NewKV(cli.KV, ordering.NewOrderViolationSwitchEndpointClosure(*cli))
+	OrderingKv := ordering.NewKV(cli.KV, ordering.NewOrderViolationSwitchEndpointClosure(cli))
 	// set prevRev to the first member's revision of "foo" such that
 	// the revision is higher than the fourth and fifth members' revision of "foo"
 	_, err = OrderingKv.Get(ctx, "foo")
@@ -147,7 +141,6 @@
 	}
 	clus.Members[3].WaitStarted(t)
 	cli.SetEndpoints(clus.Members[3].GRPCAddr())
-	time.Sleep(5 * time.Second) // give enough time for operation
 	_, err = OrderingKv.Get(ctx, "foo", clientv3.WithSerializable())
 	if err != ordering.ErrNoGreaterRev {