tests/robustness: address golangci var-naming issues

Signed-off-by: Ivan Valdes <ivan@vald.es>
This commit is contained in:
Ivan Valdes 2024-03-21 22:30:09 -07:00
parent e4448c4744
commit 0976398964
No known key found for this signature in database
GPG Key ID: 4037D37741ED0CC5
15 changed files with 102 additions and 102 deletions

View File

@ -37,11 +37,11 @@ var (
type memberReplace struct{}
func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
memberId := rand.Int() % len(clus.Procs)
member := clus.Procs[memberId]
memberID := uint64(rand.Int() % len(clus.Procs))
member := clus.Procs[memberID]
var endpoints []string
for i := 1; i < len(clus.Procs); i++ {
endpoints = append(endpoints, clus.Procs[(memberId+i)%len(clus.Procs)].EndpointsGRPC()...)
endpoints = append(endpoints, clus.Procs[(int(memberID)+i)%len(clus.Procs)].EndpointsGRPC()...)
}
cc, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
@ -93,7 +93,7 @@ func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger,
}
lg.Info("Adding member back", zap.String("member", member.Config().Name))
removedMemberPeerUrl := member.Config().PeerURL.String()
removedMemberPeerURL := member.Config().PeerURL.String()
for {
select {
case <-ctx.Done():
@ -101,7 +101,7 @@ func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger,
default:
}
reqCtx, cancel := context.WithTimeout(ctx, time.Second)
_, err = cc.MemberAdd(reqCtx, []string{removedMemberPeerUrl})
_, err = cc.MemberAdd(reqCtx, []string{removedMemberPeerURL})
cancel()
if err == nil {
break

View File

@ -17,32 +17,32 @@ package identity
import "sync/atomic"
type Provider interface {
// NewStreamId returns an integer starting from zero to make it render nicely by porcupine visualization.
NewStreamId() int
// NewRequestId returns unique identification used to make write requests unique.
NewRequestId() int
// NewClientId returns unique identification for client and their reports.
NewClientId() int
// NewStreamID returns an integer starting from zero to make it render nicely by porcupine visualization.
NewStreamID() int
// NewRequestID returns unique identification used to make write requests unique.
NewRequestID() int
// NewClientID returns unique identification for client and their reports.
NewClientID() int
}
func NewIdProvider() Provider {
func NewIDProvider() Provider {
return &atomicProvider{}
}
type atomicProvider struct {
streamId atomic.Int64
requestId atomic.Int64
clientId atomic.Int64
streamID atomic.Int64
requestID atomic.Int64
clientID atomic.Int64
}
func (id *atomicProvider) NewStreamId() int {
return int(id.streamId.Add(1) - 1)
func (id *atomicProvider) NewStreamID() int {
return int(id.streamID.Add(1) - 1)
}
func (id *atomicProvider) NewRequestId() int {
return int(id.requestId.Add(1))
func (id *atomicProvider) NewRequestID() int {
return int(id.requestID.Add(1))
}
func (id *atomicProvider) NewClientId() int {
return int(id.clientId.Add(1))
func (id *atomicProvider) NewClientID() int {
return int(id.clientID.Add(1))
}

View File

@ -18,36 +18,36 @@ import (
"sync"
)
type LeaseIdStorage interface {
LeaseId(int) int64
AddLeaseId(int, int64)
RemoveLeaseId(int)
type LeaseIDStorage interface {
LeaseID(int) int64
AddLeaseID(int, int64)
RemoveLeaseID(int)
}
func NewLeaseIdStorage() LeaseIdStorage {
return &atomicClientId2LeaseIdMapper{m: map[int]int64{}}
func NewLeaseIDStorage() LeaseIDStorage {
return &atomicClientID2LeaseIDMapper{m: map[int]int64{}}
}
type atomicClientId2LeaseIdMapper struct {
type atomicClientID2LeaseIDMapper struct {
sync.RWMutex
// m is used to store clientId to leaseId mapping.
m map[int]int64
}
func (lm *atomicClientId2LeaseIdMapper) LeaseId(clientId int) int64 {
func (lm *atomicClientID2LeaseIDMapper) LeaseID(clientID int) int64 {
lm.RLock()
defer lm.RUnlock()
return lm.m[clientId]
return lm.m[clientID]
}
func (lm *atomicClientId2LeaseIdMapper) AddLeaseId(clientId int, leaseId int64) {
func (lm *atomicClientID2LeaseIDMapper) AddLeaseID(clientID int, leaseID int64) {
lm.Lock()
defer lm.Unlock()
lm.m[clientId] = leaseId
lm.m[clientID] = leaseID
}
func (lm *atomicClientId2LeaseIdMapper) RemoveLeaseId(clientId int) {
func (lm *atomicClientID2LeaseIDMapper) RemoveLeaseID(clientID int) {
lm.Lock()
defer lm.Unlock()
delete(lm.m, clientId)
delete(lm.m, clientID)
}

View File

@ -109,7 +109,7 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu
// using baseTime time-measuring operation to get monotonic clock reading
// see https://github.com/golang/go/blob/master/src/time/time.go#L17
baseTime := time.Now()
ids := identity.NewIdProvider()
ids := identity.NewIDProvider()
g.Go(func() error {
defer close(finishTraffic)
err := failpoint.Inject(ctx, t, lg, clus, s.failpoint)

View File

@ -216,8 +216,8 @@ func (s EtcdState) getRange(options RangeOptions) RangeResponse {
}
func detachFromOldLease(s EtcdState, key string) EtcdState {
if oldLeaseId, ok := s.KeyLeases[key]; ok {
delete(s.Leases[oldLeaseId].Keys, key)
if oldLeaseID, ok := s.KeyLeases[key]; ok {
delete(s.Leases[oldLeaseID].Keys, key)
delete(s.KeyLeases, key)
}
return s

View File

@ -35,8 +35,8 @@ import (
// Operations time should be calculated as time.Since common base time to ensure that Go monotonic time is used.
// More in https://github.com/golang/go/blob/96add980ad27faed627f26ef1ab09e8fe45d6bd1/src/time/time.go#L10.
type AppendableHistory struct {
// streamId for the next operation. Used for porcupine.Operation.ClientId as porcupine assumes no concurrent requests.
streamId int
// streamID for the next operation. Used for porcupine.Operation.ClientId as porcupine assumes no concurrent requests.
streamID int
// If needed a new streamId is requested from idProvider.
idProvider identity.Provider
@ -45,7 +45,7 @@ type AppendableHistory struct {
func NewAppendableHistory(ids identity.Provider) *AppendableHistory {
return &AppendableHistory{
streamId: ids.NewStreamId(),
streamID: ids.NewStreamID(),
idProvider: ids,
History: History{
successful: []porcupine.Operation{},
@ -60,7 +60,7 @@ func (h *AppendableHistory) AppendRange(startKey, endKey string, revision, limit
respRevision = resp.Header.Revision
}
h.appendSuccessful(porcupine.Operation{
ClientId: h.streamId,
ClientId: h.streamID,
Input: staleRangeRequest(startKey, endKey, limit, revision),
Call: start.Nanoseconds(),
Output: rangeResponse(resp.Kvs, resp.Count, respRevision),
@ -79,7 +79,7 @@ func (h *AppendableHistory) AppendPut(key, value string, start, end time.Duratio
revision = resp.Header.Revision
}
h.appendSuccessful(porcupine.Operation{
ClientId: h.streamId,
ClientId: h.streamID,
Input: request,
Call: start.Nanoseconds(),
Output: putResponse(revision),
@ -98,7 +98,7 @@ func (h *AppendableHistory) AppendPutWithLease(key, value string, leaseID int64,
revision = resp.Header.Revision
}
h.appendSuccessful(porcupine.Operation{
ClientId: h.streamId,
ClientId: h.streamID,
Input: request,
Call: start.Nanoseconds(),
Output: putResponse(revision),
@ -121,7 +121,7 @@ func (h *AppendableHistory) AppendLeaseGrant(start, end time.Duration, resp *cli
revision = resp.ResponseHeader.Revision
}
h.appendSuccessful(porcupine.Operation{
ClientId: h.streamId,
ClientId: h.streamID,
Input: request,
Call: start.Nanoseconds(),
Output: leaseGrantResponse(revision),
@ -140,7 +140,7 @@ func (h *AppendableHistory) AppendLeaseRevoke(id int64, start, end time.Duration
revision = resp.Header.Revision
}
h.appendSuccessful(porcupine.Operation{
ClientId: h.streamId,
ClientId: h.streamID,
Input: request,
Call: start.Nanoseconds(),
Output: leaseRevokeResponse(revision),
@ -161,7 +161,7 @@ func (h *AppendableHistory) AppendDelete(key string, start, end time.Duration, r
deleted = resp.Deleted
}
h.appendSuccessful(porcupine.Operation{
ClientId: h.streamId,
ClientId: h.streamID,
Input: request,
Call: start.Nanoseconds(),
Output: deleteResponse(deleted, revision),
@ -196,7 +196,7 @@ func (h *AppendableHistory) AppendTxn(cmp []clientv3.Cmp, clientOnSuccessOps, cl
results = append(results, toEtcdOperationResult(resp))
}
h.appendSuccessful(porcupine.Operation{
ClientId: h.streamId,
ClientId: h.streamID,
Input: request,
Call: start.Nanoseconds(),
Output: txnResponse(results, resp.Succeeded, revision),
@ -306,7 +306,7 @@ func (h *AppendableHistory) AppendDefragment(start, end time.Duration, resp *cli
revision = resp.Header.Revision
}
h.appendSuccessful(porcupine.Operation{
ClientId: h.streamId,
ClientId: h.streamID,
Input: request,
Call: start.Nanoseconds(),
Output: defragmentResponse(revision),
@ -331,7 +331,7 @@ func (h *AppendableHistory) appendFailed(request EtcdRequest, call int64, err er
}
}
h.failed = append(h.failed, porcupine.Operation{
ClientId: h.streamId,
ClientId: h.streamID,
Input: request,
Call: call,
Output: failedResponse(err),
@ -339,7 +339,7 @@ func (h *AppendableHistory) appendFailed(request EtcdRequest, call int64, err er
})
// Operations of single client needs to be sequential.
// As we don't know return time of failed operations, all new writes need to be done with new stream id.
h.streamId = h.idProvider.NewStreamId()
h.streamID = h.idProvider.NewStreamID()
}
func getRequest(key string) EtcdRequest {

View File

@ -31,7 +31,7 @@ import (
)
type ClientReport struct {
ClientId int
ClientID int
KeyValue []porcupine.Operation
Watch []model.WatchOperation
}
@ -48,10 +48,10 @@ func (r ClientReport) WatchEventCount() int {
func persistClientReports(t *testing.T, lg *zap.Logger, path string, reports []ClientReport) {
sort.Slice(reports, func(i, j int) bool {
return reports[i].ClientId < reports[j].ClientId
return reports[i].ClientID < reports[j].ClientID
})
for _, r := range reports {
clientDir := filepath.Join(path, fmt.Sprintf("client-%d", r.ClientId))
clientDir := filepath.Join(path, fmt.Sprintf("client-%d", r.ClientID))
err := os.MkdirAll(clientDir, 0700)
if err != nil {
t.Fatal(err)
@ -82,12 +82,12 @@ func LoadClientReports(path string) ([]ClientReport, error) {
if err != nil {
return nil, err
}
r.ClientId = id
r.ClientID = id
reports = append(reports, r)
}
}
sort.Slice(reports, func(i, j int) bool {
return reports[i].ClientId < reports[j].ClientId
return reports[i].ClientID < reports[j].ClientID
})
return reports, nil
}
@ -145,7 +145,7 @@ func loadKeyValueOperations(path string) (operations []porcupine.Operation, err
decoder := json.NewDecoder(file)
for decoder.More() {
var operation struct {
ClientId int
ClientID int
Input model.EtcdRequest
Call int64
Output model.MaybeEtcdResponse
@ -156,7 +156,7 @@ func loadKeyValueOperations(path string) (operations []porcupine.Operation, err
return nil, fmt.Errorf("failed to decode watch operation, err: %w", err)
}
operations = append(operations, porcupine.Operation{
ClientId: operation.ClientId,
ClientId: operation.ClientID,
Input: operation.Input,
Call: operation.Call,
Output: operation.Output,

View File

@ -32,7 +32,7 @@ import (
)
func TestPersistLoadClientReports(t *testing.T) {
h := model.NewAppendableHistory(identity.NewIdProvider())
h := model.NewAppendableHistory(identity.NewIDProvider())
baseTime := time.Now()
start := time.Since(baseTime)
@ -122,12 +122,12 @@ func TestPersistLoadClientReports(t *testing.T) {
}
reports := []ClientReport{
{
ClientId: 1,
ClientID: 1,
KeyValue: h.Operations(),
Watch: []model.WatchOperation{watch},
},
{
ClientId: 2,
ClientID: 2,
KeyValue: nil,
Watch: []model.WatchOperation{watch},
},

View File

@ -62,7 +62,7 @@ func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*
return nil, err
}
return &RecordingClient{
id: ids.NewClientId(),
id: ids.NewClientID(),
client: *cc,
kvOperations: model.NewAppendableHistory(ids),
baseTime: baseTime,
@ -75,7 +75,7 @@ func (c *RecordingClient) Close() error {
func (c *RecordingClient) Report() report.ClientReport {
return report.ClientReport{
ClientId: c.id,
ClientID: c.id,
KeyValue: c.kvOperations.History.Operations(),
Watch: c.watchOperations,
}
@ -162,24 +162,24 @@ func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (*clientv3.
return resp, err
}
func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseId int64) (*clientv3.LeaseRevokeResponse, error) {
func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseID int64) (*clientv3.LeaseRevokeResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
callTime := time.Since(c.baseTime)
resp, err := c.client.Lease.Revoke(ctx, clientv3.LeaseID(leaseId))
resp, err := c.client.Lease.Revoke(ctx, clientv3.LeaseID(leaseID))
returnTime := time.Since(c.baseTime)
c.kvOperations.AppendLeaseRevoke(leaseId, callTime, returnTime, resp, err)
c.kvOperations.AppendLeaseRevoke(leaseID, callTime, returnTime, resp, err)
return resp, err
}
func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value string, leaseId int64) (*clientv3.PutResponse, error) {
opts := clientv3.WithLease(clientv3.LeaseID(leaseId))
func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value string, leaseID int64) (*clientv3.PutResponse, error) {
opts := clientv3.WithLease(clientv3.LeaseID(leaseID))
c.kvMux.Lock()
defer c.kvMux.Unlock()
callTime := time.Since(c.baseTime)
resp, err := c.client.Put(ctx, key, value, opts)
returnTime := time.Since(c.baseTime)
c.kvOperations.AppendPutWithLease(key, value, leaseId, callTime, returnTime, resp, err)
c.kvOperations.AppendPutWithLease(key, value, leaseID, callTime, returnTime, resp, err)
return resp, err
}

View File

@ -94,7 +94,7 @@ func (t etcdTraffic) Name() string {
return "Etcd"
}
func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) {
func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) {
lastOperationSucceeded := true
var lastRev int64
var requestType etcdRequestType
@ -154,7 +154,7 @@ type etcdTrafficClient struct {
client *RecordingClient
limiter *rate.Limiter
idProvider identity.Provider
leaseStorage identity.LeaseIdStorage
leaseStorage identity.LeaseIDStorage
}
func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType, lastRev int64) (rev int64, err error) {
@ -181,7 +181,7 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType,
}
case Put:
var resp *clientv3.PutResponse
resp, err = c.client.Put(opCtx, c.randomKey(), fmt.Sprintf("%d", c.idProvider.NewRequestId()))
resp, err = c.client.Put(opCtx, c.randomKey(), fmt.Sprintf("%d", c.idProvider.NewRequestID()))
if resp != nil {
rev = resp.Header.Revision
}
@ -215,43 +215,43 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType,
}
txnCtx, txnCancel := context.WithTimeout(ctx, RequestTimeout)
var resp *clientv3.TxnResponse
resp, err = c.client.Txn(txnCtx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{clientv3.OpPut(key, fmt.Sprintf("%d", c.idProvider.NewRequestId()))}, nil)
resp, err = c.client.Txn(txnCtx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{clientv3.OpPut(key, fmt.Sprintf("%d", c.idProvider.NewRequestID()))}, nil)
txnCancel()
if resp != nil {
rev = resp.Header.Revision
}
}
case PutWithLease:
leaseId := c.leaseStorage.LeaseId(c.client.id)
if leaseId == 0 {
leaseID := c.leaseStorage.LeaseID(c.client.id)
if leaseID == 0 {
var resp *clientv3.LeaseGrantResponse
resp, err = c.client.LeaseGrant(opCtx, c.leaseTTL)
if resp != nil {
leaseId = int64(resp.ID)
leaseID = int64(resp.ID)
rev = resp.ResponseHeader.Revision
}
if err == nil {
c.leaseStorage.AddLeaseId(c.client.id, leaseId)
c.leaseStorage.AddLeaseID(c.client.id, leaseID)
c.limiter.Wait(ctx)
}
}
if leaseId != 0 {
if leaseID != 0 {
putCtx, putCancel := context.WithTimeout(ctx, RequestTimeout)
var resp *clientv3.PutResponse
resp, err = c.client.PutWithLease(putCtx, c.randomKey(), fmt.Sprintf("%d", c.idProvider.NewRequestId()), leaseId)
resp, err = c.client.PutWithLease(putCtx, c.randomKey(), fmt.Sprintf("%d", c.idProvider.NewRequestID()), leaseID)
putCancel()
if resp != nil {
rev = resp.Header.Revision
}
}
case LeaseRevoke:
leaseId := c.leaseStorage.LeaseId(c.client.id)
if leaseId != 0 {
leaseID := c.leaseStorage.LeaseID(c.client.id)
if leaseID != 0 {
var resp *clientv3.LeaseRevokeResponse
resp, err = c.client.LeaseRevoke(opCtx, leaseId)
resp, err = c.client.LeaseRevoke(opCtx, leaseID)
//if LeaseRevoke has failed, do not remove the mapping.
if err == nil {
c.leaseStorage.RemoveLeaseId(c.client.id)
c.leaseStorage.RemoveLeaseID(c.client.id)
}
if resp != nil {
rev = resp.Header.Revision
@ -291,7 +291,7 @@ func (c etcdTrafficClient) pickMultiTxnOps() (ops []clientv3.Op) {
case model.RangeOperation:
ops = append(ops, clientv3.OpGet(key))
case model.PutOperation:
value := fmt.Sprintf("%d", c.idProvider.NewRequestId())
value := fmt.Sprintf("%d", c.idProvider.NewRequestID())
ops = append(ops, clientv3.OpPut(key, value))
case model.DeleteOperation:
ops = append(ops, clientv3.OpDelete(key))

View File

@ -58,7 +58,7 @@ func (t kubernetesTraffic) Name() string {
return "Kubernetes"
}
func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) {
func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) {
kc := &kubernetesClient{client: c}
s := newStorage()
keyPrefix := "/registry/" + t.resource + "/"
@ -143,7 +143,7 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids
defer cancel()
count := s.Count()
if count < t.averageKeyCount/2 {
err = kc.OptimisticCreate(writeCtx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId()))
err = kc.OptimisticCreate(writeCtx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestID()))
} else {
key, rev := s.PickRandom()
if rev == 0 {
@ -163,9 +163,9 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids
_, err = kc.OptimisticDelete(writeCtx, key, rev)
nonUniqueWriteLimiter.Return()
case KubernetesUpdate:
_, err = kc.OptimisticUpdate(writeCtx, key, fmt.Sprintf("%d", ids.NewRequestId()), rev)
_, err = kc.OptimisticUpdate(writeCtx, key, fmt.Sprintf("%d", ids.NewRequestID()), rev)
case KubernetesCreate:
err = kc.OptimisticCreate(writeCtx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId()))
err = kc.OptimisticCreate(writeCtx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestID()))
default:
panic(fmt.Sprintf("invalid choice: %q", op))
}

View File

@ -54,7 +54,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
mux := sync.Mutex{}
endpoints := clus.EndpointsGRPC()
lm := identity.NewLeaseIdStorage()
lm := identity.NewLeaseIDStorage()
reports := []report.ClientReport{}
limiter := rate.NewLimiter(rate.Limit(profile.MaximalQPS), 200)
@ -116,7 +116,7 @@ type Profile struct {
}
type Traffic interface {
Run(ctx context.Context, c *RecordingClient, qpsLimiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{})
Run(ctx context.Context, c *RecordingClient, qpsLimiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{})
ExpectUniqueRevision() bool
Name() string
}

View File

@ -306,7 +306,7 @@ func TestPatchHistory(t *testing.T) {
} {
t.Run(tc.name, func(t *testing.T) {
baseTime := time.Now()
history := model.NewAppendableHistory(identity.NewIdProvider())
history := model.NewAppendableHistory(identity.NewIDProvider())
tc.historyFunc(baseTime, history)
time.Sleep(time.Nanosecond)
start := time.Since(baseTime)
@ -342,7 +342,7 @@ func TestPatchHistory(t *testing.T) {
}
operations := patchedOperationHistory([]report.ClientReport{
{
ClientId: 0,
ClientID: 0,
KeyValue: history.History.Operations(),
Watch: []model.WatchOperation{{Responses: watch}},
},

View File

@ -55,29 +55,29 @@ func mergeWatchEventHistory(reports []report.ClientReport) ([]model.PersistedEve
type revisionEvents struct {
events []model.PersistedEvent
revision int64
clientId int
clientID int
}
revisionToEvents := map[int64]revisionEvents{}
var lastClientId = 0
var lastClientID = 0
var lastRevision int64
events := []model.PersistedEvent{}
for _, r := range reports {
for _, op := range r.Watch {
for _, resp := range op.Responses {
for _, event := range resp.Events {
if event.Revision == lastRevision && lastClientId == r.ClientId {
if event.Revision == lastRevision && lastClientID == r.ClientID {
events = append(events, event.PersistedEvent)
} else {
if prev, found := revisionToEvents[lastRevision]; found {
// This assumes that there are txn that would be observed differently by two watches.
// TODO: Implement merging events from multiple watches about single revision based on operations.
if diff := cmp.Diff(prev.events, events); diff != "" {
return nil, fmt.Errorf("events between clients %d and %d don't match, revision: %d, diff: %s", prev.clientId, lastClientId, lastRevision, diff)
return nil, fmt.Errorf("events between clients %d and %d don't match, revision: %d, diff: %s", prev.clientID, lastClientID, lastRevision, diff)
}
} else {
revisionToEvents[lastRevision] = revisionEvents{clientId: lastClientId, events: events, revision: lastRevision}
revisionToEvents[lastRevision] = revisionEvents{clientID: lastClientID, events: events, revision: lastRevision}
}
lastClientId = r.ClientId
lastClientID = r.ClientID
lastRevision = event.Revision
events = []model.PersistedEvent{event.PersistedEvent}
}
@ -87,10 +87,10 @@ func mergeWatchEventHistory(reports []report.ClientReport) ([]model.PersistedEve
}
if prev, found := revisionToEvents[lastRevision]; found {
if diff := cmp.Diff(prev.events, events); diff != "" {
return nil, fmt.Errorf("events between clients %d and %d don't match, revision: %d, diff: %s", prev.clientId, lastClientId, lastRevision, diff)
return nil, fmt.Errorf("events between clients %d and %d don't match, revision: %d, diff: %s", prev.clientID, lastClientID, lastRevision, diff)
}
} else {
revisionToEvents[lastRevision] = revisionEvents{clientId: lastClientId, events: events, revision: lastRevision}
revisionToEvents[lastRevision] = revisionEvents{clientID: lastClientID, events: events, revision: lastRevision}
}
var allRevisionEvents []revisionEvents

View File

@ -62,7 +62,7 @@ func validateOrdered(t *testing.T, report report.ClientReport) {
for _, resp := range op.Responses {
for _, event := range resp.Events {
if event.Revision < lastEventRevision {
t.Errorf("Broke watch guarantee: Ordered - events are ordered by revision; an event will never appear on a watch if it precedes an event in time that has already been posted, lastRevision: %d, currentRevision: %d, client: %d", lastEventRevision, event.Revision, report.ClientId)
t.Errorf("Broke watch guarantee: Ordered - events are ordered by revision; an event will never appear on a watch if it precedes an event in time that has already been posted, lastRevision: %d, currentRevision: %d, client: %d", lastEventRevision, event.Revision, report.ClientID)
}
lastEventRevision = event.Revision
}
@ -85,7 +85,7 @@ func validateUnique(t *testing.T, expectUniqueRevision bool, report report.Clien
}{event.Revision, event.Key}
}
if _, found := uniqueOperations[key]; found {
t.Errorf("Broke watch guarantee: Unique - an event will never appear on a watch twice, key: %q, revision: %d, client: %d", event.Key, event.Revision, report.ClientId)
t.Errorf("Broke watch guarantee: Unique - an event will never appear on a watch twice, key: %q, revision: %d, client: %d", event.Key, event.Revision, report.ClientID)
}
uniqueOperations[key] = struct{}{}
}
@ -99,7 +99,7 @@ func validateAtomic(t *testing.T, report report.ClientReport) {
for _, resp := range op.Responses {
if len(resp.Events) > 0 {
if resp.Events[0].Revision == lastEventRevision {
t.Errorf("Broke watch guarantee: Atomic - a list of events is guaranteed to encompass complete revisions; updates in the same revision over multiple keys will not be split over several lists of events, previousListEventRevision: %d, currentListEventRevision: %d, client: %d", lastEventRevision, resp.Events[0].Revision, report.ClientId)
t.Errorf("Broke watch guarantee: Atomic - a list of events is guaranteed to encompass complete revisions; updates in the same revision over multiple keys will not be split over several lists of events, previousListEventRevision: %d, currentListEventRevision: %d, client: %d", lastEventRevision, resp.Events[0].Revision, report.ClientID)
}
lastEventRevision = resp.Events[len(resp.Events)-1].Revision
}