Passed
Push — dev ( 854d9c...2ebbae )
by Stephan
01:34 queued 11s
created

data.cli.egon_data()   F

Complexity

Conditions 30

Size

Total Lines 272
Code Lines 165

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 30
eloc 165
nop 2
dl 0
loc 272
rs 0
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include: Extract Method.

Complexity

Complex functions like data.cli.egon_data() often do a lot of different things. To break such a function down, we need to identify a cohesive component within it. A common approach to find such a component is to look for statements that work on the same data, or for variables that share the same prefixes or suffixes.

Once you have determined the statements that belong together, you can apply the Extract Method refactoring, moving them into a well-named helper function.

1
"""
2
Module that contains the command line app.
3
4
Why does this file exist, and why not put this in __main__?
5
6
  You might be tempted to import things from __main__ later, but that will
7
  cause problems: the code will get executed twice:
8
9
  - When you run `python -megon.data` python will execute
10
    ``__main__.py`` as a script. That means there won't be any
11
    ``egon.data.__main__`` in ``sys.modules``.
12
  - When you import __main__ it will get executed again (as a module) because
13
    there's no ``egon.data.__main__`` in ``sys.modules``.
14
15
  Also see (1) from http://click.pocoo.org/5/setuptools/#setuptools-integration
16
"""
17
from multiprocessing import Process
18
from pathlib import Path
19
import os
20
import socket
21
import subprocess
22
import sys
23
import time
24
25
from psycopg2 import OperationalError as PSPGOE
26
from sqlalchemy import create_engine
27
from sqlalchemy.exc import OperationalError as SQLAOE
28
import click
29
import importlib_resources as resources
30
import yaml
31
32
from egon.data import logger
33
import egon.data
34
import egon.data.airflow
35
import egon.data.config as config
36
37
38
@click.command(
    add_help_option=False,
    context_settings=dict(allow_extra_args=True, ignore_unknown_options=True),
)
@click.pass_context
def airflow(context):
    # Pass every extra argument through to the `airflow` CLI unchanged.
    # Help handling is disabled so `-h`/`--help` also reach `airflow`.
    subprocess.run(["airflow"] + context.args)
@click.command(context_settings={"help_option_names": ["-h", "--help"]})
@click.pass_context
def serve(context):
    """Start the airflow webapp controlling the egon-data pipeline.

    Airflow needs, among other things, a metadata database and a running
    scheduler. This command acts as a shortcut, creating the database if it
    doesn't exist and starting the scheduler in the background before starting
    the webserver.

    """
    # Create (or migrate) airflow's metadata database first.
    subprocess.run(["airflow", "initdb"])
    # Run the scheduler in a background process, discarding its output so
    # it doesn't interleave with the webserver's log on the console.
    scheduler = Process(
        target=subprocess.run,
        args=(["airflow", "scheduler"],),
        kwargs=dict(stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL),
    )
    scheduler.start()
    # Blocks until the webserver is shut down.
    subprocess.run(["airflow", "webserver"])
