Merge pull request #14109 from SimFG/fifo_panic

schedule: Provide logs when the fifo job panic happens
commit e61d431792 by Piotr Tabor, 2022-06-15 22:24:42 +02:00 (committed by GitHub)
5 changed files with 72 additions and 19 deletions
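In short: schedulers now require a *zap.Logger, and callers wrap work in a named Job instead of passing a bare func(context.Context), so a panic can be attributed to the job that raised it. A minimal, runnable sketch of the new API (the job name is made up; it assumes the go.etcd.io/etcd/pkg/v3/schedule import path and the Scheduler.WaitFinish method, neither of which is shown in this diff):

package main

import (
	"context"
	"fmt"

	"go.etcd.io/etcd/pkg/v3/schedule"
	"go.uber.org/zap"
)

func main() {
	lg := zap.NewExample() // NewFIFOScheduler now rejects a nil logger
	sched := schedule.NewFIFOScheduler(lg)
	defer sched.Stop()

	// Previously: sched.Schedule(func(ctx context.Context) { ... })
	// Now the closure is wrapped in a Job carrying a name for log attribution.
	sched.Schedule(schedule.NewJob("example_doWork", func(ctx context.Context) {
		fmt.Println("doing work")
	}))
	sched.WaitFinish(1) // block until the job has run
}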


@@ -17,9 +17,36 @@ package schedule
import (
	"context"
	"sync"
+
+	"go.etcd.io/etcd/client/pkg/v3/verify"
+	"go.uber.org/zap"
)

-type Job func(context.Context)
+type Job interface {
+	Name() string
+	Do(context.Context)
+}
+
+type job struct {
+	name string
+	do   func(context.Context)
+}
+
+func (j job) Name() string {
+	return j.name
+}
+
+func (j job) Do(ctx context.Context) {
+	j.do(ctx)
+}
+
+func NewJob(name string, do func(ctx context.Context)) Job {
+	return job{
+		name: name,
+		do:   do,
+	}
+}

// Scheduler can schedule jobs.
type Scheduler interface {
@@ -56,14 +83,18 @@ type fifo struct {
	finishCond *sync.Cond
	donec      chan struct{}
+	lg         *zap.Logger
}

// NewFIFOScheduler returns a Scheduler that schedules jobs in FIFO
// order sequentially
-func NewFIFOScheduler() Scheduler {
+func NewFIFOScheduler(lg *zap.Logger) Scheduler {
+	verify.Assert(lg != nil, "the logger should not be nil")
	f := &fifo{
		resume: make(chan struct{}, 1),
		donec:  make(chan struct{}, 1),
+		lg:     lg,
	}
	f.finishCond = sync.NewCond(&f.mu)
	f.ctx, f.cancel = context.WithCancel(context.Background())
@@ -125,7 +156,6 @@ func (f *fifo) Stop() {
}

func (f *fifo) run() {
-	// TODO: recover from job panic?
	defer func() {
		close(f.donec)
		close(f.resume)
@@ -149,17 +179,29 @@ func (f *fifo) run() {
				f.mu.Unlock()
				// clean up pending jobs
				for _, todo := range pendings {
-					todo(f.ctx)
+					f.executeJob(todo, true)
				}
				return
			}
		} else {
-			todo(f.ctx)
+			f.executeJob(todo, false)
		}
	}
}
+
+func (f *fifo) executeJob(todo Job, updatedFinishedStats bool) {
+	defer func() {
+		if !updatedFinishedStats {
+			f.finishCond.L.Lock()
+			f.finished++
+			f.pendings = f.pendings[1:]
+			f.finishCond.Broadcast()
+			f.finishCond.L.Unlock()
+		}
+		if err := recover(); err != nil {
+			f.lg.Panic("execute job failed", zap.String("job", todo.Name()), zap.Any("panic", err))
+		}
+	}()
+
+	todo.Do(f.ctx)
+}
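
Note on the new executeJob above: zap's Logger.Panic writes the entry and then re-panics, so a job panic still takes the process down, but the crash is now preceded by a log line that names the job. A standalone sketch of that behavior (the job name and panic value here are illustrative):

package main

import "go.uber.org/zap"

func main() {
	lg := zap.NewExample()
	defer func() {
		// Logger.Panic re-panics after writing the entry; swallow it so
		// the demo exits cleanly.
		_ = recover()
	}()
	lg.Panic("execute job failed",
		zap.String("job", "kvstore_compact"),
		zap.Any("panic", "simulated failure"),
	)
}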


@@ -16,21 +16,32 @@ package schedule
import (
	"context"
+	"fmt"
	"testing"
+
+	"go.uber.org/zap/zaptest"
)

func TestFIFOSchedule(t *testing.T) {
-	s := NewFIFOScheduler()
+	s := NewFIFOScheduler(zaptest.NewLogger(t))
	defer s.Stop()

	next := 0
	jobCreator := func(i int) Job {
-		return func(ctx context.Context) {
+		return NewJob(fmt.Sprintf("i_%d_increse", i), func(ctx context.Context) {
+			defer func() {
+				if err := recover(); err != nil {
+					fmt.Println("err: ", err)
+				}
+			}()
+
			if next != i {
				t.Fatalf("job#%d: got %d, want %d", i, next, i)
			}
			next = i + 1
-		}
+
+			if next%3 == 0 {
+				panic("fifo panic")
+			}
+		})
	}
	var jobs []Job
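
Note how each test job recovers its own panic: a recover inside the job body consumes the panic before it can reach executeJob's deferred recover, so the scheduler's lg.Panic path never fires during the test. A standalone illustration of that nesting (plain Go, not etcd code):

package main

import "fmt"

func job() {
	defer func() {
		if err := recover(); err != nil {
			fmt.Println("recovered inside job:", err) // the test's fmt.Println path
		}
	}()
	panic("fifo panic")
}

func main() {
	defer func() {
		// Stands in for executeJob's deferred recover: it sees nil because
		// the job already consumed the panic.
		fmt.Println("outer recover sees:", recover())
	}()
	job()
}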


@@ -54,7 +54,7 @@ import (
	"go.etcd.io/etcd/raft/v3/raftpb"
	"go.etcd.io/etcd/server/v3/auth"
	"go.etcd.io/etcd/server/v3/etcdserver/api"
-	"go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp/types"
+	httptypes "go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp/types"
	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
	"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
	"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
@@ -744,7 +744,7 @@ func (s *EtcdServer) run() {
	}

	// asynchronously accept toApply packets, dispatch progress in-order
-	sched := schedule.NewFIFOScheduler()
+	sched := schedule.NewFIFOScheduler(lg)

	var (
		smu sync.RWMutex
@@ -839,7 +839,7 @@ func (s *EtcdServer) run() {
	for {
		select {
		case ap := <-s.r.apply():
-			f := func(context.Context) { s.applyAll(&ep, &ap) }
+			f := schedule.NewJob("server_applyAll", func(context.Context) { s.applyAll(&ep, &ap) })
			sched.Schedule(f)
		case leases := <-expiredLeaseC:
			s.revokeExpiredLeases(leases)


@@ -108,7 +108,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi
		currentRev:     1,
		compactMainRev: -1,

-		fifoSched: schedule.NewFIFOScheduler(),
+		fifoSched: schedule.NewFIFOScheduler(lg),

		stopc: make(chan struct{}),
@@ -148,7 +148,7 @@ func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
	// snapshot call, compaction and apply snapshot requests are serialized by
	// raft, and do not happen at the same time.
	s.mu.Lock()
-	f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
+	f := schedule.NewJob("kvstore_compactBarrier", func(ctx context.Context) { s.compactBarrier(ctx, ch) })
	s.fifoSched.Schedule(f)
	s.mu.Unlock()
}
@@ -202,7 +202,7 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) {
	s.revMu.Lock()
	if rev <= s.compactMainRev {
		ch := make(chan struct{})
-		f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
+		f := schedule.NewJob("kvstore_updateCompactRev_compactBarrier", func(ctx context.Context) { s.compactBarrier(ctx, ch) })
		s.fifoSched.Schedule(f)
		s.revMu.Unlock()
		return ch, 0, ErrCompacted
@@ -225,7 +225,7 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) {
func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-chan struct{}, error) {
	ch := make(chan struct{})
-	var j = func(ctx context.Context) {
+	j := schedule.NewJob("kvstore_compact", func(ctx context.Context) {
		if ctx.Err() != nil {
			s.compactBarrier(ctx, ch)
			return
@@ -238,7 +238,7 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch
		}
		s.hashes.Store(hash)
		close(ch)
-	}
+	})

	s.fifoSched.Schedule(j)
	trace.Step("schedule compaction")
@@ -292,7 +292,7 @@ func (s *store) Restore(b backend.Backend) error {
		s.revMu.Unlock()
	}

-	s.fifoSched = schedule.NewFIFOScheduler()
+	s.fifoSched = schedule.NewFIFOScheduler(s.lg)
	s.stopc = make(chan struct{})

	return s.restore()
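
Worth noting in Restore above: the old scheduler has been stopped earlier in Restore (not shown in this hunk), and the store builds a brand-new one with s.lg rather than reusing it, since, if memory of the fifo implementation serves, Schedule panics once a scheduler has been stopped. A hedged sketch of that stop-then-recreate pattern (same assumed import path and WaitFinish method as the sketch near the top):

package main

import (
	"context"
	"fmt"

	"go.etcd.io/etcd/pkg/v3/schedule"
	"go.uber.org/zap"
)

func main() {
	lg := zap.NewExample()

	s := schedule.NewFIFOScheduler(lg)
	s.Schedule(schedule.NewJob("before_restore", func(ctx context.Context) { fmt.Println("before") }))
	s.WaitFinish(1)
	s.Stop() // a stopped scheduler must not be scheduled on again

	// Mirrors (*store).Restore: create a fresh scheduler instead of reusing
	// the stopped one.
	s = schedule.NewFIFOScheduler(lg)
	defer s.Stop()
	s.Schedule(schedule.NewJob("after_restore", func(ctx context.Context) { fmt.Println("after") }))
	s.WaitFinish(1)
}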


@@ -856,7 +856,7 @@ func newFakeStore(lg *zap.Logger) *store {
		kvindex:        newFakeIndex(),
		currentRev:     0,
		compactMainRev: -1,
-		fifoSched:      schedule.NewFIFOScheduler(),
+		fifoSched:      schedule.NewFIFOScheduler(lg),
		stopc:          make(chan struct{}),
		lg:             lg,
	}