clientv3: fix current watcher reconnection

If a current watcher didn't receive any events, a reconnect cycle would
advance its revision to the store's current revision. Instead, reconnect
using the watcher's creation header revision if the watcher hasn't received
any events.

Fixes #4502
This commit is contained in:
Anthony Romano 2016-02-18 14:28:52 -08:00
parent 71288597da
commit f66162932c
2 changed files with 19 additions and 4 deletions

View File

@ -163,13 +163,18 @@ func TestWatchReconnRequest(t *testing.T) {
}
func testWatchReconnRequest(t *testing.T, wctx *watchctx) {
// take down watcher connection
donec := make(chan struct{})
donec, stopc := make(chan struct{}), make(chan struct{}, 1)
go func() {
timer := time.After(2 * time.Second)
defer close(donec)
// take down watcher connection
for {
wctx.wclient.ActiveConnection().Close()
select {
case <-donec:
case <-timer:
// spinning on close may live lock reconnection
return
case <-stopc:
return
default:
}
@ -179,7 +184,11 @@ func testWatchReconnRequest(t *testing.T, wctx *watchctx) {
if wctx.ch = wctx.w.Watch(context.TODO(), "a", 0); wctx.ch == nil {
t.Fatalf("expected non-nil channel")
}
close(donec)
// wait for disconnections to stop
stopc <- struct{}{}
<-donec
// ensure watcher works
putAndWatch(t, wctx, "a", "a")
}

View File

@ -198,6 +198,12 @@ func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
resumec: make(chan int64),
}
if pendingReq.rev == 0 {
// note the header revision so that a put following a current watcher
// disconnect will arrive on the watcher channel after reconnect
ws.initReq.rev = resp.Header.Revision
}
w.mu.Lock()
w.streams[ws.id] = ws
w.mu.Unlock()