diff --git a/proxy/grpcproxy/leader.go b/proxy/grpcproxy/leader.go
new file mode 100644
index 000000000..db6cfe94c
--- /dev/null
+++ b/proxy/grpcproxy/leader.go
@@ -0,0 +1,94 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+	"math"
+	"sync"
+
+	"golang.org/x/net/context"
+	"golang.org/x/time/rate"
+	"google.golang.org/grpc"
+
+	"github.com/coreos/etcd/clientv3"
+)
+
+const (
+	lostLeaderKey  = "__lostleader" // watched to detect leader loss
+	retryPerSecond = 10
+)
+
+type leader struct {
+	ctx context.Context
+	w   clientv3.Watcher
+	// mu protects leaderc updates.
+	mu       sync.RWMutex
+	leaderc  chan struct{}
+	disconnc chan struct{}
+	donec    chan struct{}
+}
+
+func newLeader(ctx context.Context, w clientv3.Watcher) *leader {
+	l := &leader{
+		ctx:      clientv3.WithRequireLeader(ctx),
+		w:        w,
+		leaderc:  make(chan struct{}),
+		disconnc: make(chan struct{}),
+		donec:    make(chan struct{}),
+	}
+	// begin assuming leader is lost
+	close(l.leaderc)
+	go l.recvLoop()
+	return l
+}
+
+func (l *leader) recvLoop() {
+	defer close(l.donec)
+
+	limiter := rate.NewLimiter(rate.Limit(retryPerSecond), retryPerSecond)
+	rev := int64(math.MaxInt64 - 2)
+	for limiter.Wait(l.ctx) == nil {
+		wch := l.w.Watch(l.ctx, lostLeaderKey, clientv3.WithRev(rev), clientv3.WithCreatedNotify())
+		cresp, ok := <-wch
+		if !ok {
+			continue
+		}
+		if cresp.Err() != nil {
+			if grpc.ErrorDesc(cresp.Err()) == grpc.ErrClientConnClosing.Error() {
+				close(l.disconnc)
+				return
+			}
+			continue
+		}
+		// leader is available
+		l.mu.Lock()
+		l.leaderc = make(chan struct{})
+		l.mu.Unlock()
+		<-wch
+		close(l.leaderc)
+	}
+}
+
+func (l *leader) disconnectNotify() <-chan struct{} { return l.disconnc }
+
+func (l *leader) stopNotify() <-chan struct{} { return l.donec }
+
+// lostNotify returns a channel that is closed if there has been
+// a leader loss not yet followed by a leader reacquire.
+func (l *leader) lostNotify() <-chan struct{} {
+	l.mu.RLock()
+	defer l.mu.RUnlock()
+	return l.leaderc
+}
diff --git a/proxy/grpcproxy/watch.go b/proxy/grpcproxy/watch.go
index b7e64daf3..a127c3100 100644
--- a/proxy/grpcproxy/watch.go
+++ b/proxy/grpcproxy/watch.go
@@ -18,7 +18,7 @@ import (
 	"sync"
 
 	"golang.org/x/net/context"
-	"golang.org/x/time/rate"
+	"google.golang.org/grpc"
 	"google.golang.org/grpc/metadata"
 
 	"github.com/coreos/etcd/clientv3"
@@ -31,50 +31,35 @@ type watchProxy struct {
 	cw  clientv3.Watcher
 	ctx context.Context
 
+	leader *leader
+
 	ranges *watchRanges
 
-	// retryLimiter controls the create watch retry rate on lost leaders.
-	retryLimiter *rate.Limiter
-
-	// mu protects leaderc updates.
-	mu      sync.RWMutex
-	leaderc chan struct{}
+	// mu protects adding outstanding watch servers through wg.
+	mu sync.Mutex
 
 	// wg waits until all outstanding watch servers quit.
 	wg sync.WaitGroup
 }
 
-const (
-	lostLeaderKey  = "__lostleader" // watched to detect leader loss
-	retryPerSecond = 10
-)
-
 func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) {
+	cctx, cancel := context.WithCancel(c.Ctx())
 	wp := &watchProxy{
-		cw:           c.Watcher,
-		ctx:          c.Ctx(),
-		retryLimiter: rate.NewLimiter(rate.Limit(retryPerSecond), retryPerSecond),
-		leaderc:      make(chan struct{}),
+		cw:     c.Watcher,
+		ctx:    cctx,
+		leader: newLeader(c.Ctx(), c.Watcher),
 	}
 	wp.ranges = newWatchRanges(wp)
 	ch := make(chan struct{})
 	go func() {
 		defer close(ch)
-		// a new streams without opening any watchers won't catch
-		// a lost leader event, so have a special watch to monitor it
-		rev := int64((uint64(1) << 63) - 2)
-		lctx := clientv3.WithRequireLeader(wp.ctx)
-		for wp.ctx.Err() == nil {
-			wch := wp.cw.Watch(lctx, lostLeaderKey, clientv3.WithRev(rev))
-			for range wch {
-			}
-			wp.mu.Lock()
-			close(wp.leaderc)
-			wp.leaderc = make(chan struct{})
-			wp.mu.Unlock()
-			wp.retryLimiter.Wait(wp.ctx)
-		}
+		<-wp.leader.stopNotify()
 		wp.mu.Lock()
+		select {
+		case <-wp.ctx.Done():
+		case <-wp.leader.disconnectNotify():
+			cancel()
+		}
 		<-wp.ctx.Done()
 		wp.mu.Unlock()
 		wp.wg.Wait()
@@ -104,11 +89,19 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
 		cancel:   cancel,
 	}
 
-	var leaderc <-chan struct{}
+	var lostLeaderC <-chan struct{}
 	if md, ok := metadata.FromContext(stream.Context()); ok {
 		v := md[rpctypes.MetadataRequireLeaderKey]
 		if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader {
-			leaderc = wp.lostLeaderNotify()
+			lostLeaderC = wp.leader.lostNotify()
+			// if leader is known to be lost at creation time, avoid
+			// letting events through at all
+			select {
+			case <-lostLeaderC:
+				wp.wg.Done()
+				return rpctypes.ErrNoLeader
+			default:
+			}
 		}
 	}
 
@@ -127,7 +120,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
 	go func() {
 		defer func() { stopc <- struct{}{} }()
 		select {
-		case <-leaderc:
+		case <-lostLeaderC:
 		case <-ctx.Done():
 		case <-wp.ctx.Done():
 		}
@@ -146,19 +139,15 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
 	}()
 
 	select {
-	case <-leaderc:
+	case <-lostLeaderC:
 		return rpctypes.ErrNoLeader
+	case <-wp.leader.disconnectNotify():
+		return grpc.ErrClientConnClosing
 	default:
 		return wps.ctx.Err()
 	}
 }
 
-func (wp *watchProxy) lostLeaderNotify() <-chan struct{} {
-	wp.mu.RLock()
-	defer wp.mu.RUnlock()
-	return wp.leaderc
-}
-
 // watchProxyStream forwards etcd watch events to a proxied client stream.
 type watchProxyStream struct {
 	ranges *watchRanges
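
For review context, a minimal sketch (not part of the patch) of how the three channels exposed by the new leader type are meant to be consumed. waitUntilLostOrClosed is a hypothetical helper assumed to live in package grpcproxy (leader is unexported), and it mirrors the final select in watchProxy.Watch above.

package grpcproxy

import (
	"golang.org/x/net/context"
	"google.golang.org/grpc"

	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
)

// waitUntilLostOrClosed is a hypothetical illustration of the leader API:
// it blocks until one of the notification channels fires and maps each
// case to the error a RequireLeader stream would return.
func waitUntilLostOrClosed(ctx context.Context, lead *leader) error {
	select {
	case <-lead.lostNotify():
		// closed while a leader loss has not yet been followed by a reacquire
		return rpctypes.ErrNoLeader
	case <-lead.disconnectNotify():
		// closed when the backing client connection is closing for good
		return grpc.ErrClientConnClosing
	case <-lead.stopNotify():
		// recvLoop exited; no further leader state will be reported
		return ctx.Err()
	case <-ctx.Done():
		return ctx.Err()
	}
}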