etcdserver: detect corrupted member based on quorum

When the leader detects data inconsistency by comparing hashes,
it currently assumes that the follower is the corrupted member.
This isn't necessarily correct: the leader might be the corrupted member as well.

We should instead depend on quorum to identify the corrupted member.
For example, in a 3-member cluster, if 2 members have the same hash,
the member with a different hash is the corrupted one. In a 5-member
cluster, if 3 members have the same hash, the corrupted member is one
of the remaining two members; it's also possible that both of the
remaining members are corrupted.

Signed-off-by: Benjamin Wang <wachao@vmware.com>
Benjamin Wang 2022-11-23 13:11:21 +08:00
parent cdb9b8b2a0
commit 8b98fee9ce
3 changed files with 179 additions and 42 deletions
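
As context for the diff below, the quorum rule from the commit message reduces to a small computation: group members by the hash they report and trust any group that reaches memberCnt/2 + 1 members. A minimal sketch of that rule in Go, using a hypothetical findCorrupted helper and memberHashes input rather than etcd's actual types (the real logic is in checkPeerHashes in server/etcdserver/corrupt.go below):

package main

import "fmt"

// findCorrupted groups members by their reported hash. If one hash is
// shared by a quorum of members, every member outside that group is
// reported as corrupted; without a quorum we cannot tell who is corrupted.
func findCorrupted(memberHashes map[uint64]uint32) []uint64 {
	groups := map[uint32][]uint64{}
	for id, h := range memberHashes {
		groups[h] = append(groups[h], id)
	}
	quorum := len(memberHashes)/2 + 1
	for h, members := range groups {
		if len(members) >= quorum {
			// Every member reporting a hash different from the quorum's is corrupted.
			var corrupted []uint64
			for otherHash, others := range groups {
				if otherHash != h {
					corrupted = append(corrupted, others...)
				}
			}
			return corrupted
		}
	}
	return nil // no quorum: mismatch detected, but members unidentifiable
}

func main() {
	// 3-member cluster: members 1 and 2 agree, so member 3 is corrupted.
	fmt.Println(findCorrupted(map[uint64]uint32{1: 0xaa, 2: 0xaa, 3: 0xbb})) // [3]
}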

client/pkg/types/id.go

@@ -14,7 +14,10 @@
 package types
 
-import "strconv"
+import (
+	"bytes"
+	"strconv"
+)
 
 // ID represents a generic identifier which is canonically
 // stored as a uint64 but is typically represented as a
@@ -37,3 +40,17 @@ type IDSlice []ID
 func (p IDSlice) Len() int           { return len(p) }
 func (p IDSlice) Less(i, j int) bool { return uint64(p[i]) < uint64(p[j]) }
 func (p IDSlice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
+
+func (p IDSlice) String() string {
+	var b bytes.Buffer
+	if p.Len() > 0 {
+		b.WriteString(p[0].String())
+	}
+
+	for i := 1; i < p.Len(); i++ {
+		b.WriteString(",")
+		b.WriteString(p[i].String())
+	}
+
+	return b.String()
+}
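
A quick usage note: ID.String formats the underlying uint64 in hexadecimal, so the new IDSlice.String produces comma-separated hex values. A hypothetical example (values chosen for illustration):

	ids := types.IDSlice{1, 10}
	fmt.Println(ids.String()) // "1,a"

This is what makes the peer-ids fields in the new corruption-check logs below readable.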

server/etcdserver/corrupt.go

@@ -20,6 +20,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
+	"math"
 	"net/http"
 	"sort"
 	"strings"
@@ -258,57 +259,157 @@ func (cm *corruptionChecker) CompactHashCheck() {
 	)
 	hashes := cm.uncheckedRevisions()
 	// Assume that revisions are ordered from largest to smallest
-	for i, hash := range hashes {
+	for _, hash := range hashes {
 		peers := cm.hasher.PeerHashByRev(hash.Revision)
 		if len(peers) == 0 {
 			continue
 		}
-		peersChecked := 0
-		for _, p := range peers {
-			if p.resp == nil || p.resp.CompactRevision != hash.CompactRevision {
-				continue
-			}
-			// follower's compact revision is leader's old one, then hashes must match
-			if p.resp.Hash != hash.Hash {
-				cm.hasher.TriggerCorruptAlarm(p.id)
-				cm.lg.Error("failed compaction hash check",
-					zap.Int64("revision", hash.Revision),
-					zap.Int64("leader-compact-revision", hash.CompactRevision),
-					zap.Uint32("leader-hash", hash.Hash),
-					zap.Int64("follower-compact-revision", p.resp.CompactRevision),
-					zap.Uint32("follower-hash", p.resp.Hash),
-					zap.String("follower-peer-id", p.id.String()),
-				)
-				return
-			}
-			peersChecked++
-			cm.lg.Info("successfully checked hash on follower",
-				zap.Int64("revision", hash.Revision),
-				zap.String("peer-id", p.id.String()),
-			)
-		}
-		if len(peers) == peersChecked {
-			cm.lg.Info("successfully checked hash on whole cluster",
-				zap.Int("number-of-peers-checked", peersChecked),
-				zap.Int64("revision", hash.Revision),
-			)
-			cm.mux.Lock()
-			if hash.Revision > cm.latestRevisionChecked {
-				cm.latestRevisionChecked = hash.Revision
-			}
-			cm.mux.Unlock()
-			cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", i+1))
-			return
-		}
-		cm.lg.Warn("skipped revision in compaction hash check; was not able to check all peers",
-			zap.Int("number-of-peers-checked", peersChecked),
-			zap.Int("number-of-peers", len(peers)),
-			zap.Int64("revision", hash.Revision),
-		)
+		if cm.checkPeerHashes(hash, peers) {
+			return
+		}
 	}
 	cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", len(hashes)))
 	return
 }
+
+// check peers hash and raise alarms if detected corruption.
+// return a bool indicate whether to check next hash.
+//
+//	true: successfully checked hash on whole cluster or raised alarms, so no need to check next hash
+//	false: skipped some members, so need to check next hash
+func (cm *corruptionChecker) checkPeerHashes(leaderHash mvcc.KeyValueHash, peers []*peerHashKVResp) bool {
+	leaderId := cm.hasher.MemberId()
+	hashMap := map[uint32]types.IDSlice{leaderHash.Hash: {leaderId}}
+
+	peersChecked := 0
+	// group all peers by hash
+	for _, peer := range peers {
+		skipped := false
+		reason := ""
+
+		if peer.resp == nil {
+			skipped = true
+			reason = "no response"
+		} else if peer.resp.CompactRevision != leaderHash.CompactRevision {
+			skipped = true
+			reason = fmt.Sprintf("the peer's CompactRevision %d doesn't match leader's CompactRevision %d",
+				peer.resp.CompactRevision, leaderHash.CompactRevision)
+		}
+
+		if skipped {
+			cm.lg.Warn("Skipped peer's hash", zap.Int("number-of-peers", len(peers)),
+				zap.String("leader-id", leaderId.String()),
+				zap.String("peer-id", peer.id.String()),
+				zap.String("reason", reason))
+			continue
+		}
+
+		peersChecked++
+		if ids, ok := hashMap[peer.resp.Hash]; !ok {
+			hashMap[peer.resp.Hash] = []types.ID{peer.id}
+		} else {
+			ids = append(ids, peer.id)
+			hashMap[peer.resp.Hash] = ids
+		}
+	}
+
+	// All members have the same CompactRevision and Hash.
+	if len(hashMap) == 1 {
+		if peersChecked == len(peers) {
+			cm.lg.Info("successfully checked hash on whole cluster",
+				zap.Int("number-of-peers-checked", peersChecked),
+				zap.Int64("revision", leaderHash.Revision),
+				zap.Int64("compactRevision", leaderHash.CompactRevision),
+			)
+			cm.mux.Lock()
+			if leaderHash.Revision > cm.latestRevisionChecked {
+				cm.latestRevisionChecked = leaderHash.Revision
+			}
+			cm.mux.Unlock()
+			return true
+		}
+		cm.lg.Warn("skipped revision in compaction hash check; was not able to check all peers",
+			zap.Int("number-of-peers-checked", peersChecked),
+			zap.Int("number-of-peers", len(peers)),
+			zap.Int64("revision", leaderHash.Revision),
+			zap.Int64("compactRevision", leaderHash.CompactRevision),
+		)
+		// The only case which needs to check next hash
+		return false
+	}
+
+	// Detected hashes mismatch
+	// The first step is to figure out the majority with the same hash.
+	memberCnt := len(peers) + 1
+	quorum := memberCnt/2 + 1
+	quorumExist := false
+	for k, v := range hashMap {
+		if len(v) >= quorum {
+			quorumExist = true
+			// remove the majority, and we might raise alarms for the left members.
+			delete(hashMap, k)
+			break
+		}
+	}
+
+	if !quorumExist {
+		// If quorumExist doesn't exist, then only raise alarm for the least minority
+		cm.lg.Error("Detected compaction hash mismatch but can't identify the corrupted members, so only raise alarm for the least minority",
+			zap.String("leader-id", leaderId.String()),
+			zap.Int64("leader-revision", leaderHash.Revision),
+			zap.Int64("leader-compact-revision", leaderHash.CompactRevision),
+			zap.Uint32("leader-hash", leaderHash.Hash),
+		)
+
+		// find out the members which are the least minority
+		var (
+			minCnt    int = math.MaxInt
+			hashVal   uint32
+			memberIDs types.IDSlice
+		)
+		for k, v := range hashMap {
+			if v.Len() < minCnt {
+				minCnt = v.Len()
+				hashVal = k
+				memberIDs = v
+			}
+		}
+
+		for _, pid := range memberIDs {
+			cm.hasher.TriggerCorruptAlarm(pid)
+		}
+		delete(hashMap, hashVal)
+		cm.lg.Error("Detected compaction hash mismatch but can't identify the corrupted members, so only raise alarm for the least minority",
+			zap.String("leader-id", leaderId.String()),
+			zap.Int64("leader-revision", leaderHash.Revision),
+			zap.Int64("leader-compact-revision", leaderHash.CompactRevision),
+			zap.Uint32("leader-hash", leaderHash.Hash),
+			zap.Uint32("peer-hash", hashVal),
+			zap.String("peer-ids", memberIDs.String()),
+		)
+	}
+
+	// Raise alarm for the left members if the quorum is present.
+	// But we should always generate error log for debugging.
+	for k, v := range hashMap {
+		for _, pid := range v {
+			if quorumExist {
+				cm.hasher.TriggerCorruptAlarm(pid)
+			}
+		}
+		cm.lg.Error("Detected compaction hash mismatch",
+			zap.String("leader-id", leaderId.String()),
+			zap.Int64("leader-revision", leaderHash.Revision),
+			zap.Int64("leader-compact-revision", leaderHash.CompactRevision),
+			zap.Uint32("leader-hash", leaderHash.Hash),
+			zap.Uint32("peer-hash", k),
+			zap.String("peer-ids", v.String()),
+			zap.Bool("alarm-raised", quorumExist),
+		)
+	}
+	return true
+}
 
 func (cm *corruptionChecker) uncheckedRevisions() []mvcc.KeyValueHash {
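A note on the quorum arithmetic in checkPeerHashes above: memberCnt is len(peers)+1 because the leader counts itself alongside its peers, so quorum = (len(peers)+1)/2 + 1, i.e. 2 of 3 or 3 of 5. If some hash group reaches quorum, that group is trusted and every other group is alarmed. If no group reaches quorum, the checker cannot prove which members are corrupted, so it conservatively alarms only the least minority (the smallest group, which is the most likely to be corrupted) and still logs the remaining mismatching groups for debugging.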

server/etcdserver/corrupt_test.go

@@ -251,7 +251,7 @@ func TestCompactHashCheck(t *testing.T) {
 				hashes:     []mvcc.KeyValueHash{{Revision: 1}, {Revision: 2}},
 				peerHashes: []*peerHashKVResp{{err: fmt.Errorf("failed getting hash")}},
 			},
-			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "PeerHashByRev(1)"},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "PeerHashByRev(1)", "MemberId()"},
 		},
 		{
 			name: "Peer returned different compaction revision is skipped",
@@ -259,15 +259,34 @@ func TestCompactHashCheck(t *testing.T) {
 				hashes:     []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1}, {Revision: 2, CompactRevision: 2}},
 				peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{CompactRevision: 3}}},
 			},
-			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "PeerHashByRev(1)"},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "PeerHashByRev(1)", "MemberId()"},
 		},
 		{
 			name: "Peer returned same compaction revision but different hash triggers alarm",
 			hasher: fakeHasher{
-				hashes:     []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}},
-				peerHashes: []*peerHashKVResp{{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}}},
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				peerHashes: []*peerHashKVResp{
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+				},
 			},
-			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "TriggerCorruptAlarm(42)"},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(42)"},
 			expectCorrupt: true,
 		},
+		{
+			name: "Peer returned same compaction revision but different hash triggers alarm for the least minority",
+			hasher: fakeHasher{
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				peerHashes: []*peerHashKVResp{
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+					{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 44}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 45}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 46}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 4}},
+					{peerInfo: peerInfo{id: 47}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+				},
+			},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(46)"},
+			expectCorrupt: true,
+		},
 		{
@@ -276,7 +295,7 @@ func TestCompactHashCheck(t *testing.T) {
 				hashes:     []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 1}},
 				peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{MemberId: 42}, CompactRevision: 1, Hash: 1}}},
 			},
-			expectActions:             []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)"},
+			expectActions:             []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()"},
 			expectLastRevisionChecked: 2,
 		},
 		{
@@ -288,7 +307,7 @@ func TestCompactHashCheck(t *testing.T) {
 					{err: fmt.Errorf("failed getting hash")},
 				},
 			},
-			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)"},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)", "MemberId()"},
 		},
 	}
 	for _, tc := range tcs {
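
Tracing the new least-minority test case above: the cluster has 7 members (the leader, whose hash for revision 2 is 2, plus six peers), so quorum = 7/2 + 1 = 4. Grouping by hash gives {leader, 42, 47} with hash 2, {43, 44, 45} with hash 3, and {46} with hash 4. No group reaches 4, so no quorum exists, and only the smallest group is alarmed, which is why the expected action is TriggerCorruptAlarm(46) and expectCorrupt is true.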