@click.group(
    name="egon-data", context_settings={"help_option_names": ["-h", "--help"]}
)
@click.option(
    "--airflow-database-name",
    default="airflow",
    metavar="DB",
    help=("Specify the name of the airflow metadata database."),
    show_default=True,
)
@click.option(
    "--database-name",
    "--database",
    default="egon-data",
    metavar="DB",
    help=(
        "Specify the name of the local database. The database will be"
        " created if it doesn't already exist.\n\n\b"
        ' Note: "--database" is deprecated and will be removed in the'
        " future. Please use the longer but consistent"
        ' "--database-name".'
    ),
    show_default=True,
)
@click.option(
    "--database-user",
    default="egon",
    metavar="USERNAME",
    help=("Specify the user used to access the local database."),
    show_default=True,
)
@click.option(
    "--database-host",
    default="127.0.0.1",
    metavar="HOST",
    help=("Specify the host on which the local database is running."),
    show_default=True,
)
@click.option(
    "--database-port",
    default="59734",
    metavar="PORT",
    help=("Specify the port on which the local DBMS is listening."),
    show_default=True,
)
@click.option(
    "--database-password",
    default="data",
    metavar="PW",
    help=("Specify the password used to access the local database."),
    show_default=True,
)
@click.option(
    "--dataset-boundary",
    type=click.Choice(["Everything", "Schleswig-Holstein"]),
    default="Everything",
    help=(
        "Choose to limit the processed data to one of the available"
        " built-in boundaries."
    ),
    show_default=True,
)
@click.option(
    "--jobs",
    default=1,
    metavar="N",
    help=(
        "Spawn at maximum N tasks in parallel. Remember that in addition"
        " to that, there's always the scheduler and probably the server"
        " running."
    ),
    show_default=True,
)
@click.option(
    "--docker-container-name",
    default="egon-data-local-database-container",
    metavar="NAME",
    help=(
        "The name of the Docker container containing the local database."
        " You usually can stick to the default, unless you run into errors"
        " due to clashing names and don't want to delete or rename your old"
        " containers."
    ),
    show_default=True,
)
@click.version_option(version=egon.data.__version__)
@click.pass_context
def egon_data(context, **kwargs):
    """Run and control the eGo^n data processing pipeline.

    It is recommended to create a dedicated working directory in which to
    run `egon-data` because `egon-data` will use its working directory to
    store configuration files and other data generated during a workflow
    run. Go to a location where you want to store eGon-data project data
    and create a new directory via:

        `mkdir egon-data-production && cd egon-data-production`

    Of course you are free to choose a different directory name.

    It is also recommended to use separate directories for production and
    test mode. In test mode, you should also use a different database. This
    will be created and used by typing e.g.:

        `egon-data --database-name 'test-egon-data' serve`

    It is important that options (e.g. `--database-name`) are placed before
    commands (e.g. `serve`).

    For using a smaller dataset in the test mode, use the option
    `--dataset-boundary`. The complete command for starting Airflow in test
    mode with using a separate database is

        `egon-data --database-name 'test-egon-data' --dataset-boundary
        'Schleswig-Holstein' serve`

    Whenever `egon-data` is executed, it searches for the configuration file
    "egon-data.configuration.yaml" in CWD. If that file doesn't exist,
    `egon-data` will create one, containing the command line parameters
    supplied, as well as the defaults for those switches for which no value
    was supplied.
    This means, run the above command that specifies a custom database once.
    Afterwards, it's sufficient to execute `egon-data serve` in the same
    directory and the same configuration will be used. You can also edit the
    configuration file "egon-data.configuration.yaml" manually.

    Last but not least, if you're using the default behaviour of setting
    up the database in a Docker container, the working directory will
    also contain a directory called "docker", containing the database
    data as well as other volumes used by the "Docker"ed database.

    """
    # Adapted from the `merge_copy` implementation at:
    #
    #   https://stackoverflow.com/questions/29847098/the-best-way-to-merge-multi-nested-dictionaries-in-python-2-7
    #
    def merge(d1, d2):
        # Recursively merge two nested dictionaries. For keys present in
        # both, nested dicts are merged; otherwise `d2`'s value wins.
        return {
            k: d1[k]
            if k in d1 and k not in d2
            else d2[k]
            if k not in d1 and k in d2
            else merge(d1[k], d2[k])
            if isinstance(d1[k], dict) and isinstance(d2[k], dict)
            else d2[k]
            for k in set(d1).union(d2)
        }

    def collect(value, check=None):
        # Build `{"egon-data": {flag: value(param)}}` over this command's
        # parameters, keeping only those for which `check` holds.
        # (Renamed from `options`, which was shadowed by the dict below.)
        check = value if check is None else check
        flags = {p.opts[0]: value(p) for p in egon_data.params if check(p)}
        return {"egon-data": flags}

    options = {
        "cli": collect(lambda o: kwargs[o.name], lambda o: o.name in kwargs),
        "defaults": collect(lambda o: o.default),
    }

    # Persist the effective configuration, merging into a pre-existing file
    # instead of clobbering it.
    combined = merge(options["defaults"], options["cli"])
    if not config.paths()[0].exists():
        with open(config.paths()[0], "w") as f:
            f.write(yaml.safe_dump(combined))
    else:
        with open(config.paths()[0], "r") as f:
            stored = yaml.safe_load(f)
        with open(config.paths()[0], "w") as f:
            f.write(yaml.safe_dump(merge(combined, stored)))

    # Alternatively:
    #   `if config.paths(pid="*") != [config.paths(pid="current")]:`
    #   or compare file contents.
    if len(config.paths(pid="*")) > 1:
        logger.error(
            "Found more than one configuration file belonging to a"
            " specific `egon-data` process. Unable to decide which one"
            " to use.\nExiting."
        )
        sys.exit(1)

    if len(config.paths(pid="*")) == 1:
        logger.info(
            "Ignoring supplied options. Found a configuration file"
            " belonging to a different `egon-data` process. Using that"
            " one."
        )
        with open(config.paths(pid="*")[0]) as f:
            options = yaml.load(f, Loader=yaml.SafeLoader)
    else:  # len(config.paths(pid="*")) == 0, so need to create one.
        with open(config.paths()[0]) as f:
            options["file"] = yaml.load(f, Loader=yaml.SafeLoader)
        # Overlay the file's settings with any CLI flags that deviate from
        # their defaults, then record the result for this process.
        options = dict(
            options.get("file", {}),
            **{
                flag: options["cli"][flag]
                for flag in options["cli"]
                if options["cli"][flag] != options["defaults"][flag]
            },
        )
        with open(config.paths(pid="current")[0], "w") as f:
            f.write(yaml.safe_dump(options))

    def render(template, target, update=True, inserts=None, **more_inserts):
        # Render `template` with the given insertions into the file at
        # `target`. The file is written if it doesn't exist, or rewritten
        # if `update` is true and the rendered content differs from disk.
        # (`inserts` used to default to `{}`; it was only ever read, but a
        # mutable default argument is a bug waiting to happen.)
        inserts = {} if inserts is None else inserts
        os.makedirs(target.parent, exist_ok=True)
        rendered = resources.read_text(egon.data.airflow, template).format(
            **dict(inserts, **more_inserts)
        )
        if not target.exists():
            with open(target, "w") as f:
                f.write(rendered)
        elif update:
            with open(target, "r") as f:
                old = f.read()
            if old != rendered:
                with open(target, "w") as f:
                    f.write(rendered)

    os.environ["AIRFLOW_HOME"] = str((Path(".") / "airflow").absolute())
    options = options["egon-data"]

    render(
        "airflow.cfg",
        Path(".") / "airflow" / "airflow.cfg",
        inserts=options,
        dags=str(resources.files(egon.data.airflow).absolute()),
        update=False,
    )
    render(
        "docker-compose.yml",
        Path(".") / "docker" / "docker-compose.yml",
        update=False,
        inserts=options,
        airflow=resources.files(egon.data.airflow),
    )

    # If nothing is listening on the configured host/port, assume the
    # database container isn't running yet and bring it up via Docker.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        code = s.connect_ex(
            (options["--database-host"], int(options["--database-port"]))
        )
    if code != 0:
        subprocess.run(
            ["docker-compose", "up", "-d", "--build"],
            cwd=str((Path(".") / "docker").absolute()),
        )
        time.sleep(1.5)  # Give the container time to boot.

    engine = create_engine(
        (
            "postgresql+psycopg2://{--database-user}:{--database-password}"
            "@{--database-host}:{--database-port}"
            "/{--airflow-database-name}"
        ).format(**options),
        echo=False,
    )
    while True:  # Might still not be done booting. Poke it until it's up.
        try:
            connection = engine.connect()
            break
        except (PSPGOE, SQLAOE):
            # Not accepting connections yet. Back off briefly instead of
            # busy-spinning at 100% CPU.
            time.sleep(0.1)
    # Create the egon database if it's missing. `CREATE DATABASE` can't run
    # inside a transaction, hence the AUTOCOMMIT isolation level.
    with connection.execution_options(
        isolation_level="AUTOCOMMIT"
    ) as connection:
        databases = [
            row[0]
            for row in connection.execute("SELECT datname FROM pg_database;")
        ]
        if options["--database-name"] not in databases:
            connection.execute(
                f'CREATE DATABASE "{options["--database-name"]}";'
            )
def main():
    """Entry point: register the subcommands and run the `egon-data` CLI.

    The per-process ("current") configuration file is removed on the way
    out — even when the command fails — so stale files don't accumulate
    and confuse later runs.
    """
    egon_data.add_command(airflow)
    egon_data.add_command(serve)
    try:
        egon_data.main(sys.argv[1:])
    finally:
        try:
            config.paths(pid="current")[0].unlink()
        except FileNotFoundError:
            # Nothing was written for this process; nothing to clean up.
            pass