Merge pull request #5880 from xiang90/put_prev

add options to return prev_kv
This commit is contained in:
Xiang Li 2016-07-05 21:03:56 -07:00 committed by GitHub
commit 234c30c061
11 changed files with 493 additions and 366 deletions

View File

@ -671,10 +671,10 @@
"format": "byte", "format": "byte",
"description": "key is the first key to delete in the range." "description": "key is the first key to delete in the range."
}, },
"preserveKVs": { "prev_kv": {
"type": "boolean", "type": "boolean",
"format": "boolean", "format": "boolean",
"description": "If preserveKVs is set, the deleted KVs will be preserved for delete events\nThe preserved KVs will be returned as response.\nIt requires read permission to read the deleted KVs." "description": "If prev_kv is set, etcd gets the previous key-value pairs before deleting it.\nThe previous key-value pairs will be returned in the delte response."
}, },
"range_end": { "range_end": {
"type": "string", "type": "string",
@ -686,13 +686,6 @@
"etcdserverpbDeleteRangeResponse": { "etcdserverpbDeleteRangeResponse": {
"type": "object", "type": "object",
"properties": { "properties": {
"KVs": {
"type": "array",
"items": {
"$ref": "#/definitions/mvccpbKeyValue"
},
"description": "if preserveKVs is set in the request, the deleted KVs will be returned."
},
"deleted": { "deleted": {
"type": "string", "type": "string",
"format": "int64", "format": "int64",
@ -700,6 +693,13 @@
}, },
"header": { "header": {
"$ref": "#/definitions/etcdserverpbResponseHeader" "$ref": "#/definitions/etcdserverpbResponseHeader"
},
"prev_kvs": {
"type": "array",
"items": {
"$ref": "#/definitions/mvccpbKeyValue"
},
"description": "if prev_kv is set in the request, the previous key-value pairs will be returned."
} }
} }
}, },
@ -933,6 +933,11 @@
"format": "int64", "format": "int64",
"description": "lease is the lease ID to associate with the key in the key-value store. A lease\nvalue of 0 indicates no lease." "description": "lease is the lease ID to associate with the key in the key-value store. A lease\nvalue of 0 indicates no lease."
}, },
"prev_kv": {
"type": "boolean",
"format": "boolean",
"description": "If prev_kv is set, etcd gets the previous key-value pair before changing it.\nThe previous key-value pair will be returned in the put response."
},
"value": { "value": {
"type": "string", "type": "string",
"format": "byte", "format": "byte",
@ -945,6 +950,10 @@
"properties": { "properties": {
"header": { "header": {
"$ref": "#/definitions/etcdserverpbResponseHeader" "$ref": "#/definitions/etcdserverpbResponseHeader"
},
"prev_kv": {
"$ref": "#/definitions/mvccpbKeyValue",
"description": "if prev_kv is set in the request, the previous key-value pair will be returned."
} }
} }
}, },

View File

