From 5af8fe9a8478e3a957ab01be12800962aea51def Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Fri, 11 Jul 2014 16:36:41 -0700 Subject: [PATCH] server: use /v2/admin/machines/ http endpoint to join --- etcd/etcd.go | 35 +++++---- etcd/etcd_test.go | 18 +++-- etcd/v2_client.go | 177 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 212 insertions(+), 18 deletions(-) create mode 100644 etcd/v2_client.go diff --git a/etcd/etcd.go b/etcd/etcd.go index e560860ff..02a35c190 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -57,6 +57,7 @@ type Server struct { addNodeC chan raft.Config removeNodeC chan raft.Config t *transporter + client *v2client store.Store @@ -96,6 +97,7 @@ func New(c *config.Config, id int64) *Server { addNodeC: make(chan raft.Config), removeNodeC: make(chan raft.Config), t: newTransporter(tc), + client: newClient(tc), Store: store.New(), @@ -159,24 +161,29 @@ func (s *Server) Bootstrap() { func (s *Server) Join() { log.Println("joining cluster via peers", s.config.Peers) - d, err := json.Marshal(&raft.Config{s.id, s.raftPubAddr, []byte(s.pubAddr)}) - if err != nil { - panic(err) + info := &context{ + MinVersion: store.MinVersion(), + MaxVersion: store.MaxVersion(), + ClientURL: s.pubAddr, + PeerURL: s.raftPubAddr, } - b, err := json.Marshal(&raft.Message{From: s.id, Type: 2, Entries: []raft.Entry{{Type: 1, Data: d}}}) - if err != nil { - panic(err) - } - - for seed := range s.nodes { - if err := s.t.send(seed+raftPrefix, b); err != nil { - log.Println(err) - continue + succeed := false + for i := 0; i < 5; i++ { + for seed := range s.nodes { + if err := s.client.AddMachine(seed, fmt.Sprint(s.id), info); err == nil { + succeed = true + break + } else { + log.Println(err) + } } - // todo(xiangli) WAIT for join to be committed or retry... - break + if succeed { + break + } + time.Sleep(100 * time.Millisecond) } + s.run() } diff --git a/etcd/etcd_test.go b/etcd/etcd_test.go index 307047d99..b650f7f9c 100644 --- a/etcd/etcd_test.go +++ b/etcd/etcd_test.go @@ -259,15 +259,25 @@ func initTestServer(c *config.Config, id int64, tls bool) (e *Server, h *httptes func waitCluster(t *testing.T, es []*Server) { n := len(es) for i, e := range es { - for k := 1; k < n+1; k++ { - w, err := e.Watch(v2machineKVPrefix, true, false, uint64(k)) + var index uint64 + for k := 0; k < n; k++ { + index++ + w, err := e.Watch(v2machineKVPrefix, true, false, index) if err != nil { panic(err) } v := <-w.EventChan - ww := fmt.Sprintf("%s/%d", v2machineKVPrefix, k-1) + // join command may appear several times due to retry + // when timeout + if k > 0 { + pw := fmt.Sprintf("%s/%d", v2machineKVPrefix, k-1) + if v.Node.Key == pw { + continue + } + } + ww := fmt.Sprintf("%s/%d", v2machineKVPrefix, k) if v.Node.Key != ww { - t.Errorf("#%d path = %v, want %v", i, v.Node.Key, w) + t.Errorf("#%d path = %v, want %v", i, v.Node.Key, ww) } } } diff --git a/etcd/v2_client.go b/etcd/v2_client.go new file mode 100644 index 000000000..2fb006d62 --- /dev/null +++ b/etcd/v2_client.go @@ -0,0 +1,177 @@ +package etcd + +import ( + "bytes" + "crypto/tls" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "log" + "net/http" + "strconv" + + "github.com/coreos/etcd/config" + etcdErr "github.com/coreos/etcd/error" +) + +// v2client sends various requests using HTTP API. +// It is different from raft communication, and doesn't record anything in the log. +// The argument url is required to contain scheme and host only, and +// there is no trailing slash in it. +// Public functions return "etcd/error".Error intentionally to figure out +// etcd error code easily. +type v2client struct { + http.Client +} + +func newClient(tc *tls.Config) *v2client { + tr := new(http.Transport) + tr.TLSClientConfig = tc + return &v2client{http.Client{Transport: tr}} +} + +// CheckVersion returns true when the version check on the server returns 200. +func (c *v2client) CheckVersion(url string, version int) (bool, *etcdErr.Error) { + resp, err := c.Get(url + fmt.Sprintf("/version/%d/check", version)) + if err != nil { + return false, clientError(err) + } + + defer resp.Body.Close() + + return resp.StatusCode == 200, nil +} + +// GetVersion fetches the peer version of a cluster. +func (c *v2client) GetVersion(url string) (int, *etcdErr.Error) { + resp, err := c.Get(url + "/version") + if err != nil { + return 0, clientError(err) + } + + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return 0, clientError(err) + } + + // Parse version number. + version, err := strconv.Atoi(string(body)) + if err != nil { + return 0, clientError(err) + } + return version, nil +} + +func (c *v2client) GetMachines(url string) ([]*machineMessage, *etcdErr.Error) { + resp, err := c.Get(url + "/v2/admin/machines/") + if err != nil { + return nil, clientError(err) + } + + msgs := new([]*machineMessage) + if uerr := c.parseJSONResponse(resp, msgs); uerr != nil { + return nil, uerr + } + return *msgs, nil +} + +func (c *v2client) GetClusterConfig(url string) (*config.ClusterConfig, *etcdErr.Error) { + resp, err := c.Get(url + "/v2/admin/config") + if err != nil { + return nil, clientError(err) + } + + config := new(config.ClusterConfig) + if uerr := c.parseJSONResponse(resp, config); uerr != nil { + return nil, uerr + } + return config, nil +} + +// AddMachine adds machine to the cluster. +// The first return value is the commit index of join command. +func (c *v2client) AddMachine(url string, name string, info *context) *etcdErr.Error { + b, _ := json.Marshal(info) + url = url + "/v2/admin/machines/" + name + + log.Printf("Send Join Request to %s", url) + resp, err := c.put(url, b) + if err != nil { + return clientError(err) + } + defer resp.Body.Close() + + if err := c.checkErrorResponse(resp); err != nil { + return err + } + return nil +} + +func (c *v2client) parseJSONResponse(resp *http.Response, val interface{}) *etcdErr.Error { + defer resp.Body.Close() + + if err := c.checkErrorResponse(resp); err != nil { + return err + } + if err := json.NewDecoder(resp.Body).Decode(val); err != nil { + log.Printf("Error parsing join response: %v", err) + return clientError(err) + } + return nil +} + +func (c *v2client) checkErrorResponse(resp *http.Response) *etcdErr.Error { + if resp.StatusCode != http.StatusOK { + uerr := &etcdErr.Error{} + if err := json.NewDecoder(resp.Body).Decode(uerr); err != nil { + log.Printf("Error parsing response to etcd error: %v", err) + return clientError(err) + } + return uerr + } + return nil +} + +// put sends server side PUT request. +// It always follows redirects instead of stopping according to RFC 2616. +func (c *v2client) put(urlStr string, body []byte) (*http.Response, error) { + return c.doAlwaysFollowingRedirects("PUT", urlStr, body) +} + +func (c *v2client) doAlwaysFollowingRedirects(method string, urlStr string, body []byte) (resp *http.Response, err error) { + var req *http.Request + + for redirect := 0; redirect < 10; redirect++ { + req, err = http.NewRequest(method, urlStr, bytes.NewBuffer(body)) + if err != nil { + return + } + + if resp, err = c.Do(req); err != nil { + if resp != nil { + resp.Body.Close() + } + return + } + + if resp.StatusCode == http.StatusMovedPermanently || resp.StatusCode == http.StatusTemporaryRedirect { + resp.Body.Close() + if urlStr = resp.Header.Get("Location"); urlStr == "" { + err = errors.New(fmt.Sprintf("%d response missing Location header", resp.StatusCode)) + return + } + continue + } + return + } + + err = errors.New("stopped after 10 redirects") + return +} + +func clientError(err error) *etcdErr.Error { + return etcdErr.NewError(etcdErr.EcodeClientInternal, err.Error(), 0) +}