diff --git a/client/pkg/types/id.go b/client/pkg/types/id.go
index ae00388dd..9a8429391 100644
--- a/client/pkg/types/id.go
+++ b/client/pkg/types/id.go
@@ -14,7 +14,10 @@
 
 package types
 
-import "strconv"
+import (
+	"bytes"
+	"strconv"
+)
 
 // ID represents a generic identifier which is canonically
 // stored as a uint64 but is typically represented as a
@@ -37,3 +40,17 @@ type IDSlice []ID
 func (p IDSlice) Len() int           { return len(p) }
 func (p IDSlice) Less(i, j int) bool { return uint64(p[i]) < uint64(p[j]) }
 func (p IDSlice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
+
+func (p IDSlice) String() string {
+	var b bytes.Buffer
+	if p.Len() > 0 {
+		b.WriteString(p[0].String())
+	}
+
+	for i := 1; i < p.Len(); i++ {
+		b.WriteString(",")
+		b.WriteString(p[i].String())
+	}
+
+	return b.String()
+}
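For context, ID.String() renders the underlying uint64 in base-16 (the "typically represented as" form the comment above refers to), so the new IDSlice.String() yields a comma-separated list of hex IDs. A rough usage sketch, not part of the patch; the member IDs and the v3 client import path are assumptions:

package main

import (
	"fmt"

	"go.etcd.io/etcd/client/pkg/v3/types"
)

func main() {
	// Hypothetical member IDs; ID.String() formats each one as base-16.
	ids := types.IDSlice{0x2a, 0x2b, 0x1f0}
	fmt.Println(ids.String()) // prints: 2a,2b,1f0
}

A strings.Builder, or strings.Join over pre-rendered IDs, would produce the same output; bytes.Buffer is simply what the patch uses.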
diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go
index 7b72124e6..dd627796d 100644
--- a/server/etcdserver/corrupt.go
+++ b/server/etcdserver/corrupt.go
@@ -20,6 +20,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
+	"math"
 	"net/http"
 	"sort"
 	"strings"
@@ -258,57 +259,157 @@ func (cm *corruptionChecker) CompactHashCheck() {
 	)
 	hashes := cm.uncheckedRevisions()
 	// Assume that revisions are ordered from largest to smallest
-	for i, hash := range hashes {
+	for _, hash := range hashes {
 		peers := cm.hasher.PeerHashByRev(hash.Revision)
 		if len(peers) == 0 {
 			continue
 		}
-		peersChecked := 0
-		for _, p := range peers {
-			if p.resp == nil || p.resp.CompactRevision != hash.CompactRevision {
-				continue
-			}
-
-			// follower's compact revision is leader's old one, then hashes must match
-			if p.resp.Hash != hash.Hash {
-				cm.hasher.TriggerCorruptAlarm(p.id)
-				cm.lg.Error("failed compaction hash check",
-					zap.Int64("revision", hash.Revision),
-					zap.Int64("leader-compact-revision", hash.CompactRevision),
-					zap.Uint32("leader-hash", hash.Hash),
-					zap.Int64("follower-compact-revision", p.resp.CompactRevision),
-					zap.Uint32("follower-hash", p.resp.Hash),
-					zap.String("follower-peer-id", p.id.String()),
-				)
-				return
-			}
-			peersChecked++
-			cm.lg.Info("successfully checked hash on follower",
-				zap.Int64("revision", hash.Revision),
-				zap.String("peer-id", p.id.String()),
-			)
+		if cm.checkPeerHashes(hash, peers) {
+			return
 		}
-		if len(peers) == peersChecked {
+	}
+	cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", len(hashes)))
+	return
+}
+
+// checkPeerHashes checks the peers' hashes against the leader's hash and raises alarms if corruption is detected.
+// The returned bool tells the caller whether it can stop after this hash:
+//
+// true: successfully checked the hash on the whole cluster or raised alarms, so there is no need to check the next hash
+// false: skipped some members, so the next hash needs to be checked
+func (cm *corruptionChecker) checkPeerHashes(leaderHash mvcc.KeyValueHash, peers []*peerHashKVResp) bool {
+	leaderId := cm.hasher.MemberId()
+	hashMap := map[uint32]types.IDSlice{leaderHash.Hash: {leaderId}}
+
+	peersChecked := 0
+	// group all peers by hash
+	for _, peer := range peers {
+		skipped := false
+		reason := ""
+
+		if peer.resp == nil {
+			skipped = true
+			reason = "no response"
+		} else if peer.resp.CompactRevision != leaderHash.CompactRevision {
+			skipped = true
+			reason = fmt.Sprintf("the peer's CompactRevision %d doesn't match leader's CompactRevision %d",
+				peer.resp.CompactRevision, leaderHash.CompactRevision)
+		}
+		if skipped {
+			cm.lg.Warn("Skipped peer's hash", zap.Int("number-of-peers", len(peers)),
+				zap.String("leader-id", leaderId.String()),
+				zap.String("peer-id", peer.id.String()),
+				zap.String("reason", reason))
+			continue
+		}
+
+		peersChecked++
+		if ids, ok := hashMap[peer.resp.Hash]; !ok {
+			hashMap[peer.resp.Hash] = []types.ID{peer.id}
+		} else {
+			ids = append(ids, peer.id)
+			hashMap[peer.resp.Hash] = ids
+		}
+	}
+
+	// All members have the same CompactRevision and Hash.
+	if len(hashMap) == 1 {
+		if peersChecked == len(peers) {
 			cm.lg.Info("successfully checked hash on whole cluster",
 				zap.Int("number-of-peers-checked", peersChecked),
-				zap.Int64("revision", hash.Revision),
+				zap.Int64("revision", leaderHash.Revision),
+				zap.Int64("compactRevision", leaderHash.CompactRevision),
 			)
 			cm.mux.Lock()
-			if hash.Revision > cm.latestRevisionChecked {
-				cm.latestRevisionChecked = hash.Revision
+			if leaderHash.Revision > cm.latestRevisionChecked {
+				cm.latestRevisionChecked = leaderHash.Revision
 			}
 			cm.mux.Unlock()
-			cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", i+1))
-			return
+			return true
 		}
 		cm.lg.Warn("skipped revision in compaction hash check; was not able to check all peers",
 			zap.Int("number-of-peers-checked", peersChecked),
 			zap.Int("number-of-peers", len(peers)),
-			zap.Int64("revision", hash.Revision),
+			zap.Int64("revision", leaderHash.Revision),
+			zap.Int64("compactRevision", leaderHash.CompactRevision),
+		)
+		// This is the only case in which the next hash still needs to be checked.
+		return false
+	}
+
+	// Detected a hash mismatch.
+	// The first step is to figure out the majority with the same hash.
+	memberCnt := len(peers) + 1
+	quorum := memberCnt/2 + 1
+	quorumExist := false
+	for k, v := range hashMap {
+		if len(v) >= quorum {
+			quorumExist = true
+			// Remove the majority; alarms may be raised for the remaining members below.
+			delete(hashMap, k)
+			break
+		}
+	}
+
+	if !quorumExist {
+		// If no quorum exists, only raise an alarm for the least minority.
+		cm.lg.Error("Detected compaction hash mismatch but can't identify the corrupted members, so only raise alarm for the least minority",
+			zap.String("leader-id", leaderId.String()),
+			zap.Int64("leader-revision", leaderHash.Revision),
+			zap.Int64("leader-compact-revision", leaderHash.CompactRevision),
+			zap.Uint32("leader-hash", leaderHash.Hash),
+		)
+
+		// Find the members that form the least minority.
+		var (
+			minCnt    int = math.MaxInt
+			hashVal   uint32
+			memberIDs types.IDSlice
+		)
+
+		for k, v := range hashMap {
+			if v.Len() < minCnt {
+				minCnt = v.Len()
+				hashVal = k
+				memberIDs = v
+			}
+		}
+
+		for _, pid := range memberIDs {
+			cm.hasher.TriggerCorruptAlarm(pid)
+		}
+		delete(hashMap, hashVal)
+
+		cm.lg.Error("Detected compaction hash mismatch but can't identify the corrupted members, so only raise alarm for the least minority",
+			zap.String("leader-id", leaderId.String()),
+			zap.Int64("leader-revision", leaderHash.Revision),
+			zap.Int64("leader-compact-revision", leaderHash.CompactRevision),
+			zap.Uint32("leader-hash", leaderHash.Hash),
+			zap.Uint32("peer-hash", hashVal),
+			zap.String("peer-ids", memberIDs.String()),
 		)
 	}
-	cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", len(hashes)))
-	return
+
+	// Raise alarms for the remaining members if a quorum is present,
+	// but always generate an error log for debugging.
+	for k, v := range hashMap {
+		for _, pid := range v {
+			if quorumExist {
+				cm.hasher.TriggerCorruptAlarm(pid)
+			}
+		}
+		cm.lg.Error("Detected compaction hash mismatch",
+			zap.String("leader-id", leaderId.String()),
+			zap.Int64("leader-revision", leaderHash.Revision),
+			zap.Int64("leader-compact-revision", leaderHash.CompactRevision),
+			zap.Uint32("leader-hash", leaderHash.Hash),
+			zap.Uint32("peer-hash", k),
+			zap.String("peer-ids", v.String()),
+			zap.Bool("alarm-raised", quorumExist),
+		)
+	}
+
+	return true
 }
 
 func (cm *corruptionChecker) uncheckedRevisions() []mvcc.KeyValueHash {
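To make the new decision rule easier to follow, here is a minimal, self-contained sketch (not part of the patch) of the selection logic checkPeerHashes applies once the leader and its peers are grouped by hash. The member IDs, hashes, and plain-map representation are illustrative only; in the patch the grouping lives in hashMap and the alarms go through cm.hasher.TriggerCorruptAlarm:

package main

import (
	"fmt"
	"math"
)

func main() {
	// hash value -> members that reported it (the leader is included, as in hashMap above).
	groups := map[uint32][]uint64{
		2: {1, 42, 47}, // hypothetical leader id 1 plus two followers
		3: {43, 44, 45},
		4: {46},
	}

	memberCnt := 7            // 6 peers + the leader
	quorum := memberCnt/2 + 1 // 4

	// If one group reaches quorum, its hash is trusted and every member outside it gets alarmed.
	for hash, members := range groups {
		if len(members) >= quorum {
			fmt.Printf("quorum on hash %d: alarm all members outside %v\n", hash, members)
			return
		}
	}

	// No quorum: alarm only the smallest group (the "least minority"), nobody else.
	minCnt, minHash := math.MaxInt, uint32(0)
	for hash, members := range groups {
		if len(members) < minCnt {
			minCnt, minHash = len(members), hash
		}
	}
	fmt.Printf("no quorum: alarm only members %v (hash %d)\n", groups[minHash], minHash)
}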
diff --git a/server/etcdserver/corrupt_test.go b/server/etcdserver/corrupt_test.go
index 93942051c..f8274fe16 100644
--- a/server/etcdserver/corrupt_test.go
+++ b/server/etcdserver/corrupt_test.go
@@ -251,7 +251,7 @@ func TestCompactHashCheck(t *testing.T) {
 				hashes:     []mvcc.KeyValueHash{{Revision: 1}, {Revision: 2}},
 				peerHashes: []*peerHashKVResp{{err: fmt.Errorf("failed getting hash")}},
 			},
-			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "PeerHashByRev(1)"},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "PeerHashByRev(1)", "MemberId()"},
 		},
 		{
 			name: "Peer returned different compaction revision is skipped",
@@ -259,15 +259,34 @@
 				hashes:     []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1}, {Revision: 2, CompactRevision: 2}},
 				peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{CompactRevision: 3}}},
 			},
-			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "PeerHashByRev(1)"},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "PeerHashByRev(1)", "MemberId()"},
 		},
 		{
 			name: "Peer returned same compaction revision but different hash triggers alarm",
 			hasher: fakeHasher{
-				hashes:     []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}},
-				peerHashes: []*peerHashKVResp{{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}}},
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				peerHashes: []*peerHashKVResp{
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+				},
 			},
-			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "TriggerCorruptAlarm(42)"},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(42)"},
+			expectCorrupt: true,
+		},
+		{
+			name: "Peer returned same compaction revision but different hash triggers alarm for the least minority",
+			hasher: fakeHasher{
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				peerHashes: []*peerHashKVResp{
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+					{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 44}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 45}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 46}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 4}},
+					{peerInfo: peerInfo{id: 47}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+				},
+			},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(46)"},
 			expectCorrupt: true,
 		},
 		{
@@ -276,7 +295,7 @@ func TestCompactHashCheck(t *testing.T) {
 				hashes:     []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 1}},
 				peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{MemberId: 42}, CompactRevision: 1, Hash: 1}}},
 			},
-			expectActions:             []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)"},
+			expectActions:             []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()"},
 			expectLastRevisionChecked: 2,
 		},
 		{
@@ -288,7 +307,7 @@ func TestCompactHashCheck(t *testing.T) {
 					{err: fmt.Errorf("failed getting hash")},
 				},
 			},
-			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)"},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)", "MemberId()"},
 		},
 	}
 	for _, tc := range tcs {
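Worked expectation for the new "least minority" case above: with 6 peers plus the leader the cluster has 7 members, so quorum = 7/2 + 1 = 4. For revision 2 the leader reports hash 2, giving the groups hash 2 -> {leader, 42, 47}, hash 3 -> {43, 44, 45} and hash 4 -> {46}; no group reaches 4, so only the smallest group is alarmed, which is why the single alarm action is TriggerCorruptAlarm(46). The extra "MemberId()" entries in the expectActions lists come from checkPeerHashes calling cm.hasher.MemberId() once for every revision it evaluates.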