mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00

Move out of watch code since will be shared with lease code. Also assumes leader does not exist unless watch can be successfully created.
258 lines
5.6 KiB
Go
258 lines
5.6 KiB
Go
// Copyright 2016 The etcd Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package grpcproxy
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"golang.org/x/net/context"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/metadata"
|
|
|
|
"github.com/coreos/etcd/clientv3"
|
|
"github.com/coreos/etcd/etcdserver/api/v3rpc"
|
|
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
|
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
|
)
|
|
|
|
type watchProxy struct {
|
|
cw clientv3.Watcher
|
|
ctx context.Context
|
|
|
|
leader *leader
|
|
|
|
ranges *watchRanges
|
|
|
|
// mu protects adding outstanding watch servers through wg.
|
|
mu sync.Mutex
|
|
|
|
// wg waits until all outstanding watch servers quit.
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) {
|
|
cctx, cancel := context.WithCancel(c.Ctx())
|
|
wp := &watchProxy{
|
|
cw: c.Watcher,
|
|
ctx: cctx,
|
|
leader: newLeader(c.Ctx(), c.Watcher),
|
|
}
|
|
wp.ranges = newWatchRanges(wp)
|
|
ch := make(chan struct{})
|
|
go func() {
|
|
defer close(ch)
|
|
<-wp.leader.stopNotify()
|
|
wp.mu.Lock()
|
|
select {
|
|
case <-wp.ctx.Done():
|
|
case <-wp.leader.disconnectNotify():
|
|
cancel()
|
|
}
|
|
<-wp.ctx.Done()
|
|
wp.mu.Unlock()
|
|
wp.wg.Wait()
|
|
wp.ranges.stop()
|
|
}()
|
|
return wp, ch
|
|
}
|
|
|
|
func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
|
|
wp.mu.Lock()
|
|
select {
|
|
case <-wp.ctx.Done():
|
|
wp.mu.Unlock()
|
|
return
|
|
default:
|
|
wp.wg.Add(1)
|
|
}
|
|
wp.mu.Unlock()
|
|
|
|
ctx, cancel := context.WithCancel(stream.Context())
|
|
wps := &watchProxyStream{
|
|
ranges: wp.ranges,
|
|
watchers: make(map[int64]*watcher),
|
|
stream: stream,
|
|
watchCh: make(chan *pb.WatchResponse, 1024),
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}
|
|
|
|
var lostLeaderC <-chan struct{}
|
|
if md, ok := metadata.FromContext(stream.Context()); ok {
|
|
v := md[rpctypes.MetadataRequireLeaderKey]
|
|
if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader {
|
|
lostLeaderC = wp.leader.lostNotify()
|
|
// if leader is known to be lost at creation time, avoid
|
|
// letting events through at all
|
|
select {
|
|
case <-lostLeaderC:
|
|
wp.wg.Done()
|
|
return rpctypes.ErrNoLeader
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
// post to stopc => terminate server stream; can't use a waitgroup
|
|
// since all goroutines will only terminate after Watch() exits.
|
|
stopc := make(chan struct{}, 3)
|
|
go func() {
|
|
defer func() { stopc <- struct{}{} }()
|
|
wps.recvLoop()
|
|
}()
|
|
go func() {
|
|
defer func() { stopc <- struct{}{} }()
|
|
wps.sendLoop()
|
|
}()
|
|
// tear down watch if leader goes down or entire watch proxy is terminated
|
|
go func() {
|
|
defer func() { stopc <- struct{}{} }()
|
|
select {
|
|
case <-lostLeaderC:
|
|
case <-ctx.Done():
|
|
case <-wp.ctx.Done():
|
|
}
|
|
}()
|
|
|
|
<-stopc
|
|
cancel()
|
|
|
|
// recv/send may only shutdown after function exits;
|
|
// goroutine notifies proxy that stream is through
|
|
go func() {
|
|
<-stopc
|
|
<-stopc
|
|
wps.close()
|
|
wp.wg.Done()
|
|
}()
|
|
|
|
select {
|
|
case <-lostLeaderC:
|
|
return rpctypes.ErrNoLeader
|
|
case <-wp.leader.disconnectNotify():
|
|
return grpc.ErrClientConnClosing
|
|
default:
|
|
return wps.ctx.Err()
|
|
}
|
|
}
|
|
|
|
// watchProxyStream forwards etcd watch events to a proxied client stream.
|
|
type watchProxyStream struct {
|
|
ranges *watchRanges
|
|
|
|
// mu protects watchers and nextWatcherID
|
|
mu sync.Mutex
|
|
// watchers receive events from watch broadcast.
|
|
watchers map[int64]*watcher
|
|
// nextWatcherID is the id to assign the next watcher on this stream.
|
|
nextWatcherID int64
|
|
|
|
stream pb.Watch_WatchServer
|
|
|
|
// watchCh receives watch responses from the watchers.
|
|
watchCh chan *pb.WatchResponse
|
|
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
func (wps *watchProxyStream) close() {
|
|
var wg sync.WaitGroup
|
|
wps.cancel()
|
|
wps.mu.Lock()
|
|
wg.Add(len(wps.watchers))
|
|
for _, wpsw := range wps.watchers {
|
|
go func(w *watcher) {
|
|
wps.ranges.delete(w)
|
|
wg.Done()
|
|
}(wpsw)
|
|
}
|
|
wps.watchers = nil
|
|
wps.mu.Unlock()
|
|
|
|
wg.Wait()
|
|
|
|
close(wps.watchCh)
|
|
}
|
|
|
|
func (wps *watchProxyStream) recvLoop() error {
|
|
for {
|
|
req, err := wps.stream.Recv()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
switch uv := req.RequestUnion.(type) {
|
|
case *pb.WatchRequest_CreateRequest:
|
|
cr := uv.CreateRequest
|
|
w := &watcher{
|
|
wr: watchRange{string(cr.Key), string(cr.RangeEnd)},
|
|
id: wps.nextWatcherID,
|
|
wps: wps,
|
|
|
|
nextrev: cr.StartRevision,
|
|
progress: cr.ProgressNotify,
|
|
prevKV: cr.PrevKv,
|
|
filters: v3rpc.FiltersFromRequest(cr),
|
|
}
|
|
if !w.wr.valid() {
|
|
w.post(&pb.WatchResponse{WatchId: -1, Created: true, Canceled: true})
|
|
continue
|
|
}
|
|
wps.nextWatcherID++
|
|
w.nextrev = cr.StartRevision
|
|
wps.watchers[w.id] = w
|
|
wps.ranges.add(w)
|
|
case *pb.WatchRequest_CancelRequest:
|
|
wps.delete(uv.CancelRequest.WatchId)
|
|
default:
|
|
panic("not implemented")
|
|
}
|
|
}
|
|
}
|
|
|
|
func (wps *watchProxyStream) sendLoop() {
|
|
for {
|
|
select {
|
|
case wresp, ok := <-wps.watchCh:
|
|
if !ok {
|
|
return
|
|
}
|
|
if err := wps.stream.Send(wresp); err != nil {
|
|
return
|
|
}
|
|
case <-wps.ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (wps *watchProxyStream) delete(id int64) {
|
|
wps.mu.Lock()
|
|
defer wps.mu.Unlock()
|
|
|
|
w, ok := wps.watchers[id]
|
|
if !ok {
|
|
return
|
|
}
|
|
wps.ranges.delete(w)
|
|
delete(wps.watchers, id)
|
|
resp := &pb.WatchResponse{
|
|
Header: &w.lastHeader,
|
|
WatchId: id,
|
|
Canceled: true,
|
|
}
|
|
wps.watchCh <- resp
|
|
}
|