Completed
Push — dev ( c38e3d...5d805f )
by
unknown
22s queued 18s
created

zip_and_municipality_from_standort()   A

Complexity

Conditions 5

Size

Total Lines 41
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 18
dl 0
loc 41
rs 9.0333
c 0
b 0
f 0
cc 5
nop 1
1
"""Import MaStR dataset and write to DB tables
2
3
Data dump from Marktstammdatenregister (2022-11-17) is imported into the
4
database. Only some technologies are taken into account and written to the
5
following tables:
6
7
* PV: table `supply.egon_power_plants_pv`
8
* wind turbines: table `supply.egon_power_plants_wind`
9
* biomass/biogas plants: table `supply.egon_power_plants_biomass`
10
* hydro plants: table `supply.egon_power_plants_hydro`
11
12
Handling of empty source data in the MaStR dump:
13
* `voltage_level`: inferred based on nominal power (`capacity`) using the
14
  ranges from
15
  https://redmine.iks.cs.ovgu.de/oe/projects/ego-n/wiki/Definition_of_thresholds_for_voltage_level_assignment
16
  which results in True in column `voltage_level_inferred`. Remaining datasets
17
  are set to -1 (which only occurs if `capacity` is empty).
18
* `supply.egon_power_plants_*.bus_id`: set to -1 (only if not within grid
19
  districts or no geom available, e.g. for units with nom. power <30 kW)
20
* `supply.egon_power_plants_hydro.plant_type`: NaN
21
22
The data is used especially for the generation of status quo grids by ding0.
23
"""
24
from __future__ import annotations
25
26
from pathlib import Path
27
28
from loguru import logger
29
import geopandas as gpd
30
import numpy as np
31
import pandas as pd
32
33
from egon.data import config, db
34
from egon.data.datasets.mastr import WORKING_DIR_MASTR_NEW
35
from egon.data.datasets.power_plants.mastr_db_classes import (
36
    EgonMastrGeocoded,
37
    EgonPowerPlantsBiomass,
38
    EgonPowerPlantsCombustion,
39
    EgonPowerPlantsGsgk,
40
    EgonPowerPlantsHydro,
41
    EgonPowerPlantsNuclear,
42
    EgonPowerPlantsPv,
43
    EgonPowerPlantsStorage,
44
    EgonPowerPlantsWind,
45
)
46
from egon.data.datasets.power_plants.pv_rooftop_buildings import (
47
    federal_state_data,
48
)
49
50
# True when the configured dataset boundary is "Everything"; import_mastr()
# restricts the imported units to Schleswig-Holstein when this is False.
TESTMODE_OFF = (
51
    config.settings()["egon-data"]["--dataset-boundary"] == "Everything"
52
)
53
54
55
def isfloat(num: str) -> bool:
    """
    Determine if string can be converted to float.

    Parameters
    -----------
    num : str
        String to parse. Other objects are accepted as well; anything
        ``float()`` rejects yields False instead of raising.

    Returns
    -------
    bool
        Returns True if string can be parsed to float.
    """
    try:
        float(num)
    except (TypeError, ValueError):
        # ValueError: unparsable string; TypeError: object such as None that
        # float() cannot handle at all.
        return False
    return True
72
73
74
def zip_and_municipality_from_standort(
    standort: str,
) -> tuple[str, bool]:
    """
    Get zip code and municipality from a Standort string.

    The string is split on whitespace and searched for the first token that
    consists of exactly five ASCII digits. Everything from that token onwards
    is returned.

    Parameters
    -----------
    standort : str
        Standort as given from MaStR data.

    Returns
    -------
    tuple of (str, bool)
        If a zip code was found: the whitespace-normalised remainder of
        Standort starting at the zip code, and True.
        Otherwise: the unchanged Standort and False (a warning is logged).
    """
    standort_list = standort.split()

    for pos, elem in enumerate(standort_list):
        # isascii() + isdigit() instead of isnumeric(): the latter also
        # accepts characters such as superscripts or fractions, which can
        # never be part of a zip code.
        if len(elem) == 5 and elem.isascii() and elem.isdigit():
            return " ".join(standort_list[pos:]), True

    logger.warning(
        "Couldn't identify zip code. This entry will be dropped."
        f" Original standort: {standort}."
    )

    return standort, False
