Merge pull request #7240 from fanminshi/balancer_fix

clientv3: fix balancer update address bug
This commit is contained in:
fanmin shi 2017-01-26 15:08:50 -08:00 committed by GitHub
commit 3351a71e84
2 changed files with 22 additions and 6 deletions

View File

@ -124,7 +124,11 @@ func (b *simpleBalancer) updateAddrs(eps []string) {
addrs = append(addrs, grpc.Address{Addr: getHost(eps[i])}) addrs = append(addrs, grpc.Address{Addr: getHost(eps[i])})
} }
b.addrs = addrs b.addrs = addrs
b.notifyCh <- addrs // updating notifyCh can trigger new connections,
// but balancer only expects new connections if all connections are down
if b.pinAddr == "" {
b.notifyCh <- addrs
}
} }
func (b *simpleBalancer) Up(addr grpc.Address) func(error) { func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
@ -220,7 +224,7 @@ func (b *simpleBalancer) Close() error {
close(b.notifyCh) close(b.notifyCh)
b.pinAddr = "" b.pinAddr = ""
// In the case of follwing scenerio: // In the case of following scenario:
// 1. upc is not closed; no pinned address // 1. upc is not closed; no pinned address
// 2. client issues an rpc, calling invoke(), which calls Get(), enters for loop, blocks // 2. client issues an rpc, calling invoke(), which calls Get(), enters for loop, blocks
// 3. clientconn.Close() calls balancer.Close(); closed = true // 3. clientconn.Close() calls balancer.Close(); closed = true

View File

@ -26,7 +26,16 @@ import (
) )
// TestDialSetEndpoints ensures SetEndpoints can replace unavailable endpoints with available ones. // TestDialSetEndpoints ensures SetEndpoints can replace unavailable endpoints with available ones.
func TestDialSetEndpoints(t *testing.T) { func TestDialSetEndpointsBeforeFail(t *testing.T) {
testDialSetEndpoints(t, true)
}
func TestDialSetEndpointsAfterFail(t *testing.T) {
testDialSetEndpoints(t, false)
}
// testDialSetEndpoints ensures SetEndpoints can replace unavailable endpoints with available ones.
func testDialSetEndpoints(t *testing.T, setBefore bool) {
defer testutil.AfterTest(t) defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t) defer clus.Terminate(t)
@ -45,13 +54,16 @@ func TestDialSetEndpoints(t *testing.T) {
} }
defer cli.Close() defer cli.Close()
if setBefore {
cli.SetEndpoints(eps[toKill%3], eps[(toKill+1)%3])
}
// make a dead node // make a dead node
clus.Members[toKill].Stop(t) clus.Members[toKill].Stop(t)
clus.WaitLeader(t) clus.WaitLeader(t)
// update client with available endpoints if !setBefore {
cli.SetEndpoints(eps[(toKill+1)%3]) cli.SetEndpoints(eps[toKill%3], eps[(toKill+1)%3])
}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
if _, err = cli.Get(ctx, "foo", clientv3.WithSerializable()); err != nil { if _, err = cli.Get(ctx, "foo", clientv3.WithSerializable()); err != nil {
t.Fatal(err) t.Fatal(err)