mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
clientv3: correct the nextRev on receving progress notification response
Signed-off-by: Benjamin Wang <wachao@vmware.com>
This commit is contained in:
parent
3c64ae4b79
commit
36fc3cae65
@ -880,12 +880,13 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// current progress of watch; <= store revision
|
// current progress of watch; <= store revision
|
||||||
nextRev = wr.Header.Revision
|
nextRev = wr.Header.Revision + 1
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(wr.Events) > 0 {
|
if len(wr.Events) > 0 {
|
||||||
nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
|
nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
|
||||||
}
|
}
|
||||||
|
|
||||||
ws.initReq.rev = nextRev
|
ws.initReq.rev = nextRev
|
||||||
|
|
||||||
// created event is already sent above,
|
// created event is already sent above,
|
||||||
|
@ -24,6 +24,8 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
@ -871,6 +873,99 @@ func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestV3WatchProgressOnMemberRestart verifies the client side doesn't
|
||||||
|
// receive duplicated events.
|
||||||
|
// Refer to https://github.com/etcd-io/etcd/pull/15248#issuecomment-1423225742.
|
||||||
|
func TestV3WatchProgressOnMemberRestart(t *testing.T) {
|
||||||
|
integration.BeforeTest(t)
|
||||||
|
|
||||||
|
clus := integration.NewCluster(t, &integration.ClusterConfig{
|
||||||
|
Size: 1,
|
||||||
|
WatchProgressNotifyInterval: time.Second,
|
||||||
|
})
|
||||||
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
|
client := clus.RandClient()
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
errC := make(chan error, 1)
|
||||||
|
doneC := make(chan struct{}, 1)
|
||||||
|
progressNotifyC := make(chan struct{}, 1)
|
||||||
|
go func() {
|
||||||
|
defer close(doneC)
|
||||||
|
|
||||||
|
var (
|
||||||
|
lastWatchedModRevision int64
|
||||||
|
gotProgressNotification bool
|
||||||
|
)
|
||||||
|
|
||||||
|
wch := client.Watch(ctx, "foo", clientv3.WithProgressNotify())
|
||||||
|
for wr := range wch {
|
||||||
|
if wr.Err() != nil {
|
||||||
|
errC <- fmt.Errorf("watch error: %w", wr.Err())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if wr.IsProgressNotify() {
|
||||||
|
// We need to make sure at least one progress notification
|
||||||
|
// is received after receiving the normal watch response
|
||||||
|
// and before restarting the member.
|
||||||
|
if lastWatchedModRevision > 0 {
|
||||||
|
gotProgressNotification = true
|
||||||
|
progressNotifyC <- struct{}{}
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, event := range wr.Events {
|
||||||
|
if event.Kv.ModRevision <= lastWatchedModRevision {
|
||||||
|
errC <- fmt.Errorf("got an unexpected revision: %d, lastWatchedModRevision: %d",
|
||||||
|
event.Kv.ModRevision,
|
||||||
|
lastWatchedModRevision)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
lastWatchedModRevision = event.Kv.ModRevision
|
||||||
|
}
|
||||||
|
|
||||||
|
if gotProgressNotification {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// write the key before the member restarts
|
||||||
|
t.Log("Writing key 'foo'")
|
||||||
|
_, err := client.Put(ctx, "foo", "bar1")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// make sure at least one progress notification is received
|
||||||
|
// before restarting the member
|
||||||
|
t.Log("Waiting for the progress notification")
|
||||||
|
<-progressNotifyC
|
||||||
|
|
||||||
|
// restart the member
|
||||||
|
t.Log("Restarting the member")
|
||||||
|
clus.Members[0].Stop(t)
|
||||||
|
clus.Members[0].Restart(t)
|
||||||
|
clus.WaitMembersForLeader(t, clus.Members)
|
||||||
|
|
||||||
|
// write the same key again after the member restarted
|
||||||
|
t.Log("Writing the same key 'foo' again after restarting the member")
|
||||||
|
_, err = client.Put(ctx, "foo", "bar2")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
t.Log("Waiting for result")
|
||||||
|
select {
|
||||||
|
case err := <-errC:
|
||||||
|
t.Fatal(err)
|
||||||
|
case <-doneC:
|
||||||
|
t.Log("Done")
|
||||||
|
case <-time.After(10 * time.Second):
|
||||||
|
t.Fatal("Timed out waiting for the response")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestV3WatchMultipleStreamsSynced(t *testing.T) {
|
func TestV3WatchMultipleStreamsSynced(t *testing.T) {
|
||||||
integration.BeforeTest(t)
|
integration.BeforeTest(t)
|
||||||
testV3WatchMultipleStreams(t, 0)
|
testV3WatchMultipleStreams(t, 0)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user