1
|
|
|
""" |
2
|
|
|
Main module for preparation of model data (static and timeseries) for |
3
|
|
|
motorized individual travel (MIT). |
4
|
|
|
|
5
|
|
|
**Contents of this module** |
6
|
|
|
* Creation of DB tables |
7
|
|
|
* Download and preprocessing of vehicle registration data from KBA and BMVI |
8
|
|
|
* Calculate number of electric vehicles and allocate on different spatial |
9
|
|
|
levels. |
10
|
|
|
* Extract and write pre-generated trips to DB |
11
|
|
|
|
12
|
|
|
""" |
13
|
|
|
|
14
|
|
|
from pathlib import Path |
15
|
|
|
from urllib.request import urlretrieve |
16
|
|
|
import os |
17
|
|
|
import tarfile |
18
|
|
|
|
19
|
|
|
from airflow.operators.python_operator import PythonOperator |
20
|
|
|
from psycopg2.extensions import AsIs, register_adapter |
21
|
|
|
import numpy as np |
22
|
|
|
import pandas as pd |
23
|
|
|
|
24
|
|
|
from egon.data import db, subprocess |
25
|
|
|
from egon.data.datasets import Dataset |
26
|
|
|
from egon.data.datasets.emobility.motorized_individual_travel.db_classes import ( # noqa: E501 |
27
|
|
|
EgonEvCountMunicipality, |
28
|
|
|
EgonEvCountMvGridDistrict, |
29
|
|
|
EgonEvCountRegistrationDistrict, |
30
|
|
|
EgonEvMetadata, |
31
|
|
|
EgonEvMvGridDistrict, |
32
|
|
|
EgonEvPool, |
33
|
|
|
EgonEvTrip, |
34
|
|
|
) |
35
|
|
|
from egon.data.datasets.emobility.motorized_individual_travel.ev_allocation import ( # noqa: E501 |
36
|
|
|
allocate_evs_numbers, |
37
|
|
|
allocate_evs_to_grid_districts, |
38
|
|
|
) |
39
|
|
|
from egon.data.datasets.emobility.motorized_individual_travel.helpers import ( |
40
|
|
|
COLUMNS_KBA, |
41
|
|
|
DATA_BUNDLE_DIR, |
42
|
|
|
DATASET_CFG, |
43
|
|
|
MVGD_MIN_COUNT, |
44
|
|
|
TESTMODE_OFF, |
45
|
|
|
TRIP_COLUMN_MAPPING, |
46
|
|
|
WORKING_DIR, |
47
|
|
|
) |
48
|
|
|
from egon.data.datasets.emobility.motorized_individual_travel.model_timeseries import ( # noqa: E501 |
49
|
|
|
delete_model_data_from_db, |
50
|
|
|
generate_model_data_bunch, |
51
|
|
|
generate_model_data_eGon100RE_remaining, |
52
|
|
|
generate_model_data_eGon2035_remaining, |
53
|
|
|
read_simbev_metadata_file, |
54
|
|
|
) |
55
|
|
|
|
56
|
|
|
|
57
|
|
|
# ========== Register np datatypes with SQLA ========== |
58
|
|
|
def adapt_numpy_float64(numpy_float64):
    """Adapt a ``numpy.float64`` scalar for use as an SQL parameter."""
    adapted = AsIs(numpy_float64)
    return adapted
60
|
|
|
|
61
|
|
|
|
62
|
|
|
def adapt_numpy_int64(numpy_int64):
    """Adapt a ``numpy.int64`` scalar for use as an SQL parameter."""
    adapted = AsIs(numpy_int64)
    return adapted
64
|
|
|
|
65
|
|
|
|
66
|
|
|
# Register the adapters with psycopg2 so NumPy scalar types
# (np.float64 / np.int64) can be passed directly as SQL query
# parameters instead of requiring explicit conversion to Python
# float/int first.
register_adapter(np.float64, adapt_numpy_float64)
register_adapter(np.int64, adapt_numpy_int64)
# =====================================================
69
|
|
|
|
70
|
|
|
|
71
|
|
|
def create_tables():
    """(Re-)create all database tables for electric vehicles.

    Each EV table is dropped first (if present) and then created from
    scratch. Afterwards the local results directory is created if it
    does not exist yet.

    Returns
    -------
    None
    """
    engine = db.engine()

    # Drop and recreate every EV-related table, preserving this order.
    ev_orm_classes = (
        EgonEvCountRegistrationDistrict,
        EgonEvCountMunicipality,
        EgonEvCountMvGridDistrict,
        EgonEvPool,
        EgonEvTrip,
        EgonEvMvGridDistrict,
        EgonEvMetadata,
    )
    for orm_class in ev_orm_classes:
        orm_class.__table__.drop(bind=engine, checkfirst=True)
        orm_class.__table__.create(bind=engine, checkfirst=True)

    # Create dir for results, if it does not exist
    (WORKING_DIR / Path("results")).mkdir(exist_ok=True, parents=True)
