Merge pull request #1 from xiangli-cmu/refactoring

fix server/server.go accept name when creating a when server; gofmt
This commit is contained in:
Ben Johnson 2013-10-13 13:53:34 -07:00
commit 2b9c4bc90d
24 changed files with 897 additions and 907 deletions

View File

@ -32,7 +32,6 @@ func TestSingleNode(t *testing.T) {
time.Sleep(time.Second) time.Sleep(time.Second)
etcd.OpenDebug()
c := etcd.NewClient() c := etcd.NewClient()
c.SyncCluster() c.SyncCluster()

View File

@ -14,18 +14,18 @@ func init() {
// The JoinCommand adds a node to the cluster. // The JoinCommand adds a node to the cluster.
type JoinCommand struct { type JoinCommand struct {
RaftVersion string `json:"raftVersion"` RaftVersion string `json:"raftVersion"`
Name string `json:"name"` Name string `json:"name"`
RaftURL string `json:"raftURL"` RaftURL string `json:"raftURL"`
EtcdURL string `json:"etcdURL"` EtcdURL string `json:"etcdURL"`
} }
func NewJoinCommand(version, name, raftUrl, etcdUrl string) *JoinCommand { func NewJoinCommand(version, name, raftUrl, etcdUrl string) *JoinCommand {
return &JoinCommand{ return &JoinCommand{
RaftVersion: version, RaftVersion: version,
Name: name, Name: name,
RaftURL: raftUrl, RaftURL: raftUrl,
EtcdURL: etcdUrl, EtcdURL: etcdUrl,
} }
} }

View File

@ -1,363 +1,362 @@
package server package server
import ( import (
"bytes" "bytes"
"crypto/tls" "crypto/tls"
"encoding/binary" "encoding/binary"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/url" "net/url"
"time" "time"
etcdErr "github.com/coreos/etcd/error" etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/log" "github.com/coreos/etcd/log"
"github.com/coreos/etcd/store" "github.com/coreos/etcd/store"
"github.com/coreos/go-raft" "github.com/coreos/go-raft"
) )
type PeerServer struct { type PeerServer struct {
*raft.Server *raft.Server
server *Server server *Server
joinIndex uint64 joinIndex uint64
name string name string
url string url string
listenHost string listenHost string
tlsConf *TLSConfig tlsConf *TLSConfig
tlsInfo *TLSInfo tlsInfo *TLSInfo
followersStats *raftFollowersStats followersStats *raftFollowersStats
serverStats *raftServerStats serverStats *raftServerStats
registry *Registry registry *Registry
store *store.Store store *store.Store
snapConf *snapshotConf snapConf *snapshotConf
MaxClusterSize int MaxClusterSize int
RetryTimes int RetryTimes int
} }
// TODO: find a good policy to do snapshot // TODO: find a good policy to do snapshot
type snapshotConf struct { type snapshotConf struct {
// Etcd will check if snapshot is need every checkingInterval // Etcd will check if snapshot is need every checkingInterval
checkingInterval time.Duration checkingInterval time.Duration
// The number of writes when the last snapshot happened // The number of writes when the last snapshot happened
lastWrites uint64 lastWrites uint64
// If the incremental number of writes since the last snapshot // If the incremental number of writes since the last snapshot
// exceeds the write Threshold, etcd will do a snapshot // exceeds the write Threshold, etcd will do a snapshot
writesThr uint64 writesThr uint64
} }
func NewPeerServer(name string, path string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store *store.Store) *PeerServer { func NewPeerServer(name string, path string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store *store.Store) *PeerServer {
s := &PeerServer{ s := &PeerServer{
name: name, name: name,
url: url, url: url,
listenHost: listenHost, listenHost: listenHost,
tlsConf: tlsConf, tlsConf: tlsConf,
tlsInfo: tlsInfo, tlsInfo: tlsInfo,
registry: registry, registry: registry,
store: store, store: store,
snapConf: &snapshotConf{time.Second * 3, 0, 20 * 1000}, snapConf: &snapshotConf{time.Second * 3, 0, 20 * 1000},
followersStats: &raftFollowersStats{ followersStats: &raftFollowersStats{
Leader: name, Leader: name,
Followers: make(map[string]*raftFollowerStats), Followers: make(map[string]*raftFollowerStats),
}, },
serverStats: &raftServerStats{ serverStats: &raftServerStats{
StartTime: time.Now(), StartTime: time.Now(),
sendRateQueue: &statsQueue{ sendRateQueue: &statsQueue{
back: -1, back: -1,
}, },
recvRateQueue: &statsQueue{ recvRateQueue: &statsQueue{
back: -1, back: -1,
}, },
}, },
} }
// Create transporter for raft // Create transporter for raft
raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, s) raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, s)
// Create raft server // Create raft server
server, err := raft.NewServer(name, path, raftTransporter, s.store, s, "") server, err := raft.NewServer(name, path, raftTransporter, s.store, s, "")
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
s.Server = server s.Server = server
return s return s
} }
// Start the raft server // Start the raft server
func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) { func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) {
// LoadSnapshot // LoadSnapshot
if snapshot { if snapshot {
err := s.LoadSnapshot() err := s.LoadSnapshot()
if err == nil { if err == nil {
log.Debugf("%s finished load snapshot", s.name) log.Debugf("%s finished load snapshot", s.name)
} else { } else {
log.Debug(err) log.Debug(err)
} }
} }
s.SetElectionTimeout(ElectionTimeout) s.SetElectionTimeout(ElectionTimeout)
s.SetHeartbeatTimeout(HeartbeatTimeout) s.SetHeartbeatTimeout(HeartbeatTimeout)
s.Start() s.Start()
if s.IsLogEmpty() { if s.IsLogEmpty() {
// start as a leader in a new cluster // start as a leader in a new cluster
if len(cluster) == 0 { if len(cluster) == 0 {
s.startAsLeader() s.startAsLeader()
} else { } else {
s.startAsFollower(cluster) s.startAsFollower(cluster)
} }
} else { } else {
// Rejoin the previous cluster // Rejoin the previous cluster
cluster = s.registry.PeerURLs(s.Leader(), s.name) cluster = s.registry.PeerURLs(s.Leader(), s.name)
for i := 0; i < len(cluster); i++ { for i := 0; i < len(cluster); i++ {
u, err := url.Parse(cluster[i]) u, err := url.Parse(cluster[i])
if err != nil { if err != nil {
log.Debug("rejoin cannot parse url: ", err) log.Debug("rejoin cannot parse url: ", err)
} }
cluster[i] = u.Host cluster[i] = u.Host
} }
ok := s.joinCluster(cluster) ok := s.joinCluster(cluster)
if !ok { if !ok {
log.Warn("the entire cluster is down! this machine will restart the cluster.") log.Warn("the entire cluster is down! this machine will restart the cluster.")
} }
log.Debugf("%s restart as a follower", s.name) log.Debugf("%s restart as a follower", s.name)
} }
// open the snapshot // open the snapshot
if snapshot { if snapshot {
go s.monitorSnapshot() go s.monitorSnapshot()
} }
// start to response to raft requests // start to response to raft requests
go s.startTransport(s.tlsConf.Scheme, s.tlsConf.Server) go s.startTransport(s.tlsConf.Scheme, s.tlsConf.Server)
} }
// Retrieves the underlying Raft server. // Retrieves the underlying Raft server.
func (s *PeerServer) RaftServer() *raft.Server { func (s *PeerServer) RaftServer() *raft.Server {
return s.Server return s.Server
} }
// Associates the client server with the peer server. // Associates the client server with the peer server.
func (s *PeerServer) SetServer(server *Server) { func (s *PeerServer) SetServer(server *Server) {
s.server = server s.server = server
} }
// Get all the current logs // Get all the current logs
func (s *PeerServer) GetLogHttpHandler(w http.ResponseWriter, req *http.Request) { func (s *PeerServer) GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
log.Debugf("[recv] GET %s/log", s.url) log.Debugf("[recv] GET %s/log", s.url)
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(s.LogEntries()) json.NewEncoder(w).Encode(s.LogEntries())
} }
// Response to vote request // Response to vote request
func (s *PeerServer) VoteHttpHandler(w http.ResponseWriter, req *http.Request) { func (s *PeerServer) VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
rvreq := &raft.RequestVoteRequest{} rvreq := &raft.RequestVoteRequest{}
err := decodeJsonRequest(req, rvreq) err := decodeJsonRequest(req, rvreq)
if err == nil { if err == nil {
log.Debugf("[recv] POST %s/vote [%s]", s.url, rvreq.CandidateName) log.Debugf("[recv] POST %s/vote [%s]", s.url, rvreq.CandidateName)
if resp := s.RequestVote(rvreq); resp != nil { if resp := s.RequestVote(rvreq); resp != nil {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp) json.NewEncoder(w).Encode(resp)
return return
} }
} }
log.Warnf("[vote] ERROR: %v", err) log.Warnf("[vote] ERROR: %v", err)
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
} }
// Response to append entries request // Response to append entries request
func (s *PeerServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) { func (s *PeerServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
aereq := &raft.AppendEntriesRequest{} aereq := &raft.AppendEntriesRequest{}
err := decodeJsonRequest(req, aereq) err := decodeJsonRequest(req, aereq)
if err == nil { if err == nil {
log.Debugf("[recv] POST %s/log/append [%d]", s.url, len(aereq.Entries)) log.Debugf("[recv] POST %s/log/append [%d]", s.url, len(aereq.Entries))
s.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength)) s.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength))
if resp := s.AppendEntries(aereq); resp != nil { if resp := s.AppendEntries(aereq); resp != nil {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp) json.NewEncoder(w).Encode(resp)
if !resp.Success { if !resp.Success {
log.Debugf("[Append Entry] Step back") log.Debugf("[Append Entry] Step back")
} }
return return
} }
} }
log.Warnf("[Append Entry] ERROR: %v", err) log.Warnf("[Append Entry] ERROR: %v", err)
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
} }
// Response to recover from snapshot request // Response to recover from snapshot request
func (s *PeerServer) SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) { func (s *PeerServer) SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
aereq := &raft.SnapshotRequest{} aereq := &raft.SnapshotRequest{}
err := decodeJsonRequest(req, aereq) err := decodeJsonRequest(req, aereq)
if err == nil { if err == nil {
log.Debugf("[recv] POST %s/snapshot/ ", s.url) log.Debugf("[recv] POST %s/snapshot/ ", s.url)
if resp := s.RequestSnapshot(aereq); resp != nil { if resp := s.RequestSnapshot(aereq); resp != nil {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp) json.NewEncoder(w).Encode(resp)
return return
} }
} }
log.Warnf("[Snapshot] ERROR: %v", err) log.Warnf("[Snapshot] ERROR: %v", err)
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
} }
// Response to recover from snapshot request // Response to recover from snapshot request
func (s *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) { func (s *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
aereq := &raft.SnapshotRecoveryRequest{} aereq := &raft.SnapshotRecoveryRequest{}
err := decodeJsonRequest(req, aereq) err := decodeJsonRequest(req, aereq)
if err == nil { if err == nil {
log.Debugf("[recv] POST %s/snapshotRecovery/ ", s.url) log.Debugf("[recv] POST %s/snapshotRecovery/ ", s.url)
if resp := s.SnapshotRecoveryRequest(aereq); resp != nil { if resp := s.SnapshotRecoveryRequest(aereq); resp != nil {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp) json.NewEncoder(w).Encode(resp)
return return
} }
} }
log.Warnf("[Snapshot] ERROR: %v", err) log.Warnf("[Snapshot] ERROR: %v", err)
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
} }
// Get the port that listening for etcd connecting of the server // Get the port that listening for etcd connecting of the server
func (s *PeerServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) { func (s *PeerServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) {
log.Debugf("[recv] Get %s/etcdURL/ ", s.url) log.Debugf("[recv] Get %s/etcdURL/ ", s.url)
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
w.Write([]byte(s.server.URL())) w.Write([]byte(s.server.URL()))
} }
// Response to the join request // Response to the join request
func (s *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) { func (s *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
command := &JoinCommand{} command := &JoinCommand{}
// Write CORS header. // Write CORS header.
if s.server.OriginAllowed("*") { if s.server.OriginAllowed("*") {
w.Header().Add("Access-Control-Allow-Origin", "*") w.Header().Add("Access-Control-Allow-Origin", "*")
} else if s.server.OriginAllowed(req.Header.Get("Origin")) { } else if s.server.OriginAllowed(req.Header.Get("Origin")) {
w.Header().Add("Access-Control-Allow-Origin", req.Header.Get("Origin")) w.Header().Add("Access-Control-Allow-Origin", req.Header.Get("Origin"))
} }
err := decodeJsonRequest(req, command) err := decodeJsonRequest(req, command)
if err != nil { if err != nil {
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
return return
} }
log.Debugf("Receive Join Request from %s", command.Name) log.Debugf("Receive Join Request from %s", command.Name)
err = s.dispatchRaftCommand(command, w, req) err = s.dispatchRaftCommand(command, w, req)
// Return status. // Return status.
if err != nil { if err != nil {
if etcdErr, ok := err.(*etcdErr.Error); ok { if etcdErr, ok := err.(*etcdErr.Error); ok {
log.Debug("Return error: ", (*etcdErr).Error()) log.Debug("Return error: ", (*etcdErr).Error())
etcdErr.Write(w) etcdErr.Write(w)
} else { } else {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
} }
} }
} }
// Response to remove request // Response to remove request
func (s *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request) { func (s *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request) {
if req.Method != "DELETE" { if req.Method != "DELETE" {
w.WriteHeader(http.StatusMethodNotAllowed) w.WriteHeader(http.StatusMethodNotAllowed)
return return
} }
nodeName := req.URL.Path[len("/remove/"):] nodeName := req.URL.Path[len("/remove/"):]
command := &RemoveCommand{ command := &RemoveCommand{
Name: nodeName, Name: nodeName,
} }
log.Debugf("[recv] Remove Request [%s]", command.Name) log.Debugf("[recv] Remove Request [%s]", command.Name)
s.dispatchRaftCommand(command, w, req) s.dispatchRaftCommand(command, w, req)
} }
// Response to the name request // Response to the name request
func (s *PeerServer) NameHttpHandler(w http.ResponseWriter, req *http.Request) { func (s *PeerServer) NameHttpHandler(w http.ResponseWriter, req *http.Request) {
log.Debugf("[recv] Get %s/name/ ", s.url) log.Debugf("[recv] Get %s/name/ ", s.url)
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
w.Write([]byte(s.name)) w.Write([]byte(s.name))
} }
// Response to the name request // Response to the name request
func (s *PeerServer) RaftVersionHttpHandler(w http.ResponseWriter, req *http.Request) { func (s *PeerServer) RaftVersionHttpHandler(w http.ResponseWriter, req *http.Request) {
log.Debugf("[recv] Get %s/version/ ", s.url) log.Debugf("[recv] Get %s/version/ ", s.url)
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
w.Write([]byte(PeerVersion)) w.Write([]byte(PeerVersion))
} }
func (s *PeerServer) dispatchRaftCommand(c raft.Command, w http.ResponseWriter, req *http.Request) error { func (s *PeerServer) dispatchRaftCommand(c raft.Command, w http.ResponseWriter, req *http.Request) error {
return s.dispatch(c, w, req) return s.dispatch(c, w, req)
} }
func (s *PeerServer) startAsLeader() { func (s *PeerServer) startAsLeader() {
// leader need to join self as a peer // leader need to join self as a peer
for { for {
_, err := s.Do(NewJoinCommand(PeerVersion, s.Name(), s.url, s.server.URL())) _, err := s.Do(NewJoinCommand(PeerVersion, s.Name(), s.url, s.server.URL()))
if err == nil { if err == nil {
break break
} }
} }
log.Debugf("%s start as a leader", s.name) log.Debugf("%s start as a leader", s.name)
} }
func (s *PeerServer) startAsFollower(cluster []string) { func (s *PeerServer) startAsFollower(cluster []string) {
// start as a follower in a existing cluster // start as a follower in a existing cluster
for i := 0; i < s.RetryTimes; i++ { for i := 0; i < s.RetryTimes; i++ {
ok := s.joinCluster(cluster) ok := s.joinCluster(cluster)
if ok { if ok {
return return
} }
log.Warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval) log.Warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval)
time.Sleep(time.Second * RetryInterval) time.Sleep(time.Second * RetryInterval)
} }
log.Fatalf("Cannot join the cluster via given machines after %x retries", s.RetryTimes) log.Fatalf("Cannot join the cluster via given machines after %x retries", s.RetryTimes)
} }
// Start to listen and response raft command // Start to listen and response raft command
func (s *PeerServer) startTransport(scheme string, tlsConf tls.Config) { func (s *PeerServer) startTransport(scheme string, tlsConf tls.Config) {
log.Infof("raft server [name %s, listen on %s, advertised url %s]", s.name, s.listenHost, s.url) log.Infof("raft server [name %s, listen on %s, advertised url %s]", s.name, s.listenHost, s.url)
raftMux := http.NewServeMux() raftMux := http.NewServeMux()
server := &http.Server{ server := &http.Server{
Handler: raftMux, Handler: raftMux,
TLSConfig: &tlsConf, TLSConfig: &tlsConf,
Addr: s.listenHost, Addr: s.listenHost,
} }
// internal commands // internal commands
raftMux.HandleFunc("/name", s.NameHttpHandler) raftMux.HandleFunc("/name", s.NameHttpHandler)
raftMux.HandleFunc("/version", s.RaftVersionHttpHandler) raftMux.HandleFunc("/version", s.RaftVersionHttpHandler)
raftMux.HandleFunc("/join", s.JoinHttpHandler) raftMux.HandleFunc("/join", s.JoinHttpHandler)
raftMux.HandleFunc("/remove/", s.RemoveHttpHandler) raftMux.HandleFunc("/remove/", s.RemoveHttpHandler)
raftMux.HandleFunc("/vote", s.VoteHttpHandler) raftMux.HandleFunc("/vote", s.VoteHttpHandler)
raftMux.HandleFunc("/log", s.GetLogHttpHandler) raftMux.HandleFunc("/log", s.GetLogHttpHandler)
raftMux.HandleFunc("/log/append", s.AppendEntriesHttpHandler) raftMux.HandleFunc("/log/append", s.AppendEntriesHttpHandler)
raftMux.HandleFunc("/snapshot", s.SnapshotHttpHandler) raftMux.HandleFunc("/snapshot", s.SnapshotHttpHandler)
raftMux.HandleFunc("/snapshotRecovery", s.SnapshotRecoveryHttpHandler) raftMux.HandleFunc("/snapshotRecovery", s.SnapshotRecoveryHttpHandler)
raftMux.HandleFunc("/etcdURL", s.EtcdURLHttpHandler) raftMux.HandleFunc("/etcdURL", s.EtcdURLHttpHandler)
if scheme == "http" { if scheme == "http" {
log.Fatal(server.ListenAndServe()) log.Fatal(server.ListenAndServe())
} else { } else {
log.Fatal(server.ListenAndServeTLS(s.tlsInfo.CertFile, s.tlsInfo.KeyFile)) log.Fatal(server.ListenAndServeTLS(s.tlsInfo.CertFile, s.tlsInfo.KeyFile))
} }
} }
@ -365,184 +364,182 @@ func (s *PeerServer) startTransport(scheme string, tlsConf tls.Config) {
// will need to do something more sophisticated later when we allow mixed // will need to do something more sophisticated later when we allow mixed
// version clusters. // version clusters.
func getVersion(t *transporter, versionURL url.URL) (string, error) { func getVersion(t *transporter, versionURL url.URL) (string, error) {
resp, req, err := t.Get(versionURL.String()) resp, req, err := t.Get(versionURL.String())
if err != nil { if err != nil {
return "", err return "", err
} }
defer resp.Body.Close() defer resp.Body.Close()
t.CancelWhenTimeout(req) t.CancelWhenTimeout(req)
body, err := ioutil.ReadAll(resp.Body) body, err := ioutil.ReadAll(resp.Body)
return string(body), nil return string(body), nil
} }
func (s *PeerServer) joinCluster(cluster []string) bool { func (s *PeerServer) joinCluster(cluster []string) bool {
for _, machine := range cluster { for _, machine := range cluster {
if len(machine) == 0 { if len(machine) == 0 {
continue continue
} }
err := s.joinByMachine(s.Server, machine, s.tlsConf.Scheme) err := s.joinByMachine(s.Server, machine, s.tlsConf.Scheme)
if err == nil { if err == nil {
log.Debugf("%s success join to the cluster via machine %s", s.name, machine) log.Debugf("%s success join to the cluster via machine %s", s.name, machine)
return true return true
} else { } else {
if _, ok := err.(etcdErr.Error); ok { if _, ok := err.(etcdErr.Error); ok {
log.Fatal(err) log.Fatal(err)
} }
log.Debugf("cannot join to cluster via machine %s %s", machine, err) log.Debugf("cannot join to cluster via machine %s %s", machine, err)
} }
} }
return false return false
} }
// Send join requests to machine. // Send join requests to machine.
func (s *PeerServer) joinByMachine(server *raft.Server, machine string, scheme string) error { func (s *PeerServer) joinByMachine(server *raft.Server, machine string, scheme string) error {
var b bytes.Buffer var b bytes.Buffer
// t must be ok // t must be ok
t, _ := server.Transporter().(*transporter) t, _ := server.Transporter().(*transporter)
// Our version must match the leaders version // Our version must match the leaders version
versionURL := url.URL{Host: machine, Scheme: scheme, Path: "/version"} versionURL := url.URL{Host: machine, Scheme: scheme, Path: "/version"}
version, err := getVersion(t, versionURL) version, err := getVersion(t, versionURL)
if err != nil { if err != nil {
return fmt.Errorf("Unable to join: %v", err) return fmt.Errorf("Unable to join: %v", err)
} }
// TODO: versioning of the internal protocol. See: // TODO: versioning of the internal protocol. See:
// Documentation/internatl-protocol-versioning.md // Documentation/internatl-protocol-versioning.md
if version != PeerVersion { if version != PeerVersion {
return fmt.Errorf("Unable to join: internal version mismatch, entire cluster must be running identical versions of etcd") return fmt.Errorf("Unable to join: internal version mismatch, entire cluster must be running identical versions of etcd")
} }
json.NewEncoder(&b).Encode(NewJoinCommand(PeerVersion, server.Name(), s.url, s.server.URL())) json.NewEncoder(&b).Encode(NewJoinCommand(PeerVersion, server.Name(), s.url, s.server.URL()))
joinURL := url.URL{Host: machine, Scheme: scheme, Path: "/join"} joinURL := url.URL{Host: machine, Scheme: scheme, Path: "/join"}
log.Debugf("Send Join Request to %s", joinURL.String()) log.Debugf("Send Join Request to %s", joinURL.String())
resp, req, err := t.Post(joinURL.String(), &b) resp, req, err := t.Post(joinURL.String(), &b)
for { for {
if err != nil { if err != nil {
return fmt.Errorf("Unable to join: %v", err) return fmt.Errorf("Unable to join: %v", err)
} }
if resp != nil { if resp != nil {
defer resp.Body.Close() defer resp.Body.Close()
t.CancelWhenTimeout(req) t.CancelWhenTimeout(req)
if resp.StatusCode == http.StatusOK { if resp.StatusCode == http.StatusOK {
b, _ := ioutil.ReadAll(resp.Body) b, _ := ioutil.ReadAll(resp.Body)
s.joinIndex, _ = binary.Uvarint(b) s.joinIndex, _ = binary.Uvarint(b)
return nil return nil
} }
if resp.StatusCode == http.StatusTemporaryRedirect { if resp.StatusCode == http.StatusTemporaryRedirect {
address := resp.Header.Get("Location") address := resp.Header.Get("Location")
log.Debugf("Send Join Request to %s", address) log.Debugf("Send Join Request to %s", address)
json.NewEncoder(&b).Encode(NewJoinCommand(PeerVersion, server.Name(), s.url, s.server.URL())) json.NewEncoder(&b).Encode(NewJoinCommand(PeerVersion, server.Name(), s.url, s.server.URL()))
resp, req, err = t.Post(address, &b) resp, req, err = t.Post(address, &b)
} else if resp.StatusCode == http.StatusBadRequest { } else if resp.StatusCode == http.StatusBadRequest {
log.Debug("Reach max number machines in the cluster") log.Debug("Reach max number machines in the cluster")
decoder := json.NewDecoder(resp.Body) decoder := json.NewDecoder(resp.Body)
err := &etcdErr.Error{} err := &etcdErr.Error{}
decoder.Decode(err) decoder.Decode(err)
return *err return *err
} else { } else {
return fmt.Errorf("Unable to join") return fmt.Errorf("Unable to join")
} }
} }
} }
return fmt.Errorf("Unable to join: %v", err)
} }
func (s *PeerServer) Stats() []byte { func (s *PeerServer) Stats() []byte {
s.serverStats.LeaderInfo.Uptime = time.Now().Sub(s.serverStats.LeaderInfo.startTime).String() s.serverStats.LeaderInfo.Uptime = time.Now().Sub(s.serverStats.LeaderInfo.startTime).String()
queue := s.serverStats.sendRateQueue queue := s.serverStats.sendRateQueue
s.serverStats.SendingPkgRate, s.serverStats.SendingBandwidthRate = queue.Rate() s.serverStats.SendingPkgRate, s.serverStats.SendingBandwidthRate = queue.Rate()
queue = s.serverStats.recvRateQueue queue = s.serverStats.recvRateQueue
s.serverStats.RecvingPkgRate, s.serverStats.RecvingBandwidthRate = queue.Rate() s.serverStats.RecvingPkgRate, s.serverStats.RecvingBandwidthRate = queue.Rate()
b, _ := json.Marshal(s.serverStats) b, _ := json.Marshal(s.serverStats)
return b return b
} }
func (s *PeerServer) PeerStats() []byte { func (s *PeerServer) PeerStats() []byte {
if s.State() == raft.Leader { if s.State() == raft.Leader {
b, _ := json.Marshal(s.followersStats) b, _ := json.Marshal(s.followersStats)
return b return b
} }
return nil return nil
} }
func (s *PeerServer) monitorSnapshot() { func (s *PeerServer) monitorSnapshot() {
for { for {
time.Sleep(s.snapConf.checkingInterval) time.Sleep(s.snapConf.checkingInterval)
currentWrites := 0 currentWrites := 0
if uint64(currentWrites) > s.snapConf.writesThr { if uint64(currentWrites) > s.snapConf.writesThr {
s.TakeSnapshot() s.TakeSnapshot()
s.snapConf.lastWrites = 0 s.snapConf.lastWrites = 0
} }
} }
} }
func (s *PeerServer) dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error { func (s *PeerServer) dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error {
if s.State() == raft.Leader { if s.State() == raft.Leader {
if response, err := s.Do(c); err != nil { if response, err := s.Do(c); err != nil {
return err return err
} else { } else {
if response == nil { if response == nil {
return etcdErr.NewError(300, "Empty response from raft", store.UndefIndex, store.UndefTerm) return etcdErr.NewError(300, "Empty response from raft", store.UndefIndex, store.UndefTerm)
} }
event, ok := response.(*store.Event) event, ok := response.(*store.Event)
if ok { if ok {
bytes, err := json.Marshal(event) bytes, err := json.Marshal(event)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
} }
w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index)) w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index))
w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term)) w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term))
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
w.Write(bytes) w.Write(bytes)
return nil return nil
} }
bytes, _ := response.([]byte) bytes, _ := response.([]byte)
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
w.Write(bytes) w.Write(bytes)
return nil return nil
} }
} else { } else {
leader := s.Leader() leader := s.Leader()
// current no leader // current no leader
if leader == "" { if leader == "" {
return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm) return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
} }
url, _ := s.registry.PeerURL(leader) url, _ := s.registry.PeerURL(leader)
redirect(url, w, req) redirect(url, w, req)
return nil return nil
} }
} }

