Completed
Push — dev ( 85d654...312d71 )
by
unknown
21s queued 16s
created

adapt_numpy_float64()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
"""
2
Main module for preparation of model data (static and timeseries) for
3
motorized individual travel (MIT).
4
5
**Contents of this module**
6
  * Creation of DB tables
7
  * Download and preprocessing of vehicle registration data from KBA and BMVI
8
  * Calculate number of electric vehicles and allocate on different spatial
9
    levels.
10
  * Extract and write pre-generated trips to DB
11
12
"""
13
14
from pathlib import Path
15
from urllib.request import urlretrieve
16
import os
17
import tarfile
18
19
from airflow.operators.python_operator import PythonOperator
20
from psycopg2.extensions import AsIs, register_adapter
21
import numpy as np
22
import pandas as pd
23
24
from egon.data import db, subprocess
25
from egon.data.datasets import Dataset
26
from egon.data.datasets.emobility.motorized_individual_travel.db_classes import (  # noqa: E501
27
    EgonEvCountMunicipality,
28
    EgonEvCountMvGridDistrict,
29
    EgonEvCountRegistrationDistrict,
30
    EgonEvMetadata,
31
    EgonEvMvGridDistrict,
32
    EgonEvPool,
33
    EgonEvTrip,
34
)
35
from egon.data.datasets.emobility.motorized_individual_travel.ev_allocation import (  # noqa: E501
36
    allocate_evs_numbers,
37
    allocate_evs_to_grid_districts,
38
)
39
from egon.data.datasets.emobility.motorized_individual_travel.helpers import (
40
    COLUMNS_KBA,
41
    DATA_BUNDLE_DIR,
42
    DATASET_CFG,
43
    MVGD_MIN_COUNT,
44
    TESTMODE_OFF,
45
    TRIP_COLUMN_MAPPING,
46
    WORKING_DIR,
47
)
48
from egon.data.datasets.emobility.motorized_individual_travel.model_timeseries import (  # noqa: E501
49
    delete_model_data_from_db,
50
    generate_model_data_bunch,
51
    generate_model_data_eGon100RE_remaining,
52
    generate_model_data_eGon2035_remaining,
53
    read_simbev_metadata_file,
54
)
55
56
57
# ========== Register np datatypes with SQLA ==========
58
def adapt_numpy_float64(numpy_float64):
    """Adapt a ``numpy.float64`` value for use in psycopg2 queries.

    Parameters
    ----------
    numpy_float64 : numpy.float64
        Value to be adapted.

    Returns
    -------
    psycopg2.extensions.AsIs
        The value wrapped in psycopg2's ``AsIs`` adapter.
    """
    return AsIs(numpy_float64)
60
61
62
def adapt_numpy_int64(numpy_int64):
    """Adapt a ``numpy.int64`` value for use in psycopg2 queries.

    Parameters
    ----------
    numpy_int64 : numpy.int64
        Value to be adapted.

    Returns
    -------
    psycopg2.extensions.AsIs
        The value wrapped in psycopg2's ``AsIs`` adapter.
    """
    return AsIs(numpy_int64)
64
65
66
# Registered at import time so that numpy scalars can be passed to psycopg2
# as query parameters.
register_adapter(np.float64, adapt_numpy_float64)
register_adapter(np.int64, adapt_numpy_int64)
68
# =====================================================
69
70
71
def create_tables():
    """(Re-)create the database tables for electric vehicles.

    Each table is dropped if it exists and then created again, so data
    from previous runs is discarded. Additionally, the directory
    ``results`` is created inside ``WORKING_DIR`` if it does not exist.

    Returns
    -------
    None
    """
    engine = db.engine()

    # Tables are processed one after another (drop, then create) in this
    # order.
    table_classes = (
        EgonEvCountRegistrationDistrict,
        EgonEvCountMunicipality,
        EgonEvCountMvGridDistrict,
        EgonEvPool,
        EgonEvTrip,
        EgonEvMvGridDistrict,
        EgonEvMetadata,
    )
    for table_class in table_classes:
        table_class.__table__.drop(bind=engine, checkfirst=True)
        table_class.__table__.create(bind=engine, checkfirst=True)

    # Create dir for results, if it does not exist
    result_dir = WORKING_DIR / Path("results")
    result_dir.mkdir(exist_ok=True, parents=True)
