*: remove old server

This commit is contained in:
Xiang Li
2014-08-21 13:00:58 -07:00
committed by Yicheng Qin
parent f891199ab0
commit 4296cd3fa4
24 changed files with 0 additions and 3657 deletions

View File

@@ -1,186 +0,0 @@
// +build ignore
package server
import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"strconv"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/log"
)
// Client sends various requests using HTTP API.
// It is different from raft communication, and doesn't record anything in the log.
// The url argument must contain scheme and host only, with no trailing slash.
// Public functions intentionally return "etcd/error".Error so that callers
// can inspect the etcd error code easily.
// TODO(yichengq): This is similar to go-etcd, but unifying the two could take
// significant effort. Leave it for further discussion.
type Client struct {
http.Client
}
func NewClient(transport http.RoundTripper) *Client {
return &Client{http.Client{Transport: transport}}
}
// CheckVersion returns true when the version check on the server returns 200.
func (c *Client) CheckVersion(url string, version int) (bool, *etcdErr.Error) {
resp, err := c.Get(url + fmt.Sprintf("/version/%d/check", version))
if err != nil {
return false, clientError(err)
}
defer resp.Body.Close()
return resp.StatusCode == 200, nil
}
// GetVersion fetches the peer version of a cluster.
func (c *Client) GetVersion(url string) (int, *etcdErr.Error) {
resp, err := c.Get(url + "/version")
if err != nil {
return 0, clientError(err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return 0, clientError(err)
}
// Parse version number.
version, err := strconv.Atoi(string(body))
if err != nil {
return 0, clientError(err)
}
return version, nil
}
func (c *Client) GetMachines(url string) ([]*machineMessage, *etcdErr.Error) {
resp, err := c.Get(url + "/v2/admin/machines")
if err != nil {
return nil, clientError(err)
}
msgs := new([]*machineMessage)
if uerr := c.parseJSONResponse(resp, msgs); uerr != nil {
return nil, uerr
}
return *msgs, nil
}
func (c *Client) GetClusterConfig(url string) (*ClusterConfig, *etcdErr.Error) {
resp, err := c.Get(url + "/v2/admin/config")
if err != nil {
return nil, clientError(err)
}
cfg := new(ClusterConfig)
if uerr := c.parseJSONResponse(resp, cfg); uerr != nil {
return nil, uerr
}
return cfg, nil
}
// AddMachine adds a machine to the cluster.
// The first return value is the commit index of the join command.
func (c *Client) AddMachine(url string, cmd *JoinCommand) (uint64, *etcdErr.Error) {
b, _ := json.Marshal(cmd)
url = url + "/join"
log.Infof("Send Join Request to %s", url)
resp, err := c.put(url, b)
if err != nil {
return 0, clientError(err)
}
defer resp.Body.Close()
if err := c.checkErrorResponse(resp); err != nil {
return 0, err
}
b, err = ioutil.ReadAll(resp.Body)
if err != nil {
return 0, clientError(err)
}
index, numRead := binary.Uvarint(b)
if numRead <= 0 {
return 0, clientError(fmt.Errorf("buf too small, or value too large"))
}
return index, nil
}
func (c *Client) parseJSONResponse(resp *http.Response, val interface{}) *etcdErr.Error {
defer resp.Body.Close()
if err := c.checkErrorResponse(resp); err != nil {
return err
}
if err := json.NewDecoder(resp.Body).Decode(val); err != nil {
log.Debugf("Error parsing join response: %v", err)
return clientError(err)
}
return nil
}
func (c *Client) checkErrorResponse(resp *http.Response) *etcdErr.Error {
if resp.StatusCode != http.StatusOK {
uerr := &etcdErr.Error{}
if err := json.NewDecoder(resp.Body).Decode(uerr); err != nil {
log.Debugf("Error parsing response to etcd error: %v", err)
return clientError(err)
}
return uerr
}
return nil
}
// put sends a server-side PUT request.
// It always follows redirects instead of stopping, as RFC 2616 would require.
func (c *Client) put(urlStr string, body []byte) (*http.Response, error) {
return c.doAlwaysFollowingRedirects("PUT", urlStr, body)
}
func (c *Client) doAlwaysFollowingRedirects(method string, urlStr string, body []byte) (resp *http.Response, err error) {
var req *http.Request
for redirect := 0; redirect < 10; redirect++ {
req, err = http.NewRequest(method, urlStr, bytes.NewBuffer(body))
if err != nil {
return
}
if resp, err = c.Do(req); err != nil {
if resp != nil {
resp.Body.Close()
}
return
}
if resp.StatusCode == http.StatusMovedPermanently || resp.StatusCode == http.StatusTemporaryRedirect {
resp.Body.Close()
if urlStr = resp.Header.Get("Location"); urlStr == "" {
err = fmt.Errorf("%d response missing Location header", resp.StatusCode)
return
}
continue
}
return
}
err = errors.New("stopped after 10 redirects")
return
}
func clientError(err error) *etcdErr.Error {
return etcdErr.NewError(etcdErr.EcodeClientInternal, err.Error(), 0)
}
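For orientation, a minimal sketch of how a peer could drive this client (hypothetical helper in the same package; imports elided):
// checkPeer is a hypothetical illustration of the call pattern above:
// probe a peer's store version, then list its registered machines.
func checkPeer(c *Client, peerURL string) {
	version, uerr := c.GetVersion(peerURL)
	if uerr != nil {
		log.Warnf("cannot get version of %s: %v", peerURL, uerr)
		return
	}
	machines, uerr := c.GetMachines(peerURL)
	if uerr != nil {
		log.Warnf("cannot list machines of %s: %v", peerURL, uerr)
		return
	}
	log.Infof("%s: store version %d, %d machines", peerURL, version, len(machines))
}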

View File

@@ -1,51 +0,0 @@
// +build ignore
package server
import (
"time"
)
const (
// DefaultActiveSize is the default number of active followers allowed.
DefaultActiveSize = 9
// MinActiveSize is the minimum active size allowed.
MinActiveSize = 3
// DefaultRemoveDelay is the default elapsed time before removal.
DefaultRemoveDelay = float64((30 * time.Minute) / time.Second)
// MinRemoveDelay is the minimum remove delay allowed.
MinRemoveDelay = float64((2 * time.Second) / time.Second)
// DefaultSyncInterval is the default interval for cluster sync.
DefaultSyncInterval = float64((5 * time.Second) / time.Second)
// MinSyncInterval is the minimum sync interval allowed.
MinSyncInterval = float64((1 * time.Second) / time.Second)
)
// ClusterConfig represents cluster-wide configuration settings.
type ClusterConfig struct {
// ActiveSize is the maximum number of nodes that can join as Raft followers.
// Nodes that join the cluster after the limit is reached are standbys.
ActiveSize int `json:"activeSize"`
// RemoveDelay is the amount of time, in seconds, after a node is
// unreachable that it will be swapped out as a standby node.
RemoveDelay float64 `json:"removeDelay"`
// SyncInterval is the amount of time, in seconds, between
// cluster syncs when running in standby mode.
SyncInterval float64 `json:"syncInterval"`
}
// NewClusterConfig returns a cluster configuration with default settings.
func NewClusterConfig() *ClusterConfig {
return &ClusterConfig{
ActiveSize: DefaultActiveSize,
RemoveDelay: DefaultRemoveDelay,
SyncInterval: DefaultSyncInterval,
}
}
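The delay fields are stored as plain seconds, which is why the defaults above divide a time.Duration by time.Second. A hypothetical helper showing the reverse conversion, matching the arithmetic monitorPeerActivity uses later in this commit:
// removeDelayDuration is a hypothetical helper that converts the
// seconds-as-float64 RemoveDelay field back into a time.Duration.
func removeDelayDuration(c *ClusterConfig) time.Duration {
	return time.Duration(int64(c.RemoveDelay * float64(time.Second)))
}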

View File

@@ -1,118 +0,0 @@
// +build ignore
package server
import (
"encoding/binary"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
)
func init() {
raft.RegisterCommand(&JoinCommand{})
}
// JoinCommand represents a request to join the cluster.
// The command returns the join_index (Uvarint).
type JoinCommand struct {
MinVersion int `json:"minVersion"`
MaxVersion int `json:"maxVersion"`
Name string `json:"name"`
RaftURL string `json:"raftURL"`
EtcdURL string `json:"etcdURL"`
}
// The name of the join command in the log
func (c *JoinCommand) CommandName() string {
return "etcd:join"
}
// Apply attempts to join a machine to the cluster.
func (c *JoinCommand) Apply(context raft.Context) (interface{}, error) {
index, err := applyJoin(c, context)
if err != nil {
return nil, err
}
b := make([]byte, 8)
binary.PutUvarint(b, index)
return b, nil
}
func (c *JoinCommand) NodeName() string {
return c.Name
}
// applyJoin attempts to join a machine to the cluster.
func applyJoin(c *JoinCommand, context raft.Context) (uint64, error) {
ps, _ := context.Server().Context().(*PeerServer)
commitIndex := context.CommitIndex()
// Make sure we're not getting a cached value from the registry.
ps.registry.Invalidate(c.Name)
// Check if the join command is from a previous peer that lost all of its previous log.
if peerURL, ok := ps.registry.PeerURL(c.Name); ok {
// If previous node restarts with different peer URL,
// update its information.
if peerURL != c.RaftURL {
log.Infof("Rejoin with %v instead of %v from %v", c.RaftURL, peerURL, c.Name)
if err := updatePeerURL(c, ps); err != nil {
return 0, err
}
}
if c.Name == context.Server().Name() {
ps.removedInLog = false
}
return commitIndex, nil
}
// Check if the join command adds an instance that collides with existing one on peer URL.
peerURLs := ps.registry.PeerURLs(ps.raftServer.Leader(), c.Name)
for _, peerURL := range peerURLs {
if peerURL == c.RaftURL {
log.Warnf("%v tries to join the cluster with existing URL %v", c.Name, c.EtcdURL)
return 0, etcdErr.NewError(etcdErr.EcodeExistingPeerAddr, c.EtcdURL, context.CommitIndex())
}
}
// Check the peer count in the cluster.
count := ps.registry.Count()
// The cluster config is not initialized until the first machine is added.
if count > 0 && count >= ps.ClusterConfig().ActiveSize {
log.Debug("Reject join request from ", c.Name)
return 0, etcdErr.NewError(etcdErr.EcodeNoMorePeer, "", context.CommitIndex())
}
// Add to shared peer registry.
ps.registry.Register(c.Name, c.RaftURL, c.EtcdURL)
// Add peer in raft
if err := context.Server().AddPeer(c.Name, ""); err != nil {
return 0, err
}
// Add peer stats
if c.Name != ps.RaftServer().Name() {
ps.followersStats.Followers[c.Name] = &raftFollowerStats{}
ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
}
if c.Name == context.Server().Name() {
ps.removedInLog = false
}
return commitIndex, nil
}
func updatePeerURL(c *JoinCommand, ps *PeerServer) error {
log.Debugf("Update peer URL of %v to %v", c.Name, c.RaftURL)
if err := ps.registry.UpdatePeerURL(c.Name, c.RaftURL); err != nil {
log.Debugf("Error while updating in registry: %s (%v)", c.Name, err)
return err
}
// Flush the commit index so raft will replay up to here on restart.
ps.raftServer.FlushCommitIndex()
return nil
}
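Because Apply encodes the commit index as a Uvarint byte slice, callers must decode it; a minimal sketch (hypothetical helper; fmt import elided):
// decodeJoinIndex is a hypothetical helper reversing JoinCommand.Apply:
// it reads the Uvarint-encoded commit index out of the command result.
func decodeJoinIndex(result interface{}) (uint64, error) {
	b, ok := result.([]byte)
	if !ok {
		return 0, fmt.Errorf("unexpected join result type %T", result)
	}
	index, n := binary.Uvarint(b)
	if n <= 0 {
		return 0, fmt.Errorf("malformed join index")
	}
	return index, nil
}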

View File

@@ -1,68 +0,0 @@
// +build ignore
package server
import (
"crypto/tls"
"net"
"time"
"github.com/coreos/etcd/log"
)
// TLSServerConfig generates a TLS configuration based on TLSInfo.
// If any error happens, this function calls log.Fatal.
func TLSServerConfig(info *TLSInfo) *tls.Config {
if info.KeyFile == "" || info.CertFile == "" {
return nil
}
cfg, err := info.ServerConfig()
if err != nil {
log.Fatal("TLS info error: ", err)
}
return cfg
}
// NewListener creates a net.Listener.
// If the given scheme is "https", it uses the TLS config for the listener.
// If any error happens, this function calls log.Fatal.
func NewListener(scheme, addr string, cfg *tls.Config) net.Listener {
if scheme == "https" {
l, err := newTLSListener(addr, cfg)
if err != nil {
log.Fatal("Failed to create TLS listener: ", err)
}
return l
}
l, err := newListener(addr)
if err != nil {
log.Fatal("Failed to create listener: ", err)
}
return l
}
func newListener(addr string) (net.Listener, error) {
if addr == "" {
addr = ":http"
}
l, e := net.Listen("tcp", addr)
if e != nil {
return nil, e
}
return l, nil
}
func newTLSListener(addr string, cfg *tls.Config) (net.Listener, error) {
if addr == "" {
addr = ":https"
}
conn, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
}
return tls.NewListener(conn, cfg), nil
}
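A sketch of how the two functions fit together (hypothetical helper, assuming a populated TLSInfo; net/http import elided):
// serveTLS is a hypothetical illustration of the intended usage:
// derive the tls.Config from TLSInfo, pick the scheme accordingly,
// create the listener, and serve HTTP on it.
func serveTLS(info *TLSInfo, addr string, handler http.Handler) {
	cfg := TLSServerConfig(info) // nil when no key/cert is configured
	scheme := "http"
	if cfg != nil {
		scheme = "https"
	}
	http.Serve(NewListener(scheme, addr, cfg), handler)
}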

View File

@@ -1,27 +0,0 @@
// +build ignore
package server
import (
"time"
)
// packageStats represents the stats we need for a package:
// its sending time and its size.
type packageStats struct {
sendingTime time.Time
size int
}
// NewPackageStats creates a packageStats and returns a pointer to it.
func NewPackageStats(now time.Time, size int) *packageStats {
return &packageStats{
sendingTime: now,
size: size,
}
}
// Time returns the sending time of the package.
func (ps *packageStats) Time() time.Time {
return ps.sendingTime
}
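These records feed the statsQueue rate bookkeeping used by the server stats below; a tiny hypothetical sketch of recording one send:
// recordSend is a hypothetical sketch of how packageStats entries are
// produced: one per outgoing package, carrying timestamp and byte size.
func recordSend(q *statsQueue, size int) {
	q.Insert(NewPackageStats(time.Now(), size))
}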

View File

@@ -1,899 +0,0 @@
// +build ignore
package server
import (
"encoding/json"
"fmt"
"math/rand"
"net/http"
"net/url"
"sort"
"strings"
"sync"
"time"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
"github.com/coreos/etcd/third_party/github.com/gorilla/mux"
"github.com/coreos/etcd/discovery"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/metrics"
"github.com/coreos/etcd/pkg/btrfs"
"github.com/coreos/etcd/store"
)
const (
// MaxHeartbeatTimeoutBackoff is the maximum amount of time before we warn
// the user again about a peer not accepting heartbeats.
MaxHeartbeatTimeoutBackoff = 15 * time.Second
// ThresholdMonitorTimeout is the time between log notifications that the
// Raft heartbeat is too close to the election timeout.
ThresholdMonitorTimeout = 5 * time.Second
// ActiveMonitorTimeout is the time between checks on the active size of
// the cluster. If the active size is bigger than the actual size then
// etcd attempts to demote to bring it to the correct number.
ActiveMonitorTimeout = 1 * time.Second
// PeerActivityMonitorTimeout is the time between checks for dead nodes in
// the cluster.
PeerActivityMonitorTimeout = 1 * time.Second
// The location of cluster config in key space.
ClusterConfigKey = "/_etcd/config"
)
type PeerServerConfig struct {
Name string
Scheme string
URL string
SnapshotCount int
RetryTimes int
RetryInterval float64
}
type PeerServer struct {
Config PeerServerConfig
client *Client
raftServer raft.Server
server *Server
followersStats *raftFollowersStats
serverStats *raftServerStats
registry *Registry
store store.Store
snapConf *snapshotConf
joinIndex uint64
isNewCluster bool
removedInLog bool
removeNotify chan bool
started bool
closeChan chan bool
routineGroup sync.WaitGroup
timeoutThresholdChan chan interface{}
logBackoffs map[string]*logBackoff
metrics *metrics.Bucket
sync.Mutex
}
type logBackoff struct {
next time.Time
backoff time.Duration
count int
}
// TODO: find a good policy to do snapshot
type snapshotConf struct {
// Etcd checks whether a snapshot is needed every checkingInterval
checkingInterval time.Duration
// The index when the last snapshot happened
lastIndex uint64
// If the number of indexes since the last snapshot
// exceeds snapshotThr, etcd takes a snapshot
snapshotThr uint64
}
func NewPeerServer(psConfig PeerServerConfig, client *Client, registry *Registry, store store.Store, mb *metrics.Bucket, followersStats *raftFollowersStats, serverStats *raftServerStats) *PeerServer {
s := &PeerServer{
Config: psConfig,
client: client,
registry: registry,
store: store,
followersStats: followersStats,
serverStats: serverStats,
timeoutThresholdChan: make(chan interface{}, 1),
logBackoffs: make(map[string]*logBackoff),
metrics: mb,
}
return s
}
func (s *PeerServer) SetRaftServer(raftServer raft.Server, snapshot bool) {
s.snapConf = &snapshotConf{
checkingInterval: time.Second * 3,
// this is not accurate, we will update raft to provide an api
lastIndex: raftServer.CommitIndex(),
snapshotThr: uint64(s.Config.SnapshotCount),
}
raftServer.AddEventListener(raft.StateChangeEventType, s.raftEventLogger)
raftServer.AddEventListener(raft.LeaderChangeEventType, s.raftEventLogger)
raftServer.AddEventListener(raft.TermChangeEventType, s.raftEventLogger)
raftServer.AddEventListener(raft.AddPeerEventType, s.raftEventLogger)
raftServer.AddEventListener(raft.RemovePeerEventType, s.raftEventLogger)
raftServer.AddEventListener(raft.HeartbeatIntervalEventType, s.raftEventLogger)
raftServer.AddEventListener(raft.ElectionTimeoutThresholdEventType, s.raftEventLogger)
raftServer.AddEventListener(raft.HeartbeatEventType, s.recordMetricEvent)
raftServer.AddEventListener(raft.RemovedEventType, s.removedEvent)
s.raftServer = raftServer
s.removedInLog = false
// LoadSnapshot
if snapshot {
err := s.raftServer.LoadSnapshot()
if err == nil {
log.Debugf("%s finished load snapshot", s.Config.Name)
} else {
log.Debug(err)
}
}
s.raftServer.Init()
// Set NOCOW for data directory in btrfs
if btrfs.IsBtrfs(s.raftServer.LogPath()) {
if err := btrfs.SetNOCOWFile(s.raftServer.LogPath()); err != nil {
log.Warnf("Failed setting NOCOW: %v", err)
}
}
}
func (s *PeerServer) SetRegistry(registry *Registry) {
s.registry = registry
}
func (s *PeerServer) SetStore(store store.Store) {
s.store = store
}
// FindCluster tries all possible ways to find a cluster to join,
// including log data in -data-dir, -discovery, and -peers.
//
// Peer discovery follows this order:
// 1. previous peers in -data-dir
// 2. -discovery
// 3. -peers
func (s *PeerServer) FindCluster(discoverURL string, peers []string) (toStart bool, possiblePeers []string, err error) {
name := s.Config.Name
isNewNode := s.raftServer.IsLogEmpty()
// Try its best to find possible peers, and connect with them.
if !isNewNode {
// Joining the cluster with an existing peer address is not allowed;
// this prevents an old node from rejoining under a different name by mistake.
if !s.checkPeerAddressNonconflict() {
err = fmt.Errorf("%v is not allowed to join the cluster with existing URL %v", s.Config.Name, s.Config.URL)
return
}
// Take old nodes into account.
possiblePeers = s.getKnownPeers()
// Discover registered peers.
// TODO(yichengq): It may mess up discoverURL if this is
// set wrong by mistake. This may need to refactor discovery
// module. Fix it later.
if discoverURL != "" {
discoverPeers, _ := s.handleDiscovery(discoverURL)
possiblePeers = append(possiblePeers, discoverPeers...)
}
possiblePeers = append(possiblePeers, peers...)
possiblePeers = s.removeSelfFromList(possiblePeers)
if s.removedInLog {
return
}
// If there is possible peer list, use it to find cluster.
if len(possiblePeers) > 0 {
// TODO(yichengq): joinCluster may fail if there's no leader for
// current cluster. It should wait if the cluster is under
// leader election, or the node with changed IP cannot join
// the cluster then.
if rejected, ierr := s.startAsFollower(possiblePeers, 1); rejected {
log.Debugf("%s should work as standby for the cluster %v: %v", name, possiblePeers, ierr)
return
} else if ierr != nil {
log.Warnf("%s cannot connect to previous cluster %v: %v", name, possiblePeers, ierr)
} else {
log.Debugf("%s joins to the previous cluster %v", name, possiblePeers)
toStart = true
return
}
}
// TODO(yichengq): Think about the action that should be done
// if it cannot connect any of the previous known node.
log.Debugf("%s is restarting the cluster %v", name, possiblePeers)
s.SetJoinIndex(s.raftServer.CommitIndex())
toStart = true
return
}
// Attempt cluster discovery
if discoverURL != "" {
discoverPeers, discoverErr := s.handleDiscovery(discoverURL)
// It is not registered in the discovery URL
if discoverErr != nil {
log.Warnf("%s failed to connect discovery service[%v]: %v", name, discoverURL, discoverErr)
if len(peers) == 0 {
err = fmt.Errorf("%s, the new instance, must register itself to discovery service as required", name)
return
}
log.Debugf("%s is joining peers %v from -peers flag", name, peers)
} else {
log.Debugf("%s is joining a cluster %v via discover service", name, discoverPeers)
peers = discoverPeers
}
}
possiblePeers = peers
if len(possiblePeers) > 0 {
if rejected, ierr := s.startAsFollower(possiblePeers, s.Config.RetryTimes); rejected {
log.Debugf("%s should work as standby for the cluster %v: %v", name, possiblePeers, ierr)
} else if ierr != nil {
log.Warnf("%s cannot connect to existing peers %v: %v", name, possiblePeers, ierr)
err = ierr
} else {
toStart = true
}
return
}
// start as a leader in a new cluster
s.isNewCluster = true
log.Infof("%s is starting a new cluster", s.Config.Name)
toStart = true
return
}
// Start starts the raft server.
// The function assumes that join has been accepted successfully.
func (s *PeerServer) Start(snapshot bool, clusterConfig *ClusterConfig) error {
s.Lock()
defer s.Unlock()
if s.started {
return nil
}
s.started = true
s.removeNotify = make(chan bool)
s.closeChan = make(chan bool)
s.raftServer.Start()
if s.isNewCluster {
s.InitNewCluster(clusterConfig)
s.isNewCluster = false
}
s.startRoutine(s.monitorSync)
s.startRoutine(s.monitorTimeoutThreshold)
s.startRoutine(s.monitorActiveSize)
s.startRoutine(s.monitorPeerActivity)
// open the snapshot
if snapshot {
s.startRoutine(s.monitorSnapshot)
}
return nil
}
// Stop stops the server gracefully.
func (s *PeerServer) Stop() {
s.Lock()
defer s.Unlock()
if !s.started {
return
}
s.started = false
close(s.closeChan)
// TODO(yichengq): it should also call async stop for raft server,
// but this functionality has not been implemented.
s.raftServer.Stop()
s.routineGroup.Wait()
}
// asyncRemove stops the server in peer mode.
// It is called to stop the server internally when it has been removed
// from the cluster.
// The function triggers the stop action first to notify the server that it
// should not continue, and waits for the stop in a separate goroutine because
// the caller should also exit.
func (s *PeerServer) asyncRemove() {
s.Lock()
if !s.started {
s.Unlock()
return
}
s.started = false
close(s.closeChan)
// TODO(yichengq): it should also call async stop for raft server,
// but this functionality has not been implemented.
go func() {
s.raftServer.Stop()
s.routineGroup.Wait()
close(s.removeNotify)
s.Unlock()
}()
}
// RemoveNotify notifies when the server has left peer mode due to
// removal from the cluster.
func (s *PeerServer) RemoveNotify() <-chan bool {
return s.removeNotify
}
func (s *PeerServer) HTTPHandler() http.Handler {
router := mux.NewRouter()
// internal commands
router.HandleFunc("/name", s.NameHttpHandler)
router.HandleFunc("/version", s.VersionHttpHandler)
router.HandleFunc("/version/{version:[0-9]+}/check", s.VersionCheckHttpHandler)
router.HandleFunc("/upgrade", s.UpgradeHttpHandler)
router.HandleFunc("/join", s.JoinHttpHandler)
router.HandleFunc("/remove/{name:.+}", s.RemoveHttpHandler)
router.HandleFunc("/vote", s.VoteHttpHandler)
router.HandleFunc("/log", s.GetLogHttpHandler)
router.HandleFunc("/log/append", s.AppendEntriesHttpHandler)
router.HandleFunc("/snapshot", s.SnapshotHttpHandler)
router.HandleFunc("/snapshotRecovery", s.SnapshotRecoveryHttpHandler)
router.HandleFunc("/etcdURL", s.EtcdURLHttpHandler)
router.HandleFunc("/v2/admin/config", s.getClusterConfigHttpHandler).Methods("GET")
router.HandleFunc("/v2/admin/config", s.setClusterConfigHttpHandler).Methods("PUT")
router.HandleFunc("/v2/admin/machines", s.getMachinesHttpHandler).Methods("GET")
router.HandleFunc("/v2/admin/machines/{name}", s.getMachineHttpHandler).Methods("GET")
router.HandleFunc("/v2/admin/machines/{name}", s.RemoveHttpHandler).Methods("DELETE")
return router
}
func (s *PeerServer) SetJoinIndex(joinIndex uint64) {
s.joinIndex = joinIndex
}
// ClusterConfig retrieves the current cluster configuration.
func (s *PeerServer) ClusterConfig() *ClusterConfig {
e, err := s.store.Get(ClusterConfigKey, false, false)
// This is useful for backward compatibility because older
// versions don't set the cluster config.
if err != nil {
log.Debugf("failed getting cluster config key: %v", err)
return NewClusterConfig()
}
var c ClusterConfig
if err = json.Unmarshal([]byte(*e.Node.Value), &c); err != nil {
log.Debugf("failed unmarshaling cluster config: %v", err)
return NewClusterConfig()
}
return &c
}
// SetClusterConfig updates the current cluster configuration.
// Adjusting the active size will cause the cluster to add or remove machines
// to match the new size.
func (s *PeerServer) SetClusterConfig(c *ClusterConfig) {
// Set minimums.
if c.ActiveSize < MinActiveSize {
c.ActiveSize = MinActiveSize
}
if c.RemoveDelay < MinRemoveDelay {
c.RemoveDelay = MinRemoveDelay
}
if c.SyncInterval < MinSyncInterval {
c.SyncInterval = MinSyncInterval
}
log.Debugf("set cluster config as %v", c)
b, _ := json.Marshal(c)
s.store.Set(ClusterConfigKey, false, string(b), store.Permanent)
}
// Retrieves the underlying Raft server.
func (s *PeerServer) RaftServer() raft.Server {
return s.raftServer
}
// Associates the client server with the peer server.
func (s *PeerServer) SetServer(server *Server) {
s.server = server
}
func (s *PeerServer) InitNewCluster(clusterConfig *ClusterConfig) {
// The leader needs to join itself as a peer.
s.doCommand(&JoinCommand{
MinVersion: store.MinVersion(),
MaxVersion: store.MaxVersion(),
Name: s.raftServer.Name(),
RaftURL: s.Config.URL,
EtcdURL: s.server.URL(),
})
log.Debugf("%s start as a leader", s.Config.Name)
s.joinIndex = 1
s.doCommand(&SetClusterConfigCommand{Config: clusterConfig})
log.Debugf("%s sets cluster config as %v", s.Config.Name, clusterConfig)
}
func (s *PeerServer) doCommand(cmd raft.Command) {
for {
if _, err := s.raftServer.Do(cmd); err == nil {
break
}
}
log.Debugf("%s start as a leader", s.Config.Name)
}
func (s *PeerServer) startAsFollower(cluster []string, retryTimes int) (bool, error) {
// start as a follower in an existing cluster
for i := 0; ; i++ {
if rejected, err := s.joinCluster(cluster); rejected {
return true, err
} else if err == nil {
return false, nil
}
if i == retryTimes-1 {
break
}
log.Infof("%v is unable to join the cluster using any of the peers %v at %dth time. Retrying in %.1f seconds", s.Config.Name, cluster, i, s.Config.RetryInterval)
time.Sleep(time.Second * time.Duration(s.Config.RetryInterval))
continue
}
return false, fmt.Errorf("fail joining the cluster via given peers after %x retries", retryTimes)
}
// Upgradable checks whether all peers in a cluster support an upgrade to the next store version.
func (s *PeerServer) Upgradable() error {
nextVersion := s.store.Version() + 1
for _, peerURL := range s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name) {
u, err := url.Parse(peerURL)
if err != nil {
return fmt.Errorf("PeerServer: Cannot parse URL: '%s' (%s)", peerURL, err)
}
url := (&url.URL{Host: u.Host, Scheme: s.Config.Scheme}).String()
ok, err := s.client.CheckVersion(url, nextVersion)
if err != nil {
return err
}
if !ok {
return fmt.Errorf("PeerServer: Version %d is not compatible with peer: %s", nextVersion, u.Host)
}
}
return nil
}
// checkPeerAddressNonconflict checks whether the peer address already exists under a different name.
func (s *PeerServer) checkPeerAddressNonconflict() bool {
// there exists the (name, peer address) pair
if peerURL, ok := s.registry.PeerURL(s.Config.Name); ok {
if peerURL == s.Config.URL {
return true
}
}
// check all existing peer addresses
peerURLs := s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name)
for _, peerURL := range peerURLs {
if peerURL == s.Config.URL {
return false
}
}
return true
}
// Helper function to do discovery and return results in expected format
func (s *PeerServer) handleDiscovery(discoverURL string) (peers []string, err error) {
peers, err = discovery.Do(discoverURL, s.Config.Name, s.Config.URL, s.closeChan, s.startRoutine)
// Warn about errors coming from discovery, this isn't fatal
// since the user might have provided a peer list elsewhere,
// or there is some log in data dir.
if err != nil {
log.Warnf("Discovery encountered an error: %v", err)
return
}
for i := range peers {
// Strip the scheme off of the peer if it has one
// TODO(bp): clean this up!
purl, err := url.Parse(peers[i])
if err == nil {
peers[i] = purl.Host
}
}
log.Infof("Discovery fetched back peer list: %v", peers)
return
}
// getKnownPeers gets the previous peers from the log
func (s *PeerServer) getKnownPeers() []string {
peers := s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name)
log.Infof("Peer URLs in log: %s / %s (%s)", s.raftServer.Leader(), s.Config.Name, strings.Join(peers, ","))
for i := range peers {
u, err := url.Parse(peers[i])
if err != nil {
log.Debugf("getKnownPeers cannot parse url %v", peers[i])
}
peers[i] = u.Host
}
return peers
}
// removeSelfFromList removes the peerServer's URL from the peer list
func (s *PeerServer) removeSelfFromList(peers []string) []string {
// Remove its own peer address from the peer list to join
u, err := url.Parse(s.Config.URL)
if err != nil {
log.Warnf("failed parsing self peer address %v", s.Config.URL)
u = nil
}
newPeers := make([]string, 0)
for _, v := range peers {
if u == nil || v != u.Host {
newPeers = append(newPeers, v)
}
}
return newPeers
}
func (s *PeerServer) joinCluster(cluster []string) (bool, error) {
for _, peer := range cluster {
if len(peer) == 0 {
continue
}
if rejected, err := s.joinByPeer(s.raftServer, peer, s.Config.Scheme); rejected {
return true, fmt.Errorf("rejected by peer %s: %v", peer, err)
} else if err == nil {
log.Infof("%s joined the cluster via peer %s", s.Config.Name, peer)
return false, nil
} else {
log.Infof("%s attempted to join via %s failed: %v", s.Config.Name, peer, err)
}
}
return false, fmt.Errorf("unreachable cluster")
}
// joinByPeer sends a join request to the given peer.
// The first return value tells whether the cluster rejected it outright.
func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) (bool, error) {
u := (&url.URL{Host: peer, Scheme: scheme}).String()
// Our version must match the leader's version
version, err := s.client.GetVersion(u)
if err != nil {
return false, fmt.Errorf("fail checking join version: %v", err)
}
if version < store.MinVersion() || version > store.MaxVersion() {
return true, fmt.Errorf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version)
}
// Fetch current peer list
machines, err := s.client.GetMachines(u)
if err != nil {
return false, fmt.Errorf("fail getting machine messages: %v", err)
}
exist := false
for _, machine := range machines {
if machine.Name == server.Name() {
exist = true
break
}
}
// Fetch the cluster config to see whether there is room to join.
clusterConfig, err := s.client.GetClusterConfig(u)
if err != nil {
return false, fmt.Errorf("fail getting cluster config: %v", err)
}
if !exist && clusterConfig.ActiveSize <= len(machines) {
return true, fmt.Errorf("stop joining because the cluster is full with %d nodes", len(machines))
}
joinIndex, err := s.client.AddMachine(u,
&JoinCommand{
MinVersion: store.MinVersion(),
MaxVersion: store.MaxVersion(),
Name: server.Name(),
RaftURL: s.Config.URL,
EtcdURL: s.server.URL(),
})
if err != nil {
return err.ErrorCode == etcdErr.EcodeNoMorePeer, fmt.Errorf("fail on join request: %v", err)
}
s.joinIndex = joinIndex
return false, nil
}
func (s *PeerServer) Stats() []byte {
s.serverStats.LeaderInfo.Uptime = time.Since(s.serverStats.LeaderInfo.StartTime).String()
// TODO: register state listener to raft to change this field
// rather than compare the state each time Stats() is called.
if s.RaftServer().State() == raft.Leader {
s.serverStats.LeaderInfo.Name = s.RaftServer().Name()
}
queue := s.serverStats.sendRateQueue
s.serverStats.SendingPkgRate, s.serverStats.SendingBandwidthRate = queue.Rate()
queue = s.serverStats.recvRateQueue
s.serverStats.RecvingPkgRate, s.serverStats.RecvingBandwidthRate = queue.Rate()
b, _ := json.Marshal(s.serverStats)
return b
}
func (s *PeerServer) PeerStats() []byte {
if s.raftServer.State() == raft.Leader {
b, _ := json.Marshal(s.followersStats)
return b
}
return nil
}
// removedEvent handles the case where this machine has been removed from the
// cluster; the notification arrives when it tries to become a candidate.
func (s *PeerServer) removedEvent(event raft.Event) {
// HACK(philips): we need to find a better notification for this.
log.Infof("removed during cluster re-configuration")
s.asyncRemove()
}
// raftEventLogger converts events from the Raft server into log messages.
func (s *PeerServer) raftEventLogger(event raft.Event) {
value := event.Value()
prevValue := event.PrevValue()
if value == nil {
value = "<nil>"
}
if prevValue == nil {
prevValue = "<nil>"
}
switch event.Type() {
case raft.StateChangeEventType:
log.Infof("%s: state changed from '%v' to '%v'.", s.Config.Name, prevValue, value)
case raft.TermChangeEventType:
log.Infof("%s: term #%v started.", s.Config.Name, value)
case raft.LeaderChangeEventType:
log.Infof("%s: leader changed from '%v' to '%v'.", s.Config.Name, prevValue, value)
case raft.AddPeerEventType:
log.Infof("%s: peer added: '%v'", s.Config.Name, value)
case raft.RemovePeerEventType:
log.Infof("%s: peer removed: '%v'", s.Config.Name, value)
case raft.HeartbeatIntervalEventType:
peer, ok := value.(*raft.Peer)
if !ok {
log.Warnf("%s: heatbeat timeout from unknown peer", s.Config.Name)
return
}
s.logHeartbeatTimeout(peer)
case raft.ElectionTimeoutThresholdEventType:
select {
case s.timeoutThresholdChan <- value:
default:
}
}
}
// logHeartbeatTimeout logs the edge-triggered heartbeat timeout event,
// but only if we haven't warned within a reasonable interval.
func (s *PeerServer) logHeartbeatTimeout(peer *raft.Peer) {
b, ok := s.logBackoffs[peer.Name]
if !ok {
b = &logBackoff{time.Time{}, time.Second, 1}
s.logBackoffs[peer.Name] = b
}
if peer.LastActivity().After(b.next) {
b.next = time.Time{}
b.backoff = time.Second
b.count = 1
}
if b.next.After(time.Now()) {
b.count++
return
}
b.backoff = 2 * b.backoff
if b.backoff > MaxHeartbeatTimeoutBackoff {
b.backoff = MaxHeartbeatTimeoutBackoff
}
b.next = time.Now().Add(b.backoff)
log.Infof("%s: warning: heartbeat time out peer=%q missed=%d backoff=%q", s.Config.Name, peer.Name, b.count, b.backoff)
}
func (s *PeerServer) recordMetricEvent(event raft.Event) {
name := fmt.Sprintf("raft.event.%s", event.Type())
value := event.Value().(time.Duration)
(*s.metrics).Timer(name).Update(value)
}
// logSnapshot logs the result of a snapshot attempt.
func (s *PeerServer) logSnapshot(err error, currentIndex, count uint64) {
info := fmt.Sprintf("%s: snapshot of %d events at index %d", s.Config.Name, count, currentIndex)
if err != nil {
log.Infof("%s attempted and failed: %v", info, err)
} else {
log.Infof("%s completed", info)
}
}
func (s *PeerServer) startRoutine(f func()) {
s.routineGroup.Add(1)
go func() {
defer s.routineGroup.Done()
f()
}()
}
func (s *PeerServer) monitorSnapshot() {
for {
timer := time.NewTimer(s.snapConf.checkingInterval)
select {
case <-s.closeChan:
timer.Stop()
return
case <-timer.C:
}
currentIndex := s.RaftServer().CommitIndex()
count := currentIndex - s.snapConf.lastIndex
if count > s.snapConf.snapshotThr {
err := s.raftServer.TakeSnapshot()
s.logSnapshot(err, currentIndex, count)
s.snapConf.lastIndex = currentIndex
}
}
}
func (s *PeerServer) monitorSync() {
ticker := time.NewTicker(time.Millisecond * 500)
defer ticker.Stop()
for {
select {
case <-s.closeChan:
return
case now := <-ticker.C:
if s.raftServer.State() == raft.Leader {
s.raftServer.Do(s.store.CommandFactory().CreateSyncCommand(now))
}
}
}
}
// monitorTimeoutThreshold groups timeout threshold events together and prints
// them as a single log line.
func (s *PeerServer) monitorTimeoutThreshold() {
ticker := time.NewTicker(ThresholdMonitorTimeout)
defer ticker.Stop()
for {
select {
case <-s.closeChan:
return
case value := <-s.timeoutThresholdChan:
log.Infof("%s: warning: heartbeat near election timeout: %v", s.Config.Name, value)
}
select {
case <-s.closeChan:
return
case <-ticker.C:
}
}
}
// monitorActiveSize has the leader periodically check the status of cluster
// nodes and swap them out for standbys as needed.
func (s *PeerServer) monitorActiveSize() {
ticker := time.NewTicker(ActiveMonitorTimeout)
defer ticker.Stop()
for {
select {
case <-s.closeChan:
return
case <-ticker.C:
}
// Ignore while this peer is not a leader.
if s.raftServer.State() != raft.Leader {
continue
}
// Retrieve target active size and actual active size.
activeSize := s.ClusterConfig().ActiveSize
peers := s.registry.Names()
peerCount := len(peers)
if index := sort.SearchStrings(peers, s.Config.Name); index < len(peers) && peers[index] == s.Config.Name {
peers = append(peers[:index], peers[index+1:]...)
}
// If we have more active nodes than we should then remove.
if peerCount > activeSize {
peer := peers[rand.Intn(len(peers))]
log.Infof("%s: removing node: %v; peer number %d > expected size %d", s.Config.Name, peer, peerCount, activeSize)
if _, err := s.raftServer.Do(&RemoveCommand{Name: peer}); err != nil {
log.Infof("%s: warning: remove error: %v", s.Config.Name, err)
}
continue
}
}
}
// monitorPeerActivity has the leader periodically check for dead nodes and demote them.
func (s *PeerServer) monitorPeerActivity() {
ticker := time.NewTicker(PeerActivityMonitorTimeout)
defer ticker.Stop()
for {
select {
case <-s.closeChan:
return
case <-ticker.C:
}
// Ignore while this peer is not a leader.
if s.raftServer.State() != raft.Leader {
continue
}
// Check last activity for all peers.
now := time.Now()
removeDelay := time.Duration(int64(s.ClusterConfig().RemoveDelay * float64(time.Second)))
peers := s.raftServer.Peers()
for _, peer := range peers {
// If the last response from the peer is longer than the remove delay
// then automatically demote the peer.
if !peer.LastActivity().IsZero() && now.Sub(peer.LastActivity()) > removeDelay {
log.Infof("%s: removing node: %v; last activity %v ago", s.Config.Name, peer.Name, now.Sub(peer.LastActivity()))
if _, err := s.raftServer.Do(&RemoveCommand{Name: peer.Name}); err != nil {
log.Infof("%s: warning: autodemotion error: %v", s.Config.Name, err)
}
continue
}
}
}
}
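All of the monitor goroutines above share one shutdown pattern: launched through startRoutine so routineGroup can wait for them, and unblocked by closing closeChan. A stripped-down sketch of that pattern (hypothetical function, not part of the original file):
// monitorLoop is a hypothetical distillation of the loop shared by
// monitorSync, monitorActiveSize, and monitorPeerActivity: run tick()
// on an interval and return promptly once closeChan is closed.
func monitorLoop(closeChan <-chan bool, interval time.Duration, tick func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-closeChan:
			return
		case <-ticker.C:
			tick()
		}
	}
}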

View File

@@ -1,320 +0,0 @@
// +build ignore
package server
import (
"encoding/json"
"net/http"
"strconv"
"time"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
"github.com/coreos/etcd/third_party/github.com/gorilla/mux"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/log"
uhttp "github.com/coreos/etcd/pkg/http"
"github.com/coreos/etcd/store"
)
// Get all the current logs
func (ps *PeerServer) GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
log.Debugf("[recv] GET %s/log", ps.Config.URL)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(ps.raftServer.LogEntries())
}
// Response to vote request
func (ps *PeerServer) VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
rvreq := &raft.RequestVoteRequest{}
if _, err := rvreq.Decode(req.Body); err != nil {
http.Error(w, "", http.StatusBadRequest)
log.Warnf("[recv] BADREQUEST %s/vote [%v]", ps.Config.URL, err)
return
}
log.Debugf("[recv] POST %s/vote [%s]", ps.Config.URL, rvreq.CandidateName)
resp := ps.raftServer.RequestVote(rvreq)
if resp == nil {
log.Warn("[vote] Error: nil response")
http.Error(w, "", http.StatusInternalServerError)
return
}
if _, err := resp.Encode(w); err != nil {
log.Warn("[vote] Error: %v", err)
http.Error(w, "", http.StatusInternalServerError)
return
}
}
// Response to append entries request
func (ps *PeerServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
start := time.Now()
aereq := &raft.AppendEntriesRequest{}
if _, err := aereq.Decode(req.Body); err != nil {
http.Error(w, "", http.StatusBadRequest)
log.Warnf("[recv] BADREQUEST %s/log/append [%v]", ps.Config.URL, err)
return
}
log.Debugf("[recv] POST %s/log/append [%d]", ps.Config.URL, len(aereq.Entries))
ps.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength))
resp := ps.raftServer.AppendEntries(aereq)
if resp == nil {
log.Warn("[ae] Error: nil response")
http.Error(w, "", http.StatusInternalServerError)
return
}
if !resp.Success() {
log.Debugf("[Append Entry] Step back")
}
if _, err := resp.Encode(w); err != nil {
log.Warn("[ae] Error: %v", err)
http.Error(w, "", http.StatusInternalServerError)
return
}
(*ps.metrics).Timer("timer.appendentries.handle").UpdateSince(start)
}
// Response to snapshot request
func (ps *PeerServer) SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
ssreq := &raft.SnapshotRequest{}
if _, err := ssreq.Decode(req.Body); err != nil {
http.Error(w, "", http.StatusBadRequest)
log.Warnf("[recv] BADREQUEST %s/snapshot [%v]", ps.Config.URL, err)
return
}
log.Debugf("[recv] POST %s/snapshot", ps.Config.URL)
resp := ps.raftServer.RequestSnapshot(ssreq)
if resp == nil {
log.Warn("[ss] Error: nil response")
http.Error(w, "", http.StatusInternalServerError)
return
}
if _, err := resp.Encode(w); err != nil {
log.Warn("[ss] Error: %v", err)
http.Error(w, "", http.StatusInternalServerError)
return
}
}
// Response to recover from snapshot request
func (ps *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
ssrreq := &raft.SnapshotRecoveryRequest{}
if _, err := ssrreq.Decode(req.Body); err != nil {
http.Error(w, "", http.StatusBadRequest)
log.Warnf("[recv] BADREQUEST %s/snapshotRecovery [%v]", ps.Config.URL, err)
return
}
log.Debugf("[recv] POST %s/snapshotRecovery", ps.Config.URL)
resp := ps.raftServer.SnapshotRecoveryRequest(ssrreq)
if resp == nil {
log.Warn("[ssr] Error: nil response")
http.Error(w, "", http.StatusInternalServerError)
return
}
if _, err := resp.Encode(w); err != nil {
log.Warn("[ssr] Error: %v", err)
http.Error(w, "", http.StatusInternalServerError)
return
}
}
// Returns the client URL that this etcd server listens on
func (ps *PeerServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) {
log.Debugf("[recv] Get %s/etcdURL/ ", ps.Config.URL)
w.WriteHeader(http.StatusOK)
w.Write([]byte(ps.server.URL()))
}
// Response to the join request
func (ps *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
command := &JoinCommand{}
if err := uhttp.DecodeJsonRequest(req, command); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
log.Debugf("Receive Join Request from %s", command.Name)
err := ps.server.Dispatch(command, w, req)
// Return status.
if err != nil {
if etcdErr, ok := err.(*etcdErr.Error); ok {
log.Debug("Return error: ", (*etcdErr).Error())
etcdErr.Write(w)
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
}
// Response to remove request
func (ps *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request) {
if req.Method != "DELETE" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
vars := mux.Vars(req)
command := &RemoveCommand{
Name: vars["name"],
}
log.Debugf("[recv] Remove Request [%s]", command.Name)
ps.server.Dispatch(command, w, req)
}
// Returns a JSON-encoded cluster configuration.
func (ps *PeerServer) getClusterConfigHttpHandler(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(ps.ClusterConfig())
}
// Updates the cluster configuration.
func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *http.Request) {
// Decode map.
m := make(map[string]interface{})
if err := json.NewDecoder(req.Body).Decode(&m); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// Copy config and update fields passed in.
cfg := ps.ClusterConfig()
if activeSize, ok := m["activeSize"].(float64); ok {
cfg.ActiveSize = int(activeSize)
}
if removeDelay, ok := m["removeDelay"].(float64); ok {
cfg.RemoveDelay = removeDelay
}
if syncInterval, ok := m["syncInterval"].(float64); ok {
cfg.SyncInterval = syncInterval
}
// Issue command to update.
c := &SetClusterConfigCommand{Config: cfg}
log.Debugf("[recv] Update Cluster Config Request")
ps.server.Dispatch(c, w, req)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(ps.ClusterConfig())
}
// Retrieves a list of peers and standbys.
func (ps *PeerServer) getMachinesHttpHandler(w http.ResponseWriter, req *http.Request) {
machines := make([]*machineMessage, 0)
leader := ps.raftServer.Leader()
for _, name := range ps.registry.Names() {
if msg := ps.getMachineMessage(name, leader); msg != nil {
machines = append(machines, msg)
}
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(&machines)
}
// Retrieves a single peer or standby.
func (ps *PeerServer) getMachineHttpHandler(w http.ResponseWriter, req *http.Request) {
vars := mux.Vars(req)
m := ps.getMachineMessage(vars["name"], ps.raftServer.Leader())
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(m)
}
func (ps *PeerServer) getMachineMessage(name string, leader string) *machineMessage {
if !ps.registry.Exists(name) {
return nil
}
clientURL, _ := ps.registry.ClientURL(name)
peerURL, _ := ps.registry.PeerURL(name)
msg := &machineMessage{
Name: name,
State: raft.Follower,
ClientURL: clientURL,
PeerURL: peerURL,
}
if name == leader {
msg.State = raft.Leader
}
return msg
}
// Response to the name request
func (ps *PeerServer) NameHttpHandler(w http.ResponseWriter, req *http.Request) {
log.Debugf("[recv] Get %s/name/ ", ps.Config.URL)
w.WriteHeader(http.StatusOK)
w.Write([]byte(ps.Config.Name))
}
// Response to the version request
func (ps *PeerServer) VersionHttpHandler(w http.ResponseWriter, req *http.Request) {
log.Debugf("[recv] Get %s/version/ ", ps.Config.URL)
w.WriteHeader(http.StatusOK)
w.Write([]byte(strconv.Itoa(ps.store.Version())))
}
// Checks whether a given version is supported.
func (ps *PeerServer) VersionCheckHttpHandler(w http.ResponseWriter, req *http.Request) {
log.Debugf("[recv] Get %s%s ", ps.Config.URL, req.URL.Path)
vars := mux.Vars(req)
version, _ := strconv.Atoi(vars["version"])
if version >= store.MinVersion() && version <= store.MaxVersion() {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusForbidden)
}
}
// Upgrades the current store version to the next version.
func (ps *PeerServer) UpgradeHttpHandler(w http.ResponseWriter, req *http.Request) {
log.Debugf("[recv] Get %s/version", ps.Config.URL)
// Check if upgrade is possible for all nodes.
if err := ps.Upgradable(); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// Create an upgrade command from the current version.
c := ps.store.CommandFactory().CreateUpgradeCommand()
if err := ps.server.Dispatch(c, w, req); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}
// machineMessage represents information about a peer or standby in the registry.
type machineMessage struct {
Name string `json:"name"`
State string `json:"state"`
ClientURL string `json:"clientURL"`
PeerURL string `json:"peerURL"`
}
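Since setClusterConfigHttpHandler only overwrites fields present in the request body, partial updates work; a hedged client-side sketch against the /v2/admin/config route registered in HTTPHandler (hypothetical helper; imports elided):
// updateActiveSize is a hypothetical client-side sketch: PUT a partial
// JSON body to /v2/admin/config; omitted fields keep their old values.
func updateActiveSize(peerURL string, size int) error {
	body := fmt.Sprintf(`{"activeSize": %d}`, size)
	req, err := http.NewRequest("PUT", peerURL+"/v2/admin/config", strings.NewReader(body))
	if err != nil {
		return err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("config update failed: %s", resp.Status)
	}
	return nil
}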

View File

@@ -1,65 +0,0 @@
// +build ignore
package server
import (
"math"
"time"
)
type raftFollowersStats struct {
Leader string `json:"leader"`
Followers map[string]*raftFollowerStats `json:"followers"`
}
func NewRaftFollowersStats(name string) *raftFollowersStats {
return &raftFollowersStats{
Leader: name,
Followers: make(map[string]*raftFollowerStats),
}
}
type raftFollowerStats struct {
Latency struct {
Current float64 `json:"current"`
Average float64 `json:"average"`
averageSquare float64
StandardDeviation float64 `json:"standardDeviation"`
Minimum float64 `json:"minimum"`
Maximum float64 `json:"maximum"`
} `json:"latency"`
Counts struct {
Fail uint64 `json:"fail"`
Success uint64 `json:"success"`
} `json:"counts"`
}
// Succ updates the raftFollowerStats with a successful send
func (ps *raftFollowerStats) Succ(d time.Duration) {
total := float64(ps.Counts.Success) * ps.Latency.Average
totalSquare := float64(ps.Counts.Success) * ps.Latency.averageSquare
ps.Counts.Success++
ps.Latency.Current = float64(d) / (1000000.0) // convert nanoseconds to milliseconds
if ps.Latency.Current > ps.Latency.Maximum {
ps.Latency.Maximum = ps.Latency.Current
}
if ps.Latency.Current < ps.Latency.Minimum {
ps.Latency.Minimum = ps.Latency.Current
}
ps.Latency.Average = (total + ps.Latency.Current) / float64(ps.Counts.Success)
ps.Latency.averageSquare = (totalSquare + ps.Latency.Current*ps.Latency.Current) / float64(ps.Counts.Success)
// sdv = sqrt(avg(x^2) - avg(x)^2)
ps.Latency.StandardDeviation = math.Sqrt(ps.Latency.averageSquare - ps.Latency.Average*ps.Latency.Average)
}
// Fail updates the raftFollowerStats with an unsuccessful send
func (ps *raftFollowerStats) Fail() {
ps.Counts.Fail++
}
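Succ keeps running first and second moments, so the standard deviation falls out of sqrt(E[x^2] - E[x]^2) without storing samples. A small hypothetical check of that identity (latencies are tracked in milliseconds; fmt import elided):
// verifyFollowerStats is a hypothetical check of the running-moment math:
// sends of 10ms, 20ms, and 30ms give average 20.00 and standard
// deviation sqrt((100+400+900)/3 - 400) ≈ 8.16.
func verifyFollowerStats() {
	fs := &raftFollowerStats{}
	fs.Latency.Minimum = 1 << 63 // same sentinel applyJoin uses for a new follower
	for _, d := range []time.Duration{10 * time.Millisecond, 20 * time.Millisecond, 30 * time.Millisecond} {
		fs.Succ(d)
	}
	fmt.Printf("avg=%.2f sdv=%.2f\n", fs.Latency.Average, fs.Latency.StandardDeviation)
}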

View File

@@ -1,81 +0,0 @@
// +build ignore
package server
import (
"sync"
"time"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
)
type raftServerStats struct {
Name string `json:"name"`
State string `json:"state"`
StartTime time.Time `json:"startTime"`
LeaderInfo struct {
Name string `json:"leader"`
Uptime string `json:"uptime"`
StartTime time.Time `json:"startTime"`
} `json:"leaderInfo"`
RecvAppendRequestCnt uint64 `json:"recvAppendRequestCnt"`
RecvingPkgRate float64 `json:"recvPkgRate,omitempty"`
RecvingBandwidthRate float64 `json:"recvBandwidthRate,omitempty"`
SendAppendRequestCnt uint64 `json:"sendAppendRequestCnt"`
SendingPkgRate float64 `json:"sendPkgRate,omitempty"`
SendingBandwidthRate float64 `json:"sendBandwidthRate,omitempty"`
sendRateQueue *statsQueue
recvRateQueue *statsQueue
sync.Mutex
}
func NewRaftServerStats(name string) *raftServerStats {
stats := &raftServerStats{
Name: name,
StartTime: time.Now(),
sendRateQueue: &statsQueue{
back: -1,
},
recvRateQueue: &statsQueue{
back: -1,
},
}
stats.LeaderInfo.StartTime = time.Now()
return stats
}
func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) {
ss.Lock()
defer ss.Unlock()
ss.State = raft.Follower
if leaderName != ss.LeaderInfo.Name {
ss.LeaderInfo.Name = leaderName
ss.LeaderInfo.StartTime = time.Now()
}
ss.recvRateQueue.Insert(NewPackageStats(time.Now(), pkgSize))
ss.RecvAppendRequestCnt++
}
func (ss *raftServerStats) SendAppendReq(pkgSize int) {
ss.Lock()
defer ss.Unlock()
now := time.Now()
if ss.State != raft.Leader {
ss.State = raft.Leader
ss.LeaderInfo.Name = ss.Name
ss.LeaderInfo.StartTime = now
}
ss.sendRateQueue.Insert(NewPackageStats(now, pkgSize))
ss.SendAppendRequestCnt++
}
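PeerServer.Stats reads these queues back out through Rate(); a minimal hypothetical sketch of that read side:
// snapshotRates is a hypothetical sketch of how Stats() consumes the
// queues: Rate() yields packages/sec and bytes/sec over the retained window.
func snapshotRates(ss *raftServerStats) {
	ss.SendingPkgRate, ss.SendingBandwidthRate = ss.sendRateQueue.Rate()
	ss.RecvingPkgRate, ss.RecvingBandwidthRate = ss.recvRateQueue.Rate()
}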

View File

@@ -1,240 +0,0 @@
// +build ignore
package server
import (
"fmt"
"net/url"
"path"
"path/filepath"
"sort"
"strings"
"sync"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/store"
)
// The location of the peer URL data.
const RegistryKey = "/_etcd/machines"
// The Registry stores URL information for nodes.
type Registry struct {
sync.Mutex
store store.Store
peers map[string]*node
}
// The internal storage format of the registry.
type node struct {
peerVersion string
peerURL string
url string
}
// Creates a new Registry.
func NewRegistry(s store.Store) *Registry {
return &Registry{
store: s,
peers: make(map[string]*node),
}
}
// Register adds a peer to the registry.
func (r *Registry) Register(name string, peerURL string, machURL string) error {
// Write data to store.
v := url.Values{}
v.Set("raft", peerURL)
v.Set("etcd", machURL)
log.Debugf("Register: %s", name)
if _, err := r.store.Create(path.Join(RegistryKey, name), false, v.Encode(), false, store.Permanent); err != nil {
return err
}
r.Lock()
defer r.Unlock()
r.peers[name] = r.load(RegistryKey, name)
return nil
}
// Unregister removes a peer from the registry.
func (r *Registry) Unregister(name string) error {
// Remove the key from the store.
log.Debugf("Unregister: %s", name)
_, err := r.store.Delete(path.Join(RegistryKey, name), false, false)
return err
}
// Count returns the number of peers in the cluster.
func (r *Registry) Count() int {
e, err := r.store.Get(RegistryKey, false, false)
if err != nil {
return 0
}
return len(e.Node.Nodes)
}
// Exists checks if a peer with the given name exists.
func (r *Registry) Exists(name string) bool {
e, err := r.store.Get(path.Join(RegistryKey, name), false, false)
if err != nil {
return false
}
return (e.Node != nil)
}
// Retrieves the client URL for a given node by name.
func (r *Registry) ClientURL(name string) (string, bool) {
r.Lock()
defer r.Unlock()
return r.clientURL(RegistryKey, name)
}
func (r *Registry) clientURL(key, name string) (string, bool) {
if r.peers[name] == nil {
if peer := r.load(key, name); peer != nil {
r.peers[name] = peer
}
}
if peer := r.peers[name]; peer != nil {
return peer.url, true
}
return "", false
}
// TODO(yichengq): have all of the code use a full URL with scheme
// and remove this method
// PeerHost retrieves the host part of peer URL for a given node by name.
func (r *Registry) PeerHost(name string) (string, bool) {
rawurl, ok := r.PeerURL(name)
if ok {
u, _ := url.Parse(rawurl)
return u.Host, ok
}
return rawurl, ok
}
// Retrieves the peer URL for a given node by name.
func (r *Registry) PeerURL(name string) (string, bool) {
r.Lock()
defer r.Unlock()
return r.peerURL(RegistryKey, name)
}
func (r *Registry) peerURL(key, name string) (string, bool) {
if r.peers[name] == nil {
if peer := r.load(key, name); peer != nil {
r.peers[name] = peer
}
}
if peer := r.peers[name]; peer != nil {
return peer.peerURL, true
}
return "", false
}
// UpdatePeerURL updates peer URL in registry
func (r *Registry) UpdatePeerURL(name string, peerURL string) error {
machURL, _ := r.clientURL(RegistryKey, name)
// Write data to store.
v := url.Values{}
v.Set("raft", peerURL)
v.Set("etcd", machURL)
log.Debugf("Update PeerURL: %s", name)
if _, err := r.store.Update(path.Join(RegistryKey, name), v.Encode(), store.Permanent); err != nil {
return err
}
r.Lock()
defer r.Unlock()
// Invalidate outdated cache.
r.invalidate(name)
return nil
}
func (r *Registry) name(key, name string) (string, bool) {
return name, true
}
// Names returns a list of cached peer names.
func (r *Registry) Names() []string {
names := r.urls(RegistryKey, "", "", r.name)
sort.Strings(names)
return names
}
// Retrieves the Client URLs for all nodes.
func (r *Registry) ClientURLs(leaderName, selfName string) []string {
return r.urls(RegistryKey, leaderName, selfName, r.clientURL)
}
// Retrieves the Peer URLs for all nodes.
func (r *Registry) PeerURLs(leaderName, selfName string) []string {
return r.urls(RegistryKey, leaderName, selfName, r.peerURL)
}
// Retrieves the URLs for all nodes using url function.
func (r *Registry) urls(key, leaderName, selfName string, url func(key, name string) (string, bool)) []string {
r.Lock()
defer r.Unlock()
// Build list including the leader and self.
urls := make([]string, 0)
if url, _ := url(key, leaderName); len(url) > 0 {
urls = append(urls, url)
}
// Retrieve a list of all nodes.
if e, _ := r.store.Get(key, false, false); e != nil {
// Lookup the URL for each one.
for _, pair := range e.Node.Nodes {
_, name := filepath.Split(pair.Key)
if url, _ := url(key, name); len(url) > 0 && name != leaderName {
urls = append(urls, url)
}
}
}
log.Debugf("URLs: %s: %s / %s (%s)", key, leaderName, selfName, strings.Join(urls, ","))
return urls
}
// Removes a node from the cache.
func (r *Registry) Invalidate(name string) {
r.Lock()
defer r.Unlock()
r.invalidate(name)
}
func (r *Registry) invalidate(name string) {
delete(r.peers, name)
}
// Loads the given node by name from the store into the cache.
func (r *Registry) load(key, name string) *node {
if name == "" {
return nil
}
// Retrieve from store.
e, err := r.store.Get(path.Join(key, name), false, false)
if err != nil {
return nil
}
// Parse as a query string.
m, err := url.ParseQuery(*e.Node.Value)
if err != nil {
panic(fmt.Sprintf("Failed to parse peers entry: %s", name))
}
// Create node.
return &node{
url: m["etcd"][0],
peerURL: m["raft"][0],
}
}
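Each registry entry is stored as a URL-encoded value under /_etcd/machines/&lt;name&gt;; a hypothetical sketch of the format Register writes and load parses back:
// encodeEntry is a hypothetical illustration of the registry's storage
// format: Register writes this string as the node value, and load
// recovers both URLs from it with url.ParseQuery.
func encodeEntry(peerURL, clientURL string) string {
	v := url.Values{}
	v.Set("raft", peerURL)
	v.Set("etcd", clientURL)
	return v.Encode() // e.g. "etcd=http%3A%2F%2F...&raft=http%3A%2F%2F..."
}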

View File

@@ -1,5 +0,0 @@
// +build ignore
package server
const ReleaseVersion = "0.4.6+git"

View File

@@ -1,77 +0,0 @@
// +build ignore
package server
import (
"encoding/binary"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
)
func init() {
raft.RegisterCommand(&RemoveCommand{})
}
// The RemoveCommand removes a server from the cluster.
type RemoveCommand struct {
Name string `json:"name"`
}
// The name of the remove command in the log
func (c *RemoveCommand) CommandName() string {
return "etcd:remove"
}
// Remove a server from the cluster
func (c *RemoveCommand) Apply(context raft.Context) (interface{}, error) {
index, err := applyRemove(c, context)
if err != nil {
return nil, err
}
b := make([]byte, 8)
binary.PutUvarint(b, index)
return b, nil
}
// applyRemove removes the given machine from the cluster.
func applyRemove(c *RemoveCommand, context raft.Context) (uint64, error) {
ps, _ := context.Server().Context().(*PeerServer)
commitIndex := context.CommitIndex()
// Remove node from the shared registry.
err := ps.registry.Unregister(c.Name)
// Delete from stats
delete(ps.followersStats.Followers, c.Name)
if err != nil {
log.Debugf("Error while unregistering: %s (%v)", c.Name, err)
return 0, err
}
// Remove peer in raft
if err := context.Server().RemovePeer(c.Name); err != nil {
log.Debugf("Unable to remove peer: %s (%v)", c.Name, err)
return 0, err
}
if c.Name == context.Server().Name() {
// The removed node is this node. If the node is not replaying
// the previous logs and has sent out a join request during this
// start, then it must have received a new remove command and
// needs to be removed.
if context.CommitIndex() > ps.joinIndex && ps.joinIndex != 0 {
log.Debugf("server [%s] is removed", context.Server().Name())
ps.asyncRemove()
} else {
// else ignore remove
log.Debugf("ignore previous remove command.")
ps.removedInLog = true
}
}
return commitIndex, nil
}

View File

@@ -1,344 +0,0 @@
// +build ignore
package server
import (
"encoding/json"
"fmt"
"net/http"
"net/http/pprof"
"strings"
"time"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
"github.com/coreos/etcd/third_party/github.com/gorilla/mux"
etcdErr "github.com/coreos/etcd/error"
ehttp "github.com/coreos/etcd/http"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/metrics"
"github.com/coreos/etcd/mod"
uhttp "github.com/coreos/etcd/pkg/http"
"github.com/coreos/etcd/server/v2"
"github.com/coreos/etcd/store"
_ "github.com/coreos/etcd/store/v2"
)
// This is the default implementation of the Server interface.
type Server struct {
Name string
url string
handler http.Handler
peerServer *PeerServer
registry *Registry
store store.Store
metrics *metrics.Bucket
trace bool
}
// Creates a new Server.
func New(name, url string, peerServer *PeerServer, registry *Registry, store store.Store, mb *metrics.Bucket) *Server {
s := &Server{
Name: name,
url: url,
store: store,
registry: registry,
peerServer: peerServer,
metrics: mb,
}
return s
}
func (s *Server) EnableTracing() {
s.trace = true
}
// The current state of the server in the cluster.
func (s *Server) State() string {
return s.peerServer.RaftServer().State()
}
// The node name of the leader in the cluster.
func (s *Server) Leader() string {
return s.peerServer.RaftServer().Leader()
}
// The current Raft committed index.
func (s *Server) CommitIndex() uint64 {
return s.peerServer.RaftServer().CommitIndex()
}
// The current Raft term.
func (s *Server) Term() uint64 {
return s.peerServer.RaftServer().Term()
}
// The server URL.
func (s *Server) URL() string {
return s.url
}
// PeerHost retrieves the host part of Peer URL for a given node name.
func (s *Server) PeerHost(name string) (string, bool) {
return s.registry.PeerHost(name)
}
// PeerURL retrieves the Peer URL for a given node name.
func (s *Server) PeerURL(name string) (string, bool) {
return s.registry.PeerURL(name)
}
// ClientURL retrieves the Client URL for a given node name.
func (s *Server) ClientURL(name string) (string, bool) {
return s.registry.ClientURL(name)
}
// Returns a reference to the Store.
func (s *Server) Store() store.Store {
return s.store
}
func (s *Server) SetRegistry(registry *Registry) {
s.registry = registry
}
func (s *Server) SetStore(store store.Store) {
s.store = store
}
func (s *Server) installV2(r *mux.Router) {
r2 := mux.NewRouter()
r.PathPrefix("/v2").Handler(ehttp.NewLowerQueryParamsHandler(r2))
s.handleFuncV2(r2, "/v2/keys/{key:.*}", v2.GetHandler).Methods("GET", "HEAD")
s.handleFuncV2(r2, "/v2/keys/{key:.*}", v2.PostHandler).Methods("POST")
s.handleFuncV2(r2, "/v2/keys/{key:.*}", v2.PutHandler).Methods("PUT")
s.handleFuncV2(r2, "/v2/keys/{key:.*}", v2.DeleteHandler).Methods("DELETE")
s.handleFunc(r2, "/v2/leader", s.GetLeaderHandler).Methods("GET", "HEAD")
s.handleFunc(r2, "/v2/machines", s.GetPeersHandler).Methods("GET", "HEAD")
s.handleFunc(r2, "/v2/peers", s.GetPeersHandler).Methods("GET", "HEAD")
s.handleFunc(r2, "/v2/stats/self", s.GetStatsHandler).Methods("GET", "HEAD")
s.handleFunc(r2, "/v2/stats/leader", s.GetLeaderStatsHandler).Methods("GET", "HEAD")
s.handleFunc(r2, "/v2/stats/store", s.GetStoreStatsHandler).Methods("GET", "HEAD")
s.handleFunc(r2, "/v2/speedTest", s.SpeedTestHandler).Methods("GET", "HEAD")
}
func (s *Server) installMod(r *mux.Router) {
r.PathPrefix("/mod").Handler(http.StripPrefix("/mod", mod.HttpHandler(s.URL())))
}
func (s *Server) installDebug(r *mux.Router) {
s.handleFunc(r, "/debug/metrics", s.GetMetricsHandler).Methods("GET", "HEAD")
r.HandleFunc("/debug/pprof", pprof.Index)
r.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
r.HandleFunc("/debug/pprof/profile", pprof.Profile)
r.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
r.HandleFunc("/debug/pprof/{name}", pprof.Index)
}
// Adds a v2 server handler to the router.
func (s *Server) handleFuncV2(r *mux.Router, path string, f func(http.ResponseWriter, *http.Request, v2.Server) error) *mux.Route {
return s.handleFunc(r, path, func(w http.ResponseWriter, req *http.Request) error {
return f(w, req, s)
})
}
type HEADResponseWriter struct {
http.ResponseWriter
}
func (w *HEADResponseWriter) Write([]byte) (int, error) {
return 0, nil
}
// Adds a server handler to the router.
func (s *Server) handleFunc(r *mux.Router, path string, f func(http.ResponseWriter, *http.Request) error) *mux.Route {
// Wrap the standard HandleFunc interface to pass in the server reference.
return r.HandleFunc(path, func(w http.ResponseWriter, req *http.Request) {
if req.Method == "HEAD" {
w = &HEADResponseWriter{w}
}
// Log request.
log.Debugf("[recv] %s %s %s [%s]", req.Method, s.URL(), req.URL.Path, req.RemoteAddr)
// Execute handler function and return error if necessary.
if err := f(w, req); err != nil {
if etcdErr, ok := err.(*etcdErr.Error); ok {
log.Debug("Return error: ", (*etcdErr).Error())
w.Header().Set("Content-Type", "application/json")
etcdErr.Write(w)
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
})
}
func (s *Server) HTTPHandler() http.Handler {
router := mux.NewRouter()
// Install the routes.
s.handleFunc(router, "/version", s.GetVersionHandler).Methods("GET")
s.installV2(router)
// Mod is temporarily deprecated due to its unstable state.
// It will be added back later.
// s.installMod(router)
if s.trace {
s.installDebug(router)
}
return router
}
// Dispatches the command to the current leader.
func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error {
ps := s.peerServer
if ps.raftServer.State() == raft.Leader {
result, err := ps.raftServer.Do(c)
if err != nil {
return err
}
if result == nil {
return etcdErr.NewError(300, "Empty result from raft", s.Store().Index())
}
// Response for raft-related commands (join/remove).
if b, ok := result.([]byte); ok {
w.WriteHeader(http.StatusOK)
w.Write(b)
return nil
}
e, _ := result.(*store.Event)
b, _ := json.Marshal(e)
w.Header().Set("Content-Type", "application/json")
// etcd index should be the same as the event index
// which is also the last modified index of the node
w.Header().Add("X-Etcd-Index", fmt.Sprint(e.Index()))
w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex()))
w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term()))
if e.IsCreated() {
w.WriteHeader(http.StatusCreated)
} else {
w.WriteHeader(http.StatusOK)
}
w.Write(b)
return nil
}
leader := ps.raftServer.Leader()
if leader == "" {
return etcdErr.NewError(300, "", s.Store().Index())
}
var url string
switch c.(type) {
case *JoinCommand, *RemoveCommand,
*SetClusterConfigCommand:
url, _ = ps.registry.PeerURL(leader)
default:
url, _ = ps.registry.ClientURL(leader)
}
uhttp.Redirect(url, w, req)
return nil
}
// Handler to return the current version of etcd.
func (s *Server) GetVersionHandler(w http.ResponseWriter, req *http.Request) error {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "etcd %s", ReleaseVersion)
return nil
}
// Handler to return the current leader's raft address.
func (s *Server) GetLeaderHandler(w http.ResponseWriter, req *http.Request) error {
leader := s.peerServer.RaftServer().Leader()
if leader == "" {
return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", s.Store().Index())
}
w.WriteHeader(http.StatusOK)
url, _ := s.registry.PeerURL(leader)
w.Write([]byte(url))
return nil
}
// Handler to return all the known peers in the current cluster.
func (s *Server) GetPeersHandler(w http.ResponseWriter, req *http.Request) error {
peers := s.registry.ClientURLs(s.peerServer.RaftServer().Leader(), s.Name)
w.WriteHeader(http.StatusOK)
w.Write([]byte(strings.Join(peers, ", ")))
return nil
}
// Retrieves stats on the Raft server.
func (s *Server) GetStatsHandler(w http.ResponseWriter, req *http.Request) error {
w.Header().Set("Content-Type", "application/json")
w.Write(s.peerServer.Stats())
return nil
}
// Retrieves stats on the leader.
func (s *Server) GetLeaderStatsHandler(w http.ResponseWriter, req *http.Request) error {
if s.peerServer.RaftServer().State() == raft.Leader {
w.Header().Set("Content-Type", "application/json")
w.Write(s.peerServer.PeerStats())
return nil
}
leader := s.peerServer.RaftServer().Leader()
if leader == "" {
return etcdErr.NewError(300, "", s.Store().Index())
}
hostname, _ := s.registry.ClientURL(leader)
uhttp.Redirect(hostname, w, req)
return nil
}
// Retrieves stats on the store.
func (s *Server) GetStoreStatsHandler(w http.ResponseWriter, req *http.Request) error {
w.Header().Set("Content-Type", "application/json")
w.Write(s.store.JsonStats())
return nil
}
// Executes a speed test to evaluate the performance of update replication.
func (s *Server) SpeedTestHandler(w http.ResponseWriter, req *http.Request) error {
count := 1000
c := make(chan bool, count)
for i := 0; i < count; i++ {
go func() {
for j := 0; j < 10; j++ {
// Use a distinct name to avoid shadowing the completion channel c.
cmd := s.Store().CommandFactory().CreateSetCommand("foo", false, "bar", time.Unix(0, 0))
s.peerServer.RaftServer().Do(cmd)
}
c <- true
}()
}
for i := 0; i < count; i++ {
<-c
}
w.WriteHeader(http.StatusOK)
w.Write([]byte("speed test success"))
return nil
}
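Note: SpeedTestHandler uses a plain fan-out/fan-in pattern: a buffered channel sized to the number of workers collects one completion signal per goroutine. The shape in isolation (a sketch, names illustrative):

package main

import "fmt"

func main() {
	const n = 1000
	done := make(chan bool, n)
	for i := 0; i < n; i++ {
		go func() {
			// ... each worker would issue its 10 writes here ...
			done <- true
		}()
	}
	// Block until every worker has signaled completion.
	for i := 0; i < n; i++ {
		<-done
	}
	fmt.Println("all workers finished")
}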
// Retrieves metrics from the metrics bucket.
func (s *Server) GetMetricsHandler(w http.ResponseWriter, req *http.Request) error {
(*s.metrics).Dump(w)
return nil
}

View File

@@ -1,28 +0,0 @@
// +build ignore
package server
import (
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
)
func init() {
raft.RegisterCommand(&SetClusterConfigCommand{})
}
// SetClusterConfigCommand sets the cluster-level configuration.
type SetClusterConfigCommand struct {
Config *ClusterConfig `json:"config"`
}
// CommandName returns the name of the command.
func (c *SetClusterConfigCommand) CommandName() string {
return "etcd:setClusterConfig"
}
// Apply updates the cluster configuration.
func (c *SetClusterConfigCommand) Apply(context raft.Context) (interface{}, error) {
ps, _ := context.Server().Context().(*PeerServer)
ps.SetClusterConfig(c.Config)
return nil, nil
}

View File

@@ -1,341 +0,0 @@
// +build ignore
package server
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"path/filepath"
"sync"
"time"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/log"
uhttp "github.com/coreos/etcd/pkg/http"
"github.com/coreos/etcd/store"
)
const standbyInfoName = "standby_info"
type StandbyServerConfig struct {
Name string
PeerScheme string
PeerURL string
ClientURL string
DataDir string
}
type standbyInfo struct {
// Running indicates whether the server stays running in standby mode.
Running bool
Cluster []*machineMessage
SyncInterval float64
}
type StandbyServer struct {
Config StandbyServerConfig
client *Client
raftServer raft.Server
standbyInfo
joinIndex uint64
removeNotify chan bool
started bool
closeChan chan bool
routineGroup sync.WaitGroup
sync.Mutex
}
func NewStandbyServer(cfg StandbyServerConfig, client *Client) *StandbyServer {
s := &StandbyServer{
Config: cfg,
client: client,
standbyInfo: standbyInfo{SyncInterval: DefaultSyncInterval},
}
if err := s.loadInfo(); err != nil {
log.Warnf("error load standby info file: %v", err)
}
return s
}
func (s *StandbyServer) SetRaftServer(raftServer raft.Server) {
s.raftServer = raftServer
}
func (s *StandbyServer) Start() {
s.Lock()
defer s.Unlock()
if s.started {
return
}
s.started = true
s.removeNotify = make(chan bool)
s.closeChan = make(chan bool)
s.Running = true
if err := s.saveInfo(); err != nil {
log.Warnf("error saving cluster info for standby")
}
s.routineGroup.Add(1)
go func() {
defer s.routineGroup.Done()
s.monitorCluster()
}()
}
// Stop stops the server gracefully.
func (s *StandbyServer) Stop() {
s.Lock()
defer s.Unlock()
if !s.started {
return
}
s.started = false
close(s.closeChan)
s.routineGroup.Wait()
}
// RemoveNotify notifies when the server has been removed from standby mode
// and is ready for peer mode. At that point it has joined the cluster successfully.
func (s *StandbyServer) RemoveNotify() <-chan bool {
return s.removeNotify
}
func (s *StandbyServer) ClientHTTPHandler() http.Handler {
return http.HandlerFunc(s.redirectRequests)
}
func (s *StandbyServer) IsRunning() bool {
return s.Running
}
func (s *StandbyServer) ClusterURLs() []string {
peerURLs := make([]string, 0)
for _, peer := range s.Cluster {
peerURLs = append(peerURLs, peer.PeerURL)
}
return peerURLs
}
func (s *StandbyServer) ClusterSize() int {
return len(s.Cluster)
}
func (s *StandbyServer) setCluster(cluster []*machineMessage) {
s.Cluster = cluster
}
func (s *StandbyServer) SyncCluster(peers []string) error {
for i, url := range peers {
peers[i] = s.fullPeerURL(url)
}
if err := s.syncCluster(peers); err != nil {
log.Infof("fail syncing cluster(%v): %v", s.ClusterURLs(), err)
return err
}
log.Infof("set cluster(%v) for standby server", s.ClusterURLs())
return nil
}
func (s *StandbyServer) SetSyncInterval(second float64) {
s.SyncInterval = second
}
func (s *StandbyServer) ClusterLeader() *machineMessage {
for _, machine := range s.Cluster {
if machine.State == raft.Leader {
return machine
}
}
return nil
}
func (s *StandbyServer) JoinIndex() uint64 {
return s.joinIndex
}
func (s *StandbyServer) redirectRequests(w http.ResponseWriter, r *http.Request) {
leader := s.ClusterLeader()
if leader == nil {
w.Header().Set("Content-Type", "application/json")
etcdErr.NewError(etcdErr.EcodeStandbyInternal, "", 0).Write(w)
return
}
uhttp.Redirect(leader.ClientURL, w, r)
}
// monitorCluster assumes that the machine has tried to join the cluster and
// failed, so it waits for the interval at the beginning.
func (s *StandbyServer) monitorCluster() {
ticker := time.NewTicker(time.Duration(int64(s.SyncInterval * float64(time.Second))))
defer ticker.Stop()
for {
select {
case <-s.closeChan:
return
case <-ticker.C:
}
if err := s.syncCluster(nil); err != nil {
log.Warnf("fail syncing cluster(%v): %v", s.ClusterURLs(), err)
continue
}
leader := s.ClusterLeader()
if leader == nil {
log.Warnf("fail getting leader from cluster(%v)", s.ClusterURLs())
continue
}
if err := s.join(leader.PeerURL); err != nil {
log.Debugf("fail joining through leader %v: %v", leader, err)
continue
}
log.Infof("join through leader %v", leader.PeerURL)
s.Running = false
if err := s.saveInfo(); err != nil {
log.Warnf("error saving cluster info for standby")
}
go func() {
s.Stop()
close(s.removeNotify)
}()
return
}
}
func (s *StandbyServer) syncCluster(peerURLs []string) error {
peerURLs = append(s.ClusterURLs(), peerURLs...)
for _, peerURL := range peerURLs {
// Fetch current peer list
machines, err := s.client.GetMachines(peerURL)
if err != nil {
log.Debugf("fail getting machine messages from %v", peerURL)
continue
}
cfg, err := s.client.GetClusterConfig(peerURL)
if err != nil {
log.Debugf("fail getting cluster config from %v", peerURL)
continue
}
s.setCluster(machines)
s.SetSyncInterval(cfg.SyncInterval)
if err := s.saveInfo(); err != nil {
log.Warnf("fail saving cluster info into disk: %v", err)
}
return nil
}
return fmt.Errorf("unreachable cluster")
}
func (s *StandbyServer) join(peer string) error {
for _, url := range s.ClusterURLs() {
if s.Config.PeerURL == url {
s.joinIndex = s.raftServer.CommitIndex()
return nil
}
}
// Our version must match the leader's version.
version, err := s.client.GetVersion(peer)
if err != nil {
log.Debugf("error getting peer version")
return err
}
if version < store.MinVersion() || version > store.MaxVersion() {
log.Debugf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version)
return fmt.Errorf("incompatible version")
}
// Fetch the cluster config to check whether there is still room to join.
clusterConfig, err := s.client.GetClusterConfig(peer)
if err != nil {
log.Debugf("error getting cluster config")
return err
}
if clusterConfig.ActiveSize <= len(s.Cluster) {
log.Debugf("stop joining because the cluster is full with %d nodes", len(s.Cluster))
return fmt.Errorf("out of quota")
}
commitIndex, err := s.client.AddMachine(peer,
&JoinCommand{
MinVersion: store.MinVersion(),
MaxVersion: store.MaxVersion(),
Name: s.Config.Name,
RaftURL: s.Config.PeerURL,
EtcdURL: s.Config.ClientURL,
})
if err != nil {
log.Debugf("error on join request")
return err
}
s.joinIndex = commitIndex
return nil
}
func (s *StandbyServer) fullPeerURL(urlStr string) string {
u, err := url.Parse(urlStr)
if err != nil {
log.Warnf("fail parsing url %v", u)
return urlStr
}
u.Scheme = s.Config.PeerScheme
return u.String()
}
func (s *StandbyServer) loadInfo() error {
var info standbyInfo
path := filepath.Join(s.Config.DataDir, standbyInfoName)
file, err := os.OpenFile(path, os.O_RDONLY, 0600)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
defer file.Close()
if err = json.NewDecoder(file).Decode(&info); err != nil {
return err
}
s.standbyInfo = info
return nil
}
func (s *StandbyServer) saveInfo() error {
tmpFile, err := ioutil.TempFile(s.Config.DataDir, standbyInfoName)
if err != nil {
return err
}
if err = json.NewEncoder(tmpFile).Encode(s.standbyInfo); err != nil {
tmpFile.Close()
os.Remove(tmpFile.Name())
return err
}
tmpFile.Close()
path := filepath.Join(s.Config.DataDir, standbyInfoName)
if err = os.Rename(tmpFile.Name(), path); err != nil {
return err
}
return nil
}
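Note: saveInfo uses the classic write-to-temp-then-rename pattern so that the info file on disk is always either the old content or the complete new content, never a torn write. A self-contained sketch of the same pattern (helper name and paths are illustrative, not from the original):

package main

import (
	"encoding/json"
	"io/ioutil"
	"os"
	"path/filepath"
)

// atomicWriteJSON writes v as JSON to dir/name via a temp file and rename.
func atomicWriteJSON(dir, name string, v interface{}) error {
	tmp, err := ioutil.TempFile(dir, name)
	if err != nil {
		return err
	}
	if err := json.NewEncoder(tmp).Encode(v); err != nil {
		tmp.Close()
		os.Remove(tmp.Name())
		return err
	}
	tmp.Close()
	// Rename is atomic on POSIX filesystems within the same volume.
	return os.Rename(tmp.Name(), filepath.Join(dir, name))
}

func main() {
	if err := atomicWriteJSON(os.TempDir(), "standby_info_demo", map[string]int{"synced": 1}); err != nil {
		panic(err)
	}
}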

View File

@@ -1,91 +0,0 @@
// +build ignore
package server
import (
"sync"
"time"
)
const (
queueCapacity = 200
)
type statsQueue struct {
items [queueCapacity]*packageStats
size int
front int
back int
totalPkgSize int
rwl sync.RWMutex
}
func (q *statsQueue) Len() int {
return q.size
}
func (q *statsQueue) PkgSize() int {
return q.totalPkgSize
}
// frontAndBack gets the front and back elements in the queue.
// We must read front and back together under the lock.
func (q *statsQueue) frontAndBack() (*packageStats, *packageStats) {
q.rwl.RLock()
defer q.rwl.RUnlock()
if q.size != 0 {
return q.items[q.front], q.items[q.back]
}
return nil, nil
}
// Insert adds a packageStats to the queue and updates the running totals.
func (q *statsQueue) Insert(p *packageStats) {
q.rwl.Lock()
defer q.rwl.Unlock()
q.back = (q.back + 1) % queueCapacity
if q.size == queueCapacity { //dequeue
q.totalPkgSize -= q.items[q.front].size
q.front = (q.back + 1) % queueCapacity
} else {
q.size++
}
q.items[q.back] = p
q.totalPkgSize += q.items[q.back].size
}
// Rate returns the package rate and byte rate, both per second.
func (q *statsQueue) Rate() (float64, float64) {
front, back := q.frontAndBack()
if front == nil || back == nil {
return 0, 0
}
if time.Since(back.Time()) > time.Second {
q.Clear()
return 0, 0
}
sampleDuration := back.Time().Sub(front.Time())
pr := float64(q.Len()) / float64(sampleDuration) * float64(time.Second)
br := float64(q.PkgSize()) / float64(sampleDuration) * float64(time.Second)
return pr, br
}
// Clear resets the statsQueue.
func (q *statsQueue) Clear() {
q.rwl.Lock()
defer q.rwl.Unlock()
q.back = -1
q.front = 0
q.size = 0
q.totalPkgSize = 0
}
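Note: Rate above derives both rates from the timestamps of the oldest and newest samples: count (or bytes) divided by the sampled window, scaled to per-second. A self-contained sketch of that arithmetic, assuming a minimal stand-in for packageStats (which this file uses but does not define):

package main

import (
	"fmt"
	"time"
)

// sample is a stand-in for packageStats: a timestamped packet size.
type sample struct {
	t    time.Time
	size int
}

func main() {
	now := time.Now()
	samples := []sample{
		{now.Add(-900 * time.Millisecond), 1024},
		{now.Add(-450 * time.Millisecond), 2048},
		{now, 512},
	}
	// Window between the front (oldest) and back (newest) samples.
	window := samples[len(samples)-1].t.Sub(samples[0].t)
	total := 0
	for _, s := range samples {
		total += s.size
	}
	pkgRate := float64(len(samples)) / float64(window) * float64(time.Second)
	byteRate := float64(total) / float64(window) * float64(time.Second)
	fmt.Printf("%.1f pkg/s, %.1f B/s\n", pkgRate, byteRate)
}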

View File

@@ -1,108 +0,0 @@
// +build ignore
package server
import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
"fmt"
"io/ioutil"
)
// TLSInfo holds the SSL certificates paths.
type TLSInfo struct {
CertFile string `json:"CertFile"`
KeyFile string `json:"KeyFile"`
CAFile string `json:"CAFile"`
}
func (info TLSInfo) Scheme() string {
if info.KeyFile != "" && info.CertFile != "" {
return "https"
} else {
return "http"
}
}
// Generates a tls.Config object for a server from the given files.
func (info TLSInfo) ServerConfig() (*tls.Config, error) {
// Both the key and cert must be present.
if info.KeyFile == "" || info.CertFile == "" {
return nil, fmt.Errorf("KeyFile and CertFile must both be present[key: %v, cert: %v]", info.KeyFile, info.CertFile)
}
var cfg tls.Config
tlsCert, err := tls.LoadX509KeyPair(info.CertFile, info.KeyFile)
if err != nil {
return nil, err
}
cfg.Certificates = []tls.Certificate{tlsCert}
if info.CAFile != "" {
cfg.ClientAuth = tls.RequireAndVerifyClientCert
cp, err := newCertPool(info.CAFile)
if err != nil {
return nil, err
}
cfg.RootCAs = cp
cfg.ClientCAs = cp
} else {
cfg.ClientAuth = tls.NoClientCert
}
return &cfg, nil
}
// Generates a tls.Config object for a client from the given files.
func (info TLSInfo) ClientConfig() (*tls.Config, error) {
var cfg tls.Config
if info.KeyFile == "" || info.CertFile == "" {
return &cfg, nil
}
tlsCert, err := tls.LoadX509KeyPair(info.CertFile, info.KeyFile)
if err != nil {
return nil, err
}
cfg.Certificates = []tls.Certificate{tlsCert}
if info.CAFile != "" {
cp, err := newCertPool(info.CAFile)
if err != nil {
return nil, err
}
cfg.RootCAs = cp
}
return &cfg, nil
}
// newCertPool creates an x509 CertPool from the provided CA file.
func newCertPool(CAFile string) (*x509.CertPool, error) {
certPool := x509.NewCertPool()
pemByte, err := ioutil.ReadFile(CAFile)
if err != nil {
return nil, err
}
for {
var block *pem.Block
block, pemByte = pem.Decode(pemByte)
if block == nil {
return certPool, nil
}
cert, err := x509.ParseCertificate(block.Bytes)
if err != nil {
return nil, err
}
certPool.AddCert(cert)
}
}
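Note: when a CA file is configured, ServerConfig above both requires and verifies client certificates. A standalone sketch of the tls.Config it effectively produces (file paths are hypothetical):

package main

import (
	"crypto/tls"
	"crypto/x509"
	"io/ioutil"
	"log"
	"net/http"
)

func main() {
	cert, err := tls.LoadX509KeyPair("server.crt", "server.key") // hypothetical paths
	if err != nil {
		log.Fatal(err)
	}
	caPEM, err := ioutil.ReadFile("ca.crt") // hypothetical path
	if err != nil {
		log.Fatal(err)
	}
	pool := x509.NewCertPool()
	pool.AppendCertsFromPEM(caPEM)

	cfg := &tls.Config{
		Certificates: []tls.Certificate{cert},
		ClientAuth:   tls.RequireAndVerifyClientCert,
		ClientCAs:    pool,
	}
	srv := &http.Server{Addr: ":4001", TLSConfig: cfg}
	// Cert and key come from TLSConfig, so the file arguments stay empty.
	log.Fatal(srv.ListenAndServeTLS("", ""))
}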

View File

@@ -1,259 +0,0 @@
// +build ignore
package server
import (
"bytes"
"crypto/tls"
"fmt"
"io"
"net"
"net/http"
"time"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
httpclient "github.com/coreos/etcd/third_party/github.com/mreiferson/go-httpclient"
)
const (
snapshotTimeout = time.Second * 120
)
// Transporter layer for communication between raft nodes
type transporter struct {
followersStats *raftFollowersStats
serverStats *raftServerStats
registry *Registry
client *http.Client
transport *httpclient.Transport
snapshotClient *http.Client
snapshotTransport *httpclient.Transport
}
type dialer func(network, addr string) (net.Conn, error)
// NewTransporter creates the transporter used by the raft server.
// It creates an HTTP or HTTPS transporter depending on whether the
// user provides a server cert and key.
func NewTransporter(followersStats *raftFollowersStats, serverStats *raftServerStats, registry *Registry, dialTimeout, requestTimeout, responseHeaderTimeout time.Duration) *transporter {
tr := &httpclient.Transport{
ResponseHeaderTimeout: responseHeaderTimeout,
// This is a workaround: Transport.CancelRequest does not work on
// blocked HTTPS connections. A patch for it is in progress and
// should be available in Go 1.3.
// More: https://codereview.appspot.com/69280043/
ConnectTimeout: dialTimeout,
RequestTimeout: requestTimeout,
}
// Sending a snapshot might take a long time, so we use a separate HTTP transporter.
// Timeout is set to 120s (around 100MB if the bandwidth is 10Mbit/s).
// This timeout is not calculated by heartbeat time.
// TODO(xiangl1) we can actually calculate the max bandwidth if we know
// average RTT.
// It should be equal to (TCP max window size/RTT).
sTr := &httpclient.Transport{
ConnectTimeout: dialTimeout,
RequestTimeout: snapshotTimeout,
}
t := transporter{
client: &http.Client{Transport: tr},
transport: tr,
snapshotClient: &http.Client{Transport: sTr},
snapshotTransport: sTr,
followersStats: followersStats,
serverStats: serverStats,
registry: registry,
}
return &t
}
func (t *transporter) SetTLSConfig(tlsConf tls.Config) {
t.transport.TLSClientConfig = &tlsConf
t.transport.DisableCompression = true
t.snapshotTransport.TLSClientConfig = &tlsConf
t.snapshotTransport.DisableCompression = true
}
// Sends AppendEntries RPCs to a peer when the server is the leader.
func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
var b bytes.Buffer
if _, err := req.Encode(&b); err != nil {
log.Warn("transporter.ae.encoding.error:", err)
return nil
}
size := b.Len()
t.serverStats.SendAppendReq(size)
u, _ := t.registry.PeerURL(peer.Name)
log.Debugf("Send LogEntries to %s ", u)
thisFollowerStats, ok := t.followersStats.Followers[peer.Name]
if !ok { // this is the first time this follower has been seen
thisFollowerStats = &raftFollowerStats{}
thisFollowerStats.Latency.Minimum = 1 << 63
t.followersStats.Followers[peer.Name] = thisFollowerStats
}
start := time.Now()
resp, _, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
end := time.Now()
if err != nil {
log.Debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
if ok {
thisFollowerStats.Fail()
}
return nil
} else {
if ok {
thisFollowerStats.Succ(end.Sub(start))
}
}
if resp != nil {
defer resp.Body.Close()
aeresp := &raft.AppendEntriesResponse{}
if _, err = aeresp.Decode(resp.Body); err != nil && err != io.EOF {
log.Warn("transporter.ae.decoding.error:", err)
return nil
}
return aeresp
}
return nil
}
// Sends RequestVote RPCs to a peer when the server is the candidate.
func (t *transporter) SendVoteRequest(server raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
var b bytes.Buffer
if _, err := req.Encode(&b); err != nil {
log.Warn("transporter.vr.encoding.error:", err)
return nil
}
u, _ := t.registry.PeerURL(peer.Name)
log.Debugf("Send Vote from %s to %s", server.Name(), u)
resp, _, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
if err != nil {
log.Debugf("Cannot send VoteRequest to %s : %s", u, err)
}
if resp != nil {
defer resp.Body.Close()
rvrsp := &raft.RequestVoteResponse{}
if _, err = rvrsp.Decode(resp.Body); err != nil && err != io.EOF {
log.Warn("transporter.vr.decoding.error:", err)
return nil
}
return rvrsp
}
return nil
}
// Sends SnapshotRequest RPCs to a peer when the server is the leader.
func (t *transporter) SendSnapshotRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
var b bytes.Buffer
if _, err := req.Encode(&b); err != nil {
log.Warn("transporter.ss.encoding.error:", err)
return nil
}
u, _ := t.registry.PeerURL(peer.Name)
log.Debugf("Send Snapshot Request from %s to %s", server.Name(), u)
resp, _, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
if err != nil {
log.Debugf("Cannot send Snapshot Request to %s : %s", u, err)
}
if resp != nil {
defer resp.Body.Close()
ssrsp := &raft.SnapshotResponse{}
if _, err = ssrsp.Decode(resp.Body); err != nil && err != io.EOF {
log.Warn("transporter.ss.decoding.error:", err)
return nil
}
return ssrsp
}
return nil
}
// Sends SnapshotRecoveryRequest RPCs to a peer when the server is the leader.
func (t *transporter) SendSnapshotRecoveryRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
var b bytes.Buffer
if _, err := req.Encode(&b); err != nil {
log.Warn("transporter.ss.encoding.error:", err)
return nil
}
u, _ := t.registry.PeerURL(peer.Name)
log.Debugf("Send Snapshot Recovery from %s to %s", server.Name(), u)
resp, err := t.PostSnapshot(fmt.Sprintf("%s/snapshotRecovery", u), &b)
if err != nil {
log.Debugf("Cannot send Snapshot Recovery to %s : %s", u, err)
}
if resp != nil {
defer resp.Body.Close()
ssrrsp := &raft.SnapshotRecoveryResponse{}
if _, err = ssrrsp.Decode(resp.Body); err != nil && err != io.EOF {
log.Warn("transporter.ssr.decoding.error:", err)
return nil
}
return ssrrsp
}
return nil
}
// Sends a server-side POST request.
func (t *transporter) Post(urlStr string, body io.Reader) (*http.Response, *http.Request, error) {
req, _ := http.NewRequest("POST", urlStr, body)
resp, err := t.client.Do(req)
return resp, req, err
}
// Sends a server-side GET request.
func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error) {
req, _ := http.NewRequest("GET", urlStr, nil)
resp, err := t.client.Do(req)
return resp, req, err
}
// Sends a server-side PUT request.
func (t *transporter) Put(urlStr string, body io.Reader) (*http.Response, *http.Request, error) {
req, _ := http.NewRequest("PUT", urlStr, body)
resp, err := t.client.Do(req)
return resp, req, err
}
// PostSnapshot posts a JSON-format snapshot to the given url.
// The underlying HTTP transport has a minute-level timeout.
func (t *transporter) PostSnapshot(url string, body io.Reader) (*http.Response, error) {
return t.snapshotClient.Post(url, "application/json", body)
}
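Note: the go-httpclient transport here predates native timeout support in net/http. On Go 1.3 and later, a rough equivalent of the two clients above can be built from the standard library alone (a sketch, not the original wiring):

package main

import (
	"net"
	"net/http"
	"time"
)

const snapshotTimeout = 120 * time.Second

func newClients(dialTimeout, requestTimeout, responseHeaderTimeout time.Duration) (normal, snapshot *http.Client) {
	tr := &http.Transport{
		Dial:                  (&net.Dialer{Timeout: dialTimeout}).Dial,
		ResponseHeaderTimeout: responseHeaderTimeout,
	}
	normal = &http.Client{Transport: tr, Timeout: requestTimeout}
	// Snapshots get the much longer 120s overall timeout used above.
	snapshot = &http.Client{Transport: tr, Timeout: snapshotTimeout}
	return normal, snapshot
}

func main() {
	newClients(time.Second, 5*time.Second, 5*time.Second)
}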

View File

@@ -1,51 +0,0 @@
// +build ignore
package v2
import (
"net/http"
"strconv"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/third_party/github.com/gorilla/mux"
)
func DeleteHandler(w http.ResponseWriter, req *http.Request, s Server) error {
vars := mux.Vars(req)
key := "/" + vars["key"]
recursive := (req.FormValue("recursive") == "true")
dir := (req.FormValue("dir") == "true")
req.ParseForm()
_, valueOk := req.Form["prevValue"]
_, indexOk := req.Form["prevIndex"]
if !valueOk && !indexOk {
c := s.Store().CommandFactory().CreateDeleteCommand(key, dir, recursive)
return s.Dispatch(c, w, req)
}
var err error
prevIndex := uint64(0)
prevValue := req.Form.Get("prevValue")
if indexOk {
prevIndexStr := req.Form.Get("prevIndex")
prevIndex, err = strconv.ParseUint(prevIndexStr, 10, 64)
// bad previous index
if err != nil {
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndDelete", s.Store().Index())
}
}
if valueOk {
if prevValue == "" {
return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndDelete", s.Store().Index())
}
}
c := s.Store().CommandFactory().CreateCompareAndDeleteCommand(key, prevValue, prevIndex)
return s.Dispatch(c, w, req)
}

View File

@@ -1,145 +0,0 @@
// +build ignore
package v2
import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"strconv"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
"github.com/coreos/etcd/third_party/github.com/gorilla/mux"
)
func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error {
vars := mux.Vars(req)
key := "/" + vars["key"]
recursive := (req.FormValue("recursive") == "true")
sort := (req.FormValue("sorted") == "true")
if req.FormValue("quorum") == "true" {
c := s.Store().CommandFactory().CreateGetCommand(key, recursive, sort)
return s.Dispatch(c, w, req)
}
// Help the client redirect the request to the current leader.
if req.FormValue("consistent") == "true" && s.State() != raft.Leader {
leader := s.Leader()
hostname, _ := s.ClientURL(leader)
u, err := url.Parse(hostname) // named u to avoid shadowing the url package
if err != nil {
log.Warn("Redirect cannot parse hostname ", hostname)
return err
}
u.RawQuery = req.URL.RawQuery
u.Path = req.URL.Path
log.Debugf("Redirect consistent get to %s", u.String())
http.Redirect(w, req, u.String(), http.StatusTemporaryRedirect)
return nil
}
waitIndex := req.FormValue("waitIndex")
stream := (req.FormValue("stream") == "true")
if req.FormValue("wait") == "true" {
return handleWatch(key, recursive, stream, waitIndex, w, req, s)
}
return handleGet(key, recursive, sort, w, req, s)
}
func handleWatch(key string, recursive, stream bool, waitIndex string, w http.ResponseWriter, req *http.Request, s Server) error {
// Create a command to watch from a given index (default 0).
var sinceIndex uint64 = 0
var err error
if waitIndex != "" {
sinceIndex, err = strconv.ParseUint(waitIndex, 10, 64)
if err != nil {
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", s.Store().Index())
}
}
watcher, err := s.Store().Watch(key, recursive, stream, sinceIndex)
if err != nil {
return err
}
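// Assumes w implements http.CloseNotifier (true for net/http's server);
// otherwise the CloseNotify call below would panic on a nil interface.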
cn, _ := w.(http.CloseNotifier)
closeChan := cn.CloseNotify()
writeHeaders(w, s)
w.(http.Flusher).Flush()
if stream {
// The watcher hub does not remove stream watchers for us,
// so we need to remove it here.
defer watcher.Remove()
for {
select {
case <-closeChan:
return nil
case event, ok := <-watcher.EventChan:
if !ok {
// If the channel is closed, notifications probably arrived much
// faster than we were able to send them to the client in time.
// In that case we simply end streaming.
return nil
}
if req.Method == "HEAD" {
continue
}
b, _ := json.Marshal(event)
_, err := w.Write(b)
if err != nil {
return nil
}
w.(http.Flusher).Flush()
}
}
}
select {
case <-closeChan:
watcher.Remove()
case event := <-watcher.EventChan:
if req.Method == "HEAD" {
return nil
}
b, _ := json.Marshal(event)
w.Write(b)
}
return nil
}
func handleGet(key string, recursive, sort bool, w http.ResponseWriter, req *http.Request, s Server) error {
event, err := s.Store().Get(key, recursive, sort)
if err != nil {
return err
}
if req.Method == "HEAD" {
return nil
}
writeHeaders(w, s)
b, _ := json.Marshal(event)
w.Write(b)
return nil
}
func writeHeaders(w http.ResponseWriter, s Server) {
w.Header().Set("Content-Type", "application/json")
w.Header().Add("X-Etcd-Index", fmt.Sprint(s.Store().Index()))
w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex()))
w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term()))
w.WriteHeader(http.StatusOK)
}
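Note: the wait path above long-polls: a GET with wait=true blocks until the key changes (optionally from waitIndex onward), then returns a single event. A minimal client sketch (endpoint and key are illustrative):

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
)

func main() {
	// Blocks until /foo changes at or after index 7.
	resp, err := http.Get("http://127.0.0.1:4001/v2/keys/foo?wait=true&waitIndex=7")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("X-Etcd-Index=%s event=%s\n", resp.Header.Get("X-Etcd-Index"), body)
}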

View File

@@ -1,26 +0,0 @@
// +build ignore
package v2
import (
"net/http"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/third_party/github.com/gorilla/mux"
)
func PostHandler(w http.ResponseWriter, req *http.Request, s Server) error {
vars := mux.Vars(req)
key := "/" + vars["key"]
value := req.FormValue("value")
dir := (req.FormValue("dir") == "true")
expireTime, err := store.TTL(req.FormValue("ttl"))
if err != nil {
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", s.Store().Index())
}
c := s.Store().CommandFactory().CreateCreateCommand(key, dir, value, expireTime, true)
return s.Dispatch(c, w, req)
}

View File

@@ -1,100 +0,0 @@
// +build ignore
package v2
import (
"net/http"
"strconv"
"time"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
"github.com/coreos/etcd/third_party/github.com/gorilla/mux"
)
func PutHandler(w http.ResponseWriter, req *http.Request, s Server) error {
var c raft.Command
vars := mux.Vars(req)
key := "/" + vars["key"]
req.ParseForm()
value := req.Form.Get("value")
dir := (req.FormValue("dir") == "true")
expireTime, err := store.TTL(req.Form.Get("ttl"))
if err != nil {
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", s.Store().Index())
}
_, valueOk := req.Form["prevValue"]
prevValue := req.FormValue("prevValue")
_, indexOk := req.Form["prevIndex"]
prevIndexStr := req.FormValue("prevIndex")
_, existOk := req.Form["prevExist"]
prevExist := req.FormValue("prevExist")
// Set handler: create a new node or replace the old one.
if !valueOk && !indexOk && !existOk {
return SetHandler(w, req, s, key, dir, value, expireTime)
}
// Update with test (conditional create / compare-and-swap).
if existOk {
if prevExist == "false" {
// Create command: create a new node. Fail if a node already exists.
// Ignore prevIndex and prevValue.
return CreateHandler(w, req, s, key, dir, value, expireTime)
}
if prevExist == "true" && !indexOk && !valueOk {
return UpdateHandler(w, req, s, key, value, expireTime)
}
}
var prevIndex uint64
if indexOk {
prevIndex, err = strconv.ParseUint(prevIndexStr, 10, 64)
// bad previous index
if err != nil {
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndSwap", s.Store().Index())
}
} else {
prevIndex = 0
}
if valueOk {
if prevValue == "" {
return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndSwap", s.Store().Index())
}
}
c = s.Store().CommandFactory().CreateCompareAndSwapCommand(key, value, prevValue, prevIndex, expireTime)
return s.Dispatch(c, w, req)
}
func SetHandler(w http.ResponseWriter, req *http.Request, s Server, key string, dir bool, value string, expireTime time.Time) error {
c := s.Store().CommandFactory().CreateSetCommand(key, dir, value, expireTime)
return s.Dispatch(c, w, req)
}
func CreateHandler(w http.ResponseWriter, req *http.Request, s Server, key string, dir bool, value string, expireTime time.Time) error {
c := s.Store().CommandFactory().CreateCreateCommand(key, dir, value, expireTime, false)
return s.Dispatch(c, w, req)
}
func UpdateHandler(w http.ResponseWriter, req *http.Request, s Server, key, value string, expireTime time.Time) error {
// Update requires at least one of value or TTL.
if value == "" && expireTime.Sub(store.Permanent) == 0 {
return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", s.Store().Index())
}
c := s.Store().CommandFactory().CreateUpdateCommand(key, value, expireTime)
return s.Dispatch(c, w, req)
}
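Note: PutHandler above multiplexes four operations on one verb, selected by form fields: plain set, conditional create (prevExist=false), plain update (prevExist=true), and compare-and-swap (prevValue/prevIndex). A client-side sketch of those forms (endpoint and values are illustrative):

package main

import (
	"log"
	"net/http"
	"net/url"
	"strings"
)

func put(rawurl string, form url.Values) error {
	req, err := http.NewRequest("PUT", rawurl, strings.NewReader(form.Encode()))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	return resp.Body.Close()
}

func main() {
	base := "http://127.0.0.1:4001/v2/keys/foo"
	// Plain set: create or replace.
	if err := put(base, url.Values{"value": {"bar"}}); err != nil {
		log.Fatal(err)
	}
	// Conditional create: fails if the key already exists.
	_ = put(base, url.Values{"value": {"bar"}, "prevExist": {"false"}})
	// Compare-and-swap on the previous value.
	_ = put(base, url.Values{"value": {"baz"}, "prevValue": {"bar"}})
}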

View File

@@ -1,22 +0,0 @@
// +build ignore
package v2
import (
"net/http"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
)
// The Server interface provides all the methods required for the v2 API.
type Server interface {
State() string
Leader() string
CommitIndex() uint64
Term() uint64
PeerURL(string) (string, bool)
ClientURL(string) (string, bool)
Store() store.Store
Dispatch(raft.Command, http.ResponseWriter, *http.Request) error
}

View File

@@ -1,5 +0,0 @@
// +build ignore
package server
const Version = "v2"