etcd/raft/tracker/inflights.go
Tobias Schottdorf 26a1e60eab raft: return non-nil Inflights in raft status
Recent refactoring to the String() method of `Progress` hit an NPE
because we return nil Inflights as part of the Raft status. Just
fix this at the source and properly populate the Raft status instead
of teaching String() to ignore nil. A real Progress always has a
non-nil Inflights.
2019-07-17 12:53:28 +02:00

133 lines
3.6 KiB
Go

// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tracker
// Inflights is a sliding window over the MsgApp messages that have been sent
// to followers but not yet acknowledged by them. Each message is represented
// by the largest index contained within it. The intended call pattern is:
// check Full() before sending, call Add() for every append that goes out, and
// hand the "quota" back with FreeLE() once an ack is received.
type Inflights struct {
	// start is the position in buffer of the first (oldest) inflight, and
	// count is the number of inflights stored from there on, wrapping around
	// at size.
	start, count int

	// size is the maximum number of inflights that can be held; buffer is
	// never grown beyond it.
	size int

	// buffer is the ring storage. Each occupied slot contains the index of
	// the last entry inside one message.
	buffer []uint64
}
// NewInflights returns an empty Inflights that admits at most 'size' inflight
// messages. No buffer space is allocated up front; it is grown on demand as
// messages are added.
func NewInflights(size int) *Inflights {
	in := new(Inflights)
	in.size = size
	return in
}
// Clone produces a deep copy of the receiver: every field has the same value,
// but the returned *Inflights has its own backing buffer, so later changes to
// one do not show up in the other.
func (in *Inflights) Clone() *Inflights {
	dup := new(Inflights)
	*dup = *in
	// Appending onto a nil slice forces a fresh backing array (and leaves the
	// copy's buffer nil when there is nothing to copy).
	dup.buffer = append([]uint64(nil), in.buffer...)
	return dup
}
// Add records that a new message with the given index is being dispatched.
// Callers must check Full() before calling Add() to make sure there is room
// for one more message; Add panics when there is none. Consecutive calls to
// Add() must provide a monotonic sequence of indexes.
func (in *Inflights) Add(inflight uint64) {
	if in.Full() {
		panic("cannot add into a Full inflights")
	}

	// The free slot sits right behind the newest inflight; wrap around once
	// the end of the ring is reached.
	slot := in.start + in.count
	if slot >= in.size {
		slot -= in.size
	}

	// The buffer is allocated lazily, so the slot may not exist yet.
	if slot >= len(in.buffer) {
		in.grow()
	}

	in.buffer[slot] = inflight
	in.count++
}
// grow enlarges the inflight buffer: an empty buffer becomes one slot long,
// otherwise the length doubles, capped at inflights.size. Existing contents
// are carried over. Growing on demand, rather than preallocating
// inflights.size slots, keeps memory use down in systems which have thousands
// of Raft groups per process.
func (in *Inflights) grow() {
	capacity := 2 * len(in.buffer)
	switch {
	case capacity == 0:
		capacity = 1
	case capacity > in.size:
		capacity = in.size
	}

	grown := make([]uint64, capacity)
	copy(grown, in.buffer)
	in.buffer = grown
}
// FreeLE releases every inflight whose index is smaller than or equal to the
// given `to`. It does nothing when no inflights are held or when `to` lies
// below the oldest inflight.
func (in *Inflights) FreeLE(to uint64) {
	if in.count == 0 || to < in.buffer[in.start] {
		// Nothing held, or `to` is left of the window.
		return
	}

	// Walk forward from the oldest inflight, counting entries that are
	// covered by `to`, until a larger one shows up or the window is exhausted.
	freed, pos := 0, in.start
	for freed < in.count && to >= in.buffer[pos] {
		pos++
		if pos >= in.size {
			// Ran off the end of the ring; rotate back to the front.
			pos -= in.size
		}
		freed++
	}

	in.count -= freed
	if in.count == 0 {
		// Everything was released. Restart from slot zero so that the buffer
		// is not grown unnecessarily by later additions.
		in.start = 0
		return
	}
	in.start = pos
}
// FreeFirstOne releases the first (oldest) inflight. This is a no-op if
// nothing is inflight.
func (in *Inflights) FreeFirstOne() {
	// Guard the empty case explicitly: the buffer is only allocated once
	// something has been added, so on an Inflights that has never held a
	// message, indexing buffer[start] would panic with an out-of-range index
	// instead of being the documented no-op.
	if in.count == 0 {
		return
	}
	in.FreeLE(in.buffer[in.start])
}
// Full reports whether the window is at capacity, i.e. the number of held
// inflights equals the configured size and no more messages can be sent at
// the moment.
func (in *Inflights) Full() bool {
	atCapacity := in.size == in.count
	return atCapacity
}
// Count reports how many messages are currently inflight.
func (in *Inflights) Count() int {
	return in.count
}
// reset drops every inflight by zeroing the window's position and length. The
// allocated buffer and the configured size are left as they are.
func (in *Inflights) reset() {
	in.start, in.count = 0, 0
}