Completed: Push — dev (85d654...312d71) by unknown · 21s (queued 16s)

Grade: B

Complexity
  Conditions: 3

Size
  Total Lines: 128
  Code Lines: 65

Duplication
  Lines: 0
  Ratio: 0 %

Importance
  Changes: 0

Metric   Value
eloc     65
dl       0
loc      128
rs       8.1454
c        0
b        0
f        0
cc       3
nop      0
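
The size and complexity figures can be sanity-checked locally. Below is a minimal sketch using radon; this is an assumption, since the report does not name its analyzer, and radon's counts may differ slightly from the values above:

from radon.complexity import cc_visit
from radon.raw import analyze

# Hypothetical path to the flagged module.
source = open("charging_infrastructure/db.py").read()

raw = analyze(source)  # namedtuple with loc, lloc, sloc, comments, blank, ...
print("total lines:", raw.loc)    # cf. "Total Lines 128"
print("source lines:", raw.sloc)  # cf. "Code Lines 65"

# One entry per function/method, each with a cyclomatic complexity score.
for block in cc_visit(source):
    print(block.name, block.complexity)  # cf. "cc 3" / "Conditions 3"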

How to fix: Long Method

Small methods make your code easier to understand, especially when combined with a good name. Moreover, when a method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments inside a method's body, that is usually a sign that the commented part should be extracted into a new method, with the comment serving as a starting point for its name.

The most commonly applied refactoring is Extract Method: moving a cohesive block of the long method into its own, well-named method, as illustrated below.
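
As a minimal illustration (the names below are invented for the example, not taken from the flagged file), the commented step of a long method becomes a helper whose name is derived from the comment:

# Before: a commented step buried in a longer method.
def invoice_total(orders):
    total = sum(order.price for order in orders)
    # apply a 5 % bulk discount for more than ten orders
    if len(orders) > 10:
        total *= 0.95
    return total


# After: the commented step is extracted; the comment became the name.
def apply_bulk_discount(total, order_count):
    """Apply a 5 % discount when more than ten orders are bought at once."""
    return total * 0.95 if order_count > 10 else total


def invoice_total(orders):
    return apply_bulk_discount(sum(o.price for o in orders), len(orders))

The module flagged by this report follows in full.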

"""
The charging infrastructure allocation is based on [TracBEV](
https://github.com/rl-institut/tracbev). TracBEV is a tool for the regional allocation
of charging infrastructure. In practice this allows users to use results generated via
[SimBEV](https://github.com/rl-institut/simbev) and place the corresponding charging
points on a map. These are split into the four use cases hpc, public, home and work.
"""
from __future__ import annotations

from pathlib import Path

import geopandas as gpd
import numpy as np
import pandas as pd

from egon.data import config, db
from egon.data.datasets.emobility.motorized_individual_travel_charging_infrastructure.use_cases import (  # noqa: E501
    home,
    hpc,
    public,
    work,
)

WORKING_DIR = Path(".", "charging_infrastructure").resolve()
DATASET_CFG = config.datasets()["charging_infrastructure"]


def write_to_db(
    gdf: gpd.GeoDataFrame, mv_grid_id: int | float, use_case: str
) -> None:
    """
    Write results to charging infrastructure DB table.

    Parameters
    ----------
    gdf: geopandas.GeoDataFrame
        GeoDataFrame to save
    mv_grid_id: int or float
        MV grid ID corresponding to the data
    use_case: str
        Calculated use case

    """
    if gdf.empty:
        return

    if "energy" in gdf.columns:
        gdf = gdf.assign(weight=gdf.energy.div(gdf.energy.sum()))
    else:
        # no energy data available: draw random weights and normalise them
        # so that they sum to 1
        rng = np.random.default_rng(DATASET_CFG["constants"]["random_seed"])

        gdf = gdf.assign(weight=rng.integers(low=0, high=100, size=len(gdf)))

        gdf = gdf.assign(weight=gdf.weight.div(gdf.weight.sum()))

    # number new charging points starting from the current maximum cp_id
    max_id = db.select_dataframe(
        """
        SELECT MAX(cp_id) FROM grid.egon_emob_charging_infrastructure
        """
    )["max"][0]

    if max_id is None:
        max_id = 0

    gdf = gdf.assign(
        cp_id=range(max_id, max_id + len(gdf)),
        mv_grid_id=mv_grid_id,
        use_case=use_case,
    )

    targets = DATASET_CFG["targets"]
    cols_to_export = targets["charging_infrastructure"]["cols_to_export"]

    gpd.GeoDataFrame(gdf[cols_to_export], crs=gdf.crs).to_postgis(
        targets["charging_infrastructure"]["table"],
        schema=targets["charging_infrastructure"]["schema"],
        con=db.engine(),
        if_exists="append",
    )


def run_tracbev():
    """
    Wrapper function to run the charging infrastructure allocation.
    """
    data_dict = get_data()

    run_tracbev_potential(data_dict)


def run_tracbev_potential(data_dict: dict) -> None:
    """
    Main function to run TracBEV in potential mode (determination of all
    potential charging points).

    Parameters
    ----------
    data_dict: dict
        Data dict containing all TracBEV run information
    """
    bounds = data_dict["boundaries"]

    for mv_grid_id in data_dict["regions"].mv_grid_id:
        region = bounds.loc[bounds.bus_id == mv_grid_id].geom

        data_dict.update({"region": region, "key": mv_grid_id})
        # Start use cases
        run_use_cases(data_dict)


def run_use_cases(data_dict: dict) -> None:
    """
    Run all use cases.

    Parameters
    ----------
    data_dict: dict
        Data dict containing all TracBEV run information
    """
    write_to_db(
        hpc(data_dict["hpc_positions"], data_dict),
        data_dict["key"],
        use_case="hpc",
    )
    write_to_db(
        public(
            data_dict["public_positions"], data_dict["poi_cluster"], data_dict
        ),
        data_dict["key"],
        use_case="public",
    )
    write_to_db(
        work(data_dict["landuse"], data_dict["work_dict"], data_dict),
        data_dict["key"],
        use_case="work",
    )
    write_to_db(
        home(data_dict["housing_data"], data_dict),
        data_dict["key"],
        use_case="home",
    )


def get_data() -> dict[str, gpd.GeoDataFrame]:
    """
    Load all data necessary for TracBEV. Data loaded:

    * 'hpc_positions' - Potential hpc positions
    * 'landuse' - Potential work related positions
    * 'poi_cluster' - Potential public related positions
    * 'public_positions' - Potential public related positions
    * 'housing_data' - Potential home related positions loaded from DB
    * 'boundaries' - MV grid boundaries
    * miscellaneous found in *datasets.yml* in section *charging_infrastructure*

    Returns
    -------
    dict
        Data dict containing all TracBEV run information

    """
    tracbev_cfg = DATASET_CFG["original_data"]["sources"]["tracbev"]
    srid = tracbev_cfg["srid"]

    # TODO: get zensus housing data from DB instead of gpkg?
    files = tracbev_cfg["files_to_use"]

    data_dict = {}

    # get TracBEV files
    for f in files:
        file = WORKING_DIR / "data" / f
        name = f.split(".")[0]

        data_dict[name] = gpd.read_file(file)

        if "undefined" in data_dict[name].crs.name.lower():
            data_dict[name] = data_dict[name].set_crs(
                epsg=srid, allow_override=True
            )
        else:
            data_dict[name] = data_dict[name].to_crs(epsg=srid)

    # get housing data from DB
    sql = """
    SELECT building_id, cell_id
    FROM demand.egon_household_electricity_profile_of_buildings
    """

    df = db.select_dataframe(sql)

    count_df = (
        df.groupby(["building_id", "cell_id"])
        .size()
        .reset_index()
        .rename(columns={0: "count"})
    )

    # buildings with more than one household profile count as multi-family
    # homes (mfh), the rest as single-family homes (efh)
    mfh_df = (
        count_df.loc[count_df["count"] > 1]
        .groupby(["cell_id"])
        .size()
        .reset_index()
        .rename(columns={0: "num_mfh"})
    )
    efh_df = (
        count_df.loc[count_df["count"] <= 1]
        .groupby(["cell_id"])
        .size()
        .reset_index()
        .rename(columns={0: "num"})
    )

    comb_df = (
        mfh_df.merge(
            right=efh_df, how="outer", left_on="cell_id", right_on="cell_id"
        )
        .fillna(0)
        .astype(int)
    )

    sql = """
    SELECT zensus_population_id, geom as geometry
    FROM society.egon_destatis_zensus_apartment_building_population_per_ha
    """

    gdf = db.select_geodataframe(sql, geom_col="geometry", epsg=srid)

    data_dict["housing_data"] = gpd.GeoDataFrame(
        gdf.merge(
            right=comb_df, left_on="zensus_population_id", right_on="cell_id"
        ),
        crs=gdf.crs,
    ).drop(columns=["cell_id"])

    # get boundaries aka grid districts
    sql = """
    SELECT bus_id, geom FROM grid.egon_mv_grid_district
    """

    data_dict["boundaries"] = db.select_geodataframe(
        sql, geom_col="geom", epsg=srid
    )

    data_dict["regions"] = pd.DataFrame(
        columns=["mv_grid_id"],
        data=data_dict["boundaries"].bus_id.unique(),
    )

    data_dict["work_dict"] = {
        "retail": DATASET_CFG["constants"]["work_weight_retail"],
        "commercial": DATASET_CFG["constants"]["work_weight_commercial"],
        "industrial": DATASET_CFG["constants"]["work_weight_industrial"],
    }

    data_dict["sfh_available"] = DATASET_CFG["constants"][
        "single_family_home_share"
    ]
    data_dict["sfh_avg_spots"] = DATASET_CFG["constants"][
        "single_family_home_spots"
    ]
    data_dict["mfh_available"] = DATASET_CFG["constants"][
        "multi_family_home_share"
    ]
    data_dict["mfh_avg_spots"] = DATASET_CFG["constants"][
        "multi_family_home_spots"
    ]

    data_dict["random_seed"] = np.random.default_rng(
        DATASET_CFG["constants"]["random_seed"]
    )

    return data_dict
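
Given the metrics above (128 total lines, cc 3, matching the span of get_data), the flagged Long Method is presumably get_data. Below is a minimal sketch of how Extract Method could split it; the helper names (_read_tracbev_files, _get_housing_data, _get_boundaries, _get_constants) are hypothetical, not part of egon-data, and the bodies reuse the module-level imports and DATASET_CFG:

# Sketch only: one possible Extract Method split of get_data().
def _read_tracbev_files(tracbev_cfg: dict, srid: int) -> dict:
    """Read the TracBEV files and normalise every layer to the target CRS."""
    data = {}
    for f in tracbev_cfg["files_to_use"]:
        gdf = gpd.read_file(WORKING_DIR / "data" / f)
        if "undefined" in gdf.crs.name.lower():
            gdf = gdf.set_crs(epsg=srid, allow_override=True)
        else:
            gdf = gdf.to_crs(epsg=srid)
        data[f.split(".")[0]] = gdf
    return data


def _get_housing_data(srid: int) -> gpd.GeoDataFrame:
    """Count single- and multi-family homes per zensus cell."""
    ...  # the groupby/merge block of get_data, moved verbatim


def _get_boundaries(srid: int) -> gpd.GeoDataFrame:
    """Load the MV grid district boundaries."""
    sql = "SELECT bus_id, geom FROM grid.egon_mv_grid_district"
    return db.select_geodataframe(sql, geom_col="geom", epsg=srid)


def _get_constants(constants: dict) -> dict:
    """Collect work weights, home shares and the RNG from datasets.yml."""
    return {
        "work_dict": {
            "retail": constants["work_weight_retail"],
            "commercial": constants["work_weight_commercial"],
            "industrial": constants["work_weight_industrial"],
        },
        "sfh_available": constants["single_family_home_share"],
        "sfh_avg_spots": constants["single_family_home_spots"],
        "mfh_available": constants["multi_family_home_share"],
        "mfh_avg_spots": constants["multi_family_home_spots"],
        "random_seed": np.random.default_rng(constants["random_seed"]),
    }


def get_data() -> dict:
    """Assemble all TracBEV inputs from small, well-named helpers."""
    tracbev_cfg = DATASET_CFG["original_data"]["sources"]["tracbev"]
    srid = tracbev_cfg["srid"]

    data_dict = _read_tracbev_files(tracbev_cfg, srid)
    data_dict["housing_data"] = _get_housing_data(srid)
    data_dict["boundaries"] = _get_boundaries(srid)
    data_dict["regions"] = pd.DataFrame(
        columns=["mv_grid_id"], data=data_dict["boundaries"].bus_id.unique()
    )
    data_dict.update(_get_constants(DATASET_CFG["constants"]))
    return data_dict

Each helper then fits on a screen, and the helper names replace the section comments ("# get TracBEV files", "# get housing data from DB", "# get boundaries aka grid districts") that currently segment get_data.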