diff --git a/pkg/schedule/schedule.go b/pkg/schedule/schedule.go new file mode 100644 index 000000000..e680f8554 --- /dev/null +++ b/pkg/schedule/schedule.go @@ -0,0 +1,172 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schedule + +import ( + "sync" + + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" +) + +type Job func(context.Context) + +// Scheduler can schedule jobs. +type Scheduler interface { + // Schedule asks the scheduler to schedule a job defined by the given func. + // Schedule to a stopped scheduler might panic. + Schedule(j Job) + + // Pending returns number of pending jobs + Pending() int + + // Scheduled returns the number of scheduled jobs (excluding pending jobs) + Scheduled() int + + // Finished returns the number of finished jobs + Finished() int + + // WaitFinish waits all pending jobs to finish. + WaitFinish() + + // Stop stops the scheduler. + Stop() +} + +type fifo struct { + mu sync.Mutex + + resume chan struct{} + scheduled int + finished int + pendings []Job + + ctx context.Context + cancel context.CancelFunc + + finishCond *sync.Cond + donec chan struct{} +} + +// NewFIFOScheduler returns a Scheduler that schedules jobs in FIFO +// order sequentially +func NewFIFOScheduler() Scheduler { + f := &fifo{ + resume: make(chan struct{}, 1), + donec: make(chan struct{}, 1), + } + f.finishCond = sync.NewCond(&f.mu) + f.ctx, f.cancel = context.WithCancel(context.Background()) + go f.run() + return f +} + +// Schedule schedules a job that will be ran in FIFO order sequentially. +func (f *fifo) Schedule(j Job) { + f.mu.Lock() + defer f.mu.Unlock() + + if f.cancel == nil { + panic("schedule: schedule to stopped scheduler") + } + + if len(f.pendings) == 0 { + select { + case f.resume <- struct{}{}: + default: + } + } + f.pendings = append(f.pendings, j) + + return +} + +func (f *fifo) Pending() int { + f.mu.Lock() + defer f.mu.Unlock() + return len(f.pendings) +} + +func (f *fifo) Scheduled() int { + f.mu.Lock() + defer f.mu.Unlock() + return f.scheduled +} + +func (f *fifo) Finished() int { + f.finishCond.L.Lock() + defer f.finishCond.L.Unlock() + return f.finished +} + +func (f *fifo) WaitFinish() { + f.finishCond.L.Lock() + finish := f.finished + f.finishCond.L.Unlock() + + f.finishCond.L.Lock() + for f.finished == finish || len(f.pendings) != 0 { + f.finishCond.Wait() + } + f.finishCond.L.Unlock() +} + +// Stop stops the scheduler and cancels all pending jobs. +func (f *fifo) Stop() { + f.mu.Lock() + f.cancel() + f.cancel = nil + f.mu.Unlock() + <-f.donec +} + +func (f *fifo) run() { + // TODO: recover from job panic? + defer func() { + close(f.donec) + close(f.resume) + }() + + for { + var todo Job + f.mu.Lock() + if len(f.pendings) != 0 { + f.scheduled++ + todo = f.pendings[0] + } + f.mu.Unlock() + if todo == nil { + select { + case <-f.resume: + case <-f.ctx.Done(): + f.mu.Lock() + pendings := f.pendings + f.pendings = nil + f.mu.Unlock() + // clean up pending jobs + for _, todo := range pendings { + todo(f.ctx) + } + return + } + } else { + todo(f.ctx) + f.finishCond.L.Lock() + f.finished++ + f.pendings = f.pendings[1:] + f.finishCond.Broadcast() + f.finishCond.L.Unlock() + } + } +} diff --git a/pkg/schedule/schedule_test.go b/pkg/schedule/schedule_test.go new file mode 100644 index 000000000..9c61ec6e2 --- /dev/null +++ b/pkg/schedule/schedule_test.go @@ -0,0 +1,50 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schedule + +import ( + "testing" + + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" +) + +func TestFIFOSchedule(t *testing.T) { + s := NewFIFOScheduler() + defer s.Stop() + + next := 0 + jobCreator := func(i int) Job { + return func(ctx context.Context) { + if next != i { + t.Fatalf("job#%d: got %d, want %d", next, i) + } + next = i + 1 + } + } + + var jobs []Job + for i := 0; i < 100; i++ { + jobs = append(jobs, jobCreator(i)) + } + + for _, j := range jobs { + s.Schedule(j) + } + + s.WaitFinish() + if s.Scheduled() != 100 { + t.Errorf("scheduled = %d, want %d", s.Scheduled(), 100) + } +} diff --git a/storage/kvstore.go b/storage/kvstore.go index c50048e74..626d9ca7f 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -22,7 +22,9 @@ import ( "sync" "time" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/lease" + "github.com/coreos/etcd/pkg/schedule" "github.com/coreos/etcd/storage/backend" "github.com/coreos/etcd/storage/storagepb" ) @@ -62,9 +64,9 @@ type store struct { tx backend.BatchTx txnID int64 // tracks the current txnID to verify txn operations - changes []storagepb.KeyValue + changes []storagepb.KeyValue + fifoSched schedule.Scheduler - wg sync.WaitGroup stopc chan struct{} } @@ -79,7 +81,10 @@ func NewStore(b backend.Backend, le lease.Lessor) *store { currentRev: revision{main: 1}, compactMainRev: -1, - stopc: make(chan struct{}), + + fifoSched: schedule.NewFIFOScheduler(), + + stopc: make(chan struct{}), } if s.le != nil { @@ -239,8 +244,16 @@ func (s *store) Compact(rev int64) error { keep := s.kvindex.Compact(rev) - s.wg.Add(1) - go s.scheduleCompaction(rev, keep) + var j = func(ctx context.Context) { + select { + case <-ctx.Done(): + return + default: + } + s.scheduleCompaction(rev, keep) + } + + s.fifoSched.Schedule(j) indexCompactionPauseDurations.Observe(float64(time.Now().Sub(start) / time.Millisecond)) return nil @@ -258,10 +271,7 @@ func (s *store) Restore(b backend.Backend) error { defer s.mu.Unlock() close(s.stopc) - // TODO: restore without waiting for compaction routine to finish. - // We need a way to notify that the store is finished using the old - // backend though. - s.wg.Wait() + s.fifoSched.Stop() s.b = b s.kvindex = newTreeIndex() @@ -269,6 +279,7 @@ func (s *store) Restore(b backend.Backend) error { s.compactMainRev = -1 s.tx = b.BatchTx() s.txnID = -1 + s.fifoSched = schedule.NewFIFOScheduler() s.stopc = make(chan struct{}) return s.restore() @@ -340,7 +351,7 @@ func (s *store) restore() error { func (s *store) Close() error { close(s.stopc) - s.wg.Wait() + s.fifoSched.Stop() return nil } diff --git a/storage/kvstore_compaction.go b/storage/kvstore_compaction.go index 78d768683..840430a8b 100644 --- a/storage/kvstore_compaction.go +++ b/storage/kvstore_compaction.go @@ -20,8 +20,6 @@ import ( ) func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struct{}) { - defer s.wg.Done() - totalStart := time.Now() defer dbCompactionTotalDurations.Observe(float64(time.Now().Sub(totalStart) / time.Millisecond)) diff --git a/storage/kvstore_compaction_test.go b/storage/kvstore_compaction_test.go index 1637e2437..da8a523d3 100644 --- a/storage/kvstore_compaction_test.go +++ b/storage/kvstore_compaction_test.go @@ -72,9 +72,7 @@ func TestScheduleCompaction(t *testing.T) { tx.UnsafePut(keyBucketName, ibytes, []byte("bar")) } tx.Unlock() - // call `s.wg.Add(1)` to match the `s.wg.Done()` call in scheduleCompaction - // to avoid panic from wait group - s.wg.Add(1) + s.scheduleCompaction(tt.rev, tt.keep) tx.Lock() diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index c425e8135..b616545d7 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/coreos/etcd/lease" + "github.com/coreos/etcd/pkg/schedule" "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/storage/backend" "github.com/coreos/etcd/storage/storagepb" @@ -32,6 +33,7 @@ import ( func TestStoreRev(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() s := NewStore(b, &lease.FakeLessor{}) + defer s.Close() defer os.Remove(tmpPath) for i := 1; i <= 3; i++ { @@ -129,6 +131,8 @@ func TestStorePut(t *testing.T) { if s.currentRev != tt.wrev { t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev) } + + s.Close() } } @@ -198,6 +202,8 @@ func TestStoreRange(t *testing.T) { if s.currentRev != currev { t.Errorf("#%d: current rev = %+v, want %+v", i, s.currentRev, currev) } + + s.Close() } } @@ -269,6 +275,7 @@ func TestStoreDeleteRange(t *testing.T) { func TestStoreCompact(t *testing.T) { s := newFakeStore() + defer s.Close() b := s.b.(*fakeBackend) fi := s.kvindex.(*fakeIndex) @@ -279,7 +286,7 @@ func TestStoreCompact(t *testing.T) { b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, nil} s.Compact(3) - s.wg.Wait() + s.fifoSched.WaitFinish() if s.compactMainRev != 3 { t.Errorf("compact main rev = %d, want 3", s.compactMainRev) @@ -494,6 +501,8 @@ func newFakeStore() *store { kvindex: fi, currentRev: revision{}, compactMainRev: -1, + fifoSched: schedule.NewFIFOScheduler(), + stopc: make(chan struct{}), } }