e2e: refactor to support -tags cluster_proxy

This commit is contained in:
Anthony Romano 2017-07-14 16:03:04 -07:00
parent 46ee06a85c
commit 5c6a6bdc5a
23 changed files with 1083 additions and 774 deletions

View File

@ -0,0 +1,21 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !cluster_proxy
package e2e
func newEtcdProcess(cfg *etcdServerProcessConfig) (etcdProcess, error) {
return newEtcdServerProcess(cfg)
}

278
e2e/cluster_proxy_test.go Normal file
View File

@ -0,0 +1,278 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build cluster_proxy
package e2e
import (
"fmt"
"net"
"net/url"
"os"
"strconv"
"strings"
"github.com/coreos/etcd/pkg/expect"
)
type proxyEtcdProcess struct {
etcdProc etcdProcess
proxyV2 *proxyV2Proc
proxyV3 *proxyV3Proc
}
func newEtcdProcess(cfg *etcdServerProcessConfig) (etcdProcess, error) {
return newProxyEtcdProcess(cfg)
}
func newProxyEtcdProcess(cfg *etcdServerProcessConfig) (*proxyEtcdProcess, error) {
ep, err := newEtcdServerProcess(cfg)
if err != nil {
return nil, err
}
pep := &proxyEtcdProcess{
etcdProc: ep,
proxyV2: newProxyV2Proc(cfg),
proxyV3: newProxyV3Proc(cfg),
}
return pep, nil
}
func (p *proxyEtcdProcess) Config() *etcdServerProcessConfig { return p.etcdProc.Config() }
func (p *proxyEtcdProcess) EndpointsV2() []string { return p.proxyV2.endpoints() }
func (p *proxyEtcdProcess) EndpointsV3() []string { return p.proxyV3.endpoints() }
func (p *proxyEtcdProcess) Start() error {
if err := p.etcdProc.Start(); err != nil {
return err
}
if err := p.proxyV2.Start(); err != nil {
return err
}
return p.proxyV3.Start()
}
func (p *proxyEtcdProcess) Restart() error {
if err := p.etcdProc.Restart(); err != nil {
return err
}
if err := p.proxyV2.Restart(); err != nil {
return err
}
return p.proxyV3.Restart()
}
func (p *proxyEtcdProcess) Stop() error {
err := p.proxyV2.Stop()
if v3err := p.proxyV3.Stop(); err == nil {
err = v3err
}
if eerr := p.etcdProc.Stop(); eerr != nil && err == nil {
// fails on go-grpc issue #1384
if !strings.Contains(eerr.Error(), "exit status 2") {
err = eerr
}
}
return err
}
func (p *proxyEtcdProcess) Close() error {
err := p.proxyV2.Close()
if v3err := p.proxyV3.Close(); err == nil {
err = v3err
}
if eerr := p.etcdProc.Close(); eerr != nil && err == nil {
// fails on go-grpc issue #1384
if !strings.Contains(eerr.Error(), "exit status 2") {
err = eerr
}
}
return err
}
func (p *proxyEtcdProcess) WithStopSignal(sig os.Signal) os.Signal {
p.proxyV3.WithStopSignal(sig)
p.proxyV3.WithStopSignal(sig)
return p.etcdProc.WithStopSignal(sig)
}
type proxyProc struct {
execPath string
args []string
ep string
donec chan struct{}
proc *expect.ExpectProcess
}
func (pp *proxyProc) endpoints() []string { return []string{pp.ep} }
func (pp *proxyProc) start() error {
if pp.proc != nil {
panic("already started")
}
proc, err := spawnCmd(append([]string{pp.execPath}, pp.args...))
if err != nil {
return err
}
pp.proc = proc
return nil
}
func (pp *proxyProc) waitReady(readyStr string) error {
defer close(pp.donec)
return waitReadyExpectProc(pp.proc, []string{readyStr})
}
func (pp *proxyProc) Stop() error {
if pp.proc == nil {
return nil
}
if err := pp.proc.Stop(); err != nil && !strings.Contains(err.Error(), "exit status 1") {
// v2proxy exits with status 1 on auto tls; not sure why
return err
}
pp.proc = nil
<-pp.donec
pp.donec = make(chan struct{})
return nil
}
func (pp *proxyProc) WithStopSignal(sig os.Signal) os.Signal {
ret := pp.proc.StopSignal
pp.proc.StopSignal = sig
return ret
}
func (pp *proxyProc) Close() error { return pp.Stop() }
type proxyV2Proc struct {
proxyProc
dataDir string
}
func proxyListenURL(cfg *etcdServerProcessConfig, portOffset int) string {
u, err := url.Parse(cfg.acurl)
if err != nil {
panic(err)
}
host, port, _ := net.SplitHostPort(u.Host)
p, _ := strconv.ParseInt(port, 10, 16)
u.Host = fmt.Sprintf("%s:%d", host, int(p)+portOffset)
return u.String()
}
func newProxyV2Proc(cfg *etcdServerProcessConfig) *proxyV2Proc {
listenAddr := proxyListenURL(cfg, 2)
name := fmt.Sprintf("testname-proxy-%p", cfg)
args := []string{
"--name", name,
"--proxy", "on",
"--listen-client-urls", listenAddr,
"--initial-cluster", cfg.name + "=" + cfg.purl.String(),
}
return &proxyV2Proc{
proxyProc{
execPath: cfg.execPath,
args: append(args, cfg.tlsArgs...),
ep: listenAddr,
donec: make(chan struct{}),
},
name + ".etcd",
}
}
func (v2p *proxyV2Proc) Start() error {
os.RemoveAll(v2p.dataDir)
if err := v2p.start(); err != nil {
return err
}
return v2p.waitReady("httpproxy: endpoints found")
}
func (v2p *proxyV2Proc) Restart() error {
if err := v2p.Stop(); err != nil {
return err
}
return v2p.Start()
}
func (v2p *proxyV2Proc) Stop() error {
if err := v2p.proxyProc.Stop(); err != nil {
return err
}
// v2 proxy caches members; avoid reuse of directory
return os.RemoveAll(v2p.dataDir)
}
type proxyV3Proc struct {
proxyProc
}
func newProxyV3Proc(cfg *etcdServerProcessConfig) *proxyV3Proc {
listenAddr := proxyListenURL(cfg, 3)
args := []string{
"grpc-proxy",
"start",
"--listen-addr", strings.Split(listenAddr, "/")[2],
"--endpoints", cfg.acurl,
// pass-through member RPCs
"--advertise-client-url", "",
}
tlsArgs := []string{}
for i := 0; i < len(cfg.tlsArgs); i++ {
switch cfg.tlsArgs[i] {
case "--cert-file":
tlsArgs = append(tlsArgs, "--cert", cfg.tlsArgs[i+1], "--cert-file", cfg.tlsArgs[i+1])
i++
case "--key-file":
tlsArgs = append(tlsArgs, "--key", cfg.tlsArgs[i+1], "--key-file", cfg.tlsArgs[i+1])
i++
case "--ca-file":
tlsArgs = append(tlsArgs, "--cacert", cfg.tlsArgs[i+1], "--trusted-ca-file", cfg.tlsArgs[i+1])
i++
case "--auto-tls":
tlsArgs = append(tlsArgs, "--auto-tls", "--insecure-skip-tls-verify")
case "--peer-ca-file", "--peer-cert-file", "--peer-key-file":
i++ // skip arg
case "--client-cert-auth", "--peer-auto-tls":
default:
tlsArgs = append(tlsArgs, cfg.tlsArgs[i])
}
}
return &proxyV3Proc{
proxyProc{
execPath: cfg.execPath,
args: append(args, tlsArgs...),
ep: listenAddr,
donec: make(chan struct{}),
},
}
}
func (v3p *proxyV3Proc) Restart() error {
if err := v3p.Stop(); err != nil {
return err
}
return v3p.Start()
}
func (v3p *proxyV3Proc) Start() error {
if err := v3p.start(); err != nil {
return err
}
return v3p.waitReady("listening for grpc-proxy client requests")
}

359
e2e/cluster_test.go Normal file
View File

