diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 083820b8f..ca68fab85 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -31,6 +31,10 @@ "ImportPath": "github.com/cheggaaa/pb", "Rev": "da1f27ad1d9509b16f65f52fd9d8138b0f2dc7b2" }, + { + "ImportPath": "github.com/cockroachdb/cmux", + "Rev": "112f0506e7743d64a6eb8fedbcff13d9979bbf92" + }, { "ImportPath": "github.com/codegangsta/cli", "Comment": "1.2.0-183-gb5232bb", diff --git a/Godeps/_workspace/src/github.com/cockroachdb/cmux/.gitignore b/Godeps/_workspace/src/github.com/cockroachdb/cmux/.gitignore new file mode 100644 index 000000000..daf913b1b --- /dev/null +++ b/Godeps/_workspace/src/github.com/cockroachdb/cmux/.gitignore @@ -0,0 +1,24 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe +*.test +*.prof diff --git a/Godeps/_workspace/src/github.com/cockroachdb/cmux/.travis.yml b/Godeps/_workspace/src/github.com/cockroachdb/cmux/.travis.yml new file mode 100644 index 000000000..e73780f2e --- /dev/null +++ b/Godeps/_workspace/src/github.com/cockroachdb/cmux/.travis.yml @@ -0,0 +1,22 @@ +language: go + +go: + - 1.3 + - 1.4 + - 1.5 + - 1.6 + +gobuild_args: -race + +before_install: + - go get -u github.com/golang/lint/golint + - if [[ $TRAVIS_GO_VERSION == 1.5* ]]; then go get -u github.com/kisielk/errcheck; fi + - go get -u golang.org/x/tools/cmd/vet + +before_script: + - '! gofmt -s -l . | read' + - golint ./... + - echo $TRAVIS_GO_VERSION + - if [[ $TRAVIS_GO_VERSION == 1.5* ]]; then errcheck ./...; fi + - go vet . + - go tool vet --shadow . diff --git a/Godeps/_workspace/src/github.com/cockroachdb/cmux/LICENSE b/Godeps/_workspace/src/github.com/cockroachdb/cmux/LICENSE new file mode 100644 index 000000000..d64569567 --- /dev/null +++ b/Godeps/_workspace/src/github.com/cockroachdb/cmux/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/Godeps/_workspace/src/github.com/cockroachdb/cmux/README.md b/Godeps/_workspace/src/github.com/cockroachdb/cmux/README.md new file mode 100644 index 000000000..b3713da58 --- /dev/null +++ b/Godeps/_workspace/src/github.com/cockroachdb/cmux/README.md @@ -0,0 +1,65 @@ +# cmux: Connection Mux [![Build Status](https://travis-ci.org/cockroachdb/cmux.svg?branch=master)](https://travis-ci.org/cockroachdb/cmux) [![GoDoc](https://godoc.org/github.com/cockroachdb/cmux?status.svg)](https://godoc.org/github.com/cockroachdb/cmux) + +cmux is a generic Go library to multiplex connections based on their payload. +Using cmux, you can serve gRPC, SSH, HTTPS, HTTP, Go RPC, and pretty much any +other protocol on the same TCP listener. + +## How-To +Simply create your main listener, create a cmux for that listener, +and then match connections: +```go +// Create the main listener. +l, err := net.Listen("tcp", ":23456") +if err != nil { + log.Fatal(err) +} + +// Create a cmux. +m := cmux.New(l) + +// Match connections in order: +// First grpc, then HTTP, and otherwise Go RPC/TCP. +grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc")) +httpL := m.Match(cmux.HTTP1Fast()) +trpcL := m.Match(cmux.Any()) // Any means anything that is not yet matched. + +// Create your protocol servers. +grpcS := grpc.NewServer() +grpchello.RegisterGreeterServer(grpcs, &server{}) + +httpS := &http.Server{ + Handler: &helloHTTP1Handler{}, +} + +trpcS := rpc.NewServer() +s.Register(&ExampleRPCRcvr{}) + +// Use the muxed listeners for your servers. +go grpcS.Serve(grpcL) +go httpS.Serve(httpL) +go trpcS.Accept(trpcL) + +// Start serving! +m.Serve() +``` + +There are [more examples on GoDoc](https://godoc.org/github.com/cockroachdb/cmux#pkg-examples). + +## Performance +Since we are only matching the very first bytes of a connection, the +performance overhead on long-lived connections (i.e., RPCs and pipelined HTTP +streams) is negligible. + +## Limitations +* *TLS*: `net/http` uses a [type assertion](https://github.com/golang/go/issues/14221) +to identify TLS connections; since cmux's lookahead-implementing connection +wraps the underlying TLS connection, this type assertion fails. This means you +can serve HTTPS using cmux but `http.Request.TLS` will not be set in your +handlers. If you are able to wrap TLS around cmux, you can work around this +limitation. See https://github.com/cockroachdb/cockroach/commit/83caba2 for an +example of this approach. + +* *Different Protocols on The Same Connection*: `cmux` matches the connection +when it's accepted. For example, one connection can be either gRPC or REST, but +not both. That is, we assume that a client connection is either used for gRPC +or REST. diff --git a/Godeps/_workspace/src/github.com/cockroachdb/cmux/buffer.go b/Godeps/_workspace/src/github.com/cockroachdb/cmux/buffer.go new file mode 100644 index 000000000..5c1785853 --- /dev/null +++ b/Godeps/_workspace/src/github.com/cockroachdb/cmux/buffer.go @@ -0,0 +1,35 @@ +package cmux + +import ( + "bytes" + "io" +) + +// bufferedReader is an optimized implementation of io.Reader that behaves like +// ``` +// io.MultiReader(bytes.NewReader(buffer.Bytes()), io.TeeReader(source, buffer)) +// ``` +// without allocating. +type bufferedReader struct { + source io.Reader + buffer *bytes.Buffer + bufferRead int + bufferSize int +} + +func (s *bufferedReader) Read(p []byte) (int, error) { + // Functionality of bytes.Reader. + bn := copy(p, s.buffer.Bytes()[s.bufferRead:s.bufferSize]) + s.bufferRead += bn + + p = p[bn:] + + // Funtionality of io.TeeReader. + sn, sErr := s.source.Read(p) + if sn > 0 { + if wn, wErr := s.buffer.Write(p[:sn]); wErr != nil { + return bn + wn, wErr + } + } + return bn + sn, sErr +} diff --git a/Godeps/_workspace/src/github.com/cockroachdb/cmux/cmux.go b/Godeps/_workspace/src/github.com/cockroachdb/cmux/cmux.go new file mode 100644 index 000000000..89cc910b0 --- /dev/null +++ b/Godeps/_workspace/src/github.com/cockroachdb/cmux/cmux.go @@ -0,0 +1,210 @@ +package cmux + +import ( + "bytes" + "fmt" + "io" + "net" + "sync" +) + +// Matcher matches a connection based on its content. +type Matcher func(io.Reader) bool + +// ErrorHandler handles an error and returns whether +// the mux should continue serving the listener. +type ErrorHandler func(error) bool + +var _ net.Error = ErrNotMatched{} + +// ErrNotMatched is returned whenever a connection is not matched by any of +// the matchers registered in the multiplexer. +type ErrNotMatched struct { + c net.Conn +} + +func (e ErrNotMatched) Error() string { + return fmt.Sprintf("mux: connection %v not matched by an matcher", + e.c.RemoteAddr()) +} + +// Temporary implements the net.Error interface. +func (e ErrNotMatched) Temporary() bool { return true } + +// Timeout implements the net.Error interface. +func (e ErrNotMatched) Timeout() bool { return false } + +type errListenerClosed string + +func (e errListenerClosed) Error() string { return string(e) } +func (e errListenerClosed) Temporary() bool { return false } +func (e errListenerClosed) Timeout() bool { return false } + +// ErrListenerClosed is returned from muxListener.Accept when the underlying +// listener is closed. +var ErrListenerClosed = errListenerClosed("mux: listener closed") + +// New instantiates a new connection multiplexer. +func New(l net.Listener) CMux { + return &cMux{ + root: l, + bufLen: 1024, + errh: func(_ error) bool { return true }, + donec: make(chan struct{}), + } +} + +// CMux is a multiplexer for network connections. +type CMux interface { + // Match returns a net.Listener that sees (i.e., accepts) only + // the connections matched by at least one of the matcher. + // + // The order used to call Match determines the priority of matchers. + Match(...Matcher) net.Listener + // Serve starts multiplexing the listener. Serve blocks and perhaps + // should be invoked concurrently within a go routine. + Serve() error + // HandleError registers an error handler that handles listener errors. + HandleError(ErrorHandler) +} + +type matchersListener struct { + ss []Matcher + l muxListener +} + +type cMux struct { + root net.Listener + bufLen int + errh ErrorHandler + donec chan struct{} + sls []matchersListener +} + +func (m *cMux) Match(matchers ...Matcher) net.Listener { + ml := muxListener{ + Listener: m.root, + connc: make(chan net.Conn, m.bufLen), + } + m.sls = append(m.sls, matchersListener{ss: matchers, l: ml}) + return ml +} + +func (m *cMux) Serve() error { + var wg sync.WaitGroup + + defer func() { + close(m.donec) + wg.Wait() + + for _, sl := range m.sls { + close(sl.l.connc) + // Drain the connections enqueued for the listener. + for c := range sl.l.connc { + _ = c.Close() + } + } + }() + + for { + c, err := m.root.Accept() + if err != nil { + if !m.handleErr(err) { + return err + } + continue + } + + wg.Add(1) + go m.serve(c, m.donec, &wg) + } +} + +func (m *cMux) serve(c net.Conn, donec <-chan struct{}, wg *sync.WaitGroup) { + defer wg.Done() + + muc := newMuxConn(c) + for _, sl := range m.sls { + for _, s := range sl.ss { + matched := s(muc.getSniffer()) + if matched { + select { + case sl.l.connc <- muc: + case <-donec: + _ = c.Close() + } + return + } + } + } + + _ = c.Close() + err := ErrNotMatched{c: c} + if !m.handleErr(err) { + _ = m.root.Close() + } +} + +func (m *cMux) HandleError(h ErrorHandler) { + m.errh = h +} + +func (m *cMux) handleErr(err error) bool { + if !m.errh(err) { + return false + } + + if ne, ok := err.(net.Error); ok { + return ne.Temporary() + } + + return false +} + +type muxListener struct { + net.Listener + connc chan net.Conn +} + +func (l muxListener) Accept() (net.Conn, error) { + c, ok := <-l.connc + if !ok { + return nil, ErrListenerClosed + } + return c, nil +} + +// MuxConn wraps a net.Conn and provides transparent sniffing of connection data. +type MuxConn struct { + net.Conn + buf bytes.Buffer + sniffer bufferedReader +} + +func newMuxConn(c net.Conn) *MuxConn { + return &MuxConn{ + Conn: c, + } +} + +// From the io.Reader documentation: +// +// When Read encounters an error or end-of-file condition after +// successfully reading n > 0 bytes, it returns the number of +// bytes read. It may return the (non-nil) error from the same call +// or return the error (and n == 0) from a subsequent call. +// An instance of this general case is that a Reader returning +// a non-zero number of bytes at the end of the input stream may +// return either err == EOF or err == nil. The next Read should +// return 0, EOF. +func (m *MuxConn) Read(p []byte) (int, error) { + if n, err := m.buf.Read(p); err != io.EOF { + return n, err + } + return m.Conn.Read(p) +} + +func (m *MuxConn) getSniffer() io.Reader { + m.sniffer = bufferedReader{source: m.Conn, buffer: &m.buf, bufferSize: m.buf.Len()} + return &m.sniffer +} diff --git a/Godeps/_workspace/src/github.com/cockroachdb/cmux/matchers.go b/Godeps/_workspace/src/github.com/cockroachdb/cmux/matchers.go new file mode 100644 index 000000000..c5a8f25df --- /dev/null +++ b/Godeps/_workspace/src/github.com/cockroachdb/cmux/matchers.go @@ -0,0 +1,150 @@ +package cmux + +import ( + "bufio" + "io" + "io/ioutil" + "net/http" + "strings" + + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/http2" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/http2/hpack" +) + +// Any is a Matcher that matches any connection. +func Any() Matcher { + return func(r io.Reader) bool { return true } +} + +// PrefixMatcher returns a matcher that matches a connection if it +// starts with any of the strings in strs. +func PrefixMatcher(strs ...string) Matcher { + pt := newPatriciaTreeString(strs...) + return pt.matchPrefix +} + +var defaultHTTPMethods = []string{ + "OPTIONS", + "GET", + "HEAD", + "POST", + "PUT", + "DELETE", + "TRACE", + "CONNECT", +} + +// HTTP1Fast only matches the methods in the HTTP request. +// +// This matcher is very optimistic: if it returns true, it does not mean that +// the request is a valid HTTP response. If you want a correct but slower HTTP1 +// matcher, use HTTP1 instead. +func HTTP1Fast(extMethods ...string) Matcher { + return PrefixMatcher(append(defaultHTTPMethods, extMethods...)...) +} + +const maxHTTPRead = 4096 + +// HTTP1 parses the first line or upto 4096 bytes of the request to see if +// the conection contains an HTTP request. +func HTTP1() Matcher { + return func(r io.Reader) bool { + br := bufio.NewReader(&io.LimitedReader{R: r, N: maxHTTPRead}) + l, part, err := br.ReadLine() + if err != nil || part { + return false + } + + _, _, proto, ok := parseRequestLine(string(l)) + if !ok { + return false + } + + v, _, ok := http.ParseHTTPVersion(proto) + return ok && v == 1 + } +} + +// grabbed from net/http. +func parseRequestLine(line string) (method, uri, proto string, ok bool) { + s1 := strings.Index(line, " ") + s2 := strings.Index(line[s1+1:], " ") + if s1 < 0 || s2 < 0 { + return + } + s2 += s1 + 1 + return line[:s1], line[s1+1 : s2], line[s2+1:], true +} + +// HTTP2 parses the frame header of the first frame to detect whether the +// connection is an HTTP2 connection. +func HTTP2() Matcher { + return hasHTTP2Preface +} + +// HTTP1HeaderField returns a matcher matching the header fields of the first +// request of an HTTP 1 connection. +func HTTP1HeaderField(name, value string) Matcher { + return func(r io.Reader) bool { + return matchHTTP1Field(r, name, value) + } +} + +// HTTP2HeaderField resturns a matcher matching the header fields of the first +// headers frame. +func HTTP2HeaderField(name, value string) Matcher { + return func(r io.Reader) bool { + return matchHTTP2Field(r, name, value) + } +} + +func hasHTTP2Preface(r io.Reader) bool { + var b [len(http2.ClientPreface)]byte + if _, err := io.ReadFull(r, b[:]); err != nil { + return false + } + + return string(b[:]) == http2.ClientPreface +} + +func matchHTTP1Field(r io.Reader, name, value string) (matched bool) { + req, err := http.ReadRequest(bufio.NewReader(r)) + if err != nil { + return false + } + + return req.Header.Get(name) == value +} + +func matchHTTP2Field(r io.Reader, name, value string) (matched bool) { + if !hasHTTP2Preface(r) { + return false + } + + framer := http2.NewFramer(ioutil.Discard, r) + hdec := hpack.NewDecoder(uint32(4<<10), func(hf hpack.HeaderField) { + if hf.Name == name && hf.Value == value { + matched = true + } + }) + for { + f, err := framer.ReadFrame() + if err != nil { + return false + } + + switch f := f.(type) { + case *http2.HeadersFrame: + if _, err := hdec.Write(f.HeaderBlockFragment()); err != nil { + return false + } + if matched { + return true + } + + if f.FrameHeader.Flags&http2.FlagHeadersEndHeaders != 0 { + return false + } + } + } +} diff --git a/Godeps/_workspace/src/github.com/cockroachdb/cmux/patricia.go b/Godeps/_workspace/src/github.com/cockroachdb/cmux/patricia.go new file mode 100644 index 000000000..56ec4e7b2 --- /dev/null +++ b/Godeps/_workspace/src/github.com/cockroachdb/cmux/patricia.go @@ -0,0 +1,173 @@ +package cmux + +import ( + "bytes" + "io" +) + +// patriciaTree is a simple patricia tree that handles []byte instead of string +// and cannot be changed after instantiation. +type patriciaTree struct { + root *ptNode +} + +func newPatriciaTree(b ...[]byte) *patriciaTree { + return &patriciaTree{ + root: newNode(b), + } +} + +func newPatriciaTreeString(strs ...string) *patriciaTree { + b := make([][]byte, len(strs)) + for i, s := range strs { + b[i] = []byte(s) + } + return &patriciaTree{ + root: newNode(b), + } +} + +func (t *patriciaTree) matchPrefix(r io.Reader) bool { + return t.root.match(r, true) +} + +func (t *patriciaTree) match(r io.Reader) bool { + return t.root.match(r, false) +} + +type ptNode struct { + prefix []byte + next map[byte]*ptNode + terminal bool +} + +func newNode(strs [][]byte) *ptNode { + if len(strs) == 0 { + return &ptNode{ + prefix: []byte{}, + terminal: true, + } + } + + if len(strs) == 1 { + return &ptNode{ + prefix: strs[0], + terminal: true, + } + } + + p, strs := splitPrefix(strs) + n := &ptNode{ + prefix: p, + } + + nexts := make(map[byte][][]byte) + for _, s := range strs { + if len(s) == 0 { + n.terminal = true + continue + } + nexts[s[0]] = append(nexts[s[0]], s[1:]) + } + + n.next = make(map[byte]*ptNode) + for first, rests := range nexts { + n.next[first] = newNode(rests) + } + + return n +} + +func splitPrefix(bss [][]byte) (prefix []byte, rest [][]byte) { + if len(bss) == 0 || len(bss[0]) == 0 { + return prefix, bss + } + + if len(bss) == 1 { + return bss[0], [][]byte{{}} + } + + for i := 0; ; i++ { + var cur byte + eq := true + for j, b := range bss { + if len(b) <= i { + eq = false + break + } + + if j == 0 { + cur = b[i] + continue + } + + if cur != b[i] { + eq = false + break + } + } + + if !eq { + break + } + + prefix = append(prefix, cur) + } + + rest = make([][]byte, 0, len(bss)) + for _, b := range bss { + rest = append(rest, b[len(prefix):]) + } + + return prefix, rest +} + +func readBytes(r io.Reader, n int) (b []byte, err error) { + b = make([]byte, n) + o := 0 + for o < n { + nr, err := r.Read(b[o:]) + if err != nil && err != io.EOF { + return b, err + } + + o += nr + + if err == io.EOF { + break + } + } + return b[:o], nil +} + +func (n *ptNode) match(r io.Reader, prefix bool) bool { + if l := len(n.prefix); l > 0 { + b, err := readBytes(r, l) + if err != nil || len(b) != l || !bytes.Equal(b, n.prefix) { + return false + } + } + + if prefix && n.terminal { + return true + } + + b := make([]byte, 1) + for { + nr, err := r.Read(b) + if nr != 0 { + break + } + + if err == io.EOF { + return n.terminal + } + + if err != nil { + return false + } + } + + nextN, ok := n.next[b[0]] + return ok && nextN.match(r, prefix) +} diff --git a/V3DemoProcfile b/V3DemoProcfile index 09a288691..ecd6a8550 100644 --- a/V3DemoProcfile +++ b/V3DemoProcfile @@ -1,7 +1,7 @@ # Use goreman to run `go get github.com/mattn/goreman` # etcd1 is the default client server for etcdctlv3 commands -etcd1: bin/etcd --experimental-v3demo=true --experimental-gRPC-addr 127.0.0.1:2378 --name infra1 --listen-client-urls http://127.0.0.1:12379 --advertise-client-urls http://127.0.0.1:12379 --listen-peer-urls http://127.0.0.1:12380 --initial-advertise-peer-urls http://127.0.0.1:12380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --enable-pprof -etcd2: bin/etcd --experimental-v3demo=true --experimental-gRPC-addr 127.0.0.1:22378 --name infra2 --listen-client-urls http://127.0.0.1:22379 --advertise-client-urls http://127.0.0.1:22379 --listen-peer-urls http://127.0.0.1:22380 --initial-advertise-peer-urls http://127.0.0.1:22380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --enable-pprof -etcd3: bin/etcd --experimental-v3demo=true --experimental-gRPC-addr 127.0.0.1:32378 --name infra3 --listen-client-urls http://127.0.0.1:32379 --advertise-client-urls http://127.0.0.1:32379 --listen-peer-urls http://127.0.0.1:32380 --initial-advertise-peer-urls http://127.0.0.1:32380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --enable-pprof +etcd1: bin/etcd --experimental-v3demo=true --name infra1 --listen-client-urls http://127.0.0.1:2379 --advertise-client-urls http://127.0.0.1:2379 --listen-peer-urls http://127.0.0.1:12380 --initial-advertise-peer-urls http://127.0.0.1:12380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --enable-pprof +etcd2: bin/etcd --experimental-v3demo=true --name infra2 --listen-client-urls http://127.0.0.1:22379 --advertise-client-urls http://127.0.0.1:22379 --listen-peer-urls http://127.0.0.1:22380 --initial-advertise-peer-urls http://127.0.0.1:22380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --enable-pprof +etcd3: bin/etcd --experimental-v3demo=true --name infra3 --listen-client-urls http://127.0.0.1:32379 --advertise-client-urls http://127.0.0.1:32379 --listen-peer-urls http://127.0.0.1:32380 --initial-advertise-peer-urls http://127.0.0.1:32380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --enable-pprof # in future, use proxy to listen on 2379 #proxy: bin/etcd --name infra-proxy1 --proxy=on --listen-client-urls http://127.0.0.1:2378 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --enable-pprof diff --git a/clientv3/README.md b/clientv3/README.md index 6b9735d03..581329465 100644 --- a/clientv3/README.md +++ b/clientv3/README.md @@ -16,7 +16,7 @@ Create client using `clientv3.New`: ```go cli, err := clientv3.New(clientv3.Config{ - Endpoints: []string{"localhost:12378", "localhost:22378", "localhost:32378"}, + Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"}, DialTimeout: 5 * time.Second, }) if err != nil { diff --git a/clientv3/doc.go b/clientv3/doc.go index 49b4037bb..b1cecf726 100644 --- a/clientv3/doc.go +++ b/clientv3/doc.go @@ -17,7 +17,7 @@ // Create client using `clientv3.New`: // // cli, err := clientv3.New(clientv3.Config{ -// Endpoints: []string{"localhost:12378", "localhost:22378", "localhost:32378"}, +// Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"}, // DialTimeout: 5 * time.Second, // }) // if err != nil { diff --git a/clientv3/example_cluster_test.go b/clientv3/example_cluster_test.go index 2db168fa7..24c75d229 100644 --- a/clientv3/example_cluster_test.go +++ b/clientv3/example_cluster_test.go @@ -113,7 +113,7 @@ func ExampleCluster_memberUpdate() { log.Fatal(err) } - peerURLs := []string{"http://localhost:12378"} + peerURLs := []string{"http://localhost:12380"} _, err = cli.MemberUpdate(context.Background(), resp.Members[0].ID, peerURLs) if err != nil { log.Fatal(err) diff --git a/clientv3/example_test.go b/clientv3/example_test.go index 5cf696a6a..a3f5c9b4f 100644 --- a/clientv3/example_test.go +++ b/clientv3/example_test.go @@ -25,7 +25,7 @@ import ( var ( dialTimeout = 5 * time.Second requestTimeout = 1 * time.Second - endpoints = []string{"localhost:2378", "localhost:22378", "http://localhost:32380"} + endpoints = []string{"localhost:2379", "localhost:22379", "http://localhost:32379"} ) func Example() { diff --git a/etcdctlv3/main.go b/etcdctlv3/main.go index 135c458d0..c0f4014d2 100644 --- a/etcdctlv3/main.go +++ b/etcdctlv3/main.go @@ -41,7 +41,7 @@ var ( ) func init() { - rootCmd.PersistentFlags().StringSliceVar(&globalFlags.Endpoints, "endpoints", []string{"127.0.0.1:2378", "127.0.0.1:22378", "127.0.0.1:32378"}, "gRPC endpoints") + rootCmd.PersistentFlags().StringSliceVar(&globalFlags.Endpoints, "endpoints", []string{"127.0.0.1:2379", "127.0.0.1:22379", "127.0.0.1:32379"}, "gRPC endpoints") rootCmd.PersistentFlags().StringVarP(&globalFlags.OutputFormat, "write-out", "w", "simple", "set the output format (simple, json, protobuf)") rootCmd.PersistentFlags().BoolVar(&globalFlags.IsHex, "hex", false, "print byte strings as hex encoded strings") diff --git a/etcdmain/config.go b/etcdmain/config.go index df8ba5e8d..7823f1b63 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -123,7 +123,6 @@ type config struct { printVersion bool v3demo bool - gRPCAddr string autoCompactionRetention int enablePprof bool @@ -226,7 +225,6 @@ func NewConfig() *config { // demo flag fs.BoolVar(&cfg.v3demo, "experimental-v3demo", false, "Enable experimental v3 demo API.") - fs.StringVar(&cfg.gRPCAddr, "experimental-gRPC-addr", "127.0.0.1:2378", "gRPC address for experimental v3 demo API.") fs.IntVar(&cfg.autoCompactionRetention, "experimental-auto-compaction-retention", 0, "Auto compaction retention in hour. 0 means disable auto compaction.") // backwards-compatibility with v0.4.6 diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index d19557bca..69138b680 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -35,6 +35,7 @@ import ( systemdutil "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-systemd/util" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" "github.com/coreos/etcd/discovery" "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/api/v3rpc" @@ -281,15 +282,6 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { clns = append(clns, l) } - var v3l net.Listener - if cfg.v3demo { - v3l, err = net.Listen("tcp", cfg.gRPCAddr) - if err != nil { - plog.Fatal(err) - } - plog.Infof("listening for client rpc on %s", cfg.gRPCAddr) - } - srvcfg := &etcdserver.ServerConfig{ Name: cfg.name, ClientURLs: cfg.acurls, @@ -329,10 +321,26 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { Info: cfg.corsInfo, } ph := etcdhttp.NewPeerHandler(s) + + var grpcS *grpc.Server + if cfg.v3demo { + // set up v3 demo rpc + tls := &cfg.clientTLSInfo + if cfg.clientTLSInfo.Empty() { + tls = nil + } + grpcS, err = v3rpc.Server(s, tls) + if err != nil { + s.Stop() + <-s.StopNotify() + return nil, err + } + } + // Start the peer server in a goroutine for _, l := range plns { go func(l net.Listener) { - plog.Fatal(serveHTTP(l, ph, 5*time.Minute)) + plog.Fatal(serve(l, nil, ph, 5*time.Minute)) }(l) } // Start a client server goroutine for each listen address @@ -340,25 +348,10 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { go func(l net.Listener) { // read timeout does not work with http close notify // TODO: https://github.com/golang/go/issues/9524 - plog.Fatal(serveHTTP(l, ch, 0)) + plog.Fatal(serve(l, grpcS, ch, 0)) }(l) } - if cfg.v3demo { - // set up v3 demo rpc - tls := &cfg.clientTLSInfo - if cfg.clientTLSInfo.Empty() { - tls = nil - } - grpcServer, err := v3rpc.Server(s, tls) - if err != nil { - s.Stop() - <-s.StopNotify() - return nil, err - } - go func() { plog.Fatal(grpcServer.Serve(v3l)) }() - } - return s.StopNotify(), nil } diff --git a/etcdmain/help.go b/etcdmain/help.go index 2e4bb6e5b..5379c36f0 100644 --- a/etcdmain/help.go +++ b/etcdmain/help.go @@ -139,8 +139,6 @@ experimental flags: enable experimental v3 demo API. --experimental-auto-compaction-retention '0' auto compaction retention in hour. 0 means disable auto compaction. - --experimental-gRPC-addr '127.0.0.1:2378' - gRPC address for experimental v3 demo API. profiling flags: --enable-pprof 'false' diff --git a/etcdmain/http.go b/etcdmain/serve.go similarity index 67% rename from etcdmain/http.go rename to etcdmain/serve.go index d45996605..c9f4daf77 100644 --- a/etcdmain/http.go +++ b/etcdmain/serve.go @@ -20,14 +20,25 @@ import ( "net" "net/http" "time" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/cockroachdb/cmux" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" ) -// serveHTTP accepts incoming HTTP connections on the listener l, +// serve accepts incoming connections on the listener l, // creating a new service goroutine for each. The service goroutines // read requests and then call handler to reply to them. -func serveHTTP(l net.Listener, handler http.Handler, readTimeout time.Duration) error { +func serve(l net.Listener, grpcS *grpc.Server, handler http.Handler, readTimeout time.Duration) error { // TODO: assert net.Listener type? Arbitrary listener might break HTTPS server which // expect a TLS Conn type. + httpl := l + if grpcS != nil { + m := cmux.New(l) + grpcl := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc")) + httpl = m.Match(cmux.Any()) + go plog.Fatal(m.Serve()) + go plog.Fatal(grpcS.Serve(grpcl)) + } logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0) // TODO: add debug flag; enable logging when debug flag is set @@ -36,5 +47,5 @@ func serveHTTP(l net.Listener, handler http.Handler, readTimeout time.Duration) ReadTimeout: readTimeout, ErrorLog: logger, // do not log user error } - return srv.Serve(l) + return srv.Serve(httpl) } diff --git a/tools/benchmark/cmd/root.go b/tools/benchmark/cmd/root.go index ad843d630..5fd050d7b 100644 --- a/tools/benchmark/cmd/root.go +++ b/tools/benchmark/cmd/root.go @@ -49,7 +49,7 @@ var ( ) func init() { - RootCmd.PersistentFlags().StringSliceVar(&endpoints, "endpoints", []string{"127.0.0.1:2378"}, "gRPC endpoints") + RootCmd.PersistentFlags().StringSliceVar(&endpoints, "endpoints", []string{"127.0.0.1:2379"}, "gRPC endpoints") RootCmd.PersistentFlags().UintVar(&totalConns, "conns", 1, "Total number of gRPC connections") RootCmd.PersistentFlags().UintVar(&totalClients, "clients", 1, "Total number of gRPC clients") diff --git a/tools/functional-tester/etcd-tester/cluster.go b/tools/functional-tester/etcd-tester/cluster.go index c1256080d..c60f883cd 100644 --- a/tools/functional-tester/etcd-tester/cluster.go +++ b/tools/functional-tester/etcd-tester/cluster.go @@ -44,7 +44,6 @@ type cluster struct { Agents []client.Agent Stressers []Stresser Names []string - GRPCURLs []string ClientURLs []string } @@ -89,7 +88,6 @@ func (c *cluster) Bootstrap() error { if err != nil { return err } - grpcURLs[i] = fmt.Sprintf("%s:2378", host) clientURLs[i] = fmt.Sprintf("http://%s:2379", host) peerURLs[i] = fmt.Sprintf("http://%s:%d", host, peerURLPort) @@ -115,9 +113,7 @@ func (c *cluster) Bootstrap() error { } if !c.v2Only { flags = append(flags, - "--experimental-v3demo", - "--experimental-gRPC-addr", grpcURLs[i], - ) + "--experimental-v3demo") } if _, err := a.Start(flags...); err != nil { @@ -161,7 +157,6 @@ func (c *cluster) Bootstrap() error { c.Agents = agents c.Stressers = stressers c.Names = names - c.GRPCURLs = grpcURLs c.ClientURLs = clientURLs return nil } @@ -172,7 +167,7 @@ func (c *cluster) WaitHealth() error { // TODO: set it to a reasonable value. It is set that high because // follower may use long time to catch up the leader when reboot under // reasonable workload (https://github.com/coreos/etcd/issues/2698) - healthFunc, urls := setHealthKey, c.GRPCURLs + healthFunc, urls := setHealthKey, c.ClientURLs if c.v2Only { healthFunc, urls = setHealthKeyV2, c.ClientURLs } @@ -192,7 +187,7 @@ func (c *cluster) GetLeader() (int, error) { return 0, nil } cli, err := clientv3.New(clientv3.Config{ - Endpoints: c.GRPCURLs, + Endpoints: c.ClientURLs, DialTimeout: 5 * time.Second, }) if err != nil { @@ -304,7 +299,7 @@ func setHealthKeyV2(us []string) error { func (c *cluster) getRevisionHash() (map[string]int64, map[string]int64, error) { revs := make(map[string]int64) hashes := make(map[string]int64) - for _, u := range c.GRPCURLs { + for _, u := range c.ClientURLs { conn, err := grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second)) if err != nil { return nil, nil, err @@ -328,7 +323,7 @@ func (c *cluster) compactKV(rev int64) error { conn *grpc.ClientConn err error ) - for _, u := range c.GRPCURLs { + for _, u := range c.ClientURLs { conn, err = grpc.Dial(u, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second)) if err != nil { continue