diff --git a/etcd.go b/etcd.go index 82bc09649..d307bf306 100644 --- a/etcd.go +++ b/etcd.go @@ -185,12 +185,15 @@ func main() { // Create peer server. ps := NewPeerServer(info.Name, dirPath, info.RaftURL, info.RaftListenHost, &raftTLSConfig, &info.RaftTLS, registry) ps.MaxClusterSize = maxClusterSize + ps.RetryTimes = retryTimes s := server.New(info.Name, info.EtcdURL, info.EtcdListenHost, &etcdTLSConfig, &info.EtcdTLS, r) if err := e.AllowOrigins(cors); err != nil { panic(err) } + ps.SetServer(server) + ps.ListenAndServe(snapshot) s.ListenAndServe() } diff --git a/server/join_command.go b/server/join_command.go index f338f3d39..71caaa0ce 100644 --- a/server/join_command.go +++ b/server/join_command.go @@ -19,16 +19,14 @@ type JoinCommand struct { Name string `json:"name"` RaftURL string `json:"raftURL"` EtcdURL string `json:"etcdURL"` - MaxClusterSize int `json:"maxClusterSize"` } -func NewJoinCommand(version, name, raftUrl, etcdUrl string, maxClusterSize int) *JoinCommand { +func NewJoinCommand(version, name, raftUrl, etcdUrl string) *JoinCommand { return &JoinCommand{ RaftVersion: version, Name: name, RaftURL: raftUrl, EtcdURL: etcdUrl, - MaxClusterSize: maxClusterSize, } } @@ -51,7 +49,7 @@ func (c *JoinCommand) Apply(server *raft.Server) (interface{}, error) { } // Check machine number in the cluster - if ps.registry.Count() == c.MaxClusterSize { + if ps.registry.Count() == ps.MaxClusterSize { log.Debug("Reject join request from ", c.Name) return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMoreMachine, "", server.CommitIndex(), server.Term()) } diff --git a/server/peer_server.go b/server/peer_server.go index 0c24c1521..de9316a4b 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -19,6 +19,7 @@ import ( type PeerServer struct { *raft.Server + server Server joinIndex uint64 name string url string @@ -31,6 +32,7 @@ type PeerServer struct { store *store.Store snapConf *snapshotConf MaxClusterSize int + RetryTimes int } // TODO: find a good policy to do snapshot @@ -140,6 +142,16 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) { } +// Retrieves the underlying Raft server. +func (s *PeerServer) RaftServer() *raft.Server { + return s.Server +} + +// Associates the client server with the peer server. +func (s *PeerServer) SetServer(server Server) { + s.server = server +} + // Get all the current logs func (s *PeerServer) GetLogHttpHandler(w http.ResponseWriter, req *http.Request) { log.Debugf("[recv] GET %s/log", s.url) @@ -223,7 +235,7 @@ func (s *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *htt func (s *PeerServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) { log.Debugf("[recv] Get %s/etcdURL/ ", s.url) w.WriteHeader(http.StatusOK) - w.Write([]byte(argInfo.EtcdURL)) + w.Write([]byte(s.server.URL())) } // Response to the join request @@ -271,13 +283,13 @@ func (s *PeerServer) RaftVersionHttpHandler(w http.ResponseWriter, req *http.Req } func (s *PeerServer) dispatchRaftCommand(c raft.Command, w http.ResponseWriter, req *http.Request) error { - return s.dispatch(c, w, req, nameToRaftURL) + return s.dispatch(c, w, req) } func (s *PeerServer) startAsLeader() { // leader need to join self as a peer for { - _, err := s.Do(newJoinCommand(PeerVersion, s.Name(), s.url, e.url)) + _, err := s.Do(NewJoinCommand(PeerVersion, s.Name(), s.url, s.server.URL())) if err == nil { break } @@ -287,7 +299,7 @@ func (s *PeerServer) startAsLeader() { func (s *PeerServer) startAsFollower(cluster []string) { // start as a follower in a existing cluster - for i := 0; i < retryTimes; i++ { + for i := 0; i < s.RetryTimes; i++ { ok := s.joinCluster(cluster) if ok { return @@ -296,12 +308,12 @@ func (s *PeerServer) startAsFollower(cluster []string) { time.Sleep(time.Second * RetryInterval) } - fatalf("Cannot join the cluster via given machines after %x retries", retryTimes) + log.Fatalf("Cannot join the cluster via given machines after %x retries", s.RetryTimes) } // Start to listen and response raft command func (s *PeerServer) startTransport(scheme string, tlsConf tls.Config) { - infof("raft server [name %s, listen on %s, advertised url %s]", s.name, s.listenHost, s.url) + log.Infof("raft server [name %s, listen on %s, advertised url %s]", s.name, s.listenHost, s.url) raftMux := http.NewServeMux() @@ -324,9 +336,9 @@ func (s *PeerServer) startTransport(scheme string, tlsConf tls.Config) { raftMux.HandleFunc("/etcdURL", s.EtcdURLHttpHandler) if scheme == "http" { - fatal(server.ListenAndServe()) + log.Fatal(server.ListenAndServe()) } else { - fatal(server.ListenAndServeTLS(s.tlsInfo.CertFile, s.tlsInfo.KeyFile)) + log.Fatal(server.ListenAndServeTLS(s.tlsInfo.CertFile, s.tlsInfo.KeyFile)) } } @@ -336,11 +348,9 @@ func (s *PeerServer) startTransport(scheme string, tlsConf tls.Config) { // version clusters. func getVersion(t *transporter, versionURL url.URL) (string, error) { resp, req, err := t.Get(versionURL.String()) - if err != nil { return "", err } - defer resp.Body.Close() t.CancelWhenTimeout(req) @@ -363,7 +373,7 @@ func (s *PeerServer) joinCluster(cluster []string) bool { } else { if _, ok := err.(etcdErr.Error); ok { - fatal(err) + log.Fatal(err) } log.Debugf("cannot join to cluster via machine %s %s", machine, err) @@ -392,7 +402,7 @@ func (s *PeerServer) joinByMachine(server *raft.Server, machine string, scheme s return fmt.Errorf("Unable to join: internal version mismatch, entire cluster must be running identical versions of etcd") } - json.NewEncoder(&b).Encode(newJoinCommand(PeerVersion, server.Name(), server.url, e.url)) + json.NewEncoder(&b).Encode(NewJoinCommand(PeerVersion, server.Name(), s.url, s.server.URL())) joinURL := url.URL{Host: machine, Scheme: scheme, Path: "/join"} @@ -419,7 +429,7 @@ func (s *PeerServer) joinByMachine(server *raft.Server, machine string, scheme s address := resp.Header.Get("Location") log.Debugf("Send Join Request to %s", address) - json.NewEncoder(&b).Encode(newJoinCommand(PeerVersion, server.Name(), server.url, e.url)) + json.NewEncoder(&b).Encode(newJoinCommand(PeerVersion, server.Name(), s.url, s.server.URL())) resp, req, err = t.Post(address, &b) @@ -472,3 +482,81 @@ func (s *PeerServer) monitorSnapshot() { } } } + +func (s *PeerServer) dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error { + if r.State() == raft.Leader { + if response, err := r.Do(c); err != nil { + return err + } else { + if response == nil { + return etcdErr.NewError(300, "Empty response from raft", store.UndefIndex, store.UndefTerm) + } + + event, ok := response.(*store.Event) + if ok { + bytes, err := json.Marshal(event) + if err != nil { + fmt.Println(err) + } + + w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index)) + w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term)) + w.WriteHeader(http.StatusOK) + w.Write(bytes) + + return nil + } + + bytes, _ := response.([]byte) + w.WriteHeader(http.StatusOK) + w.Write(bytes) + + return nil + } + + } else { + leader := r.Leader() + // current no leader + if leader == "" { + return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm) + } + url, _ := s.registry.PeerURL(leader) + + redirect(url, w, req) + + return nil + } +} + + +type errorHandler func(http.ResponseWriter, *http.Request) error + +// addCorsHeader parses the request Origin header and loops through the user +// provided allowed origins and sets the Access-Control-Allow-Origin header if +// there is a match. +func addCorsHeader(w http.ResponseWriter, r *http.Request) { + val, ok := corsList["*"] + if val && ok { + w.Header().Add("Access-Control-Allow-Origin", "*") + return + } + + requestOrigin := r.Header.Get("Origin") + val, ok = corsList[requestOrigin] + if val && ok { + w.Header().Add("Access-Control-Allow-Origin", requestOrigin) + return + } +} + +func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + addCorsHeader(w, r) + if e := fn(w, r); e != nil { + if etcdErr, ok := e.(*etcdErr.Error); ok { + debug("Return error: ", (*etcdErr).Error()) + etcdErr.Write(w) + } else { + http.Error(w, e.Error(), http.StatusInternalServerError) + } + } +} diff --git a/server/registry.go b/server/registry.go index cd7078d81..468b79179 100644 --- a/server/registry.go +++ b/server/registry.go @@ -159,7 +159,7 @@ func (r *Registry) load(name string) { } // Create node. - r.nodes[name] := &node{ + r.nodes[name] = &node{ url: m["etcd"][0], peerURL: m["raft"][0], peerVersion: m["raftVersion"][0], diff --git a/server/remove_command.go b/server/remove_command.go index 43ff17d24..5e5feab5c 100644 --- a/server/remove_command.go +++ b/server/remove_command.go @@ -2,7 +2,6 @@ package server import ( "encoding/binary" - "path" "github.com/coreos/etcd/store" "github.com/coreos/go-raft" diff --git a/server/server.go b/server/server.go index 5835e1a12..f283f627d 100644 --- a/server/server.go +++ b/server/server.go @@ -12,6 +12,7 @@ import ( type Server interface { CommitIndex() uint64 Term() uint64 + URL() string Dispatch(raft.Command, http.ResponseWriter, *http.Request) } @@ -49,12 +50,17 @@ func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsI // The current Raft committed index. func (s *server) CommitIndex() uint64 { - return c.raftServer.CommitIndex() + return s.raftServer.CommitIndex() } // The current Raft term. func (s *server) Term() uint64 { - return c.raftServer.Term() + return s.raftServer.Term() +} + +// The server URL. +func (s *server) URL() string { + return s.url } func (s *server) installV1() { diff --git a/server/util.go b/server/util.go index bae347cd3..0154e22bd 100644 --- a/server/util.go +++ b/server/util.go @@ -15,3 +15,10 @@ func decodeJsonRequest(req *http.Request, data interface{}) error { return nil } +func redirect(hostname string, w http.ResponseWriter, req *http.Request) { + path := req.URL.Path + url := hostname + path + debugf("Redirect to %s", url) + http.Redirect(w, req, url, http.StatusTemporaryRedirect) +} + diff --git a/util.go b/util.go index e9b534085..c97d7f77d 100644 --- a/util.go +++ b/util.go @@ -41,57 +41,6 @@ func durationToExpireTime(strDuration string) (time.Time, error) { // HTTP Utilities //-------------------------------------- -func (r *raftServer) dispatch(c Command, w http.ResponseWriter, req *http.Request, toURL func(name string) (string, bool)) error { - if r.State() == raft.Leader { - if response, err := r.Do(c); err != nil { - return err - } else { - if response == nil { - return etcdErr.NewError(300, "Empty response from raft", store.UndefIndex, store.UndefTerm) - } - - event, ok := response.(*store.Event) - if ok { - bytes, err := json.Marshal(event) - if err != nil { - fmt.Println(err) - } - - w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index)) - w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term)) - w.WriteHeader(http.StatusOK) - w.Write(bytes) - - return nil - } - - bytes, _ := response.([]byte) - w.WriteHeader(http.StatusOK) - w.Write(bytes) - - return nil - } - - } else { - leader := r.Leader() - // current no leader - if leader == "" { - return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm) - } - url, _ := toURL(leader) - - redirect(url, w, req) - - return nil - } -} - -func redirect(hostname string, w http.ResponseWriter, req *http.Request) { - path := req.URL.Path - url := hostname + path - debugf("Redirect to %s", url) - http.Redirect(w, req, url, http.StatusTemporaryRedirect) -} // sanitizeURL will cleanup a host string in the format hostname:port and // attach a schema.