mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
raft: provide protection against unbounded Raft log growth
The suggested pattern for Raft proposals is that they be retried periodically until they succeed. This turns out to be an issue when a leader cannot commit entries because the leader will continue to append re-proposed entries to its log without committing anything. This can result in the uncommitted tail of a leader's log growing without bound until it is able to commit entries. This change add a safeguard to protect against this case where a leader's log can grow without bound during loss of quorum scenarios. It does so by introducing a new, optional ``MaxUncommittedEntriesSize configuration. This config limits the max aggregate size of uncommitted entries that may be appended to a leader's log. Once this limit is exceeded, proposals will begin to return ErrProposalDropped errors. See cockroachdb/cockroach#27772
This commit is contained in:
parent
b046a37256
commit
f89b06dc6d
@ -274,12 +274,13 @@ func (rc *raftNode) startRaft() {
|
||||
rpeers[i] = raft.Peer{ID: uint64(i + 1)}
|
||||
}
|
||||
c := &raft.Config{
|
||||
ID: uint64(rc.id),
|
||||
ElectionTick: 10,
|
||||
HeartbeatTick: 1,
|
||||
Storage: rc.raftStorage,
|
||||
MaxSizePerMsg: 1024 * 1024,
|
||||
MaxInflightMsgs: 256,
|
||||
ID: uint64(rc.id),
|
||||
ElectionTick: 10,
|
||||
HeartbeatTick: 1,
|
||||
Storage: rc.raftStorage,
|
||||
MaxSizePerMsg: 1024 * 1024,
|
||||
MaxInflightMsgs: 256,
|
||||
MaxUncommittedEntriesSize: 1 << 30,
|
||||
}
|
||||
|
||||
if oldwal {
|
||||
|
@ -41,6 +41,7 @@ This raft implementation also includes a few optional enhancements:
|
||||
- Writing to leader's disk in parallel
|
||||
- Internal proposal redirection from followers to leader
|
||||
- Automatic stepping down when the leader loses quorum
|
||||
- Protection against unbounded log growth when quorum is lost
|
||||
|
||||
## Notable Users
|
||||
|
||||
|
@ -401,6 +401,7 @@ func (n *node) run(r *raft) {
|
||||
|
||||
r.msgs = nil
|
||||
r.readStates = nil
|
||||
r.reduceUncommittedSize(rd.CommittedEntries)
|
||||
advancec = n.advancec
|
||||
case <-advancec:
|
||||
if applyingToI != 0 {
|
||||
|
@ -997,3 +997,57 @@ func TestNodeCommitPaginationAfterRestart(t *testing.T) {
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// TestNodeBoundedLogGrowthWithPartition tests a scenario where a leader is
|
||||
// partitioned from a quorum of nodes. It verifies that the leader's log is
|
||||
// protected from unbounded growth even as new entries continue to be proposed.
|
||||
// This protection is provided by the MaxUncommittedEntriesSize configuration.
|
||||
func TestNodeBoundedLogGrowthWithPartition(t *testing.T) {
|
||||
const maxEntries = 16
|
||||
data := []byte("testdata")
|
||||
testEntry := raftpb.Entry{Data: data}
|
||||
maxEntrySize := uint64(maxEntries * testEntry.Size())
|
||||
|
||||
s := NewMemoryStorage()
|
||||
cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
|
||||
cfg.MaxUncommittedEntriesSize = maxEntrySize
|
||||
r := newRaft(cfg)
|
||||
n := newNode()
|
||||
go n.run(r)
|
||||
defer n.Stop()
|
||||
n.Campaign(context.TODO())
|
||||
|
||||
rd := readyWithTimeout(&n)
|
||||
if len(rd.CommittedEntries) != 1 {
|
||||
t.Fatalf("expected 1 (empty) entry, got %d", len(rd.CommittedEntries))
|
||||
}
|
||||
s.Append(rd.Entries)
|
||||
n.Advance()
|
||||
|
||||
// Simulate a network partition while we make our proposals by never
|
||||
// committing anything. These proposals should not cause the leader's
|
||||
// log to grow indefinitely.
|
||||
for i := 0; i < 1024; i++ {
|
||||
n.Propose(context.TODO(), data)
|
||||
}
|
||||
|
||||
// Check the size of leader's uncommitted log tail. It should not exceed the
|
||||
// MaxUncommittedEntriesSize limit.
|
||||
checkUncommitted := func(exp uint64) {
|
||||
t.Helper()
|
||||
if a := r.uncommittedSize; exp != a {
|
||||
t.Fatalf("expected %d uncommitted entry bytes, found %d", exp, a)
|
||||
}
|
||||
}
|
||||
checkUncommitted(maxEntrySize)
|
||||
|
||||
// Recover from the partition. The uncommitted tail of the Raft log should
|
||||
// disappear as entries are committed.
|
||||
rd = readyWithTimeout(&n)
|
||||
if len(rd.CommittedEntries) != maxEntries {
|
||||
t.Fatalf("expected %d entries, got %d", maxEntries, len(rd.CommittedEntries))
|
||||
}
|
||||
s.Append(rd.Entries)
|
||||
n.Advance()
|
||||
checkUncommitted(0)
|
||||
}
|
||||
|
83
raft/raft.go
83
raft/raft.go
@ -148,12 +148,17 @@ type Config struct {
|
||||
// applied entries. This is a very application dependent configuration.
|
||||
Applied uint64
|
||||
|
||||
// MaxSizePerMsg limits the max size of each append message. Smaller value
|
||||
// lowers the raft recovery cost(initial probing and message lost during normal
|
||||
// operation). On the other side, it might affect the throughput during normal
|
||||
// replication. Note: math.MaxUint64 for unlimited, 0 for at most one entry per
|
||||
// message.
|
||||
// MaxSizePerMsg limits the max byte size of each append message. Smaller
|
||||
// value lowers the raft recovery cost(initial probing and message lost
|
||||
// during normal operation). On the other side, it might affect the
|
||||
// throughput during normal replication. Note: math.MaxUint64 for unlimited,
|
||||
// 0 for at most one entry per message.
|
||||
MaxSizePerMsg uint64
|
||||
// MaxUncommittedEntriesSize limits the aggregate byte size of the
|
||||
// uncommitted entries that may be appended to a leader's log. Once this
|
||||
// limit is exceeded, proposals will begin to return ErrProposalDropped
|
||||
// errors. Note: 0 for no limit.
|
||||
MaxUncommittedEntriesSize uint64
|
||||
// MaxInflightMsgs limits the max number of in-flight append messages during
|
||||
// optimistic replication phase. The application transportation layer usually
|
||||
// has its own sending buffer over TCP/UDP. Setting MaxInflightMsgs to avoid
|
||||
@ -215,6 +220,10 @@ func (c *Config) validate() error {
|
||||
return errors.New("storage cannot be nil")
|
||||
}
|
||||
|
||||
if c.MaxUncommittedEntriesSize == 0 {
|
||||
c.MaxUncommittedEntriesSize = noLimit
|
||||
}
|
||||
|
||||
if c.MaxInflightMsgs <= 0 {
|
||||
return errors.New("max inflight messages must be greater than 0")
|
||||
}
|
||||
@ -241,11 +250,12 @@ type raft struct {
|
||||
// the log
|
||||
raftLog *raftLog
|
||||
|
||||
maxInflight int
|
||||
maxMsgSize uint64
|
||||
prs map[uint64]*Progress
|
||||
learnerPrs map[uint64]*Progress
|
||||
matchBuf uint64Slice
|
||||
maxMsgSize uint64
|
||||
maxUncommittedSize uint64
|
||||
maxInflight int
|
||||
prs map[uint64]*Progress
|
||||
learnerPrs map[uint64]*Progress
|
||||
matchBuf uint64Slice
|
||||
|
||||
state StateType
|
||||
|
||||
@ -268,6 +278,10 @@ type raft struct {
|
||||
// be proposed if the leader's applied index is greater than this
|
||||
// value.
|
||||
pendingConfIndex uint64
|
||||
// an estimate of the size of the uncommitted tail of the Raft log. Used to
|
||||
// prevent unbounded log growth. Only maintained by the leader. Reset on
|
||||
// term changes.
|
||||
uncommittedSize uint64
|
||||
|
||||
readOnly *readOnly
|
||||
|
||||
@ -326,6 +340,7 @@ func newRaft(c *Config) *raft {
|
||||
raftLog: raftlog,
|
||||
maxMsgSize: c.MaxSizePerMsg,
|
||||
maxInflight: c.MaxInflightMsgs,
|
||||
maxUncommittedSize: c.MaxUncommittedEntriesSize,
|
||||
prs: make(map[uint64]*Progress),
|
||||
learnerPrs: make(map[uint64]*Progress),
|
||||
electionTimeout: c.ElectionTick,
|
||||
@ -616,6 +631,7 @@ func (r *raft) reset(term uint64) {
|
||||
})
|
||||
|
||||
r.pendingConfIndex = 0
|
||||
r.uncommittedSize = 0
|
||||
r.readOnly = newReadOnly(r.readOnly.option)
|
||||
}
|
||||
|
||||
@ -954,6 +970,10 @@ func stepLeader(r *raft, m pb.Message) error {
|
||||
r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
|
||||
return ErrProposalDropped
|
||||
}
|
||||
if !r.increaseUncommittedSize(m.Entries) {
|
||||
r.logger.Debugf("%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", r.id)
|
||||
return ErrProposalDropped
|
||||
}
|
||||
|
||||
for i, e := range m.Entries {
|
||||
if e.Type == pb.EntryConfChange {
|
||||
@ -1462,6 +1482,49 @@ func (r *raft) abortLeaderTransfer() {
|
||||
r.leadTransferee = None
|
||||
}
|
||||
|
||||
// increaseUncommittedSize computes the size of the proposed entries and
|
||||
// determines whether they would push leader over its maxUncommittedSize limit.
|
||||
// If the new entries would exceed the limit, the method returns false. If not,
|
||||
// the increase in uncommitted entry size is recorded and the method returns
|
||||
// true.
|
||||
func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool {
|
||||
var s uint64
|
||||
for _, e := range ents {
|
||||
s += uint64(e.Size())
|
||||
}
|
||||
|
||||
if r.uncommittedSize > 0 && r.uncommittedSize+s > r.maxUncommittedSize {
|
||||
// If the uncommitted tail of the Raft log is empty, allow any size
|
||||
// proposal. Otherwise, limit the size of the uncommitted tail of the
|
||||
// log and drop any proposal that would push the size over the limit.
|
||||
return false
|
||||
}
|
||||
r.uncommittedSize += s
|
||||
return true
|
||||
}
|
||||
|
||||
// reduceUncommittedSize accounts for the newly committed entries by decreasing
|
||||
// the uncommitted entry size limit.
|
||||
func (r *raft) reduceUncommittedSize(ents []pb.Entry) {
|
||||
if r.uncommittedSize == 0 {
|
||||
// Fast-path for followers, who do not track or enforce the limit.
|
||||
return
|
||||
}
|
||||
|
||||
var s uint64
|
||||
for _, e := range ents {
|
||||
s += uint64(e.Size())
|
||||
}
|
||||
if s > r.uncommittedSize {
|
||||
// uncommittedSize may underestimate the size of the uncommitted Raft
|
||||
// log tail but will never overestimate it. Saturate at 0 instead of
|
||||
// allowing overflow.
|
||||
r.uncommittedSize = 0
|
||||
} else {
|
||||
r.uncommittedSize -= s
|
||||
}
|
||||
}
|
||||
|
||||
func numOfPendingConf(ents []pb.Entry) int {
|
||||
n := 0
|
||||
for i := range ents {
|
||||
|
@ -362,6 +362,71 @@ func TestProgressFlowControl(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestUncommittedEntryLimit(t *testing.T) {
|
||||
const maxEntries = 16
|
||||
testEntry := pb.Entry{Data: []byte("testdata")}
|
||||
maxEntrySize := maxEntries * testEntry.Size()
|
||||
|
||||
cfg := newTestConfig(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage())
|
||||
cfg.MaxUncommittedEntriesSize = uint64(maxEntrySize)
|
||||
r := newRaft(cfg)
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
|
||||
// Set the two followers to the replicate state. Commit to tail of log.
|
||||
const numFollowers = 2
|
||||
r.prs[2].becomeReplicate()
|
||||
r.prs[3].becomeReplicate()
|
||||
r.uncommittedSize = 0
|
||||
|
||||
// Send proposals to r1. The first 5 entries should be appended to the log.
|
||||
propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{testEntry}}
|
||||
propEnts := make([]pb.Entry, maxEntries)
|
||||
for i := 0; i < maxEntries; i++ {
|
||||
if err := r.Step(propMsg); err != nil {
|
||||
t.Fatalf("proposal resulted in error: %v", err)
|
||||
}
|
||||
propEnts[i] = testEntry
|
||||
}
|
||||
|
||||
// Send one more proposal to r1. It should be rejected.
|
||||
if err := r.Step(propMsg); err != ErrProposalDropped {
|
||||
t.Fatalf("proposal not dropped: %v", err)
|
||||
}
|
||||
|
||||
// Read messages and reduce the uncommitted size as if we had committed
|
||||
// these entries.
|
||||
ms := r.readMessages()
|
||||
if e := maxEntries * numFollowers; len(ms) != e {
|
||||
t.Fatalf("expected %d messages, got %d", e, len(ms))
|
||||
}
|
||||
r.reduceUncommittedSize(propEnts)
|
||||
|
||||
// Send a single large proposal to r1. Should be accepted even though it
|
||||
// pushes us above the limit because we were beneath it before the proposal.
|
||||
propEnts = make([]pb.Entry, 2*maxEntries)
|
||||
for i := range propEnts {
|
||||
propEnts[i] = testEntry
|
||||
}
|
||||
propMsgLarge := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: propEnts}
|
||||
if err := r.Step(propMsgLarge); err != nil {
|
||||
t.Fatalf("proposal resulted in error: %v", err)
|
||||
}
|
||||
|
||||
// Send one more proposal to r1. It should be rejected, again.
|
||||
if err := r.Step(propMsg); err != ErrProposalDropped {
|
||||
t.Fatalf("proposal not dropped: %v", err)
|
||||
}
|
||||
|
||||
// Read messages and reduce the uncommitted size as if we had committed
|
||||
// these entries.
|
||||
ms = r.readMessages()
|
||||
if e := 1 * numFollowers; len(ms) != e {
|
||||
t.Fatalf("expected %d messages, got %d", e, len(ms))
|
||||
}
|
||||
r.reduceUncommittedSize(propEnts)
|
||||
}
|
||||
|
||||
func TestLeaderElection(t *testing.T) {
|
||||
testLeaderElection(t, false)
|
||||
}
|
||||
|
@ -41,12 +41,13 @@ type node struct {
|
||||
func startNode(id uint64, peers []raft.Peer, iface iface) *node {
|
||||
st := raft.NewMemoryStorage()
|
||||
c := &raft.Config{
|
||||
ID: id,
|
||||
ElectionTick: 10,
|
||||
HeartbeatTick: 1,
|
||||
Storage: st,
|
||||
MaxSizePerMsg: 1024 * 1024,
|
||||
MaxInflightMsgs: 256,
|
||||
ID: id,
|
||||
ElectionTick: 10,
|
||||
HeartbeatTick: 1,
|
||||
Storage: st,
|
||||
MaxSizePerMsg: 1024 * 1024,
|
||||
MaxInflightMsgs: 256,
|
||||
MaxUncommittedEntriesSize: 1 << 30,
|
||||
}
|
||||
rn := raft.StartNode(c, peers)
|
||||
n := &node{
|
||||
@ -125,12 +126,13 @@ func (n *node) restart() {
|
||||
// wait for the shutdown
|
||||
<-n.stopc
|
||||
c := &raft.Config{
|
||||
ID: n.id,
|
||||
ElectionTick: 10,
|
||||
HeartbeatTick: 1,
|
||||
Storage: n.storage,
|
||||
MaxSizePerMsg: 1024 * 1024,
|
||||
MaxInflightMsgs: 256,
|
||||
ID: n.id,
|
||||
ElectionTick: 10,
|
||||
HeartbeatTick: 1,
|
||||
Storage: n.storage,
|
||||
MaxSizePerMsg: 1024 * 1024,
|
||||
MaxInflightMsgs: 256,
|
||||
MaxUncommittedEntriesSize: 1 << 30,
|
||||
}
|
||||
n.Node = raft.RestartNode(c)
|
||||
n.start()
|
||||
|
@ -198,6 +198,7 @@ func (rn *RawNode) Step(m pb.Message) error {
|
||||
func (rn *RawNode) Ready() Ready {
|
||||
rd := rn.newReady()
|
||||
rn.raft.msgs = nil
|
||||
rn.raft.reduceUncommittedSize(rd.CommittedEntries)
|
||||
return rd
|
||||
}
|
||||
|
||||
|
@ -484,3 +484,64 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestRawNodeBoundedLogGrowthWithPartition tests a scenario where a leader is
|
||||
// partitioned from a quorum of nodes. It verifies that the leader's log is
|
||||
// protected from unbounded growth even as new entries continue to be proposed.
|
||||
// This protection is provided by the MaxUncommittedEntriesSize configuration.
|
||||
func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) {
|
||||
const maxEntries = 16
|
||||
data := []byte("testdata")
|
||||
testEntry := raftpb.Entry{Data: data}
|
||||
maxEntrySize := uint64(maxEntries * testEntry.Size())
|
||||
|
||||
s := NewMemoryStorage()
|
||||
cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
|
||||
cfg.MaxUncommittedEntriesSize = maxEntrySize
|
||||
rawNode, err := NewRawNode(cfg, []Peer{{ID: 1}})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
rd := rawNode.Ready()
|
||||
s.Append(rd.Entries)
|
||||
rawNode.Advance(rd)
|
||||
|
||||
// Become the leader.
|
||||
rawNode.Campaign()
|
||||
for {
|
||||
rd = rawNode.Ready()
|
||||
s.Append(rd.Entries)
|
||||
if rd.SoftState.Lead == rawNode.raft.id {
|
||||
rawNode.Advance(rd)
|
||||
break
|
||||
}
|
||||
rawNode.Advance(rd)
|
||||
}
|
||||
|
||||
// Simulate a network partition while we make our proposals by never
|
||||
// committing anything. These proposals should not cause the leader's
|
||||
// log to grow indefinitely.
|
||||
for i := 0; i < 1024; i++ {
|
||||
rawNode.Propose(data)
|
||||
}
|
||||
|
||||
// Check the size of leader's uncommitted log tail. It should not exceed the
|
||||
// MaxUncommittedEntriesSize limit.
|
||||
checkUncommitted := func(exp uint64) {
|
||||
t.Helper()
|
||||
if a := rawNode.raft.uncommittedSize; exp != a {
|
||||
t.Fatalf("expected %d uncommitted entry bytes, found %d", exp, a)
|
||||
}
|
||||
}
|
||||
checkUncommitted(maxEntrySize)
|
||||
|
||||
// Recover from the partition. The uncommitted tail of the Raft log should
|
||||
// disappear as entries are committed.
|
||||
rd = rawNode.Ready()
|
||||
if len(rd.CommittedEntries) != maxEntries {
|
||||
t.Fatalf("expected %d entries, got %d", maxEntries, len(rd.CommittedEntries))
|
||||
}
|
||||
s.Append(rd.Entries)
|
||||
rawNode.Advance(rd)
|
||||
checkUncommitted(0)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user