Passed
Pull Request — dev (#1063)
by
unknown
01:34
created

data.datasets.power_plants.mastr   A

Complexity

Total Complexity 10

Size/Duplication

Total Lines 400
Duplicated Lines 15.75 %

Importance

Changes 0
Metric Value
wmc 10
eloc 264
dl 63
loc 400
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
D import_mastr() 12 228 10

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
"""Import MaStR dataset and write to DB tables
2
3
Data dump from Marktstammdatenregister (2022-11-17) is imported into the
4
database. Only some technologies are taken into account and written to the
5
following tables:
6
7
* PV: table `supply.egon_power_plants_pv`
8
* wind turbines: table `supply.egon_power_plants_wind`
9
* biomass/biogas plants: table `supply.egon_power_plants_biomass`
10
* hydro plants: table `supply.egon_power_plants_hydro`
11
12
Handling of empty source data in MaStr dump:
13
* `voltage_level`: inferred based on nominal power (`capacity`) using the
14
  ranges from
15
  https://redmine.iks.cs.ovgu.de/oe/projects/ego-n/wiki/Definition_of_thresholds_for_voltage_level_assignment
16
  which results in True in column `voltage_level_inferred`. Remaining datasets
17
  are set to -1 (which only occurs if `capacity` is empty).
18
* `supply.egon_power_plants_*.bus_id`: set to -1 (only if not within grid
19
  districts or no geom available, e.g. for units with nom. power <30 kW)
20
* `supply.egon_power_plants_hydro.plant_type`: NaN
21
22
The data is used especially for the generation of status quo grids by ding0.
23
"""
24
from geoalchemy2 import Geometry
25
from sqlalchemy import (
26
    Boolean,
27
    Column,
28
    DateTime,
29
    Float,
30
    Integer,
31
    Sequence,
32
    String,
33
)
34
from sqlalchemy.ext.declarative import declarative_base
35
import geopandas as gpd
36
import pandas as pd
37
38
from egon.data import db
39
from egon.data.datasets.mastr import WORKING_DIR_MASTR_NEW
40
import egon.data.config
41
42
Base = declarative_base()
43
44
TESTMODE_OFF = (
45
    egon.data.config.settings()["egon-data"]["--dataset-boundary"]
46
    == "Everything"
47
)
48
49
50
class EgonPowerPlantsPv(Base):
51
    __tablename__ = "egon_power_plants_pv"
52
    __table_args__ = {"schema": "supply"}
53
54
    id = Column(Integer, Sequence("pp_pv_seq"), primary_key=True)
55
    bus_id = Column(Integer, nullable=True)  # Grid district id
56
    gens_id = Column(String, nullable=True)  # EinheitMastrNummer
57
58
    status = Column(String, nullable=True)  # EinheitBetriebsstatus
59
    commissioning_date = Column(DateTime, nullable=True)  # Inbetriebnahmedatum
60
    postcode = Column(String(5), nullable=True)  # Postleitzahl
61
    city = Column(String(50), nullable=True)  # Ort
62
    federal_state = Column(String(31), nullable=True)  # Bundesland
63
64
    site_type = Column(String(69), nullable=True)  # Lage
65
    usage_sector = Column(String(36), nullable=True)  # Nutzungsbereich
66
    orientation_primary = Column(String(11), nullable=True)  # Hauptausrichtung
67
    orientation_primary_angle = Column(
68
        String(18), nullable=True
69
    )  # HauptausrichtungNeigungswinkel
70
    orientation_secondary = Column(
71
        String(11), nullable=True
72
    )  # Nebenausrichtung
73
    orientation_secondary_angle = Column(
74
        String(18), nullable=True
75
    )  # NebenausrichtungNeigungswinkel
76
    orientation_uniform = Column(
77
        Boolean, nullable=True
78
    )  # EinheitlicheAusrichtungUndNeigungswinkel
79
    module_count = Column(Float, nullable=True)  # AnzahlModule
80
81
    capacity = Column(Float, nullable=True)  # Nettonennleistung
82
    capacity_inverter = Column(
83
        Float, nullable=True
84
    )  # ZugeordneteWirkleistungWechselrichter in MW
85
    feedin_type = Column(String(47), nullable=True)  # Einspeisungsart
86
    voltage_level = Column(Integer, nullable=True)
87
    voltage_level_inferred = Column(Boolean, nullable=True)
88
89
    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
90
91
92 View Code Duplication
class EgonPowerPlantsWind(Base):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
93
    __tablename__ = "egon_power_plants_wind"
94
    __table_args__ = {"schema": "supply"}
95
96
    id = Column(Integer, Sequence("pp_wind_seq"), primary_key=True)
97
    bus_id = Column(Integer, nullable=True)  # Grid district id
98
    gens_id = Column(String, nullable=True)  # EinheitMastrNummer
99
100
    status = Column(String, nullable=True)  # EinheitBetriebsstatus
101
    commissioning_date = Column(DateTime, nullable=True)  # Inbetriebnahmedatum
102
    postcode = Column(String(5), nullable=True)  # Postleitzahl
103
    city = Column(String(50), nullable=True)  # Ort
104
    federal_state = Column(String(31), nullable=True)  # Bundesland
105
106
    site_type = Column(String(17), nullable=True)  # Lage
107
    manufacturer_name = Column(String(100), nullable=True)  # Hersteller
108
    type_name = Column(String(100), nullable=True)  # Typenbezeichnung
109
    hub_height = Column(Float, nullable=True)  # Nabenhoehe
110
    rotor_diameter = Column(Float, nullable=True)  # Rotordurchmesser
111
112
    capacity = Column(Float, nullable=True)  # Nettonennleistung
113
    feedin_type = Column(String(47), nullable=True)  # Einspeisungsart
114
    voltage_level = Column(Integer, nullable=True)
115
    voltage_level_inferred = Column(Boolean, nullable=True)
116
117
    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
118
119
120 View Code Duplication
class EgonPowerPlantsBiomass(Base):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
121
    __tablename__ = "egon_power_plants_biomass"
122
    __table_args__ = {"schema": "supply"}
123
124
    id = Column(Integer, Sequence("pp_biomass_seq"), primary_key=True)
125
    bus_id = Column(Integer, nullable=True)  # Grid district id
126
    gens_id = Column(String, nullable=True)  # EinheitMastrNummer
127
128
    status = Column(String, nullable=True)  # EinheitBetriebsstatus
129
    commissioning_date = Column(DateTime, nullable=True)  # Inbetriebnahmedatum
130
    postcode = Column(String(5), nullable=True)  # Postleitzahl
131
    city = Column(String(50), nullable=True)  # Ort
132
    federal_state = Column(String(31), nullable=True)  # Bundesland
133
134
    technology = Column(String(45), nullable=True)  # Technologie
135
    fuel_name = Column(String(52), nullable=True)  # Hauptbrennstoff
136
    fuel_type = Column(String(19), nullable=True)  # Biomasseart
137
138
    capacity = Column(Float, nullable=True)  # Nettonennleistung
139
    th_capacity = Column(Float, nullable=True)  # ThermischeNutzleistung
140
    feedin_type = Column(String(47), nullable=True)  # Einspeisungsart
141
    voltage_level = Column(Integer, nullable=True)
142
    voltage_level_inferred = Column(Boolean, nullable=True)
143
144
    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
145
146
147
class EgonPowerPlantsHydro(Base):
148
    __tablename__ = "egon_power_plants_hydro"
149
    __table_args__ = {"schema": "supply"}
150
151
    id = Column(Integer, Sequence("pp_hydro_seq"), primary_key=True)
152
    bus_id = Column(Integer, nullable=True)  # Grid district id
153
    gens_id = Column(String, nullable=True)  # EinheitMastrNummer
154
155
    status = Column(String, nullable=True)  # EinheitBetriebsstatus
156
    commissioning_date = Column(DateTime, nullable=True)  # Inbetriebnahmedatum
157
    postcode = Column(String(5), nullable=True)  # Postleitzahl
158
    city = Column(String(50), nullable=True)  # Ort
159
    federal_state = Column(String(31), nullable=True)  # Bundesland
160
161
    plant_type = Column(String(39), nullable=True)  # ArtDerWasserkraftanlage
162
    water_origin = Column(String(20), nullable=True)  # ArtDesZuflusses
163
164
    capacity = Column(Float, nullable=True)  # Nettonennleistung
165
    feedin_type = Column(String(47), nullable=True)  # Einspeisungsart
166
    voltage_level = Column(Integer, nullable=True)
167
    voltage_level_inferred = Column(Boolean, nullable=True)
168
169
    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
170
171
172
def import_mastr() -> None:
173
    """Import MaStR data into database"""
174
175
    def infer_voltage_level(
176
        units_gdf: gpd.GeoDataFrame,
177
    ) -> gpd.GeoDataFrame:
178
        """
179
        Infer nan values in voltage level derived from generator capacity to
180
        the power plants.
181
182
        Parameters
183
        -----------
184
        units_gdf : geopandas.GeoDataFrame
185
            GeoDataFrame containing units with voltage levels from MaStR
186
        Returnsunits_gdf: gpd.GeoDataFrame
187
        -------
188
        geopandas.GeoDataFrame
189
            GeoDataFrame containing units all having assigned a voltage level.
190
        """
191
192 View Code Duplication
        def voltage_levels(p: float) -> int:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
193
            if p <= 100:
194
                return 7
195
            elif p <= 200:
196
                return 6
197
            elif p <= 5500:
198
                return 5
199
            elif p <= 20000:
200
                return 4
201
            elif p <= 120000:
202
                return 3
203
            return 1
204
205
        units_gdf["voltage_level_inferred"] = False
206
        mask = units_gdf.voltage_level.isna()
207
        units_gdf.loc[mask, "voltage_level_inferred"] = True
208
        units_gdf.loc[mask, "voltage_level"] = units_gdf.loc[
209
            mask
210
        ].Nettonennleistung.apply(voltage_levels)
211
212
        return units_gdf
213
214
    engine = db.engine()
215
    cfg = egon.data.config.datasets()["power_plants"]
216
217
    cols_mapping = {
218
        "all": {
219
            "EinheitMastrNummer": "gens_id",
220
            "EinheitBetriebsstatus": "status",
221
            "Inbetriebnahmedatum": "commissioning_date",
222
            "Postleitzahl": "postcode",
223
            "Ort": "city",
224
            "Bundesland": "federal_state",
225
            "Nettonennleistung": "capacity",
226
            "Einspeisungsart": "feedin_type",
227
        },
228
        "pv": {
229
            "Lage": "site_type",
230
            "Nutzungsbereich": "usage_sector",
231
            "Hauptausrichtung": "orientation_primary",
232
            "HauptausrichtungNeigungswinkel": "orientation_primary_angle",
233
            "Nebenausrichtung": "orientation_secondary",
234
            "NebenausrichtungNeigungswinkel": "orientation_secondary_angle",
235
            "EinheitlicheAusrichtungUndNeigungswinkel": "orientation_uniform",
236
            "AnzahlModule": "module_count",
237
            "zugeordneteWirkleistungWechselrichter": "capacity_inverter",
238
        },
239
        "wind": {
240
            "Lage": "site_type",
241
            "Hersteller": "manufacturer_name",
242
            "Typenbezeichnung": "type_name",
243
            "Nabenhoehe": "hub_height",
244
            "Rotordurchmesser": "rotor_diameter",
245
        },
246
        "biomass": {
247
            "Technologie": "technology",
248
            "Hauptbrennstoff": "fuel_name",
249
            "Biomasseart": "fuel_type",
250
            "ThermischeNutzleistung": "th_capacity",
251
        },
252
        "hydro": {
253
            "ArtDerWasserkraftanlage": "plant_type",
254
            "ArtDesZuflusses": "water_origin",
255
        },
256
    }
257
258
    source_files = {
259
        "pv": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_pv"],
260
        "wind": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_wind"],
261
        "biomass": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_biomass"],
262
        "hydro": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_hydro"],
263
    }
264
    target_tables = {
265
        "pv": EgonPowerPlantsPv,
266
        "wind": EgonPowerPlantsWind,
267
        "biomass": EgonPowerPlantsBiomass,
268
        "hydro": EgonPowerPlantsHydro,
269
    }
270
    vlevel_mapping = {
271
        "Höchstspannung": 1,
272
        "UmspannungZurHochspannung": 2,
273
        "Hochspannung": 3,
274
        "UmspannungZurMittelspannung": 4,
275
        "Mittelspannung": 5,
276
        "UmspannungZurNiederspannung": 6,
277
        "Niederspannung": 7,
278
    }
279
280
    # import locations
281
    locations = pd.read_csv(
282
        WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_location"],
283
        index_col=None,
284
    )
285
286
    # import grid districts
287
    mv_grid_districts = db.select_geodataframe(
288
        f"""
289
        SELECT * FROM {cfg['sources']['egon_mv_grid_district']}
290
        """,
291
        epsg=4326,
292
    )
293
294
    # import units
295
    technologies = ["pv", "wind", "biomass", "hydro"]
296
    for tech in technologies:
297
        # read units
298
        print(f"===== Importing MaStR dataset: {tech} =====")
299
        print("  Reading CSV and filtering data...")
300
        units = pd.read_csv(
301
            source_files[tech],
302
            usecols=(
303
                ["LokationMastrNummer", "Laengengrad", "Breitengrad", "Land"]
304
                + list(cols_mapping["all"].keys())
305
                + list(cols_mapping[tech].keys())
306
            ),
307
            index_col=None,
308
            dtype={"Postleitzahl": str},
309
        ).rename(columns=cols_mapping)
310
311
        # drop units outside of Germany
312
        len_old = len(units)
313
        units = units.loc[units.Land == "Deutschland"]
314
        print(f"    {len_old-len(units)} units outside of Germany dropped...")
315
316
        # filter for SH units if in testmode
317
        if not TESTMODE_OFF:
318
            print(
319
                """    TESTMODE:
320
                Dropping all units outside of Schleswig-Holstein...
321
                """
322
            )
323
            units = units.loc[units.Bundesland == "SchleswigHolstein"]
324
325
        # merge and rename voltage level
326
        print("  Merging with locations and allocate voltage level...")
327
        units = units.merge(
328
            locations[["MaStRNummer", "Spannungsebene"]],
329
            left_on="LokationMastrNummer",
330
            right_on="MaStRNummer",
331
            how="left",
332
        )
333
        # convert voltage levels to numbers
334
        units["voltage_level"] = units.Spannungsebene.replace(vlevel_mapping)
335
        # set voltage level for nan values
336
        units = infer_voltage_level(units)
337
338
        # add geometry
339
        print("  Adding geometries...")
340
        units = gpd.GeoDataFrame(
341
            units,
342
            geometry=gpd.points_from_xy(
343
                units["Laengengrad"], units["Breitengrad"], crs=4326
344
            ),
345
            crs=4326,
346
        )
347
        units_wo_geom = len(
348
            units.loc[(units.Laengengrad.isna() | units.Laengengrad.isna())]
349
        )
350
        print(
351
            f"    {units_wo_geom}/{len(units)} units do not have a geometry!"
352
        )
353
354
        # drop unnecessary and rename columns
355
        print("  Reformatting...")
356
        units.drop(
357
            columns=[
358
                "LokationMastrNummer",
359
                "MaStRNummer",
360
                "Laengengrad",
361
                "Breitengrad",
362
                "Spannungsebene",
363
                "Land",
364
            ],
365
            inplace=True,
366
        )
367
        mapping = cols_mapping["all"].copy()
368
        mapping.update(cols_mapping[tech])
369
        mapping.update({"geometry": "geom"})
370
        units.rename(columns=mapping, inplace=True)
371
        units["voltage_level"] = units.voltage_level.fillna(-1).astype(int)
372
373
        units.set_geometry("geom", inplace=True)
374
        units["id"] = range(0, len(units))
375
376
        # change capacity unit: kW to MW
377
        units["capacity"] = units["capacity"] / 1e3
378
        if "capacity_inverter" in units.columns:
379
            units["capacity_inverter"] = units["capacity_inverter"] / 1e3
380
        if "th_capacity" in units.columns:
381
            units["th_capacity"] = units["th_capacity"] / 1e3
382
383
        # assign bus ids
384
        print("  Assigning bus ids...")
385
        units = units.assign(
386
            bus_id=units.loc[~units.geom.x.isna()]
387
            .sjoin(mv_grid_districts[["bus_id", "geom"]], how="left")
388
            .drop(columns=["index_right"])
389
            .bus_id
390
        )
391
        units["bus_id"] = units.bus_id.fillna(-1).astype(int)
392
393
        # write to DB
394
        print(f"  Writing {len(units)} units to DB...")
395
        units.to_postgis(
396
            name=target_tables[tech].__tablename__,
397
            con=engine,
398
            if_exists="append",
399
            schema=target_tables[tech].__table_args__["schema"],
400
        )
401