mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
commit
076bd9903e
11
command.go
11
command.go
@ -110,10 +110,9 @@ func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) {
|
||||
|
||||
// JoinCommand
|
||||
type JoinCommand struct {
|
||||
Name string `json:"name"`
|
||||
Hostname string `json:"hostName"`
|
||||
RaftPort int `json:"raftPort"`
|
||||
ClientPort int `json:"clientPort"`
|
||||
Name string `json:"name"`
|
||||
RaftURL string `json:"raftURL"`
|
||||
EtcdURL string `json:"etcdURL"`
|
||||
}
|
||||
|
||||
// The name of the join command in the log
|
||||
@ -137,12 +136,14 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
|
||||
return []byte("join fail"), fmt.Errorf(errors[103])
|
||||
}
|
||||
|
||||
addNameToURL(c.Name, c.RaftURL, c.EtcdURL)
|
||||
|
||||
// add peer in raft
|
||||
err := raftServer.AddPeer(c.Name)
|
||||
|
||||
// add machine in etcd storage
|
||||
key := path.Join("_etcd/machines", c.Name)
|
||||
value := fmt.Sprintf("%s,%d,%d", c.Hostname, c.RaftPort, c.ClientPort)
|
||||
value := fmt.Sprintf("raft=%s&etcd=%s", c.RaftURL, c.EtcdURL)
|
||||
etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex())
|
||||
|
||||
return []byte("join success"), err
|
||||
|
237
etcd.go
237
etcd.go
@ -59,10 +59,10 @@ func init() {
|
||||
flag.StringVar(&machines, "C", "", "the ip address and port of a existing machines in the cluster, sepearate by comma")
|
||||
flag.StringVar(&machinesFile, "CF", "", "the file contains a list of existing machines in the cluster, seperate by comma")
|
||||
|
||||
flag.StringVar(&argInfo.Hostname, "h", "0.0.0.0", "the hostname of the local machine")
|
||||
flag.IntVar(&argInfo.ClientPort, "c", 4001, "the port to communicate with clients")
|
||||
flag.IntVar(&argInfo.RaftPort, "s", 7001, "the port to communicate with servers")
|
||||
flag.IntVar(&argInfo.WebPort, "w", -1, "the port of web interface (-1 means do not start web interface)")
|
||||
flag.StringVar(&argInfo.Name, "n", "", "the node name (required)")
|
||||
flag.StringVar(&argInfo.EtcdURL, "c", "127.0.0.1:4001", "the port to communicate with clients")
|
||||
flag.StringVar(&argInfo.RaftURL, "s", "127.0.0.1:7001", "the port to communicate with servers")
|
||||
flag.StringVar(&argInfo.WebURL, "w", "", "the port of web interface")
|
||||
|
||||
flag.StringVar(&argInfo.ServerCAFile, "serverCAFile", "", "the path of the CAFile")
|
||||
flag.StringVar(&argInfo.ServerCertFile, "serverCert", "", "the cert file of the server")
|
||||
@ -89,14 +89,8 @@ func init() {
|
||||
|
||||
// CONSTANTS
|
||||
const (
|
||||
HTTP = iota
|
||||
HTTPS
|
||||
HTTPSANDVERIFY
|
||||
)
|
||||
|
||||
const (
|
||||
SERVER = iota
|
||||
CLIENT
|
||||
RaftServer = iota
|
||||
EtcdServer
|
||||
)
|
||||
|
||||
const (
|
||||
@ -117,10 +111,11 @@ const (
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
type Info struct {
|
||||
Hostname string `json:"hostname"`
|
||||
RaftPort int `json:"raftPort"`
|
||||
ClientPort int `json:"clientPort"`
|
||||
WebPort int `json:"webPort"`
|
||||
Name string `json:"name"`
|
||||
|
||||
RaftURL string `json:"raftURL"`
|
||||
EtcdURL string `json:"etcdURL"`
|
||||
WebURL string `json:"webURL"`
|
||||
|
||||
ServerCertFile string `json:"serverCertFile"`
|
||||
ServerKeyFile string `json:"serverKeyFile"`
|
||||
@ -148,6 +143,21 @@ var info *Info
|
||||
//
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
// Check a URL and clean it up if the user forgot the schema
|
||||
func checkURL(u string, defaultSchema string) string {
|
||||
p, err := url.Parse(u)
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if len(p.Host) == 0 && len(defaultSchema) != 0 {
|
||||
return checkURL(fmt.Sprintf("%s://%s", defaultSchema, u), "")
|
||||
}
|
||||
|
||||
return p.String()
|
||||
}
|
||||
|
||||
//--------------------------------------
|
||||
// Main
|
||||
//--------------------------------------
|
||||
@ -190,6 +200,16 @@ func main() {
|
||||
cluster = strings.Split(string(b), ",")
|
||||
}
|
||||
|
||||
// Otherwise ask user for info and write it to file.
|
||||
argInfo.Name = strings.TrimSpace(argInfo.Name)
|
||||
|
||||
if argInfo.Name == "" {
|
||||
fatal("Please give the name of the server")
|
||||
}
|
||||
|
||||
argInfo.RaftURL = checkURL(argInfo.RaftURL, "http")
|
||||
argInfo.EtcdURL = checkURL(argInfo.EtcdURL, "http")
|
||||
|
||||
// Setup commands.
|
||||
registerCommands()
|
||||
|
||||
@ -200,39 +220,40 @@ func main() {
|
||||
|
||||
info = getInfo(dirPath)
|
||||
|
||||
// security type
|
||||
st := securityType(SERVER)
|
||||
raftTlsConfs, ok := tlsConf(RaftServer)
|
||||
if !ok {
|
||||
fatal("Please specify cert and key file or cert and key file and CAFile or none of the three")
|
||||
}
|
||||
|
||||
clientSt := securityType(CLIENT)
|
||||
|
||||
if st == -1 || clientSt == -1 {
|
||||
etcdTlsConfs, ok := tlsConf(EtcdServer)
|
||||
if !ok {
|
||||
fatal("Please specify cert and key file or cert and key file and CAFile or none of the three")
|
||||
}
|
||||
|
||||
// Create etcd key-value store
|
||||
etcdStore = store.CreateStore(maxSize)
|
||||
|
||||
startRaft(st)
|
||||
startRaft(raftTlsConfs)
|
||||
|
||||
if argInfo.WebPort != -1 {
|
||||
if argInfo.WebURL != "" {
|
||||
// start web
|
||||
etcdStore.SetMessager(storeMsg)
|
||||
go webHelper()
|
||||
go web.Start(raftServer, argInfo.WebPort)
|
||||
go web.Start(raftServer, argInfo.WebURL)
|
||||
}
|
||||
|
||||
startClientTransport(*info, clientSt)
|
||||
startEtcdTransport(*info, etcdTlsConfs[0])
|
||||
|
||||
}
|
||||
|
||||
// Start the raft server
|
||||
func startRaft(securityType int) {
|
||||
func startRaft(tlsConfs []*tls.Config) {
|
||||
var err error
|
||||
|
||||
raftName := fmt.Sprintf("%s:%d", info.Hostname, info.RaftPort)
|
||||
raftName := info.Name
|
||||
|
||||
// Create transporter for raft
|
||||
raftTransporter = createTransporter(securityType)
|
||||
raftTransporter = newTransporter(tlsConfs[1])
|
||||
|
||||
// Create raft server
|
||||
raftServer, err = raft.NewServer(raftName, dirPath, raftTransporter, etcdStore, nil)
|
||||
@ -267,10 +288,9 @@ func startRaft(securityType int) {
|
||||
// leader need to join self as a peer
|
||||
for {
|
||||
command := &JoinCommand{
|
||||
Name: raftServer.Name(),
|
||||
Hostname: argInfo.Hostname,
|
||||
RaftPort: argInfo.RaftPort,
|
||||
ClientPort: argInfo.ClientPort,
|
||||
Name: raftServer.Name(),
|
||||
RaftURL: argInfo.RaftURL,
|
||||
EtcdURL: argInfo.EtcdURL,
|
||||
}
|
||||
_, err := raftServer.Do(command)
|
||||
if err == nil {
|
||||
@ -328,44 +348,30 @@ func startRaft(securityType int) {
|
||||
}
|
||||
|
||||
// start to response to raft requests
|
||||
go startRaftTransport(*info, securityType)
|
||||
go startRaftTransport(*info, tlsConfs[0])
|
||||
|
||||
}
|
||||
|
||||
// Create transporter using by raft server
|
||||
// Create http or https transporter based on
|
||||
// whether the user give the server cert and key
|
||||
func createTransporter(st int) transporter {
|
||||
func newTransporter(tlsConf *tls.Config) transporter {
|
||||
t := transporter{}
|
||||
|
||||
switch st {
|
||||
case HTTP:
|
||||
if tlsConf == nil {
|
||||
t.scheme = "http://"
|
||||
|
||||
tr := &http.Transport{
|
||||
Dial: dialTimeout,
|
||||
}
|
||||
|
||||
t.client = &http.Client{
|
||||
Transport: tr,
|
||||
Transport: &http.Transport{
|
||||
Dial: dialTimeout,
|
||||
},
|
||||
}
|
||||
|
||||
case HTTPS:
|
||||
fallthrough
|
||||
case HTTPSANDVERIFY:
|
||||
} else {
|
||||
t.scheme = "https://"
|
||||
|
||||
tlsCert, err := tls.LoadX509KeyPair(argInfo.ServerCertFile, argInfo.ServerKeyFile)
|
||||
|
||||
if err != nil {
|
||||
fatal(err)
|
||||
}
|
||||
|
||||
tr := &http.Transport{
|
||||
TLSClientConfig: &tls.Config{
|
||||
Certificates: []tls.Certificate{tlsCert},
|
||||
InsecureSkipVerify: true,
|
||||
},
|
||||
TLSClientConfig: tlsConf,
|
||||
Dial: dialTimeout,
|
||||
DisableCompression: true,
|
||||
}
|
||||
@ -382,44 +388,35 @@ func dialTimeout(network, addr string) (net.Conn, error) {
|
||||
}
|
||||
|
||||
// Start to listen and response raft command
|
||||
func startRaftTransport(info Info, st int) {
|
||||
func startRaftTransport(info Info, tlsConf *tls.Config) {
|
||||
|
||||
// internal commands
|
||||
http.HandleFunc("/name", NameHttpHandler)
|
||||
http.HandleFunc("/join", JoinHttpHandler)
|
||||
http.HandleFunc("/vote", VoteHttpHandler)
|
||||
http.HandleFunc("/log", GetLogHttpHandler)
|
||||
http.HandleFunc("/log/append", AppendEntriesHttpHandler)
|
||||
http.HandleFunc("/snapshot", SnapshotHttpHandler)
|
||||
http.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler)
|
||||
http.HandleFunc("/client", ClientHttpHandler)
|
||||
http.HandleFunc("/etcdURL", EtcdURLHttpHandler)
|
||||
|
||||
switch st {
|
||||
|
||||
case HTTP:
|
||||
fmt.Printf("raft server [%s] listen on http port %v\n", info.Hostname, info.RaftPort)
|
||||
fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.RaftPort), nil))
|
||||
|
||||
case HTTPS:
|
||||
fmt.Printf("raft server [%s] listen on https port %v\n", info.Hostname, info.RaftPort)
|
||||
fatal(http.ListenAndServeTLS(fmt.Sprintf(":%d", info.RaftPort), info.ServerCertFile, argInfo.ServerKeyFile, nil))
|
||||
|
||||
case HTTPSANDVERIFY:
|
||||
u, _ := url.Parse(info.RaftURL)
|
||||
fmt.Printf("raft server [%s] listening on %s\n", info.Name, u)
|
||||
|
||||
if tlsConf == nil {
|
||||
http.ListenAndServe(u.Host, nil)
|
||||
} else {
|
||||
server := &http.Server{
|
||||
TLSConfig: &tls.Config{
|
||||
ClientAuth: tls.RequireAndVerifyClientCert,
|
||||
ClientCAs: createCertPool(info.ServerCAFile),
|
||||
},
|
||||
Addr: fmt.Sprintf(":%d", info.RaftPort),
|
||||
TLSConfig: tlsConf,
|
||||
Addr: u.Host,
|
||||
}
|
||||
fmt.Printf("raft server [%s] listen on https port %v\n", info.Hostname, info.RaftPort)
|
||||
fatal(server.ListenAndServeTLS(info.ServerCertFile, argInfo.ServerKeyFile))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Start to listen and response client command
|
||||
func startClientTransport(info Info, st int) {
|
||||
func startEtcdTransport(info Info, tlsConf *tls.Config) {
|
||||
// external commands
|
||||
http.HandleFunc("/"+version+"/keys/", Multiplexer)
|
||||
http.HandleFunc("/"+version+"/watch/", WatchHttpHandler)
|
||||
@ -429,26 +426,16 @@ func startClientTransport(info Info, st int) {
|
||||
http.HandleFunc("/stats", StatsHttpHandler)
|
||||
http.HandleFunc("/test/", TestHttpHandler)
|
||||
|
||||
switch st {
|
||||
|
||||
case HTTP:
|
||||
fmt.Printf("etcd [%s] listen on http port %v\n", info.Hostname, info.ClientPort)
|
||||
fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.ClientPort), nil))
|
||||
|
||||
case HTTPS:
|
||||
fmt.Printf("etcd [%s] listen on https port %v\n", info.Hostname, info.ClientPort)
|
||||
http.ListenAndServeTLS(fmt.Sprintf(":%d", info.ClientPort), info.ClientCertFile, info.ClientKeyFile, nil)
|
||||
|
||||
case HTTPSANDVERIFY:
|
||||
u, _ := url.Parse(info.EtcdURL)
|
||||
fmt.Printf("etcd server [%s] listening on %s\n", info.Name, u)
|
||||
|
||||
if tlsConf == nil {
|
||||
fatal(http.ListenAndServe(u.Host, nil))
|
||||
} else {
|
||||
server := &http.Server{
|
||||
TLSConfig: &tls.Config{
|
||||
ClientAuth: tls.RequireAndVerifyClientCert,
|
||||
ClientCAs: createCertPool(info.ClientCAFile),
|
||||
},
|
||||
Addr: fmt.Sprintf(":%d", info.ClientPort),
|
||||
TLSConfig: tlsConf,
|
||||
Addr: u.Host,
|
||||
}
|
||||
fmt.Printf("etcd [%s] listen on https port %v\n", info.Hostname, info.ClientPort)
|
||||
fatal(server.ListenAndServeTLS(info.ClientCertFile, info.ClientKeyFile))
|
||||
}
|
||||
}
|
||||
@ -456,20 +443,28 @@ func startClientTransport(info Info, st int) {
|
||||
//--------------------------------------
|
||||
// Config
|
||||
//--------------------------------------
|
||||
|
||||
// Get the security type
|
||||
func securityType(source int) int {
|
||||
|
||||
func tlsConf(source int) ([]*tls.Config, bool) {
|
||||
var keyFile, certFile, CAFile string
|
||||
var tlsCert tls.Certificate
|
||||
var isAuth bool
|
||||
var err error
|
||||
|
||||
switch source {
|
||||
|
||||
case SERVER:
|
||||
case RaftServer:
|
||||
keyFile = info.ServerKeyFile
|
||||
certFile = info.ServerCertFile
|
||||
CAFile = info.ServerCAFile
|
||||
|
||||
case CLIENT:
|
||||
if keyFile != "" && certFile != "" {
|
||||
tlsCert, err = tls.LoadX509KeyPair(certFile, keyFile)
|
||||
if err == nil {
|
||||
fatal(err)
|
||||
}
|
||||
isAuth = true
|
||||
}
|
||||
|
||||
case EtcdServer:
|
||||
keyFile = info.ClientKeyFile
|
||||
certFile = info.ClientCertFile
|
||||
CAFile = info.ClientCAFile
|
||||
@ -478,25 +473,28 @@ func securityType(source int) int {
|
||||
// If the user do not specify key file, cert file and
|
||||
// CA file, the type will be HTTP
|
||||
if keyFile == "" && certFile == "" && CAFile == "" {
|
||||
|
||||
return HTTP
|
||||
|
||||
return []*tls.Config{nil, nil}, true
|
||||
}
|
||||
|
||||
if keyFile != "" && certFile != "" {
|
||||
if CAFile != "" {
|
||||
// If the user specify all the three file, the type
|
||||
// will be HTTPS with client cert auth
|
||||
return HTTPSANDVERIFY
|
||||
serverConf := &tls.Config{}
|
||||
serverConf.ClientAuth, serverConf.ClientCAs = newCertPool(CAFile)
|
||||
|
||||
if isAuth {
|
||||
raftTransConf := &tls.Config{
|
||||
Certificates: []tls.Certificate{tlsCert},
|
||||
InsecureSkipVerify: true,
|
||||
}
|
||||
return []*tls.Config{serverConf, raftTransConf}, true
|
||||
}
|
||||
// If the user specify key file and cert file but not
|
||||
// CA file, the type will be HTTPS without client cert
|
||||
// auth
|
||||
return HTTPS
|
||||
|
||||
return []*tls.Config{serverConf, nil}, true
|
||||
|
||||
}
|
||||
|
||||
// bad specification
|
||||
return -1
|
||||
return nil, false
|
||||
|
||||
}
|
||||
|
||||
func parseInfo(path string) *Info {
|
||||
@ -547,13 +545,6 @@ func getInfo(path string) *Info {
|
||||
return info
|
||||
}
|
||||
|
||||
// Otherwise ask user for info and write it to file.
|
||||
argInfo.Hostname = strings.TrimSpace(argInfo.Hostname)
|
||||
|
||||
if argInfo.Hostname == "" {
|
||||
fatal("Please give the address of the local machine")
|
||||
}
|
||||
|
||||
info = &argInfo
|
||||
|
||||
// Write to file.
|
||||
@ -569,7 +560,10 @@ func getInfo(path string) *Info {
|
||||
}
|
||||
|
||||
// Create client auth certpool
|
||||
func createCertPool(CAFile string) *x509.CertPool {
|
||||
func newCertPool(CAFile string) (tls.ClientAuthType, *x509.CertPool) {
|
||||
if CAFile == "" {
|
||||
return tls.NoClientCert, nil
|
||||
}
|
||||
pemByte, _ := ioutil.ReadFile(CAFile)
|
||||
|
||||
block, pemByte := pem.Decode(pemByte)
|
||||
@ -584,7 +578,7 @@ func createCertPool(CAFile string) *x509.CertPool {
|
||||
|
||||
certPool.AddCert(cert)
|
||||
|
||||
return certPool
|
||||
return tls.RequireAndVerifyClientCert, certPool
|
||||
}
|
||||
|
||||
// Send join requests to the leader.
|
||||
@ -592,10 +586,9 @@ func joinCluster(s *raft.Server, serverName string) error {
|
||||
var b bytes.Buffer
|
||||
|
||||
command := &JoinCommand{
|
||||
Name: s.Name(),
|
||||
Hostname: info.Hostname,
|
||||
RaftPort: info.RaftPort,
|
||||
ClientPort: info.ClientPort,
|
||||
Name: s.Name(),
|
||||
RaftURL: info.RaftURL,
|
||||
EtcdURL: info.EtcdURL,
|
||||
}
|
||||
|
||||
json.NewEncoder(&b).Encode(command)
|
||||
|
@ -1,15 +1,15 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/coreos/etcd/store"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
//-------------------------------------------------------------------
|
||||
// Handlers to handle etcd-store related request via raft client port
|
||||
// Handlers to handle etcd-store related request via etcd url
|
||||
//-------------------------------------------------------------------
|
||||
|
||||
// Multiplex GET/POST/DELETE request to corresponding handlers
|
||||
@ -45,7 +45,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
debugf("[recv] POST http://%v/v1/keys/%s", raftServer.Name(), key)
|
||||
debugf("[recv] POST %v/v1/keys/%s", raftServer.Name(), key)
|
||||
|
||||
value := req.FormValue("value")
|
||||
|
||||
@ -72,9 +72,9 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
|
||||
|
||||
if len(prevValue) != 0 {
|
||||
command := &TestAndSetCommand{
|
||||
Key: key,
|
||||
Value: value,
|
||||
PrevValue: prevValue,
|
||||
Key: key,
|
||||
Value: value,
|
||||
PrevValue: prevValue,
|
||||
ExpireTime: expireTime,
|
||||
}
|
||||
|
||||
@ -82,8 +82,8 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
|
||||
|
||||
} else {
|
||||
command := &SetCommand{
|
||||
Key: key,
|
||||
Value: value,
|
||||
Key: key,
|
||||
Value: value,
|
||||
ExpireTime: expireTime,
|
||||
}
|
||||
|
||||
@ -96,7 +96,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
|
||||
func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
|
||||
key := req.URL.Path[len("/v1/keys/"):]
|
||||
|
||||
debugf("[recv] DELETE http://%v/v1/keys/%s", raftServer.Name(), key)
|
||||
debugf("[recv] DELETE %v/v1/keys/%s", raftServer.Name(), key)
|
||||
|
||||
command := &DeleteCommand{
|
||||
Key: key,
|
||||
@ -171,10 +171,10 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool)
|
||||
var url string
|
||||
|
||||
if client {
|
||||
clientAddr, _ := getClientAddr(raftServer.Leader())
|
||||
url = scheme + clientAddr + path
|
||||
clientAddr, _ := getEtcdURL(raftServer.Leader())
|
||||
url = clientAddr + path
|
||||
} else {
|
||||
url = scheme + raftServer.Leader() + path
|
||||
url = raftServer.Leader() + path
|
||||
}
|
||||
|
||||
debugf("Redirect to %s", url)
|
||||
@ -194,13 +194,14 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool)
|
||||
// command?
|
||||
//--------------------------------------
|
||||
|
||||
// Handler to return the current leader name
|
||||
// Handler to return the current leader's raft address
|
||||
func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) {
|
||||
leader := raftServer.Leader()
|
||||
|
||||
if leader != "" {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte(raftServer.Leader()))
|
||||
raftURL, _ := nameToRaftURL(leader)
|
||||
w.Write([]byte(raftURL))
|
||||
} else {
|
||||
|
||||
// not likely, but it may happen
|
||||
@ -215,14 +216,14 @@ func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) {
|
||||
|
||||
// Add itself to the machine list first
|
||||
// Since peer map does not contain the server itself
|
||||
machines, _ := getClientAddr(raftServer.Name())
|
||||
machines, _ := getEtcdURL(raftServer.Name())
|
||||
|
||||
// Add all peers to the list and separate by comma
|
||||
// We do not use json here since we accept machines list
|
||||
// in the command line separate by comma.
|
||||
|
||||
for peerName, _ := range peers {
|
||||
if addr, ok := getClientAddr(peerName); ok {
|
||||
if addr, ok := getEtcdURL(peerName); ok {
|
||||
machines = machines + "," + addr
|
||||
}
|
||||
}
|
@ -34,10 +34,11 @@ func TestKillLeader(t *testing.T) {
|
||||
|
||||
var totalTime time.Duration
|
||||
|
||||
leader := "127.0.0.1:7001"
|
||||
leader := "http://127.0.0.1:7001"
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
port, _ := strconv.Atoi(strings.Split(leader, ":")[1])
|
||||
for i := 0; i < clusterSize; i++ {
|
||||
fmt.Println("leader is ", leader)
|
||||
port, _ := strconv.Atoi(strings.Split(leader, ":")[2])
|
||||
num := port - 7001
|
||||
fmt.Println("kill server ", num)
|
||||
etcds[num].Kill()
|
||||
|
@ -14,7 +14,7 @@ import (
|
||||
func TestSingleNode(t *testing.T) {
|
||||
procAttr := new(os.ProcAttr)
|
||||
procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
|
||||
args := []string{"etcd", "-h=127.0.0.1", "-f", "-d=/tmp/node1"}
|
||||
args := []string{"etcd", "-n=node1", "-f", "-d=/tmp/node1"}
|
||||
|
||||
process, err := os.StartProcess("etcd", args, procAttr)
|
||||
if err != nil {
|
||||
@ -56,7 +56,7 @@ func TestSingleNode(t *testing.T) {
|
||||
func TestSingleNodeRecovery(t *testing.T) {
|
||||
procAttr := new(os.ProcAttr)
|
||||
procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
|
||||
args := []string{"etcd", "-h=127.0.0.1", "-d=/tmp/node1"}
|
||||
args := []string{"etcd", "-n=node1", "-d=/tmp/node1"}
|
||||
|
||||
process, err := os.StartProcess("etcd", append(args, "-f"), procAttr)
|
||||
if err != nil {
|
||||
|
16
machines.go
16
machines.go
@ -1,20 +1,20 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"path"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func getClientAddr(name string) (string, bool) {
|
||||
response, _ := etcdStore.RawGet(path.Join("_etcd/machines", name))
|
||||
func getEtcdURL(name string) (string, bool) {
|
||||
resps, _ := etcdStore.RawGet(path.Join("_etcd/machines", name))
|
||||
|
||||
values := strings.Split(response[0].Value, ",")
|
||||
m, err := url.ParseQuery(resps[0].Value)
|
||||
|
||||
hostname := values[0]
|
||||
clientPort := values[2]
|
||||
if err != nil {
|
||||
panic("Failed to parse machines entry")
|
||||
}
|
||||
|
||||
addr := fmt.Sprintf("%s:%s", hostname, clientPort)
|
||||
addr := m["etcd"][0]
|
||||
|
||||
return addr, true
|
||||
}
|
||||
|
68
name_url_map.go
Normal file
68
name_url_map.go
Normal file
@ -0,0 +1,68 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
"path"
|
||||
)
|
||||
|
||||
// we map node name to url
|
||||
type nodeInfo struct {
|
||||
raftURL string
|
||||
etcdURL string
|
||||
}
|
||||
|
||||
var namesMap = make(map[string]*nodeInfo)
|
||||
|
||||
// nameToEtcdURL maps node name to its etcd http address
|
||||
func nameToEtcdURL(name string) (string, bool) {
|
||||
|
||||
if info, ok := namesMap[name]; ok {
|
||||
// first try to read from the map
|
||||
return info.etcdURL, true
|
||||
}
|
||||
|
||||
// if fails, try to recover from etcd storage
|
||||
return readURL(name, "etcd")
|
||||
|
||||
}
|
||||
|
||||
// nameToRaftURL maps node name to its raft http address
|
||||
func nameToRaftURL(name string) (string, bool) {
|
||||
if info, ok := namesMap[name]; ok {
|
||||
// first try to read from the map
|
||||
return info.raftURL, true
|
||||
|
||||
}
|
||||
|
||||
// if fails, try to recover from etcd storage
|
||||
return readURL(name, "raft")
|
||||
}
|
||||
|
||||
// addNameToURL add a name that maps to raftURL and etcdURL
|
||||
func addNameToURL(name string, raftURL string, etcdURL string) {
|
||||
namesMap[name] = &nodeInfo{
|
||||
raftURL: raftURL,
|
||||
etcdURL: etcdURL,
|
||||
}
|
||||
}
|
||||
|
||||
func readURL(nodeName string, urlName string) (string, bool) {
|
||||
// if fails, try to recover from etcd storage
|
||||
key := path.Join("/_etcd/machines", nodeName)
|
||||
|
||||
resps, err := etcdStore.RawGet(key)
|
||||
|
||||
if err != nil {
|
||||
return "", false
|
||||
}
|
||||
|
||||
m, err := url.ParseQuery(resps[0].Value)
|
||||
|
||||
if err != nil {
|
||||
panic("Failed to parse machines entry")
|
||||
}
|
||||
|
||||
url := m[urlName][0]
|
||||
|
||||
return url, true
|
||||
}
|
@ -4,7 +4,6 @@ import (
|
||||
"encoding/json"
|
||||
"github.com/coreos/go-raft"
|
||||
"net/http"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
//-------------------------------------------------------------
|
||||
@ -87,12 +86,11 @@ func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
// Get the port that listening for client connecting of the server
|
||||
func ClientHttpHandler(w http.ResponseWriter, req *http.Request) {
|
||||
debugf("[recv] Get %s/client/ ", raftTransporter.scheme+raftServer.Name())
|
||||
// Get the port that listening for etcd connecting of the server
|
||||
func EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) {
|
||||
debugf("[recv] Get %s/etcdURL/ ", raftTransporter.scheme+raftServer.Name())
|
||||
w.WriteHeader(http.StatusOK)
|
||||
client := argInfo.Hostname + ":" + strconv.Itoa(argInfo.ClientPort)
|
||||
w.Write([]byte(client))
|
||||
w.Write([]byte(argInfo.EtcdURL))
|
||||
}
|
||||
|
||||
// Response to the join request
|
||||
@ -108,3 +106,10 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Response to the name request
|
||||
func NameHttpHandler(w http.ResponseWriter, req *http.Request) {
|
||||
debugf("[recv] Get %s/name/ ", raftTransporter.scheme+raftServer.Name())
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte(raftServer.Name()))
|
||||
}
|
||||
|
8
test.go
8
test.go
@ -59,10 +59,10 @@ func createCluster(size int, procAttr *os.ProcAttr) ([][]string, []*os.Process,
|
||||
argGroup := make([][]string, size)
|
||||
for i := 0; i < size; i++ {
|
||||
if i == 0 {
|
||||
argGroup[i] = []string{"etcd", "-h=127.0.0.1", "-d=/tmp/node1"}
|
||||
argGroup[i] = []string{"etcd", "-d=/tmp/node1", "-n=node1"}
|
||||
} else {
|
||||
strI := strconv.Itoa(i + 1)
|
||||
argGroup[i] = []string{"etcd", "-h=127.0.0.1", "-c=400" + strI, "-s=700" + strI, "-d=/tmp/node" + strI, "-C=127.0.0.1:7001"}
|
||||
argGroup[i] = []string{"etcd", "-n=node" + strI, "-c=127.0.0.1:400" + strI, "-s=127.0.0.1:700" + strI, "-d=/tmp/node" + strI, "-C=http://127.0.0.1:7001"}
|
||||
}
|
||||
}
|
||||
|
||||
@ -103,7 +103,7 @@ func destroyCluster(etcds []*os.Process) error {
|
||||
//
|
||||
func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
|
||||
leaderMap := make(map[int]string)
|
||||
baseAddrFormat := "http://0.0.0.0:400%d/leader"
|
||||
baseAddrFormat := "http://0.0.0.0:400%d"
|
||||
|
||||
for {
|
||||
knownLeader := "unknown"
|
||||
@ -151,7 +151,7 @@ func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
|
||||
|
||||
func getLeader(addr string) (string, error) {
|
||||
|
||||
resp, err := client.Get(addr)
|
||||
resp, err := client.Get(addr + "/leader")
|
||||
|
||||
if err != nil {
|
||||
return "", err
|
||||
|
@ -139,7 +139,6 @@ func (c *Client) internalSyncCluster(machines []string) bool {
|
||||
// serverName should contain both hostName and port
|
||||
func (c *Client) createHttpPath(serverName string, _path string) string {
|
||||
httpPath := path.Join(serverName, _path)
|
||||
httpPath = c.config.Scheme + "://" + httpPath
|
||||
return httpPath
|
||||
}
|
||||
|
||||
|
@ -6,7 +6,6 @@ import (
|
||||
"fmt"
|
||||
"github.com/coreos/go-raft"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
@ -23,12 +22,13 @@ func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Pe
|
||||
var b bytes.Buffer
|
||||
json.NewEncoder(&b).Encode(req)
|
||||
|
||||
debugf("Send LogEntries to %s ", peer.Name())
|
||||
u, _ := nameToRaftURL(peer.Name())
|
||||
debugf("Send LogEntries to %s ", u)
|
||||
|
||||
resp, err := t.Post(fmt.Sprintf("%s/log/append", peer.Name()), &b)
|
||||
resp, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
|
||||
|
||||
if err != nil {
|
||||
debugf("Cannot send AppendEntriesRequest to %s : %s", peer.Name(), err)
|
||||
debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
|
||||
}
|
||||
|
||||
if resp != nil {
|
||||
@ -48,12 +48,13 @@ func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *
|
||||
var b bytes.Buffer
|
||||
json.NewEncoder(&b).Encode(req)
|
||||
|
||||
debugf("Send Vote to %s", peer.Name())
|
||||
u, _ := nameToRaftURL(peer.Name())
|
||||
debugf("Send Vote to %s", u)
|
||||
|
||||
resp, err := t.Post(fmt.Sprintf("%s/vote", peer.Name()), &b)
|
||||
resp, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
|
||||
|
||||
if err != nil {
|
||||
debugf("Cannot send VoteRequest to %s : %s", peer.Name(), err)
|
||||
debugf("Cannot send VoteRequest to %s : %s", u, err)
|
||||
}
|
||||
|
||||
if resp != nil {
|
||||
@ -73,10 +74,11 @@ func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, r
|
||||
var b bytes.Buffer
|
||||
json.NewEncoder(&b).Encode(req)
|
||||
|
||||
debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", peer.Name(),
|
||||
u, _ := nameToRaftURL(peer.Name())
|
||||
debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u,
|
||||
req.LastTerm, req.LastIndex)
|
||||
|
||||
resp, err := t.Post(fmt.Sprintf("%s/snapshot", peer.Name()), &b)
|
||||
resp, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
|
||||
|
||||
if resp != nil {
|
||||
defer resp.Body.Close()
|
||||
@ -95,10 +97,11 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft
|
||||
var b bytes.Buffer
|
||||
json.NewEncoder(&b).Encode(req)
|
||||
|
||||
debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", peer.Name(),
|
||||
u, _ := nameToRaftURL(peer.Name())
|
||||
debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u,
|
||||
req.LastTerm, req.LastIndex)
|
||||
|
||||
resp, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", peer.Name()), &b)
|
||||
resp, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
|
||||
|
||||
if resp != nil {
|
||||
defer resp.Body.Close()
|
||||
@ -110,25 +113,14 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft
|
||||
return aersp
|
||||
}
|
||||
|
||||
// Get the client address of the leader in the cluster
|
||||
func (t transporter) GetLeaderClientAddress() string {
|
||||
resp, _ := t.Get(raftServer.Leader() + "/client")
|
||||
if resp != nil {
|
||||
body, _ := ioutil.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
return string(body)
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// Send server side POST request
|
||||
func (t transporter) Post(path string, body io.Reader) (*http.Response, error) {
|
||||
resp, err := t.client.Post(t.scheme+path, "application/json", body)
|
||||
resp, err := t.client.Post(path, "application/json", body)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// Send server side GET request
|
||||
func (t transporter) Get(path string) (*http.Response, error) {
|
||||
resp, err := t.client.Get(t.scheme + path)
|
||||
resp, err := t.client.Get(path)
|
||||
return resp, err
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"github.com/coreos/go-raft"
|
||||
"html/template"
|
||||
"net/http"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
var s *raft.Server
|
||||
@ -24,7 +25,9 @@ func mainHandler(c http.ResponseWriter, req *http.Request) {
|
||||
mainTempl.Execute(c, p)
|
||||
}
|
||||
|
||||
func Start(server *raft.Server, port int) {
|
||||
func Start(server *raft.Server, webURL string) {
|
||||
u, _ := url.Parse(webURL)
|
||||
|
||||
mainTempl = template.Must(template.New("index.html").Parse(index_html))
|
||||
s = server
|
||||
|
||||
@ -32,6 +35,6 @@ func Start(server *raft.Server, port int) {
|
||||
http.HandleFunc("/", mainHandler)
|
||||
http.Handle("/ws", websocket.Handler(wsHandler))
|
||||
|
||||
fmt.Println("web listening at port ", port)
|
||||
http.ListenAndServe(fmt.Sprintf(":%v", port), nil)
|
||||
fmt.Printf("etcd web server listening on %s\n", u)
|
||||
http.ListenAndServe(u.Host, nil)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user