1
|
|
|
""" |
2
|
|
|
Functions related to the four different use cases |
3
|
|
|
""" |
4
|
|
|
from __future__ import annotations |
5
|
|
|
|
6
|
|
|
from loguru import logger |
7
|
|
|
import geopandas as gpd |
8
|
|
|
import numpy as np |
9
|
|
|
import pandas as pd |
10
|
|
|
|
11
|
|
|
from egon.data import config |
12
|
|
|
|
13
|
|
|
# Settings of the "charging_infrastructure" dataset, looked up once at import
# time from the egon.data dataset configuration.
DATASET_CFG = config.datasets()["charging_infrastructure"]
14
|
|
|
|
15
|
|
|
|
16
|
|
|
def hpc(hpc_points: gpd.GeoDataFrame, uc_dict: dict) -> gpd.GeoDataFrame:
    """
    Calculate placements and energy distribution for use case hpc.

    Points inside the region that are tagged ``new_hpc_tag == 0`` are always
    kept. If their summed ``hpc_count`` is below the target number, the
    remaining points with ``new_hpc_index > 0`` are added in order of
    descending ``potential``. For the selected points ``potential`` is
    multiplied by ``hpc_count`` and a ``share`` column (fraction of the total
    potential) as well as an ``exists`` flag are added.

    :param hpc_points: gpd.GeoDataFrame
        GeoDataFrame of possible hpc locations
    :param uc_dict: dict
        contains basic run info like region boundary and save directory
    :return: gpd.GeoDataFrame
        selected points; if no point lies in the region a warning is logged
        and the frame is returned without ``share`` and ``exists`` columns
    """
    uc_id = "hpc"
    logger.debug(f"Use case: {uc_id}")

    num_hpc = 10**6
    energy_sum = 1

    # filter hpc points by region
    in_region_bool = hpc_points["geometry"].within(uc_dict["region"].iat[0])
    in_region = hpc_points.loc[in_region_bool]

    if "has_hpc" in in_region.columns:
        in_region = in_region.loc[in_region["has_hpc"]]

    cols = [
        "geometry",
        "hpc_count",
        "potential",
        "new_hpc_index",
        "new_hpc_tag",
    ]
    in_region = in_region[cols]

    # select all hpc points tagged 0 (all registered points)
    real_mask = in_region["new_hpc_tag"] == 0
    real_in_region = in_region.loc[real_mask]
    num_hpc_real = real_in_region["hpc_count"].sum()

    if num_hpc_real < num_hpc:
        sim_in_region = in_region.loc[~real_mask]
        # build the mask from the frame it is applied to instead of relying
        # on index alignment with the larger parent frame
        sim_in_region = sim_in_region.loc[sim_in_region["new_hpc_index"] > 0]
        sim_in_region_sorted = sim_in_region.sort_values(
            "potential", ascending=False
        )
        additional_hpc = int(
            min(num_hpc - num_hpc_real, len(sim_in_region_sorted.index))
        )
        selected_hpc = sim_in_region_sorted.iloc[:additional_hpc]
        real_in_region = pd.concat([real_in_region, selected_hpc])

    if not len(real_in_region.index):
        logger.warning(
            f"No potential charging points found in region {uc_dict['key']}!"
        )
    else:
        # assign() returns a new frame, so nothing is written into a slice
        # of ``in_region`` (avoids pandas' chained-assignment warning)
        weighted_potential = (
            real_in_region["potential"] * real_in_region["hpc_count"]
        )
        total_potential = weighted_potential.sum()
        real_in_region = real_in_region.assign(
            potential=weighted_potential,
            share=weighted_potential / total_potential,
        ).round(6)
        real_in_region["exists"] = real_in_region["new_hpc_tag"] == 0

    # outputs
    logger.debug(
        f"{round(energy_sum, 1)} kWh got fastcharged in region {uc_dict['key']}."
    )

    return gpd.GeoDataFrame(real_in_region)
