Utilize WAL to patch operation history

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
Marek Siarkowicz 2023-10-15 11:39:35 +02:00
parent 452445e2d8
commit 569693be8d
6 changed files with 528 additions and 120 deletions

View File

@ -64,21 +64,21 @@ func TestRobustnessRegression(t *testing.T) {
}
func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testScenario) {
report := report.TestReport{Logger: lg}
r := report.TestReport{Logger: lg}
var err error
report.Cluster, err = e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&s.cluster))
r.Cluster, err = e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&s.cluster))
if err != nil {
t.Fatal(err)
}
defer report.Cluster.Close()
defer r.Cluster.Close()
if s.failpoint == nil {
s.failpoint, err = failpoint.PickRandom(report.Cluster)
s.failpoint, err = failpoint.PickRandom(r.Cluster)
if err != nil {
t.Fatal(err)
}
}
err = failpoint.Validate(report.Cluster, s.failpoint)
err = failpoint.Validate(r.Cluster, s.failpoint)
if err != nil {
t.Fatal(err)
}
@ -88,15 +88,19 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce
// Refer to: https://github.com/golang/go/issues/49929
panicked := true
defer func() {
report.Report(t, panicked)
r.Report(t, panicked)
}()
report.Client = s.run(ctx, t, lg, report.Cluster)
forcestopCluster(report.Cluster)
r.Client = s.run(ctx, t, lg, r.Cluster)
persistedRequests, err := report.PersistedRequestsCluster(lg, r.Cluster)
if err != nil {
t.Fatal(err)
}
forcestopCluster(r.Cluster)
watchProgressNotifyEnabled := report.Cluster.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0
validateGotAtLeastOneProgressNotify(t, report.Client, s.watch.requestProgress || watchProgressNotifyEnabled)
watchProgressNotifyEnabled := r.Cluster.Cfg.ServerConfig.ExperimentalWatchProgressNotifyInterval != 0
validateGotAtLeastOneProgressNotify(t, r.Client, s.watch.requestProgress || watchProgressNotifyEnabled)
validateConfig := validate.Config{ExpectRevisionUnique: s.traffic.ExpectUniqueRevision()}
report.Visualize = validate.ValidateAndReturnVisualize(t, lg, validateConfig, report.Client, 5*time.Minute)
r.Visualize = validate.ValidateAndReturnVisualize(t, lg, validateConfig, r.Client, persistedRequests, 5*time.Minute)
panicked = false
}

View File

