Passed
Pull Request — dev (#1112)
by
unknown
01:55
created

zip_and_municipality_from_standort()   A

Complexity

Conditions 5

Size

Total Lines 41
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 18
dl 0
loc 41
rs 9.0333
c 0
b 0
f 0
cc 5
nop 1
"""Import MaStR dataset and write to DB tables

Data dump from Marktstammdatenregister (2022-11-17) is imported into the
database. Only some technologies are taken into account and written to the
following tables:

* PV: table `supply.egon_power_plants_pv`
* wind turbines: table `supply.egon_power_plants_wind`
* biomass/biogas plants: table `supply.egon_power_plants_biomass`
* hydro plants: table `supply.egon_power_plants_hydro`

Combustion, GSGK, nuclear and storage units are imported as well and written
to the tables of `EgonPowerPlantsCombustion`, `EgonPowerPlantsGsgk`,
`EgonPowerPlantsNuclear` and `EgonPowerPlantsStorage`.

Handling of empty source data in MaStR dump:

* `voltage_level`: inferred based on nominal power (`capacity`) using the
  ranges from
  https://redmine.iks.cs.ovgu.de/oe/projects/ego-n/wiki/Definition_of_thresholds_for_voltage_level_assignment
  which results in True in column `voltage_level_inferred`. Remaining datasets
  are set to -1 (which only occurs if `capacity` is empty).
* `supply.egon_power_plants_*.bus_id`: set to -1 (only if not within grid
  districts or no geom available, e.g. for units with nom. power <30 kW)
* `supply.egon_power_plants_hydro.plant_type`: NaN

The data is used especially for the generation of status quo grids by ding0.
"""
24
from __future__ import annotations
25
26
from pathlib import Path
27
28
from loguru import logger
29
import geopandas as gpd
30
import numpy as np
31
import pandas as pd
32
33
from egon.data import config, db
34
from egon.data.datasets.mastr import WORKING_DIR_MASTR_NEW
35
from egon.data.datasets.power_plants.mastr_db_classes import (
36
    EgonMastrGeocoded,
37
    EgonPowerPlantsBiomass,
38
    EgonPowerPlantsCombustion,
39
    EgonPowerPlantsGsgk,
40
    EgonPowerPlantsHydro,
41
    EgonPowerPlantsNuclear,
42
    EgonPowerPlantsPv,
43
    EgonPowerPlantsStorage,
44
    EgonPowerPlantsWind,
45
)
46
from egon.data.datasets.power_plants.pv_rooftop_buildings import (
47
    federal_state_data,
48
)
49
50
# True when the configured dataset boundary is "Everything". Any other
# boundary makes `import_mastr` run in test mode, where only units located in
# Schleswig-Holstein are kept.
TESTMODE_OFF = (
    config.settings()["egon-data"]["--dataset-boundary"] == "Everything"
)
53
54
55
def isfloat(num: str):
    """
    Determine if string can be converted to float.

    Parameters
    -----------
    num : str
        String to parse.

    Returns
    -------
    bool
        True if `num` can be parsed to float, False otherwise. Objects that
        `float()` rejects with a TypeError (e.g. None) also yield False
        instead of raising.
    """
    try:
        float(num)
    except (TypeError, ValueError):
        # ValueError: unparsable string; TypeError: unsupported object type
        return False
    return True
72
73
74
def zip_and_municipality_from_standort(
    standort: str,
) -> tuple[str, bool]:
    """
    Get zip code and municipality from Standort string.

    The string is split on whitespace and searched for the first token that
    consists of exactly five ASCII digits, which is taken to be the zip code.
    Everything from that token onwards is returned.

    Parameters
    -----------
    standort : str
        Standort as given from MaStR data.

    Returns
    -------
    tuple of (str, bool)
        If a zip code was found: the Standort reduced to the zip code and
        everything following it (joined by single spaces) and True.
        Otherwise: the unchanged Standort and False; a warning is logged in
        this case.

    Notes
    -----
    No country suffix is appended to the returned string.
    NOTE(review): `import_mastr` appends ", Deutschland" to the strings it
    builds from Postleitzahl and Gemeinde - confirm that the strings returned
    here are expected to match the geocoding keys without such a suffix.
    """
    standort_list = standort.split()

    for count, elem in enumerate(standort_list):
        # `isascii` excludes non-ASCII numerics (e.g. circled or full-width
        # digits) which `str.isnumeric`/`str.isdigit` alone would accept.
        if len(elem) == 5 and elem.isascii() and elem.isdigit():
            return " ".join(standort_list[count:]), True

    logger.warning(
        "Couldn't identify zip code. This entry will be dropped."
        f" Original standort: {standort}."
    )

    return standort, False
115
116
117
def infer_voltage_level(
    units_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Infer missing voltage levels of units from their generator capacity.

    A new boolean column `voltage_level_inferred` is added. For every unit
    whose `voltage_level` is missing and whose `Nettonennleistung` is
    available, the voltage level is derived from `Nettonennleistung` and
    `voltage_level_inferred` is set to True. Units with neither voltage level
    nor capacity keep a missing voltage level and are not flagged as
    inferred.

    Parameters
    -----------
    units_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing units with voltage levels from MaStR. Needs
        the columns `voltage_level` and `Nettonennleistung`. It is modified
        in place.

    Returns
    -------
    geopandas.GeoDataFrame
        The passed GeoDataFrame with voltage levels filled where they could
        be inferred and the additional column `voltage_level_inferred`.
    """
    # (upper capacity bound, voltage level), checked in ascending order
    thresholds = (
        (100, 7),
        (200, 6),
        (5500, 5),
        (20000, 4),
        (120000, 3),
    )

    def voltage_levels(p: float) -> int:
        for upper_bound, level in thresholds:
            if p <= upper_bound:
                return level
        return 1

    units_gdf["voltage_level_inferred"] = False

    # Only infer where a capacity exists: comparisons with NaN are always
    # False, so a missing capacity would otherwise fall through to level 1.
    mask = units_gdf.voltage_level.isna() & units_gdf.Nettonennleistung.notna()

    units_gdf.loc[mask, "voltage_level_inferred"] = True
    units_gdf.loc[mask, "voltage_level"] = units_gdf.loc[
        mask
    ].Nettonennleistung.apply(voltage_levels)

    return units_gdf
155
156
157
def import_mastr() -> None:
    """Import MaStR data into database

    Steps performed:

    * The geocoding file found in the configured geocoding directory is
      read, entries with invalid geometry are removed and the result is
      written to the (re-created) table of `EgonMastrGeocoded`.
    * For each technology the unit CSV is read, units outside of Germany
      (and, in test mode, outside of Schleswig-Holstein) are dropped and the
      voltage level is taken from the locations file or inferred via
      `infer_voltage_level`.
    * Units without usable coordinates get the geometry of the geocoding
      entry matching their zip code and municipality string.
    * Units without geometry or outside of the boundary are dropped, columns
      are renamed, capacities are divided by 1e3 (kW to MW), bus ids of the
      MV grid districts are assigned (-1 if none) and the units are appended
      to the technology's target table.

    Raises
    ------
    AssertionError
        If the deposit ID encoded in the geocoding file name does not match
        the configured deposit ID of the MaStR dataset.
    """
    engine = db.engine()

    # import geocoded data
    cfg = config.datasets()["mastr_new"]
    path_parts = cfg["geocoding_path"]
    path = Path(*["."] + path_parts).resolve()
    # use the first entry found in the geocoding directory
    path = list(path.iterdir())[0]

    # the deposit ID is the last "_"-separated part of the file name stem
    deposit_id_geocoding = int(path.parts[-1].split(".")[0].split("_")[-1])
    deposit_id_mastr = cfg["deposit_id"]

    if deposit_id_geocoding != deposit_id_mastr:
        raise AssertionError(
            f"The zenodo (sandbox) deposit ID {deposit_id_mastr} for the MaStR"
            f" dataset is not matching with the geocoding version "
            f"{deposit_id_geocoding}. Make sure to hermonize the data. When "
            f"the MaStR dataset is updated also update the geocoding and "
            f"update the egon data bundle. The geocoding can be done using: "
            f"https://github.com/RLI-sandbox/mastr-geocoding"
        )

    geocoding_gdf = gpd.read_file(path)

    # remove failed requests
    geocoding_gdf = geocoding_gdf.loc[geocoding_gdf.geometry.is_valid]

    EgonMastrGeocoded.__table__.drop(bind=engine, checkfirst=True)
    EgonMastrGeocoded.__table__.create(bind=engine, checkfirst=True)

    geocoding_gdf.to_postgis(
        name=EgonMastrGeocoded.__tablename__,
        con=engine,
        if_exists="append",
        schema=EgonMastrGeocoded.__table_args__["schema"],
        index=True,
    )

    cfg = config.datasets()["power_plants"]

    # MaStR column name -> DB column name; "all" applies to every technology
    cols_mapping = {
        "all": {
            "EinheitMastrNummer": "gens_id",
            "EinheitBetriebsstatus": "status",
            "Inbetriebnahmedatum": "commissioning_date",
            "Postleitzahl": "postcode",
            "Ort": "city",
            "Gemeinde": "municipality",
            "Bundesland": "federal_state",
            "Nettonennleistung": "capacity",
            "Einspeisungsart": "feedin_type",
        },
        "pv": {
            "Lage": "site_type",
            "Standort": "site",
            "Nutzungsbereich": "usage_sector",
            "Hauptausrichtung": "orientation_primary",
            "HauptausrichtungNeigungswinkel": "orientation_primary_angle",
            "Nebenausrichtung": "orientation_secondary",
            "NebenausrichtungNeigungswinkel": "orientation_secondary_angle",
            "EinheitlicheAusrichtungUndNeigungswinkel": "orientation_uniform",
            "AnzahlModule": "module_count",
            "zugeordneteWirkleistungWechselrichter": "capacity_inverter",
        },
        "wind": {
            "Lage": "site_type",
            "Hersteller": "manufacturer_name",
            "Typenbezeichnung": "type_name",
            "Nabenhoehe": "hub_height",
            "Rotordurchmesser": "rotor_diameter",
        },
        "biomass": {
            "Technologie": "technology",
            "Hauptbrennstoff": "main_fuel",
            "Biomasseart": "fuel_type",
            "ThermischeNutzleistung": "th_capacity",
        },
        "hydro": {
            "ArtDerWasserkraftanlage": "plant_type",
            "ArtDesZuflusses": "water_origin",
        },
        "combustion": {
            "Energietraeger": "carrier",
            "Hauptbrennstoff": "main_fuel",
            "WeitererHauptbrennstoff": "other_main_fuel",
            "Technologie": "technology",
            "ThermischeNutzleistung": "th_capacity",
        },
        "gsgk": {
            "Energietraeger": "carrier",
            "Technologie": "technology",
        },
        "nuclear": {
            "Energietraeger": "carrier",
            "Technologie": "technology",
        },
        "storage": {
            "Energietraeger": "carrier",
            "Technologie": "technology",
            "Batterietechnologie": "battery_type",
            "Pumpspeichertechnologie": "pump_storage_type",
        },
    }

    source_files = {
        "pv": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_pv"],
        "wind": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_wind"],
        "biomass": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_biomass"],
        "hydro": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_hydro"],
        "combustion": WORKING_DIR_MASTR_NEW
        / cfg["sources"]["mastr_combustion"],
        "gsgk": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_gsgk"],
        "nuclear": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_nuclear"],
        "storage": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_storage"],
    }

    target_tables = {
        "pv": EgonPowerPlantsPv,
        "wind": EgonPowerPlantsWind,
        "biomass": EgonPowerPlantsBiomass,
        "hydro": EgonPowerPlantsHydro,
        "combustion": EgonPowerPlantsCombustion,
        "gsgk": EgonPowerPlantsGsgk,
        "nuclear": EgonPowerPlantsNuclear,
        "storage": EgonPowerPlantsStorage,
    }

    vlevel_mapping = {
        "Höchstspannung": 1,
        "UmspannungZurHochspannung": 2,
        "Hochspannung": 3,
        "UmspannungZurMittelspannung": 4,
        "Mittelspannung": 5,
        "UmspannungZurNiederspannung": 6,
        "Niederspannung": 7,
    }

    # import locations
    locations = pd.read_csv(
        WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_location"],
        index_col=None,
    )

    # import grid districts
    mv_grid_districts = db.select_geodataframe(
        f"""
        SELECT * FROM {cfg['sources']['egon_mv_grid_district']}
        """,
        epsg=4326,
    )

    # get boundary (identical for all technologies, hence determined once)
    boundary = federal_state_data(geocoding_gdf.crs).dissolve().at[0, "geom"]

    # import units
    technologies = [
        "pv",
        "wind",
        "biomass",
        "hydro",
        "combustion",
        "gsgk",
        "nuclear",
        "storage",
    ]

    for tech in technologies:
        # read units
        logger.info(f"===== Importing MaStR dataset: {tech} =====")
        logger.debug("Reading CSV and filtering data...")
        # Columns keep their MaStR names here as the following steps refer
        # to them; renaming to DB column names happens further below.
        units = pd.read_csv(
            source_files[tech],
            usecols=(
                ["LokationMastrNummer", "Laengengrad", "Breitengrad", "Land"]
                + list(cols_mapping["all"].keys())
                + list(cols_mapping[tech].keys())
            ),
            index_col=None,
            dtype={"Postleitzahl": str},
        )

        # drop units outside of Germany
        len_old = len(units)
        units = units.loc[units.Land == "Deutschland"]
        logger.debug(
            f"{len_old - len(units)} units outside of Germany dropped..."
        )

        # filter for SH units if in testmode
        if not TESTMODE_OFF:
            logger.info(
                "TESTMODE: Dropping all units outside of Schleswig-Holstein..."
            )
            units = units.loc[units.Bundesland == "SchleswigHolstein"]

        # merge and rename voltage level
        logger.debug("Merging with locations and allocate voltage level...")
        units = units.merge(
            locations[["MaStRNummer", "Spannungsebene"]],
            left_on="LokationMastrNummer",
            right_on="MaStRNummer",
            how="left",
        )
        # convert voltage levels to numbers
        units["voltage_level"] = units.Spannungsebene.replace(vlevel_mapping)
        # set voltage level for nan values
        units = infer_voltage_level(units)

        # add geometry
        logger.debug("Adding geometries...")
        units = gpd.GeoDataFrame(
            units,
            geometry=gpd.points_from_xy(
                units["Laengengrad"], units["Breitengrad"], crs=4326
            ),
            crs=4326,
        )

        # flag units whose geometry has to be taken from the geocoding:
        # first those lacking longitude or latitude ...
        units["geometry_geocoded"] = (
            units.Laengengrad.isna() | units.Breitengrad.isna()
        )

        # ... then those whose point geometry is invalid
        units.loc[~units.geometry_geocoded, "geometry_geocoded"] = ~units.loc[
            ~units.geometry_geocoded, "geometry"
        ].is_valid

        units_wo_geom = units["geometry_geocoded"].sum()

        logger.debug(
            f"{units_wo_geom}/{len(units)} units do not have a geometry!"
            " Adding geocoding results."
        )

        # determine zip and municipality string
        mask = (
            units.Postleitzahl.apply(isfloat)
            & ~units.Postleitzahl.isna()
            & ~units.Gemeinde.isna()
        )
        units["zip_and_municipality"] = np.nan
        ok_units = units.loc[mask]

        units.loc[mask, "zip_and_municipality"] = (
            ok_units.Postleitzahl.astype(int).astype(str).str.zfill(5)
            + " "
            + ok_units.Gemeinde.astype(str).str.rstrip().str.lstrip()
            + ", Deutschland"
        )

        # get zip and municipality from Standort
        # (explicit copy as columns are added to the subset below)
        parse_df = units.loc[~mask].copy()

        if not parse_df.empty and "Standort" in parse_df.columns:
            init_len = len(parse_df)

            logger.info(
                f"Parsing ZIP code and municipality from Standort for "
                f"{init_len} values for {tech}."
            )

            parse_df[["zip_and_municipality", "drop_this"]] = (
                parse_df.Standort.astype(str)
                .apply(zip_and_municipality_from_standort)
                .tolist()
            )

            # keep only the entries for which a zip code was found
            parse_df = parse_df.loc[parse_df.drop_this]

            if not parse_df.empty:
                units.loc[
                    parse_df.index, "zip_and_municipality"
                ] = parse_df.zip_and_municipality

        # add geocoding to missing
        units = units.merge(
            right=geocoding_gdf[["zip_and_municipality", "geometry"]].rename(
                columns={"geometry": "temp"}
            ),
            how="left",
            on="zip_and_municipality",
        )

        units.loc[units.geometry_geocoded, "geometry"] = units.loc[
            units.geometry_geocoded, "temp"
        ]

        init_len = len(units)

        logger.info(
            "Dropping units outside boundary by geometry or without geometry"
            "..."
        )

        units.dropna(subset=["geometry"], inplace=True)

        units = units.loc[units.geometry.within(boundary)]

        # avoid a ZeroDivisionError if there were no units to begin with
        dropped = init_len - len(units)
        dropped_share = (dropped / init_len) * 100 if init_len else 0.0

        logger.debug(
            f"{dropped}/{init_len} " f"({dropped_share: g} %) dropped."
        )

        # drop unnecessary and rename columns
        logger.debug("Reformatting...")
        units.drop(
            columns=[
                "LokationMastrNummer",
                "MaStRNummer",
                "Laengengrad",
                "Breitengrad",
                "Spannungsebene",
                "Land",
                "temp",
            ],
            inplace=True,
        )
        mapping = cols_mapping["all"].copy()
        mapping.update(cols_mapping[tech])
        mapping.update({"geometry": "geom"})
        units.rename(columns=mapping, inplace=True)
        units["voltage_level"] = units.voltage_level.fillna(-1).astype(int)

        units.set_geometry("geom", inplace=True)
        units["id"] = range(0, len(units))

        # change capacity unit: kW to MW
        units["capacity"] = units["capacity"] / 1e3
        if "capacity_inverter" in units.columns:
            units["capacity_inverter"] = units["capacity_inverter"] / 1e3
        if "th_capacity" in units.columns:
            units["th_capacity"] = units["th_capacity"] / 1e3

        # assign bus ids
        logger.debug("Assigning bus ids...")
        units = units.assign(
            bus_id=units.loc[~units.geom.x.isna()]
            .sjoin(mv_grid_districts[["bus_id", "geom"]], how="left")
            .drop(columns=["index_right"])
            .bus_id
        )
        # units not located in any grid district get bus id -1
        units["bus_id"] = units.bus_id.fillna(-1).astype(int)

        # write to DB
        logger.info(f"Writing {len(units)} units to DB...")

        units.to_postgis(
            name=target_tables[tech].__tablename__,
            con=engine,
            if_exists="append",
            schema=target_tables[tech].__table_args__["schema"],
        )
511