diff --git a/clientv3/concurrency/election.go b/clientv3/concurrency/election.go index d91d9a269..89b52089a 100644 --- a/clientv3/concurrency/election.go +++ b/clientv3/concurrency/election.go @@ -16,6 +16,7 @@ package concurrency import ( "errors" + "fmt" v3 "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/mvcc/mvccpb" @@ -50,22 +51,39 @@ func (e *Election) Campaign(ctx context.Context, val string) error { return serr } - k, rev, err := NewUniqueKV(ctx, e.client, e.keyPrefix, val, v3.WithLease(s.Lease())) - if err == nil { - err = waitDeletes(ctx, e.client, e.keyPrefix, v3.WithPrefix(), v3.WithRev(rev-1)) + k := fmt.Sprintf("%s/%x", e.keyPrefix, s.Lease()) + txn := e.client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0)) + txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease()))) + txn = txn.Else(v3.OpGet(k)) + resp, err := txn.Commit() + if err != nil { + return err } + e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s + if !resp.Succeeded { + kv := resp.Responses[0].GetResponseRange().Kvs[0] + e.leaderRev = kv.CreateRevision + if string(kv.Value) != val { + if err = e.Proclaim(ctx, val); err != nil { + e.Resign(ctx) + return err + } + } + } + + err = waitDeletes(ctx, e.client, e.keyPrefix, v3.WithPrefix(), v3.WithRev(e.leaderRev-1)) if err != nil { // clean up in case of context cancel select { case <-ctx.Done(): - e.client.Delete(e.client.Ctx(), k) + e.Resign(e.client.Ctx()) default: + e.leaderSession = nil } return err } - e.leaderKey, e.leaderRev, e.leaderSession = k, rev, s return nil } @@ -89,19 +107,19 @@ func (e *Election) Proclaim(ctx context.Context, val string) error { } // Resign lets a leader start a new election. -func (e *Election) Resign() (err error) { +func (e *Election) Resign(ctx context.Context) (err error) { if e.leaderSession == nil { return nil } - _, err = e.client.Delete(e.client.Ctx(), e.leaderKey) + _, err = e.client.Delete(ctx, e.leaderKey) e.leaderKey = "" e.leaderSession = nil return err } // Leader returns the leader value for the current election. -func (e *Election) Leader() (string, error) { - resp, err := e.client.Get(e.client.Ctx(), e.keyPrefix, v3.WithFirstCreate()...) +func (e *Election) Leader(ctx context.Context) (string, error) { + resp, err := e.client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...) if err != nil { return "", err } else if len(resp.Kvs) == 0 { diff --git a/clientv3/concurrency/key.go b/clientv3/concurrency/key.go index 65e2b899a..74d495dd0 100644 --- a/clientv3/concurrency/key.go +++ b/clientv3/concurrency/key.go @@ -17,34 +17,12 @@ package concurrency import ( "fmt" "math" - "time" v3 "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/mvcc/mvccpb" "golang.org/x/net/context" ) -// NewUniqueKey creates a new key from a given prefix. -func NewUniqueKey(ctx context.Context, kv v3.KV, pfx string, opts ...v3.OpOption) (string, int64, error) { - return NewUniqueKV(ctx, kv, pfx, "", opts...) -} - -func NewUniqueKV(ctx context.Context, kv v3.KV, pfx, val string, opts ...v3.OpOption) (string, int64, error) { - for { - newKey := fmt.Sprintf("%s/%v", pfx, time.Now().UnixNano()) - put := v3.OpPut(newKey, val, opts...) - cmp := v3.Compare(v3.ModRevision(newKey), "=", 0) - resp, err := kv.Txn(ctx).If(cmp).Then(put).Commit() - if err != nil { - return "", 0, err - } - if !resp.Succeeded { - continue - } - return newKey, resp.Header.Revision, nil - } -} - func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error { cctx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/clientv3/concurrency/mutex.go b/clientv3/concurrency/mutex.go index 5e120fab1..803a8470a 100644 --- a/clientv3/concurrency/mutex.go +++ b/clientv3/concurrency/mutex.go @@ -63,14 +63,14 @@ func (m *Mutex) Lock(ctx context.Context) error { // release lock key if cancelled select { case <-ctx.Done(): - m.Unlock() + m.Unlock(m.client.Ctx()) default: } return err } -func (m *Mutex) Unlock() error { - if _, err := m.client.Delete(m.client.Ctx(), m.myKey); err != nil { +func (m *Mutex) Unlock(ctx context.Context) error { + if _, err := m.client.Delete(ctx, m.myKey); err != nil { return err } m.myKey = "\x00" @@ -92,7 +92,7 @@ func (lm *lockerMutex) Lock() { } } func (lm *lockerMutex) Unlock() { - if err := lm.Mutex.Unlock(); err != nil { + if err := lm.Mutex.Unlock(lm.client.Ctx()); err != nil { panic(err) } } diff --git a/etcdctl/ctlv3/command/elect_command.go b/etcdctl/ctlv3/command/elect_command.go index 64a5ff9e2..4af6b6b31 100644 --- a/etcdctl/ctlv3/command/elect_command.go +++ b/etcdctl/ctlv3/command/elect_command.go @@ -128,5 +128,5 @@ func campaign(c *clientv3.Client, election string, prop string) error { return errors.New("elect: session expired") } - return e.Resign() + return e.Resign(context.TODO()) } diff --git a/etcdctl/ctlv3/command/lock_command.go b/etcdctl/ctlv3/command/lock_command.go index 6cac497e7..7ead1c0ac 100644 --- a/etcdctl/ctlv3/command/lock_command.go +++ b/etcdctl/ctlv3/command/lock_command.go @@ -80,7 +80,7 @@ func lockUntilSignal(c *clientv3.Client, lockname string) error { select { case <-donec: - return m.Unlock() + return m.Unlock(context.TODO()) case <-s.Done(): } diff --git a/integration/v3_election_test.go b/integration/v3_election_test.go index 1c47e15ab..dcadd786f 100644 --- a/integration/v3_election_test.go +++ b/integration/v3_election_test.go @@ -72,7 +72,7 @@ func TestElectionWait(t *testing.T) { } } // let next leader take over - if err := e.Resign(); err != nil { + if err := e.Resign(context.TODO()); err != nil { t.Fatalf("failed resign (%v)", err) } // tell followers to start listening for next leader @@ -146,3 +146,26 @@ func TestElectionFailover(t *testing.T) { // leader must ack election (otherwise, Campaign may see closed conn) <-electedc } + +// TestElectionSessionRelock ensures that campaigning twice on the same election +// with the same lock will Proclaim instead of deadlocking. +func TestElectionSessionRecampaign(t *testing.T) { + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + defer clus.Terminate(t) + cli := clus.RandClient() + + e := concurrency.NewElection(cli, "test-elect") + if err := e.Campaign(context.TODO(), "abc"); err != nil { + t.Fatal(err) + } + e2 := concurrency.NewElection(cli, "test-elect") + if err := e2.Campaign(context.TODO(), "def"); err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + if resp := <-e.Observe(ctx); len(resp.Kvs) == 0 || string(resp.Kvs[0].Value) != "def" { + t.Fatalf("expected value=%q, got response %v", "def", resp) + } +} diff --git a/integration/v3_lock_test.go b/integration/v3_lock_test.go index 243c29c49..29d694200 100644 --- a/integration/v3_lock_test.go +++ b/integration/v3_lock_test.go @@ -69,7 +69,7 @@ func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client) t.Fatalf("lock %d followers did not wait", i) default: } - if err := m.Unlock(); err != nil { + if err := m.Unlock(context.TODO()); err != nil { t.Fatalf("could not release lock (%v)", err) } } diff --git a/tools/functional-tester/etcd-runner/main.go b/tools/functional-tester/etcd-runner/main.go index 0320f2c36..7fab6d9c1 100644 --- a/tools/functional-tester/etcd-runner/main.go +++ b/tools/functional-tester/etcd-runner/main.go @@ -95,7 +95,7 @@ func runElection(eps []string, rounds int) { } } rcs[i].validate = func() error { - if l, err := e.Leader(); err == nil && l != observedLeader { + if l, err := e.Leader(context.TODO()); err == nil && l != observedLeader { return fmt.Errorf("expected leader %q, got %q", observedLeader, l) } validatec <- struct{}{} @@ -110,7 +110,7 @@ func runElection(eps []string, rounds int) { return fmt.Errorf("waiting on followers") } } - if err := e.Resign(); err != nil { + if err := e.Resign(context.TODO()); err != nil { return err } if observedLeader == v { @@ -182,7 +182,7 @@ func runRacer(eps []string, round int) { return nil } rcs[i].release = func() error { - if err := m.Unlock(); err != nil { + if err := m.Unlock(ctx); err != nil { return err } cnt = 0 diff --git a/tools/local-tester/bridge/bridge.go b/tools/local-tester/bridge/bridge.go index 25243ff30..b636d3237 100644 --- a/tools/local-tester/bridge/bridge.go +++ b/tools/local-tester/bridge/bridge.go @@ -22,13 +22,12 @@ import ( "log" "math/rand" "net" - "os" "sync" "time" ) func bridge(conn net.Conn, remoteAddr string) { - outconn, err := net.Dial("tcp", os.Args[2]) + outconn, err := net.Dial("tcp", flag.Args()[1]) if err != nil { log.Println("oops:", err) return @@ -45,7 +44,7 @@ func blackhole(conn net.Conn) { } func readRemoteOnly(conn net.Conn, remoteAddr string) { - outconn, err := net.Dial("tcp", os.Args[2]) + outconn, err := net.Dial("tcp", flag.Args()[1]) if err != nil { log.Println("oops:", err) return @@ -55,7 +54,7 @@ func readRemoteOnly(conn net.Conn, remoteAddr string) { } func writeRemoteOnly(conn net.Conn, remoteAddr string) { - outconn, err := net.Dial("tcp", os.Args[2]) + outconn, err := net.Dial("tcp", flag.Args()[1]) if err != nil { log.Println("oops:", err) return @@ -79,7 +78,7 @@ func randCopy(conn net.Conn, outconn net.Conn) { } func randomBlackhole(conn net.Conn, remoteAddr string) { - outconn, err := net.Dial("tcp", os.Args[2]) + outconn, err := net.Dial("tcp", flag.Args()[1]) if err != nil { log.Println("oops:", err) return