Merge pull request #6715 from fanminshi/lease_hash_fix

Lease hash fix
This commit is contained in:
Xiang Li 2016-10-25 16:28:11 -07:00 committed by GitHub
commit 12e4dfa9c4
2 changed files with 85 additions and 26 deletions

View File

@ -16,12 +16,18 @@ package main
import ( import (
"fmt" "fmt"
"reflect"
"strings" "strings"
"time" "time"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
const (
retries = 7
stabilizationPeriod = 3 * time.Second
)
type Checker interface { type Checker interface {
// Check returns an error if the system fails a consistency check. // Check returns an error if the system fails a consistency check.
Check() error Check() error
@ -39,41 +45,94 @@ func newHashChecker(hrg hashAndRevGetter) Checker { return &hashChecker{hrg} }
const leaseCheckerTimeout = 10 * time.Second const leaseCheckerTimeout = 10 * time.Second
func (hc *hashChecker) Check() (err error) { func (hc *hashChecker) checkRevAndHashes() (err error) {
plog.Printf("fetching current revisions...") // retries in case of transient failure or etcd nodes have not stablized yet.
var ( var (
revs map[string]int64 revsStable bool
hashes map[string]int64 hashesStable bool
ok bool
) )
// retry in case of transient failure for i := 0; i < retries; i++ {
for i := 0; i < 3; i++ { revsStable, err = hc.areRevisonsStable()
revs, hashes, err = hc.hrg.getRevisionHash() if err != nil || !revsStable {
if err != nil {
plog.Printf("#%d failed to get current revisions (%v)", i, err)
continue continue
} }
if _, ok = getSameValue(revs); ok { hashesStable, err = hc.areHashesStable()
break if err != nil || !hashesStable {
continue
} }
// hashes must be stable at this point
plog.Printf("#%d inconsistent current revisions %+v", i, revs)
time.Sleep(time.Second)
}
if !ok || err != nil {
return fmt.Errorf("checking current revisions failed [err: %v, revisions: %v]", err, revs)
}
plog.Printf("all members are consistent with current revisions [revisions: %v]", revs)
plog.Printf("checking current storage hashes...")
if _, ok = getSameValue(hashes); !ok {
return fmt.Errorf("inconsistent hashes [%v]", hashes)
}
plog.Printf("all members are consistent with storage hashes")
return nil return nil
} }
if err != nil {
return err
}
if !revsStable || !hashesStable {
return fmt.Errorf("checkRevAndHashes detects inconsistency: [revisions stable %v] [hashes stable %v]", revsStable, hashesStable)
}
return err
}
func (hc *hashChecker) areRevisonsStable() (rv bool, err error) {
var preRevs map[string]int64
for i := 0; i < 2; i++ {
revs, _, err := hc.hrg.getRevisionHash()
if err != nil {
return false, err
}
_, sameRev := getSameValue(revs)
if !sameRev {
plog.Printf("current revisions are not consistent: revisions [revisions: %v]", revs)
return false, nil
}
// sleep for N seconds. after that, check to make sure that revisions don't change
if i == 0 {
preRevs = revs
time.Sleep(stabilizationPeriod)
} else if !reflect.DeepEqual(revs, preRevs) {
// use map comparison logic found in http://stackoverflow.com/questions/18208394/testing-equivalence-of-maps-golang
plog.Printf("revisions are not stable: [current revisions: %v] [previous revisions: %v]", revs, preRevs)
return false, nil
}
}
plog.Printf("revisions are stable: revisions [revisions: %v]", preRevs)
return true, nil
}
func (hc *hashChecker) areHashesStable() (rv bool, err error) {
var prevHashes map[string]int64
for i := 0; i < 2; i++ {
revs, hashes, err := hc.hrg.getRevisionHash()
if err != nil {
return false, err
}
_, sameRev := getSameValue(revs)
_, sameHashes := getSameValue(hashes)
if !sameRev || !sameHashes {
plog.Printf("hashes are not stable: revisions [revisions: %v] and hashes [hashes: %v]", revs, hashes)
return false, nil
}
// sleep for N seconds. after that, check to make sure that the hashes and revisions don't change
if i == 0 {
time.Sleep(stabilizationPeriod)
prevHashes = hashes
} else if !reflect.DeepEqual(hashes, prevHashes) {
// use map comparison logic found in http://stackoverflow.com/questions/18208394/testing-equivalence-of-maps-golang
plog.Printf("hashes are not stable: [current hashes: %v] [previous hashes: %v]", hashes, prevHashes)
return false, nil
}
}
plog.Printf("hashes are stable: hashes [hashes: %v]", prevHashes)
return true, nil
}
func (hc *hashChecker) Check() error {
return hc.checkRevAndHashes()
}
type leaseChecker struct { type leaseChecker struct {
leaseStressers []Stresser leaseStressers []Stresser
} }

View File

@ -211,13 +211,14 @@ func (ls *leaseStresser) createLeases() {
defer wg.Done() defer wg.Done()
leaseID, err := ls.createLease() leaseID, err := ls.createLease()
if err != nil { if err != nil {
plog.Errorf("lease creation error: (%v)", err) plog.Debugf("lease creation error: (%v)", err)
return return
} }
plog.Debugf("lease %v created", leaseID) plog.Debugf("lease %v created", leaseID)
// if attaching keys to the lease encountered an error, we don't add the lease to the aliveLeases map // if attaching keys to the lease encountered an error, we don't add the lease to the aliveLeases map
// because invariant check on the lease will fail due to keys not found // because invariant check on the lease will fail due to keys not found
if err := ls.attachKeysWithLease(leaseID); err != nil { if err := ls.attachKeysWithLease(leaseID); err != nil {
plog.Debugf("unable to attach keys to lease %d error (%v)", leaseID, err)
return return
} }
ls.aliveLeases.add(leaseID, time.Now()) ls.aliveLeases.add(leaseID, time.Now())
@ -239,6 +240,7 @@ func (ls *leaseStresser) randomlyDropLeases() {
// if randomlyDropLease encountered an error such as context is cancelled, remove the lease from aliveLeases // if randomlyDropLease encountered an error such as context is cancelled, remove the lease from aliveLeases
// becasue we can't tell whether the lease is dropped or not. // becasue we can't tell whether the lease is dropped or not.
if err != nil { if err != nil {
plog.Debugf("drop lease %v has failed error (%v)", leaseID, err)
ls.aliveLeases.remove(leaseID) ls.aliveLeases.remove(leaseID)
return return
} }
@ -271,7 +273,6 @@ func (ls *leaseStresser) hasLeaseExpired(ctx context.Context, leaseID int64) (bo
// Since the format of keys contains about leaseID, finding keys base on "<leaseID>" prefix // Since the format of keys contains about leaseID, finding keys base on "<leaseID>" prefix
// determines whether the attached keys for a given leaseID has been deleted or not // determines whether the attached keys for a given leaseID has been deleted or not
func (ls *leaseStresser) hasKeysAttachedToLeaseExpired(ctx context.Context, leaseID int64) (bool, error) { func (ls *leaseStresser) hasKeysAttachedToLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
// plog.Infof("retriving keys attached to lease %v", leaseID)
resp, err := ls.kvc.Range(ctx, &pb.RangeRequest{ resp, err := ls.kvc.Range(ctx, &pb.RangeRequest{
Key: []byte(fmt.Sprintf("%d", leaseID)), Key: []byte(fmt.Sprintf("%d", leaseID)),
RangeEnd: []byte(clientv3.GetPrefixRangeEnd(fmt.Sprintf("%d", leaseID))), RangeEnd: []byte(clientv3.GetPrefixRangeEnd(fmt.Sprintf("%d", leaseID))),
@ -368,7 +369,6 @@ func (ls *leaseStresser) attachKeysWithLease(leaseID int64) error {
return err return err
} }
} }
return ls.ctx.Err() return ls.ctx.Err()
} }