fix(mod/lock): correct watch index to remove cpu load

The waitIndex was being pulled from the wrong node in the lock parent which caused the watch to be returned immediately. This caused a continuous set of calls while a client was waiting for a lock.
This commit is contained in:
Ben Johnson 2014-02-22 16:32:59 -07:00
parent f922a08a27
commit dde2b71850
2 changed files with 11 additions and 13 deletions

View File

@ -165,22 +165,17 @@ func (h *handler) watch(keypath string, index int, closeChan <-chan bool) error
return fmt.Errorf("lock watch lookup error: %s", err.Error()) return fmt.Errorf("lock watch lookup error: %s", err.Error())
} }
nodes := lockNodes{resp.Node.Nodes} nodes := lockNodes{resp.Node.Nodes}
prevIndex := nodes.PrevIndex(index) prevIndex, modifiedIndex := nodes.PrevIndex(index)
// If there is no previous index then we have the lock. // If there is no previous index then we have the lock.
if prevIndex == 0 { if prevIndex == 0 {
return nil return nil
} }
// Watch previous index until it's gone. // Wait from the last modification of the node.
waitIndex := resp.Node.ModifiedIndex waitIndex := modifiedIndex + 1
// Since event store has only 1000 histories we should use first node's CreatedIndex if available resp, err = h.client.Watch(path.Join(keypath, strconv.Itoa(prevIndex)), uint64(waitIndex), false, nil, stopWatchChan)
if firstNode := nodes.First(); firstNode != nil {
waitIndex = firstNode.CreatedIndex
}
_, err = h.client.Watch(path.Join(keypath, strconv.Itoa(prevIndex)), waitIndex, false, nil, stopWatchChan)
if err == etcd.ErrWatchStoppedByUser { if err == etcd.ErrWatchStoppedByUser {
return fmt.Errorf("lock watch closed") return fmt.Errorf("lock watch closed")
} else if err != nil { } else if err != nil {

View File

@ -41,17 +41,20 @@ func (s lockNodes) FindByValue(value string) (*etcd.Node, int) {
return nil, 0 return nil, 0
} }
// Retrieves the index that occurs before a given index. // Find the node with the largest index in the lockNodes that is smaller than the given index. Also return the lastModified index of that node.
func (s lockNodes) PrevIndex(index int) int { func (s lockNodes) PrevIndex(index int) (int, int) {
sort.Sort(s) sort.Sort(s)
// Iterate over each node to find the given index. We keep track of the
// previous index on each iteration so we can return it when we match the
// index we're looking for.
var prevIndex int var prevIndex int
for _, node := range s.Nodes { for _, node := range s.Nodes {
idx, _ := strconv.Atoi(path.Base(node.Key)) idx, _ := strconv.Atoi(path.Base(node.Key))
if index == idx { if index == idx {
return prevIndex return prevIndex, int(node.ModifiedIndex)
} }
prevIndex = idx prevIndex = idx
} }
return 0 return 0, 0
} }