From 3327858a5466496c322c4c182d8cdf4779f73b46 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Mon, 29 Feb 2016 11:27:56 -0800 Subject: [PATCH 1/2] clientv3/concurrency: move election recipe into clientv3 --- clientv3/concurrency/election.go | 184 +++++++++++++++++++++++++++++++ clientv3/concurrency/key.go | 63 ++++++++++- contrib/recipes/election.go | 109 ------------------ integration/v3_election_test.go | 50 +++++---- 4 files changed, 269 insertions(+), 137 deletions(-) create mode 100644 clientv3/concurrency/election.go delete mode 100644 contrib/recipes/election.go diff --git a/clientv3/concurrency/election.go b/clientv3/concurrency/election.go new file mode 100644 index 000000000..c4cebe7cd --- /dev/null +++ b/clientv3/concurrency/election.go @@ -0,0 +1,184 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package concurrency + +import ( + "errors" + + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + v3 "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/storage/storagepb" +) + +var ( + ErrElectionNotLeader = errors.New("election: not leader") + ErrElectionNoLeader = errors.New("election: no leader") +) + +type Election struct { + client *v3.Client + ctx context.Context + + keyPrefix string + + leaderKey string + leaderRev int64 + leaderSession *Session +} + +// NewElection returns a new election on a given key prefix. +func NewElection(ctx context.Context, client *v3.Client, pfx string) *Election { + return &Election{client: client, ctx: ctx, keyPrefix: pfx} +} + +// Campaign puts a value as eligible for the election. It blocks until +// it is elected, an error occurs, or the context is cancelled. +func (e *Election) Campaign(ctx context.Context, val string) error { + s, serr := NewSession(e.client) + if serr != nil { + return serr + } + + k, rev, err := NewUniqueKV(ctx, e.client, e.keyPrefix, val, v3.WithLease(s.Lease())) + if err == nil { + err = waitDeletes(ctx, e.client, e.keyPrefix, v3.WithPrefix(), v3.WithRev(rev-1)) + } + + if err != nil { + // clean up in case of context cancel + select { + case <-ctx.Done(): + e.client.Delete(e.ctx, k) + default: + } + return err + } + + e.leaderKey, e.leaderRev, e.leaderSession = k, rev, s + return nil +} + +// Proclaim lets the leader announce a new value without another election. +func (e *Election) Proclaim(ctx context.Context, val string) error { + if e.leaderSession == nil { + return ErrElectionNotLeader + } + cmp := v3.Compare(v3.CreatedRevision(e.leaderKey), "=", e.leaderRev) + txn := e.client.Txn(ctx).If(cmp) + txn = txn.Then(v3.OpPut(e.leaderKey, val, v3.WithLease(e.leaderSession.Lease()))) + tresp, terr := txn.Commit() + if terr != nil { + return terr + } + if !tresp.Succeeded { + e.leaderKey = "" + return ErrElectionNotLeader + } + return nil +} + +// Resign lets a leader start a new election. +func (e *Election) Resign() (err error) { + if e.leaderSession == nil { + return nil + } + _, err = e.client.Delete(e.ctx, e.leaderKey) + e.leaderKey = "" + e.leaderSession = nil + return err +} + +// Leader returns the leader value for the current election. +func (e *Election) Leader() (string, error) { + resp, err := e.client.Get(e.ctx, e.keyPrefix, v3.WithFirstCreate()...) + if err != nil { + return "", err + } else if len(resp.Kvs) == 0 { + // no leader currently elected + return "", ErrElectionNoLeader + } + return string(resp.Kvs[0].Value), nil +} + +// Observe returns a channel that observes all leader proposal values as +// GetResponse values on the current leader key. The channel closes when +// the context is cancelled or the underlying watcher is otherwise disrupted. +func (e *Election) Observe(ctx context.Context) <-chan v3.GetResponse { + retc := make(chan v3.GetResponse) + go e.observe(ctx, retc) + return retc +} + +func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) { + defer close(ch) + for { + resp, err := e.client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...) + if err != nil { + return + } + + var kv *storagepb.KeyValue + + cctx, cancel := context.WithCancel(ctx) + if len(resp.Kvs) == 0 { + // wait for first key put on prefix + opts := []v3.OpOption{v3.WithRev(resp.Header.Revision), v3.WithPrefix()} + wch := e.client.Watch(cctx, e.keyPrefix, opts...) + + for kv == nil { + wr, ok := <-wch + if !ok || len(wr.Events) == 0 { + cancel() + return + } + // only accept PUTs; a DELETE will make observe() spin + for _, ev := range wr.Events { + if ev.Type == storagepb.PUT { + kv = ev.Kv + break + } + } + } + } else { + kv = resp.Kvs[0] + } + + wch := e.client.Watch(cctx, string(kv.Key), v3.WithRev(kv.ModRevision)) + keyDeleted := false + for !keyDeleted { + wr, ok := <-wch + if !ok { + return + } + for _, ev := range wr.Events { + if ev.Type == storagepb.DELETE { + keyDeleted = true + break + } + resp.Header = &wr.Header + resp.Kvs = []*storagepb.KeyValue{ev.Kv} + select { + case ch <- *resp: + case <-cctx.Done(): + return + } + } + } + cancel() + } +} + +// Key returns the leader key if elected, empty string otherwise. +func (e *Election) Key() string { return e.leaderKey } diff --git a/clientv3/concurrency/key.go b/clientv3/concurrency/key.go index e16de51b4..0a1a93078 100644 --- a/clientv3/concurrency/key.go +++ b/clientv3/concurrency/key.go @@ -15,18 +15,24 @@ package concurrency import ( "fmt" + "math" "time" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" v3 "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/etcdserver/api/v3rpc" + "github.com/coreos/etcd/storage/storagepb" ) // NewUniqueKey creates a new key from a given prefix. func NewUniqueKey(ctx context.Context, kv v3.KV, pfx string, opts ...v3.OpOption) (string, int64, error) { + return NewUniqueKV(ctx, kv, pfx, "", opts...) +} + +func NewUniqueKV(ctx context.Context, kv v3.KV, pfx, val string, opts ...v3.OpOption) (string, int64, error) { for { newKey := fmt.Sprintf("%s/%v", pfx, time.Now().UnixNano()) - put := v3.OpPut(newKey, "", opts...) + put := v3.OpPut(newKey, val, opts...) cmp := v3.Compare(v3.ModifiedRevision(newKey), "=", 0) resp, err := kv.Txn(ctx).If(cmp).Then(put).Commit() if err != nil { @@ -40,11 +46,9 @@ func NewUniqueKey(ctx context.Context, kv v3.KV, pfx string, opts ...v3.OpOption } func waitUpdate(ctx context.Context, client *v3.Client, key string, opts ...v3.OpOption) error { - wc := client.Watch(ctx, key, opts...) - if wc == nil { - return ctx.Err() - } - wresp, ok := <-wc + cctx, cancel := context.WithCancel(ctx) + defer cancel() + wresp, ok := <-client.Watch(cctx, key, opts...) if !ok { return ctx.Err() } @@ -53,3 +57,50 @@ func waitUpdate(ctx context.Context, client *v3.Client, key string, opts ...v3.O } return nil } + +func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error { + cctx, cancel := context.WithCancel(ctx) + defer cancel() + wch := client.Watch(cctx, key, v3.WithRev(rev)) + for wr := range wch { + for _, ev := range wr.Events { + if ev.Type == storagepb.DELETE { + return nil + } + } + } + if err := ctx.Err(); err != nil { + return err + } + return fmt.Errorf("lost watcher waiting for delete") +} + +// waitDeletes efficiently waits until all keys matched by Get(key, opts...) are deleted +func waitDeletes(ctx context.Context, client *v3.Client, key string, opts ...v3.OpOption) error { + getOpts := []v3.OpOption{v3.WithSort(v3.SortByCreatedRev, v3.SortAscend)} + getOpts = append(getOpts, opts...) + resp, err := client.Get(ctx, key, getOpts...) + maxRev := int64(math.MaxInt64) + getOpts = append(getOpts, v3.WithRev(0)) + for err == nil { + for len(resp.Kvs) > 0 { + i := len(resp.Kvs) - 1 + if resp.Kvs[i].CreateRevision <= maxRev { + break + } + resp.Kvs = resp.Kvs[:i] + } + if len(resp.Kvs) == 0 { + break + } + lastKV := resp.Kvs[len(resp.Kvs)-1] + maxRev = lastKV.CreateRevision + err = waitDelete(ctx, client, string(lastKV.Key), maxRev) + if err != nil || len(resp.Kvs) == 1 { + break + } + getOpts = append(getOpts, v3.WithLimit(int64(len(resp.Kvs)-1))) + resp, err = client.Get(ctx, key, getOpts...) + } + return err +} diff --git a/contrib/recipes/election.go b/contrib/recipes/election.go deleted file mode 100644 index 8c41b610b..000000000 --- a/contrib/recipes/election.go +++ /dev/null @@ -1,109 +0,0 @@ -// Copyright 2016 CoreOS, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package recipe - -import ( - "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" - v3 "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/etcdserver" - "github.com/coreos/etcd/storage/storagepb" -) - -type Election struct { - client *v3.Client - ctx context.Context - - keyPrefix string - leaderKey *EphemeralKV -} - -// NewElection returns a new election on a given key prefix. -func NewElection(client *v3.Client, keyPrefix string) *Election { - return &Election{client, context.TODO(), keyPrefix, nil} -} - -// Volunteer puts a value as eligible for the election. It blocks until -// it is elected or an error occurs (cannot withdraw candidacy) -func (e *Election) Volunteer(val string) error { - if e.leaderKey != nil { - return e.leaderKey.Put(val) - } - myKey, err := NewUniqueEphemeralKV(e.client, e.keyPrefix, val) - if err != nil { - return err - } - if err = e.waitLeadership(myKey); err != nil { - return err - } - e.leaderKey = myKey - return nil -} - -// Resign lets a leader start a new election. -func (e *Election) Resign() (err error) { - if e.leaderKey != nil { - err = e.leaderKey.Delete() - e.leaderKey = nil - } - return err -} - -// Leader returns the leader value for the current election. -func (e *Election) Leader() (string, error) { - resp, err := e.client.Get(e.ctx, e.keyPrefix, v3.WithFirstCreate()...) - if err != nil { - return "", err - } else if len(resp.Kvs) == 0 { - // no leader currently elected - return "", etcdserver.ErrNoLeader - } - return string(resp.Kvs[0].Value), nil -} - -// Wait waits for a leader to be elected, returning the leader value. -func (e *Election) Wait() (string, error) { - resp, err := e.client.Get(e.ctx, e.keyPrefix, v3.WithFirstCreate()...) - if err != nil { - return "", err - } else if len(resp.Kvs) != 0 { - // leader already exists - return string(resp.Kvs[0].Value), nil - } - _, err = WaitPrefixEvents( - e.client, - e.keyPrefix, - resp.Header.Revision, - []storagepb.Event_EventType{storagepb.PUT}) - if err != nil { - return "", err - } - return e.Wait() -} - -func (e *Election) waitLeadership(tryKey *EphemeralKV) error { - opts := append(v3.WithLastCreate(), v3.WithRev(tryKey.Revision()-1)) - resp, err := e.client.Get(e.ctx, e.keyPrefix, opts...) - if err != nil { - return err - } else if len(resp.Kvs) == 0 { - // nothing before tryKey => have leadership - return nil - } - _, err = WaitEvents( - e.client, - string(resp.Kvs[0].Key), - tryKey.Revision(), - []storagepb.Event_EventType{storagepb.DELETE}) - return err -} diff --git a/integration/v3_election_test.go b/integration/v3_election_test.go index 138e32a38..4f7ad7a7b 100644 --- a/integration/v3_election_test.go +++ b/integration/v3_election_test.go @@ -18,8 +18,8 @@ import ( "testing" "time" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/clientv3/concurrency" - "github.com/coreos/etcd/contrib/recipes" ) // TestElectionWait tests if followers can correctly wait for elections. @@ -40,12 +40,14 @@ func TestElectionWait(t *testing.T) { nextc = append(nextc, make(chan struct{})) go func(ch chan struct{}) { for j := 0; j < leaders; j++ { - b := recipe.NewElection(clus.RandClient(), "test-election") - s, err := b.Wait() - if err != nil { - t.Fatalf("could not wait for election (%v)", err) + b := concurrency.NewElection(context.TODO(), clus.RandClient(), "test-election") + cctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + s, ok := <-b.Observe(cctx) + if !ok { + t.Fatalf("could not observe election; channel closed") } - electedc <- s + electedc <- string(s.Kvs[0].Value) // wait for next election round <-ch } @@ -56,9 +58,9 @@ func TestElectionWait(t *testing.T) { // elect some leaders for i := 0; i < leaders; i++ { go func() { - e := recipe.NewElection(clus.RandClient(), "test-election") + e := concurrency.NewElection(context.TODO(), clus.RandClient(), "test-election") ev := fmt.Sprintf("electval-%v", time.Now().UnixNano()) - if err := e.Volunteer(ev); err != nil { + if err := e.Campaign(context.TODO(), ev); err != nil { t.Fatalf("failed volunteer (%v)", err) } // wait for followers to accept leadership @@ -91,17 +93,21 @@ func TestElectionFailover(t *testing.T) { defer clus.Terminate(t) defer dropSessionLease(clus) + cctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + // first leader (elected) - e := recipe.NewElection(clus.clients[0], "test-election") - if err := e.Volunteer("foo"); err != nil { + e := concurrency.NewElection(context.TODO(), clus.clients[0], "test-election") + if err := e.Campaign(context.TODO(), "foo"); err != nil { t.Fatalf("failed volunteer (%v)", err) } // check first leader - s, err := e.Wait() - if err != nil { - t.Fatalf("could not wait for first election (%v)", err) + resp, ok := <-e.Observe(cctx) + if !ok { + t.Fatalf("could not wait for first election; channel closed") } + s := string(resp.Kvs[0].Value) if s != "foo" { t.Fatalf("wrong election result. got %s, wanted foo", s) } @@ -109,8 +115,8 @@ func TestElectionFailover(t *testing.T) { // next leader electedc := make(chan struct{}) go func() { - ee := recipe.NewElection(clus.clients[1], "test-election") - if eer := ee.Volunteer("bar"); eer != nil { + ee := concurrency.NewElection(context.TODO(), clus.clients[1], "test-election") + if eer := ee.Campaign(context.TODO(), "bar"); eer != nil { t.Fatal(eer) } electedc <- struct{}{} @@ -121,21 +127,21 @@ func TestElectionFailover(t *testing.T) { if serr != nil { t.Fatal(serr) } - err = session.Close() - if err != nil { + if err := session.Close(); err != nil { t.Fatal(err) } // check new leader - e = recipe.NewElection(clus.clients[2], "test-election") - s, err = e.Wait() - if err != nil { - t.Fatalf("could not wait for second election (%v)", err) + e = concurrency.NewElection(context.TODO(), clus.clients[2], "test-election") + resp, ok = <-e.Observe(cctx) + if !ok { + t.Fatalf("could not wait for second election; channel closed") } + s = string(resp.Kvs[0].Value) if s != "bar" { t.Fatalf("wrong election result. got %s, wanted bar", s) } - // leader must ack election (otherwise, Volunteer may see closed conn) + // leader must ack election (otherwise, Campaign may see closed conn) <-electedc } From 20d89bcf324b3535f559596b49457290b67a9f3c Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 1 Mar 2016 13:36:48 -0800 Subject: [PATCH 2/2] etcdctlv3: elect command --- etcdctlv3/README.md | 37 ++++++++ etcdctlv3/command/elect_command.go | 132 +++++++++++++++++++++++++++++ etcdctlv3/main.go | 1 + 3 files changed, 170 insertions(+) create mode 100644 etcdctlv3/command/elect_command.go diff --git a/etcdctlv3/README.md b/etcdctlv3/README.md index a88625a86..2ebc97f97 100644 --- a/etcdctlv3/README.md +++ b/etcdctlv3/README.md @@ -282,6 +282,43 @@ mylock/1234534535445 ``` +### Notes + +The lease length of a lock defaults to 60 seconds. If LOCK is abnormally terminated, lock progress may be delayed +by up to 60 seconds. + + +### ELECT [options] \ [proposal] + +ELECT participates on a named election. A node announces its candidacy in the election by providing +a proposal value. If a node wishes to observe the election, ELECT listens for new leaders values. +Whenever a leader is elected, its proposal is given as output. + +#### Options + +- listen -- observe the election + +#### Return value + +- If a candidate, ELECT displays the GET on the leader key once the node is elected election. + +- If observing, ELECT streams the result for a GET on the leader key for the current election and all future elections. + +- ELECT returns a zero exit code only if it is terminated by a signal and can revoke its candidacy or leadership, if any. + +#### Example +```bash +./etcdctl elect myelection foo +myelection/1456952310051373265 +foo + +``` + +### Notes + +The lease length of a leader defaults to 60 seconds. If a candidate is abnormally terminated, election +progress may be delayed by up to 60 seconds. + ### MAKE-MIRROR [options] \ diff --git a/etcdctlv3/command/elect_command.go b/etcdctlv3/command/elect_command.go new file mode 100644 index 000000000..f7d0242ea --- /dev/null +++ b/etcdctlv3/command/elect_command.go @@ -0,0 +1,132 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "errors" + "os" + "os/signal" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/concurrency" +) + +var ( + electListen bool +) + +// NewElectCommand returns the cobra command for "elect". +func NewElectCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "elect [proposal]", + Short: "elect observes and participates in leader election", + Run: electCommandFunc, + } + cmd.Flags().BoolVarP(&electListen, "listen", "l", false, "observation mode") + return cmd +} + +func electCommandFunc(cmd *cobra.Command, args []string) { + if len(args) != 1 && len(args) != 2 { + ExitWithError(ExitBadArgs, errors.New("elect takes one election name argument and an optional proposal argument.")) + } + c := mustClientFromCmd(cmd) + + var err error + if len(args) == 1 { + if !electListen { + ExitWithError(ExitBadArgs, errors.New("no proposal argument but -l not set")) + } + err = observe(c, args[0]) + } else { + if electListen { + ExitWithError(ExitBadArgs, errors.New("proposal given but -l is set")) + } + err = campaign(c, args[0], args[1]) + } + if err != nil { + ExitWithError(ExitError, err) + } +} + +func observe(c *clientv3.Client, election string) error { + e := concurrency.NewElection(context.TODO(), c, election) + ctx, cancel := context.WithCancel(context.TODO()) + + donec := make(chan struct{}) + sigc := make(chan os.Signal, 1) + signal.Notify(sigc, os.Interrupt, os.Kill) + go func() { + <-sigc + cancel() + }() + + go func() { + for resp := range e.Observe(ctx) { + display.Get(resp) + } + close(donec) + }() + + <-donec + + select { + case <-ctx.Done(): + default: + return errors.New("elect: observer lost") + } + + return nil +} + +func campaign(c *clientv3.Client, election string, prop string) error { + e := concurrency.NewElection(context.TODO(), c, election) + ctx, cancel := context.WithCancel(context.TODO()) + + donec := make(chan struct{}) + sigc := make(chan os.Signal, 1) + signal.Notify(sigc, os.Interrupt, os.Kill) + go func() { + <-sigc + cancel() + close(donec) + }() + + s, serr := concurrency.NewSession(c) + if serr != nil { + return serr + } + + if err := e.Campaign(ctx, prop); err != nil { + return err + } + + // print key since elected + resp, err := c.Get(ctx, e.Key()) + if err != nil { + return err + } + display.Get(*resp) + + select { + case <-donec: + case <-s.Done(): + return errors.New("elect: session expired") + } + + return e.Resign() +} diff --git a/etcdctlv3/main.go b/etcdctlv3/main.go index 771a9c5b9..c76b795bc 100644 --- a/etcdctlv3/main.go +++ b/etcdctlv3/main.go @@ -64,6 +64,7 @@ func init() { command.NewMakeMirrorCommand(), command.NewLockCommand(), command.NewAuthCommand(), + command.NewElectCommand(), ) }