From 53be8405f302ed36ec27f4f376f96c3d2f6fcf63 Mon Sep 17 00:00:00 2001 From: Hitoshi Mitake Date: Mon, 21 Dec 2015 17:46:41 +0900 Subject: [PATCH 1/2] client: a new API for obtaining a leader node information --- client/members.go | 32 ++++++++++++++++++ client/members_test.go | 77 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+) diff --git a/client/members.go b/client/members.go index b6f33ed76..71b01b27f 100644 --- a/client/members.go +++ b/client/members.go @@ -29,6 +29,7 @@ import ( var ( defaultV2MembersPrefix = "/v2/members" + defaultLeaderSuffix = "/leader" ) type Member struct { @@ -105,6 +106,9 @@ type MembersAPI interface { // Update instructs etcd to update an existing Member in the cluster. Update(ctx context.Context, mID string, peerURLs []string) error + + // Leader gets current leader of the cluster + Leader(ctx context.Context) (*Member, error) } type httpMembersAPI struct { @@ -199,6 +203,25 @@ func (m *httpMembersAPI) Remove(ctx context.Context, memberID string) error { return assertStatusCode(resp.StatusCode, http.StatusNoContent, http.StatusGone) } +func (m *httpMembersAPI) Leader(ctx context.Context) (*Member, error) { + req := &membersAPIActionLeader{} + resp, body, err := m.client.Do(ctx, req) + if err != nil { + return nil, err + } + + if err := assertStatusCode(resp.StatusCode, http.StatusOK); err != nil { + return nil, err + } + + var leader Member + if err := json.Unmarshal(body, &leader); err != nil { + return nil, err + } + + return &leader, nil +} + type membersAPIActionList struct{} func (l *membersAPIActionList) HTTPRequest(ep url.URL) *http.Request { @@ -255,6 +278,15 @@ func assertStatusCode(got int, want ...int) (err error) { return fmt.Errorf("unexpected status code %d", got) } +type membersAPIActionLeader struct{} + +func (l *membersAPIActionLeader) HTTPRequest(ep url.URL) *http.Request { + u := v2MembersURL(ep) + u.Path = path.Join(u.Path, defaultLeaderSuffix) + req, _ := http.NewRequest("GET", u.String(), nil) + return req +} + // v2MembersURL add the necessary path to the provided endpoint // to route requests to the default v2 members API. func v2MembersURL(ep url.URL) *url.URL { diff --git a/client/members_test.go b/client/members_test.go index df7be1ccb..494e2dc56 100644 --- a/client/members_test.go +++ b/client/members_test.go @@ -114,6 +114,23 @@ func TestMembersAPIActionRemove(t *testing.T) { } } +func TestMembersAPIActionLeader(t *testing.T) { + ep := url.URL{Scheme: "http", Host: "example.com"} + act := &membersAPIActionLeader{} + + wantURL := &url.URL{ + Scheme: "http", + Host: "example.com", + Path: "/v2/members/leader", + } + + got := *act.HTTPRequest(ep) + err := assertRequest(got, "GET", wantURL, http.Header{}, nil) + if err != nil { + t.Error(err.Error()) + } +} + func TestAssertStatusCode(t *testing.T) { if err := assertStatusCode(404, 400); err == nil { t.Errorf("assertStatusCode failed to detect conflict in 400 vs 404") @@ -520,3 +537,63 @@ func TestHTTPMembersAPIListError(t *testing.T) { } } } + +func TestHTTPMembersAPILeaderSuccess(t *testing.T) { + wantAction := &membersAPIActionLeader{} + mAPI := &httpMembersAPI{ + client: &actionAssertingHTTPClient{ + t: t, + act: wantAction, + resp: http.Response{ + StatusCode: http.StatusOK, + }, + body: []byte(`{"id":"94088180e21eb87b","name":"node2","peerURLs":["http://127.0.0.1:7002"],"clientURLs":["http://127.0.0.1:4002"]}`), + }, + } + + wantResponseMember := &Member{ + ID: "94088180e21eb87b", + Name: "node2", + PeerURLs: []string{"http://127.0.0.1:7002"}, + ClientURLs: []string{"http://127.0.0.1:4002"}, + } + + m, err := mAPI.Leader(context.Background()) + if err != nil { + t.Errorf("err = %v, want %v", err, nil) + } + if !reflect.DeepEqual(wantResponseMember, m) { + t.Errorf("incorrect member: member = %v, want %v", wantResponseMember, m) + } +} + +func TestHTTPMembersAPILeaderError(t *testing.T) { + tests := []httpClient{ + // generic httpClient failure + &staticHTTPClient{err: errors.New("fail!")}, + + // unrecognized HTTP status code + &staticHTTPClient{ + resp: http.Response{StatusCode: http.StatusTeapot}, + }, + + // fail to unmarshal body on StatusOK + &staticHTTPClient{ + resp: http.Response{ + StatusCode: http.StatusOK, + }, + body: []byte(`[{"id":"XX`), + }, + } + + for i, tt := range tests { + mAPI := &httpMembersAPI{client: tt} + m, err := mAPI.Leader(context.Background()) + if err == nil { + t.Errorf("#%d: err = nil, want not nil", i) + } + if m != nil { + t.Errorf("member slice = %v, want nil", m) + } + } +} From a46ffc60e50abde706cd6cff80be24606d704b20 Mon Sep 17 00:00:00 2001 From: Hitoshi Mitake Date: Mon, 21 Dec 2015 17:47:22 +0900 Subject: [PATCH 2/2] client: add a mechanism for various endpoint selection mode Current etcd client library chooses a default destination node from every member of a cluster in a random manner. However, requests of write and read (for consistent results) need to be forwarded to the leader node as the nature of Raft algorithm. If the chosen node is a follower, additional network traffic will be caused by the forwarding from follower to leader. Mainly for reducing the forward traffic, this commit adds a new mechanism for various endpoint selection mode to the client library which can be configured with client.Config.SelectionMode. Currently, two modes are provided: - EndpointSelectionRandom: default, same to existing behavior (pick a node in a random manner) - EndpointSelectionPrioritizeLeader: prioritize leader, for the above purpose I evaluated the effectiveness of the EndpointSelectionPrioritizeLeader with 4 t1.micro instances of AWS (3 nodes for etcd cluster and 1 node for etcd client). Client executes this simple benchmark (https://github.com/mitake/etcd-things/tree/master/prioritize-leader-bench), just writes 10000 keys. When SelectionMode == EndpointSelectionRandom (default), the benchmark needed 1 min and 32.102 sec to finish. When SelectionMode == EndpointSelectionPrioritizeLeader, the benchmark needed 1 min 4.760 sec. --- client/client.go | 56 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 52 insertions(+), 4 deletions(-) diff --git a/client/client.go b/client/client.go index ece4cc0cd..eef77347e 100644 --- a/client/client.go +++ b/client/client.go @@ -34,6 +34,7 @@ var ( ErrNoEndpoints = errors.New("client: no endpoints available") ErrTooManyRedirects = errors.New("client: too many redirects") ErrClusterUnavailable = errors.New("client: etcd cluster is unavailable or misconfigured") + ErrNoLeaderEndpoint = errors.New("client: no leader endpoint available") errTooManyRedirectChecks = errors.New("client: too many redirect checks") ) @@ -48,6 +49,19 @@ var DefaultTransport CancelableTransport = &http.Transport{ TLSHandshakeTimeout: 10 * time.Second, } +type EndpointSelectionMode int + +const ( + // EndpointSelectionRandom is to pick an endpoint in a random manner. + EndpointSelectionRandom EndpointSelectionMode = iota + + // EndpointSelectionPrioritizeLeader is to prioritize leader for reducing needless + // forward between follower and leader. + // + // This mode should be used with Client.AutoSync(). + EndpointSelectionPrioritizeLeader +) + type Config struct { // Endpoints defines a set of URLs (schemes, hosts and ports only) // that can be used to communicate with a logical etcd cluster. For @@ -104,6 +118,9 @@ type Config struct { // // A HeaderTimeoutPerRequest of zero means no timeout. HeaderTimeoutPerRequest time.Duration + + // SelectionMode specifies a way of selecting destination endpoint. + SelectionMode EndpointSelectionMode } func (cfg *Config) transport() CancelableTransport { @@ -169,6 +186,7 @@ func New(cfg Config) (Client, error) { c := &httpClusterClient{ clientFactory: newHTTPClientFactory(cfg.transport(), cfg.checkRedirect(), cfg.HeaderTimeoutPerRequest), rand: rand.New(rand.NewSource(int64(time.Now().Nanosecond()))), + selectionMode: cfg.SelectionMode, } if cfg.Username != "" { c.credentials = &credentials{ @@ -216,7 +234,18 @@ type httpClusterClient struct { pinned int credentials *credentials sync.RWMutex - rand *rand.Rand + rand *rand.Rand + selectionMode EndpointSelectionMode +} + +func (c *httpClusterClient) getLeaderEndpoint() (string, error) { + mAPI := NewMembersAPI(c) + leader, err := mAPI.Leader(context.Background()) + if err != nil { + return "", err + } + + return leader.ClientURLs[0], nil // TODO: how to handle multiple client URLs? } func (c *httpClusterClient) reset(eps []string) error { @@ -233,9 +262,28 @@ func (c *httpClusterClient) reset(eps []string) error { neps[i] = *u } - c.endpoints = shuffleEndpoints(c.rand, neps) - // TODO: pin old endpoint if possible, and rebalance when new endpoint appears - c.pinned = 0 + switch c.selectionMode { + case EndpointSelectionRandom: + c.endpoints = shuffleEndpoints(c.rand, neps) + c.pinned = 0 + case EndpointSelectionPrioritizeLeader: + c.endpoints = neps + lep, err := c.getLeaderEndpoint() + if err != nil { + return ErrNoLeaderEndpoint + } + + for i := range c.endpoints { + if c.endpoints[i].String() == lep { + c.pinned = i + break + } + } + // If endpoints doesn't have the lu, just keep c.pinned = 0. + // Forwarding between follower and leader would be required but it works. + default: + return errors.New(fmt.Sprintf("invalid endpoint selection mode: %d", c.selectionMode)) + } return nil }