From b07900ae036726bda5167863edf05d36fdf3a6b8 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Mon, 4 Jan 2016 11:13:43 -0800 Subject: [PATCH] contrib: v3 recipes Concurrency recipes using the V3 API (sans leases). --- contrib/recipes/barrier.go | 63 +++++++++ contrib/recipes/client.go | 87 ++++++++++++ contrib/recipes/key.go | 190 ++++++++++++++++++++++++++ contrib/recipes/mutex.go | 85 ++++++++++++ contrib/recipes/priority_queue.go | 77 +++++++++++ contrib/recipes/queue.go | 72 ++++++++++ contrib/recipes/range.go | 107 +++++++++++++++ contrib/recipes/rwmutex.go | 94 +++++++++++++ contrib/recipes/stm.go | 104 ++++++++++++++ contrib/recipes/watch.go | 147 ++++++++++++++++++++ integration/v3_barrier_test.go | 74 ++++++++++ integration/v3_lock_test.go | 136 +++++++++++++++++++ integration/v3_queue_test.go | 219 ++++++++++++++++++++++++++++++ integration/v3_stm_test.go | 138 +++++++++++++++++++ 14 files changed, 1593 insertions(+) create mode 100644 contrib/recipes/barrier.go create mode 100644 contrib/recipes/client.go create mode 100644 contrib/recipes/key.go create mode 100644 contrib/recipes/mutex.go create mode 100644 contrib/recipes/priority_queue.go create mode 100644 contrib/recipes/queue.go create mode 100644 contrib/recipes/range.go create mode 100644 contrib/recipes/rwmutex.go create mode 100644 contrib/recipes/stm.go create mode 100644 contrib/recipes/watch.go create mode 100644 integration/v3_barrier_test.go create mode 100644 integration/v3_lock_test.go create mode 100644 integration/v3_queue_test.go create mode 100644 integration/v3_stm_test.go diff --git a/contrib/recipes/barrier.go b/contrib/recipes/barrier.go new file mode 100644 index 000000000..ac731f300 --- /dev/null +++ b/contrib/recipes/barrier.go @@ -0,0 +1,63 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package recipe + +import ( + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/storage/storagepb" +) + +// Barrier creates a key in etcd to block processes, then deletes the key to +// release all blocked processes. +type Barrier struct { + client *EtcdClient + key string +} + +func NewBarrier(client *EtcdClient, key string) *Barrier { + return &Barrier{client, key} +} + +// Hold creates the barrier key causing processes to block on Wait. +func (b *Barrier) Hold() error { + _, err := NewKey(b.client, b.key, 0) + return err +} + +// Release deletes the barrier key to unblock all waiting processes. +func (b *Barrier) Release() error { + _, err := b.client.KV.DeleteRange(context.TODO(), &pb.DeleteRangeRequest{Key: []byte(b.key)}) + return err +} + +// Wait blocks on the barrier key until it is deleted. If there is no key, Wait +// assumes Release has already been called and returns immediately. +func (b *Barrier) Wait() error { + resp, err := NewRange(b.client, b.key).FirstKey() + if err != nil { + return err + } + if len(resp.Kvs) == 0 { + // key already removed + return nil + } + _, err = WaitEvents( + b.client, + b.key, + resp.Header.Revision, + []storagepb.Event_EventType{storagepb.PUT, storagepb.DELETE}) + return err +} diff --git a/contrib/recipes/client.go b/contrib/recipes/client.go new file mode 100644 index 000000000..9a5613723 --- /dev/null +++ b/contrib/recipes/client.go @@ -0,0 +1,87 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package recipe + +import ( + "errors" + + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + spb "github.com/coreos/etcd/storage/storagepb" +) + +var ( + ErrKeyExists = errors.New("key already exists") + ErrWaitMismatch = errors.New("unexpected wait result") +) + +type EtcdClient struct { + conn *grpc.ClientConn + KV pb.KVClient + Lease pb.LeaseClient + Watch pb.WatchClient +} + +func NewEtcdClient(conn *grpc.ClientConn) *EtcdClient { + kv := pb.NewKVClient(conn) + lease := pb.NewLeaseClient(conn) + watch := pb.NewWatchClient(conn) + return &EtcdClient{conn, kv, lease, watch} +} + +// deleteRevKey deletes a key by revision, returning false if key is missing +func (ec *EtcdClient) deleteRevKey(key string, rev int64) (bool, error) { + cmp := &pb.Compare{ + Result: pb.Compare_EQUAL, + Target: pb.Compare_MOD, + Key: []byte(key), + ModRevision: rev} + req := &pb.RequestUnion{RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte(key)}} + txnresp, err := ec.KV.Txn( + context.TODO(), + &pb.TxnRequest{[]*pb.Compare{cmp}, []*pb.RequestUnion{req}, nil}) + if err != nil { + return false, err + } else if txnresp.Succeeded == false { + return false, nil + } + return true, nil +} + +func (ec *EtcdClient) claimFirstKey(kvs []*spb.KeyValue) (*spb.KeyValue, error) { + for _, kv := range kvs { + ok, err := ec.deleteRevKey(string(kv.Key), kv.ModRevision) + if err != nil { + return nil, err + } else if ok { + return kv, nil + } + } + return nil, nil +} + +func putEmptyKey(kv pb.KVClient, key string) (*pb.PutResponse, error) { + return kv.Put(context.TODO(), &pb.PutRequest{Key: []byte(key), Value: []byte{}}) +} + +// deletePrefix performs a RangeRequest to get keys on a given prefix +func deletePrefix(kv pb.KVClient, prefix string) (*pb.DeleteRangeResponse, error) { + return kv.DeleteRange( + context.TODO(), + &pb.DeleteRangeRequest{ + Key: []byte(prefix), + RangeEnd: []byte(prefixEnd(prefix))}) +} diff --git a/contrib/recipes/key.go b/contrib/recipes/key.go new file mode 100644 index 000000000..f4d30ed71 --- /dev/null +++ b/contrib/recipes/key.go @@ -0,0 +1,190 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package recipe + +import ( + "fmt" + "strings" + "time" + + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/lease" +) + +// Key is a key/revision pair created by the client and stored on etcd +type RemoteKV struct { + client *EtcdClient + key string + rev int64 + val string +} + +func NewKey(client *EtcdClient, key string, leaseID lease.LeaseID) (*RemoteKV, error) { + return NewKV(client, key, "", leaseID) +} + +func NewKV(client *EtcdClient, key, val string, leaseID lease.LeaseID) (*RemoteKV, error) { + rev, err := putNewKV(client, key, val, leaseID) + if err != nil { + return nil, err + } + return &RemoteKV{client, key, rev, val}, nil +} + +func GetRemoteKV(client *EtcdClient, key string) (*RemoteKV, error) { + resp, err := client.KV.Range( + context.TODO(), + &pb.RangeRequest{Key: []byte(key)}, + ) + if err != nil { + return nil, err + } + rev := resp.Header.Revision + val := "" + if len(resp.Kvs) > 0 { + rev = resp.Kvs[0].ModRevision + val = string(resp.Kvs[0].Value) + } + return &RemoteKV{ + client: client, + key: key, + rev: rev, + val: val}, nil +} + +func NewUniqueKey(client *EtcdClient, prefix string) (*RemoteKV, error) { + return NewUniqueKV(client, prefix, "", 0) +} + +func NewUniqueKV(client *EtcdClient, prefix string, val string, leaseID lease.LeaseID) (*RemoteKV, error) { + for { + newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano()) + rev, err := putNewKV(client, newKey, val, 0) + if err == nil { + return &RemoteKV{client, newKey, rev, val}, nil + } + if err != ErrKeyExists { + return nil, err + } + } +} + +// putNewKV attempts to create the given key, only succeeding if the key did +// not yet exist. +func putNewKV(ec *EtcdClient, key, val string, leaseID lease.LeaseID) (int64, error) { + cmp := &pb.Compare{ + Result: pb.Compare_EQUAL, + Target: pb.Compare_VERSION, + Key: []byte(key)} + req := &pb.RequestUnion{ + RequestPut: &pb.PutRequest{ + Key: []byte(key), + Value: []byte(val), + Lease: int64(leaseID)}} + + txnresp, err := ec.KV.Txn( + context.TODO(), + &pb.TxnRequest{[]*pb.Compare{cmp}, []*pb.RequestUnion{req}, nil}) + if err != nil { + return 0, err + } + if txnresp.Succeeded == false { + return 0, ErrKeyExists + } + return txnresp.Header.Revision, nil +} + +// NewSequentialKV allocates a new sequential key-value pair at /nnnnn +func NewSequentialKV(client *EtcdClient, prefix, val string) (*RemoteKV, error) { + return newSequentialKV(client, prefix, val, 0) +} + +// newSequentialKV allocates a new sequential key /nnnnn with a given +// value and lease. Note: a bookkeeping node __ is also allocated. +func newSequentialKV(client *EtcdClient, prefix, val string, leaseID lease.LeaseID) (*RemoteKV, error) { + resp, err := NewRange(client, prefix).LastKey() + if err != nil { + return nil, err + } + + // add 1 to last key, if any + newSeqNum := 0 + if len(resp.Kvs) != 0 { + fields := strings.Split(string(resp.Kvs[0].Key), "/") + _, err := fmt.Sscanf(fields[len(fields)-1], "%d", &newSeqNum) + if err != nil { + return nil, err + } + newSeqNum++ + } + newKey := fmt.Sprintf("%s/%016d", prefix, newSeqNum) + + // base prefix key must be current (i.e., <=) with the server update; + // the base key is important to avoid the following: + // N1: LastKey() == 1, start txn. + // N2: New Key 2, New Key 3, Delete Key 2 + // N1: txn succeeds allocating key 2 when it shouldn't + baseKey := []byte("__" + prefix) + cmp := &pb.Compare{ + Result: pb.Compare_LESS, + Target: pb.Compare_MOD, + Key: []byte(baseKey), + // current revision might contain modification so +1 + ModRevision: resp.Header.Revision + 1, + } + prPrefix := &pb.PutRequest{Key: baseKey, Lease: int64(leaseID)} + reqPrefix := &pb.RequestUnion{RequestPut: prPrefix} + + prNewKey := &pb.PutRequest{ + Key: []byte(newKey), + Value: []byte(val), + Lease: int64(leaseID), + } + reqNewKey := &pb.RequestUnion{RequestPut: prNewKey} + + txnresp, err := client.KV.Txn( + context.TODO(), + &pb.TxnRequest{ + []*pb.Compare{cmp}, + []*pb.RequestUnion{reqPrefix, reqNewKey}, nil}) + if err != nil { + return nil, err + } + if txnresp.Succeeded == false { + return newSequentialKV(client, prefix, val, leaseID) + } + return &RemoteKV{client, newKey, txnresp.Header.Revision, val}, nil +} + +func (rk *RemoteKV) Key() string { return rk.key } +func (rk *RemoteKV) Revision() int64 { return rk.rev } +func (rk *RemoteKV) Value() string { return rk.val } + +func (rk *RemoteKV) Delete() error { + if rk.client == nil { + return nil + } + req := &pb.DeleteRangeRequest{Key: []byte(rk.key)} + _, err := rk.client.KV.DeleteRange(context.TODO(), req) + rk.client = nil + return err +} + +func (rk *RemoteKV) Put(val string) error { + req := &pb.PutRequest{Key: []byte(rk.key), Value: []byte(val)} + _, err := rk.client.KV.Put(context.TODO(), req) + return err +} diff --git a/contrib/recipes/mutex.go b/contrib/recipes/mutex.go new file mode 100644 index 000000000..5d967898a --- /dev/null +++ b/contrib/recipes/mutex.go @@ -0,0 +1,85 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package recipe + +import ( + "sync" + + "github.com/coreos/etcd/storage/storagepb" +) + +// Mutex implements the sync Locker interface with etcd +type Mutex struct { + client *EtcdClient + key string + myKey *RemoteKV +} + +func NewMutex(client *EtcdClient, key string) *Mutex { + return &Mutex{client, key, nil} +} + +func (m *Mutex) Lock() (err error) { + // put self in lock waiters via myKey; oldest waiter holds lock + m.myKey, err = NewUniqueKey(m.client, m.key) + if err != nil { + return err + } + // find oldest element in waiters via revision of insertion + resp, err := NewRange(m.client, m.key).FirstRev() + if err != nil { + return err + } + // if myKey is oldest in waiters, then myKey holds the lock + if m.myKey.Revision() == resp.Kvs[0].CreateRevision { + return nil + } + // otherwise myKey isn't lowest, so there must be a key prior to myKey + lastKey, err := NewRangeRev(m.client, m.key, m.myKey.Revision()-1).LastRev() + if err != nil { + return err + } + // wait for release on prior key + _, err = WaitEvents( + m.client, + string(lastKey.Kvs[0].Key), + m.myKey.Revision()-1, + []storagepb.Event_EventType{storagepb.DELETE}) + // myKey now oldest + return err +} + +func (m *Mutex) Unlock() error { + err := m.myKey.Delete() + m.myKey = nil + return err +} + +type lockerMutex struct{ *Mutex } + +func (lm *lockerMutex) Lock() { + if err := lm.Mutex.Lock(); err != nil { + panic(err) + } +} +func (lm *lockerMutex) Unlock() { + if err := lm.Mutex.Unlock(); err != nil { + panic(err) + } +} + +func NewLocker(client *EtcdClient, key string) sync.Locker { + return &lockerMutex{NewMutex(client, key)} +} diff --git a/contrib/recipes/priority_queue.go b/contrib/recipes/priority_queue.go new file mode 100644 index 000000000..1d48f1b34 --- /dev/null +++ b/contrib/recipes/priority_queue.go @@ -0,0 +1,77 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package recipe + +import ( + "fmt" + + "github.com/coreos/etcd/storage/storagepb" +) + +// PriorityQueue implements a multi-reader, multi-writer distributed queue. +type PriorityQueue struct { + client *EtcdClient + key string +} + +// NewPriorityQueue creates an etcd priority queue. +func NewPriorityQueue(client *EtcdClient, key string) *PriorityQueue { + return &PriorityQueue{client, key + "/"} +} + +// Enqueue puts a value into a queue with a given priority. +func (q *PriorityQueue) Enqueue(val string, pr uint16) error { + prefix := fmt.Sprintf("%s%05d", q.key, pr) + _, err := NewSequentialKV(q.client, prefix, val) + return err +} + +// Dequeue returns Enqueued()'d items in FIFO order. If the +// queue is empty, Dequeue blocks until items are available. +func (q *PriorityQueue) Dequeue() (string, error) { + // TODO: fewer round trips by fetching more than one key + resp, err := NewRange(q.client, q.key).FirstKey() + if err != nil { + return "", err + } + + kv, err := q.client.claimFirstKey(resp.Kvs) + if err != nil { + return "", err + } else if kv != nil { + return string(kv.Value), nil + } else if resp.More { + // missed some items, retry to read in more + return q.Dequeue() + } + + // nothing to dequeue; wait on items + ev, err := WaitPrefixEvents( + q.client, + q.key, + resp.Header.Revision, + []storagepb.Event_EventType{storagepb.PUT}) + if err != nil { + return "", err + } + + ok, err := q.client.deleteRevKey(string(ev.Kv.Key), ev.Kv.ModRevision) + if err != nil { + return "", err + } else if !ok { + return q.Dequeue() + } + return string(ev.Kv.Value), err +} diff --git a/contrib/recipes/queue.go b/contrib/recipes/queue.go new file mode 100644 index 000000000..5fe3e4f18 --- /dev/null +++ b/contrib/recipes/queue.go @@ -0,0 +1,72 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package recipe + +import ( + "github.com/coreos/etcd/storage/storagepb" +) + +// Queue implements a multi-reader, multi-writer distributed queue. +type Queue struct { + client *EtcdClient + keyPrefix string +} + +func NewQueue(client *EtcdClient, keyPrefix string) *Queue { + return &Queue{client, keyPrefix} +} + +func (q *Queue) Enqueue(val string) error { + _, err := NewUniqueKV(q.client, q.keyPrefix, val, 0) + return err +} + +// Dequeue returns Enqueued()'d elements in FIFO order. If the +// queue is empty, Dequeue blocks until elements are available. +func (q *Queue) Dequeue() (string, error) { + // TODO: fewer round trips by fetching more than one key + resp, err := NewRange(q.client, q.keyPrefix).FirstRev() + if err != nil { + return "", err + } + + kv, err := q.client.claimFirstKey(resp.Kvs) + if err != nil { + return "", err + } else if kv != nil { + return string(kv.Value), nil + } else if resp.More { + // missed some items, retry to read in more + return q.Dequeue() + } + + // nothing yet; wait on elements + ev, err := WaitPrefixEvents( + q.client, + q.keyPrefix, + resp.Header.Revision, + []storagepb.Event_EventType{storagepb.PUT}) + if err != nil { + return "", err + } + + ok, err := q.client.deleteRevKey(string(ev.Kv.Key), ev.Kv.ModRevision) + if err != nil { + return "", err + } else if !ok { + return q.Dequeue() + } + return string(ev.Kv.Value), err +} diff --git a/contrib/recipes/range.go b/contrib/recipes/range.go new file mode 100644 index 000000000..1b4057cab --- /dev/null +++ b/contrib/recipes/range.go @@ -0,0 +1,107 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package recipe + +import ( + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +type Range struct { + kv pb.KVClient + key []byte + rev int64 + keyEnd []byte +} + +func NewRange(client *EtcdClient, key string) *Range { + return NewRangeRev(client, key, 0) +} + +func NewRangeRev(client *EtcdClient, key string, rev int64) *Range { + return &Range{client.KV, []byte(key), rev, prefixEnd(key)} +} + +// Prefix performs a RangeRequest to get keys matching * +func (r *Range) Prefix() (*pb.RangeResponse, error) { + return r.kv.Range( + context.TODO(), + &pb.RangeRequest{ + Key: prefixNext(string(r.key)), + RangeEnd: r.keyEnd, + Revision: r.rev}) +} + +// OpenInterval gets the keys in the set * - +func (r *Range) OpenInterval() (*pb.RangeResponse, error) { + return r.kv.Range( + context.TODO(), + &pb.RangeRequest{Key: r.key, RangeEnd: r.keyEnd, Revision: r.rev}) +} + +func (r *Range) FirstKey() (*pb.RangeResponse, error) { + return r.topTarget(pb.RangeRequest_ASCEND, pb.RangeRequest_KEY) +} + +func (r *Range) LastKey() (*pb.RangeResponse, error) { + return r.topTarget(pb.RangeRequest_DESCEND, pb.RangeRequest_KEY) +} + +func (r *Range) FirstRev() (*pb.RangeResponse, error) { + return r.topTarget(pb.RangeRequest_ASCEND, pb.RangeRequest_MOD) +} + +func (r *Range) LastRev() (*pb.RangeResponse, error) { + return r.topTarget(pb.RangeRequest_DESCEND, pb.RangeRequest_MOD) +} + +func (r *Range) FirstCreate() (*pb.RangeResponse, error) { + return r.topTarget(pb.RangeRequest_ASCEND, pb.RangeRequest_MOD) +} + +func (r *Range) LastCreate() (*pb.RangeResponse, error) { + return r.topTarget(pb.RangeRequest_DESCEND, pb.RangeRequest_MOD) +} + +// topTarget gets the first key for a given sort order and target +func (r *Range) topTarget(order pb.RangeRequest_SortOrder, target pb.RangeRequest_SortTarget) (*pb.RangeResponse, error) { + return r.kv.Range( + context.TODO(), + &pb.RangeRequest{ + Key: r.key, + RangeEnd: r.keyEnd, + Limit: 1, + Revision: r.rev, + SortOrder: order, + SortTarget: target}) +} + +// prefixNext returns the first key possibly matched by * - +func prefixNext(prefix string) []byte { + return append([]byte(prefix), 0) +} + +// prefixEnd returns the last key possibly matched by * +func prefixEnd(prefix string) []byte { + keyEnd := []byte(prefix) + for i := len(keyEnd) - 1; i >= 0; i-- { + if keyEnd[i] < 0xff { + keyEnd[i] = keyEnd[i] + 1 + keyEnd = keyEnd[:i+1] + break + } + } + return keyEnd +} diff --git a/contrib/recipes/rwmutex.go b/contrib/recipes/rwmutex.go new file mode 100644 index 000000000..639e8de57 --- /dev/null +++ b/contrib/recipes/rwmutex.go @@ -0,0 +1,94 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package recipe + +import ( + "github.com/coreos/etcd/storage/storagepb" +) + +type RWMutex struct { + client *EtcdClient + key string + myKey *RemoteKV +} + +func NewRWMutex(client *EtcdClient, key string) *RWMutex { + return &RWMutex{client, key, nil} +} + +func (rwm *RWMutex) RLock() error { + // XXX: make reads ephemeral locks? + rk, err := NewUniqueKey(rwm.client, rwm.key+"/read") + if err != nil { + return err + } + rwm.myKey = rk + + // if there are nodes with "write-" and a lower + // revision number than us we must wait + resp, err := NewRange(rwm.client, rwm.key+"/write").FirstRev() + if err != nil { + return err + } + if len(resp.Kvs) == 0 || resp.Kvs[0].ModRevision > rk.Revision() { + // no blocking since no write key + return nil + } + return rwm.waitOnLowest() +} + +func (rwm *RWMutex) Lock() error { + rk, err := NewUniqueKey(rwm.client, rwm.key+"/write") + if err != nil { + return err + } + rwm.myKey = rk + + for { + // any key of lower rev number blocks the write lock + resp, err := NewRangeRev(rwm.client, rwm.key, rk.Revision()-1).LastRev() + if err != nil { + return err + } + if len(resp.Kvs) == 0 { + // no matching for revision before myKey; acquired + return nil + } + if err := rwm.waitOnLowest(); err != nil { + return err + } + // get the new lowest, etc until this is the only one left + } + + return nil +} + +func (rwm *RWMutex) waitOnLowest() error { + // must block; get key before ek for waiting + lastKey, err := NewRangeRev(rwm.client, rwm.key, rwm.myKey.Revision()-1).LastRev() + if err != nil { + return err + } + // wait for release on prior key + _, err = WaitEvents( + rwm.client, + string(lastKey.Kvs[0].Key), + rwm.myKey.Revision(), + []storagepb.Event_EventType{storagepb.DELETE}) + return err +} + +func (rwm *RWMutex) RUnlock() error { return rwm.myKey.Delete() } +func (rwm *RWMutex) Unlock() error { return rwm.myKey.Delete() } diff --git a/contrib/recipes/stm.go b/contrib/recipes/stm.go new file mode 100644 index 000000000..e4d2b47c8 --- /dev/null +++ b/contrib/recipes/stm.go @@ -0,0 +1,104 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package recipe + +import ( + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +// STM implements software transactional memory over etcd +type STM struct { + client *EtcdClient + // rset holds the read key's value and revision of read + rset map[string]*RemoteKV + // wset holds the write key and its value + wset map[string]string + // aborted is whether user aborted the txn + aborted bool + apply func(*STM) error +} + +// NewSTM creates new transaction loop for a given apply function. +func NewSTM(client *EtcdClient, apply func(*STM) error) <-chan error { + s := &STM{client: client, apply: apply} + errc := make(chan error, 1) + go func() { + var err error + for { + s.clear() + if err = apply(s); err != nil || s.aborted { + break + } + if ok, err := s.commit(); ok || err != nil { + break + } + } + errc <- err + }() + return errc +} + +// Abort abandons the apply loop, letting the transaction close without a commit. +func (s *STM) Abort() { s.aborted = true } + +// Get returns the value for a given key, inserting the key into the txn's readset. +func (s *STM) Get(key string) (string, error) { + if wv, ok := s.wset[key]; ok { + return wv, nil + } + if rk, ok := s.rset[key]; ok { + return rk.Value(), nil + } + rk, err := GetRemoteKV(s.client, key) + if err != nil { + return "", err + } + // TODO: setup watchers to abort txn early + s.rset[key] = rk + return rk.Value(), nil +} + +// Put adds a value for a key to the write set. +func (s *STM) Put(key string, val string) { s.wset[key] = val } + +// commit attempts to apply the txn's changes to the server. +func (s *STM) commit() (ok bool, err error) { + // read set must not change + cmps := []*pb.Compare{} + for k, rk := range s.rset { + // use < to support updating keys that don't exist yet + cmp := &pb.Compare{ + Result: pb.Compare_LESS, + Target: pb.Compare_MOD, + Key: []byte(k), + ModRevision: rk.Revision() + 1, + } + cmps = append(cmps, cmp) + } + // apply all writes + puts := []*pb.RequestUnion{} + for k, v := range s.wset { + put := &pb.PutRequest{Key: []byte(k), Value: []byte(v)} + puts = append(puts, &pb.RequestUnion{RequestPut: put}) + } + txnresp, err := s.client.KV.Txn(context.TODO(), &pb.TxnRequest{cmps, puts, nil}) + return txnresp.Succeeded, err +} + +func (s *STM) clear() { + s.rset = make(map[string]*RemoteKV) + s.wset = make(map[string]string) +} diff --git a/contrib/recipes/watch.go b/contrib/recipes/watch.go new file mode 100644 index 000000000..64fbc7f62 --- /dev/null +++ b/contrib/recipes/watch.go @@ -0,0 +1,147 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package recipe + +import ( + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/storage" + "github.com/coreos/etcd/storage/storagepb" +) + +type Watcher struct { + wstream pb.Watch_WatchClient + donec chan struct{} + id storage.WatchID + recvc chan *storagepb.Event + lastErr error +} + +func NewWatcher(c *EtcdClient, key string, rev int64) (*Watcher, error) { + return newWatcher(c, key, rev, false) +} + +func NewPrefixWatcher(c *EtcdClient, prefix string, rev int64) (*Watcher, error) { + return newWatcher(c, prefix, rev, true) +} + +func newWatcher(c *EtcdClient, key string, rev int64, isPrefix bool) (*Watcher, error) { + w, err := c.Watch.Watch(context.Background()) + if err != nil { + return nil, err + } + + req := &pb.WatchCreateRequest{StartRevision: rev} + if isPrefix { + req.Prefix = []byte(key) + } else { + req.Key = []byte(key) + } + + if err := w.Send(&pb.WatchRequest{CreateRequest: req}); err != nil { + return nil, err + } + + wresp, err := w.Recv() + if err != nil { + return nil, err + } + if len(wresp.Events) != 0 || wresp.Created != true { + return nil, ErrWaitMismatch + } + ret := &Watcher{ + wstream: w, + donec: make(chan struct{}), + id: storage.WatchID(wresp.WatchId), + recvc: make(chan *storagepb.Event), + } + go ret.recvLoop() + return ret, nil +} + +func (w *Watcher) Close() error { + if w.wstream == nil { + return w.lastErr + } + req := &pb.WatchCancelRequest{WatchId: int64(w.id)} + err := w.wstream.Send(&pb.WatchRequest{CancelRequest: req}) + if err != nil && w.lastErr == nil { + return err + } + w.wstream.CloseSend() + w.donec <- struct{}{} + <-w.donec + w.wstream = nil + return w.lastErr +} + +func (w *Watcher) Chan() <-chan *storagepb.Event { return w.recvc } + +func (w *Watcher) recvLoop() { + defer close(w.donec) + for { + wresp, err := w.wstream.Recv() + if err != nil { + w.lastErr = err + break + } + for i := range wresp.Events { + select { + case <-w.donec: + close(w.recvc) + return + case w.recvc <- wresp.Events[i]: + } + } + } + close(w.recvc) + <-w.donec +} + +func (w *Watcher) waitEvents(evs []storagepb.Event_EventType) (*storagepb.Event, error) { + i := 0 + for { + ev, ok := <-w.recvc + if !ok { + break + } + if ev.Type == evs[i] { + i++ + if i == len(evs) { + return ev, nil + } + } + } + return nil, w.Close() +} + +// WaitEvents waits on a key until it observes the given events and returns the final one. +func WaitEvents(c *EtcdClient, key string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) { + w, err := NewWatcher(c, key, rev) + if err != nil { + return nil, err + } + defer w.Close() + return w.waitEvents(evs) +} + +func WaitPrefixEvents(c *EtcdClient, prefix string, rev int64, evs []storagepb.Event_EventType) (*storagepb.Event, error) { + w, err := NewPrefixWatcher(c, prefix, rev) + if err != nil { + return nil, err + } + defer w.Close() + return w.waitEvents(evs) +} diff --git a/integration/v3_barrier_test.go b/integration/v3_barrier_test.go new file mode 100644 index 000000000..4db4e3ecd --- /dev/null +++ b/integration/v3_barrier_test.go @@ -0,0 +1,74 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.package recipe +package integration + +import ( + "testing" + "time" + + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" + "github.com/coreos/etcd/contrib/recipes" +) + +func TestBarrierSingleNode(t *testing.T) { + clus := newClusterGRPC(t, &clusterConfig{size: 3}) + defer clus.Terminate(t) + testBarrier(t, 5, func() *grpc.ClientConn { return clus.conns[0] }) +} + +func TestBarrierMultiNode(t *testing.T) { + clus := newClusterGRPC(t, &clusterConfig{size: 3}) + defer clus.Terminate(t) + testBarrier(t, 5, func() *grpc.ClientConn { return clus.RandConn() }) +} + +func testBarrier(t *testing.T, waiters int, chooseConn func() *grpc.ClientConn) { + b := recipe.NewBarrier(recipe.NewEtcdClient(chooseConn()), "test-barrier") + if err := b.Hold(); err != nil { + t.Fatalf("could not hold barrier (%v)", err) + } + if err := b.Hold(); err == nil { + t.Fatalf("able to double-hold barrier") + } + + donec := make(chan struct{}) + for i := 0; i < waiters; i++ { + go func() { + b := recipe.NewBarrier(recipe.NewEtcdClient(chooseConn()), "test-barrier") + if err := b.Wait(); err != nil { + t.Fatalf("could not wait on barrier (%v)", err) + } + donec <- struct{}{} + }() + } + + select { + case <-donec: + t.Fatalf("barrier did not wait") + default: + } + + if err := b.Release(); err != nil { + t.Fatalf("could not release barrier (%v)", err) + } + + timerC := time.After(time.Duration(waiters*100) * time.Millisecond) + for i := 0; i < waiters; i++ { + select { + case <-timerC: + t.Fatalf("barrier timed out") + case <-donec: + } + } +} diff --git a/integration/v3_lock_test.go b/integration/v3_lock_test.go new file mode 100644 index 000000000..a1799dec7 --- /dev/null +++ b/integration/v3_lock_test.go @@ -0,0 +1,136 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package integration + +import ( + "math/rand" + "testing" + "time" + + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" + "github.com/coreos/etcd/contrib/recipes" +) + +func TestMutexSingleNode(t *testing.T) { + clus := newClusterGRPC(t, &clusterConfig{size: 3}) + defer clus.Terminate(t) + testMutex(t, 5, func() *grpc.ClientConn { return clus.conns[0] }) +} + +func TestMutexMultiNode(t *testing.T) { + clus := newClusterGRPC(t, &clusterConfig{size: 3}) + defer clus.Terminate(t) + testMutex(t, 5, func() *grpc.ClientConn { return clus.RandConn() }) +} + +func testMutex(t *testing.T, waiters int, chooseConn func() *grpc.ClientConn) { + // stream lock acquistions + lockedC := make(chan *recipe.Mutex, 1) + for i := 0; i < waiters; i++ { + go func() { + m := recipe.NewMutex(recipe.NewEtcdClient(chooseConn()), "test-mutex") + if err := m.Lock(); err != nil { + t.Fatalf("could not wait on lock (%v)", err) + } + lockedC <- m + }() + } + // unlock locked mutexes + timerC := time.After(time.Duration(waiters) * time.Second) + for i := 0; i < waiters; i++ { + select { + case <-timerC: + t.Fatalf("timed out waiting for lock %d", i) + case m := <-lockedC: + // lock acquired with m + select { + case <-lockedC: + t.Fatalf("lock %d followers did not wait", i) + default: + } + if err := m.Unlock(); err != nil { + t.Fatalf("could not release lock (%v)", err) + } + } + } +} + +func BenchmarkMutex4Waiters(b *testing.B) { + // XXX switch tests to use TB interface + clus := newClusterGRPC(nil, &clusterConfig{size: 3}) + defer clus.Terminate(nil) + for i := 0; i < b.N; i++ { + testMutex(nil, 4, func() *grpc.ClientConn { return clus.RandConn() }) + } +} + +func TestRWMutexSingleNode(t *testing.T) { + clus := newClusterGRPC(t, &clusterConfig{size: 3}) + defer clus.Terminate(t) + testRWMutex(t, 5, func() *grpc.ClientConn { return clus.conns[0] }) +} + +func TestRWMutexMultiNode(t *testing.T) { + clus := newClusterGRPC(t, &clusterConfig{size: 3}) + defer clus.Terminate(t) + testRWMutex(t, 5, func() *grpc.ClientConn { return clus.RandConn() }) +} + +func testRWMutex(t *testing.T, waiters int, chooseConn func() *grpc.ClientConn) { + // stream rwlock acquistions + rlockedC := make(chan *recipe.RWMutex, 1) + wlockedC := make(chan *recipe.RWMutex, 1) + for i := 0; i < waiters; i++ { + go func() { + rwm := recipe.NewRWMutex(recipe.NewEtcdClient(chooseConn()), "test-rwmutex") + if rand.Intn(1) == 0 { + if err := rwm.RLock(); err != nil { + t.Fatalf("could not rlock (%v)", err) + } + rlockedC <- rwm + } else { + if err := rwm.Lock(); err != nil { + t.Fatalf("could not lock (%v)", err) + } + wlockedC <- rwm + } + }() + } + // unlock locked rwmutexes + timerC := time.After(time.Duration(waiters) * time.Second) + for i := 0; i < waiters; i++ { + select { + case <-timerC: + t.Fatalf("timed out waiting for lock %d", i) + case wl := <-wlockedC: + select { + case <-rlockedC: + t.Fatalf("rlock %d readers did not wait", i) + default: + } + if err := wl.Unlock(); err != nil { + t.Fatalf("could not release lock (%v)", err) + } + case rl := <-rlockedC: + select { + case <-wlockedC: + t.Fatalf("rlock %d writers did not wait", i) + default: + } + if err := rl.RUnlock(); err != nil { + t.Fatalf("could not release rlock (%v)", err) + } + } + } +} diff --git a/integration/v3_queue_test.go b/integration/v3_queue_test.go new file mode 100644 index 000000000..4b745adbd --- /dev/null +++ b/integration/v3_queue_test.go @@ -0,0 +1,219 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.package recipe +package integration + +import ( + "fmt" + "math/rand" + "sync/atomic" + "testing" + + "github.com/coreos/etcd/contrib/recipes" +) + +const ( + manyQueueClients = 3 + queueItemsPerClient = 2 +) + +// TestQueueOneReaderOneWriter confirms the queue is FIFO +func TestQueueOneReaderOneWriter(t *testing.T) { + clus := newClusterGRPC(t, &clusterConfig{size: 1}) + defer clus.Terminate(t) + + go func() { + etcdc := recipe.NewEtcdClient(clus.RandConn()) + q := recipe.NewQueue(etcdc, "testq") + for i := 0; i < 5; i++ { + if err := q.Enqueue(fmt.Sprintf("%d", i)); err != nil { + t.Fatalf("error enqueuing (%v)", err) + } + } + }() + + etcdc := recipe.NewEtcdClient(clus.RandConn()) + q := recipe.NewQueue(etcdc, "testq") + for i := 0; i < 5; i++ { + s, err := q.Dequeue() + if err != nil { + t.Fatalf("error dequeueing (%v)", err) + } + if s != fmt.Sprintf("%d", i) { + t.Fatalf("expected dequeue value %v, got %v", s, i) + } + } +} + +func TestQueueManyReaderOneWriter(t *testing.T) { + testQueueNReaderMWriter(t, manyQueueClients, 1) +} + +func TestQueueOneReaderManyWriter(t *testing.T) { + testQueueNReaderMWriter(t, 1, manyQueueClients) +} + +func TestQueueManyReaderManyWriter(t *testing.T) { + testQueueNReaderMWriter(t, manyQueueClients, manyQueueClients) +} + +// BenchmarkQueue benchmarks Queues using many/many readers/writers +func BenchmarkQueue(b *testing.B) { + // XXX switch tests to use TB interface + clus := newClusterGRPC(nil, &clusterConfig{size: 3}) + defer clus.Terminate(nil) + for i := 0; i < b.N; i++ { + testQueueNReaderMWriter(nil, manyQueueClients, manyQueueClients) + } +} + +// TestPrQueue tests whether priority queues respect priorities. +func TestPrQueueOneReaderOneWriter(t *testing.T) { + clus := newClusterGRPC(t, &clusterConfig{size: 1}) + defer clus.Terminate(t) + + // write out five items with random priority + etcdc := recipe.NewEtcdClient(clus.RandConn()) + q := recipe.NewPriorityQueue(etcdc, "testprq") + for i := 0; i < 5; i++ { + // [0, 2] priority for priority collision to test seq keys + pr := uint16(rand.Intn(3)) + if err := q.Enqueue(fmt.Sprintf("%d", pr), pr); err != nil { + t.Fatalf("error enqueuing (%v)", err) + } + } + + // read back items; confirm priority order is respected + lastPr := -1 + for i := 0; i < 5; i++ { + s, err := q.Dequeue() + if err != nil { + t.Fatalf("error dequeueing (%v)", err) + } + curPr := 0 + if _, err := fmt.Sscanf(s, "%d", &curPr); err != nil { + t.Fatalf(`error parsing item "%s" (%v)`, s, err) + } + if lastPr > curPr { + t.Fatalf("expected priority %v > %v", curPr, lastPr) + } + } +} + +func TestPrQueueManyReaderManyWriter(t *testing.T) { + clus := newClusterGRPC(t, &clusterConfig{size: 3}) + defer clus.Terminate(t) + rqs := newPriorityQueues(clus, manyQueueClients) + wqs := newPriorityQueues(clus, manyQueueClients) + testReadersWriters(t, rqs, wqs) +} + +// BenchmarkQueue benchmarks Queues using n/n readers/writers +func BenchmarkPrQueueOneReaderOneWriter(b *testing.B) { + // XXX switch tests to use TB interface + clus := newClusterGRPC(nil, &clusterConfig{size: 3}) + defer clus.Terminate(nil) + rqs := newPriorityQueues(clus, 1) + wqs := newPriorityQueues(clus, 1) + for i := 0; i < b.N; i++ { + testReadersWriters(nil, rqs, wqs) + } +} + +func testQueueNReaderMWriter(t *testing.T, n int, m int) { + clus := newClusterGRPC(t, &clusterConfig{size: 3}) + defer clus.Terminate(t) + testReadersWriters(t, newQueues(clus, n), newQueues(clus, m)) +} + +func newQueues(clus *clusterV3, n int) (qs []testQueue) { + for i := 0; i < n; i++ { + etcdc := recipe.NewEtcdClient(clus.RandConn()) + qs = append(qs, recipe.NewQueue(etcdc, "q")) + } + return qs +} + +func newPriorityQueues(clus *clusterV3, n int) (qs []testQueue) { + for i := 0; i < n; i++ { + etcdc := recipe.NewEtcdClient(clus.RandConn()) + q := &flatPriorityQueue{recipe.NewPriorityQueue(etcdc, "prq")} + qs = append(qs, q) + } + return qs +} + +func testReadersWriters(t *testing.T, rqs []testQueue, wqs []testQueue) { + rerrc := make(chan error) + werrc := make(chan error) + manyWriters(wqs, queueItemsPerClient, werrc) + manyReaders(rqs, len(wqs)*queueItemsPerClient, rerrc) + for range wqs { + if err := <-werrc; err != nil { + t.Errorf("error writing (%v)", err) + } + } + for range rqs { + if err := <-rerrc; err != nil { + t.Errorf("error reading (%v)", err) + } + } +} + +func manyReaders(qs []testQueue, totalReads int, errc chan<- error) { + var rxReads int32 + for _, q := range qs { + go func(q testQueue) { + for { + total := atomic.AddInt32(&rxReads, 1) + if int(total) > totalReads { + break + } + if _, err := q.Dequeue(); err != nil { + errc <- err + return + } + } + errc <- nil + }(q) + } +} + +func manyWriters(qs []testQueue, writesEach int, errc chan<- error) { + for _, q := range qs { + go func(q testQueue) { + for j := 0; j < writesEach; j++ { + if err := q.Enqueue("foo"); err != nil { + errc <- err + return + } + } + errc <- nil + }(q) + } +} + +type testQueue interface { + Enqueue(val string) error + Dequeue() (string, error) +} + +type flatPriorityQueue struct{ *recipe.PriorityQueue } + +func (q *flatPriorityQueue) Enqueue(val string) error { + // randomized to stress dequeuing logic; order isn't important + return q.PriorityQueue.Enqueue(val, uint16(rand.Intn(2))) +} +func (q *flatPriorityQueue) Dequeue() (string, error) { + return q.PriorityQueue.Dequeue() +} diff --git a/integration/v3_stm_test.go b/integration/v3_stm_test.go new file mode 100644 index 000000000..6a19144ae --- /dev/null +++ b/integration/v3_stm_test.go @@ -0,0 +1,138 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.package recipe +package integration + +import ( + "fmt" + "math/rand" + "strconv" + "testing" + + "github.com/coreos/etcd/contrib/recipes" +) + +// TestSTMConflict tests that conflicts are retried. +func TestSTMConflict(t *testing.T) { + clus := newClusterGRPC(t, &clusterConfig{size: 3}) + defer clus.Terminate(t) + + etcdc := recipe.NewEtcdClient(clus.RandConn()) + keys := make([]*recipe.RemoteKV, 5) + for i := 0; i < len(keys); i++ { + rk, err := recipe.NewKV(etcdc, fmt.Sprintf("foo-%d", i), "100", 0) + if err != nil { + t.Fatalf("could not make key (%v)", err) + } + keys[i] = rk + } + + errc := make([]<-chan error, len(keys)) + for i, rk := range keys { + curEtcdc := recipe.NewEtcdClient(clus.RandConn()) + srcKey := rk.Key() + applyf := func(stm *recipe.STM) error { + src, err := stm.Get(srcKey) + if err != nil { + return err + } + // must be different key to avoid double-adding + dstKey := srcKey + for dstKey == srcKey { + dstKey = keys[rand.Intn(len(keys))].Key() + } + dst, err := stm.Get(dstKey) + if err != nil { + return err + } + srcV, _ := strconv.ParseInt(src, 10, 64) + dstV, _ := strconv.ParseInt(dst, 10, 64) + xfer := int64(rand.Intn(int(srcV)) / 2) + stm.Put(srcKey, fmt.Sprintf("%d", srcV-xfer)) + stm.Put(dstKey, fmt.Sprintf("%d", dstV+xfer)) + return nil + } + errc[i] = recipe.NewSTM(curEtcdc, applyf) + } + + // wait for txns + for _, ch := range errc { + if err := <-ch; err != nil { + t.Fatalf("apply failed (%v)", err) + } + } + + // ensure sum matches initial sum + sum := 0 + for _, oldRK := range keys { + rk, err := recipe.GetRemoteKV(etcdc, oldRK.Key()) + if err != nil { + t.Fatalf("couldn't fetch key %s (%v)", oldRK.Key(), err) + } + v, _ := strconv.ParseInt(rk.Value(), 10, 64) + sum += int(v) + } + if sum != len(keys)*100 { + t.Fatalf("bad sum. got %d, expected %d", sum, len(keys)*100) + } +} + +// TestSTMPut confirms a STM put on a new key is visible after commit. +func TestSTMPutNewKey(t *testing.T) { + clus := newClusterGRPC(t, &clusterConfig{size: 1}) + defer clus.Terminate(t) + + etcdc := recipe.NewEtcdClient(clus.RandConn()) + applyf := func(stm *recipe.STM) error { + stm.Put("foo", "bar") + return nil + } + errc := recipe.NewSTM(etcdc, applyf) + if err := <-errc; err != nil { + t.Fatalf("error on stm txn (%v)", err) + } + + rk, err := recipe.GetRemoteKV(etcdc, "foo") + if err != nil { + t.Fatalf("error fetching key (%v)", err) + } + if rk.Value() != "bar" { + t.Fatalf("bad value. got %v, expected bar", rk.Value()) + } +} + +// TestSTMAbort tests that an aborted txn does not modify any keys. +func TestSTMAbort(t *testing.T) { + clus := newClusterGRPC(t, &clusterConfig{size: 1}) + defer clus.Terminate(t) + + etcdc := recipe.NewEtcdClient(clus.RandConn()) + applyf := func(stm *recipe.STM) error { + stm.Put("foo", "baz") + stm.Abort() + stm.Put("foo", "baz") + return nil + } + errc := recipe.NewSTM(etcdc, applyf) + if err := <-errc; err != nil { + t.Fatalf("error on stm txn (%v)", err) + } + + rk, err := recipe.GetRemoteKV(etcdc, "foo") + if err != nil { + t.Fatalf("error fetching key (%v)", err) + } + if rk.Value() != "" { + t.Fatalf("bad value. got %v, expected empty string", rk.Value()) + } +}