Passed
Pull Request — dev (#1170)
by
unknown
05:05
created

pipeline_status_quo   A

Complexity

Total Complexity 0

Size/Duplication

Total Lines 521
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 0
eloc 315
dl 0
loc 521
rs 10
c 0
b 0
f 0
1
import os
2
3
from airflow.utils.dates import days_ago
4
import airflow
5
6
from egon.data.config import set_numexpr_threads
7
from egon.data.datasets import database
8
from egon.data.datasets.ch4_prod import CH4Production
9
from egon.data.datasets.ch4_storages import CH4Storages
10
from egon.data.datasets.chp import Chp
11
from egon.data.datasets.chp_etrago import ChpEtrago
12
from egon.data.datasets.data_bundle import DataBundle
13
from egon.data.datasets.demandregio import DemandRegio
14
from egon.data.datasets.district_heating_areas import DistrictHeatingAreas
15
from egon.data.datasets.electrical_neighbours import ElectricalNeighbours
16
from egon.data.datasets.electricity_demand import (
17
    CtsElectricityDemand,
18
    HouseholdElectricityDemand,
19
)
20
from egon.data.datasets.electricity_demand_etrago import ElectricalLoadEtrago
21
from egon.data.datasets.electricity_demand_timeseries import (
22
    hh_buildings,
23
    hh_profiles,
24
)
25
from egon.data.datasets.electricity_demand_timeseries.cts_buildings import (
26
    CtsDemandBuildings,
27
)
28
from egon.data.datasets.emobility.motorized_individual_travel import (
29
    MotorizedIndividualTravel,
30
)
31
from egon.data.datasets.emobility.motorized_individual_travel_charging_infrastructure import (  # noqa: E501
32
    MITChargingInfrastructure,
33
)
34
from egon.data.datasets.era5 import WeatherData
35
from egon.data.datasets.etrago_setup import EtragoSetup
36
from egon.data.datasets.fill_etrago_gen import Egon_etrago_gen
37
from egon.data.datasets.fix_ehv_subnetworks import FixEhvSubnetworks
38
from egon.data.datasets.gas_areas import GasAreasStatusQuo
39
from egon.data.datasets.gas_grid import GasNodesAndPipes
40
from egon.data.datasets.gas_neighbours import GasNeighbours
41
from egon.data.datasets.heat_demand import HeatDemandImport
42
from egon.data.datasets.heat_demand_europe import HeatDemandEurope
43
from egon.data.datasets.heat_demand_timeseries import HeatTimeSeries
44
from egon.data.datasets.heat_etrago import HeatEtrago
45
from egon.data.datasets.heat_etrago.hts_etrago import HtsEtragoTable
46
from egon.data.datasets.heat_supply import HeatSupply
47
from egon.data.datasets.heat_supply.individual_heating import HeatPumpsStatusQuo
48
from egon.data.datasets.industrial_sites import MergeIndustrialSites
49
from egon.data.datasets.industry import IndustrialDemandCurves
50
from egon.data.datasets.loadarea import LoadArea, OsmLanduse
51
from egon.data.datasets.mastr import mastr_data_setup
52
from egon.data.datasets.mv_grid_districts import mv_grid_districts_setup
53
from egon.data.datasets.osm import OpenStreetMap
54
from egon.data.datasets.osm_buildings_streets import OsmBuildingsStreets
55
from egon.data.datasets.osmtgmod import Osmtgmod
56
from egon.data.datasets.power_etrago import OpenCycleGasTurbineEtrago
57
from egon.data.datasets.power_plants import PowerPlants
58
from egon.data.datasets.pypsaeursec import PypsaEurSec
59
from egon.data.datasets.renewable_feedin import RenewableFeedin
60
from egon.data.datasets.scenario_capacities import ScenarioCapacities
61
from egon.data.datasets.scenario_parameters import ScenarioParameters
62
from egon.data.datasets.society_prognosis import SocietyPrognosis
63
from egon.data.datasets.storages import Storages
64
from egon.data.datasets.storages_etrago import StorageEtrago
65
from egon.data.datasets.substation import SubstationExtraction
66
from egon.data.datasets.substation_voronoi import SubstationVoronoi
67
from egon.data.datasets.tyndp import Tyndp
68
from egon.data.datasets.vg250 import Vg250
69
from egon.data.datasets.vg250_mv_grid_districts import Vg250MvGridDistricts
70
from egon.data.datasets.zensus import ZensusMiscellaneous, ZensusPopulation
71
from egon.data.datasets.zensus_mv_grid_districts import ZensusMvGridDistricts
72
from egon.data.datasets.zensus_vg250 import ZensusVg250
73
74
# Set number of threads used by numpy and pandas
75
set_numexpr_threads()
76
77
78
79
with airflow.DAG(
80
    "powerd-status-quo-processing-pipeline",
81
    description="The PoWerD Status Quo data processing DAG.",
82
    default_args={"start_date": days_ago(1),
83
                  "email_on_failure": False,
84
                  "email":"[email protected]"},
85
    template_searchpath=[
86
        os.path.abspath(
87
            os.path.join(
88
                os.path.dirname(__file__), "..", "..", "processing", "vg250"
89
            )
90
        )
91
    ],
92
    is_paused_upon_creation=False,
93
    schedule_interval=None,
94
) as pipeline:
95
96
    tasks = pipeline.task_dict
97
98
    setup = database.Setup()
99
100
    osm = OpenStreetMap(dependencies=[setup])
101
102
    data_bundle = DataBundle(dependencies=[setup])
103
104
    # Import VG250 (Verwaltungsgebiete 250) data
105
    vg250 = Vg250(dependencies=[setup])
106
107
    # Scenario table
108
    scenario_parameters = ScenarioParameters(dependencies=[setup])
109
110
    # Download TYNDP data
111
    tyndp_data = Tyndp(dependencies=[setup]) #TODO: kick out or adjust
112
113
    # Import zensus population
114
    zensus_population = ZensusPopulation(dependencies=[setup, vg250])
115
116
    # Combine zensus and VG250 data
117
    zensus_vg250 = ZensusVg250(dependencies=[vg250, zensus_population])
118
119
    # Download and import zensus data on households, buildings and apartments
120
    zensus_miscellaneous = ZensusMiscellaneous(
121
        dependencies=[zensus_population, zensus_vg250]
122
    )
123
124
    # Import DemandRegio data
125
    demandregio = DemandRegio(
126
        dependencies=[data_bundle, scenario_parameters, setup, vg250]
127
    )
128
129
    # Society prognosis
130
    society_prognosis = SocietyPrognosis(
131
        dependencies=[demandregio, zensus_miscellaneous]
132
    )
133
134
    # OSM (OpenStreetMap) buildings, streets and amenities
135
    osm_buildings_streets = OsmBuildingsStreets(
136
        dependencies=[osm, zensus_miscellaneous]
137
    )
138
139
    # Import weather data
140
    weather_data = WeatherData(
141
        dependencies=[scenario_parameters, setup, vg250]
142
    )
143
144
    # Future national heat demands for foreign countries based on Hotmaps
145
    # download only, processing in PyPSA-Eur-Sec fork
146
    hd_abroad = HeatDemandEurope(dependencies=[setup])
147
148
    # Set eTraGo input tables
149
    setup_etrago = EtragoSetup(dependencies=[setup])
150
151
    substation_extraction = SubstationExtraction(dependencies=[osm, vg250])
152
153
    # Generate the osmTGmod ehv/hv grid model
154
    osmtgmod = Osmtgmod(
155
        dependencies=[
156
            scenario_parameters,
157
            setup_etrago,
158
            substation_extraction,
159
            tasks["osm.download"],
160
        ]
161
    )
162
163
    # Fix eHV subnetworks in Germany manually
164
    fix_subnetworks = FixEhvSubnetworks(dependencies=[osmtgmod])
165
166
    # Retrieve MaStR (Marktstammdatenregister) data
167
    mastr_data = mastr_data_setup(dependencies=[setup])
168
169
    # Create Voronoi polygons
170
    substation_voronoi = SubstationVoronoi(
171
        dependencies=[tasks["osmtgmod.substation.extract"], vg250]
172
    )
173
174
    # MV (medium voltage) grid districts
175
    mv_grid_districts = mv_grid_districts_setup(
176
        dependencies=[substation_voronoi]
177
    )
178
179
    # Calculate future heat demand based on Peta5_0_1 data
180
    heat_demand_Germany = HeatDemandImport(
181
        dependencies=[scenario_parameters, vg250, zensus_vg250]
182
    )
183
184
    # Extract landuse areas from the `osm` dataset
185
    osm_landuse = OsmLanduse(dependencies=[osm, vg250])
186
187
    # Calculate feedin from renewables
188
    renewable_feedin = RenewableFeedin(
189
        dependencies=[vg250, zensus_vg250, weather_data]
190
    )
191
192
    # Demarcate district heating areas
193
    district_heating_areas = DistrictHeatingAreas(
194
        dependencies=[
195
            heat_demand_Germany,
196
            scenario_parameters,
197
            zensus_miscellaneous,
198
        ]
199
    )
200
201
    # TODO: What does "trans" stand for?
202
    # Calculate dynamic line rating for HV (high voltage) trans lines
203
    # dlr = Calculate_dlr(
204
    #    dependencies=[data_bundle, osmtgmod, weather_data] # , fix_subnetworks]
205
    #)
206
207
    # Map zensus grid districts
208
    zensus_mv_grid_districts = ZensusMvGridDistricts(
209
        dependencies=[mv_grid_districts, zensus_population]
210
    )
211
212
    # Map federal states to mv_grid_districts
213
    vg250_mv_grid_districts = Vg250MvGridDistricts(
214
        dependencies=[mv_grid_districts, vg250]
215
    )
216
217
    # Create household demand profiles on zensus level
218
    hh_demand_profiles_setup = hh_profiles.HouseholdDemands(
219
        dependencies=[
220
            demandregio,
221
            tasks[
222
                "osm_buildings_streets"
223
                ".create-buildings-residential-zensus-mapping"
224
            ],
225
            vg250,
226
            zensus_miscellaneous,
227
            zensus_mv_grid_districts,
228
            zensus_vg250,
229
        ]
230
    )
231
232
    # Household electricity demand buildings
233
    hh_demand_buildings_setup = hh_buildings.setup(
234
        dependencies=[
235
            tasks[
236
                "electricity_demand_timeseries"
237
                ".hh_profiles"
238
                ".houseprofiles-in-census-cells"
239
            ]
240
        ]
241
    )
242
243
    # Get household electrical demands for census cells
244
    household_electricity_demand_annual = HouseholdElectricityDemand(
245
        dependencies=[
246
            tasks[
247
                "electricity_demand_timeseries"
248
                ".hh_buildings"
249
                ".map-houseprofiles-to-buildings"
250
            ]
251
        ]
252
    )
253
254
    # Distribute electrical CTS demands to zensus grid
255
    cts_electricity_demand_annual = CtsElectricityDemand(
256
        dependencies=[
257
            demandregio,
258
            heat_demand_Germany,
259
            # household_electricity_demand_annual,
260
            tasks["electricity_demand.create-tables"],
261
            tasks["etrago_setup.create-tables"],
262
            zensus_mv_grid_districts,
263
            zensus_vg250,
264
        ]
265
    )
266
267
    # Industry
268
    industrial_sites = MergeIndustrialSites(
269
        dependencies=[data_bundle, setup, vg250]
270
    )
271
    demand_curves_industry = IndustrialDemandCurves(
272
        dependencies=[
273
            demandregio,
274
            industrial_sites,
275
            osm_landuse,
276
            mv_grid_districts,
277
            osm,
278
        ]
279
    )
280
281
    # Electrical loads to eTraGo
282
    electrical_load_etrago = ElectricalLoadEtrago(
283
        dependencies=[
284
            cts_electricity_demand_annual,
285
            demand_curves_industry,
286
            hh_demand_buildings_setup,
287
            household_electricity_demand_annual,
288
        ]
289
    )
290
291
    # Heat time series
292
    heat_time_series = HeatTimeSeries(
293
        dependencies=[
294
            data_bundle,
295
            demandregio,
296
            heat_demand_Germany,
297
            district_heating_areas,
298
            vg250,
299
            zensus_mv_grid_districts,
300
            hh_demand_buildings_setup,
301
            weather_data,
302
        ]
303
    )
304
305
    cts_demand_buildings = CtsDemandBuildings(
306
        dependencies=[
307
            osm_buildings_streets,
308
            cts_electricity_demand_annual,
309
            hh_demand_buildings_setup,
310
            tasks["heat_demand_timeseries.export-etrago-cts-heat-profiles"],
311
        ]
312
    )
313
314
    # run pypsa-eur-sec
315
    run_pypsaeursec = PypsaEurSec(
316
        dependencies=[
317
            weather_data,
318
            hd_abroad,
319
            osmtgmod,
320
            setup_etrago,
321
            data_bundle,
322
            electrical_load_etrago,
323
            heat_time_series,
324
        ]
325
    )
326
327
    # Deal with electrical neighbours
328
    foreign_lines = ElectricalNeighbours(
329
        dependencies=[run_pypsaeursec, tyndp_data]
330
    )
331
332
    # Import NEP (Netzentwicklungsplan) data
333
    scenario_capacities = ScenarioCapacities(
334
        dependencies=[
335
            data_bundle,
336
            run_pypsaeursec,
337
            setup,
338
            vg250,
339
            zensus_population,
340
        ]
341
    )
342
343
    # Import gas grid
344
    gas_grid_insert_data = GasNodesAndPipes(
345
        dependencies=[
346
            data_bundle,
347
            foreign_lines,
348
            osmtgmod,
349
            scenario_parameters,
350
            tasks["etrago_setup.create-tables"],
351
        ]
352
    )
353
    # Create gas voronoi status quo
354
    create_gas_polygons_statusquo = GasAreasStatusQuo(
355
        dependencies=[setup_etrago, vg250, gas_grid_insert_data, substation_voronoi]
356
    )
357
358
    # Gas abroad
359
    gas_abroad_insert_data = GasNeighbours(
360
        dependencies=[
361
            gas_grid_insert_data,
362
            run_pypsaeursec,
363
            foreign_lines,
364
            create_gas_polygons_statusquo,
365
        ]
366
    )
367
368
    # Import gas production
369
    gas_production_insert_data = CH4Production(
370
        dependencies=[create_gas_polygons_statusquo]
371
    )
372
373
    # Import CH4 storages
374
    insert_data_ch4_storages = CH4Storages(
375
        dependencies=[create_gas_polygons_statusquo]
376
    )
377
378
    # CHP locations
379
    chp = Chp(
380
        dependencies=[
381
            create_gas_polygons_statusquo,
382
            demand_curves_industry,
383
            district_heating_areas,
384
            industrial_sites,
385
            osm_landuse,
386
            mastr_data,
387
            mv_grid_districts,
388
        ]
389
    )
390
391
    # Power plants
392
    power_plants = PowerPlants(
393
        dependencies=[
394
            chp,
395
            cts_electricity_demand_annual,
396
            household_electricity_demand_annual,
397
            mastr_data,
398
            mv_grid_districts,
399
            renewable_feedin,
400
            scenario_parameters,
401
            setup,
402
            substation_extraction,
403
            tasks["etrago_setup.create-tables"],
404
            vg250_mv_grid_districts,
405
            zensus_mv_grid_districts,
406
        ]
407
    )
408
409
    create_ocgt = OpenCycleGasTurbineEtrago(
410
        dependencies=[create_gas_polygons_statusquo, power_plants]
411
    )
412
413
    # Fill eTraGo generators tables
414
    fill_etrago_generators = Egon_etrago_gen(
415
        dependencies=[power_plants, weather_data]
416
    )
417
418
    # Heat supply
419
    heat_supply = HeatSupply(
420
        dependencies=[
421
            chp,
422
            data_bundle,
423
            district_heating_areas,
424
            zensus_mv_grid_districts,
425
            scenario_capacities,
426
        ]
427
    )
428
429
430
    # Pumped hydro and home storage units
431
    storage_units = Storages(
432
        dependencies=[
433
            mastr_data,
434
            mv_grid_districts,
435
            power_plants,
436
            scenario_parameters,
437
            setup,
438
            vg250_mv_grid_districts,
439
            fill_etrago_generators,
440
        ]
441
    )
442
443
    # Heat to eTraGo
444
    heat_etrago = HeatEtrago(
445
        dependencies=[
446
            heat_supply,
447
            mv_grid_districts,
448
            renewable_feedin,
449
            setup_etrago,
450
            heat_time_series,
451
        ]
452
    )
453
454
    # CHP to eTraGo
455
    chp_etrago = ChpEtrago(dependencies=[chp, heat_etrago])
456
457
    # Heat pump disaggregation for status quo
    # NOTE: each dependency has to be a dataset instance (or a task taken from
    # `tasks[...]`), as in every other dataset definition of this DAG. The
    # `district_heating_areas` instance created above is used here instead of
    # the `DistrictHeatingAreas` class object, so that the district heating
    # dataset is really wired in as a predecessor.
    heat_pumps_sq = HeatPumpsStatusQuo(
        dependencies=[
            cts_demand_buildings,
            district_heating_areas,
            heat_supply,
            heat_time_series,
            power_plants,
        ]
    )
467
468
    # HTS to eTraGo table
469
    hts_etrago_table = HtsEtragoTable(
470
        dependencies=[
471
            district_heating_areas,
472
            heat_etrago,
473
            heat_time_series,
474
            mv_grid_districts,
475
            heat_pumps_sq,
476
        ]
477
    )
478
479
    # Storages to eTraGo
480
    storage_etrago = StorageEtrago(
481
        dependencies=[storage_units, scenario_parameters, setup_etrago]
482
    )
483
484
    # eMobility: motorized individual travel TODO: adjust for SQ
485
    emobility_mit = MotorizedIndividualTravel(
486
        dependencies=[
487
            data_bundle,
488
            mv_grid_districts,
489
            scenario_parameters,
490
            setup_etrago,
491
            zensus_mv_grid_districts,
492
            zensus_vg250,
493
            storage_etrago,
494
            hts_etrago_table,
495
            chp_etrago,
496
            heat_etrago,
497
            fill_etrago_generators,
498
            create_ocgt,
499
            gas_production_insert_data,
500
            insert_data_ch4_storages,
501
        ]
502
    )
503
504
    mit_charging_infrastructure = MITChargingInfrastructure(
505
        dependencies=[mv_grid_districts, hh_demand_buildings_setup]
506
    )
507
508
    # Create load areas
509
    load_areas = LoadArea(
510
        dependencies=[
511
            osm_landuse,
512
            zensus_vg250,
513
            household_electricity_demand_annual,
514
            tasks[
515
                "electricity_demand_timeseries"
516
                ".hh_buildings"
517
                ".get-building-peak-loads"
518
            ],
519
            cts_demand_buildings,
520
            demand_curves_industry,
521
        ]
522
    )
523
524
525