etcd/raft/util.go
Tobias Schottdorf ad49c8fd98 raft: fix bug in unbounded log growth prevention mechanism
The previous code was using the proto-generated `Size()` method to
track the size of an incoming proposal at the leader. This includes
the Index and Term, which were mutated after the call to `Size()`
when appending to the log. Additionally, it was not taking into
account that an ignored configuration change would ignore the
original proposal and append an empty entry instead.

As a result, a fully committed Raft group could end up with a non-
zero tracked uncommitted Raft log counter that would eventually hit
the ceiling and drop all future proposals indiscriminately. It would
also immediately imply that proposals exceeding the threshold alone
would get refused (as the "first uncommitted proposal" gets special
treatment and is always allowed in).

Track only the size of the payload actually appended to the Raft log
instead.

For context, see:
https://github.com/cockroachdb/cockroach/issues/31618#issuecomment-431374938
2018-10-22 11:28:39 +02:00

146 lines
3.9 KiB
Go

// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package raft
import (
"bytes"
"fmt"
pb "go.etcd.io/etcd/raft/raftpb"
)
// MarshalJSON encodes the state as a quoted string: the text returned by
// st.String(), formatted with the %q verb. The returned error is always nil.
func (st StateType) MarshalJSON() ([]byte, error) {
	quoted := fmt.Sprintf("%q", st.String())
	return []byte(quoted), nil
}
// uint64Slice is a slice of uint64 values that provides the Len, Less and
// Swap methods needed to sort it in ascending order.
type uint64Slice []uint64

// Len reports the number of elements in the slice.
func (p uint64Slice) Len() int {
	return len(p)
}

// Less reports whether the element at index i is strictly smaller than the
// element at index j.
func (p uint64Slice) Less(i, j int) bool {
	left, right := p[i], p[j]
	return left < right
}

// Swap exchanges the elements at indexes i and j in place.
func (p uint64Slice) Swap(i, j int) {
	held := p[i]
	p[i] = p[j]
	p[j] = held
}
// min returns the smaller of the two uint64 arguments.
func min(a, b uint64) uint64 {
	smallest := a
	if b < smallest {
		smallest = b
	}
	return smallest
}
// max returns the larger of the two uint64 arguments.
func max(a, b uint64) uint64 {
	largest := b
	if a > largest {
		largest = a
	}
	return largest
}
// IsLocalMsg reports whether msgt is one of MsgHup, MsgBeat, MsgUnreachable,
// MsgSnapStatus or MsgCheckQuorum.
func IsLocalMsg(msgt pb.MessageType) bool {
	switch msgt {
	case pb.MsgHup, pb.MsgBeat, pb.MsgUnreachable, pb.MsgSnapStatus, pb.MsgCheckQuorum:
		return true
	default:
		return false
	}
}
// IsResponseMsg reports whether msgt is one of MsgAppResp, MsgVoteResp,
// MsgHeartbeatResp, MsgUnreachable or MsgPreVoteResp.
func IsResponseMsg(msgt pb.MessageType) bool {
	switch msgt {
	case pb.MsgAppResp, pb.MsgVoteResp, pb.MsgHeartbeatResp, pb.MsgUnreachable, pb.MsgPreVoteResp:
		return true
	default:
		return false
	}
}
// voteRespMsgType returns the response message type that corresponds to a
// vote request type: MsgVote yields MsgVoteResp and MsgPreVote yields
// MsgPreVoteResp. It panics when msgt is any other message type.
func voteRespMsgType(msgt pb.MessageType) pb.MessageType {
	if msgt == pb.MsgVote {
		return pb.MsgVoteResp
	}
	if msgt == pb.MsgPreVote {
		return pb.MsgPreVoteResp
	}
	panic(fmt.Sprintf("not a vote message: %s", msgt))
}
// EntryFormatter can be implemented by the application to provide
// human-readable formatting of entry data. It is handed the raw Data bytes of
// an entry and returns the text to display for them.
//
// Nil is a valid EntryFormatter: DescribeEntry then falls back to a default
// format that prints the data with the %q verb.
type EntryFormatter func([]byte) string
// DescribeMessage builds a concise, human-readable summary of a Message for
// debugging. The summary always begins with
// "From->To Type Term:N Log:LogTerm/Index" (From and To printed in hex).
// The following parts are appended only when present:
//   - " Rejected", plus "(Hint:N)" when RejectHint is non-zero, if Reject is set
//   - " Commit:N" if Commit is non-zero
//   - " Entries:[...]" with each entry rendered by DescribeEntry using f
//   - " Snapshot:..." if the snapshot is not empty according to IsEmptySnap
func DescribeMessage(m pb.Message, f EntryFormatter) string {
	var out bytes.Buffer
	fmt.Fprintf(&out, "%x->%x %v Term:%d Log:%d/%d", m.From, m.To, m.Type, m.Term, m.LogTerm, m.Index)

	if m.Reject {
		out.WriteString(" Rejected")
		if hint := m.RejectHint; hint != 0 {
			fmt.Fprintf(&out, "(Hint:%d)", hint)
		}
	}

	if commit := m.Commit; commit != 0 {
		fmt.Fprintf(&out, " Commit:%d", commit)
	}

	if len(m.Entries) > 0 {
		out.WriteString(" Entries:[")
		for idx := range m.Entries {
			// Separate entries with a comma, but do not lead with one.
			if idx > 0 {
				out.WriteString(", ")
			}
			out.WriteString(DescribeEntry(m.Entries[idx], f))
		}
		out.WriteString("]")
	}

	if !IsEmptySnap(m.Snapshot) {
		fmt.Fprintf(&out, " Snapshot:%v", m.Snapshot)
	}
	return out.String()
}
// PayloadSize returns the number of bytes in the entry's Data field. Only
// Data is counted, so the result is independent of the entry's Index and Term.
func PayloadSize(e pb.Entry) int {
	payload := e.Data
	return len(payload)
}
// DescribeEntry builds a concise, human-readable summary of an Entry for
// debugging, in the form "Term/Index Type Data". The data is rendered by f
// when f is non-nil and the entry is of type EntryNormal; in every other case
// it is printed with the %q verb.
func DescribeEntry(e pb.Entry, f EntryFormatter) string {
	var payload string
	if f == nil || e.Type != pb.EntryNormal {
		// No formatter supplied, or not a normal entry: use the default format.
		payload = fmt.Sprintf("%q", e.Data)
	} else {
		payload = f(e.Data)
	}
	return fmt.Sprintf("%d/%d %s %s", e.Term, e.Index, e.Type, payload)
}
// DescribeEntries renders every entry in ents with DescribeEntry (passing f
// through) and concatenates the results, terminating each one with a newline.
// An empty ents yields the empty string.
func DescribeEntries(ents []pb.Entry, f EntryFormatter) string {
	var out bytes.Buffer
	for idx := range ents {
		line := DescribeEntry(ents[idx], f)
		out.WriteString(line)
		out.WriteString("\n")
	}
	return out.String()
}
// limitSize returns the longest prefix of ents whose cumulative Size() does
// not exceed maxSize. The first entry is always kept, even when it alone is
// larger than maxSize, so a non-empty input never yields an empty result.
// An empty ents is returned unchanged.
func limitSize(ents []pb.Entry, maxSize uint64) []pb.Entry {
	if len(ents) == 0 {
		return ents
	}
	// The first entry is admitted unconditionally; the budget is only
	// checked for the entries that follow it.
	total := ents[0].Size()
	keep := 1
	for keep < len(ents) {
		total += ents[keep].Size()
		if uint64(total) > maxSize {
			break
		}
		keep++
	}
	return ents[:keep]
}