mirror of
				https://github.com/etcd-io/etcd.git
				synced 2024-09-27 06:25:44 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			201 lines
		
	
	
		
			5.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			201 lines
		
	
	
		
			5.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2015 The etcd Authors
 | |
| //
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| //     http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| package v2store
 | |
| 
 | |
| import (
 | |
| 	"container/list"
 | |
| 	"path"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"sync/atomic"
 | |
| 
 | |
| 	"github.com/coreos/etcd/etcdserver/v2error"
 | |
| )
 | |
| 
 | |
// watcherHub holds every subscribed watcher together with the recent event
// history.
//
// watchers maps a watched path to the list of watchers registered on exactly
// that path.
//
// EventHistory keeps old events for the hub. It lets a watcher observe a
// continuous event history; without it a watcher might miss an event that
// happens between the end of one watch command and the start of the next.
type watcherHub struct {
	// count must be the first element to keep 64-bit alignment for atomic
	// access.

	count int64 // current number of watchers; updated with atomic.AddInt64.

	// mutex is held by watch and notifyWatchers while watchers is read or
	// modified.
	mutex        sync.Mutex
	watchers     map[string]*list.List
	EventHistory *EventHistory
}
 | |
| 
 | |
| // newWatchHub creates a watcherHub. The capacity determines how many events we will
 | |
| // keep in the eventHistory.
 | |
| // Typically, we only need to keep a small size of history[smaller than 20K].
 | |
| // Ideally, it should smaller than 20K/s[max throughput] * 2 * 50ms[RTT] = 2000
 | |
