mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
clientv3/concurrency: allow election on prefixes of keys.
After winning an election or obtaining a lock, we auto-append a slash after the provided key prefix. This avoids the previous deadlock due to waiting on the wrong key. Fixes #6278 Conflicts: clientv3/concurrency/election.go clientv3/concurrency/mutex.go
This commit is contained in:
parent
5089bf58fb
commit
4b48876f0e
@ -40,7 +40,7 @@ type Election struct {
|
|||||||
|
|
||||||
// NewElection returns a new election on a given key prefix.
|
// NewElection returns a new election on a given key prefix.
|
||||||
func NewElection(client *v3.Client, pfx string) *Election {
|
func NewElection(client *v3.Client, pfx string) *Election {
|
||||||
return &Election{client: client, keyPrefix: pfx}
|
return &Election{client: client, keyPrefix: pfx + "/"}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Campaign puts a value as eligible for the election. It blocks until
|
// Campaign puts a value as eligible for the election. It blocks until
|
||||||
@ -59,7 +59,6 @@ func (e *Election) Campaign(ctx context.Context, val string) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
|
e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
|
||||||
if !resp.Succeeded {
|
if !resp.Succeeded {
|
||||||
kv := resp.Responses[0].GetResponseRange().Kvs[0]
|
kv := resp.Responses[0].GetResponseRange().Kvs[0]
|
||||||
|
@ -32,7 +32,7 @@ type Mutex struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewMutex(client *v3.Client, pfx string) *Mutex {
|
func NewMutex(client *v3.Client, pfx string) *Mutex {
|
||||||
return &Mutex{client, pfx, "", -1}
|
return &Mutex{client, pfx + "/", "", -1}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lock locks the mutex with a cancellable context. If the context is cancelled
|
// Lock locks the mutex with a cancellable context. If the context is cancelled
|
||||||
@ -43,7 +43,7 @@ func (m *Mutex) Lock(ctx context.Context) error {
|
|||||||
return serr
|
return serr
|
||||||
}
|
}
|
||||||
|
|
||||||
m.myKey = fmt.Sprintf("%s/%x", m.pfx, s.Lease())
|
m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
|
||||||
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
|
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
|
||||||
// put self in lock waiters via myKey; oldest waiter holds lock
|
// put self in lock waiters via myKey; oldest waiter holds lock
|
||||||
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
|
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
|
||||||
|
@ -174,3 +174,31 @@ func TestElectionSessionRecampaign(t *testing.T) {
|
|||||||
t.Fatalf("expected value=%q, got response %v", "def", resp)
|
t.Fatalf("expected value=%q, got response %v", "def", resp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestElectionOnPrefixOfExistingKey checks that a single
|
||||||
|
// candidate can be elected on a new key that is a prefix
|
||||||
|
// of an existing key. To wit, check for regression
|
||||||
|
// of bug #6278. https://github.com/coreos/etcd/issues/6278
|
||||||
|
//
|
||||||
|
func TestElectionOnPrefixOfExistingKey(t *testing.T) {
|
||||||
|
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
|
||||||
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
|
cli := clus.RandClient()
|
||||||
|
if _, err := cli.Put(context.TODO(), "testa", "value"); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
s, serr := concurrency.NewSession(cli)
|
||||||
|
if serr != nil {
|
||||||
|
t.Fatal(serr)
|
||||||
|
}
|
||||||
|
e := concurrency.NewElection(s, "test")
|
||||||
|
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
|
||||||
|
err := e.Campaign(ctx, "abc")
|
||||||
|
cancel()
|
||||||
|
if err != nil {
|
||||||
|
// after 5 seconds, deadlock results in
|
||||||
|
// 'context deadline exceeded' here.
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user