Completed
Push — dev ( 8582b4...82307e )
by
unknown
30s queued 19s
created

pipeline_status_quo   A

Complexity

Total Complexity 0

Size/Duplication

Total Lines 532
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 0
eloc 324
dl 0
loc 532
rs 10
c 0
b 0
f 0
1
import os
2
3
from airflow.utils.dates import days_ago
4
import airflow
5
6
from egon.data.config import settings as egon_settings
7
from egon.data.config import set_numexpr_threads
8
from egon.data.datasets import database
9
from egon.data.datasets.ch4_prod import CH4Production
10
from egon.data.datasets.ch4_storages import CH4Storages
11
from egon.data.datasets.chp import Chp
12
from egon.data.datasets.chp_etrago import ChpEtrago
13
from egon.data.datasets.data_bundle import DataBundle
14
from egon.data.datasets.demandregio import DemandRegio
15
from egon.data.datasets.district_heating_areas import DistrictHeatingAreas
16
from egon.data.datasets.electrical_neighbours import ElectricalNeighbours
17
from egon.data.datasets.electricity_demand import (
18
    CtsElectricityDemand,
19
    HouseholdElectricityDemand,
20
)
21
from egon.data.datasets.electricity_demand_etrago import ElectricalLoadEtrago
22
from egon.data.datasets.electricity_demand_timeseries import (
23
    hh_buildings,
24
    hh_profiles,
25
)
26
from egon.data.datasets.electricity_demand_timeseries.cts_buildings import (
27
    CtsDemandBuildings,
28
)
29
from egon.data.datasets.emobility.motorized_individual_travel import (
30
    MotorizedIndividualTravel,
31
)
32
from egon.data.datasets.emobility.motorized_individual_travel_charging_infrastructure import (  # noqa: E501
33
    MITChargingInfrastructure,
34
)
35
from egon.data.datasets.era5 import WeatherData
36
from egon.data.datasets.etrago_setup import EtragoSetup
37
from egon.data.datasets.fill_etrago_gen import Egon_etrago_gen
38
from egon.data.datasets.fix_ehv_subnetworks import FixEhvSubnetworks
39
from egon.data.datasets.gas_areas import GasAreas
40
from egon.data.datasets.gas_grid import GasNodesAndPipes
41
from egon.data.datasets.gas_neighbours import GasNeighbours
42
from egon.data.datasets.heat_demand import HeatDemandImport
43
from egon.data.datasets.heat_demand_europe import HeatDemandEurope
44
from egon.data.datasets.heat_demand_timeseries import HeatTimeSeries
45
from egon.data.datasets.heat_etrago import HeatEtrago
46
from egon.data.datasets.heat_etrago.hts_etrago import HtsEtragoTable
47
from egon.data.datasets.heat_supply import HeatSupply
48
from egon.data.datasets.heat_supply.individual_heating import HeatPumpsStatusQuo
49
from egon.data.datasets.industrial_sites import MergeIndustrialSites
50
from egon.data.datasets.industry import IndustrialDemandCurves
51
from egon.data.datasets.loadarea import LoadArea, OsmLanduse
52
from egon.data.datasets.mastr import mastr_data_setup
53
from egon.data.datasets.mv_grid_districts import mv_grid_districts_setup
54
from egon.data.datasets.osm import OpenStreetMap
55
from egon.data.datasets.osm_buildings_streets import OsmBuildingsStreets
56
from egon.data.datasets.osmtgmod import Osmtgmod
57
from egon.data.datasets.power_etrago import OpenCycleGasTurbineEtrago
58
from egon.data.datasets.power_plants import PowerPlants
59
from egon.data.datasets.pypsaeur import PreparePypsaEur, RunPypsaEur
60
from egon.data.datasets.renewable_feedin import RenewableFeedin
61
from egon.data.datasets.scenario_capacities import ScenarioCapacities
62
from egon.data.datasets.scenario_parameters import ScenarioParameters
63
from egon.data.datasets.society_prognosis import SocietyPrognosis
64
from egon.data.datasets.storages import Storages
65
from egon.data.datasets.storages_etrago import StorageEtrago
66
from egon.data.datasets.substation import SubstationExtraction
67
from egon.data.datasets.substation_voronoi import SubstationVoronoi
68
from egon.data.datasets.tyndp import Tyndp
69
from egon.data.datasets.vg250 import Vg250
70
from egon.data.datasets.vg250_mv_grid_districts import Vg250MvGridDistricts
71
from egon.data.datasets.zensus import ZensusMiscellaneous, ZensusPopulation
72
from egon.data.datasets.zensus_mv_grid_districts import ZensusMvGridDistricts
73
from egon.data.datasets.zensus_vg250 import ZensusVg250
74
75
# Set number of threads used by numpy and pandas
76
set_numexpr_threads()
77
78
prefix = egon_settings()["egon-data"].get("--prefix")
79
prefix = "" if prefix is None else f"{prefix}-"
80
81
with airflow.DAG(
82
    f"{prefix}powerd-status-quo-processing-pipeline",
83
    description="The PoWerD Status Quo data processing DAG.",
84
    default_args={"start_date": days_ago(1),
85
                  "email_on_failure": False,
86
                  "email":"[email protected]"},
87
    template_searchpath=[
88
        os.path.abspath(
89
            os.path.join(
90
                os.path.dirname(__file__), "..", "..", "processing", "vg250"
91
            )
92
        )
93
    ],
94
    is_paused_upon_creation=False,
95
    schedule_interval=None,
96
) as pipeline:
97
98
    tasks = pipeline.task_dict