| func newWatchHub(capacity int) *watcherHub {
 | |
| 	return &watcherHub{
 | |
| 		watchers:     make(map[string]*list.List),
 | |
| 		EventHistory: newEventHistory(capacity),
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Watch function returns a Watcher.
 | |
| // If recursive is true, the first change after index under key will be sent to the event channel of the watcher.
 | |
| // If recursive is false, the first change after index at key will be sent to the event channel of the watcher.
 | |
| // If index is zero, watch will start from the current index + 1.
 | |
| func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeIndex uint64) (Watcher, *v2error.Error) {
 | |
| 	reportWatchRequest()
 | |
| 	event, err := wh.EventHistory.scan(key, recursive, index)
 | |
| 
 | |
| 	if err != nil {
 | |
| 		err.Index = storeIndex
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	w := &watcher{
 | |
| 		eventChan:  make(chan *Event, 100), // use a buffered channel
 | |
| 		recursive:  recursive,
 | |
| 		stream:     stream,
 | |
| 		sinceIndex: index,
 | |
| 		startIndex: storeIndex,
 | |
| 		hub:        wh,
 | |
| 	}
 | |
| 
 | |
| 	wh.mutex.Lock()
 | |
| 	defer wh.mutex.Unlock()
 | |
| 	// If the event exists in the known history, append the EtcdIndex and return immediately
 | |
| 	if event != nil {
 | |
| 		ne := event.Clone()
 | |
| 		ne.EtcdIndex = storeIndex
 | |
| 		w.eventChan <- ne
 | |
| 		return w, nil
 | |
| 	}
 | |
| 
 | |
| 	l, ok := wh.watchers[key]
 | |
| 
 | |
| 	var elem *list.Element
 | |
| 
 | |
| 	if ok { // add the new watcher to the back of the list
 | |
| 		elem = l.PushBack(w)
 | |
| 	} else { // create a new list and add the new watcher
 | |
| 		l = list.New()
 | |
| 		elem = l.PushBack(w)
 | |
| 		wh.watchers[key] = l
 | |
| 	}
 | |
| 
 | |
| 	w.remove = func() {
 | |
| 		if w.removed { // avoid removing it twice
 | |
| 			return
 | |
| 		}
 | |
| 		w.removed = true
 | |
| 		l.Remove(elem)
 | |
| 		atomic.AddInt64(&wh.count, -1)
 | |
| 		reportWatcherRemoved()
 | |
| 		if l.Len() == 0 {
 | |
| 			delete(wh.watchers, key)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	atomic.AddInt64(&wh.count, 1)
 | |
| 	reportWatcherAdded()
 | |
| 
 | |
| 	return w, nil
 | |
| }
 | |
| 
 | |
| func (wh *watcherHub) add(e *Event) {
 | |
| 	wh.EventHistory.addEvent(e)
 | |
| }
 | |
| 
 | |
| // notify function accepts an event and notify to the watchers.
 | |
| func (wh *watcherHub) notify(e *Event) {
 | |
| 	e = wh.EventHistory.addEvent(e) // add event into the eventHistory
 | |
| 
 | |
| 	segments := strings.Split(e.Node.Key, "/")
 | |
| 
 | |
| 	currPath := "/"
 | |
| 
 | |
| 	// walk through all the segments of the path and notify the watchers
 | |
| 	// if the path is "/foo/bar", it will notify watchers with path "/",
 | |
| 	// "/foo" and "/foo/bar"
 | |
| 
 | |
| 	for _, segment := range segments {
 | |
| 		currPath = path.Join(currPath, segment)
 | |
| 		// notify the watchers who interests in the changes of current path
 | |
| 		wh.notifyWatchers(e, currPath, false)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (wh *watcherHub) notifyWatchers(e *Event, nodePath string, deleted bool) {
 | |
| 	wh.mutex.Lock()
 | |
| 	defer wh.mutex.Unlock()
 | |
| 
 | |
| 	l, ok := wh.watchers[nodePath]
 | |
| 	if ok {
 | |
| 		curr := l.Front()
 | |
| 
 | |
| 		for curr != nil {
 | |
| 			next := curr.Next() // save reference to the next one in the list
 | |
| 
 | |
| 			w, _ := curr.Value.(*watcher)
 | |
| 
 | |
| 			originalPath := (e.Node.Key == nodePath)
 | |
| 			if (originalPath || !isHidden(nodePath, e.Node.Key)) && w.notify(e, originalPath, deleted) {
 | |
| 				if !w.stream { // do not remove the stream watcher
 | |
| 					// if we successfully notify a watcher
 | |
| 					// we need to remove the watcher from the list
 | |
| 					// and decrease the counter
 | |
| 					w.removed = true
 | |
| 					l.Remove(curr)
 | |
| 					atomic.AddInt64(&wh.count, -1)
 | |
| 					reportWatcherRemoved()
 | |
| 				}
 | |
| 			}
 | |
| 
 | |
| 			curr = next // update current to the next element in the list
 | |
| 		}
 | |
| 
 | |
| 		if l.Len() == 0 {
 | |
| 			// if we have notified all watcher in the list
 | |
| 			// we can delete the list
 | |
| 			delete(wh.watchers, nodePath)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // clone function clones the watcherHub and return the cloned one.
 | |
| // only clone the static content. do not clone the current watchers.
 | |
| func (wh *watcherHub) clone() *watcherHub {
 | |
| 	clonedHistory := wh.EventHistory.clone()
 | |
| 
 | |
| 	return &watcherHub{
 | |
| 		EventHistory: clonedHistory,
 | |
| 	}
 | |
| }
 | |
| 
 | |
// isHidden reports whether keyPath is considered hidden from a watcher on
// watchPath: that is, whether the part of keyPath below watchPath has an
// element starting with "_" (a hidden last element or a hidden directory on
// the way to it).
func isHidden(watchPath, keyPath string) bool {
	// When a directory is deleted, watchPath can be deeper than keyPath: for
	// example, deleting /foo must also notify watchers on /foo/bar. Such an
	// event is never hidden.
	if len(keyPath) < len(watchPath) {
		return false
	}

	// If watchPath is just "/", the remainder starts without a "/". Prefix
	// one and let path.Clean collapse any doubled separator so both cases
	// look the same.
	remainder := keyPath[len(watchPath):]
	cleaned := path.Clean("/" + remainder)

	return strings.Contains(cleaned, "/_")
}
 | 
