From 2c9c278e4d9a9cbc27ced845662bc3ff3521aa67 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 13 Sep 2013 17:10:40 -0400 Subject: [PATCH] refactor; add save and recover --- etcd_handlers.go | 22 ++-- file_system/event.go | 37 +++---- file_system/event_test.go | 8 +- file_system/file_system.go | 51 +++++++-- file_system/file_system_test.go | 180 ++++++++++++++++++++------------ file_system/node.go | 80 +++++++++----- file_system/watcher.go | 3 +- 7 files changed, 246 insertions(+), 135 deletions(-) diff --git a/etcd_handlers.go b/etcd_handlers.go index 7b8b0829e..6b5203ec4 100644 --- a/etcd_handlers.go +++ b/etcd_handlers.go @@ -176,18 +176,18 @@ func dispatch(c Command, w http.ResponseWriter, req *http.Request, etcd bool) er //-------------------------------------- // Handler to return the current leader's raft address -// func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error { -// leader := r.Leader() +func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error { + leader := r.Leader() -// if leader != "" { -// w.WriteHeader(http.StatusOK) -// raftURL, _ := nameToRaftURL(leader) -// w.Write([]byte(raftURL)) -// return nil -// } else { -// return etcdErr.NewError(etcdErr.EcodeLeaderElect, "") -// } -// } + if leader != "" { + w.WriteHeader(http.StatusOK) + raftURL, _ := nameToRaftURL(leader) + w.Write([]byte(raftURL)) + return nil + } else { + return etcdErr.NewError(etcdErr.EcodeLeaderElect, "") + } +} // Handler to return all the known machines in the current cluster func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error { diff --git a/file_system/event.go b/file_system/event.go index f3d92ebb7..f5b3eeb85 100644 --- a/file_system/event.go +++ b/file_system/event.go @@ -2,10 +2,11 @@ package fileSystem import ( "fmt" - etcdErr "github.com/coreos/etcd/error" "strings" "sync" "time" + + etcdErr "github.com/coreos/etcd/error" ) const ( @@ -61,26 +62,26 @@ func newEvent(action string, key string, index uint64, term uint64) *Event { } type eventQueue struct { - events []*Event - size int - front int - capacity int + Events []*Event + Size int + Front int + Capacity int } func (eq *eventQueue) back() int { - return (eq.front + eq.size - 1 + eq.capacity) % eq.capacity + return (eq.Front + eq.Size - 1 + eq.Capacity) % eq.Capacity } func (eq *eventQueue) insert(e *Event) { - index := (eq.back() + 1) % eq.capacity + index := (eq.back() + 1) % eq.Capacity - eq.events[index] = e + eq.Events[index] = e - if eq.size == eq.capacity { //dequeue - eq.front = (index + 1) % eq.capacity + if eq.Size == eq.Capacity { //dequeue + eq.Front = (index + 1) % eq.Capacity } else { - eq.size++ + eq.Size++ } } @@ -94,8 +95,8 @@ type EventHistory struct { func newEventHistory(capacity int) *EventHistory { return &EventHistory{ Queue: eventQueue{ - capacity: capacity, - events: make([]*Event, capacity), + Capacity: capacity, + Events: make([]*Event, capacity), }, } } @@ -107,7 +108,7 @@ func (eh *EventHistory) addEvent(e *Event) { eh.Queue.insert(e) - eh.StartIndex = eh.Queue.events[eh.Queue.front].Index + eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index } // scan function is enumerating events from the index in history and @@ -129,19 +130,19 @@ func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) { ) } - if start >= uint64(eh.Queue.size) { + if start >= uint64(eh.Queue.Size) { return nil, nil } - i := int((start + uint64(eh.Queue.front)) % uint64(eh.Queue.capacity)) + i := int((start + uint64(eh.Queue.Front)) % uint64(eh.Queue.Capacity)) for { - e := eh.Queue.events[i] + e := eh.Queue.Events[i] if strings.HasPrefix(e.Key, prefix) { return e, nil } - i = (i + 1) % eh.Queue.capacity + i = (i + 1) % eh.Queue.Capacity if i == eh.Queue.back() { // TODO: Add error type diff --git a/file_system/event_test.go b/file_system/event_test.go index fd0692428..146f8cd4d 100644 --- a/file_system/event_test.go +++ b/file_system/event_test.go @@ -19,15 +19,15 @@ func TestEventQueue(t *testing.T) { // Test j := 100 - i := eh.Queue.front - n := eh.Queue.size + i := eh.Queue.Front + n := eh.Queue.Size for ; n > 0; n-- { - e := eh.Queue.events[i] + e := eh.Queue.Events[i] if e.Index != uint64(j) { t.Fatalf("queue error!") } j++ - i = (i + 1) % eh.Queue.capacity + i = (i + 1) % eh.Queue.Capacity } diff --git a/file_system/file_system.go b/file_system/file_system.go index d45c8d1bc..5d11f8f20 100644 --- a/file_system/file_system.go +++ b/file_system/file_system.go @@ -1,6 +1,7 @@ package fileSystem import ( + "encoding/json" "fmt" "path" "sort" @@ -11,11 +12,10 @@ import ( ) type FileSystem struct { - Root *Node - EventHistory *EventHistory - WatcherHub *watcherHub - Index uint64 - Term uint64 + Root *Node + WatcherHub *watcherHub + Index uint64 + Term uint64 } func New() *FileSystem { @@ -126,7 +126,7 @@ func (fs *FileSystem) Create(nodePath string, value string, expireTime time.Time // Node with TTL if expireTime != Permanent { - go n.Expire() + n.Expire() e.Expiration = &n.ExpireTime e.TTL = int64(expireTime.Sub(time.Now()) / time.Second) } @@ -164,13 +164,13 @@ func (fs *FileSystem) Update(nodePath string, value string, expireTime time.Time } // update ttl - if n.ExpireTime != Permanent && expireTime != Permanent { + if !n.IsPermanent() && expireTime != Permanent { n.stopExpire <- true } - if expireTime != Permanent { + if expireTime.Sub(Permanent) != 0 { n.ExpireTime = expireTime - go n.Expire() + n.Expire() e.Expiration = &n.ExpireTime e.TTL = int64(expireTime.Sub(time.Now()) / time.Second) } @@ -298,7 +298,6 @@ func (fs *FileSystem) InternalGet(nodePath string, index uint64, term uint64) (* // If it does not exist, this function will create a new directory and return the pointer to that node. // If it is a file, this function will return error. func (fs *FileSystem) checkDir(parent *Node, dirName string) (*Node, error) { - subDir, ok := parent.Children[dirName] if ok { @@ -311,3 +310,35 @@ func (fs *FileSystem) checkDir(parent *Node, dirName string) (*Node, error) { return n, nil } + +// Save function saves the static state of the store system. +// Save function will not be able to save the state of watchers. +// Save function will not save the parent field of the node. Or there will +// be cyclic dependencies issue for the json package. +func (fs *FileSystem) Save() []byte { + cloneFs := New() + cloneFs.Root = fs.Root.Clone() + + b, err := json.Marshal(fs) + + if err != nil { + panic(err) + } + + return b +} + +// recovery function recovery the store system from a static state. +// It needs to recovery the parent field of the nodes. +// It needs to delete the expired nodes since the saved time and also +// need to create monitor go routines. +func (fs *FileSystem) Recover(state []byte) { + err := json.Unmarshal(state, fs) + + if err != nil { + panic(err) + } + + fs.Root.recoverAndclean() + +} diff --git a/file_system/file_system_test.go b/file_system/file_system_test.go index e887fd9da..4da413da9 100644 --- a/file_system/file_system_test.go +++ b/file_system/file_system_test.go @@ -311,7 +311,6 @@ func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ? t.Fatalf("[%v/%v] [%v/%v]", e.PrevValue, "car", e.Value, "bar") } - //e, err = fs.TestAndSet("/foo", ) } func TestWatch(t *testing.T) { @@ -377,6 +376,115 @@ func TestWatch(t *testing.T) { } +func TestSort(t *testing.T) { + fs := New() + + // simulating random creation + keys := GenKeys(80, 4) + + i := uint64(1) + for _, k := range keys { + _, err := fs.Create(k, "bar", Permanent, i, 1) + if err != nil { + panic(err) + } else { + i++ + } + } + + e, err := fs.Get("/foo", true, true, i, 1) + if err != nil { + t.Fatalf("get dir nodes failed [%s]", err.Error()) + } + + for i, k := range e.KVPairs[:len(e.KVPairs)-1] { + + if k.Key >= e.KVPairs[i+1].Key { + t.Fatalf("sort failed, [%s] should be placed after [%s]", k.Key, e.KVPairs[i+1].Key) + } + + if k.Dir { + recursiveTestSort(k, t) + } + + } + + if k := e.KVPairs[len(e.KVPairs)-1]; k.Dir { + recursiveTestSort(k, t) + } +} + +func TestSaveAndRecover(t *testing.T) { + fs := New() + + // simulating random creation + keys := GenKeys(8, 4) + + i := uint64(1) + for _, k := range keys { + _, err := fs.Create(k, "bar", Permanent, i, 1) + if err != nil { + panic(err) + } else { + i++ + } + } + + // create a node with expiration + // test if we can reach the node before expiration + + expire := time.Now().Add(time.Second) + fs.Create("/foo/foo", "bar", expire, 1, 1) + + b := fs.Save() + + cloneFs := New() + + time.Sleep(time.Second) + + cloneFs.Recover(b) + + for i, k := range keys { + _, err := cloneFs.Get(k, false, false, uint64(i), 1) + if err != nil { + panic(err) + } + } + + if fs.WatcherHub.EventHistory.StartIndex != cloneFs.WatcherHub.EventHistory.StartIndex { + t.Fatal("Error recovered event history start index") + } + + for i = 0; int(i) < fs.WatcherHub.EventHistory.Queue.Size; i++ { + if fs.WatcherHub.EventHistory.Queue.Events[i].Key != + cloneFs.WatcherHub.EventHistory.Queue.Events[i].Key { + t.Fatal("Error recovered event history") + } + } + + _, err := fs.Get("/foo/foo", false, false, 1, 1) + + if err == nil || err.Error() != "Key Not Found" { + t.Fatalf("can get the node after deletion ") + } + +} + +// GenKeys randomly generate num of keys with max depth +func GenKeys(num int, depth int) []string { + keys := make([]string, num) + for i := 0; i < num; i++ { + + keys[i] = "/foo" + depth := rand.Intn(depth) + 1 + + for j := 0; j < depth; j++ { + keys[i] += "/" + strconv.Itoa(rand.Int()) + } + } + return keys +} + func createAndGet(fs *FileSystem, path string, t *testing.T) { _, err := fs.Create(path, "bar", Permanent, 1, 1) @@ -396,58 +504,8 @@ func createAndGet(fs *FileSystem, path string, t *testing.T) { } -func nonblockingRetrive(c <-chan *Event) *Event { - select { - case e := <-c: - return e - default: - return nil - } -} - -func TestSort(t *testing.T) { - fs := New() - - // simulating random creation - keys := GenKeys(80, 4) - - //t.Log(keys) - i := uint64(1) - for _, k := range keys { - _, err := fs.Create(k, "bar", Permanent, i, 1) - if err != nil { - //t.Logf("create node[%s] failed %s", k, err.Error()) - } else { - i++ - } - } - - e, err := fs.Get("/foo", true, true, i, 1) - if err != nil { - t.Fatalf("get dir nodes failed [%s]", err.Error()) - } - - for i, k := range e.KVPairs[:len(e.KVPairs)-1] { - //t.Log("root:") - //t.Log(k) - if k.Key >= e.KVPairs[i+1].Key { - t.Fatalf("sort failed, [%s] should be placed after [%s]", k.Key, e.KVPairs[i+1].Key) - } - - if k.Dir { - recursiveTestSort(k, t) - } - - } - - if k := e.KVPairs[len(e.KVPairs)-1]; k.Dir { - recursiveTestSort(k, t) - } -} - func recursiveTestSort(k KeyValuePair, t *testing.T) { - //t.Log("recursive in") - //t.Log(k) + for i, v := range k.KVPairs[:len(k.KVPairs)-1] { if v.Key >= k.KVPairs[i+1].Key { t.Fatalf("sort failed, [%s] should be placed after [%s]", v.Key, k.KVPairs[i+1].Key) @@ -464,17 +522,11 @@ func recursiveTestSort(k KeyValuePair, t *testing.T) { } } -// GenKeys randomly generate num of keys with max depth -func GenKeys(num int, depth int) []string { - keys := make([]string, num) - for i := 0; i < num; i++ { - - keys[i] = "/foo" - depth := rand.Intn(depth) + 1 - - for j := 0; j < depth; j++ { - keys[i] += "/" + strconv.Itoa(rand.Int()%20) - } +func nonblockingRetrive(c <-chan *Event) *Event { + select { + case e := <-c: + return e + default: + return nil } - return keys } diff --git a/file_system/node.go b/file_system/node.go index 406cc98d6..e4f42ef59 100644 --- a/file_system/node.go +++ b/file_system/node.go @@ -1,7 +1,6 @@ package fileSystem import ( - "fmt" "path" "sort" "sync" @@ -25,7 +24,7 @@ type Node struct { CreateTerm uint64 ModifiedIndex uint64 ModifiedTerm uint64 - Parent *Node + Parent *Node `json:"-"` ExpireTime time.Time ACL string Value string // for key-value pair @@ -237,62 +236,89 @@ func (n *Node) Clone() *Node { return clone } -// IsDir function checks whether the node is a directory. -// If the node is a directory, the function will return true. -// Otherwise the function will return false. -func (n *Node) IsDir() bool { - - if n.Children == nil { // key-value pair - return false +func (n *Node) recoverAndclean() { + if n.IsDir() { + for _, child := range n.Children { + child.Parent = n + child.recoverAndclean() + } } - return true + n.stopExpire = make(chan bool, 1) + + n.Expire() } func (n *Node) Expire() { - duration := n.ExpireTime.Sub(time.Now()) - if duration <= 0 { + expired, duration := n.IsExpired() + + if expired { // has been expired n.Remove(true, nil) return } - select { - // if timeout, delete the node - case <-time.After(duration): - n.Remove(true, nil) + if duration == 0 { // Permanent Node return - - // if stopped, return - case <-n.stopExpire: - fmt.Println("expire stopped") - return - } + + go func() { // do monitoring + select { + // if timeout, delete the node + case <-time.After(duration): + n.Remove(true, nil) + return + + // if stopped, return + case <-n.stopExpire: + return + + } + }() } // IsHidden function checks if the node is a hidden node. A hidden node // will begin with '_' - // A hidden node will not be shown via get command under a directory // For example if we have /foo/_hidden and /foo/notHidden, get "/foo" // will only return /foo/notHidden func (n *Node) IsHidden() bool { _, name := path.Split(n.Path) - if name[0] == '_' { //hidden - return true + return name[0] == '_' + +} + +func (n *Node) IsPermanent() bool { + return n.ExpireTime.Sub(Permanent) == 0 +} + +func (n *Node) IsExpired() (bool, time.Duration) { + if n.IsPermanent() { + return false, 0 } - return false + duration := n.ExpireTime.Sub(time.Now()) + if duration <= 0 { + return true, 0 + } + + return false, duration +} + +// IsDir function checks whether the node is a directory. +// If the node is a directory, the function will return true. +// Otherwise the function will return false. +func (n *Node) IsDir() bool { + return !(n.Children == nil) } func (n *Node) Pair(recurisive, sorted bool) KeyValuePair { - if n.IsDir() { pair := KeyValuePair{ Key: n.Path, Dir: true, } + if !recurisive { return pair } diff --git a/file_system/watcher.go b/file_system/watcher.go index c17d0eb87..728d11fd4 100644 --- a/file_system/watcher.go +++ b/file_system/watcher.go @@ -99,7 +99,6 @@ func (wh *watcherHub) notifyWithPath(e *Event, path string, force bool) { } func (wh *watcherHub) notify(e *Event) { - segments := strings.Split(e.Key, "/") currPath := "/" @@ -109,4 +108,6 @@ func (wh *watcherHub) notify(e *Event) { currPath = path.Join(currPath, segment) wh.notifyWithPath(e, currPath, false) } + + wh.EventHistory.addEvent(e) }