diff --git a/client/client.go b/client/client.go index 4c4d41eb9..f9131b472 100644 --- a/client/client.go +++ b/client/client.go @@ -22,7 +22,6 @@ import ( "net" "net/http" "net/url" - "reflect" "sort" "strconv" "sync" @@ -261,53 +260,67 @@ type httpClusterClient struct { selectionMode EndpointSelectionMode } -func (c *httpClusterClient) getLeaderEndpoint() (string, error) { - mAPI := NewMembersAPI(c) - leader, err := mAPI.Leader(context.Background()) +func (c *httpClusterClient) getLeaderEndpoint(ctx context.Context, eps []url.URL) (string, error) { + ceps := make([]url.URL, len(eps)) + copy(ceps, eps) + + // To perform a lookup on the new endpoint list without using the current + // client, we'll copy it + clientCopy := &httpClusterClient{ + clientFactory: c.clientFactory, + credentials: c.credentials, + rand: c.rand, + + pinned: 0, + endpoints: ceps, + } + + mAPI := NewMembersAPI(clientCopy) + leader, err := mAPI.Leader(ctx) if err != nil { return "", err } + if len(leader.ClientURLs) == 0 { + return "", ErrNoLeaderEndpoint + } return leader.ClientURLs[0], nil // TODO: how to handle multiple client URLs? } -func (c *httpClusterClient) SetEndpoints(eps []string) error { +func (c *httpClusterClient) parseEndpoints(eps []string) ([]url.URL, error) { if len(eps) == 0 { - return ErrNoEndpoints + return []url.URL{}, ErrNoEndpoints } neps := make([]url.URL, len(eps)) for i, ep := range eps { u, err := url.Parse(ep) if err != nil { - return err + return []url.URL{}, err } neps[i] = *u } + return neps, nil +} - switch c.selectionMode { - case EndpointSelectionRandom: - c.endpoints = shuffleEndpoints(c.rand, neps) - c.pinned = 0 - case EndpointSelectionPrioritizeLeader: - c.endpoints = neps - lep, err := c.getLeaderEndpoint() - if err != nil { - return ErrNoLeaderEndpoint - } - - for i := range c.endpoints { - if c.endpoints[i].String() == lep { - c.pinned = i - break - } - } - // If endpoints doesn't have the lu, just keep c.pinned = 0. - // Forwarding between follower and leader would be required but it works. - default: - return fmt.Errorf("invalid endpoint selection mode: %d", c.selectionMode) +func (c *httpClusterClient) SetEndpoints(eps []string) error { + neps, err := c.parseEndpoints(eps) + if err != nil { + return err } + c.Lock() + defer c.Unlock() + + c.endpoints = shuffleEndpoints(c.rand, neps) + // We're not doing anything for PrioritizeLeader here. This is + // due to not having a context meaning we can't call getLeaderEndpoint + // However, if you're using PrioritizeLeader, you've already been told + // to regularly call sync, where we do have a ctx, and can figure the + // leader. PrioritizeLeader is also quite a loose guarantee, so deal + // with it + c.pinned = 0 + return nil } @@ -401,27 +414,51 @@ func (c *httpClusterClient) Sync(ctx context.Context) error { return err } - c.Lock() - defer c.Unlock() - var eps []string for _, m := range ms { eps = append(eps, m.ClientURLs...) } - sort.Sort(sort.StringSlice(eps)) - ceps := make([]string, len(c.endpoints)) - for i, cep := range c.endpoints { - ceps[i] = cep.String() - } - sort.Sort(sort.StringSlice(ceps)) - // fast path if no change happens - // this helps client to pin the endpoint when no cluster change - if reflect.DeepEqual(eps, ceps) { - return nil + neps, err := c.parseEndpoints(eps) + if err != nil { + return err } - return c.SetEndpoints(eps) + npin := 0 + + switch c.selectionMode { + case EndpointSelectionRandom: + c.RLock() + eq := endpointsEqual(c.endpoints, neps) + c.RUnlock() + + if eq { + return nil + } + // When items in the endpoint list changes, we choose a new pin + neps = shuffleEndpoints(c.rand, neps) + case EndpointSelectionPrioritizeLeader: + nle, err := c.getLeaderEndpoint(ctx, neps) + if err != nil { + return ErrNoLeaderEndpoint + } + + for i, n := range neps { + if n.String() == nle { + npin = i + break + } + } + default: + return fmt.Errorf("invalid endpoint selection mode: %d", c.selectionMode) + } + + c.Lock() + defer c.Unlock() + c.endpoints = neps + c.pinned = npin + + return nil } func (c *httpClusterClient) AutoSync(ctx context.Context, interval time.Duration) error { @@ -607,3 +644,27 @@ func shuffleEndpoints(r *rand.Rand, eps []url.URL) []url.URL { } return neps } + +func endpointsEqual(left, right []url.URL) bool { + if len(left) != len(right) { + return false + } + + sLeft := make([]string, len(left)) + sRight := make([]string, len(right)) + for i, l := range left { + sLeft[i] = l.String() + } + for i, r := range right { + sRight[i] = r.String() + } + + sort.Strings(sLeft) + sort.Strings(sRight) + for i := range sLeft { + if sLeft[i] != sRight[i] { + return false + } + } + return true +} diff --git a/client/client_test.go b/client/client_test.go index aaf2bec47..598e01c4a 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -900,6 +900,90 @@ func TestHTTPClusterClientSyncPinEndpoint(t *testing.T) { } } +// TestHTTPClusterClientSyncUnpinEndpoint tests that Sync() unpins the endpoint when +// it gets a different member list than before. +func TestHTTPClusterClientSyncUnpinEndpoint(t *testing.T) { + cf := newStaticHTTPClientFactory([]staticHTTPResponse{ + { + resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}}, + body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`), + }, + { + resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}}, + body: []byte(`{"members":[{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`), + }, + { + resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}}, + body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`), + }, + }) + + hc := &httpClusterClient{ + clientFactory: cf, + rand: rand.New(rand.NewSource(0)), + } + err := hc.SetEndpoints([]string{"http://127.0.0.1:4003", "http://127.0.0.1:2379", "http://127.0.0.1:4001", "http://127.0.0.1:4002"}) + if err != nil { + t.Fatalf("unexpected error during setup: %#v", err) + } + wants := []string{"http://127.0.0.1:2379", "http://127.0.0.1:4001", "http://127.0.0.1:4002"} + + for i := 0; i < 3; i++ { + err = hc.Sync(context.Background()) + if err != nil { + t.Fatalf("#%d: unexpected error during Sync: %#v", i, err) + } + + if g := hc.endpoints[hc.pinned]; g.String() != wants[i] { + t.Errorf("#%d: pinned endpoint = %v, want %v", i, g, wants[i]) + } + } +} + +// TestHTTPClusterClientSyncPinLeaderEndpoint tests that Sync() pins the leader +// when the selection mode is EndpointSelectionPrioritizeLeader +func TestHTTPClusterClientSyncPinLeaderEndpoint(t *testing.T) { + cf := newStaticHTTPClientFactory([]staticHTTPResponse{ + { + resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}}, + body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`), + }, + { + resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}}, + body: []byte(`{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]}`), + }, + { + resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}}, + body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`), + }, + { + resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}}, + body: []byte(`{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}`), + }, + }) + + hc := &httpClusterClient{ + clientFactory: cf, + rand: rand.New(rand.NewSource(0)), + selectionMode: EndpointSelectionPrioritizeLeader, + endpoints: []url.URL{{}}, // Need somewhere to pretend to send to initially + } + + wants := []string{"http://127.0.0.1:4003", "http://127.0.0.1:4002"} + + for i, want := range wants { + err := hc.Sync(context.Background()) + if err != nil { + t.Fatalf("#%d: unexpected error during Sync: %#v", i, err) + } + + pinned := hc.endpoints[hc.pinned].String() + if pinned != want { + t.Errorf("#%d: pinned endpoint = %v, want %v", i, pinned, want) + } + } +} + func TestHTTPClusterClientResetFail(t *testing.T) { tests := [][]string{ // need at least one endpoint