@ -156,14 +156,14 @@ func (kv *kv) do(ctx context.Context, op Op) (OpResponse, error) {
} }
case tPut: case tPut:
var resp *pb.PutResponse var resp *pb.PutResponse
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)} r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV}
resp, err = kv.remote.Put(ctx, r) resp, err = kv.remote.Put(ctx, r)
if err == nil { if err == nil {
return OpResponse{put: (*PutResponse)(resp)}, nil return OpResponse{put: (*PutResponse)(resp)}, nil
} }
case tDeleteRange: case tDeleteRange:
var resp *pb.DeleteRangeResponse var resp *pb.DeleteRangeResponse
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PreserveKVs: op.preserveKVs} r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
resp, err = kv.remote.DeleteRange(ctx, r) resp, err = kv.remote.DeleteRange(ctx, r)
if err == nil { if err == nil {
return OpResponse{del: (*DeleteResponse)(resp)}, nil return OpResponse{del: (*DeleteResponse)(resp)}, nil

View File

@ -45,11 +45,9 @@ type Op struct {
// for range, watch // for range, watch
rev int64 rev int64
// for watch, put, delete
prevKV bool prevKV bool
// for delete
preserveKVs bool
// progressNotify is for progress updates. // progressNotify is for progress updates.
progressNotify bool progressNotify bool
@ -76,10 +74,10 @@ func (op Op) toRequestOp() *pb.RequestOp {
} }
return &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: r}} return &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: r}}
case tPut: case tPut:
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID)} r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV}
return &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: r}} return &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: r}}
case tDeleteRange: case tDeleteRange:
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PreserveKVs: op.preserveKVs} r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
return &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: r}} return &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: r}}
default: default:
@ -133,8 +131,6 @@ func OpPut(key, val string, opts ...OpOption) Op {
panic("unexpected serializable in put") panic("unexpected serializable in put")
case ret.countOnly: case ret.countOnly:
panic("unexpected countOnly in put") panic("unexpected countOnly in put")
case ret.preserveKVs:
panic("unexpected preserveKVs in put")
} }
return ret return ret
} }
@ -153,8 +149,6 @@ func opWatch(key string, opts ...OpOption) Op {
panic("unexpected serializable in watch") panic("unexpected serializable in watch")
case ret.countOnly: case ret.countOnly:
panic("unexpected countOnly in watch") panic("unexpected countOnly in watch")
case ret.preserveKVs:
panic("unexpected preserveKVs in watch")
} }
return ret return ret
} }
@ -266,11 +260,6 @@ func withTop(target SortTarget, order SortOrder) []OpOption {
return []OpOption{WithPrefix(), WithSort(target, order), WithLimit(1)} return []OpOption{WithPrefix(), WithSort(target, order), WithLimit(1)}
} }
// WithPreserveKVs preserves the deleted KVs for attaching in responses.
func WithPreserveKVs() OpOption {
return func(op *Op) { op.preserveKVs = true }
}
// WithProgressNotify makes watch server send periodic progress updates. // WithProgressNotify makes watch server send periodic progress updates.
// Progress updates have zero events in WatchResponse. // Progress updates have zero events in WatchResponse.
func WithProgressNotify() OpOption { func WithProgressNotify() OpOption {

View File

@ -23,7 +23,7 @@ import (
var ( var (
delPrefix bool delPrefix bool
delKVs bool delPrevKV bool
) )
// NewDelCommand returns the cobra command for "del". // NewDelCommand returns the cobra command for "del".
@ -35,7 +35,7 @@ func NewDelCommand() *cobra.Command {
} }
cmd.Flags().BoolVar(&delPrefix, "prefix", false, "delete keys with matching prefix") cmd.Flags().BoolVar(&delPrefix, "prefix", false, "delete keys with matching prefix")
cmd.Flags().BoolVar(&delKVs, "preserve-kvs", false, "preserve and return deleted key-value pairs") cmd.Flags().BoolVar(&delPrevKV, "prev-kv", false, "return deleted key-value pairs")
return cmd return cmd
} }
@ -68,8 +68,8 @@ func getDelOp(cmd *cobra.Command, args []string) (string, []clientv3.OpOption) {
if delPrefix { if delPrefix {
opts = append(opts, clientv3.WithPrefix()) opts = append(opts, clientv3.WithPrefix())
} }
if delKVs { if delPrevKV {
opts = append(opts, clientv3.WithPreserveKVs()) opts = append(opts, clientv3.WithPrevKV())
} }
return key, opts return key, opts

View File

@ -108,7 +108,7 @@ type simplePrinter struct {
func (s *simplePrinter) Del(resp v3.DeleteResponse) { func (s *simplePrinter) Del(resp v3.DeleteResponse) {
fmt.Println(resp.Deleted) fmt.Println(resp.Deleted)
for _, kv := range resp.KVs { for _, kv := range resp.PrevKvs {
printKV(s.isHex, kv) printKV(s.isHex, kv)
} }
} }
@ -119,7 +119,12 @@ func (s *simplePrinter) Get(resp v3.GetResponse) {
} }
} }
func (s *simplePrinter) Put(r v3.PutResponse) { fmt.Println("OK") } func (s *simplePrinter) Put(r v3.PutResponse) {
fmt.Println("OK")
if r.PrevKv != nil {
printKV(s.isHex, r.PrevKv)
}
}
func (s *simplePrinter) Txn(resp v3.TxnResponse) { func (s *simplePrinter) Txn(resp v3.TxnResponse) {
if resp.Succeeded { if resp.Succeeded {

View File

@ -25,6 +25,7 @@ import (
var ( var (
leaseStr string leaseStr string
putPrevKV bool
) )
// NewPutCommand returns the cobra command for "put". // NewPutCommand returns the cobra command for "put".
@ -49,6 +50,7 @@ will store the content of the file to <key>.
Run: putCommandFunc, Run: putCommandFunc,
} }
cmd.Flags().StringVar(&leaseStr, "lease", "0", "lease ID (in hexadecimal) to attach to the key") cmd.Flags().StringVar(&leaseStr, "lease", "0", "lease ID (in hexadecimal) to attach to the key")
cmd.Flags().BoolVar(&putPrevKV, "prev-kv", false, "return changed key-value pairs")
return cmd return cmd
} }
@ -85,6 +87,9 @@ func getPutOp(cmd *cobra.Command, args []string) (string, string, []clientv3.OpO
if id != 0 { if id != 0 {
opts = append(opts, clientv3.WithLease(clientv3.LeaseID(id))) opts = append(opts, clientv3.WithLease(clientv3.LeaseID(id)))
} }
if putPrevKV {
opts = append(opts, clientv3.WithPrevKV())
}
return key, value, opts return key, value, opts
} }