83
|
|
|
|
84
|
|
|
|
85
|
|
|
def public(
    public_points: gpd.GeoDataFrame,
    public_data: gpd.GeoDataFrame,
    uc_dict: dict,
) -> gpd.GeoDataFrame:
    """
    Calculate placements and energy distribution for use case public.

    Existing charging points inside the region are matched against the POI
    clusters inside the region to obtain their potential. As long as the
    summed ``count`` of existing points is below the target number, further
    points are taken from the clusters that are still free. Each resulting
    point receives an ``energy`` value proportional to its potential.

    :param public_points: gpd.GeoDataFrame
        existing public charging points
    :param public_data: gpd.GeoDataFrame
        clustered POI
    :param uc_dict: dict
        contains basic run info like region boundary and save directory
    :return: gpd.GeoDataFrame
        existing and additional points in the CRS of ``public_points``
    """
    uc_id = "public"
    logger.debug(f"Use case: {uc_id}")

    num_public = 10**6
    energy_sum = 1

    boundary = uc_dict["region"].iat[0]

    # restrict both the existing points and the POI clusters to the region
    existing = public_points.loc[public_points["geometry"].within(boundary)]
    poi_candidates = public_data.loc[public_data["geometry"].within(boundary)]

    num_public_real = existing["count"].sum()

    # the matching is always done because it provides the weights
    region_points, free_poi = match_existing_points(existing, poi_candidates)
    region_points["exists"] = True

    if num_public_real < num_public:
        # fill the gap with the best remaining POI clusters
        extra_points = distribute_by_poi(
            free_poi, num_public - num_public_real
        )
        region_points = pd.concat([region_points, extra_points])

    weights = region_points["potential"]
    region_points["energy"] = weights / weights.sum() * energy_sum

    # outputs
    logger.debug(
        f"{round(energy_sum, 1)} kWh got charged in region {uc_dict['key']}."
    )

    return gpd.GeoDataFrame(region_points, crs=public_points.crs)
140
|
|
|
|
141
|
|
|
|
142
|
|
|
def distribute_by_poi(region_poi: gpd.GeoDataFrame, num_points: int | float):
    """
    Pick the POI clusters with the highest potential.

    :param region_poi: gpd.GeoDataFrame
        candidate clusters with a ``potential`` column; not modified
    :param num_points: int | float
        number of clusters wanted; truncated to int and capped at the number
        of available clusters
    :return: the first ``num_points`` rows of ``region_poi`` ordered by
        descending ``potential``
    """
    # sort_values returns a new frame, the caller's frame stays untouched
    ranked = region_poi.sort_values("potential", ascending=False)
    limit = int(min(num_points, len(ranked.index)))
    return ranked.iloc[:limit]
149
|
|
|
|
150
|
|
|
|
151
|
|
|
def match_existing_points(
    region_points: gpd.GeoDataFrame, region_poi: gpd.GeoDataFrame
):
    """
    Assign a potential to existing points based on surrounding POI clusters.

    Every cluster geometry is buffered by its ``radius`` (cast to int). For
    each existing point the clusters whose buffer contains the point are
    looked up:

    * no cluster: the point gets the fallback potential 5
    * one cluster: the point gets that cluster's potential
    * several clusters: the point gets the potential of the cluster with the
      smallest distance to the point

    Clusters that were matched this way are removed from the returned POI
    frame.

    :param region_points: gpd.GeoDataFrame
        existing charging points; not modified, a copy with a ``potential``
        column is returned
    :param region_poi: gpd.GeoDataFrame
        POI clusters with ``radius`` and ``potential`` columns
    :return: tuple of (points with ``potential``, unmatched clusters with an
        ``exists`` column that is ``False`` for all remaining rows)
    """
    region_poi = region_poi.assign(exists=False)
    poi_buffer = region_poi.buffer(region_poi["radius"].astype(int))
    # float column from the start: writing fractional potentials into an
    # integer column would need a silent upcast, which pandas deprecates
    region_points = region_points.assign(potential=0.0)

    for i in region_points.index:
        lis_point = region_points.at[i, "geometry"]
        cluster = poi_buffer.contains(lis_point)
        clusters = region_poi.loc[cluster]
        num_clusters = len(clusters.index)

        if num_clusters == 0:
            # decent average as fallback
            region_points.at[i, "potential"] = 5.0
        elif num_clusters == 1:
            # store the scalar value, not the one-element Series
            region_points.at[i, "potential"] = clusters["potential"].iat[0]
            region_poi.loc[cluster, "exists"] = True
        else:
            # choose cluster with closest Point
            idx = clusters.distance(lis_point).idxmin()
            region_poi.at[idx, "exists"] = True
            region_points.at[i, "potential"] = clusters.at[idx, "potential"]

    # delete all clusters with exists = True
    region_poi = region_poi.loc[~region_poi["exists"]]

    return region_points, region_poi
