etcdserver: correct X-Etcd-Index header

This adds an EtcdIndex field to store.Event and uses that as the header
instead of the node's modifiedIndex. To facilitate this in a non-racy
way, we set the EtcdIndex while holding the lock.
This commit is contained in:
Jonathan Boulle 2014-09-17 16:20:34 -07:00
parent ceab948831
commit 5441c6aa54
5 changed files with 25 additions and 21 deletions

View File

@ -292,7 +292,7 @@ func writeEvent(w http.ResponseWriter, ev *store.Event) error {
return errors.New("cannot write empty Event!")
}
w.Header().Set("Content-Type", "application/json")
w.Header().Add("X-Etcd-Index", fmt.Sprint(ev.Index()))
w.Header().Add("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex))
if ev.IsCreated() {
w.WriteHeader(http.StatusCreated)

View File

@ -12,9 +12,10 @@ const (
)
type Event struct {
Action string `json:"action"`
Node *NodeExtern `json:"node,omitempty"`
PrevNode *NodeExtern `json:"prevNode,omitempty"`
Action string `json:"action"`
Node *NodeExtern `json:"node,omitempty"`
PrevNode *NodeExtern `json:"prevNode,omitempty"`
EtcdIndex uint64 `json:"-"`
}
func newEvent(action string, key string, modifiedIndex, createdIndex uint64) *Event {

View File

@ -114,6 +114,7 @@ func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
}
e := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
e.EtcdIndex = s.CurrentIndex
e.Node.loadInternalNode(n, recursive, sorted)
s.Stats.Inc(GetSuccess)
@ -130,6 +131,7 @@ func (s *store) Create(nodePath string, dir bool, value string, unique bool, exp
e, err := s.internalCreate(nodePath, dir, value, unique, false, expireTime, Create)
if err == nil {
e.EtcdIndex = s.CurrentIndex
s.WatcherHub.notify(e)
s.Stats.Inc(CreateSuccess)
} else {
@ -166,6 +168,7 @@ func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Tim
if err != nil {
return nil, err
}
e.EtcdIndex = s.CurrentIndex
// Put prevNode into event
if getErr == nil {
@ -227,6 +230,7 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
s.CurrentIndex++
e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex, n.CreatedIndex)
e.EtcdIndex = s.CurrentIndex
e.PrevNode = n.Repr(false, false)
eNode := e.Node
@ -241,6 +245,7 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
s.WatcherHub.notify(e)
s.Stats.Inc(CompareAndSwapSuccess)
return e, nil
}
@ -270,6 +275,7 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
nextIndex := s.CurrentIndex + 1
e := newEvent(Delete, nodePath, nextIndex, n.CreatedIndex)
e.EtcdIndex = s.CurrentIndex
e.PrevNode = n.Repr(false, false)
eNode := e.Node
@ -329,6 +335,7 @@ func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex ui
s.CurrentIndex++
e := newEvent(CompareAndDelete, nodePath, s.CurrentIndex, n.CreatedIndex)
e.EtcdIndex = s.CurrentIndex
e.PrevNode = n.Repr(false, false)
callback := func(path string) { // notify function
@ -341,6 +348,7 @@ func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex ui
s.WatcherHub.notify(e)
s.Stats.Inc(CompareAndDeleteSuccess)
return e, nil
}
@ -349,22 +357,12 @@ func (s *store) Watch(key string, recursive, stream bool, sinceIndex uint64) (Wa
defer s.worldLock.RUnlock()
key = path.Clean(path.Join("/", key))
nextIndex := s.CurrentIndex + 1
var w Watcher
var err *etcdErr.Error
if sinceIndex == 0 {
w, err = s.WatcherHub.watch(key, recursive, stream, nextIndex)
} else {
w, err = s.WatcherHub.watch(key, recursive, stream, sinceIndex)
sinceIndex = s.CurrentIndex + 1
}
// WatchHub does not know about the current index, so we need to pass it in
w, err := s.WatcherHub.watch(key, recursive, stream, sinceIndex, s.CurrentIndex)
if err != nil {
// watchhub do not know the current Index
// we need to attach the currentIndex here
err.Index = s.CurrentIndex
return nil, err
}
@ -416,6 +414,7 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (
}
e := newEvent(Update, nodePath, nextIndex, n.CreatedIndex)
e.EtcdIndex = s.CurrentIndex
e.PrevNode = n.Repr(false, false)
eNode := e.Node
@ -569,6 +568,7 @@ func (s *store) DeleteExpiredKeys(cutoff time.Time) {
s.CurrentIndex++
e := newEvent(Expire, node.Path, s.CurrentIndex, node.CreatedIndex)
e.EtcdIndex = s.CurrentIndex
e.PrevNode = node.Repr(false, false)
callback := func(path string) { // notify function

View File

@ -38,10 +38,11 @@ func newWatchHub(capacity int) *watcherHub {
// If recursive is true, the first change after index under key will be sent to the event channel of the watcher.
// If recursive is false, the first change after index at key will be sent to the event channel of the watcher.
// If index is zero, watch will start from the current index + 1.
func (wh *watcherHub) watch(key string, recursive, stream bool, index uint64) (Watcher, *etcdErr.Error) {
func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeIndex uint64) (Watcher, *etcdErr.Error) {
event, err := wh.EventHistory.scan(key, recursive, index)
if err != nil {
err.Index = storeIndex
return nil, err
}
@ -53,7 +54,9 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index uint64) (W
hub: wh,
}
// If the event exists in the known history, append the EtcdIndex and return immediately
if event != nil {
event.EtcdIndex = storeIndex
w.eventChan <- event
return w, nil
}

View File

@ -23,7 +23,7 @@ import (
func TestWatcher(t *testing.T) {
s := newStore()
wh := s.WatcherHub
w, err := wh.watch("/foo", true, false, 1)
w, err := wh.watch("/foo", true, false, 1, 1)
if err != nil {
t.Fatalf("%v", err)
}
@ -46,7 +46,7 @@ func TestWatcher(t *testing.T) {
t.Fatal("recv != send")
}
w, _ = wh.watch("/foo", false, false, 2)
w, _ = wh.watch("/foo", false, false, 2, 1)
c = w.EventChan()
e = newEvent(Create, "/foo/bar", 2, 2)
@ -71,7 +71,7 @@ func TestWatcher(t *testing.T) {
}
// ensure we are doing exact matching rather than prefix matching
w, _ = wh.watch("/fo", true, false, 1)
w, _ = wh.watch("/fo", true, false, 1, 1)
c = w.EventChan()
select {