From 501d8f01eabd9ba7a3c0a54b60420b27fdbce4bc Mon Sep 17 00:00:00 2001 From: Sam Batschelet Date: Wed, 23 Jun 2021 21:26:55 -0400 Subject: [PATCH] [release-3.4]: ClientV3: Ordering: Fix TestEndpointSwitchResolvesViolation test Signed-off-by: Sam Batschelet --- clientv3/ordering/util.go | 31 +++++++++++-------------------- clientv3/ordering/util_test.go | 14 +++++++------- etcdmain/grpc_proxy.go | 2 +- 3 files changed, 19 insertions(+), 28 deletions(-) diff --git a/clientv3/ordering/util.go b/clientv3/ordering/util.go index f08740cb4..9ad9dc9a9 100644 --- a/clientv3/ordering/util.go +++ b/clientv3/ordering/util.go @@ -16,8 +16,7 @@ package ordering import ( "errors" - "sync" - "time" + "sync/atomic" "go.etcd.io/etcd/clientv3" ) @@ -26,26 +25,18 @@ type OrderViolationFunc func(op clientv3.Op, resp clientv3.OpResponse, prevRev i var ErrNoGreaterRev = errors.New("etcdclient: no cluster members have a revision higher than the previously received revision") -func NewOrderViolationSwitchEndpointClosure(c clientv3.Client) OrderViolationFunc { - var mu sync.Mutex - violationCount := 0 - return func(op clientv3.Op, resp clientv3.OpResponse, prevRev int64) error { - if violationCount > len(c.Endpoints()) { +func NewOrderViolationSwitchEndpointClosure(c *clientv3.Client) OrderViolationFunc { + violationCount := int32(0) + return func(_ clientv3.Op, _ clientv3.OpResponse, _ int64) error { + // Each request is assigned by round-robin load-balancer's picker to a different + // endpoints. If we cycled them 5 times (even with some level of concurrency), + // with high probability no endpoint points on a member with fresh data. + // TODO: Ideally we should track members (resp.opp.Header) that returned + // stale result and explicitly temporarily disable them in 'picker'. + if atomic.LoadInt32(&violationCount) > int32(5*len(c.Endpoints())) { return ErrNoGreaterRev } - mu.Lock() - defer mu.Unlock() - eps := c.Endpoints() - // force client to connect to given endpoint by limiting to a single endpoint - c.SetEndpoints(eps[violationCount%len(eps)]) - // give enough time for operation - time.Sleep(1 * time.Second) - // set available endpoints back to all endpoints in to ensure - // the client has access to all the endpoints. - c.SetEndpoints(eps...) - // give enough time for operation - time.Sleep(1 * time.Second) - violationCount++ + atomic.AddInt32(&violationCount, 1) return nil } } diff --git a/clientv3/ordering/util_test.go b/clientv3/ordering/util_test.go index f903baae8..477f9be8c 100644 --- a/clientv3/ordering/util_test.go +++ b/clientv3/ordering/util_test.go @@ -64,19 +64,19 @@ func TestEndpointSwitchResolvesViolation(t *testing.T) { // NewOrderViolationSwitchEndpointClosure will be able to // access the full list of endpoints. cli.SetEndpoints(eps...) - OrderingKv := NewKV(cli.KV, NewOrderViolationSwitchEndpointClosure(*cli)) + orderingKv := NewKV(cli.KV, NewOrderViolationSwitchEndpointClosure(cli)) // set prevRev to the second member's revision of "foo" such that // the revision is higher than the third member's revision of "foo" - _, err = OrderingKv.Get(ctx, "foo") + _, err = orderingKv.Get(ctx, "foo") if err != nil { t.Fatal(err) } + t.Logf("Reconfigure client to speak only to the 'partitioned' member") cli.SetEndpoints(clus.Members[2].GRPCAddr()) - time.Sleep(1 * time.Second) // give enough time for operation - _, err = OrderingKv.Get(ctx, "foo", clientv3.WithSerializable()) - if err != nil { - t.Fatalf("failed to resolve order violation %v", err) + _, err = orderingKv.Get(ctx, "foo", clientv3.WithSerializable()) + if err != ErrNoGreaterRev { + t.Fatal("While speaking to partitioned leader, we should get ErrNoGreaterRev error") } } @@ -123,7 +123,7 @@ func TestUnresolvableOrderViolation(t *testing.T) { // access the full list of endpoints. cli.SetEndpoints(eps...) time.Sleep(1 * time.Second) // give enough time for operation - OrderingKv := NewKV(cli.KV, NewOrderViolationSwitchEndpointClosure(*cli)) + OrderingKv := NewKV(cli.KV, NewOrderViolationSwitchEndpointClosure(cli)) // set prevRev to the first member's revision of "foo" such that // the revision is higher than the fourth and fifth members' revision of "foo" _, err = OrderingKv.Get(ctx, "foo") diff --git a/etcdmain/grpc_proxy.go b/etcdmain/grpc_proxy.go index 244fae74a..323d1d942 100644 --- a/etcdmain/grpc_proxy.go +++ b/etcdmain/grpc_proxy.go @@ -326,7 +326,7 @@ func mustListenCMux(lg *zap.Logger, tlsinfo *transport.TLSInfo) cmux.CMux { func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server { if grpcProxyEnableOrdering { - vf := ordering.NewOrderViolationSwitchEndpointClosure(*client) + vf := ordering.NewOrderViolationSwitchEndpointClosure(client) client.KV = ordering.NewKV(client.KV, vf) lg.Info("waiting for linearized read from cluster to recover ordering") for {