Passed
Pull Request — dev (#943)
by
unknown
01:47
created

motorized_individual_travel_charging_infrastructure.use_cases   A

Complexity

Total Complexity 21

Size/Duplication

Total Lines 318
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 21
eloc 177
dl 0
loc 318
rs 10
c 0
b 0
f 0

8 Functions

Rating   Name   Duplication   Size   Complexity  
A apportion_home() 0 20 3
B match_existing_points() 0 31 5
A work() 0 55 3
A distribute_by_poi() 0 7 1
A public() 0 55 2
A home() 0 42 2
B hpc() 0 67 4
A home_charge_spots() 0 14 1
1
from __future__ import annotations
2
3
from loguru import logger
4
import geopandas as gpd
5
import numpy as np
6
import pandas as pd
7
8
from egon.data import config
9
10
# Settings of the "charging_infrastructure" dataset, looked up once when the
# module is imported.
DATASET_CFG = config.datasets()["charging_infrastructure"]
11
12
13
def hpc(hpc_points: gpd.GeoDataFrame, uc_dict: dict) -> gpd.GeoDataFrame:
    """
    Calculate placements and energy distribution for use case hpc.

    Registered points (``new_hpc_tag == 0``) inside the region are always
    kept. If their summed ``hpc_count`` is below the hard-coded target, up to
    the missing number of simulated candidates with ``new_hpc_index > 0`` is
    added, highest ``potential`` first.

    :param hpc_points: gpd.GeoDataFrame
        GeoDataFrame of possible hpc locations; the columns ``geometry``,
        ``hpc_count``, ``potential``, ``new_hpc_index`` and ``new_hpc_tag``
        are required, ``has_hpc`` is used as a row filter when present
    :param uc_dict: dict
        contains basic run info like region boundary and save directory;
        the entries ``region`` and ``key`` are read here
    :return: gpd.GeoDataFrame
        selected locations in the CRS of ``hpc_points``; unless the selection
        is empty, ``potential`` is multiplied by ``hpc_count`` and the columns
        ``share`` and ``exists`` are added
    """
    uc_id = "hpc"
    logger.debug(f"Use case: {uc_id}")

    # Hard-coded target; energy_sum is only used in the log message below.
    num_hpc = 10**6
    energy_sum = 1

    # filter hpc points by region
    in_region_bool = hpc_points["geometry"].within(uc_dict["region"].iat[0])
    in_region = hpc_points.loc[in_region_bool]

    if "has_hpc" in in_region.columns:
        in_region = in_region.loc[in_region["has_hpc"]]

    cols = [
        "geometry",
        "hpc_count",
        "potential",
        "new_hpc_index",
        "new_hpc_tag",
    ]
    in_region = in_region[cols]

    # select all hpc points tagged 0 (all registered points)
    real_mask = in_region["new_hpc_tag"] == 0
    real_in_region = in_region.loc[real_mask]
    num_hpc_real = real_in_region["hpc_count"].sum()

    if num_hpc_real < num_hpc:
        sim_in_region = in_region.loc[~real_mask]
        # Build the mask from the frame that is being filtered so that mask
        # and frame share the same index.
        sim_in_region = sim_in_region.loc[sim_in_region["new_hpc_index"] > 0]
        sim_in_region_sorted = sim_in_region.sort_values(
            "potential", ascending=False
        )
        additional_hpc = int(
            min(num_hpc - num_hpc_real, len(sim_in_region_sorted.index))
        )
        selected_hpc = sim_in_region_sorted.iloc[:additional_hpc]
        real_in_region = pd.concat([real_in_region, selected_hpc])

    if not len(real_in_region.index):
        logger.warning(
            f"No potential charging points found in region {uc_dict['key']}!"
        )
    else:
        # ``assign`` returns new frames, so nothing is written into a slice
        # of ``in_region`` (avoids pandas' SettingWithCopy warning).
        real_in_region = real_in_region.assign(
            potential=real_in_region["potential"] * real_in_region["hpc_count"]
        )
        total_potential = real_in_region["potential"].sum()
        # round(6) applies to every numeric column, not only to ``share``.
        real_in_region = real_in_region.assign(
            share=real_in_region["potential"] / total_potential
        ).round(6)
        real_in_region = real_in_region.assign(
            exists=real_in_region["new_hpc_tag"] == 0
        )

        # outputs
        logger.debug(
            f"{round(energy_sum, 1)} kWh got fastcharged in region {uc_dict['key']}."
        )

    return gpd.GeoDataFrame(real_in_region, crs=hpc_points.crs)
80
81
82
def public(
    public_points: gpd.GeoDataFrame,
    public_data: gpd.GeoDataFrame,
    uc_dict: dict,
) -> gpd.GeoDataFrame:
    """
    Calculate placements and energy distribution for use case public.

    Existing points inside the region receive a weight from the POI clusters
    via ``match_existing_points``. If their summed ``count`` is below the
    hard-coded target, further locations are taken from the unmatched
    clusters via ``distribute_by_poi``. ``energy_sum`` is then split over all
    selected rows proportionally to ``potential``.

    :param public_points: gpd.GeoDataFrame
        existing public charging points; the columns ``geometry`` and
        ``count`` are read here
    :param public_data: gpd.GeoDataFrame
        clustered POI
    :param uc_dict: dict
        contains basic run info like region boundary and save directory;
        the entries ``region`` and ``key`` are read here
    :return: gpd.GeoDataFrame
        existing and added locations with the columns ``exists`` and
        ``energy``, in the CRS of ``public_points``
    """
    uc_id = "public"
    logger.debug(f"Use case: {uc_id}")

    num_public = 10**6
    energy_sum = 1

    region_geom = uc_dict["region"].iat[0]

    # filter public points and POI clusters by region
    in_region = public_points.loc[public_points["geometry"].within(region_geom)]
    poi_in_region = public_data.loc[public_data["geometry"].within(region_geom)]

    num_public_real = in_region["count"].sum()

    # match with clusters anyway (for weights)
    region_points, region_poi = match_existing_points(in_region, poi_in_region)
    region_points = region_points.assign(exists=True)

    if num_public_real < num_public:
        additional_public = num_public - num_public_real
        # distribute additional public points via POI
        add_points = distribute_by_poi(region_poi, additional_public)
        region_points = pd.concat([region_points, add_points])

    region_points = region_points.assign(
        energy=region_points["potential"]
        / region_points["potential"].sum()
        * energy_sum
    )

    # outputs
    logger.debug(
        f"{round(energy_sum, 1)} kWh got charged in region {uc_dict['key']}."
    )

    return gpd.GeoDataFrame(region_points, crs=public_points.crs)
137
138
139
def distribute_by_poi(region_poi: gpd.GeoDataFrame, num_points: int | float):
    """
    Pick the ``num_points`` clusters with the highest ``potential``.

    The input frame is not modified. No point inside a cluster is chosen
    here; whole cluster rows are returned.

    :param region_poi: gpd.GeoDataFrame
        clusters with a ``potential`` column
    :param num_points: int | float
        requested number of rows; truncated to an integer and limited to the
        range from 0 to the number of available clusters
    :return: the selected rows, sorted by descending ``potential``
    """
    ranked = region_poi.sort_values("potential", ascending=False)
    # Clamp at 0: a negative value would otherwise become a negative slice
    # stop and return everything except the last rows.
    num_points = max(int(min(num_points, len(ranked.index))), 0)
    return ranked.iloc[:num_points]
146
147
148
def match_existing_points(
    region_points: gpd.GeoDataFrame, region_poi: gpd.GeoDataFrame
):
    """
    Assign a potential to existing points from the POI clusters around them.

    Every cluster is buffered by its ``radius`` (cast to int). A point that
    lies in no buffer gets the fallback potential 5. Otherwise it takes the
    ``potential`` of the containing cluster, or of the closest one when
    several buffers contain it, and that cluster is marked as occupied.

    :param region_points: gpd.GeoDataFrame
        existing charging points
    :param region_poi: gpd.GeoDataFrame
        POI clusters with the columns ``radius`` and ``potential``
    :return: tuple of (points with an added ``potential`` column, clusters
        that were not matched to any point, with ``exists`` set to False)
    """
    region_poi = region_poi.assign(exists=False)
    poi_buffer = region_poi.buffer(region_poi["radius"].astype(int))
    # Start with a float column so that float cluster weights can be written
    # without an int -> float upcast of the column.
    region_points = region_points.assign(potential=0.0)

    for i in region_points.index:
        lis_point = region_points.at[i, "geometry"]
        cluster = poi_buffer.contains(lis_point)
        clusters = region_poi.loc[cluster]
        num_clusters = len(clusters.index)

        if num_clusters == 0:
            # decent average as fallback
            region_points.at[i, "potential"] = 5
        elif num_clusters == 1:
            # Write the scalar of the single match, not the one-row Series.
            region_points.at[i, "potential"] = clusters["potential"].iat[0]
            region_poi.loc[cluster, "exists"] = True
        else:
            # choose cluster with closest Point
            dist = clusters.distance(lis_point)
            idx = dist.idxmin()
            region_poi.at[idx, "exists"] = True
            region_points.at[i, "potential"] = clusters.at[idx, "potential"]

    # delete all clusters with exists = True
    region_poi = region_poi.loc[~region_poi["exists"]]

    return region_points, region_poi
179
180
181
def home(
    home_data: gpd.GeoDataFrame,
    uc_dict: dict,
) -> gpd.GeoDataFrame:
    """
    Calculate placements and energy distribution for use case home.

    :param home_data: gpd.GeoDataFrame
        info about house types; the columns ``geometry``, ``num`` and
        ``num_mfh`` are read here
    :param uc_dict: dict
        contains basic run info like region boundary and save directory;
        the entries ``region`` and ``key`` are read here and the dict is
        passed on to ``apportion_home``
    :return: gpd.GeoDataFrame
        rows with at least one charge spot, sorted by descending ``energy``;
        when no row lies inside the region the empty selection is returned
        without the additional columns
    """
    uc_id = "home"
    logger.debug(f"Use case: {uc_id}")

    num_home = 1000000
    energy_sum = 1

    # filter houses by region
    in_region_bool = home_data["geometry"].within(uc_dict["region"].iat[0])

    in_region = home_data.loc[in_region_bool]
    if in_region.empty:
        return in_region

    # Missing house counts are treated as zero.
    in_region = in_region.assign(
        num=in_region["num"].fillna(value=0),
        num_mfh=in_region["num_mfh"].fillna(value=0),
    )

    # apportion_home also writes a ``num_available`` column into in_region.
    potential = apportion_home(in_region, num_home, uc_dict)

    in_region = in_region.assign(charge_spots=potential)
    in_region = in_region.loc[in_region["charge_spots"] > 0]
    # ``assign`` instead of a column write on the filtered frame avoids
    # pandas' SettingWithCopy warning.
    in_region = in_region.assign(
        energy=energy_sum * in_region["charge_spots"] / num_home
    )
    in_region = in_region.sort_values(by="energy", ascending=False)

    logger.debug(
        f"{round(energy_sum, 1)} kWh got charged in region {uc_dict['key']}."
    )

    return gpd.GeoDataFrame(in_region, crs=home_data.crs)
223
224
225
def apportion_home(home_df: pd.DataFrame, num_spots: int, config: dict):
    """
    Distribute ``num_spots`` home charge spots over the rows of ``home_df``.

    :param home_df: pd.DataFrame
        needs the columns ``num`` and ``num_mfh``; a ``num_available`` column
        is written into it in place
    :param num_spots: int
        number of charge spots to place
    :param config: dict
        passed on unchanged to ``home_charge_spots``
    :return: pd.Series
        number of charge spots per row, indexed like ``home_df``
    """
    # use parameters to set number of possible charge spots per row
    # (written in place: home() returns the frame including this column)
    home_df["num_available"] = home_df[["num", "num_mfh"]].apply(
        home_charge_spots, axis=1, raw=True, args=(config,)
    )
    total_available = home_df["num_available"].sum()

    # if more spots are requested than are available, every available spot
    # is used
    if num_spots >= total_available:
        logger.debug(
            f"All private home spots have been filled. Leftover: "
            f"{num_spots - total_available}"
        )
        return home_df.loc[:, "num_available"]

    # distribute charge points based on houses per square
    samples = home_df.sample(
        num_spots, weights="num_available", random_state=1, replace=True
    )
    # Count how often each row was drawn; rows never drawn get 0.
    counts = samples.index.value_counts()
    return pd.Series(
        counts.reindex(home_df.index, fill_value=0).to_numpy(),
        index=home_df.index,
    )
245
246
247
def home_charge_spots(house_array: pd.Series | np.array, config: dict):
    """
    Estimate the number of possible home charge spots for one row.

    :param house_array: pd.Series | np.array
        two values, read by position: the first is combined with the ``sfh``
        settings, the second with the ``mfh`` settings
    :param config: dict
        needs ``sfh_avg_spots``, ``mfh_avg_spots``, ``sfh_available``,
        ``mfh_available`` and ``random_seed``; the latter must offer a
        ``normal(mean, std)`` method (a random generator, despite its name)
    :return: rounded sum of both contributions (built-in ``round``)
    """
    # Positional access on a plain array works for lists, ndarrays and
    # label-indexed Series alike.
    houses = np.asarray(house_array)
    rng = config["random_seed"]

    # One draw per house type, sfh first; negative draws are clipped to 0.
    sfh_share = max(rng.normal(config["sfh_available"], 0.1), 0)
    mfh_share = max(rng.normal(config["mfh_available"], 0.1), 0)

    sfh = houses[0] * config["sfh_avg_spots"] * sfh_share
    mfh = houses[1] * config["mfh_avg_spots"] * mfh_share
    return round(sfh + mfh)
261
262
263
def work(
    landuse: gpd.GeoDataFrame,
    weights_dict: dict,
    uc_dict: dict,
) -> gpd.GeoDataFrame:
    """
    Calculate placements and energy distribution for use case work.

    Areas inside the region with land use ``retail``, ``commercial`` or
    ``industrial`` get a ``potential`` of geometry area times the weight of
    their land use. ``energy_sum`` is split proportionally to ``potential``.

    :param landuse: gpd.GeoDataFrame
        work areas by land use; the columns ``geometry`` and ``landuse``
        are read here
    :param weights_dict: dict
        weights for different land use types
    :param uc_dict: dict
        contains basic run info like region boundary and save directory;
        the entries ``region`` and ``key`` are read here
    :return: gpd.GeoDataFrame
        selected areas with the columns ``area``, ``potential`` and
        ``energy``, tagged with the CRS of ``landuse``
    """
    uc_id = "work"
    logger.debug(f"Use case: {uc_id}")

    energy_sum = 1

    in_region_bool = landuse.within(uc_dict["region"].iat[0])
    in_region = landuse[in_region_bool]

    # calculating the area of polygons (geometry area divided by 10**6)
    in_region = in_region.assign(area=in_region["geometry"].area / 10**6)

    groups = in_region.groupby("landuse")
    group_labels = ["retail", "commercial", "industrial"]

    srid = DATASET_CFG["original_data"]["sources"]["tracbev"]["srid"]

    result = gpd.GeoDataFrame(
        columns=["geometry", "landuse", "potential"], crs=f"EPSG:{srid}"
    )

    for g in group_labels:
        if g in groups.groups:
            group = groups.get_group(g)
            # The weight uses the unscaled geometry area, not the ``area``
            # column computed above.
            group = group.assign(
                potential=group["geometry"].area * weights_dict[g]
            )
            # NOTE(review): the original called ``group.to_crs(srid)`` here and
            # discarded the result, so the geometries were never reprojected.
            # The no-op call is dropped; confirm whether a reprojection was
            # actually intended.
            result = gpd.GeoDataFrame(
                pd.concat([result, group]), crs=f"EPSG:{srid}"
            )

    result["energy"] = (
        result["potential"] * energy_sum / result["potential"].sum()
    )
    # outputs
    logger.debug(
        f"{round(energy_sum, 1)} kWh got charged in region {uc_dict['key']}."
    )

    return gpd.GeoDataFrame(result, crs=landuse.crs)
318