Passed
Pull Request — dev (#1112)
by
unknown
01:49
created

data.datasets.power_plants.mastr.isfloat()   A

Complexity

Conditions 2

Size

Total Lines 17
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 17
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
"""Import MaStR dataset and write to DB tables
2
3
Data dump from Marktstammdatenregister (2022-11-17) is imported into the
4
database. Only some technologies are taken into account and written to the
5
following tables:
6
7
* PV: table `supply.egon_power_plants_pv`
8
* wind turbines: table `supply.egon_power_plants_wind`
9
* biomass/biogas plants: table `supply.egon_power_plants_biomass`
10
* hydro plants: table `supply.egon_power_plants_hydro`
11
12
Handling of empty source data in MaStr dump:
13
* `voltage_level`: inferred based on nominal power (`capacity`) using the
14
  ranges from
15
  https://redmine.iks.cs.ovgu.de/oe/projects/ego-n/wiki/Definition_of_thresholds_for_voltage_level_assignment
16
  which results in True in column `voltage_level_inferred`. Remaining datasets
17
  are set to -1 (which only occurs if `capacity` is empty).
18
* `supply.egon_power_plants_*.bus_id`: set to -1 (only if not within grid
19
  districts or no geom available, e.g. for units with nom. power <30 kW)
20
* `supply.egon_power_plants_hydro.plant_type`: NaN
21
22
The data is used especially for the generation of status quo grids by ding0.
23
"""
24
from __future__ import annotations
25
26
from pathlib import Path
27
28
from geoalchemy2 import Geometry
29
from loguru import logger
30
from shapely.geometry import Point
31
from sqlalchemy import (
32
    Boolean,
33
    Column,
34
    DateTime,
35
    Float,
36
    Integer,
37
    Sequence,
38
    String,
39
)
40
from sqlalchemy.ext.declarative import declarative_base
41
import geopandas as gpd
42
import numpy as np
43
import pandas as pd
44
45
from egon.data import config, db
46
from egon.data.datasets.mastr import WORKING_DIR_MASTR_NEW
47
48
Base = declarative_base()
49
50
TESTMODE_OFF = (
51
    config.settings()["egon-data"]["--dataset-boundary"] == "Everything"
52
)
53
54
55
class EgonMastrGeocoded(Base):
56
    __tablename__ = "egon_mastr_geocoded"
57
    __table_args__ = {"schema": "supply"}
58
59
    index = Column(
60
        Integer, Sequence("mastr_geocoded_seq"), primary_key=True, index=True
61
    )
62
    zip_and_municipality = Column(String)
63
    latitude = Column(Float)
64
    longitude = Column(Float)
65
    altitude = Column(Float)
66
    geometry = Column(Geometry("POINT", 4326))
67
68
69
class EgonPowerPlantsPv(Base):
70
    __tablename__ = "egon_power_plants_pv"
71
    __table_args__ = {"schema": "supply"}
72
73
    id = Column(Integer, Sequence("pp_pv_seq"), primary_key=True)
74
    bus_id = Column(Integer, nullable=True)  # Grid district id
75
    gens_id = Column(String, nullable=True)  # EinheitMastrNummer
76
77
    status = Column(String, nullable=True)  # EinheitBetriebsstatus
78
    commissioning_date = Column(DateTime, nullable=True)  # Inbetriebnahmedatum
79
    postcode = Column(String(5), nullable=True)  # Postleitzahl
80
    city = Column(String(50), nullable=True)  # Ort
81
    municipality = Column(String, nullable=True)  # Gemeinde
82
    federal_state = Column(String(31), nullable=True)  # Bundesland
83
    site = Column(String, nullable=True)  # Standort
84
    zip_and_municipality = Column(String, nullable=True)
85
86
    site_type = Column(String(69), nullable=True)  # Lage
87
    usage_sector = Column(String(36), nullable=True)  # Nutzungsbereich
88
    orientation_primary = Column(String(11), nullable=True)  # Hauptausrichtung
89
    orientation_primary_angle = Column(
90
        String(18), nullable=True
91
    )  # HauptausrichtungNeigungswinkel
92
    orientation_secondary = Column(
93
        String(11), nullable=True
94
    )  # Nebenausrichtung
95
    orientation_secondary_angle = Column(
96
        String(18), nullable=True
97
    )  # NebenausrichtungNeigungswinkel
98
    orientation_uniform = Column(
99
        Boolean, nullable=True
100
    )  # EinheitlicheAusrichtungUndNeigungswinkel
101
    module_count = Column(Float, nullable=True)  # AnzahlModule
102
103
    capacity = Column(Float, nullable=True)  # Nettonennleistung
104
    capacity_inverter = Column(
105
        Float, nullable=True
106
    )  # ZugeordneteWirkleistungWechselrichter in MW
107
    feedin_type = Column(String(47), nullable=True)  # Einspeisungsart
108
    voltage_level = Column(Integer, nullable=True)
109
    voltage_level_inferred = Column(Boolean, nullable=True)
110
111
    geometry_geocoded = Column(Boolean)
112
113
    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
114
115
116 View Code Duplication
class EgonPowerPlantsWind(Base):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
117
    __tablename__ = "egon_power_plants_wind"
118
    __table_args__ = {"schema": "supply"}
119
120
    id = Column(Integer, Sequence("pp_wind_seq"), primary_key=True)
121
    bus_id = Column(Integer, nullable=True)  # Grid district id
122
    gens_id = Column(String, nullable=True)  # EinheitMastrNummer
123
124
    status = Column(String, nullable=True)  # EinheitBetriebsstatus
125
    commissioning_date = Column(DateTime, nullable=True)  # Inbetriebnahmedatum
126
    postcode = Column(String(5), nullable=True)  # Postleitzahl
127
    city = Column(String(50), nullable=True)  # Ort
128
    municipality = Column(String, nullable=True)  # Gemeinde
129
    federal_state = Column(String(31), nullable=True)  # Bundesland
130
    zip_and_municipality = Column(String, nullable=True)
131
132
    site_type = Column(String(17), nullable=True)  # Lage
133
    manufacturer_name = Column(String(100), nullable=True)  # Hersteller
134
    type_name = Column(String(100), nullable=True)  # Typenbezeichnung
135
    hub_height = Column(Float, nullable=True)  # Nabenhoehe
136
    rotor_diameter = Column(Float, nullable=True)  # Rotordurchmesser
137
138
    capacity = Column(Float, nullable=True)  # Nettonennleistung
139
    feedin_type = Column(String(47), nullable=True)  # Einspeisungsart
140
    voltage_level = Column(Integer, nullable=True)
141
    voltage_level_inferred = Column(Boolean, nullable=True)
142
143
    geometry_geocoded = Column(Boolean)
144
145
    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
146
147
148 View Code Duplication
class EgonPowerPlantsBiomass(Base):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
149
    __tablename__ = "egon_power_plants_biomass"
150
    __table_args__ = {"schema": "supply"}
151
152
    id = Column(Integer, Sequence("pp_biomass_seq"), primary_key=True)
153
    bus_id = Column(Integer, nullable=True)  # Grid district id
154
    gens_id = Column(String, nullable=True)  # EinheitMastrNummer
155
156
    status = Column(String, nullable=True)  # EinheitBetriebsstatus
157
    commissioning_date = Column(DateTime, nullable=True)  # Inbetriebnahmedatum
158
    postcode = Column(String(5), nullable=True)  # Postleitzahl
159
    city = Column(String(50), nullable=True)  # Ort
160
    municipality = Column(String, nullable=True)  # Gemeinde
161
    federal_state = Column(String(31), nullable=True)  # Bundesland
162
    zip_and_municipality = Column(String, nullable=True)
163
164
    technology = Column(String(45), nullable=True)  # Technologie
165
    fuel_name = Column(String(52), nullable=True)  # Hauptbrennstoff
166
    fuel_type = Column(String(19), nullable=True)  # Biomasseart
167
168
    capacity = Column(Float, nullable=True)  # Nettonennleistung
169
    th_capacity = Column(Float, nullable=True)  # ThermischeNutzleistung
170
    feedin_type = Column(String(47), nullable=True)  # Einspeisungsart
171
    voltage_level = Column(Integer, nullable=True)
172
    voltage_level_inferred = Column(Boolean, nullable=True)
173
174
    geometry_geocoded = Column(Boolean)
175
176
    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
177
178
179
class EgonPowerPlantsHydro(Base):
180
    __tablename__ = "egon_power_plants_hydro"
181
    __table_args__ = {"schema": "supply"}
182
183
    id = Column(Integer, Sequence("pp_hydro_seq"), primary_key=True)
184
    bus_id = Column(Integer, nullable=True)  # Grid district id
185
    gens_id = Column(String, nullable=True)  # EinheitMastrNummer
186
187
    status = Column(String, nullable=True)  # EinheitBetriebsstatus
188
    commissioning_date = Column(DateTime, nullable=True)  # Inbetriebnahmedatum
189
    postcode = Column(String(5), nullable=True)  # Postleitzahl
190
    city = Column(String(50), nullable=True)  # Ort
191
    municipality = Column(String, nullable=True)  # Gemeinde
192
    federal_state = Column(String(31), nullable=True)  # Bundesland
193
    zip_and_municipality = Column(String, nullable=True)
194
195
    plant_type = Column(String(39), nullable=True)  # ArtDerWasserkraftanlage
196
    water_origin = Column(String(20), nullable=True)  # ArtDesZuflusses
197
198
    capacity = Column(Float, nullable=True)  # Nettonennleistung
199
    feedin_type = Column(String(47), nullable=True)  # Einspeisungsart
200
    voltage_level = Column(Integer, nullable=True)
201
    voltage_level_inferred = Column(Boolean, nullable=True)
202
203
    geometry_geocoded = Column(Boolean)
204
205
    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
206
207
208
def isfloat(num: str):
209
    """
210
    Determine if string can be converted to float.
211
    Parameters
212
    -----------
213
    num : str
214
        String to parse.
215
    Returns
216
    -------
217
    bool
218
        Returns True in string can be parsed to float.
219
    """
220
    try:
221
        float(num)
222
        return True
223
    except ValueError:
224
        return False
225
226
227
def zip_and_municipality_from_standort(
228
    standort: str,
229
) -> tuple[str, bool]:
230
    """
231
    Get zip code and municipality from Standort string split into a list.
232
    Parameters
233
    -----------
234
    standort : str
235
        Standort as given from MaStR data.
236
    Returns
237
    -------
238
    str
239
        Standort with only the zip code and municipality
240
        as well a ', Germany' added.
241
    """
242
    standort_list = standort.split()
243
244
    found = False
245
    count = 0
246
247
    for count, elem in enumerate(standort_list):
248
        if len(elem) != 5:
249
            continue
250
        if not elem.isnumeric():
251
            continue
252
253
        found = True
254
255
        break
256
257
    if found:
258
        cleaned_str = " ".join(standort_list[count:])
259
260
        return cleaned_str, found
261
262
    logger.warning(
263
        "Couldn't identify zip code. This entry will be dropped."
264
        f" Original standort: {standort}."
265
    )
266
267
    return standort, found
268
269
270
def infer_voltage_level(
271
    units_gdf: gpd.GeoDataFrame,
272
) -> gpd.GeoDataFrame:
273
    """
274
    Infer nan values in voltage level derived from generator capacity to
275
    the power plants.
276
277
    Parameters
278
    -----------
279
    units_gdf : geopandas.GeoDataFrame
280
        GeoDataFrame containing units with voltage levels from MaStR
281
    Returnsunits_gdf: gpd.GeoDataFrame
282
    -------
283
    geopandas.GeoDataFrame
284
        GeoDataFrame containing units all having assigned a voltage level.
285
    """
286
287 View Code Duplication
    def voltage_levels(p: float) -> int:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
288
        if p <= 100:
289
            return 7
290
        elif p <= 200:
291
            return 6
292
        elif p <= 5500:
293
            return 5
294
        elif p <= 20000:
295
            return 4
296
        elif p <= 120000:
297
            return 3
298
        return 1
299
300
    units_gdf["voltage_level_inferred"] = False
301
    mask = units_gdf.voltage_level.isna()
302
    units_gdf.loc[mask, "voltage_level_inferred"] = True
303
    units_gdf.loc[mask, "voltage_level"] = units_gdf.loc[
304
        mask
305
    ].Nettonennleistung.apply(voltage_levels)
306
307
    return units_gdf
308
309
310
def import_mastr() -> None:
311
    """Import MaStR data into database"""
312
    engine = db.engine()
313
314
    # import geocoded data
315
    cfg = config.datasets()["mastr_new"]
316
    path_parts = cfg["geocoding_path"]
317
    path = Path(*["."] + path_parts).resolve()
318
    path = list(path.iterdir())[0]
319
320
    deposit_id_geocoding = int(path.parts[-1].split(".")[0].split("_")[-1])
321
    deposit_id_mastr = cfg["deposit_id"]
322
323
    if deposit_id_geocoding != deposit_id_mastr:
324
        raise AssertionError(
325
            f"The zenodo (sandbox) deposit ID {deposit_id_mastr} for the MaStR"
326
            f" dataset is not matching with the geocoding version "
327
            f"{deposit_id_geocoding}. Make sure to hermonize the data. When "
328
            f"the MaStR dataset is updated also update the geocoding and "
329
            f"update the egon data bundle. The geocoding can be done using: "
330
            f"https://github.com/RLI-sandbox/mastr-geocoding"
331
        )
332
333
    geocoding_gdf = gpd.read_file(path)
334
335
    # remove failed requests
336
    geocoding_gdf = geocoding_gdf.loc[geocoding_gdf.geometry.is_valid]
337
338
    EgonMastrGeocoded.__table__.drop(bind=engine, checkfirst=True)
339
    EgonMastrGeocoded.__table__.create(bind=engine, checkfirst=True)
340
341
    geocoding_gdf.to_postgis(
342
        name=EgonMastrGeocoded.__tablename__,
343
        con=engine,
344
        if_exists="append",
345
        schema=EgonMastrGeocoded.__table_args__["schema"],
346
        index=True,
347
    )
348
349
    cfg = config.datasets()["power_plants"]
350
351
    cols_mapping = {
352
        "all": {
353
            "EinheitMastrNummer": "gens_id",
354
            "EinheitBetriebsstatus": "status",
355
            "Inbetriebnahmedatum": "commissioning_date",
356
            "Postleitzahl": "postcode",
357
            "Ort": "city",
358
            "Gemeinde": "municipality",
359
            "Bundesland": "federal_state",
360
            "Nettonennleistung": "capacity",
361
            "Einspeisungsart": "feedin_type",
362
        },
363
        "pv": {
364
            "Lage": "site_type",
365
            "Standort": "site",
366
            "Nutzungsbereich": "usage_sector",
367
            "Hauptausrichtung": "orientation_primary",
368
            "HauptausrichtungNeigungswinkel": "orientation_primary_angle",
369
            "Nebenausrichtung": "orientation_secondary",
370
            "NebenausrichtungNeigungswinkel": "orientation_secondary_angle",
371
            "EinheitlicheAusrichtungUndNeigungswinkel": "orientation_uniform",
372
            "AnzahlModule": "module_count",
373
            "zugeordneteWirkleistungWechselrichter": "capacity_inverter",
374
        },
375
        "wind": {
376
            "Lage": "site_type",
377
            "Hersteller": "manufacturer_name",
378
            "Typenbezeichnung": "type_name",
379
            "Nabenhoehe": "hub_height",
380
            "Rotordurchmesser": "rotor_diameter",
381
        },
382
        "biomass": {
383
            "Technologie": "technology",
384
            "Hauptbrennstoff": "fuel_name",
385
            "Biomasseart": "fuel_type",
386
            "ThermischeNutzleistung": "th_capacity",
387
        },
388
        "hydro": {
389
            "ArtDerWasserkraftanlage": "plant_type",
390
            "ArtDesZuflusses": "water_origin",
391
        },
392
    }
393
394
    source_files = {
395
        "pv": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_pv"],
396
        "wind": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_wind"],
397
        "biomass": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_biomass"],
398
        "hydro": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_hydro"],
399
    }
400
    target_tables = {
401
        "pv": EgonPowerPlantsPv,
402
        "wind": EgonPowerPlantsWind,
403
        "biomass": EgonPowerPlantsBiomass,
404
        "hydro": EgonPowerPlantsHydro,
405
    }
406
    vlevel_mapping = {
407
        "Höchstspannung": 1,
408
        "UmspannungZurHochspannung": 2,
409
        "Hochspannung": 3,
410
        "UmspannungZurMittelspannung": 4,
411
        "Mittelspannung": 5,
412
        "UmspannungZurNiederspannung": 6,
413
        "Niederspannung": 7,
414
    }
415
416
    # import locations
417
    locations = pd.read_csv(
418
        WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_location"],
419
        index_col=None,
420
    )
421
422
    # import grid districts
423
    mv_grid_districts = db.select_geodataframe(
424
        f"""
425
        SELECT * FROM {cfg['sources']['egon_mv_grid_district']}
426
        """,
427
        epsg=4326,
428
    )
429
430
    # import units
431
    technologies = ["pv", "wind", "biomass", "hydro"]
432
    for tech in technologies:
433
        # read units
434
        logger.info(f"===== Importing MaStR dataset: {tech} =====")
435
        logger.debug("Reading CSV and filtering data...")
436
        units = pd.read_csv(
437
            source_files[tech],
438
            usecols=(
439
                ["LokationMastrNummer", "Laengengrad", "Breitengrad", "Land"]
440
                + list(cols_mapping["all"].keys())
441
                + list(cols_mapping[tech].keys())
442
            ),
443
            index_col=None,
444
            dtype={"Postleitzahl": str},
445
        ).rename(columns=cols_mapping)
446
447
        # drop units outside of Germany
448
        len_old = len(units)
449
        units = units.loc[units.Land == "Deutschland"]
450
        logger.debug(
451
            f"{len_old - len(units)} units outside of Germany dropped..."
452
        )
453
454
        # filter for SH units if in testmode
455
        if not TESTMODE_OFF:
456
            logger.info(
457
                "TESTMODE: Dropping all units outside of Schleswig-Holstein..."
458
            )
459
            units = units.loc[units.Bundesland == "SchleswigHolstein"]
460
461
        # merge and rename voltage level
462
        logger.debug("Merging with locations and allocate voltage level...")
463
        units = units.merge(
464
            locations[["MaStRNummer", "Spannungsebene"]],
465
            left_on="LokationMastrNummer",
466
            right_on="MaStRNummer",
467
            how="left",
468
        )
469
        # convert voltage levels to numbers
470
        units["voltage_level"] = units.Spannungsebene.replace(vlevel_mapping)
471
        # set voltage level for nan values
472
        units = infer_voltage_level(units)
473
474
        # add geometry
475
        logger.debug("Adding geometries...")
476
        units = gpd.GeoDataFrame(
477
            units,
478
            geometry=gpd.points_from_xy(
479
                units["Laengengrad"], units["Breitengrad"], crs=4326
480
            ),
481
            crs=4326,
482
        )
483
484
        units["geometry_geocoded"] = (
485
            units.Laengengrad.isna() | units.Laengengrad.isna()
486
        )
487
488
        units.loc[~units.geometry_geocoded, "geometry_geocoded"] = ~units.loc[
489
            ~units.geometry_geocoded, "geometry"
490
        ].is_valid
491
492
        units_wo_geom = units["geometry_geocoded"].sum()
493
494
        logger.debug(
495
            f"{units_wo_geom}/{len(units)} units do not have a geometry!"
496
            " Adding geocoding results."
497
        )
498
499
        # determine zip and municipality string
500
        mask = (
501
            units.Postleitzahl.apply(isfloat)
502
            & ~units.Postleitzahl.isna()
503
            & ~units.Gemeinde.isna()
504
        )
505
        units["zip_and_municipality"] = np.nan
506
        ok_units = units.loc[mask]
507
508
        units.loc[mask, "zip_and_municipality"] = (
509
            ok_units.Postleitzahl.astype(int).astype(str).str.zfill(5)
510
            + " "
511
            + ok_units.Gemeinde.astype(str).str.rstrip().str.lstrip()
512
            + ", Deutschland"
513
        )
514
515
        # get zip and municipality from Standort
516
        parse_df = units.loc[~mask]
517
518
        if not parse_df.empty and "Standort" in parse_df.columns:
519
            init_len = len(parse_df)
520
521
            logger.info(
522
                f"Parsing ZIP code and municipality from Standort for "
523
                f"{init_len} values for {tech}."
524
            )
525
526
            parse_df[["zip_and_municipality", "drop_this"]] = (
527
                parse_df.Standort.astype(str)
528
                .apply(zip_and_municipality_from_standort)
529
                .tolist()
530
            )
531
532
            parse_df = parse_df.loc[parse_df.drop_this]
533
534
            if not parse_df.empty:
535
                units.loc[
536
                    parse_df.index, "zip_and_municipality"
537
                ] = parse_df.zip_and_municipality
538
539
        # add geocoding to missing
540
        units = units.merge(
541
            right=geocoding_gdf[["zip_and_municipality", "geometry"]].rename(
542
                columns={"geometry": "temp"}
543
            ),
544
            how="left",
545
            on="zip_and_municipality",
546
        )
547
548
        units.loc[units.geometry_geocoded, "geometry"] = units.loc[
549
            units.geometry_geocoded, "temp"
550
        ]
551
552
        # fill None and NaN values with empty geom because to_postgis fails
553
        # otherwise
554
        units.geometry.fillna(Point(np.nan, np.nan), inplace=True)
555
556
        # drop unnecessary and rename columns
557
        logger.debug("Reformatting...")
558
        units.drop(
559
            columns=[
560
                "LokationMastrNummer",
561
                "MaStRNummer",
562
                "Laengengrad",
563
                "Breitengrad",
564
                "Spannungsebene",
565
                "Land",
566
                "temp",
567
            ],
568
            inplace=True,
569
        )
570
        mapping = cols_mapping["all"].copy()
571
        mapping.update(cols_mapping[tech])
572
        mapping.update({"geometry": "geom"})
573
        units.rename(columns=mapping, inplace=True)
574
        units["voltage_level"] = units.voltage_level.fillna(-1).astype(int)
575
576
        units.set_geometry("geom", inplace=True)
577
        units["id"] = range(0, len(units))
578
579
        # change capacity unit: kW to MW
580
        units["capacity"] = units["capacity"] / 1e3
581
        if "capacity_inverter" in units.columns:
582
            units["capacity_inverter"] = units["capacity_inverter"] / 1e3
583
        if "th_capacity" in units.columns:
584
            units["th_capacity"] = units["th_capacity"] / 1e3
585
586
        # assign bus ids
587
        logger.debug("Assigning bus ids...")
588
        units = units.assign(
589
            bus_id=units.loc[~units.geom.x.isna()]
590
            .sjoin(mv_grid_districts[["bus_id", "geom"]], how="left")
591
            .drop(columns=["index_right"])
592
            .bus_id
593
        )
594
        units["bus_id"] = units.bus_id.fillna(-1).astype(int)
595
596
        # write to DB
597
        logger.info(f"Writing {len(units)} units to DB...")
598
        target_tables[tech].__table__.drop(bind=engine, checkfirst=True)
599
        target_tables[tech].__table__.create(bind=engine, checkfirst=True)
600
601
        units.to_postgis(
602
            name=target_tables[tech].__tablename__,
603
            con=engine,
604
            if_exists="append",
605
            schema=target_tables[tech].__table_args__["schema"],
606
        )
607