diff --git a/tools/etcd-top/README.md b/tools/etcd-top/README.md new file mode 100644 index 000000000..a50047b7e --- /dev/null +++ b/tools/etcd-top/README.md @@ -0,0 +1,23 @@ +# etcd-top +etcd realtime workload analyzer. Useful for rapid diagnosis of production usage issues and analysis of production request distributions. + +usage: +``` + -iface="eth0": interface for sniffing traffic on + -period=1: seconds between submissions + -ports="2379,4001": etcd listening ports + -promiscuous=true: whether to perform promiscuous sniffing or not. + -topk=10: submit stats for the top sniffed paths +``` + +result: +``` +go run etcd-top.go --period=1 -topk=3 +1440035702 sniffed 1074 requests over last 1 seconds + +Top 3 most popular http requests: + Sum Rate Verb Path + 1305 22 GET /v2/keys/c + 1302 8 GET /v2/keys/S + 1297 10 GET /v2/keys/h +``` diff --git a/tools/etcd-top/etcd-top.go b/tools/etcd-top/etcd-top.go new file mode 100644 index 000000000..0812b8b12 --- /dev/null +++ b/tools/etcd-top/etcd-top.go @@ -0,0 +1,215 @@ +package main + +import ( + "bufio" + "bytes" + "flag" + "fmt" + "math" + "net/http" + "os" + "runtime" + "sort" + "strconv" + "strings" + "time" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/akrennmair/gopcap" + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spacejam/loghisto" +) + +type nameSum struct { + Name string + Sum float64 + Rate float64 +} + +type nameSums []nameSum + +func (n nameSums) Len() int { + return len(n) +} +func (n nameSums) Less(i, j int) bool { + return n[i].Sum > n[j].Sum +} +func (n nameSums) Swap(i, j int) { + n[i], n[j] = n[j], n[i] +} + +// This function listens for periodic metrics from the loghisto metric system, +// and upon receipt of a batch of them it will print out the desired topK. +func statPrinter(metricStream chan *loghisto.ProcessedMetricSet, topK, period uint) { + for m := range metricStream { + requestCounter := float64(0) + nvs := nameSums{} + for k, v := range m.Metrics { + // loghisto adds _rate suffixed metrics for counters and histograms + if strings.HasSuffix(k, "_rate") && !strings.HasSuffix(k, "_rate_rate") { + continue + } + nvs = append(nvs, nameSum{ + Name: k, + Sum: v, + Rate: m.Metrics[k+"_rate"], + }) + requestCounter += m.Metrics[k+"_rate"] + } + + fmt.Printf("\n%d sniffed %d requests over last %d seconds\n\n", time.Now().Unix(), + uint(requestCounter), period) + if len(nvs) == 0 { + continue + } + sort.Sort(nvs) + fmt.Printf("Top %d most popular http requests:\n", topK) + fmt.Println("Total Sum Period Sum Verb Path") + for _, nv := range nvs[0:int(math.Min(float64(len(nvs)), float64(topK)))] { + fmt.Printf("%9.1d %7.1d %s\n", int(nv.Sum), int(nv.Rate), nv.Name) + } + } +} + +// packetDecoder decodes packets and hands them off to the streamRouter +func packetDecoder(packetsIn chan *pcap.Packet, packetsOut chan *pcap.Packet) { + for pkt := range packetsIn { + pkt.Decode() + select { + case packetsOut <- pkt: + default: + fmt.Fprint(os.Stderr, "shedding at decoder!") + } + } +} + +// processor tries to parse an http request from each packet, and if +// successful it records metrics about it in the loghisto metric system. +func processor(ms *loghisto.MetricSystem, packetsIn chan *pcap.Packet) { + for pkt := range packetsIn { + req, reqErr := http.ReadRequest(bufio.NewReader(bytes.NewReader(pkt.Payload))) + if reqErr == nil { + ms.Counter(req.Method+" "+req.URL.Path, 1) + } + } +} + +// streamRouter takes a decoded packet and routes it to a processor that can deal with all requests +// and responses for this particular TCP connection. This allows the processor to own a local map +// of requests so that it can avoid coordinating with other goroutines to perform analysis. +func streamRouter(ports []uint16, parsedPackets chan *pcap.Packet, processors []chan *pcap.Packet) { + for pkt := range parsedPackets { + if pkt.TCP == nil { + continue + } + clientPort := uint16(0) + for _, p := range ports { + if pkt.TCP.SrcPort == p { + clientPort = pkt.TCP.DestPort + break + } + if pkt.TCP.DestPort == p { + clientPort = pkt.TCP.SrcPort + break + } + } + if clientPort != 0 { + // client Port can be assumed to have sufficient entropy for + // distribution among processors, and we want the same + // tcp stream to go to the same processor every time + // so that if we do proper packet reconstruction it will + // be easier. + select { + case processors[int(clientPort)%len(processors)] <- pkt: + default: + fmt.Fprint(os.Stderr, "Shedding load at router!") + } + } + } +} + +// 1. parse args +// 2. start the loghisto metric system +// 3. start the processing and printing goroutines +// 4. open the pcap handler +// 5. hand off packets from the handler to the decoder +func main() { + portsArg := flag.String("ports", "2379,4001", "etcd listening ports") + iface := flag.String("iface", "eth0", "interface for sniffing traffic on") + promisc := flag.Bool("promiscuous", true, "promiscuous mode") + period := flag.Uint("period", 1, "seconds between submissions") + topK := flag.Uint("topk", 10, "submit stats for the top sniffed paths") + flag.Parse() + + numCPU := runtime.NumCPU() + runtime.GOMAXPROCS(numCPU) + + ms := loghisto.NewMetricSystem(time.Duration(*period)*time.Second, false) + ms.Start() + metricStream := make(chan *loghisto.ProcessedMetricSet, 2) + ms.SubscribeToProcessedMetrics(metricStream) + defer ms.UnsubscribeFromProcessedMetrics(metricStream) + + go statPrinter(metricStream, *topK, *period) + + ports := []uint16{} + for _, p := range strings.Split(*portsArg, ",") { + p, err := strconv.Atoi(p) + if err == nil { + ports = append(ports, uint16(p)) + } else { + fmt.Fprintf(os.Stderr, "Failed to parse port \"%s\": %v\n", p, err) + os.Exit(1) + } + } + + if len(ports) == 0 { + fmt.Fprint(os.Stderr, "No ports given! Exiting.\n") + os.Exit(1) + } + + // We choose 1518 for the snaplen because it's the default + // ethernet MTU at the link layer. We choose 1000 for the + // timeout based on a measurement for its impact on latency + // impact, but it is less precise. + h, err := pcap.Openlive(*iface, 1518, *promisc, 1000) + if err != nil { + fmt.Fprintf(os.Stderr, "%v", err) + os.Exit(1) + } + defer h.Close() + + portArray := strings.Split(*portsArg, ",") + dst := strings.Join(portArray, " or dst port ") + src := strings.Join(portArray, " or src port ") + filter := fmt.Sprintf("tcp and (dst port %s or src port %s)", dst, src) + fmt.Println("using bpf filter: ", filter) + if err := h.Setfilter(filter); err != nil { + fmt.Fprintf(os.Stderr, "%v", err) + os.Exit(1) + } + + unparsedPackets := make(chan *pcap.Packet, 16384) + parsedPackets := make(chan *pcap.Packet, 16384) + for i := 0; i < int(math.Max(2, float64(numCPU/4))); i++ { + go packetDecoder(unparsedPackets, parsedPackets) + } + + processors := []chan *pcap.Packet{} + for i := 0; i < int(math.Max(2, float64(numCPU/4))); i++ { + p := make(chan *pcap.Packet, 16384) + processors = append(processors, p) + go processor(ms, p) + } + + go streamRouter(ports, parsedPackets, processors) + + for { + pkt := h.Next() + if pkt != nil { + select { + case unparsedPackets <- pkt: + default: + fmt.Fprint(os.Stderr, "SHEDDING IN MAIN") + } + } + } +}