mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00

Change log: 1. PeerServer - estimate initial mode from its log through removedInLog variable - refactor FindCluster to return the estimation - refactor Start to call FindCluster explicitly - move raftServer start and cluster init from FindCluster to Start - remove stopNotify from PeerServer because it is not used anymore 2. Etcd - refactor Run logic to fit the specification 3. ClusterConfig - rename promoteDelay to removeDelay for better naming - add SyncClusterInterval field to ClusterConfig - commit command to set default cluster config when cluster is created - store cluster config info into key space for consistency - reload cluster config when reboot 4. add StandbyServer 5. Error - remove unused EcodePromoteError
185 lines
4.7 KiB
Go
185 lines
4.7 KiB
Go
package server
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"strconv"
|
|
|
|
etcdErr "github.com/coreos/etcd/error"
|
|
"github.com/coreos/etcd/log"
|
|
)
|
|
|
|
// Client sends various requests using HTTP API.
|
|
// It is different from raft communication, and doesn't record anything in the log.
|
|
// The argument url is required to contain scheme and host only, and
|
|
// there is no trailing slash in it.
|
|
// Public functions return "etcd/error".Error intentionally to figure out
|
|
// etcd error code easily.
|
|
// TODO(yichengq): It is similar to go-etcd. But it could have many efforts
|
|
// to integrate the two. Leave it for further discussion.
|
|
type Client struct {
|
|
http.Client
|
|
}
|
|
|
|
func NewClient(transport http.RoundTripper) *Client {
|
|
return &Client{http.Client{Transport: transport}}
|
|
}
|
|
|
|
// CheckVersion returns true when the version check on the server returns 200.
|
|
func (c *Client) CheckVersion(url string, version int) (bool, *etcdErr.Error) {
|
|
resp, err := c.Get(url + fmt.Sprintf("/version/%d/check", version))
|
|
if err != nil {
|
|
return false, clientError(err)
|
|
}
|
|
|
|
defer resp.Body.Close()
|
|
|
|
return resp.StatusCode == 200, nil
|
|
}
|
|
|
|
// GetVersion fetches the peer version of a cluster.
|
|
func (c *Client) GetVersion(url string) (int, *etcdErr.Error) {
|
|
resp, err := c.Get(url + "/version")
|
|
if err != nil {
|
|
return 0, clientError(err)
|
|
}
|
|
|
|
defer resp.Body.Close()
|
|
|
|
body, err := ioutil.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return 0, clientError(err)
|
|
}
|
|
|
|
// Parse version number.
|
|
version, err := strconv.Atoi(string(body))
|
|
if err != nil {
|
|
return 0, clientError(err)
|
|
}
|
|
return version, nil
|
|
}
|
|
|
|
func (c *Client) GetMachines(url string) ([]*machineMessage, *etcdErr.Error) {
|
|
resp, err := c.Get(url + "/v2/admin/machines")
|
|
if err != nil {
|
|
return nil, clientError(err)
|
|
}
|
|
|
|
msgs := new([]*machineMessage)
|
|
if uerr := c.parseJSONResponse(resp, msgs); uerr != nil {
|
|
return nil, uerr
|
|
}
|
|
return *msgs, nil
|
|
}
|
|
|
|
func (c *Client) GetClusterConfig(url string) (*ClusterConfig, *etcdErr.Error) {
|
|
resp, err := c.Get(url + "/v2/admin/config")
|
|
if err != nil {
|
|
return nil, clientError(err)
|
|
}
|
|
|
|
config := new(ClusterConfig)
|
|
if uerr := c.parseJSONResponse(resp, config); uerr != nil {
|
|
return nil, uerr
|
|
}
|
|
return config, nil
|
|
}
|
|
|
|
// AddMachine adds machine to the cluster.
|
|
// The first return value is the commit index of join command.
|
|
func (c *Client) AddMachine(url string, cmd *JoinCommand) (uint64, *etcdErr.Error) {
|
|
b, _ := json.Marshal(cmd)
|
|
url = url + "/join"
|
|
|
|
log.Infof("Send Join Request to %s", url)
|
|
resp, err := c.put(url, b)
|
|
if err != nil {
|
|
return 0, clientError(err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if err := c.checkErrorResponse(resp); err != nil {
|
|
return 0, err
|
|
}
|
|
b, err = ioutil.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return 0, clientError(err)
|
|
}
|
|
index, numRead := binary.Uvarint(b)
|
|
if numRead < 0 {
|
|
return 0, clientError(fmt.Errorf("buf too small, or value too large"))
|
|
}
|
|
return index, nil
|
|
}
|
|
|
|
func (c *Client) parseJSONResponse(resp *http.Response, val interface{}) *etcdErr.Error {
|
|
defer resp.Body.Close()
|
|
|
|
if err := c.checkErrorResponse(resp); err != nil {
|
|
return err
|
|
}
|
|
if err := json.NewDecoder(resp.Body).Decode(val); err != nil {
|
|
log.Debugf("Error parsing join response: %v", err)
|
|
return clientError(err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *Client) checkErrorResponse(resp *http.Response) *etcdErr.Error {
|
|
if resp.StatusCode != http.StatusOK {
|
|
uerr := &etcdErr.Error{}
|
|
if err := json.NewDecoder(resp.Body).Decode(uerr); err != nil {
|
|
log.Debugf("Error parsing response to etcd error: %v", err)
|
|
return clientError(err)
|
|
}
|
|
return uerr
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// put sends server side PUT request.
|
|
// It always follows redirects instead of stopping according to RFC 2616.
|
|
func (c *Client) put(urlStr string, body []byte) (*http.Response, error) {
|
|
return c.doAlwaysFollowingRedirects("PUT", urlStr, body)
|
|
}
|
|
|
|
func (c *Client) doAlwaysFollowingRedirects(method string, urlStr string, body []byte) (resp *http.Response, err error) {
|
|
var req *http.Request
|
|
|
|
for redirect := 0; redirect < 10; redirect++ {
|
|
req, err = http.NewRequest(method, urlStr, bytes.NewBuffer(body))
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
if resp, err = c.Do(req); err != nil {
|
|
if resp != nil {
|
|
resp.Body.Close()
|
|
}
|
|
return
|
|
}
|
|
|
|
if resp.StatusCode == http.StatusMovedPermanently || resp.StatusCode == http.StatusTemporaryRedirect {
|
|
resp.Body.Close()
|
|
if urlStr = resp.Header.Get("Location"); urlStr == "" {
|
|
err = errors.New(fmt.Sprintf("%d response missing Location header", resp.StatusCode))
|
|
return
|
|
}
|
|
continue
|
|
}
|
|
return
|
|
}
|
|
|
|
err = errors.New("stopped after 10 redirects")
|
|
return
|
|
}
|
|
|
|
func clientError(err error) *etcdErr.Error {
|
|
return etcdErr.NewError(etcdErr.EcodeClientInternal, err.Error(), 0)
|
|
}
|