Refactored.

This commit is contained in:
Ben Johnson 2013-10-12 15:56:43 -06:00
parent bb9401544a
commit 8670e1b7aa
26 changed files with 452 additions and 476 deletions

View File

@ -8,6 +8,9 @@ import (
"io/ioutil"
"os"
"path/filepath"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/server"
)
//--------------------------------------
@ -30,7 +33,7 @@ func getInfo(path string) *Info {
os.Remove(confPath)
os.RemoveAll(snapshotPath)
} else if info := readInfo(infoPath); info != nil {
infof("Found node configuration in '%s'. Ignoring flags", infoPath)
log.Infof("Found node configuration in '%s'. Ignoring flags", infoPath)
return info
}
@ -41,10 +44,10 @@ func getInfo(path string) *Info {
content, _ := json.MarshalIndent(info, "", " ")
content = []byte(string(content) + "\n")
if err := ioutil.WriteFile(infoPath, content, 0644); err != nil {
fatalf("Unable to write info to file: %v", err)
log.Fatalf("Unable to write info to file: %v", err)
}
infof("Wrote node configuration to '%s'", infoPath)
log.Infof("Wrote node configuration to '%s'", infoPath)
return info
}
@ -57,7 +60,7 @@ func readInfo(path string) *Info {
if os.IsNotExist(err) {
return nil
}
fatal(err)
log.Fatal(err)
}
defer file.Close()
@ -65,19 +68,19 @@ func readInfo(path string) *Info {
content, err := ioutil.ReadAll(file)
if err != nil {
fatalf("Unable to read info: %v", err)
log.Fatalf("Unable to read info: %v", err)
return nil
}
if err = json.Unmarshal(content, &info); err != nil {
fatalf("Unable to parse info: %v", err)
log.Fatalf("Unable to parse info: %v", err)
return nil
}
return info
}
func tlsConfigFromInfo(info TLSInfo) (t TLSConfig, ok bool) {
func tlsConfigFromInfo(info server.TLSInfo) (t server.TLSConfig, ok bool) {
var keyFile, certFile, CAFile string
var tlsCert tls.Certificate
var err error
@ -101,7 +104,7 @@ func tlsConfigFromInfo(info TLSInfo) (t TLSConfig, ok bool) {
tlsCert, err = tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
fatal(err)
log.Fatal(err)
}
t.Scheme = "https"

40
etcd.go
View File

@ -1,12 +1,10 @@
package main
import (
"crypto/tls"
"flag"
"io/ioutil"
"os"
"strings"
"time"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/server"
@ -101,18 +99,10 @@ type Info struct {
RaftListenHost string `json:"raftListenHost"`
EtcdListenHost string `json:"etcdListenHost"`
RaftTLS TLSInfo `json:"raftTLS"`
EtcdTLS TLSInfo `json:"etcdTLS"`
RaftTLS server.TLSInfo `json:"raftTLS"`
EtcdTLS server.TLSInfo `json:"etcdTLS"`
}
//------------------------------------------------------------------------------
//
// Variables
//
//------------------------------------------------------------------------------
var etcdStore *store.Store
//------------------------------------------------------------------------------
//
// Functions
@ -131,7 +121,7 @@ func main() {
}
if veryVerbose {
verbose = true
log.Verbose = true
raft.SetLogLevel(raft.Debug)
}
@ -140,7 +130,7 @@ func main() {
} else if machinesFile != "" {
b, err := ioutil.ReadFile(machinesFile)
if err != nil {
fatalf("Unable to read the given machines file: %s", err)
log.Fatalf("Unable to read the given machines file: %s", err)
}
cluster = strings.Split(string(b), ",")
}
@ -148,17 +138,17 @@ func main() {
// Check TLS arguments
raftTLSConfig, ok := tlsConfigFromInfo(argInfo.RaftTLS)
if !ok {
fatal("Please specify cert and key file or cert and key file and CAFile or none of the three")
log.Fatal("Please specify cert and key file or cert and key file and CAFile or none of the three")
}
etcdTLSConfig, ok := tlsConfigFromInfo(argInfo.EtcdTLS)
if !ok {
fatal("Please specify cert and key file or cert and key file and CAFile or none of the three")
log.Fatal("Please specify cert and key file or cert and key file and CAFile or none of the three")
}
argInfo.Name = strings.TrimSpace(argInfo.Name)
if argInfo.Name == "" {
fatal("ERROR: server name required. e.g. '-n=server_name'")
log.Fatal("ERROR: server name required. e.g. '-n=server_name'")
}
// Check host name arguments
@ -171,29 +161,29 @@ func main() {
// Read server info from file or grab it from user.
if err := os.MkdirAll(dirPath, 0744); err != nil {
fatalf("Unable to create path: %s", err)
log.Fatalf("Unable to create path: %s", err)
}
info := getInfo(dirPath)
// Create etcd key-value store
etcdStore = store.New()
store := store.New()
// Create a shared node registry.
registry := server.NewRegistry()
registry := server.NewRegistry(store)
// Create peer server.
ps := NewPeerServer(info.Name, dirPath, info.RaftURL, info.RaftListenHost, &raftTLSConfig, &info.RaftTLS, registry)
ps := server.NewPeerServer(info.Name, dirPath, info.RaftURL, info.RaftListenHost, &raftTLSConfig, &info.RaftTLS, registry, store)
ps.MaxClusterSize = maxClusterSize
ps.RetryTimes = retryTimes
s := server.New(info.Name, info.EtcdURL, info.EtcdListenHost, &etcdTLSConfig, &info.EtcdTLS, r)
if err := e.AllowOrigins(cors); err != nil {
s := server.New(info.Name, info.EtcdURL, info.EtcdListenHost, &etcdTLSConfig, &info.EtcdTLS, ps.Server, registry, store)
if err := s.AllowOrigins(cors); err != nil {
panic(err)
}
ps.SetServer(server)
ps.SetServer(s)
ps.ListenAndServe(snapshot)
ps.ListenAndServe(snapshot, cluster)
s.ListenAndServe()
}

View File

@ -12,6 +12,7 @@ import (
"testing"
"time"
"github.com/coreos/etcd/server"
"github.com/coreos/etcd/test"
"github.com/coreos/go-etcd/etcd"
)
@ -398,8 +399,8 @@ func TestKillLeader(t *testing.T) {
totalTime += take
avgTime := totalTime / (time.Duration)(i+1)
fmt.Println("Leader election time is ", take, "with election timeout", ElectionTimeout)
fmt.Println("Leader election time average is", avgTime, "with election timeout", ElectionTimeout)
fmt.Println("Leader election time is ", take, "with election timeout", server.ElectionTimeout)
fmt.Println("Leader election time average is", avgTime, "with election timeout", server.ElectionTimeout)
etcds[num], err = os.StartProcess("etcd", argGroup[num], procAttr)
}
stop <- true
@ -456,7 +457,7 @@ func TestKillRandom(t *testing.T) {
etcds[num].Wait()
}
time.Sleep(ElectionTimeout)
time.Sleep(server.ElectionTimeout)
<-leaderChan

View File

@ -1,34 +0,0 @@
package main
// getMachines gets the current machines in the cluster
func (r *raftServer) getMachines(toURL func(string) (string, bool)) []string {
peers := r.Peers()
machines := make([]string, len(peers)+1)
leader, ok := toURL(r.Leader())
self, _ := toURL(r.Name())
i := 1
if ok {
machines[0] = leader
if leader != self {
machines[1] = self
i = 2
}
} else {
machines[0] = self
}
// Add all peers to the slice
for peerName, _ := range peers {
if machine, ok := toURL(peerName); ok {
// do not add leader twice
if machine != leader {
machines[i] = machine
i++
}
}
}
return machines
}

View File

@ -3,6 +3,6 @@
VER=$(git describe --tags HEAD)
cat <<EOF
package main
package server
const releaseVersion = "$VER"
EOF

View File

@ -5,7 +5,6 @@ import (
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
)
@ -37,7 +36,6 @@ func (c *JoinCommand) CommandName() string {
// Join a server to the cluster
func (c *JoinCommand) Apply(server *raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(*store.Store)
ps, _ := server.Context().(*PeerServer)
b := make([]byte, 8)

View File

@ -19,7 +19,7 @@ import (
type PeerServer struct {
*raft.Server
server Server
server *Server
joinIndex uint64
name string
url string
@ -148,7 +148,7 @@ func (s *PeerServer) RaftServer() *raft.Server {
}
// Associates the client server with the peer server.
func (s *PeerServer) SetServer(server Server) {
func (s *PeerServer) SetServer(server *Server) {
s.server = server
}
@ -239,15 +239,33 @@ func (s *PeerServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request
}
// Response to the join request
func (s *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) error {
func (s *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
command := &JoinCommand{}
if err := decodeJsonRequest(req, command); err == nil {
log.Debugf("Receive Join Request from %s", command.Name)
return s.dispatchRaftCommand(command, w, req)
} else {
// Write CORS header.
if s.server.OriginAllowed("*") {
w.Header().Add("Access-Control-Allow-Origin", "*")
} else if s.server.OriginAllowed(req.Header.Get("Origin")) {
w.Header().Add("Access-Control-Allow-Origin", req.Header.Get("Origin"))
}
err := decodeJsonRequest(req, command)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return nil
return
}
log.Debugf("Receive Join Request from %s", command.Name)
err = s.dispatchRaftCommand(command, w, req)
// Return status.
if err != nil {
if etcdErr, ok := err.(*etcdErr.Error); ok {
log.Debug("Return error: ", (*etcdErr).Error())
etcdErr.Write(w)
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
}
@ -326,7 +344,7 @@ func (s *PeerServer) startTransport(scheme string, tlsConf tls.Config) {
// internal commands
raftMux.HandleFunc("/name", s.NameHttpHandler)
raftMux.HandleFunc("/version", s.RaftVersionHttpHandler)
raftMux.Handle("/join", errorHandler(s.JoinHttpHandler))
raftMux.HandleFunc("/join", s.JoinHttpHandler)
raftMux.HandleFunc("/remove/", s.RemoveHttpHandler)
raftMux.HandleFunc("/vote", s.VoteHttpHandler)
raftMux.HandleFunc("/log", s.GetLogHttpHandler)
@ -421,7 +439,7 @@ func (s *PeerServer) joinByMachine(server *raft.Server, machine string, scheme s
if resp.StatusCode == http.StatusOK {
b, _ := ioutil.ReadAll(resp.Body)
server.joinIndex, _ = binary.Uvarint(b)
s.joinIndex, _ = binary.Uvarint(b)
return nil
}
if resp.StatusCode == http.StatusTemporaryRedirect {
@ -429,12 +447,12 @@ func (s *PeerServer) joinByMachine(server *raft.Server, machine string, scheme s
address := resp.Header.Get("Location")
log.Debugf("Send Join Request to %s", address)
json.NewEncoder(&b).Encode(newJoinCommand(PeerVersion, server.Name(), s.url, s.server.URL()))
json.NewEncoder(&b).Encode(NewJoinCommand(PeerVersion, server.Name(), s.url, s.server.URL()))
resp, req, err = t.Post(address, &b)
} else if resp.StatusCode == http.StatusBadRequest {
debug("Reach max number machines in the cluster")
log.Debug("Reach max number machines in the cluster")
decoder := json.NewDecoder(resp.Body)
err := &etcdErr.Error{}
decoder.Decode(err)
@ -477,15 +495,15 @@ func (s *PeerServer) monitorSnapshot() {
time.Sleep(s.snapConf.checkingInterval)
currentWrites := 0
if uint64(currentWrites) > s.snapConf.writesThr {
r.TakeSnapshot()
s.TakeSnapshot()
s.snapConf.lastWrites = 0
}
}
}
func (s *PeerServer) dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error {
if r.State() == raft.Leader {
if response, err := r.Do(c); err != nil {
if s.State() == raft.Leader {
if response, err := s.Do(c); err != nil {
return err
} else {
if response == nil {
@ -515,7 +533,7 @@ func (s *PeerServer) dispatch(c raft.Command, w http.ResponseWriter, req *http.R
}
} else {
leader := r.Leader()
leader := s.Leader()
// current no leader
if leader == "" {
return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
@ -528,35 +546,3 @@ func (s *PeerServer) dispatch(c raft.Command, w http.ResponseWriter, req *http.R
}
}
type errorHandler func(http.ResponseWriter, *http.Request) error
// addCorsHeader parses the request Origin header and loops through the user
// provided allowed origins and sets the Access-Control-Allow-Origin header if
// there is a match.
func addCorsHeader(w http.ResponseWriter, r *http.Request) {
val, ok := corsList["*"]
if val && ok {
w.Header().Add("Access-Control-Allow-Origin", "*")
return
}
requestOrigin := r.Header.Get("Origin")
val, ok = corsList[requestOrigin]
if val && ok {
w.Header().Add("Access-Control-Allow-Origin", requestOrigin)
return
}
}
func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
addCorsHeader(w, r)
if e := fn(w, r); e != nil {
if etcdErr, ok := e.(*etcdErr.Error); ok {
debug("Return error: ", (*etcdErr).Error())
etcdErr.Write(w)
} else {
http.Error(w, e.Error(), http.StatusInternalServerError)
}
}
}

View File

@ -1,6 +1,9 @@
package server
import (
"fmt"
"net/url"
"path"
"sync"
"github.com/coreos/etcd/store"
@ -48,13 +51,13 @@ func (r *Registry) Unregister(name string, commitIndex uint64, term uint64) erro
defer r.Unlock()
// Remove the key from the store.
_, err := s.Delete(path.Join(RegistryKey, name), false, commitIndex, term)
_, err := r.store.Delete(path.Join(RegistryKey, name), false, commitIndex, term)
return err
}
// Returns the number of nodes in the cluster.
func (r *Registry) Count() int {
e, err := s.Get(RegistryKey, false, false, 0, 0)
e, err := r.store.Get(RegistryKey, false, false, 0, 0)
if err != nil {
return 0
}
@ -86,7 +89,7 @@ func (r *Registry) URLs() []string {
defer r.Unlock()
// Retrieve a list of all nodes.
e, err := s.Get(RegistryKey, false, false, 0, 0)
e, err := r.store.Get(RegistryKey, false, false, 0, 0)
if err != nil {
return make([]string, 0)
}
@ -94,7 +97,9 @@ func (r *Registry) URLs() []string {
// Lookup the URL for each one.
urls := make([]string, 0)
for _, pair := range e.KVPairs {
urls = append(urls, r.url(pair.Key))
if url, ok := r.url(pair.Key); ok {
urls = append(urls, url)
}
}
return urls
@ -126,7 +131,7 @@ func (r *Registry) PeerURLs() []string {
defer r.Unlock()
// Retrieve a list of all nodes.
e, err := s.Get(RegistryKey, false, false, 0, 0)
e, err := r.store.Get(RegistryKey, false, false, 0, 0)
if err != nil {
return make([]string, 0)
}
@ -134,7 +139,9 @@ func (r *Registry) PeerURLs() []string {
// Lookup the URL for each one.
urls := make([]string, 0)
for _, pair := range e.KVPairs {
urls = append(urls, r.peerURL(pair.Key))
if url, ok := r.peerURL(pair.Key); ok {
urls = append(urls, url)
}
}
return urls
@ -147,7 +154,7 @@ func (r *Registry) load(name string) {
}
// Retrieve from store.
e, err := etcdStore.Get(path.Join(RegistryKey, name), false, false, 0, 0)
e, err := r.store.Get(path.Join(RegistryKey, name), false, false, 0, 0)
if err != nil {
return
}

View File

@ -2,8 +2,9 @@ package server
import (
"encoding/binary"
"os"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/log"
"github.com/coreos/go-raft"
)
@ -23,7 +24,6 @@ func (c *RemoveCommand) CommandName() string {
// Remove a server from the cluster
func (c *RemoveCommand) Apply(server *raft.Server) (interface{}, error) {
s, _ := server.StateMachine().(*store.Store)
ps, _ := server.Context().(*PeerServer)
// Remove node from the shared registry.
@ -50,11 +50,11 @@ func (c *RemoveCommand) Apply(server *raft.Server) (interface{}, error) {
// start. It is sure that this node received a new remove
// command and need to be removed
if server.CommitIndex() > ps.joinIndex && ps.joinIndex != 0 {
debugf("server [%s] is removed", server.Name())
log.Debugf("server [%s] is removed", server.Name())
os.Exit(0)
} else {
// else ignore remove
debugf("ignore previous remove command.")
log.Debugf("ignore previous remove command.")
}
}

View File

@ -1,25 +1,26 @@
package server
import (
"fmt"
"encoding/json"
"net/http"
"net/url"
"strings"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/server/v1"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
"github.com/gorilla/mux"
)
// The Server provides an HTTP interface to the underlying store.
type Server interface {
CommitIndex() uint64
Term() uint64
URL() string
Dispatch(raft.Command, http.ResponseWriter, *http.Request)
}
// This is the default implementation of the Server interface.
type server struct {
type Server struct {
http.Server
raftServer *raft.Server
registry *Registry
store *store.Store
name string
url string
tlsConf *TLSConfig
@ -28,14 +29,15 @@ type server struct {
}
// Creates a new Server.
func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, raftServer *raft.Server) *Server {
s := &server{
func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, raftServer *raft.Server, registry *Registry, store *store.Store) *Server {
s := &Server{
Server: http.Server{
Handler: mux.NewRouter(),
TLSConfig: &tlsConf.Server,
Addr: listenHost,
},
name: name,
store: store,
registry: registry,
url: urlStr,
tlsConf: tlsConf,
tlsInfo: tlsInfo,
@ -49,72 +51,117 @@ func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsI
}
// The current Raft committed index.
func (s *server) CommitIndex() uint64 {
func (s *Server) CommitIndex() uint64 {
return s.raftServer.CommitIndex()
}
// The current Raft term.
func (s *server) Term() uint64 {
func (s *Server) Term() uint64 {
return s.raftServer.Term()
}
// The server URL.
func (s *server) URL() string {
func (s *Server) URL() string {
return s.url
}
func (s *server) installV1() {
s.handleFunc("/v1/keys/{key:.*}", v1.GetKeyHandler).Methods("GET")
s.handleFunc("/v1/keys/{key:.*}", v1.SetKeyHandler).Methods("POST", "PUT")
s.handleFunc("/v1/keys/{key:.*}", v1.DeleteKeyHandler).Methods("DELETE")
// Returns a reference to the Store.
func (s *Server) Store() *store.Store {
return s.store
}
s.handleFunc("/v1/watch/{key:.*}", v1.WatchKeyHandler).Methods("GET", "POST")
func (s *Server) installV1() {
s.handleFuncV1("/v1/keys/{key:.*}", v1.GetKeyHandler).Methods("GET")
s.handleFuncV1("/v1/keys/{key:.*}", v1.SetKeyHandler).Methods("POST", "PUT")
s.handleFuncV1("/v1/keys/{key:.*}", v1.DeleteKeyHandler).Methods("DELETE")
s.handleFuncV1("/v1/watch/{key:.*}", v1.WatchKeyHandler).Methods("GET", "POST")
}
// Adds a v1 server handler to the router.
func (s *Server) handleFuncV1(path string, f func(http.ResponseWriter, *http.Request, v1.Server) error) *mux.Route {
return s.handleFunc(path, func(w http.ResponseWriter, req *http.Request, s *Server) error {
return f(w, req, s)
})
}
// Adds a server handler to the router.
func (s *server) handleFunc(path string, f func(http.ResponseWriter, *http.Request, Server) error) *mux.Route {
func (s *Server) handleFunc(path string, f func(http.ResponseWriter, *http.Request, *Server) error) *mux.Route {
r := s.Handler.(*mux.Router)
// Wrap the standard HandleFunc interface to pass in the server reference.
return r.HandleFunc(path, func(w http.ResponseWriter, req *http.Request) {
// Log request.
debugf("[recv] %s %s [%s]", req.Method, s.url, req.URL.Path, req.RemoteAddr)
log.Debugf("[recv] %s %s [%s]", req.Method, s.url, req.URL.Path, req.RemoteAddr)
// Write CORS header.
if s.OriginAllowed("*") {
w.Header().Add("Access-Control-Allow-Origin", "*")
} else if s.OriginAllowed(r.Header.Get("Origin")) {
w.Header().Add("Access-Control-Allow-Origin", r.Header.Get("Origin"))
} else if origin := req.Header.Get("Origin"); s.OriginAllowed(origin) {
w.Header().Add("Access-Control-Allow-Origin", origin)
}
// Execute handler function and return error if necessary.
if err := f(w, req, s); err != nil {
if etcdErr, ok := err.(*etcdErr.Error); ok {
debug("Return error: ", (*etcdErr).Error())
log.Debug("Return error: ", (*etcdErr).Error())
etcdErr.Write(w)
} else {
http.Error(w, e.Error(), http.StatusInternalServerError)
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
})
}
// Start to listen and response etcd client command
func (s *server) ListenAndServe() {
infof("etcd server [name %s, listen on %s, advertised url %s]", s.name, s.Server.Addr, s.url)
func (s *Server) ListenAndServe() {
log.Infof("etcd server [name %s, listen on %s, advertised url %s]", s.name, s.Server.Addr, s.url)
if s.tlsConf.Scheme == "http" {
fatal(s.Server.ListenAndServe())
log.Fatal(s.Server.ListenAndServe())
} else {
fatal(s.Server.ListenAndServeTLS(s.tlsInfo.CertFile, s.tlsInfo.KeyFile))
log.Fatal(s.Server.ListenAndServeTLS(s.tlsInfo.CertFile, s.tlsInfo.KeyFile))
}
}
func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error {
if s.raftServer.State() == raft.Leader {
event, err := s.raftServer.Do(c)
if err != nil {
return err
}
if event == nil {
return etcdErr.NewError(300, "Empty result from raft", store.UndefIndex, store.UndefTerm)
}
response := event.(*store.Event).Response()
b, _ := json.Marshal(response)
w.WriteHeader(http.StatusOK)
w.Write(b)
return nil
} else {
leader := s.raftServer.Leader()
// No leader available.
if leader == "" {
return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
}
url, _ := s.registry.PeerURL(leader)
redirect(url, w, req)
return nil
}
}
// Sets a comma-delimited list of origins that are allowed.
func (s *server) AllowOrigins(origins string) error {
func (s *Server) AllowOrigins(origins string) error {
// Construct a lookup of all origins.
m := make(map[string]bool)
for _, v := range strings.Split(cors, ",") {
for _, v := range strings.Split(origins, ",") {
if v != "*" {
if _, err := url.Parse(v); err != nil {
return fmt.Errorf("Invalid CORS origin: %s", err)
@ -128,6 +175,6 @@ func (s *server) AllowOrigins(origins string) error {
}
// Determines whether the server will allow a given CORS origin.
func (s *server) OriginAllowed(origin string) {
func (s *Server) OriginAllowed(origin string) bool {
return s.corsOrigins["*"] || s.corsOrigins[origin]
}

View File

@ -2,6 +2,7 @@ package server
import (
"sync"
"time"
)
const (

View File

@ -1,5 +1,9 @@
package server
import (
"time"
)
const (
// The amount of time to elapse without a heartbeat before becoming a candidate.
ElectionTimeout = 200 * time.Millisecond

View File

@ -10,6 +10,7 @@ import (
"net/http"
"time"
"github.com/coreos/etcd/log"
"github.com/coreos/go-raft"
)
@ -29,13 +30,13 @@ var tranTimeout = ElectionTimeout
type transporter struct {
client *http.Client
transport *http.Transport
raftServer *raftServer
peerServer *PeerServer
}
// Create transporter using by raft server
// Create http or https transporter based on
// whether the user give the server cert and key
func newTransporter(scheme string, tlsConf tls.Config, raftServer *raftServer) *transporter {
func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) *transporter {
t := transporter{}
tr := &http.Transport{
@ -50,7 +51,7 @@ func newTransporter(scheme string, tlsConf tls.Config, raftServer *raftServer) *
t.client = &http.Client{Transport: tr}
t.transport = tr
t.raftServer = raftServer
t.peerServer = peerServer
return &t
}
@ -69,18 +70,18 @@ func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.P
size := b.Len()
t.raftServer.serverStats.SendAppendReq(size)
t.peerServer.serverStats.SendAppendReq(size)
u, _ := nameToRaftURL(peer.Name)
u, _ := t.peerServer.registry.PeerURL(peer.Name)
debugf("Send LogEntries to %s ", u)
log.Debugf("Send LogEntries to %s ", u)
thisFollowerStats, ok := t.raftServer.followersStats.Followers[peer.Name]
thisFollowerStats, ok := t.peerServer.followersStats.Followers[peer.Name]
if !ok { //this is the first time this follower has been seen
thisFollowerStats = &raftFollowerStats{}
thisFollowerStats.Latency.Minimum = 1 << 63
t.raftServer.followersStats.Followers[peer.Name] = thisFollowerStats
t.peerServer.followersStats.Followers[peer.Name] = thisFollowerStats
}
start := time.Now()
@ -90,7 +91,7 @@ func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.P
end := time.Now()
if err != nil {
debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
log.Debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
if ok {
thisFollowerStats.Fail()
}
@ -121,13 +122,13 @@ func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
u, _ := nameToRaftURL(peer.Name)
debugf("Send Vote to %s", u)
u, _ := t.peerServer.registry.PeerURL(peer.Name)
log.Debugf("Send Vote to %s", u)
resp, httpRequest, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
if err != nil {
debugf("Cannot send VoteRequest to %s : %s", u, err)
log.Debugf("Cannot send VoteRequest to %s : %s", u, err)
}
if resp != nil {
@ -150,14 +151,14 @@ func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer,
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
u, _ := nameToRaftURL(peer.Name)
debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u,
u, _ := t.peerServer.registry.PeerURL(peer.Name)
log.Debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u,
req.LastTerm, req.LastIndex)
resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
if err != nil {
debugf("Cannot send SendSnapshotRequest to %s : %s", u, err)
log.Debugf("Cannot send SendSnapshotRequest to %s : %s", u, err)
}
if resp != nil {
@ -181,14 +182,14 @@ func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raf
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
u, _ := nameToRaftURL(peer.Name)
debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u,
u, _ := t.peerServer.registry.PeerURL(peer.Name)
log.Debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u,
req.LastTerm, req.LastIndex)
resp, _, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
if err != nil {
debugf("Cannot send SendSnapshotRecoveryRequest to %s : %s", u, err)
log.Debugf("Cannot send SendSnapshotRecoveryRequest to %s : %s", u, err)
}
if resp != nil {

View File

@ -1,8 +1,11 @@
package server
import (
"encoding/json"
"fmt"
"io"
"net/http"
"github.com/coreos/etcd/log"
)
@ -18,7 +21,7 @@ func decodeJsonRequest(req *http.Request, data interface{}) error {
func redirect(hostname string, w http.ResponseWriter, req *http.Request) {
path := req.URL.Path
url := hostname + path
debugf("Redirect to %s", url)
log.Debugf("Redirect to %s", url)
http.Redirect(w, req, url, http.StatusTemporaryRedirect)
}

View File

@ -1,15 +1,15 @@
package v1
import (
"encoding/json"
"github.com/coreos/etcd/store"
"net/http"
"github.com/coreos/etcd/store"
"github.com/gorilla/mux"
)
// Removes a key from the store.
func DeleteKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
vars := mux.Vars(req)
key := "/" + vars["key"]
command := &DeleteCommand{Key: key}
return s.Dispatch(command, w, req)
c := &store.DeleteCommand{Key: key}
return s.Dispatch(c, w, req)
}

View File

@ -1,42 +0,0 @@
package v1
// Dispatch the command to leader.
func dispatchCommand(c Command, w http.ResponseWriter, req *http.Request, s *server.Server) error {
return dispatch(c, w, req, s, nameToEtcdURL)
}
// Dispatches a command to a given URL.
func dispatch(c Command, w http.ResponseWriter, req *http.Request, s *server.Server, toURL func(name string) (string, bool)) error {
r := s.raftServer
if r.State() == raft.Leader {
event, err := r.Do(c)
if err != nil {
return err
}
if event == nil {
return etcdErr.NewError(300, "Empty result from raft", store.UndefIndex, store.UndefTerm)
}
event, _ := event.(*store.Event)
response := eventToResponse(event)
b, _ := json.Marshal(response)
w.WriteHeader(http.StatusOK)
w.Write(b)
return nil
} else {
leader := r.Leader()
// No leader available.
if leader == "" {
return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
}
url, _ := toURL(leader)
redirect(url, w, req)
return nil
}
}

View File

@ -2,12 +2,13 @@ package v1
import (
"encoding/json"
"github.com/coreos/etcd/store"
"net/http"
"github.com/gorilla/mux"
)
// Retrieves the value for a given key.
func GetKeyHandler(w http.ResponseWriter, req *http.Request, e *etcdServer) error {
func GetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
vars := mux.Vars(req)
key := "/" + vars["key"]
@ -18,9 +19,7 @@ func GetKeyHandler(w http.ResponseWriter, req *http.Request, e *etcdServer) erro
}
// Convert event to a response and write to client.
event, _ := event.(*store.Event)
response := eventToResponse(event)
b, _ := json.Marshal(response)
b, _ := json.Marshal(event.Response())
w.WriteHeader(http.StatusOK)
w.Write(b)

View File

@ -1,9 +1,12 @@
package v1
import (
"encoding/json"
"github.com/coreos/etcd/store"
"net/http"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
"github.com/gorilla/mux"
)
// Sets the value for a given key.
@ -16,19 +19,19 @@ func SetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
// Parse non-blank value.
value := req.Form.Get("value")
if len(value) == 0 {
return error.NewError(200, "Set", store.UndefIndex, store.UndefTerm)
return etcdErr.NewError(200, "Set", store.UndefIndex, store.UndefTerm)
}
// Convert time-to-live to an expiration time.
expireTime, err := durationToExpireTime(req.Form.Get("ttl"))
expireTime, err := store.TTL(req.Form.Get("ttl"))
if err != nil {
return etcdErr.NewError(202, "Set", store.UndefIndex, store.UndefTerm)
}
// If the "prevValue" is specified then test-and-set. Otherwise create a new key.
var c command.Command
var c raft.Command
if prevValueArr, ok := req.Form["prevValue"]; ok && len(prevValueArr) > 0 {
c = &TestAndSetCommand{
c = &store.TestAndSetCommand{
Key: key,
Value: value,
PrevValue: prevValueArr[0],
@ -36,7 +39,7 @@ func SetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
}
} else {
c = &CreateCommand{
c = &store.CreateCommand{
Key: key,
Value: value,
ExpireTime: expireTime,
@ -44,5 +47,5 @@ func SetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
}
}
return s.Dispatch(command, w, req)
return s.Dispatch(c, w, req)
}

View File

@ -1,50 +1,15 @@
package v1
import (
"github.com/coreos/etcd/server"
"github.com/gorilla/mux"
"net/http"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
)
// The Server interface provides all the methods required for the v1 API.
type Server interface {
CommitIndex() uint64
Term() uint64
Dispatch(http.ResponseWriter, *http.Request, Command)
}
// Converts an event object into a response object.
func eventToResponse(event *store.Event) interface{} {
if !event.Dir {
response := &store.Response{
Action: event.Action,
Key: event.Key,
Value: event.Value,
PrevValue: event.PrevValue,
Index: event.Index,
TTL: event.TTL,
Expiration: event.Expiration,
}
if response.Action == store.Create || response.Action == store.Update {
response.Action = "set"
if response.PrevValue == "" {
response.NewKey = true
}
}
return response
} else {
responses := make([]*store.Response, len(event.KVPairs))
for i, kv := range event.KVPairs {
responses[i] = &store.Response{
Action: event.Action,
Key: kv.Key,
Value: kv.Value,
Dir: kv.Dir,
Index: event.Index,
}
}
return responses
}
Store() *store.Store
Dispatch(raft.Command, http.ResponseWriter, *http.Request) error
}

View File

@ -2,8 +2,12 @@ package v1
import (
"encoding/json"
"github.com/coreos/etcd/store"
"net/http"
"strconv"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/store"
"github.com/gorilla/mux"
)
// Watches a given key prefix for changes.
@ -13,7 +17,7 @@ func WatchKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
key := "/" + vars["key"]
// Create a command to watch from a given index (default 0).
sinceIndex := 0
var sinceIndex uint64 = 0
if req.Method == "POST" {
sinceIndex, err = strconv.ParseUint(string(req.FormValue("index")), 10, 64)
if err != nil {
@ -28,9 +32,7 @@ func WatchKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
}
event := <-c
event, _ := event.(*store.Event)
response := eventToResponse(event)
b, _ := json.Marshal(response)
b, _ := json.Marshal(event.Response())
w.WriteHeader(http.StatusOK)
w.Write(b)

View File

@ -95,7 +95,7 @@ func (e *etcdServer) CreateHttpHandler(w http.ResponseWriter, req *http.Request)
value := req.FormValue("value")
expireTime, err := durationToExpireTime(req.FormValue("ttl"))
expireTime, err := store.TTL(req.FormValue("ttl"))
if err != nil {
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", store.UndefIndex, store.UndefTerm)
@ -124,7 +124,7 @@ func (e *etcdServer) UpdateHttpHandler(w http.ResponseWriter, req *http.Request)
value := req.Form.Get("value")
expireTime, err := durationToExpireTime(req.Form.Get("ttl"))
expireTime, err := store.TTL(req.Form.Get("ttl"))
if err != nil {
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", store.UndefIndex, store.UndefTerm)
@ -344,6 +344,16 @@ func (e *etcdServer) GetHttpHandler(w http.ResponseWriter, req *http.Request) er
}
func getNodePath(urlPath string) string {
pathPrefixLen := len("/" + version + "/keys")
return urlPath[pathPrefixLen:]
}
//--------------------------------------
// Testing
//--------------------------------------
// TestHandler
func TestHttpHandler(w http.ResponseWriter, req *http.Request) {
testType := req.URL.Path[len("/test/"):]
@ -358,3 +368,25 @@ func TestHttpHandler(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusBadRequest)
}
func directSet() {
c := make(chan bool, 1000)
for i := 0; i < 1000; i++ {
go send(c)
}
for i := 0; i < 1000; i++ {
<-c
}
}
func send(c chan bool) {
for i := 0; i < 10; i++ {
command := &UpdateCommand{}
command.Key = "foo"
command.Value = "bar"
command.ExpireTime = time.Unix(0, 0)
//r.Do(command)
}
c <- true
}

View File

@ -1,12 +1,7 @@
package store
import (
"fmt"
"strings"
"sync"
"time"
etcdErr "github.com/coreos/etcd/error"
)
const (
@ -46,129 +41,41 @@ func newEvent(action string, key string, index uint64, term uint64) *Event {
}
}
type eventQueue struct {
Events []*Event
Size int
Front int
Capacity int
}
// Converts an event object into a response object.
func (event *Event) Response() interface{} {
if !event.Dir {
response := &Response{
Action: event.Action,
Key: event.Key,
Value: event.Value,
PrevValue: event.PrevValue,
Index: event.Index,
TTL: event.TTL,
Expiration: event.Expiration,
}
func (eq *eventQueue) back() int {
return (eq.Front + eq.Size - 1 + eq.Capacity) % eq.Capacity
}
if response.Action == Create || response.Action == Update {
response.Action = "set"
if response.PrevValue == "" {
response.NewKey = true
}
}
func (eq *eventQueue) insert(e *Event) {
index := (eq.back() + 1) % eq.Capacity
eq.Events[index] = e
if eq.Size == eq.Capacity { //dequeue
eq.Front = (index + 1) % eq.Capacity
return response
} else {
eq.Size++
}
responses := make([]*Response, len(event.KVPairs))
}
type EventHistory struct {
Queue eventQueue
StartIndex uint64
LastIndex uint64
LastTerm uint64
DupCnt uint64 // help to compute the watching point with duplicated indexes in the queue
rwl sync.RWMutex
}
func newEventHistory(capacity int) *EventHistory {
return &EventHistory{
Queue: eventQueue{
Capacity: capacity,
Events: make([]*Event, capacity),
},
}
}
// addEvent function adds event into the eventHistory
func (eh *EventHistory) addEvent(e *Event) *Event {
eh.rwl.Lock()
defer eh.rwl.Unlock()
var duped uint64
if e.Index == UndefIndex {
e.Index = eh.LastIndex
e.Term = eh.LastTerm
duped = 1
}
eh.Queue.insert(e)
eh.LastIndex = e.Index
eh.LastTerm = e.Term
eh.DupCnt += duped
eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index
return e
}
// scan function is enumerating events from the index in history and
// stops till the first point where the key has identified prefix
func (eh *EventHistory) scan(prefix string, index uint64) (*Event, *etcdErr.Error) {
eh.rwl.RLock()
defer eh.rwl.RUnlock()
start := index - eh.StartIndex
// the index should locate after the event history's StartIndex
if start < 0 {
return nil,
etcdErr.NewError(etcdErr.EcodeEventIndexCleared,
fmt.Sprintf("the requested history has been cleared [%v/%v]",
eh.StartIndex, index), UndefIndex, UndefTerm)
}
// the index should locate before the size of the queue minus the duplicate count
if start >= (uint64(eh.Queue.Size) - eh.DupCnt) { // future index
return nil, nil
}
i := int((start + uint64(eh.Queue.Front)) % uint64(eh.Queue.Capacity))
for {
e := eh.Queue.Events[i]
if strings.HasPrefix(e.Key, prefix) && index <= e.Index { // make sure we bypass the smaller one
return e, nil
}
i = (i + 1) % eh.Queue.Capacity
if i == eh.Queue.back() { // find nothing, return and watch from current index
return nil, nil
for i, kv := range event.KVPairs {
responses[i] = &Response{
Action: event.Action,
Key: kv.Key,
Value: kv.Value,
Dir: kv.Dir,
Index: event.Index,
}
}
return responses
}
}
// clone will be protected by a stop-world lock
// do not need to obtain internal lock
func (eh *EventHistory) clone() *EventHistory {
clonedQueue := eventQueue{
Capacity: eh.Queue.Capacity,
Events: make([]*Event, eh.Queue.Capacity),
Size: eh.Queue.Size,
Front: eh.Queue.Front,
}
for i, e := range eh.Queue.Events {
clonedQueue.Events[i] = e
}
return &EventHistory{
StartIndex: eh.StartIndex,
Queue: clonedQueue,
LastIndex: eh.LastIndex,
LastTerm: eh.LastTerm,
DupCnt: eh.DupCnt,
}
}

112
store/event_history.go Normal file
View File

@ -0,0 +1,112 @@
package store
import (
"fmt"
"strings"
"sync"
etcdErr "github.com/coreos/etcd/error"
)
type EventHistory struct {
Queue eventQueue
StartIndex uint64
LastIndex uint64
LastTerm uint64
DupCnt uint64 // help to compute the watching point with duplicated indexes in the queue
rwl sync.RWMutex
}
func newEventHistory(capacity int) *EventHistory {
return &EventHistory{
Queue: eventQueue{
Capacity: capacity,
Events: make([]*Event, capacity),
},
}
}
// addEvent function adds event into the eventHistory
func (eh *EventHistory) addEvent(e *Event) *Event {
eh.rwl.Lock()
defer eh.rwl.Unlock()
var duped uint64
if e.Index == UndefIndex {
e.Index = eh.LastIndex
e.Term = eh.LastTerm
duped = 1
}
eh.Queue.insert(e)
eh.LastIndex = e.Index
eh.LastTerm = e.Term
eh.DupCnt += duped
eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index
return e
}
// scan function is enumerating events from the index in history and
// stops till the first point where the key has identified prefix
func (eh *EventHistory) scan(prefix string, index uint64) (*Event, *etcdErr.Error) {
eh.rwl.RLock()
defer eh.rwl.RUnlock()
start := index - eh.StartIndex
// the index should locate after the event history's StartIndex
if start < 0 {
return nil,
etcdErr.NewError(etcdErr.EcodeEventIndexCleared,
fmt.Sprintf("the requested history has been cleared [%v/%v]",
eh.StartIndex, index), UndefIndex, UndefTerm)
}
// the index should locate before the size of the queue minus the duplicate count
if start >= (uint64(eh.Queue.Size) - eh.DupCnt) { // future index
return nil, nil
}
i := int((start + uint64(eh.Queue.Front)) % uint64(eh.Queue.Capacity))
for {
e := eh.Queue.Events[i]
if strings.HasPrefix(e.Key, prefix) && index <= e.Index { // make sure we bypass the smaller one
return e, nil
}
i = (i + 1) % eh.Queue.Capacity
if i == eh.Queue.back() { // find nothing, return and watch from current index
return nil, nil
}
}
}
// clone will be protected by a stop-world lock
// do not need to obtain internal lock
func (eh *EventHistory) clone() *EventHistory {
clonedQueue := eventQueue{
Capacity: eh.Queue.Capacity,
Events: make([]*Event, eh.Queue.Capacity),
Size: eh.Queue.Size,
Front: eh.Queue.Front,
}
for i, e := range eh.Queue.Events {
clonedQueue.Events[i] = e
}
return &EventHistory{
StartIndex: eh.StartIndex,
Queue: clonedQueue,
LastIndex: eh.LastIndex,
LastTerm: eh.LastTerm,
DupCnt: eh.DupCnt,
}
}

26
store/event_queue.go Normal file
View File

@ -0,0 +1,26 @@
package store
type eventQueue struct {
Events []*Event
Size int
Front int
Capacity int
}
func (eq *eventQueue) back() int {
return (eq.Front + eq.Size - 1 + eq.Capacity) % eq.Capacity
}
func (eq *eventQueue) insert(e *Event) {
index := (eq.back() + 1) % eq.Capacity
eq.Events[index] = e
if eq.Size == eq.Capacity { //dequeue
eq.Front = (index + 1) % eq.Capacity
} else {
eq.Size++
}
}

21
store/ttl.go Normal file
View File

@ -0,0 +1,21 @@
package store
import (
"strconv"
"time"
)
// Convert string duration to time format
func TTL(duration string) (time.Time, error) {
if duration != "" {
duration, err := strconv.Atoi(duration)
if err != nil {
return Permanent, err
}
return time.Now().Add(time.Second * (time.Duration)(duration)), nil
} else {
return Permanent, nil
}
}

72
util.go
View File

@ -1,42 +1,15 @@
package main
import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"runtime/pprof"
"strconv"
"time"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/store"
"github.com/coreos/go-raft"
"github.com/coreos/etcd/log"
)
//--------------------------------------
// etcd http Helper
//--------------------------------------
// Convert string duration to time format
func durationToExpireTime(strDuration string) (time.Time, error) {
if strDuration != "" {
duration, err := strconv.Atoi(strDuration)
if err != nil {
return store.Permanent, err
}
return time.Now().Add(time.Second * (time.Duration)(duration)), nil
} else {
return store.Permanent, nil
}
}
//--------------------------------------
// HTTP Utilities
//--------------------------------------
@ -52,13 +25,13 @@ func sanitizeURL(host string, defaultScheme string) string {
p, err := url.Parse(host)
if err != nil {
fatal(err)
log.Fatal(err)
}
// Make sure the host is in Host:Port format
_, _, err = net.SplitHostPort(host)
if err != nil {
fatal(err)
log.Fatal(err)
}
p = &url.URL{Host: host, Scheme: defaultScheme}
@ -71,12 +44,12 @@ func sanitizeURL(host string, defaultScheme string) string {
func sanitizeListenHost(listen string, advertised string) string {
aurl, err := url.Parse(advertised)
if err != nil {
fatal(err)
log.Fatal(err)
}
ahost, aport, err := net.SplitHostPort(aurl.Host)
if err != nil {
fatal(err)
log.Fatal(err)
}
// If the listen host isn't set use the advertised host
@ -89,15 +62,10 @@ func sanitizeListenHost(listen string, advertised string) string {
func check(err error) {
if err != nil {
fatal(err)
log.Fatal(err)
}
}
func getNodePath(urlPath string) string {
pathPrefixLen := len("/" + version + "/keys")
return urlPath[pathPrefixLen:]
}
//--------------------------------------
// CPU profile
//--------------------------------------
@ -105,7 +73,7 @@ func runCPUProfile() {
f, err := os.Create(cpuprofile)
if err != nil {
fatal(err)
log.Fatal(err)
}
pprof.StartCPUProfile(f)
@ -113,34 +81,10 @@ func runCPUProfile() {
signal.Notify(c, os.Interrupt)
go func() {
for sig := range c {
infof("captured %v, stopping profiler and exiting..", sig)
log.Infof("captured %v, stopping profiler and exiting..", sig)
pprof.StopCPUProfile()
os.Exit(1)
}
}()
}
//--------------------------------------
// Testing
//--------------------------------------
func directSet() {
c := make(chan bool, 1000)
for i := 0; i < 1000; i++ {
go send(c)
}
for i := 0; i < 1000; i++ {
<-c
}
}
func send(c chan bool) {
for i := 0; i < 10; i++ {
command := &UpdateCommand{}
command.Key = "foo"
command.Value = "bar"
command.ExpireTime = time.Unix(0, 0)
//r.Do(command)
}
c <- true
}