etcdserver: CheckInitialHashKV when "InitialCorruptCheck==true"

etcdserver: only compare hash values if any

It's possible that a peer has a higher revision than the local
node. In that case, the hashes will still differ at the requested
revision, but the peer's header revision will be greater.

etcdserver: count mismatch only when compact revisions are same
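
In other words, each peer response falls into one of three buckets:
same compact revision but different hash counts as a real mismatch;
a different compact revision makes the hashes incomparable, so the
peer is skipped; and an RPC error only tells which side is lagging.
A minimal standalone sketch of that decision rule (the function name
and signature are illustrative, not the code in this patch):

// classify mirrors the comparison rule described above.
func classify(h uint32, crev int64, peerHash uint32, peerCompactRev int64) string {
	switch {
	case peerHash == h:
		return "match"
	case peerCompactRev == crev:
		return "mismatch" // same compact revision, different hash: count it
	default:
		return "skip" // different compact revision: hashes not comparable
	}
}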

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
Gyu-Ho Lee 2017-11-22 14:57:07 -08:00
parent 1f38f1fddb
commit e0dfc4368f

etcdserver/corrupt.go

@@ -16,14 +16,61 @@ package etcdserver
 import (
 	"context"
 	"fmt"
 	"time"
 
 	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/mvcc"
 	"github.com/coreos/etcd/pkg/types"
 )
 
+// CheckInitialHashKV compares initial hash values with its peers
+// before serving any peer/client traffic. Only mismatch when hashes
+// are different at requested revision, with same compact revision.
+func (s *EtcdServer) CheckInitialHashKV() error {
+	if !s.Cfg.InitialCorruptCheck {
+		return nil
+	}
+
+	plog.Infof("%s starting initial corruption check with timeout %v...", s.ID(), s.Cfg.ReqTimeout())
+	h, rev, crev, err := s.kv.HashByRev(0)
+	if err != nil {
+		return fmt.Errorf("%s failed to fetch hash (%v)", s.ID(), err)
+	}
+	peers := s.getPeerHashKVs(rev)
+	mismatch := 0
+	for _, p := range peers {
+		if p.resp != nil {
+			peerID := types.ID(p.resp.Header.MemberId)
+			if h != p.resp.Hash {
+				if crev == p.resp.CompactRevision {
+					plog.Errorf("%s's hash %d != %s's hash %d (revision %d, peer revision %d, compact revision %d)", s.ID(), h, peerID, p.resp.Hash, rev, p.resp.Header.Revision, crev)
+					mismatch++
+				} else {
+					plog.Warningf("%s cannot check hash of peer(%s): peer has a different compact revision %d (revision:%d)", s.ID(), peerID, p.resp.CompactRevision, rev)
+				}
+			}
+			continue
+		}
+
+		if p.err != nil {
+			switch p.err {
+			case rpctypes.ErrFutureRev:
+				plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: peer is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error())
+			case rpctypes.ErrCompacted:
+				plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: local node is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error())
+			}
+		}
+	}
+
+	if mismatch > 0 {
+		return fmt.Errorf("%s found data inconsistency with peers", s.ID())
+	}
+
+	plog.Infof("%s succeeded on initial corruption checking: no corruption", s.ID())
+	return nil
+}
+
 func (s *EtcdServer) monitorKVHash() {
 	t := s.Cfg.CorruptCheckTime
 	if t == 0 {
@@ -50,7 +97,7 @@ func (s *EtcdServer) checkHashKV() error {
 	if err != nil {
 		plog.Fatalf("failed to hash kv store (%v)", err)
 	}
-	resps := s.getPeerHashKVs(rev)
+	peers := s.getPeerHashKVs(rev)
 
 	ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
 	err = s.linearizableReadNotify(ctx)
@@ -86,24 +133,27 @@ func (s *EtcdServer) checkHashKV() error {
 		mismatch(uint64(s.ID()))
 	}
 
-	for _, resp := range resps {
-		id := resp.Header.MemberId
+	for _, p := range peers {
+		if p.resp == nil {
+			continue
+		}
+		id := p.resp.Header.MemberId
 
 		// leader expects follower's latest revision less than or equal to leader's
-		if resp.Header.Revision > rev2 {
+		if p.resp.Header.Revision > rev2 {
 			plog.Warningf(
 				"revision %d from member %v, expected at most %d",
-				resp.Header.Revision,
+				p.resp.Header.Revision,
 				types.ID(id),
 				rev2)
 			mismatch(id)
 		}
 
 		// leader expects follower's latest compact revision less than or equal to leader's
-		if resp.CompactRevision > crev2 {
+		if p.resp.CompactRevision > crev2 {
 			plog.Warningf(
 				"compact revision %d from member %v, expected at most %d",
-				resp.CompactRevision,
+				p.resp.CompactRevision,
 				types.ID(id),
 				crev2,
 			)
@@ -111,10 +161,10 @@ func (s *EtcdServer) checkHashKV() error {
 		}
 
 		// follower's compact revision is leader's old one, then hashes must match
-		if resp.CompactRevision == crev && resp.Hash != h {
+		if p.resp.CompactRevision == crev && p.resp.Hash != h {
 			plog.Warningf(
 				"hash %d at revision %d from member %v, expected hash %d",
-				resp.Hash,
+				p.resp.Hash,
 				rev,
 				types.ID(id),
 				h,
@@ -125,36 +175,53 @@ func (s *EtcdServer) checkHashKV() error {
 	return nil
 }
 
-func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*clientv3.HashKVResponse) {
-	for _, m := range s.cluster.Members() {
+type peerHashKVResp struct {
+	resp *clientv3.HashKVResponse
+	err  error
+	eps  []string
+}
+
+func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*peerHashKVResp) {
+	// TODO: handle the case when "s.cluster.Members" have not
+	// been populated (e.g. no snapshot to load from disk)
+	mbs := s.cluster.Members()
+	pURLs := make([][]string, len(mbs))
+	for _, m := range mbs {
 		if m.ID == s.ID() {
 			continue
 		}
+		pURLs = append(pURLs, m.PeerURLs)
+	}
 
+	for _, purls := range pURLs {
+		if len(purls) == 0 {
+			continue
+		}
 		cli, cerr := clientv3.New(clientv3.Config{
 			DialTimeout: s.Cfg.ReqTimeout(),
-			Endpoints:   m.PeerURLs,
+			Endpoints:   purls,
 		})
 		if cerr != nil {
-			plog.Warningf("%s failed to create client to peer %s for hash checking (%q)", s.ID(), types.ID(m.ID), cerr.Error())
+			plog.Warningf("%s failed to create client to peer %q for hash checking (%q)", s.ID(), purls, cerr.Error())
 			continue
 		}
 
 		respsLen := len(resps)
 		for _, c := range cli.Endpoints() {
 			ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
-			resp, herr := cli.HashKV(ctx, c, rev)
+			var resp *clientv3.HashKVResponse
+			resp, cerr = cli.HashKV(ctx, c, rev)
 			cancel()
-			if herr == nil {
-				cerr = herr
-				resps = append(resps, resp)
+			if cerr == nil {
+				resps = append(resps, &peerHashKVResp{resp: resp})
 				break
 			}
+			plog.Warningf("%s hash-kv error %q on peer %q with revision %d", s.ID(), cerr.Error(), c, rev)
 		}
 		cli.Close()
 
 		if respsLen == len(resps) {
-			plog.Warningf("%s failed to hash kv for peer %s (%v)", s.ID(), types.ID(m.ID), cerr)
+			resps = append(resps, &peerHashKVResp{err: cerr, eps: purls})
 		}
 	}
 	return resps
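
The check is designed to run once at startup, before the member serves
any peer/client traffic, and to abort startup when a mismatch is found.
A hypothetical caller sketch (startServer is illustrative and not part
of this patch; CheckInitialHashKV and Start are the real EtcdServer
methods):

package main

import "github.com/coreos/etcd/etcdserver"

// startServer is a hypothetical wiring of the new check into startup.
func startServer(srv *etcdserver.EtcdServer) error {
	// Fail fast if any peer reports a different hash at the same
	// compact revision; do not serve traffic from a corrupted store.
	if err := srv.CheckInitialHashKV(); err != nil {
		return err
	}
	srv.Start() // begin serving only after the check passes
	return nil
}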