From 8db4f5b8e122674d51b8991eda8511143a492bbf Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 19 Aug 2016 14:57:28 -0700 Subject: [PATCH 1/2] pkg/wait: change wait time to use logical clock --- pkg/wait/wait_time.go | 30 ++++++++++++------------------ pkg/wait/wait_time_test.go | 26 ++++++++++---------------- 2 files changed, 22 insertions(+), 34 deletions(-) diff --git a/pkg/wait/wait_time.go b/pkg/wait/wait_time.go index bda2d5d5c..eebbf49b3 100644 --- a/pkg/wait/wait_time.go +++ b/pkg/wait/wait_time.go @@ -14,48 +14,42 @@ package wait -import ( - "sync" - "time" -) +import "sync" type WaitTime interface { - // Wait returns a chan that waits on the given deadline. + // Wait returns a chan that waits on the given logical deadline. // The chan will be triggered when Trigger is called with a // deadline that is later than the one it is waiting for. - // The given deadline MUST be unique. The deadline should be - // retrieved by calling time.Now() in most cases. - Wait(deadline time.Time) <-chan struct{} - // Trigger triggers all the waiting chans with an earlier deadline. - Trigger(deadline time.Time) + Wait(deadline uint64) <-chan struct{} + // Trigger triggers all the waiting chans with an earlier logical deadline. + Trigger(deadline uint64) } type timeList struct { l sync.Mutex - m map[int64]chan struct{} + m map[uint64]chan struct{} } func NewTimeList() *timeList { - return &timeList{m: make(map[int64]chan struct{})} + return &timeList{m: make(map[uint64]chan struct{})} } -func (tl *timeList) Wait(deadline time.Time) <-chan struct{} { +func (tl *timeList) Wait(deadline uint64) <-chan struct{} { tl.l.Lock() defer tl.l.Unlock() - nano := deadline.UnixNano() - ch := tl.m[nano] + ch := tl.m[deadline] if ch == nil { ch = make(chan struct{}) - tl.m[nano] = ch + tl.m[deadline] = ch } return ch } -func (tl *timeList) Trigger(deadline time.Time) { +func (tl *timeList) Trigger(deadline uint64) { tl.l.Lock() defer tl.l.Unlock() for t, ch := range tl.m { - if t < deadline.UnixNano() { + if t <= deadline { delete(tl.m, t) close(ch) } diff --git a/pkg/wait/wait_time_test.go b/pkg/wait/wait_time_test.go index 098a6ea9c..13f4d7cf2 100644 --- a/pkg/wait/wait_time_test.go +++ b/pkg/wait/wait_time_test.go @@ -21,24 +21,22 @@ import ( func TestWaitTime(t *testing.T) { wt := NewTimeList() - t1 := time.Now() - ch1 := wt.Wait(t1) - wt.Trigger(time.Unix(0, t1.UnixNano()+1)) + ch1 := wt.Wait(1) + wt.Trigger(2) select { case <-ch1: default: t.Fatalf("cannot receive from ch as expected") } - t2 := time.Now() - ch2 := wt.Wait(t2) - wt.Trigger(t2) + ch2 := wt.Wait(2) + wt.Trigger(1) select { case <-ch2: t.Fatalf("unexpected to receive from ch2") default: } - wt.Trigger(time.Unix(0, t2.UnixNano()+1)) + wt.Trigger(3) select { case <-ch2: default: @@ -50,11 +48,9 @@ func TestWaitTestStress(t *testing.T) { chs := make([]<-chan struct{}, 0) wt := NewTimeList() for i := 0; i < 10000; i++ { - chs = append(chs, wt.Wait(time.Now())) - // sleep one nanosecond before waiting on the next event - time.Sleep(time.Nanosecond) + chs = append(chs, wt.Wait(uint64(i))) } - wt.Trigger(time.Now()) + wt.Trigger(10000 + 1) for _, ch := range chs { select { @@ -66,20 +62,18 @@ func TestWaitTestStress(t *testing.T) { } func BenchmarkWaitTime(b *testing.B) { - t := time.Now() wt := NewTimeList() for i := 0; i < b.N; i++ { - wt.Wait(t) + wt.Wait(1) } } func BenchmarkTriggerAnd10KWaitTime(b *testing.B) { for i := 0; i < b.N; i++ { - t := time.Now() wt := NewTimeList() for j := 0; j < 10000; j++ { - wt.Wait(t) + wt.Wait(uint64(j)) } - wt.Trigger(time.Now()) + wt.Trigger(10000 + 1) } } From 83de13e4a894c0fcb421090970f390675352b559 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 19 Aug 2016 16:10:58 -0700 Subject: [PATCH 2/2] etcdserver: support apply wait --- etcdserver/server.go | 17 +++++++++++------ etcdserver/v3_server.go | 22 +++++----------------- pkg/wait/wait_time.go | 13 +++++++++++-- pkg/wait/wait_time_test.go | 13 ++++++++++--- 4 files changed, 37 insertions(+), 28 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index 3466355f0..429ac8766 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -190,12 +190,14 @@ type EtcdServer struct { applyV3 applierV3 // applyV3Base is the core applier without auth or quotas applyV3Base applierV3 - kv mvcc.ConsistentWatchableKV - lessor lease.Lessor - bemu sync.Mutex - be backend.Backend - authStore auth.AuthStore - alarmStore *alarm.AlarmStore + applyWait wait.WaitTime + + kv mvcc.ConsistentWatchableKV + lessor lease.Lessor + bemu sync.Mutex + be backend.Backend + authStore auth.AuthStore + alarmStore *alarm.AlarmStore stats *stats.ServerStats lstats *stats.LeaderStats @@ -475,6 +477,7 @@ func (s *EtcdServer) start() { s.snapCount = DefaultSnapCount } s.w = wait.New() + s.applyWait = wait.NewTimeList() s.done = make(chan struct{}) s.stop = make(chan struct{}) if s.ClusterVersion() != nil { @@ -629,10 +632,12 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) { plog.Warningf("avoid queries with large range/delete range!") } proposalsApplied.Set(float64(ep.appliedi)) + s.applyWait.Trigger(ep.appliedi) // wait for the raft routine to finish the disk writes before triggering a // snapshot. or applied index might be greater than the last index in raft // storage, since the raft routine might be slower than apply routine. <-apply.raftDone + s.triggerSnapshot(ep) select { // snapshot requested via send() diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index e044dbe9d..dca135c46 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -463,24 +463,12 @@ func (s *EtcdServer) isValidSimpleToken(token string) bool { return false } - // CAUTION: below index synchronization is required because this node - // might not receive and apply the log entry of Authenticate() RPC. - authApplied := false - for i := 0; i < 10; i++ { - if uint64(index) <= s.getAppliedIndex() { - authApplied = true - break - } - - time.Sleep(100 * time.Millisecond) + select { + case <-s.applyWait.Wait(uint64(index)): + return true + case <-s.stop: + return true } - - if !authApplied { - plog.Errorf("timeout of waiting Authenticate() RPC") - return false - } - - return true } func (s *EtcdServer) authInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error) { diff --git a/pkg/wait/wait_time.go b/pkg/wait/wait_time.go index eebbf49b3..297e48a47 100644 --- a/pkg/wait/wait_time.go +++ b/pkg/wait/wait_time.go @@ -25,9 +25,14 @@ type WaitTime interface { Trigger(deadline uint64) } +var closec chan struct{} + +func init() { closec = make(chan struct{}); close(closec) } + type timeList struct { - l sync.Mutex - m map[uint64]chan struct{} + l sync.Mutex + lastTriggerDeadline uint64 + m map[uint64]chan struct{} } func NewTimeList() *timeList { @@ -37,6 +42,9 @@ func NewTimeList() *timeList { func (tl *timeList) Wait(deadline uint64) <-chan struct{} { tl.l.Lock() defer tl.l.Unlock() + if tl.lastTriggerDeadline >= deadline { + return closec + } ch := tl.m[deadline] if ch == nil { ch = make(chan struct{}) @@ -48,6 +56,7 @@ func (tl *timeList) Wait(deadline uint64) <-chan struct{} { func (tl *timeList) Trigger(deadline uint64) { tl.l.Lock() defer tl.l.Unlock() + tl.lastTriggerDeadline = deadline for t, ch := range tl.m { if t <= deadline { delete(tl.m, t) diff --git a/pkg/wait/wait_time_test.go b/pkg/wait/wait_time_test.go index 13f4d7cf2..26164c4ac 100644 --- a/pkg/wait/wait_time_test.go +++ b/pkg/wait/wait_time_test.go @@ -29,19 +29,26 @@ func TestWaitTime(t *testing.T) { t.Fatalf("cannot receive from ch as expected") } - ch2 := wt.Wait(2) - wt.Trigger(1) + ch2 := wt.Wait(4) + wt.Trigger(3) select { case <-ch2: t.Fatalf("unexpected to receive from ch2") default: } - wt.Trigger(3) + wt.Trigger(4) select { case <-ch2: default: t.Fatalf("cannot receive from ch2 as expected") } + + select { + // wait on a triggered deadline + case <-wt.Wait(4): + default: + t.Fatalf("unexpected blocking when wait on triggered deadline") + } } func TestWaitTestStress(t *testing.T) {