Passed · Pull Request — dev (#943)
created by unknown · 01:47

motorized_individual_travel_charging_infrastructure.infrastructure_allocation (Rating: A)

Complexity

Total Complexity 11

Size/Duplication

Total Lines 215
Duplicated Lines 0 %

Importance

Changes 0
Metric   Value
wmc      11
eloc     130
dl       0
loc      215
rs       10
c        0
b        0
f        0

5 Functions

Rating   Name                      Duplication   Size   Complexity
B        get_data()                0             113    3
A        run_tracbev_potential()   0             9      2
A        run_use_cases()           0             22     1
A        run_tracbev()             0             4      1
A        write_to_db()             0             36     4
from __future__ import annotations

from pathlib import Path

import geopandas as gpd
import numpy as np
import pandas as pd

from egon.data import config, db
from egon.data.datasets.emobility.motorized_individual_travel_charging_infrastructure.use_cases import (  # noqa: E501
    home,
    hpc,
    public,
    work,
)

WORKING_DIR = Path(".", "charging_infrastructure").resolve()
DATASET_CFG = config.datasets()["charging_infrastructure"]


def write_to_db(gdf: gpd.GeoDataFrame, mv_grid_id: int | float, use_case: str):
    # nothing to export for this grid district and use case
    if gdf.empty:
        return

    # weight each charge point by its share of the energy demand if available,
    # otherwise by normalised random integers drawn from the configured seed
    if "energy" in gdf.columns:
        gdf = gdf.assign(weight=gdf.energy.div(gdf.energy.sum()))
    else:
        rng = np.random.default_rng(DATASET_CFG["constants"]["random_seed"])

        gdf = gdf.assign(weight=rng.integers(low=0, high=100, size=len(gdf)))

        gdf = gdf.assign(weight=gdf.weight.div(gdf.weight.sum()))

    # number new charge points based on the current maximum cp_id in the
    # target table
    max_id = db.select_dataframe(
        """
        SELECT MAX(cp_id) FROM grid.egon_emob_charging_infrastructure
        """
    )["max"][0]

    if max_id is None:
        max_id = 0

    gdf = gdf.assign(
        cp_id=range(max_id, max_id + len(gdf)),
        mv_grid_id=mv_grid_id,
        use_case=use_case,
    )

    targets = DATASET_CFG["targets"]
    cols_to_export = targets["charging_infrastructure"]["cols_to_export"]

    # append the configured columns to the target table
    gpd.GeoDataFrame(gdf[cols_to_export], crs=gdf.crs).to_postgis(
        targets["charging_infrastructure"]["table"],
        schema=targets["charging_infrastructure"]["schema"],
        con=db.engine(),
        if_exists="append",
    )


def run_tracbev():
    # collect all input data and run the allocation per MV grid district
    data_dict = get_data()

    run_tracbev_potential(data_dict)


def run_tracbev_potential(data_dict):
    bounds = data_dict["boundaries"]

    for mv_grid_id in data_dict["regions"].mv_grid_id:
        region = bounds.loc[bounds.bus_id == mv_grid_id].geom

        data_dict.update({"region": region, "key": mv_grid_id})
        # start use cases for this grid district
        run_use_cases(data_dict)


def run_use_cases(data_dict):
    # allocate charging infrastructure per use case and write it to the DB
    write_to_db(
        hpc(data_dict["hpc_positions"], data_dict),
        data_dict["key"],
        use_case="hpc",
    )
    write_to_db(
        public(
            data_dict["public_positions"], data_dict["poi_cluster"], data_dict
        ),
        data_dict["key"],
        use_case="public",
    )
    write_to_db(
        work(data_dict["landuse"], data_dict["work_dict"], data_dict),
        data_dict["key"],
        use_case="work",
    )
    write_to_db(
        home(data_dict["housing_data"], data_dict),
        data_dict["key"],
        use_case="home",
    )


def get_data() -> dict[gpd.GeoDataFrame]:
    tracbev_cfg = DATASET_CFG["original_data"]["sources"]["tracbev"]
    srid = tracbev_cfg["srid"]

    # TODO: get zensus housing data from DB instead of gpkg?
    files = tracbev_cfg["files_to_use"]

    data_dict = {}

    # get TracBEV files and harmonise their CRS
    for f in files:
        file = WORKING_DIR / "data" / f
        name = f.split(".")[0]

        data_dict[name] = gpd.read_file(file)

        if "undefined" in data_dict[name].crs.name.lower():
            data_dict[name] = data_dict[name].set_crs(
                epsg=srid, allow_override=True
            )
        else:
            data_dict[name] = data_dict[name].to_crs(epsg=srid)

    # get housing data from DB
    sql = """
    SELECT building_id, cell_id
    FROM demand.egon_household_electricity_profile_of_buildings
    """

    df = db.select_dataframe(sql)

    # count household profiles per building to classify buildings as
    # single-family (one profile) or multi-family (several profiles) and
    # count each type per census cell
    count_df = (
        df.groupby(["building_id", "cell_id"])
        .size()
        .reset_index()
        .rename(columns={0: "count"})
    )

    mfh_df = (
        count_df.loc[count_df["count"] > 1]
        .groupby(["cell_id"])
        .size()
        .reset_index()
        .rename(columns={0: "num_mfh"})
    )
    efh_df = (
        count_df.loc[count_df["count"] <= 1]
        .groupby(["cell_id"])
        .size()
        .reset_index()
        .rename(columns={0: "num"})
    )

    comb_df = (
        mfh_df.merge(
            right=efh_df, how="outer", left_on="cell_id", right_on="cell_id"
        )
        .fillna(0)
        .astype(int)
    )

    sql = """
    SELECT zensus_population_id, geom as geometry
    FROM society.egon_destatis_zensus_apartment_building_population_per_ha
    """

    gdf = db.select_geodataframe(sql, geom_col="geometry", epsg=srid)

    data_dict["housing_data"] = gpd.GeoDataFrame(
        gdf.merge(
            right=comb_df, left_on="zensus_population_id", right_on="cell_id"
        ),
        crs=gdf.crs,
    ).drop(columns=["cell_id"])

    # get boundaries aka grid districts
    sql = """
    SELECT bus_id, geom FROM grid.egon_mv_grid_district
    """

    data_dict["boundaries"] = db.select_geodataframe(
        sql, geom_col="geom", epsg=srid
    )

    data_dict["regions"] = pd.DataFrame(
        columns=["mv_grid_id"],
        data=data_dict["boundaries"].bus_id.unique(),
    )

    # weighting factors for the work use case per land use type
    data_dict["work_dict"] = {
        "retail": DATASET_CFG["constants"]["work_weight_retail"],
        "commercial": DATASET_CFG["constants"]["work_weight_commercial"],
        "industrial": DATASET_CFG["constants"]["work_weight_industrial"],
    }

    # configured shares and average numbers of charging spots for single- and
    # multi-family homes
    data_dict["sfh_available"] = DATASET_CFG["constants"][
        "single_family_home_share"
    ]
    data_dict["sfh_avg_spots"] = DATASET_CFG["constants"][
        "single_family_home_spots"
    ]
    data_dict["mfh_available"] = DATASET_CFG["constants"][
        "multi_family_home_share"
    ]
    data_dict["mfh_avg_spots"] = DATASET_CFG["constants"][
        "multi_family_home_spots"
    ]

    # seeded random number generator shared by the use cases
    data_dict["random_seed"] = np.random.default_rng(
        DATASET_CFG["constants"]["random_seed"]
    )

    return data_dict
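
As a quick reference for how write_to_db() derives the weight column: charge points that carry an energy value are weighted by their share of the total energy demand, otherwise random integers drawn from the configured seed are normalised to sum to one. The standalone sketch below reproduces that arithmetic on made-up toy data (coordinates, demand values, CRS and seed are assumptions for illustration only, not values from the pipeline); in the pipeline itself the module is driven via run_tracbev().

import geopandas as gpd
import numpy as np

# Toy charge-point candidates; coordinates and energy demands are made up.
toy = gpd.GeoDataFrame(
    {"energy": [2.0, 6.0, 8.0]},
    geometry=gpd.points_from_xy([0.0, 1.0, 2.0], [0.0, 0.0, 0.0]),
    crs="EPSG:3035",
)

# Case 1: an "energy" column is present -> weights are energy shares.
with_energy = toy.assign(weight=toy.energy.div(toy.energy.sum()))
print(with_energy.weight.tolist())  # [0.125, 0.375, 0.5]

# Case 2: no "energy" column -> random integers from a seeded generator,
# normalised so that the weights again sum to one.
rng = np.random.default_rng(42)  # seed value is an assumption
draws = rng.integers(low=0, high=100, size=len(toy))
without_energy = toy.drop(columns="energy").assign(weight=draws / draws.sum())
print(without_energy.weight.sum())  # sums to 1 (up to float rounding)

Normalising to a weight column rather than exporting raw demand keeps the charge points directly usable as a sampling distribution, which is presumably how downstream steps consume them.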