Passed
Pull Request — dev (#1369)
created by unknown, 02:00

data.datasets.power_plants.mastr.import_mastr() — rating: F

Complexity: Conditions 12
Size: Total Lines 377, Code Lines 250
Duplication: Lines 0, Ratio 0 %
Importance: Changes 0
Metric  Value
eloc    250
dl      0
loc     377
rs      3.36
c       0
b       0
f       0
cc      12
nop     0

How to fix

Long Method

Small methods make your code easier to understand, particularly when combined with a good name. Moreover, when a method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a good sign to extract the commented part into a new method and to use the comment as a starting point for naming it.

Commonly applied refactorings include Extract Method.
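For instance, the comment-driven extraction described above (Extract Method) could look like this minimal sketch; the functions and names are hypothetical, not taken from the PR:

```python
def summarize_units(units):
    """Return a one-line summary for (name, capacity_kw) pairs."""
    # Before the refactoring, the body of total_capacity_mw() lived here
    # behind an inline comment ("compute total capacity in MW"); that
    # comment now serves as the helper's name.
    return f"{len(units)} units, {total_capacity_mw(units):.1f} MW total"


def total_capacity_mw(units):
    """Extracted helper: sum nominal power and convert kW to MW."""
    return sum(capacity_kw for _, capacity_kw in units) / 1e3
```

With the helper extracted, the caller reads as plain prose: `summarize_units([("wind_1", 500), ("pv_7", 1500)])` yields "2 units, 2.0 MW total".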

Complexity

Complex functions like data.datasets.power_plants.mastr.import_mastr() often do a lot of different things. To break such a function down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
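Applied to import_mastr(), one cohesive component hinted at by a shared prefix is the pair of `deposit_id_*` variables. A hedged Extract Class sketch (names are illustrative, not the project's API):

```python
from dataclasses import dataclass


@dataclass
class DepositIds:
    """Groups the deposit_id_* values handled in import_mastr()."""

    geocoding: int
    mastr: int

    def check_consistent(self) -> None:
        # The version check that previously lived inline in import_mastr().
        if self.geocoding != self.mastr:
            raise AssertionError(
                f"Geocoding deposit {self.geocoding} does not match "
                f"MaStR deposit {self.mastr}."
            )
```

import_mastr() would then call `DepositIds(deposit_id_geocoding, deposit_id_mastr).check_consistent()` instead of carrying the comparison inline.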

"""Import MaStR dataset and write to DB tables

Data dump from Marktstammdatenregister (2022-11-17) is imported into the
database. Only some technologies are taken into account and written to the
following tables:

* PV: table `supply.egon_power_plants_pv`
* wind turbines: table `supply.egon_power_plants_wind`
* biomass/biogas plants: table `supply.egon_power_plants_biomass`
* hydro plants: table `supply.egon_power_plants_hydro`

Handling of empty source data in MaStR dump:

* `voltage_level`: inferred based on nominal power (`capacity`) using the
  ranges from
  https://redmine.iks.cs.ovgu.de/oe/projects/ego-n/wiki/Definition_of_thresholds_for_voltage_level_assignment
  which results in True in column `voltage_level_inferred`. Remaining datasets
  are set to -1 (which only occurs if `capacity` is empty).
* `supply.egon_power_plants_*.bus_id`: set to -1 (only if not within grid
  districts or no geom available, e.g. for units with nom. power <30 kW)
* `supply.egon_power_plants_hydro.plant_type`: NaN

The data is used especially for the generation of status quo grids by ding0.
"""

from __future__ import annotations

from pathlib import Path

from loguru import logger
import geopandas as gpd
import numpy as np
import pandas as pd

from egon.data import config, db
from egon.data.datasets.mastr import WORKING_DIR_MASTR_NEW
from egon.data.datasets.power_plants.mastr_db_classes import (
    EgonMastrGeocoded,
    EgonPowerPlantsBiomass,
    EgonPowerPlantsCombustion,
    EgonPowerPlantsGsgk,
    EgonPowerPlantsHydro,
    EgonPowerPlantsNuclear,
    EgonPowerPlantsPv,
    EgonPowerPlantsStorage,
    EgonPowerPlantsWind,
    add_metadata,
)
from egon.data.datasets.power_plants.pv_rooftop_buildings import (
    federal_state_data,
)

TESTMODE_OFF = (
    config.settings()["egon-data"]["--dataset-boundary"] == "Everything"
)


def isfloat(num: str) -> bool:
    """
    Determine if a string can be converted to float.

    Parameters
    ----------
    num : str
        String to parse.

    Returns
    -------
    bool
        True if the string can be parsed to float.
    """
    try:
        float(num)
        return True
    except ValueError:
        return False


def zip_and_municipality_from_standort(
    standort: str,
) -> tuple[str, bool]:
    """
    Get zip code and municipality from a MaStR Standort string.

    Parameters
    ----------
    standort : str
        Standort as given in MaStR data.

    Returns
    -------
    tuple[str, bool]
        Standort reduced to zip code and municipality, and a flag
        indicating whether a zip code could be identified.
    """
    standort_list = standort.split()

    found = False
    count = 0

    for count, elem in enumerate(standort_list):
        if len(elem) != 5:
            continue
        if not elem.isnumeric():
            continue

        found = True

        break

    if found:
        cleaned_str = " ".join(standort_list[count:])

        return cleaned_str, found

    logger.warning(
        "Couldn't identify zip code. This entry will be dropped."
        f" Original standort: {standort}."
    )

    return standort, found


def infer_voltage_level(
    units_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Infer missing voltage levels from generator capacity.

    Parameters
    ----------
    units_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing units with voltage levels from MaStR.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing units, all with an assigned voltage level.
    """

    def voltage_levels(p: float) -> int:
        if p <= 100:
            return 7
        elif p <= 200:
            return 6
        elif p <= 5500:
            return 5
        elif p <= 20000:
            return 4
        elif p <= 120000:
            return 3
        return 1

    units_gdf["voltage_level_inferred"] = False
    mask = units_gdf.voltage_level.isna()
    units_gdf.loc[mask, "voltage_level_inferred"] = True
    units_gdf.loc[mask, "voltage_level"] = units_gdf.loc[
        mask
    ].Nettonennleistung.apply(voltage_levels)

    return units_gdf


def import_mastr() -> None:
    """Import MaStR data into database."""
    engine = db.engine()

    # import geocoded data
    cfg = config.datasets()["mastr_new"]
    path_parts = cfg["geocoding_path"]
    path = Path(*["."] + path_parts).resolve()
    path = list(path.iterdir())[0]

    deposit_id_geocoding = int(path.parts[-1].split(".")[0].split("_")[-1])
    deposit_id_mastr = cfg["deposit_id"]

    if deposit_id_geocoding != deposit_id_mastr:
        raise AssertionError(
            f"The zenodo (sandbox) deposit ID {deposit_id_mastr} for the MaStR"
            f" dataset does not match the geocoding version "
            f"{deposit_id_geocoding}. Make sure to harmonize the data. When "
            "the MaStR dataset is updated, also update the geocoding and "
            "the egon data bundle. The geocoding can be done using: "
            "https://github.com/RLI-sandbox/mastr-geocoding"
        )

    geocoding_gdf = gpd.read_file(path)

    # remove failed requests
    geocoding_gdf = geocoding_gdf.loc[geocoding_gdf.geometry.is_valid]

    # remove unnecessary columns
    geocoding_gdf.drop(columns="geocode_source", inplace=True)

    EgonMastrGeocoded.__table__.drop(bind=engine, checkfirst=True)
    EgonMastrGeocoded.__table__.create(bind=engine, checkfirst=True)

    geocoding_gdf.to_postgis(
        name=EgonMastrGeocoded.__tablename__,
        con=engine,
        if_exists="append",
        schema=EgonMastrGeocoded.__table_args__["schema"],
        index=True,
    )

    cfg = config.datasets()["power_plants"]

    cols_mapping = {
        "all": {
            "EinheitMastrNummer": "gens_id",
            "EinheitBetriebsstatus": "status",
            "Inbetriebnahmedatum": "commissioning_date",
            "Postleitzahl": "postcode",
            "Ort": "city",
            "Gemeinde": "municipality",
            "Bundesland": "federal_state",
            "Nettonennleistung": "capacity",
            "Einspeisungsart": "feedin_type",
            "DatumEndgueltigeStilllegung": "decommissioning_date",
        },
        "pv": {
            "Lage": "site_type",
            "Standort": "site",
            "Nutzungsbereich": "usage_sector",
            "Hauptausrichtung": "orientation_primary",
            "HauptausrichtungNeigungswinkel": "orientation_primary_angle",
            "Nebenausrichtung": "orientation_secondary",
            "NebenausrichtungNeigungswinkel": "orientation_secondary_angle",
            "EinheitlicheAusrichtungUndNeigungswinkel": "orientation_uniform",
            "AnzahlModule": "module_count",
            "zugeordneteWirkleistungWechselrichter": "capacity_inverter",
        },
        "wind": {
            "Lage": "site_type",
            "Hersteller": "manufacturer_name",
            "Typenbezeichnung": "type_name",
            "Nabenhoehe": "hub_height",
            "Rotordurchmesser": "rotor_diameter",
        },
        "biomass": {
            "Technologie": "technology",
            "Hauptbrennstoff": "main_fuel",
            "Biomasseart": "fuel_type",
            "ThermischeNutzleistung": "th_capacity",
        },
        "hydro": {
            "ArtDerWasserkraftanlage": "plant_type",
            "ArtDesZuflusses": "water_origin",
        },
        "combustion": {
            "Energietraeger": "carrier",
            "Hauptbrennstoff": "main_fuel",
            "WeitererHauptbrennstoff": "other_main_fuel",
            "Technologie": "technology",
            "ThermischeNutzleistung": "th_capacity",
        },
        "gsgk": {
            "Energietraeger": "carrier",
            "Technologie": "technology",
        },
        "nuclear": {
            "Energietraeger": "carrier",
            "Technologie": "technology",
        },
        "storage": {
            "Energietraeger": "carrier",
            "Technologie": "technology",
            "Batterietechnologie": "battery_type",
            "Pumpspeichertechnologie": "pump_storage_type",
        },
    }

    source_files = {
        "pv": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_pv"],
        "wind": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_wind"],
        "biomass": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_biomass"],
        "hydro": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_hydro"],
        "combustion": WORKING_DIR_MASTR_NEW
        / cfg["sources"]["mastr_combustion"],
        "gsgk": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_gsgk"],
        "nuclear": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_nuclear"],
        "storage": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_storage"],
    }

    target_tables = {
        "pv": EgonPowerPlantsPv,
        "wind": EgonPowerPlantsWind,
        "biomass": EgonPowerPlantsBiomass,
        "hydro": EgonPowerPlantsHydro,
        "combustion": EgonPowerPlantsCombustion,
        "gsgk": EgonPowerPlantsGsgk,
        "nuclear": EgonPowerPlantsNuclear,
        "storage": EgonPowerPlantsStorage,
    }

    vlevel_mapping = {
        "Höchstspannung": 1,
        "UmspannungZurHochspannung": 2,
        "Hochspannung": 3,
        "UmspannungZurMittelspannung": 4,
        "Mittelspannung": 5,
        "UmspannungZurNiederspannung": 6,
        "Niederspannung": 7,
    }

    # import locations
    locations = pd.read_csv(
        WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_location"],
        index_col=None,
    )

    # import grid districts
    mv_grid_districts = db.select_geodataframe(
        f"""
        SELECT * FROM {cfg['sources']['egon_mv_grid_district']}
        """,
        epsg=4326,
    )

    # import units
    technologies = [
        "pv",
        "wind",
        "biomass",
        "hydro",
        "combustion",
        "gsgk",
        "nuclear",
        "storage",
    ]

    for tech in technologies:
        # read units
        logger.info(f"===== Importing MaStR dataset: {tech} =====")
        logger.debug("Reading CSV and filtering data...")
        units = pd.read_csv(
            source_files[tech],
            usecols=(
                ["LokationMastrNummer", "Laengengrad", "Breitengrad", "Land"]
                + list(cols_mapping["all"].keys())
                + list(cols_mapping[tech].keys())
            ),
            index_col=None,
            dtype={"Postleitzahl": str},
            low_memory=False,
        )

        # drop units outside of Germany
        len_old = len(units)
        units = units.loc[units.Land == "Deutschland"]
        logger.debug(
            f"{len_old - len(units)} units outside of Germany dropped..."
        )

        # get boundary
        boundary = (
            federal_state_data(geocoding_gdf.crs).dissolve().at[0, "geom"]
        )

        # drop units that are not in operation
        len_old = len(units)
        units = units.loc[
            units.EinheitBetriebsstatus.isin(
                ["InBetrieb", "VoruebergehendStillgelegt"]
            )
        ]
        logger.debug(f"{len_old - len(units)} non-operating units dropped...")

        # filter for SH units if in testmode
        if not TESTMODE_OFF:
            logger.info(
                "TESTMODE: Dropping all units outside of Schleswig-Holstein..."
            )
            units = units.loc[units.Bundesland == "SchleswigHolstein"]

        # merge and rename voltage level
        logger.debug("Merging with locations and allocating voltage level...")
        units = units.merge(
            locations[["MaStRNummer", "Spannungsebene"]],
            left_on="LokationMastrNummer",
            right_on="MaStRNummer",
            how="left",
        )
        # convert voltage levels to numbers
        units["voltage_level"] = units.Spannungsebene.replace(vlevel_mapping)
        # set voltage level for nan values
        units = infer_voltage_level(units)

        # add geometry
        logger.debug("Adding geometries...")
        units = gpd.GeoDataFrame(
            units,
            geometry=gpd.points_from_xy(
                units["Laengengrad"], units["Breitengrad"], crs=4326
            ),
            crs=4326,
        )

        units["geometry_geocoded"] = (
            units.Laengengrad.isna() | units.Breitengrad.isna()
        )

        units.loc[~units.geometry_geocoded, "geometry_geocoded"] = ~units.loc[
            ~units.geometry_geocoded, "geometry"
        ].is_valid

        units_wo_geom = units["geometry_geocoded"].sum()

        logger.debug(
            f"{units_wo_geom}/{len(units)} units do not have a geometry!"
            " Adding geocoding results."
        )

        # determine zip and municipality string
        mask = (
            units.Postleitzahl.apply(isfloat)
            & ~units.Postleitzahl.isna()
            & ~units.Gemeinde.isna()
        )
        units["zip_and_municipality"] = np.nan
        ok_units = units.loc[mask]

        units.loc[mask, "zip_and_municipality"] = (
            ok_units.Postleitzahl.astype(float)
            .astype(int)
            .astype(str)
            .str.zfill(5)
            + " "
            + ok_units.Gemeinde.astype(str).str.strip()
            + ", Deutschland"
        )

        # get zip and municipality from Standort
        parse_df = units.loc[~mask]

        if not parse_df.empty and "Standort" in parse_df.columns:
            init_len = len(parse_df)

            logger.info(
                f"Parsing ZIP code and municipality from Standort for "
                f"{init_len} values for {tech}."
            )

            parse_df[["zip_and_municipality", "drop_this"]] = (
                parse_df.Standort.astype(str)
                .apply(zip_and_municipality_from_standort)
                .tolist()
            )

            parse_df = parse_df.loc[parse_df.drop_this]

            if not parse_df.empty:
                units.loc[parse_df.index, "zip_and_municipality"] = (
                    parse_df.zip_and_municipality
                )

        # add geocoding to missing
        units = units.merge(
            right=geocoding_gdf[["zip_and_municipality", "geometry"]].rename(
                columns={"geometry": "temp"}
            ),
            how="left",
            on="zip_and_municipality",
        )

        units.loc[units.geometry_geocoded, "geometry"] = units.loc[
            units.geometry_geocoded, "temp"
        ]

        init_len = len(units)

        logger.info(
            "Dropping units outside boundary by geometry or without geometry"
            "..."
        )

        units.dropna(subset=["geometry"], inplace=True)

        units = units.loc[units.geometry.within(boundary)]

        if init_len > 0:
            logger.debug(
                f"{init_len - len(units)}/{init_len} "
                f"({((init_len - len(units)) / init_len) * 100: g} %) dropped."
            )

        # drop unnecessary and rename columns
        logger.debug("Reformatting...")
        units.drop(
            columns=[
                "LokationMastrNummer",
                "MaStRNummer",
                "Laengengrad",
                "Breitengrad",
                "Spannungsebene",
                "Land",
                "temp",
            ],
            inplace=True,
        )

        mapping = cols_mapping["all"].copy()
        mapping.update(cols_mapping[tech])
        mapping.update({"geometry": "geom"})
        units.rename(columns=mapping, inplace=True)
        units["voltage_level"] = units.voltage_level.fillna(-1).astype(int)
        units["postcode"] = units["postcode"].apply(
            lambda x: int(float(x)) if not pd.isna(x) else pd.NA
        )
        units.set_geometry("geom", inplace=True)
        units["id"] = range(len(units))

        # change capacity unit: kW to MW
        units["capacity"] = units["capacity"] / 1e3
        if "capacity_inverter" in units.columns:
            units["capacity_inverter"] = units["capacity_inverter"] / 1e3
        if "th_capacity" in units.columns:
            units["th_capacity"] = units["th_capacity"] / 1e3

        # assign bus ids
        logger.debug("Assigning bus ids...")
        units = units.assign(
            bus_id=units.loc[~units.geom.x.isna()]
            .sjoin(mv_grid_districts[["bus_id", "geom"]], how="left")
            .drop(columns=["index_right"])
            .bus_id
        )
        units["bus_id"] = units.bus_id.fillna(-1).astype(int)

        # write to DB
        logger.info(f"Writing {len(units)} units to DB...")

        units.to_postgis(
            name=target_tables[tech].__tablename__,
            con=engine,
            if_exists="append",
            schema=target_tables[tech].__table_args__["schema"],
        )

    add_metadata()