Passed
Pull Request — dev (#840)
by
unknown
01:36
created

data.datasets.power_plants.assign_gas_bus_id()   A

Complexity

Conditions 1

Size

Total Lines 31
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 8
dl 0
loc 31
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
"""The central module containing all code dealing with power plant data.
2
"""
3
from geoalchemy2 import Geometry
4
from sqlalchemy import (
5
    BigInteger,
6
    Boolean,
7
    Column,
8
    Float,
9
    Integer,
10
    Sequence,
11
    String,
12
)
13
from sqlalchemy.dialects.postgresql import JSONB
14
from sqlalchemy.ext.declarative import declarative_base
15
from sqlalchemy.orm import sessionmaker
16
import geopandas as gpd
17
import numpy as np
18
import pandas as pd
19
20
from egon.data import db
21
from egon.data.datasets import Dataset
22
from egon.data.datasets.power_plants.conventional import (
23
    match_nep_no_chp,
24
    select_nep_power_plants,
25
    select_no_chp_combustion_mastr,
26
)
27
from egon.data.datasets.power_plants.pv_rooftop import pv_rooftop_per_mv_grid
28
import egon.data.config
29
import egon.data.datasets.power_plants.assign_weather_data as assign_weather_data
30
import egon.data.datasets.power_plants.pv_ground_mounted as pv_ground_mounted
31
import egon.data.datasets.power_plants.wind_farms as wind_onshore
32
import egon.data.datasets.power_plants.wind_offshore as wind_offshore
33
34
Base = declarative_base()
35
36
37 View Code Duplication
class EgonPowerPlants(Base):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
38
    __tablename__ = "egon_power_plants"
39
    __table_args__ = {"schema": "supply"}
40
    id = Column(BigInteger, Sequence("pp_seq"), primary_key=True)
41
    sources = Column(JSONB)
42
    source_id = Column(JSONB)
43
    carrier = Column(String)
44
    el_capacity = Column(Float)
45
    bus_id = Column(Integer)
46
    voltage_level = Column(Integer)
47
    weather_cell_id = Column(Integer)
48
    scenario = Column(String)
49
    geom = Column(Geometry("POINT", 4326))
50
51
52
class PowerPlants(Dataset):
53
    def __init__(self, dependencies):
54
        super().__init__(
55
            name="PowerPlants",
56
            version="0.0.8",
57
            dependencies=dependencies,
58
            tasks=(
59
                create_tables,
60
                insert_hydro_biomass,
61
                allocate_conventional_non_chp_power_plants,
62
                {
63
                    wind_onshore.insert,
64
                    pv_ground_mounted.insert,
65
                    pv_rooftop_per_mv_grid,
66
                },
67
                wind_offshore.insert,
68
                assign_weather_data.weatherId_and_busId,
69
            ),
70
        )
71
72
73
def create_tables():
74
    """Create tables for power plant data
75
    Returns
76
    -------
77
    None.
78
    """
79
80
    cfg = egon.data.config.datasets()["power_plants"]
81
    db.execute_sql(f"CREATE SCHEMA IF NOT EXISTS {cfg['target']['schema']};")
82
    engine = db.engine()
83
    db.execute_sql(
84
        f"""DROP TABLE IF EXISTS
85
        {cfg['target']['schema']}.{cfg['target']['table']}"""
86
    )
87
88
    db.execute_sql("""DROP SEQUENCE IF EXISTS pp_seq""")
89
    EgonPowerPlants.__table__.create(bind=engine, checkfirst=True)
90
91
92
def scale_prox2now(df, target, level="federal_state"):
93
    """Scale installed capacities linear to status quo power plants
94
95
    Parameters
96
    ----------
97
    df : pandas.DataFrame
98
        Status Quo power plants
99
    target : pandas.Series
100
        Target values for future sceanrio
101
    level : str, optional
102
        Scale per 'federal_state' or 'country'. The default is 'federal_state'.
103
104
    Returns
105
    -------
106
    df : pandas.DataFrame
107
        Future power plants
108
109
    """
110
111
    if level == "federal_state":
112
        df.loc[:, "Nettonennleistung"] = (
113
            df.groupby(df.Bundesland)
114
            .Nettonennleistung.apply(lambda grp: grp / grp.sum())
115
            .mul(target[df.Bundesland.values].values)
116
        )
117
    else:
118
        df.loc[:, "Nettonennleistung"] = df.Nettonennleistung.apply(
119
            lambda x: x / x.sum()
120
        ).mul(target.values)
121
122
    df = df[df.Nettonennleistung > 0]
123
124
    return df
125
126
127
def select_target(carrier, scenario):
128
    """Select installed capacity per scenario and carrier
129
130
    Parameters
131
    ----------
132
    carrier : str
133
        Name of energy carrier
134
    scenario : str
135
        Name of scenario
136
137
    Returns
138
    -------
139
    pandas.Series
140
        Target values for carrier and scenario
141
142
    """
143
    cfg = egon.data.config.datasets()["power_plants"]
144
145
    return (
146
        pd.read_sql(
147
            f"""SELECT DISTINCT ON (b.gen)
148
                         REPLACE(REPLACE(b.gen, '-', ''), 'ü', 'ue') as state,
149
                         a.capacity
150
                         FROM {cfg['sources']['capacities']} a,
151
                         {cfg['sources']['geom_federal_states']} b
152
                         WHERE a.nuts = b.nuts
153
                         AND scenario_name = '{scenario}'
154
                         AND carrier = '{carrier}'
155
                         AND b.gen NOT IN ('Baden-Württemberg (Bodensee)',
156
                                           'Bayern (Bodensee)')""",
157
            con=db.engine(),
158
        )
159
        .set_index("state")
160
        .capacity
161
    )
162
163
164
def filter_mastr_geometry(mastr, federal_state=None):
165
    """Filter data from MaStR by geometry
166
167
    Parameters
168
    ----------
169
    mastr : pandas.DataFrame
170
        All power plants listed in MaStR
171
    federal_state : str or None
172
        Name of federal state whoes power plants are returned.
173
        If None, data for Germany is returned
174
175
    Returns
176
    -------
177
    mastr_loc : pandas.DataFrame
178
        Power plants listed in MaStR with geometry inside German boundaries
179
180
    """
181
    cfg = egon.data.config.datasets()["power_plants"]
182
183
    if type(mastr) == pd.core.frame.DataFrame:
184
        # Drop entries without geometry for insert
185
        mastr_loc = mastr[
186
            mastr.Laengengrad.notnull() & mastr.Breitengrad.notnull()
187
        ]
188
189
        # Create geodataframe
190
        mastr_loc = gpd.GeoDataFrame(
191
            mastr_loc,
192
            geometry=gpd.points_from_xy(
193
                mastr_loc.Laengengrad, mastr_loc.Breitengrad, crs=4326
194
            ),
195
        )
196
    else:
197
        mastr_loc = mastr.copy()
198
199
    # Drop entries outside of germany or federal state
200
    if not federal_state:
201
        sql = f"SELECT geometry as geom FROM {cfg['sources']['geom_germany']}"
202
    else:
203
        sql = f"""
204
        SELECT geometry as geom
205
        FROM boundaries.vg250_lan_union
206
        WHERE REPLACE(REPLACE(gen, '-', ''), 'ü', 'ue') = '{federal_state}'"""
207
208
    mastr_loc = (
209
        gpd.sjoin(
210
            gpd.read_postgis(sql, con=db.engine()).to_crs(4326),
211
            mastr_loc,
212
            how="right",
213
        )
214
        .query("index_left==0")
215
        .drop("index_left", axis=1)
216
    )
217
218
    return mastr_loc
219
220
221
def insert_biomass_plants(scenario):
222
    """Insert biomass power plants of future scenario
223
224
    Parameters
225
    ----------
226
    scenario : str
227
        Name of scenario.
228
229
    Returns
230
    -------
231
    None.
232
233
    """
234
    cfg = egon.data.config.datasets()["power_plants"]
235
236
    # import target values from NEP 2021, scneario C 2035
237
    target = select_target("biomass", scenario)
238
239
    # import data for MaStR
240
    mastr = pd.read_csv(cfg["sources"]["mastr_biomass"]).query(
241
        "EinheitBetriebsstatus=='InBetrieb'"
242
    )
243
244
    # Drop entries without federal state or 'AusschließlichWirtschaftszone'
245
    mastr = mastr[
246
        mastr.Bundesland.isin(
247
            pd.read_sql(
248
                f"""SELECT DISTINCT ON (gen)
249
        REPLACE(REPLACE(gen, '-', ''), 'ü', 'ue') as states
250
        FROM {cfg['sources']['geom_federal_states']}""",
251
                con=db.engine(),
252
            ).states.values
253
        )
254
    ]
255
256
    # Scaling will be done per federal state in case of eGon2035 scenario.
257
    if scenario == "eGon2035":
258
        level = "federal_state"
259
    else:
260
        level = "country"
261
262
    # Choose only entries with valid geometries inside DE/test mode
263
    mastr_loc = filter_mastr_geometry(mastr).set_geometry("geometry")
264
265
    # Scale capacities to meet target values
266
    mastr_loc = scale_prox2now(mastr_loc, target, level=level)
267
268
    # Assign bus_id
269
    if len(mastr_loc) > 0:
270
        mastr_loc["voltage_level"] = assign_voltage_level(mastr_loc, cfg)
271
        mastr_loc = assign_bus_id(mastr_loc, cfg)
272
273
    # Insert entries with location
274
    session = sessionmaker(bind=db.engine())()
275
276
    for i, row in mastr_loc.iterrows():
277
        if not row.ThermischeNutzleistung > 0:
278
            entry = EgonPowerPlants(
279
                sources={"el_capacity": "MaStR scaled with NEP 2021"},
280
                source_id={"MastrNummer": row.EinheitMastrNummer},
281
                carrier="biomass",
282
                el_capacity=row.Nettonennleistung,
283
                scenario=scenario,
284
                bus_id=row.bus_id,
285
                voltage_level=row.voltage_level,
286
                geom=f"SRID=4326;POINT({row.Laengengrad} {row.Breitengrad})",
287
            )
288
            session.add(entry)
289
290
    session.commit()
291
292
293
def insert_hydro_plants(scenario):
294
    """Insert hydro power plants of future scenario.
295
296
    Hydro power plants are diveded into run_of_river and reservoir plants
297
    according to Marktstammdatenregister.
298
    Additional hydro technologies (e.g. turbines inside drinking water
299
    systems) are not considered.
300
301
    Parameters
302
    ----------
303
    scenario : str
304
        Name of scenario.
305
306
    Returns
307
    -------
308
    None.
309
310
    """
311
    cfg = egon.data.config.datasets()["power_plants"]
312
313
    # Map MaStR carriers to eGon carriers
314
    map_carrier = {
315
        "run_of_river": ["Laufwasseranlage"],
316
        "reservoir": ["Speicherwasseranlage"],
317
    }
318
319
    for carrier in map_carrier.keys():
320
321
        # import target values
322
        target = select_target(carrier, scenario)
323
324
        # import data for MaStR
325
        mastr = pd.read_csv(cfg["sources"]["mastr_hydro"]).query(
326
            "EinheitBetriebsstatus=='InBetrieb'"
327
        )
328
329
        # Choose only plants with specific carriers
330
        mastr = mastr[mastr.ArtDerWasserkraftanlage.isin(map_carrier[carrier])]
331
332
        # Drop entries without federal state or 'AusschließlichWirtschaftszone'
333
        mastr = mastr[
334
            mastr.Bundesland.isin(
335
                pd.read_sql(
336
                    f"""SELECT DISTINCT ON (gen)
337
            REPLACE(REPLACE(gen, '-', ''), 'ü', 'ue') as states
338
            FROM {cfg['sources']['geom_federal_states']}""",
339
                    con=db.engine(),
340
                ).states.values
341
            )
342
        ]
343
344
        # Scaling will be done per federal state in case of eGon2035 scenario.
345
        if scenario == "eGon2035":
346
            level = "federal_state"
347
        else:
348
            level = "country"
349
350
        # Scale capacities to meet target values
351
        mastr = scale_prox2now(mastr, target, level=level)
352
353
        # Choose only entries with valid geometries inside DE/test mode
354
        mastr_loc = filter_mastr_geometry(mastr).set_geometry("geometry")
355
        # TODO: Deal with power plants without geometry
356
357
        # Assign bus_id and voltage level
358
        if len(mastr_loc) > 0:
359
            mastr_loc["voltage_level"] = assign_voltage_level(mastr_loc, cfg)
360
            mastr_loc = assign_bus_id(mastr_loc, cfg)
361
362
        # Insert entries with location
363
        session = sessionmaker(bind=db.engine())()
364
        for i, row in mastr_loc.iterrows():
365
            entry = EgonPowerPlants(
366
                sources={"el_capacity": "MaStR scaled with NEP 2021"},
367
                source_id={"MastrNummer": row.EinheitMastrNummer},
368
                carrier=carrier,
369
                el_capacity=row.Nettonennleistung,
370
                scenario=scenario,
371
                bus_id=row.bus_id,
372
                voltage_level=row.voltage_level,
373
                geom=f"SRID=4326;POINT({row.Laengengrad} {row.Breitengrad})",
374
            )
375
            session.add(entry)
376
377
        session.commit()
378
379
380
def assign_voltage_level(mastr_loc, cfg):
381
    """Assigns voltage level to power plants.
382
383
    If location data inluding voltage level is available from
384
    Marktstammdatenregister, this is used. Otherwise the voltage level is
385
    assigned according to the electrical capacity.
386
387
    Parameters
388
    ----------
389
    mastr_loc : pandas.DataFrame
390
        Power plants listed in MaStR with geometry inside German boundaries
391
392
    Returns
393
    -------
394
    pandas.DataFrame
395
        Power plants including voltage_level
396
397
    """
398
    mastr_loc["Spannungsebene"] = np.nan
399
    mastr_loc["voltage_level"] = np.nan
400
401
    if "LokationMastrNummer" in mastr_loc.columns:
402
        location = pd.read_csv(
403
            cfg["sources"]["mastr_location"],
404
            usecols=["LokationMastrNummer", "Spannungsebene"],
405
        ).set_index("LokationMastrNummer")
406
407
        location = location[~location.index.duplicated(keep="first")]
408
409
        mastr_loc.loc[
410
            mastr_loc[
411
                mastr_loc.LokationMastrNummer.isin(location.index)
412
            ].index,
413
            "Spannungsebene",
414
        ] = location.Spannungsebene[
415
            mastr_loc[
416
                mastr_loc.LokationMastrNummer.isin(location.index)
417
            ].LokationMastrNummer
418
        ].values
419
420
        # Transfer voltage_level as integer from Spanungsebene
421
        map_voltage_levels = pd.Series(
422
            data={
423
                "Höchstspannung": 1,
424
                "Hoechstspannung": 1,
425
                "UmspannungZurHochspannung": 2,
426
                "Hochspannung": 3,
427
                "UmspannungZurMittelspannung": 4,
428
                "Mittelspannung": 5,
429
                "UmspannungZurNiederspannung": 6,
430
                "Niederspannung": 7,
431
            }
432
        )
433
434
        mastr_loc.loc[
435
            mastr_loc[mastr_loc["Spannungsebene"].notnull()].index,
436
            "voltage_level",
437
        ] = map_voltage_levels[
438
            mastr_loc.loc[
439
                mastr_loc[mastr_loc["Spannungsebene"].notnull()].index,
440
                "Spannungsebene",
441
            ].values
442
        ].values
443
444
    else:
445
        print(
446
            "No information about MaStR location available. "
447
            "All voltage levels are assigned using threshold values."
448
        )
449
450
    # If no voltage level is available from mastr, choose level according
451
    # to threshold values
452
453
    for i, row in mastr_loc[mastr_loc.voltage_level.isnull()].iterrows():
454
455
        if row.Nettonennleistung > 120:
456
            level = 1
457
        elif row.Nettonennleistung > 20:
458
            level = 3
459
        elif row.Nettonennleistung > 5.5:
460
            level = 4
461
        elif row.Nettonennleistung > 0.2:
462
            level = 5
463
        elif row.Nettonennleistung > 0.1:
464
            level = 6
465
        else:
466
            level = 7
467
468
        mastr_loc.loc[i, "voltage_level"] = level
469
470
    mastr_loc.voltage_level = mastr_loc.voltage_level.astype(int)
471
472
    return mastr_loc.voltage_level
473
474
475
def assign_bus_id(power_plants, cfg):
476
    """Assigns bus_ids to power plants according to location and voltage level
477
478
    Parameters
479
    ----------
480
    power_plants : pandas.DataFrame
481
        Power plants including voltage level
482
483
    Returns
484
    -------
485
    power_plants : pandas.DataFrame
486
        Power plants including voltage level and bus_id
487
488
    """
489
490
    mv_grid_districts = db.select_geodataframe(
491
        f"""
492
        SELECT * FROM {cfg['sources']['egon_mv_grid_district']}
493
        """,
494
        epsg=4326,
495
    )
496
497
    ehv_grid_districts = db.select_geodataframe(
498
        f"""
499
        SELECT * FROM {cfg['sources']['ehv_voronoi']}
500
        """,
501
        epsg=4326,
502
    )
503
504
    # Assign power plants in hv and below to hvmv bus
505
    power_plants_hv = power_plants[power_plants.voltage_level >= 3].index
506
    if len(power_plants_hv) > 0:
507
        power_plants.loc[power_plants_hv, "bus_id"] = gpd.sjoin(
508
            power_plants[power_plants.index.isin(power_plants_hv)],
509
            mv_grid_districts,
510
        ).bus_id
511
512
    # Assign power plants in ehv to ehv bus
513
    power_plants_ehv = power_plants[power_plants.voltage_level < 3].index
514
515
    if len(power_plants_ehv) > 0:
516
        ehv_join = gpd.sjoin(
517
            power_plants[power_plants.index.isin(power_plants_ehv)],
518
            ehv_grid_districts,
519
        )
520
521
        if "bus_id_right" in ehv_join.columns:
522
            power_plants.loc[power_plants_ehv, "bus_id"] = gpd.sjoin(
523
                power_plants[power_plants.index.isin(power_plants_ehv)],
524
                ehv_grid_districts,
525
            ).bus_id_right
526
527
        else:
528
            power_plants.loc[power_plants_ehv, "bus_id"] = gpd.sjoin(
529
                power_plants[power_plants.index.isin(power_plants_ehv)],
530
                ehv_grid_districts,
531
            ).bus_id
532
533
    # Assert that all power plants have a bus_id
534
    assert power_plants.bus_id.notnull().all(), f"""Some power plants are
535
    not attached to a bus: {power_plants[power_plants.bus_id.isnull()]}"""
536
537
    return power_plants
538
539
540
def insert_hydro_biomass():
541
    """Insert hydro and biomass power plants in database
542
543
    Returns
544
    -------
545
    None.
546
547
    """
548
    cfg = egon.data.config.datasets()["power_plants"]
549
    db.execute_sql(
550
        f"""
551
        DELETE FROM {cfg['target']['schema']}.{cfg['target']['table']}
552
        WHERE carrier IN ('biomass', 'reservoir', 'run_of_river')
553
        """
554
    )
555
556
    for scenario in ["eGon2035"]:
557
        insert_biomass_plants(scenario)
558
        insert_hydro_plants(scenario)
559
560
561
def allocate_conventional_non_chp_power_plants():
562
563
    carrier = ["oil", "gas", "other_non_renewable"]
564
565
    cfg = egon.data.config.datasets()["power_plants"]
566
567
    # Delete existing CHP in the target table
568
    db.execute_sql(
569
        f"""
570
         DELETE FROM {cfg ['target']['schema']}.{cfg ['target']['table']}
571
         WHERE carrier IN ('gas', 'other_non_renewable', 'oil')
572
         AND scenario='eGon2035';
573
         """
574
    )
575
576
    for carrier in carrier:
577
578
        nep = select_nep_power_plants(carrier)
579
580
        if nep.empty:
581
            print(f"DataFrame from NEP for carrier {carrier} is empty!")
582
583
        else:
584
585
            mastr = select_no_chp_combustion_mastr(carrier)
586
587
            # Assign voltage level to MaStR
588
            mastr["voltage_level"] = assign_voltage_level(
589
                mastr.rename({"el_capacity": "Nettonennleistung"}, axis=1), cfg
590
            )
591
592
            # Initalize DataFrame for matching power plants
593
            matched = gpd.GeoDataFrame(
594
                columns=[
595
                    "carrier",
596
                    "el_capacity",
597
                    "scenario",
598
                    "geometry",
599
                    "MaStRNummer",
600
                    "source",
601
                    "voltage_level",
602
                ]
603
            )
604
605
            # Match combustion plants of a certain carrier from NEP list
606
            # using PLZ and capacity
607
            matched, mastr, nep = match_nep_no_chp(
608
                nep,
609
                mastr,
610
                matched,
611
                buffer_capacity=0.1,
612
                consider_carrier=False,
613
            )
614
615
            # Match plants from NEP list using city and capacity
616
            matched, mastr, nep = match_nep_no_chp(
617
                nep,
618
                mastr,
619
                matched,
620
                buffer_capacity=0.1,
621
                consider_carrier=False,
622
                consider_location="city",
623
            )
624
625
            # Match plants from NEP list using plz,
626
            # neglecting the capacity
627
            matched, mastr, nep = match_nep_no_chp(
628
                nep,
629
                mastr,
630
                matched,
631
                consider_location="plz",
632
                consider_carrier=False,
633
                consider_capacity=False,
634
            )
635
636
            # Match plants from NEP list using city,
637
            # neglecting the capacity
638
            matched, mastr, nep = match_nep_no_chp(
639
                nep,
640
                mastr,
641
                matched,
642
                consider_location="city",
643
                consider_carrier=False,
644
                consider_capacity=False,
645
            )
646
647
            # Match remaining plants from NEP using the federal state
648
            matched, mastr, nep = match_nep_no_chp(
649
                nep,
650
                mastr,
651
                matched,
652
                buffer_capacity=0.1,
653
                consider_location="federal_state",
654
                consider_carrier=False,
655
            )
656
657
            # Match remaining plants from NEP using the federal state
658
            matched, mastr, nep = match_nep_no_chp(
659
                nep,
660
                mastr,
661
                matched,
662
                buffer_capacity=0.7,
663
                consider_location="federal_state",
664
                consider_carrier=False,
665
            )
666
667
            print(f"{matched.el_capacity.sum()} MW of {carrier} matched")
668
            print(f"{nep.c2035_capacity.sum()} MW of {carrier} not matched")
669
670
            matched.crs = "EPSG:4326"
671
672
            # Assign bus_id
673
            # Load grid district polygons
674
            mv_grid_districts = db.select_geodataframe(
675
                f"""
676
            SELECT * FROM {cfg['sources']['egon_mv_grid_district']}
677
            """,
678
                epsg=4326,
679
            )
680
681
            ehv_grid_districts = db.select_geodataframe(
682
                f"""
683
            SELECT * FROM {cfg['sources']['ehv_voronoi']}
684
            """,
685
                epsg=4326,
686
            )
687
688
            # Perform spatial joins for plants in ehv and hv level seperately
689
            power_plants_hv = gpd.sjoin(
690
                matched[matched.voltage_level >= 3],
691
                mv_grid_districts[["bus_id", "geom"]],
692
                how="left",
693
            ).drop(columns=["index_right"])
694
            power_plants_ehv = gpd.sjoin(
695
                matched[matched.voltage_level < 3],
696
                ehv_grid_districts[["bus_id", "geom"]],
697
                how="left",
698
            ).drop(columns=["index_right"])
699
700
            # Combine both dataframes
701
            power_plants = pd.concat([power_plants_hv, power_plants_ehv])
702
703
            # Insert into target table
704
            session = sessionmaker(bind=db.engine())()
705
            for i, row in power_plants.iterrows():
706
                entry = EgonPowerPlants(
707
                    sources={"el_capacity": row.source},
708
                    source_id={"MastrNummer": row.MaStRNummer},
709
                    carrier=row.carrier,
710
                    el_capacity=row.el_capacity,
711
                    voltage_level=row.voltage_level,
712
                    bus_id=row.bus_id,
713
                    scenario=row.scenario,
714
                    geom=f"SRID=4326;POINT({row.geometry.x} {row.geometry.y})",
715
                )
716
                session.add(entry)
717
            session.commit()
718