Passed
Pull Request — dev (#1047)
created by unknown, 01:27

data.datasets.industry.temporal   A

Complexity

Total Complexity 11

Size/Duplication

Total Lines 443
Duplicated Lines 25.73 %

Importance

Changes 0
Metric Value
wmc 11
eloc 188
dl 114
loc 443
rs 10
c 0
b 0
f 0

6 Functions

Rating   Name   Duplication   Size   Complexity  
A identify_voltage_level() 0 30 1
B calc_load_curves_ind_osm() 0 93 3
A insert_sites_ind_load() 58 62 2
B calc_load_curves_ind_sites() 0 97 2
A insert_osm_ind_load() 56 60 2
A identify_bus() 0 71 1

How to fix: Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to restructure code once it is duplicated in three or more places.

In this module the duplication sits in insert_osm_ind_load() and insert_sites_ind_load(), which repeat the same delete-and-insert loop; extracting that shared logic into a single helper would resolve it, as sketched below.
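A minimal sketch of such a helper, assuming it lives in this module next to the existing identify_voltage_level and calc_load_curves_ind_* functions; the name _insert_ind_load and its parameter list are illustrative only, not part of the module:

import numpy as np

import egon.data.config
from egon.data import db


def _insert_ind_load(calc_func, load_key, individual_key, index_name):
    """Hypothetical shared insert routine for industrial load curves.

    The parameters are assumptions for illustration: calc_func is one of the
    module's calc_load_curves_ind_* functions, load_key and individual_key
    are keys in the dataset's 'targets' config, and index_name matches the
    index returned by calc_func.
    """
    targets = egon.data.config.datasets()["electrical_load_curves_industry"][
        "targets"
    ]

    for scenario in ["eGon2035", "eGon100RE"]:
        # Delete existing data for this scenario from both target tables
        for key in (load_key, individual_key):
            db.execute_sql(
                f"""
                DELETE FROM
                {targets[key]['schema']}.{targets[key]['table']}
                WHERE scn_name = '{scenario}'
                """
            )

        # Calculate load curves and write the aggregated table
        data, curves_individual = calc_func(scenario)
        data.index = data.index.rename(index_name)
        data["scn_name"] = scenario
        data.set_index(["scn_name"], inplace=True, append=True)
        data.to_sql(
            targets[load_key]["table"],
            schema=targets[load_key]["schema"],
            con=db.engine(),
            if_exists="append",
        )

        # Derive peak load and annual demand, then write the individual table
        p_set = np.array(curves_individual["p_set"].values.tolist())
        curves_individual["peak_load"] = p_set.max(axis=1)
        curves_individual["demand"] = p_set.sum(axis=1)
        curves_individual = identify_voltage_level(curves_individual)
        curves_individual.to_sql(
            targets[individual_key]["table"],
            schema=targets[individual_key]["schema"],
            con=db.engine(),
            if_exists="append",
        )


def insert_osm_ind_load():
    _insert_ind_load(
        calc_load_curves_ind_osm, "osm_load", "osm_load_individual", "bus"
    )


def insert_sites_ind_load():
    _insert_ind_load(
        calc_load_curves_ind_sites,
        "sites_load",
        "sites_load_individual",
        ["bus", "wz"],
    )

The only behavioural differences between the two original functions are the target-table keys and the index names, which is why they fit a single parameterised helper and why the 56 to 58 duplicated lines flagged above would disappear.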

"""The central module containing all code dealing with processing
timeseries data using demandregio

"""

from sqlalchemy import ARRAY, Column, Float, Integer, String
from sqlalchemy.ext.declarative import declarative_base
import geopandas as gpd
import numpy as np
import pandas as pd

from egon.data import db
from egon.data.datasets.electricity_demand.temporal import calc_load_curve
import egon.data.config

Base = declarative_base()


def identify_voltage_level(df):

    """Identify the voltage_level of a grid component based on its peak load and
    defined thresholds.


    Parameters
    ----------
    df : pandas.DataFrame
        Data frame containing information about peak loads


    Returns
    -------
    pandas.DataFrame
        Data frame with an additional column with voltage level

    """

    df["voltage_level"] = np.nan

    # Identify voltage_level for every demand area taking thresholds into account which were defined in the eGon project
    df.loc[df["peak_load"] <= 0.1, "voltage_level"] = 7
    df.loc[df["peak_load"] > 0.1, "voltage_level"] = 6
    df.loc[df["peak_load"] > 0.2, "voltage_level"] = 5
    df.loc[df["peak_load"] > 5.5, "voltage_level"] = 4
    df.loc[df["peak_load"] > 20, "voltage_level"] = 3
    df.loc[df["peak_load"] > 120, "voltage_level"] = 1

    return df

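A hedged usage sketch (not part of the module): the thresholds above map peak loads, assumed to be in MW, to voltage levels from 7 down to 1 in the eGo convention (roughly low voltage down to extra-high voltage; note that level 2 is never assigned). The example relies only on pandas and the function defined above; the input values are made up.

import pandas as pd

# Toy input: hypothetical peak loads in MW for six demand areas
example = pd.DataFrame({"peak_load": [0.05, 0.15, 3.0, 10.0, 50.0, 200.0]})
example = identify_voltage_level(example)

print(example["voltage_level"].tolist())
# [7.0, 6.0, 5.0, 4.0, 3.0, 1.0]
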
def identify_bus(load_curves, demand_area):
    """Identify the grid connection point for a consumer by determining its grid level
    based on the time series' peak load and the spatial intersection to mv
    grid districts or ehv voronoi cells.


    Parameters
    ----------
    load_curves : pandas.DataFrame
        Demand timeseries per demand area (e.g. osm landuse area, industrial site)

    demand_area: pandas.DataFrame
        Dataframe with id and geometry of areas where an industrial demand
        is assigned to, such as osm landuse areas or industrial sites.

    Returns
    -------
    pandas.DataFrame
        Aggregated industrial demand timeseries per bus

    """

    sources = egon.data.config.datasets()["electrical_load_curves_industry"][
        "sources"
    ]

    # Select mv griddistrict
    griddistrict = db.select_geodataframe(
        f"""SELECT bus_id, geom FROM
                {sources['egon_mv_grid_district']['schema']}.
                {sources['egon_mv_grid_district']['table']}""",
        geom_col="geom",
        epsg=3035,
    )

    # Initialize dataframe to identify peak load per demand area (e.g. osm landuse area or industrial site)
    peak = pd.DataFrame(columns=["id", "peak_load"])
    peak["id"] = load_curves.max(axis=0).index
    peak["peak_load"] = load_curves.max(axis=0).values

    peak = identify_voltage_level(peak)

    # Assign bus_id to demand area by merging landuse and peak df
    peak = pd.merge(demand_area, peak, right_on="id", left_index=True)

    # Identify all demand areas connected to HVMV buses
    peak_hv = peak[peak["voltage_level"] > 1]

    # Perform a spatial join between the centroid of the demand area and mv grid districts to identify grid connection point
    peak_hv["centroid"] = peak_hv["geom"].centroid
    peak_hv = peak_hv.set_geometry("centroid")
    peak_hv_c = gpd.sjoin(peak_hv, griddistrict, how="inner", op="intersects")

    # Perform a spatial join between the polygon of the demand area and mv grid districts to ensure every area is assigned to a bus
    peak_hv_p = peak_hv[~peak_hv.isin(peak_hv_c)].dropna().set_geometry("geom")
    peak_hv_p = gpd.sjoin(
        peak_hv_p, griddistrict, how="inner", op="intersects"
    ).drop_duplicates(subset=["id"])

    # Bring both dataframes together
    peak_bus = peak_hv_c.append(peak_hv_p, ignore_index=True)

    # Combine dataframes to bring loadcurves and bus id together
    curves_da = pd.merge(
        load_curves.T,
        peak_bus[["bus_id", "id", "geom"]],
        left_index=True,
        right_on="id",
    )

    return curves_da


def calc_load_curves_ind_osm(scenario):
    """Temporal disaggregate electrical demand per osm industrial landuse area.

    Parameters
    ----------
    scenario : str
        Scenario name.

    Returns
    -------
    pandas.DataFrame
        Demand timeseries of industry allocated to osm landuse areas and aggregated
        per substation id

    """

    sources = egon.data.config.datasets()["electrical_load_curves_industry"][
        "sources"
    ]

    # Select demands per industrial branch and osm landuse area
    demands_osm_area = db.select_dataframe(
        f"""SELECT osm_id, wz, demand
            FROM {sources['osm']['schema']}.
            {sources['osm']['table']}
            WHERE scenario = '{scenario}'
            AND demand > 0
            """
    ).set_index(["osm_id", "wz"])

    # Select industrial landuse polygons as demand area
    demand_area = db.select_geodataframe(
        f"""SELECT id, geom FROM
                {sources['osm_landuse']['schema']}.
                {sources['osm_landuse']['table']}
                WHERE sector = 3 """,
        index_col="id",
        geom_col="geom",
        epsg=3035,
    )

    # Calculate shares of industrial branches per osm area
    osm_share_wz = demands_osm_area.groupby("osm_id").apply(
        lambda grp: grp / grp.sum()
    )

    osm_share_wz.reset_index(inplace=True)

    share_wz_transpose = pd.DataFrame(
        index=osm_share_wz.osm_id.unique(), columns=osm_share_wz.wz.unique()
    )
    share_wz_transpose.index.rename("osm_id", inplace=True)

    for wz in share_wz_transpose.columns:
        share_wz_transpose[wz] = (
            osm_share_wz[osm_share_wz.wz == wz].set_index("osm_id").demand
        )

    # Rename columns to bring them in line with demandregio data
    share_wz_transpose.rename(columns={1718: 17}, inplace=True)

    # Calculate industrial annual demand per osm area
    annual_demand_osm = demands_osm_area.groupby("osm_id").demand.sum()

    # Return electrical load curves per osm industrial landuse area
    load_curves = calc_load_curve(share_wz_transpose, annual_demand_osm)

    curves_da = identify_bus(load_curves, demand_area)

    # Group all load curves per bus
    curves_bus = (
        curves_da.drop(["id"], axis=1).fillna(0).groupby("bus_id").sum()
    )

    # Initialize pandas.DataFrame for export to database
    load_ts_df = pd.DataFrame(index=curves_bus.index, columns=["p_set"])

    # Insert time series data to df as an array
    load_ts_df.p_set = curves_bus.values.tolist()

    # Create Dataframe to store time series individually
    curves_individual_interim = (
        curves_da.drop(["bus_id", "geom"], axis=1).fillna(0)
    ).set_index("id")
    curves_individual = curves_da[["id", "bus_id"]]
    curves_individual["p_set"] = curves_individual_interim.values.tolist()
    curves_individual["scn_name"] = scenario
    curves_individual = curves_individual.rename(
        columns={"id": "osm_id"}
    ).set_index(["osm_id", "scn_name"])

    return load_ts_df, curves_individual

View Code Duplication: this code seems to be duplicated in your project.

def insert_osm_ind_load():
    """Inserts electrical industry loads assigned to osm landuse areas to the database

    Returns
    -------
    None.

    """

    targets = egon.data.config.datasets()["electrical_load_curves_industry"][
        "targets"
    ]

    for scenario in ["eGon2035", "eGon100RE"]:

        # Delete existing data from database
        db.execute_sql(
            f"""
            DELETE FROM
            {targets['osm_load']['schema']}.{targets['osm_load']['table']}
            WHERE scn_name = '{scenario}'
            """
        )

        db.execute_sql(
            f"""
            DELETE FROM
            {targets['osm_load_individual']['schema']}.{targets['osm_load_individual']['table']}
            WHERE scn_name = '{scenario}'
            """
        )

        # Calculate industrial load curves per mv substation (hvmv bus)
        data, curves_individual = calc_load_curves_ind_osm(scenario)
        data.index = data.index.rename("bus")
        data["scn_name"] = scenario

        data.set_index(["scn_name"], inplace=True, append=True)

        # Insert into database
        data.to_sql(
            targets["osm_load"]["table"],
            schema=targets["osm_load"]["schema"],
            con=db.engine(),
            if_exists="append",
        )

        curves_individual["peak_load"] = np.array(
            curves_individual["p_set"].values.tolist()
        ).max(axis=1)
        curves_individual["demand"] = np.array(
            curves_individual["p_set"].values.tolist()
        ).sum(axis=1)
        curves_individual = identify_voltage_level(curves_individual)

        curves_individual.to_sql(
            targets["osm_load_individual"]["table"],
            schema=targets["osm_load_individual"]["schema"],
            con=db.engine(),
            if_exists="append",
        )


def calc_load_curves_ind_sites(scenario):
    """Temporal disaggregation of load curves per industrial site and industrial subsector.


    Parameters
    ----------
    scenario : str
        Scenario name.

    Returns
    -------
    pandas.DataFrame
        Demand timeseries of industry allocated to industrial sites and aggregated
        per substation id and industrial subsector

    """
    sources = egon.data.config.datasets()["electrical_load_curves_industry"][
        "sources"
    ]

    # Select demands per industrial site including the subsector information
    demands_ind_sites = db.select_dataframe(
        f"""SELECT industrial_sites_id, wz, demand
            FROM {sources['sites']['schema']}.
            {sources['sites']['table']}
            WHERE scenario = '{scenario}'
            AND demand > 0
            """
    ).set_index(["industrial_sites_id"])

    # Select industrial sites as demand_areas from database
    demand_area = db.select_geodataframe(
        f"""SELECT id, geom FROM
                {sources['sites_geom']['schema']}.
                {sources['sites_geom']['table']}""",
        index_col="id",
        geom_col="geom",
        epsg=3035,
    )

    # Replace entries to bring them in line with demandregio's subsector definitions
    demands_ind_sites.replace(1718, 17, inplace=True)
    share_wz_sites = demands_ind_sites.copy()

    # Create additional df on wz_share per industrial site, which is always set to one
    # as the industrial demand per site is subsector specific

    share_wz_sites.demand = 1
    share_wz_sites.reset_index(inplace=True)

    share_transpose = pd.DataFrame(
        index=share_wz_sites.industrial_sites_id.unique(),
        columns=share_wz_sites.wz.unique(),
    )
    share_transpose.index.rename("industrial_sites_id", inplace=True)
    for wz in share_transpose.columns:
        share_transpose[wz] = (
            share_wz_sites[share_wz_sites.wz == wz]
            .set_index("industrial_sites_id")
            .demand
        )

    load_curves = calc_load_curve(share_transpose, demands_ind_sites["demand"])

    curves_da = identify_bus(load_curves, demand_area)

    curves_da = pd.merge(
        curves_da, demands_ind_sites.wz, left_on="id", right_index=True
    )

    # Group all load curves per bus and wz
    curves_bus = (
        curves_da.fillna(0)
        .groupby(["bus_id", "wz"])
        .sum()
        .drop(["id"], axis=1)
    )

    # Initialize pandas.DataFrame for pf table load timeseries
    load_ts_df = pd.DataFrame(index=curves_bus.index, columns=["p_set"])

    # Insert data for pf load timeseries table
    load_ts_df.p_set = curves_bus.values.tolist()

    # Create Dataframe to store time series individually
    curves_individual_interim = (
        curves_da.drop(["bus_id", "geom", "wz"], axis=1).fillna(0)
    ).set_index("id")
    curves_individual = curves_da[["id", "bus_id"]]
    curves_individual["p_set"] = curves_individual_interim.values.tolist()
    curves_individual["scn_name"] = scenario
    curves_individual = curves_individual.rename(
        columns={"id": "site_id"}
    ).set_index(["site_id", "scn_name"])

    return load_ts_df, curves_individual

View Code Duplication: this code seems to be duplicated in your project.

def insert_sites_ind_load():
    """Inserts electrical industry loads assigned to osm landuse areas to the database

    Returns
    -------
    None.

    """

    targets = egon.data.config.datasets()["electrical_load_curves_industry"][
        "targets"
    ]

    for scenario in ["eGon2035", "eGon100RE"]:

        # Delete existing data from database
        db.execute_sql(
            f"""
            DELETE FROM
            {targets['sites_load']['schema']}.{targets['sites_load']['table']}
            WHERE scn_name = '{scenario}'
            """
        )

        # Delete existing data from database
        db.execute_sql(
            f"""
            DELETE FROM
            {targets['sites_load_individual']['schema']}.
            {targets['sites_load_individual']['table']}
            WHERE scn_name = '{scenario}'
            """
        )

        # Calculate industrial load curves per bus
        data, curves_individual = calc_load_curves_ind_sites(scenario)
        data.index = data.index.rename(["bus", "wz"])
        data["scn_name"] = scenario

        data.set_index(["scn_name"], inplace=True, append=True)

        # Insert into database
        data.to_sql(
            targets["sites_load"]["table"],
            schema=targets["sites_load"]["schema"],
            con=db.engine(),
            if_exists="append",
        )

        curves_individual["peak_load"] = np.array(
            curves_individual["p_set"].values.tolist()
        ).max(axis=1)
        curves_individual["demand"] = np.array(
            curves_individual["p_set"].values.tolist()
        ).sum(axis=1)
        curves_individual = identify_voltage_level(curves_individual)

        curves_individual.to_sql(
            targets["sites_load_individual"]["table"],
            schema=targets["sites_load_individual"]["schema"],
            con=db.engine(),
            if_exists="append",
        )