// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"context"
	"flag"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
	"os"
	"sort"
	"strings"
	"time"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/embed"
	"github.com/coreos/etcd/pkg/transport"

	"go.uber.org/zap"
)

var lg *zap.Logger

func init() {
	var err error
	lg, err = zap.NewProduction()
	if err != nil {
		panic(err)
	}
}

func main() {
	addr := flag.String("addr", "", "etcd metrics URL to fetch from (empty to use current git branch)")
	enableLog := flag.Bool("server-log", false, "true to enable embedded etcd server logs")
	debug := flag.Bool("debug", false, "true to enable debug logging")
	flag.Parse()

	if *debug {
		lg = zap.NewExample()
	}

	ep := *addr
	if ep == "" {
		uss := newEmbedURLs(4)
		ep = uss[0].String() + "/metrics"

		cfgs := []*embed.Config{embed.NewConfig(), embed.NewConfig()}
		cfgs[0].Name, cfgs[1].Name = "0", "1"
		setupEmbedCfg(cfgs[0], *enableLog, []url.URL{uss[0]}, []url.URL{uss[1]}, []url.URL{uss[1], uss[3]})
		setupEmbedCfg(cfgs[1], *enableLog, []url.URL{uss[2]}, []url.URL{uss[3]}, []url.URL{uss[1], uss[3]})

		type embedAndError struct {
			ec  *embed.Etcd
			err error
		}
		ech := make(chan embedAndError)
		for _, cfg := range cfgs {
			go func(c *embed.Config) {
				e, err := embed.StartEtcd(c)
				if err != nil {
					ech <- embedAndError{err: err}
					return
				}
				<-e.Server.ReadyNotify()
				ech <- embedAndError{ec: e}
			}(cfg)
		}
		for range cfgs {
			ev := <-ech
			if ev.err != nil {
				lg.Panic("failed to start embedded etcd", zap.Error(ev.err))
			}
			defer ev.ec.Close()
		}

		// give enough time for peer-to-peer metrics
		time.Sleep(7 * time.Second)

		lg.Debug("started 2-node embedded etcd cluster")
	}

	lg.Debug("starting etcd-dump-metrics", zap.String("endpoint", ep))

	// send client requests to populate gRPC client-side metrics
	// TODO: enable default metrics initialization in v3.1 and v3.2
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{strings.Replace(ep, "/metrics", "", 1)}})
	if err != nil {
		lg.Panic("failed to create client", zap.Error(err))
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	_, err = cli.Put(ctx, "____test", "")
	if err != nil {
		lg.Panic("failed to write test key", zap.Error(err))
	}
	_, err = cli.Get(ctx, "____test")
	if err != nil {
		lg.Panic("failed to read test key", zap.Error(err))
	}
	_, err = cli.Delete(ctx, "____test")
	if err != nil {
		lg.Panic("failed to delete test key", zap.Error(err))
	}
	cli.Watch(ctx, "____test", clientv3.WithCreatedNotify())

	fmt.Println(getMetrics(ep))
}

func getMetrics(ep string) (m metricSlice) {
	lines, err := fetchMetrics(ep)
	if err != nil {
		lg.Panic("failed to fetch metrics", zap.Error(err))
	}
	mss := parse(lines)
	sort.Sort(metricSlice(mss))
	return mss
}

func (mss metricSlice) String() (s string) {
	ver := "unknown"
	for i, v := range mss {
		if strings.HasPrefix(v.name, "etcd_server_version") {
			ver = v.metrics[0]
		}
		s += v.String()
		if i != len(mss)-1 {
			s += "\n\n"
		}
	}
	return "# server version: " + ver + "\n\n" + s
}

type metricSlice []metric
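
// metricSlice implements sort.Interface so that the dumped metrics are
// printed in ascending name order (getMetrics sorts the parsed slice above).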
func (mss metricSlice) Len() int           { return len(mss) }
func (mss metricSlice) Less(i, j int) bool { return mss[i].name < mss[j].name }
func (mss metricSlice) Swap(i, j int)      { mss[i], mss[j] = mss[j], mss[i] }

type metric struct {
	// raw data for debugging purposes
	raw []string

	// metrics name
	name string

	// metrics description
	desc string

	// metrics type
	tp string

	// aggregates of "grpc_server_handled_total"
	grpcCodes []string

	// keep first 1 and last 4 if histogram or summary
	// otherwise, keep only 1
	metrics []string
}

func (m metric) String() (s string) {
	s += fmt.Sprintf("# name: %q\n", m.name)
	s += fmt.Sprintf("# description: %q\n", m.desc)
	s += fmt.Sprintf("# type: %q\n", m.tp)
	if len(m.grpcCodes) > 0 {
		s += "# gRPC codes: \n"
		for _, c := range m.grpcCodes {
			s += fmt.Sprintf("# - %q\n", c)
		}
	}
	s += strings.Join(m.metrics, "\n")
	return s
}

func parse(lines []string) (mss []metric) {
	m := metric{raw: make([]string, 0), metrics: make([]string, 0)}
	for _, line := range lines {
		if strings.HasPrefix(line, "# HELP ") {
			// add previous metric and initialize
			if m.name != "" {
				mss = append(mss, m)
			}
			m = metric{raw: make([]string, 0), metrics: make([]string, 0)}

			m.raw = append(m.raw, line)
			ss := strings.Split(strings.Replace(line, "# HELP ", "", 1), " ")
			m.name, m.desc = ss[0], strings.Join(ss[1:], " ")
			continue
		}

		// assumes the "# HELP" line precedes "# TYPE" (so m.tp is still empty here)
		if strings.HasPrefix(line, "# TYPE ") {
			m.raw = append(m.raw, line)
			m.tp = strings.Split(strings.Replace(line, "# TYPE "+m.tp, "", 1), " ")[1]
			continue
		}

		m.raw = append(m.raw, line)
		m.metrics = append(m.metrics, strings.Split(line, " ")[0])
	}
	if m.name != "" {
		mss = append(mss, m)
	}

	// aggregate
	for i := range mss {
		/*
			munge data for:
			etcd_network_active_peers{Local="c6c9b5143b47d146",Remote="fbdddd08d7e1608b"}
			etcd_network_peer_sent_bytes_total{To="c6c9b5143b47d146"}
			etcd_network_peer_received_bytes_total{From="0"}
			etcd_network_peer_received_bytes_total{From="fd422379fda50e48"}
			etcd_network_peer_round_trip_time_seconds_bucket{To="91bc3c398fb3c146",le="0.0001"}
			etcd_network_peer_round_trip_time_seconds_bucket{To="fd422379fda50e48",le="0.8192"}
			etcd_network_peer_round_trip_time_seconds_bucket{To="fd422379fda50e48",le="+Inf"}
			etcd_network_peer_round_trip_time_seconds_sum{To="fd422379fda50e48"}
			etcd_network_peer_round_trip_time_seconds_count{To="fd422379fda50e48"}
		*/
		if mss[i].name == "etcd_network_active_peers" {
			mss[i].metrics = []string{`etcd_network_active_peers{Local="LOCAL_NODE_ID",Remote="REMOTE_PEER_NODE_ID"}`}
		}
		if mss[i].name == "etcd_network_peer_sent_bytes_total" {
			mss[i].metrics = []string{`etcd_network_peer_sent_bytes_total{To="REMOTE_PEER_NODE_ID"}`}
		}
		if mss[i].name == "etcd_network_peer_received_bytes_total" {
			mss[i].metrics = []string{`etcd_network_peer_received_bytes_total{From="REMOTE_PEER_NODE_ID"}`}
		}
		if mss[i].tp == "histogram" || mss[i].tp == "summary" {
			if mss[i].name == "etcd_network_peer_round_trip_time_seconds" {
				for j := range mss[i].metrics {
					l := mss[i].metrics[j]
					if strings.Contains(l, `To="`) && strings.Contains(l, `le="`) {
						k1 := strings.Index(l, `To="`)
						k2 := strings.Index(l, `",le="`)
						mss[i].metrics[j] = l[:k1+4] + "REMOTE_PEER_NODE_ID" + l[k2:]
					}
					if strings.HasPrefix(l, "etcd_network_peer_round_trip_time_seconds_sum") {
						mss[i].metrics[j] = `etcd_network_peer_round_trip_time_seconds_sum{To="REMOTE_PEER_NODE_ID"}`
					}
					if strings.HasPrefix(l, "etcd_network_peer_round_trip_time_seconds_count") {
						mss[i].metrics[j] = `etcd_network_peer_round_trip_time_seconds_count{To="REMOTE_PEER_NODE_ID"}`
					}
				}
				mss[i].metrics = aggSort(mss[i].metrics)
			}
		}

		// aggregate gRPC RPC metrics
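		// For example (hypothetical label values), per-code entries such as
		//   grpc_server_handled_total{grpc_code="OK",grpc_method="Put",grpc_service="etcdserverpb.KV",grpc_type="unary"}
		//   grpc_server_handled_total{grpc_code="Unavailable",grpc_method="Put",grpc_service="etcdserverpb.KV",grpc_type="unary"}
		// collapse into a single template entry per method,
		//   grpc_server_handled_total{grpc_code="CODE",grpc_method="Put",grpc_service="etcdserverpb.KV",grpc_type="unary"}
		// while the observed codes ("OK", "Unavailable", ...) are collected into grpcCodes.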
		if mss[i].name == "grpc_server_handled_total" {
			pfx := `grpc_server_handled_total{grpc_code="`
			codes, metrics := make(map[string]struct{}), make(map[string]struct{})
			for _, v := range mss[i].metrics {
				v2 := strings.Replace(v, pfx, "", 1)
				idx := strings.Index(v2, `",grpc_method="`)
				code := v2[:idx]
				v2 = v2[idx:]
				codes[code] = struct{}{}
				v2 = pfx + "CODE" + v2
				metrics[v2] = struct{}{}
			}
			mss[i].grpcCodes = sortMap(codes)
			mss[i].metrics = sortMap(metrics)
		}
	}

	return mss
}

func fetchMetrics(ep string) (lines []string, err error) {
	tr, err := transport.NewTimeoutTransport(transport.TLSInfo{}, time.Second, time.Second, time.Second)
	if err != nil {
		return nil, err
	}
	cli := &http.Client{Transport: tr}
	resp, err := cli.Get(ep)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	b, rerr := ioutil.ReadAll(resp.Body)
	if rerr != nil {
		return nil, rerr
	}
	lines = strings.Split(string(b), "\n")
	return lines, nil
}

func newEmbedURLs(n int) (urls []url.URL) {
	urls = make([]url.URL, n)
	for i := 0; i < n; i++ {
		u, _ := url.Parse(fmt.Sprintf("unix://localhost:%d%06d", os.Getpid(), i))
		urls[i] = *u
	}
	return urls
}

func setupEmbedCfg(cfg *embed.Config, enableLog bool, curls, purls, ics []url.URL) {
	cfg.Logger = "zap"
	cfg.LogOutputs = []string{"/dev/null"}
	if enableLog {
		cfg.LogOutputs = []string{"stderr"}
	}
	cfg.Debug = false

	var err error
	cfg.Dir, err = ioutil.TempDir(os.TempDir(), fmt.Sprintf("%016X", time.Now().UnixNano()))
	if err != nil {
		panic(err)
	}
	os.RemoveAll(cfg.Dir)

	cfg.ClusterState = "new"
	cfg.LCUrls, cfg.ACUrls = curls, curls
	cfg.LPUrls, cfg.APUrls = purls, purls

	cfg.InitialCluster = ""
	for i := range ics {
		cfg.InitialCluster += fmt.Sprintf(",%d=%s", i, ics[i].String())
	}
	cfg.InitialCluster = cfg.InitialCluster[1:]
}

func aggSort(ss []string) (sorted []string) {
	set := make(map[string]struct{})
	for _, s := range ss {
		set[s] = struct{}{}
	}
	sorted = make([]string, 0, len(set))
	for k := range set {
		sorted = append(sorted, k)
	}
	sort.Strings(sorted)
	return sorted
}

func sortMap(set map[string]struct{}) (sorted []string) {
	sorted = make([]string, 0, len(set))
	for k := range set {
		sorted = append(sorted, k)
	}
	sort.Strings(sorted)
	return sorted
}
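
// Example invocations (a sketch; the endpoint and output paths below are arbitrary):
//
//	go run . > current.metrics.txt                       // dump metrics from a fresh 2-node embedded cluster
//	go run . -addr http://localhost:2379/metrics         // dump metrics from an already running etcd member
//	go run . -debug -server-log                          // verbose run with embedded server logs on stderr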