@ -0,0 +1,359 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package e2e
import (
"fmt"
"io/ioutil"
"net/url"
"os"
"strings"
"github.com/coreos/etcd/etcdserver"
)
const etcdProcessBasePort = 20000
type clientConnType int
const (
clientNonTLS clientConnType = iota
clientTLS
clientTLSAndNonTLS
)
var (
configNoTLS = etcdProcessClusterConfig{
clusterSize: 3,
initialToken: "new",
}
configAutoTLS = etcdProcessClusterConfig{
clusterSize: 3,
isPeerTLS: true,
isPeerAutoTLS: true,
initialToken: "new",
}
configTLS = etcdProcessClusterConfig{
clusterSize: 3,
clientTLS: clientTLS,
isPeerTLS: true,
initialToken: "new",
}
configClientTLS = etcdProcessClusterConfig{
clusterSize: 3,
clientTLS: clientTLS,
initialToken: "new",
}
configClientBoth = etcdProcessClusterConfig{
clusterSize: 1,
clientTLS: clientTLSAndNonTLS,
initialToken: "new",
}
configClientAutoTLS = etcdProcessClusterConfig{
clusterSize: 1,
isClientAutoTLS: true,
clientTLS: clientTLS,
initialToken: "new",
}
configPeerTLS = etcdProcessClusterConfig{
clusterSize: 3,
isPeerTLS: true,
initialToken: "new",
}
configClientTLSCertAuth = etcdProcessClusterConfig{
clusterSize: 1,
clientTLS: clientTLS,
initialToken: "new",
clientCertAuthEnabled: true,
}
)
func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig {
ret := cfg
ret.clusterSize = 1
return &ret
}
type etcdProcessCluster struct {
cfg *etcdProcessClusterConfig
procs []etcdProcess
}
type etcdProcessClusterConfig struct {
execPath string
dataDirPath string
keepDataDir bool
clusterSize int
baseScheme string
basePort int
snapCount int // default is 10000
clientTLS clientConnType
clientCertAuthEnabled bool
isPeerTLS bool
isPeerAutoTLS bool
isClientAutoTLS bool
isClientCRL bool
forceNewCluster bool
initialToken string
quotaBackendBytes int64
noStrictReconfig bool
}
// newEtcdProcessCluster launches a new cluster from etcd processes, returning
// a new etcdProcessCluster once all nodes are ready to accept client requests.
func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
etcdCfgs := cfg.etcdServerProcessConfigs()
epc := &etcdProcessCluster{
cfg: cfg,
procs: make([]etcdProcess, cfg.clusterSize),
}
// launch etcd processes
for i := range etcdCfgs {
proc, err := newEtcdProcess(etcdCfgs[i])
if err != nil {
epc.Close()
return nil, err
}
epc.procs[i] = proc
}
if err := epc.Start(); err != nil {
return nil, err
}
return epc, nil
}
func (cfg *etcdProcessClusterConfig) clientScheme() string {
if cfg.clientTLS == clientTLS {
return "https"
}
return "http"
}
func (cfg *etcdProcessClusterConfig) peerScheme() string {
peerScheme := cfg.baseScheme
if peerScheme == "" {
peerScheme = "http"
}
if cfg.isPeerTLS {
peerScheme += "s"
}
return peerScheme
}
func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerProcessConfig {
if cfg.basePort == 0 {
cfg.basePort = etcdProcessBasePort
}
if cfg.execPath == "" {
cfg.execPath = binPath
}
if cfg.snapCount == 0 {
cfg.snapCount = etcdserver.DefaultSnapCount
}
etcdCfgs := make([]*etcdServerProcessConfig, cfg.clusterSize)
initialCluster := make([]string, cfg.clusterSize)
for i := 0; i < cfg.clusterSize; i++ {
var curls []string
var curl, curltls string
port := cfg.basePort + 4*i
curlHost := fmt.Sprintf("localhost:%d", port)
switch cfg.clientTLS {
case clientNonTLS, clientTLS:
curl = (&url.URL{Scheme: cfg.clientScheme(), Host: curlHost}).String()
curls = []string{curl}
case clientTLSAndNonTLS:
curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
curls = []string{curl, curltls}
}
purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
name := fmt.Sprintf("testname%d", i)
dataDirPath := cfg.dataDirPath
if cfg.dataDirPath == "" {
var derr error
dataDirPath, derr = ioutil.TempDir("", name+".etcd")
if derr != nil {
panic("could not get tempdir for datadir")
}
}
initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
args := []string{
"--name", name,
"--listen-client-urls", strings.Join(curls, ","),
"--advertise-client-urls", strings.Join(curls, ","),
"--listen-peer-urls", purl.String(),
"--initial-advertise-peer-urls", purl.String(),
"--initial-cluster-token", cfg.initialToken,
"--data-dir", dataDirPath,
"--snapshot-count", fmt.Sprintf("%d", cfg.snapCount),
}
if cfg.forceNewCluster {
args = append(args, "--force-new-cluster")
}
if cfg.quotaBackendBytes > 0 {
args = append(args,
"--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes),
)
}
if cfg.noStrictReconfig {
args = append(args, "--strict-reconfig-check=false")
}
args = append(args, cfg.tlsArgs()...)
etcdCfgs[i] = &etcdServerProcessConfig{
execPath: cfg.execPath,
args: args,
tlsArgs: cfg.tlsArgs(),
dataDirPath: dataDirPath,
keepDataDir: cfg.keepDataDir,
name: name,
purl: purl,
acurl: curl,
initialToken: cfg.initialToken,
}
}
initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
for i := range etcdCfgs {
etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",")
etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...)
}
return etcdCfgs
}
func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
if cfg.clientTLS != clientNonTLS {
if cfg.isClientAutoTLS {
args = append(args, "--auto-tls")
} else {
tlsClientArgs := []string{
"--cert-file", certPath,
"--key-file", privateKeyPath,
"--ca-file", caPath,
}
args = append(args, tlsClientArgs...)
if cfg.clientCertAuthEnabled {
args = append(args, "--client-cert-auth")
}
}
}
if cfg.isPeerTLS {
if cfg.isPeerAutoTLS {
args = append(args, "--peer-auto-tls")
} else {
tlsPeerArgs := []string{
"--peer-cert-file", certPath,
"--peer-key-file", privateKeyPath,
"--peer-ca-file", caPath,
}
args = append(args, tlsPeerArgs...)
}
}
if cfg.isClientCRL {
args = append(args, "--client-crl-file", crlPath, "--client-cert-auth")
}
return args
}
func (epc *etcdProcessCluster) EndpointsV2() []string {
return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV2() })
}
func (epc *etcdProcessCluster) EndpointsV3() []string {
return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV3() })
}
func (epc *etcdProcessCluster) endpoints(f func(ep etcdProcess) []string) (ret []string) {
for _, p := range epc.procs {
ret = append(ret, f(p)...)
}
return ret
}
func (epc *etcdProcessCluster) Start() error {
return epc.start(func(ep etcdProcess) error { return ep.Start() })
}
func (epc *etcdProcessCluster) Restart() error {
return epc.start(func(ep etcdProcess) error { return ep.Restart() })
}
func (epc *etcdProcessCluster) start(f func(ep etcdProcess) error) error {
readyC := make(chan error, len(epc.procs))
for i := range epc.procs {
go func(n int) { readyC <- f(epc.procs[n]) }(i)
}
for range epc.procs {
if err := <-readyC; err != nil {
epc.Close()
return err
}
}
return nil
}
func (epc *etcdProcessCluster) Stop() (err error) {
for _, p := range epc.procs {
if p == nil {
continue
}
if curErr := p.Stop(); curErr != nil {
if err != nil {
err = fmt.Errorf("%v; %v", err, curErr)
} else {
err = curErr
}
}
}
return err
}
func (epc *etcdProcessCluster) Close() error {
err := epc.Stop()
for _, p := range epc.procs {
// p is nil when newEtcdProcess fails in the middle
// Close still gets called to clean up test data
if p == nil {
continue
}
if cerr := p.Close(); cerr != nil {
err = cerr
}
}
return err
}
func (epc *etcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) {
for _, p := range epc.procs {
ret = p.WithStopSignal(sig)
}
return ret
}

