Passed
Pull Request — dev (#1170)
by
unknown
05:05
created

data.datasets.storages.allocate_pumped_hydro_sq()   D

Complexity

Conditions 10

Size

Total Lines 210
Code Lines 114

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 114
dl 0
loc 210
rs 4.1999
c 0
b 0
f 0
cc 10
nop 1

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include: Extract Method.

Complexity

Complex classes like data.datasets.storages.allocate_pumped_hydro_sq() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""The central module containing all code dealing with power plant data.
2
"""
3
4
from pathlib import Path
5
6
from geoalchemy2 import Geometry
7
from sqlalchemy import BigInteger, Column, Float, Integer, Sequence, String
8
from sqlalchemy.dialects.postgresql import JSONB
9
from sqlalchemy.ext.declarative import declarative_base
10
from sqlalchemy.orm import sessionmaker
11
import geopandas as gpd
12
import pandas as pd
13
14
from egon.data import config, db
15
from egon.data.datasets import Dataset
16
from egon.data.datasets.mastr import (
17
    WORKING_DIR_MASTR_NEW,
18
    WORKING_DIR_MASTR_OLD,
19
)
20
from egon.data.datasets.mv_grid_districts import Vg250GemClean
21
from egon.data.datasets.power_plants import assign_bus_id, assign_voltage_level
22
from egon.data.datasets.storages.home_batteries import (
23
    allocate_home_batteries_to_buildings,
24
)
25
from egon.data.datasets.storages.pumped_hydro import (
26
    apply_voltage_level_thresholds,
27
    get_location,
28
    match_storage_units,
29
    select_mastr_pumped_hydro,
30
    select_nep_pumped_hydro,
31
)
32
from egon.data.db import session_scope
33
34
Base = declarative_base()
35
36
37 View Code Duplication
class EgonStorages(Base):
    """ORM mapping of the ``supply.egon_storages`` table.

    The functions in this module write one row per allocated storage unit
    (or per aggregate of units) and scenario into this table.
    """

    __tablename__ = "egon_storages"
    __table_args__ = {"schema": "supply"}

    # Primary key, drawn from the ``storage_seq`` database sequence.
    id = Column(BigInteger, Sequence("storage_seq"), primary_key=True)

    # JSON documents; the writers in this module store the origin of the
    # capacity value in ``sources`` and the identifiers of the underlying
    # units in ``source_id``.
    sources = Column(JSONB)
    source_id = Column(JSONB)

    carrier = Column(String)
    el_capacity = Column(Float)
    bus_id = Column(Integer)
    voltage_level = Column(Integer)
    scenario = Column(String)

    # Point geometry with SRID 4326.
    geom = Column(Geometry("POINT", 4326))
49
50
51
class Storages(Dataset):
    """Dataset that creates the storages table and fills it per scenario."""

    def __init__(self, dependencies):
        # Tasks are listed in the order in which they are handed to Dataset.
        # NOTE: allocate_home_batteries_to_buildings is imported by this
        # module but deliberately not part of the task list.
        storage_tasks = (
            create_tables,
            allocate_pumped_hydro_scn,
            allocate_pv_home_batteries_to_grids,
        )
        super().__init__(
            name="Storages",
            version="0.0.8",
            dependencies=dependencies,
            tasks=storage_tasks,
        )
64
65
66
def create_tables():
    """Create the target schema and (re)create the storages table.

    The table configured as target of the ``storages`` dataset is dropped
    together with the id sequence used by :class:`EgonStorages`, and the
    table is then created again from the ORM definition.

    Returns
    -------
    None.
    """
    target = config.datasets()["storages"]["target"]

    db.execute_sql(f"CREATE SCHEMA IF NOT EXISTS {target['schema']};")
    db.execute_sql(
        f"""DROP TABLE IF EXISTS
        {target['schema']}.{target['table']}"""
    )

    # EgonStorages draws its ids from ``storage_seq``. The previous code
    # dropped ``pp_seq`` here, which is not referenced by this table.
    db.execute_sql("""DROP SEQUENCE IF EXISTS storage_seq""")

    EgonStorages.__table__.create(bind=db.engine(), checkfirst=True)
83
84
85
def allocate_pumped_hydro(scn, export=True):
    """Allocate pumped hydro plants by matching the NEP list against MaStR.

    NEP units are matched to MaStR units in several passes with
    successively weaker criteria. NEP units that remain unmatched are
    located via ``get_location``. All plants then get a voltage level and a
    bus id and are either written to the data base or returned.

    Parameters
    ----------
    scn : str
        Name of the scenario. It is passed on to the NEP selection and to
        the matching, and selects the rows replaced in the target table.
    export : bool
        Choose if allocated pumped hydro plants should be exported to the
        data base. The default is True.
        If export=False a data frame is returned and the data base is left
        untouched.

    Returns
    -------
    power_plants : geopandas.GeoDataFrame or None
        Allocated pumped hydro plants if ``export`` is False, else None.
    """
    carrier = "pumped_hydro"
    cfg = config.datasets()["power_plants"]

    nep = select_nep_pumped_hydro(scn=scn)
    mastr = select_mastr_pumped_hydro()

    # Assign voltage level to MaStR
    mastr["voltage_level"] = assign_voltage_level(
        mastr.rename({"el_capacity": "Nettonennleistung"}, axis=1),
        cfg,
        WORKING_DIR_MASTR_OLD,
    )

    # Initialize GeoDataFrame collecting the matched plants
    matched = gpd.GeoDataFrame(
        columns=[
            "carrier",
            "el_capacity",
            "scenario",
            "geometry",
            "MaStRNummer",
            "source",
            "voltage_level",
        ]
    )

    # Matching passes, applied in this order. Every pass works on the units
    # left over by the previous one; the carrier is never considered.
    matching_passes = (
        # default location criterion (PLZ per the original comment) and
        # capacity
        {"buffer_capacity": 0.1},
        # PLZ, neglecting the capacity
        {"consider_location": "plz", "consider_capacity": False},
        # city, neglecting the capacity
        {"consider_location": "city", "consider_capacity": False},
        # federal state and capacity
        {"buffer_capacity": 0.1, "consider_location": "federal_state"},
        # federal state and capacity with a larger capacity buffer
        {"buffer_capacity": 0.7, "consider_location": "federal_state"},
    )
    for options in matching_passes:
        matched, mastr, nep = match_storage_units(
            nep,
            mastr,
            matched,
            consider_carrier=False,
            scn=scn,
            **options,
        )

    print(f"{matched.el_capacity.sum()} MW of {carrier} matched")
    print(f"{nep.elec_capacity.sum()} MW of {carrier} not matched")

    if nep.elec_capacity.sum() > 0:
        # Get location using geolocator and city information
        located, unmatched = get_location(nep)

        # Bring both dataframes together
        matched = pd.concat(
            [
                matched,
                located[
                    [
                        "carrier",
                        "el_capacity",
                        "scenario",
                        "geometry",
                        "source",
                        "MaStRNummer",
                    ]
                ],
            ],
            ignore_index=True,
        )

    # Set CRS
    matched.crs = "EPSG:4326"

    # Assign voltage level
    matched = apply_voltage_level_thresholds(matched)

    # Assign bus_id
    # Load grid district polygons
    mv_grid_districts = db.select_geodataframe(
        f"""
    SELECT * FROM {cfg['sources']['egon_mv_grid_district']}
    """,
        epsg=4326,
    )

    ehv_grid_districts = db.select_geodataframe(
        f"""
    SELECT * FROM {cfg['sources']['ehv_voronoi']}
    """,
        epsg=4326,
    )

    # Perform spatial joins for plants in ehv and hv level separately:
    # voltage levels >= 3 are joined with the MV grid districts, lower
    # levels with the EHV voronoi cells.
    power_plants_hv = gpd.sjoin(
        matched[matched.voltage_level >= 3],
        mv_grid_districts[["bus_id", "geom"]],
        how="left",
    ).drop(columns=["index_right"])
    power_plants_ehv = gpd.sjoin(
        matched[matched.voltage_level < 3],
        ehv_grid_districts[["bus_id", "geom"]],
        how="left",
    ).drop(columns=["index_right"])

    # Combine both dataframes
    power_plants = pd.concat([power_plants_hv, power_plants_ehv])

    if not export:
        # Pure query mode: return the allocation without modifying the
        # target table. Deleting rows here would remove previously
        # exported plants of ``scn`` without writing replacements.
        return power_plants

    # Delete existing units in the target table
    db.execute_sql(
        f""" DELETE FROM {cfg ['sources']['storages']}
        WHERE carrier IN ('pumped_hydro')
        AND scenario='{scn}';"""
    )

    # Insert into target table
    with session_scope() as session:
        for _, row in power_plants.iterrows():
            session.add(
                EgonStorages(
                    sources={"el_capacity": row.source},
                    source_id={"MastrNummer": row.MaStRNummer},
                    carrier=row.carrier,
                    el_capacity=row.el_capacity,
                    voltage_level=row.voltage_level,
                    bus_id=row.bus_id,
                    scenario=row.scenario,
                    geom=f"SRID=4326;POINT({row.geometry.x} {row.geometry.y})",
                )
            )
        session.commit()

    return None
277
278
279
def _read_mastr_pumped_hydro(sources):
    """Read operating pumped hydro units from the MaStR storage file."""
    mastr_ph = pd.read_csv(
        WORKING_DIR_MASTR_NEW / sources["mastr_storage"],
        delimiter=",",
        usecols=[
            "Nettonennleistung",
            "EinheitMastrNummer",
            "Kraftwerksnummer",
            "Technologie",
            "Postleitzahl",
            "Laengengrad",
            "Breitengrad",
            "EinheitBetriebsstatus",
            "LokationMastrNummer",
            "Ort",
            "Bundesland",
        ],
        dtype={"Postleitzahl": str},
    )

    boundary = config.settings()["egon-data"]["--dataset-boundary"]
    if boundary == "Schleswig-Holstein":
        # Filter for Schleswig-Holstein
        mastr_ph = mastr_ph.loc[mastr_ph.Bundesland == "SchleswigHolstein"]

    # Rename columns
    mastr_ph = mastr_ph.rename(
        columns={
            "Kraftwerksnummer": "bnetza_id",
            "Technologie": "carrier",
            "Postleitzahl": "plz",
            "Ort": "city",
            "Bundesland": "federal_state",
            "Nettonennleistung": "el_capacity",
        }
    )

    # Select only pumped hydro units
    mastr_ph = mastr_ph.loc[mastr_ph.carrier == "Pumpspeicher"]

    # Select only pumped hydro units which are in operation
    mastr_ph = mastr_ph.loc[mastr_ph.EinheitBetriebsstatus == "InBetrieb"]

    # Calculate power in MW
    mastr_ph.loc[:, "el_capacity"] *= 1e-3

    # Create geodataframe from long, lat
    return gpd.GeoDataFrame(
        mastr_ph,
        geometry=gpd.points_from_xy(
            mastr_ph["Laengengrad"], mastr_ph["Breitengrad"]
        ),
        crs="4326",
    )


def _fill_missing_locations(mastr_ph):
    """Locate units without coordinates at the centroid of their city."""
    # Identify units without coordinates
    mastr_ph_nogeo = mastr_ph.loc[mastr_ph["Laengengrad"].isna()]

    # Keep only units with coordinates in the main frame
    mastr_ph = mastr_ph.dropna(subset="Laengengrad")

    # Get geometry of municipalities named like the city of the units
    # without coordinates
    with session_scope() as session:
        query = session.query(
            Vg250GemClean.gen, Vg250GemClean.geometry
        ).filter(Vg250GemClean.gen.in_(mastr_ph_nogeo.loc[:, "city"].unique()))
        df_cities = gpd.read_postgis(
            query.statement,
            query.session.bind,
            geom_col="geometry",
            crs="4326",
        )

    # Just take the first entry, inaccuracy is negligible as the centroid is
    # taken afterwards
    df_cities = df_cities.drop_duplicates("gen", keep="first")

    # Use the centroid instead of the polygon of the region
    df_cities.loc[:, "geometry"] = df_cities["geometry"].centroid

    # Add centroid geometry to units without geometry
    # NOTE(review): both frames carry a "geometry" column, so pandas will
    # presumably suffix them ("geometry_x"/"geometry_y") in the merged
    # result - confirm that the rows appended below end up with a usable
    # "geometry" value.
    mastr_ph_nogeo = pd.merge(
        left=df_cities,
        right=mastr_ph_nogeo,
        right_on="city",
        left_on="gen",
        how="inner",
    ).drop("gen", axis=1)

    return pd.concat([mastr_ph, mastr_ph_nogeo], axis=0)


def _aggregate_units_by_location(mastr_ph):
    """Merge units sharing a geometry into one row per location."""
    # Aggregate capacity per location
    agg_cap = mastr_ph.groupby("geometry")["el_capacity"].sum()

    # List MaStR numbers by location
    agg_mastr = mastr_ph.groupby("geometry")["EinheitMastrNummer"].apply(list)

    # Remove duplicates by location and keep only the first
    mastr_ph = mastr_ph.drop_duplicates(subset="geometry", keep="first").drop(
        ["el_capacity", "EinheitMastrNummer"], axis=1
    )

    # Add aggregated capacity by location
    mastr_ph = pd.merge(
        left=mastr_ph, right=agg_cap, left_on="geometry", right_on="geometry"
    )

    # Add list of MaStR numbers by location
    return pd.merge(
        left=mastr_ph, right=agg_mastr, left_on="geometry", right_on="geometry"
    )


def _assign_foreign_buses(mastr_ph_foreign, scn_name):
    """Attach the nearest non-German bus of matching voltage to each unit."""
    # Get foreign buses
    sql = f"""
        SELECT * FROM grid.egon_etrago_bus
        WHERE scn_name = '{scn_name}'
        and country != 'DE'
        """
    df_foreign_buses = db.select_geodataframe(
        sql, geom_col="geom", epsg="4326"
    )

    # Assign closest foreign bus at voltage level to foreign units
    nearest_neighbors = []
    for vl, v_nom in {1: 380, 2: 220, 3: 110}.items():
        ph = mastr_ph_foreign.loc[mastr_ph_foreign["voltage_level"] == vl]
        if ph.empty:
            continue
        bus = df_foreign_buses.loc[
            df_foreign_buses["v_nom"] == v_nom,
            ["v_nom", "country", "bus_id", "geom"],
        ]
        nearest_neighbors.append(
            gpd.sjoin_nearest(
                left_df=ph, right_df=bus, how="left", distance_col="distance"
            )
        )

    if not nearest_neighbors:
        # No unit on voltage level 1-3: nothing can be assigned.
        # pd.concat would raise on an empty list, so return an empty frame.
        return mastr_ph_foreign.iloc[0:0]

    return pd.concat(nearest_neighbors)


def allocate_pumped_hydro_sq(scn_name):
    """Allocate pumped hydro plants for a status quo scenario from MaStR.

    Only MaStR data is used. Units in operation are aggregated by location,
    units without a federal state are treated as located outside Germany and
    are assigned to the nearest foreign bus of their voltage level. No
    filter for commissioning is applied. The result replaces the pumped
    hydro rows of ``scn_name`` in ``supply.egon_storages``.

    Parameters
    ----------
    scn_name : str
        Name of the scenario the plants are written for.

    Returns
    -------
    None
    """
    sources = config.datasets()["power_plants"]["sources"]

    mastr_ph = _read_mastr_pumped_hydro(sources)
    mastr_ph = _fill_missing_locations(mastr_ph)
    mastr_ph = _aggregate_units_by_location(mastr_ph)

    # Drop small plants
    # NOTE(review): el_capacity was scaled by 1e-3 ("power in MW") when
    # reading the data, while the original comment here spoke of 30 kW.
    # Confirm the intended unit of this threshold.
    mastr_ph = mastr_ph.loc[mastr_ph["el_capacity"] > 30]

    # Apply voltage level by capacity
    mastr_ph = apply_voltage_level_thresholds(mastr_ph)
    mastr_ph["voltage_level"] = mastr_ph["voltage_level"].astype(int)

    # Capacity located outside Germany -> will be assigned to foreign buses
    mastr_ph_foreign = mastr_ph.loc[mastr_ph["federal_state"].isna()]
    if not mastr_ph_foreign.empty:
        mastr_ph_foreign = _assign_foreign_buses(mastr_ph_foreign, scn_name)

    # Keep only capacities within Germany
    mastr_ph = mastr_ph.dropna(subset="federal_state")

    # Assign buses within Germany
    mastr_ph = assign_bus_id(mastr_ph, cfg=config.datasets()["power_plants"])
    mastr_ph["bus_id"] = mastr_ph["bus_id"].astype(int)

    if not mastr_ph_foreign.empty:
        # Merge foreign plants
        mastr_ph = pd.concat([mastr_ph, mastr_ph_foreign])

    # Reduce to necessary columns
    mastr_ph = mastr_ph[
        [
            "el_capacity",
            "voltage_level",
            "bus_id",
            "geometry",
            "EinheitMastrNummer",
        ]
    ]

    # Rename and format columns
    mastr_ph["carrier"] = "pumped_hydro"
    mastr_ph = mastr_ph.rename(
        columns={"EinheitMastrNummer": "source_id", "geometry": "geom"}
    )
    mastr_ph["source_id"] = mastr_ph["source_id"].apply(
        lambda x: {"MastrNummer": ", ".join(x)}
    )
    mastr_ph = mastr_ph.set_geometry("geom")
    mastr_ph["geom"] = mastr_ph["geom"].apply(lambda x: x.wkb_hex)
    # Write the rows for the requested scenario so that they match the
    # DELETE below (previously hard-coded to "status2023").
    mastr_ph["scenario"] = scn_name
    mastr_ph["sources"] = [
        {"el_capacity": "MaStR aggregated by location"}
    ] * mastr_ph.shape[0]

    # Delete existing units in the target table
    db.execute_sql(
        f""" DELETE FROM supply.egon_storages
        WHERE carrier = 'pumped_hydro'
        AND scenario= '{scn_name}';"""
    )

    with db.session_scope() as session:
        session.bulk_insert_mappings(
            EgonStorages,
            mastr_ph.to_dict(orient="records"),
        )
490
491
492
def allocate_pumped_hydro_eGon100RE():
    """Allocate pumped hydro plants for the eGon100RE scenario.

    A prox-to-now method is applied: the plants allocated by
    :func:`allocate_pumped_hydro` for scenario ``status2019`` serve as the
    spatial reference, and their capacities are scaled uniformly so that
    the total equals the eGon100RE target capacity. The result is inserted
    into the storages table.

    Parameters
    ----------
    None

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If the configured dataset boundary is neither "Schleswig-Holstein"
        nor "Everything".
    """
    carrier = "pumped_hydro"
    cfg = config.datasets()["power_plants"]
    boundary = config.settings()["egon-data"]["--dataset-boundary"]

    # Select installed capacity for pumped_hydro in eGon100RE scenario from
    # scenario capacities table
    capacity = db.select_dataframe(
        f"""
        SELECT capacity
        FROM {cfg['sources']['capacities']}
        WHERE carrier = '{carrier}'
        AND scenario_name = 'eGon100RE';
        """
    )

    if boundary == "Schleswig-Holstein":
        # Break the capacity of pumped hydro plants down to a fixed share
        # for Schleswig-Holstein
        capacity_phes = capacity.iat[0, 0] * 0.0176
    elif boundary == "Everything":
        # Select national capacity for pumped hydro
        capacity_phes = capacity.iat[0, 0]
    else:
        raise ValueError(f"'{boundary}' is not a valid dataset boundary.")

    # Get the allocation of pumped_hydro plants in the status2019 scenario
    # as the reference for the distribution in the eGon100RE scenario
    allocation = allocate_pumped_hydro(scn="status2019", export=False)
    # TODO status2023 leave same as status2019

    scaling_factor = capacity_phes / allocation.el_capacity.sum()

    power_plants = allocation.copy()
    power_plants["scenario"] = "eGon100RE"
    power_plants["el_capacity"] = allocation.el_capacity * scaling_factor

    # Insert into target table; the session is closed even if adding or
    # committing fails.
    session = sessionmaker(bind=db.engine())()
    try:
        for _, row in power_plants.iterrows():
            session.add(
                EgonStorages(
                    sources={"el_capacity": row.source},
                    source_id={"MastrNummer": row.MaStRNummer},
                    carrier=row.carrier,
                    el_capacity=row.el_capacity,
                    voltage_level=row.voltage_level,
                    bus_id=row.bus_id,
                    scenario=row.scenario,
                    geom=f"SRID=4326;POINT({row.geometry.x} {row.geometry.y})",
                )
            )
        session.commit()
    finally:
        session.close()
558
559
560
def home_batteries_per_scenario(scenario):
    """Allocate home batteries for one scenario.

    The home batteries define a lower boundary for extendable battery
    storage units. For eGon2035 the overall installed capacity is read from
    the NEP capacities file, for every other scenario from the scenario
    capacities table. The capacity is distributed to buses in proportion to
    the installed PV rooftop capacity and inserted into the storages table.

    Parameters
    ----------
    scenario : str
        Name of the scenario.

    Returns
    -------
    None
    """
    cfg = config.datasets()["storages"]
    dataset = config.settings()["egon-data"]["--dataset-boundary"]

    if scenario == "eGon2035":
        target_file = (
            Path(".")
            / "data_bundle_egon_data"
            / "nep2035_version2021"
            / cfg["sources"]["nep_capacities"]
        )

        capacities_nep = pd.read_excel(
            target_file,
            sheet_name="1.Entwurf_NEP2035_V2021",
            index_col="Unnamed: 0",
        )

        # Select target value in MW
        target = capacities_nep.Summe["PV-Batteriespeicher"] * 1000

    else:
        target = db.select_dataframe(
            f"""
            SELECT capacity
            FROM {cfg['sources']['capacities']}
            WHERE scenario_name = '{scenario}'
            AND carrier = 'battery';
            """
        ).capacity[0]

    # PV rooftop generators at German buses of this scenario
    pv_rooftop = db.select_dataframe(
        f"""
        SELECT bus, p_nom, generator_id
        FROM {cfg['sources']['generators']}
        WHERE scn_name = '{scenario}'
        AND carrier = 'solar_rooftop'
        AND bus IN
            (SELECT bus_id FROM {cfg['sources']['bus']}
               WHERE scn_name = '{scenario}' AND country = 'DE' );
        """
    )

    if dataset == "Schleswig-Holstein":
        # Scale the target down by a fixed factor for the test boundary
        target = target / 16

    # Distribute the target proportionally to the PV rooftop capacity
    battery = pv_rooftop
    battery["p_nom_min"] = target * battery["p_nom"] / battery["p_nom"].sum()
    battery = battery.drop(columns=["p_nom"])

    battery["carrier"] = "home_battery"
    battery["scenario"] = scenario

    if scenario == "eGon2035" or "status" in scenario:
        source = "NEP"
    else:
        source = "p-e-s"

    battery[
        "source"
    ] = f"{source} capacity allocated based in installed PV rooftop capacity"

    # Insert into target table; the session is closed even if adding or
    # committing fails.
    session = sessionmaker(bind=db.engine())()
    try:
        for _, row in battery.iterrows():
            session.add(
                EgonStorages(
                    sources={"el_capacity": row.source},
                    source_id={"generator_id": row.generator_id},
                    carrier=row.carrier,
                    el_capacity=row.p_nom_min,
                    bus_id=row.bus,
                    scenario=row.scenario,
                )
            )
        session.commit()
    finally:
        session.close()
650
651
652
def allocate_pv_home_batteries_to_grids():
    """Allocate home batteries for every configured scenario."""
    configured_scenarios = config.settings()["egon-data"]["--scenarios"]
    for scenario_name in configured_scenarios:
        home_batteries_per_scenario(scenario_name)
655
656
657
def allocate_pumped_hydro_scn():
    """Run the pumped hydro allocation matching each configured scenario.

    ``eGon2035`` and ``eGon100RE`` have dedicated allocation functions;
    every other scenario whose name contains ``status`` is allocated from
    MaStR data. Scenarios matching none of these are skipped.
    """
    for scenario_name in config.settings()["egon-data"]["--scenarios"]:
        if scenario_name == "eGon2035":
            allocate_pumped_hydro(scn="eGon2035")
            continue

        if scenario_name == "eGon100RE":
            allocate_pumped_hydro_eGon100RE()
            continue

        if "status" in scenario_name:
            allocate_pumped_hydro_sq(scn_name=scenario_name)
665