data.cli   A
last analyzed

Complexity

Total Complexity 36

Size/Duplication

Total Lines 503
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 36
eloc 297
dl 0
loc 503
rs 9.52
c 0
b 0
f 0

4 Functions

Rating   Name   Duplication   Size   Complexity  
A serve() 0 30 1
A main() 0 8 2
A airflow() 0 7 1
F egon_data() 0 412 32
1
"""
2
Module that contains the command line app.
3
4
Why does this file exist, and why not put this in __main__?
5
6
  You might be tempted to import things from __main__ later, but that will
7
  cause problems: the code will get executed twice:
8
9
  - When you run `python -megon.data` python will execute
10
    ``__main__.py`` as a script. That means there won't be any
11
    ``egon.data.__main__`` in ``sys.modules``.
12
  - When you import __main__ it will get executed again (as a module) because
13
    there's no ``egon.data.__main__`` in ``sys.modules``.
14
15
  Also see (1) from http://click.pocoo.org/5/setuptools/#setuptools-integration
16
"""
17
import os
18
import shutil
19
import socket
20
import subprocess
21
import sys
22
import time
23
from multiprocessing import Process
24
from pathlib import Path
25
26
import click
27
import yaml
28
from psycopg2 import OperationalError as PSPGOE
29
30
import egon.data
31
import egon.data.airflow
32
import egon.data.config as config
33
import importlib_resources as resources
34
from egon.data import logger
35
from sqlalchemy import create_engine
36
from sqlalchemy.exc import OperationalError as SQLAOE
37
from sqlalchemy.orm import Session
38
39
40
@click.group(
    name="egon-data", context_settings={"help_option_names": ["-h", "--help"]}
)
@click.option(
    "--airflow-database-name",
    default="airflow",
    metavar="DB",
    help=("Specify the name of the airflow metadata database."),
    show_default=True,
)
@click.option(
    "--database-name",
    "--database",
    default="egon-data",
    metavar="DB",
    help=(
        "Specify the name of the local database. The database will be"
        " created if it doesn't already exist.\n\n\b"
        ' Note: "--database" is deprecated and will be removed in the'
        " future. Please use the longer but consistent"
        ' "--database-name".'
    ),
    show_default=True,
)
@click.option(
    "--database-user",
    default="egon",
    metavar="USERNAME",
    help=("Specify the user used to access the local database."),
    show_default=True,
)
@click.option(
    "--database-host",
    default="127.0.0.1",
    metavar="HOST",
    help=("Specify the host on which the local database is running."),
    show_default=True,
)
@click.option(
    "--database-port",
    default="59734",
    metavar="PORT",
    help=("Specify the port on which the local DBMS is listening."),
    show_default=True,
)
@click.option(
    "--database-password",
    default="data",
    metavar="PW",
    help=("Specify the password used to access the local database."),
    show_default=True,
)
@click.option(
    "--dataset-boundary",
    type=click.Choice(["Everything", "Schleswig-Holstein"]),
    default="Everything",
    help=(
        "Choose to limit the processed data to one of the available"
        " built-in boundaries."
    ),
    show_default=True,
)
@click.option(
    "--household-electrical-demand-source",
    type=click.Choice(["bottom-up-profiles", "slp"]),
    default="slp",
    help=(
        "Choose the source to calculate and allocate household electrical"
        " demands. There are currently two options:"
        " 'bottom-up-profiles' and 'slp' (Standard Load Profiles)"
    ),
    show_default=True,
)
@click.option(
    "--jobs",
    default=1,
    metavar="N",
    help=(
        "Spawn at maximum N tasks in parallel. Remember that in addition"
        " to that, there's always the scheduler and probably the server"
        " running."
    ),
    show_default=True,
)
@click.option(
    "--processes-per-task",
    default=1,
    metavar="N_PROCESS",
    help=(
        "Each task can use at maximum N_PROCESS parallel processes. Remember"
        " that in addition to that, tasks can run in parallel (see N) and"
        " there's always the scheduler and probably the server running."
    ),
    show_default=True,
)
@click.option(
    "--docker-container-name",
    default="egon-data-local-database-container",
    metavar="NAME",
    help=(
        "The name of the Docker container containing the local database."
        " You usually can stick to the default, unless you run into errors"
        " due to clashing names and don't want to delete or rename your old"
        " containers."
    ),
    show_default=True,
)
@click.option(
    "--compose-project-name",
    default="egon-data",
    metavar="PROJECT",
    help=(
        "The name of the Docker project."
        " Different compose_project_names are needed to run multiple instances"
        " of egon-data on the same machine."
    ),
    show_default=True,
)
@click.option(
    "--airflow-port",
    default=8080,
    metavar="AIRFLOW_PORT",
    help=("Specify the port on which airflow runs."),
    show_default=True,
)
@click.option(
    "--random-seed",
    default=42,
    metavar="RANDOM_SEED",
    help=(
        "Random seed used by some tasks in the pipeline to ensure "
        " deterministic behaviour. All published results in the eGon project "
        " will be created with the default value so keep it if you want to "
        " make sure to get the same results."
    ),
    show_default=True,
)
@click.option(
    "--scenarios",
    default=["status2023", "eGon2035"],
    metavar="SCENARIOS",
    help=("List of scenario names for which a data model shall be created."),
    multiple=True,
    show_default=True,
)
@click.option(
    "--run-pypsa-eur",
    default=False,
    metavar="RUN_PYPSA_EUR",
    help=(
        "State if pypsa-eur should be executed and installed within egon-data."
        " If set to false, a predefined network from the data bundle is used."
    ),
    show_default=True,
)
@click.option(
    "--prefix",
    default=None,
    metavar="PREFIX",
    help=(
        "Add optional prefix to the DAG name in the Airflow config. "
    ),
    show_default=True,
)
@click.version_option(version=egon.data.__version__)
@click.pass_context
def egon_data(context, **kwargs):
    """Run and control the eGo^n data processing pipeline.

    It is recommended to create a dedicated working directory in which to
    run `egon-data` because `egon-data` will use its working directory to
    store configuration files and other data generated during a workflow
    run. Go to a location where you want to store eGon-data project data
    and create a new directory via:

        `mkdir egon-data-production && cd egon-data-production`

    Of course you are free to choose a different directory name.

    It is also recommended to use separate directories for production and
    test mode. In test mode, you should also use a different database. This
    will be created and used by typing e.g.:

        `egon-data --database-name 'test-egon-data' serve`

    It is important that options (e.g. `--database-name`) are placed before
    commands (e.g. `serve`).

    For using a smaller dataset in the test mode, use the option
    `--dataset-boundary`. The complete command for starting Airflow in test
    mode with using a separate database is

        `egon-data --database-name 'test-egon-data' --dataset-boundary
        'Schleswig-Holstein' serve`

    Whenever `egon-data` is executed, it searches for the configuration file
    "egon-data.configuration.yaml" in CWD. If that file doesn't exist,
    `egon-data` will create one, containing the command line parameters
    supplied, as well as the defaults for those switches for which no value
    was supplied.
    This means, run the above command that specifies a custom database once.
    Afterwards, it's sufficient to execute `egon-data serve` in the same
    directory and the same configuration will be used. You can also edit the
    configuration the file "egon-data.configuration.yaml" manually.

    Last but not least, if you're using the default behaviour of setting
    up the database in a Docker container, the working directory will
    also contain a directory called "docker", containing the database
    data as well as other volumes used by the "Docker"ed database.

    """

    # Adapted from the `merge_copy` implementation at:
    #
    #   https://stackoverflow.com/questions/29847098/the-best-way-to-merge-multi-nested-dictionaries-in-python-2-7
    #
    def merge(d1, d2):
        """Recursively merge two dicts, preferring `d2` on scalar conflicts."""
        return {
            k: d1[k]
            if k in d1 and k not in d2
            else d2[k]
            if k not in d1 and k in d2
            else merge(d1[k], d2[k])
            if isinstance(d1[k], dict) and isinstance(d2[k], dict)
            else d2[k]
            for k in set(d1).union(d2)
        }

    def collect(value, check=None):
        """Collect `value(p)` for every CLI parameter `p` passing `check`.

        The result maps each parameter's first (long) option name, e.g.
        "--scenarios", to its value, nested under the "egon-data" key to
        match the layout of "egon-data.configuration.yaml".
        """
        check = value if check is None else check
        flags = {p.opts[0]: value(p) for p in egon_data.params if check(p)}
        return {"egon-data": flags}

    options = {
        "cli": collect(lambda o: kwargs[o.name], lambda o: o.name in kwargs),
        "defaults": collect(lambda o: o.default),
    }

    # Click passes `multiple=True` options through as tuples, whereas the
    # declared default (and YAML round-trips) use lists. Normalise the
    # supplied "--scenarios" value to a list so that the comparison against
    # the defaults below, and the YAML dump, behave consistently. Note that
    # the flags live under the "egon-data" key and are keyed by their first
    # option name, i.e. "--scenarios", not "scenarios".
    cli_flags = options["cli"]["egon-data"]
    if "--scenarios" in cli_flags:
        cli_flags["--scenarios"] = list(cli_flags["--scenarios"])

    combined = merge(options["defaults"], options["cli"])
    # Create the configuration file if it is missing; otherwise merge the
    # current invocation's options into the stored configuration.
    if not config.paths()[0].exists():
        with open(config.paths()[0], "w") as f:
            f.write(yaml.safe_dump(combined))
    else:
        with open(config.paths()[0], "r") as f:
            stored = yaml.safe_load(f)
        with open(config.paths()[0], "w") as f:
            f.write(yaml.safe_dump(merge(combined, stored)))

    # Alternatively:
    #   `if config.paths(pid="*") != [config.paths(pid="current")]:`
    #   or compare file contents.
    if len(config.paths(pid="*")) > 1:
        logger.error(
            "Found more than one configuration file belonging to a"
            " specific `egon-data` process. Unable to decide which one"
            " to use.\nExiting."
        )
        sys.exit(1)

    if len(config.paths(pid="*")) == 1:
        logger.info(
            "Ignoring supplied options. Found a configuration file"
            " belonging to a different `egon-data` process. Using that"
            " one."
        )
        with open(config.paths(pid="*")[0]) as f:
            options = yaml.load(f, Loader=yaml.SafeLoader)
    else:  # len(config.paths(pid="*")) == 0, so need to create one.

        with open(config.paths()[0]) as f:
            options["file"] = yaml.load(f, Loader=yaml.SafeLoader)

        # Start from the stored file configuration and overlay only those
        # CLI values that differ from the defaults, i.e. were supplied
        # explicitly by the user.
        options = dict(
            options.get("file", {}),
            **{
                flag: options["file"][flag]
                for flag in options["cli"]
                if options["cli"][flag] != options["defaults"][flag]
            },
        )

        with open(config.paths(pid="current")[0], "w") as f:
            f.write(yaml.safe_dump(options))

    def render(template, target, update=True, inserts=None, **more_inserts):
        """Render `template` from the airflow package resources to `target`.

        The file is only (re)written if it doesn't exist yet, or if
        `update` is true and the rendered content differs from what's on
        disk, so unchanged files keep their timestamps.
        """
        # Note: a mutable `{}` default would be shared across calls.
        inserts = {} if inserts is None else inserts
        os.makedirs(target.parent, exist_ok=True)
        rendered = resources.read_text(egon.data.airflow, template).format(
            **dict(inserts, **more_inserts)
        )
        if not target.exists():
            with open(target, "w") as f:
                f.write(rendered)
        elif update:
            with open(target, "r") as f:
                old = f.read()
            if old != rendered:
                with open(target, "w") as f:
                    f.write(rendered)

    # Airflow reads "AIRFLOW_HOME" at import time, so it must be set before
    # the `airflow` imports further down.
    os.environ["AIRFLOW_HOME"] = str((Path(".") / "airflow").absolute())

    options = options["egon-data"]
    render(
        "airflow.cfg",
        Path(".") / "airflow" / "airflow.cfg",
        inserts=options,
        dags=str(resources.files(egon.data.airflow).absolute()),
    )
    render(
        "docker-compose.yml",
        Path(".") / "docker" / "docker-compose.yml",
        update=False,
        inserts=options,
        airflow=resources.files(egon.data.airflow),
        gid=os.getgid(),
        uid=os.getuid(),
    )
    (Path(".") / "docker" / "database-data").mkdir(parents=True, exist_ok=True)

    # Copy webserver_config.py to disable authentification on webinterface
    shutil.copy2(
        os.path.dirname(egon.data.airflow.__file__) + "/webserver_config.py",
        Path(".") / "airflow/webserver_config.py",
    )

    # Probe the database port; only spin up the Docker container if nothing
    # is listening there yet.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        code = s.connect_ex(
            (options["--database-host"], int(options["--database-port"]))
        )
    if code != 0:
        subprocess.run(
            [
                "docker-compose",
                "-p",
                options["--compose-project-name"],
                "up",
                "-d",
                "--build",
            ],
            cwd=str((Path(".") / "docker").absolute()),
        )
        time.sleep(1.5)  # Give the container time to boot.

    # TODO: Since "AIRFLOW_HOME" needs to be set before importing `conf`, the
    #       import can only be done inside this function, which is generally
    #       frowned upon, instead of at the module level. Maybe there's a
    #       better way to encapsulate this?
    from airflow.configuration import conf as airflow_cfg
    from airflow.models import Connection

    engine = create_engine(
        (
            "postgresql+psycopg2://{--database-user}:{--database-password}"
            "@{--database-host}:{--database-port}"
            "/{--airflow-database-name}"
        ).format(**options),
        echo=False,
    )
    while True:  # Might still not be done booting. Poke it it's up.
        try:
            connection = engine.connect()
            break
        except (PSPGOE, SQLAOE):
            # Back off briefly instead of busy-waiting at full speed.
            time.sleep(0.5)
    with connection.execution_options(
        isolation_level="AUTOCOMMIT"
    ) as connection:
        databases = [
            row[0]
            for row in connection.execute("SELECT datname FROM pg_database;")
        ]
        if options["--database-name"] not in databases:
            connection.execute(
                f'CREATE DATABASE "{options["--database-name"]}";'
            )

    subprocess.run(["airflow", "db", "init"])

    # TODO: Constrain SQLAlchemy's lower version to 1.4 and use a `with` block
    #       like the one in the last commented line to avoid an explicit
    #       `commit`. This can then also be used to get rid of the
    #       `egon.data.db.session_scope` context manager and use the new
    #       buil-in one instead. And we can migrate to the SQLA 2.0 query
    #       API.
    # with Session(engine) as airflow, airflow.begin():
    engine = create_engine(airflow_cfg.get("core", "SQL_ALCHEMY_CONN"))
    airflow = Session(engine)
    # Upsert the "egon_data" connection Airflow tasks use to reach the
    # project database.
    connection = (
        airflow.query(Connection).filter_by(conn_id="egon_data").one_or_none()
    )
    connection = connection if connection else Connection(conn_id="egon_data")
    connection.login = options["--database-user"]
    connection.password = options["--database-password"]
    connection.host = options["--database-host"]
    connection.port = options["--database-port"]
    connection.schema = options["--database-name"]
    connection.conn_type = "pgsql"
    airflow.add(connection)
    airflow.commit()

    # TODO: This should probably rather be done during the database
    #       initialization workflow task.
    from egon.data.datasets import setup

    setup()
