diff --git a/etcd.go b/etcd.go index a224fa154..05720fb35 100644 --- a/etcd.go +++ b/etcd.go @@ -177,7 +177,7 @@ func main() { ps.MaxClusterSize = maxClusterSize ps.RetryTimes = retryTimes - s := server.New(info.Name, info.EtcdURL, info.EtcdListenHost, &etcdTLSConfig, &info.EtcdTLS, ps.Server, registry, store) + s := server.New(info.Name, info.EtcdURL, info.EtcdListenHost, &etcdTLSConfig, &info.EtcdTLS, ps, registry, store) if err := s.AllowOrigins(cors); err != nil { panic(err) } diff --git a/etcd_test.go b/etcd_test.go index 6ba6978ee..d868be896 100644 --- a/etcd_test.go +++ b/etcd_test.go @@ -41,7 +41,7 @@ func TestSingleNode(t *testing.T) { if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL < 95 { if err != nil { - t.Fatal(err) + t.Fatal("Set 1: ", err) } t.Fatalf("Set 1 failed with %s %s %v", result.Key, result.Value, result.TTL) @@ -53,7 +53,7 @@ func TestSingleNode(t *testing.T) { if err != nil || result.Key != "/foo" || result.Value != "bar" || result.PrevValue != "bar" || result.TTL != 100 { if err != nil { - t.Fatal(err) + t.Fatal("Set 2: ", err) } t.Fatalf("Set 2 failed with %s %s %v", result.Key, result.Value, result.TTL) } @@ -295,7 +295,7 @@ func TestMultiNodeKillAllAndRecovery(t *testing.T) { result, err := c.Set("foo", "bar", 0) if err != nil { - panic(err) + t.Fatalf("Recovery error: %s", err) } if result.Index != 18 { diff --git a/server/peer_server.go b/server/peer_server.go index ba698b553..d016717b1 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -116,7 +116,7 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) { } else { // Rejoin the previous cluster - cluster = s.registry.PeerURLs() + cluster = s.registry.PeerURLs(s.Leader(), s.name) for i := 0; i < len(cluster); i++ { u, err := url.Parse(cluster[i]) if err != nil { diff --git a/server/registry.go b/server/registry.go index 25ff936b7..8a62dff40 100644 --- a/server/registry.go +++ b/server/registry.go @@ -4,8 +4,10 @@ import ( "fmt" "net/url" "path" + "strings" "sync" + "github.com/coreos/etcd/log" "github.com/coreos/etcd/store" ) @@ -84,28 +86,34 @@ func (r *Registry) url(name string) (string, bool) { } // Retrieves the URLs for all nodes. -func (r *Registry) URLs() []string { +func (r *Registry) URLs(leaderName, selfName string) []string { r.Lock() defer r.Unlock() - // Retrieve a list of all nodes. - e, err := r.store.Get(RegistryKey, false, false, 0, 0) - if err != nil { - return make([]string, 0) + // Build list including the leader and self. + urls := make([]string, 0) + if url, _ := r.url(leaderName); len(url) > 0 { + urls = append(urls, url) + } + if url, _ := r.url(selfName); len(url) > 0 { + urls = append(urls, url) } - // Lookup the URL for each one. - urls := make([]string, 0) - for _, pair := range e.KVPairs { - if url, ok := r.url(pair.Key); ok { - urls = append(urls, url) + // Retrieve a list of all nodes. + if e, _ := r.store.Get(RegistryKey, false, false, 0, 0); e != nil { + // Lookup the URL for each one. + for _, pair := range e.KVPairs { + if url, _ := r.url(pair.Key); len(url) > 0 { + urls = append(urls, url) + } } } + log.Infof("URLs: %s / %s (%s", leaderName, selfName, strings.Join(urls, ",")) + return urls } - // Retrieves the peer URL for a given node by name. func (r *Registry) PeerURL(name string) (string, bool) { r.Lock() @@ -126,24 +134,31 @@ func (r *Registry) peerURL(name string) (string, bool) { } // Retrieves the peer URLs for all nodes. -func (r *Registry) PeerURLs() []string { +func (r *Registry) PeerURLs(leaderName, selfName string) []string { r.Lock() defer r.Unlock() - // Retrieve a list of all nodes. - e, err := r.store.Get(RegistryKey, false, false, 0, 0) - if err != nil { - return make([]string, 0) + // Build list including the leader and self. + urls := make([]string, 0) + if url, _ := r.peerURL(leaderName); len(url) > 0 { + urls = append(urls, url) + } + if url, _ := r.peerURL(selfName); len(url) > 0 { + urls = append(urls, url) } - // Lookup the URL for each one. - urls := make([]string, 0) - for _, pair := range e.KVPairs { - if url, ok := r.peerURL(pair.Key); ok { - urls = append(urls, url) + // Retrieve a list of all nodes. + if e, _ := r.store.Get(RegistryKey, false, false, 0, 0); e != nil { + // Lookup the URL for each one. + for _, pair := range e.KVPairs { + if url, _ := r.peerURL(pair.Key); len(url) > 0 { + urls = append(urls, url) + } } } + log.Infof("PeerURLs: %s / %s (%s", leaderName, selfName, strings.Join(urls, ",")) + return urls } diff --git a/server/server.go b/server/server.go index 586502378..f1adcbf86 100644 --- a/server/server.go +++ b/server/server.go @@ -18,9 +18,9 @@ import ( // This is the default implementation of the Server interface. type Server struct { http.Server - raftServer *raft.Server - registry *Registry - store *store.Store + peerServer *PeerServer + registry *Registry + store *store.Store name string url string tlsConf *TLSConfig @@ -29,7 +29,7 @@ type Server struct { } // Creates a new Server. -func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, raftServer *raft.Server, registry *Registry, store *store.Store) *Server { +func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, peerServer *PeerServer, registry *Registry, store *store.Store) *Server { s := &Server{ Server: http.Server{ Handler: mux.NewRouter(), @@ -41,7 +41,7 @@ func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsI url: urlStr, tlsConf: tlsConf, tlsInfo: tlsInfo, - raftServer: raftServer, + peerServer: peerServer, } // Install the routes for each version of the API. @@ -52,12 +52,12 @@ func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsI // The current Raft committed index. func (s *Server) CommitIndex() uint64 { - return s.raftServer.CommitIndex() + return s.peerServer.CommitIndex() } // The current Raft term. func (s *Server) Term() uint64 { - return s.raftServer.Term() + return s.peerServer.Term() } // The server URL. @@ -74,19 +74,21 @@ func (s *Server) installV1() { s.handleFuncV1("/v1/keys/{key:.*}", v1.GetKeyHandler).Methods("GET") s.handleFuncV1("/v1/keys/{key:.*}", v1.SetKeyHandler).Methods("POST", "PUT") s.handleFuncV1("/v1/keys/{key:.*}", v1.DeleteKeyHandler).Methods("DELETE") - s.handleFuncV1("/v1/watch/{key:.*}", v1.WatchKeyHandler).Methods("GET", "POST") + s.handleFunc("/v1/leader", s.GetLeaderHandler).Methods("GET") + s.handleFunc("/v1/machines", s.GetMachinesHandler).Methods("GET") + s.handleFunc("/v1/stats", s.GetStatsHandler).Methods("GET") } // Adds a v1 server handler to the router. func (s *Server) handleFuncV1(path string, f func(http.ResponseWriter, *http.Request, v1.Server) error) *mux.Route { - return s.handleFunc(path, func(w http.ResponseWriter, req *http.Request, s *Server) error { + return s.handleFunc(path, func(w http.ResponseWriter, req *http.Request) error { return f(w, req, s) }) } // Adds a server handler to the router. -func (s *Server) handleFunc(path string, f func(http.ResponseWriter, *http.Request, *Server) error) *mux.Route { +func (s *Server) handleFunc(path string, f func(http.ResponseWriter, *http.Request) error) *mux.Route { r := s.Handler.(*mux.Router) // Wrap the standard HandleFunc interface to pass in the server reference. @@ -102,7 +104,7 @@ func (s *Server) handleFunc(path string, f func(http.ResponseWriter, *http.Reque } // Execute handler function and return error if necessary. - if err := f(w, req, s); err != nil { + if err := f(w, req); err != nil { if etcdErr, ok := err.(*etcdErr.Error); ok { log.Debug("Return error: ", (*etcdErr).Error()) etcdErr.Write(w) @@ -125,8 +127,8 @@ func (s *Server) ListenAndServe() { } func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error { - if s.raftServer.State() == raft.Leader { - event, err := s.raftServer.Do(c) + if s.peerServer.State() == raft.Leader { + event, err := s.peerServer.Do(c) if err != nil { return err } @@ -143,7 +145,7 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque return nil } else { - leader := s.raftServer.Leader() + leader := s.peerServer.Leader() // No leader available. if leader == "" { @@ -178,3 +180,51 @@ func (s *Server) AllowOrigins(origins string) error { func (s *Server) OriginAllowed(origin string) bool { return s.corsOrigins["*"] || s.corsOrigins[origin] } + +// Handler to return the current leader's raft address +func (s *Server) GetLeaderHandler(w http.ResponseWriter, req *http.Request) error { + leader := s.peerServer.Leader() + if leader == "" { + return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", store.UndefIndex, store.UndefTerm) + } + w.WriteHeader(http.StatusOK) + url, _ := s.registry.PeerURL(leader) + w.Write([]byte(url)) + return nil +} + +// Handler to return all the known machines in the current cluster. +func (s *Server) GetMachinesHandler(w http.ResponseWriter, req *http.Request) error { + machines := s.registry.URLs(s.peerServer.Leader(), s.name) + w.WriteHeader(http.StatusOK) + w.Write([]byte(strings.Join(machines, ", "))) + return nil +} + +// Retrieves stats on the Raft server. +func (s *Server) GetStatsHandler(w http.ResponseWriter, req *http.Request) error { + w.Write(s.peerServer.Stats()) + return nil +} + +// Retrieves stats on the leader. +func (s *Server) GetLeaderStatsHandler(w http.ResponseWriter, req *http.Request) error { + if s.peerServer.State() == raft.Leader { + w.Write(s.peerServer.PeerStats()) + return nil + } + + leader := s.peerServer.Leader() + if leader == "" { + return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm) + } + hostname, _ := s.registry.URL(leader) + redirect(hostname, w, req) + return nil +} + +// Retrieves stats on the leader. +func (s *Server) GetStoreStatsHandler(w http.ResponseWriter, req *http.Request) error { + w.Write(s.store.JsonStats()) + return nil +} diff --git a/server/v2/handlers.go b/server/v2/handlers.go index 7fbd7e1d7..d49542005 100644 --- a/server/v2/handlers.go +++ b/server/v2/handlers.go @@ -202,33 +202,6 @@ func (e *etcdServer) dispatchEtcdCommand(c Command, w http.ResponseWriter, req * // still dispatch to the leader //-------------------------------------- -// Handler to return the current leader's raft address -func (e *etcdServer) LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error { - r := e.raftServer - - leader := r.Leader() - - if leader != "" { - w.WriteHeader(http.StatusOK) - raftURL, _ := nameToRaftURL(leader) - w.Write([]byte(raftURL)) - - return nil - } else { - return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", store.UndefIndex, store.UndefTerm) - } -} - -// Handler to return all the known machines in the current cluster -func (e *etcdServer) MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error { - machines := e.raftServer.getMachines(nameToEtcdURL) - - w.WriteHeader(http.StatusOK) - w.Write([]byte(strings.Join(machines, ", "))) - - return nil -} - // Handler to return the current version of etcd func (e *etcdServer) VersionHttpHandler(w http.ResponseWriter, req *http.Request) error { w.WriteHeader(http.StatusOK) @@ -237,35 +210,6 @@ func (e *etcdServer) VersionHttpHandler(w http.ResponseWriter, req *http.Request return nil } -// Handler to return the basic stats of etcd -func (e *etcdServer) StatsHttpHandler(w http.ResponseWriter, req *http.Request) error { - option := req.URL.Path[len("/v1/stats/"):] - w.WriteHeader(http.StatusOK) - - r := e.raftServer - - switch option { - case "self": - w.Write(r.Stats()) - case "leader": - if r.State() == raft.Leader { - w.Write(r.PeerStats()) - } else { - leader := r.Leader() - // current no leader - if leader == "" { - return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm) - } - hostname, _ := nameToEtcdURL(leader) - redirect(hostname, w, req) - } - case "store": - w.Write(etcdStore.JsonStats()) - } - - return nil -} - func (e *etcdServer) GetHttpHandler(w http.ResponseWriter, req *http.Request) error { var err error var event interface{} diff --git a/third_party/github.com/coreos/go-etcd/etcd/client.go b/third_party/github.com/coreos/go-etcd/etcd/client.go index abc4574fa..cbbd7ad9c 100644 --- a/third_party/github.com/coreos/go-etcd/etcd/client.go +++ b/third_party/github.com/coreos/go-etcd/etcd/client.go @@ -136,7 +136,6 @@ func (c *Client) internalSyncCluster(machines []string) bool { logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, c.cluster.Machines[0]) c.cluster.Leader = c.cluster.Machines[0] - logger.Debug("sync.machines ", c.cluster.Machines) return true } }