Passed
Pull Request — dev (#1054)
by
unknown
01:25
created

data.datasets.power_plants.mastr.import_mastr()   B

Complexity

Conditions 4

Size

Total Lines 151
Code Lines 114

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 114
dl 0
loc 151
rs 7
c 0
b 0
f 0
cc 4
nop 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
"""Import MaStR dataset and write to DB tables
2
3
Data dump from Marktstammdatenregister is imported into the database. Only the
4
following technologies are taken into account:
5
* PV
6
* wind turbines
7
* biomass/biogas plants
8
* hydro plants
9
10
The data is used especially for the generation of status quo grids by ding0.
11
"""
12
from geoalchemy2 import Geometry
13
from sqlalchemy import (
14
    Boolean,
15
    Column,
16
    DateTime,
17
    Float,
18
    Integer,
19
    Sequence,
20
    String,
21
)
22
from sqlalchemy.ext.declarative import declarative_base
23
import geopandas as gpd
24
import pandas as pd
25
26
from egon.data import db
27
import egon.data.config
28
29
Base = declarative_base()
30
31
TESTMODE_OFF = (
32
    egon.data.config.settings()["egon-data"]["--dataset-boundary"]
33
    == "Everything"
34
)
35
36
37
class EgonPowerPlantsPv(Base):
38
    __tablename__ = "egon_power_plants_pv"
39
    __table_args__ = {"schema": "supply"}
40
41
    id = Column(Integer, Sequence("pp_pv_seq"), primary_key=True)
42
    bus_id = Column(Integer, nullable=True)  # Grid district id
43
    gens_id = Column(String, nullable=True)  # EinheitMastrNummer
44
45
    status = Column(String, nullable=True)  # EinheitBetriebsstatus
46
    commissioning_date = Column(DateTime, nullable=True)  # Inbetriebnahmedatum
47
    postcode = Column(String(5), nullable=True)  # Postleitzahl
48
    city = Column(String(50), nullable=True)  # Ort
49
    federal_state = Column(String(31), nullable=True)  # Bundesland
50
51
    site_type = Column(String(69), nullable=True)  # Lage
52
    usage_sector = Column(String(36), nullable=True)  # Nutzungsbereich
53
    orientation_primary = Column(String(11), nullable=True)  # Hauptausrichtung
54
    orientation_primary_angle = Column(
55
        String(18), nullable=True
56
    )  # HauptausrichtungNeigungswinkel
57
    orientation_secondary = Column(
58
        String(11), nullable=True
59
    )  # Nebenausrichtung
60
    orientation_secondary_angle = Column(
61
        String(18), nullable=True
62
    )  # NebenausrichtungNeigungswinkel
63
    orientation_uniform = Column(
64
        Boolean, nullable=True
65
    )  # EinheitlicheAusrichtungUndNeigungswinkel
66
    module_count = Column(Float, nullable=True)  # AnzahlModule
67
68
    capacity = Column(Float, nullable=True)  # Nettonennleistung
69
    capacity_inverter = Column(
70
        Float, nullable=True
71
    )  # ZugeordneteWirkleistungWechselrichter in MW
72
    feedin_type = Column(String(47), nullable=True)  # Einspeisungsart
73
    voltage_level = Column(Integer, nullable=True)
74
75
    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
76
77
78
class EgonPowerPlantsWind(Base):
79
    __tablename__ = "egon_power_plants_wind"
80
    __table_args__ = {"schema": "supply"}
81
82
    id = Column(Integer, Sequence("pp_wind_seq"), primary_key=True)
83
    bus_id = Column(Integer, nullable=True)  # Grid district id
84
    gens_id = Column(String, nullable=True)  # EinheitMastrNummer
85
86
    status = Column(String, nullable=True)  # EinheitBetriebsstatus
87
    commissioning_date = Column(DateTime, nullable=True)  # Inbetriebnahmedatum
88
    postcode = Column(String(5), nullable=True)  # Postleitzahl
89
    city = Column(String(50), nullable=True)  # Ort
90
    federal_state = Column(String(31), nullable=True)  # Bundesland
91
92
    site_type = Column(String(17), nullable=True)  # Lage
93
    manufacturer_name = Column(String(100), nullable=True)  # Hersteller
94
    type_name = Column(String(100), nullable=True)  # Typenbezeichnung
95
    hub_height = Column(Float, nullable=True)  # Nabenhoehe
96
    rotor_diameter = Column(Float, nullable=True)  # Rotordurchmesser
97
98
    capacity = Column(Float, nullable=True)  # Nettonennleistung
99
    feedin_type = Column(String(47), nullable=True)  # Einspeisungsart
100
    voltage_level = Column(Integer, nullable=True)
101
102
    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
103
104
105
class EgonPowerPlantsBiomass(Base):
106
    __tablename__ = "egon_power_plants_biomass"
107
    __table_args__ = {"schema": "supply"}
108
109
    id = Column(Integer, Sequence("pp_biomass_seq"), primary_key=True)
110
    bus_id = Column(Integer, nullable=True)  # Grid district id
111
    gens_id = Column(String, nullable=True)  # EinheitMastrNummer
112
113
    status = Column(String, nullable=True)  # EinheitBetriebsstatus
114
    commissioning_date = Column(DateTime, nullable=True)  # Inbetriebnahmedatum
115
    postcode = Column(String(5), nullable=True)  # Postleitzahl
116
    city = Column(String(50), nullable=True)  # Ort
117
    federal_state = Column(String(31), nullable=True)  # Bundesland
118
119
    technology = Column(String(45), nullable=True)  # Technologie
120
    fuel_name = Column(String(52), nullable=True)  # Hauptbrennstoff
121
    fuel_type = Column(String(19), nullable=True)  # Biomasseart
122
123
    capacity = Column(Float, nullable=True)  # Nettonennleistung
124
    th_capacity = Column(Float, nullable=True)  # ThermischeNutzleistung
125
    feedin_type = Column(String(47), nullable=True)  # Einspeisungsart
126
    voltage_level = Column(Integer, nullable=True)
127
128
    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
129
130
131
class EgonPowerPlantsHydro(Base):
132
    __tablename__ = "egon_power_plants_hydro"
133
    __table_args__ = {"schema": "supply"}
134
135
    id = Column(Integer, Sequence("pp_hydro_seq"), primary_key=True)
136
    bus_id = Column(Integer, nullable=True)  # Grid district id
137
    gens_id = Column(String, nullable=True)  # EinheitMastrNummer
138
139
    status = Column(String, nullable=True)  # EinheitBetriebsstatus
140
    commissioning_date = Column(DateTime, nullable=True)  # Inbetriebnahmedatum
141
    postcode = Column(String(5), nullable=True)  # Postleitzahl
142
    city = Column(String(50), nullable=True)  # Ort
143
    federal_state = Column(String(31), nullable=True)  # Bundesland
144
145
    plant_type = Column(Integer, nullable=True)  # ArtDerWasserkraftanlage
146
    water_origin = Column(String(20), nullable=True)  # ArtDesZuflusses
147
148
    capacity = Column(Float, nullable=True)  # Nettonennleistung
149
    feedin_type = Column(String(47), nullable=True)  # Einspeisungsart
150
    voltage_level = Column(Integer, nullable=True)
151
152
    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
153
154
155
def import_mastr():
156
    engine = db.engine()
157
    cfg = egon.data.config.datasets()["power_plants"]
158
159
    cols_mapping = {
160
        "all": {
161
            "EinheitMastrNummer": "gens_id",
162
            "EinheitBetriebsstatus": "status",
163
            "Inbetriebnahmedatum": "commissioning_date",
164
            "Postleitzahl": "postcode",
165
            "Ort": "city",
166
            "Bundesland": "federal_state",
167
            "Nettonennleistung": "capacity",
168
            "Einspeisungsart": "feedin_type",
169
        },
170
        "pv": {
171
            "Lage": "site_type",
172
            "Nutzungsbereich": "usage_sector",
173
            "Hauptausrichtung": "orientation_primary",
174
            "HauptausrichtungNeigungswinkel": "orientation_primary_angle",
175
            "Nebenausrichtung": "orientation_secondary",
176
            "NebenausrichtungNeigungswinkel": "orientation_secondary_angle",
177
            "EinheitlicheAusrichtungUndNeigungswinkel": "orientation_uniform",
178
            "AnzahlModule": "module_count",
179
            "zugeordneteWirkleistungWechselrichter": "capacity_inverter",
180
        },
181
        "wind": {
182
            "Lage": "site_type",
183
            "Hersteller": "manufacturer_name",
184
            "Typenbezeichnung": "type_name",
185
            "Nabenhoehe": "hub_height",
186
            "Rotordurchmesser": "rotor_diameter",
187
        },
188
        "biomass": {
189
            "Technologie": "technology",
190
            "Hauptbrennstoff": "fuel_name",
191
            "Biomasseart": "fuel_type",
192
            "ThermischeNutzleistung": "th_capacity",
193
        },
194
        "hydro": {
195
            "ArtDerWasserkraftanlage": "plant_type",
196
            "ArtDesZuflusses": "water_origin",
197
        },
198
    }
199
200
    source_files = {
201
        "pv": cfg["sources"]["mastr_pv"],
202
        "wind": cfg["sources"]["mastr_wind"],
203
        "biomass": cfg["sources"]["mastr_biomass"],
204
        "hydro": cfg["sources"]["mastr_hydro"],
205
    }
206
    target_tables = {
207
        "pv": EgonPowerPlantsPv,
208
        "wind": EgonPowerPlantsWind,
209
        "biomass": EgonPowerPlantsBiomass,
210
        "hydro": EgonPowerPlantsHydro,
211
    }
212
213
    # import locations
214
    locations = pd.read_csv(cfg["sources"]["mastr_location"], index_col=None)
215
216
    # import units
217
    technologies = ["pv", "wind", "biomass", "hydro"]
218
    for tech in technologies:
219
        # read units
220
        print(f"Importing MaStR dataset: {tech}:")
221
        print("  Reading CSV and filtering data...")
222
        units = pd.read_csv(
223
            source_files[tech],
224
            usecols=(
225
                ["LokationMastrNummer", "Laengengrad", "Breitengrad", "Land"]
226
                + list(cols_mapping["all"].keys())
227
                + list(cols_mapping[tech].keys())
228
            ),
229
            index_col=None,
230
            dtype={"Postleitzahl": str},
231
        ).rename(columns=cols_mapping)
232
233
        # drop units outside of Germany
234
        len_old = len(units)
235
        units = units.loc[units.Land == "Deutschland"]
236
        print(f"    {len_old-len(units)} units outside of Germany dropped...")
237
238
        # filter for SH units if in testmode
239
        if not TESTMODE_OFF:
240
            print(
241
                """    TESTMODE:
242
                Dropping all units outside of Schleswig-Holstein...
243
                """
244
            )
245
            units = units.loc[units.Bundesland == "SchleswigHolstein"]
246
247
        # merge and rename voltage level
248
        print("  Merging with locations and allocate voltage level...")
249
        units = units.merge(
250
            locations[["MaStRNummer", "Spannungsebene"]],
251
            left_on="LokationMastrNummer",
252
            right_on="MaStRNummer",
253
            how="left",
254
        )
255
        vlevel_mapping = {
256
            "Höchstspannung": 1,
257
            "UmspannungZurHochspannung": 2,
258
            "Hochspannung": 3,
259
            "UmspannungZurMittelspannung": 4,
260
            "Mittelspannung": 5,
261
            "UmspannungZurNiederspannung": 6,
262
            "Niederspannung": 7,
263
        }
264
        units["voltage_level"] = units.Spannungsebene.replace(vlevel_mapping)
265
266
        # add geometry
267
        print("  Adding geometries...")
268
        units = gpd.GeoDataFrame(
269
            units,
270
            geometry=gpd.points_from_xy(
271
                units["Laengengrad"], units["Breitengrad"], crs=4326
272
            ),
273
            crs=4326,
274
        )
275
276
        # drop unnecessary and rename columns
277
        print("  Reformatting...")
278
        units.drop(
279
            columns=[
280
                "LokationMastrNummer",
281
                "MaStRNummer",
282
                "Laengengrad",
283
                "Breitengrad",
284
                "Spannungsebene",
285
                "Land",
286
            ],
287
            inplace=True,
288
        )
289
        mapping = cols_mapping["all"].copy()
290
        mapping.update(cols_mapping[tech])
291
        mapping.update({"geometry": "geom"})
292
        units.rename(columns=mapping, inplace=True)
293
        units["voltage_level"] = units.voltage_level.fillna(-1).astype(int)
294
        if tech == "hydro":
295
            units["plant_type"] = units.plant_type.fillna(-1).astype(int)
296
        units.set_geometry("geom", inplace=True)
297
        units["id"] = range(0, len(units))
298
299
        # write to DB
300
        print("  Writing to DB...")
301
        units.to_postgis(
302
            name=target_tables[tech].__tablename__,
303
            con=engine,
304
            if_exists="append",
305
            schema=target_tables[tech].__table_args__["schema"],
306
        )
307