View File

@ -1,14 +1,14 @@
package server package server
import ( import (
"fmt" "fmt"
"net/url" "net/url"
"path" "path"
"strings" "strings"
"sync" "sync"
"github.com/coreos/etcd/log" "github.com/coreos/etcd/log"
"github.com/coreos/etcd/store" "github.com/coreos/etcd/store"
) )
// The location of the machine URL data. // The location of the machine URL data.
@ -16,174 +16,174 @@ const RegistryKey = "/_etcd/machines"
// The Registry stores URL information for nodes. // The Registry stores URL information for nodes.
type Registry struct { type Registry struct {
sync.Mutex sync.Mutex
store *store.Store store *store.Store
nodes map[string]*node nodes map[string]*node
} }
// The internal storage format of the registry. // The internal storage format of the registry.
type node struct { type node struct {
peerVersion string peerVersion string
peerURL string peerURL string
url string url string
} }
// Creates a new Registry. // Creates a new Registry.
func NewRegistry(s *store.Store) *Registry { func NewRegistry(s *store.Store) *Registry {
return &Registry{ return &Registry{
store: s, store: s,
nodes: make(map[string]*node), nodes: make(map[string]*node),
} }
} }
// Adds a node to the registry. // Adds a node to the registry.
func (r *Registry) Register(name string, peerVersion string, peerURL string, url string, commitIndex uint64, term uint64) { func (r *Registry) Register(name string, peerVersion string, peerURL string, url string, commitIndex uint64, term uint64) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
// Write data to store. // Write data to store.
key := path.Join(RegistryKey, name) key := path.Join(RegistryKey, name)
value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", peerURL, url, peerVersion) value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", peerURL, url, peerVersion)
r.store.Create(key, value, false, false, store.Permanent, commitIndex, term) r.store.Create(key, value, false, false, store.Permanent, commitIndex, term)
} }
// Removes a node from the registry. // Removes a node from the registry.
func (r *Registry) Unregister(name string, commitIndex uint64, term uint64) error { func (r *Registry) Unregister(name string, commitIndex uint64, term uint64) error {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
// Remove the key from the store. // Remove the key from the store.
_, err := r.store.Delete(path.Join(RegistryKey, name), false, commitIndex, term) _, err := r.store.Delete(path.Join(RegistryKey, name), false, commitIndex, term)
return err return err
} }
// Returns the number of nodes in the cluster. // Returns the number of nodes in the cluster.
func (r *Registry) Count() int { func (r *Registry) Count() int {
e, err := r.store.Get(RegistryKey, false, false, 0, 0) e, err := r.store.Get(RegistryKey, false, false, 0, 0)
if err != nil { if err != nil {
return 0 return 0
} }
return len(e.KVPairs) return len(e.KVPairs)
} }
// Retrieves the URL for a given node by name. // Retrieves the URL for a given node by name.
func (r *Registry) URL(name string) (string, bool) { func (r *Registry) URL(name string) (string, bool) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
return r.url(name) return r.url(name)
} }
func (r *Registry) url(name string) (string, bool) { func (r *Registry) url(name string) (string, bool) {
if r.nodes[name] == nil { if r.nodes[name] == nil {
r.load(name) r.load(name)
} }
if node := r.nodes[name]; node != nil { if node := r.nodes[name]; node != nil {
return node.url, true return node.url, true
} }
return "", false return "", false
} }
// Retrieves the URLs for all nodes. // Retrieves the URLs for all nodes.
func (r *Registry) URLs(leaderName, selfName string) []string { func (r *Registry) URLs(leaderName, selfName string) []string {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
// Build list including the leader and self. // Build list including the leader and self.
urls := make([]string, 0) urls := make([]string, 0)
if url, _ := r.url(leaderName); len(url) > 0 { if url, _ := r.url(leaderName); len(url) > 0 {
urls = append(urls, url) urls = append(urls, url)
} }
if url, _ := r.url(selfName); len(url) > 0 { if url, _ := r.url(selfName); len(url) > 0 {
urls = append(urls, url) urls = append(urls, url)
} }
// Retrieve a list of all nodes. // Retrieve a list of all nodes.
if e, _ := r.store.Get(RegistryKey, false, false, 0, 0); e != nil { if e, _ := r.store.Get(RegistryKey, false, false, 0, 0); e != nil {
// Lookup the URL for each one. // Lookup the URL for each one.
for _, pair := range e.KVPairs { for _, pair := range e.KVPairs {
if url, _ := r.url(pair.Key); len(url) > 0 { if url, _ := r.url(pair.Key); len(url) > 0 {
urls = append(urls, url) urls = append(urls, url)
} }
} }
} }
log.Infof("URLs: %s / %s (%s", leaderName, selfName, strings.Join(urls, ",")) log.Infof("URLs: %s / %s (%s", leaderName, selfName, strings.Join(urls, ","))
return urls return urls
} }
// Retrieves the peer URL for a given node by name. // Retrieves the peer URL for a given node by name.
func (r *Registry) PeerURL(name string) (string, bool) { func (r *Registry) PeerURL(name string) (string, bool) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
return r.peerURL(name) return r.peerURL(name)
} }
func (r *Registry) peerURL(name string) (string, bool) { func (r *Registry) peerURL(name string) (string, bool) {
if r.nodes[name] == nil { if r.nodes[name] == nil {
r.load(name) r.load(name)
} }
if node := r.nodes[name]; node != nil { if node := r.nodes[name]; node != nil {
return node.peerURL, true return node.peerURL, true
} }
return "", false return "", false
} }
// Retrieves the peer URLs for all nodes. // Retrieves the peer URLs for all nodes.
func (r *Registry) PeerURLs(leaderName, selfName string) []string { func (r *Registry) PeerURLs(leaderName, selfName string) []string {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
// Build list including the leader and self. // Build list including the leader and self.
urls := make([]string, 0) urls := make([]string, 0)
if url, _ := r.peerURL(leaderName); len(url) > 0 { if url, _ := r.peerURL(leaderName); len(url) > 0 {
urls = append(urls, url) urls = append(urls, url)
} }
if url, _ := r.peerURL(selfName); len(url) > 0 { if url, _ := r.peerURL(selfName); len(url) > 0 {
urls = append(urls, url) urls = append(urls, url)
} }
// Retrieve a list of all nodes. // Retrieve a list of all nodes.
if e, _ := r.store.Get(RegistryKey, false, false, 0, 0); e != nil { if e, _ := r.store.Get(RegistryKey, false, false, 0, 0); e != nil {
// Lookup the URL for each one. // Lookup the URL for each one.
for _, pair := range e.KVPairs { for _, pair := range e.KVPairs {
if url, _ := r.peerURL(pair.Key); len(url) > 0 { if url, _ := r.peerURL(pair.Key); len(url) > 0 {
urls = append(urls, url) urls = append(urls, url)
} }
} }
} }
log.Infof("PeerURLs: %s / %s (%s", leaderName, selfName, strings.Join(urls, ",")) log.Infof("PeerURLs: %s / %s (%s", leaderName, selfName, strings.Join(urls, ","))
return urls return urls
} }
// Loads the given node by name from the store into the cache. // Loads the given node by name from the store into the cache.
func (r *Registry) load(name string) { func (r *Registry) load(name string) {
if name == "" { if name == "" {
return return
} }
// Retrieve from store. // Retrieve from store.
e, err := r.store.Get(path.Join(RegistryKey, name), false, false, 0, 0) e, err := r.store.Get(path.Join(RegistryKey, name), false, false, 0, 0)
if err != nil { if err != nil {
return return
} }
// Parse as a query string. // Parse as a query string.
m, err := url.ParseQuery(e.Value) m, err := url.ParseQuery(e.Value)
if err != nil { if err != nil {
panic(fmt.Sprintf("Failed to parse machines entry: %s", name)) panic(fmt.Sprintf("Failed to parse machines entry: %s", name))
} }
// Create node. // Create node.
r.nodes[name] = &node{ r.nodes[name] = &node{
url: m["etcd"][0], url: m["etcd"][0],
peerURL: m["raft"][0], peerURL: m["raft"][0],
peerVersion: m["raftVersion"][0], peerVersion: m["raftVersion"][0],
} }
} }

