Passed
Pull Request — dev (#934)
by
unknown
02:02
created

allocate_to_buildings()   A

Complexity

Conditions 1

Size

Total Lines 32
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 32
rs 9.85
c 0
b 0
f 0
cc 1
nop 2
1
"""
2
Distribute MaStR PV rooftop capacities to OSM and synthetic buildings. Generate new
3
PV rooftop generators for scenarios eGon2035 and eGon100RE.
4
Data cleaning: Drop duplicates and entries with missing critical data. Determine most
5
plausible capacity from multiple values given in MaStR data. Drop generators which don't
6
have any plausible capacity data (23.5MW > P > 0.1). Randomly and weighted add a
7
start-up date if it is missing. Extract zip and municipality from 'Standort' given in
8
MaStR data. Geocode unique zip and municipality combinations with Nominatim (1sec
9
delay). Drop generators for which geocoding failed or which are located outside the
10
municipalities of Germany. Add some visual sanity checks for cleaned data.
11
Allocation of MaStR data: Allocate each generator to an existing building from OSM.
12
Determine the quantile each generator and building is in depending on the capacity of
13
the generator and the area of the polygon of the building. Randomly distribute
14
generators within each municipality preferably within the same building area quantile as
15
the generators are capacity wise. If not enough buildings exists within a municipality
16
and quantile additional buildings from other quantiles are chosen randomly.
17
Desegregation of pv rooftop scenarios: The scenario data per federal state is linear
18
distributed to the mv grid districts according to the pv rooftop potential per mv grid
19
district. The rooftop potential is estimated from the building area given from the OSM
20
buildings. Grid districts, which are located in several federal states, are allocated PV
21
capacity according to their respective roof potential in the individual federal states.
22
The desegregation of PV plants within a grid districts respects existing plants from
23
MaStR, which did not reach their end of life. New PV plants are randomly and weighted
24
generated using a breakdown of MaStR data as generator basis. Plant metadata (e.g. plant
25
orientation) is also added random and weighted from MaStR data as basis.
26
"""
27
from __future__ import annotations
28
29
from collections import Counter
30
from functools import wraps
31
from pathlib import Path
32
from time import perf_counter
33
from typing import Any
34
35
from geoalchemy2 import Geometry
36
from geopy.extra.rate_limiter import RateLimiter
37
from geopy.geocoders import Nominatim
38
from loguru import logger
39
from numpy.random import RandomState, default_rng
40
from pyproj.crs.crs import CRS
41
from sqlalchemy import BigInteger, Column, Float, Integer, String
42
from sqlalchemy.dialects.postgresql import HSTORE
43
from sqlalchemy.ext.declarative import declarative_base
44
import geopandas as gpd
45
import numpy as np
46
import pandas as pd
47
48
from egon.data import config, db
49
from egon.data.datasets.electricity_demand_timeseries.hh_buildings import (
50
    OsmBuildingsSynthetic,
51
)
52
from egon.data.datasets.scenario_capacities import EgonScenarioCapacities
53
from egon.data.datasets.zensus_vg250 import Vg250Gem
54
55
engine = db.engine()
56
Base = declarative_base()
57
SEED = int(config.settings()["egon-data"]["--random-seed"])
58
59
# TODO: move to yml
60
# mastr data
61
MASTR_RELEVANT_COLS = [
62
    "EinheitMastrNummer",
63
    "Bruttoleistung",
64
    "StatisikFlag",
65
    "Bruttoleistung_extended",
66
    "Nettonennleistung",
67
    "InstallierteLeistung",
68
    "zugeordneteWirkleistungWechselrichter",
69
    "EinheitBetriebsstatus",
70
    "Standort",
71
    "Bundesland",
72
    "Land",
73
    "Landkreis",
74
    "Gemeinde",
75
    "Postleitzahl",
76
    "Ort",
77
    "GeplantesInbetriebnahmedatum",
78
    "Inbetriebnahmedatum",
79
    "GemeinsamerWechselrichterMitSpeicher",
80
    "Lage",
81
    "Leistungsbegrenzung",
82
    "EinheitlicheAusrichtungUndNeigungswinkel",
83
    "Hauptausrichtung",
84
    "HauptausrichtungNeigungswinkel",
85
    "Nebenausrichtung",
86
]
87
88
MASTR_DTYPES = {
89
    "EinheitMastrNummer": str,
90
    "Bruttoleistung": float,
91
    "StatisikFlag": str,
92
    "Bruttoleistung_extended": float,
93
    "Nettonennleistung": float,
94
    "InstallierteLeistung": float,
95
    "zugeordneteWirkleistungWechselrichter": float,
96
    "EinheitBetriebsstatus": str,
97
    "Standort": str,
98
    "Bundesland": str,
99
    "Land": str,
100
    "Landkreis": str,
101
    "Gemeinde": str,
102
    # "Postleitzahl": int,  # fails because of nan values
103
    "Ort": str,
104
    "GemeinsamerWechselrichterMitSpeicher": str,
105
    "Lage": str,
106
    "Leistungsbegrenzung": str,
107
    # this will parse nan values as false wich is not always correct
108
    # "EinheitlicheAusrichtungUndNeigungswinkel": bool,
109
    "Hauptausrichtung": str,
110
    "HauptausrichtungNeigungswinkel": str,
111
    "Nebenausrichtung": str,
112
    "NebenausrichtungNeigungswinkel": str,
113
}
114
115
MASTR_PARSE_DATES = [
116
    "GeplantesInbetriebnahmedatum",
117
    "Inbetriebnahmedatum",
118
]
119
120
MASTR_INDEX_COL = "EinheitMastrNummer"
121
122
EPSG = 4326
123
SRID = 3035
124
125
# data cleaning
126
MAX_REALISTIC_PV_CAP = 23500
127
MIN_REALISTIC_PV_CAP = 0.1
128
ROUNDING = 1
129
130
# geopy
131
MIN_DELAY_SECONDS = 1
132
USER_AGENT = "rli_kh_geocoder"
133
134
# show additional logging information
135
VERBOSE = False
136
137
EXPORT_DIR = Path(__name__).resolve().parent / "data"
138
EXPORT_FILE = "mastr_geocoded.gpkg"
139
EXPORT_PATH = EXPORT_DIR / EXPORT_FILE
140
DRIVER = "GPKG"
141
142
# Number of quantiles
143
Q = 5
144
145
# Scenario Data
146
CARRIER = "solar_rooftop"
147
SCENARIOS = ["eGon2035", "eGon100RE"]
148
SCENARIO_TIMESTAMP = {
149
    "eGon2035": pd.Timestamp("2035-01-01", tz="UTC"),
150
    "eGon100RE": pd.Timestamp("2050-01-01", tz="UTC"),
151
}
152
PV_ROOFTOP_LIFETIME = pd.Timedelta(20 * 365, unit="D")
153
154
# Example Modul Trina Vertex S TSM-400DE09M.08 400 Wp
155
# https://www.photovoltaik4all.de/media/pdf/92/64/68/Trina_Datasheet_VertexS_DE09-08_2021_A.pdf
156
MODUL_CAP = 0.4  # kWp
157
MODUL_SIZE = 1.096 * 1.754  # m²
158
PV_CAP_PER_SQ_M = MODUL_CAP / MODUL_SIZE
159
160
# Estimation of usable roof area
161
# Factor for the conversion of building area to roof area
162
# estimation mean roof pitch: 35°
163
# estimation usable roof share: 80%
164
# estimation that only the south side of the building is used for pv
165
# see https://mediatum.ub.tum.de/doc/%20969497/969497.pdf
166
# AREA_FACTOR = 1.221
167
# USABLE_ROOF_SHARE = 0.8
168
# SOUTH_SHARE = 0.5
169
# ROOF_FACTOR = AREA_FACTOR * USABLE_ROOF_SHARE * SOUTH_SHARE
170
ROOF_FACTOR = 0.5
171
172
CAP_RANGES = [
173
    (0, 30),
174
    (30, 100),
175
    (100, float("inf")),
176
]
177
178
MIN_BUILDING_SIZE = 10.0
179
UPPER_QUNATILE = 0.95
180
LOWER_QUANTILE = 0.05
181
182
COLS_TO_RENAME = {
183
    "EinheitlicheAusrichtungUndNeigungswinkel": (
184
        "einheitliche_ausrichtung_und_neigungswinkel"
185
    ),
186
    "Hauptausrichtung": "hauptausrichtung",
187
    "HauptausrichtungNeigungswinkel": "hauptausrichtung_neigungswinkel",
188
}
189
190
COLS_TO_EXPORT = [
191
    "scenario",
192
    "building_id",
193
    "gens_id",
194
    "capacity",
195
    "einheitliche_ausrichtung_und_neigungswinkel",
196
    "hauptausrichtung",
197
    "hauptausrichtung_neigungswinkel",
198
    "voltage_level",
199
]
200
201
# TODO
202
INCLUDE_SYNTHETIC_BUILDINGS = True
203
ONLY_BUILDINGS_WITH_DEMAND = True
204
TEST_RUN = False
205
206
207
def timer_func(func):
208
    @wraps(func)
209
    def timeit_wrapper(*args, **kwargs):
210
        start_time = perf_counter()
211
        result = func(*args, **kwargs)
212
        end_time = perf_counter()
213
        total_time = end_time - start_time
214
        logger.debug(
215
            f"Function {func.__name__} took {total_time:.4f} seconds."
216
        )
217
        return result
218
219
    return timeit_wrapper
220
221
222
@timer_func
223
def mastr_data(
224
    index_col: str | int | list[str] | list[int],
225
    usecols: list[str],
226
    dtype: dict[str, Any] | None,
227
    parse_dates: list[str] | None,
228
) -> pd.DataFrame:
229
    """
230
    Read MaStR data from csv.
231
232
    Parameters
233
    -----------
234
    index_col : str, int or list of str or int
235
        Column(s) to use as the row labels of the DataFrame.
236
    usecols : list of str
237
        Return a subset of the columns.
238
    dtype : dict of column (str) -> type (any), optional
239
        Data type for data or columns.
240
    parse_dates : list of names (str), optional
241
        Try to parse given columns to datetime.
242
    Returns
243
    -------
244
    pandas.DataFrame
245
        DataFrame containing MaStR data.
246
    """
247
    mastr_path = Path(
248
        config.datasets()["power_plants"]["sources"]["mastr_pv"]
249
    ).resolve()
250
251
    mastr_df = pd.read_csv(
252
        mastr_path,
253
        index_col=index_col,
254
        usecols=usecols,
255
        dtype=dtype,
256
        parse_dates=parse_dates,
257
    )
258
259
    mastr_df = mastr_df.loc[
260
        (mastr_df.StatisikFlag == "B")
261
        & (mastr_df.EinheitBetriebsstatus == "InBetrieb")
262
        & (mastr_df.Land == "Deutschland")
263
        & (mastr_df.Lage == "BaulicheAnlagen")
264
    ]
265
266
    if (
267
        config.settings()["egon-data"]["--dataset-boundary"]
268
        == "Schleswig-Holstein"
269
    ):
270
        init_len = len(mastr_df)
271
272
        mastr_df = mastr_df.loc[mastr_df.Bundesland == "SchleswigHolstein"]
273
274
        logger.info(
275
            f"Using only MaStR data within Schleswig-Holstein. "
276
            f"{init_len - len(mastr_df)} of {init_len} generators are dropped."
277
        )
278
279
    logger.debug("MaStR data loaded.")
280
281
    return mastr_df
282
283
284
@timer_func
285
def clean_mastr_data(
286
    mastr_df: pd.DataFrame,
287
    max_realistic_pv_cap: int | float,
288
    min_realistic_pv_cap: int | float,
289
    rounding: int,
290
    seed: int,
291
) -> pd.DataFrame:
292
    """
293
    Clean the MaStR data from implausible data.
294
295
    * Drop MaStR ID duplicates.
296
    * Drop generators with implausible capacities.
297
    * Drop generators without any kind of start-up date.
298
    * Clean up Standort column and capacity.
299
300
    Parameters
301
    -----------
302
    mastr_df : pandas.DataFrame
303
        DataFrame containing MaStR data.
304
    max_realistic_pv_cap : int or float
305
        Maximum capacity, which is considered to be realistic.
306
    min_realistic_pv_cap : int or float
307
        Minimum capacity, which is considered to be realistic.
308
    rounding : int
309
        Rounding to use when cleaning up capacity. E.g. when
310
        rounding is 1 a capacity of 9.93 will be rounded to 9.9.
311
    seed : int
312
        Seed to use for random operations with NumPy and pandas.
313
    Returns
314
    -------
315
    pandas.DataFrame
316
        DataFrame containing cleaned MaStR data.
317
    """
318
    init_len = len(mastr_df)
319
320
    # drop duplicates
321
    mastr_df = mastr_df.loc[~mastr_df.index.duplicated()]
322
323
    # drop invalid entries in standort
324
    index_to_drop = mastr_df.loc[
325
        (mastr_df.Standort.isna()) | (mastr_df.Standort.isnull())
326
    ].index
327
328
    mastr_df = mastr_df.loc[~mastr_df.index.isin(index_to_drop)]
329
330
    df = mastr_df[
331
        [
332
            "Bruttoleistung",
333
            "Bruttoleistung_extended",
334
            "Nettonennleistung",
335
            "zugeordneteWirkleistungWechselrichter",
336
            "InstallierteLeistung",
337
        ]
338
    ].round(rounding)
339
340
    # use only the smallest capacity rating if multiple are given
341
    mastr_df = mastr_df.assign(
342
        capacity=[
343
            most_plausible(p_tub, min_realistic_pv_cap)
344
            for p_tub in df.itertuples(index=False)
345
        ]
346
    )
347
348
    # drop generators without any capacity info
349
    # and capacity of zero
350
    # and if the capacity is > 23.5 MW, because
351
    # Germanies largest rooftop PV is 23 MW
352
    # https://www.iwr.de/news/groesste-pv-dachanlage-europas-wird-in-sachsen-anhalt-gebaut-news37379
353
    mastr_df = mastr_df.loc[
354
        (~mastr_df.capacity.isna())
355
        & (mastr_df.capacity <= max_realistic_pv_cap)
356
        & (mastr_df.capacity > min_realistic_pv_cap)
357
    ]
358
359
    # get zip and municipality
360
    mastr_df[["zip_and_municipality", "drop_this"]] = pd.DataFrame(
361
        mastr_df.Standort.astype(str)
362
        .apply(
363
            zip_and_municipality_from_standort,
364
            args=(VERBOSE,),
365
        )
366
        .tolist(),
367
        index=mastr_df.index,
368
    )
369
370
    # drop invalid entries
371
    mastr_df = mastr_df.loc[mastr_df.drop_this].drop(columns="drop_this")
372
373
    # add ", Deutschland" just in case
374
    mastr_df = mastr_df.assign(
375
        zip_and_municipality=(mastr_df.zip_and_municipality + ", Deutschland")
376
    )
377
378
    # get consistent start-up date
379
    mastr_df = mastr_df.assign(
380
        start_up_date=mastr_df.Inbetriebnahmedatum,
381
    )
382
383
    mastr_df.loc[mastr_df.start_up_date.isna()] = mastr_df.loc[
384
        mastr_df.start_up_date.isna()
385
    ].assign(
386
        start_up_date=mastr_df.GeplantesInbetriebnahmedatum.loc[
387
            mastr_df.start_up_date.isna()
388
        ]
389
    )
390
391
    # randomly and weighted fill missing start-up dates
392
    pool = mastr_df.loc[
393
        ~mastr_df.start_up_date.isna()
394
    ].start_up_date.to_numpy()
395
396
    size = len(mastr_df) - len(pool)
397
398
    if size > 0:
399
        np.random.seed(seed)
400
401
        choice = np.random.choice(
402
            pool,
403
            size=size,
404
            replace=False,
405
        )
406
407
        mastr_df.loc[mastr_df.start_up_date.isna()] = mastr_df.loc[
408
            mastr_df.start_up_date.isna()
409
        ].assign(start_up_date=choice)
410
411
        logger.info(
412
            f"Randomly and weigthed added start-up date to {size} generators."
413
        )
414
415
    mastr_df = mastr_df.assign(
416
        start_up_date=pd.to_datetime(mastr_df.start_up_date, utc=True)
417
    )
418
419
    end_len = len(mastr_df)
420
    logger.debug(
421
        f"Dropped {init_len - end_len} "
422
        f"({((init_len - end_len) / init_len) * 100:g}%)"
423
        f" of {init_len} rows from MaStR DataFrame."
424
    )
425
426
    return mastr_df
427
428
429
def zip_and_municipality_from_standort(
430
    standort: str,
431
    verbose: bool = False,
432
) -> tuple[str, bool]:
433
    """
434
    Get zip code and municipality from Standort string split into a list.
435
    Parameters
436
    -----------
437
    standort : str
438
        Standort as given from MaStR data.
439
    verbose : bool
440
        Logs additional info if True.
441
    Returns
442
    -------
443
    str
444
        Standort with only the zip code and municipality
445
        as well a ', Germany' added.
446
    """
447
    if verbose:
448
        logger.debug(f"Uncleaned String: {standort}")
449
450
    standort_list = standort.split()
451
452
    found = False
453
    count = 0
454
455
    for count, elem in enumerate(standort_list):
456
        if len(elem) != 5:
457
            continue
458
        if not elem.isnumeric():
459
            continue
460
461
        found = True
462
463
        break
464
465
    if found:
466
        cleaned_str = " ".join(standort_list[count:])
467
468
        if verbose:
469
            logger.debug(f"Cleaned String:   {cleaned_str}")
470
471
        return cleaned_str, found
472
473
    logger.warning(
474
        "Couldn't identify zip code. This entry will be dropped."
475
        f" Original standort: {standort}."
476
    )
477
478
    return standort, found
479
480
481
def most_plausible(
482
    p_tub: tuple,
483
    min_realistic_pv_cap: int | float,
484
) -> float:
485
    """
486
    Try to determine the most plausible capacity.
487
    Try to determine the most plausible capacity from a given
488
    generator from MaStR data.
489
    Parameters
490
    -----------
491
    p_tub : tuple
492
        Tuple containing the different capacities given in
493
        the MaStR data.
494
    min_realistic_pv_cap : int or float
495
        Minimum capacity, which is considered to be realistic.
496
    Returns
497
    -------
498
    float
499
        Capacity of the generator estimated as the most realistic.
500
    """
501
    count = Counter(p_tub).most_common(3)
502
503
    if len(count) == 1:
504
        return count[0][0]
505
506
    val1 = count[0][0]
507
    val2 = count[1][0]
508
509
    if len(count) == 2:
510
        min_val = min(val1, val2)
511
        max_val = max(val1, val2)
512
    else:
513
        val3 = count[2][0]
514
515
        min_val = min(val1, val2, val3)
516
        max_val = max(val1, val2, val3)
517
518
    if min_val < min_realistic_pv_cap:
519
        return max_val
520
521
    return min_val
522
523
524
def geocoder(
525
    user_agent: str,
526
    min_delay_seconds: int,
527
) -> RateLimiter:
528
    """
529
    Setup Nominatim geocoding class.
530
    Parameters
531
    -----------
532
    user_agent : str
533
        The app name.
534
    min_delay_seconds : int
535
        Delay in seconds to use between requests to Nominatim.
536
        A minimum of 1 is advised.
537
    Returns
538
    -------
539
    geopy.extra.rate_limiter.RateLimiter
540
        Nominatim RateLimiter geocoding class to use for geocoding.
541
    """
542
    locator = Nominatim(user_agent=user_agent)
543
    return RateLimiter(
544
        locator.geocode,
545
        min_delay_seconds=min_delay_seconds,
546
    )
547
548
549
def geocoding_data(
550
    clean_mastr_df: pd.DataFrame,
551
) -> pd.DataFrame:
552
    """
553
    Setup DataFrame to geocode.
554
    Parameters
555
    -----------
556
    clean_mastr_df : pandas.DataFrame
557
        DataFrame containing cleaned MaStR data.
558
    Returns
559
    -------
560
    pandas.DataFrame
561
        DataFrame containing all unique combinations of
562
        zip codes with municipalities for geocoding.
563
    """
564
    return pd.DataFrame(
565
        data=clean_mastr_df.zip_and_municipality.unique(),
566
        columns=["zip_and_municipality"],
567
    )
568
569
570
@timer_func
571
def geocode_data(
572
    geocoding_df: pd.DataFrame,
573
    ratelimiter: RateLimiter,
574
    epsg: int,
575
) -> gpd.GeoDataFrame:
576
    """
577
    Geocode zip code and municipality.
578
    Extract latitude, longitude and altitude.
579
    Transfrom latitude and longitude to shapely
580
    Point and return a geopandas GeoDataFrame.
581
    Parameters
582
    -----------
583
    geocoding_df : pandas.DataFrame
584
        DataFrame containing all unique combinations of
585
        zip codes with municipalities for geocoding.
586
    ratelimiter : geopy.extra.rate_limiter.RateLimiter
587
        Nominatim RateLimiter geocoding class to use for geocoding.
588
    epsg : int
589
        EPSG ID to use as CRS.
590
    Returns
591
    -------
592
    geopandas.GeoDataFrame
593
        GeoDataFrame containing all unique combinations of
594
        zip codes with municipalities with matching geolocation.
595
    """
596
    logger.info(f"Geocoding {len(geocoding_df)} locations.")
597
598
    geocode_df = geocoding_df.assign(
599
        location=geocoding_df.zip_and_municipality.apply(ratelimiter)
600
    )
601
602
    geocode_df = geocode_df.assign(
603
        point=geocode_df.location.apply(
604
            lambda loc: tuple(loc.point) if loc else None
605
        )
606
    )
607
608
    geocode_df[["latitude", "longitude", "altitude"]] = pd.DataFrame(
609
        geocode_df.point.tolist(), index=geocode_df.index
610
    )
611
612
    return gpd.GeoDataFrame(
613
        geocode_df,
614
        geometry=gpd.points_from_xy(geocode_df.longitude, geocode_df.latitude),
615
        crs=f"EPSG:{epsg}",
616
    )
617
618
619
def merge_geocode_with_mastr(
620
    clean_mastr_df: pd.DataFrame, geocode_gdf: gpd.GeoDataFrame
621
) -> gpd.GeoDataFrame:
622
    """
623
    Merge geometry to original mastr data.
624
    Parameters
625
    -----------
626
    clean_mastr_df : pandas.DataFrame
627
        DataFrame containing cleaned MaStR data.
628
    geocode_gdf : geopandas.GeoDataFrame
629
        GeoDataFrame containing all unique combinations of
630
        zip codes with municipalities with matching geolocation.
631
    Returns
632
    -------
633
    gepandas.GeoDataFrame
634
        GeoDataFrame containing cleaned MaStR data with
635
        matching geolocation from geocoding.
636
    """
637
    return gpd.GeoDataFrame(
638
        clean_mastr_df.merge(
639
            geocode_gdf[["zip_and_municipality", "geometry"]],
640
            how="left",
641
            left_on="zip_and_municipality",
642
            right_on="zip_and_municipality",
643
        ),
644
        crs=geocode_gdf.crs,
645
    ).set_index(clean_mastr_df.index)
646
647
648
def drop_invalid_entries_from_gdf(
649
    gdf: gpd.GeoDataFrame,
650
) -> gpd.GeoDataFrame:
651
    """
652
    Drop invalid entries from geopandas GeoDataFrame.
653
    TODO: how to omit the logging from geos here???
654
    Parameters
655
    -----------
656
    gdf : geopandas.GeoDataFrame
657
        GeoDataFrame to be checked for validity.
658
    Returns
659
    -------
660
    gepandas.GeoDataFrame
661
        GeoDataFrame with rows with invalid geometries
662
        dropped.
663
    """
664
    valid_gdf = gdf.loc[gdf.is_valid]
665
666
    logger.debug(
667
        f"{len(gdf) - len(valid_gdf)} "
668
        f"({(len(gdf) - len(valid_gdf)) / len(gdf) * 100:g}%) "
669
        f"of {len(gdf)} values were invalid and are dropped."
670
    )
671
672
    return valid_gdf
673
674
675
@timer_func
676
def municipality_data() -> gpd.GeoDataFrame:
677
    """
678
    Get municipality data from eGo^n Database.
679
    Returns
680
    -------
681
    gepandas.GeoDataFrame
682
        GeoDataFrame with municipality data.
683
    """
684
    with db.session_scope() as session:
685
        query = session.query(Vg250Gem.ags, Vg250Gem.geometry.label("geom"))
686
687
    return gpd.read_postgis(
688
        query.statement, query.session.bind, index_col="ags"
689
    )
690
691
692
@timer_func
693
def add_ags_to_gens(
694
    valid_mastr_gdf: gpd.GeoDataFrame,
695
    municipalities_gdf: gpd.GeoDataFrame,
696
) -> gpd.GeoDataFrame:
697
    """
698
    Add information about AGS ID to generators.
699
    Parameters
700
    -----------
701
    valid_mastr_gdf : geopandas.GeoDataFrame
702
        GeoDataFrame with valid and cleaned MaStR data.
703
    municipalities_gdf : geopandas.GeoDataFrame
704
        GeoDataFrame with municipality data.
705
    Returns
706
    -------
707
    gepandas.GeoDataFrame
708
        GeoDataFrame with valid and cleaned MaStR data
709
        with AGS ID added.
710
    """
711
    return valid_mastr_gdf.sjoin(
712
        municipalities_gdf,
713
        how="left",
714
        predicate="intersects",
715
    ).rename(columns={"index_right": "ags"})
716
717
718
def drop_gens_outside_muns(
719
    valid_mastr_gdf: gpd.GeoDataFrame,
720
) -> gpd.GeoDataFrame:
721
    """
722
    Drop all generators outside of municipalities.
723
    Parameters
724
    -----------
725
    valid_mastr_gdf : geopandas.GeoDataFrame
726
        GeoDataFrame with valid and cleaned MaStR data.
727
    Returns
728
    -------
729
    gepandas.GeoDataFrame
730
        GeoDataFrame with valid and cleaned MaStR data
731
        with generatos without an AGS ID dropped.
732
    """
733
    gdf = valid_mastr_gdf.loc[~valid_mastr_gdf.ags.isna()]
734
735
    logger.debug(
736
        f"{len(valid_mastr_gdf) - len(gdf)} "
737
        f"({(len(valid_mastr_gdf) - len(gdf)) / len(valid_mastr_gdf) * 100:g}%) "
738
        f"of {len(valid_mastr_gdf)} values are outside of the municipalities"
739
        " and are therefore dropped."
740
    )
741
742
    return gdf
743
744
745
class EgonMastrPvRoofGeocoded(Base):
746
    __tablename__ = "egon_mastr_pv_roof_geocoded"
747
    __table_args__ = {"schema": "supply"}
748
749
    zip_and_municipality = Column(String, primary_key=True, index=True)
750
    location = Column(String)
751
    point = Column(String)
752
    latitude = Column(Float)
753
    longitude = Column(Float)
754
    altitude = Column(Float)
755
    geometry = Column(Geometry(srid=EPSG))
756
757
758
def create_geocoded_table(geocode_gdf):
759
    """
760
    Create geocoded table mastr pv rooftop
761
    Parameters
762
    -----------
763
    geocode_gdf : geopandas.GeoDataFrame
764
        GeoDataFrame containing geocoding information on pv rooftop locations.
765
    """
766
    EgonMastrPvRoofGeocoded.__table__.drop(bind=engine, checkfirst=True)
767
    EgonMastrPvRoofGeocoded.__table__.create(bind=engine, checkfirst=True)
768
769
    geocode_gdf.to_postgis(
770
        name=EgonMastrPvRoofGeocoded.__table__.name,
771
        schema=EgonMastrPvRoofGeocoded.__table__.schema,
772
        con=db.engine(),
773
        if_exists="append",
774
        index=False,
775
        # dtype={}
776
    )
777
778
779
def geocoded_data_from_db(
780
    epsg: str | int,
781
) -> gpd.GeoDataFrame:
782
    """
783
    Read OSM buildings data from eGo^n Database.
784
    Parameters
785
    -----------
786
    to_crs : pyproj.crs.crs.CRS
787
        CRS to transform geometries to.
788
    Returns
789
    -------
790
    geopandas.GeoDataFrame
791
        GeoDataFrame containing OSM buildings data.
792
    """
793
    with db.session_scope() as session:
794
        query = session.query(
795
            EgonMastrPvRoofGeocoded.zip_and_municipality,
796
            EgonMastrPvRoofGeocoded.geometry,
797
        )
798
799
    return gpd.read_postgis(
800
        query.statement, query.session.bind, geom_col="geometry"
801
    ).to_crs(f"EPSG:{epsg}")
802
803
804
def load_mastr_data():
805
    """Read PV rooftop data from MaStR CSV
806
    Note: the source will be replaced as soon as the MaStR data is available
807
    in DB.
808
    Returns
809
    -------
810
    geopandas.GeoDataFrame
811
        GeoDataFrame containing MaStR data with geocoded locations.
812
    """
813
    mastr_df = mastr_data(
814
        MASTR_INDEX_COL,
815
        MASTR_RELEVANT_COLS,
816
        MASTR_DTYPES,
817
        MASTR_PARSE_DATES,
818
    )
819
820
    clean_mastr_df = clean_mastr_data(
821
        mastr_df,
822
        max_realistic_pv_cap=MAX_REALISTIC_PV_CAP,
823
        min_realistic_pv_cap=MIN_REALISTIC_PV_CAP,
824
        seed=SEED,
825
        rounding=ROUNDING,
826
    )
827
828
    geocode_gdf = geocoded_data_from_db(EPSG)
829
830
    mastr_gdf = merge_geocode_with_mastr(clean_mastr_df, geocode_gdf)
831
832
    valid_mastr_gdf = drop_invalid_entries_from_gdf(mastr_gdf)
833
834
    municipalities_gdf = municipality_data()
835
836
    valid_mastr_gdf = add_ags_to_gens(valid_mastr_gdf, municipalities_gdf)
837
838
    return drop_gens_outside_muns(valid_mastr_gdf)
839
840
841
class OsmBuildingsFiltered(Base):
842
    __tablename__ = "osm_buildings_filtered"
843
    __table_args__ = {"schema": "openstreetmap"}
844
845
    osm_id = Column(BigInteger)
846
    amenity = Column(String)
847
    building = Column(String)
848
    name = Column(String)
849
    geom = Column(Geometry(srid=SRID), index=True)
850
    area = Column(Float)
851
    geom_point = Column(Geometry(srid=SRID), index=True)
852
    tags = Column(HSTORE)
853
    id = Column(BigInteger, primary_key=True, index=True)
854
855
856
@timer_func
857
def osm_buildings(
858
    to_crs: CRS,
859
) -> gpd.GeoDataFrame:
860
    """
861
    Read OSM buildings data from eGo^n Database.
862
    Parameters
863
    -----------
864
    to_crs : pyproj.crs.crs.CRS
865
        CRS to transform geometries to.
866
    Returns
867
    -------
868
    geopandas.GeoDataFrame
869
        GeoDataFrame containing OSM buildings data.
870
    """
871
    with db.session_scope() as session:
872
        query = session.query(
873
            OsmBuildingsFiltered.id,
874
            OsmBuildingsFiltered.area,
875
            OsmBuildingsFiltered.geom_point.label("geom"),
876
        )
877
878
    return gpd.read_postgis(
879
        query.statement, query.session.bind, index_col="id"
880
    ).to_crs(to_crs)
881
882
883
@timer_func
884
def synthetic_buildings(
885
    to_crs: CRS,
886
) -> gpd.GeoDataFrame:
887
    """
888
    Read synthetic buildings data from eGo^n Database.
889
    Parameters
890
    -----------
891
    to_crs : pyproj.crs.crs.CRS
892
        CRS to transform geometries to.
893
    Returns
894
    -------
895
    geopandas.GeoDataFrame
896
        GeoDataFrame containing OSM buildings data.
897
    """
898
    with db.session_scope() as session:
899
        query = session.query(
900
            OsmBuildingsSynthetic.id,
901
            OsmBuildingsSynthetic.area,
902
            OsmBuildingsSynthetic.geom_point.label("geom"),
903
        )
904
905
    return gpd.read_postgis(
906
        query.statement, query.session.bind, index_col="id"
907
    ).to_crs(to_crs)
908
909
910
@timer_func
911
def add_ags_to_buildings(
912
    buildings_gdf: gpd.GeoDataFrame,
913
    municipalities_gdf: gpd.GeoDataFrame,
914
) -> gpd.GeoDataFrame:
915
    """
916
    Add information about AGS ID to buildings.
917
    Parameters
918
    -----------
919
    buildings_gdf : geopandas.GeoDataFrame
920
        GeoDataFrame containing OSM buildings data.
921
    municipalities_gdf : geopandas.GeoDataFrame
922
        GeoDataFrame with municipality data.
923
    Returns
924
    -------
925
    gepandas.GeoDataFrame
926
        GeoDataFrame containing OSM buildings data
927
        with AGS ID added.
928
    """
929
    return buildings_gdf.sjoin(
930
        municipalities_gdf,
931
        how="left",
932
        predicate="intersects",
933
    ).rename(columns={"index_right": "ags"})
934
935
936
def drop_buildings_outside_muns(
937
    buildings_gdf: gpd.GeoDataFrame,
938
) -> gpd.GeoDataFrame:
939
    """
940
    Drop all buildings outside of municipalities.
941
    Parameters
942
    -----------
943
    buildings_gdf : geopandas.GeoDataFrame
944
        GeoDataFrame containing OSM buildings data.
945
    Returns
946
    -------
947
    gepandas.GeoDataFrame
948
        GeoDataFrame containing OSM buildings data
949
        with buildings without an AGS ID dropped.
950
    """
951
    gdf = buildings_gdf.loc[~buildings_gdf.ags.isna()]
952
953
    logger.debug(
954
        f"{len(buildings_gdf) - len(gdf)} "
955
        f"({(len(buildings_gdf) - len(gdf)) / len(buildings_gdf) * 100:g}%) "
956
        f"of {len(buildings_gdf)} values are outside of the municipalities "
957
        "and are therefore dropped."
958
    )
959
960
    return gdf
961
962
963
def egon_building_peak_loads():
964
    sql = """
965
    SELECT building_id
966
    FROM demand.egon_building_electricity_peak_loads
967
    WHERE scenario = 'eGon2035'
968
    """
969
970
    return (
971
        db.select_dataframe(sql).building_id.astype(int).sort_values().unique()
972
    )
973
974
975
@timer_func
def load_building_data():
    """
    Read buildings from DB.

    Tables:

    * `openstreetmap.osm_buildings_filtered` (from OSM)
    * `openstreetmap.osm_buildings_synthetic` (synthetic, created by us)

    Use column `id` for both as it is unique, hence both datasets can be
    concatenated. If INCLUDE_SYNTHETIC_BUILDINGS is False synthetic buildings
    will not be loaded.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with buildings without an
        AGS ID dropped.
    """
    municipalities_gdf = municipality_data()

    osm_buildings_gdf = osm_buildings(municipalities_gdf.crs)

    if INCLUDE_SYNTHETIC_BUILDINGS:
        # Load synthetic buildings in the same CRS and stack them below the
        # OSM buildings; IDs of both tables are unique, so concat is safe.
        synthetic_buildings_gdf = synthetic_buildings(municipalities_gdf.crs)

        buildings_gdf = gpd.GeoDataFrame(
            pd.concat(
                [
                    osm_buildings_gdf,
                    synthetic_buildings_gdf,
                ]
            ),
            geometry="geom",
            crs=osm_buildings_gdf.crs,
        ).rename(columns={"area": "building_area"})

        buildings_gdf.index = buildings_gdf.index.astype(int)

    else:
        buildings_gdf = osm_buildings_gdf.rename(
            columns={"area": "building_area"}
        )

    if ONLY_BUILDINGS_WITH_DEMAND:
        # Keep only buildings that also appear in the peak-load table.
        building_ids = egon_building_peak_loads()

        init_len = len(building_ids)

        building_ids = np.intersect1d(
            list(map(int, building_ids)),
            list(map(int, buildings_gdf.index.to_numpy())),
        )

        end_len = len(building_ids)

        logger.debug(
            f"{end_len/init_len * 100: g} % ({end_len} / {init_len}) of buildings have "
            f"peak load."
        )

        buildings_gdf = buildings_gdf.loc[building_ids]

    # Attach municipality (AGS) IDs and drop buildings outside Germany.
    buildings_ags_gdf = add_ags_to_buildings(buildings_gdf, municipalities_gdf)

    buildings_ags_gdf = drop_buildings_outside_muns(buildings_ags_gdf)

    # Build the grid-district x federal-state overlay and tag each building
    # with the overlay polygon it falls into.
    grid_districts_gdf = grid_districts(EPSG)

    federal_state_gdf = federal_state_data(grid_districts_gdf.crs)

    grid_federal_state_gdf = overlay_grid_districts_with_counties(
        grid_districts_gdf,
        federal_state_gdf,
    )

    buildings_overlay_gdf = add_overlay_id_to_buildings(
        buildings_ags_gdf,
        grid_federal_state_gdf,
    )

    logger.debug("Loaded buildings.")

    # Finally remove buildings that are not inside any grid district.
    return drop_buildings_outside_grids(buildings_overlay_gdf)
1059
1060
1061
@timer_func
def sort_and_qcut_df(
    df: pd.DataFrame | gpd.GeoDataFrame,
    col: str,
    q: int,
) -> pd.DataFrame | gpd.GeoDataFrame:
    """
    Determine the quantile of a given attribute in a (Geo)DataFrame.

    The frame is sorted ascending by *col* and a ``quant`` column holding the
    quantile label (0 .. q-1) of each row is appended.

    Parameters
    -----------
    df : pandas.DataFrame or geopandas.GeoDataFrame
        (Geo)DataFrame to sort and qcut.
    col : str
        Name of the attribute to sort and qcut the (Geo)DataFrame on.
    q : int
        Number of quantiles.
    Returns
    -------
    pandas.DataFrame or geopandas.GeoDataFrame
        Sorted and qcut (Geo)DataFrame.
    """
    sorted_df = df.sort_values(col, ascending=True)

    quantile_labels = pd.qcut(
        sorted_df[col],
        q=q,
        labels=range(q),
    )

    return sorted_df.assign(quant=quantile_labels)
1092
1093
1094
@timer_func
def allocate_pv(
    q_mastr_gdf: gpd.GeoDataFrame,
    q_buildings_gdf: gpd.GeoDataFrame,
    seed: int,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """
    Allocate the MaStR pv generators to the OSM buildings.

    This will determine a building for each pv generator if there are more
    buildings than generators within a given AGS. Primarily generators are
    distributed within the same quantile as the buildings. Multiple
    assignment is excluded.

    Parameters
    -----------
    q_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded and qcut MaStR data.
    q_buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing qcut OSM buildings data.
    seed : int
        Seed to use for random operations with NumPy and pandas.
    Returns
    -------
    tuple with two geopandas.GeoDataFrame s
        GeoDataFrame containing MaStR data allocated to building IDs.
        GeoDataFrame containing building data allocated to MaStR IDs.
    """
    rng = default_rng(seed=seed)

    # Allocation columns start out empty; NaN marks "not yet assigned".
    q_buildings_gdf = q_buildings_gdf.assign(gens_id=np.nan)
    q_mastr_gdf = q_mastr_gdf.assign(building_id=np.nan)

    ags_list = q_buildings_gdf.ags.unique()

    if TEST_RUN:
        # Limit to a subset of municipalities to speed up test runs.
        ags_list = ags_list[:250]

    num_ags = len(ags_list)

    t0 = perf_counter()

    # Allocation happens independently per municipality (AGS).
    for count, ags in enumerate(ags_list):

        buildings = q_buildings_gdf.loc[
            (q_buildings_gdf.ags == ags) & (q_buildings_gdf.gens_id.isna())
        ]
        gens = q_mastr_gdf.loc[
            (q_mastr_gdf.ags == ags) & (q_mastr_gdf.building_id.isna())
        ]

        len_build = len(buildings)
        len_gens = len(gens)

        if len_build < len_gens:
            # More generators than buildings: drop a random surplus of
            # generators so a one-to-one assignment stays possible.
            gens = gens.sample(len_build, random_state=RandomState(seed=seed))
            logger.error(
                f"There are {len_gens} generators and only {len_build}"
                f" buildings in AGS {ags}. {len_gens - len(gens)} "
                "generators were truncated to match the amount of buildings."
            )

            assert len_build == len(gens)

        # Match generators to buildings of the same quantile first.
        for quant in gens.quant.unique():
            q_buildings = buildings.loc[
                (buildings.quant == quant) & (buildings.gens_id.isna())
            ]
            q_gens = gens.loc[gens.quant == quant]

            len_build = len(q_buildings)
            len_gens = len(q_gens)

            if len_build < len_gens:
                # Not enough buildings in this quantile: top up with random
                # unassigned buildings from other quantiles of the same AGS.
                delta = len_gens - len_build

                logger.warning(
                    f"There are {len_gens} generators and only {len_build} "
                    f"buildings in AGS {ags} and quantile {quant}. {delta} "
                    f"buildings from AGS {ags} will be added randomly."
                )

                add_buildings = pd.Index(
                    rng.choice(
                        buildings.loc[
                            (buildings.quant != quant)
                            & (buildings.gens_id.isna())
                        ].index,
                        size=delta,
                        replace=False,
                    )
                )

                q_buildings = buildings.loc[
                    q_buildings.index.append(add_buildings)
                ]

                assert len(q_buildings) == len_gens

            # Randomly pick exactly one building per generator
            # (replace=False guarantees no double assignment).
            chosen_buildings = pd.Index(
                rng.choice(
                    q_buildings.index,
                    size=len_gens,
                    replace=False,
                )
            )

            # q_mastr_gdf.loc[q_gens.index, "building_id"] = chosen_buildings
            q_buildings_gdf.loc[chosen_buildings, "gens_id"] = q_gens.index

        if count % 100 == 0:
            # Progress log every 100 municipalities.
            logger.debug(
                f"Allocation of {count / num_ags * 100:g} % of AGS done. It took "
                f"{perf_counter() - t0:g} seconds."
            )

            t0 = perf_counter()

    # Mirror the building-side assignment back onto the generator side.
    assigned_buildings = q_buildings_gdf.loc[~q_buildings_gdf.gens_id.isna()]

    q_mastr_gdf.loc[
        assigned_buildings.gens_id, "building_id"
    ] = assigned_buildings.index

    logger.debug("Allocated status quo generators to buildings.")

    return frame_to_numeric(q_mastr_gdf), frame_to_numeric(q_buildings_gdf)
1219
1220
1221
def frame_to_numeric(
    df: pd.DataFrame | gpd.GeoDataFrame,
) -> pd.DataFrame | gpd.GeoDataFrame:
    """
    Try to convert the index and all object-dtype columns of a
    (Geo)DataFrame to numeric, leaving them unchanged where conversion fails.

    Note: the frame is modified in place and also returned.

    Parameters
    ----------
    df : pandas.DataFrame or geopandas.GeoDataFrame
    Returns
    -------
    pandas.DataFrame or geopandas.GeoDataFrame
    """
    # pd.to_numeric(errors="ignore") is deprecated since pandas 2.1 and
    # removed in pandas 3.0. Replicate its all-or-nothing semantics (return
    # the input unchanged on any parse failure) with explicit exception
    # handling instead.
    if str(df.index.dtype) == "object":
        try:
            df.index = pd.to_numeric(df.index)
        except (ValueError, TypeError):
            pass  # index is not numeric-like; keep it as-is

    for col in df.columns:
        if str(df[col].dtype) == "object":
            try:
                df[col] = pd.to_numeric(df[col])
            except (ValueError, TypeError):
                pass  # column is not numeric-like; keep it as-is

    return df
1241
1242
1243
def validate_output(
    desagg_mastr_gdf: pd.DataFrame | gpd.GeoDataFrame,
    desagg_buildings_gdf: pd.DataFrame | gpd.GeoDataFrame,
) -> None:
    """
    Validate output.

    * Validate that there are exactly as many buildings with a pv system as
      there are pv systems with a building
    * Validate that the building IDs with a pv system are the same building
      IDs as assigned to the pv systems
    * Validate that the pv system IDs with a building are the same pv system
      IDs as assigned to the buildings

    Parameters
    -----------
    desagg_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data allocated to building IDs.
    desagg_buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing building data allocated to MaStR IDs.

    Raises
    ------
    AssertionError
        If any of the three consistency checks fails.
    """
    # Check 1: equal number of allocated generators and allocated buildings.
    assert len(
        desagg_mastr_gdf.loc[~desagg_mastr_gdf.building_id.isna()]
    ) == len(desagg_buildings_gdf.loc[~desagg_buildings_gdf.gens_id.isna()])
    # Check 2: building IDs referenced by generators equal the index of the
    # allocated buildings (compared as sorted unique arrays).
    assert (
        np.sort(
            desagg_mastr_gdf.loc[
                ~desagg_mastr_gdf.building_id.isna()
            ].building_id.unique()
        )
        == np.sort(
            desagg_buildings_gdf.loc[
                ~desagg_buildings_gdf.gens_id.isna()
            ].index.unique()
        )
    ).all()
    # Check 3: generator IDs referenced by buildings equal the index of the
    # allocated generators (compared as sorted unique arrays).
    assert (
        np.sort(
            desagg_mastr_gdf.loc[
                ~desagg_mastr_gdf.building_id.isna()
            ].index.unique()
        )
        == np.sort(
            desagg_buildings_gdf.loc[
                ~desagg_buildings_gdf.gens_id.isna()
            ].gens_id.unique()
        )
    ).all()

    logger.debug("Validated output.")
1293
1294
1295
def drop_unallocated_gens(
    gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop generators which did not get allocated.

    Generators without a ``building_id`` could not be matched to any
    building and are removed.

    Parameters
    -----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data allocated to building IDs.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data with generators dropped which did
        not get allocated.
    """
    init_len = len(gdf)
    allocated_gdf = gdf.loc[gdf.building_id.notna()]
    end_len = len(allocated_gdf)

    logger.debug(
        f"Dropped {init_len - end_len} "
        f"({((init_len - end_len) / init_len) * 100:g}%)"
        f" of {init_len} unallocated rows from MaStR DataFrame."
    )

    return allocated_gdf
1322
1323
1324
@timer_func
def allocate_to_buildings(
    mastr_gdf: gpd.GeoDataFrame,
    buildings_gdf: gpd.GeoDataFrame,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """
    Allocate status quo pv rooftop generators to buildings.

    Parameters
    -----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data with geocoded locations.
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with buildings without an
        AGS ID dropped.
    Returns
    -------
    tuple with two geopandas.GeoDataFrame s
        GeoDataFrame containing MaStR data allocated to building IDs.
        GeoDataFrame containing building data allocated to MaStR IDs.
    """
    logger.debug("Starting allocation of status quo.")

    # Bin generators by capacity and buildings by roof area into Q quantiles
    # so large systems are preferably matched with large buildings.
    q_mastr_gdf = sort_and_qcut_df(mastr_gdf, col="capacity", q=Q)
    q_buildings_gdf = sort_and_qcut_df(buildings_gdf, col="building_area", q=Q)

    # Seeded random matching of generators to buildings per municipality.
    desagg_mastr_gdf, desagg_buildings_gdf = allocate_pv(
        q_mastr_gdf, q_buildings_gdf, SEED
    )

    # Sanity check: the allocation must be a consistent one-to-one mapping.
    validate_output(desagg_mastr_gdf, desagg_buildings_gdf)

    return drop_unallocated_gens(desagg_mastr_gdf), desagg_buildings_gdf
1356
1357
1358
@timer_func
def grid_districts(
    epsg: int,
) -> gpd.GeoDataFrame:
    """
    Load mv grid district geo data from eGo^n Database as
    geopandas.GeoDataFrame.

    Parameters
    -----------
    epsg : int
        EPSG ID to use as CRS.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing mv grid district ID (index) and geo shapes
        data.
    """
    sql = """
        SELECT bus_id, geom
        FROM grid.egon_mv_grid_district
        ORDER BY bus_id
        """

    districts_gdf = db.select_geodataframe(
        sql,
        index_col="bus_id",
        geom_col="geom",
        epsg=epsg,
    )

    # Make sure the bus_id index is integer typed.
    districts_gdf.index = districts_gdf.index.astype(int)

    logger.debug("Grid districts loaded.")

    return districts_gdf
1390
1391
1392
def scenario_data(
    carrier: str = "solar_rooftop",
    scenario: str = "eGon2035",
) -> pd.DataFrame:
    """
    Get scenario capacity data from eGo^n Database.

    Parameters
    -----------
    carrier : str
        Carrier type to filter table by.
    scenario : str
        Scenario to filter table by.
    Returns
    -------
    pandas.DataFrame
        DataFrame with scenario capacity data in GW, sorted by index.
    """
    with db.session_scope() as session:
        capacity_query = (
            session.query(EgonScenarioCapacities)
            .filter(EgonScenarioCapacities.carrier == carrier)
            .filter(EgonScenarioCapacities.scenario_name == scenario)
        )

    scenario_df = pd.read_sql(
        capacity_query.statement,
        capacity_query.session.bind,
        index_col="index",
    ).sort_index()

    logger.debug("Scenario capacity data loaded.")

    return scenario_df
1422
1423
1424 View Code Duplication
class Vg250Lan(Base):
    """
    ORM mapping of the VG250 federal state ("Land") boundaries table
    ``boundaries.vg250_lan``.

    NOTE(review): the review tooling flags this declarative model as
    duplicated elsewhere in the project; consider importing a single shared
    definition instead.
    """

    __tablename__ = "vg250_lan"
    __table_args__ = {"schema": "boundaries"}

    id = Column(BigInteger, primary_key=True, index=True)
    ade = Column(BigInteger)
    gf = Column(BigInteger)
    bsg = Column(BigInteger)
    ars = Column(String)
    ags = Column(String)
    sdv_ars = Column(String)
    gen = Column(String)
    bez = Column(String)
    ibz = Column(BigInteger)
    bem = Column(String)
    nbd = Column(String)
    sn_l = Column(String)
    sn_r = Column(String)
    sn_k = Column(String)
    sn_v1 = Column(String)
    sn_v2 = Column(String)
    sn_g = Column(String)
    fk_s3 = Column(String)
    nuts = Column(String)
    ars_0 = Column(String)
    ags_0 = Column(String)
    wsk = Column(String)
    debkg_id = Column(String)
    rs = Column(String)
    sdv_rs = Column(String)
    rs_0 = Column(String)
    # Geometry stored in the module-wide EPSG CRS.
    geometry = Column(Geometry(srid=EPSG), index=True)
1456
1457
1458
def federal_state_data(to_crs: CRS) -> gpd.GeoDataFrame:
    """
    Get federal state data from eGo^n Database.

    Parameters
    -----------
    to_crs : pyproj.crs.crs.CRS
        CRS to transform geometries to.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with federal state data (id, nuts, geom).
    """
    with db.session_scope() as session:
        query = session.query(
            Vg250Lan.id, Vg250Lan.nuts, Vg250Lan.geometry.label("geom")
        )

    federal_states_gdf = gpd.read_postgis(
        query.statement,
        query.session.bind,
        index_col="id",
    ).to_crs(to_crs)

    logger.debug("Federal State data loaded.")

    return federal_states_gdf
1482
1483
1484
@timer_func
def overlay_grid_districts_with_counties(
    mv_grid_district_gdf: gpd.GeoDataFrame,
    federal_state_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Calculate the intersections of mv grid districts and counties.

    Parameters
    -----------
    mv_grid_district_gdf : gpd.GeoDataFrame
        GeoDataFrame containing mv grid district ID and geo shapes data.
    federal_state_gdf : gpd.GeoDataFrame
        GeoDataFrame with federal state data.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with intersection shapes between counties and grid
        districts.
    """
    logger.debug(
        "Calculating intersection overlay between mv grid districts and "
        "counties. This may take a while..."
    )

    # Bring both layers into the same CRS before intersecting and keep the
    # grid district bus_id as a regular column via reset_index().
    states_gdf = federal_state_gdf.to_crs(mv_grid_district_gdf.crs)
    districts_gdf = mv_grid_district_gdf.reset_index()

    overlay_gdf = gpd.overlay(
        states_gdf,
        districts_gdf,
        how="intersection",
        keep_geom_type=True,
    )

    logger.debug("Done!")

    return overlay_gdf
1517
1518
1519
@timer_func
def add_overlay_id_to_buildings(
    buildings_gdf: gpd.GeoDataFrame,
    grid_federal_state_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add information about overlay ID to buildings.

    Parameters
    -----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    grid_federal_state_gdf : geopandas.GeoDataFrame
        GeoDataFrame with intersection shapes between counties and grid
        districts.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with overlay ID added.
    """
    # Reproject buildings into the overlay CRS, then spatially left-join
    # them with the overlay polygons; the overlay index becomes overlay_id.
    reprojected_gdf = buildings_gdf.to_crs(grid_federal_state_gdf.crs)

    joined_gdf = reprojected_gdf.sjoin(
        grid_federal_state_gdf,
        how="left",
        predicate="intersects",
    ).rename(columns={"index_right": "overlay_id"})

    logger.debug("Added overlay ID to OSM buildings.")

    return joined_gdf
1550
1551
1552
def drop_buildings_outside_grids(
    buildings_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop all buildings outside of grid areas.

    A building counts as outside when its ``bus_id`` column is NaN, i.e. no
    grid district intersected it.

    Parameters
    -----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with buildings without a
        bus ID dropped.
    """
    inside_gdf = buildings_gdf.loc[buildings_gdf.bus_id.notna()]

    total = len(buildings_gdf)
    dropped = total - len(inside_gdf)

    logger.debug(
        f"{dropped} ({dropped / total * 100:g}%) "
        f"of {total} values are outside of the grid areas "
        "and are therefore dropped."
    )

    return inside_gdf
1577
1578
1579
def cap_per_bus_id(
    scenario: str,
) -> pd.DataFrame:
    """
    Get table with total pv rooftop capacity per grid district.

    Parameters
    -----------
    scenario : str
        Scenario name.
    Returns
    -------
    pandas.DataFrame
        DataFrame with total rooftop capacity per mv grid, indexed by
        bus_id.
    """
    targets = config.datasets()["solar_rooftop"]["targets"]

    # NOTE: scenario is interpolated directly into the SQL statement. It
    # originates from internal configuration, not user input; do not pass
    # untrusted values here.
    sql = f"""
    SELECT bus as bus_id, p_nom as capacity
    FROM {targets['generators']['schema']}.{targets['generators']['table']}
    WHERE carrier = 'solar_rooftop'
    AND scn_name = '{scenario}'
    """

    # The previous building-area-based disaggregation that used to live here
    # was superseded by reading the capacities straight from the generators
    # table; the dead commented-out remnant after the return was removed.
    return db.select_dataframe(sql, index_col="bus_id")
1619
1620
1621
def determine_end_of_life_gens(
    mastr_gdf: gpd.GeoDataFrame,
    scenario_timestamp: pd.Timestamp,
    pv_rooftop_lifetime: pd.Timedelta,
) -> gpd.GeoDataFrame:
    """
    Determine if an old PV system has reached its end of life.

    Adds an ``age`` column (scenario timestamp minus start-up date) and a
    boolean ``end_of_life`` column (age exceeds the expected lifetime).

    Parameters
    -----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    scenario_timestamp : pandas.Timestamp
        Timestamp at which the scenario takes place.
    pv_rooftop_lifetime : pandas.Timedelta
        Average expected lifetime of PV rooftop systems.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data and info if the system
        has reached its end of life.
    """
    total_capacity = mastr_gdf.capacity.sum()

    system_age = scenario_timestamp - mastr_gdf.start_up_date

    mastr_gdf = mastr_gdf.assign(
        age=system_age,
        end_of_life=system_age > pv_rooftop_lifetime,
    )

    active_capacity = mastr_gdf.loc[~mastr_gdf.end_of_life].capacity.sum()

    logger.debug(
        "Determined if pv rooftop systems reached their end of life.\nTotal capacity: "
        f"{total_capacity}\nActive capacity: {active_capacity}"
    )

    return mastr_gdf
1660
1661
1662
def calculate_max_pv_cap_per_building(
    buildings_gdf: gpd.GeoDataFrame,
    mastr_gdf: gpd.GeoDataFrame,
    pv_cap_per_sq_m: float | int,
    roof_factor: float | int,
) -> gpd.GeoDataFrame:
    """
    Calculate the estimated maximum possible PV capacity per building.

    Parameters
    -----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    pv_cap_per_sq_m : float, int
        Average expected, installable PV capacity per square meter.
    roof_factor : float, int
        Average for PV usable roof area share.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with estimated maximum PV
        capacity.
    """
    mastr_cols = [
        "capacity",
        "end_of_life",
        "building_id",
        "EinheitlicheAusrichtungUndNeigungswinkel",
        "Hauptausrichtung",
        "HauptausrichtungNeigungswinkel",
    ]

    # Left-join existing PV systems onto the buildings via the building ID,
    # keeping the building ID as the frame index afterwards.
    merged_gdf = (
        buildings_gdf.reset_index()
        .merge(
            mastr_gdf[mastr_cols],
            how="left",
            left_on="id",
            right_on="building_id",
        )
        .set_index("id")
        .drop(columns="building_id")
    )

    # Usable roof share times specific PV capacity gives the maximum
    # installable capacity; buildings without a matched system default to
    # end_of_life=True.
    return merged_gdf.assign(
        max_cap=merged_gdf.building_area.multiply(
            roof_factor * pv_cap_per_sq_m
        ),
        end_of_life=merged_gdf.end_of_life.fillna(True).astype(bool),
        bus_id=merged_gdf.bus_id.astype(int),
    )
1712
1713
1714
def calculate_building_load_factor(
    mastr_gdf: gpd.GeoDataFrame,
    buildings_gdf: gpd.GeoDataFrame,
    rounding: int = 4,
) -> gpd.GeoDataFrame:
    """
    Calculate the roof load factor from existing PV systems.

    The load factor is the installed capacity divided by the estimated
    maximum capacity of the assigned building.

    Parameters
    -----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    rounding : int
        Rounding to use for load factor.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data with calculated load
        factor.
    """
    # Only buildings with a known maximum capacity can provide a load factor.
    capped_buildings_gdf = buildings_gdf.loc[
        ~buildings_gdf["max_cap"].isna()
    ][["max_cap", "building_area"]]

    merged_gdf = mastr_gdf.merge(
        capped_buildings_gdf,
        how="left",
        left_on="building_id",
        right_index=True,
    )

    load_factor = (merged_gdf.capacity / merged_gdf.max_cap).round(rounding)

    return merged_gdf.assign(load_factor=load_factor)
1744
1745
1746
def get_probability_for_property(
    mastr_gdf: gpd.GeoDataFrame,
    cap_range: tuple[int | float, int | float],
    prop: str,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Calculate the probability of the different options of a property of the
    existing PV plants.

    Parameters
    -----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_range : tuple(int, int)
        Capacity range of PV plants to look at; lower bound exclusive,
        upper bound inclusive.
    prop : str
        Property to calculate probabilities for. String needs to be in
        columns of mastr_gdf.
    Returns
    -------
    tuple
        numpy.ndarray
            Unique values of property.
        numpy.ndarray
            Probabilities per unique value.
    """
    cap_range_gdf = mastr_gdf.loc[
        (mastr_gdf.capacity > cap_range[0])
        & (mastr_gdf.capacity <= cap_range[1])
    ]

    # Load factors > 1 are physically implausible and therefore excluded.
    if prop == "load_factor":
        cap_range_gdf = cap_range_gdf.loc[cap_range_gdf[prop] <= 1]

    # Count occurrences of each valid value. Series.isna and Series.isnull
    # are aliases, so the former duplicate isnull() check was dropped.
    # Literal "None" strings stem from the raw MaStR data and are treated as
    # missing as well.
    count = Counter(
        cap_range_gdf[prop].loc[
            ~cap_range_gdf[prop].isna() & ~(cap_range_gdf[prop] == "None")
        ]
    )

    values = np.array(list(count.keys()))
    probabilities = np.fromiter(count.values(), dtype=float)
    probabilities = probabilities / np.sum(probabilities)

    return values, probabilities
1792
1793
1794
@timer_func
def probabilities(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
    properties: list[str] | None = None,
) -> dict:
    """
    Calculate the probability of the different options of properties of the
    existing PV plants.

    Parameters
    -----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple
        should start with a zero and the last one should end with infinite.
    properties : list(str)
        List of properties to calculate probabilities for. Strings need to
        be in columns of mastr_gdf.
    Returns
    -------
    dict
        Dictionary with values and probabilities per capacity range.
    """
    if cap_ranges is None:
        cap_ranges = [
            (0, 30),
            (30, 100),
            (100, float("inf")),
        ]
    if properties is None:
        properties = [
            "EinheitlicheAusrichtungUndNeigungswinkel",
            "Hauptausrichtung",
            "HauptausrichtungNeigungswinkel",
            "load_factor",
        ]

    prob_dict = {}

    for cap_range in cap_ranges:
        values_dict = {}
        probabilities_dict = {}

        for prop in properties:
            prop_values, prop_probs = get_probability_for_property(
                mastr_gdf,
                cap_range,
                prop,
            )

            values_dict[prop] = prop_values
            probabilities_dict[prop] = prop_probs

        prob_dict[cap_range] = {
            "values": values_dict,
            "probabilities": probabilities_dict,
        }

    return prob_dict
1851
1852
1853
def cap_share_per_cap_range(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
) -> dict[tuple[int | float, int | float], float]:
    """
    Calculate the share of PV capacity from the total PV capacity within
    capacity ranges.

    Parameters
    -----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple
        should start with a zero and the last one should end with infinite.
    Returns
    -------
    dict
        Dictionary with share of PV capacity from the total PV capacity
        within capacity ranges.
    """
    if cap_ranges is None:
        cap_ranges = [
            (0, 30),
            (30, 100),
            (100, float("inf")),
        ]

    total_cap = mastr_gdf.capacity.sum()

    def _share(cap_range):
        # Lower bound exclusive, upper bound inclusive.
        in_range = (mastr_gdf.capacity > cap_range[0]) & (
            mastr_gdf.capacity <= cap_range[1]
        )
        return mastr_gdf.loc[in_range].capacity.sum() / total_cap

    return {cap_range: _share(cap_range) for cap_range in cap_ranges}
1896
1897
1898
def mean_load_factor_per_cap_range(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
) -> dict[tuple[int | float, int | float], float]:
    """
    Compute the mean roof load factor of existing PV plants per capacity
    range.

    Only plants with a plausible load factor (<= 1) contribute to the mean.

    Parameters
    -----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple should
        start with a zero and the last one should end with infinite.
    Returns
    -------
    dict
        Dictionary with mean roof load factor per capacity range.
    """
    if cap_ranges is None:
        cap_ranges = [(0, 30), (30, 100), (100, float("inf"))]

    # Implausible load factors (> 1) are excluded from every range.
    plausible = mastr_gdf.load_factor <= 1

    return {
        (lower, upper): mastr_gdf.loc[
            plausible
            & (mastr_gdf.capacity > lower)
            & (mastr_gdf.capacity <= upper)
        ].load_factor.mean()
        for lower, upper in cap_ranges
    }
1936
1937
1938
def building_area_range_per_cap_range(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
    min_building_size: int | float = 10.0,
    upper_quantile: float = 0.95,
    lower_quantile: float = 0.05,
) -> dict[tuple[int | float, int | float], tuple[int | float, int | float]]:
    """
    Estimate the typical building area range per capacity range from
    existing PV plants.

    Parameters
    -----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple should
        start with a zero and the last one should end with infinite.
    min_building_size : int, float
        Minimal building size to consider for PV plants.
    upper_quantile : float
        Upper quantile to estimate maximum building size per capacity range.
    lower_quantile : float
        Lower quantile to estimate minimum building size per capacity range.
    Returns
    -------
    dict
        Dictionary with estimated normal building area range per capacity
        range.
    """
    if cap_ranges is None:
        cap_ranges = [(0, 30), (30, 100), (100, float("inf"))]

    n_ranges = len(cap_ranges)

    # First pass: raw area bounds per capacity range taken from the
    # quantiles of the building areas of plants within that range. The
    # first range is bounded below by the minimal building size, the
    # last one is unbounded above.
    raw_bounds = []

    for idx, (cap_min, cap_max) in enumerate(cap_ranges):
        areas = mastr_gdf.loc[
            (mastr_gdf.capacity > cap_min) & (mastr_gdf.capacity <= cap_max)
        ].building_area

        if idx == 0:
            raw_bounds.append(
                (min_building_size, areas.quantile(upper_quantile))
            )
        elif idx == n_ranges - 1:
            raw_bounds.append((areas.quantile(lower_quantile), float("inf")))
        else:
            raw_bounds.append(
                (
                    areas.quantile(lower_quantile),
                    areas.quantile(upper_quantile),
                )
            )

    # Second pass: remove gaps/overlaps between neighbouring ranges by
    # replacing every inner boundary with the mean of the two adjacent
    # raw boundaries.
    normed_bounds = {}

    for idx, (cap_range, (area_min, area_max)) in enumerate(
        zip(cap_ranges, raw_bounds)
    ):
        if idx == 0:
            normed_bounds[cap_range] = (
                area_min,
                np.mean((raw_bounds[idx + 1][0], area_max)),
            )
        elif idx == n_ranges - 1:
            normed_bounds[cap_range] = (
                np.mean((raw_bounds[idx - 1][1], area_min)),
                area_max,
            )
        else:
            normed_bounds[cap_range] = (
                np.mean((raw_bounds[idx - 1][1], area_min)),
                np.mean((raw_bounds[idx + 1][0], area_max)),
            )

    return normed_bounds
2025
2026
2027
@timer_func
def desaggregate_pv_in_mv_grid(
    buildings_gdf: gpd.GeoDataFrame,
    pv_cap: float | int,
    **kwargs,
) -> gpd.GeoDataFrame:
    """
    Desaggregate PV capacity on buildings within a given grid district.

    Parameters
    -----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing buildings within the grid district.
    pv_cap : float, int
        PV capacity to desaggregate.
    Other Parameters
    -----------
    prob_dict : dict
        Dictionary with values and probabilities per capacity range.
    cap_share_dict : dict
        Dictionary with share of PV capacity from the total PV capacity within
        capacity ranges.
    building_area_range_dict : dict
        Dictionary with estimated normal building area range per capacity
        range.
    load_factor_dict : dict
        Dictionary with mean roof load factor per capacity range.
    seed : int
        Seed to use for random operations with NumPy and pandas.
    pv_cap_per_sq_m : float, int
        Average expected, installable PV capacity per square meter.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with desaggregated PV
        plants.
    """
    # All buildings handed over belong to the same MV grid district.
    bus_id = int(buildings_gdf.bus_id.iat[0])

    rng = default_rng(seed=kwargs["seed"])
    random_state = RandomState(seed=kwargs["seed"])

    results_df = pd.DataFrame(columns=buildings_gdf.columns)

    for cap_range, share in kwargs["cap_share_dict"].items():
        # Portion of the grid's PV capacity belonging to this range.
        pv_cap_range = pv_cap * share

        b_area_min, b_area_max = kwargs["building_area_range_dict"][cap_range]

        # Candidate buildings: not yet used and with a building area
        # typical for this capacity range.
        cap_range_buildings_gdf = buildings_gdf.loc[
            ~buildings_gdf.index.isin(results_df.index)
            & (buildings_gdf.building_area > b_area_min)
            & (buildings_gdf.building_area <= b_area_max)
        ]

        mean_load_factor = kwargs["load_factor_dict"][cap_range]
        cap_range_buildings_gdf = cap_range_buildings_gdf.assign(
            mean_cap=cap_range_buildings_gdf.max_cap * mean_load_factor,
            load_factor=np.nan,
            capacity=np.nan,
        )

        total_mean_cap = cap_range_buildings_gdf.mean_cap.sum()

        if total_mean_cap == 0:
            logger.warning(
                f"There are no matching roof for capacity range {cap_range} "
                f"kW in grid {bus_id}. Using all buildings as fallback."
            )

            # Fallback 1: use all buildings not taken by another range yet.
            cap_range_buildings_gdf = buildings_gdf.loc[
                ~buildings_gdf.index.isin(results_df.index)
            ]

            if len(cap_range_buildings_gdf) == 0:
                logger.warning(
                    "There are no roofes available for capacity range "
                    f"{cap_range} kW in grid {bus_id}. Allowing dual use."
                )
                # Fallback 2: allow a building to host generators from
                # several capacity ranges.
                cap_range_buildings_gdf = buildings_gdf.copy()

            cap_range_buildings_gdf = cap_range_buildings_gdf.assign(
                mean_cap=cap_range_buildings_gdf.max_cap * mean_load_factor,
                load_factor=np.nan,
                capacity=np.nan,
            )

            total_mean_cap = cap_range_buildings_gdf.mean_cap.sum()

        elif total_mean_cap < pv_cap_range:
            logger.warning(
                f"Average roof utilization of the roof area in grid {bus_id} "
                f"and capacity range {cap_range} kW is not sufficient. The "
                "roof utilization will be above average."
            )

        # Sample enough buildings to host the capacity, but at least one.
        frac = max(
            pv_cap_range / total_mean_cap,
            1 / len(cap_range_buildings_gdf),
        )

        samples_gdf = cap_range_buildings_gdf.sample(
            frac=min(1, frac),
            random_state=random_state,
        )

        cap_range_dict = kwargs["prob_dict"][cap_range]

        values_dict = cap_range_dict["values"]
        p_dict = cap_range_dict["probabilities"]

        # Draw a load factor for every sampled building from the
        # empirical distribution of existing plants in this range.
        load_factors = rng.choice(
            a=values_dict["load_factor"],
            size=len(samples_gdf),
            p=p_dict["load_factor"],
        )

        samples_gdf = samples_gdf.assign(
            load_factor=load_factors,
            capacity=(
                samples_gdf.building_area
                * load_factors
                * kwargs["pv_cap_per_sq_m"]
            ).clip(lower=0.4),  # minimum plant size of 0.4 (presumably kW)
        )

        # Scale capacities and load factors so the range target is hit.
        missing_factor = pv_cap_range / samples_gdf.capacity.sum()

        samples_gdf = samples_gdf.assign(
            capacity=(samples_gdf.capacity * missing_factor),
            load_factor=(samples_gdf.load_factor * missing_factor),
        )

        assert np.isclose(
            samples_gdf.capacity.sum(),
            pv_cap_range,
            rtol=1e-03,
        ), f"{samples_gdf.capacity.sum()} != {pv_cap_range}"

        results_df = pd.concat(
            [
                results_df,
                samples_gdf,
            ],
        )

    # Rescale once more so small deviations across ranges cancel out and
    # the grid district total matches exactly.
    total_missing_factor = pv_cap / results_df.capacity.sum()

    results_df = results_df.assign(
        capacity=(results_df.capacity * total_missing_factor),
    )

    assert np.isclose(
        results_df.capacity.sum(),
        pv_cap,
        rtol=1e-03,
    ), f"{results_df.capacity.sum()} != {pv_cap}"

    return gpd.GeoDataFrame(
        results_df,
        # Bugfix: take the CRS from the input frame. ``samples_gdf`` is
        # only bound inside the loop and would be undefined if
        # ``cap_share_dict`` were empty; all samples are drawn from
        # ``buildings_gdf`` and therefore share its CRS.
        crs=buildings_gdf.crs,
        geometry="geom",
    )
2189
2190
2191
@timer_func
def desaggregate_pv(
    buildings_gdf: gpd.GeoDataFrame,
    cap_df: pd.DataFrame,
    **kwargs,
) -> gpd.GeoDataFrame:
    """
    Desaggregate PV capacity on buildings within a given grid district.

    Parameters
    -----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    cap_df : pandas.DataFrame
        DataFrame with total rooftop capacity per mv grid.
    Other Parameters
    -----------
    prob_dict : dict
        Dictionary with values and probabilities per capacity range.
    cap_share_dict : dict
        Dictionary with share of PV capacity from the total PV capacity within
        capacity ranges.
    building_area_range_dict : dict
        Dictionary with estimated normal building area range per capacity
        range.
    load_factor_dict : dict
        Dictionary with mean roof load factor per capacity range.
    seed : int
        Seed to use for random operations with NumPy and pandas.
    pv_cap_per_sq_m : float, int
        Average expected, installable PV capacity per square meter.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with desaggregated PV
        plants.
    """
    # Start with the still-active existing generators; new ones are
    # appended per grid district below.
    allocated_buildings_gdf = buildings_gdf.loc[~buildings_gdf.end_of_life]

    building_bus_ids = set(buildings_gdf.bus_id)
    cap_bus_ids = set(cap_df.index)

    logger.debug(
        f"Bus IDs from buildings: {len(building_bus_ids)}\nBus IDs from capacity: "
        f"{len(cap_bus_ids)}"
    )

    # Log the asymmetric difference between both bus ID sets; ideally it
    # is empty (see the commented-out assert below).
    if len(building_bus_ids) > len(cap_bus_ids):
        missing = building_bus_ids - cap_bus_ids
    else:
        missing = cap_bus_ids - building_bus_ids

    logger.debug(str(missing))

    # assert set(buildings_gdf.bus_id.unique()) == set(cap_df.index)

    for bus_id in buildings_gdf.bus_id.unique():
        buildings_grid_gdf = buildings_gdf.loc[buildings_gdf.bus_id == bus_id]

        # Capacity already installed on still-active plants in this grid.
        pv_installed_gdf = buildings_grid_gdf.loc[
            ~buildings_grid_gdf.end_of_life
        ]

        pv_installed = pv_installed_gdf.capacity.sum()

        # Buildings without an active plant are candidates for new ones.
        pot_buildings_gdf = buildings_grid_gdf.drop(
            index=pv_installed_gdf.index
        )

        if len(pot_buildings_gdf) == 0:
            logger.error(
                f"In grid {bus_id} there are no potential buildings to allocate "
                "PV capacity to. The grid is skipped. This message should only "
                "appear doing test runs with few buildings."
            )

            continue

        # Scenario target; cap_df is in MW, generators are in kW.
        pv_target = cap_df.at[bus_id, "capacity"] * 1000

        logger.debug(f"pv_target: {pv_target}")

        pv_missing = pv_target - pv_installed

        if pv_missing <= 0:
            logger.info(
                f"In grid {bus_id} there is more PV installed ({pv_installed: g}) in "
                f"status Quo than allocated within the scenario ({pv_target: g}). No "
                f"new generators are added."
            )

            continue

        if pot_buildings_gdf.max_cap.sum() < pv_missing:
            logger.error(
                f"In grid {bus_id} there is less PV potential ("
                f"{pot_buildings_gdf.max_cap.sum():g} kW) than allocated PV  capacity "
                f"({pv_missing:g} kW). The average roof utilization will be very high."
            )

        gdf = desaggregate_pv_in_mv_grid(
            buildings_gdf=pot_buildings_gdf,
            pv_cap=pv_missing,
            **kwargs,
        )

        logger.debug(f"New cap in grid {bus_id}: {gdf.capacity.sum()}")
        logger.debug(f"Installed cap in grid {bus_id}: {pv_installed}")
        logger.debug(
            f"Total cap in grid {bus_id}: {gdf.capacity.sum() + pv_installed}"
        )

        if not np.isclose(
            gdf.capacity.sum() + pv_installed, pv_target, rtol=1e-3
        ):
            logger.warning(
                f"The desired capacity and actual capacity in grid {bus_id} differ.\n"
                f"Desired cap: {pv_target}\nActual cap: "
                f"{gdf.capacity.sum() + pv_installed}"
            )

        allocated_buildings_gdf = pd.concat(
            [
                allocated_buildings_gdf,
                gdf,
            ]
        )

    logger.debug("Desaggregated scenario.")
    logger.debug(f"Scenario capacity: {cap_df.capacity.sum(): g}")
    logger.debug(
        f"Generator capacity: {allocated_buildings_gdf.capacity.sum(): g}"
    )

    return gpd.GeoDataFrame(
        allocated_buildings_gdf,
        # Bugfix: take the CRS from the input frame. ``gdf`` is only bound
        # inside the loop and is undefined when every grid district is
        # skipped; the result rows all originate from ``buildings_gdf``.
        crs=buildings_gdf.crs,
        geometry="geom",
    )
2329
2330
2331
@timer_func
def add_buildings_meta_data(
    buildings_gdf: gpd.GeoDataFrame,
    prob_dict: dict,
    seed: int,
) -> gpd.GeoDataFrame:
    """
    Randomly add additional metadata to desaggregated PV plants.

    Missing metadata values (orientation, tilt, ...) are drawn from the
    empirical distribution of existing plants in the same capacity range.

    Parameters
    -----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with desaggregated PV
        plants.
    prob_dict : dict
        Dictionary with values and probabilities per capacity range.
    seed : int
        Seed to use for random operations with NumPy and pandas.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with desaggregated PV
        plants.
    """
    rng = default_rng(seed=seed)
    buildings_gdf = buildings_gdf.reset_index().rename(
        columns={
            "index": "building_id",
        }
    )

    # NOTE(review): ranges are [min_cap, max_cap) here, while the
    # allocation functions use (min, max] — confirm this is intended.
    for (min_cap, max_cap), cap_range_prob_dict in prob_dict.items():
        cap_range_gdf = buildings_gdf.loc[
            (buildings_gdf.capacity >= min_cap)
            & (buildings_gdf.capacity < max_cap)
        ]

        for key, values in cap_range_prob_dict["values"].items():
            if key == "load_factor":
                # Load factors were already assigned during desaggregation.
                continue

            # Rows with missing metadata for this property. ``isna`` and
            # ``isnull`` are pandas aliases, so a single check suffices;
            # the literal "None" string stems from the raw MaStR data.
            gdf = cap_range_gdf.loc[
                cap_range_gdf[key].isna() | (cap_range_gdf[key] == "None")
            ]

            key_vals = rng.choice(
                a=values,
                size=len(gdf),
                p=cap_range_prob_dict["probabilities"][key],
            )

            buildings_gdf.loc[gdf.index, key] = key_vals

    return buildings_gdf
2386
2387
2388
def add_voltage_level(
    buildings_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add voltage level derived from generator capacity to the power plants.

    Parameters
    -----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with desaggregated PV
        plants.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with voltage level per generator.
    """

    # Capacity thresholds (strict upper bounds) and the voltage level
    # assigned below each one; everything above the last threshold gets
    # level 1. Note that level 2 is never assigned.
    thresholds = (
        (100, 7),
        (200, 6),
        (5500, 5),
        (20000, 4),
        (120000, 3),
    )

    def _voltage_level(capacity: float) -> int:
        for limit, level in thresholds:
            if capacity < limit:
                return level
        return 1

    return buildings_gdf.assign(
        voltage_level=buildings_gdf.capacity.apply(_voltage_level)
    )
2420
2421
2422
def add_start_up_date(
    buildings_gdf: gpd.GeoDataFrame,
    start: pd.Timestamp,
    end: pd.Timestamp,
    seed: int,
):
    """
    Assign a random start-up date, drawn uniformly from the given period,
    to every new pv generator.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with desaggregated PV
        plants.
    start : pandas.Timestamp
        Minimum Timestamp to use.
    end : pandas.Timestamp
        Maximum Timestamp to use.
    seed : int
        Seed to use for random operations with NumPy and pandas.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with start-up date added.
    """
    rng = default_rng(seed=seed)

    # One candidate per day within [start, end]; each generator receives
    # a uniformly drawn date from this pool.
    candidate_dates = pd.date_range(start=start, end=end, freq="1D")

    return buildings_gdf.assign(
        start_up_date=rng.choice(candidate_dates, size=len(buildings_gdf))
    )
2453
2454
2455
@timer_func
def allocate_scenarios(
    mastr_gdf: gpd.GeoDataFrame,
    valid_buildings_gdf: gpd.GeoDataFrame,
    last_scenario_gdf: gpd.GeoDataFrame,
    scenario: str,
):
    """
    Desaggregate and allocate scenario pv rooftop ramp-ups onto buildings.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    valid_buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    last_scenario_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings matched with pv generators from
        the temporally preceding scenario.
    scenario : str
        Scenario to desaggregate and allocate.
    Returns
    -------
    tuple
        geopandas.GeoDataFrame
            GeoDataFrame containing OSM buildings matched with pv generators.
        pandas.DataFrame
            DataFrame containing pv rooftop capacity per grid id.
    """
    # Target rooftop capacity per MV grid district for this scenario.
    cap_per_bus_id_df = cap_per_bus_id(scenario)

    logger.debug(
        f"cap_per_bus_id_df total capacity: {cap_per_bus_id_df.capacity.sum()}"
    )

    # Retire generators whose lifetime ends before this scenario.
    last_scenario_gdf = determine_end_of_life_gens(
        last_scenario_gdf,
        SCENARIO_TIMESTAMP[scenario],
        PV_ROOFTOP_LIFETIME,
    )

    buildings_gdf = calculate_max_pv_cap_per_building(
        valid_buildings_gdf,
        last_scenario_gdf,
        PV_CAP_PER_SQ_M,
        ROOF_FACTOR,
    )

    mastr_gdf = calculate_building_load_factor(mastr_gdf, buildings_gdf)

    # Empirical statistics derived from existing MaStR plants; these steer
    # the random desaggregation below.
    probabilities_dict = probabilities(mastr_gdf, cap_ranges=CAP_RANGES)
    cap_share_dict = cap_share_per_cap_range(mastr_gdf, cap_ranges=CAP_RANGES)
    load_factor_dict = mean_load_factor_per_cap_range(
        mastr_gdf, cap_ranges=CAP_RANGES
    )
    building_area_range_dict = building_area_range_per_cap_range(
        mastr_gdf,
        cap_ranges=CAP_RANGES,
        min_building_size=MIN_BUILDING_SIZE,
        upper_quantile=UPPER_QUNATILE,
        lower_quantile=LOWER_QUANTILE,
    )

    allocated_buildings_gdf = desaggregate_pv(
        buildings_gdf=buildings_gdf,
        cap_df=cap_per_bus_id_df,
        prob_dict=probabilities_dict,
        cap_share_dict=cap_share_dict,
        building_area_range_dict=building_area_range_dict,
        load_factor_dict=load_factor_dict,
        seed=SEED,
        pv_cap_per_sq_m=PV_CAP_PER_SQ_M,
    ).assign(scenario=scenario)

    # Fill in missing metadata and coerce numeric columns.
    meta_buildings_gdf = frame_to_numeric(
        add_buildings_meta_data(
            allocated_buildings_gdf,
            probabilities_dict,
            SEED,
        )
    )

    scenario_buildings_gdf = add_start_up_date(
        meta_buildings_gdf,
        start=last_scenario_gdf.start_up_date.max(),
        end=SCENARIO_TIMESTAMP[scenario],
        seed=SEED,
    )

    return scenario_buildings_gdf, cap_per_bus_id_df
2560
2561
2562
class EgonPowerPlantPvRoofBuildingScenario(Base):
    """
    ORM model for the scenario mapping table pv_unit <-> building
    (``supply.egon_power_plants_pv_roof_building``).
    """

    __tablename__ = "egon_power_plants_pv_roof_building"
    __table_args__ = {"schema": "supply"}

    index = Column(Integer, primary_key=True, index=True)
    # Scenario the generator belongs to.
    scenario = Column(String)
    # ID of the building the PV unit is allocated to.
    building_id = Column(Integer)
    # MaStR unit identifier; nullable — presumably absent for generators
    # newly created within a scenario (TODO confirm).
    gens_id = Column(String, nullable=True)
    # Generator capacity; exported in MW (see ``create_scenario_table``,
    # which converts kW -> MW on export).
    capacity = Column(Float)
    einheitliche_ausrichtung_und_neigungswinkel = Column(Float)
    hauptausrichtung = Column(String)
    hauptausrichtung_neigungswinkel = Column(String)
    voltage_level = Column(Integer)
2575
2576
2577
def create_scenario_table(buildings_gdf):
    """Create mapping table pv_unit <-> building for scenario"""
    table = EgonPowerPlantPvRoofBuildingScenario.__table__

    # Recreate the table from scratch so stale results are removed.
    table.drop(bind=engine, checkfirst=True)
    table.create(bind=engine, checkfirst=True)

    export_df = (
        buildings_gdf.rename(columns=COLS_TO_RENAME)
        .assign(capacity=buildings_gdf.capacity.div(10**3))  # kW -> MW
        .loc[:, COLS_TO_EXPORT]
        .reset_index()
    )

    export_df.to_sql(
        name=table.name,
        schema=table.schema,
        con=db.engine(),
        if_exists="append",
        index=False,
    )
2595
2596
2597
def geocode_mastr_data():
    """
    Read PV rooftop data from MaStR CSV, clean it, geocode it and store
    the result.

    TODO: the source will be replaced as soon as the MaStR data is available
     in DB.
    """
    raw_mastr_df = mastr_data(
        MASTR_INDEX_COL,
        MASTR_RELEVANT_COLS,
        MASTR_DTYPES,
        MASTR_PARSE_DATES,
    )

    cleaned_mastr_df = clean_mastr_data(
        raw_mastr_df,
        max_realistic_pv_cap=MAX_REALISTIC_PV_CAP,
        min_realistic_pv_cap=MIN_REALISTIC_PV_CAP,
        seed=SEED,
        rounding=ROUNDING,
    )

    # Geocode the unique address combinations with a rate-limited
    # geocoder and persist the result.
    geocoding_df = geocoding_data(cleaned_mastr_df)
    ratelimiter = geocoder(USER_AGENT, MIN_DELAY_SECONDS)

    create_geocoded_table(geocode_data(geocoding_df, ratelimiter, EPSG))
2625
2626
2627
def pv_rooftop_to_buildings():
    """Main script, executed as task"""

    mastr_gdf = load_mastr_data()

    buildings_gdf = load_building_data()

    # Allocate the existing MaStR generators to OSM buildings.
    desagg_mastr_gdf, desagg_buildings_gdf = allocate_to_buildings(
        mastr_gdf, buildings_gdf
    )

    # The status-quo allocation is the starting point for the scenarios.
    status_quo_gdf = (
        desagg_mastr_gdf.assign(scenario="status_quo")
        .reset_index()
        .rename(columns={"geometry": "geom", "EinheitMastrNummer": "gens_id"})
    )

    # Collect per-scenario frames and concatenate once after the loop
    # instead of re-concatenating inside it (avoids quadratic copying).
    scenario_frames = [status_quo_gdf]
    cap_frames = []

    scenario_buildings_gdf = status_quo_gdf.copy()

    for scenario in SCENARIOS:
        logger.debug(f"Desaggregating scenario {scenario}.")

        # Each scenario builds on the generator fleet of the previous one.
        (
            scenario_buildings_gdf,
            cap_per_bus_id_scenario_df,
        ) = allocate_scenarios(
            desagg_mastr_gdf,
            desagg_buildings_gdf,
            scenario_buildings_gdf,
            scenario,
        )

        scenario_frames.append(scenario_buildings_gdf)
        cap_frames.append(cap_per_bus_id_scenario_df)

    all_buildings_gdf = gpd.GeoDataFrame(
        pd.concat(scenario_frames, ignore_index=True),
        crs=scenario_buildings_gdf.crs,
        geometry="geom",
    )

    cap_per_bus_id_df = (  # noqa: F841
        pd.concat(cap_frames) if cap_frames else pd.DataFrame()
    )

    # export scenario
    create_scenario_table(add_voltage_level(all_buildings_gdf))
2674