diff --git a/config.go b/config.go index 9580bcaf3..d1549c7be 100644 --- a/config.go +++ b/config.go @@ -8,6 +8,9 @@ import ( "io/ioutil" "os" "path/filepath" + + "github.com/coreos/etcd/log" + "github.com/coreos/etcd/server" ) //-------------------------------------- @@ -30,7 +33,7 @@ func getInfo(path string) *Info { os.Remove(confPath) os.RemoveAll(snapshotPath) } else if info := readInfo(infoPath); info != nil { - infof("Found node configuration in '%s'. Ignoring flags", infoPath) + log.Infof("Found node configuration in '%s'. Ignoring flags", infoPath) return info } @@ -41,10 +44,10 @@ func getInfo(path string) *Info { content, _ := json.MarshalIndent(info, "", " ") content = []byte(string(content) + "\n") if err := ioutil.WriteFile(infoPath, content, 0644); err != nil { - fatalf("Unable to write info to file: %v", err) + log.Fatalf("Unable to write info to file: %v", err) } - infof("Wrote node configuration to '%s'", infoPath) + log.Infof("Wrote node configuration to '%s'", infoPath) return info } @@ -57,7 +60,7 @@ func readInfo(path string) *Info { if os.IsNotExist(err) { return nil } - fatal(err) + log.Fatal(err) } defer file.Close() @@ -65,19 +68,19 @@ func readInfo(path string) *Info { content, err := ioutil.ReadAll(file) if err != nil { - fatalf("Unable to read info: %v", err) + log.Fatalf("Unable to read info: %v", err) return nil } if err = json.Unmarshal(content, &info); err != nil { - fatalf("Unable to parse info: %v", err) + log.Fatalf("Unable to parse info: %v", err) return nil } return info } -func tlsConfigFromInfo(info TLSInfo) (t TLSConfig, ok bool) { +func tlsConfigFromInfo(info server.TLSInfo) (t server.TLSConfig, ok bool) { var keyFile, certFile, CAFile string var tlsCert tls.Certificate var err error @@ -101,7 +104,7 @@ func tlsConfigFromInfo(info TLSInfo) (t TLSConfig, ok bool) { tlsCert, err = tls.LoadX509KeyPair(certFile, keyFile) if err != nil { - fatal(err) + log.Fatal(err) } t.Scheme = "https" diff --git a/etcd.go b/etcd.go index d307bf306..a224fa154 100644 --- a/etcd.go +++ b/etcd.go @@ -1,12 +1,10 @@ package main import ( - "crypto/tls" "flag" "io/ioutil" "os" "strings" - "time" "github.com/coreos/etcd/log" "github.com/coreos/etcd/server" @@ -101,18 +99,10 @@ type Info struct { RaftListenHost string `json:"raftListenHost"` EtcdListenHost string `json:"etcdListenHost"` - RaftTLS TLSInfo `json:"raftTLS"` - EtcdTLS TLSInfo `json:"etcdTLS"` + RaftTLS server.TLSInfo `json:"raftTLS"` + EtcdTLS server.TLSInfo `json:"etcdTLS"` } -//------------------------------------------------------------------------------ -// -// Variables -// -//------------------------------------------------------------------------------ - -var etcdStore *store.Store - //------------------------------------------------------------------------------ // // Functions @@ -131,7 +121,7 @@ func main() { } if veryVerbose { - verbose = true + log.Verbose = true raft.SetLogLevel(raft.Debug) } @@ -140,7 +130,7 @@ func main() { } else if machinesFile != "" { b, err := ioutil.ReadFile(machinesFile) if err != nil { - fatalf("Unable to read the given machines file: %s", err) + log.Fatalf("Unable to read the given machines file: %s", err) } cluster = strings.Split(string(b), ",") } @@ -148,17 +138,17 @@ func main() { // Check TLS arguments raftTLSConfig, ok := tlsConfigFromInfo(argInfo.RaftTLS) if !ok { - fatal("Please specify cert and key file or cert and key file and CAFile or none of the three") + log.Fatal("Please specify cert and key file or cert and key file and CAFile or none of the three") } etcdTLSConfig, ok := tlsConfigFromInfo(argInfo.EtcdTLS) if !ok { - fatal("Please specify cert and key file or cert and key file and CAFile or none of the three") + log.Fatal("Please specify cert and key file or cert and key file and CAFile or none of the three") } argInfo.Name = strings.TrimSpace(argInfo.Name) if argInfo.Name == "" { - fatal("ERROR: server name required. e.g. '-n=server_name'") + log.Fatal("ERROR: server name required. e.g. '-n=server_name'") } // Check host name arguments @@ -171,29 +161,29 @@ func main() { // Read server info from file or grab it from user. if err := os.MkdirAll(dirPath, 0744); err != nil { - fatalf("Unable to create path: %s", err) + log.Fatalf("Unable to create path: %s", err) } info := getInfo(dirPath) // Create etcd key-value store - etcdStore = store.New() + store := store.New() // Create a shared node registry. - registry := server.NewRegistry() + registry := server.NewRegistry(store) // Create peer server. - ps := NewPeerServer(info.Name, dirPath, info.RaftURL, info.RaftListenHost, &raftTLSConfig, &info.RaftTLS, registry) + ps := server.NewPeerServer(info.Name, dirPath, info.RaftURL, info.RaftListenHost, &raftTLSConfig, &info.RaftTLS, registry, store) ps.MaxClusterSize = maxClusterSize ps.RetryTimes = retryTimes - s := server.New(info.Name, info.EtcdURL, info.EtcdListenHost, &etcdTLSConfig, &info.EtcdTLS, r) - if err := e.AllowOrigins(cors); err != nil { + s := server.New(info.Name, info.EtcdURL, info.EtcdListenHost, &etcdTLSConfig, &info.EtcdTLS, ps.Server, registry, store) + if err := s.AllowOrigins(cors); err != nil { panic(err) } - ps.SetServer(server) + ps.SetServer(s) - ps.ListenAndServe(snapshot) + ps.ListenAndServe(snapshot, cluster) s.ListenAndServe() } diff --git a/etcd_test.go b/etcd_test.go index a62119e35..71222a246 100644 --- a/etcd_test.go +++ b/etcd_test.go @@ -12,6 +12,7 @@ import ( "testing" "time" + "github.com/coreos/etcd/server" "github.com/coreos/etcd/test" "github.com/coreos/go-etcd/etcd" ) @@ -398,8 +399,8 @@ func TestKillLeader(t *testing.T) { totalTime += take avgTime := totalTime / (time.Duration)(i+1) - fmt.Println("Leader election time is ", take, "with election timeout", ElectionTimeout) - fmt.Println("Leader election time average is", avgTime, "with election timeout", ElectionTimeout) + fmt.Println("Leader election time is ", take, "with election timeout", server.ElectionTimeout) + fmt.Println("Leader election time average is", avgTime, "with election timeout", server.ElectionTimeout) etcds[num], err = os.StartProcess("etcd", argGroup[num], procAttr) } stop <- true @@ -456,7 +457,7 @@ func TestKillRandom(t *testing.T) { etcds[num].Wait() } - time.Sleep(ElectionTimeout) + time.Sleep(server.ElectionTimeout) <-leaderChan diff --git a/machines.go b/machines.go deleted file mode 100644 index 1da25ed75..000000000 --- a/machines.go +++ /dev/null @@ -1,34 +0,0 @@ -package main - -// getMachines gets the current machines in the cluster -func (r *raftServer) getMachines(toURL func(string) (string, bool)) []string { - peers := r.Peers() - - machines := make([]string, len(peers)+1) - - leader, ok := toURL(r.Leader()) - self, _ := toURL(r.Name()) - i := 1 - - if ok { - machines[0] = leader - if leader != self { - machines[1] = self - i = 2 - } - } else { - machines[0] = self - } - - // Add all peers to the slice - for peerName, _ := range peers { - if machine, ok := toURL(peerName); ok { - // do not add leader twice - if machine != leader { - machines[i] = machine - i++ - } - } - } - return machines -} diff --git a/scripts/release-version b/scripts/release-version index b1f68e4f7..c1fdaba07 100755 --- a/scripts/release-version +++ b/scripts/release-version @@ -3,6 +3,6 @@ VER=$(git describe --tags HEAD) cat < s.snapConf.writesThr { - r.TakeSnapshot() + s.TakeSnapshot() s.snapConf.lastWrites = 0 } } } func (s *PeerServer) dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error { - if r.State() == raft.Leader { - if response, err := r.Do(c); err != nil { + if s.State() == raft.Leader { + if response, err := s.Do(c); err != nil { return err } else { if response == nil { @@ -515,7 +533,7 @@ func (s *PeerServer) dispatch(c raft.Command, w http.ResponseWriter, req *http.R } } else { - leader := r.Leader() + leader := s.Leader() // current no leader if leader == "" { return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm) @@ -528,35 +546,3 @@ func (s *PeerServer) dispatch(c raft.Command, w http.ResponseWriter, req *http.R } } - -type errorHandler func(http.ResponseWriter, *http.Request) error - -// addCorsHeader parses the request Origin header and loops through the user -// provided allowed origins and sets the Access-Control-Allow-Origin header if -// there is a match. -func addCorsHeader(w http.ResponseWriter, r *http.Request) { - val, ok := corsList["*"] - if val && ok { - w.Header().Add("Access-Control-Allow-Origin", "*") - return - } - - requestOrigin := r.Header.Get("Origin") - val, ok = corsList[requestOrigin] - if val && ok { - w.Header().Add("Access-Control-Allow-Origin", requestOrigin) - return - } -} - -func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - addCorsHeader(w, r) - if e := fn(w, r); e != nil { - if etcdErr, ok := e.(*etcdErr.Error); ok { - debug("Return error: ", (*etcdErr).Error()) - etcdErr.Write(w) - } else { - http.Error(w, e.Error(), http.StatusInternalServerError) - } - } -} diff --git a/server/registry.go b/server/registry.go index 468b79179..25ff936b7 100644 --- a/server/registry.go +++ b/server/registry.go @@ -1,6 +1,9 @@ package server import ( + "fmt" + "net/url" + "path" "sync" "github.com/coreos/etcd/store" @@ -48,13 +51,13 @@ func (r *Registry) Unregister(name string, commitIndex uint64, term uint64) erro defer r.Unlock() // Remove the key from the store. - _, err := s.Delete(path.Join(RegistryKey, name), false, commitIndex, term) + _, err := r.store.Delete(path.Join(RegistryKey, name), false, commitIndex, term) return err } // Returns the number of nodes in the cluster. func (r *Registry) Count() int { - e, err := s.Get(RegistryKey, false, false, 0, 0) + e, err := r.store.Get(RegistryKey, false, false, 0, 0) if err != nil { return 0 } @@ -86,7 +89,7 @@ func (r *Registry) URLs() []string { defer r.Unlock() // Retrieve a list of all nodes. - e, err := s.Get(RegistryKey, false, false, 0, 0) + e, err := r.store.Get(RegistryKey, false, false, 0, 0) if err != nil { return make([]string, 0) } @@ -94,7 +97,9 @@ func (r *Registry) URLs() []string { // Lookup the URL for each one. urls := make([]string, 0) for _, pair := range e.KVPairs { - urls = append(urls, r.url(pair.Key)) + if url, ok := r.url(pair.Key); ok { + urls = append(urls, url) + } } return urls @@ -126,7 +131,7 @@ func (r *Registry) PeerURLs() []string { defer r.Unlock() // Retrieve a list of all nodes. - e, err := s.Get(RegistryKey, false, false, 0, 0) + e, err := r.store.Get(RegistryKey, false, false, 0, 0) if err != nil { return make([]string, 0) } @@ -134,7 +139,9 @@ func (r *Registry) PeerURLs() []string { // Lookup the URL for each one. urls := make([]string, 0) for _, pair := range e.KVPairs { - urls = append(urls, r.peerURL(pair.Key)) + if url, ok := r.peerURL(pair.Key); ok { + urls = append(urls, url) + } } return urls @@ -147,7 +154,7 @@ func (r *Registry) load(name string) { } // Retrieve from store. - e, err := etcdStore.Get(path.Join(RegistryKey, name), false, false, 0, 0) + e, err := r.store.Get(path.Join(RegistryKey, name), false, false, 0, 0) if err != nil { return } diff --git a/server/remove_command.go b/server/remove_command.go index 5e5feab5c..51140922b 100644 --- a/server/remove_command.go +++ b/server/remove_command.go @@ -2,8 +2,9 @@ package server import ( "encoding/binary" + "os" - "github.com/coreos/etcd/store" + "github.com/coreos/etcd/log" "github.com/coreos/go-raft" ) @@ -23,7 +24,6 @@ func (c *RemoveCommand) CommandName() string { // Remove a server from the cluster func (c *RemoveCommand) Apply(server *raft.Server) (interface{}, error) { - s, _ := server.StateMachine().(*store.Store) ps, _ := server.Context().(*PeerServer) // Remove node from the shared registry. @@ -50,11 +50,11 @@ func (c *RemoveCommand) Apply(server *raft.Server) (interface{}, error) { // start. It is sure that this node received a new remove // command and need to be removed if server.CommitIndex() > ps.joinIndex && ps.joinIndex != 0 { - debugf("server [%s] is removed", server.Name()) + log.Debugf("server [%s] is removed", server.Name()) os.Exit(0) } else { // else ignore remove - debugf("ignore previous remove command.") + log.Debugf("ignore previous remove command.") } } diff --git a/server/server.go b/server/server.go index f283f627d..586502378 100644 --- a/server/server.go +++ b/server/server.go @@ -1,25 +1,26 @@ package server import ( + "fmt" + "encoding/json" "net/http" "net/url" + "strings" + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/log" + "github.com/coreos/etcd/server/v1" + "github.com/coreos/etcd/store" "github.com/coreos/go-raft" "github.com/gorilla/mux" ) -// The Server provides an HTTP interface to the underlying store. -type Server interface { - CommitIndex() uint64 - Term() uint64 - URL() string - Dispatch(raft.Command, http.ResponseWriter, *http.Request) -} - // This is the default implementation of the Server interface. -type server struct { +type Server struct { http.Server raftServer *raft.Server + registry *Registry + store *store.Store name string url string tlsConf *TLSConfig @@ -28,14 +29,15 @@ type server struct { } // Creates a new Server. -func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, raftServer *raft.Server) *Server { - s := &server{ +func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, raftServer *raft.Server, registry *Registry, store *store.Store) *Server { + s := &Server{ Server: http.Server{ Handler: mux.NewRouter(), TLSConfig: &tlsConf.Server, Addr: listenHost, }, - name: name, + store: store, + registry: registry, url: urlStr, tlsConf: tlsConf, tlsInfo: tlsInfo, @@ -49,72 +51,117 @@ func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsI } // The current Raft committed index. -func (s *server) CommitIndex() uint64 { +func (s *Server) CommitIndex() uint64 { return s.raftServer.CommitIndex() } // The current Raft term. -func (s *server) Term() uint64 { +func (s *Server) Term() uint64 { return s.raftServer.Term() } // The server URL. -func (s *server) URL() string { +func (s *Server) URL() string { return s.url } -func (s *server) installV1() { - s.handleFunc("/v1/keys/{key:.*}", v1.GetKeyHandler).Methods("GET") - s.handleFunc("/v1/keys/{key:.*}", v1.SetKeyHandler).Methods("POST", "PUT") - s.handleFunc("/v1/keys/{key:.*}", v1.DeleteKeyHandler).Methods("DELETE") +// Returns a reference to the Store. +func (s *Server) Store() *store.Store { + return s.store +} - s.handleFunc("/v1/watch/{key:.*}", v1.WatchKeyHandler).Methods("GET", "POST") +func (s *Server) installV1() { + s.handleFuncV1("/v1/keys/{key:.*}", v1.GetKeyHandler).Methods("GET") + s.handleFuncV1("/v1/keys/{key:.*}", v1.SetKeyHandler).Methods("POST", "PUT") + s.handleFuncV1("/v1/keys/{key:.*}", v1.DeleteKeyHandler).Methods("DELETE") + + s.handleFuncV1("/v1/watch/{key:.*}", v1.WatchKeyHandler).Methods("GET", "POST") +} + +// Adds a v1 server handler to the router. +func (s *Server) handleFuncV1(path string, f func(http.ResponseWriter, *http.Request, v1.Server) error) *mux.Route { + return s.handleFunc(path, func(w http.ResponseWriter, req *http.Request, s *Server) error { + return f(w, req, s) + }) } // Adds a server handler to the router. -func (s *server) handleFunc(path string, f func(http.ResponseWriter, *http.Request, Server) error) *mux.Route { +func (s *Server) handleFunc(path string, f func(http.ResponseWriter, *http.Request, *Server) error) *mux.Route { r := s.Handler.(*mux.Router) // Wrap the standard HandleFunc interface to pass in the server reference. return r.HandleFunc(path, func(w http.ResponseWriter, req *http.Request) { // Log request. - debugf("[recv] %s %s [%s]", req.Method, s.url, req.URL.Path, req.RemoteAddr) + log.Debugf("[recv] %s %s [%s]", req.Method, s.url, req.URL.Path, req.RemoteAddr) // Write CORS header. if s.OriginAllowed("*") { w.Header().Add("Access-Control-Allow-Origin", "*") - } else if s.OriginAllowed(r.Header.Get("Origin")) { - w.Header().Add("Access-Control-Allow-Origin", r.Header.Get("Origin")) + } else if origin := req.Header.Get("Origin"); s.OriginAllowed(origin) { + w.Header().Add("Access-Control-Allow-Origin", origin) } // Execute handler function and return error if necessary. if err := f(w, req, s); err != nil { if etcdErr, ok := err.(*etcdErr.Error); ok { - debug("Return error: ", (*etcdErr).Error()) + log.Debug("Return error: ", (*etcdErr).Error()) etcdErr.Write(w) } else { - http.Error(w, e.Error(), http.StatusInternalServerError) + http.Error(w, err.Error(), http.StatusInternalServerError) } } }) } // Start to listen and response etcd client command -func (s *server) ListenAndServe() { - infof("etcd server [name %s, listen on %s, advertised url %s]", s.name, s.Server.Addr, s.url) +func (s *Server) ListenAndServe() { + log.Infof("etcd server [name %s, listen on %s, advertised url %s]", s.name, s.Server.Addr, s.url) if s.tlsConf.Scheme == "http" { - fatal(s.Server.ListenAndServe()) + log.Fatal(s.Server.ListenAndServe()) } else { - fatal(s.Server.ListenAndServeTLS(s.tlsInfo.CertFile, s.tlsInfo.KeyFile)) + log.Fatal(s.Server.ListenAndServeTLS(s.tlsInfo.CertFile, s.tlsInfo.KeyFile)) + } +} + +func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error { + if s.raftServer.State() == raft.Leader { + event, err := s.raftServer.Do(c) + if err != nil { + return err + } + + if event == nil { + return etcdErr.NewError(300, "Empty result from raft", store.UndefIndex, store.UndefTerm) + } + + response := event.(*store.Event).Response() + b, _ := json.Marshal(response) + w.WriteHeader(http.StatusOK) + w.Write(b) + + return nil + + } else { + leader := s.raftServer.Leader() + + // No leader available. + if leader == "" { + return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm) + } + + url, _ := s.registry.PeerURL(leader) + redirect(url, w, req) + + return nil } } // Sets a comma-delimited list of origins that are allowed. -func (s *server) AllowOrigins(origins string) error { +func (s *Server) AllowOrigins(origins string) error { // Construct a lookup of all origins. m := make(map[string]bool) - for _, v := range strings.Split(cors, ",") { + for _, v := range strings.Split(origins, ",") { if v != "*" { if _, err := url.Parse(v); err != nil { return fmt.Errorf("Invalid CORS origin: %s", err) @@ -128,6 +175,6 @@ func (s *server) AllowOrigins(origins string) error { } // Determines whether the server will allow a given CORS origin. -func (s *server) OriginAllowed(origin string) { +func (s *Server) OriginAllowed(origin string) bool { return s.corsOrigins["*"] || s.corsOrigins[origin] } diff --git a/server/stats_queue.go b/server/stats_queue.go index 5927eed15..8ee689c0a 100644 --- a/server/stats_queue.go +++ b/server/stats_queue.go @@ -2,6 +2,7 @@ package server import ( "sync" + "time" ) const ( diff --git a/server/timeout.go b/server/timeout.go index 35b49b630..321e77b38 100644 --- a/server/timeout.go +++ b/server/timeout.go @@ -1,5 +1,9 @@ package server +import ( + "time" +) + const ( // The amount of time to elapse without a heartbeat before becoming a candidate. ElectionTimeout = 200 * time.Millisecond diff --git a/server/transporter.go b/server/transporter.go index 83f0f07fc..7397d4534 100644 --- a/server/transporter.go +++ b/server/transporter.go @@ -10,6 +10,7 @@ import ( "net/http" "time" + "github.com/coreos/etcd/log" "github.com/coreos/go-raft" ) @@ -29,13 +30,13 @@ var tranTimeout = ElectionTimeout type transporter struct { client *http.Client transport *http.Transport - raftServer *raftServer + peerServer *PeerServer } // Create transporter using by raft server // Create http or https transporter based on // whether the user give the server cert and key -func newTransporter(scheme string, tlsConf tls.Config, raftServer *raftServer) *transporter { +func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) *transporter { t := transporter{} tr := &http.Transport{ @@ -50,7 +51,7 @@ func newTransporter(scheme string, tlsConf tls.Config, raftServer *raftServer) * t.client = &http.Client{Transport: tr} t.transport = tr - t.raftServer = raftServer + t.peerServer = peerServer return &t } @@ -69,18 +70,18 @@ func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.P size := b.Len() - t.raftServer.serverStats.SendAppendReq(size) + t.peerServer.serverStats.SendAppendReq(size) - u, _ := nameToRaftURL(peer.Name) + u, _ := t.peerServer.registry.PeerURL(peer.Name) - debugf("Send LogEntries to %s ", u) + log.Debugf("Send LogEntries to %s ", u) - thisFollowerStats, ok := t.raftServer.followersStats.Followers[peer.Name] + thisFollowerStats, ok := t.peerServer.followersStats.Followers[peer.Name] if !ok { //this is the first time this follower has been seen thisFollowerStats = &raftFollowerStats{} thisFollowerStats.Latency.Minimum = 1 << 63 - t.raftServer.followersStats.Followers[peer.Name] = thisFollowerStats + t.peerServer.followersStats.Followers[peer.Name] = thisFollowerStats } start := time.Now() @@ -90,7 +91,7 @@ func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.P end := time.Now() if err != nil { - debugf("Cannot send AppendEntriesRequest to %s: %s", u, err) + log.Debugf("Cannot send AppendEntriesRequest to %s: %s", u, err) if ok { thisFollowerStats.Fail() } @@ -121,13 +122,13 @@ func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req var b bytes.Buffer json.NewEncoder(&b).Encode(req) - u, _ := nameToRaftURL(peer.Name) - debugf("Send Vote to %s", u) + u, _ := t.peerServer.registry.PeerURL(peer.Name) + log.Debugf("Send Vote to %s", u) resp, httpRequest, err := t.Post(fmt.Sprintf("%s/vote", u), &b) if err != nil { - debugf("Cannot send VoteRequest to %s : %s", u, err) + log.Debugf("Cannot send VoteRequest to %s : %s", u, err) } if resp != nil { @@ -150,14 +151,14 @@ func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, var b bytes.Buffer json.NewEncoder(&b).Encode(req) - u, _ := nameToRaftURL(peer.Name) - debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u, + u, _ := t.peerServer.registry.PeerURL(peer.Name) + log.Debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u, req.LastTerm, req.LastIndex) resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b) if err != nil { - debugf("Cannot send SendSnapshotRequest to %s : %s", u, err) + log.Debugf("Cannot send SendSnapshotRequest to %s : %s", u, err) } if resp != nil { @@ -181,14 +182,14 @@ func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raf var b bytes.Buffer json.NewEncoder(&b).Encode(req) - u, _ := nameToRaftURL(peer.Name) - debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u, + u, _ := t.peerServer.registry.PeerURL(peer.Name) + log.Debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u, req.LastTerm, req.LastIndex) resp, _, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b) if err != nil { - debugf("Cannot send SendSnapshotRecoveryRequest to %s : %s", u, err) + log.Debugf("Cannot send SendSnapshotRecoveryRequest to %s : %s", u, err) } if resp != nil { diff --git a/server/util.go b/server/util.go index 0154e22bd..95d93c179 100644 --- a/server/util.go +++ b/server/util.go @@ -1,8 +1,11 @@ package server import ( + "encoding/json" "fmt" + "io" "net/http" + "github.com/coreos/etcd/log" ) @@ -18,7 +21,7 @@ func decodeJsonRequest(req *http.Request, data interface{}) error { func redirect(hostname string, w http.ResponseWriter, req *http.Request) { path := req.URL.Path url := hostname + path - debugf("Redirect to %s", url) + log.Debugf("Redirect to %s", url) http.Redirect(w, req, url, http.StatusTemporaryRedirect) } diff --git a/server/v1/delete_key_handler.go b/server/v1/delete_key_handler.go index c9d695fdc..a657e9f3d 100644 --- a/server/v1/delete_key_handler.go +++ b/server/v1/delete_key_handler.go @@ -1,15 +1,15 @@ package v1 import ( - "encoding/json" - "github.com/coreos/etcd/store" "net/http" + "github.com/coreos/etcd/store" + "github.com/gorilla/mux" ) // Removes a key from the store. func DeleteKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { vars := mux.Vars(req) key := "/" + vars["key"] - command := &DeleteCommand{Key: key} - return s.Dispatch(command, w, req) + c := &store.DeleteCommand{Key: key} + return s.Dispatch(c, w, req) } diff --git a/server/v1/dispatch.go b/server/v1/dispatch.go deleted file mode 100644 index 7dd13861c..000000000 --- a/server/v1/dispatch.go +++ /dev/null @@ -1,42 +0,0 @@ -package v1 - -// Dispatch the command to leader. -func dispatchCommand(c Command, w http.ResponseWriter, req *http.Request, s *server.Server) error { - return dispatch(c, w, req, s, nameToEtcdURL) -} - -// Dispatches a command to a given URL. -func dispatch(c Command, w http.ResponseWriter, req *http.Request, s *server.Server, toURL func(name string) (string, bool)) error { - r := s.raftServer - if r.State() == raft.Leader { - event, err := r.Do(c) - if err != nil { - return err - } - - if event == nil { - return etcdErr.NewError(300, "Empty result from raft", store.UndefIndex, store.UndefTerm) - } - - event, _ := event.(*store.Event) - response := eventToResponse(event) - b, _ := json.Marshal(response) - w.WriteHeader(http.StatusOK) - w.Write(b) - - return nil - - } else { - leader := r.Leader() - - // No leader available. - if leader == "" { - return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm) - } - - url, _ := toURL(leader) - redirect(url, w, req) - - return nil - } -} diff --git a/server/v1/get_key_handler.go b/server/v1/get_key_handler.go index 6469313a3..53558e142 100644 --- a/server/v1/get_key_handler.go +++ b/server/v1/get_key_handler.go @@ -2,12 +2,13 @@ package v1 import ( "encoding/json" - "github.com/coreos/etcd/store" "net/http" + + "github.com/gorilla/mux" ) // Retrieves the value for a given key. -func GetKeyHandler(w http.ResponseWriter, req *http.Request, e *etcdServer) error { +func GetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { vars := mux.Vars(req) key := "/" + vars["key"] @@ -18,9 +19,7 @@ func GetKeyHandler(w http.ResponseWriter, req *http.Request, e *etcdServer) erro } // Convert event to a response and write to client. - event, _ := event.(*store.Event) - response := eventToResponse(event) - b, _ := json.Marshal(response) + b, _ := json.Marshal(event.Response()) w.WriteHeader(http.StatusOK) w.Write(b) diff --git a/server/v1/set_key_handler.go b/server/v1/set_key_handler.go index ad629e48f..03b6d7f9b 100644 --- a/server/v1/set_key_handler.go +++ b/server/v1/set_key_handler.go @@ -1,9 +1,12 @@ package v1 import ( - "encoding/json" - "github.com/coreos/etcd/store" "net/http" + + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/store" + "github.com/coreos/go-raft" + "github.com/gorilla/mux" ) // Sets the value for a given key. @@ -16,19 +19,19 @@ func SetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { // Parse non-blank value. value := req.Form.Get("value") if len(value) == 0 { - return error.NewError(200, "Set", store.UndefIndex, store.UndefTerm) + return etcdErr.NewError(200, "Set", store.UndefIndex, store.UndefTerm) } // Convert time-to-live to an expiration time. - expireTime, err := durationToExpireTime(req.Form.Get("ttl")) + expireTime, err := store.TTL(req.Form.Get("ttl")) if err != nil { return etcdErr.NewError(202, "Set", store.UndefIndex, store.UndefTerm) } // If the "prevValue" is specified then test-and-set. Otherwise create a new key. - var c command.Command + var c raft.Command if prevValueArr, ok := req.Form["prevValue"]; ok && len(prevValueArr) > 0 { - c = &TestAndSetCommand{ + c = &store.TestAndSetCommand{ Key: key, Value: value, PrevValue: prevValueArr[0], @@ -36,7 +39,7 @@ func SetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { } } else { - c = &CreateCommand{ + c = &store.CreateCommand{ Key: key, Value: value, ExpireTime: expireTime, @@ -44,5 +47,5 @@ func SetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { } } - return s.Dispatch(command, w, req) + return s.Dispatch(c, w, req) } diff --git a/server/v1/v1.go b/server/v1/v1.go index 7f9f80a0e..586a08e67 100644 --- a/server/v1/v1.go +++ b/server/v1/v1.go @@ -1,50 +1,15 @@ package v1 import ( - "github.com/coreos/etcd/server" - "github.com/gorilla/mux" + "net/http" + "github.com/coreos/etcd/store" + "github.com/coreos/go-raft" ) // The Server interface provides all the methods required for the v1 API. type Server interface { CommitIndex() uint64 Term() uint64 - Dispatch(http.ResponseWriter, *http.Request, Command) -} - -// Converts an event object into a response object. -func eventToResponse(event *store.Event) interface{} { - if !event.Dir { - response := &store.Response{ - Action: event.Action, - Key: event.Key, - Value: event.Value, - PrevValue: event.PrevValue, - Index: event.Index, - TTL: event.TTL, - Expiration: event.Expiration, - } - - if response.Action == store.Create || response.Action == store.Update { - response.Action = "set" - if response.PrevValue == "" { - response.NewKey = true - } - } - - return response - } else { - responses := make([]*store.Response, len(event.KVPairs)) - - for i, kv := range event.KVPairs { - responses[i] = &store.Response{ - Action: event.Action, - Key: kv.Key, - Value: kv.Value, - Dir: kv.Dir, - Index: event.Index, - } - } - return responses - } + Store() *store.Store + Dispatch(raft.Command, http.ResponseWriter, *http.Request) error } diff --git a/server/v1/watch_key_handler.go b/server/v1/watch_key_handler.go index 8ad103b8b..e8db56c30 100644 --- a/server/v1/watch_key_handler.go +++ b/server/v1/watch_key_handler.go @@ -2,8 +2,12 @@ package v1 import ( "encoding/json" - "github.com/coreos/etcd/store" "net/http" + "strconv" + + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/store" + "github.com/gorilla/mux" ) // Watches a given key prefix for changes. @@ -13,7 +17,7 @@ func WatchKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { key := "/" + vars["key"] // Create a command to watch from a given index (default 0). - sinceIndex := 0 + var sinceIndex uint64 = 0 if req.Method == "POST" { sinceIndex, err = strconv.ParseUint(string(req.FormValue("index")), 10, 64) if err != nil { @@ -28,9 +32,7 @@ func WatchKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { } event := <-c - event, _ := event.(*store.Event) - response := eventToResponse(event) - b, _ := json.Marshal(response) + b, _ := json.Marshal(event.Response()) w.WriteHeader(http.StatusOK) w.Write(b) diff --git a/server/v2/handlers.go b/server/v2/handlers.go index 067e5fd38..7fbd7e1d7 100644 --- a/server/v2/handlers.go +++ b/server/v2/handlers.go @@ -95,7 +95,7 @@ func (e *etcdServer) CreateHttpHandler(w http.ResponseWriter, req *http.Request) value := req.FormValue("value") - expireTime, err := durationToExpireTime(req.FormValue("ttl")) + expireTime, err := store.TTL(req.FormValue("ttl")) if err != nil { return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", store.UndefIndex, store.UndefTerm) @@ -124,7 +124,7 @@ func (e *etcdServer) UpdateHttpHandler(w http.ResponseWriter, req *http.Request) value := req.Form.Get("value") - expireTime, err := durationToExpireTime(req.Form.Get("ttl")) + expireTime, err := store.TTL(req.Form.Get("ttl")) if err != nil { return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", store.UndefIndex, store.UndefTerm) @@ -344,6 +344,16 @@ func (e *etcdServer) GetHttpHandler(w http.ResponseWriter, req *http.Request) er } +func getNodePath(urlPath string) string { + pathPrefixLen := len("/" + version + "/keys") + return urlPath[pathPrefixLen:] +} + + +//-------------------------------------- +// Testing +//-------------------------------------- + // TestHandler func TestHttpHandler(w http.ResponseWriter, req *http.Request) { testType := req.URL.Path[len("/test/"):] @@ -358,3 +368,25 @@ func TestHttpHandler(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusBadRequest) } + +func directSet() { + c := make(chan bool, 1000) + for i := 0; i < 1000; i++ { + go send(c) + } + + for i := 0; i < 1000; i++ { + <-c + } +} + +func send(c chan bool) { + for i := 0; i < 10; i++ { + command := &UpdateCommand{} + command.Key = "foo" + command.Value = "bar" + command.ExpireTime = time.Unix(0, 0) + //r.Do(command) + } + c <- true +} diff --git a/store/event.go b/store/event.go index 15866d745..484201865 100644 --- a/store/event.go +++ b/store/event.go @@ -1,12 +1,7 @@ package store import ( - "fmt" - "strings" - "sync" "time" - - etcdErr "github.com/coreos/etcd/error" ) const ( @@ -46,129 +41,41 @@ func newEvent(action string, key string, index uint64, term uint64) *Event { } } -type eventQueue struct { - Events []*Event - Size int - Front int - Capacity int -} +// Converts an event object into a response object. +func (event *Event) Response() interface{} { + if !event.Dir { + response := &Response{ + Action: event.Action, + Key: event.Key, + Value: event.Value, + PrevValue: event.PrevValue, + Index: event.Index, + TTL: event.TTL, + Expiration: event.Expiration, + } -func (eq *eventQueue) back() int { - return (eq.Front + eq.Size - 1 + eq.Capacity) % eq.Capacity -} + if response.Action == Create || response.Action == Update { + response.Action = "set" + if response.PrevValue == "" { + response.NewKey = true + } + } -func (eq *eventQueue) insert(e *Event) { - index := (eq.back() + 1) % eq.Capacity - - eq.Events[index] = e - - if eq.Size == eq.Capacity { //dequeue - eq.Front = (index + 1) % eq.Capacity + return response } else { - eq.Size++ - } + responses := make([]*Response, len(event.KVPairs)) -} - -type EventHistory struct { - Queue eventQueue - StartIndex uint64 - LastIndex uint64 - LastTerm uint64 - DupCnt uint64 // help to compute the watching point with duplicated indexes in the queue - rwl sync.RWMutex -} - -func newEventHistory(capacity int) *EventHistory { - return &EventHistory{ - Queue: eventQueue{ - Capacity: capacity, - Events: make([]*Event, capacity), - }, - } -} - -// addEvent function adds event into the eventHistory -func (eh *EventHistory) addEvent(e *Event) *Event { - eh.rwl.Lock() - defer eh.rwl.Unlock() - - var duped uint64 - - if e.Index == UndefIndex { - e.Index = eh.LastIndex - e.Term = eh.LastTerm - duped = 1 - } - - eh.Queue.insert(e) - - eh.LastIndex = e.Index - eh.LastTerm = e.Term - eh.DupCnt += duped - - eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index - - return e -} - -// scan function is enumerating events from the index in history and -// stops till the first point where the key has identified prefix -func (eh *EventHistory) scan(prefix string, index uint64) (*Event, *etcdErr.Error) { - eh.rwl.RLock() - defer eh.rwl.RUnlock() - - start := index - eh.StartIndex - - // the index should locate after the event history's StartIndex - if start < 0 { - return nil, - etcdErr.NewError(etcdErr.EcodeEventIndexCleared, - fmt.Sprintf("the requested history has been cleared [%v/%v]", - eh.StartIndex, index), UndefIndex, UndefTerm) - } - - // the index should locate before the size of the queue minus the duplicate count - if start >= (uint64(eh.Queue.Size) - eh.DupCnt) { // future index - return nil, nil - } - - i := int((start + uint64(eh.Queue.Front)) % uint64(eh.Queue.Capacity)) - - for { - e := eh.Queue.Events[i] - if strings.HasPrefix(e.Key, prefix) && index <= e.Index { // make sure we bypass the smaller one - return e, nil - } - - i = (i + 1) % eh.Queue.Capacity - - if i == eh.Queue.back() { // find nothing, return and watch from current index - return nil, nil + for i, kv := range event.KVPairs { + responses[i] = &Response{ + Action: event.Action, + Key: kv.Key, + Value: kv.Value, + Dir: kv.Dir, + Index: event.Index, + } } + return responses } } -// clone will be protected by a stop-world lock -// do not need to obtain internal lock -func (eh *EventHistory) clone() *EventHistory { - clonedQueue := eventQueue{ - Capacity: eh.Queue.Capacity, - Events: make([]*Event, eh.Queue.Capacity), - Size: eh.Queue.Size, - Front: eh.Queue.Front, - } - for i, e := range eh.Queue.Events { - clonedQueue.Events[i] = e - } - - return &EventHistory{ - StartIndex: eh.StartIndex, - Queue: clonedQueue, - LastIndex: eh.LastIndex, - LastTerm: eh.LastTerm, - DupCnt: eh.DupCnt, - } - -} diff --git a/store/event_history.go b/store/event_history.go new file mode 100644 index 000000000..73db5d876 --- /dev/null +++ b/store/event_history.go @@ -0,0 +1,112 @@ +package store + +import ( + "fmt" + "strings" + "sync" + + etcdErr "github.com/coreos/etcd/error" +) + +type EventHistory struct { + Queue eventQueue + StartIndex uint64 + LastIndex uint64 + LastTerm uint64 + DupCnt uint64 // help to compute the watching point with duplicated indexes in the queue + rwl sync.RWMutex +} + +func newEventHistory(capacity int) *EventHistory { + return &EventHistory{ + Queue: eventQueue{ + Capacity: capacity, + Events: make([]*Event, capacity), + }, + } +} + +// addEvent function adds event into the eventHistory +func (eh *EventHistory) addEvent(e *Event) *Event { + eh.rwl.Lock() + defer eh.rwl.Unlock() + + var duped uint64 + + if e.Index == UndefIndex { + e.Index = eh.LastIndex + e.Term = eh.LastTerm + duped = 1 + } + + eh.Queue.insert(e) + + eh.LastIndex = e.Index + eh.LastTerm = e.Term + eh.DupCnt += duped + + eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index + + return e +} + +// scan function is enumerating events from the index in history and +// stops till the first point where the key has identified prefix +func (eh *EventHistory) scan(prefix string, index uint64) (*Event, *etcdErr.Error) { + eh.rwl.RLock() + defer eh.rwl.RUnlock() + + start := index - eh.StartIndex + + // the index should locate after the event history's StartIndex + if start < 0 { + return nil, + etcdErr.NewError(etcdErr.EcodeEventIndexCleared, + fmt.Sprintf("the requested history has been cleared [%v/%v]", + eh.StartIndex, index), UndefIndex, UndefTerm) + } + + // the index should locate before the size of the queue minus the duplicate count + if start >= (uint64(eh.Queue.Size) - eh.DupCnt) { // future index + return nil, nil + } + + i := int((start + uint64(eh.Queue.Front)) % uint64(eh.Queue.Capacity)) + + for { + e := eh.Queue.Events[i] + if strings.HasPrefix(e.Key, prefix) && index <= e.Index { // make sure we bypass the smaller one + return e, nil + } + + i = (i + 1) % eh.Queue.Capacity + + if i == eh.Queue.back() { // find nothing, return and watch from current index + return nil, nil + } + } +} + +// clone will be protected by a stop-world lock +// do not need to obtain internal lock +func (eh *EventHistory) clone() *EventHistory { + clonedQueue := eventQueue{ + Capacity: eh.Queue.Capacity, + Events: make([]*Event, eh.Queue.Capacity), + Size: eh.Queue.Size, + Front: eh.Queue.Front, + } + + for i, e := range eh.Queue.Events { + clonedQueue.Events[i] = e + } + + return &EventHistory{ + StartIndex: eh.StartIndex, + Queue: clonedQueue, + LastIndex: eh.LastIndex, + LastTerm: eh.LastTerm, + DupCnt: eh.DupCnt, + } + +} diff --git a/store/event_queue.go b/store/event_queue.go new file mode 100644 index 000000000..7c520ffe4 --- /dev/null +++ b/store/event_queue.go @@ -0,0 +1,26 @@ +package store + + +type eventQueue struct { + Events []*Event + Size int + Front int + Capacity int +} + +func (eq *eventQueue) back() int { + return (eq.Front + eq.Size - 1 + eq.Capacity) % eq.Capacity +} + +func (eq *eventQueue) insert(e *Event) { + index := (eq.back() + 1) % eq.Capacity + + eq.Events[index] = e + + if eq.Size == eq.Capacity { //dequeue + eq.Front = (index + 1) % eq.Capacity + } else { + eq.Size++ + } + +} diff --git a/store/ttl.go b/store/ttl.go new file mode 100644 index 000000000..c73d95f8c --- /dev/null +++ b/store/ttl.go @@ -0,0 +1,21 @@ +package store + +import ( + "strconv" + "time" +) + +// Convert string duration to time format +func TTL(duration string) (time.Time, error) { + if duration != "" { + duration, err := strconv.Atoi(duration) + if err != nil { + return Permanent, err + } + return time.Now().Add(time.Second * (time.Duration)(duration)), nil + + } else { + return Permanent, nil + } +} + diff --git a/util.go b/util.go index c97d7f77d..d519fc1aa 100644 --- a/util.go +++ b/util.go @@ -1,42 +1,15 @@ package main import ( - "encoding/json" - "fmt" - "io" "net" - "net/http" "net/url" "os" "os/signal" "runtime/pprof" - "strconv" - "time" - etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/etcd/store" - "github.com/coreos/go-raft" + "github.com/coreos/etcd/log" ) -//-------------------------------------- -// etcd http Helper -//-------------------------------------- - -// Convert string duration to time format -func durationToExpireTime(strDuration string) (time.Time, error) { - if strDuration != "" { - duration, err := strconv.Atoi(strDuration) - - if err != nil { - return store.Permanent, err - } - return time.Now().Add(time.Second * (time.Duration)(duration)), nil - - } else { - return store.Permanent, nil - } -} - //-------------------------------------- // HTTP Utilities //-------------------------------------- @@ -52,13 +25,13 @@ func sanitizeURL(host string, defaultScheme string) string { p, err := url.Parse(host) if err != nil { - fatal(err) + log.Fatal(err) } // Make sure the host is in Host:Port format _, _, err = net.SplitHostPort(host) if err != nil { - fatal(err) + log.Fatal(err) } p = &url.URL{Host: host, Scheme: defaultScheme} @@ -71,12 +44,12 @@ func sanitizeURL(host string, defaultScheme string) string { func sanitizeListenHost(listen string, advertised string) string { aurl, err := url.Parse(advertised) if err != nil { - fatal(err) + log.Fatal(err) } ahost, aport, err := net.SplitHostPort(aurl.Host) if err != nil { - fatal(err) + log.Fatal(err) } // If the listen host isn't set use the advertised host @@ -89,15 +62,10 @@ func sanitizeListenHost(listen string, advertised string) string { func check(err error) { if err != nil { - fatal(err) + log.Fatal(err) } } -func getNodePath(urlPath string) string { - pathPrefixLen := len("/" + version + "/keys") - return urlPath[pathPrefixLen:] -} - //-------------------------------------- // CPU profile //-------------------------------------- @@ -105,7 +73,7 @@ func runCPUProfile() { f, err := os.Create(cpuprofile) if err != nil { - fatal(err) + log.Fatal(err) } pprof.StartCPUProfile(f) @@ -113,34 +81,10 @@ func runCPUProfile() { signal.Notify(c, os.Interrupt) go func() { for sig := range c { - infof("captured %v, stopping profiler and exiting..", sig) + log.Infof("captured %v, stopping profiler and exiting..", sig) pprof.StopCPUProfile() os.Exit(1) } }() } -//-------------------------------------- -// Testing -//-------------------------------------- -func directSet() { - c := make(chan bool, 1000) - for i := 0; i < 1000; i++ { - go send(c) - } - - for i := 0; i < 1000; i++ { - <-c - } -} - -func send(c chan bool) { - for i := 0; i < 10; i++ { - command := &UpdateCommand{} - command.Key = "foo" - command.Value = "bar" - command.ExpireTime = time.Unix(0, 0) - //r.Do(command) - } - c <- true -}