Passed
Pull Request — dev (#978)
created 01:53 by unknown

download_and_preprocess()   Grade: B

Complexity
  Conditions: 4

Size
  Total lines: 72
  Code lines:  40

Duplication
  Lines: 0
  Ratio: 0 %

Importance
  Changes: 0

Metric  Value
------  -----
eloc       40
dl          0
loc        72
rs       8.92
c           0
b           0
f           0
cc          4
nop         0
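The `Conditions`/`cc` value of 4 is the function's cyclomatic complexity. As a rough illustration only (not the exact formula this report's tool uses), such a count can be approximated by walking the AST and counting branch points:

```python
import ast

# Rough cyclomatic-complexity estimate: 1 + number of branching nodes.
# Illustrative approximation, not the report tool's exact algorithm.
BRANCH_NODES = (ast.If, ast.For, ast.While, ast.ExceptHandler,
                ast.BoolOp, ast.IfExp)


def approx_cc(source: str) -> int:
    """Parse `source` and count decision points plus one."""
    tree = ast.parse(source)
    return 1 + sum(isinstance(node, BRANCH_NODES) for node in ast.walk(tree))


# A hypothetical function with three `if` branches -> complexity 4,
# matching the "Conditions: 4" reported above.
code = """
def download(url, file):
    if not file.exists():
        fetch(url, file)
    if file.suffix == ".xlsx":
        data = read_excel(file)
    else:
        data = read_csv(file)
    if data.empty:
        raise ValueError("no data")
    return data
"""
print(approx_cc(code))  # -> 4
```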

How to fix: Long Method

Small methods make your code easier to understand, in particular when combined with a good name. Moreover, when a method is small, finding a good name for it is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a good sign to extract the commented part into a new method and to use the comment as a starting point for naming it.

Commonly applied refactorings include:

* Extract Method

"""
Motorized Individual Travel (MIT)

Main module for preparation of model data (static and timeseries) for
motorized individual travel.

**Contents of this module**

* Creation of DB tables
* Download and preprocessing of vehicle registration data from KBA and BMVI
* Calculate number of electric vehicles and allocate on different spatial
  levels. See :py:mod:`egon.data.metadata`
* Extract and write pre-generated trips to DB

**Configuration**

The config of this dataset can be found in *datasets.yml* in section
*emobility_mit*.

**Scenarios and variations**

* Scenario overview
* Change scenario variation for 2050: adjust in
  emobility_mit->scenario->variation->eGon100RE

**Trip data**

The electric vehicles' trip data for each scenario have been generated using
`simBEV <https://github.com/rl-institut/simbev/>`_. The methodical background
is given in its `documentation <https://simbev.readthedocs.io>`_.

Six different vehicle types are used:

* Battery Electric Vehicle (BEV): mini, medium, luxury
* Plug-in Hybrid Electric Vehicle (PHEV): mini, medium, luxury

.. csv-table:: EV types
    :header: "Technology", "Size", "Max. charging capacity slow [kW]",
             "Max. charging capacity fast [kW]", "Battery capacity [kWh]",
             "Energy consumption [kWh/km]"
    :widths: 10, 10, 30, 30, 25, 30

    "BEV", "mini", 11, 120, 60, 0.1397
    "BEV", "medium", 22, 350, 90, 0.1746
    "BEV", "luxury", 50, 350, 110, 0.2096
    "PHEV", "mini", 3.7, 40, 14, 0.1425
    "PHEV", "medium", 11, 40, 20, 0.1782
    "PHEV", "luxury", 11, 120, 30, 0.2138

The complete tech data and assumptions of the run can be found in the
metadata:
*<WORKING_DIRECTORY>/data_bundle_egon_data/emobility/mit_trip_data/<SCENARIO>/
metadata_simbev_run.json*

* explain scenario parameters
* run params (all in meta file?)

**EV allocation**

The number of EVs per registration district (Zulassungsbezirk) is taken from
KBA's vehicle registration data. The numbers per EV type (BEV and PHEV) are
disaggregated using:

* RegioStaR7
* scenario parameters: shares

**Further notes**

* Sanity checks

**Model parametrization**

**Example queries**

"""

from pathlib import Path
from urllib.request import urlretrieve
import os
import tarfile

from airflow.operators.python_operator import PythonOperator
from psycopg2.extensions import AsIs, register_adapter
import numpy as np
import pandas as pd

from egon.data import db, subprocess
from egon.data.datasets import Dataset
from egon.data.datasets.emobility.motorized_individual_travel.db_classes import (  # noqa: E501
    EgonEvCountMunicipality,
    EgonEvCountMvGridDistrict,
    EgonEvCountRegistrationDistrict,
    EgonEvMetadata,
    EgonEvMvGridDistrict,
    EgonEvPool,
    EgonEvTrip,
)
from egon.data.datasets.emobility.motorized_individual_travel.ev_allocation import (  # noqa: E501
    allocate_evs_numbers,
    allocate_evs_to_grid_districts,
)
from egon.data.datasets.emobility.motorized_individual_travel.helpers import (
    COLUMNS_KBA,
    DATA_BUNDLE_DIR,
    DATASET_CFG,
    MVGD_MIN_COUNT,
    TESTMODE_OFF,
    TRIP_COLUMN_MAPPING,
    WORKING_DIR,
)
from egon.data.datasets.emobility.motorized_individual_travel.model_timeseries import (  # noqa: E501
    delete_model_data_from_db,
    generate_model_data_bunch,
    generate_model_data_eGon100RE_remaining,
    generate_model_data_eGon2035_remaining,
    read_simbev_metadata_file,
)


118
# ========== Register np datatypes with SQLA ==========
119
def adapt_numpy_float64(numpy_float64):
120
    return AsIs(numpy_float64)
121
122
123
def adapt_numpy_int64(numpy_int64):
124
    return AsIs(numpy_int64)
125
126
127
register_adapter(np.float64, adapt_numpy_float64)
128
register_adapter(np.int64, adapt_numpy_int64)
129
# =====================================================
130
131
132
def create_tables():
133
    """Create tables for electric vehicles
134
135
    Returns
136
    -------
137
    None
138
    """
139
140
    engine = db.engine()
141
    EgonEvCountRegistrationDistrict.__table__.drop(
142
        bind=engine, checkfirst=True
143
    )
144
    EgonEvCountRegistrationDistrict.__table__.create(
145
        bind=engine, checkfirst=True
146
    )
147
    EgonEvCountMunicipality.__table__.drop(bind=engine, checkfirst=True)
148
    EgonEvCountMunicipality.__table__.create(bind=engine, checkfirst=True)
149
    EgonEvCountMvGridDistrict.__table__.drop(bind=engine, checkfirst=True)
150
    EgonEvCountMvGridDistrict.__table__.create(bind=engine, checkfirst=True)
151
    EgonEvPool.__table__.drop(bind=engine, checkfirst=True)
152
    EgonEvPool.__table__.create(bind=engine, checkfirst=True)
153
    EgonEvTrip.__table__.drop(bind=engine, checkfirst=True)
154
    EgonEvTrip.__table__.create(bind=engine, checkfirst=True)
155
    EgonEvMvGridDistrict.__table__.drop(bind=engine, checkfirst=True)
156
    EgonEvMvGridDistrict.__table__.create(bind=engine, checkfirst=True)
157
    EgonEvMetadata.__table__.drop(bind=engine, checkfirst=True)
158
    EgonEvMetadata.__table__.create(bind=engine, checkfirst=True)
159
160
    # Create dir for results, if it does not exist
161
    result_dir = WORKING_DIR / Path("results")
162
    result_dir.mkdir(exist_ok=True, parents=True)
163
164
165
def download_and_preprocess():
    """Download and preprocess vehicle registration data from KBA and BMVI

    The processed vehicle registration data (per registration district) and
    RegioStaR7 data are written to CSV files in the working directory.

    Returns
    -------
    None
    """

    mit_sources = DATASET_CFG["original_data"]["sources"]

    # Create the folder, if it does not exist
    if not os.path.exists(WORKING_DIR):
        os.mkdir(WORKING_DIR)

    ################################
    # Download and import KBA data #
    ################################
    url = mit_sources["KBA"]["url"]
    file = WORKING_DIR / mit_sources["KBA"]["file"]
    if not os.path.isfile(file):
        urlretrieve(url, file)

    kba_data = pd.read_excel(
        file,
        sheet_name=mit_sources["KBA"]["sheet"],
        usecols=mit_sources["KBA"]["columns"],
        skiprows=mit_sources["KBA"]["skiprows"],
    )
    kba_data.columns = COLUMNS_KBA
    kba_data.replace(" ", np.nan, inplace=True)
    kba_data = kba_data.dropna()
    # Split "<AGS> <name>" into separate columns
    kba_data[
        ["ags_reg_district", "reg_district"]
    ] = kba_data.reg_district.str.split(" ", n=1, expand=True)
    kba_data.ags_reg_district = kba_data.ags_reg_district.astype("int")

    kba_data.to_csv(
        WORKING_DIR / mit_sources["KBA"]["file_processed"], index=False
    )

    #######################################
    # Download and import RegioStaR7 data #
    #######################################
    url = mit_sources["RS7"]["url"]
    file = WORKING_DIR / mit_sources["RS7"]["file"]
    if not os.path.isfile(file):
        urlretrieve(url, file)

    rs7_data = pd.read_excel(file, sheet_name=mit_sources["RS7"]["sheet"])

    # District key: municipality key (gem_20) without its last three digits
    rs7_data["ags_district"] = (
        rs7_data.gem_20.multiply(1 / 1000).apply(np.floor).astype("int")
    )
    rs7_data = rs7_data.rename(
        columns={"gem_20": "ags", "RegioStaR7": "rs7_id"}
    )
    rs7_data.rs7_id = rs7_data.rs7_id.astype("int")

    rs7_data.to_csv(
        WORKING_DIR / mit_sources["RS7"]["file_processed"], index=False
    )


def extract_trip_file():
    """Extract trip files from data bundle"""
    trip_dir = DATA_BUNDLE_DIR / Path("mit_trip_data")

    for scenario_name in ["eGon2035", "eGon100RE"]:
        print(f"SCENARIO: {scenario_name}")
        trip_file = trip_dir / Path(
            DATASET_CFG["original_data"]["sources"]["trips"][scenario_name][
                "file"
            ]
        )

        # Check for the archive before opening it
        if not os.path.isfile(trip_file):
            raise FileNotFoundError(
                f"Trip file {trip_file} not found in data bundle."
            )
        with tarfile.open(trip_file) as tar:
            tar.extractall(trip_dir)


def write_evs_trips_to_db():
    """Write EVs and trips generated by simBEV from data bundle to database
    table
    """

    def import_csv(f):
        df = pd.read_csv(f, usecols=TRIP_COLUMN_MAPPING.keys())
        df["rs7_id"] = int(f.parent.name)
        df["simbev_ev_id"] = "_".join(f.name.split("_")[0:3])
        return df

    for scenario_name in ["eGon2035", "eGon100RE"]:
        print(f"SCENARIO: {scenario_name}")
        trip_dir_name = Path(
            DATASET_CFG["original_data"]["sources"]["trips"][scenario_name][
                "file"
            ].split(".")[0]
        )

        trip_dir_root = DATA_BUNDLE_DIR / Path("mit_trip_data", trip_dir_name)

        if TESTMODE_OFF:
            trip_files = list(trip_dir_root.glob("*/*.csv"))
        else:
            # Load only 1000 EVs per region if in test mode
            trip_files = [
                list(rdir.glob("*.csv"))[:1000]
                for rdir in [_ for _ in trip_dir_root.iterdir() if _.is_dir()]
            ]
            # Flatten
            trip_files = [i for sub in trip_files for i in sub]

        # Read, concat and reorder cols
        print(f"Importing {len(trip_files)} EV trip CSV files...")
        trip_data = pd.concat(map(import_csv, trip_files))
        trip_data.rename(columns=TRIP_COLUMN_MAPPING, inplace=True)
        trip_data = trip_data.reset_index().rename(
            columns={"index": "simbev_event_id"}
        )
        cols = ["rs7_id", "simbev_ev_id", "simbev_event_id"] + list(
            TRIP_COLUMN_MAPPING.values()
        )
        trip_data.index.name = "event_id"
        trip_data = trip_data[cols]

        # Extract EVs from trips
        evs_unique = trip_data[["rs7_id", "simbev_ev_id"]].drop_duplicates()
        evs_unique = evs_unique.reset_index().drop(columns=["event_id"])
        evs_unique.index.name = "ev_id"

        # Add EV id to trip DF
        trip_data["egon_ev_pool_ev_id"] = pd.merge(
            trip_data, evs_unique.reset_index(), on=["rs7_id", "simbev_ev_id"]
        )["ev_id"]

        # Split simBEV id into type and id
        evs_unique[["type", "simbev_ev_id"]] = evs_unique[
            "simbev_ev_id"
        ].str.rsplit("_", n=1, expand=True)
        evs_unique.simbev_ev_id = evs_unique.simbev_ev_id.astype(int)
        evs_unique["scenario"] = scenario_name

        trip_data.drop(columns=["rs7_id", "simbev_ev_id"], inplace=True)
        trip_data["scenario"] = scenario_name
        trip_data.sort_index(inplace=True)

        # Write EVs to DB
        print("Writing EVs to DB pool...")
        evs_unique.to_sql(
            name=EgonEvPool.__table__.name,
            schema=EgonEvPool.__table__.schema,
            con=db.engine(),
            if_exists="append",
            index=True,
        )

        # Write trips to CSV and import to DB
        print("Writing EV trips to CSV file...")
        trip_file = WORKING_DIR / f"trip_data_{scenario_name}.csv"
        trip_data.to_csv(trip_file)

        # Get DB config
        docker_db_config = db.credentials()
        host = ["-h", f"{docker_db_config['HOST']}"]
        port = ["-p", f"{docker_db_config['PORT']}"]
        pgdb = ["-d", f"{docker_db_config['POSTGRES_DB']}"]
        user = ["-U", f"{docker_db_config['POSTGRES_USER']}"]
        command = [
            "-c",
            rf"\copy {EgonEvTrip.__table__.schema}.{EgonEvTrip.__table__.name}"
            rf"({','.join(trip_data.reset_index().columns)})"
            rf" FROM '{str(trip_file)}' DELIMITER ',' CSV HEADER;",
        ]

        print("Importing EV trips from CSV file to DB...")
        subprocess.run(
            ["psql"] + host + port + pgdb + user + command,
            env={"PGPASSWORD": docker_db_config["POSTGRES_PASSWORD"]},
        )

        os.remove(trip_file)


def write_metadata_to_db():
    """
    Write used SimBEV metadata per scenario to database.
    """
    dtypes = {
        "scenario": str,
        "eta_cp": float,
        "stepsize": int,
        "start_date": np.datetime64,
        "end_date": np.datetime64,
        "soc_min": float,
        "grid_timeseries": bool,
        "grid_timeseries_by_usecase": bool,
    }

    for scenario_name in ["eGon2035", "eGon100RE"]:
        meta_run_config = read_simbev_metadata_file(
            scenario_name, "config"
        ).loc["basic"]

        meta_run_config = (
            meta_run_config.to_frame()
            .T.assign(scenario=scenario_name)[dtypes.keys()]
            .astype(dtypes)
        )

        meta_run_config.to_sql(
            name=EgonEvMetadata.__table__.name,
            schema=EgonEvMetadata.__table__.schema,
            con=db.engine(),
            if_exists="append",
            index=False,
        )


class MotorizedIndividualTravel(Dataset):
    def __init__(self, dependencies):
        def generate_model_data_tasks(scenario_name):
            """Dynamically generate tasks for model data creation.

            The goal is to speed up the creation of model timeseries.
            However, the exact number of parallel tasks cannot be determined
            during DAG building, as the number of grid districts (MVGD) is
            calculated within another pipeline task.
            Approach: assuming an approximate MVGD count (`mvgd_min_count`)
            of 3700, the majority of the MVGDs can be processed in parallel;
            the remainder is handled subsequently in XXX.
            The number of parallel tasks is defined via parameter
            `parallel_tasks` in the dataset config `datasets.yml`.

            Parameters
            ----------
            scenario_name : str
                Scenario name

            Returns
            -------
            set
                The tasks. Each element wraps
                :func:`egon.data.datasets.emobility.motorized_individual_travel.model_timeseries.generate_model_data`
            """
            parallel_tasks = DATASET_CFG["model_timeseries"].get(
                "parallel_tasks", 1
            )
            mvgd_bunch_size = divmod(MVGD_MIN_COUNT, parallel_tasks)[0]

            tasks = set()
            for i in range(parallel_tasks):
                bunch = range(i * mvgd_bunch_size, (i + 1) * mvgd_bunch_size)
                tasks.add(
                    PythonOperator(
                        task_id=(
                            f"generate_model_data_"
                            f"{scenario_name}_"
                            f"bunch{bunch[0]}-{bunch[-1]}"
                        ),
                        python_callable=generate_model_data_bunch,
                        op_kwargs={
                            "scenario_name": scenario_name,
                            "bunch": bunch,
                        },
                    )
                )

            if scenario_name == "eGon2035":
                tasks.add(generate_model_data_eGon2035_remaining)
            elif scenario_name == "eGon100RE":
                tasks.add(generate_model_data_eGon100RE_remaining)
            return tasks

        super().__init__(
            name="MotorizedIndividualTravel",
            version="0.0.6",
            dependencies=dependencies,
            tasks=(
                create_tables,
                {
                    (
                        download_and_preprocess,
                        allocate_evs_numbers,
                    ),
                    (
                        extract_trip_file,
                        write_metadata_to_db,
                        write_evs_trips_to_db,
                    ),
                },
                allocate_evs_to_grid_districts,
                delete_model_data_from_db,
                {
                    *generate_model_data_tasks(scenario_name="eGon2035"),
                    *generate_model_data_tasks(scenario_name="eGon100RE"),
                },
            ),
        )