Passed
Pull Request — dev (#1355)
by
unknown
02:50
created

data.datasets.industry.temporal   A

Complexity

Total Complexity 11

Size/Duplication

Total Lines 485
Duplicated Lines 25.36 %

Importance

Changes 0
Metric Value
wmc 11
eloc 205
dl 123
loc 485
rs 10
c 0
b 0
f 0

6 Functions

Rating   Name   Duplication   Size   Complexity  
A identify_voltage_level() 0 30 1
B calc_load_curves_ind_osm() 0 99 3
A insert_sites_ind_load() 62 62 2
B calc_load_curves_ind_sites() 0 104 2
A insert_osm_ind_load() 61 61 2
B identify_bus() 0 100 1

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
"""The central module containing all code dealing with processing
2
timeseries data using demandregio
3
4
"""
5
6
from sqlalchemy.ext.declarative import declarative_base
7
import geopandas as gpd
8
import numpy as np
9
import pandas as pd
10
11
from egon.data import db
12
from egon.data.datasets.electricity_demand.temporal import calc_load_curve
13
import egon.data.config
14
15
# SQLAlchemy declarative base class, created once when the module is imported.
# NOTE(review): none of the functions visible in this module reference it --
# confirm whether it is still needed before removing.
Base = declarative_base()
16
17
18
def identify_voltage_level(df):
    """Identify the voltage_level of a grid component based on its peak load
    and defined thresholds.

    The column ``voltage_level`` is written into `df` itself (the frame is
    modified in place) and the same object is returned.

    Parameters
    ----------
    df : pandas.DataFrame
        Data frame containing information about peak loads in a column
        named ``peak_load``.
        NOTE(review): the thresholds are presumably given in MW -- confirm
        against the data source.

    Returns
    -------
    pandas.DataFrame
        The input data frame with an additional (or overwritten) column
        ``voltage_level``. Rows whose ``peak_load`` is NaN keep NaN as
        voltage level.

    """

    peak_load = df["peak_load"]

    # Start from NaN so that rows without a valid peak load stay unassigned
    df["voltage_level"] = np.nan

    # Identify voltage_level for every demand area taking thresholds into
    # account which were defined in the eGon project
    df.loc[peak_load <= 0.1, "voltage_level"] = 7

    # The lower bounds are applied in ascending order, so the highest bound
    # a peak load exceeds determines the final level. Level 2 is never
    # assigned by these thresholds.
    lower_bounds = ((0.1, 6), (0.2, 5), (5.5, 4), (20, 3), (120, 1))
    for lower_bound, level in lower_bounds:
        df.loc[peak_load > lower_bound, "voltage_level"] = level

    return df
48
49
50
def identify_bus(load_curves, demand_area):
    """Identify the grid connection point for a consumer by determining its
    grid level based on the time series' peak load and the spatial
    intersection to mv grid districts or ehv voronoi cells.

    Parameters
    ----------
    load_curves : pandas.DataFrame
        Demand timeseries per demand area (e.g. osm landuse area, industrial
        site). Each column holds the time series of one demand area; the
        column labels are matched against the index of `demand_area`.

    demand_area : geopandas.GeoDataFrame
        Areas where an industrial demand is assigned to, such as osm landuse
        areas or industrial sites. The index holds the area id, the column
        ``geom`` the geometry.

    Returns
    -------
    pandas.DataFrame
        One row per demand area that could be assigned to a bus, containing
        the transposed load curve plus the columns ``bus_id``, ``id`` and
        ``geom``. Aggregation per bus is left to the caller.

    """

    sources = egon.data.config.datasets()["electrical_load_curves_industry"][
        "sources"
    ]

    # Select mv griddistrict
    griddistrict = db.select_geodataframe(
        f"""SELECT bus_id, geom FROM
                {sources['egon_mv_grid_district']['schema']}.
                {sources['egon_mv_grid_district']['table']}""",
        geom_col="geom",
        epsg=3035,
    )

    # Identify peak load per demand area (e.g. osm landuse area or
    # industrial site)
    column_max = load_curves.max(axis=0)
    peak = pd.DataFrame(
        {"id": column_max.index, "peak_load": column_max.values}
    )

    peak = identify_voltage_level(peak)

    # Attach the geometry of each demand area to its peak load
    peak = pd.merge(demand_area, peak, right_on="id", left_index=True)

    # Identify all demand areas connected to HVMV buses. Work on a copy,
    # as a new column is added to the selection below.
    peak_hv = peak[peak["voltage_level"] > 1].copy()

    # Perform a spatial join between the centroid of the demand area and mv
    # grid districts to identify grid connection point
    peak_hv["centroid"] = peak_hv["geom"].centroid
    peak_hv = peak_hv.set_geometry("centroid")
    peak_hv_c = gpd.sjoin(
        peak_hv, griddistrict, how="inner", predicate="intersects"
    )

    # Perform a spatial join between the polygon of the demand area and mv
    # grid districts for all areas whose centroid did not hit a district, to
    # ensure every area gets assigned to a bus. A polygon may intersect
    # several districts, hence only the first match per id is kept.
    without_bus = ~peak_hv["id"].isin(peak_hv_c["id"])
    peak_hv_p = peak_hv[without_bus].set_geometry("geom")
    peak_hv_p = gpd.sjoin(
        peak_hv_p, griddistrict, how="inner", predicate="intersects"
    ).drop_duplicates(subset=["id"])

    # Bring both dataframes together
    peak_bus = pd.concat([peak_hv_c, peak_hv_p], ignore_index=True)

    # Select ehv voronoi
    # NOTE(review): this query reads the same 'egon_mv_grid_district' source
    # entry as the mv grid district query above -- confirm whether a
    # dedicated ehv voronoi source was intended.
    ehv_voronoi = db.select_geodataframe(
        f"""SELECT bus_id, geom FROM
                {sources['egon_mv_grid_district']['schema']}.
                {sources['egon_mv_grid_district']['table']}""",
        geom_col="geom",
        epsg=3035,
    )

    # Identify all demand areas connected to EHV buses (copied for the same
    # reason as above)
    peak_ehv = peak[peak["voltage_level"] == 1].copy()

    # Perform a spatial join between the centroid of the demand area and ehv
    # voronoi to identify grid connection point
    peak_ehv["centroid"] = peak_ehv["geom"].centroid
    peak_ehv = peak_ehv.set_geometry("centroid")
    peak_ehv = gpd.sjoin(
        peak_ehv, ehv_voronoi, how="inner", predicate="intersects"
    )

    # Bring both dataframes together
    peak_bus = pd.concat([peak_bus, peak_ehv], ignore_index=True)

    # Combine dataframes to bring loadcurves and bus id together
    curves_da = pd.merge(
        load_curves.T,
        peak_bus[["bus_id", "id", "geom"]],
        left_index=True,
        right_on="id",
    )

    return curves_da
150
151
152
def calc_load_curves_ind_osm(scenario):
    """Temporal disaggregate electrical demand per osm industrial landuse
    area.

    Parameters
    ----------
    scenario : str
        Scenario name.

    Returns
    -------
    load_ts_df : pandas.DataFrame
        Demand timeseries of industry allocated to osm landuse areas and
        aggregated per bus. Indexed by ``bus_id``; the column ``p_set``
        holds the time series as a list.
    curves_individual : pandas.DataFrame
        Demand timeseries per osm landuse area. Indexed by ``osm_id`` and
        ``scn_name``, with the columns ``bus_id`` and ``p_set``.

    """

    sources = egon.data.config.datasets()["electrical_load_curves_industry"][
        "sources"
    ]

    # Select demands per industrial branch and osm landuse area
    demands_osm_area = db.select_dataframe(
        f"""SELECT osm_id, wz, demand
            FROM {sources['osm']['schema']}.
            {sources['osm']['table']}
            WHERE scenario = '{scenario}'
            AND demand > 0
            """
    ).set_index(["osm_id", "wz"])

    # Select industrial landuse polygons as demand area
    demand_area = db.select_geodataframe(
        f"""SELECT id, geom FROM
                {sources['osm_landuse']['schema']}.
                {sources['osm_landuse']['table']}
                WHERE sector = 3 """,
        index_col="id",
        geom_col="geom",
        epsg=3035,
    )

    # Calculate shares of industrial branches per osm area: each demand is
    # divided by the total demand of its osm area
    demand = demands_osm_area["demand"]
    osm_share_wz = (
        demand / demand.groupby(level="osm_id").transform("sum")
    ).reset_index()

    # Bring the shares into a table with one row per osm area and one column
    # per industrial branch
    share_wz_transpose = pd.DataFrame(
        index=osm_share_wz.osm_id.unique(), columns=osm_share_wz.wz.unique()
    )
    share_wz_transpose.index.rename("osm_id", inplace=True)

    for wz in share_wz_transpose.columns:
        share_wz_transpose[wz] = (
            osm_share_wz[osm_share_wz.wz == wz].set_index("osm_id").demand
        )

    # Rename columns to bring it in line with demandregio data
    share_wz_transpose.rename(columns={1718: 17}, inplace=True)

    # Calculate industrial annual demand per osm area
    annual_demand_osm = demands_osm_area.groupby("osm_id").demand.sum()

    # Return electrical load curves per osm industrial landuse area
    load_curves = calc_load_curve(
        share_wz_transpose, scenario, annual_demand_osm
    )

    curves_da = identify_bus(load_curves, demand_area)

    # Group all load curves per bus
    curves_bus = (
        curves_da.drop(["id", "geom"], axis=1)
        .fillna(0)
        .groupby("bus_id")
        .sum()
    )

    # pandas.DataFrame for export to database, holding the time series of
    # each bus as a list
    load_ts_df = pd.DataFrame(
        {"p_set": curves_bus.values.tolist()}, index=curves_bus.index
    )

    # Create Dataframe to store time series individually
    curves_individual_interim = (
        curves_da.drop(["bus_id", "geom"], axis=1).fillna(0)
    ).set_index("id")

    # Copy the selection, as columns are added to it below. The time series
    # are assigned by position, both frames share the row order of curves_da.
    curves_individual = curves_da[["id", "bus_id"]].copy()
    curves_individual["p_set"] = curves_individual_interim.values.tolist()
    curves_individual["scn_name"] = scenario
    curves_individual = curves_individual.rename(
        columns={"id": "osm_id"}
    ).set_index(["osm_id", "scn_name"])

    return load_ts_df, curves_individual
251
252
253 View Code Duplication
def insert_osm_ind_load():
    """Inserts electrical industry loads assigned to osm landuse areas to the
    database.

    For every scenario listed in the egon-data settings, the existing rows
    of that scenario are deleted from both target tables. Afterwards the
    load curves aggregated per bus and the individual load curves per osm
    landuse area are calculated and appended to the respective table.

    Returns
    -------
    None.

    """

    targets = egon.data.config.datasets()["electrical_load_curves_industry"][
        "targets"
    ]

    for scenario in egon.data.config.settings()["egon-data"]["--scenarios"]:
        # Delete existing data from database
        db.execute_sql(
            f"""
            DELETE FROM
            {targets['osm_load']['schema']}.{targets['osm_load']['table']}
            WHERE scn_name = '{scenario}'
            """
        )

        db.execute_sql(
            f"""
            DELETE FROM
            {targets['osm_load_individual']['schema']}.
            {targets['osm_load_individual']['table']}
            WHERE scn_name = '{scenario}'
            """
        )

        # Calculate industrial load curves per bus
        data, curves_individual = calc_load_curves_ind_osm(scenario)
        data.index = data.index.rename("bus")
        data["scn_name"] = scenario

        data.set_index(["scn_name"], inplace=True, append=True)

        # Insert into database
        data.to_sql(
            targets["osm_load"]["table"],
            schema=targets["osm_load"]["schema"],
            con=db.engine(),
            if_exists="append",
        )

        # Build the matrix of individual time series once (one row per osm
        # area) and derive peak load and summed demand from it
        p_set = np.array(curves_individual["p_set"].values.tolist())
        curves_individual["peak_load"] = p_set.max(axis=1)
        curves_individual["demand"] = p_set.sum(axis=1)
        curves_individual = identify_voltage_level(curves_individual)

        curves_individual.to_sql(
            targets["osm_load_individual"]["table"],
            schema=targets["osm_load_individual"]["schema"],
            con=db.engine(),
            if_exists="append",
        )
315
316
317
def calc_load_curves_ind_sites(scenario):
    """Temporal disaggregation of load curves per industrial site and
    industrial subsector.

    Parameters
    ----------
    scenario : str
        Scenario name.

    Returns
    -------
    load_ts_df : pandas.DataFrame
        Demand timeseries of industry allocated to industrial sites and
        aggregated per bus and industrial subsector. Indexed by ``bus_id``
        and ``wz``; the column ``p_set`` holds the time series as a list.
    curves_individual : pandas.DataFrame
        Demand timeseries per industrial site. Indexed by ``site_id`` and
        ``scn_name``, with the columns ``bus_id``, ``p_set`` and ``wz``.

    """
    sources = egon.data.config.datasets()["electrical_load_curves_industry"][
        "sources"
    ]

    # Select demands per industrial site including the subsector information
    demands_ind_sites = db.select_dataframe(
        f"""SELECT industrial_sites_id, wz, demand
            FROM {sources['sites']['schema']}.
            {sources['sites']['table']}
            WHERE scenario = '{scenario}'
            AND demand > 0
            """
    ).set_index(["industrial_sites_id"])

    # Select industrial sites as demand_areas from database
    demand_area = db.select_geodataframe(
        f"""SELECT id, geom FROM
                {sources['sites_geom']['schema']}.
                {sources['sites_geom']['table']}""",
        index_col="id",
        geom_col="geom",
        epsg=3035,
    )

    # Replace entries to bring it in line with demandregio's subsector
    # definitions. Only the subsector column is touched, so that a demand
    # value which happens to equal 1718 is left unchanged.
    demands_ind_sites["wz"] = demands_ind_sites["wz"].replace(1718, 17)

    # Create additional df on wz_share per industrial site, which is always
    # set to one as the industrial demand per site is subsector specific
    share_wz_sites = demands_ind_sites.copy()
    share_wz_sites["demand"] = 1
    share_wz_sites.reset_index(inplace=True)

    # Bring the shares into a table with one row per site and one column per
    # industrial subsector
    share_transpose = pd.DataFrame(
        index=share_wz_sites.industrial_sites_id.unique(),
        columns=share_wz_sites.wz.unique(),
    )
    share_transpose.index.rename("industrial_sites_id", inplace=True)
    for wz in share_transpose.columns:
        share_transpose[wz] = (
            share_wz_sites[share_wz_sites.wz == wz]
            .set_index("industrial_sites_id")
            .demand
        )

    load_curves = calc_load_curve(
        share_transpose, scenario, demands_ind_sites["demand"]
    )

    curves_da = identify_bus(load_curves, demand_area)

    # Add the subsector of each site to its load curve
    curves_da = pd.merge(
        curves_da, demands_ind_sites.wz, left_on="id", right_index=True
    )

    # Group all load curves per bus and wz
    curves_bus = (
        curves_da.drop(["id", "geom"], axis=1)
        .fillna(0)
        .groupby(["bus_id", "wz"])
        .sum()
    )

    # pandas.DataFrame for pf table load timeseries, holding the time series
    # of each bus and subsector as a list
    load_ts_df = pd.DataFrame(
        {"p_set": curves_bus.values.tolist()}, index=curves_bus.index
    )

    # Create Dataframe to store time series individually
    curves_individual_interim = (
        curves_da.drop(["bus_id", "geom", "wz"], axis=1).fillna(0)
    ).set_index("id")

    # Copy the selection, as columns are added to it below. The time series
    # are assigned by position, both frames share the row order of curves_da.
    curves_individual = curves_da[["id", "bus_id"]].copy()
    curves_individual["p_set"] = curves_individual_interim.values.tolist()
    curves_individual["scn_name"] = scenario
    curves_individual = curves_individual.merge(
        curves_da[["wz", "id"]], on="id"
    )
    curves_individual = curves_individual.rename(
        columns={"id": "site_id"}
    ).set_index(["site_id", "scn_name"])

    return load_ts_df, curves_individual
421
422
423 View Code Duplication
def insert_sites_ind_load():
    """Inserts electrical industry loads assigned to industrial sites to the
    database.

    For every scenario listed in the egon-data settings, the existing rows
    of that scenario are deleted from both target tables. Afterwards the
    load curves aggregated per bus and subsector and the individual load
    curves per industrial site are calculated and appended to the
    respective table.

    Returns
    -------
    None.

    """

    targets = egon.data.config.datasets()["electrical_load_curves_industry"][
        "targets"
    ]

    for scenario in egon.data.config.settings()["egon-data"]["--scenarios"]:
        # Delete existing data from database
        db.execute_sql(
            f"""
            DELETE FROM
            {targets['sites_load']['schema']}.{targets['sites_load']['table']}
            WHERE scn_name = '{scenario}'
            """
        )

        # Delete existing data from database
        db.execute_sql(
            f"""
            DELETE FROM
            {targets['sites_load_individual']['schema']}.
            {targets['sites_load_individual']['table']}
            WHERE scn_name = '{scenario}'
            """
        )

        # Calculate industrial load curves per bus
        data, curves_individual = calc_load_curves_ind_sites(scenario)
        data.index = data.index.rename(["bus", "wz"])
        data["scn_name"] = scenario

        data.set_index(["scn_name"], inplace=True, append=True)

        # Insert into database
        data.to_sql(
            targets["sites_load"]["table"],
            schema=targets["sites_load"]["schema"],
            con=db.engine(),
            if_exists="append",
        )

        # Build the matrix of individual time series once (one row per
        # site) and derive peak load and summed demand from it
        p_set = np.array(curves_individual["p_set"].values.tolist())
        curves_individual["peak_load"] = p_set.max(axis=1)
        curves_individual["demand"] = p_set.sum(axis=1)
        curves_individual = identify_voltage_level(curves_individual)

        curves_individual.to_sql(
            targets["sites_load_individual"]["table"],
            schema=targets["sites_load_individual"]["schema"],
            con=db.engine(),
            if_exists="append",
        )
486