diff --git a/functional/tester/cluster.go b/functional/tester/cluster.go index ef2d999c4..8b15c0193 100644 --- a/functional/tester/cluster.go +++ b/functional/tester/cluster.go @@ -24,6 +24,7 @@ import ( "net/url" "path/filepath" "strings" + "sync" "time" "github.com/coreos/etcd/functional/rpcpb" @@ -302,23 +303,40 @@ func (clus *Cluster) Restart() error { } func (clus *Cluster) broadcastOperation(op rpcpb.Operation) error { + var wg sync.WaitGroup + wg.Add(len(clus.agentStreams)) + + errc := make(chan error, len(clus.agentStreams)) for i := range clus.agentStreams { - err := clus.sendOperation(i, op) - if err != nil { - if op == rpcpb.Operation_DestroyEtcdAgent && - strings.Contains(err.Error(), "rpc error: code = Unavailable desc = transport is closing") { - // agent server has already closed; - // so this error is expected - clus.lg.Info( - "successfully destroyed", - zap.String("member", clus.Members[i].EtcdClientEndpoint), - ) - continue - } - return err - } + go func(idx int, o rpcpb.Operation) { + defer wg.Done() + errc <- clus.sendOperation(idx, o) + }(i, op) } - return nil + wg.Wait() + close(errc) + + errs := []string{} + for err := range errc { + if err == nil { + continue + } + + if err != nil && + op == rpcpb.Operation_DestroyEtcdAgent && + strings.Contains(err.Error(), "rpc error: code = Unavailable desc = transport is closing") { + // agent server has already closed; + // so this error is expected + clus.lg.Info("successfully destroyed all") + continue + } + errs = append(errs, err.Error()) + } + + if len(errs) == 0 { + return nil + } + return errors.New(strings.Join(errs, ", ")) } func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {