mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
tests/e2e: backport WaitLeader
Add Etcdctl function to etcdProcess interface, and implement it in etcdServerProcess, which is used by WaitMembersForLeader. Define Etcdctl's Status function, that is used by WaitMembersForLeader too. Signed-off-by: Ivan Valdes <ivan@vald.es>
This commit is contained in:
parent
1a6eaca539
commit
9c067a3c36
@ -15,6 +15,7 @@
|
||||
package e2e
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/url"
|
||||
@ -501,3 +502,71 @@ func (epc *etcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) {
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
// WaitLeader returns index of the member in c.Members() that is leader
|
||||
// or fails the test (if not established in 30s).
|
||||
func (epc *etcdProcessCluster) WaitLeader(t testing.TB) int {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
return epc.WaitMembersForLeader(ctx, t, epc.procs)
|
||||
}
|
||||
|
||||
// WaitMembersForLeader waits until given members agree on the same leader,
|
||||
// and returns its 'index' in the 'membs' list
|
||||
func (epc *etcdProcessCluster) WaitMembersForLeader(ctx context.Context, t testing.TB, membs []etcdProcess) int {
|
||||
cc := NewEtcdctl(epc.EndpointsV3(), epc.cfg.clientTLS, epc.cfg.isClientAutoTLS, epc.cfg.enableV2)
|
||||
|
||||
// ensure leader is up via linearizable get
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
t.Fatal("WaitMembersForLeader timeout")
|
||||
default:
|
||||
}
|
||||
_, err := cc.Get("0")
|
||||
if err == nil || strings.Contains(err.Error(), "Key not found") {
|
||||
break
|
||||
}
|
||||
t.Logf("WaitMembersForLeader Get err: %v", err)
|
||||
}
|
||||
|
||||
leaders := make(map[uint64]struct{})
|
||||
members := make(map[uint64]int)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
t.Fatal("WaitMembersForLeader timeout")
|
||||
default:
|
||||
}
|
||||
for i := range membs {
|
||||
resp, err := membs[i].Etcdctl(epc.cfg.clientTLS, epc.cfg.isClientAutoTLS, epc.cfg.enableV2).Status()
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "connection refused") {
|
||||
// if member[i] has stopped
|
||||
continue
|
||||
} else {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
members[resp[0].Header.MemberId] = i
|
||||
leaders[resp[0].Leader] = struct{}{}
|
||||
}
|
||||
// members agree on the same leader
|
||||
if len(leaders) == 1 {
|
||||
break
|
||||
}
|
||||
leaders = make(map[uint64]struct{})
|
||||
members = make(map[uint64]int)
|
||||
// From main branch 10 * config.TickDuration (10 * time.Millisecond)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
for l := range leaders {
|
||||
if index, ok := members[l]; ok {
|
||||
t.Logf("members agree on a leader, members:%v , leader:%v", members, l)
|
||||
return index
|
||||
}
|
||||
t.Fatalf("members agree on a leader which is not one of members, members:%v , leader:%v", members, l)
|
||||
}
|
||||
t.Fatal("impossible path of execution")
|
||||
return -1
|
||||
}
|
||||
|
@ -56,6 +56,8 @@ type etcdProcess interface {
|
||||
PeerProxy() proxy.Server
|
||||
Failpoints() *BinaryFailpoints
|
||||
IsRunning() bool
|
||||
|
||||
Etcdctl(connType clientConnType, isAutoTLS bool, v2 bool) *Etcdctl
|
||||
}
|
||||
|
||||
type logsExpect interface {
|
||||
@ -223,6 +225,10 @@ func (ep *etcdServerProcess) IsRunning() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (ep *etcdServerProcess) Etcdctl(connType clientConnType, isAutoTLS, v2 bool) *Etcdctl {
|
||||
return NewEtcdctl(ep.EndpointsV3(), connType, isAutoTLS, v2)
|
||||
}
|
||||
|
||||
type BinaryFailpoints struct {
|
||||
member etcdProcess
|
||||
availableCache map[string]string
|
||||
|
@ -141,6 +141,22 @@ func (ctl *Etcdctl) Compact(rev int64) (*clientv3.CompactResponse, error) {
|
||||
return nil, spawnWithExpect(args, fmt.Sprintf("compacted revision %v", rev))
|
||||
}
|
||||
|
||||
func (ctl *Etcdctl) Status() ([]*clientv3.StatusResponse, error) {
|
||||
var epStatus []*struct {
|
||||
Endpoint string
|
||||
Status *clientv3.StatusResponse
|
||||
}
|
||||
err := ctl.spawnJsonCmd(&epStatus, "endpoint", "status")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp := make([]*clientv3.StatusResponse, len(epStatus))
|
||||
for i, e := range epStatus {
|
||||
resp[i] = e.Status
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (ctl *Etcdctl) spawnJsonCmd(output interface{}, expectedOutput string, args ...string) error {
|
||||
args = append(args, "-w", "json")
|
||||
cmd, err := spawnCmd(append(ctl.cmdArgs(), args...))
|
||||
|
Loading…
x
Reference in New Issue
Block a user