| Total Complexity | 7 |
| Total Lines | 45 |
| Duplicated Lines | 0 % |
| Changes | 0 | ||
| 1 | from importlib import import_module |
||
| 2 | |||
| 3 | from click.testing import CliRunner |
||
| 4 | import pytest |
||
| 5 | |||
| 6 | from egon.data import __version__ |
||
| 7 | from egon.data.cli import egon_data |
||
| 8 | |||
| 9 | |||
def test_main():
    """Check the output and exit status of the CLI's `--version` option.

    The `egon_data` command is invoked with `--version` inside an isolated
    filesystem. Its output must be the command name and the package
    `__version__` in the form `<name>, version <version>` followed by a
    newline, and the exit code must be zero.
    """
    cli = CliRunner()
    with cli.isolated_filesystem():
        outcome = cli.invoke(egon_data, ["--version"])

    expected_output = "{name}, version {version}\n".format(
        name=egon_data.name, version=__version__
    )
    assert outcome.output == expected_output
    assert outcome.exit_code == 0
||
| 19 | |||
| 20 | |||
@pytest.mark.skip(
    reason=(
        "Needs `docker` and/or PostgreSQL and we're currently not making sure"
        "\nthese are present on the continuous integration service(s) we use."
    )
)
def test_airflow():
    """Test that `egon-data airflow` correctly forwards to airflow.

    Invokes the `airflow` subcommand with `--help` inside an isolated
    filesystem and expects the captured output to be empty. The test is
    currently skipped; see the skip reason on the decorator.
    """
    arguments = ["airflow", "--help"]
    cli = CliRunner()
    with cli.isolated_filesystem():
        outcome = cli.invoke(egon_data, arguments)
    assert outcome.output == ""
||
| 33 | |||
| 34 | |||
def test_pipeline_importability():
    """Check that the pipeline DAG module can be imported without errors.

    Each listed module is imported via `import_module`. Any exception raised
    during the import is turned into an `AssertionError` whose message names
    the module and the error. The assertion is chained to the original
    exception (`raise ... from`), so the underlying exception type and
    traceback are not lost.

    Raises:
        AssertionError: If importing one of the listed modules raises.
    """
    modules = ["egon.data.airflow.dags.pipeline"]
    for m in modules:
        try:
            import_module(m)
        # Deliberately broad: any failure while importing counts as a
        # test failure, not only `ImportError`.
        except Exception as error:
            raise AssertionError(
                (
                    "\nDid not expect an error when importing:\n\n `{}`\n\nGot: {}"
                ).format(m, error)
            ) from error
||
| 45 |