Passed — Pull Request dev (#934), created 02:12 by unknown

data.datasets.power_plants.pv_rooftop_buildings — Rating: F

Complexity

Total Complexity 119

Size/Duplication

Total Lines 2741
Duplicated Lines 1.17 %

Importance

Changes 0
Metric  Value
wmc     119
eloc    1242
dl      32
loc     2741
rs      0.8
c       0
b       0
f       0

53 Functions

Rating   Name   Duplication   Size   Complexity  
A egon_building_peak_loads() 0 9 1
B load_building_data() 0 105 3
A get_probability_for_property() 0 46 2
A synthetic_buildings() 0 25 2
A calculate_building_load_factor() 0 30 1
A determine_end_of_life_gens() 0 39 1
A add_start_up_date() 0 30 1
A geocoding_data() 0 18 1
A geocoded_data_from_db() 0 23 2
B add_voltage_level() 0 31 6
A cap_per_bus_id() 0 25 1
A add_overlay_id_to_buildings() 0 31 1
B desaggregate_pv_in_mv_grid() 0 161 5
A calculate_max_pv_cap_per_building() 0 50 1
A add_weather_cell_id() 0 33 2
A add_ags_to_gens() 0 24 1
A drop_buildings_outside_muns() 0 25 1
B clean_mastr_data() 0 143 2
A mastr_data() 0 60 2
A scenario_data() 0 30 2
A geocode_data() 0 46 3
B zip_and_municipality_from_standort() 0 50 7
A merge_geocode_with_mastr() 0 27 1
A drop_buildings_outside_grids() 0 25 1
C building_area_range_per_cap_range() 0 87 8
A municipality_data() 0 14 2
A osm_buildings() 0 25 2
A add_ags_to_buildings() 0 24 1
A timer_func() 0 13 1
A add_buildings_meta_data() 0 55 4
C allocate_pv() 0 125 7
A sort_and_qcut_df() 0 29 1
A federal_state_data() 0 24 2
A geocode_mastr_data() 0 28 1
A create_geocoded_table() 0 17 1
A mean_load_factor_per_cap_range() 0 38 3
C desaggregate_pv() 0 139 7
A grid_districts() 0 32 1
A load_mastr_data() 0 35 1
A frame_to_numeric() 0 20 4
A drop_gens_outside_muns() 0 25 1
A geocoder() 0 22 1
A pv_rooftop_to_buildings() 0 50 2
A allocate_to_buildings() 0 32 1
A overlay_grid_districts_with_counties() 0 33 1
B allocate_scenarios() 0 104 1
A drop_invalid_entries_from_gdf() 0 25 1
A validate_output() 0 50 1
A create_scenario_table() 0 17 1
A cap_share_per_cap_range() 0 43 3
A drop_unallocated_gens() 0 27 1
A most_plausible() 0 41 4
B probabilities() 0 57 5


Duplicated Code

Duplicate code is one of the most pungent code smells. A commonly used rule of thumb is to restructure code once it is duplicated in three or more places.


Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like data.datasets.power_plants.pv_rooftop_buildings often do a lot of different things. To break such a class down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
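The Extract Class refactoring mentioned above can be sketched as follows. All class and method names here are hypothetical examples, not taken from the module under review; the idea is simply that methods sharing a prefix (`render_*`) form a cohesive component worth extracting:

```python
# Before: one class doing two jobs (the shared "render_" prefix hints at a
# cohesive component hiding inside it).
class ReportBefore:
    def parse_header(self, text):
        return text.split("\n")[0]

    def parse_body(self, text):
        return text.split("\n")[1:]

    def render_header(self, header):
        return f"# {header}"

    def render_body(self, body):
        return "\n".join(f"- {line}" for line in body)


# After: the rendering component is extracted into its own class and the
# original class delegates to it.
class Renderer:
    def header(self, header):
        return f"# {header}"

    def body(self, body):
        return "\n".join(f"- {line}" for line in body)


class Report:
    def __init__(self, renderer):
        self.renderer = renderer

    def parse_header(self, text):
        return text.split("\n")[0]

    def parse_body(self, text):
        return text.split("\n")[1:]
```

Each resulting class has one reason to change, which directly lowers the weighted method count (wmc) per class.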

"""
Distribute MaStR PV rooftop capacities to OSM and synthetic buildings. Generate new
PV rooftop generators for the scenarios eGon2035 and eGon100RE.

Data cleaning: Drop duplicates and entries with missing critical data. Determine the
most plausible capacity from the multiple values given in the MaStR data. Drop
generators without any plausible capacity data (0.1 kW < P <= 23.5 MW). Randomly and
weighted, add a start-up date where it is missing. Extract zip code and municipality
from the 'Standort' field given in the MaStR data. Geocode unique zip code and
municipality combinations with Nominatim (minimum delay of 1 s between requests).
Drop generators for which geocoding failed or which are located outside the
municipalities of Germany. Add some visual sanity checks for the cleaned data.

Allocation of MaStR data: Allocate each generator to an existing building from OSM.
Determine the quantile each generator and building belongs to, depending on the
capacity of the generator and the area of the building polygon. Randomly distribute
generators within each municipality, preferably within the same building area
quantile as the generators are in capacity-wise. If not enough buildings exist
within a municipality and quantile, additional buildings are chosen randomly from
other quantiles.

Disaggregation of the PV rooftop scenarios: The scenario data per federal state is
linearly distributed to the MV grid districts according to the PV rooftop potential
per MV grid district. The rooftop potential is estimated from the building area
given by the OSM buildings. Grid districts which are located in several federal
states are allocated PV capacity according to their respective roof potential in
the individual federal states. The disaggregation of PV plants within a grid
district respects existing plants from MaStR which have not yet reached their end
of life. New PV plants are generated randomly and weighted, using a breakdown of
the MaStR data as generator basis. Plant metadata (e.g. plant orientation) is also
added randomly and weighted, using the MaStR data as basis.
"""
from __future__ import annotations

from collections import Counter
from functools import wraps
from pathlib import Path
from time import perf_counter
from typing import Any

from geoalchemy2 import Geometry
from geopy.extra.rate_limiter import RateLimiter
from geopy.geocoders import Nominatim
from loguru import logger
from numpy.random import RandomState, default_rng
from pyproj.crs.crs import CRS
from sqlalchemy import BigInteger, Column, Float, Integer, String
from sqlalchemy.dialects.postgresql import HSTORE
from sqlalchemy.ext.declarative import declarative_base
import geopandas as gpd
import numpy as np
import pandas as pd

from egon.data import config, db
from egon.data.datasets.electricity_demand_timeseries.hh_buildings import (
    OsmBuildingsSynthetic,
)
from egon.data.datasets.scenario_capacities import EgonScenarioCapacities
from egon.data.datasets.zensus_vg250 import Vg250Gem

engine = db.engine()
Base = declarative_base()
SEED = int(config.settings()["egon-data"]["--random-seed"])

# TODO: move to yml
# mastr data
MASTR_RELEVANT_COLS = [
    "EinheitMastrNummer",
    "Bruttoleistung",
    "StatisikFlag",
    "Bruttoleistung_extended",
    "Nettonennleistung",
    "InstallierteLeistung",
    "zugeordneteWirkleistungWechselrichter",
    "EinheitBetriebsstatus",
    "Standort",
    "Bundesland",
    "Land",
    "Landkreis",
    "Gemeinde",
    "Postleitzahl",
    "Ort",
    "GeplantesInbetriebnahmedatum",
    "Inbetriebnahmedatum",
    "GemeinsamerWechselrichterMitSpeicher",
    "Lage",
    "Leistungsbegrenzung",
    "EinheitlicheAusrichtungUndNeigungswinkel",
    "Hauptausrichtung",
    "HauptausrichtungNeigungswinkel",
    "Nebenausrichtung",
]

MASTR_DTYPES = {
    "EinheitMastrNummer": str,
    "Bruttoleistung": float,
    "StatisikFlag": str,
    "Bruttoleistung_extended": float,
    "Nettonennleistung": float,
    "InstallierteLeistung": float,
    "zugeordneteWirkleistungWechselrichter": float,
    "EinheitBetriebsstatus": str,
    "Standort": str,
    "Bundesland": str,
    "Land": str,
    "Landkreis": str,
    "Gemeinde": str,
    # "Postleitzahl": int,  # fails because of nan values
    "Ort": str,
    "GemeinsamerWechselrichterMitSpeicher": str,
    "Lage": str,
    "Leistungsbegrenzung": str,
    # this would parse nan values as False, which is not always correct
    # "EinheitlicheAusrichtungUndNeigungswinkel": bool,
    "Hauptausrichtung": str,
    "HauptausrichtungNeigungswinkel": str,
    "Nebenausrichtung": str,
    "NebenausrichtungNeigungswinkel": str,
}

MASTR_PARSE_DATES = [
    "GeplantesInbetriebnahmedatum",
    "Inbetriebnahmedatum",
]

MASTR_INDEX_COL = "EinheitMastrNummer"

EPSG = 4326
SRID = 3035

# data cleaning
MAX_REALISTIC_PV_CAP = 23500
MIN_REALISTIC_PV_CAP = 0.1
ROUNDING = 1

# geopy
MIN_DELAY_SECONDS = 1
USER_AGENT = "rli_kh_geocoder"

# show additional logging information
VERBOSE = False

EXPORT_DIR = Path(__file__).resolve().parent / "data"
EXPORT_FILE = "mastr_geocoded.gpkg"
EXPORT_PATH = EXPORT_DIR / EXPORT_FILE
DRIVER = "GPKG"

# Number of quantiles
Q = 5

# Scenario Data
CARRIER = "solar_rooftop"
SCENARIOS = ["eGon2035", "eGon100RE"]
SCENARIO_TIMESTAMP = {
    "eGon2035": pd.Timestamp("2035-01-01", tz="UTC"),
    "eGon100RE": pd.Timestamp("2050-01-01", tz="UTC"),
}
PV_ROOFTOP_LIFETIME = pd.Timedelta(20 * 365, unit="D")

# Example module: Trina Vertex S TSM-400DE09M.08, 400 Wp
# https://www.photovoltaik4all.de/media/pdf/92/64/68/Trina_Datasheet_VertexS_DE09-08_2021_A.pdf
MODUL_CAP = 0.4  # kWp
MODUL_SIZE = 1.096 * 1.754  # m²
PV_CAP_PER_SQ_M = MODUL_CAP / MODUL_SIZE

# Estimation of usable roof area
# Factor for the conversion of building area to roof area
# estimated mean roof pitch: 35°
# estimated usable roof share: 80%
# estimation that only the south side of the building is used for PV
# see https://mediatum.ub.tum.de/doc/%20969497/969497.pdf
# AREA_FACTOR = 1.221
# USABLE_ROOF_SHARE = 0.8
# SOUTH_SHARE = 0.5
# ROOF_FACTOR = AREA_FACTOR * USABLE_ROOF_SHARE * SOUTH_SHARE
ROOF_FACTOR = 0.5

CAP_RANGES = [
    (0, 30),
    (30, 100),
    (100, float("inf")),
]

MIN_BUILDING_SIZE = 10.0
UPPER_QUNATILE = 0.95
LOWER_QUANTILE = 0.05

COLS_TO_RENAME = {
    "EinheitlicheAusrichtungUndNeigungswinkel": (
        "einheitliche_ausrichtung_und_neigungswinkel"
    ),
    "Hauptausrichtung": "hauptausrichtung",
    "HauptausrichtungNeigungswinkel": "hauptausrichtung_neigungswinkel",
}

COLS_TO_EXPORT = [
    "scenario",
    "bus_id",
    "building_id",
    "gens_id",
    "capacity",
    "einheitliche_ausrichtung_und_neigungswinkel",
    "hauptausrichtung",
    "hauptausrichtung_neigungswinkel",
    "voltage_level",
    "weather_cell_id",
]

# TODO
INCLUDE_SYNTHETIC_BUILDINGS = True
ONLY_BUILDINGS_WITH_DEMAND = True
TEST_RUN = False


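The module constants above combine to a capacity density: `PV_CAP_PER_SQ_M` is derived from one example module, and `ROOF_FACTOR` scales building footprint to usable roof area. A minimal standalone sketch of one plausible combination of these constants (the module's own `calculate_max_pv_cap_per_building` may differ in detail; `max_pv_cap` is a hypothetical helper name):

```python
# Rough rooftop PV potential from a building footprint, using the constants
# defined in the module. This is an illustrative combination, not the
# module's exact formula.

MODUL_CAP = 0.4  # kWp per module
MODUL_SIZE = 1.096 * 1.754  # m² per module
PV_CAP_PER_SQ_M = MODUL_CAP / MODUL_SIZE  # ≈ 0.208 kWp/m²
ROOF_FACTOR = 0.5  # usable roof area relative to building footprint


def max_pv_cap(building_area_sq_m: float) -> float:
    """Rough upper bound on installable PV capacity in kWp."""
    return building_area_sq_m * ROOF_FACTOR * PV_CAP_PER_SQ_M
```

A 100 m² footprint thus yields roughly 10 kWp of installable capacity under these assumptions.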
def timer_func(func):
    @wraps(func)
    def timeit_wrapper(*args, **kwargs):
        start_time = perf_counter()
        result = func(*args, **kwargs)
        end_time = perf_counter()
        total_time = end_time - start_time
        logger.debug(
            f"Function {func.__name__} took {total_time:.4f} seconds."
        )
        return result

    return timeit_wrapper


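The decorator above wraps a function, times it with `perf_counter`, and preserves the wrapped function's metadata via `functools.wraps`. A self-contained version using `print` instead of the module's loguru logger:

```python
from functools import wraps
from time import perf_counter


def timer_func(func):
    """Log how long a decorated function takes to run."""

    @wraps(func)  # keeps func.__name__, __doc__, etc. intact
    def timeit_wrapper(*args, **kwargs):
        start = perf_counter()
        result = func(*args, **kwargs)
        print(f"Function {func.__name__} took {perf_counter() - start:.4f} seconds.")
        return result

    return timeit_wrapper


@timer_func
def add(a, b):
    return a + b
```

Without `@wraps`, `add.__name__` would report `"timeit_wrapper"`, which would also garble the timing messages themselves.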
@timer_func
def mastr_data(
    index_col: str | int | list[str] | list[int],
    usecols: list[str],
    dtype: dict[str, Any] | None,
    parse_dates: list[str] | None,
) -> pd.DataFrame:
    """
    Read MaStR data from csv.

    Parameters
    ----------
    index_col : str, int or list of str or int
        Column(s) to use as the row labels of the DataFrame.
    usecols : list of str
        Return a subset of the columns.
    dtype : dict of column (str) -> type (any), optional
        Data type for data or columns.
    parse_dates : list of names (str), optional
        Try to parse given columns to datetime.

    Returns
    -------
    pandas.DataFrame
        DataFrame containing MaStR data.
    """
    mastr_path = Path(
        config.datasets()["power_plants"]["sources"]["mastr_pv"]
    ).resolve()

    mastr_df = pd.read_csv(
        mastr_path,
        index_col=index_col,
        usecols=usecols,
        dtype=dtype,
        parse_dates=parse_dates,
    )

    mastr_df = mastr_df.loc[
        (mastr_df.StatisikFlag == "B")
        & (mastr_df.EinheitBetriebsstatus == "InBetrieb")
        & (mastr_df.Land == "Deutschland")
        & (mastr_df.Lage == "BaulicheAnlagen")
    ]

    if (
        config.settings()["egon-data"]["--dataset-boundary"]
        == "Schleswig-Holstein"
    ):
        init_len = len(mastr_df)

        mastr_df = mastr_df.loc[mastr_df.Bundesland == "SchleswigHolstein"]

        logger.info(
            f"Using only MaStR data within Schleswig-Holstein. "
            f"{init_len - len(mastr_df)} of {init_len} generators are dropped."
        )

    logger.debug("MaStR data loaded.")

    return mastr_df


@timer_func
def clean_mastr_data(
    mastr_df: pd.DataFrame,
    max_realistic_pv_cap: int | float,
    min_realistic_pv_cap: int | float,
    rounding: int,
    seed: int,
) -> pd.DataFrame:
    """
    Clean the MaStR data of implausible entries.

    * Drop MaStR ID duplicates.
    * Drop generators with implausible capacities.
    * Drop generators without any kind of start-up date.
    * Clean up the Standort column and the capacity.

    Parameters
    ----------
    mastr_df : pandas.DataFrame
        DataFrame containing MaStR data.
    max_realistic_pv_cap : int or float
        Maximum capacity which is considered to be realistic.
    min_realistic_pv_cap : int or float
        Minimum capacity which is considered to be realistic.
    rounding : int
        Rounding to use when cleaning up the capacity. E.g. with a
        rounding of 1, a capacity of 9.93 is rounded to 9.9.
    seed : int
        Seed to use for random operations with NumPy and pandas.

    Returns
    -------
    pandas.DataFrame
        DataFrame containing cleaned MaStR data.
    """
    init_len = len(mastr_df)

    # drop duplicates
    mastr_df = mastr_df.loc[~mastr_df.index.duplicated()]

    # drop invalid entries in Standort
    index_to_drop = mastr_df.loc[
        (mastr_df.Standort.isna()) | (mastr_df.Standort.isnull())
    ].index

    mastr_df = mastr_df.loc[~mastr_df.index.isin(index_to_drop)]

    df = mastr_df[
        [
            "Bruttoleistung",
            "Bruttoleistung_extended",
            "Nettonennleistung",
            "zugeordneteWirkleistungWechselrichter",
            "InstallierteLeistung",
        ]
    ].round(rounding)

    # determine the most plausible capacity if multiple ratings are given
    mastr_df = mastr_df.assign(
        capacity=[
            most_plausible(p_tub, min_realistic_pv_cap)
            for p_tub in df.itertuples(index=False)
        ]
    )

    # drop generators without any capacity info,
    # with a capacity of zero,
    # or with a capacity > 23.5 MW, because
    # Germany's largest rooftop PV plant is 23 MW
    # https://www.iwr.de/news/groesste-pv-dachanlage-europas-wird-in-sachsen-anhalt-gebaut-news37379
    mastr_df = mastr_df.loc[
        (~mastr_df.capacity.isna())
        & (mastr_df.capacity <= max_realistic_pv_cap)
        & (mastr_df.capacity > min_realistic_pv_cap)
    ]

    # get zip code and municipality
    mastr_df[["zip_and_municipality", "drop_this"]] = pd.DataFrame(
        mastr_df.Standort.astype(str)
        .apply(
            zip_and_municipality_from_standort,
            args=(VERBOSE,),
        )
        .tolist(),
        index=mastr_df.index,
    )

    # drop invalid entries
    mastr_df = mastr_df.loc[mastr_df.drop_this].drop(columns="drop_this")

    # add ", Deutschland" just in case
    mastr_df = mastr_df.assign(
        zip_and_municipality=(mastr_df.zip_and_municipality + ", Deutschland")
    )

    # get a consistent start-up date
    mastr_df = mastr_df.assign(
        start_up_date=mastr_df.Inbetriebnahmedatum,
    )

    mastr_df.loc[mastr_df.start_up_date.isna()] = mastr_df.loc[
        mastr_df.start_up_date.isna()
    ].assign(
        start_up_date=mastr_df.GeplantesInbetriebnahmedatum.loc[
            mastr_df.start_up_date.isna()
        ]
    )

    # randomly and weighted fill missing start-up dates
    pool = mastr_df.loc[
        ~mastr_df.start_up_date.isna()
    ].start_up_date.to_numpy()

    size = len(mastr_df) - len(pool)

    if size > 0:
        np.random.seed(seed)

        choice = np.random.choice(
            pool,
            size=size,
            replace=False,
        )

        mastr_df.loc[mastr_df.start_up_date.isna()] = mastr_df.loc[
            mastr_df.start_up_date.isna()
        ].assign(start_up_date=choice)

        logger.info(
            f"Randomly and weighted added a start-up date to {size} generators."
        )

    mastr_df = mastr_df.assign(
        start_up_date=pd.to_datetime(mastr_df.start_up_date, utc=True)
    )

    end_len = len(mastr_df)
    logger.debug(
        f"Dropped {init_len - end_len} "
        f"({((init_len - end_len) / init_len) * 100:g}%)"
        f" of {init_len} rows from the MaStR DataFrame."
    )

    return mastr_df


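The start-up-date fill in `clean_mastr_data` samples missing dates from the pool of observed dates, which implicitly weights the draw by how often each date occurs. A standalone NumPy sketch of that idea (`fill_missing` is a hypothetical helper, and it uses `default_rng` where the module itself seeds the global `np.random` state):

```python
import numpy as np


def fill_missing(values, seed):
    """Fill None entries by sampling (without replacement) from the
    entries that are present. Raises if more values are missing than
    present, mirroring the original's implicit assumption."""
    values = np.array(values, dtype=object)
    missing = np.array([v is None for v in values])
    pool = values[~missing]

    rng = np.random.default_rng(seed)
    values[missing] = rng.choice(pool, size=missing.sum(), replace=False)

    return values
```

Because the pool contains every observed date (duplicates included), frequent dates are proportionally more likely to be drawn, which is what "randomly and weighted" refers to.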
def zip_and_municipality_from_standort(
    standort: str,
    verbose: bool = False,
) -> tuple[str, bool]:
    """
    Get the zip code and municipality from the Standort string.

    Parameters
    ----------
    standort : str
        Standort as given in the MaStR data.
    verbose : bool
        Logs additional info if True.

    Returns
    -------
    tuple of str and bool
        Standort reduced to the zip code and municipality, and a flag
        indicating whether a zip code was found. Entries without a zip
        code are flagged to be dropped.
    """
    if verbose:
        logger.debug(f"Uncleaned string: {standort}")

    standort_list = standort.split()

    found = False
    count = 0

    for count, elem in enumerate(standort_list):
        if len(elem) != 5:
            continue
        if not elem.isnumeric():
            continue

        found = True

        break

    if found:
        cleaned_str = " ".join(standort_list[count:])

        if verbose:
            logger.debug(f"Cleaned string:   {cleaned_str}")

        return cleaned_str, found

    logger.warning(
        "Couldn't identify zip code. This entry will be dropped."
        f" Original Standort: {standort}."
    )

    return standort, found


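The scan above walks the whitespace-split tokens of the Standort string until it finds a five-digit numeric token (the German zip code) and keeps everything from there on. A compact standalone sketch of the same logic (`zip_and_municipality` is a hypothetical name for this illustration):

```python
def zip_and_municipality(standort):
    """Return (cleaned string, found flag): the substring starting at the
    first 5-digit numeric token, or the unchanged input if none exists."""
    tokens = standort.split()
    for count, elem in enumerate(tokens):
        if len(elem) == 5 and elem.isnumeric():
            # zip code found: keep the zip code and everything after it
            return " ".join(tokens[count:]), True
    return standort, False
```

The boolean flag lets the caller drop entries for which no zip code could be identified, as `clean_mastr_data` does via its temporary `drop_this` column.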
def most_plausible(
    p_tub: tuple,
    min_realistic_pv_cap: int | float,
) -> float:
    """
    Try to determine the most plausible capacity.

    Tries to determine the most plausible capacity of a given
    generator from the MaStR data.

    Parameters
    ----------
    p_tub : tuple
        Tuple containing the different capacities given in
        the MaStR data.
    min_realistic_pv_cap : int or float
        Minimum capacity which is considered to be realistic.

    Returns
    -------
    float
        Capacity of the generator estimated as the most realistic.
    """
    count = Counter(p_tub).most_common(3)

    if len(count) == 1:
        return count[0][0]

    val1 = count[0][0]
    val2 = count[1][0]

    if len(count) == 2:
        min_val = min(val1, val2)
        max_val = max(val1, val2)
    else:
        val3 = count[2][0]

        min_val = min(val1, val2, val3)
        max_val = max(val1, val2, val3)

    if min_val < min_realistic_pv_cap:
        return max_val

    return min_val


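The selection rule above can be stated compactly: among the (up to three) most common capacity values, take the smallest, unless it falls below the plausibility threshold, in which case take the largest. A generalized standalone sketch of the same behavior:

```python
from collections import Counter


def most_plausible(p_tub, min_realistic_pv_cap):
    """Pick the most plausible capacity from a tuple of candidate values."""
    count = Counter(p_tub).most_common(3)

    # only one distinct value: nothing to choose between
    if len(count) == 1:
        return count[0][0]

    vals = [value for value, _ in count]
    min_val, max_val = min(vals), max(vals)

    # an implausibly small candidate suggests a data-entry artifact,
    # so fall back to the largest candidate instead
    if min_val < min_realistic_pv_cap:
        return max_val

    return min_val
```

For example, `(9.9, 10.0, 0.0)` yields `10.0` (the `0.0` is below the threshold), while `(9.9, 10.0)` yields the conservative `9.9`.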
def geocoder(
    user_agent: str,
    min_delay_seconds: int,
) -> RateLimiter:
    """
    Set up the Nominatim geocoding class.

    Parameters
    ----------
    user_agent : str
        The app name.
    min_delay_seconds : int
        Delay in seconds to use between requests to Nominatim.
        A minimum of 1 is advised.

    Returns
    -------
    geopy.extra.rate_limiter.RateLimiter
        Nominatim RateLimiter geocoding class to use for geocoding.
    """
    locator = Nominatim(user_agent=user_agent)
    return RateLimiter(
        locator.geocode,
        min_delay_seconds=min_delay_seconds,
    )


def geocoding_data(
    clean_mastr_df: pd.DataFrame,
) -> pd.DataFrame:
    """
    Set up the DataFrame to geocode.

    Parameters
    ----------
    clean_mastr_df : pandas.DataFrame
        DataFrame containing cleaned MaStR data.

    Returns
    -------
    pandas.DataFrame
        DataFrame containing all unique combinations of
        zip codes with municipalities for geocoding.
    """
    return pd.DataFrame(
        data=clean_mastr_df.zip_and_municipality.unique(),
        columns=["zip_and_municipality"],
    )


@timer_func
def geocode_data(
    geocoding_df: pd.DataFrame,
    ratelimiter: RateLimiter,
    epsg: int,
) -> gpd.GeoDataFrame:
    """
    Geocode zip code and municipality.

    Extract latitude, longitude and altitude. Transform latitude and
    longitude to a shapely Point and return a geopandas GeoDataFrame.

    Parameters
    ----------
    geocoding_df : pandas.DataFrame
        DataFrame containing all unique combinations of
        zip codes with municipalities for geocoding.
    ratelimiter : geopy.extra.rate_limiter.RateLimiter
        Nominatim RateLimiter geocoding class to use for geocoding.
    epsg : int
        EPSG ID to use as CRS.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing all unique combinations of
        zip codes with municipalities with matching geolocation.
    """
    logger.info(f"Geocoding {len(geocoding_df)} locations.")

    geocode_df = geocoding_df.assign(
        location=geocoding_df.zip_and_municipality.apply(ratelimiter)
    )

    geocode_df = geocode_df.assign(
        point=geocode_df.location.apply(
            lambda loc: tuple(loc.point) if loc else None
        )
    )

    geocode_df[["latitude", "longitude", "altitude"]] = pd.DataFrame(
        geocode_df.point.tolist(), index=geocode_df.index
    )

    return gpd.GeoDataFrame(
        geocode_df,
        geometry=gpd.points_from_xy(geocode_df.longitude, geocode_df.latitude),
        crs=f"EPSG:{epsg}",
    )


def merge_geocode_with_mastr(
    clean_mastr_df: pd.DataFrame, geocode_gdf: gpd.GeoDataFrame
) -> gpd.GeoDataFrame:
    """
    Merge the geometry to the original MaStR data.

    Parameters
    ----------
    clean_mastr_df : pandas.DataFrame
        DataFrame containing cleaned MaStR data.
    geocode_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing all unique combinations of
        zip codes with municipalities with matching geolocation.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing cleaned MaStR data with
        matching geolocation from geocoding.
    """
    return gpd.GeoDataFrame(
        clean_mastr_df.merge(
            geocode_gdf[["zip_and_municipality", "geometry"]],
            how="left",
            on="zip_and_municipality",
        ),
        crs=geocode_gdf.crs,
    ).set_index(clean_mastr_df.index)


def drop_invalid_entries_from_gdf(
    gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop invalid entries from a geopandas GeoDataFrame.

    TODO: how to omit the logging from GEOS here?

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame to be checked for validity.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with rows with invalid geometries
        dropped.
    """
    valid_gdf = gdf.loc[gdf.is_valid]

    logger.debug(
        f"{len(gdf) - len(valid_gdf)} "
        f"({(len(gdf) - len(valid_gdf)) / len(gdf) * 100:g}%) "
        f"of {len(gdf)} values were invalid and are dropped."
    )

    return valid_gdf


@timer_func
def municipality_data() -> gpd.GeoDataFrame:
    """
    Get municipality data from the eGo^n database.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with municipality data.
    """
    with db.session_scope() as session:
        query = session.query(Vg250Gem.ags, Vg250Gem.geometry.label("geom"))

    return gpd.read_postgis(
        query.statement, query.session.bind, index_col="ags"
    )


@timer_func
def add_ags_to_gens(
    valid_mastr_gdf: gpd.GeoDataFrame,
    municipalities_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add information about the AGS ID to the generators.

    Parameters
    ----------
    valid_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data.
    municipalities_gdf : geopandas.GeoDataFrame
        GeoDataFrame with municipality data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data
        with the AGS ID added.
    """
    return valid_mastr_gdf.sjoin(
        municipalities_gdf,
        how="left",
        predicate="intersects",
    ).rename(columns={"index_right": "ags"})


def drop_gens_outside_muns(
    valid_mastr_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop all generators outside of the municipalities.

    Parameters
    ----------
    valid_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data
        with generators without an AGS ID dropped.
    """
    gdf = valid_mastr_gdf.loc[~valid_mastr_gdf.ags.isna()]

    logger.debug(
        f"{len(valid_mastr_gdf) - len(gdf)} "
        f"({(len(valid_mastr_gdf) - len(gdf)) / len(valid_mastr_gdf) * 100:g}%) "
        f"of {len(valid_mastr_gdf)} values are outside of the municipalities"
        " and are therefore dropped."
    )

    return gdf


class EgonMastrPvRoofGeocoded(Base):
    __tablename__ = "egon_mastr_pv_roof_geocoded"
    __table_args__ = {"schema": "supply"}

    zip_and_municipality = Column(String, primary_key=True, index=True)
    location = Column(String)
    point = Column(String)
    latitude = Column(Float)
    longitude = Column(Float)
    altitude = Column(Float)
    geometry = Column(Geometry(srid=EPSG))


def create_geocoded_table(geocode_gdf):
    """
    Create the geocoded MaStR PV rooftop table.

    Parameters
    ----------
    geocode_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoding information on PV rooftop locations.
    """
    EgonMastrPvRoofGeocoded.__table__.drop(bind=engine, checkfirst=True)
    EgonMastrPvRoofGeocoded.__table__.create(bind=engine, checkfirst=True)

    geocode_gdf.to_postgis(
        name=EgonMastrPvRoofGeocoded.__table__.name,
        schema=EgonMastrPvRoofGeocoded.__table__.schema,
        con=db.engine(),
        if_exists="append",
        index=False,
        # dtype={}
    )


def geocoded_data_from_db(
    epsg: str | int,
) -> gpd.GeoDataFrame:
    """
    Read the geocoded MaStR data from the eGo^n database.

    Parameters
    ----------
    epsg : str or int
        EPSG ID of the CRS to transform the geometries to.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing the geocoded MaStR data.
    """
    with db.session_scope() as session:
        query = session.query(
            EgonMastrPvRoofGeocoded.zip_and_municipality,
            EgonMastrPvRoofGeocoded.geometry,
        )

    return gpd.read_postgis(
        query.statement, query.session.bind, geom_col="geometry"
    ).to_crs(f"EPSG:{epsg}")


def load_mastr_data():
    """
    Read PV rooftop data from the MaStR CSV.

    Note: the source will be replaced as soon as the MaStR data is
    available in the DB.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data with geocoded locations.
    """
    mastr_df = mastr_data(
        MASTR_INDEX_COL,
        MASTR_RELEVANT_COLS,
        MASTR_DTYPES,
        MASTR_PARSE_DATES,
    )

    clean_mastr_df = clean_mastr_data(
        mastr_df,
        max_realistic_pv_cap=MAX_REALISTIC_PV_CAP,
        min_realistic_pv_cap=MIN_REALISTIC_PV_CAP,
        seed=SEED,
        rounding=ROUNDING,
    )

    geocode_gdf = geocoded_data_from_db(EPSG)

    mastr_gdf = merge_geocode_with_mastr(clean_mastr_df, geocode_gdf)

    valid_mastr_gdf = drop_invalid_entries_from_gdf(mastr_gdf)

    municipalities_gdf = municipality_data()

    valid_mastr_gdf = add_ags_to_gens(valid_mastr_gdf, municipalities_gdf)

    return drop_gens_outside_muns(valid_mastr_gdf)


class OsmBuildingsFiltered(Base):
    __tablename__ = "osm_buildings_filtered"
    __table_args__ = {"schema": "openstreetmap"}

    osm_id = Column(BigInteger)
    amenity = Column(String)
    building = Column(String)
    name = Column(String)
    geom = Column(Geometry(srid=SRID), index=True)
    area = Column(Float)
    geom_point = Column(Geometry(srid=SRID), index=True)
    tags = Column(HSTORE)
    id = Column(BigInteger, primary_key=True, index=True)


@timer_func
def osm_buildings(
    to_crs: CRS,
) -> gpd.GeoDataFrame:
    """
    Read OSM buildings data from the eGo^n database.

    Parameters
    ----------
    to_crs : pyproj.crs.crs.CRS
        CRS to transform the geometries to.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    """
    with db.session_scope() as session:
        query = session.query(
            OsmBuildingsFiltered.id,
            OsmBuildingsFiltered.area,
            OsmBuildingsFiltered.geom_point.label("geom"),
        )

    return gpd.read_postgis(
        query.statement, query.session.bind, index_col="id"
    ).to_crs(to_crs)


@timer_func
def synthetic_buildings(
    to_crs: CRS,
) -> gpd.GeoDataFrame:
    """
    Read synthetic buildings data from the eGo^n database.

    Parameters
    ----------
    to_crs : pyproj.crs.crs.CRS
        CRS to transform the geometries to.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing synthetic buildings data.
    """
    with db.session_scope() as session:
        query = session.query(
            OsmBuildingsSynthetic.id,
            OsmBuildingsSynthetic.area,
            OsmBuildingsSynthetic.geom_point.label("geom"),
        )

    return gpd.read_postgis(
        query.statement, query.session.bind, index_col="id"
    ).to_crs(to_crs)


@timer_func
def add_ags_to_buildings(
    buildings_gdf: gpd.GeoDataFrame,
    municipalities_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add information about the AGS ID to buildings.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    municipalities_gdf : geopandas.GeoDataFrame
        GeoDataFrame with municipality data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with the AGS ID added.
    """
    return buildings_gdf.sjoin(
        municipalities_gdf,
        how="left",
        predicate="intersects",
    ).rename(columns={"index_right": "ags"})


def drop_buildings_outside_muns(
    buildings_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop all buildings outside of municipalities.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with buildings without an
        AGS ID dropped.
    """
    gdf = buildings_gdf.loc[~buildings_gdf.ags.isna()]

    logger.debug(
        f"{len(buildings_gdf) - len(gdf)} "
        f"({(len(buildings_gdf) - len(gdf)) / len(buildings_gdf) * 100:g}%) "
        f"of {len(buildings_gdf)} values are outside of the municipalities "
        "and are therefore dropped."
    )

    return gdf


def egon_building_peak_loads():
    sql = """
    SELECT building_id
    FROM demand.egon_building_electricity_peak_loads
    WHERE scenario = 'eGon2035'
    """

    return (
        db.select_dataframe(sql).building_id.astype(int).sort_values().unique()
    )


@timer_func
def load_building_data():
    """
    Read buildings from DB.

    Tables:

    * `openstreetmap.osm_buildings_filtered` (from OSM)
    * `openstreetmap.osm_buildings_synthetic` (synthetic, created by us)

    Both tables share the unique column `id`, so the two datasets can be
    concatenated. If INCLUDE_SYNTHETIC_BUILDINGS is False, synthetic buildings
    are not loaded.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with buildings without an
        AGS ID dropped.
    """
    municipalities_gdf = municipality_data()

    osm_buildings_gdf = osm_buildings(municipalities_gdf.crs)

    if INCLUDE_SYNTHETIC_BUILDINGS:
        synthetic_buildings_gdf = synthetic_buildings(municipalities_gdf.crs)

        buildings_gdf = gpd.GeoDataFrame(
            pd.concat(
                [
                    osm_buildings_gdf,
                    synthetic_buildings_gdf,
                ]
            ),
            geometry="geom",
            crs=osm_buildings_gdf.crs,
        ).rename(columns={"area": "building_area"})

        buildings_gdf.index = buildings_gdf.index.astype(int)

    else:
        buildings_gdf = osm_buildings_gdf.rename(
            columns={"area": "building_area"}
        )

    if ONLY_BUILDINGS_WITH_DEMAND:
        building_ids = egon_building_peak_loads()

        init_len = len(building_ids)

        building_ids = np.intersect1d(
            list(map(int, building_ids)),
            list(map(int, buildings_gdf.index.to_numpy())),
        )

        end_len = len(building_ids)

        logger.debug(
            f"{end_len / init_len * 100:g} % ({end_len} / {init_len}) of "
            "buildings have peak load."
        )

        buildings_gdf = buildings_gdf.loc[building_ids]

    buildings_ags_gdf = add_ags_to_buildings(buildings_gdf, municipalities_gdf)

    buildings_ags_gdf = drop_buildings_outside_muns(buildings_ags_gdf)

    grid_districts_gdf = grid_districts(EPSG)

    federal_state_gdf = federal_state_data(grid_districts_gdf.crs)

    grid_federal_state_gdf = overlay_grid_districts_with_counties(
        grid_districts_gdf,
        federal_state_gdf,
    )

    buildings_overlay_gdf = add_overlay_id_to_buildings(
        buildings_ags_gdf,
        grid_federal_state_gdf,
    )

    logger.debug("Loaded buildings.")

    buildings_overlay_gdf = drop_buildings_outside_grids(buildings_overlay_gdf)

    # overwrite bus_id with data from new table
    sql = "SELECT building_id, bus_id FROM boundaries.egon_map_zensus_mvgd_buildings"
    map_building_bus_df = db.select_dataframe(sql)

    building_ids = np.intersect1d(
        list(map(int, map_building_bus_df.building_id.unique())),
        list(map(int, buildings_overlay_gdf.index.to_numpy())),
    )

    buildings_within_gdf = buildings_overlay_gdf.loc[building_ids]

    return (
        buildings_within_gdf.drop(columns=["bus_id"])
        .merge(
            how="left",
            right=map_building_bus_df,
            left_index=True,
            right_on="building_id",
        )
        .drop(columns=["building_id"])
    )


@timer_func
def sort_and_qcut_df(
    df: pd.DataFrame | gpd.GeoDataFrame,
    col: str,
    q: int,
) -> pd.DataFrame | gpd.GeoDataFrame:
    """
    Determine the quantile of a given attribute in a (Geo)DataFrame.

    Sort the (Geo)DataFrame in ascending order for the given attribute.

    Parameters
    ----------
    df : pandas.DataFrame or geopandas.GeoDataFrame
        (Geo)DataFrame to sort and qcut.
    col : str
        Name of the attribute to sort and qcut the (Geo)DataFrame on.
    q : int
        Number of quantiles.

    Returns
    -------
    pandas.DataFrame or geopandas.GeoDataFrame
        Sorted and qcut (Geo)DataFrame.
    """
    df = df.sort_values(col, ascending=True)

    return df.assign(
        quant=pd.qcut(
            df[col],
            q=q,
            labels=range(q),
        )
    )


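The function above relies on `pandas.qcut` to form equally sized quantile bins labelled `0..q-1`. A minimal, self-contained sketch of the same pattern (column name and bin count are illustrative):

```python
import pandas as pd

# Eight capacities split into q=4 equally sized quantile bins,
# labelled 0..3 just like the `quant` column above.
df = pd.DataFrame({"capacity": [1, 2, 3, 4, 5, 6, 7, 8]})
df = df.sort_values("capacity", ascending=True)
df = df.assign(quant=pd.qcut(df["capacity"], q=4, labels=range(4)))

print(df.quant.tolist())  # [0, 0, 1, 1, 2, 2, 3, 3]
```

Each of the four bins ends up holding exactly two rows, which is what makes the later quantile-wise matching of generators to buildings balanced.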
@timer_func
def allocate_pv(
    q_mastr_gdf: gpd.GeoDataFrame,
    q_buildings_gdf: gpd.GeoDataFrame,
    seed: int,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """
    Allocate the MaStR pv generators to the OSM buildings.

    This will determine a building for each pv generator if there are more
    buildings than generators within a given AGS. Generators are preferably
    assigned to buildings within the same quantile. Multiple assignment is
    excluded.

    Parameters
    ----------
    q_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded and qcut MaStR data.
    q_buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing qcut OSM buildings data.
    seed : int
        Seed to use for random operations with NumPy and pandas.

    Returns
    -------
    tuple with two geopandas.GeoDataFrame s
        GeoDataFrame containing MaStR data allocated to building IDs.
        GeoDataFrame containing building data allocated to MaStR IDs.
    """
    rng = default_rng(seed=seed)

    q_buildings_gdf = q_buildings_gdf.assign(gens_id=np.nan)
    q_mastr_gdf = q_mastr_gdf.assign(building_id=np.nan)

    ags_list = q_buildings_gdf.ags.unique()

    if TEST_RUN:
        ags_list = ags_list[:250]

    num_ags = len(ags_list)

    t0 = perf_counter()

    for count, ags in enumerate(ags_list):
        buildings = q_buildings_gdf.loc[
            (q_buildings_gdf.ags == ags) & (q_buildings_gdf.gens_id.isna())
        ]
        gens = q_mastr_gdf.loc[
            (q_mastr_gdf.ags == ags) & (q_mastr_gdf.building_id.isna())
        ]

        len_build = len(buildings)
        len_gens = len(gens)

        if len_build < len_gens:
            gens = gens.sample(len_build, random_state=RandomState(seed=seed))
            logger.error(
                f"There are {len_gens} generators and only {len_build}"
                f" buildings in AGS {ags}. {len_gens - len(gens)} "
                "generators were truncated to match the amount of buildings."
            )

            assert len_build == len(gens)

        for quant in gens.quant.unique():
            q_buildings = buildings.loc[
                (buildings.quant == quant) & (buildings.gens_id.isna())
            ]
            q_gens = gens.loc[gens.quant == quant]

            len_build = len(q_buildings)
            len_gens = len(q_gens)

            if len_build < len_gens:
                delta = len_gens - len_build

                logger.warning(
                    f"There are {len_gens} generators and only {len_build} "
                    f"buildings in AGS {ags} and quantile {quant}. {delta} "
                    f"buildings from AGS {ags} will be added randomly."
                )

                add_buildings = pd.Index(
                    rng.choice(
                        buildings.loc[
                            (buildings.quant != quant)
                            & (buildings.gens_id.isna())
                        ].index,
                        size=delta,
                        replace=False,
                    )
                )

                q_buildings = buildings.loc[
                    q_buildings.index.append(add_buildings)
                ]

                assert len(q_buildings) == len_gens

            chosen_buildings = pd.Index(
                rng.choice(
                    q_buildings.index,
                    size=len_gens,
                    replace=False,
                )
            )

            # q_mastr_gdf.loc[q_gens.index, "building_id"] = chosen_buildings
            q_buildings_gdf.loc[chosen_buildings, "gens_id"] = q_gens.index

        if count % 100 == 0:
            logger.debug(
                f"Allocation of {count / num_ags * 100:g} % of AGS done. "
                f"It took {perf_counter() - t0:g} seconds."
            )

            t0 = perf_counter()

    assigned_buildings = q_buildings_gdf.loc[~q_buildings_gdf.gens_id.isna()]

    q_mastr_gdf.loc[
        assigned_buildings.gens_id, "building_id"
    ] = assigned_buildings.index

    logger.debug("Allocated status quo generators to buildings.")

    return frame_to_numeric(q_mastr_gdf), frame_to_numeric(q_buildings_gdf)


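The core random step in the allocation above is NumPy's `Generator.choice` with `replace=False`, which guarantees that no building is drawn twice within one draw. A self-contained sketch (IDs and sizes are illustrative):

```python
import numpy as np
from numpy.random import default_rng

rng = default_rng(seed=42)

building_ids = np.arange(100, 110)  # ten candidate building IDs
n_gens = 4                          # four generators to place

# Sampling without replacement: every chosen ID is distinct.
chosen = rng.choice(building_ids, size=n_gens, replace=False)

assert len(set(chosen)) == n_gens
```

Seeding `default_rng` once at the top of the allocation makes the whole generator-to-building assignment reproducible across runs.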
def frame_to_numeric(
    df: pd.DataFrame | gpd.GeoDataFrame,
) -> pd.DataFrame | gpd.GeoDataFrame:
    """
    Try to convert all columns of a DataFrame to numeric ignoring errors.

    Parameters
    ----------
    df : pandas.DataFrame or geopandas.GeoDataFrame

    Returns
    -------
    pandas.DataFrame or geopandas.GeoDataFrame
    """
    if str(df.index.dtype) == "object":
        df.index = pd.to_numeric(df.index, errors="ignore")

    for col in df.columns:
        if str(df[col].dtype) == "object":
            df[col] = pd.to_numeric(df[col], errors="ignore")

    return df


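`frame_to_numeric` leans on `pandas.to_numeric` with `errors="ignore"`, which converts a column when it can and returns it unchanged otherwise (note that this errors mode is deprecated in recent pandas releases). A small demonstration of the behaviour:

```python
import pandas as pd

# Two object columns: "a" holds numeric strings, "b" does not.
df = pd.DataFrame({"a": ["1", "2", "3"], "b": ["x", "y", "z"]}, dtype="object")

for col in df.columns:
    if str(df[col].dtype) == "object":
        df[col] = pd.to_numeric(df[col], errors="ignore")

# "a" becomes a numeric dtype, "b" stays object.
```

In newer pandas a try/except around `pd.to_numeric(..., errors="raise")` achieves the same effect without the deprecated mode.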
def validate_output(
    desagg_mastr_gdf: pd.DataFrame | gpd.GeoDataFrame,
    desagg_buildings_gdf: pd.DataFrame | gpd.GeoDataFrame,
) -> None:
    """
    Validate output.

    * Validate that there are exactly as many buildings with a pv system as
      there are pv systems with a building
    * Validate that the building IDs with a pv system are the same building
      IDs as assigned to the pv systems
    * Validate that the pv system IDs with a building are the same pv system
      IDs as assigned to the buildings

    Parameters
    ----------
    desagg_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data allocated to building IDs.
    desagg_buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing building data allocated to MaStR IDs.
    """
    assert len(
        desagg_mastr_gdf.loc[~desagg_mastr_gdf.building_id.isna()]
    ) == len(desagg_buildings_gdf.loc[~desagg_buildings_gdf.gens_id.isna()])
    assert (
        np.sort(
            desagg_mastr_gdf.loc[
                ~desagg_mastr_gdf.building_id.isna()
            ].building_id.unique()
        )
        == np.sort(
            desagg_buildings_gdf.loc[
                ~desagg_buildings_gdf.gens_id.isna()
            ].index.unique()
        )
    ).all()
    assert (
        np.sort(
            desagg_mastr_gdf.loc[
                ~desagg_mastr_gdf.building_id.isna()
            ].index.unique()
        )
        == np.sort(
            desagg_buildings_gdf.loc[
                ~desagg_buildings_gdf.gens_id.isna()
            ].gens_id.unique()
        )
    ).all()

    logger.debug("Validated output.")


def drop_unallocated_gens(
    gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop generators which did not get allocated.

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data allocated to building IDs.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data with unallocated generators
        dropped.
    """
    init_len = len(gdf)
    gdf = gdf.loc[~gdf.building_id.isna()]
    end_len = len(gdf)

    logger.debug(
        f"Dropped {init_len - end_len} "
        f"({((init_len - end_len) / init_len) * 100:g}%)"
        f" of {init_len} unallocated rows from MaStR DataFrame."
    )

    return gdf


@timer_func
def allocate_to_buildings(
    mastr_gdf: gpd.GeoDataFrame,
    buildings_gdf: gpd.GeoDataFrame,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """
    Allocate status quo pv rooftop generators to buildings.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data with geocoded locations.
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with buildings without an
        AGS ID dropped.

    Returns
    -------
    tuple with two geopandas.GeoDataFrame s
        GeoDataFrame containing MaStR data allocated to building IDs.
        GeoDataFrame containing building data allocated to MaStR IDs.
    """
    logger.debug("Starting allocation of status quo.")

    q_mastr_gdf = sort_and_qcut_df(mastr_gdf, col="capacity", q=Q)
    q_buildings_gdf = sort_and_qcut_df(buildings_gdf, col="building_area", q=Q)

    desagg_mastr_gdf, desagg_buildings_gdf = allocate_pv(
        q_mastr_gdf, q_buildings_gdf, SEED
    )

    validate_output(desagg_mastr_gdf, desagg_buildings_gdf)

    return drop_unallocated_gens(desagg_mastr_gdf), desagg_buildings_gdf


@timer_func
def grid_districts(
    epsg: int,
) -> gpd.GeoDataFrame:
    """
    Load mv grid district geo data from eGo^n Database as
    geopandas.GeoDataFrame.

    Parameters
    ----------
    epsg : int
        EPSG ID to use as CRS.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing mv grid district ID and geo shapes data.
    """
    gdf = db.select_geodataframe(
        """
        SELECT bus_id, geom
        FROM grid.egon_mv_grid_district
        ORDER BY bus_id
        """,
        index_col="bus_id",
        geom_col="geom",
        epsg=epsg,
    )

    gdf.index = gdf.index.astype(int)

    logger.debug("Grid districts loaded.")

    return gdf


def scenario_data(
    carrier: str = "solar_rooftop",
    scenario: str = "eGon2035",
) -> pd.DataFrame:
    """
    Get scenario capacity data from eGo^n Database.

    Parameters
    ----------
    carrier : str
        Carrier type to filter table by.
    scenario : str
        Scenario to filter table by.

    Returns
    -------
    pandas.DataFrame
        DataFrame with scenario capacity data in GW.
    """
    with db.session_scope() as session:
        query = session.query(EgonScenarioCapacities).filter(
            EgonScenarioCapacities.carrier == carrier,
            EgonScenarioCapacities.scenario_name == scenario,
        )

    df = pd.read_sql(
        query.statement, query.session.bind, index_col="index"
    ).sort_index()

    logger.debug("Scenario capacity data loaded.")

    return df


class Vg250Lan(Base):
    __tablename__ = "vg250_lan"
    __table_args__ = {"schema": "boundaries"}

    id = Column(BigInteger, primary_key=True, index=True)
    ade = Column(BigInteger)
    gf = Column(BigInteger)
    bsg = Column(BigInteger)
    ars = Column(String)
    ags = Column(String)
    sdv_ars = Column(String)
    gen = Column(String)
    bez = Column(String)
    ibz = Column(BigInteger)
    bem = Column(String)
    nbd = Column(String)
    sn_l = Column(String)
    sn_r = Column(String)
    sn_k = Column(String)
    sn_v1 = Column(String)
    sn_v2 = Column(String)
    sn_g = Column(String)
    fk_s3 = Column(String)
    nuts = Column(String)
    ars_0 = Column(String)
    ags_0 = Column(String)
    wsk = Column(String)
    debkg_id = Column(String)
    rs = Column(String)
    sdv_rs = Column(String)
    rs_0 = Column(String)
    geometry = Column(Geometry(srid=EPSG), index=True)


def federal_state_data(to_crs: CRS) -> gpd.GeoDataFrame:
    """
    Get federal state data from eGo^n Database.

    Parameters
    ----------
    to_crs : pyproj.crs.crs.CRS
        CRS to transform geometries to.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with federal state data.
    """
    with db.session_scope() as session:
        query = session.query(
            Vg250Lan.id, Vg250Lan.nuts, Vg250Lan.geometry.label("geom")
        )

        gdf = gpd.read_postgis(
            query.statement, session.connection(), index_col="id"
        ).to_crs(to_crs)

    logger.debug("Federal state data loaded.")

    return gdf


@timer_func
def overlay_grid_districts_with_counties(
    mv_grid_district_gdf: gpd.GeoDataFrame,
    federal_state_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Calculate the intersections of mv grid districts and counties.

    Parameters
    ----------
    mv_grid_district_gdf : gpd.GeoDataFrame
        GeoDataFrame containing mv grid district ID and geo shapes data.
    federal_state_gdf : gpd.GeoDataFrame
        GeoDataFrame with federal state data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing the intersection shapes of mv grid districts
        and counties.
    """
    logger.debug(
        "Calculating intersection overlay between mv grid districts and "
        "counties. This may take a while..."
    )

    gdf = gpd.overlay(
        federal_state_gdf.to_crs(mv_grid_district_gdf.crs),
        mv_grid_district_gdf.reset_index(),
        how="intersection",
        keep_geom_type=True,
    )

    logger.debug("Done!")

    return gdf


@timer_func
def add_overlay_id_to_buildings(
    buildings_gdf: gpd.GeoDataFrame,
    grid_federal_state_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add information about the overlay ID to buildings.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    grid_federal_state_gdf : geopandas.GeoDataFrame
        GeoDataFrame with intersection shapes between counties and grid
        districts.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with the overlay ID added.
    """
    gdf = (
        buildings_gdf.to_crs(grid_federal_state_gdf.crs)
        .sjoin(
            grid_federal_state_gdf,
            how="left",
            predicate="intersects",
        )
        .rename(columns={"index_right": "overlay_id"})
    )

    logger.debug("Added overlay ID to OSM buildings.")

    return gdf


def drop_buildings_outside_grids(
    buildings_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop all buildings outside of grid areas.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with buildings without a
        bus ID dropped.
    """
    gdf = buildings_gdf.loc[~buildings_gdf.bus_id.isna()]

    logger.debug(
        f"{len(buildings_gdf) - len(gdf)} "
        f"({(len(buildings_gdf) - len(gdf)) / len(buildings_gdf) * 100:g}%) "
        f"of {len(buildings_gdf)} values are outside of the grid areas "
        "and are therefore dropped."
    )

    return gdf


def cap_per_bus_id(
    scenario: str,
) -> pd.DataFrame:
    """
    Get table with total pv rooftop capacity per grid district.

    Parameters
    ----------
    scenario : str
        Scenario name.

    Returns
    -------
    pandas.DataFrame
        DataFrame with total rooftop capacity per mv grid.
    """
    targets = config.datasets()["solar_rooftop"]["targets"]

    sql = f"""
    SELECT bus as bus_id, p_nom as capacity
    FROM {targets['generators']['schema']}.{targets['generators']['table']}
    WHERE carrier = 'solar_rooftop'
    AND scn_name = '{scenario}'
    """

    return db.select_dataframe(sql, index_col="bus_id")

    # Unreachable legacy implementation, kept for reference:
    # overlay_gdf = overlay_gdf.assign(capacity=np.nan)
    #
    # for cap, nuts in scenario_df[["capacity", "nuts"]].itertuples(
    #     index=False
    # ):
    #     nuts_gdf = overlay_gdf.loc[overlay_gdf.nuts == nuts]
    #
    #     capacity = nuts_gdf.building_area.multiply(
    #         cap / nuts_gdf.building_area.sum()
    #     )
    #
    #     overlay_gdf.loc[nuts_gdf.index] = overlay_gdf.loc[
    #         nuts_gdf.index
    #     ].assign(capacity=capacity.multiply(conversion).to_numpy())
    #
    # return overlay_gdf[["bus_id", "capacity"]].groupby("bus_id").sum()


def determine_end_of_life_gens(
    mastr_gdf: gpd.GeoDataFrame,
    scenario_timestamp: pd.Timestamp,
    pv_rooftop_lifetime: pd.Timedelta,
) -> gpd.GeoDataFrame:
    """
    Determine if an old PV system has reached its end of life.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    scenario_timestamp : pandas.Timestamp
        Timestamp at which the scenario takes place.
    pv_rooftop_lifetime : pandas.Timedelta
        Average expected lifetime of PV rooftop systems.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data and info if the system
        has reached its end of life.
    """
    before = mastr_gdf.capacity.sum()

    mastr_gdf = mastr_gdf.assign(
        age=scenario_timestamp - mastr_gdf.start_up_date
    )

    mastr_gdf = mastr_gdf.assign(
        end_of_life=pv_rooftop_lifetime < mastr_gdf.age
    )

    after = mastr_gdf.loc[~mastr_gdf.end_of_life].capacity.sum()

    logger.debug(
        "Determined if pv rooftop systems reached their end of life.\n"
        f"Total capacity: {before}\nActive capacity: {after}"
    )

    return mastr_gdf


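The end-of-life flag above is plain Timestamp/Timedelta arithmetic: a system is retired once its age at the scenario timestamp exceeds the assumed lifetime. A sketch with an assumed 20-year lifetime and two hypothetical commissioning dates:

```python
import pandas as pd

scenario_timestamp = pd.Timestamp("2035-01-01")
pv_rooftop_lifetime = pd.Timedelta(days=20 * 365)  # assumed ~20 years

df = pd.DataFrame(
    {"start_up_date": pd.to_datetime(["2005-06-01", "2020-06-01"])}
)
df = df.assign(age=scenario_timestamp - df.start_up_date)
df = df.assign(end_of_life=pv_rooftop_lifetime < df.age)

print(df.end_of_life.tolist())  # [True, False]
```

The 2005 system is roughly 30 years old in 2035 and gets retired; the 2020 system is not.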
def calculate_max_pv_cap_per_building(
    buildings_gdf: gpd.GeoDataFrame,
    mastr_gdf: gpd.GeoDataFrame,
    pv_cap_per_sq_m: float | int,
    roof_factor: float | int,
) -> gpd.GeoDataFrame:
    """
    Calculate the estimated maximum possible PV capacity per building.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    pv_cap_per_sq_m : float, int
        Average expected, installable PV capacity per square meter.
    roof_factor : float, int
        Average share of the roof area that is usable for PV.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with estimated maximum PV
        capacity.
    """
    gdf = (
        buildings_gdf.reset_index()
        .rename(columns={"index": "id"})
        .merge(
            mastr_gdf[
                [
                    "capacity",
                    "end_of_life",
                    "building_id",
                    "EinheitlicheAusrichtungUndNeigungswinkel",
                    "Hauptausrichtung",
                    "HauptausrichtungNeigungswinkel",
                ]
            ],
            how="left",
            left_on="id",
            right_on="building_id",
        )
        .set_index("id")
        .drop(columns="building_id")
    )

    return gdf.assign(
        max_cap=gdf.building_area.multiply(roof_factor * pv_cap_per_sq_m),
        end_of_life=gdf.end_of_life.fillna(True).astype(bool),
        bus_id=gdf.bus_id.astype(int),
    )


def calculate_building_load_factor(
    mastr_gdf: gpd.GeoDataFrame,
    buildings_gdf: gpd.GeoDataFrame,
    rounding: int = 4,
) -> gpd.GeoDataFrame:
    """
    Calculate the roof load factor from existing PV systems.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    rounding : int
        Rounding to use for load factor.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data with calculated load
        factor.
    """
    gdf = mastr_gdf.merge(
        buildings_gdf[["max_cap", "building_area"]].loc[
            ~buildings_gdf["max_cap"].isna()
        ],
        how="left",
        left_on="building_id",
        right_index=True,
    )

    return gdf.assign(load_factor=(gdf.capacity / gdf.max_cap).round(rounding))


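The load factor is just installed capacity over the estimated roof maximum after a left merge on the building ID. A compact numeric sketch with hypothetical values:

```python
import pandas as pd

# Two generators and the buildings they sit on (IDs and caps are made up).
mastr_df = pd.DataFrame({"building_id": [1, 2], "capacity": [5.0, 12.0]})
buildings_df = pd.DataFrame(
    {"max_cap": [20.0, 16.0], "building_area": [120.0, 95.0]},
    index=[1, 2],
)

merged = mastr_df.merge(
    buildings_df[["max_cap", "building_area"]],
    how="left",
    left_on="building_id",
    right_index=True,
)

merged = merged.assign(
    load_factor=(merged.capacity / merged.max_cap).round(4)
)

print(merged.load_factor.tolist())  # [0.25, 0.75]
```

A load factor above 1 would mean the reported capacity exceeds the roof estimate, which is why such entries are filtered out when building the probability distributions below.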
def get_probability_for_property(
    mastr_gdf: gpd.GeoDataFrame,
    cap_range: tuple[int | float, int | float],
    prop: str,
) -> tuple[np.array, np.array]:
    """
    Calculate the probability of the different options of a property of the
    existing PV plants.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_range : tuple(int, int)
        Capacity range of PV plants to look at.
    prop : str
        Property to calculate probabilities for. String needs to be in
        columns of mastr_gdf.

    Returns
    -------
    tuple
        numpy.array
            Unique values of property.
        numpy.array
            Probabilities per unique value.
    """
    cap_range_gdf = mastr_gdf.loc[
        (mastr_gdf.capacity > cap_range[0])
        & (mastr_gdf.capacity <= cap_range[1])
    ]

    if prop == "load_factor":
        cap_range_gdf = cap_range_gdf.loc[cap_range_gdf[prop] <= 1]

    count = Counter(
        cap_range_gdf[prop].loc[
            ~cap_range_gdf[prop].isna()
            & ~cap_range_gdf[prop].isnull()
            & ~(cap_range_gdf[prop] == "None")
        ]
    )

    values = np.array(list(count.keys()))
    probabilities = np.fromiter(count.values(), dtype=float)
    probabilities = probabilities / np.sum(probabilities)

    return values, probabilities


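The function above boils down to an empirical distribution built with `collections.Counter` and normalised with NumPy. A sketch on a toy orientation column (values are illustrative):

```python
from collections import Counter

import numpy as np
import pandas as pd

s = pd.Series(["South", "South", "East", "South", None, "None"])

# Drop missing values and the literal string "None", as in the function above.
count = Counter(s.loc[~s.isna() & ~(s == "None")])

values = np.array(list(count.keys()))
probabilities = np.fromiter(count.values(), dtype=float)
probabilities = probabilities / np.sum(probabilities)

print(values.tolist(), probabilities.tolist())
# ['South', 'East'] [0.75, 0.25]
```

The resulting value/probability pairs can be fed directly into `numpy.random.Generator.choice(values, p=probabilities)` to sample new plant properties.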
@timer_func
def probabilities(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
    properties: list[str] | None = None,
) -> dict:
    """
    Calculate the probability distribution of the properties of the existing
    PV plants.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple should
        start with a zero and the last one should end with infinity.
    properties : list(str)
        List of properties to calculate probabilities for. Strings need to be
        in columns of mastr_gdf.

    Returns
    -------
    dict
        Dictionary with values and probabilities per capacity range.
    """
    if cap_ranges is None:
        cap_ranges = [
            (0, 30),
            (30, 100),
            (100, float("inf")),
        ]
    if properties is None:
        properties = [
            "EinheitlicheAusrichtungUndNeigungswinkel",
            "Hauptausrichtung",
            "HauptausrichtungNeigungswinkel",
            "load_factor",
        ]

    prob_dict = {}

    for cap_range in cap_ranges:
        prob_dict[cap_range] = {
            "values": {},
            "probabilities": {},
        }

        for prop in properties:
            v, p = get_probability_for_property(
                mastr_gdf,
                cap_range,
                prop,
            )

            prob_dict[cap_range]["values"][prop] = v
            prob_dict[cap_range]["probabilities"][prop] = p

    return prob_dict
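The Counter-based normalization in `get_probability_for_property` can be sketched in isolation; a minimal example on toy data (the orientation strings are illustrative stand-ins for one MaStR property column, no GeoDataFrame required):

```python
from collections import Counter

import numpy as np

# Toy stand-in for one property column of the MaStR data.
prop_values = ["south", "south", "east", "west", "south", "east"]

# Count occurrences and normalise to probabilities, as in
# get_probability_for_property().
count = Counter(prop_values)
values = np.array(list(count.keys()))
probabilities = np.fromiter(count.values(), dtype=float)
probabilities = probabilities / np.sum(probabilities)

# The probabilities sum to one and can feed a numpy rng's choice(),
# which is how the sampling functions further down use them.
rng = np.random.default_rng(seed=42)
sample = rng.choice(a=values, size=3, p=probabilities)
```

`Counter` preserves insertion order, so `values` and `probabilities` stay aligned by construction.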


def cap_share_per_cap_range(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
) -> dict[tuple[int | float, int | float], float]:
    """
    Calculate the share of PV capacity from the total PV capacity within
    capacity ranges.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple should
        start with a zero and the last one should end with infinity.

    Returns
    -------
    dict
        Dictionary with share of PV capacity from the total PV capacity within
        capacity ranges.
    """
    if cap_ranges is None:
        cap_ranges = [
            (0, 30),
            (30, 100),
            (100, float("inf")),
        ]

    cap_share_dict = {}

    total_cap = mastr_gdf.capacity.sum()

    for cap_range in cap_ranges:
        cap_share = (
            mastr_gdf.loc[
                (mastr_gdf.capacity > cap_range[0])
                & (mastr_gdf.capacity <= cap_range[1])
            ].capacity.sum()
            / total_cap
        )

        cap_share_dict[cap_range] = cap_share

    return cap_share_dict


def mean_load_factor_per_cap_range(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
) -> dict[tuple[int | float, int | float], float]:
    """
    Calculate the mean roof load factor per capacity range from existing PV
    plants.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple should
        start with a zero and the last one should end with infinity.

    Returns
    -------
    dict
        Dictionary with mean roof load factor per capacity range.
    """
    if cap_ranges is None:
        cap_ranges = [
            (0, 30),
            (30, 100),
            (100, float("inf")),
        ]

    load_factor_dict = {}

    for cap_range in cap_ranges:
        load_factor = mastr_gdf.loc[
            (mastr_gdf.load_factor <= 1)
            & (mastr_gdf.capacity > cap_range[0])
            & (mastr_gdf.capacity <= cap_range[1])
        ].load_factor.mean()

        load_factor_dict[cap_range] = load_factor

    return load_factor_dict


def building_area_range_per_cap_range(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
    min_building_size: int | float = 10.0,
    upper_quantile: float = 0.95,
    lower_quantile: float = 0.05,
) -> dict[tuple[int | float, int | float], tuple[int | float, int | float]]:
    """
    Estimate the normal building area range per capacity range from existing
    PV plants.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple should
        start with a zero and the last one should end with infinity.
    min_building_size : int, float
        Minimal building size to consider for PV plants.
    upper_quantile : float
        Upper quantile to estimate the maximum building size per capacity
        range.
    lower_quantile : float
        Lower quantile to estimate the minimum building size per capacity
        range.

    Returns
    -------
    dict
        Dictionary with the estimated normal building area range per capacity
        range.
    """
    if cap_ranges is None:
        cap_ranges = [
            (0, 30),
            (30, 100),
            (100, float("inf")),
        ]

    building_area_range_dict = {}

    n_ranges = len(cap_ranges)

    for count, cap_range in enumerate(cap_ranges):
        cap_range_gdf = mastr_gdf.loc[
            (mastr_gdf.capacity > cap_range[0])
            & (mastr_gdf.capacity <= cap_range[1])
        ]

        if count == 0:
            building_area_range_dict[cap_range] = (
                min_building_size,
                cap_range_gdf.building_area.quantile(upper_quantile),
            )
        elif count == n_ranges - 1:
            building_area_range_dict[cap_range] = (
                cap_range_gdf.building_area.quantile(lower_quantile),
                float("inf"),
            )
        else:
            building_area_range_dict[cap_range] = (
                cap_range_gdf.building_area.quantile(lower_quantile),
                cap_range_gdf.building_area.quantile(upper_quantile),
            )

    values = list(building_area_range_dict.values())

    building_area_range_normed_dict = {}

    for count, (cap_range, (min_area, max_area)) in enumerate(
        building_area_range_dict.items()
    ):
        if count == 0:
            building_area_range_normed_dict[cap_range] = (
                min_area,
                np.mean((values[count + 1][0], max_area)),
            )
        elif count == n_ranges - 1:
            building_area_range_normed_dict[cap_range] = (
                np.mean((values[count - 1][1], min_area)),
                max_area,
            )
        else:
            building_area_range_normed_dict[cap_range] = (
                np.mean((values[count - 1][1], min_area)),
                np.mean((values[count + 1][0], max_area)),
            )

    return building_area_range_normed_dict
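The quantile clipping used per capacity range in `building_area_range_per_cap_range` can be sketched on toy data (the building areas below are illustrative, not MaStR values):

```python
import numpy as np
import pandas as pd

# Toy building areas (m²) standing in for one capacity range of MaStR data.
building_area = pd.Series([12.0, 20.0, 35.0, 50.0, 80.0, 120.0, 400.0])

# Clip the "normal" area range to the 5 % and 95 % quantiles, as done per
# capacity range in building_area_range_per_cap_range().
lower = building_area.quantile(0.05)
upper = building_area.quantile(0.95)

# Buildings within the estimated normal range; outliers at both ends drop out.
in_range = building_area.loc[
    (building_area > lower) & (building_area <= upper)
]
```

With pandas' default linear interpolation this yields `lower = 14.4` and `upper = 316.0`, so five of the seven toy buildings fall in the normal range.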


@timer_func
def desaggregate_pv_in_mv_grid(
    buildings_gdf: gpd.GeoDataFrame,
    pv_cap: float | int,
    **kwargs,
) -> gpd.GeoDataFrame:
    """
    Desaggregate PV capacity on buildings within a given grid district.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing buildings within the grid district.
    pv_cap : float, int
        PV capacity to desaggregate.

    Other Parameters
    ----------------
    prob_dict : dict
        Dictionary with values and probabilities per capacity range.
    cap_share_dict : dict
        Dictionary with share of PV capacity from the total PV capacity within
        capacity ranges.
    building_area_range_dict : dict
        Dictionary with estimated normal building area range per capacity
        range.
    load_factor_dict : dict
        Dictionary with mean roof load factor per capacity range.
    seed : int
        Seed to use for random operations with NumPy and pandas.
    pv_cap_per_sq_m : float, int
        Average expected, installable PV capacity per square meter.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with desaggregated PV
        plants.
    """
    bus_id = int(buildings_gdf.bus_id.iat[0])

    rng = default_rng(seed=kwargs["seed"])
    random_state = RandomState(seed=kwargs["seed"])

    results_df = pd.DataFrame(columns=buildings_gdf.columns)

    for cap_range, share in kwargs["cap_share_dict"].items():
        pv_cap_range = pv_cap * share

        b_area_min, b_area_max = kwargs["building_area_range_dict"][cap_range]

        cap_range_buildings_gdf = buildings_gdf.loc[
            ~buildings_gdf.index.isin(results_df.index)
            & (buildings_gdf.building_area > b_area_min)
            & (buildings_gdf.building_area <= b_area_max)
        ]

        mean_load_factor = kwargs["load_factor_dict"][cap_range]
        cap_range_buildings_gdf = cap_range_buildings_gdf.assign(
            mean_cap=cap_range_buildings_gdf.max_cap * mean_load_factor,
            load_factor=np.nan,
            capacity=np.nan,
        )

        total_mean_cap = cap_range_buildings_gdf.mean_cap.sum()

        if total_mean_cap == 0:
            logger.warning(
                f"There are no matching roofs for capacity range {cap_range} "
                f"kW in grid {bus_id}. Using all buildings as a fallback."
            )

            cap_range_buildings_gdf = buildings_gdf.loc[
                ~buildings_gdf.index.isin(results_df.index)
            ]

            if len(cap_range_buildings_gdf) == 0:
                logger.warning(
                    "There are no roofs available for capacity range "
                    f"{cap_range} kW in grid {bus_id}. Allowing dual use."
                )
                cap_range_buildings_gdf = buildings_gdf.copy()

            cap_range_buildings_gdf = cap_range_buildings_gdf.assign(
                mean_cap=cap_range_buildings_gdf.max_cap * mean_load_factor,
                load_factor=np.nan,
                capacity=np.nan,
            )

            total_mean_cap = cap_range_buildings_gdf.mean_cap.sum()

        elif total_mean_cap < pv_cap_range:
            logger.warning(
                f"Average roof utilization of the roof area in grid {bus_id} "
                f"and capacity range {cap_range} kW is not sufficient. The "
                "roof utilization will be above average."
            )

        frac = max(
            pv_cap_range / total_mean_cap,
            1 / len(cap_range_buildings_gdf),
        )

        samples_gdf = cap_range_buildings_gdf.sample(
            frac=min(1, frac),
            random_state=random_state,
        )

        cap_range_dict = kwargs["prob_dict"][cap_range]

        values_dict = cap_range_dict["values"]
        p_dict = cap_range_dict["probabilities"]

        load_factors = rng.choice(
            a=values_dict["load_factor"],
            size=len(samples_gdf),
            p=p_dict["load_factor"],
        )

        samples_gdf = samples_gdf.assign(
            load_factor=load_factors,
            capacity=(
                samples_gdf.building_area
                * load_factors
                * kwargs["pv_cap_per_sq_m"]
            ).clip(lower=0.4),
        )

        missing_factor = pv_cap_range / samples_gdf.capacity.sum()

        samples_gdf = samples_gdf.assign(
            capacity=(samples_gdf.capacity * missing_factor),
            load_factor=(samples_gdf.load_factor * missing_factor),
        )

        assert np.isclose(
            samples_gdf.capacity.sum(),
            pv_cap_range,
            rtol=1e-03,
        ), f"{samples_gdf.capacity.sum()} != {pv_cap_range}"

        results_df = pd.concat(
            [
                results_df,
                samples_gdf,
            ],
        )

    total_missing_factor = pv_cap / results_df.capacity.sum()

    results_df = results_df.assign(
        capacity=(results_df.capacity * total_missing_factor),
    )

    assert np.isclose(
        results_df.capacity.sum(),
        pv_cap,
        rtol=1e-03,
    ), f"{results_df.capacity.sum()} != {pv_cap}"

    # Use the CRS of the input GeoDataFrame; samples_gdf would be undefined
    # if cap_share_dict were empty and the loop above never ran.
    return gpd.GeoDataFrame(
        results_df,
        crs=buildings_gdf.crs,
        geometry="geom",
    )


@timer_func
def desaggregate_pv(
    buildings_gdf: gpd.GeoDataFrame,
    cap_df: pd.DataFrame,
    **kwargs,
) -> gpd.GeoDataFrame:
    """
    Desaggregate PV capacity on buildings within the given grid districts.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    cap_df : pandas.DataFrame
        DataFrame with total rooftop capacity per mv grid.

    Other Parameters
    ----------------
    prob_dict : dict
        Dictionary with values and probabilities per capacity range.
    cap_share_dict : dict
        Dictionary with share of PV capacity from the total PV capacity within
        capacity ranges.
    building_area_range_dict : dict
        Dictionary with estimated normal building area range per capacity
        range.
    load_factor_dict : dict
        Dictionary with mean roof load factor per capacity range.
    seed : int
        Seed to use for random operations with NumPy and pandas.
    pv_cap_per_sq_m : float, int
        Average expected, installable PV capacity per square meter.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with desaggregated PV
        plants.
    """
    allocated_buildings_gdf = buildings_gdf.loc[~buildings_gdf.end_of_life]

    building_bus_ids = set(buildings_gdf.bus_id)
    cap_bus_ids = set(cap_df.index)

    logger.debug(
        f"Bus IDs from buildings: {len(building_bus_ids)}\n"
        f"Bus IDs from capacity: {len(cap_bus_ids)}"
    )

    if len(building_bus_ids) > len(cap_bus_ids):
        missing = building_bus_ids - cap_bus_ids
    else:
        missing = cap_bus_ids - building_bus_ids

    logger.debug(str(missing))

    bus_ids = np.intersect1d(list(building_bus_ids), list(cap_bus_ids))

    # assert set(buildings_gdf.bus_id.unique()) == set(cap_df.index)

    for bus_id in bus_ids:
        buildings_grid_gdf = buildings_gdf.loc[buildings_gdf.bus_id == bus_id]

        pv_installed_gdf = buildings_grid_gdf.loc[
            ~buildings_grid_gdf.end_of_life
        ]

        pv_installed = pv_installed_gdf.capacity.sum()

        pot_buildings_gdf = buildings_grid_gdf.drop(
            index=pv_installed_gdf.index
        )

        if len(pot_buildings_gdf) == 0:
            logger.error(
                f"In grid {bus_id} there are no potential buildings to "
                "allocate PV capacity to. The grid is skipped. This message "
                "should only appear during test runs with few buildings."
            )

            continue

        pv_target = cap_df.at[bus_id, "capacity"] * 1000

        logger.debug(f"pv_target: {pv_target}")

        pv_missing = pv_target - pv_installed

        if pv_missing <= 0:
            logger.info(
                f"In grid {bus_id} there is more PV installed "
                f"({pv_installed:g}) in the status quo than allocated within "
                f"the scenario ({pv_target:g}). No new generators are added."
            )

            continue

        if pot_buildings_gdf.max_cap.sum() < pv_missing:
            logger.error(
                f"In grid {bus_id} there is less PV potential ("
                f"{pot_buildings_gdf.max_cap.sum():g} kW) than allocated PV "
                f"capacity ({pv_missing:g} kW). The average roof utilization "
                "will be very high."
            )

        gdf = desaggregate_pv_in_mv_grid(
            buildings_gdf=pot_buildings_gdf,
            pv_cap=pv_missing,
            **kwargs,
        )

        logger.debug(f"New cap in grid {bus_id}: {gdf.capacity.sum()}")
        logger.debug(f"Installed cap in grid {bus_id}: {pv_installed}")
        logger.debug(
            f"Total cap in grid {bus_id}: {gdf.capacity.sum() + pv_installed}"
        )

        if not np.isclose(
            gdf.capacity.sum() + pv_installed, pv_target, rtol=1e-3
        ):
            logger.warning(
                f"The desired capacity and actual capacity in grid {bus_id} "
                f"differ.\nDesired cap: {pv_target}\nActual cap: "
                f"{gdf.capacity.sum() + pv_installed}"
            )

        allocated_buildings_gdf = pd.concat(
            [
                allocated_buildings_gdf,
                gdf,
            ]
        )

    logger.debug("Desaggregated scenario.")
    logger.debug(f"Scenario capacity: {cap_df.capacity.sum():g}")
    logger.debug(
        f"Generator capacity: {allocated_buildings_gdf.capacity.sum():g}"
    )

    # Use the CRS of the input GeoDataFrame; gdf is only defined if at least
    # one grid was processed in the loop above.
    return gpd.GeoDataFrame(
        allocated_buildings_gdf,
        crs=buildings_gdf.crs,
        geometry="geom",
    )


@timer_func
def add_buildings_meta_data(
    buildings_gdf: gpd.GeoDataFrame,
    prob_dict: dict,
    seed: int,
) -> gpd.GeoDataFrame:
    """
    Randomly add additional metadata to desaggregated PV plants.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with desaggregated PV
        plants.
    prob_dict : dict
        Dictionary with values and probabilities per capacity range.
    seed : int
        Seed to use for random operations with NumPy and pandas.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with desaggregated PV
        plants.
    """
    rng = default_rng(seed=seed)
    buildings_gdf = buildings_gdf.reset_index().rename(
        columns={
            "index": "building_id",
        }
    )

    for (min_cap, max_cap), cap_range_prob_dict in prob_dict.items():
        cap_range_gdf = buildings_gdf.loc[
            (buildings_gdf.capacity >= min_cap)
            & (buildings_gdf.capacity < max_cap)
        ]

        for key, values in cap_range_prob_dict["values"].items():
            if key == "load_factor":
                continue

            gdf = cap_range_gdf.loc[
                cap_range_gdf[key].isna()
                | cap_range_gdf[key].isnull()
                | (cap_range_gdf[key] == "None")
            ]

            key_vals = rng.choice(
                a=values,
                size=len(gdf),
                p=cap_range_prob_dict["probabilities"][key],
            )

            buildings_gdf.loc[gdf.index, key] = key_vals

    return buildings_gdf


def add_voltage_level(
    buildings_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add the voltage level derived from the generator capacity to the power
    plants.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with desaggregated PV
        plants.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with a voltage level per
        generator.
    """

    def voltage_levels(p: float) -> int:
        if p < 100:
            return 7
        elif p < 200:
            return 6
        elif p < 5500:
            return 5
        elif p < 20000:
            return 4
        elif p < 120000:
            return 3
        return 1

    return buildings_gdf.assign(
        voltage_level=buildings_gdf.capacity.apply(voltage_levels)
    )
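For reference, the capacity-to-voltage-level thresholds of the nested `voltage_levels` helper can be written as a standalone sketch; the function name `voltage_level_from_kw` and the grid-level labels in the comments are illustrative, not part of the module:

```python
# Sketch of the capacity (kW) to voltage level mapping used in
# add_voltage_level(); the thresholds mirror the nested voltage_levels()
# helper. The LV/MV/HV labels are an interpretation of the level numbers.
def voltage_level_from_kw(p: float) -> int:
    for threshold, level in [
        (100, 7),  # < 100 kW: low voltage
        (200, 6),  # < 200 kW: MV/LV substation
        (5_500, 5),  # < 5.5 MW: medium voltage
        (20_000, 4),  # < 20 MW: HV/MV substation
        (120_000, 3),  # < 120 MW: high voltage
    ]:
        if p < threshold:
            return level
    return 1  # >= 120 MW: extra-high voltage
```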


def add_start_up_date(
    buildings_gdf: gpd.GeoDataFrame,
    start: pd.Timestamp,
    end: pd.Timestamp,
    seed: int,
):
    """
    Randomly add a start-up date to the new PV generators, drawn uniformly
    between the given timestamps.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with desaggregated PV
        plants.
    start : pandas.Timestamp
        Minimum Timestamp to use.
    end : pandas.Timestamp
        Maximum Timestamp to use.
    seed : int
        Seed to use for random operations with NumPy and pandas.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with start-up date added.
    """
    rng = default_rng(seed=seed)

    date_range = pd.date_range(start=start, end=end, freq="1D")

    return buildings_gdf.assign(
        start_up_date=rng.choice(date_range, size=len(buildings_gdf))
    )
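The uniform date sampling above can be sketched without a GeoDataFrame; the 2022 date window and the size of five are arbitrary toy values:

```python
import numpy as np
import pandas as pd

# Minimal sketch of the uniform start-up date sampling in add_start_up_date().
rng = np.random.default_rng(seed=0)

date_range = pd.date_range(
    start=pd.Timestamp("2022-01-01"),
    end=pd.Timestamp("2022-12-31"),
    freq="1D",
)

# One random start-up date per (hypothetical) generator; every day in the
# window is equally likely.
start_up_dates = pd.to_datetime(rng.choice(date_range, size=5))
```

Seeding the generator keeps the drawn dates reproducible across runs, which is why the module threads a `seed` through all random operations.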


@timer_func
def allocate_scenarios(
    mastr_gdf: gpd.GeoDataFrame,
    valid_buildings_gdf: gpd.GeoDataFrame,
    last_scenario_gdf: gpd.GeoDataFrame,
    scenario: str,
):
    """
    Desaggregate and allocate scenario PV rooftop ramp-ups onto buildings.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    valid_buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    last_scenario_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings matched with PV generators from
        the temporally preceding scenario.
    scenario : str
        Scenario to desaggregate and allocate.

    Returns
    -------
    tuple
        geopandas.GeoDataFrame
            GeoDataFrame containing OSM buildings matched with PV generators.
        pandas.DataFrame
            DataFrame containing PV rooftop capacity per grid id.
    """
    cap_per_bus_id_df = cap_per_bus_id(scenario)

    logger.debug(
        f"cap_per_bus_id_df total capacity: {cap_per_bus_id_df.capacity.sum()}"
    )

    last_scenario_gdf = determine_end_of_life_gens(
        last_scenario_gdf,
        SCENARIO_TIMESTAMP[scenario],
        PV_ROOFTOP_LIFETIME,
    )

    buildings_gdf = calculate_max_pv_cap_per_building(
        valid_buildings_gdf,
        last_scenario_gdf,
        PV_CAP_PER_SQ_M,
        ROOF_FACTOR,
    )

    mastr_gdf = calculate_building_load_factor(
        mastr_gdf,
        buildings_gdf,
    )

    probabilities_dict = probabilities(
        mastr_gdf,
        cap_ranges=CAP_RANGES,
    )

    cap_share_dict = cap_share_per_cap_range(
        mastr_gdf,
        cap_ranges=CAP_RANGES,
    )

    load_factor_dict = mean_load_factor_per_cap_range(
        mastr_gdf,
        cap_ranges=CAP_RANGES,
    )

    building_area_range_dict = building_area_range_per_cap_range(
        mastr_gdf,
        cap_ranges=CAP_RANGES,
        min_building_size=MIN_BUILDING_SIZE,
        upper_quantile=UPPER_QUNATILE,
        lower_quantile=LOWER_QUANTILE,
    )

    allocated_buildings_gdf = desaggregate_pv(
        buildings_gdf=buildings_gdf,
        cap_df=cap_per_bus_id_df,
        prob_dict=probabilities_dict,
        cap_share_dict=cap_share_dict,
        building_area_range_dict=building_area_range_dict,
        load_factor_dict=load_factor_dict,
        seed=SEED,
        pv_cap_per_sq_m=PV_CAP_PER_SQ_M,
    )

    allocated_buildings_gdf = allocated_buildings_gdf.assign(scenario=scenario)

    meta_buildings_gdf = frame_to_numeric(
        add_buildings_meta_data(
            allocated_buildings_gdf,
            probabilities_dict,
            SEED,
        )
    )

    return (
        add_start_up_date(
            meta_buildings_gdf,
            start=last_scenario_gdf.start_up_date.max(),
            end=SCENARIO_TIMESTAMP[scenario],
            seed=SEED,
        ),
        cap_per_bus_id_df,
    )


class EgonPowerPlantPvRoofBuildingScenario(Base):
    __tablename__ = "egon_power_plants_pv_roof_building"
    __table_args__ = {"schema": "supply"}

    index = Column(Integer, primary_key=True, index=True)
    scenario = Column(String)
    bus_id = Column(Integer, nullable=True)
    building_id = Column(Integer)
    gens_id = Column(String, nullable=True)
    capacity = Column(Float)
    einheitliche_ausrichtung_und_neigungswinkel = Column(Float)
    hauptausrichtung = Column(String)
    hauptausrichtung_neigungswinkel = Column(String)
    voltage_level = Column(Integer)
    weather_cell_id = Column(Integer)


def create_scenario_table(buildings_gdf):
    """Create mapping table pv_unit <-> building for scenario"""
    EgonPowerPlantPvRoofBuildingScenario.__table__.drop(
        bind=engine, checkfirst=True
    )
    EgonPowerPlantPvRoofBuildingScenario.__table__.create(
        bind=engine, checkfirst=True
    )

    buildings_gdf.rename(columns=COLS_TO_RENAME).assign(
        capacity=buildings_gdf.capacity.div(10**3)  # kW -> MW
    )[COLS_TO_EXPORT].reset_index().to_sql(
        name=EgonPowerPlantPvRoofBuildingScenario.__table__.name,
        schema=EgonPowerPlantPvRoofBuildingScenario.__table__.schema,
        con=db.engine(),
        if_exists="append",
        index=False,
    )


def geocode_mastr_data():
    """
    Read PV rooftop data from the MaStR CSV.

    TODO: the source will be replaced as soon as the MaStR data is available
    in the DB.
    """
    mastr_df = mastr_data(
        MASTR_INDEX_COL,
        MASTR_RELEVANT_COLS,
        MASTR_DTYPES,
        MASTR_PARSE_DATES,
    )

    clean_mastr_df = clean_mastr_data(
        mastr_df,
        max_realistic_pv_cap=MAX_REALISTIC_PV_CAP,
        min_realistic_pv_cap=MIN_REALISTIC_PV_CAP,
        seed=SEED,
        rounding=ROUNDING,
    )

    geocoding_df = geocoding_data(clean_mastr_df)

    ratelimiter = geocoder(USER_AGENT, MIN_DELAY_SECONDS)

    geocode_gdf = geocode_data(geocoding_df, ratelimiter, EPSG)

    create_geocoded_table(geocode_gdf)


def add_weather_cell_id(buildings_gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """
    Add the weather cell id to the buildings via their census cell.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with desaggregated PV
        plants.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with a weather cell id per
        building.
    """
    sql = """
    SELECT building_id, zensus_population_id
    FROM boundaries.egon_map_zensus_mvgd_buildings
    """

    buildings_gdf = gpd.GeoDataFrame(
        buildings_gdf.merge(
            right=db.select_dataframe(sql),
            how="left",
            on="building_id",
        ),
        crs=buildings_gdf.crs,
    )

    sql = """
    SELECT zensus_population_id, w_id as weather_cell_id
    FROM boundaries.egon_map_zensus_weather_cell
    """

    buildings_gdf = gpd.GeoDataFrame(
        buildings_gdf.merge(
            right=db.select_dataframe(sql),
            how="left",
            on="zensus_population_id",
        ),
        crs=buildings_gdf.crs,
    )

    if buildings_gdf.weather_cell_id.isna().any():
        raise ValueError(
            "The following buildings don't have a weather cell id: "
            f"{buildings_gdf.loc[buildings_gdf.weather_cell_id.isna()].building_id}"
        )

    return buildings_gdf


def pv_rooftop_to_buildings():
    """Main script, executed as task"""

    mastr_gdf = load_mastr_data()

    buildings_gdf = load_building_data()

    desagg_mastr_gdf, desagg_buildings_gdf = allocate_to_buildings(
        mastr_gdf, buildings_gdf
    )

    all_buildings_gdf = (
        desagg_mastr_gdf.assign(scenario="status_quo")
        .reset_index()
        .rename(columns={"geometry": "geom", "EinheitMastrNummer": "gens_id"})
    )

    scenario_buildings_gdf = all_buildings_gdf.copy()

    cap_per_bus_id_df = pd.DataFrame()

    for scenario in SCENARIOS:
        logger.debug(f"Desaggregating scenario {scenario}.")
        (
            scenario_buildings_gdf,
            cap_per_bus_id_scenario_df,
        ) = allocate_scenarios(  # noqa: F841
            desagg_mastr_gdf,
            desagg_buildings_gdf,
            scenario_buildings_gdf,
            scenario,
        )

        all_buildings_gdf = gpd.GeoDataFrame(
            pd.concat(
                [all_buildings_gdf, scenario_buildings_gdf], ignore_index=True
            ),
            crs=scenario_buildings_gdf.crs,
            geometry="geom",
        )

        cap_per_bus_id_df = pd.concat(
            [cap_per_bus_id_df, cap_per_bus_id_scenario_df]
        )

    # add weather cell
    all_buildings_gdf = add_weather_cell_id(all_buildings_gdf)

    # export scenario
    create_scenario_table(add_voltage_level(all_buildings_gdf))