View File

@ -1,8 +1,8 @@
package server package server
import ( import (
"fmt"
"encoding/json" "encoding/json"
"fmt"
"net/http" "net/http"
"net/url" "net/url"
"strings" "strings"
@ -38,6 +38,7 @@ func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsI
TLSConfig: &tlsConf.Server, TLSConfig: &tlsConf.Server,
Addr: listenHost, Addr: listenHost,
}, },
name: name,
store: store, store: store,
registry: registry, registry: registry,
url: urlStr, url: urlStr,
@ -134,7 +135,7 @@ func (s *Server) handleFunc(path string, f func(http.ResponseWriter, *http.Reque
// Wrap the standard HandleFunc interface to pass in the server reference. // Wrap the standard HandleFunc interface to pass in the server reference.
return r.HandleFunc(path, func(w http.ResponseWriter, req *http.Request) { return r.HandleFunc(path, func(w http.ResponseWriter, req *http.Request) {
// Log request. // Log request.
log.Debugf("[recv] %s %s [%s]", req.Method, s.url, req.URL.Path, req.RemoteAddr) log.Debugf("[recv] %s %s %s [%s]", req.Method, s.url, req.URL.Path, req.RemoteAddr)
// Write CORS header. // Write CORS header.
if s.OriginAllowed("*") { if s.OriginAllowed("*") {
@ -242,28 +243,28 @@ func (s *Server) GetLeaderHandler(w http.ResponseWriter, req *http.Request) erro
// Handler to return all the known machines in the current cluster. // Handler to return all the known machines in the current cluster.
func (s *Server) GetMachinesHandler(w http.ResponseWriter, req *http.Request) error { func (s *Server) GetMachinesHandler(w http.ResponseWriter, req *http.Request) error {
machines := s.registry.URLs(s.peerServer.Leader(), s.name) machines := s.registry.URLs(s.peerServer.Leader(), s.name)
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
w.Write([]byte(strings.Join(machines, ", "))) w.Write([]byte(strings.Join(machines, ", ")))
return nil return nil
} }
// Retrieves stats on the Raft server. // Retrieves stats on the Raft server.
func (s *Server) GetStatsHandler(w http.ResponseWriter, req *http.Request) error { func (s *Server) GetStatsHandler(w http.ResponseWriter, req *http.Request) error {
w.Write(s.peerServer.Stats()) w.Write(s.peerServer.Stats())
return nil return nil
} }
// Retrieves stats on the leader. // Retrieves stats on the leader.
func (s *Server) GetLeaderStatsHandler(w http.ResponseWriter, req *http.Request) error { func (s *Server) GetLeaderStatsHandler(w http.ResponseWriter, req *http.Request) error {
if s.peerServer.State() == raft.Leader { if s.peerServer.State() == raft.Leader {
w.Write(s.peerServer.PeerStats()) w.Write(s.peerServer.PeerStats())
return nil return nil
} }
leader := s.peerServer.Leader() leader := s.peerServer.Leader()
if leader == "" { if leader == "" {
return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm) return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
} }
hostname, _ := s.registry.URL(leader) hostname, _ := s.registry.URL(leader)
redirect(hostname, w, req) redirect(hostname, w, req)
@ -272,8 +273,8 @@ func (s *Server) GetLeaderStatsHandler(w http.ResponseWriter, req *http.Request)
// Retrieves stats on the leader. // Retrieves stats on the leader.
func (s *Server) GetStoreStatsHandler(w http.ResponseWriter, req *http.Request) error { func (s *Server) GetStoreStatsHandler(w http.ResponseWriter, req *http.Request) error {
w.Write(s.store.JsonStats()) w.Write(s.store.JsonStats())
return nil return nil
} }
// Executes a speed test to evaluate the performance of update replication. // Executes a speed test to evaluate the performance of update replication.
@ -284,8 +285,8 @@ func (s *Server) SpeedTestHandler(w http.ResponseWriter, req *http.Request) erro
go func() { go func() {
for j := 0; j < 10; j++ { for j := 0; j < 10; j++ {
c := &store.UpdateCommand{ c := &store.UpdateCommand{
Key: "foo", Key: "foo",
Value: "bar", Value: "bar",
ExpireTime: time.Unix(0, 0), ExpireTime: time.Unix(0, 0),
} }
s.peerServer.Do(c) s.peerServer.Do(c)

View File

@ -1,15 +1,15 @@
package server package server
import ( import (
"time" "time"
) )
const ( const (
// The amount of time to elapse without a heartbeat before becoming a candidate. // The amount of time to elapse without a heartbeat before becoming a candidate.
ElectionTimeout = 200 * time.Millisecond ElectionTimeout = 200 * time.Millisecond
// The frequency by which heartbeats are sent to followers. // The frequency by which heartbeats are sent to followers.
HeartbeatTimeout = 50 * time.Millisecond HeartbeatTimeout = 50 * time.Millisecond
RetryInterval = 10 RetryInterval = 10
) )

View File

@ -1,17 +1,17 @@
package server package server
import ( import (
"bytes" "bytes"
"crypto/tls" "crypto/tls"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"net" "net"
"net/http" "net/http"
"time" "time"
"github.com/coreos/etcd/log" "github.com/coreos/etcd/log"
"github.com/coreos/go-raft" "github.com/coreos/go-raft"
) )
// Timeout for setup internal raft http connection // Timeout for setup internal raft http connection
@ -28,200 +28,200 @@ var tranTimeout = ElectionTimeout
// Transporter layer for communication between raft nodes // Transporter layer for communication between raft nodes
type transporter struct { type transporter struct {
client *http.Client client *http.Client
transport *http.Transport transport *http.Transport
peerServer *PeerServer peerServer *PeerServer
} }
// Create transporter using by raft server // Create transporter using by raft server
// Create http or https transporter based on // Create http or https transporter based on
// whether the user give the server cert and key // whether the user give the server cert and key
func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) *transporter { func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) *transporter {
t := transporter{} t := transporter{}
tr := &http.Transport{ tr := &http.Transport{
Dial: dialWithTimeout, Dial: dialWithTimeout,
ResponseHeaderTimeout: responseHeaderTimeout, ResponseHeaderTimeout: responseHeaderTimeout,
} }
if scheme == "https" { if scheme == "https" {
tr.TLSClientConfig = &tlsConf tr.TLSClientConfig = &tlsConf
tr.DisableCompression = true tr.DisableCompression = true
} }
t.client = &http.Client{Transport: tr} t.client = &http.Client{Transport: tr}
t.transport = tr t.transport = tr
t.peerServer = peerServer t.peerServer = peerServer
return &t return &t
} }
// Dial with timeout // Dial with timeout
func dialWithTimeout(network, addr string) (net.Conn, error) { func dialWithTimeout(network, addr string) (net.Conn, error) {
return net.DialTimeout(network, addr, dailTimeout) return net.DialTimeout(network, addr, dailTimeout)
} }
// Sends AppendEntries RPCs to a peer when the server is the leader. // Sends AppendEntries RPCs to a peer when the server is the leader.
func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse { func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
var aersp *raft.AppendEntriesResponse var aersp *raft.AppendEntriesResponse
var b bytes.Buffer var b bytes.Buffer
json.NewEncoder(&b).Encode(req) json.NewEncoder(&b).Encode(req)
size := b.Len() size := b.Len()
t.peerServer.serverStats.SendAppendReq(size) t.peerServer.serverStats.SendAppendReq(size)
u, _ := t.peerServer.registry.PeerURL(peer.Name) u, _ := t.peerServer.registry.PeerURL(peer.Name)
log.Debugf("Send LogEntries to %s ", u) log.Debugf("Send LogEntries to %s ", u)
thisFollowerStats, ok := t.peerServer.followersStats.Followers[peer.Name] thisFollowerStats, ok := t.peerServer.followersStats.Followers[peer.Name]
if !ok { //this is the first time this follower has been seen if !ok { //this is the first time this follower has been seen
thisFollowerStats = &raftFollowerStats{} thisFollowerStats = &raftFollowerStats{}
thisFollowerStats.Latency.Minimum = 1 << 63 thisFollowerStats.Latency.Minimum = 1 << 63
t.peerServer.followersStats.Followers[peer.Name] = thisFollowerStats t.peerServer.followersStats.Followers[peer.Name] = thisFollowerStats
} }
start := time.Now() start := time.Now()
resp, httpRequest, err := t.Post(fmt.Sprintf("%s/log/append", u), &b) resp, httpRequest, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
end := time.Now() end := time.Now()
if err != nil { if err != nil {
log.Debugf("Cannot send AppendEntriesRequest to %s: %s", u, err) log.Debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
if ok { if ok {
thisFollowerStats.Fail() thisFollowerStats.Fail()
} }
} else { } else {
if ok { if ok {
thisFollowerStats.Succ(end.Sub(start)) thisFollowerStats.Succ(end.Sub(start))
} }
} }
if resp != nil { if resp != nil {
defer resp.Body.Close() defer resp.Body.Close()
t.CancelWhenTimeout(httpRequest) t.CancelWhenTimeout(httpRequest)
aersp = &raft.AppendEntriesResponse{} aersp = &raft.AppendEntriesResponse{}
if err := json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { if err := json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
return aersp return aersp
} }
} }
return aersp return aersp
} }
// Sends RequestVote RPCs to a peer when the server is the candidate. // Sends RequestVote RPCs to a peer when the server is the candidate.
func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse { func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
var rvrsp *raft.RequestVoteResponse var rvrsp *raft.RequestVoteResponse
var b bytes.Buffer var b bytes.Buffer
json.NewEncoder(&b).Encode(req) json.NewEncoder(&b).Encode(req)
u, _ := t.peerServer.registry.PeerURL(peer.Name) u, _ := t.peerServer.registry.PeerURL(peer.Name)
log.Debugf("Send Vote to %s", u) log.Debugf("Send Vote to %s", u)
resp, httpRequest, err := t.Post(fmt.Sprintf("%s/vote", u), &b) resp, httpRequest, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
if err != nil { if err != nil {
log.Debugf("Cannot send VoteRequest to %s : %s", u, err) log.Debugf("Cannot send VoteRequest to %s : %s", u, err)
} }
if resp != nil { if resp != nil {
defer resp.Body.Close() defer resp.Body.Close()
t.CancelWhenTimeout(httpRequest) t.CancelWhenTimeout(httpRequest)
rvrsp := &raft.RequestVoteResponse{} rvrsp := &raft.RequestVoteResponse{}
if err := json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF { if err := json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF {
return rvrsp return rvrsp
} }
} }
return rvrsp return rvrsp
} }
// Sends SnapshotRequest RPCs to a peer when the server is the candidate. // Sends SnapshotRequest RPCs to a peer when the server is the candidate.
func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse { func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
var aersp *raft.SnapshotResponse var aersp *raft.SnapshotResponse
var b bytes.Buffer var b bytes.Buffer
json.NewEncoder(&b).Encode(req) json.NewEncoder(&b).Encode(req)
u, _ := t.peerServer.registry.PeerURL(peer.Name) u, _ := t.peerServer.registry.PeerURL(peer.Name)
log.Debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u, log.Debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u,
req.LastTerm, req.LastIndex) req.LastTerm, req.LastIndex)
resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b) resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
if err != nil { if err != nil {
log.Debugf("Cannot send SendSnapshotRequest to %s : %s", u, err) log.Debugf("Cannot send SendSnapshotRequest to %s : %s", u, err)
} }
if resp != nil { if resp != nil {
defer resp.Body.Close() defer resp.Body.Close()
t.CancelWhenTimeout(httpRequest) t.CancelWhenTimeout(httpRequest)
aersp = &raft.SnapshotResponse{} aersp = &raft.SnapshotResponse{}
if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
return aersp return aersp
} }
} }
return aersp return aersp
} }
// Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate. // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse { func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
var aersp *raft.SnapshotRecoveryResponse var aersp *raft.SnapshotRecoveryResponse
var b bytes.Buffer var b bytes.Buffer
json.NewEncoder(&b).Encode(req) json.NewEncoder(&b).Encode(req)
u, _ := t.peerServer.registry.PeerURL(peer.Name) u, _ := t.peerServer.registry.PeerURL(peer.Name)
log.Debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u, log.Debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u,
req.LastTerm, req.LastIndex) req.LastTerm, req.LastIndex)
resp, _, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b) resp, _, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
if err != nil { if err != nil {
log.Debugf("Cannot send SendSnapshotRecoveryRequest to %s : %s", u, err) log.Debugf("Cannot send SendSnapshotRecoveryRequest to %s : %s", u, err)
} }
if resp != nil { if resp != nil {
defer resp.Body.Close() defer resp.Body.Close()
aersp = &raft.SnapshotRecoveryResponse{} aersp = &raft.SnapshotRecoveryResponse{}
if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
return aersp return aersp
} }
} }
return aersp return aersp
} }
// Send server side POST request // Send server side POST request
func (t *transporter) Post(urlStr string, body io.Reader) (*http.Response, *http.Request, error) { func (t *transporter) Post(urlStr string, body io.Reader) (*http.Response, *http.Request, error) {
req, _ := http.NewRequest("POST", urlStr, body) req, _ := http.NewRequest("POST", urlStr, body)
resp, err := t.client.Do(req) resp, err := t.client.Do(req)
return resp, req, err return resp, req, err
} }
// Send server side GET request // Send server side GET request
func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error) { func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error) {
req, _ := http.NewRequest("GET", urlStr, nil) req, _ := http.NewRequest("GET", urlStr, nil)
resp, err := t.client.Do(req) resp, err := t.client.Do(req)
return resp, req, err return resp, req, err
} }
// Cancel the on fly HTTP transaction when timeout happens. // Cancel the on fly HTTP transaction when timeout happens.
func (t *transporter) CancelWhenTimeout(req *http.Request) { func (t *transporter) CancelWhenTimeout(req *http.Request) {
go func() { go func() {
time.Sleep(ElectionTimeout) time.Sleep(ElectionTimeout)
t.transport.CancelRequest(req) t.transport.CancelRequest(req)
}() }()
} }

