From ef59a03fbbcbf57c23363cab3cda317e3b0364fe Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 20 Jun 2013 15:59:23 -0700 Subject: [PATCH] gofmt --- command.go | 27 ++++---- handlers.go | 29 +++------ raftd.go | 147 ++++++++++++++++++++---------------------- store/store.go | 60 +++++++++-------- store/store_test.go | 29 ++++----- store/watcher.go | 9 ++- store/watcher_test.go | 2 +- trans_handler.go | 29 +++++---- web/conn.go | 12 ++-- web/hub.go | 80 +++++++++++------------ web/web.go | 69 +++++++++----------- 11 files changed, 233 insertions(+), 260 deletions(-) diff --git a/command.go b/command.go index 0510f7563..e7369cf44 100644 --- a/command.go +++ b/command.go @@ -1,4 +1,5 @@ package main + //------------------------------------------------------------------------------ // // Commands @@ -6,11 +7,11 @@ package main //------------------------------------------------------------------------------ import ( + "encoding/json" "github.com/benbjohnson/go-raft" "github.com/xiangli-cmu/raft-etcd/store" - "encoding/json" "time" - ) +) // A command represents an action to be taken on the replicated state machine. type Command interface { @@ -20,8 +21,8 @@ type Command interface { // Set command type SetCommand struct { - Key string `json:"key"` - Value string `json:"value"` + Key string `json:"key"` + Value string `json:"value"` ExpireTime time.Time `json:"expireTime"` } @@ -40,7 +41,6 @@ func (c *SetCommand) GeneratePath() string { return "set/" + c.Key } - // Get command type GetCommand struct { Key string `json:"key"` @@ -52,12 +52,12 @@ func (c *GetCommand) CommandName() string { } // Set the value of key to value -func (c *GetCommand) Apply(server *raft.Server) ([]byte, error){ +func (c *GetCommand) Apply(server *raft.Server) ([]byte, error) { res := store.Get(c.Key) return json.Marshal(res) } -func (c *GetCommand) GeneratePath() string{ +func (c *GetCommand) GeneratePath() string { return "get/" + c.Key } @@ -71,8 +71,8 @@ func (c *DeleteCommand) CommandName() string { return "delete" } -// Delete the key -func (c *DeleteCommand) Apply(server *raft.Server) ([]byte, error){ +// Delete the key +func (c *DeleteCommand) Apply(server *raft.Server) ([]byte, error) { return store.Delete(c.Key) } @@ -86,14 +86,14 @@ func (c *WatchCommand) CommandName() string { return "watch" } -func (c *WatchCommand) Apply(server *raft.Server) ([]byte, error){ +func (c *WatchCommand) Apply(server *raft.Server) ([]byte, error) { ch := make(chan store.Response) // add to the watchers list - store.AddWatcher(c.Key, ch) + store.AddWatcher(c.Key, ch) // wait for the notification for any changing - res := <- ch + res := <-ch return json.Marshal(res) } @@ -112,6 +112,3 @@ func (c *JoinCommand) Apply(server *raft.Server) ([]byte, error) { // no result will be returned return nil, err } - - - diff --git a/handlers.go b/handlers.go index 48105c7ae..3e7073629 100644 --- a/handlers.go +++ b/handlers.go @@ -1,17 +1,16 @@ package main import ( + "encoding/json" "github.com/benbjohnson/go-raft" "net/http" - "encoding/json" //"fmt" "io/ioutil" //"bytes" - "time" - "strings" "strconv" - ) - + "strings" + "time" +) //-------------------------------------- // HTTP Handlers @@ -73,9 +72,8 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusInternalServerError) } - func JoinHttpHandler(w http.ResponseWriter, req *http.Request) { - + command := &JoinCommand{} if err := decodeJsonRequest(req, command); err == nil { @@ -87,7 +85,6 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) { } } - func SetHttpHandler(w http.ResponseWriter, req *http.Request) { key := req.URL.Path[len("/set/"):] @@ -96,7 +93,7 @@ func SetHttpHandler(w http.ResponseWriter, req *http.Request) { if err != nil { warn("raftd: Unable to read: %v", err) w.WriteHeader(http.StatusInternalServerError) - return + return } debug("[recv] POST http://%v/set/%s [%s]", server.Name(), key, content) @@ -112,11 +109,11 @@ func SetHttpHandler(w http.ResponseWriter, req *http.Request) { if err != nil { warn("raftd: Bad duration: %v", err) w.WriteHeader(http.StatusInternalServerError) - return + return } command.ExpireTime = time.Now().Add(time.Second * (time.Duration)(duration)) } else { - command.ExpireTime = time.Unix(0,0) + command.ExpireTime = time.Unix(0, 0) } excute(command, &w) @@ -134,7 +131,6 @@ func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) { excute(command, &w) } - func excute(c Command, w *http.ResponseWriter) { if server.State() == "leader" { if body, err := server.Do(c); err != nil { @@ -152,11 +148,11 @@ func excute(c Command, w *http.ResponseWriter) { (*w).Write([]byte(server.Leader())) return } - + (*w).WriteHeader(http.StatusInternalServerError) return -} +} func MasterHttpHandler(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusOK) @@ -202,8 +198,3 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) { } } - - - - - diff --git a/raftd.go b/raftd.go index 3bc01d183..ae9681268 100644 --- a/raftd.go +++ b/raftd.go @@ -2,23 +2,23 @@ package main import ( "bytes" + "crypto/tls" + "crypto/x509" "encoding/json" "encoding/pem" "flag" "fmt" "github.com/benbjohnson/go-raft" - "log" + "github.com/xiangli-cmu/raft-etcd/store" + "github.com/xiangli-cmu/raft-etcd/web" "io" "io/ioutil" + "log" "net/http" - "strings" "os" - "time" "strconv" - "crypto/tls" - "crypto/x509" - "github.com/xiangli-cmu/raft-etcd/web" - "github.com/xiangli-cmu/raft-etcd/store" + "strings" + "time" ) //------------------------------------------------------------------------------ @@ -27,7 +27,6 @@ import ( // //------------------------------------------------------------------------------ - var verbose bool var leaderHost string var address string @@ -45,15 +44,16 @@ func init() { flag.StringVar(&certFile, "cert", "", "the cert file of the server") flag.StringVar(&keyFile, "key", "", "the key file of the server") } + // CONSTANTS -const ( +const ( HTTP = iota HTTPS HTTPSANDVERIFY ) const ( - ELECTIONTIMTOUT = 3 * time.Second + ELECTIONTIMTOUT = 3 * time.Second HEARTBEATTIMEOUT = 1 * time.Second ) @@ -65,7 +65,7 @@ const ( type Info struct { Host string `json:"host"` - Port int `json:"port"` + Port int `json:"port"` } //------------------------------------------------------------------------------ @@ -79,7 +79,6 @@ var logger *log.Logger var storeMsg chan string - //------------------------------------------------------------------------------ // // Functions @@ -100,7 +99,7 @@ func main() { raft.RegisterCommand(&SetCommand{}) raft.RegisterCommand(&GetCommand{}) raft.RegisterCommand(&DeleteCommand{}) - + // Use the present working directory if a directory was not passed in. var path string if flag.NArg() == 0 { @@ -118,7 +117,7 @@ func main() { name := fmt.Sprintf("%s:%d", info.Host, info.Port) fmt.Printf("Name: %s\n\n", name) - + // secrity type st := securityType() @@ -126,7 +125,7 @@ func main() { panic("ERROR type") } - t := createTranHandler(st) + t := createTranHandler(st) // Setup new raft server. s := store.GetStore() @@ -159,7 +158,7 @@ func main() { server.Do(command) debug("%s start as a leader", server.Name()) - // start as a fellower in a existing cluster + // start as a fellower in a existing cluster } else { server.StartElectionTimeout() server.StartFollower() @@ -171,7 +170,7 @@ func main() { fmt.Println("success join") } - // rejoin the previous cluster + // rejoin the previous cluster } else { server.StartElectionTimeout() server.StartFollower() @@ -181,15 +180,14 @@ func main() { // open the snapshot go server.Snapshot() + if webPort != -1 { + // start web + s.SetMessager(&storeMsg) + go webHelper() + go web.Start(server, webPort) + } - if webPort != -1 { - // start web - s.SetMessager(&storeMsg) - go webHelper() - go web.Start(server, webPort) - } - - startTransport(info.Port, st) + startTransport(info.Port, st) } @@ -216,12 +214,12 @@ func createTranHandler(st int) transHandler { } tr := &http.Transport{ - TLSClientConfig: &tls.Config{ - Certificates: []tls.Certificate{tlsCert}, + TLSClientConfig: &tls.Config{ + Certificates: []tls.Certificate{tlsCert}, InsecureSkipVerify: true, - }, - DisableCompression: true, - } + }, + DisableCompression: true, + } t.client = &http.Client{Transport: tr} return t @@ -231,36 +229,35 @@ func createTranHandler(st int) transHandler { return transHandler{} } -func startTransport(port int, st int) { +func startTransport(port int, st int) { // internal commands - http.HandleFunc("/join", JoinHttpHandler) - http.HandleFunc("/vote", VoteHttpHandler) - http.HandleFunc("/log", GetLogHttpHandler) - http.HandleFunc("/log/append", AppendEntriesHttpHandler) - http.HandleFunc("/snapshot", SnapshotHttpHandler) + http.HandleFunc("/join", JoinHttpHandler) + http.HandleFunc("/vote", VoteHttpHandler) + http.HandleFunc("/log", GetLogHttpHandler) + http.HandleFunc("/log/append", AppendEntriesHttpHandler) + http.HandleFunc("/snapshot", SnapshotHttpHandler) - // external commands - http.HandleFunc("/set/", SetHttpHandler) - http.HandleFunc("/get/", GetHttpHandler) - http.HandleFunc("/delete/", DeleteHttpHandler) - http.HandleFunc("/watch/", WatchHttpHandler) - http.HandleFunc("/master", MasterHttpHandler) + // external commands + http.HandleFunc("/set/", SetHttpHandler) + http.HandleFunc("/get/", GetHttpHandler) + http.HandleFunc("/delete/", DeleteHttpHandler) + http.HandleFunc("/watch/", WatchHttpHandler) + http.HandleFunc("/master", MasterHttpHandler) - switch st { + switch st { - case HTTP: - log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil)) + case HTTP: + log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil)) - case HTTPS: - http.ListenAndServeTLS(fmt.Sprintf(":%d", port), certFile, keyFile, nil) + case HTTPS: + http.ListenAndServeTLS(fmt.Sprintf(":%d", port), certFile, keyFile, nil) - case HTTPSANDVERIFY: - pemByte, _ := ioutil.ReadFile(CAFile) + case HTTPSANDVERIFY: + pemByte, _ := ioutil.ReadFile(CAFile) block, pemByte := pem.Decode(pemByte) - cert, err := x509.ParseCertificate(block.Bytes) if err != nil { @@ -274,16 +271,16 @@ func startTransport(port int, st int) { server := &http.Server{ TLSConfig: &tls.Config{ ClientAuth: tls.RequireAndVerifyClientCert, - ClientCAs: certPool, - }, - Addr:fmt.Sprintf(":%d", port), + ClientCAs: certPool, + }, + Addr: fmt.Sprintf(":%d", port), } err = server.ListenAndServeTLS(certFile, keyFile) if err != nil { log.Fatal(err) } - } + } } @@ -291,8 +288,8 @@ func startTransport(port int, st int) { // Config //-------------------------------------- -func securityType() int{ - if keyFile == "" && certFile == "" && CAFile == ""{ +func securityType() int { + if keyFile == "" && certFile == "" && CAFile == "" { return HTTP @@ -310,7 +307,6 @@ func securityType() int{ return -1 } - func getInfo(path string) *Info { info := &Info{} @@ -325,10 +321,10 @@ func getInfo(path string) *Info { } } file.Close() - - // Otherwise ask user for info and write it to file. + + // Otherwise ask user for info and write it to file. } else { - + if address == "" { fatal("Please give the address of the local machine") } @@ -341,9 +337,9 @@ func getInfo(path string) *Info { info.Host = input[0] info.Host = strings.TrimSpace(info.Host) - + info.Port, err = strconv.Atoi(input[1]) - + if err != nil { fatal("Wrong port %s", address) } @@ -355,11 +351,10 @@ func getInfo(path string) *Info { fatal("Unable to write info to file: %v", err) } } - + return info } - //-------------------------------------- // Handlers //-------------------------------------- @@ -367,15 +362,14 @@ func getInfo(path string) *Info { // Send join requests to the leader. func Join(s *raft.Server, serverName string) error { var b bytes.Buffer - + command := &JoinCommand{} command.Name = s.Name() json.NewEncoder(&b).Encode(command) - // t must be ok - t,_ := server.Transporter().(transHandler) + t, _ := server.Transporter().(transHandler) debug("Send Join Request to %s", serverName) resp, err := Post(&t, fmt.Sprintf("%s/join", serverName), &b) @@ -399,6 +393,7 @@ func Join(s *raft.Server, serverName string) error { } return fmt.Errorf("Unable to join: %v", err) } + //-------------------------------------- // Web Helper //-------------------------------------- @@ -410,7 +405,6 @@ func webHelper() { } } - //-------------------------------------- // HTTP Utilities //-------------------------------------- @@ -434,13 +428,13 @@ func encodeJsonResponse(w http.ResponseWriter, status int, data interface{}) { } } -func Post(t *transHandler, path string, body io.Reader) (*http.Response, error){ +func Post(t *transHandler, path string, body io.Reader) (*http.Response, error) { if t.client != nil { - resp, err := t.client.Post("https://" + path, "application/json", body) + resp, err := t.client.Post("https://"+path, "application/json", body) return resp, err } else { - resp, err := http.Post("http://" + path, "application/json", body) + resp, err := http.Post("http://"+path, "application/json", body) return resp, err } } @@ -461,22 +455,19 @@ func Get(t *transHandler, path string) (*http.Response, error) { func debug(msg string, v ...interface{}) { if verbose { - logger.Printf("DEBUG " + msg + "\n", v...) + logger.Printf("DEBUG "+msg+"\n", v...) } } func info(msg string, v ...interface{}) { - logger.Printf("INFO " + msg + "\n", v...) + logger.Printf("INFO "+msg+"\n", v...) } func warn(msg string, v ...interface{}) { - logger.Printf("Alpaca Server: WARN " + msg + "\n", v...) + logger.Printf("Alpaca Server: WARN "+msg+"\n", v...) } func fatal(msg string, v ...interface{}) { - logger.Printf("FATAL " + msg + "\n", v...) + logger.Printf("FATAL "+msg+"\n", v...) os.Exit(1) } - - - diff --git a/store/store.go b/store/store.go index db0d26ce4..0bb1027d7 100644 --- a/store/store.go +++ b/store/store.go @@ -1,11 +1,11 @@ package store import ( - "path" "encoding/json" - "time" "fmt" - ) + "path" + "time" +) // global store var s *Store @@ -13,26 +13,24 @@ var s *Store // CONSTANTS const ( ERROR = -1 + iota - SET + SET DELETE GET ) - -var PERMANENT = time.Unix(0,0) +var PERMANENT = time.Unix(0, 0) type Store struct { // use the build-in hash map as the key-value store structure - Nodes map[string]Node `json:"nodes"` + Nodes map[string]Node `json:"nodes"` // the string channel to send messages to the outside world // now we use it to send changes to the hub of the web service messager *chan string } - type Node struct { - Value string `json:"value"` + Value string `json:"value"` // if the node is a permanent one the ExprieTime will be Unix(0,0) // Otherwise after the expireTime, the node will be deleted @@ -43,14 +41,14 @@ type Node struct { } type Response struct { - Action int `json:"action"` + Action int `json:"action"` Key string `json:"key"` OldValue string `json:"oldValue"` NewValue string `json:"newValue"` // if the key existed before the action, this field should be true // if the key did not exist before the action, this field should be false - Exist bool `json:"exist"` + Exist bool `json:"exist"` Expiration time.Time `json:"expiration"` } @@ -61,7 +59,7 @@ func init() { } // make a new stroe -func createStore() *Store{ +func createStore() *Store { s := new(Store) s.Nodes = make(map[string]Node) return s @@ -73,11 +71,11 @@ func GetStore() *Store { } // set the messager of the store -func (s *Store)SetMessager(messager *chan string) { +func (s *Store) SetMessager(messager *chan string) { s.messager = messager -} +} -// set the key to value, return the old value if the key exists +// set the key to value, return the old value if the key exists func Set(key string, value string, expireTime time.Time) ([]byte, error) { key = path.Clean(key) @@ -97,11 +95,11 @@ func Set(key string, value string, expireTime time.Time) ([]byte, error) { node, ok := s.Nodes[key] if ok { - // if node is not permanent before + // if node is not permanent before // update its expireTime if !node.ExpireTime.Equal(PERMANENT) { - node.update <- expireTime + node.update <- expireTime } else { // if we want the permanent node to have expire time @@ -115,7 +113,7 @@ func Set(key string, value string, expireTime time.Time) ([]byte, error) { // update the information of the node node.ExpireTime = expireTime node.Value = value - + resp := Response{SET, key, node.Value, value, true, expireTime} msg, err := json.Marshal(resp) @@ -123,14 +121,14 @@ func Set(key string, value string, expireTime time.Time) ([]byte, error) { notify(resp) // send to the messager - if (s.messager != nil && err == nil) { + if s.messager != nil && err == nil { *s.messager <- string(msg) - } + } return msg, err - // add new node + // add new node } else { update := make(chan time.Time) @@ -149,10 +147,10 @@ func Set(key string, value string, expireTime time.Time) ([]byte, error) { notify(resp) // notify the web interface - if (s.messager != nil && err == nil) { + if s.messager != nil && err == nil { *s.messager <- string(msg) - } + } return msg, err } @@ -180,10 +178,10 @@ func expire(key string, update chan time.Time, expireTime time.Time) { notify(resp) // notify the messager - if (s.messager != nil && err == nil) { + if s.messager != nil && err == nil { *s.messager <- string(msg) - } + } return @@ -191,7 +189,7 @@ func expire(key string, update chan time.Time, expireTime time.Time) { case updateTime := <-update: //update duration - // if the node become a permanent one, the go routine is + // if the node become a permanent one, the go routine is // not needed if updateTime.Equal(PERMANENT) { return @@ -242,10 +240,10 @@ func Delete(key string) ([]byte, error) { notify(resp) // notify the messager - if (s.messager != nil && err == nil) { + if s.messager != nil && err == nil { *s.messager <- string(msg) - } + } return msg, err @@ -256,7 +254,7 @@ func Delete(key string) ([]byte, error) { } // save the current state of the storage system -func (s *Store)Save() ([]byte, error) { +func (s *Store) Save() ([]byte, error) { b, err := json.Marshal(s) if err != nil { fmt.Println(err) @@ -266,7 +264,7 @@ func (s *Store)Save() ([]byte, error) { } // recovery the state of the stroage system from a previous state -func (s *Store)Recovery(state []byte) error { +func (s *Store) Recovery(state []byte) error { err := json.Unmarshal(state, s) // clean the expired nodes @@ -277,7 +275,7 @@ func (s *Store)Recovery(state []byte) error { // clean all expired keys func clean() { - for key, node := range s.Nodes{ + for key, node := range s.Nodes { if node.ExpireTime.Equal(PERMANENT) { continue diff --git a/store/store_test.go b/store/store_test.go index a51220ffd..2be7ec276 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -1,9 +1,9 @@ package store import ( + "fmt" "testing" "time" - "fmt" ) func TestStoreGet(t *testing.T) { @@ -53,7 +53,6 @@ func TestStoreGet(t *testing.T) { // t.Fatalf("Get expired value") // } - // s.Delete("foo") // } @@ -63,8 +62,8 @@ func TestExpire(t *testing.T) { fmt.Println("TEST EXPIRE") // test expire - Set("foo", "bar", time.Now().Add(time.Second * 1)) - time.Sleep(2*time.Second) + Set("foo", "bar", time.Now().Add(time.Second*1)) + time.Sleep(2 * time.Second) res := Get("foo") @@ -73,7 +72,7 @@ func TestExpire(t *testing.T) { } //test change expire time - Set("foo", "bar", time.Now().Add(time.Second * 10)) + Set("foo", "bar", time.Now().Add(time.Second*10)) res = Get("foo") @@ -81,7 +80,7 @@ func TestExpire(t *testing.T) { t.Fatalf("Cannot get Value") } - Set("foo", "barbar", time.Now().Add(time.Second * 1)) + Set("foo", "barbar", time.Now().Add(time.Second*1)) time.Sleep(2 * time.Second) @@ -91,13 +90,12 @@ func TestExpire(t *testing.T) { t.Fatalf("Got expired value") } - // test change expire to stable - Set("foo", "bar", time.Now().Add(time.Second * 1)) + Set("foo", "bar", time.Now().Add(time.Second*1)) - Set("foo", "bar", time.Unix(0,0)) + Set("foo", "bar", time.Unix(0, 0)) - time.Sleep(2*time.Second) + time.Sleep(2 * time.Second) res = s.Get("foo") @@ -105,22 +103,21 @@ func TestExpire(t *testing.T) { t.Fatalf("Cannot get Value") } - // test stable to expire - s.Set("foo", "bar", time.Now().Add(time.Second * 1)) - time.Sleep(2*time.Second) + // test stable to expire + s.Set("foo", "bar", time.Now().Add(time.Second*1)) + time.Sleep(2 * time.Second) res = s.Get("foo") if res.Exist { t.Fatalf("Got expired value") } - // test set older node - s.Set("foo", "bar", time.Now().Add(-time.Second * 1)) + // test set older node + s.Set("foo", "bar", time.Now().Add(-time.Second*1)) res = s.Get("foo") if res.Exist { t.Fatalf("Got expired value") } - } diff --git a/store/watcher.go b/store/watcher.go index 2a34b320b..4d5932962 100644 --- a/store/watcher.go +++ b/store/watcher.go @@ -3,9 +3,9 @@ package store import ( "path" "strings" - //"fmt" - ) +//"fmt" +) type Watchers struct { chanMap map[string][]chan Response @@ -14,7 +14,6 @@ type Watchers struct { // global watcher var w *Watchers - // init the global watcher func init() { w = createWatcher() @@ -66,7 +65,7 @@ func notify(resp Response) error { for _, c := range chans { c <- resp } - + // we have notified all the watchers at this path // delete the map delete(w.chanMap, currPath) @@ -75,4 +74,4 @@ func notify(resp Response) error { } return nil -} \ No newline at end of file +} diff --git a/store/watcher_test.go b/store/watcher_test.go index 7c6ecb56b..ad5e80283 100644 --- a/store/watcher_test.go +++ b/store/watcher_test.go @@ -1,8 +1,8 @@ package store import ( - "testing" "fmt" + "testing" "time" ) diff --git a/trans_handler.go b/trans_handler.go index 587f334a0..db23ab3fd 100644 --- a/trans_handler.go +++ b/trans_handler.go @@ -1,16 +1,16 @@ package main -import( - "encoding/json" - "github.com/benbjohnson/go-raft" +import ( "bytes" - "net/http" + "encoding/json" "fmt" + "github.com/benbjohnson/go-raft" "io" + "net/http" ) type transHandler struct { - name string + name string client *http.Client } @@ -19,7 +19,9 @@ func (t transHandler) SendAppendEntriesRequest(server *raft.Server, peer *raft.P var aersp *raft.AppendEntriesResponse var b bytes.Buffer json.NewEncoder(&b).Encode(req) - + + debug("Send LogEntries to %s ", peer.Name()) + resp, err := Post(&t, fmt.Sprintf("%s/log/append", peer.Name()), &b) if resp != nil { @@ -28,7 +30,7 @@ func (t transHandler) SendAppendEntriesRequest(server *raft.Server, peer *raft.P if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { return aersp, nil } - + } return aersp, fmt.Errorf("raftd: Unable to append entries: %v", err) } @@ -39,6 +41,8 @@ func (t transHandler) SendVoteRequest(server *raft.Server, peer *raft.Peer, req var b bytes.Buffer json.NewEncoder(&b).Encode(req) + debug("Send Vote to %s", peer.Name()) + resp, err := Post(&t, fmt.Sprintf("%s/vote", peer.Name()), &b) if resp != nil { @@ -47,9 +51,9 @@ func (t transHandler) SendVoteRequest(server *raft.Server, peer *raft.Peer, req if err = json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF { return rvrsp, nil } - + } - return rvrsp, fmt.Errorf("raftd: Unable to request vote: %v", err) + return rvrsp, fmt.Errorf("Unable to request vote: %v", err) } // Sends SnapshotRequest RPCs to a peer when the server is the candidate. @@ -58,7 +62,8 @@ func (t transHandler) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, var b bytes.Buffer json.NewEncoder(&b).Encode(req) - debug("[send] POST %s/snapshot [%d %d]", peer.Name(), req.LastTerm, req.LastIndex) + debug("Send Snapshot to %s [Last Term: %d, LastIndex %d]", peer.Name(), + req.LastTerm, req.LastIndex) resp, err := Post(&t, fmt.Sprintf("%s/snapshot", peer.Name()), &b) @@ -70,5 +75,5 @@ func (t transHandler) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, return aersp, nil } } - return aersp, fmt.Errorf("raftd: Unable to send snapshot: %v", err) -} \ No newline at end of file + return aersp, fmt.Errorf("Unable to send snapshot: %v", err) +} diff --git a/web/conn.go b/web/conn.go index ab0e6d7c6..25e871635 100644 --- a/web/conn.go +++ b/web/conn.go @@ -1,17 +1,17 @@ package web - + import ( "code.google.com/p/go.net/websocket" ) - + type connection struct { // The websocket connection. ws *websocket.Conn - + // Buffered channel of outbound messages. send chan string } - + func (c *connection) writer() { for message := range c.send { err := websocket.Message.Send(c.ws, message) @@ -21,10 +21,10 @@ func (c *connection) writer() { } c.ws.Close() } - + func wsHandler(ws *websocket.Conn) { c := &connection{send: make(chan string, 256), ws: ws} h.register <- c defer func() { h.unregister <- c }() c.writer() -} \ No newline at end of file +} diff --git a/web/hub.go b/web/hub.go index 3abcd0a9b..47f203f72 100644 --- a/web/hub.go +++ b/web/hub.go @@ -1,61 +1,61 @@ package web type hub struct { - // status - open bool + // status + open bool - // Registered connections. - connections map[*connection]bool + // Registered connections. + connections map[*connection]bool - // Inbound messages from the connections. - broadcast chan string + // Inbound messages from the connections. + broadcast chan string - // Register requests from the connections. - register chan *connection + // Register requests from the connections. + register chan *connection - // Unregister requests from connections. - unregister chan *connection + // Unregister requests from connections. + unregister chan *connection } var h = hub{ - open: false, - broadcast: make(chan string), - register: make(chan *connection), - unregister: make(chan *connection), - connections: make(map[*connection]bool), + open: false, + broadcast: make(chan string), + register: make(chan *connection), + unregister: make(chan *connection), + connections: make(map[*connection]bool), } -func Hub() *hub{ - return &h +func Hub() *hub { + return &h } func HubOpen() bool { - return h.open + return h.open } func (h *hub) run() { - h.open = true - for { - select { - case c := <-h.register: - h.connections[c] = true - case c := <-h.unregister: - delete(h.connections, c) - close(c.send) - case m := <-h.broadcast: - for c := range h.connections { - select { - case c.send <- m: - default: - delete(h.connections, c) - close(c.send) - go c.ws.Close() - } - } - } - } + h.open = true + for { + select { + case c := <-h.register: + h.connections[c] = true + case c := <-h.unregister: + delete(h.connections, c) + close(c.send) + case m := <-h.broadcast: + for c := range h.connections { + select { + case c.send <- m: + default: + delete(h.connections, c) + close(c.send) + go c.ws.Close() + } + } + } + } } func (h *hub) Send(msg string) { - h.broadcast <- msg -} \ No newline at end of file + h.broadcast <- msg +} diff --git a/web/web.go b/web/web.go index 73198f2b3..137e10ba2 100644 --- a/web/web.go +++ b/web/web.go @@ -1,42 +1,41 @@ package web import ( - "fmt" - "net/http" - "github.com/xiangli-cmu/raft-etcd/store" - "github.com/benbjohnson/go-raft" - "time" - "code.google.com/p/go.net/websocket" - "html/template" + "code.google.com/p/go.net/websocket" + "fmt" + "github.com/benbjohnson/go-raft" + "github.com/xiangli-cmu/raft-etcd/store" + "html/template" + "net/http" + "time" ) var s *raft.Server type MainPage struct { - Leader string - Address string + Leader string + Address string } func handler(w http.ResponseWriter, r *http.Request) { - fmt.Fprintf(w, "Leader:\n%s\n", s.Leader()) - fmt.Fprintf(w, "Peers:\n") + fmt.Fprintf(w, "Leader:\n%s\n", s.Leader()) + fmt.Fprintf(w, "Peers:\n") - for peerName, _ := range s.Peers() { - fmt.Fprintf(w, "%s\n", peerName) - } + for peerName, _ := range s.Peers() { + fmt.Fprintf(w, "%s\n", peerName) + } + fmt.Fprintf(w, "Data\n") - fmt.Fprintf(w, "Data\n") + s := store.GetStore() - s := store.GetStore() - - for key, node := range s.Nodes { - if node.ExpireTime.Equal(time.Unix(0,0)) { - fmt.Fprintf(w, "%s %s\n", key, node.Value) - } else { - fmt.Fprintf(w, "%s %s %s\n", key, node.Value, node.ExpireTime) - } - } + for key, node := range s.Nodes { + if node.ExpireTime.Equal(time.Unix(0, 0)) { + fmt.Fprintf(w, "%s %s\n", key, node.Value) + } else { + fmt.Fprintf(w, "%s %s %s\n", key, node.Value, node.ExpireTime) + } + } } @@ -44,24 +43,20 @@ var mainTempl = template.Must(template.ParseFiles("home.html")) func mainHandler(c http.ResponseWriter, req *http.Request) { - p := &MainPage{Leader: s.Leader(), - Address: s.Name(),} + p := &MainPage{Leader: s.Leader(), + Address: s.Name()} - mainTempl.Execute(c, p) + mainTempl.Execute(c, p) } - func Start(server *raft.Server, port int) { s = server - go h.run() - http.HandleFunc("/", mainHandler) - http.Handle("/ws", websocket.Handler(wsHandler)) + go h.run() + http.HandleFunc("/", mainHandler) + http.Handle("/ws", websocket.Handler(wsHandler)) - //http.HandleFunc("/", handler) - fmt.Println("web listening at port ", port) - http.ListenAndServe(fmt.Sprintf(":%v", port), nil) + //http.HandleFunc("/", handler) + fmt.Println("web listening at port ", port) + http.ListenAndServe(fmt.Sprintf(":%v", port), nil) } - - -