diff --git a/clientv3/integration/network_partition_test.go b/clientv3/integration/network_partition_test.go index 99c6df720..da09649f0 100644 --- a/clientv3/integration/network_partition_test.go +++ b/clientv3/integration/network_partition_test.go @@ -109,3 +109,69 @@ func testBalancerUnderNetworkPartition(t *testing.T, op func(*clientv3.Client, c t.Errorf("balancer did not switch in time (%v)", err) } } + +func TestBalancerUnderNetworkPartitionWatchLeader(t *testing.T) { + testBalancerUnderNetworkPartitionWatch(t, true) +} + +func TestBalancerUnderNetworkPartitionWatchFollower(t *testing.T) { + testBalancerUnderNetworkPartitionWatch(t, false) +} + +// testBalancerUnderNetworkPartitionWatch ensures watch stream +// to a partitioned node be closed when context requires leader. +func testBalancerUnderNetworkPartitionWatch(t *testing.T, isolateLeader bool) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{ + Size: 3, + SkipCreatingClient: true, + }) + defer clus.Terminate(t) + + eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()} + + target := clus.WaitLeader(t) + if !isolateLeader { + target = (target + 1) % 3 + } + + // pin eps[target] + watchCli, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[target]}}) + if err != nil { + t.Fatal(err) + } + defer watchCli.Close() + + // wait for eps[target] to be pinned + waitPinReady(t, watchCli) + + // add all eps to list, so that when the original pined one fails + // the client can switch to other available eps + watchCli.SetEndpoints(eps...) + + wch := watchCli.Watch(clientv3.WithRequireLeader(context.Background()), "foo", clientv3.WithCreatedNotify()) + select { + case <-wch: + case <-time.After(3 * time.Second): + t.Fatal("took too long to create watch") + } + + // isolate eps[target] + clus.Members[target].InjectPartition(t, + clus.Members[(target+1)%3], + clus.Members[(target+2)%3], + ) + + select { + case ev := <-wch: + if len(ev.Events) != 0 { + t.Fatal("expected no event") + } + if err = ev.Err(); err != rpctypes.ErrNoLeader { + t.Fatalf("expected %v, got %v", rpctypes.ErrNoLeader, err) + } + case <-time.After(3 * time.Second): // enough time to detect leader lost + t.Fatal("took too long to detect leader lost") + } +}