mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
contrib,clientv3: Move contrib/recipies to clientv3/experimental/recipies/...
Recipies is set of patterns / primitives implementation on top of clientv3. It's used by integration tests. It shouldn't be considered "server" code.
This commit is contained in:
66
client/v3/experimental/recipes/barrier.go
Normal file
66
client/v3/experimental/recipes/barrier.go
Normal file
@@ -0,0 +1,66 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package recipe
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
v3 "go.etcd.io/etcd/client/v3"
|
||||
)
|
||||
|
||||
// Barrier creates a key in etcd to block processes, then deletes the key to
|
||||
// release all blocked processes.
|
||||
type Barrier struct {
|
||||
client *v3.Client
|
||||
ctx context.Context
|
||||
|
||||
key string
|
||||
}
|
||||
|
||||
func NewBarrier(client *v3.Client, key string) *Barrier {
|
||||
return &Barrier{client, context.TODO(), key}
|
||||
}
|
||||
|
||||
// Hold creates the barrier key causing processes to block on Wait.
|
||||
func (b *Barrier) Hold() error {
|
||||
_, err := newKey(b.client, b.key, v3.NoLease)
|
||||
return err
|
||||
}
|
||||
|
||||
// Release deletes the barrier key to unblock all waiting processes.
|
||||
func (b *Barrier) Release() error {
|
||||
_, err := b.client.Delete(b.ctx, b.key)
|
||||
return err
|
||||
}
|
||||
|
||||
// Wait blocks on the barrier key until it is deleted. If there is no key, Wait
|
||||
// assumes Release has already been called and returns immediately.
|
||||
func (b *Barrier) Wait() error {
|
||||
resp, err := b.client.Get(b.ctx, b.key, v3.WithFirstKey()...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(resp.Kvs) == 0 {
|
||||
// key already removed
|
||||
return nil
|
||||
}
|
||||
_, err = WaitEvents(
|
||||
b.client,
|
||||
b.key,
|
||||
resp.Header.Revision,
|
||||
[]mvccpb.Event_EventType{mvccpb.PUT, mvccpb.DELETE})
|
||||
return err
|
||||
}
|
||||
55
client/v3/experimental/recipes/client.go
Normal file
55
client/v3/experimental/recipes/client.go
Normal file
@@ -0,0 +1,55 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package recipe
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
spb "go.etcd.io/etcd/api/v3/mvccpb"
|
||||
v3 "go.etcd.io/etcd/client/v3"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrKeyExists = errors.New("key already exists")
|
||||
ErrWaitMismatch = errors.New("unexpected wait result")
|
||||
ErrTooManyClients = errors.New("too many clients")
|
||||
ErrNoWatcher = errors.New("no watcher channel")
|
||||
)
|
||||
|
||||
// deleteRevKey deletes a key by revision, returning false if key is missing
|
||||
func deleteRevKey(kv v3.KV, key string, rev int64) (bool, error) {
|
||||
cmp := v3.Compare(v3.ModRevision(key), "=", rev)
|
||||
req := v3.OpDelete(key)
|
||||
txnresp, err := kv.Txn(context.TODO()).If(cmp).Then(req).Commit()
|
||||
if err != nil {
|
||||
return false, err
|
||||
} else if !txnresp.Succeeded {
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func claimFirstKey(kv v3.KV, kvs []*spb.KeyValue) (*spb.KeyValue, error) {
|
||||
for _, k := range kvs {
|
||||
ok, err := deleteRevKey(kv, string(k.Key), k.ModRevision)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if ok {
|
||||
return k, nil
|
||||
}
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
17
client/v3/experimental/recipes/doc.go
Normal file
17
client/v3/experimental/recipes/doc.go
Normal file
@@ -0,0 +1,17 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// Package recipe contains experimental client-side distributed
|
||||
// synchronization primitives.
|
||||
package recipe
|
||||
138
client/v3/experimental/recipes/double_barrier.go
Normal file
138
client/v3/experimental/recipes/double_barrier.go
Normal file
@@ -0,0 +1,138 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package recipe
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
"go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/client/v3/concurrency"
|
||||
)
|
||||
|
||||
// DoubleBarrier blocks processes on Enter until an expected count enters, then
|
||||
// blocks again on Leave until all processes have left.
|
||||
type DoubleBarrier struct {
|
||||
s *concurrency.Session
|
||||
ctx context.Context
|
||||
|
||||
key string // key for the collective barrier
|
||||
count int
|
||||
myKey *EphemeralKV // current key for this process on the barrier
|
||||
}
|
||||
|
||||
func NewDoubleBarrier(s *concurrency.Session, key string, count int) *DoubleBarrier {
|
||||
return &DoubleBarrier{
|
||||
s: s,
|
||||
ctx: context.TODO(),
|
||||
key: key,
|
||||
count: count,
|
||||
}
|
||||
}
|
||||
|
||||
// Enter waits for "count" processes to enter the barrier then returns
|
||||
func (b *DoubleBarrier) Enter() error {
|
||||
client := b.s.Client()
|
||||
ek, err := newUniqueEphemeralKey(b.s, b.key+"/waiters")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.myKey = ek
|
||||
|
||||
resp, err := client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(resp.Kvs) > b.count {
|
||||
return ErrTooManyClients
|
||||
}
|
||||
|
||||
if len(resp.Kvs) == b.count {
|
||||
// unblock waiters
|
||||
_, err = client.Put(b.ctx, b.key+"/ready", "")
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = WaitEvents(
|
||||
client,
|
||||
b.key+"/ready",
|
||||
ek.Revision(),
|
||||
[]mvccpb.Event_EventType{mvccpb.PUT})
|
||||
return err
|
||||
}
|
||||
|
||||
// Leave waits for "count" processes to leave the barrier then returns
|
||||
func (b *DoubleBarrier) Leave() error {
|
||||
client := b.s.Client()
|
||||
resp, err := client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(resp.Kvs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
lowest, highest := resp.Kvs[0], resp.Kvs[0]
|
||||
for _, k := range resp.Kvs {
|
||||
if k.ModRevision < lowest.ModRevision {
|
||||
lowest = k
|
||||
}
|
||||
if k.ModRevision > highest.ModRevision {
|
||||
highest = k
|
||||
}
|
||||
}
|
||||
isLowest := string(lowest.Key) == b.myKey.Key()
|
||||
|
||||
if len(resp.Kvs) == 1 {
|
||||
// this is the only node in the barrier; finish up
|
||||
if _, err = client.Delete(b.ctx, b.key+"/ready"); err != nil {
|
||||
return err
|
||||
}
|
||||
return b.myKey.Delete()
|
||||
}
|
||||
|
||||
// this ensures that if a process fails, the ephemeral lease will be
|
||||
// revoked, its barrier key is removed, and the barrier can resume
|
||||
|
||||
// lowest process in node => wait on highest process
|
||||
if isLowest {
|
||||
_, err = WaitEvents(
|
||||
client,
|
||||
string(highest.Key),
|
||||
highest.ModRevision,
|
||||
[]mvccpb.Event_EventType{mvccpb.DELETE})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return b.Leave()
|
||||
}
|
||||
|
||||
// delete self and wait on lowest process
|
||||
if err = b.myKey.Delete(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
key := string(lowest.Key)
|
||||
_, err = WaitEvents(
|
||||
client,
|
||||
key,
|
||||
lowest.ModRevision,
|
||||
[]mvccpb.Event_EventType{mvccpb.DELETE})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return b.Leave()
|
||||
}
|
||||
74
client/v3/experimental/recipes/grpc_gateway/user_add.sh
Normal file
74
client/v3/experimental/recipes/grpc_gateway/user_add.sh
Normal file
@@ -0,0 +1,74 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Copyright 2018 The etcd Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
usage () {
|
||||
echo 'Username required: ./user_add.sh $username'
|
||||
exit
|
||||
}
|
||||
|
||||
if [ "$1" == "" ]; then
|
||||
usage
|
||||
fi
|
||||
|
||||
newuser=$1
|
||||
read -r -s -p "Enter password for $newuser" newpass
|
||||
|
||||
user=root
|
||||
pass=toor
|
||||
host=127.0.0.1
|
||||
port=2379
|
||||
api=v3
|
||||
|
||||
cacert="path/to/ca.pem"
|
||||
key="path/to/client-key.pem"
|
||||
cert="path/to/client.pem"
|
||||
|
||||
tokengen() {
|
||||
json=$(printf '{"name": "%s", "password": "%s"}' \
|
||||
"$(escape "$1")" \
|
||||
"$(escape "$2")"
|
||||
)
|
||||
curl -s --cacert $cacert \
|
||||
--key $key \
|
||||
--cert $cert \
|
||||
-X POST \
|
||||
-d "$json" \
|
||||
https://${host}:${port}/${api}/auth/authenticate \
|
||||
| jq -r '.token'
|
||||
}
|
||||
|
||||
add_user() {
|
||||
json=$(printf '{"name": "%s", "password": "%s"}' \
|
||||
"$(escape "$1")" \
|
||||
"$(escape "$2")"
|
||||
)
|
||||
curl -s --cacert $cacert \
|
||||
--key $key \
|
||||
--cert $cert \
|
||||
-H "Authorization: $3" \
|
||||
-X POST \
|
||||
-d "$json" \
|
||||
https://${host}:${port}/${api}/auth/user/add
|
||||
}
|
||||
|
||||
escape() {
|
||||
echo "${1//\"/\\\"}"
|
||||
}
|
||||
|
||||
token=$(tokengen $user $pass)
|
||||
response=$(add_user $newuser $newpass $token)
|
||||
|
||||
echo -e "\\n$response"
|
||||
163
client/v3/experimental/recipes/key.go
Normal file
163
client/v3/experimental/recipes/key.go
Normal file
@@ -0,0 +1,163 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package recipe
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
v3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/client/v3/concurrency"
|
||||
)
|
||||
|
||||
// RemoteKV is a key/revision pair created by the client and stored on etcd
|
||||
type RemoteKV struct {
|
||||
kv v3.KV
|
||||
key string
|
||||
rev int64
|
||||
val string
|
||||
}
|
||||
|
||||
func newKey(kv v3.KV, key string, leaseID v3.LeaseID) (*RemoteKV, error) {
|
||||
return newKV(kv, key, "", leaseID)
|
||||
}
|
||||
|
||||
func newKV(kv v3.KV, key, val string, leaseID v3.LeaseID) (*RemoteKV, error) {
|
||||
rev, err := putNewKV(kv, key, val, leaseID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &RemoteKV{kv, key, rev, val}, nil
|
||||
}
|
||||
|
||||
func newUniqueKV(kv v3.KV, prefix string, val string) (*RemoteKV, error) {
|
||||
for {
|
||||
newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
|
||||
rev, err := putNewKV(kv, newKey, val, v3.NoLease)
|
||||
if err == nil {
|
||||
return &RemoteKV{kv, newKey, rev, val}, nil
|
||||
}
|
||||
if err != ErrKeyExists {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// putNewKV attempts to create the given key, only succeeding if the key did
|
||||
// not yet exist.
|
||||
func putNewKV(kv v3.KV, key, val string, leaseID v3.LeaseID) (int64, error) {
|
||||
cmp := v3.Compare(v3.Version(key), "=", 0)
|
||||
req := v3.OpPut(key, val, v3.WithLease(leaseID))
|
||||
txnresp, err := kv.Txn(context.TODO()).If(cmp).Then(req).Commit()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if !txnresp.Succeeded {
|
||||
return 0, ErrKeyExists
|
||||
}
|
||||
return txnresp.Header.Revision, nil
|
||||
}
|
||||
|
||||
// newSequentialKV allocates a new sequential key <prefix>/nnnnn with a given
|
||||
// prefix and value. Note: a bookkeeping node __<prefix> is also allocated.
|
||||
func newSequentialKV(kv v3.KV, prefix, val string) (*RemoteKV, error) {
|
||||
resp, err := kv.Get(context.TODO(), prefix, v3.WithLastKey()...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// add 1 to last key, if any
|
||||
newSeqNum := 0
|
||||
if len(resp.Kvs) != 0 {
|
||||
fields := strings.Split(string(resp.Kvs[0].Key), "/")
|
||||
_, serr := fmt.Sscanf(fields[len(fields)-1], "%d", &newSeqNum)
|
||||
if serr != nil {
|
||||
return nil, serr
|
||||
}
|
||||
newSeqNum++
|
||||
}
|
||||
newKey := fmt.Sprintf("%s/%016d", prefix, newSeqNum)
|
||||
|
||||
// base prefix key must be current (i.e., <=) with the server update;
|
||||
// the base key is important to avoid the following:
|
||||
// N1: LastKey() == 1, start txn.
|
||||
// N2: new Key 2, new Key 3, Delete Key 2
|
||||
// N1: txn succeeds allocating key 2 when it shouldn't
|
||||
baseKey := "__" + prefix
|
||||
|
||||
// current revision might contain modification so +1
|
||||
cmp := v3.Compare(v3.ModRevision(baseKey), "<", resp.Header.Revision+1)
|
||||
reqPrefix := v3.OpPut(baseKey, "")
|
||||
reqnewKey := v3.OpPut(newKey, val)
|
||||
|
||||
txn := kv.Txn(context.TODO())
|
||||
txnresp, err := txn.If(cmp).Then(reqPrefix, reqnewKey).Commit()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !txnresp.Succeeded {
|
||||
return newSequentialKV(kv, prefix, val)
|
||||
}
|
||||
return &RemoteKV{kv, newKey, txnresp.Header.Revision, val}, nil
|
||||
}
|
||||
|
||||
func (rk *RemoteKV) Key() string { return rk.key }
|
||||
func (rk *RemoteKV) Revision() int64 { return rk.rev }
|
||||
func (rk *RemoteKV) Value() string { return rk.val }
|
||||
|
||||
func (rk *RemoteKV) Delete() error {
|
||||
if rk.kv == nil {
|
||||
return nil
|
||||
}
|
||||
_, err := rk.kv.Delete(context.TODO(), rk.key)
|
||||
rk.kv = nil
|
||||
return err
|
||||
}
|
||||
|
||||
func (rk *RemoteKV) Put(val string) error {
|
||||
_, err := rk.kv.Put(context.TODO(), rk.key, val)
|
||||
return err
|
||||
}
|
||||
|
||||
// EphemeralKV is a new key associated with a session lease
|
||||
type EphemeralKV struct{ RemoteKV }
|
||||
|
||||
// newEphemeralKV creates a new key/value pair associated with a session lease
|
||||
func newEphemeralKV(s *concurrency.Session, key, val string) (*EphemeralKV, error) {
|
||||
k, err := newKV(s.Client(), key, val, s.Lease())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &EphemeralKV{*k}, nil
|
||||
}
|
||||
|
||||
// newUniqueEphemeralKey creates a new unique valueless key associated with a session lease
|
||||
func newUniqueEphemeralKey(s *concurrency.Session, prefix string) (*EphemeralKV, error) {
|
||||
return newUniqueEphemeralKV(s, prefix, "")
|
||||
}
|
||||
|
||||
// newUniqueEphemeralKV creates a new unique key/value pair associated with a session lease
|
||||
func newUniqueEphemeralKV(s *concurrency.Session, prefix, val string) (ek *EphemeralKV, err error) {
|
||||
for {
|
||||
newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
|
||||
ek, err = newEphemeralKV(s, newKey, val)
|
||||
if err == nil || err != ErrKeyExists {
|
||||
break
|
||||
}
|
||||
}
|
||||
return ek, err
|
||||
}
|
||||
80
client/v3/experimental/recipes/priority_queue.go
Normal file
80
client/v3/experimental/recipes/priority_queue.go
Normal file
@@ -0,0 +1,80 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package recipe
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
v3 "go.etcd.io/etcd/client/v3"
|
||||
)
|
||||
|
||||
// PriorityQueue implements a multi-reader, multi-writer distributed queue.
|
||||
type PriorityQueue struct {
|
||||
client *v3.Client
|
||||
ctx context.Context
|
||||
key string
|
||||
}
|
||||
|
||||
// NewPriorityQueue creates an etcd priority queue.
|
||||
func NewPriorityQueue(client *v3.Client, key string) *PriorityQueue {
|
||||
return &PriorityQueue{client, context.TODO(), key + "/"}
|
||||
}
|
||||
|
||||
// Enqueue puts a value into a queue with a given priority.
|
||||
func (q *PriorityQueue) Enqueue(val string, pr uint16) error {
|
||||
prefix := fmt.Sprintf("%s%05d", q.key, pr)
|
||||
_, err := newSequentialKV(q.client, prefix, val)
|
||||
return err
|
||||
}
|
||||
|
||||
// Dequeue returns Enqueue()'d items in FIFO order. If the
|
||||
// queue is empty, Dequeue blocks until items are available.
|
||||
func (q *PriorityQueue) Dequeue() (string, error) {
|
||||
// TODO: fewer round trips by fetching more than one key
|
||||
resp, err := q.client.Get(q.ctx, q.key, v3.WithFirstKey()...)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
kv, err := claimFirstKey(q.client, resp.Kvs)
|
||||
if err != nil {
|
||||
return "", err
|
||||
} else if kv != nil {
|
||||
return string(kv.Value), nil
|
||||
} else if resp.More {
|
||||
// missed some items, retry to read in more
|
||||
return q.Dequeue()
|
||||
}
|
||||
|
||||
// nothing to dequeue; wait on items
|
||||
ev, err := WaitPrefixEvents(
|
||||
q.client,
|
||||
q.key,
|
||||
resp.Header.Revision,
|
||||
[]mvccpb.Event_EventType{mvccpb.PUT})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
ok, err := deleteRevKey(q.client, string(ev.Kv.Key), ev.Kv.ModRevision)
|
||||
if err != nil {
|
||||
return "", err
|
||||
} else if !ok {
|
||||
return q.Dequeue()
|
||||
}
|
||||
return string(ev.Kv.Value), err
|
||||
}
|
||||
77
client/v3/experimental/recipes/queue.go
Normal file
77
client/v3/experimental/recipes/queue.go
Normal file
@@ -0,0 +1,77 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package recipe
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
v3 "go.etcd.io/etcd/client/v3"
|
||||
)
|
||||
|
||||
// Queue implements a multi-reader, multi-writer distributed queue.
|
||||
type Queue struct {
|
||||
client *v3.Client
|
||||
ctx context.Context
|
||||
|
||||
keyPrefix string
|
||||
}
|
||||
|
||||
func NewQueue(client *v3.Client, keyPrefix string) *Queue {
|
||||
return &Queue{client, context.TODO(), keyPrefix}
|
||||
}
|
||||
|
||||
func (q *Queue) Enqueue(val string) error {
|
||||
_, err := newUniqueKV(q.client, q.keyPrefix, val)
|
||||
return err
|
||||
}
|
||||
|
||||
// Dequeue returns Enqueue()'d elements in FIFO order. If the
|
||||
// queue is empty, Dequeue blocks until elements are available.
|
||||
func (q *Queue) Dequeue() (string, error) {
|
||||
// TODO: fewer round trips by fetching more than one key
|
||||
resp, err := q.client.Get(q.ctx, q.keyPrefix, v3.WithFirstRev()...)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
kv, err := claimFirstKey(q.client, resp.Kvs)
|
||||
if err != nil {
|
||||
return "", err
|
||||
} else if kv != nil {
|
||||
return string(kv.Value), nil
|
||||
} else if resp.More {
|
||||
// missed some items, retry to read in more
|
||||
return q.Dequeue()
|
||||
}
|
||||
|
||||
// nothing yet; wait on elements
|
||||
ev, err := WaitPrefixEvents(
|
||||
q.client,
|
||||
q.keyPrefix,
|
||||
resp.Header.Revision,
|
||||
[]mvccpb.Event_EventType{mvccpb.PUT})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
ok, err := deleteRevKey(q.client, string(ev.Kv.Key), ev.Kv.ModRevision)
|
||||
if err != nil {
|
||||
return "", err
|
||||
} else if !ok {
|
||||
return q.Dequeue()
|
||||
}
|
||||
return string(ev.Kv.Value), err
|
||||
}
|
||||
89
client/v3/experimental/recipes/rwmutex.go
Normal file
89
client/v3/experimental/recipes/rwmutex.go
Normal file
@@ -0,0 +1,89 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package recipe
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
v3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/client/v3/concurrency"
|
||||
)
|
||||
|
||||
type RWMutex struct {
|
||||
s *concurrency.Session
|
||||
ctx context.Context
|
||||
|
||||
pfx string
|
||||
myKey *EphemeralKV
|
||||
}
|
||||
|
||||
func NewRWMutex(s *concurrency.Session, prefix string) *RWMutex {
|
||||
return &RWMutex{s, context.TODO(), prefix + "/", nil}
|
||||
}
|
||||
|
||||
func (rwm *RWMutex) RLock() error {
|
||||
rk, err := newUniqueEphemeralKey(rwm.s, rwm.pfx+"read")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rwm.myKey = rk
|
||||
// wait until nodes with "write-" and a lower revision number than myKey are gone
|
||||
for {
|
||||
if done, werr := rwm.waitOnLastRev(rwm.pfx + "write"); done || werr != nil {
|
||||
return werr
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (rwm *RWMutex) Lock() error {
|
||||
rk, err := newUniqueEphemeralKey(rwm.s, rwm.pfx+"write")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rwm.myKey = rk
|
||||
// wait until all keys of lower revision than myKey are gone
|
||||
for {
|
||||
if done, werr := rwm.waitOnLastRev(rwm.pfx); done || werr != nil {
|
||||
return werr
|
||||
}
|
||||
// get the new lowest key until this is the only one left
|
||||
}
|
||||
}
|
||||
|
||||
// waitOnLowest will wait on the last key with a revision < rwm.myKey.Revision with a
|
||||
// given prefix. If there are no keys left to wait on, return true.
|
||||
func (rwm *RWMutex) waitOnLastRev(pfx string) (bool, error) {
|
||||
client := rwm.s.Client()
|
||||
// get key that's blocking myKey
|
||||
opts := append(v3.WithLastRev(), v3.WithMaxModRev(rwm.myKey.Revision()-1))
|
||||
lastKey, err := client.Get(rwm.ctx, pfx, opts...)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if len(lastKey.Kvs) == 0 {
|
||||
return true, nil
|
||||
}
|
||||
// wait for release on blocking key
|
||||
_, err = WaitEvents(
|
||||
client,
|
||||
string(lastKey.Kvs[0].Key),
|
||||
rwm.myKey.Revision(),
|
||||
[]mvccpb.Event_EventType{mvccpb.DELETE})
|
||||
return false, err
|
||||
}
|
||||
|
||||
func (rwm *RWMutex) RUnlock() error { return rwm.myKey.Delete() }
|
||||
func (rwm *RWMutex) Unlock() error { return rwm.myKey.Delete() }
|
||||
58
client/v3/experimental/recipes/watch.go
Normal file
58
client/v3/experimental/recipes/watch.go
Normal file
@@ -0,0 +1,58 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package recipe
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
"go.etcd.io/etcd/client/v3"
|
||||
)
|
||||
|
||||
// WaitEvents waits on a key until it observes the given events and returns the final one.
|
||||
func WaitEvents(c *clientv3.Client, key string, rev int64, evs []mvccpb.Event_EventType) (*clientv3.Event, error) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
wc := c.Watch(ctx, key, clientv3.WithRev(rev))
|
||||
if wc == nil {
|
||||
return nil, ErrNoWatcher
|
||||
}
|
||||
return waitEvents(wc, evs), nil
|
||||
}
|
||||
|
||||
func WaitPrefixEvents(c *clientv3.Client, prefix string, rev int64, evs []mvccpb.Event_EventType) (*clientv3.Event, error) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
wc := c.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(rev))
|
||||
if wc == nil {
|
||||
return nil, ErrNoWatcher
|
||||
}
|
||||
return waitEvents(wc, evs), nil
|
||||
}
|
||||
|
||||
func waitEvents(wc clientv3.WatchChan, evs []mvccpb.Event_EventType) *clientv3.Event {
|
||||
i := 0
|
||||
for wresp := range wc {
|
||||
for _, ev := range wresp.Events {
|
||||
if ev.Type == evs[i] {
|
||||
i++
|
||||
if i == len(evs) {
|
||||
return ev
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user