From eb78d96a204704026497cc6aef4eb372d1946ff3 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Sat, 12 Oct 2013 00:28:46 -0600 Subject: [PATCH] Intermediate commit. --- .gitignore | 2 +- build | 2 +- build.ps1 | 2 +- command/command.go | 19 - etcd.go | 18 +- name_url_map.go | 74 --- raft_handlers.go | 147 ------ raft_server.go | 317 ------------ server/join_command.go | 32 +- server/peer_server.go | 474 ++++++++++++++++++ server/registry.go | 167 ++++++ server/remove_command.go | 16 +- server/server.go | 3 +- server/timeout.go | 11 + server/transporter.go | 226 +++++++++ .../transporter_test.go | 2 +- server/util.go | 17 + server/v2/handlers.go | 4 - version.go => server/version.go | 6 +- snapshot.go | 36 -- {command => store}/create_command.go | 9 +- {command => store}/delete_command.go | 9 +- store/store.go | 10 - {command => store}/test_and_set_command.go | 9 +- {command => store}/update_command.go | 9 +- transporter.go | 233 --------- util.go | 22 - 27 files changed, 953 insertions(+), 923 deletions(-) delete mode 100644 command/command.go delete mode 100644 name_url_map.go delete mode 100644 raft_handlers.go delete mode 100644 raft_server.go create mode 100644 server/peer_server.go create mode 100644 server/registry.go create mode 100644 server/timeout.go create mode 100644 server/transporter.go rename transporter_test.go => server/transporter_test.go (98%) create mode 100644 server/util.go rename version.go => server/version.go (71%) delete mode 100644 snapshot.go rename {command => store}/create_command.go (86%) rename {command => store}/delete_command.go (83%) rename {command => store}/test_and_set_command.go (88%) rename {command => store}/update_command.go (85%) delete mode 100644 transporter.go diff --git a/.gitignore b/.gitignore index d00d899e2..4d8d3967c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,5 @@ src/ pkg/ /etcd -release_version.go +/server/release_version.go /machine* diff --git a/build b/build index b121ba30d..9c2f95f6a 100755 --- 
a/build +++ b/build @@ -21,5 +21,5 @@ for i in third_party/*; do cp -R "$i" src/ done -./scripts/release-version > release_version.go +./scripts/release-version > server/release_version.go go build "${ETCD_PACKAGE}" diff --git a/build.ps1 b/build.ps1 index c75f608d7..fb8a386ce 100644 --- a/build.ps1 +++ b/build.ps1 @@ -20,5 +20,5 @@ foreach($i in (ls third_party/*)){ cp -Recurse -force "$i" src/ } -./scripts/release-version.ps1 | Out-File -Encoding UTF8 release_version.go +./scripts/release-version.ps1 | Out-File -Encoding UTF8 server/release_version.go go build -v "${ETCD_PACKAGE}" diff --git a/command/command.go b/command/command.go deleted file mode 100644 index 50513d5f5..000000000 --- a/command/command.go +++ /dev/null @@ -1,19 +0,0 @@ -package command - -import ( - "github.com/coreos/go-raft" -) - -// A command represents an action to be taken on the replicated state machine. -type Command interface { - CommandName() string - Apply(server *raft.Server) (interface{}, error) -} - -// Registers commands to the Raft library. -func Register() { - raft.RegisterCommand(&DeleteCommand{}) - raft.RegisterCommand(&TestAndSetCommand{}) - raft.RegisterCommand(&CreateCommand{}) - raft.RegisterCommand(&UpdateCommand{}) -} diff --git a/etcd.go b/etcd.go index 79bda7b67..82bc09649 100644 --- a/etcd.go +++ b/etcd.go @@ -85,12 +85,6 @@ func init() { flag.StringVar(&cors, "cors", "", "whitelist origins for cross-origin resource sharing (e.g. 
'*' or 'http://localhost:8001,etc')") } -const ( - ElectionTimeout = 200 * time.Millisecond - HeartbeatTimeout = 50 * time.Millisecond - RetryInterval = 10 -) - //------------------------------------------------------------------------------ // // Typedefs @@ -185,16 +179,18 @@ func main() { // Create etcd key-value store etcdStore = store.New() - // Create etcd and raft server - r := newRaftServer(info.Name, info.RaftURL, info.RaftListenHost, &raftTLSConfig, &info.RaftTLS) - r.MaxClusterSize = maxClusterSize - snapConf = r.newSnapshotConf() + // Create a shared node registry. + registry := server.NewRegistry() + + // Create peer server. + ps := NewPeerServer(info.Name, dirPath, info.RaftURL, info.RaftListenHost, &raftTLSConfig, &info.RaftTLS, registry) + ps.MaxClusterSize = maxClusterSize s := server.New(info.Name, info.EtcdURL, info.EtcdListenHost, &etcdTLSConfig, &info.EtcdTLS, r) if err := e.AllowOrigins(cors); err != nil { panic(err) } - r.ListenAndServe() + ps.ListenAndServe(snapshot) s.ListenAndServe() } diff --git a/name_url_map.go b/name_url_map.go deleted file mode 100644 index 220963d5c..000000000 --- a/name_url_map.go +++ /dev/null @@ -1,74 +0,0 @@ -package main - -import ( - "net/url" - "path" -) - -// we map node name to url -type nodeInfo struct { - raftVersion string - raftURL string - etcdURL string -} - -var namesMap = make(map[string]*nodeInfo) - -// nameToEtcdURL maps node name to its etcd http address -func nameToEtcdURL(name string) (string, bool) { - - if info, ok := namesMap[name]; ok { - // first try to read from the map - return info.etcdURL, true - } - - // if fails, try to recover from etcd storage - return readURL(name, "etcd") - -} - -// nameToRaftURL maps node name to its raft http address -func nameToRaftURL(name string) (string, bool) { - if info, ok := namesMap[name]; ok { - // first try to read from the map - return info.raftURL, true - - } - - // if fails, try to recover from etcd storage - return readURL(name, "raft") -} - -// 
addNameToURL add a name that maps to raftURL and etcdURL -func addNameToURL(name string, version string, raftURL string, etcdURL string) { - namesMap[name] = &nodeInfo{ - raftVersion: raftVersion, - raftURL: raftURL, - etcdURL: etcdURL, - } -} - -func readURL(nodeName string, urlName string) (string, bool) { - if nodeName == "" { - return "", false - } - - // convert nodeName to url from etcd storage - key := path.Join("/_etcd/machines", nodeName) - - e, err := etcdStore.Get(key, false, false, 0, 0) - - if err != nil { - return "", false - } - - m, err := url.ParseQuery(e.Value) - - if err != nil { - panic("Failed to parse machines entry") - } - - url := m[urlName][0] - - return url, true -} diff --git a/raft_handlers.go b/raft_handlers.go deleted file mode 100644 index a45fe496c..000000000 --- a/raft_handlers.go +++ /dev/null @@ -1,147 +0,0 @@ -package main - -import ( - "encoding/json" - "net/http" - - "github.com/coreos/go-raft" -) - -//------------------------------------------------------------- -// Handlers to handle raft related request via raft server port -//------------------------------------------------------------- - -// Get all the current logs -func (r *raftServer) GetLogHttpHandler(w http.ResponseWriter, req *http.Request) { - debugf("[recv] GET %s/log", r.url) - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(r.LogEntries()) -} - -// Response to vote request -func (r *raftServer) VoteHttpHandler(w http.ResponseWriter, req *http.Request) { - rvreq := &raft.RequestVoteRequest{} - err := decodeJsonRequest(req, rvreq) - if err == nil { - debugf("[recv] POST %s/vote [%s]", r.url, rvreq.CandidateName) - if resp := r.RequestVote(rvreq); resp != nil { - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(resp) - return - } - } - warnf("[vote] ERROR: %v", err) - w.WriteHeader(http.StatusInternalServerError) -} - -// Response to append entries request -func (r *raftServer) 
AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) { - aereq := &raft.AppendEntriesRequest{} - err := decodeJsonRequest(req, aereq) - - if err == nil { - debugf("[recv] POST %s/log/append [%d]", r.url, len(aereq.Entries)) - - r.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength)) - - if resp := r.AppendEntries(aereq); resp != nil { - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(resp) - if !resp.Success { - debugf("[Append Entry] Step back") - } - return - } - } - warnf("[Append Entry] ERROR: %v", err) - w.WriteHeader(http.StatusInternalServerError) -} - -// Response to recover from snapshot request -func (r *raftServer) SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) { - aereq := &raft.SnapshotRequest{} - err := decodeJsonRequest(req, aereq) - if err == nil { - debugf("[recv] POST %s/snapshot/ ", r.url) - if resp := r.RequestSnapshot(aereq); resp != nil { - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(resp) - return - } - } - warnf("[Snapshot] ERROR: %v", err) - w.WriteHeader(http.StatusInternalServerError) -} - -// Response to recover from snapshot request -func (r *raftServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) { - aereq := &raft.SnapshotRecoveryRequest{} - err := decodeJsonRequest(req, aereq) - if err == nil { - debugf("[recv] POST %s/snapshotRecovery/ ", r.url) - if resp := r.SnapshotRecoveryRequest(aereq); resp != nil { - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(resp) - return - } - } - warnf("[Snapshot] ERROR: %v", err) - w.WriteHeader(http.StatusInternalServerError) -} - -// Get the port that listening for etcd connecting of the server -func (r *raftServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) { - debugf("[recv] Get %s/etcdURL/ ", r.url) - w.WriteHeader(http.StatusOK) - w.Write([]byte(argInfo.EtcdURL)) -} - -// Response to the join request -func (r *raftServer) JoinHttpHandler(w http.ResponseWriter, req 
*http.Request) error { - - command := &JoinCommand{} - - if err := decodeJsonRequest(req, command); err == nil { - debugf("Receive Join Request from %s", command.Name) - return r.dispatchRaftCommand(command, w, req) - } else { - w.WriteHeader(http.StatusInternalServerError) - return nil - } -} - -// Response to remove request -func (r *raftServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request) { - if req.Method != "DELETE" { - w.WriteHeader(http.StatusMethodNotAllowed) - return - } - - nodeName := req.URL.Path[len("/remove/"):] - command := &RemoveCommand{ - Name: nodeName, - } - - debugf("[recv] Remove Request [%s]", command.Name) - - r.dispatchRaftCommand(command, w, req) -} - -// Response to the name request -func (r *raftServer) NameHttpHandler(w http.ResponseWriter, req *http.Request) { - debugf("[recv] Get %s/name/ ", r.url) - w.WriteHeader(http.StatusOK) - w.Write([]byte(r.name)) -} - -// Response to the name request -func (r *raftServer) RaftVersionHttpHandler(w http.ResponseWriter, req *http.Request) { - debugf("[recv] Get %s/version/ ", r.url) - w.WriteHeader(http.StatusOK) - w.Write([]byte(r.version)) -} - -func (r *raftServer) dispatchRaftCommand(c Command, w http.ResponseWriter, req *http.Request) error { - return r.dispatch(c, w, req, nameToRaftURL) -} diff --git a/raft_server.go b/raft_server.go deleted file mode 100644 index 00cf4cbdb..000000000 --- a/raft_server.go +++ /dev/null @@ -1,317 +0,0 @@ -package main - -import ( - "bytes" - "crypto/tls" - "encoding/binary" - "encoding/json" - "fmt" - "io/ioutil" - "net/http" - "net/url" - "time" - - "github.com/coreos/etcd/command" - etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/go-raft" -) - -func init() { - command.Register() -} - -type raftServer struct { - *raft.Server - version string - joinIndex uint64 - name string - url string - listenHost string - tlsConf *TLSConfig - tlsInfo *TLSInfo - followersStats *raftFollowersStats - serverStats *raftServerStats - MaxClusterSize 
int -} - -func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *raftServer { - - raftWrapper := &raftServer{ - version: raftVersion, - name: name, - url: url, - listenHost: listenHost, - tlsConf: tlsConf, - tlsInfo: tlsInfo, - followersStats: &raftFollowersStats{ - Leader: name, - Followers: make(map[string]*raftFollowerStats), - }, - serverStats: &raftServerStats{ - StartTime: time.Now(), - sendRateQueue: &statsQueue{ - back: -1, - }, - recvRateQueue: &statsQueue{ - back: -1, - }, - }, - } - - // Create transporter for raft - raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, raftWrapper) - - // Create raft server - server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, raftWrapper, "") - check(err) - - raftWrapper.Server = server - - return raftWrapper -} - -// Start the raft server -func (r *raftServer) ListenAndServe() { - // LoadSnapshot - if snapshot { - err := r.LoadSnapshot() - - if err == nil { - debugf("%s finished load snapshot", r.name) - } else { - debug(err) - } - } - - r.SetElectionTimeout(ElectionTimeout) - r.SetHeartbeatTimeout(HeartbeatTimeout) - - r.Start() - - if r.IsLogEmpty() { - - // start as a leader in a new cluster - if len(cluster) == 0 { - r.startAsLeader() - - } else { - r.startAsFollower() - } - - } else { - - // rejoin the previous cluster - cluster = r.getMachines(nameToRaftURL) - for i := 0; i < len(cluster); i++ { - u, err := url.Parse(cluster[i]) - if err != nil { - debug("rejoin cannot parse url: ", err) - } - cluster[i] = u.Host - } - ok := r.joinCluster(cluster) - if !ok { - warn("the entire cluster is down! 
this machine will restart the cluster.") - } - - debugf("%s restart as a follower", r.name) - } - - // open the snapshot - if snapshot { - go r.monitorSnapshot() - } - - // start to response to raft requests - go r.startTransport(r.tlsConf.Scheme, r.tlsConf.Server) - -} - -func (r *raftServer) startAsLeader() { - // leader need to join self as a peer - for { - _, err := r.Do(newJoinCommand(r.version, r.Name(), r.url, e.url)) - if err == nil { - break - } - } - debugf("%s start as a leader", r.name) -} - -func (r *raftServer) startAsFollower() { - // start as a follower in a existing cluster - for i := 0; i < retryTimes; i++ { - ok := r.joinCluster(cluster) - if ok { - return - } - warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval) - time.Sleep(time.Second * RetryInterval) - } - - fatalf("Cannot join the cluster via given machines after %x retries", retryTimes) -} - -// Start to listen and response raft command -func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) { - infof("raft server [name %s, listen on %s, advertised url %s]", r.name, r.listenHost, r.url) - - raftMux := http.NewServeMux() - - server := &http.Server{ - Handler: raftMux, - TLSConfig: &tlsConf, - Addr: r.listenHost, - } - - // internal commands - raftMux.HandleFunc("/name", r.NameHttpHandler) - raftMux.HandleFunc("/version", r.RaftVersionHttpHandler) - raftMux.Handle("/join", errorHandler(r.JoinHttpHandler)) - raftMux.HandleFunc("/remove/", r.RemoveHttpHandler) - raftMux.HandleFunc("/vote", r.VoteHttpHandler) - raftMux.HandleFunc("/log", r.GetLogHttpHandler) - raftMux.HandleFunc("/log/append", r.AppendEntriesHttpHandler) - raftMux.HandleFunc("/snapshot", r.SnapshotHttpHandler) - raftMux.HandleFunc("/snapshotRecovery", r.SnapshotRecoveryHttpHandler) - raftMux.HandleFunc("/etcdURL", r.EtcdURLHttpHandler) - - if scheme == "http" { - fatal(server.ListenAndServe()) - } else { - fatal(server.ListenAndServeTLS(r.tlsInfo.CertFile, r.tlsInfo.KeyFile)) - 
} - -} - -// getVersion fetches the raft version of a peer. This works for now but we -// will need to do something more sophisticated later when we allow mixed -// version clusters. -func getVersion(t *transporter, versionURL url.URL) (string, error) { - resp, req, err := t.Get(versionURL.String()) - - if err != nil { - return "", err - } - - defer resp.Body.Close() - - t.CancelWhenTimeout(req) - - body, err := ioutil.ReadAll(resp.Body) - - return string(body), nil -} - -func (r *raftServer) joinCluster(cluster []string) bool { - for _, machine := range cluster { - - if len(machine) == 0 { - continue - } - - err := r.joinByMachine(r.Server, machine, r.tlsConf.Scheme) - if err == nil { - debugf("%s success join to the cluster via machine %s", r.name, machine) - return true - - } else { - if _, ok := err.(etcdErr.Error); ok { - fatal(err) - } - - debugf("cannot join to cluster via machine %s %s", machine, err) - } - } - return false -} - -// Send join requests to machine. -func (r *raftServer) joinByMachine(s *raft.Server, machine string, scheme string) error { - var b bytes.Buffer - - // t must be ok - t, _ := r.Transporter().(*transporter) - - // Our version must match the leaders version - versionURL := url.URL{Host: machine, Scheme: scheme, Path: "/version"} - version, err := getVersion(t, versionURL) - if err != nil { - return fmt.Errorf("Unable to join: %v", err) - } - - // TODO: versioning of the internal protocol. 
See: - // Documentation/internatl-protocol-versioning.md - if version != r.version { - return fmt.Errorf("Unable to join: internal version mismatch, entire cluster must be running identical versions of etcd") - } - - json.NewEncoder(&b).Encode(newJoinCommand(r.version, r.Name(), r.url, e.url)) - - joinURL := url.URL{Host: machine, Scheme: scheme, Path: "/join"} - - debugf("Send Join Request to %s", joinURL.String()) - - resp, req, err := t.Post(joinURL.String(), &b) - - for { - if err != nil { - return fmt.Errorf("Unable to join: %v", err) - } - if resp != nil { - defer resp.Body.Close() - - t.CancelWhenTimeout(req) - - if resp.StatusCode == http.StatusOK { - b, _ := ioutil.ReadAll(resp.Body) - r.joinIndex, _ = binary.Uvarint(b) - return nil - } - if resp.StatusCode == http.StatusTemporaryRedirect { - - address := resp.Header.Get("Location") - debugf("Send Join Request to %s", address) - - json.NewEncoder(&b).Encode(newJoinCommand(r.version, r.Name(), r.url, e.url)) - - resp, req, err = t.Post(address, &b) - - } else if resp.StatusCode == http.StatusBadRequest { - debug("Reach max number machines in the cluster") - decoder := json.NewDecoder(resp.Body) - err := &etcdErr.Error{} - decoder.Decode(err) - return *err - } else { - return fmt.Errorf("Unable to join") - } - } - - } - return fmt.Errorf("Unable to join: %v", err) -} - -func (r *raftServer) Stats() []byte { - r.serverStats.LeaderInfo.Uptime = time.Now().Sub(r.serverStats.LeaderInfo.startTime).String() - - queue := r.serverStats.sendRateQueue - - r.serverStats.SendingPkgRate, r.serverStats.SendingBandwidthRate = queue.Rate() - - queue = r.serverStats.recvRateQueue - - r.serverStats.RecvingPkgRate, r.serverStats.RecvingBandwidthRate = queue.Rate() - - b, _ := json.Marshal(r.serverStats) - - return b -} - -func (r *raftServer) PeerStats() []byte { - if r.State() == raft.Leader { - b, _ := json.Marshal(r.followersStats) - return b - } - return nil -} diff --git a/server/join_command.go b/server/join_command.go 
index 89a3a4e54..f338f3d39 100644 --- a/server/join_command.go +++ b/server/join_command.go @@ -2,8 +2,6 @@ package server import ( "encoding/binary" - "fmt" - "path" etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/log" @@ -42,38 +40,32 @@ func (c *JoinCommand) CommandName() string { // Join a server to the cluster func (c *JoinCommand) Apply(server *raft.Server) (interface{}, error) { s, _ := server.StateMachine().(*store.Store) - r, _ := server.Context().(*RaftServer) - - // check if the join command is from a previous machine, who lost all its previous log. - e, _ := s.Get(path.Join("/_etcd/machines", c.Name), false, false, server.CommitIndex(), server.Term()) + ps, _ := server.Context().(*PeerServer) b := make([]byte, 8) binary.PutUvarint(b, server.CommitIndex()) - if e != nil { + // Check if the join command is from a previous machine, who lost all its previous log. + if _, ok := ps.registry.URL(c.Name); ok { return b, nil } - // check machine number in the cluster - if s.MachineCount() == c.MaxClusterSize { + // Check machine number in the cluster + if ps.registry.Count() == c.MaxClusterSize { log.Debug("Reject join request from ", c.Name) return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMoreMachine, "", server.CommitIndex(), server.Term()) } - addNameToURL(c.Name, c.RaftVersion, c.RaftURL, c.EtcdURL) + // Add to shared machine registry. 
+ ps.registry.Register(c.Name, c.RaftVersion, c.RaftURL, c.EtcdURL, server.CommitIndex(), server.Term()) - // add peer in raft + // Add peer in raft err := server.AddPeer(c.Name, "") - // add machine in etcd storage - key := path.Join("_etcd/machines", c.Name) - value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion) - s.Create(key, value, false, false, store.Permanent, server.CommitIndex(), server.Term()) - - // add peer stats - if c.Name != r.Name() { - r.followersStats.Followers[c.Name] = &raftFollowerStats{} - r.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63 + // Add peer stats + if c.Name != ps.Name() { + ps.followersStats.Followers[c.Name] = &raftFollowerStats{} + ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63 } return b, err diff --git a/server/peer_server.go b/server/peer_server.go new file mode 100644 index 000000000..0c24c1521 --- /dev/null +++ b/server/peer_server.go @@ -0,0 +1,474 @@ +package server + +import ( + "bytes" + "crypto/tls" + "encoding/binary" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "time" + + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/log" + "github.com/coreos/etcd/store" + "github.com/coreos/go-raft" +) + +type PeerServer struct { + *raft.Server + joinIndex uint64 + name string + url string + listenHost string + tlsConf *TLSConfig + tlsInfo *TLSInfo + followersStats *raftFollowersStats + serverStats *raftServerStats + registry *Registry + store *store.Store + snapConf *snapshotConf + MaxClusterSize int +} + +// TODO: find a good policy to do snapshot +type snapshotConf struct { + // Etcd will check if snapshot is needed every checkingInterval + checkingInterval time.Duration + + // The number of writes when the last snapshot happened + lastWrites uint64 + + // If the incremental number of writes since the last snapshot + // exceeds the write Threshold, etcd will do a snapshot + writesThr uint64 +} + + +func NewPeerServer(name 
string, path string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store *store.Store) *PeerServer { + s := &PeerServer{ + name: name, + url: url, + listenHost: listenHost, + tlsConf: tlsConf, + tlsInfo: tlsInfo, + registry: registry, + store: store, + snapConf: &snapshotConf{time.Second * 3, 0, 20 * 1000}, + followersStats: &raftFollowersStats{ + Leader: name, + Followers: make(map[string]*raftFollowerStats), + }, + serverStats: &raftServerStats{ + StartTime: time.Now(), + sendRateQueue: &statsQueue{ + back: -1, + }, + recvRateQueue: &statsQueue{ + back: -1, + }, + }, + } + + // Create transporter for raft + raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, s) + + // Create raft server + server, err := raft.NewServer(name, path, raftTransporter, s.store, s, "") + if err != nil { + log.Fatal(err) + } + + s.Server = server + + return s +} + +// Start the raft server +func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) { + // LoadSnapshot + if snapshot { + err := s.LoadSnapshot() + + if err == nil { + log.Debugf("%s finished load snapshot", s.name) + } else { + log.Debug(err) + } + } + + s.SetElectionTimeout(ElectionTimeout) + s.SetHeartbeatTimeout(HeartbeatTimeout) + + s.Start() + + if s.IsLogEmpty() { + // start as a leader in a new cluster + if len(cluster) == 0 { + s.startAsLeader() + } else { + s.startAsFollower(cluster) + } + + } else { + // Rejoin the previous cluster + cluster = s.registry.PeerURLs() + for i := 0; i < len(cluster); i++ { + u, err := url.Parse(cluster[i]) + if err != nil { + log.Debug("rejoin cannot parse url: ", err) + } + cluster[i] = u.Host + } + ok := s.joinCluster(cluster) + if !ok { + log.Warn("the entire cluster is down! 
this machine will restart the cluster.") + } + + log.Debugf("%s restart as a follower", s.name) + } + + // open the snapshot + if snapshot { + go s.monitorSnapshot() + } + + // start to response to raft requests + go s.startTransport(s.tlsConf.Scheme, s.tlsConf.Server) + +} + +// Get all the current logs +func (s *PeerServer) GetLogHttpHandler(w http.ResponseWriter, req *http.Request) { + log.Debugf("[recv] GET %s/log", s.url) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(s.LogEntries()) +} + +// Response to vote request +func (s *PeerServer) VoteHttpHandler(w http.ResponseWriter, req *http.Request) { + rvreq := &raft.RequestVoteRequest{} + err := decodeJsonRequest(req, rvreq) + if err == nil { + log.Debugf("[recv] POST %s/vote [%s]", s.url, rvreq.CandidateName) + if resp := s.RequestVote(rvreq); resp != nil { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(resp) + return + } + } + log.Warnf("[vote] ERROR: %v", err) + w.WriteHeader(http.StatusInternalServerError) +} + +// Response to append entries request +func (s *PeerServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) { + aereq := &raft.AppendEntriesRequest{} + err := decodeJsonRequest(req, aereq) + + if err == nil { + log.Debugf("[recv] POST %s/log/append [%d]", s.url, len(aereq.Entries)) + + s.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength)) + + if resp := s.AppendEntries(aereq); resp != nil { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(resp) + if !resp.Success { + log.Debugf("[Append Entry] Step back") + } + return + } + } + log.Warnf("[Append Entry] ERROR: %v", err) + w.WriteHeader(http.StatusInternalServerError) +} + +// Response to recover from snapshot request +func (s *PeerServer) SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) { + aereq := &raft.SnapshotRequest{} + err := decodeJsonRequest(req, aereq) + if err == nil { + log.Debugf("[recv] POST %s/snapshot/ 
", s.url) + if resp := s.RequestSnapshot(aereq); resp != nil { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(resp) + return + } + } + log.Warnf("[Snapshot] ERROR: %v", err) + w.WriteHeader(http.StatusInternalServerError) +} + +// Response to recover from snapshot request +func (s *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) { + aereq := &raft.SnapshotRecoveryRequest{} + err := decodeJsonRequest(req, aereq) + if err == nil { + log.Debugf("[recv] POST %s/snapshotRecovery/ ", s.url) + if resp := s.SnapshotRecoveryRequest(aereq); resp != nil { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(resp) + return + } + } + log.Warnf("[Snapshot] ERROR: %v", err) + w.WriteHeader(http.StatusInternalServerError) +} + +// Get the port that listening for etcd connecting of the server +func (s *PeerServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) { + log.Debugf("[recv] Get %s/etcdURL/ ", s.url) + w.WriteHeader(http.StatusOK) + w.Write([]byte(argInfo.EtcdURL)) +} + +// Response to the join request +func (s *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) error { + command := &JoinCommand{} + + if err := decodeJsonRequest(req, command); err == nil { + log.Debugf("Receive Join Request from %s", command.Name) + return s.dispatchRaftCommand(command, w, req) + } else { + w.WriteHeader(http.StatusInternalServerError) + return nil + } +} + +// Response to remove request +func (s *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request) { + if req.Method != "DELETE" { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + nodeName := req.URL.Path[len("/remove/"):] + command := &RemoveCommand{ + Name: nodeName, + } + + log.Debugf("[recv] Remove Request [%s]", command.Name) + + s.dispatchRaftCommand(command, w, req) +} + +// Response to the name request +func (s *PeerServer) NameHttpHandler(w http.ResponseWriter, req *http.Request) { + log.Debugf("[recv] Get %s/name/ ", 
s.url) + w.WriteHeader(http.StatusOK) + w.Write([]byte(s.name)) +} + +// Response to the version request +func (s *PeerServer) RaftVersionHttpHandler(w http.ResponseWriter, req *http.Request) { + log.Debugf("[recv] Get %s/version/ ", s.url) + w.WriteHeader(http.StatusOK) + w.Write([]byte(PeerVersion)) +} + +func (s *PeerServer) dispatchRaftCommand(c raft.Command, w http.ResponseWriter, req *http.Request) error { + return s.dispatch(c, w, req, nameToRaftURL) +} + +func (s *PeerServer) startAsLeader() { + // leader need to join self as a peer + for { + _, err := s.Do(newJoinCommand(PeerVersion, s.Name(), s.url, e.url)) + if err == nil { + break + } + } + log.Debugf("%s start as a leader", s.name) +} + +func (s *PeerServer) startAsFollower(cluster []string) { + // start as a follower in an existing cluster + for i := 0; i < retryTimes; i++ { + ok := s.joinCluster(cluster) + if ok { + return + } + log.Warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval) + time.Sleep(time.Second * RetryInterval) + } + + fatalf("Cannot join the cluster via given machines after %x retries", retryTimes) +} + +// Start to listen and response raft command +func (s *PeerServer) startTransport(scheme string, tlsConf tls.Config) { + infof("raft server [name %s, listen on %s, advertised url %s]", s.name, s.listenHost, s.url) + + raftMux := http.NewServeMux() + + server := &http.Server{ + Handler: raftMux, + TLSConfig: &tlsConf, + Addr: s.listenHost, + } + + // internal commands + raftMux.HandleFunc("/name", s.NameHttpHandler) + raftMux.HandleFunc("/version", s.RaftVersionHttpHandler) + raftMux.Handle("/join", errorHandler(s.JoinHttpHandler)) + raftMux.HandleFunc("/remove/", s.RemoveHttpHandler) + raftMux.HandleFunc("/vote", s.VoteHttpHandler) + raftMux.HandleFunc("/log", s.GetLogHttpHandler) + raftMux.HandleFunc("/log/append", s.AppendEntriesHttpHandler) + raftMux.HandleFunc("/snapshot", s.SnapshotHttpHandler) + raftMux.HandleFunc("/snapshotRecovery", 
s.SnapshotRecoveryHttpHandler) + raftMux.HandleFunc("/etcdURL", s.EtcdURLHttpHandler) + + if scheme == "http" { + fatal(server.ListenAndServe()) + } else { + fatal(server.ListenAndServeTLS(s.tlsInfo.CertFile, s.tlsInfo.KeyFile)) + } + +} + +// getVersion fetches the raft version of a peer. This works for now but we +// will need to do something more sophisticated later when we allow mixed +// version clusters. +func getVersion(t *transporter, versionURL url.URL) (string, error) { + resp, req, err := t.Get(versionURL.String()) + + if err != nil { + return "", err + } + + defer resp.Body.Close() + + t.CancelWhenTimeout(req) + + body, err := ioutil.ReadAll(resp.Body) + + return string(body), nil +} + +func (s *PeerServer) joinCluster(cluster []string) bool { + for _, machine := range cluster { + if len(machine) == 0 { + continue + } + + err := s.joinByMachine(s.Server, machine, s.tlsConf.Scheme) + if err == nil { + log.Debugf("%s success join to the cluster via machine %s", s.name, machine) + return true + + } else { + if _, ok := err.(etcdErr.Error); ok { + fatal(err) + } + + log.Debugf("cannot join to cluster via machine %s %s", machine, err) + } + } + return false +} + +// Send join requests to machine. +func (s *PeerServer) joinByMachine(server *raft.Server, machine string, scheme string) error { + var b bytes.Buffer + + // t must be ok + t, _ := server.Transporter().(*transporter) + + // Our version must match the leaders version + versionURL := url.URL{Host: machine, Scheme: scheme, Path: "/version"} + version, err := getVersion(t, versionURL) + if err != nil { + return fmt.Errorf("Unable to join: %v", err) + } + + // TODO: versioning of the internal protocol. 
See: + // Documentation/internal-protocol-versioning.md + if version != PeerVersion { + return fmt.Errorf("Unable to join: internal version mismatch, entire cluster must be running identical versions of etcd") + } + + json.NewEncoder(&b).Encode(newJoinCommand(PeerVersion, server.Name(), server.url, e.url)) + + joinURL := url.URL{Host: machine, Scheme: scheme, Path: "/join"} + + log.Debugf("Send Join Request to %s", joinURL.String()) + + resp, req, err := t.Post(joinURL.String(), &b) + + for { + if err != nil { + return fmt.Errorf("Unable to join: %v", err) + } + if resp != nil { + defer resp.Body.Close() + + t.CancelWhenTimeout(req) + + if resp.StatusCode == http.StatusOK { + b, _ := ioutil.ReadAll(resp.Body) + server.joinIndex, _ = binary.Uvarint(b) + return nil + } + if resp.StatusCode == http.StatusTemporaryRedirect { + + address := resp.Header.Get("Location") + log.Debugf("Send Join Request to %s", address) + + json.NewEncoder(&b).Encode(newJoinCommand(PeerVersion, server.Name(), server.url, e.url)) + + resp, req, err = t.Post(address, &b) + + } else if resp.StatusCode == http.StatusBadRequest { + debug("Reach max number machines in the cluster") + decoder := json.NewDecoder(resp.Body) + err := &etcdErr.Error{} + decoder.Decode(err) + return *err + } else { + return fmt.Errorf("Unable to join") + } + } + + } + return fmt.Errorf("Unable to join: %v", err) +} + +func (s *PeerServer) Stats() []byte { + s.serverStats.LeaderInfo.Uptime = time.Now().Sub(s.serverStats.LeaderInfo.startTime).String() + + queue := s.serverStats.sendRateQueue + + s.serverStats.SendingPkgRate, s.serverStats.SendingBandwidthRate = queue.Rate() + + queue = s.serverStats.recvRateQueue + + s.serverStats.RecvingPkgRate, s.serverStats.RecvingBandwidthRate = queue.Rate() + + b, _ := json.Marshal(s.serverStats) + + return b +} + +func (s *PeerServer) PeerStats() []byte { + if s.State() == raft.Leader { + b, _ := json.Marshal(s.followersStats) + return b + } + return nil +} + +func (s *PeerServer) 
monitorSnapshot() { + for { + time.Sleep(s.snapConf.checkingInterval) + currentWrites := 0 + if uint64(currentWrites) > s.snapConf.writesThr { + r.TakeSnapshot() + s.snapConf.lastWrites = 0 + } + } +} diff --git a/server/registry.go b/server/registry.go new file mode 100644 index 000000000..cd7078d81 --- /dev/null +++ b/server/registry.go @@ -0,0 +1,167 @@ +package server + +import ( + "sync" + + "github.com/coreos/etcd/store" +) + +// The location of the machine URL data. +const RegistryKey = "/_etcd/machines" + +// The Registry stores URL information for nodes. +type Registry struct { + sync.Mutex + store *store.Store + nodes map[string]*node +} + +// The internal storage format of the registry. +type node struct { + peerVersion string + peerURL string + url string +} + +// Creates a new Registry. +func NewRegistry(s *store.Store) *Registry { + return &Registry{ + store: s, + nodes: make(map[string]*node), + } +} + +// Adds a node to the registry. +func (r *Registry) Register(name string, peerVersion string, peerURL string, url string, commitIndex uint64, term uint64) { + r.Lock() + defer r.Unlock() + + // Write data to store. + key := path.Join(RegistryKey, name) + value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", peerURL, url, peerVersion) + r.store.Create(key, value, false, false, store.Permanent, commitIndex, term) +} + +// Removes a node from the registry. +func (r *Registry) Unregister(name string, commitIndex uint64, term uint64) error { + r.Lock() + defer r.Unlock() + + // Remove the key from the store. + _, err := s.Delete(path.Join(RegistryKey, name), false, commitIndex, term) + return err +} + +// Returns the number of nodes in the cluster. +func (r *Registry) Count() int { + e, err := s.Get(RegistryKey, false, false, 0, 0) + if err != nil { + return 0 + } + return len(e.KVPairs) +} + +// Retrieves the URL for a given node by name. 
+func (r *Registry) URL(name string) (string, bool) { + r.Lock() + defer r.Unlock() + return r.url(name) +} + +func (r *Registry) url(name string) (string, bool) { + if r.nodes[name] == nil { + r.load(name) + } + + if node := r.nodes[name]; node != nil { + return node.url, true + } + + return "", false +} + +// Retrieves the URLs for all nodes. +func (r *Registry) URLs() []string { + r.Lock() + defer r.Unlock() + + // Retrieve a list of all nodes. + e, err := s.Get(RegistryKey, false, false, 0, 0) + if err != nil { + return make([]string, 0) + } + + // Lookup the URL for each one. + urls := make([]string, 0) + for _, pair := range e.KVPairs { + urls = append(urls, r.url(pair.Key)) + } + + return urls +} + + +// Retrieves the peer URL for a given node by name. +func (r *Registry) PeerURL(name string) (string, bool) { + r.Lock() + defer r.Unlock() + return r.peerURL(name) +} + +func (r *Registry) peerURL(name string) (string, bool) { + if r.nodes[name] == nil { + r.load(name) + } + + if node := r.nodes[name]; node != nil { + return node.peerURL, true + } + + return "", false +} + +// Retrieves the peer URLs for all nodes. +func (r *Registry) PeerURLs() []string { + r.Lock() + defer r.Unlock() + + // Retrieve a list of all nodes. + e, err := s.Get(RegistryKey, false, false, 0, 0) + if err != nil { + return make([]string, 0) + } + + // Lookup the URL for each one. + urls := make([]string, 0) + for _, pair := range e.KVPairs { + urls = append(urls, r.peerURL(pair.Key)) + } + + return urls +} + +// Loads the given node by name from the store into the cache. +func (r *Registry) load(name string) { + if name == "" { + return + } + + // Retrieve from store. + e, err := etcdStore.Get(path.Join(RegistryKey, name), false, false, 0, 0) + if err != nil { + return + } + + // Parse as a query string. + m, err := url.ParseQuery(e.Value) + if err != nil { + panic(fmt.Sprintf("Failed to parse machines entry: %s", name)) + } + + // Create node. 
+ r.nodes[name] := &node{ + url: m["etcd"][0], + peerURL: m["raft"][0], + peerVersion: m["raftVersion"][0], + } +} diff --git a/server/remove_command.go b/server/remove_command.go index a992de67c..43ff17d24 100644 --- a/server/remove_command.go +++ b/server/remove_command.go @@ -25,22 +25,20 @@ func (c *RemoveCommand) CommandName() string { // Remove a server from the cluster func (c *RemoveCommand) Apply(server *raft.Server) (interface{}, error) { s, _ := server.StateMachine().(*store.Store) - r, _ := server.Context().(*RaftServer) + ps, _ := server.Context().(*PeerServer) - // remove machine in etcd storage - key := path.Join("_etcd/machines", c.Name) + // Remove node from the shared registry. + err := ps.registry.Unregister(c.Name, server.CommitIndex(), server.Term()) - _, err := s.Delete(key, false, server.CommitIndex(), server.Term()) - // delete from stats - delete(r.followersStats.Followers, c.Name) + // Delete from stats + delete(ps.followersStats.Followers, c.Name) if err != nil { return []byte{0}, err } - // remove peer in raft + // Remove peer in raft err = server.RemovePeer(c.Name) - if err != nil { return []byte{0}, err } @@ -52,7 +50,7 @@ func (c *RemoveCommand) Apply(server *raft.Server) (interface{}, error) { // and the node has sent out a join request in this // start. 
It is sure that this node received a new remove // command and need to be removed - if server.CommitIndex() > r.joinIndex && r.joinIndex != 0 { + if server.CommitIndex() > ps.joinIndex && ps.joinIndex != 0 { debugf("server [%s] is removed", server.Name()) os.Exit(0) } else { diff --git a/server/server.go b/server/server.go index fad643aec..5835e1a12 100644 --- a/server/server.go +++ b/server/server.go @@ -4,7 +4,6 @@ import ( "net/http" "net/url" - "github.com/coreos/etcd/command" "github.com/coreos/go-raft" "github.com/gorilla/mux" ) @@ -13,7 +12,7 @@ import ( type Server interface { CommitIndex() uint64 Term() uint64 - Dispatch(command.Command, http.ResponseWriter, *http.Request) + Dispatch(raft.Command, http.ResponseWriter, *http.Request) } // This is the default implementation of the Server interface. diff --git a/server/timeout.go b/server/timeout.go new file mode 100644 index 000000000..35b49b630 --- /dev/null +++ b/server/timeout.go @@ -0,0 +1,11 @@ +package server + +const ( + // The amount of time to elapse without a heartbeat before becoming a candidate. + ElectionTimeout = 200 * time.Millisecond + + // The frequency by which heartbeats are sent to followers. 
+ HeartbeatTimeout = 50 * time.Millisecond + + RetryInterval = 10 +) diff --git a/server/transporter.go b/server/transporter.go new file mode 100644 index 000000000..83f0f07fc --- /dev/null +++ b/server/transporter.go @@ -0,0 +1,226 @@ +package server + +import ( + "bytes" + "crypto/tls" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "time" + + "github.com/coreos/go-raft" +) + +// Timeout for setup internal raft http connection +// This should not exceed 3 * RTT +var dailTimeout = 3 * HeartbeatTimeout + +// Timeout for setup internal raft http connection + receive response header +// This should not exceed 3 * RTT + RTT +var responseHeaderTimeout = 4 * HeartbeatTimeout + +// Timeout for receiving the response body from the server +// This should not exceed election timeout +var tranTimeout = ElectionTimeout + +// Transporter layer for communication between raft nodes +type transporter struct { + client *http.Client + transport *http.Transport + raftServer *raftServer +} + +// Create transporter using by raft server +// Create http or https transporter based on +// whether the user give the server cert and key +func newTransporter(scheme string, tlsConf tls.Config, raftServer *raftServer) *transporter { + t := transporter{} + + tr := &http.Transport{ + Dial: dialWithTimeout, + ResponseHeaderTimeout: responseHeaderTimeout, + } + + if scheme == "https" { + tr.TLSClientConfig = &tlsConf + tr.DisableCompression = true + } + + t.client = &http.Client{Transport: tr} + t.transport = tr + t.raftServer = raftServer + + return &t +} + +// Dial with timeout +func dialWithTimeout(network, addr string) (net.Conn, error) { + return net.DialTimeout(network, addr, dailTimeout) +} + +// Sends AppendEntries RPCs to a peer when the server is the leader. 
+func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse { + var aersp *raft.AppendEntriesResponse + var b bytes.Buffer + + json.NewEncoder(&b).Encode(req) + + size := b.Len() + + t.raftServer.serverStats.SendAppendReq(size) + + u, _ := nameToRaftURL(peer.Name) + + debugf("Send LogEntries to %s ", u) + + thisFollowerStats, ok := t.raftServer.followersStats.Followers[peer.Name] + + if !ok { //this is the first time this follower has been seen + thisFollowerStats = &raftFollowerStats{} + thisFollowerStats.Latency.Minimum = 1 << 63 + t.raftServer.followersStats.Followers[peer.Name] = thisFollowerStats + } + + start := time.Now() + + resp, httpRequest, err := t.Post(fmt.Sprintf("%s/log/append", u), &b) + + end := time.Now() + + if err != nil { + debugf("Cannot send AppendEntriesRequest to %s: %s", u, err) + if ok { + thisFollowerStats.Fail() + } + } else { + if ok { + thisFollowerStats.Succ(end.Sub(start)) + } + } + + if resp != nil { + defer resp.Body.Close() + + t.CancelWhenTimeout(httpRequest) + + aersp = &raft.AppendEntriesResponse{} + if err := json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { + return aersp + } + + } + + return aersp +} + +// Sends RequestVote RPCs to a peer when the server is the candidate. 
+func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse { + var rvrsp *raft.RequestVoteResponse + var b bytes.Buffer + json.NewEncoder(&b).Encode(req) + + u, _ := nameToRaftURL(peer.Name) + debugf("Send Vote to %s", u) + + resp, httpRequest, err := t.Post(fmt.Sprintf("%s/vote", u), &b) + + if err != nil { + debugf("Cannot send VoteRequest to %s : %s", u, err) + } + + if resp != nil { + defer resp.Body.Close() + + t.CancelWhenTimeout(httpRequest) + + rvrsp := &raft.RequestVoteResponse{} + if err := json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF { + return rvrsp + } + + } + return rvrsp +} + +// Sends SnapshotRequest RPCs to a peer when the server is the candidate. +func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse { + var aersp *raft.SnapshotResponse + var b bytes.Buffer + json.NewEncoder(&b).Encode(req) + + u, _ := nameToRaftURL(peer.Name) + debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u, + req.LastTerm, req.LastIndex) + + resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b) + + if err != nil { + debugf("Cannot send SendSnapshotRequest to %s : %s", u, err) + } + + if resp != nil { + defer resp.Body.Close() + + t.CancelWhenTimeout(httpRequest) + + aersp = &raft.SnapshotResponse{} + if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { + + return aersp + } + } + + return aersp +} + +// Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate. 
+func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse { + var aersp *raft.SnapshotRecoveryResponse + var b bytes.Buffer + json.NewEncoder(&b).Encode(req) + + u, _ := nameToRaftURL(peer.Name) + debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u, + req.LastTerm, req.LastIndex) + + resp, _, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b) + + if err != nil { + debugf("Cannot send SendSnapshotRecoveryRequest to %s : %s", u, err) + } + + if resp != nil { + defer resp.Body.Close() + aersp = &raft.SnapshotRecoveryResponse{} + + if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { + return aersp + } + } + + return aersp +} + +// Send server side POST request +func (t *transporter) Post(urlStr string, body io.Reader) (*http.Response, *http.Request, error) { + req, _ := http.NewRequest("POST", urlStr, body) + resp, err := t.client.Do(req) + return resp, req, err +} + +// Send server side GET request +func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error) { + req, _ := http.NewRequest("GET", urlStr, nil) + resp, err := t.client.Do(req) + return resp, req, err +} + +// Cancel the on fly HTTP transaction when timeout happens. 
+func (t *transporter) CancelWhenTimeout(req *http.Request) { + go func() { + time.Sleep(ElectionTimeout) + t.transport.CancelRequest(req) + }() +} diff --git a/transporter_test.go b/server/transporter_test.go similarity index 98% rename from transporter_test.go rename to server/transporter_test.go index 3d9655dbd..e83ea3400 100644 --- a/transporter_test.go +++ b/server/transporter_test.go @@ -1,4 +1,4 @@ -package main +package server import ( "crypto/tls" diff --git a/server/util.go b/server/util.go new file mode 100644 index 000000000..bae347cd3 --- /dev/null +++ b/server/util.go @@ -0,0 +1,17 @@ +package server + +import ( + "fmt" + "net/http" + "github.com/coreos/etcd/log" +) + +func decodeJsonRequest(req *http.Request, data interface{}) error { + decoder := json.NewDecoder(req.Body) + if err := decoder.Decode(&data); err != nil && err != io.EOF { + log.Warnf("Malformed json request: %v", err) + return fmt.Errorf("Malformed json request: %v", err) + } + return nil +} + diff --git a/server/v2/handlers.go b/server/v2/handlers.go index 0d294de7d..067e5fd38 100644 --- a/server/v2/handlers.go +++ b/server/v2/handlers.go @@ -12,10 +12,6 @@ import ( "github.com/coreos/go-raft" ) -//------------------------------------------------------------------- -// Handlers to handle etcd-store related request via etcd url -//------------------------------------------------------------------- - func NewEtcdMuxer() *http.ServeMux { // external commands router := mux.NewRouter() diff --git a/version.go b/server/version.go similarity index 71% rename from version.go rename to server/version.go index 20d31c916..c9e2a48c0 100644 --- a/version.go +++ b/server/version.go @@ -1,8 +1,8 @@ -package main +package server -const version = "v2" +const Version = "v2" // TODO: The release version (generated from the git tag) will be the raft // protocol version for now. When things settle down we will fix it like the // client API above. 
-const raftVersion = releaseVersion +const PeerVersion = releaseVersion diff --git a/snapshot.go b/snapshot.go deleted file mode 100644 index 1b2bc728c..000000000 --- a/snapshot.go +++ /dev/null @@ -1,36 +0,0 @@ -package main - -import ( - "time" -) - -// basic conf. -// TODO: find a good policy to do snapshot -type snapshotConf struct { - // Etcd will check if snapshot is need every checkingInterval - checkingInterval time.Duration - // The number of writes when the last snapshot happened - lastWrites uint64 - // If the incremental number of writes since the last snapshot - // exceeds the write Threshold, etcd will do a snapshot - writesThr uint64 -} - -var snapConf *snapshotConf - -func (r *raftServer) newSnapshotConf() *snapshotConf { - // check snapshot every 3 seconds and the threshold is 20K - return &snapshotConf{time.Second * 3, 0, 20 * 1000} -} - -func (r *raftServer) monitorSnapshot() { - for { - time.Sleep(snapConf.checkingInterval) - //currentWrites := etcdStore.TotalWrites() - snapConf.lastWrites - currentWrites := 0 - if uint64(currentWrites) > snapConf.writesThr { - r.TakeSnapshot() - snapConf.lastWrites = 0 - } - } -} diff --git a/command/create_command.go b/store/create_command.go similarity index 86% rename from command/create_command.go rename to store/create_command.go index 6dd2c5aba..0263347a6 100644 --- a/command/create_command.go +++ b/store/create_command.go @@ -1,12 +1,15 @@ -package command +package store import ( "github.com/coreos/etcd/log" - "github.com/coreos/etcd/store" "github.com/coreos/go-raft" "time" ) +func init() { + raft.RegisterCommand(&CreateCommand{}) +} + // Create command type CreateCommand struct { Key string `json:"key"` @@ -23,7 +26,7 @@ func (c *CreateCommand) CommandName() string { // Create node func (c *CreateCommand) Apply(server *raft.Server) (interface{}, error) { - s, _ := server.StateMachine().(*store.Store) + s, _ := server.StateMachine().(*Store) e, err := s.Create(c.Key, c.Value, c.IncrementalSuffix, 
c.Force, c.ExpireTime, server.CommitIndex(), server.Term()) diff --git a/command/delete_command.go b/store/delete_command.go similarity index 83% rename from command/delete_command.go rename to store/delete_command.go index a0d03c99d..3ac48bc35 100644 --- a/command/delete_command.go +++ b/store/delete_command.go @@ -1,11 +1,14 @@ -package command +package store import ( "github.com/coreos/etcd/log" - "github.com/coreos/etcd/store" "github.com/coreos/go-raft" ) +func init() { + raft.RegisterCommand(&DeleteCommand{}) +} + // The DeleteCommand removes a key from the Store. type DeleteCommand struct { Key string `json:"key"` @@ -19,7 +22,7 @@ func (c *DeleteCommand) CommandName() string { // Delete the key func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) { - s, _ := server.StateMachine().(*store.Store) + s, _ := server.StateMachine().(*Store) e, err := s.Delete(c.Key, c.Recursive, server.CommitIndex(), server.Term()) diff --git a/store/store.go b/store/store.go index ddf076280..348aea02e 100644 --- a/store/store.go +++ b/store/store.go @@ -401,16 +401,6 @@ func (s *Store) checkDir(parent *Node, dirName string) (*Node, *etcdErr.Error) { return n, nil } -// Returns the number of machines in the cluster. -func (s *Store) MachineCount() int { - e, err := s.Get("/_etcd/machines", false, false, 0, 0) - if err != nil { - return 0 - } - - return len(e.KVPairs) -} - // Save function saves the static state of the store system. // Save function will not be able to save the state of watchers. // Save function will not save the parent field of the node. 
Or there will diff --git a/command/test_and_set_command.go b/store/test_and_set_command.go similarity index 88% rename from command/test_and_set_command.go rename to store/test_and_set_command.go index 4d723e221..cf4167d1c 100644 --- a/command/test_and_set_command.go +++ b/store/test_and_set_command.go @@ -1,13 +1,16 @@ -package command +package store import ( "time" "github.com/coreos/etcd/log" - "github.com/coreos/etcd/store" "github.com/coreos/go-raft" ) +func init() { + raft.RegisterCommand(&TestAndSetCommand{}) +} + // The TestAndSetCommand performs a conditional update on a key in the store. type TestAndSetCommand struct { Key string `json:"key"` @@ -24,7 +27,7 @@ func (c *TestAndSetCommand) CommandName() string { // Set the key-value pair if the current value of the key equals to the given prevValue func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) { - s, _ := server.StateMachine().(*store.Store) + s, _ := server.StateMachine().(*Store) e, err := s.TestAndSet(c.Key, c.PrevValue, c.PrevIndex, c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) diff --git a/command/update_command.go b/store/update_command.go similarity index 85% rename from command/update_command.go rename to store/update_command.go index 245e3c1c7..694be9844 100644 --- a/command/update_command.go +++ b/store/update_command.go @@ -1,13 +1,16 @@ -package command +package store import ( "time" "github.com/coreos/etcd/log" - "github.com/coreos/etcd/store" "github.com/coreos/go-raft" ) +func init() { + raft.RegisterCommand(&UpdateCommand{}) +} + // The UpdateCommand updates the value of a key in the Store. 
type UpdateCommand struct { Key string `json:"key"` @@ -22,7 +25,7 @@ func (c *UpdateCommand) CommandName() string { // Update node func (c *UpdateCommand) Apply(server *raft.Server) (interface{}, error) { - s, _ := server.StateMachine().(*store.Store) + s, _ := server.StateMachine().(*Store) e, err := s.Update(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) diff --git a/transporter.go b/transporter.go deleted file mode 100644 index 19a09c73e..000000000 --- a/transporter.go +++ /dev/null @@ -1,233 +0,0 @@ -package main - -import ( - "bytes" - "crypto/tls" - "encoding/json" - "fmt" - "io" - "net" - "net/http" - "time" - - "github.com/coreos/go-raft" -) - -// Timeout for setup internal raft http connection -// This should not exceed 3 * RTT -var dailTimeout = 3 * HeartbeatTimeout - -// Timeout for setup internal raft http connection + receive response header -// This should not exceed 3 * RTT + RTT -var responseHeaderTimeout = 4 * HeartbeatTimeout - -// Timeout for receiving the response body from the server -// This should not exceed election timeout -var tranTimeout = ElectionTimeout - -// Transporter layer for communication between raft nodes -type transporter struct { - client *http.Client - transport *http.Transport - raftServer *raftServer -} - -// Create transporter using by raft server -// Create http or https transporter based on -// whether the user give the server cert and key -func newTransporter(scheme string, tlsConf tls.Config, raftServer *raftServer) *transporter { - t := transporter{} - - tr := &http.Transport{ - Dial: dialWithTimeout, - ResponseHeaderTimeout: responseHeaderTimeout, - } - - if scheme == "https" { - tr.TLSClientConfig = &tlsConf - tr.DisableCompression = true - } - - t.client = &http.Client{Transport: tr} - t.transport = tr - t.raftServer = raftServer - - return &t -} - -// Dial with timeout -func dialWithTimeout(network, addr string) (net.Conn, error) { - return net.DialTimeout(network, addr, dailTimeout) -} - -// 
Sends AppendEntries RPCs to a peer when the server is the leader. -func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse { - var aersp *raft.AppendEntriesResponse - var b bytes.Buffer - - json.NewEncoder(&b).Encode(req) - - size := b.Len() - - t.raftServer.serverStats.SendAppendReq(size) - - u, _ := nameToRaftURL(peer.Name) - - debugf("Send LogEntries to %s ", u) - - thisFollowerStats, ok := t.raftServer.followersStats.Followers[peer.Name] - - if !ok { //this is the first time this follower has been seen - thisFollowerStats = &raftFollowerStats{} - thisFollowerStats.Latency.Minimum = 1 << 63 - t.raftServer.followersStats.Followers[peer.Name] = thisFollowerStats - } - - start := time.Now() - - resp, httpRequest, err := t.Post(fmt.Sprintf("%s/log/append", u), &b) - - end := time.Now() - - if err != nil { - debugf("Cannot send AppendEntriesRequest to %s: %s", u, err) - if ok { - thisFollowerStats.Fail() - } - } else { - if ok { - thisFollowerStats.Succ(end.Sub(start)) - } - } - - if resp != nil { - defer resp.Body.Close() - - t.CancelWhenTimeout(httpRequest) - - aersp = &raft.AppendEntriesResponse{} - if err := json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { - return aersp - } - - } - - return aersp -} - -// Sends RequestVote RPCs to a peer when the server is the candidate. 
-func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse { - var rvrsp *raft.RequestVoteResponse - var b bytes.Buffer - json.NewEncoder(&b).Encode(req) - - u, _ := nameToRaftURL(peer.Name) - debugf("Send Vote to %s", u) - - resp, httpRequest, err := t.Post(fmt.Sprintf("%s/vote", u), &b) - - if err != nil { - debugf("Cannot send VoteRequest to %s : %s", u, err) - } - - if resp != nil { - defer resp.Body.Close() - - t.CancelWhenTimeout(httpRequest) - - rvrsp := &raft.RequestVoteResponse{} - if err := json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF { - return rvrsp - } - - } - return rvrsp -} - -// Sends SnapshotRequest RPCs to a peer when the server is the candidate. -func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse { - var aersp *raft.SnapshotResponse - var b bytes.Buffer - json.NewEncoder(&b).Encode(req) - - u, _ := nameToRaftURL(peer.Name) - debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u, - req.LastTerm, req.LastIndex) - - resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b) - - if err != nil { - debugf("Cannot send SendSnapshotRequest to %s : %s", u, err) - } - - if resp != nil { - defer resp.Body.Close() - - t.CancelWhenTimeout(httpRequest) - - aersp = &raft.SnapshotResponse{} - if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { - - return aersp - } - } - - return aersp -} - -// Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate. 
-func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse { - var aersp *raft.SnapshotRecoveryResponse - var b bytes.Buffer - json.NewEncoder(&b).Encode(req) - - u, _ := nameToRaftURL(peer.Name) - debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u, - req.LastTerm, req.LastIndex) - - resp, _, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b) - - if err != nil { - debugf("Cannot send SendSnapshotRecoveryRequest to %s : %s", u, err) - } - - if resp != nil { - defer resp.Body.Close() - aersp = &raft.SnapshotRecoveryResponse{} - - if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { - return aersp - } - } - - return aersp -} - -// Send server side POST request -func (t *transporter) Post(urlStr string, body io.Reader) (*http.Response, *http.Request, error) { - - req, _ := http.NewRequest("POST", urlStr, body) - - resp, err := t.client.Do(req) - - return resp, req, err - -} - -// Send server side GET request -func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error) { - - req, _ := http.NewRequest("GET", urlStr, nil) - - resp, err := t.client.Do(req) - - return resp, req, err -} - -// Cancel the on fly HTTP transaction when timeout happens -func (t *transporter) CancelWhenTimeout(req *http.Request) { - go func() { - time.Sleep(ElectionTimeout) - t.transport.CancelRequest(req) - }() -} diff --git a/util.go b/util.go index fade01ec6..e9b534085 100644 --- a/util.go +++ b/util.go @@ -88,33 +88,11 @@ func (r *raftServer) dispatch(c Command, w http.ResponseWriter, req *http.Reques func redirect(hostname string, w http.ResponseWriter, req *http.Request) { path := req.URL.Path - url := hostname + path - debugf("Redirect to %s", url) - http.Redirect(w, req, url, http.StatusTemporaryRedirect) } -func decodeJsonRequest(req *http.Request, data interface{}) error { - decoder := json.NewDecoder(req.Body) - if err 
:= decoder.Decode(&data); err != nil && err != io.EOF { - warnf("Malformed json request: %v", err) - return fmt.Errorf("Malformed json request: %v", err) - } - return nil -} - -func encodeJsonResponse(w http.ResponseWriter, status int, data interface{}) { - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(status) - - if data != nil { - encoder := json.NewEncoder(w) - encoder.Encode(data) - } -} - // sanitizeURL will cleanup a host string in the format hostname:port and // attach a schema. func sanitizeURL(host string, defaultScheme string) string {