diff --git a/compactor/compactor.go b/compactor/compactor.go new file mode 100644 index 000000000..ad9967b43 --- /dev/null +++ b/compactor/compactor.go @@ -0,0 +1,133 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package compactor + +import ( + "sync" + "time" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/storage" +) + +var ( + plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcdserver") +) + +const ( + checkCompactionInterval = 5 * time.Minute +) + +type Compactable interface { + Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) +} + +type RevGetter interface { + Rev() int64 +} + +type Periodic struct { + clock clockwork.Clock + periodInHour int + + rg RevGetter + c Compactable + + revs []int64 + ctx context.Context + cancel context.CancelFunc + + mu sync.Mutex + paused bool +} + +func NewPeriodic(h int, rg RevGetter, c Compactable) *Periodic { + return &Periodic{ + clock: clockwork.NewRealClock(), + periodInHour: h, + rg: rg, + c: c, + } +} + +func (t *Periodic) Run() { + t.ctx, t.cancel = context.WithCancel(context.Background()) + t.revs = make([]int64, 0) + clock := t.clock + + go func() { + last := clock.Now() + for { + t.revs = append(t.revs, t.rg.Rev()) + select { + case <-t.ctx.Done(): + return + case <-clock.After(checkCompactionInterval): + t.mu.Lock() + p := t.paused + t.mu.Unlock() + if p { + continue + } + } + if clock.Now().Sub(last) < time.Duration(t.periodInHour)*time.Hour { + continue + } + + rev := t.getRev(t.periodInHour) + if rev < 0 { + continue + } + + plog.Noticef("Starting auto-compaction at revision %d", rev) + _, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev}) + if err == nil || err == storage.ErrCompacted { + t.revs = make([]int64, 0) + last = clock.Now() + plog.Noticef("Finished auto-compaction at revision %d", rev) + } else { + plog.Noticef("Failed auto-compaction at revision %d (%v)", err, rev) + plog.Noticef("Retry after %v", checkCompactionInterval) + } + } + }() +} + +func (t *Periodic) Stop() { + t.cancel() +} + +func (t *Periodic) Pause() { + t.mu.Lock() + defer t.mu.Unlock() + t.paused = true +} + +func (t *Periodic) Resume() { + t.mu.Lock() + defer t.mu.Unlock() + t.paused = false +} + +func (t *Periodic) getRev(h int) int64 { + i := len(t.revs) - int(time.Duration(h)*time.Hour/checkCompactionInterval) + if i < 0 { + return -1 + } + return t.revs[i] +} diff --git a/compactor/compactor_test.go b/compactor/compactor_test.go new file mode 100644 index 000000000..3b378b6ff --- /dev/null +++ b/compactor/compactor_test.go @@ -0,0 +1,111 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package compactor + +import ( + "reflect" + "testing" + "time" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/pkg/testutil" +) + +func TestPeriodic(t *testing.T) { + fc := clockwork.NewFakeClock() + compactable := &fakeCompactable{testutil.NewRecorderStream()} + tb := &Periodic{ + clock: fc, + periodInHour: 1, + rg: &fakeRevGetter{}, + c: compactable, + } + + tb.Run() + defer tb.Stop() + + n := int(time.Hour / checkCompactionInterval) + for i := 0; i < 3; i++ { + for j := 0; j < n; j++ { + time.Sleep(5 * time.Millisecond) + fc.Advance(checkCompactionInterval) + } + + a, err := compactable.Wait(1) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: int64(i*n) + 1}) { + t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: int64(i*n) + 1}) + } + } +} + +func TestPeriodicPause(t *testing.T) { + fc := clockwork.NewFakeClock() + compactable := &fakeCompactable{testutil.NewRecorderStream()} + tb := &Periodic{ + clock: fc, + periodInHour: 1, + rg: &fakeRevGetter{}, + c: compactable, + } + + tb.Run() + tb.Pause() + + n := int(time.Hour / checkCompactionInterval) + for i := 0; i < 3*n; i++ { + time.Sleep(5 * time.Millisecond) + fc.Advance(checkCompactionInterval) + } + + select { + case a := <-compactable.Chan(): + t.Fatal("unexpected action %v", a) + case <-time.After(10 * time.Millisecond): + } + + tb.Resume() + fc.Advance(checkCompactionInterval) + + a, err := compactable.Wait(1) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: int64(2*n) + 2}) { + t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: int64(2*n) + 2}) + } +} + +type fakeCompactable struct { + testutil.Recorder +} + +func (fc *fakeCompactable) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { + fc.Record(testutil.Action{Name: "c", Params: []interface{}{r}}) + return &pb.CompactionResponse{}, nil +} + +type fakeRevGetter struct { + rev int64 +} + +func (fr *fakeRevGetter) Rev() int64 { + fr.rev++ + return fr.rev +} diff --git a/etcdmain/config.go b/etcdmain/config.go index 6e4d03c3c..5fbc69fe2 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -121,8 +121,9 @@ type config struct { printVersion bool - v3demo bool - gRPCAddr string + v3demo bool + gRPCAddr string + autoCompactionRetention int enablePprof bool @@ -224,6 +225,7 @@ func NewConfig() *config { // demo flag fs.BoolVar(&cfg.v3demo, "experimental-v3demo", false, "Enable experimental v3 demo API.") fs.StringVar(&cfg.gRPCAddr, "experimental-gRPC-addr", "127.0.0.1:2378", "gRPC address for experimental v3 demo API.") + fs.IntVar(&cfg.autoCompactionRetention, "experimental-auto-compaction-retention", 0, "Auto compaction retention in hour. 0 means disable auto compaction.") // backwards-compatibility with v0.4.6 fs.Var(&flags.IPAddressPort{}, "addr", "DEPRECATED: Use -advertise-client-urls instead.") diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index ffc43926a..4cf580f31 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -274,26 +274,27 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { } srvcfg := &etcdserver.ServerConfig{ - Name: cfg.name, - ClientURLs: cfg.acurls, - PeerURLs: cfg.apurls, - DataDir: cfg.dir, - DedicatedWALDir: cfg.walDir, - SnapCount: cfg.snapCount, - MaxSnapFiles: cfg.maxSnapFiles, - MaxWALFiles: cfg.maxWalFiles, - InitialPeerURLsMap: urlsmap, - InitialClusterToken: token, - DiscoveryURL: cfg.durl, - DiscoveryProxy: cfg.dproxy, - NewCluster: cfg.isNewCluster(), - ForceNewCluster: cfg.forceNewCluster, - PeerTLSInfo: cfg.peerTLSInfo, - TickMs: cfg.TickMs, - ElectionTicks: cfg.electionTicks(), - V3demo: cfg.v3demo, - StrictReconfigCheck: cfg.strictReconfigCheck, - EnablePprof: cfg.enablePprof, + Name: cfg.name, + ClientURLs: cfg.acurls, + PeerURLs: cfg.apurls, + DataDir: cfg.dir, + DedicatedWALDir: cfg.walDir, + SnapCount: cfg.snapCount, + MaxSnapFiles: cfg.maxSnapFiles, + MaxWALFiles: cfg.maxWalFiles, + InitialPeerURLsMap: urlsmap, + InitialClusterToken: token, + DiscoveryURL: cfg.durl, + DiscoveryProxy: cfg.dproxy, + NewCluster: cfg.isNewCluster(), + ForceNewCluster: cfg.forceNewCluster, + PeerTLSInfo: cfg.peerTLSInfo, + TickMs: cfg.TickMs, + ElectionTicks: cfg.electionTicks(), + V3demo: cfg.v3demo, + AutoCompactionRetention: cfg.autoCompactionRetention, + StrictReconfigCheck: cfg.strictReconfigCheck, + EnablePprof: cfg.enablePprof, } var s *etcdserver.EtcdServer s, err = etcdserver.NewServer(srvcfg) diff --git a/etcdmain/help.go b/etcdmain/help.go index 1860df2db..3e7cfb7da 100644 --- a/etcdmain/help.go +++ b/etcdmain/help.go @@ -137,6 +137,8 @@ experimental flags: --experimental-v3demo 'false' enable experimental v3 demo API. + --experimental-auto-compaction-retention '0' + auto compaction retention in hour. 0 means disable auto compaction. --experimental-gRPC-addr '127.0.0.1:2378' gRPC address for experimental v3 demo API. diff --git a/etcdserver/config.go b/etcdserver/config.go index 74b0f2de1..481b908df 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -50,7 +50,8 @@ type ServerConfig struct { ElectionTicks int BootstrapTimeout time.Duration - V3demo bool + V3demo bool + AutoCompactionRetention int StrictReconfigCheck bool diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 9cc77b984..5d299ef7d 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -159,10 +159,16 @@ func (r *raftNode) start(s *EtcdServer) { if r.s.stats != nil { r.s.stats.BecomeLeader() } + if r.s.compactor != nil { + r.s.compactor.Resume() + } } else { if r.s.lessor != nil { r.s.lessor.Demote() } + if r.s.compactor != nil { + r.s.compactor.Pause() + } syncC = nil } } diff --git a/etcdserver/server.go b/etcdserver/server.go index cc0a3c6f4..0b79fecd4 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -30,6 +30,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/compactor" "github.com/coreos/etcd/discovery" "github.com/coreos/etcd/etcdserver/etcdhttp/httptypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" @@ -179,6 +180,8 @@ type EtcdServer struct { lstats *stats.LeaderStats SyncTicker <-chan time.Time + // compactor is used to auto-compact the KV. + compactor *compactor.Periodic // consistent index used to hold the offset of current executing entry // It is initialized to 0 before executing any entry. @@ -368,6 +371,10 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { srv.be = backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename)) srv.lessor = lease.NewLessor(srv.be) srv.kv = dstorage.New(srv.be, srv.lessor, &srv.consistIndex) + if h := cfg.AutoCompactionRetention; h != 0 { + srv.compactor = compactor.NewPeriodic(h, srv.kv, srv) + srv.compactor.Run() + } } // TODO: move transport initialization near the definition of remote @@ -518,6 +525,9 @@ func (s *EtcdServer) run() { if s.be != nil { s.be.Close() } + if s.compactor != nil { + s.compactor.Stop() + } close(s.done) }()