Merge pull request #14828 from ahrtr/identify_corrupted_member_20221123

Identify corrupted member depending on quorum
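The compact hash checker previously raised a CORRUPT alarm against the first peer whose hash differed from the leader's, so a corrupted leader could get a healthy follower blamed. With this change the checker groups all members (leader included) by the hash they report at a common compact revision, treats the hash held by a quorum (memberCnt/2 + 1, e.g. 3 of 5) as correct, and raises alarms only against the members outside that group, including the leader itself when it is the odd one out. If no hash reaches quorum, the corrupted members cannot be identified and a cluster-wide alarm (member ID 0) is raised instead.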
Benjamin Wang 2022-11-29 06:08:25 +08:00 committed by GitHub
commit cf171fdd1d
4 changed files with 308 additions and 83 deletions


@@ -14,7 +14,10 @@
package types
import "strconv"
import (
"bytes"
"strconv"
)
// ID represents a generic identifier which is canonically
// stored as a uint64 but is typically represented as a
@@ -37,3 +40,17 @@ type IDSlice []ID
func (p IDSlice) Len() int { return len(p) }
func (p IDSlice) Less(i, j int) bool { return uint64(p[i]) < uint64(p[j]) }
func (p IDSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p IDSlice) String() string {
var b bytes.Buffer
if p.Len() > 0 {
b.WriteString(p[0].String())
}
for i := 1; i < p.Len(); i++ {
b.WriteString(",")
b.WriteString(p[i].String())
}
return b.String()
}
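The new IDSlice.String() helper is what later lets the corruption checker log every member sharing a given hash in a single peer-ids field. Below is a minimal standalone sketch of the same joining behavior, re-declaring the types locally instead of importing the etcd types package:

package main

import (
	"bytes"
	"fmt"
	"strconv"
)

// Local stand-ins that mirror types.ID / types.IDSlice from the diff above.
type ID uint64

func (i ID) String() string { return strconv.FormatUint(uint64(i), 16) }

type IDSlice []ID

// String joins the IDs with commas, like the method added above.
func (p IDSlice) String() string {
	var b bytes.Buffer
	if len(p) > 0 {
		b.WriteString(p[0].String())
	}
	for i := 1; i < len(p); i++ {
		b.WriteString(",")
		b.WriteString(p[i].String())
	}
	return b.String()
}

func main() {
	ids := IDSlice{42, 43, 44}
	fmt.Println(ids.String()) // prints "2a,2b,2c" (IDs render as hex)
}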


@@ -251,6 +251,17 @@ func (cm *corruptionChecker) PeriodicCheck() error {
return nil
}
// CompactHashCheck is based on the fact that 'compactions' are coordinated
// between raft members and performed at the same revision. For each compacted
// revision there is a KV store hash computed and saved for some time.
//
// This method communicates with peers to find a recent common revision across
// members, and raises an alarm if two or more members at the same compact
// revision have different hashes.
//
// We might miss the opportunity to perform the check if the compaction is still
// ongoing on one of the members or a member was unresponsive. In such a
// situation the method still passes without raising an alarm.
func (cm *corruptionChecker) CompactHashCheck() {
cm.lg.Info("starting compact hash check",
zap.String("local-member-id", cm.hasher.MemberId().String()),
@@ -258,59 +269,138 @@ func (cm *corruptionChecker) CompactHashCheck() {
)
hashes := cm.uncheckedRevisions()
// Assume that revisions are ordered from largest to smallest
for i, hash := range hashes {
for _, hash := range hashes {
peers := cm.hasher.PeerHashByRev(hash.Revision)
if len(peers) == 0 {
continue
}
peersChecked := 0
for _, p := range peers {
if p.resp == nil || p.resp.CompactRevision != hash.CompactRevision {
continue
}
// the follower's compact revision matches the leader's, so the hashes must match
if p.resp.Hash != hash.Hash {
cm.hasher.TriggerCorruptAlarm(p.id)
cm.lg.Error("failed compaction hash check",
zap.Int64("revision", hash.Revision),
zap.Int64("leader-compact-revision", hash.CompactRevision),
zap.Uint32("leader-hash", hash.Hash),
zap.Int64("follower-compact-revision", p.resp.CompactRevision),
zap.Uint32("follower-hash", p.resp.Hash),
zap.String("follower-peer-id", p.id.String()),
)
return
}
peersChecked++
cm.lg.Info("successfully checked hash on follower",
zap.Int64("revision", hash.Revision),
zap.String("peer-id", p.id.String()),
)
}
if len(peers) == peersChecked {
cm.lg.Info("successfully checked hash on whole cluster",
zap.Int("number-of-peers-checked", peersChecked),
zap.Int64("revision", hash.Revision),
)
cm.mux.Lock()
if hash.Revision > cm.latestRevisionChecked {
cm.latestRevisionChecked = hash.Revision
}
cm.mux.Unlock()
cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", i+1))
if cm.checkPeerHashes(hash, peers) {
return
}
cm.lg.Warn("skipped revision in compaction hash check; was not able to check all peers",
zap.Int("number-of-peers-checked", peersChecked),
zap.Int("number-of-peers", len(peers)),
zap.Int64("revision", hash.Revision),
)
}
cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", len(hashes)))
return
}
// checkPeerHashes compares the peers' hashes against the leader's hash and
// raises alarms if corruption is detected. It returns a bool indicating
// whether the caller can stop checking:
//
// true: the hash was successfully checked on the whole cluster, or alarms were raised, so there is no need to check the next hash
// false: some members were skipped, so the next hash needs to be checked
func (cm *corruptionChecker) checkPeerHashes(leaderHash mvcc.KeyValueHash, peers []*peerHashKVResp) bool {
leaderId := cm.hasher.MemberId()
hash2members := map[uint32]types.IDSlice{leaderHash.Hash: {leaderId}}
peersChecked := 0
// group all peers by hash
for _, peer := range peers {
skipped := false
reason := ""
if peer.resp == nil {
skipped = true
reason = "no response"
} else if peer.resp.CompactRevision != leaderHash.CompactRevision {
skipped = true
reason = fmt.Sprintf("the peer's CompactRevision %d doesn't match leader's CompactRevision %d",
peer.resp.CompactRevision, leaderHash.CompactRevision)
}
if skipped {
cm.lg.Warn("Skipped peer's hash", zap.Int("number-of-peers", len(peers)),
zap.String("leader-id", leaderId.String()),
zap.String("peer-id", peer.id.String()),
zap.String("reason", reason))
continue
}
peersChecked++
if ids, ok := hash2members[peer.resp.Hash]; !ok {
hash2members[peer.resp.Hash] = []types.ID{peer.id}
} else {
ids = append(ids, peer.id)
hash2members[peer.resp.Hash] = ids
}
}
// All members have the same CompactRevision and Hash.
if len(hash2members) == 1 {
return cm.handleConsistentHash(leaderHash, peersChecked, len(peers))
}
// Detected a hash mismatch.
// The first step is to figure out which hash, if any, is held by a majority of members.
memberCnt := len(peers) + 1
quorum := memberCnt/2 + 1
quorumExist := false
for k, v := range hash2members {
if len(v) >= quorum {
quorumExist = true
// remove the majority; alarms may be raised for the remaining members.
delete(hash2members, k)
break
}
}
if !quorumExist {
// If no quorum exists, we don't know which members' data are
// corrupted. In such a situation we intentionally set the memberID
// to 0, meaning the alarm affects the whole cluster.
cm.lg.Error("Detected compaction hash mismatch but cannot identify the corrupted members, so intentionally set the memberID as 0",
zap.String("leader-id", leaderId.String()),
zap.Int64("leader-revision", leaderHash.Revision),
zap.Int64("leader-compact-revision", leaderHash.CompactRevision),
zap.Uint32("leader-hash", leaderHash.Hash),
)
cm.hasher.TriggerCorruptAlarm(0)
}
// Raise alarms for the remaining members if a quorum is present,
// but always generate an error log for debugging.
for k, v := range hash2members {
if quorumExist {
for _, pid := range v {
cm.hasher.TriggerCorruptAlarm(pid)
}
}
cm.lg.Error("Detected compaction hash mismatch",
zap.String("leader-id", leaderId.String()),
zap.Int64("leader-revision", leaderHash.Revision),
zap.Int64("leader-compact-revision", leaderHash.CompactRevision),
zap.Uint32("leader-hash", leaderHash.Hash),
zap.Uint32("peer-hash", k),
zap.String("peer-ids", v.String()),
zap.Bool("quorum-exist", quorumExist),
)
}
return true
}
func (cm *corruptionChecker) handleConsistentHash(hash mvcc.KeyValueHash, peersChecked, peerCnt int) bool {
if peersChecked == peerCnt {
cm.lg.Info("successfully checked hash on whole cluster",
zap.Int("number-of-peers-checked", peersChecked),
zap.Int64("revision", hash.Revision),
zap.Int64("compactRevision", hash.CompactRevision),
)
cm.mux.Lock()
if hash.Revision > cm.latestRevisionChecked {
cm.latestRevisionChecked = hash.Revision
}
cm.mux.Unlock()
return true
}
cm.lg.Warn("skipped revision in compaction hash check; was not able to check all peers",
zap.Int("number-of-peers-checked", peersChecked),
zap.Int("number-of-peers", peerCnt),
zap.Int64("revision", hash.Revision),
zap.Int64("compactRevision", hash.CompactRevision),
)
// This is the only case in which the next hash needs to be checked.
return false
}
func (cm *corruptionChecker) uncheckedRevisions() []mvcc.KeyValueHash {
cm.mux.RLock()
lastRevisionChecked := cm.latestRevisionChecked
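Taken together, CompactHashCheck now defers the per-revision decision to checkPeerHashes: the leader's own hash is counted alongside the responsive peers, members are grouped by hash, and quorum = memberCnt/2 + 1 decides whether a corrupted minority can be singled out. The sketch below reproduces only that decision rule with made-up member IDs and hashes; the real code additionally skips peers with no response or a mismatching CompactRevision before grouping, and always writes error logs for the mismatching groups.

package main

import "fmt"

// classify mirrors the quorum rule in checkPeerHashes: group members by the
// hash they reported for a common compact revision, find a hash held by at
// least quorum = n/2+1 members, and report every member outside that group as
// corrupted. If no hash reaches quorum, member ID 0 (the whole cluster) is
// reported instead.
func classify(hashByMember map[uint64]uint32) (corrupted []uint64) {
	membersByHash := map[uint32][]uint64{}
	for id, h := range hashByMember {
		membersByHash[h] = append(membersByHash[h], id)
	}
	quorum := len(hashByMember)/2 + 1
	quorumHash, quorumExists := uint32(0), false
	for h, ids := range membersByHash {
		if len(ids) >= quorum {
			quorumHash, quorumExists = h, true
			break
		}
	}
	if !quorumExists {
		return []uint64{0} // cannot tell which members are corrupted
	}
	for h, ids := range membersByHash {
		if h != quorumHash {
			corrupted = append(corrupted, ids...)
		}
	}
	return corrupted
}

func main() {
	// 5-member cluster: the leader (ID 1) and peers 42/43 agree on hash 2,
	// peers 44 and 45 diverge. Quorum is 3, so 44 and 45 are flagged.
	fmt.Println(classify(map[uint64]uint32{1: 2, 42: 2, 43: 2, 44: 7, 45: 8}))

	// 3-member cluster where every member reports a different hash:
	// no quorum, so the whole cluster (member ID 0) is flagged.
	fmt.Println(classify(map[uint64]uint32{1: 1, 2: 2, 3: 3}))
}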


@@ -251,7 +251,7 @@ func TestCompactHashCheck(t *testing.T) {
hashes: []mvcc.KeyValueHash{{Revision: 1}, {Revision: 2}},
peerHashes: []*peerHashKVResp{{err: fmt.Errorf("failed getting hash")}},
},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "PeerHashByRev(1)"},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "PeerHashByRev(1)", "MemberId()"},
},
{
name: "Peer returned different compaction revision is skipped",
@@ -259,15 +259,112 @@
hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1}, {Revision: 2, CompactRevision: 2}},
peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{CompactRevision: 3}}},
},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "PeerHashByRev(1)"},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "PeerHashByRev(1)", "MemberId()"},
},
{
name: "Peer returned same compaction revision but different hash triggers alarm",
name: "Etcd can identify two corrupted members in 5 member cluster",
hasher: fakeHasher{
hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}},
peerHashes: []*peerHashKVResp{{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}}},
hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}},
peerHashes: []*peerHashKVResp{
{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
{peerInfo: peerInfo{id: 44}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 7}},
{peerInfo: peerInfo{id: 45}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 8}},
},
},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "TriggerCorruptAlarm(42)"},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(44)", "TriggerCorruptAlarm(45)"},
expectCorrupt: true,
},
{
name: "Etcd checks next hash when one member is unresponsive in 3 member cluster",
hasher: fakeHasher{
hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 2}, {Revision: 2, CompactRevision: 1, Hash: 2}},
peerHashes: []*peerHashKVResp{
{err: fmt.Errorf("failed getting hash")},
{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
},
},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "PeerHashByRev(1)", "MemberId()"},
expectCorrupt: false,
},
{
name: "Etcd can identify single corrupted member in 3 member cluster",
hasher: fakeHasher{
hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 2}, {Revision: 2, CompactRevision: 1, Hash: 2}},
peerHashes: []*peerHashKVResp{
{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
},
},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(43)"},
expectCorrupt: true,
},
{
name: "Etcd can identify single corrupted member in 5 member cluster",
hasher: fakeHasher{
hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 2}, {Revision: 2, CompactRevision: 1, Hash: 2}},
peerHashes: []*peerHashKVResp{
{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
{peerInfo: peerInfo{id: 44}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
{peerInfo: peerInfo{id: 45}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
},
},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(44)"},
expectCorrupt: true,
},
{
name: "Etcd triggers corrupted alarm on whole cluster if in 3 member cluster one member is down and one member corrupted",
hasher: fakeHasher{
hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 2}, {Revision: 2, CompactRevision: 1, Hash: 2}},
peerHashes: []*peerHashKVResp{
{err: fmt.Errorf("failed getting hash")},
{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
},
},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(0)"},
expectCorrupt: true,
},
{
name: "Etcd triggers corrupted alarm on whole cluster if no quorum in 5 member cluster",
hasher: fakeHasher{
hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}},
peerHashes: []*peerHashKVResp{
{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
{peerInfo: peerInfo{id: 44}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
{peerInfo: peerInfo{id: 45}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
{peerInfo: peerInfo{id: 46}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 4}},
{peerInfo: peerInfo{id: 47}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
},
},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(0)"},
expectCorrupt: true,
},
{
name: "Etcd can identify corrupted member in 5 member cluster even if one member is down",
hasher: fakeHasher{
hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 2}, {Revision: 2, CompactRevision: 1, Hash: 2}},
peerHashes: []*peerHashKVResp{
{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
{err: fmt.Errorf("failed getting hash")},
{peerInfo: peerInfo{id: 44}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
{peerInfo: peerInfo{id: 45}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
},
},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(44)"},
expectCorrupt: true,
},
{
name: "Etcd can identify that leader is corrupted",
hasher: fakeHasher{
hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 2}, {Revision: 2, CompactRevision: 1, Hash: 2}},
peerHashes: []*peerHashKVResp{
{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
},
},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(1)"},
expectCorrupt: true,
},
{
@@ -276,7 +373,7 @@ func TestCompactHashCheck(t *testing.T) {
hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 1}},
peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{MemberId: 42}, CompactRevision: 1, Hash: 1}}},
},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)"},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()"},
expectLastRevisionChecked: 2,
},
{
@@ -288,7 +385,7 @@ func TestCompactHashCheck(t *testing.T) {
{err: fmt.Errorf("failed getting hash")},
},
},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)"},
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)", "MemberId()"},
},
}
for _, tc := range tcs {


@@ -93,30 +93,13 @@ func TestPeriodicCheckDetectsCorruption(t *testing.T) {
time.Sleep(50 * time.Millisecond)
leader := clus.WaitLeader(t)
// Get sorted member IDs
members, err := cc.MemberList(ctx)
assert.NoError(t, err, "error on member list %v")
// NOTE: If the corrupted member has been elected as leader, the
// alarm will show the smaller member.
var expectedID = uint64(clus.Members[0].ID())
if leader == 0 {
for _, m := range members.Members {
if m.Name != clus.Members[0].Name {
expectedID = m.ID
break
}
}
}
err = clus.Members[leader].Server.CorruptionChecker().PeriodicCheck()
assert.NoError(t, err, "error on periodic check")
time.Sleep(50 * time.Millisecond)
alarmResponse, err := cc.AlarmList(ctx)
assert.NoError(t, err, "error on alarm list")
assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: expectedID}}, alarmResponse.Alarms)
assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: uint64(clus.Members[0].ID())}}, alarmResponse.Alarms)
}
func TestCompactHashCheck(t *testing.T) {
@@ -186,26 +169,64 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) {
time.Sleep(50 * time.Millisecond)
leader := clus.WaitLeader(t)
// Get sorted member IDs
members, err := cc.MemberList(ctx)
assert.NoError(t, err, "error on member list %v")
clus.Members[leader].Server.CorruptionChecker().CompactHashCheck()
time.Sleep(50 * time.Millisecond)
alarmResponse, err := cc.AlarmList(ctx)
assert.NoError(t, err, "error on alarm list")
assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: uint64(clus.Members[0].ID())}}, alarmResponse.Alarms)
}
// NOTE: If the corrupted member has been elected as leader, the
// alarm will show the smaller member.
var expectedID = uint64(clus.Members[0].ID())
if leader == 0 {
for _, m := range members.Members {
if m.Name != clus.Members[0].Name {
expectedID = m.ID
break
}
}
func TestCompactHashCheckDetectMultipleCorruption(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 5})
defer clus.Terminate(t)
cc, err := clus.ClusterClient(t)
require.NoError(t, err)
ctx := context.Background()
for i := 0; i < 10; i++ {
_, err := cc.Put(ctx, testutil.PickKey(int64(i)), fmt.Sprint(i))
assert.NoError(t, err, "error on put")
}
clus.Members[0].Server.CorruptionChecker().CompactHashCheck()
clus.Members[0].Stop(t)
clus.Members[1].Server.CorruptionChecker().CompactHashCheck()
clus.Members[1].Stop(t)
clus.WaitLeader(t)
err = testutil.CorruptBBolt(clus.Members[0].BackendPath())
require.NoError(t, err)
err = testutil.CorruptBBolt(clus.Members[1].BackendPath())
require.NoError(t, err)
err = clus.Members[0].Restart(t)
require.NoError(t, err)
err = clus.Members[1].Restart(t)
require.NoError(t, err)
_, err = cc.Compact(ctx, 5)
require.NoError(t, err)
time.Sleep(50 * time.Millisecond)
leader := clus.WaitLeader(t)
clus.Members[leader].Server.CorruptionChecker().CompactHashCheck()
time.Sleep(50 * time.Millisecond)
alarmResponse, err := cc.AlarmList(ctx)
assert.NoError(t, err, "error on alarm list")
assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: expectedID}}, alarmResponse.Alarms)
expectedAlarmMap := map[uint64]etcdserverpb.AlarmType{
uint64(clus.Members[0].ID()): etcdserverpb.AlarmType_CORRUPT,
uint64(clus.Members[1].ID()): etcdserverpb.AlarmType_CORRUPT,
}
actualAlarmMap := make(map[uint64]etcdserverpb.AlarmType)
for _, alarm := range alarmResponse.Alarms {
actualAlarmMap[alarm.MemberID] = alarm.Alarm
}
require.Equal(t, expectedAlarmMap, actualAlarmMap)
}