Merge pull request #17381 from ivanvc/3.5-backport-wait-leader

[3.5] backport e2e WaitLeader
This commit is contained in:
Marek Siarkowicz 2024-02-08 15:44:54 +01:00 committed by GitHub
commit b2aa3a13e8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 116 additions and 26 deletions

View File

@ -110,12 +110,12 @@ func testConnectionMultiplexing(ctx context.Context, t *testing.T, member e2e.Et
}
t.Run("etcdctl", func(t *testing.T) {
t.Run("v2", func(t *testing.T) {
etcdctl := NewEtcdctl([]string{httpEndpoint}, connType, false, true)
etcdctl := e2e.NewEtcdctl([]string{httpEndpoint}, connType, false, true)
err := etcdctl.Set("a", "1")
assert.NoError(t, err)
})
t.Run("v3", func(t *testing.T) {
etcdctl := NewEtcdctl([]string{grpcEndpoint}, connType, false, false)
etcdctl := e2e.NewEtcdctl([]string{grpcEndpoint}, connType, false, false)
err := etcdctl.Put("a", "1")
assert.NoError(t, err)
})

View File

@ -128,7 +128,7 @@ func TestInPlaceRecovery(t *testing.T) {
//Put some data into the old cluster, so that after recovering from a blank db, the hash diverges.
t.Log("putting 10 keys...")
oldCc := NewEtcdctl(epcOld.EndpointsV3(), e2e.ClientNonTLS, false, false)
oldCc := e2e.NewEtcdctl(epcOld.EndpointsV3(), e2e.ClientNonTLS, false, false)
for i := 0; i < 10; i++ {
err := oldCc.Put(testutil.PickKey(int64(i)), fmt.Sprint(i))
assert.NoError(t, err, "error on put")
@ -154,7 +154,7 @@ func TestInPlaceRecovery(t *testing.T) {
}
})
newCc := NewEtcdctl(epcNew.EndpointsV3(), e2e.ClientNonTLS, false, false)
newCc := e2e.NewEtcdctl(epcNew.EndpointsV3(), e2e.ClientNonTLS, false, false)
assert.NoError(t, err)
wg := sync.WaitGroup{}
@ -211,7 +211,7 @@ func TestPeriodicCheckDetectsCorruption(t *testing.T) {
}
})
cc := NewEtcdctl(epc.EndpointsV3(), e2e.ClientNonTLS, false, false)
cc := e2e.NewEtcdctl(epc.EndpointsV3(), e2e.ClientNonTLS, false, false)
for i := 0; i < 10; i++ {
err := cc.Put(testutil.PickKey(int64(i)), fmt.Sprint(i))
@ -252,7 +252,7 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) {
}
})
cc := NewEtcdctl(epc.EndpointsV3(), e2e.ClientNonTLS, false, false)
cc := e2e.NewEtcdctl(epc.EndpointsV3(), e2e.ClientNonTLS, false, false)
for i := 0; i < 10; i++ {
err := cc.Put(testutil.PickKey(int64(i)), fmt.Sprint(i))
@ -302,7 +302,7 @@ func TestCompactHashCheckDetectCorruptionInterrupt(t *testing.T) {
// Put 200 identical keys to the cluster, so that the compaction will drop some stale values.
// We need a relatively big number here to make the compaction takes a non-trivial time, and we can interrupt it.
t.Log("putting 200 values to the identical key...")
cc := NewEtcdctl(epc.EndpointsV3(), e2e.ClientNonTLS, false, false)
cc := e2e.NewEtcdctl(epc.EndpointsV3(), e2e.ClientNonTLS, false, false)
for i := 0; i < 200; i++ {
err = cc.Put("key", fmt.Sprint(i))

View File

@ -49,7 +49,7 @@ func TestMemberReplace(t *testing.T) {
for i := 1; i < len(epc.Procs); i++ {
endpoints = append(endpoints, epc.Procs[(memberIdx+i)%len(epc.Procs)].EndpointsGRPC()...)
}
cc := NewEtcdctl(endpoints, e2e.ClientNonTLS, false, false)
cc := e2e.NewEtcdctl(endpoints, e2e.ClientNonTLS, false, false)
memberID, found, err := getMemberIdByName(ctx, cc, memberName)
require.NoError(t, err)

View File

@ -362,7 +362,7 @@ func doHealthCheckAndVerify(t *testing.T, client *http.Client, url string, expec
func triggerNoSpaceAlarm(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, _ time.Duration) {
buf := strings.Repeat("b", os.Getpagesize())
etcdctl := NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)
etcdctl := e2e.NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)
for {
if err := etcdctl.Put("foo", buf); err != nil {
if !strings.Contains(err.Error(), "etcdserver: mvcc: database space exceeded") {
@ -377,7 +377,7 @@ func triggerSlowApply(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCl
// the following proposal will be blocked at applying stage
// because when apply index < committed index, linearizable read would time out.
require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "beforeApplyOneEntryNormal", fmt.Sprintf(`sleep("%s")`, duration)))
etcdctl := NewEtcdctl(clus.Procs[1].EndpointsV3(), e2e.ClientNonTLS, false, false)
etcdctl := e2e.NewEtcdctl(clus.Procs[1].EndpointsV3(), e2e.ClientNonTLS, false, false)
etcdctl.Put("foo", "bar")
}
@ -391,12 +391,12 @@ func blackhole(_ context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, _
func triggerRaftLoopDeadLock(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, duration time.Duration) {
require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "raftBeforeSaveWaitWalSync", fmt.Sprintf(`sleep("%s")`, duration)))
etcdctl := NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)
etcdctl := e2e.NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)
etcdctl.Put("foo", "bar")
}
func triggerSlowBufferWriteBackWithAuth(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, duration time.Duration) {
etcdctl := NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)
etcdctl := e2e.NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)
_, err := etcdctl.UserAdd("root", "root")
require.NoError(t, err)
@ -409,7 +409,7 @@ func triggerSlowBufferWriteBackWithAuth(ctx context.Context, t *testing.T, clus
}
func triggerCorrupt(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, _ time.Duration) {
etcdctl := NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)
etcdctl := e2e.NewEtcdctl(clus.Procs[0].EndpointsV3(), e2e.ClientNonTLS, false, false)
for i := 0; i < 10; i++ {
require.NoError(t, etcdctl.Put("foo", "bar"))
}

View File

@ -116,7 +116,7 @@ func fillEtcdWithData(ctx context.Context, c *clientv3.Client, dbSize int) error
return g.Wait()
}
func getMemberIdByName(ctx context.Context, c *Etcdctl, name string) (id uint64, found bool, err error) {
func getMemberIdByName(ctx context.Context, c *e2e.Etcdctl, name string) (id uint64, found bool, err error) {
resp, err := c.MemberList()
if err != nil {
return 0, false, err

View File

@ -15,6 +15,7 @@
package e2e
import (
"context"
"fmt"
"net/url"
"os"
@ -576,3 +577,71 @@ func (epc *EtcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) {
}
return ret
}
// WaitLeader returns index of the member in c.Members() that is leader
// or fails the test (if not established in 30s).
func (epc *EtcdProcessCluster) WaitLeader(t testing.TB) int {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return epc.WaitMembersForLeader(ctx, t, epc.Procs)
}
// WaitMembersForLeader waits until given members agree on the same leader,
// and returns its 'index' in the 'membs' list
func (epc *EtcdProcessCluster) WaitMembersForLeader(ctx context.Context, t testing.TB, membs []EtcdProcess) int {
cc := NewEtcdctl(epc.EndpointsV3(), epc.Cfg.ClientTLS, epc.Cfg.IsClientAutoTLS, epc.Cfg.EnableV2)
// ensure leader is up via linearizable get
for {
select {
case <-ctx.Done():
t.Fatal("WaitMembersForLeader timeout")
default:
}
_, err := cc.Get("0")
if err == nil || strings.Contains(err.Error(), "Key not found") {
break
}
t.Logf("WaitMembersForLeader Get err: %v", err)
}
leaders := make(map[uint64]struct{})
members := make(map[uint64]int)
for {
select {
case <-ctx.Done():
t.Fatal("WaitMembersForLeader timeout")
default:
}
for i := range membs {
resp, err := membs[i].Etcdctl(epc.Cfg.ClientTLS, epc.Cfg.IsClientAutoTLS, epc.Cfg.EnableV2).Status()
if err != nil {
if strings.Contains(err.Error(), "connection refused") {
// if member[i] has stopped
continue
} else {
t.Fatal(err)
}
}
members[resp[0].Header.MemberId] = i
leaders[resp[0].Leader] = struct{}{}
}
// members agree on the same leader
if len(leaders) == 1 {
break
}
leaders = make(map[uint64]struct{})
members = make(map[uint64]int)
// From main branch 10 * config.TickDuration (10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
}
for l := range leaders {
if index, ok := members[l]; ok {
t.Logf("members agree on a leader, members:%v , leader:%v", members, l)
return index
}
t.Fatalf("members agree on a leader which is not one of members, members:%v , leader:%v", members, l)
}
t.Fatal("impossible path of execution")
return -1
}

View File

@ -58,6 +58,8 @@ type EtcdProcess interface {
PeerProxy() proxy.Server
Failpoints() *BinaryFailpoints
IsRunning() bool
Etcdctl(connType ClientConnType, isAutoTLS bool, v2 bool) *Etcdctl
}
type LogsExpect interface {
@ -249,6 +251,10 @@ func (ep *EtcdServerProcess) IsRunning() bool {
return false
}
func (ep *EtcdServerProcess) Etcdctl(connType ClientConnType, isAutoTLS, v2 bool) *Etcdctl {
return NewEtcdctl(ep.EndpointsV3(), connType, isAutoTLS, v2)
}
type BinaryFailpoints struct {
member EtcdProcess
availableCache map[string]string

View File

@ -20,18 +20,17 @@ import (
"strings"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/integration"
)
type Etcdctl struct {
connType e2e.ClientConnType
connType ClientConnType
isAutoTLS bool
endpoints []string
v2 bool
}
func NewEtcdctl(endpoints []string, connType e2e.ClientConnType, isAutoTLS bool, v2 bool) *Etcdctl {
func NewEtcdctl(endpoints []string, connType ClientConnType, isAutoTLS bool, v2 bool) *Etcdctl {
return &Etcdctl{
endpoints: endpoints,
connType: connType,
@ -52,7 +51,7 @@ func (ctl *Etcdctl) Put(key, value string) error {
}
args := ctl.cmdArgs()
args = append(args, "put", key, value)
return e2e.SpawnWithExpectWithEnv(args, ctl.env(), "OK")
return SpawnWithExpectWithEnv(args, ctl.env(), "OK")
}
func (ctl *Etcdctl) PutWithAuth(key, value, username, password string) error {
@ -61,7 +60,7 @@ func (ctl *Etcdctl) PutWithAuth(key, value, username, password string) error {
}
args := ctl.cmdArgs()
args = append(args, "--user", fmt.Sprintf("%s:%s", username, password), "put", key, value)
return e2e.SpawnWithExpectWithEnv(args, ctl.env(), "OK")
return SpawnWithExpectWithEnv(args, ctl.env(), "OK")
}
func (ctl *Etcdctl) Set(key, value string) error {
@ -70,7 +69,7 @@ func (ctl *Etcdctl) Set(key, value string) error {
}
args := ctl.cmdArgs()
args = append(args, "set", key, value)
lines, err := e2e.RunUtilCompletion(args, ctl.env())
lines, err := RunUtilCompletion(args, ctl.env())
if err != nil {
return err
}
@ -83,7 +82,7 @@ func (ctl *Etcdctl) Set(key, value string) error {
func (ctl *Etcdctl) AuthEnable() error {
args := ctl.cmdArgs("auth", "enable")
return e2e.SpawnWithExpectWithEnv(args, ctl.env(), "Authentication Enabled")
return SpawnWithExpectWithEnv(args, ctl.env(), "Authentication Enabled")
}
func (ctl *Etcdctl) UserGrantRole(user string, role string) (*clientv3.AuthUserGrantRoleResponse, error) {
@ -148,12 +147,28 @@ func (ctl *Etcdctl) Compact(rev int64) (*clientv3.CompactResponse, error) {
panic("Unsupported method for v2")
}
args := ctl.cmdArgs("compact", fmt.Sprint(rev))
return nil, e2e.SpawnWithExpectWithEnv(args, ctl.env(), fmt.Sprintf("compacted revision %v", rev))
return nil, SpawnWithExpectWithEnv(args, ctl.env(), fmt.Sprintf("compacted revision %v", rev))
}
func (ctl *Etcdctl) Status() ([]*clientv3.StatusResponse, error) {
var epStatus []*struct {
Endpoint string
Status *clientv3.StatusResponse
}
err := ctl.spawnJsonCmd(&epStatus, "endpoint", "status")
if err != nil {
return nil, err
}
resp := make([]*clientv3.StatusResponse, len(epStatus))
for i, e := range epStatus {
resp[i] = e.Status
}
return resp, err
}
func (ctl *Etcdctl) spawnJsonCmd(output interface{}, args ...string) error {
args = append(args, "-w", "json")
cmd, err := e2e.SpawnCmd(append(ctl.cmdArgs(), args...), ctl.env())
cmd, err := SpawnCmd(append(ctl.cmdArgs(), args...), ctl.env())
if err != nil {
return err
}
@ -165,7 +180,7 @@ func (ctl *Etcdctl) spawnJsonCmd(output interface{}, args ...string) error {
}
func (ctl *Etcdctl) cmdArgs(args ...string) []string {
cmdArgs := []string{e2e.CtlBinPath}
cmdArgs := []string{CtlBinPath}
for k, v := range ctl.flags() {
cmdArgs = append(cmdArgs, fmt.Sprintf("--%s=%s", k, v))
}
@ -176,13 +191,13 @@ func (ctl *Etcdctl) flags() map[string]string {
fmap := make(map[string]string)
if ctl.v2 {
fmap["no-sync"] = "true"
if ctl.connType == e2e.ClientTLS {
if ctl.connType == ClientTLS {
fmap["ca-file"] = integration.TestTLSInfo.TrustedCAFile
fmap["cert-file"] = integration.TestTLSInfo.CertFile
fmap["key-file"] = integration.TestTLSInfo.KeyFile
}
} else {
if ctl.connType == e2e.ClientTLS {
if ctl.connType == ClientTLS {
if ctl.isAutoTLS {
fmap["insecure-transport"] = "false"
fmap["insecure-skip-tls-verify"] = "true"