Passed
Pull Request — dev (#475)
by Stephan
02:15
created

test_egon-data.test_pipeline_importability()   A

Complexity

Conditions 3

Size

Total Lines 10
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 10
nop 0
dl 0
loc 10
rs 9.9
c 0
b 0
f 0
1
from importlib import import_module
2
3
from click.testing import CliRunner
4
import pytest
5
6
from egon.data import __version__
7
from egon.data.cli import egon_data
8
9
10
def test_main():
11
    runner = CliRunner()
12
    with runner.isolated_filesystem():
13
        result = runner.invoke(egon_data, ["--version"])
14
15
    assert result.output == "{name}, version {version}\n".format(
16
        name=egon_data.name, version=__version__
17
    )
18
    assert result.exit_code == 0
19
20
21
@pytest.mark.skip(
22
    reason=(
23
        "Needs `docker` and/or PostgreSQL and we're currently not making sure"
24
        "\nthese are present on the continuous integration service(s) we use."
25
    )
26
)
27
def test_airflow():
28
    """ Test that `egon-data airflow` correctly forwards to airflow. """
29
    runner = CliRunner()
30
    with runner.isolated_filesystem():
31
        result = runner.invoke(egon_data, ["airflow", "--help"])
32
    assert result.output == ""
33
34
35
def test_pipeline_importability():
36
    error = None
37
    for m in ["egon.data.airflow.dags.pipeline"]:
38
        try:
39
            import_module(m)
40
        except Exception as e:
41
            error = e
42
        assert error is None, (
43
            "\nDid not expect an error when importing:\n\n  `{}`\n\nGot: {}"
44
        ).format(m, error)
45