This commit is contained in:
Xiang Li 2013-09-14 15:22:37 -04:00
commit 2d7c1be164
14 changed files with 414 additions and 54 deletions

View File

@ -187,17 +187,17 @@ The watch command returns immediately with the same response as previous.
Etcd can be used as a centralized coordination service in a cluster, and `TestAndSet` is the most basic operation for building a distributed lock service. This command will set the value only if the client-provided `prevValue` is equal to the current key value.
Here is a simple example. Let's create a key-value pair first: `testAndSet=one`.
Here is a simple example. Let's create a key-value pair first: `foo=one`.
```sh
curl -L http://127.0.0.1:4001/v1/keys/testAndSet -d value=one
curl -L http://127.0.0.1:4001/v1/keys/foo -d value=one
```
Let's try an invaild `TestAndSet` command.
Let's try an invalid `TestAndSet` command.
We can pass an additional `prevValue` parameter to the set command to turn it into a `TestAndSet` command.
```sh
curl -L http://127.0.0.1:4001/v1/keys/testAndSet -d prevValue=two -d value=three
curl -L http://127.0.0.1:4001/v1/keys/foo -d prevValue=two -d value=three
```
This will test whether the previous value of the key is two; if it is, the value will be changed to three.
@ -208,16 +208,16 @@ This will try to test if the previous of the key is two, it is change it to thre
which means `testAndSet` failed.
Let us try a vaild one.
Let us try a valid one.
```sh
curl -L http://127.0.0.1:4001/v1/keys/testAndSet -d prevValue=one -d value=two
curl -L http://127.0.0.1:4001/v1/keys/foo -d prevValue=one -d value=two
```
The response should be
```json
{"action":"SET","key":"/testAndSet","prevValue":"one","value":"two","index":10}
{"action":"SET","key":"/foo","prevValue":"one","value":"two","index":10}
```
We successfully changed the value from “one” to “two”, since we gave the correct previous value.
@ -465,6 +465,16 @@ If you are using SSL for server to server communication, you must use it on all
- [go-etcd](https://github.com/coreos/go-etcd)
**Java libraries**
- [justinsb/jetcd](https://github.com/justinsb/jetcd)
- [diwakergupta/jetcd](https://github.com/diwakergupta/jetcd)
**Python libraries**
- [transitorykris/etcd-py](https://github.com/transitorykris/etcd-py)
**Node libraries**
- [stianeikeland/node-etcd](https://github.com/stianeikeland/node-etcd)
@ -487,6 +497,19 @@ If you are using SSL for server to server communication, you must use it on all
- [mattn/etcd-vim](https://github.com/mattn/etcd-vim) - SET and GET keys from inside vim
- [mattn/etcdenv](https://github.com/mattn/etcdenv) - "env" shebang with etcd integration
## FAQ
### What size cluster should I use?
Every command the client sends to the master is broadcast to all of the followers.
However, the command is not committed until a majority of the cluster machines have received it.
Because of this majority voting property, the ideal cluster should be kept small to keep it fast, and should be made up of an odd number of machines.
Odd numbers are good because if you have 8 machines the majority will be 5, and if you have 9 machines the majority will still be 5.
The result is that an 8 machine cluster can tolerate 3 machine failures and a 9 machine cluster can tolerate 4 machine failures.
And in the best case, when all 9 machines are responding, the cluster will perform at the speed of the fastest 5 machines.
## Project Details
### Versioning

View File

@ -239,6 +239,10 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion)
etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex())
if c.Name != r.Name() {
r.peersStats[c.Name] = &raftPeerStats{MinLatency: 1 << 63}
}
return b, err
}
@ -263,6 +267,7 @@ func (c *RemoveCommand) Apply(raftServer *raft.Server) (interface{}, error) {
key := path.Join("_etcd/machines", c.Name)
_, err := etcdStore.Delete(key, raftServer.CommitIndex())
delete(r.peersStats, c.Name)
if err != nil {
return []byte{0}, err

27
etcd.go
View File

@ -3,7 +3,9 @@ package main
import (
"crypto/tls"
"flag"
"fmt"
"io/ioutil"
"net/url"
"os"
"strings"
"time"
@ -42,6 +44,9 @@ var (
maxClusterSize int
cpuprofile string
cors string
corsList map[string]bool
)
func init() {
@ -79,6 +84,8 @@ func init() {
flag.IntVar(&maxClusterSize, "maxsize", 9, "the max size of the cluster")
flag.StringVar(&cpuprofile, "cpuprofile", "", "write cpu profile to file")
flag.StringVar(&cors, "cors", "", "whitelist origins for cross-origin resource sharing (e.g. '*' or 'http://localhost:8001,etc')")
}
const (
@ -155,6 +162,8 @@ func main() {
raft.SetLogLevel(raft.Debug)
}
parseCorsFlag()
if machines != "" {
cluster = strings.Split(machines, ",")
} else if machinesFile != "" {
@ -211,3 +220,21 @@ func main() {
e.ListenAndServe()
}
// parseCorsFlag splits the comma-separated value of the cors flag and records
// each entry as a key in corsList.
//
// Every entry is trimmed of surrounding whitespace and empty entries are
// skipped, so a value such as "http://a.example, http://b.example" produces
// keys without stray spaces. The wildcard "*" is stored verbatim; every other
// entry must be accepted by url.Parse, otherwise the function panics with a
// "bad cors url" message. When the flag is empty, corsList is left untouched.
func parseCorsFlag() {
	if cors == "" {
		return
	}

	corsList = make(map[string]bool)
	for _, origin := range strings.Split(cors, ",") {
		origin = strings.TrimSpace(origin)
		if origin == "" {
			// Produced by input like "a,,b" or a trailing comma.
			continue
		}
		if origin != "*" {
			if _, err := url.Parse(origin); err != nil {
				panic(fmt.Sprintf("bad cors url: %s", err))
			}
		}
		corsList[origin] = true
	}
}

View File

@ -28,7 +28,26 @@ func NewEtcdMuxer() *http.ServeMux {
type errorHandler func(http.ResponseWriter, *http.Request) error
// addCorsHeader adds the Access-Control-Allow-Origin response header based on
// corsList. A "*" entry in the list allows every origin; otherwise the value
// of the request's Origin header is echoed back only when that exact value is
// an entry in the list. If neither applies, no header is added.
func addCorsHeader(w http.ResponseWriter, r *http.Request) {
	const allowOrigin = "Access-Control-Allow-Origin"

	// Looking up a missing key yields false, so the value alone is sufficient.
	if corsList["*"] {
		w.Header().Add(allowOrigin, "*")
		return
	}

	if origin := r.Header.Get("Origin"); corsList[origin] {
		w.Header().Add(allowOrigin, origin)
	}
}
func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
addCorsHeader(w, r)
if e := fn(w, r); e != nil {
if etcdErr, ok := e.(etcdErr.Error); ok {
debug("Return error: ", etcdErr.Error())
@ -68,7 +87,9 @@ func CreateHttpHandler(w http.ResponseWriter, req *http.Request) error {
debugf("recv.post[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
value := req.FormValue("value")
req.ParseForm()
value := req.Form.Get("value")
ttl := req.FormValue("ttl")
@ -248,6 +269,7 @@ func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error {
func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error {
w.WriteHeader(http.StatusOK)
w.Write(etcdStore.Stats())
w.Write(r.Stats())
return nil
}

View File

@ -31,7 +31,7 @@ func newEtcdServer(name string, urlStr string, listenHost string, tlsConf *TLSCo
// Start listening and responding to etcd client commands
func (e *etcdServer) ListenAndServe() {
infof("etcd server [%s:%s]", e.name, e.url)
infof("etcd server [name %s, listen on %s, advertised url %s]", e.name, e.Server.Addr, e.url)
if e.tlsConf.Scheme == "http" {
fatal(e.Server.ListenAndServe())

View File

@ -55,6 +55,32 @@ func TestSingleNode(t *testing.T) {
}
t.Fatalf("Set 2 failed with %s %s %v", result.Key, result.Value, result.TTL)
}
// Add a test-and-set test
// First, we'll test we can change the value if we get it right
result, match, err := c.TestAndSet("foo", "bar", "foobar", 100)
if err != nil || result.Key != "/foo" || result.Value != "foobar" || result.PrevValue != "bar" || result.TTL != 99 || !match {
if err != nil {
t.Fatal(err)
}
t.Fatalf("Set 3 failed with %s %s %v", result.Key, result.Value, result.TTL)
}
// Next, we'll make sure we can't set it without the correct prior value
_, _, err = c.TestAndSet("foo", "bar", "foofoo", 100)
if err == nil {
t.Fatalf("Set 4 expecting error when setting key with incorrect previous value")
}
// Finally, we'll make sure a blank previous value still counts as a test-and-set and still has to match
_, _, err = c.TestAndSet("foo", "", "barbar", 100)
if err == nil {
t.Fatalf("Set 5 expecting error when setting key with blank (incorrect) previous value")
}
}
// TestInternalVersionFail will ensure that etcd does not come up if the internal raft

View File

@ -42,6 +42,9 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
if err == nil {
debugf("[recv] POST %s/log/append [%d]", r.url, len(aereq.Entries))
r.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength))
if resp := r.AppendEntries(aereq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)

View File

@ -6,23 +6,26 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/go-raft"
"io/ioutil"
"net/http"
"net/url"
"time"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/go-raft"
)
type raftServer struct {
*raft.Server
version string
joinIndex uint64
name string
url string
listenHost string
tlsConf *TLSConfig
tlsInfo *TLSInfo
version string
joinIndex uint64
name string
url string
listenHost string
tlsConf *TLSConfig
tlsInfo *TLSInfo
peersStats map[string]*raftPeerStats
serverStats *raftServerStats
}
var r *raftServer
@ -45,6 +48,16 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi
listenHost: listenHost,
tlsConf: tlsConf,
tlsInfo: tlsInfo,
peersStats: make(map[string]*raftPeerStats),
serverStats: &raftServerStats{
StartTime: time.Now(),
sendRateQueue: &statsQueue{
back: -1,
},
recvRateQueue: &statsQueue{
back: -1,
},
},
}
}
@ -93,7 +106,7 @@ func (r *raftServer) ListenAndServe() {
}
ok := joinCluster(cluster)
if !ok {
warn("the whole cluster dies! restart the cluster")
warn("the entire cluster is down! this machine will restart the cluster.")
}
debugf("%s restart as a follower", r.name)
@ -136,7 +149,7 @@ func startAsFollower() {
// Start listening and responding to raft commands
func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) {
infof("raft server [%s:%s]", r.name, r.listenHost)
infof("raft server [name %s, listen on %s, advertised url %s]", r.name, r.listenHost, r.url)
raftMux := http.NewServeMux()
@ -268,6 +281,33 @@ func joinByMachine(s *raft.Server, machine string, scheme string) error {
return fmt.Errorf("Unable to join: %v", err)
}
// Stats refreshes the derived fields of the server statistics (leader uptime
// and the send/receive rates) and returns them encoded as JSON. When this
// server is the raft leader, the JSON encoding of the per-peer statistics is
// appended directly after the server statistics.
//
// NOTE(review): on the leader the result is therefore two JSON documents
// concatenated back to back rather than a single JSON value — confirm that
// consumers expect this.
func (r *raftServer) Stats() []byte {
	stats := r.serverStats
	stats.LeaderUptime = time.Since(stats.leaderStartTime).String()

	stats.SendingPkgRate, stats.SendingBandwidthRate = stats.sendRateQueue.Rate()
	stats.RecvingPkgRate, stats.RecvingBandwidthRate = stats.recvRateQueue.Rate()

	serverJSON, err := json.Marshal(stats)
	if err != nil {
		// Only logged; whatever Marshal returned is still sent to the caller.
		warn(err)
	}

	if r.State() != raft.Leader {
		return serverJSON
	}

	peersJSON, _ := json.Marshal(r.peersStats)
	return append(serverJSON, peersJSON...)
}
// Register commands to raft server
func registerCommands() {
raft.RegisterCommand(&JoinCommand{})

195
raft_stats.go Normal file
View File

@ -0,0 +1,195 @@
package main
import (
"math"
"sync"
"time"
"github.com/coreos/go-raft"
)
const (
queueCapacity = 200
)
// packageStats records what the rate statistics need to know about a single
// package: the time it was sent and how large it was.
type packageStats struct {
	sendingTime time.Time // time supplied when the record was created
	size        int       // package size as supplied by the caller
}

// NewPackageStats builds a packageStats from the given time and size and
// returns a pointer to it.
func NewPackageStats(now time.Time, size int) *packageStats {
	ps := new(packageStats)
	ps.sendingTime = now
	ps.size = size
	return ps
}

// Time reports the sending time stored in the record.
func (ps *packageStats) Time() time.Time {
	return ps.sendingTime
}
// raftServerStats holds the statistics this raft server reports about itself.
// Exported fields are encoded to JSON; unexported fields are bookkeeping used
// to derive them.
type raftServerStats struct {
	State     string    `json:"state"`
	StartTime time.Time `json:"startTime"`

	Leader       string `json:"leader"`
	LeaderUptime string `json:"leaderUptime"`

	RecvAppendRequestCnt uint64  `json:"recvAppendRequestCnt"`
	RecvingPkgRate       float64 `json:"recvPkgRate,omitempty"`
	RecvingBandwidthRate float64 `json:"recvBandwidthRate,omitempty"`

	SendAppendRequestCnt uint64  `json:"sendAppendRequestCnt"`
	SendingPkgRate       float64 `json:"sendPkgRate,omitempty"`
	SendingBandwidthRate float64 `json:"sendBandwidthRate,omitempty"`

	// leaderStartTime is the time the current Leader value was first observed.
	leaderStartTime time.Time
	// sendRateQueue and recvRateQueue hold the recent package samples.
	sendRateQueue *statsQueue
	recvRateQueue *statsQueue
}

// RecvAppendReq records an append-entries request of pkgSize received from
// leaderName. It marks this server as a follower, restarts the leader clock
// when the leader name changes, queues the sample for the receive rate and
// increments the receive counter.
func (ss *raftServerStats) RecvAppendReq(leaderName string, pkgSize int) {
	// Read the clock once so the leader start time and the sample agree.
	now := time.Now()

	ss.State = raft.Follower
	if leaderName != ss.Leader {
		ss.Leader = leaderName
		ss.leaderStartTime = now
	}

	ss.recvRateQueue.Insert(NewPackageStats(now, pkgSize))
	ss.RecvAppendRequestCnt++
}

// SendAppendReq records an append-entries request of pkgSize sent by this
// server. On the first send after not being leader it marks this server as
// leader and restarts the leader clock; it then queues the sample for the
// send rate and increments the send counter.
func (ss *raftServerStats) SendAppendReq(pkgSize int) {
	// Read the clock once so the leader start time and the sample agree.
	now := time.Now()

	if ss.State != raft.Leader {
		ss.State = raft.Leader
		ss.Leader = r.Name()
		ss.leaderStartTime = now
	}

	ss.sendRateQueue.Insert(NewPackageStats(now, pkgSize))
	ss.SendAppendRequestCnt++
}
// raftPeerStats accumulates latency and success/failure statistics for the
// requests sent to a single peer. Latencies are stored in milliseconds.
type raftPeerStats struct {
	Latency          float64 `json:"latency"`        // latency of the most recent successful send
	AvgLatency       float64 `json:"averageLatency"` // running mean of all successful sends
	avgLatencySquare float64 // running mean of the squared latencies, used for the deviation
	SdvLatency       float64 `json:"sdvLatency"`   // standard deviation of the latencies
	MinLatency       float64 `json:"minLatency"`   // smallest latency seen so far
	MaxLatency       float64 `json:"maxLatency"`   // largest latency seen so far
	FailCnt          uint64  `json:"failsCount"`   // number of failed sends
	SuccCnt          uint64  `json:"successCount"` // number of successful sends
}

// Succ updates the statistics with one successful send that took d.
//
// The first recorded sample always initialises MinLatency and MaxLatency, so
// a zero-value raftPeerStats works as well as one seeded with a large
// MinLatency sentinel.
func (ps *raftPeerStats) Succ(d time.Duration) {
	// Recover the running sums from the stored means before adding the sample.
	total := float64(ps.SuccCnt) * ps.AvgLatency
	totalSquare := float64(ps.SuccCnt) * ps.avgLatencySquare

	ps.SuccCnt++
	first := ps.SuccCnt == 1

	// time.Duration counts nanoseconds; convert to milliseconds.
	ps.Latency = float64(d) / (1000000.0)

	if first || ps.Latency > ps.MaxLatency {
		ps.MaxLatency = ps.Latency
	}
	if first || ps.Latency < ps.MinLatency {
		ps.MinLatency = ps.Latency
	}

	ps.AvgLatency = (total + ps.Latency) / float64(ps.SuccCnt)
	ps.avgLatencySquare = (totalSquare + ps.Latency*ps.Latency) / float64(ps.SuccCnt)

	// sdv = sqrt(avg(x^2) - avg(x)^2)
	// Floating-point rounding can push the difference slightly below zero,
	// which would make Sqrt return NaN (and NaN cannot be encoded as JSON).
	variance := ps.avgLatencySquare - ps.AvgLatency*ps.AvgLatency
	if variance < 0 {
		variance = 0
	}
	ps.SdvLatency = math.Sqrt(variance)
}

// Fail updates the statistics with one unsuccessful send.
func (ps *raftPeerStats) Fail() {
	ps.FailCnt++
}
// statsQueue is a fixed-capacity ring buffer of the most recent package
// samples, together with the running sum of their sizes. Once queueCapacity
// samples are stored, inserting a new one evicts the oldest. All fields are
// guarded by rwl.
//
// A usable empty queue has back set to -1 (see Clear).
type statsQueue struct {
	items        [queueCapacity]*packageStats
	size         int // number of samples currently stored
	front        int // index of the oldest sample
	back         int // index of the newest sample, -1 when empty
	totalPkgSize int // sum of the sizes of the stored samples
	rwl          sync.RWMutex
}

// Len returns the number of samples currently stored.
func (q *statsQueue) Len() int {
	q.rwl.RLock()
	defer q.rwl.RUnlock()
	return q.size
}

// PkgSize returns the sum of the sizes of the stored samples.
func (q *statsQueue) PkgSize() int {
	q.rwl.RLock()
	defer q.rwl.RUnlock()
	return q.totalPkgSize
}

// frontAndBack gets the front and back elements in the queue.
// Both are read under the same lock so they belong to the same queue state.
// It returns nil, nil when the queue is empty.
func (q *statsQueue) frontAndBack() (*packageStats, *packageStats) {
	q.rwl.RLock()
	defer q.rwl.RUnlock()
	if q.size != 0 {
		return q.items[q.front], q.items[q.back]
	}
	return nil, nil
}

// snapshot returns the oldest and newest samples together with the sample
// count and total size, all read under a single lock acquisition. The samples
// are nil when the queue is empty.
func (q *statsQueue) snapshot() (front, back *packageStats, count, total int) {
	q.rwl.RLock()
	defer q.rwl.RUnlock()
	if q.size == 0 {
		return nil, nil, 0, 0
	}
	return q.items[q.front], q.items[q.back], q.size, q.totalPkgSize
}

// Insert appends p as the newest sample and updates the running totals,
// evicting the oldest sample first when the queue is full.
func (q *statsQueue) Insert(p *packageStats) {
	q.rwl.Lock()
	defer q.rwl.Unlock()

	q.back = (q.back + 1) % queueCapacity

	if q.size == queueCapacity { // dequeue
		// When full, the new back slot is the old front slot: drop its size
		// from the total and advance front past it.
		q.totalPkgSize -= q.items[q.front].size
		q.front = (q.back + 1) % queueCapacity
	} else {
		q.size++
	}

	q.items[q.back] = p
	q.totalPkgSize += q.items[q.back].size
}

// Rate returns the package rate and the size rate, both per second, computed
// over the time span between the oldest and newest stored samples.
//
// It returns 0, 0 when the queue is empty, when the newest sample is more
// than one second old (the queue is cleared in that case), or when the stored
// samples span no time at all. The last case avoids a division by zero, whose
// +Inf result cannot be encoded as JSON.
func (q *statsQueue) Rate() (float64, float64) {
	front, back, count, total := q.snapshot()

	if front == nil || back == nil {
		return 0, 0
	}

	if time.Since(back.Time()) > time.Second {
		q.Clear()
		return 0, 0
	}

	sampleDuration := back.Time().Sub(front.Time())
	if sampleDuration <= 0 {
		// A single sample, or samples sharing one timestamp.
		return 0, 0
	}

	pr := float64(count) / float64(sampleDuration) * float64(time.Second)
	br := float64(total) / float64(sampleDuration) * float64(time.Second)

	return pr, br
}

// Clear resets the queue to its empty state.
func (q *statsQueue) Clear() {
	q.rwl.Lock()
	defer q.rwl.Unlock()
	q.back = -1
	q.front = 0
	q.size = 0
	q.totalPkgSize = 0
}

View File

@ -3,11 +3,12 @@ package store
import (
"encoding/json"
"fmt"
etcdErr "github.com/coreos/etcd/error"
"path"
"strconv"
"sync"
"time"
etcdErr "github.com/coreos/etcd/error"
)
//------------------------------------------------------------------------------

View File

@ -1781,7 +1781,7 @@ func isASCIIDigit(c byte) bool {
// but it's so remote we're prepared to pretend it's nonexistent - since the
// C++ generator lowercases names, it's extremely unlikely to have two fields
// with different capitalizations.
// In short, _my_field_name_2 becomes XMyFieldName2.
// In short, _my_field_name_2 becomes XMyFieldName_2.
func CamelCase(s string) string {
if s == "" {
return ""

View File

@ -927,26 +927,25 @@ func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot
// Adds a peer to the server.
func (s *Server) AddPeer(name string, connectiongString string) error {
s.debugln("server.peer.add: ", name, len(s.peers))
defer s.writeConf()
// Do not allow peers to be added twice.
if s.peers[name] != nil {
return nil
}
// Skip the Peer if it has the same name as the Server
if s.name == name {
return nil
if s.name != name {
peer := newPeer(s, name, connectiongString, s.heartbeatTimeout)
if s.State() == Leader {
peer.startHeartbeat()
}
s.peers[peer.Name] = peer
}
peer := newPeer(s, name, connectiongString, s.heartbeatTimeout)
if s.State() == Leader {
peer.startHeartbeat()
}
s.peers[peer.Name] = peer
s.debugln("server.peer.conf.write: ", name)
// Write the configuration to file.
s.writeConf()
return nil
}
@ -955,26 +954,24 @@ func (s *Server) AddPeer(name string, connectiongString string) error {
func (s *Server) RemovePeer(name string) error {
s.debugln("server.peer.remove: ", name, len(s.peers))
defer s.writeConf()
// Skip the Peer if it has the same name as the Server
if name != s.Name() {
// Return error if peer doesn't exist.
peer := s.peers[name]
if peer == nil {
return fmt.Errorf("raft: Peer not found: %s", name)
}
if name == s.Name() {
// when the removed node restart, it should be able
// to know it has been removed before. So we need
// to update knownCommitIndex
return nil
}
// Return error if peer doesn't exist.
peer := s.peers[name]
if peer == nil {
return fmt.Errorf("raft: Peer not found: %s", name)
// Stop peer and remove it.
if s.State() == Leader {
peer.stopHeartbeat(true)
}
delete(s.peers, name)
}
// Stop peer and remove it.
if s.State() == Leader {
peer.stopHeartbeat(true)
}
delete(s.peers, name)
// Write the configuration to file.
s.writeConf()
return nil
}

View File

@ -6,7 +6,6 @@ import (
"fmt"
"hash/crc32"
"os"
"syscall"
)
//------------------------------------------------------------------------------
@ -54,7 +53,7 @@ func (ss *Snapshot) save() error {
}
// force the change to be written to disk
syscall.Fsync(int(file.Fd()))
file.Sync()
return err
}

View File

@ -5,11 +5,12 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"github.com/coreos/go-raft"
"io"
"net"
"net/http"
"time"
"github.com/coreos/go-raft"
)
// Transporter layer for communication between raft nodes
@ -54,17 +55,38 @@ func dialTimeout(network, addr string) (net.Conn, error) {
func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
var aersp *raft.AppendEntriesResponse
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
size := b.Len()
r.serverStats.SendAppendReq(size)
u, _ := nameToRaftURL(peer.Name)
debugf("Send LogEntries to %s ", u)
thisPeerStats, ok := r.peersStats[peer.Name]
start := time.Now()
resp, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
end := time.Now()
if err != nil {
debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
if ok {
thisPeerStats.Fail()
}
} else {
if ok {
thisPeerStats.Succ(end.Sub(start))
}
}
r.peersStats[peer.Name] = thisPeerStats
if resp != nil {
defer resp.Body.Close()
aersp = &raft.AppendEntriesResponse{}