data.cli.egon_data()   F
last analyzed

Complexity

Conditions 32

Size

Total Lines 416
Code Lines 252

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric   Value
eloc     252     (effective lines of code)
dl       0       (duplicated lines)
loc      416     (total lines of code)
rs       0
c        0
b        0
f        0
cc       32      (cyclomatic complexity)
nop      2       (number of parameters)

How to fix

Long Method

Small methods make your code easier to understand, particularly when combined with a good name. And if a method is small, finding a good name for it is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a sign that you should extract the commented part into a new method and use the comment as a starting point for naming it.

Commonly applied refactorings include Extract Method.
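For example (a hypothetical sketch, loosely modelled on the configuration handling in the listing below; the names `configure` and `write_configuration_if_missing` are made up for illustration):

from pathlib import Path

import yaml


def write_configuration_if_missing(options, path):
    """Create the configuration file from `options` unless it already exists."""
    if not path.exists():
        path.write_text(yaml.safe_dump(options))


def configure(options):
    # The comment that used to label this block now names the helper instead.
    write_configuration_if_missing(options, Path("egon-data.configuration.yaml"))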

Complexity

Complex code like data.cli.egon_data() often does a lot of different things. To break it down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
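In the listing below, for instance, the `--database-*` options all share a prefix. Purely as an illustration (this is not the project's actual API), such fields could travel together as one extracted component:

from dataclasses import dataclass


@dataclass
class DatabaseSettings:
    """The `database`-prefixed settings, grouped into one component."""

    name: str = "egon-data"
    user: str = "egon"
    host: str = "127.0.0.1"
    port: str = "59734"
    password: str = "data"

    def url(self):
        """Render the settings as a SQLAlchemy connection URL."""
        return (
            f"postgresql+psycopg2://{self.user}:{self.password}"
            f"@{self.host}:{self.port}/{self.name}"
        )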

"""
Module that contains the command line app.

Why does this file exist, and why not put this in __main__?

  You might be tempted to import things from __main__ later, but that will
  cause problems: the code will get executed twice:

  - When you run `python -m egon.data` python will execute
    ``__main__.py`` as a script. That means there won't be any
    ``egon.data.__main__`` in ``sys.modules``.
  - When you import __main__ it will get executed again (as a module) because
    there's no ``egon.data.__main__`` in ``sys.modules``.

  Also see (1) from http://click.pocoo.org/5/setuptools/#setuptools-integration
"""
import os
import shutil
import socket
import subprocess
import sys
import time
from multiprocessing import Process
from pathlib import Path

import click
import yaml
from psycopg2 import OperationalError as PSPGOE

import egon.data
import egon.data.airflow
import egon.data.config as config
import importlib_resources as resources
from egon.data import logger
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError as SQLAOE
from sqlalchemy.orm import Session


@click.group(
    name="egon-data", context_settings={"help_option_names": ["-h", "--help"]}
)
@click.option(
    "--airflow-database-name",
    default="airflow",
    metavar="DB",
    help=("Specify the name of the airflow metadata database."),
    show_default=True,
)
@click.option(
    "--database-name",
    "--database",
    default="egon-data",
    metavar="DB",
    help=(
        "Specify the name of the local database. The database will be"
        " created if it doesn't already exist.\n\n\b"
        ' Note: "--database" is deprecated and will be removed in the'
        " future. Please use the longer but consistent"
        ' "--database-name".'
    ),
    show_default=True,
)
@click.option(
    "--database-user",
    default="egon",
    metavar="USERNAME",
    help=("Specify the user used to access the local database."),
    show_default=True,
)
@click.option(
    "--database-host",
    default="127.0.0.1",
    metavar="HOST",
    help=("Specify the host on which the local database is running."),
    show_default=True,
)
@click.option(
    "--database-port",
    default="59734",
    metavar="PORT",
    help=("Specify the port on which the local DBMS is listening."),
    show_default=True,
)
@click.option(
    "--database-password",
    default="data",
    metavar="PW",
    help=("Specify the password used to access the local database."),
    show_default=True,
)
@click.option(
    "--dataset-boundary",
    type=click.Choice(["Everything", "Schleswig-Holstein"]),
    default="Everything",
    help=(
        "Choose to limit the processed data to one of the available"
        " built-in boundaries."
    ),
    show_default=True,
)
@click.option(
    "--household-electrical-demand-source",
    type=click.Choice(["bottom-up-profiles", "slp"]),
    default="slp",
    help=(
        "Choose the source to calculate and allocate household electrical"
        " demands. There are currently two options:"
        " 'bottom-up-profiles' and 'slp' (Standard Load Profiles)."
    ),
    show_default=True,
)
@click.option(
    "--jobs",
    default=1,
    metavar="N",
    help=(
        "Spawn at maximum N tasks in parallel. Remember that in addition"
        " to that, there's always the scheduler and probably the server"
        " running."
    ),
    show_default=True,
)
@click.option(
    "--processes-per-task",
    default=1,
    metavar="N_PROCESS",
    help=(
        "Each task can use at maximum N_PROCESS parallel processes. Remember"
        " that in addition to that, tasks can run in parallel (see N) and"
        " there's always the scheduler and probably the server running."
    ),
    show_default=True,
)
@click.option(
    "--docker-container-name",
    default="egon-data-local-database-container",
    metavar="NAME",
    help=(
        "The name of the Docker container containing the local database."
        " You usually can stick to the default, unless you run into errors"
        " due to clashing names and don't want to delete or rename your old"
        " containers."
    ),
    show_default=True,
)
@click.option(
    "--compose-project-name",
    default="egon-data",
    metavar="PROJECT",
    help=(
        "The name of the Docker project."
        " Different compose project names are needed to run multiple"
        " instances of egon-data on the same machine."
    ),
    show_default=True,
)
@click.option(
    "--airflow-port",
    default=8080,
    metavar="AIRFLOW_PORT",
    help=("Specify the port on which airflow runs."),
    show_default=True,
)
@click.option(
    "--random-seed",
    default=42,
    metavar="RANDOM_SEED",
    help=(
        "Random seed used by some tasks in the pipeline to ensure"
        " deterministic behaviour. All published results in the eGon project"
        " will be created with the default value, so keep it if you want to"
        " make sure to get the same results."
    ),
    show_default=True,
)
@click.option(
    "--scenarios",
    default=["status2023", "eGon2035"],
    metavar="SCENARIOS",
    help=(
        "Scenario name for which a data model shall be created."
        " If you want to create multiple scenarios, set the parameter"
        " multiple times, e.g. --scenarios eGon2035 --scenarios status2023"
    ),
    multiple=True,
    show_default=True,
)
@click.option(
    "--run-pypsa-eur",
    default=False,
    metavar="RUN_PYPSA_EUR",
    help=(
        "State if pypsa-eur should be executed and installed within egon-data."
        " If set to false, a predefined network from the data bundle is used."
    ),
    show_default=True,
)
@click.option(
    "--prefix",
    default=None,
    metavar="PREFIX",
    help=("Add an optional prefix to the DAG name in the Airflow config."),
    show_default=True,
)
@click.version_option(version=egon.data.__version__)
@click.pass_context
def egon_data(context, **kwargs):
    """Run and control the eGo^n data processing pipeline.

    It is recommended to create a dedicated working directory in which to
    run `egon-data`, because `egon-data` will use its working directory to
    store configuration files and other data generated during a workflow
    run. Go to a location where you want to store eGon-data project data
    and create a new directory via:

        `mkdir egon-data-production && cd egon-data-production`

    Of course you are free to choose a different directory name.

    It is also recommended to use separate directories for production and
    test mode. In test mode, you should also use a different database. This
    will be created and used by typing e.g.:

        `egon-data --database-name 'test-egon-data' serve`

    It is important that options (e.g. `--database-name`) are placed before
    commands (e.g. `serve`).

    To use a smaller dataset in test mode, use the option
    `--dataset-boundary`. The complete command for starting Airflow in test
    mode with a separate database is

        `egon-data --database-name 'test-egon-data' --dataset-boundary
        'Schleswig-Holstein' serve`

    Whenever `egon-data` is executed, it searches for the configuration file
    "egon-data.configuration.yaml" in CWD. If that file doesn't exist,
    `egon-data` will create one, containing the command line parameters
    supplied, as well as the defaults for those switches for which no value
    was supplied. This means you only have to run the above command
    specifying a custom database once. Afterwards, it's sufficient to
    execute `egon-data serve` in the same directory and the same
    configuration will be used. You can also edit the configuration file
    "egon-data.configuration.yaml" manually.

    Last but not least, if you're using the default behaviour of setting
    up the database in a Docker container, the working directory will
    also contain a directory called "docker", containing the database
    data as well as other volumes used by the "Docker"ed database.

    """

    # Adapted from the `merge_copy` implementation at:
    #
    #   https://stackoverflow.com/questions/29847098/the-best-way-to-merge-multi-nested-dictionaries-in-python-2-7
    #
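    # `merge` recursively combines two nested dictionaries: keys present in
    # only one input are kept as-is, dict-valued keys present in both are
    # merged recursively, and for any other conflict the value from `d2`
    # wins. For example (hypothetical values):
    #
    #   merge({"a": {"x": 1}, "b": 2}, {"a": {"y": 3}, "b": 4})
    #   == {"a": {"x": 1, "y": 3}, "b": 4}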
    def merge(d1, d2):
        return {
            k: d1[k]
            if k in d1 and k not in d2
            else d2[k]
            if k not in d1 and k in d2
            else merge(d1[k], d2[k])
            if isinstance(d1[k], dict) and isinstance(d2[k], dict)
            else d2[k]
            for k in set(d1).union(d2)
        }

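    # `options(...)` maps each parameter declared above to a value, keyed by
    # the parameter's first long flag (e.g. "--database-name") and nested
    # under an "egon-data" key. The name is then rebound to a dict holding
    # the values actually supplied on the command line ("cli") next to the
    # declared defaults ("defaults").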
    def options(value, check=None):
        check = value if check is None else check
        flags = {p.opts[0]: value(p) for p in egon_data.params if check(p)}
        return {"egon-data": flags}

    options = {
        "cli": options(lambda o: kwargs[o.name], lambda o: o.name in kwargs),
        "defaults": options(lambda o: o.default),
    }

    # Fix: convert the "scenarios" option to a list if it exists (click's
    # `multiple=True` yields a tuple, which YAML's safe dumper can't
    # represent).
    if "scenarios" in options["cli"]:
        options["cli"]["scenarios"] = list(options["cli"]["scenarios"])

    combined = merge(options["defaults"], options["cli"])
    if not config.paths()[0].exists():
        with open(config.paths()[0], "w") as f:
            f.write(yaml.safe_dump(combined))
    else:
        with open(config.paths()[0], "r") as f:
            stored = yaml.safe_load(f)
        with open(config.paths()[0], "w") as f:
            f.write(yaml.safe_dump(merge(combined, stored)))

    # Alternatively:
    #   `if config.paths(pid="*") != [config.paths(pid="current")]:`
    #   or compare file contents.
    if len(config.paths(pid="*")) > 1:
        logger.error(
            "Found more than one configuration file belonging to a"
            " specific `egon-data` process. Unable to decide which one"
            " to use.\nExiting."
        )
        sys.exit(1)

    if len(config.paths(pid="*")) == 1:
        logger.info(
            "Ignoring supplied options. Found a configuration file"
            " belonging to a different `egon-data` process. Using that"
            " one."
        )
        with open(config.paths(pid="*")[0]) as f:
            options = yaml.load(f, Loader=yaml.SafeLoader)
    else:  # len(config.paths(pid="*")) == 0, so need to create one.

        with open(config.paths()[0]) as f:
            options["file"] = yaml.load(f, Loader=yaml.SafeLoader)

        options = dict(
            options.get("file", {}),
            **{
                flag: options["file"][flag]
                for flag in options["cli"]
                if options["cli"][flag] != options["defaults"][flag]
            },
        )

        with open(config.paths(pid="current")[0], "w") as f:
            f.write(yaml.safe_dump(options))

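    # Render a template shipped inside the `egon.data.airflow` package into
    # `target`, substituting the given inserts via `str.format`. An existing
    # file is only rewritten when `update` is set and the rendered content
    # actually changed.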
    def render(template, target, update=True, inserts={}, **more_inserts):
        os.makedirs(target.parent, exist_ok=True)
        rendered = resources.read_text(egon.data.airflow, template).format(
            **dict(inserts, **more_inserts)
        )
        if not target.exists():
            with open(target, "w") as f:
                f.write(rendered)
        elif update:
            with open(target, "r") as f:
                old = f.read()
            if old != rendered:
                with open(target, "w") as f:
                    f.write(rendered)

    os.environ["AIRFLOW_HOME"] = str((Path(".") / "airflow").absolute())

    options = options["egon-data"]
    render(
        "airflow.cfg",
        Path(".") / "airflow" / "airflow.cfg",
        inserts=options,
        dags=str(resources.files(egon.data.airflow).absolute()),
    )
    render(
        "docker-compose.yml",
        Path(".") / "docker" / "docker-compose.yml",
        update=False,
        inserts=options,
        airflow=resources.files(egon.data.airflow),
        gid=os.getgid(),
        uid=os.getuid(),
    )
    (Path(".") / "docker" / "database-data").mkdir(parents=True, exist_ok=True)

    # Copy webserver_config.py to disable authentication on the web interface.
    shutil.copy2(
        os.path.dirname(egon.data.airflow.__file__) + "/webserver_config.py",
        Path(".") / "airflow/webserver_config.py",
    )

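    # Probe the database port. If nothing is listening there, assume the
    # Docker container isn't running yet and bring it up via docker-compose.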
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        code = s.connect_ex(
            (options["--database-host"], int(options["--database-port"]))
        )
    if code != 0:
        subprocess.run(
            [
                "docker-compose",
                "-p",
                options["--compose-project-name"],
                "up",
                "-d",
                "--build",
            ],
            cwd=str((Path(".") / "docker").absolute()),
        )
        time.sleep(1.5)  # Give the container time to boot.

    # TODO: Since "AIRFLOW_HOME" needs to be set before importing `conf`, the
    #       import can only be done inside this function, which is generally
    #       frowned upon, instead of at the module level. Maybe there's a
    #       better way to encapsulate this?
    from airflow.configuration import conf as airflow_cfg
    from airflow.models import Connection

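    # Connect to Airflow's own metadata database (not the egon-data
    # database); the container may still be booting, so keep retrying below
    # until a connection succeeds.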
    engine = create_engine(
        (
            "postgresql+psycopg2://{--database-user}:{--database-password}"
            "@{--database-host}:{--database-port}"
            "/{--airflow-database-name}"
        ).format(**options),
        echo=False,
    )
    while True:  # Might still not be done booting. Poke it until it's up.
        try:
            connection = engine.connect()
            break
        except (PSPGOE, SQLAOE):
            pass
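    # `CREATE DATABASE` cannot run inside a transaction block, so switch the
    # connection to autocommit before creating the missing database.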
    with connection.execution_options(
        isolation_level="AUTOCOMMIT"
    ) as connection:
        databases = [
            row[0]
            for row in connection.execute("SELECT datname FROM pg_database;")
        ]
        if options["--database-name"] not in databases:
            connection.execute(
                f'CREATE DATABASE "{options["--database-name"]}";'
            )

    subprocess.run(["airflow", "db", "init"])

    # TODO: Constrain SQLAlchemy's lower version to 1.4 and use a `with` block
    #       like the one in the last commented line to avoid an explicit
    #       `commit`. This can then also be used to get rid of the
    #       `egon.data.db.session_scope` context manager and use the new
    #       built-in one instead. And we can migrate to the SQLA 2.0 query
    #       API.
    # with Session(engine) as airflow, airflow.begin():
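    # Store or update the "egon_data" connection in Airflow's metadata
    # database, so that pipeline tasks can reach the egon-data database via
    # this `conn_id`.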
    engine = create_engine(airflow_cfg.get("core", "SQL_ALCHEMY_CONN"))
    airflow = Session(engine)
    connection = (
        airflow.query(Connection).filter_by(conn_id="egon_data").one_or_none()
    )
    connection = connection if connection else Connection(conn_id="egon_data")
    connection.login = options["--database-user"]
    connection.password = options["--database-password"]
    connection.host = options["--database-host"]
    connection.port = options["--database-port"]
    connection.schema = options["--database-name"]
    connection.conn_type = "postgres"
    airflow.add(connection)
    airflow.commit()

    # TODO: This should probably rather be done during the database
    #       initialization workflow task.
    from egon.data.datasets import setup

    setup()


@egon_data.command(
    add_help_option=False,
    context_settings=dict(allow_extra_args=True, ignore_unknown_options=True),
)
@click.pass_context
def airflow(context):
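    # Forward all command line arguments verbatim to Airflow's own CLI.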
    subprocess.run(["airflow"] + context.args)
465
466
467
@egon_data.command(
    context_settings={
        "allow_extra_args": True,
        "help_option_names": ["-h", "--help"],
        "ignore_unknown_options": True,
    }
)
@click.pass_context
def serve(context):
    """Start the airflow webapp controlling the egon-data pipeline.

    Airflow needs, among other things, a metadata database and a running
    scheduler. This command acts as a shortcut, creating the database if it
    doesn't exist and starting the scheduler in the background before starting
    the webserver.

    Any OPTIONS other than `-h`/`--help` will be forwarded to
    `airflow webserver`, so you can for example specify an alternate port
    for the webapp to listen on via `egon-data serve -p PORT_NUMBER`.
    Find out more about the possible webapp options via:

        `egon-data airflow webserver --help`.
    """
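    # Run the scheduler as a background process with its output discarded,
    # then run the webserver in the foreground, passing extra arguments on.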
    scheduler = Process(
        target=subprocess.run,
        args=(["airflow", "scheduler"],),
        kwargs=dict(stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL),
    )
    scheduler.start()
    subprocess.run(["airflow", "webserver"] + context.args)


def main():
    try:
        egon_data.main(sys.argv[1:])
    finally:
        try:
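            # Remove this process's configuration file on exit, so that a
            # stale copy doesn't confuse later `egon-data` runs.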
            config.paths(pid="current")[0].unlink()
        except FileNotFoundError:
            pass