refactor event

This commit is contained in:
Xiang Li 2013-11-27 23:04:52 -05:00
parent 08c59895b5
commit b7d07ea5c8
13 changed files with 244 additions and 219 deletions

View File

@ -70,7 +70,7 @@ func (r *Registry) Count() int {
if err != nil {
return 0
}
return len(e.KVPairs)
return len(e.Node.Nodes)
}
// Retrieves the client URL for a given node by name.
@ -135,7 +135,7 @@ func (r *Registry) urls(leaderName, selfName string, url func(name string) (stri
// Retrieve a list of all nodes.
if e, _ := r.store.Get(RegistryKey, false, false); e != nil {
// Lookup the URL for each one.
for _, pair := range e.KVPairs {
for _, pair := range e.Node.Nodes {
_, name := filepath.Split(pair.Key)
if url, _ := url(name); len(url) > 0 && name != leaderName {
urls = append(urls, url)
@ -166,7 +166,7 @@ func (r *Registry) load(name string) {
}
// Parse as a query string.
m, err := url.ParseQuery(e.Value)
m, err := url.ParseQuery(e.Node.Value)
if err != nil {
panic(fmt.Sprintf("Failed to parse peers entry: %s", name))
}

View File

@ -24,6 +24,6 @@ func TestV2DeleteKey(t *testing.T) {
resp, err = tests.DeleteForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), url.Values{})
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"delete","key":"/foo/bar","prevValue":"XXX","modifiedIndex":2}`, "")
assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo/bar","modifiedIndex":2}}`, "")
})
}

View File

@ -25,9 +25,11 @@ func TestV2GetKey(t *testing.T) {
resp, _ = tests.Get(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"))
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "get", "")
assert.Equal(t, body["key"], "/foo/bar", "")
assert.Equal(t, body["value"], "XXX", "")
assert.Equal(t, body["modifiedIndex"], 1, "")
node := body["node"].(map[string]interface{})
assert.Equal(t, node["key"], "/foo/bar", "")
assert.Equal(t, node["value"], "XXX", "")
assert.Equal(t, node["modifiedIndex"], 1, "")
})
}
@ -52,23 +54,25 @@ func TestV2GetKeyRecursively(t *testing.T) {
resp, _ = tests.Get(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo?recursive=true"))
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "get", "")
assert.Equal(t, body["key"], "/foo", "")
assert.Equal(t, body["dir"], true, "")
assert.Equal(t, body["modifiedIndex"], 1, "")
assert.Equal(t, len(body["kvs"].([]interface{})), 2, "")
kv0 := body["kvs"].([]interface{})[0].(map[string]interface{})
assert.Equal(t, kv0["key"], "/foo/x", "")
assert.Equal(t, kv0["value"], "XXX", "")
assert.Equal(t, kv0["ttl"], 10, "")
node := body["node"].(map[string]interface{})
assert.Equal(t, node["key"], "/foo", "")
assert.Equal(t, node["dir"], true, "")
assert.Equal(t, node["modifiedIndex"], 1, "")
assert.Equal(t, len(node["nodes"].([]interface{})), 2, "")
kv1 := body["kvs"].([]interface{})[1].(map[string]interface{})
assert.Equal(t, kv1["key"], "/foo/y", "")
assert.Equal(t, kv1["dir"], true, "")
node0 := node["nodes"].([]interface{})[0].(map[string]interface{})
assert.Equal(t, node0["key"], "/foo/x", "")
assert.Equal(t, node0["value"], "XXX", "")
assert.Equal(t, node0["ttl"], 10, "")
kvs2 := kv1["kvs"].([]interface{})[0].(map[string]interface{})
assert.Equal(t, kvs2["key"], "/foo/y/z", "")
assert.Equal(t, kvs2["value"], "YYY", "")
node1 := node["nodes"].([]interface{})[1].(map[string]interface{})
assert.Equal(t, node1["key"], "/foo/y", "")
assert.Equal(t, node1["dir"], true, "")
node2 := node1["nodes"].([]interface{})[0].(map[string]interface{})
assert.Equal(t, node2["key"], "/foo/y/z", "")
assert.Equal(t, node2["value"], "YYY", "")
})
}
@ -109,9 +113,11 @@ func TestV2WatchKey(t *testing.T) {
assert.NotNil(t, body, "")
assert.Equal(t, body["action"], "set", "")
assert.Equal(t, body["key"], "/foo/bar", "")
assert.Equal(t, body["value"], "XXX", "")
assert.Equal(t, body["modifiedIndex"], 1, "")
node := body["node"].(map[string]interface{})
assert.Equal(t, node["key"], "/foo/bar", "")
assert.Equal(t, node["value"], "XXX", "")
assert.Equal(t, node["modifiedIndex"], 1, "")
})
}
@ -162,8 +168,10 @@ func TestV2WatchKeyWithIndex(t *testing.T) {
assert.NotNil(t, body, "")
assert.Equal(t, body["action"], "set", "")
assert.Equal(t, body["key"], "/foo/bar", "")
assert.Equal(t, body["value"], "YYY", "")
assert.Equal(t, body["modifiedIndex"], 2, "")
node := body["node"].(map[string]interface{})
assert.Equal(t, node["key"], "/foo/bar", "")
assert.Equal(t, node["value"], "YYY", "")
assert.Equal(t, node["modifiedIndex"], 2, "")
})
}

