Passed
Pull Request — dev (#1112)
by
unknown
01:57
created

data.datasets.power_plants.mastr   A

Complexity

Total Complexity 22

Size/Duplication

Total Lines 464
Duplicated Lines 2.59 %

Importance

Changes 0
Metric Value
wmc 22
eloc 267
dl 12
loc 464
rs 10
c 0
b 0
f 0

4 Functions

Rating   Name   Duplication   Size   Complexity  
B infer_voltage_level() 12 38 6
A zip_and_municipality_from_standort() 0 41 5
A isfloat() 0 17 2
D import_mastr() 0 311 9

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
"""Import MaStR dataset and write to DB tables
2
3
Data dump from Marktstammdatenregister (2022-11-17) is imported into the
4
database. Only some technologies are taken into account and written to the
5
following tables:
6
7
* PV: table `supply.egon_power_plants_pv`
8
* wind turbines: table `supply.egon_power_plants_wind`
9
* biomass/biogas plants: table `supply.egon_power_plants_biomass`
10
* hydro plants: table `supply.egon_power_plants_hydro`
11
12
Handling of empty source data in MaStR dump:
13
* `voltage_level`: inferred based on nominal power (`capacity`) using the
14
  ranges from
15
  https://redmine.iks.cs.ovgu.de/oe/projects/ego-n/wiki/Definition_of_thresholds_for_voltage_level_assignment
16
  which results in True in column `voltage_level_inferred`. Remaining datasets
17
  are set to -1 (which only occurs if `capacity` is empty).
18
* `supply.egon_power_plants_*.bus_id`: set to -1 (only if not within grid
19
  districts or no geom available, e.g. for units with nom. power <30 kW)
20
* `supply.egon_power_plants_hydro.plant_type`: NaN
21
22
The data is used especially for the generation of status quo grids by ding0.
23
"""
24
from __future__ import annotations
25
26
from pathlib import Path
27
28
from loguru import logger
29
import geopandas as gpd
30
import numpy as np
31
import pandas as pd
32
33
from egon.data import config, db
34
from egon.data.datasets.mastr import WORKING_DIR_MASTR_NEW
35
from egon.data.datasets.power_plants.mastr_db_classes import (
36
    EgonMastrGeocoded,
37
    EgonPowerPlantsBiomass,
38
    EgonPowerPlantsHydro,
39
    EgonPowerPlantsPv,
40
    EgonPowerPlantsWind,
41
)
42
from egon.data.datasets.power_plants.pv_rooftop_buildings import (
43
    federal_state_data,
44
)
45
46
# A ``--dataset-boundary`` of "Everything" processes the full dataset; any
# other value is treated as test mode, in which `import_mastr` keeps only
# units located in Schleswig-Holstein.
TESTMODE_OFF = config.settings()["egon-data"][
    "--dataset-boundary"
] == "Everything"
49
50
51
def isfloat(num: str):
    """
    Determine if a value can be converted to float.

    Parameters
    ----------
    num : str
        Value to parse. Usually a string; ``None`` and other objects that
        ``float()`` rejects are handled as well.

    Returns
    -------
    bool
        True if `num` can be parsed to float, False otherwise. Note that
        NaN is a float, so NaN input returns True; callers must check for
        missing values separately.
    """
    try:
        float(num)
    except (TypeError, ValueError):
        # ValueError: unparsable string such as "abc" or "".
        # TypeError: object float() does not accept at all, e.g. None.
        return False
    return True
68
69
70
def zip_and_municipality_from_standort(
    standort: str,
) -> tuple[str, bool]:
    """
    Extract zip code and municipality from a MaStR ``Standort`` string.

    The string is split on whitespace and the first token consisting of
    exactly five numeric characters is taken as the zip code. That token
    and all following tokens are joined back with single spaces.

    Parameters
    ----------
    standort : str
        Standort as given from MaStR data.

    Returns
    -------
    tuple[str, bool]
        ``(cleaned_str, found)``. If a zip code was found, ``cleaned_str``
        starts at the zip code (e.g. ``"Hauptstr. 5 24103 Kiel"`` becomes
        ``"24103 Kiel"``) and ``found`` is True. Otherwise the unchanged
        `standort` is returned with ``found`` set to False and a warning
        is logged.

    Notes
    -----
    No country suffix is appended here.
    NOTE(review): the Postleitzahl/Gemeinde branch in `import_mastr`
    appends ", Deutschland" to its keys; confirm that keys produced by
    this function still match the geocoding table.
    """
    tokens = standort.split()

    # Index of the first token that looks like a zip code (exactly five
    # numeric characters), or None if there is no such token.
    zip_pos = next(
        (
            pos
            for pos, token in enumerate(tokens)
            if len(token) == 5 and token.isnumeric()
        ),
        None,
    )

    if zip_pos is None:
        logger.warning(
            "Couldn't identify zip code. This entry will be dropped."
            f" Original standort: {standort}."
        )
        return standort, False

    return " ".join(tokens[zip_pos:]), True
111
112
113
def infer_voltage_level(
    units_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Infer missing voltage levels from the units' nominal power.

    For every unit without a ``voltage_level`` but with a nominal power
    (``Nettonennleistung``, in kW), a voltage level is derived from fixed
    capacity thresholds and ``voltage_level_inferred`` is set to True.
    Units without nominal power keep a missing voltage level and
    ``voltage_level_inferred`` stays False, so the caller can mark them
    separately (e.g. with -1).

    Parameters
    ----------
    units_gdf : geopandas.GeoDataFrame
        Units with (possibly missing) voltage levels from MaStR. Must
        contain the columns ``voltage_level`` and ``Nettonennleistung``.
        The frame is modified in place.

    Returns
    -------
    geopandas.GeoDataFrame
        The same frame with missing voltage levels inferred where
        possible and the boolean column ``voltage_level_inferred`` added.
    """

    def voltage_levels(p: float) -> int:
        # Upper capacity bounds (kW, inclusive) per voltage level; anything
        # above the last bound is assigned level 1.
        if p <= 100:
            return 7
        elif p <= 200:
            return 6
        elif p <= 5500:
            return 5
        elif p <= 20000:
            return 4
        elif p <= 120000:
            return 3
        return 1

    units_gdf["voltage_level_inferred"] = False
    # Only infer where a capacity is available: NaN fails every comparison
    # above and would otherwise silently fall through to level 1.
    mask = (
        units_gdf.voltage_level.isna()
        & units_gdf.Nettonennleistung.notna()
    )
    units_gdf.loc[mask, "voltage_level_inferred"] = True
    units_gdf.loc[mask, "voltage_level"] = units_gdf.loc[
        mask
    ].Nettonennleistung.apply(voltage_levels)

    return units_gdf
151
152
153
def _read_geocoding(engine) -> gpd.GeoDataFrame:
    """Load geocoding results, check their version and write them to DB.

    Parameters
    ----------
    engine
        Database engine as returned by ``db.engine()`` (presumably a
        SQLAlchemy engine; used for ``__table__`` DDL and ``to_postgis``).

    Returns
    -------
    geopandas.GeoDataFrame
        Geocoding results with invalid (failed) geometries removed.

    Raises
    ------
    FileNotFoundError
        If the geocoding directory contains no files.
    AssertionError
        If the geocoding deposit ID differs from the MaStR deposit ID.
    """
    cfg = config.datasets()["mastr_new"]
    geocoding_dir = Path(*["."] + cfg["geocoding_path"]).resolve()

    # Sort so the selected file does not depend on file system ordering.
    files = sorted(geocoding_dir.iterdir())
    if not files:
        raise FileNotFoundError(
            f"No geocoding file found in {geocoding_dir}."
        )
    path = files[0]

    # The deposit ID is the last "_"-separated segment of the file name
    # part before the first ".".
    deposit_id_geocoding = int(path.parts[-1].split(".")[0].split("_")[-1])
    deposit_id_mastr = cfg["deposit_id"]

    if deposit_id_geocoding != deposit_id_mastr:
        raise AssertionError(
            f"The zenodo (sandbox) deposit ID {deposit_id_mastr} for the MaStR"
            f" dataset is not matching with the geocoding version "
            f"{deposit_id_geocoding}. Make sure to hermonize the data. When "
            f"the MaStR dataset is updated also update the geocoding and "
            f"update the egon data bundle. The geocoding can be done using: "
            f"https://github.com/RLI-sandbox/mastr-geocoding"
        )

    geocoding_gdf = gpd.read_file(path)

    # remove failed requests
    geocoding_gdf = geocoding_gdf.loc[geocoding_gdf.geometry.is_valid]

    EgonMastrGeocoded.__table__.drop(bind=engine, checkfirst=True)
    EgonMastrGeocoded.__table__.create(bind=engine, checkfirst=True)

    geocoding_gdf.to_postgis(
        name=EgonMastrGeocoded.__tablename__,
        con=engine,
        if_exists="append",
        schema=EgonMastrGeocoded.__table_args__["schema"],
        index=True,
    )

    return geocoding_gdf


def _add_point_geometries(units: pd.DataFrame) -> gpd.GeoDataFrame:
    """Build point geometries from MaStR coordinates and flag unusable ones.

    Adds the boolean column ``geometry_geocoded``: True if longitude or
    latitude is missing or the resulting point is invalid. Those units are
    later located via the geocoding results instead.
    """
    units = gpd.GeoDataFrame(
        units,
        geometry=gpd.points_from_xy(
            units["Laengengrad"], units["Breitengrad"], crs=4326
        ),
        crs=4326,
    )

    has_coords = units.Laengengrad.notna() & units.Breitengrad.notna()
    units["geometry_geocoded"] = ~has_coords
    units.loc[has_coords, "geometry_geocoded"] = ~units.loc[
        has_coords, "geometry"
    ].is_valid

    units_wo_geom = units["geometry_geocoded"].sum()

    logger.debug(
        f"{units_wo_geom}/{len(units)} units do not have a geometry!"
        " Adding geocoding results."
    )

    return units


def _add_zip_and_municipality(
    units: gpd.GeoDataFrame, tech: str
) -> gpd.GeoDataFrame:
    """Add the ``zip_and_municipality`` key used to join geocoding results.

    Units with a numeric Postleitzahl and a Gemeinde get the key
    ``"<5-digit zip> <Gemeinde>, Deutschland"``. For the remaining units the
    key is parsed from ``Standort`` (only if that column exists) using
    `zip_and_municipality_from_standort`; units where this fails keep NaN.
    """
    mask = (
        units.Postleitzahl.apply(isfloat)
        & ~units.Postleitzahl.isna()
        & ~units.Gemeinde.isna()
    )
    # Object dtype so string keys can be stored next to the NaN default.
    units["zip_and_municipality"] = pd.Series(
        np.nan, index=units.index, dtype=object
    )
    ok_units = units.loc[mask]

    units.loc[mask, "zip_and_municipality"] = (
        # Cast via float: `isfloat` also accepts strings like "24103.0",
        # which a direct str -> int cast would reject.
        ok_units.Postleitzahl.astype(float).astype(int).astype(str).str.zfill(5)
        + " "
        + ok_units.Gemeinde.astype(str).str.strip()
        + ", Deutschland"
    )

    # get zip and municipality from Standort
    parse_df = units.loc[~mask]

    if parse_df.empty or "Standort" not in parse_df.columns:
        return units

    logger.info(
        f"Parsing ZIP code and municipality from Standort for "
        f"{len(parse_df)} values for {tech}."
    )

    parsed = pd.DataFrame(
        parse_df.Standort.astype(str)
        .apply(zip_and_municipality_from_standort)
        .tolist(),
        index=parse_df.index,
        columns=["zip_and_municipality", "found"],
    )
    # Keep only entries where a zip code could be identified.
    parsed = parsed.loc[parsed.found.astype(bool)]

    if not parsed.empty:
        units.loc[
            parsed.index, "zip_and_municipality"
        ] = parsed.zip_and_municipality

    return units


def _locate_units(
    units: gpd.GeoDataFrame,
    geocoding_gdf: gpd.GeoDataFrame,
    boundary,
) -> gpd.GeoDataFrame:
    """Fill flagged geometries from the geocoding and drop unlocated units.

    Units flagged in ``geometry_geocoded`` take the geocoded point matched
    via ``zip_and_municipality`` (kept in helper column ``temp``). Units
    still lacking a geometry, or lying outside `boundary`, are dropped.
    """
    # add geocoding to missing
    units = units.merge(
        right=geocoding_gdf[["zip_and_municipality", "geometry"]].rename(
            columns={"geometry": "temp"}
        ),
        how="left",
        on="zip_and_municipality",
    )

    units.loc[units.geometry_geocoded, "geometry"] = units.loc[
        units.geometry_geocoded, "temp"
    ]

    init_len = len(units)

    logger.info(
        "Dropping units outside boundary by geometry or without geometry"
        "..."
    )

    units.dropna(subset=["geometry"], inplace=True)

    units = units.loc[units.geometry.within(boundary)]

    n_dropped = init_len - len(units)
    # Guard against division by zero when a technology has no units at
    # all (e.g. after the test-mode filter).
    share = n_dropped / init_len * 100 if init_len else 0.0
    logger.debug(f"{n_dropped}/{init_len} ({share: g} %) dropped.")

    return units


def _assign_bus_ids(
    units: gpd.GeoDataFrame, mv_grid_districts: gpd.GeoDataFrame
) -> gpd.GeoDataFrame:
    """Assign the ``bus_id`` of the MV grid district containing each unit.

    Units not matched to any district get ``bus_id`` -1.
    """
    joined = units.loc[~units.geom.x.isna()].sjoin(
        mv_grid_districts[["bus_id", "geom"]], how="left"
    )
    # A point on a shared district border matches several districts; keep
    # one match per unit so the index stays unique for `assign`.
    joined = joined.loc[~joined.index.duplicated(keep="first")]

    units = units.assign(bus_id=joined.bus_id)
    units["bus_id"] = units.bus_id.fillna(-1).astype(int)

    return units


def import_mastr() -> None:
    """Import MaStR data into database.

    1. Load the geocoding results, check that their deposit ID matches the
       configured MaStR deposit ID and (re)create the table of
       ``EgonMastrGeocoded`` from them.
    2. For each technology (pv, wind, biomass, hydro): read the MaStR CSV,
       drop units outside Germany (and outside Schleswig-Holstein in test
       mode), attach voltage levels from the locations file (inferring
       missing ones from capacity), locate units by their coordinates or,
       as a fallback, by geocoded zip code and municipality, drop units
       without geometry or outside the dissolved federal-state boundary,
       convert capacities from kW to MW, assign MV grid district bus ids
       (-1 if none) and append the result to the technology's table.

    Raises
    ------
    FileNotFoundError
        If the geocoding directory contains no files.
    AssertionError
        If the geocoding deposit ID differs from the MaStR deposit ID.
    """
    engine = db.engine()

    # import geocoded data
    geocoding_gdf = _read_geocoding(engine)

    cfg = config.datasets()["power_plants"]

    # MaStR column name -> target column name; "all" applies to every
    # technology. Columns keep their MaStR names until the final rename.
    cols_mapping = {
        "all": {
            "EinheitMastrNummer": "gens_id",
            "EinheitBetriebsstatus": "status",
            "Inbetriebnahmedatum": "commissioning_date",
            "Postleitzahl": "postcode",
            "Ort": "city",
            "Gemeinde": "municipality",
            "Bundesland": "federal_state",
            "Nettonennleistung": "capacity",
            "Einspeisungsart": "feedin_type",
        },
        "pv": {
            "Lage": "site_type",
            "Standort": "site",
            "Nutzungsbereich": "usage_sector",
            "Hauptausrichtung": "orientation_primary",
            "HauptausrichtungNeigungswinkel": "orientation_primary_angle",
            "Nebenausrichtung": "orientation_secondary",
            "NebenausrichtungNeigungswinkel": "orientation_secondary_angle",
            "EinheitlicheAusrichtungUndNeigungswinkel": "orientation_uniform",
            "AnzahlModule": "module_count",
            "zugeordneteWirkleistungWechselrichter": "capacity_inverter",
        },
        "wind": {
            "Lage": "site_type",
            "Hersteller": "manufacturer_name",
            "Typenbezeichnung": "type_name",
            "Nabenhoehe": "hub_height",
            "Rotordurchmesser": "rotor_diameter",
        },
        "biomass": {
            "Technologie": "technology",
            "Hauptbrennstoff": "fuel_name",
            "Biomasseart": "fuel_type",
            "ThermischeNutzleistung": "th_capacity",
        },
        "hydro": {
            "ArtDerWasserkraftanlage": "plant_type",
            "ArtDesZuflusses": "water_origin",
        },
    }

    source_files = {
        "pv": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_pv"],
        "wind": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_wind"],
        "biomass": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_biomass"],
        "hydro": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_hydro"],
    }
    target_tables = {
        "pv": EgonPowerPlantsPv,
        "wind": EgonPowerPlantsWind,
        "biomass": EgonPowerPlantsBiomass,
        "hydro": EgonPowerPlantsHydro,
    }
    vlevel_mapping = {
        "Höchstspannung": 1,
        "UmspannungZurHochspannung": 2,
        "Hochspannung": 3,
        "UmspannungZurMittelspannung": 4,
        "Mittelspannung": 5,
        "UmspannungZurNiederspannung": 6,
        "Niederspannung": 7,
    }

    # import locations
    locations = pd.read_csv(
        WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_location"],
        index_col=None,
    )

    # import grid districts
    mv_grid_districts = db.select_geodataframe(
        f"""
        SELECT * FROM {cfg['sources']['egon_mv_grid_district']}
        """,
        epsg=4326,
    )

    # Boundary does not depend on the technology, so compute it once.
    boundary = (
        federal_state_data(geocoding_gdf.crs).dissolve().at[0, "geom"]
    )

    # import units
    technologies = ["pv", "wind", "biomass", "hydro"]
    for tech in technologies:
        # read units
        logger.info(f"===== Importing MaStR dataset: {tech} =====")
        logger.debug("Reading CSV and filtering data...")
        units = pd.read_csv(
            source_files[tech],
            usecols=(
                ["LokationMastrNummer", "Laengengrad", "Breitengrad", "Land"]
                + list(cols_mapping["all"].keys())
                + list(cols_mapping[tech].keys())
            ),
            index_col=None,
            dtype={"Postleitzahl": str},
        )

        # drop units outside of Germany
        len_old = len(units)
        units = units.loc[units.Land == "Deutschland"]
        logger.debug(
            f"{len_old - len(units)} units outside of Germany dropped..."
        )

        # filter for SH units if in testmode
        if not TESTMODE_OFF:
            logger.info(
                "TESTMODE: Dropping all units outside of Schleswig-Holstein..."
            )
            units = units.loc[units.Bundesland == "SchleswigHolstein"]

        # merge and rename voltage level
        logger.debug("Merging with locations and allocate voltage level...")
        units = units.merge(
            locations[["MaStRNummer", "Spannungsebene"]],
            left_on="LokationMastrNummer",
            right_on="MaStRNummer",
            how="left",
        )
        # convert voltage levels to numbers
        units["voltage_level"] = units.Spannungsebene.replace(vlevel_mapping)
        # set voltage level for nan values
        units = infer_voltage_level(units)

        # add geometry
        logger.debug("Adding geometries...")
        units = _add_point_geometries(units)
        units = _add_zip_and_municipality(units, tech)
        units = _locate_units(units, geocoding_gdf, boundary)

        # drop unnecessary and rename columns
        logger.debug("Reformatting...")
        units.drop(
            columns=[
                "LokationMastrNummer",
                "MaStRNummer",
                "Laengengrad",
                "Breitengrad",
                "Spannungsebene",
                "Land",
                "temp",
            ],
            inplace=True,
        )
        mapping = {
            **cols_mapping["all"],
            **cols_mapping[tech],
            "geometry": "geom",
        }
        units.rename(columns=mapping, inplace=True)
        # Units whose voltage level could not be determined get -1.
        units["voltage_level"] = units.voltage_level.fillna(-1).astype(int)

        units.set_geometry("geom", inplace=True)
        units["id"] = range(0, len(units))

        # change capacity unit: kW to MW
        for col in ("capacity", "capacity_inverter", "th_capacity"):
            if col in units.columns:
                units[col] = units[col] / 1e3

        # assign bus ids
        logger.debug("Assigning bus ids...")
        units = _assign_bus_ids(units, mv_grid_districts)

        # write to DB
        logger.info(f"Writing {len(units)} units to DB...")

        units.to_postgis(
            name=target_tables[tech].__tablename__,
            con=engine,
            if_exists="append",
            schema=target_tables[tech].__table_args__["schema"],
        )
465