102
103
104
def download_and_preprocess():
    """Download and preprocess data from KBA and BMVI.

    The raw files (KBA vehicle registration data and RegioStaR7 data) are
    downloaded to ``WORKING_DIR`` unless a file of the configured name
    already exists there. Both tables are preprocessed and written as CSV
    files to ``WORKING_DIR`` using the file names configured as
    ``file_processed``.

    Returns
    -------
    None
        The results are the two CSV files written to ``WORKING_DIR``.
    """
    mit_sources = DATASET_CFG["original_data"]["sources"]

    # Create the folder (including missing parents), if it does not exist
    os.makedirs(WORKING_DIR, exist_ok=True)

    ################################
    # Download and import KBA data #
    ################################
    kba_cfg = mit_sources["KBA"]
    kba_file = WORKING_DIR / kba_cfg["file"]
    if not os.path.isfile(kba_file):
        urlretrieve(kba_cfg["url"], kba_file)

    kba_data = pd.read_excel(
        kba_file,
        sheet_name=kba_cfg["sheet"],
        usecols=kba_cfg["columns"],
        skiprows=kba_cfg["skiprows"],
    )
    kba_data.columns = COLUMNS_KBA

    # Cells holding a single blank are treated as missing; rows with any
    # missing value are dropped
    kba_data = kba_data.replace(" ", np.nan).dropna()

    # Split at the first blank only: the leading token becomes the
    # (integer) `ags_reg_district`, the remainder stays in `reg_district`
    kba_data[
        ["ags_reg_district", "reg_district"]
    ] = kba_data.reg_district.str.split(" ", n=1, expand=True)
    kba_data.ags_reg_district = kba_data.ags_reg_district.astype("int")

    kba_data.to_csv(WORKING_DIR / kba_cfg["file_processed"], index=False)

    #######################################
    # Download and import RegioStaR7 data #
    #######################################
    rs7_cfg = mit_sources["RS7"]
    rs7_file = WORKING_DIR / rs7_cfg["file"]
    if not os.path.isfile(rs7_file):
        urlretrieve(rs7_cfg["url"], rs7_file)

    rs7_data = pd.read_excel(rs7_file, sheet_name=rs7_cfg["sheet"])

    # `ags_district` is `gem_20` without its last three digits
    rs7_data["ags_district"] = (
        rs7_data.gem_20.multiply(1 / 1000).apply(np.floor).astype("int")
    )
    rs7_data = rs7_data.rename(
        columns={"gem_20": "ags", "RegioStaR7": "rs7_id"}
    )
    rs7_data.rs7_id = rs7_data.rs7_id.astype("int")

    rs7_data.to_csv(WORKING_DIR / rs7_cfg["file_processed"], index=False)
177
178
179
def extract_trip_file():
    """Extract trip files from data bundle.

    For each scenario, the configured trip archive is looked up in the
    directory ``mit_trip_data`` of the data bundle and extracted into that
    same directory.

    Raises
    ------
    FileNotFoundError
        If a scenario's trip archive does not exist in the data bundle.
    """
    trip_dir = DATA_BUNDLE_DIR / Path("mit_trip_data")

    for scenario_name in ["eGon2035", "eGon100RE"]:
        print(f"SCENARIO: {scenario_name}")
        trip_file = trip_dir / Path(
            DATASET_CFG["original_data"]["sources"]["trips"][scenario_name][
                "file"
            ]
        )

        # Check before opening: tarfile.open() raises on a missing file
        # itself, which would make the descriptive error below unreachable.
        if not os.path.isfile(trip_file):
            raise FileNotFoundError(
                f"Trip file {trip_file} not found in data bundle."
            )

        # NOTE(review): extractall() trusts the member paths stored in the
        # archive; presumably the data bundle is a trusted source - confirm.
        with tarfile.open(trip_file) as tar:
            tar.extractall(trip_dir)
