mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
parent
6dbdf6e55f
commit
c920ce0453
@ -23,6 +23,11 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func PurgeFile(dirname string, suffix string, max uint, interval time.Duration, stop <-chan struct{}) <-chan error {
|
func PurgeFile(dirname string, suffix string, max uint, interval time.Duration, stop <-chan struct{}) <-chan error {
|
||||||
|
return purgeFile(dirname, suffix, max, interval, stop, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// purgeFile is the internal implementation for PurgeFile which can post purged files to purgec if non-nil.
|
||||||
|
func purgeFile(dirname string, suffix string, max uint, interval time.Duration, stop <-chan struct{}, purgec chan<- string) <-chan error {
|
||||||
errC := make(chan error, 1)
|
errC := make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
@ -38,6 +43,7 @@ func PurgeFile(dirname string, suffix string, max uint, interval time.Duration,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
sort.Strings(newfnames)
|
sort.Strings(newfnames)
|
||||||
|
fnames = newfnames
|
||||||
for len(newfnames) > int(max) {
|
for len(newfnames) > int(max) {
|
||||||
f := path.Join(dirname, newfnames[0])
|
f := path.Join(dirname, newfnames[0])
|
||||||
l, err := TryLockFile(f, os.O_WRONLY, PrivateFileMode)
|
l, err := TryLockFile(f, os.O_WRONLY, PrivateFileMode)
|
||||||
@ -56,6 +62,11 @@ func PurgeFile(dirname string, suffix string, max uint, interval time.Duration,
|
|||||||
plog.Infof("purged file %s successfully", f)
|
plog.Infof("purged file %s successfully", f)
|
||||||
newfnames = newfnames[1:]
|
newfnames = newfnames[1:]
|
||||||
}
|
}
|
||||||
|
if purgec != nil {
|
||||||
|
for i := 0; i < len(fnames)-len(newfnames); i++ {
|
||||||
|
purgec <- fnames[i]
|
||||||
|
}
|
||||||
|
}
|
||||||
select {
|
select {
|
||||||
case <-time.After(interval):
|
case <-time.After(interval):
|
||||||
case <-stop:
|
case <-stop:
|
||||||
|
@ -31,44 +31,48 @@ func TestPurgeFile(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
for i := 0; i < 5; i++ {
|
// minimal file set
|
||||||
var f *os.File
|
for i := 0; i < 3; i++ {
|
||||||
f, err = os.Create(path.Join(dir, fmt.Sprintf("%d.test", i)))
|
f, ferr := os.Create(path.Join(dir, fmt.Sprintf("%d.test", i)))
|
||||||
if err != nil {
|
if ferr != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
f.Close()
|
f.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
stop := make(chan struct{})
|
stop, purgec := make(chan struct{}), make(chan string, 10)
|
||||||
|
|
||||||
// keep at most 3 most recent files
|
// keep 3 most recent files
|
||||||
errch := PurgeFile(dir, "test", 3, time.Millisecond, stop)
|
errch := purgeFile(dir, "test", 3, time.Millisecond, stop, purgec)
|
||||||
|
select {
|
||||||
// create 5 more files
|
case f := <-purgec:
|
||||||
for i := 5; i < 10; i++ {
|
t.Errorf("unexpected purge on %q", f)
|
||||||
var f *os.File
|
case <-time.After(10 * time.Millisecond):
|
||||||
f, err = os.Create(path.Join(dir, fmt.Sprintf("%d.test", i)))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
f.Close()
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// purge routine should purge 7 out of 10 files and only keep the
|
// rest of the files
|
||||||
// 3 most recent ones.
|
for i := 4; i < 10; i++ {
|
||||||
// Wait for purging for at most 300ms.
|
go func(n int) {
|
||||||
var fnames []string
|
f, ferr := os.Create(path.Join(dir, fmt.Sprintf("%d.test", n)))
|
||||||
for i := 0; i < 30; i++ {
|
if ferr != nil {
|
||||||
fnames, err = ReadDir(dir)
|
t.Fatal(err)
|
||||||
if err != nil {
|
}
|
||||||
t.Fatal(err)
|
f.Close()
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
// watch files purge away
|
||||||
|
for i := 4; i < 10; i++ {
|
||||||
|
select {
|
||||||
|
case <-purgec:
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Errorf("purge took too long")
|
||||||
}
|
}
|
||||||
if len(fnames) <= 3 {
|
}
|
||||||
break
|
|
||||||
}
|
fnames, rerr := ReadDir(dir)
|
||||||
time.Sleep(10 * time.Millisecond)
|
if rerr != nil {
|
||||||
|
t.Fatal(rerr)
|
||||||
}
|
}
|
||||||
wnames := []string{"7.test", "8.test", "9.test"}
|
wnames := []string{"7.test", "8.test", "9.test"}
|
||||||
if !reflect.DeepEqual(fnames, wnames) {
|
if !reflect.DeepEqual(fnames, wnames) {
|
||||||
@ -77,9 +81,11 @@ func TestPurgeFile(t *testing.T) {
|
|||||||
|
|
||||||
// no error should be reported from purge routine
|
// no error should be reported from purge routine
|
||||||
select {
|
select {
|
||||||
|
case f := <-purgec:
|
||||||
|
t.Errorf("unexpected purge on %q", f)
|
||||||
case err := <-errch:
|
case err := <-errch:
|
||||||
t.Errorf("unexpected purge error %v", err)
|
t.Errorf("unexpected purge error %v", err)
|
||||||
case <-time.After(time.Millisecond):
|
case <-time.After(10 * time.Millisecond):
|
||||||
}
|
}
|
||||||
close(stop)
|
close(stop)
|
||||||
}
|
}
|
||||||
@ -107,29 +113,33 @@ func TestPurgeFileHoldingLockFile(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
stop := make(chan struct{})
|
stop, purgec := make(chan struct{}), make(chan string, 10)
|
||||||
errch := PurgeFile(dir, "test", 3, time.Millisecond, stop)
|
errch := purgeFile(dir, "test", 3, time.Millisecond, stop, purgec)
|
||||||
|
|
||||||
var fnames []string
|
for i := 0; i < 5; i++ {
|
||||||
for i := 0; i < 10; i++ {
|
select {
|
||||||
fnames, err = ReadDir(dir)
|
case <-purgec:
|
||||||
if err != nil {
|
case <-time.After(time.Second):
|
||||||
t.Fatal(err)
|
t.Fatalf("purge took too long")
|
||||||
}
|
}
|
||||||
if len(fnames) <= 5 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fnames, rerr := ReadDir(dir)
|
||||||
|
if rerr != nil {
|
||||||
|
t.Fatal(rerr)
|
||||||
|
}
|
||||||
|
|
||||||
wnames := []string{"5.test", "6.test", "7.test", "8.test", "9.test"}
|
wnames := []string{"5.test", "6.test", "7.test", "8.test", "9.test"}
|
||||||
if !reflect.DeepEqual(fnames, wnames) {
|
if !reflect.DeepEqual(fnames, wnames) {
|
||||||
t.Errorf("filenames = %v, want %v", fnames, wnames)
|
t.Errorf("filenames = %v, want %v", fnames, wnames)
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
|
case s := <-purgec:
|
||||||
|
t.Errorf("unexpected purge %q", s)
|
||||||
case err = <-errch:
|
case err = <-errch:
|
||||||
t.Errorf("unexpected purge error %v", err)
|
t.Errorf("unexpected purge error %v", err)
|
||||||
case <-time.After(time.Millisecond):
|
case <-time.After(10 * time.Millisecond):
|
||||||
}
|
}
|
||||||
|
|
||||||
// remove the purge barrier
|
// remove the purge barrier
|
||||||
@ -137,15 +147,18 @@ func TestPurgeFileHoldingLockFile(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
// wait for rest of purges (5, 6)
|
||||||
fnames, err = ReadDir(dir)
|
for i := 0; i < 2; i++ {
|
||||||
if err != nil {
|
select {
|
||||||
t.Fatal(err)
|
case <-purgec:
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatalf("purge took too long")
|
||||||
}
|
}
|
||||||
if len(fnames) <= 3 {
|
}
|
||||||
break
|
|
||||||
}
|
fnames, rerr = ReadDir(dir)
|
||||||
time.Sleep(10 * time.Millisecond)
|
if rerr != nil {
|
||||||
|
t.Fatal(rerr)
|
||||||
}
|
}
|
||||||
wnames = []string{"7.test", "8.test", "9.test"}
|
wnames = []string{"7.test", "8.test", "9.test"}
|
||||||
if !reflect.DeepEqual(fnames, wnames) {
|
if !reflect.DeepEqual(fnames, wnames) {
|
||||||
@ -153,9 +166,11 @@ func TestPurgeFileHoldingLockFile(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
|
case f := <-purgec:
|
||||||
|
t.Errorf("unexpected purge on %q", f)
|
||||||
case err := <-errch:
|
case err := <-errch:
|
||||||
t.Errorf("unexpected purge error %v", err)
|
t.Errorf("unexpected purge error %v", err)
|
||||||
case <-time.After(time.Millisecond):
|
case <-time.After(10 * time.Millisecond):
|
||||||
}
|
}
|
||||||
|
|
||||||
close(stop)
|
close(stop)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user