View File

@ -128,10 +128,9 @@ func testCtlV2Ls(t *testing.T, cfg *etcdProcessClusterConfig, quorum bool) {
}
}
func TestCtlV2Watch(t *testing.T) { testCtlV2Watch(t, &configNoTLS, false) }
func TestCtlV2WatchTLS(t *testing.T) { testCtlV2Watch(t, &configTLS, false) }
func TestCtlV2WatchWithProxy(t *testing.T) { testCtlV2Watch(t, &configWithProxy, false) }
func TestCtlV2WatchWithProxyNoSync(t *testing.T) { testCtlV2Watch(t, &configWithProxy, true) }
func TestCtlV2Watch(t *testing.T) { testCtlV2Watch(t, &configNoTLS, false) }
func TestCtlV2WatchTLS(t *testing.T) { testCtlV2Watch(t, &configTLS, false) }
func testCtlV2Watch(t *testing.T, cfg *etcdProcessClusterConfig, noSync bool) {
defer testutil.AfterTest(t)
@ -158,12 +157,10 @@ func testCtlV2Watch(t *testing.T, cfg *etcdProcessClusterConfig, noSync bool) {
}
}
func TestCtlV2GetRoleUser(t *testing.T) { testCtlV2GetRoleUser(t, &configNoTLS) }
func TestCtlV2GetRoleUserWithProxy(t *testing.T) { testCtlV2GetRoleUser(t, &configWithProxy) }
func testCtlV2GetRoleUser(t *testing.T, cfg *etcdProcessClusterConfig) {
func TestCtlV2GetRoleUser(t *testing.T) {
defer testutil.AfterTest(t)
epc := setupEtcdctlTest(t, cfg, false)
epc := setupEtcdctlTest(t, &configNoTLS, false)
defer func() {
if err := epc.Close(); err != nil {
t.Fatalf("error closing etcd processes (%v)", err)
@ -196,7 +193,7 @@ func TestCtlV2UserListRoot(t *testing.T) { testCtlV2UserList(t, "root") }
func testCtlV2UserList(t *testing.T, username string) {
defer testutil.AfterTest(t)
epc := setupEtcdctlTest(t, &configWithProxy, false)
epc := setupEtcdctlTest(t, &configNoTLS, false)
defer func() {
if err := epc.Close(); err != nil {
t.Fatalf("error closing etcd processes (%v)", err)
@ -214,7 +211,7 @@ func testCtlV2UserList(t *testing.T, username string) {
func TestCtlV2RoleList(t *testing.T) {
defer testutil.AfterTest(t)
epc := setupEtcdctlTest(t, &configWithProxy, false)
epc := setupEtcdctlTest(t, &configNoTLS, false)
defer func() {
if err := epc.Close(); err != nil {
t.Fatalf("error closing etcd processes (%v)", err)
@ -243,7 +240,7 @@ func TestCtlV2Backup(t *testing.T) { // For https://github.com/coreos/etcd/issue
t.Fatal(err)
}
if err := etcdctlBackup(epc1, epc1.procs[0].cfg.dataDirPath, backupDir); err != nil {
if err := etcdctlBackup(epc1, epc1.procs[0].Config().dataDirPath, backupDir); err != nil {
t.Fatal(err)
}
@ -350,16 +347,7 @@ func TestCtlV2ClusterHealth(t *testing.T) {
}
func etcdctlPrefixArgs(clus *etcdProcessCluster) []string {
endpoints := ""
if proxies := clus.proxies(); len(proxies) != 0 {
endpoints = proxies[0].cfg.acurl
} else if processes := clus.processes(); len(processes) != 0 {
es := []string{}
for _, b := range processes {
es = append(es, b.cfg.acurl)
}
endpoints = strings.Join(es, ",")
}
endpoints := strings.Join(clus.EndpointsV2(), ",")
cmdArgs := []string{ctlBinPath, "--endpoints", endpoints}
if clus.cfg.clientTLS == clientTLS {
cmdArgs = append(cmdArgs, "--ca-file", caPath, "--cert-file", certPath, "--key-file", privateKeyPath)

View File

@ -64,7 +64,7 @@ func alarmTest(cx ctlCtx) {
}
}
eps := cx.epc.grpcEndpoints()
eps := cx.epc.EndpointsV3()
// get latest revision to compact
cli, err := clientv3.New(clientv3.Config{

View File

@ -12,10 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// Skip proxy tests for now since auth is broken on grpcproxy.
// +build !cluster_proxy
package e2e
import (
"fmt"
"os"
"testing"
"github.com/coreos/etcd/clientv3"
@ -44,6 +48,12 @@ func TestCtlV3AuthRoleGet(t *testing.T) { testCtl(t, authTestRoleGet) }
func TestCtlV3AuthUserGet(t *testing.T) { testCtl(t, authTestUserGet) }
func TestCtlV3AuthRoleList(t *testing.T) { testCtl(t, authTestRoleList) }
func TestCtlV3AuthDefrag(t *testing.T) { testCtl(t, authTestDefrag) }
func TestCtlV3AuthEndpointHealth(t *testing.T) {
testCtl(t, authTestEndpointHealth, withQuorum())
}
func TestCtlV3AuthSnapshot(t *testing.T) { testCtl(t, authTestSnapshot) }
func authEnableTest(cx ctlCtx) {
if err := authEnable(cx); err != nil {
cx.t.Fatal(err)
@ -816,3 +826,92 @@ func authTestRoleList(cx ctlCtx) {
cx.t.Fatal(err)
}
}
func authTestDefrag(cx ctlCtx) {
maintenanceInitKeys(cx)
if err := authEnable(cx); err != nil {
cx.t.Fatal(err)
}
cx.user, cx.pass = "root", "root"
authSetupTestUser(cx)
// ordinary user cannot defrag
cx.user, cx.pass = "test-user", "pass"
if err := ctlV3Defrag(cx); err == nil {
cx.t.Fatal("ordinary user should not be able to issue a defrag request")
}
// root can defrag
cx.user, cx.pass = "root", "root"
if err := ctlV3Defrag(cx); err != nil {
cx.t.Fatal(err)
}
}
func authTestSnapshot(cx ctlCtx) {
maintenanceInitKeys(cx)
if err := authEnable(cx); err != nil {
cx.t.Fatal(err)
}
cx.user, cx.pass = "root", "root"
authSetupTestUser(cx)
fpath := "test.snapshot"
defer os.RemoveAll(fpath)
// ordinary user cannot save a snapshot
cx.user, cx.pass = "test-user", "pass"
if err := ctlV3SnapshotSave(cx, fpath); err == nil {
cx.t.Fatal("ordinary user should not be able to save a snapshot")
}
// root can save a snapshot
cx.user, cx.pass = "root", "root"
if err := ctlV3SnapshotSave(cx, fpath); err != nil {
cx.t.Fatalf("snapshotTest ctlV3SnapshotSave error (%v)", err)
}
st, err := getSnapshotStatus(cx, fpath)
if err != nil {
cx.t.Fatalf("snapshotTest getSnapshotStatus error (%v)", err)
}
if st.Revision != 4 {
cx.t.Fatalf("expected 4, got %d", st.Revision)
}
if st.TotalKey < 3 {
cx.t.Fatalf("expected at least 3, got %d", st.TotalKey)
}
}
func authTestEndpointHealth(cx ctlCtx) {
if err := authEnable(cx); err != nil {
cx.t.Fatal(err)
}
cx.user, cx.pass = "root", "root"
authSetupTestUser(cx)
if err := ctlV3EndpointHealth(cx); err != nil {
cx.t.Fatalf("endpointStatusTest ctlV3EndpointHealth error (%v)", err)
}
// health checking with an ordinary user "succeeds" since permission denial goes through consensus
cx.user, cx.pass = "test-user", "pass"
if err := ctlV3EndpointHealth(cx); err != nil {
cx.t.Fatalf("endpointStatusTest ctlV3EndpointHealth error (%v)", err)
}
// succeed if permissions granted for ordinary user
cx.user, cx.pass = "root", "root"
if err := ctlV3RoleGrantPermission(cx, "test-role", grantingPerm{true, true, "health", "", false}); err != nil {
cx.t.Fatal(err)
}
cx.user, cx.pass = "test-user", "pass"
if err := ctlV3EndpointHealth(cx); err != nil {
cx.t.Fatalf("endpointStatusTest ctlV3EndpointHealth error (%v)", err)
}
}

View File

@ -16,8 +16,7 @@ package e2e
import "testing"
func TestCtlV3Defrag(t *testing.T) { testCtl(t, defragTest) }
func TestCtlV3DefragWithAuth(t *testing.T) { testCtl(t, defragTestWithAuth) }
func TestCtlV3Defrag(t *testing.T) { testCtl(t, defragTest) }
func maintenanceInitKeys(cx ctlCtx) {
var kvs = []kv{{"key", "val1"}, {"key", "val2"}, {"key", "val3"}}
@ -40,29 +39,6 @@ func defragTest(cx ctlCtx) {
}
}
func defragTestWithAuth(cx ctlCtx) {
maintenanceInitKeys(cx)
if err := authEnable(cx); err != nil {
cx.t.Fatal(err)
}
cx.user, cx.pass = "root", "root"
authSetupTestUser(cx)
// ordinary user cannot defrag
cx.user, cx.pass = "test-user", "pass"
if err := ctlV3Defrag(cx); err == nil {
cx.t.Fatal("ordinary user should not be able to issue a defrag request")
}
// root can defrag
cx.user, cx.pass = "root", "root"
if err := ctlV3Defrag(cx); err != nil {
cx.t.Fatal(err)
}
}
func ctlV3Defrag(cx ctlCtx) error {
cmdArgs := append(cx.PrefixArgs(), "defrag")
lines := make([]string, cx.epc.cfg.clusterSize)

View File

@ -33,8 +33,8 @@ func TestCtlV3Elect(t *testing.T) {
func testElect(cx ctlCtx) {
// debugging for #6934
sig := cx.epc.withStopSignal(debugLockSignal)
defer cx.epc.withStopSignal(sig)
sig := cx.epc.WithStopSignal(debugLockSignal)
defer cx.epc.WithStopSignal(sig)
name := "a"

View File

@ -21,9 +21,6 @@ import (
func TestCtlV3EndpointHealth(t *testing.T) { testCtl(t, endpointHealthTest, withQuorum()) }
func TestCtlV3EndpointStatus(t *testing.T) { testCtl(t, endpointStatusTest, withQuorum()) }
func TestCtlV3EndpointHealthWithAuth(t *testing.T) {
testCtl(t, endpointHealthTestWithAuth, withQuorum())
}
func endpointHealthTest(cx ctlCtx) {
if err := ctlV3EndpointHealth(cx); err != nil {
@ -49,38 +46,9 @@ func endpointStatusTest(cx ctlCtx) {
func ctlV3EndpointStatus(cx ctlCtx) error {
cmdArgs := append(cx.PrefixArgs(), "endpoint", "status")
var eps []string
for _, ep := range cx.epc.endpoints() {
for _, ep := range cx.epc.EndpointsV3() {
u, _ := url.Parse(ep)
eps = append(eps, u.Host)
}
return spawnWithExpects(cmdArgs, eps...)
}
func endpointHealthTestWithAuth(cx ctlCtx) {
if err := authEnable(cx); err != nil {
cx.t.Fatal(err)
}
cx.user, cx.pass = "root", "root"
authSetupTestUser(cx)
if err := ctlV3EndpointHealth(cx); err != nil {
cx.t.Fatalf("endpointStatusTest ctlV3EndpointHealth error (%v)", err)
}
// health checking with an ordinary user "succeeds" since permission denial goes through consensus
cx.user, cx.pass = "test-user", "pass"
if err := ctlV3EndpointHealth(cx); err != nil {
cx.t.Fatalf("endpointStatusTest ctlV3EndpointHealth error (%v)", err)
}
// succeed if permissions granted for ordinary user
cx.user, cx.pass = "root", "root"
if err := ctlV3RoleGrantPermission(cx, "test-role", grantingPerm{true, true, "health", "", false}); err != nil {
cx.t.Fatal(err)
}
cx.user, cx.pass = "test-user", "pass"
if err := ctlV3EndpointHealth(cx); err != nil {
cx.t.Fatalf("endpointStatusTest ctlV3EndpointHealth error (%v)", err)
}
}

View File

@ -56,8 +56,8 @@ func TestCtlV3Lock(t *testing.T) {
func testLock(cx ctlCtx) {
// debugging for #6464
sig := cx.epc.withStopSignal(debugLockSignal)
defer cx.epc.withStopSignal(sig)
sig := cx.epc.WithStopSignal(debugLockSignal)
defer cx.epc.WithStopSignal(sig)
name := "a"

View File

@ -48,8 +48,8 @@ func TestCtlV3Migrate(t *testing.T) {
}
}
dataDir := epc.procs[0].cfg.dataDirPath
if err := epc.StopAll(); err != nil {
dataDir := epc.procs[0].Config().dataDirPath
if err := epc.Stop(); err != nil {
t.Fatalf("error closing etcd processes (%v)", err)
}
@ -65,8 +65,8 @@ func TestCtlV3Migrate(t *testing.T) {
t.Fatal(err)
}
epc.procs[0].cfg.keepDataDir = true
if err := epc.RestartAll(); err != nil {
epc.procs[0].Config().keepDataDir = true
if err := epc.Restart(); err != nil {
t.Fatal(err)
}
@ -75,7 +75,7 @@ func TestCtlV3Migrate(t *testing.T) {
t.Fatal(err)
}
cli, err := clientv3.New(clientv3.Config{
Endpoints: epc.grpcEndpoints(),
Endpoints: epc.EndpointsV3(),
DialTimeout: 3 * time.Second,
})
if err != nil {

View File

@ -39,7 +39,7 @@ func TestCtlV3MoveLeader(t *testing.T) {
var leadIdx int
var leaderID uint64
var transferee uint64
for i, ep := range epc.grpcEndpoints() {
for i, ep := range epc.EndpointsV3() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
DialTimeout: 3 * time.Second,
@ -75,11 +75,11 @@ func TestCtlV3MoveLeader(t *testing.T) {
expect string
}{
{ // request to non-leader
cx.prefixArgs([]string{cx.epc.grpcEndpoints()[(leadIdx+1)%3]}),
cx.prefixArgs([]string{cx.epc.EndpointsV3()[(leadIdx+1)%3]}),
"no leader endpoint given at ",
},
{ // request to leader
cx.prefixArgs([]string{cx.epc.grpcEndpoints()[leadIdx]}),
cx.prefixArgs([]string{cx.epc.EndpointsV3()[leadIdx]}),
fmt.Sprintf("Leadership transferred from %s to %s", types.ID(leaderID), types.ID(transferee)),
},
}

View File

@ -152,7 +152,7 @@ func TestIssue6361(t *testing.T) {
}()
dialTimeout := 7 * time.Second
prefixArgs := []string{ctlBinPath, "--endpoints", strings.Join(epc.grpcEndpoints(), ","), "--dial-timeout", dialTimeout.String()}
prefixArgs := []string{ctlBinPath, "--endpoints", strings.Join(epc.EndpointsV3(), ","), "--dial-timeout", dialTimeout.String()}
// write some keys
kvs := []kv{{"foo1", "val1"}, {"foo2", "val2"}, {"foo3", "val3"}}
@ -170,7 +170,7 @@ func TestIssue6361(t *testing.T) {
t.Fatal(err)
}
if err = epc.processes()[0].Stop(); err != nil {
if err = epc.procs[0].Stop(); err != nil {
t.Fatal(err)
}
@ -178,19 +178,19 @@ func TestIssue6361(t *testing.T) {
defer os.RemoveAll(newDataDir)
// etcdctl restore the snapshot
err = spawnWithExpect([]string{ctlBinPath, "snapshot", "restore", fpath, "--name", epc.procs[0].cfg.name, "--initial-cluster", epc.procs[0].cfg.initialCluster, "--initial-cluster-token", epc.procs[0].cfg.initialToken, "--initial-advertise-peer-urls", epc.procs[0].cfg.purl.String(), "--data-dir", newDataDir}, "membership: added member")
err = spawnWithExpect([]string{ctlBinPath, "snapshot", "restore", fpath, "--name", epc.procs[0].Config().name, "--initial-cluster", epc.procs[0].Config().initialCluster, "--initial-cluster-token", epc.procs[0].Config().initialToken, "--initial-advertise-peer-urls", epc.procs[0].Config().purl.String(), "--data-dir", newDataDir}, "membership: added member")
if err != nil {
t.Fatal(err)
}
// start the etcd member using the restored snapshot
epc.procs[0].cfg.dataDirPath = newDataDir
for i := range epc.procs[0].cfg.args {
if epc.procs[0].cfg.args[i] == "--data-dir" {
epc.procs[0].cfg.args[i+1] = newDataDir
epc.procs[0].Config().dataDirPath = newDataDir
for i := range epc.procs[0].Config().args {
if epc.procs[0].Config().args[i] == "--data-dir" {
epc.procs[0].Config().args[i+1] = newDataDir
}
}
if err = epc.processes()[0].Restart(); err != nil {
if err = epc.procs[0].Restart(); err != nil {
t.Fatal(err)
}
@ -217,11 +217,11 @@ func TestIssue6361(t *testing.T) {
defer os.RemoveAll(newDataDir2)
name2 := "infra2"
initialCluster2 := epc.procs[0].cfg.initialCluster + fmt.Sprintf(",%s=%s", name2, peerURL)
initialCluster2 := epc.procs[0].Config().initialCluster + fmt.Sprintf(",%s=%s", name2, peerURL)
// start the new member
var nepc *expect.ExpectProcess
nepc, err = spawnCmd([]string{epc.procs[0].cfg.execPath, "--name", name2,
nepc, err = spawnCmd([]string{epc.procs[0].Config().execPath, "--name", name2,
"--listen-client-urls", clientURL, "--advertise-client-urls", clientURL,
"--listen-peer-urls", peerURL, "--initial-advertise-peer-urls", peerURL,
"--initial-cluster", initialCluster2, "--initial-cluster-state", "existing", "--data-dir", newDataDir2})
@ -245,42 +245,3 @@ func TestIssue6361(t *testing.T) {
t.Fatal(err)
}
}
func TestCtlV3SnapshotWithAuth(t *testing.T) { testCtl(t, snapshotTestWithAuth) }
func snapshotTestWithAuth(cx ctlCtx) {
maintenanceInitKeys(cx)
if err := authEnable(cx); err != nil {
cx.t.Fatal(err)
}
cx.user, cx.pass = "root", "root"
authSetupTestUser(cx)
fpath := "test.snapshot"
defer os.RemoveAll(fpath)
// ordinary user cannot save a snapshot
cx.user, cx.pass = "test-user", "pass"
if err := ctlV3SnapshotSave(cx, fpath); err == nil {
cx.t.Fatal("ordinary user should not be able to save a snapshot")
}
// root can save a snapshot
cx.user, cx.pass = "root", "root"
if err := ctlV3SnapshotSave(cx, fpath); err != nil {
cx.t.Fatalf("snapshotTest ctlV3SnapshotSave error (%v)", err)
}
st, err := getSnapshotStatus(cx, fpath)
if err != nil {
cx.t.Fatalf("snapshotTest getSnapshotStatus error (%v)", err)
}
if st.Revision != 4 {
cx.t.Fatalf("expected 4, got %d", st.Revision)
}
if st.TotalKey < 3 {
cx.t.Fatalf("expected at least 3, got %d", st.TotalKey)
}
}

View File

@ -45,7 +45,7 @@ func TestCtlV3DialWithHTTPScheme(t *testing.T) {
}
func dialWithSchemeTest(cx ctlCtx) {
cmdArgs := append(cx.prefixArgs(cx.epc.endpoints()), "put", "foo", "bar")
cmdArgs := append(cx.prefixArgs(cx.epc.EndpointsV3()), "put", "foo", "bar")
if err := spawnWithExpect(cmdArgs, "OK"); err != nil {
cx.t.Fatal(err)
}
@ -169,10 +169,6 @@ func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
}
func (cx *ctlCtx) prefixArgs(eps []string) []string {
if len(cx.epc.proxies()) > 0 { // TODO: add proxy check as in v2
panic("v3 proxy not implemented")
}
fmap := make(map[string]string)
fmap["endpoints"] = strings.Join(eps, ",")
fmap["dial-timeout"] = cx.dialTimeout.String()
@ -212,7 +208,7 @@ func (cx *ctlCtx) prefixArgs(eps []string) []string {
// PrefixArgs prefixes etcdctl command.
// Make sure to unset environment variables after tests.
func (cx *ctlCtx) PrefixArgs() []string {
return cx.prefixArgs(cx.epc.grpcEndpoints())
return cx.prefixArgs(cx.epc.EndpointsV3())
}
func isGRPCTimedout(err error) bool {

View File

@ -25,7 +25,7 @@ func TestEtcdExampleConfig(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if err = waitReadyExpectProc(proc, false); err != nil {
if err = waitReadyExpectProc(proc, etcdServerReadyLines); err != nil {
t.Fatal(err)
}
if err = proc.Stop(); err != nil {

134
e2e/etcd_process.go Normal file
View File

@ -0,0 +1,134 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package e2e
import (
"fmt"
"net/url"
"os"
"github.com/coreos/etcd/pkg/expect"
"github.com/coreos/etcd/pkg/fileutil"
)
var etcdServerReadyLines = []string{"enabled capabilities for version", "published"}
// etcdProcess is a process that serves etcd requests.
type etcdProcess interface {
EndpointsV2() []string
EndpointsV3() []string
Start() error
Restart() error
Stop() error
Close() error
WithStopSignal(sig os.Signal) os.Signal
Config() *etcdServerProcessConfig
}
type etcdServerProcess struct {
cfg *etcdServerProcessConfig
proc *expect.ExpectProcess
donec chan struct{} // closed when Interact() terminates
}
type etcdServerProcessConfig struct {
execPath string
args []string
tlsArgs []string
dataDirPath string
keepDataDir bool
name string
purl url.URL
acurl string
initialToken string
initialCluster string
}
func newEtcdServerProcess(cfg *etcdServerProcessConfig) (*etcdServerProcess, error) {
if !fileutil.Exist(cfg.execPath) {
return nil, fmt.Errorf("could not find etcd binary")
}
if !cfg.keepDataDir {
if err := os.RemoveAll(cfg.dataDirPath); err != nil {
return nil, err
}
}
return &etcdServerProcess{cfg: cfg, donec: make(chan struct{})}, nil
}
func (ep *etcdServerProcess) EndpointsV2() []string { return []string{ep.cfg.acurl} }
func (ep *etcdServerProcess) EndpointsV3() []string { return ep.EndpointsV2() }
func (ep *etcdServerProcess) Start() error {
if ep.proc != nil {
panic("already started")
}
proc, err := spawnCmd(append([]string{ep.cfg.execPath}, ep.cfg.args...))
if err != nil {
return err
}
ep.proc = proc
return ep.waitReady()
}
func (ep *etcdServerProcess) Restart() error {
if err := ep.Stop(); err != nil {
return err
}
ep.donec = make(chan struct{})
return ep.Start()
}
func (ep *etcdServerProcess) Stop() error {
if ep == nil || ep.proc == nil {
return nil
}
if err := ep.proc.Stop(); err != nil {
return err
}
ep.proc = nil
<-ep.donec
ep.donec = make(chan struct{})
if ep.cfg.purl.Scheme == "unix" || ep.cfg.purl.Scheme == "unixs" {
os.Remove(ep.cfg.purl.Host + ep.cfg.purl.Path)
}
return nil
}
func (ep *etcdServerProcess) Close() error {
if err := ep.Stop(); err != nil {
return err
}
return os.RemoveAll(ep.cfg.dataDirPath)
}
func (ep *etcdServerProcess) WithStopSignal(sig os.Signal) os.Signal {
ret := ep.proc.StopSignal
ep.proc.StopSignal = sig
return ret
}
func (ep *etcdServerProcess) waitReady() error {
defer close(ep.donec)
return waitReadyExpectProc(ep.proc, etcdServerReadyLines)
}
func (ep *etcdServerProcess) Config() *etcdServerProcessConfig { return ep.cfg }

View File

@ -88,8 +88,8 @@ func TestReleaseUpgrade(t *testing.T) {
if err := epc.procs[i].Stop(); err != nil {
t.Fatalf("#%d: error closing etcd process (%v)", i, err)
}
epc.procs[i].cfg.execPath = binDir + "/etcd"
epc.procs[i].cfg.keepDataDir = true
epc.procs[i].Config().execPath = binDir + "/etcd"
epc.procs[i].Config().keepDataDir = true
if err := epc.procs[i].Restart(); err != nil {
t.Fatalf("error restarting etcd process (%v)", err)
@ -155,8 +155,8 @@ func TestReleaseUpgradeWithRestart(t *testing.T) {
wg.Add(len(epc.procs))
for i := range epc.procs {
go func(i int) {
epc.procs[i].cfg.execPath = binDir + "/etcd"
epc.procs[i].cfg.keepDataDir = true
epc.procs[i].Config().execPath = binDir + "/etcd"
epc.procs[i].Config().keepDataDir = true
if err := epc.procs[i].Restart(); err != nil {
t.Fatalf("error restarting etcd process (%v)", err)
}

View File

@ -33,20 +33,7 @@ const noOutputLineCount = 2 // cov-enabled binaries emit PASS and coverage count
func spawnCmd(args []string) (*expect.ExpectProcess, error) {
if args[0] == binPath {
covArgs, err := getCovArgs()
if err != nil {
return nil, err
}
ep, err := expect.NewExpectWithEnv(binDir+"/etcd_test", covArgs, args2env(args[1:]))
if err != nil {
return nil, err
}
// ep sends SIGTERM to etcd_test process on ep.close()
// allowing the process to exit gracefully in order to generate a coverage report.
// note: go runtime ignores SIGINT but not SIGTERM
// if e2e test is run as a background process.
ep.StopSignal = syscall.SIGTERM
return ep, nil
return spawnEtcd(args)
}
if args[0] == ctlBinPath {
@ -73,6 +60,32 @@ func spawnCmd(args []string) (*expect.ExpectProcess, error) {
return expect.NewExpect(args[0], args[1:]...)
}
func spawnEtcd(args []string) (*expect.ExpectProcess, error) {
covArgs, err := getCovArgs()
if err != nil {
return nil, err
}
env := []string{}
if args[1] == "grpc-proxy" {
// avoid test flag conflicts in coverage enabled etcd by putting flags in ETCDCOV_ARGS
env = append(os.Environ(), "ETCDCOV_ARGS="+strings.Join(args, "\xe7\xcd"))
} else {
env = args2env(args[1:])
}
ep, err := expect.NewExpectWithEnv(binDir+"/etcd_test", covArgs, env)
if err != nil {
return nil, err
}
// ep sends SIGTERM to etcd_test process on ep.close()
// allowing the process to exit gracefully in order to generate a coverage report.
// note: go runtime ignores SIGINT but not SIGTERM
// if e2e test is run as a background process.
ep.StopSignal = syscall.SIGTERM
return ep, nil
}
func getCovArgs() ([]string, error) {
coverPath := os.Getenv("COVERDIR")
if !filepath.IsAbs(coverPath) {

View File

@ -1,593 +0,0 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package e2e
import (
"fmt"
"io/ioutil"
"net/url"
"os"
"strings"
"time"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/pkg/expect"
"github.com/coreos/etcd/pkg/fileutil"
)
const etcdProcessBasePort = 20000
var (
binPath string
ctlBinPath string
certPath string
privateKeyPath string
caPath string
crlPath string
revokedCertPath string
revokedPrivateKeyPath string
)
type clientConnType int
const (
clientNonTLS clientConnType = iota
clientTLS
clientTLSAndNonTLS
)
var (
configNoTLS = etcdProcessClusterConfig{
clusterSize: 3,
proxySize: 0,
initialToken: "new",
}
configAutoTLS = etcdProcessClusterConfig{
clusterSize: 3,
isPeerTLS: true,
isPeerAutoTLS: true,
initialToken: "new",
}
configTLS = etcdProcessClusterConfig{
clusterSize: 3,
proxySize: 0,
clientTLS: clientTLS,
isPeerTLS: true,
initialToken: "new",
}
configClientTLS = etcdProcessClusterConfig{
clusterSize: 3,
proxySize: 0,
clientTLS: clientTLS,
initialToken: "new",
}
configClientBoth = etcdProcessClusterConfig{
clusterSize: 1,
proxySize: 0,
clientTLS: clientTLSAndNonTLS,
initialToken: "new",
}
configClientAutoTLS = etcdProcessClusterConfig{
clusterSize: 1,
proxySize: 0,
isClientAutoTLS: true,
clientTLS: clientTLS,
initialToken: "new",
}
configPeerTLS = etcdProcessClusterConfig{
clusterSize: 3,
proxySize: 0,
isPeerTLS: true,
initialToken: "new",
}
configWithProxy = etcdProcessClusterConfig{
clusterSize: 3,
proxySize: 1,
initialToken: "new",
}
configWithProxyTLS = etcdProcessClusterConfig{
clusterSize: 3,
proxySize: 1,
clientTLS: clientTLS,
isPeerTLS: true,
initialToken: "new",
}
configWithProxyPeerTLS = etcdProcessClusterConfig{
clusterSize: 3,
proxySize: 1,
isPeerTLS: true,
initialToken: "new",
}
configClientTLSCertAuth = etcdProcessClusterConfig{
clusterSize: 1,
proxySize: 0,
clientTLS: clientTLS,
initialToken: "new",
clientCertAuthEnabled: true,
}
)
func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig {
ret := cfg
ret.clusterSize = 1
return &ret
}
type etcdProcessCluster struct {
cfg *etcdProcessClusterConfig
procs []*etcdProcess
}
type etcdProcess struct {
cfg *etcdProcessConfig
proc *expect.ExpectProcess
donec chan struct{} // closed when Interact() terminates
}
type etcdProcessConfig struct {
execPath string
args []string
dataDirPath string
keepDataDir bool
name string
purl url.URL
acurl string
// additional url for tls connection when the etcd process
// serves both http and https
acurltls string
acurlHost string
initialToken string
initialCluster string
isProxy bool
}
type etcdProcessClusterConfig struct {
execPath string
dataDirPath string
keepDataDir bool
clusterSize int
baseScheme string
basePort int
proxySize int
snapCount int // default is 10000
clientTLS clientConnType
clientCertAuthEnabled bool
isPeerTLS bool
isPeerAutoTLS bool
isClientAutoTLS bool
isClientCRL bool
forceNewCluster bool
initialToken string
quotaBackendBytes int64
noStrictReconfig bool
}
// newEtcdProcessCluster launches a new cluster from etcd processes, returning
// a new etcdProcessCluster once all nodes are ready to accept client requests.
func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
etcdCfgs := cfg.etcdProcessConfigs()
epc := &etcdProcessCluster{
cfg: cfg,
procs: make([]*etcdProcess, cfg.clusterSize+cfg.proxySize),
}
// launch etcd processes
for i := range etcdCfgs {
proc, err := newEtcdProcess(etcdCfgs[i])
if err != nil {
epc.Close()
return nil, err
}
epc.procs[i] = proc
}
return epc, epc.Start()
}
func newEtcdProcess(cfg *etcdProcessConfig) (*etcdProcess, error) {
if !fileutil.Exist(cfg.execPath) {
return nil, fmt.Errorf("could not find etcd binary")
}
if !cfg.keepDataDir {
if err := os.RemoveAll(cfg.dataDirPath); err != nil {
return nil, err
}
}
child, err := spawnCmd(append([]string{cfg.execPath}, cfg.args...))
if err != nil {
return nil, err
}
return &etcdProcess{cfg: cfg, proc: child, donec: make(chan struct{})}, nil
}
func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
binPath = binDir + "/etcd"
ctlBinPath = binDir + "/etcdctl"
certPath = certDir + "/server.crt"
privateKeyPath = certDir + "/server.key.insecure"
caPath = certDir + "/ca.crt"
revokedCertPath = certDir + "/server-revoked.crt"
revokedPrivateKeyPath = certDir + "/server-revoked.key.insecure"
crlPath = certDir + "/revoke.crl"
if cfg.basePort == 0 {
cfg.basePort = etcdProcessBasePort
}
if cfg.execPath == "" {
cfg.execPath = binPath
}
if cfg.snapCount == 0 {
cfg.snapCount = etcdserver.DefaultSnapCount
}
clientScheme := "http"
if cfg.clientTLS == clientTLS {
clientScheme = "https"
}
peerScheme := cfg.baseScheme
if peerScheme == "" {
peerScheme = "http"
}
if cfg.isPeerTLS {
peerScheme += "s"
}
etcdCfgs := make([]*etcdProcessConfig, cfg.clusterSize+cfg.proxySize)
initialCluster := make([]string, cfg.clusterSize)
for i := 0; i < cfg.clusterSize; i++ {
var curls []string
var curl, curltls string
port := cfg.basePort + 2*i
curlHost := fmt.Sprintf("localhost:%d", port)
switch cfg.clientTLS {
case clientNonTLS, clientTLS:
curl = (&url.URL{Scheme: clientScheme, Host: curlHost}).String()
curls = []string{curl}
case clientTLSAndNonTLS:
curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
curls = []string{curl, curltls}
}
purl := url.URL{Scheme: peerScheme, Host: fmt.Sprintf("localhost:%d", port+1)}
name := fmt.Sprintf("testname%d", i)
dataDirPath := cfg.dataDirPath
if cfg.dataDirPath == "" {
var derr error
dataDirPath, derr = ioutil.TempDir("", name+".etcd")
if derr != nil {
panic("could not get tempdir for datadir")
}
}
initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
args := []string{
"--name", name,
"--listen-client-urls", strings.Join(curls, ","),
"--advertise-client-urls", strings.Join(curls, ","),
"--listen-peer-urls", purl.String(),
"--initial-advertise-peer-urls", purl.String(),
"--initial-cluster-token", cfg.initialToken,
"--data-dir", dataDirPath,
"--snapshot-count", fmt.Sprintf("%d", cfg.snapCount),
}
if cfg.forceNewCluster {
args = append(args, "--force-new-cluster")
}
if cfg.quotaBackendBytes > 0 {
args = append(args,
"--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes),
)
}
if cfg.noStrictReconfig {
args = append(args, "--strict-reconfig-check=false")
}
args = append(args, cfg.tlsArgs()...)
etcdCfgs[i] = &etcdProcessConfig{
execPath: cfg.execPath,
args: args,
dataDirPath: dataDirPath,
keepDataDir: cfg.keepDataDir,
name: name,
purl: purl,
acurl: curl,
acurltls: curltls,
acurlHost: curlHost,
initialToken: cfg.initialToken,
}
}
for i := 0; i < cfg.proxySize; i++ {
port := cfg.basePort + 2*cfg.clusterSize + i + 1
curlHost := fmt.Sprintf("localhost:%d", port)
curl := url.URL{Scheme: clientScheme, Host: curlHost}
name := fmt.Sprintf("testname-proxy%d", i)
dataDirPath, derr := ioutil.TempDir("", name+".etcd")
if derr != nil {
panic("could not get tempdir for datadir")
}
args := []string{
"--name", name,
"--proxy", "on",
"--listen-client-urls", curl.String(),
"--data-dir", dataDirPath,
}
args = append(args, cfg.tlsArgs()...)
etcdCfgs[cfg.clusterSize+i] = &etcdProcessConfig{
execPath: cfg.execPath,
args: args,
dataDirPath: dataDirPath,
keepDataDir: cfg.keepDataDir,
name: name,
acurl: curl.String(),
acurlHost: curlHost,
isProxy: true,
}
}
initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
for i := range etcdCfgs {
etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",")
etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...)
}
return etcdCfgs
}
func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
if cfg.clientTLS != clientNonTLS {
if cfg.isClientAutoTLS {
args = append(args, "--auto-tls=true")
} else {
tlsClientArgs := []string{
"--cert-file", certPath,
"--key-file", privateKeyPath,
"--ca-file", caPath,
}
args = append(args, tlsClientArgs...)
if cfg.clientCertAuthEnabled {
args = append(args, "--client-cert-auth")
}
}
}
if cfg.isPeerTLS {
if cfg.isPeerAutoTLS {
args = append(args, "--peer-auto-tls=true")
} else {
tlsPeerArgs := []string{
"--peer-cert-file", certPath,
"--peer-key-file", privateKeyPath,
"--peer-ca-file", caPath,
}
args = append(args, tlsPeerArgs...)
}
}
if cfg.isClientCRL {
args = append(args, "--client-crl-file", crlPath, "--client-cert-auth")
}
return args
}
func (epc *etcdProcessCluster) Start() (err error) {
readyC := make(chan error, epc.cfg.clusterSize+epc.cfg.proxySize)
for i := range epc.procs {
go func(n int) { readyC <- epc.procs[n].waitReady() }(i)
}
for range epc.procs {
if err := <-readyC; err != nil {
epc.Close()
return err
}
}
return nil
}
func (epc *etcdProcessCluster) RestartAll() error {
for i := range epc.procs {
proc, err := newEtcdProcess(epc.procs[i].cfg)
if err != nil {
epc.Close()
return err
}
epc.procs[i] = proc
}
return epc.Start()
}
func (epc *etcdProcessCluster) StopAll() (err error) {
for _, p := range epc.procs {
if p == nil {
continue
}
if curErr := p.Stop(); curErr != nil {
if err != nil {
err = fmt.Errorf("%v; %v", err, curErr)
} else {
err = curErr
}
}
}
return err
}
func (epc *etcdProcessCluster) Close() error {
err := epc.StopAll()
for _, p := range epc.procs {
// p is nil when newEtcdProcess fails in the middle
// Close still gets called to clean up test data
if p == nil {
continue
}
os.RemoveAll(p.cfg.dataDirPath)
}
return err
}
func (ep *etcdProcess) Restart() error {
newEp, err := newEtcdProcess(ep.cfg)
if err != nil {
ep.Stop()
return err
}
*ep = *newEp
if err = ep.waitReady(); err != nil {
ep.Stop()
return err
}
return nil
}
func (ep *etcdProcess) Stop() error {
if ep == nil {
return nil
}
if err := ep.proc.Stop(); err != nil {
return err
}
<-ep.donec
if ep.cfg.purl.Scheme == "unix" || ep.cfg.purl.Scheme == "unixs" {
os.Remove(ep.cfg.purl.Host + ep.cfg.purl.Path)
}
return nil
}
func (ep *etcdProcess) waitReady() error {
defer close(ep.donec)
return waitReadyExpectProc(ep.proc, ep.cfg.isProxy)
}
func waitReadyExpectProc(exproc *expect.ExpectProcess, isProxy bool) error {
readyStrs := []string{"enabled capabilities for version", "published"}
if isProxy {
readyStrs = []string{"httpproxy: endpoints found"}
}
c := 0
matchSet := func(l string) bool {
for _, s := range readyStrs {
if strings.Contains(l, s) {
c++
break
}
}
return c == len(readyStrs)
}
_, err := exproc.ExpectFunc(matchSet)
return err
}
func spawnWithExpect(args []string, expected string) error {
return spawnWithExpects(args, []string{expected}...)
}
func spawnWithExpects(args []string, xs ...string) error {
proc, err := spawnCmd(args)
if err != nil {
return err
}
// process until either stdout or stderr contains
// the expected string
var (
lines []string
lineFunc = func(txt string) bool { return true }
)
for _, txt := range xs {
for {
l, lerr := proc.ExpectFunc(lineFunc)
if lerr != nil {
proc.Close()
return fmt.Errorf("%v (expected %q, got %q)", lerr, txt, lines)
}
lines = append(lines, l)
if strings.Contains(l, txt) {
break
}
}
}
perr := proc.Close()
if len(xs) == 0 && proc.LineCount() != noOutputLineCount { // expect no output
return fmt.Errorf("unexpected output (got lines %q, line count %d)", lines, proc.LineCount())
}
return perr
}
// proxies returns only the proxy etcdProcess.
func (epc *etcdProcessCluster) proxies() []*etcdProcess {
return epc.procs[epc.cfg.clusterSize:]
}
func (epc *etcdProcessCluster) processes() []*etcdProcess {
return epc.procs[:epc.cfg.clusterSize]
}
func (epc *etcdProcessCluster) endpoints() []string {
eps := make([]string, epc.cfg.clusterSize)
for i, ep := range epc.processes() {
eps[i] = ep.cfg.acurl
}
return eps
}
func (epc *etcdProcessCluster) grpcEndpoints() []string {
eps := make([]string, epc.cfg.clusterSize)
for i, ep := range epc.processes() {
eps[i] = ep.cfg.acurlHost
}
return eps
}
func (epc *etcdProcessCluster) withStopSignal(sig os.Signal) os.Signal {
ret := epc.procs[0].proc.StopSignal
for _, p := range epc.procs {
p.proc.StopSignal = sig
}
return ret
}
func closeWithTimeout(p *expect.ExpectProcess, d time.Duration) error {
errc := make(chan error, 1)
go func() { errc <- p.Close() }()
select {
case err := <-errc:
return err
case <-time.After(d):
p.Stop()
// retry close after stopping to collect SIGQUIT data, if any
closeWithTimeout(p, time.Second)
}
return fmt.Errorf("took longer than %v to Close process %+v", d, p)
}

View File

@ -31,9 +31,9 @@ func TestGateway(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer ec.StopAll()
defer ec.Stop()
eps := strings.Join(ec.grpcEndpoints(), ",")
eps := strings.Join(ec.EndpointsV3(), ",")
p := startGateway(t, eps)
defer p.Stop()

View File

@ -13,8 +13,20 @@ import (
"github.com/coreos/etcd/pkg/testutil"
)
var binDir string
var certDir string
var (
binDir string
certDir string
binPath string
ctlBinPath string
certPath string
privateKeyPath string
caPath string
crlPath string
revokedCertPath string
revokedPrivateKeyPath string
)
func TestMain(m *testing.M) {
os.Setenv("ETCD_UNSUPPORTED_ARCH", runtime.GOARCH)
@ -24,6 +36,15 @@ func TestMain(m *testing.M) {
flag.StringVar(&certDir, "cert-dir", "../integration/fixtures", "The directory for store certificate files.")
flag.Parse()
binPath = binDir + "/etcd"
ctlBinPath = binDir + "/etcdctl"
certPath = certDir + "/server.crt"
privateKeyPath = certDir + "/server.key.insecure"
caPath = certDir + "/ca.crt"
revokedCertPath = certDir + "/server-revoked.crt"
revokedPrivateKeyPath = certDir + "/server-revoked.key.insecure"
crlPath = certDir + "/revoke.crl"
v := m.Run()
if v == 0 && testutil.CheckLeakedGoroutine() {
os.Exit(1)

91
e2e/util.go Normal file
View File

@ -0,0 +1,91 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package e2e
import (
"fmt"
"strings"
"time"
"github.com/coreos/etcd/pkg/expect"
)
func waitReadyExpectProc(exproc *expect.ExpectProcess, readyStrs []string) error {
c := 0
matchSet := func(l string) bool {
for _, s := range readyStrs {
if strings.Contains(l, s) {
c++
break
}
}
return c == len(readyStrs)
}
_, err := exproc.ExpectFunc(matchSet)
return err
}
func spawnWithExpect(args []string, expected string) error {
return spawnWithExpects(args, []string{expected}...)
}
func spawnWithExpects(args []string, xs ...string) error {
proc, err := spawnCmd(args)
if err != nil {
return err
}
// process until either stdout or stderr contains
// the expected string
var (
lines []string
lineFunc = func(txt string) bool { return true }
)
for _, txt := range xs {
for {
l, lerr := proc.ExpectFunc(lineFunc)
if lerr != nil {
proc.Close()
return fmt.Errorf("%v (expected %q, got %q)", lerr, txt, lines)
}
lines = append(lines, l)
if strings.Contains(l, txt) {
break
}
}
}
perr := proc.Close()
if len(xs) == 0 && proc.LineCount() != noOutputLineCount { // expect no output
return fmt.Errorf("unexpected output (got lines %q, line count %d)", lines, proc.LineCount())
}
return perr
}
func closeWithTimeout(p *expect.ExpectProcess, d time.Duration) error {
errc := make(chan error, 1)
go func() { errc <- p.Close() }()
select {
case err := <-errc:
return err
case <-time.After(d):
p.Stop()
// retry close after stopping to collect SIGQUIT data, if any
closeWithTimeout(p, time.Second)
}
return fmt.Errorf("took longer than %v to Close process %+v", d, p)
}
func toTLS(s string) string {
return strings.Replace(s, "http://", "https://", 1)
}

View File

@ -23,15 +23,12 @@ import (
"github.com/coreos/etcd/pkg/testutil"
)
func TestV2CurlNoTLS(t *testing.T) { testCurlPutGet(t, &configNoTLS) }
func TestV2CurlAutoTLS(t *testing.T) { testCurlPutGet(t, &configAutoTLS) }
func TestV2CurlAllTLS(t *testing.T) { testCurlPutGet(t, &configTLS) }
func TestV2CurlPeerTLS(t *testing.T) { testCurlPutGet(t, &configPeerTLS) }
func TestV2CurlClientTLS(t *testing.T) { testCurlPutGet(t, &configClientTLS) }
func TestV2CurlProxyNoTLS(t *testing.T) { testCurlPutGet(t, &configWithProxy) }
func TestV2CurlProxyTLS(t *testing.T) { testCurlPutGet(t, &configWithProxyTLS) }
func TestV2CurlProxyPeerTLS(t *testing.T) { testCurlPutGet(t, &configWithProxyPeerTLS) }
func TestV2CurlClientBoth(t *testing.T) { testCurlPutGet(t, &configClientBoth) }
func TestV2CurlNoTLS(t *testing.T) { testCurlPutGet(t, &configNoTLS) }
func TestV2CurlAutoTLS(t *testing.T) { testCurlPutGet(t, &configAutoTLS) }
func TestV2CurlAllTLS(t *testing.T) { testCurlPutGet(t, &configTLS) }
func TestV2CurlPeerTLS(t *testing.T) { testCurlPutGet(t, &configPeerTLS) }
func TestV2CurlClientTLS(t *testing.T) { testCurlPutGet(t, &configClientTLS) }
func TestV2CurlClientBoth(t *testing.T) { testCurlPutGet(t, &configClientBoth) }
func testCurlPutGet(t *testing.T, cfg *etcdProcessClusterConfig) {
defer testutil.AfterTest(t)
@ -135,14 +132,14 @@ type cURLReq struct {
func cURLPrefixArgs(clus *etcdProcessCluster, method string, req cURLReq) []string {
var (
cmdArgs = []string{"curl"}
acurl = clus.procs[rand.Intn(clus.cfg.clusterSize)].cfg.acurl
acurl = clus.procs[rand.Intn(clus.cfg.clusterSize)].Config().acurl
)
if req.isTLS {
if clus.cfg.clientTLS != clientTLSAndNonTLS {
panic("should not use cURLPrefixArgsUseTLS when serving only TLS or non-TLS")
}
cmdArgs = append(cmdArgs, "--cacert", caPath, "--cert", certPath, "--key", privateKeyPath)
acurl = clus.procs[rand.Intn(clus.cfg.clusterSize)].cfg.acurltls
acurl = toTLS(clus.procs[rand.Intn(clus.cfg.clusterSize)].Config().acurl)
} else if clus.cfg.clientTLS == clientTLS {
cmdArgs = append(cmdArgs, "--cacert", caPath, "--cert", certPath, "--key", privateKeyPath)
}