From ffa2b07dc4893f4c8f842f4f4c860f489b6a7b5e Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Mon, 20 Jan 2014 11:39:41 -0800 Subject: [PATCH] refactor(transporter): Pass in everything the transporter needs --- server/peer_server.go | 9 +++++---- server/transporter.go | 32 ++++++++++++++++++-------------- 2 files changed, 23 insertions(+), 18 deletions(-) diff --git a/server/peer_server.go b/server/peer_server.go index d6a30f92a..781ed4db0 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -73,13 +73,14 @@ type snapshotConf struct { } func NewPeerServer(psConfig PeerServerConfig, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, mb *metrics.Bucket) *PeerServer { + followersStats := newRaftFollowersStats(psConfig.Name) + serverStats := newRaftServerStats(psConfig.Name) s := &PeerServer{ Config: psConfig, registry: registry, store: store, - - followersStats: newRaftFollowersStats(psConfig.Name), - serverStats: newRaftServerStats(psConfig.Name), + followersStats: followersStats, + serverStats: serverStats, timeoutThresholdChan: make(chan interface{}, 1), @@ -89,7 +90,7 @@ func NewPeerServer(psConfig PeerServerConfig, tlsConf *TLSConfig, tlsInfo *TLSIn // Create transporter for raft dialTimeout := (3 * psConfig.HeartbeatTimeout) + psConfig.ElectionTimeout responseHeaderTimeout := (3 * psConfig.HeartbeatTimeout) + psConfig.ElectionTimeout - raftTransporter := newTransporter(psConfig.Scheme, tlsConf.Client, s, psConfig.HeartbeatTimeout, dialTimeout, responseHeaderTimeout) + raftTransporter := newTransporter(psConfig.Scheme, tlsConf.Client, followersStats, serverStats, registry, psConfig.HeartbeatTimeout, dialTimeout, responseHeaderTimeout) // Create raft server raftServer, err := raft.NewServer(psConfig.Name, psConfig.Path, raftTransporter, s.store, s, "") diff --git a/server/transporter.go b/server/transporter.go index f9479f7a0..98be6fd1c 100644 --- a/server/transporter.go +++ b/server/transporter.go @@ -16,10 +16,12 @@ import ( // Transporter layer for communication between raft nodes type transporter struct { requestTimeout time.Duration + followersStats *raftFollowersStats + serverStats *raftServerStats + registry *Registry - peerServer *PeerServer - client *http.Client - transport *http.Transport + client *http.Client + transport *http.Transport } type dialer func(network, addr string) (net.Conn, error) @@ -27,7 +29,7 @@ type dialer func(network, addr string) (net.Conn, error) // Create transporter using by raft server // Create http or https transporter based on // whether the user give the server cert and key -func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer, dialTimeout, requestTimeout, responseHeaderTimeout time.Duration) *transporter { +func newTransporter(scheme string, tlsConf tls.Config, followersStats *raftFollowersStats, serverStats *raftServerStats, registry *Registry, dialTimeout, requestTimeout, responseHeaderTimeout time.Duration) *transporter { tr := &http.Transport{ Dial: func(network, addr string) (net.Conn, error) { return net.DialTimeout(network, addr, dialTimeout) @@ -41,10 +43,12 @@ func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer, d } t := transporter{ - client: &http.Client{Transport: tr}, - transport: tr, - peerServer: peerServer, + client: &http.Client{Transport: tr}, + transport: tr, requestTimeout: requestTimeout, + followersStats: followersStats, + serverStats: serverStats, + registry: registry, } return &t @@ -61,18 +65,18 @@ func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Pe size := b.Len() - t.peerServer.serverStats.SendAppendReq(size) + t.serverStats.SendAppendReq(size) - u, _ := t.peerServer.registry.PeerURL(peer.Name) + u, _ := t.registry.PeerURL(peer.Name) log.Debugf("Send LogEntries to %s ", u) - thisFollowerStats, ok := t.peerServer.followersStats.Followers[peer.Name] + thisFollowerStats, ok := t.followersStats.Followers[peer.Name] if !ok { //this is the first time this follower has been seen thisFollowerStats = &raftFollowerStats{} thisFollowerStats.Latency.Minimum = 1 << 63 - t.peerServer.followersStats.Followers[peer.Name] = thisFollowerStats + t.followersStats.Followers[peer.Name] = thisFollowerStats } start := time.Now() @@ -118,7 +122,7 @@ func (t *transporter) SendVoteRequest(server raft.Server, peer *raft.Peer, req * return nil } - u, _ := t.peerServer.registry.PeerURL(peer.Name) + u, _ := t.registry.PeerURL(peer.Name) log.Debugf("Send Vote from %s to %s", server.Name(), u) resp, httpRequest, err := t.Post(fmt.Sprintf("%s/vote", u), &b) @@ -151,7 +155,7 @@ func (t *transporter) SendSnapshotRequest(server raft.Server, peer *raft.Peer, r return nil } - u, _ := t.peerServer.registry.PeerURL(peer.Name) + u, _ := t.registry.PeerURL(peer.Name) log.Debugf("Send Snapshot Request from %s to %s", server.Name(), u) resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b) @@ -184,7 +188,7 @@ func (t *transporter) SendSnapshotRecoveryRequest(server raft.Server, peer *raft return nil } - u, _ := t.peerServer.registry.PeerURL(peer.Name) + u, _ := t.registry.PeerURL(peer.Name) log.Debugf("Send Snapshot Recovery from %s to %s", server.Name(), u) resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)