Merge pull request #5934 from heyitsanthony/fix-publish-race

e2e: wait for every etcd server to publish to cluster
This commit is contained in:
Anthony Romano 2016-07-13 19:22:08 -07:00 committed by GitHub
commit 35d379b052
2 changed files with 46 additions and 50 deletions

View File

@ -258,10 +258,6 @@ func TestCtlV2Backup(t *testing.T) { // For https://github.com/coreos/etcd/issue
cfg2.forceNewCluster = true cfg2.forceNewCluster = true
epc2 := setupEtcdctlTest(t, &cfg2, false) epc2 := setupEtcdctlTest(t, &cfg2, false)
if _, err := epc2.procs[0].proc.Expect("etcdserver: published"); err != nil {
t.Fatal(err)
}
// check if backup went through correctly // check if backup went through correctly
if err := etcdctlGet(epc2, "foo1", "bar", false); err != nil { if err := etcdctlGet(epc2, "foo1", "bar", false); err != nil {
t.Fatal(err) t.Fatal(err)

View File

@ -345,18 +345,8 @@ func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
func (epc *etcdProcessCluster) Start() (err error) { func (epc *etcdProcessCluster) Start() (err error) {
readyC := make(chan error, epc.cfg.clusterSize+epc.cfg.proxySize) readyC := make(chan error, epc.cfg.clusterSize+epc.cfg.proxySize)
readyStr := "enabled capabilities for version"
for i := range epc.procs { for i := range epc.procs {
go func(etcdp *etcdProcess) { go func(n int) { readyC <- epc.procs[n].waitReady() }(i)
etcdp.donec = make(chan struct{})
rs := readyStr
if etcdp.cfg.isProxy {
rs = "httpproxy: endpoints found"
}
_, err := etcdp.proc.Expect(rs)
readyC <- err
close(etcdp.donec)
}(epc.procs[i])
} }
for range epc.procs { for range epc.procs {
if err := <-readyC; err != nil { if err := <-readyC; err != nil {
@ -379,28 +369,6 @@ func (epc *etcdProcessCluster) RestartAll() error {
return epc.Start() return epc.Start()
} }
func (epr *etcdProcess) Restart() error {
proc, err := newEtcdProcess(epr.cfg)
if err != nil {
epr.Stop()
return err
}
*epr = *proc
readyStr := "enabled capabilities for version"
if proc.cfg.isProxy {
readyStr = "httpproxy: endpoints found"
}
if _, err = proc.proc.Expect(readyStr); err != nil {
epr.Stop()
return err
}
close(proc.donec)
return nil
}
func (epc *etcdProcessCluster) StopAll() (err error) { func (epc *etcdProcessCluster) StopAll() (err error) {
for _, p := range epc.procs { for _, p := range epc.procs {
if p == nil { if p == nil {
@ -418,19 +386,6 @@ func (epc *etcdProcessCluster) StopAll() (err error) {
return err return err
} }
func (epr *etcdProcess) Stop() error {
if epr == nil {
return nil
}
if err := epr.proc.Stop(); err != nil {
return err
}
<-epr.donec
return nil
}
func (epc *etcdProcessCluster) Close() error { func (epc *etcdProcessCluster) Close() error {
err := epc.StopAll() err := epc.StopAll()
for _, p := range epc.procs { for _, p := range epc.procs {
@ -439,6 +394,51 @@ func (epc *etcdProcessCluster) Close() error {
return err return err
} }
func (ep *etcdProcess) Restart() error {
newEp, err := newEtcdProcess(ep.cfg)
if err != nil {
ep.Stop()
return err
}
*ep = *newEp
if err = ep.waitReady(); err != nil {
ep.Stop()
return err
}
return nil
}
func (ep *etcdProcess) Stop() error {
if ep == nil {
return nil
}
if err := ep.proc.Stop(); err != nil {
return err
}
<-ep.donec
return nil
}
func (ep *etcdProcess) waitReady() error {
readyStrs := []string{"enabled capabilities for version", "published"}
if ep.cfg.isProxy {
readyStrs = []string{"httpproxy: endpoints found"}
}
c := 0
matchSet := func(l string) bool {
for _, s := range readyStrs {
if strings.Contains(l, s) {
c++
break
}
}
return c == len(readyStrs)
}
_, err := ep.proc.ExpectFunc(matchSet)
close(ep.donec)
return err
}
func spawnCmd(args []string) (*expect.ExpectProcess, error) { func spawnCmd(args []string) (*expect.ExpectProcess, error) {
return expect.NewExpect(args[0], args[1:]...) return expect.NewExpect(args[0], args[1:]...)
} }