mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #16079 from serathius/robustness-range-request
Robustness range request
This commit is contained in:
commit
7d27e33a12
@ -30,17 +30,25 @@ func describeEtcdNonDeterministicResponse(request EtcdRequest, response EtcdNonD
|
|||||||
}
|
}
|
||||||
|
|
||||||
func describeEtcdResponse(request EtcdRequest, response EtcdResponse) string {
|
func describeEtcdResponse(request EtcdRequest, response EtcdResponse) string {
|
||||||
if request.Type == Txn {
|
switch request.Type {
|
||||||
|
case Range:
|
||||||
|
return fmt.Sprintf("%s, rev: %d", describeRangeResponse(request.Range.RangeOptions, *response.Range), response.Revision)
|
||||||
|
case Txn:
|
||||||
return fmt.Sprintf("%s, rev: %d", describeTxnResponse(request.Txn, response.Txn), response.Revision)
|
return fmt.Sprintf("%s, rev: %d", describeTxnResponse(request.Txn, response.Txn), response.Revision)
|
||||||
|
case LeaseGrant, LeaseRevoke, Defragment:
|
||||||
|
if response.Revision == 0 {
|
||||||
|
return "ok"
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("ok, rev: %d", response.Revision)
|
||||||
|
default:
|
||||||
|
return fmt.Sprintf("<! unknown request type: %q !>", request.Type)
|
||||||
}
|
}
|
||||||
if response.Revision == 0 {
|
|
||||||
return "ok"
|
|
||||||
}
|
|
||||||
return fmt.Sprintf("ok, rev: %d", response.Revision)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func describeEtcdRequest(request EtcdRequest) string {
|
func describeEtcdRequest(request EtcdRequest) string {
|
||||||
switch request.Type {
|
switch request.Type {
|
||||||
|
case Range:
|
||||||
|
return describeRangeRequest(request.Range.Key, request.Range.RangeOptions)
|
||||||
case Txn:
|
case Txn:
|
||||||
onSuccess := describeEtcdOperations(request.Txn.OperationsOnSuccess)
|
onSuccess := describeEtcdOperations(request.Txn.OperationsOnSuccess)
|
||||||
if len(request.Txn.Conditions) != 0 {
|
if len(request.Txn.Conditions) != 0 {
|
||||||
@ -100,51 +108,64 @@ func describeTxnResponse(request *TxnRequest, response *TxnResponse) string {
|
|||||||
|
|
||||||
func describeEtcdOperation(op EtcdOperation) string {
|
func describeEtcdOperation(op EtcdOperation) string {
|
||||||
switch op.Type {
|
switch op.Type {
|
||||||
case Range:
|
case RangeOperation:
|
||||||
if op.WithPrefix {
|
return describeRangeRequest(op.Key, op.RangeOptions)
|
||||||
if op.Limit != 0 {
|
case PutOperation:
|
||||||
return fmt.Sprintf("range(%q, limit=%d)", op.Key, op.Limit)
|
|
||||||
}
|
|
||||||
return fmt.Sprintf("range(%q)", op.Key)
|
|
||||||
}
|
|
||||||
return fmt.Sprintf("get(%q)", op.Key)
|
|
||||||
case Put:
|
|
||||||
if op.LeaseID != 0 {
|
if op.LeaseID != 0 {
|
||||||
return fmt.Sprintf("put(%q, %s, %d)", op.Key, describeValueOrHash(op.Value), op.LeaseID)
|
return fmt.Sprintf("put(%q, %s, %d)", op.Key, describeValueOrHash(op.Value), op.LeaseID)
|
||||||
}
|
}
|
||||||
return fmt.Sprintf("put(%q, %s)", op.Key, describeValueOrHash(op.Value))
|
return fmt.Sprintf("put(%q, %s)", op.Key, describeValueOrHash(op.Value))
|
||||||
case Delete:
|
case DeleteOperation:
|
||||||
return fmt.Sprintf("delete(%q)", op.Key)
|
return fmt.Sprintf("delete(%q)", op.Key)
|
||||||
default:
|
default:
|
||||||
return fmt.Sprintf("<! unknown op: %q !>", op.Type)
|
return fmt.Sprintf("<! unknown op: %q !>", op.Type)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func describeRangeRequest(key string, opts RangeOptions) string {
|
||||||
|
kwargs := []string{}
|
||||||
|
if opts.Limit != 0 {
|
||||||
|
kwargs = append(kwargs, fmt.Sprintf("limit=%d", opts.Limit))
|
||||||
|
}
|
||||||
|
command := "get"
|
||||||
|
if opts.WithPrefix {
|
||||||
|
command = "range"
|
||||||
|
}
|
||||||
|
if len(kwargs) == 0 {
|
||||||
|
return fmt.Sprintf("%s(%q)", command, key)
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("%s(%q, %s)", command, key, strings.Join(kwargs, ", "))
|
||||||
|
}
|
||||||
|
|
||||||
func describeEtcdOperationResponse(req EtcdOperation, resp EtcdOperationResult) string {
|
func describeEtcdOperationResponse(req EtcdOperation, resp EtcdOperationResult) string {
|
||||||
switch req.Type {
|
switch req.Type {
|
||||||
case Range:
|
case RangeOperation:
|
||||||
if req.WithPrefix {
|
return describeRangeResponse(req.RangeOptions, resp.RangeResponse)
|
||||||
kvs := make([]string, len(resp.KVs))
|
case PutOperation:
|
||||||
for i, kv := range resp.KVs {
|
|
||||||
kvs[i] = describeValueOrHash(kv.Value)
|
|
||||||
}
|
|
||||||
return fmt.Sprintf("[%s], count: %d", strings.Join(kvs, ","), resp.Count)
|
|
||||||
} else {
|
|
||||||
if len(resp.KVs) == 0 {
|
|
||||||
return "nil"
|
|
||||||
} else {
|
|
||||||
return describeValueOrHash(resp.KVs[0].Value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case Put:
|
|
||||||
return fmt.Sprintf("ok")
|
return fmt.Sprintf("ok")
|
||||||
case Delete:
|
case DeleteOperation:
|
||||||
return fmt.Sprintf("deleted: %d", resp.Deleted)
|
return fmt.Sprintf("deleted: %d", resp.Deleted)
|
||||||
default:
|
default:
|
||||||
return fmt.Sprintf("<! unknown op: %q !>", req.Type)
|
return fmt.Sprintf("<! unknown op: %q !>", req.Type)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func describeRangeResponse(opts RangeOptions, response RangeResponse) string {
|
||||||
|
if opts.WithPrefix {
|
||||||
|
kvs := make([]string, len(response.KVs))
|
||||||
|
for i, kv := range response.KVs {
|
||||||
|
kvs[i] = describeValueOrHash(kv.Value)
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("[%s], count: %d", strings.Join(kvs, ","), response.Count)
|
||||||
|
} else {
|
||||||
|
if len(response.KVs) == 0 {
|
||||||
|
return "nil"
|
||||||
|
} else {
|
||||||
|
return describeValueOrHash(response.KVs[0].Value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func describeValueOrHash(value ValueOrHash) string {
|
func describeValueOrHash(value ValueOrHash) string {
|
||||||
if value.Hash != 0 {
|
if value.Hash != 0 {
|
||||||
return fmt.Sprintf("hash: %d", value.Hash)
|
return fmt.Sprintf("hash: %d", value.Hash)
|
||||||
|
@ -95,18 +95,18 @@ func TestModelDescribe(t *testing.T) {
|
|||||||
expectDescribe: `if(mod_rev(key9)==9).then(put("key9", "99")) -> err: "failed"`,
|
expectDescribe: `if(mod_rev(key9)==9).then(put("key9", "99")) -> err: "failed"`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
req: txnRequest([]EtcdCondition{{Key: "key9b", ExpectedRevision: 9}}, []EtcdOperation{{Type: Put, Key: "key9b", Value: ValueOrHash{Value: "991"}}}, []EtcdOperation{{Type: Range, Key: "key9b"}}),
|
req: txnRequest([]EtcdCondition{{Key: "key9b", ExpectedRevision: 9}}, []EtcdOperation{{Type: PutOperation, Key: "key9b", PutOptions: PutOptions{Value: ValueOrHash{Value: "991"}}}}, []EtcdOperation{{Type: RangeOperation, Key: "key9b"}}),
|
||||||
resp: txnResponse([]EtcdOperationResult{{}}, true, 10),
|
resp: txnResponse([]EtcdOperationResult{{}}, true, 10),
|
||||||
expectDescribe: `if(mod_rev(key9b)==9).then(put("key9b", "991")).else(get("key9b")) -> success(ok), rev: 10`,
|
expectDescribe: `if(mod_rev(key9b)==9).then(put("key9b", "991")).else(get("key9b")) -> success(ok), rev: 10`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
req: txnRequest([]EtcdCondition{{Key: "key9c", ExpectedRevision: 9}}, []EtcdOperation{{Type: Put, Key: "key9c", Value: ValueOrHash{Value: "992"}}}, []EtcdOperation{{Type: Range, Key: "key9c"}}),
|
req: txnRequest([]EtcdCondition{{Key: "key9c", ExpectedRevision: 9}}, []EtcdOperation{{Type: PutOperation, Key: "key9c", PutOptions: PutOptions{Value: ValueOrHash{Value: "992"}}}}, []EtcdOperation{{Type: RangeOperation, Key: "key9c"}}),
|
||||||
resp: txnResponse([]EtcdOperationResult{{KVs: []KeyValue{{Key: "key9c", ValueRevision: ValueRevision{Value: ValueOrHash{Value: "993"}, ModRevision: 10}}}}}, false, 10),
|
resp: txnResponse([]EtcdOperationResult{{RangeResponse: RangeResponse{KVs: []KeyValue{{Key: "key9c", ValueRevision: ValueRevision{Value: ValueOrHash{Value: "993"}, ModRevision: 10}}}}}}, false, 10),
|
||||||
expectDescribe: `if(mod_rev(key9c)==9).then(put("key9c", "992")).else(get("key9c")) -> failure("993"), rev: 10`,
|
expectDescribe: `if(mod_rev(key9c)==9).then(put("key9c", "992")).else(get("key9c")) -> failure("993"), rev: 10`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
req: txnRequest(nil, []EtcdOperation{{Type: Range, Key: "10"}, {Type: Put, Key: "11", Value: ValueOrHash{Value: "111"}}, {Type: Delete, Key: "12"}}, nil),
|
req: txnRequest(nil, []EtcdOperation{{Type: RangeOperation, Key: "10"}, {Type: PutOperation, Key: "11", PutOptions: PutOptions{Value: ValueOrHash{Value: "111"}}}, {Type: DeleteOperation, Key: "12"}}, nil),
|
||||||
resp: txnResponse([]EtcdOperationResult{{KVs: []KeyValue{{ValueRevision: ValueRevision{Value: ValueOrHash{Value: "110"}}}}}, {}, {Deleted: 1}}, true, 10),
|
resp: txnResponse([]EtcdOperationResult{{RangeResponse: RangeResponse{KVs: []KeyValue{{ValueRevision: ValueRevision{Value: ValueOrHash{Value: "110"}}}}}}, {}, {Deleted: 1}}, true, 10),
|
||||||
expectDescribe: `get("10"), put("11", "111"), delete("12") -> "110", ok, deleted: 1, rev: 10`,
|
expectDescribe: `get("10"), put("11", "111"), delete("12") -> "110", ok, deleted: 1, rev: 10`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -73,6 +73,13 @@ func initState(request EtcdRequest, response EtcdResponse) etcdState {
|
|||||||
state := emptyState()
|
state := emptyState()
|
||||||
state.Revision = response.Revision
|
state.Revision = response.Revision
|
||||||
switch request.Type {
|
switch request.Type {
|
||||||
|
case Range:
|
||||||
|
for _, kv := range response.Range.KVs {
|
||||||
|
state.KeyValues[kv.Key] = ValueRevision{
|
||||||
|
Value: kv.Value,
|
||||||
|
ModRevision: kv.ModRevision,
|
||||||
|
}
|
||||||
|
}
|
||||||
case Txn:
|
case Txn:
|
||||||
if response.Txn.Failure {
|
if response.Txn.Failure {
|
||||||
return state
|
return state
|
||||||
@ -83,19 +90,19 @@ func initState(request EtcdRequest, response EtcdResponse) etcdState {
|
|||||||
for i, op := range request.Txn.OperationsOnSuccess {
|
for i, op := range request.Txn.OperationsOnSuccess {
|
||||||
opResp := response.Txn.Results[i]
|
opResp := response.Txn.Results[i]
|
||||||
switch op.Type {
|
switch op.Type {
|
||||||
case Range:
|
case RangeOperation:
|
||||||
for _, kv := range opResp.KVs {
|
for _, kv := range opResp.KVs {
|
||||||
state.KeyValues[kv.Key] = ValueRevision{
|
state.KeyValues[kv.Key] = ValueRevision{
|
||||||
Value: kv.Value,
|
Value: kv.Value,
|
||||||
ModRevision: kv.ModRevision,
|
ModRevision: kv.ModRevision,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case Put:
|
case PutOperation:
|
||||||
state.KeyValues[op.Key] = ValueRevision{
|
state.KeyValues[op.Key] = ValueRevision{
|
||||||
Value: op.Value,
|
Value: op.Value,
|
||||||
ModRevision: response.Revision,
|
ModRevision: response.Revision,
|
||||||
}
|
}
|
||||||
case Delete:
|
case DeleteOperation:
|
||||||
default:
|
default:
|
||||||
panic("Unknown operation")
|
panic("Unknown operation")
|
||||||
}
|
}
|
||||||
@ -131,6 +138,9 @@ func (s etcdState) step(request EtcdRequest) (etcdState, EtcdResponse) {
|
|||||||
}
|
}
|
||||||
s.KeyValues = newKVs
|
s.KeyValues = newKVs
|
||||||
switch request.Type {
|
switch request.Type {
|
||||||
|
case Range:
|
||||||
|
resp := s.getRange(request.Range.Key, request.Range.RangeOptions)
|
||||||
|
return s, EtcdResponse{Range: &resp, Revision: s.Revision}
|
||||||
case Txn:
|
case Txn:
|
||||||
failure := false
|
failure := false
|
||||||
for _, cond := range request.Txn.Conditions {
|
for _, cond := range request.Txn.Conditions {
|
||||||
@ -147,36 +157,11 @@ func (s etcdState) step(request EtcdRequest) (etcdState, EtcdResponse) {
|
|||||||
increaseRevision := false
|
increaseRevision := false
|
||||||
for i, op := range operations {
|
for i, op := range operations {
|
||||||
switch op.Type {
|
switch op.Type {
|
||||||
case Range:
|
case RangeOperation:
|
||||||
opResp[i] = EtcdOperationResult{
|
opResp[i] = EtcdOperationResult{
|
||||||
KVs: []KeyValue{},
|
RangeResponse: s.getRange(op.Key, op.RangeOptions),
|
||||||
}
|
}
|
||||||
if op.WithPrefix {
|
case PutOperation:
|
||||||
var count int64
|
|
||||||
for k, v := range s.KeyValues {
|
|
||||||
if strings.HasPrefix(k, op.Key) {
|
|
||||||
opResp[i].KVs = append(opResp[i].KVs, KeyValue{Key: k, ValueRevision: v})
|
|
||||||
count += 1
|
|
||||||
}
|
|
||||||
}
|
|
||||||
sort.Slice(opResp[i].KVs, func(j, k int) bool {
|
|
||||||
return opResp[i].KVs[j].Key < opResp[i].KVs[k].Key
|
|
||||||
})
|
|
||||||
if op.Limit != 0 && count > op.Limit {
|
|
||||||
opResp[i].KVs = opResp[i].KVs[:op.Limit]
|
|
||||||
}
|
|
||||||
opResp[i].Count = count
|
|
||||||
} else {
|
|
||||||
value, ok := s.KeyValues[op.Key]
|
|
||||||
if ok {
|
|
||||||
opResp[i].KVs = append(opResp[i].KVs, KeyValue{
|
|
||||||
Key: op.Key,
|
|
||||||
ValueRevision: value,
|
|
||||||
})
|
|
||||||
opResp[i].Count = 1
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case Put:
|
|
||||||
_, leaseExists := s.Leases[op.LeaseID]
|
_, leaseExists := s.Leases[op.LeaseID]
|
||||||
if op.LeaseID != 0 && !leaseExists {
|
if op.LeaseID != 0 && !leaseExists {
|
||||||
break
|
break
|
||||||
@ -190,7 +175,7 @@ func (s etcdState) step(request EtcdRequest) (etcdState, EtcdResponse) {
|
|||||||
if leaseExists {
|
if leaseExists {
|
||||||
s = attachToNewLease(s, op.LeaseID, op.Key)
|
s = attachToNewLease(s, op.LeaseID, op.Key)
|
||||||
}
|
}
|
||||||
case Delete:
|
case DeleteOperation:
|
||||||
if _, ok := s.KeyValues[op.Key]; ok {
|
if _, ok := s.KeyValues[op.Key]; ok {
|
||||||
delete(s.KeyValues, op.Key)
|
delete(s.KeyValues, op.Key)
|
||||||
increaseRevision = true
|
increaseRevision = true
|
||||||
@ -238,6 +223,38 @@ func (s etcdState) step(request EtcdRequest) (etcdState, EtcdResponse) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s etcdState) getRange(key string, options RangeOptions) RangeResponse {
|
||||||
|
response := RangeResponse{
|
||||||
|
KVs: []KeyValue{},
|
||||||
|
}
|
||||||
|
if options.WithPrefix {
|
||||||
|
var count int64
|
||||||
|
for k, v := range s.KeyValues {
|
||||||
|
if strings.HasPrefix(k, key) {
|
||||||
|
response.KVs = append(response.KVs, KeyValue{Key: k, ValueRevision: v})
|
||||||
|
count += 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sort.Slice(response.KVs, func(j, k int) bool {
|
||||||
|
return response.KVs[j].Key < response.KVs[k].Key
|
||||||
|
})
|
||||||
|
if options.Limit != 0 && count > options.Limit {
|
||||||
|
response.KVs = response.KVs[:options.Limit]
|
||||||
|
}
|
||||||
|
response.Count = count
|
||||||
|
} else {
|
||||||
|
value, ok := s.KeyValues[key]
|
||||||
|
if ok {
|
||||||
|
response.KVs = append(response.KVs, KeyValue{
|
||||||
|
Key: key,
|
||||||
|
ValueRevision: value,
|
||||||
|
})
|
||||||
|
response.Count = 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return response
|
||||||
|
}
|
||||||
|
|
||||||
func detachFromOldLease(s etcdState, key string) etcdState {
|
func detachFromOldLease(s etcdState, key string) etcdState {
|
||||||
if oldLeaseId, ok := s.KeyLeases[key]; ok {
|
if oldLeaseId, ok := s.KeyLeases[key]; ok {
|
||||||
delete(s.Leases[oldLeaseId].Keys, key)
|
delete(s.Leases[oldLeaseId].Keys, key)
|
||||||
@ -255,6 +272,7 @@ func attachToNewLease(s etcdState, leaseID int64, key string) etcdState {
|
|||||||
type RequestType string
|
type RequestType string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
Range RequestType = "range"
|
||||||
Txn RequestType = "txn"
|
Txn RequestType = "txn"
|
||||||
LeaseGrant RequestType = "leaseGrant"
|
LeaseGrant RequestType = "leaseGrant"
|
||||||
LeaseRevoke RequestType = "leaseRevoke"
|
LeaseRevoke RequestType = "leaseRevoke"
|
||||||
@ -265,10 +283,28 @@ type EtcdRequest struct {
|
|||||||
Type RequestType
|
Type RequestType
|
||||||
LeaseGrant *LeaseGrantRequest
|
LeaseGrant *LeaseGrantRequest
|
||||||
LeaseRevoke *LeaseRevokeRequest
|
LeaseRevoke *LeaseRevokeRequest
|
||||||
|
Range *RangeRequest
|
||||||
Txn *TxnRequest
|
Txn *TxnRequest
|
||||||
Defragment *DefragmentRequest
|
Defragment *DefragmentRequest
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type RangeRequest struct {
|
||||||
|
Key string
|
||||||
|
RangeOptions
|
||||||
|
// TODO: Implement stale read using revision
|
||||||
|
revision int64
|
||||||
|
}
|
||||||
|
|
||||||
|
type RangeOptions struct {
|
||||||
|
WithPrefix bool
|
||||||
|
Limit int64
|
||||||
|
}
|
||||||
|
|
||||||
|
type PutOptions struct {
|
||||||
|
Value ValueOrHash
|
||||||
|
LeaseID int64
|
||||||
|
}
|
||||||
|
|
||||||
type TxnRequest struct {
|
type TxnRequest struct {
|
||||||
Conditions []EtcdCondition
|
Conditions []EtcdCondition
|
||||||
OperationsOnSuccess []EtcdOperation
|
OperationsOnSuccess []EtcdOperation
|
||||||
@ -281,14 +317,20 @@ type EtcdCondition struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type EtcdOperation struct {
|
type EtcdOperation struct {
|
||||||
Type OperationType
|
Type OperationType
|
||||||
Key string
|
Key string
|
||||||
WithPrefix bool
|
RangeOptions
|
||||||
Limit int64
|
PutOptions
|
||||||
Value ValueOrHash
|
|
||||||
LeaseID int64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type OperationType string
|
||||||
|
|
||||||
|
const (
|
||||||
|
RangeOperation OperationType = "range-operation"
|
||||||
|
PutOperation OperationType = "put-operation"
|
||||||
|
DeleteOperation OperationType = "delete-operation"
|
||||||
|
)
|
||||||
|
|
||||||
type LeaseGrantRequest struct {
|
type LeaseGrantRequest struct {
|
||||||
LeaseID int64
|
LeaseID int64
|
||||||
}
|
}
|
||||||
@ -300,6 +342,7 @@ type DefragmentRequest struct{}
|
|||||||
type EtcdResponse struct {
|
type EtcdResponse struct {
|
||||||
Revision int64
|
Revision int64
|
||||||
Txn *TxnResponse
|
Txn *TxnResponse
|
||||||
|
Range *RangeResponse
|
||||||
LeaseGrant *LeaseGrantReponse
|
LeaseGrant *LeaseGrantReponse
|
||||||
LeaseRevoke *LeaseRevokeResponse
|
LeaseRevoke *LeaseRevokeResponse
|
||||||
Defragment *DefragmentResponse
|
Defragment *DefragmentResponse
|
||||||
@ -310,6 +353,11 @@ type TxnResponse struct {
|
|||||||
Results []EtcdOperationResult
|
Results []EtcdOperationResult
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type RangeResponse struct {
|
||||||
|
KVs []KeyValue
|
||||||
|
Count int64
|
||||||
|
}
|
||||||
|
|
||||||
type LeaseGrantReponse struct {
|
type LeaseGrantReponse struct {
|
||||||
LeaseID int64
|
LeaseID int64
|
||||||
}
|
}
|
||||||
@ -317,8 +365,7 @@ type LeaseRevokeResponse struct{}
|
|||||||
type DefragmentResponse struct{}
|
type DefragmentResponse struct{}
|
||||||
|
|
||||||
type EtcdOperationResult struct {
|
type EtcdOperationResult struct {
|
||||||
KVs []KeyValue
|
RangeResponse
|
||||||
Count int64
|
|
||||||
Deleted int64
|
Deleted int64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -243,18 +243,18 @@ func toEtcdOperation(op clientv3.Op) EtcdOperation {
|
|||||||
var opType OperationType
|
var opType OperationType
|
||||||
switch {
|
switch {
|
||||||
case op.IsGet():
|
case op.IsGet():
|
||||||
opType = Range
|
opType = RangeOperation
|
||||||
case op.IsPut():
|
case op.IsPut():
|
||||||
opType = Put
|
opType = PutOperation
|
||||||
case op.IsDelete():
|
case op.IsDelete():
|
||||||
opType = Delete
|
opType = DeleteOperation
|
||||||
default:
|
default:
|
||||||
panic("Unsupported operation")
|
panic("Unsupported operation")
|
||||||
}
|
}
|
||||||
return EtcdOperation{
|
return EtcdOperation{
|
||||||
Type: opType,
|
Type: opType,
|
||||||
Key: string(op.KeyBytes()),
|
Key: string(op.KeyBytes()),
|
||||||
Value: ValueOrHash{Value: string(op.ValueBytes())},
|
PutOptions: PutOptions{Value: ValueOrHash{Value: string(op.ValueBytes())}},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -273,8 +273,10 @@ func toEtcdOperationResult(resp *etcdserverpb.ResponseOp) EtcdOperationResult {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
return EtcdOperationResult{
|
return EtcdOperationResult{
|
||||||
KVs: kvs,
|
RangeResponse: RangeResponse{
|
||||||
Count: getResp.Count,
|
KVs: kvs,
|
||||||
|
Count: getResp.Count,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
case resp.GetResponsePut() != nil:
|
case resp.GetResponsePut() != nil:
|
||||||
return EtcdOperationResult{}
|
return EtcdOperationResult{}
|
||||||
@ -339,7 +341,7 @@ func getRequest(key string) EtcdRequest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func rangeRequest(key string, withPrefix bool, limit int64) EtcdRequest {
|
func rangeRequest(key string, withPrefix bool, limit int64) EtcdRequest {
|
||||||
return EtcdRequest{Type: Txn, Txn: &TxnRequest{OperationsOnSuccess: []EtcdOperation{{Type: Range, Key: key, WithPrefix: withPrefix, Limit: limit}}}}
|
return EtcdRequest{Type: Range, Range: &RangeRequest{Key: key, RangeOptions: RangeOptions{WithPrefix: withPrefix, Limit: limit}}}
|
||||||
}
|
}
|
||||||
|
|
||||||
func emptyGetResponse(revision int64) EtcdNonDeterministicResponse {
|
func emptyGetResponse(revision int64) EtcdNonDeterministicResponse {
|
||||||
@ -351,7 +353,7 @@ func getResponse(key, value string, modRevision, revision int64) EtcdNonDetermin
|
|||||||
}
|
}
|
||||||
|
|
||||||
func rangeResponse(kvs []*mvccpb.KeyValue, count int64, revision int64) EtcdNonDeterministicResponse {
|
func rangeResponse(kvs []*mvccpb.KeyValue, count int64, revision int64) EtcdNonDeterministicResponse {
|
||||||
result := EtcdOperationResult{KVs: make([]KeyValue, len(kvs))}
|
result := RangeResponse{KVs: make([]KeyValue, len(kvs)), Count: count}
|
||||||
|
|
||||||
for i, kv := range kvs {
|
for i, kv := range kvs {
|
||||||
result.KVs[i] = KeyValue{
|
result.KVs[i] = KeyValue{
|
||||||
@ -361,9 +363,8 @@ func rangeResponse(kvs []*mvccpb.KeyValue, count int64, revision int64) EtcdNonD
|
|||||||
ModRevision: kv.ModRevision,
|
ModRevision: kv.ModRevision,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
result.Count = count
|
|
||||||
}
|
}
|
||||||
return EtcdNonDeterministicResponse{EtcdResponse: EtcdResponse{Txn: &TxnResponse{Results: []EtcdOperationResult{result}}, Revision: revision}}
|
return EtcdNonDeterministicResponse{EtcdResponse: EtcdResponse{Range: &result, Revision: revision}}
|
||||||
}
|
}
|
||||||
|
|
||||||
func failedResponse(err error) EtcdNonDeterministicResponse {
|
func failedResponse(err error) EtcdNonDeterministicResponse {
|
||||||
@ -375,7 +376,7 @@ func unknownResponse(revision int64) EtcdNonDeterministicResponse {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func putRequest(key, value string) EtcdRequest {
|
func putRequest(key, value string) EtcdRequest {
|
||||||
return EtcdRequest{Type: Txn, Txn: &TxnRequest{OperationsOnSuccess: []EtcdOperation{{Type: Put, Key: key, Value: ToValueOrHash(value)}}}}
|
return EtcdRequest{Type: Txn, Txn: &TxnRequest{OperationsOnSuccess: []EtcdOperation{{Type: PutOperation, Key: key, PutOptions: PutOptions{Value: ToValueOrHash(value)}}}}}
|
||||||
}
|
}
|
||||||
|
|
||||||
func putResponse(revision int64) EtcdNonDeterministicResponse {
|
func putResponse(revision int64) EtcdNonDeterministicResponse {
|
||||||
@ -383,7 +384,7 @@ func putResponse(revision int64) EtcdNonDeterministicResponse {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func deleteRequest(key string) EtcdRequest {
|
func deleteRequest(key string) EtcdRequest {
|
||||||
return EtcdRequest{Type: Txn, Txn: &TxnRequest{OperationsOnSuccess: []EtcdOperation{{Type: Delete, Key: key}}}}
|
return EtcdRequest{Type: Txn, Txn: &TxnRequest{OperationsOnSuccess: []EtcdOperation{{Type: DeleteOperation, Key: key}}}}
|
||||||
}
|
}
|
||||||
|
|
||||||
func deleteResponse(deleted int64, revision int64) EtcdNonDeterministicResponse {
|
func deleteResponse(deleted int64, revision int64) EtcdNonDeterministicResponse {
|
||||||
@ -406,7 +407,7 @@ func compareRevision(key string, expectedRevision int64) *EtcdCondition {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func putOperation(key, value string) *EtcdOperation {
|
func putOperation(key, value string) *EtcdOperation {
|
||||||
return &EtcdOperation{Type: Put, Key: key, Value: ToValueOrHash(value)}
|
return &EtcdOperation{Type: PutOperation, Key: key, PutOptions: PutOptions{Value: ToValueOrHash(value)}}
|
||||||
}
|
}
|
||||||
|
|
||||||
func txnRequestSingleOperation(cond *EtcdCondition, onSuccess, onFailure *EtcdOperation) EtcdRequest {
|
func txnRequestSingleOperation(cond *EtcdCondition, onSuccess, onFailure *EtcdOperation) EtcdRequest {
|
||||||
@ -442,7 +443,7 @@ func txnResponse(result []EtcdOperationResult, succeeded bool, revision int64) E
|
|||||||
}
|
}
|
||||||
|
|
||||||
func putWithLeaseRequest(key, value string, leaseID int64) EtcdRequest {
|
func putWithLeaseRequest(key, value string, leaseID int64) EtcdRequest {
|
||||||
return EtcdRequest{Type: Txn, Txn: &TxnRequest{OperationsOnSuccess: []EtcdOperation{{Type: Put, Key: key, Value: ToValueOrHash(value), LeaseID: leaseID}}}}
|
return EtcdRequest{Type: Txn, Txn: &TxnRequest{OperationsOnSuccess: []EtcdOperation{{Type: PutOperation, Key: key, PutOptions: PutOptions{Value: ToValueOrHash(value), LeaseID: leaseID}}}}}
|
||||||
}
|
}
|
||||||
|
|
||||||
func leaseGrantRequest(leaseID int64) EtcdRequest {
|
func leaseGrantRequest(leaseID int64) EtcdRequest {
|
||||||
|
@ -22,14 +22,6 @@ import (
|
|||||||
"github.com/anishathalye/porcupine"
|
"github.com/anishathalye/porcupine"
|
||||||
)
|
)
|
||||||
|
|
||||||
type OperationType string
|
|
||||||
|
|
||||||
const (
|
|
||||||
Range OperationType = "range"
|
|
||||||
Put OperationType = "put"
|
|
||||||
Delete OperationType = "delete"
|
|
||||||
)
|
|
||||||
|
|
||||||
// NonDeterministicModel extends DeterministicModel to handle requests that have unknown or error response.
|
// NonDeterministicModel extends DeterministicModel to handle requests that have unknown or error response.
|
||||||
// Unknown/error response doesn't inform whether request was persisted or not, so model
|
// Unknown/error response doesn't inform whether request was persisted or not, so model
|
||||||
// considers both cases. This is represented as multiple equally possible deterministic states.
|
// considers both cases. This is represented as multiple equally possible deterministic states.
|
||||||
|
@ -264,18 +264,18 @@ func toWatchEvent(event clientv3.Event) WatchEvent {
|
|||||||
var op model.OperationType
|
var op model.OperationType
|
||||||
switch event.Type {
|
switch event.Type {
|
||||||
case mvccpb.PUT:
|
case mvccpb.PUT:
|
||||||
op = model.Put
|
op = model.PutOperation
|
||||||
case mvccpb.DELETE:
|
case mvccpb.DELETE:
|
||||||
op = model.Delete
|
op = model.DeleteOperation
|
||||||
default:
|
default:
|
||||||
panic(fmt.Sprintf("Unexpected event type: %s", event.Type))
|
panic(fmt.Sprintf("Unexpected event type: %s", event.Type))
|
||||||
}
|
}
|
||||||
return WatchEvent{
|
return WatchEvent{
|
||||||
Revision: event.Kv.ModRevision,
|
Revision: event.Kv.ModRevision,
|
||||||
Op: model.EtcdOperation{
|
Op: model.EtcdOperation{
|
||||||
Type: op,
|
Type: op,
|
||||||
Key: string(event.Kv.Key),
|
Key: string(event.Kv.Key),
|
||||||
Value: model.ToValueOrHash(string(event.Kv.Value)),
|
PutOptions: model.PutOptions{Value: model.ToValueOrHash(string(event.Kv.Value))},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -195,24 +195,24 @@ func (t etcdTraffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op)
|
|||||||
atLeastOnePut := false
|
atLeastOnePut := false
|
||||||
for i := 0; i < MultiOpTxnOpCount; i++ {
|
for i := 0; i < MultiOpTxnOpCount; i++ {
|
||||||
opTypes[i] = t.pickOperationType()
|
opTypes[i] = t.pickOperationType()
|
||||||
if opTypes[i] == model.Put {
|
if opTypes[i] == model.PutOperation {
|
||||||
atLeastOnePut = true
|
atLeastOnePut = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Ensure at least one put to make operation unique
|
// Ensure at least one put to make operation unique
|
||||||
if !atLeastOnePut {
|
if !atLeastOnePut {
|
||||||
opTypes[0] = model.Put
|
opTypes[0] = model.PutOperation
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, opType := range opTypes {
|
for i, opType := range opTypes {
|
||||||
key := fmt.Sprintf("%d", keys[i])
|
key := fmt.Sprintf("%d", keys[i])
|
||||||
switch opType {
|
switch opType {
|
||||||
case model.Range:
|
case model.RangeOperation:
|
||||||
ops = append(ops, clientv3.OpGet(key))
|
ops = append(ops, clientv3.OpGet(key))
|
||||||
case model.Put:
|
case model.PutOperation:
|
||||||
value := fmt.Sprintf("%d", ids.NewRequestId())
|
value := fmt.Sprintf("%d", ids.NewRequestId())
|
||||||
ops = append(ops, clientv3.OpPut(key, value))
|
ops = append(ops, clientv3.OpPut(key, value))
|
||||||
case model.Delete:
|
case model.DeleteOperation:
|
||||||
ops = append(ops, clientv3.OpDelete(key))
|
ops = append(ops, clientv3.OpDelete(key))
|
||||||
default:
|
default:
|
||||||
panic("unsuported choice type")
|
panic("unsuported choice type")
|
||||||
@ -224,10 +224,10 @@ func (t etcdTraffic) pickMultiTxnOps(ids identity.Provider) (ops []clientv3.Op)
|
|||||||
func (t etcdTraffic) pickOperationType() model.OperationType {
|
func (t etcdTraffic) pickOperationType() model.OperationType {
|
||||||
roll := rand.Int() % 100
|
roll := rand.Int() % 100
|
||||||
if roll < 10 {
|
if roll < 10 {
|
||||||
return model.Delete
|
return model.DeleteOperation
|
||||||
}
|
}
|
||||||
if roll < 50 {
|
if roll < 50 {
|
||||||
return model.Range
|
return model.RangeOperation
|
||||||
}
|
}
|
||||||
return model.Put
|
return model.PutOperation
|
||||||
}
|
}
|
||||||
|
@ -73,12 +73,14 @@ func lastOperationObservedInWatch(operations []porcupine.Operation, watchEvents
|
|||||||
|
|
||||||
func matchWatchEvent(request *model.TxnRequest, watchEvents map[model.EtcdOperation]traffic.TimedWatchEvent) *traffic.TimedWatchEvent {
|
func matchWatchEvent(request *model.TxnRequest, watchEvents map[model.EtcdOperation]traffic.TimedWatchEvent) *traffic.TimedWatchEvent {
|
||||||
for _, etcdOp := range append(request.OperationsOnSuccess, request.OperationsOnFailure...) {
|
for _, etcdOp := range append(request.OperationsOnSuccess, request.OperationsOnFailure...) {
|
||||||
if etcdOp.Type == model.Put {
|
if etcdOp.Type == model.PutOperation {
|
||||||
// Remove LeaseID which is not exposed in watch.
|
// Remove LeaseID which is not exposed in watch.
|
||||||
event, ok := watchEvents[model.EtcdOperation{
|
event, ok := watchEvents[model.EtcdOperation{
|
||||||
Type: etcdOp.Type,
|
Type: etcdOp.Type,
|
||||||
Key: etcdOp.Key,
|
Key: etcdOp.Key,
|
||||||
Value: etcdOp.Value,
|
PutOptions: model.PutOptions{
|
||||||
|
Value: etcdOp.Value,
|
||||||
|
},
|
||||||
}]
|
}]
|
||||||
if ok {
|
if ok {
|
||||||
return &event
|
return &event
|
||||||
@ -90,7 +92,7 @@ func matchWatchEvent(request *model.TxnRequest, watchEvents map[model.EtcdOperat
|
|||||||
|
|
||||||
func hasNonUniqueWriteOperation(request *model.TxnRequest) bool {
|
func hasNonUniqueWriteOperation(request *model.TxnRequest) bool {
|
||||||
for _, etcdOp := range request.OperationsOnSuccess {
|
for _, etcdOp := range request.OperationsOnSuccess {
|
||||||
if etcdOp.Type == model.Put || etcdOp.Type == model.Delete {
|
if etcdOp.Type == model.PutOperation || etcdOp.Type == model.DeleteOperation {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -99,7 +101,7 @@ func hasNonUniqueWriteOperation(request *model.TxnRequest) bool {
|
|||||||
|
|
||||||
func hasUniqueWriteOperation(request *model.TxnRequest) bool {
|
func hasUniqueWriteOperation(request *model.TxnRequest) bool {
|
||||||
for _, etcdOp := range request.OperationsOnSuccess {
|
for _, etcdOp := range request.OperationsOnSuccess {
|
||||||
if etcdOp.Type == model.Put {
|
if etcdOp.Type == model.PutOperation {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user