Separate persisted responses without knowing their revision to prevent duplicating state during linearization

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
Marek Siarkowicz 2024-06-22 14:08:02 +02:00
parent 0dd79f4e18
commit 1870222f41
8 changed files with 145 additions and 45 deletions

View File

@ -28,8 +28,11 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin
if response.ClientError != "" { if response.ClientError != "" {
return fmt.Sprintf("err: %q", response.ClientError) return fmt.Sprintf("err: %q", response.ClientError)
} }
if response.PartialResponse { if response.Persisted {
return fmt.Sprintf("unknown, rev: %d", response.Revision) if response.PersistedRevision != 0 {
return fmt.Sprintf("unknown, rev: %d", response.PersistedRevision)
}
return fmt.Sprintf("unknown")
} }
switch request.Type { switch request.Type {
case Range: case Range:

View File

@ -123,7 +123,7 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
if request.Range.Revision < newState.CompactRevision { if request.Range.Revision < newState.CompactRevision {
return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{ClientError: mvcc.ErrCompacted.Error()}} return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{ClientError: mvcc.ErrCompacted.Error()}}
} }
return newState, MaybeEtcdResponse{PartialResponse: true, EtcdResponse: EtcdResponse{Revision: newState.Revision}} return newState, MaybeEtcdResponse{Persisted: true, PersistedRevision: newState.Revision}
case Txn: case Txn:
failure := false failure := false
for _, cond := range request.Txn.Conditions { for _, cond := range request.Txn.Conditions {
@ -351,14 +351,16 @@ type LeaseRevokeRequest struct {
} }
type DefragmentRequest struct{} type DefragmentRequest struct{}
// MaybeEtcdResponse extends EtcdResponse to represent partial or failed responses. // MaybeEtcdResponse extends EtcdResponse to include partial information about responses to a request.
// Possible states: // Possible response state information:
// * Normal response. Only EtcdResponse is set. // * Normal response. Client observed response. Only EtcdResponse is set.
// * Partial response. The EtcdResponse.Revision and PartialResponse are set. // * Persisted. Client didn't observe response, but we know it was persisted by etcd. Only Persisted is set
// * Failed response. Only Err is set. // * Persisted with Revision. Client didn't observe response, but we know that it was persisted, and it's revision. Both Persisted and PersistedRevision is set.
// * Error response. Client observed error, but we don't know if it was persisted. Only Error is set.
type MaybeEtcdResponse struct { type MaybeEtcdResponse struct {
EtcdResponse EtcdResponse
PartialResponse bool Persisted bool
PersistedRevision int64
Error string Error string
} }
@ -376,7 +378,15 @@ type EtcdResponse struct {
} }
func Match(r1, r2 MaybeEtcdResponse) bool { func Match(r1, r2 MaybeEtcdResponse) bool {
return ((r1.PartialResponse || r2.PartialResponse) && (r1.Revision == r2.Revision)) || reflect.DeepEqual(r1, r2) r1Revision := r1.Revision
if r1.Persisted {
r1Revision = r1.PersistedRevision
}
r2Revision := r2.Revision
if r2.Persisted {
r2Revision = r2.PersistedRevision
}
return (r1.Persisted && r1.PersistedRevision == 0) || (r2.Persisted && r2.PersistedRevision == 0) || ((r1.Persisted || r2.Persisted) && (r1.Error != "" || r2.Error != "" || r1Revision == r2Revision)) || reflect.DeepEqual(r1, r2)
} }
type TxnResponse struct { type TxnResponse struct {

View File

@ -365,7 +365,7 @@ func failedResponse(err error) MaybeEtcdResponse {
} }
func partialResponse(revision int64) MaybeEtcdResponse { func partialResponse(revision int64) MaybeEtcdResponse {
return MaybeEtcdResponse{PartialResponse: true, EtcdResponse: EtcdResponse{Revision: revision}} return MaybeEtcdResponse{Persisted: true, PersistedRevision: revision}
} }
func putRequest(key, value string) EtcdRequest { func putRequest(key, value string) EtcdRequest {

View File

@ -58,17 +58,19 @@ func (states nonDeterministicState) apply(request EtcdRequest, response MaybeEtc
var newStates nonDeterministicState var newStates nonDeterministicState
switch { switch {
case response.Error != "": case response.Error != "":
newStates = states.stepFailedResponse(request) newStates = states.applyFailedRequest(request)
case response.PartialResponse: case response.Persisted && response.PersistedRevision == 0:
newStates = states.applyResponseRevision(request, response.EtcdResponse.Revision) newStates = states.applyPersistedRequest(request)
case response.Persisted && response.PersistedRevision != 0:
newStates = states.applyPersistedRequestWithRevision(request, response.PersistedRevision)
default: default:
newStates = states.applySuccessfulResponse(request, response.EtcdResponse) newStates = states.applyRequestWithResponse(request, response.EtcdResponse)
} }
return len(newStates) > 0, newStates return len(newStates) > 0, newStates
} }
// stepFailedResponse duplicates number of states by considering both cases, request was persisted and request was lost. // applyFailedRequest returns both the original states and states with applied request. It considers both cases, request was persisted and request was lost.
func (states nonDeterministicState) stepFailedResponse(request EtcdRequest) nonDeterministicState { func (states nonDeterministicState) applyFailedRequest(request EtcdRequest) nonDeterministicState {
newStates := make(nonDeterministicState, 0, len(states)*2) newStates := make(nonDeterministicState, 0, len(states)*2)
for _, s := range states { for _, s := range states {
newStates = append(newStates, s) newStates = append(newStates, s)
@ -80,8 +82,18 @@ func (states nonDeterministicState) stepFailedResponse(request EtcdRequest) nonD
return newStates return newStates
} }
// applyResponseRevision filters possible states by leaving ony states that would return proper revision. // applyPersistedRequest applies request to all possible states.
func (states nonDeterministicState) applyResponseRevision(request EtcdRequest, responseRevision int64) nonDeterministicState { func (states nonDeterministicState) applyPersistedRequest(request EtcdRequest) nonDeterministicState {
newStates := make(nonDeterministicState, 0, len(states))
for _, s := range states {
newState, _ := s.Step(request)
newStates = append(newStates, newState)
}
return newStates
}
// applyPersistedRequestWithRevision applies request to all possible states, but leaves only states that would return proper revision.
func (states nonDeterministicState) applyPersistedRequestWithRevision(request EtcdRequest, responseRevision int64) nonDeterministicState {
newStates := make(nonDeterministicState, 0, len(states)) newStates := make(nonDeterministicState, 0, len(states))
for _, s := range states { for _, s := range states {
newState, modelResponse := s.Step(request) newState, modelResponse := s.Step(request)
@ -92,8 +104,8 @@ func (states nonDeterministicState) applyResponseRevision(request EtcdRequest, r
return newStates return newStates
} }
// applySuccessfulResponse filters possible states by leaving ony states that would respond correctly. // applyRequestWithResponse applies request to all possible states, but leaves only state that would return proper response.
func (states nonDeterministicState) applySuccessfulResponse(request EtcdRequest, response EtcdResponse) nonDeterministicState { func (states nonDeterministicState) applyRequestWithResponse(request EtcdRequest, response EtcdResponse) nonDeterministicState {
newStates := make(nonDeterministicState, 0, len(states)) newStates := make(nonDeterministicState, 0, len(states))
for _, s := range states { for _, s := range states {
newState, modelResponse := s.Step(request) newState, modelResponse := s.Step(request)

View File

@ -391,7 +391,7 @@ func TestModelResponseMatch(t *testing.T) {
}, },
{ {
resp1: getResponse("key", "a", 1, 1), resp1: getResponse("key", "a", 1, 1),
resp2: partialResponse(0), resp2: partialResponse(2),
expectMatch: false, expectMatch: false,
}, },
{ {
@ -416,9 +416,14 @@ func TestModelResponseMatch(t *testing.T) {
}, },
{ {
resp1: putResponse(3), resp1: putResponse(3),
resp2: partialResponse(0), resp2: partialResponse(1),
expectMatch: false, expectMatch: false,
}, },
{
resp1: putResponse(3),
resp2: partialResponse(0),
expectMatch: true,
},
{ {
resp1: deleteResponse(1, 5), resp1: deleteResponse(1, 5),
resp2: deleteResponse(1, 5), resp2: deleteResponse(1, 5),
@ -446,13 +451,18 @@ func TestModelResponseMatch(t *testing.T) {
}, },
{ {
resp1: deleteResponse(0, 5), resp1: deleteResponse(0, 5),
resp2: partialResponse(0), resp2: partialResponse(4),
expectMatch: false, expectMatch: false,
}, },
{
resp1: deleteResponse(0, 5),
resp2: partialResponse(0),
expectMatch: true,
},
{ {
resp1: deleteResponse(1, 5), resp1: deleteResponse(1, 5),
resp2: partialResponse(0), resp2: partialResponse(0),
expectMatch: false, expectMatch: true,
}, },
{ {
resp1: deleteResponse(0, 5), resp1: deleteResponse(0, 5),
@ -491,12 +501,72 @@ func TestModelResponseMatch(t *testing.T) {
}, },
{ {
resp1: compareRevisionAndPutResponse(true, 7), resp1: compareRevisionAndPutResponse(true, 7),
resp2: partialResponse(0), resp2: partialResponse(4),
expectMatch: false,
},
{
resp1: compareRevisionAndPutResponse(false, 7),
resp2: partialResponse(3),
expectMatch: false, expectMatch: false,
}, },
{ {
resp1: compareRevisionAndPutResponse(false, 7), resp1: compareRevisionAndPutResponse(false, 7),
resp2: partialResponse(0), resp2: partialResponse(0),
expectMatch: true,
},
{
resp1: MaybeEtcdResponse{EtcdResponse: EtcdResponse{Revision: 1, Txn: &TxnResponse{Failure: false, Results: []EtcdOperationResult{{Deleted: 1}}}}},
resp2: failedResponse(errors.New("failed request")),
expectMatch: false,
},
{
resp1: failedResponse(errors.New("failed request 1")),
resp2: failedResponse(errors.New("failed request 2")),
expectMatch: false,
},
{
resp1: failedResponse(errors.New("failed request")),
resp2: failedResponse(errors.New("failed request")),
expectMatch: true,
},
{
resp1: putResponse(2),
resp2: MaybeEtcdResponse{Persisted: true},
expectMatch: true,
},
{
resp1: putResponse(2),
resp2: MaybeEtcdResponse{Persisted: true, PersistedRevision: 2},
expectMatch: true,
},
{
resp1: putResponse(2),
resp2: MaybeEtcdResponse{Persisted: true, PersistedRevision: 3},
expectMatch: false,
},
{
resp1: failedResponse(errors.New("failed request")),
resp2: MaybeEtcdResponse{Persisted: true},
expectMatch: true,
},
{
resp1: failedResponse(errors.New("failed request")),
resp2: MaybeEtcdResponse{Persisted: true, PersistedRevision: 2},
expectMatch: true,
},
{
resp1: MaybeEtcdResponse{Persisted: true},
resp2: MaybeEtcdResponse{Persisted: true, PersistedRevision: 2},
expectMatch: true,
},
{
resp1: MaybeEtcdResponse{Persisted: true, PersistedRevision: 2},
resp2: MaybeEtcdResponse{Persisted: true, PersistedRevision: 2},
expectMatch: true,
},
{
resp1: MaybeEtcdResponse{Persisted: true, PersistedRevision: 1},
resp2: MaybeEtcdResponse{Persisted: true, PersistedRevision: 2},
expectMatch: false, expectMatch: false,
}, },
} }

View File

@ -83,7 +83,7 @@ func filterSerializableOperations(clients []report.ClientReport) []porcupine.Ope
} }
func validateSerializableRead(lg *zap.Logger, replay *model.EtcdReplay, request model.EtcdRequest, response model.MaybeEtcdResponse) error { func validateSerializableRead(lg *zap.Logger, replay *model.EtcdReplay, request model.EtcdRequest, response model.MaybeEtcdResponse) error {
if response.PartialResponse || response.Error != "" { if response.Persisted || response.Error != "" {
return nil return nil
} }
state, err := replay.StateForRevision(request.Range.Revision) state, err := replay.StateForRevision(request.Range.Revision)

View File

@ -76,6 +76,7 @@ func patchOperations(operations []porcupine.Operation, watchEvents map[model.Eve
newOperations = append(newOperations, op) newOperations = append(newOperations, op)
continue continue
} }
var resourceVersion int64
if op.Call <= lastObservedOperation.Call { if op.Call <= lastObservedOperation.Call {
matchingEvent := matchWatchEvent(request.Txn, watchEvents) matchingEvent := matchWatchEvent(request.Txn, watchEvents)
if matchingEvent != nil { if matchingEvent != nil {
@ -84,7 +85,7 @@ func patchOperations(operations []porcupine.Operation, watchEvents map[model.Eve
if eventTime < op.Return { if eventTime < op.Return {
op.Return = eventTime op.Return = eventTime
} }
op.Output = model.MaybeEtcdResponse{PartialResponse: true, EtcdResponse: model.EtcdResponse{Revision: matchingEvent.Revision}} resourceVersion = matchingEvent.Revision
} }
} }
persistedReturnTime := matchReturnTime(request, persistedOperations) persistedReturnTime := matchReturnTime(request, persistedOperations)
@ -94,9 +95,17 @@ func patchOperations(operations []porcupine.Operation, watchEvents map[model.Eve
op.Return = *persistedReturnTime op.Return = *persistedReturnTime
} }
} }
if persistedReturnTime == nil && canBeDiscarded(request.Txn) { if isUniqueTxn(request.Txn) {
if persistedReturnTime == nil {
// Remove non persisted operations // Remove non persisted operations
continue continue
} else {
if resourceVersion != 0 {
op.Output = model.MaybeEtcdResponse{Persisted: true, PersistedRevision: resourceVersion}
} else {
op.Output = model.MaybeEtcdResponse{Persisted: true}
}
}
} }
// Leave operation as it is as we cannot discard it. // Leave operation as it is as we cannot discard it.
newOperations = append(newOperations, op) newOperations = append(newOperations, op)
@ -137,12 +146,8 @@ func matchWatchEvent(request *model.TxnRequest, watchEvents map[model.Event]clie
return nil return nil
} }
func canBeDiscarded(request *model.TxnRequest) bool { func isUniqueTxn(request *model.TxnRequest) bool {
return operationsCanBeDiscarded(request.OperationsOnSuccess) && operationsCanBeDiscarded(request.OperationsOnFailure) return (hasUniqueWriteOperation(request.OperationsOnSuccess) || !hasWriteOperation(request.OperationsOnSuccess)) && (hasUniqueWriteOperation(request.OperationsOnFailure) || !hasWriteOperation(request.OperationsOnFailure))
}
func operationsCanBeDiscarded(ops []model.EtcdOperation) bool {
return hasUniqueWriteOperation(ops) || !hasWriteOperation(ops)
} }
func hasWriteOperation(ops []model.EtcdOperation) bool { func hasWriteOperation(ops []model.EtcdOperation) bool {

View File

@ -67,7 +67,7 @@ func TestPatchHistory(t *testing.T) {
putRequest("key", "value"), putRequest("key", "value"),
}, },
expectedRemainingOperations: []porcupine.Operation{ expectedRemainingOperations: []porcupine.Operation{
{Return: 1000000000, Output: model.MaybeEtcdResponse{Error: "failed"}}, {Return: 1000000000, Output: model.MaybeEtcdResponse{Persisted: true}},
}, },
}, },
{ {
@ -81,7 +81,7 @@ func TestPatchHistory(t *testing.T) {
putRequest("key2", "value"), putRequest("key2", "value"),
}, },
expectedRemainingOperations: []porcupine.Operation{ expectedRemainingOperations: []porcupine.Operation{
{Return: 3, Output: model.MaybeEtcdResponse{Error: "failed"}}, {Return: 3, Output: model.MaybeEtcdResponse{Persisted: true}},
{Return: 4, Output: putResponse(model.EtcdOperationResult{})}, {Return: 4, Output: putResponse(model.EtcdOperationResult{})},
}, },
}, },
@ -95,7 +95,7 @@ func TestPatchHistory(t *testing.T) {
}, },
watchOperations: watchPutEvent("key", "value", 2, 3), watchOperations: watchPutEvent("key", "value", 2, 3),
expectedRemainingOperations: []porcupine.Operation{ expectedRemainingOperations: []porcupine.Operation{
{Return: 3, Output: model.MaybeEtcdResponse{PartialResponse: true, EtcdResponse: model.EtcdResponse{Revision: 2}}}, {Return: 3, Output: model.MaybeEtcdResponse{Persisted: true, PersistedRevision: 2}},
}, },
}, },
{ {
@ -133,7 +133,7 @@ func TestPatchHistory(t *testing.T) {
putRequestWithLease("key", "value", 123), putRequestWithLease("key", "value", 123),
}, },
expectedRemainingOperations: []porcupine.Operation{ expectedRemainingOperations: []porcupine.Operation{
{Return: 1000000000, Output: model.MaybeEtcdResponse{Error: "failed"}}, {Return: 1000000000, Output: model.MaybeEtcdResponse{Persisted: true}},
}, },
}, },
{ {
@ -147,7 +147,7 @@ func TestPatchHistory(t *testing.T) {
putRequestWithLease("key2", "value", 234), putRequestWithLease("key2", "value", 234),
}, },
expectedRemainingOperations: []porcupine.Operation{ expectedRemainingOperations: []porcupine.Operation{
{Return: 3, Output: model.MaybeEtcdResponse{Error: "failed"}}, {Return: 3, Output: model.MaybeEtcdResponse{Persisted: true}},
{Return: 4, Output: putResponse(model.EtcdOperationResult{})}, {Return: 4, Output: putResponse(model.EtcdOperationResult{})},
}, },
}, },
@ -212,7 +212,7 @@ func TestPatchHistory(t *testing.T) {
putRequest("key", "value"), putRequest("key", "value"),
}, },
expectedRemainingOperations: []porcupine.Operation{ expectedRemainingOperations: []porcupine.Operation{
{Return: 1000000000, Output: model.MaybeEtcdResponse{Error: "failed"}}, {Return: 1000000000, Output: model.MaybeEtcdResponse{Persisted: true}},
}, },
}, },
{ {
@ -267,7 +267,7 @@ func TestPatchHistory(t *testing.T) {
putRequest("key", "value"), putRequest("key", "value"),
}, },
expectedRemainingOperations: []porcupine.Operation{ expectedRemainingOperations: []porcupine.Operation{
{Return: 1000000000, Output: model.MaybeEtcdResponse{Error: "failed"}}, {Return: 1000000000, Output: model.MaybeEtcdResponse{Persisted: true}},
}, },
}, },
{ {