mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
289 lines
6.7 KiB
Go
289 lines
6.7 KiB
Go
// Copyright 2017 The etcd Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
// +build cluster_proxy
|
|
|
|
package e2e
|
|
|
|
import (
|
|
"fmt"
|
|
"net"
|
|
"net/url"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/coreos/etcd/pkg/expect"
|
|
)
|
|
|
|
type proxyEtcdProcess struct {
|
|
etcdProc etcdProcess
|
|
proxyV2 *proxyV2Proc
|
|
proxyV3 *proxyV3Proc
|
|
}
|
|
|
|
func newEtcdProcess(cfg *etcdServerProcessConfig) (etcdProcess, error) {
|
|
return newProxyEtcdProcess(cfg)
|
|
}
|
|
|
|
func newProxyEtcdProcess(cfg *etcdServerProcessConfig) (*proxyEtcdProcess, error) {
|
|
ep, err := newEtcdServerProcess(cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
pep := &proxyEtcdProcess{
|
|
etcdProc: ep,
|
|
proxyV2: newProxyV2Proc(cfg),
|
|
proxyV3: newProxyV3Proc(cfg),
|
|
}
|
|
return pep, nil
|
|
}
|
|
|
|
// Config returns the configuration of the wrapped etcd server process.
func (p *proxyEtcdProcess) Config() *etcdServerProcessConfig {
	serverCfg := p.etcdProc.Config()
	return serverCfg
}
|
|
|
|
// EndpointsV2 returns the v2 proxy's endpoints, not the server's.
func (p *proxyEtcdProcess) EndpointsV2() []string {
	v2 := p.proxyV2
	return v2.endpoints()
}
|
|
// EndpointsV3 returns the v3 proxy's endpoints, not the server's.
func (p *proxyEtcdProcess) EndpointsV3() []string {
	v3 := p.proxyV3
	return v3.endpoints()
}
|
|
func (p *proxyEtcdProcess) EndpointsMetrics() []string {
|
|
panic("not implemented; proxy doesn't provide health information")
|
|
}
|
|
|
|
func (p *proxyEtcdProcess) Start() error {
|
|
if err := p.etcdProc.Start(); err != nil {
|
|
return err
|
|
}
|
|
if err := p.proxyV2.Start(); err != nil {
|
|
return err
|
|
}
|
|
return p.proxyV3.Start()
|
|
}
|
|
|
|
func (p *proxyEtcdProcess) Restart() error {
|
|
if err := p.etcdProc.Restart(); err != nil {
|
|
return err
|
|
}
|
|
if err := p.proxyV2.Restart(); err != nil {
|
|
return err
|
|
}
|
|
return p.proxyV3.Restart()
|
|
}
|
|
|
|
func (p *proxyEtcdProcess) Stop() error {
|
|
err := p.proxyV2.Stop()
|
|
if v3err := p.proxyV3.Stop(); err == nil {
|
|
err = v3err
|
|
}
|
|
if eerr := p.etcdProc.Stop(); eerr != nil && err == nil {
|
|
// fails on go-grpc issue #1384
|
|
if !strings.Contains(eerr.Error(), "exit status 2") {
|
|
err = eerr
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (p *proxyEtcdProcess) Close() error {
|
|
err := p.proxyV2.Close()
|
|
if v3err := p.proxyV3.Close(); err == nil {
|
|
err = v3err
|
|
}
|
|
if eerr := p.etcdProc.Close(); eerr != nil && err == nil {
|
|
// fails on go-grpc issue #1384
|
|
if !strings.Contains(eerr.Error(), "exit status 2") {
|
|
err = eerr
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (p *proxyEtcdProcess) WithStopSignal(sig os.Signal) os.Signal {
|
|
p.proxyV3.WithStopSignal(sig)
|
|
p.proxyV3.WithStopSignal(sig)
|
|
return p.etcdProc.WithStopSignal(sig)
|
|
}
|
|
|
|
type proxyProc struct {
|
|
execPath string
|
|
args []string
|
|
ep string
|
|
murl string
|
|
donec chan struct{}
|
|
|
|
proc *expect.ExpectProcess
|
|
}
|
|
|
|
// endpoints returns a one-element slice holding the proxy's endpoint.
func (pp *proxyProc) endpoints() []string {
	eps := []string{pp.ep}
	return eps
}
|
|
|
|
func (pp *proxyProc) start() error {
|
|
if pp.proc != nil {
|
|
panic("already started")
|
|
}
|
|
proc, err := spawnCmd(append([]string{pp.execPath}, pp.args...))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
pp.proc = proc
|
|
return nil
|
|
}
|
|
|
|
func (pp *proxyProc) waitReady(readyStr string) error {
|
|
defer close(pp.donec)
|
|
return waitReadyExpectProc(pp.proc, []string{readyStr})
|
|
}
|
|
|
|
func (pp *proxyProc) Stop() error {
|
|
if pp.proc == nil {
|
|
return nil
|
|
}
|
|
if err := pp.proc.Stop(); err != nil && !strings.Contains(err.Error(), "exit status 1") {
|
|
// v2proxy exits with status 1 on auto tls; not sure why
|
|
return err
|
|
}
|
|
pp.proc = nil
|
|
<-pp.donec
|
|
pp.donec = make(chan struct{})
|
|
return nil
|
|
}
|
|
|
|
func (pp *proxyProc) WithStopSignal(sig os.Signal) os.Signal {
|
|
ret := pp.proc.StopSignal
|
|
pp.proc.StopSignal = sig
|
|
return ret
|
|
}
|
|
|
|
// Close is equivalent to Stop.
func (pp *proxyProc) Close() error {
	err := pp.Stop()
	return err
}
|
|
|
|
type proxyV2Proc struct {
|
|
proxyProc
|
|
dataDir string
|
|
}
|
|
|
|
func proxyListenURL(cfg *etcdServerProcessConfig, portOffset int) string {
|
|
u, err := url.Parse(cfg.acurl)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
host, port, _ := net.SplitHostPort(u.Host)
|
|
p, _ := strconv.ParseInt(port, 10, 16)
|
|
u.Host = fmt.Sprintf("%s:%d", host, int(p)+portOffset)
|
|
return u.String()
|
|
}
|
|
|
|
func newProxyV2Proc(cfg *etcdServerProcessConfig) *proxyV2Proc {
|
|
listenAddr := proxyListenURL(cfg, 2)
|
|
name := fmt.Sprintf("testname-proxy-%p", cfg)
|
|
args := []string{
|
|
"--name", name,
|
|
"--proxy", "on",
|
|
"--listen-client-urls", listenAddr,
|
|
"--initial-cluster", cfg.name + "=" + cfg.purl.String(),
|
|
}
|
|
return &proxyV2Proc{
|
|
proxyProc{
|
|
execPath: cfg.execPath,
|
|
args: append(args, cfg.tlsArgs...),
|
|
ep: listenAddr,
|
|
donec: make(chan struct{}),
|
|
},
|
|
name + ".etcd",
|
|
}
|
|
}
|
|
|
|
func (v2p *proxyV2Proc) Start() error {
|
|
os.RemoveAll(v2p.dataDir)
|
|
if err := v2p.start(); err != nil {
|
|
return err
|
|
}
|
|
return v2p.waitReady("httpproxy: endpoints found")
|
|
}
|
|
|
|
func (v2p *proxyV2Proc) Restart() error {
|
|
if err := v2p.Stop(); err != nil {
|
|
return err
|
|
}
|
|
return v2p.Start()
|
|
}
|
|
|
|
func (v2p *proxyV2Proc) Stop() error {
|
|
if err := v2p.proxyProc.Stop(); err != nil {
|
|
return err
|
|
}
|
|
// v2 proxy caches members; avoid reuse of directory
|
|
return os.RemoveAll(v2p.dataDir)
|
|
}
|
|
|
|
type proxyV3Proc struct {
|
|
proxyProc
|
|
}
|
|
|
|
func newProxyV3Proc(cfg *etcdServerProcessConfig) *proxyV3Proc {
|
|
listenAddr := proxyListenURL(cfg, 3)
|
|
args := []string{
|
|
"grpc-proxy",
|
|
"start",
|
|
"--listen-addr", strings.Split(listenAddr, "/")[2],
|
|
"--endpoints", cfg.acurl,
|
|
// pass-through member RPCs
|
|
"--advertise-client-url", "",
|
|
}
|
|
murl := ""
|
|
if cfg.murl != "" {
|
|
murl = proxyListenURL(cfg, 4)
|
|
args = append(args, "--metrics-addr", murl)
|
|
}
|
|
tlsArgs := []string{}
|
|
for i := 0; i < len(cfg.tlsArgs); i++ {
|
|
switch cfg.tlsArgs[i] {
|
|
case "--cert-file":
|
|
tlsArgs = append(tlsArgs, "--cert", cfg.tlsArgs[i+1], "--cert-file", cfg.tlsArgs[i+1])
|
|
i++
|
|
case "--key-file":
|
|
tlsArgs = append(tlsArgs, "--key", cfg.tlsArgs[i+1], "--key-file", cfg.tlsArgs[i+1])
|
|
i++
|
|
case "--ca-file":
|
|
tlsArgs = append(tlsArgs, "--cacert", cfg.tlsArgs[i+1], "--trusted-ca-file", cfg.tlsArgs[i+1])
|
|
i++
|
|
case "--auto-tls":
|
|
tlsArgs = append(tlsArgs, "--auto-tls", "--insecure-skip-tls-verify")
|
|
case "--peer-ca-file", "--peer-cert-file", "--peer-key-file":
|
|
i++ // skip arg
|
|
case "--client-cert-auth", "--peer-auto-tls":
|
|
default:
|
|
tlsArgs = append(tlsArgs, cfg.tlsArgs[i])
|
|
}
|
|
}
|
|
return &proxyV3Proc{
|
|
proxyProc{
|
|
execPath: cfg.execPath,
|
|
args: append(args, tlsArgs...),
|
|
ep: listenAddr,
|
|
murl: murl,
|
|
donec: make(chan struct{}),
|
|
},
|
|
}
|
|
}
|
|
|
|
func (v3p *proxyV3Proc) Restart() error {
|
|
if err := v3p.Stop(); err != nil {
|
|
return err
|
|
}
|
|
return v3p.Start()
|
|
}
|
|
|
|
func (v3p *proxyV3Proc) Start() error {
|
|
if err := v3p.start(); err != nil {
|
|
return err
|
|
}
|
|
return v3p.waitReady("listening for grpc-proxy client requests")
|
|
}
|