From 4e14604e5c016f06bad535eac2a68e236c3d6312 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 1 May 2014 14:10:59 -0700 Subject: [PATCH 1/5] refactor(server): add Client struct This is used to send request to web API. It will do this behavior a lot in standby mode, so I abstract this struct first. --- error/error.go | 7 +- server/client.go | 203 ++++++++++++++++++++++++++++++++++++++++++ server/peer_server.go | 107 +++++----------------- 3 files changed, 233 insertions(+), 84 deletions(-) create mode 100644 server/client.go diff --git a/error/error.go b/error/error.go index 39791be73..51b219c20 100644 --- a/error/error.go +++ b/error/error.go @@ -58,6 +58,9 @@ var errors = map[int]string{ EcodeInvalidActiveSize: "Invalid active size", EcodeInvalidPromoteDelay: "Standby promote delay", EcodePromoteError: "Standby promotion error", + + // client related errors + EcodeClientInternal: "Client Internal Error", } const ( @@ -92,6 +95,8 @@ const ( EcodeInvalidActiveSize = 403 EcodeInvalidPromoteDelay = 404 EcodePromoteError = 405 + + EcodeClientInternal = 500 ) type Error struct { @@ -116,7 +121,7 @@ func Message(code int) string { // Only for error interface func (e Error) Error() string { - return e.Message + return e.Message + " (" + e.Cause + ")" } func (e Error) toJsonString() string { diff --git a/server/client.go b/server/client.go new file mode 100644 index 000000000..a87b2f78a --- /dev/null +++ b/server/client.go @@ -0,0 +1,203 @@ +package server + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "strconv" + + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/log" +) + +// Client sends various requests using HTTP API. +// It is different from raft communication, and doesn't record anything in the log. +// Public functions return "etcd/error".Error intentionally to figure out +// etcd error code easily. +// TODO(yichengq): It is similar to go-etcd. But it could have many efforts +// to integrate the two. Leave it for further discussion. +type Client struct { + http.Client +} + +func NewClient(transport http.RoundTripper) *Client { + return &Client{http.Client{Transport: transport}} +} + +// CheckVersion checks whether the version is available. +func (c *Client) CheckVersion(url string, version int) (bool, *etcdErr.Error) { + resp, err := c.Get(url + fmt.Sprintf("/version/%d/check", version)) + if err != nil { + return false, clientError(err) + } + + defer resp.Body.Close() + + return resp.StatusCode == 200, nil +} + +// GetVersion fetches the peer version of a cluster. +func (c *Client) GetVersion(url string) (int, *etcdErr.Error) { + resp, err := c.Get(url + "/version") + if err != nil { + return 0, clientError(err) + } + + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return 0, clientError(err) + } + + // Parse version number. + version, _ := strconv.Atoi(string(body)) + return version, nil +} + +// AddMachine adds machine to the cluster. +// The first return value is the commit index of join command. +func (c *Client) AddMachine(url string, cmd *JoinCommand) (uint64, *etcdErr.Error) { + b, _ := json.Marshal(cmd) + url = url + "/join" + + log.Infof("Send Join Request to %s", url) + resp, err := c.put(url, b) + if err != nil { + return 0, clientError(err) + } + defer resp.Body.Close() + + if err := c.checkErrorResponse(resp); err != nil { + return 0, err + } + b, err = ioutil.ReadAll(resp.Body) + if err != nil { + return 0, clientError(err) + } + index, numRead := binary.Uvarint(b) + if numRead < 0 { + return 0, clientError(fmt.Errorf("buf too small, or value too large")) + } + return index, nil +} + +func (c *Client) parseJSONResponse(resp *http.Response, val interface{}) *etcdErr.Error { + defer resp.Body.Close() + + if err := c.checkErrorResponse(resp); err != nil { + return err + } + if err := json.NewDecoder(resp.Body).Decode(val); err != nil { + log.Debugf("Error parsing join response: %v", err) + return clientError(err) + } + return nil +} + +func (c *Client) checkErrorResponse(resp *http.Response) *etcdErr.Error { + if resp.StatusCode != http.StatusOK { + uerr := &etcdErr.Error{} + if err := json.NewDecoder(resp.Body).Decode(uerr); err != nil { + log.Debugf("Error parsing response to etcd error: %v", err) + return clientError(err) + } + return uerr + } + return nil +} + +// put sends server side PUT request. +// It always follows redirects instead of stopping according to RFC 2616. +func (c *Client) put(urlStr string, body []byte) (*http.Response, error) { + req, err := http.NewRequest("PUT", urlStr, bytes.NewBuffer(body)) + if err != nil { + return nil, err + } + return c.doAlwaysFollowingRedirects(req, body) +} + +// doAlwaysFollowingRedirects provides similar functionality as standard one, +// but it does redirect with the same method for PUT or POST requests. +// Part of the code is borrowed from pkg/net/http/client.go. +func (c *Client) doAlwaysFollowingRedirects(ireq *http.Request, body []byte) (resp *http.Response, err error) { + var base *url.URL + redirectChecker := c.CheckRedirect + if redirectChecker == nil { + redirectChecker = defaultCheckRedirect + } + var via []*http.Request + + req := ireq + urlStr := "" // next relative or absolute URL to fetch (after first request) + for redirect := 0; ; redirect++ { + if redirect != 0 { + req, err = http.NewRequest(ireq.Method, urlStr, bytes.NewBuffer(body)) + if err != nil { + break + } + req.URL = base.ResolveReference(req.URL) + if len(via) > 0 { + // Add the Referer header. + lastReq := via[len(via)-1] + if lastReq.URL.Scheme != "https" { + req.Header.Set("Referer", lastReq.URL.String()) + } + + err = redirectChecker(req, via) + if err != nil { + break + } + } + } + + urlStr = req.URL.String() + // It uses exported Do method here. + // It is more elegant to use unexported send method, but that will + // introduce many redundant code. + if resp, err = c.Do(req); err != nil { + break + } + + if shouldExtraRedirectPost(resp.StatusCode) { + resp.Body.Close() + if urlStr = resp.Header.Get("Location"); urlStr == "" { + err = errors.New(fmt.Sprintf("%d response missing Location header", resp.StatusCode)) + break + } + base = req.URL + via = append(via, req) + continue + } + return + } + + if resp != nil { + resp.Body.Close() + } + return nil, err +} + +func shouldExtraRedirectPost(statusCode int) bool { + switch statusCode { + case http.StatusMovedPermanently, http.StatusTemporaryRedirect: + return true + } + return false +} + +func defaultCheckRedirect(req *http.Request, via []*http.Request) error { + if len(via) >= 10 { + return errors.New("stopped after 10 redirects") + } + return nil +} + +func clientError(err error) *etcdErr.Error { + return etcdErr.NewError(etcdErr.EcodeClientInternal, err.Error(), 0) +} diff --git a/server/peer_server.go b/server/peer_server.go index fe0891fbd..07a770232 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -1,16 +1,12 @@ package server import ( - "bytes" - "encoding/binary" "encoding/json" "fmt" - "io/ioutil" "math/rand" "net/http" "net/url" "sort" - "strconv" "strings" "sync" "time" @@ -52,6 +48,7 @@ type PeerServerConfig struct { type PeerServer struct { Config PeerServerConfig + client *Client clusterConfig *ClusterConfig raftServer raft.Server server *Server @@ -250,6 +247,11 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er } } + // TODO(yichengq): client for HTTP API usage could use transport other + // than the raft one. The transport should have longer timeout because + // it doesn't have fault tolerance of raft protocol. + s.client = NewClient(s.raftServer.Transporter().(*transporter).transport) + s.raftServer.Init() // Set NOCOW for data directory in btrfs @@ -359,24 +361,6 @@ func (s *PeerServer) startAsFollower(cluster []string, retryTimes int) error { return nil } -// getVersion fetches the peer version of a cluster. -func getVersion(t *transporter, versionURL url.URL) (int, error) { - resp, _, err := t.Get(versionURL.String()) - if err != nil { - return 0, err - } - defer resp.Body.Close() - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return 0, err - } - - // Parse version number. - version, _ := strconv.Atoi(string(body)) - return version, nil -} - // Upgradable checks whether all peers in a cluster support an upgrade to the next store version. func (s *PeerServer) Upgradable() error { nextVersion := s.store.Version() + 1 @@ -386,13 +370,12 @@ func (s *PeerServer) Upgradable() error { return fmt.Errorf("PeerServer: Cannot parse URL: '%s' (%s)", peerURL, err) } - t, _ := s.raftServer.Transporter().(*transporter) - checkURL := (&url.URL{Host: u.Host, Scheme: s.Config.Scheme, Path: fmt.Sprintf("/version/%d/check", nextVersion)}).String() - resp, _, err := t.Get(checkURL) + url := (&url.URL{Host: u.Host, Scheme: s.Config.Scheme}).String() + ok, err := s.client.CheckVersion(url, nextVersion) if err != nil { - return fmt.Errorf("PeerServer: Cannot check version compatibility: %s", u.Host) + return err } - if resp.StatusCode != 200 { + if !ok { return fmt.Errorf("PeerServer: Version %d is not compatible with peer: %s", nextVersion, u.Host) } } @@ -501,12 +484,10 @@ func (s *PeerServer) joinCluster(cluster []string) bool { // Send join requests to peer. func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) error { - // t must be ok - t, _ := server.Transporter().(*transporter) + u := (&url.URL{Host: peer, Scheme: scheme}).String() // Our version must match the leaders version - versionURL := url.URL{Host: peer, Scheme: scheme, Path: "/version"} - version, err := getVersion(t, versionURL) + version, err := s.client.GetVersion(u) if err != nil { return fmt.Errorf("Error during join version check: %v", err) } @@ -514,60 +495,20 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) return fmt.Errorf("Unable to join: cluster version is %d; version compatibility is %d - %d", version, store.MinVersion(), store.MaxVersion()) } - var b bytes.Buffer - c := &JoinCommand{ - MinVersion: store.MinVersion(), - MaxVersion: store.MaxVersion(), - Name: server.Name(), - RaftURL: s.Config.URL, - EtcdURL: s.server.URL(), + joinIndex, err := s.client.AddMachine(u, + &JoinCommand{ + MinVersion: store.MinVersion(), + MaxVersion: store.MaxVersion(), + Name: server.Name(), + RaftURL: s.Config.URL, + EtcdURL: s.server.URL(), + }) + if err != nil { + return err } - json.NewEncoder(&b).Encode(c) - joinURL := url.URL{Host: peer, Scheme: scheme, Path: "/join"} - log.Infof("Send Join Request to %s", joinURL.String()) - - req, _ := http.NewRequest("PUT", joinURL.String(), &b) - resp, err := t.client.Do(req) - - for { - if err != nil { - return fmt.Errorf("Unable to join: %v", err) - } - if resp != nil { - defer resp.Body.Close() - - log.Infof("»»»» %d", resp.StatusCode) - if resp.StatusCode == http.StatusOK { - b, _ := ioutil.ReadAll(resp.Body) - s.joinIndex, _ = binary.Uvarint(b) - return nil - } - if resp.StatusCode == http.StatusTemporaryRedirect { - address := resp.Header.Get("Location") - log.Debugf("Send Join Request to %s", address) - c := &JoinCommand{ - MinVersion: store.MinVersion(), - MaxVersion: store.MaxVersion(), - Name: server.Name(), - RaftURL: s.Config.URL, - EtcdURL: s.server.URL(), - } - json.NewEncoder(&b).Encode(c) - resp, _, err = t.Put(address, &b) - - } else if resp.StatusCode == http.StatusBadRequest { - log.Debug("Reach max number peers in the cluster") - decoder := json.NewDecoder(resp.Body) - err := &etcdErr.Error{} - decoder.Decode(err) - return *err - } else { - return fmt.Errorf("Unable to join") - } - } - - } + s.joinIndex = joinIndex + return nil } func (s *PeerServer) Stats() []byte { From 001b1fcd46d18b6ff21f14d731f393f41501d40d Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 1 May 2014 15:58:14 -0700 Subject: [PATCH 2/5] feat(join): check cluster conditions before join --- server/client.go | 26 +++++++++++++++++++++++ server/peer_server.go | 39 ++++++++++++++++++++++++++++++++-- server/peer_server_handlers.go | 4 +++- 3 files changed, 66 insertions(+), 3 deletions(-) diff --git a/server/client.go b/server/client.go index a87b2f78a..6ba18e8fd 100644 --- a/server/client.go +++ b/server/client.go @@ -60,6 +60,32 @@ func (c *Client) GetVersion(url string) (int, *etcdErr.Error) { return version, nil } +func (c *Client) GetMachines(url string) ([]*machineMessage, *etcdErr.Error) { + resp, err := c.Get(url + "/v2/admin/machines") + if err != nil { + return nil, clientError(err) + } + + msgs := new([]*machineMessage) + if uerr := c.parseJSONResponse(resp, msgs); uerr != nil { + return nil, uerr + } + return *msgs, nil +} + +func (c *Client) GetClusterConfig(url string) (*ClusterConfig, *etcdErr.Error) { + resp, err := c.Get(url + "/v2/admin/config") + if err != nil { + return nil, clientError(err) + } + + config := new(ClusterConfig) + if uerr := c.parseJSONResponse(resp, config); uerr != nil { + return nil, uerr + } + return config, nil +} + // AddMachine adds machine to the cluster. // The first return value is the commit index of join command. func (c *Client) AddMachine(url string, cmd *JoinCommand) (uint64, *etcdErr.Error) { diff --git a/server/peer_server.go b/server/peer_server.go index 07a770232..c256e1ec2 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -489,10 +489,44 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) // Our version must match the leaders version version, err := s.client.GetVersion(u) if err != nil { - return fmt.Errorf("Error during join version check: %v", err) + log.Debugf("fail checking join version") + return err } if version < store.MinVersion() || version > store.MaxVersion() { - return fmt.Errorf("Unable to join: cluster version is %d; version compatibility is %d - %d", version, store.MinVersion(), store.MaxVersion()) + log.Infof("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version) + return fmt.Errorf("incompatible version") + } + + // Fetch current peer list + machines, err := s.client.GetMachines(u) + if err != nil { + log.Debugf("fail getting machine messages") + return err + } + exist := false + for _, machine := range machines { + if machine.Name == server.Name() { + exist = true + // TODO(yichengq): cannot set join index for it. + // Need discussion about the best way to do it. + // + // if machine.PeerURL == s.Config.URL { + // log.Infof("has joined the cluster(%v) before", machines) + // return nil + // } + break + } + } + + // Fetch cluster config to see whether exists some place. + clusterConfig, err := s.client.GetClusterConfig(u) + if err != nil { + log.Debugf("fail getting cluster config") + return err + } + if !exist && clusterConfig.ActiveSize <= len(machines) { + log.Infof("stop joining because the cluster is full with %d nodes", len(machines)) + return fmt.Errorf("out of quota") } joinIndex, err := s.client.AddMachine(u, @@ -504,6 +538,7 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) EtcdURL: s.server.URL(), }) if err != nil { + log.Debugf("fail on join request") return err } diff --git a/server/peer_server_handlers.go b/server/peer_server_handlers.go index a8219c7a8..669d7ebf4 100644 --- a/server/peer_server_handlers.go +++ b/server/peer_server_handlers.go @@ -224,7 +224,9 @@ func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *ht func (ps *PeerServer) getMachinesHttpHandler(w http.ResponseWriter, req *http.Request) { machines := make([]*machineMessage, 0) for _, name := range ps.registry.Names() { - machines = append(machines, ps.getMachineMessage(name)) + if msg := ps.getMachineMessage(name); msg != nil { + machines = append(machines, msg) + } } json.NewEncoder(w).Encode(&machines) } From c9ce14c85795e4c82c2e26efbcdeae05f30d5441 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Wed, 7 May 2014 12:18:32 -0700 Subject: [PATCH 3/5] chore(peer_server): set client transporter separately It also moves the hack on timeout from raft transporter to client transporter. --- etcd/etcd.go | 36 +++++++++++++++++++++++++++++------- server/peer_server.go | 8 ++------ 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/etcd/etcd.go b/etcd/etcd.go index 213bfec54..cd79c2a56 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -28,6 +28,7 @@ import ( goetcd "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd" golog "github.com/coreos/etcd/third_party/github.com/coreos/go-log/log" "github.com/coreos/etcd/third_party/github.com/goraft/raft" + httpclient "github.com/coreos/etcd/third_party/github.com/mreiferson/go-httpclient" "github.com/coreos/etcd/config" ehttp "github.com/coreos/etcd/http" @@ -37,6 +38,8 @@ import ( "github.com/coreos/etcd/store" ) +const extraTimeout = 1000 + type Etcd struct { Config *config.Config // etcd config Store store.Store // data store @@ -134,14 +137,33 @@ func (e *Etcd) Run() { // Calculate all of our timeouts heartbeatInterval := time.Duration(e.Config.Peer.HeartbeatInterval) * time.Millisecond electionTimeout := time.Duration(e.Config.Peer.ElectionTimeout) * time.Millisecond - // TODO(yichengq): constant 1000 is a hack here. The reason to use this - // is to ensure etcd instances could start successfully at the same time. - // Current problem for the failure comes from the lag between join command + dialTimeout := (3 * heartbeatInterval) + electionTimeout + responseHeaderTimeout := (3 * heartbeatInterval) + electionTimeout + + // TODO(yichengq): constant extraTimeout is a hack here. + // Current problem is that there is big lag between join command // execution and join success. // Fix it later. It should be removed when proper method is found and - // enough tests are provided. - dialTimeout := (3 * heartbeatInterval) + electionTimeout + 1000 - responseHeaderTimeout := (3 * heartbeatInterval) + electionTimeout + 1000 + // enough tests are provided. It is expected to be calculated from + // heartbeatInterval and electionTimeout only. + clientTransporter := &httpclient.Transport{ + ResponseHeaderTimeout: responseHeaderTimeout + extraTimeout, + // This is a workaround for Transport.CancelRequest doesn't work on + // HTTPS connections blocked. The patch for it is in progress, + // and would be available in Go1.3 + // More: https://codereview.appspot.com/69280043/ + ConnectTimeout: dialTimeout + extraTimeout, + RequestTimeout: responseHeaderTimeout + dialTimeout + 2*extraTimeout, + } + if e.Config.PeerTLSInfo().Scheme() == "https" { + clientTLSConfig, err := e.Config.PeerTLSInfo().ClientConfig() + if err != nil { + log.Fatal("client TLS error: ", err) + } + clientTransporter.TLSClientConfig = clientTLSConfig + clientTransporter.DisableCompression = true + } + client := server.NewClient(clientTransporter) // Create peer server psConfig := server.PeerServerConfig{ @@ -152,7 +174,7 @@ func (e *Etcd) Run() { RetryTimes: e.Config.MaxRetryAttempts, RetryInterval: e.Config.RetryInterval, } - e.PeerServer = server.NewPeerServer(psConfig, e.Registry, e.Store, &mb, followersStats, serverStats) + e.PeerServer = server.NewPeerServer(psConfig, client, e.Registry, e.Store, &mb, followersStats, serverStats) // Create raft transporter and server raftTransporter := server.NewTransporter(followersStats, serverStats, e.Registry, heartbeatInterval, dialTimeout, responseHeaderTimeout) diff --git a/server/peer_server.go b/server/peer_server.go index c256e1ec2..e37471b11 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -80,9 +80,10 @@ type snapshotConf struct { snapshotThr uint64 } -func NewPeerServer(psConfig PeerServerConfig, registry *Registry, store store.Store, mb *metrics.Bucket, followersStats *raftFollowersStats, serverStats *raftServerStats) *PeerServer { +func NewPeerServer(psConfig PeerServerConfig, client *Client, registry *Registry, store store.Store, mb *metrics.Bucket, followersStats *raftFollowersStats, serverStats *raftServerStats) *PeerServer { s := &PeerServer{ Config: psConfig, + client: client, clusterConfig: NewClusterConfig(), registry: registry, store: store, @@ -247,11 +248,6 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er } } - // TODO(yichengq): client for HTTP API usage could use transport other - // than the raft one. The transport should have longer timeout because - // it doesn't have fault tolerance of raft protocol. - s.client = NewClient(s.raftServer.Transporter().(*transporter).transport) - s.raftServer.Init() // Set NOCOW for data directory in btrfs From ae81f843f10574d2cc16ff266b4905a02e91a99f Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Wed, 7 May 2014 16:09:08 -0700 Subject: [PATCH 4/5] refactor(client): remove useless logic in redirection --- server/client.go | 80 +++++++++--------------------------------------- 1 file changed, 15 insertions(+), 65 deletions(-) diff --git a/server/client.go b/server/client.go index 6ba18e8fd..fea52ac76 100644 --- a/server/client.go +++ b/server/client.go @@ -8,7 +8,6 @@ import ( "fmt" "io/ioutil" "net/http" - "net/url" "strconv" etcdErr "github.com/coreos/etcd/error" @@ -141,87 +140,38 @@ func (c *Client) checkErrorResponse(resp *http.Response) *etcdErr.Error { // put sends server side PUT request. // It always follows redirects instead of stopping according to RFC 2616. func (c *Client) put(urlStr string, body []byte) (*http.Response, error) { - req, err := http.NewRequest("PUT", urlStr, bytes.NewBuffer(body)) - if err != nil { - return nil, err - } - return c.doAlwaysFollowingRedirects(req, body) + return c.doAlwaysFollowingRedirects("PUT", urlStr, body) } -// doAlwaysFollowingRedirects provides similar functionality as standard one, -// but it does redirect with the same method for PUT or POST requests. -// Part of the code is borrowed from pkg/net/http/client.go. -func (c *Client) doAlwaysFollowingRedirects(ireq *http.Request, body []byte) (resp *http.Response, err error) { - var base *url.URL - redirectChecker := c.CheckRedirect - if redirectChecker == nil { - redirectChecker = defaultCheckRedirect - } - var via []*http.Request +func (c *Client) doAlwaysFollowingRedirects(method string, urlStr string, body []byte) (resp *http.Response, err error) { + var req *http.Request - req := ireq - urlStr := "" // next relative or absolute URL to fetch (after first request) - for redirect := 0; ; redirect++ { - if redirect != 0 { - req, err = http.NewRequest(ireq.Method, urlStr, bytes.NewBuffer(body)) - if err != nil { - break - } - req.URL = base.ResolveReference(req.URL) - if len(via) > 0 { - // Add the Referer header. - lastReq := via[len(via)-1] - if lastReq.URL.Scheme != "https" { - req.Header.Set("Referer", lastReq.URL.String()) - } - - err = redirectChecker(req, via) - if err != nil { - break - } - } + for redirect := 0; redirect < 10; redirect++ { + req, err = http.NewRequest(method, urlStr, bytes.NewBuffer(body)) + if err != nil { + return } - urlStr = req.URL.String() - // It uses exported Do method here. - // It is more elegant to use unexported send method, but that will - // introduce many redundant code. if resp, err = c.Do(req); err != nil { - break + if resp != nil { + resp.Body.Close() + } + return } - if shouldExtraRedirectPost(resp.StatusCode) { + if resp.StatusCode == http.StatusMovedPermanently || resp.StatusCode == http.StatusTemporaryRedirect { resp.Body.Close() if urlStr = resp.Header.Get("Location"); urlStr == "" { err = errors.New(fmt.Sprintf("%d response missing Location header", resp.StatusCode)) - break + return } - base = req.URL - via = append(via, req) continue } return } - if resp != nil { - resp.Body.Close() - } - return nil, err -} - -func shouldExtraRedirectPost(statusCode int) bool { - switch statusCode { - case http.StatusMovedPermanently, http.StatusTemporaryRedirect: - return true - } - return false -} - -func defaultCheckRedirect(req *http.Request, via []*http.Request) error { - if len(via) >= 10 { - return errors.New("stopped after 10 redirects") - } - return nil + err = errors.New("stopped after 10 redirects") + return } func clientError(err error) *etcdErr.Error { From e960a0e03c90f8726dc8088a6d5b58cf0c3821e0 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 8 May 2014 13:15:10 -0700 Subject: [PATCH 5/5] chore(client): minor changes based on comments The changes are made on error handling, comments and constant. --- etcd/etcd.go | 14 +++++++------- server/client.go | 7 ++++++- server/peer_server.go | 25 ++++++------------------- 3 files changed, 19 insertions(+), 27 deletions(-) diff --git a/etcd/etcd.go b/etcd/etcd.go index cd79c2a56..0ee505fc1 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -38,7 +38,13 @@ import ( "github.com/coreos/etcd/store" ) -const extraTimeout = 1000 +// TODO(yichengq): constant extraTimeout is a hack. +// Current problem is that there is big lag between join command +// execution and join success. +// Fix it later. It should be removed when proper method is found and +// enough tests are provided. It is expected to be calculated from +// heartbeatInterval and electionTimeout only. +const extraTimeout = time.Duration(1000) * time.Millisecond type Etcd struct { Config *config.Config // etcd config @@ -140,12 +146,6 @@ func (e *Etcd) Run() { dialTimeout := (3 * heartbeatInterval) + electionTimeout responseHeaderTimeout := (3 * heartbeatInterval) + electionTimeout - // TODO(yichengq): constant extraTimeout is a hack here. - // Current problem is that there is big lag between join command - // execution and join success. - // Fix it later. It should be removed when proper method is found and - // enough tests are provided. It is expected to be calculated from - // heartbeatInterval and electionTimeout only. clientTransporter := &httpclient.Transport{ ResponseHeaderTimeout: responseHeaderTimeout + extraTimeout, // This is a workaround for Transport.CancelRequest doesn't work on diff --git a/server/client.go b/server/client.go index fea52ac76..6690e319d 100644 --- a/server/client.go +++ b/server/client.go @@ -16,6 +16,8 @@ import ( // Client sends various requests using HTTP API. // It is different from raft communication, and doesn't record anything in the log. +// The argument url is required to contain scheme and host only, and +// there is no trailing slash in it. // Public functions return "etcd/error".Error intentionally to figure out // etcd error code easily. // TODO(yichengq): It is similar to go-etcd. But it could have many efforts @@ -55,7 +57,10 @@ func (c *Client) GetVersion(url string) (int, *etcdErr.Error) { } // Parse version number. - version, _ := strconv.Atoi(string(body)) + version, err := strconv.Atoi(string(body)) + if err != nil { + return 0, clientError(err) + } return version, nil } diff --git a/server/peer_server.go b/server/peer_server.go index e37471b11..2c9fc6321 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -485,31 +485,21 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) // Our version must match the leaders version version, err := s.client.GetVersion(u) if err != nil { - log.Debugf("fail checking join version") - return err + return fmt.Errorf("fail checking join version: %v", err) } if version < store.MinVersion() || version > store.MaxVersion() { - log.Infof("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version) - return fmt.Errorf("incompatible version") + return fmt.Errorf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version) } // Fetch current peer list machines, err := s.client.GetMachines(u) if err != nil { - log.Debugf("fail getting machine messages") - return err + return fmt.Errorf("fail getting machine messages: %v", err) } exist := false for _, machine := range machines { if machine.Name == server.Name() { exist = true - // TODO(yichengq): cannot set join index for it. - // Need discussion about the best way to do it. - // - // if machine.PeerURL == s.Config.URL { - // log.Infof("has joined the cluster(%v) before", machines) - // return nil - // } break } } @@ -517,12 +507,10 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) // Fetch cluster config to see whether exists some place. clusterConfig, err := s.client.GetClusterConfig(u) if err != nil { - log.Debugf("fail getting cluster config") - return err + return fmt.Errorf("fail getting cluster config: %v", err) } if !exist && clusterConfig.ActiveSize <= len(machines) { - log.Infof("stop joining because the cluster is full with %d nodes", len(machines)) - return fmt.Errorf("out of quota") + return fmt.Errorf("stop joining because the cluster is full with %d nodes", len(machines)) } joinIndex, err := s.client.AddMachine(u, @@ -534,8 +522,7 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) EtcdURL: s.server.URL(), }) if err != nil { - log.Debugf("fail on join request") - return err + return fmt.Errorf("fail on join request: %v", err) } s.joinIndex = joinIndex