Completed
Push — dev ( 85d654...312d71 )
by
unknown
21s queued 16s
created

data.datasets.emobility.heavy_duty_transport.h2_demand_distribution   A

Complexity

Total Complexity 10

Size/Duplication

Total Lines 189
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 10
eloc 105
dl 0
loc 189
rs 10
c 0
b 0
f 0

4 Functions

Rating   Name   Duplication   Size   Complexity  
A run_egon_truck() 0 37 2
A calculate_total_hydrogen_consumption() 0 24 2
A voronoi() 0 57 3
A geo_intersect() 0 42 3
1
"""
2
Calculation of hydrogen demand based on a Voronoi partition of counted truck traffic
3
used to allocate it to NUTS3 regions and finally aggregate it on NUTS3 level.
4
"""
5
from geovoronoi import points_to_coords, voronoi_regions_from_coords
6
from loguru import logger
7
from shapely import wkt
8
from shapely.geometry.multipolygon import MultiPolygon
9
from shapely.geometry.polygon import Polygon
10
from shapely.ops import cascaded_union
11
import geopandas as gpd
12
13
from egon.data import config, db
14
from egon.data.datasets.emobility.heavy_duty_transport.data_io import get_data
15
from egon.data.datasets.emobility.heavy_duty_transport.db_classes import (
16
    EgonHeavyDutyTransportVoronoi,
17
)
18
19
DATASET_CFG = config.datasets()["mobility_hgv"]
20
21
22
def run_egon_truck():
23
    boundary_gdf, bast_gdf, nuts3_gdf = get_data()
24
25
    bast_gdf_within = bast_gdf.dropna().loc[
26
        bast_gdf.within(boundary_gdf.geometry.iat[0])
27
    ]
28
29
    voronoi_gdf = voronoi(bast_gdf_within, boundary_gdf)
30
31
    nuts3_gdf = geo_intersect(voronoi_gdf, nuts3_gdf)
32
33
    nuts3_gdf = nuts3_gdf.assign(
34
        normalized_truck_traffic=(
35
            nuts3_gdf.truck_traffic / nuts3_gdf.truck_traffic.sum()
36
        )
37
    )
38
39
    scenarios = DATASET_CFG["constants"]["scenarios"]
40
41
    for scenario in scenarios:
42
        total_hydrogen_consumption = calculate_total_hydrogen_consumption(
43
            scenario=scenario
44
        )
45
46
        nuts3_gdf = nuts3_gdf.assign(
47
            hydrogen_consumption=(
48
                nuts3_gdf.normalized_truck_traffic * total_hydrogen_consumption
49
            ),
50
            scenario=scenario,
51
        )
52
53
        nuts3_gdf.reset_index().to_postgis(
54
            name=EgonHeavyDutyTransportVoronoi.__table__.name,
55
            con=db.engine(),
56
            schema=EgonHeavyDutyTransportVoronoi.__table__.schema,
57
            if_exists="append",
58
            index=False,
59
        )
60
61
62
def calculate_total_hydrogen_consumption(scenario: str = "eGon2035"):
63
    """Calculate the total hydrogen demand for trucking in Germany."""
64
    constants = DATASET_CFG["constants"]
65
    hgv_mileage = DATASET_CFG["hgv_mileage"]
66
67
    leakage = constants["leakage"]
68
    leakage_rate = constants["leakage_rate"]
69
    hydrogen_consumption = constants["hydrogen_consumption"]  # kg/100km
70
    fcev_share = constants["fcev_share"]
71
72
    hgv_mileage = hgv_mileage[scenario]  # km
73
74
    hydrogen_consumption_per_km = hydrogen_consumption / 100  # kg/km
75
76
    # calculate total hydrogen consumption kg/a
77
    if leakage:
78
        return (
79
            hgv_mileage
80
            * hydrogen_consumption_per_km
81
            * fcev_share
82
            / (1 - leakage_rate)
83
        )
84
    else:
85
        return hgv_mileage * hydrogen_consumption_per_km * fcev_share
