mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
grpcproxy, etcdmain, integration: return done channel with WatchServer
Makes it possible to synchronously close the watch server. Fixes #7078
This commit is contained in:
parent
0dce29ae57
commit
9b5eb1ae5a
@ -104,7 +104,7 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
|
||||
}
|
||||
|
||||
kvp := grpcproxy.NewKvProxy(client)
|
||||
watchp := grpcproxy.NewWatchProxy(client)
|
||||
watchp, _ := grpcproxy.NewWatchProxy(client)
|
||||
clusterp := grpcproxy.NewClusterProxy(client)
|
||||
leasep := grpcproxy.NewLeaseProxy(client)
|
||||
mainp := grpcproxy.NewMaintenanceProxy(client)
|
||||
|
@ -26,25 +26,43 @@ import (
|
||||
|
||||
var (
|
||||
pmu sync.Mutex
|
||||
proxies map[*clientv3.Client]grpcAPI = make(map[*clientv3.Client]grpcAPI)
|
||||
proxies map[*clientv3.Client]grpcClientProxy = make(map[*clientv3.Client]grpcClientProxy)
|
||||
)
|
||||
|
||||
type grpcClientProxy struct {
|
||||
grpc grpcAPI
|
||||
wdonec <-chan struct{}
|
||||
}
|
||||
|
||||
func toGRPC(c *clientv3.Client) grpcAPI {
|
||||
pmu.Lock()
|
||||
defer pmu.Unlock()
|
||||
|
||||
if v, ok := proxies[c]; ok {
|
||||
return v
|
||||
return v.grpc
|
||||
}
|
||||
api := grpcAPI{
|
||||
|
||||
wp, wpch := grpcproxy.NewWatchProxy(c)
|
||||
grpc := grpcAPI{
|
||||
pb.NewClusterClient(c.ActiveConnection()),
|
||||
grpcproxy.KvServerToKvClient(grpcproxy.NewKvProxy(c)),
|
||||
pb.NewLeaseClient(c.ActiveConnection()),
|
||||
grpcproxy.WatchServerToWatchClient(grpcproxy.NewWatchProxy(c)),
|
||||
grpcproxy.WatchServerToWatchClient(wp),
|
||||
pb.NewMaintenanceClient(c.ActiveConnection()),
|
||||
}
|
||||
proxies[c] = api
|
||||
return api
|
||||
proxies[c] = grpcClientProxy{grpc: grpc, wdonec: wpch}
|
||||
return grpc
|
||||
}
|
||||
|
||||
type watchCloser struct {
|
||||
clientv3.Watcher
|
||||
wdonec <-chan struct{}
|
||||
}
|
||||
|
||||
func (wc *watchCloser) Close() error {
|
||||
err := wc.Watcher.Close()
|
||||
<-wc.wdonec
|
||||
return err
|
||||
}
|
||||
|
||||
func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) {
|
||||
@ -54,6 +72,11 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) {
|
||||
}
|
||||
rpc := toGRPC(c)
|
||||
c.KV = clientv3.NewKVFromKVClient(rpc.KV)
|
||||
c.Watcher = clientv3.NewWatchFromWatchClient(rpc.Watch)
|
||||
pmu.Lock()
|
||||
c.Watcher = &watchCloser{
|
||||
Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch),
|
||||
wdonec: proxies[c].wdonec,
|
||||
}
|
||||
pmu.Unlock()
|
||||
return c, nil
|
||||
}
|
||||
|
@ -49,7 +49,7 @@ const (
|
||||
retryPerSecond = 10
|
||||
)
|
||||
|
||||
func NewWatchProxy(c *clientv3.Client) pb.WatchServer {
|
||||
func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) {
|
||||
wp := &watchProxy{
|
||||
cw: c.Watcher,
|
||||
ctx: clientv3.WithRequireLeader(c.Ctx()),
|
||||
@ -57,7 +57,9 @@ func NewWatchProxy(c *clientv3.Client) pb.WatchServer {
|
||||
leaderc: make(chan struct{}),
|
||||
}
|
||||
wp.ranges = newWatchRanges(wp)
|
||||
ch := make(chan struct{})
|
||||
go func() {
|
||||
defer close(ch)
|
||||
// a new streams without opening any watchers won't catch
|
||||
// a lost leader event, so have a special watch to monitor it
|
||||
rev := int64((uint64(1) << 63) - 2)
|
||||
@ -77,7 +79,7 @@ func NewWatchProxy(c *clientv3.Client) pb.WatchServer {
|
||||
wp.wg.Wait()
|
||||
wp.ranges.stop()
|
||||
}()
|
||||
return wp
|
||||
return wp, ch
|
||||
}
|
||||
|
||||
func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user