View File

@ -1,27 +1,26 @@
package server package server
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"github.com/coreos/etcd/log" "github.com/coreos/etcd/log"
) )
func decodeJsonRequest(req *http.Request, data interface{}) error { func decodeJsonRequest(req *http.Request, data interface{}) error {
decoder := json.NewDecoder(req.Body) decoder := json.NewDecoder(req.Body)
if err := decoder.Decode(&data); err != nil && err != io.EOF { if err := decoder.Decode(&data); err != nil && err != io.EOF {
log.Warnf("Malformed json request: %v", err) log.Warnf("Malformed json request: %v", err)
return fmt.Errorf("Malformed json request: %v", err) return fmt.Errorf("Malformed json request: %v", err)
} }
return nil return nil
} }
func redirect(hostname string, w http.ResponseWriter, req *http.Request) { func redirect(hostname string, w http.ResponseWriter, req *http.Request) {
path := req.URL.Path path := req.URL.Path
url := hostname + path url := hostname + path
log.Debugf("Redirect to %s", url) log.Debugf("Redirect to %s", url)
http.Redirect(w, req, url, http.StatusTemporaryRedirect) http.Redirect(w, req, url, http.StatusTemporaryRedirect)
} }

View File

@ -1,9 +1,9 @@
package v1 package v1
import ( import (
"github.com/coreos/etcd/store"
"github.com/gorilla/mux"
"net/http" "net/http"
"github.com/coreos/etcd/store"
"github.com/gorilla/mux"
) )
// Removes a key from the store. // Removes a key from the store.

