diff --git a/README.md b/README.md index cf883b0d1..7dbd55ae7 100644 --- a/README.md +++ b/README.md @@ -187,17 +187,17 @@ The watch command returns immediately with the same response as previous. Etcd can be used as a centralized coordination service in a cluster and `TestAndSet` is the most basic operation to build distributed lock service. This command will set the value only if the client provided `prevValue` is equal the current key value. -Here is a simple example. Let's create a key-value pair first: `testAndSet=one`. +Here is a simple example. Let's create a key-value pair first: `foo=one`. ```sh -curl -L http://127.0.0.1:4001/v1/keys/testAndSet -d value=one +curl -L http://127.0.0.1:4001/v1/keys/foo -d value=one ``` -Let's try an invaild `TestAndSet` command. +Let's try an invalid `TestAndSet` command. We can give another parameter prevValue to set command to make it a TestAndSet command. ```sh -curl -L http://127.0.0.1:4001/v1/keys/testAndSet -d prevValue=two -d value=three +curl -L http://127.0.0.1:4001/v1/keys/foo -d prevValue=two -d value=three ``` This will try to test if the previous of the key is two, it is change it to three. @@ -208,16 +208,16 @@ This will try to test if the previous of the key is two, it is change it to thre which means `testAndSet` failed. -Let us try a vaild one. +Let us try a valid one. ```sh -curl -L http://127.0.0.1:4001/v1/keys/testAndSet -d prevValue=one -d value=two +curl -L http://127.0.0.1:4001/v1/keys/foo -d prevValue=one -d value=two ``` The response should be ```json -{"action":"SET","key":"/testAndSet","prevValue":"one","value":"two","index":10} +{"action":"SET","key":"/foo","prevValue":"one","value":"two","index":10} ``` We successfully changed the value from “one” to “two”, since we give the correct previous value. 
@@ -465,6 +465,16 @@ If you are using SSL for server to server communication, you must use it on all - [go-etcd](https://github.com/coreos/go-etcd) +**Java libraries** + +- [justinsb/jetcd](https://github.com/justinsb/jetcd) +- [diwakergupta/jetcd](https://github.com/diwakergupta/jetcd) + + +**Python libraries** + +- [transitorykris/etcd-py](https://github.com/transitorykris/etcd-py) + **Node libraries** - [stianeikeland/node-etcd](https://github.com/stianeikeland/node-etcd) @@ -487,6 +497,19 @@ If you are using SSL for server to server communication, you must use it on all - [mattn/etcd-vim](https://github.com/mattn/etcd-vim) - SET and GET keys from inside vim - [mattn/etcdenv](https://github.com/mattn/etcdenv) - "env" shebang with etcd integration +## FAQ + +### What size cluster should I use? + +Every command the client sends to the master is broadcast to all of the followers. +But the command is not committed until a majority of the cluster machines have received it. + +Because of this majority voting property, the ideal cluster should be kept small to keep it fast and should be made up of an odd number of machines. + +Odd numbers are good because if you have 8 machines the majority will be 5 and if you have 9 machines the majority will still be 5. +The result is that an 8 machine cluster can tolerate 3 machine failures and a 9 machine cluster can tolerate 4 machine failures. +And in the best case, when all 9 machines are responding, the cluster will perform at the speed of the fastest 5 machines. 
+ ## Project Details ### Versioning diff --git a/command.go b/command.go index 0a7cc1c97..0e4a58bc1 100644 --- a/command.go +++ b/command.go @@ -239,6 +239,10 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) { value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion) etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex()) + if c.Name != r.Name() { + r.peersStats[c.Name] = &raftPeerStats{MinLatency: 1 << 63} + } + return b, err } @@ -263,6 +267,7 @@ func (c *RemoveCommand) Apply(raftServer *raft.Server) (interface{}, error) { key := path.Join("_etcd/machines", c.Name) _, err := etcdStore.Delete(key, raftServer.CommitIndex()) + delete(r.peersStats, c.Name) if err != nil { return []byte{0}, err diff --git a/etcd.go b/etcd.go index bffb4e02e..bd2144183 100644 --- a/etcd.go +++ b/etcd.go @@ -3,7 +3,9 @@ package main import ( "crypto/tls" "flag" + "fmt" "io/ioutil" + "net/url" "os" "strings" "time" @@ -42,6 +44,9 @@ var ( maxClusterSize int cpuprofile string + + cors string + corsList map[string]bool ) func init() { @@ -79,6 +84,8 @@ func init() { flag.IntVar(&maxClusterSize, "maxsize", 9, "the max size of the cluster") flag.StringVar(&cpuprofile, "cpuprofile", "", "write cpu profile to file") + + flag.StringVar(&cors, "cors", "", "whitelist origins for cross-origin resource sharing (e.g. '*' or 'http://localhost:8001,etc')") } const ( @@ -155,6 +162,8 @@ func main() { raft.SetLogLevel(raft.Debug) } + parseCorsFlag() + if machines != "" { cluster = strings.Split(machines, ",") } else if machinesFile != "" { @@ -211,3 +220,21 @@ func main() { e.ListenAndServe() } + +// parseCorsFlag gathers up the cors whitelist and puts it into the corsList. 
+func parseCorsFlag() { + if cors != "" { + corsList = make(map[string]bool) + list := strings.Split(cors, ",") + for _, v := range list { + fmt.Println(v) + if v != "*" { + _, err := url.Parse(v) + if err != nil { + panic(fmt.Sprintf("bad cors url: %s", err)) + } + } + corsList[v] = true + } + } +} diff --git a/etcd_handlers.go b/etcd_handlers.go index e1c6a8b9e..2faebc641 100644 --- a/etcd_handlers.go +++ b/etcd_handlers.go @@ -28,7 +28,26 @@ func NewEtcdMuxer() *http.ServeMux { type errorHandler func(http.ResponseWriter, *http.Request) error +// addCorsHeader parses the request Origin header and loops through the user +// provided allowed origins and sets the Access-Control-Allow-Origin header if +// there is a match. +func addCorsHeader(w http.ResponseWriter, r *http.Request) { + val, ok := corsList["*"] + if val && ok { + w.Header().Add("Access-Control-Allow-Origin", "*") + return + } + + requestOrigin := r.Header.Get("Origin") + val, ok = corsList[requestOrigin] + if val && ok { + w.Header().Add("Access-Control-Allow-Origin", requestOrigin) + return + } +} + func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + addCorsHeader(w, r) if e := fn(w, r); e != nil { if etcdErr, ok := e.(etcdErr.Error); ok { debug("Return error: ", etcdErr.Error()) @@ -68,7 +87,9 @@ func CreateHttpHandler(w http.ResponseWriter, req *http.Request) error { debugf("recv.post[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL) - value := req.FormValue("value") + req.ParseForm() + + value := req.Form.Get("value") ttl := req.FormValue("ttl") @@ -248,6 +269,7 @@ func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error { func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error { w.WriteHeader(http.StatusOK) w.Write(etcdStore.Stats()) + w.Write(r.Stats()) return nil } diff --git a/etcd_server.go b/etcd_server.go index 2cef01558..0139b03c2 100644 --- a/etcd_server.go +++ b/etcd_server.go @@ -31,7 +31,7 @@ func newEtcdServer(name string, urlStr 
string, listenHost string, tlsConf *TLSCo // Start to listen and response etcd client command func (e *etcdServer) ListenAndServe() { - infof("etcd server [%s:%s]", e.name, e.url) + infof("etcd server [name %s, listen on %s, advertised url %s]", e.name, e.Server.Addr, e.url) if e.tlsConf.Scheme == "http" { fatal(e.Server.ListenAndServe()) diff --git a/etcd_test.go b/etcd_test.go index caa6af84d..32c320ad6 100644 --- a/etcd_test.go +++ b/etcd_test.go @@ -55,6 +55,32 @@ func TestSingleNode(t *testing.T) { } t.Fatalf("Set 2 failed with %s %s %v", result.Key, result.Value, result.TTL) } + + // Add a test-and-set test + + // First, we'll test we can change the value if we get it write + result, match, err := c.TestAndSet("foo", "bar", "foobar", 100) + + if err != nil || result.Key != "/foo" || result.Value != "foobar" || result.PrevValue != "bar" || result.TTL != 99 || !match { + if err != nil { + t.Fatal(err) + } + t.Fatalf("Set 3 failed with %s %s %v", result.Key, result.Value, result.TTL) + } + + // Next, we'll make sure we can't set it without the correct prior value + _, _, err = c.TestAndSet("foo", "bar", "foofoo", 100) + + if err == nil { + t.Fatalf("Set 4 expecting error when setting key with incorrect previous value") + } + + // Finally, we'll make sure a blank previous value still counts as a test-and-set and still has to match + _, _, err = c.TestAndSet("foo", "", "barbar", 100) + + if err == nil { + t.Fatalf("Set 5 expecting error when setting key with blank (incorrect) previous value") + } } // TestInternalVersionFail will ensure that etcd does not come up if the internal raft diff --git a/raft_handlers.go b/raft_handlers.go index 1a92560ec..4fff0780d 100644 --- a/raft_handlers.go +++ b/raft_handlers.go @@ -42,6 +42,9 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) { if err == nil { debugf("[recv] POST %s/log/append [%d]", r.url, len(aereq.Entries)) + + r.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength)) + if 
resp := r.AppendEntries(aereq); resp != nil { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) diff --git a/raft_server.go b/raft_server.go index 1175f4665..628136479 100644 --- a/raft_server.go +++ b/raft_server.go @@ -6,23 +6,26 @@ import ( "encoding/binary" "encoding/json" "fmt" - etcdErr "github.com/coreos/etcd/error" - "github.com/coreos/go-raft" "io/ioutil" "net/http" "net/url" "time" + + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/go-raft" ) type raftServer struct { *raft.Server - version string - joinIndex uint64 - name string - url string - listenHost string - tlsConf *TLSConfig - tlsInfo *TLSInfo + version string + joinIndex uint64 + name string + url string + listenHost string + tlsConf *TLSConfig + tlsInfo *TLSInfo + peersStats map[string]*raftPeerStats + serverStats *raftServerStats } var r *raftServer @@ -45,6 +48,16 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi listenHost: listenHost, tlsConf: tlsConf, tlsInfo: tlsInfo, + peersStats: make(map[string]*raftPeerStats), + serverStats: &raftServerStats{ + StartTime: time.Now(), + sendRateQueue: &statsQueue{ + back: -1, + }, + recvRateQueue: &statsQueue{ + back: -1, + }, + }, } } @@ -93,7 +106,7 @@ func (r *raftServer) ListenAndServe() { } ok := joinCluster(cluster) if !ok { - warn("the whole cluster dies! restart the cluster") + warn("the entire cluster is down! 
this machine will restart the cluster.") } debugf("%s restart as a follower", r.name) @@ -136,7 +149,7 @@ func startAsFollower() { // Start to listen and response raft command func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) { - infof("raft server [%s:%s]", r.name, r.listenHost) + infof("raft server [name %s, listen on %s, advertised url %s]", r.name, r.listenHost, r.url) raftMux := http.NewServeMux() @@ -268,6 +281,33 @@ func joinByMachine(s *raft.Server, machine string, scheme string) error { return fmt.Errorf("Unable to join: %v", err) } +func (r *raftServer) Stats() []byte { + r.serverStats.LeaderUptime = time.Now().Sub(r.serverStats.leaderStartTime).String() + + queue := r.serverStats.sendRateQueue + + r.serverStats.SendingPkgRate, r.serverStats.SendingBandwidthRate = queue.Rate() + + queue = r.serverStats.recvRateQueue + + r.serverStats.RecvingPkgRate, r.serverStats.RecvingBandwidthRate = queue.Rate() + + sBytes, err := json.Marshal(r.serverStats) + + if err != nil { + warn(err) + } + + if r.State() == raft.Leader { + pBytes, _ := json.Marshal(r.peersStats) + + b := append(sBytes, pBytes...) + return b + } + + return sBytes +} + // Register commands to raft server func registerCommands() { raft.RegisterCommand(&JoinCommand{}) diff --git a/raft_stats.go b/raft_stats.go new file mode 100644 index 000000000..175a1be55 --- /dev/null +++ b/raft_stats.go @@ -0,0 +1,195 @@ +package main + +import ( + "math" + "sync" + "time" + + "github.com/coreos/go-raft" +) + +const ( + queueCapacity = 200 +) + +// packageStats represent the stats we need for a package. +// It has sending time and the size of the package. +type packageStats struct { + sendingTime time.Time + size int +} + +// NewPackageStats creates a pacakgeStats and return the pointer to it. +func NewPackageStats(now time.Time, size int) *packageStats { + return &packageStats{ + sendingTime: now, + size: size, + } +} + +// Time return the sending time of the package. 
+func (ps *packageStats) Time() time.Time { + return ps.sendingTime +} + +type raftServerStats struct { + State string `json:"state"` + StartTime time.Time `json:"startTime"` + Leader string `json:"leader"` + LeaderUptime string `json:"leaderUptime"` + + RecvAppendRequestCnt uint64 `json:"recvAppendRequestCnt,"` + RecvingPkgRate float64 `json:"recvPkgRate,omitempty"` + RecvingBandwidthRate float64 `json:"recvBandwidthRate,omitempty"` + + SendAppendRequestCnt uint64 `json:"sendAppendRequestCnt"` + SendingPkgRate float64 `json:"sendPkgRate,omitempty"` + SendingBandwidthRate float64 `json:"sendBandwidthRate,omitempty"` + + leaderStartTime time.Time + sendRateQueue *statsQueue + recvRateQueue *statsQueue +} + +func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) { + ss.State = raft.Follower + if leaderName != ss.Leader { + ss.Leader = leaderName + ss.leaderStartTime = time.Now() + } + + ss.recvRateQueue.Insert(NewPackageStats(time.Now(), pkgSize)) + ss.RecvAppendRequestCnt++ +} + +func (ss *raftServerStats) SendAppendReq(pkgSize int) { + now := time.Now() + if ss.State != raft.Leader { + ss.State = raft.Leader + ss.Leader = r.Name() + ss.leaderStartTime = now + } + + ss.sendRateQueue.Insert(NewPackageStats(time.Now(), pkgSize)) + + ss.SendAppendRequestCnt++ +} + +type raftPeerStats struct { + Latency float64 `json:"latency"` + AvgLatency float64 `json:"averageLatency"` + avgLatencySquare float64 + SdvLatency float64 `json:"sdvLatency"` + MinLatency float64 `json:"minLatency"` + MaxLatency float64 `json:"maxLatency"` + FailCnt uint64 `json:"failsCount"` + SuccCnt uint64 `json:"successCount"` +} + +// Succ function update the raftPeerStats with a successful send +func (ps *raftPeerStats) Succ(d time.Duration) { + total := float64(ps.SuccCnt) * ps.AvgLatency + totalSquare := float64(ps.SuccCnt) * ps.avgLatencySquare + + ps.SuccCnt++ + + ps.Latency = float64(d) / (1000000.0) + + if ps.Latency > ps.MaxLatency { + ps.MaxLatency = ps.Latency + } + + if 
ps.Latency < ps.MinLatency { + ps.MinLatency = ps.Latency + } + + ps.AvgLatency = (total + ps.Latency) / float64(ps.SuccCnt) + ps.avgLatencySquare = (totalSquare + ps.Latency*ps.Latency) / float64(ps.SuccCnt) + + // sdv = sqrt(avg(x^2) - avg(x)^2) + ps.SdvLatency = math.Sqrt(ps.avgLatencySquare - ps.AvgLatency*ps.AvgLatency) +} + +// Fail function update the raftPeerStats with a unsuccessful send +func (ps *raftPeerStats) Fail() { + ps.FailCnt++ +} + +type statsQueue struct { + items [queueCapacity]*packageStats + size int + front int + back int + totalPkgSize int + rwl sync.RWMutex +} + +func (q *statsQueue) Len() int { + return q.size +} + +func (q *statsQueue) PkgSize() int { + return q.totalPkgSize +} + +// FrontAndBack gets the front and back elements in the queue +// We must grab front and back together with the protection of the lock +func (q *statsQueue) frontAndBack() (*packageStats, *packageStats) { + q.rwl.RLock() + defer q.rwl.RUnlock() + if q.size != 0 { + return q.items[q.front], q.items[q.back] + } + return nil, nil +} + +// Insert function insert a packageStats into the queue and update the records +func (q *statsQueue) Insert(p *packageStats) { + q.rwl.Lock() + defer q.rwl.Unlock() + + q.back = (q.back + 1) % queueCapacity + + if q.size == queueCapacity { //dequeue + q.totalPkgSize -= q.items[q.front].size + q.front = (q.back + 1) % queueCapacity + } else { + q.size++ + } + + q.items[q.back] = p + q.totalPkgSize += q.items[q.back].size + +} + +// Rate function returns the package rate and byte rate +func (q *statsQueue) Rate() (float64, float64) { + front, back := q.frontAndBack() + + if front == nil || back == nil { + return 0, 0 + } + + if time.Now().Sub(back.Time()) > time.Second { + q.Clear() + return 0, 0 + } + + sampleDuration := back.Time().Sub(front.Time()) + + pr := float64(q.Len()) / float64(sampleDuration) * float64(time.Second) + + br := float64(q.PkgSize()) / float64(sampleDuration) * float64(time.Second) + + return pr, br +} + +// 
Clear function clear up the statsQueue +func (q *statsQueue) Clear() { + q.rwl.Lock() + defer q.rwl.Unlock() + q.back = -1 + q.front = 0 + q.size = 0 + q.totalPkgSize = 0 +} diff --git a/store/store.go b/store/store.go index e4eb79db0..916c1394e 100644 --- a/store/store.go +++ b/store/store.go @@ -3,11 +3,12 @@ package store import ( "encoding/json" "fmt" - etcdErr "github.com/coreos/etcd/error" "path" "strconv" "sync" "time" + + etcdErr "github.com/coreos/etcd/error" ) //------------------------------------------------------------------------------ diff --git a/third_party/code.google.com/p/goprotobuf/protoc-gen-go/generator/generator.go b/third_party/code.google.com/p/goprotobuf/protoc-gen-go/generator/generator.go index 5097ea57e..413f3614e 100644 --- a/third_party/code.google.com/p/goprotobuf/protoc-gen-go/generator/generator.go +++ b/third_party/code.google.com/p/goprotobuf/protoc-gen-go/generator/generator.go @@ -1781,7 +1781,7 @@ func isASCIIDigit(c byte) bool { // but it's so remote we're prepared to pretend it's nonexistent - since the // C++ generator lowercases names, it's extremely unlikely to have two fields // with different capitalizations. -// In short, _my_field_name_2 becomes XMyFieldName2. +// In short, _my_field_name_2 becomes XMyFieldName_2. func CamelCase(s string) string { if s == "" { return "" diff --git a/third_party/github.com/coreos/go-raft/server.go b/third_party/github.com/coreos/go-raft/server.go index c3d3fbd46..b4dab92ae 100644 --- a/third_party/github.com/coreos/go-raft/server.go +++ b/third_party/github.com/coreos/go-raft/server.go @@ -927,26 +927,25 @@ func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot // Adds a peer to the server. func (s *Server) AddPeer(name string, connectiongString string) error { s.debugln("server.peer.add: ", name, len(s.peers)) - defer s.writeConf() + // Do not allow peers to be added twice. 
if s.peers[name] != nil { return nil } // Skip the Peer if it has the same name as the Server - if s.name == name { - return nil + if s.name != name { + peer := newPeer(s, name, connectiongString, s.heartbeatTimeout) + + if s.State() == Leader { + peer.startHeartbeat() + } + + s.peers[peer.Name] = peer } - peer := newPeer(s, name, connectiongString, s.heartbeatTimeout) - - if s.State() == Leader { - peer.startHeartbeat() - } - - s.peers[peer.Name] = peer - - s.debugln("server.peer.conf.write: ", name) + // Write the configuration to file. + s.writeConf() return nil } @@ -955,26 +954,24 @@ func (s *Server) AddPeer(name string, connectiongString string) error { func (s *Server) RemovePeer(name string) error { s.debugln("server.peer.remove: ", name, len(s.peers)) - defer s.writeConf() + // Skip the Peer if it has the same name as the Server + if name != s.Name() { + // Return error if peer doesn't exist. + peer := s.peers[name] + if peer == nil { + return fmt.Errorf("raft: Peer not found: %s", name) + } - if name == s.Name() { - // when the removed node restart, it should be able - // to know it has been removed before. So we need - // to update knownCommitIndex - return nil - } - // Return error if peer doesn't exist. - peer := s.peers[name] - if peer == nil { - return fmt.Errorf("raft: Peer not found: %s", name) + // Stop peer and remove it. + if s.State() == Leader { + peer.stopHeartbeat(true) + } + + delete(s.peers, name) } - // Stop peer and remove it. - if s.State() == Leader { - peer.stopHeartbeat(true) - } - - delete(s.peers, name) + // Write the configuration to file. 
+ s.writeConf() return nil } diff --git a/third_party/github.com/coreos/go-raft/snapshot.go b/third_party/github.com/coreos/go-raft/snapshot.go index fd41c08f0..93b1a97cd 100644 --- a/third_party/github.com/coreos/go-raft/snapshot.go +++ b/third_party/github.com/coreos/go-raft/snapshot.go @@ -6,7 +6,6 @@ import ( "fmt" "hash/crc32" "os" - "syscall" ) //------------------------------------------------------------------------------ @@ -54,7 +53,7 @@ func (ss *Snapshot) save() error { } // force the change writting to disk - syscall.Fsync(int(file.Fd())) + file.Sync() return err } diff --git a/transporter.go b/transporter.go index b4564742c..461741ce6 100644 --- a/transporter.go +++ b/transporter.go @@ -5,11 +5,12 @@ import ( "crypto/tls" "encoding/json" "fmt" - "github.com/coreos/go-raft" "io" "net" "net/http" "time" + + "github.com/coreos/go-raft" ) // Transporter layer for communication between raft nodes @@ -54,17 +55,38 @@ func dialTimeout(network, addr string) (net.Conn, error) { func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse { var aersp *raft.AppendEntriesResponse var b bytes.Buffer + json.NewEncoder(&b).Encode(req) + size := b.Len() + + r.serverStats.SendAppendReq(size) + u, _ := nameToRaftURL(peer.Name) + debugf("Send LogEntries to %s ", u) + thisPeerStats, ok := r.peersStats[peer.Name] + + start := time.Now() + resp, err := t.Post(fmt.Sprintf("%s/log/append", u), &b) + end := time.Now() + if err != nil { debugf("Cannot send AppendEntriesRequest to %s: %s", u, err) + if ok { + thisPeerStats.Fail() + } + } else { + if ok { + thisPeerStats.Succ(end.Sub(start)) + } } + r.peersStats[peer.Name] = thisPeerStats + if resp != nil { defer resp.Body.Close() aersp = &raft.AppendEntriesResponse{}