Passed
Pull Request — dev (#1052)
created by unknown, 01:26

insert_sites_ind_load() — rated A

Complexity

Conditions 2

Size

Total Lines 62
Code Lines 29

Duplication

Lines 58
Ratio 93.55 %

Importance

Changes 0

Metric  Value
eloc 29
dl 58
loc 62
rs 9.184
c 0
b 0
f 0
cc 2
nop 0

How to fix: Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a coherent block of the long method into a new, well-named method and call it from the original (see the sketch below).
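
As a minimal illustration of Extract Method (the function and variable names here are made up for the example, they are not taken from this module), a commented block becomes a separately named method:

# Before: a longer method whose commented block is a candidate for extraction
def summarise_orders(orders):
    # filter out cancelled orders
    valid = [order for order in orders if order["status"] != "cancelled"]
    total = sum(order["amount"] for order in valid)
    return {"count": len(valid), "total": total}


# After: the comment became the name of the extracted method
def drop_cancelled_orders(orders):
    return [order for order in orders if order["status"] != "cancelled"]


def summarise_orders_refactored(orders):
    valid = drop_cancelled_orders(orders)
    total = sum(order["amount"] for order in valid)
    return {"count": len(valid), "total": total}

Applied to this module, the same idea would, for example, split the per-scenario body of insert_sites_ind_load() into one or more named helpers.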

"""The central module containing all code dealing with processing
timeseries data using demandregio

"""

from sqlalchemy import ARRAY, Column, Float, Integer, String
from sqlalchemy.ext.declarative import declarative_base
import geopandas as gpd
import numpy as np
import pandas as pd

from egon.data import db
from egon.data.datasets.electricity_demand.temporal import calc_load_curve
import egon.data.config

Base = declarative_base()


def identify_voltage_level(df):

    """Identify the voltage_level of a grid component based on its peak load and
    defined thresholds.


    Parameters
    ----------
    df : pandas.DataFrame
        Data frame containing information about peak loads


    Returns
    -------
    pandas.DataFrame
        Data frame with an additional column with voltage level

    """

    df["voltage_level"] = np.nan

    # Identify voltage_level for every demand area taking thresholds into account which were defined in the eGon project
    df.loc[df["peak_load"] <= 0.1, "voltage_level"] = 7
    df.loc[df["peak_load"] > 0.1, "voltage_level"] = 6
    df.loc[df["peak_load"] > 0.2, "voltage_level"] = 5
    df.loc[df["peak_load"] > 5.5, "voltage_level"] = 4
    df.loc[df["peak_load"] > 20, "voltage_level"] = 3
    df.loc[df["peak_load"] > 120, "voltage_level"] = 1

    return df


def identify_bus(load_curves, demand_area):
    """Identify the grid connection point for a consumer by determining its grid level
    based on the time series' peak load and the spatial intersection to mv
    grid districts or ehv voronoi cells.


    Parameters
    ----------
    load_curves : pandas.DataFrame
        Demand timeseries per demand area (e.g. osm landuse area, industrial site)

    demand_area: pandas.DataFrame
        Dataframe with id and geometry of areas where an industrial demand
        is assigned to, such as osm landuse areas or industrial sites.

    Returns
    -------
    pandas.DataFrame
        Aggregated industrial demand timeseries per bus

    """

    sources = egon.data.config.datasets()["electrical_load_curves_industry"][
        "sources"
    ]

    # Select mv griddistrict
    griddistrict = db.select_geodataframe(
        f"""SELECT bus_id, geom FROM
                {sources['egon_mv_grid_district']['schema']}.
                {sources['egon_mv_grid_district']['table']}""",
        geom_col="geom",
        epsg=3035,
    )

    # Initialize dataframe to identify peak load per demand area (e.g. osm landuse area or industrial site)
    peak = pd.DataFrame(columns=["id", "peak_load"])
    peak["id"] = load_curves.max(axis=0).index
    peak["peak_load"] = load_curves.max(axis=0).values

    peak = identify_voltage_level(peak)

    # Assign bus_id to demand area by merging landuse and peak df
    peak = pd.merge(demand_area, peak, right_on="id", left_index=True)

    # Identify all demand areas connected to HVMV buses
    peak_hv = peak[peak["voltage_level"] > 1]

    # Perform a spatial join between the centroid of the demand area and mv grid districts to identify grid connection point
    peak_hv["centroid"] = peak_hv["geom"].centroid
    peak_hv = peak_hv.set_geometry("centroid")
    peak_hv_c = gpd.sjoin(peak_hv, griddistrict, how="inner", op="intersects")

    # Perform a spatial join between the polygon of the demand area and mv grid districts to ensure every area gets assigned to a bus
    peak_hv_p = peak_hv[~peak_hv.isin(peak_hv_c)].dropna().set_geometry("geom")
    peak_hv_p = gpd.sjoin(
        peak_hv_p, griddistrict, how="inner", op="intersects"
    ).drop_duplicates(subset=["id"])

    # Bring both dataframes together
    peak_bus = peak_hv_c.append(peak_hv_p, ignore_index=True)

    # Select ehv voronoi
    ehv_voronoi = db.select_geodataframe(
        f"""SELECT bus_id, geom FROM
                {sources['egon_mv_grid_district']['schema']}.
                {sources['egon_mv_grid_district']['table']}""",
        geom_col="geom",
        epsg=3035,
    )

    # Identify all demand areas connected to EHV buses
    peak_ehv = peak[peak["voltage_level"] == 1]

    # Perform a spatial join between the centroid of the demand area and ehv voronoi to identify grid connection point
    peak_ehv["centroid"] = peak_ehv["geom"].centroid
    peak_ehv = peak_ehv.set_geometry("centroid")
    peak_ehv = gpd.sjoin(peak_ehv, ehv_voronoi, how="inner", op="intersects")

    # Bring both dataframes together
    peak_bus = peak_bus.append(peak_ehv, ignore_index=True)

    # Combine dataframes to bring loadcurves and bus id together
    curves_da = pd.merge(
        load_curves.T,
        peak_bus[["bus_id", "id", "geom"]],
        left_index=True,
        right_on="id",
    )

    return curves_da


def calc_load_curves_ind_osm(scenario):
    """Temporally disaggregate electrical demand per osm industrial landuse area.


    Parameters
    ----------
    scenario : str
        Scenario name.

    Returns
    -------
    pandas.DataFrame
        Demand timeseries of industry allocated to osm landuse areas and aggregated
        per substation id

    """

    sources = egon.data.config.datasets()["electrical_load_curves_industry"][
        "sources"
    ]

    # Select demands per industrial branch and osm landuse area
    demands_osm_area = db.select_dataframe(
        f"""SELECT osm_id, wz, demand
            FROM {sources['osm']['schema']}.
            {sources['osm']['table']}
            WHERE scenario = '{scenario}'
            AND demand > 0
            """
    ).set_index(["osm_id", "wz"])

    # Select industrial landuse polygons as demand area
    demand_area = db.select_geodataframe(
        f"""SELECT id, geom FROM
                {sources['osm_landuse']['schema']}.
                {sources['osm_landuse']['table']}
                WHERE sector = 3 """,
        index_col="id",
        geom_col="geom",
        epsg=3035,
    )

    # Calculate shares of industrial branches per osm area
    osm_share_wz = demands_osm_area.groupby("osm_id").apply(
        lambda grp: grp / grp.sum()
    )

    osm_share_wz.reset_index(inplace=True)

    share_wz_transpose = pd.DataFrame(
        index=osm_share_wz.osm_id.unique(), columns=osm_share_wz.wz.unique()
    )
    share_wz_transpose.index.rename("osm_id", inplace=True)

    for wz in share_wz_transpose.columns:
        share_wz_transpose[wz] = (
            osm_share_wz[osm_share_wz.wz == wz].set_index("osm_id").demand
        )

    # Rename columns to bring it in line with demandregio data
    share_wz_transpose.rename(columns={1718: 17}, inplace=True)

    # Calculate industrial annual demand per osm area
    annual_demand_osm = demands_osm_area.groupby("osm_id").demand.sum()

    # Return electrical load curves per osm industrial landuse area
    load_curves = calc_load_curve(share_wz_transpose, annual_demand_osm)

    curves_da = identify_bus(load_curves, demand_area)

    # Group all load curves per bus
    curves_bus = (
        curves_da.drop(["id"], axis=1).fillna(0).groupby("bus_id").sum()
    )

    # Initialize pandas.DataFrame for export to database
    load_ts_df = pd.DataFrame(index=curves_bus.index, columns=["p_set"])

    # Insert time series data to df as an array
    load_ts_df.p_set = curves_bus.values.tolist()

    # Create Dataframe to store time series individually
    curves_individual_interim = (
        curves_da.drop(["bus_id", "geom"], axis=1).fillna(0)
    ).set_index("id")
    curves_individual = curves_da[["id", "bus_id"]]
    curves_individual["p_set"] = curves_individual_interim.values.tolist()
    curves_individual["scn_name"] = scenario
    curves_individual = curves_individual.rename(
        columns={"id": "osm_id"}
    ).set_index(["osm_id", "scn_name"])

    return load_ts_df, curves_individual

View Code Duplication — this code seems to be duplicated in your project.

def insert_osm_ind_load():
    """Inserts electrical industry loads assigned to osm landuse areas to the database

    Returns
    -------
    None.

    """

    targets = egon.data.config.datasets()["electrical_load_curves_industry"][
        "targets"
    ]

    for scenario in ["eGon2035", "eGon100RE"]:

        # Delete existing data from database
        db.execute_sql(
            f"""
            DELETE FROM
            {targets['osm_load']['schema']}.{targets['osm_load']['table']}
            WHERE scn_name = '{scenario}'
            """
        )

        db.execute_sql(
            f"""
            DELETE FROM
            {targets['osm_load_individual']['schema']}.{targets['osm_load_individual']['table']}
            WHERE scn_name = '{scenario}'
            """
        )

        # Calculate industrial load curves per mv substation (hvmv bus)
        data, curves_individual = calc_load_curves_ind_osm(scenario)
        data.index = data.index.rename("bus")
        data["scn_name"] = scenario

        data.set_index(["scn_name"], inplace=True, append=True)

        # Insert into database
        data.to_sql(
            targets["osm_load"]["table"],
            schema=targets["osm_load"]["schema"],
            con=db.engine(),
            if_exists="append",
        )

        curves_individual["peak_load"] = np.array(
            curves_individual["p_set"].values.tolist()
        ).max(axis=1)
        curves_individual["demand"] = np.array(
            curves_individual["p_set"].values.tolist()
        ).sum(axis=1)
        curves_individual = identify_voltage_level(curves_individual)

        curves_individual.to_sql(
            targets["osm_load_individual"]["table"],
            schema=targets["osm_load_individual"]["schema"],
            con=db.engine(),
            if_exists="append",
        )


def calc_load_curves_ind_sites(scenario):
    """Temporal disaggregation of load curves per industrial site and industrial subsector.


    Parameters
    ----------
    scenario : str
        Scenario name.

    Returns
    -------
    pandas.DataFrame
        Demand timeseries of industry allocated to industrial sites and aggregated
        per substation id and industrial subsector

    """
    sources = egon.data.config.datasets()["electrical_load_curves_industry"][
        "sources"
    ]

    # Select demands per industrial site including the subsector information
    demands_ind_sites = db.select_dataframe(
        f"""SELECT industrial_sites_id, wz, demand
            FROM {sources['sites']['schema']}.
            {sources['sites']['table']}
            WHERE scenario = '{scenario}'
            AND demand > 0
            """
    ).set_index(["industrial_sites_id"])

    # Select industrial sites as demand_areas from database

    demand_area = db.select_geodataframe(
        f"""SELECT id, geom FROM
                {sources['sites_geom']['schema']}.
                {sources['sites_geom']['table']}""",
        index_col="id",
        geom_col="geom",
        epsg=3035,
    )

    # Replace entries to bring it in line with demandregio's subsector definitions
    demands_ind_sites.replace(1718, 17, inplace=True)
    share_wz_sites = demands_ind_sites.copy()

    # Create additional df on wz_share per industrial site, which is always set to one
    # as the industrial demand per site is subsector specific

    share_wz_sites.demand = 1
    share_wz_sites.reset_index(inplace=True)

    share_transpose = pd.DataFrame(
        index=share_wz_sites.industrial_sites_id.unique(),
        columns=share_wz_sites.wz.unique(),
    )
    share_transpose.index.rename("industrial_sites_id", inplace=True)
    for wz in share_transpose.columns:
        share_transpose[wz] = (
            share_wz_sites[share_wz_sites.wz == wz]
            .set_index("industrial_sites_id")
            .demand
        )

    load_curves = calc_load_curve(share_transpose, demands_ind_sites["demand"])

    curves_da = identify_bus(load_curves, demand_area)

    curves_da = pd.merge(
        curves_da, demands_ind_sites.wz, left_on="id", right_index=True
    )

    # Group all load curves per bus and wz
    curves_bus = (
        curves_da.fillna(0)
        .groupby(["bus_id", "wz"])
        .sum()
        .drop(["id"], axis=1)
    )

    # Initialize pandas.DataFrame for pf table load timeseries
    load_ts_df = pd.DataFrame(index=curves_bus.index, columns=["p_set"])

    # Insert data for pf load timeseries table
    load_ts_df.p_set = curves_bus.values.tolist()

    # Create Dataframe to store time series individually
    curves_individual_interim = (
        curves_da.drop(["bus_id", "geom", "wz"], axis=1).fillna(0)
    ).set_index("id")
    curves_individual = curves_da[["id", "bus_id"]]
    curves_individual["p_set"] = curves_individual_interim.values.tolist()
    curves_individual["scn_name"] = scenario
    curves_individual = curves_individual.merge(
        curves_da[["wz", "id"]], left_on="id", right_on="id"
    )
    curves_individual = curves_individual.rename(
        columns={"id": "site_id"}
    ).set_index(["site_id", "scn_name"])

    return load_ts_df, curves_individual

View Code Duplication — this code seems to be duplicated in your project.

def insert_sites_ind_load():
    """Inserts electrical industry loads assigned to industrial sites to the database

    Returns
    -------
    None.

    """

    targets = egon.data.config.datasets()["electrical_load_curves_industry"][
        "targets"
    ]

    for scenario in ["eGon2035", "eGon100RE"]:

        # Delete existing data from database
        db.execute_sql(
            f"""
            DELETE FROM
            {targets['sites_load']['schema']}.{targets['sites_load']['table']}
            WHERE scn_name = '{scenario}'
            """
        )

        # Delete existing data from database
        db.execute_sql(
            f"""
            DELETE FROM
            {targets['sites_load_individual']['schema']}.
            {targets['sites_load_individual']['table']}
            WHERE scn_name = '{scenario}'
            """
        )

        # Calculate industrial load curves per bus
        data, curves_individual = calc_load_curves_ind_sites(scenario)
        data.index = data.index.rename(["bus", "wz"])
        data["scn_name"] = scenario

        data.set_index(["scn_name"], inplace=True, append=True)

        # Insert into database
        data.to_sql(
            targets["sites_load"]["table"],
            schema=targets["sites_load"]["schema"],
            con=db.engine(),
            if_exists="append",
        )

        curves_individual["peak_load"] = np.array(
            curves_individual["p_set"].values.tolist()
        ).max(axis=1)
        curves_individual["demand"] = np.array(
            curves_individual["p_set"].values.tolist()
        ).sum(axis=1)
        curves_individual = identify_voltage_level(curves_individual)

        curves_individual.to_sql(
            targets["sites_load_individual"]["table"],
            schema=targets["sites_load_individual"]["schema"],
            con=db.engine(),
            if_exists="append",
        )
467