View File

@ -1,9 +1,9 @@
package v1 package v1
import ( import (
"net/http"
"github.com/coreos/etcd/store" "github.com/coreos/etcd/store"
"github.com/coreos/go-raft" "github.com/coreos/go-raft"
"net/http"
) )
// The Server interface provides all the methods required for the v1 API. // The Server interface provides all the methods required for the v1 API.

View File

@ -1,29 +1,29 @@
package v2 package v2
import ( import (
"net/http" "net/http"
etcdErr "github.com/coreos/etcd/error" etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/store" "github.com/coreos/etcd/store"
"github.com/gorilla/mux" "github.com/gorilla/mux"
) )
func CreateKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { func CreateKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
vars := mux.Vars(req) vars := mux.Vars(req)
key := "/" + vars["key"] key := "/" + vars["key"]
value := req.FormValue("value") value := req.FormValue("value")
expireTime, err := store.TTL(req.FormValue("ttl")) expireTime, err := store.TTL(req.FormValue("ttl"))
if err != nil { if err != nil {
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", store.UndefIndex, store.UndefTerm) return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", store.UndefIndex, store.UndefTerm)
} }
c := &store.CreateCommand{ c := &store.CreateCommand{
Key: key, Key: key,
Value: value, Value: value,
ExpireTime: expireTime, ExpireTime: expireTime,
IncrementalSuffix: (req.FormValue("incremental") == "true"), IncrementalSuffix: (req.FormValue("incremental") == "true"),
} }
return s.Dispatch(c, w, req) return s.Dispatch(c, w, req)
} }

