Passed
Push — dev ( 13b57a...d34095 )
by Stephan
01:05 queued 10s
created

data.cli.main()   B

Complexity

Conditions 3

Size

Total Lines 46
Code Lines 39

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 39
nop 2
dl 0
loc 46
rs 8.9439
c 0
b 0
f 0
1
"""
2
Module that contains the command line app.
3
4
Why does this file exist, and why not put this in __main__?
5
6
  You might be tempted to import things from __main__ later, but that will
7
  cause problems: the code will get executed twice:
8
9
  - When you run `python -m egon.data`, Python will execute
10
    ``__main__.py`` as a script. That means there won't be any
11
    ``egon.data.__main__`` in ``sys.modules``.
12
  - When you import __main__ it will get executed again (as a module) because
13
    there's no ``egon.data.__main__`` in ``sys.modules``.
14
15
  Also see (1) from http://click.pocoo.org/5/setuptools/#setuptools-integration
16
"""
17
from multiprocessing import Process
18
import os
19
import os.path
20
import subprocess
21
22
import click
23
import yaml
24
25
import egon.data
26
import egon.data.airflow
27
28
29
@click.command(
    add_help_option=False,
    context_settings=dict(allow_extra_args=True, ignore_unknown_options=True),
)
@click.pass_context
def airflow(context):
    """Run the ``airflow`` command line tool with the given arguments.

    Everything following ``airflow`` on the command line is forwarded
    unchanged. To make that possible, click's own help option is disabled
    for this command and unknown options are collected as extra arguments
    instead of being rejected.

    The exit status of the ``airflow`` process becomes the exit status of
    this command.
    """
    completed = subprocess.run(["airflow"] + context.args)
    # Propagate airflow's exit status so that shells and scripts calling
    # this wrapper can detect a failed invocation instead of always seeing
    # a successful exit.
    context.exit(completed.returncode)
36
37
38
@click.command()
@click.pass_context
def serve(context):
    """Start the airflow webapp controlling the egon-data pipeline.

    Airflow needs, among other things, a metadata database and a running
    scheduler. This command acts as a shortcut, creating the database if it
    doesn't exist and starting the scheduler in the background before starting
    the webserver. The scheduler is stopped again once the webserver exits.

    """
    subprocess.run(["airflow", "initdb"])
    # Start the scheduler directly as a child process. Its output is
    # discarded so that it doesn't interleave with the webserver's output.
    scheduler = subprocess.Popen(
        ["airflow", "scheduler"],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    try:
        # Blocks until the webserver terminates or is interrupted.
        subprocess.run(["airflow", "webserver"])
    finally:
        # Without this the scheduler would outlive the webserver, e.g. when
        # the webserver fails to start, leaving a stray background process.
        scheduler.terminate()
        scheduler.wait()
57
58
59
@click.group()
@click.option(
    "--database",
    metavar="DB",
    help="Specify the name of the local database.",
)
@click.option(
    "--database-host",
    metavar="HOST",
    help="Specify the host on which the local database is running.",
)
@click.option(
    "--database-password",
    metavar="PW",
    help="Specify the password used to access the local database.",
)
@click.option(
    "--database-port",
    metavar="PORT",
    help="Specify the port on which the local DBMS is listening.",
)
@click.option(
    "--database-user",
    metavar="USERNAME",
    help="Specify the user used to access the local database.",
)
@click.version_option(version=egon.data.__version__)
@click.pass_context
def main(context, **kwargs):
    # Entry point of the command group. Documented with comments instead of
    # a docstring, because click would display a docstring as help text.
    #
    # Point airflow at the directory containing the `egon.data.airflow`
    # package.
    airflow_home = os.path.dirname(egon.data.airflow.__file__)
    os.environ["AIRFLOW_HOME"] = airflow_home

    # Maps the names of the database related command line options to the
    # keys used in the generated configuration file.
    translations = dict(
        database="POSTGRES_DB",
        database_password="POSTGRES_PASSWORD",
        database_host="HOST",
        database_port="PORT",
        database_user="POSTGRES_USER",
    )

    # Collect only those database options which were actually supplied.
    database_configuration = {}
    for option, value in kwargs.items():
        if "database" not in option or value is None:
            continue
        database_configuration[translations[option]] = value

    # Nothing was specified, so no configuration file is written.
    if not database_configuration:
        return

    with open("local-database.yaml", "w") as configuration:
        configuration.write(yaml.safe_dump(database_configuration))
105
106
107
# Expose the command group under its public name and attach the subcommands.
main.name = "egon-data"
for _subcommand in (airflow, serve):
    main.add_command(_subcommand)
110