From 69a8116272c8d61cee291f514c3cd709b99bd61f Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 9 Jun 2013 10:42:34 -0700 Subject: [PATCH] make store system communicate with raft --- command.go | 135 ++++++++++++++++++++++++ handlers.go | 190 ++++++++++++++++++++++++++++++++++ raftd.go | 259 +++++++++++++++++++++++++++++++++++++++++++++++ store.go | 31 +++--- store_test.go | 2 +- trans_handler.go | 69 +++++++++++++ watcher.go | 2 +- watcher_test.go | 2 +- 8 files changed, 675 insertions(+), 15 deletions(-) create mode 100644 command.go create mode 100644 handlers.go create mode 100644 raftd.go create mode 100644 trans_handler.go diff --git a/command.go b/command.go new file mode 100644 index 000000000..a1e6adb93 --- /dev/null +++ b/command.go @@ -0,0 +1,135 @@ +package main +//------------------------------------------------------------------------------ +// +// Commands +// +//------------------------------------------------------------------------------ + +import ( + "github.com/benbjohnson/go-raft" + "encoding/json" + ) + + +// A command represents an action to be taken on the replicated state machine. +type Command interface { + CommandName() string + Apply(server *raft.Server) ([]byte, error) + GeneratePath() string + Type() string + GetValue() string + GetKey() string +} + +// Set command +type SetCommand struct { + Key string `json:"key"` + Value string `json:"value"` +} + +// The name of the command in the log +func (c *SetCommand) CommandName() string { + return "set" +} + +// Set the value of key to value +func (c *SetCommand) Apply(server *raft.Server) ([]byte, error) { + res := s.Set(c.Key, c.Value) + return json.Marshal(res) +} + +func (c *SetCommand) GeneratePath() string{ + return "/set/" + c.Key +} + +func (c *SetCommand) Type() string{ + return "POST" +} + +func (c *SetCommand) GetValue() string{ + return c.Value +} + +func (c *SetCommand) GetKey() string{ + return c.Key +} + + +// Get command +type GetCommand struct { + Key string `json:"key"` +} + +// The name of the command in the log +func (c *GetCommand) CommandName() string { + return "get" +} + +// Set the value of key to value +func (c *GetCommand) Apply(server *raft.Server) ([]byte, error){ + res := s.Get(c.Key) + return json.Marshal(res) +} + +func (c *GetCommand) GeneratePath() string{ + return "/get/" + c.Key +} + +func (c *GetCommand) Type() string{ + return "GET" +} + +func (c *GetCommand) GetValue() string{ + return "" +} + +func (c *GetCommand) GetKey() string{ + return c.Key +} + + +// Delete command +type DeleteCommand struct { + Key string `json:"key"` +} + +// The name of the command in the log +func (c *DeleteCommand) CommandName() string { + return "delete" +} + +// Set the value of key to value +func (c *DeleteCommand) Apply(server *raft.Server) ([]byte, error){ + res := s.Delete(c.Key) + return json.Marshal(res) +} + +func (c *DeleteCommand) GeneratePath() string{ + return "/delete/" + c.Key +} + +func (c *DeleteCommand) Type() string{ + return "GET" +} + +func (c *DeleteCommand) GetValue() string{ + return "" +} + +func (c *DeleteCommand) GetKey() string{ + return c.Key +} + +// joinCommand +type joinCommand struct { + Name string `json:"name"` +} + +func (c *joinCommand) CommandName() string { + return "join" +} + +func (c *joinCommand) Apply(server *raft.Server) ([]byte, error) { + err := server.AddPeer(c.Name) + return nil, err +} diff --git a/handlers.go b/handlers.go new file mode 100644 index 000000000..b23486812 --- /dev/null +++ b/handlers.go @@ -0,0 +1,190 @@ +package main + +import ( + "github.com/benbjohnson/go-raft" + "net/http" + "encoding/json" + "fmt" + "github.com/gorilla/mux" + "io/ioutil" + "bytes" + ) + +//-------------------------------------- +// HTTP Handlers +//-------------------------------------- + + + +func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) { + debug("[recv] GET http://%v/log", server.Name()) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(server.LogEntries()) +} + +func JoinHttpHandler(w http.ResponseWriter, req *http.Request) { + debug("[recv] POST http://%v/join", server.Name()) + command := &joinCommand{} + if err := decodeJsonRequest(req, command); err == nil { + if _, err= server.Do(command); err != nil { + warn("raftd: Unable to join: %v", err) + w.WriteHeader(http.StatusInternalServerError) + } else { + w.WriteHeader(http.StatusOK) + } + } else { + warn("[join] ERROR: %v", err) + w.WriteHeader(http.StatusInternalServerError) + } +} + +func VoteHttpHandler(w http.ResponseWriter, req *http.Request) { + rvreq := &raft.RequestVoteRequest{} + err := decodeJsonRequest(req, rvreq) + if err == nil { + debug("[recv] POST http://%v/vote [%s]", server.Name(), rvreq.CandidateName) + if resp, _ := server.RequestVote(rvreq); resp != nil { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(resp) + return + } + } + w.WriteHeader(http.StatusInternalServerError) +} + +func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) { + aereq := &raft.AppendEntriesRequest{} + err := decodeJsonRequest(req, aereq) + if err == nil { + debug("[recv] POST http://%s/log/append [%d]", server.Name(), len(aereq.Entries)) + if resp, _ := server.AppendEntries(aereq); resp != nil { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(resp) + if !resp.Success { + fmt.Println("append error") + } + return + } + } + warn("[append] ERROR: %v", err) + w.WriteHeader(http.StatusInternalServerError) +} + +func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) { + aereq := &raft.SnapshotRequest{} + err := decodeJsonRequest(req, aereq) + if err == nil { + debug("[recv] POST http://%s/snapshot/ ", server.Name()) + if resp, _ := server.SnapshotRecovery(aereq); resp != nil { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(resp) + return + } + } + warn("[snapshot] ERROR: %v", err) + w.WriteHeader(http.StatusInternalServerError) +} + +func SetHttpHandler(w http.ResponseWriter, req *http.Request) { + vars := mux.Vars(req) + + debug("[recv] POST http://%v/set/%s", server.Name(), vars["key"]) + + content, err := ioutil.ReadAll(req.Body) + if err != nil { + warn("raftd: Unable to read: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + command := &SetCommand{} + command.Key = vars["key"] + command.Value = string(content) + + Dispatch(server, command, w) + +} + +func GetHttpHandler(w http.ResponseWriter, req *http.Request) { + vars := mux.Vars(req) + + debug("[recv] GET http://%v/get/%s", server.Name(), vars["key"]) + + command := &GetCommand{} + command.Key = vars["key"] + + Dispatch(server, command, w) + +} + + +func Dispatch(server *raft.Server, command Command, w http.ResponseWriter) { + var body []byte + var err error + // unlikely to fail twice + for { + // i am the leader, i will take care of the command + if server.State() == "leader" { + if body, err = server.Do(command); err != nil { + warn("raftd: Unable to write file: %v", err) + w.WriteHeader(http.StatusInternalServerError) + } else { + // good to go + w.WriteHeader(http.StatusOK) + w.Write(body) + return + } + + // redirect the command to the current leader + } else { + leaderName := server.Leader() + + if leaderName =="" { + // no luckey, during the voting process + continue + } + + path := command.GeneratePath() + + if command.Type() == "POST" { + debug("[send] POST http://%v/%s", leaderName, path) + + reader := bytes.NewReader([]byte(command.GetValue())) + + reps, _ := http.Post(fmt.Sprintf("http://%v/%s", + leaderName, command.GeneratePath()), "application/json", reader) + + reps.Body.Read(body) + // good to go + w.WriteHeader(http.StatusOK) + + w.Write(body) + + } else if command.Type() == "GET" { + debug("[send] GET http://%v/%s", leaderName, path) + + reps, _ := http.Get(fmt.Sprintf("http://%v/%s", + leaderName, command.GeneratePath())) + // good to go + reps.Body.Read(body) + + w.WriteHeader(http.StatusOK) + + w.Write(body) + + } else { + //unsupported type + } + + if err != nil { + // should check other errors + continue + } else { + //good to go + return + } + + } + } +} diff --git a/raftd.go b/raftd.go new file mode 100644 index 000000000..e6787d851 --- /dev/null +++ b/raftd.go @@ -0,0 +1,259 @@ +package main + +import ( + "bytes" + "encoding/json" + "flag" + "fmt" + "github.com/benbjohnson/go-raft" + "github.com/gorilla/mux" + "log" + "io" + "io/ioutil" + "net/http" + "strings" + "os" + "time" +) + +//------------------------------------------------------------------------------ +// +// Initialization +// +//------------------------------------------------------------------------------ + +var verbose bool + +func init() { + flag.BoolVar(&verbose, "v", false, "verbose logging") + flag.BoolVar(&verbose, "verbose", false, "verbose logging") +} + +//------------------------------------------------------------------------------ +// +// Typedefs +// +//------------------------------------------------------------------------------ + +type Info struct { + Host string `json:"host"` + Port int `json:"port"` +} + +//------------------------------------------------------------------------------ +// +// Variables +// +//------------------------------------------------------------------------------ + +var server *raft.Server +var logger *log.Logger + +//------------------------------------------------------------------------------ +// +// Functions +// +//------------------------------------------------------------------------------ + +//-------------------------------------- +// Main +//-------------------------------------- + +func main() { + var err error + logger = log.New(os.Stdout, "", log.LstdFlags) + flag.Parse() + if verbose { + fmt.Println("Verbose logging enabled.\n") + } + + // Setup commands. + raft.RegisterCommand(&joinCommand{}) + raft.RegisterCommand(&SetCommand{}) + raft.RegisterCommand(&GetCommand{}) + raft.RegisterCommand(&DeleteCommand{}) + + // Use the present working directory if a directory was not passed in. + var path string + if flag.NArg() == 0 { + path, _ = os.Getwd() + } else { + path = flag.Arg(0) + if err := os.MkdirAll(path, 0744); err != nil { + fatal("Unable to create path: %v", err) + } + } + + // Read server info from file or grab it from user. + var info *Info = getInfo(path) + name := fmt.Sprintf("%s:%d", info.Host, info.Port) + fmt.Printf("Name: %s\n\n", name) + + t := transHandler{} + + // Setup new raft server. + server, err = raft.NewServer(name, path, t, nil) + //server.DoHandler = DoHandler; + server.SetElectionTimeout(2 * time.Second) + server.SetHeartbeatTimeout(1 * time.Second) + if err != nil { + fatal("%v", err) + } + server.Start() + + // Join to another server if we don't have a log. + if server.IsLogEmpty() { + var leaderHost string + fmt.Println("This server has no log. Please enter a server in the cluster to join\nto or hit enter to initialize a cluster."); + fmt.Printf("Join to (host:port)> "); + fmt.Scanf("%s", &leaderHost) + if leaderHost == "" { + server.Initialize() + } else { + join(server) + fmt.Println("success join") + } + } + // open snapshot + //go server.Snapshot() + + // Create HTTP interface. + r := mux.NewRouter() + + // internal commands + r.HandleFunc("/join", JoinHttpHandler).Methods("POST") + r.HandleFunc("/vote", VoteHttpHandler).Methods("POST") + r.HandleFunc("/log", GetLogHttpHandler).Methods("GET") + r.HandleFunc("/log/append", AppendEntriesHttpHandler).Methods("POST") + r.HandleFunc("/snapshot", SnapshotHttpHandler).Methods("POST") + + // external commands + r.HandleFunc("/set/{key}", SetHttpHandler).Methods("POST") + r.HandleFunc("/get/{key}", GetHttpHandler).Methods("GET") + //r.HandleFunc("/delete/{key}", DeleteHttpHandler).Methods("GET") + + http.Handle("/", r) + log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.Port), nil)) +} + +func usage() { + fatal("usage: raftd [PATH]") +} + +//-------------------------------------- +// Config +//-------------------------------------- + +func getInfo(path string) *Info { + info := &Info{} + + // Read in the server info if available. + infoPath := fmt.Sprintf("%s/info", path) + if file, err := os.Open(infoPath); err == nil { + if content, err := ioutil.ReadAll(file); err != nil { + fatal("Unable to read info: %v", err) + } else { + if err = json.Unmarshal(content, &info); err != nil { + fatal("Unable to parse info: %v", err) + } + } + file.Close() + + // Otherwise ask user for info and write it to file. + } else { + fmt.Printf("Enter hostname: [localhost] "); + fmt.Scanf("%s", &info.Host) + info.Host = strings.TrimSpace(info.Host) + if info.Host == "" { + info.Host = "localhost" + } + + fmt.Printf("Enter port: [4001] "); + fmt.Scanf("%d", &info.Port) + if info.Port == 0 { + info.Port = 4001 + } + + // Write to file. + content, _ := json.Marshal(info) + content = []byte(string(content) + "\n") + if err := ioutil.WriteFile(infoPath, content, 0644); err != nil { + fatal("Unable to write info to file: %v", err) + } + } + + return info +} + + +//-------------------------------------- +// Handlers +//-------------------------------------- + +// Send join requests to the leader. +func join(s *raft.Server) error { + var b bytes.Buffer + command := &joinCommand{} + command.Name = s.Name() + + json.NewEncoder(&b).Encode(command) + debug("[send] POST http://%v/join", "localhost:4001") + resp, err := http.Post(fmt.Sprintf("http://%s/join", "localhost:4001"), "application/json", &b) + if resp != nil { + resp.Body.Close() + if resp.StatusCode == http.StatusOK { + return nil + } + } + return fmt.Errorf("raftd: Unable to join: %v", err) +} + + +//-------------------------------------- +// HTTP Utilities +//-------------------------------------- + +func decodeJsonRequest(req *http.Request, data interface{}) error { + decoder := json.NewDecoder(req.Body) + if err := decoder.Decode(&data); err != nil && err != io.EOF { + logger.Println("Malformed json request: %v", err) + return fmt.Errorf("Malformed json request: %v", err) + } + return nil +} + +func encodeJsonResponse(w http.ResponseWriter, status int, data interface{}) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + + if data != nil { + encoder := json.NewEncoder(w) + encoder.Encode(data) + } +} + +//-------------------------------------- +// Log +//-------------------------------------- + +func debug(msg string, v ...interface{}) { + if verbose { + logger.Printf("DEBUG " + msg + "\n", v...) + } +} + +func info(msg string, v ...interface{}) { + logger.Printf("INFO " + msg + "\n", v...) +} + +func warn(msg string, v ...interface{}) { + logger.Printf("WARN " + msg + "\n", v...) +} + +func fatal(msg string, v ...interface{}) { + logger.Printf("FATAL " + msg + "\n", v...) + os.Exit(1) +} + + + diff --git a/store.go b/store.go index 2686b1bb9..a973376fc 100644 --- a/store.go +++ b/store.go @@ -1,9 +1,9 @@ -package raftd +package main import ( "path" - "errors" "encoding/json" + "fmt" ) // CONSTANTS @@ -17,6 +17,12 @@ type Store struct { Nodes map[string]string `json:"nodes"` } +type Response struct { + OldValue string `json:oldvalue` + Exist bool `json:exist` +} + + // global store var s *Store @@ -32,8 +38,8 @@ func createStore() *Store{ } // set the key to value, return the old value if the key exists -func (s *Store) Set(key string, value string) (string, bool) { - +func (s *Store) Set(key string, value string) Response { + fmt.Println("Store SET") key = path.Clean(key) oldValue, ok := s.Nodes[key] @@ -41,30 +47,31 @@ func (s *Store) Set(key string, value string) (string, bool) { if ok { s.Nodes[key] = value w.notify(SET, key, oldValue, value) - return oldValue, true + return Response{oldValue, true} } else { s.Nodes[key] = value w.notify(SET, key, "", value) - return "", false + return Response{"", false} } } // get the value of the key -func (s *Store) Get(key string) (string, error) { +func (s *Store) Get(key string) Response { + fmt.Println("Stroe Get") key = path.Clean(key) value, ok := s.Nodes[key] if ok { - return value, nil + return Response{value, true} } else { - return "", errors.New("Key does not exist") + return Response{"", false} } } // delete the key, return the old value if the key exists -func (s *Store) Delete(key string) (string, error) { +func (s *Store) Delete(key string) Response { key = path.Clean(key) oldValue, ok := s.Nodes[key] @@ -74,9 +81,9 @@ func (s *Store) Delete(key string) (string, error) { w.notify(DELETE, key, oldValue, "") - return oldValue, nil + return Response{oldValue, true} } else { - return "", errors.New("Key does not exist") + return Response{"", false} } } diff --git a/store_test.go b/store_test.go index 934b80a6b..705b901f9 100644 --- a/store_test.go +++ b/store_test.go @@ -1,4 +1,4 @@ -package raftd +package main import ( "testing" diff --git a/trans_handler.go b/trans_handler.go new file mode 100644 index 000000000..bed91d950 --- /dev/null +++ b/trans_handler.go @@ -0,0 +1,69 @@ +package main + +import( + "encoding/json" + "github.com/benbjohnson/go-raft" + "bytes" + "net/http" + "fmt" + "io" +) + +type transHandler struct { + name string +} + +// Sends AppendEntries RPCs to a peer when the server is the leader. +func (t transHandler) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) (*raft.AppendEntriesResponse, error) { + var aersp *raft.AppendEntriesResponse + var b bytes.Buffer + json.NewEncoder(&b).Encode(req) + debug("[send] POST http://%s/log/append [%d]", peer.Name(), len(req.Entries)) + resp, err := http.Post(fmt.Sprintf("http://%s/log/append", peer.Name()), "application/json", &b) + if resp != nil { + defer resp.Body.Close() + aersp = &raft.AppendEntriesResponse{} + if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { + return aersp, nil + } + + } + return aersp, fmt.Errorf("raftd: Unable to append entries: %v", err) +} + +// Sends RequestVote RPCs to a peer when the server is the candidate. +func (t transHandler) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) (*raft.RequestVoteResponse, error) { + var rvrsp *raft.RequestVoteResponse + var b bytes.Buffer + json.NewEncoder(&b).Encode(req) + debug("[send] POST http://%s/vote", peer.Name()) + resp, err := http.Post(fmt.Sprintf("http://%s/vote", peer.Name()), "application/json", &b) + if resp != nil { + defer resp.Body.Close() + rvrsp := &raft.RequestVoteResponse{} + if err = json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF { + return rvrsp, nil + } + + } + return rvrsp, fmt.Errorf("raftd: Unable to request vote: %v", err) +} + +// Sends SnapshotRequest RPCs to a peer when the server is the candidate. +func (t transHandler) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) (*raft.SnapshotResponse, error) { + var aersp *raft.SnapshotResponse + var b bytes.Buffer + json.NewEncoder(&b).Encode(req) + debug("[send] POST http://%s/snapshot [%d %d]", peer.Name(), req.LastTerm, req.LastIndex) + resp, err := http.Post(fmt.Sprintf("http://%s/snapshot", peer.Name()), "application/json", &b) + if resp != nil { + defer resp.Body.Close() + aersp = &raft.SnapshotResponse{} + if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { + + return aersp, nil + } + } + fmt.Println("error send snapshot") + return aersp, fmt.Errorf("raftd: Unable to send snapshot: %v", err) +} \ No newline at end of file diff --git a/watcher.go b/watcher.go index d111f302d..168f1045c 100644 --- a/watcher.go +++ b/watcher.go @@ -1,4 +1,4 @@ -package raftd +package main import ( "path" diff --git a/watcher_test.go b/watcher_test.go index 108d63144..ab8003902 100644 --- a/watcher_test.go +++ b/watcher_test.go @@ -1,4 +1,4 @@ -package raftd +package main import ( "testing"