Passed
Pull Request — dev (#1054)
by
unknown
01:28
created

data.datasets.power_plants.mastr.import_mastr()   B

Complexity

Conditions 4

Size

Total Lines 178
Code Lines 129

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 129
dl 0
loc 178
rs 7
c 0
b 0
f 0
cc 4
nop 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

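Commonly applied refactorings include Extract Method, i.e. moving a commented or otherwise self-contained part of the body into its own well-named method, as described above.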

1
"""Import MaStR dataset and write to DB tables
2
3
Data dump from Marktstammdatenregister (2022-11-17) is imported into the
4
database. Only some technologies are taken into account and written to the
5
following tables:
6
7
* PV: table `supply.egon_power_plants_pv`
8
* wind turbines: table `supply.egon_power_plants_wind`
9
* biomass/biogas plants: table `supply.egon_power_plants_biomass`
10
* hydro plants: table `supply.egon_power_plants_hydro`
11
12
Empty data in columns `voltage_level`, `bus_id` (all tables), and `plant_type`
13
(supply.egon_power_plants_hydro) is indicated by the value -1.
14
15
The data is used especially for the generation of status quo grids by ding0.
16
"""
17
from geoalchemy2 import Geometry
18
from sqlalchemy import (
19
    Boolean,
20
    Column,
21
    DateTime,
22
    Float,
23
    Integer,
24
    Sequence,
25
    String,
26
)
27
from sqlalchemy.ext.declarative import declarative_base
28
import geopandas as gpd
29
import pandas as pd
30
31
from egon.data import db
32
from egon.data.datasets.mastr import WORKING_DIR_MASTR_NEW
33
import egon.data.config
34
35
# Declarative base class shared by the table models defined in this module
Base = declarative_base()


# True if the configured dataset boundary is "Everything"; otherwise
# import_mastr() keeps only units located in Schleswig-Holstein
TESTMODE_OFF = "Everything" == (
    egon.data.config.settings()["egon-data"]["--dataset-boundary"]
)
41
42
43
class EgonPowerPlantsPv(Base):
    """Table model for PV units: `supply.egon_power_plants_pv`.

    The comment above each column names the MaStR source field it is
    filled from, where one exists.
    """

    __tablename__ = "egon_power_plants_pv"
    __table_args__ = {"schema": "supply"}

    id = Column(Integer, Sequence("pp_pv_seq"), primary_key=True)
    # Grid district id
    bus_id = Column(Integer, nullable=True)
    # EinheitMastrNummer
    gens_id = Column(String, nullable=True)

    # EinheitBetriebsstatus
    status = Column(String, nullable=True)
    # Inbetriebnahmedatum
    commissioning_date = Column(DateTime, nullable=True)
    # Postleitzahl
    postcode = Column(String(5), nullable=True)
    # Ort
    city = Column(String(50), nullable=True)
    # Bundesland
    federal_state = Column(String(31), nullable=True)

    # Lage
    site_type = Column(String(69), nullable=True)
    # Nutzungsbereich
    usage_sector = Column(String(36), nullable=True)
    # Hauptausrichtung
    orientation_primary = Column(String(11), nullable=True)
    # HauptausrichtungNeigungswinkel
    orientation_primary_angle = Column(String(18), nullable=True)
    # Nebenausrichtung
    orientation_secondary = Column(String(11), nullable=True)
    # NebenausrichtungNeigungswinkel
    orientation_secondary_angle = Column(String(18), nullable=True)
    # EinheitlicheAusrichtungUndNeigungswinkel
    orientation_uniform = Column(Boolean, nullable=True)
    # AnzahlModule
    module_count = Column(Float, nullable=True)

    # Nettonennleistung
    capacity = Column(Float, nullable=True)
    # ZugeordneteWirkleistungWechselrichter in MW
    capacity_inverter = Column(Float, nullable=True)
    # Einspeisungsart
    feedin_type = Column(String(47), nullable=True)
    voltage_level = Column(Integer, nullable=True)

    # Point geometry in EPSG:4326, indexed
    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
82
83
84
class EgonPowerPlantsWind(Base):
    """Table model for wind turbines: `supply.egon_power_plants_wind`.

    The comment above each column names the MaStR source field it is
    filled from, where one exists.
    """

    __tablename__ = "egon_power_plants_wind"
    __table_args__ = {"schema": "supply"}

    id = Column(Integer, Sequence("pp_wind_seq"), primary_key=True)
    # Grid district id
    bus_id = Column(Integer, nullable=True)
    # EinheitMastrNummer
    gens_id = Column(String, nullable=True)

    # EinheitBetriebsstatus
    status = Column(String, nullable=True)
    # Inbetriebnahmedatum
    commissioning_date = Column(DateTime, nullable=True)
    # Postleitzahl
    postcode = Column(String(5), nullable=True)
    # Ort
    city = Column(String(50), nullable=True)
    # Bundesland
    federal_state = Column(String(31), nullable=True)

    # Lage
    site_type = Column(String(17), nullable=True)
    # Hersteller
    manufacturer_name = Column(String(100), nullable=True)
    # Typenbezeichnung
    type_name = Column(String(100), nullable=True)
    # Nabenhoehe
    hub_height = Column(Float, nullable=True)
    # Rotordurchmesser
    rotor_diameter = Column(Float, nullable=True)

    # Nettonennleistung
    capacity = Column(Float, nullable=True)
    # Einspeisungsart
    feedin_type = Column(String(47), nullable=True)
    voltage_level = Column(Integer, nullable=True)

    # Point geometry in EPSG:4326, indexed
    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
109
110
111
class EgonPowerPlantsBiomass(Base):
    """Table model for biomass units: `supply.egon_power_plants_biomass`.

    The comment above each column names the MaStR source field it is
    filled from, where one exists.
    """

    __tablename__ = "egon_power_plants_biomass"
    __table_args__ = {"schema": "supply"}

    id = Column(Integer, Sequence("pp_biomass_seq"), primary_key=True)
    # Grid district id
    bus_id = Column(Integer, nullable=True)
    # EinheitMastrNummer
    gens_id = Column(String, nullable=True)

    # EinheitBetriebsstatus
    status = Column(String, nullable=True)
    # Inbetriebnahmedatum
    commissioning_date = Column(DateTime, nullable=True)
    # Postleitzahl
    postcode = Column(String(5), nullable=True)
    # Ort
    city = Column(String(50), nullable=True)
    # Bundesland
    federal_state = Column(String(31), nullable=True)

    # Technologie
    technology = Column(String(45), nullable=True)
    # Hauptbrennstoff
    fuel_name = Column(String(52), nullable=True)
    # Biomasseart
    fuel_type = Column(String(19), nullable=True)

    # Nettonennleistung
    capacity = Column(Float, nullable=True)
    # ThermischeNutzleistung
    th_capacity = Column(Float, nullable=True)
    # Einspeisungsart
    feedin_type = Column(String(47), nullable=True)
    voltage_level = Column(Integer, nullable=True)

    # Point geometry in EPSG:4326, indexed
    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
135
136
137
class EgonPowerPlantsHydro(Base):
    """Table model for hydro units: `supply.egon_power_plants_hydro`.

    The comment above each column names the MaStR source field it is
    filled from, where one exists.
    """

    __tablename__ = "egon_power_plants_hydro"
    __table_args__ = {"schema": "supply"}

    id = Column(Integer, Sequence("pp_hydro_seq"), primary_key=True)
    # Grid district id
    bus_id = Column(Integer, nullable=True)
    # EinheitMastrNummer
    gens_id = Column(String, nullable=True)

    # EinheitBetriebsstatus
    status = Column(String, nullable=True)
    # Inbetriebnahmedatum
    commissioning_date = Column(DateTime, nullable=True)
    # Postleitzahl
    postcode = Column(String(5), nullable=True)
    # Ort
    city = Column(String(50), nullable=True)
    # Bundesland
    federal_state = Column(String(31), nullable=True)

    # ArtDerWasserkraftanlage
    plant_type = Column(String(39), nullable=True)
    # ArtDesZuflusses
    water_origin = Column(String(20), nullable=True)

    # Nettonennleistung
    capacity = Column(Float, nullable=True)
    # Einspeisungsart
    feedin_type = Column(String(47), nullable=True)
    voltage_level = Column(Integer, nullable=True)

    # Point geometry in EPSG:4326, indexed
    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
159
160
161
# Raw MaStR column name -> DB column name; "all" applies to every technology,
# the other keys hold the technology-specific columns
_MASTR_COLS_MAPPING = {
    "all": {
        "EinheitMastrNummer": "gens_id",
        "EinheitBetriebsstatus": "status",
        "Inbetriebnahmedatum": "commissioning_date",
        "Postleitzahl": "postcode",
        "Ort": "city",
        "Bundesland": "federal_state",
        "Nettonennleistung": "capacity",
        "Einspeisungsart": "feedin_type",
    },
    "pv": {
        "Lage": "site_type",
        "Nutzungsbereich": "usage_sector",
        "Hauptausrichtung": "orientation_primary",
        "HauptausrichtungNeigungswinkel": "orientation_primary_angle",
        "Nebenausrichtung": "orientation_secondary",
        "NebenausrichtungNeigungswinkel": "orientation_secondary_angle",
        "EinheitlicheAusrichtungUndNeigungswinkel": "orientation_uniform",
        "AnzahlModule": "module_count",
        "zugeordneteWirkleistungWechselrichter": "capacity_inverter",
    },
    "wind": {
        "Lage": "site_type",
        "Hersteller": "manufacturer_name",
        "Typenbezeichnung": "type_name",
        "Nabenhoehe": "hub_height",
        "Rotordurchmesser": "rotor_diameter",
    },
    "biomass": {
        "Technologie": "technology",
        "Hauptbrennstoff": "fuel_name",
        "Biomasseart": "fuel_type",
        "ThermischeNutzleistung": "th_capacity",
    },
    "hydro": {
        "ArtDerWasserkraftanlage": "plant_type",
        "ArtDesZuflusses": "water_origin",
    },
}

# MaStR voltage level name (column `Spannungsebene`) -> integer voltage level
_MASTR_VLEVEL_MAPPING = {
    "Höchstspannung": 1,
    "UmspannungZurHochspannung": 2,
    "Hochspannung": 3,
    "UmspannungZurMittelspannung": 4,
    "Mittelspannung": 5,
    "UmspannungZurNiederspannung": 6,
    "Niederspannung": 7,
}

# Raw columns needed only while processing; they are dropped before writing
_MASTR_HELPER_COLS = [
    "LokationMastrNummer",
    "Laengengrad",
    "Breitengrad",
    "Land",
]


def _read_mastr_units(source_file, tech: str) -> pd.DataFrame:
    """Read the unit CSV of one technology and keep only relevant units.

    Units outside of Germany are dropped; in test mode (`TESTMODE_OFF` is
    False) only units in Schleswig-Holstein are kept. Columns still carry
    their raw MaStR names on return.
    """
    print("  Reading CSV and filtering data...")
    units = pd.read_csv(
        source_file,
        usecols=(
            _MASTR_HELPER_COLS
            + list(_MASTR_COLS_MAPPING["all"])
            + list(_MASTR_COLS_MAPPING[tech])
        ),
        index_col=None,
        # read postcodes as text (the target column is String(5))
        dtype={"Postleitzahl": str},
    )

    # drop units outside of Germany
    len_old = len(units)
    units = units.loc[units.Land == "Deutschland"]
    print(f"    {len_old-len(units)} units outside of Germany dropped...")

    # filter for SH units if in testmode
    if not TESTMODE_OFF:
        print(
            "    TESTMODE:\n"
            "                Dropping all units outside of"
            " Schleswig-Holstein...\n"
            "                "
        )
        units = units.loc[units.Bundesland == "SchleswigHolstein"]

    return units


def _add_voltage_level(
    units: pd.DataFrame, locations: pd.DataFrame
) -> pd.DataFrame:
    """Left-join the location table and derive column `voltage_level`."""
    print("  Merging with locations and allocate voltage level...")
    units = units.merge(
        locations[["MaStRNummer", "Spannungsebene"]],
        left_on="LokationMastrNummer",
        right_on="MaStRNummer",
        how="left",
    )
    units["voltage_level"] = units.Spannungsebene.replace(
        _MASTR_VLEVEL_MAPPING
    )
    return units


def _add_geometry(units: pd.DataFrame) -> gpd.GeoDataFrame:
    """Build point geometries (EPSG:4326) from longitude and latitude."""
    print("  Adding geometries...")
    units = gpd.GeoDataFrame(
        units,
        geometry=gpd.points_from_xy(
            units["Laengengrad"], units["Breitengrad"], crs=4326
        ),
        crs=4326,
    )
    # a unit has no usable geometry if either coordinate is missing
    units_wo_geom = int(
        (units.Laengengrad.isna() | units.Breitengrad.isna()).sum()
    )
    print(f"    {units_wo_geom}/{len(units)} units do not have a geometry!")
    return units


def _reformat_units(units: gpd.GeoDataFrame, tech: str) -> gpd.GeoDataFrame:
    """Drop helper columns, rename to DB column names and fill empty data."""
    print("  Reformatting...")
    units.drop(
        columns=_MASTR_HELPER_COLS + ["MaStRNummer", "Spannungsebene"],
        inplace=True,
    )
    mapping = {
        **_MASTR_COLS_MAPPING["all"],
        **_MASTR_COLS_MAPPING[tech],
        "geometry": "geom",
    }
    units.rename(columns=mapping, inplace=True)

    # empty data is indicated by -1
    units["voltage_level"] = units.voltage_level.fillna(-1).astype(int)
    if tech == "hydro":
        # NOTE(review): the target column `plant_type` is declared as
        # String(39) while values are cast to int here -- confirm that the
        # source data is numeric, otherwise this cast raises.
        units["plant_type"] = units.plant_type.fillna(-1).astype(int)

    # the geometry column was renamed, so re-register it as active geometry
    units.set_geometry("geom", inplace=True)
    units["id"] = range(0, len(units))
    return units


def _assign_bus_ids(
    units: gpd.GeoDataFrame, mv_grid_districts: gpd.GeoDataFrame
) -> gpd.GeoDataFrame:
    """Drop units without x coordinate and spatially join the `bus_id`.

    Units that do not fall into any grid district get `bus_id` -1.
    """
    print("  Assigning bus ids...")
    units = (
        units.loc[~units.geom.x.isna()]
        .sjoin(mv_grid_districts[["bus_id", "geom"]], how="left")
        .drop(columns=["index_right"])
    )
    units["bus_id"] = units.bus_id.fillna(-1).astype(int)
    return units


def import_mastr() -> None:
    """Import MaStR data into database

    For each technology (PV, wind, biomass, hydro) the unit CSV is read,
    filtered, enriched with voltage level, point geometry and grid district
    (`bus_id`), and appended to the corresponding table in schema `supply`.
    Source file names and the grid district table are taken from the
    `power_plants` section of the dataset configuration.
    """
    engine = db.engine()
    cfg = egon.data.config.datasets()["power_plants"]

    source_files = {
        "pv": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_pv"],
        "wind": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_wind"],
        "biomass": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_biomass"],
        "hydro": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_hydro"],
    }
    target_tables = {
        "pv": EgonPowerPlantsPv,
        "wind": EgonPowerPlantsWind,
        "biomass": EgonPowerPlantsBiomass,
        "hydro": EgonPowerPlantsHydro,
    }

    # import locations
    locations = pd.read_csv(
        WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_location"],
        index_col=None,
    )

    # import grid districts
    mv_grid_districts = db.select_geodataframe(
        f"""
        SELECT * FROM {cfg['sources']['egon_mv_grid_district']}
        """,
        epsg=4326,
    )

    # import units
    for tech, table in target_tables.items():
        print(f"Importing MaStR dataset: {tech}:")
        units = _read_mastr_units(source_files[tech], tech)
        units = _add_voltage_level(units, locations)
        units = _add_geometry(units)
        units = _reformat_units(units, tech)
        units = _assign_bus_ids(units, mv_grid_districts)

        # write to DB
        print("  Writing to DB...")
        units.to_postgis(
            name=table.__tablename__,
            con=engine,
            if_exists="append",
            schema=table.__table_args__["schema"],
        )
340