@ -0,0 +1,240 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package report
import (
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"github.com/google/go-cmp/cmp"
"go.uber.org/zap"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/model"
"go.etcd.io/raft/v3/raftpb"
)
func LoadClusterPersistedRequests(lg *zap.Logger, path string) ([]model.EtcdRequest, error) {
files, err := os.ReadDir(path)
if err != nil {
return nil, err
}
dataDirs := []string{}
for _, file := range files {
if file.IsDir() && strings.HasPrefix(file.Name(), "server-") {
dataDirs = append(dataDirs, filepath.Join(path, file.Name()))
}
}
return PersistedRequestsDirs(lg, dataDirs)
}
func PersistedRequestsCluster(lg *zap.Logger, cluster *e2e.EtcdProcessCluster) ([]model.EtcdRequest, error) {
dataDirs := []string{}
for _, proc := range cluster.Procs {
dataDirs = append(dataDirs, proc.Config().DataDirPath)
}
return PersistedRequestsDirs(lg, dataDirs)
}
func PersistedRequestsDirs(lg *zap.Logger, dataDirs []string) ([]model.EtcdRequest, error) {
persistedRequests := []model.EtcdRequest{}
// Allow failure in minority of etcd cluster.
// 0 failures in 1 node cluster, 1 failure in 3 node cluster
allowedFailures := len(dataDirs) / 2
for _, dir := range dataDirs {
memberRequests, err := requestsPersistedInWAL(lg, dir)
if err != nil {
if allowedFailures < 1 {
return nil, err
}
allowedFailures--
continue
}
minLength := min(len(persistedRequests), len(memberRequests))
if diff := cmp.Diff(memberRequests[:minLength], persistedRequests[:minLength]); diff != "" {
return nil, fmt.Errorf("unexpected differences between wal entries, diff:\n%s", diff)
}
if len(memberRequests) > len(persistedRequests) {
persistedRequests = memberRequests
}
}
return persistedRequests, nil
}
func requestsPersistedInWAL(lg *zap.Logger, dataDir string) ([]model.EtcdRequest, error) {
_, ents, err := ReadWAL(lg, dataDir)
if err != nil {
return nil, err
}
requests := make([]model.EtcdRequest, 0, len(ents))
for _, ent := range ents {
if ent.Type != raftpb.EntryNormal || len(ent.Data) == 0 {
continue
}
request, err := parseEntryNormal(ent)
if err != nil {
return nil, err
}
if request != nil {
requests = append(requests, *request)
}
}
return requests, nil
}
func ReadWAL(lg *zap.Logger, dataDir string) (state raftpb.HardState, ents []raftpb.Entry, err error) {
walDir := datadir.ToWALDir(dataDir)
repaired := false
for {
w, err := wal.OpenForRead(lg, walDir, walpb.Snapshot{Index: 0})
if err != nil {
return state, nil, fmt.Errorf("failed to open WAL, err: %s", err)
}
_, state, ents, err = w.ReadAll()
w.Close()
if err != nil {
if errors.Is(err, wal.ErrSnapshotNotFound) {
return state, ents, nil
}
// we can only repair ErrUnexpectedEOF and we never repair twice.
if repaired || !errors.Is(err, io.ErrUnexpectedEOF) {
return state, nil, fmt.Errorf("failed to read WAL, cannot be repaired, err: %s", err)
}
if !wal.Repair(lg, walDir) {
return state, nil, fmt.Errorf("failed to repair WAL, err: %s", err)
}
lg.Info("repaired WAL", zap.Error(err))
repaired = true
continue
}
return state, ents, nil
}
}
func parseEntryNormal(ent raftpb.Entry) (*model.EtcdRequest, error) {
var raftReq pb.InternalRaftRequest
if err := raftReq.Unmarshal(ent.Data); err != nil {
var r pb.Request
isV2Entry := pbutil.MaybeUnmarshal(&r, ent.Data)
if !isV2Entry {
return nil, err
}
return nil, nil
}
switch {
case raftReq.Put != nil:
op := model.PutOptions{
Key: string(raftReq.Put.Key),
Value: model.ToValueOrHash(string(raftReq.Put.Value)),
LeaseID: raftReq.Put.Lease,
}
request := model.EtcdRequest{
Type: model.Txn,
Txn: &model.TxnRequest{
OperationsOnSuccess: []model.EtcdOperation{
{Type: model.PutOperation, Put: op},
},
},
}
return &request, nil
case raftReq.DeleteRange != nil:
op := model.DeleteOptions{Key: string(raftReq.DeleteRange.Key)}
request := model.EtcdRequest{
Type: model.Txn,
Txn: &model.TxnRequest{
OperationsOnSuccess: []model.EtcdOperation{
{Type: model.DeleteOperation, Delete: op},
},
},
}
return &request, nil
case raftReq.LeaseGrant != nil:
return &model.EtcdRequest{
Type: model.LeaseGrant,
LeaseGrant: &model.LeaseGrantRequest{LeaseID: raftReq.LeaseGrant.ID},
}, nil
case raftReq.ClusterMemberAttrSet != nil:
return nil, nil
case raftReq.ClusterVersionSet != nil:
return nil, nil
case raftReq.Compaction != nil:
return nil, nil
case raftReq.Txn != nil:
txn := model.TxnRequest{}
for _, cmp := range raftReq.Txn.Compare {
txn.Conditions = append(txn.Conditions, model.EtcdCondition{
Key: string(cmp.Key),
ExpectedRevision: cmp.GetModRevision(),
})
}
for _, op := range raftReq.Txn.Success {
txn.OperationsOnSuccess = append(txn.OperationsOnSuccess, toEtcdOperation(op))
}
for _, op := range raftReq.Txn.Failure {
txn.OperationsOnFailure = append(txn.OperationsOnFailure, toEtcdOperation(op))
}
request := model.EtcdRequest{
Type: model.Txn,
Txn: &txn,
}
return &request, nil
default:
panic(fmt.Sprintf("Unhandled raft request: %+v", raftReq))
}
}
func toEtcdOperation(op *pb.RequestOp) (operation model.EtcdOperation) {
switch {
case op.GetRequestRange() != nil:
rangeOp := op.GetRequestRange()
operation = model.EtcdOperation{
Type: model.RangeOperation,
Range: model.RangeOptions{
Start: string(rangeOp.Key),
End: string(rangeOp.RangeEnd),
Limit: rangeOp.Limit,
},
}
case op.GetRequestPut() != nil:
putOp := op.GetRequestPut()
operation = model.EtcdOperation{
Type: model.PutOperation,
Put: model.PutOptions{
Key: string(putOp.Key),
Value: model.ToValueOrHash(string(putOp.Value)),
},
}
case op.GetRequestDeleteRange() != nil:
deleteOp := op.GetRequestDeleteRange()
operation = model.EtcdOperation{
Type: model.DeleteOperation,
Delete: model.DeleteOptions{
Key: string(deleteOp.Key),
},
}
default:
panic(fmt.Sprintf("Unknown op type %v", op))
}
return operation
}

