mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00

A recent refactoring of the String() method of `Progress` hit a nil pointer dereference because we return a nil Inflights as part of the Raft status. Fix this at the source by properly populating the Raft status instead of teaching String() to ignore nil; a real Progress always has a non-nil Inflights.
133 lines
3.6 KiB
Go
133 lines
3.6 KiB
Go
// Copyright 2019 The etcd Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package tracker
|
|
|
|
// Inflights bounds how many MsgApp messages (each identified by the largest
// index it carries) may be outstanding to a follower without having been
// acknowledged. The expected usage is: consult Full() to learn whether another
// message may be sent, call Add() for every append that is dispatched, and
// hand quota back with FreeLE() once an acknowledgement arrives.
type Inflights struct {
	// start is the position in buffer of the oldest inflight entry.
	start int
	// count is the number of entries currently inflight.
	count int

	// size is the maximum number of entries that may be inflight at once.
	size int

	// buffer is used as a circular store; each slot holds the index of the
	// last entry contained in one inflight message.
	buffer []uint64
}
|
|
|
|
// NewInflights sets up an Inflights that allows up to 'size' inflight messages.
|
|
func NewInflights(size int) *Inflights {
|
|
return &Inflights{
|
|
size: size,
|
|
}
|
|
}
|
|
|
|
// Clone returns an *Inflights that is identical to but shares no memory with
|
|
// the receiver.
|
|
func (in *Inflights) Clone() *Inflights {
|
|
ins := *in
|
|
ins.buffer = append([]uint64(nil), in.buffer...)
|
|
return &ins
|
|
}
|
|
|
|
// Add notifies the Inflights that a new message with the given index is being
|
|
// dispatched. Full() must be called prior to Add() to verify that there is room
|
|
// for one more message, and consecutive calls to add Add() must provide a
|
|
// monotonic sequence of indexes.
|
|
func (in *Inflights) Add(inflight uint64) {
|
|
if in.Full() {
|
|
panic("cannot add into a Full inflights")
|
|
}
|
|
next := in.start + in.count
|
|
size := in.size
|
|
if next >= size {
|
|
next -= size
|
|
}
|
|
if next >= len(in.buffer) {
|
|
in.grow()
|
|
}
|
|
in.buffer[next] = inflight
|
|
in.count++
|
|
}
|
|
|
|
// grow the inflight buffer by doubling up to inflights.size. We grow on demand
|
|
// instead of preallocating to inflights.size to handle systems which have
|
|
// thousands of Raft groups per process.
|
|
func (in *Inflights) grow() {
|
|
newSize := len(in.buffer) * 2
|
|
if newSize == 0 {
|
|
newSize = 1
|
|
} else if newSize > in.size {
|
|
newSize = in.size
|
|
}
|
|
newBuffer := make([]uint64, newSize)
|
|
copy(newBuffer, in.buffer)
|
|
in.buffer = newBuffer
|
|
}
|
|
|
|
// FreeLE frees the inflights smaller or equal to the given `to` flight.
|
|
func (in *Inflights) FreeLE(to uint64) {
|
|
if in.count == 0 || to < in.buffer[in.start] {
|
|
// out of the left side of the window
|
|
return
|
|
}
|
|
|
|
idx := in.start
|
|
var i int
|
|
for i = 0; i < in.count; i++ {
|
|
if to < in.buffer[idx] { // found the first large inflight
|
|
break
|
|
}
|
|
|
|
// increase index and maybe rotate
|
|
size := in.size
|
|
if idx++; idx >= size {
|
|
idx -= size
|
|
}
|
|
}
|
|
// free i inflights and set new start index
|
|
in.count -= i
|
|
in.start = idx
|
|
if in.count == 0 {
|
|
// inflights is empty, reset the start index so that we don't grow the
|
|
// buffer unnecessarily.
|
|
in.start = 0
|
|
}
|
|
}
|
|
|
|
// FreeFirstOne releases the first inflight. This is a no-op if nothing is
|
|
// inflight.
|
|
func (in *Inflights) FreeFirstOne() { in.FreeLE(in.buffer[in.start]) }
|
|
|
|
// Full returns true if no more messages can be sent at the moment.
|
|
func (in *Inflights) Full() bool {
|
|
return in.count == in.size
|
|
}
|
|
|
|
// Count returns the number of inflight messages.
|
|
func (in *Inflights) Count() int { return in.count }
|
|
|
|
// reset frees all inflights.
|
|
func (in *Inflights) reset() {
|
|
in.count = 0
|
|
in.start = 0
|
|
}
|