452
453
454
@egon_data.command(
    add_help_option=False,
    context_settings=dict(allow_extra_args=True, ignore_unknown_options=True),
)
@click.pass_context
def airflow(context):
    """Forward all given arguments verbatim to the ``airflow`` CLI."""
    command = ["airflow", *context.args]
    subprocess.run(command)
461
462
463
@egon_data.command(
    context_settings={
        "allow_extra_args": True,
        "help_option_names": ["-h", "--help"],
        "ignore_unknown_options": True,
    }
)
@click.pass_context
def serve(context):
    """Start the airflow webapp controlling the egon-data pipeline.

    Airflow needs, among other things, a metadata database and a running
    scheduler. This command acts as a shortcut, creating the database if it
    doesn't exist and starting the scheduler in the background before starting
    the webserver.

    Any OPTIONS other than `-h`/`--help` will be forwarded to
    `airflow webserver`, so you can for example specify an alternate port
    for the webapp to listen on via `egon-data serve -p PORT_NUMBER`.
    Find out more about the possible webapp options via:

        `egon-data airflow webserver --help`.
    """
    # Run the scheduler as a background process, discarding its output so
    # it doesn't interleave with the webserver's.
    background_scheduler = Process(
        target=subprocess.run,
        args=(["airflow", "scheduler"],),
        kwargs={"stdout": subprocess.DEVNULL, "stderr": subprocess.DEVNULL},
    )
    background_scheduler.start()
    # The webserver runs in the foreground; extra CLI args are passed on.
    subprocess.run(["airflow", "webserver", *context.args])
493
494
495
def main():
    """Run the command line interface, cleaning up afterwards.

    Whatever happens while the CLI runs, the per-process configuration
    file created by `egon_data` is removed on the way out.
    """
    try:
        egon_data.main(sys.argv[1:])
    finally:
        current_configuration = config.paths(pid="current")[0]
        try:
            current_configuration.unlink()
        except FileNotFoundError:
            # Nothing to clean up if the run aborted before creating it.
            pass
503