diff --git a/tests/linearizability/client.go b/tests/linearizability/client.go index f4d45f045..d21ff8828 100644 --- a/tests/linearizability/client.go +++ b/tests/linearizability/client.go @@ -22,14 +22,16 @@ import ( "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/tests/v3/linearizability/identity" + "go.etcd.io/etcd/tests/v3/linearizability/model" ) type recordingClient struct { client clientv3.Client - history *appendableHistory + history *model.AppendableHistory } -func NewClient(endpoints []string, ids idProvider) (*recordingClient, error) { +func NewClient(endpoints []string, ids identity.Provider) (*recordingClient, error) { cc, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, Logger: zap.NewNop(), @@ -41,7 +43,7 @@ func NewClient(endpoints []string, ids idProvider) (*recordingClient, error) { } return &recordingClient{ client: *cc, - history: newAppendableHistory(ids), + history: model.NewAppendableHistory(ids), }, nil } diff --git a/tests/linearizability/id.go b/tests/linearizability/identity/id.go similarity index 91% rename from tests/linearizability/id.go rename to tests/linearizability/identity/id.go index 4e8fa3817..31f57ccc1 100644 --- a/tests/linearizability/id.go +++ b/tests/linearizability/identity/id.go @@ -12,16 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -package linearizability +package identity import "sync/atomic" -type idProvider interface { +type Provider interface { ClientId() int RequestId() int } -func newIdProvider() idProvider { +func NewIdProvider() Provider { return &atomicProvider{} } diff --git a/tests/linearizability/lease_ids.go b/tests/linearizability/identity/lease_ids.go similarity index 91% rename from tests/linearizability/lease_ids.go rename to tests/linearizability/identity/lease_ids.go index 0a15da793..23eeb5d90 100644 --- a/tests/linearizability/lease_ids.go +++ b/tests/linearizability/identity/lease_ids.go @@ -12,19 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -package linearizability +package identity import ( "sync" ) -type clientId2LeaseIdMapper interface { +type LeaseIdStorage interface { LeaseId(int) int64 AddLeaseId(int, int64) RemoveLeaseId(int) } -func newClientId2LeaseIdMapper() clientId2LeaseIdMapper { +func NewLeaseIdStorage() LeaseIdStorage { return &atomicClientId2LeaseIdMapper{m: map[int]int64{}} } diff --git a/tests/linearizability/linearizability_test.go b/tests/linearizability/linearizability_test.go index bf1e1fef5..20a6a66ac 100644 --- a/tests/linearizability/linearizability_test.go +++ b/tests/linearizability/linearizability_test.go @@ -31,6 +31,8 @@ import ( "golang.org/x/time/rate" "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/linearizability/identity" + "go.etcd.io/etcd/tests/v3/linearizability/model" ) const ( @@ -139,14 +141,14 @@ func testLinearizability(ctx context.Context, t *testing.T, clus *e2e.EtcdProces func patchOperationBasedOnWatchEvents(operations []porcupine.Operation, watchEvents []watchEvent) []porcupine.Operation { newOperations := make([]porcupine.Operation, 0, len(operations)) - persisted := map[EtcdOperation]watchEvent{} + persisted := map[model.EtcdOperation]watchEvent{} for _, op := range watchEvents { persisted[op.Op] = op } lastObservedOperation := lastOperationObservedInWatch(operations, persisted) for _, op := range operations { - resp := op.Output.(EtcdResponse) + resp := op.Output.(model.EtcdResponse) if resp.Err == nil || op.Call > lastObservedOperation.Call { // No need to patch successfully requests and cannot patch requests outside observed window. newOperations = append(newOperations, op) @@ -156,7 +158,7 @@ func patchOperationBasedOnWatchEvents(operations []porcupine.Operation, watchEve if event != nil { // Set revision and time based on watchEvent. op.Return = event.Time.UnixNano() - op.Output = EtcdResponse{ + op.Output = model.EtcdResponse{ Revision: event.Revision, ResultUnknown: true, } @@ -173,7 +175,7 @@ func patchOperationBasedOnWatchEvents(operations []porcupine.Operation, watchEve return newOperations } -func lastOperationObservedInWatch(operations []porcupine.Operation, watchEvents map[EtcdOperation]watchEvent) porcupine.Operation { +func lastOperationObservedInWatch(operations []porcupine.Operation, watchEvents map[model.EtcdOperation]watchEvent) porcupine.Operation { var maxCallTime int64 var lastOperation porcupine.Operation for _, op := range operations { @@ -186,17 +188,17 @@ func lastOperationObservedInWatch(operations []porcupine.Operation, watchEvents return lastOperation } -func matchWatchEvent(op porcupine.Operation, watchEvents map[EtcdOperation]watchEvent) (event *watchEvent, hasUniqueWriteOperation bool) { - request := op.Input.(EtcdRequest) +func matchWatchEvent(op porcupine.Operation, watchEvents map[model.EtcdOperation]watchEvent) (event *watchEvent, hasUniqueWriteOperation bool) { + request := op.Input.(model.EtcdRequest) for _, etcdOp := range request.Ops { - if isWrite(etcdOp.Type) && inUnique(etcdOp.Type) { + if model.IsWrite(etcdOp.Type) && model.IsUnique(etcdOp.Type) { // We expect all put to be unique as they write unique value. hasUniqueWriteOperation = true opType := etcdOp.Type - if opType == PutWithLease { - opType = Put + if opType == model.PutWithLease { + opType = model.Put } - event, ok := watchEvents[EtcdOperation{ + event, ok := watchEvents[model.EtcdOperation{ Type: opType, Key: etcdOp.Key, Value: etcdOp.Value, @@ -210,9 +212,9 @@ func matchWatchEvent(op porcupine.Operation, watchEvents map[EtcdOperation]watch } func hasWriteOperation(op porcupine.Operation) bool { - request := op.Input.(EtcdRequest) + request := op.Input.(model.EtcdRequest) for _, etcdOp := range request.Ops { - if isWrite(etcdOp.Type) { + if model.IsWrite(etcdOp.Type) { return true } } @@ -255,9 +257,9 @@ func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessClu mux := sync.Mutex{} endpoints := clus.EndpointsV3() - ids := newIdProvider() - lm := newClientId2LeaseIdMapper() - h := history{} + ids := identity.NewIdProvider() + lm := identity.NewLeaseIdStorage() + h := model.History{} limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200) startTime := time.Now() @@ -269,15 +271,15 @@ func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessClu if err != nil { t.Fatal(err) } - go func(c *recordingClient) { + go func(c *recordingClient, clientId int) { defer wg.Done() defer c.Close() - config.traffic.Run(ctx, c, limiter, ids, lm) + config.traffic.Run(ctx, clientId, c, limiter, ids, lm) mux.Lock() - h = h.Merge(c.history.history) + h = h.Merge(c.history.History) mux.Unlock() - }(c) + }(c, i) } wg.Wait() endTime := time.Now() @@ -322,7 +324,7 @@ func checkOperationsAndPersistResults(t *testing.T, operations []porcupine.Opera t.Error(err) } - linearizable, info := porcupine.CheckOperationsVerbose(etcdModel, operations, 0) + linearizable, info := porcupine.CheckOperationsVerbose(model.Etcd, operations, 0) if linearizable != porcupine.Ok { t.Error("Model is not linearizable") persistMemberDataDir(t, clus, path) @@ -330,7 +332,7 @@ func checkOperationsAndPersistResults(t *testing.T, operations []porcupine.Opera visualizationPath := filepath.Join(path, "history.html") t.Logf("saving visualization to %q", visualizationPath) - err = porcupine.VisualizePath(etcdModel, info, visualizationPath) + err = porcupine.VisualizePath(model.Etcd, info, visualizationPath) if err != nil { t.Errorf("Failed to visualize, err: %v", err) } diff --git a/tests/linearizability/history.go b/tests/linearizability/model/history.go similarity index 89% rename from tests/linearizability/history.go rename to tests/linearizability/model/history.go index 6a3fde13b..688bbc762 100644 --- a/tests/linearizability/history.go +++ b/tests/linearizability/model/history.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package linearizability +package model import ( "time" @@ -20,28 +20,29 @@ import ( "github.com/anishathalye/porcupine" clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/tests/v3/linearizability/identity" ) -type appendableHistory struct { +type AppendableHistory struct { // id of the next write operation. If needed a new id might be requested from idProvider. id int - idProvider idProvider + idProvider identity.Provider - history + History } -func newAppendableHistory(ids idProvider) *appendableHistory { - return &appendableHistory{ +func NewAppendableHistory(ids identity.Provider) *AppendableHistory { + return &AppendableHistory{ id: ids.ClientId(), idProvider: ids, - history: history{ + History: History{ successful: []porcupine.Operation{}, failed: []porcupine.Operation{}, }, } } -func (h *appendableHistory) AppendGet(key string, start, end time.Time, resp *clientv3.GetResponse) { +func (h *AppendableHistory) AppendGet(key string, start, end time.Time, resp *clientv3.GetResponse) { var readData string if len(resp.Kvs) == 1 { readData = string(resp.Kvs[0].Value) @@ -59,7 +60,7 @@ func (h *appendableHistory) AppendGet(key string, start, end time.Time, resp *cl }) } -func (h *appendableHistory) AppendPut(key, value string, start, end time.Time, resp *clientv3.PutResponse, err error) { +func (h *AppendableHistory) AppendPut(key, value string, start, end time.Time, resp *clientv3.PutResponse, err error) { request := putRequest(key, value) if err != nil { h.appendFailed(request, start, err) @@ -78,7 +79,7 @@ func (h *appendableHistory) AppendPut(key, value string, start, end time.Time, r }) } -func (h *appendableHistory) AppendPutWithLease(key, value string, leaseID int64, start, end time.Time, resp *clientv3.PutResponse, err error) { +func (h *AppendableHistory) AppendPutWithLease(key, value string, leaseID int64, start, end time.Time, resp *clientv3.PutResponse, err error) { request := putWithLeaseRequest(key, value, leaseID) if err != nil { h.appendFailed(request, start, err) @@ -97,7 +98,7 @@ func (h *appendableHistory) AppendPutWithLease(key, value string, leaseID int64, }) } -func (h *appendableHistory) AppendLeaseGrant(start, end time.Time, resp *clientv3.LeaseGrantResponse, err error) { +func (h *AppendableHistory) AppendLeaseGrant(start, end time.Time, resp *clientv3.LeaseGrantResponse, err error) { var leaseID int64 if resp != nil { leaseID = int64(resp.ID) @@ -120,7 +121,7 @@ func (h *appendableHistory) AppendLeaseGrant(start, end time.Time, resp *clientv }) } -func (h *appendableHistory) AppendLeaseRevoke(id int64, start time.Time, end time.Time, resp *clientv3.LeaseRevokeResponse, err error) { +func (h *AppendableHistory) AppendLeaseRevoke(id int64, start time.Time, end time.Time, resp *clientv3.LeaseRevokeResponse, err error) { request := leaseRevokeRequest(id) if err != nil { h.appendFailed(request, start, err) @@ -139,7 +140,7 @@ func (h *appendableHistory) AppendLeaseRevoke(id int64, start time.Time, end tim }) } -func (h *appendableHistory) AppendDelete(key string, start, end time.Time, resp *clientv3.DeleteResponse, err error) { +func (h *AppendableHistory) AppendDelete(key string, start, end time.Time, resp *clientv3.DeleteResponse, err error) { request := deleteRequest(key) if err != nil { h.appendFailed(request, start, err) @@ -160,7 +161,7 @@ func (h *appendableHistory) AppendDelete(key string, start, end time.Time, resp }) } -func (h *appendableHistory) AppendTxn(key, expectValue, newValue string, start, end time.Time, resp *clientv3.TxnResponse, err error) { +func (h *AppendableHistory) AppendTxn(key, expectValue, newValue string, start, end time.Time, resp *clientv3.TxnResponse, err error) { request := txnRequest(key, expectValue, newValue) if err != nil { h.appendFailed(request, start, err) @@ -179,7 +180,7 @@ func (h *appendableHistory) AppendTxn(key, expectValue, newValue string, start, }) } -func (h *appendableHistory) appendFailed(request EtcdRequest, start time.Time, err error) { +func (h *AppendableHistory) appendFailed(request EtcdRequest, start time.Time, err error) { h.failed = append(h.failed, porcupine.Operation{ ClientId: h.id, Input: request, @@ -256,15 +257,15 @@ func leaseRevokeResponse(revision int64) EtcdResponse { return EtcdResponse{OpsResult: []EtcdOperationResult{{}}, Revision: revision} } -type history struct { +type History struct { successful []porcupine.Operation // failed requests are kept separate as we don't know return time of failed operations. // Based on https://github.com/anishathalye/porcupine/issues/10 failed []porcupine.Operation } -func (h history) Merge(h2 history) history { - result := history{ +func (h History) Merge(h2 History) History { + result := History{ successful: make([]porcupine.Operation, 0, len(h.successful)+len(h2.successful)), failed: make([]porcupine.Operation, 0, len(h.failed)+len(h2.failed)), } @@ -275,7 +276,7 @@ func (h history) Merge(h2 history) history { return result } -func (h history) Operations() []porcupine.Operation { +func (h History) Operations() []porcupine.Operation { operations := make([]porcupine.Operation, 0, len(h.successful)+len(h.failed)) var maxTime int64 for _, op := range h.successful { diff --git a/tests/linearizability/model.go b/tests/linearizability/model/model.go similarity index 98% rename from tests/linearizability/model.go rename to tests/linearizability/model/model.go index 87cb039a5..adc0aea23 100644 --- a/tests/linearizability/model.go +++ b/tests/linearizability/model/model.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package linearizability +package model import ( "encoding/json" @@ -35,11 +35,33 @@ const ( LeaseRevoke OperationType = "leaseRevoke" ) -func isWrite(t OperationType) bool { +var Etcd = porcupine.Model{ + Init: func() interface{} { + return "[]" // empty PossibleStates + }, + Step: func(st interface{}, in interface{}, out interface{}) (bool, interface{}) { + var states PossibleStates + err := json.Unmarshal([]byte(st.(string)), &states) + if err != nil { + panic(err) + } + ok, states := step(states, in.(EtcdRequest), out.(EtcdResponse)) + data, err := json.Marshal(states) + if err != nil { + panic(err) + } + return ok, string(data) + }, + DescribeOperation: func(in, out interface{}) string { + return describeEtcdRequestResponse(in.(EtcdRequest), out.(EtcdResponse)) + }, +} + +func IsWrite(t OperationType) bool { return t == Put || t == Delete || t == PutWithLease || t == LeaseRevoke || t == LeaseGrant } -func inUnique(t OperationType) bool { +func IsUnique(t OperationType) bool { return t == Put || t == PutWithLease } @@ -92,28 +114,6 @@ type EtcdState struct { Leases map[int64]EtcdLease } -var etcdModel = porcupine.Model{ - Init: func() interface{} { - return "[]" // empty PossibleStates - }, - Step: func(st interface{}, in interface{}, out interface{}) (bool, interface{}) { - var states PossibleStates - err := json.Unmarshal([]byte(st.(string)), &states) - if err != nil { - panic(err) - } - ok, states := step(states, in.(EtcdRequest), out.(EtcdResponse)) - data, err := json.Marshal(states) - if err != nil { - panic(err) - } - return ok, string(data) - }, - DescribeOperation: func(in, out interface{}) string { - return describeEtcdRequestResponse(in.(EtcdRequest), out.(EtcdResponse)) - }, -} - func describeEtcdRequestResponse(request EtcdRequest, response EtcdResponse) string { prefix := describeEtcdOperations(request.Ops) if len(request.Conds) != 0 { diff --git a/tests/linearizability/model_test.go b/tests/linearizability/model/model_test.go similarity index 99% rename from tests/linearizability/model_test.go rename to tests/linearizability/model/model_test.go index 6ca882ce1..175e3998b 100644 --- a/tests/linearizability/model_test.go +++ b/tests/linearizability/model/model_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package linearizability +package model import ( "errors" @@ -521,12 +521,12 @@ func TestModelStep(t *testing.T) { } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { - state := etcdModel.Init() + state := Etcd.Init() for _, op := range tc.operations { - ok, newState := etcdModel.Step(state, op.req, op.resp) + ok, newState := Etcd.Step(state, op.req, op.resp) if ok != !op.failure { t.Logf("state: %v", state) - t.Errorf("Unexpected operation result, expect: %v, got: %v, operation: %s", !op.failure, ok, etcdModel.DescribeOperation(op.req, op.resp)) + t.Errorf("Unexpected operation result, expect: %v, got: %v, operation: %s", !op.failure, ok, Etcd.DescribeOperation(op.req, op.resp)) } if ok { state = newState @@ -601,7 +601,7 @@ func TestModelDescribe(t *testing.T) { }, } for _, tc := range tcs { - assert.Equal(t, tc.expectDescribe, etcdModel.DescribeOperation(tc.req, tc.resp)) + assert.Equal(t, tc.expectDescribe, Etcd.DescribeOperation(tc.req, tc.resp)) } } diff --git a/tests/linearizability/traffic.go b/tests/linearizability/traffic.go index aa1386ed0..761480251 100644 --- a/tests/linearizability/traffic.go +++ b/tests/linearizability/traffic.go @@ -23,16 +23,18 @@ import ( "golang.org/x/time/rate" "go.etcd.io/etcd/api/v3/mvccpb" + "go.etcd.io/etcd/tests/v3/linearizability/identity" + "go.etcd.io/etcd/tests/v3/linearizability/model" ) var ( DefaultLeaseTTL int64 = 7200 RequestTimeout = 40 * time.Millisecond - DefaultTraffic Traffic = readWriteSingleKey{keyCount: 4, leaseTTL: DefaultLeaseTTL, writes: []opChance{{operation: Put, chance: 50}, {operation: Delete, chance: 10}, {operation: PutWithLease, chance: 10}, {operation: LeaseRevoke, chance: 10}, {operation: Txn, chance: 20}}} + DefaultTraffic Traffic = readWriteSingleKey{keyCount: 4, leaseTTL: DefaultLeaseTTL, writes: []opChance{{operation: model.Put, chance: 50}, {operation: model.Delete, chance: 10}, {operation: model.PutWithLease, chance: 10}, {operation: model.LeaseRevoke, chance: 10}, {operation: model.Txn, chance: 20}}} ) type Traffic interface { - Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter, ids idProvider, lm clientId2LeaseIdMapper) + Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage) } type readWriteSingleKey struct { @@ -42,11 +44,11 @@ type readWriteSingleKey struct { } type opChance struct { - operation OperationType + operation model.OperationType chance int } -func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter, ids idProvider, lm clientId2LeaseIdMapper) { +func (t readWriteSingleKey) Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage) { for { select { @@ -61,7 +63,7 @@ func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter continue } // Provide each write with unique id to make it easier to validate operation history. - t.Write(ctx, c, limiter, key, fmt.Sprintf("%d", ids.RequestId()), lm, c.history.id, resp) + t.Write(ctx, c, limiter, key, fmt.Sprintf("%d", ids.RequestId()), lm, clientId, resp) } } @@ -75,22 +77,22 @@ func (t readWriteSingleKey) Read(ctx context.Context, c *recordingClient, limite return resp, err } -func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string, newValue string, lm clientId2LeaseIdMapper, cid int, lastValues []*mvccpb.KeyValue) error { +func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string, newValue string, lm identity.LeaseIdStorage, cid int, lastValues []*mvccpb.KeyValue) error { writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout) var err error switch t.pickWriteOperation() { - case Put: + case model.Put: err = c.Put(writeCtx, key, newValue) - case Delete: + case model.Delete: err = c.Delete(writeCtx, key) - case Txn: + case model.Txn: var expectValue string if len(lastValues) != 0 { expectValue = string(lastValues[0].Value) } err = c.Txn(writeCtx, key, expectValue, newValue) - case PutWithLease: + case model.PutWithLease: leaseId := lm.LeaseId(cid) if leaseId == 0 { leaseId, err = c.LeaseGrant(writeCtx, t.leaseTTL) @@ -104,7 +106,7 @@ func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limit err = c.PutWithLease(putCtx, key, newValue, leaseId) putCancel() } - case LeaseRevoke: + case model.LeaseRevoke: leaseId := lm.LeaseId(cid) if leaseId != 0 { err = c.LeaseRevoke(writeCtx, leaseId) @@ -123,7 +125,7 @@ func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limit return err } -func (t readWriteSingleKey) pickWriteOperation() OperationType { +func (t readWriteSingleKey) pickWriteOperation() model.OperationType { sum := 0 for _, op := range t.writes { sum += op.chance diff --git a/tests/linearizability/watch.go b/tests/linearizability/watch.go index 28fa017bb..3a7f30d5c 100644 --- a/tests/linearizability/watch.go +++ b/tests/linearizability/watch.go @@ -25,6 +25,7 @@ import ( "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/linearizability/model" ) func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster) [][]watchEvent { @@ -69,17 +70,17 @@ func collectMemberWatchEvents(ctx context.Context, t *testing.T, c *clientv3.Cli lastRevision = resp.Header.Revision time := time.Now() for _, event := range resp.Events { - var op OperationType + var op model.OperationType switch event.Type { case mvccpb.PUT: - op = Put + op = model.Put case mvccpb.DELETE: - op = Delete + op = model.Delete } events = append(events, watchEvent{ Time: time, Revision: event.Kv.ModRevision, - Op: EtcdOperation{ + Op: model.EtcdOperation{ Type: op, Key: string(event.Kv.Key), Value: string(event.Kv.Value), @@ -94,7 +95,7 @@ func collectMemberWatchEvents(ctx context.Context, t *testing.T, c *clientv3.Cli } type watchEvent struct { - Op EtcdOperation + Op model.EtcdOperation Revision int64 Time time.Time }