Passed
Pull Request — dev (#1112)
by
unknown
01:52
created

zip_and_municipality_from_standort()   A

Complexity

Conditions 5

Size

Total Lines 41
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 18
dl 0
loc 41
rs 9.0333
c 0
b 0
f 0
cc 5
nop 1
1
"""Import MaStR dataset and write to DB tables
2
3
Data dump from Marktstammdatenregister (2022-11-17) is imported into the
4
database. Only some technologies are taken into account and written to the
5
following tables:
6
7
* PV: table `supply.egon_power_plants_pv`
8
* wind turbines: table `supply.egon_power_plants_wind`
9
* biomass/biogas plants: table `supply.egon_power_plants_biomass`
10
* hydro plants: table `supply.egon_power_plants_hydro`
11
12
Handling of empty source data in MaStR dump:
13
* `voltage_level`: inferred based on nominal power (`capacity`) using the
14
  ranges from
15
  https://redmine.iks.cs.ovgu.de/oe/projects/ego-n/wiki/Definition_of_thresholds_for_voltage_level_assignment
16
  which results in True in column `voltage_level_inferred`. Remaining datasets
17
  are set to -1 (which only occurs if `capacity` is empty).
18
* `supply.egon_power_plants_*.bus_id`: set to -1 (only if not within grid
19
  districts or no geom available, e.g. for units with nom. power <30 kW)
20
* `supply.egon_power_plants_hydro.plant_type`: NaN
21
22
The data is used especially for the generation of status quo grids by ding0.
23
"""
24
from __future__ import annotations
25
26
from pathlib import Path
27
28
from loguru import logger
29
import geopandas as gpd
30
import numpy as np
31
import pandas as pd
32
33
from egon.data import config, db
34
from egon.data.datasets.mastr import WORKING_DIR_MASTR_NEW
35
from egon.data.datasets.power_plants.mastr_db_classes import (
36
    EgonMastrGeocoded,
37
    EgonPowerPlantsBiomass,
38
    EgonPowerPlantsCombustion,
39
    EgonPowerPlantsGsgk,
40
    EgonPowerPlantsHydro,
41
    EgonPowerPlantsPv,
42
    EgonPowerPlantsWind,
43
)
44
from egon.data.datasets.power_plants.pv_rooftop_buildings import (
45
    federal_state_data,
46
)
47
48
# True when the configured dataset boundary is "Everything"; import_mastr()
# restricts the import to Schleswig-Holstein whenever this flag is False.
_dataset_boundary = config.settings()["egon-data"]["--dataset-boundary"]
TESTMODE_OFF = _dataset_boundary == "Everything"
51
52
53
def isfloat(num: str):
    """
    Determine if a value can be converted to float.

    Parameters
    -----------
    num : str
        String to parse.

    Returns
    -------
    bool
        True if `num` can be parsed by ``float()``, False otherwise. Note
        that ``float()`` also accepts strings such as "nan" and "inf".
    """
    try:
        float(num)
    except (TypeError, ValueError):
        # ValueError: string is not a number; TypeError: input is not a
        # string or number at all (e.g. None).
        return False
    return True
70
71
72
def zip_and_municipality_from_standort(
    standort: str,
) -> tuple[str, bool]:
    """
    Get zip code and municipality from a Standort string.

    The string is split on whitespace and searched for the first token that
    consists of exactly five ASCII digits. Everything from that token onwards
    is returned.

    Parameters
    -----------
    standort : str
        Standort as given from MaStR data.

    Returns
    -------
    tuple of (str, bool)
        If a zip code was found: the part of `standort` starting at the zip
        code (tokens re-joined with single spaces) and True.
        Otherwise: the unchanged `standort` and False; a warning is logged.
    """
    tokens = standort.split()

    for pos, token in enumerate(tokens):
        # Only plain ASCII digits qualify; str.isnumeric() would also accept
        # characters such as superscripts or fractions.
        if len(token) == 5 and token.isascii() and token.isdigit():
            return " ".join(tokens[pos:]), True

    logger.warning(
        "Couldn't identify zip code. This entry will be dropped."
        f" Original standort: {standort}."
    )

    return standort, False
113
114
115
def infer_voltage_level(
    units_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Infer missing voltage levels from the units' nominal power.

    Rows whose `voltage_level` is NaN and whose `Nettonennleistung` is
    available get a voltage level derived from fixed power thresholds and are
    flagged in the new column `voltage_level_inferred`. Rows missing both
    values keep a NaN voltage level and are not flagged, so the caller can
    handle them separately.

    Parameters
    -----------
    units_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing units with voltage levels from MaStR. Needs
        the columns `voltage_level` and `Nettonennleistung`. It is modified
        in place.

    Returns
    -------
    geopandas.GeoDataFrame
        The same GeoDataFrame with inferred voltage levels and the boolean
        column `voltage_level_inferred`.
    """
    # (upper power bound, voltage level); anything above the last bound is
    # assigned level 1. Level 2 is never inferred.
    thresholds = (
        (100, 7),
        (200, 6),
        (5500, 5),
        (20000, 4),
        (120000, 3),
    )

    def voltage_levels(p: float) -> int:
        for upper_bound, level in thresholds:
            if p <= upper_bound:
                return level
        return 1

    units_gdf["voltage_level_inferred"] = False

    # NaN power compares False against every threshold and would wrongly end
    # up in level 1, so only infer where a power value exists.
    mask = (
        units_gdf.voltage_level.isna() & units_gdf.Nettonennleistung.notna()
    )
    units_gdf.loc[mask, "voltage_level_inferred"] = True
    units_gdf.loc[mask, "voltage_level"] = units_gdf.loc[
        mask, "Nettonennleistung"
    ].apply(voltage_levels)

    return units_gdf
153
154
155
def import_mastr() -> None:
    """Import MaStR data into database.

    Steps:

    * Read the geocoding results, check that their deposit ID matches the
      configured MaStR deposit ID and (re)create the geocoding table.
    * For each technology (pv, wind, biomass, hydro, combustion, gsgk):
      read the unit CSV, keep units located in Germany (only
      Schleswig-Holstein in test mode), attach the voltage level from the
      locations file (inferring missing ones), build point geometries,
      fall back to geocoded geometries where coordinates are missing or
      invalid, drop units without geometry or outside the boundary, assign
      the bus id of the MV grid district and append the result to the
      technology's table.

    Raises
    ------
    AssertionError
        If the deposit ID of the geocoding file does not match the configured
        MaStR deposit ID.
    """
    engine = db.engine()

    # import geocoded data
    cfg = config.datasets()["mastr_new"]
    path_parts = cfg["geocoding_path"]
    path = Path(*["."] + path_parts).resolve()
    # the first entry found in the geocoding directory is used
    path = list(path.iterdir())[0]

    # file name is expected to end with "_<deposit id>.<suffix>"
    deposit_id_geocoding = int(path.parts[-1].split(".")[0].split("_")[-1])
    deposit_id_mastr = cfg["deposit_id"]

    if deposit_id_geocoding != deposit_id_mastr:
        raise AssertionError(
            f"The zenodo (sandbox) deposit ID {deposit_id_mastr} for the MaStR"
            f" dataset is not matching with the geocoding version "
            f"{deposit_id_geocoding}. Make sure to hermonize the data. When "
            f"the MaStR dataset is updated also update the geocoding and "
            f"update the egon data bundle. The geocoding can be done using: "
            f"https://github.com/RLI-sandbox/mastr-geocoding"
        )

    geocoding_gdf = gpd.read_file(path)

    # remove failed requests
    geocoding_gdf = geocoding_gdf.loc[geocoding_gdf.geometry.is_valid]

    EgonMastrGeocoded.__table__.drop(bind=engine, checkfirst=True)
    EgonMastrGeocoded.__table__.create(bind=engine, checkfirst=True)

    geocoding_gdf.to_postgis(
        name=EgonMastrGeocoded.__tablename__,
        con=engine,
        if_exists="append",
        schema=EgonMastrGeocoded.__table_args__["schema"],
        index=True,
    )

    cfg = config.datasets()["power_plants"]

    cols_mapping = {
        "all": {
            "EinheitMastrNummer": "gens_id",
            "EinheitBetriebsstatus": "status",
            "Inbetriebnahmedatum": "commissioning_date",
            "Postleitzahl": "postcode",
            "Ort": "city",
            "Gemeinde": "municipality",
            "Bundesland": "federal_state",
            "Nettonennleistung": "capacity",
            "Einspeisungsart": "feedin_type",
        },
        "pv": {
            "Lage": "site_type",
            "Standort": "site",
            "Nutzungsbereich": "usage_sector",
            "Hauptausrichtung": "orientation_primary",
            "HauptausrichtungNeigungswinkel": "orientation_primary_angle",
            "Nebenausrichtung": "orientation_secondary",
            "NebenausrichtungNeigungswinkel": "orientation_secondary_angle",
            "EinheitlicheAusrichtungUndNeigungswinkel": "orientation_uniform",
            "AnzahlModule": "module_count",
            "zugeordneteWirkleistungWechselrichter": "capacity_inverter",
        },
        "wind": {
            "Lage": "site_type",
            "Hersteller": "manufacturer_name",
            "Typenbezeichnung": "type_name",
            "Nabenhoehe": "hub_height",
            "Rotordurchmesser": "rotor_diameter",
        },
        "biomass": {
            "Technologie": "technology",
            "Hauptbrennstoff": "main_fuel",
            "Biomasseart": "fuel_type",
            "ThermischeNutzleistung": "th_capacity",
        },
        "hydro": {
            "ArtDerWasserkraftanlage": "plant_type",
            "ArtDesZuflusses": "water_origin",
        },
        "combustion": {
            "Energietraeger": "carrier",
            "Hauptbrennstoff": "main_fuel",
            "WeitererHauptbrennstoff": "other_main_fuel",
            "Technologie": "technology",
            "ThermischeNutzleistung": "th_capacity",
        },
        "gsgk": {
            "Energietraeger": "carrier",
            "Technologie": "technology",
        },
    }

    source_files = {
        "pv": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_pv"],
        "wind": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_wind"],
        "biomass": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_biomass"],
        "hydro": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_hydro"],
        "combustion": WORKING_DIR_MASTR_NEW
        / cfg["sources"]["mastr_combustion"],
        "gsgk": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_gsgk"],
    }
    target_tables = {
        "pv": EgonPowerPlantsPv,
        "wind": EgonPowerPlantsWind,
        "biomass": EgonPowerPlantsBiomass,
        "hydro": EgonPowerPlantsHydro,
        "combustion": EgonPowerPlantsCombustion,
        "gsgk": EgonPowerPlantsGsgk,
    }
    vlevel_mapping = {
        "Höchstspannung": 1,
        "UmspannungZurHochspannung": 2,
        "Hochspannung": 3,
        "UmspannungZurMittelspannung": 4,
        "Mittelspannung": 5,
        "UmspannungZurNiederspannung": 6,
        "Niederspannung": 7,
    }

    # import locations
    locations = pd.read_csv(
        WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_location"],
        index_col=None,
    )

    # import grid districts
    mv_grid_districts = db.select_geodataframe(
        f"""
        SELECT * FROM {cfg['sources']['egon_mv_grid_district']}
        """,
        epsg=4326,
    )

    # get boundary (identical for all technologies, hence computed once)
    boundary = federal_state_data(geocoding_gdf.crs).dissolve().at[0, "geom"]

    # import units
    technologies = ["pv", "wind", "biomass", "hydro", "combustion", "gsgk"]
    for tech in technologies:
        # read units
        logger.info(f"===== Importing MaStR dataset: {tech} =====")
        logger.debug("Reading CSV and filtering data...")
        # Columns keep their MaStR names here; they are renamed further
        # below once all processing on the original names is done.
        units = pd.read_csv(
            source_files[tech],
            usecols=(
                ["LokationMastrNummer", "Laengengrad", "Breitengrad", "Land"]
                + list(cols_mapping["all"].keys())
                + list(cols_mapping[tech].keys())
            ),
            index_col=None,
            dtype={"Postleitzahl": str},
        )

        # drop units outside of Germany
        len_old = len(units)
        units = units.loc[units.Land == "Deutschland"]
        logger.debug(
            f"{len_old - len(units)} units outside of Germany dropped..."
        )

        # filter for SH units if in testmode
        if not TESTMODE_OFF:
            logger.info(
                "TESTMODE: Dropping all units outside of Schleswig-Holstein..."
            )
            units = units.loc[units.Bundesland == "SchleswigHolstein"]

        # merge and rename voltage level
        logger.debug("Merging with locations and allocate voltage level...")
        units = units.merge(
            locations[["MaStRNummer", "Spannungsebene"]],
            left_on="LokationMastrNummer",
            right_on="MaStRNummer",
            how="left",
        )
        # convert voltage levels to numbers
        units["voltage_level"] = units.Spannungsebene.replace(vlevel_mapping)
        # set voltage level for nan values
        units = infer_voltage_level(units)

        # add geometry
        logger.debug("Adding geometries...")
        units = gpd.GeoDataFrame(
            units,
            geometry=gpd.points_from_xy(
                units["Laengengrad"], units["Breitengrad"], crs=4326
            ),
            crs=4326,
        )

        # flag units that need a geocoded geometry: either coordinate is
        # missing ...
        units["geometry_geocoded"] = (
            units.Laengengrad.isna() | units.Breitengrad.isna()
        )

        # ... or the point built from the coordinates is invalid
        units.loc[~units.geometry_geocoded, "geometry_geocoded"] = ~units.loc[
            ~units.geometry_geocoded, "geometry"
        ].is_valid

        units_wo_geom = units["geometry_geocoded"].sum()

        logger.debug(
            f"{units_wo_geom}/{len(units)} units do not have a geometry!"
            " Adding geocoding results."
        )

        # determine zip and municipality string
        mask = (
            units.Postleitzahl.apply(isfloat)
            & ~units.Postleitzahl.isna()
            & ~units.Gemeinde.isna()
        )
        # object dtype so strings can be assigned without upcasting and the
        # merge key is never a pure float column
        units["zip_and_municipality"] = pd.Series(
            np.nan, index=units.index, dtype=object
        )
        ok_units = units.loc[mask]

        units.loc[mask, "zip_and_municipality"] = (
            # via float so that values accepted by isfloat (e.g. "12345.0")
            # can be converted as well
            ok_units.Postleitzahl.astype(float)
            .astype(int)
            .astype(str)
            .str.zfill(5)
            + " "
            + ok_units.Gemeinde.astype(str).str.rstrip().str.lstrip()
            + ", Deutschland"
        )

        # get zip and municipality from Standort
        # copy: parse_df is modified below and must not be a view on units
        parse_df = units.loc[~mask].copy()

        if not parse_df.empty and "Standort" in parse_df.columns:
            init_len = len(parse_df)

            logger.info(
                f"Parsing ZIP code and municipality from Standort for "
                f"{init_len} values for {tech}."
            )

            parse_df[["zip_and_municipality", "drop_this"]] = (
                parse_df.Standort.astype(str)
                .apply(zip_and_municipality_from_standort)
                .tolist()
            )

            # "drop_this" is True where a zip code was found, i.e. these
            # are the rows to keep
            parse_df = parse_df.loc[parse_df.drop_this]

            if not parse_df.empty:
                units.loc[
                    parse_df.index, "zip_and_municipality"
                ] = parse_df.zip_and_municipality

        # add geocoding to missing
        units = units.merge(
            right=geocoding_gdf[["zip_and_municipality", "geometry"]].rename(
                columns={"geometry": "temp"}
            ),
            how="left",
            on="zip_and_municipality",
        )

        units.loc[units.geometry_geocoded, "geometry"] = units.loc[
            units.geometry_geocoded, "temp"
        ]

        init_len = len(units)

        logger.info(
            "Dropping units outside boundary by geometry or without geometry"
            "..."
        )

        units.dropna(subset=["geometry"], inplace=True)

        units = units.loc[units.geometry.within(boundary)]

        # avoid ZeroDivisionError if there were no units to begin with
        dropped = init_len - len(units)
        dropped_share = (dropped / init_len) * 100 if init_len else 0.0
        logger.debug(
            f"{dropped}/{init_len} " f"({dropped_share: g} %) dropped."
        )

        # drop unnecessary and rename columns
        logger.debug("Reformatting...")
        units.drop(
            columns=[
                "LokationMastrNummer",
                "MaStRNummer",
                "Laengengrad",
                "Breitengrad",
                "Spannungsebene",
                "Land",
                "temp",
            ],
            inplace=True,
        )
        mapping = cols_mapping["all"].copy()
        mapping.update(cols_mapping[tech])
        mapping.update({"geometry": "geom"})
        units.rename(columns=mapping, inplace=True)
        units["voltage_level"] = units.voltage_level.fillna(-1).astype(int)

        units.set_geometry("geom", inplace=True)
        units["id"] = range(0, len(units))

        # change capacity unit: kW to MW
        units["capacity"] = units["capacity"] / 1e3
        if "capacity_inverter" in units.columns:
            units["capacity_inverter"] = units["capacity_inverter"] / 1e3
        if "th_capacity" in units.columns:
            units["th_capacity"] = units["th_capacity"] / 1e3

        # assign bus ids
        logger.debug("Assigning bus ids...")
        units = units.assign(
            bus_id=units.loc[~units.geom.x.isna()]
            .sjoin(mv_grid_districts[["bus_id", "geom"]], how="left")
            .drop(columns=["index_right"])
            .bus_id
        )
        # units not located in any grid district get bus id -1
        units["bus_id"] = units.bus_id.fillna(-1).astype(int)

        # write to DB
        logger.info(f"Writing {len(units)} units to DB...")

        units.to_postgis(
            name=target_tables[tech].__tablename__,
            con=engine,
            if_exists="append",
            schema=target_tables[tech].__table_args__["schema"],
        )
483