pkg/wait: change wait time to use logical clock

This commit is contained in:
Xiang Li 2016-08-19 14:57:28 -07:00
parent 5e9fe0dc23
commit 8db4f5b8e1
2 changed files with 22 additions and 34 deletions

View File

@ -14,48 +14,42 @@
package wait
import (
"sync"
"time"
)
import "sync"
type WaitTime interface {
// Wait returns a chan that waits on the given deadline.
// Wait returns a chan that waits on the given logical deadline.
// The chan will be triggered when Trigger is called with a
// deadline that is later than the one it is waiting for.
// The given deadline MUST be unique. The deadline should be
// retrieved by calling time.Now() in most cases.
Wait(deadline time.Time) <-chan struct{}
// Trigger triggers all the waiting chans with an earlier deadline.
Trigger(deadline time.Time)
Wait(deadline uint64) <-chan struct{}
// Trigger triggers all the waiting chans with an earlier logical deadline.
Trigger(deadline uint64)
}
type timeList struct {
l sync.Mutex
m map[int64]chan struct{}
m map[uint64]chan struct{}
}
func NewTimeList() *timeList {
return &timeList{m: make(map[int64]chan struct{})}
return &timeList{m: make(map[uint64]chan struct{})}
}
func (tl *timeList) Wait(deadline time.Time) <-chan struct{} {
func (tl *timeList) Wait(deadline uint64) <-chan struct{} {
tl.l.Lock()
defer tl.l.Unlock()
nano := deadline.UnixNano()
ch := tl.m[nano]
ch := tl.m[deadline]
if ch == nil {
ch = make(chan struct{})
tl.m[nano] = ch
tl.m[deadline] = ch
}
return ch
}
func (tl *timeList) Trigger(deadline time.Time) {
func (tl *timeList) Trigger(deadline uint64) {
tl.l.Lock()
defer tl.l.Unlock()
for t, ch := range tl.m {
if t < deadline.UnixNano() {
if t <= deadline {
delete(tl.m, t)
close(ch)
}

View File

@ -21,24 +21,22 @@ import (
func TestWaitTime(t *testing.T) {
wt := NewTimeList()
t1 := time.Now()
ch1 := wt.Wait(t1)
wt.Trigger(time.Unix(0, t1.UnixNano()+1))
ch1 := wt.Wait(1)
wt.Trigger(2)
select {
case <-ch1:
default:
t.Fatalf("cannot receive from ch as expected")
}
t2 := time.Now()
ch2 := wt.Wait(t2)
wt.Trigger(t2)
ch2 := wt.Wait(2)
wt.Trigger(1)
select {
case <-ch2:
t.Fatalf("unexpected to receive from ch2")
default:
}
wt.Trigger(time.Unix(0, t2.UnixNano()+1))
wt.Trigger(3)
select {
case <-ch2:
default:
@ -50,11 +48,9 @@ func TestWaitTestStress(t *testing.T) {
chs := make([]<-chan struct{}, 0)
wt := NewTimeList()
for i := 0; i < 10000; i++ {
chs = append(chs, wt.Wait(time.Now()))
// sleep one nanosecond before waiting on the next event
time.Sleep(time.Nanosecond)
chs = append(chs, wt.Wait(uint64(i)))
}
wt.Trigger(time.Now())
wt.Trigger(10000 + 1)
for _, ch := range chs {
select {
@ -66,20 +62,18 @@ func TestWaitTestStress(t *testing.T) {
}
func BenchmarkWaitTime(b *testing.B) {
t := time.Now()
wt := NewTimeList()
for i := 0; i < b.N; i++ {
wt.Wait(t)
wt.Wait(1)
}
}
func BenchmarkTriggerAnd10KWaitTime(b *testing.B) {
for i := 0; i < b.N; i++ {
t := time.Now()
wt := NewTimeList()
for j := 0; j < 10000; j++ {
wt.Wait(t)
wt.Wait(uint64(j))
}
wt.Trigger(time.Now())
wt.Trigger(10000 + 1)
}
}