diff --git a/tests/robustness/traffic/client.go b/tests/robustness/client/client.go similarity index 80% rename from tests/robustness/traffic/client.go rename to tests/robustness/client/client.go index 0fd20e452..bd3ccb5b9 100644 --- a/tests/robustness/traffic/client.go +++ b/tests/robustness/client/client.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package traffic +package client import ( "context" @@ -33,7 +33,7 @@ import ( // clientv3.Client) that records all the requests and responses made. Doesn't // allow for concurrent requests to confirm to model.AppendableHistory requirements. type RecordingClient struct { - id int + ID int client clientv3.Client // using baseTime time-measuring operation to get monotonic clock reading // see https://github.com/golang/go/blob/master/src/time/time.go#L17 @@ -51,7 +51,7 @@ type TimedWatchEvent struct { Time time.Duration } -func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*RecordingClient, error) { +func NewRecordingClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*RecordingClient, error) { cc, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, Logger: zap.NewNop(), @@ -62,7 +62,7 @@ func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (* return nil, err } return &RecordingClient{ - id: ids.NewClientID(), + ID: ids.NewClientID(), client: *cc, kvOperations: model.NewAppendableHistory(ids), baseTime: baseTime, @@ -75,7 +75,7 @@ func (c *RecordingClient) Close() error { func (c *RecordingClient) Report() report.ClientReport { return report.ClientReport{ - ClientID: c.id, + ClientID: c.ID, KeyValue: c.kvOperations.History.Operations(), Watch: c.watchOperations, } @@ -190,6 +190,65 @@ func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentR return resp, err } +func (c *RecordingClient) Compact(ctx context.Context, rev int64) (*clientv3.CompactResponse, error) { + c.kvMux.Lock() + defer c.kvMux.Unlock() + resp, err := c.client.Compact(ctx, rev) + return resp, err +} +func (c *RecordingClient) MemberList(ctx context.Context, opts ...clientv3.OpOption) (*clientv3.MemberListResponse, error) { + c.kvMux.Lock() + defer c.kvMux.Unlock() + resp, err := c.client.MemberList(ctx, opts...) + return resp, err +} + +func (c *RecordingClient) MemberAdd(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) { + c.kvMux.Lock() + defer c.kvMux.Unlock() + resp, err := c.client.MemberAdd(ctx, peerAddrs) + return resp, err +} + +func (c *RecordingClient) MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) { + c.kvMux.Lock() + defer c.kvMux.Unlock() + resp, err := c.client.MemberAddAsLearner(ctx, peerAddrs) + return resp, err +} + +func (c *RecordingClient) MemberRemove(ctx context.Context, id uint64) (*clientv3.MemberRemoveResponse, error) { + c.kvMux.Lock() + defer c.kvMux.Unlock() + resp, err := c.client.MemberRemove(ctx, id) + return resp, err +} + +func (c *RecordingClient) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*clientv3.MemberUpdateResponse, error) { + c.kvMux.Lock() + defer c.kvMux.Unlock() + resp, err := c.client.MemberUpdate(ctx, id, peerAddrs) + return resp, err +} + +func (c *RecordingClient) MemberPromote(ctx context.Context, id uint64) (*clientv3.MemberPromoteResponse, error) { + c.kvMux.Lock() + defer c.kvMux.Unlock() + resp, err := c.client.MemberPromote(ctx, id) + return resp, err +} + +func (c *RecordingClient) Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error) { + c.kvMux.Lock() + defer c.kvMux.Unlock() + resp, err := c.client.Status(ctx, endpoint) + return resp, err +} + +func (c *RecordingClient) Endpoints() []string { + return c.client.Endpoints() +} + func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool, withProgressNotify bool, withPrevKV bool) clientv3.WatchChan { request := model.WatchRequest{ Key: key, diff --git a/tests/robustness/failpoint/cluster.go b/tests/robustness/failpoint/cluster.go index 0658def95..2a68fcb73 100644 --- a/tests/robustness/failpoint/cluster.go +++ b/tests/robustness/failpoint/cluster.go @@ -28,6 +28,9 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/robustness/client" + "go.etcd.io/etcd/tests/v3/robustness/identity" + "go.etcd.io/etcd/tests/v3/robustness/report" ) var ( @@ -36,26 +39,21 @@ var ( type memberReplace struct{} -func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error { +func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) { memberID := uint64(rand.Int() % len(clus.Procs)) member := clus.Procs[memberID] var endpoints []string for i := 1; i < len(clus.Procs); i++ { endpoints = append(endpoints, clus.Procs[(int(memberID)+i)%len(clus.Procs)].EndpointsGRPC()...) } - cc, err := clientv3.New(clientv3.Config{ - Endpoints: endpoints, - Logger: zap.NewNop(), - DialKeepAliveTime: 50 * time.Second, - DialKeepAliveTimeout: 100 * time.Millisecond, - }) + cc, err := client.NewRecordingClient(endpoints, ids, baseTime) if err != nil { - return err + return nil, err } defer cc.Close() memberID, found, err := getID(ctx, cc, member.Config().Name) if err != nil { - return err + return nil, err } if !found { t.Fatal("Member not found") @@ -65,11 +63,11 @@ func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, lg.Info("Removing member", zap.String("member", member.Config().Name)) _, err = cc.MemberRemove(ctx, memberID) if err != nil { - return err + return nil, err } _, found, err = getID(ctx, cc, member.Config().Name) if err != nil { - return err + return nil, err } if found { t.Fatal("Expected member to be removed") @@ -83,13 +81,13 @@ func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, err = member.Wait(ctx) if err != nil && !strings.Contains(err.Error(), "unexpected exit code") { lg.Info("Failed to kill the process", zap.Error(err)) - return fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err) + return nil, fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err) } } lg.Info("Removing member data", zap.String("member", member.Config().Name)) err = os.RemoveAll(member.Config().DataDirPath) if err != nil { - return err + return nil, err } lg.Info("Adding member back", zap.String("member", member.Config().Name)) @@ -97,7 +95,7 @@ func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, for { select { case <-ctx.Done(): - return ctx.Err() + return nil, ctx.Err() default: } reqCtx, cancel := context.WithTimeout(ctx, time.Second) @@ -109,17 +107,17 @@ func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, } err = patchArgs(member.Config().Args, "initial-cluster-state", "existing") if err != nil { - return err + return nil, err } lg.Info("Starting member", zap.String("member", member.Config().Name)) err = member.Start(ctx) if err != nil { - return err + return nil, err } for { select { case <-ctx.Done(): - return ctx.Err() + return nil, ctx.Err() default: } _, found, err := getID(ctx, cc, member.Config().Name) @@ -130,7 +128,7 @@ func (f memberReplace) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, break } } - return nil + return nil, nil } func (f memberReplace) Name() string { @@ -141,7 +139,7 @@ func (f memberReplace) Available(config e2e.EtcdProcessClusterConfig, _ e2e.Etcd return config.ClusterSize > 1 } -func getID(ctx context.Context, cc *clientv3.Client, name string) (id uint64, found bool, err error) { +func getID(ctx context.Context, cc clientv3.Cluster, name string) (id uint64, found bool, err error) { resp, err := cc.MemberList(ctx) if err != nil { return 0, false, err diff --git a/tests/robustness/failpoint/failpoint.go b/tests/robustness/failpoint/failpoint.go index c81fb5fe9..14e6ddf7e 100644 --- a/tests/robustness/failpoint/failpoint.go +++ b/tests/robustness/failpoint/failpoint.go @@ -26,6 +26,8 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/robustness/identity" + "go.etcd.io/etcd/tests/v3/robustness/report" ) const ( @@ -75,7 +77,7 @@ func Validate(clus *e2e.EtcdProcessCluster, failpoint Failpoint) error { return nil } -func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint Failpoint, baseTime time.Time) (*InjectionReport, error) { +func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint Failpoint, baseTime time.Time, ids identity.Provider) (*FailpointReport, error) { ctx, cancel := context.WithTimeout(ctx, triggerTimeout) defer cancel() var err error @@ -85,7 +87,7 @@ func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdPro } lg.Info("Triggering failpoint", zap.String("failpoint", failpoint.Name())) start := time.Since(baseTime) - err = failpoint.Inject(ctx, t, lg, clus) + clientReport, err := failpoint.Inject(ctx, t, lg, clus, baseTime, ids) if err != nil { lg.Error("Failed to trigger failpoint", zap.String("failpoint", failpoint.Name()), zap.Error(err)) return nil, fmt.Errorf("failed triggering failpoint, err: %v", err) @@ -96,14 +98,22 @@ func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdPro lg.Info("Finished triggering failpoint", zap.String("failpoint", failpoint.Name())) end := time.Since(baseTime) - return &InjectionReport{ - Start: start, - End: end, - Name: failpoint.Name(), + return &FailpointReport{ + Injection: Injection{ + Start: start, + End: end, + Name: failpoint.Name(), + }, + Client: clientReport, }, nil } -type InjectionReport struct { +type FailpointReport struct { + Injection + Client []report.ClientReport +} + +type Injection struct { Start, End time.Duration Name string } @@ -137,7 +147,7 @@ func verifyClusterHealth(ctx context.Context, _ *testing.T, clus *e2e.EtcdProces } type Failpoint interface { - Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error + Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) Name() string AvailabilityChecker } diff --git a/tests/robustness/failpoint/gofail.go b/tests/robustness/failpoint/gofail.go index 3d90c5ddd..2e85798ba 100644 --- a/tests/robustness/failpoint/gofail.go +++ b/tests/robustness/failpoint/gofail.go @@ -25,6 +25,8 @@ import ( "go.uber.org/zap" "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/robustness/identity" + "go.etcd.io/etcd/tests/v3/robustness/report" ) var ( @@ -73,17 +75,17 @@ const ( Follower failpointTarget = "Follower" ) -func (f goPanicFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error { +func (f goPanicFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) (reports []report.ClientReport, err error) { member := f.pickMember(t, clus) for member.IsRunning() { select { case <-ctx.Done(): - return ctx.Err() + return reports, ctx.Err() default: } lg.Info("Setting up gofailpoint", zap.String("failpoint", f.Name())) - err := member.Failpoints().SetupHTTP(ctx, f.failpoint, "panic") + err = member.Failpoints().SetupHTTP(ctx, f.failpoint, "panic") if err != nil { lg.Info("goFailpoint setup failed", zap.String("failpoint", f.Name()), zap.Error(err)) continue @@ -93,17 +95,21 @@ func (f goPanicFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logg break } if f.trigger != nil { + var r []report.ClientReport lg.Info("Triggering gofailpoint", zap.String("failpoint", f.Name())) - err = f.trigger.Trigger(ctx, t, member, clus) + r, err = f.trigger.Trigger(ctx, t, member, clus, baseTime, ids) if err != nil { lg.Info("gofailpoint trigger failed", zap.String("failpoint", f.Name()), zap.Error(err)) } + if r != nil { + reports = append(reports, r...) + } } lg.Info("Waiting for member to exit", zap.String("member", member.Config().Name)) err = member.Wait(ctx) if err != nil && !strings.Contains(err.Error(), "unexpected exit code") { lg.Info("Member didn't exit as expected", zap.String("member", member.Config().Name), zap.Error(err)) - return fmt.Errorf("member didn't exit as expected: %v", err) + return reports, fmt.Errorf("member didn't exit as expected: %v", err) } lg.Info("Member exited as expected", zap.String("member", member.Config().Name)) } @@ -112,11 +118,11 @@ func (f goPanicFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logg lg.Info("Removing data that was not fsynced") err := lazyfs.ClearCache(ctx) if err != nil { - return err + return reports, err } } - return member.Start(ctx) + return reports, member.Start(ctx) } func (f goPanicFailpoint) pickMember(t *testing.T, clus *e2e.EtcdProcessCluster) e2e.EtcdProcess { @@ -155,7 +161,7 @@ type killAndGofailSleep struct { time time.Duration } -func (f killAndGofailSleep) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error { +func (f killAndGofailSleep) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) { member := clus.Procs[rand.Int()%len(clus.Procs)] for member.IsRunning() { err := member.Kill() @@ -165,20 +171,20 @@ func (f killAndGofailSleep) Inject(ctx context.Context, t *testing.T, lg *zap.Lo err = member.Wait(ctx) if err != nil && !strings.Contains(err.Error(), "unexpected exit code") { lg.Info("Failed to kill the process", zap.Error(err)) - return fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err) + return nil, fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err) } } lg.Info("Setting up goFailpoint", zap.String("failpoint", f.Name())) err := member.Failpoints().SetupEnv(f.failpoint, fmt.Sprintf(`sleep(%q)`, f.time)) if err != nil { - return err + return nil, err } err = member.Start(ctx) if err != nil { - return err + return nil, err } // TODO: Check gofail status (https://github.com/etcd-io/gofail/pull/47) and wait for sleep to beis executed at least once. - return nil + return nil, nil } func (f killAndGofailSleep) Name() string { @@ -201,22 +207,22 @@ type gofailSleepAndDeactivate struct { time time.Duration } -func (f gofailSleepAndDeactivate) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error { +func (f gofailSleepAndDeactivate) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) { member := clus.Procs[rand.Int()%len(clus.Procs)] lg.Info("Setting up gofailpoint", zap.String("failpoint", f.Name())) err := member.Failpoints().SetupHTTP(ctx, f.failpoint, fmt.Sprintf(`sleep(%q)`, f.time)) if err != nil { lg.Info("goFailpoint setup failed", zap.String("failpoint", f.Name()), zap.Error(err)) - return fmt.Errorf("goFailpoint %s setup failed, err:%w", f.Name(), err) + return nil, fmt.Errorf("goFailpoint %s setup failed, err:%w", f.Name(), err) } time.Sleep(f.time) lg.Info("Deactivating gofailpoint", zap.String("failpoint", f.Name())) err = member.Failpoints().DeactivateHTTP(ctx, f.failpoint) if err != nil { lg.Info("goFailpoint deactivate failed", zap.String("failpoint", f.Name()), zap.Error(err)) - return fmt.Errorf("goFailpoint %s deactivate failed, err: %w", f.Name(), err) + return nil, fmt.Errorf("goFailpoint %s deactivate failed, err: %w", f.Name(), err) } - return nil + return nil, nil } func (f gofailSleepAndDeactivate) Name() string { diff --git a/tests/robustness/failpoint/kill.go b/tests/robustness/failpoint/kill.go index ef1d26e13..b0b5ff4fb 100644 --- a/tests/robustness/failpoint/kill.go +++ b/tests/robustness/failpoint/kill.go @@ -20,10 +20,13 @@ import ( "math/rand" "strings" "testing" + "time" "go.uber.org/zap" "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/robustness/identity" + "go.etcd.io/etcd/tests/v3/robustness/report" ) var ( @@ -32,7 +35,7 @@ var ( type killFailpoint struct{} -func (f killFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error { +func (f killFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) { member := clus.Procs[rand.Int()%len(clus.Procs)] for member.IsRunning() { @@ -43,21 +46,21 @@ func (f killFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, err = member.Wait(ctx) if err != nil && !strings.Contains(err.Error(), "unexpected exit code") { lg.Info("Failed to kill the process", zap.Error(err)) - return fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err) + return nil, fmt.Errorf("failed to kill the process within %s, err: %w", triggerTimeout, err) } } if lazyfs := member.LazyFS(); lazyfs != nil { lg.Info("Removing data that was not fsynced") err := lazyfs.ClearCache(ctx) if err != nil { - return err + return nil, err } } err := member.Start(ctx) if err != nil { - return err + return nil, err } - return nil + return nil, nil } func (f killFailpoint) Name() string { diff --git a/tests/robustness/failpoint/network.go b/tests/robustness/failpoint/network.go index 5d59fba3d..48edc912e 100644 --- a/tests/robustness/failpoint/network.go +++ b/tests/robustness/failpoint/network.go @@ -24,6 +24,8 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/robustness/identity" + "go.etcd.io/etcd/tests/v3/robustness/report" ) var ( @@ -37,9 +39,9 @@ type blackholePeerNetworkFailpoint struct { triggerBlackhole } -func (f blackholePeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error { +func (f blackholePeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) { member := clus.Procs[rand.Int()%len(clus.Procs)] - return f.Trigger(ctx, t, member, clus) + return f.Trigger(ctx, t, member, clus, baseTime, ids) } func (f blackholePeerNetworkFailpoint) Name() string { @@ -50,8 +52,8 @@ type triggerBlackhole struct { waitTillSnapshot bool } -func (tb triggerBlackhole) Trigger(ctx context.Context, t *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster) error { - return Blackhole(ctx, t, member, clus, tb.waitTillSnapshot) +func (tb triggerBlackhole) Trigger(ctx context.Context, t *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) { + return nil, Blackhole(ctx, t, member, clus, tb.waitTillSnapshot) } func (tb triggerBlackhole) Available(config e2e.EtcdProcessClusterConfig, process e2e.EtcdProcess) bool { @@ -153,7 +155,7 @@ type delayPeerNetworkFailpoint struct { randomizedLatency time.Duration } -func (f delayPeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error { +func (f delayPeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) { member := clus.Procs[rand.Int()%len(clus.Procs)] proxy := member.PeerProxy() @@ -164,7 +166,7 @@ func (f delayPeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg lg.Info("Traffic delay removed", zap.String("member", member.Config().Name)) proxy.UndelayRx() proxy.UndelayTx() - return nil + return nil, nil } func (f delayPeerNetworkFailpoint) Name() string { @@ -180,7 +182,7 @@ type dropPeerNetworkFailpoint struct { dropProbabilityPercent int } -func (f dropPeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error { +func (f dropPeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) { member := clus.Procs[rand.Int()%len(clus.Procs)] proxy := member.PeerProxy() @@ -191,7 +193,7 @@ func (f dropPeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg * lg.Info("Traffic drop removed", zap.String("member", member.Config().Name)) proxy.UnmodifyRx() proxy.UnmodifyTx() - return nil + return nil, nil } func (f dropPeerNetworkFailpoint) modifyPacket(data []byte) []byte { diff --git a/tests/robustness/failpoint/trigger.go b/tests/robustness/failpoint/trigger.go index 79316a3ab..3f8e2990a 100644 --- a/tests/robustness/failpoint/trigger.go +++ b/tests/robustness/failpoint/trigger.go @@ -21,35 +21,30 @@ import ( "testing" "time" - "go.uber.org/zap" - - clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/robustness/client" + "go.etcd.io/etcd/tests/v3/robustness/identity" + "go.etcd.io/etcd/tests/v3/robustness/report" ) type trigger interface { - Trigger(ctx context.Context, t *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster) error + Trigger(ctx context.Context, t *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) AvailabilityChecker } type triggerDefrag struct{} -func (t triggerDefrag) Trigger(ctx context.Context, _ *testing.T, member e2e.EtcdProcess, _ *e2e.EtcdProcessCluster) error { - cc, err := clientv3.New(clientv3.Config{ - Endpoints: member.EndpointsGRPC(), - Logger: zap.NewNop(), - DialKeepAliveTime: 10 * time.Second, - DialKeepAliveTimeout: 100 * time.Millisecond, - }) +func (t triggerDefrag) Trigger(ctx context.Context, _ *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) { + cc, err := client.NewRecordingClient(member.EndpointsGRPC(), ids, baseTime) if err != nil { - return fmt.Errorf("failed creating client: %w", err) + return nil, fmt.Errorf("failed creating client: %w", err) } defer cc.Close() - _, err = cc.Defragment(ctx, member.EndpointsGRPC()[0]) + _, err = cc.Defragment(ctx) if err != nil && !strings.Contains(err.Error(), "error reading from server: EOF") { - return err + return nil, err } - return nil + return nil, nil } func (t triggerDefrag) Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess) bool { @@ -60,37 +55,32 @@ type triggerCompact struct { multiBatchCompaction bool } -func (t triggerCompact) Trigger(ctx context.Context, _ *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster) error { - cc, err := clientv3.New(clientv3.Config{ - Endpoints: member.EndpointsGRPC(), - Logger: zap.NewNop(), - DialKeepAliveTime: 10 * time.Second, - DialKeepAliveTimeout: 100 * time.Millisecond, - }) +func (t triggerCompact) Trigger(ctx context.Context, _ *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, baseTime time.Time, ids identity.Provider) ([]report.ClientReport, error) { + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + cc, err := client.NewRecordingClient(member.EndpointsGRPC(), ids, baseTime) if err != nil { - return fmt.Errorf("failed creating client: %w", err) + return nil, fmt.Errorf("failed creating client: %w", err) } defer cc.Close() var rev int64 for { - resp, gerr := cc.Get(ctx, "/") - if gerr != nil { - return gerr + _, rev, err = cc.Get(ctx, "/", 0) + if err != nil { + return nil, err } - rev = resp.Header.Revision if !t.multiBatchCompaction || rev > int64(clus.Cfg.ServerConfig.ExperimentalCompactionBatchLimit) { break } time.Sleep(50 * time.Millisecond) } - _, err = cc.Compact(ctx, rev) if err != nil && !strings.Contains(err.Error(), "error reading from server: EOF") { - return err + return nil, err } - return nil + return []report.ClientReport{cc.Report()}, nil } func (t triggerCompact) Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess) bool { diff --git a/tests/robustness/main_test.go b/tests/robustness/main_test.go index db9e483c3..3e59e5b3c 100644 --- a/tests/robustness/main_test.go +++ b/tests/robustness/main_test.go @@ -108,8 +108,8 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu ctx, cancel := context.WithCancel(ctx) defer cancel() g := errgroup.Group{} - var operationReport, watchReport []report.ClientReport - failpointInjected := make(chan failpoint.InjectionReport, 1) + var operationReport, watchReport, failpointClientReport []report.ClientReport + failpointInjected := make(chan failpoint.Injection, 1) // using baseTime time-measuring operation to get monotonic clock reading // see https://github.com/golang/go/blob/master/src/time/time.go#L17 @@ -119,7 +119,7 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu defer close(failpointInjected) // Give some time for traffic to reach qps target before injecting failpoint. time.Sleep(time.Second) - fr, err := failpoint.Inject(ctx, t, lg, clus, s.failpoint, baseTime) + fr, err := failpoint.Inject(ctx, t, lg, clus, s.failpoint, baseTime, ids) if err != nil { t.Error(err) cancel() @@ -127,7 +127,8 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu // Give some time for traffic to reach qps target after injecting failpoint. time.Sleep(time.Second) if fr != nil { - failpointInjected <- *fr + failpointInjected <- fr.Injection + failpointClientReport = fr.Client } return nil }) @@ -145,7 +146,7 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu return nil }) g.Wait() - return append(operationReport, watchReport...) + return append(operationReport, append(failpointClientReport, watchReport...)...) } func operationsMaxRevision(reports []report.ClientReport) int64 { diff --git a/tests/robustness/traffic/etcd.go b/tests/robustness/traffic/etcd.go index a7776e307..4e80c633d 100644 --- a/tests/robustness/traffic/etcd.go +++ b/tests/robustness/traffic/etcd.go @@ -23,6 +23,7 @@ import ( "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/tests/v3/robustness/client" "go.etcd.io/etcd/tests/v3/robustness/identity" "go.etcd.io/etcd/tests/v3/robustness/model" ) @@ -94,7 +95,7 @@ func (t etcdTraffic) Name() string { return "Etcd" } -func (t etcdTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) { +func (t etcdTraffic) Run(ctx context.Context, c *client.RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) { lastOperationSucceeded := true var lastRev int64 var requestType etcdRequestType @@ -151,7 +152,7 @@ func filterOutNonUniqueEtcdWrites(choices []choiceWeight[etcdRequestType]) (resp type etcdTrafficClient struct { etcdTraffic keyPrefix string - client *RecordingClient + client *client.RecordingClient limiter *rate.Limiter idProvider identity.Provider leaseStorage identity.LeaseIDStorage @@ -222,7 +223,7 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType, } } case PutWithLease: - leaseID := c.leaseStorage.LeaseID(c.client.id) + leaseID := c.leaseStorage.LeaseID(c.client.ID) if leaseID == 0 { var resp *clientv3.LeaseGrantResponse resp, err = c.client.LeaseGrant(opCtx, c.leaseTTL) @@ -231,7 +232,7 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType, rev = resp.ResponseHeader.Revision } if err == nil { - c.leaseStorage.AddLeaseID(c.client.id, leaseID) + c.leaseStorage.AddLeaseID(c.client.ID, leaseID) c.limiter.Wait(ctx) } } @@ -245,13 +246,13 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType, } } case LeaseRevoke: - leaseID := c.leaseStorage.LeaseID(c.client.id) + leaseID := c.leaseStorage.LeaseID(c.client.ID) if leaseID != 0 { var resp *clientv3.LeaseRevokeResponse resp, err = c.client.LeaseRevoke(opCtx, leaseID) //if LeaseRevoke has failed, do not remove the mapping. if err == nil { - c.leaseStorage.RemoveLeaseID(c.client.id) + c.leaseStorage.RemoveLeaseID(c.client.ID) } if resp != nil { rev = resp.Header.Revision diff --git a/tests/robustness/traffic/kubernetes.go b/tests/robustness/traffic/kubernetes.go index 2f065d84e..7ba278a90 100644 --- a/tests/robustness/traffic/kubernetes.go +++ b/tests/robustness/traffic/kubernetes.go @@ -27,6 +27,7 @@ import ( "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/pkg/v3/stringutil" + "go.etcd.io/etcd/tests/v3/robustness/client" "go.etcd.io/etcd/tests/v3/robustness/identity" ) @@ -58,7 +59,7 @@ func (t kubernetesTraffic) Name() string { return "Kubernetes" } -func (t kubernetesTraffic) Run(ctx context.Context, c *RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) { +func (t kubernetesTraffic) Run(ctx context.Context, c *client.RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) { kc := &kubernetesClient{client: c} s := newStorage() keyPrefix := "/registry/" + t.resource + "/" @@ -214,7 +215,7 @@ const ( ) type kubernetesClient struct { - client *RecordingClient + client *client.RecordingClient } func (k kubernetesClient) List(ctx context.Context, prefix string, revision, limit int64) (*clientv3.GetResponse, error) { diff --git a/tests/robustness/traffic/traffic.go b/tests/robustness/traffic/traffic.go index 431a0fb87..ae1c30dfd 100644 --- a/tests/robustness/traffic/traffic.go +++ b/tests/robustness/traffic/traffic.go @@ -24,6 +24,7 @@ import ( "golang.org/x/time/rate" "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/robustness/client" "go.etcd.io/etcd/tests/v3/robustness/failpoint" "go.etcd.io/etcd/tests/v3/robustness/identity" "go.etcd.io/etcd/tests/v3/robustness/model" @@ -52,7 +53,7 @@ var ( } ) -func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, profile Profile, traffic Traffic, failpointInjected <-chan failpoint.InjectionReport, baseTime time.Time, ids identity.Provider) []report.ClientReport { +func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, profile Profile, traffic Traffic, failpointInjected <-chan failpoint.Injection, baseTime time.Time, ids identity.Provider) []report.ClientReport { mux := sync.Mutex{} endpoints := clus.EndpointsGRPC() @@ -60,7 +61,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 reports := []report.ClientReport{} limiter := rate.NewLimiter(rate.Limit(profile.MaximalQPS), 200) - cc, err := NewClient(endpoints, ids, baseTime) + cc, err := client.NewRecordingClient(endpoints, ids, baseTime) if err != nil { t.Fatal(err) } @@ -77,11 +78,11 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 startTime := time.Since(baseTime) for i := 0; i < profile.ClientCount; i++ { wg.Add(1) - c, nerr := NewClient([]string{endpoints[i%len(endpoints)]}, ids, baseTime) + c, nerr := client.NewRecordingClient([]string{endpoints[i%len(endpoints)]}, ids, baseTime) if nerr != nil { t.Fatal(nerr) } - go func(c *RecordingClient) { + go func(c *client.RecordingClient) { defer wg.Done() defer c.Close() @@ -91,7 +92,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 mux.Unlock() }(c) } - var fr *failpoint.InjectionReport + var fr *failpoint.Injection select { case frp, ok := <-failpointInjected: if !ok { @@ -172,7 +173,7 @@ type Profile struct { } type Traffic interface { - Run(ctx context.Context, c *RecordingClient, qpsLimiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) + Run(ctx context.Context, c *client.RecordingClient, qpsLimiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) ExpectUniqueRevision() bool Name() string } diff --git a/tests/robustness/validate/patch_history.go b/tests/robustness/validate/patch_history.go index 6706c0000..2c967b935 100644 --- a/tests/robustness/validate/patch_history.go +++ b/tests/robustness/validate/patch_history.go @@ -19,9 +19,9 @@ import ( "github.com/anishathalye/porcupine" + "go.etcd.io/etcd/tests/v3/robustness/client" "go.etcd.io/etcd/tests/v3/robustness/model" "go.etcd.io/etcd/tests/v3/robustness/report" - "go.etcd.io/etcd/tests/v3/robustness/traffic" ) func patchedOperationHistory(reports []report.ClientReport, persistedRequests []model.EtcdRequest) []porcupine.Operation { @@ -46,8 +46,8 @@ func relevantOperations(reports []report.ClientReport) []porcupine.Operation { return ops } -func uniqueWatchEvents(reports []report.ClientReport) map[model.Event]traffic.TimedWatchEvent { - persisted := map[model.Event]traffic.TimedWatchEvent{} +func uniqueWatchEvents(reports []report.ClientReport) map[model.Event]client.TimedWatchEvent { + persisted := map[model.Event]client.TimedWatchEvent{} for _, r := range reports { for _, op := range r.Watch { for _, resp := range op.Responses { @@ -56,7 +56,7 @@ func uniqueWatchEvents(reports []report.ClientReport) map[model.Event]traffic.Ti if prev, found := persisted[event.Event]; found && prev.Time < responseTime { responseTime = prev.Time } - persisted[event.Event] = traffic.TimedWatchEvent{Time: responseTime, WatchEvent: event} + persisted[event.Event] = client.TimedWatchEvent{Time: responseTime, WatchEvent: event} } } } @@ -64,7 +64,7 @@ func uniqueWatchEvents(reports []report.ClientReport) map[model.Event]traffic.Ti return persisted } -func patchOperations(operations []porcupine.Operation, watchEvents map[model.Event]traffic.TimedWatchEvent, persistedOperations map[model.EtcdOperation]int64) []porcupine.Operation { +func patchOperations(operations []porcupine.Operation, watchEvents map[model.Event]client.TimedWatchEvent, persistedOperations map[model.EtcdOperation]int64) []porcupine.Operation { newOperations := make([]porcupine.Operation, 0, len(operations)) lastObservedOperation := lastOperationObservedInWatch(operations, watchEvents) @@ -104,7 +104,7 @@ func patchOperations(operations []porcupine.Operation, watchEvents map[model.Eve return newOperations } -func lastOperationObservedInWatch(operations []porcupine.Operation, watchEvents map[model.Event]traffic.TimedWatchEvent) porcupine.Operation { +func lastOperationObservedInWatch(operations []porcupine.Operation, watchEvents map[model.Event]client.TimedWatchEvent) porcupine.Operation { var maxCallTime int64 var lastOperation porcupine.Operation for _, op := range operations { @@ -121,7 +121,7 @@ func lastOperationObservedInWatch(operations []porcupine.Operation, watchEvents return lastOperation } -func matchWatchEvent(request *model.TxnRequest, watchEvents map[model.Event]traffic.TimedWatchEvent) *traffic.TimedWatchEvent { +func matchWatchEvent(request *model.TxnRequest, watchEvents map[model.Event]client.TimedWatchEvent) *client.TimedWatchEvent { for _, etcdOp := range append(request.OperationsOnSuccess, request.OperationsOnFailure...) { if etcdOp.Type == model.PutOperation { event, ok := watchEvents[model.Event{ diff --git a/tests/robustness/watch.go b/tests/robustness/watch.go index 212f86fd0..f19fced73 100644 --- a/tests/robustness/watch.go +++ b/tests/robustness/watch.go @@ -21,9 +21,9 @@ import ( "time" "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/robustness/client" "go.etcd.io/etcd/tests/v3/robustness/identity" "go.etcd.io/etcd/tests/v3/robustness/report" - "go.etcd.io/etcd/tests/v3/robustness/traffic" ) func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, maxRevisionChan <-chan int64, cfg watchConfig, baseTime time.Time, ids identity.Provider) []report.ClientReport { @@ -32,14 +32,14 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, clus *e2e.Etcd reports := make([]report.ClientReport, len(clus.Procs)) memberMaxRevisionChans := make([]chan int64, len(clus.Procs)) for i, member := range clus.Procs { - c, err := traffic.NewClient(member.EndpointsGRPC(), ids, baseTime) + c, err := client.NewRecordingClient(member.EndpointsGRPC(), ids, baseTime) if err != nil { t.Fatal(err) } memberMaxRevisionChan := make(chan int64, 1) memberMaxRevisionChans[i] = memberMaxRevisionChan wg.Add(1) - go func(i int, c *traffic.RecordingClient) { + go func(i int, c *client.RecordingClient) { defer wg.Done() defer c.Close() watchUntilRevision(ctx, t, c, memberMaxRevisionChan, cfg) @@ -65,7 +65,7 @@ type watchConfig struct { } // watchUntilRevision watches all changes until context is cancelled, it has observed revision provided via maxRevisionChan or maxRevisionChan was closed. -func watchUntilRevision(ctx context.Context, t *testing.T, c *traffic.RecordingClient, maxRevisionChan <-chan int64, cfg watchConfig) { +func watchUntilRevision(ctx context.Context, t *testing.T, c *client.RecordingClient, maxRevisionChan <-chan int64, cfg watchConfig) { var maxRevision int64 var lastRevision int64 ctx, cancel := context.WithCancel(ctx)