From 8db4f5b8e122674d51b8991eda8511143a492bbf Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 19 Aug 2016 14:57:28 -0700 Subject: [PATCH] pkg/wait: change wait time to use logical clock --- pkg/wait/wait_time.go | 30 ++++++++++++------------------ pkg/wait/wait_time_test.go | 26 ++++++++++---------------- 2 files changed, 22 insertions(+), 34 deletions(-) diff --git a/pkg/wait/wait_time.go b/pkg/wait/wait_time.go index bda2d5d5c..eebbf49b3 100644 --- a/pkg/wait/wait_time.go +++ b/pkg/wait/wait_time.go @@ -14,48 +14,42 @@ package wait -import ( - "sync" - "time" -) +import "sync" type WaitTime interface { - // Wait returns a chan that waits on the given deadline. + // Wait returns a chan that waits on the given logical deadline. // The chan will be triggered when Trigger is called with a // deadline that is later than the one it is waiting for. - // The given deadline MUST be unique. The deadline should be - // retrieved by calling time.Now() in most cases. - Wait(deadline time.Time) <-chan struct{} - // Trigger triggers all the waiting chans with an earlier deadline. - Trigger(deadline time.Time) + Wait(deadline uint64) <-chan struct{} + // Trigger triggers all the waiting chans with an earlier logical deadline. + Trigger(deadline uint64) } type timeList struct { l sync.Mutex - m map[int64]chan struct{} + m map[uint64]chan struct{} } func NewTimeList() *timeList { - return &timeList{m: make(map[int64]chan struct{})} + return &timeList{m: make(map[uint64]chan struct{})} } -func (tl *timeList) Wait(deadline time.Time) <-chan struct{} { +func (tl *timeList) Wait(deadline uint64) <-chan struct{} { tl.l.Lock() defer tl.l.Unlock() - nano := deadline.UnixNano() - ch := tl.m[nano] + ch := tl.m[deadline] if ch == nil { ch = make(chan struct{}) - tl.m[nano] = ch + tl.m[deadline] = ch } return ch } -func (tl *timeList) Trigger(deadline time.Time) { +func (tl *timeList) Trigger(deadline uint64) { tl.l.Lock() defer tl.l.Unlock() for t, ch := range tl.m { - if t < deadline.UnixNano() { + if t <= deadline { delete(tl.m, t) close(ch) } diff --git a/pkg/wait/wait_time_test.go b/pkg/wait/wait_time_test.go index 098a6ea9c..13f4d7cf2 100644 --- a/pkg/wait/wait_time_test.go +++ b/pkg/wait/wait_time_test.go @@ -21,24 +21,22 @@ import ( func TestWaitTime(t *testing.T) { wt := NewTimeList() - t1 := time.Now() - ch1 := wt.Wait(t1) - wt.Trigger(time.Unix(0, t1.UnixNano()+1)) + ch1 := wt.Wait(1) + wt.Trigger(2) select { case <-ch1: default: t.Fatalf("cannot receive from ch as expected") } - t2 := time.Now() - ch2 := wt.Wait(t2) - wt.Trigger(t2) + ch2 := wt.Wait(2) + wt.Trigger(1) select { case <-ch2: t.Fatalf("unexpected to receive from ch2") default: } - wt.Trigger(time.Unix(0, t2.UnixNano()+1)) + wt.Trigger(3) select { case <-ch2: default: @@ -50,11 +48,9 @@ func TestWaitTestStress(t *testing.T) { chs := make([]<-chan struct{}, 0) wt := NewTimeList() for i := 0; i < 10000; i++ { - chs = append(chs, wt.Wait(time.Now())) - // sleep one nanosecond before waiting on the next event - time.Sleep(time.Nanosecond) + chs = append(chs, wt.Wait(uint64(i))) } - wt.Trigger(time.Now()) + wt.Trigger(10000 + 1) for _, ch := range chs { select { @@ -66,20 +62,18 @@ func TestWaitTestStress(t *testing.T) { } func BenchmarkWaitTime(b *testing.B) { - t := time.Now() wt := NewTimeList() for i := 0; i < b.N; i++ { - wt.Wait(t) + wt.Wait(1) } } func BenchmarkTriggerAnd10KWaitTime(b *testing.B) { for i := 0; i < b.N; i++ { - t := time.Now() wt := NewTimeList() for j := 0; j < 10000; j++ { - wt.Wait(t) + wt.Wait(uint64(j)) } - wt.Trigger(time.Now()) + wt.Trigger(10000 + 1) } }