Implement Compaction support in robustness test

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
Marek Siarkowicz 2024-04-21 15:08:05 +02:00
parent 2ffaf5fba4
commit 5959110f4a
13 changed files with 178 additions and 57 deletions

View File

@@ -193,9 +193,13 @@ func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentR
func (c *RecordingClient) Compact(ctx context.Context, rev int64) (*clientv3.CompactResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
callTime := time.Since(c.baseTime)
resp, err := c.client.Compact(ctx, rev)
returnTime := time.Since(c.baseTime)
c.kvOperations.AppendCompact(rev, callTime, returnTime, resp, err)
return resp, err
}
func (c *RecordingClient) MemberList(ctx context.Context, opts ...clientv3.OpOption) (*clientv3.MemberListResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()

View File

@@ -78,12 +78,22 @@ func (t triggerCompact) Trigger(ctx context.Context, _ *testing.T, member e2e.Et
}
_, err = cc.Compact(ctx, rev)
if err != nil && !connectionError(err) {
return nil, err
return nil, fmt.Errorf("failed to compact: %w", err)
}
return []report.ClientReport{cc.Report()}, nil
}
func (t triggerCompact) Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess) bool {
func (t triggerCompact) Available(config e2e.EtcdProcessClusterConfig, _ e2e.EtcdProcess) bool {
// Since the introduction of compaction into traffic, injecting compaction failpoints started interfering with the peer proxy.
// TODO: Re-enable the peer proxy for compact failpoints when we confirm the root cause.
if config.PeerProxy {
return false
}
// For multiBatchCompaction, we need to guarantee that there are enough revisions between two compaction requests.
// With the addition of compaction requests to traffic, this might be hard if experimental-compaction-batch-limit is too high.
if t.multiBatchCompaction {
return config.ServerConfig.ExperimentalCompactionBatchLimit <= 10
}
return true
}
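
A quick way to see the gate's behavior is a unit check. This is a hypothetical sketch (the test name is invented, and it assumes `triggerCompact` and the e2e config types are reachable from the test; `nil` stands in for the unused `e2e.EtcdProcess` argument):

```go
func TestTriggerCompactAvailable(t *testing.T) {
	cfg := e2e.EtcdProcessClusterConfig{}
	// High batch limit: a single compaction finishes in one batch, so the
	// multi batch failpoint cannot trigger and must be reported unavailable.
	cfg.ServerConfig.ExperimentalCompactionBatchLimit = 1000
	if (triggerCompact{multiBatchCompaction: true}).Available(cfg, nil) {
		t.Error("multi batch compaction failpoint should be unavailable with a high batch limit")
	}
	// Peer proxy enabled: all compact failpoints are disabled for now.
	cfg.PeerProxy = true
	if (triggerCompact{}).Available(cfg, nil) {
		t.Error("compact failpoints should be unavailable with peer proxy enabled")
	}
}
```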

View File

@@ -25,6 +25,9 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin
if response.Error != "" {
return fmt.Sprintf("err: %q", response.Error)
}
if response.ClientError != "" {
return fmt.Sprintf("err: %q", response.ClientError)
}
if response.PartialResponse {
return fmt.Sprintf("unknown, rev: %d", response.Revision)
}
@@ -38,6 +41,8 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin
return "ok"
}
return fmt.Sprintf("ok, rev: %d", response.Revision)
case Compact:
return "ok"
default:
return fmt.Sprintf("<! unknown request type: %q !>", request.Type)
}
@@ -67,6 +72,8 @@ func describeEtcdRequest(request EtcdRequest) string {
return fmt.Sprintf("leaseRevoke(%d)", request.LeaseRevoke.LeaseID)
case Defragment:
return fmt.Sprintf("defragment()")
case Compact:
return fmt.Sprintf("compact(%d)", request.Compact.Revision)
default:
return fmt.Sprintf("<! unknown request type: %q !>", request.Type)
}
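
For reference, a small sketch (as if written in a test inside the model package) of what the describe helpers above render for the new request type:

```go
req := EtcdRequest{Type: Compact, Compact: &CompactRequest{Revision: 42}}
fmt.Println(describeEtcdRequest(req)) // prints: compact(42)
// A successful compaction response renders as "ok"; a response carrying a
// ClientError (e.g. mvcc.ErrCompacted) renders as err: "<message>".
```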

View File

@@ -24,6 +24,8 @@ import (
"sort"
"github.com/anishathalye/porcupine"
"go.etcd.io/etcd/server/v3/storage/mvcc"
)
// DeterministicModel assumes a deterministic execution of etcd requests. All
@@ -65,10 +67,11 @@ var DeterministicModel = porcupine.Model{
}
type EtcdState struct {
Revision int64
KeyValues map[string]ValueRevision
KeyLeases map[string]int64
Leases map[int64]EtcdLease
Revision int64
CompactRevision int64
KeyValues map[string]ValueRevision
KeyLeases map[string]int64
Leases map[int64]EtcdLease
}
func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, EtcdState) {
@@ -77,7 +80,10 @@ func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, Etcd
}
func (s EtcdState) DeepCopy() EtcdState {
newState := EtcdState{Revision: s.Revision}
newState := EtcdState{
Revision: s.Revision,
CompactRevision: s.CompactRevision,
}
newState.KeyValues = maps.Clone(s.KeyValues)
newState.KeyLeases = maps.Clone(s.KeyLeases)
@@ -92,10 +98,12 @@ func (s EtcdState) DeepCopy() EtcdState {
func freshEtcdState() EtcdState {
return EtcdState{
Revision: 1,
KeyValues: map[string]ValueRevision{},
KeyLeases: map[string]int64{},
Leases: map[int64]EtcdLease{},
Revision: 1,
// Start from CompactRevision equal to -1, as etcd allows clients to compact revision 0 for some reason.
CompactRevision: -1,
KeyValues: map[string]ValueRevision{},
KeyLeases: map[string]int64{},
Leases: map[int64]EtcdLease{},
}
}
@@ -112,6 +120,9 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
if request.Range.Revision > newState.Revision {
return newState, MaybeEtcdResponse{Error: ErrEtcdFutureRev.Error()}
}
if request.Range.Revision < newState.CompactRevision {
return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{ClientError: mvcc.ErrCompacted.Error()}}
}
return newState, MaybeEtcdResponse{PartialResponse: true, EtcdResponse: EtcdResponse{Revision: newState.Revision}}
case Txn:
failure := false
@@ -190,6 +201,14 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Revision: newState.Revision, LeaseRevoke: &LeaseRevokeResponse{}}}
case Defragment:
return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Defragment: &DefragmentResponse{}, Revision: newState.Revision}}
case Compact:
if request.Compact.Revision <= newState.CompactRevision {
return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{ClientError: mvcc.ErrCompacted.Error()}}
}
newState.CompactRevision = request.Compact.Revision
// Set a fake revision, as compaction returns a non-linearizable revision.
// TODO: Model non-linearizable response revision in model.
return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Compact: &CompactResponse{}, Revision: -1}}
default:
panic(fmt.Sprintf("Unknown request type: %v", request.Type))
}
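
Putting the new branches together, a minimal sketch (written as if inside a model package test) of how compaction steps through the deterministic model:

```go
state := freshEtcdState() // CompactRevision starts at -1
// The first compaction succeeds and moves CompactRevision to 5; the response
// carries the fake revision -1, since the real one is non-linearizable.
state, resp := state.Step(EtcdRequest{Type: Compact, Compact: &CompactRequest{Revision: 5}})
// Repeating the same compaction is deterministically rejected, because
// 5 <= state.CompactRevision. A stale Range with Revision < 5 would be
// rejected with the same ClientError.
state, resp = state.Step(EtcdRequest{Type: Compact, Compact: &CompactRequest{Revision: 5}})
if resp.ClientError != mvcc.ErrCompacted.Error() {
	panic("expected ErrCompacted for a duplicate compaction")
}
```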
@@ -249,6 +268,7 @@ const (
LeaseGrant RequestType = "leaseGrant"
LeaseRevoke RequestType = "leaseRevoke"
Defragment RequestType = "defragment"
Compact RequestType = "compact"
)
type EtcdRequest struct {
@@ -258,6 +278,7 @@ type EtcdRequest struct {
Range *RangeRequest
Txn *TxnRequest
Defragment *DefragmentRequest
Compact *CompactRequest
}
func (r *EtcdRequest) IsRead() bool {
@@ -349,6 +370,8 @@ type EtcdResponse struct {
LeaseGrant *LeaseGrantReponse
LeaseRevoke *LeaseRevokeResponse
Defragment *DefragmentResponse
Compact *CompactResponse
ClientError string
Revision int64
}
@@ -417,3 +440,10 @@ func ToValueOrHash(value string) ValueOrHash {
}
return v
}
type CompactResponse struct {
}
type CompactRequest struct {
Revision int64
}

View File

@@ -16,6 +16,7 @@ package model
import (
"fmt"
"strings"
"time"
"github.com/anishathalye/porcupine"
@@ -23,6 +24,7 @@ import (
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/storage/mvcc"
"go.etcd.io/etcd/tests/v3/robustness/identity"
)
@@ -259,6 +261,23 @@ func (h *AppendableHistory) AppendDefragment(start, end time.Duration, resp *cli
h.appendSuccessful(request, start, end, defragmentResponse(revision))
}
func (h *AppendableHistory) AppendCompact(rev int64, start, end time.Duration, resp *clientv3.CompactResponse, err error) {
request := compactRequest(rev)
if err != nil {
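// ErrCompacted is deterministic: the model can predict it from its
// CompactRevision, so it is recorded as a successful observation with a
// ClientError response instead of a request that might not have happened.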
if strings.Contains(err.Error(), mvcc.ErrCompacted.Error()) {
h.appendSuccessful(request, start, end, MaybeEtcdResponse{
EtcdResponse: EtcdResponse{ClientError: mvcc.ErrCompacted.Error()},
})
return
}
h.appendFailed(request, start, end, err)
return
}
// Set a fake revision, as compaction returns a non-linearizable revision.
// TODO: Model non-linearizable response revision in model.
h.appendSuccessful(request, start, end, compactResponse(-1))
}
func (h *AppendableHistory) appendFailed(request EtcdRequest, start, end time.Duration, err error) {
op := porcupine.Operation{
ClientId: h.streamID,
@@ -444,6 +463,14 @@ func defragmentResponse(revision int64) MaybeEtcdResponse {
return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Defragment: &DefragmentResponse{}, Revision: revision}}
}
func compactRequest(rev int64) EtcdRequest {
return EtcdRequest{Type: Compact, Compact: &CompactRequest{Revision: rev}}
}
func compactResponse(revision int64) MaybeEtcdResponse {
return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Compact: &CompactResponse{}, Revision: revision}}
}
type History struct {
operations []porcupine.Operation
}

View File

@@ -26,6 +26,12 @@ func WithSnapshotCount(input ...uint64) e2e.EPClusterOption {
}
}
func WithCompactionBatchLimit(input ...int) e2e.EPClusterOption {
return func(c *e2e.EtcdProcessClusterConfig) {
c.ServerConfig.ExperimentalCompactionBatchLimit = input[internalRand.Intn(len(input))]
}
}
func WithSnapshotCatchUpEntries(input ...uint64) e2e.EPClusterOption {
return func(c *e2e.EtcdProcessClusterConfig) {
c.ServerConfig.SnapshotCatchUpEntries = input[internalRand.Intn(len(input))]
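
Since an `e2e.EPClusterOption` is just a function over the cluster config, the effect of the new `WithCompactionBatchLimit` option above can be shown directly. A sketch, written as if inside the options package:

```go
cfg := e2e.EtcdProcessClusterConfig{}
// Each application picks one of the candidates at random, so repeated
// scenario construction covers different compaction batch sizes.
WithCompactionBatchLimit(10, 100, 1000)(&cfg)
// cfg.ServerConfig.ExperimentalCompactionBatchLimit is now 10, 100, or 1000.
```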

View File

@@ -184,7 +184,11 @@ func parseEntryNormal(ent raftpb.Entry) (*model.EtcdRequest, error) {
case raftReq.ClusterVersionSet != nil:
return nil, nil
case raftReq.Compaction != nil:
return nil, nil
request := model.EtcdRequest{
Type: model.Compact,
Compact: &model.CompactRequest{Revision: raftReq.Compaction.Revision},
}
return &request, nil
case raftReq.Txn != nil:
txn := model.TxnRequest{
Conditions: []model.EtcdCondition{},

View File

@@ -91,7 +91,8 @@ func exploratoryScenarios(_ *testing.T) []testScenario {
options.WithSnapshotCount(50, 100, 1000),
options.WithSubsetOptions(randomizableOptions...),
e2e.WithGoFailEnabled(true),
e2e.WithCompactionBatchLimit(100),
// Set a low minimal compaction batch limit to allow triggering multi batch compaction failpoints.
options.WithCompactionBatchLimit(10, 100, 1000),
e2e.WithWatchProcessNotifyInterval(100 * time.Millisecond),
}

View File

@@ -38,13 +38,14 @@ var (
{choice: List, weight: 15},
{choice: StaleGet, weight: 10},
{choice: StaleList, weight: 10},
{choice: Put, weight: 23},
{choice: LargePut, weight: 2},
{choice: Delete, weight: 5},
{choice: MultiOpTxn, weight: 5},
{choice: PutWithLease, weight: 5},
{choice: LeaseRevoke, weight: 5},
{choice: CompareAndSet, weight: 5},
{choice: Put, weight: 15},
{choice: LargePut, weight: 5},
{choice: Compact, weight: 5},
},
}
EtcdPut = etcdTraffic{
@@ -56,9 +57,10 @@ var (
{choice: List, weight: 15},
{choice: StaleGet, weight: 10},
{choice: StaleList, weight: 10},
{choice: Put, weight: 40},
{choice: MultiOpTxn, weight: 5},
{choice: LargePut, weight: 5},
{choice: Put, weight: 35},
{choice: Compact, weight: 5},
},
}
)
@@ -89,6 +91,7 @@ const (
LeaseRevoke etcdRequestType = "leaseRevoke"
CompareAndSet etcdRequestType = "compareAndSet"
Defragment etcdRequestType = "defragment"
Compact etcdRequestType = "compact"
)
func (t etcdTraffic) Name() string {
@@ -266,6 +269,12 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType,
if resp != nil {
rev = resp.Header.Revision
}
case Compact:
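// Compact at the most recent revision this client has observed; an observed
// revision is never in the future, though a concurrent compaction may have
// already compacted it.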
var resp *clientv3.CompactResponse
resp, err = c.client.Compact(opCtx, lastRev)
if resp != nil {
rev = resp.Header.Revision
}
default:
panic("invalid choice")
}

View File

@@ -37,9 +37,10 @@ var (
resource: "pods",
namespace: "default",
writeChoices: []choiceWeight[KubernetesRequestType]{
{choice: KubernetesUpdate, weight: 90},
{choice: KubernetesUpdate, weight: 85},
{choice: KubernetesDelete, weight: 5},
{choice: KubernetesCreate, weight: 5},
{choice: KubernetesCompact, weight: 5},
},
}
)
@@ -168,6 +169,8 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids
_, err = kc.OptimisticUpdate(writeCtx, key, fmt.Sprintf("%d", ids.NewRequestID()), rev)
case KubernetesCreate:
err = kc.OptimisticCreate(writeCtx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestID()))
case KubernetesCompact:
err = kc.Compact(writeCtx, rev)
default:
panic(fmt.Sprintf("invalid choice: %q", op))
}
@@ -213,9 +216,10 @@ func (t kubernetesTraffic) generateKey() string {
type KubernetesRequestType string
const (
KubernetesDelete KubernetesRequestType = "delete"
KubernetesUpdate KubernetesRequestType = "update"
KubernetesCreate KubernetesRequestType = "create"
KubernetesDelete KubernetesRequestType = "delete"
KubernetesUpdate KubernetesRequestType = "update"
KubernetesCreate KubernetesRequestType = "create"
KubernetesCompact KubernetesRequestType = "compact"
)
type kubernetesClient struct {
@@ -254,6 +258,11 @@ func (k kubernetesClient) RequestProgress(ctx context.Context) error {
return k.client.RequestProgress(clientv3.WithRequireLeader(ctx))
}
func (k kubernetesClient) Compact(ctx context.Context, rev int64) error {
_, err := k.client.Compact(ctx, rev)
return err
}
// Kubernetes optimistically assumes that the key didn't change since it was last observed, so it executes operations within a transaction conditioned on the key not changing.
// However, if the key's value changed, it wants to read it immediately, thus the Get operation on failure.
func (k kubernetesClient) optimisticOperationOrGet(ctx context.Context, key string, operation clientv3.Op, expectedRevision int64) (*mvccpb.KeyValue, error) {

View File

@@ -193,6 +193,7 @@ func persistedOperationsReturnTime(allOperations []porcupine.Operation, persiste
}
case model.LeaseGrant:
case model.LeaseRevoke:
case model.Compact:
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}
@@ -218,6 +219,7 @@ func operationReturnTime(operations []porcupine.Operation) map[model.EtcdOperati
case model.Range:
case model.LeaseGrant:
case model.LeaseRevoke:
case model.Compact:
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}

View File

@@ -49,11 +49,9 @@ func TestDataReports(t *testing.T) {
}
visualize := ValidateAndReturnVisualize(t, zaptest.NewLogger(t), Config{}, reports, persistedRequests, 5*time.Minute)
if t.Failed() {
err := visualize(filepath.Join(path, "history.html"))
if err != nil {
t.Fatal(err)
}
err = visualize(filepath.Join(path, "history.html"))
if err != nil {
t.Fatal(err)
}
})
}

View File

@@ -67,45 +67,59 @@ type watchConfig struct {
// watchUntilRevision watches all changes until the context is cancelled, it has observed the revision provided via maxRevisionChan, or maxRevisionChan was closed.
func watchUntilRevision(ctx context.Context, t *testing.T, c *client.RecordingClient, maxRevisionChan <-chan int64, cfg watchConfig) {
var maxRevision int64
var lastRevision int64
var lastRevision int64 = 1
ctx, cancel := context.WithCancel(ctx)
defer cancel()
watch := c.Watch(ctx, "", 1, true, true, false)
resetWatch:
for {
select {
case <-ctx.Done():
if maxRevision == 0 {
t.Errorf("Client didn't collect all events, max revision not set")
}
if lastRevision < maxRevision {
t.Errorf("Client didn't collect all events, revision got %d, expected: %d", lastRevision, maxRevision)
}
return
case revision, ok := <-maxRevisionChan:
if ok {
maxRevision = revision
if lastRevision >= maxRevision {
cancel()
}
} else {
// Only cancel if maxRevision was never set.
watch := c.Watch(ctx, "", lastRevision+1, true, true, false)
for {
select {
case <-ctx.Done():
if maxRevision == 0 {
t.Errorf("Client didn't collect all events, max revision not set")
}
if lastRevision < maxRevision {
t.Errorf("Client didn't collect all events, revision got %d, expected: %d", lastRevision, maxRevision)
}
return
case revision, ok := <-maxRevisionChan:
if ok {
maxRevision = revision
if lastRevision >= maxRevision {
cancel()
}
} else {
// Only cancel if maxRevision was never set.
if maxRevision == 0 {
cancel()
}
}
case resp, ok := <-watch:
if !ok {
t.Logf("Watch channel closed")
continue resetWatch
}
if cfg.requestProgress {
c.RequestProgress(ctx)
}
if resp.Err() != nil {
if resp.Canceled {
if resp.CompactRevision > lastRevision {
lastRevision = resp.CompactRevision
}
continue resetWatch
}
t.Errorf("Watch stream received error, err %v", resp.Err())
}
if len(resp.Events) > 0 {
lastRevision = resp.Events[len(resp.Events)-1].Kv.ModRevision
}
if maxRevision != 0 && lastRevision >= maxRevision {
cancel()
}
}
case resp := <-watch:
if cfg.requestProgress {
c.RequestProgress(ctx)
}
if resp.Err() != nil && !resp.Canceled {
t.Errorf("Watch stream received error, err %v", resp.Err())
}
if len(resp.Events) > 0 {
lastRevision = resp.Events[len(resp.Events)-1].Kv.ModRevision
}
if maxRevision != 0 && lastRevision >= maxRevision {
cancel()
}
}
}
}
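
The rewrite above implements a resume-after-compaction pattern: when the server cancels the watch because the requested revision was already compacted, the watch is reopened just past the compact revision instead of failing the test. A stripped-down sketch of the same pattern using plain clientv3 (the robustness client adds recording and max-revision tracking on top of this):

```go
import (
	"context"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func watchAll(ctx context.Context, c *clientv3.Client) error {
	lastRevision := int64(0)
	for ctx.Err() == nil {
		// Resume from the first revision that has not been observed yet.
		watch := c.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1))
		for resp := range watch {
			if resp.Canceled && resp.CompactRevision > lastRevision {
				// The requested revision was compacted away; skip past it and reopen.
				lastRevision = resp.CompactRevision
				break
			}
			if err := resp.Err(); err != nil {
				return err
			}
			if len(resp.Events) > 0 {
				lastRevision = resp.Events[len(resp.Events)-1].Kv.ModRevision
			}
		}
	}
	return ctx.Err()
}
```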