diff --git a/command.go b/command.go index 65311e43d..dd288f3bf 100644 --- a/command.go +++ b/command.go @@ -10,16 +10,15 @@ import ( "encoding/json" ) - // A command represents an action to be taken on the replicated state machine. type Command interface { CommandName() string Apply(server *raft.Server) ([]byte, error) - GeneratePath() string - Type() string + GeneratePath() string // Gererate a path for http request + Type() string // http request type GetValue() string GetKey() string - Sensitive() bool + Sensitive() bool // Sensitive to the stateMachine } // Set command @@ -39,10 +38,12 @@ func (c *SetCommand) Apply(server *raft.Server) ([]byte, error) { return json.Marshal(res) } +// Get the path for http request func (c *SetCommand) GeneratePath() string { return "set/" + c.Key } +// Get the type for http request func (c *SetCommand) Type() string { return "POST" } @@ -96,6 +97,7 @@ func (c *GetCommand) Sensitive() bool { return false } + // Delete command type DeleteCommand struct { Key string `json:"key"` @@ -146,8 +148,10 @@ func (c *WatchCommand) CommandName() string { func (c *WatchCommand) Apply(server *raft.Server) ([]byte, error){ ch := make(chan Response) + // add to the watchers list w.add(c.Key, ch) + // wait for the notification for any changing res := <- ch return json.Marshal(res) @@ -173,6 +177,7 @@ func (c *WatchCommand) Sensitive() bool { return false } + // JoinCommand type JoinCommand struct { Name string `json:"name"` @@ -184,5 +189,6 @@ func (c *JoinCommand) CommandName() string { func (c *JoinCommand) Apply(server *raft.Server) ([]byte, error) { err := server.AddPeer(c.Name) + // no result will be returned return nil, err } diff --git a/handlers.go b/handlers.go index c1aad8a77..3a8010e5d 100644 --- a/handlers.go +++ b/handlers.go @@ -9,12 +9,12 @@ import ( "bytes" ) + //-------------------------------------- // HTTP Handlers //-------------------------------------- - - +// Get all the current logs func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) { debug("[recv] GET http://%v/log", server.Name()) w.Header().Set("Content-Type", "application/json") @@ -33,6 +33,7 @@ func VoteHttpHandler(w http.ResponseWriter, req *http.Request) { return } } + warn("[vote] ERROR: %v", err) w.WriteHeader(http.StatusInternalServerError) } @@ -41,19 +42,16 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) { err := decodeJsonRequest(req, aereq) if err == nil { debug("[recv] POST http://%s/log/append [%d]", server.Name(), len(aereq.Entries)) - debug("My role is %s", server.State()) if resp, _ := server.AppendEntries(aereq); resp != nil { - debug("write back success") w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp) if !resp.Success { - fmt.Println("append error") + debug("[Append Entry] Step back") } return } } warn("[append] ERROR: %v", err) - debug("write back") w.WriteHeader(http.StatusInternalServerError) } @@ -96,6 +94,7 @@ func SetHttpHandler(w http.ResponseWriter, req *http.Request) { debug("[recv] POST http://%v/set/%s", server.Name(), key) content, err := ioutil.ReadAll(req.Body) + if err != nil { warn("raftd: Unable to read: %v", err) w.WriteHeader(http.StatusInternalServerError) @@ -107,7 +106,6 @@ func SetHttpHandler(w http.ResponseWriter, req *http.Request) { command.Value = string(content) Dispatch(server, command, w) - } func GetHttpHandler(w http.ResponseWriter, req *http.Request) { @@ -131,7 +129,6 @@ func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) { command.Key = key Dispatch(server, command, w) - } @@ -151,91 +148,94 @@ func Dispatch(server *raft.Server, command Command, w http.ResponseWriter) { var body []byte var err error + debug("Dispatch command") - fmt.Println("dispatch") - // unlikely to fail twice - for { - // i am the leader, i will take care of the command - if server.State() == "leader" { - if command.Sensitive() { - if body, err = server.Do(command); err != nil { - warn("raftd: Unable to write file: %v", err) - w.WriteHeader(http.StatusInternalServerError) - return - } else { - // good to go - w.WriteHeader(http.StatusOK) - w.Write(body) - return - } - } else { - fmt.Println("non-sensitive") - if body, err = command.Apply(server); err != nil { - warn("raftd: Unable to write file: %v", err) - w.WriteHeader(http.StatusInternalServerError) - return - } else { - // good to go - w.WriteHeader(http.StatusOK) - w.Write(body) - return - } - } - - // redirect the command to the current leader - } else { - leaderName := server.Leader() - - if leaderName =="" { - // no luckey, during the voting process + // i am the leader, i will take care of the command + if server.State() == "leader" { + // if the command will change the state of the state machine + // the command need to append to the log entry + if command.Sensitive() { + if body, err = server.Do(command); err != nil { + warn("raftd: Unable to write file: %v", err) w.WriteHeader(http.StatusInternalServerError) return - } - - fmt.Println("forward to ", leaderName) - - path := command.GeneratePath() - - if command.Type() == "POST" { - debug("[send] POST http://%v/%s", leaderName, path) - - reader := bytes.NewReader([]byte(command.GetValue())) - - reps, _ := http.Post(fmt.Sprintf("http://%v/%s", - leaderName, command.GeneratePath()), "application/json", reader) - - body, _ := ioutil.ReadAll(reps.Body) - fmt.Println(body) + } else { // good to go w.WriteHeader(http.StatusOK) - w.Write(body) + return + } + // for non-sentitive command, directly apply it + } else { + if body, err = command.Apply(server); err != nil { + warn("raftd: Unable to write file: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } else { + w.WriteHeader(http.StatusOK) + w.Write(body) + return + } + } + + // redirect the command to the current leader + } else { + leaderName := server.Leader() + + if leaderName =="" { + // no luckey, during the voting process + // the client need to catch the error and try again + w.WriteHeader(http.StatusInternalServerError) + return + } + + debug("forward command to %s", leaderName) + + path := command.GeneratePath() + + if command.Type() == "POST" { + debug("[send] POST http://%v/%s", leaderName, path) + + reader := bytes.NewReader([]byte(command.GetValue())) + + reps, _ := http.Post(fmt.Sprintf("http://%v/%s", + leaderName, command.GeneratePath()), "application/json", reader) + + if reps == nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + // forwarding + w.WriteHeader(reps.StatusCode) + + body, _ := ioutil.ReadAll(reps.Body) + + w.Write(body) + return } else if command.Type() == "GET" { debug("[send] GET http://%v/%s", leaderName, path) reps, _ := http.Get(fmt.Sprintf("http://%v/%s", leaderName, command.GeneratePath())) - // good to go - body, _ := ioutil.ReadAll(reps.Body) - fmt.Println(body) - w.WriteHeader(http.StatusOK) - + + if reps == nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + // forwarding + body, _ := ioutil.ReadAll(reps.Body) + + w.WriteHeader(reps.StatusCode) + w.Write(body) } else { - //unsupported type - } - - if err != nil { - // should check other errors - continue - } else { - //good to go - return + //unsupported type } } - } } diff --git a/raftd.go b/raftd.go index c03018599..6ac5d414e 100644 --- a/raftd.go +++ b/raftd.go @@ -13,6 +13,7 @@ import ( "strings" "os" "time" + "strconv" ) //------------------------------------------------------------------------------ @@ -22,12 +23,22 @@ import ( //------------------------------------------------------------------------------ var verbose bool +var leaderHost string +var address string func init() { flag.BoolVar(&verbose, "v", false, "verbose logging") - flag.BoolVar(&verbose, "verbose", false, "verbose logging") + flag.StringVar(&leaderHost, "c", "", "join to a existing cluster") + flag.StringVar(&address, "a", "", "the address of the local machine") } +const ( + ELECTIONTIMTOUT = 3 * time.Second + HEARTBEATTIMEOUT = 1 * time.Second +) + + + //------------------------------------------------------------------------------ // // Typedefs @@ -62,9 +73,6 @@ func main() { var err error logger = log.New(os.Stdout, "", log.LstdFlags) flag.Parse() - if verbose { - fmt.Println("Verbose logging enabled.\n") - } // Setup commands. raft.RegisterCommand(&JoinCommand{}) @@ -85,65 +93,54 @@ func main() { // Read server info from file or grab it from user. var info *Info = getInfo(path) + name := fmt.Sprintf("%s:%d", info.Host, info.Port) + fmt.Printf("Name: %s\n\n", name) t := transHandler{} // Setup new raft server. server, err = raft.NewServer(name, path, t, s, nil) - //server.DoHandler = DoHandler; if err != nil { fatal("%v", err) } server.LoadSnapshot() server.Initialize() - fmt.Println("1 join as ", server.State(), " term ", server.Term()) - // Join to another server if we don't have a log. + server.SetElectionTimeout(ELECTIONTIMTOUT) + server.SetHeartbeatTimeout(HEARTBEATTIMEOUT) + if server.IsLogEmpty() { - var leaderHost string - fmt.Println("2 join as ", server.State(), " term ", server.Term()) - fmt.Println("This server has no log. Please enter a server in the cluster to join\nto or hit enter to initialize a cluster."); - fmt.Printf("Join to (host:port)> "); - fmt.Scanf("%s", &leaderHost) - fmt.Println("3 join as ", server.State(), " term ", server.Term()) + + // start as a leader in a new cluster if leaderHost == "" { - fmt.Println("init") - //server.SetElectionTimeout(300 * time.Millisecond) - //server.SetHeartbeatTimeout(100 * time.Millisecond) - server.SetElectionTimeout(3 * time.Second) - server.SetHeartbeatTimeout(1 * time.Second) server.StartHeartbeatTimeout() server.StartLeader() - // join self + + // join self as a peer command := &JoinCommand{} command.Name = server.Name() - server.Do(command) + + // start as a fellower in a existing cluster } else { - //server.SetElectionTimeout(300 * time.Millisecond) - //server.SetHeartbeatTimeout(100 * time.Millisecond) - server.SetElectionTimeout(3 * time.Second) - server.SetHeartbeatTimeout(1 * time.Second) server.StartElectionTimeout() server.StartFollower() - fmt.Println("4 join as ", server.State(), " term ", server.Term()) Join(server, leaderHost) fmt.Println("success join") } + + // rejoin the previous cluster } else { - //server.SetElectionTimeout(300 * time.Millisecond) - //server.SetHeartbeatTimeout(100 * time.Millisecond) - server.SetElectionTimeout(3 * time.Second) - server.SetHeartbeatTimeout(1 * time.Second) server.StartElectionTimeout() server.StartFollower() } + + // open the snapshot go server.Snapshot() - // open snapshot - //go server.Snapshot() + // internal commands http.HandleFunc("/join", JoinHttpHandler) @@ -158,6 +155,7 @@ func main() { http.HandleFunc("/delete/", DeleteHttpHandler) http.HandleFunc("/watch/", WatchHttpHandler) + // listen on http port log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.Port), nil)) } @@ -186,17 +184,24 @@ func getInfo(path string) *Info { // Otherwise ask user for info and write it to file. } else { - fmt.Printf("Enter hostname: [localhost] "); - fmt.Scanf("%s", &info.Host) - info.Host = strings.TrimSpace(info.Host) - if info.Host == "" { - info.Host = "localhost" + + if address == "" { + fatal("Please give the address of the local machine") } - fmt.Printf("Enter port: [4001] "); - fmt.Scanf("%d", &info.Port) - if info.Port == 0 { - info.Port = 4001 + input := strings.Split(address, ":") + + if len(input) != 2 { + fatal("Wrong address %s", address) + } + + info.Host = input[0] + info.Host = strings.TrimSpace(info.Host) + + info.Port, err = strconv.Atoi(input[1]) + + if err != nil { + fatal("Wrong port %s", address) } // Write to file. @@ -218,6 +223,7 @@ func getInfo(path string) *Info { // Send join requests to the leader. func Join(s *raft.Server, serverName string) error { var b bytes.Buffer + command := &JoinCommand{} command.Name = s.Name() diff --git a/store.go b/store.go index 8e8a78230..d26554169 100644 --- a/store.go +++ b/store.go @@ -3,7 +3,6 @@ package main import ( "path" "encoding/json" - //"fmt" ) // CONSTANTS diff --git a/trans_handler.go b/trans_handler.go index bed91d950..1ee2e8f0e 100644 --- a/trans_handler.go +++ b/trans_handler.go @@ -64,6 +64,5 @@ func (t transHandler) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, return aersp, nil } } - fmt.Println("error send snapshot") return aersp, fmt.Errorf("raftd: Unable to send snapshot: %v", err) } \ No newline at end of file diff --git a/watcher.go b/watcher.go index efe90acfe..43682b696 100644 --- a/watcher.go +++ b/watcher.go @@ -3,7 +3,6 @@ package main import ( "path" "strings" - "fmt" ) @@ -30,7 +29,7 @@ func createWatcher() *Watcher { func (w *Watcher) add(prefix string, c chan Response) error { prefix = "/" + path.Clean(prefix) - fmt.Println("Add ", prefix) + debug("Add a watche at ", prefix) _, ok := w.chanMap[prefix] if !ok { @@ -40,15 +39,12 @@ func (w *Watcher) add(prefix string, c chan Response) error { w.chanMap[prefix] = append(w.chanMap[prefix], c) } - fmt.Println(len(w.chanMap[prefix]), "@", prefix) - return nil } // notify the watcher a action happened func (w *Watcher) notify(action int, key string, oldValue string, newValue string, exist bool) error { key = path.Clean(key) - fmt.Println("notify") segments := strings.Split(key, "/") currPath := "/" @@ -58,19 +54,18 @@ func (w *Watcher) notify(action int, key string, oldValue string, newValue strin currPath := path.Join(currPath, segment) - fmt.Println(currPath) - chans, ok := w.chanMap[currPath] if ok { - fmt.Println("found ", currPath) + debug("Notify at ", currPath) n := Response {action, key, oldValue, newValue, exist} + // notify all the watchers for _, c := range chans { c <- n } - + // we have notified all the watchers at this path // delete the map delete(w.chanMap, currPath)