198
199
200
def write_evs_trips_to_db():
    """Write EVs and trips generated by simBEV from data bundle to database
    table

    For each scenario, the trip CSV files are read from the extracted trip
    directory of the data bundle. The unique EVs are written to the table
    of :class:`EgonEvPool`; the trips are dumped to a temporary CSV file in
    ``WORKING_DIR`` and imported to the table of :class:`EgonEvTrip` via
    ``psql``'s ``\\copy``. The temporary file is removed afterwards.
    """

    def import_csv(f):
        """Read one trip CSV file and tag its rows with region and EV id."""
        df = pd.read_csv(f, usecols=list(TRIP_COLUMN_MAPPING))
        # The name of the parent directory is used as RegioStaR7 id
        df["rs7_id"] = int(f.parent.name)
        # The first three "_"-separated tokens of the file name form the
        # simBEV EV id
        df["simbev_ev_id"] = "_".join(f.name.split("_")[0:3])
        return df

    for scenario_name in ["eGon2035", "eGon100RE"]:
        print(f"SCENARIO: {scenario_name}")
        trip_dir_name = Path(
            DATASET_CFG["original_data"]["sources"]["trips"][scenario_name][
                "file"
            ].split(".")[0]
        )

        trip_dir_root = DATA_BUNDLE_DIR / Path("mit_trip_data", trip_dir_name)

        if TESTMODE_OFF:
            trip_files = list(trip_dir_root.glob("*/*.csv"))
        else:
            # Load only 1000 EVs per region if in test mode
            trip_files = [
                csv_file
                for region_dir in trip_dir_root.iterdir()
                if region_dir.is_dir()
                for csv_file in list(region_dir.glob("*.csv"))[:1000]
            ]

        # Read, concat and reorder cols
        print(f"Importing {len(trip_files)} EV trip CSV files...")
        trip_data = pd.concat(map(import_csv, trip_files))
        trip_data.rename(columns=TRIP_COLUMN_MAPPING, inplace=True)
        # The per-file row index becomes the simBEV event id; the new
        # consecutive index serves as event id
        trip_data = trip_data.reset_index().rename(
            columns={"index": "simbev_event_id"}
        )
        cols = ["rs7_id", "simbev_ev_id", "simbev_event_id"] + list(
            TRIP_COLUMN_MAPPING.values()
        )
        trip_data.index.name = "event_id"
        trip_data = trip_data[cols]

        # Extract EVs from trips
        evs_unique = (
            trip_data[["rs7_id", "simbev_ev_id"]]
            .drop_duplicates()
            .reset_index(drop=True)
        )
        evs_unique.index.name = "ev_id"

        # Add EV id to trip DF
        trip_data["egon_ev_pool_ev_id"] = pd.merge(
            trip_data, evs_unique.reset_index(), on=["rs7_id", "simbev_ev_id"]
        )["ev_id"]

        # Split simBEV id into type and id (at the last underscore)
        evs_unique[["type", "simbev_ev_id"]] = evs_unique[
            "simbev_ev_id"
        ].str.rsplit("_", n=1, expand=True)
        evs_unique.simbev_ev_id = evs_unique.simbev_ev_id.astype(int)
        evs_unique["scenario"] = scenario_name

        trip_data.drop(columns=["rs7_id", "simbev_ev_id"], inplace=True)
        trip_data["scenario"] = scenario_name
        trip_data.sort_index(inplace=True)

        # Write EVs to DB
        print("Writing EVs to DB pool...")
        evs_unique.to_sql(
            name=EgonEvPool.__table__.name,
            schema=EgonEvPool.__table__.schema,
            con=db.engine(),
            if_exists="append",
            index=True,
        )

        # Write trips to CSV and import to DB
        print("Writing EV trips to CSV file...")
        trip_file = WORKING_DIR / f"trip_data_{scenario_name}.csv"
        try:
            trip_data.to_csv(trip_file)

            # Get DB config
            docker_db_config = db.credentials()
            host = ["-h", f"{docker_db_config['HOST']}"]
            port = ["-p", f"{docker_db_config['PORT']}"]
            pgdb = ["-d", f"{docker_db_config['POSTGRES_DB']}"]
            user = ["-U", f"{docker_db_config['POSTGRES_USER']}"]
            command = [
                "-c",
                rf"\copy {EgonEvTrip.__table__.schema}.{EgonEvTrip.__table__.name}"
                rf"({','.join(trip_data.reset_index().columns)})"
                rf" FROM '{str(trip_file)}' DELIMITER ',' CSV HEADER;",
            ]

            print("Importing EV trips from CSV file to DB...")
            subprocess.run(
                ["psql"] + host + port + pgdb + user + command,
                env={"PGPASSWORD": docker_db_config["POSTGRES_PASSWORD"]},
            )
        finally:
            # Do not leave the temporary CSV file behind if writing or
            # importing it fails
            if os.path.isfile(trip_file):
                os.remove(trip_file)
