From e6d8d4046dcf199b3ba0b968d320b592bc91b052 Mon Sep 17 00:00:00 2001 From: "Fabrizio (Misto) Milo" Date: Mon, 12 Aug 2013 17:45:52 -0700 Subject: [PATCH] split raft server logic into separate module --- etcd.go | 210 +--------------------------------------------- raft_server.go | 223 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 225 insertions(+), 208 deletions(-) create mode 100644 raft_server.go diff --git a/etcd.go b/etcd.go index e12ec47b0..2306875de 100644 --- a/etcd.go +++ b/etcd.go @@ -1,8 +1,6 @@ package main import ( - "path/filepath" - "bytes" "crypto/tls" "crypto/x509" "encoding/json" @@ -11,13 +9,14 @@ import ( "fmt" "github.com/coreos/etcd/store" "github.com/coreos/etcd/web" - "github.com/coreos/go-raft" + "io/ioutil" "net" "net/http" "net/url" "os" "os/signal" + "path/filepath" "runtime/pprof" "strings" "time" @@ -133,8 +132,6 @@ type TLSConfig struct { // //------------------------------------------------------------------------------ -var raftServer *raft.Server -var raftTransporter transporter var etcdStore *store.Store var info *Info @@ -197,7 +194,6 @@ func main() { if veryVerbose { verbose = true - raft.SetLogLevel(raft.Debug) } if machines != "" { @@ -256,112 +252,6 @@ func main() { } -// Start the raft server -func startRaft(tlsConfig TLSConfig) { - var err error - - raftName := info.Name - - // Create transporter for raft - raftTransporter = newTransporter(tlsConfig.Scheme, tlsConfig.Client) - - // Create raft server - raftServer, err = raft.NewServer(raftName, dirPath, raftTransporter, etcdStore, nil) - - if err != nil { - fatal(err) - } - - // LoadSnapshot - if snapshot { - err = raftServer.LoadSnapshot() - - if err == nil { - debugf("%s finished load snapshot", raftServer.Name()) - } else { - debug(err) - } - } - - raftServer.SetElectionTimeout(ElectionTimeout) - raftServer.SetHeartbeatTimeout(HeartbeatTimeout) - - raftServer.Start() - - if raftServer.IsLogEmpty() { - - // start as a leader in a new cluster - if len(cluster) == 0 { - - time.Sleep(time.Millisecond * 20) - - // leader need to join self as a peer - for { - command := &JoinCommand{ - Name: raftServer.Name(), - RaftURL: argInfo.RaftURL, - EtcdURL: argInfo.EtcdURL, - } - _, err := raftServer.Do(command) - if err == nil { - break - } - } - debugf("%s start as a leader", raftServer.Name()) - - // start as a follower in a existing cluster - } else { - - time.Sleep(time.Millisecond * 20) - - for i := 0; i < retryTimes; i++ { - - success := false - for _, machine := range cluster { - if len(machine) == 0 { - continue - } - err = joinCluster(raftServer, machine, tlsConfig.Scheme) - if err != nil { - if err.Error() == errors[103] { - fmt.Println(err) - os.Exit(1) - } - debugf("cannot join to cluster via machine %s %s", machine, err) - } else { - success = true - break - } - } - - if success { - break - } - - warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval) - time.Sleep(time.Second * RetryInterval) - } - if err != nil { - fatalf("Cannot join the cluster via given machines after %x retries", retryTimes) - } - debugf("%s success join to the cluster", raftServer.Name()) - } - - } else { - // rejoin the previous cluster - debugf("%s restart as a follower", raftServer.Name()) - } - - // open the snapshot - if snapshot { - go monitorSnapshot() - } - - // start to response to raft requests - go startRaftTransport(*info, tlsConfig.Scheme, tlsConfig.Server) - -} - // Create transporter using by raft server // Create http or https transporter based on // whether the user give the server cert and key @@ -387,37 +277,6 @@ func dialTimeout(network, addr string) (net.Conn, error) { return net.DialTimeout(network, addr, HTTPTimeout) } -// Start to listen and response raft command -func startRaftTransport(info Info, scheme string, tlsConf tls.Config) { - u, _ := url.Parse(info.RaftURL) - fmt.Printf("raft server [%s] listening on %s\n", info.Name, u) - - raftMux := http.NewServeMux() - - server := &http.Server{ - Handler: raftMux, - TLSConfig: &tlsConf, - Addr: u.Host, - } - - // internal commands - raftMux.HandleFunc("/name", NameHttpHandler) - raftMux.HandleFunc("/join", JoinHttpHandler) - raftMux.HandleFunc("/vote", VoteHttpHandler) - raftMux.HandleFunc("/log", GetLogHttpHandler) - raftMux.HandleFunc("/log/append", AppendEntriesHttpHandler) - raftMux.HandleFunc("/snapshot", SnapshotHttpHandler) - raftMux.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler) - raftMux.HandleFunc("/etcdURL", EtcdURLHttpHandler) - - if scheme == "http" { - fatal(server.ListenAndServe()) - } else { - fatal(server.ListenAndServeTLS(info.RaftTLS.CertFile, info.RaftTLS.KeyFile)) - } - -} - // Start to listen and response client command func startEtcdTransport(info Info, scheme string, tlsConf tls.Config) { u, _ := url.Parse(info.EtcdURL) @@ -576,68 +435,3 @@ func newCertPool(CAFile string) (tls.ClientAuthType, *x509.CertPool) { return tls.RequireAndVerifyClientCert, certPool } - -// Send join requests to the leader. -func joinCluster(s *raft.Server, raftURL string, scheme string) error { - var b bytes.Buffer - - command := &JoinCommand{ - Name: s.Name(), - RaftURL: info.RaftURL, - EtcdURL: info.EtcdURL, - } - - json.NewEncoder(&b).Encode(command) - - // t must be ok - t, ok := raftServer.Transporter().(transporter) - - if !ok { - panic("wrong type") - } - - joinURL := url.URL{Host: raftURL, Scheme: scheme, Path: "/join"} - - debugf("Send Join Request to %s", raftURL) - - resp, err := t.Post(joinURL.String(), &b) - - for { - if err != nil { - return fmt.Errorf("Unable to join: %v", err) - } - if resp != nil { - defer resp.Body.Close() - if resp.StatusCode == http.StatusOK { - return nil - } - if resp.StatusCode == http.StatusTemporaryRedirect { - - address := resp.Header.Get("Location") - debugf("Send Join Request to %s", address) - - json.NewEncoder(&b).Encode(command) - - resp, err = t.Post(address, &b) - - } else if resp.StatusCode == http.StatusBadRequest { - debug("Reach max number machines in the cluster") - return fmt.Errorf(errors[103]) - } else { - return fmt.Errorf("Unable to join") - } - } - - } - return fmt.Errorf("Unable to join: %v", err) -} - -// Register commands to raft server -func registerCommands() { - raft.RegisterCommand(&JoinCommand{}) - raft.RegisterCommand(&SetCommand{}) - raft.RegisterCommand(&GetCommand{}) - raft.RegisterCommand(&DeleteCommand{}) - raft.RegisterCommand(&WatchCommand{}) - raft.RegisterCommand(&TestAndSetCommand{}) -} diff --git a/raft_server.go b/raft_server.go new file mode 100644 index 000000000..03a62fa65 --- /dev/null +++ b/raft_server.go @@ -0,0 +1,223 @@ +package main + +import ( + "bytes" + "crypto/tls" + "encoding/json" + "fmt" + "net/http" + "net/url" + "os" + "time" + + "github.com/coreos/go-raft" +) + +var raftTransporter transporter +var raftServer *raft.Server + +// Start the raft server +func startRaft(tlsConfig TLSConfig) { + if veryVerbose { + raft.SetLogLevel(raft.Debug) + } + + var err error + + raftName := info.Name + + // Create transporter for raft + raftTransporter = newTransporter(tlsConfig.Scheme, tlsConfig.Client) + + // Create raft server + raftServer, err = raft.NewServer(raftName, dirPath, raftTransporter, etcdStore, nil) + + if err != nil { + fatal(err) + } + + // LoadSnapshot + if snapshot { + err = raftServer.LoadSnapshot() + + if err == nil { + debugf("%s finished load snapshot", raftServer.Name()) + } else { + debug(err) + } + } + + raftServer.SetElectionTimeout(ElectionTimeout) + raftServer.SetHeartbeatTimeout(HeartbeatTimeout) + + raftServer.Start() + + if raftServer.IsLogEmpty() { + + // start as a leader in a new cluster + if len(cluster) == 0 { + + time.Sleep(time.Millisecond * 20) + + // leader need to join self as a peer + for { + command := &JoinCommand{ + Name: raftServer.Name(), + RaftURL: argInfo.RaftURL, + EtcdURL: argInfo.EtcdURL, + } + _, err := raftServer.Do(command) + if err == nil { + break + } + } + debugf("%s start as a leader", raftServer.Name()) + + // start as a follower in a existing cluster + } else { + + time.Sleep(time.Millisecond * 20) + + for i := 0; i < retryTimes; i++ { + + success := false + for _, machine := range cluster { + if len(machine) == 0 { + continue + } + err = joinCluster(raftServer, machine, tlsConfig.Scheme) + if err != nil { + if err.Error() == errors[103] { + fmt.Println(err) + os.Exit(1) + } + debugf("cannot join to cluster via machine %s %s", machine, err) + } else { + success = true + break + } + } + + if success { + break + } + + warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval) + time.Sleep(time.Second * RetryInterval) + } + if err != nil { + fatalf("Cannot join the cluster via given machines after %x retries", retryTimes) + } + debugf("%s success join to the cluster", raftServer.Name()) + } + + } else { + // rejoin the previous cluster + debugf("%s restart as a follower", raftServer.Name()) + } + + // open the snapshot + if snapshot { + go monitorSnapshot() + } + + // start to response to raft requests + go startRaftTransport(*info, tlsConfig.Scheme, tlsConfig.Server) + +} + +// Start to listen and response raft command +func startRaftTransport(info Info, scheme string, tlsConf tls.Config) { + u, _ := url.Parse(info.RaftURL) + fmt.Printf("raft server [%s] listening on %s\n", info.Name, u) + + raftMux := http.NewServeMux() + + server := &http.Server{ + Handler: raftMux, + TLSConfig: &tlsConf, + Addr: u.Host, + } + + // internal commands + raftMux.HandleFunc("/name", NameHttpHandler) + raftMux.HandleFunc("/join", JoinHttpHandler) + raftMux.HandleFunc("/vote", VoteHttpHandler) + raftMux.HandleFunc("/log", GetLogHttpHandler) + raftMux.HandleFunc("/log/append", AppendEntriesHttpHandler) + raftMux.HandleFunc("/snapshot", SnapshotHttpHandler) + raftMux.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler) + raftMux.HandleFunc("/etcdURL", EtcdURLHttpHandler) + + if scheme == "http" { + fatal(server.ListenAndServe()) + } else { + fatal(server.ListenAndServeTLS(info.RaftTLS.CertFile, info.RaftTLS.KeyFile)) + } + +} + +// Send join requests to the leader. +func joinCluster(s *raft.Server, raftURL string, scheme string) error { + var b bytes.Buffer + + command := &JoinCommand{ + Name: s.Name(), + RaftURL: info.RaftURL, + EtcdURL: info.EtcdURL, + } + + json.NewEncoder(&b).Encode(command) + + // t must be ok + t, ok := raftServer.Transporter().(transporter) + + if !ok { + panic("wrong type") + } + + joinURL := url.URL{Host: raftURL, Scheme: scheme, Path: "/join"} + + debugf("Send Join Request to %s", raftURL) + + resp, err := t.Post(joinURL.String(), &b) + + for { + if err != nil { + return fmt.Errorf("Unable to join: %v", err) + } + if resp != nil { + defer resp.Body.Close() + if resp.StatusCode == http.StatusOK { + return nil + } + if resp.StatusCode == http.StatusTemporaryRedirect { + + address := resp.Header.Get("Location") + debugf("Send Join Request to %s", address) + + json.NewEncoder(&b).Encode(command) + + resp, err = t.Post(address, &b) + + } else if resp.StatusCode == http.StatusBadRequest { + debug("Reach max number machines in the cluster") + return fmt.Errorf(errors[103]) + } else { + return fmt.Errorf("Unable to join") + } + } + + } + return fmt.Errorf("Unable to join: %v", err) +} + +// Register commands to raft server +func registerCommands() { + raft.RegisterCommand(&JoinCommand{}) + raft.RegisterCommand(&SetCommand{}) + raft.RegisterCommand(&GetCommand{}) + raft.RegisterCommand(&DeleteCommand{}) + raft.RegisterCommand(&WatchCommand{}) + raft.RegisterCommand(&TestAndSetCommand{}) +}