diff --git a/command.go b/command.go deleted file mode 100644 index dec76216c..000000000 --- a/command.go +++ /dev/null @@ -1,255 +0,0 @@ -package main - -import ( - "encoding/binary" - "fmt" - "os" - "path" - "time" - - etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/etcd/store" - "github.com/coreos/go-raft" -) - -const commandPrefix = "etcd:" - -func commandName(name string) string { - return commandPrefix + name -} - -// A command represents an action to be taken on the replicated state machine. -type Command interface { - CommandName() string - Apply(server *raft.Server) (interface{}, error) -} - -// Create command -type CreateCommand struct { - Key string `json:"key"` - Value string `json:"value"` - ExpireTime time.Time `json:"expireTime"` - IncrementalSuffix bool `json:"incrementalSuffix"` - Force bool `json:"force"` -} - -// The name of the create command in the log -func (c *CreateCommand) CommandName() string { - return commandName("create") -} - -// Create node -func (c *CreateCommand) Apply(server *raft.Server) (interface{}, error) { - s, _ := server.StateMachine().(*store.Store) - - e, err := s.Create(c.Key, c.Value, c.IncrementalSuffix, c.Force, c.ExpireTime, server.CommitIndex(), server.Term()) - - if err != nil { - debug(err) - return nil, err - } - - return e, nil -} - -// Update command -type UpdateCommand struct { - Key string `json:"key"` - Value string `json:"value"` - ExpireTime time.Time `json:"expireTime"` -} - -// The name of the update command in the log -func (c *UpdateCommand) CommandName() string { - return commandName("update") -} - -// Update node -func (c *UpdateCommand) Apply(server *raft.Server) (interface{}, error) { - s, _ := server.StateMachine().(*store.Store) - - e, err := s.Update(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) - - if err != nil { - debug(err) - return nil, err - } - - return e, nil -} - -// TestAndSet command -type TestAndSetCommand struct { - Key string `json:"key"` - Value string `json:"value"` - ExpireTime time.Time `json:"expireTime"` - PrevValue string `json: prevValue` - PrevIndex uint64 `json: prevValue` -} - -// The name of the testAndSet command in the log -func (c *TestAndSetCommand) CommandName() string { - return commandName("testAndSet") -} - -// Set the key-value pair if the current value of the key equals to the given prevValue -func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) { - s, _ := server.StateMachine().(*store.Store) - - e, err := s.TestAndSet(c.Key, c.PrevValue, c.PrevIndex, - c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) - - if err != nil { - debug(err) - return nil, err - } - - return e, nil -} - -// Delete command -type DeleteCommand struct { - Key string `json:"key"` - Recursive bool `json:"recursive"` -} - -// The name of the delete command in the log -func (c *DeleteCommand) CommandName() string { - return commandName("delete") -} - -// Delete the key -func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) { - s, _ := server.StateMachine().(*store.Store) - - e, err := s.Delete(c.Key, c.Recursive, server.CommitIndex(), server.Term()) - - if err != nil { - debug(err) - return nil, err - } - - return e, nil -} - -// JoinCommand -type JoinCommand struct { - RaftVersion string `json:"raftVersion"` - Name string `json:"name"` - RaftURL string `json:"raftURL"` - EtcdURL string `json:"etcdURL"` -} - -func newJoinCommand(version, name, raftUrl, etcdUrl string) *JoinCommand { - return &JoinCommand{ - RaftVersion: version, - Name: name, - RaftURL: raftUrl, - EtcdURL: etcdUrl, - } -} - -// The name of the join command in the log -func (c *JoinCommand) CommandName() string { - return commandName("join") -} - -// Join a server to the cluster -func (c *JoinCommand) Apply(server *raft.Server) (interface{}, error) { - s, _ := server.StateMachine().(*store.Store) - r, _ := server.Context().(*raftServer) - - // check if the join command is from a previous machine, who lost all its previous log. - e, _ := s.Get(path.Join("/_etcd/machines", c.Name), false, false, server.CommitIndex(), server.Term()) - - b := make([]byte, 8) - binary.PutUvarint(b, server.CommitIndex()) - - if e != nil { - return b, nil - } - - // check machine number in the cluster - num := machineNum() - if num == maxClusterSize { - debug("Reject join request from ", c.Name) - return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMoreMachine, "", server.CommitIndex(), server.Term()) - } - - addNameToURL(c.Name, c.RaftVersion, c.RaftURL, c.EtcdURL) - - // add peer in raft - err := server.AddPeer(c.Name, "") - - // add machine in etcd storage - key := path.Join("_etcd/machines", c.Name) - value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion) - s.Create(key, value, false, false, store.Permanent, server.CommitIndex(), server.Term()) - - // add peer stats - if c.Name != r.Name() { - r.followersStats.Followers[c.Name] = &raftFollowerStats{} - r.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63 - } - - return b, err -} - -func (c *JoinCommand) NodeName() string { - return c.Name -} - -// RemoveCommand -type RemoveCommand struct { - Name string `json:"name"` -} - -// The name of the remove command in the log -func (c *RemoveCommand) CommandName() string { - return commandName("remove") -} - -// Remove a server from the cluster -func (c *RemoveCommand) Apply(server *raft.Server) (interface{}, error) { - s, _ := server.StateMachine().(*store.Store) - r, _ := server.Context().(*raftServer) - - // remove machine in etcd storage - key := path.Join("_etcd/machines", c.Name) - - _, err := s.Delete(key, false, server.CommitIndex(), server.Term()) - // delete from stats - delete(r.followersStats.Followers, c.Name) - - if err != nil { - return []byte{0}, err - } - - // remove peer in raft - err = server.RemovePeer(c.Name) - - if err != nil { - return []byte{0}, err - } - - if c.Name == server.Name() { - // the removed node is this node - - // if the node is not replaying the previous logs - // and the node has sent out a join request in this - // start. It is sure that this node received a new remove - // command and need to be removed - if server.CommitIndex() > r.joinIndex && r.joinIndex != 0 { - debugf("server [%s] is removed", server.Name()) - os.Exit(0) - } else { - // else ignore remove - debugf("ignore previous remove command.") - } - } - - b := make([]byte, 8) - binary.PutUvarint(b, server.CommitIndex()) - - return b, err -} diff --git a/command/command.go b/command/command.go new file mode 100644 index 000000000..50513d5f5 --- /dev/null +++ b/command/command.go @@ -0,0 +1,19 @@ +package command + +import ( + "github.com/coreos/go-raft" +) + +// A command represents an action to be taken on the replicated state machine. +type Command interface { + CommandName() string + Apply(server *raft.Server) (interface{}, error) +} + +// Registers commands to the Raft library. +func Register() { + raft.RegisterCommand(&DeleteCommand{}) + raft.RegisterCommand(&TestAndSetCommand{}) + raft.RegisterCommand(&CreateCommand{}) + raft.RegisterCommand(&UpdateCommand{}) +} diff --git a/command/create_command.go b/command/create_command.go new file mode 100644 index 000000000..6dd2c5aba --- /dev/null +++ b/command/create_command.go @@ -0,0 +1,36 @@ +package command + +import ( + "github.com/coreos/etcd/log" + "github.com/coreos/etcd/store" + "github.com/coreos/go-raft" + "time" +) + +// Create command +type CreateCommand struct { + Key string `json:"key"` + Value string `json:"value"` + ExpireTime time.Time `json:"expireTime"` + IncrementalSuffix bool `json:"incrementalSuffix"` + Force bool `json:"force"` +} + +// The name of the create command in the log +func (c *CreateCommand) CommandName() string { + return "etcd:create" +} + +// Create node +func (c *CreateCommand) Apply(server *raft.Server) (interface{}, error) { + s, _ := server.StateMachine().(*store.Store) + + e, err := s.Create(c.Key, c.Value, c.IncrementalSuffix, c.Force, c.ExpireTime, server.CommitIndex(), server.Term()) + + if err != nil { + log.Debug(err) + return nil, err + } + + return e, nil +} diff --git a/command/delete_command.go b/command/delete_command.go new file mode 100644 index 000000000..a0d03c99d --- /dev/null +++ b/command/delete_command.go @@ -0,0 +1,32 @@ +package command + +import ( + "github.com/coreos/etcd/log" + "github.com/coreos/etcd/store" + "github.com/coreos/go-raft" +) + +// The DeleteCommand removes a key from the Store. +type DeleteCommand struct { + Key string `json:"key"` + Recursive bool `json:"recursive"` +} + +// The name of the delete command in the log +func (c *DeleteCommand) CommandName() string { + return "etcd:delete" +} + +// Delete the key +func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) { + s, _ := server.StateMachine().(*store.Store) + + e, err := s.Delete(c.Key, c.Recursive, server.CommitIndex(), server.Term()) + + if err != nil { + log.Debug(err) + return nil, err + } + + return e, nil +} diff --git a/command/test_and_set_command.go b/command/test_and_set_command.go new file mode 100644 index 000000000..4d723e221 --- /dev/null +++ b/command/test_and_set_command.go @@ -0,0 +1,38 @@ +package command + +import ( + "time" + + "github.com/coreos/etcd/log" + "github.com/coreos/etcd/store" + "github.com/coreos/go-raft" +) + +// The TestAndSetCommand performs a conditional update on a key in the store. +type TestAndSetCommand struct { + Key string `json:"key"` + Value string `json:"value"` + ExpireTime time.Time `json:"expireTime"` + PrevValue string `json: prevValue` + PrevIndex uint64 `json: prevIndex` +} + +// The name of the testAndSet command in the log +func (c *TestAndSetCommand) CommandName() string { + return "etcd:testAndSet" +} + +// Set the key-value pair if the current value of the key equals to the given prevValue +func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) { + s, _ := server.StateMachine().(*store.Store) + + e, err := s.TestAndSet(c.Key, c.PrevValue, c.PrevIndex, + c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) + + if err != nil { + log.Debug(err) + return nil, err + } + + return e, nil +} diff --git a/command/update_command.go b/command/update_command.go new file mode 100644 index 000000000..245e3c1c7 --- /dev/null +++ b/command/update_command.go @@ -0,0 +1,35 @@ +package command + +import ( + "time" + + "github.com/coreos/etcd/log" + "github.com/coreos/etcd/store" + "github.com/coreos/go-raft" +) + +// The UpdateCommand updates the value of a key in the Store. +type UpdateCommand struct { + Key string `json:"key"` + Value string `json:"value"` + ExpireTime time.Time `json:"expireTime"` +} + +// The name of the update command in the log +func (c *UpdateCommand) CommandName() string { + return "etcd:update" +} + +// Update node +func (c *UpdateCommand) Apply(server *raft.Server) (interface{}, error) { + s, _ := server.StateMachine().(*store.Store) + + e, err := s.Update(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term()) + + if err != nil { + log.Debug(err) + return nil, err + } + + return e, nil +} diff --git a/etcd.go b/etcd.go index 8a1c63042..79bda7b67 100644 --- a/etcd.go +++ b/etcd.go @@ -8,8 +8,9 @@ import ( "strings" "time" - "github.com/coreos/etcd/store" + "github.com/coreos/etcd/log" "github.com/coreos/etcd/server" + "github.com/coreos/etcd/store" "github.com/coreos/go-raft" ) @@ -20,7 +21,6 @@ import ( //------------------------------------------------------------------------------ var ( - verbose bool veryVerbose bool machines string @@ -43,11 +43,11 @@ var ( cpuprofile string - cors string + cors string ) func init() { - flag.BoolVar(&verbose, "v", false, "verbose logging") + flag.BoolVar(&log.Verbose, "v", false, "verbose logging") flag.BoolVar(&veryVerbose, "vv", false, "very verbose logging") flag.StringVar(&machines, "C", "", "the ip address and port of a existing machines in the cluster, sepearate by comma") @@ -97,12 +97,6 @@ const ( // //------------------------------------------------------------------------------ -type TLSInfo struct { - CertFile string `json:"CertFile"` - KeyFile string `json:"KeyFile"` - CAFile string `json:"CAFile"` -} - type Info struct { Name string `json:"name"` @@ -117,12 +111,6 @@ type Info struct { EtcdTLS TLSInfo `json:"etcdTLS"` } -type TLSConfig struct { - Scheme string - Server tls.Config - Client tls.Config -} - //------------------------------------------------------------------------------ // // Variables @@ -199,6 +187,7 @@ func main() { // Create etcd and raft server r := newRaftServer(info.Name, info.RaftURL, info.RaftListenHost, &raftTLSConfig, &info.RaftTLS) + r.MaxClusterSize = maxClusterSize snapConf = r.newSnapshotConf() s := server.New(info.Name, info.EtcdURL, info.EtcdListenHost, &etcdTLSConfig, &info.EtcdTLS, r) @@ -209,4 +198,3 @@ func main() { r.ListenAndServe() s.ListenAndServe() } - diff --git a/log/log.go b/log/log.go new file mode 100644 index 000000000..7f827b0ce --- /dev/null +++ b/log/log.go @@ -0,0 +1,44 @@ +package log + +import ( + golog "github.com/coreos/go-log/log" + "os" +) + +// The Verbose flag turns on verbose logging. +var Verbose bool = false + +var logger *golog.Logger = golog.New("etcd", false, + golog.CombinedSink(os.Stdout, "[%s] %s %-9s | %s\n", []string{"prefix", "time", "priority", "message"})) + +func Infof(format string, v ...interface{}) { + logger.Infof(format, v...) +} + +func Debugf(format string, v ...interface{}) { + if Verbose { + logger.Debugf(format, v...) + } +} + +func Debug(v ...interface{}) { + if Verbose { + logger.Debug(v...) + } +} + +func Warnf(format string, v ...interface{}) { + logger.Warningf(format, v...) +} + +func Warn(v ...interface{}) { + logger.Warning(v...) +} + +func Fatalf(format string, v ...interface{}) { + logger.Fatalf(format, v...) +} + +func Fatal(v ...interface{}) { + logger.Fatalln(v...) +} diff --git a/machines.go b/machines.go index b8b4a09d5..1da25ed75 100644 --- a/machines.go +++ b/machines.go @@ -1,16 +1,5 @@ package main -// machineNum returns the number of machines in the cluster -func machineNum() int { - e, err := etcdStore.Get("/_etcd/machines", false, false, 0, 0) - - if err != nil { - return 0 - } - - return len(e.KVPairs) -} - // getMachines gets the current machines in the cluster func (r *raftServer) getMachines(toURL func(string) (string, bool)) []string { peers := r.Peers() diff --git a/raft_server.go b/raft_server.go index 146a2e84e..00cf4cbdb 100644 --- a/raft_server.go +++ b/raft_server.go @@ -11,10 +11,15 @@ import ( "net/url" "time" + "github.com/coreos/etcd/command" etcdErr "github.com/coreos/etcd/error" "github.com/coreos/go-raft" ) +func init() { + command.Register() +} + type raftServer struct { *raft.Server version string @@ -26,10 +31,9 @@ type raftServer struct { tlsInfo *TLSInfo followersStats *raftFollowersStats serverStats *raftServerStats + MaxClusterSize int } -//var r *raftServer - func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *raftServer { raftWrapper := &raftServer{ @@ -68,9 +72,6 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi // Start the raft server func (r *raftServer) ListenAndServe() { - // Setup commands. - registerCommands() - // LoadSnapshot if snapshot { err := r.LoadSnapshot() @@ -314,16 +315,3 @@ func (r *raftServer) PeerStats() []byte { } return nil } - -// Register commands to raft server -func registerCommands() { - raft.RegisterCommand(&JoinCommand{}) - raft.RegisterCommand(&RemoveCommand{}) - raft.RegisterCommand(&GetCommand{}) - raft.RegisterCommand(&DeleteCommand{}) - raft.RegisterCommand(&WatchCommand{}) - raft.RegisterCommand(&TestAndSetCommand{}) - - raft.RegisterCommand(&CreateCommand{}) - raft.RegisterCommand(&UpdateCommand{}) -} diff --git a/raft_stats.go b/raft_stats.go deleted file mode 100644 index 45d21037f..000000000 --- a/raft_stats.go +++ /dev/null @@ -1,210 +0,0 @@ -package main - -import ( - "math" - "sync" - "time" - - "github.com/coreos/go-raft" -) - -const ( - queueCapacity = 200 -) - -// packageStats represent the stats we need for a package. -// It has sending time and the size of the package. -type packageStats struct { - sendingTime time.Time - size int -} - -// NewPackageStats creates a pacakgeStats and return the pointer to it. -func NewPackageStats(now time.Time, size int) *packageStats { - return &packageStats{ - sendingTime: now, - size: size, - } -} - -// Time return the sending time of the package. -func (ps *packageStats) Time() time.Time { - return ps.sendingTime -} - -type raftServerStats struct { - Name string `json:"name"` - State string `json:"state"` - StartTime time.Time `json:"startTime"` - - LeaderInfo struct { - Name string `json:"leader"` - Uptime string `json:"uptime"` - startTime time.Time - } `json:"leaderInfo"` - - RecvAppendRequestCnt uint64 `json:"recvAppendRequestCnt,"` - RecvingPkgRate float64 `json:"recvPkgRate,omitempty"` - RecvingBandwidthRate float64 `json:"recvBandwidthRate,omitempty"` - - SendAppendRequestCnt uint64 `json:"sendAppendRequestCnt"` - SendingPkgRate float64 `json:"sendPkgRate,omitempty"` - SendingBandwidthRate float64 `json:"sendBandwidthRate,omitempty"` - - sendRateQueue *statsQueue - recvRateQueue *statsQueue -} - -func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) { - ss.State = raft.Follower - if leaderName != ss.LeaderInfo.Name { - ss.LeaderInfo.Name = leaderName - ss.LeaderInfo.startTime = time.Now() - } - - ss.recvRateQueue.Insert(NewPackageStats(time.Now(), pkgSize)) - ss.RecvAppendRequestCnt++ -} - -func (ss *raftServerStats) SendAppendReq(pkgSize int) { - now := time.Now() - - if ss.State != raft.Leader { - ss.State = raft.Leader - ss.LeaderInfo.Name = ss.Name - ss.LeaderInfo.startTime = now - } - - ss.sendRateQueue.Insert(NewPackageStats(now, pkgSize)) - - ss.SendAppendRequestCnt++ -} - -type raftFollowersStats struct { - Leader string `json:"leader"` - Followers map[string]*raftFollowerStats `json:"followers"` -} - -type raftFollowerStats struct { - Latency struct { - Current float64 `json:"current"` - Average float64 `json:"average"` - averageSquare float64 - StandardDeviation float64 `json:"standardDeviation"` - Minimum float64 `json:"minimum"` - Maximum float64 `json:"maximum"` - } `json:"latency"` - - Counts struct { - Fail uint64 `json:"fail"` - Success uint64 `json:"success"` - } `json:"counts"` -} - -// Succ function update the raftFollowerStats with a successful send -func (ps *raftFollowerStats) Succ(d time.Duration) { - total := float64(ps.Counts.Success) * ps.Latency.Average - totalSquare := float64(ps.Counts.Success) * ps.Latency.averageSquare - - ps.Counts.Success++ - - ps.Latency.Current = float64(d) / (1000000.0) - - if ps.Latency.Current > ps.Latency.Maximum { - ps.Latency.Maximum = ps.Latency.Current - } - - if ps.Latency.Current < ps.Latency.Minimum { - ps.Latency.Minimum = ps.Latency.Current - } - - ps.Latency.Average = (total + ps.Latency.Current) / float64(ps.Counts.Success) - ps.Latency.averageSquare = (totalSquare + ps.Latency.Current*ps.Latency.Current) / float64(ps.Counts.Success) - - // sdv = sqrt(avg(x^2) - avg(x)^2) - ps.Latency.StandardDeviation = math.Sqrt(ps.Latency.averageSquare - ps.Latency.Average*ps.Latency.Average) -} - -// Fail function update the raftFollowerStats with a unsuccessful send -func (ps *raftFollowerStats) Fail() { - ps.Counts.Fail++ -} - -type statsQueue struct { - items [queueCapacity]*packageStats - size int - front int - back int - totalPkgSize int - rwl sync.RWMutex -} - -func (q *statsQueue) Len() int { - return q.size -} - -func (q *statsQueue) PkgSize() int { - return q.totalPkgSize -} - -// FrontAndBack gets the front and back elements in the queue -// We must grab front and back together with the protection of the lock -func (q *statsQueue) frontAndBack() (*packageStats, *packageStats) { - q.rwl.RLock() - defer q.rwl.RUnlock() - if q.size != 0 { - return q.items[q.front], q.items[q.back] - } - return nil, nil -} - -// Insert function insert a packageStats into the queue and update the records -func (q *statsQueue) Insert(p *packageStats) { - q.rwl.Lock() - defer q.rwl.Unlock() - - q.back = (q.back + 1) % queueCapacity - - if q.size == queueCapacity { //dequeue - q.totalPkgSize -= q.items[q.front].size - q.front = (q.back + 1) % queueCapacity - } else { - q.size++ - } - - q.items[q.back] = p - q.totalPkgSize += q.items[q.back].size - -} - -// Rate function returns the package rate and byte rate -func (q *statsQueue) Rate() (float64, float64) { - front, back := q.frontAndBack() - - if front == nil || back == nil { - return 0, 0 - } - - if time.Now().Sub(back.Time()) > time.Second { - q.Clear() - return 0, 0 - } - - sampleDuration := back.Time().Sub(front.Time()) - - pr := float64(q.Len()) / float64(sampleDuration) * float64(time.Second) - - br := float64(q.PkgSize()) / float64(sampleDuration) * float64(time.Second) - - return pr, br -} - -// Clear function clear up the statsQueue -func (q *statsQueue) Clear() { - q.rwl.Lock() - defer q.rwl.Unlock() - q.back = -1 - q.front = 0 - q.size = 0 - q.totalPkgSize = 0 -} diff --git a/server/join_command.go b/server/join_command.go new file mode 100644 index 000000000..89a3a4e54 --- /dev/null +++ b/server/join_command.go @@ -0,0 +1,84 @@ +package server + +import ( + "encoding/binary" + "fmt" + "path" + + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/log" + "github.com/coreos/etcd/store" + "github.com/coreos/go-raft" +) + +func init() { + raft.RegisterCommand(&JoinCommand{}) +} + +// The JoinCommand adds a node to the cluster. +type JoinCommand struct { + RaftVersion string `json:"raftVersion"` + Name string `json:"name"` + RaftURL string `json:"raftURL"` + EtcdURL string `json:"etcdURL"` + MaxClusterSize int `json:"maxClusterSize"` +} + +func NewJoinCommand(version, name, raftUrl, etcdUrl string, maxClusterSize int) *JoinCommand { + return &JoinCommand{ + RaftVersion: version, + Name: name, + RaftURL: raftUrl, + EtcdURL: etcdUrl, + MaxClusterSize: maxClusterSize, + } +} + +// The name of the join command in the log +func (c *JoinCommand) CommandName() string { + return "etcd:join" +} + +// Join a server to the cluster +func (c *JoinCommand) Apply(server *raft.Server) (interface{}, error) { + s, _ := server.StateMachine().(*store.Store) + r, _ := server.Context().(*RaftServer) + + // check if the join command is from a previous machine, who lost all its previous log. + e, _ := s.Get(path.Join("/_etcd/machines", c.Name), false, false, server.CommitIndex(), server.Term()) + + b := make([]byte, 8) + binary.PutUvarint(b, server.CommitIndex()) + + if e != nil { + return b, nil + } + + // check machine number in the cluster + if s.MachineCount() == c.MaxClusterSize { + log.Debug("Reject join request from ", c.Name) + return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMoreMachine, "", server.CommitIndex(), server.Term()) + } + + addNameToURL(c.Name, c.RaftVersion, c.RaftURL, c.EtcdURL) + + // add peer in raft + err := server.AddPeer(c.Name, "") + + // add machine in etcd storage + key := path.Join("_etcd/machines", c.Name) + value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion) + s.Create(key, value, false, false, store.Permanent, server.CommitIndex(), server.Term()) + + // add peer stats + if c.Name != r.Name() { + r.followersStats.Followers[c.Name] = &raftFollowerStats{} + r.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63 + } + + return b, err +} + +func (c *JoinCommand) NodeName() string { + return c.Name +} diff --git a/server/package_stats.go b/server/package_stats.go new file mode 100644 index 000000000..519168033 --- /dev/null +++ b/server/package_stats.go @@ -0,0 +1,25 @@ +package server + +import ( + "time" +) + +// packageStats represent the stats we need for a package. +// It has sending time and the size of the package. +type packageStats struct { + sendingTime time.Time + size int +} + +// NewPackageStats creates a pacakgeStats and return the pointer to it. +func NewPackageStats(now time.Time, size int) *packageStats { + return &packageStats{ + sendingTime: now, + size: size, + } +} + +// Time return the sending time of the package. +func (ps *packageStats) Time() time.Time { + return ps.sendingTime +} diff --git a/server/raft_follower_stats.go b/server/raft_follower_stats.go new file mode 100644 index 000000000..96b76c85b --- /dev/null +++ b/server/raft_follower_stats.go @@ -0,0 +1,56 @@ +package server + +import ( + "math" + "time" +) + +type raftFollowersStats struct { + Leader string `json:"leader"` + Followers map[string]*raftFollowerStats `json:"followers"` +} + +type raftFollowerStats struct { + Latency struct { + Current float64 `json:"current"` + Average float64 `json:"average"` + averageSquare float64 + StandardDeviation float64 `json:"standardDeviation"` + Minimum float64 `json:"minimum"` + Maximum float64 `json:"maximum"` + } `json:"latency"` + + Counts struct { + Fail uint64 `json:"fail"` + Success uint64 `json:"success"` + } `json:"counts"` +} + +// Succ function update the raftFollowerStats with a successful send +func (ps *raftFollowerStats) Succ(d time.Duration) { + total := float64(ps.Counts.Success) * ps.Latency.Average + totalSquare := float64(ps.Counts.Success) * ps.Latency.averageSquare + + ps.Counts.Success++ + + ps.Latency.Current = float64(d) / (1000000.0) + + if ps.Latency.Current > ps.Latency.Maximum { + ps.Latency.Maximum = ps.Latency.Current + } + + if ps.Latency.Current < ps.Latency.Minimum { + ps.Latency.Minimum = ps.Latency.Current + } + + ps.Latency.Average = (total + ps.Latency.Current) / float64(ps.Counts.Success) + ps.Latency.averageSquare = (totalSquare + ps.Latency.Current*ps.Latency.Current) / float64(ps.Counts.Success) + + // sdv = sqrt(avg(x^2) - avg(x)^2) + ps.Latency.StandardDeviation = math.Sqrt(ps.Latency.averageSquare - ps.Latency.Average*ps.Latency.Average) +} + +// Fail function update the raftFollowerStats with a unsuccessful send +func (ps *raftFollowerStats) Fail() { + ps.Counts.Fail++ +} diff --git a/server/raft_server_stats.go b/server/raft_server_stats.go new file mode 100644 index 000000000..451578ab7 --- /dev/null +++ b/server/raft_server_stats.go @@ -0,0 +1,55 @@ +package server + +import ( + "time" + + "github.com/coreos/go-raft" +) + +type raftServerStats struct { + Name string `json:"name"` + State string `json:"state"` + StartTime time.Time `json:"startTime"` + + LeaderInfo struct { + Name string `json:"leader"` + Uptime string `json:"uptime"` + startTime time.Time + } `json:"leaderInfo"` + + RecvAppendRequestCnt uint64 `json:"recvAppendRequestCnt,"` + RecvingPkgRate float64 `json:"recvPkgRate,omitempty"` + RecvingBandwidthRate float64 `json:"recvBandwidthRate,omitempty"` + + SendAppendRequestCnt uint64 `json:"sendAppendRequestCnt"` + SendingPkgRate float64 `json:"sendPkgRate,omitempty"` + SendingBandwidthRate float64 `json:"sendBandwidthRate,omitempty"` + + sendRateQueue *statsQueue + recvRateQueue *statsQueue +} + +func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) { + ss.State = raft.Follower + if leaderName != ss.LeaderInfo.Name { + ss.LeaderInfo.Name = leaderName + ss.LeaderInfo.startTime = time.Now() + } + + ss.recvRateQueue.Insert(NewPackageStats(time.Now(), pkgSize)) + ss.RecvAppendRequestCnt++ +} + +func (ss *raftServerStats) SendAppendReq(pkgSize int) { + now := time.Now() + + if ss.State != raft.Leader { + ss.State = raft.Leader + ss.LeaderInfo.Name = ss.Name + ss.LeaderInfo.startTime = now + } + + ss.sendRateQueue.Insert(NewPackageStats(now, pkgSize)) + + ss.SendAppendRequestCnt++ +} diff --git a/server/remove_command.go b/server/remove_command.go new file mode 100644 index 000000000..a992de67c --- /dev/null +++ b/server/remove_command.go @@ -0,0 +1,68 @@ +package server + +import ( + "encoding/binary" + "path" + + "github.com/coreos/etcd/store" + "github.com/coreos/go-raft" +) + +func init() { + raft.RegisterCommand(&RemoveCommand{}) +} + +// The RemoveCommand removes a server from the cluster. +type RemoveCommand struct { + Name string `json:"name"` +} + +// The name of the remove command in the log +func (c *RemoveCommand) CommandName() string { + return "etcd:remove" +} + +// Remove a server from the cluster +func (c *RemoveCommand) Apply(server *raft.Server) (interface{}, error) { + s, _ := server.StateMachine().(*store.Store) + r, _ := server.Context().(*RaftServer) + + // remove machine in etcd storage + key := path.Join("_etcd/machines", c.Name) + + _, err := s.Delete(key, false, server.CommitIndex(), server.Term()) + // delete from stats + delete(r.followersStats.Followers, c.Name) + + if err != nil { + return []byte{0}, err + } + + // remove peer in raft + err = server.RemovePeer(c.Name) + + if err != nil { + return []byte{0}, err + } + + if c.Name == server.Name() { + // the removed node is this node + + // if the node is not replaying the previous logs + // and the node has sent out a join request in this + // start. It is sure that this node received a new remove + // command and need to be removed + if server.CommitIndex() > r.joinIndex && r.joinIndex != 0 { + debugf("server [%s] is removed", server.Name()) + os.Exit(0) + } else { + // else ignore remove + debugf("ignore previous remove command.") + } + } + + b := make([]byte, 8) + binary.PutUvarint(b, server.CommitIndex()) + + return b, err +} diff --git a/server/server.go b/server/server.go index 4bf108301..fad643aec 100644 --- a/server/server.go +++ b/server/server.go @@ -1,16 +1,19 @@ package server import ( - "github.com/gorilla/mux" "net/http" "net/url" + + "github.com/coreos/etcd/command" + "github.com/coreos/go-raft" + "github.com/gorilla/mux" ) // The Server provides an HTTP interface to the underlying store. type Server interface { - CommitIndex() uint64 - Term() uint64 - Dispatch(Command, http.ResponseWriter, *http.Request) + CommitIndex() uint64 + Term() uint64 + Dispatch(command.Command, http.ResponseWriter, *http.Request) } // This is the default implementation of the Server interface. @@ -55,11 +58,6 @@ func (s *server) Term() uint64 { return c.raftServer.Term() } -// Executes a command against the Raft server. -func (s *server) Do(c Command, localOnly bool) (interface{}, error) { - return c.raftServer.Do(s.RaftServer().Server) -} - func (s *server) installV1() { s.handleFunc("/v1/keys/{key:.*}", v1.GetKeyHandler).Methods("GET") s.handleFunc("/v1/keys/{key:.*}", v1.SetKeyHandler).Methods("POST", "PUT") diff --git a/server/stats_queue.go b/server/stats_queue.go new file mode 100644 index 000000000..5927eed15 --- /dev/null +++ b/server/stats_queue.go @@ -0,0 +1,88 @@ +package server + +import ( + "sync" +) + +const ( + queueCapacity = 200 +) + +type statsQueue struct { + items [queueCapacity]*packageStats + size int + front int + back int + totalPkgSize int + rwl sync.RWMutex +} + +func (q *statsQueue) Len() int { + return q.size +} + +func (q *statsQueue) PkgSize() int { + return q.totalPkgSize +} + +// FrontAndBack gets the front and back elements in the queue +// We must grab front and back together with the protection of the lock +func (q *statsQueue) frontAndBack() (*packageStats, *packageStats) { + q.rwl.RLock() + defer q.rwl.RUnlock() + if q.size != 0 { + return q.items[q.front], q.items[q.back] + } + return nil, nil +} + +// Insert function insert a packageStats into the queue and update the records +func (q *statsQueue) Insert(p *packageStats) { + q.rwl.Lock() + defer q.rwl.Unlock() + + q.back = (q.back + 1) % queueCapacity + + if q.size == queueCapacity { //dequeue + q.totalPkgSize -= q.items[q.front].size + q.front = (q.back + 1) % queueCapacity + } else { + q.size++ + } + + q.items[q.back] = p + q.totalPkgSize += q.items[q.back].size + +} + +// Rate function returns the package rate and byte rate +func (q *statsQueue) Rate() (float64, float64) { + front, back := q.frontAndBack() + + if front == nil || back == nil { + return 0, 0 + } + + if time.Now().Sub(back.Time()) > time.Second { + q.Clear() + return 0, 0 + } + + sampleDuration := back.Time().Sub(front.Time()) + + pr := float64(q.Len()) / float64(sampleDuration) * float64(time.Second) + + br := float64(q.PkgSize()) / float64(sampleDuration) * float64(time.Second) + + return pr, br +} + +// Clear function clear up the statsQueue +func (q *statsQueue) Clear() { + q.rwl.Lock() + defer q.rwl.Unlock() + q.back = -1 + q.front = 0 + q.size = 0 + q.totalPkgSize = 0 +} diff --git a/server/tls_config.go b/server/tls_config.go new file mode 100644 index 000000000..4b944626d --- /dev/null +++ b/server/tls_config.go @@ -0,0 +1,11 @@ +package server + +import ( + "crypto/tls" +) + +type TLSConfig struct { + Scheme string + Server tls.Config + Client tls.Config +} diff --git a/server/tls_info.go b/server/tls_info.go new file mode 100644 index 000000000..91936b090 --- /dev/null +++ b/server/tls_info.go @@ -0,0 +1,7 @@ +package server + +type TLSInfo struct { + CertFile string `json:"CertFile"` + KeyFile string `json:"KeyFile"` + CAFile string `json:"CAFile"` +} diff --git a/server/v1/delete_key_handler.go b/server/v1/delete_key_handler.go index 8b9e315bd..c9d695fdc 100644 --- a/server/v1/delete_key_handler.go +++ b/server/v1/delete_key_handler.go @@ -1,15 +1,15 @@ package v1 import ( - "encoding/json" - "github.com/coreos/etcd/store" - "net/http" + "encoding/json" + "github.com/coreos/etcd/store" + "net/http" ) // Removes a key from the store. func DeleteKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { - vars := mux.Vars(req) - key := "/" + vars["key"] + vars := mux.Vars(req) + key := "/" + vars["key"] command := &DeleteCommand{Key: key} return s.Dispatch(command, w, req) } diff --git a/server/v1/v1.go b/server/v1/v1.go index 709463b07..7f9f80a0e 100644 --- a/server/v1/v1.go +++ b/server/v1/v1.go @@ -7,44 +7,44 @@ import ( // The Server interface provides all the methods required for the v1 API. type Server interface { - CommitIndex() uint64 - Term() uint64 - Dispatch(http.ResponseWriter, *http.Request, Command) + CommitIndex() uint64 + Term() uint64 + Dispatch(http.ResponseWriter, *http.Request, Command) } // Converts an event object into a response object. func eventToResponse(event *store.Event) interface{} { - if !event.Dir { - response := &store.Response{ - Action: event.Action, - Key: event.Key, - Value: event.Value, - PrevValue: event.PrevValue, - Index: event.Index, - TTL: event.TTL, - Expiration: event.Expiration, - } + if !event.Dir { + response := &store.Response{ + Action: event.Action, + Key: event.Key, + Value: event.Value, + PrevValue: event.PrevValue, + Index: event.Index, + TTL: event.TTL, + Expiration: event.Expiration, + } - if response.Action == store.Create || response.Action == store.Update { - response.Action = "set" - if response.PrevValue == "" { - response.NewKey = true - } - } + if response.Action == store.Create || response.Action == store.Update { + response.Action = "set" + if response.PrevValue == "" { + response.NewKey = true + } + } - return response - } else { - responses := make([]*store.Response, len(event.KVPairs)) + return response + } else { + responses := make([]*store.Response, len(event.KVPairs)) - for i, kv := range event.KVPairs { - responses[i] = &store.Response{ - Action: event.Action, - Key: kv.Key, - Value: kv.Value, - Dir: kv.Dir, - Index: event.Index, - } - } - return responses - } + for i, kv := range event.KVPairs { + responses[i] = &store.Response{ + Action: event.Action, + Key: kv.Key, + Value: kv.Value, + Dir: kv.Dir, + Index: event.Index, + } + } + return responses + } } diff --git a/server/v2/handlers.go b/server/v2/handlers.go index 7f1029531..0d294de7d 100644 --- a/server/v2/handlers.go +++ b/server/v2/handlers.go @@ -1,15 +1,15 @@ package main import ( - "encoding/json" - "fmt" - "net/http" - "strconv" - "strings" + "encoding/json" + "fmt" + "net/http" + "strconv" + "strings" - etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/etcd/store" - "github.com/coreos/go-raft" + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/store" + "github.com/coreos/go-raft" ) //------------------------------------------------------------------- @@ -17,22 +17,22 @@ import ( //------------------------------------------------------------------- func NewEtcdMuxer() *http.ServeMux { - // external commands - router := mux.NewRouter() - etcdMux.Handle("/v2/keys/", errorHandler(e.Multiplexer)) - etcdMux.Handle("/v2/leader", errorHandler(e.LeaderHttpHandler)) - etcdMux.Handle("/v2/machines", errorHandler(e.MachinesHttpHandler)) - etcdMux.Handle("/v2/stats/", errorHandler(e.StatsHttpHandler)) - etcdMux.Handle("/version", errorHandler(e.VersionHttpHandler)) - etcdMux.HandleFunc("/test/", TestHttpHandler) + // external commands + router := mux.NewRouter() + etcdMux.Handle("/v2/keys/", errorHandler(e.Multiplexer)) + etcdMux.Handle("/v2/leader", errorHandler(e.LeaderHttpHandler)) + etcdMux.Handle("/v2/machines", errorHandler(e.MachinesHttpHandler)) + etcdMux.Handle("/v2/stats/", errorHandler(e.StatsHttpHandler)) + etcdMux.Handle("/version", errorHandler(e.VersionHttpHandler)) + etcdMux.HandleFunc("/test/", TestHttpHandler) - // backward support - etcdMux.Handle("/v1/keys/", errorHandler(e.MultiplexerV1)) - etcdMux.Handle("/v1/leader", errorHandler(e.LeaderHttpHandler)) - etcdMux.Handle("/v1/machines", errorHandler(e.MachinesHttpHandler)) - etcdMux.Handle("/v1/stats/", errorHandler(e.StatsHttpHandler)) + // backward support + etcdMux.Handle("/v1/keys/", errorHandler(e.MultiplexerV1)) + etcdMux.Handle("/v1/leader", errorHandler(e.LeaderHttpHandler)) + etcdMux.Handle("/v1/machines", errorHandler(e.MachinesHttpHandler)) + etcdMux.Handle("/v1/stats/", errorHandler(e.StatsHttpHandler)) - return etcdMux + return etcdMux } type errorHandler func(http.ResponseWriter, *http.Request) error @@ -41,50 +41,50 @@ type errorHandler func(http.ResponseWriter, *http.Request) error // provided allowed origins and sets the Access-Control-Allow-Origin header if // there is a match. func addCorsHeader(w http.ResponseWriter, r *http.Request) { - val, ok := corsList["*"] - if val && ok { - w.Header().Add("Access-Control-Allow-Origin", "*") - return - } + val, ok := corsList["*"] + if val && ok { + w.Header().Add("Access-Control-Allow-Origin", "*") + return + } - requestOrigin := r.Header.Get("Origin") - val, ok = corsList[requestOrigin] - if val && ok { - w.Header().Add("Access-Control-Allow-Origin", requestOrigin) - return - } + requestOrigin := r.Header.Get("Origin") + val, ok = corsList[requestOrigin] + if val && ok { + w.Header().Add("Access-Control-Allow-Origin", requestOrigin) + return + } } func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - addCorsHeader(w, r) - if e := fn(w, r); e != nil { - if etcdErr, ok := e.(*etcdErr.Error); ok { - debug("Return error: ", (*etcdErr).Error()) - etcdErr.Write(w) - } else { - http.Error(w, e.Error(), http.StatusInternalServerError) - } - } + addCorsHeader(w, r) + if e := fn(w, r); e != nil { + if etcdErr, ok := e.(*etcdErr.Error); ok { + debug("Return error: ", (*etcdErr).Error()) + etcdErr.Write(w) + } else { + http.Error(w, e.Error(), http.StatusInternalServerError) + } + } } // Multiplex GET/POST/DELETE request to corresponding handlers func (e *etcdServer) Multiplexer(w http.ResponseWriter, req *http.Request) error { - switch req.Method { - case "GET": - return e.GetHttpHandler(w, req) - case "POST": - return e.CreateHttpHandler(w, req) - case "PUT": - return e.UpdateHttpHandler(w, req) - case "DELETE": - return e.DeleteHttpHandler(w, req) - default: - w.WriteHeader(http.StatusMethodNotAllowed) - return nil - } + switch req.Method { + case "GET": + return e.GetHttpHandler(w, req) + case "POST": + return e.CreateHttpHandler(w, req) + case "PUT": + return e.UpdateHttpHandler(w, req) + case "DELETE": + return e.DeleteHttpHandler(w, req) + default: + w.WriteHeader(http.StatusMethodNotAllowed) + return nil + } - return nil + return nil } //-------------------------------------- @@ -93,111 +93,111 @@ func (e *etcdServer) Multiplexer(w http.ResponseWriter, req *http.Request) error //-------------------------------------- func (e *etcdServer) CreateHttpHandler(w http.ResponseWriter, req *http.Request) error { - key := getNodePath(req.URL.Path) + key := getNodePath(req.URL.Path) - debugf("recv.post[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL) + debugf("recv.post[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL) - value := req.FormValue("value") + value := req.FormValue("value") - expireTime, err := durationToExpireTime(req.FormValue("ttl")) + expireTime, err := durationToExpireTime(req.FormValue("ttl")) - if err != nil { - return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", store.UndefIndex, store.UndefTerm) - } + if err != nil { + return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", store.UndefIndex, store.UndefTerm) + } - command := &CreateCommand{ - Key: key, - Value: value, - ExpireTime: expireTime, - } + command := &CreateCommand{ + Key: key, + Value: value, + ExpireTime: expireTime, + } - if req.FormValue("incremental") == "true" { - command.IncrementalSuffix = true - } + if req.FormValue("incremental") == "true" { + command.IncrementalSuffix = true + } - return e.dispatchEtcdCommand(command, w, req) + return e.dispatchEtcdCommand(command, w, req) } func (e *etcdServer) UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error { - key := getNodePath(req.URL.Path) + key := getNodePath(req.URL.Path) - debugf("recv.put[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL) + debugf("recv.put[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL) - req.ParseForm() + req.ParseForm() - value := req.Form.Get("value") + value := req.Form.Get("value") - expireTime, err := durationToExpireTime(req.Form.Get("ttl")) + expireTime, err := durationToExpireTime(req.Form.Get("ttl")) - if err != nil { - return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", store.UndefIndex, store.UndefTerm) - } + if err != nil { + return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", store.UndefIndex, store.UndefTerm) + } - // update should give at least one option - if value == "" && expireTime.Sub(store.Permanent) == 0 { - return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", store.UndefIndex, store.UndefTerm) - } + // update should give at least one option + if value == "" && expireTime.Sub(store.Permanent) == 0 { + return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", store.UndefIndex, store.UndefTerm) + } - prevValue, valueOk := req.Form["prevValue"] + prevValue, valueOk := req.Form["prevValue"] - prevIndexStr, indexOk := req.Form["prevIndex"] + prevIndexStr, indexOk := req.Form["prevIndex"] - if !valueOk && !indexOk { // update without test - command := &UpdateCommand{ - Key: key, - Value: value, - ExpireTime: expireTime, - } + if !valueOk && !indexOk { // update without test + command := &UpdateCommand{ + Key: key, + Value: value, + ExpireTime: expireTime, + } - return e.dispatchEtcdCommand(command, w, req) + return e.dispatchEtcdCommand(command, w, req) - } else { // update with test - var prevIndex uint64 + } else { // update with test + var prevIndex uint64 - if indexOk { - prevIndex, err = strconv.ParseUint(prevIndexStr[0], 10, 64) + if indexOk { + prevIndex, err = strconv.ParseUint(prevIndexStr[0], 10, 64) - // bad previous index - if err != nil { - return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Update", store.UndefIndex, store.UndefTerm) - } - } else { - prevIndex = 0 - } + // bad previous index + if err != nil { + return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Update", store.UndefIndex, store.UndefTerm) + } + } else { + prevIndex = 0 + } - command := &TestAndSetCommand{ - Key: key, - Value: value, - PrevValue: prevValue[0], - PrevIndex: prevIndex, - } + command := &TestAndSetCommand{ + Key: key, + Value: value, + PrevValue: prevValue[0], + PrevIndex: prevIndex, + } - return e.dispatchEtcdCommand(command, w, req) - } + return e.dispatchEtcdCommand(command, w, req) + } } // Delete Handler func (e *etcdServer) DeleteHttpHandler(w http.ResponseWriter, req *http.Request) error { - key := getNodePath(req.URL.Path) + key := getNodePath(req.URL.Path) - debugf("recv.delete[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL) + debugf("recv.delete[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL) - command := &DeleteCommand{ - Key: key, - } + command := &DeleteCommand{ + Key: key, + } - if req.FormValue("recursive") == "true" { - command.Recursive = true - } + if req.FormValue("recursive") == "true" { + command.Recursive = true + } - return e.dispatchEtcdCommand(command, w, req) + return e.dispatchEtcdCommand(command, w, req) } // Dispatch the command to leader func (e *etcdServer) dispatchEtcdCommand(c Command, w http.ResponseWriter, req *http.Request) error { - return e.raftServer.dispatch(c, w, req, nameToEtcdURL) + return e.raftServer.dispatch(c, w, req, nameToEtcdURL) } //-------------------------------------- @@ -208,157 +208,157 @@ func (e *etcdServer) dispatchEtcdCommand(c Command, w http.ResponseWriter, req * // Handler to return the current leader's raft address func (e *etcdServer) LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error { - r := e.raftServer + r := e.raftServer - leader := r.Leader() + leader := r.Leader() - if leader != "" { - w.WriteHeader(http.StatusOK) - raftURL, _ := nameToRaftURL(leader) - w.Write([]byte(raftURL)) + if leader != "" { + w.WriteHeader(http.StatusOK) + raftURL, _ := nameToRaftURL(leader) + w.Write([]byte(raftURL)) - return nil - } else { - return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", store.UndefIndex, store.UndefTerm) - } + return nil + } else { + return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", store.UndefIndex, store.UndefTerm) + } } // Handler to return all the known machines in the current cluster func (e *etcdServer) MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error { - machines := e.raftServer.getMachines(nameToEtcdURL) + machines := e.raftServer.getMachines(nameToEtcdURL) - w.WriteHeader(http.StatusOK) - w.Write([]byte(strings.Join(machines, ", "))) + w.WriteHeader(http.StatusOK) + w.Write([]byte(strings.Join(machines, ", "))) - return nil + return nil } // Handler to return the current version of etcd func (e *etcdServer) VersionHttpHandler(w http.ResponseWriter, req *http.Request) error { - w.WriteHeader(http.StatusOK) - fmt.Fprintf(w, "etcd %s", releaseVersion) + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, "etcd %s", releaseVersion) - return nil + return nil } // Handler to return the basic stats of etcd func (e *etcdServer) StatsHttpHandler(w http.ResponseWriter, req *http.Request) error { - option := req.URL.Path[len("/v1/stats/"):] - w.WriteHeader(http.StatusOK) + option := req.URL.Path[len("/v1/stats/"):] + w.WriteHeader(http.StatusOK) - r := e.raftServer + r := e.raftServer - switch option { - case "self": - w.Write(r.Stats()) - case "leader": - if r.State() == raft.Leader { - w.Write(r.PeerStats()) - } else { - leader := r.Leader() - // current no leader - if leader == "" { - return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm) - } - hostname, _ := nameToEtcdURL(leader) - redirect(hostname, w, req) - } - case "store": - w.Write(etcdStore.JsonStats()) - } + switch option { + case "self": + w.Write(r.Stats()) + case "leader": + if r.State() == raft.Leader { + w.Write(r.PeerStats()) + } else { + leader := r.Leader() + // current no leader + if leader == "" { + return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm) + } + hostname, _ := nameToEtcdURL(leader) + redirect(hostname, w, req) + } + case "store": + w.Write(etcdStore.JsonStats()) + } - return nil + return nil } func (e *etcdServer) GetHttpHandler(w http.ResponseWriter, req *http.Request) error { - var err error - var event interface{} + var err error + var event interface{} - r := e.raftServer + r := e.raftServer - debugf("recv.get[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL) + debugf("recv.get[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL) - if req.FormValue("consistent") == "true" && r.State() != raft.Leader { - // help client to redirect the request to the current leader - leader := r.Leader() - hostname, _ := nameToEtcdURL(leader) - redirect(hostname, w, req) - return nil - } + if req.FormValue("consistent") == "true" && r.State() != raft.Leader { + // help client to redirect the request to the current leader + leader := r.Leader() + hostname, _ := nameToEtcdURL(leader) + redirect(hostname, w, req) + return nil + } - key := getNodePath(req.URL.Path) + key := getNodePath(req.URL.Path) - recursive := req.FormValue("recursive") + recursive := req.FormValue("recursive") - if req.FormValue("wait") == "true" { // watch - command := &WatchCommand{ - Key: key, - } + if req.FormValue("wait") == "true" { // watch + command := &WatchCommand{ + Key: key, + } - if recursive == "true" { - command.Recursive = true - } + if recursive == "true" { + command.Recursive = true + } - indexStr := req.FormValue("wait_index") - if indexStr != "" { - sinceIndex, err := strconv.ParseUint(indexStr, 10, 64) + indexStr := req.FormValue("wait_index") + if indexStr != "" { + sinceIndex, err := strconv.ParseUint(indexStr, 10, 64) - if err != nil { - return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", store.UndefIndex, store.UndefTerm) - } + if err != nil { + return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", store.UndefIndex, store.UndefTerm) + } - command.SinceIndex = sinceIndex - } + command.SinceIndex = sinceIndex + } - event, err = command.Apply(r.Server) + event, err = command.Apply(r.Server) - } else { //get + } else { //get - command := &GetCommand{ - Key: key, - } + command := &GetCommand{ + Key: key, + } - sorted := req.FormValue("sorted") - if sorted == "true" { - command.Sorted = true - } + sorted := req.FormValue("sorted") + if sorted == "true" { + command.Sorted = true + } - if recursive == "true" { - command.Recursive = true - } + if recursive == "true" { + command.Recursive = true + } - event, err = command.Apply(r.Server) - } + event, err = command.Apply(r.Server) + } - if err != nil { - return err + if err != nil { + return err - } else { - event, _ := event.(*store.Event) - bytes, _ := json.Marshal(event) + } else { + event, _ := event.(*store.Event) + bytes, _ := json.Marshal(event) - w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index)) - w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term)) - w.WriteHeader(http.StatusOK) + w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index)) + w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term)) + w.WriteHeader(http.StatusOK) - w.Write(bytes) + w.Write(bytes) - return nil - } + return nil + } } // TestHandler func TestHttpHandler(w http.ResponseWriter, req *http.Request) { - testType := req.URL.Path[len("/test/"):] + testType := req.URL.Path[len("/test/"):] - if testType == "speed" { - directSet() - w.WriteHeader(http.StatusOK) - w.Write([]byte("speed test success")) + if testType == "speed" { + directSet() + w.WriteHeader(http.StatusOK) + w.Write([]byte("speed test success")) - return - } + return + } - w.WriteHeader(http.StatusBadRequest) + w.WriteHeader(http.StatusBadRequest) } diff --git a/store/store.go b/store/store.go index 348aea02e..ddf076280 100644 --- a/store/store.go +++ b/store/store.go @@ -401,6 +401,16 @@ func (s *Store) checkDir(parent *Node, dirName string) (*Node, *etcdErr.Error) { return n, nil } +// Returns the number of machines in the cluster. +func (s *Store) MachineCount() int { + e, err := s.Get("/_etcd/machines", false, false, 0, 0) + if err != nil { + return 0 + } + + return len(e.KVPairs) +} + // Save function saves the static state of the store system. // Save function will not be able to save the state of watchers. // Save function will not save the parent field of the node. Or there will diff --git a/util.go b/util.go index 318f728d1..fade01ec6 100644 --- a/util.go +++ b/util.go @@ -15,7 +15,6 @@ import ( etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/store" - "github.com/coreos/go-log/log" "github.com/coreos/go-raft" ) @@ -172,45 +171,6 @@ func getNodePath(urlPath string) string { return urlPath[pathPrefixLen:] } -//-------------------------------------- -// Log -//-------------------------------------- - -var logger *log.Logger = log.New("etcd", false, - log.CombinedSink(os.Stdout, "[%s] %s %-9s | %s\n", []string{"prefix", "time", "priority", "message"})) - -func infof(format string, v ...interface{}) { - logger.Infof(format, v...) -} - -func debugf(format string, v ...interface{}) { - if verbose { - logger.Debugf(format, v...) - } -} - -func debug(v ...interface{}) { - if verbose { - logger.Debug(v...) - } -} - -func warnf(format string, v ...interface{}) { - logger.Warningf(format, v...) -} - -func warn(v ...interface{}) { - logger.Warning(v...) -} - -func fatalf(format string, v ...interface{}) { - logger.Fatalf(format, v...) -} - -func fatal(v ...interface{}) { - logger.Fatalln(v...) -} - //-------------------------------------- // CPU profile //--------------------------------------