mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #8123 from yudai/revision_compactor
Compactor: Add Revisional compactor
This commit is contained in:
commit
0fe8fdcb29
@ -15,14 +15,13 @@
|
||||
package compactor
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/mvcc"
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
"github.com/jonboulle/clockwork"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -32,8 +31,24 @@ var (
|
||||
const (
|
||||
checkCompactionInterval = 5 * time.Minute
|
||||
executeCompactionInterval = time.Hour
|
||||
|
||||
ModePeriodic = "periodic"
|
||||
ModeRevision = "revision"
|
||||
)
|
||||
|
||||
// Compactor purges old log from the storage periodically.
|
||||
type Compactor interface {
|
||||
// Run starts the main loop of the compactor in background.
|
||||
// Use Stop() to halt the loop and release the resource.
|
||||
Run()
|
||||
// Stop halts the main loop of the compactor.
|
||||
Stop()
|
||||
// Pause temporally suspend the compactor not to run compaction. Resume() to unpose.
|
||||
Pause()
|
||||
// Resume restarts the compactor suspended by Pause().
|
||||
Resume()
|
||||
}
|
||||
|
||||
type Compactable interface {
|
||||
Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
|
||||
}
|
||||
@ -42,96 +57,13 @@ type RevGetter interface {
|
||||
Rev() int64
|
||||
}
|
||||
|
||||
// Periodic compacts the log by purging revisions older than
|
||||
// the configured retention time. Compaction happens hourly.
|
||||
type Periodic struct {
|
||||
clock clockwork.Clock
|
||||
periodInHour int
|
||||
|
||||
rg RevGetter
|
||||
c Compactable
|
||||
|
||||
revs []int64
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
||||
mu sync.Mutex
|
||||
paused bool
|
||||
}
|
||||
|
||||
func NewPeriodic(h int, rg RevGetter, c Compactable) *Periodic {
|
||||
return &Periodic{
|
||||
clock: clockwork.NewRealClock(),
|
||||
periodInHour: h,
|
||||
rg: rg,
|
||||
c: c,
|
||||
func New(mode string, retention int, rg RevGetter, c Compactable) (Compactor, error) {
|
||||
switch mode {
|
||||
case ModePeriodic:
|
||||
return NewPeriodic(retention, rg, c), nil
|
||||
case ModeRevision:
|
||||
return NewRevision(int64(retention), rg, c), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported compaction mode %s", mode)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Periodic) Run() {
|
||||
t.ctx, t.cancel = context.WithCancel(context.Background())
|
||||
t.revs = make([]int64, 0)
|
||||
clock := t.clock
|
||||
|
||||
go func() {
|
||||
last := clock.Now()
|
||||
for {
|
||||
t.revs = append(t.revs, t.rg.Rev())
|
||||
select {
|
||||
case <-t.ctx.Done():
|
||||
return
|
||||
case <-clock.After(checkCompactionInterval):
|
||||
t.mu.Lock()
|
||||
p := t.paused
|
||||
t.mu.Unlock()
|
||||
if p {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if clock.Now().Sub(last) < executeCompactionInterval {
|
||||
continue
|
||||
}
|
||||
|
||||
rev, remaining := t.getRev(t.periodInHour)
|
||||
if rev < 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
plog.Noticef("Starting auto-compaction at revision %d", rev)
|
||||
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
|
||||
if err == nil || err == mvcc.ErrCompacted {
|
||||
t.revs = remaining
|
||||
last = clock.Now()
|
||||
plog.Noticef("Finished auto-compaction at revision %d", rev)
|
||||
} else {
|
||||
plog.Noticef("Failed auto-compaction at revision %d (%v)", err, rev)
|
||||
plog.Noticef("Retry after %v", checkCompactionInterval)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (t *Periodic) Stop() {
|
||||
t.cancel()
|
||||
}
|
||||
|
||||
func (t *Periodic) Pause() {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.paused = true
|
||||
}
|
||||
|
||||
func (t *Periodic) Resume() {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.paused = false
|
||||
}
|
||||
|
||||
func (t *Periodic) getRev(h int) (int64, []int64) {
|
||||
i := len(t.revs) - int(time.Duration(h)*time.Hour/checkCompactionInterval)
|
||||
if i < 0 {
|
||||
return -1, t.revs
|
||||
}
|
||||
return t.revs[i], t.revs[i+1:]
|
||||
}
|
||||
|
@ -15,108 +15,11 @@
|
||||
package compactor
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
"github.com/jonboulle/clockwork"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
func TestPeriodic(t *testing.T) {
|
||||
retentionHours := 2
|
||||
|
||||
fc := clockwork.NewFakeClock()
|
||||
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
|
||||
compactable := &fakeCompactable{testutil.NewRecorderStream()}
|
||||
tb := &Periodic{
|
||||
clock: fc,
|
||||
periodInHour: retentionHours,
|
||||
rg: rg,
|
||||
c: compactable,
|
||||
}
|
||||
|
||||
tb.Run()
|
||||
defer tb.Stop()
|
||||
|
||||
n := int(time.Hour / checkCompactionInterval)
|
||||
// collect 5 hours of revisions
|
||||
for i := 0; i < 5; i++ {
|
||||
// advance one hour, one revision for each interval
|
||||
for j := 0; j < n; j++ {
|
||||
rg.Wait(1)
|
||||
fc.Advance(checkCompactionInterval)
|
||||
}
|
||||
|
||||
// compaction doesn't happen til 2 hours elapses
|
||||
if i+1 < retentionHours {
|
||||
continue
|
||||
}
|
||||
|
||||
a, err := compactable.Wait(1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
expectedRevision := int64(1 + (i+1)*n - retentionHours*n)
|
||||
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
|
||||
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
|
||||
}
|
||||
}
|
||||
|
||||
// unblock the rev getter, so we can stop the compactor routine.
|
||||
_, err := rg.Wait(1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPeriodicPause(t *testing.T) {
|
||||
fc := clockwork.NewFakeClock()
|
||||
compactable := &fakeCompactable{testutil.NewRecorderStream()}
|
||||
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
|
||||
tb := &Periodic{
|
||||
clock: fc,
|
||||
periodInHour: 1,
|
||||
rg: rg,
|
||||
c: compactable,
|
||||
}
|
||||
|
||||
tb.Run()
|
||||
tb.Pause()
|
||||
|
||||
// tb will collect 3 hours of revisions but not compact since paused
|
||||
n := int(time.Hour / checkCompactionInterval)
|
||||
for i := 0; i < 3*n; i++ {
|
||||
rg.Wait(1)
|
||||
fc.Advance(checkCompactionInterval)
|
||||
}
|
||||
// tb ends up waiting for the clock
|
||||
|
||||
select {
|
||||
case a := <-compactable.Chan():
|
||||
t.Fatalf("unexpected action %v", a)
|
||||
case <-time.After(10 * time.Millisecond):
|
||||
}
|
||||
|
||||
// tb resumes to being blocked on the clock
|
||||
tb.Resume()
|
||||
|
||||
// unblock clock, will kick off a compaction at hour 3:05
|
||||
rg.Wait(1)
|
||||
fc.Advance(checkCompactionInterval)
|
||||
a, err := compactable.Wait(1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// compact the revision from hour 2:05
|
||||
wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)}
|
||||
if !reflect.DeepEqual(a[0].Params[0], wreq) {
|
||||
t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)
|
||||
}
|
||||
}
|
||||
|
||||
type fakeCompactable struct {
|
||||
testutil.Recorder
|
||||
}
|
||||
|
122
compactor/periodic.go
Normal file
122
compactor/periodic.go
Normal file
@ -0,0 +1,122 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package compactor
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/jonboulle/clockwork"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/mvcc"
|
||||
)
|
||||
|
||||
// Periodic compacts the log by purging revisions older than
|
||||
// the configured retention time. Compaction happens hourly.
|
||||
type Periodic struct {
|
||||
clock clockwork.Clock
|
||||
periodInHour int
|
||||
|
||||
rg RevGetter
|
||||
c Compactable
|
||||
|
||||
revs []int64
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
||||
mu sync.Mutex
|
||||
paused bool
|
||||
}
|
||||
|
||||
// NewPeriodic creates a new instance of Periodic compactor that purges
|
||||
// the log older than h hours.
|
||||
func NewPeriodic(h int, rg RevGetter, c Compactable) *Periodic {
|
||||
return &Periodic{
|
||||
clock: clockwork.NewRealClock(),
|
||||
periodInHour: h,
|
||||
rg: rg,
|
||||
c: c,
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Periodic) Run() {
|
||||
t.ctx, t.cancel = context.WithCancel(context.Background())
|
||||
t.revs = make([]int64, 0)
|
||||
clock := t.clock
|
||||
|
||||
go func() {
|
||||
last := clock.Now()
|
||||
for {
|
||||
t.revs = append(t.revs, t.rg.Rev())
|
||||
select {
|
||||
case <-t.ctx.Done():
|
||||
return
|
||||
case <-clock.After(checkCompactionInterval):
|
||||
t.mu.Lock()
|
||||
p := t.paused
|
||||
t.mu.Unlock()
|
||||
if p {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if clock.Now().Sub(last) < executeCompactionInterval {
|
||||
continue
|
||||
}
|
||||
|
||||
rev, remaining := t.getRev(t.periodInHour)
|
||||
if rev < 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
plog.Noticef("Starting auto-compaction at revision %d (retention: %d hours)", rev, t.periodInHour)
|
||||
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
|
||||
if err == nil || err == mvcc.ErrCompacted {
|
||||
t.revs = remaining
|
||||
last = clock.Now()
|
||||
plog.Noticef("Finished auto-compaction at revision %d", rev)
|
||||
} else {
|
||||
plog.Noticef("Failed auto-compaction at revision %d (%v)", err, rev)
|
||||
plog.Noticef("Retry after %v", checkCompactionInterval)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (t *Periodic) Stop() {
|
||||
t.cancel()
|
||||
}
|
||||
|
||||
func (t *Periodic) Pause() {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.paused = true
|
||||
}
|
||||
|
||||
func (t *Periodic) Resume() {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.paused = false
|
||||
}
|
||||
|
||||
func (t *Periodic) getRev(h int) (int64, []int64) {
|
||||
i := len(t.revs) - int(time.Duration(h)*time.Hour/checkCompactionInterval)
|
||||
if i < 0 {
|
||||
return -1, t.revs
|
||||
}
|
||||
return t.revs[i], t.revs[i+1:]
|
||||
}
|
117
compactor/periodic_test.go
Normal file
117
compactor/periodic_test.go
Normal file
@ -0,0 +1,117 @@
|
||||
// Copyright 2015 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package compactor
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
"github.com/jonboulle/clockwork"
|
||||
)
|
||||
|
||||
func TestPeriodic(t *testing.T) {
|
||||
retentionHours := 2
|
||||
|
||||
fc := clockwork.NewFakeClock()
|
||||
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
|
||||
compactable := &fakeCompactable{testutil.NewRecorderStream()}
|
||||
tb := &Periodic{
|
||||
clock: fc,
|
||||
periodInHour: retentionHours,
|
||||
rg: rg,
|
||||
c: compactable,
|
||||
}
|
||||
|
||||
tb.Run()
|
||||
defer tb.Stop()
|
||||
|
||||
n := int(time.Hour / checkCompactionInterval)
|
||||
// collect 5 hours of revisions
|
||||
for i := 0; i < 5; i++ {
|
||||
// advance one hour, one revision for each interval
|
||||
for j := 0; j < n; j++ {
|
||||
rg.Wait(1)
|
||||
fc.Advance(checkCompactionInterval)
|
||||
}
|
||||
|
||||
// compaction doesn't happen til 2 hours elapses
|
||||
if i+1 < retentionHours {
|
||||
continue
|
||||
}
|
||||
|
||||
a, err := compactable.Wait(1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
expectedRevision := int64(1 + (i+1)*n - retentionHours*n)
|
||||
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
|
||||
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
|
||||
}
|
||||
}
|
||||
|
||||
// unblock the rev getter, so we can stop the compactor routine.
|
||||
_, err := rg.Wait(1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPeriodicPause(t *testing.T) {
|
||||
fc := clockwork.NewFakeClock()
|
||||
compactable := &fakeCompactable{testutil.NewRecorderStream()}
|
||||
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
|
||||
tb := &Periodic{
|
||||
clock: fc,
|
||||
periodInHour: 1,
|
||||
rg: rg,
|
||||
c: compactable,
|
||||
}
|
||||
|
||||
tb.Run()
|
||||
tb.Pause()
|
||||
|
||||
// tb will collect 3 hours of revisions but not compact since paused
|
||||
n := int(time.Hour / checkCompactionInterval)
|
||||
for i := 0; i < 3*n; i++ {
|
||||
rg.Wait(1)
|
||||
fc.Advance(checkCompactionInterval)
|
||||
}
|
||||
// tb ends up waiting for the clock
|
||||
|
||||
select {
|
||||
case a := <-compactable.Chan():
|
||||
t.Fatalf("unexpected action %v", a)
|
||||
case <-time.After(10 * time.Millisecond):
|
||||
}
|
||||
|
||||
// tb resumes to being blocked on the clock
|
||||
tb.Resume()
|
||||
|
||||
// unblock clock, will kick off a compaction at hour 3:05
|
||||
rg.Wait(1)
|
||||
fc.Advance(checkCompactionInterval)
|
||||
a, err := compactable.Wait(1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// compact the revision from hour 2:05
|
||||
wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)}
|
||||
if !reflect.DeepEqual(a[0].Params[0], wreq) {
|
||||
t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)
|
||||
}
|
||||
}
|
106
compactor/revision.go
Normal file
106
compactor/revision.go
Normal file
@ -0,0 +1,106 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package compactor
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/jonboulle/clockwork"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/mvcc"
|
||||
)
|
||||
|
||||
// Revision compacts the log by purging revisions older than
|
||||
// the configured reivison number. Compaction happens every 5 minutes.
|
||||
type Revision struct {
|
||||
clock clockwork.Clock
|
||||
retention int64
|
||||
|
||||
rg RevGetter
|
||||
c Compactable
|
||||
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
||||
mu sync.Mutex
|
||||
paused bool
|
||||
}
|
||||
|
||||
// NewRevision creates a new instance of Revisonal compactor that purges
|
||||
// the log older than retention revisions from the current revision.
|
||||
func NewRevision(retention int64, rg RevGetter, c Compactable) *Revision {
|
||||
return &Revision{
|
||||
clock: clockwork.NewRealClock(),
|
||||
retention: retention,
|
||||
rg: rg,
|
||||
c: c,
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Revision) Run() {
|
||||
t.ctx, t.cancel = context.WithCancel(context.Background())
|
||||
clock := t.clock
|
||||
previous := int64(0)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-t.ctx.Done():
|
||||
return
|
||||
case <-clock.After(checkCompactionInterval):
|
||||
t.mu.Lock()
|
||||
p := t.paused
|
||||
t.mu.Unlock()
|
||||
if p {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
rev := t.rg.Rev() - t.retention
|
||||
|
||||
if rev <= 0 || rev == previous {
|
||||
continue
|
||||
}
|
||||
|
||||
plog.Noticef("Starting auto-compaction at revision %d (retention: %d revisions)", rev, t.retention)
|
||||
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
|
||||
if err == nil || err == mvcc.ErrCompacted {
|
||||
previous = rev
|
||||
plog.Noticef("Finished auto-compaction at revision %d", rev)
|
||||
} else {
|
||||
plog.Noticef("Failed auto-compaction at revision %d (%v)", err, rev)
|
||||
plog.Noticef("Retry after %v", checkCompactionInterval)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (t *Revision) Stop() {
|
||||
t.cancel()
|
||||
}
|
||||
|
||||
func (t *Revision) Pause() {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.paused = true
|
||||
}
|
||||
|
||||
func (t *Revision) Resume() {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.paused = false
|
||||
}
|
117
compactor/revision_test.go
Normal file
117
compactor/revision_test.go
Normal file
@ -0,0 +1,117 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package compactor
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
"github.com/jonboulle/clockwork"
|
||||
)
|
||||
|
||||
func TestRevision(t *testing.T) {
|
||||
fc := clockwork.NewFakeClock()
|
||||
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
|
||||
compactable := &fakeCompactable{testutil.NewRecorderStream()}
|
||||
tb := &Revision{
|
||||
clock: fc,
|
||||
retention: 10,
|
||||
rg: rg,
|
||||
c: compactable,
|
||||
}
|
||||
|
||||
tb.Run()
|
||||
defer tb.Stop()
|
||||
|
||||
fc.Advance(checkCompactionInterval)
|
||||
rg.Wait(1)
|
||||
// nothing happens
|
||||
|
||||
rg.rev = 99 // will be 100
|
||||
expectedRevision := int64(90)
|
||||
fc.Advance(checkCompactionInterval)
|
||||
rg.Wait(1)
|
||||
a, err := compactable.Wait(1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
|
||||
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
|
||||
}
|
||||
|
||||
// skip the same revision
|
||||
rg.rev = 99 // will be 100
|
||||
expectedRevision = int64(90)
|
||||
rg.Wait(1)
|
||||
// nothing happens
|
||||
|
||||
rg.rev = 199 // will be 200
|
||||
expectedRevision = int64(190)
|
||||
fc.Advance(checkCompactionInterval)
|
||||
rg.Wait(1)
|
||||
a, err = compactable.Wait(1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
|
||||
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRevisionPause(t *testing.T) {
|
||||
fc := clockwork.NewFakeClock()
|
||||
compactable := &fakeCompactable{testutil.NewRecorderStream()}
|
||||
rg := &fakeRevGetter{testutil.NewRecorderStream(), 99} // will be 100
|
||||
tb := &Revision{
|
||||
clock: fc,
|
||||
retention: 10,
|
||||
rg: rg,
|
||||
c: compactable,
|
||||
}
|
||||
|
||||
tb.Run()
|
||||
tb.Pause()
|
||||
|
||||
// tb will collect 3 hours of revisions but not compact since paused
|
||||
n := int(time.Hour / checkCompactionInterval)
|
||||
for i := 0; i < 3*n; i++ {
|
||||
fc.Advance(checkCompactionInterval)
|
||||
}
|
||||
// tb ends up waiting for the clock
|
||||
|
||||
select {
|
||||
case a := <-compactable.Chan():
|
||||
t.Fatalf("unexpected action %v", a)
|
||||
case <-time.After(10 * time.Millisecond):
|
||||
}
|
||||
|
||||
// tb resumes to being blocked on the clock
|
||||
tb.Resume()
|
||||
|
||||
// unblock clock, will kick off a compaction at hour 3:05
|
||||
fc.Advance(checkCompactionInterval)
|
||||
rg.Wait(1)
|
||||
a, err := compactable.Wait(1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
wreq := &pb.CompactionRequest{Revision: int64(90)}
|
||||
if !reflect.DeepEqual(a[0].Params[0], wreq) {
|
||||
t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)
|
||||
}
|
||||
}
|
@ -80,6 +80,7 @@ type Config struct {
|
||||
Name string `json:"name"`
|
||||
SnapCount uint64 `json:"snapshot-count"`
|
||||
AutoCompactionRetention int `json:"auto-compaction-retention"`
|
||||
AutoCompactionMode string `json:"auto-compaction-mode"`
|
||||
|
||||
// TickMs is the number of milliseconds between heartbeat ticks.
|
||||
// TODO: decouple tickMs and heartbeat tick (current heartbeat tick = 1).
|
||||
|
@ -137,6 +137,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
|
||||
TickMs: cfg.TickMs,
|
||||
ElectionTicks: cfg.ElectionTicks(),
|
||||
AutoCompactionRetention: cfg.AutoCompactionRetention,
|
||||
AutoCompactionMode: cfg.AutoCompactionMode,
|
||||
QuotaBackendBytes: cfg.QuotaBackendBytes,
|
||||
MaxTxnOps: cfg.MaxTxnOps,
|
||||
MaxRequestBytes: cfg.MaxRequestBytes,
|
||||
|
@ -201,7 +201,8 @@ func newConfig() *config {
|
||||
// version
|
||||
fs.BoolVar(&cfg.printVersion, "version", false, "Print the version and exit.")
|
||||
|
||||
fs.IntVar(&cfg.AutoCompactionRetention, "auto-compaction-retention", 0, "Auto compaction retention for mvcc key value store in hour. 0 means disable auto compaction.")
|
||||
fs.IntVar(&cfg.AutoCompactionRetention, "auto-compaction-retention", 0, "Auto compaction retention for mvcc key value store. 0 means disable auto compaction.")
|
||||
fs.StringVar(&cfg.AutoCompactionMode, "auto-compaction-mode", "periodic", "Interpret 'auto-compaction-retention' as hours when 'periodic', as revision numbers when 'revision'.")
|
||||
|
||||
// pprof profiler via HTTP
|
||||
fs.BoolVar(&cfg.EnablePprof, "enable-pprof", false, "Enable runtime profiling data via HTTP server. Address is at client URL + \"/debug/pprof/\"")
|
||||
|
@ -97,7 +97,9 @@ clustering flags:
|
||||
--strict-reconfig-check
|
||||
reject reconfiguration requests that would cause quorum loss.
|
||||
--auto-compaction-retention '0'
|
||||
auto compaction retention in hour. 0 means disable auto compaction.
|
||||
auto compaction retention length. 0 means disable auto compaction.
|
||||
--auto-compaction-mode 'periodic'
|
||||
'periodic' means hours, 'revision' means revision numbers to retain by auto compaction
|
||||
--enable-v2
|
||||
Accept etcd V2 client requests.
|
||||
|
||||
|
@ -53,6 +53,7 @@ type ServerConfig struct {
|
||||
BootstrapTimeout time.Duration
|
||||
|
||||
AutoCompactionRetention int
|
||||
AutoCompactionMode string
|
||||
QuotaBackendBytes int64
|
||||
MaxTxnOps uint
|
||||
|
||||
|
@ -221,7 +221,7 @@ type EtcdServer struct {
|
||||
|
||||
SyncTicker *time.Ticker
|
||||
// compactor is used to auto-compact the KV.
|
||||
compactor *compactor.Periodic
|
||||
compactor compactor.Compactor
|
||||
|
||||
// peerRt used to send requests (version, lease) to peers.
|
||||
peerRt http.RoundTripper
|
||||
@ -469,8 +469,11 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
|
||||
return nil, err
|
||||
}
|
||||
srv.authStore = auth.NewAuthStore(srv.be, tp)
|
||||
if h := cfg.AutoCompactionRetention; h != 0 {
|
||||
srv.compactor = compactor.NewPeriodic(h, srv.kv, srv)
|
||||
if num := cfg.AutoCompactionRetention; num != 0 {
|
||||
srv.compactor, err = compactor.New(cfg.AutoCompactionMode, num, srv.kv, srv)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
srv.compactor.Run()
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user