102
|
|
|
|
103
|
|
|
|
104
|
|
|
def download_and_preprocess():
    """Download and preprocess vehicle registration data from KBA and BMVI.

    The raw Excel files are downloaded into :data:`WORKING_DIR` (skipped
    if already present) and the processed tables are written back there
    as CSV files.

    Returns
    -------
    None
        The processed KBA and RegioStaR7 tables are written to disk;
        nothing is returned (the previous docstring wrongly claimed two
        DataFrames were returned).
    """
    mit_sources = DATASET_CFG["original_data"]["sources"]

    # Create the working folder, if it does not exist. Using Path.mkdir
    # with parents=True instead of os.mkdir, which fails when any parent
    # directory is missing.
    WORKING_DIR.mkdir(exist_ok=True, parents=True)

    ################################
    # Download and import KBA data #
    ################################
    url = mit_sources["KBA"]["url"]
    file = WORKING_DIR / mit_sources["KBA"]["file"]
    if not os.path.isfile(file):
        urlretrieve(url, file)

    kba_data = pd.read_excel(
        file,
        sheet_name=mit_sources["KBA"]["sheet"],
        usecols=mit_sources["KBA"]["columns"],
        skiprows=mit_sources["KBA"]["skiprows"],
    )
    kba_data.columns = COLUMNS_KBA
    # Blank cells are exported as a single space character in the KBA file
    kba_data.replace(
        " ",
        np.nan,
        inplace=True,
    )
    kba_data = kba_data.dropna()
    # Split "<AGS> <district name>" into two columns. Pass `n` as a
    # keyword: positional use of `n` in Series.str.split was deprecated
    # and removed in pandas 2.0.
    kba_data[
        ["ags_reg_district", "reg_district"]
    ] = kba_data.reg_district.str.split(
        " ",
        n=1,
        expand=True,
    )
    kba_data.ags_reg_district = kba_data.ags_reg_district.astype("int")

    kba_data.to_csv(
        WORKING_DIR / mit_sources["KBA"]["file_processed"], index=False
    )

    #######################################
    # Download and import RegioStaR7 data #
    #######################################

    url = mit_sources["RS7"]["url"]
    file = WORKING_DIR / mit_sources["RS7"]["file"]
    if not os.path.isfile(file):
        urlretrieve(url, file)

    rs7_data = pd.read_excel(file, sheet_name=mit_sources["RS7"]["sheet"])

    # District AGS = municipality AGS with the last 3 digits stripped
    rs7_data["ags_district"] = (
        rs7_data.gem_20.multiply(1 / 1000).apply(np.floor).astype("int")
    )
    rs7_data = rs7_data.rename(
        columns={"gem_20": "ags", "RegioStaR7": "rs7_id"}
    )
    rs7_data.rs7_id = rs7_data.rs7_id.astype("int")

    rs7_data.to_csv(
        WORKING_DIR / mit_sources["RS7"]["file_processed"], index=False
    )
177
|
|
|
|
178
|
|
|
|
179
|
|
|
def extract_trip_file():
    """Extract pre-generated simBEV trip archives from the data bundle.

    For each scenario the corresponding tarball is unpacked into the
    trip directory of the data bundle.

    Raises
    ------
    FileNotFoundError
        If a scenario's trip archive is missing from the data bundle.
    """
    trip_dir = DATA_BUNDLE_DIR / Path("mit_trip_data")

    for scenario_name in ["eGon2035", "eGon100RE"]:
        print(f"SCENARIO: {scenario_name}")
        trip_file = trip_dir / Path(
            DATASET_CFG["original_data"]["sources"]["trips"][scenario_name][
                "file"
            ]
        )

        # Check the file exists BEFORE opening it: previously
        # tarfile.open() was called first, so a missing archive raised
        # from open() and the intended FileNotFoundError branch below was
        # unreachable. The context manager also guarantees the archive
        # is closed (it was never closed before).
        if not os.path.isfile(trip_file):
            raise FileNotFoundError(
                f"Trip file {trip_file} not found in data bundle."
            )
        with tarfile.open(trip_file) as tar:
            tar.extractall(trip_dir)
