Passed — Pull Request, dev (#809), created by unknown at 02:09

geocode_mastr_data() — rated A

Complexity:  Conditions 1
Size:        Total Lines 28, Code Lines 16
Duplication: Lines 0, Ratio 0 %
Importance:  Changes 0

Metric  Value
------  -----
eloc    16
dl      0
loc     28
rs      9.6
c       0
b       0
f       0
cc      1
nop     0

"""
Distribute MaStR PV rooftop capacities to OSM and synthetic buildings. Generate
new PV rooftop generators for the scenarios eGon2035 and eGon100RE.

Data cleaning: Drop duplicates and entries with missing critical data.
Determine the most plausible capacity from the multiple values given in the
MaStR data. Drop generators without any plausible capacity data
(0.1 kW < P < 23.5 MW). Randomly and weighted add a start-up date if it is
missing. Extract zip code and municipality from the 'Standort' given in the
MaStR data. Geocode unique zip code and municipality combinations with
Nominatim (1 second delay). Drop generators for which geocoding failed or
which are located outside the municipalities of Germany. Add some visual
sanity checks for the cleaned data.

Allocation of MaStR data: Allocate each generator to an existing building from
OSM. Determine the quantile each generator and building is in, depending on
the capacity of the generator and the area of the building polygon. Randomly
distribute generators within each municipality, preferably within the same
building area quantile as the generators are capacity-wise. If not enough
buildings exist within a municipality and quantile, additional buildings from
other quantiles are chosen randomly.

Disaggregation of PV rooftop scenarios: The scenario data per federal state is
linearly distributed to the MV grid districts according to the PV rooftop
potential per MV grid district. The rooftop potential is estimated from the
building area given by the OSM buildings. Grid districts which are located in
several federal states are allocated PV capacity according to their respective
roof potential in the individual federal states. The disaggregation of PV
plants within a grid district respects existing plants from MaStR which did
not reach their end of life. New PV plants are randomly and weighted generated
using a breakdown of MaStR data as generator basis. Plant metadata (e.g. plant
orientation) is also added randomly and weighted, with MaStR data as basis.
"""
from __future__ import annotations

from collections import Counter
from functools import wraps
from pathlib import Path
from time import perf_counter
from typing import Any

from geoalchemy2 import Geometry
from geopy.extra.rate_limiter import RateLimiter
from geopy.geocoders import Nominatim
from loguru import logger
from numpy.random import RandomState, default_rng
from pyproj.crs.crs import CRS
from sqlalchemy import BigInteger, Column, Float, Integer, String
from sqlalchemy.dialects.postgresql import HSTORE
from sqlalchemy.ext.declarative import declarative_base
import geopandas as gpd
import numpy as np
import pandas as pd

from egon.data import config, db
from egon.data.datasets.electricity_demand_timeseries.hh_buildings import (
    OsmBuildingsSynthetic,
)
from egon.data.datasets.scenario_capacities import EgonScenarioCapacities
from egon.data.datasets.zensus_vg250 import Vg250Gem

engine = db.engine()
Base = declarative_base()
SEED = int(config.settings()["egon-data"]["--random-seed"])

# TODO: move to yml
# MaStR data
MASTR_RELEVANT_COLS = [
    "EinheitMastrNummer",
    "Bruttoleistung",
    "StatisikFlag",
    "Bruttoleistung_extended",
    "Nettonennleistung",
    "InstallierteLeistung",
    "zugeordneteWirkleistungWechselrichter",
    "EinheitBetriebsstatus",
    "Standort",
    "Bundesland",
    "Land",
    "Landkreis",
    "Gemeinde",
    "Postleitzahl",
    "Ort",
    "GeplantesInbetriebnahmedatum",
    "Inbetriebnahmedatum",
    "GemeinsamerWechselrichterMitSpeicher",
    "Lage",
    "Leistungsbegrenzung",
    "EinheitlicheAusrichtungUndNeigungswinkel",
    "Hauptausrichtung",
    "HauptausrichtungNeigungswinkel",
    "Nebenausrichtung",
]

MASTR_DTYPES = {
    "EinheitMastrNummer": str,
    "Bruttoleistung": float,
    "StatisikFlag": str,
    "Bruttoleistung_extended": float,
    "Nettonennleistung": float,
    "InstallierteLeistung": float,
    "zugeordneteWirkleistungWechselrichter": float,
    "EinheitBetriebsstatus": str,
    "Standort": str,
    "Bundesland": str,
    "Land": str,
    "Landkreis": str,
    "Gemeinde": str,
    # "Postleitzahl": int,  # fails because of nan values
    "Ort": str,
    "GemeinsamerWechselrichterMitSpeicher": str,
    "Lage": str,
    "Leistungsbegrenzung": str,
    # this would parse nan values as False, which is not always correct
    # "EinheitlicheAusrichtungUndNeigungswinkel": bool,
    "Hauptausrichtung": str,
    "HauptausrichtungNeigungswinkel": str,
    "Nebenausrichtung": str,
    "NebenausrichtungNeigungswinkel": str,
}

MASTR_PARSE_DATES = [
    "GeplantesInbetriebnahmedatum",
    "Inbetriebnahmedatum",
]

MASTR_INDEX_COL = "EinheitMastrNummer"

EPSG = 4326
SRID = 3035

# data cleaning
MAX_REALISTIC_PV_CAP = 23500  # kW
MIN_REALISTIC_PV_CAP = 0.1  # kW
ROUNDING = 1

# geopy
MIN_DELAY_SECONDS = 1
USER_AGENT = "rli_kh_geocoder"

# show additional logging information
VERBOSE = False

EXPORT_DIR = Path(__file__).resolve().parent / "data"
EXPORT_FILE = "mastr_geocoded.gpkg"
EXPORT_PATH = EXPORT_DIR / EXPORT_FILE
DRIVER = "GPKG"

# number of quantiles
Q = 5

# scenario data
CARRIER = "solar_rooftop"
SCENARIOS = ["eGon2035"]  # , "eGon100RE"]
SCENARIO_TIMESTAMP = {
    "eGon2035": pd.Timestamp("2035-01-01", tz="UTC"),
    "eGon100RE": pd.Timestamp("2050-01-01", tz="UTC"),
}
PV_ROOFTOP_LIFETIME = pd.Timedelta(30 * 365, unit="D")

# example module: Trina Vertex S TSM-400DE09M.08, 400 Wp
# https://www.photovoltaik4all.de/media/pdf/92/64/68/Trina_Datasheet_VertexS_DE09-08_2021_A.pdf
MODUL_CAP = 0.4  # kWp
MODUL_SIZE = 1.096 * 1.754  # m²
PV_CAP_PER_SQ_M = MODUL_CAP / MODUL_SIZE

# Estimation of usable roof area
# Factor for the conversion of building area to roof area
# estimated mean roof pitch: 35°
# estimated usable roof share: 80 %
# estimated that only the south side of the building is used for PV
# see https://mediatum.ub.tum.de/doc/%20969497/969497.pdf
# AREA_FACTOR = 1.221
# USABLE_ROOF_SHARE = 0.8
# SOUTH_SHARE = 0.5
# ROOF_FACTOR = AREA_FACTOR * USABLE_ROOF_SHARE * SOUTH_SHARE
ROOF_FACTOR = 0.5

CAP_RANGES = [
    (0, 30),
    (30, 100),
    (100, float("inf")),
]

MIN_BUILDING_SIZE = 10.0
UPPER_QUANTILE = 0.95
LOWER_QUANTILE = 0.05

COLS_TO_RENAME = {
    "EinheitlicheAusrichtungUndNeigungswinkel": (
        "einheitliche_ausrichtung_und_neigungswinkel"
    ),
    "Hauptausrichtung": "hauptausrichtung",
    "HauptausrichtungNeigungswinkel": "hauptausrichtung_neigungswinkel",
}

COLS_TO_EXPORT = [
    "scenario",
    "building_id",
    "gens_id",
    "capacity",
    "einheitliche_ausrichtung_und_neigungswinkel",
    "hauptausrichtung",
    "hauptausrichtung_neigungswinkel",
    "voltage_level",
]

INCLUDE_SYNTHETIC_BUILDINGS = False
ONLY_BUILDINGS_WITH_DEMAND = True
ASSURE_MINIMUM_BUILDING_SIZE = True

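# Sanity check on the constants above (values rounded, not used by the
# pipeline): PV_CAP_PER_SQ_M = 0.4 / (1.096 * 1.754) ≈ 0.208 kWp/m². With
# ROOF_FACTOR = 0.5, a building with 100 m² floor area is assigned a maximum
# PV capacity of roughly 100 * 0.5 * 0.208 ≈ 10.4 kWp.
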
def timer_func(func):
    @wraps(func)
    def timeit_wrapper(*args, **kwargs):
        start_time = perf_counter()
        result = func(*args, **kwargs)
        end_time = perf_counter()
        total_time = end_time - start_time
        print(f"Function {func.__name__} took {total_time:.4f} seconds.")
        return result

    return timeit_wrapper

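# Usage sketch for the decorator above (illustrative only; the function name
# is hypothetical):
#
#     @timer_func
#     def do_work():
#         ...
#
# Calling do_work() prints e.g. "Function do_work took 0.0012 seconds."
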
@timer_func
def mastr_data(
    index_col: str | int | list[str] | list[int],
    usecols: list[str],
    dtype: dict[str, Any] | None,
    parse_dates: list[str] | None,
) -> pd.DataFrame:
    """
    Read MaStR data from csv.

    Parameters
    ----------
    index_col : str, int or list of str or int
        Column(s) to use as the row labels of the DataFrame.
    usecols : list of str
        Return a subset of the columns.
    dtype : dict of column (str) -> type (any), optional
        Data type for data or columns.
    parse_dates : list of names (str), optional
        Try to parse given columns to datetime.

    Returns
    -------
    pandas.DataFrame
        DataFrame containing MaStR data.
    """
    mastr_path = Path(
        config.datasets()["power_plants"]["sources"]["mastr_pv"]
    ).resolve()

    mastr_df = pd.read_csv(
        mastr_path,
        index_col=index_col,
        usecols=usecols,
        dtype=dtype,
        parse_dates=parse_dates,
    )

    mastr_df = mastr_df.loc[
        (mastr_df.StatisikFlag == "B")
        & (mastr_df.EinheitBetriebsstatus == "InBetrieb")
        & (mastr_df.Land == "Deutschland")
        & (mastr_df.Lage == "BaulicheAnlagen")
    ]

    if (
        config.settings()["egon-data"]["--dataset-boundary"]
        == "Schleswig-Holstein"
    ):
        init_len = len(mastr_df)

        mastr_df = mastr_df.loc[mastr_df.Bundesland == "SchleswigHolstein"]

        logger.info(
            f"Using only MaStR data within Schleswig-Holstein. "
            f"{init_len - len(mastr_df)} of {init_len} generators are dropped."
        )

    logger.debug("MaStR data loaded.")

    return mastr_df

@timer_func
def clean_mastr_data(
    mastr_df: pd.DataFrame,
    max_realistic_pv_cap: int | float,
    min_realistic_pv_cap: int | float,
    rounding: int,
    seed: int,
) -> pd.DataFrame:
    """
    Clean the MaStR data from implausible data.

    * Drop MaStR ID duplicates.
    * Drop generators with implausible capacities.
    * Drop generators without any kind of start-up date.
    * Clean up Standort column and capacity.

    Parameters
    ----------
    mastr_df : pandas.DataFrame
        DataFrame containing MaStR data.
    max_realistic_pv_cap : int or float
        Maximum capacity which is considered to be realistic.
    min_realistic_pv_cap : int or float
        Minimum capacity which is considered to be realistic.
    rounding : int
        Rounding to use when cleaning up the capacity. E.g. when
        rounding is 1 a capacity of 9.93 will be rounded to 9.9.
    seed : int
        Seed to use for random operations with NumPy and pandas.

    Returns
    -------
    pandas.DataFrame
        DataFrame containing cleaned MaStR data.
    """
    init_len = len(mastr_df)

    # drop duplicates
    mastr_df = mastr_df.loc[~mastr_df.index.duplicated()]

    # drop invalid entries in Standort
    index_to_drop = mastr_df.loc[
        (mastr_df.Standort.isna()) | (mastr_df.Standort.isnull())
    ].index

    mastr_df = mastr_df.loc[~mastr_df.index.isin(index_to_drop)]

    df = mastr_df[
        [
            "Bruttoleistung",
            "Bruttoleistung_extended",
            "Nettonennleistung",
            "zugeordneteWirkleistungWechselrichter",
            "InstallierteLeistung",
        ]
    ].round(rounding)

    # use only the smallest plausible capacity rating if multiple are given
    mastr_df = mastr_df.assign(
        capacity=[
            most_plausible(p_tub, min_realistic_pv_cap)
            for p_tub in df.itertuples(index=False)
        ]
    )

    # drop generators without any capacity info,
    # with a capacity of zero,
    # or with a capacity > 23.5 MW, because
    # Germany's largest rooftop PV system is 23 MW
    # https://www.iwr.de/news/groesste-pv-dachanlage-europas-wird-in-sachsen-anhalt-gebaut-news37379
    mastr_df = mastr_df.loc[
        (~mastr_df.capacity.isna())
        & (mastr_df.capacity <= max_realistic_pv_cap)
        & (mastr_df.capacity > min_realistic_pv_cap)
    ]

    # get zip code and municipality
    mastr_df[["zip_and_municipality", "drop_this"]] = pd.DataFrame(
        mastr_df.Standort.astype(str)
        .apply(
            zip_and_municipality_from_standort,
            args=(VERBOSE,),
        )
        .tolist(),
        index=mastr_df.index,
    )

    # drop invalid entries
    mastr_df = mastr_df.loc[mastr_df.drop_this].drop(columns="drop_this")

    # add ", Deutschland" just in case
    mastr_df = mastr_df.assign(
        zip_and_municipality=(mastr_df.zip_and_municipality + ", Deutschland")
    )

    # get a consistent start-up date
    mastr_df = mastr_df.assign(
        start_up_date=mastr_df.Inbetriebnahmedatum,
    )

    mastr_df.loc[mastr_df.start_up_date.isna()] = mastr_df.loc[
        mastr_df.start_up_date.isna()
    ].assign(
        start_up_date=mastr_df.GeplantesInbetriebnahmedatum.loc[
            mastr_df.start_up_date.isna()
        ]
    )

    # randomly and weighted fill missing start-up dates
    pool = mastr_df.loc[
        ~mastr_df.start_up_date.isna()
    ].start_up_date.to_numpy()

    size = len(mastr_df) - len(pool)

    if size > 0:
        np.random.seed(seed)

        choice = np.random.choice(
            pool,
            size=size,
            replace=False,
        )

        mastr_df.loc[mastr_df.start_up_date.isna()] = mastr_df.loc[
            mastr_df.start_up_date.isna()
        ].assign(start_up_date=choice)

        logger.info(
            f"Randomly and weighted added a start-up date to {size} generators."
        )

    mastr_df = mastr_df.assign(
        start_up_date=pd.to_datetime(mastr_df.start_up_date, utc=True)
    )

    end_len = len(mastr_df)
    logger.debug(
        f"Dropped {init_len - end_len} "
        f"({((init_len - end_len) / init_len) * 100:g}%)"
        f" of {init_len} rows from MaStR DataFrame."
    )

    return mastr_df

def zip_and_municipality_from_standort(
    standort: str,
    verbose: bool = False,
) -> tuple[str, bool]:
    """
    Get zip code and municipality from the Standort string.

    Parameters
    ----------
    standort : str
        Standort as given in the MaStR data.
    verbose : bool
        Logs additional info if True.

    Returns
    -------
    tuple of str and bool
        The Standort reduced to zip code and municipality, and a flag
        indicating whether a zip code was found. Entries without a zip
        code are flagged for dropping.
    """
    if verbose:
        logger.debug(f"Uncleaned String: {standort}")

    standort_list = standort.split()

    found = False
    count = 0

    for count, elem in enumerate(standort_list):
        if len(elem) != 5:
            continue
        if not elem.isnumeric():
            continue

        found = True

        break

    if found:
        cleaned_str = " ".join(standort_list[count:])

        if verbose:
            logger.debug(f"Cleaned String:   {cleaned_str}")

        return cleaned_str, found

    logger.warning(
        "Couldn't identify zip code. This entry will be dropped."
        f" Original Standort: {standort}."
    )

    return standort, found

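# Illustrative behaviour of the parser above (the address is made up):
#
#     >>> zip_and_municipality_from_standort("Musterstraße 5 12345 Musterstadt")
#     ('12345 Musterstadt', True)
#
# The first five-digit numeric token is treated as the zip code and everything
# before it is discarded.
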
def most_plausible(
    p_tub: tuple,
    min_realistic_pv_cap: int | float,
) -> float:
    """
    Try to determine the most plausible capacity of a given generator
    from the MaStR data.

    Parameters
    ----------
    p_tub : tuple
        Tuple containing the different capacities given in
        the MaStR data.
    min_realistic_pv_cap : int or float
        Minimum capacity which is considered to be realistic.

    Returns
    -------
    float
        Capacity of the generator estimated as the most realistic.
    """
    count = Counter(p_tub).most_common(3)

    if len(count) == 1:
        return count[0][0]

    val1 = count[0][0]
    val2 = count[1][0]

    if len(count) == 2:
        min_val = min(val1, val2)
        max_val = max(val1, val2)
    else:
        val3 = count[2][0]

        min_val = min(val1, val2, val3)
        max_val = max(val1, val2, val3)

    if min_val < min_realistic_pv_cap:
        return max_val

    return min_val

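# Illustrative examples for most_plausible (values are made up):
#
#     >>> most_plausible((10.0, 10.0, 9.9, 0.0, 10.0), 0.1)
#     10.0  # 0.0 is implausible, so the maximum candidate is used
#     >>> most_plausible((9.9, 9.9, 10.0, 10.0, 9.9), 0.1)
#     9.9   # all candidates are plausible, so the smallest rating wins
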
def geocoder(
    user_agent: str,
    min_delay_seconds: int,
) -> RateLimiter:
    """
    Set up the Nominatim geocoding class.

    Parameters
    ----------
    user_agent : str
        The app name.
    min_delay_seconds : int
        Delay in seconds to use between requests to Nominatim.
        A minimum of 1 is advised.

    Returns
    -------
    geopy.extra.rate_limiter.RateLimiter
        Nominatim RateLimiter geocoding class to use for geocoding.
    """
    locator = Nominatim(user_agent=user_agent)
    return RateLimiter(
        locator.geocode,
        min_delay_seconds=min_delay_seconds,
    )

def geocoding_data(
    clean_mastr_df: pd.DataFrame,
) -> pd.DataFrame:
    """
    Set up the DataFrame to geocode.

    Parameters
    ----------
    clean_mastr_df : pandas.DataFrame
        DataFrame containing cleaned MaStR data.

    Returns
    -------
    pandas.DataFrame
        DataFrame containing all unique combinations of
        zip codes with municipalities for geocoding.
    """
    return pd.DataFrame(
        data=clean_mastr_df.zip_and_municipality.unique(),
        columns=["zip_and_municipality"],
    )

@timer_func
def geocode_data(
    geocoding_df: pd.DataFrame,
    ratelimiter: RateLimiter,
    epsg: int,
) -> gpd.GeoDataFrame:
    """
    Geocode zip code and municipality.

    Extract latitude, longitude and altitude. Transform latitude and
    longitude to a shapely Point and return a geopandas GeoDataFrame.

    Parameters
    ----------
    geocoding_df : pandas.DataFrame
        DataFrame containing all unique combinations of
        zip codes with municipalities for geocoding.
    ratelimiter : geopy.extra.rate_limiter.RateLimiter
        Nominatim RateLimiter geocoding class to use for geocoding.
    epsg : int
        EPSG ID to use as CRS.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing all unique combinations of
        zip codes with municipalities with matching geolocation.
    """
    logger.info(f"Geocoding {len(geocoding_df)} locations.")

    geocode_df = geocoding_df.assign(
        location=geocoding_df.zip_and_municipality.apply(ratelimiter)
    )

    geocode_df = geocode_df.assign(
        point=geocode_df.location.apply(
            lambda loc: tuple(loc.point) if loc else None
        )
    )

    geocode_df[["latitude", "longitude", "altitude"]] = pd.DataFrame(
        geocode_df.point.tolist(), index=geocode_df.index
    )

    return gpd.GeoDataFrame(
        geocode_df,
        geometry=gpd.points_from_xy(geocode_df.longitude, geocode_df.latitude),
        crs=f"EPSG:{epsg}",
    )

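# End-to-end sketch of the geocoding step above (hypothetical driver code;
# mirrors the call order of the functions in this module):
#
#     ratelimiter = geocoder(USER_AGENT, MIN_DELAY_SECONDS)
#     geocoding_df = geocoding_data(clean_mastr_df)
#     geocode_gdf = geocode_data(geocoding_df, ratelimiter, EPSG)
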
def merge_geocode_with_mastr(
    clean_mastr_df: pd.DataFrame, geocode_gdf: gpd.GeoDataFrame
) -> gpd.GeoDataFrame:
    """
    Merge the geometry to the original MaStR data.

    Parameters
    ----------
    clean_mastr_df : pandas.DataFrame
        DataFrame containing cleaned MaStR data.
    geocode_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing all unique combinations of
        zip codes with municipalities with matching geolocation.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing cleaned MaStR data with
        matching geolocation from geocoding.
    """
    return gpd.GeoDataFrame(
        clean_mastr_df.merge(
            geocode_gdf[["zip_and_municipality", "geometry"]],
            how="left",
            on="zip_and_municipality",
        ),
        crs=geocode_gdf.crs,
    ).set_index(clean_mastr_df.index)

def drop_invalid_entries_from_gdf(
    gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop invalid entries from a geopandas GeoDataFrame.

    TODO: how to omit the logging from GEOS here?

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame to be checked for validity.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with rows with invalid geometries dropped.
    """
    valid_gdf = gdf.loc[gdf.is_valid]

    logger.debug(
        f"{len(gdf) - len(valid_gdf)} "
        f"({(len(gdf) - len(valid_gdf)) / len(gdf) * 100:g}%) "
        f"of {len(gdf)} values were invalid and are dropped."
    )

    return valid_gdf

@timer_func
def municipality_data() -> gpd.GeoDataFrame:
    """
    Get municipality data from the eGo^n database.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with municipality data.
    """
    with db.session_scope() as session:
        query = session.query(Vg250Gem.ags, Vg250Gem.geometry.label("geom"))

    return gpd.read_postgis(
        query.statement, query.session.bind, index_col="ags"
    )

@timer_func
def add_ags_to_gens(
    valid_mastr_gdf: gpd.GeoDataFrame,
    municipalities_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add information about the AGS ID to the generators.

    Parameters
    ----------
    valid_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data.
    municipalities_gdf : geopandas.GeoDataFrame
        GeoDataFrame with municipality data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data
        with the AGS ID added.
    """
    return valid_mastr_gdf.sjoin(
        municipalities_gdf,
        how="left",
        predicate="intersects",
    ).rename(columns={"index_right": "ags"})

def drop_gens_outside_muns(
    valid_mastr_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop all generators outside of the municipalities.

    Parameters
    ----------
    valid_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data
        with generators without an AGS ID dropped.
    """
    gdf = valid_mastr_gdf.loc[~valid_mastr_gdf.ags.isna()]

    logger.debug(
        f"{len(valid_mastr_gdf) - len(gdf)} "
        f"({(len(valid_mastr_gdf) - len(gdf)) / len(valid_mastr_gdf) * 100:g}%) "
        f"of {len(valid_mastr_gdf)} values are outside of the municipalities"
        " and are therefore dropped."
    )

    return gdf

class EgonMastrPvRoofGeocoded(Base):
    __tablename__ = "egon_mastr_pv_roof_geocoded"
    __table_args__ = {"schema": "supply"}

    zip_and_municipality = Column(String, primary_key=True, index=True)
    location = Column(String)
    point = Column(String)
    latitude = Column(Float)
    longitude = Column(Float)
    altitude = Column(Float)
    geometry = Column(Geometry(srid=EPSG))

def create_geocoded_table(geocode_gdf):
    """
    Create the geocoded MaStR PV rooftop table and write the data to it.

    Parameters
    ----------
    geocode_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoding information on PV rooftop locations.
    """
    EgonMastrPvRoofGeocoded.__table__.drop(bind=engine, checkfirst=True)
    EgonMastrPvRoofGeocoded.__table__.create(bind=engine, checkfirst=True)

    geocode_gdf.to_postgis(
        name=EgonMastrPvRoofGeocoded.__table__.name,
        schema=EgonMastrPvRoofGeocoded.__table__.schema,
        con=db.engine(),
        if_exists="append",
        index=False,
    )

def geocoded_data_from_db(
    epsg: str | int,
) -> gpd.GeoDataFrame:
    """
    Read geocoded PV rooftop data from the eGo^n database.

    Parameters
    ----------
    epsg : str or int
        EPSG ID to use as CRS.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing geocoded PV rooftop locations.
    """
    with db.session_scope() as session:
        query = session.query(
            EgonMastrPvRoofGeocoded.zip_and_municipality,
            EgonMastrPvRoofGeocoded.geometry,
        )

    return gpd.read_postgis(
        query.statement, query.session.bind, geom_col="geometry"
    ).to_crs(f"EPSG:{epsg}")

def load_mastr_data():
    """
    Read PV rooftop data from the MaStR CSV.

    Note: the source will be replaced as soon as the MaStR data is available
    in the DB.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data with geocoded locations.
    """
    mastr_df = mastr_data(
        MASTR_INDEX_COL,
        MASTR_RELEVANT_COLS,
        MASTR_DTYPES,
        MASTR_PARSE_DATES,
    )

    clean_mastr_df = clean_mastr_data(
        mastr_df,
        max_realistic_pv_cap=MAX_REALISTIC_PV_CAP,
        min_realistic_pv_cap=MIN_REALISTIC_PV_CAP,
        seed=SEED,
        rounding=ROUNDING,
    )

    geocode_gdf = geocoded_data_from_db(EPSG)

    mastr_gdf = merge_geocode_with_mastr(clean_mastr_df, geocode_gdf)

    valid_mastr_gdf = drop_invalid_entries_from_gdf(mastr_gdf)

    municipalities_gdf = municipality_data()

    valid_mastr_gdf = add_ags_to_gens(valid_mastr_gdf, municipalities_gdf)

    return drop_gens_outside_muns(valid_mastr_gdf)

class OsmBuildingsFiltered(Base):
    __tablename__ = "osm_buildings_filtered"
    __table_args__ = {"schema": "openstreetmap"}

    osm_id = Column(BigInteger)
    amenity = Column(String)
    building = Column(String)
    name = Column(String)
    geom = Column(Geometry(srid=SRID), index=True)
    area = Column(Float)
    geom_point = Column(Geometry(srid=SRID), index=True)
    tags = Column(HSTORE)
    id = Column(BigInteger, primary_key=True, index=True)

@timer_func
def osm_buildings(
    to_crs: CRS,
) -> gpd.GeoDataFrame:
    """
    Read OSM buildings data from the eGo^n database.

    Parameters
    ----------
    to_crs : pyproj.crs.crs.CRS
        CRS to transform geometries to.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    """
    with db.session_scope() as session:
        query = session.query(
            OsmBuildingsFiltered.id,
            OsmBuildingsFiltered.area,
            OsmBuildingsFiltered.geom_point.label("geom"),
        )

    return gpd.read_postgis(
        query.statement, query.session.bind, index_col="id"
    ).to_crs(to_crs)

@timer_func
def synthetic_buildings(
    to_crs: CRS,
) -> gpd.GeoDataFrame:
    """
    Read synthetic buildings data from the eGo^n database.

    Parameters
    ----------
    to_crs : pyproj.crs.crs.CRS
        CRS to transform geometries to.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing synthetic buildings data.
    """
    with db.session_scope() as session:
        query = session.query(
            OsmBuildingsSynthetic.id,
            OsmBuildingsSynthetic.area,
            OsmBuildingsSynthetic.geom_point.label("geom"),
        )

    return gpd.read_postgis(
        query.statement, query.session.bind, index_col="id"
    ).to_crs(to_crs)

@timer_func
def add_ags_to_buildings(
    buildings_gdf: gpd.GeoDataFrame,
    municipalities_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add information about the AGS ID to the buildings.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    municipalities_gdf : geopandas.GeoDataFrame
        GeoDataFrame with municipality data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data
        with the AGS ID added.
    """
    return buildings_gdf.sjoin(
        municipalities_gdf,
        how="left",
        predicate="intersects",
    ).rename(columns={"index_right": "ags"})

def drop_buildings_outside_muns(
    buildings_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop all buildings outside of the municipalities.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data
        with buildings without an AGS ID dropped.
    """
    gdf = buildings_gdf.loc[~buildings_gdf.ags.isna()]

    logger.debug(
        f"{len(buildings_gdf) - len(gdf)} "
        f"({(len(buildings_gdf) - len(gdf)) / len(buildings_gdf) * 100:g}%) "
        f"of {len(buildings_gdf)} values are outside of the municipalities "
        "and are therefore dropped."
    )

    return gdf

def egon_building_peak_loads():
    """
    Get the IDs of buildings with an electricity peak load from the
    eGo^n database.

    Returns
    -------
    pandas.Series
        Sorted building IDs.
    """
    sql = """
    SELECT building_id
    FROM demand.egon_building_peak_loads
    """

    return db.select_dataframe(sql).building_id.astype(int).sort_values()

@timer_func
def load_building_data():
    """
    Read buildings from the database.

    Tables:

    * `openstreetmap.osm_buildings_filtered` (from OSM)
    * `openstreetmap.osm_buildings_synthetic` (synthetic, created by us)

    Use the column `id` for both as it is unique, hence both datasets can be
    concatenated. If INCLUDE_SYNTHETIC_BUILDINGS is False, synthetic buildings
    will not be loaded.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing buildings data with buildings without an
        AGS ID dropped.
    """
    municipalities_gdf = municipality_data()

    osm_buildings_gdf = osm_buildings(municipalities_gdf.crs)

    if INCLUDE_SYNTHETIC_BUILDINGS:
        synthetic_buildings_gdf = synthetic_buildings(municipalities_gdf.crs)

        buildings_gdf = gpd.GeoDataFrame(
            pd.concat(
                [
                    osm_buildings_gdf,
                    synthetic_buildings_gdf,
                ]
            ),
            geometry="geom",
            crs=osm_buildings_gdf.crs,
        ).rename(columns={"area": "building_area"})

    else:
        buildings_gdf = osm_buildings_gdf.rename(
            columns={"area": "building_area"}
        )

    if ONLY_BUILDINGS_WITH_DEMAND:
        building_ids = egon_building_peak_loads()

        init_len = len(building_ids)

        building_ids = [
            b_id for b_id in building_ids if b_id in buildings_gdf.index
        ]

        end_len = len(building_ids)

        logger.debug(
            f"{end_len / init_len * 100:g} % ({end_len} / {init_len}) of IDs"
            f" within OSM buildings."
        )

        buildings_gdf = buildings_gdf.loc[building_ids]

    if ASSURE_MINIMUM_BUILDING_SIZE:
        # NOTE: the original expression here was incomplete; filtering out
        # buildings smaller than MIN_BUILDING_SIZE appears to be the intent.
        buildings_gdf = buildings_gdf.loc[
            buildings_gdf.building_area >= MIN_BUILDING_SIZE
        ]

    buildings_ags_gdf = add_ags_to_buildings(buildings_gdf, municipalities_gdf)

    logger.debug("Loaded buildings.")

    return drop_buildings_outside_muns(buildings_ags_gdf)

@timer_func
def sort_and_qcut_df(
    df: pd.DataFrame | gpd.GeoDataFrame,
    col: str,
    q: int,
) -> pd.DataFrame | gpd.GeoDataFrame:
    """
    Determine the quantile of a given attribute in a (Geo)DataFrame.

    Sort the (Geo)DataFrame in ascending order for the given attribute.

    Parameters
    ----------
    df : pandas.DataFrame or geopandas.GeoDataFrame
        (Geo)DataFrame to sort and qcut.
    col : str
        Name of the attribute to sort and qcut the (Geo)DataFrame on.
    q : int
        Number of quantiles.

    Returns
    -------
    pandas.DataFrame or geopandas.GeoDataFrame
        Sorted and qcut (Geo)DataFrame.
    """
    df = df.sort_values(col, ascending=True)

    return df.assign(
        quant=pd.qcut(
            df[col],
            q=q,
            labels=range(q),
        )
    )

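# Illustrative: with q=5, each row is labelled with its quintile 0..4, e.g.
#
#     >>> pd.qcut(pd.Series([1, 2, 3, 4, 5]), q=5, labels=range(5)).tolist()
#     [0, 1, 2, 3, 4]
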
@timer_func
def allocate_pv(
    q_mastr_gdf: gpd.GeoDataFrame,
    q_buildings_gdf: gpd.GeoDataFrame,
    seed: int,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """
    Allocate the MaStR PV generators to the OSM buildings.

    This determines a building for each PV generator if there are more
    buildings than generators within a given AGS. Generators are primarily
    assigned to buildings within the same quantile. Multiple assignment
    is excluded.

    Parameters
    ----------
    q_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded and qcut MaStR data.
    q_buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing qcut OSM buildings data.
    seed : int
        Seed to use for random operations with NumPy and pandas.

    Returns
    -------
    tuple with two geopandas.GeoDataFrame s
        GeoDataFrame containing MaStR data allocated to building IDs.
        GeoDataFrame containing building data allocated to MaStR IDs.
    """
    rng = default_rng(seed=seed)

    q_buildings_gdf = q_buildings_gdf.assign(gens_id=np.nan)
    q_mastr_gdf = q_mastr_gdf.assign(building_id=np.nan)

    ags_list = q_buildings_gdf.ags.unique()

    num_ags = len(ags_list)

    t0 = perf_counter()

    for count, ags in enumerate(ags_list):

        buildings = q_buildings_gdf.loc[
            (q_buildings_gdf.ags == ags) & (q_buildings_gdf.gens_id.isna())
        ]
        gens = q_mastr_gdf.loc[
            (q_mastr_gdf.ags == ags) & (q_mastr_gdf.building_id.isna())
        ]

        len_build = len(buildings)
        len_gens = len(gens)

        if len_build < len_gens:
            gens = gens.sample(len_build, random_state=RandomState(seed=seed))
            logger.error(
                f"There are {len_gens} generators and only {len_build}"
                f" buildings in AGS {ags}. {len_gens - len(gens)} "
                "generators were truncated to match the amount of buildings."
            )

            assert len_build == len(gens)

        for quant in gens.quant.unique():
            q_buildings = buildings.loc[
                (buildings.quant == quant) & (buildings.gens_id.isna())
            ]
            q_gens = gens.loc[gens.quant == quant]

            len_build = len(q_buildings)
            len_gens = len(q_gens)

            if len_build < len_gens:
                delta = len_gens - len_build

                logger.warning(
                    f"There are {len_gens} generators and only {len_build} "
                    f"buildings in AGS {ags} and quantile {quant}. {delta} "
                    f"buildings from AGS {ags} will be added randomly."
                )

                add_buildings = pd.Index(
                    rng.choice(
                        buildings.loc[
                            (buildings.quant != quant)
                            & (buildings.gens_id.isna())
                        ].index,
                        size=delta,
                        replace=False,
                    )
                )

                q_buildings = buildings.loc[
                    q_buildings.index.append(add_buildings)
                ]

                assert len(q_buildings) == len_gens

            chosen_buildings = pd.Index(
                rng.choice(
                    q_buildings.index,
                    size=len_gens,
                    replace=False,
                )
            )

            # q_mastr_gdf.loc[q_gens.index] = q_mastr_gdf.loc[
            #     q_gens.index
            # ].assign(building_id=chosen_buildings)

            q_buildings_gdf.loc[chosen_buildings, "gens_id"] = q_gens.index

        if count % 100 == 0:
            logger.debug(
                f"Allocation of {count / num_ags * 100:g} % of AGS done. "
                f"It took {perf_counter() - t0:g} seconds."
            )

            t0 = perf_counter()

    assigned_buildings = q_buildings_gdf.loc[~q_buildings_gdf.gens_id.isna()]

    q_mastr_gdf.loc[
        assigned_buildings.gens_id, "building_id"
    ] = assigned_buildings.index

    logger.debug("Allocated status quo generators to buildings.")

    return frame_to_numeric(q_mastr_gdf), frame_to_numeric(q_buildings_gdf)

def frame_to_numeric(
    df: pd.DataFrame | gpd.GeoDataFrame,
) -> pd.DataFrame | gpd.GeoDataFrame:
    """
    Try to convert all columns of a DataFrame to numeric, ignoring errors.

    Parameters
    ----------
    df : pandas.DataFrame or geopandas.GeoDataFrame

    Returns
    -------
    pandas.DataFrame or geopandas.GeoDataFrame
    """
    if str(df.index.dtype) == "object":
        df.index = pd.to_numeric(df.index, errors="ignore")

    for col in df.columns:
        if str(df[col].dtype) == "object":
            df[col] = pd.to_numeric(df[col], errors="ignore")

    return df

def validate_output(
    desagg_mastr_gdf: pd.DataFrame | gpd.GeoDataFrame,
    desagg_buildings_gdf: pd.DataFrame | gpd.GeoDataFrame,
) -> None:
    """
    Validate the output.

    * Validate that there are exactly as many buildings with a PV system as
      there are PV systems with a building.
    * Validate that the building IDs with a PV system are the same building
      IDs as assigned to the PV systems.
    * Validate that the PV system IDs with a building are the same PV system
      IDs as assigned to the buildings.

    Parameters
    ----------
    desagg_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data allocated to building IDs.
    desagg_buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing building data allocated to MaStR IDs.
    """
    assert len(
        desagg_mastr_gdf.loc[~desagg_mastr_gdf.building_id.isna()]
    ) == len(desagg_buildings_gdf.loc[~desagg_buildings_gdf.gens_id.isna()])
    assert (
        np.sort(
            desagg_mastr_gdf.loc[
                ~desagg_mastr_gdf.building_id.isna()
            ].building_id.unique()
        )
        == np.sort(
            desagg_buildings_gdf.loc[
                ~desagg_buildings_gdf.gens_id.isna()
            ].index.unique()
        )
    ).all()
    assert (
        np.sort(
            desagg_mastr_gdf.loc[
                ~desagg_mastr_gdf.building_id.isna()
            ].index.unique()
        )
        == np.sort(
            desagg_buildings_gdf.loc[
                ~desagg_buildings_gdf.gens_id.isna()
            ].gens_id.unique()
        )
    ).all()

    logger.debug("Validated output.")

def drop_unallocated_gens(
    gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop generators which did not get allocated.

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data allocated to building IDs.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data with generators dropped which did
        not get allocated.
    """
    init_len = len(gdf)
    gdf = gdf.loc[~gdf.building_id.isna()]
    end_len = len(gdf)

    logger.debug(
        f"Dropped {init_len - end_len} "
        f"({((init_len - end_len) / init_len) * 100:g}%)"
        f" of {init_len} unallocated rows from MaStR DataFrame."
    )

    return gdf

@timer_func
def allocate_to_buildings(
    mastr_gdf: gpd.GeoDataFrame,
    buildings_gdf: gpd.GeoDataFrame,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """
    Allocate status quo PV rooftop generators to buildings.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data with geocoded locations.
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing buildings data with buildings without an
        AGS ID dropped.

    Returns
    -------
    tuple with two geopandas.GeoDataFrame s
        GeoDataFrame containing MaStR data allocated to building IDs.
        GeoDataFrame containing building data allocated to MaStR IDs.
    """
    logger.debug("Starting allocation of status quo.")

    q_mastr_gdf = sort_and_qcut_df(mastr_gdf, col="capacity", q=Q)
    q_buildings_gdf = sort_and_qcut_df(buildings_gdf, col="building_area", q=Q)

    desagg_mastr_gdf, desagg_buildings_gdf = allocate_pv(
        q_mastr_gdf, q_buildings_gdf, SEED
    )

    validate_output(desagg_mastr_gdf, desagg_buildings_gdf)

    return drop_unallocated_gens(desagg_mastr_gdf), desagg_buildings_gdf

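# Status quo allocation sketch (hypothetical driver code tying the loaders
# above together):
#
#     mastr_gdf = load_mastr_data()
#     buildings_gdf = load_building_data()
#     desagg_mastr_gdf, desagg_buildings_gdf = allocate_to_buildings(
#         mastr_gdf, buildings_gdf
#     )
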
@timer_func
def grid_districts(
    epsg: int,
) -> gpd.GeoDataFrame:
    """
    Load MV grid district geodata from the eGo^n database as a
    geopandas.GeoDataFrame.

    Parameters
    ----------
    epsg : int
        EPSG ID to use as CRS.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing MV grid district IDs and geo shapes data.
    """
    gdf = db.select_geodataframe(
        """
        SELECT bus_id, geom
        FROM grid.egon_mv_grid_district
        ORDER BY bus_id
        """,
        index_col="bus_id",
        geom_col="geom",
        epsg=epsg,
    )

    gdf.index = gdf.index.astype(int)

    logger.debug("Grid districts loaded.")

    return gdf

def scenario_data(
    carrier: str = "solar_rooftop",
    scenario: str = "eGon2035",
) -> pd.DataFrame:
    """
    Get scenario capacity data from the eGo^n database.

    Parameters
    ----------
    carrier : str
        Carrier type to filter the table by.
    scenario : str
        Scenario to filter the table by.

    Returns
    -------
    pandas.DataFrame
        DataFrame with scenario capacity data in GW.
    """
    with db.session_scope() as session:
        query = session.query(EgonScenarioCapacities).filter(
            EgonScenarioCapacities.carrier == carrier,
            EgonScenarioCapacities.scenario_name == scenario,
        )

    df = pd.read_sql(
        query.statement, query.session.bind, index_col="index"
    ).sort_index()

    logger.debug("Scenario capacity data loaded.")

    return df

class Vg250Lan(Base):
    __tablename__ = "vg250_lan"
    __table_args__ = {"schema": "boundaries"}

    id = Column(BigInteger, primary_key=True, index=True)
    ade = Column(BigInteger)
    gf = Column(BigInteger)
    bsg = Column(BigInteger)
    ars = Column(String)
    ags = Column(String)
    sdv_ars = Column(String)
    gen = Column(String)
    bez = Column(String)
    ibz = Column(BigInteger)
    bem = Column(String)
    nbd = Column(String)
    sn_l = Column(String)
    sn_r = Column(String)
    sn_k = Column(String)
    sn_v1 = Column(String)
    sn_v2 = Column(String)
    sn_g = Column(String)
    fk_s3 = Column(String)
    nuts = Column(String)
    ars_0 = Column(String)
    ags_0 = Column(String)
    wsk = Column(String)
    debkg_id = Column(String)
    rs = Column(String)
    sdv_rs = Column(String)
    rs_0 = Column(String)
    geometry = Column(Geometry(srid=EPSG), index=True)

def federal_state_data(to_crs: CRS) -> gpd.GeoDataFrame:
    """
    Get federal state data from the eGo^n database.

    Parameters
    ----------
    to_crs : pyproj.crs.crs.CRS
        CRS to transform geometries to.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with federal state data.
    """
    with db.session_scope() as session:
        query = session.query(
            Vg250Lan.id, Vg250Lan.nuts, Vg250Lan.geometry.label("geom")
        )

    gdf = gpd.read_postgis(
        query.statement, query.session.bind, index_col="id"
    ).to_crs(to_crs)

    logger.debug("Federal state data loaded.")

    return gdf

@timer_func
def overlay_grid_districts_with_counties(
    mv_grid_district_gdf: gpd.GeoDataFrame,
    federal_state_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Calculate the intersections of MV grid districts and counties.

    Parameters
    ----------
    mv_grid_district_gdf : gpd.GeoDataFrame
        GeoDataFrame containing MV grid district IDs and geo shapes data.
    federal_state_gdf : gpd.GeoDataFrame
        GeoDataFrame with federal state data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with intersection shapes between counties and
        grid districts.
    """
    logger.debug(
        "Calculating intersection overlay between MV grid districts and "
        "counties. This may take a while..."
    )

    gdf = gpd.overlay(
        federal_state_gdf.to_crs(mv_grid_district_gdf.crs),
        mv_grid_district_gdf.reset_index(),
        how="intersection",
        keep_geom_type=True,
    )

    logger.debug("Done!")

    return gdf

@timer_func
def add_overlay_id_to_buildings(
    buildings_gdf: gpd.GeoDataFrame,
    grid_federal_state_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add information about the overlay ID to the buildings.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    grid_federal_state_gdf : geopandas.GeoDataFrame
        GeoDataFrame with intersection shapes between counties and grid
        districts.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with the overlay ID added.
    """
    gdf = (
        buildings_gdf.to_crs(grid_federal_state_gdf.crs)
        .sjoin(
            grid_federal_state_gdf,
            how="left",
            predicate="intersects",
        )
        .rename(columns={"index_right": "overlay_id"})
    )

    logger.debug("Added overlay ID to OSM buildings.")

    return gdf

def drop_buildings_outside_grids(
    buildings_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop all buildings outside of the grid areas.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data
        with buildings without a bus ID dropped.
    """
    gdf = buildings_gdf.loc[~buildings_gdf.bus_id.isna()]

    logger.debug(
        f"{len(buildings_gdf) - len(gdf)} "
        f"({(len(buildings_gdf) - len(gdf)) / len(buildings_gdf) * 100:g}%) "
        f"of {len(buildings_gdf)} values are outside of the grid areas "
        "and are therefore dropped."
    )

    return gdf

def buildings_area_per_overlay_id(
    valid_buildings_gdf: gpd.GeoDataFrame,
    grid_federal_state_gdf: gpd.GeoDataFrame,
) -> pd.DataFrame:
    """
    Calculate the total building area per overlay ID.

    TODO: This is very simplified at the moment. If possible add some kind
     of weights to the area per building to differentiate between single-
     family and multi-family houses as well as agricultural and industrial
     buildings.

    Parameters
    ----------
    valid_buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with overlay ID added.
    grid_federal_state_gdf : geopandas.GeoDataFrame
        GeoDataFrame with intersection shapes between counties and grid
        districts.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with grid data and total buildings area.
    """
    return grid_federal_state_gdf.merge(
        valid_buildings_gdf[
            [
                "building_area",
                "overlay_id",
            ]
        ]
        .groupby("overlay_id")
        .sum(),
        how="left",
        left_index=True,
        right_index=True,
    )

def cap_per_bus_id(
    scenario: str,
    # overlay_gdf: gpd.GeoDataFrame,
    # scenario_df: pd.DataFrame,
    # conversion: int | float = 10**3,
) -> pd.DataFrame:
    """
    Calculate the total PV rooftop capacity per grid district dependent on
    the available buildings area.

    TODO: This is very simplified at the moment as it assumes an even
     distribution of generators depending on the available buildings
     area per grid district.

    Parameters
    ----------
    scenario : str
        Scenario name to filter the generators table by. (The commented-out
        parameters belong to the alternative implementation kept below.)

    Returns
    -------
    pandas.DataFrame
        DataFrame with the total rooftop capacity per MV grid.
    """
    targets = config.datasets()["solar_rooftop"]["targets"]

    sql = f"""
    SELECT bus as bus_id, p_nom as capacity
    FROM {targets['generators']['schema']}.{targets['generators']['table']}
    WHERE carrier = 'solar_rooftop'
    AND scn_name = '{scenario}'
    """

    return db.select_dataframe(sql, index_col="bus_id")

    # overlay_gdf = overlay_gdf.assign(capacity=np.nan)
    #
    # for cap, nuts in scenario_df[["capacity", "nuts"]].itertuples(index=False):
    #     nuts_gdf = overlay_gdf.loc[overlay_gdf.nuts == nuts]
    #
    #     capacity = nuts_gdf.building_area.multiply(
    #         cap / nuts_gdf.building_area.sum()
    #     )
    #
    #     overlay_gdf.loc[nuts_gdf.index] = overlay_gdf.loc[
    #         nuts_gdf.index
    #     ].assign(capacity=capacity.multiply(conversion).to_numpy())
    #
    # return overlay_gdf[["bus_id", "capacity"]].groupby("bus_id").sum()

def determine_end_of_life_gens(
    mastr_gdf: gpd.GeoDataFrame,
    scenario_timestamp: pd.Timestamp,
    pv_rooftop_lifetime: pd.Timedelta,
) -> gpd.GeoDataFrame:
    """
    Determine if an old PV system has reached its end of life.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    scenario_timestamp : pandas.Timestamp
        Timestamp at which the scenario takes place.
    pv_rooftop_lifetime : pandas.Timedelta
        Average expected lifetime of PV rooftop systems.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data and info if the system
        has reached its end of life.
    """
    mastr_gdf = mastr_gdf.assign(
        age=scenario_timestamp - mastr_gdf.start_up_date
    )

    logger.debug("Determined if PV rooftop systems reached their end of life.")

    return mastr_gdf.assign(end_of_life=pv_rooftop_lifetime < mastr_gdf.age)

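# For orientation: with PV_ROOFTOP_LIFETIME = 30 * 365 days and the eGon2035
# scenario timestamp (2035-01-01), systems with a start-up date before roughly
# early January 2005 are flagged with end_of_life=True.
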
def calculate_max_pv_cap_per_building(
1674
    buildings_gdf: gpd.GeoDataFrame,
1675
    mastr_gdf: gpd.GeoDataFrame,
1676
    pv_cap_per_sq_m: float | int,
1677
    roof_factor: float | int,
1678
) -> gpd.GeoDataFrame:
1679
    """
1680
    Calculate the estimated maximum possible PV capacity per building.
1681
    Parameters
1682
    -----------
1683
    buildings_gdf : geopandas.GeoDataFrame
1684
        GeoDataFrame containing OSM buildings data.
1685
    mastr_gdf : geopandas.GeoDataFrame
1686
        GeoDataFrame containing geocoded MaStR data.
1687
    pv_cap_per_sq_m : float, int
1688
        Average expected, installable PV capacity per square meter.
1689
    roof_factor : float, int
1690
        Average for PV usable roof area share.
1691
    Returns
1692
    -------
1693
    geopandas.GeoDataFrame
1694
        GeoDataFrame containing OSM buildings data with estimated maximum PV
1695
        capacity.
1696
    """
1697
    gdf = (
1698
        buildings_gdf.reset_index()
1699
        .merge(
1700
            mastr_gdf[
1701
                [
1702
                    "capacity",
1703
                    "end_of_life",
1704
                    "building_id",
1705
                    "EinheitlicheAusrichtungUndNeigungswinkel",
1706
                    "Hauptausrichtung",
1707
                    "HauptausrichtungNeigungswinkel",
1708
                ]
1709
            ],
1710
            how="left",
1711
            left_on="id",
1712
            right_on="building_id",
1713
        )
1714
        .set_index("id")
1715
        .drop(columns="building_id")
1716
    )
1717
1718
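    # Illustrative numbers: a 100 m² building with roof_factor 0.5 and
    # pv_cap_per_sq_m 0.17 gives max_cap = 100 * 0.5 * 0.17 = 8.5 kW; the
    # actual factors come from the module-level constants.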
    return gdf.assign(
        max_cap=gdf.building_area.multiply(roof_factor * pv_cap_per_sq_m),
        end_of_life=gdf.end_of_life.fillna(True).astype(bool),
        bus_id=gdf.bus_id.astype(int),
    )


def calculate_building_load_factor(
    mastr_gdf: gpd.GeoDataFrame,
    buildings_gdf: gpd.GeoDataFrame,
    rounding: int = 4,
) -> gpd.GeoDataFrame:
    """
    Calculate the roof load factor from existing PV systems.
    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    rounding : int
        Number of decimal places to round the load factor to.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data with calculated load
        factor.
    """
    gdf = mastr_gdf.merge(
        buildings_gdf[["max_cap", "building_area"]].loc[
            ~buildings_gdf["max_cap"].isna()
        ],
        how="left",
        left_on="building_id",
        right_index=True,
    )
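    # The load factor relates the registered capacity to the estimated roof
    # potential; values above 1 hint at mismatched buildings and are
    # filtered out wherever probabilities are derived from this column.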
    return gdf.assign(load_factor=(gdf.capacity / gdf.max_cap).round(rounding))


def get_probability_for_property(
    mastr_gdf: gpd.GeoDataFrame,
    cap_range: tuple[int | float, int | float],
    prop: str,
) -> tuple[np.array, np.array]:
    """
    Calculate the probability of the different options of a property of the
    existing PV plants.
    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_range : tuple(int, int)
        Capacity range of PV plants to look at.
    prop : str
        Property to calculate probabilities for. Must be a column of
        mastr_gdf.
    Returns
    -------
    tuple
        numpy.array
            Unique values of the property.
        numpy.array
            Probabilities per unique value.
    """
    cap_range_gdf = mastr_gdf.loc[
        (mastr_gdf.capacity > cap_range[0])
        & (mastr_gdf.capacity <= cap_range[1])
    ]

    if prop == "load_factor":
        cap_range_gdf = cap_range_gdf.loc[cap_range_gdf[prop] <= 1]

    count = Counter(
        cap_range_gdf[prop].loc[
            ~cap_range_gdf[prop].isna()
            & ~cap_range_gdf[prop].isnull()
            & ~(cap_range_gdf[prop] == "None")
        ]
    )

    values = np.array(list(count.keys()))
    probabilities = np.fromiter(count.values(), dtype=float)
    probabilities = probabilities / np.sum(probabilities)
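    # Illustrative result for prop="Hauptausrichtung": values might be
    # array(["Süd", "Ost", "West"]) with probabilities
    # array([0.6, 0.25, 0.15]), i.e. the empirical distribution of the
    # property within the capacity range.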
    return values, probabilities


@timer_func
def probabilities(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
    properties: list[str] | None = None,
) -> dict:
    """
    Calculate the probability of the different options of properties of the
    existing PV plants.
    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple
        should start at zero and the last one should end at infinity.
    properties : list(str)
        List of properties to calculate probabilities for. Strings need to
        be columns of mastr_gdf.
    Returns
    -------
    dict
        Dictionary with values and probabilities per capacity range.
    """
    if cap_ranges is None:
        cap_ranges = [
            (0, 30),
            (30, 100),
            (100, float("inf")),
        ]
    if properties is None:
        properties = [
            "EinheitlicheAusrichtungUndNeigungswinkel",
            "Hauptausrichtung",
            "HauptausrichtungNeigungswinkel",
            "load_factor",
        ]

    prob_dict = {}

    for cap_range in cap_ranges:
        prob_dict[cap_range] = {
            "values": {},
            "probabilities": {},
        }

        for prop in properties:
            v, p = get_probability_for_property(
                mastr_gdf,
                cap_range,
                prop,
            )

            prob_dict[cap_range]["values"][prop] = v
            prob_dict[cap_range]["probabilities"][prop] = p

    return prob_dict

def cap_share_per_cap_range(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
) -> dict[tuple[int | float, int | float], float]:
    """
    Calculate the share of PV capacity from the total PV capacity within
    capacity ranges.
    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple
        should start at zero and the last one should end at infinity.
    Returns
    -------
    dict
        Dictionary with the share of PV capacity from the total PV capacity
        within capacity ranges.
    """
    if cap_ranges is None:
        cap_ranges = [
            (0, 30),
            (30, 100),
            (100, float("inf")),
        ]

    cap_share_dict = {}

    total_cap = mastr_gdf.capacity.sum()

    for cap_range in cap_ranges:
        cap_share = (
            mastr_gdf.loc[
                (mastr_gdf.capacity > cap_range[0])
                & (mastr_gdf.capacity <= cap_range[1])
            ].capacity.sum()
            / total_cap
        )

        cap_share_dict[cap_range] = cap_share
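    # With ranges covering (0, inf) these shares sum to 1, so multiplying a
    # grid district's target capacity by each share splits it completely
    # across the capacity ranges (see desaggregate_pv_in_mv_grid).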
    return cap_share_dict


def mean_load_factor_per_cap_range(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
) -> dict[tuple[int | float, int | float], float]:
    """
    Calculate the mean roof load factor per capacity range from existing PV
    plants.
    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple
        should start at zero and the last one should end at infinity.
    Returns
    -------
    dict
        Dictionary with mean roof load factor per capacity range.
    """
    if cap_ranges is None:
        cap_ranges = [
            (0, 30),
            (30, 100),
            (100, float("inf")),
        ]

    load_factor_dict = {}

    for cap_range in cap_ranges:
        load_factor = mastr_gdf.loc[
            (mastr_gdf.load_factor <= 1)
            & (mastr_gdf.capacity > cap_range[0])
            & (mastr_gdf.capacity <= cap_range[1])
        ].load_factor.mean()

        load_factor_dict[cap_range] = load_factor

    return load_factor_dict

def building_area_range_per_cap_range(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
    min_building_size: int | float = 10.0,
    upper_quantile: float = 0.95,
    lower_quantile: float = 0.05,
) -> dict[tuple[int | float, int | float], tuple[int | float, int | float]]:
    """
    Estimate the typical building area range per capacity range from
    existing PV plants.
    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple
        should start at zero and the last one should end at infinity.
    min_building_size : int, float
        Minimum building size to consider for PV plants.
    upper_quantile : float
        Upper quantile to estimate the maximum building size per capacity
        range.
    lower_quantile : float
        Lower quantile to estimate the minimum building size per capacity
        range.
    Returns
    -------
    dict
        Dictionary with the estimated normal building area range per
        capacity range.
    """
    if cap_ranges is None:
        cap_ranges = [
            (0, 30),
            (30, 100),
            (100, float("inf")),
        ]

    building_area_range_dict = {}

    n_ranges = len(cap_ranges)

    for count, cap_range in enumerate(cap_ranges):
        cap_range_gdf = mastr_gdf.loc[
            (mastr_gdf.capacity > cap_range[0])
            & (mastr_gdf.capacity <= cap_range[1])
        ]

        if count == 0:
            building_area_range_dict[cap_range] = (
                min_building_size,
                cap_range_gdf.building_area.quantile(upper_quantile),
            )
        elif count == n_ranges - 1:
            building_area_range_dict[cap_range] = (
                cap_range_gdf.building_area.quantile(lower_quantile),
                float("inf"),
            )
        else:
            building_area_range_dict[cap_range] = (
                cap_range_gdf.building_area.quantile(lower_quantile),
                cap_range_gdf.building_area.quantile(upper_quantile),
            )

    values = list(building_area_range_dict.values())

    building_area_range_normed_dict = {}

    for count, (cap_range, (min_area, max_area)) in enumerate(
        building_area_range_dict.items()
    ):
        if count == 0:
            building_area_range_normed_dict[cap_range] = (
                min_area,
                np.mean((values[count + 1][0], max_area)),
            )
        elif count == n_ranges - 1:
            building_area_range_normed_dict[cap_range] = (
                np.mean((values[count - 1][1], min_area)),
                max_area,
            )
        else:
            building_area_range_normed_dict[cap_range] = (
                np.mean((values[count - 1][1], min_area)),
                np.mean((values[count + 1][0], max_area)),
            )
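    # The loop above stitches neighboring ranges together: each inner
    # boundary becomes the midpoint between the upper quantile of one range
    # and the lower quantile of the next, so the resulting area ranges
    # neither overlap nor leave gaps.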
    return building_area_range_normed_dict


@timer_func
def desaggregate_pv_in_mv_grid(
    buildings_gdf: gpd.GeoDataFrame,
    pv_cap: float | int,
    **kwargs,
) -> gpd.GeoDataFrame:
    """
    Desaggregate PV capacity onto buildings within a given grid district.
    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing buildings within the grid district.
    pv_cap : float, int
        PV capacity to desaggregate.
    Other Parameters
    ----------------
    prob_dict : dict
        Dictionary with values and probabilities per capacity range.
    cap_share_dict : dict
        Dictionary with the share of PV capacity from the total PV capacity
        within capacity ranges.
    building_area_range_dict : dict
        Dictionary with the estimated normal building area range per
        capacity range.
    load_factor_dict : dict
        Dictionary with mean roof load factor per capacity range.
    seed : int
        Seed to use for random operations with NumPy and pandas.
    pv_cap_per_sq_m : float, int
        Average expected, installable PV capacity per square meter.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with desaggregated PV
        plants.
    """
    bus_id = int(buildings_gdf.bus_id.iat[0])

    rng = default_rng(seed=kwargs["seed"])
    random_state = RandomState(seed=kwargs["seed"])

    results_df = pd.DataFrame(columns=buildings_gdf.columns)

    for cap_range, share in kwargs["cap_share_dict"].items():
        pv_cap_range = pv_cap * share

        b_area_min, b_area_max = kwargs["building_area_range_dict"][cap_range]

        cap_range_buildings_gdf = buildings_gdf.loc[
            ~buildings_gdf.index.isin(results_df.index)
            & (buildings_gdf.building_area > b_area_min)
            & (buildings_gdf.building_area <= b_area_max)
        ]

        mean_load_factor = kwargs["load_factor_dict"][cap_range]
        cap_range_buildings_gdf = cap_range_buildings_gdf.assign(
            mean_cap=cap_range_buildings_gdf.max_cap * mean_load_factor,
            load_factor=np.nan,
            capacity=np.nan,
        )

        total_mean_cap = cap_range_buildings_gdf.mean_cap.sum()

        if total_mean_cap == 0:
            logger.warning(
                f"There are no matching roofs for capacity range {cap_range} "
                f"kW in grid {bus_id}. Using all buildings as fallback."
            )

            cap_range_buildings_gdf = buildings_gdf.loc[
                ~buildings_gdf.index.isin(results_df.index)
            ]

            if len(cap_range_buildings_gdf) == 0:
                logger.warning(
                    "There are no roofs available for capacity range "
                    f"{cap_range} kW in grid {bus_id}. Allowing dual use."
                )
                cap_range_buildings_gdf = buildings_gdf.copy()

            cap_range_buildings_gdf = cap_range_buildings_gdf.assign(
                mean_cap=cap_range_buildings_gdf.max_cap * mean_load_factor,
                load_factor=np.nan,
                capacity=np.nan,
            )

            total_mean_cap = cap_range_buildings_gdf.mean_cap.sum()

        elif total_mean_cap < pv_cap_range:
            logger.warning(
                f"Average roof utilization of the roof area in grid {bus_id} "
                f"and capacity range {cap_range} kW is not sufficient. The "
                "roof utilization will be above average."
            )
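        # Sample just enough buildings that the sum of their mean capacities
        # is expected to cover the target of this range; the second term
        # guarantees that at least one building is drawn.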
        frac = max(
            pv_cap_range / total_mean_cap,
            1 / len(cap_range_buildings_gdf),
        )

        samples_gdf = cap_range_buildings_gdf.sample(
            frac=min(1, frac),
            random_state=random_state,
        )

        cap_range_dict = kwargs["prob_dict"][cap_range]

        values_dict = cap_range_dict["values"]
        p_dict = cap_range_dict["probabilities"]

        load_factors = rng.choice(
            a=values_dict["load_factor"],
            size=len(samples_gdf),
            p=p_dict["load_factor"],
        )

        samples_gdf = samples_gdf.assign(
            load_factor=load_factors,
            capacity=(
                samples_gdf.building_area
                * load_factors
                * kwargs["pv_cap_per_sq_m"]
            ).clip(lower=0.4),
        )
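        # Rescale capacity and load factor together so that the sampled
        # capacities sum exactly to this range's target; the assert below
        # guards that invariant.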
        missing_factor = pv_cap_range / samples_gdf.capacity.sum()

        samples_gdf = samples_gdf.assign(
            capacity=(samples_gdf.capacity * missing_factor),
            load_factor=(samples_gdf.load_factor * missing_factor),
        )

        assert np.isclose(
            samples_gdf.capacity.sum(),
            pv_cap_range,
        ), f"{samples_gdf.capacity.sum()} != {pv_cap_range}"

        results_df = pd.concat(
            [
                results_df,
                samples_gdf,
            ],
        )

    assert np.isclose(
        results_df.capacity.sum(),
        pv_cap,
    ), f"{results_df.capacity.sum()} != {pv_cap}"

    return gpd.GeoDataFrame(
        results_df,
        # fall back to the CRS of the input frame; samples_gdf is not
        # defined if the loop above was never entered
        crs=buildings_gdf.crs,
        geometry="geom",
    )

@timer_func
def desaggregate_pv(
    buildings_gdf: gpd.GeoDataFrame,
    cap_df: pd.DataFrame,
    **kwargs,
) -> gpd.GeoDataFrame:
    """
    Desaggregate PV capacity onto buildings for each MV grid district.
    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    cap_df : pandas.DataFrame
        DataFrame with total rooftop capacity per MV grid district.
    Other Parameters
    ----------------
    prob_dict : dict
        Dictionary with values and probabilities per capacity range.
    cap_share_dict : dict
        Dictionary with the share of PV capacity from the total PV capacity
        within capacity ranges.
    building_area_range_dict : dict
        Dictionary with the estimated normal building area range per
        capacity range.
    load_factor_dict : dict
        Dictionary with mean roof load factor per capacity range.
    seed : int
        Seed to use for random operations with NumPy and pandas.
    pv_cap_per_sq_m : float, int
        Average expected, installable PV capacity per square meter.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with desaggregated PV
        plants.
    """
    allocated_buildings_gdf = buildings_gdf.loc[~buildings_gdf.end_of_life]

    for bus_id in buildings_gdf.bus_id.unique():
        buildings_grid_gdf = buildings_gdf.loc[buildings_gdf.bus_id == bus_id]

        pv_installed_gdf = buildings_grid_gdf.loc[
            ~buildings_grid_gdf.end_of_life
        ]

        pv_installed = pv_installed_gdf.capacity.sum()

        pot_buildings_gdf = buildings_grid_gdf.loc[
            ~buildings_grid_gdf.index.isin(pv_installed_gdf.index)
        ]

        if len(pot_buildings_gdf) == 0:
            logger.error(
                f"In grid {bus_id} there are no potential buildings to "
                "allocate PV capacity to. The grid is skipped. This message "
                "should only appear during test runs with few buildings."
            )

            continue

        pv_target = cap_df.at[
            bus_id,
            "capacity",
        ]

        pv_missing = pv_target - pv_installed

        if pv_missing <= 0:
            logger.info(
                f"In grid {bus_id} there is more PV installed in the status "
                "quo than allocated within the scenario. No new generators "
                "are added."
            )

            continue

        if pot_buildings_gdf.max_cap.sum() < pv_missing:
            logger.error(
                f"In grid {bus_id} there is less PV potential ("
                f"{pot_buildings_gdf.max_cap.sum():g} kW) than allocated PV "
                f"capacity ({pv_missing:g} kW). The grid is skipped. This "
                "message should only appear during test runs with few "
                "buildings."
            )

            continue

        gdf = desaggregate_pv_in_mv_grid(
            buildings_gdf=pot_buildings_gdf,
            pv_cap=pv_missing,
            **kwargs,
        )

        allocated_buildings_gdf = pd.concat(
            [
                allocated_buildings_gdf,
                gdf,
            ]
        )

    logger.debug("Desaggregated scenario.")

    return gpd.GeoDataFrame(
        allocated_buildings_gdf,
        # use the CRS of the input frame; gdf is not defined if every grid
        # district was skipped
        crs=buildings_gdf.crs,
        geometry="geom",
    )

@timer_func
def add_buildings_meta_data(
    buildings_gdf: gpd.GeoDataFrame,
    prob_dict: dict,
    seed: int,
) -> gpd.GeoDataFrame:
    """
    Randomly add additional metadata to desaggregated PV plants.
    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with desaggregated PV
        plants.
    prob_dict : dict
        Dictionary with values and probabilities per capacity range.
    seed : int
        Seed to use for random operations with NumPy and pandas.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with desaggregated PV
        plants.
    """
    rng = default_rng(seed=seed)
    buildings_gdf = buildings_gdf.reset_index().rename(
        columns={
            "index": "building_id",
        }
    )

    for (min_cap, max_cap), cap_range_prob_dict in prob_dict.items():
        cap_range_gdf = buildings_gdf.loc[
            (buildings_gdf.capacity >= min_cap)
            & (buildings_gdf.capacity < max_cap)
        ]

        for key, values in cap_range_prob_dict["values"].items():
            if key == "load_factor":
                continue

            gdf = cap_range_gdf.loc[
                cap_range_gdf[key].isna()
                | cap_range_gdf[key].isnull()
                | (cap_range_gdf[key] == "None")
            ]

            key_vals = rng.choice(
                a=values,
                size=len(gdf),
                p=cap_range_prob_dict["probabilities"][key],
            )

            buildings_gdf.loc[gdf.index, key] = key_vals
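    # Only plants whose property is still missing (NaN or the string "None")
    # get a value drawn from the MaStR distribution; existing attributes are
    # kept, and load_factor is skipped as it was already fixed during
    # sampling.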
    return buildings_gdf


def add_voltage_level(
    buildings_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add voltage level derived from generator capacity to the power plants.
    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with desaggregated PV
        plants.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with voltage level per
        generator.
    """
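    # The thresholds below follow the eGo voltage level convention
    # (e.g. 7 = LV, 5 = MV, 3 = HV, 1 = EHV); p is the capacity in kW.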
    def voltage_levels(p: float) -> int:
        if p < 100:
            return 7
        elif p < 200:
            return 6
        elif p < 5500:
            return 5
        elif p < 20000:
            return 4
        elif p < 120000:
            return 3
        return 1

    return buildings_gdf.assign(
        voltage_level=buildings_gdf.capacity.apply(voltage_levels)
    )

def add_start_up_date(
    buildings_gdf: gpd.GeoDataFrame,
    start: pd.Timestamp,
    end: pd.Timestamp,
    seed: int,
):
    """
    Randomly and linearly add a start-up date to new PV generators.
    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with desaggregated PV
        plants.
    start : pandas.Timestamp
        Minimum Timestamp to use.
    end : pandas.Timestamp
        Maximum Timestamp to use.
    seed : int
        Seed to use for random operations with NumPy and pandas.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with start-up date added.
    """
    rng = default_rng(seed=seed)
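    # Drawing start-up dates uniformly from a daily range between start and
    # end amounts to assuming a linear build-out over that period.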
    date_range = pd.date_range(start=start, end=end, freq="1D")

    return buildings_gdf.assign(
        start_up_date=rng.choice(date_range, size=len(buildings_gdf))
    )

@timer_func
def allocate_scenarios(
    mastr_gdf: gpd.GeoDataFrame,
    buildings_gdf: gpd.GeoDataFrame,
    last_scenario_gdf: gpd.GeoDataFrame,
    scenario: str,
):
    """
    Desaggregate and allocate scenario PV rooftop ramp-ups onto buildings.
    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    last_scenario_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings matched with PV generators
        from the temporally preceding scenario.
    scenario : str
        Scenario to desaggregate and allocate.
    Returns
    -------
    tuple
        geopandas.GeoDataFrame
            GeoDataFrame containing OSM buildings matched with PV
            generators.
        pandas.DataFrame
            DataFrame containing PV rooftop capacity per bus ID.
    """
    grid_districts_gdf = grid_districts(EPSG)

    federal_state_gdf = federal_state_data(grid_districts_gdf.crs)

    grid_federal_state_gdf = overlay_grid_districts_with_counties(
        grid_districts_gdf,
        federal_state_gdf,
    )

    buildings_overlay_gdf = add_overlay_id_to_buildings(
        buildings_gdf,
        grid_federal_state_gdf,
    )

    valid_buildings_gdf = drop_buildings_outside_grids(buildings_overlay_gdf)

    # buildings_area_per_overlay_gdf = buildings_area_per_overlay_id(
    #     valid_buildings_gdf,
    #     grid_federal_state_gdf,
    # )

    cap_per_bus_id_df = cap_per_bus_id(scenario)

    last_scenario_gdf = determine_end_of_life_gens(
        last_scenario_gdf,
        SCENARIO_TIMESTAMP[scenario],
        PV_ROOFTOP_LIFETIME,
    )

    buildings_gdf = calculate_max_pv_cap_per_building(
        valid_buildings_gdf,
        last_scenario_gdf,
        PV_CAP_PER_SQ_M,
        ROOF_FACTOR,
    )

    mastr_gdf = calculate_building_load_factor(
        mastr_gdf,
        buildings_gdf,
    )

    probabilities_dict = probabilities(
        mastr_gdf,
        cap_ranges=CAP_RANGES,
    )

    cap_share_dict = cap_share_per_cap_range(
        mastr_gdf,
        cap_ranges=CAP_RANGES,
    )

    load_factor_dict = mean_load_factor_per_cap_range(
        mastr_gdf,
        cap_ranges=CAP_RANGES,
    )

    building_area_range_dict = building_area_range_per_cap_range(
        mastr_gdf,
        cap_ranges=CAP_RANGES,
        min_building_size=MIN_BUILDING_SIZE,
        upper_quantile=UPPER_QUNATILE,
        lower_quantile=LOWER_QUANTILE,
    )

    allocated_buildings_gdf = desaggregate_pv(
        buildings_gdf=buildings_gdf,
        cap_df=cap_per_bus_id_df,
        prob_dict=probabilities_dict,
        cap_share_dict=cap_share_dict,
        building_area_range_dict=building_area_range_dict,
        load_factor_dict=load_factor_dict,
        seed=SEED,
        pv_cap_per_sq_m=PV_CAP_PER_SQ_M,
    )

    allocated_buildings_gdf = allocated_buildings_gdf.assign(scenario=scenario)

    meta_buildings_gdf = frame_to_numeric(
        add_buildings_meta_data(
            allocated_buildings_gdf,
            probabilities_dict,
            SEED,
        )
    )
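    # Start-up dates for the new generators are drawn between the newest
    # existing plant and the scenario timestamp, which keeps successive
    # scenarios chronologically consistent.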
    return (
        add_start_up_date(
            meta_buildings_gdf,
            start=last_scenario_gdf.start_up_date.max(),
            end=SCENARIO_TIMESTAMP[scenario],
            seed=SEED,
        ),
        cap_per_bus_id_df,
    )

class EgonPowerPlantPvRoofBuildingScenario(Base):
    __tablename__ = "egon_power_plants_pv_roof_building"
    __table_args__ = {"schema": "supply"}

    index = Column(Integer, primary_key=True, index=True)
    scenario = Column(String)
    building_id = Column(Integer)
    gens_id = Column(String, nullable=True)
    capacity = Column(Float)
    einheitliche_ausrichtung_und_neigungswinkel = Column(Float)
    hauptausrichtung = Column(String)
    hauptausrichtung_neigungswinkel = Column(String)
    voltage_level = Column(Integer)


def create_scenario_table(buildings_gdf):
    """Create mapping table pv_unit <-> building for scenario"""
    EgonPowerPlantPvRoofBuildingScenario.__table__.drop(
        bind=engine, checkfirst=True
    )
    EgonPowerPlantPvRoofBuildingScenario.__table__.create(
        bind=engine, checkfirst=True
    )

    buildings_gdf.rename(columns=COLS_TO_RENAME)[
        COLS_TO_EXPORT
    ].reset_index().to_sql(
        name=EgonPowerPlantPvRoofBuildingScenario.__table__.name,
        schema=EgonPowerPlantPvRoofBuildingScenario.__table__.schema,
        con=db.engine(),
        if_exists="append",
        index=False,
    )

def geocode_mastr_data():
    """
    Read PV rooftop data from the MaStR CSV, clean and geocode it, and
    export the result to the database.
    TODO: the source will be replaced as soon as the MaStR data is available
    in the DB.
    """
    mastr_df = mastr_data(
        MASTR_INDEX_COL,
        MASTR_RELEVANT_COLS,
        MASTR_DTYPES,
        MASTR_PARSE_DATES,
    )

    clean_mastr_df = clean_mastr_data(
        mastr_df,
        max_realistic_pv_cap=MAX_REALISTIC_PV_CAP,
        min_realistic_pv_cap=MIN_REALISTIC_PV_CAP,
        seed=SEED,
        rounding=ROUNDING,
    )

    geocoding_df = geocoding_data(clean_mastr_df)

    ratelimiter = geocoder(USER_AGENT, MIN_DELAY_SECONDS)

    geocode_gdf = geocode_data(geocoding_df, ratelimiter, EPSG)

    create_geocoded_table(geocode_gdf)

def pv_rooftop_to_buildings():
    """Main script, executed as task"""
    mastr_gdf = load_mastr_data()

    buildings_gdf = load_building_data()

    logger.debug(f"Loaded building data:\n{buildings_gdf.head()}")

    desagg_mastr_gdf, desagg_buildings_gdf = allocate_to_buildings(
        mastr_gdf, buildings_gdf
    )

    logger.debug(
        f"Allocated MaStR data to buildings:\n{desagg_buildings_gdf.head()}"
    )

    all_buildings_gdf = (
        desagg_mastr_gdf.assign(scenario="status_quo")
        .reset_index()
        .rename(columns={"geometry": "geom", "EinheitMastrNummer": "gens_id"})
    )

    scenario_buildings_gdf = all_buildings_gdf.copy()

    cap_per_bus_id_df = pd.DataFrame()

    for scenario in SCENARIOS:
        logger.debug(f"Desaggregating scenario {scenario}.")
        (
            scenario_buildings_gdf,
            cap_per_bus_id_scenario_df,
        ) = allocate_scenarios(  # noqa: F841
            desagg_mastr_gdf,
            desagg_buildings_gdf,
            scenario_buildings_gdf,
            scenario,
        )

        all_buildings_gdf = gpd.GeoDataFrame(
            pd.concat(
                [all_buildings_gdf, scenario_buildings_gdf], ignore_index=True
            ),
            crs=scenario_buildings_gdf.crs,
            geometry="geom",
        )

        cap_per_bus_id_df = pd.concat(
            [cap_per_bus_id_df, cap_per_bus_id_scenario_df]
        )

    # export scenario
    create_scenario_table(add_voltage_level(all_buildings_gdf))
2668