mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #15891 from serathius/robustness-k8s-watch
tests/robustness: Implement kubernetes list watch protocol
This commit is contained in:
commit
0efa1c19ef
@ -167,7 +167,7 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce
|
||||
panicked = false
|
||||
}
|
||||
|
||||
func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) (operations []porcupine.Operation, responses [][]watchResponse) {
|
||||
func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) (operations []porcupine.Operation, responses [][]traffic.WatchResponse) {
|
||||
g := errgroup.Group{}
|
||||
finishTraffic := make(chan struct{})
|
||||
|
||||
|
@ -25,12 +25,13 @@ import (
|
||||
"go.uber.org/zap"
|
||||
|
||||
"go.etcd.io/etcd/tests/v3/framework/e2e"
|
||||
"go.etcd.io/etcd/tests/v3/robustness/traffic"
|
||||
)
|
||||
|
||||
type report struct {
|
||||
lg *zap.Logger
|
||||
clus *e2e.EtcdProcessCluster
|
||||
responses [][]watchResponse
|
||||
responses [][]traffic.WatchResponse
|
||||
events [][]watchEvent
|
||||
operations []porcupine.Operation
|
||||
patchedOperations []porcupine.Operation
|
||||
@ -94,7 +95,7 @@ func persistMemberDataDir(t *testing.T, lg *zap.Logger, member e2e.EtcdProcess,
|
||||
}
|
||||
}
|
||||
|
||||
func persistWatchResponses(t *testing.T, lg *zap.Logger, path string, responses []watchResponse) {
|
||||
func persistWatchResponses(t *testing.T, lg *zap.Logger, path string, responses []traffic.WatchResponse) {
|
||||
lg.Info("Saving watch responses", zap.String("path", path))
|
||||
file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
|
||||
if err != nil {
|
||||
|
@ -37,9 +37,17 @@ type RecordingClient struct {
|
||||
// Only time-measuring operations should be used to record time.
|
||||
// see https://github.com/golang/go/blob/master/src/time/time.go#L17
|
||||
baseTime time.Time
|
||||
|
||||
watchMux sync.Mutex
|
||||
watchResponses []WatchResponse
|
||||
// mux ensures order of request appending.
|
||||
mux sync.Mutex
|
||||
history *model.AppendableHistory
|
||||
opMux sync.Mutex
|
||||
operations *model.AppendableHistory
|
||||
}
|
||||
|
||||
type WatchResponse struct {
|
||||
clientv3.WatchResponse
|
||||
Time time.Duration
|
||||
}
|
||||
|
||||
func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*RecordingClient, error) {
|
||||
@ -53,9 +61,9 @@ func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*
|
||||
return nil, err
|
||||
}
|
||||
return &RecordingClient{
|
||||
client: *cc,
|
||||
history: model.NewAppendableHistory(ids),
|
||||
baseTime: baseTime,
|
||||
client: *cc,
|
||||
operations: model.NewAppendableHistory(ids),
|
||||
baseTime: baseTime,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -63,77 +71,85 @@ func (c *RecordingClient) Close() error {
|
||||
return c.client.Close()
|
||||
}
|
||||
|
||||
func (c *RecordingClient) Operations() model.History {
|
||||
return c.history.History
|
||||
func (c *RecordingClient) Report() ClientReport {
|
||||
return ClientReport{
|
||||
Operations: c.operations.History,
|
||||
Watch: nil,
|
||||
}
|
||||
}
|
||||
|
||||
type ClientReport struct {
|
||||
Operations model.History
|
||||
Watch []WatchResponse
|
||||
}
|
||||
|
||||
func (c *RecordingClient) Get(ctx context.Context, key string) (*mvccpb.KeyValue, error) {
|
||||
resp, err := c.Range(ctx, key, false)
|
||||
if err != nil || len(resp) == 0 {
|
||||
if err != nil || len(resp.Kvs) == 0 {
|
||||
return nil, err
|
||||
}
|
||||
if len(resp) == 1 {
|
||||
return resp[0], err
|
||||
if len(resp.Kvs) == 1 {
|
||||
return resp.Kvs[0], err
|
||||
}
|
||||
panic(fmt.Sprintf("Unexpected response size: %d", len(resp)))
|
||||
panic(fmt.Sprintf("Unexpected response size: %d", len(resp.Kvs)))
|
||||
}
|
||||
|
||||
func (c *RecordingClient) Range(ctx context.Context, key string, withPrefix bool) ([]*mvccpb.KeyValue, error) {
|
||||
func (c *RecordingClient) Range(ctx context.Context, key string, withPrefix bool) (*clientv3.GetResponse, error) {
|
||||
ops := []clientv3.OpOption{}
|
||||
if withPrefix {
|
||||
ops = append(ops, clientv3.WithPrefix())
|
||||
}
|
||||
c.mux.Lock()
|
||||
defer c.mux.Unlock()
|
||||
c.opMux.Lock()
|
||||
defer c.opMux.Unlock()
|
||||
callTime := time.Since(c.baseTime)
|
||||
resp, err := c.client.Get(ctx, key, ops...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
returnTime := time.Since(c.baseTime)
|
||||
c.history.AppendRange(key, withPrefix, callTime, returnTime, resp)
|
||||
return resp.Kvs, nil
|
||||
c.operations.AppendRange(key, withPrefix, callTime, returnTime, resp)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (c *RecordingClient) Put(ctx context.Context, key, value string) error {
|
||||
c.mux.Lock()
|
||||
defer c.mux.Unlock()
|
||||
c.opMux.Lock()
|
||||
defer c.opMux.Unlock()
|
||||
callTime := time.Since(c.baseTime)
|
||||
resp, err := c.client.Put(ctx, key, value)
|
||||
returnTime := time.Since(c.baseTime)
|
||||
c.history.AppendPut(key, value, callTime, returnTime, resp, err)
|
||||
c.operations.AppendPut(key, value, callTime, returnTime, resp, err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *RecordingClient) Delete(ctx context.Context, key string) error {
|
||||
c.mux.Lock()
|
||||
defer c.mux.Unlock()
|
||||
c.opMux.Lock()
|
||||
defer c.opMux.Unlock()
|
||||
callTime := time.Since(c.baseTime)
|
||||
resp, err := c.client.Delete(ctx, key)
|
||||
returnTime := time.Since(c.baseTime)
|
||||
c.history.AppendDelete(key, callTime, returnTime, resp, err)
|
||||
c.operations.AppendDelete(key, callTime, returnTime, resp, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *RecordingClient) CompareRevisionAndDelete(ctx context.Context, key string, expectedRevision int64) error {
|
||||
txn := c.compareRevisionTxn(ctx, key, expectedRevision, clientv3.OpDelete(key))
|
||||
c.mux.Lock()
|
||||
defer c.mux.Unlock()
|
||||
c.opMux.Lock()
|
||||
defer c.opMux.Unlock()
|
||||
callTime := time.Since(c.baseTime)
|
||||
resp, err := txn.Commit()
|
||||
returnTime := time.Since(c.baseTime)
|
||||
c.history.AppendCompareRevisionAndDelete(key, expectedRevision, callTime, returnTime, resp, err)
|
||||
c.operations.AppendCompareRevisionAndDelete(key, expectedRevision, callTime, returnTime, resp, err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *RecordingClient) CompareRevisionAndPut(ctx context.Context, key, value string, expectedRevision int64) error {
|
||||
txn := c.compareRevisionTxn(ctx, key, expectedRevision, clientv3.OpPut(key, value))
|
||||
c.mux.Lock()
|
||||
defer c.mux.Unlock()
|
||||
c.opMux.Lock()
|
||||
defer c.opMux.Unlock()
|
||||
callTime := time.Since(c.baseTime)
|
||||
resp, err := txn.Commit()
|
||||
returnTime := time.Since(c.baseTime)
|
||||
c.history.AppendCompareRevisionAndPut(key, expectedRevision, value, callTime, returnTime, resp, err)
|
||||
c.operations.AppendCompareRevisionAndPut(key, expectedRevision, value, callTime, returnTime, resp, err)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -158,22 +174,22 @@ func (c *RecordingClient) Txn(ctx context.Context, cmp []clientv3.Cmp, ops []cli
|
||||
).Then(
|
||||
ops...,
|
||||
)
|
||||
c.mux.Lock()
|
||||
defer c.mux.Unlock()
|
||||
c.opMux.Lock()
|
||||
defer c.opMux.Unlock()
|
||||
callTime := time.Since(c.baseTime)
|
||||
resp, err := txn.Commit()
|
||||
returnTime := time.Since(c.baseTime)
|
||||
c.history.AppendTxn(cmp, ops, callTime, returnTime, resp, err)
|
||||
c.operations.AppendTxn(cmp, ops, callTime, returnTime, resp, err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (int64, error) {
|
||||
c.mux.Lock()
|
||||
defer c.mux.Unlock()
|
||||
c.opMux.Lock()
|
||||
defer c.opMux.Unlock()
|
||||
callTime := time.Since(c.baseTime)
|
||||
resp, err := c.client.Lease.Grant(ctx, ttl)
|
||||
returnTime := time.Since(c.baseTime)
|
||||
c.history.AppendLeaseGrant(callTime, returnTime, resp, err)
|
||||
c.operations.AppendLeaseGrant(callTime, returnTime, resp, err)
|
||||
var leaseId int64
|
||||
if resp != nil {
|
||||
leaseId = int64(resp.ID)
|
||||
@ -182,32 +198,53 @@ func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (int64, err
|
||||
}
|
||||
|
||||
func (c *RecordingClient) LeaseRevoke(ctx context.Context, leaseId int64) error {
|
||||
c.mux.Lock()
|
||||
defer c.mux.Unlock()
|
||||
c.opMux.Lock()
|
||||
defer c.opMux.Unlock()
|
||||
callTime := time.Since(c.baseTime)
|
||||
resp, err := c.client.Lease.Revoke(ctx, clientv3.LeaseID(leaseId))
|
||||
returnTime := time.Since(c.baseTime)
|
||||
c.history.AppendLeaseRevoke(leaseId, callTime, returnTime, resp, err)
|
||||
c.operations.AppendLeaseRevoke(leaseId, callTime, returnTime, resp, err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *RecordingClient) PutWithLease(ctx context.Context, key string, value string, leaseId int64) error {
|
||||
opts := clientv3.WithLease(clientv3.LeaseID(leaseId))
|
||||
c.mux.Lock()
|
||||
defer c.mux.Unlock()
|
||||
c.opMux.Lock()
|
||||
defer c.opMux.Unlock()
|
||||
callTime := time.Since(c.baseTime)
|
||||
resp, err := c.client.Put(ctx, key, value, opts)
|
||||
returnTime := time.Since(c.baseTime)
|
||||
c.history.AppendPutWithLease(key, value, leaseId, callTime, returnTime, resp, err)
|
||||
c.operations.AppendPutWithLease(key, value, leaseId, callTime, returnTime, resp, err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *RecordingClient) Defragment(ctx context.Context) error {
|
||||
c.mux.Lock()
|
||||
defer c.mux.Unlock()
|
||||
c.opMux.Lock()
|
||||
defer c.opMux.Unlock()
|
||||
callTime := time.Since(c.baseTime)
|
||||
resp, err := c.client.Defragment(ctx, c.client.Endpoints()[0])
|
||||
returnTime := time.Since(c.baseTime)
|
||||
c.history.AppendDefragment(callTime, returnTime, resp, err)
|
||||
c.operations.AppendDefragment(callTime, returnTime, resp, err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool) clientv3.WatchChan {
|
||||
ops := []clientv3.OpOption{clientv3.WithProgressNotify()}
|
||||
if withPrefix {
|
||||
ops = append(ops, clientv3.WithPrefix())
|
||||
}
|
||||
if rev != 0 {
|
||||
ops = append(ops, clientv3.WithRev(rev))
|
||||
}
|
||||
respCh := make(chan clientv3.WatchResponse)
|
||||
go func() {
|
||||
defer close(respCh)
|
||||
for r := range c.client.Watch(ctx, key, ops...) {
|
||||
c.watchMux.Lock()
|
||||
c.watchResponses = append(c.watchResponses, WatchResponse{r, time.Since(c.baseTime)})
|
||||
c.watchMux.Unlock()
|
||||
respCh <- r
|
||||
}
|
||||
}()
|
||||
return respCh
|
||||
}
|
||||
|
@ -16,12 +16,16 @@ package traffic
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/pkg/v3/stringutil"
|
||||
"go.etcd.io/etcd/tests/v3/robustness/identity"
|
||||
)
|
||||
@ -61,42 +65,82 @@ const (
|
||||
)
|
||||
|
||||
func (t kubernetesTraffic) Run(ctx context.Context, clientId int, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage, finish <-chan struct{}) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-finish:
|
||||
return
|
||||
default:
|
||||
s := newStorage()
|
||||
keyPrefix := "/registry/" + t.resource + "/"
|
||||
g := errgroup.Group{}
|
||||
|
||||
g.Go(func() error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-finish:
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
resp, err := t.Range(ctx, c, keyPrefix, true)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
s.Reset(resp)
|
||||
limiter.Wait(ctx)
|
||||
watchCtx, cancel := context.WithTimeout(ctx, WatchTimeout)
|
||||
for e := range c.Watch(watchCtx, keyPrefix, resp.Header.Revision, true) {
|
||||
s.Update(e)
|
||||
}
|
||||
cancel()
|
||||
}
|
||||
objects, err := t.Range(ctx, c, "/registry/"+t.resource+"/", true)
|
||||
if err != nil {
|
||||
continue
|
||||
})
|
||||
g.Go(func() error {
|
||||
lastWriteFailed := false
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-finish:
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
// Avoid multiple failed writes in a row
|
||||
if lastWriteFailed {
|
||||
resp, err := t.Range(ctx, c, keyPrefix, true)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
s.Reset(resp)
|
||||
limiter.Wait(ctx)
|
||||
}
|
||||
err := t.Write(ctx, c, ids, s)
|
||||
lastWriteFailed = err != nil
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
limiter.Wait(ctx)
|
||||
}
|
||||
limiter.Wait(ctx)
|
||||
err = t.Write(ctx, c, ids, objects)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
limiter.Wait(ctx)
|
||||
}
|
||||
})
|
||||
g.Wait()
|
||||
}
|
||||
|
||||
func (t kubernetesTraffic) Write(ctx context.Context, c *RecordingClient, ids identity.Provider, objects []*mvccpb.KeyValue) (err error) {
|
||||
func (t kubernetesTraffic) Write(ctx context.Context, c *RecordingClient, ids identity.Provider, s *storage) (err error) {
|
||||
writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
|
||||
if len(objects) < t.averageKeyCount/2 {
|
||||
defer cancel()
|
||||
count := s.Count()
|
||||
if count < t.averageKeyCount/2 {
|
||||
err = t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId()))
|
||||
} else {
|
||||
randomPod := objects[rand.Intn(len(objects))]
|
||||
if len(objects) > t.averageKeyCount*3/2 {
|
||||
err = t.Delete(writeCtx, c, string(randomPod.Key), randomPod.ModRevision)
|
||||
key, rev := s.PickRandom()
|
||||
if rev == 0 {
|
||||
return errors.New("storage empty")
|
||||
}
|
||||
if count > t.averageKeyCount*3/2 {
|
||||
err = t.Delete(writeCtx, c, key, rev)
|
||||
} else {
|
||||
op := KubernetesRequestType(pickRandom(t.writeChoices))
|
||||
op := pickRandom(t.writeChoices)
|
||||
switch op {
|
||||
case KubernetesDelete:
|
||||
err = t.Delete(writeCtx, c, string(randomPod.Key), randomPod.ModRevision)
|
||||
err = t.Delete(writeCtx, c, key, rev)
|
||||
case KubernetesUpdate:
|
||||
err = t.Update(writeCtx, c, string(randomPod.Key), fmt.Sprintf("%d", ids.NewRequestId()), randomPod.ModRevision)
|
||||
err = t.Update(writeCtx, c, key, fmt.Sprintf("%d", ids.NewRequestId()), rev)
|
||||
case KubernetesCreate:
|
||||
err = t.Create(writeCtx, c, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestId()))
|
||||
default:
|
||||
@ -104,7 +148,6 @@ func (t kubernetesTraffic) Write(ctx context.Context, c *RecordingClient, ids id
|
||||
}
|
||||
}
|
||||
}
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
@ -112,7 +155,7 @@ func (t kubernetesTraffic) generateKey() string {
|
||||
return fmt.Sprintf("/registry/%s/%s/%s", t.resource, t.namespace, stringutil.RandString(5))
|
||||
}
|
||||
|
||||
func (t kubernetesTraffic) Range(ctx context.Context, c *RecordingClient, key string, withPrefix bool) ([]*mvccpb.KeyValue, error) {
|
||||
func (t kubernetesTraffic) Range(ctx context.Context, c *RecordingClient, key string, withPrefix bool) (*clientv3.GetResponse, error) {
|
||||
ctx, cancel := context.WithTimeout(ctx, RequestTimeout)
|
||||
resp, err := c.Range(ctx, key, withPrefix)
|
||||
cancel()
|
||||
@ -136,3 +179,65 @@ func (t kubernetesTraffic) Delete(ctx context.Context, c *RecordingClient, key s
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
type storage struct {
|
||||
mux sync.RWMutex
|
||||
keyRevision map[string]int64
|
||||
revision int64
|
||||
}
|
||||
|
||||
func newStorage() *storage {
|
||||
return &storage{
|
||||
keyRevision: map[string]int64{},
|
||||
}
|
||||
}
|
||||
|
||||
func (s *storage) Update(resp clientv3.WatchResponse) {
|
||||
s.mux.Lock()
|
||||
defer s.mux.Unlock()
|
||||
for _, e := range resp.Events {
|
||||
if e.Kv.ModRevision < s.revision {
|
||||
continue
|
||||
}
|
||||
s.revision = e.Kv.ModRevision
|
||||
switch e.Type {
|
||||
case mvccpb.PUT:
|
||||
s.keyRevision[string(e.Kv.Key)] = e.Kv.ModRevision
|
||||
case mvccpb.DELETE:
|
||||
delete(s.keyRevision, string(e.Kv.Key))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *storage) Reset(resp *clientv3.GetResponse) {
|
||||
s.mux.Lock()
|
||||
defer s.mux.Unlock()
|
||||
if resp.Header.Revision <= s.revision {
|
||||
return
|
||||
}
|
||||
s.keyRevision = make(map[string]int64, len(resp.Kvs))
|
||||
for _, kv := range resp.Kvs {
|
||||
s.keyRevision[string(kv.Key)] = kv.ModRevision
|
||||
}
|
||||
s.revision = resp.Header.Revision
|
||||
}
|
||||
|
||||
func (s *storage) Count() int {
|
||||
s.mux.RLock()
|
||||
defer s.mux.RUnlock()
|
||||
return len(s.keyRevision)
|
||||
}
|
||||
|
||||
func (s *storage) PickRandom() (key string, rev int64) {
|
||||
s.mux.RLock()
|
||||
defer s.mux.RUnlock()
|
||||
n := rand.Intn(len(s.keyRevision))
|
||||
i := 0
|
||||
for k, v := range s.keyRevision {
|
||||
if i == n {
|
||||
return k, v
|
||||
}
|
||||
i++
|
||||
}
|
||||
return "", 0
|
||||
}
|
||||
|
@ -32,6 +32,7 @@ import (
|
||||
var (
|
||||
DefaultLeaseTTL int64 = 7200
|
||||
RequestTimeout = 40 * time.Millisecond
|
||||
WatchTimeout = 400 * time.Millisecond
|
||||
MultiOpTxnOpCount = 4
|
||||
)
|
||||
|
||||
@ -63,7 +64,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
|
||||
|
||||
config.traffic.Run(ctx, clientId, c, limiter, ids, lm, finish)
|
||||
mux.Lock()
|
||||
h = h.Merge(c.Operations())
|
||||
h = h.Merge(c.operations.History)
|
||||
mux.Unlock()
|
||||
}(c, i)
|
||||
}
|
||||
@ -76,7 +77,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
h = h.Merge(cc.Operations())
|
||||
h = h.Merge(cc.operations.History)
|
||||
|
||||
operations := h.Operations()
|
||||
lg.Info("Recorded operations", zap.Int("count", len(operations)))
|
||||
|
@ -29,12 +29,13 @@ import (
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/tests/v3/framework/e2e"
|
||||
"go.etcd.io/etcd/tests/v3/robustness/model"
|
||||
"go.etcd.io/etcd/tests/v3/robustness/traffic"
|
||||
)
|
||||
|
||||
func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, maxRevisionChan <-chan int64, cfg watchConfig, baseTime time.Time) [][]watchResponse {
|
||||
func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, maxRevisionChan <-chan int64, cfg watchConfig, baseTime time.Time) [][]traffic.WatchResponse {
|
||||
mux := sync.Mutex{}
|
||||
var wg sync.WaitGroup
|
||||
memberResponses := make([][]watchResponse, len(clus.Procs))
|
||||
memberResponses := make([][]traffic.WatchResponse, len(clus.Procs))
|
||||
memberMaxRevisionChans := make([]chan int64, len(clus.Procs))
|
||||
for i, member := range clus.Procs {
|
||||
c, err := clientv3.New(clientv3.Config{
|
||||
@ -75,7 +76,7 @@ type watchConfig struct {
|
||||
}
|
||||
|
||||
// watchMember collects all responses until context is cancelled, it has observed revision provided via maxRevisionChan or maxRevisionChan was closed.
|
||||
func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevisionChan <-chan int64, cfg watchConfig, baseTime time.Time) (resps []watchResponse) {
|
||||
func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevisionChan <-chan int64, cfg watchConfig, baseTime time.Time) (resps []traffic.WatchResponse) {
|
||||
var maxRevision int64 = 0
|
||||
var lastRevision int64 = 0
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
@ -111,7 +112,7 @@ func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevis
|
||||
if resp.Err() == nil {
|
||||
// using time.Since time-measuring operation to get monotonic clock reading
|
||||
// see https://github.com/golang/go/blob/master/src/time/time.go#L17
|
||||
resps = append(resps, watchResponse{resp, time.Since(baseTime)})
|
||||
resps = append(resps, traffic.WatchResponse{WatchResponse: resp, Time: time.Since(baseTime)})
|
||||
} else if !resp.Canceled {
|
||||
t.Errorf("Watch stream received error, err %v", resp.Err())
|
||||
}
|
||||
@ -126,7 +127,7 @@ func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevis
|
||||
}
|
||||
}
|
||||
|
||||
func watchResponsesMaxRevision(responses []watchResponse) int64 {
|
||||
func watchResponsesMaxRevision(responses []traffic.WatchResponse) int64 {
|
||||
var maxRevision int64
|
||||
for _, response := range responses {
|
||||
for _, event := range response.Events {
|
||||
@ -138,13 +139,13 @@ func watchResponsesMaxRevision(responses []watchResponse) int64 {
|
||||
return maxRevision
|
||||
}
|
||||
|
||||
func validateWatchResponses(t *testing.T, clus *e2e.EtcdProcessCluster, responses [][]watchResponse, expectProgressNotify bool) {
|
||||
func validateWatchResponses(t *testing.T, clus *e2e.EtcdProcessCluster, responses [][]traffic.WatchResponse, expectProgressNotify bool) {
|
||||
for i, member := range clus.Procs {
|
||||
validateMemberWatchResponses(t, member.Config().Name, responses[i], expectProgressNotify)
|
||||
}
|
||||
}
|
||||
|
||||
func validateMemberWatchResponses(t *testing.T, memberId string, responses []watchResponse, expectProgressNotify bool) {
|
||||
func validateMemberWatchResponses(t *testing.T, memberId string, responses []traffic.WatchResponse, expectProgressNotify bool) {
|
||||
// Validate watch is correctly configured to ensure proper testing
|
||||
validateGotAtLeastOneProgressNotify(t, memberId, responses, expectProgressNotify)
|
||||
|
||||
@ -156,7 +157,7 @@ func validateMemberWatchResponses(t *testing.T, memberId string, responses []wat
|
||||
validateRenewable(t, memberId, responses)
|
||||
}
|
||||
|
||||
func validateGotAtLeastOneProgressNotify(t *testing.T, memberId string, responses []watchResponse, expectProgressNotify bool) {
|
||||
func validateGotAtLeastOneProgressNotify(t *testing.T, memberId string, responses []traffic.WatchResponse, expectProgressNotify bool) {
|
||||
var gotProgressNotify = false
|
||||
var lastHeadRevision int64 = 1
|
||||
for _, resp := range responses {
|
||||
@ -171,7 +172,7 @@ func validateGotAtLeastOneProgressNotify(t *testing.T, memberId string, response
|
||||
}
|
||||
}
|
||||
|
||||
func validateRenewable(t *testing.T, memberId string, responses []watchResponse) {
|
||||
func validateRenewable(t *testing.T, memberId string, responses []traffic.WatchResponse) {
|
||||
var lastProgressNotifyRevision int64 = 0
|
||||
for _, resp := range responses {
|
||||
for _, event := range resp.Events {
|
||||
@ -185,7 +186,7 @@ func validateRenewable(t *testing.T, memberId string, responses []watchResponse)
|
||||
}
|
||||
}
|
||||
|
||||
func validateOrderedAndReliable(t *testing.T, memberId string, responses []watchResponse) {
|
||||
func validateOrderedAndReliable(t *testing.T, memberId string, responses []traffic.WatchResponse) {
|
||||
var lastEventRevision int64 = 1
|
||||
for _, resp := range responses {
|
||||
for _, event := range resp.Events {
|
||||
@ -201,7 +202,7 @@ func validateOrderedAndReliable(t *testing.T, memberId string, responses []watch
|
||||
}
|
||||
}
|
||||
|
||||
func validateUnique(t *testing.T, memberId string, responses []watchResponse) {
|
||||
func validateUnique(t *testing.T, memberId string, responses []traffic.WatchResponse) {
|
||||
type revisionKey struct {
|
||||
revision int64
|
||||
key string
|
||||
@ -218,7 +219,7 @@ func validateUnique(t *testing.T, memberId string, responses []watchResponse) {
|
||||
}
|
||||
}
|
||||
|
||||
func validateAtomic(t *testing.T, memberId string, responses []watchResponse) {
|
||||
func validateAtomic(t *testing.T, memberId string, responses []traffic.WatchResponse) {
|
||||
var lastEventRevision int64 = 1
|
||||
for _, resp := range responses {
|
||||
if len(resp.Events) > 0 {
|
||||
@ -230,7 +231,7 @@ func validateAtomic(t *testing.T, memberId string, responses []watchResponse) {
|
||||
}
|
||||
}
|
||||
|
||||
func toWatchEvents(responses []watchResponse) (events []watchEvent) {
|
||||
func toWatchEvents(responses []traffic.WatchResponse) (events []watchEvent) {
|
||||
for _, resp := range responses {
|
||||
for _, event := range resp.Events {
|
||||
var op model.OperationType
|
||||
@ -241,7 +242,7 @@ func toWatchEvents(responses []watchResponse) (events []watchEvent) {
|
||||
op = model.Delete
|
||||
}
|
||||
events = append(events, watchEvent{
|
||||
Time: resp.time,
|
||||
Time: resp.Time,
|
||||
Revision: event.Kv.ModRevision,
|
||||
Op: model.EtcdOperation{
|
||||
Type: op,
|
||||
@ -254,11 +255,6 @@ func toWatchEvents(responses []watchResponse) (events []watchEvent) {
|
||||
return events
|
||||
}
|
||||
|
||||
type watchResponse struct {
|
||||
clientv3.WatchResponse
|
||||
time time.Duration
|
||||
}
|
||||
|
||||
type watchEvent struct {
|
||||
Op model.EtcdOperation
|
||||
Revision int64
|
||||
@ -354,7 +350,7 @@ func hasUniqueWriteOperation(request *model.TxnRequest) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func watchEvents(responses [][]watchResponse) [][]watchEvent {
|
||||
func watchEvents(responses [][]traffic.WatchResponse) [][]watchEvent {
|
||||
ops := make([][]watchEvent, len(responses))
|
||||
for i, resps := range responses {
|
||||
ops[i] = toWatchEvents(resps)
|
||||
|
Loading…
x
Reference in New Issue
Block a user