Passed · Pull Request — dev (#905) · created by unknown · 02:03

export_to_db() — rated A

Complexity
    Conditions: 3

Size
    Total Lines: 57
    Code Lines: 31

Duplication
    Lines: 0
    Ratio: 0 %

Importance
    Changes: 0

Metric                              Value
eloc (effective lines of code)      31
dl (duplicated lines)               0
loc (total lines)                   57
rs                                  9.1359
c                                   0
b                                   0
f                                   0
cc (cyclomatic complexity)          3
nop (number of parameters)          3

How to fix: Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and to use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

- Extract Method (see the sketch below)
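
For the flagged method, Extract Method could look roughly like the following sketch. The helper names are hypothetical and only illustrate the split; the actual export_to_db() is shown in full in the listing further down.

    def export_to_db(df_peak_loads_db, df_heat_mvgd_ts_db, drop=False):
        # Each extracted helper replaces what would otherwise be a section
        # comment in the long body.
        _export_peak_loads(df_peak_loads_db, drop=drop)  # melt, W conversion, write
        _export_mvgd_timeseries(df_heat_mvgd_ts_db, drop=drop)  # dtypes, to_sql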

"""The central module containing all code dealing with
individual heat supply.

"""
from pathlib import Path
import os
import random
import time

from airflow.operators.python_operator import PythonOperator
from psycopg2.extensions import AsIs, register_adapter
from sqlalchemy import ARRAY, REAL, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
import geopandas as gpd
import numpy as np
import pandas as pd
import saio

from egon.data import config, db, logger
from egon.data.datasets import Dataset
from egon.data.datasets.district_heating_areas import (
    MapZensusDistrictHeatingAreas,
)
from egon.data.datasets.electricity_demand_timeseries.cts_buildings import (
    calc_cts_building_profiles,
)
from egon.data.datasets.electricity_demand_timeseries.mapping import (
    EgonMapZensusMvgdBuildings,
)
from egon.data.datasets.electricity_demand_timeseries.tools import (
    write_table_to_postgres,
)
from egon.data.datasets.heat_demand import EgonPetaHeat
from egon.data.datasets.heat_demand_timeseries.daily import (
    EgonDailyHeatDemandPerClimateZone,
    EgonMapZensusClimateZones,
)
from egon.data.datasets.heat_demand_timeseries.idp_pool import (
    EgonHeatTimeseries,
)

# mapping of zensus cells to MV grid districts
from egon.data.datasets.zensus_mv_grid_districts import MapZensusGridDistricts

engine = db.engine()
Base = declarative_base()


class EgonEtragoTimeseriesIndividualHeating(Base):
    __tablename__ = "egon_etrago_timeseries_individual_heating"
    __table_args__ = {"schema": "demand"}
    bus_id = Column(Integer, primary_key=True)
    scenario = Column(String, primary_key=True)
    carrier = Column(String, primary_key=True)
    dist_aggregated_mw = Column(ARRAY(REAL))


class EgonHpCapacityBuildings(Base):
    __tablename__ = "egon_hp_capacity_buildings"
    __table_args__ = {"schema": "demand"}
    building_id = Column(Integer, primary_key=True)
    scenario = Column(String, primary_key=True)
    hp_capacity = Column(REAL)


class HeatPumpsPypsaEurSec(Dataset):
    def __init__(self, dependencies):
        def dyn_parallel_tasks_pypsa_eur_sec():
            """Dynamically generate tasks

            The goal is to speed up tasks by parallelising bulks of mvgds.

            The number of parallel tasks is defined via parameter
            `parallel_tasks` in the dataset config `datasets.yml`.

            Returns
            -------
            set of airflow.PythonOperators
                The tasks. Each element is of
                :func:`egon.data.datasets.heat_supply.individual_heating.
                determine_hp_cap_peak_load_mvgd_ts_pypsa_eur_sec`
            """
            parallel_tasks = config.datasets()["demand_timeseries_mvgd"].get(
                "parallel_tasks", 1
            )

            tasks = set()
            for i in range(parallel_tasks):
                tasks.add(
                    PythonOperator(
                        task_id=(
                            f"individual_heating."
                            f"determine-hp-capacity-pypsa-eur-sec-"
                            f"mvgd-bulk{i}"
                        ),
                        python_callable=split_mvgds_into_bulks,
                        op_kwargs={
                            "n": i,
                            "max_n": parallel_tasks,
                            "func": determine_hp_cap_peak_load_mvgd_ts_pypsa_eur_sec,  # noqa: E501
                        },
                    )
                )
            return tasks

        super().__init__(
            name="HeatPumpsPypsaEurSec",
            version="0.0.2",
            dependencies=dependencies,
            tasks=({*dyn_parallel_tasks_pypsa_eur_sec()},),
        )


class HeatPumps2035(Dataset):
    def __init__(self, dependencies):
        def dyn_parallel_tasks_2035():
            """Dynamically generate tasks

            The goal is to speed up tasks by parallelising bulks of mvgds.

            The number of parallel tasks is defined via parameter
            `parallel_tasks` in the dataset config `datasets.yml`.

            Returns
            -------
            set of airflow.PythonOperators
                The tasks. Each element is of
                :func:`egon.data.datasets.heat_supply.individual_heating.
                determine_hp_cap_peak_load_mvgd_ts_2035`
            """
            parallel_tasks = config.datasets()["demand_timeseries_mvgd"].get(
                "parallel_tasks", 1
            )
            tasks = set()
            for i in range(parallel_tasks):
                tasks.add(
                    PythonOperator(
                        task_id=(
                            "individual_heating."
                            f"determine-hp-capacity-2035-"
                            f"mvgd-bulk{i}"
                        ),
                        python_callable=split_mvgds_into_bulks,
                        op_kwargs={
                            "n": i,
                            "max_n": parallel_tasks,
                            "func": determine_hp_cap_peak_load_mvgd_ts_2035,
                        },
                    )
                )
            return tasks

        super().__init__(
            name="HeatPumps2035",
            version="0.0.2",
            dependencies=dependencies,
            tasks=(
                delete_heat_peak_loads_eGon2035,
                delete_hp_capacity_2035,
                delete_mvgd_ts_2035,
                {*dyn_parallel_tasks_2035()},
            ),
        )


class HeatPumps2050(Dataset):
    def __init__(self, dependencies):
        super().__init__(
            name="HeatPumps2050",
            version="0.0.2",
            dependencies=dependencies,
            tasks=(
                delete_hp_capacity_100RE,
                determine_hp_cap_buildings_eGon100RE,
            ),
        )


class BuildingHeatPeakLoads(Base):
    __tablename__ = "egon_building_heat_peak_loads"
    __table_args__ = {"schema": "demand"}

    building_id = Column(Integer, primary_key=True)
    scenario = Column(String, primary_key=True)
    sector = Column(String, primary_key=True)
    peak_load_in_w = Column(REAL)


def adapt_numpy_float64(numpy_float64):
    return AsIs(numpy_float64)


def adapt_numpy_int64(numpy_int64):
    return AsIs(numpy_int64)
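
# Note: these adapters are registered with psycopg2 further below, e.g.
# ``register_adapter(np.float64, adapt_numpy_float64)`` in
# ``determine_hp_cap_buildings_eGon100RE``, so that numpy scalars coming out
# of pandas objects can be written to PostgreSQL transparently.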


def timeit(func):
    """
    Decorator for measuring a function's running time.
    """

    def measure_time(*args, **kw):
        start_time = time.time()
        result = func(*args, **kw)
        print(
            "Processing time of %s(): %.2f seconds."
            % (func.__qualname__, time.time() - start_time)
        )
        return result

    return measure_time
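
# Example usage (illustrative only, not used elsewhere in this module):
#
#     @timeit
#     def some_step():
#         ...
#
# Calling ``some_step()`` would then print e.g.
# "Processing time of some_step(): 0.42 seconds."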


def cascade_per_technology(
    heat_per_mv,
    technologies,
    scenario,
    distribution_level,
    max_size_individual_chp=0.05,
):
    """Add plants for individual heat.
    Currently only on mv grid district level.

    Parameters
    ----------
    heat_per_mv : geopandas.geodataframe.GeoDataFrame
        MV grid districts including the heat demand
    technologies : pandas.DataFrame
        List of supply technologies and their parameters
    scenario : str
        Name of the scenario
    distribution_level : str
        Level at which the target capacities are distributed. If set to
        "federal_state", capacities are distributed per federal state,
        otherwise for Germany as a whole.
    max_size_individual_chp : float
        Maximum capacity of an individual chp in MW

    Returns
    -------
    heat_per_mv : geopandas.geodataframe.GeoDataFrame
        MV grid districts which need additional individual heat supply
    technologies : pandas.DataFrame
        List of remaining supply technologies and their parameters
    append_df : pandas.DataFrame
        List of plants per mv grid for the selected technology

    """
    sources = config.datasets()["heat_supply"]["sources"]

    tech = technologies[technologies.priority == technologies.priority.max()]

    # Make sure append_df is defined on every execution path; it is only
    # filled in the "heat_pump" and "gas_boiler" branches below.
    append_df = pd.DataFrame()

    # Distribute heat pumps linearly to remaining demand.
    if tech.index == "heat_pump":

        if distribution_level == "federal_state":
            # Select target values per federal state
            target = db.select_dataframe(
                f"""
                    SELECT DISTINCT ON (gen) gen as state, capacity
                    FROM {sources['scenario_capacities']['schema']}.
                    {sources['scenario_capacities']['table']} a
                    JOIN {sources['federal_states']['schema']}.
                    {sources['federal_states']['table']} b
                    ON a.nuts = b.nuts
                    WHERE scenario_name = '{scenario}'
                    AND carrier = 'residential_rural_heat_pump'
                    """,
                index_col="state",
            )

            heat_per_mv["share"] = heat_per_mv.groupby(
                "state"
            ).remaining_demand.apply(lambda grp: grp / grp.sum())

            append_df = (
                heat_per_mv["share"]
                .mul(target.capacity[heat_per_mv["state"]].values)
                .reset_index()
            )
        else:
            # Select target value for Germany
            target = db.select_dataframe(
                f"""
                    SELECT SUM(capacity) AS capacity
                    FROM {sources['scenario_capacities']['schema']}.
                    {sources['scenario_capacities']['table']} a
                    WHERE scenario_name = '{scenario}'
                    AND carrier = 'residential_rural_heat_pump'
                    """
            )

            heat_per_mv["share"] = (
                heat_per_mv.remaining_demand
                / heat_per_mv.remaining_demand.sum()
            )

            append_df = (
                heat_per_mv["share"].mul(target.capacity[0]).reset_index()
            )

        append_df.rename(
            {"bus_id": "mv_grid_id", "share": "capacity"}, axis=1, inplace=True
        )

    elif tech.index == "gas_boiler":

        append_df = pd.DataFrame(
            data={
                "capacity": heat_per_mv.remaining_demand.div(
                    tech.estimated_flh.values[0]
                ),
                "carrier": "residential_rural_gas_boiler",
                "mv_grid_id": heat_per_mv.index,
                "scenario": scenario,
            }
        )

    if append_df.size > 0:
        append_df["carrier"] = tech.index[0]
        heat_per_mv.loc[
            append_df.mv_grid_id, "remaining_demand"
        ] -= append_df.set_index("mv_grid_id").capacity.mul(
            tech.estimated_flh.values[0]
        )

    heat_per_mv = heat_per_mv[heat_per_mv.remaining_demand >= 0]

    technologies = technologies.drop(tech.index)

    return heat_per_mv, technologies, append_df


def cascade_heat_supply_indiv(scenario, distribution_level, plotting=True):
    """Assigns supply strategy for individual heating in four steps.

    1.) All small scale CHP are connected.
    2.) If the supply cannot meet the heat demand, solar thermal collectors
        are attached. This is not implemented yet, since individual
        solar thermal plants are not considered in the eGon2035 scenario.
    3.) If this is still not sufficient, the mv grid is also supplied by heat
        pumps.
    4.) The last option is individual gas boilers.

    Parameters
    ----------
    scenario : str
        Name of scenario
    distribution_level : str
        Level at which heat pump target capacities are distributed
        ("federal_state" or national).
    plotting : bool, optional
        Choose if individual heating supply is plotted. The default is True.

    Returns
    -------
    resulting_capacities : pandas.DataFrame
        List of plants per mv grid

    """

    sources = config.datasets()["heat_supply"]["sources"]

    # Select residential heat demand per mv grid district and federal state
    heat_per_mv = db.select_geodataframe(
        f"""
        SELECT d.bus_id as bus_id, SUM(demand) as demand,
        c.vg250_lan as state, d.geom
        FROM {sources['heat_demand']['schema']}.
        {sources['heat_demand']['table']} a
        JOIN {sources['map_zensus_grid']['schema']}.
        {sources['map_zensus_grid']['table']} b
        ON a.zensus_population_id = b.zensus_population_id
        JOIN {sources['map_vg250_grid']['schema']}.
        {sources['map_vg250_grid']['table']} c
        ON b.bus_id = c.bus_id
        JOIN {sources['mv_grids']['schema']}.
        {sources['mv_grids']['table']} d
        ON d.bus_id = c.bus_id
        WHERE scenario = '{scenario}'
        AND a.zensus_population_id NOT IN (
            SELECT zensus_population_id
            FROM {sources['map_dh']['schema']}.{sources['map_dh']['table']}
            WHERE scenario = '{scenario}')
        GROUP BY d.bus_id, vg250_lan, geom
        """,
        index_col="bus_id",
    )

    # Store centroids of mv grid districts
    geom_mv = heat_per_mv.geom.centroid.copy()

    # Initialize dataframe for results
    resulting_capacities = pd.DataFrame(
        columns=["mv_grid_id", "carrier", "capacity"]
    )

    # Set technology data according to
    # http://www.wbzu.de/seminare/infopool/infopool-bhkw
    # TODO: Add gas boilers and solar thermal (eGon100RE)
    technologies = pd.DataFrame(
        index=["heat_pump", "gas_boiler"],
        columns=["estimated_flh", "priority"],
        data={"estimated_flh": [4000, 8000], "priority": [2, 1]},
    )

    # In the beginning, the remaining demand equals demand
    heat_per_mv["remaining_demand"] = heat_per_mv["demand"]

    # Connect new technologies, if there is still heat demand left
    while (len(technologies) > 0) and (len(heat_per_mv) > 0):
        # Attach new supply technology
        heat_per_mv, technologies, append_df = cascade_per_technology(
            heat_per_mv, technologies, scenario, distribution_level
        )
        # Collect resulting capacities
        resulting_capacities = pd.concat(
            [resulting_capacities, append_df], ignore_index=True
        )

    if plotting:
        plot_heat_supply(resulting_capacities)

    return gpd.GeoDataFrame(
        resulting_capacities,
        geometry=geom_mv[resulting_capacities.mv_grid_id].values,
    )


def get_peta_demand(mvgd, scenario):
    """
    Retrieve annual peta heat demand for residential buildings for either
    the eGon2035 or eGon100RE scenario.

    Parameters
    ----------
    mvgd : int
        MV grid ID.
    scenario : str
        Possible options are eGon2035 or eGon100RE

    Returns
    -------
    df_peta_demand : pd.DataFrame
        Annual residential heat demand per building and scenario. Columns of
        the dataframe are zensus_population_id and demand.

    """

    with db.session_scope() as session:
        query = (
            session.query(
                MapZensusGridDistricts.zensus_population_id,
                EgonPetaHeat.demand,
            )
            .filter(MapZensusGridDistricts.bus_id == mvgd)
            .filter(
                MapZensusGridDistricts.zensus_population_id
                == EgonPetaHeat.zensus_population_id
            )
            .filter(
                EgonPetaHeat.sector == "residential",
                EgonPetaHeat.scenario == scenario,
            )
        )

        df_peta_demand = pd.read_sql(
            query.statement, query.session.bind, index_col=None
        )

    return df_peta_demand


def get_residential_heat_profile_ids(mvgd):
    """
    Retrieve the 365 daily heat profile IDs per residential building in the
    selected MVGD.

    Parameters
    ----------
    mvgd : int
        ID of MVGD

    Returns
    -------
    df_profiles_ids : pd.DataFrame
        Residential daily heat profile IDs per building. Columns of the
        dataframe are zensus_population_id, building_id,
        selected_idp_profiles, buildings and day_of_year.

    """
    with db.session_scope() as session:
        query = (
            session.query(
                MapZensusGridDistricts.zensus_population_id,
                EgonHeatTimeseries.building_id,
                EgonHeatTimeseries.selected_idp_profiles,
            )
            .filter(MapZensusGridDistricts.bus_id == mvgd)
            .filter(
                MapZensusGridDistricts.zensus_population_id
                == EgonHeatTimeseries.zensus_population_id
            )
        )

        df_profiles_ids = pd.read_sql(
            query.statement, query.session.bind, index_col=None
        )
    # Add building count per cell
    df_profiles_ids = pd.merge(
        left=df_profiles_ids,
        right=df_profiles_ids.groupby("zensus_population_id")["building_id"]
        .count()
        .rename("buildings"),
        left_on="zensus_population_id",
        right_index=True,
    )

    # Unnest array of ids per building
    df_profiles_ids = df_profiles_ids.explode("selected_idp_profiles")
    # Add day of year column by order of list
    df_profiles_ids["day_of_year"] = (
        df_profiles_ids.groupby("building_id").cumcount() + 1
    )
    return df_profiles_ids
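
# Illustration (hypothetical values): a building with
# ``selected_idp_profiles = [17, 4, 23, ...]`` (one entry per day of the
# year) is exploded into 365 rows, and ``day_of_year`` is assigned from the
# original list position, i.e. profile 17 belongs to day 1, profile 4 to
# day 2, and so on.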


def get_daily_profiles(profile_ids):
    """
    Gets residential daily heat profiles for given profile IDs.

    Parameters
    ----------
    profile_ids : list(int)
        Daily heat profile IDs

    Returns
    -------
    df_profiles : pd.DataFrame
        Residential daily heat profiles. Columns of the dataframe are idp,
        house, temperature_class and hour.

    """

    saio.register_schema("demand", db.engine())
    from saio.demand import egon_heat_idp_pool

    with db.session_scope() as session:
        query = session.query(egon_heat_idp_pool).filter(
            egon_heat_idp_pool.index.in_(profile_ids)
        )

        df_profiles = pd.read_sql(
            query.statement, query.session.bind, index_col="index"
        )

    # Unnest array of profile values per id
    df_profiles = df_profiles.explode("idp")
    # Add column for hour of day
    df_profiles["hour"] = df_profiles.groupby(axis=0, level=0).cumcount() + 1

    return df_profiles


def get_daily_demand_share(mvgd):
    """
    Get daily annual demand share per census cell.

    Parameters
    ----------
    mvgd : int
        MVGD id

    Returns
    -------
    df_daily_demand_share : pd.DataFrame
        Daily annual demand share per census cell. Columns of the dataframe
        are zensus_population_id, day_of_year and daily_demand_share.

    """

    with db.session_scope() as session:
        query = session.query(
            MapZensusGridDistricts.zensus_population_id,
            EgonDailyHeatDemandPerClimateZone.day_of_year,
            EgonDailyHeatDemandPerClimateZone.daily_demand_share,
        ).filter(
            EgonMapZensusClimateZones.climate_zone
            == EgonDailyHeatDemandPerClimateZone.climate_zone,
            MapZensusGridDistricts.zensus_population_id
            == EgonMapZensusClimateZones.zensus_population_id,
            MapZensusGridDistricts.bus_id == mvgd,
        )

        df_daily_demand_share = pd.read_sql(
            query.statement, query.session.bind, index_col=None
        )
    return df_daily_demand_share


def calc_residential_heat_profiles_per_mvgd(mvgd, scenario):
    """
    Gets residential heat profiles per building in MV grid for either eGon2035
    or eGon100RE scenario.

    Parameters
    ----------
    mvgd : int
        MV grid ID.
    scenario : str
        Possible options are eGon2035 or eGon100RE.

    Returns
    --------
    pd.DataFrame
        Heat demand profiles of buildings. Columns are:
            * zensus_population_id : int
                Zensus cell ID building is in.
            * building_id : int
                ID of building.
            * day_of_year : int
                Day of the year (1 - 365).
            * hour : int
                Hour of the day (1 - 24).
            * demand_ts : float
                Building's residential heat demand in MW, for specified hour
                of the year (specified through columns `day_of_year` and
                `hour`).
    """

    columns = [
        "zensus_population_id",
        "building_id",
        "day_of_year",
        "hour",
        "demand_ts",
    ]

    df_peta_demand = get_peta_demand(mvgd, scenario)

    # TODO maybe return empty dataframe
    if df_peta_demand.empty:
        logger.info(f"No demand for MVGD: {mvgd}")
        return pd.DataFrame(columns=columns)

    df_profiles_ids = get_residential_heat_profile_ids(mvgd)

    if df_profiles_ids.empty:
        logger.info(f"No profiles for MVGD: {mvgd}")
        return pd.DataFrame(columns=columns)

    df_profiles = get_daily_profiles(
        df_profiles_ids["selected_idp_profiles"].unique()
    )

    df_daily_demand_share = get_daily_demand_share(mvgd)

    # Merge profile ids to peta demand by zensus_population_id
    df_profile_merge = pd.merge(
        left=df_peta_demand, right=df_profiles_ids, on="zensus_population_id"
    )

    # Merge daily demand to daily profile ids by zensus_population_id and day
    df_profile_merge = pd.merge(
        left=df_profile_merge,
        right=df_daily_demand_share,
        on=["zensus_population_id", "day_of_year"],
    )

    # Merge daily profiles by profile id
    df_profile_merge = pd.merge(
        left=df_profile_merge,
        right=df_profiles[["idp", "hour"]],
        left_on="selected_idp_profiles",
        right_index=True,
    )

    # Scale profiles
    df_profile_merge["demand_ts"] = (
        df_profile_merge["idp"]
        .mul(df_profile_merge["daily_demand_share"])
        .mul(df_profile_merge["demand"])
        .div(df_profile_merge["buildings"])
    )

    return df_profile_merge.loc[:, columns]


def plot_heat_supply(resulting_capacities):
    """Plot resulting individual heat supply capacities per MV grid district."""
    # NOTE: similar plotting code exists elsewhere in the project
    # (flagged as duplication by static analysis).

    from matplotlib import pyplot as plt

    mv_grids = db.select_geodataframe(
        """
        SELECT * FROM grid.egon_mv_grid_district
        """,
        index_col="bus_id",
    )

    for c in ["CHP", "heat_pump"]:
        mv_grids[c] = (
            resulting_capacities[resulting_capacities.carrier == c]
            .set_index("mv_grid_id")
            .capacity
        )

        fig, ax = plt.subplots(1, 1)
        mv_grids.boundary.plot(linewidth=0.2, ax=ax, color="black")
        mv_grids.plot(
            ax=ax,
            column=c,
            cmap="magma_r",
            legend=True,
            legend_kwds={
                "label": f"Installed {c} in MW",
                "orientation": "vertical",
            },
        )
        plt.savefig(f"plots/individual_heat_supply_{c}.png", dpi=300)


def get_zensus_cells_with_decentral_heat_demand_in_mv_grid(
    scenario, mv_grid_id
):
    """
    Returns zensus cell IDs with decentral heating systems in given MV grid.

    As cells with district heating differ between scenarios, this is also
    depending on the scenario.

    Parameters
    -----------
    scenario : str
        Name of scenario. Can be either "eGon2035" or "eGon100RE".
    mv_grid_id : int
        ID of MV grid.

    Returns
    --------
    pd.Index(int)
        Zensus cell IDs (as int) of buildings with decentral heating systems in
        given MV grid. Type is pandas Index to avoid errors later on when it is
        used in a query.

    """

    # Get zensus cells in grid
    zensus_population_ids = db.select_dataframe(
        f"""
        SELECT zensus_population_id
        FROM boundaries.egon_map_zensus_grid_districts
        WHERE bus_id = {mv_grid_id}
        """,
        index_col=None,
    ).zensus_population_id.values

    # TODO maybe use adapter
    # Convert to pd.Index (otherwise the type is np.int64, which will
    # throw an error when used in a query)
    zensus_population_ids = pd.Index(zensus_population_ids)

    # Get zensus cells with district heating
    with db.session_scope() as session:
        query = session.query(
            MapZensusDistrictHeatingAreas.zensus_population_id,
        ).filter(
            MapZensusDistrictHeatingAreas.scenario == scenario,
            MapZensusDistrictHeatingAreas.zensus_population_id.in_(
                zensus_population_ids
            ),
        )

        cells_with_dh = pd.read_sql(
            query.statement, query.session.bind, index_col=None
        ).zensus_population_id.values

    # Remove zensus cells with district heating
    zensus_population_ids = zensus_population_ids.drop(
        cells_with_dh, errors="ignore"
    )
    return pd.Index(zensus_population_ids)


def get_residential_buildings_with_decentral_heat_demand_in_mv_grid(
    scenario, mv_grid_id
):
    """
    Returns building IDs of buildings with decentral residential heat demand in
    given MV grid.

    As cells with district heating differ between scenarios, this is also
    depending on the scenario.

    Parameters
    -----------
    scenario : str
        Name of scenario. Can be either "eGon2035" or "eGon100RE".
    mv_grid_id : int
        ID of MV grid.

    Returns
    --------
    pd.Index(int)
        Building IDs (as int) of buildings with decentral heating system in
        given MV grid. Type is pandas Index to avoid errors later on when it is
        used in a query.

    """
    # Get zensus cells with decentral heating
    zensus_population_ids = (
        get_zensus_cells_with_decentral_heat_demand_in_mv_grid(
            scenario, mv_grid_id
        )
    )

    # Get buildings with decentral heat demand
    saio.register_schema("demand", engine)
    from saio.demand import egon_heat_timeseries_selected_profiles

    with db.session_scope() as session:
        query = session.query(
            egon_heat_timeseries_selected_profiles.building_id,
        ).filter(
            egon_heat_timeseries_selected_profiles.zensus_population_id.in_(
                zensus_population_ids
            )
        )

        buildings_with_heat_demand = pd.read_sql(
            query.statement, query.session.bind, index_col=None
        ).building_id.values

    return pd.Index(buildings_with_heat_demand)


def get_cts_buildings_with_decentral_heat_demand_in_mv_grid(
    scenario, mv_grid_id
):
    """
    Returns building IDs of buildings with decentral CTS heat demand in
    given MV grid.

    As cells with district heating differ between scenarios, this is also
    depending on the scenario.

    Parameters
    -----------
    scenario : str
        Name of scenario. Can be either "eGon2035" or "eGon100RE".
    mv_grid_id : int
        ID of MV grid.

    Returns
    --------
    pd.Index(int)
        Building IDs (as int) of buildings with decentral heating system in
        given MV grid. Type is pandas Index to avoid errors later on when it is
        used in a query.

    """

    # Get zensus cells with decentral heating
    zensus_population_ids = (
        get_zensus_cells_with_decentral_heat_demand_in_mv_grid(
            scenario, mv_grid_id
        )
    )

    # Get buildings with decentral heat demand
    with db.session_scope() as session:
        query = session.query(EgonMapZensusMvgdBuildings.building_id).filter(
            EgonMapZensusMvgdBuildings.sector == "cts",
            EgonMapZensusMvgdBuildings.zensus_population_id.in_(
                zensus_population_ids
            ),
        )

        buildings_with_heat_demand = pd.read_sql(
            query.statement, query.session.bind, index_col=None
        ).building_id.values

    return pd.Index(buildings_with_heat_demand)


def get_buildings_with_decentral_heat_demand_in_mv_grid(mvgd, scenario):
    """
    Returns building IDs of buildings with decentral heat demand in
    given MV grid.

    As cells with district heating differ between scenarios, this is also
    depending on the scenario. CTS and residential buildings have to be
    retrieved separately, as some residential buildings only have an
    electricity but no heat demand. This does not occur for CTS buildings.

    Parameters
    -----------
    mvgd : int
        ID of MV grid.
    scenario : str
        Name of scenario. Can be either "eGon2035" or "eGon100RE".

    Returns
    --------
    pd.Index(int)
        Building IDs (as int) of buildings with decentral heating system in
        given MV grid. Type is pandas Index to avoid errors later on when it is
        used in a query.

    """
    # Get residential buildings with decentral heating systems
    buildings_decentral_heating_res = (
        get_residential_buildings_with_decentral_heat_demand_in_mv_grid(
            scenario, mvgd
        )
    )

    # Get CTS buildings with decentral heating systems
    buildings_decentral_heating_cts = (
        get_cts_buildings_with_decentral_heat_demand_in_mv_grid(scenario, mvgd)
    )

    # Merge residential and CTS buildings
    buildings_decentral_heating = buildings_decentral_heating_res.append(
        buildings_decentral_heating_cts
    ).unique()

    return buildings_decentral_heating


def get_total_heat_pump_capacity_of_mv_grid(scenario, mv_grid_id):
    """
    Returns total heat pump capacity per grid that was previously defined
    (by NEP or pypsa-eur-sec).

    Parameters
    -----------
    scenario : str
        Name of scenario. Can be either "eGon2035" or "eGon100RE".
    mv_grid_id : int
        ID of MV grid.

    Returns
    --------
    float
        Total heat pump capacity in MW in given MV grid.

    """
    from egon.data.datasets.heat_supply import EgonIndividualHeatingSupply

    with db.session_scope() as session:
        query = (
            session.query(
                EgonIndividualHeatingSupply.mv_grid_id,
                EgonIndividualHeatingSupply.capacity,
            )
            .filter(EgonIndividualHeatingSupply.scenario == scenario)
            .filter(EgonIndividualHeatingSupply.carrier == "heat_pump")
            .filter(EgonIndividualHeatingSupply.mv_grid_id == mv_grid_id)
        )

        hp_cap_mv_grid = pd.read_sql(
            query.statement, query.session.bind, index_col="mv_grid_id"
        )
    if hp_cap_mv_grid.empty:
        return 0.0
    else:
        return hp_cap_mv_grid.capacity.values[0]


def get_heat_peak_demand_per_building(scenario, building_ids):
    """
    Returns heat peak demand per building for given scenario.

    Parameters
    -----------
    scenario : str
        Name of scenario. Can be either "eGon2035" or "eGon100RE".
    building_ids : pd.Index(int)
        Building IDs to retrieve peak demand for.

    Returns
    --------
    pd.Series
        Peak heat demand in MW per building. Index contains the building ID.

    """

    with db.session_scope() as session:
        query = (
            session.query(
                BuildingHeatPeakLoads.building_id,
                BuildingHeatPeakLoads.peak_load_in_w,
            )
            .filter(BuildingHeatPeakLoads.scenario == scenario)
            .filter(BuildingHeatPeakLoads.building_id.in_(building_ids))
        )

        df_heat_peak_demand = pd.read_sql(
            query.statement, query.session.bind, index_col=None
        )

    # TODO remove check
    if df_heat_peak_demand.duplicated("building_id").any():
        raise ValueError("Duplicate building_id")

    # Convert to series and from W to MW
    df_heat_peak_demand = (
        df_heat_peak_demand.set_index("building_id").loc[:, "peak_load_in_w"]
        * 1e-6
    )
    return df_heat_peak_demand


def determine_minimum_hp_capacity_per_building(
    peak_heat_demand, flexibility_factor=24 / 18, cop=1.7
):
    """
    Determines minimum required heat pump capacity.

    Parameters
    ----------
    peak_heat_demand : pd.Series
        Series with peak heat demand per building in MW. Index contains the
        building ID.
    flexibility_factor : float
        Factor to overdimension the heat pump to allow for some flexible
        dispatch in times of high heat demand. Per default, a factor of 24/18
        is used, i.e. the heat pump is dimensioned such that a day's heat
        demand can be supplied within 18 of 24 hours.
    cop : float
        Coefficient of performance of the heat pump. The default is 1.7.

    Returns
    -------
    pd.Series
        Pandas series with minimum required heat pump capacity per building in
        MW.

    """
    return peak_heat_demand * flexibility_factor / cop
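
# Worked example (hypothetical numbers): a building with a peak heat demand
# of 0.01 MW gets a minimum capacity of 0.01 * (24 / 18) / 1.7 ≈ 0.0078 MW,
# i.e. roughly 7.8 kW.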


def determine_buildings_with_hp_in_mv_grid(
    hp_cap_mv_grid, min_hp_cap_per_building
):
    """
    Distributes given total heat pump capacity to buildings based on their peak
    heat demand.

    Parameters
    -----------
    hp_cap_mv_grid : float
        Total heat pump capacity in MW in given MV grid.
    min_hp_cap_per_building : pd.Series
        Pandas series with minimum required heat pump capacity per building
        in MW.

    Returns
    -------
    pd.Index(int)
        Building IDs (as int) of buildings to get heat demand time series for.

    """
    building_ids = min_hp_cap_per_building.index

    # Get buildings with PV to give them a higher priority when selecting
    # buildings a heat pump will be allocated to
    saio.register_schema("supply", engine)
    from saio.supply import egon_power_plants_pv_roof_building

    with db.session_scope() as session:
        query = session.query(
            egon_power_plants_pv_roof_building.building_id
        ).filter(
            egon_power_plants_pv_roof_building.building_id.in_(building_ids),
            egon_power_plants_pv_roof_building.scenario == "eGon2035",
        )

        buildings_with_pv = pd.read_sql(
            query.statement, query.session.bind, index_col=None
        ).building_id.values
    # Set different weights for buildings with PV and without PV
    weight_with_pv = 1.5
    weight_without_pv = 1.0
    weights = pd.concat(
        [
            pd.DataFrame(
                {"weight": weight_without_pv},
                index=building_ids.drop(buildings_with_pv, errors="ignore"),
            ),
            pd.DataFrame({"weight": weight_with_pv}, index=buildings_with_pv),
        ]
    )
    # Normalise weights (probability needs to add up to 1)
    weights.weight = weights.weight / weights.weight.sum()

    # Get random order at which buildings are chosen
    np.random.seed(db.credentials()["--random-seed"])
    buildings_with_hp_order = np.random.choice(
        weights.index,
        size=len(weights),
        replace=False,
        p=weights.weight.values,
    )

    # Select buildings until HP capacity in MV grid is reached (some rest
    # capacity will remain)
    hp_cumsum = min_hp_cap_per_building.loc[buildings_with_hp_order].cumsum()
    buildings_with_hp = hp_cumsum[hp_cumsum <= hp_cap_mv_grid].index

    # Choose random heat pumps until remaining heat pumps are larger than
    # remaining heat pump capacity
    remaining_hp_cap = (
        hp_cap_mv_grid - min_hp_cap_per_building.loc[buildings_with_hp].sum()
    )
    min_cap_buildings_wo_hp = min_hp_cap_per_building.loc[
        building_ids.drop(buildings_with_hp)
    ]
    possible_buildings = min_cap_buildings_wo_hp[
        min_cap_buildings_wo_hp <= remaining_hp_cap
    ].index
    while len(possible_buildings) > 0:
        random.seed(db.credentials()["--random-seed"])
        new_hp_building = random.choice(possible_buildings)
        # Add new building to buildings with HP
        buildings_with_hp = buildings_with_hp.append(
            pd.Index([new_hp_building])
        )
        # Determine if there are still possible buildings
        remaining_hp_cap = (
            hp_cap_mv_grid
            - min_hp_cap_per_building.loc[buildings_with_hp].sum()
        )
        min_cap_buildings_wo_hp = min_hp_cap_per_building.loc[
            building_ids.drop(buildings_with_hp)
        ]
        possible_buildings = min_cap_buildings_wo_hp[
            min_cap_buildings_wo_hp <= remaining_hp_cap
        ].index

    return buildings_with_hp


def desaggregate_hp_capacity(min_hp_cap_per_building, hp_cap_mv_grid):
    """
    Disaggregates the required total heat pump capacity to buildings.

    All buildings are previously assigned a minimum required heat pump
    capacity. If the total heat pump capacity exceeds this, larger heat pumps
    are assigned.

    Parameters
    ------------
    min_hp_cap_per_building : pd.Series
        Pandas series with minimum required heat pump capacity per building
        in MW.
    hp_cap_mv_grid : float
        Total heat pump capacity in MW in given MV grid.

    Returns
    --------
    pd.Series
        Pandas series with heat pump capacity per building in MW.

    """
    # Distribute remaining capacity to all buildings with HP depending on
    # installed HP capacity

    allocated_cap = min_hp_cap_per_building.sum()
    remaining_cap = hp_cap_mv_grid - allocated_cap

    fac = remaining_cap / allocated_cap
    hp_cap_per_building = (
        min_hp_cap_per_building * fac + min_hp_cap_per_building
    )
    hp_cap_per_building.index.name = "building_id"

    return hp_cap_per_building
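
# Worked example (hypothetical numbers): with minimum capacities of
# [1, 2, 3] MW (sum 6 MW) and a total grid capacity of 9 MW, the remaining
# 3 MW are distributed proportionally (fac = 3 / 6 = 0.5), yielding
# [1.5, 3.0, 4.5] MW per building.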


def determine_min_hp_cap_buildings_pypsa_eur_sec(
    peak_heat_demand, building_ids
):
    """
    Determines minimum required HP capacity in MV grid in MW as input for
    pypsa-eur-sec.

    Parameters
    ----------
    peak_heat_demand : pd.Series
        Series with peak heat demand per building in MW. Index contains the
        building ID.
    building_ids : pd.Index(int)
        Building IDs (as int) of buildings with decentral heating system in
        given MV grid.

    Returns
    --------
    float
        Minimum required HP capacity in MV grid in MW.

    """
    if len(building_ids) > 0:
        peak_heat_demand = peak_heat_demand.loc[building_ids]
        # Determine minimum required heat pump capacity per building
        min_hp_cap_buildings = determine_minimum_hp_capacity_per_building(
            peak_heat_demand
        )
        return min_hp_cap_buildings.sum()
    else:
        return 0.0


def determine_hp_cap_buildings_eGon2035_per_mvgd(
    mv_grid_id, peak_heat_demand, building_ids
):
    """
    Determines which buildings in the MV grid will have a HP (buildings with PV
    rooftop are more likely to be assigned) in the eGon2035 scenario, as well
    as their respective HP capacity in MW.

    Parameters
    -----------
    mv_grid_id : int
        ID of MV grid.
    peak_heat_demand : pd.Series
        Series with peak heat demand per building in MW. Index contains the
        building ID.
    building_ids : pd.Index(int)
        Building IDs (as int) of buildings with decentral heating system in
        given MV grid.

    Returns
    --------
    pd.Series
        Pandas series with heat pump capacity per building in MW.

    """

    hp_cap_grid = get_total_heat_pump_capacity_of_mv_grid(
        "eGon2035", mv_grid_id
    )

    if len(building_ids) > 0 and hp_cap_grid > 0.0:
        peak_heat_demand = peak_heat_demand.loc[building_ids]

        # Determine minimum required heat pump capacity per building
        min_hp_cap_buildings = determine_minimum_hp_capacity_per_building(
            peak_heat_demand
        )

        # Select buildings that will have a heat pump
        buildings_with_hp = determine_buildings_with_hp_in_mv_grid(
            hp_cap_grid, min_hp_cap_buildings
        )

        # Distribute total heat pump capacity to all buildings with HP
        hp_cap_per_building = desaggregate_hp_capacity(
            min_hp_cap_buildings.loc[buildings_with_hp], hp_cap_grid
        )

        return hp_cap_per_building.rename("hp_capacity")

    else:
        return pd.Series(dtype="float64").rename("hp_capacity")


def determine_hp_cap_buildings_eGon100RE_per_mvgd(mv_grid_id):
    """
    Determines HP capacity per building in eGon100RE scenario.

    In the eGon100RE scenario all buildings without district heating get a
    heat pump.

    Parameters
    -----------
    mv_grid_id : int
        ID of MV grid.

    Returns
    --------
    pd.Series
        Pandas series with heat pump capacity per building in MW.

    """

    hp_cap_grid = get_total_heat_pump_capacity_of_mv_grid(
        "eGon100RE", mv_grid_id
    )

    if hp_cap_grid > 0.0:

        # Get buildings with decentral heating systems
        building_ids = get_buildings_with_decentral_heat_demand_in_mv_grid(
            mv_grid_id, scenario="eGon100RE"
        )

        logger.info(f"MVGD={mv_grid_id} | Get peak loads from DB")
        df_peak_heat_demand = get_heat_peak_demand_per_building(
            "eGon100RE", building_ids
        )

        logger.info(f"MVGD={mv_grid_id} | Determine HP capacities.")
        # Determine minimum required heat pump capacity per building
        min_hp_cap_buildings = determine_minimum_hp_capacity_per_building(
            df_peak_heat_demand, flexibility_factor=24 / 18, cop=1.7
        )

        logger.info(f"MVGD={mv_grid_id} | Desaggregate HP capacities.")
        # Distribute total heat pump capacity to all buildings with HP
        hp_cap_per_building = desaggregate_hp_capacity(
            min_hp_cap_buildings, hp_cap_grid
        )

        return hp_cap_per_building.rename("hp_capacity")
    else:
        return pd.Series(dtype="float64").rename("hp_capacity")


def determine_hp_cap_buildings_eGon100RE():
    """
    Main function to determine HP capacity per building in eGon100RE scenario.

    """

    # ========== Register np datatypes with SQLA ==========
    register_adapter(np.float64, adapt_numpy_float64)
    register_adapter(np.int64, adapt_numpy_int64)
    # =====================================================

    with db.session_scope() as session:
        query = (
            session.query(
                MapZensusGridDistricts.bus_id,
            )
            .filter(
                MapZensusGridDistricts.zensus_population_id
                == EgonPetaHeat.zensus_population_id
            )
            .distinct(MapZensusGridDistricts.bus_id)
        )
        mvgd_ids = pd.read_sql(
            query.statement, query.session.bind, index_col=None
        )
    mvgd_ids = mvgd_ids.sort_values("bus_id")
    mvgd_ids = mvgd_ids["bus_id"].values

    df_hp_cap_per_building_100RE_db = pd.DataFrame(
        columns=["building_id", "hp_capacity"]
    )

    for mvgd_id in mvgd_ids:

        logger.info(f"MVGD={mvgd_id} | Start")

        hp_cap_per_building_100RE = (
            determine_hp_cap_buildings_eGon100RE_per_mvgd(mvgd_id)
        )

        if not hp_cap_per_building_100RE.empty:
            df_hp_cap_per_building_100RE_db = pd.concat(
                [
                    df_hp_cap_per_building_100RE_db,
                    hp_cap_per_building_100RE.reset_index(),
                ],
                axis=0,
            )

    logger.info(f"MVGD={min(mvgd_ids)} : {max(mvgd_ids)} | Write data to db.")
    df_hp_cap_per_building_100RE_db["scenario"] = "eGon100RE"

    EgonHpCapacityBuildings.__table__.create(bind=engine, checkfirst=True)

    write_table_to_postgres(
        df_hp_cap_per_building_100RE_db,
        EgonHpCapacityBuildings,
        drop=False,
    )


def aggregate_residential_and_cts_profiles(mvgd, scenario):
    """
    Gets residential and CTS heat demand profiles per building and aggregates
    them.

    Parameters
    ----------
    mvgd : int
        MV grid ID.
    scenario : str
        Possible options are eGon2035 or eGon100RE.

    Returns
    --------
    pd.DataFrame
        Table of demand profile per building. Column names are building IDs and
        index is hour of the year as int (0-8759).

    """
    # ############### get residential heat demand profiles ###############
    df_heat_ts = calc_residential_heat_profiles_per_mvgd(
        mvgd=mvgd, scenario=scenario
    )

    # Pivot to allow aggregation with CTS profiles
    df_heat_ts = df_heat_ts.pivot(
        index=["day_of_year", "hour"],
        columns="building_id",
        values="demand_ts",
    )
    df_heat_ts = df_heat_ts.sort_index().reset_index(drop=True)

    # ############### get CTS heat demand profiles ###############
    heat_demand_cts_ts = calc_cts_building_profiles(
        bus_ids=[mvgd],
        scenario=scenario,
        sector="heat",
    )

    # ############# aggregate residential and CTS demand profiles #############
    df_heat_ts = pd.concat([df_heat_ts, heat_demand_cts_ts], axis=1)

    df_heat_ts = df_heat_ts.groupby(axis=1, level=0).sum()

    return df_heat_ts


def export_to_db(df_peak_loads_db, df_heat_mvgd_ts_db, drop=False):
    """
    Exports the collected results of all MVGDs per bulk to the database.

    Parameters
    ----------
    df_peak_loads_db : pd.DataFrame
        Table of building peak loads of all MVGDs per bulk
    df_heat_mvgd_ts_db : pd.DataFrame
        Table of all aggregated MVGD profiles per bulk
    drop : boolean
        Drop and recreate table if True

    """

    df_peak_loads_db = df_peak_loads_db.melt(
        id_vars="building_id",
        var_name="scenario",
        value_name="peak_load_in_w",
    )
    df_peak_loads_db["building_id"] = df_peak_loads_db["building_id"].astype(
        int
    )
    df_peak_loads_db["sector"] = "residential+cts"
    # From MW to W
    df_peak_loads_db["peak_load_in_w"] = (
        df_peak_loads_db["peak_load_in_w"] * 1e6
    )
    write_table_to_postgres(df_peak_loads_db, BuildingHeatPeakLoads, drop=drop)

    dtypes = {
        column.key: column.type
        for column in EgonEtragoTimeseriesIndividualHeating.__table__.columns
    }
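    # ``dtypes`` maps column names to their SQLAlchemy types, e.g.
    # {"bus_id": Integer, "scenario": String, "carrier": String,
    #  "dist_aggregated_mw": ARRAY(REAL)}, and is passed to ``to_sql`` below
    # so the columns are written with the types declared on the ORM class.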
1421
    df_heat_mvgd_ts_db = df_heat_mvgd_ts_db.loc[:, dtypes.keys()]
1422
1423
    if drop:
1424
        logger.info(
1425
            f"Drop and recreate table "
1426
            f"{EgonEtragoTimeseriesIndividualHeating.__table__.name}."
1427
        )
1428
        EgonEtragoTimeseriesIndividualHeating.__table__.drop(
1429
            bind=engine, checkfirst=True
1430
        )
1431
        EgonEtragoTimeseriesIndividualHeating.__table__.create(
1432
            bind=engine, checkfirst=True
1433
        )
1434
1435
    with db.session_scope() as session:
1436
        df_heat_mvgd_ts_db.to_sql(
1437
            name=EgonEtragoTimeseriesIndividualHeating.__table__.name,
1438
            schema=EgonEtragoTimeseriesIndividualHeating.__table__.schema,
1439
            con=session.connection(),
1440
            if_exists="append",
1441
            method="multi",
1442
            index=False,
1443
            dtype=dtypes,
1444
        )
1445
1446
1447
def export_min_cap_to_csv(df_hp_min_cap_mv_grid_pypsa_eur_sec):
1448
    """Export minimum capacity of heat pumps for pypsa eur sec to csv"""
1449
1450
    df_hp_min_cap_mv_grid_pypsa_eur_sec.index.name = "mvgd_id"
1451
    df_hp_min_cap_mv_grid_pypsa_eur_sec = (
1452
        df_hp_min_cap_mv_grid_pypsa_eur_sec.to_frame(
1453
            name="min_hp_capacity"
1454
        ).reset_index()
1455
    )
1456
1457
    folder = Path(".") / "input-pypsa-eur-sec"
1458
    file = folder / "minimum_hp_capacity_mv_grid_100RE.csv"
1459
    # Create the folder, if it does not exist already
1460
    if not os.path.exists(folder):
1461
        os.mkdir(folder)
1462
    if not file.is_file():
1463
        logger.info(f"Create {file}")
1464
        df_hp_min_cap_mv_grid_pypsa_eur_sec.to_csv(
1465
            file, mode="w", header=False
1466
        )
1467
    else:
1468
        logger.info(f"Remove {file}")
1469
        os.remove(file)
1470
        logger.info(f"Create {file}")
1471
        df_hp_min_cap_mv_grid_pypsa_eur_sec.to_csv(
1472
            file, mode="a", header=False
1473
        )
1474
1475
1476
def catch_missing_buidings(buildings_decentral_heating, peak_load):
1477
    """
1478
    Check for missing buildings and reduce the list of buildings with
1479
    decentral heating if no peak loads available. This should only happen
1480
    in case of cutout SH
1481
1482
    Parameters
1483
    -----------
1484
    buildings_decentral_heating : list(int)
1485
        Array or list of buildings with decentral heating
1486
1487
    peak_load : pd.Series
1488
        Peak loads of all building within the mvgd
1489
1490
    """
1491
    # Catch missing buildings key error
1492
    # should only happen within cutout SH
1493
    if (
1494
        not all(buildings_decentral_heating.isin(peak_load.index))
1495
        and config.settings()["egon-data"]["--dataset-boundary"]
1496
        == "Schleswig-Holstein"
1497
    ):
1498
        diff = buildings_decentral_heating.difference(peak_load.index)
1499
        logger.warning(
1500
            f"Dropped {len(diff)} building ids due to missing peak "
1501
            f"loads. {len(buildings_decentral_heating)} left."
1502
        )
1503
        logger.info(f"Dropped buildings: {diff.values}")
1504
        buildings_decentral_heating = buildings_decentral_heating.drop(diff)
1505
1506
    return buildings_decentral_heating
1507
1508
1509
def determine_hp_cap_peak_load_mvgd_ts_2035(mvgd_ids):
1510
    """
1511
    Main function to determine HP capacity per building in eGon2035 scenario.
1512
    Further, creates heat demand time series for all buildings with heat pumps
1513
    in MV grid, as well as for all buildings with gas boilers, used in eTraGo.
1514
1515
    Parameters
1516
    -----------
1517
    mvgd_ids : list(int)
1518
        List of MV grid IDs to determine data for.
1519
1520
    """
1521
1522
    # ========== Register np datatypes with SQLA ==========
1523
    register_adapter(np.float64, adapt_numpy_float64)
1524
    register_adapter(np.int64, adapt_numpy_int64)
1525
    # =====================================================
1526
1527
    df_peak_loads_db = pd.DataFrame()
1528
    df_hp_cap_per_building_2035_db = pd.DataFrame()
1529
    df_heat_mvgd_ts_db = pd.DataFrame()
1530
1531
    for mvgd in mvgd_ids:
1532
1533
        logger.info(f"MVGD={mvgd} | Start")
1534
1535
        # ############# aggregate residential and CTS demand profiles #####
1536
1537
        df_heat_ts = aggregate_residential_and_cts_profiles(
1538
            mvgd, scenario="eGon2035"
1539
        )
1540
1541
        # ##################### determine peak loads ###################
1542
        logger.info(f"MVGD={mvgd} | Determine peak loads.")
1543
1544
        peak_load_2035 = df_heat_ts.max().rename("eGon2035")
1545
1546
        # ######## determine HP capacity per building #########
1547
        logger.info(f"MVGD={mvgd} | Determine HP capacities.")
1548
1549
        buildings_decentral_heating = (
1550
            get_buildings_with_decentral_heat_demand_in_mv_grid(
1551
                mvgd, scenario="eGon2035"
1552
            )
1553
        )
1554
1555
        # Reduce list of decentral heating if no Peak load available
1556
        # TODO maybe remove after succesfull DE run
1557
        # Might be fixed in #990
1558
        buildings_decentral_heating = catch_missing_buidings(
1559
            buildings_decentral_heating, peak_load_2035
1560
        )
1561
1562
        hp_cap_per_building_2035 = (
1563
            determine_hp_cap_buildings_eGon2035_per_mvgd(
1564
                mvgd,
1565
                peak_load_2035,
1566
                buildings_decentral_heating,
1567
            )
1568
        )
1569
        buildings_gas_2035 = pd.Index(buildings_decentral_heating).drop(
1570
            hp_cap_per_building_2035.index
1571
        )

        # ################ aggregated heat profiles ###################
        logger.info(f"MVGD={mvgd} | Aggregate heat profiles.")

        df_mvgd_ts_2035_hp = df_heat_ts.loc[
            :,
            hp_cap_per_building_2035.index,
        ].sum(axis=1)

        # heat demand time series for buildings with gas boiler
        df_mvgd_ts_2035_gas = df_heat_ts.loc[:, buildings_gas_2035].sum(axis=1)

        df_heat_mvgd_ts = pd.DataFrame(
            data={
                "carrier": ["heat_pump", "CH4"],
                "bus_id": mvgd,
                "scenario": ["eGon2035", "eGon2035"],
                "dist_aggregated_mw": [
                    df_mvgd_ts_2035_hp.to_list(),
                    df_mvgd_ts_2035_gas.to_list(),
                ],
            }
        )
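        # Illustrative layout of df_heat_mvgd_ts (values hypothetical);
        # each cell of dist_aggregated_mw holds the full time series:
        #   carrier    bus_id  scenario  dist_aggregated_mw
        #   heat_pump  <mvgd>  eGon2035  [0.8, 0.7, ...]
        #   CH4        <mvgd>  eGon2035  [1.2, 1.1, ...]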

        # ################ collect results ##################
        logger.info(f"MVGD={mvgd} | Collect results.")

        df_peak_loads_db = pd.concat(
            [df_peak_loads_db, peak_load_2035.reset_index()],
            axis=0,
            ignore_index=True,
        )

        df_heat_mvgd_ts_db = pd.concat(
            [df_heat_mvgd_ts_db, df_heat_mvgd_ts], axis=0, ignore_index=True
        )

        df_hp_cap_per_building_2035_db = pd.concat(
            [
                df_hp_cap_per_building_2035_db,
                hp_cap_per_building_2035.reset_index(),
            ],
            axis=0,
        )

    # ################ export to db #######################
    logger.info(f"MVGD={min(mvgd_ids)} : {max(mvgd_ids)} | Write data to db.")

    export_to_db(df_peak_loads_db, df_heat_mvgd_ts_db, drop=False)
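    # drop=False appends to the existing result tables rather than
    # recreating them (the pypsa-eur-sec variant below passes drop=True);
    # see export_to_db for the exact drop semantics.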

    df_hp_cap_per_building_2035_db["scenario"] = "eGon2035"

    # TODO: debug duplicated building_ids
    duplicates = df_hp_cap_per_building_2035_db.loc[
        df_hp_cap_per_building_2035_db.duplicated("building_id", keep=False)
    ]

    if not duplicates.empty:
        logger.info(
            f"Dropped duplicated buildings: "
            f"{duplicates.loc[:, ['building_id', 'hp_capacity']]}"
        )

    df_hp_cap_per_building_2035_db.drop_duplicates("building_id", inplace=True)

    write_table_to_postgres(
        df_hp_cap_per_building_2035_db,
        EgonHpCapacityBuildings,
        drop=False,
    )


def determine_hp_cap_peak_load_mvgd_ts_pypsa_eur_sec(mvgd_ids):
    """
    Main function to determine the minimum required HP capacity per MV grid
    for pypsa-eur-sec. Further, creates heat demand time series for all
    buildings with heat pumps in the MV grid in the eGon100RE scenario, used
    in eTraGo.

    Parameters
    -----------
    mvgd_ids : list(int)
        List of MV grid IDs to determine data for.

    """

    # ========== Register np datatypes with SQLA ==========
    register_adapter(np.float64, adapt_numpy_float64)
    register_adapter(np.int64, adapt_numpy_int64)
    # =====================================================

    df_peak_loads_db = pd.DataFrame()
    df_heat_mvgd_ts_db = pd.DataFrame()
    df_hp_min_cap_mv_grid_pypsa_eur_sec = pd.Series(dtype="float64")

    for mvgd in mvgd_ids:

        logger.info(f"MVGD={mvgd} | Start")

        # ############# aggregate residential and CTS demand profiles #####

        df_heat_ts = aggregate_residential_and_cts_profiles(
            mvgd, scenario="eGon100RE"
        )

        # ##################### determine peak loads ###################
        logger.info(f"MVGD={mvgd} | Determine peak loads.")

        peak_load_100RE = df_heat_ts.max().rename("eGon100RE")

        # ######## determine minimum HP capacity pypsa-eur-sec ###########
        logger.info(f"MVGD={mvgd} | Determine minimum HP capacity.")

        buildings_decentral_heating = (
            get_buildings_with_decentral_heat_demand_in_mv_grid(
                mvgd, scenario="eGon100RE"
            )
        )

        # Reduce the set of buildings with decentral heating if peak loads
        # are missing.
        # TODO: maybe remove after successful DE run.
        buildings_decentral_heating = catch_missing_buidings(
            buildings_decentral_heating, peak_load_100RE
        )

        hp_min_cap_mv_grid_pypsa_eur_sec = (
            determine_min_hp_cap_buildings_pypsa_eur_sec(
                peak_load_100RE,
                buildings_decentral_heating,
            )
        )

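        # Note that, unlike in the eGon2035 branch, the minimum HP capacity
        # is collected as a single aggregate per MV grid rather than per
        # building.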
        # ################ aggregated heat profiles ###################
        logger.info(f"MVGD={mvgd} | Aggregate heat profiles.")

        df_mvgd_ts_hp = df_heat_ts.loc[
            :,
            buildings_decentral_heating,
        ].sum(axis=1)

        df_heat_mvgd_ts = pd.DataFrame(
            data={
                "carrier": "heat_pump",
                "bus_id": mvgd,
                "scenario": "eGon100RE",
                "dist_aggregated_mw": [df_mvgd_ts_hp.to_list()],
            }
        )
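        # In eGon100RE all buildings with decentral heating are aggregated
        # under the heat_pump carrier, so there is no separate CH4 time
        # series as in the eGon2035 branch.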

        # ################ collect results ##################
        logger.info(f"MVGD={mvgd} | Collect results.")

        df_peak_loads_db = pd.concat(
            [df_peak_loads_db, peak_load_100RE.reset_index()],
            axis=0,
            ignore_index=True,
        )

        df_heat_mvgd_ts_db = pd.concat(
            [df_heat_mvgd_ts_db, df_heat_mvgd_ts], axis=0, ignore_index=True
        )

        df_hp_min_cap_mv_grid_pypsa_eur_sec.loc[
            mvgd
        ] = hp_min_cap_mv_grid_pypsa_eur_sec

    # ################ export to db and csv ######################
    logger.info(f"MVGD={min(mvgd_ids)} : {max(mvgd_ids)} | Write data to db.")

    export_to_db(df_peak_loads_db, df_heat_mvgd_ts_db, drop=True)

    logger.info(
        f"MVGD={min(mvgd_ids)} : {max(mvgd_ids)} | Write "
        f"pypsa-eur-sec min HP capacities to csv."
    )
    export_min_cap_to_csv(df_hp_min_cap_mv_grid_pypsa_eur_sec)


def split_mvgds_into_bulks(n, max_n, func):
    """
    Generic function to split a task into multiple parallel tasks,
    dividing the number of MVGDs into even bulks.

    Parameters
    -----------
    n : int
        Index of the bulk to process
    max_n : int
        Maximum number of bulks
    func : function
        The function which is then called with the list of MVGD ids as
        parameter.
    """

    with db.session_scope() as session:
        query = (
            session.query(
                MapZensusGridDistricts.bus_id,
            )
            .filter(
                MapZensusGridDistricts.zensus_population_id
                == EgonPetaHeat.zensus_population_id
            )
            .distinct(MapZensusGridDistricts.bus_id)
        )
        mvgd_ids = pd.read_sql(
            query.statement, query.session.bind, index_col=None
        )

    mvgd_ids = mvgd_ids.sort_values("bus_id").reset_index(drop=True)

    mvgd_ids = np.array_split(mvgd_ids["bus_id"].values, max_n)
    # Only take split n
    mvgd_ids = mvgd_ids[n]
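    # np.array_split distributes any remainder as evenly as possible,
    # e.g. np.array_split(np.arange(10), 3) yields chunks of sizes
    # 4, 3 and 3, so bulk sizes differ by at most one MVGD.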

    logger.info(f"Bulk takes care of MVGD: {min(mvgd_ids)} : {max(mvgd_ids)}")
    func(mvgd_ids)
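
# Minimal usage sketch for split_mvgds_into_bulks (hypothetical values; in
# production the bulks are wired up as parallel Airflow tasks):
#   split_mvgds_into_bulks(
#       n=0, max_n=10, func=determine_hp_cap_peak_load_mvgd_ts_2035
#   )
# processes the first of ten even bulks of MVGD ids.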


def delete_hp_capacity(scenario):
    """Remove all hp capacities for the selected scenario

    Parameters
    -----------
    scenario : string
        Either eGon2035 or eGon100RE

    """

    with db.session_scope() as session:
        session.query(EgonHpCapacityBuildings).filter(
            EgonHpCapacityBuildings.scenario == scenario
        ).delete(synchronize_session=False)


def delete_mvgd_ts(scenario):
    """Remove all mvgd time series for the selected scenario

    Parameters
    -----------
    scenario : string
        Either eGon2035 or eGon100RE

    """

    with db.session_scope() as session:
        session.query(EgonEtragoTimeseriesIndividualHeating).filter(
            EgonEtragoTimeseriesIndividualHeating.scenario == scenario
        ).delete(synchronize_session=False)


def delete_hp_capacity_100RE():
    """Remove all hp capacities for the eGon100RE scenario"""
    EgonHpCapacityBuildings.__table__.create(bind=engine, checkfirst=True)
    delete_hp_capacity(scenario="eGon100RE")


def delete_hp_capacity_2035():
    """Remove all hp capacities for the eGon2035 scenario"""
    EgonHpCapacityBuildings.__table__.create(bind=engine, checkfirst=True)
    delete_hp_capacity(scenario="eGon2035")


def delete_mvgd_ts_2035():
    """Remove all mvgd time series for the eGon2035 scenario"""
    EgonEtragoTimeseriesIndividualHeating.__table__.create(
        bind=engine, checkfirst=True
    )
    delete_mvgd_ts(scenario="eGon2035")


def delete_heat_peak_loads_eGon2035():
    """Remove all heat peak loads for eGon2035.

    This is not necessary for eGon100RE as these peak loads are calculated in
    HeatPumpsPypsaEurSec and the tables are recreated by that dataset."""
    with db.session_scope() as session:
        session.query(BuildingHeatPeakLoads).filter(
            BuildingHeatPeakLoads.scenario == "eGon2035"
        ).delete(synchronize_session=False)