View File

@ -1,20 +1,20 @@
package v2 package v2
import ( import (
"net/http" "net/http"
"github.com/coreos/etcd/store" "github.com/coreos/etcd/store"
"github.com/gorilla/mux" "github.com/gorilla/mux"
) )
func DeleteKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { func DeleteKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
vars := mux.Vars(req) vars := mux.Vars(req)
key := "/" + vars["key"] key := "/" + vars["key"]
c := &store.DeleteCommand{ c := &store.DeleteCommand{
Key: key, Key: key,
Recursive: (req.FormValue("recursive") == "true"), Recursive: (req.FormValue("recursive") == "true"),
} }
return s.Dispatch(c, w, req) return s.Dispatch(c, w, req)
} }

View File

@ -1,69 +1,69 @@
package v2 package v2
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"strconv" "strconv"
etcdErr "github.com/coreos/etcd/error" etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/log" "github.com/coreos/etcd/log"
"github.com/coreos/etcd/store" "github.com/coreos/etcd/store"
"github.com/coreos/go-raft" "github.com/coreos/go-raft"
"github.com/gorilla/mux" "github.com/gorilla/mux"
) )
func GetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { func GetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
var err error var err error
var event *store.Event var event *store.Event
vars := mux.Vars(req) vars := mux.Vars(req)
key := "/" + vars["key"] key := "/" + vars["key"]
// Help client to redirect the request to the current leader // Help client to redirect the request to the current leader
if req.FormValue("consistent") == "true" && s.State() != raft.Leader { if req.FormValue("consistent") == "true" && s.State() != raft.Leader {
leader := s.Leader() leader := s.Leader()
hostname, _ := s.PeerURL(leader) hostname, _ := s.PeerURL(leader)
url := hostname + req.URL.Path url := hostname + req.URL.Path
log.Debugf("Redirect to %s", url) log.Debugf("Redirect to %s", url)
http.Redirect(w, req, url, http.StatusTemporaryRedirect) http.Redirect(w, req, url, http.StatusTemporaryRedirect)
return nil return nil
} }
recursive := (req.FormValue("recursive") == "true") recursive := (req.FormValue("recursive") == "true")
sorted := (req.FormValue("sorted") == "true") sorted := (req.FormValue("sorted") == "true")
if req.FormValue("wait") == "true" { // watch if req.FormValue("wait") == "true" { // watch
// Create a command to watch from a given index (default 0). // Create a command to watch from a given index (default 0).
var sinceIndex uint64 = 0 var sinceIndex uint64 = 0
if req.Method == "POST" { if req.Method == "POST" {
sinceIndex, err = strconv.ParseUint(string(req.FormValue("wait_index")), 10, 64) sinceIndex, err = strconv.ParseUint(string(req.FormValue("wait_index")), 10, 64)
if err != nil { if err != nil {
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", store.UndefIndex, store.UndefTerm) return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", store.UndefIndex, store.UndefTerm)
} }
} }
// Start the watcher on the store. // Start the watcher on the store.
c, err := s.Store().Watch(key, recursive, sinceIndex, s.CommitIndex(), s.Term()) c, err := s.Store().Watch(key, recursive, sinceIndex, s.CommitIndex(), s.Term())
if err != nil { if err != nil {
return etcdErr.NewError(500, key, store.UndefIndex, store.UndefTerm) return etcdErr.NewError(500, key, store.UndefIndex, store.UndefTerm)
} }
event = <-c event = <-c
} else { //get } else { //get
// Retrieve the key from the store. // Retrieve the key from the store.
event, err = s.Store().Get(key, recursive, sorted, s.CommitIndex(), s.Term()) event, err = s.Store().Get(key, recursive, sorted, s.CommitIndex(), s.Term())
if err != nil { if err != nil {
return err return err
} }
} }
w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index)) w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index))
w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term)) w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term))
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
b, _ := json.Marshal(event) b, _ := json.Marshal(event)
w.Write(b) w.Write(b)
return nil return nil
} }

