1
|
|
|
from __future__ import annotations |
2
|
|
|
|
3
|
|
|
from loguru import logger |
4
|
|
|
import geopandas as gpd |
5
|
|
|
import numpy as np |
6
|
|
|
import pandas as pd |
7
|
|
|
|
8
|
|
|
from egon.data import config |
9
|
|
|
|
10
|
|
|
DATASET_CFG = config.datasets()["charging_infrastructure"] |
11
|
|
|
|
12
|
|
|
|
13
|
|
|
def hpc(hpc_points: gpd.GeoDataFrame, uc_dict: dict) -> gpd.GeoDataFrame:
    """
    Calculate placements and energy distribution for use case hpc.

    Registered points (``new_hpc_tag == 0``) inside the region are always
    kept. While their summed ``hpc_count`` is below the target number,
    simulated points with ``new_hpc_index > 0`` are added in order of
    descending ``potential``.

    :param hpc_points: gpd.GeoDataFrame
        GeoDataFrame of possible hpc locations
    :param uc_dict: dict
        contains basic run info like region boundary and save directory
    :return: gpd.GeoDataFrame
        selected points in the CRS of ``hpc_points``; the ``share`` and
        ``exists`` columns are only added if at least one point was found
    """
    uc_id = "hpc"
    logger.debug(f"Use case: {uc_id}")

    num_hpc = 10**6
    energy_sum = 1

    # filter hpc points by region
    in_region_bool = hpc_points["geometry"].within(uc_dict["region"].iat[0])
    in_region = hpc_points.loc[in_region_bool]

    if "has_hpc" in in_region.columns:
        in_region = in_region.loc[in_region["has_hpc"]]

    cols = [
        "geometry",
        "hpc_count",
        "potential",
        "new_hpc_index",
        "new_hpc_tag",
    ]
    in_region = in_region[cols]

    # select all hpc points tagged 0 (all registered points)
    real_mask = in_region["new_hpc_tag"] == 0
    real_in_region = in_region.loc[real_mask]
    num_hpc_real = real_in_region["hpc_count"].sum()

    if num_hpc_real < num_hpc:
        sim_in_region = in_region.loc[~real_mask]
        # build the mask from the frame that is being filtered instead of
        # relying on implicit index alignment with the larger frame
        sim_in_region = sim_in_region.loc[sim_in_region["new_hpc_index"] > 0]
        sim_in_region_sorted = sim_in_region.sort_values(
            "potential", ascending=False
        )
        additional_hpc = int(
            min(num_hpc - num_hpc_real, len(sim_in_region_sorted.index))
        )
        selected_hpc = sim_in_region_sorted.iloc[:additional_hpc]
        real_in_region = pd.concat([real_in_region, selected_hpc])

    if not len(real_in_region.index):
        logger.warning(
            f"No potential charging points found in region {uc_dict['key']}!"
        )
    else:
        # weight the potential by the number of chargers per location;
        # assign() returns a new frame, so no filtered slice is written to
        weighted_potential = (
            real_in_region["potential"] * real_in_region["hpc_count"]
        )
        total_potential = weighted_potential.sum()
        real_in_region = real_in_region.assign(
            potential=weighted_potential,
            share=weighted_potential / total_potential,
        ).round(6)
        real_in_region["exists"] = real_in_region["new_hpc_tag"] == 0

    # outputs
    logger.debug(
        f"{round(energy_sum, 1)} kWh got fastcharged in region {uc_dict['key']}."
    )

    return gpd.GeoDataFrame(real_in_region, crs=hpc_points.crs)
80
|
|
|
|
81
|
|
|
|
82
|
|
|
def public(
    public_points: gpd.GeoDataFrame,
    public_data: gpd.GeoDataFrame,
    uc_dict: dict,
) -> gpd.GeoDataFrame:
    """
    Calculate placements and energy distribution for use case public.

    Existing charging points inside the region are matched against the POI
    clusters; if fewer points exist than the target number, additional
    locations are taken from the remaining clusters.

    :param public_points: gpd.GeoDataFrame
        existing public charging points
    :param public_data: gpd.GeoDataFrame
        clustered POI
    :param uc_dict: dict
        contains basic run info like region boundary and save directory
    :return: gpd.GeoDataFrame
        existing and added points with an ``energy`` column, in the CRS of
        ``public_points``
    """

    uc_id = "public"
    logger.debug(f"Use case: {uc_id}")

    num_public = 10**6
    energy_sum = 1

    region_geometry = uc_dict["region"].iat[0]

    # restrict existing points and POI clusters to the region
    points_mask = public_points["geometry"].within(region_geometry)
    existing_points = public_points.loc[points_mask]

    poi_mask = public_data["geometry"].within(region_geometry)
    poi_in_region = public_data.loc[poi_mask]

    num_public_real = existing_points["count"].sum()

    # match with clusters anyway (for weights)
    region_points, region_poi = match_existing_points(
        existing_points, poi_in_region
    )
    region_points["exists"] = True

    additional_public = num_public - num_public_real
    if additional_public > 0:
        # distribute additional public points via POI
        add_points = distribute_by_poi(region_poi, additional_public)
        region_points = pd.concat([region_points, add_points])

    # split the energy proportionally to each point's potential
    potential = region_points["potential"]
    region_points["energy"] = potential / potential.sum() * energy_sum

    # outputs
    logger.debug(
        f"{round(energy_sum, 1)} kWh got charged in region {uc_dict['key']}."
    )

    return gpd.GeoDataFrame(region_points, crs=public_points.crs)
