diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index 7091e9257..97236e4c0 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -75,9 +75,7 @@ var ( dirEmpty = dirType("empty") ) -func Main() { - checkSupportArch() - +func startEtcdOrProxyV2() { cfg := NewConfig() err := cfg.Parse(os.Args[1:]) if err != nil { diff --git a/etcdmain/gateway.go b/etcdmain/gateway.go new file mode 100644 index 000000000..39c1f7d68 --- /dev/null +++ b/etcdmain/gateway.go @@ -0,0 +1,83 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdmain + +import ( + "fmt" + "net" + "os" + "strings" + + "github.com/coreos/etcd/proxy/tcpproxy" + "github.com/spf13/cobra" +) + +var ( + gatewayListenAddr string + gatewayEndpoints string +) + +var ( + rootCmd = &cobra.Command{ + Use: "etcd", + Short: "etcd server", + SuggestFor: []string{"etcd"}, + } +) + +func init() { + rootCmd.AddCommand(newGatewayCommand()) +} + +// newGatewayCommand returns the cobra command for "gateway". +func newGatewayCommand() *cobra.Command { + lpc := &cobra.Command{ + Use: "gateway ", + Short: "gateway related command", + } + lpc.AddCommand(newGatewayStartCommand()) + + return lpc +} + +func newGatewayStartCommand() *cobra.Command { + cmd := cobra.Command{ + Use: "start", + Short: "start the gateway", + Run: startGateway, + } + + cmd.Flags().StringVar(&gatewayListenAddr, "listen-addr", "127.0.0.1:23790", "listen address") + cmd.Flags().StringVar(&gatewayEndpoints, "endpoints", "127.0.0.1:2379", "comma separated etcd cluster endpoints") + + return &cmd +} + +func startGateway(cmd *cobra.Command, args []string) { + endpoints := strings.Split(gatewayEndpoints, ",") + + l, err := net.Listen("tcp", gatewayListenAddr) + if err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + + tp := tcpproxy.TCPProxy{ + Listener: l, + Endpoints: endpoints, + } + + tp.Run() +} diff --git a/etcdmain/main.go b/etcdmain/main.go new file mode 100644 index 000000000..6f8729067 --- /dev/null +++ b/etcdmain/main.go @@ -0,0 +1,37 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdmain + +import ( + "fmt" + "os" +) + +func Main() { + checkSupportArch() + + if len(os.Args) > 1 { + switch os.Args[1] { + case "gateway": + if err := rootCmd.Execute(); err != nil { + fmt.Fprint(os.Stderr, err) + os.Exit(1) + } + return + } + } + + startEtcdOrProxyV2() +} diff --git a/proxy/tcpproxy/userspace.go b/proxy/tcpproxy/userspace.go index 33c920701..9a147397e 100644 --- a/proxy/tcpproxy/userspace.go +++ b/proxy/tcpproxy/userspace.go @@ -21,16 +21,6 @@ import ( "time" ) -type tcpProxy struct { - l net.Listener - monitorInterval time.Duration - donec chan struct{} - - mu sync.Mutex // guards the following fields - remotes []*remote - nextRemote int -} - type remote struct { mu sync.Mutex addr string @@ -61,10 +51,30 @@ func (r *remote) isActive() bool { return !r.inactive } -func (tp *tcpProxy) run() error { +type TCPProxy struct { + Listener net.Listener + Endpoints []string + MonitorInterval time.Duration + + donec chan struct{} + + mu sync.Mutex // guards the following fields + remotes []*remote + nextRemote int +} + +func (tp *TCPProxy) Run() error { + tp.donec = make(chan struct{}) + if tp.MonitorInterval == 0 { + tp.MonitorInterval = 5 * time.Minute + } + for _, ep := range tp.Endpoints { + tp.remotes = append(tp.remotes, &remote{addr: ep}) + } + go tp.runMonitor() for { - in, err := tp.l.Accept() + in, err := tp.Listener.Accept() if err != nil { return err } @@ -73,13 +83,13 @@ func (tp *tcpProxy) run() error { } } -func (tp *tcpProxy) numRemotes() int { +func (tp *TCPProxy) numRemotes() int { tp.mu.Lock() defer tp.mu.Unlock() return len(tp.remotes) } -func (tp *tcpProxy) serve(in net.Conn) { +func (tp *TCPProxy) serve(in net.Conn) { var ( err error out net.Conn @@ -115,7 +125,7 @@ func (tp *tcpProxy) serve(in net.Conn) { } // pick picks a remote in round-robin fashion -func (tp *tcpProxy) pick() *remote { +func (tp *TCPProxy) pick() *remote { tp.mu.Lock() defer tp.mu.Unlock() @@ -124,10 +134,10 @@ func (tp *tcpProxy) pick() *remote { return picked } -func (tp *tcpProxy) runMonitor() { +func (tp *TCPProxy) runMonitor() { for { select { - case <-time.After(tp.monitorInterval): + case <-time.After(tp.MonitorInterval): tp.mu.Lock() for _, r := range tp.remotes { if !r.isActive() { @@ -141,9 +151,9 @@ func (tp *tcpProxy) runMonitor() { } } -func (tp *tcpProxy) stop() { +func (tp *TCPProxy) Stop() { // graceful shutdown? // shutdown current connections? - tp.l.Close() + tp.Listener.Close() close(tp.donec) } diff --git a/proxy/tcpproxy/userspace_test.go b/proxy/tcpproxy/userspace_test.go index 6d38d076c..3ca04e482 100644 --- a/proxy/tcpproxy/userspace_test.go +++ b/proxy/tcpproxy/userspace_test.go @@ -22,7 +22,6 @@ import ( "net/http/httptest" "net/url" "testing" - "time" ) func TestUserspaceProxy(t *testing.T) { @@ -43,17 +42,12 @@ func TestUserspaceProxy(t *testing.T) { t.Fatal(err) } - p := tcpProxy{ - l: l, - donec: make(chan struct{}), - monitorInterval: time.Second, - - remotes: []*remote{ - {addr: u.Host}, - }, + p := TCPProxy{ + Listener: l, + Endpoints: []string{u.Host}, } - go p.run() - defer p.stop() + go p.Run() + defer p.Stop() u.Host = l.Addr().String()