data.datasets.demandregio   F
last analyzed

Complexity

Total Complexity 60

Size/Duplication

Total Lines 1058
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 60
eloc 534
dl 0
loc 1058
rs 3.6
c 0
b 0
f 0

16 Functions

Rating   Name   Duplication   Size   Complexity  
A insert_cts_ind_wz_definitions() 0 42 3
D insert_cts_ind() 0 127 11
B insert_cts_ind_demands() 0 49 5
A create_tables() 0 20 1
A data_in_boundaries() 0 32 1
B adjust_ind_pes() 0 119 1
A disagg_households_power() 0 60 2
A insert_household_demand() 0 32 4
A timeseries_per_wz() 0 18 4
A adjust_cts_ind_nep() 0 40 2
A match_nuts3_bl() 0 26 1
B insert_society_data() 0 48 5
B write_demandregio_hh_profiles_to_db() 0 58 5
B insert_timeseries_per_wz() 0 79 3
C insert_hh_demand() 0 99 8
A get_cached_tables() 0 14 3

1 Method

Rating   Name   Duplication   Size   Complexity  
A DemandRegio.__init__() 0 12 1

How to fix   Complexity   

Complexity

Complex classes like data.datasets.demandregio often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""The central module containing all code dealing with importing and
2
adjusting data from demandRegio
3
4
"""
5
6
from pathlib import Path
7
import os
8
import zipfile
9
10
from sqlalchemy import ARRAY, Column, Float, ForeignKey, Integer, String
11
from sqlalchemy.ext.declarative import declarative_base
12
import numpy as np
13
import pandas as pd
14
15
from egon.data import db, logger
16
from egon.data.datasets import Dataset
17
from egon.data.datasets.scenario_parameters import (
18
    EgonScenario,
19
    get_sector_parameters,
20
)
21
import egon.data.config
22
import egon.data.datasets.scenario_parameters.parameters as scenario_parameters
23
24
try:
25
    from disaggregator import config, data, spatial, temporal
26
27
except ImportError:
28
    pass
29
30
# will be later imported from another file ###
31
Base = declarative_base()
32
33
34
class DemandRegio(Dataset):
    """
    Extract and adjust data from DemandRegio

    Electricity demands of households, CTS and industry are derived with
    DemandRegio's disaggregator and its input data. Some of the results are
    adjusted or extended so that they fit the other data used in eGon-data
    and the eGon project, e.g. in function :py:func:`adjust_ind_pes` or
    function :py:func:`adjust_cts_ind_nep`. The resulting data is written
    into newly created tables.

    *Dependencies*
      * :py:class:`DataBundle <egon.data.datasets.data_bundle.DataBundle>`
      * :py:class:`ScenarioParameters
      <egon.data.datasets.scenario_parameters.ScenarioParameters>`
      * :py:class:`ZensusVg250 <egon.data.datasets.zensus_vg250.ZensusVg250>`

    *Resulting tables*
      * :py:class:`demand.egon_demandregio_hh
      <egon.data.datasets.demandregio.EgonDemandRegioHH>` is created and filled
      * :py:class:`demand.egon_demandregio_cts_ind
      <egon.data.datasets.demandregio.EgonDemandRegioCtsInd>`
      is created and filled
      * :py:class:`society.egon_demandregio_population
      <egon.data.datasets.demandregio.EgonDemandRegioPopulation>`
      is created and filled
      * :py:class:`society.egon_demandregio_household
      <egon.data.datasets.demandregio.EgonDemandRegioHouseholds>`
      is created and filled
      * :py:class:`demand.egon_demandregio_wz
      <egon.data.datasets.demandregio.EgonDemandRegioWz>` is created and filled
      * :py:class:`demand.egon_demandregio_timeseries_cts_ind
      <egon.data.datasets.demandregio.EgonDemandRegioTimeseriesCtsInd>`
      is created and filled

    """

    #:
    name: str = "DemandRegio"
    #:
    version: str = "0.0.13"

    def __init__(self, dependencies):
        # The insert tasks are passed as a set; presumably the Dataset base
        # class treats set members as tasks without a mutual order -- confirm
        # against ``egon.data.datasets.Dataset``.
        insert_tasks = {
            insert_household_demand,
            insert_society_data,
            insert_cts_ind_demands,
        }
        pipeline = (
            get_cached_tables,  # adhoc workaround #180
            create_tables,
            insert_tasks,
        )
        super().__init__(
            name=self.name,
            version=self.version,
            dependencies=dependencies,
            tasks=pipeline,
        )
91
92
93
class DemandRegioLoadProfiles(Base):
    """ORM model of table ``demand.demandregio_household_load_profiles``.

    Rows are written by :py:func:`write_demandregio_hh_profiles_to_db`, one
    row per year and nuts3 region.
    """

    __tablename__ = "demandregio_household_load_profiles"
    __table_args__ = dict(schema="demand")

    # Surrogate key
    id = Column(Integer, primary_key=True)
    year = Column(Integer)
    nuts3 = Column(String)
    # Load profile of one year and region, stored as a single array
    load_in_mwh = Column(ARRAY(Float()))
101
102
103
class EgonDemandRegioHH(Base):
    """ORM model of table ``demand.egon_demandregio_hh``.

    The primary key is composed of nuts3 region, household size and
    scenario; the scenario references ``EgonScenario.name``.
    """

    __tablename__ = "egon_demandregio_hh"
    __table_args__ = dict(schema="demand")

    # Composite primary key
    nuts3 = Column(String(5), primary_key=True)
    hh_size = Column(Integer, primary_key=True)
    scenario = Column(String, ForeignKey(EgonScenario.name), primary_key=True)

    # Payload
    year = Column(Integer)
    demand = Column(Float)
111
112
113
class EgonDemandRegioCtsInd(Base):
    """ORM model of table ``demand.egon_demandregio_cts_ind``.

    The primary key is composed of nuts3 region, branch (``wz``) and
    scenario; the scenario references ``EgonScenario.name``.
    """

    __tablename__ = "egon_demandregio_cts_ind"
    __table_args__ = dict(schema="demand")

    # Composite primary key
    nuts3 = Column(String(5), primary_key=True)
    wz = Column(Integer, primary_key=True)
    scenario = Column(String, ForeignKey(EgonScenario.name), primary_key=True)

    # Payload
    year = Column(Integer)
    demand = Column(Float)
121
122
123
class EgonDemandRegioPopulation(Base):
    """ORM model of table ``society.egon_demandregio_population``.

    One row per nuts3 region and year.
    """

    __tablename__ = "egon_demandregio_population"
    __table_args__ = dict(schema="society")

    # Composite primary key
    nuts3 = Column(String(5), primary_key=True)
    year = Column(Integer, primary_key=True)

    # Payload
    population = Column(Float)
129
130
131
class EgonDemandRegioHouseholds(Base):
    """ORM model of table ``society.egon_demandregio_household``.

    One row per nuts3 region, household size and year.
    """

    __tablename__ = "egon_demandregio_household"
    __table_args__ = dict(schema="society")

    # Composite primary key
    nuts3 = Column(String(5), primary_key=True)
    hh_size = Column(Integer, primary_key=True)
    year = Column(Integer, primary_key=True)

    # Payload
    households = Column(Integer)
138
139
140
class EgonDemandRegioWz(Base):
    """ORM model of table ``demand.egon_demandregio_wz``.

    Holds one row per branch number (``wz``) together with its sector and
    a textual definition.
    """

    __tablename__ = "egon_demandregio_wz"
    __table_args__ = dict(schema="demand")

    wz = Column(Integer, primary_key=True)
    sector = Column(String(50))
    definition = Column(String(150))
146
147
148
class EgonDemandRegioTimeseriesCtsInd(Base):
    """ORM model of table ``demand.egon_demandregio_timeseries_cts_ind``.

    One row per branch number (``wz``) and year. This table is dropped and
    re-created on every run of :py:func:`create_tables`.
    """

    __tablename__ = "egon_demandregio_timeseries_cts_ind"
    __table_args__ = dict(schema="demand")

    # Composite primary key
    wz = Column(Integer, primary_key=True)
    year = Column(Integer, primary_key=True)

    # Payload
    slp = Column(String(50))
    load_curve = Column(ARRAY(Float()))
155
156
157
def create_tables():
    """Create schemas and tables for demandregio data

    The schemas ``demand`` and ``society`` are created if missing. All
    tables are created only if they do not exist yet, except the CTS and
    industry time series table, which is dropped and re-created.

    Returns
    -------
    None.
    """
    for schema in ("demand", "society"):
        db.execute_sql(f"CREATE SCHEMA IF NOT EXISTS {schema};")

    # One engine is shared by all DDL statements
    engine = db.engine()

    kept_if_existing = (
        EgonDemandRegioHH,
        EgonDemandRegioCtsInd,
        EgonDemandRegioPopulation,
        EgonDemandRegioHouseholds,
        EgonDemandRegioWz,
        DemandRegioLoadProfiles,
    )
    for model in kept_if_existing:
        model.__table__.create(bind=engine, checkfirst=True)

    # The time series table always starts empty
    timeseries_table = EgonDemandRegioTimeseriesCtsInd.__table__
    timeseries_table.drop(bind=engine, checkfirst=True)
    timeseries_table.create(bind=engine, checkfirst=True)
178
179
180
def data_in_boundaries(df):
    """Select rows with nuts3 code within boundaries, used for testmode

    The nuts3 codes ``DEB16`` and ``DEB19`` are replaced by ``DEB1C`` and
    ``DEB1D`` before the selection. The returned frame is indexed by
    ``nuts3``.

    Parameters
    ----------
    df : pandas.DataFrame
        Data for all nuts3 regions. After ``reset_index()`` it has to
        provide a column ``nuts3``.

    Returns
    -------
    pandas.DataFrame
        Data for nuts3 regions listed in ``boundaries.vg250_krs``

    """
    engine = db.engine()

    # Change nuts3 region names to 2016 version
    renamed_regions = {"DEB16": "DEB1C", "DEB19": "DEB1D"}

    df = df.reset_index()
    outdated = df.nuts3.isin(renamed_regions)
    df.loc[outdated, "nuts3"] = df.loc[outdated, "nuts3"].map(renamed_regions)
    df = df.set_index("nuts3")

    nuts_in_boundaries = pd.read_sql(
        "SELECT DISTINCT ON (nuts) nuts FROM boundaries.vg250_krs",
        engine,
    ).nuts

    return df[df.index.isin(nuts_in_boundaries)]
214
215
216
def insert_cts_ind_wz_definitions():
    """Insert demandregio's definitions of CTS and industrial branches

    For every sector configured under ``wz_definitions`` the corresponding
    csv file from ``data_bundle_egon_data/WZ_definition`` is read and
    appended to the configured target table.

    Returns
    -------
    None.

    """
    dataset_cfg = egon.data.config.datasets()["demandregio_cts_ind_demand"]
    wz_files = dataset_cfg["sources"]["wz_definitions"]
    target = dataset_cfg["targets"]["wz_definitions"]

    engine = db.engine()

    definition_dir = Path(".") / "data_bundle_egon_data" / "WZ_definition"

    for sector, file_name in wz_files.items():
        # The CTS file is semicolon separated, all others comma separated
        delimiter = ";" if sector == "CTS" else ","

        definitions = pd.read_csv(
            definition_dir / file_name, delimiter=delimiter, header=None
        )
        definitions = definitions.rename(
            {0: "wz", 1: "definition"}, axis="columns"
        ).set_index("wz")
        definitions["sector"] = sector

        definitions.to_sql(
            target["table"],
            engine,
            schema=target["schema"],
            if_exists="append",
        )
259
260
261
def match_nuts3_bl():
    """Function that maps the federal state to each nuts3 region

    The federal state is the ``boundaries.vg250_lan`` geometry containing
    the ``boundaries.vg250_krs`` geometry of the nuts3 region. The Lake
    Constance variants of Baden-Württemberg and Bayern are mapped to the
    plain state names.

    Returns
    -------
    df : pandas.DataFrame
        List of nuts3 regions and the federal state of Germany, indexed by
        ``nuts`` with the state name in column ``gen``.

    """

    engine = db.engine()

    df = pd.read_sql(
        "SELECT DISTINCT ON (boundaries.vg250_krs.nuts) "
        "boundaries.vg250_krs.nuts, boundaries.vg250_lan.gen "
        "FROM boundaries.vg250_lan, boundaries.vg250_krs "
        " WHERE ST_CONTAINS("
        "boundaries.vg250_lan.geometry, "
        "boundaries.vg250_krs.geometry)",
        con=engine,
    )

    # Assign the whole column instead of the chained ``df.gen[mask] = ...``,
    # which is not guaranteed to write back into ``df``.
    df["gen"] = df["gen"].replace(
        {
            "Baden-Württemberg (Bodensee)": "Baden-Württemberg",
            "Bayern (Bodensee)": "Bayern",
        }
    )

    return df.set_index("nuts")
287
288
289
def adjust_ind_pes(ec_cts_ind):
    """
    Adjust electricity demand of industrial consumers due to electrification
    of process heat based on assumptions of pypsa-eur-sec.

    The columns of the affected branches are scaled in place, so the passed
    frame is modified and returned.

    Parameters
    ----------
    ec_cts_ind : pandas.DataFrame
        Industrial demand without additional electrification. Columns are
        expected to be WZ2008 branch numbers.

    Returns
    -------
    ec_cts_ind : pandas.DataFrame
        Industrial demand with additional electrification

    """

    pes_path = (
        Path(".") / "data_bundle_powerd_data" / "pypsa_eur" / "resources"
    )

    sources = egon.data.config.datasets()["demandregio_cts_ind_demand"][
        "sources"
    ]["new_consumers_2050"]

    # Extract today's industrial demand from pypsa-eur-sec
    demand_today = pd.read_csv(
        pes_path / sources["pes-demand-today"],
        header=None,
    ).transpose()

    # Filter data
    # Assign the filled column back; a chained ``fillna(..., inplace=True)``
    # is not guaranteed to modify ``demand_today``.
    demand_today[1] = demand_today[1].fillna("carrier")
    demand_today = demand_today[
        (demand_today[0] == "DE") | (demand_today[1] == "carrier")
    ].drop([0, 2], axis="columns")

    demand_today = (
        demand_today.transpose()
        .set_index(0)
        .transpose()
        .set_index("carrier")
        .transpose()
        .loc["electricity"]
        .astype(float)
    )

    # Calculate future industrial demand from pypsa-eur-sec
    # based on production and energy demands per carrier ('sector ratios')
    prod_tomorrow = pd.read_csv(pes_path / sources["pes-production-tomorrow"])

    prod_tomorrow = prod_tomorrow[prod_tomorrow["kton/a"] == "DE"].set_index(
        "kton/a"
    )

    sector_ratio = (
        pd.read_csv(pes_path / sources["pes-sector-ratios"])
        .set_index("MWh/tMaterial")
        .loc["elec"]
    )

    demand_tomorrow = prod_tomorrow.multiply(
        sector_ratio.div(1000)
    ).transpose()["DE"]

    # Calculate changes of electrical demand per sector in pypsa-eur-sec,
    # normalised so that all shares sum up to one
    growth = demand_tomorrow / demand_today
    change = pd.DataFrame(growth / growth.sum())

    # Drop rows without changes
    change = change[~change[0].isnull()]

    # Map industrial branches of pypsa-eur-sec to WZ2008 used in demandregio
    branch_to_wz = {
        "Alumina production": 24,
        "Aluminium - primary production": 24,
        "Aluminium - secondary production": 24,
        "Ammonia": 20,
        "Basic chemicals (without ammonia)": 20,
        "Cement": 23,
        "Ceramics & other NMM": 23,
        "Electric arc": 24,
        "Food, beverages and tobacco": 10,
        "Glass production": 23,
        "Integrated steelworks": 24,
        "Machinery Equipment": 28,
        "Other Industrial Sectors": 32,
        "Other chemicals": 20,
        "Other non-ferrous metals": 24,
        "Paper production": 17,
        "Pharmaceutical products etc.": 21,
        "Printing and media reproduction": 18,
        "Pulp production": 17,
        "Textiles and leather": 13,
        "Transport Equipment": 29,
        "Wood and wood products": 16,
    }
    change["wz"] = change.index.map(branch_to_wz)

    # Group by WZ2008
    shares_per_wz = change.groupby("wz")[0].sum()

    # Calculate additional demands needed to meet future demand of
    # pypsa-eur-sec
    additional_mwh = shares_per_wz.multiply(
        demand_tomorrow.sum() * 1000000 - ec_cts_ind.sum().sum()
    )

    # Calculate overall industrial demand for eGon100RE
    final_mwh = additional_mwh + ec_cts_ind[additional_mwh.index].sum()

    # Linear scale the industrial demands per nuts3 and wz to meet final demand
    ec_cts_ind[additional_mwh.index] *= (
        final_mwh / ec_cts_ind[additional_mwh.index].sum()
    )

    return ec_cts_ind
408
409
410
def adjust_cts_ind_nep(ec_cts_ind, sector):
    """Add electrical demand of new largescale CTS and industrial consumers
    according to NEP 2021, scenario C 2035. Values per federal state are
    linearly distributed over all branches and nuts3 regions of that state.

    The passed frame is updated in place and returned.

    Parameters
    ----------
    ec_cts_ind : pandas.DataFrame
        CTS or industry demand without new largescale consumers.
    sector : str
        Column of the NEP file holding the additional demand per federal
        state for the sector of ``ec_cts_ind``.

    Returns
    -------
    ec_cts_ind : pandas.DataFrame
        CTS or industry demand including new largescale consumers.

    """
    cfg_sources = egon.data.config.datasets()["demandregio_cts_ind_demand"][
        "sources"
    ]

    nep_file = (
        Path(".")
        / "data_bundle_egon_data"
        / "nep2035_version2021"
        / cfg_sources["new_consumers_2035"]
    )

    # get data from NEP per federal state
    new_con = pd.read_csv(nep_file, delimiter=";", decimal=",", index_col=0)

    # match nuts3 regions to federal states
    demand_by_state = ec_cts_ind.groupby(match_nuts3_bl().gen)

    # update demands per federal state
    for state in demand_by_state.indices.keys():
        state_demand = demand_by_state.get_group(state)

        # Scale all values of the state by the same factor so that their
        # sum grows by the additional demand of the new consumers
        scaling = 1 + new_con[sector][state] * 1e6 / state_demand.sum().sum()
        scaled_demand = state_demand.mul(scaling)

        rows_of_state = ec_cts_ind.index.isin(state_demand.index)
        ec_cts_ind[rows_of_state] = scaled_demand

    return ec_cts_ind
450
451
452
def disagg_households_power(scenario, year, original=False, **kwargs):
    """
    Perform spatial disaggregation of the electric power demand of
    households. Similar to disaggregator.spatial.disagg_households_power

    The number of households per size (``data.households_per_size``) is
    multiplied with a specific demand per household size taken from a
    survey. The survey values are divided by 1e3 before use (presumably a
    conversion from kWh to MWh -- confirm).

    Parameters
    ----------
    scenario : str
        Name of the scenario. For ``"eGon100RE"`` the survey values without
        DHW are used and scaled to the demand without heat in 2011; for all
        other scenarios the weighted survey values are used.
    year : int
        Year passed to ``data.households_per_size``.
    original : bool, optional
        Throughput to function households_per_size (not used for
        ``"eGon100RE"``),
        A flag if the results should be left untouched and returned in
        original form for the year 2011 (True) or if they should be scaled to
        the given `year` by the population in that year (False).
    **kwargs
        Accepted but not used.

    Returns
    -------
    pd.DataFrame or pd.Series
    """
    # source: survey of energieAgenturNRW
    # with/without direct water heating (DHW), and weighted average
    # https://1-stromvergleich.com/wp-content/uploads/erhebung_wo_bleibt_der_strom.pdf
    survey_values = {
        # "weighted DWH": [2290, 3202, 4193, 4955, 5928, 5928],
        # "without DHW": [1714, 2812, 3704, 4432, 5317, 5317],
        "with_DHW": [2181, 3843, 5151, 6189, 7494, 8465],
        "without_DHW": [1798, 2850, 3733, 4480, 5311, 5816],
        "weighted": [2256, 3248, 4246, 5009, 5969, 6579],
    }
    demand_per_hh_size = pd.DataFrame(index=range(1, 7), data=survey_values)

    if scenario != "eGon100RE":
        # Bottom-Up: Power demand by household sizes for each scenario,
        # using the survey values including weighted DHW
        power_per_HH = demand_per_hh_size["weighted"] / 1e3

        # calculate demand per nuts3
        households = data.households_per_size(original=original, year=year)
        return households * power_per_HH

    # chose demand per household size from survey without DHW
    power_per_HH = demand_per_hh_size["without_DHW"] / 1e3

    # calculate demand per nuts3 in 2011
    df_2011 = data.households_per_size(year=2011) * power_per_HH

    # scale demand per hh-size to meet demand without heat
    # according to JRC in 2011 (136.6-(20.14+9.41) TWh)
    power_per_HH *= (136.6 - (20.14 + 9.41)) * 1e6 / df_2011.sum().sum()

    # calculate demand per nuts3 in the requested year
    return data.households_per_size(year=year) * power_per_HH
512
513
514
def write_demandregio_hh_profiles_to_db(hh_profiles):
    """Write HH demand profiles from demand regio into db. One row per
    year and nuts3. The annual load profile timeseries is an array.

    schema: demand
    tablename: demandregio_household_load_profiles

    New rows get ids above the current maximum id of the table. If the
    dataset boundary is "Schleswig-Holstein", only columns whose name
    contains "DEF0" are written.

    Parameters
    ----------
    hh_profiles: pd.DataFrame
        Load profiles with a datetime index and one column per nuts3 region.

    Returns
    -------
    None.
    """
    table = DemandRegioLoadProfiles.__table__
    engine = db.engine()

    dataset = egon.data.config.settings()["egon-data"]["--dataset-boundary"]

    if dataset == "Schleswig-Holstein":
        hh_profiles = hh_profiles.loc[
            :, hh_profiles.columns.str.contains("DEF0")
        ]

    max_id = pd.read_sql_query(
        f"SELECT MAX(id) FROM {table.schema}.{table.name}",
        con=engine,
    ).iat[0, 0]

    # MAX(id) of an empty table is NULL, which may arrive as None or NaN.
    # The counter is incremented before its first use, so ids start at 1
    # for an empty table (numbering of the previous implementation kept).
    idx = 0 if pd.isna(max_id) else int(max_id) + 1

    records = []
    for year in hh_profiles.index.year.unique().values:
        profiles_of_year = hh_profiles[hh_profiles.index.year == year]

        for nuts3 in hh_profiles.columns:
            idx += 1
            records.append(
                {
                    "id": idx,
                    "year": int(year),
                    "nuts3": str(nuts3),
                    "load_in_mwh": profiles_of_year[nuts3].to_list(),
                }
            )

    df_to_db = pd.DataFrame(
        records, columns=["id", "year", "nuts3", "load_in_mwh"]
    )

    df_to_db.to_sql(
        name=table.name,
        schema=table.schema,
        con=engine,
        if_exists="append",
        index=False,
    )
573
574
575
def insert_hh_demand(scenario, year, engine):
    """Calculates electrical demands of private households using demandregio's
    disaggregator and insert results into the database.

    Besides the annual demands, household load profiles are written via
    :py:func:`write_demandregio_hh_profiles_to_db`. If the profiles cannot
    be obtained from the disaggregator, a csv backup from the data bundle
    is used instead.

    Parameters
    ----------
    scenario : str
        Name of the corresponding scenario.
    year : int
        The number of households per region is taken from this year.
    engine : sqlalchemy.engine.Engine
        Database connection used for inserting the annual demands.

    Returns
    -------
    None.

    """
    targets = egon.data.config.datasets()["demandregio_household_demand"][
        "targets"
    ]["household_demand"]

    # get spatial distribution of demands of private households per nuts
    # and size from demandregio
    ec_hh = disagg_households_power(scenario, year)

    # Scale to meet target value
    # For status2019 and eGon2021 the final demand from demandregio is kept
    if scenario not in ["status2019", "eGon2021"]:
        annual_demand = get_sector_parameters(
            "electricity", scenario=scenario
        )["annual_demand"]
        ec_hh *= annual_demand["households"] * 1e6 / ec_hh.sum().sum()

    # Select demands for nuts3-regions in boundaries (needed for testmode)
    ec_hh = data_in_boundaries(ec_hh)

    # TODO status2023
    # adhoc fix until ffeopendata servers are up and population_year
    # can be set
    year_in_db = 2023 if scenario == "status2023" else year

    # insert into database
    for hh_size in ec_hh.columns:
        demand_of_size = (
            ec_hh[hh_size]
            .rename("demand")
            .to_frame()
            .assign(year=year_in_db, scenario=scenario, hh_size=hh_size)
        )
        demand_of_size.to_sql(
            targets["table"],
            engine,
            schema=targets["schema"],
            if_exists="append",
        )

    # insert housholds demand timeseries
    try:
        profiles = temporal.disagg_temporal_power_housholds_slp(
            use_nuts3code=True,
            by="households",
            weight_by_income=False,
            year=year,
        )
        hh_load_timeseries = profiles.resample("h").sum()
        hh_load_timeseries = hh_load_timeseries.rename(
            columns={"DEB16": "DEB1C", "DEB19": "DEB1D"}
        )
    except Exception as e:
        # Deliberate best effort: any failure leads to the backup profiles
        logger.warning(
            f"Couldnt get profiles from FFE, will use pickeld fallback! \n {e}"
        )
        hh_load_timeseries = pd.read_csv(
            "data_bundle_egon_data/demand_regio_backup/df_load_profiles.csv",
            index_col="time",
        )
        hh_load_timeseries.index = pd.to_datetime(
            hh_load_timeseries.index, format="%Y-%m-%d %H:%M:%S"
        )

        # Move the backup profiles to the requested year
        # (2023 for status2023, TODO status2023)
        hh_load_timeseries.index = hh_load_timeseries.index.map(
            lambda dt: dt.replace(year=year_in_db)
        )

        if scenario == "status2023":
            # Shift by two days and fill the first week with the values of
            # the second week
            hh_load_timeseries = hh_load_timeseries.shift(24 * 2)

            second_week = hh_load_timeseries.iloc[24 * 7 : 24 * 7 * 2]
            hh_load_timeseries.iloc[: 24 * 7] = second_week.values

    write_demandregio_hh_profiles_to_db(hh_load_timeseries)
674
675
676
def insert_cts_ind(scenario, year, engine, target_values):
    """Calculates electrical demands of CTS and industry using demandregio's
    disaggregator, adjusts them according to resulting values of NEP 2021 or
    JRC IDEES and insert results into the database.

    For the scenarios eGon2035 and eGon100RE the disaggregator is currently
    bypassed and csv backups from the data bundle are inserted instead.

    Parameters
    ----------
    scenario : str
        Name of the corresponding scenario.
    year : int
        Year passed to the disaggregator and written to the database.
    engine : sqlalchemy.engine.Engine
        Database connection used for reading and inserting.
    target_values : dict or None
        Target values per sector (keys ``"CTS"`` and ``"industry"``) or
        None if the data should be used without scaling.

    Returns
    -------
    None.

    """
    targets = egon.data.config.datasets()["demandregio_cts_ind_demand"][
        "targets"
    ]
    demand_target = targets["cts_ind_demand"]

    wz_table = pd.read_sql(
        "SELECT wz, sector FROM demand.egon_demandregio_wz",
        con=engine,
        index_col="wz",
    )

    # Workaround: Since the disaggregator does not work anymore, data from
    # previous runs is used for eGon2035 and eGon100RE
    if scenario == "eGon2035":
        file2035_path = (
            Path(".")
            / "data_bundle_egon_data"
            / "demand_regio_backup"
            / "egon_demandregio_cts_ind_egon2035.csv"
        )
        ec_cts_ind2 = pd.read_csv(file2035_path)
        ec_cts_ind2.to_sql(
            demand_target["table"],
            engine,
            demand_target["schema"],
            if_exists="append",
            index=False,
        )
        return

    if scenario == "eGon100RE":
        ec_cts_ind2 = pd.read_csv(
            "data_bundle_egon_data/"
            "demand_regio_backup/egon_demandregio_cts_ind.csv"
        )
        ec_cts_ind2["sector"] = ec_cts_ind2["wz"].map(wz_table["sector"])
        is_industry = ec_cts_ind2["sector"] == "industry"
        is_cts = ec_cts_ind2["sector"] == "CTS"

        factor_ind = target_values["industry"] / (
            ec_cts_ind2.loc[is_industry, "demand"].sum() / 1000
        )
        factor_cts = target_values["CTS"] / (
            ec_cts_ind2.loc[is_cts, "demand"].sum() / 1000
        )

        # Industry rows are scaled with the industry factor, every other
        # row with the CTS factor
        ec_cts_ind2["demand"] = ec_cts_ind2["demand"] * np.where(
            is_industry, factor_ind, factor_cts
        )

        ec_cts_ind2 = ec_cts_ind2.drop(columns=["sector"])

        ec_cts_ind2.to_sql(
            demand_target["table"],
            engine,
            demand_target["schema"],
            if_exists="append",
            index=False,
        )
        return

    for sector in ["CTS", "industry"]:
        # get demands per nuts3 and wz of demandregio
        ec_cts_ind = spatial.disagg_CTS_industry(
            use_nuts3code=True, source="power", sector=sector, year=year
        ).transpose()

        ec_cts_ind.index = ec_cts_ind.index.rename("nuts3")

        # exclude mobility sector from GHD
        ec_cts_ind = ec_cts_ind.drop(columns=49, errors="ignore")

        # scale values according to target_values
        if target_values:
            if sector in target_values:
                ec_cts_ind *= (
                    target_values[sector] * 1e3 / ec_cts_ind.sum().sum()
                )
        else:
            logger.info(
                f"No scaling factors for scenario {scenario}. "
                "Data from demandregio is used without scaling."
            )

        # NOTE: the two adjustments below cannot be reached as long as the
        # workaround above returns early for eGon2035 and eGon100RE.
        # include new largescale consumers according to NEP 2021
        if scenario == "eGon2035":
            ec_cts_ind = adjust_cts_ind_nep(ec_cts_ind, sector)
        # include new industrial demands due to sector coupling
        if scenario == "eGon100RE" and sector == "industry":
            ec_cts_ind = adjust_ind_pes(ec_cts_ind)

        # Select demands for nuts3-regions in boundaries (needed for testmode)
        ec_cts_ind = data_in_boundaries(ec_cts_ind)

        # insert into database
        for wz in ec_cts_ind.columns:
            df = pd.DataFrame(ec_cts_ind[wz])
            df["year"] = year
            df["wz"] = wz
            df["scenario"] = scenario
            df = df.rename({wz: "demand"}, axis="columns")
            df.index = df.index.rename("nuts3")
            df.to_sql(
                demand_target["table"],
                engine,
                demand_target["schema"],
                if_exists="append",
            )
804
805
806
def insert_household_demand():
    """Insert electrical demands for households according to
    demandregio using its disaggregator-tool in MWh

    All configured target tables are emptied first. Demands are inserted
    for every configured scenario and additionally for eGon2021.

    Returns
    -------
    None.

    """
    targets = egon.data.config.datasets()["demandregio_household_demand"][
        "targets"
    ]
    engine = db.engine()

    # Work on a copy so the list held by the settings is not modified, and
    # add eGon2021 only once so that it is not inserted twice.
    scenarios = list(egon.data.config.settings()["egon-data"]["--scenarios"])
    if "eGon2021" not in scenarios:
        scenarios.append("eGon2021")

    for t in targets:
        db.execute_sql(
            f"DELETE FROM {targets[t]['schema']}.{targets[t]['table']};"
        )

    for scn in scenarios:
        year = (
            2023
            if scn == "status2023"
            else scenario_parameters.global_settings(scn)["population_year"]
        )

        # Insert demands of private households
        insert_hh_demand(scn, year, engine)
838
839
840
def insert_cts_ind_demands():
    """Insert electricity demands per nuts3-region in Germany according to
    demandregio using its disaggregator-tool in MWh

    All configured target tables are emptied first. Afterwards the branch
    definitions, the demands of every configured scenario (plus eGon2021)
    and the load curves per branch are inserted.

    Returns
    -------
    None.

    """
    targets = egon.data.config.datasets()["demandregio_cts_ind_demand"][
        "targets"
    ]
    engine = db.engine()

    for t in targets:
        db.execute_sql(
            f"DELETE FROM {targets[t]['schema']}.{targets[t]['table']};"
        )

    insert_cts_ind_wz_definitions()

    # Work on a copy so the list held by the settings is not modified, and
    # add eGon2021 only once so that it is not inserted twice.
    scenarios = list(egon.data.config.settings()["egon-data"]["--scenarios"])
    if "eGon2021" not in scenarios:
        scenarios.append("eGon2021")

    for scn in scenarios:
        year = scenario_parameters.global_settings(scn)["population_year"]

        # Years after 2035 are capped at 2035
        if year > 2035:
            year = 2035

        # target values per scenario
        # for eGon2021 and status2019 demandregio-data is used without scaling
        if scn not in ["eGon2021", "status2019"]:
            annual_demand = get_sector_parameters(
                "electricity", scenario=scn
            )["annual_demand"]
            target_values = {
                "CTS": annual_demand["CTS"],
                "industry": annual_demand["industry"],
            }
        else:
            target_values = None

        insert_cts_ind(scn, year, engine, target_values)

    # Insert load curves per wz
    timeseries_per_wz()
889
890
891
def insert_society_data():
    """Write population and household numbers per nuts3-region to the database

    The figures are taken from demandregio via its disaggregator-tool. All
    rows of the configured target tables are removed first. Then, for every
    population year of the global scenario parameters and additionally for
    2018, the population is appended to the population table and the number
    of households per household size is appended to the household table.

    Returns
    -------
    None.

    """
    society_targets = egon.data.config.datasets()["demandregio_society"][
        "targets"
    ]
    engine = db.engine()

    # Start from empty target tables
    for target in society_targets.values():
        db.execute_sql(f"DELETE FROM {target['schema']}.{target['table']};")

    scenario_years = get_sector_parameters("global").population_year.values
    target_years = np.append(scenario_years, 2018)

    # Population per nuts3-region and year
    population_target = society_targets["population"]
    for year in target_years:
        population = (
            pd.DataFrame(data.population(year=year))
            .assign(year=year)
            .rename(columns={"value": "population"})
        )
        # Select data for nuts3-regions in boundaries (needed for testmode)
        population = data_in_boundaries(population)
        population.to_sql(
            population_target["table"],
            engine,
            schema=population_target["schema"],
            if_exists="append",
        )

    # Households per nuts3-region, year and household size
    household_target = society_targets["household"]
    for year in target_years:
        households_by_size = pd.DataFrame(data.households_per_size(year=year))
        # Select data for nuts3-regions in boundaries (needed for testmode)
        households_by_size = data_in_boundaries(households_by_size)
        for hh_size in households_by_size.columns:
            # One frame per household size: the size moves from the column
            # label into its own "hh_size" column
            households = (
                pd.DataFrame(households_by_size[hh_size])
                .rename(columns={hh_size: "households"})
                .assign(year=year, hh_size=hh_size)
            )
            households.to_sql(
                household_target["table"],
                engine,
                schema=household_target["schema"],
                if_exists="append",
            )
940
941
942
def insert_timeseries_per_wz(sector, year):
    """Insert normalized electrical load time series for the selected sector

    The profiles are generated with the disaggregator-tool, resampled to
    hourly sums and written to the database as one load curve per wz.
    Existing time series of all wz belonging to the selected sector are
    deleted before the new ones are appended.

    Parameters
    ----------
    sector : str
        Name of the sector. ['CTS', 'industry']
    year : int
        Selected weather year

    Returns
    -------
    None.

    Raises
    ------
    ValueError
        If ``sector`` is neither 'CTS' nor 'industry'.

    """
    # Fail early and explicitly: previously an unknown sector was only
    # printed and the function crashed later on undefined variables.
    if sector not in ("CTS", "industry"):
        raise ValueError(f"Sector {sector} is not valid.")

    targets = egon.data.config.datasets()["demandregio_cts_ind_demand"][
        "targets"
    ]

    if sector == "CTS":
        profiles = (
            data.CTS_power_slp_generator("SH", year=year)
            # Remove the listed non-profile columns so that only the
            # profile columns are resampled and summed
            .drop(
                [
                    "Day",
                    "Hour",
                    "DayOfYear",
                    "WD",
                    "SA",
                    "SU",
                    "WIZ",
                    "SOZ",
                    "UEZ",
                ],
                axis="columns",
            )
            .resample("H")
            .sum()
        )
        # Mapping of wz to the name of its load profile
        wz_slp = config.slp_branch_cts_power()
    else:
        profiles = (
            data.shift_load_profile_generator(state="SH", year=year)
            .resample("H")
            .sum()
        )
        # Mapping of wz to the name of its load profile
        wz_slp = config.shift_profile_industry()

    df = pd.DataFrame(
        index=wz_slp.keys(), columns=["slp", "load_curve", "year"]
    )

    df.index.rename("wz", inplace=True)

    df.slp = wz_slp.values()

    df.year = year

    # One list of values per wz, taken from the profile assigned to it
    df.load_curve = profiles[df.slp].transpose().values.tolist()

    # Remove the existing time series of this sector before inserting
    db.execute_sql(
        f"""
                   DELETE FROM {targets['timeseries_cts_ind']['schema']}.
                   {targets['timeseries_cts_ind']['table']}
                   WHERE wz IN (
                       SELECT wz FROM {targets['wz_definitions']['schema']}.
                       {targets['wz_definitions']['table']}
                       WHERE sector = '{sector}')
                   """
    )

    df.to_sql(
        targets["timeseries_cts_ind"]["table"],
        schema=targets["timeseries_cts_ind"]["schema"],
        con=db.engine(),
        if_exists="append",
    )
1022
1023
1024
def timeseries_per_wz():
    """Calculate and insert normalized timeseries per wz for cts and industry

    The weather year of every configured scenario is looked up; for each
    distinct weather year the time series of both sectors are inserted once.

    Returns
    -------
    None.

    """

    scenarios = egon.data.config.settings()["egon-data"]["--scenarios"]
    handled_years = set()
    for scn in scenarios:
        weather_year = int(
            scenario_parameters.global_settings(scn)["weather_year"]
        )

        # Scenarios sharing a weather year reuse the already inserted data
        if weather_year in handled_years:
            continue

        for sector in ("CTS", "industry"):
            insert_timeseries_per_wz(sector, int(weather_year))
        handled_years.add(weather_year)
1042
1043
1044
def get_cached_tables():
    """Unpack cached demandregio tables and the db-dump from former runs

    For the entries "cache" and "dbdump" of the "demandregio_workaround"
    dataset configuration, the configured source zip archive is extracted
    into the configured target directory below the current working
    directory. The target directory is created if it does not exist yet.
    """
    workaround = egon.data.config.datasets()["demandregio_workaround"]

    for entry in ("cache", "dbdump"):
        archive = workaround["source"][entry]["path"]
        destination = Path(".") / workaround["targets"][entry]["path"]

        os.makedirs(destination, exist_ok=True)

        with zipfile.ZipFile(archive, "r") as zipped:
            zipped.extractall(path=destination)
1058