View File

@ -21,18 +21,22 @@ func TestV2CreateUnique(t *testing.T) {
resp, _ := tests.PostForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), nil)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "create", "")
assert.Equal(t, body["key"], "/foo/bar/1", "")
assert.Equal(t, body["dir"], true, "")
assert.Equal(t, body["modifiedIndex"], 1, "")
node := body["node"].(map[string]interface{})
assert.Equal(t, node["key"], "/foo/bar/1", "")
assert.Equal(t, node["dir"], true, "")
assert.Equal(t, node["modifiedIndex"], 1, "")
// Second POST should add next index to list.
resp, _ = tests.PostForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), nil)
body = tests.ReadBodyJSON(resp)
assert.Equal(t, body["key"], "/foo/bar/2", "")
node = body["node"].(map[string]interface{})
assert.Equal(t, node["key"], "/foo/bar/2", "")
// POST to a different key should add index to that list.
resp, _ = tests.PostForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/baz"), nil)
body = tests.ReadBodyJSON(resp)
assert.Equal(t, body["key"], "/foo/baz/3", "")
node = body["node"].(map[string]interface{})
assert.Equal(t, node["key"], "/foo/baz/3", "")
})
}

View File

@ -22,7 +22,7 @@ func TestV2SetKey(t *testing.T) {
resp, err := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
body := tests.ReadBody(resp)
assert.Nil(t, err, "")
assert.Equal(t, string(body), `{"action":"set","key":"/foo/bar","value":"XXX","modifiedIndex":1}`, "")
assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo/bar","value":"XXX","modifiedIndex":1}}`, "")
})
}
@ -38,10 +38,11 @@ func TestV2SetKeyWithTTL(t *testing.T) {
v.Set("ttl", "20")
resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["ttl"], 20, "")
node := body["node"].(map[string]interface{})
assert.Equal(t, node["ttl"], 20, "")
// Make sure the expiration date is correct.
expiration, _ := time.Parse(time.RFC3339Nano, body["expiration"].(string))
expiration, _ := time.Parse(time.RFC3339Nano, node["expiration"].(string))
assert.Equal(t, expiration.Sub(t0)/time.Second, 20, "")
})
}
@ -74,7 +75,8 @@ func TestV2CreateKeySuccess(t *testing.T) {
v.Set("prevExist", "false")
resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["value"], "XXX", "")
node := body["node"].(map[string]interface{})
assert.Equal(t, node["value"], "XXX", "")
})
}
@ -116,7 +118,9 @@ func TestV2UpdateKeySuccess(t *testing.T) {
resp, _ = tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "update", "")
assert.Equal(t, body["prevValue"], "XXX", "")
node := body["node"].(map[string]interface{})
assert.Equal(t, node["prevValue"], "XXX", "")
})
}
@ -173,9 +177,11 @@ func TestV2SetKeyCASOnIndexSuccess(t *testing.T) {
resp, _ = tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "compareAndSwap", "")
assert.Equal(t, body["prevValue"], "XXX", "")
assert.Equal(t, body["value"], "YYY", "")
assert.Equal(t, body["modifiedIndex"], 2, "")
node := body["node"].(map[string]interface{})
assert.Equal(t, node["prevValue"], "XXX", "")
assert.Equal(t, node["value"], "YYY", "")
assert.Equal(t, node["modifiedIndex"], 2, "")
})
}
@ -234,9 +240,11 @@ func TestV2SetKeyCASOnValueSuccess(t *testing.T) {
resp, _ = tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
body := tests.ReadBodyJSON(resp)
assert.Equal(t, body["action"], "compareAndSwap", "")
assert.Equal(t, body["prevValue"], "XXX", "")
assert.Equal(t, body["value"], "YYY", "")
assert.Equal(t, body["modifiedIndex"], 2, "")
node := body["node"].(map[string]interface{})
assert.Equal(t, node["prevValue"], "XXX", "")
assert.Equal(t, node["value"], "YYY", "")
assert.Equal(t, node["modifiedIndex"], 2, "")
})
}

View File