198
|
|
|
|
199
|
|
|
|
200
|
|
|
def write_evs_trips_to_db():
    """Write EVs and trips generated by simBEV from data bundle to database
    table.

    For each scenario the per-region trip CSVs are read and concatenated,
    then split into a unique EV pool (written via SQLAlchemy) and the
    trip events, which are bulk-imported through psql's ``\\copy`` from a
    temporary CSV file.
    """

    def import_csv(f):
        # One CSV per EV: the region id comes from the parent directory
        # name, the simBEV EV id from the first three filename tokens.
        df = pd.read_csv(f, usecols=TRIP_COLUMN_MAPPING.keys())
        df["rs7_id"] = int(f.parent.name)
        df["simbev_ev_id"] = "_".join(f.name.split("_")[0:3])
        return df

    for scenario_name in ["eGon2035", "eGon100RE"]:
        print(f"SCENARIO: {scenario_name}")
        trip_dir_name = Path(
            DATASET_CFG["original_data"]["sources"]["trips"][scenario_name][
                "file"
            ].split(".")[0]
        )

        trip_dir_root = DATA_BUNDLE_DIR / Path("mit_trip_data", trip_dir_name)

        if TESTMODE_OFF:
            trip_files = list(trip_dir_root.glob("*/*.csv"))
        else:
            # Load only 1000 EVs per region if in test mode
            trip_files = [
                list(rdir.glob("*.csv"))[:1000]
                for rdir in [_ for _ in trip_dir_root.iterdir() if _.is_dir()]
            ]
            # Flatten
            trip_files = [i for sub in trip_files for i in sub]

        # Read, concat and reorder cols
        print(f"Importing {len(trip_files)} EV trip CSV files...")
        trip_data = pd.concat(map(import_csv, trip_files))
        trip_data.rename(columns=TRIP_COLUMN_MAPPING, inplace=True)
        trip_data = trip_data.reset_index().rename(
            columns={"index": "simbev_event_id"}
        )
        cols = ["rs7_id", "simbev_ev_id", "simbev_event_id"] + list(
            TRIP_COLUMN_MAPPING.values()
        )
        trip_data.index.name = "event_id"
        trip_data = trip_data[cols]

        # Extract EVs from trips
        evs_unique = trip_data[["rs7_id", "simbev_ev_id"]].drop_duplicates()
        evs_unique = evs_unique.reset_index().drop(columns=["event_id"])
        evs_unique.index.name = "ev_id"

        # Add EV id to trip DF
        trip_data["egon_ev_pool_ev_id"] = pd.merge(
            trip_data, evs_unique.reset_index(), on=["rs7_id", "simbev_ev_id"]
        )["ev_id"]

        # Split simBEV id into type and id. Pass `n` as a keyword:
        # positional use of `n` in Series.str.rsplit was deprecated and
        # removed in pandas 2.0.
        evs_unique[["type", "simbev_ev_id"]] = evs_unique[
            "simbev_ev_id"
        ].str.rsplit("_", n=1, expand=True)
        evs_unique.simbev_ev_id = evs_unique.simbev_ev_id.astype(int)
        evs_unique["scenario"] = scenario_name

        trip_data.drop(columns=["rs7_id", "simbev_ev_id"], inplace=True)
        trip_data["scenario"] = scenario_name
        trip_data.sort_index(inplace=True)

        # Write EVs to DB
        print("Writing EVs to DB pool...")
        evs_unique.to_sql(
            name=EgonEvPool.__table__.name,
            schema=EgonEvPool.__table__.schema,
            con=db.engine(),
            if_exists="append",
            index=True,
        )

        # Write trips to CSV and import to DB
        print("Writing EV trips to CSV file...")
        trip_file = WORKING_DIR / f"trip_data_{scenario_name}.csv"
        trip_data.to_csv(trip_file)

        # Get DB config
        docker_db_config = db.credentials()
        host = ["-h", f"{docker_db_config['HOST']}"]
        port = ["-p", f"{docker_db_config['PORT']}"]
        pgdb = ["-d", f"{docker_db_config['POSTGRES_DB']}"]
        user = ["-U", f"{docker_db_config['POSTGRES_USER']}"]
        command = [
            "-c",
            rf"\copy {EgonEvTrip.__table__.schema}.{EgonEvTrip.__table__.name}"
            rf"({','.join(trip_data.reset_index().columns)})"
            rf" FROM '{str(trip_file)}' DELIMITER ',' CSV HEADER;",
        ]

        # Bulk import via psql \copy — far faster than DataFrame.to_sql
        # for the (potentially millions of) trip rows.
        print("Importing EV trips from CSV file to DB...")
        subprocess.run(
            ["psql"] + host + port + pgdb + user + command,
            env={"PGPASSWORD": docker_db_config["POSTGRES_PASSWORD"]},
        )

        os.remove(trip_file)
