etcd/proxy/grpcproxy/watch.go
2016-09-03 07:53:18 -07:00

268 lines
5.3 KiB
Go

// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcproxy
import (
"io"
"sync"
"golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver/api/v3rpc"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
type watchProxy struct {
cw clientv3.Watcher
wgs watchergroups
mu sync.Mutex
nextStreamID int64
ctx context.Context
}
func NewWatchProxy(c *clientv3.Client) pb.WatchServer {
wp := &watchProxy{
cw: c.Watcher,
wgs: watchergroups{
cw: c.Watcher,
groups: make(map[watchRange]*watcherGroup),
idToGroup: make(map[receiverID]*watcherGroup),
proxyCtx: c.Ctx(),
},
ctx: c.Ctx(),
}
go func() {
<-wp.ctx.Done()
wp.wgs.stop()
}()
return wp
}
func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
wp.mu.Lock()
wp.nextStreamID++
sid := wp.nextStreamID
wp.mu.Unlock()
sws := serverWatchStream{
cw: wp.cw,
groups: &wp.wgs,
singles: make(map[int64]*watcherSingle),
inGroups: make(map[int64]struct{}),
id: sid,
gRPCStream: stream,
watchCh: make(chan *pb.WatchResponse, 1024),
proxyCtx: wp.ctx,
}
go sws.recvLoop()
sws.sendLoop()
return wp.ctx.Err()
}
type serverWatchStream struct {
id int64
cw clientv3.Watcher
mu sync.Mutex // make sure any access of groups and singles is atomic
groups *watchergroups
singles map[int64]*watcherSingle
inGroups map[int64]struct{}
gRPCStream pb.Watch_WatchServer
watchCh chan *pb.WatchResponse
nextWatcherID int64
proxyCtx context.Context
}
func (sws *serverWatchStream) close() {
var wg sync.WaitGroup
sws.mu.Lock()
wg.Add(len(sws.singles) + len(sws.inGroups))
for _, ws := range sws.singles {
// copy the range variable to avoid race
copyws := ws
go func() {
copyws.stop()
wg.Done()
}()
}
for id := range sws.inGroups {
// copy the range variable to avoid race
wid := id
go func() {
sws.groups.removeWatcher(receiverID{streamID: sws.id, watcherID: wid})
wg.Done()
}()
}
sws.inGroups = nil
sws.mu.Unlock()
wg.Wait()
close(sws.watchCh)
}
func (sws *serverWatchStream) recvLoop() error {
defer sws.close()
for {
req, err := sws.gRPCStream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
switch uv := req.RequestUnion.(type) {
case *pb.WatchRequest_CreateRequest:
cr := uv.CreateRequest
watcher := watcher{
wr: watchRange{
key: string(cr.Key),
end: string(cr.RangeEnd),
},
id: sws.nextWatcherID,
ch: sws.watchCh,
progress: cr.ProgressNotify,
filters: v3rpc.FiltersFromRequest(cr),
}
if cr.StartRevision != 0 {
sws.addDedicatedWatcher(watcher, cr.StartRevision)
} else {
sws.addCoalescedWatcher(watcher)
}
sws.nextWatcherID++
case *pb.WatchRequest_CancelRequest:
sws.removeWatcher(uv.CancelRequest.WatchId)
default:
panic("not implemented")
}
}
}
func (sws *serverWatchStream) sendLoop() {
for {
select {
case wresp, ok := <-sws.watchCh:
if !ok {
return
}
if err := sws.gRPCStream.Send(wresp); err != nil {
return
}
case <-sws.proxyCtx.Done():
return
}
}
}
func (sws *serverWatchStream) addCoalescedWatcher(w watcher) {
sws.mu.Lock()
defer sws.mu.Unlock()
rid := receiverID{streamID: sws.id, watcherID: w.id}
sws.groups.addWatcher(rid, w)
sws.inGroups[w.id] = struct{}{}
}
func (sws *serverWatchStream) addDedicatedWatcher(w watcher, rev int64) {
sws.mu.Lock()
defer sws.mu.Unlock()
ctx, cancel := context.WithCancel(sws.proxyCtx)
wch := sws.cw.Watch(ctx,
w.wr.key, clientv3.WithRange(w.wr.end),
clientv3.WithRev(rev),
clientv3.WithProgressNotify(),
clientv3.WithCreatedNotify(),
)
ws := newWatcherSingle(wch, cancel, w, sws)
sws.singles[w.id] = ws
go ws.run()
}
func (sws *serverWatchStream) maybeCoalesceWatcher(ws watcherSingle) bool {
sws.mu.Lock()
defer sws.mu.Unlock()
rid := receiverID{streamID: sws.id, watcherID: ws.w.id}
// do not add new watchers when stream is closing
if sws.inGroups == nil {
return false
}
if sws.groups.maybeJoinWatcherSingle(rid, ws) {
delete(sws.singles, ws.w.id)
sws.inGroups[ws.w.id] = struct{}{}
return true
}
return false
}
func (sws *serverWatchStream) removeWatcher(id int64) {
sws.mu.Lock()
defer sws.mu.Unlock()
var (
rev int64
ok bool
)
defer func() {
if !ok {
return
}
resp := &pb.WatchResponse{
Header: &pb.ResponseHeader{
// todo: fill in ClusterId
// todo: fill in MemberId:
Revision: rev,
// todo: fill in RaftTerm:
},
WatchId: id,
Canceled: true,
}
sws.watchCh <- resp
}()
rev, ok = sws.groups.removeWatcher(receiverID{streamID: sws.id, watcherID: id})
if ok {
delete(sws.inGroups, id)
return
}
var ws *watcherSingle
if ws, ok = sws.singles[id]; ok {
delete(sws.singles, id)
ws.stop()
rev = ws.lastStoreRev
}
}