diff --git a/command.go b/command.go index ab0a83108..7a27362d4 100644 --- a/command.go +++ b/command.go @@ -10,7 +10,6 @@ import ( "encoding/json" "time" "github.com/xiangli-cmu/raft-etcd/store" - "github.com/xiangli-cmu/raft-etcd/web" ) // A command represents an action to be taken on the replicated state machine. @@ -38,12 +37,7 @@ func (c *SetCommand) CommandName() string { // Set the value of key to value func (c *SetCommand) Apply(server *raft.Server) ([]byte, error) { - res := store.Set(c.Key, c.Value, c.ExpireTime) - msg, err := json.Marshal(res) - if err == nil && web.HubOpen(){ - web.Hub().Send(string(msg)) - } - return msg, err + return store.Set(c.Key, c.Value, c.ExpireTime) } // Get the path for http request @@ -118,15 +112,7 @@ func (c *DeleteCommand) CommandName() string { // Delete the key func (c *DeleteCommand) Apply(server *raft.Server) ([]byte, error){ - res := store.Delete(c.Key) - - msg, err := json.Marshal(res) - - if err == nil && web.HubOpen(){ - web.Hub().Send(string(msg)) - } - - return msg, err + return store.Delete(c.Key) } func (c *DeleteCommand) GeneratePath() string{ diff --git a/raftd.go b/raftd.go index 8ef519996..a6cdbab5f 100644 --- a/raftd.go +++ b/raftd.go @@ -28,6 +28,9 @@ var verbose bool var leaderHost string var address string var webPort int +var cert string +var key string +var CAFile string func init() { flag.BoolVar(&verbose, "v", false, "verbose logging") @@ -63,6 +66,8 @@ type Info struct { var server *raft.Server var logger *log.Logger +var storeMsg chan string + //------------------------------------------------------------------------------ // // Functions @@ -106,7 +111,9 @@ func main() { // Setup new raft server. s := store.GetStore() + server, err = raft.NewServer(name, path, t, s, nil) + if err != nil { fatal("%v", err) } @@ -168,9 +175,10 @@ func main() { if webPort != -1 { // start web - + s.SetMessager(&storeMsg) + go webHelper() go web.Start(server, webPort) - } + } // listen on http port log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.Port), nil)) @@ -255,6 +263,16 @@ func Join(s *raft.Server, serverName string) error { } return fmt.Errorf("raftd: Unable to join: %v", err) } +//-------------------------------------- +// Web Helper +//-------------------------------------- + +func webHelper() { + storeMsg = make(chan string) + for { + web.Hub().Send(<-storeMsg) + } +} //-------------------------------------- diff --git a/store/store.go b/store/store.go index f2f31aba1..2e007ebe2 100644 --- a/store/store.go +++ b/store/store.go @@ -17,6 +17,7 @@ const ( type Store struct { Nodes map[string]Node `json:"nodes"` + messager *chan string } type Node struct { @@ -40,6 +41,7 @@ var s *Store func init() { s = createStore() + s.messager = nil } // make a new stroe @@ -53,8 +55,12 @@ func GetStore() *Store { return s } +func (s *Store)SetMessager(messager *chan string) { + s.messager = messager +} + // set the key to value, return the old value if the key exists -func Set(key string, value string, expireTime time.Time) Response { +func Set(key string, value string, expireTime time.Time) ([]byte, error) { key = path.Clean(key) @@ -75,12 +81,12 @@ func Set(key string, value string, expireTime time.Time) Response { //update := make(chan time.Time) //s.Nodes[key] = Node{value, expireTime, update} - node.ExpireTime = expireTime - node.Value = value - notify(SET, key, node.Value, value, true) + + // if node is not permanent before // update its expireTime if !node.ExpireTime.Equal(time.Unix(0,0)) { + node.update <- expireTime } else { @@ -94,21 +100,44 @@ func Set(key string, value string, expireTime time.Time) Response { } } - return Response{SET, key, node.Value, value, true, expireTime} + node.ExpireTime = expireTime + + node.Value = value + notify(SET, key, node.Value, value, true) + + msg, err := json.Marshal(Response{SET, key, node.Value, value, true, expireTime}) + + // notify the web interface + if (s.messager != nil && err == nil) { + + *s.messager <- string(msg) + } + + return msg, err } else { + // add new node update := make(chan time.Time) s.Nodes[key] = Node{value, expireTime, update} + // nofity the watcher notify(SET, key, "", value, false) if isExpire { go expire(key, update, expireTime) } - - return Response{SET, key, "", value, false, time.Unix(0, 0)} + + msg, err := json.Marshal(Response{SET, key, "", value, false, expireTime}) + + // notify the web interface + if (s.messager != nil && err == nil) { + + *s.messager <- string(msg) + } + + return msg, err } } @@ -148,7 +177,7 @@ func Get(key string) Response { } // delete the key, return the old value if the key exists -func Delete(key string) Response { +func Delete(key string) ([]byte, error) { key = path.Clean(key) node, ok := s.Nodes[key] @@ -158,9 +187,20 @@ func Delete(key string) Response { notify(DELETE, key, node.Value, "", true) - return Response{DELETE, key, node.Value, "", true, node.ExpireTime} + msg, err := json.Marshal(Response{DELETE, key, node.Value, "", true, node.ExpireTime}) + + // notify the web interface + if (s.messager != nil && err == nil) { + + *s.messager <- string(msg) + } + + return msg, err + } else { - return Response{DELETE, key, "", "", false, time.Unix(0, 0)} + // no notify to the watcher and web interface + + return json.Marshal(Response{DELETE, key, "", "", false, time.Unix(0, 0)}) } } diff --git a/web/home.html b/web/home.html deleted file mode 100644 index 4a83f24af..000000000 --- a/web/home.html +++ /dev/null @@ -1,87 +0,0 @@ - -
-