301
|
|
|
|
302
|
|
|
|
303
|
|
|
def write_metadata_to_db():
    """
    Write used SimBEV metadata per scenario to database.
    """
    column_dtypes = {
        "scenario": str,
        "eta_cp": float,
        "stepsize": int,
        "start_date": np.datetime64,
        "end_date": np.datetime64,
        "soc_min": float,
        "grid_timeseries": bool,
        "grid_timeseries_by_usecase": bool,
    }

    for scenario_name in ["eGon2035", "eGon100RE"]:
        # Grab the "basic" section of the simBEV run configuration
        run_config = read_simbev_metadata_file(scenario_name, "config").loc[
            "basic"
        ]

        # Build a one-row frame, attach the scenario name and coerce
        # every column to its target dtype
        meta_df = run_config.to_frame().T
        meta_df = meta_df.assign(scenario=scenario_name)
        meta_df = meta_df[column_dtypes.keys()].astype(column_dtypes)

        meta_df.to_sql(
            name=EgonEvMetadata.__table__.name,
            schema=EgonEvMetadata.__table__.schema,
            con=db.engine(),
            if_exists="append",
            index=False,
        )
336
|
|
|
|
337
|
|
|
|
338
|
|
|
class MotorizedIndividualTravel(Dataset):
    """
    Class to set up static and timeseries data for motorized individual travel (MIT).

    For more information see data documentation on :ref:`mobility-demand-mit-ref`.

    *Dependencies*
    * :py:class:`DataBundle <egon.data.datasets.data_bundle.DataBundle>`
    * :py:class:`MvGridDistricts
      <egon.data.datasets.mv_grid_districts.mv_grid_districts_setup>`
    * :py:class:`ScenarioParameters
      <egon.data.datasets.scenario_parameters.ScenarioParameters>`
    * :py:class:`EtragoSetup <egon.data.datasets.etrago_setup.EtragoSetup>`
    * :py:class:`ZensusMvGridDistricts
      <egon.data.datasets.zensus_mv_grid_districts.ZensusMvGridDistricts>`
    * :py:class:`ZensusVg250 <egon.data.datasets.zensus_vg250.ZensusVg250>`
    * :py:class:`StorageEtrago <egon.data.datasets.storages_etrago.StorageEtrago>`
    * :py:class:`HtsEtragoTable
      <egon.data.datasets.heat_etrago.hts_etrago.HtsEtragoTable>`
    * :py:class:`ChpEtrago <egon.data.datasets.chp_etrago.ChpEtrago>`
    * :py:class:`DsmPotential <egon.data.datasets.DSM_cts_ind.DsmPotential>`
    * :py:class:`HeatEtrago <egon.data.datasets.heat_etrago.HeatEtrago>`
    * :py:class:`Egon_etrago_gen <egon.data.datasets.fill_etrago_gen.Egon_etrago_gen>`
    * :py:class:`OpenCycleGasTurbineEtrago
      <egon.data.datasets.power_etrago.OpenCycleGasTurbineEtrago>`
    * :py:class:`HydrogenStoreEtrago
      <egon.data.datasets.hydrogen_etrago.HydrogenStoreEtrago>`
    * :py:class:`HydrogenPowerLinkEtrago
      <egon.data.datasets.hydrogen_etrago.HydrogenPowerLinkEtrago>`
    * :py:class:`HydrogenMethaneLinkEtrago
      <egon.data.datasets.hydrogen_etrago.HydrogenMethaneLinkEtrago>`
    * :py:class:`GasAreaseGon100RE <egon.data.datasets.gas_areas.GasAreaseGon100RE>`
    * :py:class:`CH4Production <egon.data.datasets.ch4_prod.CH4Production>`
    * :py:class:`CH4Storages <egon.data.datasets.ch4_storages.CH4Storages>`

    *Resulting Tables*
    * :py:class:`EgonEvPool <egon.data.datasets.emobility.motorized_individual_travel.db_classes.EgonEvPool>`
      is created and filled
    * :py:class:`EgonEvTrip <egon.data.datasets.emobility.motorized_individual_travel.db_classes.EgonEvTrip>`
      is created and filled
    * :py:class:`EgonEvCountRegistrationDistrict <egon.data.datasets.emobility.motorized_individual_travel.db_classes.EgonEvCountRegistrationDistrict>`
      is created and filled
    * :py:class:`EgonEvCountMunicipality <egon.data.datasets.emobility.motorized_individual_travel.db_classes.EgonEvCountMunicipality>`
      is created and filled
    * :py:class:`EgonEvCountMvGridDistrict <egon.data.datasets.emobility.motorized_individual_travel.db_classes.EgonEvCountMvGridDistrict>`
      is created and filled
    * :py:class:`EgonEvMvGridDistrict <egon.data.datasets.emobility.motorized_individual_travel.db_classes.EgonEvMvGridDistrict>`
      is created and filled
    * :py:class:`EgonEvMetadata <egon.data.datasets.emobility.motorized_individual_travel.db_classes.EgonEvMetadata>`
      is created and filled

    *Configuration*

    The config of this dataset can be found in *datasets.yml* in section
    *emobility_mit*.

    """

    #: Dataset name as registered in the pipeline
    name: str = "MotorizedIndividualTravel"
    #: Dataset version; bump to force a re-run of the dataset
    version: str = "0.0.7"

    def __init__(self, dependencies):
        def generate_model_data_tasks(scenario_name):
            """Dynamically generate tasks for model data creation.

            The goal is to speed up the creation of model timeseries. However,
            the exact number of parallel task cannot be determined during the
            DAG building as the number of grid districts (MVGD) is calculated
            within another pipeline task.
            Approach: assuming an approx. count of `mvgd_min_count` of 3700,
            the majority of the MVGDs can be parallelized. The remainder is
            handled subsequently by the scenario-specific
            ``generate_model_data_eGon2035_remaining`` /
            ``generate_model_data_eGon100RE_remaining`` tasks.
            The number of parallel tasks is defined via parameter
            `parallel_tasks` in the dataset config `datasets.yml`.

            Parameters
            ----------
            scenario_name : str
                Scenario name

            Returns
            -------
            set of functools.partial
                The tasks. Each element is of
                :func:`egon.data.datasets.emobility.motorized_individual_travel.model_timeseries.generate_model_data`
            """
            parallel_tasks = DATASET_CFG["model_timeseries"].get(
                "parallel_tasks", 1
            )
            # Integer bunch size; the remainder (MVGD_MIN_COUNT not being
            # divisible by parallel_tasks) is picked up by the
            # *_remaining tasks added below.
            mvgd_bunch_size = divmod(MVGD_MIN_COUNT, parallel_tasks)[0]

            tasks = set()
            for _ in range(parallel_tasks):
                # Contiguous, non-overlapping MVGD index range per task
                bunch = range(_ * mvgd_bunch_size, (_ + 1) * mvgd_bunch_size)
                tasks.add(
                    PythonOperator(
                        task_id=(
                            f"generate_model_data_"
                            f"{scenario_name}_"
                            f"bunch{bunch[0]}-{bunch[-1]}"
                        ),
                        python_callable=generate_model_data_bunch,
                        op_kwargs={
                            "scenario_name": scenario_name,
                            "bunch": bunch,
                        },
                    )
                )

            if scenario_name == "eGon2035":
                tasks.add(generate_model_data_eGon2035_remaining)
            elif scenario_name == "eGon100RE":
                tasks.add(generate_model_data_eGon100RE_remaining)
            return tasks

        # NOTE(review): in the tasks graph below, tuples appear to be
        # sequential chains and sets parallel groups — confirm against
        # egon.data.datasets.Dataset documentation.
        super().__init__(
            name=self.name,
            version=self.version,
            dependencies=dependencies,
            tasks=(
                create_tables,
                {
                    (
                        download_and_preprocess,
                        allocate_evs_numbers,
                    ),
                    (
                        extract_trip_file,
                        write_metadata_to_db,
                        write_evs_trips_to_db,
                    ),
                },
                allocate_evs_to_grid_districts,
                delete_model_data_from_db,
                {
                    *generate_model_data_tasks(scenario_name="eGon2035"),
                    *generate_model_data_tasks(scenario_name="eGon100RE"),
                },
            ),
        )
480
|
|
|
|