115
116
117 View Code Duplication
def infer_voltage_level(
    units_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Infer missing voltage levels of power plants from their capacity.

    Rows with a missing `voltage_level` and a known `Nettonennleistung` get a
    voltage level derived from fixed capacity thresholds and are flagged in
    the new column `voltage_level_inferred`. Rows where the capacity is
    missing as well are left untouched (voltage level stays NaN, flag stays
    False) so that the caller can handle them explicitly.

    Parameters
    -----------
    units_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing units with voltage levels from MaStR.
        Requires the columns `voltage_level` and `Nettonennleistung`.
        The frame is modified in place.

    Returns
    -------
    geopandas.GeoDataFrame
        The same frame with `voltage_level` filled where it could be
        inferred and the boolean column `voltage_level_inferred` added.
    """

    def voltage_levels(p: float) -> int:
        # Map a capacity to a voltage level by upper threshold.
        if p <= 100:
            return 7
        elif p <= 200:
            return 6
        elif p <= 5500:
            return 5
        elif p <= 20000:
            return 4
        elif p <= 120000:
            return 3
        return 1

    units_gdf["voltage_level_inferred"] = False

    # A NaN capacity compares False against every threshold and would
    # otherwise fall through to level 1, so only infer where a capacity is
    # actually available.
    mask = (
        units_gdf.voltage_level.isna() & units_gdf.Nettonennleistung.notna()
    )
    units_gdf.loc[mask, "voltage_level_inferred"] = True
    units_gdf.loc[mask, "voltage_level"] = units_gdf.loc[
        mask, "Nettonennleistung"
    ].apply(voltage_levels)

    return units_gdf
155
156
157
def import_mastr() -> None:
    """Import MaStR data into database

    Steps performed:

    1. Read the geocoding file found in the configured geocoding directory,
       check that its deposit ID matches the MaStR deposit ID and write the
       valid geometries to the `EgonMastrGeocoded` table (table is dropped
       and recreated).
    2. For each technology (pv, wind, biomass, hydro, combustion, gsgk,
       nuclear, storage): read the unit CSV, drop units outside Germany,
       units commissioned after the configured reference date and units
       that are not operating, attach the voltage level from the locations
       file (inferring missing ones from capacity), build point geometries
       and fall back to the geocoded zip/municipality geometry where
       coordinates are missing or invalid, drop units without geometry or
       outside the federal state boundary, rename columns, convert
       capacities by a factor of 1e-3, assign MV grid district bus ids and
       append the result to the technology's target table.

    Raises
    ------
    AssertionError
        If the deposit ID of the geocoding file does not match the deposit
        ID configured for the MaStR dataset.
    """
    engine = db.engine()

    # import geocoded data
    cfg = config.datasets()["mastr_new"]
    path_parts = cfg["geocoding_path"]
    path = Path(*["."] + path_parts).resolve()
    # NOTE(review): takes whatever entry the directory listing yields first;
    # presumably the directory holds exactly one file - confirm.
    path = list(path.iterdir())[0]

    # deposit ID is the last "_"-separated token of the file name stem
    deposit_id_geocoding = int(path.parts[-1].split(".")[0].split("_")[-1])
    deposit_id_mastr = cfg["deposit_id"]

    if deposit_id_geocoding != deposit_id_mastr:
        raise AssertionError(
            f"The zenodo (sandbox) deposit ID {deposit_id_mastr} for the MaStR"
            f" dataset is not matching with the geocoding version "
            f"{deposit_id_geocoding}. Make sure to hermonize the data. When "
            f"the MaStR dataset is updated also update the geocoding and "
            f"update the egon data bundle. The geocoding can be done using: "
            f"https://github.com/RLI-sandbox/mastr-geocoding"
        )

    geocoding_gdf = gpd.read_file(path)

    # remove failed requests
    geocoding_gdf = geocoding_gdf.loc[geocoding_gdf.geometry.is_valid]

    EgonMastrGeocoded.__table__.drop(bind=engine, checkfirst=True)
    EgonMastrGeocoded.__table__.create(bind=engine, checkfirst=True)

    geocoding_gdf.to_postgis(
        name=EgonMastrGeocoded.__tablename__,
        con=engine,
        if_exists="append",
        schema=EgonMastrGeocoded.__table_args__["schema"],
        index=True,
    )

    cfg = config.datasets()["power_plants"]

    # source column name -> target column name; "all" applies to every
    # technology, the other entries are technology specific
    cols_mapping = {
        "all": {
            "EinheitMastrNummer": "gens_id",
            "EinheitBetriebsstatus": "status",
            "Inbetriebnahmedatum": "commissioning_date",
            "Postleitzahl": "postcode",
            "Ort": "city",
            "Gemeinde": "municipality",
            "Bundesland": "federal_state",
            "Nettonennleistung": "capacity",
            "Einspeisungsart": "feedin_type",
        },
        "pv": {
            "Lage": "site_type",
            "Standort": "site",
            "Nutzungsbereich": "usage_sector",
            "Hauptausrichtung": "orientation_primary",
            "HauptausrichtungNeigungswinkel": "orientation_primary_angle",
            "Nebenausrichtung": "orientation_secondary",
            "NebenausrichtungNeigungswinkel": "orientation_secondary_angle",
            "EinheitlicheAusrichtungUndNeigungswinkel": "orientation_uniform",
            "AnzahlModule": "module_count",
            "zugeordneteWirkleistungWechselrichter": "capacity_inverter",
        },
        "wind": {
            "Lage": "site_type",
            "Hersteller": "manufacturer_name",
            "Typenbezeichnung": "type_name",
            "Nabenhoehe": "hub_height",
            "Rotordurchmesser": "rotor_diameter",
        },
        "biomass": {
            "Technologie": "technology",
            "Hauptbrennstoff": "main_fuel",
            "Biomasseart": "fuel_type",
            "ThermischeNutzleistung": "th_capacity",
        },
        "hydro": {
            "ArtDerWasserkraftanlage": "plant_type",
            "ArtDesZuflusses": "water_origin",
        },
        "combustion": {
            "Energietraeger": "carrier",
            "Hauptbrennstoff": "main_fuel",
            "WeitererHauptbrennstoff": "other_main_fuel",
            "Technologie": "technology",
            "ThermischeNutzleistung": "th_capacity",
        },
        "gsgk": {
            "Energietraeger": "carrier",
            "Technologie": "technology",
        },
        "nuclear": {
            "Energietraeger": "carrier",
            "Technologie": "technology",
        },
        "storage": {
            "Energietraeger": "carrier",
            "Technologie": "technology",
            "Batterietechnologie": "battery_type",
            "Pumpspeichertechnologie": "pump_storage_type",
        },
    }

    source_files = {
        "pv": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_pv"],
        "wind": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_wind"],
        "biomass": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_biomass"],
        "hydro": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_hydro"],
        "combustion": WORKING_DIR_MASTR_NEW
        / cfg["sources"]["mastr_combustion"],
        "gsgk": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_gsgk"],
        "nuclear": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_nuclear"],
        "storage": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_storage"],
    }

    target_tables = {
        "pv": EgonPowerPlantsPv,
        "wind": EgonPowerPlantsWind,
        "biomass": EgonPowerPlantsBiomass,
        "hydro": EgonPowerPlantsHydro,
        "combustion": EgonPowerPlantsCombustion,
        "gsgk": EgonPowerPlantsGsgk,
        "nuclear": EgonPowerPlantsNuclear,
        "storage": EgonPowerPlantsStorage,
    }

    vlevel_mapping = {
        "Höchstspannung": 1,
        "UmspannungZurHochspannung": 2,
        "Hochspannung": 3,
        "UmspannungZurMittelspannung": 4,
        "Mittelspannung": 5,
        "UmspannungZurNiederspannung": 6,
        "Niederspannung": 7,
    }

    # import locations
    locations = pd.read_csv(
        WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_location"],
        index_col=None,
    )

    # import grid districts
    mv_grid_districts = db.select_geodataframe(
        f"""
        SELECT * FROM {cfg['sources']['egon_mv_grid_district']}
        """,
        epsg=4326,
    )

    # The boundary and the reference date do not depend on the technology,
    # so they are determined once instead of once per loop iteration.
    boundary = federal_state_data(geocoding_gdf.crs).dissolve().at[0, "geom"]
    ts = pd.Timestamp(config.datasets()["mastr_new"]["egon2021_date_max"])

    # import units
    technologies = [
        "pv",
        "wind",
        "biomass",
        "hydro",
        "combustion",
        "gsgk",
        "nuclear",
        "storage",
    ]

    for tech in technologies:
        # read units
        logger.info(f"===== Importing MaStR dataset: {tech} =====")
        logger.debug("Reading CSV and filtering data...")
        # Columns keep their source names here; they are renamed to the
        # target names only after all filtering steps further below.
        units = pd.read_csv(
            source_files[tech],
            usecols=(
                ["LokationMastrNummer", "Laengengrad", "Breitengrad", "Land"]
                + list(cols_mapping["all"].keys())
                + list(cols_mapping[tech].keys())
            ),
            index_col=None,
            dtype={"Postleitzahl": str},
            low_memory=False,
        )

        # drop units outside of Germany
        len_old = len(units)
        units = units.loc[units.Land == "Deutschland"]
        logger.debug(
            f"{len_old - len(units)} units outside of Germany dropped..."
        )

        # drop units installed after reference date from cfg
        # (eGon2021 scenario)
        len_old = len(units)
        units = units.loc[pd.to_datetime(units.Inbetriebnahmedatum) <= ts]
        logger.debug(
            f"{len_old - len(units)} units installed after {ts} dropped..."
        )

        # drop not operating units
        len_old = len(units)
        units = units.loc[
            units.EinheitBetriebsstatus.isin(
                ["InBetrieb", "VoruebergehendStillgelegt"]
            )
        ]
        logger.debug(f"{len_old - len(units)} not operating units dropped...")

        # filter for SH units if in testmode
        if not TESTMODE_OFF:
            logger.info(
                "TESTMODE: Dropping all units outside of Schleswig-Holstein..."
            )
            units = units.loc[units.Bundesland == "SchleswigHolstein"]

        # merge and rename voltage level
        logger.debug("Merging with locations and allocate voltage level...")
        units = units.merge(
            locations[["MaStRNummer", "Spannungsebene"]],
            left_on="LokationMastrNummer",
            right_on="MaStRNummer",
            how="left",
        )
        # convert voltage levels to numbers
        units["voltage_level"] = units.Spannungsebene.replace(vlevel_mapping)
        # set voltage level for nan values
        units = infer_voltage_level(units)

        # add geometry
        logger.debug("Adding geometries...")
        units = gpd.GeoDataFrame(
            units,
            geometry=gpd.points_from_xy(
                units["Laengengrad"], units["Breitengrad"], crs=4326
            ),
            crs=4326,
        )

        # flag units that need a geocoded geometry: either coordinate
        # missing ...
        units["geometry_geocoded"] = (
            units.Laengengrad.isna() | units.Breitengrad.isna()
        )

        # ... or the resulting point geometry is invalid
        units.loc[~units.geometry_geocoded, "geometry_geocoded"] = ~units.loc[
            ~units.geometry_geocoded, "geometry"
        ].is_valid

        units_wo_geom = units["geometry_geocoded"].sum()

        logger.debug(
            f"{units_wo_geom}/{len(units)} units do not have a geometry!"
            " Adding geocoding results."
        )

        # determine zip and municipality string
        mask = (
            units.Postleitzahl.apply(isfloat)
            & ~units.Postleitzahl.isna()
            & ~units.Gemeinde.isna()
        )
        # object dtype: the column holds strings and is used as merge key
        units["zip_and_municipality"] = pd.Series(
            np.nan, index=units.index, dtype=object
        )
        ok_units = units.loc[mask]

        units.loc[mask, "zip_and_municipality"] = (
            ok_units.Postleitzahl.astype(int).astype(str).str.zfill(5)
            + " "
            + ok_units.Gemeinde.astype(str).str.rstrip().str.lstrip()
            + ", Deutschland"
        )

        # get zip and municipality from Standort
        # (copy: helper columns are added to this subset below)
        parse_df = units.loc[~mask].copy()

        if not parse_df.empty and "Standort" in parse_df.columns:
            init_len = len(parse_df)

            logger.info(
                f"Parsing ZIP code and municipality from Standort for "
                f"{init_len} values for {tech}."
            )

            # second tuple element is True where a zip code was found
            parse_df[["zip_and_municipality", "zip_found"]] = (
                parse_df.Standort.astype(str)
                .apply(zip_and_municipality_from_standort)
                .tolist()
            )

            # keep only rows where a zip code could be identified
            parse_df = parse_df.loc[parse_df.zip_found]

            if not parse_df.empty:
                units.loc[
                    parse_df.index, "zip_and_municipality"
                ] = parse_df.zip_and_municipality

        # add geocoding to missing
        units = units.merge(
            right=geocoding_gdf[["zip_and_municipality", "geometry"]].rename(
                columns={"geometry": "temp"}
            ),
            how="left",
            on="zip_and_municipality",
        )

        units.loc[units.geometry_geocoded, "geometry"] = units.loc[
            units.geometry_geocoded, "temp"
        ]

        init_len = len(units)

        logger.info(
            "Dropping units outside boundary by geometry or without geometry"
            "..."
        )

        units.dropna(subset=["geometry"], inplace=True)

        units = units.loc[units.geometry.within(boundary)]

        if init_len > 0:
            logger.debug(
                f"{init_len - len(units)}/{init_len} "
                f"({((init_len - len(units)) / init_len) * 100: g} %) dropped."
            )

        # drop unnecessary and rename columns
        logger.debug("Reformatting...")
        units.drop(
            columns=[
                "LokationMastrNummer",
                "MaStRNummer",
                "Laengengrad",
                "Breitengrad",
                "Spannungsebene",
                "Land",
                "temp",
            ],
            inplace=True,
        )
        mapping = cols_mapping["all"].copy()
        mapping.update(cols_mapping[tech])
        mapping.update({"geometry": "geom"})
        units.rename(columns=mapping, inplace=True)
        # units without any voltage level are marked with -1
        units["voltage_level"] = units.voltage_level.fillna(-1).astype(int)

        units.set_geometry("geom", inplace=True)
        units["id"] = range(len(units))

        # change capacity unit: kW to MW
        units["capacity"] = units["capacity"] / 1e3
        if "capacity_inverter" in units.columns:
            units["capacity_inverter"] = units["capacity_inverter"] / 1e3
        if "th_capacity" in units.columns:
            units["th_capacity"] = units["th_capacity"] / 1e3

        # assign bus ids
        logger.debug("Assigning bus ids...")
        units = units.assign(
            bus_id=units.loc[~units.geom.x.isna()]
            .sjoin(mv_grid_districts[["bus_id", "geom"]], how="left")
            .drop(columns=["index_right"])
            .bus_id
        )
        # units not matched to any grid district are marked with -1
        units["bus_id"] = units.bus_id.fillna(-1).astype(int)

        # write to DB
        logger.info(f"Writing {len(units)} units to DB...")

        units.to_postgis(
            name=target_tables[tech].__tablename__,
            con=engine,
            if_exists="append",
            schema=target_tables[tech].__table_args__["schema"],
        )
531