Passed — Pull Request — dev (#1008), created by unknown at 01:35

allocate_scenarios() — rated B

Complexity
Conditions: 1

Size
Total Lines: 104
Code Lines: 58

Duplication
Lines: 0
Ratio: 0 %

Importance
Changes: 0

Metric  Value
eloc    58
dl      0
loc     104
rs      8.3745
c       0
b       0
f       0
cc      1
nop     4

How to fix: Long Method

Small methods make your code easier to understand, especially when combined with a good name. Moreover, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part into a new method and to use the comment as a starting point when coming up with a good name for the new method.

Commonly applied refactorings include: Extract Method.
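For instance, a commented step inside a long method can be extracted into a method named after that comment. The following is an illustrative Python sketch with hypothetical names, not code taken from the project under review:

    def drop_implausible(capacities, lower=0.1, upper=23500):
        # Extracted step: the old inline comment became the method name.
        return [cap for cap in capacities if lower < cap <= upper]

    def clean_capacities(capacities):
        # The formerly long method now reads as a sequence of named steps.
        plausible = drop_implausible(capacities)
        return sorted(plausible)

    print(clean_capacities([0.0, 9.9, 25000.0, 5.0]))  # -> [5.0, 9.9]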

"""
Distribute MaStR PV rooftop capacities to OSM and synthetic buildings. Generate new
PV rooftop generators for the scenarios eGon2035 and eGon100RE.

Data cleaning: Drop duplicates and entries with missing critical data. Determine the
most plausible capacity from the multiple values given in the MaStR data. Drop
generators without any plausible capacity data (0.1 kW < P <= 23.5 MW). Randomly and
weighted add a start-up date if it is missing. Extract zip code and municipality from
the 'Standort' given in the MaStR data. Geocode unique zip code and municipality
combinations with Nominatim (1 sec delay). Drop generators for which geocoding failed
or which are located outside the municipalities of Germany. Add some visual sanity
checks for cleaned data.

Allocation of MaStR data: Allocate each generator to an existing building from OSM.
Determine the quantile each generator and building is in, depending on the capacity of
the generator and the area of the building polygon. Randomly distribute generators
within each municipality, preferably within the same building area quantile as the
generators are capacity-wise. If not enough buildings exist within a municipality and
quantile, additional buildings from other quantiles are chosen randomly.

Disaggregation of PV rooftop scenarios: The scenario data per federal state is linearly
distributed to the MV grid districts according to the PV rooftop potential per MV grid
district. The rooftop potential is estimated from the building area given by the OSM
buildings. Grid districts which are located in several federal states are allocated PV
capacity according to their respective roof potential in the individual federal states.
The disaggregation of PV plants within a grid district respects existing plants from
MaStR which have not yet reached their end of life. New PV plants are randomly and
weighted generated using a breakdown of the MaStR data as generator basis. Plant
metadata (e.g. plant orientation) is also added randomly and weighted with the MaStR
data as basis.
"""
from __future__ import annotations

from collections import Counter
from functools import wraps
from pathlib import Path
from time import perf_counter
from typing import Any

from geoalchemy2 import Geometry
from geopy.extra.rate_limiter import RateLimiter
from geopy.geocoders import Nominatim
from loguru import logger
from numpy.random import RandomState, default_rng
from pyproj.crs.crs import CRS
from sqlalchemy import BigInteger, Column, Float, Integer, String
from sqlalchemy.dialects.postgresql import HSTORE
from sqlalchemy.ext.declarative import declarative_base
import geopandas as gpd
import numpy as np
import pandas as pd

from egon.data import config, db
from egon.data.datasets.electricity_demand_timeseries.hh_buildings import (
    OsmBuildingsSynthetic,
)
from egon.data.datasets.scenario_capacities import EgonScenarioCapacities
from egon.data.datasets.zensus_vg250 import Vg250Gem

engine = db.engine()
Base = declarative_base()
SEED = int(config.settings()["egon-data"]["--random-seed"])

# TODO: move to yml
# MaStR data
MASTR_RELEVANT_COLS = [
    "EinheitMastrNummer",
    "Bruttoleistung",
    "StatisikFlag",
    "Bruttoleistung_extended",
    "Nettonennleistung",
    "InstallierteLeistung",
    "zugeordneteWirkleistungWechselrichter",
    "EinheitBetriebsstatus",
    "Standort",
    "Bundesland",
    "Land",
    "Landkreis",
    "Gemeinde",
    "Postleitzahl",
    "Ort",
    "GeplantesInbetriebnahmedatum",
    "Inbetriebnahmedatum",
    "GemeinsamerWechselrichterMitSpeicher",
    "Lage",
    "Leistungsbegrenzung",
    "EinheitlicheAusrichtungUndNeigungswinkel",
    "Hauptausrichtung",
    "HauptausrichtungNeigungswinkel",
    "Nebenausrichtung",
]

MASTR_DTYPES = {
    "EinheitMastrNummer": str,
    "Bruttoleistung": float,
    "StatisikFlag": str,
    "Bruttoleistung_extended": float,
    "Nettonennleistung": float,
    "InstallierteLeistung": float,
    "zugeordneteWirkleistungWechselrichter": float,
    "EinheitBetriebsstatus": str,
    "Standort": str,
    "Bundesland": str,
    "Land": str,
    "Landkreis": str,
    "Gemeinde": str,
    # "Postleitzahl": int,  # fails because of nan values
    "Ort": str,
    "GemeinsamerWechselrichterMitSpeicher": str,
    "Lage": str,
    "Leistungsbegrenzung": str,
    # this would parse nan values as False, which is not always correct
    # "EinheitlicheAusrichtungUndNeigungswinkel": bool,
    "Hauptausrichtung": str,
    "HauptausrichtungNeigungswinkel": str,
    "Nebenausrichtung": str,
    "NebenausrichtungNeigungswinkel": str,
}

MASTR_PARSE_DATES = [
    "GeplantesInbetriebnahmedatum",
    "Inbetriebnahmedatum",
]

MASTR_INDEX_COL = "EinheitMastrNummer"

EPSG = 4326
SRID = 3035

# data cleaning
MAX_REALISTIC_PV_CAP = 23500
MIN_REALISTIC_PV_CAP = 0.1
ROUNDING = 1

# geopy
MIN_DELAY_SECONDS = 1
USER_AGENT = "rli_kh_geocoder"

# show additional logging information
VERBOSE = False

EXPORT_DIR = Path(__file__).resolve().parent / "data"
EXPORT_FILE = "mastr_geocoded.gpkg"
EXPORT_PATH = EXPORT_DIR / EXPORT_FILE
DRIVER = "GPKG"

# Number of quantiles
Q = 5

# Scenario Data
CARRIER = "solar_rooftop"
SCENARIOS = ["eGon2035", "eGon100RE"]
SCENARIO_TIMESTAMP = {
    "eGon2035": pd.Timestamp("2035-01-01", tz="UTC"),
    "eGon100RE": pd.Timestamp("2050-01-01", tz="UTC"),
}
PV_ROOFTOP_LIFETIME = pd.Timedelta(20 * 365, unit="D")

# Example module: Trina Vertex S TSM-400DE09M.08, 400 Wp
# https://www.photovoltaik4all.de/media/pdf/92/64/68/Trina_Datasheet_VertexS_DE09-08_2021_A.pdf
MODUL_CAP = 0.4  # kWp
MODUL_SIZE = 1.096 * 1.754  # m²
PV_CAP_PER_SQ_M = MODUL_CAP / MODUL_SIZE
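# Editor's note, not part of the original module: with the assumed 400 Wp
# module measuring 1.096 m * 1.754 m (~1.922 m**2), the capacity density
# works out to PV_CAP_PER_SQ_M = 0.4 / 1.922 ~ 0.208 kWp per square meter.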

# Estimation of usable roof area
# Factor for the conversion of building area to roof area
# estimated mean roof pitch: 35°
# estimated usable roof share: 80%
# estimated that only the south side of the building is used for PV
# see https://mediatum.ub.tum.de/doc/%20969497/969497.pdf
# AREA_FACTOR = 1.221
# USABLE_ROOF_SHARE = 0.8
# SOUTH_SHARE = 0.5
# ROOF_FACTOR = AREA_FACTOR * USABLE_ROOF_SHARE * SOUTH_SHARE
ROOF_FACTOR = 0.5
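# Editor's note, not part of the original module: the commented-out factors
# above multiply to 1.221 * 0.8 * 0.5 ~ 0.488, so ROOF_FACTOR = 0.5 is
# presumably that product rounded.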

CAP_RANGES = [
    (0, 30),
    (30, 100),
    (100, float("inf")),
]

MIN_BUILDING_SIZE = 10.0
UPPER_QUNATILE = 0.95
LOWER_QUANTILE = 0.05

COLS_TO_RENAME = {
    "EinheitlicheAusrichtungUndNeigungswinkel": (
        "einheitliche_ausrichtung_und_neigungswinkel"
    ),
    "Hauptausrichtung": "hauptausrichtung",
    "HauptausrichtungNeigungswinkel": "hauptausrichtung_neigungswinkel",
}

COLS_TO_EXPORT = [
    "scenario",
    "bus_id",
    "building_id",
    "gens_id",
    "capacity",
    "einheitliche_ausrichtung_und_neigungswinkel",
    "hauptausrichtung",
    "hauptausrichtung_neigungswinkel",
    "voltage_level",
    "weather_cell_id",
]

# TODO
INCLUDE_SYNTHETIC_BUILDINGS = True
ONLY_BUILDINGS_WITH_DEMAND = True
TEST_RUN = False


def timer_func(func):
    @wraps(func)
    def timeit_wrapper(*args, **kwargs):
        start_time = perf_counter()
        result = func(*args, **kwargs)
        end_time = perf_counter()
        total_time = end_time - start_time
        logger.debug(
            f"Function {func.__name__} took {total_time:.4f} seconds."
        )
        return result

    return timeit_wrapper
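# Illustrative usage of timer_func (editor's sketch, not part of the original
# module): decorating a function logs its wall-clock runtime at debug level.
#
#     @timer_func
#     def busy_sum(n: int) -> int:
#         return sum(range(n))
#
#     busy_sum(10_000_000)
#     # logs e.g. "Function busy_sum took 0.2134 seconds." (timing varies)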


@timer_func
def mastr_data(
    index_col: str | int | list[str] | list[int],
    usecols: list[str],
    dtype: dict[str, Any] | None,
    parse_dates: list[str] | None,
) -> pd.DataFrame:
    """
    Read MaStR data from CSV.

    Parameters
    ----------
    index_col : str, int or list of str or int
        Column(s) to use as the row labels of the DataFrame.
    usecols : list of str
        Return a subset of the columns.
    dtype : dict of column (str) -> type (any), optional
        Data type for data or columns.
    parse_dates : list of names (str), optional
        Try to parse given columns to datetime.

    Returns
    -------
    pandas.DataFrame
        DataFrame containing MaStR data.
    """
    mastr_path = Path(
        config.datasets()["power_plants"]["sources"]["mastr_pv"]
    ).resolve()

    mastr_df = pd.read_csv(
        mastr_path,
        index_col=index_col,
        usecols=usecols,
        dtype=dtype,
        parse_dates=parse_dates,
    )

    mastr_df = mastr_df.loc[
        (mastr_df.StatisikFlag == "B")
        & (mastr_df.EinheitBetriebsstatus == "InBetrieb")
        & (mastr_df.Land == "Deutschland")
        & (mastr_df.Lage == "BaulicheAnlagen")
    ]

    if (
        config.settings()["egon-data"]["--dataset-boundary"]
        == "Schleswig-Holstein"
    ):
        init_len = len(mastr_df)

        mastr_df = mastr_df.loc[mastr_df.Bundesland == "SchleswigHolstein"]

        logger.info(
            f"Using only MaStR data within Schleswig-Holstein. "
            f"{init_len - len(mastr_df)} of {init_len} generators are dropped."
        )

    logger.debug("MaStR data loaded.")

    return mastr_df


@timer_func
def clean_mastr_data(
    mastr_df: pd.DataFrame,
    max_realistic_pv_cap: int | float,
    min_realistic_pv_cap: int | float,
    rounding: int,
    seed: int,
) -> pd.DataFrame:
    """
    Clean the MaStR data from implausible data.

    * Drop MaStR ID duplicates.
    * Drop generators with implausible capacities.
    * Drop generators without any kind of start-up date.
    * Clean up Standort column and capacity.

    Parameters
    ----------
    mastr_df : pandas.DataFrame
        DataFrame containing MaStR data.
    max_realistic_pv_cap : int or float
        Maximum capacity, which is considered to be realistic.
    min_realistic_pv_cap : int or float
        Minimum capacity, which is considered to be realistic.
    rounding : int
        Rounding to use when cleaning up capacity. E.g. when
        rounding is 1 a capacity of 9.93 will be rounded to 9.9.
    seed : int
        Seed to use for random operations with NumPy and pandas.

    Returns
    -------
    pandas.DataFrame
        DataFrame containing cleaned MaStR data.
    """
    init_len = len(mastr_df)

    # drop duplicates
    mastr_df = mastr_df.loc[~mastr_df.index.duplicated()]

    # drop entries with invalid Standort
    index_to_drop = mastr_df.loc[mastr_df.Standort.isna()].index

    mastr_df = mastr_df.loc[~mastr_df.index.isin(index_to_drop)]

    df = mastr_df[
        [
            "Bruttoleistung",
            "Bruttoleistung_extended",
            "Nettonennleistung",
            "zugeordneteWirkleistungWechselrichter",
            "InstallierteLeistung",
        ]
    ].round(rounding)

    # determine the most plausible capacity if multiple ratings are given
    mastr_df = mastr_df.assign(
        capacity=[
            most_plausible(p_tub, min_realistic_pv_cap)
            for p_tub in df.itertuples(index=False)
        ]
    )

    # drop generators without any capacity info
    # and capacity of zero
    # and if the capacity is > 23.5 MW, because
    # Germany's largest rooftop PV system is 23 MW
    # https://www.iwr.de/news/groesste-pv-dachanlage-europas-wird-in-sachsen-anhalt-gebaut-news37379
    mastr_df = mastr_df.loc[
        (~mastr_df.capacity.isna())
        & (mastr_df.capacity <= max_realistic_pv_cap)
        & (mastr_df.capacity > min_realistic_pv_cap)
    ]

    # get zip code and municipality
    mastr_df[["zip_and_municipality", "drop_this"]] = pd.DataFrame(
        mastr_df.Standort.astype(str)
        .apply(
            zip_and_municipality_from_standort,
            args=(VERBOSE,),
        )
        .tolist(),
        index=mastr_df.index,
    )

    # drop invalid entries
    mastr_df = mastr_df.loc[mastr_df.drop_this].drop(columns="drop_this")

    # add ", Deutschland" just in case
    mastr_df = mastr_df.assign(
        zip_and_municipality=(mastr_df.zip_and_municipality + ", Deutschland")
    )

    # get consistent start-up date
    mastr_df = mastr_df.assign(
        start_up_date=mastr_df.Inbetriebnahmedatum,
    )

    mastr_df.loc[mastr_df.start_up_date.isna()] = mastr_df.loc[
        mastr_df.start_up_date.isna()
    ].assign(
        start_up_date=mastr_df.GeplantesInbetriebnahmedatum.loc[
            mastr_df.start_up_date.isna()
        ]
    )

    # randomly and weighted fill missing start-up dates
    pool = mastr_df.loc[
        ~mastr_df.start_up_date.isna()
    ].start_up_date.to_numpy()

    size = len(mastr_df) - len(pool)

    if size > 0:
        np.random.seed(seed)

        choice = np.random.choice(
            pool,
            size=size,
            replace=False,
        )

        mastr_df.loc[mastr_df.start_up_date.isna()] = mastr_df.loc[
            mastr_df.start_up_date.isna()
        ].assign(start_up_date=choice)

        logger.info(
            f"Randomly and weighted added start-up date to {size} generators."
        )

    mastr_df = mastr_df.assign(
        start_up_date=pd.to_datetime(mastr_df.start_up_date, utc=True)
    )

    end_len = len(mastr_df)
    logger.debug(
        f"Dropped {init_len - end_len} "
        f"({((init_len - end_len) / init_len) * 100:g}%)"
        f" of {init_len} rows from MaStR DataFrame."
    )

    return mastr_df


def zip_and_municipality_from_standort(
    standort: str,
    verbose: bool = False,
) -> tuple[str, bool]:
    """
    Extract the zip code and municipality from a MaStR Standort string.

    Parameters
    ----------
    standort : str
        Standort as given from MaStR data.
    verbose : bool
        Logs additional info if True.

    Returns
    -------
    tuple of str and bool
        Standort reduced to zip code and municipality, and a flag that is
        True if a zip code was found.
    """
    if verbose:
        logger.debug(f"Uncleaned String: {standort}")

    standort_list = standort.split()

    found = False
    count = 0

    for count, elem in enumerate(standort_list):
        if len(elem) != 5:
            continue
        if not elem.isnumeric():
            continue

        found = True

        break

    if found:
        cleaned_str = " ".join(standort_list[count:])

        if verbose:
            logger.debug(f"Cleaned String:   {cleaned_str}")

        return cleaned_str, found

    logger.warning(
        "Couldn't identify zip code. This entry will be dropped."
        f" Original standort: {standort}."
    )

    return standort, found
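# Illustrative behaviour of zip_and_municipality_from_standort (editor's
# sketch with a made-up address, not part of the original module):
#
#     zip_and_municipality_from_standort("Musterweg 5 12345 Musterstadt")
#     # -> ("12345 Musterstadt", True); first 5-digit token found, tail kept
#
#     zip_and_municipality_from_standort("Musterstadt ohne Postleitzahl")
#     # -> ("Musterstadt ohne Postleitzahl", False); entry will be dropped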


def most_plausible(
    p_tub: tuple,
    min_realistic_pv_cap: int | float,
) -> float:
    """
    Try to determine the most plausible capacity from the multiple capacity
    values given for a generator in the MaStR data.

    Parameters
    ----------
    p_tub : tuple
        Tuple containing the different capacities given in
        the MaStR data.
    min_realistic_pv_cap : int or float
        Minimum capacity, which is considered to be realistic.

    Returns
    -------
    float
        Capacity of the generator estimated as the most realistic.
    """
    count = Counter(p_tub).most_common(3)

    if len(count) == 1:
        return count[0][0]

    val1 = count[0][0]
    val2 = count[1][0]

    if len(count) == 2:
        min_val = min(val1, val2)
        max_val = max(val1, val2)
    else:
        val3 = count[2][0]

        min_val = min(val1, val2, val3)
        max_val = max(val1, val2, val3)

    if min_val < min_realistic_pv_cap:
        return max_val

    return min_val
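# Worked example for most_plausible (editor's sketch, not part of the
# original module): given five capacity ratings of one MaStR unit,
#
#     most_plausible((10.0, 10.0, 0.0, 9.9, 10.0), min_realistic_pv_cap=0.1)
#
# Counter(...).most_common(3) yields [(10.0, 3), (0.0, 1), (9.9, 1)]; the
# minimum of those values (0.0) is below 0.1, so the maximum (10.0) is
# returned instead.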


def geocoder(
    user_agent: str,
    min_delay_seconds: int,
) -> RateLimiter:
    """
    Set up a rate-limited Nominatim geocoder.

    Parameters
    ----------
    user_agent : str
        The app name.
    min_delay_seconds : int
        Delay in seconds to use between requests to Nominatim.
        A minimum of 1 is advised.

    Returns
    -------
    geopy.extra.rate_limiter.RateLimiter
        Nominatim RateLimiter geocoding class to use for geocoding.
    """
    locator = Nominatim(user_agent=user_agent)
    return RateLimiter(
        locator.geocode,
        min_delay_seconds=min_delay_seconds,
    )
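# Illustrative usage of geocoder (editor's sketch, not part of the original
# module; performs a live Nominatim request when run):
#
#     ratelimiter = geocoder(USER_AGENT, MIN_DELAY_SECONDS)
#     location = ratelimiter("10178 Berlin, Deutschland")
#     # location.point -> (latitude, longitude, altitude) if a match is found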


def geocoding_data(
    clean_mastr_df: pd.DataFrame,
) -> pd.DataFrame:
    """
    Set up the DataFrame of locations to geocode.

    Parameters
    ----------
    clean_mastr_df : pandas.DataFrame
        DataFrame containing cleaned MaStR data.

    Returns
    -------
    pandas.DataFrame
        DataFrame containing all unique combinations of
        zip codes with municipalities for geocoding.
    """
    return pd.DataFrame(
        data=clean_mastr_df.zip_and_municipality.unique(),
        columns=["zip_and_municipality"],
    )


@timer_func
def geocode_data(
    geocoding_df: pd.DataFrame,
    ratelimiter: RateLimiter,
    epsg: int,
) -> gpd.GeoDataFrame:
    """
    Geocode zip code and municipality.

    Extract latitude, longitude and altitude, transform latitude and
    longitude to a shapely Point and return a geopandas GeoDataFrame.

    Parameters
    ----------
    geocoding_df : pandas.DataFrame
        DataFrame containing all unique combinations of
        zip codes with municipalities for geocoding.
    ratelimiter : geopy.extra.rate_limiter.RateLimiter
        Nominatim RateLimiter geocoding class to use for geocoding.
    epsg : int
        EPSG ID to use as CRS.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing all unique combinations of
        zip codes with municipalities with matching geolocation.
    """
    logger.info(f"Geocoding {len(geocoding_df)} locations.")

    geocode_df = geocoding_df.assign(
        location=geocoding_df.zip_and_municipality.apply(ratelimiter)
    )

    geocode_df = geocode_df.assign(
        point=geocode_df.location.apply(
            lambda loc: tuple(loc.point) if loc else None
        )
    )

    geocode_df[["latitude", "longitude", "altitude"]] = pd.DataFrame(
        geocode_df.point.tolist(), index=geocode_df.index
    )

    return gpd.GeoDataFrame(
        geocode_df,
        geometry=gpd.points_from_xy(geocode_df.longitude, geocode_df.latitude),
        crs=f"EPSG:{epsg}",
    )


def merge_geocode_with_mastr(
    clean_mastr_df: pd.DataFrame, geocode_gdf: gpd.GeoDataFrame
) -> gpd.GeoDataFrame:
    """
    Merge the geocoded geometry onto the original MaStR data.

    Parameters
    ----------
    clean_mastr_df : pandas.DataFrame
        DataFrame containing cleaned MaStR data.
    geocode_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing all unique combinations of
        zip codes with municipalities with matching geolocation.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing cleaned MaStR data with
        matching geolocation from geocoding.
    """
    return gpd.GeoDataFrame(
        clean_mastr_df.merge(
            geocode_gdf[["zip_and_municipality", "geometry"]],
            how="left",
            on="zip_and_municipality",
        ),
        crs=geocode_gdf.crs,
    ).set_index(clean_mastr_df.index)


def drop_invalid_entries_from_gdf(
    gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop invalid entries from a geopandas GeoDataFrame.

    TODO: how to omit the logging from geos here???

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame to be checked for validity.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with rows with invalid geometries
        dropped.
    """
    valid_gdf = gdf.loc[gdf.is_valid]

    logger.debug(
        f"{len(gdf) - len(valid_gdf)} "
        f"({(len(gdf) - len(valid_gdf)) / len(gdf) * 100:g}%) "
        f"of {len(gdf)} values were invalid and are dropped."
    )

    return valid_gdf


@timer_func
def municipality_data() -> gpd.GeoDataFrame:
    """
    Get municipality data from the eGo^n database.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with municipality data.
    """
    with db.session_scope() as session:
        query = session.query(Vg250Gem.ags, Vg250Gem.geometry.label("geom"))

    return gpd.read_postgis(
        query.statement, query.session.bind, index_col="ags"
    )


@timer_func
def add_ags_to_gens(
    valid_mastr_gdf: gpd.GeoDataFrame,
    municipalities_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add information about the AGS ID to the generators.

    Parameters
    ----------
    valid_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data.
    municipalities_gdf : geopandas.GeoDataFrame
        GeoDataFrame with municipality data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data
        with AGS ID added.
    """
    return valid_mastr_gdf.sjoin(
        municipalities_gdf,
        how="left",
        predicate="intersects",
    ).rename(columns={"index_right": "ags"})


def drop_gens_outside_muns(
    valid_mastr_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop all generators outside of municipalities.

    Parameters
    ----------
    valid_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data
        with generators without an AGS ID dropped.
    """
    gdf = valid_mastr_gdf.loc[~valid_mastr_gdf.ags.isna()]

    logger.debug(
        f"{len(valid_mastr_gdf) - len(gdf)} "
        f"({(len(valid_mastr_gdf) - len(gdf)) / len(valid_mastr_gdf) * 100:g}%) "
        f"of {len(valid_mastr_gdf)} values are outside of the municipalities"
        " and are therefore dropped."
    )

    return gdf


class EgonMastrPvRoofGeocoded(Base):
    __tablename__ = "egon_mastr_pv_roof_geocoded"
    __table_args__ = {"schema": "supply"}

    zip_and_municipality = Column(String, primary_key=True, index=True)
    location = Column(String)
    point = Column(String)
    latitude = Column(Float)
    longitude = Column(Float)
    altitude = Column(Float)
    geometry = Column(Geometry(srid=EPSG))


def create_geocoded_table(geocode_gdf):
    """
    Create and fill the table for geocoded MaStR PV rooftop data.

    Parameters
    ----------
    geocode_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoding information on PV rooftop locations.
    """
    EgonMastrPvRoofGeocoded.__table__.drop(bind=engine, checkfirst=True)
    EgonMastrPvRoofGeocoded.__table__.create(bind=engine, checkfirst=True)

    geocode_gdf.to_postgis(
        name=EgonMastrPvRoofGeocoded.__table__.name,
        schema=EgonMastrPvRoofGeocoded.__table__.schema,
        con=db.engine(),
        if_exists="append",
        index=False,
        # dtype={}
    )


def geocoded_data_from_db(
    epsg: str | int,
) -> gpd.GeoDataFrame:
    """
    Read geocoded MaStR data from the eGo^n database.

    Parameters
    ----------
    epsg : str or int
        EPSG ID of the CRS to transform the geometries to.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR locations.
    """
    with db.session_scope() as session:
        query = session.query(
            EgonMastrPvRoofGeocoded.zip_and_municipality,
            EgonMastrPvRoofGeocoded.geometry,
        )

    return gpd.read_postgis(
        query.statement, query.session.bind, geom_col="geometry"
    ).to_crs(f"EPSG:{epsg}")


def load_mastr_data():
    """
    Read PV rooftop data from MaStR CSV.

    Note: the source will be replaced as soon as the MaStR data is available
    in DB.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data with geocoded locations.
    """
    mastr_df = mastr_data(
        MASTR_INDEX_COL,
        MASTR_RELEVANT_COLS,
        MASTR_DTYPES,
        MASTR_PARSE_DATES,
    )

    clean_mastr_df = clean_mastr_data(
        mastr_df,
        max_realistic_pv_cap=MAX_REALISTIC_PV_CAP,
        min_realistic_pv_cap=MIN_REALISTIC_PV_CAP,
        seed=SEED,
        rounding=ROUNDING,
    )

    geocode_gdf = geocoded_data_from_db(EPSG)

    mastr_gdf = merge_geocode_with_mastr(clean_mastr_df, geocode_gdf)

    valid_mastr_gdf = drop_invalid_entries_from_gdf(mastr_gdf)

    municipalities_gdf = municipality_data()

    valid_mastr_gdf = add_ags_to_gens(valid_mastr_gdf, municipalities_gdf)

    return drop_gens_outside_muns(valid_mastr_gdf)


class OsmBuildingsFiltered(Base):
    __tablename__ = "osm_buildings_filtered"
    __table_args__ = {"schema": "openstreetmap"}

    osm_id = Column(BigInteger)
    amenity = Column(String)
    building = Column(String)
    name = Column(String)
    geom = Column(Geometry(srid=SRID), index=True)
    area = Column(Float)
    geom_point = Column(Geometry(srid=SRID), index=True)
    tags = Column(HSTORE)
    id = Column(BigInteger, primary_key=True, index=True)

@timer_func
859
def osm_buildings(
860
    to_crs: CRS,
861
) -> gpd.GeoDataFrame:
862
    """
863
    Read OSM buildings data from eGo^n Database.
864
    Parameters
865
    -----------
866
    to_crs : pyproj.crs.crs.CRS
867
        CRS to transform geometries to.
868
    Returns
869
    -------
870
    geopandas.GeoDataFrame
871
        GeoDataFrame containing OSM buildings data.
872
    """
873
    with db.session_scope() as session:
874
        query = session.query(
875
            OsmBuildingsFiltered.id,
876
            OsmBuildingsFiltered.area,
877
            OsmBuildingsFiltered.geom_point.label("geom"),
878
        )
879
880
    return gpd.read_postgis(
881
        query.statement, query.session.bind, index_col="id"
882
    ).to_crs(to_crs)
883
884
885
@timer_func
886
def synthetic_buildings(
887
    to_crs: CRS,
888
) -> gpd.GeoDataFrame:
889
    """
890
    Read synthetic buildings data from eGo^n Database.
891
    Parameters
892
    -----------
893
    to_crs : pyproj.crs.crs.CRS
894
        CRS to transform geometries to.
895
    Returns
896
    -------
897
    geopandas.GeoDataFrame
898
        GeoDataFrame containing OSM buildings data.
899
    """
900
    with db.session_scope() as session:
901
        query = session.query(
902
            OsmBuildingsSynthetic.id,
903
            OsmBuildingsSynthetic.area,
904
            OsmBuildingsSynthetic.geom_point.label("geom"),
905
        )
906
907
    return gpd.read_postgis(
908
        query.statement, query.session.bind, index_col="id"
909
    ).to_crs(to_crs)
910
911

@timer_func
def add_ags_to_buildings(
    buildings_gdf: gpd.GeoDataFrame,
    municipalities_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add information about the AGS ID to the buildings.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    municipalities_gdf : geopandas.GeoDataFrame
        GeoDataFrame with municipality data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data
        with AGS ID added.
    """
    return buildings_gdf.sjoin(
        municipalities_gdf,
        how="left",
        predicate="intersects",
    ).rename(columns={"index_right": "ags"})


def drop_buildings_outside_muns(
    buildings_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop all buildings outside of municipalities.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data
        with buildings without an AGS ID dropped.
    """
    gdf = buildings_gdf.loc[~buildings_gdf.ags.isna()]

    logger.debug(
        f"{len(buildings_gdf) - len(gdf)} "
        f"({(len(buildings_gdf) - len(gdf)) / len(buildings_gdf) * 100:g}%) "
        f"of {len(buildings_gdf)} values are outside of the municipalities "
        "and are therefore dropped."
    )

    return gdf


def egon_building_peak_loads():
    """
    Get the IDs of all buildings that have an electricity peak load in
    scenario eGon2035.
    """
    sql = """
    SELECT building_id
    FROM demand.egon_building_electricity_peak_loads
    WHERE scenario = 'eGon2035'
    """

    return (
        db.select_dataframe(sql).building_id.astype(int).sort_values().unique()
    )


@timer_func
def load_building_data():
    """
    Read buildings from DB.

    Tables:

    * `openstreetmap.osm_buildings_filtered` (from OSM)
    * `openstreetmap.osm_buildings_synthetic` (synthetic, created by us)

    Both tables use the unique column `id`, hence the two datasets can be
    concatenated. If INCLUDE_SYNTHETIC_BUILDINGS is False, synthetic buildings
    will not be loaded.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with buildings without an
        AGS ID dropped.
    """
    municipalities_gdf = municipality_data()

    osm_buildings_gdf = osm_buildings(municipalities_gdf.crs)

    if INCLUDE_SYNTHETIC_BUILDINGS:
        synthetic_buildings_gdf = synthetic_buildings(municipalities_gdf.crs)

        buildings_gdf = gpd.GeoDataFrame(
            pd.concat(
                [
                    osm_buildings_gdf,
                    synthetic_buildings_gdf,
                ]
            ),
            geometry="geom",
            crs=osm_buildings_gdf.crs,
        ).rename(columns={"area": "building_area"})

        buildings_gdf.index = buildings_gdf.index.astype(int)

    else:
        buildings_gdf = osm_buildings_gdf.rename(
            columns={"area": "building_area"}
        )

    if ONLY_BUILDINGS_WITH_DEMAND:
        building_ids = egon_building_peak_loads()

        init_len = len(building_ids)

        building_ids = np.intersect1d(
            list(map(int, building_ids)),
            list(map(int, buildings_gdf.index.to_numpy())),
        )

        end_len = len(building_ids)

        logger.debug(
            f"{end_len/init_len * 100: g} % ({end_len} / {init_len}) of buildings have "
            f"peak load."
        )

        buildings_gdf = buildings_gdf.loc[building_ids]

    buildings_ags_gdf = add_ags_to_buildings(buildings_gdf, municipalities_gdf)

    buildings_ags_gdf = drop_buildings_outside_muns(buildings_ags_gdf)

    grid_districts_gdf = grid_districts(EPSG)

    federal_state_gdf = federal_state_data(grid_districts_gdf.crs)

    grid_federal_state_gdf = overlay_grid_districts_with_counties(
        grid_districts_gdf,
        federal_state_gdf,
    )

    buildings_overlay_gdf = add_overlay_id_to_buildings(
        buildings_ags_gdf,
        grid_federal_state_gdf,
    )

    logger.debug("Loaded buildings.")

    buildings_overlay_gdf = drop_buildings_outside_grids(buildings_overlay_gdf)

    # overwrite bus_id with data from new table
    sql = "SELECT building_id, bus_id FROM boundaries.egon_map_zensus_mvgd_buildings"
    map_building_bus_df = db.select_dataframe(sql)

    building_ids = np.intersect1d(
        list(map(int, map_building_bus_df.building_id.unique())),
        list(map(int, buildings_overlay_gdf.index.to_numpy())),
    )

    buildings_within_gdf = buildings_overlay_gdf.loc[building_ids]

    gdf = (
        buildings_within_gdf.reset_index()
        .drop(columns=["bus_id"])
        .merge(
            how="left",
            right=map_building_bus_df,
            left_on="id",
            right_on="building_id",
        )
        .drop(columns=["building_id"])
        .set_index("id")
        .sort_index()
    )

    return gdf[~gdf.index.duplicated(keep="first")]


@timer_func
def sort_and_qcut_df(
    df: pd.DataFrame | gpd.GeoDataFrame,
    col: str,
    q: int,
) -> pd.DataFrame | gpd.GeoDataFrame:
    """
    Determine the quantile of a given attribute in a (Geo)DataFrame.

    Sort the (Geo)DataFrame in ascending order for the given attribute.

    Parameters
    ----------
    df : pandas.DataFrame or geopandas.GeoDataFrame
        (Geo)DataFrame to sort and qcut.
    col : str
        Name of the attribute to sort and qcut the (Geo)DataFrame on.
    q : int
        Number of quantiles.

    Returns
    -------
    pandas.DataFrame or geopandas.GeoDataFrame
        Sorted and qcut (Geo)DataFrame.
    """
    df = df.sort_values(col, ascending=True)

    return df.assign(
        quant=pd.qcut(
            df[col],
            q=q,
            labels=range(q),
        )
    )
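# Worked example for sort_and_qcut_df (editor's sketch, not part of the
# original module): with q=5, pd.qcut assigns each row one of the quantile
# labels 0..4 so that the bins hold (roughly) equally many rows, e.g.
#
#     df = pd.DataFrame({"capacity": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]})
#     sort_and_qcut_df(df, col="capacity", q=5).quant.tolist()
#     # -> [0, 0, 1, 1, 2, 2, 3, 3, 4, 4]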


@timer_func
def allocate_pv(
    q_mastr_gdf: gpd.GeoDataFrame,
    q_buildings_gdf: gpd.GeoDataFrame,
    seed: int,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """
    Allocate the MaStR PV generators to the OSM buildings.

    This will determine a building for each PV generator if there are more
    buildings than generators within a given AGS. Generators are primarily
    assigned to buildings within the same quantile. Multiple assignment
    is excluded.

    Parameters
    ----------
    q_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded and qcut MaStR data.
    q_buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing qcut OSM buildings data.
    seed : int
        Seed to use for random operations with NumPy and pandas.

    Returns
    -------
    tuple of two geopandas.GeoDataFrames
        GeoDataFrame containing MaStR data allocated to building IDs.
        GeoDataFrame containing building data allocated to MaStR IDs.
    """
    rng = default_rng(seed=seed)

    q_buildings_gdf = q_buildings_gdf.assign(gens_id=np.nan)
    q_mastr_gdf = q_mastr_gdf.assign(building_id=np.nan)

    ags_list = q_buildings_gdf.ags.unique()

    if TEST_RUN:
        ags_list = ags_list[:250]

    num_ags = len(ags_list)

    t0 = perf_counter()

    for count, ags in enumerate(ags_list):

        buildings = q_buildings_gdf.loc[
            (q_buildings_gdf.ags == ags) & (q_buildings_gdf.gens_id.isna())
        ]
        gens = q_mastr_gdf.loc[q_mastr_gdf.ags == ags]

        len_build = len(buildings)
        len_gens = len(gens)

        if len_build < len_gens:
            gens = gens.sample(len_build, random_state=RandomState(seed=seed))
            logger.error(
                f"There are {len_gens} generators and only {len_build}"
                f" buildings in AGS {ags}. {len_gens - len(gens)} "
                "generators were truncated to match the amount of buildings."
            )

            assert len_build == len(gens)

        for quant in gens.quant.unique():
            q_buildings = buildings.loc[buildings.quant == quant]
            q_gens = gens.loc[gens.quant == quant]

            len_build = len(q_buildings)
            len_gens = len(q_gens)

            if len_build < len_gens:
                delta = len_gens - len_build

                logger.warning(
                    f"There are {len_gens} generators and only {len_build} "
                    f"buildings in AGS {ags} and quantile {quant}. {delta} "
                    f"buildings from AGS {ags} will be added randomly."
                )

                add_buildings = pd.Index(
                    rng.choice(
                        buildings.loc[
                            (buildings.quant != quant)
                            & (buildings.gens_id.isna())
                        ].index,
                        size=delta,
                        replace=False,
                    )
                )

                q_buildings = buildings.loc[
                    q_buildings.index.append(add_buildings)
                ]

                assert len(q_buildings) == len_gens

            chosen_buildings = pd.Index(
                rng.choice(
                    q_buildings.index,
                    size=len_gens,
                    replace=False,
                )
            )

            # q_mastr_gdf.loc[q_gens.index, "building_id"] = chosen_buildings
            q_buildings_gdf.loc[chosen_buildings, "gens_id"] = q_gens.index
            buildings = buildings.drop(chosen_buildings)

        if count % 100 == 0:
            logger.debug(
                f"Allocation of {count / num_ags * 100:g} % of AGS done. It took "
                f"{perf_counter() - t0:g} seconds."
            )

            t0 = perf_counter()

    assigned_buildings = q_buildings_gdf.loc[~q_buildings_gdf.gens_id.isna()]

    assert len(assigned_buildings) == len(assigned_buildings.gens_id.unique())

    q_mastr_gdf.loc[
        assigned_buildings.gens_id, "building_id"
    ] = assigned_buildings.index

    assigned_gens = q_mastr_gdf.loc[~q_mastr_gdf.building_id.isna()]

    assert len(assigned_buildings) == len(assigned_gens)

    logger.debug("Allocated status quo generators to buildings.")

    return frame_to_numeric(q_mastr_gdf), frame_to_numeric(q_buildings_gdf)
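# Editor's note on the allocation scheme above (illustration, not part of the
# original module): within one AGS, a generator in capacity quantile 3 is
# matched to a random, still unassigned building in building-area quantile 3;
# only if that quantile runs out of buildings are random buildings from other
# quantiles borrowed. E.g. 4 generators but only 3 buildings in quantile 3
# -> 1 extra building is drawn from the remaining unassigned buildings.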


def frame_to_numeric(
    df: pd.DataFrame | gpd.GeoDataFrame,
) -> pd.DataFrame | gpd.GeoDataFrame:
    """
    Try to convert all columns of a DataFrame to numeric, ignoring errors.

    Parameters
    ----------
    df : pandas.DataFrame or geopandas.GeoDataFrame

    Returns
    -------
    pandas.DataFrame or geopandas.GeoDataFrame
    """
    if str(df.index.dtype) == "object":
        df.index = pd.to_numeric(df.index, errors="ignore")

    for col in df.columns:
        if str(df[col].dtype) == "object":
            df[col] = pd.to_numeric(df[col], errors="ignore")

    return df
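# Illustrative behaviour of frame_to_numeric (editor's sketch, not part of
# the original module): object columns that are parseable become numeric,
# the rest are left untouched (errors="ignore"), e.g.
#
#     df = pd.DataFrame({"a": ["1", "2"], "b": ["x", "y"]}, dtype=object)
#     frame_to_numeric(df).dtypes
#     # -> a     int64
#     #    b    object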


def validate_output(
    desagg_mastr_gdf: pd.DataFrame | gpd.GeoDataFrame,
    desagg_buildings_gdf: pd.DataFrame | gpd.GeoDataFrame,
) -> None:
    """
    Validate output.

    * Validate that there are exactly as many buildings with a PV system as
      there are PV systems with a building.
    * Validate that the building IDs with a PV system are the same building
      IDs as assigned to the PV systems.
    * Validate that the PV system IDs with a building are the same PV system
      IDs as assigned to the buildings.

    Parameters
    ----------
    desagg_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data allocated to building IDs.
    desagg_buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing building data allocated to MaStR IDs.
    """
    assert len(
        desagg_mastr_gdf.loc[~desagg_mastr_gdf.building_id.isna()]
    ) == len(desagg_buildings_gdf.loc[~desagg_buildings_gdf.gens_id.isna()])
    assert (
        np.sort(
            desagg_mastr_gdf.loc[
                ~desagg_mastr_gdf.building_id.isna()
            ].building_id.unique()
        )
        == np.sort(
            desagg_buildings_gdf.loc[
                ~desagg_buildings_gdf.gens_id.isna()
            ].index.unique()
        )
    ).all()
    assert (
        np.sort(
            desagg_mastr_gdf.loc[
                ~desagg_mastr_gdf.building_id.isna()
            ].index.unique()
        )
        == np.sort(
            desagg_buildings_gdf.loc[
                ~desagg_buildings_gdf.gens_id.isna()
            ].gens_id.unique()
        )
    ).all()

    logger.debug("Validated output.")


def drop_unallocated_gens(
    gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop generators which did not get allocated.

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data allocated to building IDs.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data with generators dropped which did
        not get allocated.
    """
    init_len = len(gdf)
    gdf = gdf.loc[~gdf.building_id.isna()]
    end_len = len(gdf)

    logger.debug(
        f"Dropped {init_len - end_len} "
        f"({((init_len - end_len) / init_len) * 100:g}%)"
        f" of {init_len} unallocated rows from MaStR DataFrame."
    )

    return gdf


@timer_func
def allocate_to_buildings(
    mastr_gdf: gpd.GeoDataFrame,
    buildings_gdf: gpd.GeoDataFrame,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """
    Allocate status quo PV rooftop generators to buildings.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data with geocoded locations.
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with buildings without an
        AGS ID dropped.

    Returns
    -------
    tuple of two geopandas.GeoDataFrames
        GeoDataFrame containing MaStR data allocated to building IDs.
        GeoDataFrame containing building data allocated to MaStR IDs.
    """
    logger.debug("Starting allocation of status quo.")

    q_mastr_gdf = sort_and_qcut_df(mastr_gdf, col="capacity", q=Q)
    q_buildings_gdf = sort_and_qcut_df(buildings_gdf, col="building_area", q=Q)

    desagg_mastr_gdf, desagg_buildings_gdf = allocate_pv(
        q_mastr_gdf, q_buildings_gdf, SEED
    )

    validate_output(desagg_mastr_gdf, desagg_buildings_gdf)

    return drop_unallocated_gens(desagg_mastr_gdf), desagg_buildings_gdf


@timer_func
def grid_districts(
    epsg: int,
) -> gpd.GeoDataFrame:
    """
    Load MV grid district geo data from the eGo^n database as a
    geopandas.GeoDataFrame.

    Parameters
    ----------
    epsg : int
        EPSG ID to use as CRS.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing MV grid district IDs and geo shapes data.
    """
    gdf = db.select_geodataframe(
        """
        SELECT bus_id, geom
        FROM grid.egon_mv_grid_district
        ORDER BY bus_id
        """,
        index_col="bus_id",
        geom_col="geom",
        epsg=epsg,
    )

    gdf.index = gdf.index.astype(int)

    logger.debug("Grid districts loaded.")

    return gdf


def scenario_data(
    carrier: str = "solar_rooftop",
    scenario: str = "eGon2035",
) -> pd.DataFrame:
    """
    Get scenario capacity data from the eGo^n database.

    Parameters
    ----------
    carrier : str
        Carrier type to filter table by.
    scenario : str
        Scenario to filter table by.

    Returns
    -------
    pandas.DataFrame
        DataFrame with scenario capacity data in GW.
    """
    with db.session_scope() as session:
        query = session.query(EgonScenarioCapacities).filter(
            EgonScenarioCapacities.carrier == carrier,
            EgonScenarioCapacities.scenario_name == scenario,
        )

    df = pd.read_sql(
        query.statement, query.session.bind, index_col="index"
    ).sort_index()

    logger.debug("Scenario capacity data loaded.")

    return df


class Vg250Lan(Base):
    __tablename__ = "vg250_lan"
    __table_args__ = {"schema": "boundaries"}

    id = Column(BigInteger, primary_key=True, index=True)
    ade = Column(BigInteger)
    gf = Column(BigInteger)
    bsg = Column(BigInteger)
    ars = Column(String)
    ags = Column(String)
    sdv_ars = Column(String)
    gen = Column(String)
    bez = Column(String)
    ibz = Column(BigInteger)
    bem = Column(String)
    nbd = Column(String)
    sn_l = Column(String)
    sn_r = Column(String)
    sn_k = Column(String)
    sn_v1 = Column(String)
    sn_v2 = Column(String)
    sn_g = Column(String)
    fk_s3 = Column(String)
    nuts = Column(String)
    ars_0 = Column(String)
    ags_0 = Column(String)
    wsk = Column(String)
    debkg_id = Column(String)
    rs = Column(String)
    sdv_rs = Column(String)
    rs_0 = Column(String)
    geometry = Column(Geometry(srid=EPSG), index=True)

def federal_state_data(to_crs: CRS) -> gpd.GeoDataFrame:
    """
    Get federal state data from the eGo^n database.

    Parameters
    ----------
    to_crs : pyproj.crs.crs.CRS
        CRS to transform geometries to.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with federal state data.
    """
    with db.session_scope() as session:
        query = session.query(
            Vg250Lan.id, Vg250Lan.nuts, Vg250Lan.geometry.label("geom")
        )

        gdf = gpd.read_postgis(
            query.statement, session.connection(), index_col="id"
        ).to_crs(to_crs)

    logger.debug("Federal State data loaded.")

    return gdf


@timer_func
def overlay_grid_districts_with_counties(
    mv_grid_district_gdf: gpd.GeoDataFrame,
    federal_state_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Calculate the intersections of MV grid districts and counties.

    Parameters
    ----------
    mv_grid_district_gdf : gpd.GeoDataFrame
        GeoDataFrame containing MV grid district IDs and geo shapes data.
    federal_state_gdf : gpd.GeoDataFrame
        GeoDataFrame with federal state data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing the intersection shapes between MV grid
        districts and counties.
    """
    logger.debug(
        "Calculating intersection overlay between MV grid districts and "
        "counties. This may take a while..."
    )

    gdf = gpd.overlay(
        federal_state_gdf.to_crs(mv_grid_district_gdf.crs),
        mv_grid_district_gdf.reset_index(),
        how="intersection",
        keep_geom_type=True,
    )

    logger.debug("Done!")

    return gdf


@timer_func
def add_overlay_id_to_buildings(
    buildings_gdf: gpd.GeoDataFrame,
    grid_federal_state_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add information about the overlay ID to the buildings.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    grid_federal_state_gdf : geopandas.GeoDataFrame
        GeoDataFrame with intersection shapes between counties and grid
        districts.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with overlay ID added.
    """
    gdf = (
        buildings_gdf.to_crs(grid_federal_state_gdf.crs)
        .sjoin(
            grid_federal_state_gdf,
            how="left",
            predicate="intersects",
        )
        .rename(columns={"index_right": "overlay_id"})
    )

    logger.debug("Added overlay ID to OSM buildings.")

    return gdf


def drop_buildings_outside_grids(
    buildings_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop all buildings outside of grid areas.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data
        with buildings without a bus ID dropped.
    """
    gdf = buildings_gdf.loc[~buildings_gdf.bus_id.isna()]

    logger.debug(
        f"{len(buildings_gdf) - len(gdf)} "
        f"({(len(buildings_gdf) - len(gdf)) / len(buildings_gdf) * 100:g}%) "
        f"of {len(buildings_gdf)} values are outside of the grid areas "
        "and are therefore dropped."
    )

    return gdf


def cap_per_bus_id(
    scenario: str,
) -> pd.DataFrame:
    """
    Get table with the total PV rooftop capacity per grid district.

    Parameters
    ----------
    scenario : str
        Scenario name.

    Returns
    -------
    pandas.DataFrame
        DataFrame with total rooftop capacity per MV grid district.
    """
    targets = config.datasets()["solar_rooftop"]["targets"]

    sql = f"""
    SELECT bus as bus_id, p_nom as capacity
    FROM {targets['generators']['schema']}.{targets['generators']['table']}
    WHERE carrier = 'solar_rooftop'
    AND scn_name = '{scenario}'
    """

    return db.select_dataframe(sql, index_col="bus_id")

    # overlay_gdf = overlay_gdf.assign(capacity=np.nan)
    #
    # for cap, nuts in scenario_df[["capacity", "nuts"]].itertuples(index=False):
    #     nuts_gdf = overlay_gdf.loc[overlay_gdf.nuts == nuts]
    #
    #     capacity = nuts_gdf.building_area.multiply(
    #         cap / nuts_gdf.building_area.sum()
    #     )
    #
    #     overlay_gdf.loc[nuts_gdf.index] = overlay_gdf.loc[
    #         nuts_gdf.index
    #     ].assign(capacity=capacity.multiply(conversion).to_numpy())
    #
    # return overlay_gdf[["bus_id", "capacity"]].groupby("bus_id").sum()


def determine_end_of_life_gens(
    mastr_gdf: gpd.GeoDataFrame,
    scenario_timestamp: pd.Timestamp,
    pv_rooftop_lifetime: pd.Timedelta,
) -> gpd.GeoDataFrame:
    """
    Determine if an old PV system has reached its end of life.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    scenario_timestamp : pandas.Timestamp
        Timestamp at which the scenario takes place.
    pv_rooftop_lifetime : pandas.Timedelta
        Average expected lifetime of PV rooftop systems.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data and information on
        whether the system has reached its end of life.
    """
    before = mastr_gdf.capacity.sum()

    mastr_gdf = mastr_gdf.assign(
        age=scenario_timestamp - mastr_gdf.start_up_date
    )

    mastr_gdf = mastr_gdf.assign(
        end_of_life=pv_rooftop_lifetime < mastr_gdf.age
    )

    after = mastr_gdf.loc[~mastr_gdf.end_of_life].capacity.sum()

    logger.debug(
        "Determined if PV rooftop systems reached their end of life.\nTotal capacity: "
        f"{before}\nActive capacity: {after}"
    )

    return mastr_gdf
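# Worked example for the end-of-life rule above (editor's sketch, not part of
# the original module): with PV_ROOFTOP_LIFETIME = 20 * 365 days and the
# eGon2035 timestamp 2035-01-01, a system with start_up_date 2010-06-01 has
# an age of roughly 24.6 years > 20 years, so end_of_life is True and its
# capacity no longer counts towards the active (status quo) capacity.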


def calculate_max_pv_cap_per_building(
    buildings_gdf: gpd.GeoDataFrame,
    mastr_gdf: gpd.GeoDataFrame,
    pv_cap_per_sq_m: float | int,
    roof_factor: float | int,
) -> gpd.GeoDataFrame:
    """
    Calculate the estimated maximum possible PV capacity per building.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    pv_cap_per_sq_m : float or int
        Average expected installable PV capacity per square meter.
    roof_factor : float or int
        Average share of the building area that is usable for PV.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with estimated maximum PV
        capacity.
    """
    gdf = (
        buildings_gdf.reset_index()
        .rename(columns={"index": "id"})
        .merge(
            mastr_gdf[
                [
                    "capacity",
                    "end_of_life",
                    "building_id",
                    "EinheitlicheAusrichtungUndNeigungswinkel",
                    "Hauptausrichtung",
                    "HauptausrichtungNeigungswinkel",
                ]
            ],
            how="left",
            left_on="id",
            right_on="building_id",
        )
        .set_index("id")
        .drop(columns="building_id")
    )

    return gdf.assign(
        max_cap=gdf.building_area.multiply(roof_factor * pv_cap_per_sq_m),
        end_of_life=gdf.end_of_life.fillna(True).astype(bool),
        bus_id=gdf.bus_id.astype(int),
    )
1745
1746
1747
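# Worked example of the estimate above (toy numbers, not part of the original
# module): a building with 100 m² of area, a usable roof share of 0.5 and
# 0.2 kW of installable PV per m² gets max_cap = 100 * 0.5 * 0.2 = 10 kW.

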
def calculate_building_load_factor(
    mastr_gdf: gpd.GeoDataFrame,
    buildings_gdf: gpd.GeoDataFrame,
    rounding: int = 4,
) -> gpd.GeoDataFrame:
    """
    Calculate the roof load factor from existing PV systems.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    rounding : int
        Rounding to use for load factor.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data with calculated load factor.
    """
    gdf = mastr_gdf.merge(
        buildings_gdf[["max_cap", "building_area"]]
        .loc[~buildings_gdf["max_cap"].isna()]
        .reset_index(),
        how="left",
        left_on="building_id",
        right_on="id",
    ).set_index("id")

    return gdf.assign(load_factor=(gdf.capacity / gdf.max_cap).round(rounding))


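# Worked example (toy numbers, not part of the original module): an existing
# 5 kW plant on a building with an estimated ``max_cap`` of 8 kW yields
# load_factor = 5 / 8 = 0.625.

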
def get_probability_for_property(
    mastr_gdf: gpd.GeoDataFrame,
    cap_range: tuple[int | float, int | float],
    prop: str,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Calculate the probability of the different options of a property of the
    existing PV plants.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_range : tuple(int, int)
        Capacity range of PV plants to look at.
    prop : str
        Property to calculate probabilities for. Must be a column of
        mastr_gdf.

    Returns
    -------
    tuple
        numpy.ndarray
            Unique values of the property.
        numpy.ndarray
            Probabilities per unique value.
    """
    cap_range_gdf = mastr_gdf.loc[
        (mastr_gdf.capacity > cap_range[0])
        & (mastr_gdf.capacity <= cap_range[1])
    ]

    if prop == "load_factor":
        cap_range_gdf = cap_range_gdf.loc[cap_range_gdf[prop] <= 1]

    # pandas' isna and isnull are aliases, so a single check suffices;
    # "None" strings are filtered out as well
    count = Counter(
        cap_range_gdf[prop].loc[
            ~cap_range_gdf[prop].isna() & ~(cap_range_gdf[prop] == "None")
        ]
    )

    values = np.array(list(count.keys()))
    probabilities = np.fromiter(count.values(), dtype=float)
    probabilities = probabilities / np.sum(probabilities)

    return values, probabilities


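# Illustrative sketch (toy data, not part of the original module): for a
# property column holding ["South", "South", "East"], ``Counter`` yields
# counts of 2 and 1, so the function returns values ["South", "East"] with
# probabilities [2/3, 1/3].

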
@timer_func
def probabilities(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
    properties: list[str] | None = None,
) -> dict:
    """
    Calculate the probability of the different options of properties of the
    existing PV plants.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple
        should start with zero and the last one should end with infinity.
    properties : list(str)
        List of properties to calculate probabilities for. Strings need to
        be columns of mastr_gdf.

    Returns
    -------
    dict
        Dictionary with values and probabilities per capacity range.
    """
    if cap_ranges is None:
        cap_ranges = [
            (0, 30),
            (30, 100),
            (100, float("inf")),
        ]
    if properties is None:
        properties = [
            "EinheitlicheAusrichtungUndNeigungswinkel",
            "Hauptausrichtung",
            "HauptausrichtungNeigungswinkel",
            "load_factor",
        ]

    prob_dict = {}

    for cap_range in cap_ranges:
        prob_dict[cap_range] = {
            "values": {},
            "probabilities": {},
        }

        for prop in properties:
            v, p = get_probability_for_property(
                mastr_gdf,
                cap_range,
                prop,
            )

            prob_dict[cap_range]["values"][prop] = v
            prob_dict[cap_range]["probabilities"][prop] = p

    return prob_dict


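# The resulting structure is nested per capacity range and property
# (illustrative, not part of the original module):
#
#     {
#         (0, 30): {
#             "values": {"Hauptausrichtung": np.array([...]), ...},
#             "probabilities": {"Hauptausrichtung": np.array([...]), ...},
#         },
#         (30, 100): {...},
#         (100, float("inf")): {...},
#     }

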
def cap_share_per_cap_range(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
) -> dict[tuple[int | float, int | float], float]:
    """
    Calculate the share of PV capacity from the total PV capacity within
    capacity ranges.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple
        should start with zero and the last one should end with infinity.

    Returns
    -------
    dict
        Dictionary with the share of PV capacity from the total PV capacity
        within capacity ranges.
    """
    if cap_ranges is None:
        cap_ranges = [
            (0, 30),
            (30, 100),
            (100, float("inf")),
        ]

    cap_share_dict = {}

    total_cap = mastr_gdf.capacity.sum()

    for cap_range in cap_ranges:
        cap_share = (
            mastr_gdf.loc[
                (mastr_gdf.capacity > cap_range[0])
                & (mastr_gdf.capacity <= cap_range[1])
            ].capacity.sum()
            / total_cap
        )

        cap_share_dict[cap_range] = cap_share

    return cap_share_dict


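# Worked example (toy numbers, not part of the original module): with plant
# capacities of [10, 20, 50, 120] kW and the default ranges, the shares are
# (0, 30]: 30 / 200 = 0.15, (30, 100]: 50 / 200 = 0.25 and
# (100, inf): 120 / 200 = 0.6.

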
def mean_load_factor_per_cap_range(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
) -> dict[tuple[int | float, int | float], float]:
    """
    Calculate the mean roof load factor per capacity range from existing PV
    plants.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple
        should start with zero and the last one should end with infinity.

    Returns
    -------
    dict
        Dictionary with mean roof load factor per capacity range.
    """
    if cap_ranges is None:
        cap_ranges = [
            (0, 30),
            (30, 100),
            (100, float("inf")),
        ]

    load_factor_dict = {}

    for cap_range in cap_ranges:
        load_factor = mastr_gdf.loc[
            (mastr_gdf.load_factor <= 1)
            & (mastr_gdf.capacity > cap_range[0])
            & (mastr_gdf.capacity <= cap_range[1])
        ].load_factor.mean()

        load_factor_dict[cap_range] = load_factor

    return load_factor_dict


def building_area_range_per_cap_range(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
    min_building_size: int | float = 10.0,
    upper_quantile: float = 0.95,
    lower_quantile: float = 0.05,
) -> dict[tuple[int | float, int | float], tuple[int | float, int | float]]:
    """
    Estimate the normal building area range per capacity range.

    The ranges are derived from the building areas of existing PV plants
    using a lower and an upper quantile per capacity range. Adjacent range
    borders are then averaged so that the resulting ranges do not overlap.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple
        should start with zero and the last one should end with infinity.
    min_building_size : int, float
        Minimal building size to consider for PV plants.
    upper_quantile : float
        Upper quantile to estimate the maximum building size per capacity
        range.
    lower_quantile : float
        Lower quantile to estimate the minimum building size per capacity
        range.

    Returns
    -------
    dict
        Dictionary with the estimated normal building area range per
        capacity range.
    """
    if cap_ranges is None:
        cap_ranges = [
            (0, 30),
            (30, 100),
            (100, float("inf")),
        ]

    building_area_range_dict = {}

    n_ranges = len(cap_ranges)

    for count, cap_range in enumerate(cap_ranges):
        cap_range_gdf = mastr_gdf.loc[
            (mastr_gdf.capacity > cap_range[0])
            & (mastr_gdf.capacity <= cap_range[1])
        ]

        if count == 0:
            building_area_range_dict[cap_range] = (
                min_building_size,
                cap_range_gdf.building_area.quantile(upper_quantile),
            )
        elif count == n_ranges - 1:
            building_area_range_dict[cap_range] = (
                cap_range_gdf.building_area.quantile(lower_quantile),
                float("inf"),
            )
        else:
            building_area_range_dict[cap_range] = (
                cap_range_gdf.building_area.quantile(lower_quantile),
                cap_range_gdf.building_area.quantile(upper_quantile),
            )

    values = list(building_area_range_dict.values())

    building_area_range_normed_dict = {}

    for count, (cap_range, (min_area, max_area)) in enumerate(
        building_area_range_dict.items()
    ):
        if count == 0:
            building_area_range_normed_dict[cap_range] = (
                min_area,
                np.mean((values[count + 1][0], max_area)),
            )
        elif count == n_ranges - 1:
            building_area_range_normed_dict[cap_range] = (
                np.mean((values[count - 1][1], min_area)),
                max_area,
            )
        else:
            building_area_range_normed_dict[cap_range] = (
                np.mean((values[count - 1][1], min_area)),
                np.mean((values[count + 1][0], max_area)),
            )

    return building_area_range_normed_dict


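# Worked example of the normalisation step above (toy numbers, not part of
# the original module): if the raw quantile ranges are (10, 120), (80, 600)
# and (400, inf), the shared borders are averaged so the final ranges no
# longer overlap: (10, (80 + 120) / 2) = (10, 100), then
# ((120 + 80) / 2, (400 + 600) / 2) = (100, 500) and finally (500, inf).

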
@timer_func
def desaggregate_pv_in_mv_grid(
    buildings_gdf: gpd.GeoDataFrame,
    pv_cap: float | int,
    **kwargs,
) -> gpd.GeoDataFrame:
    """
    Desaggregate PV capacity on buildings within a given grid district.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing buildings within the grid district.
    pv_cap : float, int
        PV capacity to desaggregate.

    Other Parameters
    ----------------
    prob_dict : dict
        Dictionary with values and probabilities per capacity range.
    cap_share_dict : dict
        Dictionary with the share of PV capacity from the total PV capacity
        within capacity ranges.
    building_area_range_dict : dict
        Dictionary with the estimated normal building area range per
        capacity range.
    load_factor_dict : dict
        Dictionary with the mean roof load factor per capacity range.
    seed : int
        Seed to use for random operations with NumPy and pandas.
    pv_cap_per_sq_m : float, int
        Average expected, installable PV capacity per square meter.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with desaggregated PV
        plants.
    """
    bus_id = int(buildings_gdf.bus_id.iat[0])

    rng = default_rng(seed=kwargs["seed"])
    random_state = RandomState(seed=kwargs["seed"])

    results_df = pd.DataFrame(columns=buildings_gdf.columns)

    for cap_range, share in kwargs["cap_share_dict"].items():
        pv_cap_range = pv_cap * share

        b_area_min, b_area_max = kwargs["building_area_range_dict"][cap_range]

        cap_range_buildings_gdf = buildings_gdf.loc[
            ~buildings_gdf.index.isin(results_df.index)
            & (buildings_gdf.building_area > b_area_min)
            & (buildings_gdf.building_area <= b_area_max)
        ]

        mean_load_factor = kwargs["load_factor_dict"][cap_range]
        cap_range_buildings_gdf = cap_range_buildings_gdf.assign(
            mean_cap=cap_range_buildings_gdf.max_cap * mean_load_factor,
            load_factor=np.nan,
            capacity=np.nan,
        )

        total_mean_cap = cap_range_buildings_gdf.mean_cap.sum()

        if total_mean_cap == 0:
            logger.warning(
                f"There are no matching roofs for capacity range {cap_range} "
                f"kW in grid {bus_id}. Using all buildings as fallback."
            )

            cap_range_buildings_gdf = buildings_gdf.loc[
                ~buildings_gdf.index.isin(results_df.index)
            ]

            if len(cap_range_buildings_gdf) == 0:
                logger.warning(
                    "There are no roofs available for capacity range "
                    f"{cap_range} kW in grid {bus_id}. Allowing dual use."
                )
                cap_range_buildings_gdf = buildings_gdf.copy()

            cap_range_buildings_gdf = cap_range_buildings_gdf.assign(
                mean_cap=cap_range_buildings_gdf.max_cap * mean_load_factor,
                load_factor=np.nan,
                capacity=np.nan,
            )

            total_mean_cap = cap_range_buildings_gdf.mean_cap.sum()

        elif total_mean_cap < pv_cap_range:
            logger.warning(
                f"Average roof utilization of the roof area in grid {bus_id} "
                f"and capacity range {cap_range} kW is not sufficient. The "
                "roof utilization will be above average."
            )

        frac = max(
            pv_cap_range / total_mean_cap,
            1 / len(cap_range_buildings_gdf),
        )

        samples_gdf = cap_range_buildings_gdf.sample(
            frac=min(1, frac),
            random_state=random_state,
        )

        cap_range_dict = kwargs["prob_dict"][cap_range]

        values_dict = cap_range_dict["values"]
        p_dict = cap_range_dict["probabilities"]

        load_factors = rng.choice(
            a=values_dict["load_factor"],
            size=len(samples_gdf),
            p=p_dict["load_factor"],
        )

        samples_gdf = samples_gdf.assign(
            load_factor=load_factors,
            capacity=(
                samples_gdf.building_area
                * load_factors
                * kwargs["pv_cap_per_sq_m"]
            ).clip(lower=0.4),  # enforce a minimum plant size of 0.4 kW
        )

        missing_factor = pv_cap_range / samples_gdf.capacity.sum()

        samples_gdf = samples_gdf.assign(
            capacity=(samples_gdf.capacity * missing_factor),
            load_factor=(samples_gdf.load_factor * missing_factor),
        )

        assert np.isclose(
            samples_gdf.capacity.sum(),
            pv_cap_range,
            rtol=1e-03,
        ), f"{samples_gdf.capacity.sum()} != {pv_cap_range}"

        results_df = pd.concat(
            [
                results_df,
                samples_gdf,
            ],
        )

    total_missing_factor = pv_cap / results_df.capacity.sum()

    results_df = results_df.assign(
        capacity=(results_df.capacity * total_missing_factor),
    )

    assert np.isclose(
        results_df.capacity.sum(),
        pv_cap,
        rtol=1e-03,
    ), f"{results_df.capacity.sum()} != {pv_cap}"

    return gpd.GeoDataFrame(
        results_df,
        # use the input frame's CRS; samples_gdf would be undefined if
        # cap_share_dict were empty
        crs=buildings_gdf.crs,
        geometry="geom",
    )


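# Worked example of the rescaling step above (toy numbers, not part of the
# original module): if the sampled buildings initially sum to 80 kW but the
# target for the range is 100 kW, missing_factor = 100 / 80 = 1.25 and every
# sampled capacity and load factor is scaled by 1.25, so the sum matches the
# target exactly.

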
@timer_func
def desaggregate_pv(
    buildings_gdf: gpd.GeoDataFrame,
    cap_df: pd.DataFrame,
    **kwargs,
) -> gpd.GeoDataFrame:
    """
    Desaggregate PV capacity on buildings within all grid districts.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    cap_df : pandas.DataFrame
        DataFrame with total rooftop capacity per mv grid.

    Other Parameters
    ----------------
    prob_dict : dict
        Dictionary with values and probabilities per capacity range.
    cap_share_dict : dict
        Dictionary with the share of PV capacity from the total PV capacity
        within capacity ranges.
    building_area_range_dict : dict
        Dictionary with the estimated normal building area range per
        capacity range.
    load_factor_dict : dict
        Dictionary with the mean roof load factor per capacity range.
    seed : int
        Seed to use for random operations with NumPy and pandas.
    pv_cap_per_sq_m : float, int
        Average expected, installable PV capacity per square meter.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with desaggregated PV
        plants.
    """
    allocated_buildings_gdf = buildings_gdf.loc[~buildings_gdf.end_of_life]

    building_bus_ids = set(buildings_gdf.bus_id)
    cap_bus_ids = set(cap_df.index)

    logger.debug(
        f"Bus IDs from buildings: {len(building_bus_ids)}\nBus IDs from capacity: "
        f"{len(cap_bus_ids)}"
    )

    # bus IDs that are present in only one of the two sources
    missing = building_bus_ids ^ cap_bus_ids

    logger.debug(str(missing))

    bus_ids = np.intersect1d(list(building_bus_ids), list(cap_bus_ids))

    # assert set(buildings_gdf.bus_id.unique()) == set(cap_df.index)

    for bus_id in bus_ids:
        buildings_grid_gdf = buildings_gdf.loc[buildings_gdf.bus_id == bus_id]

        pv_installed_gdf = buildings_grid_gdf.loc[
            ~buildings_grid_gdf.end_of_life
        ]

        pv_installed = pv_installed_gdf.capacity.sum()

        pot_buildings_gdf = buildings_grid_gdf.drop(
            index=pv_installed_gdf.index
        )

        if len(pot_buildings_gdf) == 0:
            logger.error(
                f"In grid {bus_id} there are no potential buildings to "
                "allocate PV capacity to. The grid is skipped. This message "
                "should only appear during test runs with few buildings."
            )

            continue

        # the scenario target is given in MW, internal capacities are in kW
        pv_target = cap_df.at[bus_id, "capacity"] * 1000

        logger.debug(f"pv_target: {pv_target}")

        pv_missing = pv_target - pv_installed

        if pv_missing <= 0:
            logger.info(
                f"In grid {bus_id} there is more PV installed "
                f"({pv_installed: g}) in the status quo than allocated "
                f"within the scenario ({pv_target: g}). No new generators "
                "are added."
            )

            continue

        if pot_buildings_gdf.max_cap.sum() < pv_missing:
            logger.error(
                f"In grid {bus_id} there is less PV potential ("
                f"{pot_buildings_gdf.max_cap.sum():g} kW) than allocated PV "
                f"capacity ({pv_missing:g} kW). The average roof utilization "
                "will be very high."
            )

        gdf = desaggregate_pv_in_mv_grid(
            buildings_gdf=pot_buildings_gdf,
            pv_cap=pv_missing,
            **kwargs,
        )

        logger.debug(f"New cap in grid {bus_id}: {gdf.capacity.sum()}")
        logger.debug(f"Installed cap in grid {bus_id}: {pv_installed}")
        logger.debug(
            f"Total cap in grid {bus_id}: {gdf.capacity.sum() + pv_installed}"
        )

        if not np.isclose(
            gdf.capacity.sum() + pv_installed, pv_target, rtol=1e-3
        ):
            logger.warning(
                f"The desired capacity and actual capacity in grid {bus_id} "
                f"differ.\nDesired cap: {pv_target}\nActual cap: "
                f"{gdf.capacity.sum() + pv_installed}"
            )

        pre_cap = allocated_buildings_gdf.capacity.sum()
        new_cap = gdf.capacity.sum()

        allocated_buildings_gdf = pd.concat(
            [
                allocated_buildings_gdf,
                gdf,
            ]
        )

        total_cap = allocated_buildings_gdf.capacity.sum()

        assert np.isclose(pre_cap + new_cap, total_cap)

    logger.debug("Desaggregated scenario.")
    logger.debug(f"Scenario capacity: {cap_df.capacity.sum(): g}")
    logger.debug(
        f"Generator capacity: {allocated_buildings_gdf.capacity.sum() / 1000: g}"
    )

    return gpd.GeoDataFrame(
        allocated_buildings_gdf,
        # use the input frame's CRS; gdf would be undefined if every grid
        # were skipped
        crs=buildings_gdf.crs,
        geometry="geom",
    )


@timer_func
def add_buildings_meta_data(
    buildings_gdf: gpd.GeoDataFrame,
    prob_dict: dict,
    seed: int,
) -> gpd.GeoDataFrame:
    """
    Randomly add additional metadata to desaggregated PV plants.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with desaggregated PV
        plants.
    prob_dict : dict
        Dictionary with values and probabilities per capacity range.
    seed : int
        Seed to use for random operations with NumPy and pandas.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with desaggregated PV
        plants.
    """
    rng = default_rng(seed=seed)
    buildings_gdf = buildings_gdf.reset_index().rename(
        columns={
            "index": "building_id",
        }
    )

    # note: the boundaries here are [min_cap, max_cap) while the probability
    # ranges were built with (min_cap, max_cap]
    for (min_cap, max_cap), cap_range_prob_dict in prob_dict.items():
        cap_range_gdf = buildings_gdf.loc[
            (buildings_gdf.capacity >= min_cap)
            & (buildings_gdf.capacity < max_cap)
        ]

        for key, values in cap_range_prob_dict["values"].items():
            if key == "load_factor":
                continue

            # fill only rows where the property is missing
            gdf = cap_range_gdf.loc[
                cap_range_gdf[key].isna() | (cap_range_gdf[key] == "None")
            ]

            key_vals = rng.choice(
                a=values,
                size=len(gdf),
                p=cap_range_prob_dict["probabilities"][key],
            )

            buildings_gdf.loc[gdf.index, key] = key_vals

    return buildings_gdf


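# Sketch of the weighted fill above (toy data, not part of the original
# module): for buildings whose ``Hauptausrichtung`` is missing,
# ``rng.choice(a=["South", "East"], size=n, p=[0.8, 0.2])`` draws one
# orientation per building, so the filled values follow the distribution
# observed in the MaStR data.

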
def add_voltage_level(
    buildings_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add voltage level derived from generator capacity to the power plants.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with desaggregated PV
        plants.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with voltage level per
        generator.
    """

    def voltage_levels(p: float) -> int:
        # capacity thresholds are given in kW
        if p < 100:
            return 7
        elif p < 200:
            return 6
        elif p < 5500:
            return 5
        elif p < 20000:
            return 4
        elif p < 120000:
            return 3
        return 1

    return buildings_gdf.assign(
        voltage_level=buildings_gdf.capacity.apply(voltage_levels)
    )


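# Summary of the mapping above (capacity p in kW; the grid level names are an
# interpretation, not part of the original module):
#
#     p < 100     -> 7 (low voltage)
#     p < 200     -> 6
#     p < 5500    -> 5 (medium voltage)
#     p < 20000   -> 4
#     p < 120000  -> 3 (high voltage)
#     otherwise   -> 1 (extra high voltage)

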
def add_start_up_date(
    buildings_gdf: gpd.GeoDataFrame,
    start: pd.Timestamp,
    end: pd.Timestamp,
    seed: int,
):
    """
    Randomly add a uniformly distributed start-up date to new pv generators.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with desaggregated PV
        plants.
    start : pandas.Timestamp
        Minimum Timestamp to use.
    end : pandas.Timestamp
        Maximum Timestamp to use.
    seed : int
        Seed to use for random operations with NumPy and pandas.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with start-up date added.
    """
    rng = default_rng(seed=seed)

    date_range = pd.date_range(start=start, end=end, freq="1D")

    return buildings_gdf.assign(
        start_up_date=rng.choice(date_range, size=len(buildings_gdf))
    )


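# Minimal sketch (not part of the original module): the draw above picks one
# day per building, uniformly at random, from the daily range between start
# and end, e.g.
#
#     rng = default_rng(seed=5)
#     dates = pd.date_range("2022-01-01", "2035-01-01", freq="1D")
#     rng.choice(dates, size=3)  # three random start-up dates

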
@timer_func
def allocate_scenarios(
    mastr_gdf: gpd.GeoDataFrame,
    valid_buildings_gdf: gpd.GeoDataFrame,
    last_scenario_gdf: gpd.GeoDataFrame,
    scenario: str,
):
    """
    Desaggregate and allocate scenario pv rooftop ramp-ups onto buildings.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    valid_buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    last_scenario_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings matched with pv generators
        from the temporally preceding scenario.
    scenario : str
        Scenario to desaggregate and allocate.

    Returns
    -------
    tuple
        geopandas.GeoDataFrame
            GeoDataFrame containing OSM buildings matched with pv
            generators.
        pandas.DataFrame
            DataFrame containing pv rooftop capacity per grid id.
    """
    cap_per_bus_id_df = cap_per_bus_id(scenario)

    logger.debug(
        f"cap_per_bus_id_df total capacity: {cap_per_bus_id_df.capacity.sum()}"
    )

    last_scenario_gdf = determine_end_of_life_gens(
        last_scenario_gdf,
        SCENARIO_TIMESTAMP[scenario],
        PV_ROOFTOP_LIFETIME,
    )

    buildings_gdf = calculate_max_pv_cap_per_building(
        valid_buildings_gdf,
        last_scenario_gdf,
        PV_CAP_PER_SQ_M,
        ROOF_FACTOR,
    )

    mastr_gdf = calculate_building_load_factor(
        mastr_gdf,
        buildings_gdf,
    )

    probabilities_dict = probabilities(
        mastr_gdf,
        cap_ranges=CAP_RANGES,
    )

    cap_share_dict = cap_share_per_cap_range(
        mastr_gdf,
        cap_ranges=CAP_RANGES,
    )

    load_factor_dict = mean_load_factor_per_cap_range(
        mastr_gdf,
        cap_ranges=CAP_RANGES,
    )

    building_area_range_dict = building_area_range_per_cap_range(
        mastr_gdf,
        cap_ranges=CAP_RANGES,
        min_building_size=MIN_BUILDING_SIZE,
        upper_quantile=UPPER_QUNATILE,
        lower_quantile=LOWER_QUANTILE,
    )

    allocated_buildings_gdf = desaggregate_pv(
        buildings_gdf=buildings_gdf,
        cap_df=cap_per_bus_id_df,
        prob_dict=probabilities_dict,
        cap_share_dict=cap_share_dict,
        building_area_range_dict=building_area_range_dict,
        load_factor_dict=load_factor_dict,
        seed=SEED,
        pv_cap_per_sq_m=PV_CAP_PER_SQ_M,
    )

    allocated_buildings_gdf = allocated_buildings_gdf.assign(scenario=scenario)

    meta_buildings_gdf = frame_to_numeric(
        add_buildings_meta_data(
            allocated_buildings_gdf,
            probabilities_dict,
            SEED,
        )
    )

    return (
        add_start_up_date(
            meta_buildings_gdf,
            start=last_scenario_gdf.start_up_date.max(),
            end=SCENARIO_TIMESTAMP[scenario],
            seed=SEED,
        ),
        cap_per_bus_id_df,
    )


class EgonPowerPlantPvRoofBuildingScenario(Base):
    __tablename__ = "egon_power_plants_pv_roof_building"
    __table_args__ = {"schema": "supply"}

    index = Column(Integer, primary_key=True, index=True)
    scenario = Column(String)
    bus_id = Column(Integer, nullable=True)
    building_id = Column(Integer)
    gens_id = Column(String, nullable=True)
    capacity = Column(Float)
    einheitliche_ausrichtung_und_neigungswinkel = Column(Float)
    hauptausrichtung = Column(String)
    hauptausrichtung_neigungswinkel = Column(String)
    voltage_level = Column(Integer)
    weather_cell_id = Column(Integer)


def create_scenario_table(buildings_gdf):
    """Create mapping table pv_unit <-> building for scenario."""
    EgonPowerPlantPvRoofBuildingScenario.__table__.drop(
        bind=engine, checkfirst=True
    )
    EgonPowerPlantPvRoofBuildingScenario.__table__.create(
        bind=engine, checkfirst=True
    )

    buildings_gdf.rename(columns=COLS_TO_RENAME).assign(
        capacity=buildings_gdf.capacity.div(10**3)  # kW -> MW
    )[COLS_TO_EXPORT].reset_index().to_sql(
        name=EgonPowerPlantPvRoofBuildingScenario.__table__.name,
        schema=EgonPowerPlantPvRoofBuildingScenario.__table__.schema,
        con=db.engine(),
        if_exists="append",
        index=False,
    )


def geocode_mastr_data():
    """
    Read PV rooftop data from the MaStR CSV and geocode it.

    TODO: the source will be replaced as soon as the MaStR data is
    available in the DB.
    """
    mastr_df = mastr_data(
        MASTR_INDEX_COL,
        MASTR_RELEVANT_COLS,
        MASTR_DTYPES,
        MASTR_PARSE_DATES,
    )

    clean_mastr_df = clean_mastr_data(
        mastr_df,
        max_realistic_pv_cap=MAX_REALISTIC_PV_CAP,
        min_realistic_pv_cap=MIN_REALISTIC_PV_CAP,
        seed=SEED,
        rounding=ROUNDING,
    )

    geocoding_df = geocoding_data(clean_mastr_df)

    ratelimiter = geocoder(USER_AGENT, MIN_DELAY_SECONDS)

    geocode_gdf = geocode_data(geocoding_df, ratelimiter, EPSG)

    create_geocoded_table(geocode_gdf)


def add_weather_cell_id(buildings_gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """Map each building to a weather cell via its zensus population cell."""
    sql = """
    SELECT building_id, zensus_population_id
    FROM boundaries.egon_map_zensus_mvgd_buildings
    """

    buildings_gdf = buildings_gdf.merge(
        right=db.select_dataframe(sql),
        how="left",
        on="building_id",
    )

    sql = """
    SELECT zensus_population_id, w_id as weather_cell_id
    FROM boundaries.egon_map_zensus_weather_cell
    """

    buildings_gdf = buildings_gdf.merge(
        right=db.select_dataframe(sql),
        how="left",
        on="zensus_population_id",
    )

    if buildings_gdf.weather_cell_id.isna().any():
        missing = buildings_gdf.loc[
            buildings_gdf.weather_cell_id.isna()
        ].building_id.tolist()
        raise ValueError(
            f"The following buildings don't have a weather cell id: {missing}"
        )

    return buildings_gdf


def pv_rooftop_to_buildings():
    """Main script, executed as task"""

    mastr_gdf = load_mastr_data()

    buildings_gdf = load_building_data()

    desagg_mastr_gdf, desagg_buildings_gdf = allocate_to_buildings(
        mastr_gdf, buildings_gdf
    )

    all_buildings_gdf = (
        desagg_mastr_gdf.assign(scenario="status_quo")
        .reset_index()
        .rename(columns={"geometry": "geom", "EinheitMastrNummer": "gens_id"})
    )

    scenario_buildings_gdf = all_buildings_gdf.copy()

    cap_per_bus_id_df = pd.DataFrame()

    for scenario in SCENARIOS:
        logger.debug(f"Desaggregating scenario {scenario}.")
        (
            scenario_buildings_gdf,
            cap_per_bus_id_scenario_df,
        ) = allocate_scenarios(  # noqa: F841
            desagg_mastr_gdf,
            desagg_buildings_gdf,
            scenario_buildings_gdf,
            scenario,
        )

        all_buildings_gdf = gpd.GeoDataFrame(
            pd.concat(
                [all_buildings_gdf, scenario_buildings_gdf], ignore_index=True
            ),
            crs=scenario_buildings_gdf.crs,
            geometry="geom",
        )

        cap_per_bus_id_df = pd.concat(
            [cap_per_bus_id_df, cap_per_bus_id_scenario_df]
        )

    # add weather cell
    all_buildings_gdf = add_weather_cell_id(all_buildings_gdf)

    # export scenario
    create_scenario_table(add_voltage_level(all_buildings_gdf))