Passed
Pull Request — dev (#1369)
by
unknown
01:59
created

data.datasets.power_plants.mastr.isfloat()   A

Complexity

Conditions 2

Size

Total Lines 18
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 18
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
"""Import MaStR dataset and write to DB tables
2
3
Data dump from Marktstammdatenregister (2022-11-17) is imported into the
4
database. Only some technologies are taken into account and written to the
5
following tables:
6
7
* PV: table `supply.egon_power_plants_pv`
8
* wind turbines: table `supply.egon_power_plants_wind`
9
* biomass/biogas plants: table `supply.egon_power_plants_biomass`
10
* hydro plants: table `supply.egon_power_plants_hydro`
11
12
Handling of empty source data in MaStR dump:
13
14
* `voltage_level`: inferred based on nominal power (`capacity`) using the
15
  ranges from
16
  https://redmine.iks.cs.ovgu.de/oe/projects/ego-n/wiki/Definition_of_thresholds_for_voltage_level_assignment
17
  which results in True in column `voltage_level_inferred`. Remaining datasets
18
  are set to -1 (which only occurs if `capacity` is empty).
19
* `supply.egon_power_plants_*.bus_id`: set to -1 (only if not within grid
20
  districts or no geom available, e.g. for units with nom. power <30 kW)
21
* `supply.egon_power_plants_hydro.plant_type`: NaN
22
23
The data is used especially for the generation of status quo grids by ding0.
24
"""
25
26
from __future__ import annotations
27
28
from pathlib import Path
29
30
from loguru import logger
31
import geopandas as gpd
32
import numpy as np
33
import pandas as pd
34
35
from egon.data import config, db
36
from egon.data.datasets.mastr import WORKING_DIR_MASTR_NEW
37
from egon.data.datasets.power_plants.mastr_db_classes import (
38
    EgonMastrGeocoded,
39
    EgonPowerPlantsBiomass,
40
    EgonPowerPlantsCombustion,
41
    EgonPowerPlantsGsgk,
42
    EgonPowerPlantsHydro,
43
    EgonPowerPlantsNuclear,
44
    EgonPowerPlantsPv,
45
    EgonPowerPlantsStorage,
46
    EgonPowerPlantsWind,
47
    add_metadata,
48
)
49
from egon.data.datasets.power_plants.pv_rooftop_buildings import (
50
    federal_state_data,
51
)
52
53
# Test mode is considered off only when the configured dataset boundary is
# "Everything"; any other boundary value switches test mode on.
_egon_settings = config.settings()["egon-data"]
TESTMODE_OFF = _egon_settings["--dataset-boundary"] == "Everything"
56
57
58
def isfloat(num: str):
    """
    Determine if a value can be converted to float.

    Parameters
    -----------
    num : str
        String to parse.
    Returns
    -------
    bool
        Returns True if `num` can be parsed to float, otherwise False.
        Input that `float()` rejects by type (e.g. None) also yields
        False instead of raising.
    """
    try:
        float(num)
    except (TypeError, ValueError):
        # ValueError: unparsable string; TypeError: unsupported type (None, ...)
        return False
    return True
76
77
78
def zip_and_municipality_from_standort(
    standort: str,
) -> tuple[str, bool]:
    """
    Extract zip code and municipality from a MaStR Standort string.

    The string is split on whitespace and scanned for the first token that
    consists of exactly five ASCII digits; this token is taken as zip code.

    Parameters
    -----------
    standort : str
        Standort as given from MaStR data.
    Returns
    -------
    tuple[str, bool]
        If a zip code was found: the part of Standort starting at the zip
        code (tokens joined by single spaces) and True. Otherwise the
        unchanged Standort and False; a warning is logged in that case.
    """
    tokens = standort.split()

    for pos, token in enumerate(tokens):
        # str.isnumeric() would also accept non-ASCII numerics such as "½"
        # or "²", which can never be part of a zip code.
        if len(token) == 5 and token.isascii() and token.isdecimal():
            return " ".join(tokens[pos:]), True

    logger.warning(
        "Couldn't identify zip code. This entry will be dropped."
        f" Original standort: {standort}."
    )

    return standort, False
120
121
122 View Code Duplication
def infer_voltage_level(
    units_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Fill missing voltage levels using the units' nominal power.

    Rows with a missing `voltage_level` get a level derived from column
    `Nettonennleistung`. Rows where the capacity is missing as well keep a
    missing voltage level, so callers can handle them explicitly (e.g. by
    filling with -1). The frame is modified in place and also returned.

    Parameters
    -----------
    units_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing units with voltage levels from MaStR.
        Requires the columns `voltage_level` and `Nettonennleistung`.
    Returns
    -------
    geopandas.GeoDataFrame
        The same GeoDataFrame with inferred voltage levels and the boolean
        column `voltage_level_inferred`, which is True only for rows whose
        voltage level was actually derived from the capacity.
    """

    def voltage_levels(p: float) -> float:
        # NaN fails every comparison below and would fall through to level
        # 1; a missing capacity carries no information, so stay missing.
        if pd.isna(p):
            return np.nan
        if p <= 100:
            return 7
        if p <= 200:
            return 6
        if p <= 5500:
            return 5
        if p <= 20000:
            return 4
        if p <= 120000:
            return 3
        return 1

    mask = units_gdf.voltage_level.isna()
    units_gdf.loc[mask, "voltage_level"] = units_gdf.loc[
        mask, "Nettonennleistung"
    ].apply(voltage_levels)

    # only flag rows that were missing before and have a value now
    units_gdf["voltage_level_inferred"] = (
        mask & units_gdf.voltage_level.notna()
    )

    return units_gdf
160
161
162
def import_mastr() -> None:
    """Import MaStR data into database

    Steps:

    * Read the geocoding results, check that their deposit ID matches the
      configured MaStR deposit ID and write them to the geocoding table
      (which is dropped and recreated).
    * For each technology: read the unit CSV, drop units outside of Germany
      and units that are not operating, attach the voltage level from the
      locations file (inferring missing ones from capacity), build
      geometries from the coordinates and fall back to the geocoding result
      (matched via "<zip> <municipality>, Deutschland") where no valid
      coordinates exist, drop units without geometry or outside the
      boundary, assign bus ids by spatial join with the MV grid districts
      and append the result to the technology's table.
    * Add metadata via `add_metadata()`.

    Raises
    ------
    AssertionError
        If the deposit ID of the geocoding file does not match the
        configured MaStR deposit ID.
    """
    engine = db.engine()

    # import geocoded data
    cfg = config.datasets()["mastr_new"]
    path_parts = cfg["geocoding_path"]
    path = Path(*["."] + path_parts).resolve()
    path = list(path.iterdir())[0]

    # the deposit ID is the last "_"-separated part of the file name stem
    deposit_id_geocoding = int(path.parts[-1].split(".")[0].split("_")[-1])
    deposit_id_mastr = cfg["deposit_id"]

    if deposit_id_geocoding != deposit_id_mastr:
        raise AssertionError(
            f"The zenodo (sandbox) deposit ID {deposit_id_mastr} for the MaStR"
            f" dataset is not matching with the geocoding version "
            f"{deposit_id_geocoding}. Make sure to hermonize the data. When "
            f"the MaStR dataset is updated also update the geocoding and "
            f"update the egon data bundle. The geocoding can be done using: "
            f"https://github.com/RLI-sandbox/mastr-geocoding"
        )

    geocoding_gdf = gpd.read_file(path)

    # remove failed requests and unnecessary columns
    geocoding_gdf = geocoding_gdf.loc[geocoding_gdf.geometry.is_valid].drop(
        columns="geocode_source"
    )

    EgonMastrGeocoded.__table__.drop(bind=engine, checkfirst=True)
    EgonMastrGeocoded.__table__.create(bind=engine, checkfirst=True)

    geocoding_gdf.to_postgis(
        name=EgonMastrGeocoded.__tablename__,
        con=engine,
        if_exists="append",
        schema=EgonMastrGeocoded.__table_args__["schema"],
        index=True,
    )

    cfg = config.datasets()["power_plants"]

    # source column name -> target column name, per technology ("all" applies
    # to every technology)
    cols_mapping = {
        "all": {
            "EinheitMastrNummer": "gens_id",
            "EinheitBetriebsstatus": "status",
            "Inbetriebnahmedatum": "commissioning_date",
            "Postleitzahl": "postcode",
            "Ort": "city",
            "Gemeinde": "municipality",
            "Bundesland": "federal_state",
            "Nettonennleistung": "capacity",
            "Einspeisungsart": "feedin_type",
            "DatumEndgueltigeStilllegung": "decommissioning_date",
        },
        "pv": {
            "Lage": "site_type",
            "Standort": "site",
            "Nutzungsbereich": "usage_sector",
            "Hauptausrichtung": "orientation_primary",
            "HauptausrichtungNeigungswinkel": "orientation_primary_angle",
            "Nebenausrichtung": "orientation_secondary",
            "NebenausrichtungNeigungswinkel": "orientation_secondary_angle",
            "EinheitlicheAusrichtungUndNeigungswinkel": "orientation_uniform",
            "AnzahlModule": "module_count",
            "zugeordneteWirkleistungWechselrichter": "capacity_inverter",
        },
        "wind": {
            "Lage": "site_type",
            "Hersteller": "manufacturer_name",
            "Typenbezeichnung": "type_name",
            "Nabenhoehe": "hub_height",
            "Rotordurchmesser": "rotor_diameter",
        },
        "biomass": {
            "Technologie": "technology",
            "Hauptbrennstoff": "main_fuel",
            "Biomasseart": "fuel_type",
            "ThermischeNutzleistung": "th_capacity",
        },
        "hydro": {
            "ArtDerWasserkraftanlage": "plant_type",
            "ArtDesZuflusses": "water_origin",
        },
        "combustion": {
            "Energietraeger": "carrier",
            "Hauptbrennstoff": "main_fuel",
            "WeitererHauptbrennstoff": "other_main_fuel",
            "Technologie": "technology",
            "ThermischeNutzleistung": "th_capacity",
        },
        "gsgk": {
            "Energietraeger": "carrier",
            "Technologie": "technology",
        },
        "nuclear": {
            "Energietraeger": "carrier",
            "Technologie": "technology",
        },
        "storage": {
            "Energietraeger": "carrier",
            "Technologie": "technology",
            "Batterietechnologie": "battery_type",
            "Pumpspeichertechnologie": "pump_storage_type",
        },
    }

    source_files = {
        "pv": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_pv"],
        "wind": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_wind"],
        "biomass": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_biomass"],
        "hydro": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_hydro"],
        "combustion": WORKING_DIR_MASTR_NEW
        / cfg["sources"]["mastr_combustion"],
        "gsgk": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_gsgk"],
        "nuclear": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_nuclear"],
        "storage": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_storage"],
    }

    target_tables = {
        "pv": EgonPowerPlantsPv,
        "wind": EgonPowerPlantsWind,
        "biomass": EgonPowerPlantsBiomass,
        "hydro": EgonPowerPlantsHydro,
        "combustion": EgonPowerPlantsCombustion,
        "gsgk": EgonPowerPlantsGsgk,
        "nuclear": EgonPowerPlantsNuclear,
        "storage": EgonPowerPlantsStorage,
    }

    vlevel_mapping = {
        "Höchstspannung": 1,
        "UmspannungZurHochspannung": 2,
        "Hochspannung": 3,
        "UmspannungZurMittelspannung": 4,
        "Mittelspannung": 5,
        "UmspannungZurNiederspannung": 6,
        "Niederspannung": 7,
    }

    # import locations
    locations = pd.read_csv(
        WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_location"],
        index_col=None,
    )

    # import grid districts
    mv_grid_districts = db.select_geodataframe(
        f"""
        SELECT * FROM {cfg['sources']['egon_mv_grid_district']}
        """,
        epsg=4326,
    )

    # get boundary (does not depend on the technology, so compute it once)
    boundary = federal_state_data(geocoding_gdf.crs).dissolve().at[0, "geom"]

    # import units
    technologies = [
        "pv",
        "wind",
        "biomass",
        "hydro",
        "combustion",
        "gsgk",
        "nuclear",
        "storage",
    ]

    for tech in technologies:
        # read units
        logger.info(f"===== Importing MaStR dataset: {tech} =====")
        logger.debug("Reading CSV and filtering data...")
        # Columns keep their source (German) names here; they are renamed
        # in one go after filtering (see "drop unnecessary and rename
        # columns" below).
        units = pd.read_csv(
            source_files[tech],
            usecols=(
                ["LokationMastrNummer", "Laengengrad", "Breitengrad", "Land"]
                + list(cols_mapping["all"].keys())
                + list(cols_mapping[tech].keys())
            ),
            index_col=None,
            dtype={"Postleitzahl": str},
            low_memory=False,
        )

        # drop units outside of Germany
        len_old = len(units)
        units = units.loc[units.Land == "Deutschland"]
        logger.debug(
            f"{len_old - len(units)} units outside of Germany dropped..."
        )

        # drop not operating units
        len_old = len(units)
        units = units.loc[
            units.EinheitBetriebsstatus.isin(
                ["InBetrieb", "VoruebergehendStillgelegt"]
            )
        ]
        logger.debug(f"{len_old - len(units)} not operating units dropped...")

        # filter for SH units if in testmode
        if not TESTMODE_OFF:
            logger.info(
                "TESTMODE: Dropping all units outside of Schleswig-Holstein..."
            )
            units = units.loc[units.Bundesland == "SchleswigHolstein"]

        # merge and rename voltage level
        logger.debug("Merging with locations and allocate voltage level...")
        units = units.merge(
            locations[["MaStRNummer", "Spannungsebene"]],
            left_on="LokationMastrNummer",
            right_on="MaStRNummer",
            how="left",
        )
        # convert voltage levels to numbers
        units["voltage_level"] = units.Spannungsebene.replace(vlevel_mapping)
        # set voltage level for nan values
        units = infer_voltage_level(units)

        # add geometry
        logger.debug("Adding geometries...")
        units = gpd.GeoDataFrame(
            units,
            geometry=gpd.points_from_xy(
                units["Laengengrad"], units["Breitengrad"], crs=4326
            ),
            crs=4326,
        )

        # True for units that need the geocoding result as geometry: either
        # coordinate is missing ...
        units["geometry_geocoded"] = (
            units.Laengengrad.isna() | units.Breitengrad.isna()
        )

        # ... or the point built from the coordinates is not valid
        units.loc[~units.geometry_geocoded, "geometry_geocoded"] = ~units.loc[
            ~units.geometry_geocoded, "geometry"
        ].is_valid

        units_wo_geom = units["geometry_geocoded"].sum()

        logger.debug(
            f"{units_wo_geom}/{len(units)} units do not have a geometry!"
            " Adding geocoding results."
        )

        # determine zip and municipality string
        mask = (
            units.Postleitzahl.apply(isfloat)
            & ~units.Postleitzahl.isna()
            & ~units.Gemeinde.isna()
        )
        units["zip_and_municipality"] = np.nan
        ok_units = units.loc[mask]

        units.loc[mask, "zip_and_municipality"] = (
            ok_units.Postleitzahl.astype(float)
            .astype(int)
            .astype(str)
            .str.zfill(5)
            + " "
            + ok_units.Gemeinde.astype(str).str.rstrip().str.lstrip()
            + ", Deutschland"
        )

        # get zip and municipality from Standort
        # (explicit copy: columns are added to parse_df below)
        parse_df = units.loc[~mask].copy()

        if not parse_df.empty and "Standort" in parse_df.columns:
            init_len = len(parse_df)

            logger.info(
                f"Parsing ZIP code and municipality from Standort for "
                f"{init_len} values for {tech}."
            )

            # "drop_this" holds the "found" flag of the parser, i.e. True
            # for rows where a zip code could be identified
            parse_df[["zip_and_municipality", "drop_this"]] = (
                parse_df.Standort.astype(str)
                .apply(zip_and_municipality_from_standort)
                .tolist()
            )

            # keep only rows with an identified zip code
            parse_df = parse_df.loc[parse_df.drop_this]

            if not parse_df.empty:
                units.loc[parse_df.index, "zip_and_municipality"] = (
                    parse_df.zip_and_municipality
                )

        # add geocoding to missing
        units = units.merge(
            right=geocoding_gdf[["zip_and_municipality", "geometry"]].rename(
                columns={"geometry": "temp"}
            ),
            how="left",
            on="zip_and_municipality",
        )

        units.loc[units.geometry_geocoded, "geometry"] = units.loc[
            units.geometry_geocoded, "temp"
        ]

        init_len = len(units)

        logger.info(
            "Dropping units outside boundary by geometry or without geometry"
            "..."
        )

        units.dropna(subset=["geometry"], inplace=True)

        units = units.loc[units.geometry.within(boundary)]

        if init_len > 0:
            logger.debug(
                f"{init_len - len(units)}/{init_len} "
                f"({((init_len - len(units)) / init_len) * 100: g} %) dropped."
            )

        # drop unnecessary and rename columns
        logger.debug("Reformatting...")
        units.drop(
            columns=[
                "LokationMastrNummer",
                "MaStRNummer",
                "Laengengrad",
                "Breitengrad",
                "Spannungsebene",
                "Land",
                "temp",
            ],
            inplace=True,
        )

        mapping = cols_mapping["all"].copy()
        mapping.update(cols_mapping[tech])
        mapping.update({"geometry": "geom"})
        units.rename(columns=mapping, inplace=True)
        # voltage levels that could not be determined are written as -1
        units["voltage_level"] = units.voltage_level.fillna(-1).astype(int)
        # NOTE(review): int(float(x)) raises for non-numeric postcodes;
        # confirm such values cannot reach this point.
        units["postcode"] = units["postcode"].apply(
            lambda x: int(float(x)) if not pd.isna(x) else pd.NA
        )
        units.set_geometry("geom", inplace=True)
        units["id"] = range(len(units))

        # change capacity unit: kW to MW
        units["capacity"] = units["capacity"] / 1e3
        if "capacity_inverter" in units.columns:
            units["capacity_inverter"] = units["capacity_inverter"] / 1e3
        if "th_capacity" in units.columns:
            units["th_capacity"] = units["th_capacity"] / 1e3

        # assign bus ids
        logger.debug("Assigning bus ids...")
        units = units.assign(
            bus_id=units.loc[~units.geom.x.isna()]
            .sjoin(mv_grid_districts[["bus_id", "geom"]], how="left")
            .drop(columns=["index_right"])
            .bus_id
        )
        # units not matched to a grid district get bus id -1
        units["bus_id"] = units.bus_id.fillna(-1).astype(int)

        # write to DB
        logger.info(f"Writing {len(units)} units to DB...")

        units.to_postgis(
            name=target_tables[tech].__tablename__,
            con=engine,
            if_exists="append",
            schema=target_tables[tech].__table_args__["schema"],
        )

    add_metadata()
540