Passed
Pull Request — dev (#1138)
by
unknown
02:19
created

hpc()   B

Complexity

Conditions 4

Size

Total Lines 67
Code Lines 41

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 41
dl 0
loc 67
rs 8.896
c 0
b 0
f 0
cc 4
nop 2

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

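Commonly applied refactorings include Extract Method and, when many parameters or temporary variables are involved, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object.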

1
"""
2
Functions related to the four different use cases
3
"""
4
from __future__ import annotations
5
6
from loguru import logger
7
import geopandas as gpd
8
import numpy as np
9
import pandas as pd
10
11
from egon.data import config
12
13
DATASET_CFG = config.datasets()["charging_infrastructure"]
14
15
16
def hpc(hpc_points: gpd.GeoDataFrame, uc_dict: dict) -> gpd.GeoDataFrame:
    """
    Calculate placements and energy distribution for use case hpc.

    :param hpc_points: gpd.GeoDataFrame
        GeoDataFrame of possible hpc locations
    :param uc_dict: dict
        contains basic run info like region boundary and save directory
    :return: gpd.GeoDataFrame
        selected hpc points inside the region; when at least one point was
        selected the frame additionally carries the columns ``share`` and
        ``exists``
    """
    uc_id = "hpc"
    logger.debug(f"Use case: {uc_id}")

    # hard-coded target number of points and energy total for this use case
    num_hpc = 10**6
    energy_sum = 1

    # filter hpc points by region
    in_region_bool = hpc_points["geometry"].within(uc_dict["region"].iat[0])
    in_region = hpc_points.loc[in_region_bool]

    if "has_hpc" in in_region.columns:
        in_region = in_region.loc[in_region["has_hpc"]]

    cols = [
        "geometry",
        "hpc_count",
        "potential",
        "new_hpc_index",
        "new_hpc_tag",
    ]
    in_region = in_region[cols]

    # select all hpc points tagged 0 (all registered points)
    real_mask = in_region["new_hpc_tag"] == 0
    real_in_region = in_region.loc[real_mask]
    num_hpc_real = real_in_region["hpc_count"].sum()

    if num_hpc_real < num_hpc:
        # top up with the highest-potential simulated points
        sim_in_region = in_region.loc[~real_mask]
        # build the mask from the frame being filtered, so that the selection
        # does not depend on implicit alignment with the larger frame
        sim_in_region = sim_in_region.loc[sim_in_region["new_hpc_index"] > 0]
        sim_in_region_sorted = sim_in_region.sort_values(
            "potential", ascending=False
        )
        additional_hpc = int(
            min(num_hpc - num_hpc_real, len(sim_in_region_sorted.index))
        )
        selected_hpc = sim_in_region_sorted.iloc[:additional_hpc]
        real_in_region = pd.concat([real_in_region, selected_hpc])

    if len(real_in_region.index) == 0:
        logger.warning(
            f"No potential charging points found in region {uc_dict['key']}!"
        )
    else:
        # weight each location by its number of charging points; assign()
        # returns a new frame, so no slice of `in_region` is written to
        weighted = real_in_region["potential"] * real_in_region["hpc_count"]
        real_in_region = real_in_region.assign(
            potential=weighted,
            share=weighted / weighted.sum(),
        ).round(6)
        real_in_region = real_in_region.assign(
            exists=real_in_region["new_hpc_tag"] == 0
        )

        # outputs
        logger.debug(
            f"{round(energy_sum, 1)} kWh got fastcharged in region {uc_dict['key']}."
        )

    return gpd.GeoDataFrame(real_in_region)
83
84
85
def public(
    public_points: gpd.GeoDataFrame,
    public_data: gpd.GeoDataFrame,
    uc_dict: dict,
) -> gpd.GeoDataFrame:
    """
    Calculate placements and energy distribution for use case public.

    :param public_points: gpd.GeoDataFrame
        existing public charging points
    :param public_data: gpd.GeoDataFrame
        clustered POI
    :param uc_dict: dict
        contains basic run info like region boundary and save directory
    :return: gpd.GeoDataFrame
        existing and additionally placed points with an ``energy`` column,
        using the CRS of ``public_points``
    """
    uc_id = "public"
    logger.debug(f"Use case: {uc_id}")

    target_count = 10**6
    energy_sum = 1

    # restrict charging points and POI clusters to the region
    region_geom = uc_dict["region"].iat[0]
    existing = public_points.loc[public_points["geometry"].within(region_geom)]
    pois = public_data.loc[public_data["geometry"].within(region_geom)]

    existing_count = existing["count"].sum()

    # existing points are always matched to clusters to obtain their weights
    placed, free_poi = match_existing_points(existing, pois)
    placed["exists"] = True

    if existing_count < target_count:
        # fill the gap with points taken from the remaining POI clusters
        extra = distribute_by_poi(free_poi, target_count - existing_count)
        placed = pd.concat([placed, extra])

    weights = placed["potential"]
    placed["energy"] = weights / weights.sum() * energy_sum

    # outputs
    logger.debug(
        f"{round(energy_sum, 1)} kWh got charged in region {uc_dict['key']}."
    )

    return gpd.GeoDataFrame(placed, crs=public_points.crs)
140
141
142
def distribute_by_poi(region_poi: gpd.GeoDataFrame, num_points: int | float):
    """
    Select the POI clusters with the highest potential.

    :param region_poi: gpd.GeoDataFrame
        candidate clusters; must contain a ``potential`` column
    :param num_points: int | float
        number of clusters requested; truncated to an integer and limited to
        the range ``0 .. len(region_poi)``
    :return:
        the selected rows ordered by descending ``potential``; the input frame
        is not modified
    """
    # sort_values returns a new frame, so the caller's data stays untouched
    ranked = region_poi.sort_values("potential", ascending=False)
    # a negative request must select nothing; without the lower bound a
    # negative slice end would drop rows from the tail instead
    num_points = int(max(0, min(num_points, len(ranked.index))))
    return ranked.iloc[:num_points]
149
150
151
def match_existing_points(
    region_points: gpd.GeoDataFrame, region_poi: gpd.GeoDataFrame
):
    """
    Match existing charging points to POI clusters to derive their potential.

    Each cluster is buffered by its ``radius`` (cast to int). A point takes
    the ``potential`` of the buffered cluster that contains it; if several
    contain it, the cluster with the smallest distance to the point is used.
    Points without any cluster get a fixed fallback potential.

    :param region_points: gpd.GeoDataFrame
        existing charging points
    :param region_poi: gpd.GeoDataFrame
        POI clusters with the columns ``radius`` and ``potential``
    :return: tuple
        ``(region_points, region_poi)``: the points with an added
        ``potential`` column, and the clusters that were not matched to any
        point (with an added ``exists`` column that is False for all rows)
    """
    # decent average as fallback for points that lie in no cluster
    fallback_potential = 5

    region_poi = region_poi.assign(exists=False)
    poi_buffer = region_poi.buffer(region_poi["radius"].astype(int))
    # float column, so that cluster potentials can be written without a dtype
    # change on assignment
    region_points = region_points.assign(potential=0.0)

    for i in region_points.index:
        lis_point = region_points.at[i, "geometry"]
        cluster = poi_buffer.contains(lis_point)
        clusters = region_poi.loc[cluster]
        num_clusters = len(clusters.index)

        if num_clusters == 0:
            region_points.at[i, "potential"] = fallback_potential
            continue

        if num_clusters == 1:
            idx = clusters.index[0]
        else:
            # choose cluster with closest Point
            idx = clusters.distance(lis_point).idxmin()

        region_poi.at[idx, "exists"] = True
        # write the scalar potential of the chosen cluster, not a Series
        region_points.at[i, "potential"] = clusters.at[idx, "potential"]

    # delete all clusters with exists = True
    region_poi = region_poi.loc[~region_poi["exists"]]

    return region_points, region_poi
182
183
184
def home(
    home_data: gpd.GeoDataFrame,
    uc_dict: dict,
) -> gpd.GeoDataFrame:
    """
    Calculate placements and energy distribution for use case home.

    :param home_data: gpd.GeoDataFrame
        info about house types
    :param uc_dict: dict
        contains basic run info like region boundary and save directory;
        it is also handed to ``apportion_home`` as its parameter dict
    :return: gpd.GeoDataFrame
        rows with at least one charge spot, sorted by descending ``energy``;
        if no row lies within the region, the empty selection is returned
        unchanged
    """
    uc_id = "home"
    logger.debug(f"Use case: {uc_id}")

    num_home = 1000000
    energy_sum = 1

    # filter houses by region
    in_region_bool = home_data["geometry"].within(uc_dict["region"].iat[0])

    in_region = home_data.loc[in_region_bool]
    if in_region.empty:
        return in_region

    in_region = in_region.assign(
        num=in_region["num"].fillna(value=0),
        num_mfh=in_region["num_mfh"].fillna(value=0),
    )

    # note: apportion_home also adds a "num_available" column to `in_region`
    potential = apportion_home(in_region, num_home, uc_dict)

    in_region = in_region.assign(charge_spots=potential)
    in_region = in_region.loc[in_region["charge_spots"] > 0]
    # assign() builds a new frame instead of writing into the filtered slice
    in_region = in_region.assign(
        energy=energy_sum * in_region["charge_spots"] / num_home
    )
    in_region = in_region.sort_values(by="energy", ascending=False)

    logger.debug(
        f"{round(energy_sum, 1)} kWh got charged in region {uc_dict['key']}."
    )

    return gpd.GeoDataFrame(in_region, crs=home_data.crs)
226
227
228
def apportion_home(home_df: pd.DataFrame, num_spots: int, config: dict):
    """
    Distribute a number of home charge spots over the rows of ``home_df``.

    :param home_df: pd.DataFrame
        frame with the columns ``num`` and ``num_mfh``; a ``num_available``
        column is added to it in place
    :param num_spots: int
        number of charge spots to place
    :param config: dict
        parameters forwarded to ``home_charge_spots``
    :return: pd.Series
        number of charge spots per row, indexed like ``home_df``
    """
    # use parameters to set number of possible charge spots per row
    home_df["num_available"] = home_df[["num", "num_mfh"]].apply(
        home_charge_spots, axis=1, raw=True, args=(config,)
    )
    total_available = home_df["num_available"].sum()

    # if too many spots need to be placed, every house gets a spot
    if num_spots >= total_available:
        logger.debug(
            f"All private home spots have been filled. Leftover: "
            f"{num_spots - total_available}"
        )
        return home_df.loc[:, "num_available"]

    # distribute charge points based on houses per square
    samples = home_df.sample(
        num_spots, weights="num_available", random_state=1, replace=True
    )
    # count how often each row was drawn in one vectorised step
    # (assumes the index labels of home_df are unique)
    counts = samples.index.value_counts()
    result = pd.Series(0, index=home_df.index)
    result.loc[counts.index] = counts.to_numpy()
    return result
248
249
250
def home_charge_spots(house_array: pd.Series | np.ndarray, config: dict):
    """
    Estimate the number of possible charge spots for one row of house counts.

    :param house_array: pd.Series | np.ndarray
        position 0 is multiplied with the ``sfh_*`` parameters, position 1
        with the ``mfh_*`` parameters
    :param config: dict
        must provide ``sfh_avg_spots``, ``sfh_available``, ``mfh_avg_spots``,
        ``mfh_available`` and ``random_seed``, an object whose
        ``normal(mean, sd)`` method is used to draw the availability shares
    :return:
        rounded sum of both contributions
    """
    rng = config["random_seed"]
    # shares are drawn around the configured mean with a spread of 0.1 and
    # clipped at zero; the first draw belongs to position 0, the second to 1
    sfh_share = max(rng.normal(config["sfh_available"], 0.1), 0)
    mfh_share = max(rng.normal(config["mfh_available"], 0.1), 0)

    sfh = house_array[0] * config["sfh_avg_spots"] * sfh_share
    mfh = house_array[1] * config["mfh_avg_spots"] * mfh_share
    return round(sfh + mfh)
264
265
266
def work(
    landuse: gpd.GeoDataFrame,
    weights_dict: dict,
    uc_dict: dict,
) -> gpd.GeoDataFrame:
    """
    Calculate placements and energy distribution for use case work.

    :param landuse: gpd.GeoDataFrame
        work areas by land use
    :param weights_dict: dict
        weights for different land use types
    :param uc_dict: dict
        contains basic run info like region boundary and save directory
    :return: gpd.GeoDataFrame
        areas of the land use types retail, commercial and industrial inside
        the region with the columns ``potential`` and ``energy``
    """
    uc_id = "work"
    logger.debug(f"Use case: {uc_id}")

    energy_sum = 1

    in_region_bool = landuse.within(uc_dict["region"].iat[0])
    in_region = landuse[in_region_bool]

    # calculating the area of polygons
    in_region = in_region.assign(area=in_region["geometry"].area / 10**6)

    groups = in_region.groupby("landuse")
    group_labels = ["retail", "commercial", "industrial"]

    srid = DATASET_CFG["original_data"]["sources"]["tracbev"]["srid"]

    result = gpd.GeoDataFrame(
        columns=["geometry", "landuse", "potential"], crs=f"EPSG:{srid}"
    )

    for g in group_labels:
        if g in groups.groups:
            group = groups.get_group(g)
            # potential is derived from the area in the input CRS
            group = group.assign(
                potential=group["geometry"].area * weights_dict[g]
            )
            # to_crs returns a new frame; keep it so that the geometries
            # really are in the CRS declared for `result`
            group = group.to_crs(srid)
            result = gpd.GeoDataFrame(
                pd.concat([result, group]), crs=f"EPSG:{srid}"
            )

    result["energy"] = (
        result["potential"] * energy_sum / result["potential"].sum()
    )
    # outputs
    logger.debug(
        f"{round(energy_sum, 1)} kWh got charged in region {uc_dict['key']}."
    )

    # NOTE(review): this assumes landuse.crs matches the configured srid —
    # confirm against the caller
    return gpd.GeoDataFrame(result, crs=landuse.crs)
321