Passed
Pull Request — dev (#943)
by
unknown
02:17
created

motorized_individual_travel_charging_infrastructure.infrastructure_allocation   A

Complexity

Total Complexity 11

Size/Duplication

Total Lines 222
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 11
eloc 130
dl 0
loc 222
rs 10
c 0
b 0
f 0

5 Functions

Rating   Name   Duplication   Size   Complexity  
B get_data() 0 113 3
A run_tracbev_potential() 0 9 2
A run_use_cases() 0 22 1
A run_tracbev() 0 4 1
A write_to_db() 0 36 4
1
"""
2
The charging infrastructure allocation is based on [TracBEV[(
3
https://github.com/rl-institut/tracbev). TracBEV is a tool for the regional allocation
4
of charging infrastructure. In practice this allows users to use results generated via
5
[SimBEV](https://github.com/rl-institut/simbev) and place the corresponding charging
6
points on a map. These are split into the four use cases hpc, public, home and work.
7
"""
8
from __future__ import annotations
9
10
from pathlib import Path
11
12
import geopandas as gpd
13
import numpy as np
14
import pandas as pd
15
16
from egon.data import config, db
17
from egon.data.datasets.emobility.motorized_individual_travel_charging_infrastructure.use_cases import (  # noqa: E501
18
    home,
19
    hpc,
20
    public,
21
    work,
22
)
23
24
WORKING_DIR = Path(".", "charging_infrastructure").resolve()
25
DATASET_CFG = config.datasets()["charging_infrastructure"]
26
27
28
def write_to_db(gdf: gpd.GeoDataFrame, mv_grid_id: int | float, use_case: str):
29
    if gdf.empty:
30
        return
31
32
    if "energy" in gdf.columns:
33
        gdf = gdf.assign(weight=gdf.energy.div(gdf.energy.sum()))
34
    else:
35
        rng = np.random.default_rng(DATASET_CFG["constants"]["random_seed"])
36
37
        gdf = gdf.assign(weight=rng.integers(low=0, high=100, size=len(gdf)))
38
39
        gdf = gdf.assign(weight=gdf.weight.div(gdf.weight.sum()))
40
41
    max_id = db.select_dataframe(
42
        """
43
        SELECT MAX(cp_id) FROM grid.egon_emob_charging_infrastructure
44
        """
45
    )["max"][0]
46
47
    if max_id is None:
48
        max_id = 0
49
50
    gdf = gdf.assign(
51
        cp_id=range(max_id, max_id + len(gdf)),
52
        mv_grid_id=mv_grid_id,
53
        use_case=use_case,
54
    )
55
56
    targets = DATASET_CFG["targets"]
57
    cols_to_export = targets["charging_infrastructure"]["cols_to_export"]
58
59
    gpd.GeoDataFrame(gdf[cols_to_export], crs=gdf.crs).to_postgis(
60
        targets["charging_infrastructure"]["table"],
61
        schema=targets["charging_infrastructure"]["schema"],
62
        con=db.engine(),
63
        if_exists="append",
64
    )
65
66
67
def run_tracbev():
68
    data_dict = get_data()
69
70
    run_tracbev_potential(data_dict)
71
72
73
def run_tracbev_potential(data_dict):
74
    bounds = data_dict["boundaries"]
75
76
    for mv_grid_id in data_dict["regions"].mv_grid_id:
77
        region = bounds.loc[bounds.bus_id == mv_grid_id].geom
78
79
        data_dict.update({"region": region, "key": mv_grid_id})
80
        # Start Use Cases
81
        run_use_cases(data_dict)
82
83
84
def run_use_cases(data_dict):
85
    write_to_db(
86
        hpc(data_dict["hpc_positions"], data_dict),
87
        data_dict["key"],
88
        use_case="hpc",
89
    )
90
    write_to_db(
91
        public(
92
            data_dict["public_positions"], data_dict["poi_cluster"], data_dict
93
        ),
94
        data_dict["key"],
95
        use_case="public",
96
    )
97
    write_to_db(
98
        work(data_dict["landuse"], data_dict["work_dict"], data_dict),
99
        data_dict["key"],
100
        use_case="work",
101
    )
102
    write_to_db(
103
        home(data_dict["housing_data"], data_dict),
104
        data_dict["key"],
105
        use_case="home",
106
    )
107
108
109
def get_data() -> dict[gpd.GeoDataFrame]:
110
    tracbev_cfg = DATASET_CFG["original_data"]["sources"]["tracbev"]
111
    srid = tracbev_cfg["srid"]
112
113
    # TODO: get zensus housing data from DB instead of gpkg?
114
    files = tracbev_cfg["files_to_use"]
115
116
    data_dict = {}
117
118
    # get TracBEV files
119
    for f in files:
120
        file = WORKING_DIR / "data" / f
121
        name = f.split(".")[0]
122
123
        data_dict[name] = gpd.read_file(file)
124
125
        if "undefined" in data_dict[name].crs.name.lower():
126
            data_dict[name] = data_dict[name].set_crs(
127
                epsg=srid, allow_override=True
128
            )
129
        else:
130
            data_dict[name] = data_dict[name].to_crs(epsg=srid)
131
132
    # get housing data from DB
133
    sql = """
134
    SELECT building_id, cell_id
135
    FROM demand.egon_household_electricity_profile_of_buildings
136
    """
137
138
    df = db.select_dataframe(sql)
139
140
    count_df = (
141
        df.groupby(["building_id", "cell_id"])
142
        .size()
143
        .reset_index()
144
        .rename(columns={0: "count"})
145
    )
146
147
    mfh_df = (
148
        count_df.loc[count_df["count"] > 1]
149
        .groupby(["cell_id"])
150
        .size()
151
        .reset_index()
152
        .rename(columns={0: "num_mfh"})
153
    )
154
    efh_df = (
155
        count_df.loc[count_df["count"] <= 1]
156
        .groupby(["cell_id"])
157
        .size()
158
        .reset_index()
159
        .rename(columns={0: "num"})
160
    )
161
162
    comb_df = (
163
        mfh_df.merge(
164
            right=efh_df, how="outer", left_on="cell_id", right_on="cell_id"
165
        )
166
        .fillna(0)
167
        .astype(int)
168
    )
169
170
    sql = """
171
    SELECT zensus_population_id, geom as geometry
172
    FROM society.egon_destatis_zensus_apartment_building_population_per_ha
173
    """
174
175
    gdf = db.select_geodataframe(sql, geom_col="geometry", epsg=srid)
176
177
    data_dict["housing_data"] = gpd.GeoDataFrame(
178
        gdf.merge(
179
            right=comb_df, left_on="zensus_population_id", right_on="cell_id"
180
        ),
181
        crs=gdf.crs,
182
    ).drop(columns=["cell_id"])
183
184
    # get boundaries aka grid districts
185
    sql = """
186
    SELECT bus_id, geom FROM grid.egon_mv_grid_district
187
    """
188
189
    data_dict["boundaries"] = db.select_geodataframe(
190
        sql, geom_col="geom", epsg=srid
191
    )
192
193
    data_dict["regions"] = pd.DataFrame(
194
        columns=["mv_grid_id"],
195
        data=data_dict["boundaries"].bus_id.unique(),
196
    )
197
198
    data_dict["work_dict"] = {
199
        "retail": DATASET_CFG["constants"]["work_weight_retail"],
200
        "commercial": DATASET_CFG["constants"]["work_weight_commercial"],
201
        "industrial": DATASET_CFG["constants"]["work_weight_industrial"],
202
    }
203
204
    data_dict["sfh_available"] = DATASET_CFG["constants"][
205
        "single_family_home_share"
206
    ]
207
    data_dict["sfh_avg_spots"] = DATASET_CFG["constants"][
208
        "single_family_home_spots"
209
    ]
210
    data_dict["mfh_available"] = DATASET_CFG["constants"][
211
        "multi_family_home_share"
212
    ]
213
    data_dict["mfh_avg_spots"] = DATASET_CFG["constants"][
214
        "multi_family_home_spots"
215
    ]
216
217
    data_dict["random_seed"] = np.random.default_rng(
218
        DATASET_CFG["constants"]["random_seed"]
219
    )
220
221
    return data_dict
222