Merge pull request #6172 from xiang90/session

session: remove session manager and add ttl
This commit is contained in:
Xiang Li 2016-08-15 15:20:19 -07:00 committed by GitHub
commit d5d2370fc8
13 changed files with 235 additions and 138 deletions

View File

@ -29,7 +29,7 @@ var (
)
type Election struct {
client *v3.Client
session *Session
keyPrefix string
@ -39,20 +39,18 @@ type Election struct {
}
// NewElection returns a new election on a given key prefix.
func NewElection(client *v3.Client, pfx string) *Election {
return &Election{client: client, keyPrefix: pfx}
func NewElection(s *Session, pfx string) *Election {
return &Election{session: s, keyPrefix: pfx}
}
// Campaign puts a value as eligible for the election. It blocks until
// it is elected, an error occurs, or the context is cancelled.
func (e *Election) Campaign(ctx context.Context, val string) error {
s, serr := NewSession(e.client)
if serr != nil {
return serr
}
s := e.session
client := e.session.Client()
k := fmt.Sprintf("%s/%x", e.keyPrefix, s.Lease())
txn := e.client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
txn = txn.Else(v3.OpGet(k))
resp, err := txn.Commit()
@ -72,12 +70,12 @@ func (e *Election) Campaign(ctx context.Context, val string) error {
}
}
err = waitDeletes(ctx, e.client, e.keyPrefix, v3.WithPrefix(), v3.WithRev(e.leaderRev-1))
err = waitDeletes(ctx, client, e.keyPrefix, v3.WithPrefix(), v3.WithRev(e.leaderRev-1))
if err != nil {
// clean up in case of context cancel
select {
case <-ctx.Done():
e.Resign(e.client.Ctx())
e.Resign(client.Ctx())
default:
e.leaderSession = nil
}
@ -92,8 +90,9 @@ func (e *Election) Proclaim(ctx context.Context, val string) error {
if e.leaderSession == nil {
return ErrElectionNotLeader
}
client := e.session.Client()
cmp := v3.Compare(v3.CreateRevision(e.leaderKey), "=", e.leaderRev)
txn := e.client.Txn(ctx).If(cmp)
txn := client.Txn(ctx).If(cmp)
txn = txn.Then(v3.OpPut(e.leaderKey, val, v3.WithLease(e.leaderSession.Lease())))
tresp, terr := txn.Commit()
if terr != nil {
@ -111,7 +110,8 @@ func (e *Election) Resign(ctx context.Context) (err error) {
if e.leaderSession == nil {
return nil
}
_, err = e.client.Delete(ctx, e.leaderKey)
client := e.session.Client()
_, err = client.Delete(ctx, e.leaderKey)
e.leaderKey = ""
e.leaderSession = nil
return err
@ -119,7 +119,8 @@ func (e *Election) Resign(ctx context.Context) (err error) {
// Leader returns the leader value for the current election.
func (e *Election) Leader(ctx context.Context) (string, error) {
resp, err := e.client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
client := e.session.Client()
resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
if err != nil {
return "", err
} else if len(resp.Kvs) == 0 {
@ -139,9 +140,11 @@ func (e *Election) Observe(ctx context.Context) <-chan v3.GetResponse {
}
func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
client := e.session.Client()
defer close(ch)
for {
resp, err := e.client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
if err != nil {
return
}
@ -152,7 +155,7 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
if len(resp.Kvs) == 0 {
// wait for first key put on prefix
opts := []v3.OpOption{v3.WithRev(resp.Header.Revision), v3.WithPrefix()}
wch := e.client.Watch(cctx, e.keyPrefix, opts...)
wch := client.Watch(cctx, e.keyPrefix, opts...)
for kv == nil {
wr, ok := <-wch
@ -172,7 +175,7 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
kv = resp.Kvs[0]
}
wch := e.client.Watch(cctx, string(kv.Key), v3.WithRev(kv.ModRevision))
wch := client.Watch(cctx, string(kv.Key), v3.WithRev(kv.ModRevision))
keyDeleted := false
for !keyDeleted {
wr, ok := <-wch

View File

@ -24,24 +24,22 @@ import (
// Mutex implements the sync Locker interface with etcd
type Mutex struct {
client *v3.Client
s *Session
pfx string
myKey string
myRev int64
}
func NewMutex(client *v3.Client, pfx string) *Mutex {
return &Mutex{client, pfx, "", -1}
func NewMutex(s *Session, pfx string) *Mutex {
return &Mutex{s, pfx, "", -1}
}
// Lock locks the mutex with a cancellable context. If the context is cancelled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
s, serr := NewSession(m.client)
if serr != nil {
return serr
}
s := m.s
client := m.s.Client()
m.myKey = fmt.Sprintf("%s/%x", m.pfx, s.Lease())
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
@ -49,7 +47,7 @@ func (m *Mutex) Lock(ctx context.Context) error {
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
// reuse key in case this session already holds the lock
get := v3.OpGet(m.myKey)
resp, err := m.client.Txn(ctx).If(cmp).Then(put).Else(get).Commit()
resp, err := client.Txn(ctx).If(cmp).Then(put).Else(get).Commit()
if err != nil {
return err
}
@ -59,18 +57,19 @@ func (m *Mutex) Lock(ctx context.Context) error {
}
// wait for deletion revisions prior to myKey
err = waitDeletes(ctx, m.client, m.pfx, v3.WithPrefix(), v3.WithRev(m.myRev-1))
err = waitDeletes(ctx, client, m.pfx, v3.WithPrefix(), v3.WithRev(m.myRev-1))
// release lock key if cancelled
select {
case <-ctx.Done():
m.Unlock(m.client.Ctx())
m.Unlock(client.Ctx())
default:
}
return err
}
func (m *Mutex) Unlock(ctx context.Context) error {
if _, err := m.client.Delete(ctx, m.myKey); err != nil {
client := m.s.Client()
if _, err := client.Delete(ctx, m.myKey); err != nil {
return err
}
m.myKey = "\x00"
@ -87,17 +86,19 @@ func (m *Mutex) Key() string { return m.myKey }
type lockerMutex struct{ *Mutex }
func (lm *lockerMutex) Lock() {
if err := lm.Mutex.Lock(lm.client.Ctx()); err != nil {
client := lm.s.Client()
if err := lm.Mutex.Lock(client.Ctx()); err != nil {
panic(err)
}
}
func (lm *lockerMutex) Unlock() {
if err := lm.Mutex.Unlock(lm.client.Ctx()); err != nil {
client := lm.s.Client()
if err := lm.Mutex.Unlock(client.Ctx()); err != nil {
panic(err)
}
}
// NewLocker creates a sync.Locker backed by an etcd mutex.
func NewLocker(client *v3.Client, pfx string) sync.Locker {
return &lockerMutex{NewMutex(client, pfx)}
func NewLocker(s *Session, pfx string) sync.Locker {
return &lockerMutex{NewMutex(s, pfx)}
}

View File

@ -15,21 +15,11 @@
package concurrency
import (
"sync"
v3 "github.com/coreos/etcd/clientv3"
"golang.org/x/net/context"
)
// only keep one ephemeral lease per client
var clientSessions clientSessionMgr = clientSessionMgr{sessions: make(map[*v3.Client]*Session)}
const sessionTTL = 60
type clientSessionMgr struct {
sessions map[*v3.Client]*Session
mu sync.Mutex
}
const defaultSessionTTL = 60
// Session represents a lease kept alive for the lifetime of a client.
// Fault-tolerant applications may use sessions to reason about liveness.
@ -42,14 +32,13 @@ type Session struct {
}
// NewSession gets the leased session for a client.
func NewSession(client *v3.Client) (*Session, error) {
clientSessions.mu.Lock()
defer clientSessions.mu.Unlock()
if s, ok := clientSessions.sessions[client]; ok {
return s, nil
func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
ops := &sessionOptions{ttl: defaultSessionTTL}
for _, opt := range opts {
opt(ops)
}
resp, err := client.Grant(client.Ctx(), sessionTTL)
resp, err := client.Grant(client.Ctx(), int64(ops.ttl))
if err != nil {
return nil, err
}
@ -63,16 +52,10 @@ func NewSession(client *v3.Client) (*Session, error) {
donec := make(chan struct{})
s := &Session{client: client, id: id, cancel: cancel, donec: donec}
clientSessions.sessions[client] = s
// keep the lease alive until client error or cancelled context
go func() {
defer func() {
clientSessions.mu.Lock()
delete(clientSessions.sessions, client)
clientSessions.mu.Unlock()
close(donec)
}()
defer close(donec)
for range keepAlive {
// eat messages until keep alive channel closes
}
@ -81,6 +64,11 @@ func NewSession(client *v3.Client) (*Session, error) {
return s, nil
}
// Client is the etcd client that is attached to the session.
func (s *Session) Client() *v3.Client {
return s.client
}
// Lease is the lease ID for keys bound to the session.
func (s *Session) Lease() v3.LeaseID { return s.id }
@ -102,3 +90,20 @@ func (s *Session) Close() error {
_, err := s.client.Revoke(s.client.Ctx(), s.id)
return err
}
type sessionOptions struct {
ttl int
}
// SessionOption configures Session.
type SessionOption func(*sessionOptions)
// WithTTL configures the session's TTL in seconds.
// If TTL is <= 0, the default 60 seconds TTL will be used.
func WithTTL(ttl int) SessionOption {
return func(so *sessionOptions) {
if ttl > 0 {
so.ttl = ttl
}
}
}

View File

@ -16,6 +16,7 @@ package recipe
import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/coreos/etcd/mvcc/mvccpb"
"golang.org/x/net/context"
)
@ -23,32 +24,33 @@ import (
// DoubleBarrier blocks processes on Enter until an expected count enters, then
// blocks again on Leave until all processes have left.
type DoubleBarrier struct {
client *clientv3.Client
ctx context.Context
s *concurrency.Session
ctx context.Context
key string // key for the collective barrier
count int
myKey *EphemeralKV // current key for this process on the barrier
}
func NewDoubleBarrier(client *clientv3.Client, key string, count int) *DoubleBarrier {
func NewDoubleBarrier(s *concurrency.Session, key string, count int) *DoubleBarrier {
return &DoubleBarrier{
client: client,
ctx: context.TODO(),
key: key,
count: count,
s: s,
ctx: context.TODO(),
key: key,
count: count,
}
}
// Enter waits for "count" processes to enter the barrier then returns
func (b *DoubleBarrier) Enter() error {
ek, err := NewUniqueEphemeralKey(b.client, b.key+"/waiters")
client := b.s.Client()
ek, err := NewUniqueEphemeralKey(b.s, b.key+"/waiters")
if err != nil {
return err
}
b.myKey = ek
resp, err := b.client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
resp, err := client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
if err != nil {
return err
}
@ -59,12 +61,12 @@ func (b *DoubleBarrier) Enter() error {
if len(resp.Kvs) == b.count {
// unblock waiters
_, err = b.client.Put(b.ctx, b.key+"/ready", "")
_, err = client.Put(b.ctx, b.key+"/ready", "")
return err
}
_, err = WaitEvents(
b.client,
client,
b.key+"/ready",
ek.Revision(),
[]mvccpb.Event_EventType{mvccpb.PUT})
@ -73,7 +75,8 @@ func (b *DoubleBarrier) Enter() error {
// Leave waits for "count" processes to leave the barrier then returns
func (b *DoubleBarrier) Leave() error {
resp, err := b.client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
client := b.s.Client()
resp, err := client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
if err != nil {
return err
}
@ -94,7 +97,7 @@ func (b *DoubleBarrier) Leave() error {
if len(resp.Kvs) == 1 {
// this is the only node in the barrier; finish up
if _, err = b.client.Delete(b.ctx, b.key+"/ready"); err != nil {
if _, err = client.Delete(b.ctx, b.key+"/ready"); err != nil {
return err
}
return b.myKey.Delete()
@ -106,7 +109,7 @@ func (b *DoubleBarrier) Leave() error {
// lowest process in node => wait on highest process
if isLowest {
_, err = WaitEvents(
b.client,
client,
string(highest.Key),
highest.ModRevision,
[]mvccpb.Event_EventType{mvccpb.DELETE})
@ -123,7 +126,7 @@ func (b *DoubleBarrier) Leave() error {
key := string(lowest.Key)
_, err = WaitEvents(
b.client,
client,
key,
lowest.ModRevision,
[]mvccpb.Event_EventType{mvccpb.DELETE})

View File

@ -160,12 +160,8 @@ func (rk *RemoteKV) Put(val string) error {
type EphemeralKV struct{ RemoteKV }
// NewEphemeralKV creates a new key/value pair associated with a session lease
func NewEphemeralKV(client *v3.Client, key, val string) (*EphemeralKV, error) {
s, err := concurrency.NewSession(client)
if err != nil {
return nil, err
}
k, err := NewKV(client, key, val, s.Lease())
func NewEphemeralKV(s *concurrency.Session, key, val string) (*EphemeralKV, error) {
k, err := NewKV(s.Client(), key, val, s.Lease())
if err != nil {
return nil, err
}
@ -173,15 +169,15 @@ func NewEphemeralKV(client *v3.Client, key, val string) (*EphemeralKV, error) {
}
// NewUniqueEphemeralKey creates a new unique valueless key associated with a session lease
func NewUniqueEphemeralKey(client *v3.Client, prefix string) (*EphemeralKV, error) {
return NewUniqueEphemeralKV(client, prefix, "")
func NewUniqueEphemeralKey(s *concurrency.Session, prefix string) (*EphemeralKV, error) {
return NewUniqueEphemeralKV(s, prefix, "")
}
// NewUniqueEphemeralKV creates a new unique key/value pair associated with a session lease
func NewUniqueEphemeralKV(client *v3.Client, prefix, val string) (ek *EphemeralKV, err error) {
func NewUniqueEphemeralKV(s *concurrency.Session, prefix, val string) (ek *EphemeralKV, err error) {
for {
newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
ek, err = NewEphemeralKV(client, newKey, val)
ek, err = NewEphemeralKV(s, newKey, val)
if err == nil || err != ErrKeyExists {
break
}

View File

@ -16,24 +16,27 @@ package recipe
import (
v3 "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/coreos/etcd/mvcc/mvccpb"
"golang.org/x/net/context"
)
type RWMutex struct {
client *v3.Client
ctx context.Context
s *concurrency.Session
ctx context.Context
key string
myKey *EphemeralKV
}
func NewRWMutex(client *v3.Client, key string) *RWMutex {
return &RWMutex{client, context.TODO(), key, nil}
func NewRWMutex(s *concurrency.Session, key string) *RWMutex {
return &RWMutex{s, context.TODO(), key, nil}
}
func (rwm *RWMutex) RLock() error {
rk, err := NewUniqueEphemeralKey(rwm.client, rwm.key+"/read")
client := rwm.s.Client()
rk, err := NewUniqueEphemeralKey(rwm.s, rwm.key+"/read")
if err != nil {
return err
}
@ -41,7 +44,7 @@ func (rwm *RWMutex) RLock() error {
// if there are nodes with "write-" and a lower
// revision number than us we must wait
resp, err := rwm.client.Get(rwm.ctx, rwm.key+"/write", v3.WithFirstRev()...)
resp, err := client.Get(rwm.ctx, rwm.key+"/write", v3.WithFirstRev()...)
if err != nil {
return err
}
@ -53,7 +56,9 @@ func (rwm *RWMutex) RLock() error {
}
func (rwm *RWMutex) Lock() error {
rk, err := NewUniqueEphemeralKey(rwm.client, rwm.key+"/write")
client := rwm.s.Client()
rk, err := NewUniqueEphemeralKey(rwm.s, rwm.key+"/write")
if err != nil {
return err
}
@ -62,7 +67,7 @@ func (rwm *RWMutex) Lock() error {
for {
// find any key of lower rev number blocks the write lock
opts := append(v3.WithLastRev(), v3.WithRev(rk.Revision()-1))
resp, err := rwm.client.Get(rwm.ctx, rwm.key, opts...)
resp, err := client.Get(rwm.ctx, rwm.key, opts...)
if err != nil {
return err
}
@ -80,15 +85,17 @@ func (rwm *RWMutex) Lock() error {
}
func (rwm *RWMutex) waitOnLowest() error {
client := rwm.s.Client()
// must block; get key before ek for waiting
opts := append(v3.WithLastRev(), v3.WithRev(rwm.myKey.Revision()-1))
lastKey, err := rwm.client.Get(rwm.ctx, rwm.key, opts...)
lastKey, err := client.Get(rwm.ctx, rwm.key, opts...)
if err != nil {
return err
}
// wait for release on prior key
_, err = WaitEvents(
rwm.client,
client,
string(lastKey.Kvs[0].Key),
rwm.myKey.Revision(),
[]mvccpb.Event_EventType{mvccpb.DELETE})

View File

@ -64,7 +64,11 @@ func electCommandFunc(cmd *cobra.Command, args []string) {
}
func observe(c *clientv3.Client, election string) error {
e := concurrency.NewElection(c, election)
s, err := concurrency.NewSession(c)
if err != nil {
return err
}
e := concurrency.NewElection(s, election)
ctx, cancel := context.WithCancel(context.TODO())
donec := make(chan struct{})
@ -94,7 +98,11 @@ func observe(c *clientv3.Client, election string) error {
}
func campaign(c *clientv3.Client, election string, prop string) error {
e := concurrency.NewElection(c, election)
s, err := concurrency.NewSession(c)
if err != nil {
return err
}
e := concurrency.NewElection(s, election)
ctx, cancel := context.WithCancel(context.TODO())
donec := make(chan struct{})
@ -111,7 +119,7 @@ func campaign(c *clientv3.Client, election string, prop string) error {
return serr
}
if err := e.Campaign(ctx, prop); err != nil {
if err = e.Campaign(ctx, prop); err != nil {
return err
}

View File

@ -46,7 +46,12 @@ func lockCommandFunc(cmd *cobra.Command, args []string) {
}
func lockUntilSignal(c *clientv3.Client, lockname string) error {
m := concurrency.NewMutex(c, lockname)
s, err := concurrency.NewSession(c)
if err != nil {
return err
}
m := concurrency.NewMutex(s, lockname)
ctx, cancel := context.WithCancel(context.TODO())
// unlock in case of ordinary shutdown

View File

@ -25,15 +25,25 @@ import (
func TestDoubleBarrier(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
defer dropSessionLease(clus)
waiters := 10
session, err := concurrency.NewSession(clus.RandClient())
if err != nil {
t.Error(err)
}
defer session.Orphan()
b := recipe.NewDoubleBarrier(clus.RandClient(), "test-barrier", waiters)
b := recipe.NewDoubleBarrier(session, "test-barrier", waiters)
donec := make(chan struct{})
for i := 0; i < waiters-1; i++ {
go func() {
bb := recipe.NewDoubleBarrier(clus.RandClient(), "test-barrier", waiters)
session, err := concurrency.NewSession(clus.RandClient())
if err != nil {
t.Error(err)
}
defer session.Orphan()
bb := recipe.NewDoubleBarrier(session, "test-barrier", waiters)
if err := bb.Enter(); err != nil {
t.Fatalf("could not enter on barrier (%v)", err)
}
@ -86,15 +96,25 @@ func TestDoubleBarrier(t *testing.T) {
func TestDoubleBarrierFailover(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
defer dropSessionLease(clus)
waiters := 10
donec := make(chan struct{})
s0, err := concurrency.NewSession(clus.clients[0])
if err != nil {
t.Error(err)
}
defer s0.Orphan()
s1, err := concurrency.NewSession(clus.clients[0])
if err != nil {
t.Error(err)
}
defer s1.Orphan()
// sacrificial barrier holder; lease will be revoked
go func() {
b := recipe.NewDoubleBarrier(clus.clients[0], "test-barrier", waiters)
if err := b.Enter(); err != nil {
b := recipe.NewDoubleBarrier(s0, "test-barrier", waiters)
if err = b.Enter(); err != nil {
t.Fatalf("could not enter on barrier (%v)", err)
}
donec <- struct{}{}
@ -102,8 +122,8 @@ func TestDoubleBarrierFailover(t *testing.T) {
for i := 0; i < waiters-1; i++ {
go func() {
b := recipe.NewDoubleBarrier(clus.clients[1], "test-barrier", waiters)
if err := b.Enter(); err != nil {
b := recipe.NewDoubleBarrier(s1, "test-barrier", waiters)
if err = b.Enter(); err != nil {
t.Fatalf("could not enter on barrier (%v)", err)
}
donec <- struct{}{}
@ -120,12 +140,8 @@ func TestDoubleBarrierFailover(t *testing.T) {
t.Fatalf("timed out waiting for enter, %d", i)
}
}
// kill lease, expect Leave unblock
s, err := concurrency.NewSession(clus.clients[0])
if err != nil {
t.Fatal(err)
}
if err = s.Close(); err != nil {
if err = s0.Close(); err != nil {
t.Fatal(err)
}
// join on rest of waiters
@ -137,10 +153,3 @@ func TestDoubleBarrierFailover(t *testing.T) {
}
}
}
func dropSessionLease(clus *ClusterV3) {
for _, client := range clus.clients {
s, _ := concurrency.NewSession(client)
s.Orphan()
}
}

View File

@ -28,7 +28,6 @@ import (
func TestElectionWait(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
defer dropSessionLease(clus)
leaders := 3
followers := 3
@ -44,7 +43,12 @@ func TestElectionWait(t *testing.T) {
nextc = append(nextc, make(chan struct{}))
go func(ch chan struct{}) {
for j := 0; j < leaders; j++ {
b := concurrency.NewElection(newClient(), "test-election")
session, err := concurrency.NewSession(newClient())
if err != nil {
t.Error(err)
}
b := concurrency.NewElection(session, "test-election")
cctx, cancel := context.WithCancel(context.TODO())
defer cancel()
s, ok := <-b.Observe(cctx)
@ -54,6 +58,7 @@ func TestElectionWait(t *testing.T) {
electedc <- string(s.Kvs[0].Value)
// wait for next election round
<-ch
session.Orphan()
}
donec <- struct{}{}
}(nextc[i])
@ -62,7 +67,13 @@ func TestElectionWait(t *testing.T) {
// elect some leaders
for i := 0; i < leaders; i++ {
go func() {
e := concurrency.NewElection(newClient(), "test-election")
session, err := concurrency.NewSession(newClient())
if err != nil {
t.Error(err)
}
defer session.Orphan()
e := concurrency.NewElection(session, "test-election")
ev := fmt.Sprintf("electval-%v", time.Now().UnixNano())
if err := e.Campaign(context.TODO(), ev); err != nil {
t.Fatalf("failed volunteer (%v)", err)
@ -97,13 +108,23 @@ func TestElectionWait(t *testing.T) {
func TestElectionFailover(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
defer dropSessionLease(clus)
cctx, cancel := context.WithCancel(context.TODO())
defer cancel()
ss := make([]*concurrency.Session, 3, 3)
for i := 0; i < 3; i++ {
var err error
ss[i], err = concurrency.NewSession(clus.clients[i])
if err != nil {
t.Error(err)
}
defer ss[i].Orphan()
}
// first leader (elected)
e := concurrency.NewElection(clus.clients[0], "test-election")
e := concurrency.NewElection(ss[0], "test-election")
if err := e.Campaign(context.TODO(), "foo"); err != nil {
t.Fatalf("failed volunteer (%v)", err)
}
@ -121,7 +142,7 @@ func TestElectionFailover(t *testing.T) {
// next leader
electedc := make(chan struct{})
go func() {
ee := concurrency.NewElection(clus.clients[1], "test-election")
ee := concurrency.NewElection(ss[1], "test-election")
if eer := ee.Campaign(context.TODO(), "bar"); eer != nil {
t.Fatal(eer)
}
@ -129,16 +150,12 @@ func TestElectionFailover(t *testing.T) {
}()
// invoke leader failover
session, serr := concurrency.NewSession(clus.clients[0])
if serr != nil {
t.Fatal(serr)
}
if err := session.Close(); err != nil {
if err := ss[0].Close(); err != nil {
t.Fatal(err)
}
// check new leader
e = concurrency.NewElection(clus.clients[2], "test-election")
e = concurrency.NewElection(ss[2], "test-election")
resp, ok = <-e.Observe(cctx)
if !ok {
t.Fatalf("could not wait for second election; channel closed")
@ -159,11 +176,17 @@ func TestElectionSessionRecampaign(t *testing.T) {
defer clus.Terminate(t)
cli := clus.RandClient()
e := concurrency.NewElection(cli, "test-elect")
session, err := concurrency.NewSession(cli)
if err != nil {
t.Error(err)
}
defer session.Orphan()
e := concurrency.NewElection(session, "test-elect")
if err := e.Campaign(context.TODO(), "abc"); err != nil {
t.Fatal(err)
}
e2 := concurrency.NewElection(cli, "test-elect")
e2 := concurrency.NewElection(session, "test-elect")
if err := e2.Campaign(context.TODO(), "def"); err != nil {
t.Fatal(err)
}

View File

@ -49,7 +49,11 @@ func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client)
lockedC := make(chan *concurrency.Mutex)
for i := 0; i < waiters; i++ {
go func() {
m := concurrency.NewMutex(chooseClient(), "test-mutex")
session, err := concurrency.NewSession(chooseClient())
if err != nil {
t.Error(err)
}
m := concurrency.NewMutex(session, "test-mutex")
if err := m.Lock(context.TODO()); err != nil {
t.Fatalf("could not wait on lock (%v)", err)
}
@ -81,12 +85,17 @@ func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client)
func TestMutexSessionRelock(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
cli := clus.RandClient()
m := concurrency.NewMutex(cli, "test-mutex")
session, err := concurrency.NewSession(clus.RandClient())
if err != nil {
t.Error(err)
}
m := concurrency.NewMutex(session, "test-mutex")
if err := m.Lock(context.TODO()); err != nil {
t.Fatal(err)
}
m2 := concurrency.NewMutex(cli, "test-mutex")
m2 := concurrency.NewMutex(session, "test-mutex")
if err := m2.Lock(context.TODO()); err != nil {
t.Fatal(err)
}
@ -119,7 +128,11 @@ func testRWMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client
wlockedC := make(chan *recipe.RWMutex, 1)
for i := 0; i < waiters; i++ {
go func() {
rwm := recipe.NewRWMutex(chooseClient(), "test-rwmutex")
session, err := concurrency.NewSession(chooseClient())
if err != nil {
t.Error(err)
}
rwm := recipe.NewRWMutex(session, "test-rwmutex")
if rand.Intn(1) == 0 {
if err := rwm.RLock(); err != nil {
t.Fatalf("could not rlock (%v)", err)

View File

@ -145,7 +145,11 @@ func doSTM(ctx context.Context, client *v3.Client, requests <-chan stmApply) {
var m *v3sync.Mutex
if stmMutex {
m = v3sync.NewMutex(client, "stmlock")
s, err := v3sync.NewSession(client)
if err != nil {
panic(err)
}
m = v3sync.NewMutex(s, "stmlock")
}
for applyf := range requests {

View File

@ -66,7 +66,17 @@ func runElection(eps []string, rounds int) {
validateWaiters := 0
rcs[i].c = randClient(eps)
e := concurrency.NewElection(rcs[i].c, "electors")
var (
s *concurrency.Session
err error
)
for {
s, err = concurrency.NewSession(rcs[i].c)
if err == nil {
break
}
}
e := concurrency.NewElection(s, "electors")
rcs[i].acquire = func() error {
<-releasec
@ -79,7 +89,7 @@ func runElection(eps []string, rounds int) {
}
}
}()
err := e.Campaign(ctx, v)
err = e.Campaign(ctx, v)
if err == nil {
observedLeader = v
}
@ -173,7 +183,17 @@ func runRacer(eps []string, round int) {
cnt := 0
for i := range rcs {
rcs[i].c = randClient(eps)
m := concurrency.NewMutex(rcs[i].c, "racers")
var (
s *concurrency.Session
err error
)
for {
s, err = concurrency.NewSession(rcs[i].c)
if err == nil {
break
}
}
m := concurrency.NewMutex(s, "racers")
rcs[i].acquire = func() error { return m.Lock(ctx) }
rcs[i].validate = func() error {
if cnt++; cnt != 1 {