tests/robustness: Reproduce issue #15220

This issue is reproduced fairly easily by bombarding the server with
requests for progress notifications, which eventually causes one to be
delivered ahead of the payload message it was meant to trail. This is
then caught by the watch response validation code previously added by
Marek Siarkowicz.

Signed-off-by: Peter Wortmann <peter.wortmann@skao.int>
Peter Wortmann 2023-03-28 22:03:37 +01:00
parent af25936fb7
commit 42a2643df9
3 changed files with 48 additions and 18 deletions
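
For orientation before the diff: the reproduction idea from the message
above can be sketched as a standalone clientv3 loop. This is a minimal
sketch, not part of the commit; the endpoint, key range, and timeouts are
illustrative assumptions, and it needs concurrent writes elsewhere to keep
the watch stream busy.

package main

import (
	"context"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// Illustrative endpoint; any reachable etcd member works.
	c, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Watch everything and request another progress notification for
	// every response received, mirroring the hot loop added in this
	// commit.
	for resp := range c.Watch(ctx, "", clientv3.WithPrefix()) {
		if err := c.RequestProgress(ctx); err != nil {
			log.Fatal(err)
		}
		if resp.IsProgressNotify() {
			log.Printf("progress notify at revision %d", resp.Header.Revision)
		}
	}
}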


@@ -36,10 +36,11 @@ const (
 
 var (
 	LowTraffic = trafficConfig{
-		name:        "LowTraffic",
-		minimalQPS:  100,
-		maximalQPS:  200,
-		clientCount: 8,
+		name:            "LowTraffic",
+		minimalQPS:      100,
+		maximalQPS:      200,
+		clientCount:     8,
+		requestProgress: false,
 		traffic: traffic{
 			keyCount: 10,
 			leaseTTL: DefaultLeaseTTL,
@@ -56,10 +57,11 @@ var (
 		},
 	}
 	HighTraffic = trafficConfig{
-		name:        "HighTraffic",
-		minimalQPS:  200,
-		maximalQPS:  1000,
-		clientCount: 12,
+		name:            "HighTraffic",
+		minimalQPS:      200,
+		maximalQPS:      1000,
+		clientCount:     12,
+		requestProgress: false,
 		traffic: traffic{
 			keyCount:     10,
 			largePutSize: 32769,
@@ -71,6 +73,22 @@ var (
 			},
 		},
 	}
+	ReqProgTraffic = trafficConfig{
+		name:            "RequestProgressTraffic",
+		minimalQPS:      200,
+		maximalQPS:      1000,
+		clientCount:     12,
+		requestProgress: true,
+		traffic: traffic{
+			keyCount:     10,
+			largePutSize: 8196,
+			leaseTTL:     DefaultLeaseTTL,
+			writes: []requestChance{
+				{operation: Put, chance: 95},
+				{operation: LargePut, chance: 5},
+			},
+		},
+	}
 	defaultTraffic = LowTraffic
 	trafficList    = []trafficConfig{
 		LowTraffic, HighTraffic,
@@ -141,6 +159,14 @@ func TestRobustness(t *testing.T) {
 			e2e.WithSnapshotCount(100),
 		),
 	})
+	scenarios = append(scenarios, scenario{
+		name:      "Issue15220",
+		failpoint: RandomOneNodeClusterFailpoint,
+		traffic:   &ReqProgTraffic,
+		config: *e2e.NewConfig(
+			e2e.WithClusterSize(1),
+		),
+	})
 	snapshotOptions := []e2e.EPClusterOption{
 		e2e.WithGoFailEnabled(true),
 		e2e.WithSnapshotCount(100),
@@ -191,7 +217,7 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, config e2
 	forcestopCluster(r.clus)
 
 	watchProgressNotifyEnabled := r.clus.Cfg.WatchProcessNotifyInterval != 0
-	validateWatchResponses(t, r.responses, watchProgressNotifyEnabled)
+	validateWatchResponses(t, r.responses, traffic.requestProgress || watchProgressNotifyEnabled)
 
 	r.events = watchEvents(r.responses)
 	validateEventsMatch(t, r.events)
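
With the hunk above, the validator is told to expect progress
notifications whenever the traffic requested them, not only when the
server-side notify interval is configured. As a hedged illustration of the
kind of ordering invariant that catches #15220 (checkProgressOrdering is a
hypothetical helper, not the actual validateWatchResponses code):

import (
	"testing"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// checkProgressOrdering is a hypothetical helper: a progress notification
// at revision N promises that every event up to N has already been
// delivered on this stream, so any later event at or below N is a bug.
func checkProgressOrdering(t *testing.T, resps []clientv3.WatchResponse) {
	var progressRev int64
	for _, resp := range resps {
		if resp.IsProgressNotify() {
			progressRev = resp.Header.Revision
			continue
		}
		for _, ev := range resp.Events {
			if ev.Kv.ModRevision <= progressRev {
				t.Errorf("event at revision %d delivered after progress notify for revision %d",
					ev.Kv.ModRevision, progressRev)
			}
		}
	}
}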
@@ -218,7 +244,7 @@ func runScenario(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.Et
 		return nil
 	})
 	g.Go(func() error {
-		responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan)
+		responses = collectClusterWatchEvents(ctx, t, clus, maxRevisionChan, traffic.requestProgress)
 		return nil
 	})
 	g.Wait()


@@ -109,11 +109,12 @@ func simulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
 }
 
 type trafficConfig struct {
-	name        string
-	minimalQPS  float64
-	maximalQPS  float64
-	clientCount int
-	traffic     Traffic
+	name            string
+	minimalQPS      float64
+	maximalQPS      float64
+	clientCount     int
+	traffic         Traffic
+	requestProgress bool // Request progress notifications while watching this traffic
 }
 
 type Traffic interface {


@@ -31,7 +31,7 @@ import (
 	"go.etcd.io/etcd/tests/v3/robustness/model"
 )
 
-func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, maxRevisionChan <-chan int64) [][]watchResponse {
+func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, maxRevisionChan <-chan int64, requestProgress bool) [][]watchResponse {
 	mux := sync.Mutex{}
 	var wg sync.WaitGroup
 	memberResponses := make([][]watchResponse, len(clus.Procs))
@@ -52,7 +52,7 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.Etcd
 		go func(i int, c *clientv3.Client) {
 			defer wg.Done()
 			defer c.Close()
-			responses := watchMember(ctx, t, c, memberChan)
+			responses := watchMember(ctx, t, c, memberChan, requestProgress)
 			mux.Lock()
 			memberResponses[i] = responses
 			mux.Unlock()
@@ -71,7 +71,7 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.Etcd
 }
 
 // watchMember collects all responses until context is cancelled, it has observed revision provided via maxRevisionChan or maxRevisionChan was closed.
-func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevisionChan <-chan int64) (resps []watchResponse) {
+func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevisionChan <-chan int64, requestProgress bool) (resps []watchResponse) {
 	var maxRevision int64 = 0
 	var lastRevision int64 = 0
 	ctx, cancel := context.WithCancel(ctx)
@@ -101,6 +101,9 @@ func watchMember(ctx context.Context, t *testing.T, c *clientv3.Client, maxRevis
 				}
 			}
 		case resp := <-watch:
+			if requestProgress {
+				c.RequestProgress(ctx)
+			}
 			if resp.Err() == nil {
 				resps = append(resps, watchResponse{resp, time.Now()})
 			} else if !resp.Canceled {
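
A closing note on the call added here: clientv3's RequestProgress only
queues a progress request on the watch stream; the notification itself
arrives later as an ordinary response carrying a header revision but no
events. A minimal hedged sketch of telling the two apart on the consumer
side (the consume name is illustrative):

import clientv3 "go.etcd.io/etcd/client/v3"

// consume shows how progress notifications surface to a watcher: as
// regular WatchResponses with no events, flagged by IsProgressNotify.
func consume(watchChan clientv3.WatchChan) {
	for resp := range watchChan {
		if resp.IsProgressNotify() {
			// Header.Revision reports how far the watch has
			// progressed; no event at or below it should arrive
			// afterwards.
			continue
		}
		for _, ev := range resp.Events {
			_ = ev // handle payload events here
		}
	}
}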