Merge pull request #1187 from jonboulle/1187_watches_header

Watches do not return X-Etcd-Index header
This commit is contained in:
Jonathan Boulle 2014-10-03 10:13:25 -07:00
commit 1c4163faf8
6 changed files with 140 additions and 121 deletions

View File

@ -352,6 +352,7 @@ func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, s
}
w.Header().Set("Content-Type", "application/json")
w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex()))
w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
w.WriteHeader(http.StatusOK)

View File

@ -572,12 +572,14 @@ func TestWriteEvent(t *testing.T) {
type dummyWatcher struct {
echan chan *store.Event
sidx uint64
}
func (w *dummyWatcher) EventChan() chan *store.Event {
return w.echan
}
func (w *dummyWatcher) Remove() {}
func (w *dummyWatcher) StartIndex() uint64 { return w.sidx }
func (w *dummyWatcher) Remove() {}
func TestV2MachinesEndpoint(t *testing.T) {
tests := []struct {
@ -997,75 +999,6 @@ func TestServeKeysWatch(t *testing.T) {
}
}
func TestHandleWatch(t *testing.T) {
rw := httptest.NewRecorder()
wa := &dummyWatcher{
echan: make(chan *store.Event, 1),
}
wa.echan <- &store.Event{
Action: store.Get,
Node: &store.NodeExtern{},
}
handleWatch(context.Background(), rw, wa, false, dummyRaftTimer{})
wcode := http.StatusOK
wct := "application/json"
wri := "100"
wrt := "5"
wbody := mustMarshalEvent(
t,
&store.Event{
Action: store.Get,
Node: &store.NodeExtern{},
},
)
if rw.Code != wcode {
t.Errorf("got code=%d, want %d", rw.Code, wcode)
}
h := rw.Header()
if ct := h.Get("Content-Type"); ct != wct {
t.Errorf("Content-Type=%q, want %q", ct, wct)
}
if ri := h.Get("X-Raft-Index"); ri != wri {
t.Errorf("X-Raft-Index=%q, want %q", ri, wri)
}
if rt := h.Get("X-Raft-Term"); rt != wrt {
t.Errorf("X-Raft-Term=%q, want %q", rt, wrt)
}
g := rw.Body.String()
if g != wbody {
t.Errorf("got body=%#v, want %#v", g, wbody)
}
}
func TestHandleWatchNoEvent(t *testing.T) {
rw := httptest.NewRecorder()
wa := &dummyWatcher{
echan: make(chan *store.Event, 1),
}
close(wa.echan)
handleWatch(context.Background(), rw, wa, false, dummyRaftTimer{})
wcode := http.StatusOK
wct := "application/json"
wbody := ""
if rw.Code != wcode {
t.Errorf("got code=%d, want %d", rw.Code, wcode)
}
h := rw.Header()
if ct := h.Get("Content-Type"); ct != wct {
t.Errorf("Content-Type=%q, want %q", ct, wct)
}
g := rw.Body.String()
if g != wbody {
t.Errorf("got body=%#v, want %#v", g, wbody)
}
}
type recordingCloseNotifier struct {
*httptest.ResponseRecorder
cn chan bool
@ -1075,56 +1008,114 @@ func (rcn *recordingCloseNotifier) CloseNotify() <-chan bool {
return rcn.cn
}
func TestHandleWatchCloseNotified(t *testing.T) {
rw := &recordingCloseNotifier{
ResponseRecorder: httptest.NewRecorder(),
cn: make(chan bool, 1),
func TestHandleWatch(t *testing.T) {
defaultRwRr := func() (http.ResponseWriter, *httptest.ResponseRecorder) {
r := httptest.NewRecorder()
return r, r
}
rw.cn <- true
wa := &dummyWatcher{}
noopEv := func(chan *store.Event) {}
handleWatch(context.Background(), rw, wa, false, dummyRaftTimer{})
tests := []struct {
getCtx func() context.Context
getRwRr func() (http.ResponseWriter, *httptest.ResponseRecorder)
doToChan func(chan *store.Event)
wcode := http.StatusOK
wct := "application/json"
wbody := ""
wbody string
}{
{
// Normal case: one event
context.Background,
defaultRwRr,
func(ch chan *store.Event) {
ch <- &store.Event{
Action: store.Get,
Node: &store.NodeExtern{},
}
},
if rw.Code != wcode {
t.Errorf("got code=%d, want %d", rw.Code, wcode)
mustMarshalEvent(
t,
&store.Event{
Action: store.Get,
Node: &store.NodeExtern{},
},
),
},
{
// Channel is closed, no event
context.Background,
defaultRwRr,
func(ch chan *store.Event) {
close(ch)
},
"",
},
{
// Simulate a timed-out context
func() context.Context {
ctx, cancel := context.WithCancel(context.Background())
cancel()
return ctx
},
defaultRwRr,
noopEv,
"",
},
{
// Close-notifying request
context.Background,
func() (http.ResponseWriter, *httptest.ResponseRecorder) {
rw := &recordingCloseNotifier{
ResponseRecorder: httptest.NewRecorder(),
cn: make(chan bool, 1),
}
rw.cn <- true
return rw, rw.ResponseRecorder
},
noopEv,
"",
},
}
h := rw.Header()
if ct := h.Get("Content-Type"); ct != wct {
t.Errorf("Content-Type=%q, want %q", ct, wct)
}
g := rw.Body.String()
if g != wbody {
t.Errorf("got body=%#v, want %#v", g, wbody)
}
}
func TestHandleWatchTimeout(t *testing.T) {
rw := httptest.NewRecorder()
wa := &dummyWatcher{}
// Simulate a timed-out context
ctx, cancel := context.WithCancel(context.Background())
cancel()
for i, tt := range tests {
rw, rr := tt.getRwRr()
wa := &dummyWatcher{
echan: make(chan *store.Event, 1),
sidx: 10,
}
tt.doToChan(wa.echan)
handleWatch(ctx, rw, wa, false, dummyRaftTimer{})
handleWatch(tt.getCtx(), rw, wa, false, dummyRaftTimer{})
wcode := http.StatusOK
wct := "application/json"
wbody := ""
wcode := http.StatusOK
wct := "application/json"
wei := "10"
wri := "100"
wrt := "5"
if rw.Code != wcode {
t.Errorf("got code=%d, want %d", rw.Code, wcode)
}
h := rw.Header()
if ct := h.Get("Content-Type"); ct != wct {
t.Errorf("Content-Type=%q, want %q", ct, wct)
}
g := rw.Body.String()
if g != wbody {
t.Errorf("got body=%#v, want %#v", g, wbody)
if rr.Code != wcode {
t.Errorf("#%d: got code=%d, want %d", rr.Code, wcode)
}
h := rr.Header()
if ct := h.Get("Content-Type"); ct != wct {
t.Errorf("#%d: Content-Type=%q, want %q", i, ct, wct)
}
if ei := h.Get("X-Etcd-Index"); ei != wei {
t.Errorf("#%d: X-Etcd-Index=%q, want %q", i, ei, wei)
}
if ri := h.Get("X-Raft-Index"); ri != wri {
t.Errorf("#%d: X-Raft-Index=%q, want %q", i, ri, wri)
}
if rt := h.Get("X-Raft-Term"); rt != wrt {
t.Errorf("#%d: X-Raft-Term=%q, want %q", i, rt, wrt)
}
g := rr.Body.String()
if g != tt.wbody {
t.Errorf("#%d: got body=%#v, want %#v", i, g, tt.wbody)
}
}
}

View File

@ -1059,6 +1059,7 @@ func (s *storeRecorder) DeleteExpiredKeys(cutoff time.Time) {
type stubWatcher struct{}
func (w *stubWatcher) EventChan() chan *store.Event { return nil }
func (w *stubWatcher) StartIndex() uint64 { return 0 }
func (w *stubWatcher) Remove() {}
// errStoreRecorder returns an store error on Get, Watch request

View File

@ -586,10 +586,12 @@ func TestStoreCompareAndSwapPrevIndexFailsIfNotMatch(t *testing.T) {
// Ensure that the store can watch for key creation.
func TestStoreWatchCreate(t *testing.T) {
s := newStore()
var eidx uint64 = 1
var eidx uint64 = 0
w, _ := s.Watch("/foo", false, false, 0)
c := w.EventChan()
assert.Equal(t, w.StartIndex(), eidx, "")
s.Create("/foo", false, "bar", false, Permanent)
eidx = 1
e := nbselect(c)
assert.Equal(t, e.EtcdIndex, eidx, "")
assert.Equal(t, e.Action, "create", "")
@ -601,8 +603,10 @@ func TestStoreWatchCreate(t *testing.T) {
// Ensure that the store can watch for recursive key creation.
func TestStoreWatchRecursiveCreate(t *testing.T) {
s := newStore()
var eidx uint64 = 1
var eidx uint64 = 0
w, _ := s.Watch("/foo", true, false, 0)
assert.Equal(t, w.StartIndex(), eidx, "")
eidx = 1
s.Create("/foo/bar", false, "baz", false, Permanent)
e := nbselect(w.EventChan())
assert.Equal(t, e.EtcdIndex, eidx, "")
@ -613,9 +617,11 @@ func TestStoreWatchRecursiveCreate(t *testing.T) {
// Ensure that the store can watch for key updates.
func TestStoreWatchUpdate(t *testing.T) {
s := newStore()
var eidx uint64 = 2
var eidx uint64 = 1
s.Create("/foo", false, "bar", false, Permanent)
w, _ := s.Watch("/foo", false, false, 0)
assert.Equal(t, w.StartIndex(), eidx, "")
eidx = 2
s.Update("/foo", "baz", Permanent)
e := nbselect(w.EventChan())
assert.Equal(t, e.EtcdIndex, eidx, "")
@ -626,9 +632,11 @@ func TestStoreWatchUpdate(t *testing.T) {
// Ensure that the store can watch for recursive key updates.
func TestStoreWatchRecursiveUpdate(t *testing.T) {
s := newStore()
var eidx uint64 = 2
var eidx uint64 = 1
s.Create("/foo/bar", false, "baz", false, Permanent)
w, _ := s.Watch("/foo", true, false, 0)
assert.Equal(t, w.StartIndex(), eidx, "")
eidx = 2
s.Update("/foo/bar", "baz", Permanent)
e := nbselect(w.EventChan())
assert.Equal(t, e.EtcdIndex, eidx, "")
@ -639,9 +647,11 @@ func TestStoreWatchRecursiveUpdate(t *testing.T) {
// Ensure that the store can watch for key deletions.
func TestStoreWatchDelete(t *testing.T) {
s := newStore()
var eidx uint64 = 2
var eidx uint64 = 1
s.Create("/foo", false, "bar", false, Permanent)
w, _ := s.Watch("/foo", false, false, 0)
assert.Equal(t, w.StartIndex(), eidx, "")
eidx = 2
s.Delete("/foo", false, false)
e := nbselect(w.EventChan())
assert.Equal(t, e.EtcdIndex, eidx, "")
@ -652,9 +662,11 @@ func TestStoreWatchDelete(t *testing.T) {
// Ensure that the store can watch for recursive key deletions.
func TestStoreWatchRecursiveDelete(t *testing.T) {
s := newStore()
var eidx uint64 = 2
var eidx uint64 = 1
s.Create("/foo/bar", false, "baz", false, Permanent)
w, _ := s.Watch("/foo", true, false, 0)
assert.Equal(t, w.StartIndex(), eidx, "")
eidx = 2
s.Delete("/foo/bar", false, false)
e := nbselect(w.EventChan())
assert.Equal(t, e.EtcdIndex, eidx, "")
@ -665,9 +677,11 @@ func TestStoreWatchRecursiveDelete(t *testing.T) {
// Ensure that the store can watch for CAS updates.
func TestStoreWatchCompareAndSwap(t *testing.T) {
s := newStore()
var eidx uint64 = 2
var eidx uint64 = 1
s.Create("/foo", false, "bar", false, Permanent)
w, _ := s.Watch("/foo", false, false, 0)
assert.Equal(t, w.StartIndex(), eidx, "")
eidx = 2
s.CompareAndSwap("/foo", "bar", 0, "baz", Permanent)
e := nbselect(w.EventChan())
assert.Equal(t, e.EtcdIndex, eidx, "")
@ -678,9 +692,11 @@ func TestStoreWatchCompareAndSwap(t *testing.T) {
// Ensure that the store can watch for recursive CAS updates.
func TestStoreWatchRecursiveCompareAndSwap(t *testing.T) {
s := newStore()
var eidx uint64 = 2
var eidx uint64 = 1
s.Create("/foo/bar", false, "baz", false, Permanent)
w, _ := s.Watch("/foo", true, false, 0)
assert.Equal(t, w.StartIndex(), eidx, "")
eidx = 2
s.CompareAndSwap("/foo/bar", "baz", 0, "bat", Permanent)
e := nbselect(w.EventChan())
assert.Equal(t, e.EtcdIndex, eidx, "")
@ -698,22 +714,25 @@ func TestStoreWatchExpire(t *testing.T) {
}()
go mockSyncService(s.DeleteExpiredKeys, stopChan)
var eidx uint64 = 3
var eidx uint64 = 2
s.Create("/foo", false, "bar", false, time.Now().Add(500*time.Millisecond))
s.Create("/foofoo", false, "barbarbar", false, time.Now().Add(500*time.Millisecond))
w, _ := s.Watch("/", true, false, 0)
assert.Equal(t, w.StartIndex(), eidx, "")
c := w.EventChan()
e := nbselect(c)
assert.Nil(t, e, "")
time.Sleep(600 * time.Millisecond)
eidx = 3
e = nbselect(c)
assert.Equal(t, e.EtcdIndex, eidx, "")
assert.Equal(t, e.Action, "expire", "")
assert.Equal(t, e.Node.Key, "/foo", "")
w, _ = s.Watch("/", true, false, 4)
e = nbselect(w.EventChan())
eidx = 4
assert.Equal(t, w.StartIndex(), eidx, "")
e = nbselect(w.EventChan())
assert.Equal(t, e.EtcdIndex, eidx, "")
assert.Equal(t, e.Action, "expire", "")
assert.Equal(t, e.Node.Key, "/foofoo", "")

View File

@ -18,6 +18,7 @@ package store
type Watcher interface {
EventChan() chan *Event
StartIndex() uint64 // The EtcdIndex at which the Watcher was created
Remove()
}
@ -26,6 +27,7 @@ type watcher struct {
stream bool
recursive bool
sinceIndex uint64
startIndex uint64
hub *watcherHub
removed bool
remove func()
@ -35,6 +37,10 @@ func (w *watcher) EventChan() chan *Event {
return w.eventChan
}
func (w *watcher) StartIndex() uint64 {
return w.startIndex
}
// notify function notifies the watcher. If the watcher interests in the given path,
// the function will return true.
func (w *watcher) notify(e *Event, originalPath bool, deleted bool) bool {

View File

@ -51,6 +51,7 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeInde
recursive: recursive,
stream: stream,
sinceIndex: index,
startIndex: storeIndex,
hub: wh,
}