99
100
    setup = database.Setup()
101
102
    osm = OpenStreetMap(dependencies=[setup])
103
104
    data_bundle = DataBundle(dependencies=[setup])
105
106
    # Import VG250 (Verwaltungsgebiete 250) data
107
    vg250 = Vg250(dependencies=[setup])
108
109
    # Scenario table
110
    scenario_parameters = ScenarioParameters(dependencies=[setup])
111
112
    # Download TYNDP data
113
    tyndp_data = Tyndp(dependencies=[setup])  # TODO: kick out or adjust
114
115
    # Import zensus population
116
    zensus_population = ZensusPopulation(dependencies=[setup, vg250])
117
118
    # Combine zensus and VG250 data
119
    zensus_vg250 = ZensusVg250(dependencies=[vg250, zensus_population])
120
121
    # Download and import zensus data on households, buildings and apartments
122
    zensus_miscellaneous = ZensusMiscellaneous(
123
        dependencies=[zensus_population, zensus_vg250]
124
    )
125
126
    # Import DemandRegio data
127
    demandregio = DemandRegio(
128
        dependencies=[data_bundle, scenario_parameters, setup, vg250]
129
    )
130
131
    # Society prognosis
132
    society_prognosis = SocietyPrognosis(
133
        dependencies=[demandregio, zensus_miscellaneous]
134
    )
135
136
    # OSM (OpenStreetMap) buildings, streets and amenities
137
    osm_buildings_streets = OsmBuildingsStreets(
138
        dependencies=[osm, zensus_miscellaneous]
139
    )
140
141
    # Import weather data
142
    weather_data = WeatherData(
143
        dependencies=[scenario_parameters, setup, vg250]
144
    )
145
146
    # Future national heat demands for foreign countries based on Hotmaps
147
    # download only, processing in PyPSA-Eur-Sec fork
148
    hd_abroad = HeatDemandEurope(dependencies=[setup])
149
150
    # Set eTraGo input tables
151
    setup_etrago = EtragoSetup(dependencies=[setup])
152
153
    substation_extraction = SubstationExtraction(dependencies=[osm, vg250])
154
155
    # Generate the osmTGmod ehv/hv grid model
156
    osmtgmod = Osmtgmod(
157
        dependencies=[
158
            scenario_parameters,
159
            setup_etrago,
160
            substation_extraction,
161
            tasks["osm.download"],
162
        ]
163
    )
