Passed
Pull Request — dev (#1138)
by
unknown
02:19
created

motorized_individual_travel   A

Complexity

Total Complexity 19

Size/Duplication

Total Lines 476
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 19
eloc 242
dl 0
loc 476
rs 10
c 0
b 0
f 0
1
"""
2
Main module for preparation of model data (static and timeseries) for
3
motorized individual travel (MIT).
4
5
"""
6
7
from pathlib import Path
8
from urllib.request import urlretrieve
9
import os
10
import tarfile
11
12
from airflow.operators.python_operator import PythonOperator
13
from psycopg2.extensions import AsIs, register_adapter
14
import numpy as np
15
import pandas as pd
16
17
from egon.data import db, subprocess
18
from egon.data.datasets import Dataset
19
from egon.data.datasets.emobility.motorized_individual_travel.db_classes import (  # noqa: E501
20
    EgonEvCountMunicipality,
21
    EgonEvCountMvGridDistrict,
22
    EgonEvCountRegistrationDistrict,
23
    EgonEvMetadata,
24
    EgonEvMvGridDistrict,
25
    EgonEvPool,
26
    EgonEvTrip,
27
)
28
from egon.data.datasets.emobility.motorized_individual_travel.ev_allocation import (  # noqa: E501
29
    allocate_evs_numbers,
30
    allocate_evs_to_grid_districts,
31
)
32
from egon.data.datasets.emobility.motorized_individual_travel.helpers import (
33
    COLUMNS_KBA,
34
    DATA_BUNDLE_DIR,
35
    DATASET_CFG,
36
    MVGD_MIN_COUNT,
37
    TESTMODE_OFF,
38
    TRIP_COLUMN_MAPPING,
39
    WORKING_DIR,
40
)
41
from egon.data.datasets.emobility.motorized_individual_travel.model_timeseries import (  # noqa: E501
42
    delete_model_data_from_db,
43
    generate_model_data_bunch,
44
    generate_model_data_eGon100RE_remaining,
45
    generate_model_data_eGon2035_remaining,
46
    read_simbev_metadata_file,
47
)
48
49
50
# ========== Register numpy datatypes with psycopg2 (used by SQLA) ==========
51
def adapt_numpy_float64(numpy_float64):
    """Wrap a ``np.float64`` in psycopg2's ``AsIs`` adapter.

    ``AsIs`` puts the value's string representation into the SQL statement
    without quoting, which lets numpy floats be used as query parameters.
    """
    adapter = AsIs(numpy_float64)
    return adapter
53
54
55
def adapt_numpy_int64(numpy_int64):
    """Wrap a ``np.int64`` in psycopg2's ``AsIs`` adapter.

    ``AsIs`` puts the value's string representation into the SQL statement
    without quoting, which lets numpy integers be used as query parameters.
    """
    adapter = AsIs(numpy_int64)
    return adapter
57
58
59
# psycopg2 has no built-in adapters for numpy scalar types; register the
# pass-through adapters defined above for both of them.
register_adapter(np.int64, adapt_numpy_int64)
register_adapter(np.float64, adapt_numpy_float64)
61
# =====================================================
62
63
64
def create_tables():
    """Recreate all electric vehicle tables and the results directory.

    Every table is dropped (if it exists) and immediately created again,
    one table after the other, so each run starts from empty tables.
    Afterwards the ``results`` directory below ``WORKING_DIR`` is created
    if it is missing.

    Returns
    -------
    None
    """
    engine = db.engine()
    ev_orm_classes = (
        EgonEvCountRegistrationDistrict,
        EgonEvCountMunicipality,
        EgonEvCountMvGridDistrict,
        EgonEvPool,
        EgonEvTrip,
        EgonEvMvGridDistrict,
        EgonEvMetadata,
    )
    for orm_class in ev_orm_classes:
        table = orm_class.__table__
        table.drop(bind=engine, checkfirst=True)
        table.create(bind=engine, checkfirst=True)

    # Directory for results (including missing parents)
    (WORKING_DIR / Path("results")).mkdir(exist_ok=True, parents=True)
95
96
97
def _download_if_missing(url, file):
    """Download ``url`` to ``file`` unless ``file`` already exists."""
    if not os.path.isfile(file):
        urlretrieve(url, file)


def download_and_preprocess():
    """Download and preprocess vehicle registration (KBA) and RegioStaR7 data.

    Both source files are downloaded into ``WORKING_DIR`` unless they are
    already present there. The preprocessed tables are not returned. They
    are written to CSV files in ``WORKING_DIR`` instead:

    * KBA vehicle registration data per registration district, written to
      ``DATASET_CFG["original_data"]["sources"]["KBA"]["file_processed"]``
    * RegioStaR7 data, written to
      ``DATASET_CFG["original_data"]["sources"]["RS7"]["file_processed"]``

    Returns
    -------
    None
    """
    mit_sources = DATASET_CFG["original_data"]["sources"]

    # Create the working directory (including missing parents) if needed
    Path(WORKING_DIR).mkdir(parents=True, exist_ok=True)

    ################################
    # Download and import KBA data #
    ################################
    kba_cfg = mit_sources["KBA"]
    kba_file = WORKING_DIR / kba_cfg["file"]
    _download_if_missing(kba_cfg["url"], kba_file)

    kba_data = pd.read_excel(
        kba_file,
        sheet_name=kba_cfg["sheet"],
        usecols=kba_cfg["columns"],
        skiprows=kba_cfg["skiprows"],
    )
    kba_data.columns = COLUMNS_KBA
    # Cells containing a single blank are treated as missing; drop those rows
    kba_data = kba_data.replace(" ", np.nan).dropna()
    # The text before the first blank of "reg_district" becomes the (integer)
    # registration district key, the rest stays in "reg_district".
    # ``n`` must be passed by keyword: pandas >= 2.0 rejects it positionally.
    kba_data[["ags_reg_district", "reg_district"]] = (
        kba_data["reg_district"].str.split(" ", n=1, expand=True)
    )
    kba_data["ags_reg_district"] = kba_data["ags_reg_district"].astype("int")

    kba_data.to_csv(WORKING_DIR / kba_cfg["file_processed"], index=False)

    #######################################
    # Download and import RegioStaR7 data #
    #######################################
    rs7_cfg = mit_sources["RS7"]
    rs7_file = WORKING_DIR / rs7_cfg["file"]
    _download_if_missing(rs7_cfg["url"], rs7_file)

    rs7_data = pd.read_excel(rs7_file, sheet_name=rs7_cfg["sheet"])

    # "ags_district" is "gem_20" with its last three digits removed; integer
    # floor division does this exactly, without a detour through floats.
    rs7_data["ags_district"] = (rs7_data["gem_20"] // 1000).astype("int")
    rs7_data = rs7_data.rename(
        columns={"gem_20": "ags", "RegioStaR7": "rs7_id"}
    )
    rs7_data["rs7_id"] = rs7_data["rs7_id"].astype("int")

    rs7_data.to_csv(WORKING_DIR / rs7_cfg["file_processed"], index=False)
170
171
172
def extract_trip_file():
    """Extract the simBEV trip archives of all scenarios from the data bundle.

    For each scenario the archive configured in
    ``DATASET_CFG["original_data"]["sources"]["trips"][<scenario>]["file"]``
    is extracted into ``DATA_BUNDLE_DIR / "mit_trip_data"``.

    Raises
    ------
    FileNotFoundError
        If a scenario's trip archive is missing from the data bundle.
    """
    trip_dir = DATA_BUNDLE_DIR / Path("mit_trip_data")
    trip_sources = DATASET_CFG["original_data"]["sources"]["trips"]

    for scenario_name in ["eGon2035", "eGon100RE"]:
        print(f"SCENARIO: {scenario_name}")
        trip_file = trip_dir / Path(trip_sources[scenario_name]["file"])

        # Check for the archive before opening it, so that a missing file
        # raises this descriptive error instead of tarfile's own one.
        if not os.path.isfile(trip_file):
            raise FileNotFoundError(
                f"Trip file {trip_file} not found in data bundle."
            )

        # The context manager guarantees the archive handle is closed.
        # NOTE(review): extractall() does not sanitize member paths; this is
        # only acceptable as long as the data bundle is a trusted source.
        with tarfile.open(trip_file) as tar:
            tar.extractall(trip_dir)
191
192
193
def write_evs_trips_to_db():
    """Write EVs and trips generated by simBEV from data bundle to database
    tables.

    For each scenario, the simBEV trip CSV files below the scenario's
    directory in ``DATA_BUNDLE_DIR / "mit_trip_data"`` are read. The
    RegioStaR7 id is taken from each file's parent directory name. The
    simBEV EV id is taken from the first three ``_``-separated parts of the
    file name. In test mode only the first 1000 files per region directory
    are used.

    The unique EVs are appended to the :class:`EgonEvPool` table via pandas.
    The trips are dumped to a temporary CSV file in ``WORKING_DIR``. That
    file is bulk-imported into the :class:`EgonEvTrip` table using psql's
    client-side copy command and is deleted afterwards.
    """

    def import_csv(f):
        # Read one trip file and tag it with its region and EV id
        df = pd.read_csv(f, usecols=TRIP_COLUMN_MAPPING.keys())
        df["rs7_id"] = int(f.parent.name)
        df["simbev_ev_id"] = "_".join(f.name.split("_")[0:3])
        return df

    for scenario_name in ["eGon2035", "eGon100RE"]:
        print(f"SCENARIO: {scenario_name}")
        trip_dir_name = Path(
            DATASET_CFG["original_data"]["sources"]["trips"][scenario_name][
                "file"
            ].split(".")[0]
        )

        trip_dir_root = DATA_BUNDLE_DIR / Path("mit_trip_data", trip_dir_name)

        if TESTMODE_OFF:
            trip_files = list(trip_dir_root.glob("*/*.csv"))
        else:
            # Load only 1000 EVs per region if in test mode
            trip_files = [
                trip_file
                for region_dir in trip_dir_root.iterdir()
                if region_dir.is_dir()
                for trip_file in list(region_dir.glob("*.csv"))[:1000]
            ]

        # Read, concat and reorder cols. The row index of each single file
        # becomes "simbev_event_id".
        print(f"Importing {len(trip_files)} EV trip CSV files...")
        trip_data = pd.concat(map(import_csv, trip_files))
        trip_data.rename(columns=TRIP_COLUMN_MAPPING, inplace=True)
        trip_data = trip_data.reset_index().rename(
            columns={"index": "simbev_event_id"}
        )
        cols = ["rs7_id", "simbev_ev_id", "simbev_event_id"] + list(
            TRIP_COLUMN_MAPPING.values()
        )
        trip_data.index.name = "event_id"
        trip_data = trip_data[cols]

        # Extract EVs from trips
        evs_unique = trip_data[["rs7_id", "simbev_ev_id"]].drop_duplicates()
        evs_unique = evs_unique.reset_index().drop(columns=["event_id"])
        evs_unique.index.name = "ev_id"

        # Add EV id to trip DF. The (rs7_id, simbev_ev_id) keys are unique in
        # evs_unique, so a left merge yields exactly one row per trip in the
        # order of trip_data. to_numpy() assigns positionally instead of
        # relying on the index alignment of the merge result.
        trip_data["egon_ev_pool_ev_id"] = trip_data.merge(
            evs_unique.reset_index(),
            how="left",
            on=["rs7_id", "simbev_ev_id"],
        )["ev_id"].to_numpy()

        # Split simBEV id into type and id. ``n`` must be passed by keyword:
        # pandas >= 2.0 rejects it positionally.
        evs_unique[["type", "simbev_ev_id"]] = evs_unique[
            "simbev_ev_id"
        ].str.rsplit("_", n=1, expand=True)
        evs_unique.simbev_ev_id = evs_unique.simbev_ev_id.astype(int)
        evs_unique["scenario"] = scenario_name

        trip_data.drop(columns=["rs7_id", "simbev_ev_id"], inplace=True)
        trip_data["scenario"] = scenario_name
        trip_data.sort_index(inplace=True)

        # Write EVs to DB
        print("Writing EVs to DB pool...")
        evs_unique.to_sql(
            name=EgonEvPool.__table__.name,
            schema=EgonEvPool.__table__.schema,
            con=db.engine(),
            if_exists="append",
            index=True,
        )

        # Write trips to CSV and import to DB
        print("Writing EV trips to CSV file...")
        trip_file = WORKING_DIR / f"trip_data_{scenario_name}.csv"
        trip_data.to_csv(trip_file)

        # Get DB config
        docker_db_config = db.credentials()
        host = ["-h", f"{docker_db_config['HOST']}"]
        port = ["-p", f"{docker_db_config['PORT']}"]
        pgdb = ["-d", f"{docker_db_config['POSTGRES_DB']}"]
        user = ["-U", f"{docker_db_config['POSTGRES_USER']}"]
        command = [
            "-c",
            rf"\copy {EgonEvTrip.__table__.schema}.{EgonEvTrip.__table__.name}"
            rf"({','.join(trip_data.reset_index().columns)})"
            rf" FROM '{str(trip_file)}' DELIMITER ',' CSV HEADER;",
        ]

        # Passing ``env`` replaces the child's whole environment. Keep the
        # current one (notably PATH, needed to locate psql) and only add the
        # password.
        psql_env = {
            **os.environ,
            "PGPASSWORD": docker_db_config["POSTGRES_PASSWORD"],
        }

        print("Importing EV trips from CSV file to DB...")
        subprocess.run(
            ["psql"] + host + port + pgdb + user + command,
            env=psql_env,
        )

        os.remove(trip_file)
294
295
296
def write_metadata_to_db():
    """
    Write used SimBEV metadata per scenario to database.

    For each scenario the ``basic`` section of the simBEV run config is read
    via :func:`read_simbev_metadata_file`. It is reduced to the columns listed
    in ``dtypes``, cast to those types and appended to the
    :class:`EgonEvMetadata` table.
    """
    # Dates use an explicit unit: pandas >= 2.0 rejects casting to the
    # unit-less ``np.datetime64``.
    # NOTE(review): casting to ``bool`` turns any non-empty string (including
    # "False") into True. Confirm the metadata file provides real booleans.
    dtypes = {
        "scenario": str,
        "eta_cp": float,
        "stepsize": int,
        "start_date": "datetime64[ns]",
        "end_date": "datetime64[ns]",
        "soc_min": float,
        "grid_timeseries": bool,
        "grid_timeseries_by_usecase": bool,
    }

    for scenario_name in ["eGon2035", "eGon100RE"]:
        meta_run_config = read_simbev_metadata_file(
            scenario_name, "config"
        ).loc["basic"]

        # Turn the "basic" series into a one-row frame with the scenario name
        meta_run_config = (
            meta_run_config.to_frame()
            .T.assign(scenario=scenario_name)[list(dtypes)]
            .astype(dtypes)
        )

        meta_run_config.to_sql(
            name=EgonEvMetadata.__table__.name,
            schema=EgonEvMetadata.__table__.schema,
            con=db.engine(),
            if_exists="append",
            index=False,
        )
329
330
331
class MotorizedIndividualTravel(Dataset):
    """
    Class for model data (static and timeseries) for
    motorized individual travel (MIT).

    **Contents of this module**
      * Creation of DB tables
      * Download and preprocessing of vehicle registration data from KBA and BMVI
      * Calculate number of electric vehicles and allocate on different spatial \
        levels. See :py:mod:`egon.data.metadata`
      * Extract and write pre-generated trips to DB

    **Configuration**

    The config of this dataset can be found in *datasets.yml* in section
    *emobility_mit*.

    **Scenarios and variations**

    * Scenario overview
    * Change scenario variation for 2050: adjust in
      emobility_mit->scenario->variation->eGon100RE

    **Trip data**

    The electric vehicles' trip data for each scenario have been generated using
    `simBEV <https://github.com/rl-institut/simbev/>`_. The methodical background
    is given in its `documentation <https://simbev.readthedocs.io>`_.

    Six different vehicle types are used:
      * Battery Electric Vehicle (BEV): mini, medium, luxury
      * Plug-in Hybrid Electric Vehicle (PHEV): mini, medium, luxury

    .. csv-table:: EV types
        :header: "Technology", "Size", "Max. charging capacity slow [kW]", \
                 "Max. charging capacity fast [kW]", "Battery capacity [kWh]", \
                 "Energy consumption [kWh/km]"
        :widths: 10, 10, 30, 30, 25, 30

        "BEV", "mini", 11, 120, 60, 0.1397
        "BEV", "medium", 22, 350, 90, 0.1746
        "BEV", "luxury", 50, 350, 110, 0.2096
        "PHEV", "mini", 3.7, 40, 14, 0.1425
        "PHEV", "medium", 11, 40, 20, 0.1782
        "PHEV", "luxury", 11, 120, 30, 0.2138

    The complete tech data and assumptions of the run can be found in the metadata:
    *<WORKING_DIRECTORY>/data_bundle_egon_data/emobility/mit_trip_data/<SCENARIO>/metadata_simbev_run.json*.

    * explain scenario parameters

    * run params (all in meta file?)

    **EV allocation**

    The EVs per registration district (Zulassungsbezirk) are taken from KBA's
    vehicle registration data. The numbers per EV type (BEV and PHEV) are
    based on the following inputs (TODO: complete this description):

    * RegioStaR7
    * scenario parameters: shares

    **Further notes**

    * Sanity checks


    *Dependencies*
      * :py:class:`DataBundle <egon.data.datasets.data_bundle.DataBundle>`
      * :py:class:`MvGridDistricts
        <egon.data.datasets.mv_grid_districts.mv_grid_districts_setup>`
      * :py:class:`ScenarioParameters
        <egon.data.datasets.scenario_parameters.ScenarioParameters>`
      * :py:class:`EtragoSetup <egon.data.datasets.etrago_setup.EtragoSetup>`
      * :py:class:`ZensusMvGridDistricts
        <egon.data.datasets.zensus_mv_grid_districts.ZensusMvGridDistricts>`
      * :py:class:`ZensusVg250 <egon.data.datasets.zensus_vg250.ZensusVg250>`
      * :py:class:`StorageEtrago <egon.data.datasets.storages_etrago.StorageEtrago>`
      * :py:class:`HtsEtragoTable
        <egon.data.datasets.heat_etrago.hts_etrago.HtsEtragoTable>`
      * :py:class:`ChpEtrago <egon.data.datasets.chp_etrago.ChpEtrago>`
      * :py:class:`DsmPotential <egon.data.datasets.DSM_cts_ind.DsmPotential>`
      * :py:class:`HeatEtrago <egon.data.datasets.heat_etrago.HeatEtrago>`
      * :py:class:`Egon_etrago_gen <egon.data.datasets.fill_etrago_gen.Egon_etrago_gen>`
      * :py:class:`OpenCycleGasTurbineEtrago
        <egon.data.datasets.power_etrago.OpenCycleGasTurbineEtrago>`
      * :py:class:`HydrogenStoreEtrago
        <egon.data.datasets.hydrogen_etrago.HydrogenStoreEtrago>`
      * :py:class:`HydrogenPowerLinkEtrago
        <egon.data.datasets.hydrogen_etrago.HydrogenPowerLinkEtrago>`
      * :py:class:`HydrogenMethaneLinkEtrago
        <egon.data.datasets.hydrogen_etrago.HydrogenMethaneLinkEtrago>`
      * :py:class:`GasAreaseGon100RE <egon.data.datasets.gas_areas.GasAreaseGon100RE>`
      * :py:class:`CH4Production <egon.data.datasets.ch4_prod.CH4Production>`
      * :py:class:`CH4Storages <egon.data.datasets.ch4_storages.CH4Storages>`

    *Resulting Tables*
      * :py:class:`EgonEvPool <egon.data.datasets.emobility.motorized_individual_travel.db_classes.EgonEvPool>`
        is created and filled
      * :py:class:`EgonEvTrip <egon.data.datasets.emobility.motorized_individual_travel.db_classes.EgonEvTrip>`
        is created and filled
      * :py:class:`EgonEvCountRegistrationDistrict <egon.data.datasets.emobility.motorized_individual_travel.db_classes.EgonEvCountRegistrationDistrict>`
        is created and filled
      * :py:class:`EgonEvCountMunicipality <egon.data.datasets.emobility.motorized_individual_travel.db_classes.EgonEvCountMunicipality>`
        is created and filled
      * :py:class:`EgonEvCountMvGridDistrict <egon.data.datasets.emobility.motorized_individual_travel.db_classes.EgonEvCountMvGridDistrict>`
        is created and filled
      * :py:class:`EgonEvMvGridDistrict <egon.data.datasets.emobility.motorized_individual_travel.db_classes.EgonEvMvGridDistrict>`
        is created and filled
      * :py:class:`EgonEvMetadata <egon.data.datasets.emobility.motorized_individual_travel.db_classes.EgonEvMetadata>`
        is created and filled

    """

    #:
    name: str = "MotorizedIndividualTravel"
    #:
    version: str = "0.0.7"

    def __init__(self, dependencies):
        def generate_model_data_tasks(scenario_name):
            """Dynamically generate tasks for model data creation.

            The goal is to speed up the creation of model timeseries. However,
            the exact number of parallel tasks cannot be determined during the
            DAG building, because the number of grid districts (MVGD) is
            calculated within another pipeline task.

            Approach: an approximate MVGD count of ``MVGD_MIN_COUNT`` is
            assumed. The index range ``0 .. MVGD_MIN_COUNT`` is split into
            ``parallel_tasks`` equally sized bunches, one task per bunch. If
            the division leaves a remainder, those trailing indices are not
            part of any bunch. The scenario's
            ``generate_model_data_<scenario>_remaining`` callable is added to
            the same task set. It presumably handles all MVGDs outside the
            bunches (TODO: confirm in ``model_timeseries``).
            The number of parallel tasks is defined via parameter
            ``parallel_tasks`` in the dataset config ``datasets.yml``
            (default: 1).

            Parameters
            ----------
            scenario_name : str
                Scenario name

            Returns
            -------
            set
                One :class:`PythonOperator` per bunch, each calling
                :func:`egon.data.datasets.emobility.motorized_individual_travel.model_timeseries.generate_model_data_bunch`
                with ``scenario_name`` and ``bunch``. For scenarios
                "eGon2035" and "eGon100RE" the set also contains the
                corresponding ``generate_model_data_<scenario>_remaining``
                callable.

            Raises
            ------
            ValueError
                If ``parallel_tasks`` is smaller than 1 or larger than
                ``MVGD_MIN_COUNT``, which would yield empty bunches.
            """
            parallel_tasks = DATASET_CFG["model_timeseries"].get(
                "parallel_tasks", 1
            )
            if not 1 <= parallel_tasks <= MVGD_MIN_COUNT:
                raise ValueError(
                    f"parallel_tasks must be between 1 and {MVGD_MIN_COUNT}, "
                    f"got {parallel_tasks}."
                )
            mvgd_bunch_size = MVGD_MIN_COUNT // parallel_tasks

            tasks = set()
            for task_no in range(parallel_tasks):
                bunch = range(
                    task_no * mvgd_bunch_size, (task_no + 1) * mvgd_bunch_size
                )
                tasks.add(
                    PythonOperator(
                        task_id=(
                            f"generate_model_data_"
                            f"{scenario_name}_"
                            f"bunch{bunch[0]}-{bunch[-1]}"
                        ),
                        python_callable=generate_model_data_bunch,
                        op_kwargs={
                            "scenario_name": scenario_name,
                            "bunch": bunch,
                        },
                    )
                )

            if scenario_name == "eGon2035":
                tasks.add(generate_model_data_eGon2035_remaining)
            elif scenario_name == "eGon100RE":
                tasks.add(generate_model_data_eGon100RE_remaining)
            return tasks

        super().__init__(
            name=self.name,
            version=self.version,
            dependencies=dependencies,
            tasks=(
                create_tables,
                {
                    (
                        download_and_preprocess,
                        allocate_evs_numbers,
                    ),
                    (
                        extract_trip_file,
                        write_metadata_to_db,
                        write_evs_trips_to_db,
                    ),
                },
                allocate_evs_to_grid_districts,
                delete_model_data_from_db,
                {
                    *generate_model_data_tasks(scenario_name="eGon2035"),
                    *generate_model_data_tasks(scenario_name="eGon100RE"),
                },
            ),
        )
529