clean up etcd.go

This commit is contained in:
Xiang Li 2013-07-09 20:37:00 -07:00
parent 5271da3d19
commit f5796244b4
3 changed files with 61 additions and 59 deletions

93
etcd.go
View File

@ -101,6 +101,14 @@ type Info struct {
ServerPort int `json:"serverPort"`
ClientPort int `json:"clientPort"`
WebPort int `json:"webPort"`
ServerCertFile string `json:"serverCertFile"`
ServerKeyFile string `json:"serverKeyFile"`
ServerCAFile string `json:"serverCAFile"`
ClientCertFile string `json:"clientCertFile"`
ClientKeyFile string `json:"clientKeyFile"`
ClientCAFile string `json:"clientCAFile"`
}
//------------------------------------------------------------------------------
@ -112,6 +120,7 @@ type Info struct {
var raftServer *raft.Server
var raftTransporter transporter
var etcdStore *store.Store
var info *Info
//------------------------------------------------------------------------------
//
@ -124,7 +133,6 @@ var etcdStore *store.Store
//--------------------------------------
func main() {
var err error
flag.Parse()
// Setup commands.
@ -134,9 +142,8 @@ func main() {
if err := os.MkdirAll(dirPath, 0744); err != nil {
fatal("Unable to create path: %v", err)
}
var info *Info = getInfo(dirPath)
name := fmt.Sprintf("%s:%d", info.Address, info.ServerPort)
info = getInfo(dirPath)
// secrity type
st := securityType(SERVER)
@ -147,14 +154,32 @@ func main() {
fatal("Please specify cert and key file or cert and key file and CAFile or none of the three")
}
// Create transporter for raft
raftTransporter = createTransporter(st)
// Create etcd key-value store
etcdStore = store.CreateStore(maxSize)
startRaft(st)
if webPort != -1 {
// start web
etcdStore.SetMessager(&storeMsg)
go webHelper()
go web.Start(raftServer, webPort)
}
startClientTransport(info.ClientPort, clientSt)
}
// Start the raft server
func startRaft(securityType int) {
raftName := fmt.Sprintf("%s:%d", info.Address, info.ServerPort)
// Create transporter for raft
raftTransporter = createTransporter(securityType)
// Create raft server
raftServer, err = raft.NewServer(name, dirPath, raftTransporter, etcdStore, nil)
raftServer, err := raft.NewServer(raftName, dirPath, raftTransporter, etcdStore, nil)
if err != nil {
fmt.Println(err)
@ -213,16 +238,8 @@ func main() {
// open the snapshot
// go server.Snapshot()
if webPort != -1 {
// start web
etcdStore.SetMessager(&storeMsg)
go webHelper()
go web.Start(raftServer, webPort)
}
go startRaftTransport(info.ServerPort, st)
startClientTransport(info.ClientPort, clientSt)
// start to response to raft requests
go startRaftTransport(info.ServerPort, securityType)
}
@ -276,11 +293,11 @@ func startRaftTransport(port int, st int) {
switch st {
case HTTP:
fmt.Println("raft server [%s] listen on http port %v", address, port)
fmt.Printf("raft server [%s] listen on http port %v\n", address, port)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
case HTTPS:
fmt.Println("raft server [%s] listen on https port %v", address, port)
fmt.Printf("raft server [%s] listen on https port %v\n", address, port)
log.Fatal(http.ListenAndServeTLS(fmt.Sprintf(":%d", port), serverCertFile, serverKeyFile, nil))
case HTTPSANDVERIFY:
@ -292,7 +309,7 @@ func startRaftTransport(port int, st int) {
},
Addr: fmt.Sprintf(":%d", port),
}
fmt.Println("raft server [%s] listen on https port %v", address, port)
fmt.Printf("raft server [%s] listen on https port %v\n", address, port)
err := server.ListenAndServeTLS(serverCertFile, serverKeyFile)
if err != nil {
@ -314,11 +331,11 @@ func startClientTransport(port int, st int) {
switch st {
case HTTP:
fmt.Println("etcd [%s] listen on http port %v", address, clientPort)
fmt.Printf("etcd [%s] listen on http port %v\n", address, clientPort)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
case HTTPS:
fmt.Println("etcd [%s] listen on https port %v", address, clientPort)
fmt.Printf("etcd [%s] listen on https port %v\n", address, clientPort)
http.ListenAndServeTLS(fmt.Sprintf(":%d", port), clientCertFile, clientKeyFile, nil)
case HTTPSANDVERIFY:
@ -330,7 +347,7 @@ func startClientTransport(port int, st int) {
},
Addr: fmt.Sprintf(":%d", port),
}
fmt.Println("etcd [%s] listen on https port %v", address, clientPort)
fmt.Printf("etcd [%s] listen on https port %v\n", address, clientPort)
err := server.ListenAndServeTLS(clientCertFile, clientKeyFile)
if err != nil {
@ -344,6 +361,7 @@ func startClientTransport(port int, st int) {
// Config
//--------------------------------------
// Get the security type
func securityType(source int) int {
var keyFile, certFile, CAFile string
@ -351,14 +369,14 @@ func securityType(source int) int {
switch source {
case SERVER:
keyFile = serverKeyFile
certFile = serverCertFile
CAFile = serverCAFile
keyFile = info.ServerKeyFile
certFile = info.ServerCertFile
CAFile = info.ServerCAFile
case CLIENT:
keyFile = clientKeyFile
certFile = clientCertFile
CAFile = clientCAFile
keyFile = info.ClientKeyFile
certFile = info.ClientCertFile
CAFile = info.ClientCAFile
}
// If the user do not specify key file, cert file and
@ -385,13 +403,15 @@ func securityType(source int) int {
return -1
}
// Get the server info from previous conf file
// or from the user
func getInfo(path string) *Info {
info := &Info{}
// Read in the server info if available.
infoPath := fmt.Sprintf("%s/info", path)
// delete the old configuration if exist
// Delete the old configuration if exist
if ignore {
logPath := fmt.Sprintf("%s/log", path)
snapshotPath := fmt.Sprintf("%s/snapshotPath", path)
@ -411,8 +431,8 @@ func getInfo(path string) *Info {
}
file.Close()
// Otherwise ask user for info and write it to file.
} else {
// Otherwise ask user for info and write it to file.
if address == "" {
fatal("Please give the address of the local machine")
@ -426,6 +446,14 @@ func getInfo(path string) *Info {
info.ClientPort = clientPort
info.WebPort = webPort
info.ClientCAFile = clientCAFile
info.ClientCertFile = clientCertFile
info.ClientKeyFile = clientKeyFile
info.ServerCAFile = serverCAFile
info.ServerKeyFile = serverKeyFile
info.ServerCertFile = serverCertFile
// Write to file.
content, _ := json.Marshal(info)
content = []byte(string(content) + "\n")
@ -437,6 +465,7 @@ func getInfo(path string) *Info {
return info
}
// Create client auth certpool
func createCertPool(CAFile string) *x509.CertPool {
pemByte, _ := ioutil.ReadFile(CAFile)
@ -491,7 +520,7 @@ func joinCluster(s *raft.Server, serverName string) error {
return fmt.Errorf("Unable to join: %v", err)
}
// register commands to raft server
// Register commands to raft server
func registerCommands() {
raft.RegisterCommand(&JoinCommand{})
raft.RegisterCommand(&SetCommand{})

View File

@ -63,10 +63,6 @@ func debug(msg string, v ...interface{}) {
}
}
func info(msg string, v ...interface{}) {
logger.Printf("INFO "+msg+"\n", v...)
}
func warn(msg string, v ...interface{}) {
logger.Printf("WARN "+msg+"\n", v...)
}

View File

@ -4,7 +4,6 @@ import (
"code.google.com/p/go.net/websocket"
"fmt"
"github.com/coreos/go-raft"
//"github.com/xiangli-cmu/raft-etcd/store"
"html/template"
"net/http"
//"time"
@ -18,28 +17,6 @@ type MainPage struct {
Address string
}
func handler(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Leader:\n%s\n", s.Leader())
fmt.Fprintf(w, "Peers:\n")
for peerName, _ := range s.Peers() {
fmt.Fprintf(w, "%s\n", peerName)
}
fmt.Fprintf(w, "Data\n")
//s := store.GetStore()
// for key, node := range s.Nodes {
// if node.ExpireTime.Equal(time.Unix(0, 0)) {
// fmt.Fprintf(w, "%s %s\n", key, node.Value)
// } else {
// fmt.Fprintf(w, "%s %s %s\n", key, node.Value, node.ExpireTime)
// }
// }
}
func mainHandler(c http.ResponseWriter, req *http.Request) {
p := &MainPage{Leader: s.Leader(),