Passed
Pull Request — dev (#1112)
by
unknown
01:50
created

zip_and_municipality_from_standort()   A

Complexity

Conditions 5

Size

Total Lines 41
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 18
dl 0
loc 41
rs 9.0333
c 0
b 0
f 0
cc 5
nop 1
1
"""Import MaStR dataset and write to DB tables
2
3
Data dump from Marktstammdatenregister (2022-11-17) is imported into the
4
database. Only some technologies are taken into account and written to the
5
following tables:
6
7
* PV: table `supply.egon_power_plants_pv`
8
* wind turbines: table `supply.egon_power_plants_wind`
9
* biomass/biogas plants: table `supply.egon_power_plants_biomass`
10
* hydro plants: table `supply.egon_power_plants_hydro`
11
12
Handling of empty source data in MaStr dump:
13
* `voltage_level`: inferred based on nominal power (`capacity`) using the
14
  ranges from
15
  https://redmine.iks.cs.ovgu.de/oe/projects/ego-n/wiki/Definition_of_thresholds_for_voltage_level_assignment
16
  which results in True in column `voltage_level_inferred`. Remaining datasets
17
  are set to -1 (which only occurs if `capacity` is empty).
18
* `supply.egon_power_plants_*.bus_id`: set to -1 (only if not within grid
19
  districts or no geom available, e.g. for units with nom. power <30 kW)
20
* `supply.egon_power_plants_hydro.plant_type`: NaN
21
22
The data is used especially for the generation of status quo grids by ding0.
23
"""
24
from __future__ import annotations
25
26
from pathlib import Path
27
28
from loguru import logger
29
import geopandas as gpd
30
import numpy as np
31
import pandas as pd
32
33
from egon.data import config, db
34
from egon.data.datasets.mastr import WORKING_DIR_MASTR_NEW
35
from egon.data.datasets.power_plants.mastr_db_classes import (
36
    EgonMastrGeocoded,
37
    EgonPowerPlantsBiomass,
38
    EgonPowerPlantsCombustion,
39
    EgonPowerPlantsGsgk,
40
    EgonPowerPlantsHydro,
41
    EgonPowerPlantsNuclear,
42
    EgonPowerPlantsPv,
43
    EgonPowerPlantsStorage,
44
    EgonPowerPlantsWind,
45
)
46
from egon.data.datasets.power_plants.pv_rooftop_buildings import (
47
    federal_state_data,
48
)
49
50
# Dataset boundary configured for this run. Any value other than
# "Everything" means test mode, in which ``import_mastr`` keeps only
# Schleswig-Holstein units.
_DATASET_BOUNDARY = config.settings()["egon-data"]["--dataset-boundary"]
TESTMODE_OFF = _DATASET_BOUNDARY == "Everything"
53
54
55
def isfloat(num: str) -> bool:
    """
    Determine if a value can be converted to float.

    Parameters
    ----------
    num : str
        String to parse.

    Returns
    -------
    bool
        True if ``float(num)`` succeeds, False otherwise. Values that
        ``float`` rejects with a ``TypeError`` (e.g. ``None``) also give
        False instead of raising, so the function is safe to ``apply`` to
        columns with missing or non-string entries.
    """
    try:
        float(num)
    except (TypeError, ValueError):
        return False
    return True
72
73
74
def zip_and_municipality_from_standort(
    standort: str,
) -> tuple[str, bool]:
    """
    Get zip code and municipality from a MaStR ``Standort`` string.

    The string is split on whitespace. The first token made of exactly five
    numeric characters is taken as the zip code. That token and everything
    after it are kept, joined by single spaces.

    Parameters
    ----------
    standort : str
        Standort as given from MaStR data.

    Returns
    -------
    tuple[str, bool]
        ``(cleaned, True)`` if a zip code was found, where ``cleaned`` is the
        zip code followed by the remaining tokens (e.g.
        ``"12345 Musterstadt"``); no country suffix is appended.
        Otherwise ``(standort, False)`` with the unchanged input, and a
        warning is logged.
    """
    tokens = standort.split()

    # index of the first five-character numeric token, None if there is none
    zip_index = next(
        (
            idx
            for idx, token in enumerate(tokens)
            if len(token) == 5 and token.isnumeric()
        ),
        None,
    )

    if zip_index is not None:
        return " ".join(tokens[zip_index:]), True

    logger.warning(
        "Couldn't identify zip code. This entry will be dropped."
        f" Original standort: {standort}."
    )

    return standort, False
115
116
117
def infer_voltage_level(
    units_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Infer missing voltage levels of units from their nominal power.

    Units without a ``voltage_level`` but with a known ``Nettonennleistung``
    (nominal power in kW) get a level from these thresholds:

    ============================  =============
    Nettonennleistung ``p`` [kW]  voltage level
    ============================  =============
    p <= 100                      7
    100 < p <= 200                6
    200 < p <= 5500               5
    5500 < p <= 20000             4
    20000 < p <= 120000           3
    p > 120000                    1
    ============================  =============

    Units without a voltage level and without a nominal power keep a NaN
    voltage level.

    Parameters
    ----------
    units_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing units with voltage levels from MaStR. It
        must have the columns ``voltage_level`` and ``Nettonennleistung``.
        It is modified in place.

    Returns
    -------
    geopandas.GeoDataFrame
        The same GeoDataFrame with missing voltage levels filled where
        possible. It also has a boolean column ``voltage_level_inferred``
        that is True exactly for the rows whose level was inferred here.
    """
    # (inclusive upper bound in kW, voltage level), checked in order
    thresholds = ((100, 7), (200, 6), (5500, 5), (20000, 4), (120000, 3))

    def voltage_levels(p: float) -> int:
        for upper_bound, level in thresholds:
            if p <= upper_bound:
                return level
        return 1

    units_gdf["voltage_level_inferred"] = False
    # Only rows with a known capacity can be inferred. NaN compares False
    # against every threshold, so passing it to ``voltage_levels`` would
    # silently yield level 1 (extra-high voltage); such rows stay NaN.
    mask = (
        units_gdf.voltage_level.isna() & units_gdf.Nettonennleistung.notna()
    )
    units_gdf.loc[mask, "voltage_level_inferred"] = True
    units_gdf.loc[mask, "voltage_level"] = units_gdf.loc[
        mask, "Nettonennleistung"
    ].apply(voltage_levels)

    return units_gdf
155
156
157
def import_mastr() -> None:
    """Import MaStR data into database

    1. Read the geocoding results. The first entry of the configured
       ``geocoding_path`` directory is used. Check that they belong to the
       configured zenodo deposit and write the valid geometries to the table
       of ``EgonMastrGeocoded``, which is dropped and re-created first.
    2. For every technology:

       * read the MaStR unit CSV;
       * keep units located in Germany (only Schleswig-Holstein in test
         mode);
       * attach voltage levels from the location CSV and infer missing ones
         from capacity;
       * build point geometries from the coordinates, or from the geocoding
         results where the coordinates are missing or invalid;
       * drop units without geometry or outside the federal-state boundary;
       * rename columns and convert capacities from kW to MW;
       * assign MV grid district bus ids (-1 if none);
       * append the units to the technology's target table.

    Raises
    ------
    AssertionError
        If the deposit ID encoded in the geocoding file name does not match
        the configured MaStR deposit ID.
    """
    engine = db.engine()

    # import geocoded data
    cfg = config.datasets()["mastr_new"]
    path_parts = cfg["geocoding_path"]
    path = Path(*["."] + path_parts).resolve()
    # NOTE(review): takes the first directory entry; assumes the geocoding
    # directory contains exactly one file -- confirm.
    path = list(path.iterdir())[0]

    # The deposit ID is the last "_"-separated part of the file name before
    # its first ".".
    deposit_id_geocoding = int(path.parts[-1].split(".")[0].split("_")[-1])
    deposit_id_mastr = cfg["deposit_id"]

    if deposit_id_geocoding != deposit_id_mastr:
        raise AssertionError(
            f"The zenodo (sandbox) deposit ID {deposit_id_mastr} for the MaStR"
            f" dataset is not matching with the geocoding version "
            f"{deposit_id_geocoding}. Make sure to hermonize the data. When "
            f"the MaStR dataset is updated also update the geocoding and "
            f"update the egon data bundle. The geocoding can be done using: "
            f"https://github.com/RLI-sandbox/mastr-geocoding"
        )

    geocoding_gdf = gpd.read_file(path)

    # remove failed requests
    geocoding_gdf = geocoding_gdf.loc[geocoding_gdf.geometry.is_valid]

    EgonMastrGeocoded.__table__.drop(bind=engine, checkfirst=True)
    EgonMastrGeocoded.__table__.create(bind=engine, checkfirst=True)

    geocoding_gdf.to_postgis(
        name=EgonMastrGeocoded.__tablename__,
        con=engine,
        if_exists="append",
        schema=EgonMastrGeocoded.__table_args__["schema"],
        index=True,
    )

    cfg = config.datasets()["power_plants"]

    # MaStR column name -> target column name. The "all" mapping applies to
    # every technology, the others only to the technology of their key.
    cols_mapping = {
        "all": {
            "EinheitMastrNummer": "gens_id",
            "EinheitBetriebsstatus": "status",
            "Inbetriebnahmedatum": "commissioning_date",
            "Postleitzahl": "postcode",
            "Ort": "city",
            "Gemeinde": "municipality",
            "Bundesland": "federal_state",
            "Nettonennleistung": "capacity",
            "Einspeisungsart": "feedin_type",
        },
        "pv": {
            "Lage": "site_type",
            "Standort": "site",
            "Nutzungsbereich": "usage_sector",
            "Hauptausrichtung": "orientation_primary",
            "HauptausrichtungNeigungswinkel": "orientation_primary_angle",
            "Nebenausrichtung": "orientation_secondary",
            "NebenausrichtungNeigungswinkel": "orientation_secondary_angle",
            "EinheitlicheAusrichtungUndNeigungswinkel": "orientation_uniform",
            "AnzahlModule": "module_count",
            "zugeordneteWirkleistungWechselrichter": "capacity_inverter",
        },
        "wind": {
            "Lage": "site_type",
            "Hersteller": "manufacturer_name",
            "Typenbezeichnung": "type_name",
            "Nabenhoehe": "hub_height",
            "Rotordurchmesser": "rotor_diameter",
        },
        "biomass": {
            "Technologie": "technology",
            "Hauptbrennstoff": "main_fuel",
            "Biomasseart": "fuel_type",
            "ThermischeNutzleistung": "th_capacity",
        },
        "hydro": {
            "ArtDerWasserkraftanlage": "plant_type",
            "ArtDesZuflusses": "water_origin",
        },
        "combustion": {
            "Energietraeger": "carrier",
            "Hauptbrennstoff": "main_fuel",
            "WeitererHauptbrennstoff": "other_main_fuel",
            "Technologie": "technology",
            "ThermischeNutzleistung": "th_capacity",
        },
        "gsgk": {
            "Energietraeger": "carrier",
            "Technologie": "technology",
        },
        "nuclear": {
            "Energietraeger": "carrier",
            "Technologie": "technology",
        },
        "storage": {
            "Energietraeger": "carrier",
            "Technologie": "technology",
            "Batterietechnologie": "battery_type",
            "Pumpspeichertechnologie": "pump_storage_type",
        },
    }

    technologies = [
        "pv",
        "wind",
        "biomass",
        "hydro",
        "combustion",
        "gsgk",
        "nuclear",
        "storage",
    ]

    # Each source file is configured under the key "mastr_<technology>".
    source_files = {
        tech: WORKING_DIR_MASTR_NEW / cfg["sources"][f"mastr_{tech}"]
        for tech in technologies
    }

    target_tables = {
        "pv": EgonPowerPlantsPv,
        "wind": EgonPowerPlantsWind,
        "biomass": EgonPowerPlantsBiomass,
        "hydro": EgonPowerPlantsHydro,
        "combustion": EgonPowerPlantsCombustion,
        "gsgk": EgonPowerPlantsGsgk,
        "nuclear": EgonPowerPlantsNuclear,
        "storage": EgonPowerPlantsStorage,
    }

    # MaStR "Spannungsebene" -> numeric voltage level (1 = Hoechstspannung)
    vlevel_mapping = {
        "Höchstspannung": 1,
        "UmspannungZurHochspannung": 2,
        "Hochspannung": 3,
        "UmspannungZurMittelspannung": 4,
        "Mittelspannung": 5,
        "UmspannungZurNiederspannung": 6,
        "Niederspannung": 7,
    }

    # import locations
    locations = pd.read_csv(
        WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_location"],
        index_col=None,
    )

    # import grid districts
    mv_grid_districts = db.select_geodataframe(
        f"""
        SELECT * FROM {cfg['sources']['egon_mv_grid_district']}
        """,
        epsg=4326,
    )

    # The boundary only depends on the CRS of the geocoding data, so it is
    # computed once instead of once per technology.
    boundary = (
        federal_state_data(geocoding_gdf.crs).dissolve().at[0, "geom"]
    )

    # Geocoded geometries keyed by "zip_and_municipality". Missing keys are
    # removed (pandas matches NaN keys to each other in a merge) and
    # duplicate keys are dropped so the left merge cannot duplicate units.
    geocoding_lookup = (
        geocoding_gdf[["zip_and_municipality", "geometry"]]
        .dropna(subset=["zip_and_municipality"])
        .drop_duplicates(subset="zip_and_municipality")
        .rename(columns={"geometry": "temp"})
    )

    for tech in technologies:
        # read units
        logger.info(f"===== Importing MaStR dataset: {tech} =====")
        logger.debug("Reading CSV and filtering data...")
        # Columns keep their MaStR names until the rename further below; the
        # intermediate steps (e.g. ``infer_voltage_level``) rely on them.
        units = pd.read_csv(
            source_files[tech],
            usecols=(
                ["LokationMastrNummer", "Laengengrad", "Breitengrad", "Land"]
                + list(cols_mapping["all"].keys())
                + list(cols_mapping[tech].keys())
            ),
            index_col=None,
            dtype={"Postleitzahl": str},
            low_memory=False,
        )

        # drop units outside of Germany
        len_old = len(units)
        units = units.loc[units.Land == "Deutschland"]
        logger.debug(
            f"{len_old - len(units)} units outside of Germany dropped..."
        )

        # filter for SH units if in testmode
        if not TESTMODE_OFF:
            logger.info(
                "TESTMODE: Dropping all units outside of Schleswig-Holstein..."
            )
            units = units.loc[units.Bundesland == "SchleswigHolstein"]

        # merge and rename voltage level
        logger.debug("Merging with locations and allocate voltage level...")
        units = units.merge(
            locations[["MaStRNummer", "Spannungsebene"]],
            left_on="LokationMastrNummer",
            right_on="MaStRNummer",
            how="left",
        )
        # convert voltage levels to numbers
        units["voltage_level"] = units.Spannungsebene.replace(vlevel_mapping)
        # set voltage level for nan values
        units = infer_voltage_level(units)

        # add geometry
        logger.debug("Adding geometries...")
        units = gpd.GeoDataFrame(
            units,
            geometry=gpd.points_from_xy(
                units["Laengengrad"], units["Breitengrad"], crs=4326
            ),
            crs=4326,
        )

        # A unit needs geocoding if either coordinate is missing or if the
        # point built from its coordinates is invalid.
        units["geometry_geocoded"] = (
            units.Laengengrad.isna() | units.Breitengrad.isna()
        )
        has_coords = ~units.geometry_geocoded
        units.loc[has_coords, "geometry_geocoded"] = ~units.loc[
            has_coords, "geometry"
        ].is_valid

        units_wo_geom = units["geometry_geocoded"].sum()

        logger.debug(
            f"{units_wo_geom}/{len(units)} units do not have a geometry!"
            " Adding geocoding results."
        )

        # determine zip and municipality string from Postleitzahl/Gemeinde
        mask = (
            units.Postleitzahl.apply(isfloat)
            & ~units.Postleitzahl.isna()
            & ~units.Gemeinde.isna()
        )
        # Object dtype up front: strings are written into this column below,
        # and writing strings into a float (all-NaN) column is deprecated.
        units["zip_and_municipality"] = pd.Series(
            np.nan, index=units.index, dtype=object
        )
        ok_units = units.loc[mask]

        units.loc[mask, "zip_and_municipality"] = (
            # Convert via float: ``isfloat`` also admits strings such as
            # "12345.0", which a direct string-to-int ``astype`` rejects.
            ok_units.Postleitzahl.astype(float)
            .astype(int)
            .astype(str)
            .str.zfill(5)
            + " "
            + ok_units.Gemeinde.astype(str).str.strip()
            + ", Deutschland"
        )

        # get zip and municipality from Standort for the remaining units
        parse_df = units.loc[~mask]

        if not parse_df.empty and "Standort" in parse_df.columns:
            init_len = len(parse_df)

            logger.info(
                f"Parsing ZIP code and municipality from Standort for "
                f"{init_len} values for {tech}."
            )

            # Collect the parsed results in a separate frame instead of
            # writing new columns into the ``units.loc`` slice, which would
            # trigger pandas' SettingWithCopyWarning.
            parsed = pd.DataFrame(
                parse_df.Standort.astype(str)
                .apply(zip_and_municipality_from_standort)
                .tolist(),
                columns=["zip_and_municipality", "zip_found"],
                index=parse_df.index,
            )
            # keep only entries where a zip code was found
            parsed = parsed.loc[parsed.zip_found]

            # NOTE(review): unlike the Postleitzahl/Gemeinde branch above,
            # the parsed strings carry no ", Deutschland" suffix; confirm
            # that they match the key format of the geocoding data.
            if not parsed.empty:
                units.loc[
                    parsed.index, "zip_and_municipality"
                ] = parsed.zip_and_municipality

        # add geocoding to missing
        units = units.merge(
            right=geocoding_lookup,
            how="left",
            on="zip_and_municipality",
        )

        units.loc[units.geometry_geocoded, "geometry"] = units.loc[
            units.geometry_geocoded, "temp"
        ]

        init_len = len(units)

        logger.info(
            "Dropping units outside boundary by geometry or without geometry"
            "..."
        )

        units.dropna(subset=["geometry"], inplace=True)

        units = units.loc[units.geometry.within(boundary)]

        n_dropped = init_len - len(units)
        # guard against ZeroDivisionError when there are no units for tech
        share_dropped = n_dropped / init_len * 100 if init_len else 0.0
        logger.debug(
            f"{n_dropped}/{init_len} ({share_dropped: g} %) dropped."
        )

        # drop unnecessary and rename columns
        logger.debug("Reformatting...")
        units.drop(
            columns=[
                "LokationMastrNummer",
                "MaStRNummer",
                "Laengengrad",
                "Breitengrad",
                "Spannungsebene",
                "Land",
                "temp",
            ],
            inplace=True,
        )
        mapping = cols_mapping["all"].copy()
        mapping.update(cols_mapping[tech])
        mapping.update({"geometry": "geom"})
        units.rename(columns=mapping, inplace=True)
        units["voltage_level"] = units.voltage_level.fillna(-1).astype(int)

        units.set_geometry("geom", inplace=True)
        units["id"] = range(0, len(units))

        # change capacity unit: kW to MW
        units["capacity"] = units["capacity"] / 1e3
        if "capacity_inverter" in units.columns:
            units["capacity_inverter"] = units["capacity_inverter"] / 1e3
        if "th_capacity" in units.columns:
            units["th_capacity"] = units["th_capacity"] / 1e3

        # assign bus ids
        logger.debug("Assigning bus ids...")
        bus_ids = (
            units.loc[~units.geom.x.isna()]
            .sjoin(mv_grid_districts[["bus_id", "geom"]], how="left")
            .bus_id
        )
        # A unit on the border of two grid districts matches both; keep the
        # first match so that the assignment below sees a unique index.
        bus_ids = bus_ids[~bus_ids.index.duplicated(keep="first")]
        units = units.assign(bus_id=bus_ids)
        units["bus_id"] = units.bus_id.fillna(-1).astype(int)

        # write to DB
        logger.info(f"Writing {len(units)} units to DB...")

        units.to_postgis(
            name=target_tables[tech].__tablename__,
            con=engine,
            if_exists="append",
            schema=target_tables[tech].__table_args__["schema"],
        )
512