
Currently the only way we know that a peer isn't getting a heartbeat is an edge-triggered event from go-raft on every missed heartbeat. This means we need to do some bookkeeping in order to apply exponential backoff to the warnings. The upside is that instead of screaming thousands of log lines before a machine hits the default removal window of 30 minutes, it is only ~100.
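Rough arithmetic behind the "~100" figure, as a standalone sketch rather than part of the file below. It assumes the warning backoff starts at one second, doubles on every warning, and is capped at the 15-second MaxHeartbeatTimeoutBackoff defined below, with the 30-minute default removal window mentioned above:

package main

import (
	"fmt"
	"time"
)

func main() {
	const (
		maxBackoff  = 15 * time.Second // MaxHeartbeatTimeoutBackoff from the file below
		removeDelay = 30 * time.Minute // default removal window mentioned above
	)

	backoff := time.Second
	warnings := 0
	for elapsed := time.Duration(0); elapsed < removeDelay; elapsed += backoff {
		warnings++ // one warning logged per backoff window
		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
	fmt.Println(warnings) // 123
}

Once the cap is reached the steady state is one warning every 15 seconds, so a 30-minute outage produces roughly 1800/15 ≈ 120 warnings instead of one line per missed heartbeat.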
package server

import (
	"encoding/json"
	"fmt"
	"math/rand"
	"net/http"
	"net/url"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/coreos/etcd/third_party/github.com/goraft/raft"
	"github.com/coreos/etcd/third_party/github.com/gorilla/mux"

	"github.com/coreos/etcd/discovery"
	etcdErr "github.com/coreos/etcd/error"
	"github.com/coreos/etcd/log"
	"github.com/coreos/etcd/metrics"
	"github.com/coreos/etcd/pkg/btrfs"
	"github.com/coreos/etcd/store"
)

const (
	// MaxHeartbeatTimeoutBackoff is the maximum amount of time we wait before
	// warning the user again about a peer not accepting heartbeats.
	MaxHeartbeatTimeoutBackoff = 15 * time.Second

	// ThresholdMonitorTimeout is the time between log notifications that the
	// Raft heartbeat is too close to the election timeout.
	ThresholdMonitorTimeout = 5 * time.Second

	// ActiveMonitorTimeout is the time between checks on the active size of
	// the cluster. If the cluster has more active peers than the configured
	// active size then etcd demotes one to bring it back to the correct number.
	ActiveMonitorTimeout = 1 * time.Second

	// PeerActivityMonitorTimeout is the time between checks for dead nodes in
	// the cluster.
	PeerActivityMonitorTimeout = 1 * time.Second

	// ClusterConfigKey is the location of the cluster config in the key space.
	ClusterConfigKey = "/_etcd/config"
)

type PeerServerConfig struct {
	Name          string
	Scheme        string
	URL           string
	SnapshotCount int
	RetryTimes    int
	RetryInterval float64
}

type PeerServer struct {
	Config         PeerServerConfig
	client         *Client
	raftServer     raft.Server
	server         *Server
	followersStats *raftFollowersStats
	serverStats    *raftServerStats
	registry       *Registry
	store          store.Store
	snapConf       *snapshotConf

	joinIndex    uint64
	isNewCluster bool
	removedInLog bool

	removeNotify         chan bool
	started              bool
	closeChan            chan bool
	routineGroup         sync.WaitGroup
	timeoutThresholdChan chan interface{}

	logBackoffs map[string]*logBackoff

	metrics *metrics.Bucket
	sync.Mutex
}

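// logBackoff tracks, per peer, when the next heartbeat-timeout warning may be
// logged, the current backoff interval, and how many timeouts have been seen
// since the last warning.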
type logBackoff struct {
	next    time.Time
	backoff time.Duration
	count   int
}

// TODO: find a good snapshot policy
type snapshotConf struct {
	// Etcd will check whether a snapshot is needed every checkingInterval
	checkingInterval time.Duration

	// The index when the last snapshot happened
	lastIndex uint64

	// If the number of entries committed since the last snapshot
	// exceeds the snapshot threshold, etcd will take a snapshot
	snapshotThr uint64
}

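// NewPeerServer creates a PeerServer from the given configuration and
// dependencies. The raft server itself is attached later via SetRaftServer.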
func NewPeerServer(psConfig PeerServerConfig, client *Client, registry *Registry, store store.Store, mb *metrics.Bucket, followersStats *raftFollowersStats, serverStats *raftServerStats) *PeerServer {
	s := &PeerServer{
		Config:         psConfig,
		client:         client,
		registry:       registry,
		store:          store,
		followersStats: followersStats,
		serverStats:    serverStats,

		timeoutThresholdChan: make(chan interface{}, 1),
		logBackoffs:          make(map[string]*logBackoff),

		metrics: mb,
	}

	return s
}

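// SetRaftServer attaches the raft server, registers the event listeners used
// for logging and metrics, optionally loads a snapshot, and initializes the
// raft server.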
func (s *PeerServer) SetRaftServer(raftServer raft.Server, snapshot bool) {
	s.snapConf = &snapshotConf{
		checkingInterval: time.Second * 3,
		// this is not accurate, we will update raft to provide an api
		lastIndex:   raftServer.CommitIndex(),
		snapshotThr: uint64(s.Config.SnapshotCount),
	}

	raftServer.AddEventListener(raft.StateChangeEventType, s.raftEventLogger)
	raftServer.AddEventListener(raft.LeaderChangeEventType, s.raftEventLogger)
	raftServer.AddEventListener(raft.TermChangeEventType, s.raftEventLogger)
	raftServer.AddEventListener(raft.AddPeerEventType, s.raftEventLogger)
	raftServer.AddEventListener(raft.RemovePeerEventType, s.raftEventLogger)
	raftServer.AddEventListener(raft.HeartbeatIntervalEventType, s.raftEventLogger)
	raftServer.AddEventListener(raft.ElectionTimeoutThresholdEventType, s.raftEventLogger)

	raftServer.AddEventListener(raft.HeartbeatEventType, s.recordMetricEvent)

	raftServer.AddEventListener(raft.RemovedEventType, s.removedEvent)

	s.raftServer = raftServer
	s.removedInLog = false

	// LoadSnapshot
	if snapshot {
		err := s.raftServer.LoadSnapshot()

		if err == nil {
			log.Debugf("%s finished load snapshot", s.Config.Name)
		} else {
			log.Debug(err)
		}
	}

	s.raftServer.Init()

	// Set NOCOW for data directory in btrfs
	if btrfs.IsBtrfs(s.raftServer.LogPath()) {
		if err := btrfs.SetNOCOWFile(s.raftServer.LogPath()); err != nil {
			log.Warnf("Failed setting NOCOW: %v", err)
		}
	}
}

func (s *PeerServer) SetRegistry(registry *Registry) {
	s.registry = registry
}

func (s *PeerServer) SetStore(store store.Store) {
	s.store = store
}

// FindCluster tries all possible ways to find a cluster to join, including the
// log data in -data-dir, -discovery and -peers.
//
// Peer discovery follows this order:
// 1. previous peers in -data-dir
// 2. -discovery
// 3. -peers
func (s *PeerServer) FindCluster(discoverURL string, peers []string) (toStart bool, possiblePeers []string, err error) {
	name := s.Config.Name
	isNewNode := s.raftServer.IsLogEmpty()

	// Try its best to find possible peers, and connect with them.
	if !isNewNode {
		// It is not allowed to join the cluster with an existing peer address.
		// This prevents an old node from rejoining under a different name by mistake.
		if !s.checkPeerAddressNonconflict() {
			err = fmt.Errorf("%v is not allowed to join the cluster with existing URL %v", s.Config.Name, s.Config.URL)
			return
		}

		// Take old nodes into account.
		possiblePeers = s.getKnownPeers()
		// Discover registered peers.
		// TODO(yichengq): It may mess up discoverURL if this is
		// set wrong by mistake. This may need to refactor discovery
		// module. Fix it later.
		if discoverURL != "" {
			discoverPeers, _ := s.handleDiscovery(discoverURL)
			possiblePeers = append(possiblePeers, discoverPeers...)
		}
		possiblePeers = append(possiblePeers, peers...)
		possiblePeers = s.removeSelfFromList(possiblePeers)

		if s.removedInLog {
			return
		}

		// If there is a possible peer list, use it to find the cluster.
		if len(possiblePeers) > 0 {
			// TODO(yichengq): joinCluster may fail if there's no leader for
			// current cluster. It should wait if the cluster is under
			// leader election, or the node with changed IP cannot join
			// the cluster then.
			if rejected, ierr := s.startAsFollower(possiblePeers, 1); rejected {
				log.Debugf("%s should work as standby for the cluster %v: %v", name, possiblePeers, ierr)
				return
			} else if ierr != nil {
				log.Warnf("%s cannot connect to previous cluster %v: %v", name, possiblePeers, ierr)
			} else {
				log.Debugf("%s joins to the previous cluster %v", name, possiblePeers)
				toStart = true
				return
			}
		}

		// TODO(yichengq): Think about the action that should be done
		// if it cannot connect to any of the previously known nodes.
		log.Debugf("%s is restarting the cluster %v", name, possiblePeers)
		s.SetJoinIndex(s.raftServer.CommitIndex())
		toStart = true
		return
	}

	// Attempt cluster discovery
	if discoverURL != "" {
		discoverPeers, discoverErr := s.handleDiscovery(discoverURL)
		// It is not registered in the discovery URL
		if discoverErr != nil {
			log.Warnf("%s failed to connect discovery service[%v]: %v", name, discoverURL, discoverErr)
			if len(peers) == 0 {
				err = fmt.Errorf("%s, the new instance, must register itself to discovery service as required", name)
				return
			}
			log.Debugf("%s is joining peers %v from -peers flag", name, peers)
		} else {
			log.Debugf("%s is joining a cluster %v via discover service", name, discoverPeers)
			peers = discoverPeers
		}
	}
	possiblePeers = peers

	if len(possiblePeers) > 0 {
		if rejected, ierr := s.startAsFollower(possiblePeers, s.Config.RetryTimes); rejected {
			log.Debugf("%s should work as standby for the cluster %v: %v", name, possiblePeers, ierr)
		} else if ierr != nil {
			log.Warnf("%s cannot connect to existing peers %v: %v", name, possiblePeers, ierr)
			err = ierr
		} else {
			toStart = true
		}
		return
	}

	// start as a leader in a new cluster
	s.isNewCluster = true
	log.Infof("%s is starting a new cluster", s.Config.Name)
	toStart = true
	return
}

// Start starts the raft server.
// The function assumes that join has been accepted successfully.
func (s *PeerServer) Start(snapshot bool, clusterConfig *ClusterConfig) error {
	s.Lock()
	defer s.Unlock()
	if s.started {
		return nil
	}
	s.started = true

	s.removeNotify = make(chan bool)
	s.closeChan = make(chan bool)

	s.raftServer.Start()
	if s.isNewCluster {
		s.InitNewCluster(clusterConfig)
		s.isNewCluster = false
	}

	s.startRoutine(s.monitorSync)
	s.startRoutine(s.monitorTimeoutThreshold)
	s.startRoutine(s.monitorActiveSize)
	s.startRoutine(s.monitorPeerActivity)

	// open the snapshot
	if snapshot {
		s.startRoutine(s.monitorSnapshot)
	}

	return nil
}

// Stop stops the server gracefully.
func (s *PeerServer) Stop() {
	s.Lock()
	defer s.Unlock()
	if !s.started {
		return
	}
	s.started = false

	close(s.closeChan)
	// TODO(yichengq): it should also call async stop for raft server,
	// but this functionality has not been implemented.
	s.raftServer.Stop()
	s.routineGroup.Wait()
}

// asyncRemove stops the server in peer mode.
// It is called to stop the server internally when it has been removed
// from the cluster.
// The function triggers the stop action first to notify the server that it
// should not continue, then waits for the stop to finish in a separate
// goroutine because the caller must also exit.
func (s *PeerServer) asyncRemove() {
	s.Lock()
	if !s.started {
		s.Unlock()
		return
	}
	s.started = false

	close(s.closeChan)
	// TODO(yichengq): it should also call async stop for raft server,
	// but this functionality has not been implemented.
	go func() {
		s.raftServer.Stop()
		s.routineGroup.Wait()
		close(s.removeNotify)
		s.Unlock()
	}()
}

// RemoveNotify returns a channel that is closed when the server leaves peer
// mode due to removal from the cluster.
func (s *PeerServer) RemoveNotify() <-chan bool {
	return s.removeNotify
}

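// HTTPHandler returns the handler for the internal peer API (raft RPCs,
// join/remove, version checks) and the v2 admin endpoints.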
func (s *PeerServer) HTTPHandler() http.Handler {
	router := mux.NewRouter()

	// internal commands
	router.HandleFunc("/name", s.NameHttpHandler)
	router.HandleFunc("/version", s.VersionHttpHandler)
	router.HandleFunc("/version/{version:[0-9]+}/check", s.VersionCheckHttpHandler)
	router.HandleFunc("/upgrade", s.UpgradeHttpHandler)
	router.HandleFunc("/join", s.JoinHttpHandler)
	router.HandleFunc("/remove/{name:.+}", s.RemoveHttpHandler)
	router.HandleFunc("/vote", s.VoteHttpHandler)
	router.HandleFunc("/log", s.GetLogHttpHandler)
	router.HandleFunc("/log/append", s.AppendEntriesHttpHandler)
	router.HandleFunc("/snapshot", s.SnapshotHttpHandler)
	router.HandleFunc("/snapshotRecovery", s.SnapshotRecoveryHttpHandler)
	router.HandleFunc("/etcdURL", s.EtcdURLHttpHandler)

	router.HandleFunc("/v2/admin/config", s.getClusterConfigHttpHandler).Methods("GET")
	router.HandleFunc("/v2/admin/config", s.setClusterConfigHttpHandler).Methods("PUT")
	router.HandleFunc("/v2/admin/machines", s.getMachinesHttpHandler).Methods("GET")
	router.HandleFunc("/v2/admin/machines/{name}", s.getMachineHttpHandler).Methods("GET")
	router.HandleFunc("/v2/admin/machines/{name}", s.RemoveHttpHandler).Methods("DELETE")

	return router
}

func (s *PeerServer) SetJoinIndex(joinIndex uint64) {
	s.joinIndex = joinIndex
}

// ClusterConfig retrieves the current cluster configuration.
func (s *PeerServer) ClusterConfig() *ClusterConfig {
	e, err := s.store.Get(ClusterConfigKey, false, false)
	// This is useful for backward compatibility because older versions
	// do not set the cluster config key.
	if err != nil {
		log.Debugf("failed getting cluster config key: %v", err)
		return NewClusterConfig()
	}

	var c ClusterConfig
	if err = json.Unmarshal([]byte(*e.Node.Value), &c); err != nil {
		log.Debugf("failed unmarshaling cluster config: %v", err)
		return NewClusterConfig()
	}
	return &c
}

// SetClusterConfig updates the current cluster configuration.
// Adjusting the active size will cause the cluster to add or remove machines
// to match the new size.
func (s *PeerServer) SetClusterConfig(c *ClusterConfig) {
	// Set minimums.
	if c.ActiveSize < MinActiveSize {
		c.ActiveSize = MinActiveSize
	}
	if c.RemoveDelay < MinRemoveDelay {
		c.RemoveDelay = MinRemoveDelay
	}
	if c.SyncInterval < MinSyncInterval {
		c.SyncInterval = MinSyncInterval
	}

	log.Debugf("set cluster config as %v", c)
	b, _ := json.Marshal(c)
	s.store.Set(ClusterConfigKey, false, string(b), store.Permanent)
}

// RaftServer retrieves the underlying Raft server.
func (s *PeerServer) RaftServer() raft.Server {
	return s.raftServer
}

// SetServer associates the client server with the peer server.
func (s *PeerServer) SetServer(server *Server) {
	s.server = server
}

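// InitNewCluster bootstraps a brand new cluster: the leader joins itself as
// the first peer and commits the initial cluster configuration.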
func (s *PeerServer) InitNewCluster(clusterConfig *ClusterConfig) {
	// the leader needs to join itself as a peer
	s.doCommand(&JoinCommand{
		MinVersion: store.MinVersion(),
		MaxVersion: store.MaxVersion(),
		Name:       s.raftServer.Name(),
		RaftURL:    s.Config.URL,
		EtcdURL:    s.server.URL(),
	})
	log.Debugf("%s start as a leader", s.Config.Name)
	s.joinIndex = 1

	s.doCommand(&SetClusterConfigCommand{Config: clusterConfig})
	log.Debugf("%s sets cluster config as %v", s.Config.Name, clusterConfig)
}

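// doCommand retries the given command against the local raft server until it
// is committed successfully.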
func (s *PeerServer) doCommand(cmd raft.Command) {
	for {
		if _, err := s.raftServer.Do(cmd); err == nil {
			break
		}
	}
	log.Debugf("%s start as a leader", s.Config.Name)
}

func (s *PeerServer) startAsFollower(cluster []string, retryTimes int) (bool, error) {
	// start as a follower in an existing cluster
	for i := 0; ; i++ {
		if rejected, err := s.joinCluster(cluster); rejected {
			return true, err
		} else if err == nil {
			return false, nil
		}
		if i == retryTimes-1 {
			break
		}
		log.Infof("%v is unable to join the cluster using any of the peers %v at %dth time. Retrying in %.1f seconds", s.Config.Name, cluster, i, s.Config.RetryInterval)
		time.Sleep(time.Second * time.Duration(s.Config.RetryInterval))
		continue
	}
	return false, fmt.Errorf("fail joining the cluster via given peers after %x retries", retryTimes)
}

// Upgradable checks whether all peers in a cluster support an upgrade to the next store version.
func (s *PeerServer) Upgradable() error {
	nextVersion := s.store.Version() + 1
	for _, peerURL := range s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name) {
		u, err := url.Parse(peerURL)
		if err != nil {
			return fmt.Errorf("PeerServer: Cannot parse URL: '%s' (%s)", peerURL, err)
		}

		url := (&url.URL{Host: u.Host, Scheme: s.Config.Scheme}).String()
		ok, err := s.client.CheckVersion(url, nextVersion)
		if err != nil {
			return err
		}
		if !ok {
			return fmt.Errorf("PeerServer: Version %d is not compatible with peer: %s", nextVersion, u.Host)
		}
	}

	return nil
}

// checkPeerAddressNonconflict checks whether the peer address is already
// registered under a different name.
func (s *PeerServer) checkPeerAddressNonconflict() bool {
	// there exists the (name, peer address) pair
	if peerURL, ok := s.registry.PeerURL(s.Config.Name); ok {
		if peerURL == s.Config.URL {
			return true
		}
	}

	// check all existing peer addresses
	peerURLs := s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name)
	for _, peerURL := range peerURLs {
		if peerURL == s.Config.URL {
			return false
		}
	}
	return true
}

// handleDiscovery is a helper that runs discovery and returns the results in
// the expected format.
func (s *PeerServer) handleDiscovery(discoverURL string) (peers []string, err error) {
	peers, err = discovery.Do(discoverURL, s.Config.Name, s.Config.URL, s.closeChan, s.startRoutine)

	// Warn about errors coming from discovery; this isn't fatal
	// since the user might have provided a peer list elsewhere,
	// or there is some log in the data dir.
	if err != nil {
		log.Warnf("Discovery encountered an error: %v", err)
		return
	}

	for i := range peers {
		// Strip the scheme off of the peer if it has one
		// TODO(bp): clean this up!
		purl, err := url.Parse(peers[i])
		if err == nil {
			peers[i] = purl.Host
		}
	}

	log.Infof("Discovery fetched back peer list: %v", peers)

	return
}

// getKnownPeers gets the previous peers from the log
func (s *PeerServer) getKnownPeers() []string {
	peers := s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name)
	log.Infof("Peer URLs in log: %s / %s (%s)", s.raftServer.Leader(), s.Config.Name, strings.Join(peers, ","))

	for i := range peers {
		u, err := url.Parse(peers[i])
		if err != nil {
			log.Debugf("getKnownPeers cannot parse url %v", peers[i])
		}
		peers[i] = u.Host
	}
	return peers
}

// removeSelfFromList removes the url of the peerServer from the peer list
func (s *PeerServer) removeSelfFromList(peers []string) []string {
	// Remove its own peer address from the peer list to join
	u, err := url.Parse(s.Config.URL)
	if err != nil {
		log.Warnf("failed parsing self peer address %v", s.Config.URL)
		u = nil
	}
	newPeers := make([]string, 0)
	for _, v := range peers {
		if u == nil || v != u.Host {
			newPeers = append(newPeers, v)
		}
	}
	return newPeers
}

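// joinCluster attempts to join the cluster through each of the given peers in
// turn. The first return value reports whether the join was rejected outright.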
func (s *PeerServer) joinCluster(cluster []string) (bool, error) {
	for _, peer := range cluster {
		if len(peer) == 0 {
			continue
		}

		if rejected, err := s.joinByPeer(s.raftServer, peer, s.Config.Scheme); rejected {
			return true, fmt.Errorf("rejected by peer %s: %v", peer, err)
		} else if err == nil {
			log.Infof("%s joined the cluster via peer %s", s.Config.Name, peer)
			return false, nil
		} else {
			log.Infof("%s attempted to join via %s failed: %v", s.Config.Name, peer, err)
		}
	}

	return false, fmt.Errorf("unreachable cluster")
}

// joinByPeer sends a join request to the given peer.
// The first return value tells whether the request was rejected by the cluster directly.
func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) (bool, error) {
	u := (&url.URL{Host: peer, Scheme: scheme}).String()

	// Our version must match the leader's version
	version, err := s.client.GetVersion(u)
	if err != nil {
		return false, fmt.Errorf("fail checking join version: %v", err)
	}
	if version < store.MinVersion() || version > store.MaxVersion() {
		return true, fmt.Errorf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version)
	}

	// Fetch current peer list
	machines, err := s.client.GetMachines(u)
	if err != nil {
		return false, fmt.Errorf("fail getting machine messages: %v", err)
	}
	exist := false
	for _, machine := range machines {
		if machine.Name == server.Name() {
			exist = true
			break
		}
	}

	// Fetch the cluster config to see whether there is room for this machine.
	clusterConfig, err := s.client.GetClusterConfig(u)
	if err != nil {
		return false, fmt.Errorf("fail getting cluster config: %v", err)
	}
	if !exist && clusterConfig.ActiveSize <= len(machines) {
		return true, fmt.Errorf("stop joining because the cluster is full with %d nodes", len(machines))
	}

	joinIndex, err := s.client.AddMachine(u,
		&JoinCommand{
			MinVersion: store.MinVersion(),
			MaxVersion: store.MaxVersion(),
			Name:       server.Name(),
			RaftURL:    s.Config.URL,
			EtcdURL:    s.server.URL(),
		})
	if err != nil {
		return err.ErrorCode == etcdErr.EcodeNoMorePeer, fmt.Errorf("fail on join request: %v", err)
	}

	s.joinIndex = joinIndex
	return false, nil
}

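// Stats returns the current server statistics as JSON.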
func (s *PeerServer) Stats() []byte {
	s.serverStats.LeaderInfo.Uptime = time.Now().Sub(s.serverStats.LeaderInfo.StartTime).String()

	// TODO: register state listener to raft to change this field
	// rather than compare the state each time Stats() is called.
	if s.RaftServer().State() == raft.Leader {
		s.serverStats.LeaderInfo.Name = s.RaftServer().Name()
	}

	queue := s.serverStats.sendRateQueue

	s.serverStats.SendingPkgRate, s.serverStats.SendingBandwidthRate = queue.Rate()

	queue = s.serverStats.recvRateQueue

	s.serverStats.RecvingPkgRate, s.serverStats.RecvingBandwidthRate = queue.Rate()

	b, _ := json.Marshal(s.serverStats)

	return b
}

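// PeerStats returns follower statistics as JSON when this node is the leader,
// and nil otherwise.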
func (s *PeerServer) PeerStats() []byte {
	if s.raftServer.State() == raft.Leader {
		b, _ := json.Marshal(s.followersStats)
		return b
	}
	return nil
}

// removedEvent handles the case where a machine has been removed from the
// cluster and is notified when it tries to become a candidate.
func (s *PeerServer) removedEvent(event raft.Event) {
	// HACK(philips): we need to find a better notification for this.
	log.Infof("removed during cluster re-configuration")
	s.asyncRemove()
}

// raftEventLogger converts events from the Raft server into log messages.
func (s *PeerServer) raftEventLogger(event raft.Event) {
	value := event.Value()
	prevValue := event.PrevValue()
	if value == nil {
		value = "<nil>"
	}
	if prevValue == nil {
		prevValue = "<nil>"
	}

	switch event.Type() {
	case raft.StateChangeEventType:
		log.Infof("%s: state changed from '%v' to '%v'.", s.Config.Name, prevValue, value)
	case raft.TermChangeEventType:
		log.Infof("%s: term #%v started.", s.Config.Name, value)
	case raft.LeaderChangeEventType:
		log.Infof("%s: leader changed from '%v' to '%v'.", s.Config.Name, prevValue, value)
	case raft.AddPeerEventType:
		log.Infof("%s: peer added: '%v'", s.Config.Name, value)
	case raft.RemovePeerEventType:
		log.Infof("%s: peer removed: '%v'", s.Config.Name, value)
	case raft.HeartbeatIntervalEventType:
		peer, ok := value.(*raft.Peer)
		if !ok {
			log.Warnf("%s: heartbeat timeout from unknown peer", s.Config.Name)
			return
		}
		s.logHeartbeatTimeout(peer)
	case raft.ElectionTimeoutThresholdEventType:
		select {
		case s.timeoutThresholdChan <- value:
		default:
		}

	}
}

// logHeartbeatTimeout logs about the edge triggered heartbeat timeout event
// only if we haven't warned within a reasonable interval.
func (s *PeerServer) logHeartbeatTimeout(peer *raft.Peer) {
	b, ok := s.logBackoffs[peer.Name]
	if !ok {
		b = &logBackoff{time.Time{}, time.Second, 1}
		s.logBackoffs[peer.Name] = b
	}

	if peer.LastActivity().After(b.next) {
		b.next = time.Time{}
		b.backoff = time.Second
		b.count = 1
	}

	if b.next.After(time.Now()) {
		b.count++
		return
	}

	b.backoff = 2 * b.backoff
	if b.backoff > MaxHeartbeatTimeoutBackoff {
		b.backoff = MaxHeartbeatTimeoutBackoff
	}
	b.next = time.Now().Add(b.backoff)

	log.Infof("%s: warning: heartbeat time out peer=%q missed=%d backoff=%q", s.Config.Name, peer.Name, b.count, b.backoff)
}

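// recordMetricEvent records the duration carried by a raft event in the
// corresponding timer metric.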
func (s *PeerServer) recordMetricEvent(event raft.Event) {
	name := fmt.Sprintf("raft.event.%s", event.Type())
	value := event.Value().(time.Duration)
	(*s.metrics).Timer(name).Update(value)
}

// logSnapshot logs about the snapshot that was taken.
func (s *PeerServer) logSnapshot(err error, currentIndex, count uint64) {
	info := fmt.Sprintf("%s: snapshot of %d events at index %d", s.Config.Name, count, currentIndex)

	if err != nil {
		log.Infof("%s attempted and failed: %v", info, err)
	} else {
		log.Infof("%s completed", info)
	}
}

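// startRoutine runs f in a goroutine tracked by the routine group so that
// Stop can wait for it to finish.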
func (s *PeerServer) startRoutine(f func()) {
	s.routineGroup.Add(1)
	go func() {
		defer s.routineGroup.Done()
		f()
	}()
}

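// monitorSnapshot periodically checks how many entries have been committed
// since the last snapshot and takes a new snapshot once the threshold is
// exceeded.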
func (s *PeerServer) monitorSnapshot() {
	for {
		timer := time.NewTimer(s.snapConf.checkingInterval)
		defer timer.Stop()
		select {
		case <-s.closeChan:
			return
		case <-timer.C:
		}

		currentIndex := s.RaftServer().CommitIndex()
		count := currentIndex - s.snapConf.lastIndex
		if uint64(count) > s.snapConf.snapshotThr {
			err := s.raftServer.TakeSnapshot()
			s.logSnapshot(err, currentIndex, count)
			s.snapConf.lastIndex = currentIndex
		}
	}
}

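// monitorSync has the leader commit a sync command carrying the current time
// every 500ms.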
func (s *PeerServer) monitorSync() {
	ticker := time.NewTicker(time.Millisecond * 500)
	defer ticker.Stop()
	for {
		select {
		case <-s.closeChan:
			return
		case now := <-ticker.C:
			if s.raftServer.State() == raft.Leader {
				s.raftServer.Do(s.store.CommandFactory().CreateSyncCommand(now))
			}
		}
	}
}

// monitorTimeoutThreshold groups timeout threshold events together and prints
// them as a single log line.
func (s *PeerServer) monitorTimeoutThreshold() {
	for {
		select {
		case <-s.closeChan:
			return
		case value := <-s.timeoutThresholdChan:
			log.Infof("%s: warning: heartbeat near election timeout: %v", s.Config.Name, value)
		}

		timer := time.NewTimer(ThresholdMonitorTimeout)
		defer timer.Stop()
		select {
		case <-s.closeChan:
			return
		case <-timer.C:
		}
	}
}

// monitorActiveSize has the leader periodically check the status of cluster
// nodes and swap them out for standbys as needed.
func (s *PeerServer) monitorActiveSize() {
	for {
		timer := time.NewTimer(ActiveMonitorTimeout)
		defer timer.Stop()
		select {
		case <-s.closeChan:
			return
		case <-timer.C:
		}

		// Ignore while this peer is not a leader.
		if s.raftServer.State() != raft.Leader {
			continue
		}

		// Retrieve target active size and actual active size.
		activeSize := s.ClusterConfig().ActiveSize
		peers := s.registry.Names()
		peerCount := len(peers)
		if index := sort.SearchStrings(peers, s.Config.Name); index < len(peers) && peers[index] == s.Config.Name {
			peers = append(peers[:index], peers[index+1:]...)
		}

		// If we have more active nodes than we should then remove one.
		if peerCount > activeSize {
			peer := peers[rand.Intn(len(peers))]
			log.Infof("%s: removing: %v", s.Config.Name, peer)
			if _, err := s.raftServer.Do(&RemoveCommand{Name: peer}); err != nil {
				log.Infof("%s: warning: remove error: %v", s.Config.Name, err)
			}
			continue
		}
	}
}

// monitorPeerActivity has the leader periodically check for dead nodes in the
// cluster and demote them.
func (s *PeerServer) monitorPeerActivity() {
	for {
		timer := time.NewTimer(PeerActivityMonitorTimeout)
		defer timer.Stop()
		select {
		case <-s.closeChan:
			return
		case <-timer.C:
		}

		// Ignore while this peer is not a leader.
		if s.raftServer.State() != raft.Leader {
			continue
		}

		// Check last activity for all peers.
		now := time.Now()
		removeDelay := time.Duration(int64(s.ClusterConfig().RemoveDelay * float64(time.Second)))
		peers := s.raftServer.Peers()
		for _, peer := range peers {
			// If the peer's last response is older than the remove delay
			// then automatically demote the peer.
			if !peer.LastActivity().IsZero() && now.Sub(peer.LastActivity()) > removeDelay {
				log.Infof("%s: removing node: %v; last activity %v ago", s.Config.Name, peer.Name, now.Sub(peer.LastActivity()))
				if _, err := s.raftServer.Do(&RemoveCommand{Name: peer.Name}); err != nil {
					log.Infof("%s: warning: autodemotion error: %v", s.Config.Name, err)
				}
				continue
			}
		}
	}
}