From a2ee62039438cf5cf81278f758e9d38518378a62 Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Mon, 20 Jan 2014 12:42:13 -0800 Subject: [PATCH] refactor(raft): init raft transporter & server in main --- etcd.go | 24 +++++++++++++++--- server/listener.go | 2 +- server/peer_server.go | 46 ++++++++++++----------------------- server/raft_follower_stats.go | 2 +- server/raft_server_stats.go | 2 +- server/transporter.go | 2 +- tests/server_utils.go | 22 ++++++++++++++--- 7 files changed, 59 insertions(+), 41 deletions(-) diff --git a/etcd.go b/etcd.go index 5f6222149..ac1be5f1b 100644 --- a/etcd.go +++ b/etcd.go @@ -110,6 +110,16 @@ func main() { store := store.New() registry := server.NewRegistry(store) + // Create stats objects + followersStats := server.NewRaftFollowersStats(info.Name) + serverStats := server.NewRaftServerStats(info.Name) + + // Calculate all of our timeouts + heartbeatTimeout := time.Duration(config.Peer.HeartbeatTimeout) * time.Millisecond + electionTimeout := time.Duration(config.Peer.ElectionTimeout) * time.Millisecond + dialTimeout := (3 * heartbeatTimeout) + electionTimeout + responseHeaderTimeout := (3 * heartbeatTimeout) + electionTimeout + // Create peer server. psConfig := server.PeerServerConfig{ Name: info.Name, @@ -117,13 +127,11 @@ func main() { Scheme: peerTLSConfig.Scheme, URL: info.RaftURL, SnapshotCount: config.SnapshotCount, - HeartbeatTimeout: time.Duration(config.Peer.HeartbeatTimeout) * time.Millisecond, - ElectionTimeout: time.Duration(config.Peer.ElectionTimeout) * time.Millisecond, MaxClusterSize: config.MaxClusterSize, RetryTimes: config.MaxRetryAttempts, CORS: corsInfo, } - ps := server.NewPeerServer(psConfig, &peerTLSConfig, &info.RaftTLS, registry, store, &mb) + ps := server.NewPeerServer(psConfig, registry, store, &mb, followersStats, serverStats) var psListener net.Listener if psConfig.Scheme == "https" { @@ -135,6 +143,16 @@ func main() { panic(err) } + // Create Raft transporter and server + raftTransporter := server.NewTransporter(peerTLSConfig.Scheme, peerTLSConfig.Client, followersStats, serverStats, registry, heartbeatTimeout, dialTimeout, responseHeaderTimeout) + raftServer, err := raft.NewServer(info.Name, config.DataDir, raftTransporter, store, ps, "") + if err != nil { + log.Fatal(err) + } + raftServer.SetElectionTimeout(electionTimeout) + raftServer.SetHeartbeatTimeout(heartbeatTimeout) + ps.SetRaftServer(raftServer) + // Create client server. sConfig := server.ServerConfig{ Name: info.Name, diff --git a/server/listener.go b/server/listener.go index 291d022ae..dd3cfa9e1 100644 --- a/server/listener.go +++ b/server/listener.go @@ -16,7 +16,7 @@ func NewListener(addr string) (net.Listener, error) { return l, nil } -func NewTLSListener(addr, keyFile, certFile string) (net.Listener, error) { +func NewTLSListener(addr, certFile, keyFile string) (net.Listener, error) { if addr == "" { addr = ":https" } diff --git a/server/peer_server.go b/server/peer_server.go index 781ed4db0..5576cc6d7 100644 --- a/server/peer_server.go +++ b/server/peer_server.go @@ -43,8 +43,6 @@ type PeerServer struct { raftServer raft.Server server *Server joinIndex uint64 - tlsConf *TLSConfig - tlsInfo *TLSInfo followersStats *raftFollowersStats serverStats *raftServerStats registry *Registry @@ -72,9 +70,7 @@ type snapshotConf struct { snapshotThr uint64 } -func NewPeerServer(psConfig PeerServerConfig, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, mb *metrics.Bucket) *PeerServer { - followersStats := newRaftFollowersStats(psConfig.Name) - serverStats := newRaftServerStats(psConfig.Name) +func NewPeerServer(psConfig PeerServerConfig, registry *Registry, store store.Store, mb *metrics.Bucket, followersStats *raftFollowersStats, serverStats *raftServerStats) *PeerServer { s := &PeerServer{ Config: psConfig, registry: registry, @@ -86,37 +82,28 @@ func NewPeerServer(psConfig PeerServerConfig, tlsConf *TLSConfig, tlsInfo *TLSIn metrics: mb, } + return s +} - // Create transporter for raft - dialTimeout := (3 * psConfig.HeartbeatTimeout) + psConfig.ElectionTimeout - responseHeaderTimeout := (3 * psConfig.HeartbeatTimeout) + psConfig.ElectionTimeout - raftTransporter := newTransporter(psConfig.Scheme, tlsConf.Client, followersStats, serverStats, registry, psConfig.HeartbeatTimeout, dialTimeout, responseHeaderTimeout) - - // Create raft server - raftServer, err := raft.NewServer(psConfig.Name, psConfig.Path, raftTransporter, s.store, s, "") - if err != nil { - log.Fatal(err) - } - +func (s *PeerServer) SetRaftServer(raftServer raft.Server) { s.snapConf = &snapshotConf{ checkingInterval: time.Second * 3, // this is not accurate, we will update raft to provide an api lastIndex: raftServer.CommitIndex(), - snapshotThr: uint64(psConfig.SnapshotCount), + snapshotThr: uint64(s.Config.SnapshotCount), } + raftServer.AddEventListener(raft.StateChangeEventType, s.raftEventLogger) + raftServer.AddEventListener(raft.LeaderChangeEventType, s.raftEventLogger) + raftServer.AddEventListener(raft.TermChangeEventType, s.raftEventLogger) + raftServer.AddEventListener(raft.AddPeerEventType, s.raftEventLogger) + raftServer.AddEventListener(raft.RemovePeerEventType, s.raftEventLogger) + raftServer.AddEventListener(raft.HeartbeatTimeoutEventType, s.raftEventLogger) + raftServer.AddEventListener(raft.ElectionTimeoutThresholdEventType, s.raftEventLogger) + + raftServer.AddEventListener(raft.HeartbeatEventType, s.recordMetricEvent) + s.raftServer = raftServer - s.raftServer.AddEventListener(raft.StateChangeEventType, s.raftEventLogger) - s.raftServer.AddEventListener(raft.LeaderChangeEventType, s.raftEventLogger) - s.raftServer.AddEventListener(raft.TermChangeEventType, s.raftEventLogger) - s.raftServer.AddEventListener(raft.AddPeerEventType, s.raftEventLogger) - s.raftServer.AddEventListener(raft.RemovePeerEventType, s.raftEventLogger) - s.raftServer.AddEventListener(raft.HeartbeatTimeoutEventType, s.raftEventLogger) - s.raftServer.AddEventListener(raft.ElectionTimeoutThresholdEventType, s.raftEventLogger) - - s.raftServer.AddEventListener(raft.HeartbeatEventType, s.recordMetricEvent) - - return s } // Start the raft server @@ -132,9 +119,6 @@ func (s *PeerServer) Serve(listener net.Listener, snapshot bool, cluster []strin } } - s.raftServer.SetElectionTimeout(s.Config.ElectionTimeout) - s.raftServer.SetHeartbeatTimeout(s.Config.HeartbeatTimeout) - s.raftServer.Start() if s.raftServer.IsLogEmpty() { diff --git a/server/raft_follower_stats.go b/server/raft_follower_stats.go index b22bb803e..1166c02d6 100644 --- a/server/raft_follower_stats.go +++ b/server/raft_follower_stats.go @@ -10,7 +10,7 @@ type raftFollowersStats struct { Followers map[string]*raftFollowerStats `json:"followers"` } -func newRaftFollowersStats(name string) *raftFollowersStats { +func NewRaftFollowersStats(name string) *raftFollowersStats { return &raftFollowersStats{ Leader: name, Followers: make(map[string]*raftFollowerStats), diff --git a/server/raft_server_stats.go b/server/raft_server_stats.go index 518863539..05bc04f85 100644 --- a/server/raft_server_stats.go +++ b/server/raft_server_stats.go @@ -29,7 +29,7 @@ type raftServerStats struct { recvRateQueue *statsQueue } -func newRaftServerStats(name string) *raftServerStats { +func NewRaftServerStats(name string) *raftServerStats { return &raftServerStats{ Name: name, StartTime: time.Now(), diff --git a/server/transporter.go b/server/transporter.go index 98be6fd1c..3b0cb6d68 100644 --- a/server/transporter.go +++ b/server/transporter.go @@ -29,7 +29,7 @@ type dialer func(network, addr string) (net.Conn, error) // Create transporter using by raft server // Create http or https transporter based on // whether the user give the server cert and key -func newTransporter(scheme string, tlsConf tls.Config, followersStats *raftFollowersStats, serverStats *raftServerStats, registry *Registry, dialTimeout, requestTimeout, responseHeaderTimeout time.Duration) *transporter { +func NewTransporter(scheme string, tlsConf tls.Config, followersStats *raftFollowersStats, serverStats *raftServerStats, registry *Registry, dialTimeout, requestTimeout, responseHeaderTimeout time.Duration) *transporter { tr := &http.Transport{ Dial: func(network, addr string) (net.Conn, error) { return net.DialTimeout(network, addr, dialTimeout) diff --git a/tests/server_utils.go b/tests/server_utils.go index 596f960a0..cbe11aa1e 100644 --- a/tests/server_utils.go +++ b/tests/server_utils.go @@ -5,6 +5,8 @@ import ( "os" "time" + "github.com/coreos/raft" + "github.com/coreos/etcd/server" "github.com/coreos/etcd/store" ) @@ -27,23 +29,37 @@ func RunServer(f func(*server.Server)) { registry := server.NewRegistry(store) corsInfo, _ := server.NewCORSInfo([]string{}) + serverStats := server.NewRaftServerStats(testName) + followersStats := server.NewRaftFollowersStats(testName) + psConfig := server.PeerServerConfig{ Name: testName, Path: path, URL: "http://"+testRaftURL, Scheme: "http", SnapshotCount: testSnapshotCount, - HeartbeatTimeout: testHeartbeatTimeout, - ElectionTimeout: testElectionTimeout, MaxClusterSize: 9, CORS: corsInfo, } - ps := server.NewPeerServer(psConfig, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, nil) + ps := server.NewPeerServer(psConfig, registry, store, nil, followersStats, serverStats) psListener, err := server.NewListener(testRaftURL) if err != nil { panic(err) } + // Create Raft transporter and server + tls := &server.TLSConfig{Scheme: "http"} + dialTimeout := (3 * testHeartbeatTimeout) + testElectionTimeout + responseHeaderTimeout := (3 * testHeartbeatTimeout) + testElectionTimeout + raftTransporter := server.NewTransporter(tls.Scheme, tls.Client, followersStats, serverStats, registry, testHeartbeatTimeout, dialTimeout, responseHeaderTimeout) + raftServer, err := raft.NewServer(testName, path, raftTransporter, store, ps, "") + if err != nil { + panic(err) + } + raftServer.SetElectionTimeout(testElectionTimeout) + raftServer.SetHeartbeatTimeout(testHeartbeatTimeout) + ps.SetRaftServer(raftServer) + sConfig := server.ServerConfig{ Name: testName, URL: "http://"+testClientURL,