etcd-runner: remove mutex on validate() and release() in global.go

The election runner can deadlock when release() is atomic.

Suppose the election runner has two clients, A and B.
If A is the leader and B is a follower, B obtains the lock
for release() and waits for A to close(nextc), which signals
that the next round is ready. However, A can only close(nextc)
if it obtains the lock for release(); hence the deadlock.

This PR removes the atomicity of validate() and release() in global.go
and gives the responsibility for locking to each runner.

FIXES #7891
This commit is contained in:
fanmin shi 2017-05-09 13:45:11 -07:00
parent db6f45e939
commit 87d99fe038
3 changed files with 9 additions and 9 deletions

View File

@ -89,14 +89,14 @@ func runElectionFunc(cmd *cobra.Command, args []string) {
} }
}() }()
err = e.Campaign(ctx, v) err = e.Campaign(ctx, v)
cancel()
<-donec
if err == nil { if err == nil {
observedLeader = v observedLeader = v
} }
if observedLeader == v { if observedLeader == v {
validateWaiters = len(rcs) validateWaiters = len(rcs)
} }
cancel()
<-donec
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil return nil

View File

@ -56,7 +56,6 @@ func newClient(eps []string, timeout time.Duration) *clientv3.Client {
} }
func doRounds(rcs []roundClient, rounds int, requests int) { func doRounds(rcs []roundClient, rounds int, requests int) {
var mu sync.Mutex
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(rcs)) wg.Add(len(rcs))
@ -73,22 +72,16 @@ func doRounds(rcs []roundClient, rounds int, requests int) {
for rc.acquire() != nil { /* spin */ for rc.acquire() != nil { /* spin */
} }
mu.Lock()
if err := rc.validate(); err != nil { if err := rc.validate(); err != nil {
log.Fatal(err) log.Fatal(err)
} }
mu.Unlock()
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
rc.progress++ rc.progress++
finished <- struct{}{} finished <- struct{}{}
mu.Lock()
for rc.release() != nil { /* spin */ for rc.release() != nil { /* spin */
mu.Unlock()
mu.Lock()
} }
mu.Unlock()
} }
}(&rcs[i]) }(&rcs[i])
} }

View File

@ -18,6 +18,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"sync"
"github.com/coreos/etcd/clientv3/concurrency" "github.com/coreos/etcd/clientv3/concurrency"
@ -47,6 +48,8 @@ func runRacerFunc(cmd *cobra.Command, args []string) {
rcs := make([]roundClient, totalClientConnections) rcs := make([]roundClient, totalClientConnections)
ctx := context.Background() ctx := context.Background()
// mu ensures validate and release funcs are atomic.
var mu sync.Mutex
cnt := 0 cnt := 0
eps := endpointsFromFlag(cmd) eps := endpointsFromFlag(cmd)
@ -69,12 +72,16 @@ func runRacerFunc(cmd *cobra.Command, args []string) {
m := concurrency.NewMutex(s, racers) m := concurrency.NewMutex(s, racers)
rcs[i].acquire = func() error { return m.Lock(ctx) } rcs[i].acquire = func() error { return m.Lock(ctx) }
rcs[i].validate = func() error { rcs[i].validate = func() error {
mu.Lock()
defer mu.Unlock()
if cnt++; cnt != 1 { if cnt++; cnt != 1 {
return fmt.Errorf("bad lock; count: %d", cnt) return fmt.Errorf("bad lock; count: %d", cnt)
} }
return nil return nil
} }
rcs[i].release = func() error { rcs[i].release = func() error {
mu.Lock()
defer mu.Unlock()
if err := m.Unlock(ctx); err != nil { if err := m.Unlock(ctx); err != nil {
return err return err
} }