client: do not send previous node data (optional)

- Do not send back node data when specified
- remove node and prevNode when noDataOnSuccess is set
This commit is contained in:
Michael Fraenkel 2015-11-20 15:13:30 -05:00 committed by Gyu-Ho Lee
parent 9e9bbb829e
commit 82053f04b2
5 changed files with 216 additions and 61 deletions

View File

@ -191,6 +191,10 @@ type SetOptions struct {
// Dir specifies whether or not this Node should be created as a directory.
Dir bool
// NoDataOnSuccess specifies whether the response contains the current value of the Node.
// If set, the response will only contain the current value when the request fails.
NoDataOnSuccess bool
}
type GetOptions struct {
@ -335,6 +339,7 @@ func (k *httpKeysAPI) Set(ctx context.Context, key, val string, opts *SetOptions
act.TTL = opts.TTL
act.Refresh = opts.Refresh
act.Dir = opts.Dir
act.NoDataOnSuccess = opts.NoDataOnSuccess
}
doCtx := ctx
@ -523,15 +528,16 @@ func (w *waitAction) HTTPRequest(ep url.URL) *http.Request {
}
type setAction struct {
Prefix string
Key string
Value string
PrevValue string
PrevIndex uint64
PrevExist PrevExistType
TTL time.Duration
Refresh bool
Dir bool
Prefix string
Key string
Value string
PrevValue string
PrevIndex uint64
PrevExist PrevExistType
TTL time.Duration
Refresh bool
Dir bool
NoDataOnSuccess bool
}
func (a *setAction) HTTPRequest(ep url.URL) *http.Request {
@ -565,6 +571,9 @@ func (a *setAction) HTTPRequest(ep url.URL) *http.Request {
if a.Refresh {
form.Add("refresh", "true")
}
if a.NoDataOnSuccess {
params.Set("noDataOnSuccess", strconv.FormatBool(a.NoDataOnSuccess))
}
u.RawQuery = params.Encode()
body := strings.NewReader(form.Encode())

View File

@ -407,6 +407,15 @@ func TestSetAction(t *testing.T) {
wantURL: "http://example.com/foo?dir=true",
wantBody: "",
},
// DataOnFailure is set
{
act: setAction{
Key: "foo",
NoDataOnSuccess: true,
},
wantURL: "http://example.com/foo?noDataOnSuccess=true",
wantBody: "value=",
},
}
for i, tt := range tests {

View File

@ -153,7 +153,7 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer cancel()
clock := clockwork.NewRealClock()
startTime := clock.Now()
rr, err := parseKeyRequest(r, clock)
rr, noDataOnSuccess, err := parseKeyRequest(r, clock)
if err != nil {
writeKeyError(w, err)
return
@ -175,7 +175,7 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
switch {
case resp.Event != nil:
if err := writeKeyEvent(w, resp.Event, h.timer); err != nil {
if err := writeKeyEvent(w, resp.Event, noDataOnSuccess, h.timer); err != nil {
// Should never be reached
plog.Errorf("error writing event (%v)", err)
}
@ -449,19 +449,20 @@ func logHandleFunc(w http.ResponseWriter, r *http.Request) {
// parseKeyRequest converts a received http.Request on keysPrefix to
// a server Request, performing validation of supplied fields as appropriate.
// If any validation fails, an empty Request and non-nil error is returned.
func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Request, error) {
func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Request, bool, error) {
noDataOnSuccess := false
emptyReq := etcdserverpb.Request{}
err := r.ParseForm()
if err != nil {
return emptyReq, etcdErr.NewRequestError(
return emptyReq, false, etcdErr.NewRequestError(
etcdErr.EcodeInvalidForm,
err.Error(),
)
}
if !strings.HasPrefix(r.URL.Path, keysPrefix) {
return emptyReq, etcdErr.NewRequestError(
return emptyReq, false, etcdErr.NewRequestError(
etcdErr.EcodeInvalidForm,
"incorrect key prefix",
)
@ -470,13 +471,13 @@ func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Reque
var pIdx, wIdx uint64
if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil {
return emptyReq, etcdErr.NewRequestError(
return emptyReq, false, etcdErr.NewRequestError(
etcdErr.EcodeIndexNaN,
`invalid value for "prevIndex"`,
)
}
if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil {
return emptyReq, etcdErr.NewRequestError(
return emptyReq, false, etcdErr.NewRequestError(
etcdErr.EcodeIndexNaN,
`invalid value for "waitIndex"`,
)
@ -484,45 +485,45 @@ func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Reque
var rec, sort, wait, dir, quorum, stream bool
if rec, err = getBool(r.Form, "recursive"); err != nil {
return emptyReq, etcdErr.NewRequestError(
return emptyReq, false, etcdErr.NewRequestError(
etcdErr.EcodeInvalidField,
`invalid value for "recursive"`,
)
}
if sort, err = getBool(r.Form, "sorted"); err != nil {
return emptyReq, etcdErr.NewRequestError(
return emptyReq, false, etcdErr.NewRequestError(
etcdErr.EcodeInvalidField,
`invalid value for "sorted"`,
)
}
if wait, err = getBool(r.Form, "wait"); err != nil {
return emptyReq, etcdErr.NewRequestError(
return emptyReq, false, etcdErr.NewRequestError(
etcdErr.EcodeInvalidField,
`invalid value for "wait"`,
)
}
// TODO(jonboulle): define what parameters dir is/isn't compatible with?
if dir, err = getBool(r.Form, "dir"); err != nil {
return emptyReq, etcdErr.NewRequestError(
return emptyReq, false, etcdErr.NewRequestError(
etcdErr.EcodeInvalidField,
`invalid value for "dir"`,
)
}
if quorum, err = getBool(r.Form, "quorum"); err != nil {
return emptyReq, etcdErr.NewRequestError(
return emptyReq, false, etcdErr.NewRequestError(
etcdErr.EcodeInvalidField,
`invalid value for "quorum"`,
)
}
if stream, err = getBool(r.Form, "stream"); err != nil {
return emptyReq, etcdErr.NewRequestError(
return emptyReq, false, etcdErr.NewRequestError(
etcdErr.EcodeInvalidField,
`invalid value for "stream"`,
)
}
if wait && r.Method != "GET" {
return emptyReq, etcdErr.NewRequestError(
return emptyReq, false, etcdErr.NewRequestError(
etcdErr.EcodeInvalidField,
`"wait" can only be used with GET requests`,
)
@ -530,19 +531,26 @@ func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Reque
pV := r.FormValue("prevValue")
if _, ok := r.Form["prevValue"]; ok && pV == "" {
return emptyReq, etcdErr.NewRequestError(
return emptyReq, false, etcdErr.NewRequestError(
etcdErr.EcodePrevValueRequired,
`"prevValue" cannot be empty`,
)
}
if noDataOnSuccess, err = getBool(r.Form, "noDataOnSuccess"); err != nil {
return emptyReq, false, etcdErr.NewRequestError(
etcdErr.EcodeInvalidField,
`invalid value for "noDataOnSuccess"`,
)
}
// TTL is nullable, so leave it null if not specified
// or an empty string
var ttl *uint64
if len(r.FormValue("ttl")) > 0 {
i, err := getUint64(r.Form, "ttl")
if err != nil {
return emptyReq, etcdErr.NewRequestError(
return emptyReq, false, etcdErr.NewRequestError(
etcdErr.EcodeTTLNaN,
`invalid value for "ttl"`,
)
@ -555,7 +563,7 @@ func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Reque
if _, ok := r.Form["prevExist"]; ok {
bv, err := getBool(r.Form, "prevExist")
if err != nil {
return emptyReq, etcdErr.NewRequestError(
return emptyReq, false, etcdErr.NewRequestError(
etcdErr.EcodeInvalidField,
"invalid value for prevExist",
)
@ -621,13 +629,13 @@ func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Reque
rr.Expiration = clock.Now().Add(expr).UnixNano()
}
return rr, nil
return rr, noDataOnSuccess, nil
}
// writeKeyEvent trims the prefix of key path in a single Event under
// StoreKeysPrefix, serializes it and writes the resulting JSON to the given
// ResponseWriter, along with the appropriate headers.
func writeKeyEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTimer) error {
func writeKeyEvent(w http.ResponseWriter, ev *store.Event, noDataOnSuccess bool, rt etcdserver.RaftTimer) error {
if ev == nil {
return errors.New("cannot write empty Event!")
}
@ -641,6 +649,12 @@ func writeKeyEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTim
}
ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix)
if noDataOnSuccess &&
(ev.Action == store.Set || ev.Action == store.CompareAndSwap ||
ev.Action == store.Create || ev.Action == store.Update) {
ev.Node = nil
ev.PrevNode = nil
}
return json.NewEncoder(w).Encode(ev)
}

View File

@ -360,7 +360,7 @@ func TestBadParseRequest(t *testing.T) {
},
}
for i, tt := range tests {
got, err := parseKeyRequest(tt.in, clockwork.NewFakeClock())
got, _, err := parseKeyRequest(tt.in, clockwork.NewFakeClock())
if err == nil {
t.Errorf("#%d: unexpected nil error!", i)
continue
@ -384,8 +384,9 @@ func TestGoodParseRequest(t *testing.T) {
fc := clockwork.NewFakeClock()
fc.Advance(1111)
tests := []struct {
in *http.Request
w etcdserverpb.Request
in *http.Request
w etcdserverpb.Request
noData bool
}{
{
// good prefix, all other values default
@ -394,6 +395,7 @@ func TestGoodParseRequest(t *testing.T) {
Method: "GET",
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
},
false,
},
{
// value specified
@ -407,6 +409,7 @@ func TestGoodParseRequest(t *testing.T) {
Val: "some_value",
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
},
false,
},
{
// prevIndex specified
@ -420,6 +423,7 @@ func TestGoodParseRequest(t *testing.T) {
PrevIndex: 98765,
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
},
false,
},
{
// recursive specified
@ -433,6 +437,7 @@ func TestGoodParseRequest(t *testing.T) {
Recursive: true,
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
},
false,
},
{
// sorted specified
@ -446,6 +451,7 @@ func TestGoodParseRequest(t *testing.T) {
Sorted: true,
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
},
false,
},
{
// quorum specified
@ -459,6 +465,7 @@ func TestGoodParseRequest(t *testing.T) {
Quorum: true,
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
},
false,
},
{
// wait specified
@ -468,6 +475,7 @@ func TestGoodParseRequest(t *testing.T) {
Wait: true,
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
},
false,
},
{
// empty TTL specified
@ -477,6 +485,7 @@ func TestGoodParseRequest(t *testing.T) {
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
Expiration: 0,
},
false,
},
{
// non-empty TTL specified
@ -486,6 +495,7 @@ func TestGoodParseRequest(t *testing.T) {
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
Expiration: fc.Now().Add(5678 * time.Second).UnixNano(),
},
false,
},
{
// zero TTL specified
@ -495,6 +505,7 @@ func TestGoodParseRequest(t *testing.T) {
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
Expiration: fc.Now().UnixNano(),
},
false,
},
{
// dir specified
@ -504,6 +515,7 @@ func TestGoodParseRequest(t *testing.T) {
Dir: true,
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
},
false,
},
{
// dir specified negatively
@ -513,6 +525,7 @@ func TestGoodParseRequest(t *testing.T) {
Dir: false,
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
},
false,
},
{
// prevExist should be non-null if specified
@ -526,6 +539,7 @@ func TestGoodParseRequest(t *testing.T) {
PrevExist: boolp(true),
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
},
false,
},
{
// prevExist should be non-null if specified
@ -539,6 +553,7 @@ func TestGoodParseRequest(t *testing.T) {
PrevExist: boolp(false),
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
},
false,
},
// mix various fields
{
@ -558,6 +573,7 @@ func TestGoodParseRequest(t *testing.T) {
Val: "some value",
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
},
false,
},
// query parameters should be used if given
{
@ -571,6 +587,7 @@ func TestGoodParseRequest(t *testing.T) {
PrevValue: "woof",
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
},
false,
},
// but form values should take precedence over query parameters
{
@ -586,14 +603,33 @@ func TestGoodParseRequest(t *testing.T) {
PrevValue: "miaow",
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
},
false,
},
{
// noDataOnSuccess specified
mustNewForm(
t,
"foo",
url.Values{"noDataOnSuccess": []string{"true"}},
),
etcdserverpb.Request{
Method: "PUT",
Path: path.Join(etcdserver.StoreKeysPrefix, "/foo"),
},
true,
},
}
for i, tt := range tests {
got, err := parseKeyRequest(tt.in, fc)
got, noDataOnFailure, err := parseKeyRequest(tt.in, fc)
if err != nil {
t.Errorf("#%d: err = %v, want %v", i, err, nil)
}
if noDataOnFailure != tt.noData {
t.Errorf("#%d: noData=%t, want %t", i, noDataOnFailure, tt.noData)
}
if !reflect.DeepEqual(got, tt.w) {
t.Errorf("#%d: request=%#v, want %#v", i, got, tt.w)
}
@ -1112,7 +1148,7 @@ func TestServeMembersFail(t *testing.T) {
func TestWriteEvent(t *testing.T) {
// nil event should not panic
rec := httptest.NewRecorder()
writeKeyEvent(rec, nil, dummyRaftTimer{})
writeKeyEvent(rec, nil, false, dummyRaftTimer{})
h := rec.Header()
if len(h) > 0 {
t.Fatalf("unexpected non-empty headers: %#v", h)
@ -1123,8 +1159,9 @@ func TestWriteEvent(t *testing.T) {
}
tests := []struct {
ev *store.Event
idx string
ev *store.Event
noData bool
idx string
// TODO(jonboulle): check body as well as just status code
code int
err error
@ -1136,6 +1173,7 @@ func TestWriteEvent(t *testing.T) {
Node: &store.NodeExtern{},
PrevNode: &store.NodeExtern{},
},
false,
"0",
http.StatusOK,
nil,
@ -1147,6 +1185,7 @@ func TestWriteEvent(t *testing.T) {
Node: &store.NodeExtern{},
PrevNode: &store.NodeExtern{},
},
false,
"0",
http.StatusCreated,
nil,
@ -1155,7 +1194,7 @@ func TestWriteEvent(t *testing.T) {
for i, tt := range tests {
rw := httptest.NewRecorder()
writeKeyEvent(rw, tt.ev, dummyRaftTimer{})
writeKeyEvent(rw, tt.ev, tt.noData, dummyRaftTimer{})
if gct := rw.Header().Get("Content-Type"); gct != "application/json" {
t.Errorf("case %d: bad Content-Type: got %q, want application/json", i, gct)
}
@ -1550,45 +1589,76 @@ func TestServeKeysGood(t *testing.T) {
}
func TestServeKeysEvent(t *testing.T) {
req := mustNewRequest(t, "foo")
server := &resServer{
etcdserver.Response{
Event: &store.Event{
tests := []struct {
req *http.Request
rsp etcdserver.Response
wcode int
event *store.Event
}{
{
mustNewRequest(t, "foo"),
etcdserver.Response{
Event: &store.Event{
Action: store.Get,
Node: &store.NodeExtern{},
},
},
http.StatusOK,
&store.Event{
Action: store.Get,
Node: &store.NodeExtern{},
},
},
{
mustNewForm(
t,
"foo",
url.Values{"noDataOnSuccess": []string{"true"}},
),
etcdserver.Response{
Event: &store.Event{
Action: store.CompareAndSwap,
Node: &store.NodeExtern{},
},
},
http.StatusOK,
&store.Event{
Action: store.CompareAndSwap,
Node: nil,
},
},
}
server := &resServer{}
h := &keysHandler{
timeout: time.Hour,
server: server,
cluster: &fakeCluster{id: 1},
timer: &dummyRaftTimer{},
}
rw := httptest.NewRecorder()
h.ServeHTTP(rw, req)
for _, tt := range tests {
server.res = tt.rsp
rw := httptest.NewRecorder()
h.ServeHTTP(rw, tt.req)
wcode := http.StatusOK
wbody := mustMarshalEvent(
t,
&store.Event{
Action: store.Get,
Node: &store.NodeExtern{},
},
)
wbody := mustMarshalEvent(
t,
tt.event,
)
if rw.Code != wcode {
t.Errorf("got code=%d, want %d", rw.Code, wcode)
}
gcid := rw.Header().Get("X-Etcd-Cluster-ID")
wcid := h.cluster.ID().String()
if gcid != wcid {
t.Errorf("cid = %s, want %s", gcid, wcid)
}
g := rw.Body.String()
if g != wbody {
t.Errorf("got body=%#v, want %#v", g, wbody)
if rw.Code != tt.wcode {
t.Errorf("got code=%d, want %d", rw.Code, tt.wcode)
}
gcid := rw.Header().Get("X-Etcd-Cluster-ID")
wcid := h.cluster.ID().String()
if gcid != wcid {
t.Errorf("cid = %s, want %s", gcid, wcid)
}
g := rw.Body.String()
if g != wbody {
t.Errorf("got body=%#v, want %#v", g, wbody)
}
}
}

View File

@ -45,6 +45,9 @@ func TestV2Set(t *testing.T) {
tc := NewTestClient()
v := url.Values{}
v.Set("value", "bar")
vAndNoData := url.Values{}
vAndNoData.Set("value", "bar")
vAndNoData.Set("noDataOnSuccess", "true")
tests := []struct {
relativeURL string
@ -70,6 +73,12 @@ func TestV2Set(t *testing.T) {
http.StatusCreated,
`{"action":"set","node":{"key":"/fooempty","value":"","modifiedIndex":6,"createdIndex":6}}`,
},
{
"/v2/keys/foo/nodata",
vAndNoData,
http.StatusCreated,
`{"action":"set"}`,
},
}
for i, tt := range tests {
@ -183,6 +192,31 @@ func TestV2CreateUpdate(t *testing.T) {
"cause": "/nonexist",
},
},
// create with no data on success
{
"/v2/keys/create/nodata",
url.Values(map[string][]string{"value": {"XXX"}, "prevExist": {"false"}, "noDataOnSucces": {"true"}}),
http.StatusCreated,
map[string]interface{}{},
},
// update with no data on success
{
"/v2/keys/create/nodata",
url.Values(map[string][]string{"value": {"XXX"}, "prevExist": {"true"}, "noDataOnSucces": {"true"}}),
http.StatusOK,
map[string]interface{}{},
},
// created key failed with no data on success
{
"/v2/keys/create/foo",
url.Values(map[string][]string{"value": {"XXX"}, "prevExist": {"false"}, "noDataOnSucces": {"true"}}),
http.StatusPreconditionFailed,
map[string]interface{}{
"errorCode": float64(105),
"message": "Key already exists",
"cause": "/create/foo",
},
},
}
for i, tt := range tests {
@ -312,6 +346,25 @@ func TestV2CAS(t *testing.T) {
"cause": "[bad_value != ZZZ]",
},
},
{
"/v2/keys/cas/foo",
url.Values(map[string][]string{"value": {"YYY"}, "prevIndex": {"6"}, "noDataOnSuccess": {"true"}}),
http.StatusOK,
map[string]interface{}{
"action": "compareAndSwap",
},
},
{
"/v2/keys/cas/foo",
url.Values(map[string][]string{"value": {"YYY"}, "prevIndex": {"10"}, "noDataOnSuccess": {"true"}}),
http.StatusPreconditionFailed,
map[string]interface{}{
"errorCode": float64(101),
"message": "Compare failed",
"cause": "[10 != 7]",
"index": float64(7),
},
},
}
for i, tt := range tests {