137
|
|
|
|
138
|
|
|
|
139
|
|
|
def distribute_by_poi(region_poi: gpd.GeoDataFrame, num_points: int | float):
    """
    Pick the POI clusters with the highest potential.

    :param region_poi: gpd.GeoDataFrame
        POI clusters with a ``potential`` column; the passed frame is not
        modified
    :param num_points: int | float
        number of clusters to pick; truncated to an integer, capped at the
        number of available clusters and never below zero
    :return: the selected rows of ``region_poi``, sorted by descending
        ``potential``
    """
    # sort clusters without existing points by weight, then choose highest
    ranked_poi = region_poi.sort_values("potential", ascending=False)
    # clamp at zero: a negative count would otherwise slice from the end
    # (iloc[:-k]) and return almost every cluster instead of none
    num_selected = max(int(min(num_points, len(ranked_poi.index))), 0)
    return ranked_poi.iloc[:num_selected]
146
|
|
|
|
147
|
|
|
|
148
|
|
|
def match_existing_points(
    region_points: gpd.GeoDataFrame, region_poi: gpd.GeoDataFrame
):
    """
    Assign a POI cluster potential to every existing charging point.

    Each cluster is buffered by its ``radius`` column (cast to int). A point
    lying in no buffer gets a fallback potential of 5, a point in exactly one
    buffer gets that cluster's potential, and a point in several buffers gets
    the potential of the cluster with the smallest distance to it. Clusters
    matched this way are removed from the returned POI frame.

    :param region_points: gpd.GeoDataFrame
        existing charging points of the region
    :param region_poi: gpd.GeoDataFrame
        POI clusters of the region with ``radius`` and ``potential`` columns
    :return: tuple of (points with a ``potential`` column, clusters that
        were not matched to any existing point)
    """
    region_poi = region_poi.assign(exists=False)
    poi_buffer = region_poi.buffer(region_poi["radius"].astype(int))
    # float column so that cluster potentials can be stored without a
    # dtype upcast on assignment
    region_points = region_points.assign(potential=0.0)

    for i in region_points.index:
        lis_point = region_points.at[i, "geometry"]
        cluster = poi_buffer.contains(lis_point)
        clusters = region_poi.loc[cluster]
        num_clusters = len(clusters.index)

        if num_clusters == 0:
            # decent average as fallback
            region_points.at[i, "potential"] = 5
        elif num_clusters == 1:
            # store the scalar value, not the one-element Series
            region_points.at[i, "potential"] = clusters["potential"].iat[0]
            region_poi.loc[cluster, "exists"] = True
        else:
            # choose cluster with closest Point
            dist = clusters.distance(lis_point)
            idx = dist.idxmin()
            region_poi.at[idx, "exists"] = True
            region_points.at[i, "potential"] = clusters.at[idx, "potential"]

    # delete all clusters with exists = True
    region_poi = region_poi.loc[~region_poi["exists"]]

    return region_points, region_poi
179
|
|
|
|
180
|
|
|
|
181
|
|
|
def home(
    home_data: gpd.GeoDataFrame,
    uc_dict: dict,
) -> gpd.GeoDataFrame:
    """
    Calculate placements and energy distribution for use case home.

    :param home_data: gpd.GeoDataFrame
        info about house types
    :param uc_dict: dict
        contains basic run info like region boundary and save directory;
        it is also forwarded to ``apportion_home`` as its ``config``
    :return: gpd.GeoDataFrame
        rows with at least one charge spot, sorted by descending ``energy``.
        If no row lies within the region, the empty selection is returned
        as is (without ``charge_spots`` and ``energy`` columns).
    """
    uc_id = "home"
    logger.debug(f"Use case: {uc_id}")

    num_home = 1000000
    energy_sum = 1

    # filter houses by region
    in_region_bool = home_data["geometry"].within(uc_dict["region"].iat[0])

    in_region = home_data.loc[in_region_bool]
    if in_region.empty:
        return in_region

    in_region = in_region.assign(
        num=in_region["num"].fillna(value=0),
        num_mfh=in_region["num_mfh"].fillna(value=0),
    )

    # apportion_home also writes a "num_available" column into in_region
    potential = apportion_home(in_region, num_home, uc_dict)

    in_region["charge_spots"] = potential
    # copy so the new column is not written into a filtered slice
    in_region = in_region.loc[in_region["charge_spots"] > 0].copy()
    in_region["energy"] = energy_sum * in_region["charge_spots"] / num_home
    in_region = in_region.sort_values(by="energy", ascending=False)

    logger.debug(
        f"{round(energy_sum, 1)} kWh got charged in region {uc_dict['key']}."
    )

    return gpd.GeoDataFrame(in_region, crs=home_data.crs)
