From 6f707b857ad50a9c3e5d2d54a4b18e2a4fdbe774 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Fri, 1 Apr 2016 10:59:28 -0700 Subject: [PATCH] etcdserver, storage: don't ack physical compaction on error or snap restore Snapshot recovery will reset the FIFO; reschedule the physical acknowledgment instead of acknowledging on scheduler teardown. --- etcdserver/apply.go | 2 +- storage/kvstore.go | 30 ++++++++++++++++++++++++------ storage/kvstore_compaction.go | 6 +++--- 3 files changed, 28 insertions(+), 10 deletions(-) diff --git a/etcdserver/apply.go b/etcdserver/apply.go index 86f7f8b43..02d2586e2 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -377,7 +377,7 @@ func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.Com resp.Header = &pb.ResponseHeader{} ch, err := a.s.KV().Compact(compaction.Revision) if err != nil { - return nil, nil, err + return nil, ch, err } // get the current revision. which key to get is not important. _, resp.Header.Revision, _ = a.s.KV().Range([]byte("compaction"), nil, 1, 0) diff --git a/storage/kvstore.go b/storage/kvstore.go index 3b6d16652..e9fceb7da 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -231,12 +231,28 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err return n, rev, nil } +func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) { + if ctx == nil || ctx.Err() != nil { + s.mu.Lock() + select { + case <-s.stopc: + default: + f := func(ctx context.Context) { s.compactBarrier(ctx, ch) } + s.fifoSched.Schedule(f) + } + s.mu.Unlock() + return + } + close(ch) +} + func (s *store) Compact(rev int64) (<-chan struct{}, error) { s.mu.Lock() defer s.mu.Unlock() if rev <= s.compactMainRev { ch := make(chan struct{}) - s.fifoSched.Schedule(func(context.Context) { close(ch) }) + f := func(ctx context.Context) { s.compactBarrier(ctx, ch) } + s.fifoSched.Schedule(f) return ch, ErrCompacted } if rev > s.currentRev.main { @@ -260,13 +276,15 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) { keep := s.kvindex.Compact(rev) ch := make(chan struct{}) var j = func(ctx context.Context) { - defer close(ch) - select { - case <-ctx.Done(): + if ctx.Err() != nil { + s.compactBarrier(ctx, ch) return - default: } - s.scheduleCompaction(rev, keep) + if !s.scheduleCompaction(rev, keep) { + s.compactBarrier(nil, ch) + return + } + close(ch) } s.fifoSched.Schedule(j) diff --git a/storage/kvstore_compaction.go b/storage/kvstore_compaction.go index 840430a8b..a7b9cc8ad 100644 --- a/storage/kvstore_compaction.go +++ b/storage/kvstore_compaction.go @@ -19,7 +19,7 @@ import ( "time" ) -func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struct{}) { +func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struct{}) bool { totalStart := time.Now() defer dbCompactionTotalDurations.Observe(float64(time.Now().Sub(totalStart) / time.Millisecond)) @@ -48,7 +48,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc revToBytes(revision{main: compactMainRev}, rbytes) tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes) tx.Unlock() - return + return true } // update last @@ -59,7 +59,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc select { case <-time.After(100 * time.Millisecond): case <-s.stopc: - return + return false } } }