diff --git a/etcd.go b/etcd.go index 49271260f..6bc036724 100644 --- a/etcd.go +++ b/etcd.go @@ -201,13 +201,12 @@ func main() { // Create etcd key-value store etcdStore = store.New() - snapConf = newSnapshotConf() - // Create etcd and raft server - e = newEtcdServer(info.Name, info.EtcdURL, info.EtcdListenHost, &etcdTLSConfig, &info.EtcdTLS) - r = newRaftServer(info.Name, info.RaftURL, info.RaftListenHost, &raftTLSConfig, &info.RaftTLS) + r := newRaftServer(info.Name, info.RaftURL, info.RaftListenHost, &raftTLSConfig, &info.RaftTLS) + snapConf = r.newSnapshotConf() + + e = newEtcdServer(info.Name, info.EtcdURL, info.EtcdListenHost, &etcdTLSConfig, &info.EtcdTLS, r) - startWebInterface() r.ListenAndServe() e.ListenAndServe() diff --git a/etcd_handler_v1.go b/etcd_handler_v1.go index 44cde5a86..987b2e794 100644 --- a/etcd_handler_v1.go +++ b/etcd_handler_v1.go @@ -14,17 +14,17 @@ import ( // Handlers to handle etcd-store related request via etcd url //------------------------------------------------------------------- // Multiplex GET/POST/DELETE request to corresponding handlers -func MultiplexerV1(w http.ResponseWriter, req *http.Request) error { +func (e *etcdServer) MultiplexerV1(w http.ResponseWriter, req *http.Request) error { switch req.Method { case "GET": - return GetHttpHandlerV1(w, req) + return e.GetHttpHandlerV1(w, req) case "POST": - return SetHttpHandlerV1(w, req) + return e.SetHttpHandlerV1(w, req) case "PUT": - return SetHttpHandlerV1(w, req) + return e.SetHttpHandlerV1(w, req) case "DELETE": - return DeleteHttpHandlerV1(w, req) + return e.DeleteHttpHandlerV1(w, req) default: w.WriteHeader(http.StatusMethodNotAllowed) return nil @@ -37,7 +37,7 @@ func MultiplexerV1(w http.ResponseWriter, req *http.Request) error { //-------------------------------------- // Set Command Handler -func SetHttpHandlerV1(w http.ResponseWriter, req *http.Request) error { +func (e *etcdServer) SetHttpHandlerV1(w http.ResponseWriter, req *http.Request) error { key := req.URL.Path[len("/v1/keys/"):] debugf("[recv] POST %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr) @@ -81,7 +81,7 @@ func SetHttpHandlerV1(w http.ResponseWriter, req *http.Request) error { } // Delete Handler -func DeleteHttpHandlerV1(w http.ResponseWriter, req *http.Request) error { +func (e *etcdServer) DeleteHttpHandlerV1(w http.ResponseWriter, req *http.Request) error { key := req.URL.Path[len("/v1/keys/"):] debugf("[recv] DELETE %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr) @@ -101,9 +101,10 @@ func DeleteHttpHandlerV1(w http.ResponseWriter, req *http.Request) error { //-------------------------------------- // Get Handler -func GetHttpHandlerV1(w http.ResponseWriter, req *http.Request) error { +func (e *etcdServer) GetHttpHandlerV1(w http.ResponseWriter, req *http.Request) error { key := req.URL.Path[len("/v1/keys/"):] + r := e.raftServer debugf("[recv] GET %s/v1/keys/%s [%s]", e.url, key, req.RemoteAddr) command := &GetCommand{ @@ -128,13 +129,13 @@ func GetHttpHandlerV1(w http.ResponseWriter, req *http.Request) error { } // Watch handler -func WatchHttpHandlerV1(w http.ResponseWriter, req *http.Request) error { +func (e *etcdServer) WatchHttpHandlerV1(w http.ResponseWriter, req *http.Request) error { key := req.URL.Path[len("/v1/watch/"):] command := &WatchCommand{ Key: key, } - + r := e.raftServer if req.Method == "GET" { debugf("[recv] GET %s/watch/%s [%s]", e.url, key, req.RemoteAddr) command.SinceIndex = 0 @@ -178,6 +179,7 @@ func dispatchEtcdCommandV1(c Command, w http.ResponseWriter, req *http.Request) } func dispatchV1(c Command, w http.ResponseWriter, req *http.Request, toURL func(name string) (string, bool)) error { + r := e.raftServer if r.State() == raft.Leader { if event, err := r.Do(c); err != nil { return err diff --git a/etcd_handlers.go b/etcd_handlers.go index fe64347a3..487c1cef5 100644 --- a/etcd_handlers.go +++ b/etcd_handlers.go @@ -19,18 +19,18 @@ import ( func NewEtcdMuxer() *http.ServeMux { // external commands etcdMux := http.NewServeMux() - etcdMux.Handle("/"+version+"/keys/", errorHandler(Multiplexer)) - etcdMux.Handle("/"+version+"/leader", errorHandler(LeaderHttpHandler)) - etcdMux.Handle("/"+version+"/machines", errorHandler(MachinesHttpHandler)) - etcdMux.Handle("/"+version+"/stats/", errorHandler(StatsHttpHandler)) - etcdMux.Handle("/version", errorHandler(VersionHttpHandler)) + etcdMux.Handle("/"+version+"/keys/", errorHandler(e.Multiplexer)) + etcdMux.Handle("/"+version+"/leader", errorHandler(e.LeaderHttpHandler)) + etcdMux.Handle("/"+version+"/machines", errorHandler(e.MachinesHttpHandler)) + etcdMux.Handle("/"+version+"/stats/", errorHandler(e.StatsHttpHandler)) + etcdMux.Handle("/version", errorHandler(e.VersionHttpHandler)) etcdMux.HandleFunc("/test/", TestHttpHandler) // backward support - etcdMux.Handle("/v1/keys/", errorHandler(MultiplexerV1)) - etcdMux.Handle("/v1/leader", errorHandler(LeaderHttpHandler)) - etcdMux.Handle("/v1/machines", errorHandler(MachinesHttpHandler)) - etcdMux.Handle("/v1/stats/", errorHandler(StatsHttpHandler)) + etcdMux.Handle("/v1/keys/", errorHandler(e.MultiplexerV1)) + etcdMux.Handle("/v1/leader", errorHandler(e.LeaderHttpHandler)) + etcdMux.Handle("/v1/machines", errorHandler(e.MachinesHttpHandler)) + etcdMux.Handle("/v1/stats/", errorHandler(e.StatsHttpHandler)) return etcdMux } @@ -68,17 +68,17 @@ func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // Multiplex GET/POST/DELETE request to corresponding handlers -func Multiplexer(w http.ResponseWriter, req *http.Request) error { +func (e *etcdServer) Multiplexer(w http.ResponseWriter, req *http.Request) error { switch req.Method { case "GET": - return GetHttpHandler(w, req) + return e.GetHttpHandler(w, req) case "POST": - return CreateHttpHandler(w, req) + return e.CreateHttpHandler(w, req) case "PUT": - return UpdateHttpHandler(w, req) + return e.UpdateHttpHandler(w, req) case "DELETE": - return DeleteHttpHandler(w, req) + return e.DeleteHttpHandler(w, req) default: w.WriteHeader(http.StatusMethodNotAllowed) return nil @@ -92,7 +92,7 @@ func Multiplexer(w http.ResponseWriter, req *http.Request) error { // Set/Delete will dispatch to leader //-------------------------------------- -func CreateHttpHandler(w http.ResponseWriter, req *http.Request) error { +func (e *etcdServer) CreateHttpHandler(w http.ResponseWriter, req *http.Request) error { key := getNodePath(req.URL.Path) debugf("recv.post[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL) @@ -115,11 +115,11 @@ func CreateHttpHandler(w http.ResponseWriter, req *http.Request) error { command.IncrementalSuffix = true } - return dispatchEtcdCommand(command, w, req) + return e.dispatchEtcdCommand(command, w, req) } -func UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error { +func (e *etcdServer) UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error { key := getNodePath(req.URL.Path) debugf("recv.put[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL) @@ -150,7 +150,7 @@ func UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error { ExpireTime: expireTime, } - return dispatchEtcdCommand(command, w, req) + return e.dispatchEtcdCommand(command, w, req) } else { // update with test var prevIndex uint64 @@ -173,13 +173,13 @@ func UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error { PrevIndex: prevIndex, } - return dispatchEtcdCommand(command, w, req) + return e.dispatchEtcdCommand(command, w, req) } } // Delete Handler -func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) error { +func (e *etcdServer) DeleteHttpHandler(w http.ResponseWriter, req *http.Request) error { key := getNodePath(req.URL.Path) debugf("recv.delete[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL) @@ -192,12 +192,12 @@ func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) error { command.Recursive = true } - return dispatchEtcdCommand(command, w, req) + return e.dispatchEtcdCommand(command, w, req) } // Dispatch the command to leader -func dispatchEtcdCommand(c Command, w http.ResponseWriter, req *http.Request) error { - return dispatch(c, w, req, nameToEtcdURL) +func (e *etcdServer) dispatchEtcdCommand(c Command, w http.ResponseWriter, req *http.Request) error { + return e.raftServer.dispatch(c, w, req, nameToEtcdURL) } //-------------------------------------- @@ -207,7 +207,9 @@ func dispatchEtcdCommand(c Command, w http.ResponseWriter, req *http.Request) er //-------------------------------------- // Handler to return the current leader's raft address -func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error { +func (e *etcdServer) LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error { + r := e.raftServer + leader := r.Leader() if leader != "" { @@ -222,8 +224,8 @@ func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error { } // Handler to return all the known machines in the current cluster -func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error { - machines := getMachines(nameToEtcdURL) +func (e *etcdServer) MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error { + machines := e.raftServer.getMachines(nameToEtcdURL) w.WriteHeader(http.StatusOK) w.Write([]byte(strings.Join(machines, ", "))) @@ -232,7 +234,7 @@ func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error { } // Handler to return the current version of etcd -func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error { +func (e *etcdServer) VersionHttpHandler(w http.ResponseWriter, req *http.Request) error { w.WriteHeader(http.StatusOK) fmt.Fprintf(w, "etcd %s", releaseVersion) @@ -240,10 +242,12 @@ func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error { } // Handler to return the basic stats of etcd -func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error { +func (e *etcdServer) StatsHttpHandler(w http.ResponseWriter, req *http.Request) error { option := req.URL.Path[len("/v1/stats/"):] w.WriteHeader(http.StatusOK) + r := e.raftServer + switch option { case "self": w.Write(r.Stats()) @@ -266,9 +270,12 @@ func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error { return nil } -func GetHttpHandler(w http.ResponseWriter, req *http.Request) error { +func (e *etcdServer) GetHttpHandler(w http.ResponseWriter, req *http.Request) error { var err error var event interface{} + + r := e.raftServer + debugf("recv.get[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL) if req.FormValue("consistent") == "true" && r.State() != raft.Leader { diff --git a/etcd_server.go b/etcd_server.go index 0139b03c2..657c4f967 100644 --- a/etcd_server.go +++ b/etcd_server.go @@ -6,26 +6,29 @@ import ( type etcdServer struct { http.Server - name string - url string - tlsConf *TLSConfig - tlsInfo *TLSInfo + raftServer *raftServer + name string + url string + tlsConf *TLSConfig + tlsInfo *TLSInfo } var e *etcdServer -func newEtcdServer(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *etcdServer { - return &etcdServer{ +func newEtcdServer(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, raftServer *raftServer) *etcdServer { + e = &etcdServer{ Server: http.Server{ - Handler: NewEtcdMuxer(), TLSConfig: &tlsConf.Server, Addr: listenHost, }, - name: name, - url: urlStr, - tlsConf: tlsConf, - tlsInfo: tlsInfo, + name: name, + url: urlStr, + tlsConf: tlsConf, + tlsInfo: tlsInfo, + raftServer: raftServer, } + e.Handler = NewEtcdMuxer() + return e } // Start to listen and response etcd client command diff --git a/machines.go b/machines.go index b863c50fb..b8b4a09d5 100644 --- a/machines.go +++ b/machines.go @@ -2,7 +2,7 @@ package main // machineNum returns the number of machines in the cluster func machineNum() int { - e, err := etcdStore.Get("/_etcd/machines", false, false, r.CommitIndex(), r.Term()) + e, err := etcdStore.Get("/_etcd/machines", false, false, 0, 0) if err != nil { return 0 @@ -12,8 +12,7 @@ func machineNum() int { } // getMachines gets the current machines in the cluster -func getMachines(toURL func(string) (string, bool)) []string { - +func (r *raftServer) getMachines(toURL func(string) (string, bool)) []string { peers := r.Peers() machines := make([]string, len(peers)+1) diff --git a/name_url_map.go b/name_url_map.go index 1192f3fda..220963d5c 100644 --- a/name_url_map.go +++ b/name_url_map.go @@ -56,7 +56,7 @@ func readURL(nodeName string, urlName string) (string, bool) { // convert nodeName to url from etcd storage key := path.Join("/_etcd/machines", nodeName) - e, err := etcdStore.Get(key, false, false, r.CommitIndex(), r.Term()) + e, err := etcdStore.Get(key, false, false, 0, 0) if err != nil { return "", false diff --git a/raft_handlers.go b/raft_handlers.go index 6c95efd3f..a45fe496c 100644 --- a/raft_handlers.go +++ b/raft_handlers.go @@ -12,7 +12,7 @@ import ( //------------------------------------------------------------- // Get all the current logs -func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) { +func (r *raftServer) GetLogHttpHandler(w http.ResponseWriter, req *http.Request) { debugf("[recv] GET %s/log", r.url) w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) @@ -20,7 +20,7 @@ func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) { } // Response to vote request -func VoteHttpHandler(w http.ResponseWriter, req *http.Request) { +func (r *raftServer) VoteHttpHandler(w http.ResponseWriter, req *http.Request) { rvreq := &raft.RequestVoteRequest{} err := decodeJsonRequest(req, rvreq) if err == nil { @@ -36,7 +36,7 @@ func VoteHttpHandler(w http.ResponseWriter, req *http.Request) { } // Response to append entries request -func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) { +func (r *raftServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) { aereq := &raft.AppendEntriesRequest{} err := decodeJsonRequest(req, aereq) @@ -59,7 +59,7 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) { } // Response to recover from snapshot request -func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) { +func (r *raftServer) SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) { aereq := &raft.SnapshotRequest{} err := decodeJsonRequest(req, aereq) if err == nil { @@ -75,7 +75,7 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) { } // Response to recover from snapshot request -func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) { +func (r *raftServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) { aereq := &raft.SnapshotRecoveryRequest{} err := decodeJsonRequest(req, aereq) if err == nil { @@ -91,20 +91,20 @@ func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) { } // Get the port that listening for etcd connecting of the server -func EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) { +func (r *raftServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) { debugf("[recv] Get %s/etcdURL/ ", r.url) w.WriteHeader(http.StatusOK) w.Write([]byte(argInfo.EtcdURL)) } // Response to the join request -func JoinHttpHandler(w http.ResponseWriter, req *http.Request) error { +func (r *raftServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) error { command := &JoinCommand{} if err := decodeJsonRequest(req, command); err == nil { debugf("Receive Join Request from %s", command.Name) - return dispatchRaftCommand(command, w, req) + return r.dispatchRaftCommand(command, w, req) } else { w.WriteHeader(http.StatusInternalServerError) return nil @@ -112,7 +112,7 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) error { } // Response to remove request -func RemoveHttpHandler(w http.ResponseWriter, req *http.Request) { +func (r *raftServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request) { if req.Method != "DELETE" { w.WriteHeader(http.StatusMethodNotAllowed) return @@ -125,23 +125,23 @@ func RemoveHttpHandler(w http.ResponseWriter, req *http.Request) { debugf("[recv] Remove Request [%s]", command.Name) - dispatchRaftCommand(command, w, req) + r.dispatchRaftCommand(command, w, req) } // Response to the name request -func NameHttpHandler(w http.ResponseWriter, req *http.Request) { +func (r *raftServer) NameHttpHandler(w http.ResponseWriter, req *http.Request) { debugf("[recv] Get %s/name/ ", r.url) w.WriteHeader(http.StatusOK) w.Write([]byte(r.name)) } // Response to the name request -func RaftVersionHttpHandler(w http.ResponseWriter, req *http.Request) { +func (r *raftServer) RaftVersionHttpHandler(w http.ResponseWriter, req *http.Request) { debugf("[recv] Get %s/version/ ", r.url) w.WriteHeader(http.StatusOK) w.Write([]byte(r.version)) } -func dispatchRaftCommand(c Command, w http.ResponseWriter, req *http.Request) error { - return dispatch(c, w, req, nameToRaftURL) +func (r *raftServer) dispatchRaftCommand(c Command, w http.ResponseWriter, req *http.Request) error { + return r.dispatch(c, w, req, nameToRaftURL) } diff --git a/raft_server.go b/raft_server.go index 8e48c027e..146a2e84e 100644 --- a/raft_server.go +++ b/raft_server.go @@ -28,13 +28,10 @@ type raftServer struct { serverStats *raftServerStats } -var r *raftServer +//var r *raftServer func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *raftServer { - // Create transporter for raft - raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client) - raftWrapper := &raftServer{ version: raftVersion, name: name, @@ -57,6 +54,9 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi }, } + // Create transporter for raft + raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, raftWrapper) + // Create raft server server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, raftWrapper, "") check(err) @@ -91,16 +91,16 @@ func (r *raftServer) ListenAndServe() { // start as a leader in a new cluster if len(cluster) == 0 { - startAsLeader() + r.startAsLeader() } else { - startAsFollower() + r.startAsFollower() } } else { // rejoin the previous cluster - cluster = getMachines(nameToRaftURL) + cluster = r.getMachines(nameToRaftURL) for i := 0; i < len(cluster); i++ { u, err := url.Parse(cluster[i]) if err != nil { @@ -108,7 +108,7 @@ func (r *raftServer) ListenAndServe() { } cluster[i] = u.Host } - ok := joinCluster(cluster) + ok := r.joinCluster(cluster) if !ok { warn("the entire cluster is down! this machine will restart the cluster.") } @@ -118,7 +118,7 @@ func (r *raftServer) ListenAndServe() { // open the snapshot if snapshot { - go monitorSnapshot() + go r.monitorSnapshot() } // start to response to raft requests @@ -126,7 +126,7 @@ func (r *raftServer) ListenAndServe() { } -func startAsLeader() { +func (r *raftServer) startAsLeader() { // leader need to join self as a peer for { _, err := r.Do(newJoinCommand(r.version, r.Name(), r.url, e.url)) @@ -137,10 +137,10 @@ func startAsLeader() { debugf("%s start as a leader", r.name) } -func startAsFollower() { +func (r *raftServer) startAsFollower() { // start as a follower in a existing cluster for i := 0; i < retryTimes; i++ { - ok := joinCluster(cluster) + ok := r.joinCluster(cluster) if ok { return } @@ -164,16 +164,16 @@ func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) { } // internal commands - raftMux.HandleFunc("/name", NameHttpHandler) - raftMux.HandleFunc("/version", RaftVersionHttpHandler) - raftMux.Handle("/join", errorHandler(JoinHttpHandler)) - raftMux.HandleFunc("/remove/", RemoveHttpHandler) - raftMux.HandleFunc("/vote", VoteHttpHandler) - raftMux.HandleFunc("/log", GetLogHttpHandler) - raftMux.HandleFunc("/log/append", AppendEntriesHttpHandler) - raftMux.HandleFunc("/snapshot", SnapshotHttpHandler) - raftMux.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler) - raftMux.HandleFunc("/etcdURL", EtcdURLHttpHandler) + raftMux.HandleFunc("/name", r.NameHttpHandler) + raftMux.HandleFunc("/version", r.RaftVersionHttpHandler) + raftMux.Handle("/join", errorHandler(r.JoinHttpHandler)) + raftMux.HandleFunc("/remove/", r.RemoveHttpHandler) + raftMux.HandleFunc("/vote", r.VoteHttpHandler) + raftMux.HandleFunc("/log", r.GetLogHttpHandler) + raftMux.HandleFunc("/log/append", r.AppendEntriesHttpHandler) + raftMux.HandleFunc("/snapshot", r.SnapshotHttpHandler) + raftMux.HandleFunc("/snapshotRecovery", r.SnapshotRecoveryHttpHandler) + raftMux.HandleFunc("/etcdURL", r.EtcdURLHttpHandler) if scheme == "http" { fatal(server.ListenAndServe()) @@ -202,14 +202,14 @@ func getVersion(t *transporter, versionURL url.URL) (string, error) { return string(body), nil } -func joinCluster(cluster []string) bool { +func (r *raftServer) joinCluster(cluster []string) bool { for _, machine := range cluster { if len(machine) == 0 { continue } - err := joinByMachine(r.Server, machine, r.tlsConf.Scheme) + err := r.joinByMachine(r.Server, machine, r.tlsConf.Scheme) if err == nil { debugf("%s success join to the cluster via machine %s", r.name, machine) return true @@ -226,7 +226,7 @@ func joinCluster(cluster []string) bool { } // Send join requests to machine. -func joinByMachine(s *raft.Server, machine string, scheme string) error { +func (r *raftServer) joinByMachine(s *raft.Server, machine string, scheme string) error { var b bytes.Buffer // t must be ok diff --git a/raft_stats.go b/raft_stats.go index 23f97d417..45d21037f 100644 --- a/raft_stats.go +++ b/raft_stats.go @@ -33,6 +33,7 @@ func (ps *packageStats) Time() time.Time { } type raftServerStats struct { + Name string `json:"name"` State string `json:"state"` StartTime time.Time `json:"startTime"` @@ -70,7 +71,7 @@ func (ss *raftServerStats) SendAppendReq(pkgSize int) { if ss.State != raft.Leader { ss.State = raft.Leader - ss.LeaderInfo.Name = r.Name() + ss.LeaderInfo.Name = ss.Name ss.LeaderInfo.startTime = now } diff --git a/snapshot.go b/snapshot.go index a6caefd32..1b2bc728c 100644 --- a/snapshot.go +++ b/snapshot.go @@ -18,12 +18,12 @@ type snapshotConf struct { var snapConf *snapshotConf -func newSnapshotConf() *snapshotConf { +func (r *raftServer) newSnapshotConf() *snapshotConf { // check snapshot every 3 seconds and the threshold is 20K return &snapshotConf{time.Second * 3, 0, 20 * 1000} } -func monitorSnapshot() { +func (r *raftServer) monitorSnapshot() { for { time.Sleep(snapConf.checkingInterval) //currentWrites := etcdStore.TotalWrites() - snapConf.lastWrites diff --git a/store/store.go b/store/store.go index 2af95639d..348aea02e 100644 --- a/store/store.go +++ b/store/store.go @@ -356,7 +356,9 @@ func (s *Store) internalGet(nodePath string, index uint64, term uint64) (*Node, nodePath = path.Clean(path.Join("/", nodePath)) // update file system known index and term - s.Index, s.Term = index, term + if index > s.Index { + s.Index, s.Term = index, term + } walkFunc := func(parent *Node, name string) (*Node, *etcdErr.Error) { diff --git a/transporter.go b/transporter.go index eb4308377..19a09c73e 100644 --- a/transporter.go +++ b/transporter.go @@ -27,14 +27,15 @@ var tranTimeout = ElectionTimeout // Transporter layer for communication between raft nodes type transporter struct { - client *http.Client - transport *http.Transport + client *http.Client + transport *http.Transport + raftServer *raftServer } // Create transporter using by raft server // Create http or https transporter based on // whether the user give the server cert and key -func newTransporter(scheme string, tlsConf tls.Config) *transporter { +func newTransporter(scheme string, tlsConf tls.Config, raftServer *raftServer) *transporter { t := transporter{} tr := &http.Transport{ @@ -49,6 +50,7 @@ func newTransporter(scheme string, tlsConf tls.Config) *transporter { t.client = &http.Client{Transport: tr} t.transport = tr + t.raftServer = raftServer return &t } @@ -67,18 +69,18 @@ func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.P size := b.Len() - r.serverStats.SendAppendReq(size) + t.raftServer.serverStats.SendAppendReq(size) u, _ := nameToRaftURL(peer.Name) debugf("Send LogEntries to %s ", u) - thisFollowerStats, ok := r.followersStats.Followers[peer.Name] + thisFollowerStats, ok := t.raftServer.followersStats.Followers[peer.Name] if !ok { //this is the first time this follower has been seen thisFollowerStats = &raftFollowerStats{} thisFollowerStats.Latency.Minimum = 1 << 63 - r.followersStats.Followers[peer.Name] = thisFollowerStats + t.raftServer.followersStats.Followers[peer.Name] = thisFollowerStats } start := time.Now() diff --git a/transporter_test.go b/transporter_test.go index 8c71325c6..3d9655dbd 100644 --- a/transporter_test.go +++ b/transporter_test.go @@ -21,7 +21,7 @@ func TestTransporterTimeout(t *testing.T) { conf := tls.Config{} - ts := newTransporter("http", conf) + ts := newTransporter("http", conf, nil) ts.Get("http://google.com") _, _, err := ts.Get("http://google.com:9999") diff --git a/util.go b/util.go index edecb97e4..318f728d1 100644 --- a/util.go +++ b/util.go @@ -15,7 +15,6 @@ import ( etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/store" - "github.com/coreos/etcd/web" "github.com/coreos/go-log/log" "github.com/coreos/go-raft" ) @@ -39,35 +38,11 @@ func durationToExpireTime(strDuration string) (time.Time, error) { } } -//-------------------------------------- -// Web Helper -//-------------------------------------- -var storeMsg chan string - -// Help to send msg from store to webHub -func webHelper() { - storeMsg = make(chan string) - // etcdStore.SetMessager(storeMsg) - for { - // transfer the new msg to webHub - web.Hub().Send(<-storeMsg) - } -} - -// startWebInterface starts web interface if webURL is not empty -func startWebInterface() { - if argInfo.WebURL != "" { - // start web - go webHelper() - go web.Start(r.Server, argInfo.WebURL) - } -} - //-------------------------------------- // HTTP Utilities //-------------------------------------- -func dispatch(c Command, w http.ResponseWriter, req *http.Request, toURL func(name string) (string, bool)) error { +func (r *raftServer) dispatch(c Command, w http.ResponseWriter, req *http.Request, toURL func(name string) (string, bool)) error { if r.State() == raft.Leader { if response, err := r.Do(c); err != nil { return err @@ -278,7 +253,7 @@ func send(c chan bool) { command.Key = "foo" command.Value = "bar" command.ExpireTime = time.Unix(0, 0) - r.Do(command) + //r.Do(command) } c <- true }