223
|
|
|
|
224
|
|
|
|
225
|
|
|
def apportion_home(home_df: pd.DataFrame, num_spots: int, config: dict):
    """
    Distribute a number of home charge spots over the rows of ``home_df``.

    :param home_df: pd.DataFrame
        needs the columns ``num`` and ``num_mfh``; a ``num_available``
        column is written into this frame in place
    :param num_spots: int
        number of charge spots to place
    :param config: dict
        forwarded unchanged to ``home_charge_spots`` (inside this function
        the name shadows the module-level ``config`` import)
    :return: pd.Series
        charge spots per row, indexed like ``home_df``
    """
    # use parameters to set number of possible charge spots per row
    home_df["num_available"] = home_df[["num", "num_mfh"]].apply(
        home_charge_spots, axis=1, raw=True, args=(config,)
    )
    total_available = home_df["num_available"].sum()

    # if too many spots need to be placed, every house gets a spot
    if num_spots >= total_available:
        logger.debug(
            f"All private home spots have been filled. Leftover: "
            f"{num_spots - total_available}"
        )
        return home_df.loc[:, "num_available"]

    # distribute charge points based on houses per square
    samples = home_df.sample(
        num_spots, weights="num_available", random_state=1, replace=True
    )
    # count how often each row was drawn; rows never drawn get 0
    draws_per_row = samples.index.value_counts().reindex(
        home_df.index, fill_value=0
    )
    return pd.Series(draws_per_row.to_numpy(), index=home_df.index)
245
|
|
|
|
246
|
|
|
|
247
|
|
|
def home_charge_spots(house_array: pd.Series | np.array, config: dict):
    """
    Estimate the number of possible home charge spots for one row.

    :param house_array: pd.Series | np.array
        element 0 is multiplied with the ``sfh_*`` parameters, element 1
        with the ``mfh_*`` parameters
    :param config: dict
        needs ``sfh_avg_spots``, ``mfh_avg_spots``, ``sfh_available``,
        ``mfh_available`` and ``random_seed``; the latter must offer a
        ``normal(loc, scale)`` method
    :return: rounded sum of both contributions
    """
    rng = config["random_seed"]

    # share of houses with possible spots, drawn around the configured mean
    # and clipped at zero; the draw order (first sfh, then mfh) is kept
    sfh_share = max(rng.normal(config["sfh_available"], 0.1), 0)
    mfh_share = max(rng.normal(config["mfh_available"], 0.1), 0)

    sfh_spots = house_array[0] * config["sfh_avg_spots"] * sfh_share
    mfh_spots = house_array[1] * config["mfh_avg_spots"] * mfh_share

    return round(sfh_spots + mfh_spots)
261
|
|
|
|
262
|
|
|
|
263
|
|
|
def work(
    landuse: gpd.GeoDataFrame,
    weights_dict: dict,
    uc_dict: dict,
) -> gpd.GeoDataFrame:
    """
    Calculate placements and energy distribution for use case work.

    Only the land use classes ``retail``, ``commercial`` and ``industrial``
    are considered. The potential of an area is its geometry area times the
    weight of its land use class.

    :param landuse: gpd.GeoDataFrame
        work areas by land use
    :param weights_dict: dict
        weights for different land use types
    :param uc_dict: dict
        contains basic run info like region boundary and save directory
    :return: gpd.GeoDataFrame
        areas of the considered classes with ``potential`` and ``energy``
        columns
    """
    uc_id = "work"
    logger.debug(f"Use case: {uc_id}")

    energy_sum = 1

    in_region_bool = landuse.within(uc_dict["region"].iat[0])
    in_region = landuse[in_region_bool]

    # calculating the area of polygons
    # (scaled by 1e-6 -- presumably m² to km², TODO confirm the CRS unit)
    in_region = in_region.assign(area=in_region["geometry"].area / 10**6)

    groups = in_region.groupby("landuse")
    group_labels = ["retail", "commercial", "industrial"]

    srid = DATASET_CFG["original_data"]["sources"]["tracbev"]["srid"]

    result = gpd.GeoDataFrame(
        columns=["geometry", "landuse", "potential"], crs=f"EPSG:{srid}"
    )

    for g in group_labels:
        if g in groups.groups:
            group = groups.get_group(g)
            group = group.assign(
                potential=group["geometry"].area * weights_dict[g]
            )
            # to_crs() is not in-place: keep the reprojected frame
            group = group.to_crs(srid)
            result = gpd.GeoDataFrame(
                pd.concat([result, group]), crs=f"EPSG:{srid}"
            )

    # split the energy proportionally to each area's potential
    result["energy"] = (
        result["potential"] * energy_sum / result["potential"].sum()
    )
    # outputs
    logger.debug(
        f"{round(energy_sum, 1)} kWh got charged in region {uc_dict['key']}."
    )

    # NOTE(review): result carries EPSG:<srid>; this assumes landuse.crs is
    # the same CRS -- confirm against the caller
    return gpd.GeoDataFrame(result, crs=landuse.crs)
318
|
|
|
|