diff --git a/etcdserver/etcdhttp/http.go b/etcdserver/etcdhttp/http.go index ca1b3bb6c..b6bc18552 100644 --- a/etcdserver/etcdhttp/http.go +++ b/etcdserver/etcdhttp/http.go @@ -292,7 +292,7 @@ func writeEvent(w http.ResponseWriter, ev *store.Event) error { return errors.New("cannot write empty Event!") } w.Header().Set("Content-Type", "application/json") - w.Header().Add("X-Etcd-Index", fmt.Sprint(ev.Index())) + w.Header().Add("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex)) if ev.IsCreated() { w.WriteHeader(http.StatusCreated) diff --git a/store/event.go b/store/event.go index 5d702ec3b..a351ff723 100644 --- a/store/event.go +++ b/store/event.go @@ -12,9 +12,10 @@ const ( ) type Event struct { - Action string `json:"action"` - Node *NodeExtern `json:"node,omitempty"` - PrevNode *NodeExtern `json:"prevNode,omitempty"` + Action string `json:"action"` + Node *NodeExtern `json:"node,omitempty"` + PrevNode *NodeExtern `json:"prevNode,omitempty"` + EtcdIndex uint64 `json:"-"` } func newEvent(action string, key string, modifiedIndex, createdIndex uint64) *Event { diff --git a/store/store.go b/store/store.go index f537f352f..b5ffcb19d 100644 --- a/store/store.go +++ b/store/store.go @@ -114,6 +114,7 @@ func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) { } e := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex) + e.EtcdIndex = s.CurrentIndex e.Node.loadInternalNode(n, recursive, sorted) s.Stats.Inc(GetSuccess) @@ -130,6 +131,7 @@ func (s *store) Create(nodePath string, dir bool, value string, unique bool, exp e, err := s.internalCreate(nodePath, dir, value, unique, false, expireTime, Create) if err == nil { + e.EtcdIndex = s.CurrentIndex s.WatcherHub.notify(e) s.Stats.Inc(CreateSuccess) } else { @@ -166,6 +168,7 @@ func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Tim if err != nil { return nil, err } + e.EtcdIndex = s.CurrentIndex // Put prevNode into event if getErr == nil { @@ -227,6 +230,7 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint s.CurrentIndex++ e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex, n.CreatedIndex) + e.EtcdIndex = s.CurrentIndex e.PrevNode = n.Repr(false, false) eNode := e.Node @@ -241,6 +245,7 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint s.WatcherHub.notify(e) s.Stats.Inc(CompareAndSwapSuccess) + return e, nil } @@ -270,6 +275,7 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) { nextIndex := s.CurrentIndex + 1 e := newEvent(Delete, nodePath, nextIndex, n.CreatedIndex) + e.EtcdIndex = s.CurrentIndex e.PrevNode = n.Repr(false, false) eNode := e.Node @@ -329,6 +335,7 @@ func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex ui s.CurrentIndex++ e := newEvent(CompareAndDelete, nodePath, s.CurrentIndex, n.CreatedIndex) + e.EtcdIndex = s.CurrentIndex e.PrevNode = n.Repr(false, false) callback := func(path string) { // notify function @@ -341,6 +348,7 @@ func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex ui s.WatcherHub.notify(e) s.Stats.Inc(CompareAndDeleteSuccess) + return e, nil } @@ -349,22 +357,12 @@ func (s *store) Watch(key string, recursive, stream bool, sinceIndex uint64) (Wa defer s.worldLock.RUnlock() key = path.Clean(path.Join("/", key)) - nextIndex := s.CurrentIndex + 1 - - var w Watcher - var err *etcdErr.Error - if sinceIndex == 0 { - w, err = s.WatcherHub.watch(key, recursive, stream, nextIndex) - - } else { - w, err = s.WatcherHub.watch(key, recursive, stream, sinceIndex) + sinceIndex = s.CurrentIndex + 1 } - + // WatchHub does not know about the current index, so we need to pass it in + w, err := s.WatcherHub.watch(key, recursive, stream, sinceIndex, s.CurrentIndex) if err != nil { - // watchhub do not know the current Index - // we need to attach the currentIndex here - err.Index = s.CurrentIndex return nil, err } @@ -416,6 +414,7 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) ( } e := newEvent(Update, nodePath, nextIndex, n.CreatedIndex) + e.EtcdIndex = s.CurrentIndex e.PrevNode = n.Repr(false, false) eNode := e.Node @@ -569,6 +568,7 @@ func (s *store) DeleteExpiredKeys(cutoff time.Time) { s.CurrentIndex++ e := newEvent(Expire, node.Path, s.CurrentIndex, node.CreatedIndex) + e.EtcdIndex = s.CurrentIndex e.PrevNode = node.Repr(false, false) callback := func(path string) { // notify function diff --git a/store/watcher_hub.go b/store/watcher_hub.go index 6db9ab661..eeeffb630 100644 --- a/store/watcher_hub.go +++ b/store/watcher_hub.go @@ -38,10 +38,11 @@ func newWatchHub(capacity int) *watcherHub { // If recursive is true, the first change after index under key will be sent to the event channel of the watcher. // If recursive is false, the first change after index at key will be sent to the event channel of the watcher. // If index is zero, watch will start from the current index + 1. -func (wh *watcherHub) watch(key string, recursive, stream bool, index uint64) (Watcher, *etcdErr.Error) { +func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeIndex uint64) (Watcher, *etcdErr.Error) { event, err := wh.EventHistory.scan(key, recursive, index) if err != nil { + err.Index = storeIndex return nil, err } @@ -53,7 +54,9 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index uint64) (W hub: wh, } + // If the event exists in the known history, append the EtcdIndex and return immediately if event != nil { + event.EtcdIndex = storeIndex w.eventChan <- event return w, nil } diff --git a/store/watcher_test.go b/store/watcher_test.go index baea41953..139d89685 100644 --- a/store/watcher_test.go +++ b/store/watcher_test.go @@ -23,7 +23,7 @@ import ( func TestWatcher(t *testing.T) { s := newStore() wh := s.WatcherHub - w, err := wh.watch("/foo", true, false, 1) + w, err := wh.watch("/foo", true, false, 1, 1) if err != nil { t.Fatalf("%v", err) } @@ -46,7 +46,7 @@ func TestWatcher(t *testing.T) { t.Fatal("recv != send") } - w, _ = wh.watch("/foo", false, false, 2) + w, _ = wh.watch("/foo", false, false, 2, 1) c = w.EventChan() e = newEvent(Create, "/foo/bar", 2, 2) @@ -71,7 +71,7 @@ func TestWatcher(t *testing.T) { } // ensure we are doing exact matching rather than prefix matching - w, _ = wh.watch("/fo", true, false, 1) + w, _ = wh.watch("/fo", true, false, 1, 1) c = w.EventChan() select {