182
|
|
|
|
183
|
|
|
|
184
|
|
|
def home(
    home_data: gpd.GeoDataFrame,
    uc_dict: dict,
) -> gpd.GeoDataFrame:
    """
    Calculate placements and energy distribution for use case home.

    Rows of ``home_data`` inside the region get a number of charge spots from
    ``apportion_home``; rows without any spot are dropped. ``energy`` is the
    row's share of the target number of home charge spots, and the result is
    sorted by descending ``energy``.

    :param home_data: gpd.GeoDataFrame
        info about house types
    :param uc_dict: dict
        contains basic run info like region boundary and save directory
    :return: gpd.GeoDataFrame
        rows with at least one charge spot in the CRS of ``home_data``; if no
        row lies in the region the empty selection is returned unchanged
    """
    uc_id = "home"
    logger.debug(f"Use case: {uc_id}")

    num_home = 1000000
    energy_sum = 1

    # filter houses by region
    in_region_bool = home_data["geometry"].within(uc_dict["region"].iat[0])

    in_region = home_data.loc[in_region_bool]
    if in_region.empty:
        return in_region

    in_region = in_region.assign(
        num=in_region["num"].fillna(value=0),
        num_mfh=in_region["num_mfh"].fillna(value=0),
    )

    # apportion_home also writes a "num_available" column into in_region
    in_region["charge_spots"] = apportion_home(in_region, num_home, uc_dict)

    in_region = in_region.loc[in_region["charge_spots"] > 0]
    # assign() creates a new frame instead of writing into the filtered
    # slice, which avoids pandas' chained-assignment warning
    in_region = in_region.assign(
        energy=energy_sum * in_region["charge_spots"] / num_home
    ).sort_values(by="energy", ascending=False)

    logger.debug(
        f"{round(energy_sum, 1)} kWh got charged in region {uc_dict['key']}."
    )

    return gpd.GeoDataFrame(in_region, crs=home_data.crs)
226
|
|
|
|
227
|
|
|
|
228
|
|
|
def apportion_home(home_df: pd.DataFrame, num_spots: int, config: dict):
    """
    Distribute a number of home charge spots over the rows of ``home_df``.

    A ``num_available`` column (possible charge spots per row, computed by
    ``home_charge_spots``) is added to ``home_df`` in place. If at least as
    many spots are requested as are available, every row simply gets its
    available spots. Otherwise ``num_spots`` rows are drawn with replacement,
    weighted by ``num_available`` and with a fixed random state, and the
    number of draws per row is returned.

    :param home_df: pd.DataFrame
        frame with ``num`` and ``num_mfh`` columns; gains ``num_available``
    :param num_spots: int
        number of charge spots to place
    :param config: dict
        parameters passed on to ``home_charge_spots``
    :return: pd.Series
        charge spots per row, indexed like ``home_df``
    """
    # use parameters to set number of possible charge spots per row
    home_df["num_available"] = home_df[["num", "num_mfh"]].apply(
        home_charge_spots, axis=1, raw=True, args=(config,)
    )
    total_available = home_df["num_available"].sum()

    # if too many spots need to be placed, every house gets a spot
    if num_spots >= total_available:
        logger.debug(
            f"All private home spots have been filled. Leftover: "
            f"{num_spots - total_available}"
        )
        return home_df.loc[:, "num_available"]

    # distribute charge points based on houses per square
    samples = home_df.sample(
        num_spots, weights="num_available", random_state=1, replace=True
    )
    # count the draws per row label in one vectorised step instead of
    # incrementing a Series cell once per drawn sample
    draws_per_row = samples.index.value_counts().reindex(
        home_df.index, fill_value=0
    )
    return pd.Series(draws_per_row.to_numpy(), index=home_df.index)