301
302
303
def write_metadata_to_db():
    """
    Write used SimBEV metadata per scenario to database.

    For each scenario, the row ``basic`` of the SimBEV ``config`` metadata
    is read, reduced to the columns listed in `dtypes` (plus the scenario
    name), cast to these dtypes and appended to the table of
    :class:`EgonEvMetadata`.
    """
    # An explicit unit is given for the datetime columns as newer pandas
    # versions refuse to cast to a unit-less datetime64 dtype.
    # NOTE(review): casting to bool turns every non-empty string into True;
    # presumably the metadata reader already yields real booleans - confirm.
    dtypes = {
        "scenario": str,
        "eta_cp": float,
        "stepsize": int,
        "start_date": "datetime64[ns]",
        "end_date": "datetime64[ns]",
        "soc_min": float,
        "grid_timeseries": bool,
        "grid_timeseries_by_usecase": bool,
    }
    columns = list(dtypes)

    for scenario_name in ["eGon2035", "eGon100RE"]:
        basic_config = read_simbev_metadata_file(
            scenario_name, "config"
        ).loc["basic"]

        # Turn the row into a one-row frame, add the scenario name and
        # keep only the columns to be written
        meta_run_config = (
            basic_config.to_frame()
            .T.assign(scenario=scenario_name)[columns]
            .astype(dtypes)
        )

        meta_run_config.to_sql(
            name=EgonEvMetadata.__table__.name,
            schema=EgonEvMetadata.__table__.schema,
            con=db.engine(),
            if_exists="append",
            index=False,
        )
