package server

import (
	"bytes"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"math/rand"
	"net/http"
	"net/url"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/coreos/etcd/third_party/github.com/goraft/raft"
	"github.com/coreos/etcd/third_party/github.com/gorilla/mux"

	"github.com/coreos/etcd/discovery"
	etcdErr "github.com/coreos/etcd/error"
	"github.com/coreos/etcd/log"
	"github.com/coreos/etcd/metrics"
	"github.com/coreos/etcd/pkg/btrfs"
	"github.com/coreos/etcd/store"
)

const (
	// ThresholdMonitorTimeout is the time between log notifications that the
	// Raft heartbeat is too close to the election timeout.
	ThresholdMonitorTimeout = 5 * time.Second

	// ActiveMonitorTimeout is the time between checks on the active size of
	// the cluster. If the actual number of peers exceeds the configured
	// active size, etcd attempts to demote peers to bring the cluster back
	// to the correct size.
	ActiveMonitorTimeout = 1 * time.Second

	// PeerActivityMonitorTimeout is the time between checks for dead nodes in
	// the cluster.
	PeerActivityMonitorTimeout = 1 * time.Second
)

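// PeerServerConfig holds the static configuration of a peer server: the node
// name, the peer URL and scheme it announces, the snapshot threshold, and the
// retry policy used when joining an existing cluster.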
type PeerServerConfig struct {
	Name          string
	Scheme        string
	URL           string
	SnapshotCount int
	RetryTimes    int
	RetryInterval float64
}

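// PeerServer is the peer-facing half of an etcd node. It wraps the underlying
// raft server, tracks cluster membership through the registry, serves the
// internal Raft and admin HTTP endpoints, and runs the background monitoring
// goroutines started by Start.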
type PeerServer struct {
	Config         PeerServerConfig
	clusterConfig  *ClusterConfig
	raftServer     raft.Server
	server         *Server
	joinIndex      uint64
	followersStats *raftFollowersStats
	serverStats    *raftServerStats
	registry       *Registry
	store          store.Store
	snapConf       *snapshotConf

	closeChan            chan bool
	routineGroup         sync.WaitGroup
	timeoutThresholdChan chan interface{}

	metrics *metrics.Bucket
	sync.Mutex
}

// TODO: find a good policy to do snapshot
type snapshotConf struct {
	// Etcd checks whether a snapshot is needed every checkingInterval.
	checkingInterval time.Duration

	// The index at which the last snapshot happened.
	lastIndex uint64

	// If the number of entries committed since the last snapshot exceeds the
	// snapshot threshold, etcd takes a snapshot.
	snapshotThr uint64
}

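// NewPeerServer creates a PeerServer from its configuration and collaborators.
// The returned server is not functional until SetRaftServer and SetServer have
// been called and Start has been invoked.
//
// A rough sketch of how a caller might wire things together; the variable
// names are illustrative, and the setup of the registry, store, stats, and
// raft server is omitted:
//
//	ps := NewPeerServer(psConfig, registry, st, metricsBucket, followersStats, serverStats)
//	ps.SetRaftServer(raftServer)
//	ps.SetServer(clientServer)
//	if err := ps.Start(false, discoverURL, peers); err != nil {
//		log.Fatal(err)
//	}
//	defer ps.Stop()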
func NewPeerServer(psConfig PeerServerConfig, registry *Registry, store store.Store, mb *metrics.Bucket, followersStats *raftFollowersStats, serverStats *raftServerStats) *PeerServer {
	s := &PeerServer{
		Config:         psConfig,
		clusterConfig:  NewClusterConfig(),
		registry:       registry,
		store:          store,
		followersStats: followersStats,
		serverStats:    serverStats,

		timeoutThresholdChan: make(chan interface{}, 1),

		metrics: mb,
	}

	return s
}

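// SetRaftServer attaches the underlying raft server, derives the snapshot
// policy from its current commit index, and registers the event listeners
// used for logging and metrics.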
func (s *PeerServer) SetRaftServer(raftServer raft.Server) {
	s.snapConf = &snapshotConf{
		checkingInterval: time.Second * 3,
		// This is not accurate; raft will be updated to provide an API for it.
		lastIndex:   raftServer.CommitIndex(),
		snapshotThr: uint64(s.Config.SnapshotCount),
	}

	raftServer.AddEventListener(raft.StateChangeEventType, s.raftEventLogger)
	raftServer.AddEventListener(raft.LeaderChangeEventType, s.raftEventLogger)
	raftServer.AddEventListener(raft.TermChangeEventType, s.raftEventLogger)
	raftServer.AddEventListener(raft.AddPeerEventType, s.raftEventLogger)
	raftServer.AddEventListener(raft.RemovePeerEventType, s.raftEventLogger)
	raftServer.AddEventListener(raft.HeartbeatIntervalEventType, s.raftEventLogger)
	raftServer.AddEventListener(raft.ElectionTimeoutThresholdEventType, s.raftEventLogger)

	raftServer.AddEventListener(raft.HeartbeatEventType, s.recordMetricEvent)

	s.raftServer = raftServer
}

// ClusterConfig retrieves the current cluster configuration.
func (s *PeerServer) ClusterConfig() *ClusterConfig {
	return s.clusterConfig
}

// SetClusterConfig updates the current cluster configuration.
// Adjusting the active size will cause the PeerServer to demote peers or
// promote standbys to match the new size.
func (s *PeerServer) SetClusterConfig(c *ClusterConfig) {
	// Set minimums.
	if c.ActiveSize < MinActiveSize {
		c.ActiveSize = MinActiveSize
	}
	if c.PromoteDelay < MinPromoteDelay {
		c.PromoteDelay = MinPromoteDelay
	}

	s.clusterConfig = c
}

// findCluster tries every available way to find a cluster to join, using the
// log data in -data-dir, -discovery, and -peers.
//
// Peer discovery follows this order:
// 1. previous peers in -data-dir
// 2. -discovery
// 3. -peers
//
// TODO(yichengq): RaftServer should be started as late as possible.
// The current way of starting it is not ideal and should be refactored later.
func (s *PeerServer) findCluster(discoverURL string, peers []string) {
	name := s.Config.Name
	isNewNode := s.raftServer.IsLogEmpty()

	// Try our best to find possible peers and connect to them.
	if !isNewNode {
		// A node is not allowed to join the cluster with an existing peer address.
		// This prevents an old node from rejoining under a different name by mistake.
		if !s.checkPeerAddressNonconflict() {
			log.Fatalf("%v is not allowed to join the cluster with existing URL %v", s.Config.Name, s.Config.URL)
		}

		// Take old nodes into account.
		allPeers := s.getKnownPeers()
		// Discover registered peers.
		// TODO(yichengq): It may mess up discoverURL if this is
		// set wrong by mistake. This may need to refactor discovery
		// module. Fix it later.
		if discoverURL != "" {
			discoverPeers, _ := s.handleDiscovery(discoverURL)
			allPeers = append(allPeers, discoverPeers...)
		}
		allPeers = append(allPeers, peers...)
		allPeers = s.removeSelfFromList(allPeers)

		// If there is a possible peer list, use it to find the cluster.
		if len(allPeers) > 0 {
			// TODO(yichengq): joinCluster may fail if there's no leader for
			// the current cluster. It should wait if the cluster is under
			// leader election, or the node with a changed IP cannot join
			// the cluster then.
			if err := s.startAsFollower(allPeers, 1); err == nil {
				log.Debugf("%s joins to the previous cluster %v", name, allPeers)
				return
			}

			log.Warnf("%s cannot connect to previous cluster %v", name, allPeers)
		}

		// TODO(yichengq): Think about the action that should be taken
		// if it cannot connect to any of the previously known nodes.
		s.raftServer.Start()
		log.Debugf("%s is restarting the cluster %v", name, allPeers)
		return
	}

	// Attempt cluster discovery.
	if discoverURL != "" {
		discoverPeers, discoverErr := s.handleDiscovery(discoverURL)
		// The node is registered with the discovery service.
		if discoverErr == nil {
			// Start as the leader of a new cluster.
			if len(discoverPeers) == 0 {
				log.Debugf("%s is starting a new cluster via discover service", name)
				s.startAsLeader()
			} else {
				log.Debugf("%s is joining a cluster %v via discover service", name, discoverPeers)
				if err := s.startAsFollower(discoverPeers, s.Config.RetryTimes); err != nil {
					log.Fatal(err)
				}
			}
			return
		}
		log.Warnf("%s failed to connect discovery service[%v]: %v", name, discoverURL, discoverErr)

		if len(peers) == 0 {
			log.Fatalf("%s, the new leader, must register itself to discovery service as required", name)
		}
	}

	if len(peers) > 0 {
		if err := s.startAsFollower(peers, s.Config.RetryTimes); err != nil {
			log.Fatalf("%s cannot connect to existing cluster %v", name, peers)
		}
		return
	}

	log.Infof("%s is starting a new cluster.", s.Config.Name)
	s.startAsLeader()
}

// Start loads a snapshot if requested, initializes the raft server, finds or
// creates a cluster, and launches the background monitoring goroutines.
func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) error {
	s.Lock()
	defer s.Unlock()

	// Load the latest snapshot, if any.
	if snapshot {
		err := s.raftServer.LoadSnapshot()

		if err == nil {
			log.Debugf("%s finished load snapshot", s.Config.Name)
		} else {
			log.Debug(err)
		}
	}

	s.raftServer.Init()

	// Set NOCOW for the data directory on btrfs.
	if btrfs.IsBtrfs(s.raftServer.LogPath()) {
		if err := btrfs.SetNOCOWFile(s.raftServer.LogPath()); err != nil {
			log.Warnf("Failed setting NOCOW: %v", err)
		}
	}

	s.findCluster(discoverURL, peers)

	s.closeChan = make(chan bool)

	s.startRoutine(s.monitorSync)
	s.startRoutine(s.monitorTimeoutThreshold)
	s.startRoutine(s.monitorActiveSize)
	s.startRoutine(s.monitorPeerActivity)

	// Start the snapshot monitor.
	if snapshot {
		s.startRoutine(s.monitorSnapshot)
	}

	return nil
}

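// Stop signals all background goroutines to exit by closing closeChan, stops
// the raft server, and waits for the goroutines to finish.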
func (s *PeerServer) Stop() {
	s.Lock()
	defer s.Unlock()

	if s.closeChan != nil {
		close(s.closeChan)
	}
	s.raftServer.Stop()
	s.routineGroup.Wait()
	s.closeChan = nil
}

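// HTTPHandler returns the router for the internal Raft and admin endpoints.
// For illustration only, a caller would typically serve it on the peer
// address; the listener setup below is an assumption, not code from this
// package:
//
//	go http.ListenAndServe(peerListenAddr, ps.HTTPHandler())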
func (s *PeerServer) HTTPHandler() http.Handler {
	router := mux.NewRouter()

	// internal commands
	router.HandleFunc("/name", s.NameHttpHandler)
	router.HandleFunc("/version", s.VersionHttpHandler)
	router.HandleFunc("/version/{version:[0-9]+}/check", s.VersionCheckHttpHandler)
	router.HandleFunc("/upgrade", s.UpgradeHttpHandler)
	router.HandleFunc("/join", s.JoinHttpHandler)
	router.HandleFunc("/remove/{name:.+}", s.RemoveHttpHandler)
	router.HandleFunc("/vote", s.VoteHttpHandler)
	router.HandleFunc("/log", s.GetLogHttpHandler)
	router.HandleFunc("/log/append", s.AppendEntriesHttpHandler)
	router.HandleFunc("/snapshot", s.SnapshotHttpHandler)
	router.HandleFunc("/snapshotRecovery", s.SnapshotRecoveryHttpHandler)
	router.HandleFunc("/etcdURL", s.EtcdURLHttpHandler)

	router.HandleFunc("/v2/admin/config", s.getClusterConfigHttpHandler).Methods("GET")
	router.HandleFunc("/v2/admin/config", s.setClusterConfigHttpHandler).Methods("PUT")
	router.HandleFunc("/v2/admin/machines", s.getMachinesHttpHandler).Methods("GET")
	router.HandleFunc("/v2/admin/machines/{name}", s.getMachineHttpHandler).Methods("GET")

	return router
}

// RaftServer retrieves the underlying raft server.
func (s *PeerServer) RaftServer() raft.Server {
	return s.raftServer
}

// SetServer associates the client server with the peer server.
func (s *PeerServer) SetServer(server *Server) {
	s.server = server
}

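// startAsLeader starts the raft server and keeps submitting a JoinCommand for
// this node until it is committed.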
func (s *PeerServer) startAsLeader() {
	s.raftServer.Start()
	// The leader needs to join itself as a peer.
	for {
		c := &JoinCommand{
			MinVersion: store.MinVersion(),
			MaxVersion: store.MaxVersion(),
			Name:       s.raftServer.Name(),
			RaftURL:    s.Config.URL,
			EtcdURL:    s.server.URL(),
		}
		if _, err := s.raftServer.Do(c); err == nil {
			break
		}
	}
	log.Debugf("%s start as a leader", s.Config.Name)
}

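// startAsFollower joins an existing cluster via the given peers, retrying up
// to retryTimes before giving up, and then starts the raft server.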
func (s *PeerServer) startAsFollower(cluster []string, retryTimes int) error {
	// Start as a follower in an existing cluster.
	for i := 0; ; i++ {
		ok := s.joinCluster(cluster)
		if ok {
			break
		}
		if i == retryTimes-1 {
			return fmt.Errorf("Cannot join the cluster via given peers after %d retries", retryTimes)
		}
		log.Warnf("%v is unable to join the cluster using any of the peers %v (attempt %d). Retrying in %.1f seconds", s.Config.Name, cluster, i+1, s.Config.RetryInterval)
		time.Sleep(time.Second * time.Duration(s.Config.RetryInterval))
	}

	s.raftServer.Start()
	return nil
}

// getVersion fetches the peer version of a cluster.
func getVersion(t *transporter, versionURL url.URL) (int, error) {
	resp, _, err := t.Get(versionURL.String())
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return 0, err
	}

	// Parse the version number.
	version, _ := strconv.Atoi(string(body))
	return version, nil
}

// Upgradable checks whether all peers in a cluster support an upgrade to the next store version.
func (s *PeerServer) Upgradable() error {
	nextVersion := s.store.Version() + 1
	for _, peerURL := range s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name) {
		u, err := url.Parse(peerURL)
		if err != nil {
			return fmt.Errorf("PeerServer: Cannot parse URL: '%s' (%s)", peerURL, err)
		}

		t, _ := s.raftServer.Transporter().(*transporter)
		checkURL := (&url.URL{Host: u.Host, Scheme: s.Config.Scheme, Path: fmt.Sprintf("/version/%d/check", nextVersion)}).String()
		resp, _, err := t.Get(checkURL)
		if err != nil {
			return fmt.Errorf("PeerServer: Cannot check version compatibility: %s", u.Host)
		}
		if resp.StatusCode != http.StatusOK {
			return fmt.Errorf("PeerServer: Version %d is not compatible with peer: %s", nextVersion, u.Host)
		}
	}

	return nil
}

// checkPeerAddressNonconflict checks whether this node's peer address is
// already in use by a peer registered under a different name.
func (s *PeerServer) checkPeerAddressNonconflict() bool {
	// The (name, peer address) pair is already registered for this node.
	if peerURL, ok := s.registry.PeerURL(s.Config.Name); ok {
		if peerURL == s.Config.URL {
			return true
		}
	}

	// Check all existing peer addresses.
	peerURLs := s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name)
	for _, peerURL := range peerURLs {
		if peerURL == s.Config.URL {
			return false
		}
	}
	return true
}

// handleDiscovery runs discovery and returns the resulting peers in the
// expected host:port format.
func (s *PeerServer) handleDiscovery(discoverURL string) (peers []string, err error) {
	peers, err = discovery.Do(discoverURL, s.Config.Name, s.Config.URL, s.closeChan, s.startRoutine)

	// Warn about errors coming from discovery. This isn't fatal,
	// since the user might have provided a peer list elsewhere,
	// or there is some log in the data dir.
	if err != nil {
		log.Warnf("Discovery encountered an error: %v", err)
		return
	}

	for i := range peers {
		// Strip the scheme off of the peer if it has one.
		// TODO(bp): clean this up!
		purl, err := url.Parse(peers[i])
		if err == nil {
			peers[i] = purl.Host
		}
	}

	log.Infof("Discovery fetched back peer list: %v", peers)

	return
}

// getKnownPeers gets the previous peers from the log.
func (s *PeerServer) getKnownPeers() []string {
	peers := s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name)
	log.Infof("Peer URLs in log: %s / %s (%s)", s.raftServer.Leader(), s.Config.Name, strings.Join(peers, ","))

	for i := range peers {
		u, err := url.Parse(peers[i])
		if err != nil {
			// Skip unparsable URLs instead of dereferencing a nil *url.URL.
			log.Debugf("getKnownPeers cannot parse url %v", peers[i])
			continue
		}
		peers[i] = u.Host
	}
	return peers
}

// removeSelfFromList removes this peer server's own URL from the peer list.
func (s *PeerServer) removeSelfFromList(peers []string) []string {
	// Remove its own peer address from the peer list to join.
	u, err := url.Parse(s.Config.URL)
	if err != nil {
		log.Fatalf("removeSelfFromList cannot parse peer address %v", s.Config.URL)
	}
	newPeers := make([]string, 0)
	for _, v := range peers {
		if v != u.Host {
			newPeers = append(newPeers, v)
		}
	}
	return newPeers
}

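// joinCluster attempts to join the cluster through each peer in turn and
// reports whether any attempt succeeded.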
func (s *PeerServer) joinCluster(cluster []string) bool {
	for _, peer := range cluster {
		if len(peer) == 0 {
			continue
		}

		err := s.joinByPeer(s.raftServer, peer, s.Config.Scheme)
		if err == nil {
			log.Debugf("%s joined the cluster via peer %s", s.Config.Name, peer)
			return true
		}

		if _, ok := err.(etcdErr.Error); ok {
			log.Fatal(err)
		}

		log.Warnf("Attempt to join via %s failed: %s", peer, err)
	}

	return false
}

// joinByPeer sends a join request to the given peer.
func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) error {
	// The transporter assertion must succeed here.
	t, _ := server.Transporter().(*transporter)

	// Our version must match the leader's version.
	versionURL := url.URL{Host: peer, Scheme: scheme, Path: "/version"}
	version, err := getVersion(t, versionURL)
	if err != nil {
		return fmt.Errorf("Error during join version check: %v", err)
	}
	if version < store.MinVersion() || version > store.MaxVersion() {
		return fmt.Errorf("Unable to join: cluster version is %d; version compatibility is %d - %d", version, store.MinVersion(), store.MaxVersion())
	}

	var b bytes.Buffer
	c := &JoinCommand{
		MinVersion: store.MinVersion(),
		MaxVersion: store.MaxVersion(),
		Name:       server.Name(),
		RaftURL:    s.Config.URL,
		EtcdURL:    s.server.URL(),
	}
	json.NewEncoder(&b).Encode(c)

	joinURL := url.URL{Host: peer, Scheme: scheme, Path: "/join"}
	log.Infof("Send Join Request to %s", joinURL.String())

	req, _ := http.NewRequest("PUT", joinURL.String(), &b)
	resp, err := t.client.Do(req)

	for {
		if err != nil {
			return fmt.Errorf("Unable to join: %v", err)
		}
		if resp != nil {
			defer resp.Body.Close()

			log.Infof("Join response status: %d", resp.StatusCode)
			if resp.StatusCode == http.StatusOK {
				b, _ := ioutil.ReadAll(resp.Body)
				s.joinIndex, _ = binary.Uvarint(b)
				return nil
			}
			if resp.StatusCode == http.StatusTemporaryRedirect {
				address := resp.Header.Get("Location")
				log.Debugf("Send Join Request to %s", address)
				c := &JoinCommand{
					MinVersion: store.MinVersion(),
					MaxVersion: store.MaxVersion(),
					Name:       server.Name(),
					RaftURL:    s.Config.URL,
					EtcdURL:    s.server.URL(),
				}
				json.NewEncoder(&b).Encode(c)
				resp, _, err = t.Put(address, &b)

			} else if resp.StatusCode == http.StatusBadRequest {
				log.Debug("Reached the maximum number of peers in the cluster")
				decoder := json.NewDecoder(resp.Body)
				err := &etcdErr.Error{}
				decoder.Decode(err)
				return *err
			} else {
				return fmt.Errorf("Unable to join")
			}
		}
	}
}

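// Stats returns the current server statistics encoded as JSON.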
func (s *PeerServer) Stats() []byte {
	s.serverStats.LeaderInfo.Uptime = time.Now().Sub(s.serverStats.LeaderInfo.startTime).String()

	// TODO: register a state listener with raft to change this field
	// rather than comparing the state each time Stats() is called.
	if s.RaftServer().State() == raft.Leader {
		s.serverStats.LeaderInfo.Name = s.RaftServer().Name()
	}

	queue := s.serverStats.sendRateQueue
	s.serverStats.SendingPkgRate, s.serverStats.SendingBandwidthRate = queue.Rate()

	queue = s.serverStats.recvRateQueue
	s.serverStats.RecvingPkgRate, s.serverStats.RecvingBandwidthRate = queue.Rate()

	b, _ := json.Marshal(s.serverStats)

	return b
}

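// PeerStats returns follower statistics encoded as JSON when this node is the
// leader, and nil otherwise.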
func (s *PeerServer) PeerStats() []byte {
	if s.raftServer.State() == raft.Leader {
		b, _ := json.Marshal(s.followersStats)
		return b
	}
	return nil
}

// raftEventLogger converts events from the Raft server into log messages.
func (s *PeerServer) raftEventLogger(event raft.Event) {
	value := event.Value()
	prevValue := event.PrevValue()
	if value == nil {
		value = "<nil>"
	}
	if prevValue == nil {
		prevValue = "<nil>"
	}

	switch event.Type() {
	case raft.StateChangeEventType:
		log.Infof("%s: state changed from '%v' to '%v'.", s.Config.Name, prevValue, value)
	case raft.TermChangeEventType:
		log.Infof("%s: term #%v started.", s.Config.Name, value)
	case raft.LeaderChangeEventType:
		log.Infof("%s: leader changed from '%v' to '%v'.", s.Config.Name, prevValue, value)
	case raft.AddPeerEventType:
		log.Infof("%s: peer added: '%v'", s.Config.Name, value)
	case raft.RemovePeerEventType:
		log.Infof("%s: peer removed: '%v'", s.Config.Name, value)
	case raft.HeartbeatIntervalEventType:
		var name = "<unknown>"
		if peer, ok := value.(*raft.Peer); ok {
			name = peer.Name
		}
		log.Infof("%s: warning: heartbeat timed out: '%v'", s.Config.Name, name)
	case raft.ElectionTimeoutThresholdEventType:
		select {
		case s.timeoutThresholdChan <- value:
		default:
		}
	}
}

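// recordMetricEvent records the duration carried by a raft event in the
// corresponding timer of the metrics bucket.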
func (s *PeerServer) recordMetricEvent(event raft.Event) {
	name := fmt.Sprintf("raft.event.%s", event.Type())
	value := event.Value().(time.Duration)
	(*s.metrics).Timer(name).Update(value)
}

// logSnapshot logs the result of a snapshot attempt.
func (s *PeerServer) logSnapshot(err error, currentIndex, count uint64) {
	info := fmt.Sprintf("%s: snapshot of %d events at index %d", s.Config.Name, count, currentIndex)

	if err != nil {
		log.Infof("%s attempted and failed: %v", info, err)
	} else {
		log.Infof("%s completed", info)
	}
}

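// startRoutine runs f in a goroutine tracked by routineGroup so that Stop can
// wait for it to finish.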
func (s *PeerServer) startRoutine(f func()) {
	s.routineGroup.Add(1)
	go func() {
		defer s.routineGroup.Done()
		f()
	}()
}

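// monitorSnapshot periodically takes a snapshot once enough entries have been
// committed since the last one.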
func (s *PeerServer) monitorSnapshot() {
	for {
		timer := time.NewTimer(s.snapConf.checkingInterval)
		select {
		case <-s.closeChan:
			// Stop the timer here rather than with defer, which would pile up
			// one deferred call per iteration for the life of the goroutine.
			timer.Stop()
			return
		case <-timer.C:
		}

		currentIndex := s.RaftServer().CommitIndex()
		count := currentIndex - s.snapConf.lastIndex
		if uint64(count) > s.snapConf.snapshotThr {
			err := s.raftServer.TakeSnapshot()
			s.logSnapshot(err, currentIndex, count)
			s.snapConf.lastIndex = currentIndex
		}
	}
}

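// monitorSync has the leader periodically commit a sync command so the store
// can process time-dependent operations.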
func (s *PeerServer) monitorSync() {
	ticker := time.NewTicker(time.Millisecond * 500)
	defer ticker.Stop()
	for {
		select {
		case <-s.closeChan:
			return
		case now := <-ticker.C:
			if s.raftServer.State() == raft.Leader {
				s.raftServer.Do(s.store.CommandFactory().CreateSyncCommand(now))
			}
		}
	}
}

// monitorTimeoutThreshold groups timeout threshold events together and prints
// them as a single log line.
func (s *PeerServer) monitorTimeoutThreshold() {
	for {
		select {
		case <-s.closeChan:
			return
		case value := <-s.timeoutThresholdChan:
			log.Infof("%s: warning: heartbeat near election timeout: %v", s.Config.Name, value)
		}

		timer := time.NewTimer(ThresholdMonitorTimeout)
		select {
		case <-s.closeChan:
			timer.Stop()
			return
		case <-timer.C:
		}
	}
}

// monitorActiveSize has the leader periodically check the status of cluster
// nodes and swap them out for standbys as needed.
func (s *PeerServer) monitorActiveSize() {
	for {
		timer := time.NewTimer(ActiveMonitorTimeout)
		select {
		case <-s.closeChan:
			timer.Stop()
			return
		case <-timer.C:
		}

		// Ignore while this peer is not a leader.
		if s.raftServer.State() != raft.Leader {
			continue
		}

		// Retrieve target active size and actual active size.
		activeSize := s.ClusterConfig().ActiveSize
		peerCount := s.registry.Count()
		peers := s.registry.Names()
		if index := sort.SearchStrings(peers, s.Config.Name); index < len(peers) && peers[index] == s.Config.Name {
			peers = append(peers[:index], peers[index+1:]...)
		}

		// If we have more active nodes than we should, remove one.
		if peerCount > activeSize {
			peer := peers[rand.Intn(len(peers))]
			log.Infof("%s: removing: %v", s.Config.Name, peer)
			if _, err := s.raftServer.Do(&RemoveCommand{Name: peer}); err != nil {
				log.Infof("%s: warning: remove error: %v", s.Config.Name, err)
			}
			continue
		}
	}
}

// monitorPeerActivity has the leader periodically check for dead nodes and
// demote them.
func (s *PeerServer) monitorPeerActivity() {
	for {
		timer := time.NewTimer(PeerActivityMonitorTimeout)
		select {
		case <-s.closeChan:
			timer.Stop()
			return
		case <-timer.C:
		}

		// Ignore while this peer is not a leader.
		if s.raftServer.State() != raft.Leader {
			continue
		}

		// Check last activity for all peers.
		now := time.Now()
		promoteDelay := time.Duration(s.ClusterConfig().PromoteDelay) * time.Second
		peers := s.raftServer.Peers()
		for _, peer := range peers {
			// If the time since the peer's last response exceeds the promote
			// delay, automatically demote the peer.
			if !peer.LastActivity().IsZero() && now.Sub(peer.LastActivity()) > promoteDelay {
				log.Infof("%s: removing node: %v; last activity %v ago", s.Config.Name, peer.Name, now.Sub(peer.LastActivity()))
				if _, err := s.raftServer.Do(&RemoveCommand{Name: peer.Name}); err != nil {
					log.Infof("%s: warning: autodemotion error: %v", s.Config.Name, err)
				}
				continue
			}
		}
	}
}