"""
Motorized Individual Travel (MIT)

Main module for preparation of model data (static and timeseries) for
motorized individual travel.

**Contents of this module**

* Creation of DB tables
* Download and preprocessing of vehicle registration data from KBA and BMVI
* Calculate the number of electric vehicles and allocate them to different
  spatial levels. See :py:mod:`egon.data.metadata`
* Extract and write pre-generated trips to DB

**Configuration**

The config of this dataset can be found in *datasets.yml* in section
*emobility_mit*.
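
In code, this configuration is available as ``DATASET_CFG`` (imported from the
dataset's ``helpers`` module). A minimal sketch of reading the data sources
from it, assuming the key layout used by ``download_and_preprocess`` below:

.. code-block:: python

    from egon.data.datasets.emobility.motorized_individual_travel.helpers import (
        DATASET_CFG,
    )

    sources = DATASET_CFG["original_data"]["sources"]
    kba_url = sources["KBA"]["url"]  # URL of the KBA registration data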

**Scenarios and variations**

* Scenario overview
* Change scenario variation for 2050: adjust in
  emobility_mit->scenario->variation->eGon100RE
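
A short sketch of reading that variation in Python, assuming ``DATASET_CFG``
mirrors the *emobility_mit* section of *datasets.yml*:

.. code-block:: python

    from egon.data.datasets.emobility.motorized_individual_travel.helpers import (
        DATASET_CFG,
    )

    variation_2050 = DATASET_CFG["scenario"]["variation"]["eGon100RE"]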

**Trip data**

The electric vehicles' trip data for each scenario have been generated using
`simBEV <https://github.com/rl-institut/simbev/>`_. The methodological
background is given in its `documentation <https://simbev.readthedocs.io>`_.

Six different vehicle types are used:

* Battery Electric Vehicle (BEV): mini, medium, luxury
* Plug-in Hybrid Electric Vehicle (PHEV): mini, medium, luxury

.. csv-table:: EV types
    :header: "Technology", "Size", "Max. charging capacity slow [kW]",
        "Max. charging capacity fast [kW]", "Battery capacity [kWh]",
        "Energy consumption [kWh/km]"
    :widths: 10, 10, 30, 30, 25, 30

    "BEV", "mini", 11, 120, 60, 0.1397
    "BEV", "medium", 22, 350, 90, 0.1746
    "BEV", "luxury", 50, 350, 110, 0.2096
    "PHEV", "mini", 3.7, 40, 14, 0.1425
    "PHEV", "medium", 11, 40, 20, 0.1782
    "PHEV", "luxury", 11, 120, 30, 0.2138

The complete tech data and assumptions of the run can be found in the metadata:
*<WORKING_DIRECTORY>/data_bundle_egon_data/emobility/mit_trip_data/<SCENARIO>/
metadata_simbev_run.json*.
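
The per-scenario run settings in this file can be read with the helper used by
``write_metadata_to_db`` below; a minimal sketch:

.. code-block:: python

    from egon.data.datasets.emobility.motorized_individual_travel.model_timeseries import (
        read_simbev_metadata_file,
    )

    meta_run_config = read_simbev_metadata_file("eGon2035", "config")
    print(meta_run_config.loc["basic"])  # e.g. eta_cp, stepsize, start_date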

* explain scenario parameters
* run params (all in meta file?)

**EV allocation**

The number of EVs per registration district (Zulassungsbezirk) is taken from
KBA's vehicle registration data. The numbers per EV type (BEV and PHEV) are
then allocated to municipalities and MV grid districts based upon:

* RegioStaR7
* scenario parameters: shares

**Further notes**

* Sanity checks

**Model parametrization**

**Example queries**
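
A minimal sketch of a query on the resulting EV pool table, using the
``scenario`` and ``type`` columns written by ``write_evs_trips_to_db``:

.. code-block:: python

    import pandas as pd

    from egon.data import db
    from egon.data.datasets.emobility.motorized_individual_travel.db_classes import (
        EgonEvPool,
    )

    # Count EVs per vehicle type for the eGon2035 scenario
    query = (
        "SELECT type, COUNT(*) AS ev_count "
        f"FROM {EgonEvPool.__table__.schema}.{EgonEvPool.__table__.name} "
        "WHERE scenario = 'eGon2035' "
        "GROUP BY type"
    )
    ev_pool_counts = pd.read_sql(query, con=db.engine())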

"""

from pathlib import Path
from urllib.request import urlretrieve
import os
import tarfile

from airflow.operators.python_operator import PythonOperator
from psycopg2.extensions import AsIs, register_adapter
import numpy as np
import pandas as pd

from egon.data import db, subprocess
from egon.data.datasets import Dataset
from egon.data.datasets.emobility.motorized_individual_travel.db_classes import (  # noqa: E501
    EgonEvCountMunicipality,
    EgonEvCountMvGridDistrict,
    EgonEvCountRegistrationDistrict,
    EgonEvMetadata,
    EgonEvMvGridDistrict,
    EgonEvPool,
    EgonEvTrip,
)
from egon.data.datasets.emobility.motorized_individual_travel.ev_allocation import (  # noqa: E501
    allocate_evs_numbers,
    allocate_evs_to_grid_districts,
)
from egon.data.datasets.emobility.motorized_individual_travel.helpers import (
    COLUMNS_KBA,
    DATA_BUNDLE_DIR,
    DATASET_CFG,
    MVGD_MIN_COUNT,
    TESTMODE_OFF,
    TRIP_COLUMN_MAPPING,
    WORKING_DIR,
)
from egon.data.datasets.emobility.motorized_individual_travel.model_timeseries import (  # noqa: E501
    delete_model_data_from_db,
    generate_model_data_bunch,
    generate_model_data_eGon100RE_remaining,
    generate_model_data_eGon2035_remaining,
    read_simbev_metadata_file,
)


# ========== Register np datatypes with SQLA ==========
def adapt_numpy_float64(numpy_float64):
    return AsIs(numpy_float64)


def adapt_numpy_int64(numpy_int64):
    return AsIs(numpy_int64)


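# psycopg2 has no built-in adapters for numpy scalar types; without these
# registrations, inserting np.float64/np.int64 values into the DB would fail.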
register_adapter(np.float64, adapt_numpy_float64)
register_adapter(np.int64, adapt_numpy_int64)
# =====================================================


def create_tables():
    """Create tables for electric vehicles

    Returns
    -------
    None
    """

    engine = db.engine()
    EgonEvCountRegistrationDistrict.__table__.drop(
        bind=engine, checkfirst=True
    )
    EgonEvCountRegistrationDistrict.__table__.create(
        bind=engine, checkfirst=True
    )
    EgonEvCountMunicipality.__table__.drop(bind=engine, checkfirst=True)
    EgonEvCountMunicipality.__table__.create(bind=engine, checkfirst=True)
    EgonEvCountMvGridDistrict.__table__.drop(bind=engine, checkfirst=True)
    EgonEvCountMvGridDistrict.__table__.create(bind=engine, checkfirst=True)
    EgonEvPool.__table__.drop(bind=engine, checkfirst=True)
    EgonEvPool.__table__.create(bind=engine, checkfirst=True)
    EgonEvTrip.__table__.drop(bind=engine, checkfirst=True)
    EgonEvTrip.__table__.create(bind=engine, checkfirst=True)
    EgonEvMvGridDistrict.__table__.drop(bind=engine, checkfirst=True)
    EgonEvMvGridDistrict.__table__.create(bind=engine, checkfirst=True)
    EgonEvMetadata.__table__.drop(bind=engine, checkfirst=True)
    EgonEvMetadata.__table__.create(bind=engine, checkfirst=True)

    # Create dir for results, if it does not exist
    result_dir = WORKING_DIR / Path("results")
    result_dir.mkdir(exist_ok=True, parents=True)


def download_and_preprocess():
    """Downloads and preprocesses data from KBA and BMVI

    The processed data (vehicle registrations per registration district and
    the RegioStaR7 classification) are written as CSV files to the working
    directory.

    Returns
    -------
    None
    """

    mit_sources = DATASET_CFG["original_data"]["sources"]

    # Create the folder, if it does not exist
    if not os.path.exists(WORKING_DIR):
        os.mkdir(WORKING_DIR)

    ################################
    # Download and import KBA data #
    ################################
    url = mit_sources["KBA"]["url"]
    file = WORKING_DIR / mit_sources["KBA"]["file"]
    if not os.path.isfile(file):
        urlretrieve(url, file)

    kba_data = pd.read_excel(
        file,
        sheet_name=mit_sources["KBA"]["sheet"],
        usecols=mit_sources["KBA"]["columns"],
        skiprows=mit_sources["KBA"]["skiprows"],
    )
    kba_data.columns = COLUMNS_KBA
    kba_data.replace(
        " ",
        np.nan,
        inplace=True,
    )
    kba_data = kba_data.dropna()
    kba_data[
        ["ags_reg_district", "reg_district"]
    ] = kba_data.reg_district.str.split(
        " ",
        n=1,
        expand=True,
    )
    kba_data.ags_reg_district = kba_data.ags_reg_district.astype("int")

    kba_data.to_csv(
        WORKING_DIR / mit_sources["KBA"]["file_processed"], index=None
    )

    #######################################
    # Download and import RegioStaR7 data #
    #######################################

    url = mit_sources["RS7"]["url"]
    file = WORKING_DIR / mit_sources["RS7"]["file"]
    if not os.path.isfile(file):
        urlretrieve(url, file)

    rs7_data = pd.read_excel(file, sheet_name=mit_sources["RS7"]["sheet"])

    rs7_data["ags_district"] = (
        rs7_data.gem_20.multiply(1 / 1000).apply(np.floor).astype("int")
    )
    rs7_data = rs7_data.rename(
        columns={"gem_20": "ags", "RegioStaR7": "rs7_id"}
    )
    rs7_data.rs7_id = rs7_data.rs7_id.astype("int")

    rs7_data.to_csv(
        WORKING_DIR / mit_sources["RS7"]["file_processed"], index=None
    )


def extract_trip_file():
    """Extract trip file from data bundle"""
    trip_dir = DATA_BUNDLE_DIR / Path("mit_trip_data")

    for scenario_name in ["eGon2035", "eGon100RE"]:
        print(f"SCENARIO: {scenario_name}")
        trip_file = trip_dir / Path(
            DATASET_CFG["original_data"]["sources"]["trips"][scenario_name][
                "file"
            ]
        )

        if os.path.isfile(trip_file):
            with tarfile.open(trip_file) as tar:
                tar.extractall(trip_dir)
        else:
            raise FileNotFoundError(
                f"Trip file {trip_file} not found in data bundle."
            )


def write_evs_trips_to_db():
    """Write EVs and trips generated by simBEV from data bundle to database
    table
    """

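    # Each trip CSV sits in a sub-directory named after its RegioStaR7 region
    # id; the EV id is taken from the first three underscore-separated tokens
    # of the file name.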
    def import_csv(f):
        df = pd.read_csv(f, usecols=TRIP_COLUMN_MAPPING.keys())
        df["rs7_id"] = int(f.parent.name)
        df["simbev_ev_id"] = "_".join(f.name.split("_")[0:3])
        return df

    for scenario_name in ["eGon2035", "eGon100RE"]:
        print(f"SCENARIO: {scenario_name}")
        trip_dir_name = Path(
            DATASET_CFG["original_data"]["sources"]["trips"][scenario_name][
                "file"
            ].split(".")[0]
        )

        trip_dir_root = DATA_BUNDLE_DIR / Path("mit_trip_data", trip_dir_name)

        if TESTMODE_OFF:
            trip_files = list(trip_dir_root.glob("*/*.csv"))
        else:
            # Load only 1000 EVs per region if in test mode
            trip_files = [
                list(rdir.glob("*.csv"))[:1000]
                for rdir in [_ for _ in trip_dir_root.iterdir() if _.is_dir()]
            ]
            # Flatten
            trip_files = [i for sub in trip_files for i in sub]

        # Read, concat and reorder cols
        print(f"Importing {len(trip_files)} EV trip CSV files...")
        trip_data = pd.concat(map(import_csv, trip_files))
        trip_data.rename(columns=TRIP_COLUMN_MAPPING, inplace=True)
        trip_data = trip_data.reset_index().rename(
            columns={"index": "simbev_event_id"}
        )
        cols = ["rs7_id", "simbev_ev_id", "simbev_event_id"] + list(
            TRIP_COLUMN_MAPPING.values()
        )
        trip_data.index.name = "event_id"
        trip_data = trip_data[cols]

        # Extract EVs from trips
        evs_unique = trip_data[["rs7_id", "simbev_ev_id"]].drop_duplicates()
        evs_unique = evs_unique.reset_index().drop(columns=["event_id"])
        evs_unique.index.name = "ev_id"

        # Add EV id to trip DF
        trip_data["egon_ev_pool_ev_id"] = pd.merge(
            trip_data, evs_unique.reset_index(), on=["rs7_id", "simbev_ev_id"]
        )["ev_id"]

        # Split simBEV id into type and id
        evs_unique[["type", "simbev_ev_id"]] = evs_unique[
            "simbev_ev_id"
        ].str.rsplit("_", n=1, expand=True)
        evs_unique.simbev_ev_id = evs_unique.simbev_ev_id.astype(int)
        evs_unique["scenario"] = scenario_name

        trip_data.drop(columns=["rs7_id", "simbev_ev_id"], inplace=True)
        trip_data["scenario"] = scenario_name
        trip_data.sort_index(inplace=True)

        # Write EVs to DB
        print("Writing EVs to DB pool...")
        evs_unique.to_sql(
            name=EgonEvPool.__table__.name,
            schema=EgonEvPool.__table__.schema,
            con=db.engine(),
            if_exists="append",
            index=True,
        )

        # Write trips to CSV and import to DB
        print("Writing EV trips to CSV file...")
        trip_file = WORKING_DIR / f"trip_data_{scenario_name}.csv"
        trip_data.to_csv(trip_file)

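        # Bulk-import the trips via psql's \copy rather than DataFrame.to_sql,
        # presumably for speed given the size of the trip files.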
        # Get DB config
        docker_db_config = db.credentials()
        host = ["-h", f"{docker_db_config['HOST']}"]
        port = ["-p", f"{docker_db_config['PORT']}"]
        pgdb = ["-d", f"{docker_db_config['POSTGRES_DB']}"]
        user = ["-U", f"{docker_db_config['POSTGRES_USER']}"]
        command = [
            "-c",
            rf"\copy {EgonEvTrip.__table__.schema}.{EgonEvTrip.__table__.name}"
            rf"({','.join(trip_data.reset_index().columns)})"
            rf" FROM '{str(trip_file)}' DELIMITER ',' CSV HEADER;",
        ]

        print("Importing EV trips from CSV file to DB...")
        subprocess.run(
            ["psql"] + host + port + pgdb + user + command,
            env={"PGPASSWORD": docker_db_config["POSTGRES_PASSWORD"]},
        )

        os.remove(trip_file)


def write_metadata_to_db():
    """
    Write used SimBEV metadata per scenario to database.
    """
    dtypes = {
        "scenario": str,
        "eta_cp": float,
        "stepsize": int,
        "start_date": np.datetime64,
        "end_date": np.datetime64,
        "soc_min": float,
        "grid_timeseries": bool,
        "grid_timeseries_by_usecase": bool,
    }

    for scenario_name in ["eGon2035", "eGon100RE"]:
        meta_run_config = read_simbev_metadata_file(
            scenario_name, "config"
        ).loc["basic"]

        meta_run_config = (
            meta_run_config.to_frame()
            .T.assign(scenario=scenario_name)[dtypes.keys()]
            .astype(dtypes)
        )

        meta_run_config.to_sql(
            name=EgonEvMetadata.__table__.name,
            schema=EgonEvMetadata.__table__.schema,
            con=db.engine(),
            if_exists="append",
            index=False,
        )


class MotorizedIndividualTravel(Dataset):
    def __init__(self, dependencies):
        def generate_model_data_tasks(scenario_name):
            """Dynamically generate tasks for model data creation.

            The goal is to speed up the creation of model timeseries. However,
            the exact number of parallel tasks cannot be determined during
            DAG building as the number of grid districts (MVGD) is calculated
            within another pipeline task.
            Approach: assuming an approximate minimum MVGD count
            (`mvgd_min_count`) of 3700, the majority of the MVGDs can be
            processed in parallel; the remainder is handled subsequently by
            the `generate_model_data_eGon2035_remaining` and
            `generate_model_data_eGon100RE_remaining` tasks.
            The number of parallel tasks is defined via parameter
            `parallel_tasks` in the dataset config `datasets.yml`.

            Parameters
            ----------
            scenario_name : str
                Scenario name

            Returns
            -------
            set
                The tasks: one
                :class:`airflow.operators.python_operator.PythonOperator` per
                MVGD bunch, each calling
                :func:`egon.data.datasets.emobility.motorized_individual_travel.model_timeseries.generate_model_data_bunch`,
                plus the task handling the remaining MVGDs.
            """
            parallel_tasks = DATASET_CFG["model_timeseries"].get(
                "parallel_tasks", 1
            )
            mvgd_bunch_size = divmod(MVGD_MIN_COUNT, parallel_tasks)[0]

            tasks = set()
            for _ in range(parallel_tasks):
                bunch = range(_ * mvgd_bunch_size, (_ + 1) * mvgd_bunch_size)
                tasks.add(
                    PythonOperator(
                        task_id=(
                            f"generate_model_data_"
                            f"{scenario_name}_"
                            f"bunch{bunch[0]}-{bunch[-1]}"
                        ),
                        python_callable=generate_model_data_bunch,
                        op_kwargs={
                            "scenario_name": scenario_name,
                            "bunch": bunch,
                        },
                    )
                )

            if scenario_name == "eGon2035":
                tasks.add(generate_model_data_eGon2035_remaining)
            elif scenario_name == "eGon100RE":
                tasks.add(generate_model_data_eGon100RE_remaining)
            return tasks

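        # In the task graph below, tuples are intended to run sequentially and
        # sets in parallel (assumed from the egon.data.datasets.Dataset
        # conventions).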
        super().__init__(
            name="MotorizedIndividualTravel",
            version="0.0.6",
            dependencies=dependencies,
            tasks=(
                create_tables,
                {
                    (
                        download_and_preprocess,
                        allocate_evs_numbers,
                    ),
                    (
                        extract_trip_file,
                        write_metadata_to_db,
                        write_evs_trips_to_db,
                    ),
                },
                allocate_evs_to_grid_districts,
                delete_model_data_from_db,
                {
                    *generate_model_data_tasks(scenario_name="eGon2035"),
                    *generate_model_data_tasks(scenario_name="eGon100RE"),
                },
            ),
        )