mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
raft: add log compact
This commit is contained in:
parent
6a232dfc13
commit
064004b899
38
raft/log.go
38
raft/log.go
@ -1,5 +1,7 @@
|
||||
package raft
|
||||
|
||||
import "fmt"
|
||||
|
||||
const (
|
||||
Normal int = iota
|
||||
|
||||
@ -7,6 +9,10 @@ const (
|
||||
RemoveNode
|
||||
)
|
||||
|
||||
const (
|
||||
defaultCompactThreshold = 10000
|
||||
)
|
||||
|
||||
type Entry struct {
|
||||
Type int
|
||||
Term int
|
||||
@ -22,13 +28,18 @@ type log struct {
|
||||
committed int
|
||||
applied int
|
||||
offset int
|
||||
|
||||
// want a compact after the number of entries exceeds the threshold
|
||||
// TODO(xiangli) size might be a better criteria
|
||||
compactThreshold int
|
||||
}
|
||||
|
||||
func newLog() *log {
|
||||
return &log{
|
||||
ents: make([]Entry, 1),
|
||||
committed: 0,
|
||||
applied: 0,
|
||||
ents: make([]Entry, 1),
|
||||
committed: 0,
|
||||
applied: 0,
|
||||
compactThreshold: defaultCompactThreshold,
|
||||
}
|
||||
}
|
||||
|
||||
@ -42,7 +53,7 @@ func (l *log) maybeAppend(index, logTerm, committed int, ents ...Entry) bool {
|
||||
}
|
||||
|
||||
func (l *log) append(after int, ents ...Entry) int {
|
||||
l.ents = append(l.slice(0, after+1), ents...)
|
||||
l.ents = append(l.slice(l.offset, after+1), ents...)
|
||||
return l.lastIndex()
|
||||
}
|
||||
|
||||
@ -81,7 +92,7 @@ func (l *log) maybeCommit(maxIndex, term int) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// nextEnts returns all the avaliable entries for execution.
|
||||
// nextEnts returns all the available entries for execution.
|
||||
// all the returned entries will be marked as applied.
|
||||
func (l *log) nextEnts() (ents []Entry) {
|
||||
if l.committed > l.applied {
|
||||
@ -91,6 +102,23 @@ func (l *log) nextEnts() (ents []Entry) {
|
||||
return ents
|
||||
}
|
||||
|
||||
// compact removes the log entries before i, exclusive.
|
||||
// i must be not smaller than the index of the first entry
|
||||
// and not greater than the index of the last entry.
|
||||
// the number of entries after compaction will be returned.
|
||||
func (l *log) compact(i int) int {
|
||||
if l.isOutOfBounds(i) {
|
||||
panic(fmt.Sprintf("compact %d out of bounds [%d:%d]", i, l.offset, l.lastIndex()))
|
||||
}
|
||||
l.ents = l.slice(i, l.lastIndex()+1)
|
||||
l.offset = i
|
||||
return len(l.ents)
|
||||
}
|
||||
|
||||
func (l *log) shouldCompact() bool {
|
||||
return (l.committed - l.offset) > l.compactThreshold
|
||||
}
|
||||
|
||||
func (l *log) at(i int) *Entry {
|
||||
if l.isOutOfBounds(i) {
|
||||
return nil
|
||||
|
@ -5,6 +5,86 @@ import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestCompactionSideEffects ensures that all the log related funcationality works correctly after
|
||||
// a compaction.
|
||||
func TestCompactionSideEffects(t *testing.T) {
|
||||
lastIndex := 1000
|
||||
log := newLog()
|
||||
|
||||
for i := 0; i < lastIndex; i++ {
|
||||
log.append(i, Entry{Term: i + 1})
|
||||
}
|
||||
|
||||
log.compact(500)
|
||||
|
||||
if log.lastIndex() != lastIndex {
|
||||
t.Errorf("lastIndex = %d, want %d", log.lastIndex(), lastIndex)
|
||||
}
|
||||
|
||||
for i := log.offset; i <= log.lastIndex(); i++ {
|
||||
if log.term(i) != i {
|
||||
t.Errorf("term(%d) = %d, want %d", i, log.term(i), i)
|
||||
}
|
||||
}
|
||||
|
||||
for i := log.offset; i <= log.lastIndex(); i++ {
|
||||
if !log.matchTerm(i, i) {
|
||||
t.Errorf("matchTerm(%d) = false, want true", i)
|
||||
}
|
||||
}
|
||||
|
||||
prev := log.lastIndex()
|
||||
log.append(log.lastIndex(), Entry{Term: log.lastIndex() + 1})
|
||||
if log.lastIndex() != prev+1 {
|
||||
t.Errorf("lastIndex = %d, want = %d", log.lastIndex(), prev+1)
|
||||
}
|
||||
|
||||
ents := log.entries(log.lastIndex())
|
||||
if len(ents) != 1 {
|
||||
t.Errorf("len(entries) = %d, want = %d", len(ents), 1)
|
||||
}
|
||||
}
|
||||
|
||||
//TestCompaction ensures that the number of log entreis is correct after compactions.
|
||||
func TestCompaction(t *testing.T) {
|
||||
tests := []struct {
|
||||
app int
|
||||
compact []int
|
||||
wleft []int
|
||||
wallow bool
|
||||
}{
|
||||
// out of upper bound
|
||||
{1000, []int{1001}, []int{-1}, false},
|
||||
{1000, []int{300, 500, 800, 900}, []int{701, 501, 201, 101}, true},
|
||||
// out of lower bound
|
||||
{1000, []int{300, 299}, []int{701, -1}, false},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
if tt.wallow == true {
|
||||
t.Errorf("%d: allow = %v, want %v", i, false, true)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
log := newLog()
|
||||
for i := 0; i < tt.app; i++ {
|
||||
log.append(i, Entry{})
|
||||
}
|
||||
|
||||
for j := 0; j < len(tt.compact); j++ {
|
||||
log.compact(tt.compact[j])
|
||||
if len(log.ents) != tt.wleft[j] {
|
||||
t.Errorf("#%d.%d len = %d, want %d", i, j, len(log.ents), tt.wleft[j])
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsOutOfBounds(t *testing.T) {
|
||||
offset := 100
|
||||
num := 100
|
||||
|
Loading…
x
Reference in New Issue
Block a user