clientv3: fix the design & implementation of double barrier

Check the client count before creating the ephemeral key, do not
create the key if there are already too many clients. Check the
count after creating the key again, if the total kvs is bigger
than the expected count, then check the rev of the current key,
and take action accordingly based on its rev. If its rev is in
the first ${count}, then it's valid client, otherwise, it should
fail.

Signed-off-by: Benjamin Wang <wachao@vmware.com>
This commit is contained in:
Benjamin Wang
2022-10-20 15:52:08 +08:00
parent 0a0f0e3617
commit 8e26a1fff1
2 changed files with 109 additions and 10 deletions

View File

@@ -45,25 +45,46 @@ func NewDoubleBarrier(s *concurrency.Session, key string, count int) *DoubleBarr
// Enter waits for "count" processes to enter the barrier then returns
func (b *DoubleBarrier) Enter() error {
client := b.s.Client()
// Check the entered clients before creating the UniqueEphemeralKey,
// fail the request if there are already too many clients.
if resp1, err := b.enteredClients(client); err != nil {
return err
} else if len(resp1.Kvs) >= b.count {
return ErrTooManyClients
}
ek, err := newUniqueEphemeralKey(b.s, b.key+"/waiters")
if err != nil {
return err
}
b.myKey = ek
resp, err := client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
// Check the entered clients after creating the UniqueEphemeralKey
resp2, err := b.enteredClients(client)
if err != nil {
return err
}
if len(resp2.Kvs) >= b.count {
lastWaiter := resp2.Kvs[b.count-1]
if ek.rev > lastWaiter.CreateRevision {
// delete itself now, otherwise other processes may need to wait
// until these keys are automatically deleted when the related
// lease expires.
if err = b.myKey.Delete(); err != nil {
// Nothing to do here. We have to wait for the key to be
// deleted when the lease expires.
}
return ErrTooManyClients
}
if len(resp.Kvs) > b.count {
return ErrTooManyClients
}
if len(resp.Kvs) == b.count {
// unblock waiters
_, err = client.Put(b.ctx, b.key+"/ready", "")
return err
if ek.rev == lastWaiter.CreateRevision {
// TODO(ahrtr): we might need to compare ek.key and
// string(lastWaiter.Key), they should be equal.
// unblock all other waiters
_, err = client.Put(b.ctx, b.key+"/ready", "")
return err
}
}
_, err = WaitEvents(
@@ -74,6 +95,18 @@ func (b *DoubleBarrier) Enter() error {
return err
}
// enteredClients gets all the entered clients, which are ordered by the
// createRevision in ascending order.
func (b *DoubleBarrier) enteredClients(cli *clientv3.Client) (*clientv3.GetResponse, error) {
resp, err := cli.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix(),
clientv3.WithSort(clientv3.SortByCreateRevision, clientv3.SortAscend))
if err != nil {
return nil, err
}
return resp, nil
}
// Leave waits for "count" processes to leave the barrier then returns
func (b *DoubleBarrier) Leave() error {
client := b.s.Client()
@@ -96,7 +129,7 @@ func (b *DoubleBarrier) Leave() error {
}
isLowest := string(lowest.Key) == b.myKey.Key()
if len(resp.Kvs) == 1 {
if len(resp.Kvs) == 1 && isLowest {
// this is the only node in the barrier; finish up
if _, err = client.Delete(b.ctx, b.key+"/ready"); err != nil {
return err