From 9f569842f0f666c85f81838b99ea415336c81b81 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 23 Feb 2016 17:45:43 -0800 Subject: [PATCH 1/4] clientv3: move syncer to mirror package to be in line with sync meaning process synchronization, not data synchronization --- clientv3/{sync => mirror}/syncer.go | 2 +- etcdctlv3/command/make_mirror_command.go | 4 ++-- etcdctlv3/command/snapshot_command.go | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) rename clientv3/{sync => mirror}/syncer.go (99%) diff --git a/clientv3/sync/syncer.go b/clientv3/mirror/syncer.go similarity index 99% rename from clientv3/sync/syncer.go rename to clientv3/mirror/syncer.go index 06c4007fd..f9faaed3d 100644 --- a/clientv3/sync/syncer.go +++ b/clientv3/mirror/syncer.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package sync +package mirror import ( "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" diff --git a/etcdctlv3/command/make_mirror_command.go b/etcdctlv3/command/make_mirror_command.go index 53294689c..abef26ecf 100644 --- a/etcdctlv3/command/make_mirror_command.go +++ b/etcdctlv3/command/make_mirror_command.go @@ -23,7 +23,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/clientv3/sync" + "github.com/coreos/etcd/clientv3/mirror" "github.com/coreos/etcd/etcdserver/api/v3rpc" "github.com/coreos/etcd/storage/storagepb" ) @@ -77,7 +77,7 @@ func makeMirror(ctx context.Context, c *clientv3.Client, dc *clientv3.Client) er // TODO: remove the prefix of the destination cluster? dkv := clientv3.NewKV(dc) - s := sync.NewSyncer(c, mmprefix, 0) + s := mirror.NewSyncer(c, mmprefix, 0) rc, errc := s.SyncBase(ctx) diff --git a/etcdctlv3/command/snapshot_command.go b/etcdctlv3/command/snapshot_command.go index 3746630e3..0dcc868cb 100644 --- a/etcdctlv3/command/snapshot_command.go +++ b/etcdctlv3/command/snapshot_command.go @@ -22,7 +22,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/clientv3/sync" + "github.com/coreos/etcd/clientv3/mirror" "github.com/coreos/etcd/etcdserver/api/v3rpc" ) @@ -90,7 +90,7 @@ func snapshotToFile(c *clientv3.Client, path string) { // snapshot reads all of a watcher; returns compaction revision if incomplete // TODO: stabilize snapshot format func snapshot(w io.Writer, c *clientv3.Client, rev int64) int64 { - s := sync.NewSyncer(c, "", rev) + s := mirror.NewSyncer(c, "", rev) rc, errc := s.SyncBase(context.TODO()) From 20b4336cdb8b7e2911ac541b0ac794cccdc8f44d Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 23 Feb 2016 18:57:41 -0800 Subject: [PATCH 2/4] clientv3/concurrency: Session A client may bind itself to a session lease to signal its continued in participation with the cluster. --- clientv3/concurrency/session.go | 106 ++++++++++++++++++++++++ contrib/recipes/key.go | 5 +- contrib/recipes/lease.go | 113 -------------------------- integration/v3_double_barrier_test.go | 18 ++-- integration/v3_election_test.go | 11 ++- 5 files changed, 130 insertions(+), 123 deletions(-) create mode 100644 clientv3/concurrency/session.go delete mode 100644 contrib/recipes/lease.go diff --git a/clientv3/concurrency/session.go b/clientv3/concurrency/session.go new file mode 100644 index 000000000..739733547 --- /dev/null +++ b/clientv3/concurrency/session.go @@ -0,0 +1,106 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package concurrency + +import ( + "sync" + + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + v3 "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/lease" +) + +// only keep one ephemeral lease per client +var clientSessions clientSessionMgr = clientSessionMgr{sessions: make(map[*v3.Client]*Session)} + +const sessionTTL = 60 + +type clientSessionMgr struct { + sessions map[*v3.Client]*Session + mu sync.Mutex +} + +// Session represents a lease kept alive for the lifetime of a client. +// Fault-tolerant applications may use sessions to reason about liveness. +type Session struct { + client *v3.Client + id lease.LeaseID + + cancel context.CancelFunc + donec <-chan struct{} +} + +// NewSession gets the leased session for a client. +func NewSession(client *v3.Client) (*Session, error) { + clientSessions.mu.Lock() + defer clientSessions.mu.Unlock() + if s, ok := clientSessions.sessions[client]; ok { + return s, nil + } + + lc := v3.NewLease(client) + resp, err := lc.Create(context.TODO(), sessionTTL) + if err != nil { + return nil, err + } + id := lease.LeaseID(resp.ID) + + ctx, cancel := context.WithCancel(context.Background()) + keepAlive, err := lc.KeepAlive(ctx, id) + if err != nil || keepAlive == nil { + return nil, err + } + + donec := make(chan struct{}) + s := &Session{client: client, id: id, cancel: cancel, donec: donec} + clientSessions.sessions[client] = s + + // keep the lease alive until client error or cancelled context + go func() { + defer func() { + clientSessions.mu.Lock() + delete(clientSessions.sessions, client) + clientSessions.mu.Unlock() + lc.Close() + close(donec) + }() + for range keepAlive { + // eat messages until keep alive channel closes + } + }() + + return s, nil +} + +// Lease is the lease ID for keys bound to the session. +func (s *Session) Lease() lease.LeaseID { return s.id } + +// Done returns a channel that closes when the lease is orphaned, expires, or +// is otherwise no longer being refreshed. +func (s *Session) Done() <-chan struct{} { return s.donec } + +// Orphan ends the refresh for the session lease. This is useful +// in case the state of the client connection is indeterminate (revoke +// would fail) or when transferring lease ownership. +func (s *Session) Orphan() { + s.cancel() + <-s.donec +} + +// Close orphans the session and revokes the session lease. +func (s *Session) Close() error { + s.Orphan() + _, err := v3.NewLease(s.client).Revoke(context.TODO(), s.id) + return err +} diff --git a/contrib/recipes/key.go b/contrib/recipes/key.go index 5c086213f..e88357611 100644 --- a/contrib/recipes/key.go +++ b/contrib/recipes/key.go @@ -21,6 +21,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" v3 "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/concurrency" "github.com/coreos/etcd/lease" ) @@ -161,11 +162,11 @@ type EphemeralKV struct{ RemoteKV } // NewEphemeralKV creates a new key/value pair associated with a session lease func NewEphemeralKV(client *v3.Client, key, val string) (*EphemeralKV, error) { - leaseID, err := SessionLease(client) + s, err := concurrency.NewSession(client) if err != nil { return nil, err } - k, err := NewKV(v3.NewKV(client), key, val, leaseID) + k, err := NewKV(v3.NewKV(client), key, val, s.Lease()) if err != nil { return nil, err } diff --git a/contrib/recipes/lease.go b/contrib/recipes/lease.go deleted file mode 100644 index 327d12fb7..000000000 --- a/contrib/recipes/lease.go +++ /dev/null @@ -1,113 +0,0 @@ -// Copyright 2016 CoreOS, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package recipe - -import ( - "sync" - - "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" - "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/lease" -) - -// only keep one ephemeral lease per client -var clientLeases clientLeaseMgr = clientLeaseMgr{leases: make(map[*clientv3.Client]*leaseKeepAlive)} - -type clientLeaseMgr struct { - leases map[*clientv3.Client]*leaseKeepAlive - mu sync.Mutex -} - -type leaseKeepAlive struct { - id lease.LeaseID - cancel context.CancelFunc - donec <-chan struct{} -} - -func SessionLease(client *clientv3.Client) (lease.LeaseID, error) { - return clientLeases.sessionLease(client, 120) -} - -func SessionLeaseTTL(client *clientv3.Client, ttl int64) (lease.LeaseID, error) { - return clientLeases.sessionLease(client, ttl) -} - -// StopSessionLease ends the refresh for the session lease. This is useful -// in case the state of the client connection is indeterminate (revoke -// would fail) or if transferring lease ownership. -func StopSessionLease(client *clientv3.Client) { - clientLeases.mu.Lock() - lka := clientLeases.leases[client] - clientLeases.mu.Unlock() - if lka != nil { - lka.cancel() - <-lka.donec - } -} - -// RevokeSessionLease revokes the session lease. -func RevokeSessionLease(client *clientv3.Client) (err error) { - clientLeases.mu.Lock() - lka := clientLeases.leases[client] - clientLeases.mu.Unlock() - StopSessionLease(client) - if lka != nil { - _, err = clientv3.NewLease(client).Revoke(context.TODO(), lka.id) - } - return err -} - -func (clm *clientLeaseMgr) sessionLease(client *clientv3.Client, ttl int64) (lease.LeaseID, error) { - clm.mu.Lock() - defer clm.mu.Unlock() - if lka, ok := clm.leases[client]; ok { - return lka.id, nil - } - - lc := clientv3.NewLease(client) - resp, err := lc.Create(context.TODO(), ttl) - if err != nil { - return lease.NoLease, err - } - id := lease.LeaseID(resp.ID) - - ctx, cancel := context.WithCancel(context.Background()) - keepAlive, err := lc.KeepAlive(ctx, id) - if err != nil || keepAlive == nil { - return lease.NoLease, err - } - - donec := make(chan struct{}) - lka := &leaseKeepAlive{ - id: id, - cancel: cancel, - donec: donec} - clm.leases[client] = lka - - // keep the lease alive until client error or cancelled context - go func() { - defer func() { - clm.mu.Lock() - delete(clm.leases, client) - clm.mu.Unlock() - lc.Close() - close(donec) - }() - for range keepAlive { - // eat messages until keep alive channel closes - } - }() - - return id, nil -} diff --git a/integration/v3_double_barrier_test.go b/integration/v3_double_barrier_test.go index 72a2544ee..e7206dc8c 100644 --- a/integration/v3_double_barrier_test.go +++ b/integration/v3_double_barrier_test.go @@ -17,13 +17,14 @@ import ( "testing" "time" + "github.com/coreos/etcd/clientv3/concurrency" "github.com/coreos/etcd/contrib/recipes" ) func TestDoubleBarrier(t *testing.T) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) - defer closeSessionLease(clus) + defer dropSessionLease(clus) waiters := 10 @@ -84,7 +85,7 @@ func TestDoubleBarrier(t *testing.T) { func TestDoubleBarrierFailover(t *testing.T) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) - defer closeSessionLease(clus) + defer dropSessionLease(clus) waiters := 10 donec := make(chan struct{}) @@ -119,7 +120,13 @@ func TestDoubleBarrierFailover(t *testing.T) { } } // kill lease, expect Leave unblock - recipe.RevokeSessionLease(clus.clients[0]) + s, err := concurrency.NewSession(clus.clients[0]) + if err != nil { + t.Fatal(err) + } + if err = s.Close(); err != nil { + t.Fatal(err) + } // join on rest of waiters for i := 0; i < waiters-1; i++ { select { @@ -130,8 +137,9 @@ func TestDoubleBarrierFailover(t *testing.T) { } } -func closeSessionLease(clus *ClusterV3) { +func dropSessionLease(clus *ClusterV3) { for _, client := range clus.clients { - recipe.StopSessionLease(client) + s, _ := concurrency.NewSession(client) + s.Orphan() } } diff --git a/integration/v3_election_test.go b/integration/v3_election_test.go index e2619d678..138e32a38 100644 --- a/integration/v3_election_test.go +++ b/integration/v3_election_test.go @@ -18,6 +18,7 @@ import ( "testing" "time" + "github.com/coreos/etcd/clientv3/concurrency" "github.com/coreos/etcd/contrib/recipes" ) @@ -25,7 +26,7 @@ import ( func TestElectionWait(t *testing.T) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) - defer closeSessionLease(clus) + defer dropSessionLease(clus) leaders := 3 followers := 3 @@ -88,7 +89,7 @@ func TestElectionWait(t *testing.T) { func TestElectionFailover(t *testing.T) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) - defer closeSessionLease(clus) + defer dropSessionLease(clus) // first leader (elected) e := recipe.NewElection(clus.clients[0], "test-election") @@ -116,7 +117,11 @@ func TestElectionFailover(t *testing.T) { }() // invoke leader failover - err = recipe.RevokeSessionLease(clus.clients[0]) + session, serr := concurrency.NewSession(clus.clients[0]) + if serr != nil { + t.Fatal(serr) + } + err = session.Close() if err != nil { t.Fatal(err) } From d4b2044eb19b8d74b786c70a54bd0c0aed578dbe Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 23 Feb 2016 21:03:22 -0800 Subject: [PATCH 3/4] clientv3/concurrency: Mutex --- clientv3/concurrency/key.go | 57 +++++++++++++++ clientv3/concurrency/mutex.go | 111 ++++++++++++++++++++++++++++++ clientv3/op.go | 23 +++++++ contrib/recipes/barrier.go | 2 +- contrib/recipes/election.go | 6 +- contrib/recipes/key.go | 2 +- contrib/recipes/mutex.go | 91 ------------------------ contrib/recipes/priority_queue.go | 2 +- contrib/recipes/queue.go | 2 +- contrib/recipes/range.go | 34 --------- contrib/recipes/rwmutex.go | 6 +- integration/v3_lock_test.go | 8 ++- 12 files changed, 206 insertions(+), 138 deletions(-) create mode 100644 clientv3/concurrency/key.go create mode 100644 clientv3/concurrency/mutex.go delete mode 100644 contrib/recipes/mutex.go delete mode 100644 contrib/recipes/range.go diff --git a/clientv3/concurrency/key.go b/clientv3/concurrency/key.go new file mode 100644 index 000000000..b741f97b4 --- /dev/null +++ b/clientv3/concurrency/key.go @@ -0,0 +1,57 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package concurrency + +import ( + "fmt" + "time" + + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + v3 "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/etcdserver/api/v3rpc" +) + +// NewUniqueKey creates a new key from a given prefix. +func NewUniqueKey(ctx context.Context, kv v3.KV, pfx string, opts ...v3.OpOption) (string, int64, error) { + for { + newKey := fmt.Sprintf("%s/%v", pfx, time.Now().UnixNano()) + put := v3.OpPut(newKey, "", opts...) + cmp := v3.Compare(v3.ModifiedRevision(newKey), "=", 0) + resp, err := kv.Txn(ctx).If(cmp).Then(put).Commit() + if err != nil { + return "", 0, err + } + if !resp.Succeeded { + continue + } + return newKey, resp.Header.Revision, nil + } +} + +func waitUpdate(ctx context.Context, client *v3.Client, key string, opts ...v3.OpOption) error { + w := v3.NewWatcher(client) + defer w.Close() + wc := w.Watch(ctx, key, opts...) + if wc == nil { + return ctx.Err() + } + wresp, ok := <-wc + if !ok { + return ctx.Err() + } + if len(wresp.Events) == 0 { + return v3rpc.ErrCompacted + } + return nil +} diff --git a/clientv3/concurrency/mutex.go b/clientv3/concurrency/mutex.go new file mode 100644 index 000000000..f80f2e40f --- /dev/null +++ b/clientv3/concurrency/mutex.go @@ -0,0 +1,111 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package concurrency + +import ( + "sync" + + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + v3 "github.com/coreos/etcd/clientv3" +) + +// Mutex implements the sync Locker interface with etcd +type Mutex struct { + client *v3.Client + kv v3.KV + ctx context.Context + + pfx string + myKey string + myRev int64 +} + +func NewMutex(ctx context.Context, client *v3.Client, pfx string) *Mutex { + return &Mutex{client, v3.NewKV(client), ctx, pfx, "", -1} +} + +// Lock locks the mutex with a cancellable context. If the context is cancelled +// while trying to acquire the lock, the mutex tries to clean its stale lock entry. +func (m *Mutex) Lock(ctx context.Context) error { + s, err := NewSession(m.client) + if err != nil { + return err + } + // put self in lock waiters via myKey; oldest waiter holds lock + m.myKey, m.myRev, err = NewUniqueKey(ctx, m.kv, m.pfx, v3.WithLease(s.Lease())) + // wait for lock to become available + for err == nil { + // find oldest element in waiters via revision of insertion + var resp *v3.GetResponse + resp, err = m.kv.Get(ctx, m.pfx, v3.WithFirstRev()...) + if err != nil { + break + } + if m.myRev == resp.Kvs[0].CreateRevision { + // myKey is oldest in waiters; myKey holds the lock now + return nil + } + // otherwise myKey isn't lowest, so there must be a pfx prior to myKey + opts := append(v3.WithLastRev(), v3.WithRev(m.myRev-1)) + resp, err = m.kv.Get(ctx, m.pfx, opts...) + if err != nil { + break + } + lastKey := string(resp.Kvs[0].Key) + // wait for release on prior pfx + err = waitUpdate(ctx, m.client, lastKey, v3.WithRev(m.myRev)) + // try again in case lastKey left the wait list before acquiring the lock; + // myKey can only hold the lock if it's the oldest in the list + } + + // release lock key if cancelled + select { + case <-ctx.Done(): + m.Unlock() + default: + } + return err +} + +func (m *Mutex) Unlock() error { + if _, err := m.kv.Delete(m.ctx, m.myKey); err != nil { + return err + } + m.myKey = "\x00" + m.myRev = -1 + return nil +} + +func (m *Mutex) IsOwner() v3.Cmp { + return v3.Compare(v3.CreatedRevision(m.myKey), "=", m.myRev) +} + +type lockerMutex struct{ *Mutex } + +func (lm *lockerMutex) Lock() { + if err := lm.Mutex.Lock(lm.ctx); err != nil { + panic(err) + } +} +func (lm *lockerMutex) Unlock() { + if err := lm.Mutex.Unlock(); err != nil { + panic(err) + } +} + +// NewLocker creates a sync.Locker backed by an etcd mutex. +func NewLocker(ctx context.Context, client *v3.Client, pfx string) sync.Locker { + return &lockerMutex{NewMutex(ctx, client, pfx)} +} diff --git a/clientv3/op.go b/clientv3/op.go index 9f76e2d39..a7b545049 100644 --- a/clientv3/op.go +++ b/clientv3/op.go @@ -227,3 +227,26 @@ func WithFromKey() OpOption { return WithRange("\x00") } func WithSerializable() OpOption { return func(op *Op) { op.serializable = true } } + +// WithFirstCreate gets the key with the oldest creation revision in the request range. +func WithFirstCreate() []OpOption { return withTop(SortByCreatedRev, SortAscend) } + +// WithLastCreate gets the key with the latest creation revision in the request range. +func WithLastCreate() []OpOption { return withTop(SortByCreatedRev, SortDescend) } + +// WithFirstKey gets the lexically first key in the request range. +func WithFirstKey() []OpOption { return withTop(SortByKey, SortAscend) } + +// WithLastKey gets the lexically last key in the request range. +func WithLastKey() []OpOption { return withTop(SortByKey, SortDescend) } + +// WithFirstRev gets the key with the oldest modification revision in the request range. +func WithFirstRev() []OpOption { return withTop(SortByModifiedRev, SortAscend) } + +// WithLastRev gets the key with the latest modification revision in the request range. +func WithLastRev() []OpOption { return withTop(SortByModifiedRev, SortDescend) } + +// withTop gets the first key over the get's prefix given a sort order +func withTop(target SortTarget, order SortOrder) []OpOption { + return []OpOption{WithPrefix(), WithSort(target, order), WithLimit(1)} +} diff --git a/contrib/recipes/barrier.go b/contrib/recipes/barrier.go index dcfd025a6..56fde2dfd 100644 --- a/contrib/recipes/barrier.go +++ b/contrib/recipes/barrier.go @@ -49,7 +49,7 @@ func (b *Barrier) Release() error { // Wait blocks on the barrier key until it is deleted. If there is no key, Wait // assumes Release has already been called and returns immediately. func (b *Barrier) Wait() error { - resp, err := b.kv.Get(b.ctx, b.key, withFirstKey()...) + resp, err := b.kv.Get(b.ctx, b.key, v3.WithFirstKey()...) if err != nil { return err } diff --git a/contrib/recipes/election.go b/contrib/recipes/election.go index 163f49867..d091a1975 100644 --- a/contrib/recipes/election.go +++ b/contrib/recipes/election.go @@ -62,7 +62,7 @@ func (e *Election) Resign() (err error) { // Leader returns the leader value for the current election. func (e *Election) Leader() (string, error) { - resp, err := e.kv.Get(e.ctx, e.keyPrefix, withFirstCreate()...) + resp, err := e.kv.Get(e.ctx, e.keyPrefix, v3.WithFirstCreate()...) if err != nil { return "", err } else if len(resp.Kvs) == 0 { @@ -74,7 +74,7 @@ func (e *Election) Leader() (string, error) { // Wait waits for a leader to be elected, returning the leader value. func (e *Election) Wait() (string, error) { - resp, err := e.kv.Get(e.ctx, e.keyPrefix, withFirstCreate()...) + resp, err := e.kv.Get(e.ctx, e.keyPrefix, v3.WithFirstCreate()...) if err != nil { return "", err } else if len(resp.Kvs) != 0 { @@ -93,7 +93,7 @@ func (e *Election) Wait() (string, error) { } func (e *Election) waitLeadership(tryKey *EphemeralKV) error { - opts := append(withLastCreate(), v3.WithRev(tryKey.Revision()-1)) + opts := append(v3.WithLastCreate(), v3.WithRev(tryKey.Revision()-1)) resp, err := e.kv.Get(e.ctx, e.keyPrefix, opts...) if err != nil { return err diff --git a/contrib/recipes/key.go b/contrib/recipes/key.go index e88357611..a7011c177 100644 --- a/contrib/recipes/key.go +++ b/contrib/recipes/key.go @@ -99,7 +99,7 @@ func NewSequentialKV(kv v3.KV, prefix, val string) (*RemoteKV, error) { // newSequentialKV allocates a new sequential key /nnnnn with a given // value and lease. Note: a bookkeeping node __ is also allocated. func newSequentialKV(kv v3.KV, prefix, val string, leaseID lease.LeaseID) (*RemoteKV, error) { - resp, err := kv.Get(context.TODO(), prefix, withLastKey()...) + resp, err := kv.Get(context.TODO(), prefix, v3.WithLastKey()...) if err != nil { return nil, err } diff --git a/contrib/recipes/mutex.go b/contrib/recipes/mutex.go deleted file mode 100644 index f872e6d4d..000000000 --- a/contrib/recipes/mutex.go +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright 2016 CoreOS, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package recipe - -import ( - "sync" - - "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" - v3 "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/storage/storagepb" -) - -// Mutex implements the sync Locker interface with etcd -type Mutex struct { - client *v3.Client - kv v3.KV - ctx context.Context - - key string - myKey *EphemeralKV -} - -func NewMutex(client *v3.Client, key string) *Mutex { - return &Mutex{client, v3.NewKV(client), context.TODO(), key, nil} -} - -func (m *Mutex) Lock() (err error) { - // put self in lock waiters via myKey; oldest waiter holds lock - m.myKey, err = NewUniqueEphemeralKey(m.client, m.key) - if err != nil { - return err - } - // find oldest element in waiters via revision of insertion - resp, err := m.kv.Get(m.ctx, m.key, withFirstRev()...) - if err != nil { - return err - } - // if myKey is oldest in waiters, then myKey holds the lock - if m.myKey.Revision() == resp.Kvs[0].CreateRevision { - return nil - } - // otherwise myKey isn't lowest, so there must be a key prior to myKey - opts := append(withLastRev(), v3.WithRev(m.myKey.Revision()-1)) - lastKey, err := m.kv.Get(m.ctx, m.key, opts...) - if err != nil { - return err - } - // wait for release on prior key - _, err = WaitEvents( - m.client, - string(lastKey.Kvs[0].Key), - m.myKey.Revision()-1, - []storagepb.Event_EventType{storagepb.DELETE}) - // myKey now oldest - return err -} - -func (m *Mutex) Unlock() error { - err := m.myKey.Delete() - m.myKey = nil - return err -} - -type lockerMutex struct{ *Mutex } - -func (lm *lockerMutex) Lock() { - if err := lm.Mutex.Lock(); err != nil { - panic(err) - } -} -func (lm *lockerMutex) Unlock() { - if err := lm.Mutex.Unlock(); err != nil { - panic(err) - } -} - -func NewLocker(client *v3.Client, key string) sync.Locker { - return &lockerMutex{NewMutex(client, key)} -} diff --git a/contrib/recipes/priority_queue.go b/contrib/recipes/priority_queue.go index e7cce4ec0..1a1c628a1 100644 --- a/contrib/recipes/priority_queue.go +++ b/contrib/recipes/priority_queue.go @@ -46,7 +46,7 @@ func (q *PriorityQueue) Enqueue(val string, pr uint16) error { // queue is empty, Dequeue blocks until items are available. func (q *PriorityQueue) Dequeue() (string, error) { // TODO: fewer round trips by fetching more than one key - resp, err := q.kv.Get(q.ctx, q.key, withFirstKey()...) + resp, err := q.kv.Get(q.ctx, q.key, v3.WithFirstKey()...) if err != nil { return "", err } diff --git a/contrib/recipes/queue.go b/contrib/recipes/queue.go index 396420d2c..c0a4977fb 100644 --- a/contrib/recipes/queue.go +++ b/contrib/recipes/queue.go @@ -42,7 +42,7 @@ func (q *Queue) Enqueue(val string) error { // queue is empty, Dequeue blocks until elements are available. func (q *Queue) Dequeue() (string, error) { // TODO: fewer round trips by fetching more than one key - resp, err := q.kv.Get(q.ctx, q.keyPrefix, withFirstRev()...) + resp, err := q.kv.Get(q.ctx, q.keyPrefix, v3.WithFirstRev()...) if err != nil { return "", err } diff --git a/contrib/recipes/range.go b/contrib/recipes/range.go deleted file mode 100644 index 155a644e8..000000000 --- a/contrib/recipes/range.go +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2016 CoreOS, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package recipe - -import ( - v3 "github.com/coreos/etcd/clientv3" -) - -func withFirstCreate() []v3.OpOption { return withTop(v3.SortByCreatedRev, v3.SortAscend) } -func withLastCreate() []v3.OpOption { return withTop(v3.SortByCreatedRev, v3.SortDescend) } -func withFirstKey() []v3.OpOption { return withTop(v3.SortByKey, v3.SortAscend) } -func withLastKey() []v3.OpOption { return withTop(v3.SortByKey, v3.SortDescend) } -func withFirstRev() []v3.OpOption { return withTop(v3.SortByModifiedRev, v3.SortAscend) } -func withLastRev() []v3.OpOption { return withTop(v3.SortByModifiedRev, v3.SortDescend) } - -// withTop gets the first key over the get's prefix given a sort order -func withTop(target v3.SortTarget, order v3.SortOrder) []v3.OpOption { - return []v3.OpOption{ - v3.WithPrefix(), - v3.WithSort(target, order), - v3.WithLimit(1)} -} diff --git a/contrib/recipes/rwmutex.go b/contrib/recipes/rwmutex.go index 93aff10cd..ea2425c69 100644 --- a/contrib/recipes/rwmutex.go +++ b/contrib/recipes/rwmutex.go @@ -42,7 +42,7 @@ func (rwm *RWMutex) RLock() error { // if there are nodes with "write-" and a lower // revision number than us we must wait - resp, err := rwm.kv.Get(rwm.ctx, rwm.key+"/write", withFirstRev()...) + resp, err := rwm.kv.Get(rwm.ctx, rwm.key+"/write", v3.WithFirstRev()...) if err != nil { return err } @@ -62,7 +62,7 @@ func (rwm *RWMutex) Lock() error { for { // find any key of lower rev number blocks the write lock - opts := append(withLastRev(), v3.WithRev(rk.Revision()-1)) + opts := append(v3.WithLastRev(), v3.WithRev(rk.Revision()-1)) resp, err := rwm.kv.Get(rwm.ctx, rwm.key, opts...) if err != nil { return err @@ -82,7 +82,7 @@ func (rwm *RWMutex) Lock() error { func (rwm *RWMutex) waitOnLowest() error { // must block; get key before ek for waiting - opts := append(withLastRev(), v3.WithRev(rwm.myKey.Revision()-1)) + opts := append(v3.WithLastRev(), v3.WithRev(rwm.myKey.Revision()-1)) lastKey, err := rwm.kv.Get(rwm.ctx, rwm.key, opts...) if err != nil { return err diff --git a/integration/v3_lock_test.go b/integration/v3_lock_test.go index a3dfc1fab..97c0ae4f9 100644 --- a/integration/v3_lock_test.go +++ b/integration/v3_lock_test.go @@ -18,7 +18,9 @@ import ( "testing" "time" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/concurrency" "github.com/coreos/etcd/contrib/recipes" ) @@ -36,11 +38,11 @@ func TestMutexMultiNode(t *testing.T) { func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client) { // stream lock acquisitions - lockedC := make(chan *recipe.Mutex, 1) + lockedC := make(chan *concurrency.Mutex, 1) for i := 0; i < waiters; i++ { go func() { - m := recipe.NewMutex(chooseClient(), "test-mutex") - if err := m.Lock(); err != nil { + m := concurrency.NewMutex(context.TODO(), chooseClient(), "test-mutex") + if err := m.Lock(context.TODO()); err != nil { t.Fatalf("could not wait on lock (%v)", err) } lockedC <- m From ed44bb00f8745e1fade98dac74bb597c477f3909 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 23 Feb 2016 22:50:26 -0800 Subject: [PATCH 4/4] etcdctlv3: lock command --- clientv3/concurrency/mutex.go | 2 + etcdctlv3/README.md | 19 +++++++ etcdctlv3/command/lock_command.go | 88 +++++++++++++++++++++++++++++++ etcdctlv3/main.go | 1 + 4 files changed, 110 insertions(+) create mode 100644 etcdctlv3/command/lock_command.go diff --git a/clientv3/concurrency/mutex.go b/clientv3/concurrency/mutex.go index f80f2e40f..6c18d99c7 100644 --- a/clientv3/concurrency/mutex.go +++ b/clientv3/concurrency/mutex.go @@ -92,6 +92,8 @@ func (m *Mutex) IsOwner() v3.Cmp { return v3.Compare(v3.CreatedRevision(m.myKey), "=", m.myRev) } +func (m *Mutex) Key() string { return m.myKey } + type lockerMutex struct{ *Mutex } func (lm *lockerMutex) Lock() { diff --git a/etcdctlv3/README.md b/etcdctlv3/README.md index f560d276d..89e0684e1 100644 --- a/etcdctlv3/README.md +++ b/etcdctlv3/README.md @@ -264,6 +264,25 @@ bar ## Utility Commands +### LOCK \ + +LOCK acquires a distributed named mutex with a given name. Once the lock is acquired, it will be held until etcdctlv3 is terminated. + +#### Return value + +- Once the lock is acquired, the result for the GET on the unique lock holder key is displayed. + +- LOCK returns a zero exit code only if it is terminated by a signal and can release the lock. + +#### Example +```bash +./etcdctl lock mylock +mylock/1234534535445 + + +``` + + ### MAKE-MIRROR [options] \ [make-mirror][mirror] mirrors a key prefix in an etcd cluster to a destination etcd cluster. diff --git a/etcdctlv3/command/lock_command.go b/etcdctlv3/command/lock_command.go new file mode 100644 index 000000000..b68841422 --- /dev/null +++ b/etcdctlv3/command/lock_command.go @@ -0,0 +1,88 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "errors" + "os" + "os/signal" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/concurrency" +) + +// NewLockCommand returns the cobra command for "lock". +func NewLockCommand() *cobra.Command { + c := &cobra.Command{ + Use: "lock ", + Short: "lock acquires a named lock", + Run: lockCommandFunc, + } + return c +} + +func lockCommandFunc(cmd *cobra.Command, args []string) { + if len(args) != 1 { + ExitWithError(ExitBadArgs, errors.New("lock takes one lock name arguement.")) + } + c := mustClientFromCmd(cmd) + if err := lockUntilSignal(c, args[0]); err != nil { + ExitWithError(ExitError, err) + } +} + +func lockUntilSignal(c *clientv3.Client, lockname string) error { + m := concurrency.NewMutex(context.TODO(), c, lockname) + ctx, cancel := context.WithCancel(context.TODO()) + + // unlock in case of ordinary shutdown + donec := make(chan struct{}) + sigc := make(chan os.Signal, 1) + signal.Notify(sigc, os.Interrupt, os.Kill) + go func() { + <-sigc + cancel() + close(donec) + }() + + s, serr := concurrency.NewSession(c) + if serr != nil { + return serr + } + + if err := m.Lock(ctx); err != nil { + return err + } + + k, kerr := clientv3.NewKV(c).Get(ctx, m.Key()) + if kerr != nil { + return kerr + } + if len(k.Kvs) == 0 { + return errors.New("lock lost on init") + } + + display.Get(*k) + + select { + case <-donec: + return m.Unlock() + case <-s.Done(): + } + + return errors.New("session expired") +} diff --git a/etcdctlv3/main.go b/etcdctlv3/main.go index 3106a7a02..240034997 100644 --- a/etcdctlv3/main.go +++ b/etcdctlv3/main.go @@ -62,6 +62,7 @@ func init() { command.NewMemberCommand(), command.NewSnapshotCommand(), command.NewMakeMirrorCommand(), + command.NewLockCommand(), ) }