Passed
Pull Request — dev (#934) — by unknown — created 01:33

probabilities() — B

Complexity

Conditions 5

Size

Total Lines 57
Code Lines 29

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric   Value
eloc     29
dl       0
loc      57
rs       8.7173
c        0
b        0
f        0
cc       5
nop      3

How to fix: Long Method

Small methods make your code easier to understand, especially when combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a good sign to extract the commented part into a new method and to use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method, sketched below.
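
A minimal sketch of Extract Method in Python (hypothetical names; the capacity bounds merely echo constants used further down in this module): the commented step becomes a helper whose name is derived from the comment.

    import pandas as pd

    # Before: one long method with a commented step.
    def clean(df: pd.DataFrame) -> pd.DataFrame:
        # drop rows without a plausible capacity (kW)
        df = df.loc[(df.capacity > 0.1) & (df.capacity <= 23500)]
        return df.drop_duplicates()

    # After: the commented step extracted into a well-named method.
    def drop_implausible_capacities(df: pd.DataFrame) -> pd.DataFrame:
        return df.loc[(df.capacity > 0.1) & (df.capacity <= 23500)]

    def clean(df: pd.DataFrame) -> pd.DataFrame:
        return drop_implausible_capacities(df).drop_duplicates()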

"""
Distribute MaStR PV rooftop capacities to OSM and synthetic buildings. Generate
new PV rooftop generators for the scenarios eGon2035 and eGon100RE.

Data cleaning: Drop duplicates and entries with missing critical data. Determine
the most plausible capacity from the multiple values given in the MaStR data.
Drop generators without any plausible capacity data (0.1 kW < P <= 23.5 MW).
Randomly add a start-up date, weighted by the distribution of existing dates, if
it is missing. Extract zip code and municipality from the 'Standort' given in
the MaStR data. Geocode unique zip code and municipality combinations with
Nominatim (1 s delay). Drop generators for which geocoding failed or which are
located outside the municipalities of Germany. Add some visual sanity checks for
the cleaned data.

Allocation of MaStR data: Allocate each generator to an existing building from
OSM. Determine the quantile each generator and building is in, depending on the
capacity of the generator and the area of the building polygon. Randomly
distribute generators within each municipality, preferably within the same
building-area quantile as the generators are capacity-wise. If not enough
buildings exist within a municipality and quantile, additional buildings from
other quantiles are chosen randomly.

Disaggregation of PV rooftop scenarios: The scenario data per federal state is
linearly distributed to the MV grid districts according to the PV rooftop
potential per MV grid district. The rooftop potential is estimated from the
building area given by the OSM buildings. Grid districts which are located in
several federal states are allocated PV capacity according to their respective
roof potential in the individual federal states. The disaggregation of PV plants
within a grid district respects existing plants from MaStR which have not
reached their end of life. New PV plants are generated randomly and weighted,
using a breakdown of the MaStR data as the generator basis. Plant metadata
(e.g. plant orientation) is also added randomly and weighted, with the MaStR
data as the basis.
"""
from __future__ import annotations

from collections import Counter
from functools import wraps
from pathlib import Path
from time import perf_counter
from typing import Any

from geoalchemy2 import Geometry
from geopy.extra.rate_limiter import RateLimiter
from geopy.geocoders import Nominatim
from loguru import logger
from numpy.random import RandomState, default_rng
from pyproj.crs.crs import CRS
from sqlalchemy import BigInteger, Column, Float, Integer, String
from sqlalchemy.dialects.postgresql import HSTORE
from sqlalchemy.ext.declarative import declarative_base
import geopandas as gpd
import numpy as np
import pandas as pd

from egon.data import config, db
from egon.data.datasets.electricity_demand_timeseries.hh_buildings import (
    OsmBuildingsSynthetic,
)
from egon.data.datasets.scenario_capacities import EgonScenarioCapacities
from egon.data.datasets.zensus_vg250 import Vg250Gem

engine = db.engine()
Base = declarative_base()
SEED = int(config.settings()["egon-data"]["--random-seed"])

# TODO: move to yml
# mastr data
MASTR_RELEVANT_COLS = [
    "EinheitMastrNummer",
    "Bruttoleistung",
    "StatisikFlag",
    "Bruttoleistung_extended",
    "Nettonennleistung",
    "InstallierteLeistung",
    "zugeordneteWirkleistungWechselrichter",
    "EinheitBetriebsstatus",
    "Standort",
    "Bundesland",
    "Land",
    "Landkreis",
    "Gemeinde",
    "Postleitzahl",
    "Ort",
    "GeplantesInbetriebnahmedatum",
    "Inbetriebnahmedatum",
    "GemeinsamerWechselrichterMitSpeicher",
    "Lage",
    "Leistungsbegrenzung",
    "EinheitlicheAusrichtungUndNeigungswinkel",
    "Hauptausrichtung",
    "HauptausrichtungNeigungswinkel",
    "Nebenausrichtung",
]

MASTR_DTYPES = {
    "EinheitMastrNummer": str,
    "Bruttoleistung": float,
    "StatisikFlag": str,
    "Bruttoleistung_extended": float,
    "Nettonennleistung": float,
    "InstallierteLeistung": float,
    "zugeordneteWirkleistungWechselrichter": float,
    "EinheitBetriebsstatus": str,
    "Standort": str,
    "Bundesland": str,
    "Land": str,
    "Landkreis": str,
    "Gemeinde": str,
    # "Postleitzahl": int,  # fails because of nan values
    "Ort": str,
    "GemeinsamerWechselrichterMitSpeicher": str,
    "Lage": str,
    "Leistungsbegrenzung": str,
    # this would parse nan values as False, which is not always correct
    # "EinheitlicheAusrichtungUndNeigungswinkel": bool,
    "Hauptausrichtung": str,
    "HauptausrichtungNeigungswinkel": str,
    "Nebenausrichtung": str,
    "NebenausrichtungNeigungswinkel": str,
}

MASTR_PARSE_DATES = [
    "GeplantesInbetriebnahmedatum",
    "Inbetriebnahmedatum",
]

MASTR_INDEX_COL = "EinheitMastrNummer"

EPSG = 4326
SRID = 3035

# data cleaning
MAX_REALISTIC_PV_CAP = 23500  # kW
MIN_REALISTIC_PV_CAP = 0.1  # kW
ROUNDING = 1

# geopy
MIN_DELAY_SECONDS = 1
USER_AGENT = "rli_kh_geocoder"

# show additional logging information
VERBOSE = False

# NOTE: Path(__name__) resolves against the CWD; Path(__file__) may be intended
EXPORT_DIR = Path(__name__).resolve().parent / "data"
EXPORT_FILE = "mastr_geocoded.gpkg"
EXPORT_PATH = EXPORT_DIR / EXPORT_FILE
DRIVER = "GPKG"

# Number of quantiles
Q = 5

# Scenario Data
CARRIER = "solar_rooftop"
SCENARIOS = ["eGon2035"]  # , "eGon100RE"]
SCENARIO_TIMESTAMP = {
    "eGon2035": pd.Timestamp("2035-01-01", tz="UTC"),
    "eGon100RE": pd.Timestamp("2050-01-01", tz="UTC"),
}
PV_ROOFTOP_LIFETIME = pd.Timedelta(30 * 365, unit="D")

# Example module: Trina Vertex S TSM-400DE09M.08, 400 Wp
# https://www.photovoltaik4all.de/media/pdf/92/64/68/Trina_Datasheet_VertexS_DE09-08_2021_A.pdf
MODUL_CAP = 0.4  # kWp
MODUL_SIZE = 1.096 * 1.754  # m²
PV_CAP_PER_SQ_M = MODUL_CAP / MODUL_SIZE

# Estimation of usable roof area
# factor for the conversion of building area to roof area
# estimated mean roof pitch: 35°
# estimated usable roof share: 80%
# estimate that only the south side of the building is used for PV
# see https://mediatum.ub.tum.de/doc/%20969497/969497.pdf
# AREA_FACTOR = 1.221
# USABLE_ROOF_SHARE = 0.8
# SOUTH_SHARE = 0.5
# ROOF_FACTOR = AREA_FACTOR * USABLE_ROOF_SHARE * SOUTH_SHARE
ROOF_FACTOR = 0.5

CAP_RANGES = [
    (0, 30),
    (30, 100),
    (100, float("inf")),
]

MIN_BUILDING_SIZE = 10.0
UPPER_QUNATILE = 0.95
LOWER_QUANTILE = 0.05

COLS_TO_RENAME = {
    "EinheitlicheAusrichtungUndNeigungswinkel": (
        "einheitliche_ausrichtung_und_neigungswinkel"
    ),
    "Hauptausrichtung": "hauptausrichtung",
    "HauptausrichtungNeigungswinkel": "hauptausrichtung_neigungswinkel",
}

COLS_TO_EXPORT = [
    "scenario",
    "building_id",
    "gens_id",
    "capacity",
    "einheitliche_ausrichtung_und_neigungswinkel",
    "hauptausrichtung",
    "hauptausrichtung_neigungswinkel",
    "voltage_level",
]

# TODO
INCLUDE_SYNTHETIC_BUILDINGS = True
ONLY_BUILDINGS_WITH_DEMAND = True
TEST_RUN = False


def timer_func(func):
    @wraps(func)
    def timeit_wrapper(*args, **kwargs):
        start_time = perf_counter()
        result = func(*args, **kwargs)
        end_time = perf_counter()
        total_time = end_time - start_time
        logger.debug(
            f"Function {func.__name__} took {total_time:.4f} seconds."
        )
        return result

    return timeit_wrapper

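# Reviewer note: `timer_func` is a plain timing decorator that logs at DEBUG
# level via loguru. Minimal usage sketch (hypothetical function, not part of
# this module):
#
#     @timer_func
#     def slow_square(x: int) -> int:
#         return x * x
#
#     slow_square(12)  # logs e.g. "Function slow_square took 0.0000 seconds."
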
@timer_func
def mastr_data(
    index_col: str | int | list[str] | list[int],
    usecols: list[str],
    dtype: dict[str, Any] | None,
    parse_dates: list[str] | None,
) -> pd.DataFrame:
    """
    Read MaStR data from CSV.

    Parameters
    -----------
    index_col : str, int or list of str or int
        Column(s) to use as the row labels of the DataFrame.
    usecols : list of str
        Return a subset of the columns.
    dtype : dict of column (str) -> type (any), optional
        Data type for data or columns.
    parse_dates : list of names (str), optional
        Try to parse given columns to datetime.

    Returns
    -------
    pandas.DataFrame
        DataFrame containing MaStR data.
    """
    mastr_path = Path(
        config.datasets()["power_plants"]["sources"]["mastr_pv"]
    ).resolve()

    mastr_df = pd.read_csv(
        mastr_path,
        index_col=index_col,
        usecols=usecols,
        dtype=dtype,
        parse_dates=parse_dates,
    )

    mastr_df = mastr_df.loc[
        (mastr_df.StatisikFlag == "B")
        & (mastr_df.EinheitBetriebsstatus == "InBetrieb")
        & (mastr_df.Land == "Deutschland")
        & (mastr_df.Lage == "BaulicheAnlagen")
    ]

    if (
        config.settings()["egon-data"]["--dataset-boundary"]
        == "Schleswig-Holstein"
    ):
        init_len = len(mastr_df)

        mastr_df = mastr_df.loc[mastr_df.Bundesland == "SchleswigHolstein"]

        logger.info(
            f"Using only MaStR data within Schleswig-Holstein. "
            f"{init_len - len(mastr_df)} of {init_len} generators are dropped."
        )

    logger.debug("MaStR data loaded.")

    return mastr_df


@timer_func
def clean_mastr_data(
    mastr_df: pd.DataFrame,
    max_realistic_pv_cap: int | float,
    min_realistic_pv_cap: int | float,
    rounding: int,
    seed: int,
) -> pd.DataFrame:
    """
    Clean the MaStR data from implausible data.

    * Drop MaStR ID duplicates.
    * Drop generators with implausible capacities.
    * Drop generators without any kind of start-up date.
    * Clean up the Standort column and the capacity.

    Parameters
    -----------
    mastr_df : pandas.DataFrame
        DataFrame containing MaStR data.
    max_realistic_pv_cap : int or float
        Maximum capacity which is considered to be realistic.
    min_realistic_pv_cap : int or float
        Minimum capacity which is considered to be realistic.
    rounding : int
        Rounding to use when cleaning up capacity. E.g. when
        rounding is 1 a capacity of 9.93 will be rounded to 9.9.
    seed : int
        Seed to use for random operations with NumPy and pandas.

    Returns
    -------
    pandas.DataFrame
        DataFrame containing cleaned MaStR data.
    """
    init_len = len(mastr_df)

    # drop duplicates
    mastr_df = mastr_df.loc[~mastr_df.index.duplicated()]

    # drop invalid entries in Standort
    index_to_drop = mastr_df.loc[
        (mastr_df.Standort.isna()) | (mastr_df.Standort.isnull())
    ].index

    mastr_df = mastr_df.loc[~mastr_df.index.isin(index_to_drop)]

    df = mastr_df[
        [
            "Bruttoleistung",
            "Bruttoleistung_extended",
            "Nettonennleistung",
            "zugeordneteWirkleistungWechselrichter",
            "InstallierteLeistung",
        ]
    ].round(rounding)

    # determine the most plausible capacity if multiple ratings are given
    mastr_df = mastr_df.assign(
        capacity=[
            most_plausible(p_tub, min_realistic_pv_cap)
            for p_tub in df.itertuples(index=False)
        ]
    )

    # drop generators without any capacity info,
    # generators with a capacity of zero,
    # and generators with a capacity > 23.5 MW, because
    # Germany's largest rooftop PV system is 23 MW
    # https://www.iwr.de/news/groesste-pv-dachanlage-europas-wird-in-sachsen-anhalt-gebaut-news37379
    mastr_df = mastr_df.loc[
        (~mastr_df.capacity.isna())
        & (mastr_df.capacity <= max_realistic_pv_cap)
        & (mastr_df.capacity > min_realistic_pv_cap)
    ]

    # get zip code and municipality
    mastr_df[["zip_and_municipality", "drop_this"]] = pd.DataFrame(
        mastr_df.Standort.astype(str)
        .apply(
            zip_and_municipality_from_standort,
            args=(VERBOSE,),
        )
        .tolist(),
        index=mastr_df.index,
    )

    # drop invalid entries
    mastr_df = mastr_df.loc[mastr_df.drop_this].drop(columns="drop_this")

    # add ", Deutschland" just in case
    mastr_df = mastr_df.assign(
        zip_and_municipality=(mastr_df.zip_and_municipality + ", Deutschland")
    )

    # get consistent start-up date
    mastr_df = mastr_df.assign(
        start_up_date=mastr_df.Inbetriebnahmedatum,
    )

    mastr_df.loc[mastr_df.start_up_date.isna()] = mastr_df.loc[
        mastr_df.start_up_date.isna()
    ].assign(
        start_up_date=mastr_df.GeplantesInbetriebnahmedatum.loc[
            mastr_df.start_up_date.isna()
        ]
    )

    # randomly fill missing start-up dates, weighted by the
    # distribution of the existing dates
    pool = mastr_df.loc[
        ~mastr_df.start_up_date.isna()
    ].start_up_date.to_numpy()

    size = len(mastr_df) - len(pool)

    if size > 0:
        np.random.seed(seed)

        choice = np.random.choice(
            pool,
            size=size,
            replace=False,
        )

        mastr_df.loc[mastr_df.start_up_date.isna()] = mastr_df.loc[
            mastr_df.start_up_date.isna()
        ].assign(start_up_date=choice)

        logger.info(
            f"Randomly and weighted added start-up date to {size} generators."
        )

    mastr_df = mastr_df.assign(
        start_up_date=pd.to_datetime(mastr_df.start_up_date, utc=True)
    )

    end_len = len(mastr_df)
    logger.debug(
        f"Dropped {init_len - end_len} "
        f"({((init_len - end_len) / init_len) * 100:g}%)"
        f" of {init_len} rows from MaStR DataFrame."
    )

    return mastr_df


def zip_and_municipality_from_standort(
    standort: str,
    verbose: bool = False,
) -> tuple[str, bool]:
    """
    Get zip code and municipality from the Standort string.

    Parameters
    -----------
    standort : str
        Standort as given in the MaStR data.
    verbose : bool
        Logs additional info if True.

    Returns
    -------
    tuple of str and bool
        Standort reduced to zip code and municipality, and a flag
        indicating whether a zip code could be identified.
    """
    if verbose:
        logger.debug(f"Uncleaned String: {standort}")

    standort_list = standort.split()

    found = False
    count = 0

    for count, elem in enumerate(standort_list):
        if len(elem) != 5:
            continue
        if not elem.isnumeric():
            continue

        found = True

        break

    if found:
        cleaned_str = " ".join(standort_list[count:])

        if verbose:
            logger.debug(f"Cleaned String:   {cleaned_str}")

        return cleaned_str, found

    logger.warning(
        "Couldn't identify zip code. This entry will be dropped."
        f" Original standort: {standort}."
    )

    return standort, found

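# Reviewer note: the function keeps everything from the first five-digit
# numeric token onward, e.g. (made-up Standort strings):
#
#     >>> zip_and_municipality_from_standort("Musterstraße 5 12345 Musterstadt")
#     ('12345 Musterstadt', True)
#     >>> zip_and_municipality_from_standort("Musterstraße 5 Musterstadt")[1]
#     False
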
def most_plausible(
    p_tub: tuple,
    min_realistic_pv_cap: int | float,
) -> float:
    """
    Try to determine the most plausible capacity.

    Try to determine the most plausible capacity from a given
    generator from MaStR data.

    Parameters
    -----------
    p_tub : tuple
        Tuple containing the different capacities given in
        the MaStR data.
    min_realistic_pv_cap : int or float
        Minimum capacity which is considered to be realistic.

    Returns
    -------
    float
        Capacity of the generator estimated as the most realistic.
    """
    count = Counter(p_tub).most_common(3)

    if len(count) == 1:
        return count[0][0]

    val1 = count[0][0]
    val2 = count[1][0]

    if len(count) == 2:
        min_val = min(val1, val2)
        max_val = max(val1, val2)
    else:
        val3 = count[2][0]

        min_val = min(val1, val2, val3)
        max_val = max(val1, val2, val3)

    if min_val < min_realistic_pv_cap:
        return max_val

    return min_val

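# Reviewer note: `most_plausible` keeps the minimum of the (up to three) most
# common ratings unless an implausibly small value is among them, in which
# case it falls back to the maximum. Worked example (made-up ratings in kW):
#
#     >>> most_plausible((9.9, 9.9, 10.2, 0.0, 9.9), 0.1)   # 0.0 < 0.1
#     10.2
#     >>> most_plausible((9.9, 9.9, 10.2, 10.0, 9.9), 0.1)
#     9.9
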
def geocoder(
    user_agent: str,
    min_delay_seconds: int,
) -> RateLimiter:
    """
    Set up the Nominatim geocoding class.

    Parameters
    -----------
    user_agent : str
        The app name.
    min_delay_seconds : int
        Delay in seconds to use between requests to Nominatim.
        A minimum of 1 is advised.

    Returns
    -------
    geopy.extra.rate_limiter.RateLimiter
        Nominatim RateLimiter geocoding class to use for geocoding.
    """
    locator = Nominatim(user_agent=user_agent)
    return RateLimiter(
        locator.geocode,
        min_delay_seconds=min_delay_seconds,
    )

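# Reviewer note: the returned RateLimiter is called like `locator.geocode`
# itself; a minimal usage sketch (issues a live Nominatim request; query
# string made up):
#
#     ratelimiter = geocoder(USER_AGENT, MIN_DELAY_SECONDS)
#     location = ratelimiter("10117 Berlin, Deutschland")
#     if location is not None:
#         print(tuple(location.point))  # (latitude, longitude, altitude)
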
def geocoding_data(
    clean_mastr_df: pd.DataFrame,
) -> pd.DataFrame:
    """
    Set up the DataFrame to geocode.

    Parameters
    -----------
    clean_mastr_df : pandas.DataFrame
        DataFrame containing cleaned MaStR data.

    Returns
    -------
    pandas.DataFrame
        DataFrame containing all unique combinations of
        zip codes and municipalities for geocoding.
    """
    return pd.DataFrame(
        data=clean_mastr_df.zip_and_municipality.unique(),
        columns=["zip_and_municipality"],
    )


@timer_func
def geocode_data(
    geocoding_df: pd.DataFrame,
    ratelimiter: RateLimiter,
    epsg: int,
) -> gpd.GeoDataFrame:
    """
    Geocode zip code and municipality.

    Extract latitude, longitude and altitude.
    Transform latitude and longitude to a shapely
    Point and return a geopandas GeoDataFrame.

    Parameters
    -----------
    geocoding_df : pandas.DataFrame
        DataFrame containing all unique combinations of
        zip codes and municipalities for geocoding.
    ratelimiter : geopy.extra.rate_limiter.RateLimiter
        Nominatim RateLimiter geocoding class to use for geocoding.
    epsg : int
        EPSG ID to use as CRS.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing all unique combinations of
        zip codes and municipalities with matching geolocation.
    """
    logger.info(f"Geocoding {len(geocoding_df)} locations.")

    geocode_df = geocoding_df.assign(
        location=geocoding_df.zip_and_municipality.apply(ratelimiter)
    )

    geocode_df = geocode_df.assign(
        point=geocode_df.location.apply(
            lambda loc: tuple(loc.point) if loc else None
        )
    )

    geocode_df[["latitude", "longitude", "altitude"]] = pd.DataFrame(
        geocode_df.point.tolist(), index=geocode_df.index
    )

    return gpd.GeoDataFrame(
        geocode_df,
        geometry=gpd.points_from_xy(geocode_df.longitude, geocode_df.latitude),
        crs=f"EPSG:{epsg}",
    )

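# Reviewer note: `tuple(loc.point)` unpacks a geopy Point in
# (latitude, longitude, altitude) order, which is why the three columns are
# assigned in exactly that order before `points_from_xy(longitude, latitude)`
# swaps back to x/y:
#
#     >>> from geopy.point import Point
#     >>> tuple(Point(52.52, 13.40, 0.0))
#     (52.52, 13.4, 0.0)
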
def merge_geocode_with_mastr(
    clean_mastr_df: pd.DataFrame, geocode_gdf: gpd.GeoDataFrame
) -> gpd.GeoDataFrame:
    """
    Merge geometry to the original MaStR data.

    Parameters
    -----------
    clean_mastr_df : pandas.DataFrame
        DataFrame containing cleaned MaStR data.
    geocode_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing all unique combinations of
        zip codes and municipalities with matching geolocation.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing cleaned MaStR data with
        matching geolocation from geocoding.
    """
    return gpd.GeoDataFrame(
        clean_mastr_df.merge(
            geocode_gdf[["zip_and_municipality", "geometry"]],
            how="left",
            left_on="zip_and_municipality",
            right_on="zip_and_municipality",
        ),
        crs=geocode_gdf.crs,
    ).set_index(clean_mastr_df.index)


def drop_invalid_entries_from_gdf(
    gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop invalid entries from a geopandas GeoDataFrame.

    TODO: how to omit the logging from GEOS here?

    Parameters
    -----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame to be checked for validity.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with rows with invalid geometries
        dropped.
    """
    valid_gdf = gdf.loc[gdf.is_valid]

    logger.debug(
        f"{len(gdf) - len(valid_gdf)} "
        f"({(len(gdf) - len(valid_gdf)) / len(gdf) * 100:g}%) "
        f"of {len(gdf)} values were invalid and are dropped."
    )

    return valid_gdf


@timer_func
def municipality_data() -> gpd.GeoDataFrame:
    """
    Get municipality data from the eGo^n Database.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with municipality data.
    """
    with db.session_scope() as session:
        query = session.query(Vg250Gem.ags, Vg250Gem.geometry.label("geom"))

    return gpd.read_postgis(
        query.statement, query.session.bind, index_col="ags"
    )


@timer_func
def add_ags_to_gens(
    valid_mastr_gdf: gpd.GeoDataFrame,
    municipalities_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add information about the AGS ID to the generators.

    Parameters
    -----------
    valid_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data.
    municipalities_gdf : geopandas.GeoDataFrame
        GeoDataFrame with municipality data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data
        with the AGS ID added.
    """
    return valid_mastr_gdf.sjoin(
        municipalities_gdf,
        how="left",
        predicate="intersects",
    ).rename(columns={"index_right": "ags"})


def drop_gens_outside_muns(
    valid_mastr_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop all generators outside of municipalities.

    Parameters
    -----------
    valid_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data
        with generators without an AGS ID dropped.
    """
    gdf = valid_mastr_gdf.loc[~valid_mastr_gdf.ags.isna()]

    logger.debug(
        f"{len(valid_mastr_gdf) - len(gdf)} "
        f"({(len(valid_mastr_gdf) - len(gdf)) / len(valid_mastr_gdf) * 100:g}%) "
        f"of {len(valid_mastr_gdf)} values are outside of the municipalities"
        " and are therefore dropped."
    )

    return gdf


class EgonMastrPvRoofGeocoded(Base):
    __tablename__ = "egon_mastr_pv_roof_geocoded"
    __table_args__ = {"schema": "supply"}

    zip_and_municipality = Column(String, primary_key=True, index=True)
    location = Column(String)
    point = Column(String)
    latitude = Column(Float)
    longitude = Column(Float)
    altitude = Column(Float)
    geometry = Column(Geometry(srid=EPSG))


def create_geocoded_table(geocode_gdf):
    """
    Create the geocoded MaStR PV rooftop table.

    Parameters
    -----------
    geocode_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoding information on PV rooftop locations.
    """
    EgonMastrPvRoofGeocoded.__table__.drop(bind=engine, checkfirst=True)
    EgonMastrPvRoofGeocoded.__table__.create(bind=engine, checkfirst=True)

    geocode_gdf.to_postgis(
        name=EgonMastrPvRoofGeocoded.__table__.name,
        schema=EgonMastrPvRoofGeocoded.__table__.schema,
        con=db.engine(),
        if_exists="append",
        index=False,
        # dtype={}
    )


def geocoded_data_from_db(
    epsg: str | int,
) -> gpd.GeoDataFrame:
    """
    Read geocoded MaStR data from the eGo^n Database.

    Parameters
    -----------
    epsg : str or int
        EPSG ID to transform geometries to.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    """
    with db.session_scope() as session:
        query = session.query(
            EgonMastrPvRoofGeocoded.zip_and_municipality,
            EgonMastrPvRoofGeocoded.geometry,
        )

    return gpd.read_postgis(
        query.statement, query.session.bind, geom_col="geometry"
    ).to_crs(f"EPSG:{epsg}")


def load_mastr_data():
    """
    Read PV rooftop data from the MaStR CSV.

    Note: the source will be replaced as soon as the MaStR data is available
    in the DB.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data with geocoded locations.
    """
    mastr_df = mastr_data(
        MASTR_INDEX_COL,
        MASTR_RELEVANT_COLS,
        MASTR_DTYPES,
        MASTR_PARSE_DATES,
    )

    clean_mastr_df = clean_mastr_data(
        mastr_df,
        max_realistic_pv_cap=MAX_REALISTIC_PV_CAP,
        min_realistic_pv_cap=MIN_REALISTIC_PV_CAP,
        seed=SEED,
        rounding=ROUNDING,
    )

    geocode_gdf = geocoded_data_from_db(EPSG)

    mastr_gdf = merge_geocode_with_mastr(clean_mastr_df, geocode_gdf)

    valid_mastr_gdf = drop_invalid_entries_from_gdf(mastr_gdf)

    municipalities_gdf = municipality_data()

    valid_mastr_gdf = add_ags_to_gens(valid_mastr_gdf, municipalities_gdf)

    return drop_gens_outside_muns(valid_mastr_gdf)


class OsmBuildingsFiltered(Base):
    __tablename__ = "osm_buildings_filtered"
    __table_args__ = {"schema": "openstreetmap"}

    osm_id = Column(BigInteger)
    amenity = Column(String)
    building = Column(String)
    name = Column(String)
    geom = Column(Geometry(srid=SRID), index=True)
    area = Column(Float)
    geom_point = Column(Geometry(srid=SRID), index=True)
    tags = Column(HSTORE)
    id = Column(BigInteger, primary_key=True, index=True)


@timer_func
def osm_buildings(
    to_crs: CRS,
) -> gpd.GeoDataFrame:
    """
    Read OSM buildings data from the eGo^n Database.

    Parameters
    -----------
    to_crs : pyproj.crs.crs.CRS
        CRS to transform geometries to.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    """
    with db.session_scope() as session:
        query = session.query(
            OsmBuildingsFiltered.id,
            OsmBuildingsFiltered.area,
            OsmBuildingsFiltered.geom_point.label("geom"),
        )

    return gpd.read_postgis(
        query.statement, query.session.bind, index_col="id"
    ).to_crs(to_crs)


@timer_func
def synthetic_buildings(
    to_crs: CRS,
) -> gpd.GeoDataFrame:
    """
    Read synthetic buildings data from the eGo^n Database.

    Parameters
    -----------
    to_crs : pyproj.crs.crs.CRS
        CRS to transform geometries to.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing synthetic buildings data.
    """
    with db.session_scope() as session:
        query = session.query(
            OsmBuildingsSynthetic.id,
            OsmBuildingsSynthetic.area,
            OsmBuildingsSynthetic.geom_point.label("geom"),
        )

    return gpd.read_postgis(
        query.statement, query.session.bind, index_col="id"
    ).to_crs(to_crs)


@timer_func
def add_ags_to_buildings(
    buildings_gdf: gpd.GeoDataFrame,
    municipalities_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add information about the AGS ID to the buildings.

    Parameters
    -----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    municipalities_gdf : geopandas.GeoDataFrame
        GeoDataFrame with municipality data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data
        with the AGS ID added.
    """
    return buildings_gdf.sjoin(
        municipalities_gdf,
        how="left",
        predicate="intersects",
    ).rename(columns={"index_right": "ags"})


def drop_buildings_outside_muns(
    buildings_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop all buildings outside of municipalities.

    Parameters
    -----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data
        with buildings without an AGS ID dropped.
    """
    gdf = buildings_gdf.loc[~buildings_gdf.ags.isna()]

    logger.debug(
        f"{len(buildings_gdf) - len(gdf)} "
        f"({(len(buildings_gdf) - len(gdf)) / len(buildings_gdf) * 100:g}%) "
        f"of {len(buildings_gdf)} values are outside of the municipalities "
        "and are therefore dropped."
    )

    return gdf


def egon_building_peak_loads():
    """Get IDs of buildings with an electricity peak load in scenario eGon2035."""
    sql = """
    SELECT building_id
    FROM demand.egon_building_electricity_peak_loads
    WHERE scenario = 'eGon2035'
    """

    return (
        db.select_dataframe(sql).building_id.astype(int).sort_values().unique()
    )


@timer_func
def load_building_data():
    """
    Read buildings from the DB.

    Tables:

    * `openstreetmap.osm_buildings_filtered` (from OSM)
    * `openstreetmap.osm_buildings_synthetic` (synthetic, created by us)

    Use column `id` for both as it is unique; hence both datasets can be
    concatenated. If INCLUDE_SYNTHETIC_BUILDINGS is False, synthetic buildings
    will not be loaded.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with buildings without an
        AGS ID dropped.
    """
    municipalities_gdf = municipality_data()

    osm_buildings_gdf = osm_buildings(municipalities_gdf.crs)

    if INCLUDE_SYNTHETIC_BUILDINGS:
        synthetic_buildings_gdf = synthetic_buildings(municipalities_gdf.crs)

        buildings_gdf = gpd.GeoDataFrame(
            pd.concat(
                [
                    osm_buildings_gdf,
                    synthetic_buildings_gdf,
                ]
            ),
            geometry="geom",
            crs=osm_buildings_gdf.crs,
        ).rename(columns={"area": "building_area"})

        buildings_gdf.index = buildings_gdf.index.astype(int)

    else:
        buildings_gdf = osm_buildings_gdf.rename(
            columns={"area": "building_area"}
        )

    if ONLY_BUILDINGS_WITH_DEMAND:
        building_ids = egon_building_peak_loads()

        init_len = len(building_ids)

        building_ids = np.intersect1d(
            list(map(int, building_ids)),
            list(map(int, buildings_gdf.index.to_numpy())),
        )

        end_len = len(building_ids)

        logger.debug(
            f"{end_len / init_len * 100:g} % ({end_len} / {init_len}) of "
            "buildings have a peak load."
        )

        buildings_gdf = buildings_gdf.loc[building_ids]

    buildings_ags_gdf = add_ags_to_buildings(buildings_gdf, municipalities_gdf)

    logger.debug("Loaded buildings.")

    return drop_buildings_outside_muns(buildings_ags_gdf)


@timer_func
def sort_and_qcut_df(
    df: pd.DataFrame | gpd.GeoDataFrame,
    col: str,
    q: int,
) -> pd.DataFrame | gpd.GeoDataFrame:
    """
    Determine the quantile of a given attribute in a (Geo)DataFrame.

    Sort the (Geo)DataFrame in ascending order for the given attribute.

    Parameters
    -----------
    df : pandas.DataFrame or geopandas.GeoDataFrame
        (Geo)DataFrame to sort and qcut.
    col : str
        Name of the attribute to sort and qcut the (Geo)DataFrame on.
    q : int
        Number of quantiles.

    Returns
    -------
    pandas.DataFrame or geopandas.GeoDataFrame
        Sorted and qcut (Geo)DataFrame.
    """
    df = df.sort_values(col, ascending=True)

    return df.assign(
        quant=pd.qcut(
            df[col],
            q=q,
            labels=range(q),
        )
    )

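# Reviewer note: `pd.qcut` with `labels=range(q)` buckets the sorted values
# into q equally sized groups; a small sketch with made-up building areas:
#
#     >>> import pandas as pd
#     >>> areas = pd.Series([12.0, 25.0, 40.0, 80.0, 150.0, 400.0])
#     >>> pd.qcut(areas, q=3, labels=range(3)).tolist()
#     [0, 0, 1, 1, 2, 2]
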
@timer_func
def allocate_pv(
    q_mastr_gdf: gpd.GeoDataFrame,
    q_buildings_gdf: gpd.GeoDataFrame,
    seed: int,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """
    Allocate the MaStR PV generators to the OSM buildings.

    This determines a building for each PV generator if there are more
    buildings than generators within a given AGS. Generators are primarily
    distributed within the same quantile as the buildings. Multiple
    assignment is excluded.

    Parameters
    -----------
    q_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded and qcut MaStR data.
    q_buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing qcut OSM buildings data.
    seed : int
        Seed to use for random operations with NumPy and pandas.

    Returns
    -------
    tuple with two geopandas.GeoDataFrame s
        GeoDataFrame containing MaStR data allocated to building IDs.
        GeoDataFrame containing building data allocated to MaStR IDs.
    """
    rng = default_rng(seed=seed)

    q_buildings_gdf = q_buildings_gdf.assign(gens_id=np.nan)
    q_mastr_gdf = q_mastr_gdf.assign(building_id=np.nan)

    ags_list = q_buildings_gdf.ags.unique()

    if TEST_RUN:
        ags_list = ags_list[:250]

    num_ags = len(ags_list)

    t0 = perf_counter()

    for count, ags in enumerate(ags_list):

        buildings = q_buildings_gdf.loc[
            (q_buildings_gdf.ags == ags) & (q_buildings_gdf.gens_id.isna())
        ]
        gens = q_mastr_gdf.loc[
            (q_mastr_gdf.ags == ags) & (q_mastr_gdf.building_id.isna())
        ]

        len_build = len(buildings)
        len_gens = len(gens)

        if len_build < len_gens:
            gens = gens.sample(len_build, random_state=RandomState(seed=seed))
            logger.error(
                f"There are {len_gens} generators and only {len_build}"
                f" buildings in AGS {ags}. {len_gens - len(gens)} "
                "generators were truncated to match the amount of buildings."
            )

            assert len_build == len(gens)

        for quant in gens.quant.unique():
            q_buildings = buildings.loc[
                (buildings.quant == quant) & (buildings.gens_id.isna())
            ]
            q_gens = gens.loc[gens.quant == quant]

            len_build = len(q_buildings)
            len_gens = len(q_gens)

            if len_build < len_gens:
                delta = len_gens - len_build

                logger.warning(
                    f"There are {len_gens} generators and only {len_build} "
                    f"buildings in AGS {ags} and quantile {quant}. {delta} "
                    f"buildings from AGS {ags} will be added randomly."
                )

                add_buildings = pd.Index(
                    rng.choice(
                        buildings.loc[
                            (buildings.quant != quant)
                            & (buildings.gens_id.isna())
                        ].index,
                        size=delta,
                        replace=False,
                    )
                )

                q_buildings = buildings.loc[
                    q_buildings.index.append(add_buildings)
                ]

                assert len(q_buildings) == len_gens

            chosen_buildings = pd.Index(
                rng.choice(
                    q_buildings.index,
                    size=len_gens,
                    replace=False,
                )
            )

            # q_mastr_gdf.loc[q_gens.index, "building_id"] = chosen_buildings
            q_buildings_gdf.loc[chosen_buildings, "gens_id"] = q_gens.index

        if count % 100 == 0:
            logger.debug(
                f"Allocation of {count / num_ags * 100:g} % of AGS done. "
                f"It took {perf_counter() - t0:g} seconds."
            )

            t0 = perf_counter()

    assigned_buildings = q_buildings_gdf.loc[~q_buildings_gdf.gens_id.isna()]

    q_mastr_gdf.loc[
        assigned_buildings.gens_id, "building_id"
    ] = assigned_buildings.index

    logger.debug("Allocated status quo generators to buildings.")

    return frame_to_numeric(q_mastr_gdf), frame_to_numeric(q_buildings_gdf)


def frame_to_numeric(
    df: pd.DataFrame | gpd.GeoDataFrame,
) -> pd.DataFrame | gpd.GeoDataFrame:
    """
    Try to convert all columns of a DataFrame to numeric ignoring errors.

    Parameters
    ----------
    df : pandas.DataFrame or geopandas.GeoDataFrame

    Returns
    -------
    pandas.DataFrame or geopandas.GeoDataFrame
    """
    if str(df.index.dtype) == "object":
        df.index = pd.to_numeric(df.index, errors="ignore")

    for col in df.columns:
        if str(df[col].dtype) == "object":
            df[col] = pd.to_numeric(df[col], errors="ignore")

    return df


def validate_output(
    desagg_mastr_gdf: pd.DataFrame | gpd.GeoDataFrame,
    desagg_buildings_gdf: pd.DataFrame | gpd.GeoDataFrame,
) -> None:
    """
    Validate the output.

    * Validate that there are exactly as many buildings with a PV system as
      there are PV systems with a building
    * Validate that the building IDs with a PV system are the same building
      IDs as assigned to the PV systems
    * Validate that the PV system IDs with a building are the same PV system
      IDs as assigned to the buildings

    Parameters
    -----------
    desagg_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data allocated to building IDs.
    desagg_buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing building data allocated to MaStR IDs.
    """
    assert len(
        desagg_mastr_gdf.loc[~desagg_mastr_gdf.building_id.isna()]
    ) == len(desagg_buildings_gdf.loc[~desagg_buildings_gdf.gens_id.isna()])
    assert (
        np.sort(
            desagg_mastr_gdf.loc[
                ~desagg_mastr_gdf.building_id.isna()
            ].building_id.unique()
        )
        == np.sort(
            desagg_buildings_gdf.loc[
                ~desagg_buildings_gdf.gens_id.isna()
            ].index.unique()
        )
    ).all()
    assert (
        np.sort(
            desagg_mastr_gdf.loc[
                ~desagg_mastr_gdf.building_id.isna()
            ].index.unique()
        )
        == np.sort(
            desagg_buildings_gdf.loc[
                ~desagg_buildings_gdf.gens_id.isna()
            ].gens_id.unique()
        )
    ).all()

    logger.debug("Validated output.")


def drop_unallocated_gens(
    gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop generators which did not get allocated.

    Parameters
    -----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data allocated to building IDs.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data with unallocated generators
        dropped.
    """
    init_len = len(gdf)
    gdf = gdf.loc[~gdf.building_id.isna()]
    end_len = len(gdf)

    logger.debug(
        f"Dropped {init_len - end_len} "
        f"({((init_len - end_len) / init_len) * 100:g}%)"
        f" of {init_len} unallocated rows from MaStR DataFrame."
    )

    return gdf


@timer_func
def allocate_to_buildings(
    mastr_gdf: gpd.GeoDataFrame,
    buildings_gdf: gpd.GeoDataFrame,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """
    Allocate status quo PV rooftop generators to buildings.

    Parameters
    -----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data with geocoded locations.
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with buildings without an
        AGS ID dropped.

    Returns
    -------
    tuple with two geopandas.GeoDataFrame s
        GeoDataFrame containing MaStR data allocated to building IDs.
        GeoDataFrame containing building data allocated to MaStR IDs.
    """
    logger.debug("Starting allocation of status quo.")

    q_mastr_gdf = sort_and_qcut_df(mastr_gdf, col="capacity", q=Q)
    q_buildings_gdf = sort_and_qcut_df(buildings_gdf, col="building_area", q=Q)

    desagg_mastr_gdf, desagg_buildings_gdf = allocate_pv(
        q_mastr_gdf, q_buildings_gdf, SEED
    )

    validate_output(desagg_mastr_gdf, desagg_buildings_gdf)

    return drop_unallocated_gens(desagg_mastr_gdf), desagg_buildings_gdf


@timer_func
def grid_districts(
    epsg: int,
) -> gpd.GeoDataFrame:
    """
    Load MV grid district geo data from the eGo^n Database as a
    geopandas.GeoDataFrame.

    Parameters
    -----------
    epsg : int
        EPSG ID to use as CRS.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing MV grid district IDs and geo shapes data.
    """
    gdf = db.select_geodataframe(
        """
        SELECT bus_id, geom
        FROM grid.egon_mv_grid_district
        ORDER BY bus_id
        """,
        index_col="bus_id",
        geom_col="geom",
        epsg=epsg,
    )

    gdf.index = gdf.index.astype(int)

    logger.debug("Grid districts loaded.")

    return gdf


def scenario_data(
    carrier: str = "solar_rooftop",
    scenario: str = "eGon2035",
) -> pd.DataFrame:
    """
    Get scenario capacity data from the eGo^n Database.

    Parameters
    -----------
    carrier : str
        Carrier type to filter table by.
    scenario : str
        Scenario to filter table by.

    Returns
    -------
    pandas.DataFrame
        DataFrame with scenario capacity data in GW.
    """
    with db.session_scope() as session:
        query = session.query(EgonScenarioCapacities).filter(
            EgonScenarioCapacities.carrier == carrier,
            EgonScenarioCapacities.scenario_name == scenario,
        )

    df = pd.read_sql(
        query.statement, query.session.bind, index_col="index"
    ).sort_index()

    logger.debug("Scenario capacity data loaded.")

    return df


class Vg250Lan(Base):
    __tablename__ = "vg250_lan"
    __table_args__ = {"schema": "boundaries"}

    id = Column(BigInteger, primary_key=True, index=True)
    ade = Column(BigInteger)
    gf = Column(BigInteger)
    bsg = Column(BigInteger)
    ars = Column(String)
    ags = Column(String)
    sdv_ars = Column(String)
    gen = Column(String)
    bez = Column(String)
    ibz = Column(BigInteger)
    bem = Column(String)
    nbd = Column(String)
    sn_l = Column(String)
    sn_r = Column(String)
    sn_k = Column(String)
    sn_v1 = Column(String)
    sn_v2 = Column(String)
    sn_g = Column(String)
    fk_s3 = Column(String)
    nuts = Column(String)
    ars_0 = Column(String)
    ags_0 = Column(String)
    wsk = Column(String)
    debkg_id = Column(String)
    rs = Column(String)
    sdv_rs = Column(String)
    rs_0 = Column(String)
    geometry = Column(Geometry(srid=EPSG), index=True)


def federal_state_data(to_crs: CRS) -> gpd.GeoDataFrame:
    """
    Get federal state data from the eGo^n Database.

    Parameters
    -----------
    to_crs : pyproj.crs.crs.CRS
        CRS to transform geometries to.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with federal state data.
    """
    with db.session_scope() as session:
        query = session.query(
            Vg250Lan.id, Vg250Lan.nuts, Vg250Lan.geometry.label("geom")
        )

    gdf = gpd.read_postgis(
        query.statement, query.session.bind, index_col="id"
    ).to_crs(to_crs)

    logger.debug("Federal State data loaded.")

    return gdf


@timer_func
def overlay_grid_districts_with_counties(
    mv_grid_district_gdf: gpd.GeoDataFrame,
    federal_state_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Calculate the intersections of MV grid districts and counties.

    Parameters
    -----------
    mv_grid_district_gdf : gpd.GeoDataFrame
        GeoDataFrame containing MV grid district IDs and geo shapes data.
    federal_state_gdf : gpd.GeoDataFrame
        GeoDataFrame with federal state data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing the intersection shapes between MV grid
        districts and counties.
    """
    logger.debug(
        "Calculating intersection overlay between mv grid districts and "
        "counties. This may take a while..."
    )

    gdf = gpd.overlay(
        federal_state_gdf.to_crs(mv_grid_district_gdf.crs),
        mv_grid_district_gdf.reset_index(),
        how="intersection",
        keep_geom_type=True,
    )

    logger.debug("Done!")

    return gdf


@timer_func
def add_overlay_id_to_buildings(
    buildings_gdf: gpd.GeoDataFrame,
    grid_federal_state_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add information about the overlay ID to the buildings.

    Parameters
    -----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    grid_federal_state_gdf : geopandas.GeoDataFrame
        GeoDataFrame with intersection shapes between counties and grid
        districts.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with the overlay ID added.
    """
    gdf = (
        buildings_gdf.to_crs(grid_federal_state_gdf.crs)
        .sjoin(
            grid_federal_state_gdf,
            how="left",
            predicate="intersects",
        )
        .rename(columns={"index_right": "overlay_id"})
    )

    logger.debug("Added overlay ID to OSM buildings.")

    return gdf


def drop_buildings_outside_grids(
    buildings_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop all buildings outside of the grid areas.

    Parameters
    -----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data
        with buildings without a bus ID dropped.
    """
    gdf = buildings_gdf.loc[~buildings_gdf.bus_id.isna()]

    logger.debug(
        f"{len(buildings_gdf) - len(gdf)} "
        f"({(len(buildings_gdf) - len(gdf)) / len(buildings_gdf) * 100:g}%) "
        f"of {len(buildings_gdf)} values are outside of the grid areas "
        "and are therefore dropped."
    )

    return gdf


def cap_per_bus_id(
    scenario: str,
) -> pd.DataFrame:
    """
    Get table with the total PV rooftop capacity per grid district.

    Parameters
    -----------
    scenario : str
        Scenario name.

    Returns
    -------
    pandas.DataFrame
        DataFrame with the total rooftop capacity per MV grid.
    """
    targets = config.datasets()["solar_rooftop"]["targets"]

    sql = f"""
    SELECT bus as bus_id, p_nom as capacity
    FROM {targets['generators']['schema']}.{targets['generators']['table']}
    WHERE carrier = 'solar_rooftop'
    AND scn_name = '{scenario}'
    """

    return db.select_dataframe(sql, index_col="bus_id")

    # overlay_gdf = overlay_gdf.assign(capacity=np.nan)
    #
    # for cap, nuts in scenario_df[["capacity", "nuts"]].itertuples(index=False):
    #     nuts_gdf = overlay_gdf.loc[overlay_gdf.nuts == nuts]
    #
    #     capacity = nuts_gdf.building_area.multiply(
    #         cap / nuts_gdf.building_area.sum()
    #     )
    #
    #     overlay_gdf.loc[nuts_gdf.index] = overlay_gdf.loc[
    #         nuts_gdf.index
    #     ].assign(capacity=capacity.multiply(conversion).to_numpy())
    #
    # return overlay_gdf[["bus_id", "capacity"]].groupby("bus_id").sum()


def determine_end_of_life_gens(
    mastr_gdf: gpd.GeoDataFrame,
    scenario_timestamp: pd.Timestamp,
    pv_rooftop_lifetime: pd.Timedelta,
) -> gpd.GeoDataFrame:
    """
    Determine if an old PV system has reached its end of life.

    Parameters
    -----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    scenario_timestamp : pandas.Timestamp
        Timestamp at which the scenario takes place.
    pv_rooftop_lifetime : pandas.Timedelta
        Average expected lifetime of PV rooftop systems.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data and info if the system
        has reached its end of life.
    """
    mastr_gdf = mastr_gdf.assign(
        age=scenario_timestamp - mastr_gdf.start_up_date
    )

    logger.debug("Determined if pv rooftop systems reached their end of life.")

    return mastr_gdf.assign(end_of_life=pv_rooftop_lifetime < mastr_gdf.age)


def calculate_max_pv_cap_per_building(
    buildings_gdf: gpd.GeoDataFrame,
    mastr_gdf: gpd.GeoDataFrame,
    pv_cap_per_sq_m: float | int,
    roof_factor: float | int,
) -> gpd.GeoDataFrame:
    """
    Calculate the estimated maximum possible PV capacity per building.

    Parameters
    -----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    pv_cap_per_sq_m : float or int
        Average expected installable PV capacity per square meter.
    roof_factor : float or int
        Average share of the building area that is usable for PV.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with the estimated maximum
        PV capacity.
    """
    gdf = (
        buildings_gdf.reset_index()
        .merge(
            mastr_gdf[
                [
                    "capacity",
                    "end_of_life",
                    "building_id",
                    "EinheitlicheAusrichtungUndNeigungswinkel",
                    "Hauptausrichtung",
                    "HauptausrichtungNeigungswinkel",
                ]
            ],
            how="left",
            left_on="id",
            right_on="building_id",
        )
        .set_index("id")
        .drop(columns="building_id")
    )

    return gdf.assign(
        max_cap=gdf.building_area.multiply(roof_factor * pv_cap_per_sq_m),
        end_of_life=gdf.end_of_life.fillna(True).astype(bool),
        bus_id=gdf.bus_id.astype(int),
    )

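# Reviewer note: with the module constants this yields, for a (made-up)
# 100 m² building:
#
#     max_cap = 100 * ROOF_FACTOR * PV_CAP_PER_SQ_M
#             = 100 * 0.5 * (0.4 / (1.096 * 1.754))
#             ≈ 10.4 kWp
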
def calculate_building_load_factor(
    mastr_gdf: gpd.GeoDataFrame,
    buildings_gdf: gpd.GeoDataFrame,
    rounding: int = 4,
) -> gpd.GeoDataFrame:
    """
    Calculate the roof load factor from existing PV systems.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    rounding : int
        Rounding to use for the load factor.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data with calculated load
        factor.
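
    Examples
    --------
    The load factor relates the reported capacity to the estimated roof
    potential of the matched building (illustrative numbers):

    >>> capacity = 6.8  # kW, reported in MaStR
    >>> max_cap = 8.5   # kW, estimated for the matched building
    >>> round(capacity / max_cap, 4)
    0.8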
    """
    gdf = mastr_gdf.merge(
        buildings_gdf[["max_cap", "building_area"]].loc[
            ~buildings_gdf["max_cap"].isna()
        ],
        how="left",
        left_on="building_id",
        right_index=True,
    )

    return gdf.assign(load_factor=(gdf.capacity / gdf.max_cap).round(rounding))


def get_probability_for_property(
    mastr_gdf: gpd.GeoDataFrame,
    cap_range: tuple[int | float, int | float],
    prop: str,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Calculate the probability of the different options of a property of the
    existing PV plants.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_range : tuple(int, int)
        Capacity range of PV plants to look at.
    prop : str
        Property to calculate probabilities for. The string needs to be a
        column of mastr_gdf.

    Returns
    -------
    tuple
        numpy.ndarray
            Unique values of the property.
        numpy.ndarray
            Probabilities per unique value.
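
    Examples
    --------
    A minimal sketch of the counting and normalisation step with synthetic
    orientation values (the values are assumptions):

    >>> from collections import Counter
    >>> import numpy as np
    >>> count = Counter(["South", "South", "East", "West"])
    >>> values = np.array(list(count.keys()))
    >>> probabilities = np.fromiter(count.values(), dtype=float)
    >>> (probabilities / probabilities.sum()).tolist()
    [0.5, 0.25, 0.25]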
    """
    cap_range_gdf = mastr_gdf.loc[
        (mastr_gdf.capacity > cap_range[0])
        & (mastr_gdf.capacity <= cap_range[1])
    ]

    if prop == "load_factor":
        cap_range_gdf = cap_range_gdf.loc[cap_range_gdf[prop] <= 1]

    count = Counter(
        cap_range_gdf[prop].loc[
            ~cap_range_gdf[prop].isna()
            & ~(cap_range_gdf[prop] == "None")
        ]
    )

    values = np.array(list(count.keys()))
    probabilities = np.fromiter(count.values(), dtype=float)
    probabilities = probabilities / np.sum(probabilities)

    return values, probabilities


@timer_func
1768
def probabilities(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
    properties: list[str] | None = None,
) -> dict:
    """
    Calculate the probability of the different options of properties of the
    existing PV plants.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple
        should start with zero and the last one should end with infinity.
    properties : list(str)
        List of properties to calculate probabilities for. Each string needs
        to be a column of mastr_gdf.

    Returns
    -------
    dict
        Dictionary with values and probabilities per capacity range.
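
    Examples
    --------
    Sketch of the shape of the returned dictionary, shown for a single,
    assumed capacity range and property:

    >>> prob_dict = {
    ...     (0, 30): {
    ...         "values": {"load_factor": [0.5, 0.8]},
    ...         "probabilities": {"load_factor": [0.75, 0.25]},
    ...     }
    ... }
    >>> sorted(prob_dict[(0, 30)])
    ['probabilities', 'values']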
    """
    if cap_ranges is None:
        cap_ranges = [
            (0, 30),
            (30, 100),
            (100, float("inf")),
        ]
    if properties is None:
        properties = [
            "EinheitlicheAusrichtungUndNeigungswinkel",
            "Hauptausrichtung",
            "HauptausrichtungNeigungswinkel",
            "load_factor",
        ]

    prob_dict = {}

    for cap_range in cap_ranges:
        prob_dict[cap_range] = {
            "values": {},
            "probabilities": {},
        }

        for prop in properties:
            v, p = get_probability_for_property(
                mastr_gdf,
                cap_range,
                prop,
            )

            prob_dict[cap_range]["values"][prop] = v
            prob_dict[cap_range]["probabilities"][prop] = p

    return prob_dict


def cap_share_per_cap_range(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
) -> dict[tuple[int | float, int | float], float]:
    """
    Calculate the share of PV capacity from the total PV capacity within
    capacity ranges.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple
        should start with zero and the last one should end with infinity.

    Returns
    -------
    dict
        Dictionary with share of PV capacity from the total PV capacity
        within capacity ranges.
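
    Examples
    --------
    The share for one assumed range, computed on synthetic capacities:

    >>> import pandas as pd
    >>> caps = pd.Series([10.0, 20.0, 70.0])  # kW
    >>> cap_range = (0, 30)
    >>> in_range = caps[(caps > cap_range[0]) & (caps <= cap_range[1])]
    >>> float(in_range.sum() / caps.sum())
    0.3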
    """
    if cap_ranges is None:
        cap_ranges = [
            (0, 30),
            (30, 100),
            (100, float("inf")),
        ]

    cap_share_dict = {}

    total_cap = mastr_gdf.capacity.sum()

    for cap_range in cap_ranges:
        cap_share = (
            mastr_gdf.loc[
                (mastr_gdf.capacity > cap_range[0])
                & (mastr_gdf.capacity <= cap_range[1])
            ].capacity.sum()
            / total_cap
        )

        cap_share_dict[cap_range] = cap_share

    return cap_share_dict


def mean_load_factor_per_cap_range(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
) -> dict[tuple[int | float, int | float], float]:
    """
    Calculate the mean roof load factor per capacity range from existing PV
    plants.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple
        should start with zero and the last one should end with infinity.

    Returns
    -------
    dict
        Dictionary with mean roof load factor per capacity range.
    """
    if cap_ranges is None:
        cap_ranges = [
            (0, 30),
            (30, 100),
            (100, float("inf")),
        ]

    load_factor_dict = {}

    for cap_range in cap_ranges:
        load_factor = mastr_gdf.loc[
            (mastr_gdf.load_factor <= 1)
            & (mastr_gdf.capacity > cap_range[0])
            & (mastr_gdf.capacity <= cap_range[1])
        ].load_factor.mean()

        load_factor_dict[cap_range] = load_factor

    return load_factor_dict


def building_area_range_per_cap_range(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
    min_building_size: int | float = 10.0,
    upper_quantile: float = 0.95,
    lower_quantile: float = 0.05,
) -> dict[tuple[int | float, int | float], tuple[int | float, int | float]]:
    """
    Estimate the normal building area range per capacity range.

    The range is estimated from the lower and upper quantile of the building
    areas of existing PV plants within each capacity range. Adjacent range
    boundaries are then smoothed by averaging, so the resulting ranges do
    not overlap.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple
        should start with zero and the last one should end with infinity.
    min_building_size : int, float
        Minimal building size to consider for PV plants.
    upper_quantile : float
        Upper quantile to estimate the maximum building size per capacity
        range.
    lower_quantile : float
        Lower quantile to estimate the minimum building size per capacity
        range.

    Returns
    -------
    dict
        Dictionary with the estimated normal building area range per
        capacity range.
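
    Examples
    --------
    Sketch of the boundary smoothing only, with assumed quantile results
    for two neighbouring capacity ranges:

    >>> import numpy as np
    >>> raw = {(0, 30): (10.0, 300.0), (30, 100): (200.0, 1000.0)}
    >>> # the overlapping boundaries 300 and 200 collapse to their mean:
    >>> float(np.mean((raw[(30, 100)][0], raw[(0, 30)][1])))
    250.0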
    """
    if cap_ranges is None:
        cap_ranges = [
            (0, 30),
            (30, 100),
            (100, float("inf")),
        ]

    building_area_range_dict = {}

    n_ranges = len(cap_ranges)

    for count, cap_range in enumerate(cap_ranges):
        cap_range_gdf = mastr_gdf.loc[
            (mastr_gdf.capacity > cap_range[0])
            & (mastr_gdf.capacity <= cap_range[1])
        ]

        if count == 0:
            building_area_range_dict[cap_range] = (
                min_building_size,
                cap_range_gdf.building_area.quantile(upper_quantile),
            )
        elif count == n_ranges - 1:
            building_area_range_dict[cap_range] = (
                cap_range_gdf.building_area.quantile(lower_quantile),
                float("inf"),
            )
        else:
            building_area_range_dict[cap_range] = (
                cap_range_gdf.building_area.quantile(lower_quantile),
                cap_range_gdf.building_area.quantile(upper_quantile),
            )

    values = list(building_area_range_dict.values())

    building_area_range_normed_dict = {}

    for count, (cap_range, (min_area, max_area)) in enumerate(
        building_area_range_dict.items()
    ):
        if count == 0:
            building_area_range_normed_dict[cap_range] = (
                min_area,
                np.mean((values[count + 1][0], max_area)),
            )
        elif count == n_ranges - 1:
            building_area_range_normed_dict[cap_range] = (
                np.mean((values[count - 1][1], min_area)),
                max_area,
            )
        else:
            building_area_range_normed_dict[cap_range] = (
                np.mean((values[count - 1][1], min_area)),
                np.mean((values[count + 1][0], max_area)),
            )

    return building_area_range_normed_dict


@timer_func
2001
def desaggregate_pv_in_mv_grid(
    buildings_gdf: gpd.GeoDataFrame,
    pv_cap: float | int,
    **kwargs,
) -> gpd.GeoDataFrame:
    """
    Desaggregate PV capacity on buildings within a given grid district.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing buildings within the grid district.
    pv_cap : float, int
        PV capacity to desaggregate.

    Other Parameters
    ----------------
    prob_dict : dict
        Dictionary with values and probabilities per capacity range.
    cap_share_dict : dict
        Dictionary with share of PV capacity from the total PV capacity
        within capacity ranges.
    building_area_range_dict : dict
        Dictionary with estimated normal building area range per capacity
        range.
    load_factor_dict : dict
        Dictionary with mean roof load factor per capacity range.
    seed : int
        Seed to use for random operations with NumPy and pandas.
    pv_cap_per_sq_m : float, int
        Average expected installable PV capacity per square meter.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with desaggregated PV
        plants.
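
    Examples
    --------
    Sketch of the final rescaling step: capacities drawn for the sampled
    buildings are scaled so their sum hits the target exactly (synthetic
    numbers):

    >>> import pandas as pd
    >>> sampled = pd.Series([3.0, 5.0])  # kW drawn from the MaStR breakdown
    >>> target = 10.0                    # kW to place in this capacity range
    >>> (sampled * (target / sampled.sum())).tolist()
    [3.75, 6.25]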
    """
    bus_id = int(buildings_gdf.bus_id.iat[0])

    rng = default_rng(seed=kwargs["seed"])
    random_state = RandomState(seed=kwargs["seed"])

    results_df = pd.DataFrame(columns=buildings_gdf.columns)

    for cap_range, share in kwargs["cap_share_dict"].items():
        pv_cap_range = pv_cap * share

        b_area_min, b_area_max = kwargs["building_area_range_dict"][cap_range]

        cap_range_buildings_gdf = buildings_gdf.loc[
            ~buildings_gdf.index.isin(results_df.index)
            & (buildings_gdf.building_area > b_area_min)
            & (buildings_gdf.building_area <= b_area_max)
        ]

        mean_load_factor = kwargs["load_factor_dict"][cap_range]
        cap_range_buildings_gdf = cap_range_buildings_gdf.assign(
            mean_cap=cap_range_buildings_gdf.max_cap * mean_load_factor,
            load_factor=np.nan,
            capacity=np.nan,
        )

        total_mean_cap = cap_range_buildings_gdf.mean_cap.sum()

        if total_mean_cap == 0:
            logger.warning(
                f"There are no matching roofs for capacity range {cap_range} "
                f"kW in grid {bus_id}. Using all buildings as fallback."
            )

            cap_range_buildings_gdf = buildings_gdf.loc[
                ~buildings_gdf.index.isin(results_df.index)
            ]

            if len(cap_range_buildings_gdf) == 0:
                logger.warning(
                    "There are no roofs available for capacity range "
                    f"{cap_range} kW in grid {bus_id}. Allowing dual use."
                )
                cap_range_buildings_gdf = buildings_gdf.copy()

            cap_range_buildings_gdf = cap_range_buildings_gdf.assign(
                mean_cap=cap_range_buildings_gdf.max_cap * mean_load_factor,
                load_factor=np.nan,
                capacity=np.nan,
            )

            total_mean_cap = cap_range_buildings_gdf.mean_cap.sum()

        elif total_mean_cap < pv_cap_range:
            logger.warning(
                f"Average roof utilization of the roof area in grid {bus_id} "
                f"and capacity range {cap_range} kW is not sufficient. The "
                "roof utilization will be above average."
            )

        frac = max(
            pv_cap_range / total_mean_cap,
            1 / len(cap_range_buildings_gdf),
        )

        samples_gdf = cap_range_buildings_gdf.sample(
            frac=min(1, frac),
            random_state=random_state,
        )

        cap_range_dict = kwargs["prob_dict"][cap_range]

        values_dict = cap_range_dict["values"]
        p_dict = cap_range_dict["probabilities"]

        load_factors = rng.choice(
            a=values_dict["load_factor"],
            size=len(samples_gdf),
            p=p_dict["load_factor"],
        )

        samples_gdf = samples_gdf.assign(
            load_factor=load_factors,
            capacity=(
                samples_gdf.building_area
                * load_factors
                * kwargs["pv_cap_per_sq_m"]
            ).clip(lower=0.4),
        )

        missing_factor = pv_cap_range / samples_gdf.capacity.sum()

        samples_gdf = samples_gdf.assign(
            capacity=(samples_gdf.capacity * missing_factor),
            load_factor=(samples_gdf.load_factor * missing_factor),
        )

        assert np.isclose(
            samples_gdf.capacity.sum(),
            pv_cap_range,
            rtol=1e-03,
        ), f"{samples_gdf.capacity.sum()} != {pv_cap_range}"

        results_df = pd.concat(
            [
                results_df,
                samples_gdf,
            ],
        )

    total_missing_factor = pv_cap / results_df.capacity.sum()

    results_df = results_df.assign(
        capacity=(results_df.capacity * total_missing_factor),
    )

    assert np.isclose(
        results_df.capacity.sum(),
        pv_cap,
        rtol=1e-03,
    ), f"{results_df.capacity.sum()} != {pv_cap}"

    return gpd.GeoDataFrame(
        results_df,
        crs=buildings_gdf.crs,
        geometry="geom",
    )


@timer_func
2165
def desaggregate_pv(
    buildings_gdf: gpd.GeoDataFrame,
    cap_df: pd.DataFrame,
    **kwargs,
) -> gpd.GeoDataFrame:
    """
    Desaggregate PV capacity on buildings within all grid districts.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    cap_df : pandas.DataFrame
        DataFrame with total rooftop capacity per mv grid.

    Other Parameters
    ----------------
    prob_dict : dict
        Dictionary with values and probabilities per capacity range.
    cap_share_dict : dict
        Dictionary with share of PV capacity from the total PV capacity
        within capacity ranges.
    building_area_range_dict : dict
        Dictionary with estimated normal building area range per capacity
        range.
    load_factor_dict : dict
        Dictionary with mean roof load factor per capacity range.
    seed : int
        Seed to use for random operations with NumPy and pandas.
    pv_cap_per_sq_m : float, int
        Average expected installable PV capacity per square meter.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with desaggregated PV
        plants.
    """
    allocated_buildings_gdf = buildings_gdf.loc[~buildings_gdf.end_of_life]

    building_bus_ids = set(buildings_gdf.bus_id)
    cap_bus_ids = set(cap_df.index)

    logger.debug(
        f"Bus IDs from buildings: {len(building_bus_ids)}\nBus IDs from "
        f"capacity: {len(cap_bus_ids)}"
    )

    if len(building_bus_ids) > len(cap_bus_ids):
        missing = building_bus_ids - cap_bus_ids
    else:
        missing = cap_bus_ids - building_bus_ids

    logger.debug(str(missing))

    # assert set(buildings_gdf.bus_id.unique()) == set(cap_df.index)

    for bus_id in buildings_gdf.bus_id.unique():
        buildings_grid_gdf = buildings_gdf.loc[buildings_gdf.bus_id == bus_id]

        pv_installed_gdf = buildings_grid_gdf.loc[
            ~buildings_grid_gdf.end_of_life
        ]

        pv_installed = pv_installed_gdf.capacity.sum()

        pot_buildings_gdf = buildings_grid_gdf.drop(
            index=pv_installed_gdf.index
        )

        if len(pot_buildings_gdf) == 0:
            logger.error(
                f"In grid {bus_id} there are no potential buildings to "
                "allocate PV capacity to. The grid is skipped. This message "
                "should only appear during test runs with few buildings."
            )

            continue

        pv_target = cap_df.at[bus_id, "capacity"] * 1000

        logger.debug(f"pv_target: {pv_target}")

        pv_missing = pv_target - pv_installed

        if pv_missing <= 0:
            logger.info(
                f"In grid {bus_id} there is more PV installed "
                f"({pv_installed:g} kW) in the status quo than allocated "
                f"within the scenario ({pv_target:g} kW). No new generators "
                "are added."
            )

            continue

        if pot_buildings_gdf.max_cap.sum() < pv_missing:
            logger.error(
                f"In grid {bus_id} there is less PV potential ("
                f"{pot_buildings_gdf.max_cap.sum():g} kW) than allocated PV "
                f"capacity ({pv_missing:g} kW). The average roof utilization "
                "will be very high."
            )

        gdf = desaggregate_pv_in_mv_grid(
            buildings_gdf=pot_buildings_gdf,
            pv_cap=pv_missing,
            **kwargs,
        )

        logger.debug(f"New cap in grid {bus_id}: {gdf.capacity.sum()}")
        logger.debug(f"Installed cap in grid {bus_id}: {pv_installed}")
        logger.debug(
            f"Total cap in grid {bus_id}: {gdf.capacity.sum() + pv_installed}"
        )

        if not np.isclose(
            gdf.capacity.sum() + pv_installed, pv_target, rtol=1e-3
        ):
            logger.warning(
                f"The desired capacity and actual capacity in grid {bus_id} "
                f"differ.\nDesired cap: {pv_target}\nActual cap: "
                f"{gdf.capacity.sum() + pv_installed}"
            )

        allocated_buildings_gdf = pd.concat(
            [
                allocated_buildings_gdf,
                gdf,
            ]
        )

    logger.debug("Desaggregated scenario.")
    logger.debug(f"Scenario capacity: {cap_df.capacity.sum():g}")
    logger.debug(
        f"Generator capacity: {allocated_buildings_gdf.capacity.sum():g}"
    )

    return gpd.GeoDataFrame(
        allocated_buildings_gdf,
        crs=buildings_gdf.crs,
        geometry="geom",
    )


@timer_func
2305
def add_buildings_meta_data(
    buildings_gdf: gpd.GeoDataFrame,
    prob_dict: dict,
    seed: int,
) -> gpd.GeoDataFrame:
    """
    Randomly add additional metadata to desaggregated PV plants.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with desaggregated PV
        plants.
    prob_dict : dict
        Dictionary with values and probabilities per capacity range.
    seed : int
        Seed to use for random operations with NumPy and pandas.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with desaggregated PV
        plants.
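
    Examples
    --------
    Sketch of the weighted draw used to fill missing values; a degenerate
    probability vector keeps the doctest deterministic (the orientation
    values are assumptions):

    >>> from numpy.random import default_rng
    >>> rng = default_rng(seed=0)
    >>> rng.choice(a=["South", "East"], size=3, p=[1.0, 0.0]).tolist()
    ['South', 'South', 'South']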
    """
    rng = default_rng(seed=seed)
    buildings_gdf = buildings_gdf.reset_index().rename(
        columns={
            "index": "building_id",
        }
    )

    for (min_cap, max_cap), cap_range_prob_dict in prob_dict.items():
        cap_range_gdf = buildings_gdf.loc[
            (buildings_gdf.capacity >= min_cap)
            & (buildings_gdf.capacity < max_cap)
        ]

        for key, values in cap_range_prob_dict["values"].items():
            if key == "load_factor":
                continue

            gdf = cap_range_gdf.loc[
                cap_range_gdf[key].isna()
                | (cap_range_gdf[key] == "None")
            ]

            key_vals = rng.choice(
                a=values,
                size=len(gdf),
                p=cap_range_prob_dict["probabilities"][key],
            )

            buildings_gdf.loc[gdf.index, key] = key_vals

    return buildings_gdf


def add_voltage_level(
    buildings_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add voltage level derived from generator capacity to the power plants.
    The generator capacity is assumed to be given in kW.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with desaggregated PV
        plants.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with voltage level per
        generator.
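
    Examples
    --------
    An equivalent sketch of the nested threshold helper used below
    (capacities in kW):

    >>> def level(p: float) -> int:
    ...     thresholds = [(100, 7), (200, 6), (5500, 5), (20000, 4), (120000, 3)]
    ...     return next((lvl for limit, lvl in thresholds if p < limit), 1)
    >>> [level(p) for p in (30, 150, 1000, 10_000, 100_000, 200_000)]
    [7, 6, 5, 4, 3, 1]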
    """

    def voltage_levels(p: float) -> int:
        if p < 100:
            return 7
        elif p < 200:
            return 6
        elif p < 5500:
            return 5
        elif p < 20000:
            return 4
        elif p < 120000:
            return 3
        return 1

    return buildings_gdf.assign(
        voltage_level=buildings_gdf.capacity.apply(voltage_levels)
    )


def add_start_up_date(
    buildings_gdf: gpd.GeoDataFrame,
    start: pd.Timestamp,
    end: pd.Timestamp,
    seed: int,
):
    """
    Randomly add a start-up date, drawn uniformly from the given date range,
    to new PV generators.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with desaggregated PV
        plants.
    start : pandas.Timestamp
        Minimum Timestamp to use.
    end : pandas.Timestamp
        Maximum Timestamp to use.
    seed : int
        Seed to use for random operations with NumPy and pandas.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with start-up date added.
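
    Examples
    --------
    Sketch of the uniform draw over a daily date range; the drawn dates
    depend on the seed, so only seed-independent facts are checked:

    >>> import pandas as pd
    >>> from numpy.random import default_rng
    >>> dr = pd.date_range("2035-01-01", "2035-01-10", freq="1D")
    >>> len(dr)
    10
    >>> picked = default_rng(seed=0).choice(dr.to_numpy(), size=3)
    >>> all(dr[0] <= pd.Timestamp(t) <= dr[-1] for t in picked)
    True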
    """
    rng = default_rng(seed=seed)

    date_range = pd.date_range(start=start, end=end, freq="1D")

    return buildings_gdf.assign(
        start_up_date=rng.choice(date_range, size=len(buildings_gdf))
    )


@timer_func
2429
def allocate_scenarios(
    mastr_gdf: gpd.GeoDataFrame,
    buildings_gdf: gpd.GeoDataFrame,
    last_scenario_gdf: gpd.GeoDataFrame,
    scenario: str,
):
    """
    Desaggregate and allocate scenario pv rooftop ramp-ups onto buildings.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    last_scenario_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings matched with pv generators
        from the temporally preceding scenario.
    scenario : str
        Scenario to desaggregate and allocate.

    Returns
    -------
    tuple
        geopandas.GeoDataFrame
            GeoDataFrame containing OSM buildings matched with pv
            generators.
        pandas.DataFrame
            DataFrame containing pv rooftop capacity per grid id.
    """
    grid_districts_gdf = grid_districts(EPSG)

    federal_state_gdf = federal_state_data(grid_districts_gdf.crs)

    grid_federal_state_gdf = overlay_grid_districts_with_counties(
        grid_districts_gdf,
        federal_state_gdf,
    )

    buildings_overlay_gdf = add_overlay_id_to_buildings(
        buildings_gdf,
        grid_federal_state_gdf,
    )

    valid_buildings_gdf = drop_buildings_outside_grids(buildings_overlay_gdf)

    cap_per_bus_id_df = cap_per_bus_id(scenario)

    logger.debug(
        f"cap_per_bus_id_df total capacity: {cap_per_bus_id_df.capacity.sum()}"
    )

    last_scenario_gdf = determine_end_of_life_gens(
        last_scenario_gdf,
        SCENARIO_TIMESTAMP[scenario],
        PV_ROOFTOP_LIFETIME,
    )

    buildings_gdf = calculate_max_pv_cap_per_building(
        valid_buildings_gdf,
        last_scenario_gdf,
        PV_CAP_PER_SQ_M,
        ROOF_FACTOR,
    )

    mastr_gdf = calculate_building_load_factor(
        mastr_gdf,
        buildings_gdf,
    )

    probabilities_dict = probabilities(
        mastr_gdf,
        cap_ranges=CAP_RANGES,
    )

    cap_share_dict = cap_share_per_cap_range(
        mastr_gdf,
        cap_ranges=CAP_RANGES,
    )

    load_factor_dict = mean_load_factor_per_cap_range(
        mastr_gdf,
        cap_ranges=CAP_RANGES,
    )

    building_area_range_dict = building_area_range_per_cap_range(
        mastr_gdf,
        cap_ranges=CAP_RANGES,
        min_building_size=MIN_BUILDING_SIZE,
        upper_quantile=UPPER_QUNATILE,
        lower_quantile=LOWER_QUANTILE,
    )

    allocated_buildings_gdf = desaggregate_pv(
        buildings_gdf=buildings_gdf,
        cap_df=cap_per_bus_id_df,
        prob_dict=probabilities_dict,
        cap_share_dict=cap_share_dict,
        building_area_range_dict=building_area_range_dict,
        load_factor_dict=load_factor_dict,
        seed=SEED,
        pv_cap_per_sq_m=PV_CAP_PER_SQ_M,
    )

    allocated_buildings_gdf = allocated_buildings_gdf.assign(scenario=scenario)

    meta_buildings_gdf = frame_to_numeric(
        add_buildings_meta_data(
            allocated_buildings_gdf,
            probabilities_dict,
            SEED,
        )
    )

    return (
        add_start_up_date(
            meta_buildings_gdf,
            start=last_scenario_gdf.start_up_date.max(),
            end=SCENARIO_TIMESTAMP[scenario],
            seed=SEED,
        ),
        cap_per_bus_id_df,
    )


class EgonPowerPlantPvRoofBuildingScenario(Base):
    __tablename__ = "egon_power_plants_pv_roof_building"
    __table_args__ = {"schema": "supply"}

    index = Column(Integer, primary_key=True, index=True)
    scenario = Column(String)
    building_id = Column(Integer)
    gens_id = Column(String, nullable=True)
    capacity = Column(Float)
    einheitliche_ausrichtung_und_neigungswinkel = Column(Float)
    hauptausrichtung = Column(String)
    hauptausrichtung_neigungswinkel = Column(String)
    voltage_level = Column(Integer)


def create_scenario_table(buildings_gdf):
    """Create mapping table pv_unit <-> building for scenario"""
    EgonPowerPlantPvRoofBuildingScenario.__table__.drop(
        bind=engine, checkfirst=True
    )
    EgonPowerPlantPvRoofBuildingScenario.__table__.create(
        bind=engine, checkfirst=True
    )

    buildings_gdf.rename(columns=COLS_TO_RENAME).assign(
        capacity=buildings_gdf.capacity.div(10**3)  # kW -> MW
    )[COLS_TO_EXPORT].reset_index().to_sql(
        name=EgonPowerPlantPvRoofBuildingScenario.__table__.name,
        schema=EgonPowerPlantPvRoofBuildingScenario.__table__.schema,
        con=db.engine(),
        if_exists="append",
        index=False,
    )


def geocode_mastr_data():
    """
    Read PV rooftop data from the MaStR CSV.

    TODO: the source will be replaced as soon as the MaStR data is
    available in the DB.
    """
    mastr_df = mastr_data(
        MASTR_INDEX_COL,
        MASTR_RELEVANT_COLS,
        MASTR_DTYPES,
        MASTR_PARSE_DATES,
    )

    clean_mastr_df = clean_mastr_data(
        mastr_df,
        max_realistic_pv_cap=MAX_REALISTIC_PV_CAP,
        min_realistic_pv_cap=MIN_REALISTIC_PV_CAP,
        seed=SEED,
        rounding=ROUNDING,
    )

    geocoding_df = geocoding_data(clean_mastr_df)

    ratelimiter = geocoder(USER_AGENT, MIN_DELAY_SECONDS)

    geocode_gdf = geocode_data(geocoding_df, ratelimiter, EPSG)

    create_geocoded_table(geocode_gdf)


def pv_rooftop_to_buildings():
    """Main script, executed as task"""

    mastr_gdf = load_mastr_data()

    buildings_gdf = load_building_data()

    desagg_mastr_gdf, desagg_buildings_gdf = allocate_to_buildings(
        mastr_gdf, buildings_gdf
    )

    all_buildings_gdf = (
        desagg_mastr_gdf.assign(scenario="status_quo")
        .reset_index()
        .rename(columns={"geometry": "geom", "EinheitMastrNummer": "gens_id"})
    )

    scenario_buildings_gdf = all_buildings_gdf.copy()

    cap_per_bus_id_df = pd.DataFrame()

    for scenario in SCENARIOS:
        logger.debug(f"Desaggregating scenario {scenario}.")
        (
            scenario_buildings_gdf,
            cap_per_bus_id_scenario_df,
        ) = allocate_scenarios(  # noqa: F841
            desagg_mastr_gdf,
            desagg_buildings_gdf,
            scenario_buildings_gdf,
            scenario,
        )

        all_buildings_gdf = gpd.GeoDataFrame(
            pd.concat(
                [all_buildings_gdf, scenario_buildings_gdf], ignore_index=True
            ),
            crs=scenario_buildings_gdf.crs,
            geometry="geom",
        )

        cap_per_bus_id_df = pd.concat(
            [cap_per_bus_id_df, cap_per_bus_id_scenario_df]
        )

    # export scenario
    create_scenario_table(add_voltage_level(all_buildings_gdf))