336
337
338
class MotorizedIndividualTravel(Dataset):
    """
    Class to set up static and timeseries data for motorized individual travel (MIT).

    For more information see data documentation on :ref:`mobility-demand-mit-ref`.

    *Dependencies*
      * :py:class:`DataBundle <egon.data.datasets.data_bundle.DataBundle>`
      * :py:class:`MvGridDistricts
        <egon.data.datasets.mv_grid_districts.mv_grid_districts_setup>`
      * :py:class:`ScenarioParameters
        <egon.data.datasets.scenario_parameters.ScenarioParameters>`
      * :py:class:`EtragoSetup <egon.data.datasets.etrago_setup.EtragoSetup>`
      * :py:class:`ZensusMvGridDistricts
        <egon.data.datasets.zensus_mv_grid_districts.ZensusMvGridDistricts>`
      * :py:class:`ZensusVg250 <egon.data.datasets.zensus_vg250.ZensusVg250>`
      * :py:class:`StorageEtrago <egon.data.datasets.storages_etrago.StorageEtrago>`
      * :py:class:`HtsEtragoTable
        <egon.data.datasets.heat_etrago.hts_etrago.HtsEtragoTable>`
      * :py:class:`ChpEtrago <egon.data.datasets.chp_etrago.ChpEtrago>`
      * :py:class:`DsmPotential <egon.data.datasets.DSM_cts_ind.DsmPotential>`
      * :py:class:`HeatEtrago <egon.data.datasets.heat_etrago.HeatEtrago>`
      * :py:class:`Egon_etrago_gen <egon.data.datasets.fill_etrago_gen.Egon_etrago_gen>`
      * :py:class:`OpenCycleGasTurbineEtrago
        <egon.data.datasets.power_etrago.OpenCycleGasTurbineEtrago>`
      * :py:class:`HydrogenStoreEtrago
        <egon.data.datasets.hydrogen_etrago.HydrogenStoreEtrago>`
      * :py:class:`HydrogenPowerLinkEtrago
        <egon.data.datasets.hydrogen_etrago.HydrogenPowerLinkEtrago>`
      * :py:class:`HydrogenMethaneLinkEtrago
        <egon.data.datasets.hydrogen_etrago.HydrogenMethaneLinkEtrago>`
      * :py:class:`GasAreaseGon100RE <egon.data.datasets.gas_areas.GasAreaseGon100RE>`
      * :py:class:`CH4Production <egon.data.datasets.ch4_prod.CH4Production>`
      * :py:class:`CH4Storages <egon.data.datasets.ch4_storages.CH4Storages>`

    *Resulting Tables*
      * :py:class:`EgonEvPool <egon.data.datasets.emobility.motorized_individual_travel.db_classes.EgonEvPool>`
        is created and filled
      * :py:class:`EgonEvTrip <egon.data.datasets.emobility.motorized_individual_travel.db_classes.EgonEvTrip>`
        is created and filled
      * :py:class:`EgonEvCountRegistrationDistrict <egon.data.datasets.emobility.motorized_individual_travel.db_classes.EgonEvCountRegistrationDistrict>`
        is created and filled
      * :py:class:`EgonEvCountMunicipality <egon.data.datasets.emobility.motorized_individual_travel.db_classes.EgonEvCountMunicipality>`
        is created and filled
      * :py:class:`EgonEvCountMvGridDistrict <egon.data.datasets.emobility.motorized_individual_travel.db_classes.EgonEvCountMvGridDistrict>`
        is created and filled
      * :py:class:`EgonEvMvGridDistrict <egon.data.datasets.emobility.motorized_individual_travel.db_classes.EgonEvMvGridDistrict>`
        is created and filled
      * :py:class:`EgonEvMetadata <egon.data.datasets.emobility.motorized_individual_travel.db_classes.EgonEvMetadata>`
        is created and filled

    *Configuration*

    The config of this dataset can be found in *datasets.yml* in section
    *emobility_mit*.

    """

    #:
    name: str = "MotorizedIndividualTravel"
    #:
    version: str = "0.0.7"

    def __init__(self, dependencies):
        def generate_model_data_tasks(scenario_name):
            """Dynamically generate tasks for model data creation.

            The goal is to speed up the creation of model timeseries. However,
            the exact number of parallel task cannot be determined during the
            DAG building as the number of grid districts (MVGD) is calculated
            within another pipeline task.
            Approach: assuming an approx. count of `mvgd_min_count` of 3700,
            the majority of the MVGDs can be parallelized. The remainder is
            handled subsequently in the scenario's `*_remaining` task
            (:func:`generate_model_data_eGon2035_remaining` or
            :func:`generate_model_data_eGon100RE_remaining`).
            The number of parallel tasks is defined via parameter
            `parallel_tasks` in the dataset config `datasets.yml`.

            Parameters
            ----------
            scenario_name : str
                Scenario name

            Returns
            -------
            set
                The tasks: one :class:`PythonOperator` per bunch of MVGDs,
                each calling
                :func:`egon.data.datasets.emobility.motorized_individual_travel.model_timeseries.generate_model_data_bunch`,
                plus the scenario's `*_remaining` function (if the scenario
                is one of `eGon2035` and `eGon100RE`).
            """
            parallel_tasks = DATASET_CFG["model_timeseries"].get(
                "parallel_tasks", 1
            )
            # Equally sized bunches; MVGDs beyond
            # `parallel_tasks * mvgd_bunch_size` are left to the
            # `*_remaining` task
            mvgd_bunch_size = MVGD_MIN_COUNT // parallel_tasks

            tasks = set()
            for task_no in range(parallel_tasks):
                bunch = range(
                    task_no * mvgd_bunch_size,
                    (task_no + 1) * mvgd_bunch_size,
                )
                tasks.add(
                    PythonOperator(
                        task_id=(
                            f"generate_model_data_"
                            f"{scenario_name}_"
                            f"bunch{bunch[0]}-{bunch[-1]}"
                        ),
                        python_callable=generate_model_data_bunch,
                        op_kwargs={
                            "scenario_name": scenario_name,
                            "bunch": bunch,
                        },
                    )
                )

            if scenario_name == "eGon2035":
                tasks.add(generate_model_data_eGon2035_remaining)
            elif scenario_name == "eGon100RE":
                tasks.add(generate_model_data_eGon100RE_remaining)
            return tasks

        super().__init__(
            name=self.name,
            version=self.version,
            dependencies=dependencies,
            tasks=(
                create_tables,
                {
                    (
                        download_and_preprocess,
                        allocate_evs_numbers,
                    ),
                    (
                        extract_trip_file,
                        write_metadata_to_db,
                        write_evs_trips_to_db,
                    ),
                },
                allocate_evs_to_grid_districts,
                delete_model_data_from_db,
                {
                    *generate_model_data_tasks(scenario_name="eGon2035"),
                    *generate_model_data_tasks(scenario_name="eGon100RE"),
                },
            ),
        )
480