From 8e26a1fff1e57a24a5cf48baae25c11794968fa4 Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Thu, 20 Oct 2022 15:52:08 +0800 Subject: [PATCH] clientv3: fix the design & implementation of double barrier Check the client count before creating the ephemeral key, do not create the key if there are already too many clients. Check the count after creating the key again, if the total kvs is bigger than the expected count, then check the rev of the current key, and take action accordingly based on its rev. If its rev is in the first ${count}, then it's valid client, otherwise, it should fail. Signed-off-by: Benjamin Wang --- .../v3/experimental/recipes/double_barrier.go | 53 ++++++++++++--- .../recipes/v3_double_barrier_test.go | 66 +++++++++++++++++++ 2 files changed, 109 insertions(+), 10 deletions(-) diff --git a/client/v3/experimental/recipes/double_barrier.go b/client/v3/experimental/recipes/double_barrier.go index eac5d4f7f..0fce8d0e7 100644 --- a/client/v3/experimental/recipes/double_barrier.go +++ b/client/v3/experimental/recipes/double_barrier.go @@ -45,25 +45,46 @@ func NewDoubleBarrier(s *concurrency.Session, key string, count int) *DoubleBarr // Enter waits for "count" processes to enter the barrier then returns func (b *DoubleBarrier) Enter() error { client := b.s.Client() + + // Check the entered clients before creating the UniqueEphemeralKey, + // fail the request if there are already too many clients. + if resp1, err := b.enteredClients(client); err != nil { + return err + } else if len(resp1.Kvs) >= b.count { + return ErrTooManyClients + } + ek, err := newUniqueEphemeralKey(b.s, b.key+"/waiters") if err != nil { return err } b.myKey = ek - resp, err := client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix()) + // Check the entered clients after creating the UniqueEphemeralKey + resp2, err := b.enteredClients(client) if err != nil { return err } + if len(resp2.Kvs) >= b.count { + lastWaiter := resp2.Kvs[b.count-1] + if ek.rev > lastWaiter.CreateRevision { + // delete itself now, otherwise other processes may need to wait + // until these keys are automatically deleted when the related + // lease expires. + if err = b.myKey.Delete(); err != nil { + // Nothing to do here. We have to wait for the key to be + // deleted when the lease expires. + } + return ErrTooManyClients + } - if len(resp.Kvs) > b.count { - return ErrTooManyClients - } - - if len(resp.Kvs) == b.count { - // unblock waiters - _, err = client.Put(b.ctx, b.key+"/ready", "") - return err + if ek.rev == lastWaiter.CreateRevision { + // TODO(ahrtr): we might need to compare ek.key and + // string(lastWaiter.Key), they should be equal. + // unblock all other waiters + _, err = client.Put(b.ctx, b.key+"/ready", "") + return err + } } _, err = WaitEvents( @@ -74,6 +95,18 @@ func (b *DoubleBarrier) Enter() error { return err } +// enteredClients gets all the entered clients, which are ordered by the +// createRevision in ascending order. +func (b *DoubleBarrier) enteredClients(cli *clientv3.Client) (*clientv3.GetResponse, error) { + resp, err := cli.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix(), + clientv3.WithSort(clientv3.SortByCreateRevision, clientv3.SortAscend)) + if err != nil { + return nil, err + } + + return resp, nil +} + // Leave waits for "count" processes to leave the barrier then returns func (b *DoubleBarrier) Leave() error { client := b.s.Client() @@ -96,7 +129,7 @@ func (b *DoubleBarrier) Leave() error { } isLowest := string(lowest.Key) == b.myKey.Key() - if len(resp.Kvs) == 1 { + if len(resp.Kvs) == 1 && isLowest { // this is the only node in the barrier; finish up if _, err = client.Delete(b.ctx, b.key+"/ready"); err != nil { return err diff --git a/tests/integration/clientv3/experimental/recipes/v3_double_barrier_test.go b/tests/integration/clientv3/experimental/recipes/v3_double_barrier_test.go index 463bb6051..d536e4528 100644 --- a/tests/integration/clientv3/experimental/recipes/v3_double_barrier_test.go +++ b/tests/integration/clientv3/experimental/recipes/v3_double_barrier_test.go @@ -15,9 +15,14 @@ package recipes_test import ( + "context" + "sync" "testing" "time" + "github.com/stretchr/testify/assert" + + clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" recipe "go.etcd.io/etcd/client/v3/experimental/recipes" "go.etcd.io/etcd/tests/v3/integration" @@ -97,6 +102,67 @@ func TestDoubleBarrier(t *testing.T) { } } +func TestDoubleBarrierTooManyClients(t *testing.T) { + integration.BeforeTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + waiters := 10 + session, err := concurrency.NewSession(clus.RandClient()) + if err != nil { + t.Error(err) + } + defer session.Orphan() + + b := recipe.NewDoubleBarrier(session, "test-barrier", waiters) + donec := make(chan struct{}) + var ( + wgDone sync.WaitGroup // make sure all clients have finished the tasks + wgEntered sync.WaitGroup // make sure all clients have entered the double barrier + ) + wgDone.Add(waiters) + wgEntered.Add(waiters) + for i := 0; i < waiters; i++ { + go func() { + defer wgDone.Done() + session, err := concurrency.NewSession(clus.RandClient()) + if err != nil { + t.Error(err) + } + defer session.Orphan() + + bb := recipe.NewDoubleBarrier(session, "test-barrier", waiters) + if err := bb.Enter(); err != nil { + t.Errorf("could not enter on barrier (%v)", err) + } + wgEntered.Done() + <-donec + if err := bb.Leave(); err != nil { + t.Errorf("could not leave on barrier (%v)", err) + } + }() + } + + // Wait until all clients have already entered the double barrier, so + // no any other client can enter the barrier. + wgEntered.Wait() + t.Log("Try to enter into double barrier") + if err := b.Enter(); err != recipe.ErrTooManyClients { + t.Errorf("Unexcepted error, expected: ErrTooManyClients, got: %v", err) + } + + resp, err := clus.RandClient().Get(context.TODO(), "test-barrier/waiters", clientv3.WithPrefix()) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + // Make sure the extra `b.Enter()` did not create a new ephemeral key + assert.Equal(t, waiters, len(resp.Kvs)) + close(donec) + + wgDone.Wait() +} + func TestDoubleBarrierFailover(t *testing.T) { integration.BeforeTest(t)