View File

@ -1,64 +1,64 @@
package v2 package v2
import ( import (
"net/http" "net/http"
"strconv" "strconv"
etcdErr "github.com/coreos/etcd/error" etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/store" "github.com/coreos/etcd/store"
"github.com/coreos/go-raft" "github.com/coreos/go-raft"
"github.com/gorilla/mux" "github.com/gorilla/mux"
) )
func UpdateKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error { func UpdateKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
vars := mux.Vars(req) vars := mux.Vars(req)
key := "/" + vars["key"] key := "/" + vars["key"]
req.ParseForm() req.ParseForm()
value := req.Form.Get("value") value := req.Form.Get("value")
expireTime, err := store.TTL(req.Form.Get("ttl")) expireTime, err := store.TTL(req.Form.Get("ttl"))
if err != nil { if err != nil {
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", store.UndefIndex, store.UndefTerm) return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", store.UndefIndex, store.UndefTerm)
} }
// Update should give at least one option // Update should give at least one option
if value == "" && expireTime.Sub(store.Permanent) == 0 { if value == "" && expireTime.Sub(store.Permanent) == 0 {
return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", store.UndefIndex, store.UndefTerm) return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", store.UndefIndex, store.UndefTerm)
} }
prevValue, valueOk := req.Form["prevValue"] prevValue, valueOk := req.Form["prevValue"]
prevIndexStr, indexOk := req.Form["prevIndex"] prevIndexStr, indexOk := req.Form["prevIndex"]
var c raft.Command var c raft.Command
if !valueOk && !indexOk { // update without test if !valueOk && !indexOk { // update without test
c = &store.UpdateCommand{ c = &store.UpdateCommand{
Key: key, Key: key,
Value: value, Value: value,
ExpireTime: expireTime, ExpireTime: expireTime,
} }
} else { // update with test } else { // update with test
var prevIndex uint64 var prevIndex uint64
if indexOk { if indexOk {
prevIndex, err = strconv.ParseUint(prevIndexStr[0], 10, 64) prevIndex, err = strconv.ParseUint(prevIndexStr[0], 10, 64)
// bad previous index // bad previous index
if err != nil { if err != nil {
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Update", store.UndefIndex, store.UndefTerm) return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Update", store.UndefIndex, store.UndefTerm)
} }
} else { } else {
prevIndex = 0 prevIndex = 0
} }
c = &store.TestAndSetCommand{ c = &store.TestAndSetCommand{
Key: key, Key: key,
Value: value, Value: value,
PrevValue: prevValue[0], PrevValue: prevValue[0],
PrevIndex: prevIndex, PrevIndex: prevIndex,
} }
} }
return s.Dispatch(c, w, req) return s.Dispatch(c, w, req)
} }

View File

@ -1,18 +1,18 @@
package v2 package v2
import ( import (
"net/http" "github.com/coreos/etcd/store"
"github.com/coreos/etcd/store" "github.com/coreos/go-raft"
"github.com/coreos/go-raft" "net/http"
) )
// The Server interface provides all the methods required for the v2 API. // The Server interface provides all the methods required for the v2 API.
type Server interface { type Server interface {
State() string State() string
Leader() string Leader() string
CommitIndex() uint64 CommitIndex() uint64
Term() uint64 Term() uint64
PeerURL(string) (string, bool) PeerURL(string) (string, bool)
Store() *store.Store Store() *store.Store
Dispatch(raft.Command, http.ResponseWriter, *http.Request) error Dispatch(raft.Command, http.ResponseWriter, *http.Request) error
} }

View File

