Merge pull request #2224 from xiang90/raftt

rafttest: separate network interface and network
This commit is contained in:
Xiang Li 2015-02-03 23:11:01 -08:00
commit 378fa46b7d
2 changed files with 28 additions and 10 deletions

View File

@ -7,17 +7,25 @@ import (
"github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/raft/raftpb"
) )
type network interface { // a network interface
type iface interface {
send(m raftpb.Message) send(m raftpb.Message)
recv() chan raftpb.Message recv() chan raftpb.Message
disconnect()
connect()
}
// a network
type network interface {
// drop message at given rate (1.0 drops all messages) // drop message at given rate (1.0 drops all messages)
drop(from, to uint64, rate float64) drop(from, to uint64, rate float64)
// delay message for (0, d] randomly at given rate (1.0 delay all messages) // delay message for (0, d] randomly at given rate (1.0 delay all messages)
// do we need rate here? // do we need rate here?
delay(from, to uint64, d time.Duration, rate float64) delay(from, to uint64, d time.Duration, rate float64)
disconnect(id uint64) disconnect(id uint64)
connect(id uint64) connect(id uint64)
// heal heals the network
heal()
} }
type raftNetwork struct { type raftNetwork struct {
@ -38,7 +46,7 @@ func newRaftNetwork(nodes ...uint64) *raftNetwork {
return pn return pn
} }
func (rn *raftNetwork) nodeNetwork(id uint64) *nodeNetwork { func (rn *raftNetwork) nodeNetwork(id uint64) iface {
return &nodeNetwork{id: id, raftNetwork: rn} return &nodeNetwork{id: id, raftNetwork: rn}
} }
@ -75,6 +83,8 @@ func (rn *raftNetwork) delay(from, to uint64, d time.Duration, rate float64) {
panic("unimplemented") panic("unimplemented")
} }
func (rn *raftNetwork) heal() {}
func (rn *raftNetwork) disconnect(id uint64) { func (rn *raftNetwork) disconnect(id uint64) {
rn.mu.Lock() rn.mu.Lock()
defer rn.mu.Unlock() defer rn.mu.Unlock()
@ -92,6 +102,14 @@ type nodeNetwork struct {
*raftNetwork *raftNetwork
} }
func (nt *nodeNetwork) connect() {
nt.raftNetwork.connect(nt.id)
}
func (nt *nodeNetwork) disconnect() {
nt.raftNetwork.disconnect(nt.id)
}
func (nt *nodeNetwork) send(m raftpb.Message) { func (nt *nodeNetwork) send(m raftpb.Message) {
nt.raftNetwork.send(m) nt.raftNetwork.send(m)
} }

View File

@ -13,7 +13,7 @@ type node struct {
raft.Node raft.Node
id uint64 id uint64
paused bool paused bool
nt network iface iface
stopc chan struct{} stopc chan struct{}
// stable // stable
@ -21,14 +21,14 @@ type node struct {
state raftpb.HardState state raftpb.HardState
} }
func startNode(id uint64, peers []raft.Peer, nt network) *node { func startNode(id uint64, peers []raft.Peer, iface iface) *node {
st := raft.NewMemoryStorage() st := raft.NewMemoryStorage()
rn := raft.StartNode(id, peers, 10, 1, st) rn := raft.StartNode(id, peers, 10, 1, st)
n := &node{ n := &node{
Node: rn, Node: rn,
id: id, id: id,
storage: st, storage: st,
nt: nt, iface: iface,
} }
n.start() n.start()
return n return n
@ -51,11 +51,11 @@ func (n *node) start() {
n.storage.Append(rd.Entries) n.storage.Append(rd.Entries)
go func() { go func() {
for _, m := range rd.Messages { for _, m := range rd.Messages {
n.nt.send(m) n.iface.send(m)
} }
}() }()
n.Advance() n.Advance()
case m := <-n.nt.recv(): case m := <-n.iface.recv():
n.Step(context.TODO(), m) n.Step(context.TODO(), m)
case <-n.stopc: case <-n.stopc:
n.Stop() n.Stop()
@ -72,7 +72,7 @@ func (n *node) start() {
// All in memory state of node is discarded. // All in memory state of node is discarded.
// All stable MUST be unchanged. // All stable MUST be unchanged.
func (n *node) stop() { func (n *node) stop() {
n.nt.disconnect(n.id) n.iface.disconnect()
n.stopc <- struct{}{} n.stopc <- struct{}{}
// wait for the shutdown // wait for the shutdown
<-n.stopc <-n.stopc
@ -85,7 +85,7 @@ func (n *node) restart() {
<-n.stopc <-n.stopc
n.Node = raft.RestartNode(n.id, 10, 1, n.storage, 0) n.Node = raft.RestartNode(n.id, 10, 1, n.storage, 0)
n.start() n.start()
n.nt.connect(n.id) n.iface.connect()
} }
// pause pauses the node. // pause pauses the node.