From 4296cd3fa4f6153f825f4288a19f7a1522e172f8 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 21 Aug 2014 13:00:58 -0700 Subject: [PATCH] *: remove old server --- server/client.go | 186 ------ server/cluster_config.go | 51 -- server/join_command.go | 118 ---- server/listener.go | 68 -- server/package_stats.go | 27 - server/peer_server.go | 899 --------------------------- server/peer_server_handlers.go | 320 ---------- server/raft_follower_stats.go | 65 -- server/raft_server_stats.go | 81 --- server/registry.go | 240 ------- server/release_version.go | 5 - server/remove_command.go | 77 --- server/server.go | 344 ---------- server/set_cluster_config_command.go | 28 - server/standby_server.go | 341 ---------- server/stats_queue.go | 91 --- server/tls_info.go | 108 ---- server/transporter.go | 259 -------- server/v2/delete_handler.go | 51 -- server/v2/get_handler.go | 145 ----- server/v2/post_handler.go | 26 - server/v2/put_handler.go | 100 --- server/v2/v2.go | 22 - server/version.go | 5 - 24 files changed, 3657 deletions(-) delete mode 100644 server/client.go delete mode 100644 server/cluster_config.go delete mode 100644 server/join_command.go delete mode 100644 server/listener.go delete mode 100644 server/package_stats.go delete mode 100644 server/peer_server.go delete mode 100644 server/peer_server_handlers.go delete mode 100644 server/raft_follower_stats.go delete mode 100644 server/raft_server_stats.go delete mode 100644 server/registry.go delete mode 100644 server/release_version.go delete mode 100644 server/remove_command.go delete mode 100644 server/server.go delete mode 100644 server/set_cluster_config_command.go delete mode 100644 server/standby_server.go delete mode 100644 server/stats_queue.go delete mode 100644 server/tls_info.go delete mode 100644 server/transporter.go delete mode 100644 server/v2/delete_handler.go delete mode 100644 server/v2/get_handler.go delete mode 100644 server/v2/post_handler.go delete mode 100644 server/v2/put_handler.go delete mode 100644 server/v2/v2.go delete mode 100644 server/version.go diff --git a/server/client.go b/server/client.go deleted file mode 100644 index a6b0529c0..000000000 --- a/server/client.go +++ /dev/null @@ -1,186 +0,0 @@ -// +build ignore - -package server - -import ( - "bytes" - "encoding/binary" - "encoding/json" - "errors" - "fmt" - "io/ioutil" - "net/http" - "strconv" - - etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/etcd/log" -) - -// Client sends various requests using HTTP API. -// It is different from raft communication, and doesn't record anything in the log. -// The argument url is required to contain scheme and host only, and -// there is no trailing slash in it. -// Public functions return "etcd/error".Error intentionally to figure out -// etcd error code easily. -// TODO(yichengq): It is similar to go-etcd. But it could have many efforts -// to integrate the two. Leave it for further discussion. -type Client struct { - http.Client -} - -func NewClient(transport http.RoundTripper) *Client { - return &Client{http.Client{Transport: transport}} -} - -// CheckVersion returns true when the version check on the server returns 200. -func (c *Client) CheckVersion(url string, version int) (bool, *etcdErr.Error) { - resp, err := c.Get(url + fmt.Sprintf("/version/%d/check", version)) - if err != nil { - return false, clientError(err) - } - - defer resp.Body.Close() - - return resp.StatusCode == 200, nil -} - -// GetVersion fetches the peer version of a cluster. 
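// Illustrative sketch only, not part of the deleted sources. Assuming it sat in the
// same "server" package as the Client type above, a version handshake against a peer
// via CheckVersion and the GetVersion helper declared just below would have looked
// roughly like this (the URL carries scheme and host only, no trailing slash):
func exampleVersionHandshake(c *Client) {
	peer := "http://127.0.0.1:7001"
	if ok, err := c.CheckVersion(peer, 2); err == nil && ok {
		// the peer accepts store version 2 (placeholder value for the example)
	}
	if v, err := c.GetVersion(peer); err == nil {
		_ = v // peer store version reported by GET <peer>/version
	}
}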
-func (c *Client) GetVersion(url string) (int, *etcdErr.Error) { - resp, err := c.Get(url + "/version") - if err != nil { - return 0, clientError(err) - } - - defer resp.Body.Close() - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return 0, clientError(err) - } - - // Parse version number. - version, err := strconv.Atoi(string(body)) - if err != nil { - return 0, clientError(err) - } - return version, nil -} - -func (c *Client) GetMachines(url string) ([]*machineMessage, *etcdErr.Error) { - resp, err := c.Get(url + "/v2/admin/machines") - if err != nil { - return nil, clientError(err) - } - - msgs := new([]*machineMessage) - if uerr := c.parseJSONResponse(resp, msgs); uerr != nil { - return nil, uerr - } - return *msgs, nil -} - -func (c *Client) GetClusterConfig(url string) (*ClusterConfig, *etcdErr.Error) { - resp, err := c.Get(url + "/v2/admin/config") - if err != nil { - return nil, clientError(err) - } - - cfg := new(ClusterConfig) - if uerr := c.parseJSONResponse(resp, cfg); uerr != nil { - return nil, uerr - } - return cfg, nil -} - -// AddMachine adds machine to the cluster. -// The first return value is the commit index of join command. -func (c *Client) AddMachine(url string, cmd *JoinCommand) (uint64, *etcdErr.Error) { - b, _ := json.Marshal(cmd) - url = url + "/join" - - log.Infof("Send Join Request to %s", url) - resp, err := c.put(url, b) - if err != nil { - return 0, clientError(err) - } - defer resp.Body.Close() - - if err := c.checkErrorResponse(resp); err != nil { - return 0, err - } - b, err = ioutil.ReadAll(resp.Body) - if err != nil { - return 0, clientError(err) - } - index, numRead := binary.Uvarint(b) - if numRead < 0 { - return 0, clientError(fmt.Errorf("buf too small, or value too large")) - } - return index, nil -} - -func (c *Client) parseJSONResponse(resp *http.Response, val interface{}) *etcdErr.Error { - defer resp.Body.Close() - - if err := c.checkErrorResponse(resp); err != nil { - return err - } - if err := json.NewDecoder(resp.Body).Decode(val); err != nil { - log.Debugf("Error parsing join response: %v", err) - return clientError(err) - } - return nil -} - -func (c *Client) checkErrorResponse(resp *http.Response) *etcdErr.Error { - if resp.StatusCode != http.StatusOK { - uerr := &etcdErr.Error{} - if err := json.NewDecoder(resp.Body).Decode(uerr); err != nil { - log.Debugf("Error parsing response to etcd error: %v", err) - return clientError(err) - } - return uerr - } - return nil -} - -// put sends server side PUT request. -// It always follows redirects instead of stopping according to RFC 2616. 
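// Illustrative note, not part of the deleted sources: the hand-rolled loop below
// exists because the stock http.Client of this era would not transparently re-issue
// a PUT body across redirects, so a /join sent to a follower answering
// 307 + Location (pointing at the leader) would otherwise stop at the follower.
func exampleJoinPut(c *Client, followerURL string, body []byte) {
	// put() re-sends the same body to each Location target, giving up after 10 hops.
	if resp, err := c.put(followerURL+"/join", body); err == nil {
		resp.Body.Close()
	}
}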
-func (c *Client) put(urlStr string, body []byte) (*http.Response, error) { - return c.doAlwaysFollowingRedirects("PUT", urlStr, body) -} - -func (c *Client) doAlwaysFollowingRedirects(method string, urlStr string, body []byte) (resp *http.Response, err error) { - var req *http.Request - - for redirect := 0; redirect < 10; redirect++ { - req, err = http.NewRequest(method, urlStr, bytes.NewBuffer(body)) - if err != nil { - return - } - - if resp, err = c.Do(req); err != nil { - if resp != nil { - resp.Body.Close() - } - return - } - - if resp.StatusCode == http.StatusMovedPermanently || resp.StatusCode == http.StatusTemporaryRedirect { - resp.Body.Close() - if urlStr = resp.Header.Get("Location"); urlStr == "" { - err = errors.New(fmt.Sprintf("%d response missing Location header", resp.StatusCode)) - return - } - continue - } - return - } - - err = errors.New("stopped after 10 redirects") - return -} - -func clientError(err error) *etcdErr.Error { - return etcdErr.NewError(etcdErr.EcodeClientInternal, err.Error(), 0) -} diff --git a/server/cluster_config.go b/server/cluster_config.go deleted file mode 100644 index c2beda4aa..000000000 --- a/server/cluster_config.go +++ /dev/null @@ -1,51 +0,0 @@ -// +build ignore - -package server - -import ( - "time" -) - -const ( - // DefaultActiveSize is the default number of active followers allowed. - DefaultActiveSize = 9 - - // MinActiveSize is the minimum active size allowed. - MinActiveSize = 3 - - // DefaultRemoveDelay is the default elapsed time before removal. - DefaultRemoveDelay = float64((30 * time.Minute) / time.Second) - - // MinRemoveDelay is the minimum remove delay allowed. - MinRemoveDelay = float64((2 * time.Second) / time.Second) - - // DefaultSyncInterval is the default interval for cluster sync. - DefaultSyncInterval = float64((5 * time.Second) / time.Second) - - // MinSyncInterval is the minimum sync interval allowed. - MinSyncInterval = float64((1 * time.Second) / time.Second) -) - -// ClusterConfig represents cluster-wide configuration settings. -type ClusterConfig struct { - // ActiveSize is the maximum number of node that can join as Raft followers. - // Nodes that join the cluster after the limit is reached are standbys. - ActiveSize int `json:"activeSize"` - - // RemoveDelay is the amount of time, in seconds, after a node is - // unreachable that it will be swapped out as a standby node. - RemoveDelay float64 `json:"removeDelay"` - - // SyncInterval is the amount of time, in seconds, between - // cluster sync when it runs in standby mode. - SyncInterval float64 `json:"syncInterval"` -} - -// NewClusterConfig returns a cluster configuration with default settings. -func NewClusterConfig() *ClusterConfig { - return &ClusterConfig{ - ActiveSize: DefaultActiveSize, - RemoveDelay: DefaultRemoveDelay, - SyncInterval: DefaultSyncInterval, - } -} diff --git a/server/join_command.go b/server/join_command.go deleted file mode 100644 index 600118d86..000000000 --- a/server/join_command.go +++ /dev/null @@ -1,118 +0,0 @@ -// +build ignore - -package server - -import ( - "encoding/binary" - - etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/etcd/log" - "github.com/coreos/etcd/third_party/github.com/goraft/raft" -) - -func init() { - raft.RegisterCommand(&JoinCommand{}) -} - -// JoinCommand represents a request to join the cluster. -// The command returns the join_index (Uvarint). 
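// Illustrative sketch, not part of the deleted sources: this mirrors how the rest of
// the old server builds the command (see InitNewCluster and joinByPeer further down)
// and what comes back from Apply. The helper name is made up for the example.
func exampleJoinPayload(name, raftURL, etcdURL string) []byte {
	cmd := &JoinCommand{
		MinVersion: store.MinVersion(),
		MaxVersion: store.MaxVersion(),
		Name:       name,
		RaftURL:    raftURL,
		EtcdURL:    etcdURL,
	}
	// Wire form sent by Client.AddMachine to <peer>/join; Apply answers with a
	// Uvarint-encoded commit index that AddMachine decodes via binary.Uvarint.
	b, _ := json.Marshal(cmd)
	return b
}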
-type JoinCommand struct { - MinVersion int `json:"minVersion"` - MaxVersion int `json:"maxVersion"` - Name string `json:"name"` - RaftURL string `json:"raftURL"` - EtcdURL string `json:"etcdURL"` -} - -// The name of the join command in the log -func (c *JoinCommand) CommandName() string { - return "etcd:join" -} - -// Apply attempts to join a machine to the cluster. -func (c *JoinCommand) Apply(context raft.Context) (interface{}, error) { - index, err := applyJoin(c, context) - if err != nil { - return nil, err - } - - b := make([]byte, 8) - binary.PutUvarint(b, index) - return b, nil -} - -func (c *JoinCommand) NodeName() string { - return c.Name -} - -// applyJoin attempts to join a machine to the cluster. -func applyJoin(c *JoinCommand, context raft.Context) (uint64, error) { - ps, _ := context.Server().Context().(*PeerServer) - commitIndex := context.CommitIndex() - - // Make sure we're not getting a cached value from the registry. - ps.registry.Invalidate(c.Name) - - // Check if the join command is from a previous peer, who lost all its previous log. - if peerURL, ok := ps.registry.PeerURL(c.Name); ok { - // If previous node restarts with different peer URL, - // update its information. - if peerURL != c.RaftURL { - log.Infof("Rejoin with %v instead of %v from %v", c.RaftURL, peerURL, c.Name) - if err := updatePeerURL(c, ps); err != nil { - return 0, err - } - } - if c.Name == context.Server().Name() { - ps.removedInLog = false - } - return commitIndex, nil - } - - // Check if the join command adds an instance that collides with existing one on peer URL. - peerURLs := ps.registry.PeerURLs(ps.raftServer.Leader(), c.Name) - for _, peerURL := range peerURLs { - if peerURL == c.RaftURL { - log.Warnf("%v tries to join the cluster with existing URL %v", c.Name, c.EtcdURL) - return 0, etcdErr.NewError(etcdErr.EcodeExistingPeerAddr, c.EtcdURL, context.CommitIndex()) - } - } - - // Check peer number in the cluster - count := ps.registry.Count() - // ClusterConfig doesn't init until first machine is added - if count > 0 && count >= ps.ClusterConfig().ActiveSize { - log.Debug("Reject join request from ", c.Name) - return 0, etcdErr.NewError(etcdErr.EcodeNoMorePeer, "", context.CommitIndex()) - } - - // Add to shared peer registry. 
- ps.registry.Register(c.Name, c.RaftURL, c.EtcdURL) - - // Add peer in raft - if err := context.Server().AddPeer(c.Name, ""); err != nil { - return 0, err - } - - // Add peer stats - if c.Name != ps.RaftServer().Name() { - ps.followersStats.Followers[c.Name] = &raftFollowerStats{} - ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63 - } - - if c.Name == context.Server().Name() { - ps.removedInLog = false - } - return commitIndex, nil -} - -func updatePeerURL(c *JoinCommand, ps *PeerServer) error { - log.Debugf("Update peer URL of %v to %v", c.Name, c.RaftURL) - if err := ps.registry.UpdatePeerURL(c.Name, c.RaftURL); err != nil { - log.Debugf("Error while updating in registry: %s (%v)", c.Name, err) - return err - } - // Flush commit index, so raft will replay to here when restart - ps.raftServer.FlushCommitIndex() - return nil -} diff --git a/server/listener.go b/server/listener.go deleted file mode 100644 index 4de5d05eb..000000000 --- a/server/listener.go +++ /dev/null @@ -1,68 +0,0 @@ -// +build ignore - -package server - -import ( - "crypto/tls" - "net" - "time" - - "github.com/coreos/etcd/log" -) - -// TLSServerConfig generates tls configuration based on TLSInfo -// If any error happens, this function will call log.Fatal -func TLSServerConfig(info *TLSInfo) *tls.Config { - if info.KeyFile == "" || info.CertFile == "" { - return nil - } - - cfg, err := info.ServerConfig() - if err != nil { - log.Fatal("TLS info error: ", err) - } - return cfg -} - -// NewListener creates a net.Listener -// If the given scheme is "https", it will use TLS config to set listener. -// If any error happens, this function will call log.Fatal -func NewListener(scheme, addr string, cfg *tls.Config) net.Listener { - if scheme == "https" { - l, err := newTLSListener(addr, cfg) - if err != nil { - log.Fatal("Failed to create TLS listener: ", err) - } - return l - } - - l, err := newListener(addr) - if err != nil { - log.Fatal("Failed to create listener: ", err) - } - return l -} - -func newListener(addr string) (net.Listener, error) { - if addr == "" { - addr = ":http" - } - l, e := net.Listen("tcp", addr) - if e != nil { - return nil, e - } - return l, nil -} - -func newTLSListener(addr string, cfg *tls.Config) (net.Listener, error) { - if addr == "" { - addr = ":https" - } - - conn, err := net.Listen("tcp", addr) - if err != nil { - return nil, err - } - - return tls.NewListener(conn, cfg), nil -} diff --git a/server/package_stats.go b/server/package_stats.go deleted file mode 100644 index bd439463b..000000000 --- a/server/package_stats.go +++ /dev/null @@ -1,27 +0,0 @@ -// +build ignore - -package server - -import ( - "time" -) - -// packageStats represent the stats we need for a package. -// It has sending time and the size of the package. -type packageStats struct { - sendingTime time.Time - size int -} - -// NewPackageStats creates a pacakgeStats and return the pointer to it. -func NewPackageStats(now time.Time, size int) *packageStats { - return &packageStats{ - sendingTime: now, - size: size, - } -} - -// Time return the sending time of the package. 
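// Illustrative sketch, not part of the deleted sources. packageStats entries feed the
// statsQueue from stats_queue.go (also removed in this patch, body not shown here);
// the Insert/Rate shape below is inferred from the call sites in raft_server_stats.go
// and PeerServer.Stats further down, so treat those signatures as assumptions.
func exampleSendRate(q *statsQueue) {
	q.Insert(NewPackageStats(time.Now(), 512)) // record a 512-byte append just sent
	pkgRate, bwRate := q.Rate()                // packages/sec and bytes/sec over the window
	_, _ = pkgRate, bwRate
}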
-func (ps *packageStats) Time() time.Time { - return ps.sendingTime -} diff --git a/server/peer_server.go b/server/peer_server.go deleted file mode 100644 index 1f68bfad3..000000000 --- a/server/peer_server.go +++ /dev/null @@ -1,899 +0,0 @@ -// +build ignore - -package server - -import ( - "encoding/json" - "fmt" - "math/rand" - "net/http" - "net/url" - "sort" - "strings" - "sync" - "time" - - "github.com/coreos/etcd/third_party/github.com/goraft/raft" - "github.com/coreos/etcd/third_party/github.com/gorilla/mux" - - "github.com/coreos/etcd/discovery" - etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/etcd/log" - "github.com/coreos/etcd/metrics" - "github.com/coreos/etcd/pkg/btrfs" - "github.com/coreos/etcd/store" -) - -const ( - // MaxHeartbeatTimeoutBackoff is the maximum number of seconds before we warn - // the user again about a peer not accepting heartbeats. - MaxHeartbeatTimeoutBackoff = 15 * time.Second - - // ThresholdMonitorTimeout is the time between log notifications that the - // Raft heartbeat is too close to the election timeout. - ThresholdMonitorTimeout = 5 * time.Second - - // ActiveMonitorTimeout is the time between checks on the active size of - // the cluster. If the active size is bigger than the actual size then - // etcd attempts to demote to bring it to the correct number. - ActiveMonitorTimeout = 1 * time.Second - - // PeerActivityMonitorTimeout is the time between checks for dead nodes in - // the cluster. - PeerActivityMonitorTimeout = 1 * time.Second - - // The location of cluster config in key space. - ClusterConfigKey = "/_etcd/config" -) - -type PeerServerConfig struct { - Name string - Scheme string - URL string - SnapshotCount int - RetryTimes int - RetryInterval float64 -} - -type PeerServer struct { - Config PeerServerConfig - client *Client - raftServer raft.Server - server *Server - followersStats *raftFollowersStats - serverStats *raftServerStats - registry *Registry - store store.Store - snapConf *snapshotConf - - joinIndex uint64 - isNewCluster bool - removedInLog bool - - removeNotify chan bool - started bool - closeChan chan bool - routineGroup sync.WaitGroup - timeoutThresholdChan chan interface{} - - logBackoffs map[string]*logBackoff - - metrics *metrics.Bucket - sync.Mutex -} - -type logBackoff struct { - next time.Time - backoff time.Duration - count int -} - -// TODO: find a good policy to do snapshot -type snapshotConf struct { - // Etcd will check if snapshot is need every checkingInterval - checkingInterval time.Duration - - // The index when the last snapshot happened - lastIndex uint64 - - // If the incremental number of index since the last snapshot - // exceeds the snapshot Threshold, etcd will do a snapshot - snapshotThr uint64 -} - -func NewPeerServer(psConfig PeerServerConfig, client *Client, registry *Registry, store store.Store, mb *metrics.Bucket, followersStats *raftFollowersStats, serverStats *raftServerStats) *PeerServer { - s := &PeerServer{ - Config: psConfig, - client: client, - registry: registry, - store: store, - followersStats: followersStats, - serverStats: serverStats, - - timeoutThresholdChan: make(chan interface{}, 1), - logBackoffs: make(map[string]*logBackoff), - - metrics: mb, - } - - return s -} - -func (s *PeerServer) SetRaftServer(raftServer raft.Server, snapshot bool) { - s.snapConf = &snapshotConf{ - checkingInterval: time.Second * 3, - // this is not accurate, we will update raft to provide an api - lastIndex: raftServer.CommitIndex(), - snapshotThr: uint64(s.Config.SnapshotCount), - } - - 
raftServer.AddEventListener(raft.StateChangeEventType, s.raftEventLogger) - raftServer.AddEventListener(raft.LeaderChangeEventType, s.raftEventLogger) - raftServer.AddEventListener(raft.TermChangeEventType, s.raftEventLogger) - raftServer.AddEventListener(raft.AddPeerEventType, s.raftEventLogger) - raftServer.AddEventListener(raft.RemovePeerEventType, s.raftEventLogger) - raftServer.AddEventListener(raft.HeartbeatIntervalEventType, s.raftEventLogger) - raftServer.AddEventListener(raft.ElectionTimeoutThresholdEventType, s.raftEventLogger) - - raftServer.AddEventListener(raft.HeartbeatEventType, s.recordMetricEvent) - - raftServer.AddEventListener(raft.RemovedEventType, s.removedEvent) - - s.raftServer = raftServer - s.removedInLog = false - - // LoadSnapshot - if snapshot { - err := s.raftServer.LoadSnapshot() - - if err == nil { - log.Debugf("%s finished load snapshot", s.Config.Name) - } else { - log.Debug(err) - } - } - - s.raftServer.Init() - - // Set NOCOW for data directory in btrfs - if btrfs.IsBtrfs(s.raftServer.LogPath()) { - if err := btrfs.SetNOCOWFile(s.raftServer.LogPath()); err != nil { - log.Warnf("Failed setting NOCOW: %v", err) - } - } -} - -func (s *PeerServer) SetRegistry(registry *Registry) { - s.registry = registry -} - -func (s *PeerServer) SetStore(store store.Store) { - s.store = store -} - -// Try all possible ways to find clusters to join -// Include log data in -data-dir, -discovery and -peers -// -// Peer discovery follows this order: -// 1. previous peers in -data-dir -// 2. -discovery -// 3. -peers -func (s *PeerServer) FindCluster(discoverURL string, peers []string) (toStart bool, possiblePeers []string, err error) { - name := s.Config.Name - isNewNode := s.raftServer.IsLogEmpty() - - // Try its best to find possible peers, and connect with them. - if !isNewNode { - // It is not allowed to join the cluster with existing peer address - // This prevents old node joining with different name by mistake. - if !s.checkPeerAddressNonconflict() { - err = fmt.Errorf("%v is not allowed to join the cluster with existing URL %v", s.Config.Name, s.Config.URL) - return - } - - // Take old nodes into account. - possiblePeers = s.getKnownPeers() - // Discover registered peers. - // TODO(yichengq): It may mess up discoverURL if this is - // set wrong by mistake. This may need to refactor discovery - // module. Fix it later. - if discoverURL != "" { - discoverPeers, _ := s.handleDiscovery(discoverURL) - possiblePeers = append(possiblePeers, discoverPeers...) - } - possiblePeers = append(possiblePeers, peers...) - possiblePeers = s.removeSelfFromList(possiblePeers) - - if s.removedInLog { - return - } - - // If there is possible peer list, use it to find cluster. - if len(possiblePeers) > 0 { - // TODO(yichengq): joinCluster may fail if there's no leader for - // current cluster. It should wait if the cluster is under - // leader election, or the node with changed IP cannot join - // the cluster then. - if rejected, ierr := s.startAsFollower(possiblePeers, 1); rejected { - log.Debugf("%s should work as standby for the cluster %v: %v", name, possiblePeers, ierr) - return - } else if ierr != nil { - log.Warnf("%s cannot connect to previous cluster %v: %v", name, possiblePeers, ierr) - } else { - log.Debugf("%s joins to the previous cluster %v", name, possiblePeers) - toStart = true - return - } - } - - // TODO(yichengq): Think about the action that should be done - // if it cannot connect any of the previous known node. 
- log.Debugf("%s is restarting the cluster %v", name, possiblePeers) - s.SetJoinIndex(s.raftServer.CommitIndex()) - toStart = true - return - } - - // Attempt cluster discovery - if discoverURL != "" { - discoverPeers, discoverErr := s.handleDiscovery(discoverURL) - // It is not registered in discover url - if discoverErr != nil { - log.Warnf("%s failed to connect discovery service[%v]: %v", name, discoverURL, discoverErr) - if len(peers) == 0 { - err = fmt.Errorf("%s, the new instance, must register itself to discovery service as required", name) - return - } - log.Debugf("%s is joining peers %v from -peers flag", name, peers) - } else { - log.Debugf("%s is joining a cluster %v via discover service", name, discoverPeers) - peers = discoverPeers - } - } - possiblePeers = peers - - if len(possiblePeers) > 0 { - if rejected, ierr := s.startAsFollower(possiblePeers, s.Config.RetryTimes); rejected { - log.Debugf("%s should work as standby for the cluster %v: %v", name, possiblePeers, ierr) - } else if ierr != nil { - log.Warnf("%s cannot connect to existing peers %v: %v", name, possiblePeers, ierr) - err = ierr - } else { - toStart = true - } - return - } - - // start as a leader in a new cluster - s.isNewCluster = true - log.Infof("%s is starting a new cluster", s.Config.Name) - toStart = true - return -} - -// Start starts the raft server. -// The function assumes that join has been accepted successfully. -func (s *PeerServer) Start(snapshot bool, clusterConfig *ClusterConfig) error { - s.Lock() - defer s.Unlock() - if s.started { - return nil - } - s.started = true - - s.removeNotify = make(chan bool) - s.closeChan = make(chan bool) - - s.raftServer.Start() - if s.isNewCluster { - s.InitNewCluster(clusterConfig) - s.isNewCluster = false - } - - s.startRoutine(s.monitorSync) - s.startRoutine(s.monitorTimeoutThreshold) - s.startRoutine(s.monitorActiveSize) - s.startRoutine(s.monitorPeerActivity) - - // open the snapshot - if snapshot { - s.startRoutine(s.monitorSnapshot) - } - - return nil -} - -// Stop stops the server gracefully. -func (s *PeerServer) Stop() { - s.Lock() - defer s.Unlock() - if !s.started { - return - } - s.started = false - - close(s.closeChan) - // TODO(yichengq): it should also call async stop for raft server, - // but this functionality has not been implemented. - s.raftServer.Stop() - s.routineGroup.Wait() -} - -// asyncRemove stops the server in peer mode. -// It is called to stop the server internally when it has been removed -// from the cluster. -// The function triggers the stop action first to notice server that it -// should not continue, and wait for its stop in separate goroutine because -// the caller should also exit. -func (s *PeerServer) asyncRemove() { - s.Lock() - if !s.started { - s.Unlock() - return - } - s.started = false - - close(s.closeChan) - // TODO(yichengq): it should also call async stop for raft server, - // but this functionality has not been implemented. - go func() { - s.raftServer.Stop() - s.routineGroup.Wait() - close(s.removeNotify) - s.Unlock() - }() -} - -// RemoveNotify notifies the server is removed from peer mode due to -// removal from the cluster. 
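// Illustrative sketch, not part of the deleted sources: asyncRemove above closes the
// channel once the raft server has stopped, so the embedding process could block on
// RemoveNotify (declared just below) to tear down its client-facing pieces, roughly:
func exampleWaitForRemoval(ps *PeerServer, stop <-chan struct{}) {
	select {
	case <-ps.RemoveNotify():
		// this node was removed from the cluster; shut down the etcd Server as well
	case <-stop:
		// normal shutdown
	}
}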
-func (s *PeerServer) RemoveNotify() <-chan bool { - return s.removeNotify -} - -func (s *PeerServer) HTTPHandler() http.Handler { - router := mux.NewRouter() - - // internal commands - router.HandleFunc("/name", s.NameHttpHandler) - router.HandleFunc("/version", s.VersionHttpHandler) - router.HandleFunc("/version/{version:[0-9]+}/check", s.VersionCheckHttpHandler) - router.HandleFunc("/upgrade", s.UpgradeHttpHandler) - router.HandleFunc("/join", s.JoinHttpHandler) - router.HandleFunc("/remove/{name:.+}", s.RemoveHttpHandler) - router.HandleFunc("/vote", s.VoteHttpHandler) - router.HandleFunc("/log", s.GetLogHttpHandler) - router.HandleFunc("/log/append", s.AppendEntriesHttpHandler) - router.HandleFunc("/snapshot", s.SnapshotHttpHandler) - router.HandleFunc("/snapshotRecovery", s.SnapshotRecoveryHttpHandler) - router.HandleFunc("/etcdURL", s.EtcdURLHttpHandler) - - router.HandleFunc("/v2/admin/config", s.getClusterConfigHttpHandler).Methods("GET") - router.HandleFunc("/v2/admin/config", s.setClusterConfigHttpHandler).Methods("PUT") - router.HandleFunc("/v2/admin/machines", s.getMachinesHttpHandler).Methods("GET") - router.HandleFunc("/v2/admin/machines/{name}", s.getMachineHttpHandler).Methods("GET") - router.HandleFunc("/v2/admin/machines/{name}", s.RemoveHttpHandler).Methods("DELETE") - - return router -} - -func (s *PeerServer) SetJoinIndex(joinIndex uint64) { - s.joinIndex = joinIndex -} - -// ClusterConfig retrieves the current cluster configuration. -func (s *PeerServer) ClusterConfig() *ClusterConfig { - e, err := s.store.Get(ClusterConfigKey, false, false) - // This is useful for backward compatibility because it doesn't - // set cluster config in older version. - if err != nil { - log.Debugf("failed getting cluster config key: %v", err) - return NewClusterConfig() - } - - var c ClusterConfig - if err = json.Unmarshal([]byte(*e.Node.Value), &c); err != nil { - log.Debugf("failed unmarshaling cluster config: %v", err) - return NewClusterConfig() - } - return &c -} - -// SetClusterConfig updates the current cluster configuration. -// Adjusting the active size will cause cluster to add or remove machines -// to match the new size. -func (s *PeerServer) SetClusterConfig(c *ClusterConfig) { - // Set minimums. - if c.ActiveSize < MinActiveSize { - c.ActiveSize = MinActiveSize - } - if c.RemoveDelay < MinRemoveDelay { - c.RemoveDelay = MinRemoveDelay - } - if c.SyncInterval < MinSyncInterval { - c.SyncInterval = MinSyncInterval - } - - log.Debugf("set cluster config as %v", c) - b, _ := json.Marshal(c) - s.store.Set(ClusterConfigKey, false, string(b), store.Permanent) -} - -// Retrieves the underlying Raft server. -func (s *PeerServer) RaftServer() raft.Server { - return s.raftServer -} - -// Associates the client server with the peer server. 
-func (s *PeerServer) SetServer(server *Server) { - s.server = server -} - -func (s *PeerServer) InitNewCluster(clusterConfig *ClusterConfig) { - // leader need to join self as a peer - s.doCommand(&JoinCommand{ - MinVersion: store.MinVersion(), - MaxVersion: store.MaxVersion(), - Name: s.raftServer.Name(), - RaftURL: s.Config.URL, - EtcdURL: s.server.URL(), - }) - log.Debugf("%s start as a leader", s.Config.Name) - s.joinIndex = 1 - - s.doCommand(&SetClusterConfigCommand{Config: clusterConfig}) - log.Debugf("%s sets cluster config as %v", s.Config.Name, clusterConfig) -} - -func (s *PeerServer) doCommand(cmd raft.Command) { - for { - if _, err := s.raftServer.Do(cmd); err == nil { - break - } - } - log.Debugf("%s start as a leader", s.Config.Name) -} - -func (s *PeerServer) startAsFollower(cluster []string, retryTimes int) (bool, error) { - // start as a follower in a existing cluster - for i := 0; ; i++ { - if rejected, err := s.joinCluster(cluster); rejected { - return true, err - } else if err == nil { - return false, nil - } - if i == retryTimes-1 { - break - } - log.Infof("%v is unable to join the cluster using any of the peers %v at %dth time. Retrying in %.1f seconds", s.Config.Name, cluster, i, s.Config.RetryInterval) - time.Sleep(time.Second * time.Duration(s.Config.RetryInterval)) - continue - } - return false, fmt.Errorf("fail joining the cluster via given peers after %x retries", retryTimes) -} - -// Upgradable checks whether all peers in a cluster support an upgrade to the next store version. -func (s *PeerServer) Upgradable() error { - nextVersion := s.store.Version() + 1 - for _, peerURL := range s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name) { - u, err := url.Parse(peerURL) - if err != nil { - return fmt.Errorf("PeerServer: Cannot parse URL: '%s' (%s)", peerURL, err) - } - - url := (&url.URL{Host: u.Host, Scheme: s.Config.Scheme}).String() - ok, err := s.client.CheckVersion(url, nextVersion) - if err != nil { - return err - } - if !ok { - return fmt.Errorf("PeerServer: Version %d is not compatible with peer: %s", nextVersion, u.Host) - } - } - - return nil -} - -// checkPeerAddressNonconflict checks whether the peer address has existed with different name. -func (s *PeerServer) checkPeerAddressNonconflict() bool { - // there exists the (name, peer address) pair - if peerURL, ok := s.registry.PeerURL(s.Config.Name); ok { - if peerURL == s.Config.URL { - return true - } - } - - // check all existing peer addresses - peerURLs := s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name) - for _, peerURL := range peerURLs { - if peerURL == s.Config.URL { - return false - } - } - return true -} - -// Helper function to do discovery and return results in expected format -func (s *PeerServer) handleDiscovery(discoverURL string) (peers []string, err error) { - peers, err = discovery.Do(discoverURL, s.Config.Name, s.Config.URL, s.closeChan, s.startRoutine) - - // Warn about errors coming from discovery, this isn't fatal - // since the user might have provided a peer list elsewhere, - // or there is some log in data dir. - if err != nil { - log.Warnf("Discovery encountered an error: %v", err) - return - } - - for i := range peers { - // Strip the scheme off of the peer if it has one - // TODO(bp): clean this up! 
- purl, err := url.Parse(peers[i]) - if err == nil { - peers[i] = purl.Host - } - } - - log.Infof("Discovery fetched back peer list: %v", peers) - - return -} - -// getKnownPeers gets the previous peers from log -func (s *PeerServer) getKnownPeers() []string { - peers := s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name) - log.Infof("Peer URLs in log: %s / %s (%s)", s.raftServer.Leader(), s.Config.Name, strings.Join(peers, ",")) - - for i := range peers { - u, err := url.Parse(peers[i]) - if err != nil { - log.Debugf("getKnownPeers cannot parse url %v", peers[i]) - } - peers[i] = u.Host - } - return peers -} - -// removeSelfFromList removes url of the peerServer from the peer list -func (s *PeerServer) removeSelfFromList(peers []string) []string { - // Remove its own peer address from the peer list to join - u, err := url.Parse(s.Config.URL) - if err != nil { - log.Warnf("failed parsing self peer address %v", s.Config.URL) - u = nil - } - newPeers := make([]string, 0) - for _, v := range peers { - if u == nil || v != u.Host { - newPeers = append(newPeers, v) - } - } - return newPeers -} - -func (s *PeerServer) joinCluster(cluster []string) (bool, error) { - for _, peer := range cluster { - if len(peer) == 0 { - continue - } - - if rejected, err := s.joinByPeer(s.raftServer, peer, s.Config.Scheme); rejected { - return true, fmt.Errorf("rejected by peer %s: %v", peer, err) - } else if err == nil { - log.Infof("%s joined the cluster via peer %s", s.Config.Name, peer) - return false, nil - } else { - log.Infof("%s attempted to join via %s failed: %v", s.Config.Name, peer, err) - } - } - - return false, fmt.Errorf("unreachable cluster") -} - -// Send join requests to peer. -// The first return tells whether it is rejected by the cluster directly. -func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string) (bool, error) { - u := (&url.URL{Host: peer, Scheme: scheme}).String() - - // Our version must match the leaders version - version, err := s.client.GetVersion(u) - if err != nil { - return false, fmt.Errorf("fail checking join version: %v", err) - } - if version < store.MinVersion() || version > store.MaxVersion() { - return true, fmt.Errorf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version) - } - - // Fetch current peer list - machines, err := s.client.GetMachines(u) - if err != nil { - return false, fmt.Errorf("fail getting machine messages: %v", err) - } - exist := false - for _, machine := range machines { - if machine.Name == server.Name() { - exist = true - break - } - } - - // Fetch cluster config to see whether exists some place. 
- clusterConfig, err := s.client.GetClusterConfig(u) - if err != nil { - return false, fmt.Errorf("fail getting cluster config: %v", err) - } - if !exist && clusterConfig.ActiveSize <= len(machines) { - return true, fmt.Errorf("stop joining because the cluster is full with %d nodes", len(machines)) - } - - joinIndex, err := s.client.AddMachine(u, - &JoinCommand{ - MinVersion: store.MinVersion(), - MaxVersion: store.MaxVersion(), - Name: server.Name(), - RaftURL: s.Config.URL, - EtcdURL: s.server.URL(), - }) - if err != nil { - return err.ErrorCode == etcdErr.EcodeNoMorePeer, fmt.Errorf("fail on join request: %v", err) - } - - s.joinIndex = joinIndex - return false, nil -} - -func (s *PeerServer) Stats() []byte { - s.serverStats.LeaderInfo.Uptime = time.Now().Sub(s.serverStats.LeaderInfo.StartTime).String() - - // TODO: register state listener to raft to change this field - // rather than compare the state each time Stats() is called. - if s.RaftServer().State() == raft.Leader { - s.serverStats.LeaderInfo.Name = s.RaftServer().Name() - } - - queue := s.serverStats.sendRateQueue - - s.serverStats.SendingPkgRate, s.serverStats.SendingBandwidthRate = queue.Rate() - - queue = s.serverStats.recvRateQueue - - s.serverStats.RecvingPkgRate, s.serverStats.RecvingBandwidthRate = queue.Rate() - - b, _ := json.Marshal(s.serverStats) - - return b -} - -func (s *PeerServer) PeerStats() []byte { - if s.raftServer.State() == raft.Leader { - b, _ := json.Marshal(s.followersStats) - return b - } - return nil -} - -// removedEvent handles the case where a machine has been removed from the -// cluster and is notified when it tries to become a candidate. -func (s *PeerServer) removedEvent(event raft.Event) { - // HACK(philips): we need to find a better notification for this. - log.Infof("removed during cluster re-configuration") - s.asyncRemove() -} - -// raftEventLogger converts events from the Raft server into log messages. -func (s *PeerServer) raftEventLogger(event raft.Event) { - value := event.Value() - prevValue := event.PrevValue() - if value == nil { - value = "" - } - if prevValue == nil { - prevValue = "" - } - - switch event.Type() { - case raft.StateChangeEventType: - log.Infof("%s: state changed from '%v' to '%v'.", s.Config.Name, prevValue, value) - case raft.TermChangeEventType: - log.Infof("%s: term #%v started.", s.Config.Name, value) - case raft.LeaderChangeEventType: - log.Infof("%s: leader changed from '%v' to '%v'.", s.Config.Name, prevValue, value) - case raft.AddPeerEventType: - log.Infof("%s: peer added: '%v'", s.Config.Name, value) - case raft.RemovePeerEventType: - log.Infof("%s: peer removed: '%v'", s.Config.Name, value) - case raft.HeartbeatIntervalEventType: - peer, ok := value.(*raft.Peer) - if !ok { - log.Warnf("%s: heatbeat timeout from unknown peer", s.Config.Name) - return - } - s.logHeartbeatTimeout(peer) - case raft.ElectionTimeoutThresholdEventType: - select { - case s.timeoutThresholdChan <- value: - default: - } - - } -} - -// logHeartbeatTimeout logs about the edge triggered heartbeat timeout event -// only if we haven't warned within a reasonable interval. 
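// Note, not part of the deleted sources: tracing the backoff below, the first missed
// heartbeat for a peer is logged right away; follow-ups are then suppressed for 2s,
// 4s, 8s and finally every MaxHeartbeatTimeoutBackoff (15s). Suppressed events are
// accumulated into the missed= count, and the backoff resets once the peer shows
// fresh activity after the current window.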
-func (s *PeerServer) logHeartbeatTimeout(peer *raft.Peer) { - b, ok := s.logBackoffs[peer.Name] - if !ok { - b = &logBackoff{time.Time{}, time.Second, 1} - s.logBackoffs[peer.Name] = b - } - - if peer.LastActivity().After(b.next) { - b.next = time.Time{} - b.backoff = time.Second - b.count = 1 - } - - if b.next.After(time.Now()) { - b.count++ - return - } - - b.backoff = 2 * b.backoff - if b.backoff > MaxHeartbeatTimeoutBackoff { - b.backoff = MaxHeartbeatTimeoutBackoff - } - b.next = time.Now().Add(b.backoff) - - log.Infof("%s: warning: heartbeat time out peer=%q missed=%d backoff=%q", s.Config.Name, peer.Name, b.count, b.backoff) -} - -func (s *PeerServer) recordMetricEvent(event raft.Event) { - name := fmt.Sprintf("raft.event.%s", event.Type()) - value := event.Value().(time.Duration) - (*s.metrics).Timer(name).Update(value) -} - -// logSnapshot logs about the snapshot that was taken. -func (s *PeerServer) logSnapshot(err error, currentIndex, count uint64) { - info := fmt.Sprintf("%s: snapshot of %d events at index %d", s.Config.Name, count, currentIndex) - - if err != nil { - log.Infof("%s attempted and failed: %v", info, err) - } else { - log.Infof("%s completed", info) - } -} - -func (s *PeerServer) startRoutine(f func()) { - s.routineGroup.Add(1) - go func() { - defer s.routineGroup.Done() - f() - }() -} - -func (s *PeerServer) monitorSnapshot() { - for { - timer := time.NewTimer(s.snapConf.checkingInterval) - select { - case <-s.closeChan: - timer.Stop() - return - case <-timer.C: - } - - currentIndex := s.RaftServer().CommitIndex() - count := currentIndex - s.snapConf.lastIndex - if uint64(count) > s.snapConf.snapshotThr { - err := s.raftServer.TakeSnapshot() - s.logSnapshot(err, currentIndex, count) - s.snapConf.lastIndex = currentIndex - } - } -} - -func (s *PeerServer) monitorSync() { - ticker := time.NewTicker(time.Millisecond * 500) - defer ticker.Stop() - for { - select { - case <-s.closeChan: - return - case now := <-ticker.C: - if s.raftServer.State() == raft.Leader { - s.raftServer.Do(s.store.CommandFactory().CreateSyncCommand(now)) - } - } - } -} - -// monitorTimeoutThreshold groups timeout threshold events together and prints -// them as a single log line. -func (s *PeerServer) monitorTimeoutThreshold() { - ticker := time.NewTicker(ThresholdMonitorTimeout) - defer ticker.Stop() - for { - select { - case <-s.closeChan: - return - case value := <-s.timeoutThresholdChan: - log.Infof("%s: warning: heartbeat near election timeout: %v", s.Config.Name, value) - } - - select { - case <-s.closeChan: - return - case <-ticker.C: - } - } -} - -// monitorActiveSize has the leader periodically check the status of cluster -// nodes and swaps them out for standbys as needed. -func (s *PeerServer) monitorActiveSize() { - ticker := time.NewTicker(ActiveMonitorTimeout) - defer ticker.Stop() - for { - select { - case <-s.closeChan: - return - case <-ticker.C: - } - - // Ignore while this peer is not a leader. - if s.raftServer.State() != raft.Leader { - continue - } - - // Retrieve target active size and actual active size. - activeSize := s.ClusterConfig().ActiveSize - peers := s.registry.Names() - peerCount := len(peers) - if index := sort.SearchStrings(peers, s.Config.Name); index < len(peers) && peers[index] == s.Config.Name { - peers = append(peers[:index], peers[index+1:]...) - } - - // If we have more active nodes than we should then remove. 
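// Worked example, not part of the deleted sources: with ActiveSize=3 and five names
// in the registry, the leader removes one randomly chosen peer per
// ActiveMonitorTimeout tick (never itself, since its own name was filtered out just
// above) until only three remain; monitorPeerActivity below applies the same
// RemoveCommand to peers that have been silent for longer than RemoveDelay.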
- if peerCount > activeSize { - peer := peers[rand.Intn(len(peers))] - log.Infof("%s: removing node: %v; peer number %d > expected size %d", s.Config.Name, peer, peerCount, activeSize) - if _, err := s.raftServer.Do(&RemoveCommand{Name: peer}); err != nil { - log.Infof("%s: warning: remove error: %v", s.Config.Name, err) - } - continue - } - } -} - -// monitorPeerActivity has the leader periodically for dead nodes and demotes them. -func (s *PeerServer) monitorPeerActivity() { - ticker := time.NewTicker(PeerActivityMonitorTimeout) - defer ticker.Stop() - for { - select { - case <-s.closeChan: - return - case <-ticker.C: - } - - // Ignore while this peer is not a leader. - if s.raftServer.State() != raft.Leader { - continue - } - - // Check last activity for all peers. - now := time.Now() - removeDelay := time.Duration(int64(s.ClusterConfig().RemoveDelay * float64(time.Second))) - peers := s.raftServer.Peers() - for _, peer := range peers { - // If the last response from the peer is longer than the remove delay - // then automatically demote the peer. - if !peer.LastActivity().IsZero() && now.Sub(peer.LastActivity()) > removeDelay { - log.Infof("%s: removing node: %v; last activity %v ago", s.Config.Name, peer.Name, now.Sub(peer.LastActivity())) - if _, err := s.raftServer.Do(&RemoveCommand{Name: peer.Name}); err != nil { - log.Infof("%s: warning: autodemotion error: %v", s.Config.Name, err) - } - continue - } - } - } -} diff --git a/server/peer_server_handlers.go b/server/peer_server_handlers.go deleted file mode 100644 index 5ad0f9c0b..000000000 --- a/server/peer_server_handlers.go +++ /dev/null @@ -1,320 +0,0 @@ -// +build ignore - -package server - -import ( - "encoding/json" - "net/http" - "strconv" - "time" - - "github.com/coreos/etcd/third_party/github.com/goraft/raft" - "github.com/coreos/etcd/third_party/github.com/gorilla/mux" - - etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/etcd/log" - uhttp "github.com/coreos/etcd/pkg/http" - "github.com/coreos/etcd/store" -) - -// Get all the current logs -func (ps *PeerServer) GetLogHttpHandler(w http.ResponseWriter, req *http.Request) { - log.Debugf("[recv] GET %s/log", ps.Config.URL) - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(ps.raftServer.LogEntries()) -} - -// Response to vote request -func (ps *PeerServer) VoteHttpHandler(w http.ResponseWriter, req *http.Request) { - rvreq := &raft.RequestVoteRequest{} - - if _, err := rvreq.Decode(req.Body); err != nil { - http.Error(w, "", http.StatusBadRequest) - log.Warnf("[recv] BADREQUEST %s/vote [%v]", ps.Config.URL, err) - return - } - - log.Debugf("[recv] POST %s/vote [%s]", ps.Config.URL, rvreq.CandidateName) - - resp := ps.raftServer.RequestVote(rvreq) - - if resp == nil { - log.Warn("[vote] Error: nil response") - http.Error(w, "", http.StatusInternalServerError) - return - } - - if _, err := resp.Encode(w); err != nil { - log.Warn("[vote] Error: %v", err) - http.Error(w, "", http.StatusInternalServerError) - return - } -} - -// Response to append entries request -func (ps *PeerServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) { - start := time.Now() - aereq := &raft.AppendEntriesRequest{} - - if _, err := aereq.Decode(req.Body); err != nil { - http.Error(w, "", http.StatusBadRequest) - log.Warnf("[recv] BADREQUEST %s/log/append [%v]", ps.Config.URL, err) - return - } - - log.Debugf("[recv] POST %s/log/append [%d]", ps.Config.URL, len(aereq.Entries)) - - 
ps.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength)) - - resp := ps.raftServer.AppendEntries(aereq) - - if resp == nil { - log.Warn("[ae] Error: nil response") - http.Error(w, "", http.StatusInternalServerError) - return - } - - if !resp.Success() { - log.Debugf("[Append Entry] Step back") - } - - if _, err := resp.Encode(w); err != nil { - log.Warn("[ae] Error: %v", err) - http.Error(w, "", http.StatusInternalServerError) - return - } - - (*ps.metrics).Timer("timer.appendentries.handle").UpdateSince(start) -} - -// Response to recover from snapshot request -func (ps *PeerServer) SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) { - ssreq := &raft.SnapshotRequest{} - - if _, err := ssreq.Decode(req.Body); err != nil { - http.Error(w, "", http.StatusBadRequest) - log.Warnf("[recv] BADREQUEST %s/snapshot [%v]", ps.Config.URL, err) - return - } - - log.Debugf("[recv] POST %s/snapshot", ps.Config.URL) - - resp := ps.raftServer.RequestSnapshot(ssreq) - - if resp == nil { - log.Warn("[ss] Error: nil response") - http.Error(w, "", http.StatusInternalServerError) - return - } - - if _, err := resp.Encode(w); err != nil { - log.Warn("[ss] Error: %v", err) - http.Error(w, "", http.StatusInternalServerError) - return - } -} - -// Response to recover from snapshot request -func (ps *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) { - ssrreq := &raft.SnapshotRecoveryRequest{} - - if _, err := ssrreq.Decode(req.Body); err != nil { - http.Error(w, "", http.StatusBadRequest) - log.Warnf("[recv] BADREQUEST %s/snapshotRecovery [%v]", ps.Config.URL, err) - return - } - - log.Debugf("[recv] POST %s/snapshotRecovery", ps.Config.URL) - - resp := ps.raftServer.SnapshotRecoveryRequest(ssrreq) - - if resp == nil { - log.Warn("[ssr] Error: nil response") - http.Error(w, "", http.StatusInternalServerError) - return - } - - if _, err := resp.Encode(w); err != nil { - log.Warn("[ssr] Error: %v", err) - http.Error(w, "", http.StatusInternalServerError) - return - } -} - -// Get the port that listening for etcd connecting of the server -func (ps *PeerServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) { - log.Debugf("[recv] Get %s/etcdURL/ ", ps.Config.URL) - w.WriteHeader(http.StatusOK) - w.Write([]byte(ps.server.URL())) -} - -// Response to the join request -func (ps *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) { - command := &JoinCommand{} - if err := uhttp.DecodeJsonRequest(req, command); err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - - log.Debugf("Receive Join Request from %s", command.Name) - err := ps.server.Dispatch(command, w, req) - - // Return status. - if err != nil { - if etcdErr, ok := err.(*etcdErr.Error); ok { - log.Debug("Return error: ", (*etcdErr).Error()) - etcdErr.Write(w) - } else { - http.Error(w, err.Error(), http.StatusInternalServerError) - } - } -} - -// Response to remove request -func (ps *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request) { - if req.Method != "DELETE" { - w.WriteHeader(http.StatusMethodNotAllowed) - return - } - - vars := mux.Vars(req) - command := &RemoveCommand{ - Name: vars["name"], - } - - log.Debugf("[recv] Remove Request [%s]", command.Name) - - ps.server.Dispatch(command, w, req) -} - -// Returns a JSON-encoded cluster configuration. 
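// Illustrative sketch, not part of the deleted sources: the /v2/admin/config handlers
// just below serve and update ClusterConfig. The PUT side decodes into a map, so only
// the keys present in the body are overwritten; a partial update that only raises the
// active size therefore looks like this (helper name made up for the example):
func exampleConfigUpdateBody() []byte {
	body, _ := json.Marshal(map[string]interface{}{"activeSize": 5})
	return body // PUT to <peer>/v2/admin/config; removeDelay and syncInterval keep their values
}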
-func (ps *PeerServer) getClusterConfigHttpHandler(w http.ResponseWriter, req *http.Request) { - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(ps.ClusterConfig()) -} - -// Updates the cluster configuration. -func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *http.Request) { - // Decode map. - m := make(map[string]interface{}) - if err := json.NewDecoder(req.Body).Decode(&m); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - // Copy config and update fields passed in. - cfg := ps.ClusterConfig() - if activeSize, ok := m["activeSize"].(float64); ok { - cfg.ActiveSize = int(activeSize) - } - if removeDelay, ok := m["removeDelay"].(float64); ok { - cfg.RemoveDelay = removeDelay - } - if syncInterval, ok := m["syncInterval"].(float64); ok { - cfg.SyncInterval = syncInterval - } - - // Issue command to update. - c := &SetClusterConfigCommand{Config: cfg} - log.Debugf("[recv] Update Cluster Config Request") - ps.server.Dispatch(c, w, req) - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(ps.ClusterConfig()) -} - -// Retrieves a list of peers and standbys. -func (ps *PeerServer) getMachinesHttpHandler(w http.ResponseWriter, req *http.Request) { - machines := make([]*machineMessage, 0) - leader := ps.raftServer.Leader() - for _, name := range ps.registry.Names() { - if msg := ps.getMachineMessage(name, leader); msg != nil { - machines = append(machines, msg) - } - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(&machines) -} - -// Retrieve single peer or standby. -func (ps *PeerServer) getMachineHttpHandler(w http.ResponseWriter, req *http.Request) { - vars := mux.Vars(req) - m := ps.getMachineMessage(vars["name"], ps.raftServer.Leader()) - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(m) -} - -func (ps *PeerServer) getMachineMessage(name string, leader string) *machineMessage { - if !ps.registry.Exists(name) { - return nil - } - - clientURL, _ := ps.registry.ClientURL(name) - peerURL, _ := ps.registry.PeerURL(name) - msg := &machineMessage{ - Name: name, - State: raft.Follower, - ClientURL: clientURL, - PeerURL: peerURL, - } - if name == leader { - msg.State = raft.Leader - } - return msg -} - -// Response to the name request -func (ps *PeerServer) NameHttpHandler(w http.ResponseWriter, req *http.Request) { - log.Debugf("[recv] Get %s/name/ ", ps.Config.URL) - w.WriteHeader(http.StatusOK) - w.Write([]byte(ps.Config.Name)) -} - -// Response to the name request -func (ps *PeerServer) VersionHttpHandler(w http.ResponseWriter, req *http.Request) { - log.Debugf("[recv] Get %s/version/ ", ps.Config.URL) - w.WriteHeader(http.StatusOK) - w.Write([]byte(strconv.Itoa(ps.store.Version()))) -} - -// Checks whether a given version is supported. -func (ps *PeerServer) VersionCheckHttpHandler(w http.ResponseWriter, req *http.Request) { - log.Debugf("[recv] Get %s%s ", ps.Config.URL, req.URL.Path) - vars := mux.Vars(req) - version, _ := strconv.Atoi(vars["version"]) - if version >= store.MinVersion() && version <= store.MaxVersion() { - w.WriteHeader(http.StatusOK) - } else { - w.WriteHeader(http.StatusForbidden) - } -} - -// Upgrades the current store version to the next version. -func (ps *PeerServer) UpgradeHttpHandler(w http.ResponseWriter, req *http.Request) { - log.Debugf("[recv] Get %s/version", ps.Config.URL) - - // Check if upgrade is possible for all nodes. 
- if err := ps.Upgradable(); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - // Create an upgrade command from the current version. - c := ps.store.CommandFactory().CreateUpgradeCommand() - if err := ps.server.Dispatch(c, w, req); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - w.WriteHeader(http.StatusOK) -} - -// machineMessage represents information about a peer or standby in the registry. -type machineMessage struct { - Name string `json:"name"` - State string `json:"state"` - ClientURL string `json:"clientURL"` - PeerURL string `json:"peerURL"` -} diff --git a/server/raft_follower_stats.go b/server/raft_follower_stats.go deleted file mode 100644 index 3f43af8f3..000000000 --- a/server/raft_follower_stats.go +++ /dev/null @@ -1,65 +0,0 @@ -// +build ignore - -package server - -import ( - "math" - "time" -) - -type raftFollowersStats struct { - Leader string `json:"leader"` - Followers map[string]*raftFollowerStats `json:"followers"` -} - -func NewRaftFollowersStats(name string) *raftFollowersStats { - return &raftFollowersStats{ - Leader: name, - Followers: make(map[string]*raftFollowerStats), - } -} - -type raftFollowerStats struct { - Latency struct { - Current float64 `json:"current"` - Average float64 `json:"average"` - averageSquare float64 - StandardDeviation float64 `json:"standardDeviation"` - Minimum float64 `json:"minimum"` - Maximum float64 `json:"maximum"` - } `json:"latency"` - - Counts struct { - Fail uint64 `json:"fail"` - Success uint64 `json:"success"` - } `json:"counts"` -} - -// Succ function update the raftFollowerStats with a successful send -func (ps *raftFollowerStats) Succ(d time.Duration) { - total := float64(ps.Counts.Success) * ps.Latency.Average - totalSquare := float64(ps.Counts.Success) * ps.Latency.averageSquare - - ps.Counts.Success++ - - ps.Latency.Current = float64(d) / (1000000.0) - - if ps.Latency.Current > ps.Latency.Maximum { - ps.Latency.Maximum = ps.Latency.Current - } - - if ps.Latency.Current < ps.Latency.Minimum { - ps.Latency.Minimum = ps.Latency.Current - } - - ps.Latency.Average = (total + ps.Latency.Current) / float64(ps.Counts.Success) - ps.Latency.averageSquare = (totalSquare + ps.Latency.Current*ps.Latency.Current) / float64(ps.Counts.Success) - - // sdv = sqrt(avg(x^2) - avg(x)^2) - ps.Latency.StandardDeviation = math.Sqrt(ps.Latency.averageSquare - ps.Latency.Average*ps.Latency.Average) -} - -// Fail function update the raftFollowerStats with a unsuccessful send -func (ps *raftFollowerStats) Fail() { - ps.Counts.Fail++ -} diff --git a/server/raft_server_stats.go b/server/raft_server_stats.go deleted file mode 100644 index e2e96812c..000000000 --- a/server/raft_server_stats.go +++ /dev/null @@ -1,81 +0,0 @@ -// +build ignore - -package server - -import ( - "sync" - "time" - - "github.com/coreos/etcd/third_party/github.com/goraft/raft" -) - -type raftServerStats struct { - Name string `json:"name"` - State string `json:"state"` - StartTime time.Time `json:"startTime"` - - LeaderInfo struct { - Name string `json:"leader"` - Uptime string `json:"uptime"` - StartTime time.Time `json:"startTime"` - } `json:"leaderInfo"` - - RecvAppendRequestCnt uint64 `json:"recvAppendRequestCnt,"` - RecvingPkgRate float64 `json:"recvPkgRate,omitempty"` - RecvingBandwidthRate float64 `json:"recvBandwidthRate,omitempty"` - - SendAppendRequestCnt uint64 `json:"sendAppendRequestCnt"` - SendingPkgRate float64 `json:"sendPkgRate,omitempty"` - SendingBandwidthRate float64 
`json:"sendBandwidthRate,omitempty"` - - sendRateQueue *statsQueue - recvRateQueue *statsQueue - - sync.Mutex -} - -func NewRaftServerStats(name string) *raftServerStats { - stats := &raftServerStats{ - Name: name, - StartTime: time.Now(), - sendRateQueue: &statsQueue{ - back: -1, - }, - recvRateQueue: &statsQueue{ - back: -1, - }, - } - stats.LeaderInfo.StartTime = time.Now() - return stats -} - -func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) { - ss.Lock() - defer ss.Unlock() - - ss.State = raft.Follower - if leaderName != ss.LeaderInfo.Name { - ss.LeaderInfo.Name = leaderName - ss.LeaderInfo.StartTime = time.Now() - } - - ss.recvRateQueue.Insert(NewPackageStats(time.Now(), pkgSize)) - ss.RecvAppendRequestCnt++ -} - -func (ss *raftServerStats) SendAppendReq(pkgSize int) { - ss.Lock() - defer ss.Unlock() - - now := time.Now() - - if ss.State != raft.Leader { - ss.State = raft.Leader - ss.LeaderInfo.Name = ss.Name - ss.LeaderInfo.StartTime = now - } - - ss.sendRateQueue.Insert(NewPackageStats(now, pkgSize)) - - ss.SendAppendRequestCnt++ -} diff --git a/server/registry.go b/server/registry.go deleted file mode 100644 index 1df5565ee..000000000 --- a/server/registry.go +++ /dev/null @@ -1,240 +0,0 @@ -// +build ignore - -package server - -import ( - "fmt" - "net/url" - "path" - "path/filepath" - "sort" - "strings" - "sync" - - "github.com/coreos/etcd/log" - "github.com/coreos/etcd/store" -) - -// The location of the peer URL data. -const RegistryKey = "/_etcd/machines" - -// The Registry stores URL information for nodes. -type Registry struct { - sync.Mutex - store store.Store - peers map[string]*node -} - -// The internal storage format of the registry. -type node struct { - peerVersion string - peerURL string - url string -} - -// Creates a new Registry. -func NewRegistry(s store.Store) *Registry { - return &Registry{ - store: s, - peers: make(map[string]*node), - } -} - -// Register adds a peer to the registry. -func (r *Registry) Register(name string, peerURL string, machURL string) error { - // Write data to store. - v := url.Values{} - v.Set("raft", peerURL) - v.Set("etcd", machURL) - log.Debugf("Register: %s", name) - if _, err := r.store.Create(path.Join(RegistryKey, name), false, v.Encode(), false, store.Permanent); err != nil { - return err - } - - r.Lock() - defer r.Unlock() - r.peers[name] = r.load(RegistryKey, name) - return nil -} - -// Unregister removes a peer from the registry. -func (r *Registry) Unregister(name string) error { - // Remove the key from the store. - log.Debugf("Unregister: %s", name) - _, err := r.store.Delete(path.Join(RegistryKey, name), false, false) - return err -} - -// Count returns the number of peers in the cluster. -func (r *Registry) Count() int { - e, err := r.store.Get(RegistryKey, false, false) - if err != nil { - return 0 - } - return len(e.Node.Nodes) -} - -// Exists checks if a peer with the given name exists. -func (r *Registry) Exists(name string) bool { - e, err := r.store.Get(path.Join(RegistryKey, name), false, false) - if err != nil { - return false - } - return (e.Node != nil) -} - -// Retrieves the client URL for a given node by name. 
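// Illustrative sketch, not part of the deleted sources: the Registry round trip used
// by the join path. Register writes under /_etcd/machines, and the lookup methods
// below read through the in-memory peers cache.
func exampleRegistry(st store.Store) {
	r := NewRegistry(st)
	_ = r.Register("node0", "http://127.0.0.1:7001", "http://127.0.0.1:4001")
	if u, ok := r.PeerURL("node0"); ok {
		_ = u // "http://127.0.0.1:7001", the raft address
	}
	if u, ok := r.ClientURL("node0"); ok {
		_ = u // "http://127.0.0.1:4001", the etcd client address
	}
}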
-func (r *Registry) ClientURL(name string) (string, bool) { - r.Lock() - defer r.Unlock() - return r.clientURL(RegistryKey, name) -} - -func (r *Registry) clientURL(key, name string) (string, bool) { - if r.peers[name] == nil { - if peer := r.load(key, name); peer != nil { - r.peers[name] = peer - } - } - - if peer := r.peers[name]; peer != nil { - return peer.url, true - } - - return "", false -} - -// TODO(yichengq): have all of the code use a full URL with scheme -// and remove this method -// PeerHost retrieves the host part of peer URL for a given node by name. -func (r *Registry) PeerHost(name string) (string, bool) { - rawurl, ok := r.PeerURL(name) - if ok { - u, _ := url.Parse(rawurl) - return u.Host, ok - } - return rawurl, ok -} - -// Retrieves the peer URL for a given node by name. -func (r *Registry) PeerURL(name string) (string, bool) { - r.Lock() - defer r.Unlock() - return r.peerURL(RegistryKey, name) -} - -func (r *Registry) peerURL(key, name string) (string, bool) { - if r.peers[name] == nil { - if peer := r.load(key, name); peer != nil { - r.peers[name] = peer - } - } - - if peer := r.peers[name]; peer != nil { - return peer.peerURL, true - } - - return "", false -} - -// UpdatePeerURL updates peer URL in registry -func (r *Registry) UpdatePeerURL(name string, peerURL string) error { - machURL, _ := r.clientURL(RegistryKey, name) - // Write data to store. - v := url.Values{} - v.Set("raft", peerURL) - v.Set("etcd", machURL) - log.Debugf("Update PeerURL: %s", name) - if _, err := r.store.Update(path.Join(RegistryKey, name), v.Encode(), store.Permanent); err != nil { - return err - } - - r.Lock() - defer r.Unlock() - // Invalidate outdated cache. - r.invalidate(name) - return nil -} - -func (r *Registry) name(key, name string) (string, bool) { - return name, true -} - -// Names returns a list of cached peer names. -func (r *Registry) Names() []string { - names := r.urls(RegistryKey, "", "", r.name) - sort.Sort(sort.StringSlice(names)) - return names -} - -// Retrieves the Client URLs for all nodes. -func (r *Registry) ClientURLs(leaderName, selfName string) []string { - return r.urls(RegistryKey, leaderName, selfName, r.clientURL) -} - -// Retrieves the Peer URLs for all nodes. -func (r *Registry) PeerURLs(leaderName, selfName string) []string { - return r.urls(RegistryKey, leaderName, selfName, r.peerURL) -} - -// Retrieves the URLs for all nodes using url function. -func (r *Registry) urls(key, leaderName, selfName string, url func(key, name string) (string, bool)) []string { - r.Lock() - defer r.Unlock() - - // Build list including the leader and self. - urls := make([]string, 0) - if url, _ := url(key, leaderName); len(url) > 0 { - urls = append(urls, url) - } - - // Retrieve a list of all nodes. - if e, _ := r.store.Get(key, false, false); e != nil { - // Lookup the URL for each one. - for _, pair := range e.Node.Nodes { - _, name := filepath.Split(pair.Key) - if url, _ := url(key, name); len(url) > 0 && name != leaderName { - urls = append(urls, url) - } - } - } - - log.Debugf("URLs: %s: %s / %s (%s)", key, leaderName, selfName, strings.Join(urls, ",")) - return urls -} - -// Removes a node from the cache. -func (r *Registry) Invalidate(name string) { - r.Lock() - defer r.Unlock() - r.invalidate(name) -} - -func (r *Registry) invalidate(name string) { - delete(r.peers, name) -} - -// Loads the given node by name from the store into the cache. -func (r *Registry) load(key, name string) *node { - if name == "" { - return nil - } - - // Retrieve from store. 
- e, err := r.store.Get(path.Join(key, name), false, false) - if err != nil { - return nil - } - - // Parse as a query string. - m, err := url.ParseQuery(*e.Node.Value) - if err != nil { - panic(fmt.Sprintf("Failed to parse peers entry: %s", name)) - } - - // Create node. - return &node{ - url: m["etcd"][0], - peerURL: m["raft"][0], - } -} diff --git a/server/release_version.go b/server/release_version.go deleted file mode 100644 index 87e2fd16c..000000000 --- a/server/release_version.go +++ /dev/null @@ -1,5 +0,0 @@ -// +build ignore - -package server - -const ReleaseVersion = "0.4.6+git" diff --git a/server/remove_command.go b/server/remove_command.go deleted file mode 100644 index 481ac06f2..000000000 --- a/server/remove_command.go +++ /dev/null @@ -1,77 +0,0 @@ -// +build ignore - -package server - -import ( - "encoding/binary" - - "github.com/coreos/etcd/log" - "github.com/coreos/etcd/third_party/github.com/goraft/raft" -) - -func init() { - raft.RegisterCommand(&RemoveCommand{}) -} - -// The RemoveCommand removes a server from the cluster. -type RemoveCommand struct { - Name string `json:"name"` -} - -// The name of the remove command in the log -func (c *RemoveCommand) CommandName() string { - return "etcd:remove" -} - -// Remove a server from the cluster -func (c *RemoveCommand) Apply(context raft.Context) (interface{}, error) { - index, err := applyRemove(c, context) - if err != nil { - return nil, err - } - - b := make([]byte, 8) - binary.PutUvarint(b, index) - return b, nil -} - -// applyRemove removes the given machine from the cluster. -func applyRemove(c *RemoveCommand, context raft.Context) (uint64, error) { - ps, _ := context.Server().Context().(*PeerServer) - commitIndex := context.CommitIndex() - - // Remove node from the shared registry. - err := ps.registry.Unregister(c.Name) - - // Delete from stats - delete(ps.followersStats.Followers, c.Name) - - if err != nil { - log.Debugf("Error while unregistering: %s (%v)", c.Name, err) - return 0, err - } - - // Remove peer in raft - if err := context.Server().RemovePeer(c.Name); err != nil { - log.Debugf("Unable to remove peer: %s (%v)", c.Name, err) - return 0, err - } - - if c.Name == context.Server().Name() { - // the removed node is this node - - // if the node is not replaying the previous logs - // and the node has sent out a join request in this - // start. It is sure that this node received a new remove - // command and need to be removed - if context.CommitIndex() > ps.joinIndex && ps.joinIndex != 0 { - log.Debugf("server [%s] is removed", context.Server().Name()) - ps.asyncRemove() - } else { - // else ignore remove - log.Debugf("ignore previous remove command.") - ps.removedInLog = true - } - } - return commitIndex, nil -} diff --git a/server/server.go b/server/server.go deleted file mode 100644 index b1493c4d3..000000000 --- a/server/server.go +++ /dev/null @@ -1,344 +0,0 @@ -// +build ignore - -package server - -import ( - "encoding/json" - "fmt" - "net/http" - "net/http/pprof" - "strings" - "time" - - "github.com/coreos/etcd/third_party/github.com/goraft/raft" - "github.com/coreos/etcd/third_party/github.com/gorilla/mux" - - etcdErr "github.com/coreos/etcd/error" - ehttp "github.com/coreos/etcd/http" - "github.com/coreos/etcd/log" - "github.com/coreos/etcd/metrics" - "github.com/coreos/etcd/mod" - uhttp "github.com/coreos/etcd/pkg/http" - "github.com/coreos/etcd/server/v2" - "github.com/coreos/etcd/store" - _ "github.com/coreos/etcd/store/v2" -) - -// This is the default implementation of the Server interface. 
-type Server struct { - Name string - url string - handler http.Handler - peerServer *PeerServer - registry *Registry - store store.Store - metrics *metrics.Bucket - - trace bool -} - -// Creates a new Server. -func New(name, url string, peerServer *PeerServer, registry *Registry, store store.Store, mb *metrics.Bucket) *Server { - s := &Server{ - Name: name, - url: url, - store: store, - registry: registry, - peerServer: peerServer, - metrics: mb, - } - - return s -} - -func (s *Server) EnableTracing() { - s.trace = true -} - -// The current state of the server in the cluster. -func (s *Server) State() string { - return s.peerServer.RaftServer().State() -} - -// The node name of the leader in the cluster. -func (s *Server) Leader() string { - return s.peerServer.RaftServer().Leader() -} - -// The current Raft committed index. -func (s *Server) CommitIndex() uint64 { - return s.peerServer.RaftServer().CommitIndex() -} - -// The current Raft term. -func (s *Server) Term() uint64 { - return s.peerServer.RaftServer().Term() -} - -// The server URL. -func (s *Server) URL() string { - return s.url -} - -// PeerHost retrieves the host part of Peer URL for a given node name. -func (s *Server) PeerHost(name string) (string, bool) { - return s.registry.PeerHost(name) -} - -// Retrives the Peer URL for a given node name. -func (s *Server) PeerURL(name string) (string, bool) { - return s.registry.PeerURL(name) -} - -// ClientURL retrieves the Client URL for a given node name. -func (s *Server) ClientURL(name string) (string, bool) { - return s.registry.ClientURL(name) -} - -// Returns a reference to the Store. -func (s *Server) Store() store.Store { - return s.store -} - -func (s *Server) SetRegistry(registry *Registry) { - s.registry = registry -} - -func (s *Server) SetStore(store store.Store) { - s.store = store -} - -func (s *Server) installV2(r *mux.Router) { - r2 := mux.NewRouter() - r.PathPrefix("/v2").Handler(ehttp.NewLowerQueryParamsHandler(r2)) - - s.handleFuncV2(r2, "/v2/keys/{key:.*}", v2.GetHandler).Methods("GET", "HEAD") - s.handleFuncV2(r2, "/v2/keys/{key:.*}", v2.PostHandler).Methods("POST") - s.handleFuncV2(r2, "/v2/keys/{key:.*}", v2.PutHandler).Methods("PUT") - s.handleFuncV2(r2, "/v2/keys/{key:.*}", v2.DeleteHandler).Methods("DELETE") - s.handleFunc(r2, "/v2/leader", s.GetLeaderHandler).Methods("GET", "HEAD") - s.handleFunc(r2, "/v2/machines", s.GetPeersHandler).Methods("GET", "HEAD") - s.handleFunc(r2, "/v2/peers", s.GetPeersHandler).Methods("GET", "HEAD") - s.handleFunc(r2, "/v2/stats/self", s.GetStatsHandler).Methods("GET", "HEAD") - s.handleFunc(r2, "/v2/stats/leader", s.GetLeaderStatsHandler).Methods("GET", "HEAD") - s.handleFunc(r2, "/v2/stats/store", s.GetStoreStatsHandler).Methods("GET", "HEAD") - s.handleFunc(r2, "/v2/speedTest", s.SpeedTestHandler).Methods("GET", "HEAD") -} - -func (s *Server) installMod(r *mux.Router) { - r.PathPrefix("/mod").Handler(http.StripPrefix("/mod", mod.HttpHandler(s.URL()))) -} - -func (s *Server) installDebug(r *mux.Router) { - s.handleFunc(r, "/debug/metrics", s.GetMetricsHandler).Methods("GET", "HEAD") - r.HandleFunc("/debug/pprof", pprof.Index) - r.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) - r.HandleFunc("/debug/pprof/profile", pprof.Profile) - r.HandleFunc("/debug/pprof/symbol", pprof.Symbol) - r.HandleFunc("/debug/pprof/{name}", pprof.Index) -} - -// Adds a v2 server handler to the router. 
-func (s *Server) handleFuncV2(r *mux.Router, path string, f func(http.ResponseWriter, *http.Request, v2.Server) error) *mux.Route { - return s.handleFunc(r, path, func(w http.ResponseWriter, req *http.Request) error { - return f(w, req, s) - }) -} - -type HEADResponseWriter struct { - http.ResponseWriter -} - -func (w *HEADResponseWriter) Write([]byte) (int, error) { - return 0, nil -} - -// Adds a server handler to the router. -func (s *Server) handleFunc(r *mux.Router, path string, f func(http.ResponseWriter, *http.Request) error) *mux.Route { - - // Wrap the standard HandleFunc interface to pass in the server reference. - return r.HandleFunc(path, func(w http.ResponseWriter, req *http.Request) { - if req.Method == "HEAD" { - w = &HEADResponseWriter{w} - } - - // Log request. - log.Debugf("[recv] %s %s %s [%s]", req.Method, s.URL(), req.URL.Path, req.RemoteAddr) - - // Execute handler function and return error if necessary. - if err := f(w, req); err != nil { - if etcdErr, ok := err.(*etcdErr.Error); ok { - log.Debug("Return error: ", (*etcdErr).Error()) - w.Header().Set("Content-Type", "application/json") - etcdErr.Write(w) - } else { - http.Error(w, err.Error(), http.StatusInternalServerError) - } - } - }) -} - -func (s *Server) HTTPHandler() http.Handler { - router := mux.NewRouter() - - // Install the routes. - s.handleFunc(router, "/version", s.GetVersionHandler).Methods("GET") - s.installV2(router) - // Mod is deprecated temporariy due to its unstable state. - // It would be added back later. - // s.installMod(router) - - if s.trace { - s.installDebug(router) - } - - return router -} - -// Dispatch command to the current leader -func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error { - ps := s.peerServer - if ps.raftServer.State() == raft.Leader { - result, err := ps.raftServer.Do(c) - if err != nil { - return err - } - - if result == nil { - return etcdErr.NewError(300, "Empty result from raft", s.Store().Index()) - } - - // response for raft related commands[join/remove] - if b, ok := result.([]byte); ok { - w.WriteHeader(http.StatusOK) - w.Write(b) - return nil - } - - e, _ := result.(*store.Event) - b, _ := json.Marshal(e) - - w.Header().Set("Content-Type", "application/json") - // etcd index should be the same as the event index - // which is also the last modified index of the node - w.Header().Add("X-Etcd-Index", fmt.Sprint(e.Index())) - w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex())) - w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term())) - - if e.IsCreated() { - w.WriteHeader(http.StatusCreated) - } else { - w.WriteHeader(http.StatusOK) - } - - w.Write(b) - - return nil - - } - - leader := ps.raftServer.Leader() - if leader == "" { - return etcdErr.NewError(300, "", s.Store().Index()) - } - - var url string - switch c.(type) { - case *JoinCommand, *RemoveCommand, - *SetClusterConfigCommand: - url, _ = ps.registry.PeerURL(leader) - default: - url, _ = ps.registry.ClientURL(leader) - } - - uhttp.Redirect(url, w, req) - - return nil -} - -// Handler to return the current version of etcd. 
-func (s *Server) GetVersionHandler(w http.ResponseWriter, req *http.Request) error { - w.WriteHeader(http.StatusOK) - fmt.Fprintf(w, "etcd %s", ReleaseVersion) - return nil -} - -// Handler to return the current leader's raft address -func (s *Server) GetLeaderHandler(w http.ResponseWriter, req *http.Request) error { - leader := s.peerServer.RaftServer().Leader() - if leader == "" { - return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", s.Store().Index()) - } - w.WriteHeader(http.StatusOK) - url, _ := s.registry.PeerURL(leader) - w.Write([]byte(url)) - return nil -} - -// Handler to return all the known peers in the current cluster. -func (s *Server) GetPeersHandler(w http.ResponseWriter, req *http.Request) error { - peers := s.registry.ClientURLs(s.peerServer.RaftServer().Leader(), s.Name) - w.WriteHeader(http.StatusOK) - w.Write([]byte(strings.Join(peers, ", "))) - return nil -} - -// Retrieves stats on the Raft server. -func (s *Server) GetStatsHandler(w http.ResponseWriter, req *http.Request) error { - w.Header().Set("Content-Type", "application/json") - w.Write(s.peerServer.Stats()) - return nil -} - -// Retrieves stats on the leader. -func (s *Server) GetLeaderStatsHandler(w http.ResponseWriter, req *http.Request) error { - if s.peerServer.RaftServer().State() == raft.Leader { - w.Header().Set("Content-Type", "application/json") - w.Write(s.peerServer.PeerStats()) - return nil - } - - leader := s.peerServer.RaftServer().Leader() - if leader == "" { - return etcdErr.NewError(300, "", s.Store().Index()) - } - hostname, _ := s.registry.ClientURL(leader) - uhttp.Redirect(hostname, w, req) - return nil -} - -// Retrieves stats on the leader. -func (s *Server) GetStoreStatsHandler(w http.ResponseWriter, req *http.Request) error { - w.Header().Set("Content-Type", "application/json") - w.Write(s.store.JsonStats()) - return nil -} - -// Executes a speed test to evaluate the performance of update replication. -func (s *Server) SpeedTestHandler(w http.ResponseWriter, req *http.Request) error { - count := 1000 - c := make(chan bool, count) - for i := 0; i < count; i++ { - go func() { - for j := 0; j < 10; j++ { - c := s.Store().CommandFactory().CreateSetCommand("foo", false, "bar", time.Unix(0, 0)) - s.peerServer.RaftServer().Do(c) - } - c <- true - }() - } - - for i := 0; i < count; i++ { - <-c - } - - w.WriteHeader(http.StatusOK) - w.Write([]byte("speed test success")) - return nil -} - -// Retrieves metrics from bucket -func (s *Server) GetMetricsHandler(w http.ResponseWriter, req *http.Request) error { - (*s.metrics).Dump(w) - return nil -} diff --git a/server/set_cluster_config_command.go b/server/set_cluster_config_command.go deleted file mode 100644 index eb21bae6f..000000000 --- a/server/set_cluster_config_command.go +++ /dev/null @@ -1,28 +0,0 @@ -// +build ignore - -package server - -import ( - "github.com/coreos/etcd/third_party/github.com/goraft/raft" -) - -func init() { - raft.RegisterCommand(&SetClusterConfigCommand{}) -} - -// SetClusterConfigCommand sets the cluster-level configuration. -type SetClusterConfigCommand struct { - Config *ClusterConfig `json:"config"` -} - -// CommandName returns the name of the command. -func (c *SetClusterConfigCommand) CommandName() string { - return "etcd:setClusterConfig" -} - -// Apply updates the cluster configuration. 
-func (c *SetClusterConfigCommand) Apply(context raft.Context) (interface{}, error) { - ps, _ := context.Server().Context().(*PeerServer) - ps.SetClusterConfig(c.Config) - return nil, nil -} diff --git a/server/standby_server.go b/server/standby_server.go deleted file mode 100644 index 05a489ab7..000000000 --- a/server/standby_server.go +++ /dev/null @@ -1,341 +0,0 @@ -// +build ignore - -package server - -import ( - "encoding/json" - "fmt" - "io/ioutil" - "net/http" - "net/url" - "os" - "path/filepath" - "sync" - "time" - - "github.com/coreos/etcd/third_party/github.com/goraft/raft" - - etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/etcd/log" - uhttp "github.com/coreos/etcd/pkg/http" - "github.com/coreos/etcd/store" -) - -const standbyInfoName = "standby_info" - -type StandbyServerConfig struct { - Name string - PeerScheme string - PeerURL string - ClientURL string - DataDir string -} - -type standbyInfo struct { - // stay running in standby mode - Running bool - Cluster []*machineMessage - SyncInterval float64 -} - -type StandbyServer struct { - Config StandbyServerConfig - client *Client - raftServer raft.Server - - standbyInfo - joinIndex uint64 - - removeNotify chan bool - started bool - closeChan chan bool - routineGroup sync.WaitGroup - - sync.Mutex -} - -func NewStandbyServer(cfg StandbyServerConfig, client *Client) *StandbyServer { - s := &StandbyServer{ - Config: cfg, - client: client, - standbyInfo: standbyInfo{SyncInterval: DefaultSyncInterval}, - } - if err := s.loadInfo(); err != nil { - log.Warnf("error load standby info file: %v", err) - } - return s -} - -func (s *StandbyServer) SetRaftServer(raftServer raft.Server) { - s.raftServer = raftServer -} - -func (s *StandbyServer) Start() { - s.Lock() - defer s.Unlock() - if s.started { - return - } - s.started = true - - s.removeNotify = make(chan bool) - s.closeChan = make(chan bool) - - s.Running = true - if err := s.saveInfo(); err != nil { - log.Warnf("error saving cluster info for standby") - } - - s.routineGroup.Add(1) - go func() { - defer s.routineGroup.Done() - s.monitorCluster() - }() -} - -// Stop stops the server gracefully. -func (s *StandbyServer) Stop() { - s.Lock() - defer s.Unlock() - if !s.started { - return - } - s.started = false - - close(s.closeChan) - s.routineGroup.Wait() -} - -// RemoveNotify notifies the server is removed from standby mode and ready -// for peer mode. It should have joined the cluster successfully. 
-func (s *StandbyServer) RemoveNotify() <-chan bool { - return s.removeNotify -} - -func (s *StandbyServer) ClientHTTPHandler() http.Handler { - return http.HandlerFunc(s.redirectRequests) -} - -func (s *StandbyServer) IsRunning() bool { - return s.Running -} - -func (s *StandbyServer) ClusterURLs() []string { - peerURLs := make([]string, 0) - for _, peer := range s.Cluster { - peerURLs = append(peerURLs, peer.PeerURL) - } - return peerURLs -} - -func (s *StandbyServer) ClusterSize() int { - return len(s.Cluster) -} - -func (s *StandbyServer) setCluster(cluster []*machineMessage) { - s.Cluster = cluster -} - -func (s *StandbyServer) SyncCluster(peers []string) error { - for i, url := range peers { - peers[i] = s.fullPeerURL(url) - } - - if err := s.syncCluster(peers); err != nil { - log.Infof("fail syncing cluster(%v): %v", s.ClusterURLs(), err) - return err - } - - log.Infof("set cluster(%v) for standby server", s.ClusterURLs()) - return nil -} - -func (s *StandbyServer) SetSyncInterval(second float64) { - s.SyncInterval = second -} - -func (s *StandbyServer) ClusterLeader() *machineMessage { - for _, machine := range s.Cluster { - if machine.State == raft.Leader { - return machine - } - } - return nil -} - -func (s *StandbyServer) JoinIndex() uint64 { - return s.joinIndex -} - -func (s *StandbyServer) redirectRequests(w http.ResponseWriter, r *http.Request) { - leader := s.ClusterLeader() - if leader == nil { - w.Header().Set("Content-Type", "application/json") - etcdErr.NewError(etcdErr.EcodeStandbyInternal, "", 0).Write(w) - return - } - uhttp.Redirect(leader.ClientURL, w, r) -} - -// monitorCluster assumes that the machine has tried to join the cluster and -// failed, so it waits for the interval at the beginning. -func (s *StandbyServer) monitorCluster() { - ticker := time.NewTicker(time.Duration(int64(s.SyncInterval * float64(time.Second)))) - defer ticker.Stop() - for { - select { - case <-s.closeChan: - return - case <-ticker.C: - } - - if err := s.syncCluster(nil); err != nil { - log.Warnf("fail syncing cluster(%v): %v", s.ClusterURLs(), err) - continue - } - - leader := s.ClusterLeader() - if leader == nil { - log.Warnf("fail getting leader from cluster(%v)", s.ClusterURLs()) - continue - } - - if err := s.join(leader.PeerURL); err != nil { - log.Debugf("fail joining through leader %v: %v", leader, err) - continue - } - - log.Infof("join through leader %v", leader.PeerURL) - s.Running = false - if err := s.saveInfo(); err != nil { - log.Warnf("error saving cluster info for standby") - } - go func() { - s.Stop() - close(s.removeNotify) - }() - return - } -} - -func (s *StandbyServer) syncCluster(peerURLs []string) error { - peerURLs = append(s.ClusterURLs(), peerURLs...) 
- - for _, peerURL := range peerURLs { - // Fetch current peer list - machines, err := s.client.GetMachines(peerURL) - if err != nil { - log.Debugf("fail getting machine messages from %v", peerURL) - continue - } - - cfg, err := s.client.GetClusterConfig(peerURL) - if err != nil { - log.Debugf("fail getting cluster config from %v", peerURL) - continue - } - - s.setCluster(machines) - s.SetSyncInterval(cfg.SyncInterval) - if err := s.saveInfo(); err != nil { - log.Warnf("fail saving cluster info into disk: %v", err) - } - return nil - } - return fmt.Errorf("unreachable cluster") -} - -func (s *StandbyServer) join(peer string) error { - for _, url := range s.ClusterURLs() { - if s.Config.PeerURL == url { - s.joinIndex = s.raftServer.CommitIndex() - return nil - } - } - - // Our version must match the leaders version - version, err := s.client.GetVersion(peer) - if err != nil { - log.Debugf("error getting peer version") - return err - } - if version < store.MinVersion() || version > store.MaxVersion() { - log.Debugf("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version) - return fmt.Errorf("incompatible version") - } - - // Fetch cluster config to see whether exists some place. - clusterConfig, err := s.client.GetClusterConfig(peer) - if err != nil { - log.Debugf("error getting cluster config") - return err - } - if clusterConfig.ActiveSize <= len(s.Cluster) { - log.Debugf("stop joining because the cluster is full with %d nodes", len(s.Cluster)) - return fmt.Errorf("out of quota") - } - - commitIndex, err := s.client.AddMachine(peer, - &JoinCommand{ - MinVersion: store.MinVersion(), - MaxVersion: store.MaxVersion(), - Name: s.Config.Name, - RaftURL: s.Config.PeerURL, - EtcdURL: s.Config.ClientURL, - }) - if err != nil { - log.Debugf("error on join request") - return err - } - s.joinIndex = commitIndex - - return nil -} - -func (s *StandbyServer) fullPeerURL(urlStr string) string { - u, err := url.Parse(urlStr) - if err != nil { - log.Warnf("fail parsing url %v", u) - return urlStr - } - u.Scheme = s.Config.PeerScheme - return u.String() -} - -func (s *StandbyServer) loadInfo() error { - var info standbyInfo - - path := filepath.Join(s.Config.DataDir, standbyInfoName) - file, err := os.OpenFile(path, os.O_RDONLY, 0600) - if err != nil { - if os.IsNotExist(err) { - return nil - } - return err - } - defer file.Close() - if err = json.NewDecoder(file).Decode(&info); err != nil { - return err - } - s.standbyInfo = info - return nil -} - -func (s *StandbyServer) saveInfo() error { - tmpFile, err := ioutil.TempFile(s.Config.DataDir, standbyInfoName) - if err != nil { - return err - } - if err = json.NewEncoder(tmpFile).Encode(s.standbyInfo); err != nil { - tmpFile.Close() - os.Remove(tmpFile.Name()) - return err - } - tmpFile.Close() - - path := filepath.Join(s.Config.DataDir, standbyInfoName) - if err = os.Rename(tmpFile.Name(), path); err != nil { - return err - } - return nil -} diff --git a/server/stats_queue.go b/server/stats_queue.go deleted file mode 100644 index 349a5592f..000000000 --- a/server/stats_queue.go +++ /dev/null @@ -1,91 +0,0 @@ -// +build ignore - -package server - -import ( - "sync" - "time" -) - -const ( - queueCapacity = 200 -) - -type statsQueue struct { - items [queueCapacity]*packageStats - size int - front int - back int - totalPkgSize int - rwl sync.RWMutex -} - -func (q *statsQueue) Len() int { - return q.size -} - -func (q *statsQueue) PkgSize() int { - return q.totalPkgSize -} - -// FrontAndBack gets the front and back 
elements in the queue -// We must grab front and back together with the protection of the lock -func (q *statsQueue) frontAndBack() (*packageStats, *packageStats) { - q.rwl.RLock() - defer q.rwl.RUnlock() - if q.size != 0 { - return q.items[q.front], q.items[q.back] - } - return nil, nil -} - -// Insert function insert a packageStats into the queue and update the records -func (q *statsQueue) Insert(p *packageStats) { - q.rwl.Lock() - defer q.rwl.Unlock() - - q.back = (q.back + 1) % queueCapacity - - if q.size == queueCapacity { //dequeue - q.totalPkgSize -= q.items[q.front].size - q.front = (q.back + 1) % queueCapacity - } else { - q.size++ - } - - q.items[q.back] = p - q.totalPkgSize += q.items[q.back].size - -} - -// Rate function returns the package rate and byte rate -func (q *statsQueue) Rate() (float64, float64) { - front, back := q.frontAndBack() - - if front == nil || back == nil { - return 0, 0 - } - - if time.Now().Sub(back.Time()) > time.Second { - q.Clear() - return 0, 0 - } - - sampleDuration := back.Time().Sub(front.Time()) - - pr := float64(q.Len()) / float64(sampleDuration) * float64(time.Second) - - br := float64(q.PkgSize()) / float64(sampleDuration) * float64(time.Second) - - return pr, br -} - -// Clear function clear up the statsQueue -func (q *statsQueue) Clear() { - q.rwl.Lock() - defer q.rwl.Unlock() - q.back = -1 - q.front = 0 - q.size = 0 - q.totalPkgSize = 0 -} diff --git a/server/tls_info.go b/server/tls_info.go deleted file mode 100644 index defdfd00c..000000000 --- a/server/tls_info.go +++ /dev/null @@ -1,108 +0,0 @@ -// +build ignore - -package server - -import ( - "crypto/tls" - "crypto/x509" - "encoding/pem" - "fmt" - "io/ioutil" -) - -// TLSInfo holds the SSL certificates paths. -type TLSInfo struct { - CertFile string `json:"CertFile"` - KeyFile string `json:"KeyFile"` - CAFile string `json:"CAFile"` -} - -func (info TLSInfo) Scheme() string { - if info.KeyFile != "" && info.CertFile != "" { - return "https" - } else { - return "http" - } -} - -// Generates a tls.Config object for a server from the given files. -func (info TLSInfo) ServerConfig() (*tls.Config, error) { - // Both the key and cert must be present. - if info.KeyFile == "" || info.CertFile == "" { - return nil, fmt.Errorf("KeyFile and CertFile must both be present[key: %v, cert: %v]", info.KeyFile, info.CertFile) - } - - var cfg tls.Config - - tlsCert, err := tls.LoadX509KeyPair(info.CertFile, info.KeyFile) - if err != nil { - return nil, err - } - - cfg.Certificates = []tls.Certificate{tlsCert} - - if info.CAFile != "" { - cfg.ClientAuth = tls.RequireAndVerifyClientCert - cp, err := newCertPool(info.CAFile) - if err != nil { - return nil, err - } - - cfg.RootCAs = cp - cfg.ClientCAs = cp - } else { - cfg.ClientAuth = tls.NoClientCert - } - - return &cfg, nil -} - -// Generates a tls.Config object for a client from the given files. 
-func (info TLSInfo) ClientConfig() (*tls.Config, error) { - var cfg tls.Config - - if info.KeyFile == "" || info.CertFile == "" { - return &cfg, nil - } - - tlsCert, err := tls.LoadX509KeyPair(info.CertFile, info.KeyFile) - if err != nil { - return nil, err - } - - cfg.Certificates = []tls.Certificate{tlsCert} - - if info.CAFile != "" { - cp, err := newCertPool(info.CAFile) - if err != nil { - return nil, err - } - - cfg.RootCAs = cp - } - - return &cfg, nil -} - -// newCertPool creates x509 certPool with provided CA file -func newCertPool(CAFile string) (*x509.CertPool, error) { - certPool := x509.NewCertPool() - pemByte, err := ioutil.ReadFile(CAFile) - if err != nil { - return nil, err - } - - for { - var block *pem.Block - block, pemByte = pem.Decode(pemByte) - if block == nil { - return certPool, nil - } - cert, err := x509.ParseCertificate(block.Bytes) - if err != nil { - return nil, err - } - certPool.AddCert(cert) - } - -} diff --git a/server/transporter.go b/server/transporter.go deleted file mode 100644 index 333c93054..000000000 --- a/server/transporter.go +++ /dev/null @@ -1,259 +0,0 @@ -// +build ignore - -package server - -import ( - "bytes" - "crypto/tls" - "fmt" - "io" - "net" - "net/http" - "time" - - "github.com/coreos/etcd/log" - "github.com/coreos/etcd/third_party/github.com/goraft/raft" - httpclient "github.com/coreos/etcd/third_party/github.com/mreiferson/go-httpclient" -) - -const ( - snapshotTimeout = time.Second * 120 -) - -// Transporter layer for communication between raft nodes -type transporter struct { - followersStats *raftFollowersStats - serverStats *raftServerStats - registry *Registry - - client *http.Client - transport *httpclient.Transport - snapshotClient *http.Client - snapshotTransport *httpclient.Transport -} - -type dialer func(network, addr string) (net.Conn, error) - -// Create transporter using by raft server -// Create http or https transporter based on -// whether the user give the server cert and key -func NewTransporter(followersStats *raftFollowersStats, serverStats *raftServerStats, registry *Registry, dialTimeout, requestTimeout, responseHeaderTimeout time.Duration) *transporter { - tr := &httpclient.Transport{ - ResponseHeaderTimeout: responseHeaderTimeout, - // This is a workaround for Transport.CancelRequest doesn't work on - // HTTPS connections blocked. The patch for it is in progress, - // and would be available in Go1.3 - // More: https://codereview.appspot.com/69280043/ - ConnectTimeout: dialTimeout, - RequestTimeout: requestTimeout, - } - - // Sending snapshot might take a long time so we use a different HTTP transporter - // Timeout is set to 120s (Around 100MB if the bandwidth is 10Mbits/s) - // This timeout is not calculated by heartbeat time. - // TODO(xiangl1) we can actually calculate the max bandwidth if we know - // average RTT. - // It should be equal to (TCP max window size/RTT). 
- sTr := &httpclient.Transport{ - ConnectTimeout: dialTimeout, - RequestTimeout: snapshotTimeout, - } - - t := transporter{ - client: &http.Client{Transport: tr}, - transport: tr, - snapshotClient: &http.Client{Transport: sTr}, - snapshotTransport: sTr, - followersStats: followersStats, - serverStats: serverStats, - registry: registry, - } - - return &t -} - -func (t *transporter) SetTLSConfig(tlsConf tls.Config) { - t.transport.TLSClientConfig = &tlsConf - t.transport.DisableCompression = true - - t.snapshotTransport.TLSClientConfig = &tlsConf - t.snapshotTransport.DisableCompression = true -} - -// Sends AppendEntries RPCs to a peer when the server is the leader. -func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse { - var b bytes.Buffer - - if _, err := req.Encode(&b); err != nil { - log.Warn("transporter.ae.encoding.error:", err) - return nil - } - - size := b.Len() - - t.serverStats.SendAppendReq(size) - - u, _ := t.registry.PeerURL(peer.Name) - - log.Debugf("Send LogEntries to %s ", u) - - thisFollowerStats, ok := t.followersStats.Followers[peer.Name] - - if !ok { //this is the first time this follower has been seen - thisFollowerStats = &raftFollowerStats{} - thisFollowerStats.Latency.Minimum = 1 << 63 - t.followersStats.Followers[peer.Name] = thisFollowerStats - } - - start := time.Now() - - resp, _, err := t.Post(fmt.Sprintf("%s/log/append", u), &b) - - end := time.Now() - - if err != nil { - log.Debugf("Cannot send AppendEntriesRequest to %s: %s", u, err) - if ok { - thisFollowerStats.Fail() - } - return nil - } else { - if ok { - thisFollowerStats.Succ(end.Sub(start)) - } - } - - if resp != nil { - defer resp.Body.Close() - - aeresp := &raft.AppendEntriesResponse{} - if _, err = aeresp.Decode(resp.Body); err != nil && err != io.EOF { - log.Warn("transporter.ae.decoding.error:", err) - return nil - } - return aeresp - } - - return nil -} - -// Sends RequestVote RPCs to a peer when the server is the candidate. -func (t *transporter) SendVoteRequest(server raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse { - var b bytes.Buffer - - if _, err := req.Encode(&b); err != nil { - log.Warn("transporter.vr.encoding.error:", err) - return nil - } - - u, _ := t.registry.PeerURL(peer.Name) - log.Debugf("Send Vote from %s to %s", server.Name(), u) - - resp, _, err := t.Post(fmt.Sprintf("%s/vote", u), &b) - - if err != nil { - log.Debugf("Cannot send VoteRequest to %s : %s", u, err) - } - - if resp != nil { - defer resp.Body.Close() - - rvrsp := &raft.RequestVoteResponse{} - if _, err = rvrsp.Decode(resp.Body); err != nil && err != io.EOF { - log.Warn("transporter.vr.decoding.error:", err) - return nil - } - return rvrsp - } - return nil -} - -// Sends SnapshotRequest RPCs to a peer when the server is the candidate. 
-func (t *transporter) SendSnapshotRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse { - var b bytes.Buffer - - if _, err := req.Encode(&b); err != nil { - log.Warn("transporter.ss.encoding.error:", err) - return nil - } - - u, _ := t.registry.PeerURL(peer.Name) - log.Debugf("Send Snapshot Request from %s to %s", server.Name(), u) - - resp, _, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b) - - if err != nil { - log.Debugf("Cannot send Snapshot Request to %s : %s", u, err) - } - - if resp != nil { - defer resp.Body.Close() - - ssrsp := &raft.SnapshotResponse{} - if _, err = ssrsp.Decode(resp.Body); err != nil && err != io.EOF { - log.Warn("transporter.ss.decoding.error:", err) - return nil - } - return ssrsp - } - return nil -} - -// Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate. -func (t *transporter) SendSnapshotRecoveryRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse { - var b bytes.Buffer - - if _, err := req.Encode(&b); err != nil { - log.Warn("transporter.ss.encoding.error:", err) - return nil - } - - u, _ := t.registry.PeerURL(peer.Name) - log.Debugf("Send Snapshot Recovery from %s to %s", server.Name(), u) - - resp, err := t.PostSnapshot(fmt.Sprintf("%s/snapshotRecovery", u), &b) - - if err != nil { - log.Debugf("Cannot send Snapshot Recovery to %s : %s", u, err) - } - - if resp != nil { - defer resp.Body.Close() - - ssrrsp := &raft.SnapshotRecoveryResponse{} - if _, err = ssrrsp.Decode(resp.Body); err != nil && err != io.EOF { - log.Warn("transporter.ssr.decoding.error:", err) - return nil - } - return ssrrsp - } - return nil - -} - -// Send server side POST request -func (t *transporter) Post(urlStr string, body io.Reader) (*http.Response, *http.Request, error) { - req, _ := http.NewRequest("POST", urlStr, body) - resp, err := t.client.Do(req) - return resp, req, err -} - -// Send server side GET request -func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error) { - req, _ := http.NewRequest("GET", urlStr, nil) - resp, err := t.client.Do(req) - return resp, req, err -} - -// Send server side PUT request -func (t *transporter) Put(urlStr string, body io.Reader) (*http.Response, *http.Request, error) { - req, _ := http.NewRequest("PUT", urlStr, body) - resp, err := t.client.Do(req) - return resp, req, err -} - -// PostSnapshot posts a json format snapshot to the given url -// The underlying HTTP transport has a minute level timeout -func (t *transporter) PostSnapshot(url string, body io.Reader) (*http.Response, error) { - return t.snapshotClient.Post(url, "application/json", body) -} diff --git a/server/v2/delete_handler.go b/server/v2/delete_handler.go deleted file mode 100644 index 6fe77f6f4..000000000 --- a/server/v2/delete_handler.go +++ /dev/null @@ -1,51 +0,0 @@ -// +build ignore - -package v2 - -import ( - "net/http" - "strconv" - - etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/etcd/third_party/github.com/gorilla/mux" -) - -func DeleteHandler(w http.ResponseWriter, req *http.Request, s Server) error { - vars := mux.Vars(req) - key := "/" + vars["key"] - - recursive := (req.FormValue("recursive") == "true") - dir := (req.FormValue("dir") == "true") - - req.ParseForm() - _, valueOk := req.Form["prevValue"] - _, indexOk := req.Form["prevIndex"] - - if !valueOk && !indexOk { - c := s.Store().CommandFactory().CreateDeleteCommand(key, dir, recursive) - return s.Dispatch(c, w, req) - } - - var err error - 
prevIndex := uint64(0) - prevValue := req.Form.Get("prevValue") - - if indexOk { - prevIndexStr := req.Form.Get("prevIndex") - prevIndex, err = strconv.ParseUint(prevIndexStr, 10, 64) - - // bad previous index - if err != nil { - return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndDelete", s.Store().Index()) - } - } - - if valueOk { - if prevValue == "" { - return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndDelete", s.Store().Index()) - } - } - - c := s.Store().CommandFactory().CreateCompareAndDeleteCommand(key, prevValue, prevIndex) - return s.Dispatch(c, w, req) -} diff --git a/server/v2/get_handler.go b/server/v2/get_handler.go deleted file mode 100644 index 671e297cd..000000000 --- a/server/v2/get_handler.go +++ /dev/null @@ -1,145 +0,0 @@ -// +build ignore - -package v2 - -import ( - "encoding/json" - "fmt" - "net/http" - "net/url" - "strconv" - - etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/etcd/log" - "github.com/coreos/etcd/third_party/github.com/goraft/raft" - "github.com/coreos/etcd/third_party/github.com/gorilla/mux" -) - -func GetHandler(w http.ResponseWriter, req *http.Request, s Server) error { - vars := mux.Vars(req) - key := "/" + vars["key"] - - recursive := (req.FormValue("recursive") == "true") - sort := (req.FormValue("sorted") == "true") - - if req.FormValue("quorum") == "true" { - c := s.Store().CommandFactory().CreateGetCommand(key, recursive, sort) - return s.Dispatch(c, w, req) - } - - // Help client to redirect the request to the current leader - if req.FormValue("consistent") == "true" && s.State() != raft.Leader { - leader := s.Leader() - hostname, _ := s.ClientURL(leader) - - url, err := url.Parse(hostname) - if err != nil { - log.Warn("Redirect cannot parse hostName ", hostname) - return err - } - url.RawQuery = req.URL.RawQuery - url.Path = req.URL.Path - - log.Debugf("Redirect consistent get to %s", url.String()) - http.Redirect(w, req, url.String(), http.StatusTemporaryRedirect) - return nil - } - - waitIndex := req.FormValue("waitIndex") - stream := (req.FormValue("stream") == "true") - - if req.FormValue("wait") == "true" { - return handleWatch(key, recursive, stream, waitIndex, w, req, s) - } - - return handleGet(key, recursive, sort, w, req, s) -} - -func handleWatch(key string, recursive, stream bool, waitIndex string, w http.ResponseWriter, req *http.Request, s Server) error { - // Create a command to watch from a given index (default 0). - var sinceIndex uint64 = 0 - var err error - - if waitIndex != "" { - sinceIndex, err = strconv.ParseUint(waitIndex, 10, 64) - if err != nil { - return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", s.Store().Index()) - } - } - - watcher, err := s.Store().Watch(key, recursive, stream, sinceIndex) - if err != nil { - return err - } - - cn, _ := w.(http.CloseNotifier) - closeChan := cn.CloseNotify() - - writeHeaders(w, s) - w.(http.Flusher).Flush() - - if stream { - // watcher hub will not help to remove stream watcher - // so we need to remove here - defer watcher.Remove() - for { - select { - case <-closeChan: - return nil - case event, ok := <-watcher.EventChan: - if !ok { - // If the channel is closed this may be an indication of - // that notifications are much more than we are able to - // send to the client in time. Then we simply end streaming. 
- return nil - } - if req.Method == "HEAD" { - continue - } - - b, _ := json.Marshal(event) - _, err := w.Write(b) - if err != nil { - return nil - } - w.(http.Flusher).Flush() - } - } - } - - select { - case <-closeChan: - watcher.Remove() - case event := <-watcher.EventChan: - if req.Method == "HEAD" { - return nil - } - b, _ := json.Marshal(event) - w.Write(b) - } - return nil -} - -func handleGet(key string, recursive, sort bool, w http.ResponseWriter, req *http.Request, s Server) error { - event, err := s.Store().Get(key, recursive, sort) - if err != nil { - return err - } - - if req.Method == "HEAD" { - return nil - } - - writeHeaders(w, s) - b, _ := json.Marshal(event) - w.Write(b) - return nil -} - -func writeHeaders(w http.ResponseWriter, s Server) { - w.Header().Set("Content-Type", "application/json") - w.Header().Add("X-Etcd-Index", fmt.Sprint(s.Store().Index())) - w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex())) - w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term())) - w.WriteHeader(http.StatusOK) -} diff --git a/server/v2/post_handler.go b/server/v2/post_handler.go deleted file mode 100644 index face97203..000000000 --- a/server/v2/post_handler.go +++ /dev/null @@ -1,26 +0,0 @@ -// +build ignore - -package v2 - -import ( - "net/http" - - etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/etcd/store" - "github.com/coreos/etcd/third_party/github.com/gorilla/mux" -) - -func PostHandler(w http.ResponseWriter, req *http.Request, s Server) error { - vars := mux.Vars(req) - key := "/" + vars["key"] - - value := req.FormValue("value") - dir := (req.FormValue("dir") == "true") - expireTime, err := store.TTL(req.FormValue("ttl")) - if err != nil { - return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", s.Store().Index()) - } - - c := s.Store().CommandFactory().CreateCreateCommand(key, dir, value, expireTime, true) - return s.Dispatch(c, w, req) -} diff --git a/server/v2/put_handler.go b/server/v2/put_handler.go deleted file mode 100644 index 305109fcb..000000000 --- a/server/v2/put_handler.go +++ /dev/null @@ -1,100 +0,0 @@ -// +build ignore - -package v2 - -import ( - "net/http" - "strconv" - "time" - - etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/etcd/store" - "github.com/coreos/etcd/third_party/github.com/goraft/raft" - "github.com/coreos/etcd/third_party/github.com/gorilla/mux" -) - -func PutHandler(w http.ResponseWriter, req *http.Request, s Server) error { - var c raft.Command - - vars := mux.Vars(req) - key := "/" + vars["key"] - - req.ParseForm() - - value := req.Form.Get("value") - dir := (req.FormValue("dir") == "true") - - expireTime, err := store.TTL(req.Form.Get("ttl")) - if err != nil { - return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", s.Store().Index()) - } - - _, valueOk := req.Form["prevValue"] - prevValue := req.FormValue("prevValue") - - _, indexOk := req.Form["prevIndex"] - prevIndexStr := req.FormValue("prevIndex") - - _, existOk := req.Form["prevExist"] - prevExist := req.FormValue("prevExist") - - // Set handler: create a new node or replace the old one. - if !valueOk && !indexOk && !existOk { - return SetHandler(w, req, s, key, dir, value, expireTime) - } - - // update with test - if existOk { - if prevExist == "false" { - // Create command: create a new node. 
Fail, if a node already exists - // Ignore prevIndex and prevValue - return CreateHandler(w, req, s, key, dir, value, expireTime) - } - - if prevExist == "true" && !indexOk && !valueOk { - return UpdateHandler(w, req, s, key, value, expireTime) - } - } - - var prevIndex uint64 - - if indexOk { - prevIndex, err = strconv.ParseUint(prevIndexStr, 10, 64) - - // bad previous index - if err != nil { - return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndSwap", s.Store().Index()) - } - } else { - prevIndex = 0 - } - - if valueOk { - if prevValue == "" { - return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndSwap", s.Store().Index()) - } - } - - c = s.Store().CommandFactory().CreateCompareAndSwapCommand(key, value, prevValue, prevIndex, expireTime) - return s.Dispatch(c, w, req) -} - -func SetHandler(w http.ResponseWriter, req *http.Request, s Server, key string, dir bool, value string, expireTime time.Time) error { - c := s.Store().CommandFactory().CreateSetCommand(key, dir, value, expireTime) - return s.Dispatch(c, w, req) -} - -func CreateHandler(w http.ResponseWriter, req *http.Request, s Server, key string, dir bool, value string, expireTime time.Time) error { - c := s.Store().CommandFactory().CreateCreateCommand(key, dir, value, expireTime, false) - return s.Dispatch(c, w, req) -} - -func UpdateHandler(w http.ResponseWriter, req *http.Request, s Server, key, value string, expireTime time.Time) error { - // Update should give at least one option - if value == "" && expireTime.Sub(store.Permanent) == 0 { - return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", s.Store().Index()) - } - - c := s.Store().CommandFactory().CreateUpdateCommand(key, value, expireTime) - return s.Dispatch(c, w, req) -} diff --git a/server/v2/v2.go b/server/v2/v2.go deleted file mode 100644 index c65fd022f..000000000 --- a/server/v2/v2.go +++ /dev/null @@ -1,22 +0,0 @@ -// +build ignore - -package v2 - -import ( - "net/http" - - "github.com/coreos/etcd/store" - "github.com/coreos/etcd/third_party/github.com/goraft/raft" -) - -// The Server interface provides all the methods required for the v2 API. -type Server interface { - State() string - Leader() string - CommitIndex() uint64 - Term() uint64 - PeerURL(string) (string, bool) - ClientURL(string) (string, bool) - Store() store.Store - Dispatch(raft.Command, http.ResponseWriter, *http.Request) error -} diff --git a/server/version.go b/server/version.go deleted file mode 100644 index 44a2dc288..000000000 --- a/server/version.go +++ /dev/null @@ -1,5 +0,0 @@ -// +build ignore - -package server - -const Version = "v2"
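
Note: the one piece of non-obvious arithmetic in the removed code is in server/raft_follower_stats.go, which keeps a running mean and a running mean of squares per follower and derives the latency standard deviation as sqrt(E[x^2] - E[x]^2). Below is a minimal, standalone sketch of that bookkeeping for reference; the type and method names are illustrative only and are not part of the deleted package.

// Standalone sketch of the running latency statistics kept by the removed
// raftFollowerStats type. Names here are illustrative, not the original API.
package main

import (
	"fmt"
	"math"
	"time"
)

// followerLatency tracks per-follower send latency in milliseconds.
type followerLatency struct {
	count         uint64  // number of successful sends observed
	average       float64 // running mean of latency
	averageSquare float64 // running mean of latency squared
	minimum       float64
	maximum       float64
}

// observe folds one successful send of duration d into the running stats.
func (f *followerLatency) observe(d time.Duration) {
	total := float64(f.count) * f.average
	totalSquare := float64(f.count) * f.averageSquare
	f.count++

	ms := float64(d) / float64(time.Millisecond)
	if f.count == 1 || ms < f.minimum {
		f.minimum = ms
	}
	if ms > f.maximum {
		f.maximum = ms
	}

	f.average = (total + ms) / float64(f.count)
	f.averageSquare = (totalSquare + ms*ms) / float64(f.count)
}

// stddev derives the standard deviation from the two running averages:
// sqrt(E[x^2] - E[x]^2).
func (f *followerLatency) stddev() float64 {
	return math.Sqrt(f.averageSquare - f.average*f.average)
}

func main() {
	var lat followerLatency
	for _, d := range []time.Duration{2 * time.Millisecond, 3 * time.Millisecond, 7 * time.Millisecond} {
		lat.observe(d)
	}
	fmt.Printf("avg=%.2fms min=%.2fms max=%.2fms stddev=%.2fms\n",
		lat.average, lat.minimum, lat.maximum, lat.stddev())
}

The same update pattern (rescale the previous averages by the old count, add the new sample, divide by the new count) is what the deleted Succ method did; keeping E[x] and E[x^2] avoids storing every sample while still allowing the standard deviation to be reported on demand.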