diff --git a/pkg/schedule/schedule.go b/pkg/schedule/schedule.go index 234d01989..ea19cf018 100644 --- a/pkg/schedule/schedule.go +++ b/pkg/schedule/schedule.go @@ -17,9 +17,36 @@ package schedule import ( "context" "sync" + + "go.etcd.io/etcd/client/pkg/v3/verify" + + "go.uber.org/zap" ) -type Job func(context.Context) +type Job interface { + Name() string + Do(context.Context) +} + +type job struct { + name string + do func(context.Context) +} + +func (j job) Name() string { + return j.name +} + +func (j job) Do(ctx context.Context) { + j.do(ctx) +} + +func NewJob(name string, do func(ctx context.Context)) Job { + return job{ + name: name, + do: do, + } +} // Scheduler can schedule jobs. type Scheduler interface { @@ -56,14 +83,18 @@ type fifo struct { finishCond *sync.Cond donec chan struct{} + lg *zap.Logger } // NewFIFOScheduler returns a Scheduler that schedules jobs in FIFO // order sequentially -func NewFIFOScheduler() Scheduler { +func NewFIFOScheduler(lg *zap.Logger) Scheduler { + verify.Assert(lg != nil, "the logger should not be nil") + f := &fifo{ resume: make(chan struct{}, 1), donec: make(chan struct{}, 1), + lg: lg, } f.finishCond = sync.NewCond(&f.mu) f.ctx, f.cancel = context.WithCancel(context.Background()) @@ -125,7 +156,6 @@ func (f *fifo) Stop() { } func (f *fifo) run() { - // TODO: recover from job panic? defer func() { close(f.donec) close(f.resume) @@ -149,17 +179,29 @@ func (f *fifo) run() { f.mu.Unlock() // clean up pending jobs for _, todo := range pendings { - todo(f.ctx) + f.executeJob(todo, true) } return } } else { - todo(f.ctx) + f.executeJob(todo, false) + } + } +} + +func (f *fifo) executeJob(todo Job, updatedFinishedStats bool) { + defer func() { + if !updatedFinishedStats { f.finishCond.L.Lock() f.finished++ f.pendings = f.pendings[1:] f.finishCond.Broadcast() f.finishCond.L.Unlock() } - } + if err := recover(); err != nil { + f.lg.Panic("execute job failed", zap.String("job", todo.Name()), zap.Any("panic", err)) + } + }() + + todo.Do(f.ctx) } diff --git a/pkg/schedule/schedule_test.go b/pkg/schedule/schedule_test.go index aa9c709f8..c6bb73ca9 100644 --- a/pkg/schedule/schedule_test.go +++ b/pkg/schedule/schedule_test.go @@ -16,21 +16,32 @@ package schedule import ( "context" + "fmt" "testing" + + "go.uber.org/zap/zaptest" ) func TestFIFOSchedule(t *testing.T) { - s := NewFIFOScheduler() + s := NewFIFOScheduler(zaptest.NewLogger(t)) defer s.Stop() next := 0 jobCreator := func(i int) Job { - return func(ctx context.Context) { + return NewJob(fmt.Sprintf("i_%d_increse", i), func(ctx context.Context) { + defer func() { + if err := recover(); err != nil { + fmt.Println("err: ", err) + } + }() if next != i { t.Fatalf("job#%d: got %d, want %d", i, next, i) } next = i + 1 - } + if next%3 == 0 { + panic("fifo panic") + } + }) } var jobs []Job diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 18d3a0fa1..63b561342 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -54,7 +54,7 @@ import ( "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/auth" "go.etcd.io/etcd/server/v3/etcdserver/api" - "go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp/types" + httptypes "go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp/types" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" @@ -744,7 +744,7 @@ func (s *EtcdServer) run() { } // asynchronously accept toApply packets, dispatch progress in-order - sched := schedule.NewFIFOScheduler() + sched := schedule.NewFIFOScheduler(lg) var ( smu sync.RWMutex @@ -839,7 +839,7 @@ func (s *EtcdServer) run() { for { select { case ap := <-s.r.apply(): - f := func(context.Context) { s.applyAll(&ep, &ap) } + f := schedule.NewJob("server_applyAll", func(context.Context) { s.applyAll(&ep, &ap) }) sched.Schedule(f) case leases := <-expiredLeaseC: s.revokeExpiredLeases(leases) diff --git a/server/storage/mvcc/kvstore.go b/server/storage/mvcc/kvstore.go index c3da906b7..442a28bae 100644 --- a/server/storage/mvcc/kvstore.go +++ b/server/storage/mvcc/kvstore.go @@ -108,7 +108,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi currentRev: 1, compactMainRev: -1, - fifoSched: schedule.NewFIFOScheduler(), + fifoSched: schedule.NewFIFOScheduler(lg), stopc: make(chan struct{}), @@ -148,7 +148,7 @@ func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) { // snapshot call, compaction and apply snapshot requests are serialized by // raft, and do not happen at the same time. s.mu.Lock() - f := func(ctx context.Context) { s.compactBarrier(ctx, ch) } + f := schedule.NewJob("kvstore_compactBarrier", func(ctx context.Context) { s.compactBarrier(ctx, ch) }) s.fifoSched.Schedule(f) s.mu.Unlock() } @@ -202,7 +202,7 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) { s.revMu.Lock() if rev <= s.compactMainRev { ch := make(chan struct{}) - f := func(ctx context.Context) { s.compactBarrier(ctx, ch) } + f := schedule.NewJob("kvstore_updateCompactRev_compactBarrier", func(ctx context.Context) { s.compactBarrier(ctx, ch) }) s.fifoSched.Schedule(f) s.revMu.Unlock() return ch, 0, ErrCompacted @@ -225,7 +225,7 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) { func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-chan struct{}, error) { ch := make(chan struct{}) - var j = func(ctx context.Context) { + j := schedule.NewJob("kvstore_compact", func(ctx context.Context) { if ctx.Err() != nil { s.compactBarrier(ctx, ch) return @@ -238,7 +238,7 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch } s.hashes.Store(hash) close(ch) - } + }) s.fifoSched.Schedule(j) trace.Step("schedule compaction") @@ -292,7 +292,7 @@ func (s *store) Restore(b backend.Backend) error { s.revMu.Unlock() } - s.fifoSched = schedule.NewFIFOScheduler() + s.fifoSched = schedule.NewFIFOScheduler(s.lg) s.stopc = make(chan struct{}) return s.restore() diff --git a/server/storage/mvcc/kvstore_test.go b/server/storage/mvcc/kvstore_test.go index 099bad58e..77b93d69a 100644 --- a/server/storage/mvcc/kvstore_test.go +++ b/server/storage/mvcc/kvstore_test.go @@ -856,7 +856,7 @@ func newFakeStore(lg *zap.Logger) *store { kvindex: newFakeIndex(), currentRev: 0, compactMainRev: -1, - fifoSched: schedule.NewFIFOScheduler(), + fifoSched: schedule.NewFIFOScheduler(lg), stopc: make(chan struct{}), lg: lg, }