Remove packet modification

Signed-off-by: Chun-Hung Tseng <henrybear327@gmail.com>
This commit is contained in:
Chun-Hung Tseng 2024-09-25 14:04:13 +02:00
parent e7b77bb914
commit 85f7a9206b
No known key found for this signature in database
GPG Key ID: EF93C20F55FB48BB
2 changed files with 27 additions and 75 deletions

View File

@ -93,28 +93,12 @@ type Server interface {
// LatencyRx returns current receive latency.
LatencyRx() time.Duration
// ModifyTx alters/corrupts/drops "outgoing" packets from the listener
// with the given edit function.
ModifyTx(f func(data []byte) []byte)
// UnmodifyTx removes modify operation on "forwarding".
UnmodifyTx()
// ModifyRx alters/corrupts/drops "incoming" packets to client
// with the given edit function.
ModifyRx(f func(data []byte) []byte)
// UnmodifyRx removes modify operation on "receiving".
UnmodifyRx()
// BlackholeTx drops all "outgoing" packets before "forwarding".
// "BlackholeTx" operation is a wrapper around "ModifyTx" with
// a function that returns empty bytes.
BlackholeTx()
// UnblackholeTx removes blackhole operation on "sending".
UnblackholeTx()
// BlackholeRx drops all "incoming" packets to client.
// "BlackholeRx" operation is a wrapper around "ModifyRx" with
// a function that returns empty bytes.
BlackholeRx()
// UnblackholeRx removes blackhole operation on "receiving".
UnblackholeRx()
@ -855,51 +839,11 @@ func computeLatency(lat, rv time.Duration) time.Duration {
return lat + time.Duration(int64(sign)*mrand.Int63n(rv.Nanoseconds()))
}
func (s *server) ModifyTx(f func([]byte) []byte) {
s.modifyTxMu.Lock()
s.modifyTx = f
s.modifyTxMu.Unlock()
s.lg.Info(
"modifying tx",
zap.String("proxy listen on", s.Listen()),
)
}
func (s *server) UnmodifyTx() {
s.modifyTxMu.Lock()
s.modifyTx = nil
s.modifyTxMu.Unlock()
s.lg.Info(
"unmodifyed tx",
zap.String("proxy listen on", s.Listen()),
)
}
func (s *server) ModifyRx(f func([]byte) []byte) {
s.modifyRxMu.Lock()
s.modifyRx = f
s.modifyRxMu.Unlock()
s.lg.Info(
"modifying rx",
zap.String("proxy listen on", s.Listen()),
)
}
func (s *server) UnmodifyRx() {
s.modifyRxMu.Lock()
s.modifyRx = nil
s.modifyRxMu.Unlock()
s.lg.Info(
"unmodifyed rx",
zap.String("proxy listen on", s.Listen()),
)
}
func (s *server) BlackholeTx() {
s.ModifyTx(func([]byte) []byte { return nil })
s.modifyTxMu.Lock()
s.modifyTx = func([]byte) []byte { return nil }
s.modifyTxMu.Unlock()
s.lg.Info(
"blackholed tx",
zap.String("proxy listening on", s.Listen()),
@ -907,7 +851,10 @@ func (s *server) BlackholeTx() {
}
func (s *server) UnblackholeTx() {
s.UnmodifyTx()
s.modifyTxMu.Lock()
s.modifyTx = nil
s.modifyTxMu.Unlock()
s.lg.Info(
"unblackholed tx",
zap.String("proxy listening on", s.Listen()),
@ -915,7 +862,10 @@ func (s *server) UnblackholeTx() {
}
func (s *server) BlackholeRx() {
s.ModifyRx(func([]byte) []byte { return nil })
s.modifyRxMu.Lock()
s.modifyRx = func([]byte) []byte { return nil }
s.modifyRxMu.Unlock()
s.lg.Info(
"blackholed rx",
zap.String("proxy listening on", s.Listen()),
@ -923,7 +873,10 @@ func (s *server) BlackholeRx() {
}
func (s *server) UnblackholeRx() {
s.UnmodifyRx()
s.modifyRxMu.Lock()
s.modifyRx = nil
s.modifyRxMu.Unlock()
s.lg.Info(
"unblackholed rx",
zap.String("proxy listening on", s.Listen()),

View File

@ -187,21 +187,20 @@ func (f dropPeerNetworkFailpoint) Inject(ctx context.Context, t *testing.T, lg *
member := clus.Procs[rand.Int()%len(clus.Procs)]
forwardProxy := member.PeerForwardProxy()
forwardProxy.ModifyRx(f.modifyPacket)
forwardProxy.ModifyTx(f.modifyPacket)
lg.Info("Dropping traffic from and to member", zap.String("member", member.Config().Name), zap.Int("probability", f.dropProbabilityPercent))
time.Sleep(f.duration)
lg.Info("Traffic drop removed", zap.String("member", member.Config().Name))
forwardProxy.UnmodifyRx()
forwardProxy.UnmodifyTx()
if f.shouldDropPacket() {
forwardProxy.BlackholeRx()
forwardProxy.BlackholeTx()
lg.Info("Dropping traffic from and to member", zap.String("member", member.Config().Name), zap.Int("probability", f.dropProbabilityPercent))
time.Sleep(f.duration)
lg.Info("Traffic drop removed", zap.String("member", member.Config().Name))
forwardProxy.UnblackholeRx()
forwardProxy.UnblackholeTx()
}
return nil, nil
}
func (f dropPeerNetworkFailpoint) modifyPacket(data []byte) []byte {
if rand.Intn(100) < f.dropProbabilityPercent {
return nil
}
return data
func (f dropPeerNetworkFailpoint) shouldDropPacket() bool {
return rand.Intn(100) < f.dropProbabilityPercent
}
func (f dropPeerNetworkFailpoint) Name() string {