@ -7,7 +7,7 @@ import (
) )
func init() { func init() {
raft.RegisterCommand(&CreateCommand{}) raft.RegisterCommand(&CreateCommand{})
} }
// Create command // Create command

View File

@ -6,7 +6,7 @@ import (
) )
func init() { func init() {
raft.RegisterCommand(&DeleteCommand{}) raft.RegisterCommand(&DeleteCommand{})
} }
// The DeleteCommand removes a key from the Store. // The DeleteCommand removes a key from the Store.

View File

@ -77,5 +77,3 @@ func (event *Event) Response() interface{} {
return responses return responses
} }
} }

View File

@ -1,112 +1,112 @@
package store package store
import ( import (
"fmt" "fmt"
"strings" "strings"
"sync" "sync"
etcdErr "github.com/coreos/etcd/error" etcdErr "github.com/coreos/etcd/error"
) )
type EventHistory struct { type EventHistory struct {
Queue eventQueue Queue eventQueue
StartIndex uint64 StartIndex uint64
LastIndex uint64 LastIndex uint64
LastTerm uint64 LastTerm uint64
DupCnt uint64 // help to compute the watching point with duplicated indexes in the queue DupCnt uint64 // help to compute the watching point with duplicated indexes in the queue
rwl sync.RWMutex rwl sync.RWMutex
} }
func newEventHistory(capacity int) *EventHistory { func newEventHistory(capacity int) *EventHistory {
return &EventHistory{ return &EventHistory{
Queue: eventQueue{ Queue: eventQueue{
Capacity: capacity, Capacity: capacity,
Events: make([]*Event, capacity), Events: make([]*Event, capacity),
}, },
} }
} }
// addEvent function adds event into the eventHistory // addEvent function adds event into the eventHistory
func (eh *EventHistory) addEvent(e *Event) *Event { func (eh *EventHistory) addEvent(e *Event) *Event {
eh.rwl.Lock() eh.rwl.Lock()
defer eh.rwl.Unlock() defer eh.rwl.Unlock()
var duped uint64 var duped uint64
if e.Index == UndefIndex { if e.Index == UndefIndex {
e.Index = eh.LastIndex e.Index = eh.LastIndex
e.Term = eh.LastTerm e.Term = eh.LastTerm
duped = 1 duped = 1
} }
eh.Queue.insert(e) eh.Queue.insert(e)
eh.LastIndex = e.Index eh.LastIndex = e.Index
eh.LastTerm = e.Term eh.LastTerm = e.Term
eh.DupCnt += duped eh.DupCnt += duped
eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index
return e return e
} }
// scan function is enumerating events from the index in history and // scan function is enumerating events from the index in history and
// stops till the first point where the key has identified prefix // stops till the first point where the key has identified prefix
func (eh *EventHistory) scan(prefix string, index uint64) (*Event, *etcdErr.Error) { func (eh *EventHistory) scan(prefix string, index uint64) (*Event, *etcdErr.Error) {
eh.rwl.RLock() eh.rwl.RLock()
defer eh.rwl.RUnlock() defer eh.rwl.RUnlock()
start := index - eh.StartIndex start := index - eh.StartIndex
// the index should locate after the event history's StartIndex // the index should locate after the event history's StartIndex
if start < 0 { if start < 0 {
return nil, return nil,
etcdErr.NewError(etcdErr.EcodeEventIndexCleared, etcdErr.NewError(etcdErr.EcodeEventIndexCleared,
fmt.Sprintf("the requested history has been cleared [%v/%v]", fmt.Sprintf("the requested history has been cleared [%v/%v]",
eh.StartIndex, index), UndefIndex, UndefTerm) eh.StartIndex, index), UndefIndex, UndefTerm)
} }
// the index should locate before the size of the queue minus the duplicate count // the index should locate before the size of the queue minus the duplicate count
if start >= (uint64(eh.Queue.Size) - eh.DupCnt) { // future index if start >= (uint64(eh.Queue.Size) - eh.DupCnt) { // future index
return nil, nil return nil, nil
} }
i := int((start + uint64(eh.Queue.Front)) % uint64(eh.Queue.Capacity)) i := int((start + uint64(eh.Queue.Front)) % uint64(eh.Queue.Capacity))
for { for {
e := eh.Queue.Events[i] e := eh.Queue.Events[i]
if strings.HasPrefix(e.Key, prefix) && index <= e.Index { // make sure we bypass the smaller one if strings.HasPrefix(e.Key, prefix) && index <= e.Index { // make sure we bypass the smaller one
return e, nil return e, nil
} }
i = (i + 1) % eh.Queue.Capacity i = (i + 1) % eh.Queue.Capacity
if i == eh.Queue.back() { // find nothing, return and watch from current index if i == eh.Queue.back() { // find nothing, return and watch from current index
return nil, nil return nil, nil
} }
} }
} }
// clone will be protected by a stop-world lock // clone will be protected by a stop-world lock
// do not need to obtain internal lock // do not need to obtain internal lock
func (eh *EventHistory) clone() *EventHistory { func (eh *EventHistory) clone() *EventHistory {
clonedQueue := eventQueue{ clonedQueue := eventQueue{
Capacity: eh.Queue.Capacity, Capacity: eh.Queue.Capacity,
Events: make([]*Event, eh.Queue.Capacity), Events: make([]*Event, eh.Queue.Capacity),
Size: eh.Queue.Size, Size: eh.Queue.Size,
Front: eh.Queue.Front, Front: eh.Queue.Front,
} }
for i, e := range eh.Queue.Events { for i, e := range eh.Queue.Events {
clonedQueue.Events[i] = e clonedQueue.Events[i] = e
} }
return &EventHistory{ return &EventHistory{
StartIndex: eh.StartIndex, StartIndex: eh.StartIndex,
Queue: clonedQueue, Queue: clonedQueue,
LastIndex: eh.LastIndex, LastIndex: eh.LastIndex,
LastTerm: eh.LastTerm, LastTerm: eh.LastTerm,
DupCnt: eh.DupCnt, DupCnt: eh.DupCnt,
} }
} }

View File

@ -1,26 +1,25 @@
package store package store
type eventQueue struct { type eventQueue struct {
Events []*Event Events []*Event
Size int Size int
Front int Front int
Capacity int Capacity int
} }
func (eq *eventQueue) back() int { func (eq *eventQueue) back() int {
return (eq.Front + eq.Size - 1 + eq.Capacity) % eq.Capacity return (eq.Front + eq.Size - 1 + eq.Capacity) % eq.Capacity
} }
func (eq *eventQueue) insert(e *Event) { func (eq *eventQueue) insert(e *Event) {
index := (eq.back() + 1) % eq.Capacity index := (eq.back() + 1) % eq.Capacity
eq.Events[index] = e eq.Events[index] = e
if eq.Size == eq.Capacity { //dequeue if eq.Size == eq.Capacity { //dequeue
eq.Front = (index + 1) % eq.Capacity eq.Front = (index + 1) % eq.Capacity
} else { } else {
eq.Size++ eq.Size++
} }
} }

View File

@ -8,7 +8,7 @@ import (
) )
func init() { func init() {
raft.RegisterCommand(&TestAndSetCommand{}) raft.RegisterCommand(&TestAndSetCommand{})
} }
// The TestAndSetCommand performs a conditional update on a key in the store. // The TestAndSetCommand performs a conditional update on a key in the store.

View File

@ -1,21 +1,20 @@
package store package store
import ( import (
"strconv" "strconv"
"time" "time"
) )
// Convert string duration to time format // Convert string duration to time format
func TTL(duration string) (time.Time, error) { func TTL(duration string) (time.Time, error) {
if duration != "" { if duration != "" {
duration, err := strconv.Atoi(duration) duration, err := strconv.Atoi(duration)
if err != nil { if err != nil {
return Permanent, err return Permanent, err
} }
return time.Now().Add(time.Second * (time.Duration)(duration)), nil return time.Now().Add(time.Second * (time.Duration)(duration)), nil
} else { } else {
return Permanent, nil return Permanent, nil
} }
} }

View File

@ -8,7 +8,7 @@ import (
) )
func init() { func init() {
raft.RegisterCommand(&UpdateCommand{}) raft.RegisterCommand(&UpdateCommand{})
} }
// The UpdateCommand updates the value of a key in the Store. // The UpdateCommand updates the value of a key in the Store.

View File

@ -14,7 +14,6 @@ import (
// HTTP Utilities // HTTP Utilities
//-------------------------------------- //--------------------------------------
// sanitizeURL will cleanup a host string in the format hostname:port and // sanitizeURL will cleanup a host string in the format hostname:port and
// attach a schema. // attach a schema.
func sanitizeURL(host string, defaultScheme string) string { func sanitizeURL(host string, defaultScheme string) string {
@ -87,4 +86,3 @@ func runCPUProfile() {
} }
}() }()
} }