refactor(transporter): Pass in everything the transporter needs

This commit is contained in:
Brian Waldon 2014-01-20 11:39:41 -08:00
parent 60bbc57aeb
commit ffa2b07dc4
2 changed files with 23 additions and 18 deletions

View File

@ -73,13 +73,14 @@ type snapshotConf struct {
} }
func NewPeerServer(psConfig PeerServerConfig, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, mb *metrics.Bucket) *PeerServer { func NewPeerServer(psConfig PeerServerConfig, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, mb *metrics.Bucket) *PeerServer {
followersStats := newRaftFollowersStats(psConfig.Name)
serverStats := newRaftServerStats(psConfig.Name)
s := &PeerServer{ s := &PeerServer{
Config: psConfig, Config: psConfig,
registry: registry, registry: registry,
store: store, store: store,
followersStats: followersStats,
followersStats: newRaftFollowersStats(psConfig.Name), serverStats: serverStats,
serverStats: newRaftServerStats(psConfig.Name),
timeoutThresholdChan: make(chan interface{}, 1), timeoutThresholdChan: make(chan interface{}, 1),
@ -89,7 +90,7 @@ func NewPeerServer(psConfig PeerServerConfig, tlsConf *TLSConfig, tlsInfo *TLSIn
// Create transporter for raft // Create transporter for raft
dialTimeout := (3 * psConfig.HeartbeatTimeout) + psConfig.ElectionTimeout dialTimeout := (3 * psConfig.HeartbeatTimeout) + psConfig.ElectionTimeout
responseHeaderTimeout := (3 * psConfig.HeartbeatTimeout) + psConfig.ElectionTimeout responseHeaderTimeout := (3 * psConfig.HeartbeatTimeout) + psConfig.ElectionTimeout
raftTransporter := newTransporter(psConfig.Scheme, tlsConf.Client, s, psConfig.HeartbeatTimeout, dialTimeout, responseHeaderTimeout) raftTransporter := newTransporter(psConfig.Scheme, tlsConf.Client, followersStats, serverStats, registry, psConfig.HeartbeatTimeout, dialTimeout, responseHeaderTimeout)
// Create raft server // Create raft server
raftServer, err := raft.NewServer(psConfig.Name, psConfig.Path, raftTransporter, s.store, s, "") raftServer, err := raft.NewServer(psConfig.Name, psConfig.Path, raftTransporter, s.store, s, "")

View File

@ -16,10 +16,12 @@ import (
// Transporter layer for communication between raft nodes // Transporter layer for communication between raft nodes
type transporter struct { type transporter struct {
requestTimeout time.Duration requestTimeout time.Duration
followersStats *raftFollowersStats
serverStats *raftServerStats
registry *Registry
peerServer *PeerServer client *http.Client
client *http.Client transport *http.Transport
transport *http.Transport
} }
type dialer func(network, addr string) (net.Conn, error) type dialer func(network, addr string) (net.Conn, error)
@ -27,7 +29,7 @@ type dialer func(network, addr string) (net.Conn, error)
// Create transporter using by raft server // Create transporter using by raft server
// Create http or https transporter based on // Create http or https transporter based on
// whether the user give the server cert and key // whether the user give the server cert and key
func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer, dialTimeout, requestTimeout, responseHeaderTimeout time.Duration) *transporter { func newTransporter(scheme string, tlsConf tls.Config, followersStats *raftFollowersStats, serverStats *raftServerStats, registry *Registry, dialTimeout, requestTimeout, responseHeaderTimeout time.Duration) *transporter {
tr := &http.Transport{ tr := &http.Transport{
Dial: func(network, addr string) (net.Conn, error) { Dial: func(network, addr string) (net.Conn, error) {
return net.DialTimeout(network, addr, dialTimeout) return net.DialTimeout(network, addr, dialTimeout)
@ -41,10 +43,12 @@ func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer, d
} }
t := transporter{ t := transporter{
client: &http.Client{Transport: tr}, client: &http.Client{Transport: tr},
transport: tr, transport: tr,
peerServer: peerServer,
requestTimeout: requestTimeout, requestTimeout: requestTimeout,
followersStats: followersStats,
serverStats: serverStats,
registry: registry,
} }
return &t return &t
@ -61,18 +65,18 @@ func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Pe
size := b.Len() size := b.Len()
t.peerServer.serverStats.SendAppendReq(size) t.serverStats.SendAppendReq(size)
u, _ := t.peerServer.registry.PeerURL(peer.Name) u, _ := t.registry.PeerURL(peer.Name)
log.Debugf("Send LogEntries to %s ", u) log.Debugf("Send LogEntries to %s ", u)
thisFollowerStats, ok := t.peerServer.followersStats.Followers[peer.Name] thisFollowerStats, ok := t.followersStats.Followers[peer.Name]
if !ok { //this is the first time this follower has been seen if !ok { //this is the first time this follower has been seen
thisFollowerStats = &raftFollowerStats{} thisFollowerStats = &raftFollowerStats{}
thisFollowerStats.Latency.Minimum = 1 << 63 thisFollowerStats.Latency.Minimum = 1 << 63
t.peerServer.followersStats.Followers[peer.Name] = thisFollowerStats t.followersStats.Followers[peer.Name] = thisFollowerStats
} }
start := time.Now() start := time.Now()
@ -118,7 +122,7 @@ func (t *transporter) SendVoteRequest(server raft.Server, peer *raft.Peer, req *
return nil return nil
} }
u, _ := t.peerServer.registry.PeerURL(peer.Name) u, _ := t.registry.PeerURL(peer.Name)
log.Debugf("Send Vote from %s to %s", server.Name(), u) log.Debugf("Send Vote from %s to %s", server.Name(), u)
resp, httpRequest, err := t.Post(fmt.Sprintf("%s/vote", u), &b) resp, httpRequest, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
@ -151,7 +155,7 @@ func (t *transporter) SendSnapshotRequest(server raft.Server, peer *raft.Peer, r
return nil return nil
} }
u, _ := t.peerServer.registry.PeerURL(peer.Name) u, _ := t.registry.PeerURL(peer.Name)
log.Debugf("Send Snapshot Request from %s to %s", server.Name(), u) log.Debugf("Send Snapshot Request from %s to %s", server.Name(), u)
resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b) resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
@ -184,7 +188,7 @@ func (t *transporter) SendSnapshotRecoveryRequest(server raft.Server, peer *raft
return nil return nil
} }
u, _ := t.peerServer.registry.PeerURL(peer.Name) u, _ := t.registry.PeerURL(peer.Name)
log.Debugf("Send Snapshot Recovery from %s to %s", server.Name(), u) log.Debugf("Send Snapshot Recovery from %s to %s", server.Name(), u)
resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b) resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)