Merge pull request #7268 from heyitsanthony/proxy-test-clientv3

test: add proxy tests for clientv3 integration tests
This commit is contained in:
Anthony Romano 2017-02-02 20:31:05 -08:00 committed by GitHub
commit d159353d51
3 changed files with 17 additions and 22 deletions

View File

@ -52,7 +52,7 @@ const (
func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) { func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) {
wp := &watchProxy{ wp := &watchProxy{
cw: c.Watcher, cw: c.Watcher,
ctx: clientv3.WithRequireLeader(c.Ctx()), ctx: c.Ctx(),
retryLimiter: rate.NewLimiter(rate.Limit(retryPerSecond), retryPerSecond), retryLimiter: rate.NewLimiter(rate.Limit(retryPerSecond), retryPerSecond),
leaderc: make(chan struct{}), leaderc: make(chan struct{}),
} }
@ -63,8 +63,9 @@ func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) {
// a new streams without opening any watchers won't catch // a new streams without opening any watchers won't catch
// a lost leader event, so have a special watch to monitor it // a lost leader event, so have a special watch to monitor it
rev := int64((uint64(1) << 63) - 2) rev := int64((uint64(1) << 63) - 2)
lctx := clientv3.WithRequireLeader(wp.ctx)
for wp.ctx.Err() == nil { for wp.ctx.Err() == nil {
wch := wp.cw.Watch(wp.ctx, lostLeaderKey, clientv3.WithRev(rev)) wch := wp.cw.Watch(lctx, lostLeaderKey, clientv3.WithRev(rev))
for range wch { for range wch {
} }
wp.mu.Lock() wp.mu.Lock()

View File

@ -50,28 +50,21 @@ func newWatchBroadcast(wp *watchProxy, w *watcher, update func(*watchBroadcast))
wb.add(w) wb.add(w)
go func() { go func() {
defer close(wb.donec) defer close(wb.donec)
// loop because leader loss will close channel
for cctx.Err() == nil {
opts := []clientv3.OpOption{ opts := []clientv3.OpOption{
clientv3.WithRange(w.wr.end), clientv3.WithRange(w.wr.end),
clientv3.WithProgressNotify(), clientv3.WithProgressNotify(),
clientv3.WithRev(wb.nextrev), clientv3.WithRev(wb.nextrev),
clientv3.WithPrevKV(), clientv3.WithPrevKV(),
clientv3.WithCreatedNotify(),
} }
// The create notification should be the first response;
// if the watch is recreated following leader loss, it
// shouldn't post a second create response to the client.
if wb.responses == 0 {
opts = append(opts, clientv3.WithCreatedNotify())
}
wch := wp.cw.Watch(cctx, w.wr.key, opts...) wch := wp.cw.Watch(cctx, w.wr.key, opts...)
for wr := range wch { for wr := range wch {
wb.bcast(wr) wb.bcast(wr)
update(wb) update(wb)
} }
wp.retryLimiter.Wait(cctx)
}
}() }()
return wb return wb
} }

1
test
View File

@ -134,6 +134,7 @@ function integration_e2e_pass {
function grpcproxy_pass { function grpcproxy_pass {
go test -timeout 15m -v ${RACE} -tags cluster_proxy -cpu 1,2,4 $@ ${REPO_PATH}/integration go test -timeout 15m -v ${RACE} -tags cluster_proxy -cpu 1,2,4 $@ ${REPO_PATH}/integration
go test -timeout 15m -v ${RACE} -tags cluster_proxy -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration
} }
function release_pass { function release_pass {