Passed
Pull Request — dev (#1375)
by
unknown
02:18
created

data.datasets.demandregio.insert_cts_ind()   D

Complexity

Conditions 11

Size

Total Lines 127
Code Lines 73

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 73
dl 0
loc 127
rs 4.6636
c 0
b 0
f 0
cc 11
nop 4

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like data.datasets.demandregio.insert_cts_ind() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""The central module containing all code dealing with importing and
2
adjusting data from demandRegio
3
4
"""
5
6
from pathlib import Path
7
import os
8
import zipfile
9
10
from sqlalchemy import ARRAY, Column, Float, ForeignKey, Integer, String
11
from sqlalchemy.ext.declarative import declarative_base
12
import numpy as np
13
import pandas as pd
14
15
from egon.data import db, logger
16
from egon.data.datasets import Dataset
17
from egon.data.datasets.scenario_parameters import (
18
    EgonScenario,
19
    get_sector_parameters,
20
)
21
import egon.data.config
22
import egon.data.datasets.scenario_parameters.parameters as scenario_parameters
23
from egon_validation import (
24
    RowCountValidation,
25
    DataTypeValidation,
26
    WholeTableNotNullAndNotNaNValidation,
27
    ValueSetValidation,
28
    ArrayCardinalityValidation
29
)
30
31
try:
32
    from disaggregator import config, data, spatial, temporal
33
34
except ImportError:
35
    pass
36
37
# will be later imported from another file ###
38
Base = declarative_base()
39
40
41
class DemandRegio(Dataset):
42
    """
43
    Extract and adjust data from DemandRegio
44
45
    Demand data for the sectors households, CTS and industry are calculated
46
    using DemandRegio's diaggregator and input data. To bring the resulting
47
    data in line with other data used in eGon-data and the eGon project in
48
    general some data needed to be adjusted or extended, e.g. in function
49
    :py:func:`adjust_ind_pes` or function :py:func:`adjust_cts_ind_nep`. The
50
    resulting data is written into newly created tables.
51
52
    *Dependencies*
53
      * :py:class:`DataBundle <egon.data.datasets.data_bundle.DataBundle>`
54
      * :py:class:`ScenarioParameters
55
      <egon.data.datasets.scenario_parameters.ScenarioParameters>`
56
      * :py:class:`ZensusVg250 <egon.data.datasets.zensus_vg250.ZensusVg250>`
57
58
    *Resulting tables*
59
      * :py:class:`demand.egon_demandregio_hh
60
      <egon.data.datasets.demandregio.EgonDemandRegioHH>` is created and filled
61
      * :py:class:`demand.egon_demandregio_cts_ind
62
      <egon.data.datasets.demandregio.EgonDemandRegioCtsInd>`
63
      is created and filled
64
      * :py:class:`society.egon_demandregio_population
65
      <egon.data.datasets.demandregio.EgonDemandRegioPopulation>`
66
      is created and filled
67
      * :py:class:`society.egon_demandregio_household
68
      <egon.data.datasets.demandregio.EgonDemandRegioHouseholds>`
69
      is created and filled
70
      * :py:class:`demand.egon_demandregio_wz
71
      <egon.data.datasets.demandregio.EgonDemandRegioWz>` is created and filled
72
      * :py:class:`demand.egon_demandregio_timeseries_cts_ind
73
      <egon.data.datasets.demandregio.EgonDemandRegioTimeseriesCtsInd>`
74
      is created and filled
75
76
    """
77
78
    #:
79
    name: str = "DemandRegio"
80
    #:
81
    version: str = "0.0.13"
82
83
    def __init__(self, dependencies):
84
        super().__init__(
85
            name=self.name,
86
            version=self.version,
87
            dependencies=dependencies,
88
            tasks=(
89
                get_cached_tables,  # adhoc workaround #180
90
                create_tables,
91
                {
92
                    insert_household_demand,
93
                    insert_society_data,
94
                    insert_cts_ind_demands,
95
                },
96
            ),
97
            validation={
98
                "data_quality": [
99
                    RowCountValidation(
100
                        table=" demand.egon_demandregio_hh",
101
                        rule_id="ROW_COUNT.egon_demandregio_hh",
102
                        expected_count={"Schleswig-Holstein": 180, "everything": 7218}
103
                    ),
104
                    DataTypeValidation(
105
                        table="demand.egon_demandregio_hh",
106
                        rule_id="DATA_MULTIPLE_TYPES.egon_demandregio_hh",
107
                        column_types={"nuts3": "character varying", "hh_size": "integer", "scenario": "character varying",
108
                                      "year": "integer", "demand": "double precision"}
109
                    ),
110
                    WholeTableNotNullAndNotNaNValidation(
111
                        table="demand.egon_demandregio_hh",
112
                        rule_id="WHOLE_TABLE_NOT_NAN.egon_demandregio_hh"
113
                    ),
114
                    ValueSetValidation(
115
                        table="demand.egon_demandregio_hh",
116
                        rule_id="VALUE_SET_VALIDATION_SCENARIO.egon_demandregio_hh",
117
                        column="scenario",
118
                        expected_values=["eGon2035", "eGon100RE", "eGon2021"]
119
                    ),
120
                    RowCountValidation(
121
                        table=" demand.egon_demandregio_wz",
122
                        rule_id="ROW_COUNT.egon_demandregio_wz",
123
                        expected_count=87
124
                    ),
125
                    DataTypeValidation(
126
                        table="demand.egon_demandregio_wz",
127
                        rule_id="DATA_MULTIPLE_TYPES.egon_demandregio_wz",
128
                        column_types={"wz": "integer", "sector": "character varying", "definition": "character varying"}
129
                    ),
130
                    WholeTableNotNullAndNotNaNValidation(
131
                        table="demand.egon_demandregio_wz",
132
                        rule_id="WHOLE_TABLE_NOT_NAN.egon_demandregio_wz"
133
                    ),
134
                    ValueSetValidation(
135
                        table="demand.egon_demandregio_wz",
136
                        rule_id="VALUE_SET_VALIDATION_SECTOR.egon_demandregio_wz",
137
                        column="sector",
138
                        expected_values=["industry", "CTS"]
139
                    ),
140
                    ArrayCardinalityValidation(
141
                        table="demand.egon_demandregio_sites_ind_electricity_dsm_timeseries",
142
                        array_column="load_curve",
143
                        expected_length=8760,
144
                    )
145
                ]
146
            },
147
            on_validation_failure="continue"
148
        )
149
150
151
class DemandRegioLoadProfiles(Base):
152
    __tablename__ = "demandregio_household_load_profiles"
153
    __table_args__ = {"schema": "demand"}
154
155
    id = Column(Integer, primary_key=True)
156
    year = Column(Integer)
157
    nuts3 = Column(String)
158
    load_in_mwh = Column(ARRAY(Float()))
159
160
161
class EgonDemandRegioHH(Base):
162
    __tablename__ = "egon_demandregio_hh"
163
    __table_args__ = {"schema": "demand"}
164
    nuts3 = Column(String(5), primary_key=True)
165
    hh_size = Column(Integer, primary_key=True)
166
    scenario = Column(String, ForeignKey(EgonScenario.name), primary_key=True)
167
    year = Column(Integer)
168
    demand = Column(Float)
169
170
171
class EgonDemandRegioCtsInd(Base):
172
    __tablename__ = "egon_demandregio_cts_ind"
173
    __table_args__ = {"schema": "demand"}
174
    nuts3 = Column(String(5), primary_key=True)
175
    wz = Column(Integer, primary_key=True)
176
    scenario = Column(String, ForeignKey(EgonScenario.name), primary_key=True)
177
    year = Column(Integer)
178
    demand = Column(Float)
179
180
181
class EgonDemandRegioPopulation(Base):
182
    __tablename__ = "egon_demandregio_population"
183
    __table_args__ = {"schema": "society"}
184
    nuts3 = Column(String(5), primary_key=True)
185
    year = Column(Integer, primary_key=True)
186
    population = Column(Float)
187
188
189
class EgonDemandRegioHouseholds(Base):
190
    __tablename__ = "egon_demandregio_household"
191
    __table_args__ = {"schema": "society"}
192
    nuts3 = Column(String(5), primary_key=True)
193
    hh_size = Column(Integer, primary_key=True)
194
    year = Column(Integer, primary_key=True)
195
    households = Column(Integer)
196
197
198
class EgonDemandRegioWz(Base):
199
    __tablename__ = "egon_demandregio_wz"
200
    __table_args__ = {"schema": "demand"}
201
    wz = Column(Integer, primary_key=True)
202
    sector = Column(String(50))
203
    definition = Column(String(150))
204
205
206
class EgonDemandRegioTimeseriesCtsInd(Base):
207
    __tablename__ = "egon_demandregio_timeseries_cts_ind"
208
    __table_args__ = {"schema": "demand"}
209
    wz = Column(Integer, primary_key=True)
210
    year = Column(Integer, primary_key=True)
211
    slp = Column(String(50))
212
    load_curve = Column(ARRAY(Float()))
213
214
215
def create_tables():
216
    """Create tables for demandregio data
217
    Returns
218
    -------
219
    None.
220
    """
221
    db.execute_sql("CREATE SCHEMA IF NOT EXISTS demand;")
222
    db.execute_sql("CREATE SCHEMA IF NOT EXISTS society;")
223
    engine = db.engine()
224
    EgonDemandRegioHH.__table__.create(bind=engine, checkfirst=True)
225
    EgonDemandRegioCtsInd.__table__.create(bind=engine, checkfirst=True)
226
    EgonDemandRegioPopulation.__table__.create(bind=engine, checkfirst=True)
227
    EgonDemandRegioHouseholds.__table__.create(bind=engine, checkfirst=True)
228
    EgonDemandRegioWz.__table__.create(bind=engine, checkfirst=True)
229
    DemandRegioLoadProfiles.__table__.create(bind=db.engine(), checkfirst=True)
230
    EgonDemandRegioTimeseriesCtsInd.__table__.drop(
231
        bind=engine, checkfirst=True
232
    )
233
    EgonDemandRegioTimeseriesCtsInd.__table__.create(
234
        bind=engine, checkfirst=True
235
    )
236
237
238
def data_in_boundaries(df):
239
    """Select rows with nuts3 code within boundaries, used for testmode
240
241
    Parameters
242
    ----------
243
    df : pandas.DataFrame
244
        Data for all nuts3 regions
245
246
    Returns
247
    -------
248
    pandas.DataFrame
249
        Data for nuts3 regions within boundaries
250
251
    """
252
    engine = db.engine()
253
254
    df = df.reset_index()
255
256
    # Change nuts3 region names to 2016 version
257
    nuts_names = {"DEB16": "DEB1C", "DEB19": "DEB1D"}
258
    df.loc[df.nuts3.isin(nuts_names), "nuts3"] = df.loc[
259
        df.nuts3.isin(nuts_names), "nuts3"
260
    ].map(nuts_names)
261
262
    df = df.set_index("nuts3")
263
264
    return df[
265
        df.index.isin(
266
            pd.read_sql(
267
                "SELECT DISTINCT ON (nuts) nuts FROM boundaries.vg250_krs",
268
                engine,
269
            ).nuts
270
        )
271
    ]
272
273
274
def insert_cts_ind_wz_definitions():
275
    """Insert demandregio's definitions of CTS and industrial branches
276
277
    Returns
278
    -------
279
    None.
280
281
    """
282
283
    source = egon.data.config.datasets()["demandregio_cts_ind_demand"][
284
        "sources"
285
    ]
286
287
    target = egon.data.config.datasets()["demandregio_cts_ind_demand"][
288
        "targets"
289
    ]["wz_definitions"]
290
291
    engine = db.engine()
292
293
    for sector in source["wz_definitions"]:
294
        file_path = (
295
            Path(".")
296
            / "data_bundle_egon_data"
297
            / "WZ_definition"
298
            / source["wz_definitions"][sector]
299
        )
300
301
        if sector == "CTS":
302
            delimiter = ";"
303
        else:
304
            delimiter = ","
305
        df = (
306
            pd.read_csv(file_path, delimiter=delimiter, header=None)
307
            .rename({0: "wz", 1: "definition"}, axis="columns")
308
            .set_index("wz")
309
        )
310
        df["sector"] = sector
311
        df.to_sql(
312
            target["table"],
313
            engine,
314
            schema=target["schema"],
315
            if_exists="append",
316
        )
317
318
319
def match_nuts3_bl():
320
    """Function that maps the federal state to each nuts3 region
321
322
    Returns
323
    -------
324
    df : pandas.DataFrame
325
        List of nuts3 regions and the federal state of Germany.
326
327
    """
328
329
    engine = db.engine()
330
331
    df = pd.read_sql(
332
        "SELECT DISTINCT ON (boundaries.vg250_krs.nuts) "
333
        "boundaries.vg250_krs.nuts, boundaries.vg250_lan.gen "
334
        "FROM boundaries.vg250_lan, boundaries.vg250_krs "
335
        " WHERE ST_CONTAINS("
336
        "boundaries.vg250_lan.geometry, "
337
        "boundaries.vg250_krs.geometry)",
338
        con=engine,
339
    )
340
341
    df.gen[df.gen == "Baden-Württemberg (Bodensee)"] = "Baden-Württemberg"
342
    df.gen[df.gen == "Bayern (Bodensee)"] = "Bayern"
343
344
    return df.set_index("nuts")
345
346
347
def adjust_ind_pes(ec_cts_ind):
348
    """
349
    Adjust electricity demand of industrial consumers due to electrification
350
    of process heat based on assumptions of pypsa-eur-sec.
351
352
    Parameters
353
    ----------
354
    ec_cts_ind : pandas.DataFrame
355
        Industrial demand without additional electrification
356
357
    Returns
358
    -------
359
    ec_cts_ind : pandas.DataFrame
360
        Industrial demand with additional electrification
361
362
    """
363
364
    pes_path = (
365
        Path(".") / "data_bundle_powerd_data" / "pypsa_eur" / "resources"
366
    )
367
368
    sources = egon.data.config.datasets()["demandregio_cts_ind_demand"][
369
        "sources"
370
    ]["new_consumers_2050"]
371
372
    # Extract today's industrial demand from pypsa-eur-sec
373
    demand_today = pd.read_csv(
374
        pes_path / sources["pes-demand-today"],
375
        header=None,
376
    ).transpose()
377
378
    # Filter data
379
    demand_today[1].fillna("carrier", inplace=True)
380
    demand_today = demand_today[
381
        (demand_today[0] == "DE") | (demand_today[1] == "carrier")
382
    ].drop([0, 2], axis="columns")
383
384
    demand_today = (
385
        demand_today.transpose()
386
        .set_index(0)
387
        .transpose()
388
        .set_index("carrier")
389
        .transpose()
390
        .loc["electricity"]
391
        .astype(float)
392
    )
393
394
    # Calculate future industrial demand from pypsa-eur-sec
395
    # based on production and energy demands per carrier ('sector ratios')
396
    prod_tomorrow = pd.read_csv(pes_path / sources["pes-production-tomorrow"])
397
398
    prod_tomorrow = prod_tomorrow[prod_tomorrow["kton/a"] == "DE"].set_index(
399
        "kton/a"
400
    )
401
402
    sector_ratio = (
403
        pd.read_csv(pes_path / sources["pes-sector-ratios"])
404
        .set_index("MWh/tMaterial")
405
        .loc["elec"]
406
    )
407
408
    demand_tomorrow = prod_tomorrow.multiply(
409
        sector_ratio.div(1000)
410
    ).transpose()["DE"]
411
412
    # Calculate changes of electrical demand per sector in pypsa-eur-sec
413
    change = pd.DataFrame(
414
        (demand_tomorrow / demand_today)
415
        / (demand_tomorrow / demand_today).sum()
416
    )
417
418
    # Drop rows without changes
419
    change = change[~change[0].isnull()]
420
421
    # Map industrial branches of pypsa-eur-sec to WZ2008 used in demandregio
422
    change["wz"] = change.index.map(
423
        {
424
            "Alumina production": 24,
425
            "Aluminium - primary production": 24,
426
            "Aluminium - secondary production": 24,
427
            "Ammonia": 20,
428
            "Basic chemicals (without ammonia)": 20,
429
            "Cement": 23,
430
            "Ceramics & other NMM": 23,
431
            "Electric arc": 24,
432
            "Food, beverages and tobacco": 10,
433
            "Glass production": 23,
434
            "Integrated steelworks": 24,
435
            "Machinery Equipment": 28,
436
            "Other Industrial Sectors": 32,
437
            "Other chemicals": 20,
438
            "Other non-ferrous metals": 24,
439
            "Paper production": 17,
440
            "Pharmaceutical products etc.": 21,
441
            "Printing and media reproduction": 18,
442
            "Pulp production": 17,
443
            "Textiles and leather": 13,
444
            "Transport Equipment": 29,
445
            "Wood and wood products": 16,
446
        }
447
    )
448
449
    # Group by WZ2008
450
    shares_per_wz = change.groupby("wz")[0].sum()
451
452
    # Calculate addtional demands needed to meet future demand of pypsa-eur-sec
453
    addtional_mwh = shares_per_wz.multiply(
454
        demand_tomorrow.sum() * 1000000 - ec_cts_ind.sum().sum()
455
    )
456
457
    # Calulate overall industrial demand for eGon100RE
458
    final_mwh = addtional_mwh + ec_cts_ind[addtional_mwh.index].sum()
459
460
    # Linear scale the industrial demands per nuts3 and wz to meet final demand
461
    ec_cts_ind[addtional_mwh.index] *= (
462
        final_mwh / ec_cts_ind[addtional_mwh.index].sum()
463
    )
464
465
    return ec_cts_ind
466
467
468
def adjust_cts_ind_nep(ec_cts_ind, sector):
469
    """Add electrical demand of new largescale CTS und industrial consumers
470
    according to NEP 2021, scneario C 2035. Values per federal state are
471
    linear distributed over all CTS branches and nuts3 regions.
472
473
    Parameters
474
    ----------
475
    ec_cts_ind : pandas.DataFrame
476
        CTS or industry demand without new largescale consumers.
477
478
    Returns
479
    -------
480
    ec_cts_ind : pandas.DataFrame
481
        CTS or industry demand including new largescale consumers.
482
483
    """
484
    sources = egon.data.config.datasets()["demandregio_cts_ind_demand"][
485
        "sources"
486
    ]
487
488
    file_path = (
489
        Path(".")
490
        / "data_bundle_egon_data"
491
        / "nep2035_version2021"
492
        / sources["new_consumers_2035"]
493
    )
494
495
    # get data from NEP per federal state
496
    new_con = pd.read_csv(file_path, delimiter=";", decimal=",", index_col=0)
497
498
    # match nuts3 regions to federal states
499
    groups = ec_cts_ind.groupby(match_nuts3_bl().gen)
500
501
    # update demands per federal state
502
    for group in groups.indices.keys():
503
        g = groups.get_group(group)
504
        data_new = g.mul(1 + new_con[sector][group] * 1e6 / g.sum().sum())
505
        ec_cts_ind[ec_cts_ind.index.isin(g.index)] = data_new
506
507
    return ec_cts_ind
508
509
510
def disagg_households_power(scenario, year, original=False, **kwargs):
511
    """
512
    Perform spatial disaggregation of electric power in [GWh/a] by key
513
    Similar to disaggregator.spatial.disagg_households_power
514
515
516
    Parameters
517
    ----------
518
    by : str
519
        must be one of ['households', 'population']
520
    orignal : bool, optional
521
        Throughput to function households_per_size,
522
        A flag if the results should be left untouched and returned in
523
        original form for the year 2011 (True) or if they should be scaled to
524
        the given `year` by the population in that year (False).
525
526
    Returns
527
    -------
528
    pd.DataFrame or pd.Series
529
    """
530
    # source: survey of energieAgenturNRW
531
    # with/without direct water heating (DHW), and weighted average
532
    # https://1-stromvergleich.com/wp-content/uploads/erhebung_wo_bleibt_der_strom.pdf
533
    demand_per_hh_size = pd.DataFrame(
534
        index=range(1, 7),
535
        data={
536
            # "weighted DWH": [2290, 3202, 4193, 4955, 5928, 5928],
537
            # "without DHW": [1714, 2812, 3704, 4432, 5317, 5317],
538
            "with_DHW": [2181, 3843, 5151, 6189, 7494, 8465],
539
            "without_DHW": [1798, 2850, 3733, 4480, 5311, 5816],
540
            "weighted": [2256, 3248, 4246, 5009, 5969, 6579],
541
        },
542
    )
543
544
    if scenario == "eGon100RE":
545
        # chose demand per household size from survey without DHW
546
        power_per_HH = demand_per_hh_size["without_DHW"] / 1e3
547
548
        # calculate demand per nuts3 in 2011
549
        df_2011 = data.households_per_size(year=2011) * power_per_HH
550
551
        # scale demand per hh-size to meet demand without heat
552
        # according to JRC in 2011 (136.6-(20.14+9.41) TWh)
553
        power_per_HH *= (136.6 - (20.14 + 9.41)) * 1e6 / df_2011.sum().sum()
554
555
        # calculate demand per nuts3 in 2050
556
        df = data.households_per_size(year=year) * power_per_HH
557
558
    # Bottom-Up: Power demand by household sizes in [MWh/a] for each scenario
559
    else:
560
        # chose demand per household size from survey including weighted DHW
561
        power_per_HH = demand_per_hh_size["weighted"] / 1e3
562
563
        # calculate demand per nuts3
564
        df = (
565
            data.households_per_size(original=original, year=year)
566
            * power_per_HH
567
        )
568
569
    return df
570
571
572
def write_demandregio_hh_profiles_to_db(hh_profiles):
573
    """Write HH demand profiles from demand regio into db. One row per
574
    year and nuts3. The annual load profile timeseries is an array.
575
576
    schema: demand
577
    tablename: demandregio_household_load_profiles
578
579
580
581
    Parameters
582
    ----------
583
    hh_profiles: pd.DataFrame
584
585
    Returns
586
    -------
587
    """
588
    years = hh_profiles.index.year.unique().values
589
    df_to_db = pd.DataFrame(
590
        columns=["id", "year", "nuts3", "load_in_mwh"]
591
    ).set_index("id")
592
    dataset = egon.data.config.settings()["egon-data"]["--dataset-boundary"]
593
594
    if dataset == "Schleswig-Holstein":
595
        hh_profiles = hh_profiles.loc[
596
            :, hh_profiles.columns.str.contains("DEF0")
597
        ]
598
599
    idx = pd.read_sql_query(
600
        f"""
601
                           SELECT MAX(id)
602
                           FROM {DemandRegioLoadProfiles.__table__.schema}.
603
                           {DemandRegioLoadProfiles.__table__.name}
604
                           """,
605
        con=db.engine(),
606
    ).iat[0, 0]
607
608
    idx = 0 if idx is None else idx + 1
609
610
    for year in years:
611
        df = hh_profiles[hh_profiles.index.year == year]
612
613
        for nuts3 in hh_profiles.columns:
614
            idx += 1
615
            df_to_db.at[idx, "year"] = year
616
            df_to_db.at[idx, "nuts3"] = nuts3
617
            df_to_db.at[idx, "load_in_mwh"] = df[nuts3].to_list()
618
619
    df_to_db["year"] = df_to_db["year"].apply(int)
620
    df_to_db["nuts3"] = df_to_db["nuts3"].astype(str)
621
    df_to_db["load_in_mwh"] = df_to_db["load_in_mwh"].apply(list)
622
    df_to_db = df_to_db.reset_index()
623
624
    df_to_db.to_sql(
625
        name=DemandRegioLoadProfiles.__table__.name,
626
        schema=DemandRegioLoadProfiles.__table__.schema,
627
        con=db.engine(),
628
        if_exists="append",
629
        index=-False,
630
    )
631
632
633
def insert_hh_demand(scenario, year, engine):
634
    """Calculates electrical demands of private households using demandregio's
635
    disaggregator and insert results into the database.
636
637
    Parameters
638
    ----------
639
    scenario : str
640
        Name of the corresponding scenario.
641
    year : int
642
        The number of households per region is taken from this year.
643
644
    Returns
645
    -------
646
    None.
647
648
    """
649
    targets = egon.data.config.datasets()["demandregio_household_demand"][
650
        "targets"
651
    ]["household_demand"]
652
    # get spatial distribution of demands of private households per nuts
653
    # and size from demandregio
654
    ec_hh = disagg_households_power(scenario, year)
655
656
    # Scale to meet target value
657
    # For status2019 and eGon2021 the final demand from demandregio is kept
658
    if scenario not in ["status2019", "eGon2021"]:
659
        ec_hh *= (
660
            get_sector_parameters("electricity", scenario=scenario)[
661
                "annual_demand"
662
            ]["households"]
663
            * 1e6
664
            / ec_hh.sum().sum()
665
        )
666
667
    # Select demands for nuts3-regions in boundaries (needed for testmode)
668
    ec_hh = data_in_boundaries(ec_hh)
669
670
    # insert into database
671
    for hh_size in ec_hh.columns:
672
        df = pd.DataFrame(ec_hh[hh_size])
673
        df["year"] = (
674
            2023 if scenario == "status2023" else year
675
        )  # TODO status2023
676
        # adhoc fix until ffeopendata servers are up and population_year
677
        # can be set
678
679
        df["scenario"] = scenario
680
        df["hh_size"] = hh_size
681
        df = df.rename({hh_size: "demand"}, axis="columns")
682
        df.to_sql(
683
            targets["table"],
684
            engine,
685
            schema=targets["schema"],
686
            if_exists="append",
687
        )
688
689
    # insert housholds demand timeseries
690
    try:
691
        hh_load_timeseries = (
692
            temporal.disagg_temporal_power_housholds_slp(
693
                use_nuts3code=True,
694
                by="households",
695
                weight_by_income=False,
696
                year=year,
697
            )
698
            .resample("h")
699
            .sum()
700
        )
701
        hh_load_timeseries.rename(
702
            columns={"DEB16": "DEB1C", "DEB19": "DEB1D"}, inplace=True
703
        )
704
    except Exception as e:
705
        logger.warning(
706
            f"Couldnt get profiles from FFE, will use pickeld fallback! \n {e}"
707
        )
708
        hh_load_timeseries = pd.read_csv(
709
            "data_bundle_egon_data/demand_regio_backup/df_load_profiles.csv",
710
            index_col="time",
711
        )
712
        hh_load_timeseries.index = pd.to_datetime(
713
            hh_load_timeseries.index, format="%Y-%m-%d %H:%M:%S"
714
        )
715
716
        def change_year(dt, year):
717
            return dt.replace(year=year)
718
719
        year = 2023 if scenario == "status2023" else year  # TODO status2023
720
        hh_load_timeseries.index = hh_load_timeseries.index.map(
721
            lambda dt: change_year(dt, year)
0 ignored issues
show
introduced by
The variable change_year does not seem to be defined for all execution paths.
Loading history...
722
        )
723
724
        if scenario == "status2023":
725
            hh_load_timeseries = hh_load_timeseries.shift(24 * 2)
726
727
            hh_load_timeseries.iloc[: 24 * 7] = hh_load_timeseries.iloc[
728
                24 * 7 : 24 * 7 * 2
729
            ].values
730
731
    write_demandregio_hh_profiles_to_db(hh_load_timeseries)
732
733
734
def insert_cts_ind(scenario, year, engine, target_values):
735
    """Calculates electrical demands of CTS and industry using demandregio's
736
    disaggregator, adjusts them according to resulting values of NEP 2021 or
737
    JRC IDEES and insert results into the database.
738
739
    Parameters
740
    ----------
741
    scenario : str
742
        Name of the corresponing scenario.
743
    year : int
744
        The number of households per region is taken from this year.
745
    target_values : dict
746
        List of target values for each scenario and sector.
747
748
    Returns
749
    -------
750
    None.
751
752
    """
753
    targets = egon.data.config.datasets()["demandregio_cts_ind_demand"][
754
        "targets"
755
    ]
756
757
    wz_table = pd.read_sql(
758
        "SELECT wz, sector FROM demand.egon_demandregio_wz",
759
        con=engine,
760
        index_col="wz",
761
    )
762
763
    # Workaround: Since the disaggregator does not work anymore, data from
764
    # previous runs is used for eGon2035 and eGon100RE
765
    if scenario == "eGon2035":
766
        file2035_path = (
767
            Path(".")
768
            / "data_bundle_egon_data"
769
            / "demand_regio_backup"
770
            / "egon_demandregio_cts_ind_egon2035.csv"
771
        )
772
        ec_cts_ind2 = pd.read_csv(file2035_path)
773
        ec_cts_ind2.to_sql(
774
            targets["cts_ind_demand"]["table"],
775
            engine,
776
            targets["cts_ind_demand"]["schema"],
777
            if_exists="append",
778
            index=False,
779
        )
780
        return
781
782
    if scenario == "eGon100RE":
783
        ec_cts_ind2 = pd.read_csv(
784
            "data_bundle_egon_data/"
785
            "demand_regio_backup/egon_demandregio_cts_ind.csv"
786
        )
787
        ec_cts_ind2["sector"] = ec_cts_ind2["wz"].map(wz_table["sector"])
788
        factor_ind = target_values["industry"] / (
789
            ec_cts_ind2[ec_cts_ind2["sector"] == "industry"]["demand"].sum()
790
            / 1000
791
        )
792
        factor_cts = target_values["CTS"] / (
793
            ec_cts_ind2[ec_cts_ind2["sector"] == "CTS"]["demand"].sum() / 1000
794
        )
795
796
        ec_cts_ind2["demand"] = ec_cts_ind2.apply(
797
            lambda x: (
798
                x["demand"] * factor_ind
0 ignored issues
show
introduced by
The variable factor_ind does not seem to be defined for all execution paths.
Loading history...
799
                if x["sector"] == "industry"
800
                else x["demand"] * factor_cts
0 ignored issues
show
introduced by
The variable factor_cts does not seem to be defined for all execution paths.
Loading history...
801
            ),
802
            axis=1,
803
        )
804
805
        ec_cts_ind2.drop(columns=["sector"], inplace=True)
806
807
        ec_cts_ind2.to_sql(
808
            targets["cts_ind_demand"]["table"],
809
            engine,
810
            targets["cts_ind_demand"]["schema"],
811
            if_exists="append",
812
            index=False,
813
        )
814
        return
815
816
    for sector in ["CTS", "industry"]:
817
        # get demands per nuts3 and wz of demandregio
818
        ec_cts_ind = spatial.disagg_CTS_industry(
819
            use_nuts3code=True, source="power", sector=sector, year=year
820
        ).transpose()
821
822
        ec_cts_ind.index = ec_cts_ind.index.rename("nuts3")
823
824
        # exclude mobility sector from GHD
825
        ec_cts_ind = ec_cts_ind.drop(columns=49, errors="ignore")
826
827
        # scale values according to target_values
828
        if target_values:
829
            if sector in target_values.keys():
830
                ec_cts_ind *= (
831
                    target_values[sector] * 1e3 / ec_cts_ind.sum().sum()
832
                )
833
        else:
834
            print(
835
                f"No scaling factors for scenario {scenario}."
836
                "Data from demandregio is used without scaling."
837
            )
838
        # include new largescale consumers according to NEP 2021
839
        if scenario == "eGon2035":
840
            ec_cts_ind = adjust_cts_ind_nep(ec_cts_ind, sector)
841
        # include new industrial demands due to sector coupling
842
        if (scenario == "eGon100RE") & (sector == "industry"):
843
            ec_cts_ind = adjust_ind_pes(ec_cts_ind)
844
845
        # Select demands for nuts3-regions in boundaries (needed for testmode)
846
        ec_cts_ind = data_in_boundaries(ec_cts_ind)
847
848
        # insert into database
849
        for wz in ec_cts_ind.columns:
850
            df = pd.DataFrame(ec_cts_ind[wz])
851
            df["year"] = year
852
            df["wz"] = wz
853
            df["scenario"] = scenario
854
            df = df.rename({wz: "demand"}, axis="columns")
855
            df.index = df.index.rename("nuts3")
856
            df.to_sql(
857
                targets["cts_ind_demand"]["table"],
858
                engine,
859
                targets["cts_ind_demand"]["schema"],
860
                if_exists="append",
861
            )
862
863
864
def insert_household_demand():
865
    """Insert electrical demands for households according to
866
    demandregio using its disaggregator-tool in MWh
867
868
    Returns
869
    -------
870
    None.
871
872
    """
873
    targets = egon.data.config.datasets()["demandregio_household_demand"][
874
        "targets"
875
    ]
876
    engine = db.engine()
877
878
    scenarios = egon.data.config.settings()["egon-data"]["--scenarios"]
879
880
    scenarios.append("eGon2021")
881
882
    for t in targets:
883
        db.execute_sql(
884
            f"DELETE FROM {targets[t]['schema']}.{targets[t]['table']};"
885
        )
886
887
    for scn in scenarios:
888
        year = (
889
            2023
890
            if scn == "status2023"
891
            else scenario_parameters.global_settings(scn)["population_year"]
892
        )
893
894
        # Insert demands of private households
895
        insert_hh_demand(scn, year, engine)
896
897
898
def insert_cts_ind_demands():
899
    """Insert electricity demands per nuts3-region in Germany according to
900
    demandregio using its disaggregator-tool in MWh
901
902
    Returns
903
    -------
904
    None.
905
906
    """
907
    targets = egon.data.config.datasets()["demandregio_cts_ind_demand"][
908
        "targets"
909
    ]
910
    engine = db.engine()
911
912
    for t in targets:
913
        db.execute_sql(
914
            f"DELETE FROM {targets[t]['schema']}.{targets[t]['table']};"
915
        )
916
917
    insert_cts_ind_wz_definitions()
918
919
    scenarios = egon.data.config.settings()["egon-data"]["--scenarios"]
920
921
    scenarios.append("eGon2021")
922
923
    for scn in scenarios:
924
        year = scenario_parameters.global_settings(scn)["population_year"]
925
926
        if year > 2035:
927
            year = 2035
928
929
        # target values per scenario in MWh
930
        # for eGon2021 and status2019 demandregio-data is used without scaling
931
        if scn not in ["eGon2021", "status2019"]:
932
            target_values = {
933
                "CTS": get_sector_parameters("electricity", scenario=scn)[
934
                    "annual_demand"
935
                ]["CTS"],
936
                "industry": get_sector_parameters("electricity", scenario=scn)[
937
                    "annual_demand"
938
                ]["industry"],
939
            }
940
        else:
941
            target_values = None
942
943
        insert_cts_ind(scn, year, engine, target_values)
944
945
    # Insert load curves per wz
946
    timeseries_per_wz()
947
948
949
def insert_society_data():
950
    """Insert population and number of households per nuts3-region in Germany
951
    according to demandregio using its disaggregator-tool
952
953
    Returns
954
    -------
955
    None.
956
957
    """
958
    targets = egon.data.config.datasets()["demandregio_society"]["targets"]
959
    engine = db.engine()
960
961
    for t in targets:
962
        db.execute_sql(
963
            f"DELETE FROM {targets[t]['schema']}.{targets[t]['table']};"
964
        )
965
966
    target_years = np.append(
967
        get_sector_parameters("global").population_year.values, 2018
968
    )
969
970
    for year in target_years:
971
        df_pop = pd.DataFrame(data.population(year=year))
972
        df_pop["year"] = year
973
        df_pop = df_pop.rename({"value": "population"}, axis="columns")
974
        # Select data for nuts3-regions in boundaries (needed for testmode)
975
        df_pop = data_in_boundaries(df_pop)
976
        df_pop.to_sql(
977
            targets["population"]["table"],
978
            engine,
979
            schema=targets["population"]["schema"],
980
            if_exists="append",
981
        )
982
983
    for year in target_years:
984
        df_hh = pd.DataFrame(data.households_per_size(year=year))
985
        # Select data for nuts3-regions in boundaries (needed for testmode)
986
        df_hh = data_in_boundaries(df_hh)
987
        for hh_size in df_hh.columns:
988
            df = pd.DataFrame(df_hh[hh_size])
989
            df["year"] = year
990
            df["hh_size"] = hh_size
991
            df = df.rename({hh_size: "households"}, axis="columns")
992
            df.to_sql(
993
                targets["household"]["table"],
994
                engine,
995
                schema=targets["household"]["schema"],
996
                if_exists="append",
997
            )
998
999
1000
def insert_timeseries_per_wz(sector, year):
1001
    """Insert normalized electrical load time series for the selected sector
1002
1003
    Parameters
1004
    ----------
1005
    sector : str
1006
        Name of the sector. ['CTS', 'industry']
1007
    year : int
1008
        Selected weather year
1009
1010
    Returns
1011
    -------
1012
    None.
1013
1014
    """
1015
    targets = egon.data.config.datasets()["demandregio_cts_ind_demand"][
1016
        "targets"
1017
    ]
1018
1019
    if sector == "CTS":
1020
        profiles = (
1021
            data.CTS_power_slp_generator("SH", year=year)
1022
            .drop(
1023
                [
1024
                    "Day",
1025
                    "Hour",
1026
                    "DayOfYear",
1027
                    "WD",
1028
                    "SA",
1029
                    "SU",
1030
                    "WIZ",
1031
                    "SOZ",
1032
                    "UEZ",
1033
                ],
1034
                axis="columns",
1035
            )
1036
            .resample("H")
1037
            .sum()
1038
        )
1039
        wz_slp = config.slp_branch_cts_power()
1040
    elif sector == "industry":
1041
        profiles = (
1042
            data.shift_load_profile_generator(state="SH", year=year)
1043
            .resample("H")
1044
            .sum()
1045
        )
1046
        wz_slp = config.shift_profile_industry()
1047
1048
    else:
1049
        print(f"Sector {sector} is not valid.")
1050
1051
    df = pd.DataFrame(
1052
        index=wz_slp.keys(), columns=["slp", "load_curve", "year"]
0 ignored issues
show
introduced by
The variable wz_slp does not seem to be defined for all execution paths.
Loading history...
1053
    )
1054
1055
    df.index.rename("wz", inplace=True)
1056
1057
    df.slp = wz_slp.values()
1058
1059
    df.year = year
1060
1061
    df.load_curve = profiles[df.slp].transpose().values.tolist()
0 ignored issues
show
introduced by
The variable profiles does not seem to be defined for all execution paths.
Loading history...
1062
1063
    db.execute_sql(
1064
        f"""
1065
                   DELETE FROM {targets['timeseries_cts_ind']['schema']}.
1066
                   {targets['timeseries_cts_ind']['table']}
1067
                   WHERE wz IN (
1068
                       SELECT wz FROM {targets['wz_definitions']['schema']}.
1069
                       {targets['wz_definitions']['table']}
1070
                       WHERE sector = '{sector}')
1071
                   """
1072
    )
1073
1074
    df.to_sql(
1075
        targets["timeseries_cts_ind"]["table"],
1076
        schema=targets["timeseries_cts_ind"]["schema"],
1077
        con=db.engine(),
1078
        if_exists="append",
1079
    )
1080
1081
1082
def timeseries_per_wz():
1083
    """Calcultae and insert normalized timeseries per wz for cts and industry
1084
1085
    Returns
1086
    -------
1087
    None.
1088
1089
    """
1090
1091
    scenarios = egon.data.config.settings()["egon-data"]["--scenarios"]
1092
    year_already_in_database = []
1093
    for scn in scenarios:
1094
        year = int(scenario_parameters.global_settings(scn)["weather_year"])
1095
1096
        for sector in ["CTS", "industry"]:
1097
            if year not in year_already_in_database:
1098
                insert_timeseries_per_wz(sector, int(year))
1099
        year_already_in_database.append(year)
1100
1101
1102
def get_cached_tables():
1103
    """Get cached demandregio tables and db-dump from former runs"""
1104
    data_config = egon.data.config.datasets()
1105
    for s in ["cache", "dbdump"]:
1106
        source_path = data_config["demandregio_workaround"]["source"][s][
1107
            "path"
1108
        ]
1109
        target_path = Path(
1110
            ".", data_config["demandregio_workaround"]["targets"][s]["path"]
1111
        )
1112
        os.makedirs(target_path, exist_ok=True)
1113
1114
        with zipfile.ZipFile(source_path, "r") as zip_ref:
1115
            zip_ref.extractall(path=target_path)
1116