From 064004b8992d3ef5eb153758d0a5553e4b1677ad Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 30 Jun 2014 15:15:51 -0700 Subject: [PATCH] raft: add log compact --- raft/log.go | 38 ++++++++++++++++++++--- raft/log_test.go | 80 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+), 5 deletions(-) diff --git a/raft/log.go b/raft/log.go index af8101ee2..c94257164 100644 --- a/raft/log.go +++ b/raft/log.go @@ -1,5 +1,7 @@ package raft +import "fmt" + const ( Normal int = iota @@ -7,6 +9,10 @@ const ( RemoveNode ) +const ( + defaultCompactThreshold = 10000 +) + type Entry struct { Type int Term int @@ -22,13 +28,18 @@ type log struct { committed int applied int offset int + + // want a compact after the number of entries exceeds the threshold + // TODO(xiangli) size might be a better criteria + compactThreshold int } func newLog() *log { return &log{ - ents: make([]Entry, 1), - committed: 0, - applied: 0, + ents: make([]Entry, 1), + committed: 0, + applied: 0, + compactThreshold: defaultCompactThreshold, } } @@ -42,7 +53,7 @@ func (l *log) maybeAppend(index, logTerm, committed int, ents ...Entry) bool { } func (l *log) append(after int, ents ...Entry) int { - l.ents = append(l.slice(0, after+1), ents...) + l.ents = append(l.slice(l.offset, after+1), ents...) return l.lastIndex() } @@ -81,7 +92,7 @@ func (l *log) maybeCommit(maxIndex, term int) bool { return false } -// nextEnts returns all the avaliable entries for execution. +// nextEnts returns all the available entries for execution. // all the returned entries will be marked as applied. func (l *log) nextEnts() (ents []Entry) { if l.committed > l.applied { @@ -91,6 +102,23 @@ func (l *log) nextEnts() (ents []Entry) { return ents } +// compact removes the log entries before i, exclusive. +// i must be not smaller than the index of the first entry +// and not greater than the index of the last entry. +// the number of entries after compaction will be returned. +func (l *log) compact(i int) int { + if l.isOutOfBounds(i) { + panic(fmt.Sprintf("compact %d out of bounds [%d:%d]", i, l.offset, l.lastIndex())) + } + l.ents = l.slice(i, l.lastIndex()+1) + l.offset = i + return len(l.ents) +} + +func (l *log) shouldCompact() bool { + return (l.committed - l.offset) > l.compactThreshold +} + func (l *log) at(i int) *Entry { if l.isOutOfBounds(i) { return nil diff --git a/raft/log_test.go b/raft/log_test.go index b0ca79199..3229b00ce 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -5,6 +5,86 @@ import ( "testing" ) +// TestCompactionSideEffects ensures that all the log related funcationality works correctly after +// a compaction. +func TestCompactionSideEffects(t *testing.T) { + lastIndex := 1000 + log := newLog() + + for i := 0; i < lastIndex; i++ { + log.append(i, Entry{Term: i + 1}) + } + + log.compact(500) + + if log.lastIndex() != lastIndex { + t.Errorf("lastIndex = %d, want %d", log.lastIndex(), lastIndex) + } + + for i := log.offset; i <= log.lastIndex(); i++ { + if log.term(i) != i { + t.Errorf("term(%d) = %d, want %d", i, log.term(i), i) + } + } + + for i := log.offset; i <= log.lastIndex(); i++ { + if !log.matchTerm(i, i) { + t.Errorf("matchTerm(%d) = false, want true", i) + } + } + + prev := log.lastIndex() + log.append(log.lastIndex(), Entry{Term: log.lastIndex() + 1}) + if log.lastIndex() != prev+1 { + t.Errorf("lastIndex = %d, want = %d", log.lastIndex(), prev+1) + } + + ents := log.entries(log.lastIndex()) + if len(ents) != 1 { + t.Errorf("len(entries) = %d, want = %d", len(ents), 1) + } +} + +//TestCompaction ensures that the number of log entreis is correct after compactions. +func TestCompaction(t *testing.T) { + tests := []struct { + app int + compact []int + wleft []int + wallow bool + }{ + // out of upper bound + {1000, []int{1001}, []int{-1}, false}, + {1000, []int{300, 500, 800, 900}, []int{701, 501, 201, 101}, true}, + // out of lower bound + {1000, []int{300, 299}, []int{701, -1}, false}, + } + + for i, tt := range tests { + func() { + defer func() { + if r := recover(); r != nil { + if tt.wallow == true { + t.Errorf("%d: allow = %v, want %v", i, false, true) + } + } + }() + + log := newLog() + for i := 0; i < tt.app; i++ { + log.append(i, Entry{}) + } + + for j := 0; j < len(tt.compact); j++ { + log.compact(tt.compact[j]) + if len(log.ents) != tt.wleft[j] { + t.Errorf("#%d.%d len = %d, want %d", i, j, len(log.ents), tt.wleft[j]) + } + } + }() + } +} + func TestIsOutOfBounds(t *testing.T) { offset := 100 num := 100