Passed
Pull Request — dev (#1112)
by
unknown
01:40
created

data.datasets.power_plants.mastr.isfloat()   A

Complexity

Conditions 2

Size

Total Lines 17
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 17
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
"""Import MaStR dataset and write to DB tables
2
3
Data dump from Marktstammdatenregister (2022-11-17) is imported into the
4
database. Only some technologies are taken into account and written to the
5
following tables:
6
7
* PV: table `supply.egon_power_plants_pv`
8
* wind turbines: table `supply.egon_power_plants_wind`
9
* biomass/biogas plants: table `supply.egon_power_plants_biomass`
10
* hydro plants: table `supply.egon_power_plants_hydro`
11
12
Handling of empty source data in MaStr dump:
13
* `voltage_level`: inferred based on nominal power (`capacity`) using the
14
  ranges from
15
  https://redmine.iks.cs.ovgu.de/oe/projects/ego-n/wiki/Definition_of_thresholds_for_voltage_level_assignment
16
  which results in True in column `voltage_level_inferred`. Remaining datasets
17
  are set to -1 (which only occurs if `capacity` is empty).
18
* `supply.egon_power_plants_*.bus_id`: set to -1 (only if not within grid
19
  districts or no geom available, e.g. for units with nom. power <30 kW)
20
* `supply.egon_power_plants_hydro.plant_type`: NaN
21
22
The data is used especially for the generation of status quo grids by ding0.
23
"""
24
from __future__ import annotations
25
26
from pathlib import Path
27
28
from loguru import logger
29
import geopandas as gpd
30
import numpy as np
31
import pandas as pd
32
33
from egon.data import config, db
34
from egon.data.datasets.mastr import WORKING_DIR_MASTR_NEW
35
from egon.data.datasets.power_plants.mastr_db_classes import (
36
    EgonMastrGeocoded,
37
    EgonPowerPlantsBiomass,
38
    EgonPowerPlantsCombustion,
39
    EgonPowerPlantsHydro,
40
    EgonPowerPlantsPv,
41
    EgonPowerPlantsWind,
42
)
43
from egon.data.datasets.power_plants.pv_rooftop_buildings import (
44
    federal_state_data,
45
)
46
47
TESTMODE_OFF = (
48
    config.settings()["egon-data"]["--dataset-boundary"] == "Everything"
49
)
50
51
52
def isfloat(num: str):
53
    """
54
    Determine if string can be converted to float.
55
    Parameters
56
    -----------
57
    num : str
58
        String to parse.
59
    Returns
60
    -------
61
    bool
62
        Returns True in string can be parsed to float.
63
    """
64
    try:
65
        float(num)
66
        return True
67
    except ValueError:
68
        return False
69
70
71
def zip_and_municipality_from_standort(
72
    standort: str,
73
) -> tuple[str, bool]:
74
    """
75
    Get zip code and municipality from Standort string split into a list.
76
    Parameters
77
    -----------
78
    standort : str
79
        Standort as given from MaStR data.
80
    Returns
81
    -------
82
    str
83
        Standort with only the zip code and municipality
84
        as well a ', Germany' added.
85
    """
86
    standort_list = standort.split()
87
88
    found = False
89
    count = 0
90
91
    for count, elem in enumerate(standort_list):
92
        if len(elem) != 5:
93
            continue
94
        if not elem.isnumeric():
95
            continue
96
97
        found = True
98
99
        break
100
101
    if found:
102
        cleaned_str = " ".join(standort_list[count:])
103
104
        return cleaned_str, found
105
106
    logger.warning(
107
        "Couldn't identify zip code. This entry will be dropped."
108
        f" Original standort: {standort}."
109
    )
110
111
    return standort, found
112
113
114
def infer_voltage_level(
115
    units_gdf: gpd.GeoDataFrame,
116
) -> gpd.GeoDataFrame:
117
    """
118
    Infer nan values in voltage level derived from generator capacity to
119
    the power plants.
120
121
    Parameters
122
    -----------
123
    units_gdf : geopandas.GeoDataFrame
124
        GeoDataFrame containing units with voltage levels from MaStR
125
    Returnsunits_gdf: gpd.GeoDataFrame
126
    -------
127
    geopandas.GeoDataFrame
128
        GeoDataFrame containing units all having assigned a voltage level.
129
    """
130
131
    def voltage_levels(p: float) -> int:
132
        if p <= 100:
133
            return 7
134
        elif p <= 200:
135
            return 6
136
        elif p <= 5500:
137
            return 5
138
        elif p <= 20000:
139
            return 4
140
        elif p <= 120000:
141
            return 3
142
        return 1
143
144
    units_gdf["voltage_level_inferred"] = False
145
    mask = units_gdf.voltage_level.isna()
146
    units_gdf.loc[mask, "voltage_level_inferred"] = True
147
    units_gdf.loc[mask, "voltage_level"] = units_gdf.loc[
148
        mask
149
    ].Nettonennleistung.apply(voltage_levels)
150
151
    return units_gdf
152
153
154
def import_mastr() -> None:
155
    """Import MaStR data into database"""
156
    engine = db.engine()
157
158
    # import geocoded data
159
    cfg = config.datasets()["mastr_new"]
160
    path_parts = cfg["geocoding_path"]
161
    path = Path(*["."] + path_parts).resolve()
162
    path = list(path.iterdir())[0]
163
164
    deposit_id_geocoding = int(path.parts[-1].split(".")[0].split("_")[-1])
165
    deposit_id_mastr = cfg["deposit_id"]
166
167
    if deposit_id_geocoding != deposit_id_mastr:
168
        raise AssertionError(
169
            f"The zenodo (sandbox) deposit ID {deposit_id_mastr} for the MaStR"
170
            f" dataset is not matching with the geocoding version "
171
            f"{deposit_id_geocoding}. Make sure to hermonize the data. When "
172
            f"the MaStR dataset is updated also update the geocoding and "
173
            f"update the egon data bundle. The geocoding can be done using: "
174
            f"https://github.com/RLI-sandbox/mastr-geocoding"
175
        )
176
177
    geocoding_gdf = gpd.read_file(path)
178
179
    # remove failed requests
180
    geocoding_gdf = geocoding_gdf.loc[geocoding_gdf.geometry.is_valid]
181
182
    EgonMastrGeocoded.__table__.drop(bind=engine, checkfirst=True)
183
    EgonMastrGeocoded.__table__.create(bind=engine, checkfirst=True)
184
185
    geocoding_gdf.to_postgis(
186
        name=EgonMastrGeocoded.__tablename__,
187
        con=engine,
188
        if_exists="append",
189
        schema=EgonMastrGeocoded.__table_args__["schema"],
190
        index=True,
191
    )
192
193
    cfg = config.datasets()["power_plants"]
194
195
    cols_mapping = {
196
        "all": {
197
            "EinheitMastrNummer": "gens_id",
198
            "EinheitBetriebsstatus": "status",
199
            "Inbetriebnahmedatum": "commissioning_date",
200
            "Postleitzahl": "postcode",
201
            "Ort": "city",
202
            "Gemeinde": "municipality",
203
            "Bundesland": "federal_state",
204
            "Nettonennleistung": "capacity",
205
            "Einspeisungsart": "feedin_type",
206
        },
207
        "pv": {
208
            "Lage": "site_type",
209
            "Standort": "site",
210
            "Nutzungsbereich": "usage_sector",
211
            "Hauptausrichtung": "orientation_primary",
212
            "HauptausrichtungNeigungswinkel": "orientation_primary_angle",
213
            "Nebenausrichtung": "orientation_secondary",
214
            "NebenausrichtungNeigungswinkel": "orientation_secondary_angle",
215
            "EinheitlicheAusrichtungUndNeigungswinkel": "orientation_uniform",
216
            "AnzahlModule": "module_count",
217
            "zugeordneteWirkleistungWechselrichter": "capacity_inverter",
218
        },
219
        "wind": {
220
            "Lage": "site_type",
221
            "Hersteller": "manufacturer_name",
222
            "Typenbezeichnung": "type_name",
223
            "Nabenhoehe": "hub_height",
224
            "Rotordurchmesser": "rotor_diameter",
225
        },
226
        "biomass": {
227
            "Technologie": "technology",
228
            "Hauptbrennstoff": "main_fuel",
229
            "Biomasseart": "fuel_type",
230
            "ThermischeNutzleistung": "th_capacity",
231
        },
232
        "hydro": {
233
            "ArtDerWasserkraftanlage": "plant_type",
234
            "ArtDesZuflusses": "water_origin",
235
        },
236
        "combustion": {
237
            "Energietraeger": "carrier",
238
            "Hauptbrennstoff": "main_fuel",
239
            "WeitererHauptbrennstoff": "other_main_fuel",
240
            "Technologie": "technology",
241
            "ThermischeNutzleistung": "th_capacity",
242
        },
243
    }
244
245
    source_files = {
246
        "pv": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_pv"],
247
        "wind": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_wind"],
248
        "biomass": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_biomass"],
249
        "hydro": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_hydro"],
250
        "combustion": WORKING_DIR_MASTR_NEW
251
        / cfg["sources"]["mastr_combustion"],
252
    }
253
    target_tables = {
254
        "pv": EgonPowerPlantsPv,
255
        "wind": EgonPowerPlantsWind,
256
        "biomass": EgonPowerPlantsBiomass,
257
        "hydro": EgonPowerPlantsHydro,
258
        "combustion": EgonPowerPlantsCombustion,
259
    }
260
    vlevel_mapping = {
261
        "Höchstspannung": 1,
262
        "UmspannungZurHochspannung": 2,
263
        "Hochspannung": 3,
264
        "UmspannungZurMittelspannung": 4,
265
        "Mittelspannung": 5,
266
        "UmspannungZurNiederspannung": 6,
267
        "Niederspannung": 7,
268
    }
269
270
    # import locations
271
    locations = pd.read_csv(
272
        WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_location"],
273
        index_col=None,
274
    )
275
276
    # import grid districts
277
    mv_grid_districts = db.select_geodataframe(
278
        f"""
279
        SELECT * FROM {cfg['sources']['egon_mv_grid_district']}
280
        """,
281
        epsg=4326,
282
    )
283
284
    # import units
285
    technologies = ["pv", "wind", "biomass", "hydro", "combustion"]
286
    for tech in technologies:
287
        # read units
288
        logger.info(f"===== Importing MaStR dataset: {tech} =====")
289
        logger.debug("Reading CSV and filtering data...")
290
        units = pd.read_csv(
291
            source_files[tech],
292
            usecols=(
293
                ["LokationMastrNummer", "Laengengrad", "Breitengrad", "Land"]
294
                + list(cols_mapping["all"].keys())
295
                + list(cols_mapping[tech].keys())
296
            ),
297
            index_col=None,
298
            dtype={"Postleitzahl": str},
299
        ).rename(columns=cols_mapping)
300
301
        # drop units outside of Germany
302
        len_old = len(units)
303
        units = units.loc[units.Land == "Deutschland"]
304
        logger.debug(
305
            f"{len_old - len(units)} units outside of Germany dropped..."
306
        )
307
308
        # get boundary
309
        boundary = (
310
            federal_state_data(geocoding_gdf.crs).dissolve().at[0, "geom"]
311
        )
312
313
        # filter for SH units if in testmode
314
        if not TESTMODE_OFF:
315
            logger.info(
316
                "TESTMODE: Dropping all units outside of Schleswig-Holstein..."
317
            )
318
            units = units.loc[units.Bundesland == "SchleswigHolstein"]
319
320
        # merge and rename voltage level
321
        logger.debug("Merging with locations and allocate voltage level...")
322
        units = units.merge(
323
            locations[["MaStRNummer", "Spannungsebene"]],
324
            left_on="LokationMastrNummer",
325
            right_on="MaStRNummer",
326
            how="left",
327
        )
328
        # convert voltage levels to numbers
329
        units["voltage_level"] = units.Spannungsebene.replace(vlevel_mapping)
330
        # set voltage level for nan values
331
        units = infer_voltage_level(units)
332
333
        # add geometry
334
        logger.debug("Adding geometries...")
335
        units = gpd.GeoDataFrame(
336
            units,
337
            geometry=gpd.points_from_xy(
338
                units["Laengengrad"], units["Breitengrad"], crs=4326
339
            ),
340
            crs=4326,
341
        )
342
343
        units["geometry_geocoded"] = (
344
            units.Laengengrad.isna() | units.Laengengrad.isna()
345
        )
346
347
        units.loc[~units.geometry_geocoded, "geometry_geocoded"] = ~units.loc[
348
            ~units.geometry_geocoded, "geometry"
349
        ].is_valid
350
351
        units_wo_geom = units["geometry_geocoded"].sum()
352
353
        logger.debug(
354
            f"{units_wo_geom}/{len(units)} units do not have a geometry!"
355
            " Adding geocoding results."
356
        )
357
358
        # determine zip and municipality string
359
        mask = (
360
            units.Postleitzahl.apply(isfloat)
361
            & ~units.Postleitzahl.isna()
362
            & ~units.Gemeinde.isna()
363
        )
364
        units["zip_and_municipality"] = np.nan
365
        ok_units = units.loc[mask]
366
367
        units.loc[mask, "zip_and_municipality"] = (
368
            ok_units.Postleitzahl.astype(int).astype(str).str.zfill(5)
369
            + " "
370
            + ok_units.Gemeinde.astype(str).str.rstrip().str.lstrip()
371
            + ", Deutschland"
372
        )
373
374
        # get zip and municipality from Standort
375
        parse_df = units.loc[~mask]
376
377
        if not parse_df.empty and "Standort" in parse_df.columns:
378
            init_len = len(parse_df)
379
380
            logger.info(
381
                f"Parsing ZIP code and municipality from Standort for "
382
                f"{init_len} values for {tech}."
383
            )
384
385
            parse_df[["zip_and_municipality", "drop_this"]] = (
386
                parse_df.Standort.astype(str)
387
                .apply(zip_and_municipality_from_standort)
388
                .tolist()
389
            )
390
391
            parse_df = parse_df.loc[parse_df.drop_this]
392
393
            if not parse_df.empty:
394
                units.loc[
395
                    parse_df.index, "zip_and_municipality"
396
                ] = parse_df.zip_and_municipality
397
398
        # add geocoding to missing
399
        units = units.merge(
400
            right=geocoding_gdf[["zip_and_municipality", "geometry"]].rename(
401
                columns={"geometry": "temp"}
402
            ),
403
            how="left",
404
            on="zip_and_municipality",
405
        )
406
407
        units.loc[units.geometry_geocoded, "geometry"] = units.loc[
408
            units.geometry_geocoded, "temp"
409
        ]
410
411
        init_len = len(units)
412
413
        logger.info(
414
            "Dropping units outside boundary by geometry or without geometry"
415
            "..."
416
        )
417
418
        units.dropna(subset=["geometry"], inplace=True)
419
420
        units = units.loc[units.geometry.within(boundary)]
421
422
        logger.debug(
423
            f"{init_len - len(units)}/{init_len} "
424
            f"({((init_len - len(units)) / init_len) * 100: g} %) dropped."
425
        )
426
427
        # drop unnecessary and rename columns
428
        logger.debug("Reformatting...")
429
        units.drop(
430
            columns=[
431
                "LokationMastrNummer",
432
                "MaStRNummer",
433
                "Laengengrad",
434
                "Breitengrad",
435
                "Spannungsebene",
436
                "Land",
437
                "temp",
438
            ],
439
            inplace=True,
440
        )
441
        mapping = cols_mapping["all"].copy()
442
        mapping.update(cols_mapping[tech])
443
        mapping.update({"geometry": "geom"})
444
        units.rename(columns=mapping, inplace=True)
445
        units["voltage_level"] = units.voltage_level.fillna(-1).astype(int)
446
447
        units.set_geometry("geom", inplace=True)
448
        units["id"] = range(0, len(units))
449
450
        # change capacity unit: kW to MW
451
        units["capacity"] = units["capacity"] / 1e3
452
        if "capacity_inverter" in units.columns:
453
            units["capacity_inverter"] = units["capacity_inverter"] / 1e3
454
        if "th_capacity" in units.columns:
455
            units["th_capacity"] = units["th_capacity"] / 1e3
456
457
        # assign bus ids
458
        logger.debug("Assigning bus ids...")
459
        units = units.assign(
460
            bus_id=units.loc[~units.geom.x.isna()]
461
            .sjoin(mv_grid_districts[["bus_id", "geom"]], how="left")
462
            .drop(columns=["index_right"])
463
            .bus_id
464
        )
465
        units["bus_id"] = units.bus_id.fillna(-1).astype(int)
466
467
        # write to DB
468
        logger.info(f"Writing {len(units)} units to DB...")
469
470
        units.to_postgis(
471
            name=target_tables[tech].__tablename__,
472
            con=engine,
473
            if_exists="append",
474
            schema=target_tables[tech].__table_args__["schema"],
475
        )
476