From 1c11f6a1449f02024e3ee5c0f64fcc9067d3245d Mon Sep 17 00:00:00 2001 From: Jonathan Boulle Date: Fri, 26 Sep 2014 11:34:11 -0700 Subject: [PATCH] *: expose etcd-index in watch requests This adds a StartIndex field to the Watcher interface, which represents the Etcd-Index at which the Watcher is created. Also refactors the HTTP tests to use a table for most handleWatch tests --- etcdserver/etcdhttp/http.go | 1 + etcdserver/etcdhttp/http_test.go | 213 +++++++++++++++---------------- etcdserver/server_test.go | 1 + store/store_test.go | 39 ++++-- store/watcher.go | 6 + store/watcher_hub.go | 1 + 6 files changed, 140 insertions(+), 121 deletions(-) diff --git a/etcdserver/etcdhttp/http.go b/etcdserver/etcdhttp/http.go index 7f7b5ca83..ebcbcc20a 100644 --- a/etcdserver/etcdhttp/http.go +++ b/etcdserver/etcdhttp/http.go @@ -352,6 +352,7 @@ func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, s } w.Header().Set("Content-Type", "application/json") + w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex())) w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index())) w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term())) w.WriteHeader(http.StatusOK) diff --git a/etcdserver/etcdhttp/http_test.go b/etcdserver/etcdhttp/http_test.go index 46ebfe7b4..4c41dc448 100644 --- a/etcdserver/etcdhttp/http_test.go +++ b/etcdserver/etcdhttp/http_test.go @@ -572,12 +572,14 @@ func TestWriteEvent(t *testing.T) { type dummyWatcher struct { echan chan *store.Event + sidx uint64 } func (w *dummyWatcher) EventChan() chan *store.Event { return w.echan } -func (w *dummyWatcher) Remove() {} +func (w *dummyWatcher) StartIndex() uint64 { return w.sidx } +func (w *dummyWatcher) Remove() {} func TestV2MachinesEndpoint(t *testing.T) { tests := []struct { @@ -997,75 +999,6 @@ func TestServeKeysWatch(t *testing.T) { } } -func TestHandleWatch(t *testing.T) { - rw := httptest.NewRecorder() - wa := &dummyWatcher{ - echan: make(chan *store.Event, 1), - } - wa.echan <- &store.Event{ - Action: store.Get, - Node: &store.NodeExtern{}, - } - - handleWatch(context.Background(), rw, wa, false, dummyRaftTimer{}) - - wcode := http.StatusOK - wct := "application/json" - wri := "100" - wrt := "5" - wbody := mustMarshalEvent( - t, - &store.Event{ - Action: store.Get, - Node: &store.NodeExtern{}, - }, - ) - - if rw.Code != wcode { - t.Errorf("got code=%d, want %d", rw.Code, wcode) - } - h := rw.Header() - if ct := h.Get("Content-Type"); ct != wct { - t.Errorf("Content-Type=%q, want %q", ct, wct) - } - if ri := h.Get("X-Raft-Index"); ri != wri { - t.Errorf("X-Raft-Index=%q, want %q", ri, wri) - } - if rt := h.Get("X-Raft-Term"); rt != wrt { - t.Errorf("X-Raft-Term=%q, want %q", rt, wrt) - } - g := rw.Body.String() - if g != wbody { - t.Errorf("got body=%#v, want %#v", g, wbody) - } -} - -func TestHandleWatchNoEvent(t *testing.T) { - rw := httptest.NewRecorder() - wa := &dummyWatcher{ - echan: make(chan *store.Event, 1), - } - close(wa.echan) - - handleWatch(context.Background(), rw, wa, false, dummyRaftTimer{}) - - wcode := http.StatusOK - wct := "application/json" - wbody := "" - - if rw.Code != wcode { - t.Errorf("got code=%d, want %d", rw.Code, wcode) - } - h := rw.Header() - if ct := h.Get("Content-Type"); ct != wct { - t.Errorf("Content-Type=%q, want %q", ct, wct) - } - g := rw.Body.String() - if g != wbody { - t.Errorf("got body=%#v, want %#v", g, wbody) - } -} - type recordingCloseNotifier struct { *httptest.ResponseRecorder cn chan bool @@ -1075,56 +1008,114 @@ func (rcn *recordingCloseNotifier) CloseNotify() <-chan bool { return rcn.cn } -func TestHandleWatchCloseNotified(t *testing.T) { - rw := &recordingCloseNotifier{ - ResponseRecorder: httptest.NewRecorder(), - cn: make(chan bool, 1), +func TestHandleWatch(t *testing.T) { + defaultRwRr := func() (http.ResponseWriter, *httptest.ResponseRecorder) { + r := httptest.NewRecorder() + return r, r } - rw.cn <- true - wa := &dummyWatcher{} + noopEv := func(chan *store.Event) {} - handleWatch(context.Background(), rw, wa, false, dummyRaftTimer{}) + tests := []struct { + getCtx func() context.Context + getRwRr func() (http.ResponseWriter, *httptest.ResponseRecorder) + doToChan func(chan *store.Event) - wcode := http.StatusOK - wct := "application/json" - wbody := "" + wbody string + }{ + { + // Normal case: one event + context.Background, + defaultRwRr, + func(ch chan *store.Event) { + ch <- &store.Event{ + Action: store.Get, + Node: &store.NodeExtern{}, + } + }, - if rw.Code != wcode { - t.Errorf("got code=%d, want %d", rw.Code, wcode) + mustMarshalEvent( + t, + &store.Event{ + Action: store.Get, + Node: &store.NodeExtern{}, + }, + ), + }, + { + // Channel is closed, no event + context.Background, + defaultRwRr, + func(ch chan *store.Event) { + close(ch) + }, + + "", + }, + { + // Simulate a timed-out context + func() context.Context { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + return ctx + }, + defaultRwRr, + noopEv, + + "", + }, + { + // Close-notifying request + context.Background, + func() (http.ResponseWriter, *httptest.ResponseRecorder) { + rw := &recordingCloseNotifier{ + ResponseRecorder: httptest.NewRecorder(), + cn: make(chan bool, 1), + } + rw.cn <- true + return rw, rw.ResponseRecorder + }, + noopEv, + + "", + }, } - h := rw.Header() - if ct := h.Get("Content-Type"); ct != wct { - t.Errorf("Content-Type=%q, want %q", ct, wct) - } - g := rw.Body.String() - if g != wbody { - t.Errorf("got body=%#v, want %#v", g, wbody) - } -} -func TestHandleWatchTimeout(t *testing.T) { - rw := httptest.NewRecorder() - wa := &dummyWatcher{} - // Simulate a timed-out context - ctx, cancel := context.WithCancel(context.Background()) - cancel() + for i, tt := range tests { + rw, rr := tt.getRwRr() + wa := &dummyWatcher{ + echan: make(chan *store.Event, 1), + sidx: 10, + } + tt.doToChan(wa.echan) - handleWatch(ctx, rw, wa, false, dummyRaftTimer{}) + handleWatch(tt.getCtx(), rw, wa, false, dummyRaftTimer{}) - wcode := http.StatusOK - wct := "application/json" - wbody := "" + wcode := http.StatusOK + wct := "application/json" + wei := "10" + wri := "100" + wrt := "5" - if rw.Code != wcode { - t.Errorf("got code=%d, want %d", rw.Code, wcode) - } - h := rw.Header() - if ct := h.Get("Content-Type"); ct != wct { - t.Errorf("Content-Type=%q, want %q", ct, wct) - } - g := rw.Body.String() - if g != wbody { - t.Errorf("got body=%#v, want %#v", g, wbody) + if rr.Code != wcode { + t.Errorf("#%d: got code=%d, want %d", rr.Code, wcode) + } + h := rr.Header() + if ct := h.Get("Content-Type"); ct != wct { + t.Errorf("#%d: Content-Type=%q, want %q", i, ct, wct) + } + if ei := h.Get("X-Etcd-Index"); ei != wei { + t.Errorf("#%d: X-Etcd-Index=%q, want %q", i, ei, wei) + } + if ri := h.Get("X-Raft-Index"); ri != wri { + t.Errorf("#%d: X-Raft-Index=%q, want %q", i, ri, wri) + } + if rt := h.Get("X-Raft-Term"); rt != wrt { + t.Errorf("#%d: X-Raft-Term=%q, want %q", i, rt, wrt) + } + g := rr.Body.String() + if g != tt.wbody { + t.Errorf("#%d: got body=%#v, want %#v", i, g, tt.wbody) + } } } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index a5de36451..3d38a1156 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -1059,6 +1059,7 @@ func (s *storeRecorder) DeleteExpiredKeys(cutoff time.Time) { type stubWatcher struct{} func (w *stubWatcher) EventChan() chan *store.Event { return nil } +func (w *stubWatcher) StartIndex() uint64 { return 0 } func (w *stubWatcher) Remove() {} // errStoreRecorder returns an store error on Get, Watch request diff --git a/store/store_test.go b/store/store_test.go index 4ac1baba3..280c6c7de 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -586,10 +586,12 @@ func TestStoreCompareAndSwapPrevIndexFailsIfNotMatch(t *testing.T) { // Ensure that the store can watch for key creation. func TestStoreWatchCreate(t *testing.T) { s := newStore() - var eidx uint64 = 1 + var eidx uint64 = 0 w, _ := s.Watch("/foo", false, false, 0) c := w.EventChan() + assert.Equal(t, w.StartIndex(), eidx, "") s.Create("/foo", false, "bar", false, Permanent) + eidx = 1 e := nbselect(c) assert.Equal(t, e.EtcdIndex, eidx, "") assert.Equal(t, e.Action, "create", "") @@ -601,8 +603,10 @@ func TestStoreWatchCreate(t *testing.T) { // Ensure that the store can watch for recursive key creation. func TestStoreWatchRecursiveCreate(t *testing.T) { s := newStore() - var eidx uint64 = 1 + var eidx uint64 = 0 w, _ := s.Watch("/foo", true, false, 0) + assert.Equal(t, w.StartIndex(), eidx, "") + eidx = 1 s.Create("/foo/bar", false, "baz", false, Permanent) e := nbselect(w.EventChan()) assert.Equal(t, e.EtcdIndex, eidx, "") @@ -613,9 +617,11 @@ func TestStoreWatchRecursiveCreate(t *testing.T) { // Ensure that the store can watch for key updates. func TestStoreWatchUpdate(t *testing.T) { s := newStore() - var eidx uint64 = 2 + var eidx uint64 = 1 s.Create("/foo", false, "bar", false, Permanent) w, _ := s.Watch("/foo", false, false, 0) + assert.Equal(t, w.StartIndex(), eidx, "") + eidx = 2 s.Update("/foo", "baz", Permanent) e := nbselect(w.EventChan()) assert.Equal(t, e.EtcdIndex, eidx, "") @@ -626,9 +632,11 @@ func TestStoreWatchUpdate(t *testing.T) { // Ensure that the store can watch for recursive key updates. func TestStoreWatchRecursiveUpdate(t *testing.T) { s := newStore() - var eidx uint64 = 2 + var eidx uint64 = 1 s.Create("/foo/bar", false, "baz", false, Permanent) w, _ := s.Watch("/foo", true, false, 0) + assert.Equal(t, w.StartIndex(), eidx, "") + eidx = 2 s.Update("/foo/bar", "baz", Permanent) e := nbselect(w.EventChan()) assert.Equal(t, e.EtcdIndex, eidx, "") @@ -639,9 +647,11 @@ func TestStoreWatchRecursiveUpdate(t *testing.T) { // Ensure that the store can watch for key deletions. func TestStoreWatchDelete(t *testing.T) { s := newStore() - var eidx uint64 = 2 + var eidx uint64 = 1 s.Create("/foo", false, "bar", false, Permanent) w, _ := s.Watch("/foo", false, false, 0) + assert.Equal(t, w.StartIndex(), eidx, "") + eidx = 2 s.Delete("/foo", false, false) e := nbselect(w.EventChan()) assert.Equal(t, e.EtcdIndex, eidx, "") @@ -652,9 +662,11 @@ func TestStoreWatchDelete(t *testing.T) { // Ensure that the store can watch for recursive key deletions. func TestStoreWatchRecursiveDelete(t *testing.T) { s := newStore() - var eidx uint64 = 2 + var eidx uint64 = 1 s.Create("/foo/bar", false, "baz", false, Permanent) w, _ := s.Watch("/foo", true, false, 0) + assert.Equal(t, w.StartIndex(), eidx, "") + eidx = 2 s.Delete("/foo/bar", false, false) e := nbselect(w.EventChan()) assert.Equal(t, e.EtcdIndex, eidx, "") @@ -665,9 +677,11 @@ func TestStoreWatchRecursiveDelete(t *testing.T) { // Ensure that the store can watch for CAS updates. func TestStoreWatchCompareAndSwap(t *testing.T) { s := newStore() - var eidx uint64 = 2 + var eidx uint64 = 1 s.Create("/foo", false, "bar", false, Permanent) w, _ := s.Watch("/foo", false, false, 0) + assert.Equal(t, w.StartIndex(), eidx, "") + eidx = 2 s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent) e := nbselect(w.EventChan()) assert.Equal(t, e.EtcdIndex, eidx, "") @@ -678,9 +692,11 @@ func TestStoreWatchCompareAndSwap(t *testing.T) { // Ensure that the store can watch for recursive CAS updates. func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) { s := newStore() - var eidx uint64 = 2 + var eidx uint64 = 1 s.Create("/foo/bar", false, "baz", false, Permanent) w, _ := s.Watch("/foo", true, false, 0) + assert.Equal(t, w.StartIndex(), eidx, "") + eidx = 2 s.CompareAndSwap("/foo/bar", "baz", 0, "bat", Permanent) e := nbselect(w.EventChan()) assert.Equal(t, e.EtcdIndex, eidx, "") @@ -698,22 +714,25 @@ func TestStoreWatchExpire(t *testing.T) { }() go mockSyncService(s.DeleteExpiredKeys, stopChan) - var eidx uint64 = 3 + var eidx uint64 = 2 s.Create("/foo", false, "bar", false, time.Now().Add(500*time.Millisecond)) s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(500*time.Millisecond)) w, _ := s.Watch("/", true, false, 0) + assert.Equal(t, w.StartIndex(), eidx, "") c := w.EventChan() e := nbselect(c) assert.Nil(t, e, "") time.Sleep(600 * time.Millisecond) + eidx = 3 e = nbselect(c) assert.Equal(t, e.EtcdIndex, eidx, "") assert.Equal(t, e.Action, "expire", "") assert.Equal(t, e.Node.Key, "/foo", "") w, _ = s.Watch("/", true, false, 4) - e = nbselect(w.EventChan()) eidx = 4 + assert.Equal(t, w.StartIndex(), eidx, "") + e = nbselect(w.EventChan()) assert.Equal(t, e.EtcdIndex, eidx, "") assert.Equal(t, e.Action, "expire", "") assert.Equal(t, e.Node.Key, "/foofoo", "") diff --git a/store/watcher.go b/store/watcher.go index d6b2e6313..4a232a3b2 100644 --- a/store/watcher.go +++ b/store/watcher.go @@ -18,6 +18,7 @@ package store type Watcher interface { EventChan() chan *Event + StartIndex() uint64 // The EtcdIndex at which the Watcher was created Remove() } @@ -26,6 +27,7 @@ type watcher struct { stream bool recursive bool sinceIndex uint64 + startIndex uint64 hub *watcherHub removed bool remove func() @@ -35,6 +37,10 @@ func (w *watcher) EventChan() chan *Event { return w.eventChan } +func (w *watcher) StartIndex() uint64 { + return w.startIndex +} + // notify function notifies the watcher. If the watcher interests in the given path, // the function will return true. func (w *watcher) notify(e *Event, originalPath bool, deleted bool) bool { diff --git a/store/watcher_hub.go b/store/watcher_hub.go index eeeffb630..36612199d 100644 --- a/store/watcher_hub.go +++ b/store/watcher_hub.go @@ -51,6 +51,7 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeInde recursive: recursive, stream: stream, sinceIndex: index, + startIndex: storeIndex, hub: wh, }