Passed
Pull Request — dev (#1197)
by
unknown
05:18
created

data.datasets.demandregio   D

Complexity

Total Complexity 59

Size/Duplication

Total Lines 1001
Duplicated Lines 2.4 %

Importance

Changes 0
Metric Value
wmc 59
eloc 514
dl 24
loc 1001
rs 4.08
c 0
b 0
f 0

1 Method

Rating   Name   Duplication   Size   Complexity  
A DemandRegio.__init__() 0 13 1

16 Functions

Rating   Name   Duplication   Size   Complexity  
A insert_cts_ind_wz_definitions() 0 42 3
A create_tables() 0 20 1
A data_in_boundaries() 0 32 1
B adjust_ind_pes() 0 119 1
B disagg_households_power() 0 90 6
A adjust_cts_ind_nep() 0 40 2
A match_nuts3_bl() 0 26 1
B write_demandregio_hh_profiles_to_db() 0 63 5
B insert_hh_demand() 0 71 7
C insert_cts_ind() 24 91 8
A insert_cts_ind_demands() 0 58 4
A insert_household_demand() 0 31 4
A timeseries_per_wz() 0 18 4
B insert_society_data() 0 48 5
B insert_timeseries_per_wz() 0 79 3
A get_cached_tables() 0 15 3

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like data.datasets.demandregio often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""The central module containing all code dealing with importing and
2
adjusting data from demandRegio
3
4
"""
5
6
from pathlib import Path
7
import os
8
import zipfile
9
10
from sqlalchemy import ARRAY, Column, Float, ForeignKey, Integer, String
11
from sqlalchemy.ext.declarative import declarative_base
12
import numpy as np
13
import pandas as pd
14
15
from egon.data import db, logger
16
from egon.data.datasets import Dataset, wrapped_partial
17
from egon.data.datasets.demandregio.install_disaggregator import (
18
    clone_and_install,
19
)
20
from egon.data.datasets.scenario_parameters import (
21
    EgonScenario,
22
    get_sector_parameters,
23
)
24
from egon.data.datasets.zensus import download_and_check
25
import egon.data.config
26
import egon.data.datasets.scenario_parameters.parameters as scenario_parameters
27
28
try:
29
    from disaggregator import config, data, spatial, temporal
30
31
except ImportError as e:
32
    pass
33
34
# will be later imported from another file ###
35
Base = declarative_base()
36
37
38
class DemandRegio(Dataset):
39
    def __init__(self, dependencies):
40
        super().__init__(
41
            name="DemandRegio",
42
            version="0.0.10",
43
            dependencies=dependencies,
44
            tasks=(
45
                clone_and_install, # demandregio must be previously installed
46
                get_cached_tables,  # adhoc workaround #180
47
                create_tables,
48
                {
49
                    insert_household_demand,
50
                    insert_society_data,
51
                    insert_cts_ind_demands,
52
                },
53
            ),
54
        )
55
56
57
class DemandRegioLoadProfiles(Base):
58
    __tablename__ = "demandregio_household_load_profiles"
59
    __table_args__ = {"schema": "demand"}
60
61
    id = Column(Integer, primary_key=True)
62
    year = Column(Integer)
63
    nuts3 = Column(String)
64
    load_in_mwh = Column(ARRAY(Float()))
65
66
67
class EgonDemandRegioHH(Base):
68
    __tablename__ = "egon_demandregio_hh"
69
    __table_args__ = {"schema": "demand"}
70
    nuts3 = Column(String(5), primary_key=True)
71
    hh_size = Column(Integer, primary_key=True)
72
    scenario = Column(String, ForeignKey(EgonScenario.name), primary_key=True)
73
    year = Column(Integer)
74
    demand = Column(Float)
75
76
77
class EgonDemandRegioCtsInd(Base):
78
    __tablename__ = "egon_demandregio_cts_ind"
79
    __table_args__ = {"schema": "demand"}
80
    nuts3 = Column(String(5), primary_key=True)
81
    wz = Column(Integer, primary_key=True)
82
    scenario = Column(String, ForeignKey(EgonScenario.name), primary_key=True)
83
    year = Column(Integer)
84
    demand = Column(Float)
85
86
87
class EgonDemandRegioPopulation(Base):
88
    __tablename__ = "egon_demandregio_population"
89
    __table_args__ = {"schema": "society"}
90
    nuts3 = Column(String(5), primary_key=True)
91
    year = Column(Integer, primary_key=True)
92
    population = Column(Float)
93
94
95
class EgonDemandRegioHouseholds(Base):
96
    __tablename__ = "egon_demandregio_household"
97
    __table_args__ = {"schema": "society"}
98
    nuts3 = Column(String(5), primary_key=True)
99
    hh_size = Column(Integer, primary_key=True)
100
    year = Column(Integer, primary_key=True)
101
    households = Column(Integer)
102
103
104
class EgonDemandRegioWz(Base):
105
    __tablename__ = "egon_demandregio_wz"
106
    __table_args__ = {"schema": "demand"}
107
    wz = Column(Integer, primary_key=True)
108
    sector = Column(String(50))
109
    definition = Column(String(150))
110
111
112
class EgonDemandRegioTimeseriesCtsInd(Base):
113
    __tablename__ = "egon_demandregio_timeseries_cts_ind"
114
    __table_args__ = {"schema": "demand"}
115
    wz = Column(Integer, primary_key=True)
116
    year = Column(Integer, primary_key=True)
117
    slp = Column(String(50))
118
    load_curve = Column(ARRAY(Float()))
119
120
121
def create_tables():
122
    """Create tables for demandregio data
123
    Returns
124
    -------
125
    None.
126
    """
127
    db.execute_sql("CREATE SCHEMA IF NOT EXISTS demand;")
128
    db.execute_sql("CREATE SCHEMA IF NOT EXISTS society;")
129
    engine = db.engine()
130
    EgonDemandRegioHH.__table__.create(bind=engine, checkfirst=True)
131
    EgonDemandRegioCtsInd.__table__.create(bind=engine, checkfirst=True)
132
    EgonDemandRegioPopulation.__table__.create(bind=engine, checkfirst=True)
133
    EgonDemandRegioHouseholds.__table__.create(bind=engine, checkfirst=True)
134
    EgonDemandRegioWz.__table__.create(bind=engine, checkfirst=True)
135
    DemandRegioLoadProfiles.__table__.create(bind=db.engine(), checkfirst=True)
136
    EgonDemandRegioTimeseriesCtsInd.__table__.drop(
137
        bind=engine, checkfirst=True
138
    )
139
    EgonDemandRegioTimeseriesCtsInd.__table__.create(
140
        bind=engine, checkfirst=True
141
    )
142
143
144
def data_in_boundaries(df):
145
    """Select rows with nuts3 code within boundaries, used for testmode
146
147
    Parameters
148
    ----------
149
    df : pandas.DataFrame
150
        Data for all nuts3 regions
151
152
    Returns
153
    -------
154
    pandas.DataFrame
155
        Data for nuts3 regions within boundaries
156
157
    """
158
    engine = db.engine()
159
160
    df = df.reset_index()
161
162
    # Change nuts3 region names to 2016 version
163
    nuts_names = {"DEB16": "DEB1C", "DEB19": "DEB1D"}
164
    df.loc[df.nuts3.isin(nuts_names), "nuts3"] = df.loc[
165
        df.nuts3.isin(nuts_names), "nuts3"
166
    ].map(nuts_names)
167
168
    df = df.set_index("nuts3")
169
170
    return df[
171
        df.index.isin(
172
            pd.read_sql(
173
                "SELECT DISTINCT ON (nuts) nuts FROM boundaries.vg250_krs",
174
                engine,
175
            ).nuts
176
        )
177
    ]
178
179
180
def insert_cts_ind_wz_definitions():
181
    """Insert demandregio's definitions of CTS and industrial branches
182
183
    Returns
184
    -------
185
    None.
186
187
    """
188
189
    source = egon.data.config.datasets()["demandregio_cts_ind_demand"][
190
        "sources"
191
    ]
192
193
    target = egon.data.config.datasets()["demandregio_cts_ind_demand"][
194
        "targets"
195
    ]["wz_definitions"]
196
197
    engine = db.engine()
198
199
    for sector in source["wz_definitions"]:
200
        file_path = (
201
            Path(".")
202
            / "data_bundle_egon_data"
203
            / "WZ_definition"
204
            / source["wz_definitions"][sector]
205
        )
206
207
        if sector == "CTS":
208
            delimiter = ";"
209
        else:
210
            delimiter = ","
211
        df = (
212
            pd.read_csv(file_path, delimiter=delimiter, header=None)
213
            .rename({0: "wz", 1: "definition"}, axis="columns")
214
            .set_index("wz")
215
        )
216
        df["sector"] = sector
217
        df.to_sql(
218
            target["table"],
219
            engine,
220
            schema=target["schema"],
221
            if_exists="append",
222
        )
223
224
225
def match_nuts3_bl():
226
    """Function that maps the federal state to each nuts3 region
227
228
    Returns
229
    -------
230
    df : pandas.DataFrame
231
        List of nuts3 regions and the federal state of Germany.
232
233
    """
234
235
    engine = db.engine()
236
237
    df = pd.read_sql(
238
        "SELECT DISTINCT ON (boundaries.vg250_krs.nuts) "
239
        "boundaries.vg250_krs.nuts, boundaries.vg250_lan.gen "
240
        "FROM boundaries.vg250_lan, boundaries.vg250_krs "
241
        " WHERE ST_CONTAINS("
242
        "boundaries.vg250_lan.geometry, "
243
        "boundaries.vg250_krs.geometry)",
244
        con=engine,
245
    )
246
247
    df.gen[df.gen == "Baden-Württemberg (Bodensee)"] = "Baden-Württemberg"
248
    df.gen[df.gen == "Bayern (Bodensee)"] = "Bayern"
249
250
    return df.set_index("nuts")
251
252
253
def adjust_ind_pes(ec_cts_ind):
254
    """
255
    Adjust electricity demand of industrial consumers due to electrification
256
    of process heat based on assumptions of pypsa-eur-sec.
257
258
    Parameters
259
    ----------
260
    ec_cts_ind : pandas.DataFrame
261
        Industrial demand without additional electrification
262
263
    Returns
264
    -------
265
    ec_cts_ind : pandas.DataFrame
266
        Industrial demand with additional electrification
267
268
    """
269
270
    pes_path = (
271
        Path(".") / "data_bundle_powerd_data" / "pypsa_eur" / "resources"
272
    )
273
274
    sources = egon.data.config.datasets()["demandregio_cts_ind_demand"][
275
        "sources"
276
    ]["new_consumers_2050"]
277
278
    # Extract today's industrial demand from pypsa-eur-sec
279
    demand_today = pd.read_csv(
280
        pes_path / sources["pes-demand-today"],
281
        header=None,
282
    ).transpose()
283
284
    # Filter data
285
    demand_today[1].fillna("carrier", inplace=True)
286
    demand_today = demand_today[
287
        (demand_today[0] == "DE") | (demand_today[1] == "carrier")
288
    ].drop([0, 2], axis="columns")
289
290
    demand_today = (
291
        demand_today.transpose()
292
        .set_index(0)
293
        .transpose()
294
        .set_index("carrier")
295
        .transpose()
296
        .loc["electricity"]
297
        .astype(float)
298
    )
299
300
    # Calculate future industrial demand from pypsa-eur-sec
301
    # based on production and energy demands per carrier ('sector ratios')
302
    prod_tomorrow = pd.read_csv(pes_path / sources["pes-production-tomorrow"])
303
304
    prod_tomorrow = prod_tomorrow[prod_tomorrow["kton/a"] == "DE"].set_index(
305
        "kton/a"
306
    )
307
308
    sector_ratio = (
309
        pd.read_csv(pes_path / sources["pes-sector-ratios"])
310
        .set_index("MWh/tMaterial")
311
        .loc["elec"]
312
    )
313
314
    demand_tomorrow = prod_tomorrow.multiply(
315
        sector_ratio.div(1000)
316
    ).transpose()["DE"]
317
318
    # Calculate changes of electrical demand per sector in pypsa-eur-sec
319
    change = pd.DataFrame(
320
        (demand_tomorrow / demand_today)
321
        / (demand_tomorrow / demand_today).sum()
322
    )
323
324
    # Drop rows without changes
325
    change = change[~change[0].isnull()]
326
327
    # Map industrial branches of pypsa-eur-sec to WZ2008 used in demandregio
328
    change["wz"] = change.index.map(
329
        {
330
            "Alumina production": 24,
331
            "Aluminium - primary production": 24,
332
            "Aluminium - secondary production": 24,
333
            "Ammonia": 20,
334
            "Basic chemicals (without ammonia)": 20,
335
            "Cement": 23,
336
            "Ceramics & other NMM": 23,
337
            "Electric arc": 24,
338
            "Food, beverages and tobacco": 10,
339
            "Glass production": 23,
340
            "Integrated steelworks": 24,
341
            "Machinery Equipment": 28,
342
            "Other Industrial Sectors": 32,
343
            "Other chemicals": 20,
344
            "Other non-ferrous metals": 24,
345
            "Paper production": 17,
346
            "Pharmaceutical products etc.": 21,
347
            "Printing and media reproduction": 18,
348
            "Pulp production": 17,
349
            "Textiles and leather": 13,
350
            "Transport Equipment": 29,
351
            "Wood and wood products": 16,
352
        }
353
    )
354
355
    # Group by WZ2008
356
    shares_per_wz = change.groupby("wz")[0].sum()
357
358
    # Calculate addtional demands needed to meet future demand of pypsa-eur-sec
359
    addtional_mwh = shares_per_wz.multiply(
360
        demand_tomorrow.sum() * 1000000 - ec_cts_ind.sum().sum()
361
    )
362
363
    # Calulate overall industrial demand for eGon100RE
364
    final_mwh = addtional_mwh + ec_cts_ind[addtional_mwh.index].sum()
365
366
    # Linear scale the industrial demands per nuts3 and wz to meet final demand
367
    ec_cts_ind[addtional_mwh.index] *= (
368
        final_mwh / ec_cts_ind[addtional_mwh.index].sum()
369
    )
370
371
    return ec_cts_ind
372
373
374
def adjust_cts_ind_nep(ec_cts_ind, sector):
375
    """Add electrical demand of new largescale CTS und industrial consumers
376
    according to NEP 2021, scneario C 2035. Values per federal state are
377
    linear distributed over all CTS branches and nuts3 regions.
378
379
    Parameters
380
    ----------
381
    ec_cts_ind : pandas.DataFrame
382
        CTS or industry demand without new largescale consumers.
383
384
    Returns
385
    -------
386
    ec_cts_ind : pandas.DataFrame
387
        CTS or industry demand including new largescale consumers.
388
389
    """
390
    sources = egon.data.config.datasets()["demandregio_cts_ind_demand"][
391
        "sources"
392
    ]
393
394
    file_path = (
395
        Path(".")
396
        / "data_bundle_egon_data"
397
        / "nep2035_version2021"
398
        / sources["new_consumers_2035"]
399
    )
400
401
    # get data from NEP per federal state
402
    new_con = pd.read_csv(file_path, delimiter=";", decimal=",", index_col=0)
403
404
    # match nuts3 regions to federal states
405
    groups = ec_cts_ind.groupby(match_nuts3_bl().gen)
406
407
    # update demands per federal state
408
    for group in groups.indices.keys():
409
        g = groups.get_group(group)
410
        data_new = g.mul(1 + new_con[sector][group] * 1e6 / g.sum().sum())
411
        ec_cts_ind[ec_cts_ind.index.isin(g.index)] = data_new
412
413
    return ec_cts_ind
414
415
416
def disagg_households_power(
417
    scenario, year, weight_by_income=False, original=False, **kwargs
418
):
419
    """
420
    Perform spatial disaggregation of electric power in [GWh/a] by key and
421
    possibly weight by income.
422
    Similar to disaggregator.spatial.disagg_households_power
423
424
425
    Parameters
426
    ----------
427
    by : str
428
        must be one of ['households', 'population']
429
    weight_by_income : bool, optional
430
        Flag if to weight the results by the regional income (default False)
431
    orignal : bool, optional
432
        Throughput to function households_per_size,
433
        A flag if the results should be left untouched and returned in
434
        original form for the year 2011 (True) or if they should be scaled to
435
        the given `year` by the population in that year (False).
436
437
    Returns
438
    -------
439
    pd.DataFrame or pd.Series
440
    """
441
    # source: survey of energieAgenturNRW
442
    # with/without direct water heating (DHW), and weighted average
443
    # https://1-stromvergleich.com/wp-content/uploads/erhebung_wo_bleibt_der_strom.pdf
444
    demand_per_hh_size = pd.DataFrame(
445
        index=range(1, 7),
446
        data={
447
            # "weighted DWH": [2290, 3202, 4193, 4955, 5928, 5928],
448
            # "without DHW": [1714, 2812, 3704, 4432, 5317, 5317],
449
            "with_DHW": [2181, 3843, 5151, 6189, 7494, 8465],
450
            "without_DHW": [1798, 2850, 3733, 4480, 5311, 5816],
451
            "weighted": [2256, 3248, 4246, 5009, 5969, 6579],
452
        },
453
    )
454
455
    if scenario == "eGon100RE":
456
        # chose demand per household size from survey without DHW
457
        power_per_HH = (
458
            demand_per_hh_size["without_DHW"] / 1e3
459
        )  # TODO why without?
460
461
        # calculate demand per nuts3 in 2011
462
        df_2011 = data.households_per_size(year=2011) * power_per_HH
463
464
        # scale demand per hh-size to meet demand without heat
465
        # according to JRC in 2011 (136.6-(20.14+9.41) TWh)
466
        # TODO check source and method
467
        power_per_HH *= (136.6 - (20.14 + 9.41)) * 1e6 / df_2011.sum().sum()
468
469
        # calculate demand per nuts3 in 2050
470
        df = data.households_per_size(year=year) * power_per_HH
471
472
    # Bottom-Up: Power demand by household sizes in [MWh/a] for each scenario
473
    elif scenario in ["status2019", "status2023", "eGon2021", "eGon2035"]:
474
        # chose demand per household size from survey including weighted DHW
475
        power_per_HH = demand_per_hh_size["weighted"] / 1e3
476
477
        # calculate demand per nuts3
478
        df = (
479
            data.households_per_size(original=original, year=year)
480
            * power_per_HH
481
        )
482
483
        if scenario == "eGon2035":
484
            # scale to fit demand of NEP 2021 scebario C 2035 (119TWh)
485
            df *= 119 * 1e6 / df.sum().sum()
486
487
        if scenario == "status2023":
488
            # scale to fit demand of BDEW 2023 (130.48 TWh) see issue #180
489
            df *= 130.48 * 1e6 / df.sum().sum()
490
491
        # if scenario == "status2021": # TODO status2021
492
        #     # scale to fit demand of AGEB 2021 (138.6 TWh)
493
        #     # https://ag-energiebilanzen.de/wp-content/uploads/2023/01/AGEB_22p2_rev-1.pdf#page=10
494
        #     df *= 138.6 * 1e6 / df.sum().sum()
495
496
    else:
497
        print(
498
            f"Electric demand per household size for scenario {scenario} "
499
            "is not specified."
500
        )
501
502
    if weight_by_income:
503
        df = spatial.adjust_by_income(df=df)
0 ignored issues
show
introduced by
The variable df does not seem to be defined for all execution paths.
Loading history...
504
505
    return df
506
507
508
def write_demandregio_hh_profiles_to_db(hh_profiles):
509
    """Write HH demand profiles from demand regio into db. One row per
510
    year and nuts3. The annual load profile timeseries is an array.
511
512
    schema: demand
513
    tablename: demandregio_household_load_profiles
514
515
516
517
    Parameters
518
    ----------
519
    hh_profiles: pd.DataFrame
520
521
    Returns
522
    -------
523
    """
524
    years = hh_profiles.index.year.unique().values
525
    df_to_db = pd.DataFrame(
526
        columns=["id", "year", "nuts3", "load_in_mwh"]
527
    ).set_index("id")
528
    dataset = egon.data.config.settings()["egon-data"]["--dataset-boundary"]
529
530
    if dataset == "Schleswig-Holstein":
531
        hh_profiles = hh_profiles.loc[
532
            :, hh_profiles.columns.str.contains("DEF0")
533
        ]
534
535
    id = pd.read_sql_query(
536
        f"""
537
                           SELECT MAX(id)
538
                           FROM {DemandRegioLoadProfiles.__table__.schema}.
539
                           {DemandRegioLoadProfiles.__table__.name}
540
                           """,
541
        con=db.engine(),
542
    ).iat[0, 0]
543
544
    if id is None:
545
        id = 0
546
    else:
547
        id = id + 1
548
549
    for year in years:
550
        df = hh_profiles[hh_profiles.index.year == year]
551
        for nuts3 in hh_profiles.columns:
552
            id += 1
553
            df_to_db.at[id, "year"] = year
554
            df_to_db.at[id, "nuts3"] = nuts3
555
            df_to_db.at[id, "load_in_mwh"] = df[nuts3].to_list()
556
557
    df_to_db["year"] = df_to_db["year"].apply(int)
558
    df_to_db["nuts3"] = df_to_db["nuts3"].astype(str)
559
    df_to_db["load_in_mwh"] = df_to_db["load_in_mwh"].apply(list)
560
    df_to_db = df_to_db.reset_index()
561
562
    df_to_db.to_sql(
563
        name=DemandRegioLoadProfiles.__table__.name,
564
        schema=DemandRegioLoadProfiles.__table__.schema,
565
        con=db.engine(),
566
        if_exists="append",
567
        index=-False,
568
    )
569
570
    return
571
572
573
def insert_hh_demand(scenario, year, engine):
574
    """Calculates electrical demands of private households using demandregio's
575
    disaggregator and insert results into the database.
576
577
    Parameters
578
    ----------
579
    scenario : str
580
        Name of the corresponding scenario.
581
    year : int
582
        The number of households per region is taken from this year.
583
584
    Returns
585
    -------
586
    None.
587
588
    """
589
    targets = egon.data.config.datasets()["demandregio_household_demand"][
590
        "targets"
591
    ]["household_demand"]
592
    # get demands of private households per nuts and size from demandregio
593
    ec_hh = disagg_households_power(scenario, year)
594
595
    # Select demands for nuts3-regions in boundaries (needed for testmode)
596
    ec_hh = data_in_boundaries(ec_hh)
597
598
    # insert into database
599
    for hh_size in ec_hh.columns:
600
        df = pd.DataFrame(ec_hh[hh_size])
601
        df["year"] = 2023 if scenario == "status2023" else year # TODO status2023
602
        # adhoc fix until ffeopendata servers are up and population_year can be set
603
604
        df["scenario"] = scenario
605
        df["hh_size"] = hh_size
606
        df = df.rename({hh_size: "demand"}, axis="columns")
607
        df.to_sql(
608
            targets["table"],
609
            engine,
610
            schema=targets["schema"],
611
            if_exists="append",
612
        )
613
614
    # insert housholds demand timeseries
615
    try:
616
        hh_load_timeseries = (
617
            temporal.disagg_temporal_power_housholds_slp(
618
                use_nuts3code=True,
619
                by="households",
620
                weight_by_income=False,
621
                year=year,
622
            )
623
            .resample("h")
624
            .sum()
625
        )
626
        hh_load_timeseries.rename(
627
            columns={"DEB16": "DEB1C", "DEB19": "DEB1D"}, inplace=True)
628
    except Exception as e:
629
        logger.warning(f"Couldnt get profiles from FFE, will use pickeld fallback! \n {e}")
630
        hh_load_timeseries = pd.read_pickle(Path(".", "df_load_profiles.pkl").resolve())
631
632
        def change_year(dt, year):
633
            return dt.replace(year=year)
634
635
        year = 2023 if scenario == "status2023" else year  # TODO status2023
636
        hh_load_timeseries.index = hh_load_timeseries.index.map(lambda dt: change_year(dt, year))
0 ignored issues
show
introduced by
The variable change_year does not seem to be defined for all execution paths.
Loading history...
637
638
        if scenario == "status2023":
639
            hh_load_timeseries = hh_load_timeseries.shift(24 * 2)
640
641
            hh_load_timeseries.iloc[:24 * 7] = hh_load_timeseries.iloc[24 * 7:24 * 7 * 2].values
642
643
    write_demandregio_hh_profiles_to_db(hh_load_timeseries)
644
645
646
def insert_cts_ind(scenario, year, engine, target_values):
647
    """Calculates electrical demands of CTS and industry using demandregio's
648
    disaggregator, adjusts them according to resulting values of NEP 2021 or
649
    JRC IDEES and insert results into the database.
650
651
    Parameters
652
    ----------
653
    scenario : str
654
        Name of the corresponing scenario.
655
    year : int
656
        The number of households per region is taken from this year.
657
    target_values : dict
658
        List of target values for each scenario and sector.
659
660
    Returns
661
    -------
662
    None.
663
664
    """
665
    targets = egon.data.config.datasets()["demandregio_cts_ind_demand"][
666
        "targets"
667
    ]
668
669
    # Workaround: Since the disaggregator does not work anymore, data from
670
    # previous runs is used for eGon2035 and eGon100RE
671 View Code Duplication
    if scenario == "eGon2035":
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
672
        ec_cts_ind2 = pd.read_csv(
673
            "data_bundle_powerd_data/egon_demandregio_cts_ind_egon2035.csv"
674
        )
675
        ec_cts_ind2.to_sql(
676
            targets["cts_ind_demand"]["table"],
677
            engine,
678
            targets["cts_ind_demand"]["schema"],
679
            if_exists="append",
680
            index=False,
681
        )
682
        return
683
684 View Code Duplication
    if scenario == "eGon100RE":
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
685
        ec_cts_ind2 = pd.read_csv(
686
            "data_bundle_powerd_data/egon_demandregio_cts_ind.csv"
687
        )
688
        ec_cts_ind2.to_sql(
689
            targets["cts_ind_demand"]["table"],
690
            engine,
691
            targets["cts_ind_demand"]["schema"],
692
            if_exists="append",
693
            index=False,
694
        )
695
        return
696
697
    for sector in ["CTS", "industry"]:
698
        # get demands per nuts3 and wz of demandregio
699
        ec_cts_ind = spatial.disagg_CTS_industry(
700
            use_nuts3code=True, source="power", sector=sector, year=year
701
        ).transpose()
702
703
        ec_cts_ind.index = ec_cts_ind.index.rename("nuts3")
704
705
        # exclude mobility sector from GHD
706
        ec_cts_ind = ec_cts_ind.drop(columns=49, errors="ignore")
707
708
        # scale values according to target_values
709
        if sector in target_values[scenario].keys():
710
            ec_cts_ind *= (
711
                target_values[scenario][sector] / ec_cts_ind.sum().sum()
712
            )
713
714
        # include new largescale consumers according to NEP 2021
715
        if scenario == "eGon2035":
716
            ec_cts_ind = adjust_cts_ind_nep(ec_cts_ind, sector)
717
        # include new industrial demands due to sector coupling
718
        if (scenario == "eGon100RE") & (sector == "industry"):
719
            ec_cts_ind = adjust_ind_pes(ec_cts_ind)
720
721
        # Select demands for nuts3-regions in boundaries (needed for testmode)
722
        ec_cts_ind = data_in_boundaries(ec_cts_ind)
723
724
        # insert into database
725
        for wz in ec_cts_ind.columns:
726
            df = pd.DataFrame(ec_cts_ind[wz])
727
            df["year"] = year
728
            df["wz"] = wz
729
            df["scenario"] = scenario
730
            df = df.rename({wz: "demand"}, axis="columns")
731
            df.index = df.index.rename("nuts3")
732
            df.to_sql(
733
                targets["cts_ind_demand"]["table"],
734
                engine,
735
                targets["cts_ind_demand"]["schema"],
736
                if_exists="append",
737
            )
738
739
740
def insert_household_demand():
741
    """Insert electrical demands for households according to
742
    demandregio using its disaggregator-tool in MWh
743
744
    Returns
745
    -------
746
    None.
747
748
    """
749
    targets = egon.data.config.datasets()["demandregio_household_demand"][
750
        "targets"
751
    ]
752
    engine = db.engine()
753
754
    scenarios = egon.data.config.settings()["egon-data"]["--scenarios"]
755
756
    scenarios.append("eGon2021")
757
758
    for t in targets:
759
        db.execute_sql(
760
            f"DELETE FROM {targets[t]['schema']}.{targets[t]['table']};"
761
        )
762
763
    for scn in scenarios:
764
        year = (
765
            2023 if scn == "status2023"
766
            else scenario_parameters.global_settings(scn)["population_year"]
767
        )
768
769
        # Insert demands of private households
770
        insert_hh_demand(scn, year, engine)
771
772
773
def insert_cts_ind_demands():
774
    """Insert electricity demands per nuts3-region in Germany according to
775
    demandregio using its disaggregator-tool in MWh
776
777
    Returns
778
    -------
779
    None.
780
781
    """
782
    targets = egon.data.config.datasets()["demandregio_cts_ind_demand"][
783
        "targets"
784
    ]
785
    engine = db.engine()
786
787
    for t in targets:
788
        db.execute_sql(
789
            f"DELETE FROM {targets[t]['schema']}.{targets[t]['table']};"
790
        )
791
792
    insert_cts_ind_wz_definitions()
793
794
    scenarios = egon.data.config.settings()["egon-data"]["--scenarios"]
795
796
    scenarios.append("eGon2021")
797
798
    for scn in scenarios:
799
        year = scenario_parameters.global_settings(scn)["population_year"]
800
801
        if year > 2035:
802
            year = 2035
803
804
        # target values per scenario in MWh
805
        target_values = {
806
            # according to NEP 2021
807
            # new consumers will be added seperatly
808
            "eGon2035": {
809
                "CTS": 135300 * 1e3,
810
                "industry": 225400 * 1e3
811
            },
812
            # CTS: reduce overall demand from demandregio (without traffic)
813
            # by share of heat according to JRC IDEES, data from 2011
814
            # industry: no specific heat demand, use data from demandregio
815
            "eGon100RE": {
816
                "CTS": ((1 - (5.96 + 6.13) / 154.64) * 125183.403) * 1e3
817
            },
818
            # no adjustments for status quo
819
            "eGon2021": {},
820
            "status2019": {},
821
            "status2023": {
822
                "CTS": 121160 * 1e3,
823
                "industry": 200380 * 1e3
824
            },
825
        }
826
827
        insert_cts_ind(scn, year, engine, target_values)
828
829
    # Insert load curves per wz
830
    timeseries_per_wz()
831
832
833
def insert_society_data():
834
    """Insert population and number of households per nuts3-region in Germany
835
    according to demandregio using its disaggregator-tool
836
837
    Returns
838
    -------
839
    None.
840
841
    """
842
    targets = egon.data.config.datasets()["demandregio_society"]["targets"]
843
    engine = db.engine()
844
845
    for t in targets:
846
        db.execute_sql(
847
            f"DELETE FROM {targets[t]['schema']}.{targets[t]['table']};"
848
        )
849
850
    target_years = np.append(
851
        get_sector_parameters("global").population_year.values, 2018
852
    )
853
854
    for year in target_years:
855
        df_pop = pd.DataFrame(data.population(year=year))
856
        df_pop["year"] = year
857
        df_pop = df_pop.rename({"value": "population"}, axis="columns")
858
        # Select data for nuts3-regions in boundaries (needed for testmode)
859
        df_pop = data_in_boundaries(df_pop)
860
        df_pop.to_sql(
861
            targets["population"]["table"],
862
            engine,
863
            schema=targets["population"]["schema"],
864
            if_exists="append",
865
        )
866
867
    for year in target_years:
868
        df_hh = pd.DataFrame(data.households_per_size(year=year))
869
        # Select data for nuts3-regions in boundaries (needed for testmode)
870
        df_hh = data_in_boundaries(df_hh)
871
        for hh_size in df_hh.columns:
872
            df = pd.DataFrame(df_hh[hh_size])
873
            df["year"] = year
874
            df["hh_size"] = hh_size
875
            df = df.rename({hh_size: "households"}, axis="columns")
876
            df.to_sql(
877
                targets["household"]["table"],
878
                engine,
879
                schema=targets["household"]["schema"],
880
                if_exists="append",
881
            )
882
883
884
def insert_timeseries_per_wz(sector, year):
885
    """Insert normalized electrical load time series for the selected sector
886
887
    Parameters
888
    ----------
889
    sector : str
890
        Name of the sector. ['CTS', 'industry']
891
    year : int
892
        Selected weather year
893
894
    Returns
895
    -------
896
    None.
897
898
    """
899
    targets = egon.data.config.datasets()["demandregio_cts_ind_demand"][
900
        "targets"
901
    ]
902
903
    if sector == "CTS":
904
        profiles = (
905
            data.CTS_power_slp_generator("SH", year=year)
906
            .drop(
907
                [
908
                    "Day",
909
                    "Hour",
910
                    "DayOfYear",
911
                    "WD",
912
                    "SA",
913
                    "SU",
914
                    "WIZ",
915
                    "SOZ",
916
                    "UEZ",
917
                ],
918
                axis="columns",
919
            )
920
            .resample("H")
921
            .sum()
922
        )
923
        wz_slp = config.slp_branch_cts_power()
924
    elif sector == "industry":
925
        profiles = (
926
            data.shift_load_profile_generator(state="SH", year=year)
927
            .resample("H")
928
            .sum()
929
        )
930
        wz_slp = config.shift_profile_industry()
931
932
    else:
933
        print(f"Sector {sector} is not valid.")
934
935
    df = pd.DataFrame(
936
        index=wz_slp.keys(), columns=["slp", "load_curve", "year"]
0 ignored issues
show
introduced by
The variable wz_slp does not seem to be defined for all execution paths.
Loading history...
937
    )
938
939
    df.index.rename("wz", inplace=True)
940
941
    df.slp = wz_slp.values()
942
943
    df.year = year
944
945
    df.load_curve = profiles[df.slp].transpose().values.tolist()
0 ignored issues
show
introduced by
The variable profiles does not seem to be defined for all execution paths.
Loading history...
946
947
    db.execute_sql(
948
        f"""
949
                   DELETE FROM {targets['timeseries_cts_ind']['schema']}.
950
                   {targets['timeseries_cts_ind']['table']}
951
                   WHERE wz IN (
952
                       SELECT wz FROM {targets['wz_definitions']['schema']}.
953
                       {targets['wz_definitions']['table']}
954
                       WHERE sector = '{sector}')
955
                   """
956
    )
957
958
    df.to_sql(
959
        targets["timeseries_cts_ind"]["table"],
960
        schema=targets["timeseries_cts_ind"]["schema"],
961
        con=db.engine(),
962
        if_exists="append",
963
    )
964
965
966
def timeseries_per_wz():
967
    """Calcultae and insert normalized timeseries per wz for cts and industry
968
969
    Returns
970
    -------
971
    None.
972
973
    """
974
975
    scenarios = egon.data.config.settings()["egon-data"]["--scenarios"]
976
    year_already_in_database = []
977
    for scn in scenarios:
978
        year = int(scenario_parameters.global_settings(scn)["weather_year"])
979
980
        for sector in ["CTS", "industry"]:
981
            if not year in year_already_in_database:
982
                insert_timeseries_per_wz(sector, int(year))
983
        year_already_in_database.append(year)
984
985
986
def get_cached_tables():
987
    """Get cached demandregio tables and db-dump from former runs"""
988
    data_config = egon.data.config.datasets()
989
    for s in ["cache", "dbdump"]:
990
        url = data_config["demandregio_workaround"]["source"][s]["url"]
991
        target_path = data_config["demandregio_workaround"]["targets"][s][
992
            "path"
993
        ]
994
        filename = os.path.basename(url)
995
        file_path = Path(".", target_path, filename).resolve()
996
        os.makedirs(file_path.parent, exist_ok=True)
997
        logger.info(f"Downloading: {filename} from {url}.")
998
        download_and_check(url, file_path, max_iteration=5)
999
        with zipfile.ZipFile(file_path, "r") as zip_ref:
1000
            zip_ref.extractall(file_path.parent)
1001
1002