fix redirect after seprate client and server pot

This commit is contained in:
Xiang Li 2013-06-28 15:37:29 -07:00
parent 430f5d50e3
commit c2f436a58b
6 changed files with 63 additions and 42 deletions

View File

@ -16,7 +16,7 @@ import (
// A command represents an action to be taken on the replicated state machine. // A command represents an action to be taken on the replicated state machine.
type Command interface { type Command interface {
CommandName() string CommandName() string
Apply(server *raft.Server) (interface {}, error) Apply(server *raft.Server) (interface{}, error)
} }
// Set command // Set command
@ -32,7 +32,7 @@ func (c *SetCommand) CommandName() string {
} }
// Set the value of key to value // Set the value of key to value
func (c *SetCommand) Apply(server *raft.Server) (interface {}, error) { func (c *SetCommand) Apply(server *raft.Server) (interface{}, error) {
return store.Set(c.Key, c.Value, c.ExpireTime) return store.Set(c.Key, c.Value, c.ExpireTime)
} }
@ -52,7 +52,7 @@ func (c *GetCommand) CommandName() string {
} }
// Set the value of key to value // Set the value of key to value
func (c *GetCommand) Apply(server *raft.Server) (interface {}, error) { func (c *GetCommand) Apply(server *raft.Server) (interface{}, error) {
res := store.Get(c.Key) res := store.Get(c.Key)
return json.Marshal(res) return json.Marshal(res)
} }
@ -72,7 +72,7 @@ func (c *DeleteCommand) CommandName() string {
} }
// Delete the key // Delete the key
func (c *DeleteCommand) Apply(server *raft.Server) (interface {}, error) { func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) {
return store.Delete(c.Key) return store.Delete(c.Key)
} }
@ -86,7 +86,7 @@ func (c *WatchCommand) CommandName() string {
return "watch" return "watch"
} }
func (c *WatchCommand) Apply(server *raft.Server) (interface {}, error) { func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) {
ch := make(chan store.Response) ch := make(chan store.Response)
// add to the watchers list // add to the watchers list
@ -107,7 +107,7 @@ func (c *JoinCommand) CommandName() string {
return "join" return "join"
} }
func (c *JoinCommand) Apply(server *raft.Server) (interface {}, error) { func (c *JoinCommand) Apply(server *raft.Server) (interface{}, error) {
err := server.AddPeer(c.Name) err := server.AddPeer(c.Name)
// no result will be returned // no result will be returned
return nil, err return nil, err

20
etcd.go
View File

@ -68,7 +68,7 @@ func init() {
flag.StringVar(&dirPath, "d", "./", "the directory to store log and snapshot") flag.StringVar(&dirPath, "d", "./", "the directory to store log and snapshot")
flag.BoolVar(&ignore, "i", false , "ignore the old configuration, create a new node") flag.BoolVar(&ignore, "i", false, "ignore the old configuration, create a new node")
} }
// CONSTANTS // CONSTANTS
@ -95,11 +95,10 @@ const (
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type Info struct { type Info struct {
Address string `json:"address"` Address string `json:"address"`
ServerPort int `json:"serverPort"` ServerPort int `json:"serverPort"`
ClientPort int `json:"clientPort"` ClientPort int `json:"clientPort"`
WebPort int `json:"webPort"` WebPort int `json:"webPort"`
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -109,6 +108,7 @@ type Info struct {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
var server *raft.Server var server *raft.Server
var serverTransHandler transHandler
var logger *log.Logger var logger *log.Logger
var storeMsg chan string var storeMsg chan string
@ -152,13 +152,13 @@ func main() {
panic("ERROR type") panic("ERROR type")
} }
t := createTranHandler(st) serverTransHandler = createTranHandler(st)
// Setup new raft server. // Setup new raft server.
s := store.GetStore() s := store.GetStore()
// create raft server // create raft server
server, err = raft.NewServer(name, dirPath, t, s, nil) server, err = raft.NewServer(name, dirPath, serverTransHandler, s, nil)
if err != nil { if err != nil {
fatal("%v", err) fatal("%v", err)
@ -262,7 +262,7 @@ func startServTransport(port int, st int) {
http.HandleFunc("/log", GetLogHttpHandler) http.HandleFunc("/log", GetLogHttpHandler)
http.HandleFunc("/log/append", AppendEntriesHttpHandler) http.HandleFunc("/log/append", AppendEntriesHttpHandler)
http.HandleFunc("/snapshot", SnapshotHttpHandler) http.HandleFunc("/snapshot", SnapshotHttpHandler)
http.HandleFunc("/client", clientHttpHandler)
switch st { switch st {
@ -351,14 +351,13 @@ func startClientTransport(port int, st int) {
} }
} }
//-------------------------------------- //--------------------------------------
// Config // Config
//-------------------------------------- //--------------------------------------
func securityType(source int) int { func securityType(source int) int {
var keyFile, certFile, CAFile string var keyFile, certFile, CAFile string
switch source { switch source {
case SERVER: case SERVER:
@ -480,4 +479,3 @@ func Join(s *raft.Server, serverName string) error {
} }
return fmt.Errorf("Unable to join: %v", err) return fmt.Errorf("Unable to join: %v", err)
} }

View File

@ -13,7 +13,7 @@ import (
) )
//-------------------------------------- //--------------------------------------
// HTTP Handlers // Internal HTTP Handlers via server port
//-------------------------------------- //--------------------------------------
// Get all the current logs // Get all the current logs
@ -72,6 +72,13 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
} }
func clientHttpHandler(w http.ResponseWriter, req *http.Request) {
debug("[recv] Get http://%v/client/ ", server.Name())
w.WriteHeader(http.StatusOK)
client := address + ":" + strconv.Itoa(clientPort)
w.Write([]byte(client))
}
func JoinHttpHandler(w http.ResponseWriter, req *http.Request) { func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
command := &JoinCommand{} command := &JoinCommand{}
@ -85,6 +92,9 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
} }
} }
//--------------------------------------
// external HTTP Handlers via client port
//--------------------------------------
func SetHttpHandler(w http.ResponseWriter, req *http.Request) { func SetHttpHandler(w http.ResponseWriter, req *http.Request) {
key := req.URL.Path[len("/set/"):] key := req.URL.Path[len("/set/"):]
@ -141,12 +151,12 @@ func excute(c Command, w *http.ResponseWriter, req *http.Request) {
(*w).WriteHeader(http.StatusOK) (*w).WriteHeader(http.StatusOK)
if body == nil { if body == nil {
return return
} }
body, ok := body.([]byte) body, ok := body.([]byte)
if !ok { if !ok {
panic ("wrong type") panic("wrong type")
} }
(*w).Write(body) (*w).Write(body)
@ -154,19 +164,19 @@ func excute(c Command, w *http.ResponseWriter, req *http.Request) {
} }
} else { } else {
// tell the client where is the leader // tell the client where is the leader
debug("Redirect to the leader %s", server.Leader()) debug("Redirect to the leader %s", server.Leader())
path := req.URL.Path path := req.URL.Path
var scheme string var scheme string
if scheme = req.URL.Scheme; scheme == "" { if scheme = req.URL.Scheme; scheme == "" {
scheme = "http://" scheme = "http://"
} }
url := scheme + server.Leader() + path url := scheme + leaderClient() + path
debug("redirect to ", url) debug("redirect to ", url)
http.Redirect(*w, req, url, http.StatusTemporaryRedirect) http.Redirect(*w, req, url, http.StatusTemporaryRedirect)
return return
} }
@ -198,7 +208,7 @@ func GetHttpHandler(w http.ResponseWriter, req *http.Request) {
body, ok := body.([]byte) body, ok := body.([]byte)
if !ok { if !ok {
panic ("wrong type") panic("wrong type")
} }
w.Write(body) w.Write(body)
@ -224,9 +234,9 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
body, ok := body.([]byte) body, ok := body.([]byte)
if !ok { if !ok {
panic ("wrong type") panic("wrong type")
} }
w.Write(body) w.Write(body)
return return
} }

