mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #18608 from srivastav-abhishek/periodic-compaction-flake-fix
Fixed periodic compaction tests
This commit is contained in:
commit
2f9532bfca
@ -115,7 +115,10 @@ func (r *recorderStream) Chan() <-chan Action {
|
|||||||
|
|
||||||
func (r *recorderStream) Wait(n int) ([]Action, error) {
|
func (r *recorderStream) Wait(n int) ([]Action, error) {
|
||||||
acts := make([]Action, n)
|
acts := make([]Action, n)
|
||||||
timeoutC := time.After(r.waitTimeout)
|
var timeoutC <-chan time.Time
|
||||||
|
if r.waitTimeout != 0 {
|
||||||
|
timeoutC = time.After(r.waitTimeout)
|
||||||
|
}
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
select {
|
select {
|
||||||
case acts[i] = <-r.ch:
|
case acts[i] = <-r.ch:
|
||||||
|
@ -33,7 +33,7 @@ func TestPeriodicHourly(t *testing.T) {
|
|||||||
|
|
||||||
fc := clockwork.NewFakeClock()
|
fc := clockwork.NewFakeClock()
|
||||||
// TODO: Do not depand or real time (Recorder.Wait) in unit tests.
|
// TODO: Do not depand or real time (Recorder.Wait) in unit tests.
|
||||||
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
|
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
|
||||||
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
|
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
|
||||||
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
|
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
|
||||||
|
|
||||||
@ -43,8 +43,8 @@ func TestPeriodicHourly(t *testing.T) {
|
|||||||
initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10
|
initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10
|
||||||
|
|
||||||
// compaction doesn't happen til 2 hours elapse
|
// compaction doesn't happen til 2 hours elapse
|
||||||
for i := 0; i < initialIntervals; i++ {
|
for i := 0; i < initialIntervals-1; i++ {
|
||||||
rg.Wait(1)
|
waitOneAction(t, rg)
|
||||||
fc.Advance(tb.getRetryInterval())
|
fc.Advance(tb.getRetryInterval())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -63,7 +63,7 @@ func TestPeriodicHourly(t *testing.T) {
|
|||||||
for i := 0; i < 3; i++ {
|
for i := 0; i < 3; i++ {
|
||||||
// advance one hour, one revision for each interval
|
// advance one hour, one revision for each interval
|
||||||
for j := 0; j < intervalsPerPeriod; j++ {
|
for j := 0; j < intervalsPerPeriod; j++ {
|
||||||
rg.Wait(1)
|
waitOneAction(t, rg)
|
||||||
fc.Advance(tb.getRetryInterval())
|
fc.Advance(tb.getRetryInterval())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -84,7 +84,7 @@ func TestPeriodicMinutes(t *testing.T) {
|
|||||||
retentionDuration := time.Duration(retentionMinutes) * time.Minute
|
retentionDuration := time.Duration(retentionMinutes) * time.Minute
|
||||||
|
|
||||||
fc := clockwork.NewFakeClock()
|
fc := clockwork.NewFakeClock()
|
||||||
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
|
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
|
||||||
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
|
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
|
||||||
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
|
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
|
||||||
|
|
||||||
@ -94,8 +94,8 @@ func TestPeriodicMinutes(t *testing.T) {
|
|||||||
initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10
|
initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10
|
||||||
|
|
||||||
// compaction doesn't happen til 5 minutes elapse
|
// compaction doesn't happen til 5 minutes elapse
|
||||||
for i := 0; i < initialIntervals; i++ {
|
for i := 0; i < initialIntervals-1; i++ {
|
||||||
rg.Wait(1)
|
waitOneAction(t, rg)
|
||||||
fc.Advance(tb.getRetryInterval())
|
fc.Advance(tb.getRetryInterval())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -113,7 +113,7 @@ func TestPeriodicMinutes(t *testing.T) {
|
|||||||
for i := 0; i < 5; i++ {
|
for i := 0; i < 5; i++ {
|
||||||
// advance 5-minute, one revision for each interval
|
// advance 5-minute, one revision for each interval
|
||||||
for j := 0; j < intervalsPerPeriod; j++ {
|
for j := 0; j < intervalsPerPeriod; j++ {
|
||||||
rg.Wait(1)
|
waitOneAction(t, rg)
|
||||||
fc.Advance(tb.getRetryInterval())
|
fc.Advance(tb.getRetryInterval())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -132,7 +132,7 @@ func TestPeriodicMinutes(t *testing.T) {
|
|||||||
func TestPeriodicPause(t *testing.T) {
|
func TestPeriodicPause(t *testing.T) {
|
||||||
fc := clockwork.NewFakeClock()
|
fc := clockwork.NewFakeClock()
|
||||||
retentionDuration := time.Hour
|
retentionDuration := time.Hour
|
||||||
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
|
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
|
||||||
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
|
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
|
||||||
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
|
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
|
||||||
|
|
||||||
@ -143,7 +143,7 @@ func TestPeriodicPause(t *testing.T) {
|
|||||||
|
|
||||||
// tb will collect 3 hours of revisions but not compact since paused
|
// tb will collect 3 hours of revisions but not compact since paused
|
||||||
for i := 0; i < n*3; i++ {
|
for i := 0; i < n*3; i++ {
|
||||||
rg.Wait(1)
|
waitOneAction(t, rg)
|
||||||
fc.Advance(tb.getRetryInterval())
|
fc.Advance(tb.getRetryInterval())
|
||||||
}
|
}
|
||||||
// t.revs = [21 22 23 24 25 26 27 28 29 30]
|
// t.revs = [21 22 23 24 25 26 27 28 29 30]
|
||||||
@ -156,7 +156,7 @@ func TestPeriodicPause(t *testing.T) {
|
|||||||
|
|
||||||
// tb resumes to being blocked on the clock
|
// tb resumes to being blocked on the clock
|
||||||
tb.Resume()
|
tb.Resume()
|
||||||
rg.Wait(1)
|
waitOneAction(t, rg)
|
||||||
|
|
||||||
// unblock clock, will kick off a compaction at T=3h6m by retry
|
// unblock clock, will kick off a compaction at T=3h6m by retry
|
||||||
fc.Advance(tb.getRetryInterval())
|
fc.Advance(tb.getRetryInterval())
|
||||||
@ -179,7 +179,7 @@ func TestPeriodicSkipRevNotChange(t *testing.T) {
|
|||||||
retentionDuration := time.Duration(retentionMinutes) * time.Minute
|
retentionDuration := time.Duration(retentionMinutes) * time.Minute
|
||||||
|
|
||||||
fc := clockwork.NewFakeClock()
|
fc := clockwork.NewFakeClock()
|
||||||
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
|
rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(0), 0}
|
||||||
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
|
compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
|
||||||
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
|
tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable)
|
||||||
|
|
||||||
@ -189,10 +189,10 @@ func TestPeriodicSkipRevNotChange(t *testing.T) {
|
|||||||
initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10
|
initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10
|
||||||
|
|
||||||
// first compaction happens til 5 minutes elapsed
|
// first compaction happens til 5 minutes elapsed
|
||||||
for i := 0; i < initialIntervals; i++ {
|
for i := 0; i < initialIntervals-1; i++ {
|
||||||
// every time set the same revision with 100
|
// every time set the same revision with 100
|
||||||
rg.SetRev(int64(100))
|
rg.SetRev(int64(100))
|
||||||
rg.Wait(1)
|
waitOneAction(t, rg)
|
||||||
fc.Advance(tb.getRetryInterval())
|
fc.Advance(tb.getRetryInterval())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -212,7 +212,7 @@ func TestPeriodicSkipRevNotChange(t *testing.T) {
|
|||||||
for i := 0; i < 5; i++ {
|
for i := 0; i < 5; i++ {
|
||||||
for j := 0; j < intervalsPerPeriod; j++ {
|
for j := 0; j < intervalsPerPeriod; j++ {
|
||||||
rg.SetRev(int64(100))
|
rg.SetRev(int64(100))
|
||||||
rg.Wait(1)
|
waitOneAction(t, rg)
|
||||||
fc.Advance(tb.getRetryInterval())
|
fc.Advance(tb.getRetryInterval())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -224,7 +224,7 @@ func TestPeriodicSkipRevNotChange(t *testing.T) {
|
|||||||
|
|
||||||
// when revision changed, compaction is normally
|
// when revision changed, compaction is normally
|
||||||
for i := 0; i < initialIntervals; i++ {
|
for i := 0; i < initialIntervals; i++ {
|
||||||
rg.Wait(1)
|
waitOneAction(t, rg)
|
||||||
fc.Advance(tb.getRetryInterval())
|
fc.Advance(tb.getRetryInterval())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -238,3 +238,9 @@ func TestPeriodicSkipRevNotChange(t *testing.T) {
|
|||||||
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
|
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func waitOneAction(t *testing.T, r testutil.Recorder) {
|
||||||
|
if actions, _ := r.Wait(1); len(actions) != 1 {
|
||||||
|
t.Errorf("expect 1 action, got %v instead", len(actions))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user