248
|
|
|
|
249
|
|
|
|
250
|
|
|
def home_charge_spots(house_array: pd.Series | np.ndarray, config: dict):
    """
    Estimate the number of possible charge spots for one row of houses.

    For each of the two house counts a share of houses with a possible spot
    is drawn from a normal distribution (mean taken from ``config``, standard
    deviation 0.1, clipped at 0) and multiplied with the house count and the
    average number of spots per house.

    :param house_array: pd.Series | np.ndarray
        two values read by position: element 0 is combined with the ``sfh_*``
        parameters, element 1 with the ``mfh_*`` parameters
    :param config: dict
        needs ``sfh_avg_spots``, ``sfh_available``, ``mfh_avg_spots``,
        ``mfh_available`` and ``random_seed``, an object providing
        ``normal(loc, scale)``
    :return: rounded sum of both contributions
    """
    rng = config["random_seed"]

    # the first draw belongs to element 0 and the second to element 1; the
    # order must stay like this so results for a given generator state do
    # not change
    sfh_share = max(rng.normal(config["sfh_available"], 0.1), 0)
    sfh = house_array[0] * config["sfh_avg_spots"] * sfh_share

    mfh_share = max(rng.normal(config["mfh_available"], 0.1), 0)
    mfh = house_array[1] * config["mfh_avg_spots"] * mfh_share

    return round(sfh + mfh)
264
|
|
|
|
265
|
|
|
|
266
|
|
|
def work(
    landuse: gpd.GeoDataFrame,
    weights_dict: dict,
    uc_dict: dict,
) -> gpd.GeoDataFrame:
    """
    Calculate placements and energy distribution for use case work.

    Land use areas inside the region get an ``area`` column (geometry area
    divided by 10**6). Areas whose ``landuse`` is retail, commercial or
    industrial get a ``potential`` (geometry area times the weight of their
    land use type) and an ``energy`` value proportional to that potential.
    Areas of any other land use type are not part of the result.

    :param landuse: gpd.GeoDataFrame
        work areas by land use
    :param weights_dict: dict
        weights for different land use types
    :param uc_dict: dict
        contains basic run info like region boundary and save directory
    :return: gpd.GeoDataFrame
        weighted work areas in the CRS of ``landuse``, ordered retail,
        commercial, industrial
    """
    uc_id = "work"
    logger.debug(f"Use case: {uc_id}")

    energy_sum = 1

    in_region_bool = landuse.within(uc_dict["region"].iat[0])
    in_region = landuse[in_region_bool]

    # calculating the area of polygons
    in_region = in_region.assign(area=in_region["geometry"].area / 10**6)

    group_labels = ["retail", "commercial", "industrial"]
    lead_cols = ["geometry", "landuse", "potential"]

    # NOTE: geometries stay in the CRS of ``landuse``, which is also the CRS
    # attached to the returned frame. ``to_crs`` is not an in-place
    # operation, so calling it without using its return value reprojects
    # nothing; no reprojection step is performed here.
    weighted_groups = []
    for g in group_labels:
        group = in_region.loc[in_region["landuse"] == g]
        if not len(group.index):
            # the weight is only looked up for land use types that occur
            continue
        weighted_groups.append(
            group.assign(potential=group["geometry"].area * weights_dict[g])
        )

    if weighted_groups:
        # one concat over the non-empty groups; no empty frame takes part
        result = pd.concat(weighted_groups)
        other_cols = [c for c in result.columns if c not in lead_cols]
        result = result[lead_cols + other_cols]
    else:
        result = gpd.GeoDataFrame(columns=lead_cols, crs=landuse.crs)

    potential = result["potential"]
    result = result.assign(energy=potential * energy_sum / potential.sum())

    # outputs
    logger.debug(
        f"{round(energy_sum, 1)} kWh got charged in region {uc_dict['key']}."
    )

    return gpd.GeoDataFrame(result, crs=landuse.crs)
321
|
|
|
|