diff --git a/error/error.go b/error/error.go index 39791be73..51b219c20 100644 --- a/error/error.go +++ b/error/error.go @@ -58,6 +58,9 @@ var errors = map[int]string{ EcodeInvalidActiveSize: "Invalid active size", EcodeInvalidPromoteDelay: "Standby promote delay", EcodePromoteError: "Standby promotion error", + + // client related errors + EcodeClientInternal: "Client Internal Error", } const ( @@ -92,6 +95,8 @@ const ( EcodeInvalidActiveSize = 403 EcodeInvalidPromoteDelay = 404 EcodePromoteError = 405 + + EcodeClientInternal = 500 ) type Error struct { @@ -116,7 +121,7 @@ func Message(code int) string { // Only for error interface func (e Error) Error() string { - return e.Message + return e.Message + " (" + e.Cause + ")" } func (e Error) toJsonString() string { diff --git a/etcd/etcd.go b/etcd/etcd.go index 8abb4c92c..8bd0d34a2 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -28,6 +28,7 @@ import ( goetcd "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd" golog "github.com/coreos/etcd/third_party/github.com/coreos/go-log/log" "github.com/coreos/etcd/third_party/github.com/goraft/raft" + httpclient "github.com/coreos/etcd/third_party/github.com/mreiferson/go-httpclient" "github.com/coreos/etcd/config" ehttp "github.com/coreos/etcd/http" @@ -37,6 +38,14 @@ import ( "github.com/coreos/etcd/store" ) +// TODO(yichengq): constant extraTimeout is a hack. +// Current problem is that there is big lag between join command +// execution and join success. +// Fix it later. It should be removed when proper method is found and +// enough tests are provided. It is expected to be calculated from +// heartbeatInterval and electionTimeout only. +const extraTimeout = time.Duration(1000) * time.Millisecond + type Etcd struct { Config *config.Config // etcd config Store store.Store // data store @@ -144,14 +153,27 @@ func (e *Etcd) Run() { // Calculate all of our timeouts heartbeatInterval := time.Duration(e.Config.Peer.HeartbeatInterval) * time.Millisecond electionTimeout := time.Duration(e.Config.Peer.ElectionTimeout) * time.Millisecond - // TODO(yichengq): constant 1000 is a hack here. The reason to use this - // is to ensure etcd instances could start successfully at the same time. - // Current problem for the failure comes from the lag between join command - // execution and join success. - // Fix it later. It should be removed when proper method is found and - // enough tests are provided. - dialTimeout := (3 * heartbeatInterval) + electionTimeout + 1000 - responseHeaderTimeout := (3 * heartbeatInterval) + electionTimeout + 1000 + dialTimeout := (3 * heartbeatInterval) + electionTimeout + responseHeaderTimeout := (3 * heartbeatInterval) + electionTimeout + + clientTransporter := &httpclient.Transport{ + ResponseHeaderTimeout: responseHeaderTimeout + extraTimeout, + // This is a workaround for Transport.CancelRequest doesn't work on + // HTTPS connections blocked. The patch for it is in progress, + // and would be available in Go1.3 + // More: https://codereview.appspot.com/69280043/ + ConnectTimeout: dialTimeout + extraTimeout, + RequestTimeout: responseHeaderTimeout + dialTimeout + 2*extraTimeout, + } + if e.Config.PeerTLSInfo().Scheme() == "https" { + clientTLSConfig, err := e.Config.PeerTLSInfo().ClientConfig() + if err != nil { + log.Fatal("client TLS error: ", err) + } + clientTransporter.TLSClientConfig = clientTLSConfig + clientTransporter.DisableCompression = true + } + client := server.NewClient(clientTransporter) // Create peer server psConfig := server.PeerServerConfig{ @@ -162,7 +184,7 @@ func (e *Etcd) Run() { RetryTimes: e.Config.MaxRetryAttempts, RetryInterval: e.Config.RetryInterval, } - e.PeerServer = server.NewPeerServer(psConfig, e.Registry, e.Store, &mb, followersStats, serverStats) + e.PeerServer = server.NewPeerServer(psConfig, client, e.Registry, e.Store, &mb, followersStats, serverStats) // Create raft transporter and server raftTransporter := server.NewTransporter(followersStats, serverStats, e.Registry, heartbeatInterval, dialTimeout, responseHeaderTimeout) diff --git a/server/client.go b/server/client.go new file mode 100644 index 000000000..6690e319d --- /dev/null +++ b/server/client.go @@ -0,0 +1,184 @@ +package server + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + "strconv" + + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/log" +) + +// Client sends various requests using HTTP API. +// It is different from raft communication, and doesn't record anything in the log. +// The argument url is required to contain scheme and host only, and +// there is no trailing slash in it. +// Public functions return "etcd/error".Error intentionally to figure out +// etcd error code easily. +// TODO(yichengq): It is similar to go-etcd. But it could have many efforts +// to integrate the two. Leave it for further discussion. +type Client struct { + http.Client +} + +func NewClient(transport http.RoundTripper) *Client { + return &Client{http.Client{Transport: transport}} +} + +// CheckVersion checks whether the version is available. +func (c *Client) CheckVersion(url string, version int) (bool, *etcdErr.Error) { + resp, err := c.Get(url + fmt.Sprintf("/version/%d/check", version)) + if err != nil { + return false, clientError(err) + } + + defer resp.Body.Close() + + return resp.StatusCode == 200, nil +} + +// GetVersion fetches the peer version of a cluster. +func (c *Client) GetVersion(url string) (int, *etcdErr.Error) { + resp, err := c.Get(url + "/version") + if err != nil { + return 0, clientError(err) + } + + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return 0, clientError(err) + } + + // Parse version number. + version, err := strconv.Atoi(string(body)) + if err != nil { + return 0, clientError(err) + } + return version, nil +} + +func (c *Client) GetMachines(url string) ([]*machineMessage, *etcdErr.Error) { + resp, err := c.Get(url + "/v2/admin/machines") + if err != nil { + return nil, clientError(err) + } + + msgs := new([]*machineMessage) + if uerr := c.parseJSONResponse(resp, msgs); uerr != nil { + return nil, uerr + } + return *msgs, nil +} + +func (c *Client) GetClusterConfig(url string) (*ClusterConfig, *etcdErr.Error) { + resp, err := c.Get(url + "/v2/admin/config") + if err != nil { + return nil, clientError(err) + } + + config := new(ClusterConfig) + if uerr := c.parseJSONResponse(resp, config); uerr != nil { + return nil, uerr + } + return config, nil +} + +// AddMachine adds machine to the cluster. +// The first return value is the commit index of join command. +func (c *Client) AddMachine(url string, cmd *JoinCommand) (uint64, *etcdErr.Error) { + b, _ := json.Marshal(cmd) + url = url + "/join" + + log.Infof("Send Join Request to %s", url) + resp, err := c.put(url, b) + if err != nil { + return 0, clientError(err) + } + defer resp.Body.Close() + + if err := c.checkErrorResponse(resp); err != nil { + return 0, err + } + b, err = ioutil.ReadAll(resp.Body) + if err != nil { + return 0, clientError(err) + } + index, numRead := binary.Uvarint(b) + if numRead < 0 { + return 0, clientError(fmt.Errorf("buf too small, or value too large")) + } + return index, nil +} + +func (c *Client) parseJSONResponse(resp *http.Response, val interface{}) *etcdErr.Error { + defer resp.Body.Close() + + if err := c.checkErrorResponse(resp); err != nil { + return err + } + if err := json.NewDecoder(resp.Body).Decode(val); err != nil { + log.Debugf("Error parsing join response: %v", err) + return clientError(err) + } + return nil +} + +func (c *Client) checkErrorResponse(resp *http.Response) *etcdErr.Error { + if resp.StatusCode != http.StatusOK { + uerr := &etcdErr.Error{} + if err := json.NewDecoder(resp.Body).Decode(uerr); err != nil { + log.Debugf("Error parsing response to etcd error: %v", err) + return clientError(err) + } + return uerr + } + return nil +} + +// put sends server side PUT request. +// It always follows redirects instead of stopping according to RFC 2616. +func (c *Client) put(urlStr string, body []byte) (*http.Response, error) { + return c.doAlwaysFollowingRedirects("PUT", urlStr, body) +} + +func (c *Client) doAlwaysFollowingRedirects(method string, urlStr string, body []byte) (resp *http.Response, err error) { + var req *http.Request + + for redirect := 0; redirect < 10; redirect++ { + req, err = http.NewRequest(method, urlStr, bytes.NewBuffer(body)) + if err != nil { + return + } + + if resp, err = c.Do(req); err != nil { + if resp != nil { + resp.Body.Close() + } + return + } + + if resp.StatusCode == http.StatusMovedPermanently || resp.StatusCode == http.StatusTemporaryRedirect { + resp.Body.Close() + if urlStr = resp.Header.Get("Location"); urlStr == "" { + err = errors.New(fmt.Sprintf("%d response missing Location header", resp.StatusCode)) + return + } + continue + } + return + } + + err = errors.New("stopped after 10 redirects") + return +} + +func clientError(err error) *etcdErr.Error { + return etcdErr.NewError(etcdErr.EcodeClientInternal, err.Error(), 0) +} diff --git a/server/peer_server.go b/server/peer_server.go index 34c90c0c6..42678b594 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -1,16 +1,12 @@ package server import ( - "bytes" - "encoding/binary" "encoding/json" "fmt" - "io/ioutil" "math/rand" "net/http" "net/url" "sort" - "strconv" "strings" "sync" "time" @@ -52,6 +48,7 @@ type PeerServerConfig struct { type PeerServer struct { Config PeerServerConfig + client *Client clusterConfig *ClusterConfig raftServer raft.Server server *Server @@ -86,9 +83,10 @@ type snapshotConf struct { snapshotThr uint64 } -func NewPeerServer(psConfig PeerServerConfig, registry *Registry, store store.Store, mb *metrics.Bucket, followersStats *raftFollowersStats, serverStats *raftServerStats) *PeerServer { +func NewPeerServer(psConfig PeerServerConfig, client *Client, registry *Registry, store store.Store, mb *metrics.Bucket, followersStats *raftFollowersStats, serverStats *raftServerStats) *PeerServer { s := &PeerServer{ Config: psConfig, + client: client, clusterConfig: NewClusterConfig(), registry: registry, store: store, @@ -410,24 +408,6 @@ func (s *PeerServer) startAsFollower(cluster []string, retryTimes int) error { return nil } -// getVersion fetches the peer version of a cluster. -func getVersion(t *transporter, versionURL url.URL) (int, error) { - resp, _, err := t.Get(versionURL.String()) - if err != nil { - return 0, err - } - defer resp.Body.Close() - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return 0, err - } - - // Parse version number. - version, _ := strconv.Atoi(string(body)) - return version, nil -} - // Upgradable checks whether all peers in a cluster support an upgrade to the next store version. func (s *PeerServer) Upgradable() error { nextVersion := s.store.Version() + 1 @@ -437,13 +417,12 @@ func (s *PeerServer) Upgradable() error { return fmt.Errorf("PeerServer: Cannot parse URL: '%s' (%s)", peerURL, err) } - t, _ := s.raftServer.Transporter().(*transporter) - checkURL := (&url.URL{Host: u.Host, Scheme: s.Config.Scheme, Path: fmt.Sprintf("/version/%d/check", nextVersion)}).String() - resp, _, err := t.Get(checkURL) + url := (&url.URL{Host: u.Host, Scheme: s.Config.Scheme}).String() + ok, err := s.client.CheckVersion(url, nextVersion) if err != nil { - return fmt.Errorf("PeerServer: Cannot check version compatibility: %s", u.Host) + return err } - if resp.StatusCode != 200 { + if !ok { return fmt.Errorf("PeerServer: Version %d is not compatible with peer: %s", nextVersion, u.Host) } } @@ -552,73 +531,53 @@ func (s *PeerServer) joinCluster(cluster []string) bool { // Send join requests to peer. func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) error { - // t must be ok - t, _ := server.Transporter().(*transporter) + u := (&url.URL{Host: peer, Scheme: scheme}).String() // Our version must match the leaders version - versionURL := url.URL{Host: peer, Scheme: scheme, Path: "/version"} - version, err := getVersion(t, versionURL) + version, err := s.client.GetVersion(u) if err != nil { - return fmt.Errorf("Error during join version check: %v", err) + return fmt.Errorf("fail checking join version: %v", err) } if version < store.MinVersion() || version > store.MaxVersion() { - return fmt.Errorf("Unable to join: cluster version is %d; version compatibility is %d - %d", version, store.MinVersion(), store.MaxVersion()) + return fmt.Errorf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version) } - var b bytes.Buffer - c := &JoinCommand{ - MinVersion: store.MinVersion(), - MaxVersion: store.MaxVersion(), - Name: server.Name(), - RaftURL: s.Config.URL, - EtcdURL: s.server.URL(), + // Fetch current peer list + machines, err := s.client.GetMachines(u) + if err != nil { + return fmt.Errorf("fail getting machine messages: %v", err) } - json.NewEncoder(&b).Encode(c) - - joinURL := url.URL{Host: peer, Scheme: scheme, Path: "/join"} - log.Infof("Send Join Request to %s", joinURL.String()) - - req, _ := http.NewRequest("PUT", joinURL.String(), &b) - resp, err := t.client.Do(req) - - for { - if err != nil { - return fmt.Errorf("Unable to join: %v", err) + exist := false + for _, machine := range machines { + if machine.Name == server.Name() { + exist = true + break } - if resp != nil { - defer resp.Body.Close() - - log.Infof("»»»» %d", resp.StatusCode) - if resp.StatusCode == http.StatusOK { - b, _ := ioutil.ReadAll(resp.Body) - s.joinIndex, _ = binary.Uvarint(b) - return nil - } - if resp.StatusCode == http.StatusTemporaryRedirect { - address := resp.Header.Get("Location") - log.Debugf("Send Join Request to %s", address) - c := &JoinCommand{ - MinVersion: store.MinVersion(), - MaxVersion: store.MaxVersion(), - Name: server.Name(), - RaftURL: s.Config.URL, - EtcdURL: s.server.URL(), - } - json.NewEncoder(&b).Encode(c) - resp, _, err = t.Put(address, &b) - - } else if resp.StatusCode == http.StatusBadRequest { - log.Debug("Reach max number peers in the cluster") - decoder := json.NewDecoder(resp.Body) - err := &etcdErr.Error{} - decoder.Decode(err) - return *err - } else { - return fmt.Errorf("Unable to join") - } - } - } + + // Fetch cluster config to see whether exists some place. + clusterConfig, err := s.client.GetClusterConfig(u) + if err != nil { + return fmt.Errorf("fail getting cluster config: %v", err) + } + if !exist && clusterConfig.ActiveSize <= len(machines) { + return fmt.Errorf("stop joining because the cluster is full with %d nodes", len(machines)) + } + + joinIndex, err := s.client.AddMachine(u, + &JoinCommand{ + MinVersion: store.MinVersion(), + MaxVersion: store.MaxVersion(), + Name: server.Name(), + RaftURL: s.Config.URL, + EtcdURL: s.server.URL(), + }) + if err != nil { + return fmt.Errorf("fail on join request: %v", err) + } + + s.joinIndex = joinIndex + return nil } func (s *PeerServer) Stats() []byte { diff --git a/server/peer_server_handlers.go b/server/peer_server_handlers.go index 0a04f0bde..d93f5dc36 100644 --- a/server/peer_server_handlers.go +++ b/server/peer_server_handlers.go @@ -225,7 +225,9 @@ func (ps *PeerServer) getMachinesHttpHandler(w http.ResponseWriter, req *http.Re machines := make([]*machineMessage, 0) leader := ps.raftServer.Leader() for _, name := range ps.registry.Names() { - machines = append(machines, ps.getMachineMessage(name, leader)) + if msg := ps.getMachineMessage(name, leader); msg != nil { + machines = append(machines, msg) + } } json.NewEncoder(w).Encode(&machines) }