164
165
    # Fix eHV subnetworks in Germany manually
166
    fix_subnetworks = FixEhvSubnetworks(dependencies=[osmtgmod])
167
168
    # Retrieve MaStR (Marktstammdatenregister) data
169
    mastr_data = mastr_data_setup(dependencies=[setup])
170
171
    # Create Voronoi polygons
172
    substation_voronoi = SubstationVoronoi(
173
        dependencies=[tasks["osmtgmod.substation.extract"], vg250]
174
    )
175
176
    # MV (medium voltage) grid districts
177
    mv_grid_districts = mv_grid_districts_setup(
178
        dependencies=[substation_voronoi]
179
    )
180
181
    # Calculate future heat demand based on Peta5_0_1 data
182
    heat_demand_Germany = HeatDemandImport(
183
        dependencies=[scenario_parameters, vg250, zensus_vg250]
184
    )
185
186
    # Extract landuse areas from the `osm` dataset
187
    osm_landuse = OsmLanduse(dependencies=[osm, vg250])
188
189
    # Calculate feedin from renewables
190
    renewable_feedin = RenewableFeedin(
191
        dependencies=[vg250, zensus_vg250, weather_data]
192
    )
193
194
    # Demarcate district heating areas
195
    district_heating_areas = DistrictHeatingAreas(
196
        dependencies=[
197
            heat_demand_Germany,
198
            scenario_parameters,
199
            zensus_miscellaneous,
200
        ]
201
    )
202
203
    # TODO: What does "trans" stand for?
204
    # Calculate dynamic line rating for HV (high voltage) trans lines
205
    # dlr = Calculate_dlr(
206
    #    dependencies=[data_bundle, osmtgmod, weather_data] # , fix_subnetworks]
207
    #)
208
209
    # Map zensus grid districts
210
    zensus_mv_grid_districts = ZensusMvGridDistricts(
211
        dependencies=[mv_grid_districts, zensus_population]
212
    )
213
214
    # Map federal states to mv_grid_districts
215
    vg250_mv_grid_districts = Vg250MvGridDistricts(
216
        dependencies=[mv_grid_districts, vg250]
217
    )
218
219
    # Create household demand profiles on zensus level
220
    hh_demand_profiles_setup = hh_profiles.HouseholdDemands(
221
        dependencies=[
222
            demandregio,
223
            tasks[
224
                "osm_buildings_streets"
225
                ".create-buildings-residential-zensus-mapping"
226
            ],
227
            vg250,
228
            zensus_miscellaneous,
229
            zensus_mv_grid_districts,
230
            zensus_vg250,
231
        ]
232
    )
233
234
    # Household electricity demand buildings
235
    hh_demand_buildings_setup = hh_buildings.setup(
236
        dependencies=[
237
            tasks[
238
                "electricity_demand_timeseries"
239
                ".hh_profiles"
240
                ".houseprofiles-in-census-cells"
241
            ]
242
        ]
243
    )
244
245
    # Get household electrical demands for cencus cells
246
    household_electricity_demand_annual = HouseholdElectricityDemand(
247
        dependencies=[
248
            tasks[
249
                "electricity_demand_timeseries"
250
                ".hh_buildings"
251
                ".map-houseprofiles-to-buildings"
252
            ]
253
        ]
254
    )
255
256
    # Distribute electrical CTS demands to zensus grid
257
    cts_electricity_demand_annual = CtsElectricityDemand(
258
        dependencies=[
259
            demandregio,
260
            heat_demand_Germany,
261
            # household_electricity_demand_annual,
262
            tasks["electricity_demand.create-tables"],
263
            tasks["etrago_setup.create-tables"],
264
            zensus_mv_grid_districts,
265
            zensus_vg250,
266
        ]
267
    )
268
269
    # Industry
270
    industrial_sites = MergeIndustrialSites(
271
        dependencies=[data_bundle, setup, vg250]
272
    )
