etcd/store/watcher.go
2013-10-07 09:48:28 -07:00

146 lines
3.2 KiB
Go

/*
Copyright 2013 CoreOS Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"path"
"strconv"
"strings"
)
//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------
// WatcherHub is where the client register its watcher
type WatcherHub struct {
watchers map[string][]*Watcher
}
// Currently watcher only contains a response channel
type Watcher struct {
C chan *Response
}
// Create a new watcherHub
func newWatcherHub() *WatcherHub {
w := new(WatcherHub)
w.watchers = make(map[string][]*Watcher)
return w
}
// Create a new watcher
func NewWatcher() *Watcher {
return &Watcher{C: make(chan *Response, 1)}
}
// Add a watcher to the watcherHub
func (w *WatcherHub) addWatcher(prefix string, watcher *Watcher, sinceIndex uint64,
responseStartIndex uint64, currentIndex uint64, resMap map[string]*Response) error {
prefix = path.Clean("/" + prefix)
if sinceIndex != 0 && sinceIndex >= responseStartIndex {
for i := sinceIndex; i <= currentIndex; i++ {
if checkResponse(prefix, i, resMap) {
watcher.C <- resMap[strconv.FormatUint(i, 10)]
return nil
}
}
}
_, ok := w.watchers[prefix]
if !ok {
w.watchers[prefix] = make([]*Watcher, 0)
}
w.watchers[prefix] = append(w.watchers[prefix], watcher)
return nil
}
// Check if the response has what we are watching
func checkResponse(prefix string, index uint64, resMap map[string]*Response) bool {
resp, ok := resMap[strconv.FormatUint(index, 10)]
if !ok {
// not storage system command
return false
} else {
path := resp.Key
if strings.HasPrefix(path, prefix) {
prefixLen := len(prefix)
if len(path) == prefixLen || path[prefixLen] == '/' {
return true
}
}
}
return false
}
// Notify the watcher a action happened
func (w *WatcherHub) notify(resp Response) error {
resp.Key = path.Clean(resp.Key)
segments := strings.Split(resp.Key, "/")
currPath := "/"
// walk through all the pathes
for _, segment := range segments {
currPath = path.Join(currPath, segment)
watchers, ok := w.watchers[currPath]
if ok {
newWatchers := make([]*Watcher, 0)
// notify all the watchers
for _, watcher := range watchers {
watcher.C <- &resp
}
if len(newWatchers) == 0 {
// we have notified all the watchers at this path
// delete the map
delete(w.watchers, currPath)
} else {
w.watchers[currPath] = newWatchers
}
}
}
return nil
}
// stopWatchers stops all the watchers
// This function is used when the etcd recovery from a snapshot at runtime
func (w *WatcherHub) stopWatchers() {
for _, subWatchers := range w.watchers {
for _, watcher := range subWatchers {
watcher.C <- nil
}
}
w.watchers = nil
}