View File

@ -159,6 +159,22 @@ func (a *applierV3backend) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse,
rev int64 rev int64
err error err error
) )
var rr *mvcc.RangeResult
if p.PrevKv {
if txnID != noTxn {
rr, err = a.s.KV().TxnRange(txnID, p.Key, nil, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
} else {
rr, err = a.s.KV().Range(p.Key, nil, mvcc.RangeOptions{})
if err != nil {
return nil, err
}
}
}
if txnID != noTxn { if txnID != noTxn {
rev, err = a.s.KV().TxnPut(txnID, p.Key, p.Value, lease.LeaseID(p.Lease)) rev, err = a.s.KV().TxnPut(txnID, p.Key, p.Value, lease.LeaseID(p.Lease))
if err != nil { if err != nil {
@ -174,6 +190,9 @@ func (a *applierV3backend) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse,
rev = a.s.KV().Put(p.Key, p.Value, leaseID) rev = a.s.KV().Put(p.Key, p.Value, leaseID)
} }
resp.Header.Revision = rev resp.Header.Revision = rev
if rr != nil && len(rr.KVs) != 0 {
resp.PrevKv = &rr.KVs[0]
}
return resp, nil return resp, nil
} }
@ -192,7 +211,7 @@ func (a *applierV3backend) DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (
} }
var rr *mvcc.RangeResult var rr *mvcc.RangeResult
if dr.PreserveKVs { if dr.PrevKv {
if txnID != noTxn { if txnID != noTxn {
rr, err = a.s.KV().TxnRange(txnID, dr.Key, dr.RangeEnd, mvcc.RangeOptions{}) rr, err = a.s.KV().TxnRange(txnID, dr.Key, dr.RangeEnd, mvcc.RangeOptions{})
if err != nil { if err != nil {
@ -218,7 +237,7 @@ func (a *applierV3backend) DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (
resp.Deleted = n resp.Deleted = n
if rr != nil { if rr != nil {
for i := range rr.KVs { for i := range rr.KVs {
resp.KVs = append(resp.KVs, &rr.KVs[i]) resp.PrevKvs = append(resp.PrevKvs, &rr.KVs[i])
} }
} }
resp.Header.Revision = rev resp.Header.Revision = rev

View File

@ -56,6 +56,9 @@ func (aa *authApplierV3) Put(txnID int64, r *pb.PutRequest) (*pb.PutResponse, er
if !aa.as.IsPutPermitted(aa.user, r.Key) { if !aa.as.IsPutPermitted(aa.user, r.Key) {
return nil, auth.ErrPermissionDenied return nil, auth.ErrPermissionDenied
} }
if r.PrevKv && !aa.as.IsRangePermitted(aa.user, r.Key, nil) {
return nil, auth.ErrPermissionDenied
}
return aa.applierV3.Put(txnID, r) return aa.applierV3.Put(txnID, r)
} }
@ -70,7 +73,7 @@ func (aa *authApplierV3) DeleteRange(txnID int64, r *pb.DeleteRangeRequest) (*pb
if !aa.as.IsDeleteRangePermitted(aa.user, r.Key, r.RangeEnd) { if !aa.as.IsDeleteRangePermitted(aa.user, r.Key, r.RangeEnd) {
return nil, auth.ErrPermissionDenied return nil, auth.ErrPermissionDenied
} }
if r.PreserveKVs && !aa.as.IsRangePermitted(aa.user, r.Key, r.RangeEnd) { if r.PrevKv && !aa.as.IsRangePermitted(aa.user, r.Key, r.RangeEnd) {
return nil, auth.ErrPermissionDenied return nil, auth.ErrPermissionDenied
} }
@ -103,7 +106,7 @@ func (aa *authApplierV3) checkTxnReqsPermission(reqs []*pb.RequestOp) bool {
continue continue
} }
if tv.RequestDeleteRange.PreserveKVs && !aa.as.IsRangePermitted(aa.user, tv.RequestDeleteRange.Key, tv.RequestDeleteRange.RangeEnd) { if tv.RequestDeleteRange.PrevKv && !aa.as.IsRangePermitted(aa.user, tv.RequestDeleteRange.Key, tv.RequestDeleteRange.RangeEnd) {
return false return false
} }

File diff suppressed because it is too large Load Diff

View File

@ -391,10 +391,16 @@ message PutRequest {
// lease is the lease ID to associate with the key in the key-value store. A lease // lease is the lease ID to associate with the key in the key-value store. A lease
// value of 0 indicates no lease. // value of 0 indicates no lease.
int64 lease = 3; int64 lease = 3;
// If prev_kv is set, etcd gets the previous key-value pair before changing it.
// The previous key-value pair will be returned in the put response.
bool prev_kv = 4;
} }
message PutResponse { message PutResponse {
ResponseHeader header = 1; ResponseHeader header = 1;
// if prev_kv is set in the request, the previous key-value pair will be returned.
mvccpb.KeyValue prev_kv = 2;
} }
message DeleteRangeRequest { message DeleteRangeRequest {
@ -405,18 +411,17 @@ message DeleteRangeRequest {
// If range_end is '\0', the range is all keys greater than or equal to the key argument. // If range_end is '\0', the range is all keys greater than or equal to the key argument.
bytes range_end = 2; bytes range_end = 2;
// If preserveKVs is set, the deleted KVs will be preserved for delete events // If prev_kv is set, etcd gets the previous key-value pairs before deleting it.
// The preserved KVs will be returned as response. // The previous key-value pairs will be returned in the delte response.
// It requires read permission to read the deleted KVs. bool prev_kv = 3;
bool preserveKVs = 3;
} }
message DeleteRangeResponse { message DeleteRangeResponse {
ResponseHeader header = 1; ResponseHeader header = 1;
// deleted is the number of keys deleted by the delete range request. // deleted is the number of keys deleted by the delete range request.
int64 deleted = 2; int64 deleted = 2;
// if preserveKVs is set in the request, the deleted KVs will be returned. // if prev_kv is set in the request, the previous key-value pairs will be returned.
repeated mvccpb.KeyValue KVs = 3; repeated mvccpb.KeyValue prev_kvs = 3;
} }
message RequestOp { message RequestOp {

View File

@ -379,7 +379,7 @@ func TestV3DeleteRange(t *testing.T) {
keySet []string keySet []string
begin string begin string
end string end string
preserveKVs bool prevKV bool
wantSet [][]byte wantSet [][]byte
deleted int64 deleted int64
@ -444,7 +444,7 @@ func TestV3DeleteRange(t *testing.T) {
dreq := &pb.DeleteRangeRequest{ dreq := &pb.DeleteRangeRequest{
Key: []byte(tt.begin), Key: []byte(tt.begin),
RangeEnd: []byte(tt.end), RangeEnd: []byte(tt.end),
PreserveKVs: tt.preserveKVs, PrevKv: tt.prevKV,
} }
dresp, err := kvc.DeleteRange(context.TODO(), dreq) dresp, err := kvc.DeleteRange(context.TODO(), dreq)
if err != nil { if err != nil {
@ -453,9 +453,9 @@ func TestV3DeleteRange(t *testing.T) {
if tt.deleted != dresp.Deleted { if tt.deleted != dresp.Deleted {
t.Errorf("expected %d on test %v, got %d", tt.deleted, i, dresp.Deleted) t.Errorf("expected %d on test %v, got %d", tt.deleted, i, dresp.Deleted)
} }
if tt.preserveKVs { if tt.prevKV {
if len(dresp.KVs) != int(dresp.Deleted) { if len(dresp.PrevKvs) != int(dresp.Deleted) {
t.Errorf("preserve %d keys, want %d", len(dresp.KVs), dresp.Deleted) t.Errorf("preserve %d keys, want %d", len(dresp.PrevKvs), dresp.Deleted)
} }
} }