273
    demand_curves_industry = IndustrialDemandCurves(
274
        dependencies=[
275
            demandregio,
276
            industrial_sites,
277
            osm_landuse,
278
            mv_grid_districts,
279
            osm,
280
        ]
281
    )
282
283
    # Electrical loads to eTraGo
284
    electrical_load_etrago = ElectricalLoadEtrago(
285
        dependencies=[
286
            cts_electricity_demand_annual,
287
            demand_curves_industry,
288
            hh_demand_buildings_setup,
289
            household_electricity_demand_annual,
290
            hh_demand_profiles_setup,
291
        ]
292
    )
293
294
    # Heat time Series
295
    heat_time_series = HeatTimeSeries(
296
        dependencies=[
297
            data_bundle,
298
            demandregio,
299
            heat_demand_Germany,
300
            district_heating_areas,
301
            vg250,
302
            zensus_mv_grid_districts,
303
            hh_demand_buildings_setup,
304
            weather_data,
305
        ]
306
    )
307
308
    cts_demand_buildings = CtsDemandBuildings(
309
        dependencies=[
310
            osm_buildings_streets,
311
            cts_electricity_demand_annual,
312
            hh_demand_buildings_setup,
313
            tasks["heat_demand_timeseries.export-etrago-cts-heat-profiles"],
314
        ]
315
    )
316
317
    prepare_pypsa_eur = PreparePypsaEur(
318
        dependencies=[
319
            weather_data,
320
            data_bundle,
321
        ]
322
    )
323
324
    # run pypsa-eur-sec
325
    run_pypsaeur = RunPypsaEur(
326
        dependencies=[
327
            prepare_pypsa_eur,
328
            weather_data,
329
            hd_abroad,
330
            osmtgmod,
331
            setup_etrago,
332
            data_bundle,
333
            electrical_load_etrago,
334
            heat_time_series,
335
        ]
336
    )
337
338
    # Deal with electrical neighbours
339
    foreign_lines = ElectricalNeighbours(
340
        dependencies=[prepare_pypsa_eur, tyndp_data, osmtgmod, fix_subnetworks]
341
    )
342
343
    # Import NEP (Netzentwicklungsplan) data
344
    scenario_capacities = ScenarioCapacities(
345
        dependencies=[
346
            data_bundle,
347
            run_pypsaeur,
348
            setup,
349
            vg250,
350
            zensus_population,
351
        ]
352
    )
353
354
    # Import gas grid
355
    gas_grid_insert_data = GasNodesAndPipes(
356
        dependencies=[
357
            data_bundle,
358
            foreign_lines,
359
            osmtgmod,
360
            scenario_parameters,
361
            tasks["etrago_setup.create-tables"],
362
        ]
363
    )
364
    # Create gas voronoi status quo
365
    create_gas_polygons_statusquo = GasAreas(
366
        dependencies=[setup_etrago, vg250, gas_grid_insert_data, substation_voronoi]
367
    )
368
369
    # Gas abroad
370
    gas_abroad_insert_data = GasNeighbours(
371
        dependencies=[
372
            gas_grid_insert_data,
373
            run_pypsaeur,
374
            foreign_lines,
375
            create_gas_polygons_statusquo,
376
        ]
377
    )
378
379
    # Import gas production
380
    gas_production_insert_data = CH4Production(
381
        dependencies=[create_gas_polygons_statusquo]
382
    )
383
384
    # Import CH4 storages
385
    insert_data_ch4_storages = CH4Storages(
386
        dependencies=[create_gas_polygons_statusquo]
387
    )
388
389
    # CHP locations
390
    chp = Chp(
391
        dependencies=[
392
            create_gas_polygons_statusquo,
393
            demand_curves_industry,
394
            district_heating_areas,
395
            industrial_sites,
396
            osm_landuse,
397
            mastr_data,
398
            mv_grid_districts,
399
        ]
400
    )
