package server

import (
    "bytes"
    "encoding/json"
    "fmt"
    "io/ioutil"
    "math/rand"
    "net/http"
    "net/url"
    "sort"
    "strconv"
    "time"

    "github.com/coreos/etcd/third_party/github.com/goraft/raft"
    "github.com/coreos/etcd/third_party/github.com/gorilla/mux"

    "github.com/coreos/etcd/discovery"
    etcdErr "github.com/coreos/etcd/error"
    "github.com/coreos/etcd/log"
    "github.com/coreos/etcd/metrics"
    "github.com/coreos/etcd/store"
)

const (
    // ThresholdMonitorTimeout is the time between log notifications that the
    // Raft heartbeat is too close to the election timeout.
    ThresholdMonitorTimeout = 5 * time.Second

    // ActiveMonitorTimeout is the time between checks on the active size of
    // the cluster. If the active size is different than the actual size then
    // etcd attempts to promote/demote to bring it to the correct number.
    ActiveMonitorTimeout = 1 * time.Second

    // PeerActivityMonitorTimeout is the time between checks for dead nodes in
    // the cluster.
    PeerActivityMonitorTimeout = 1 * time.Second
)

const (
    peerModeFlag  = 0
    proxyModeFlag = 1
)

type PeerServerConfig struct {
    Name          string
    Scheme        string
    URL           string
    SnapshotCount int
    RetryTimes    int
    RetryInterval float64
}

type PeerServer struct {
    Config         PeerServerConfig
    clusterConfig  *ClusterConfig
    raftServer     raft.Server
    server         *Server
    joinIndex      uint64
    followersStats *raftFollowersStats
    serverStats    *raftServerStats
    registry       *Registry
    store          store.Store
    snapConf       *snapshotConf
    mode           Mode

    closeChan            chan bool
    timeoutThresholdChan chan interface{}

    proxyPeerURL   string
    proxyClientURL string

    metrics *metrics.Bucket
}

// TODO: find a good snapshot policy.
type snapshotConf struct {
    // Etcd will check whether a snapshot is needed every checkingInterval.
    checkingInterval time.Duration

    // The index at which the last snapshot happened.
    lastIndex uint64

    // If the number of indexes committed since the last snapshot exceeds
    // snapshotThr, etcd will take a snapshot.
    snapshotThr uint64
}

func NewPeerServer(psConfig PeerServerConfig, registry *Registry, store store.Store, mb *metrics.Bucket, followersStats *raftFollowersStats, serverStats *raftServerStats) *PeerServer {
    s := &PeerServer{
        Config:         psConfig,
        clusterConfig:  NewClusterConfig(),
        registry:       registry,
        store:          store,
        followersStats: followersStats,
        serverStats:    serverStats,

        timeoutThresholdChan: make(chan interface{}, 1),

        metrics: mb,
    }

    return s
}

func (s *PeerServer) SetRaftServer(raftServer raft.Server) {
    s.snapConf = &snapshotConf{
        checkingInterval: time.Second * 3,
        // This is not accurate; we will update raft to provide an API for it.
        lastIndex:   raftServer.CommitIndex(),
        snapshotThr: uint64(s.Config.SnapshotCount),
    }

    raftServer.AddEventListener(raft.StateChangeEventType, s.raftEventLogger)
    raftServer.AddEventListener(raft.LeaderChangeEventType, s.raftEventLogger)
    raftServer.AddEventListener(raft.TermChangeEventType, s.raftEventLogger)
    raftServer.AddEventListener(raft.AddPeerEventType, s.raftEventLogger)
    raftServer.AddEventListener(raft.RemovePeerEventType, s.raftEventLogger)
    raftServer.AddEventListener(raft.HeartbeatIntervalEventType, s.raftEventLogger)
    raftServer.AddEventListener(raft.ElectionTimeoutThresholdEventType, s.raftEventLogger)

    raftServer.AddEventListener(raft.HeartbeatEventType, s.recordMetricEvent)

    s.raftServer = raftServer
}
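// SetRaftServer fans every Raft lifecycle event into raftEventLogger (defined
// below) and heartbeat timings into recordMetricEvent. As a minimal sketch,
// one more listener can be attached the same way, assuming goraft's
// func-typed event listeners as used above (the listener body here is
// hypothetical):
//
//    raftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
//        log.Infof("leader changed: '%v' -> '%v'", e.PrevValue(), e.Value())
//    })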
// Mode retrieves the current mode of the server.
func (s *PeerServer) Mode() Mode {
    return s.mode
}

// setMode updates the current mode of the server.
// Switching to peer mode will start the Raft server.
// Switching to proxy mode will stop the Raft server.
func (s *PeerServer) setMode(mode Mode) {
    s.mode = mode

    switch mode {
    case PeerMode:
        if !s.raftServer.Running() {
            s.raftServer.Start()
        }
    case ProxyMode:
        if s.raftServer.Running() {
            s.raftServer.Stop()
        }
    }
}

// ClusterConfig retrieves the current cluster configuration.
func (s *PeerServer) ClusterConfig() *ClusterConfig {
    return s.clusterConfig
}

// SetClusterConfig updates the current cluster configuration.
// Adjusting the active size will cause the PeerServer to demote peers or
// promote proxies to match the new size.
func (s *PeerServer) SetClusterConfig(c *ClusterConfig) {
    // Enforce the minimums.
    if c.ActiveSize < MinActiveSize {
        c.ActiveSize = MinActiveSize
    }
    if c.PromoteDelay < MinPromoteDelay {
        c.PromoteDelay = MinPromoteDelay
    }

    s.clusterConfig = c
}
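// A minimal usage sketch of the clamping above: values below the minimums are
// silently raised rather than rejected (cfg and s here are hypothetical):
//
//    cfg := NewClusterConfig()
//    cfg.ActiveSize = 0   // below MinActiveSize
//    cfg.PromoteDelay = 0 // below MinPromoteDelay
//    s.SetClusterConfig(cfg) // both fields are stored clamped to the minimums
//
// The /v2/admin/config routes registered in HTTPHandler below expose this
// configuration over HTTP.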
// handleDiscovery runs discovery and returns the results in the expected format.
func (s *PeerServer) handleDiscovery(discoverURL string) (peers []string, err error) {
    peers, err = discovery.Do(discoverURL, s.Config.Name, s.Config.URL)

    // Warn about errors coming from discovery. This isn't fatal since the
    // user might have provided a peer list elsewhere, or there may be log
    // data in the data dir.
    if err != nil {
        log.Warnf("Discovery encountered an error: %v", err)
        return
    }

    for i := range peers {
        // Strip the scheme off of the peer if it has one.
        // TODO(bp): clean this up!
        purl, err := url.Parse(peers[i])
        if err == nil {
            peers[i] = purl.Host
        }
    }

    log.Infof("Discovery fetched back peer list: %v", peers)

    return
}

// findCluster tries all possible ways to find a cluster to join, including
// -discovery, -peers, and log data in -data-dir.
//
// Peer discovery follows this order:
// 1. -discovery
// 2. -peers
// 3. previous peers in -data-dir
//
// The RaftServer should be started as late as possible. The current way it is
// started is not ideal and will be refactored in #627.
func (s *PeerServer) findCluster(discoverURL string, peers []string) {
    // Attempt cluster discovery.
    toDiscover := discoverURL != ""
    if toDiscover {
        discoverPeers, discoverErr := s.handleDiscovery(discoverURL)
        // This peer is registered at the discover URL.
        if discoverErr == nil {
            // Start as the leader of a brand new cluster.
            if len(discoverPeers) == 0 {
                log.Debug("This peer is starting a brand new cluster based on discover URL.")
                s.startAsLeader()
            } else {
                s.startAsFollower(discoverPeers)
            }
            return
        }
    }

    hasPeerList := len(peers) > 0

    // If there is log data in the data dir, append previous peers to the
    // configured peers to find the cluster.
    prevPeers := s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name)
    for i := 0; i < len(prevPeers); i++ {
        u, err := url.Parse(prevPeers[i])
        if err != nil {
            log.Debug("rejoin cannot parse url: ", err)
            continue
        }
        prevPeers[i] = u.Host
    }
    peers = append(peers, prevPeers...)

    // Remove this peer's own address from the peer list used to join.
    u, err := url.Parse(s.Config.URL)
    if err != nil {
        log.Fatalf("cannot parse peer address %v: %v", s.Config.URL, err)
    }
    filteredPeers := make([]string, 0)
    for _, v := range peers {
        if v != u.Host {
            filteredPeers = append(filteredPeers, v)
        }
    }
    peers = filteredPeers

    // If there is a backup peer list, use it to find the cluster.
    if len(peers) > 0 {
        ok := s.joinCluster(peers)
        if !ok {
            log.Warn("No living peers are found!")
        } else {
            s.raftServer.Start()
            log.Debugf("%s restarts as a follower based on peers %v", s.Config.Name, peers)
            return
        }
    }

    if !s.raftServer.IsLogEmpty() {
        log.Debugf("Entire cluster is down! %v will restart the cluster.", s.Config.Name)
        s.raftServer.Start()
        return
    }

    if toDiscover {
        log.Fatalf("Discovery failed, no available peers in backup list, and no log data")
    }

    if hasPeerList {
        log.Fatalf("No available peers in backup list, and no log data")
    }

    log.Infof("This peer is starting a brand new cluster now.")
    s.startAsLeader()
}

// Start starts the raft server.
func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) error {
    // Load the snapshot if requested.
    if snapshot {
        err := s.raftServer.LoadSnapshot()

        if err == nil {
            log.Debugf("%s finished loading snapshot", s.Config.Name)
        } else {
            log.Debug(err)
        }
    }

    s.raftServer.Init()

    s.findCluster(discoverURL, peers)

    s.closeChan = make(chan bool)

    go s.monitorSync()
    go s.monitorTimeoutThreshold(s.closeChan)
    go s.monitorActiveSize(s.closeChan)
    go s.monitorPeerActivity(s.closeChan)

    // Start the snapshot monitor.
    if snapshot {
        go s.monitorSnapshot()
    }

    return nil
}

func (s *PeerServer) Stop() {
    if s.closeChan != nil {
        close(s.closeChan)
        s.closeChan = nil
    }
    s.raftServer.Stop()
}
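// Typical lifecycle, as a sketch; the wiring variables (psConfig, registry,
// st, mb, followersStats, serverStats, raftServer, discoverURL, peers) are
// hypothetical stand-ins for what the caller constructs:
//
//    ps := NewPeerServer(psConfig, registry, st, &mb, followersStats, serverStats)
//    ps.SetRaftServer(raftServer)
//    if err := ps.Start(true, discoverURL, peers); err != nil {
//        log.Fatal(err)
//    }
//    defer ps.Stop()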
func (s *PeerServer) HTTPHandler() http.Handler {
    router := mux.NewRouter()

    // Internal commands.
    router.HandleFunc("/name", s.NameHttpHandler)
    router.HandleFunc("/version", s.VersionHttpHandler)
    router.HandleFunc("/version/{version:[0-9]+}/check", s.VersionCheckHttpHandler)
    router.HandleFunc("/upgrade", s.UpgradeHttpHandler)
    router.HandleFunc("/join", s.JoinHttpHandler)
    router.HandleFunc("/promote", s.PromoteHttpHandler).Methods("POST")
    router.HandleFunc("/remove/{name:.+}", s.RemoveHttpHandler)
    router.HandleFunc("/vote", s.VoteHttpHandler)
    router.HandleFunc("/log", s.GetLogHttpHandler)
    router.HandleFunc("/log/append", s.AppendEntriesHttpHandler)
    router.HandleFunc("/snapshot", s.SnapshotHttpHandler)
    router.HandleFunc("/snapshotRecovery", s.SnapshotRecoveryHttpHandler)
    router.HandleFunc("/etcdURL", s.EtcdURLHttpHandler)

    router.HandleFunc("/v2/admin/config", s.getClusterConfigHttpHandler).Methods("GET")
    router.HandleFunc("/v2/admin/config", s.setClusterConfigHttpHandler).Methods("PUT")
    router.HandleFunc("/v2/admin/machines", s.getMachinesHttpHandler).Methods("GET")
    router.HandleFunc("/v2/admin/machines/{name}", s.getMachineHttpHandler).Methods("GET")
    router.HandleFunc("/v2/admin/machines/{name}", s.addMachineHttpHandler).Methods("PUT")
    router.HandleFunc("/v2/admin/machines/{name}", s.removeMachineHttpHandler).Methods("DELETE")

    return router
}

// RaftServer retrieves the underlying Raft server.
func (s *PeerServer) RaftServer() raft.Server {
    return s.raftServer
}

// SetServer associates the client server with the peer server.
func (s *PeerServer) SetServer(server *Server) {
    s.server = server
}

func (s *PeerServer) startAsLeader() {
    s.raftServer.Start()
    // The leader needs to join itself as a peer.
    for {
        c := &JoinCommandV1{
            MinVersion: store.MinVersion(),
            MaxVersion: store.MaxVersion(),
            Name:       s.raftServer.Name(),
            RaftURL:    s.Config.URL,
            EtcdURL:    s.server.URL(),
        }
        _, err := s.raftServer.Do(c)
        if err == nil {
            break
        }
    }
    log.Debugf("%s started as a leader", s.Config.Name)
}

func (s *PeerServer) startAsFollower(cluster []string) {
    // Start as a follower in an existing cluster.
    for i := 0; i < s.Config.RetryTimes; i++ {
        ok := s.joinCluster(cluster)
        if ok {
            s.raftServer.Start()
            return
        }
        log.Warnf("%v is unable to join the cluster using any of the peers %v (attempt %d). Retrying in %.1f seconds", s.Config.Name, cluster, i+1, s.Config.RetryInterval)
        time.Sleep(time.Second * time.Duration(s.Config.RetryInterval))
    }

    log.Fatalf("Cannot join the cluster via given peers after %d retries", s.Config.RetryTimes)
}

// getVersion fetches the peer version of a cluster.
func getVersion(t *transporter, versionURL url.URL) (int, error) {
    resp, _, err := t.Get(versionURL.String())
    if err != nil {
        return 0, err
    }
    defer resp.Body.Close()

    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return 0, err
    }

    // Parse the version number.
    version, err := strconv.Atoi(string(body))
    if err != nil {
        return 0, err
    }
    return version, nil
}

// Upgradable checks whether all peers in a cluster support an upgrade to the next store version.
func (s *PeerServer) Upgradable() error {
    nextVersion := s.store.Version() + 1
    for _, peerURL := range s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name) {
        u, err := url.Parse(peerURL)
        if err != nil {
            return fmt.Errorf("PeerServer: Cannot parse URL: '%s' (%s)", peerURL, err)
        }

        t, _ := s.raftServer.Transporter().(*transporter)
        checkURL := (&url.URL{Host: u.Host, Scheme: s.Config.Scheme, Path: fmt.Sprintf("/version/%d/check", nextVersion)}).String()
        resp, _, err := t.Get(checkURL)
        if err != nil {
            return fmt.Errorf("PeerServer: Cannot check version compatibility: %s", u.Host)
        }
        resp.Body.Close()
        if resp.StatusCode != 200 {
            return fmt.Errorf("PeerServer: Version %d is not compatible with peer: %s", nextVersion, u.Host)
        }
    }

    return nil
}

// joinCluster attempts to join the cluster via each peer in turn, returning
// true on the first success.
func (s *PeerServer) joinCluster(cluster []string) bool {
    for _, peer := range cluster {
        if len(peer) == 0 {
            continue
        }

        err := s.joinByPeer(s.raftServer, peer, s.Config.Scheme)
        if err == nil {
            log.Debugf("%s joined the cluster via peer %s", s.Config.Name, peer)
            return true
        }

        if _, ok := err.(etcdErr.Error); ok {
            log.Fatal(err)
        }

        log.Warnf("Attempt to join via %s failed: %s", peer, err)
    }

    return false
}
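// The join handshake implemented in joinByPeer below speaks HTTP to an
// existing peer. Summarizing the status codes it handles:
//
//    200 OK                  joined; the body carries the commit index and the
//                            mode (peer or proxy) this node should assume
//    307 Temporary Redirect  the contacted peer is not the leader; retry
//                            against the Location header using the v1 command
//    400 Bad Request         the cluster is full; the body carries an
//                            etcdErr.Error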
// joinByPeer sends join requests to the given peer.
func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) error {
    // t must be ok.
    t, _ := server.Transporter().(*transporter)

    // Our version must match the leader's version.
    versionURL := url.URL{Host: peer, Scheme: scheme, Path: "/version"}
    version, err := getVersion(t, versionURL)
    if err != nil {
        return fmt.Errorf("Error during join version check: %v", err)
    }
    if version < store.MinVersion() || version > store.MaxVersion() {
        return fmt.Errorf("Unable to join: cluster version is %d; version compatibility is %d - %d", version, store.MinVersion(), store.MaxVersion())
    }

    var b bytes.Buffer
    c := &JoinCommandV2{
        MinVersion: store.MinVersion(),
        MaxVersion: store.MaxVersion(),
        Name:       server.Name(),
        PeerURL:    s.Config.URL,
        ClientURL:  s.server.URL(),
    }
    json.NewEncoder(&b).Encode(c)

    joinURL := url.URL{Host: peer, Scheme: scheme, Path: "/v2/admin/machines/" + server.Name()}

    log.Infof("Send Join Request to %s", joinURL.String())

    req, _ := http.NewRequest("PUT", joinURL.String(), &b)
    resp, err := t.client.Do(req)

    for {
        if err != nil {
            return fmt.Errorf("Unable to join: %v", err)
        }
        if resp != nil {
            defer resp.Body.Close()

            log.Infof("Join response status: %d", resp.StatusCode)

            if resp.StatusCode == http.StatusOK {
                var msg joinMessageV2
                if err := json.NewDecoder(resp.Body).Decode(&msg); err != nil {
                    log.Debugf("Error reading join response: %v", err)
                    return err
                }
                s.joinIndex = msg.CommitIndex
                s.setMode(msg.Mode)

                if msg.Mode == ProxyMode {
                    s.proxyClientURL = resp.Header.Get("X-Leader-Client-URL")
                    s.proxyPeerURL = resp.Header.Get("X-Leader-Peer-URL")
                }

                return nil
            }
            if resp.StatusCode == http.StatusTemporaryRedirect {
                address := resp.Header.Get("Location")
                log.Debugf("Send Join Request to %s", address)

                c := &JoinCommandV1{
                    MinVersion: store.MinVersion(),
                    MaxVersion: store.MaxVersion(),
                    Name:       server.Name(),
                    RaftURL:    s.Config.URL,
                    EtcdURL:    s.server.URL(),
                }
                json.NewEncoder(&b).Encode(c)
                resp, _, err = t.Post(address, &b)

            } else if resp.StatusCode == http.StatusBadRequest {
                log.Debug("Reached max number of peers in the cluster")
                decoder := json.NewDecoder(resp.Body)
                err := &etcdErr.Error{}
                decoder.Decode(err)
                return *err
            } else {
                return fmt.Errorf("Unable to join")
            }
        }
    }
}

func (s *PeerServer) Stats() []byte {
    s.serverStats.LeaderInfo.Uptime = time.Now().Sub(s.serverStats.LeaderInfo.startTime).String()

    // TODO: register a state listener with raft to update this field
    // rather than comparing the state each time Stats() is called.
    if s.RaftServer().State() == raft.Leader {
        s.serverStats.LeaderInfo.Name = s.RaftServer().Name()
    }

    queue := s.serverStats.sendRateQueue
    s.serverStats.SendingPkgRate, s.serverStats.SendingBandwidthRate = queue.Rate()

    queue = s.serverStats.recvRateQueue
    s.serverStats.RecvingPkgRate, s.serverStats.RecvingBandwidthRate = queue.Rate()

    b, _ := json.Marshal(s.serverStats)

    return b
}

func (s *PeerServer) PeerStats() []byte {
    if s.raftServer.State() == raft.Leader {
        b, _ := json.Marshal(s.followersStats)
        return b
    }
    return nil
}
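// recordMetricEvent (below) derives timer names from the Raft event type, so
// a heartbeat event lands in a timer named along the lines of
// "raft.event.heartbeat" (the exact string depends on goraft's event type
// constants). A hypothetical read of that timer from the same bucket:
//
//    t := (*s.metrics).Timer("raft.event.heartbeat")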
// raftEventLogger converts events from the Raft server into log messages.
func (s *PeerServer) raftEventLogger(event raft.Event) {
    value := event.Value()
    prevValue := event.PrevValue()
    if value == nil {
        value = ""
    }
    if prevValue == nil {
        prevValue = ""
    }

    switch event.Type() {
    case raft.StateChangeEventType:
        log.Infof("%s: state changed from '%v' to '%v'.", s.Config.Name, prevValue, value)
    case raft.TermChangeEventType:
        log.Infof("%s: term #%v started.", s.Config.Name, value)
    case raft.LeaderChangeEventType:
        log.Infof("%s: leader changed from '%v' to '%v'.", s.Config.Name, prevValue, value)
    case raft.AddPeerEventType:
        log.Infof("%s: peer added: '%v'", s.Config.Name, value)
    case raft.RemovePeerEventType:
        log.Infof("%s: peer removed: '%v'", s.Config.Name, value)
    case raft.HeartbeatIntervalEventType:
        name := ""
        if peer, ok := value.(*raft.Peer); ok {
            name = peer.Name
        }
        log.Infof("%s: warning: heartbeat timed out: '%v'", s.Config.Name, name)
    case raft.ElectionTimeoutThresholdEventType:
        select {
        case s.timeoutThresholdChan <- value:
        default:
        }
    }
}

func (s *PeerServer) recordMetricEvent(event raft.Event) {
    name := fmt.Sprintf("raft.event.%s", event.Type())
    value := event.Value().(time.Duration)
    (*s.metrics).Timer(name).Update(value)
}

// logSnapshot logs information about the snapshot that was taken.
func (s *PeerServer) logSnapshot(err error, currentIndex, count uint64) {
    info := fmt.Sprintf("%s: snapshot of %d events at index %d", s.Config.Name, count, currentIndex)

    if err != nil {
        log.Infof("%s attempted and failed: %v", info, err)
    } else {
        log.Infof("%s completed", info)
    }
}

func (s *PeerServer) monitorSnapshot() {
    for {
        time.Sleep(s.snapConf.checkingInterval)
        currentIndex := s.RaftServer().CommitIndex()
        count := currentIndex - s.snapConf.lastIndex
        if count > s.snapConf.snapshotThr {
            err := s.raftServer.TakeSnapshot()
            s.logSnapshot(err, currentIndex, count)
            s.snapConf.lastIndex = currentIndex
        }
    }
}
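// A worked example of the threshold check in monitorSnapshot, with
// hypothetical numbers: if SnapshotCount is 10000 (so snapshotThr is 10000),
// lastIndex is 50000, and the commit index has reached 61000, then count is
// 11000 > 10000, so TakeSnapshot runs and lastIndex advances to 61000.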
func (s *PeerServer) monitorSync() {
    ticker := time.Tick(time.Millisecond * 500)
    for {
        select {
        case now := <-ticker:
            if s.raftServer.State() == raft.Leader {
                s.raftServer.Do(s.store.CommandFactory().CreateSyncCommand(now))
            }
        }
    }
}

// monitorTimeoutThreshold groups timeout threshold events together and prints
// them as a single log line.
func (s *PeerServer) monitorTimeoutThreshold(closeChan chan bool) {
    for {
        select {
        case value := <-s.timeoutThresholdChan:
            log.Infof("%s: warning: heartbeat near election timeout: %v", s.Config.Name, value)
        case <-closeChan:
            return
        }

        time.Sleep(ThresholdMonitorTimeout)
    }
}

// monitorActiveSize has the leader periodically check the status of cluster
// nodes and swap them out for proxies as needed.
func (s *PeerServer) monitorActiveSize(closeChan chan bool) {
    for {
        select {
        case <-time.After(ActiveMonitorTimeout):
        case <-closeChan:
            return
        }

        // Ignore while this peer is not the leader.
        if s.raftServer.State() != raft.Leader {
            continue
        }

        // Retrieve the target active size and the actual active size.
        activeSize := s.ClusterConfig().ActiveSize
        peerCount := s.registry.PeerCount()
        proxies := s.registry.Proxies()
        peers := s.registry.Peers()
        if index := sort.SearchStrings(peers, s.Config.Name); index < len(peers) && peers[index] == s.Config.Name {
            peers = append(peers[:index], peers[index+1:]...)
        }

        // If we have more active nodes than we should then demote one.
        if peerCount > activeSize {
            peer := peers[rand.Intn(len(peers))]
            log.Infof("%s: demoting: %v", s.Config.Name, peer)
            if _, err := s.raftServer.Do(&DemoteCommand{Name: peer}); err != nil {
                log.Infof("%s: warning: demotion error: %v", s.Config.Name, err)
            }
            continue
        }

        // If we don't have enough active nodes then try to promote a proxy.
        if peerCount < activeSize && len(proxies) > 0 {
        loop:
            for _, i := range rand.Perm(len(proxies)) {
                proxy := proxies[i]
                proxyPeerURL, _ := s.registry.ProxyPeerURL(proxy)
                log.Infof("%s: attempting to promote: %v (%s)", s.Config.Name, proxy, proxyPeerURL)

                // Notify the proxy to promote itself.
                client := &http.Client{
                    Transport: &http.Transport{
                        DisableKeepAlives:     false,
                        ResponseHeaderTimeout: ActiveMonitorTimeout,
                    },
                }
                resp, err := client.Post(fmt.Sprintf("%s/promote", proxyPeerURL), "application/json", nil)
                if err != nil {
                    log.Infof("%s: warning: promotion error: %v", s.Config.Name, err)
                    continue
                } else if resp.StatusCode != http.StatusOK {
                    log.Infof("%s: warning: promotion failure: %v", s.Config.Name, resp.StatusCode)
                    continue
                }
                break loop
            }
        }
    }
}

// monitorPeerActivity has the leader periodically check for dead nodes and
// demote them.
func (s *PeerServer) monitorPeerActivity(closeChan chan bool) {
    for {
        select {
        case <-time.After(PeerActivityMonitorTimeout):
        case <-closeChan:
            return
        }

        // Ignore while this peer is not the leader.
        if s.raftServer.State() != raft.Leader {
            continue
        }

        // Check the last activity for all peers.
        now := time.Now()
        promoteDelay := time.Duration(s.ClusterConfig().PromoteDelay) * time.Second
        peers := s.raftServer.Peers()
        for _, peer := range peers {
            // If the last response from the peer is older than the promote
            // delay then automatically demote the peer.
            if !peer.LastActivity().IsZero() && now.Sub(peer.LastActivity()) > promoteDelay {
                log.Infof("%s: demoting node: %v; last activity %v ago", s.Config.Name, peer.Name, now.Sub(peer.LastActivity()))
                if _, err := s.raftServer.Do(&DemoteCommand{Name: peer.Name}); err != nil {
                    log.Infof("%s: warning: autodemotion error: %v", s.Config.Name, err)
                }
                continue
            }
        }
    }
}

// Mode represents whether the server is an active peer or is simply acting
// as a proxy.
type Mode string

const (
    // PeerMode is when the server is an active node in Raft.
    PeerMode = Mode("peer")

    // ProxyMode is when the server is an inactive, request-forwarding node.
    ProxyMode = Mode("proxy")
)
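// Sketch of how a caller might branch on the mode; the forwarding targets are
// the proxyClientURL/proxyPeerURL captured during a proxy-mode join, and the
// forwarding itself lives outside this file:
//
//    if s.Mode() == ProxyMode {
//        // forward client traffic toward s.proxyClientURL instead of serving it
//    }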