Passed
Pull Request — dev (#1181)
by
unknown
05:34
created

get_annual_household_el_demand_cells()   C

Complexity

Conditions 10

Size

Total Lines 111
Code Lines 68

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 68
dl 0
loc 111
rs 5.2472
c 0
b 0
f 0
cc 10
nop 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex functions like data.datasets.electricity_demand.get_annual_household_el_demand_cells() often do a lot of different things. To break such a function down, we need to identify a cohesive component within it. A common approach to find such a component is to look for variables or statements that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""The central module containing all code dealing with processing
2
 data from demandRegio
3
4
"""
5
from sqlalchemy import Column, Float, ForeignKey, Integer, String
6
from sqlalchemy.ext.declarative import declarative_base
7
import pandas as pd
8
9
from egon.data import db
10
from egon.data.datasets import Dataset
11
from egon.data.datasets.electricity_demand.temporal import insert_cts_load
12
from egon.data.datasets.electricity_demand_timeseries.hh_buildings import (
13
    HouseholdElectricityProfilesOfBuildings,
14
    get_iee_hh_demand_profiles_raw,
15
)
16
from egon.data.datasets.electricity_demand_timeseries.hh_profiles import (
17
    HouseholdElectricityProfilesInCensusCells,
18
)
19
from egon.data.datasets.zensus_vg250 import DestatisZensusPopulationPerHa
20
import egon.data.config
21
22
# NOTE: ``Base`` is planned to be imported from another file later.
# Declarative base class for the ORM table classes defined in this module.
23
Base = declarative_base()
24
# Engine created once at import time.
# NOTE(review): the functions visible in this file call ``db.engine()``
# themselves rather than using this module-level name - confirm whether it
# is still needed.
engine = db.engine()
25
26
27
class HouseholdElectricityDemand(Dataset):
    """Dataset bundling the household electricity demand tasks.

    Registers two tasks, in this order: ``create_tables`` and
    ``get_annual_household_el_demand_cells``.
    """

    def __init__(self, dependencies):
        task_sequence = (create_tables, get_annual_household_el_demand_cells)
        super().__init__(
            name="HouseholdElectricityDemand",
            version="0.0.5",
            dependencies=dependencies,
            tasks=task_sequence,
        )
35
36
37
class CtsElectricityDemand(Dataset):
    """Dataset bundling the CTS electricity demand tasks.

    Registers two tasks, in this order: ``distribute_cts_demands`` and
    ``insert_cts_load``.
    """

    def __init__(self, dependencies):
        task_sequence = (distribute_cts_demands, insert_cts_load)
        super().__init__(
            name="CtsElectricityDemand",
            version="0.0.2",
            dependencies=dependencies,
            tasks=task_sequence,
        )
45
46
47
class EgonDemandRegioZensusElectricity(Base):
    """ORM mapping for table ``demand.egon_demandregio_zensus_electricity``.

    The primary key is the combination of ``zensus_population_id``,
    ``scenario`` and ``sector``; ``demand`` holds the value for that key.
    """

    __tablename__ = "egon_demandregio_zensus_electricity"
    __table_args__ = dict(schema="demand", extend_existing=True)

    # Composite primary key; the cell id references
    # ``DestatisZensusPopulationPerHa.id``.
    zensus_population_id = Column(
        Integer,
        ForeignKey(DestatisZensusPopulationPerHa.id),
        primary_key=True,
    )
    scenario = Column(String(50), primary_key=True)
    sector = Column(String, primary_key=True)

    # Demand value stored per (cell, scenario, sector).
    demand = Column(Float)
56
57
58
def create_tables():
    """Create schemas and recreate the zensus electricity demand table.

    Ensures the ``demand`` and ``society`` schemas exist, then drops the
    table mapped by ``EgonDemandRegioZensusElectricity`` (if present) and
    creates it again, so the table is empty afterwards.

    Returns
    -------
    None.
    """
    for statement in (
        "CREATE SCHEMA IF NOT EXISTS demand;",
        "CREATE SCHEMA IF NOT EXISTS society;",
    ):
        db.execute_sql(statement)

    connection = db.engine()
    table = EgonDemandRegioZensusElectricity.__table__

    # Drop first so the table is always rebuilt from the current mapping.
    table.drop(bind=connection, checkfirst=True)
    table.create(bind=connection, checkfirst=True)
73
74
75
def _grouping_column_for(dataset):
    """Return the column used to chunk the aggregation for *dataset*.

    Raises
    ------
    ValueError
        If *dataset* is not a known dataset boundary.
    """
    grouping_columns = {
        "Everything": "nuts3",
        "Schleswig-Holstein": "cell_id",
    }
    if dataset not in grouping_columns:
        raise ValueError(f"'{dataset}' is not a valid dataset boundary.")
    return grouping_columns[dataset]


def get_annual_household_el_demand_cells():
    """
    Determine the annual household electricity demand per census cell.

    For every building profile the raw demand profile is summed up over
    all time steps and multiplied with the scenario-specific factor of
    the census cell (``factor_2019``, ``factor_2023``, ``factor_2035``,
    ``factor_2050``). The results are summed per ``zensus_population_id``,
    divided by 1e6 and written with sector ``'residential'`` to the table
    mapped by ``EgonDemandRegioZensusElectricity``. Existing residential
    rows in that table are deleted beforehand.

    Note
    ----------
    In test-mode 'SH' the iteration takes place by 'cell_id' to avoid
    intensive RAM usage. For whole Germany 'nuts3' are taken and
    RAM > 32GB is necessary.

    Raises
    ------
    ValueError
        If the configured dataset boundary is neither 'Everything' nor
        'Schleswig-Holstein'.
    """
    settings = egon.data.config.settings()["egon-data"]
    dataset = settings["--dataset-boundary"]
    scenarios = settings["--scenarios"]

    # Validate the configuration before any expensive read is started.
    iterate_over = _grouping_column_for(dataset)

    with db.session_scope() as session:
        # Only the statement is built here; pandas executes it below via
        # the bind of the query's session.
        cells_query = (
            session.query(
                HouseholdElectricityProfilesOfBuildings,
                HouseholdElectricityProfilesInCensusCells.nuts3,
                HouseholdElectricityProfilesInCensusCells.factor_2019,
                HouseholdElectricityProfilesInCensusCells.factor_2023,
                HouseholdElectricityProfilesInCensusCells.factor_2035,
                HouseholdElectricityProfilesInCensusCells.factor_2050,
            )
            .filter(
                HouseholdElectricityProfilesOfBuildings.cell_id
                == HouseholdElectricityProfilesInCensusCells.cell_id
            )
            .order_by(HouseholdElectricityProfilesOfBuildings.id)
        )

    df_buildings_and_profiles = pd.read_sql(
        cells_query.statement, cells_query.session.bind, index_col="id"
    )

    # Read demand profiles from egon-data-bundle
    df_profiles = get_iee_hh_demand_profiles_raw()

    # Scenario name -> column holding the scaling factor of that scenario.
    factor_columns = {
        "eGon2035": "factor_2035",
        "eGon100RE": "factor_2050",
        "status2019": "factor_2019",
        "status2023": "factor_2023",
    }
    active_factors = {
        scenario: column
        for scenario, column in factor_columns.items()
        if scenario in scenarios
    }
    result_columns = scenarios + ["zensus_population_id"]

    # Collect the per-group frames and concatenate once at the end;
    # concatenating inside the loop would copy the growing frame on every
    # iteration. The leading empty frame keeps the column layout even if
    # there is no group at all.
    frames = [pd.DataFrame(columns=result_columns)]

    for _, df in df_buildings_and_profiles.groupby(by=iterate_over):
        # Sum of each selected profile over all time steps; identical for
        # every scenario, so it is computed only once per group.
        profile_sums = df_profiles.loc[:, df["profile_id"]].sum(axis=0)

        df_annual_demand_iter = pd.DataFrame(columns=result_columns)
        for scenario, factor_column in active_factors.items():
            df_annual_demand_iter[scenario] = (
                profile_sums * df[factor_column].values
            )
        df_annual_demand_iter["zensus_population_id"] = df["cell_id"].values
        frames.append(df_annual_demand_iter)

    df_annual_demand = pd.concat(frames)

    df_annual_demand = (
        df_annual_demand.groupby("zensus_population_id").sum().reset_index()
    )
    df_annual_demand["sector"] = "residential"
    # Long format: one row per (cell, sector, scenario).
    df_annual_demand = df_annual_demand.melt(
        id_vars=["zensus_population_id", "sector"],
        var_name="scenario",
        value_name="demand",
    )
    # convert from Wh to MWh
    df_annual_demand["demand"] = df_annual_demand["demand"] / 1e6

    # delete all cells for residentials
    with db.session_scope() as session:
        session.query(EgonDemandRegioZensusElectricity).filter(
            EgonDemandRegioZensusElectricity.sector == "residential"
        ).delete()

    # Insert data to target table
    df_annual_demand.to_sql(
        name=EgonDemandRegioZensusElectricity.__table__.name,
        schema=EgonDemandRegioZensusElectricity.__table__.schema,
        con=db.engine(),
        index=False,
        if_exists="append",
    )
187
188
189
def distribute_cts_demands():
    """Distribute electrical demands for cts to zensus cells.

    For every configured scenario the summed demandregio demand of each
    nuts3 region is split among the zensus cells of that region,
    proportionally to each cell's share of the region's cts heat demand.
    Existing rows with sector 'service' are removed from the target table
    before the new rows are appended.

    Returns
    -------
    None.

    """
    cts_config = egon.data.config.datasets()["electrical_demands_cts"]
    sources = cts_config["sources"]
    target = cts_config["targets"]["cts_demands_zensus"]

    db.execute_sql(
        f"""DELETE FROM {target['schema']}.{target['table']}
                   WHERE sector = 'service'"""
    )

    # Mapping from zensus cell to the nuts3 region of vg250
    cell_to_nuts3 = db.select_dataframe(
        f"""SELECT zensus_population_id, vg250_nuts3 as nuts3 FROM
        {sources['map_zensus_vg250']['schema']}.
        {sources['map_zensus_vg250']['table']}""",
        index_col="zensus_population_id",
    )

    scenario_names = egon.data.config.settings()["egon-data"]["--scenarios"]

    # One insert per scenario
    for scn in scenario_names:
        # Heat demand of the service sector per zensus cell
        cells = db.select_dataframe(
            f"""SELECT zensus_population_id, demand as heat_demand,
            sector, scenario FROM
            {sources['heat_demand_cts']['schema']}.
            {sources['heat_demand_cts']['table']}
            WHERE scenario = '{scn}'
            AND sector = 'service'""",
            index_col="zensus_population_id",
        )

        # Attach the nuts3 key (aligned on the zensus cell index)
        cells["nuts3"] = cell_to_nuts3["nuts3"]

        # Each cell's fraction of the heat demand of its nuts3 region
        for _, region_cells in cells.groupby("nuts3"):
            heat = region_cells["heat_demand"]
            cells.loc[region_cells.index, "share"] = heat / heat.sum()

        # Forecasted electrical CTS demand per nuts3 from demandregio
        regional_demand = db.select_dataframe(
            f"""SELECT nuts3, SUM(demand) as demand FROM
            {sources['demandregio']['schema']}.
            {sources['demandregio']['table']}
            WHERE scenario = '{scn}'
            AND wz IN (
                SELECT wz FROM
                {sources['demandregio_wz']['schema']}.
                {sources['demandregio_wz']['table']}
                WHERE sector = 'CTS')
            GROUP BY nuts3""",
            index_col="nuts3",
        )

        # Regional demand looked up per cell, scaled by the cell's share
        demand_of_region = regional_demand["demand"][cells["nuts3"]].values
        cells["demand"] = cells["share"] * demand_of_region

        # Make sure the index is written as 'zensus_population_id'
        cells.index = cells.index.rename("zensus_population_id")

        # Append the result columns to the target table
        export = cells[["scenario", "demand", "sector"]]
        export.to_sql(
            name=target["table"],
            schema=target["schema"],
            con=db.engine(),
            if_exists="append",
        )
272