401
402
    # Power plants
403
    power_plants = PowerPlants(
404
        dependencies=[
405
            chp,
406
            cts_electricity_demand_annual,
407
            household_electricity_demand_annual,
408
            mastr_data,
409
            mv_grid_districts,
410
            renewable_feedin,
411
            scenario_parameters,
412
            setup,
413
            substation_extraction,
414
            tasks["etrago_setup.create-tables"],
415
            vg250_mv_grid_districts,
416
            zensus_mv_grid_districts,
417
        ]
418
    )
419
420
    create_ocgt = OpenCycleGasTurbineEtrago(
421
        dependencies=[create_gas_polygons_statusquo, power_plants]
422
    )
423
424
    # Fill eTraGo generators tables
425
    fill_etrago_generators = Egon_etrago_gen(
426
        dependencies=[power_plants, weather_data]
427
    )
428
429
    # Heat supply
430
    heat_supply = HeatSupply(
431
        dependencies=[
432
            chp,
433
            data_bundle,
434
            district_heating_areas,
435
            zensus_mv_grid_districts,
436
            scenario_capacities,
437
        ]
438
    )
439
440
441
    # Pumped hydro and home storage units
442
    storage_units = Storages(
443
        dependencies=[
444
            mastr_data,
445
            mv_grid_districts,
446
            power_plants,
447
            scenario_parameters,
448
            setup,
449
            vg250_mv_grid_districts,
450
            fill_etrago_generators,
451
        ]
452
    )
453
454
    # Heat to eTraGo
455
    heat_etrago = HeatEtrago(
456
        dependencies=[
457
            heat_supply,
458
            mv_grid_districts,
459
            renewable_feedin,
460
            setup_etrago,
461
            heat_time_series,
462
        ]
463
    )
464
465
    # CHP to eTraGo
466
    chp_etrago = ChpEtrago(dependencies=[chp, heat_etrago])
467
468
    # Heat pump disaggregation for status quo
469
    heat_pumps_sq = HeatPumpsStatusQuo(
470
        dependencies=[
471
            cts_demand_buildings,
472
            DistrictHeatingAreas,
473
            heat_supply,
474
            heat_time_series,
475
            power_plants,
476
        ]
477
    )
478
479
    # HTS to eTraGo table
480
    hts_etrago_table = HtsEtragoTable(
481
        dependencies=[
482
            district_heating_areas,
483
            heat_etrago,
484
            heat_time_series,
485
            mv_grid_districts,
486
            heat_pumps_sq,
487
        ]
488
    )
489
490
    # Storages to eTraGo
491
    storage_etrago = StorageEtrago(
492
        dependencies=[storage_units, scenario_parameters, setup_etrago]
493
    )
494
495
    # eMobility: motorized individual travel TODO: adjust for SQ
496
    emobility_mit = MotorizedIndividualTravel(
497
        dependencies=[
498
            data_bundle,
499
            mv_grid_districts,
500
            scenario_parameters,
501
            setup_etrago,
502
            zensus_mv_grid_districts,
503
            zensus_vg250,
504
            storage_etrago,
505
            hts_etrago_table,
506
            chp_etrago,
507
            heat_etrago,
508
            fill_etrago_generators,
509
            create_ocgt,
510
            gas_production_insert_data,
511
            insert_data_ch4_storages,
512
        ]
513
    )
514
515
    mit_charging_infrastructure = MITChargingInfrastructure(
516
        dependencies=[mv_grid_districts, hh_demand_buildings_setup]
517
    )
518
519
    # Create load areas
520
    load_areas = LoadArea(
521
        dependencies=[
522
            osm_landuse,
523
            zensus_vg250,
524
            household_electricity_demand_annual,
525
            tasks[
526
                "electricity_demand_timeseries"
527
                ".hh_buildings"
528
                ".get-building-peak-loads"
529
            ],
530
            cts_demand_buildings,
531
            demand_curves_industry,
532
        ]
533
    )
534
535
536