From 80de75431ef2a2500f932bb4d910ad17326385d6 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 15 Feb 2017 14:54:59 -0800 Subject: [PATCH] grpcproxy: support forcing leader as available Leadership timeout can sometimes take too long, such as in test cases. However, it is possible to infer a leader is available based on RPCs that must go through consensus. Therefore, have a way to update the leadership status off the watch path. --- proxy/grpcproxy/leader.go | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/proxy/grpcproxy/leader.go b/proxy/grpcproxy/leader.go index db6cfe94c..86afdb707 100644 --- a/proxy/grpcproxy/leader.go +++ b/proxy/grpcproxy/leader.go @@ -63,24 +63,44 @@ func (l *leader) recvLoop() { wch := l.w.Watch(l.ctx, lostLeaderKey, clientv3.WithRev(rev), clientv3.WithCreatedNotify()) cresp, ok := <-wch if !ok { + l.loseLeader() continue } if cresp.Err() != nil { + l.loseLeader() if grpc.ErrorDesc(cresp.Err()) == grpc.ErrClientConnClosing.Error() { close(l.disconnc) return } continue } - // leader is available - l.mu.Lock() - l.leaderc = make(chan struct{}) - l.mu.Unlock() + l.gotLeader() <-wch + l.loseLeader() + } +} + +func (l *leader) loseLeader() { + l.mu.RLock() + defer l.mu.RUnlock() + select { + case <-l.leaderc: + default: close(l.leaderc) } } +// gotLeader will force update the leadership status to having a leader. +func (l *leader) gotLeader() { + l.mu.Lock() + defer l.mu.Unlock() + select { + case <-l.leaderc: + l.leaderc = make(chan struct{}) + default: + } +} + func (l *leader) disconnectNotify() <-chan struct{} { return l.disconnc } func (l *leader) stopNotify() <-chan struct{} { return l.donec }