tests: Allow dynamic number of clients

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
Marek Siarkowicz 2022-12-01 20:22:50 +01:00
parent 42bb543315
commit 6a68141db5
4 changed files with 49 additions and 13 deletions

View File

@ -30,7 +30,7 @@ type recordingClient struct {
operations []porcupine.Operation operations []porcupine.Operation
} }
func NewClient(endpoints []string, id int) (*recordingClient, error) { func NewClient(endpoints []string, ids idProvider) (*recordingClient, error) {
cc, err := clientv3.New(clientv3.Config{ cc, err := clientv3.New(clientv3.Config{
Endpoints: endpoints, Endpoints: endpoints,
Logger: zap.NewNop(), Logger: zap.NewNop(),
@ -42,7 +42,7 @@ func NewClient(endpoints []string, id int) (*recordingClient, error) {
} }
return &recordingClient{ return &recordingClient{
client: *cc, client: *cc,
id: id, id: ids.ClientId(),
operations: []porcupine.Operation{}, operations: []porcupine.Operation{},
}, nil }, nil
} }

View File

@ -0,0 +1,40 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package linearizability
import "sync/atomic"
type idProvider interface {
ClientId() int
RequestId() int
}
func newIdProvider() idProvider {
return &atomicProvider{}
}
type atomicProvider struct {
clientId atomic.Int64
requestId atomic.Int64
}
func (id *atomicProvider) ClientId() int {
// Substract one as ClientId should start from zero.
return int(id.clientId.Add(1) - 1)
}
func (id *atomicProvider) RequestId() int {
return int(id.requestId.Add(1))
}

View File

@ -144,6 +144,7 @@ func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessClu
mux := sync.Mutex{} mux := sync.Mutex{}
endpoints := clus.EndpointsV3() endpoints := clus.EndpointsV3()
ids := newIdProvider()
limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200) limiter := rate.NewLimiter(rate.Limit(config.maximalQPS), 200)
startTime := time.Now() startTime := time.Now()
@ -151,7 +152,7 @@ func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessClu
for i := 0; i < config.clientCount; i++ { for i := 0; i < config.clientCount; i++ {
wg.Add(1) wg.Add(1)
endpoints := []string{endpoints[i%len(endpoints)]} endpoints := []string{endpoints[i%len(endpoints)]}
c, err := NewClient(endpoints, i) c, err := NewClient(endpoints, ids)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -159,7 +160,7 @@ func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessClu
defer wg.Done() defer wg.Done()
defer c.Close() defer c.Close()
config.traffic.Run(ctx, c, limiter) config.traffic.Run(ctx, c, limiter, ids)
mux.Lock() mux.Lock()
operations = append(operations, c.operations...) operations = append(operations, c.operations...)
mux.Unlock() mux.Unlock()

View File

@ -28,7 +28,7 @@ var (
) )
type Traffic interface { type Traffic interface {
Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter) Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter, ids idProvider)
} }
type readWriteSingleKey struct { type readWriteSingleKey struct {
@ -41,12 +41,9 @@ type opChance struct {
chance int chance int
} }
func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter) { func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter, ids idProvider) {
maxOperationsPerClient := 1000000
minId := maxOperationsPerClient * c.id
maxId := maxOperationsPerClient * (c.id + 1)
for writeId := minId; writeId < maxId; { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
@ -58,10 +55,8 @@ func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter
continue continue
} }
// Provide each write with unique id to make it easier to validate operation history. // Provide each write with unique id to make it easier to validate operation history.
t.Write(ctx, c, limiter, writeId) t.Write(ctx, c, limiter, ids.RequestId())
writeId++
} }
return
} }
func (t readWriteSingleKey) Read(ctx context.Context, c *recordingClient, limiter *rate.Limiter) error { func (t readWriteSingleKey) Read(ctx context.Context, c *recordingClient, limiter *rate.Limiter) error {