@ -1,9 +1,5 @@
package store
import (
"time"
)
const (
Get = "get"
Create = "create"
@ -15,22 +11,20 @@ const (
)
type Event struct {
Action string `json:"action"`
Key string `json:"key, omitempty"`
Dir bool `json:"dir,omitempty"`
PrevValue string `json:"prevValue,omitempty"`
Value string `json:"value,omitempty"`
KVPairs kvPairs `json:"kvs,omitempty"`
Expiration *time.Time `json:"expiration,omitempty"`
TTL int64 `json:"ttl,omitempty"` // Time to live in second
ModifiedIndex uint64 `json:"modifiedIndex"`
Action string `json:"action"`
Node *Node `json:"node,omitempty"`
}
func newEvent(action string, key string, index uint64) *Event {
return &Event{
Action: action,
func newEvent(action string, key string, modifiedIndex, createdIndex uint64) *Event {
n := &Node{
Key: key,
ModifiedIndex: index,
ModifiedIndex: modifiedIndex,
CreatedIndex: createdIndex,
}
return &Event{
Action: action,
Node: n,
}
}
@ -39,7 +33,7 @@ func (e *Event) IsCreated() bool {
return true
}
if e.Action == Set && e.PrevValue == "" {
if e.Action == Set && e.Node.PrevValue == "" {
return true
}
@ -47,20 +41,20 @@ func (e *Event) IsCreated() bool {
}
func (e *Event) Index() uint64 {
return e.ModifiedIndex
return e.Node.ModifiedIndex
}
// Converts an event object into a response object.
func (event *Event) Response() interface{} {
if !event.Dir {
if !event.Node.Dir {
response := &Response{
Action: event.Action,
Key: event.Key,
Value: event.Value,
PrevValue: event.PrevValue,
Index: event.ModifiedIndex,
TTL: event.TTL,
Expiration: event.Expiration,
Key: event.Node.Key,
Value: event.Node.Value,
PrevValue: event.Node.PrevValue,
Index: event.Node.ModifiedIndex,
TTL: event.Node.TTL,
Expiration: event.Node.Expiration,
}
if response.Action == Set {
@ -75,15 +69,15 @@ func (event *Event) Response() interface{} {
return response
} else {
responses := make([]*Response, len(event.KVPairs))
responses := make([]*Response, len(event.Node.Nodes))
for i, kv := range event.KVPairs {
for i, node := range event.Node.Nodes {
responses[i] = &Response{
Action: event.Action,
Key: kv.Key,
Value: kv.Value,
Dir: kv.Dir,
Index: event.ModifiedIndex,
Key: node.Key,
Value: node.Value,
Dir: node.Dir,
Index: node.ModifiedIndex,
}
}
return responses

View File

@ -33,7 +33,7 @@ func (eh *EventHistory) addEvent(e *Event) *Event {
eh.LastIndex = e.Index()
eh.StartIndex = eh.Queue.Events[eh.Queue.Front].ModifiedIndex
eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index()
return e
}
@ -62,7 +62,7 @@ func (eh *EventHistory) scan(prefix string, index uint64) (*Event, *etcdErr.Erro
for {
e := eh.Queue.Events[i]
if strings.HasPrefix(e.Key, prefix) && index <= e.Index() { // make sure we bypass the smaller one
if strings.HasPrefix(e.Node.Key, prefix) && index <= e.Index() { // make sure we bypass the smaller one
return e, nil
}

View File

@ -1,31 +0,0 @@
package store
import (
"time"
)
// When user list a directory, we add all the node into key-value pair slice
type KeyValuePair struct {
Key string `json:"key, omitempty"`
Value string `json:"value,omitempty"`
Dir bool `json:"dir,omitempty"`
Expiration *time.Time `json:"expiration,omitempty"`
TTL int64 `json:"ttl,omitempty"` // Time to live in second
KVPairs kvPairs `json:"kvs,omitempty"`
ModifiedIndex uint64 `json:"modifiedIndex,omitempty"`
}
type kvPairs []KeyValuePair
// interfaces for sorting
func (kvs kvPairs) Len() int {
return len(kvs)
}
func (kvs kvPairs) Less(i, j int) bool {
return kvs[i].Key < kvs[j].Key
}
func (kvs kvPairs) Swap(i, j int) {
kvs[i], kvs[j] = kvs[j], kvs[i]
}

View File

@ -16,7 +16,7 @@ var Permanent time.Time
type node struct {
Path string
CreateIndex uint64
CreatedIndex uint64
ModifiedIndex uint64
Parent *node `json:"-"` // should not encode this field! avoid circular dependency.
@ -31,13 +31,13 @@ type node struct {
}
// newKV creates a Key-Value pair
func newKV(store *store, nodePath string, value string, createIndex uint64,
func newKV(store *store, nodePath string, value string, createdIndex uint64,
parent *node, ACL string, expireTime time.Time) *node {
return &node{
Path: nodePath,
CreateIndex: createIndex,
ModifiedIndex: createIndex,
CreatedIndex: createdIndex,
ModifiedIndex: createdIndex,
Parent: parent,
ACL: ACL,
store: store,
@ -47,13 +47,13 @@ func newKV(store *store, nodePath string, value string, createIndex uint64,
}
// newDir creates a directory
func newDir(store *store, nodePath string, createIndex uint64, parent *node,
func newDir(store *store, nodePath string, createdIndex uint64, parent *node,
ACL string, expireTime time.Time) *node {
return &node{
Path: nodePath,
CreateIndex: createIndex,
ModifiedIndex: createIndex,
CreatedIndex: createdIndex,
ModifiedIndex: createdIndex,
Parent: parent,
ACL: ACL,
ExpireTime: expireTime,
@ -223,21 +223,21 @@ func (n *node) Remove(recursive bool, callback func(path string)) *etcdErr.Error
return nil
}
func (n *node) Pair(recurisive, sorted bool) KeyValuePair {
func (n *node) Repr(recurisive, sorted bool) Node {
if n.IsDir() {
pair := KeyValuePair{
node := Node{
Key: n.Path,
Dir: true,
ModifiedIndex: n.ModifiedIndex,
}
pair.Expiration, pair.TTL = n.ExpirationAndTTL()
node.Expiration, node.TTL = n.ExpirationAndTTL()
if !recurisive {
return pair
return node
}
children, _ := n.List()
pair.KVPairs = make([]KeyValuePair, len(children))
node.Nodes = make(Nodes, len(children))
// we do not use the index in the children slice directly
// we need to skip the hidden one
@ -249,27 +249,27 @@ func (n *node) Pair(recurisive, sorted bool) KeyValuePair {
continue
}
pair.KVPairs[i] = child.Pair(recurisive, sorted)
node.Nodes[i] = child.Repr(recurisive, sorted)
i++
}
// eliminate hidden nodes
pair.KVPairs = pair.KVPairs[:i]
node.Nodes = node.Nodes[:i]
if sorted {
sort.Sort(pair.KVPairs)
sort.Sort(node.Nodes)
}
return pair
return node
}
pair := KeyValuePair{
node := Node{
Key: n.Path,
Value: n.Value,
ModifiedIndex: n.ModifiedIndex,
}
pair.Expiration, pair.TTL = n.ExpirationAndTTL()
return pair
node.Expiration, node.TTL = n.ExpirationAndTTL()
return node
}
func (n *node) UpdateTTL(expireTime time.Time) {
@ -301,10 +301,10 @@ func (n *node) UpdateTTL(expireTime time.Time) {
// If the node is a key-value pair, it will clone the pair.
func (n *node) Clone() *node {
if !n.IsDir() {
return newKV(n.store, n.Path, n.Value, n.CreateIndex, n.Parent, n.ACL, n.ExpireTime)
return newKV(n.store, n.Path, n.Value, n.CreatedIndex, n.Parent, n.ACL, n.ExpireTime)
}
clone := newDir(n.store, n.Path, n.CreateIndex, n.Parent, n.ACL, n.ExpireTime)
clone := newDir(n.store, n.Path, n.CreatedIndex, n.Parent, n.ACL, n.ExpireTime)
for key, child := range n.Children {
clone.Children[key] = child.Clone()

35
store/node_repr.go Normal file
View File

@ -0,0 +1,35 @@
package store
import (
"time"
)
// Node is the representation of the internal node with additional fields
// PrevValue is the previous value of the node
// TTL is time to live in second
type Node struct {
Key string `json:"key, omitempty"`
PrevValue string `json:"prevValue,omitempty"`
Value string `json:"value,omitempty"`
Dir bool `json:"dir,omitempty"`
Expiration *time.Time `json:"expiration,omitempty"`
TTL int64 `json:"ttl,omitempty"`
Nodes Nodes `json:"nodes,omitempty"`
ModifiedIndex uint64 `json:"modifiedIndex,omitempty"`
CreatedIndex uint64 `json:"createdIndex,omitempty"`
}
type Nodes []Node
// interfaces for sorting
func (ns Nodes) Len() int {
return len(ns)
}
func (ns Nodes) Less(i, j int) bool {
return ns[i].Key < ns[j].Key
}
func (ns Nodes) Swap(i, j int) {
ns[i], ns[j] = ns[j], ns[i]
}

View File

@ -113,13 +113,14 @@ func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
return nil, err
}
e := newEvent(Get, nodePath, n.ModifiedIndex)
e := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
eNode := e.Node
if n.IsDir() { // node is a directory
e.Dir = true
eNode.Dir = true
children, _ := n.List()
e.KVPairs = make([]KeyValuePair, len(children))
eNode.Nodes = make(Nodes, len(children))
// we do not use the index in the children slice directly
// we need to skip the hidden one
@ -130,22 +131,22 @@ func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
continue
}
e.KVPairs[i] = child.Pair(recursive, sorted)
eNode.Nodes[i] = child.Repr(recursive, sorted)
i++
}
// eliminate hidden nodes
e.KVPairs = e.KVPairs[:i]
eNode.Nodes = eNode.Nodes[:i]
if sorted {
sort.Sort(e.KVPairs)
sort.Sort(eNode.Nodes)
}
} else { // node is a file
e.Value, _ = n.Read()
eNode.Value, _ = n.Read()
}
e.Expiration, e.TTL = n.ExpirationAndTTL()
eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
s.Stats.Inc(GetSuccess)
@ -214,15 +215,17 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
// update etcd index
s.CurrentIndex++
e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex)
e.PrevValue = n.Value
e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex, n.CreatedIndex)
eNode := e.Node
eNode.PrevValue = n.Value
// if test succeed, write the value
n.Write(value, s.CurrentIndex)
n.UpdateTTL(expireTime)
e.Value = value
e.Expiration, e.TTL = n.ExpirationAndTTL()
eNode.Value = value
eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
s.WatcherHub.notify(e)
s.Stats.Inc(CompareAndSwapSuccess)
@ -251,12 +254,13 @@ func (s *store) Delete(nodePath string, recursive bool) (*Event, error) {
return nil, err
}
e := newEvent(Delete, nodePath, nextIndex)
e := newEvent(Delete, nodePath, nextIndex, n.CreatedIndex)
eNode := e.Node
if n.IsDir() {
e.Dir = true
eNode.Dir = true
} else {
e.PrevValue = n.Value
eNode.PrevValue = eNode.Value
}
callback := func(path string) { // notify function
@ -348,7 +352,8 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (
return nil, err
}
e := newEvent(Update, nodePath, nextIndex)
e := newEvent(Update, nodePath, nextIndex, n.CreatedIndex)
eNode := e.Node
if len(newValue) != 0 {
if n.IsDir() {
@ -357,18 +362,19 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
}
e.PrevValue = n.Value
eNode.PrevValue = n.Value
n.Write(newValue, nextIndex)
e.Value = newValue
eNode.Value = newValue
} else {
// do not update value
e.Value = n.Value
eNode.Value = n.Value
}
// update ttl
n.UpdateTTL(expireTime)
e.Expiration, e.TTL = n.ExpirationAndTTL()
eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
s.WatcherHub.notify(e)
@ -407,7 +413,8 @@ func (s *store) internalCreate(nodePath string, value string, unique bool, repla
return nil, err
}
e := newEvent(action, nodePath, nextIndex)
e := newEvent(action, nodePath, nextIndex, nextIndex)
eNode := e.Node
n, _ := d.GetChild(newnodeName)
@ -417,7 +424,7 @@ func (s *store) internalCreate(nodePath string, value string, unique bool, repla
if n.IsDir() {
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
}
e.PrevValue, _ = n.Read()
eNode.PrevValue, _ = n.Read()
n.Remove(false, nil)
} else {
@ -426,12 +433,12 @@ func (s *store) internalCreate(nodePath string, value string, unique bool, repla
}
if len(value) != 0 { // create file
e.Value = value
eNode.Value = value
n = newKV(s, nodePath, value, nextIndex, d, "", expireTime)
} else { // create directory
e.Dir = true
eNode.Dir = true
n = newDir(s, nodePath, nextIndex, d, "", expireTime)
@ -444,7 +451,7 @@ func (s *store) internalCreate(nodePath string, value string, unique bool, repla
if !n.IsPermanent() {
s.ttlKeyHeap.push(n)
e.Expiration, e.TTL = n.ExpirationAndTTL()
eNode.Expiration, eNode.TTL = n.ExpirationAndTTL()
}
s.CurrentIndex = nextIndex
@ -497,7 +504,7 @@ func (s *store) DeleteExpiredKeys(cutoff time.Time) {
s.CurrentIndex++
s.Stats.Inc(ExpireCount)
s.WatcherHub.notify(newEvent(Expire, node.Path, s.CurrentIndex))
s.WatcherHub.notify(newEvent(Expire, node.Path, s.CurrentIndex, node.CreatedIndex))
}
}

View File

@ -31,8 +31,8 @@ func TestStoreGetValue(t *testing.T) {
e, err := s.Get("/foo", false, false)
assert.Nil(t, err, "")
assert.Equal(t, e.Action, "get", "")
assert.Equal(t, e.Key, "/foo", "")
assert.Equal(t, e.Value, "bar", "")
assert.Equal(t, e.Node.Key, "/foo", "")
assert.Equal(t, e.Node.Value, "bar", "")
}
// Ensure that the store can recrusively retrieve a directory listing.
@ -49,21 +49,21 @@ func TestStoreGetDirectory(t *testing.T) {
e, err := s.Get("/foo", true, false)
assert.Nil(t, err, "")
assert.Equal(t, e.Action, "get", "")
assert.Equal(t, e.Key, "/foo", "")
assert.Equal(t, len(e.KVPairs), 2, "")
assert.Equal(t, e.KVPairs[0].Key, "/foo/bar", "")
assert.Equal(t, e.KVPairs[0].Value, "X", "")
assert.Equal(t, e.KVPairs[0].Dir, false, "")
assert.Equal(t, e.KVPairs[1].Key, "/foo/baz", "")
assert.Equal(t, e.KVPairs[1].Dir, true, "")
assert.Equal(t, len(e.KVPairs[1].KVPairs), 2, "")
assert.Equal(t, e.KVPairs[1].KVPairs[0].Key, "/foo/baz/bat", "")
assert.Equal(t, e.KVPairs[1].KVPairs[0].Value, "Y", "")
assert.Equal(t, e.KVPairs[1].KVPairs[0].Dir, false, "")
assert.Equal(t, e.KVPairs[1].KVPairs[1].Key, "/foo/baz/ttl", "")
assert.Equal(t, e.KVPairs[1].KVPairs[1].Value, "Y", "")
assert.Equal(t, e.KVPairs[1].KVPairs[1].Dir, false, "")
assert.Equal(t, e.KVPairs[1].KVPairs[1].TTL, 3, "")
assert.Equal(t, e.Node.Key, "/foo", "")
assert.Equal(t, len(e.Node.Nodes), 2, "")
assert.Equal(t, e.Node.Nodes[0].Key, "/foo/bar", "")
assert.Equal(t, e.Node.Nodes[0].Value, "X", "")
assert.Equal(t, e.Node.Nodes[0].Dir, false, "")
assert.Equal(t, e.Node.Nodes[1].Key, "/foo/baz", "")
assert.Equal(t, e.Node.Nodes[1].Dir, true, "")
assert.Equal(t, len(e.Node.Nodes[1].Nodes), 2, "")
assert.Equal(t, e.Node.Nodes[1].Nodes[0].Key, "/foo/baz/bat", "")
assert.Equal(t, e.Node.Nodes[1].Nodes[0].Value, "Y", "")
assert.Equal(t, e.Node.Nodes[1].Nodes[0].Dir, false, "")
assert.Equal(t, e.Node.Nodes[1].Nodes[1].Key, "/foo/baz/ttl", "")
assert.Equal(t, e.Node.Nodes[1].Nodes[1].Value, "Y", "")
assert.Equal(t, e.Node.Nodes[1].Nodes[1].Dir, false, "")
assert.Equal(t, e.Node.Nodes[1].Nodes[1].TTL, 3, "")
}
// Ensure that the store can retrieve a directory in sorted order.
@ -77,11 +77,11 @@ func TestStoreGetSorted(t *testing.T) {
s.Create("/foo/y/b", "0", false, Permanent)
e, err := s.Get("/foo", true, true)
assert.Nil(t, err, "")
assert.Equal(t, e.KVPairs[0].Key, "/foo/x", "")
assert.Equal(t, e.KVPairs[1].Key, "/foo/y", "")
assert.Equal(t, e.KVPairs[1].KVPairs[0].Key, "/foo/y/a", "")
assert.Equal(t, e.KVPairs[1].KVPairs[1].Key, "/foo/y/b", "")
assert.Equal(t, e.KVPairs[2].Key, "/foo/z", "")
assert.Equal(t, e.Node.Nodes[0].Key, "/foo/x", "")
assert.Equal(t, e.Node.Nodes[1].Key, "/foo/y", "")
assert.Equal(t, e.Node.Nodes[1].Nodes[0].Key, "/foo/y/a", "")
assert.Equal(t, e.Node.Nodes[1].Nodes[1].Key, "/foo/y/b", "")
assert.Equal(t, e.Node.Nodes[2].Key, "/foo/z", "")
}
// Ensure that the store can create a new key if it doesn't already exist.
@ -90,14 +90,14 @@ func TestStoreCreateValue(t *testing.T) {
e, err := s.Create("/foo", "bar", false, Permanent)
assert.Nil(t, err, "")
assert.Equal(t, e.Action, "create", "")
assert.Equal(t, e.Key, "/foo", "")
assert.False(t, e.Dir, "")
assert.Equal(t, e.PrevValue, "", "")
assert.Equal(t, e.Value, "bar", "")
assert.Nil(t, e.KVPairs, "")
assert.Nil(t, e.Expiration, "")
assert.Equal(t, e.TTL, 0, "")
assert.Equal(t, e.ModifiedIndex, uint64(1), "")
assert.Equal(t, e.Node.Key, "/foo", "")
assert.False(t, e.Node.Dir, "")
assert.Equal(t, e.Node.PrevValue, "", "")
assert.Equal(t, e.Node.Value, "bar", "")
assert.Nil(t, e.Node.Nodes, "")
assert.Nil(t, e.Node.Expiration, "")
assert.Equal(t, e.Node.TTL, 0, "")
assert.Equal(t, e.Node.ModifiedIndex, uint64(1), "")
}
// Ensure that the store can create a new directory if it doesn't already exist.
@ -106,8 +106,8 @@ func TestStoreCreateDirectory(t *testing.T) {
e, err := s.Create("/foo", "", false, Permanent)
assert.Nil(t, err, "")
assert.Equal(t, e.Action, "create", "")
assert.Equal(t, e.Key, "/foo", "")
assert.True(t, e.Dir, "")
assert.Equal(t, e.Node.Key, "/foo", "")
assert.True(t, e.Node.Dir, "")
}
// Ensure that the store fails to create a key if it already exists.
@ -130,14 +130,14 @@ func TestStoreUpdateValue(t *testing.T) {
e, err := s.Update("/foo", "baz", Permanent)
assert.Nil(t, err, "")
assert.Equal(t, e.Action, "update", "")
assert.Equal(t, e.Key, "/foo", "")
assert.False(t, e.Dir, "")
assert.Equal(t, e.PrevValue, "bar", "")
assert.Equal(t, e.Value, "baz", "")
assert.Equal(t, e.TTL, 0, "")
assert.Equal(t, e.ModifiedIndex, uint64(2), "")
assert.Equal(t, e.Node.Key, "/foo", "")
assert.False(t, e.Node.Dir, "")
assert.Equal(t, e.Node.PrevValue, "bar", "")
assert.Equal(t, e.Node.Value, "baz", "")
assert.Equal(t, e.Node.TTL, 0, "")
assert.Equal(t, e.Node.ModifiedIndex, uint64(2), "")
e, _ = s.Get("/foo", false, false)
assert.Equal(t, e.Value, "baz", "")
assert.Equal(t, e.Node.Value, "baz", "")
}
// Ensure that the store cannot update a directory.
@ -165,7 +165,7 @@ func TestStoreUpdateValueTTL(t *testing.T) {
s.Create("/foo", "bar", false, Permanent)
_, err := s.Update("/foo", "baz", time.Now().Add(500*time.Millisecond))
e, _ := s.Get("/foo", false, false)
assert.Equal(t, e.Value, "baz", "")
assert.Equal(t, e.Node.Value, "baz", "")
time.Sleep(600 * time.Millisecond)
e, err = s.Get("/foo", false, false)
@ -187,7 +187,7 @@ func TestStoreUpdateDirTTL(t *testing.T) {
s.Create("/foo/bar", "baz", false, Permanent)
_, err := s.Update("/foo", "", time.Now().Add(500*time.Millisecond))
e, _ := s.Get("/foo/bar", false, false)
assert.Equal(t, e.Value, "baz", "")
assert.Equal(t, e.Node.Value, "baz", "")
time.Sleep(600 * time.Millisecond)
e, err = s.Get("/foo/bar", false, false)
@ -231,10 +231,10 @@ func TestStoreCompareAndSwapPrevValue(t *testing.T) {
e, err := s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent)
assert.Nil(t, err, "")
assert.Equal(t, e.Action, "compareAndSwap", "")
assert.Equal(t, e.PrevValue, "bar", "")
assert.Equal(t, e.Value, "baz", "")
assert.Equal(t, e.Node.PrevValue, "bar", "")
assert.Equal(t, e.Node.Value, "baz", "")
e, _ = s.Get("/foo", false, false)
assert.Equal(t, e.Value, "baz", "")
assert.Equal(t, e.Node.Value, "baz", "")
}
// Ensure that the store cannot conditionally update a key if it has the wrong previous value.
@ -247,7 +247,7 @@ func TestStoreCompareAndSwapPrevValueFailsIfNotMatch(t *testing.T) {
assert.Equal(t, err.Message, "Test Failed", "")
assert.Nil(t, e, "")
e, _ = s.Get("/foo", false, false)
assert.Equal(t, e.Value, "bar", "")
assert.Equal(t, e.Node.Value, "bar", "")
}
// Ensure that the store can conditionally update a key if it has a previous index.
@ -257,10 +257,10 @@ func TestStoreCompareAndSwapPrevIndex(t *testing.T) {
e, err := s.CompareAndSwap("/foo", "", 1, "baz", Permanent)
assert.Nil(t, err, "")
assert.Equal(t, e.Action, "compareAndSwap", "")
assert.Equal(t, e.PrevValue, "bar", "")
assert.Equal(t, e.Value, "baz", "")
assert.Equal(t, e.Node.PrevValue, "bar", "")
assert.Equal(t, e.Node.Value, "baz", "")
e, _ = s.Get("/foo", false, false)
assert.Equal(t, e.Value, "baz", "")
assert.Equal(t, e.Node.Value, "baz", "")
}
// Ensure that the store cannot conditionally update a key if it has the wrong previous index.
@ -273,7 +273,7 @@ func TestStoreCompareAndSwapPrevIndexFailsIfNotMatch(t *testing.T) {
assert.Equal(t, err.Message, "Test Failed", "")
assert.Nil(t, e, "")
e, _ = s.Get("/foo", false, false)
assert.Equal(t, e.Value, "bar", "")
assert.Equal(t, e.Node.Value, "bar", "")
}
// Ensure that the store can watch for key creation.
@ -283,7 +283,7 @@ func TestStoreWatchCreate(t *testing.T) {
s.Create("/foo", "bar", false, Permanent)
e := nbselect(c)
assert.Equal(t, e.Action, "create", "")
assert.Equal(t, e.Key, "/foo", "")
assert.Equal(t, e.Node.Key, "/foo", "")
e = nbselect(c)
assert.Nil(t, e, "")
}
@ -295,7 +295,7 @@ func TestStoreWatchRecursiveCreate(t *testing.T) {
s.Create("/foo/bar", "baz", false, Permanent)
e := nbselect(c)
assert.Equal(t, e.Action, "create", "")
assert.Equal(t, e.Key, "/foo/bar", "")
assert.Equal(t, e.Node.Key, "/foo/bar", "")
}
// Ensure that the store can watch for key updates.
@ -306,7 +306,7 @@ func TestStoreWatchUpdate(t *testing.T) {
s.Update("/foo", "baz", Permanent)
e := nbselect(c)
assert.Equal(t, e.Action, "update", "")
assert.Equal(t, e.Key, "/foo", "")
assert.Equal(t, e.Node.Key, "/foo", "")
}
// Ensure that the store can watch for recursive key updates.
@ -317,7 +317,7 @@ func TestStoreWatchRecursiveUpdate(t *testing.T) {
s.Update("/foo/bar", "baz", Permanent)
e := nbselect(c)
assert.Equal(t, e.Action, "update", "")
assert.Equal(t, e.Key, "/foo/bar", "")
assert.Equal(t, e.Node.Key, "/foo/bar", "")
}
// Ensure that the store can watch for key deletions.
@ -328,7 +328,7 @@ func TestStoreWatchDelete(t *testing.T) {
s.Delete("/foo", false)
e := nbselect(c)
assert.Equal(t, e.Action, "delete", "")
assert.Equal(t, e.Key, "/foo", "")
assert.Equal(t, e.Node.Key, "/foo", "")
}
// Ensure that the store can watch for recursive key deletions.
@ -339,7 +339,7 @@ func TestStoreWatchRecursiveDelete(t *testing.T) {
s.Delete("/foo/bar", false)
e := nbselect(c)
assert.Equal(t, e.Action, "delete", "")
assert.Equal(t, e.Key, "/foo/bar", "")
assert.Equal(t, e.Node.Key, "/foo/bar", "")
}
// Ensure that the store can watch for CAS updates.
@ -350,7 +350,7 @@ func TestStoreWatchCompareAndSwap(t *testing.T) {
s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent)
e := nbselect(c)
assert.Equal(t, e.Action, "compareAndSwap", "")
assert.Equal(t, e.Key, "/foo", "")
assert.Equal(t, e.Node.Key, "/foo", "")
}
// Ensure that the store can watch for recursive CAS updates.
@ -361,7 +361,7 @@ func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) {
s.CompareAndSwap("/foo/bar", "baz", 0, "bat", Permanent)
e := nbselect(c)
assert.Equal(t, e.Action, "compareAndSwap", "")
assert.Equal(t, e.Key, "/foo/bar", "")
assert.Equal(t, e.Node.Key, "/foo/bar", "")
}
// Ensure that the store can watch for key expiration.
@ -383,11 +383,11 @@ func TestStoreWatchExpire(t *testing.T) {
time.Sleep(600 * time.Millisecond)
e = nbselect(c)
assert.Equal(t, e.Action, "expire", "")
assert.Equal(t, e.Key, "/foo", "")
assert.Equal(t, e.Node.Key, "/foo", "")
c, _ = s.Watch("/", true, 4)
e = nbselect(c)
assert.Equal(t, e.Action, "expire", "")
assert.Equal(t, e.Key, "/foofoo", "")
assert.Equal(t, e.Node.Key, "/foofoo", "")
}
// Ensure that the store can recover from a previously saved state.
@ -403,11 +403,11 @@ func TestStoreRecover(t *testing.T) {
e, err := s.Get("/foo/x", false, false)
assert.Nil(t, err, "")
assert.Equal(t, e.Value, "bar", "")
assert.Equal(t, e.Node.Value, "bar", "")
e, err = s.Get("/foo/y", false, false)
assert.Nil(t, err, "")
assert.Equal(t, e.Value, "baz", "")
assert.Equal(t, e.Node.Value, "baz", "")
}
// Ensure that the store can recover from a previously saved state that includes an expiring key.
@ -441,7 +441,7 @@ func TestStoreRecoverWithExpiration(t *testing.T) {
e, err := s.Get("/foo/x", false, false)
assert.Nil(t, err, "")
assert.Equal(t, e.Value, "bar", "")
assert.Equal(t, e.Node.Value, "bar", "")
e, err = s.Get("/foo/y", false, false)
assert.NotNil(t, err, "")

View File

@ -77,7 +77,7 @@ func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan
func (wh *watcherHub) notify(e *Event) {
e = wh.EventHistory.addEvent(e) // add event into the eventHistory
segments := strings.Split(e.Key, "/")
segments := strings.Split(e.Node.Key, "/")
currPath := "/"
@ -111,7 +111,7 @@ func (wh *watcherHub) notifyWatchers(e *Event, path string, deleted bool) {
w, _ := curr.Value.(*watcher)
if w.notify(e, e.Key == path, deleted) {
if w.notify(e, e.Node.Key == path, deleted) {
// if we successfully notify a watcher
// we need to remove the watcher from the list