*: initial support for gatway

etcd gatway is a simple l4 gateway that forwards tcp connections to
the given endpoints.
This commit is contained in:
Xiang Li 2016-04-14 21:00:03 -07:00
parent 07c04c7c75
commit a300be92dc
5 changed files with 155 additions and 33 deletions

View File

@ -75,9 +75,7 @@ var (
dirEmpty = dirType("empty")
)
func Main() {
checkSupportArch()
func startEtcdOrProxyV2() {
cfg := NewConfig()
err := cfg.Parse(os.Args[1:])
if err != nil {

83
etcdmain/gateway.go Normal file
View File

@ -0,0 +1,83 @@
// Copyright 2016 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdmain
import (
"fmt"
"net"
"os"
"strings"
"github.com/coreos/etcd/proxy/tcpproxy"
"github.com/spf13/cobra"
)
var (
gatewayListenAddr string
gatewayEndpoints string
)
var (
rootCmd = &cobra.Command{
Use: "etcd",
Short: "etcd server",
SuggestFor: []string{"etcd"},
}
)
func init() {
rootCmd.AddCommand(newGatewayCommand())
}
// newGatewayCommand returns the cobra command for "gateway".
func newGatewayCommand() *cobra.Command {
lpc := &cobra.Command{
Use: "gateway <subcommand>",
Short: "gateway related command",
}
lpc.AddCommand(newGatewayStartCommand())
return lpc
}
func newGatewayStartCommand() *cobra.Command {
cmd := cobra.Command{
Use: "start",
Short: "start the gateway",
Run: startGateway,
}
cmd.Flags().StringVar(&gatewayListenAddr, "listen-addr", "127.0.0.1:23790", "listen address")
cmd.Flags().StringVar(&gatewayEndpoints, "endpoints", "127.0.0.1:2379", "comma separated etcd cluster endpoints")
return &cmd
}
func startGateway(cmd *cobra.Command, args []string) {
endpoints := strings.Split(gatewayEndpoints, ",")
l, err := net.Listen("tcp", gatewayListenAddr)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
tp := tcpproxy.TCPProxy{
Listener: l,
Endpoints: endpoints,
}
tp.Run()
}

37
etcdmain/main.go Normal file
View File

@ -0,0 +1,37 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdmain
import (
"fmt"
"os"
)
func Main() {
checkSupportArch()
if len(os.Args) > 1 {
switch os.Args[1] {
case "gateway":
if err := rootCmd.Execute(); err != nil {
fmt.Fprint(os.Stderr, err)
os.Exit(1)
}
return
}
}
startEtcdOrProxyV2()
}

View File

@ -21,16 +21,6 @@ import (
"time"
)
type tcpProxy struct {
l net.Listener
monitorInterval time.Duration
donec chan struct{}
mu sync.Mutex // guards the following fields
remotes []*remote
nextRemote int
}
type remote struct {
mu sync.Mutex
addr string
@ -61,10 +51,30 @@ func (r *remote) isActive() bool {
return !r.inactive
}
func (tp *tcpProxy) run() error {
type TCPProxy struct {
Listener net.Listener
Endpoints []string
MonitorInterval time.Duration
donec chan struct{}
mu sync.Mutex // guards the following fields
remotes []*remote
nextRemote int
}
func (tp *TCPProxy) Run() error {
tp.donec = make(chan struct{})
if tp.MonitorInterval == 0 {
tp.MonitorInterval = 5 * time.Minute
}
for _, ep := range tp.Endpoints {
tp.remotes = append(tp.remotes, &remote{addr: ep})
}
go tp.runMonitor()
for {
in, err := tp.l.Accept()
in, err := tp.Listener.Accept()
if err != nil {
return err
}
@ -73,13 +83,13 @@ func (tp *tcpProxy) run() error {
}
}
func (tp *tcpProxy) numRemotes() int {
func (tp *TCPProxy) numRemotes() int {
tp.mu.Lock()
defer tp.mu.Unlock()
return len(tp.remotes)
}
func (tp *tcpProxy) serve(in net.Conn) {
func (tp *TCPProxy) serve(in net.Conn) {
var (
err error
out net.Conn
@ -115,7 +125,7 @@ func (tp *tcpProxy) serve(in net.Conn) {
}
// pick picks a remote in round-robin fashion
func (tp *tcpProxy) pick() *remote {
func (tp *TCPProxy) pick() *remote {
tp.mu.Lock()
defer tp.mu.Unlock()
@ -124,10 +134,10 @@ func (tp *tcpProxy) pick() *remote {
return picked
}
func (tp *tcpProxy) runMonitor() {
func (tp *TCPProxy) runMonitor() {
for {
select {
case <-time.After(tp.monitorInterval):
case <-time.After(tp.MonitorInterval):
tp.mu.Lock()
for _, r := range tp.remotes {
if !r.isActive() {
@ -141,9 +151,9 @@ func (tp *tcpProxy) runMonitor() {
}
}
func (tp *tcpProxy) stop() {
func (tp *TCPProxy) Stop() {
// graceful shutdown?
// shutdown current connections?
tp.l.Close()
tp.Listener.Close()
close(tp.donec)
}

View File

@ -22,7 +22,6 @@ import (
"net/http/httptest"
"net/url"
"testing"
"time"
)
func TestUserspaceProxy(t *testing.T) {
@ -43,17 +42,12 @@ func TestUserspaceProxy(t *testing.T) {
t.Fatal(err)
}
p := tcpProxy{
l: l,
donec: make(chan struct{}),
monitorInterval: time.Second,
remotes: []*remote{
{addr: u.Host},
},
p := TCPProxy{
Listener: l,
Endpoints: []string{u.Host},
}
go p.run()
defer p.stop()
go p.Run()
defer p.Stop()
u.Host = l.Addr().String()