diff --git a/config.go b/config.go new file mode 100644 index 000000000..ebd2c5531 --- /dev/null +++ b/config.go @@ -0,0 +1,139 @@ +package main + +import ( + "crypto/tls" + "crypto/x509" + "encoding/json" + "encoding/pem" + "io/ioutil" + "os" + "path/filepath" +) + +//-------------------------------------- +// Config +//-------------------------------------- + +func parseInfo(path string) *Info { + file, err := os.Open(path) + + if err != nil { + return nil + } + defer file.Close() + + info := &Info{} + + content, err := ioutil.ReadAll(file) + if err != nil { + fatalf("Unable to read info: %v", err) + return nil + } + + if err = json.Unmarshal(content, &info); err != nil { + fatalf("Unable to parse info: %v", err) + return nil + } + + return info +} + +// Get the server info from previous conf file +// or from the user +func getInfo(path string) *Info { + + // Read in the server info if available. + infoPath := filepath.Join(path, "info") + + // Delete the old configuration if exist + if force { + logPath := filepath.Join(path, "log") + confPath := filepath.Join(path, "conf") + snapshotPath := filepath.Join(path, "snapshot") + os.Remove(infoPath) + os.Remove(logPath) + os.Remove(confPath) + os.RemoveAll(snapshotPath) + } + + info := parseInfo(infoPath) + if info != nil { + infof("Found node configuration in '%s'. Ignoring flags", infoPath) + return info + } + + info = &argInfo + + // Write to file. + content, _ := json.MarshalIndent(info, "", " ") + content = []byte(string(content) + "\n") + if err := ioutil.WriteFile(infoPath, content, 0644); err != nil { + fatalf("Unable to write info to file: %v", err) + } + + infof("Wrote node configuration to '%s'", infoPath) + + return info +} + +func tlsConfigFromInfo(info TLSInfo) (t TLSConfig, ok bool) { + var keyFile, certFile, CAFile string + var tlsCert tls.Certificate + var err error + + t.Scheme = "http" + + keyFile = info.KeyFile + certFile = info.CertFile + CAFile = info.CAFile + + // If the user do not specify key file, cert file and + // CA file, the type will be HTTP + if keyFile == "" && certFile == "" && CAFile == "" { + return t, true + } + + // both the key and cert must be present + if keyFile == "" || certFile == "" { + return t, false + } + + tlsCert, err = tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + fatal(err) + } + + t.Scheme = "https" + t.Server.ClientAuth, t.Server.ClientCAs = newCertPool(CAFile) + + // The client should trust the RootCA that the Server uses since + // everyone is a peer in the network. + t.Client.Certificates = []tls.Certificate{tlsCert} + t.Client.RootCAs = t.Server.ClientCAs + + return t, true +} + +// newCertPool creates x509 certPool and corresponding Auth Type. +// If the given CAfile is valid, add the cert into the pool and verify the clients' +// certs against the cert in the pool. +// If the given CAfile is empty, do not verify the clients' cert. +// If the given CAfile is not valid, fatal. +func newCertPool(CAFile string) (tls.ClientAuthType, *x509.CertPool) { + if CAFile == "" { + return tls.NoClientCert, nil + } + pemByte, err := ioutil.ReadFile(CAFile) + check(err) + + block, pemByte := pem.Decode(pemByte) + + cert, err := x509.ParseCertificate(block.Bytes) + check(err) + + certPool := x509.NewCertPool() + + certPool.AddCert(cert) + + return tls.RequireAndVerifyClientCert, certPool +} diff --git a/etcd.go b/etcd.go index e12ec47b0..184ab9e63 100644 --- a/etcd.go +++ b/etcd.go @@ -1,19 +1,11 @@ package main import ( - "path/filepath" - "bytes" "crypto/tls" - "crypto/x509" - "encoding/json" - "encoding/pem" "flag" - "fmt" "github.com/coreos/etcd/store" "github.com/coreos/etcd/web" - "github.com/coreos/go-raft" "io/ioutil" - "net" "net/http" "net/url" "os" @@ -133,8 +125,6 @@ type TLSConfig struct { // //------------------------------------------------------------------------------ -var raftServer *raft.Server -var raftTransporter transporter var etcdStore *store.Store var info *Info @@ -144,30 +134,6 @@ var info *Info // //------------------------------------------------------------------------------ -// sanitizeURL will cleanup a host string in the format hostname:port and -// attach a schema. -func sanitizeURL(host string, defaultScheme string) string { - // Blank URLs are fine input, just return it - if len(host) == 0 { - return host - } - - p, err := url.Parse(host) - if err != nil { - fatal(err) - } - - // Make sure the host is in Host:Port format - _, _, err = net.SplitHostPort(host) - if err != nil { - fatal(err) - } - - p = &url.URL{Host: host, Scheme: defaultScheme} - - return p.String() -} - //-------------------------------------- // Main //-------------------------------------- @@ -187,7 +153,7 @@ func main() { signal.Notify(c, os.Interrupt) go func() { for sig := range c { - fmt.Printf("captured %v, stopping profiler and exiting..", sig) + infof("captured %v, stopping profiler and exiting..", sig) pprof.StopCPUProfile() os.Exit(1) } @@ -197,7 +163,6 @@ func main() { if veryVerbose { verbose = true - raft.SetLogLevel(raft.Debug) } if machines != "" { @@ -256,388 +221,23 @@ func main() { } -// Start the raft server -func startRaft(tlsConfig TLSConfig) { - var err error - - raftName := info.Name - - // Create transporter for raft - raftTransporter = newTransporter(tlsConfig.Scheme, tlsConfig.Client) - - // Create raft server - raftServer, err = raft.NewServer(raftName, dirPath, raftTransporter, etcdStore, nil) - - if err != nil { - fatal(err) - } - - // LoadSnapshot - if snapshot { - err = raftServer.LoadSnapshot() - - if err == nil { - debugf("%s finished load snapshot", raftServer.Name()) - } else { - debug(err) - } - } - - raftServer.SetElectionTimeout(ElectionTimeout) - raftServer.SetHeartbeatTimeout(HeartbeatTimeout) - - raftServer.Start() - - if raftServer.IsLogEmpty() { - - // start as a leader in a new cluster - if len(cluster) == 0 { - - time.Sleep(time.Millisecond * 20) - - // leader need to join self as a peer - for { - command := &JoinCommand{ - Name: raftServer.Name(), - RaftURL: argInfo.RaftURL, - EtcdURL: argInfo.EtcdURL, - } - _, err := raftServer.Do(command) - if err == nil { - break - } - } - debugf("%s start as a leader", raftServer.Name()) - - // start as a follower in a existing cluster - } else { - - time.Sleep(time.Millisecond * 20) - - for i := 0; i < retryTimes; i++ { - - success := false - for _, machine := range cluster { - if len(machine) == 0 { - continue - } - err = joinCluster(raftServer, machine, tlsConfig.Scheme) - if err != nil { - if err.Error() == errors[103] { - fmt.Println(err) - os.Exit(1) - } - debugf("cannot join to cluster via machine %s %s", machine, err) - } else { - success = true - break - } - } - - if success { - break - } - - warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval) - time.Sleep(time.Second * RetryInterval) - } - if err != nil { - fatalf("Cannot join the cluster via given machines after %x retries", retryTimes) - } - debugf("%s success join to the cluster", raftServer.Name()) - } - - } else { - // rejoin the previous cluster - debugf("%s restart as a follower", raftServer.Name()) - } - - // open the snapshot - if snapshot { - go monitorSnapshot() - } - - // start to response to raft requests - go startRaftTransport(*info, tlsConfig.Scheme, tlsConfig.Server) - -} - -// Create transporter using by raft server -// Create http or https transporter based on -// whether the user give the server cert and key -func newTransporter(scheme string, tlsConf tls.Config) transporter { - t := transporter{} - - tr := &http.Transport{ - Dial: dialTimeout, - } - - if scheme == "https" { - tr.TLSClientConfig = &tlsConf - tr.DisableCompression = true - } - - t.client = &http.Client{Transport: tr} - - return t -} - -// Dial with timeout -func dialTimeout(network, addr string) (net.Conn, error) { - return net.DialTimeout(network, addr, HTTPTimeout) -} - -// Start to listen and response raft command -func startRaftTransport(info Info, scheme string, tlsConf tls.Config) { - u, _ := url.Parse(info.RaftURL) - fmt.Printf("raft server [%s] listening on %s\n", info.Name, u) - - raftMux := http.NewServeMux() - - server := &http.Server{ - Handler: raftMux, - TLSConfig: &tlsConf, - Addr: u.Host, - } - - // internal commands - raftMux.HandleFunc("/name", NameHttpHandler) - raftMux.HandleFunc("/join", JoinHttpHandler) - raftMux.HandleFunc("/vote", VoteHttpHandler) - raftMux.HandleFunc("/log", GetLogHttpHandler) - raftMux.HandleFunc("/log/append", AppendEntriesHttpHandler) - raftMux.HandleFunc("/snapshot", SnapshotHttpHandler) - raftMux.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler) - raftMux.HandleFunc("/etcdURL", EtcdURLHttpHandler) - - if scheme == "http" { - fatal(server.ListenAndServe()) - } else { - fatal(server.ListenAndServeTLS(info.RaftTLS.CertFile, info.RaftTLS.KeyFile)) - } - -} - // Start to listen and response client command func startEtcdTransport(info Info, scheme string, tlsConf tls.Config) { - u, _ := url.Parse(info.EtcdURL) - fmt.Printf("etcd server [%s] listening on %s\n", info.Name, u) + u, err := url.Parse(info.EtcdURL) + if err != nil { + fatalf("invalid url '%s': %s", info.EtcdURL, err) + } + infof("etcd server [%s:%s]", info.Name, u) - etcdMux := http.NewServeMux() - - server := &http.Server{ - Handler: etcdMux, + server := http.Server{ + Handler: NewEtcdMuxer(), TLSConfig: &tlsConf, Addr: u.Host, } - // external commands - etcdMux.HandleFunc("/"+version+"/keys/", Multiplexer) - etcdMux.HandleFunc("/"+version+"/watch/", WatchHttpHandler) - etcdMux.HandleFunc("/leader", LeaderHttpHandler) - etcdMux.HandleFunc("/machines", MachinesHttpHandler) - etcdMux.HandleFunc("/", VersionHttpHandler) - etcdMux.HandleFunc("/stats", StatsHttpHandler) - etcdMux.HandleFunc("/test/", TestHttpHandler) - if scheme == "http" { fatal(server.ListenAndServe()) } else { fatal(server.ListenAndServeTLS(info.EtcdTLS.CertFile, info.EtcdTLS.KeyFile)) } } - -//-------------------------------------- -// Config -//-------------------------------------- - -func tlsConfigFromInfo(info TLSInfo) (t TLSConfig, ok bool) { - var keyFile, certFile, CAFile string - var tlsCert tls.Certificate - var err error - - t.Scheme = "http" - - keyFile = info.KeyFile - certFile = info.CertFile - CAFile = info.CAFile - - // If the user do not specify key file, cert file and - // CA file, the type will be HTTP - if keyFile == "" && certFile == "" && CAFile == "" { - return t, true - } - - // both the key and cert must be present - if keyFile == "" || certFile == "" { - return t, false - } - - tlsCert, err = tls.LoadX509KeyPair(certFile, keyFile) - if err != nil { - fatal(err) - } - - t.Scheme = "https" - t.Server.ClientAuth, t.Server.ClientCAs = newCertPool(CAFile) - - // The client should trust the RootCA that the Server uses since - // everyone is a peer in the network. - t.Client.Certificates = []tls.Certificate{tlsCert} - t.Client.RootCAs = t.Server.ClientCAs - - return t, true -} - -func parseInfo(path string) *Info { - file, err := os.Open(path) - - if err != nil { - return nil - } - - info := &Info{} - defer file.Close() - - content, err := ioutil.ReadAll(file) - if err != nil { - fatalf("Unable to read info: %v", err) - return nil - } - - if err = json.Unmarshal(content, &info); err != nil { - fatalf("Unable to parse info: %v", err) - return nil - } - - return info -} - -// Get the server info from previous conf file -// or from the user -func getInfo(path string) *Info { - - // Read in the server info if available. - infoPath := filepath.Join(path, "info") - - // Delete the old configuration if exist - if force { - logPath := filepath.Join(path, "log") - confPath := filepath.Join(path, "conf") - snapshotPath := filepath.Join(path, "snapshot") - os.Remove(infoPath) - os.Remove(logPath) - os.Remove(confPath) - os.RemoveAll(snapshotPath) - } - - info := parseInfo(infoPath) - if info != nil { - fmt.Printf("Found node configuration in '%s'. Ignoring flags.\n", infoPath) - return info - } - - info = &argInfo - - // Write to file. - content, _ := json.MarshalIndent(info, "", " ") - content = []byte(string(content) + "\n") - if err := ioutil.WriteFile(infoPath, content, 0644); err != nil { - fatalf("Unable to write info to file: %v", err) - } - - fmt.Printf("Wrote node configuration to '%s'.\n", infoPath) - - return info -} - -// newCertPool creates x509 certPool and corresponding Auth Type. -// If the given CAfile is valid, add the cert into the pool and verify the clients' -// certs against the cert in the pool. -// If the given CAfile is empty, do not verify the clients' cert. -// If the given CAfile is not valid, fatal. -func newCertPool(CAFile string) (tls.ClientAuthType, *x509.CertPool) { - if CAFile == "" { - return tls.NoClientCert, nil - } - pemByte, _ := ioutil.ReadFile(CAFile) - - block, pemByte := pem.Decode(pemByte) - - cert, err := x509.ParseCertificate(block.Bytes) - - if err != nil { - fatal(err) - } - - certPool := x509.NewCertPool() - - certPool.AddCert(cert) - - return tls.RequireAndVerifyClientCert, certPool -} - -// Send join requests to the leader. -func joinCluster(s *raft.Server, raftURL string, scheme string) error { - var b bytes.Buffer - - command := &JoinCommand{ - Name: s.Name(), - RaftURL: info.RaftURL, - EtcdURL: info.EtcdURL, - } - - json.NewEncoder(&b).Encode(command) - - // t must be ok - t, ok := raftServer.Transporter().(transporter) - - if !ok { - panic("wrong type") - } - - joinURL := url.URL{Host: raftURL, Scheme: scheme, Path: "/join"} - - debugf("Send Join Request to %s", raftURL) - - resp, err := t.Post(joinURL.String(), &b) - - for { - if err != nil { - return fmt.Errorf("Unable to join: %v", err) - } - if resp != nil { - defer resp.Body.Close() - if resp.StatusCode == http.StatusOK { - return nil - } - if resp.StatusCode == http.StatusTemporaryRedirect { - - address := resp.Header.Get("Location") - debugf("Send Join Request to %s", address) - - json.NewEncoder(&b).Encode(command) - - resp, err = t.Post(address, &b) - - } else if resp.StatusCode == http.StatusBadRequest { - debug("Reach max number machines in the cluster") - return fmt.Errorf(errors[103]) - } else { - return fmt.Errorf("Unable to join") - } - } - - } - return fmt.Errorf("Unable to join: %v", err) -} - -// Register commands to raft server -func registerCommands() { - raft.RegisterCommand(&JoinCommand{}) - raft.RegisterCommand(&SetCommand{}) - raft.RegisterCommand(&GetCommand{}) - raft.RegisterCommand(&DeleteCommand{}) - raft.RegisterCommand(&WatchCommand{}) - raft.RegisterCommand(&TestAndSetCommand{}) -} diff --git a/etcd_handlers.go b/etcd_handlers.go index 1c45d3487..b6f8c793a 100644 --- a/etcd_handlers.go +++ b/etcd_handlers.go @@ -12,6 +12,19 @@ import ( // Handlers to handle etcd-store related request via etcd url //------------------------------------------------------------------- +func NewEtcdMuxer() *http.ServeMux { + // external commands + etcdMux := http.NewServeMux() + etcdMux.HandleFunc("/"+version+"/keys/", Multiplexer) + etcdMux.HandleFunc("/"+version+"/watch/", WatchHttpHandler) + etcdMux.HandleFunc("/leader", LeaderHttpHandler) + etcdMux.HandleFunc("/machines", MachinesHttpHandler) + etcdMux.HandleFunc("/", VersionHttpHandler) + etcdMux.HandleFunc("/stats", StatsHttpHandler) + etcdMux.HandleFunc("/test/", TestHttpHandler) + return etcdMux +} + // Multiplex GET/POST/DELETE request to corresponding handlers func Multiplexer(w http.ResponseWriter, req *http.Request) { @@ -152,8 +165,9 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, etcd bool) { return } } else { + leader := raftServer.Leader() // current no leader - if raftServer.Leader() == "" { + if leader == "" { (*w).WriteHeader(http.StatusInternalServerError) (*w).Write(newJsonError(300, "")) return @@ -166,10 +180,10 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, etcd bool) { var url string if etcd { - etcdAddr, _ := nameToEtcdURL(raftServer.Leader()) + etcdAddr, _ := nameToEtcdURL(leader) url = etcdAddr + path } else { - raftAddr, _ := nameToRaftURL(raftServer.Leader()) + raftAddr, _ := nameToRaftURL(leader) url = raftAddr + path } diff --git a/raft_server.go b/raft_server.go new file mode 100644 index 000000000..d97aa803c --- /dev/null +++ b/raft_server.go @@ -0,0 +1,221 @@ +package main + +import ( + "bytes" + "crypto/tls" + "encoding/json" + "fmt" + "net/http" + "net/url" + "time" + + "github.com/coreos/go-raft" +) + +var raftTransporter transporter +var raftServer *raft.Server + +// Start the raft server +func startRaft(tlsConfig TLSConfig) { + if veryVerbose { + raft.SetLogLevel(raft.Debug) + } + + var err error + + raftName := info.Name + + // Create transporter for raft + raftTransporter = newTransporter(tlsConfig.Scheme, tlsConfig.Client) + + // Create raft server + raftServer, err = raft.NewServer(raftName, dirPath, raftTransporter, etcdStore, nil) + + if err != nil { + fatal(err) + } + + // LoadSnapshot + if snapshot { + err = raftServer.LoadSnapshot() + + if err == nil { + debugf("%s finished load snapshot", raftServer.Name()) + } else { + debug(err) + } + } + + raftServer.SetElectionTimeout(ElectionTimeout) + raftServer.SetHeartbeatTimeout(HeartbeatTimeout) + + raftServer.Start() + + if raftServer.IsLogEmpty() { + + // start as a leader in a new cluster + if len(cluster) == 0 { + + time.Sleep(time.Millisecond * 20) + + // leader need to join self as a peer + for { + command := &JoinCommand{ + Name: raftServer.Name(), + RaftURL: argInfo.RaftURL, + EtcdURL: argInfo.EtcdURL, + } + _, err := raftServer.Do(command) + if err == nil { + break + } + } + debugf("%s start as a leader", raftServer.Name()) + + // start as a follower in a existing cluster + } else { + + time.Sleep(time.Millisecond * 20) + + for i := 0; i < retryTimes; i++ { + + success := false + for _, machine := range cluster { + if len(machine) == 0 { + continue + } + err = joinCluster(raftServer, machine, tlsConfig.Scheme) + if err != nil { + if err.Error() == errors[103] { + fatal(err) + } + debugf("cannot join to cluster via machine %s %s", machine, err) + } else { + success = true + break + } + } + + if success { + break + } + + warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval) + time.Sleep(time.Second * RetryInterval) + } + if err != nil { + fatalf("Cannot join the cluster via given machines after %x retries", retryTimes) + } + debugf("%s success join to the cluster", raftServer.Name()) + } + + } else { + // rejoin the previous cluster + debugf("%s restart as a follower", raftServer.Name()) + } + + // open the snapshot + if snapshot { + go monitorSnapshot() + } + + // start to response to raft requests + go startRaftTransport(*info, tlsConfig.Scheme, tlsConfig.Server) + +} + +// Start to listen and response raft command +func startRaftTransport(info Info, scheme string, tlsConf tls.Config) { + u, _ := url.Parse(info.RaftURL) + infof("raft server [%s:%s]", info.Name, u) + + raftMux := http.NewServeMux() + + server := &http.Server{ + Handler: raftMux, + TLSConfig: &tlsConf, + Addr: u.Host, + } + + // internal commands + raftMux.HandleFunc("/name", NameHttpHandler) + raftMux.HandleFunc("/join", JoinHttpHandler) + raftMux.HandleFunc("/vote", VoteHttpHandler) + raftMux.HandleFunc("/log", GetLogHttpHandler) + raftMux.HandleFunc("/log/append", AppendEntriesHttpHandler) + raftMux.HandleFunc("/snapshot", SnapshotHttpHandler) + raftMux.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler) + raftMux.HandleFunc("/etcdURL", EtcdURLHttpHandler) + + if scheme == "http" { + fatal(server.ListenAndServe()) + } else { + fatal(server.ListenAndServeTLS(info.RaftTLS.CertFile, info.RaftTLS.KeyFile)) + } + +} + +// Send join requests to the leader. +func joinCluster(s *raft.Server, raftURL string, scheme string) error { + var b bytes.Buffer + + command := &JoinCommand{ + Name: s.Name(), + RaftURL: info.RaftURL, + EtcdURL: info.EtcdURL, + } + + json.NewEncoder(&b).Encode(command) + + // t must be ok + t, ok := raftServer.Transporter().(transporter) + + if !ok { + panic("wrong type") + } + + joinURL := url.URL{Host: raftURL, Scheme: scheme, Path: "/join"} + + debugf("Send Join Request to %s", raftURL) + + resp, err := t.Post(joinURL.String(), &b) + + for { + if err != nil { + return fmt.Errorf("Unable to join: %v", err) + } + if resp != nil { + defer resp.Body.Close() + if resp.StatusCode == http.StatusOK { + return nil + } + if resp.StatusCode == http.StatusTemporaryRedirect { + + address := resp.Header.Get("Location") + debugf("Send Join Request to %s", address) + + json.NewEncoder(&b).Encode(command) + + resp, err = t.Post(address, &b) + + } else if resp.StatusCode == http.StatusBadRequest { + debug("Reach max number machines in the cluster") + return fmt.Errorf(errors[103]) + } else { + return fmt.Errorf("Unable to join") + } + } + + } + return fmt.Errorf("Unable to join: %v", err) +} + +// Register commands to raft server +func registerCommands() { + raft.RegisterCommand(&JoinCommand{}) + raft.RegisterCommand(&SetCommand{}) + raft.RegisterCommand(&GetCommand{}) + raft.RegisterCommand(&DeleteCommand{}) + raft.RegisterCommand(&WatchCommand{}) + raft.RegisterCommand(&TestAndSetCommand{}) +} diff --git a/transporter.go b/transporter.go index 59a385bb3..74bd33da9 100644 --- a/transporter.go +++ b/transporter.go @@ -2,10 +2,12 @@ package main import ( "bytes" + "crypto/tls" "encoding/json" "fmt" "github.com/coreos/go-raft" "io" + "net" "net/http" ) @@ -14,6 +16,31 @@ type transporter struct { client *http.Client } +// Create transporter using by raft server +// Create http or https transporter based on +// whether the user give the server cert and key +func newTransporter(scheme string, tlsConf tls.Config) transporter { + t := transporter{} + + tr := &http.Transport{ + Dial: dialTimeout, + } + + if scheme == "https" { + tr.TLSClientConfig = &tlsConf + tr.DisableCompression = true + } + + t.client = &http.Client{Transport: tr} + + return t +} + +// Dial with timeout +func dialTimeout(network, addr string) (net.Conn, error) { + return net.DialTimeout(network, addr, HTTPTimeout) +} + // Sends AppendEntries RPCs to a peer when the server is the leader. func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse { var aersp *raft.AppendEntriesResponse diff --git a/util.go b/util.go index e57dfca59..682eafb71 100644 --- a/util.go +++ b/util.go @@ -6,7 +6,9 @@ import ( "github.com/coreos/etcd/web" "io" "log" + "net" "net/http" + "net/url" "os" "strconv" "time" @@ -69,6 +71,36 @@ func encodeJsonResponse(w http.ResponseWriter, status int, data interface{}) { } } +// sanitizeURL will cleanup a host string in the format hostname:port and +// attach a schema. +func sanitizeURL(host string, defaultScheme string) string { + // Blank URLs are fine input, just return it + if len(host) == 0 { + return host + } + + p, err := url.Parse(host) + if err != nil { + fatal(err) + } + + // Make sure the host is in Host:Port format + _, _, err = net.SplitHostPort(host) + if err != nil { + fatal(err) + } + + p = &url.URL{Host: host, Scheme: defaultScheme} + + return p.String() +} + +func check(err error) { + if err != nil { + fatal(err) + } +} + //-------------------------------------- // Log //-------------------------------------- @@ -79,6 +111,10 @@ func init() { logger = log.New(os.Stdout, "[etcd] ", log.Lmicroseconds) } +func infof(msg string, v ...interface{}) { + logger.Printf("INFO "+msg+"\n", v...) +} + func debugf(msg string, v ...interface{}) { if verbose { logger.Printf("DEBUG "+msg+"\n", v...)