diff --git a/etcdserver/etcdhttp/http.go b/etcdserver/etcdhttp/http.go index b7c9e23fa..f01586a4c 100644 --- a/etcdserver/etcdhttp/http.go +++ b/etcdserver/etcdhttp/http.go @@ -176,12 +176,12 @@ func (h Handler) serveKeys(ctx context.Context, w http.ResponseWriter, r *http.R return default: log.Println(err) - http.Error(w, "Internal Server Error", 500) + http.Error(w, "Internal Server Error", http.StatusInternalServerError) return } if err := encodeResponse(ctx, w, resp); err != nil { - http.Error(w, "Timeout while waiting for response", 504) + http.Error(w, "Timeout while waiting for response", http.StatusGatewayTimeout) return } } @@ -261,6 +261,8 @@ func parseUint64(s string) uint64 { return v } +// encodeResponse serializes the given etcdserver Response and writes the +// resulting JSON to the given ResponseWriter, utilizing the provided context func encodeResponse(ctx context.Context, w http.ResponseWriter, resp etcdserver.Response) (err error) { var ev *store.Event switch { @@ -288,6 +290,7 @@ func encodeResponse(ctx context.Context, w http.ResponseWriter, resp etcdserver. return nil } +// waitForEvent waits for a given watcher to return its associated event. It returns a non-nil error if the given Context times out or the given ResponseWriter triggers a CloseNotify. func waitForEvent(ctx context.Context, w http.ResponseWriter, wa store.Watcher) (*store.Event, error) { // TODO(bmizerany): support streaming? defer wa.Remove() diff --git a/etcdserver/etcdhttp/http_test.go b/etcdserver/etcdhttp/http_test.go index 799707e7a..a3b3fda47 100644 --- a/etcdserver/etcdhttp/http_test.go +++ b/etcdserver/etcdhttp/http_test.go @@ -6,6 +6,8 @@ import ( "net/http/httptest" "net/url" "reflect" + "strconv" + "sync" "testing" "time" @@ -71,3 +73,197 @@ func TestSet(t *testing.T) { } func stringp(s string) *string { return &s } + +// eventingWatcher immediately returns a simple event of the given action on its channel +type eventingWatcher struct { + action string +} + +func (w *eventingWatcher) EventChan() chan *store.Event { + ch := make(chan *store.Event) + go func() { + ch <- &store.Event{ + Action: w.action, + Node: &store.NodeExtern{}, + } + }() + return ch +} + +func (w *eventingWatcher) Remove() {} + +func TestEncodeResponse(t *testing.T) { + testCases := []struct { + ctx context.Context + resp etcdserver.Response + idx uint64 + code int + err error + }{ + // standard case, standard 200 response + { + context.Background(), + etcdserver.Response{ + Event: &store.Event{ + Action: store.Get, + Node: &store.NodeExtern{}, + PrevNode: &store.NodeExtern{}, + }, + Watcher: nil, + }, + 0, + http.StatusOK, + nil, + }, + // check new nodes return StatusCreated + { + context.Background(), + etcdserver.Response{ + Event: &store.Event{ + Action: store.Create, + Node: &store.NodeExtern{}, + PrevNode: &store.NodeExtern{}, + }, + Watcher: nil, + }, + 0, + http.StatusCreated, + nil, + }, + { + context.Background(), + etcdserver.Response{ + Watcher: &eventingWatcher{store.Create}, + }, + 0, + http.StatusCreated, + nil, + }, + } + + for i, tt := range testCases { + rw := httptest.NewRecorder() + err := encodeResponse(tt.ctx, rw, tt.resp) + if err != tt.err { + t.Errorf("case %d: unexpected err: got %v, want %v", i, err, tt.err) + continue + } + + if gct := rw.Header().Get("Content-Type"); gct != "application/json" { + t.Errorf("case %d: bad Content-Type: got %q, want application/json", i, gct) + } + + if gei := rw.Header().Get("X-Etcd-Index"); gei != strconv.Itoa(int(tt.idx)) { + t.Errorf("case %d: bad X-Etcd-Index header: got %s, want %d", i, gei, tt.idx) + } + + if rw.Code != tt.code { + t.Errorf("case %d: bad response code: got %d, want %v", i, rw.Code, tt.code) + } + + } + +} + +type dummyWatcher struct { + echan chan *store.Event +} + +func (w *dummyWatcher) EventChan() chan *store.Event { + return w.echan +} +func (w *dummyWatcher) Remove() {} + +type dummyResponseWriter struct { + cnchan chan bool + http.ResponseWriter +} + +func (rw *dummyResponseWriter) CloseNotify() <-chan bool { + return rw.cnchan +} + +func TestWaitForEventChan(t *testing.T) { + ctx := context.Background() + ec := make(chan *store.Event) + dw := &dummyWatcher{ + echan: ec, + } + w := httptest.NewRecorder() + var wg sync.WaitGroup + var ev *store.Event + var err error + wg.Add(1) + go func() { + ev, err = waitForEvent(ctx, w, dw) + wg.Done() + }() + ec <- &store.Event{ + Action: store.Get, + Node: &store.NodeExtern{ + Key: "/foo/bar", + ModifiedIndex: 12345, + }, + } + wg.Wait() + want := &store.Event{ + Action: store.Get, + Node: &store.NodeExtern{ + Key: "/foo/bar", + ModifiedIndex: 12345, + }, + } + if !reflect.DeepEqual(ev, want) { + t.Fatalf("bad event: got %#v, want %#v", ev, want) + } + if err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestWaitForEventCloseNotify(t *testing.T) { + ctx := context.Background() + dw := &dummyWatcher{} + cnchan := make(chan bool) + w := &dummyResponseWriter{ + cnchan: cnchan, + } + var wg sync.WaitGroup + var ev *store.Event + var err error + wg.Add(1) + go func() { + ev, err = waitForEvent(ctx, w, dw) + wg.Done() + }() + close(cnchan) + wg.Wait() + if ev != nil { + t.Fatalf("non-nil Event returned with CloseNotifier: %v", ev) + } + if err == nil { + t.Fatalf("nil err returned with CloseNotifier!") + } +} + +func TestWaitForEventCancelledContext(t *testing.T) { + cctx, cancel := context.WithCancel(context.Background()) + dw := &dummyWatcher{} + w := httptest.NewRecorder() + var wg sync.WaitGroup + var ev *store.Event + var err error + wg.Add(1) + go func() { + ev, err = waitForEvent(cctx, w, dw) + wg.Done() + }() + cancel() + wg.Wait() + if ev != nil { + t.Fatalf("non-nil Event returned with cancelled context: %v", ev) + } + if err == nil { + t.Fatalf("nil err returned with cancelled context!") + } +}