mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00

Check the client count before creating the ephemeral key; do not create the key if there are already too many clients. After creating the key, check the count again: if the total number of kvs is bigger than the expected count, then check the revision of the current key and act accordingly. If its revision is within the first ${count}, then it's a valid client; otherwise, it should fail. Signed-off-by: Benjamin Wang <wachao@vmware.com>
172 lines
4.5 KiB
Go
172 lines
4.5 KiB
Go
// Copyright 2016 The etcd Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package recipe
|
|
|
|
import (
|
|
"context"
|
|
|
|
"go.etcd.io/etcd/api/v3/mvccpb"
|
|
"go.etcd.io/etcd/client/v3"
|
|
"go.etcd.io/etcd/client/v3/concurrency"
|
|
)
|
|
|
|
// DoubleBarrier blocks processes on Enter until an expected count enters, then
// blocks again on Leave until all processes have left.
type DoubleBarrier struct {
	s   *concurrency.Session // session whose lease scopes this process's ephemeral waiter key
	ctx context.Context

	key   string // key for the collective barrier
	count int    // number of processes expected to enter the barrier
	myKey *EphemeralKV // current key for this process on the barrier; set by Enter
}
|
|
|
|
func NewDoubleBarrier(s *concurrency.Session, key string, count int) *DoubleBarrier {
|
|
return &DoubleBarrier{
|
|
s: s,
|
|
ctx: context.TODO(),
|
|
key: key,
|
|
count: count,
|
|
}
|
|
}
|
|
|
|
// Enter waits for "count" processes to enter the barrier then returns
func (b *DoubleBarrier) Enter() error {
	client := b.s.Client()

	// Check the entered clients before creating the UniqueEphemeralKey,
	// fail the request if there are already too many clients.
	if resp1, err := b.enteredClients(client); err != nil {
		return err
	} else if len(resp1.Kvs) >= b.count {
		return ErrTooManyClients
	}

	// Register this process as a waiter. The key is ephemeral: it is attached
	// to the session's lease and disappears if this process dies.
	ek, err := newUniqueEphemeralKey(b.s, b.key+"/waiters")
	if err != nil {
		return err
	}
	b.myKey = ek

	// Check the entered clients after creating the UniqueEphemeralKey.
	// Another client may have raced us between the first check and the key
	// creation, so the count must be re-validated against our key's revision.
	resp2, err := b.enteredClients(client)
	if err != nil {
		return err
	}
	if len(resp2.Kvs) >= b.count {
		// resp2.Kvs is sorted by CreateRevision ascending, so the count-th
		// entry is the last client that legitimately fits in the barrier.
		lastWaiter := resp2.Kvs[b.count-1]
		if ek.rev > lastWaiter.CreateRevision {
			// Our key was created after the barrier filled: we lost the race.
			// delete itself now, otherwise other processes may need to wait
			// until these keys are automatically deleted when the related
			// lease expires.
			if err = b.myKey.Delete(); err != nil {
				// Nothing to do here. We have to wait for the key to be
				// deleted when the lease expires.
			}
			return ErrTooManyClients
		}

		if ek.rev == lastWaiter.CreateRevision {
			// We are exactly the count-th entrant, so the barrier is now full.
			// TODO(ahrtr): we might need to compare ek.key and
			// string(lastWaiter.Key), they should be equal.
			// unblock all other waiters
			_, err = client.Put(b.ctx, b.key+"/ready", "")
			return err
		}
	}

	// Barrier not yet full (or we entered earlier than the count-th client):
	// block until the count-th entrant writes the ready key. Watching from
	// ek.Revision() guarantees a ready-PUT after our registration is not missed.
	_, err = WaitEvents(
		client,
		b.key+"/ready",
		ek.Revision(),
		[]mvccpb.Event_EventType{mvccpb.PUT})
	return err
}
|
|
|
|
// enteredClients gets all the entered clients, which are ordered by the
|
|
// createRevision in ascending order.
|
|
func (b *DoubleBarrier) enteredClients(cli *clientv3.Client) (*clientv3.GetResponse, error) {
|
|
resp, err := cli.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix(),
|
|
clientv3.WithSort(clientv3.SortByCreateRevision, clientv3.SortAscend))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
// Leave waits for "count" processes to leave the barrier then returns
|
|
func (b *DoubleBarrier) Leave() error {
|
|
client := b.s.Client()
|
|
resp, err := client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(resp.Kvs) == 0 {
|
|
return nil
|
|
}
|
|
|
|
lowest, highest := resp.Kvs[0], resp.Kvs[0]
|
|
for _, k := range resp.Kvs {
|
|
if k.ModRevision < lowest.ModRevision {
|
|
lowest = k
|
|
}
|
|
if k.ModRevision > highest.ModRevision {
|
|
highest = k
|
|
}
|
|
}
|
|
isLowest := string(lowest.Key) == b.myKey.Key()
|
|
|
|
if len(resp.Kvs) == 1 && isLowest {
|
|
// this is the only node in the barrier; finish up
|
|
if _, err = client.Delete(b.ctx, b.key+"/ready"); err != nil {
|
|
return err
|
|
}
|
|
return b.myKey.Delete()
|
|
}
|
|
|
|
// this ensures that if a process fails, the ephemeral lease will be
|
|
// revoked, its barrier key is removed, and the barrier can resume
|
|
|
|
// lowest process in node => wait on highest process
|
|
if isLowest {
|
|
_, err = WaitEvents(
|
|
client,
|
|
string(highest.Key),
|
|
highest.ModRevision,
|
|
[]mvccpb.Event_EventType{mvccpb.DELETE})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return b.Leave()
|
|
}
|
|
|
|
// delete self and wait on lowest process
|
|
if err = b.myKey.Delete(); err != nil {
|
|
return err
|
|
}
|
|
|
|
key := string(lowest.Key)
|
|
_, err = WaitEvents(
|
|
client,
|
|
key,
|
|
lowest.ModRevision,
|
|
[]mvccpb.Event_EventType{mvccpb.DELETE})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return b.Leave()
|
|
}
|