diff --git a/command.go b/command.go index 7a27362d4..0510f7563 100644 --- a/command.go +++ b/command.go @@ -7,20 +7,15 @@ package main import ( "github.com/benbjohnson/go-raft" + "github.com/xiangli-cmu/raft-etcd/store" "encoding/json" "time" - "github.com/xiangli-cmu/raft-etcd/store" ) // A command represents an action to be taken on the replicated state machine. type Command interface { CommandName() string Apply(server *raft.Server) ([]byte, error) - GeneratePath() string // Gererate a path for http request - Type() string // http request type - GetValue() string - GetKey() string - Sensitive() bool // Sensitive to the stateMachine } // Set command @@ -45,23 +40,6 @@ func (c *SetCommand) GeneratePath() string { return "set/" + c.Key } -// Get the type for http request -func (c *SetCommand) Type() string { - return "POST" -} - -func (c *SetCommand) GetValue() string { - return c.Value -} - -func (c *SetCommand) GetKey() string { - return c.Key -} - -func (c *SetCommand) Sensitive() bool { - return true -} - // Get command type GetCommand struct { @@ -83,23 +61,6 @@ func (c *GetCommand) GeneratePath() string{ return "get/" + c.Key } -func (c *GetCommand) Type() string{ - return "GET" -} - -func (c *GetCommand) GetValue() string{ - return "" -} - -func (c *GetCommand) GetKey() string{ - return c.Key -} - -func (c *GetCommand) Sensitive() bool { - return false -} - - // Delete command type DeleteCommand struct { Key string `json:"key"` @@ -115,27 +76,6 @@ func (c *DeleteCommand) Apply(server *raft.Server) ([]byte, error){ return store.Delete(c.Key) } -func (c *DeleteCommand) GeneratePath() string{ - return "delete/" + c.Key -} - -func (c *DeleteCommand) Type() string{ - return "GET" -} - -func (c *DeleteCommand) GetValue() string{ - return "" -} - -func (c *DeleteCommand) GetKey() string{ - return c.Key -} - -func (c *DeleteCommand) Sensitive() bool { - return true -} - - // Watch command type WatchCommand struct { Key string `json:"key"` @@ -158,27 +98,6 @@ func (c *WatchCommand) Apply(server *raft.Server) ([]byte, error){ return json.Marshal(res) } -func (c *WatchCommand) GeneratePath() string{ - return "watch/" + c.Key -} - -func (c *WatchCommand) Type() string{ - return "GET" -} - -func (c *WatchCommand) GetValue() string{ - return "" -} - -func (c *WatchCommand) GetKey() string{ - return c.Key -} - -func (c *WatchCommand) Sensitive() bool { - return false -} - - // JoinCommand type JoinCommand struct { Name string `json:"name"` @@ -193,3 +112,6 @@ func (c *JoinCommand) Apply(server *raft.Server) ([]byte, error) { // no result will be returned return nil, err } + + + diff --git a/handlers.go b/handlers.go index d681f91d1..48105c7ae 100644 --- a/handlers.go +++ b/handlers.go @@ -4,9 +4,9 @@ import ( "github.com/benbjohnson/go-raft" "net/http" "encoding/json" - "fmt" + //"fmt" "io/ioutil" - "bytes" + //"bytes" "time" "strings" "strconv" @@ -75,18 +75,15 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) { func JoinHttpHandler(w http.ResponseWriter, req *http.Request) { - debug("[recv] POST http://%v/join", server.Name()) + command := &JoinCommand{} + if err := decodeJsonRequest(req, command); err == nil { - if _, err= server.Do(command); err != nil { - warn("raftd: Unable to join: %v", err) - w.WriteHeader(http.StatusInternalServerError) - } else { - w.WriteHeader(http.StatusOK) - } + debug("Receive Join Request from %s", command.Name) + excute(command, &w) } else { - warn("[join] ERROR: %v", err) w.WriteHeader(http.StatusInternalServerError) + return } } @@ -94,8 +91,6 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) { func SetHttpHandler(w http.ResponseWriter, req *http.Request) { key := req.URL.Path[len("/set/"):] - debug("[recv] POST http://%v/set/%s", server.Name(), key) - content, err := ioutil.ReadAll(req.Body) if err != nil { @@ -104,6 +99,8 @@ func SetHttpHandler(w http.ResponseWriter, req *http.Request) { return } + debug("[recv] POST http://%v/set/%s [%s]", server.Name(), key, content) + command := &SetCommand{} command.Key = key values := strings.Split(string(content), " ") @@ -122,18 +119,7 @@ func SetHttpHandler(w http.ResponseWriter, req *http.Request) { command.ExpireTime = time.Unix(0,0) } - Dispatch(server, command, w) -} - -func GetHttpHandler(w http.ResponseWriter, req *http.Request) { - key := req.URL.Path[len("/get/"):] - - debug("[recv] GET http://%v/get/%s", server.Name(), key) - - command := &GetCommand{} - command.Key = key - - Dispatch(server, command, w) + excute(command, &w) } @@ -145,10 +131,58 @@ func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) { command := &DeleteCommand{} command.Key = key - Dispatch(server, command, w) + excute(command, &w) } +func excute(c Command, w *http.ResponseWriter) { + if server.State() == "leader" { + if body, err := server.Do(c); err != nil { + warn("raftd: Unable to write file: %v", err) + (*w).WriteHeader(http.StatusInternalServerError) + return + } else { + (*w).WriteHeader(http.StatusOK) + (*w).Write(body) + return + } + } else { + // tell the client where is the leader + (*w).WriteHeader(http.StatusTemporaryRedirect) + (*w).Write([]byte(server.Leader())) + return + } + + (*w).WriteHeader(http.StatusInternalServerError) + + return +} + +func MasterHttpHandler(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte(server.Leader())) +} + +func GetHttpHandler(w http.ResponseWriter, req *http.Request) { + key := req.URL.Path[len("/get/"):] + + debug("[recv] GET http://%v/get/%s", server.Name(), key) + + command := &GetCommand{} + command.Key = key + + if body, err := command.Apply(server); err != nil { + warn("raftd: Unable to write file: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } else { + w.WriteHeader(http.StatusOK) + w.Write(body) + return + } + +} + func WatchHttpHandler(w http.ResponseWriter, req *http.Request) { key := req.URL.Path[len("/watch/"):] @@ -157,105 +191,19 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) { command := &WatchCommand{} command.Key = key - Dispatch(server, command, w) - -} - -func Dispatch(server *raft.Server, command Command, w http.ResponseWriter) { - var body []byte - var err error - - debug("Dispatch command") - - // i am the leader, i will take care of the command - if server.State() == "leader" { - // if the command will change the state of the state machine - // the command need to append to the log entry - if command.Sensitive() { - if body, err = server.Do(command); err != nil { - warn("raftd: Unable to write file: %v", err) - w.WriteHeader(http.StatusInternalServerError) - return - } else { - // good to go - w.WriteHeader(http.StatusOK) - w.Write(body) - return - } - // for non-sentitive command, directly apply it - } else { - if body, err = command.Apply(server); err != nil { - warn("raftd: Unable to write file: %v", err) - w.WriteHeader(http.StatusInternalServerError) - return - } else { - w.WriteHeader(http.StatusOK) - w.Write(body) - return - } - } - - // redirect the command to the current leader + if body, err := command.Apply(server); err != nil { + warn("raftd: Unable to write file: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return } else { - leaderName := server.Leader() + w.WriteHeader(http.StatusOK) + w.Write(body) + return + } - if leaderName =="" { - // no luckey, during the voting process - // the client need to catch the error and try again - w.WriteHeader(http.StatusInternalServerError) - return - } - - debug("forward command to %s", leaderName) - - path := command.GeneratePath() - - if command.Type() == "POST" { - debug("[send] POST http://%v/%s", leaderName, path) - - reader := bytes.NewReader([]byte(command.GetValue())) - - // t must be ok - t,_ := server.Transporter().(transHandler) - - reps, _ := t.client.Post(fmt.Sprintf("http://%v/%s", - leaderName, command.GeneratePath()), "application/json", reader) - - if reps == nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - - // forwarding - w.WriteHeader(reps.StatusCode) - - body, _ := ioutil.ReadAll(reps.Body) - - w.Write(body) - return - - } else if command.Type() == "GET" { - debug("[send] GET http://%v/%s", leaderName, path) - - reps, _ := http.Get(fmt.Sprintf("http://%v/%s", - leaderName, command.GeneratePath())) - - - if reps == nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - - // forwarding - body, _ := ioutil.ReadAll(reps.Body) - - w.WriteHeader(reps.StatusCode) - - w.Write(body) - - } else { - //unsupported type - } - - } } + + + + + diff --git a/raftd.go b/raftd.go index e5b212a35..3bc01d183 100644 --- a/raftd.go +++ b/raftd.go @@ -27,6 +27,7 @@ import ( // //------------------------------------------------------------------------------ + var verbose bool var leaderHost string var address string @@ -44,14 +45,18 @@ func init() { flag.StringVar(&certFile, "cert", "", "the cert file of the server") flag.StringVar(&keyFile, "key", "", "the key file of the server") } +// CONSTANTS +const ( + HTTP = iota + HTTPS + HTTPSANDVERIFY +) const ( ELECTIONTIMTOUT = 3 * time.Second HEARTBEATTIMEOUT = 1 * time.Second ) - - //------------------------------------------------------------------------------ // // Typedefs @@ -74,13 +79,6 @@ var logger *log.Logger var storeMsg chan string -// CONSTANTS -const ( - HTTP = iota - HTTPS - HTTPSANDVERIFY -) - //------------------------------------------------------------------------------ // @@ -247,6 +245,7 @@ func startTransport(port int, st int) { http.HandleFunc("/get/", GetHttpHandler) http.HandleFunc("/delete/", DeleteHttpHandler) http.HandleFunc("/watch/", WatchHttpHandler) + http.HandleFunc("/master", MasterHttpHandler) switch st { @@ -375,26 +374,30 @@ func Join(s *raft.Server, serverName string) error { json.NewEncoder(&b).Encode(command) - var resp *http.Response - var err error - // t must be ok t,_ := server.Transporter().(transHandler) - if t.client != nil { - debug("[send] POST https://%v/join", "localhost:4001") - resp, err = t.client.Post(fmt.Sprintf("https://%s/join", serverName), "application/json", &b) - } else { - debug("[send] POST http://%v/join", "localhost:4001") - resp, err = http.Post(fmt.Sprintf("https://%s/join", serverName), "application/json", &b) - } + debug("Send Join Request to %s", serverName) + resp, err := Post(&t, fmt.Sprintf("%s/join", serverName), &b) - if resp != nil { - resp.Body.Close() - if resp.StatusCode == http.StatusOK { - return nil + for { + if resp != nil { + defer resp.Body.Close() + if resp.StatusCode == http.StatusOK { + return nil + } + if resp.StatusCode == http.StatusTemporaryRedirect { + address, err := ioutil.ReadAll(resp.Body) + if err != nil { + warn("Cannot Read Leader info: %v", err) + } + debug("Leader is %s", address) + debug("Send Join Request to %s", address) + json.NewEncoder(&b).Encode(command) + resp, err = Post(&t, fmt.Sprintf("%s/join", address), &b) + } } } - return fmt.Errorf("raftd: Unable to join: %v", err) + return fmt.Errorf("Unable to join: %v", err) } //-------------------------------------- // Web Helper @@ -431,6 +434,27 @@ func encodeJsonResponse(w http.ResponseWriter, status int, data interface{}) { } } +func Post(t *transHandler, path string, body io.Reader) (*http.Response, error){ + + if t.client != nil { + resp, err := t.client.Post("https://" + path, "application/json", body) + return resp, err + } else { + resp, err := http.Post("http://" + path, "application/json", body) + return resp, err + } +} + +func Get(t *transHandler, path string) (*http.Response, error) { + if t.client != nil { + resp, err := t.client.Get("https://" + path) + return resp, err + } else { + resp, err := http.Get("http://" + path) + return resp, err + } +} + //-------------------------------------- // Log //-------------------------------------- @@ -446,7 +470,7 @@ func info(msg string, v ...interface{}) { } func warn(msg string, v ...interface{}) { - logger.Printf("WARN " + msg + "\n", v...) + logger.Printf("Alpaca Server: WARN " + msg + "\n", v...) } func fatal(msg string, v ...interface{}) { diff --git a/store/store.go b/store/store.go index 0842e108b..db0d26ce4 100644 --- a/store/store.go +++ b/store/store.go @@ -7,6 +7,9 @@ import ( "fmt" ) +// global store +var s *Store + // CONSTANTS const ( ERROR = -1 + iota @@ -15,14 +18,27 @@ const ( GET ) + +var PERMANENT = time.Unix(0,0) + type Store struct { + // use the build-in hash map as the key-value store structure Nodes map[string]Node `json:"nodes"` + + // the string channel to send messages to the outside world + // now we use it to send changes to the hub of the web service messager *chan string } + type Node struct { Value string `json:"value"` + + // if the node is a permanent one the ExprieTime will be Unix(0,0) + // Otherwise after the expireTime, the node will be deleted ExpireTime time.Time `json:"expireTime"` + + // a channel to update the expireTime of the node update chan time.Time `json:"-"` } @@ -31,14 +47,14 @@ type Response struct { Key string `json:"key"` OldValue string `json:"oldValue"` NewValue string `json:"newValue"` + + // if the key existed before the action, this field should be true + // if the key did not exist before the action, this field should be false Exist bool `json:"exist"` + Expiration time.Time `json:"expiration"` } - -// global store -var s *Store - func init() { s = createStore() s.messager = nil @@ -51,10 +67,12 @@ func createStore() *Store{ return s } +// return a pointer to the store func GetStore() *Store { return s } +// set the messager of the store func (s *Store)SetMessager(messager *chan string) { s.messager = messager } @@ -66,44 +84,45 @@ func Set(key string, value string, expireTime time.Time) ([]byte, error) { var isExpire bool = false - isExpire = !expireTime.Equal(time.Unix(0,0)) + isExpire = !expireTime.Equal(PERMANENT) // when the slow follower receive the set command - // the key may be expired, we need also to delete - // the previous value of key + // the key may be expired, we should not add the node + // also if the node exist, we need to delete the node if isExpire && expireTime.Sub(time.Now()) < 0 { return Delete(key) } + // get the node node, ok := s.Nodes[key] if ok { - // if node is not permanent before // update its expireTime - if !node.ExpireTime.Equal(time.Unix(0,0)) { + if !node.ExpireTime.Equal(PERMANENT) { node.update <- expireTime } else { - - // if we want the permanent to have expire time - // we need to create a chan and create a func + // if we want the permanent node to have expire time + // we need to create a chan and create a go routine if isExpire { node.update = make(chan time.Time) - go expire(key, node.update, expireTime) } } + // update the information of the node node.ExpireTime = expireTime - node.Value = value - notify(SET, key, node.Value, value, true) - msg, err := json.Marshal(Response{SET, key, node.Value, value, true, expireTime}) + resp := Response{SET, key, node.Value, value, true, expireTime} - // notify the web interface + msg, err := json.Marshal(resp) + + notify(resp) + + // send to the messager if (s.messager != nil && err == nil) { *s.messager <- string(msg) @@ -111,21 +130,23 @@ func Set(key string, value string, expireTime time.Time) ([]byte, error) { return msg, err + // add new node } else { - // add new node update := make(chan time.Time) s.Nodes[key] = Node{value, expireTime, update} - // nofity the watcher - notify(SET, key, "", value, false) - if isExpire { go expire(key, update, expireTime) } - msg, err := json.Marshal(Response{SET, key, "", value, false, expireTime}) + resp := Response{SET, key, "", value, false, expireTime} + + msg, err := json.Marshal(resp) + + // nofity the watcher + notify(resp) // notify the web interface if (s.messager != nil && err == nil) { @@ -137,23 +158,45 @@ func Set(key string, value string, expireTime time.Time) ([]byte, error) { } } -// delete the key when it expires +// should be used as a go routine to delete the key when it expires func expire(key string, update chan time.Time, expireTime time.Time) { duration := expireTime.Sub(time.Now()) for { select { - // timeout delte key + // timeout delete the node case <-time.After(duration): - fmt.Println("expired at ", time.Now()) - Delete(key) - return + node, ok := s.Nodes[key] + if !ok { + return + } else { + + delete(s.Nodes, key) + + resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime} + + msg, err := json.Marshal(resp) + + notify(resp) + + // notify the messager + if (s.messager != nil && err == nil) { + + *s.messager <- string(msg) + } + + return + + } + case updateTime := <-update: //update duration - if updateTime.Equal(time.Unix(0,0)) { - fmt.Println("node became stable") + // if the node become a permanent one, the go routine is + // not needed + if updateTime.Equal(PERMANENT) { return } + // update duration duration = updateTime.Sub(time.Now()) } } @@ -172,20 +215,33 @@ func Get(key string) Response { } } -// delete the key, return the old value if the key exists +// delete the key func Delete(key string) ([]byte, error) { key = path.Clean(key) node, ok := s.Nodes[key] if ok { - delete(s.Nodes, key) - notify(DELETE, key, node.Value, "", true) + if node.ExpireTime.Equal(PERMANENT) { - msg, err := json.Marshal(Response{DELETE, key, node.Value, "", true, node.ExpireTime}) + delete(s.Nodes, key) - // notify the web interface + } else { + + // kill the expire go routine + node.update <- PERMANENT + delete(s.Nodes, key) + + } + + resp := Response{DELETE, key, node.Value, "", true, node.ExpireTime} + + msg, err := json.Marshal(resp) + + notify(resp) + + // notify the messager if (s.messager != nil && err == nil) { *s.messager <- string(msg) @@ -194,7 +250,6 @@ func Delete(key string) ([]byte, error) { return msg, err } else { - // no notify to the watcher and web interface return json.Marshal(Response{DELETE, key, "", "", false, time.Unix(0, 0)}) } @@ -213,20 +268,25 @@ func (s *Store)Save() ([]byte, error) { // recovery the state of the stroage system from a previous state func (s *Store)Recovery(state []byte) error { err := json.Unmarshal(state, s) + + // clean the expired nodes clean() + return err } // clean all expired keys func clean() { for key, node := range s.Nodes{ - // stable node - if node.ExpireTime.Equal(time.Unix(0,0)) { + + if node.ExpireTime.Equal(PERMANENT) { continue } else { + if node.ExpireTime.Sub(time.Now()) >= time.Second { node.update = make(chan time.Time) go expire(key, node.update, node.ExpireTime) + } else { // we should delete this node delete(s.Nodes, key) diff --git a/store/watcher.go b/store/watcher.go index 7fcc9ac8b..2a34b320b 100644 --- a/store/watcher.go +++ b/store/watcher.go @@ -4,7 +4,6 @@ import ( "path" "strings" //"fmt" - "time" ) @@ -49,9 +48,10 @@ func AddWatcher(prefix string, c chan Response) error { } // notify the watcher a action happened -func notify(action int, key string, oldValue string, newValue string, exist bool) error { - key = path.Clean(key) - segments := strings.Split(key, "/") +func notify(resp Response) error { + resp.Key = path.Clean(resp.Key) + + segments := strings.Split(resp.Key, "/") currPath := "/" // walk through all the pathes @@ -62,11 +62,9 @@ func notify(action int, key string, oldValue string, newValue string, exist bool if ok { - n := Response {action, key, oldValue, newValue, exist, time.Unix(0, 0)} - // notify all the watchers for _, c := range chans { - c <- n + c <- resp } // we have notified all the watchers at this path diff --git a/trans_handler.go b/trans_handler.go index b852cd295..587f334a0 100644 --- a/trans_handler.go +++ b/trans_handler.go @@ -20,17 +20,8 @@ func (t transHandler) SendAppendEntriesRequest(server *raft.Server, peer *raft.P var b bytes.Buffer json.NewEncoder(&b).Encode(req) + resp, err := Post(&t, fmt.Sprintf("%s/log/append", peer.Name()), &b) - var resp *http.Response - var err error - - if t.client != nil { - debug("[send] POST https://%s/log/append [%d]", peer.Name(), len(req.Entries)) - resp, err = http.Post(fmt.Sprintf("https://%s/log/append", peer.Name()), "application/json", &b) - } else { - debug("[send] POST http://%s/log/append [%d]", peer.Name(), len(req.Entries)) - resp, err = t.client.Post(fmt.Sprintf("http://%s/log/append", peer.Name()), "application/json", &b) - } if resp != nil { defer resp.Body.Close() aersp = &raft.AppendEntriesResponse{} @@ -48,16 +39,7 @@ func (t transHandler) SendVoteRequest(server *raft.Server, peer *raft.Peer, req var b bytes.Buffer json.NewEncoder(&b).Encode(req) - var resp *http.Response - var err error - - if t.client != nil { - debug("[send] POST https://%s/vote", peer.Name()) - resp, err = t.client.Post(fmt.Sprintf("https://%s/vote", peer.Name()), "application/json", &b) - } else { - debug("[send] POST http://%s/vote", peer.Name()) - resp, err = http.Post(fmt.Sprintf("http://%s/vote", peer.Name()), "application/json", &b) - } + resp, err := Post(&t, fmt.Sprintf("%s/vote", peer.Name()), &b) if resp != nil { defer resp.Body.Close() @@ -76,16 +58,10 @@ func (t transHandler) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, var b bytes.Buffer json.NewEncoder(&b).Encode(req) - var resp *http.Response - var err error + debug("[send] POST %s/snapshot [%d %d]", peer.Name(), req.LastTerm, req.LastIndex) + + resp, err := Post(&t, fmt.Sprintf("%s/snapshot", peer.Name()), &b) - if t.client != nil { - debug("[send] POST https://%s/snapshot [%d %d]", peer.Name(), req.LastTerm, req.LastIndex) - resp, err = t.client.Post(fmt.Sprintf("https://%s/snapshot", peer.Name()), "application/json", &b) - } else { - debug("[send] POST http://%s/snapshot [%d %d]", peer.Name(), req.LastTerm, req.LastIndex) - resp, err = http.Post(fmt.Sprintf("http://%s/snapshot", peer.Name()), "application/json", &b) - } if resp != nil { defer resp.Body.Close() aersp = &raft.SnapshotResponse{}