Passed
Pull Request — dev (#1054)
by
unknown
02:17 queued 14s
created

data.datasets.power_plants.mastr   A

Complexity

Total Complexity 4

Size/Duplication

Total Lines 324
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 4
eloc 227
dl 0
loc 324
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
B import_mastr() 0 169 4
1
"""Import MaStR dataset and write to DB tables
2
3
Data dump from Marktstammdatenregister is imported into the database. Only the
4
following technologies are taken into account:
5
* PV
6
* wind turbines
7
* biomass/biogas plants
8
* hydro plants
9
10
The data is used especially for the generation of status quo grids by ding0.
11
"""
12
from geoalchemy2 import Geometry
13
from sqlalchemy import (
14
    Boolean,
15
    Column,
16
    DateTime,
17
    Float,
18
    Integer,
19
    Sequence,
20
    String,
21
)
22
from sqlalchemy.ext.declarative import declarative_base
23
import geopandas as gpd
24
import pandas as pd
25
26
from egon.data import db
27
import egon.data.config
28
29
# Declarative base shared by the ORM table classes defined in this module.
Base = declarative_base()

# True when the configured dataset boundary is "Everything". When it is
# False, import_mastr() keeps only units located in Schleswig-Holstein.
_egon_data_settings = egon.data.config.settings()["egon-data"]
TESTMODE_OFF = _egon_data_settings["--dataset-boundary"] == "Everything"
35
36
37
class EgonPowerPlantsPv(Base):
    """ORM model of table ``supply.egon_power_plants_pv`` (PV units).

    The comment above a column names the MaStR field it corresponds to.
    """

    __tablename__ = "egon_power_plants_pv"
    __table_args__ = {"schema": "supply"}

    # --- Identification ---------------------------------------------------
    id = Column(Integer, Sequence("pp_pv_seq"), primary_key=True)
    # Grid district id
    bus_id = Column(Integer, nullable=True)
    # EinheitMastrNummer
    gens_id = Column(String, nullable=True)

    # --- General unit data ------------------------------------------------
    # EinheitBetriebsstatus
    status = Column(String, nullable=True)
    # Inbetriebnahmedatum
    commissioning_date = Column(DateTime, nullable=True)
    # Postleitzahl
    postcode = Column(String(5), nullable=True)
    # Ort
    city = Column(String(50), nullable=True)
    # Bundesland
    federal_state = Column(String(31), nullable=True)

    # --- PV-specific data -------------------------------------------------
    # Lage
    site_type = Column(String(69), nullable=True)
    # Nutzungsbereich
    usage_sector = Column(String(36), nullable=True)
    # Hauptausrichtung
    orientation_primary = Column(String(11), nullable=True)
    # HauptausrichtungNeigungswinkel
    orientation_primary_angle = Column(String(18), nullable=True)
    # Nebenausrichtung
    orientation_secondary = Column(String(11), nullable=True)
    # NebenausrichtungNeigungswinkel
    orientation_secondary_angle = Column(String(18), nullable=True)
    # EinheitlicheAusrichtungUndNeigungswinkel
    orientation_uniform = Column(Boolean, nullable=True)
    # AnzahlModule
    module_count = Column(Float, nullable=True)

    # --- Electrical data --------------------------------------------------
    # Nettonennleistung
    capacity = Column(Float, nullable=True)
    # ZugeordneteWirkleistungWechselrichter in MW
    capacity_inverter = Column(Float, nullable=True)
    # Einspeisungsart
    feedin_type = Column(String(47), nullable=True)
    voltage_level = Column(Integer, nullable=True)

    # Point geometry with SRID 4326
    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
76
77
78
class EgonPowerPlantsWind(Base):
    """ORM model of table ``supply.egon_power_plants_wind`` (wind turbines).

    The comment above a column names the MaStR field it corresponds to.
    """

    __tablename__ = "egon_power_plants_wind"
    __table_args__ = {"schema": "supply"}

    # --- Identification ---------------------------------------------------
    id = Column(Integer, Sequence("pp_wind_seq"), primary_key=True)
    # Grid district id
    bus_id = Column(Integer, nullable=True)
    # EinheitMastrNummer
    gens_id = Column(String, nullable=True)

    # --- General unit data ------------------------------------------------
    # EinheitBetriebsstatus
    status = Column(String, nullable=True)
    # Inbetriebnahmedatum
    commissioning_date = Column(DateTime, nullable=True)
    # Postleitzahl
    postcode = Column(String(5), nullable=True)
    # Ort
    city = Column(String(50), nullable=True)
    # Bundesland
    federal_state = Column(String(31), nullable=True)

    # --- Wind-specific data -----------------------------------------------
    # Lage
    site_type = Column(String(17), nullable=True)
    # Hersteller
    manufacturer_name = Column(String(100), nullable=True)
    # Typenbezeichnung
    type_name = Column(String(100), nullable=True)
    # Nabenhoehe
    hub_height = Column(Float, nullable=True)
    # Rotordurchmesser
    rotor_diameter = Column(Float, nullable=True)

    # --- Electrical data --------------------------------------------------
    # Nettonennleistung
    capacity = Column(Float, nullable=True)
    # Einspeisungsart
    feedin_type = Column(String(47), nullable=True)
    voltage_level = Column(Integer, nullable=True)

    # Point geometry with SRID 4326
    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
103
104
105
class EgonPowerPlantsBiomass(Base):
    """ORM model of table ``supply.egon_power_plants_biomass``.

    The comment above a column names the MaStR field it corresponds to.
    """

    __tablename__ = "egon_power_plants_biomass"
    __table_args__ = {"schema": "supply"}

    # --- Identification ---------------------------------------------------
    id = Column(Integer, Sequence("pp_biomass_seq"), primary_key=True)
    # Grid district id
    bus_id = Column(Integer, nullable=True)
    # EinheitMastrNummer
    gens_id = Column(String, nullable=True)

    # --- General unit data ------------------------------------------------
    # EinheitBetriebsstatus
    status = Column(String, nullable=True)
    # Inbetriebnahmedatum
    commissioning_date = Column(DateTime, nullable=True)
    # Postleitzahl
    postcode = Column(String(5), nullable=True)
    # Ort
    city = Column(String(50), nullable=True)
    # Bundesland
    federal_state = Column(String(31), nullable=True)

    # --- Biomass-specific data --------------------------------------------
    # Technologie
    technology = Column(String(45), nullable=True)
    # Hauptbrennstoff
    fuel_name = Column(String(52), nullable=True)
    # Biomasseart
    fuel_type = Column(String(19), nullable=True)

    # --- Electrical / thermal data ----------------------------------------
    # Nettonennleistung
    capacity = Column(Float, nullable=True)
    # ThermischeNutzleistung
    th_capacity = Column(Float, nullable=True)
    # Einspeisungsart
    feedin_type = Column(String(47), nullable=True)
    voltage_level = Column(Integer, nullable=True)

    # Point geometry with SRID 4326
    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
129
130
131
class EgonPowerPlantsHydro(Base):
    """ORM model of table ``supply.egon_power_plants_hydro`` (hydro plants).

    The comment above a column names the MaStR field it corresponds to.
    """

    __tablename__ = "egon_power_plants_hydro"
    __table_args__ = {"schema": "supply"}

    # --- Identification ---------------------------------------------------
    id = Column(Integer, Sequence("pp_hydro_seq"), primary_key=True)
    # Grid district id
    bus_id = Column(Integer, nullable=True)
    # EinheitMastrNummer
    gens_id = Column(String, nullable=True)

    # --- General unit data ------------------------------------------------
    # EinheitBetriebsstatus
    status = Column(String, nullable=True)
    # Inbetriebnahmedatum
    commissioning_date = Column(DateTime, nullable=True)
    # Postleitzahl
    postcode = Column(String(5), nullable=True)
    # Ort
    city = Column(String(50), nullable=True)
    # Bundesland
    federal_state = Column(String(31), nullable=True)

    # --- Hydro-specific data ----------------------------------------------
    # ArtDerWasserkraftanlage
    plant_type = Column(Integer, nullable=True)
    # ArtDesZuflusses
    water_origin = Column(String(20), nullable=True)

    # --- Electrical data --------------------------------------------------
    # Nettonennleistung
    capacity = Column(Float, nullable=True)
    # Einspeisungsart
    feedin_type = Column(String(47), nullable=True)
    voltage_level = Column(Integer, nullable=True)

    # Point geometry with SRID 4326
    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
153
154
155
def import_mastr() -> None:
    """Import MaStR data into database

    For each technology (PV, wind, biomass, hydro) the unit CSV configured in
    the ``power_plants`` dataset sources is processed as follows:

    1. Read the required columns and keep only units with
       ``Land == "Deutschland"``; if ``TESTMODE_OFF`` is false, additionally
       keep only units with ``Bundesland == "SchleswigHolstein"``.
    2. Left-merge with the MaStR location table and translate
       ``Spannungsebene`` into an integer ``voltage_level`` (1..7).
    3. Build point geometries (EPSG:4326) from ``Laengengrad`` and
       ``Breitengrad``.
    4. Drop helper columns and rename the remaining ones to the DB column
       names.
    5. Drop units whose geometry has no x coordinate and assign a ``bus_id``
       via a left spatial join with the MV grid districts.
    6. Append the result to the technology's table.

    Missing values of ``voltage_level``, ``bus_id`` and (hydro only)
    ``plant_type`` are written as -1.
    """
    engine = db.engine()
    cfg = egon.data.config.datasets()["power_plants"]

    # MaStR column name -> DB column name. "all" applies to every technology,
    # the other entries hold the technology-specific columns.
    cols_mapping = {
        "all": {
            "EinheitMastrNummer": "gens_id",
            "EinheitBetriebsstatus": "status",
            "Inbetriebnahmedatum": "commissioning_date",
            "Postleitzahl": "postcode",
            "Ort": "city",
            "Bundesland": "federal_state",
            "Nettonennleistung": "capacity",
            "Einspeisungsart": "feedin_type",
        },
        "pv": {
            "Lage": "site_type",
            "Nutzungsbereich": "usage_sector",
            "Hauptausrichtung": "orientation_primary",
            "HauptausrichtungNeigungswinkel": "orientation_primary_angle",
            "Nebenausrichtung": "orientation_secondary",
            "NebenausrichtungNeigungswinkel": "orientation_secondary_angle",
            "EinheitlicheAusrichtungUndNeigungswinkel": "orientation_uniform",
            "AnzahlModule": "module_count",
            "zugeordneteWirkleistungWechselrichter": "capacity_inverter",
        },
        "wind": {
            "Lage": "site_type",
            "Hersteller": "manufacturer_name",
            "Typenbezeichnung": "type_name",
            "Nabenhoehe": "hub_height",
            "Rotordurchmesser": "rotor_diameter",
        },
        "biomass": {
            "Technologie": "technology",
            "Hauptbrennstoff": "fuel_name",
            "Biomasseart": "fuel_type",
            "ThermischeNutzleistung": "th_capacity",
        },
        "hydro": {
            "ArtDerWasserkraftanlage": "plant_type",
            "ArtDesZuflusses": "water_origin",
        },
    }

    # The insertion order of this dict defines the processing order.
    source_files = {
        "pv": cfg["sources"]["mastr_pv"],
        "wind": cfg["sources"]["mastr_wind"],
        "biomass": cfg["sources"]["mastr_biomass"],
        "hydro": cfg["sources"]["mastr_hydro"],
    }
    target_tables = {
        "pv": EgonPowerPlantsPv,
        "wind": EgonPowerPlantsWind,
        "biomass": EgonPowerPlantsBiomass,
        "hydro": EgonPowerPlantsHydro,
    }

    # CSV columns that are only needed during processing and are dropped
    # before writing to the database.
    helper_cols = ["LokationMastrNummer", "Laengengrad", "Breitengrad", "Land"]

    # Spannungsebene (MaStR location table) -> integer voltage level
    vlevel_mapping = {
        "Höchstspannung": 1,
        "UmspannungZurHochspannung": 2,
        "Hochspannung": 3,
        "UmspannungZurMittelspannung": 4,
        "Mittelspannung": 5,
        "UmspannungZurNiederspannung": 6,
        "Niederspannung": 7,
    }

    # import locations
    locations = pd.read_csv(cfg["sources"]["mastr_location"], index_col=None)

    # import grid districts
    mv_grid_districts = db.select_geodataframe(
        f"""
        SELECT * FROM {cfg['sources']['egon_mv_grid_district']}
        """,
        epsg=4326,
    )

    # import units
    for tech, source_file in source_files.items():
        # common + technology-specific MaStR -> DB column names
        tech_mapping = {**cols_mapping["all"], **cols_mapping[tech]}

        # read units; columns keep their MaStR names until the filters below
        # have run and are renamed in the reformatting step
        print(f"Importing MaStR dataset: {tech}:")
        print("  Reading CSV and filtering data...")
        units = pd.read_csv(
            source_file,
            usecols=helper_cols + list(tech_mapping),
            index_col=None,
            dtype={"Postleitzahl": str},  # keep leading zeros of postcodes
        )

        # drop units outside of Germany
        len_old = len(units)
        units = units.loc[units.Land == "Deutschland"]
        print(f"    {len_old-len(units)} units outside of Germany dropped...")

        # filter for SH units if in testmode
        if not TESTMODE_OFF:
            print(
                """    TESTMODE:
                Dropping all units outside of Schleswig-Holstein...
                """
            )
            units = units.loc[units.Bundesland == "SchleswigHolstein"]

        # merge and rename voltage level
        print("  Merging with locations and allocate voltage level...")
        units = units.merge(
            locations[["MaStRNummer", "Spannungsebene"]],
            left_on="LokationMastrNummer",
            right_on="MaStRNummer",
            how="left",
        )
        units["voltage_level"] = units.Spannungsebene.replace(vlevel_mapping)

        # add geometry
        print("  Adding geometries...")
        units = gpd.GeoDataFrame(
            units,
            geometry=gpd.points_from_xy(
                units["Laengengrad"], units["Breitengrad"], crs=4326
            ),
            crs=4326,
        )

        # drop unnecessary and rename columns
        print("  Reformatting...")
        units.drop(
            columns=helper_cols + ["MaStRNummer", "Spannungsebene"],
            inplace=True,
        )
        units.rename(columns={**tech_mapping, "geometry": "geom"}, inplace=True)
        # units without a matching location have no voltage level -> -1
        units["voltage_level"] = units.voltage_level.fillna(-1).astype(int)
        if tech == "hydro":
            units["plant_type"] = units.plant_type.fillna(-1).astype(int)
        # the geometry column was renamed, so register it as active geometry
        units.set_geometry("geom", inplace=True)
        units["id"] = range(len(units))

        # assign bus ids
        print("  Assigning bus ids...")
        units = (
            units.loc[~units.geom.x.isna()]
            .sjoin(mv_grid_districts[["bus_id", "geom"]], how="left")
            .drop(columns=["index_right"])
        )
        # units that fall into no grid district get bus_id -1
        units["bus_id"] = units.bus_id.fillna(-1).astype(int)

        # write to DB
        print("  Writing to DB...")
        units.to_postgis(
            name=target_tables[tech].__tablename__,
            con=engine,
            if_exists="append",
            schema=target_tables[tech].__table_args__["schema"],
        )
325