Merge pull request #6229 from xiang90/applynotify

etcdserver: add waitApplyIndex
This commit is contained in:
Xiang Li 2016-08-19 16:58:21 -07:00 committed by GitHub
commit 262c98f327
4 changed files with 55 additions and 58 deletions

View File

@ -190,12 +190,14 @@ type EtcdServer struct {
applyV3 applierV3
// applyV3Base is the core applier without auth or quotas
applyV3Base applierV3
kv mvcc.ConsistentWatchableKV
lessor lease.Lessor
bemu sync.Mutex
be backend.Backend
authStore auth.AuthStore
alarmStore *alarm.AlarmStore
applyWait wait.WaitTime
kv mvcc.ConsistentWatchableKV
lessor lease.Lessor
bemu sync.Mutex
be backend.Backend
authStore auth.AuthStore
alarmStore *alarm.AlarmStore
stats *stats.ServerStats
lstats *stats.LeaderStats
@ -475,6 +477,7 @@ func (s *EtcdServer) start() {
s.snapCount = DefaultSnapCount
}
s.w = wait.New()
s.applyWait = wait.NewTimeList()
s.done = make(chan struct{})
s.stop = make(chan struct{})
if s.ClusterVersion() != nil {
@ -629,10 +632,12 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
plog.Warningf("avoid queries with large range/delete range!")
}
proposalsApplied.Set(float64(ep.appliedi))
s.applyWait.Trigger(ep.appliedi)
// wait for the raft routine to finish the disk writes before triggering a
// snapshot. or applied index might be greater than the last index in raft
// storage, since the raft routine might be slower than apply routine.
<-apply.raftDone
s.triggerSnapshot(ep)
select {
// snapshot requested via send()

View File

@ -463,24 +463,12 @@ func (s *EtcdServer) isValidSimpleToken(token string) bool {
return false
}
// CAUTION: below index synchronization is required because this node
// might not receive and apply the log entry of Authenticate() RPC.
authApplied := false
for i := 0; i < 10; i++ {
if uint64(index) <= s.getAppliedIndex() {
authApplied = true
break
}
time.Sleep(100 * time.Millisecond)
select {
case <-s.applyWait.Wait(uint64(index)):
return true
case <-s.stop:
return true
}
if !authApplied {
plog.Errorf("timeout of waiting Authenticate() RPC")
return false
}
return true
}
func (s *EtcdServer) authInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error) {

View File

@ -14,48 +14,51 @@
package wait
import (
"sync"
"time"
)
import "sync"
type WaitTime interface {
// Wait returns a chan that waits on the given deadline.
// Wait returns a chan that waits on the given logical deadline.
// The chan will be triggered when Trigger is called with a
// deadline that is later than the one it is waiting for.
// The given deadline MUST be unique. The deadline should be
// retrieved by calling time.Now() in most cases.
Wait(deadline time.Time) <-chan struct{}
// Trigger triggers all the waiting chans with an earlier deadline.
Trigger(deadline time.Time)
Wait(deadline uint64) <-chan struct{}
// Trigger triggers all the waiting chans with an earlier logical deadline.
Trigger(deadline uint64)
}
var closec chan struct{}
func init() { closec = make(chan struct{}); close(closec) }
type timeList struct {
l sync.Mutex
m map[int64]chan struct{}
l sync.Mutex
lastTriggerDeadline uint64
m map[uint64]chan struct{}
}
func NewTimeList() *timeList {
return &timeList{m: make(map[int64]chan struct{})}
return &timeList{m: make(map[uint64]chan struct{})}
}
func (tl *timeList) Wait(deadline time.Time) <-chan struct{} {
func (tl *timeList) Wait(deadline uint64) <-chan struct{} {
tl.l.Lock()
defer tl.l.Unlock()
nano := deadline.UnixNano()
ch := tl.m[nano]
if tl.lastTriggerDeadline >= deadline {
return closec
}
ch := tl.m[deadline]
if ch == nil {
ch = make(chan struct{})
tl.m[nano] = ch
tl.m[deadline] = ch
}
return ch
}
func (tl *timeList) Trigger(deadline time.Time) {
func (tl *timeList) Trigger(deadline uint64) {
tl.l.Lock()
defer tl.l.Unlock()
tl.lastTriggerDeadline = deadline
for t, ch := range tl.m {
if t < deadline.UnixNano() {
if t <= deadline {
delete(tl.m, t)
close(ch)
}

View File

@ -21,40 +21,43 @@ import (
func TestWaitTime(t *testing.T) {
wt := NewTimeList()
t1 := time.Now()
ch1 := wt.Wait(t1)
wt.Trigger(time.Unix(0, t1.UnixNano()+1))
ch1 := wt.Wait(1)
wt.Trigger(2)
select {
case <-ch1:
default:
t.Fatalf("cannot receive from ch as expected")
}
t2 := time.Now()
ch2 := wt.Wait(t2)
wt.Trigger(t2)
ch2 := wt.Wait(4)
wt.Trigger(3)
select {
case <-ch2:
t.Fatalf("unexpected to receive from ch2")
default:
}
wt.Trigger(time.Unix(0, t2.UnixNano()+1))
wt.Trigger(4)
select {
case <-ch2:
default:
t.Fatalf("cannot receive from ch2 as expected")
}
select {
// wait on a triggered deadline
case <-wt.Wait(4):
default:
t.Fatalf("unexpected blocking when wait on triggered deadline")
}
}
func TestWaitTestStress(t *testing.T) {
chs := make([]<-chan struct{}, 0)
wt := NewTimeList()
for i := 0; i < 10000; i++ {
chs = append(chs, wt.Wait(time.Now()))
// sleep one nanosecond before waiting on the next event
time.Sleep(time.Nanosecond)
chs = append(chs, wt.Wait(uint64(i)))
}
wt.Trigger(time.Now())
wt.Trigger(10000 + 1)
for _, ch := range chs {
select {
@ -66,20 +69,18 @@ func TestWaitTestStress(t *testing.T) {
}
func BenchmarkWaitTime(b *testing.B) {
t := time.Now()
wt := NewTimeList()
for i := 0; i < b.N; i++ {
wt.Wait(t)
wt.Wait(1)
}
}
func BenchmarkTriggerAnd10KWaitTime(b *testing.B) {
for i := 0; i < b.N; i++ {
t := time.Now()
wt := NewTimeList()
for j := 0; j < 10000; j++ {
wt.Wait(t)
wt.Wait(uint64(j))
}
wt.Trigger(time.Now())
wt.Trigger(10000 + 1)
}
}