diff --git a/clientv3/concurrency/mutex.go b/clientv3/concurrency/mutex.go index 306470b88..c9abc9bba 100644 --- a/clientv3/concurrency/mutex.go +++ b/clientv3/concurrency/mutex.go @@ -26,6 +26,7 @@ import ( // ErrLocked is returned by TryLock when Mutex is already locked by another session. var ErrLocked = errors.New("mutex: Locked by another session") +var ErrSessionExpired = errors.New("mutex: session is expired") // Mutex implements the sync Locker interface with etcd type Mutex struct { @@ -80,6 +81,7 @@ func (m *Mutex) Lock(ctx context.Context) error { } client := m.s.Client() // wait for deletion revisions prior to myKey + // TODO: early termination if the session key is deleted before other session keys with smaller revisions. hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1) // release lock key if wait failed if werr != nil { @@ -87,7 +89,20 @@ func (m *Mutex) Lock(ctx context.Context) error { } else { m.hdr = hdr } - return werr + + // make sure the session is not expired, and the owner key still exists. + gresp, werr := client.Get(ctx, m.myKey) + if werr != nil { + m.Unlock(client.Ctx()) + return werr + } + + if len(gresp.Kvs) == 0 { // is the session key lost? + return ErrSessionExpired + } + m.hdr = gresp.Header + + return nil } func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) { diff --git a/clientv3/concurrency/mutex_test.go b/clientv3/concurrency/mutex_test.go new file mode 100644 index 000000000..545469fe6 --- /dev/null +++ b/clientv3/concurrency/mutex_test.go @@ -0,0 +1,71 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package concurrency_test + +import ( + "context" + "testing" + + "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/clientv3/concurrency" +) + +func TestMutexLockSessionExpired(t *testing.T) { + cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) + if err != nil { + t.Fatal(err) + } + defer cli.Close() + + // create two separate sessions for lock competition + s1, err := concurrency.NewSession(cli) + if err != nil { + t.Fatal(err) + } + defer s1.Close() + m1 := concurrency.NewMutex(s1, "/my-lock/") + + s2, err := concurrency.NewSession(cli) + if err != nil { + t.Fatal(err) + } + m2 := concurrency.NewMutex(s2, "/my-lock/") + + // acquire lock for s1 + if err := m1.Lock(context.TODO()); err != nil { + t.Fatal(err) + } + + m2Locked := make(chan struct{}) + var err2 error + go func() { + defer close(m2Locked) + // m2 blocks since m1 already acquired lock /my-lock/ + if err2 = m2.Lock(context.TODO()); err2 == nil { + t.Fatal("expect session expired error") + } + }() + + // revoke the session of m2 before unlock m1 + err = s2.Close() + if err != nil { + t.Fatal(err) + } + if err := m1.Unlock(context.TODO()); err != nil { + t.Fatal(err) + } + + <-m2Locked +}