Mirror of https://github.com/etcd-io/etcd.git
Merge pull request #14752 from serathius/linearizability-raftBeforeLeaderSend

Linearizability raft before leader send

Commit: 2742bdc32a
@@ -154,7 +154,7 @@ func startEtcd(t *testing.T, ep e2e.EtcdProcess, execPath string) {
}

func downgradeEnable(t *testing.T, epc *e2e.EtcdProcessCluster, ver *semver.Version) {
-c, err := e2e.NewEtcdctl(epc.Cfg, epc.EndpointsV3())
+c, err := e2e.NewEtcdctl(epc.Cfg.Client, epc.EndpointsV3())
assert.NoError(t, err)
testutils.ExecuteWithTimeout(t, 20*time.Second, func() {
err := c.DowngradeEnable(context.TODO(), ver.String())
@@ -115,7 +115,7 @@ func TestPeriodicCheckDetectsCorruption(t *testing.T) {
}
})

-cc, err := e2e.NewEtcdctl(epc.Cfg, epc.EndpointsV3())
+cc, err := e2e.NewEtcdctl(epc.Cfg.Client, epc.EndpointsV3())
assert.NoError(t, err)

for i := 0; i < 10; i++ {
@@ -163,7 +163,7 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) {
}
})

-cc, err := e2e.NewEtcdctl(epc.Cfg, epc.EndpointsV3())
+cc, err := e2e.NewEtcdctl(epc.Cfg.Client, epc.EndpointsV3())
assert.NoError(t, err)

for i := 0; i < 10; i++ {
@@ -56,8 +56,8 @@ func TestCtlV3AuthCertCNWithWithConcurrentOperation(t *testing.T) {
t.Log("Create etcd cluster")
epc, err := e2e.NewEtcdProcessCluster(ctx, t,
e2e.WithClusterSize(1),
-e2e.WithClientTLS(e2e.ClientTLS),
-e2e.WithClientCertAuthEnabled(true),
+e2e.WithClientConnType(e2e.ClientTLS),
+e2e.WithClientCertAuthority(true),
)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
@@ -85,9 +85,9 @@ func TestAuthority(t *testing.T) {
cfg := e2e.NewConfigNoTLS()
cfg.ClusterSize = clusterSize
if tc.useTLS {
-cfg.ClientTLS = e2e.ClientTLS
+cfg.Client.ConnectionType = e2e.ClientTLS
}
-cfg.IsClientAutoTLS = tc.useInsecureTLS
+cfg.Client.AutoTLS = tc.useInsecureTLS
// Enable debug mode to get logs with http2 headers (including authority)
cfg.EnvVars = map[string]string{"GODEBUG": "http2debug=2"}
@@ -98,7 +98,7 @@ func TestAuthority(t *testing.T) {
defer epc.Close()
endpoints := templateEndpoints(t, tc.clientURLPattern, epc)

-client, err := e2e.NewEtcdctl(cfg, endpoints)
+client, err := e2e.NewEtcdctl(cfg.Client, endpoints)
assert.NoError(t, err)
err = client.Put(ctx, "foo", "bar", config.PutOptions{})
if err != nil {
@@ -43,9 +43,9 @@ func TestCtlV3DelTimeout(t *testing.T) { testCtl(t, delTest, withDialTimeout(0))
func TestCtlV3GetRevokedCRL(t *testing.T) {
cfg := e2e.NewConfig(
e2e.WithClusterSize(1),
-e2e.WithClientTLS(e2e.ClientTLS),
-e2e.WithIsClientCRL(true),
-e2e.WithClientCertAuthEnabled(true),
+e2e.WithClientConnType(e2e.ClientTLS),
+e2e.WithClientRevokeCerts(true),
+e2e.WithClientCertAuthority(true),
)
testCtl(t, testGetRevokedCRL, withCfg(*cfg))
}
@@ -56,7 +56,7 @@ func testGetRevokedCRL(cx ctlCtx) {
require.ErrorContains(cx.t, err, "context deadline exceeded")

// test accept
-cx.epc.Cfg.IsClientCRL = false
+cx.epc.Cfg.Client.RevokeCerts = false
if err := ctlV3Put(cx, "k", "v", ""); err != nil {
cx.t.Fatal(err)
}
@@ -61,7 +61,7 @@ func testCtlV3MoveLeader(t *testing.T, cfg e2e.EtcdProcessClusterConfig, envVars
}()

var tcfg *tls.Config
-if cfg.ClientTLS == e2e.ClientTLS {
+if cfg.Client.ConnectionType == e2e.ClientTLS {
tinfo := transport.TLSInfo{
CertFile: e2e.CertPath,
KeyFile: e2e.PrivateKeyPath,
@@ -295,11 +295,11 @@ func (cx *ctlCtx) prefixArgs(eps []string) []string {
fmap := make(map[string]string)
fmap["endpoints"] = strings.Join(eps, ",")
fmap["dial-timeout"] = cx.dialTimeout.String()
-if cx.epc.Cfg.ClientTLS == e2e.ClientTLS {
-if cx.epc.Cfg.IsClientAutoTLS {
+if cx.epc.Cfg.Client.ConnectionType == e2e.ClientTLS {
+if cx.epc.Cfg.Client.AutoTLS {
fmap["insecure-transport"] = "false"
fmap["insecure-skip-tls-verify"] = "true"
-} else if cx.epc.Cfg.IsClientCRL {
+} else if cx.epc.Cfg.Client.RevokeCerts {
fmap["cacert"] = e2e.CaPath
fmap["cert"] = e2e.RevokedCertPath
fmap["key"] = e2e.RevokedPrivateKeyPath
@@ -51,8 +51,8 @@ func testClusterUsingV3Discovery(t *testing.T, discoveryClusterSize, targetClust
ds, err := e2e.NewEtcdProcessCluster(context.TODO(), t,
e2e.WithBasePort(2000),
e2e.WithClusterSize(discoveryClusterSize),
-e2e.WithClientTLS(clientTlsType),
-e2e.WithIsClientAutoTLS(isClientAutoTls),
+e2e.WithClientConnType(clientTlsType),
+e2e.WithClientAutoTLS(isClientAutoTls),
)
if err != nil {
t.Fatalf("could not start discovery etcd cluster (%v)", err)
@@ -57,7 +57,7 @@ func TestGrpcProxyAutoSync(t *testing.T) {
assert.NoError(t, proxyProc.Stop())
}()

-proxyCtl, err := e2e.NewEtcdctl(e2e.DefaultConfig(), []string{proxyClientURL})
+proxyCtl, err := e2e.NewEtcdctl(e2e.ClientConfig{}, []string{proxyClientURL})
require.NoError(t, err)
err = proxyCtl.Put(ctx, "k1", "v1", config.PutOptions{})
require.NoError(t, err)
@@ -114,13 +114,13 @@ func TestV2DeprecationSnapshotMatches(t *testing.T) {
snapshotCount := 10
epc := runEtcdAndCreateSnapshot(t, e2e.LastVersion, lastReleaseData, snapshotCount)
oldMemberDataDir := epc.Procs[0].Config().DataDirPath
-cc1, err := e2e.NewEtcdctl(epc.Cfg, epc.EndpointsV3())
+cc1, err := e2e.NewEtcdctl(epc.Cfg.Client, epc.EndpointsV3())
assert.NoError(t, err)
members1 := addAndRemoveKeysAndMembers(ctx, t, cc1, snapshotCount)
assert.NoError(t, epc.Close())
epc = runEtcdAndCreateSnapshot(t, e2e.CurrentVersion, currentReleaseData, snapshotCount)
newMemberDataDir := epc.Procs[0].Config().DataDirPath
-cc2, err := e2e.NewEtcdctl(epc.Cfg, epc.EndpointsV3())
+cc2, err := e2e.NewEtcdctl(epc.Cfg.Client, epc.EndpointsV3())
assert.NoError(t, err)
members2 := addAndRemoveKeysAndMembers(ctx, t, cc2, snapshotCount)
assert.NoError(t, epc.Close())
@@ -151,7 +151,7 @@ func TestV2DeprecationSnapshotRecover(t *testing.T) {
}
epc := runEtcdAndCreateSnapshot(t, e2e.LastVersion, dataDir, 10)

-cc, err := e2e.NewEtcdctl(epc.Cfg, epc.EndpointsV3())
+cc, err := e2e.NewEtcdctl(epc.Cfg.Client, epc.EndpointsV3())
assert.NoError(t, err)

lastReleaseGetResponse, err := cc.Get(ctx, "", config.GetOptions{Prefix: true})
@@ -168,7 +168,7 @@ func TestV2DeprecationSnapshotRecover(t *testing.T) {
epc, err = e2e.NewEtcdProcessCluster(context.TODO(), t, e2e.WithConfig(cfg))
assert.NoError(t, err)

-cc, err = e2e.NewEtcdctl(epc.Cfg, epc.EndpointsV3())
+cc, err = e2e.NewEtcdctl(epc.Cfg.Client, epc.EndpointsV3())
assert.NoError(t, err)
currentReleaseGetResponse, err := cc.Get(ctx, "", config.GetOptions{Prefix: true})
assert.NoError(t, err)
@@ -113,7 +113,7 @@ func testV3CurlPutGet(cx ctlCtx) {
if err := e2e.CURLPost(cx.epc, e2e.CURLReq{Endpoint: path.Join(p, "/kv/range"), Value: string(rangeData), Expected: expectGet}); err != nil {
cx.t.Fatalf("failed testV3CurlPutGet get with curl using prefix (%s) (%v)", p, err)
}
-if cx.cfg.ClientTLS == e2e.ClientTLSAndNonTLS {
+if cx.cfg.Client.ConnectionType == e2e.ClientTLSAndNonTLS {
if err := e2e.CURLPost(cx.epc, e2e.CURLReq{Endpoint: path.Join(p, "/kv/range"), Value: string(rangeData), Expected: expectGet, IsTLS: true}); err != nil {
cx.t.Fatalf("failed testV3CurlPutGet get with curl using prefix (%s) (%v)", p, err)
}
@@ -44,6 +44,13 @@ const (
ClientTLSAndNonTLS
)

+type ClientConfig struct {
+ConnectionType ClientConnType
+CertAuthority bool
+AutoTLS bool
+RevokeCerts bool
+}
+
// allow alphanumerics, underscores and dashes
var testNameCleanRegex = regexp.MustCompile(`[^a-zA-Z0-9 \-_]+`)
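Aside: the new ClientConfig groups the client-connection settings that used to be separate fields on EtcdProcessClusterConfig. A rough sketch of what a caller now passes to NewEtcdctl (the field values and the surrounding t/epc are illustrative, not taken from this diff):

// Illustrative sketch: build a ClientConfig by hand and hand it to NewEtcdctl,
// the same shape the tests in this change obtain from epc.Cfg.Client.
clientCfg := e2e.ClientConfig{
    ConnectionType: e2e.ClientTLS,
    CertAuthority:  true,
}
ctl, err := e2e.NewEtcdctl(clientCfg, epc.EndpointsV3())
if err != nil {
    t.Fatal(err)
}
_ = ctl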
@@ -60,20 +67,20 @@ func NewConfigAutoTLS() *EtcdProcessClusterConfig {

func NewConfigTLS() *EtcdProcessClusterConfig {
return NewConfig(
-WithClientTLS(ClientTLS),
+WithClientConnType(ClientTLS),
WithIsPeerTLS(true),
)
}

func NewConfigClientTLS() *EtcdProcessClusterConfig {
-return NewConfig(WithClientTLS(ClientTLS))
+return NewConfig(WithClientConnType(ClientTLS))
}

func NewConfigClientAutoTLS() *EtcdProcessClusterConfig {
return NewConfig(
WithClusterSize(1),
-WithIsClientAutoTLS(true),
-WithClientTLS(ClientTLS),
+WithClientAutoTLS(true),
+WithClientConnType(ClientTLS),
)
}
@@ -86,16 +93,16 @@ func NewConfigPeerTLS() *EtcdProcessClusterConfig {
func NewConfigClientTLSCertAuth() *EtcdProcessClusterConfig {
return NewConfig(
WithClusterSize(1),
-WithClientTLS(ClientTLS),
-WithClientCertAuthEnabled(true),
+WithClientConnType(ClientTLS),
+WithClientCertAuthority(true),
)
}

func NewConfigClientTLSCertAuthWithNoCN() *EtcdProcessClusterConfig {
return NewConfig(
WithClusterSize(1),
-WithClientTLS(ClientTLS),
-WithClientCertAuthEnabled(true),
+WithClientConnType(ClientTLS),
+WithClientCertAuthority(true),
WithNoCN(true),
)
}
@@ -142,13 +149,10 @@ type EtcdProcessClusterConfig struct {

SnapshotCount int // default is 10000

-ClientTLS ClientConnType
-ClientCertAuthEnabled bool
-IsPeerTLS bool
-IsPeerAutoTLS bool
-IsClientAutoTLS bool
-IsClientCRL bool
-NoCN bool
+Client ClientConfig
+IsPeerTLS bool
+IsPeerAutoTLS bool
+NoCN bool

CipherSuites []string
@@ -226,12 +230,12 @@ func WithBasePort(port int) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.BasePort = port }
}

-func WithClientTLS(clientTLS ClientConnType) EPClusterOption {
-return func(c *EtcdProcessClusterConfig) { c.ClientTLS = clientTLS }
+func WithClientConnType(clientConnType ClientConnType) EPClusterOption {
+return func(c *EtcdProcessClusterConfig) { c.Client.ConnectionType = clientConnType }
}

-func WithClientCertAuthEnabled(enabled bool) EPClusterOption {
-return func(c *EtcdProcessClusterConfig) { c.ClientCertAuthEnabled = enabled }
+func WithClientCertAuthority(enabled bool) EPClusterOption {
+return func(c *EtcdProcessClusterConfig) { c.Client.CertAuthority = enabled }
}

func WithIsPeerTLS(isPeerTLS bool) EPClusterOption {
@@ -242,12 +246,12 @@ func WithIsPeerAutoTLS(isPeerAutoTLS bool) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.IsPeerAutoTLS = isPeerAutoTLS }
}

-func WithIsClientAutoTLS(isClientAutoTLS bool) EPClusterOption {
-return func(c *EtcdProcessClusterConfig) { c.IsClientAutoTLS = isClientAutoTLS }
+func WithClientAutoTLS(isClientAutoTLS bool) EPClusterOption {
+return func(c *EtcdProcessClusterConfig) { c.Client.AutoTLS = isClientAutoTLS }
}

-func WithIsClientCRL(isClientCRL bool) EPClusterOption {
-return func(c *EtcdProcessClusterConfig) { c.IsClientCRL = isClientCRL }
+func WithClientRevokeCerts(isClientCRL bool) EPClusterOption {
+return func(c *EtcdProcessClusterConfig) { c.Client.RevokeCerts = isClientCRL }
}

func WithNoCN(noCN bool) EPClusterOption {
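Aside: with the options renamed, a test that previously combined WithClientTLS and WithIsClientAutoTLS would now read roughly as below; a sketch that assumes the usual ctx/t test scaffolding seen elsewhere in this diff.

// Sketch: single-member cluster serving clients over auto TLS, using the renamed options.
epc, err := e2e.NewEtcdProcessCluster(ctx, t,
    e2e.WithClusterSize(1),
    e2e.WithClientConnType(e2e.ClientTLS),
    e2e.WithClientAutoTLS(true),
)
if err != nil {
    t.Fatalf("could not start etcd process cluster (%v)", err)
}
defer epc.Close()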
@@ -374,7 +378,7 @@ func StartEtcdProcessCluster(ctx context.Context, epc *EtcdProcessCluster, cfg *
}

func (cfg *EtcdProcessClusterConfig) ClientScheme() string {
-if cfg.ClientTLS == ClientTLS {
+if cfg.Client.ConnectionType == ClientTLS {
return "https"
}
return "http"
@@ -426,7 +430,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
port := cfg.BasePort + 5*i
curlHost := fmt.Sprintf("localhost:%d", port)

-switch cfg.ClientTLS {
+switch cfg.Client.ConnectionType {
case ClientNonTLS, ClientTLS:
curl = (&url.URL{Scheme: cfg.ClientScheme(), Host: curlHost}).String()
curls = []string{curl}
@@ -561,6 +565,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
Args: args,
EnvVars: envVars,
TlsArgs: cfg.TlsArgs(),
+Client: cfg.Client,
DataDirPath: dataDirPath,
KeepDataDir: cfg.KeepDataDir,
Name: name,
@@ -573,8 +578,8 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
}

func (cfg *EtcdProcessClusterConfig) TlsArgs() (args []string) {
-if cfg.ClientTLS != ClientNonTLS {
-if cfg.IsClientAutoTLS {
+if cfg.Client.ConnectionType != ClientNonTLS {
+if cfg.Client.AutoTLS {
args = append(args, "--auto-tls")
} else {
tlsClientArgs := []string{
@@ -584,7 +589,7 @@ func (cfg *EtcdProcessClusterConfig) TlsArgs() (args []string) {
}
args = append(args, tlsClientArgs...)

-if cfg.ClientCertAuthEnabled {
+if cfg.Client.CertAuthority {
args = append(args, "--client-cert-auth")
}
}
@@ -603,7 +608,7 @@ func (cfg *EtcdProcessClusterConfig) TlsArgs() (args []string) {
}
}

-if cfg.IsClientCRL {
+if cfg.Client.RevokeCerts {
args = append(args, "--client-crl-file", CrlPath, "--client-cert-auth")
}
@@ -784,7 +789,7 @@ func (epc *EtcdProcessCluster) Stop() (err error) {
}

func (epc *EtcdProcessCluster) Client(opts ...config.ClientOption) *EtcdctlV3 {
-etcdctl, err := NewEtcdctl(epc.Cfg, epc.EndpointsV3(), opts...)
+etcdctl, err := NewEtcdctl(epc.Cfg.Client, epc.EndpointsV3(), opts...)
if err != nil {
panic(err)
}
@@ -817,3 +822,69 @@ func findMemberIDByEndpoint(members []*etcdserverpb.Member, endpoint string) (ui

return 0, fmt.Errorf("member not found")
}
+
+// WaitLeader returns index of the member in c.Members() that is leader
+// or fails the test (if not established in 30s).
+func (epc *EtcdProcessCluster) WaitLeader(t testing.TB) int {
+ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+defer cancel()
+return epc.WaitMembersForLeader(ctx, t, epc.Procs)
+}
+
+// WaitMembersForLeader waits until given members agree on the same leader,
+// and returns its 'index' in the 'membs' list
+func (epc *EtcdProcessCluster) WaitMembersForLeader(ctx context.Context, t testing.TB, membs []EtcdProcess) int {
+cc := epc.Client()
+
+// ensure leader is up via linearizable get
+for {
+select {
+case <-ctx.Done():
+t.Fatal("WaitMembersForLeader timeout")
+default:
+}
+_, err := cc.Get(ctx, "0", config.GetOptions{Timeout: 10*config.TickDuration + time.Second})
+if err == nil || strings.Contains(err.Error(), "Key not found") {
+break
+}
+}
+
+leaders := make(map[uint64]struct{})
+members := make(map[uint64]int)
+for {
+select {
+case <-ctx.Done():
+t.Fatal("WaitMembersForLeader timeout")
+default:
+}
+for i := range membs {
+resp, err := membs[i].Client().Status(ctx)
+if err != nil {
+if strings.Contains(err.Error(), "connection refused") {
+// if member[i] has stopped
+continue
+} else {
+t.Fatal(err)
+}
+}
+members[resp[0].Header.MemberId] = i
+leaders[resp[0].Leader] = struct{}{}
+}
+// members agree on the same leader
+if len(leaders) == 1 {
+break
+}
+leaders = make(map[uint64]struct{})
+members = make(map[uint64]int)
+time.Sleep(10 * config.TickDuration)
+}
+for l := range leaders {
+if index, ok := members[l]; ok {
+t.Logf("members agree on a leader, members:%v , leader:%v", members, l)
+return index
+}
+t.Fatalf("members agree on a leader which is not one of members, members:%v , leader:%v", members, l)
+}
+t.Fatal("impossible path of execution")
+return -1
+}
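Aside: these helpers are what lets a failpoint be aimed at the current leader further down in this change. A minimal usage sketch (t and a running epc are assumed; Config().Name comes from the server process config shown earlier in the diff):

// Sketch: resolve the leader once the members agree, then act on that process.
leaderIdx := epc.WaitLeader(t)
leader := epc.Procs[leaderIdx]
t.Logf("leader is %s", leader.Config().Name)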
@@ -29,6 +29,7 @@ import (
"go.uber.org/zap"

"go.etcd.io/etcd/pkg/v3/expect"
+"go.etcd.io/etcd/tests/v3/framework/config"
)

type proxyEtcdProcess struct {
@@ -99,6 +100,14 @@ func (p *proxyEtcdProcess) Close() error {
return err
}

+func (p *proxyEtcdProcess) Client(opts ...config.ClientOption) *EtcdctlV3 {
+etcdctl, err := NewEtcdctl(p.etcdProc.Config().Client, p.etcdProc.EndpointsV3(), opts...)
+if err != nil {
+panic(err)
+}
+return etcdctl
+}
+
func (p *proxyEtcdProcess) Logs() LogsExpect {
return p.etcdProc.Logs()
}
@@ -58,12 +58,12 @@ func CURLPrefixArgs(cfg *EtcdProcessClusterConfig, member EtcdProcess, method st
)
if req.MetricsURLScheme != "https" {
if req.IsTLS {
-if cfg.ClientTLS != ClientTLSAndNonTLS {
+if cfg.Client.ConnectionType != ClientTLSAndNonTLS {
panic("should not use cURLPrefixArgsUseTLS when serving only TLS or non-TLS")
}
cmdArgs = append(cmdArgs, "--cacert", CaPath, "--cert", CertPath, "--key", PrivateKeyPath)
acurl = ToTLS(member.Config().Acurl)
-} else if cfg.ClientTLS == ClientTLS {
+} else if cfg.Client.ConnectionType == ClientTLS {
if !cfg.NoCN {
cmdArgs = append(cmdArgs, "--cacert", CaPath, "--cert", CertPath, "--key", PrivateKeyPath)
} else {
@@ -17,14 +17,11 @@ package e2e
import (
"context"
"os"
"strings"
"testing"
"time"

"go.etcd.io/etcd/client/pkg/v3/testutil"
"go.etcd.io/etcd/tests/v3/framework/config"
intf "go.etcd.io/etcd/tests/v3/framework/interfaces"
"go.etcd.io/etcd/tests/v3/framework/testutils"
)

type e2eRunner struct{}
@@ -63,13 +60,13 @@ func (e e2eRunner) NewCluster(ctx context.Context, t testing.TB, opts ...config.

switch cfg.ClientTLS {
case config.NoTLS:
-e2eConfig.ClientTLS = ClientNonTLS
+e2eConfig.Client.ConnectionType = ClientNonTLS
case config.AutoTLS:
-e2eConfig.IsClientAutoTLS = true
-e2eConfig.ClientTLS = ClientTLS
+e2eConfig.Client.AutoTLS = true
+e2eConfig.Client.ConnectionType = ClientTLS
case config.ManualTLS:
-e2eConfig.IsClientAutoTLS = false
-e2eConfig.ClientTLS = ClientTLS
+e2eConfig.Client.AutoTLS = false
+e2eConfig.Client.ConnectionType = ClientTLS
default:
t.Fatalf("ClientTLS config %q not supported", cfg.ClientTLS)
}
@@ -99,7 +96,7 @@ type e2eCluster struct {
}

func (c *e2eCluster) Client(opts ...config.ClientOption) (intf.Client, error) {
-etcdctl, err := NewEtcdctl(c.Cfg, c.EndpointsV3(), opts...)
+etcdctl, err := NewEtcdctl(c.Cfg.Client, c.EndpointsV3(), opts...)
return e2eClient{etcdctl}, err
}
@@ -114,72 +111,6 @@ func (c *e2eCluster) Members() (ms []intf.Member) {
return ms
}

-// WaitLeader returns index of the member in c.Members() that is leader
-// or fails the test (if not established in 30s).
-func (c *e2eCluster) WaitLeader(t testing.TB) int {
-ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
-defer cancel()
-return c.WaitMembersForLeader(ctx, t, c.Members())
-}
-
-// WaitMembersForLeader waits until given members agree on the same leader,
-// and returns its 'index' in the 'membs' list
-func (c *e2eCluster) WaitMembersForLeader(ctx context.Context, t testing.TB, membs []intf.Member) int {
-cc := testutils.MustClient(c.Client())
-
-// ensure leader is up via linearizable get
-for {
-select {
-case <-ctx.Done():
-t.Fatal("WaitMembersForLeader timeout")
-default:
-}
-_, err := cc.Get(ctx, "0", config.GetOptions{Timeout: 10*config.TickDuration + time.Second})
-if err == nil || strings.Contains(err.Error(), "Key not found") {
-break
-}
-}
-
-leaders := make(map[uint64]struct{})
-members := make(map[uint64]int)
-for {
-select {
-case <-ctx.Done():
-t.Fatal("WaitMembersForLeader timeout")
-default:
-}
-for i := range membs {
-resp, err := membs[i].Client().Status(ctx)
-if err != nil {
-if strings.Contains(err.Error(), "connection refused") {
-// if member[i] has stopped
-continue
-} else {
-t.Fatal(err)
-}
-}
-members[resp[0].Header.MemberId] = i
-leaders[resp[0].Leader] = struct{}{}
-}
-// members agree on the same leader
-if len(leaders) == 1 {
-break
-}
-leaders = make(map[uint64]struct{})
-members = make(map[uint64]int)
-time.Sleep(10 * config.TickDuration)
-}
-for l := range leaders {
-if index, ok := members[l]; ok {
-t.Logf("members agree on a leader, members:%v , leader:%v", members, l)
-return index
-}
-t.Fatalf("members agree on a leader which is not one of members, members:%v , leader:%v", members, l)
-}
-t.Fatal("impossible path of execution")
-return -1
-}
-
type e2eClient struct {
*EtcdctlV3
}
@@ -190,7 +121,7 @@ type e2eMember struct {
}

func (m e2eMember) Client() intf.Client {
-etcdctl, err := NewEtcdctl(m.Cfg, m.EndpointsV3())
+etcdctl, err := NewEtcdctl(m.Cfg.Client, m.EndpointsV3())
if err != nil {
panic(err)
}
@@ -28,6 +28,7 @@ import (

"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/pkg/v3/expect"
+"go.etcd.io/etcd/tests/v3/framework/config"
)

var (
@@ -39,6 +40,7 @@ type EtcdProcess interface {
EndpointsV2() []string
EndpointsV3() []string
EndpointsMetrics() []string
+Client(opts ...config.ClientOption) *EtcdctlV3

Wait() error
Start(ctx context.Context) error
@@ -69,6 +71,7 @@ type EtcdServerProcessConfig struct {
TlsArgs []string
EnvVars map[string]string

+Client ClientConfig
DataDirPath string
KeepDataDir bool
@@ -100,6 +103,14 @@ func (ep *EtcdServerProcess) EndpointsV2() []string { return []string{ep.cf
func (ep *EtcdServerProcess) EndpointsV3() []string { return ep.EndpointsV2() }
func (ep *EtcdServerProcess) EndpointsMetrics() []string { return []string{ep.cfg.Murl} }

+func (epc *EtcdServerProcess) Client(opts ...config.ClientOption) *EtcdctlV3 {
+etcdctl, err := NewEtcdctl(epc.Config().Client, epc.EndpointsV3(), opts...)
+if err != nil {
+panic(err)
+}
+return etcdctl
+}
+
func (ep *EtcdServerProcess) Start(ctx context.Context) error {
ep.donec = make(chan struct{})
if ep.proc != nil {
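Aside: each server process now exposes its own client built from the per-member ClientConfig, which WaitMembersForLeader relies on. A short sketch of querying one member directly (ctx, t and epc are assumed from the surrounding test; the Status response fields are the ones the diff itself reads):

// Sketch: ask a single member for its view of the cluster.
resp, err := epc.Procs[0].Client().Status(ctx)
if err != nil {
    t.Fatal(err)
}
t.Logf("member %x sees leader %x", resp[0].Header.MemberId, resp[0].Leader)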
@@ -32,12 +32,12 @@ import (
)

type EtcdctlV3 struct {
-cfg *EtcdProcessClusterConfig
+cfg ClientConfig
endpoints []string
authConfig clientv3.AuthConfig
}

-func NewEtcdctl(cfg *EtcdProcessClusterConfig, endpoints []string, opts ...config.ClientOption) (*EtcdctlV3, error) {
+func NewEtcdctl(cfg ClientConfig, endpoints []string, opts ...config.ClientOption) (*EtcdctlV3, error) {
ctl := &EtcdctlV3{
cfg: cfg,
endpoints: endpoints,
@@ -308,11 +308,11 @@ func (ctl *EtcdctlV3) cmdArgs(args ...string) []string {

func (ctl *EtcdctlV3) flags() map[string]string {
fmap := make(map[string]string)
-if ctl.cfg.ClientTLS == ClientTLS {
-if ctl.cfg.IsClientAutoTLS {
+if ctl.cfg.ConnectionType == ClientTLS {
+if ctl.cfg.AutoTLS {
fmap["insecure-transport"] = "false"
fmap["insecure-skip-tls-verify"] = "true"
-} else if ctl.cfg.IsClientCRL {
+} else if ctl.cfg.RevokeCerts {
fmap["cacert"] = CaPath
fmap["cert"] = RevokedCertPath
fmap["key"] = RevokedPrivateKeyPath
@@ -22,6 +22,7 @@ import (
"net/http"
"net/url"
"strings"
+"testing"
"time"

"go.uber.org/zap"
@@ -32,24 +33,25 @@ import (

var (
KillFailpoint Failpoint = killFailpoint{}
-DefragBeforeCopyPanic Failpoint = goFailpoint{"backend/defragBeforeCopy", "panic", triggerDefrag}
-DefragBeforeRenamePanic Failpoint = goFailpoint{"backend/defragBeforeRename", "panic", triggerDefrag}
-BeforeCommitPanic Failpoint = goFailpoint{"backend/beforeCommit", "panic", nil}
-AfterCommitPanic Failpoint = goFailpoint{"backend/afterCommit", "panic", nil}
-RaftBeforeSavePanic Failpoint = goFailpoint{"etcdserver/raftBeforeSave", "panic", nil}
-RaftAfterSavePanic Failpoint = goFailpoint{"etcdserver/raftAfterSave", "panic", nil}
-BackendBeforePreCommitHookPanic Failpoint = goFailpoint{"backend/commitBeforePreCommitHook", "panic", nil}
-BackendAfterPreCommitHookPanic Failpoint = goFailpoint{"backend/commitAfterPreCommitHook", "panic", nil}
-BackendBeforeStartDBTxnPanic Failpoint = goFailpoint{"backend/beforeStartDBTxn", "panic", nil}
-BackendAfterStartDBTxnPanic Failpoint = goFailpoint{"backend/afterStartDBTxn", "panic", nil}
-BackendBeforeWritebackBufPanic Failpoint = goFailpoint{"backend/beforeWritebackBuf", "panic", nil}
-BackendAfterWritebackBufPanic Failpoint = goFailpoint{"backend/afterWritebackBuf", "panic", nil}
-CompactBeforeCommitScheduledCompactPanic Failpoint = goFailpoint{"mvcc/compactBeforeCommitScheduledCompact", "panic", triggerCompact}
-CompactAfterCommitScheduledCompactPanic Failpoint = goFailpoint{"mvcc/compactAfterCommitScheduledCompact", "panic", triggerCompact}
-CompactBeforeSetFinishedCompactPanic Failpoint = goFailpoint{"mvcc/compactBeforeSetFinishedCompact", "panic", triggerCompact}
-CompactAfterSetFinishedCompactPanic Failpoint = goFailpoint{"mvcc/compactAfterSetFinishedCompact", "panic", triggerCompact}
-CompactBeforeCommitBatchPanic Failpoint = goFailpoint{"mvcc/compactBeforeCommitBatch", "panic", triggerCompact}
-CompactAfterCommitBatchPanic Failpoint = goFailpoint{"mvcc/compactAfterCommitBatch", "panic", triggerCompact}
+DefragBeforeCopyPanic Failpoint = goFailpoint{"backend/defragBeforeCopy", "panic", triggerDefrag, AnyMember}
+DefragBeforeRenamePanic Failpoint = goFailpoint{"backend/defragBeforeRename", "panic", triggerDefrag, AnyMember}
+BeforeCommitPanic Failpoint = goFailpoint{"backend/beforeCommit", "panic", nil, AnyMember}
+AfterCommitPanic Failpoint = goFailpoint{"backend/afterCommit", "panic", nil, AnyMember}
+RaftBeforeSavePanic Failpoint = goFailpoint{"etcdserver/raftBeforeSave", "panic", nil, AnyMember}
+RaftAfterSavePanic Failpoint = goFailpoint{"etcdserver/raftAfterSave", "panic", nil, AnyMember}
+BackendBeforePreCommitHookPanic Failpoint = goFailpoint{"backend/commitBeforePreCommitHook", "panic", nil, AnyMember}
+BackendAfterPreCommitHookPanic Failpoint = goFailpoint{"backend/commitAfterPreCommitHook", "panic", nil, AnyMember}
+BackendBeforeStartDBTxnPanic Failpoint = goFailpoint{"backend/beforeStartDBTxn", "panic", nil, AnyMember}
+BackendAfterStartDBTxnPanic Failpoint = goFailpoint{"backend/afterStartDBTxn", "panic", nil, AnyMember}
+BackendBeforeWritebackBufPanic Failpoint = goFailpoint{"backend/beforeWritebackBuf", "panic", nil, AnyMember}
+BackendAfterWritebackBufPanic Failpoint = goFailpoint{"backend/afterWritebackBuf", "panic", nil, AnyMember}
+CompactBeforeCommitScheduledCompactPanic Failpoint = goFailpoint{"mvcc/compactBeforeCommitScheduledCompact", "panic", triggerCompact, AnyMember}
+CompactAfterCommitScheduledCompactPanic Failpoint = goFailpoint{"mvcc/compactAfterCommitScheduledCompact", "panic", triggerCompact, AnyMember}
+CompactBeforeSetFinishedCompactPanic Failpoint = goFailpoint{"mvcc/compactBeforeSetFinishedCompact", "panic", triggerCompact, AnyMember}
+CompactAfterSetFinishedCompactPanic Failpoint = goFailpoint{"mvcc/compactAfterSetFinishedCompact", "panic", triggerCompact, AnyMember}
+CompactBeforeCommitBatchPanic Failpoint = goFailpoint{"mvcc/compactBeforeCommitBatch", "panic", triggerCompact, AnyMember}
+CompactAfterCommitBatchPanic Failpoint = goFailpoint{"mvcc/compactAfterCommitBatch", "panic", triggerCompact, AnyMember}
+RaftBeforeLeaderSendPanic Failpoint = goFailpoint{"etcdserver/raftBeforeLeaderSend", "panic", nil, Leader}
RandomFailpoint Failpoint = randomFailpoint{[]Failpoint{
KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic,
RaftAfterSavePanic, DefragBeforeCopyPanic, DefragBeforeRenamePanic,
@@ -59,25 +61,25 @@ var (
CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic,
CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic,
CompactBeforeCommitBatchPanic, CompactAfterCommitBatchPanic,
+RaftBeforeLeaderSendPanic,
}}
// TODO: Figure out how to reliably trigger below failpoints and add them to RandomFailpoint
-raftBeforeLeaderSendPanic Failpoint = goFailpoint{"etcdserver/raftBeforeLeaderSend", "panic", nil}
-raftBeforeApplySnapPanic Failpoint = goFailpoint{"etcdserver/raftBeforeApplySnap", "panic", nil}
-raftAfterApplySnapPanic Failpoint = goFailpoint{"etcdserver/raftAfterApplySnap", "panic", nil}
-raftAfterWALReleasePanic Failpoint = goFailpoint{"etcdserver/raftAfterWALRelease", "panic", nil}
-raftBeforeFollowerSendPanic Failpoint = goFailpoint{"etcdserver/raftBeforeFollowerSend", "panic", nil}
-raftBeforeSaveSnapPanic Failpoint = goFailpoint{"etcdserver/raftBeforeSaveSnap", "panic", nil}
-raftAfterSaveSnapPanic Failpoint = goFailpoint{"etcdserver/raftAfterSaveSnap", "panic", nil}
+raftBeforeApplySnapPanic Failpoint = goFailpoint{"etcdserver/raftBeforeApplySnap", "panic", nil, AnyMember}
+raftAfterApplySnapPanic Failpoint = goFailpoint{"etcdserver/raftAfterApplySnap", "panic", nil, AnyMember}
+raftAfterWALReleasePanic Failpoint = goFailpoint{"etcdserver/raftAfterWALRelease", "panic", nil, AnyMember}
+raftBeforeFollowerSendPanic Failpoint = goFailpoint{"etcdserver/raftBeforeFollowerSend", "panic", nil, AnyMember}
+raftBeforeSaveSnapPanic Failpoint = goFailpoint{"etcdserver/raftBeforeSaveSnap", "panic", nil, AnyMember}
+raftAfterSaveSnapPanic Failpoint = goFailpoint{"etcdserver/raftAfterSaveSnap", "panic", nil, AnyMember}
)

type Failpoint interface {
-Trigger(ctx context.Context, clus *e2e.EtcdProcessCluster) error
+Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error
Name() string
}

type killFailpoint struct{}

-func (f killFailpoint) Trigger(ctx context.Context, clus *e2e.EtcdProcessCluster) error {
+func (f killFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error {
member := clus.Procs[rand.Int()%len(clus.Procs)]
err := member.Kill()
if err != nil {
@@ -102,10 +104,26 @@ type goFailpoint struct {
failpoint string
payload string
trigger func(ctx context.Context, member e2e.EtcdProcess) error
+target failpointTarget
}

-func (f goFailpoint) Trigger(ctx context.Context, clus *e2e.EtcdProcessCluster) error {
-member := clus.Procs[rand.Int()%len(clus.Procs)]
+type failpointTarget string
+
+const (
+AnyMember failpointTarget = "AnyMember"
+Leader failpointTarget = "Leader"
+)
+
+func (f goFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error {
+var member e2e.EtcdProcess
+switch f.target {
+case AnyMember:
+member = clus.Procs[rand.Int()%len(clus.Procs)]
+case Leader:
+member = clus.Procs[clus.WaitLeader(t)]
+default:
+panic("unknown target")
+}
address := fmt.Sprintf("127.0.0.1:%d", member.Config().GoFailPort)
err := setupGoFailpoint(address, f.failpoint, f.payload)
if err != nil {
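Aside: setupGoFailpoint itself is not part of this diff. As background, arming a gofail failpoint over its HTTP listener (the member.Config().GoFailPort address above) amounts to a PUT of the payload, such as "panic", at the failpoint's path. A hedged sketch, with the helper name and the status-code handling assumed rather than taken from etcd's code:

package failpoint

import (
    "bytes"
    "fmt"
    "net/http"
)

// putFailpoint is a hypothetical stand-in for the setupGoFailpoint call above.
func putFailpoint(host, failpoint, payload string) error {
    req, err := http.NewRequest(http.MethodPut,
        fmt.Sprintf("http://%s/%s", host, failpoint), bytes.NewBufferString(payload))
    if err != nil {
        return err
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
        return fmt.Errorf("failed to set failpoint %q: %s", failpoint, resp.Status)
    }
    return nil
}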
@@ -201,10 +219,10 @@ type randomFailpoint struct {
failpoints []Failpoint
}

-func (f randomFailpoint) Trigger(ctx context.Context, clus *e2e.EtcdProcessCluster) error {
+func (f randomFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error {
failpoint := f.failpoints[rand.Int()%len(f.failpoints)]
-fmt.Printf("Triggering %v failpoint\n", failpoint.Name())
-return failpoint.Trigger(ctx, clus)
+t.Logf("Triggering %v failpoint\n", failpoint.Name())
+return failpoint.Trigger(t, ctx, clus)
}

func (f randomFailpoint) Name() string {
@@ -119,7 +119,7 @@ func triggerFailpoints(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessC
failures := 0
time.Sleep(config.waitBetweenTriggers)
for successes < config.count && failures < config.count {
-err = config.failpoint.Trigger(ctx, clus)
+err = config.failpoint.Trigger(t, ctx, clus)
if err != nil {
t.Logf("Failed to trigger failpoint %q, err: %v\n", config.failpoint.Name(), err)
failures++
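Aside: for completeness, a hypothetical table entry showing how the new leader failpoint could be fed into triggerFailpoints. The failpointConfig struct name is invented for illustration; only its failpoint, count and waitBetweenTriggers fields are visible in this hunk, and the snippet assumes it sits in the same package as the failpoint definitions above.

// Hypothetical illustration only; not part of this diff.
type failpointConfig struct {
    failpoint           Failpoint
    count               int
    waitBetweenTriggers time.Duration
}

// A case built around the new leader-targeted failpoint:
var raftBeforeLeaderSendCase = failpointConfig{
    failpoint:           RaftBeforeLeaderSendPanic,
    count:               1,
    waitBetweenTriggers: time.Second,
}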