86
87
88
def geo_intersect(
89
    voronoi_gdf: gpd.GeoDataFrame,
90
    nuts3_gdf: gpd.GeoDataFrame,
91
    mode: str = "intersection",
92
):
93
    """Calculate Intersections between two GeoDataFrames and distribute truck traffic"""
94
    logger.info(
95
        "Calculating Intersections between Voronoi Field and Grid Districts "
96
        "and distributing truck traffic accordingly to the area share."
97
    )
98
    voronoi_gdf = voronoi_gdf.assign(voronoi_id=voronoi_gdf.index.tolist())
99
100
    # Find Intersections between both GeoDataFrames
101
    intersection_gdf = gpd.overlay(
102
        voronoi_gdf, nuts3_gdf.reset_index()[["nuts3", "geometry"]], how=mode
103
    )
104
105
    # Calc Area of Intersections
106
    intersection_gdf = intersection_gdf.assign(
107
        surface_area=intersection_gdf.geometry.area / 10**6
108
    )  # km²
109
110
    # Initialize results column
111
    nuts3_gdf = nuts3_gdf.assign(truck_traffic=0)
112
113
    for voronoi_id in intersection_gdf.voronoi_id.unique():
114
        voronoi_id_intersection_gdf = intersection_gdf.loc[
115
            intersection_gdf.voronoi_id == voronoi_id
116
        ]
117
118
        total_area = voronoi_id_intersection_gdf.surface_area.sum()
119
120
        truck_traffic = voronoi_id_intersection_gdf.truck_traffic.iat[0]
121
122
        for idx, row in voronoi_id_intersection_gdf.iterrows():
123
            traffic_share = truck_traffic * row["surface_area"] / total_area
124
125
            nuts3_gdf.at[row["nuts3"], "truck_traffic"] += traffic_share
126
127
    logger.info("Done.")
128
129
    return nuts3_gdf
130
131
132
def voronoi(
133
    points: gpd.GeoDataFrame,
134
    boundary: gpd.GeoDataFrame,
135
):
136
    """Building a Voronoi Field from points and a boundary."""
137
    logger.info("Building Voronoi Field.")
138
139
    sources = DATASET_CFG["original_data"]["sources"]
140
    relevant_columns = sources["BAST"]["relevant_columns"]
141
    truck_col = relevant_columns[0]
142
    srid = DATASET_CFG["tables"]["srid"]
143
144
    # convert the boundary geometry into a union of the polygon
145
    # convert the Geopandas GeoSeries of Point objects to NumPy array of coordinates.
146
    boundary_shape = cascaded_union(boundary.geometry)
147
    coords = points_to_coords(points.geometry)
148
149
    # calculate Voronoi regions
150
    poly_shapes, pts, unassigned_pts = voronoi_regions_from_coords(
151
        coords, boundary_shape, return_unassigned_points=True
152
    )
153
154
    multipoly_shapes = {}
155
156
    for key, shape in poly_shapes.items():
157
        if isinstance(shape, Polygon):
158
            shape = wkt.loads(str(shape))
159
            shape = MultiPolygon([shape])
160
161
        multipoly_shapes[key] = [shape]
162
163
    poly_gdf = gpd.GeoDataFrame.from_dict(
164
        multipoly_shapes, orient="index", columns=["geometry"]
165
    )
166
167
    # match points to old index
168
    # FIXME: This seems overcomplicated
169
    poly_gdf.index = [v[0] for v in pts.values()]
170
171
    poly_gdf = poly_gdf.sort_index()
172
173
    unmatched = [points.index[idx] for idx in unassigned_pts]
174
175
    points_matched = points.drop(unmatched)
176
177
    poly_gdf.index = points_matched.index
178
179
    # match truck traffic to new polys
180
    poly_gdf = poly_gdf.assign(
181
        truck_traffic=points.loc[poly_gdf.index][truck_col]
182
    )
183
184
    poly_gdf = poly_gdf.set_crs(epsg=srid, inplace=True)
185
186
    logger.info("Done.")
187
188
    return poly_gdf
189