From 9c8a23c333df9f5a71af955004b13d17a6043c45 Mon Sep 17 00:00:00 2001
From: Brian Waldon
Date: Wed, 15 Jan 2014 23:17:04 -0800
Subject: [PATCH] refactor(PeerServer): Use a config struct in PeerServer

---
 etcd.go                        | 17 ++++---
 server/join_command.go         |  2 +-
 server/peer_server.go          | 82 +++++++++++++++++-----------------
 server/peer_server_handlers.go | 30 ++++++-------
 server/transporter.go          |  6 +--
 tests/server_utils.go          | 13 +++++-
 6 files changed, 84 insertions(+), 66 deletions(-)

diff --git a/etcd.go b/etcd.go
index 32116d2f8..800dcb1f3 100644
--- a/etcd.go
+++ b/etcd.go
@@ -103,11 +103,18 @@ func main() {
 	registry := server.NewRegistry(store)
 
 	// Create peer server.
-	heartbeatTimeout := time.Duration(config.Peer.HeartbeatTimeout) * time.Millisecond
-	electionTimeout := time.Duration(config.Peer.ElectionTimeout) * time.Millisecond
-	ps := server.NewPeerServer(info.Name, config.DataDir, info.RaftURL, info.RaftListenHost, &peerTLSConfig, &info.RaftTLS, registry, store, config.SnapshotCount, heartbeatTimeout, electionTimeout, &mb)
-	ps.MaxClusterSize = config.MaxClusterSize
-	ps.RetryTimes = config.MaxRetryAttempts
+	psConfig := server.PeerServerConfig{
+		Name:             info.Name,
+		Path:             config.DataDir,
+		URL:              info.RaftURL,
+		BindAddr:         info.RaftListenHost,
+		SnapshotCount:    config.SnapshotCount,
+		HeartbeatTimeout: time.Duration(config.Peer.HeartbeatTimeout) * time.Millisecond,
+		ElectionTimeout:  time.Duration(config.Peer.ElectionTimeout) * time.Millisecond,
+		MaxClusterSize:   config.MaxClusterSize,
+		RetryTimes:       config.MaxRetryAttempts,
+	}
+	ps := server.NewPeerServer(psConfig, &peerTLSConfig, &info.RaftTLS, registry, store, &mb)
 
 	// Create client server.
 	s := server.New(info.Name, info.EtcdURL, info.EtcdListenHost, &tlsConfig, &info.EtcdTLS, ps, registry, store, &mb)
diff --git a/server/join_command.go b/server/join_command.go
index ed262b1bd..2cf5cd71a 100644
--- a/server/join_command.go
+++ b/server/join_command.go
@@ -52,7 +52,7 @@ func (c *JoinCommand) Apply(context raft.Context) (interface{}, error) {
 	}
 
 	// Check peer number in the cluster
-	if ps.registry.Count() == ps.MaxClusterSize {
+	if ps.registry.Count() == ps.Config.MaxClusterSize {
 		log.Debug("Reject join request from ", c.Name)
 		return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMorePeer, "", context.CommitIndex())
 	}
diff --git a/server/peer_server.go b/server/peer_server.go
index 4e494b89b..56149a5f4 100644
--- a/server/peer_server.go
+++ b/server/peer_server.go
@@ -25,15 +25,25 @@ const retryInterval = 10
 
 const ThresholdMonitorTimeout = 5 * time.Second
 
+type PeerServerConfig struct {
+	Name             string
+	Path             string
+	URL              string
+	BindAddr         string
+	SnapshotCount    int
+	HeartbeatTimeout time.Duration
+	ElectionTimeout  time.Duration
+	MaxClusterSize   int
+	RetryTimes       int
+}
+
 type PeerServer struct {
+	Config           PeerServerConfig
 	raftServer       raft.Server
 	server           *Server
 	httpServer       *http.Server
 	listener         net.Listener
 	joinIndex        uint64
-	name             string
-	url              string
-	bindAddr         string
 	tlsConf          *TLSConfig
 	tlsInfo          *TLSInfo
 	followersStats   *raftFollowersStats
@@ -41,10 +51,6 @@ type PeerServer struct {
 	registry         *Registry
 	store            store.Store
 	snapConf         *snapshotConf
-	MaxClusterSize   int
-	RetryTimes       int
-	HeartbeatTimeout time.Duration
-	ElectionTimeout  time.Duration
 
 	closeChan            chan bool
 	timeoutThresholdChan chan interface{}
@@ -65,22 +71,20 @@ type snapshotConf struct {
 	snapshotThr uint64
 }
 
-func NewPeerServer(name string, path string, url string, bindAddr string, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, snapshotCount int, heartbeatTimeout, electionTimeout time.Duration, mb *metrics.Bucket) *PeerServer {
-
+func NewPeerServer(psConfig PeerServerConfig, tlsConf *TLSConfig, tlsInfo *TLSInfo, registry *Registry, store store.Store, mb *metrics.Bucket) *PeerServer {
 	s := &PeerServer{
-		name:     name,
-		url:      url,
-		bindAddr: bindAddr,
+		Config: psConfig,
+
 		tlsConf:  tlsConf,
 		tlsInfo:  tlsInfo,
 		registry: registry,
 		store:    store,
 		followersStats: &raftFollowersStats{
-			Leader:    name,
+			Leader:    psConfig.Name,
 			Followers: make(map[string]*raftFollowerStats),
 		},
 		serverStats: &raftServerStats{
-			Name:      name,
+			Name:      psConfig.Name,
 			StartTime: time.Now(),
 			sendRateQueue: &statsQueue{
 				back: -1,
@@ -89,8 +93,6 @@ func NewPeerServer(name string, path string, url string, bindAddr string, tlsCon
 				back: -1,
 			},
 		},
-		HeartbeatTimeout: heartbeatTimeout,
-		ElectionTimeout:  electionTimeout,
 
 		timeoutThresholdChan: make(chan interface{}, 1),
 
@@ -101,7 +103,7 @@ func NewPeerServer(name string, path string, url string, bindAddr string, tlsCon
 	raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, s)
 
 	// Create raft server
-	raftServer, err := raft.NewServer(name, path, raftTransporter, s.store, s, "")
+	raftServer, err := raft.NewServer(psConfig.Name, psConfig.Path, raftTransporter, s.store, s, "")
 	if err != nil {
 		log.Fatal(err)
 	}
@@ -110,7 +112,7 @@ func NewPeerServer(name string, path string, url string, bindAddr string, tlsCon
 		checkingInterval: time.Second * 3,
 		// this is not accurate, we will update raft to provide an api
 		lastIndex:   raftServer.CommitIndex(),
-		snapshotThr: uint64(snapshotCount),
+		snapshotThr: uint64(psConfig.SnapshotCount),
 	}
 
 	s.raftServer = raftServer
@@ -134,14 +136,14 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) error {
 		err := s.raftServer.LoadSnapshot()
 
 		if err == nil {
-			log.Debugf("%s finished load snapshot", s.name)
+			log.Debugf("%s finished load snapshot", s.Config.Name)
 		} else {
 			log.Debug(err)
 		}
 	}
 
-	s.raftServer.SetElectionTimeout(s.ElectionTimeout)
-	s.raftServer.SetHeartbeatTimeout(s.HeartbeatTimeout)
+	s.raftServer.SetElectionTimeout(s.Config.ElectionTimeout)
+	s.raftServer.SetHeartbeatTimeout(s.Config.HeartbeatTimeout)
 
 	s.raftServer.Start()
 
@@ -155,7 +157,7 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) error {
 
 	} else {
 		// Rejoin the previous cluster
-		cluster = s.registry.PeerURLs(s.raftServer.Leader(), s.name)
+		cluster = s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name)
 		for i := 0; i < len(cluster); i++ {
 			u, err := url.Parse(cluster[i])
 			if err != nil {
@@ -168,7 +170,7 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) error {
 			log.Warn("the entire cluster is down! this peer will restart the cluster.")
 		}
 
-		log.Debugf("%s restart as a follower", s.name)
+		log.Debugf("%s restart as a follower", s.Config.Name)
 	}
 
 	s.closeChan = make(chan bool)
@@ -255,17 +257,17 @@ func (s *PeerServer) SetServer(server *Server) {
 func (s *PeerServer) startAsLeader() {
 	// leader need to join self as a peer
 	for {
-		_, err := s.raftServer.Do(NewJoinCommand(store.MinVersion(), store.MaxVersion(), s.raftServer.Name(), s.url, s.server.URL()))
+		_, err := s.raftServer.Do(NewJoinCommand(store.MinVersion(), store.MaxVersion(), s.raftServer.Name(), s.Config.URL, s.server.URL()))
 		if err == nil {
 			break
 		}
 	}
-	log.Debugf("%s start as a leader", s.name)
+	log.Debugf("%s start as a leader", s.Config.Name)
 }
 
 func (s *PeerServer) startAsFollower(cluster []string) {
 	// start as a follower in a existing cluster
-	for i := 0; i < s.RetryTimes; i++ {
+	for i := 0; i < s.Config.RetryTimes; i++ {
 		ok := s.joinCluster(cluster)
 		if ok {
 			return
@@ -274,19 +276,19 @@ func (s *PeerServer) startAsFollower(cluster []string) {
 		time.Sleep(time.Second * retryInterval)
 	}
 
-	log.Fatalf("Cannot join the cluster via given peers after %x retries", s.RetryTimes)
+	log.Fatalf("Cannot join the cluster via given peers after %x retries", s.Config.RetryTimes)
 }
 
 // Start to listen and response raft command
 func (s *PeerServer) startTransport(scheme string, tlsConf tls.Config) error {
-	log.Infof("raft server [name %s, listen on %s, advertised url %s]", s.name, s.bindAddr, s.url)
+	log.Infof("raft server [name %s, listen on %s, advertised url %s]", s.Config.Name, s.Config.BindAddr, s.Config.URL)
 
 	router := mux.NewRouter()
 
 	s.httpServer = &http.Server{
 		Handler:   router,
 		TLSConfig: &tlsConf,
-		Addr:      s.bindAddr,
+		Addr:      s.Config.BindAddr,
 	}
 
 	// internal commands
@@ -333,7 +335,7 @@ func getVersion(t *transporter, versionURL url.URL) (int, error) {
 // Upgradable checks whether all peers in a cluster support an upgrade to the next store version.
 func (s *PeerServer) Upgradable() error {
 	nextVersion := s.store.Version() + 1
-	for _, peerURL := range s.registry.PeerURLs(s.raftServer.Leader(), s.name) {
+	for _, peerURL := range s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name) {
 		u, err := url.Parse(peerURL)
 		if err != nil {
 			return fmt.Errorf("PeerServer: Cannot parse URL: '%s' (%s)", peerURL, err)
@@ -361,7 +363,7 @@ func (s *PeerServer) joinCluster(cluster []string) bool {
 
 		err := s.joinByPeer(s.raftServer, peer, s.tlsConf.Scheme)
 		if err == nil {
-			log.Debugf("%s success join to the cluster via peer %s", s.name, peer)
+			log.Debugf("%s success join to the cluster via peer %s", s.Config.Name, peer)
 			return true
 
 		} else {
@@ -392,7 +394,7 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
 		return fmt.Errorf("Unable to join: cluster version is %d; version compatibility is %d - %d", version, store.MinVersion(), store.MaxVersion())
 	}
 
-	json.NewEncoder(&b).Encode(NewJoinCommand(store.MinVersion(), store.MaxVersion(), server.Name(), s.url, s.server.URL()))
+	json.NewEncoder(&b).Encode(NewJoinCommand(store.MinVersion(), store.MaxVersion(), server.Name(), s.Config.URL, s.server.URL()))
 
 	joinURL := url.URL{Host: peer, Scheme: scheme, Path: "/join"}
 
@@ -417,7 +419,7 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
 		if resp.StatusCode == http.StatusTemporaryRedirect {
 			address := resp.Header.Get("Location")
 			log.Debugf("Send Join Request to %s", address)
-			json.NewEncoder(&b).Encode(NewJoinCommand(store.MinVersion(), store.MaxVersion(), server.Name(), s.url, s.server.URL()))
+			json.NewEncoder(&b).Encode(NewJoinCommand(store.MinVersion(), store.MaxVersion(), server.Name(), s.Config.URL, s.server.URL()))
 			resp, req, err = t.Post(address, &b)
 
 		} else if resp.StatusCode == http.StatusBadRequest {
@@ -477,21 +479,21 @@ func (s *PeerServer) raftEventLogger(event raft.Event) {
 
 	switch event.Type() {
 	case raft.StateChangeEventType:
-		log.Infof("%s: state changed from '%v' to '%v'.", s.name, prevValue, value)
+		log.Infof("%s: state changed from '%v' to '%v'.", s.Config.Name, prevValue, value)
 	case raft.TermChangeEventType:
-		log.Infof("%s: term #%v started.", s.name, value)
+		log.Infof("%s: term #%v started.", s.Config.Name, value)
 	case raft.LeaderChangeEventType:
-		log.Infof("%s: leader changed from '%v' to '%v'.", s.name, prevValue, value)
+		log.Infof("%s: leader changed from '%v' to '%v'.", s.Config.Name, prevValue, value)
 	case raft.AddPeerEventType:
-		log.Infof("%s: peer added: '%v'", s.name, value)
+		log.Infof("%s: peer added: '%v'", s.Config.Name, value)
 	case raft.RemovePeerEventType:
-		log.Infof("%s: peer removed: '%v'", s.name, value)
+		log.Infof("%s: peer removed: '%v'", s.Config.Name, value)
 	case raft.HeartbeatTimeoutEventType:
 		var name = ""
 		if peer, ok := value.(*raft.Peer); ok {
 			name = peer.Name
 		}
-		log.Infof("%s: warning: heartbeat timed out: '%v'", s.name, name)
+		log.Infof("%s: warning: heartbeat timed out: '%v'", s.Config.Name, name)
 	case raft.ElectionTimeoutThresholdEventType:
 		select {
 		case s.timeoutThresholdChan <- value:
@@ -538,7 +540,7 @@ func (s *PeerServer) monitorTimeoutThreshold(closeChan chan bool) {
 	for {
 		select {
 		case value := <-s.timeoutThresholdChan:
-			log.Infof("%s: warning: heartbeat near election timeout: %v", s.name, value)
+			log.Infof("%s: warning: heartbeat near election timeout: %v", s.Config.Name, value)
 		case <-closeChan:
 			return
 		}
diff --git a/server/peer_server_handlers.go b/server/peer_server_handlers.go
index e1b485bec..a4ef84710 100644
--- a/server/peer_server_handlers.go
+++ b/server/peer_server_handlers.go
@@ -15,7 +15,7 @@ import (
 
 // Get all the current logs
 func (ps *PeerServer) GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
-	log.Debugf("[recv] GET %s/log", ps.url)
+	log.Debugf("[recv] GET %s/log", ps.Config.URL)
 	w.Header().Set("Content-Type", "application/json")
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(ps.raftServer.LogEntries())
@@ -27,11 +27,11 @@ func (ps *PeerServer) VoteHttpHandler(w http.ResponseWriter, req *http.Request)
 
 	if _, err := rvreq.Decode(req.Body); err != nil {
 		http.Error(w, "", http.StatusBadRequest)
-		log.Warnf("[recv] BADREQUEST %s/vote [%v]", ps.url, err)
+		log.Warnf("[recv] BADREQUEST %s/vote [%v]", ps.Config.URL, err)
 		return
 	}
 
-	log.Debugf("[recv] POST %s/vote [%s]", ps.url, rvreq.CandidateName)
+	log.Debugf("[recv] POST %s/vote [%s]", ps.Config.URL, rvreq.CandidateName)
 
 	resp := ps.raftServer.RequestVote(rvreq)
 
@@ -55,11 +55,11 @@ func (ps *PeerServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http.
 
 	if _, err := aereq.Decode(req.Body); err != nil {
 		http.Error(w, "", http.StatusBadRequest)
-		log.Warnf("[recv] BADREQUEST %s/log/append [%v]", ps.url, err)
+		log.Warnf("[recv] BADREQUEST %s/log/append [%v]", ps.Config.URL, err)
 		return
 	}
 
-	log.Debugf("[recv] POST %s/log/append [%d]", ps.url, len(aereq.Entries))
+	log.Debugf("[recv] POST %s/log/append [%d]", ps.Config.URL, len(aereq.Entries))
 
 	ps.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength))
 
@@ -90,11 +90,11 @@ func (ps *PeerServer) SnapshotHttpHandler(w http.ResponseWriter, req *http.Reque
 
 	if _, err := ssreq.Decode(req.Body); err != nil {
 		http.Error(w, "", http.StatusBadRequest)
-		log.Warnf("[recv] BADREQUEST %s/snapshot [%v]", ps.url, err)
+		log.Warnf("[recv] BADREQUEST %s/snapshot [%v]", ps.Config.URL, err)
 		return
 	}
 
-	log.Debugf("[recv] POST %s/snapshot", ps.url)
+	log.Debugf("[recv] POST %s/snapshot", ps.Config.URL)
 
 	resp := ps.raftServer.RequestSnapshot(ssreq)
 
@@ -117,11 +117,11 @@ func (ps *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *ht
 
 	if _, err := ssrreq.Decode(req.Body); err != nil {
 		http.Error(w, "", http.StatusBadRequest)
-		log.Warnf("[recv] BADREQUEST %s/snapshotRecovery [%v]", ps.url, err)
+		log.Warnf("[recv] BADREQUEST %s/snapshotRecovery [%v]", ps.Config.URL, err)
 		return
 	}
 
-	log.Debugf("[recv] POST %s/snapshotRecovery", ps.url)
+	log.Debugf("[recv] POST %s/snapshotRecovery", ps.Config.URL)
 
 	resp := ps.raftServer.SnapshotRecoveryRequest(ssrreq)
 
@@ -140,7 +140,7 @@ func (ps *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *ht
 
 // Get the port that listening for etcd connecting of the server
 func (ps *PeerServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) {
-	log.Debugf("[recv] Get %s/etcdURL/ ", ps.url)
+	log.Debugf("[recv] Get %s/etcdURL/ ", ps.Config.URL)
 	w.WriteHeader(http.StatusOK)
 	w.Write([]byte(ps.server.URL()))
 }
@@ -195,21 +195,21 @@ func (ps *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request
 
 // Response to the name request
 func (ps *PeerServer) NameHttpHandler(w http.ResponseWriter, req *http.Request) {
-	log.Debugf("[recv] Get %s/name/ ", ps.url)
+	log.Debugf("[recv] Get %s/name/ ", ps.Config.URL)
 	w.WriteHeader(http.StatusOK)
-	w.Write([]byte(ps.name))
+	w.Write([]byte(ps.Config.Name))
 }
 
 // Response to the name request
 func (ps *PeerServer) VersionHttpHandler(w http.ResponseWriter, req *http.Request) {
-	log.Debugf("[recv] Get %s/version/ ", ps.url)
+	log.Debugf("[recv] Get %s/version/ ", ps.Config.URL)
 	w.WriteHeader(http.StatusOK)
 	w.Write([]byte(strconv.Itoa(ps.store.Version())))
 }
 
 // Checks whether a given version is supported.
 func (ps *PeerServer) VersionCheckHttpHandler(w http.ResponseWriter, req *http.Request) {
-	log.Debugf("[recv] Get %s%s ", ps.url, req.URL.Path)
+	log.Debugf("[recv] Get %s%s ", ps.Config.URL, req.URL.Path)
 	vars := mux.Vars(req)
 	version, _ := strconv.Atoi(vars["version"])
 	if version >= store.MinVersion() && version <= store.MaxVersion() {
@@ -221,7 +221,7 @@ func (ps *PeerServer) VersionCheckHttpHandler(w http.ResponseWriter, req *http.R
 
 // Upgrades the current store version to the next version.
 func (ps *PeerServer) UpgradeHttpHandler(w http.ResponseWriter, req *http.Request) {
-	log.Debugf("[recv] Get %s/version", ps.url)
+	log.Debugf("[recv] Get %s/version", ps.Config.URL)
 
 	// Check if upgrade is possible for all nodes.
 	if err := ps.Upgradable(); err != nil {
diff --git a/server/transporter.go b/server/transporter.go
index 22e113605..1aa5442fd 100644
--- a/server/transporter.go
+++ b/server/transporter.go
@@ -27,8 +27,8 @@ type dialer func(network, addr string) (net.Conn, error)
 // whether the user give the server cert and key
 func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) *transporter {
 	// names for each type of timeout, for the sake of clarity
-	dialTimeout := (3 * peerServer.HeartbeatTimeout) + peerServer.ElectionTimeout
-	responseHeaderTimeout := (3 * peerServer.HeartbeatTimeout) + peerServer.ElectionTimeout
+	dialTimeout := (3 * peerServer.Config.HeartbeatTimeout) + peerServer.Config.ElectionTimeout
+	responseHeaderTimeout := (3 * peerServer.Config.HeartbeatTimeout) + peerServer.Config.ElectionTimeout
 
 	t := transporter{}
 
@@ -227,7 +227,7 @@ func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error)
 // Cancel the on fly HTTP transaction when timeout happens.
 func (t *transporter) CancelWhenTimeout(req *http.Request) {
 	go func() {
-		time.Sleep(t.peerServer.HeartbeatTimeout)
+		time.Sleep(t.peerServer.Config.HeartbeatTimeout)
 		t.transport.CancelRequest(req)
 	}()
 }
diff --git a/tests/server_utils.go b/tests/server_utils.go
index 6e3d369c3..5a3c05b35 100644
--- a/tests/server_utils.go
+++ b/tests/server_utils.go
@@ -26,8 +26,17 @@ func RunServer(f func(*server.Server)) {
 	store := store.New()
 	registry := server.NewRegistry(store)
 
-	ps := server.NewPeerServer(testName, path, "http://"+testRaftURL, testRaftURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, testSnapshotCount, testHeartbeatTimeout, testElectionTimeout, nil)
-	ps.MaxClusterSize = 9
+	psConfig := server.PeerServerConfig{
+		Name:             testName,
+		Path:             path,
+		URL:              "http://"+testRaftURL,
+		BindAddr:         testRaftURL,
+		SnapshotCount:    testSnapshotCount,
+		HeartbeatTimeout: testHeartbeatTimeout,
+		ElectionTimeout:  testElectionTimeout,
+		MaxClusterSize:   9,
+	}
+	ps := server.NewPeerServer(psConfig, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, nil)
 
 	s := server.New(testName, "http://"+testClientURL, testClientURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, ps, registry, store, nil)
 	ps.SetServer(s)
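
For reference, a minimal sketch (not part of the patch) of how a caller might drive the new constructor. PeerServerConfig, NewPeerServer, NewRegistry, store.New, TLSConfig, and TLSInfo come from the patch above; the node name, paths, addresses, timeout values, and import paths are illustrative assumptions, not etcd defaults.

package main

import (
	"time"

	"github.com/coreos/etcd/server"
	"github.com/coreos/etcd/store"
)

func main() {
	// Collaborators, set up the same way tests/server_utils.go does in the patch.
	st := store.New()
	registry := server.NewRegistry(st)

	// Placeholder values; a real caller derives these from its own configuration,
	// as etcd.go does in the patch.
	psConfig := server.PeerServerConfig{
		Name:             "node0",
		Path:             "/tmp/node0",
		URL:              "http://127.0.0.1:7001",
		BindAddr:         "127.0.0.1:7001",
		SnapshotCount:    10000,
		HeartbeatTimeout: 50 * time.Millisecond,
		ElectionTimeout:  200 * time.Millisecond,
		MaxClusterSize:   9,
		RetryTimes:       3,
	}

	// All runtime knobs now travel through the config struct; the remaining
	// arguments are collaborators, with a nil metrics bucket as in the tests.
	ps := server.NewPeerServer(psConfig, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, st, nil)
	_ = ps // a real caller would create the client server and start ps, as etcd.go does
}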