From d3db010190f87091dd9da9dbd1a62adb788bd396 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 28 Nov 2014 21:32:28 -0800 Subject: [PATCH] *: support purging old wal/snap files --- etcdmain/etcd.go | 4 +++ etcdserver/config.go | 2 ++ etcdserver/server.go | 24 ++++++++++++++++++ pkg/fileutil/fileutil.go | 14 +++++++++++ pkg/fileutil/purge.go | 46 +++++++++++++++++++++++++++++++++++ pkg/fileutil/purge_test.go | 50 ++++++++++++++++++++++++++++++++++++++ wal/util.go | 20 +++------------ wal/wal.go | 3 ++- 8 files changed, 145 insertions(+), 18 deletions(-) create mode 100644 pkg/fileutil/purge.go create mode 100644 pkg/fileutil/purge_test.go diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index ca9a73c7a..e3b965b0d 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -63,6 +63,8 @@ var ( snapCount = fs.Uint64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot") printVersion = fs.Bool("version", false, "Print the version and exit") forceNewCluster = fs.Bool("force-new-cluster", false, "Force to create a new one member cluster") + maxSnapFiles = fs.Uint("max-snapshots", 5, "Maximum number of snapshot files to retain (0 is unlimited)") + maxWalFiles = fs.Uint("max-wals", 5, "Maximum number of wal files to retain (0 is unlimited)") initialCluster = fs.String("initial-cluster", "default=http://localhost:2380,default=http://localhost:7001", "Initial cluster configuration for bootstrapping") initialClusterToken = fs.String("initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during bootstrap") @@ -280,6 +282,8 @@ func startEtcd() (<-chan struct{}, error) { PeerURLs: apurls, DataDir: *dir, SnapCount: *snapCount, + MaxSnapFiles: *maxSnapFiles, + MaxWALFiles: *maxWalFiles, Cluster: cls, DiscoveryURL: *durl, DiscoveryProxy: *dproxy, diff --git a/etcdserver/config.go b/etcdserver/config.go index 918f13bef..3d8f4df5b 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -37,6 +37,8 @@ type ServerConfig struct { PeerURLs types.URLs DataDir string SnapCount uint64 + MaxSnapFiles uint + MaxWALFiles uint Cluster *Cluster NewCluster bool ForceNewCluster bool diff --git a/etcdserver/server.go b/etcdserver/server.go index a36ae47ee..fdbcb84e3 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -37,6 +37,7 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/migrate" + "github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/pkg/wait" @@ -59,6 +60,8 @@ const ( StoreAdminPrefix = "/0" StoreKeysPrefix = "/1" + + purgeFileInterval = 30 * time.Second ) var ( @@ -157,6 +160,7 @@ type RaftTimer interface { // EtcdServer is the production implementation of the Server interface type EtcdServer struct { + cfg *ServerConfig w wait.Wait done chan struct{} stop chan struct{} @@ -301,6 +305,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { lstats := stats.NewLeaderStats(id.String()) srv := &EtcdServer{ + cfg: cfg, store: st, node: n, raftStorage: s, @@ -327,6 +332,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { func (s *EtcdServer) Start() { s.start() go s.publish(defaultPublishRetryInterval) + go s.purgeFile() } // start prepares and starts server in a new goroutine. It is no longer safe to @@ -346,6 +352,24 @@ func (s *EtcdServer) start() { go s.run() } +func (s *EtcdServer) purgeFile() { + var serrc, werrc <-chan error + if s.cfg.MaxSnapFiles > 0 { + serrc = fileutil.PurgeFile(s.cfg.SnapDir(), "snap", s.cfg.MaxSnapFiles, purgeFileInterval, s.done) + } + if s.cfg.MaxWALFiles > 0 { + werrc = fileutil.PurgeFile(s.cfg.WALDir(), "wal", s.cfg.MaxWALFiles, purgeFileInterval, s.done) + } + select { + case e := <-werrc: + log.Fatalf("etcdserver: failed to purge wal file %v", e) + case e := <-serrc: + log.Fatalf("etcdserver: failed to purge snap file %v", e) + case <-s.done: + return + } +} + func (s *EtcdServer) ID() types.ID { return s.id } func (s *EtcdServer) SenderFinder() rafthttp.SenderFinder { return s.sendhub } diff --git a/pkg/fileutil/fileutil.go b/pkg/fileutil/fileutil.go index 382d7b0fa..d807e6c1b 100644 --- a/pkg/fileutil/fileutil.go +++ b/pkg/fileutil/fileutil.go @@ -35,3 +35,17 @@ func IsDirWriteable(dir string) error { } return os.Remove(f) } + +// ReadDir returns the filenames in the given directory. +func ReadDir(dirpath string) ([]string, error) { + dir, err := os.Open(dirpath) + if err != nil { + return nil, err + } + defer dir.Close() + names, err := dir.Readdirnames(-1) + if err != nil { + return nil, err + } + return names, nil +} diff --git a/pkg/fileutil/purge.go b/pkg/fileutil/purge.go new file mode 100644 index 000000000..8b3e02894 --- /dev/null +++ b/pkg/fileutil/purge.go @@ -0,0 +1,46 @@ +package fileutil + +import ( + "log" + "os" + "path" + "sort" + "strings" + "time" +) + +func PurgeFile(dirname string, suffix string, max uint, interval time.Duration, stop <-chan struct{}) <-chan error { + errC := make(chan error, 1) + go func() { + for { + fnames, err := ReadDir(dirname) + if err != nil { + errC <- err + return + } + newfnames := make([]string, 0) + for _, fname := range fnames { + if strings.HasSuffix(fname, suffix) { + newfnames = append(newfnames, fname) + } + } + sort.Strings(newfnames) + for len(newfnames) > int(max) { + f := path.Join(dirname, newfnames[0]) + err := os.Remove(f) + if err != nil { + errC <- err + return + } + log.Printf("filePurge: successfully remvoed file %s", f) + newfnames = newfnames[1:] + } + select { + case <-time.After(interval): + case <-stop: + return + } + } + }() + return errC +} diff --git a/pkg/fileutil/purge_test.go b/pkg/fileutil/purge_test.go new file mode 100644 index 000000000..b60804bd7 --- /dev/null +++ b/pkg/fileutil/purge_test.go @@ -0,0 +1,50 @@ +package fileutil + +import ( + "fmt" + "io/ioutil" + "os" + "path" + "reflect" + "testing" + "time" +) + +func TestPurgeFile(t *testing.T) { + dir, err := ioutil.TempDir("", "purgefile") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + for i := 0; i < 5; i++ { + _, err := os.Create(path.Join(dir, fmt.Sprintf("%d.test", i))) + if err != nil { + t.Fatal(err) + } + } + + stop := make(chan struct{}) + errch := PurgeFile(dir, "test", 3, time.Millisecond, stop) + for i := 5; i < 10; i++ { + _, err := os.Create(path.Join(dir, fmt.Sprintf("%d.test", i))) + if err != nil { + t.Fatal(err) + } + time.Sleep(time.Millisecond) + } + fnames, err := ReadDir(dir) + if err != nil { + t.Fatal(err) + } + wnames := []string{"7.test", "8.test", "9.test"} + if !reflect.DeepEqual(fnames, wnames) { + t.Errorf("filenames = %v, want %v", fnames, wnames) + } + select { + case err := <-errch: + t.Errorf("unexpected purge error %v", err) + case <-time.After(time.Millisecond): + } + close(stop) +} diff --git a/wal/util.go b/wal/util.go index b3be44e8a..8d13f2137 100644 --- a/wal/util.go +++ b/wal/util.go @@ -19,9 +19,9 @@ package wal import ( "fmt" "log" - "os" "path" + "github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/types" ) @@ -36,7 +36,7 @@ const ( ) func DetectVersion(dirpath string) WalVersion { - names, err := readDir(dirpath) + names, err := fileutil.ReadDir(dirpath) if err != nil || len(names) == 0 { return WALNotExist } @@ -56,7 +56,7 @@ func DetectVersion(dirpath string) WalVersion { } func Exist(dirpath string) bool { - names, err := readDir(dirpath) + names, err := fileutil.ReadDir(dirpath) if err != nil { return false } @@ -97,20 +97,6 @@ func isValidSeq(names []string) bool { return true } -// readDir returns the filenames in wal directory. -func readDir(dirpath string) ([]string, error) { - dir, err := os.Open(dirpath) - if err != nil { - return nil, err - } - defer dir.Close() - names, err := dir.Readdirnames(-1) - if err != nil { - return nil, err - } - return names, nil -} - func checkWalNames(names []string) []string { wnames := make([]string, 0) for _, name := range names { diff --git a/wal/wal.go b/wal/wal.go index 9585e3bb1..42f8d289d 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -26,6 +26,7 @@ import ( "reflect" "sort" + "github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" @@ -110,7 +111,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) { // index. The WAL cannot be appended to before reading out all of its // previous records. func OpenAtIndex(dirpath string, index uint64) (*WAL, error) { - names, err := readDir(dirpath) + names, err := fileutil.ReadDir(dirpath) if err != nil { return nil, err }