Merge pull request #8 from xiangli-cmu/master

clean up transporter.go
This commit is contained in:
Xiang Li 2013-07-09 17:26:19 -07:00
commit 3085b06ab5
5 changed files with 186 additions and 175 deletions

73
etcd.go
View File

@ -111,8 +111,8 @@ type Info struct {
//
//------------------------------------------------------------------------------
var server *raft.Server
var serverTransHandler transHandler
var raftServer *raft.Server
var raftTransporter transporter
var etcdStore *store.Store
//------------------------------------------------------------------------------
@ -156,66 +156,67 @@ func main() {
panic("ERROR type")
}
serverTransHandler = createTranHandler(st)
raftTransporter = createTransporter(st)
// Setup new raft server.
etcdStore = store.CreateStore(maxSize)
// create raft server
server, err = raft.NewServer(name, dirPath, serverTransHandler, etcdStore, nil)
raftServer, err = raft.NewServer(name, dirPath, raftTransporter, etcdStore, nil)
if err != nil {
fatal("%v", err)
}
err = server.LoadSnapshot()
err = raftServer.LoadSnapshot()
if err == nil {
debug("%s finished load snapshot", server.Name())
debug("%s finished load snapshot", raftServer.Name())
} else {
fmt.Println(err)
debug("%s bad snapshot", server.Name())
debug("%s bad snapshot", raftServer.Name())
}
server.Initialize()
debug("%s finished init", server.Name())
server.SetElectionTimeout(ELECTIONTIMTOUT)
server.SetHeartbeatTimeout(HEARTBEATTIMEOUT)
debug("%s finished set timeout", server.Name())
if server.IsLogEmpty() {
raftServer.Initialize()
debug("%s finished init", raftServer.Name())
raftServer.SetElectionTimeout(ELECTIONTIMTOUT)
raftServer.SetHeartbeatTimeout(HEARTBEATTIMEOUT)
debug("%s finished set timeout", raftServer.Name())
if raftServer.IsLogEmpty() {
// start as a leader in a new cluster
if cluster == "" {
server.StartLeader()
raftServer.StartLeader()
time.Sleep(time.Millisecond * 20)
// join self as a peer
// leader need to join self as a peer
for {
command := &JoinCommand{}
command.Name = server.Name()
_, err := server.Do(command)
command.Name = raftServer.Name()
_, err := raftServer.Do(command)
if err == nil {
break
}
}
debug("%s start as a leader", server.Name())
debug("%s start as a leader", raftServer.Name())
// start as a fellower in a existing cluster
// start as a follower in a existing cluster
} else {
server.StartFollower()
raftServer.StartFollower()
err := Join(server, cluster)
err := Join(raftServer, cluster)
if err != nil {
panic(err)
}
fmt.Println("success join")
debug("%s success join to the cluster", raftServer.Name())
}
// rejoin the previous cluster
} else {
server.StartFollower()
debug("%s start as a follower", server.Name())
// rejoin the previous cluster
raftServer.StartFollower()
debug("%s restart as a follower", raftServer.Name())
}
// open the snapshot
@ -225,7 +226,7 @@ func main() {
// start web
etcdStore.SetMessager(&storeMsg)
go webHelper()
go web.Start(server, webPort)
go web.Start(raftServer, webPort)
}
go startServTransport(info.ServerPort, st)
@ -233,12 +234,11 @@ func main() {
}
func createTranHandler(st int) transHandler {
t := transHandler{}
func createTransporter(st int) transporter {
t := transporter{}
switch st {
case HTTP:
t := transHandler{}
t.client = nil
return t
@ -264,7 +264,7 @@ func createTranHandler(st int) transHandler {
}
// for complier
return transHandler{}
return transporter{}
}
func startServTransport(port int, st int) {
@ -280,11 +280,12 @@ func startServTransport(port int, st int) {
switch st {
case HTTP:
debug("raft server [%s] listen on http", server.Name())
debug("raft server [%s] listen on http port %v", address, port)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
case HTTPS:
http.ListenAndServeTLS(fmt.Sprintf(":%d", port), serverCertFile, serverKeyFile, nil)
debug("raft server [%s] listen on https port %v", address, port)
log.Fatal(http.ListenAndServeTLS(fmt.Sprintf(":%d", port), serverCertFile, serverKeyFile, nil))
case HTTPSANDVERIFY:
pemByte, _ := ioutil.ReadFile(serverCAFile)
@ -328,7 +329,7 @@ func startClientTransport(port int, st int) {
switch st {
case HTTP:
debug("etcd [%s] listen on http", server.Name())
debug("etcd [%s] listen on http port %v", address, clientPort)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
case HTTPS:
@ -468,9 +469,9 @@ func Join(s *raft.Server, serverName string) error {
json.NewEncoder(&b).Encode(command)
// t must be ok
t, _ := server.Transporter().(transHandler)
t, _ := raftServer.Transporter().(transporter)
debug("Send Join Request to %s", serverName)
resp, err := Post(&t, fmt.Sprintf("%s/join", serverName), &b)
resp, err := t.Post(fmt.Sprintf("%s/join", serverName), &b)
for {
if resp != nil {
@ -486,7 +487,7 @@ func Join(s *raft.Server, serverName string) error {
debug("Leader is %s", address)
debug("Send Join Request to %s", address)
json.NewEncoder(&b).Encode(command)
resp, err = Post(&t, fmt.Sprintf("%s/join", address), &b)
resp, err = t.Post(fmt.Sprintf("%s/join", address), &b)
}
}
}

View File

@ -14,18 +14,18 @@ import (
// Get all the current logs
func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
debug("[recv] GET http://%v/log", server.Name())
debug("[recv] GET http://%v/log", raftServer.Name())
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(server.LogEntries())
json.NewEncoder(w).Encode(raftServer.LogEntries())
}
func VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
rvreq := &raft.RequestVoteRequest{}
err := decodeJsonRequest(req, rvreq)
if err == nil {
debug("[recv] POST http://%v/vote [%s]", server.Name(), rvreq.CandidateName)
if resp := server.RequestVote(rvreq); resp != nil {
debug("[recv] POST http://%v/vote [%s]", raftServer.Name(), rvreq.CandidateName)
if resp := raftServer.RequestVote(rvreq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
return
@ -38,10 +38,10 @@ func VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
aereq := &raft.AppendEntriesRequest{}
err := decodeJsonRequest(req, aereq)
if err == nil {
debug("[recv] POST http://%s/log/append [%d]", server.Name(), len(aereq.Entries))
if resp := server.AppendEntries(aereq); resp != nil {
debug("[recv] POST http://%s/log/append [%d]", raftServer.Name(), len(aereq.Entries))
if resp := raftServer.AppendEntries(aereq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
if !resp.Success {
@ -50,7 +50,7 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
return
}
}
warn("[append] ERROR: %v", err)
warn("[Append Entry] ERROR: %v", err)
w.WriteHeader(http.StatusInternalServerError)
}
@ -58,24 +58,26 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
aereq := &raft.SnapshotRequest{}
err := decodeJsonRequest(req, aereq)
if err == nil {
debug("[recv] POST http://%s/snapshot/ ", server.Name())
if resp, _ := server.SnapshotRecovery(aereq); resp != nil {
debug("[recv] POST http://%s/snapshot/ ", raftServer.Name())
if resp, _ := raftServer.SnapshotRecovery(aereq); resp != nil {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
return
}
}
warn("[snapshot] ERROR: %v", err)
warn("[Snapshot] ERROR: %v", err)
w.WriteHeader(http.StatusInternalServerError)
}
// Get the port that listening for client connecting of the server
func clientHttpHandler(w http.ResponseWriter, req *http.Request) {
debug("[recv] Get http://%v/client/ ", server.Name())
debug("[recv] Get http://%v/client/ ", raftServer.Name())
w.WriteHeader(http.StatusOK)
client := address + ":" + strconv.Itoa(clientPort)
w.Write([]byte(client))
}
//
func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
command := &JoinCommand{}
@ -93,6 +95,7 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
// external HTTP Handlers via client port
//--------------------------------------
// Dispatch GET/POST/DELETE request to corresponding handlers
func Multiplexer(w http.ResponseWriter, req *http.Request) {
if req.Method == "GET" {
@ -107,10 +110,11 @@ func Multiplexer(w http.ResponseWriter, req *http.Request) {
}
}
// Set Command Handler
func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
key := req.URL.Path[len("/v1/keys/"):]
debug("[recv] POST http://%v/v1/keys/%s", server.Name(), key)
debug("[recv] POST http://%v/v1/keys/%s", raftServer.Name(), key)
command := &SetCommand{}
command.Key = key
@ -138,7 +142,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
func TestAndSetHttpHandler(w http.ResponseWriter, req *http.Request) {
key := req.URL.Path[len("/v1/testAndSet/"):]
debug("[recv] POST http://%v/v1/testAndSet/%s", server.Name(), key)
debug("[recv] POST http://%v/v1/testAndSet/%s", raftServer.Name(), key)
command := &TestAndSetCommand{}
command.Key = key
@ -167,7 +171,7 @@ func TestAndSetHttpHandler(w http.ResponseWriter, req *http.Request) {
func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
key := req.URL.Path[len("/v1/keys/"):]
debug("[recv] DELETE http://%v/v1/keys/%s", server.Name(), key)
debug("[recv] DELETE http://%v/v1/keys/%s", raftServer.Name(), key)
command := &DeleteCommand{}
command.Key = key
@ -176,8 +180,8 @@ func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
}
func excute(c Command, w *http.ResponseWriter, req *http.Request) {
if server.State() == "leader" {
if body, err := server.Do(c); err != nil {
if raftServer.State() == "leader" {
if body, err := raftServer.Do(c); err != nil {
warn("Commit failed %v", err)
(*w).WriteHeader(http.StatusInternalServerError)
return
@ -198,13 +202,13 @@ func excute(c Command, w *http.ResponseWriter, req *http.Request) {
}
} else {
// current no leader
if server.Leader() == "" {
if raftServer.Leader() == "" {
(*w).WriteHeader(http.StatusInternalServerError)
return
}
// tell the client where is the leader
debug("Redirect to the leader %s", server.Leader())
debug("Redirect to the leader %s", raftServer.Leader())
path := req.URL.Path
@ -214,7 +218,7 @@ func excute(c Command, w *http.ResponseWriter, req *http.Request) {
scheme = "http://"
}
url := scheme + leaderClient() + path
url := scheme + raftTransporter.GetLeaderClientAddress() + path
debug("redirect to %s", url)
@ -229,18 +233,18 @@ func excute(c Command, w *http.ResponseWriter, req *http.Request) {
func MasterHttpHandler(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte(server.Leader()))
w.Write([]byte(raftServer.Leader()))
}
func GetHttpHandler(w *http.ResponseWriter, req *http.Request) {
key := req.URL.Path[len("/v1/keys/"):]
debug("[recv] GET http://%v/v1/keys/%s", server.Name(), key)
debug("[recv] GET http://%v/v1/keys/%s", raftServer.Name(), key)
command := &GetCommand{}
command.Key = key
if body, err := command.Apply(server); err != nil {
if body, err := command.Apply(raftServer); err != nil {
warn("raftd: Unable to write file: %v", err)
(*w).WriteHeader(http.StatusInternalServerError)
return
@ -261,12 +265,12 @@ func GetHttpHandler(w *http.ResponseWriter, req *http.Request) {
func ListHttpHandler(w http.ResponseWriter, req *http.Request) {
prefix := req.URL.Path[len("/v1/list/"):]
debug("[recv] GET http://%v/v1/list/%s", server.Name(), prefix)
debug("[recv] GET http://%v/v1/list/%s", raftServer.Name(), prefix)
command := &ListCommand{}
command.Prefix = prefix
if body, err := command.Apply(server); err != nil {
if body, err := command.Apply(raftServer); err != nil {
warn("Unable to write file: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
@ -291,11 +295,11 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
command.Key = key
if req.Method == "GET" {
debug("[recv] GET http://%v/watch/%s", server.Name(), key)
debug("[recv] GET http://%v/watch/%s", raftServer.Name(), key)
command.SinceIndex = 0
} else if req.Method == "POST" {
debug("[recv] POST http://%v/watch/%s", server.Name(), key)
debug("[recv] POST http://%v/watch/%s", raftServer.Name(), key)
content := req.FormValue("index")
sinceIndex, err := strconv.ParseUint(string(content), 10, 64)
@ -309,7 +313,7 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
return
}
if body, err := command.Apply(server); err != nil {
if body, err := command.Apply(raftServer); err != nil {
warn("Unable to write file: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return

View File

@ -1,79 +0,0 @@
package main
import (
"bytes"
"encoding/json"
"fmt"
"github.com/coreos/go-raft"
"io"
"net/http"
)
type transHandler struct {
name string
client *http.Client
}
// Sends AppendEntries RPCs to a peer when the server is the leader.
func (t transHandler) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
var aersp *raft.AppendEntriesResponse
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
debug("Send LogEntries to %s ", peer.Name())
resp, _ := Post(&t, fmt.Sprintf("%s/log/append", peer.Name()), &b)
if resp != nil {
defer resp.Body.Close()
aersp = &raft.AppendEntriesResponse{}
if err := json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
return aersp
}
}
return aersp
}
// Sends RequestVote RPCs to a peer when the server is the candidate.
func (t transHandler) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
var rvrsp *raft.RequestVoteResponse
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
debug("Send Vote to %s", peer.Name())
resp, _ := Post(&t, fmt.Sprintf("%s/vote", peer.Name()), &b)
if resp != nil {
defer resp.Body.Close()
rvrsp := &raft.RequestVoteResponse{}
if err := json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF {
return rvrsp
}
}
return rvrsp
}
// Sends SnapshotRequest RPCs to a peer when the server is the candidate.
func (t transHandler) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
var aersp *raft.SnapshotResponse
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
debug("Send Snapshot to %s [Last Term: %d, LastIndex %d]", peer.Name(),
req.LastTerm, req.LastIndex)
resp, err := Post(&t, fmt.Sprintf("%s/snapshot", peer.Name()), &b)
if resp != nil {
defer resp.Body.Close()
aersp = &raft.SnapshotResponse{}
if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
return aersp
}
}
return aersp
}

117
transporter.go Normal file
View File

@ -0,0 +1,117 @@
package main
import (
"bytes"
"encoding/json"
"fmt"
"github.com/coreos/go-raft"
"io"
"io/ioutil"
"net/http"
)
// Transporter layer for communication between raft nodes
type transporter struct {
name string
// If https is used for server internal communcation,
// we will have a http client. Or it will be nil.
client *http.Client
}
// Sends AppendEntries RPCs to a peer when the server is the leader.
func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
var aersp *raft.AppendEntriesResponse
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
debug("Send LogEntries to %s ", peer.Name())
resp, _ := t.Post(fmt.Sprintf("%s/log/append", peer.Name()), &b)
if resp != nil {
defer resp.Body.Close()
aersp = &raft.AppendEntriesResponse{}
if err := json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
return aersp
}
}
return aersp
}
// Sends RequestVote RPCs to a peer when the server is the candidate.
func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
var rvrsp *raft.RequestVoteResponse
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
debug("Send Vote to %s", peer.Name())
resp, _ := t.Post(fmt.Sprintf("%s/vote", peer.Name()), &b)
if resp != nil {
defer resp.Body.Close()
rvrsp := &raft.RequestVoteResponse{}
if err := json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF {
return rvrsp
}
}
return rvrsp
}
// Sends SnapshotRequest RPCs to a peer when the server is the candidate.
func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
var aersp *raft.SnapshotResponse
var b bytes.Buffer
json.NewEncoder(&b).Encode(req)
debug("Send Snapshot to %s [Last Term: %d, LastIndex %d]", peer.Name(),
req.LastTerm, req.LastIndex)
resp, err := t.Post(fmt.Sprintf("%s/snapshot", peer.Name()), &b)
if resp != nil {
defer resp.Body.Close()
aersp = &raft.SnapshotResponse{}
if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
return aersp
}
}
return aersp
}
// Get the the client address of the leader in the cluster
func (t transporter) GetLeaderClientAddress() string {
resp, _ := t.Get(raftServer.Leader() + "/client")
if resp != nil {
body, _ := ioutil.ReadAll(resp.Body)
resp.Body.Close()
return string(body)
}
return ""
}
// Send server side POST request
func (t transporter) Post(path string, body io.Reader) (*http.Response, error) {
if t.client != nil {
resp, err := t.client.Post("https://"+path, "application/json", body)
return resp, err
} else {
resp, err := http.Post("http://"+path, "application/json", body)
return resp, err
}
}
// Send server side GET request
func (t transporter) Get(path string) (*http.Response, error) {
if t.client != nil {
resp, err := t.client.Get("https://" + path)
return resp, err
} else {
resp, err := http.Get("http://" + path)
return resp, err
}
}

32
util.go
View File

@ -5,7 +5,6 @@ import (
"fmt"
"github.com/coreos/etcd/web"
"io"
"io/ioutil"
"log"
"net/http"
"os"
@ -48,37 +47,6 @@ func encodeJsonResponse(w http.ResponseWriter, status int, data interface{}) {
}
}
func Post(t *transHandler, path string, body io.Reader) (*http.Response, error) {
if t.client != nil {
resp, err := t.client.Post("https://"+path, "application/json", body)
return resp, err
} else {
resp, err := http.Post("http://"+path, "application/json", body)
return resp, err
}
}
func Get(t *transHandler, path string) (*http.Response, error) {
if t.client != nil {
resp, err := t.client.Get("https://" + path)
return resp, err
} else {
resp, err := http.Get("http://" + path)
return resp, err
}
}
func leaderClient() string {
resp, _ := Get(&serverTransHandler, server.Leader()+"/client")
if resp != nil {
body, _ := ioutil.ReadAll(resp.Body)
resp.Body.Close()
return string(body)
}
return ""
}
//--------------------------------------
// Log
//--------------------------------------