From 65cd0051fea5fb54956dee52e91fb601fb702b9a Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 6 Feb 2015 15:01:07 -0800 Subject: [PATCH] rafttest: add network delay --- raft/rafttest/network.go | 18 +++++++++++++++++- raft/rafttest/network_test.go | 22 ++++++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/raft/rafttest/network.go b/raft/rafttest/network.go index 5bd75ed6d..395334957 100644 --- a/raft/rafttest/network.go +++ b/raft/rafttest/network.go @@ -33,6 +33,7 @@ type raftNetwork struct { mu sync.Mutex disconnected map[uint64]bool dropmap map[conn]float64 + delaymap map[conn]delay recvQueues map[uint64]chan raftpb.Message } @@ -40,10 +41,16 @@ type conn struct { from, to uint64 } +type delay struct { + d time.Duration + rate float64 +} + func newRaftNetwork(nodes ...uint64) *raftNetwork { pn := &raftNetwork{ recvQueues: make(map[uint64]chan raftpb.Message), dropmap: make(map[conn]float64), + delaymap: make(map[conn]delay), disconnected: make(map[uint64]bool), } @@ -64,6 +71,7 @@ func (rn *raftNetwork) send(m raftpb.Message) { to = nil } drop := rn.dropmap[conn{m.From, m.To}] + delay := rn.delaymap[conn{m.From, m.To}] rn.mu.Unlock() if to == nil { @@ -72,6 +80,11 @@ func (rn *raftNetwork) send(m raftpb.Message) { if drop != 0 && rand.Float64() < drop { return } + // TODO: shall we delay without blocking the send call? + if delay.d != 0 && rand.Float64() < delay.rate { + rd := rand.Int63n(int64(delay.d)) + time.Sleep(time.Duration(rd)) + } to <- m } @@ -94,13 +107,16 @@ func (rn *raftNetwork) drop(from, to uint64, rate float64) { } func (rn *raftNetwork) delay(from, to uint64, d time.Duration, rate float64) { - panic("unimplemented") + rn.mu.Lock() + defer rn.mu.Unlock() + rn.delaymap[conn{from, to}] = delay{d, rate} } func (rn *raftNetwork) heal() { rn.mu.Lock() defer rn.mu.Unlock() rn.dropmap = make(map[conn]float64) + rn.delaymap = make(map[conn]delay) } func (rn *raftNetwork) disconnect(id uint64) { diff --git a/raft/rafttest/network_test.go b/raft/rafttest/network_test.go index 3718ef283..068a67084 100644 --- a/raft/rafttest/network_test.go +++ b/raft/rafttest/network_test.go @@ -2,6 +2,7 @@ package rafttest import ( "testing" + "time" "github.com/coreos/etcd/raft/raftpb" ) @@ -34,3 +35,24 @@ func TestNetworkDrop(t *testing.T) { t.Errorf("drop = %d, want around %d", drop, droprate*float64(sent)) } } + +func TestNetworkDelay(t *testing.T) { + sent := 1000 + delay := time.Millisecond + delayrate := 0.1 + nt := newRaftNetwork(1, 2) + + nt.delay(1, 2, delay, delayrate) + var total time.Duration + for i := 0; i < sent; i++ { + s := time.Now() + nt.send(raftpb.Message{From: 1, To: 2}) + total += time.Since(s) + } + + w := time.Duration(float64(sent)*delayrate/2) * delay + // there are pretty overhead in the send call, since it genarete random numbers. + if total < w+10*delay { + t.Errorf("total = %v, want > %v", total, w) + } +}