From 49a0a63fc3e85b6d6e9abd7bad74496642decfe5 Mon Sep 17 00:00:00 2001
From: Anthony Romano
Date: Fri, 10 Feb 2017 14:24:01 -0800
Subject: [PATCH 1/3] grpcproxy: split out leadership detection code

Move the leadership detection code out of the watch code since it will
be shared with the lease code. Also assume the leader does not exist
unless a watch can be successfully created.
---
 proxy/grpcproxy/leader.go | 94 +++++++++++++++++++++++++++++++++++++++
 proxy/grpcproxy/watch.go  | 69 ++++++++++++----------------
 2 files changed, 123 insertions(+), 40 deletions(-)
 create mode 100644 proxy/grpcproxy/leader.go

diff --git a/proxy/grpcproxy/leader.go b/proxy/grpcproxy/leader.go
new file mode 100644
index 000000000..db6cfe94c
--- /dev/null
+++ b/proxy/grpcproxy/leader.go
@@ -0,0 +1,94 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+	"math"
+	"sync"
+
+	"golang.org/x/net/context"
+	"golang.org/x/time/rate"
+	"google.golang.org/grpc"
+
+	"github.com/coreos/etcd/clientv3"
+)
+
+const (
+	lostLeaderKey  = "__lostleader" // watched to detect leader loss
+	retryPerSecond = 10
+)
+
+type leader struct {
+	ctx context.Context
+	w   clientv3.Watcher
+	// mu protects leaderc updates.
+	mu       sync.RWMutex
+	leaderc  chan struct{}
+	disconnc chan struct{}
+	donec    chan struct{}
+}
+
+func newLeader(ctx context.Context, w clientv3.Watcher) *leader {
+	l := &leader{
+		ctx:      clientv3.WithRequireLeader(ctx),
+		w:        w,
+		leaderc:  make(chan struct{}),
+		disconnc: make(chan struct{}),
+		donec:    make(chan struct{}),
+	}
+	// begin assuming leader is lost
+	close(l.leaderc)
+	go l.recvLoop()
+	return l
+}
+
+func (l *leader) recvLoop() {
+	defer close(l.donec)
+
+	limiter := rate.NewLimiter(rate.Limit(retryPerSecond), retryPerSecond)
+	rev := int64(math.MaxInt64 - 2)
+	for limiter.Wait(l.ctx) == nil {
+		wch := l.w.Watch(l.ctx, lostLeaderKey, clientv3.WithRev(rev), clientv3.WithCreatedNotify())
+		cresp, ok := <-wch
+		if !ok {
+			continue
+		}
+		if cresp.Err() != nil {
+			if grpc.ErrorDesc(cresp.Err()) == grpc.ErrClientConnClosing.Error() {
+				close(l.disconnc)
+				return
+			}
+			continue
+		}
+		// leader is available
+		l.mu.Lock()
+		l.leaderc = make(chan struct{})
+		l.mu.Unlock()
+		<-wch
+		close(l.leaderc)
+	}
+}
+
+func (l *leader) disconnectNotify() <-chan struct{} { return l.disconnc }
+
+func (l *leader) stopNotify() <-chan struct{} { return l.donec }
+
+// lostNotify returns a channel that is closed if there has been
+// a leader loss not yet followed by a leader reacquire.
+func (l *leader) lostNotify() <-chan struct{} {
+	l.mu.RLock()
+	defer l.mu.RUnlock()
+	return l.leaderc
+}
diff --git a/proxy/grpcproxy/watch.go b/proxy/grpcproxy/watch.go
index b7e64daf3..a127c3100 100644
--- a/proxy/grpcproxy/watch.go
+++ b/proxy/grpcproxy/watch.go
@@ -18,7 +18,7 @@ import (
 	"sync"
 
 	"golang.org/x/net/context"
-	"golang.org/x/time/rate"
+	"google.golang.org/grpc"
 	"google.golang.org/grpc/metadata"
 
 	"github.com/coreos/etcd/clientv3"
@@ -31,50 +31,35 @@ type watchProxy struct {
 	cw  clientv3.Watcher
 	ctx context.Context
 
+	leader *leader
+
 	ranges *watchRanges
 
-	// retryLimiter controls the create watch retry rate on lost leaders.
-	retryLimiter *rate.Limiter
-
-	// mu protects leaderc updates.
-	mu      sync.RWMutex
-	leaderc chan struct{}
+	// mu protects adding outstanding watch servers through wg.
+	mu sync.Mutex
 
 	// wg waits until all outstanding watch servers quit.
 	wg sync.WaitGroup
 }
 
-const (
-	lostLeaderKey  = "__lostleader" // watched to detect leader loss
-	retryPerSecond = 10
-)
-
 func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) {
+	cctx, cancel := context.WithCancel(c.Ctx())
 	wp := &watchProxy{
-		cw:           c.Watcher,
-		ctx:          c.Ctx(),
-		retryLimiter: rate.NewLimiter(rate.Limit(retryPerSecond), retryPerSecond),
-		leaderc:      make(chan struct{}),
+		cw:     c.Watcher,
+		ctx:    cctx,
+		leader: newLeader(c.Ctx(), c.Watcher),
 	}
 	wp.ranges = newWatchRanges(wp)
 	ch := make(chan struct{})
 	go func() {
 		defer close(ch)
-		// a new streams without opening any watchers won't catch
-		// a lost leader event, so have a special watch to monitor it
-		rev := int64((uint64(1) << 63) - 2)
-		lctx := clientv3.WithRequireLeader(wp.ctx)
-		for wp.ctx.Err() == nil {
-			wch := wp.cw.Watch(lctx, lostLeaderKey, clientv3.WithRev(rev))
-			for range wch {
-			}
-			wp.mu.Lock()
-			close(wp.leaderc)
-			wp.leaderc = make(chan struct{})
-			wp.mu.Unlock()
-			wp.retryLimiter.Wait(wp.ctx)
-		}
+		<-wp.leader.stopNotify()
 		wp.mu.Lock()
+		select {
+		case <-wp.ctx.Done():
+		case <-wp.leader.disconnectNotify():
+			cancel()
+		}
 		<-wp.ctx.Done()
 		wp.mu.Unlock()
 		wp.wg.Wait()
@@ -104,11 +89,19 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
 		cancel: cancel,
 	}
 
-	var leaderc <-chan struct{}
+	var lostLeaderC <-chan struct{}
 	if md, ok := metadata.FromContext(stream.Context()); ok {
 		v := md[rpctypes.MetadataRequireLeaderKey]
 		if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader {
-			leaderc = wp.lostLeaderNotify()
+			lostLeaderC = wp.leader.lostNotify()
+			// if leader is known to be lost at creation time, avoid
+			// letting events through at all
+			select {
+			case <-lostLeaderC:
+				wp.wg.Done()
+				return rpctypes.ErrNoLeader
+			default:
+			}
 		}
 	}
 
@@ -127,7 +120,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
 	go func() {
 		defer func() { stopc <- struct{}{} }()
 		select {
-		case <-leaderc:
+		case <-lostLeaderC:
 		case <-ctx.Done():
 		case <-wp.ctx.Done():
 		}
@@ -146,19 +139,15 @@
 	}()
 
 	select {
-	case <-leaderc:
+	case <-lostLeaderC:
 		return rpctypes.ErrNoLeader
+	case <-wp.leader.disconnectNotify():
+		return grpc.ErrClientConnClosing
 	default:
 		return wps.ctx.Err()
 	}
 }
 
-func (wp *watchProxy) lostLeaderNotify() <-chan struct{} {
-	wp.mu.RLock()
-	defer wp.mu.RUnlock()
-	return wp.leaderc
-}
-
 // watchProxyStream forwards etcd watch events to a proxied client stream.
 type watchProxyStream struct {
 	ranges *watchRanges

From 36f5b713bf4cad4702163ed9476ea83de9059b3d Mon Sep 17 00:00:00 2001
From: Anthony Romano
Date: Tue, 14 Feb 2017 16:46:01 -0800
Subject: [PATCH 2/3] grpcproxy: don't wait for ctx.Done() to close kv donec

Waiting on ctx.Done() causes a goroutine leak in ActiveConnection.Close()
tests. The donec channel has been vestigial since the ccache removal;
revisit if the kv proxy ever needs goroutines.
---
 proxy/grpcproxy/kv.go | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/proxy/grpcproxy/kv.go b/proxy/grpcproxy/kv.go
index 4db3b066f..65f850ed4 100644
--- a/proxy/grpcproxy/kv.go
+++ b/proxy/grpcproxy/kv.go
@@ -33,11 +33,7 @@ func NewKvProxy(c *clientv3.Client) (pb.KVServer, <-chan struct{}) {
 		cache: cache.NewCache(cache.DefaultMaxEntries),
 	}
 	donec := make(chan struct{})
-	go func() {
-		defer close(donec)
-		<-c.Ctx().Done()
-		kv.cache.Close()
-	}()
+	close(donec)
 	return kv, donec
 }
 

From a4a8393cb748885050b3b437d6cc81abbbca5c48 Mon Sep 17 00:00:00 2001
From: Anthony Romano
Date: Fri, 10 Feb 2017 15:46:48 -0800
Subject: [PATCH 3/3] integration: wait five elections before creating watch for require leader test

Otherwise the new watch will race with the leader watcher receiving the
loss event.
---
 clientv3/integration/watch_test.go | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go
index a6d88d9fb..2d99249aa 100644
--- a/clientv3/integration/watch_test.go
+++ b/clientv3/integration/watch_test.go
@@ -681,7 +681,9 @@ func TestWatchWithRequireLeader(t *testing.T) {
 
 	// wait for election timeout, then member[0] will not have a leader.
 	tickDuration := 10 * time.Millisecond
-	time.Sleep(time.Duration(3*clus.Members[0].ElectionTicks) * tickDuration)
+	// existing streams need three elections before they're torn down; wait five election
+	// cycles so proxy tests receive a leader loss event on the existing watch before creating a new watch.
+	time.Sleep(time.Duration(5*clus.Members[0].ElectionTicks) * tickDuration)
 
 	chLeader := liveClient.Watch(clientv3.WithRequireLeader(context.TODO()), "foo", clientv3.WithRev(1))
 	chNoLeader := liveClient.Watch(context.TODO(), "foo", clientv3.WithRev(1))
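
Note (illustration only; not part of the patches above): PATCH 1/3 says the leadership
detection code will be shared with the lease code. The sketch below shows how such a
consumer inside package grpcproxy might wire up the new leader helper. The leaseProxy
type, newLeaseProxySketch constructor, and requireLeader method are hypothetical names
used only to demonstrate the lostNotify/disconnectNotify/stopNotify contract; they do
not appear in these patches.

package grpcproxy

import (
	"golang.org/x/net/context"
	"google.golang.org/grpc"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
)

// leaseProxy is a hypothetical consumer of the leader helper; it mirrors
// how watchProxy uses newLeader in PATCH 1/3.
type leaseProxy struct {
	leader *leader
}

func newLeaseProxySketch(c *clientv3.Client) (*leaseProxy, <-chan struct{}) {
	lp := &leaseProxy{leader: newLeader(c.Ctx(), c.Watcher)}
	donec := make(chan struct{})
	go func() {
		defer close(donec)
		// recvLoop exits when the client context is canceled or the client
		// connection closes, so stopNotify doubles as the shutdown signal.
		<-lp.leader.stopNotify()
	}()
	return lp, donec
}

// requireLeader rejects a request while the leader is known to be lost,
// the same check the watch proxy applies to require-leader streams.
func (lp *leaseProxy) requireLeader(ctx context.Context) error {
	select {
	case <-lp.leader.lostNotify():
		return rpctypes.ErrNoLeader
	case <-lp.leader.disconnectNotify():
		return grpc.ErrClientConnClosing
	default:
		return ctx.Err()
	}
}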