diff --git a/command.go b/command.go index cd259ef10..a09199c77 100644 --- a/command.go +++ b/command.go @@ -8,7 +8,7 @@ package main import ( "encoding/json" - "github.com/benbjohnson/go-raft" + "github.com/xiangli-cmu/go-raft" "github.com/xiangli-cmu/raft-etcd/store" "time" ) @@ -16,7 +16,7 @@ import ( // A command represents an action to be taken on the replicated state machine. type Command interface { CommandName() string - Apply(server *raft.Server) ([]byte, error) + Apply(server *raft.Server) (interface {}, error) } // Set command @@ -32,7 +32,7 @@ func (c *SetCommand) CommandName() string { } // Set the value of key to value -func (c *SetCommand) Apply(server *raft.Server) ([]byte, error) { +func (c *SetCommand) Apply(server *raft.Server) (interface {}, error) { return store.Set(c.Key, c.Value, c.ExpireTime) } @@ -52,7 +52,7 @@ func (c *GetCommand) CommandName() string { } // Set the value of key to value -func (c *GetCommand) Apply(server *raft.Server) ([]byte, error) { +func (c *GetCommand) Apply(server *raft.Server) (interface {}, error) { res := store.Get(c.Key) return json.Marshal(res) } @@ -72,7 +72,7 @@ func (c *DeleteCommand) CommandName() string { } // Delete the key -func (c *DeleteCommand) Apply(server *raft.Server) ([]byte, error) { +func (c *DeleteCommand) Apply(server *raft.Server) (interface {}, error) { return store.Delete(c.Key) } @@ -86,7 +86,7 @@ func (c *WatchCommand) CommandName() string { return "watch" } -func (c *WatchCommand) Apply(server *raft.Server) ([]byte, error) { +func (c *WatchCommand) Apply(server *raft.Server) (interface {}, error) { ch := make(chan store.Response) // add to the watchers list @@ -107,7 +107,7 @@ func (c *JoinCommand) CommandName() string { return "join" } -func (c *JoinCommand) Apply(server *raft.Server) ([]byte, error) { +func (c *JoinCommand) Apply(server *raft.Server) (interface {}, error) { err := server.AddPeer(c.Name) // no result will be returned return nil, err diff --git a/handlers.go b/handlers.go index fa846b504..fc0b88735 100644 --- a/handlers.go +++ b/handlers.go @@ -2,7 +2,7 @@ package main import ( "encoding/json" - "github.com/benbjohnson/go-raft" + "github.com/xiangli-cmu/go-raft" "net/http" //"fmt" "io/ioutil" @@ -134,11 +134,21 @@ func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) { func excute(c Command, w *http.ResponseWriter) { if server.State() == "leader" { if body, err := server.Do(c); err != nil { - warn("raftd: Unable to write file: %v", err) + warn("Commit failed %v", err) (*w).WriteHeader(http.StatusInternalServerError) return } else { (*w).WriteHeader(http.StatusOK) + + if body == nil { + return + } + + body, ok := body.([]byte) + if !ok { + panic ("wrong type") + } + (*w).Write(body) return } @@ -174,6 +184,12 @@ func GetHttpHandler(w http.ResponseWriter, req *http.Request) { return } else { w.WriteHeader(http.StatusOK) + + body, ok := body.([]byte) + if !ok { + panic ("wrong type") + } + w.Write(body) return } @@ -194,6 +210,12 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) { return } else { w.WriteHeader(http.StatusOK) + + body, ok := body.([]byte) + if !ok { + panic ("wrong type") + } + w.Write(body) return } diff --git a/raftd.go b/raftd.go index 6778acc69..860dd48f1 100644 --- a/raftd.go +++ b/raftd.go @@ -8,7 +8,7 @@ import ( "encoding/pem" "flag" "fmt" - "github.com/benbjohnson/go-raft" + "github.com/xiangli-cmu/go-raft" "github.com/xiangli-cmu/raft-etcd/store" "github.com/xiangli-cmu/raft-etcd/web" "io" @@ -53,8 +53,8 @@ const ( ) const ( - ELECTIONTIMTOUT = 3 * time.Second - HEARTBEATTIMEOUT = 1 * time.Second + ELECTIONTIMTOUT = 200 * time.Millisecond + HEARTBEATTIMEOUT = 50 * time.Millisecond ) //------------------------------------------------------------------------------ @@ -149,7 +149,6 @@ func main() { // start as a leader in a new cluster if leaderHost == "" { - server.StartHeartbeatTimeout() server.StartLeader() // join self as a peer @@ -160,7 +159,6 @@ func main() { // start as a fellower in a existing cluster } else { - server.StartElectionTimeout() server.StartFollower() err := Join(server, leaderHost) @@ -172,7 +170,6 @@ func main() { // rejoin the previous cluster } else { - server.StartElectionTimeout() server.StartFollower() debug("%s start as a follower", server.Name()) } @@ -248,6 +245,7 @@ func startTransport(port int, st int) { switch st { case HTTP: + debug("%s listen on http", server.Name()) log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil)) case HTTPS: diff --git a/trans_handler.go b/trans_handler.go index db23ab3fd..06cc98346 100644 --- a/trans_handler.go +++ b/trans_handler.go @@ -4,7 +4,7 @@ import ( "bytes" "encoding/json" "fmt" - "github.com/benbjohnson/go-raft" + "github.com/xiangli-cmu/go-raft" "io" "net/http" ) diff --git a/web/web.go b/web/web.go index 137e10ba2..1e4798fe0 100644 --- a/web/web.go +++ b/web/web.go @@ -3,7 +3,7 @@ package web import ( "code.google.com/p/go.net/websocket" "fmt" - "github.com/benbjohnson/go-raft" + "github.com/xiangli-cmu/go-raft" "github.com/xiangli-cmu/raft-etcd/store" "html/template" "net/http"