Merge pull request #5448 from xiang90/fix_refrsh

etcd: fix refresh feature
This commit is contained in:
Xiang Li 2016-05-26 09:53:13 -07:00
commit 6acb3d67fb
5 changed files with 38 additions and 17 deletions

View File

@ -233,10 +233,11 @@ curl http://127.0.0.1:2379/v2/keys/foo -XPUT -d value=bar -d ttl= -d prevExist=t
### Refreshing key TTL ### Refreshing key TTL
Keys in etcd can be refreshed without notifying watchers Keys in etcd can be refreshed without notifying current watchers.
this can be achieved by setting the refresh to true when updating a TTL
You cannot update the value of a key when refreshing it This can be achieved by setting the refresh to true when updating a TTL.
You cannot update the value of a key when refreshing it.
```sh ```sh
curl http://127.0.0.1:2379/v2/keys/foo -XPUT -d value=bar -d ttl=5 curl http://127.0.0.1:2379/v2/keys/foo -XPUT -d value=bar -d ttl=5

View File

@ -30,6 +30,7 @@ type Event struct {
Node *NodeExtern `json:"node,omitempty"` Node *NodeExtern `json:"node,omitempty"`
PrevNode *NodeExtern `json:"prevNode,omitempty"` PrevNode *NodeExtern `json:"prevNode,omitempty"`
EtcdIndex uint64 `json:"-"` EtcdIndex uint64 `json:"-"`
Refresh bool `json:"refresh,omitempty"`
} }
func newEvent(action string, key string, modifiedIndex, createdIndex uint64) *Event { func newEvent(action string, key string, modifiedIndex, createdIndex uint64) *Event {
@ -64,3 +65,7 @@ func (e *Event) Clone() *Event {
PrevNode: e.PrevNode.Clone(), PrevNode: e.PrevNode.Clone(),
} }
} }
func (e *Event) SetRefresh() {
e.Refresh = true
}

View File

@ -78,24 +78,26 @@ func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event,
for { for {
e := eh.Queue.Events[i] e := eh.Queue.Events[i]
ok := (e.Node.Key == key) if !e.Refresh {
ok := (e.Node.Key == key)
if recursive { if recursive {
// add tailing slash // add tailing slash
key = path.Clean(key) key = path.Clean(key)
if key[len(key)-1] != '/' { if key[len(key)-1] != '/' {
key = key + "/" key = key + "/"
}
ok = ok || strings.HasPrefix(e.Node.Key, key)
} }
ok = ok || strings.HasPrefix(e.Node.Key, key) if (e.Action == Delete || e.Action == Expire) && e.PrevNode != nil && e.PrevNode.Dir {
} ok = ok || strings.HasPrefix(key, e.PrevNode.Key)
}
if (e.Action == Delete || e.Action == Expire) && e.PrevNode != nil && e.PrevNode.Dir { if ok {
ok = ok || strings.HasPrefix(key, e.PrevNode.Key) return e, nil
} }
if ok {
return e, nil
} }
i = (i + 1) % eh.Queue.Capacity i = (i + 1) % eh.Queue.Capacity

View File

@ -236,6 +236,9 @@ func (s *store) Set(nodePath string, dir bool, value string, expireOpts TTLOptio
if !expireOpts.Refresh { if !expireOpts.Refresh {
s.WatcherHub.notify(e) s.WatcherHub.notify(e)
} else {
e.SetRefresh()
s.WatcherHub.add(e)
} }
return e, nil return e, nil
@ -314,6 +317,9 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
if !expireOpts.Refresh { if !expireOpts.Refresh {
s.WatcherHub.notify(e) s.WatcherHub.notify(e)
} else {
e.SetRefresh()
s.WatcherHub.add(e)
} }
return e, nil return e, nil
@ -539,6 +545,9 @@ func (s *store) Update(nodePath string, newValue string, expireOpts TTLOptionSet
if !expireOpts.Refresh { if !expireOpts.Refresh {
s.WatcherHub.notify(e) s.WatcherHub.notify(e)
} else {
e.SetRefresh()
s.WatcherHub.add(e)
} }
s.CurrentIndex = nextIndex s.CurrentIndex = nextIndex

View File

@ -115,6 +115,10 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeInde
return w, nil return w, nil
} }
func (wh *watcherHub) add(e *Event) {
e = wh.EventHistory.addEvent(e)
}
// notify function accepts an event and notify to the watchers. // notify function accepts an event and notify to the watchers.
func (wh *watcherHub) notify(e *Event) { func (wh *watcherHub) notify(e *Event) {
e = wh.EventHistory.addEvent(e) // add event into the eventHistory e = wh.EventHistory.addEvent(e) // add event into the eventHistory