View File

@ -109,7 +109,7 @@ func Set(key string, value string, expireTime time.Time) ([]byte, error) {
go expire(key, node.update, expireTime) go expire(key, node.update, expireTime)
} }
} }
// update the information of the node // update the information of the node
s.Nodes[key] = Node{value, expireTime, node.update} s.Nodes[key] = Node{value, expireTime, node.update}

View File

@ -15,7 +15,7 @@ type WatcherHub struct {
} }
type Watcher struct { type Watcher struct {
c chan Response c chan Response
wType int wType int
} }
@ -46,7 +46,7 @@ func AddWatcher(prefix string, c chan Response, wType int) error {
_, ok := w.watchers[prefix] _, ok := w.watchers[prefix]
if !ok { if !ok {
w.watchers[prefix] = make([]Watcher, 0) w.watchers[prefix] = make([]Watcher, 0)
watcher := Watcher{c, wType} watcher := Watcher{c, wType}
@ -80,10 +80,10 @@ func notify(resp Response) error {
newWatchers := make([]Watcher, 0) newWatchers := make([]Watcher, 0)
// notify all the watchers // notify all the watchers
for _, watcher := range watchers { for _, watcher := range watchers {
watcher.c <- resp watcher.c <- resp
if watcher.wType == LONG { if watcher.wType == LONG {
newWatchers = append(newWatchers, watcher) newWatchers = append(newWatchers, watcher)
} }
} }
if len(newWatchers) == 0 { if len(newWatchers) == 0 {

21
util.go
View File

@ -1,13 +1,15 @@
package main package main
import ( import (
"net/http"
"io"
"fmt"
"encoding/json" "encoding/json"
"fmt"
"github.com/xiangli-cmu/raft-etcd/web" "github.com/xiangli-cmu/raft-etcd/web"
"io"
"io/ioutil"
"net/http"
"os" "os"
) )
//-------------------------------------- //--------------------------------------
// Web Helper // Web Helper
//-------------------------------------- //--------------------------------------
@ -63,6 +65,17 @@ func Get(t *transHandler, path string) (*http.Response, error) {
} }
} }
func leaderClient() string {
resp, _ := Get(&serverTransHandler, server.Leader()+"/client")
if resp != nil {
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)
return string(body)
}
return ""
}
//-------------------------------------- //--------------------------------------
// Log // Log
//-------------------------------------- //--------------------------------------
@ -84,4 +97,4 @@ func warn(msg string, v ...interface{}) {
func fatal(msg string, v ...interface{}) { func fatal(msg string, v ...interface{}) {
logger.Printf("FATAL "+msg+"\n", v...) logger.Printf("FATAL "+msg+"\n", v...)
os.Exit(1) os.Exit(1)
} }