mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #4030 from mitake/endpoint-selection
client: add a mechanism for various endpoint selection mode
This commit is contained in:
commit
dac56faf61
@ -34,6 +34,7 @@ var (
|
||||
ErrNoEndpoints = errors.New("client: no endpoints available")
|
||||
ErrTooManyRedirects = errors.New("client: too many redirects")
|
||||
ErrClusterUnavailable = errors.New("client: etcd cluster is unavailable or misconfigured")
|
||||
ErrNoLeaderEndpoint = errors.New("client: no leader endpoint available")
|
||||
errTooManyRedirectChecks = errors.New("client: too many redirect checks")
|
||||
)
|
||||
|
||||
@ -48,6 +49,19 @@ var DefaultTransport CancelableTransport = &http.Transport{
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
}
|
||||
|
||||
type EndpointSelectionMode int
|
||||
|
||||
const (
|
||||
// EndpointSelectionRandom is to pick an endpoint in a random manner.
|
||||
EndpointSelectionRandom EndpointSelectionMode = iota
|
||||
|
||||
// EndpointSelectionPrioritizeLeader is to prioritize leader for reducing needless
|
||||
// forward between follower and leader.
|
||||
//
|
||||
// This mode should be used with Client.AutoSync().
|
||||
EndpointSelectionPrioritizeLeader
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
// Endpoints defines a set of URLs (schemes, hosts and ports only)
|
||||
// that can be used to communicate with a logical etcd cluster. For
|
||||
@ -104,6 +118,9 @@ type Config struct {
|
||||
//
|
||||
// A HeaderTimeoutPerRequest of zero means no timeout.
|
||||
HeaderTimeoutPerRequest time.Duration
|
||||
|
||||
// SelectionMode specifies a way of selecting destination endpoint.
|
||||
SelectionMode EndpointSelectionMode
|
||||
}
|
||||
|
||||
func (cfg *Config) transport() CancelableTransport {
|
||||
@ -174,6 +191,7 @@ func New(cfg Config) (Client, error) {
|
||||
c := &httpClusterClient{
|
||||
clientFactory: newHTTPClientFactory(cfg.transport(), cfg.checkRedirect(), cfg.HeaderTimeoutPerRequest),
|
||||
rand: rand.New(rand.NewSource(int64(time.Now().Nanosecond()))),
|
||||
selectionMode: cfg.SelectionMode,
|
||||
}
|
||||
if cfg.Username != "" {
|
||||
c.credentials = &credentials{
|
||||
@ -221,7 +239,18 @@ type httpClusterClient struct {
|
||||
pinned int
|
||||
credentials *credentials
|
||||
sync.RWMutex
|
||||
rand *rand.Rand
|
||||
rand *rand.Rand
|
||||
selectionMode EndpointSelectionMode
|
||||
}
|
||||
|
||||
func (c *httpClusterClient) getLeaderEndpoint() (string, error) {
|
||||
mAPI := NewMembersAPI(c)
|
||||
leader, err := mAPI.Leader(context.Background())
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return leader.ClientURLs[0], nil // TODO: how to handle multiple client URLs?
|
||||
}
|
||||
|
||||
func (c *httpClusterClient) SetEndpoints(eps []string) error {
|
||||
@ -238,9 +267,28 @@ func (c *httpClusterClient) SetEndpoints(eps []string) error {
|
||||
neps[i] = *u
|
||||
}
|
||||
|
||||
c.endpoints = shuffleEndpoints(c.rand, neps)
|
||||
// TODO: pin old endpoint if possible, and rebalance when new endpoint appears
|
||||
c.pinned = 0
|
||||
switch c.selectionMode {
|
||||
case EndpointSelectionRandom:
|
||||
c.endpoints = shuffleEndpoints(c.rand, neps)
|
||||
c.pinned = 0
|
||||
case EndpointSelectionPrioritizeLeader:
|
||||
c.endpoints = neps
|
||||
lep, err := c.getLeaderEndpoint()
|
||||
if err != nil {
|
||||
return ErrNoLeaderEndpoint
|
||||
}
|
||||
|
||||
for i := range c.endpoints {
|
||||
if c.endpoints[i].String() == lep {
|
||||
c.pinned = i
|
||||
break
|
||||
}
|
||||
}
|
||||
// If endpoints doesn't have the lu, just keep c.pinned = 0.
|
||||
// Forwarding between follower and leader would be required but it works.
|
||||
default:
|
||||
return errors.New(fmt.Sprintf("invalid endpoint selection mode: %d", c.selectionMode))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -29,6 +29,7 @@ import (
|
||||
|
||||
var (
|
||||
defaultV2MembersPrefix = "/v2/members"
|
||||
defaultLeaderSuffix = "/leader"
|
||||
)
|
||||
|
||||
type Member struct {
|
||||
@ -105,6 +106,9 @@ type MembersAPI interface {
|
||||
|
||||
// Update instructs etcd to update an existing Member in the cluster.
|
||||
Update(ctx context.Context, mID string, peerURLs []string) error
|
||||
|
||||
// Leader gets current leader of the cluster
|
||||
Leader(ctx context.Context) (*Member, error)
|
||||
}
|
||||
|
||||
type httpMembersAPI struct {
|
||||
@ -199,6 +203,25 @@ func (m *httpMembersAPI) Remove(ctx context.Context, memberID string) error {
|
||||
return assertStatusCode(resp.StatusCode, http.StatusNoContent, http.StatusGone)
|
||||
}
|
||||
|
||||
func (m *httpMembersAPI) Leader(ctx context.Context) (*Member, error) {
|
||||
req := &membersAPIActionLeader{}
|
||||
resp, body, err := m.client.Do(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := assertStatusCode(resp.StatusCode, http.StatusOK); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var leader Member
|
||||
if err := json.Unmarshal(body, &leader); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &leader, nil
|
||||
}
|
||||
|
||||
type membersAPIActionList struct{}
|
||||
|
||||
func (l *membersAPIActionList) HTTPRequest(ep url.URL) *http.Request {
|
||||
@ -255,6 +278,15 @@ func assertStatusCode(got int, want ...int) (err error) {
|
||||
return fmt.Errorf("unexpected status code %d", got)
|
||||
}
|
||||
|
||||
type membersAPIActionLeader struct{}
|
||||
|
||||
func (l *membersAPIActionLeader) HTTPRequest(ep url.URL) *http.Request {
|
||||
u := v2MembersURL(ep)
|
||||
u.Path = path.Join(u.Path, defaultLeaderSuffix)
|
||||
req, _ := http.NewRequest("GET", u.String(), nil)
|
||||
return req
|
||||
}
|
||||
|
||||
// v2MembersURL add the necessary path to the provided endpoint
|
||||
// to route requests to the default v2 members API.
|
||||
func v2MembersURL(ep url.URL) *url.URL {
|
||||
|
@ -114,6 +114,23 @@ func TestMembersAPIActionRemove(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMembersAPIActionLeader(t *testing.T) {
|
||||
ep := url.URL{Scheme: "http", Host: "example.com"}
|
||||
act := &membersAPIActionLeader{}
|
||||
|
||||
wantURL := &url.URL{
|
||||
Scheme: "http",
|
||||
Host: "example.com",
|
||||
Path: "/v2/members/leader",
|
||||
}
|
||||
|
||||
got := *act.HTTPRequest(ep)
|
||||
err := assertRequest(got, "GET", wantURL, http.Header{}, nil)
|
||||
if err != nil {
|
||||
t.Error(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func TestAssertStatusCode(t *testing.T) {
|
||||
if err := assertStatusCode(404, 400); err == nil {
|
||||
t.Errorf("assertStatusCode failed to detect conflict in 400 vs 404")
|
||||
@ -520,3 +537,63 @@ func TestHTTPMembersAPIListError(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestHTTPMembersAPILeaderSuccess(t *testing.T) {
|
||||
wantAction := &membersAPIActionLeader{}
|
||||
mAPI := &httpMembersAPI{
|
||||
client: &actionAssertingHTTPClient{
|
||||
t: t,
|
||||
act: wantAction,
|
||||
resp: http.Response{
|
||||
StatusCode: http.StatusOK,
|
||||
},
|
||||
body: []byte(`{"id":"94088180e21eb87b","name":"node2","peerURLs":["http://127.0.0.1:7002"],"clientURLs":["http://127.0.0.1:4002"]}`),
|
||||
},
|
||||
}
|
||||
|
||||
wantResponseMember := &Member{
|
||||
ID: "94088180e21eb87b",
|
||||
Name: "node2",
|
||||
PeerURLs: []string{"http://127.0.0.1:7002"},
|
||||
ClientURLs: []string{"http://127.0.0.1:4002"},
|
||||
}
|
||||
|
||||
m, err := mAPI.Leader(context.Background())
|
||||
if err != nil {
|
||||
t.Errorf("err = %v, want %v", err, nil)
|
||||
}
|
||||
if !reflect.DeepEqual(wantResponseMember, m) {
|
||||
t.Errorf("incorrect member: member = %v, want %v", wantResponseMember, m)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHTTPMembersAPILeaderError(t *testing.T) {
|
||||
tests := []httpClient{
|
||||
// generic httpClient failure
|
||||
&staticHTTPClient{err: errors.New("fail!")},
|
||||
|
||||
// unrecognized HTTP status code
|
||||
&staticHTTPClient{
|
||||
resp: http.Response{StatusCode: http.StatusTeapot},
|
||||
},
|
||||
|
||||
// fail to unmarshal body on StatusOK
|
||||
&staticHTTPClient{
|
||||
resp: http.Response{
|
||||
StatusCode: http.StatusOK,
|
||||
},
|
||||
body: []byte(`[{"id":"XX`),
|
||||
},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
mAPI := &httpMembersAPI{client: tt}
|
||||
m, err := mAPI.Leader(context.Background())
|
||||
if err == nil {
|
||||
t.Errorf("#%d: err = nil, want not nil", i)
|
||||
}
|
||||
if m != nil {
|
||||
t.Errorf("member slice = %v, want nil", m)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user