View File

@ -15,6 +15,8 @@
package validate
import (
"fmt"
"github.com/anishathalye/porcupine"
"go.etcd.io/etcd/tests/v3/robustness/model"
@ -22,10 +24,11 @@ import (
"go.etcd.io/etcd/tests/v3/robustness/traffic"
)
func patchedOperationHistory(reports []report.ClientReport) []porcupine.Operation {
func patchedOperationHistory(reports []report.ClientReport, persistedRequests []model.EtcdRequest) []porcupine.Operation {
allOperations := relevantOperations(reports)
uniqueEvents := uniqueWatchEvents(reports)
return patchOperationsWithWatchEvents(allOperations, uniqueEvents)
operationsReturnTime := persistedOperationsReturnTime(allOperations, persistedRequests)
return patchOperations(allOperations, uniqueEvents, operationsReturnTime)
}
func relevantOperations(reports []report.ClientReport) []porcupine.Operation {
@ -61,33 +64,42 @@ func uniqueWatchEvents(reports []report.ClientReport) map[model.Event]traffic.Ti
return persisted
}
func patchOperationsWithWatchEvents(operations []porcupine.Operation, watchEvents map[model.Event]traffic.TimedWatchEvent) []porcupine.Operation {
func patchOperations(operations []porcupine.Operation, watchEvents map[model.Event]traffic.TimedWatchEvent, persistedOperations map[model.EtcdOperation]int64) []porcupine.Operation {
newOperations := make([]porcupine.Operation, 0, len(operations))
lastObservedOperation := lastOperationObservedInWatch(operations, watchEvents)
for _, op := range operations {
request := op.Input.(model.EtcdRequest)
resp := op.Output.(model.MaybeEtcdResponse)
if resp.Error == "" || op.Call > lastObservedOperation.Call || request.Type != model.Txn {
if resp.Error == "" || request.Type != model.Txn {
// Cannot patch those requests.
newOperations = append(newOperations, op)
continue
}
event := matchWatchEvent(request.Txn, watchEvents)
if event != nil {
// Set revision and time based on watchEvent.
op.Return = event.Time.Nanoseconds()
op.Output = model.MaybeEtcdResponse{PartialResponse: true, EtcdResponse: model.EtcdResponse{Revision: event.Revision}}
newOperations = append(newOperations, op)
if op.Call <= lastObservedOperation.Call {
matchingEvent := matchWatchEvent(request.Txn, watchEvents)
if matchingEvent != nil {
eventTime := matchingEvent.Time.Nanoseconds()
// Set revision and time based on watchEvent.
if eventTime < op.Return {
op.Return = eventTime
}
op.Output = model.MaybeEtcdResponse{PartialResponse: true, EtcdResponse: model.EtcdResponse{Revision: matchingEvent.Revision}}
}
}
persistedReturnTime := matchReturnTime(request, persistedOperations)
if persistedReturnTime != nil {
// Set return time based on persisted return time.
if *persistedReturnTime < op.Return {
op.Return = *persistedReturnTime
}
}
if persistedReturnTime == nil && canBeDiscarded(request.Txn) {
// Remove non persisted operations
continue
}
if !canBeDiscarded(request.Txn) {
// Leave operation as it is as we cannot discard it.
newOperations = append(newOperations, op)
continue
}
// Remove non persisted operations
// Leave operation as it is as we cannot discard it.
newOperations = append(newOperations, op)
}
return newOperations
}
@ -150,3 +162,102 @@ func hasUniqueWriteOperation(ops []model.EtcdOperation) bool {
}
return false
}
func persistedOperationsReturnTime(allOperations []porcupine.Operation, persistedRequests []model.EtcdRequest) map[model.EtcdOperation]int64 {
operationReturnTime := operationReturnTime(allOperations)
persisted := map[model.EtcdOperation]int64{}
lastReturnTime := maxReturnTime(operationReturnTime)
for i := len(persistedRequests) - 1; i >= 0; i-- {
request := persistedRequests[i]
switch request.Type {
case model.Txn:
hasPut := false
lastReturnTime--
for _, op := range request.Txn.OperationsOnSuccess {
if op.Type != model.PutOperation {
continue
}
if _, found := persisted[op]; found {
panic(fmt.Sprintf("Unexpected duplicate event in persisted requests. %d %+v", i, op))
}
hasPut = true
persisted[op] = lastReturnTime
}
if hasPut {
newReturnTime := requestReturnTime(operationReturnTime, request)
if newReturnTime != -1 {
lastReturnTime = min(lastReturnTime, newReturnTime)
}
}
case model.LeaseGrant:
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}
}
return persisted
}
func operationReturnTime(operations []porcupine.Operation) map[model.EtcdOperation]int64 {
newOperations := map[model.EtcdOperation]int64{}
for _, op := range operations {
request := op.Input.(model.EtcdRequest)
switch request.Type {
case model.Txn:
for _, etcdOp := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) {
if etcdOp.Type != model.PutOperation {
continue
}
if _, found := newOperations[etcdOp]; found {
panic("Unexpected duplicate event in persisted requests.")
}
newOperations[etcdOp] = op.Return
}
case model.Range:
case model.LeaseGrant:
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}
}
return newOperations
}
func maxReturnTime(operationTime map[model.EtcdOperation]int64) int64 {
var maxReturnTime int64
for _, returnTime := range operationTime {
if returnTime > maxReturnTime {
maxReturnTime = returnTime
}
}
return maxReturnTime
}
func requestReturnTime(operationTime map[model.EtcdOperation]int64, request model.EtcdRequest) int64 {
switch request.Type {
case model.Txn:
for _, op := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) {
if op.Type != model.PutOperation {
continue
}
if time, found := operationTime[op]; found {
return time
}
}
return -1
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}
}
func matchReturnTime(request model.EtcdRequest, persistedOperations map[model.EtcdOperation]int64) *int64 {
for _, etcdOp := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) {
if etcdOp.Type != model.PutOperation {
continue
}
if returnTime, found := persistedOperations[etcdOp]; found {
return &returnTime
}
}
return nil
}

View File

@ -19,7 +19,6 @@ import (
"testing"
"time"
"go.etcd.io/etcd/api/v3/etcdserverpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/robustness/identity"
"go.etcd.io/etcd/tests/v3/robustness/model"
@ -28,10 +27,10 @@ import (
func TestPatchHistory(t *testing.T) {
for _, tc := range []struct {
name string
historyFunc func(baseTime time.Time, h *model.AppendableHistory)
event model.Event
expectRemains bool
name string
historyFunc func(baseTime time.Time, h *model.AppendableHistory)
persistedRequest *model.EtcdRequest
expectedRemainingOperations int
}{
{
name: "successful range remains",
@ -41,7 +40,7 @@ func TestPatchHistory(t *testing.T) {
stop := time.Since(baseTime)
h.AppendRange("key", "", 0, 0, start, stop, &clientv3.GetResponse{}, nil)
},
expectRemains: true,
expectedRemainingOperations: 1,
},
{
name: "successful put remains",
@ -51,7 +50,21 @@ func TestPatchHistory(t *testing.T) {
stop := time.Since(baseTime)
h.AppendPut("key", "value", start, stop, &clientv3.PutResponse{}, nil)
},
expectRemains: true,
persistedRequest: &model.EtcdRequest{
Type: model.Txn,
Txn: &model.TxnRequest{
OperationsOnSuccess: []model.EtcdOperation{
{
Type: model.PutOperation,
Put: model.PutOptions{
Key: "key",
Value: model.ToValueOrHash("value"),
},
},
},
},
},
expectedRemainingOperations: 1,
},
{
name: "failed put remains if there is a matching event",
@ -61,12 +74,21 @@ func TestPatchHistory(t *testing.T) {
stop := time.Since(baseTime)
h.AppendPut("key", "value", start, stop, nil, errors.New("failed"))
},
event: model.Event{
Type: model.PutOperation,
Key: "key",
Value: model.ToValueOrHash("value"),
persistedRequest: &model.EtcdRequest{
Type: model.Txn,
Txn: &model.TxnRequest{
OperationsOnSuccess: []model.EtcdOperation{
{
Type: model.PutOperation,
Put: model.PutOptions{
Key: "key",
Value: model.ToValueOrHash("value"),
},
},
},
},
},
expectRemains: true,
expectedRemainingOperations: 1,
},
{
name: "failed put is dropped if event has different key",
@ -75,13 +97,26 @@ func TestPatchHistory(t *testing.T) {
time.Sleep(time.Nanosecond)
stop := time.Since(baseTime)
h.AppendPut("key1", "value", start, stop, nil, errors.New("failed"))
start2 := time.Since(baseTime)
time.Sleep(time.Nanosecond)
stop2 := time.Since(baseTime)
h.AppendPut("key2", "value", start2, stop2, &clientv3.PutResponse{}, nil)
},
event: model.Event{
Type: model.PutOperation,
Key: "key2",
Value: model.ToValueOrHash("value"),
persistedRequest: &model.EtcdRequest{
Type: model.Txn,
Txn: &model.TxnRequest{
OperationsOnSuccess: []model.EtcdOperation{
{
Type: model.PutOperation,
Put: model.PutOptions{
Key: "key2",
Value: model.ToValueOrHash("value"),
},
},
},
},
},
expectRemains: false,
expectedRemainingOperations: 1,
},
{
name: "failed put is dropped if event has different value",
@ -90,13 +125,26 @@ func TestPatchHistory(t *testing.T) {
time.Sleep(time.Nanosecond)
stop := time.Since(baseTime)
h.AppendPut("key", "value1", start, stop, nil, errors.New("failed"))
start2 := time.Since(baseTime)
time.Sleep(time.Nanosecond)
stop2 := time.Since(baseTime)
h.AppendPut("key", "value2", start2, stop2, &clientv3.PutResponse{}, nil)
},
event: model.Event{
Type: model.PutOperation,
Key: "key",
Value: model.ToValueOrHash("value2"),
persistedRequest: &model.EtcdRequest{
Type: model.Txn,
Txn: &model.TxnRequest{
OperationsOnSuccess: []model.EtcdOperation{
{
Type: model.PutOperation,
Put: model.PutOptions{
Key: "key",
Value: model.ToValueOrHash("value2"),
},
},
},
},
},
expectRemains: false,
expectedRemainingOperations: 1,
},
{
name: "failed put with lease remains if there is a matching event",
@ -106,12 +154,22 @@ func TestPatchHistory(t *testing.T) {
stop := time.Since(baseTime)
h.AppendPutWithLease("key", "value", 123, start, stop, nil, errors.New("failed"))
},
event: model.Event{
Type: model.PutOperation,
Key: "key",
Value: model.ToValueOrHash("value"),
persistedRequest: &model.EtcdRequest{
Type: model.Txn,
Txn: &model.TxnRequest{
OperationsOnSuccess: []model.EtcdOperation{
{
Type: model.PutOperation,
Put: model.PutOptions{
Key: "key",
Value: model.ToValueOrHash("value"),
LeaseID: 123,
},
},
},
},
},
expectRemains: true,
expectedRemainingOperations: 1,
},
{
name: "failed put is dropped",
@ -121,7 +179,7 @@ func TestPatchHistory(t *testing.T) {
stop := time.Since(baseTime)
h.AppendPut("key", "value", start, stop, nil, errors.New("failed"))
},
expectRemains: false,
expectedRemainingOperations: 0,
},
{
name: "failed put with lease is dropped",
@ -131,7 +189,7 @@ func TestPatchHistory(t *testing.T) {
stop := time.Since(baseTime)
h.AppendPutWithLease("key", "value", 123, start, stop, nil, errors.New("failed"))
},
expectRemains: false,
expectedRemainingOperations: 0,
},
{
name: "successful delete remains",
@ -141,7 +199,7 @@ func TestPatchHistory(t *testing.T) {
stop := time.Since(baseTime)
h.AppendDelete("key", start, stop, &clientv3.DeleteResponse{}, nil)
},
expectRemains: true,
expectedRemainingOperations: 1,
},
{
name: "failed delete remains",
@ -151,7 +209,7 @@ func TestPatchHistory(t *testing.T) {
stop := time.Since(baseTime)
h.AppendDelete("key", start, stop, nil, errors.New("failed"))
},
expectRemains: true,
expectedRemainingOperations: 1,
},
{
name: "successful empty txn remains",
@ -161,7 +219,7 @@ func TestPatchHistory(t *testing.T) {
stop := time.Since(baseTime)
h.AppendTxn(nil, []clientv3.Op{}, []clientv3.Op{}, start, stop, &clientv3.TxnResponse{}, nil)
},
expectRemains: true,
expectedRemainingOperations: 1,
},
{
name: "failed empty txn is dropped",
@ -171,7 +229,7 @@ func TestPatchHistory(t *testing.T) {
stop := time.Since(baseTime)
h.AppendTxn(nil, []clientv3.Op{}, []clientv3.Op{}, start, stop, nil, errors.New("failed"))
},
expectRemains: false,
expectedRemainingOperations: 0,
},
{
name: "failed txn put is dropped",
@ -181,7 +239,7 @@ func TestPatchHistory(t *testing.T) {
stop := time.Since(baseTime)
h.AppendTxn(nil, []clientv3.Op{clientv3.OpPut("key", "value")}, []clientv3.Op{}, start, stop, nil, errors.New("failed"))
},
expectRemains: false,
expectedRemainingOperations: 0,
},
{
name: "failed txn put remains if there is a matching event",
@ -191,12 +249,21 @@ func TestPatchHistory(t *testing.T) {
stop := time.Since(baseTime)
h.AppendTxn(nil, []clientv3.Op{clientv3.OpPut("key", "value")}, []clientv3.Op{}, start, stop, nil, errors.New("failed"))
},
event: model.Event{
Type: model.PutOperation,
Key: "key",
Value: model.ToValueOrHash("value"),
persistedRequest: &model.EtcdRequest{
Type: model.Txn,
Txn: &model.TxnRequest{
OperationsOnSuccess: []model.EtcdOperation{
{
Type: model.PutOperation,
Put: model.PutOptions{
Key: "key",
Value: model.ToValueOrHash("value"),
},
},
},
},
},
expectRemains: true,
expectedRemainingOperations: 1,
},
{
name: "failed txn delete remains",
@ -206,7 +273,7 @@ func TestPatchHistory(t *testing.T) {
stop := time.Since(baseTime)
h.AppendTxn(nil, []clientv3.Op{clientv3.OpDelete("key")}, []clientv3.Op{}, start, stop, nil, errors.New("failed"))
},
expectRemains: true,
expectedRemainingOperations: 1,
},
{
name: "successful txn put/delete remains",
@ -216,7 +283,7 @@ func TestPatchHistory(t *testing.T) {
stop := time.Since(baseTime)
h.AppendTxn(nil, []clientv3.Op{clientv3.OpPut("key", "value")}, []clientv3.Op{clientv3.OpDelete("key")}, start, stop, &clientv3.TxnResponse{}, nil)
},
expectRemains: true,
expectedRemainingOperations: 1,
},
{
name: "failed txn put/delete remains",
@ -226,7 +293,7 @@ func TestPatchHistory(t *testing.T) {
stop := time.Since(baseTime)
h.AppendTxn(nil, []clientv3.Op{clientv3.OpPut("key", "value")}, []clientv3.Op{clientv3.OpDelete("key")}, start, stop, nil, errors.New("failed"))
},
expectRemains: true,
expectedRemainingOperations: 1,
},
{
name: "failed txn delete/put remains",
@ -236,7 +303,7 @@ func TestPatchHistory(t *testing.T) {
stop := time.Since(baseTime)
h.AppendTxn(nil, []clientv3.Op{clientv3.OpDelete("key")}, []clientv3.Op{clientv3.OpPut("key", "value")}, start, stop, nil, errors.New("failed"))
},
expectRemains: true,
expectedRemainingOperations: 1,
},
{
name: "failed txn empty/put is dropped",
@ -246,7 +313,7 @@ func TestPatchHistory(t *testing.T) {
stop := time.Since(baseTime)
h.AppendTxn(nil, []clientv3.Op{}, []clientv3.Op{clientv3.OpPut("key", "value")}, start, stop, nil, errors.New("failed"))
},
expectRemains: false,
expectedRemainingOperations: 0,
},
{
name: "failed txn empty/put remains if there is a matching event",
@ -256,12 +323,21 @@ func TestPatchHistory(t *testing.T) {
stop := time.Since(baseTime)
h.AppendTxn(nil, []clientv3.Op{clientv3.OpPut("key", "value")}, []clientv3.Op{}, start, stop, nil, errors.New("failed"))
},
event: model.Event{
Type: model.PutOperation,
Key: "key",
Value: model.ToValueOrHash("value"),
persistedRequest: &model.EtcdRequest{
Type: model.Txn,
Txn: &model.TxnRequest{
OperationsOnSuccess: []model.EtcdOperation{
{
Type: model.PutOperation,
Put: model.PutOptions{
Key: "key",
Value: model.ToValueOrHash("value"),
},
},
},
},
},
expectRemains: true,
expectedRemainingOperations: 1,
},
{
name: "failed txn empty/delete remains",
@ -271,7 +347,7 @@ func TestPatchHistory(t *testing.T) {
stop := time.Since(baseTime)
h.AppendTxn(nil, []clientv3.Op{}, []clientv3.Op{clientv3.OpDelete("key")}, start, stop, nil, errors.New("failed"))
},
expectRemains: true,
expectedRemainingOperations: 1,
},
{
name: "failed txn put&delete is dropped",
@ -281,7 +357,7 @@ func TestPatchHistory(t *testing.T) {
stop := time.Since(baseTime)
h.AppendTxn(nil, []clientv3.Op{clientv3.OpPut("key", "value1"), clientv3.OpDelete("key")}, []clientv3.Op{}, start, stop, nil, errors.New("failed"))
},
expectRemains: false,
expectedRemainingOperations: 0,
},
{
name: "failed txn empty/put&delete is dropped",
@ -291,7 +367,7 @@ func TestPatchHistory(t *testing.T) {
stop := time.Since(baseTime)
h.AppendTxn(nil, []clientv3.Op{}, []clientv3.Op{clientv3.OpPut("key", "value1"), clientv3.OpDelete("key")}, start, stop, nil, errors.New("failed"))
},
expectRemains: false,
expectedRemainingOperations: 0,
},
{
name: "failed txn put&delete/put&delete is dropped",
@ -301,55 +377,26 @@ func TestPatchHistory(t *testing.T) {
stop := time.Since(baseTime)
h.AppendTxn(nil, []clientv3.Op{clientv3.OpPut("key", "value1"), clientv3.OpDelete("key")}, []clientv3.Op{clientv3.OpPut("key", "value2"), clientv3.OpDelete("key")}, start, stop, nil, errors.New("failed"))
},
expectRemains: false,
expectedRemainingOperations: 0,
},
} {
t.Run(tc.name, func(t *testing.T) {
baseTime := time.Now()
history := model.NewAppendableHistory(identity.NewIDProvider())
tc.historyFunc(baseTime, history)
time.Sleep(time.Nanosecond)
start := time.Since(baseTime)
time.Sleep(time.Nanosecond)
stop := time.Since(baseTime)
history.AppendPut("tombstone", "true", start, stop, &clientv3.PutResponse{Header: &etcdserverpb.ResponseHeader{Revision: 3}}, nil)
watch := []model.WatchResponse{
{
Events: []model.WatchEvent{
{
PersistedEvent: model.PersistedEvent{
Event: tc.event, Revision: 2,
},
},
},
Revision: 2,
Time: time.Since(baseTime),
},
{
Events: []model.WatchEvent{
{
PersistedEvent: model.PersistedEvent{
Event: model.Event{
Type: model.PutOperation,
Key: "tombstone",
Value: model.ToValueOrHash("true"),
}, Revision: 3},
},
},
Revision: 3,
Time: time.Since(baseTime),
},
requests := []model.EtcdRequest{}
if tc.persistedRequest != nil {
requests = append(requests, *tc.persistedRequest)
}
operations := patchedOperationHistory([]report.ClientReport{
{
ClientID: 0,
KeyValue: history.History.Operations(),
Watch: []model.WatchOperation{{Responses: watch}},
Watch: []model.WatchOperation{},
},
})
remains := len(operations) == history.Len()
if remains != tc.expectRemains {
t.Errorf("Unexpected remains, got: %v, want: %v", remains, tc.expectRemains)
}, requests)
if len(operations) != tc.expectedRemainingOperations {
t.Errorf("Unexpected remains, got: %d, want: %d", len(operations), tc.expectedRemainingOperations)
}
})
}

View File

@ -29,12 +29,12 @@ import (
)
// ValidateAndReturnVisualize returns visualize as porcupine.linearizationInfo used to generate visualization is private.
func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, reports []report.ClientReport, timeout time.Duration) (visualize func(basepath string) error) {
func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, reports []report.ClientReport, persistedRequests []model.EtcdRequest, timeout time.Duration) (visualize func(basepath string) error) {
err := checkValidationAssumptions(reports)
if err != nil {
t.Fatalf("Broken validation assumptions: %s", err)
}
patchedOperations := patchedOperationHistory(reports)
patchedOperations := patchedOperationHistory(reports, persistedRequests)
linearizable, visualize := validateLinearizableOperationsAndVisualize(lg, patchedOperations, timeout)
if linearizable != porcupine.Ok {
t.Error("Failed linearization, skipping further validation")

View File

@ -37,10 +37,16 @@ func TestDataReports(t *testing.T) {
continue
}
t.Run(file.Name(), func(t *testing.T) {
lg := zaptest.NewLogger(t)
path := filepath.Join(testdataPath, file.Name())
reports, err := report.LoadClientReports(path)
assert.NoError(t, err)
visualize := ValidateAndReturnVisualize(t, zaptest.NewLogger(t), Config{}, reports, 5*time.Minute)
persistedRequests, err := report.LoadClusterPersistedRequests(lg, path)
if err != nil {
t.Fatal(err)
}
visualize := ValidateAndReturnVisualize(t, zaptest.NewLogger(t), Config{}, reports, persistedRequests, 5*time.Minute)
if t.Failed() {
err := visualize(filepath.Join(path, "history.html"))