From 9f829fdab7c3470dedd5c9eccfa3a27fb9415453 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 13 Sep 2016 13:56:44 -0700 Subject: [PATCH] recipes: fix rwmutex so locking works Fixes #6408 --- contrib/recipes/rwmutex.go | 70 ++++++++++++++------------------------ 1 file changed, 26 insertions(+), 44 deletions(-) diff --git a/contrib/recipes/rwmutex.go b/contrib/recipes/rwmutex.go index 87b237cca..8a03307ed 100644 --- a/contrib/recipes/rwmutex.go +++ b/contrib/recipes/rwmutex.go @@ -25,81 +25,63 @@ type RWMutex struct { s *concurrency.Session ctx context.Context - key string + pfx string myKey *EphemeralKV } -func NewRWMutex(s *concurrency.Session, key string) *RWMutex { - return &RWMutex{s, context.TODO(), key, nil} +func NewRWMutex(s *concurrency.Session, prefix string) *RWMutex { + return &RWMutex{s, context.TODO(), prefix + "/", nil} } func (rwm *RWMutex) RLock() error { - client := rwm.s.Client() - - rk, err := NewUniqueEphemeralKey(rwm.s, rwm.key+"/read") + rk, err := NewUniqueEphemeralKey(rwm.s, rwm.pfx+"read") if err != nil { return err } rwm.myKey = rk - - // if there are nodes with "write-" and a lower - // revision number than us we must wait - resp, err := client.Get(rwm.ctx, rwm.key+"/write", v3.WithFirstRev()...) - if err != nil { - return err + // wait until nodes with "write-" and a lower revision number than myKey are gone + for { + if done, werr := rwm.waitOnLastRev(rwm.pfx + "write"); done || werr != nil { + return werr + } } - if len(resp.Kvs) == 0 || resp.Kvs[0].ModRevision > rk.Revision() { - // no blocking since no write key - return nil - } - return rwm.waitOnLowest() } func (rwm *RWMutex) Lock() error { - client := rwm.s.Client() - - rk, err := NewUniqueEphemeralKey(rwm.s, rwm.key+"/write") + rk, err := NewUniqueEphemeralKey(rwm.s, rwm.pfx+"write") if err != nil { return err } rwm.myKey = rk - + // wait until all keys of lower revision than myKey are gone for { - // find any key of lower rev number blocks the write lock - opts := append(v3.WithLastRev(), v3.WithRev(rk.Revision()-1)) - resp, err := client.Get(rwm.ctx, rwm.key, opts...) - if err != nil { - return err + if done, werr := rwm.waitOnLastRev(rwm.pfx); done || werr != nil { + return werr } - if len(resp.Kvs) == 0 { - // no matching for revision before myKey; acquired - break - } - if err := rwm.waitOnLowest(); err != nil { - return err - } - // get the new lowest, etc until this is the only one left + // get the new lowest key until this is the only one left } - - return nil } -func (rwm *RWMutex) waitOnLowest() error { +// waitOnLowest will wait on the last key with a revision < rwm.myKey.Revision with a +// given prefix. If there are no keys left to wait on, return true. +func (rwm *RWMutex) waitOnLastRev(pfx string) (bool, error) { client := rwm.s.Client() - - // must block; get key before ek for waiting - opts := append(v3.WithLastRev(), v3.WithRev(rwm.myKey.Revision()-1)) - lastKey, err := client.Get(rwm.ctx, rwm.key, opts...) + // get key that's blocking myKey + opts := append(v3.WithLastRev(), v3.WithMaxModRev(rwm.myKey.Revision()-1)) + lastKey, err := client.Get(rwm.ctx, pfx, opts...) if err != nil { - return err + return false, err } - // wait for release on prior key + if len(lastKey.Kvs) == 0 { + return true, nil + } + // wait for release on blocking key _, err = WaitEvents( client, string(lastKey.Kvs[0].Key), rwm.myKey.Revision(), []mvccpb.Event_EventType{mvccpb.DELETE}) - return err + return false, err } func (rwm *RWMutex) RUnlock() error { return rwm.myKey.Delete() }