Merge pull request #5349 from heyitsanthony/clientv3-conc-fixups

clientv3/concurrency: ctx-izations and session leader ids
This commit is contained in:
Anthony Romano 2016-05-13 10:33:55 -07:00
commit 8c953499fa
9 changed files with 65 additions and 47 deletions

View File

@ -16,6 +16,7 @@ package concurrency
import (
"errors"
"fmt"
v3 "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
@ -50,22 +51,39 @@ func (e *Election) Campaign(ctx context.Context, val string) error {
return serr
}
k, rev, err := NewUniqueKV(ctx, e.client, e.keyPrefix, val, v3.WithLease(s.Lease()))
if err == nil {
err = waitDeletes(ctx, e.client, e.keyPrefix, v3.WithPrefix(), v3.WithRev(rev-1))
k := fmt.Sprintf("%s/%x", e.keyPrefix, s.Lease())
txn := e.client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
txn = txn.Else(v3.OpGet(k))
resp, err := txn.Commit()
if err != nil {
return err
}
e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
if !resp.Succeeded {
kv := resp.Responses[0].GetResponseRange().Kvs[0]
e.leaderRev = kv.CreateRevision
if string(kv.Value) != val {
if err = e.Proclaim(ctx, val); err != nil {
e.Resign(ctx)
return err
}
}
}
err = waitDeletes(ctx, e.client, e.keyPrefix, v3.WithPrefix(), v3.WithRev(e.leaderRev-1))
if err != nil {
// clean up in case of context cancel
select {
case <-ctx.Done():
e.client.Delete(e.client.Ctx(), k)
e.Resign(e.client.Ctx())
default:
e.leaderSession = nil
}
return err
}
e.leaderKey, e.leaderRev, e.leaderSession = k, rev, s
return nil
}
@ -89,19 +107,19 @@ func (e *Election) Proclaim(ctx context.Context, val string) error {
}
// Resign lets a leader start a new election.
func (e *Election) Resign() (err error) {
func (e *Election) Resign(ctx context.Context) (err error) {
if e.leaderSession == nil {
return nil
}
_, err = e.client.Delete(e.client.Ctx(), e.leaderKey)
_, err = e.client.Delete(ctx, e.leaderKey)
e.leaderKey = ""
e.leaderSession = nil
return err
}
// Leader returns the leader value for the current election.
func (e *Election) Leader() (string, error) {
resp, err := e.client.Get(e.client.Ctx(), e.keyPrefix, v3.WithFirstCreate()...)
func (e *Election) Leader(ctx context.Context) (string, error) {
resp, err := e.client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
if err != nil {
return "", err
} else if len(resp.Kvs) == 0 {

View File

@ -17,34 +17,12 @@ package concurrency
import (
"fmt"
"math"
"time"
v3 "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"golang.org/x/net/context"
)
// NewUniqueKey creates a new key from a given prefix.
func NewUniqueKey(ctx context.Context, kv v3.KV, pfx string, opts ...v3.OpOption) (string, int64, error) {
return NewUniqueKV(ctx, kv, pfx, "", opts...)
}
func NewUniqueKV(ctx context.Context, kv v3.KV, pfx, val string, opts ...v3.OpOption) (string, int64, error) {
for {
newKey := fmt.Sprintf("%s/%v", pfx, time.Now().UnixNano())
put := v3.OpPut(newKey, val, opts...)
cmp := v3.Compare(v3.ModRevision(newKey), "=", 0)
resp, err := kv.Txn(ctx).If(cmp).Then(put).Commit()
if err != nil {
return "", 0, err
}
if !resp.Succeeded {
continue
}
return newKey, resp.Header.Revision, nil
}
}
func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
cctx, cancel := context.WithCancel(ctx)
defer cancel()

View File

@ -63,14 +63,14 @@ func (m *Mutex) Lock(ctx context.Context) error {
// release lock key if cancelled
select {
case <-ctx.Done():
m.Unlock()
m.Unlock(m.client.Ctx())
default:
}
return err
}
func (m *Mutex) Unlock() error {
if _, err := m.client.Delete(m.client.Ctx(), m.myKey); err != nil {
func (m *Mutex) Unlock(ctx context.Context) error {
if _, err := m.client.Delete(ctx, m.myKey); err != nil {
return err
}
m.myKey = "\x00"
@ -92,7 +92,7 @@ func (lm *lockerMutex) Lock() {
}
}
func (lm *lockerMutex) Unlock() {
if err := lm.Mutex.Unlock(); err != nil {
if err := lm.Mutex.Unlock(lm.client.Ctx()); err != nil {
panic(err)
}
}

View File

@ -128,5 +128,5 @@ func campaign(c *clientv3.Client, election string, prop string) error {
return errors.New("elect: session expired")
}
return e.Resign()
return e.Resign(context.TODO())
}

View File

@ -80,7 +80,7 @@ func lockUntilSignal(c *clientv3.Client, lockname string) error {
select {
case <-donec:
return m.Unlock()
return m.Unlock(context.TODO())
case <-s.Done():
}

View File

@ -72,7 +72,7 @@ func TestElectionWait(t *testing.T) {
}
}
// let next leader take over
if err := e.Resign(); err != nil {
if err := e.Resign(context.TODO()); err != nil {
t.Fatalf("failed resign (%v)", err)
}
// tell followers to start listening for next leader
@ -146,3 +146,26 @@ func TestElectionFailover(t *testing.T) {
// leader must ack election (otherwise, Campaign may see closed conn)
<-electedc
}
// TestElectionSessionRelock ensures that campaigning twice on the same election
// with the same lock will Proclaim instead of deadlocking.
func TestElectionSessionRecampaign(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)
cli := clus.RandClient()
e := concurrency.NewElection(cli, "test-elect")
if err := e.Campaign(context.TODO(), "abc"); err != nil {
t.Fatal(err)
}
e2 := concurrency.NewElection(cli, "test-elect")
if err := e2.Campaign(context.TODO(), "def"); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
if resp := <-e.Observe(ctx); len(resp.Kvs) == 0 || string(resp.Kvs[0].Value) != "def" {
t.Fatalf("expected value=%q, got response %v", "def", resp)
}
}

View File

@ -69,7 +69,7 @@ func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client)
t.Fatalf("lock %d followers did not wait", i)
default:
}
if err := m.Unlock(); err != nil {
if err := m.Unlock(context.TODO()); err != nil {
t.Fatalf("could not release lock (%v)", err)
}
}

View File

@ -95,7 +95,7 @@ func runElection(eps []string, rounds int) {
}
}
rcs[i].validate = func() error {
if l, err := e.Leader(); err == nil && l != observedLeader {
if l, err := e.Leader(context.TODO()); err == nil && l != observedLeader {
return fmt.Errorf("expected leader %q, got %q", observedLeader, l)
}
validatec <- struct{}{}
@ -110,7 +110,7 @@ func runElection(eps []string, rounds int) {
return fmt.Errorf("waiting on followers")
}
}
if err := e.Resign(); err != nil {
if err := e.Resign(context.TODO()); err != nil {
return err
}
if observedLeader == v {
@ -182,7 +182,7 @@ func runRacer(eps []string, round int) {
return nil
}
rcs[i].release = func() error {
if err := m.Unlock(); err != nil {
if err := m.Unlock(ctx); err != nil {
return err
}
cnt = 0

View File

@ -22,13 +22,12 @@ import (
"log"
"math/rand"
"net"
"os"
"sync"
"time"
)
func bridge(conn net.Conn, remoteAddr string) {
outconn, err := net.Dial("tcp", os.Args[2])
outconn, err := net.Dial("tcp", flag.Args()[1])
if err != nil {
log.Println("oops:", err)
return
@ -45,7 +44,7 @@ func blackhole(conn net.Conn) {
}
func readRemoteOnly(conn net.Conn, remoteAddr string) {
outconn, err := net.Dial("tcp", os.Args[2])
outconn, err := net.Dial("tcp", flag.Args()[1])
if err != nil {
log.Println("oops:", err)
return
@ -55,7 +54,7 @@ func readRemoteOnly(conn net.Conn, remoteAddr string) {
}
func writeRemoteOnly(conn net.Conn, remoteAddr string) {
outconn, err := net.Dial("tcp", os.Args[2])
outconn, err := net.Dial("tcp", flag.Args()[1])
if err != nil {
log.Println("oops:", err)
return
@ -79,7 +78,7 @@ func randCopy(conn net.Conn, outconn net.Conn) {
}
func randomBlackhole(conn net.Conn, remoteAddr string) {
outconn, err := net.Dial("tcp", os.Args[2])
outconn, err := net.Dial("tcp", flag.Args()[1])
if err != nil {
log.Println("oops:", err)
return