Merge pull request #786 from unihorn/91

feat(standby_server): write cluster info to disk
Brandon Philips committed 2014-05-18 10:08:52 -07:00
7 changed files with 157 additions and 59 deletions
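
In short, this change groups the standby server's state (the Running flag, the Cluster machine list, and the SyncInterval) into an embedded standbyInfo struct, loads it from the standby_info file under the configured DataDir when the server is constructed, and writes it back on Stop() and after each successful cluster sync. Below is a minimal sketch of that round-trip using stand-in types that only mirror the shape of standbyInfo and machineMessage from the diff; the machine field set, JSON layout, and sample values are illustrative assumptions, not the project's exact definitions.

// Stand-in types; field names and sample values are assumptions for illustration.
package main

import (
	"encoding/json"
	"fmt"
)

type machine struct {
	Name      string
	State     string
	ClientURL string
	PeerURL   string
}

type info struct {
	Running      bool
	Cluster      []*machine
	SyncInterval float64
}

func main() {
	// Encode the state as plain JSON, as saveInfo does via json.NewEncoder.
	data, _ := json.Marshal(info{
		Running: true,
		Cluster: []*machine{
			{Name: "node0", State: "leader", ClientURL: "http://127.0.0.1:4001", PeerURL: "http://127.0.0.1:7001"},
		},
		SyncInterval: 5,
	})
	fmt.Println(string(data)) // roughly what a standby_info file would hold

	// The load path reverses this: decode the file back into the struct.
	var loaded info
	_ = json.Unmarshal(data, &loaded)
}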


@@ -1,9 +1,13 @@
 package server
 
 import (
+	"encoding/json"
 	"fmt"
+	"io/ioutil"
 	"net/http"
 	"net/url"
+	"os"
+	"path/filepath"
 	"sync"
 	"time"
 
@@ -15,20 +19,28 @@ import (
 	"github.com/coreos/etcd/store"
 )
 
+const standbyInfoName = "standby_info"
+
 type StandbyServerConfig struct {
 	Name       string
 	PeerScheme string
 	PeerURL    string
 	ClientURL  string
+	DataDir    string
+}
+
+type standbyInfo struct {
+	Running      bool
+	Cluster      []*machineMessage
+	SyncInterval float64
 }
 
 type StandbyServer struct {
 	Config StandbyServerConfig
 	client *Client
 
-	cluster      []*machineMessage
-	syncInterval float64
-	joinIndex    uint64
+	standbyInfo
+	joinIndex uint64
 
 	removeNotify chan bool
 	started      bool
@@ -39,11 +51,15 @@ type StandbyServer struct {
 }
 
 func NewStandbyServer(config StandbyServerConfig, client *Client) *StandbyServer {
-	return &StandbyServer{
-		Config:       config,
-		client:       client,
-		syncInterval: DefaultSyncInterval,
+	s := &StandbyServer{
+		Config:      config,
+		client:      client,
+		standbyInfo: standbyInfo{SyncInterval: DefaultSyncInterval},
 	}
+	if err := s.loadInfo(); err != nil {
+		log.Warnf("error load standby info file: %v", err)
+	}
+	return s
 }
 
 func (s *StandbyServer) Start() {
@@ -62,6 +78,7 @@ func (s *StandbyServer) Start() {
 		defer s.routineGroup.Done()
 		s.monitorCluster()
 	}()
+	s.Running = true
 }
 
 // Stop stops the server gracefully.
@@ -75,6 +92,11 @@ func (s *StandbyServer) Stop() {
 
 	close(s.closeChan)
 	s.routineGroup.Wait()
+
+	if err := s.saveInfo(); err != nil {
+		log.Warnf("error saving cluster info for standby")
+	}
+	s.Running = false
 }
 
 // RemoveNotify notifies the server is removed from standby mode and ready
@@ -87,20 +109,24 @@ func (s *StandbyServer) ClientHTTPHandler() http.Handler {
 	return http.HandlerFunc(s.redirectRequests)
 }
 
-func (s *StandbyServer) Cluster() []string {
+func (s *StandbyServer) IsRunning() bool {
+	return s.Running
+}
+
+func (s *StandbyServer) ClusterURLs() []string {
 	peerURLs := make([]string, 0)
-	for _, peer := range s.cluster {
+	for _, peer := range s.Cluster {
 		peerURLs = append(peerURLs, peer.PeerURL)
 	}
 	return peerURLs
 }
 
 func (s *StandbyServer) ClusterSize() int {
-	return len(s.cluster)
+	return len(s.Cluster)
 }
 
 func (s *StandbyServer) setCluster(cluster []*machineMessage) {
-	s.cluster = cluster
+	s.Cluster = cluster
 }
 
 func (s *StandbyServer) SyncCluster(peers []string) error {
@@ -109,20 +135,20 @@ func (s *StandbyServer) SyncCluster(peers []string) error {
 	}
 
 	if err := s.syncCluster(peers); err != nil {
-		log.Infof("fail syncing cluster(%v): %v", s.Cluster(), err)
+		log.Infof("fail syncing cluster(%v): %v", s.ClusterURLs(), err)
 		return err
 	}
 
-	log.Infof("set cluster(%v) for standby server", s.Cluster())
+	log.Infof("set cluster(%v) for standby server", s.ClusterURLs())
 	return nil
 }
 
 func (s *StandbyServer) SetSyncInterval(second float64) {
-	s.syncInterval = second
+	s.SyncInterval = second
 }
 
 func (s *StandbyServer) ClusterLeader() *machineMessage {
-	for _, machine := range s.cluster {
+	for _, machine := range s.Cluster {
 		if machine.State == raft.Leader {
 			return machine
 		}
@@ -144,9 +170,11 @@ func (s *StandbyServer) redirectRequests(w http.ResponseWriter, r *http.Request)
 	uhttp.Redirect(leader.ClientURL, w, r)
 }
 
+// monitorCluster assumes that the machine has tried to join the cluster and
+// failed, so it waits for the interval at the beginning.
 func (s *StandbyServer) monitorCluster() {
 	for {
-		timer := time.NewTimer(time.Duration(int64(s.syncInterval * float64(time.Second))))
+		timer := time.NewTimer(time.Duration(int64(s.SyncInterval * float64(time.Second))))
 		defer timer.Stop()
 		select {
 		case <-s.closeChan:
@@ -155,13 +183,13 @@ func (s *StandbyServer) monitorCluster() {
 		}
 
 		if err := s.syncCluster(nil); err != nil {
-			log.Warnf("fail syncing cluster(%v): %v", s.Cluster(), err)
+			log.Warnf("fail syncing cluster(%v): %v", s.ClusterURLs(), err)
 			continue
 		}
 
 		leader := s.ClusterLeader()
 		if leader == nil {
-			log.Warnf("fail getting leader from cluster(%v)", s.Cluster())
+			log.Warnf("fail getting leader from cluster(%v)", s.ClusterURLs())
 			continue
 		}
 
@@ -180,7 +208,7 @@ func (s *StandbyServer) monitorCluster() {
 }
 
 func (s *StandbyServer) syncCluster(peerURLs []string) error {
-	peerURLs = append(s.Cluster(), peerURLs...)
+	peerURLs = append(s.ClusterURLs(), peerURLs...)
 
 	for _, peerURL := range peerURLs {
 		// Fetch current peer list
@@ -198,6 +226,9 @@ func (s *StandbyServer) syncCluster(peerURLs []string) error {
 
 		s.setCluster(machines)
 		s.SetSyncInterval(config.SyncInterval)
+		if err := s.saveInfo(); err != nil {
+			log.Warnf("fail saving cluster info into disk: %v", err)
+		}
 		return nil
 	}
 	return fmt.Errorf("unreachable cluster")
@@ -221,8 +252,8 @@ func (s *StandbyServer) join(peer string) error {
 		log.Debugf("error getting cluster config")
 		return err
 	}
-	if clusterConfig.ActiveSize <= len(s.Cluster()) {
-		log.Debugf("stop joining because the cluster is full with %d nodes", len(s.Cluster()))
+	if clusterConfig.ActiveSize <= len(s.Cluster) {
+		log.Debugf("stop joining because the cluster is full with %d nodes", len(s.Cluster))
 		return fmt.Errorf("out of quota")
 	}
 
@@ -252,3 +283,41 @@ func (s *StandbyServer) fullPeerURL(urlStr string) string {
 	u.Scheme = s.Config.PeerScheme
 	return u.String()
 }
+
+func (s *StandbyServer) loadInfo() error {
+	var info standbyInfo
+	path := filepath.Join(s.Config.DataDir, standbyInfoName)
+	file, err := os.OpenFile(path, os.O_RDONLY, 0600)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return nil
+		}
+		return err
+	}
+	defer file.Close()
+	if err = json.NewDecoder(file).Decode(&info); err != nil {
+		return err
+	}
+	s.standbyInfo = info
+	return nil
+}
+
+func (s *StandbyServer) saveInfo() error {
+	tmpFile, err := ioutil.TempFile(s.Config.DataDir, standbyInfoName)
+	if err != nil {
+		return err
+	}
+	if err = json.NewEncoder(tmpFile).Encode(s.standbyInfo); err != nil {
+		tmpFile.Close()
+		os.Remove(tmpFile.Name())
+		return err
+	}
+	tmpFile.Close()
+
+	path := filepath.Join(s.Config.DataDir, standbyInfoName)
+	if err = os.Rename(tmpFile.Name(), path); err != nil {
+		return err
+	}
+	return nil
+}
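
One design note on saveInfo above: it encodes into a temporary file created in the same data directory and then renames it over standby_info, so a crash in the middle of a write leaves the previous file intact instead of a truncated one. A rough standalone sketch of that write-then-rename pattern follows; writeFileAtomic is a hypothetical helper name used only for this illustration, not etcd code.

package main

import (
	"io/ioutil"
	"log"
	"os"
	"path/filepath"
)

// writeFileAtomic writes data to a temp file in dir, then renames it to name.
// A rename within the same filesystem is atomic on POSIX systems, so readers
// see either the old contents or the complete new contents, never a partial
// write.
func writeFileAtomic(dir, name string, data []byte) error {
	tmp, err := ioutil.TempFile(dir, name)
	if err != nil {
		return err
	}
	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		os.Remove(tmp.Name())
		return err
	}
	tmp.Close()
	return os.Rename(tmp.Name(), filepath.Join(dir, name))
}

func main() {
	if err := writeFileAtomic(".", "standby_info", []byte(`{"Running":true}`)); err != nil {
		log.Fatal(err)
	}
}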