Passed
Pull Request — dev (#934)
created by unknown, 01:28

allocate_pv()   C

Complexity
  Conditions   7

Size
  Total Lines  125
  Code Lines   62

Duplication
  Lines        0
  Ratio        0 %

Importance
  Changes      0

Metric  Value
eloc    62
dl      0
loc     125
rs      6.8436
c       0
b       0
f       0
cc      7
nop     3

How to fix: Long Method

Small methods make your code easier to understand, in particular when combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part into a new method, and to use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

* Extract Method
* Decompose Conditional

A minimal sketch of an Extract Method pass in the spirit of allocate_pv() follows the list.
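As an illustration, here is a minimal, self-contained sketch of Extract Method applied in the spirit of allocate_pv(): the per-quantile matching step, which would otherwise sit behind a comment inside the allocation loop, moves into a helper whose name replaces the comment. All names here (_match_gens_to_buildings, allocate, the toy frames) are hypothetical stand-ins, not part of the reviewed module.

import pandas as pd
from numpy.random import default_rng


def _match_gens_to_buildings(gens, buildings, rng) -> pd.Index:
    # extracted step: randomly draw one free building per generator
    # (formerly a commented block inside the allocation loop)
    return pd.Index(rng.choice(buildings.index, size=len(gens), replace=False))


def allocate(gens: pd.DataFrame, buildings: pd.DataFrame, seed: int) -> pd.Series:
    rng = default_rng(seed)
    assignment = {}

    for quant in gens.quant.unique():
        q_gens = gens.loc[gens.quant == quant]
        q_buildings = buildings.loc[buildings.quant == quant]

        chosen = _match_gens_to_buildings(q_gens, q_buildings, rng)
        assignment.update(dict(zip(q_gens.index, chosen)))

    return pd.Series(assignment, name="building_id")


# toy data: two generators in quantile 0, one in quantile 1
gens = pd.DataFrame({"quant": [0, 0, 1]}, index=["g1", "g2", "g3"])
buildings = pd.DataFrame({"quant": [0, 0, 0, 1, 1]}, index=[10, 11, 12, 13, 14])
print(allocate(gens, buildings, seed=42))

The same move would apply to the two len_build < len_gens fallback branches inside allocate_pv(), which are natural extraction candidates for bringing its seven conditions (cc 7) and 125 lines down.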

"""
Distribute MaStR PV rooftop capacities to OSM and synthetic buildings. Generate new
PV rooftop generators for scenarios eGon2035 and eGon100RE.
Data cleaning: Drop duplicates and entries with missing critical data. Determine the most
plausible capacity from the multiple values given in MaStR data. Drop generators which don't
have any plausible capacity data (23.5 MW > P > 0.1 kW). Randomly and weighted add a
start-up date if it is missing. Extract zip and municipality from 'Standort' given in
MaStR data. Geocode unique zip and municipality combinations with Nominatim (1 sec
delay). Drop generators for which geocoding failed or which are located outside the
municipalities of Germany. Add some visual sanity checks for cleaned data.
Allocation of MaStR data: Allocate each generator to an existing building from OSM.
Determine the quantile each generator and building is in depending on the capacity of
the generator and the area of the polygon of the building. Randomly distribute
generators within each municipality, preferably within the same building area quantile as
the generators are capacity-wise. If not enough buildings exist within a municipality
and quantile, additional buildings from other quantiles are chosen randomly.
Disaggregation of PV rooftop scenarios: The scenario data per federal state is linearly
distributed to the MV grid districts according to the PV rooftop potential per MV grid
district. The rooftop potential is estimated from the building area given by the OSM
buildings. Grid districts which are located in several federal states are allocated PV
capacity according to their respective roof potential in the individual federal states.
The disaggregation of PV plants within a grid district respects existing plants from
MaStR which did not reach their end of life. New PV plants are randomly and weighted
generated using a breakdown of MaStR data as generator basis. Plant metadata (e.g. plant
orientation) is also added randomly and weighted, using MaStR data as basis.
"""
from __future__ import annotations

from collections import Counter
from functools import wraps
from pathlib import Path
from time import perf_counter
from typing import Any

from geoalchemy2 import Geometry
from geopy.extra.rate_limiter import RateLimiter
from geopy.geocoders import Nominatim
from loguru import logger
from numpy.random import RandomState, default_rng
from pyproj.crs.crs import CRS
from sqlalchemy import BigInteger, Column, Float, Integer, String
from sqlalchemy.dialects.postgresql import HSTORE
from sqlalchemy.ext.declarative import declarative_base
import geopandas as gpd
import numpy as np
import pandas as pd

from egon.data import config, db
from egon.data.datasets.electricity_demand_timeseries.hh_buildings import (
    OsmBuildingsSynthetic,
)
from egon.data.datasets.scenario_capacities import EgonScenarioCapacities
from egon.data.datasets.zensus_vg250 import Vg250Gem

engine = db.engine()
Base = declarative_base()
SEED = int(config.settings()["egon-data"]["--random-seed"])

# TODO: move to yml
# mastr data
MASTR_RELEVANT_COLS = [
    "EinheitMastrNummer",
    "Bruttoleistung",
    "StatisikFlag",
    "Bruttoleistung_extended",
    "Nettonennleistung",
    "InstallierteLeistung",
    "zugeordneteWirkleistungWechselrichter",
    "EinheitBetriebsstatus",
    "Standort",
    "Bundesland",
    "Land",
    "Landkreis",
    "Gemeinde",
    "Postleitzahl",
    "Ort",
    "GeplantesInbetriebnahmedatum",
    "Inbetriebnahmedatum",
    "GemeinsamerWechselrichterMitSpeicher",
    "Lage",
    "Leistungsbegrenzung",
    "EinheitlicheAusrichtungUndNeigungswinkel",
    "Hauptausrichtung",
    "HauptausrichtungNeigungswinkel",
    "Nebenausrichtung",
]

MASTR_DTYPES = {
    "EinheitMastrNummer": str,
    "Bruttoleistung": float,
    "StatisikFlag": str,
    "Bruttoleistung_extended": float,
    "Nettonennleistung": float,
    "InstallierteLeistung": float,
    "zugeordneteWirkleistungWechselrichter": float,
    "EinheitBetriebsstatus": str,
    "Standort": str,
    "Bundesland": str,
    "Land": str,
    "Landkreis": str,
    "Gemeinde": str,
    # "Postleitzahl": int,  # fails because of nan values
    "Ort": str,
    "GemeinsamerWechselrichterMitSpeicher": str,
    "Lage": str,
    "Leistungsbegrenzung": str,
    # this would parse nan values as False, which is not always correct
    # "EinheitlicheAusrichtungUndNeigungswinkel": bool,
    "Hauptausrichtung": str,
    "HauptausrichtungNeigungswinkel": str,
    "Nebenausrichtung": str,
    "NebenausrichtungNeigungswinkel": str,
}

MASTR_PARSE_DATES = [
    "GeplantesInbetriebnahmedatum",
    "Inbetriebnahmedatum",
]

MASTR_INDEX_COL = "EinheitMastrNummer"

EPSG = 4326
SRID = 3035

# data cleaning
MAX_REALISTIC_PV_CAP = 23500
MIN_REALISTIC_PV_CAP = 0.1
ROUNDING = 1

# geopy
MIN_DELAY_SECONDS = 1
USER_AGENT = "rli_kh_geocoder"

# show additional logging information
VERBOSE = False

EXPORT_DIR = Path(__file__).resolve().parent / "data"
EXPORT_FILE = "mastr_geocoded.gpkg"
EXPORT_PATH = EXPORT_DIR / EXPORT_FILE
DRIVER = "GPKG"

# Number of quantiles
Q = 5

# Scenario Data
CARRIER = "solar_rooftop"
SCENARIOS = ["eGon2035"]  # , "eGon100RE"]
SCENARIO_TIMESTAMP = {
    "eGon2035": pd.Timestamp("2035-01-01", tz="UTC"),
    "eGon100RE": pd.Timestamp("2050-01-01", tz="UTC"),
}
PV_ROOFTOP_LIFETIME = pd.Timedelta(30 * 365, unit="D")

# Example module: Trina Vertex S TSM-400DE09M.08, 400 Wp
# https://www.photovoltaik4all.de/media/pdf/92/64/68/Trina_Datasheet_VertexS_DE09-08_2021_A.pdf
MODUL_CAP = 0.4  # kWp
MODUL_SIZE = 1.096 * 1.754  # m²
PV_CAP_PER_SQ_M = MODUL_CAP / MODUL_SIZE

# Estimation of usable roof area
# Factor for the conversion of building area to roof area
# estimated mean roof pitch: 35°
# estimated usable roof share: 80%
# estimation that only the south side of the building is used for PV
# see https://mediatum.ub.tum.de/doc/%20969497/969497.pdf
# AREA_FACTOR = 1.221
# USABLE_ROOF_SHARE = 0.8
# SOUTH_SHARE = 0.5
# ROOF_FACTOR = AREA_FACTOR * USABLE_ROOF_SHARE * SOUTH_SHARE
ROOF_FACTOR = 0.5

CAP_RANGES = [
    (0, 30),
    (30, 100),
    (100, float("inf")),
]

MIN_BUILDING_SIZE = 10.0
UPPER_QUANTILE = 0.95
LOWER_QUANTILE = 0.05

COLS_TO_RENAME = {
    "EinheitlicheAusrichtungUndNeigungswinkel": (
        "einheitliche_ausrichtung_und_neigungswinkel"
    ),
    "Hauptausrichtung": "hauptausrichtung",
    "HauptausrichtungNeigungswinkel": "hauptausrichtung_neigungswinkel",
}

COLS_TO_EXPORT = [
    "scenario",
    "building_id",
    "gens_id",
    "capacity",
    "einheitliche_ausrichtung_und_neigungswinkel",
    "hauptausrichtung",
    "hauptausrichtung_neigungswinkel",
    "voltage_level",
]

# TODO
INCLUDE_SYNTHETIC_BUILDINGS = True
ONLY_BUILDINGS_WITH_DEMAND = True
TEST_RUN = False

def timer_func(func):
    @wraps(func)
    def timeit_wrapper(*args, **kwargs):
        start_time = perf_counter()
        result = func(*args, **kwargs)
        end_time = perf_counter()
        total_time = end_time - start_time
        logger.debug(
            f"Function {func.__name__} took {total_time:.4f} seconds."
        )
        return result

    return timeit_wrapper


@timer_func
def mastr_data(
    index_col: str | int | list[str] | list[int],
    usecols: list[str],
    dtype: dict[str, Any] | None,
    parse_dates: list[str] | None,
) -> pd.DataFrame:
    """
    Read MaStR data from csv.

    Parameters
    -----------
    index_col : str, int or list of str or int
        Column(s) to use as the row labels of the DataFrame.
    usecols : list of str
        Return a subset of the columns.
    dtype : dict of column (str) -> type (any), optional
        Data type for data or columns.
    parse_dates : list of names (str), optional
        Try to parse given columns to datetime.
    Returns
    -------
    pandas.DataFrame
        DataFrame containing MaStR data.
    """
    mastr_path = Path(
        config.datasets()["power_plants"]["sources"]["mastr_pv"]
    ).resolve()

    mastr_df = pd.read_csv(
        mastr_path,
        index_col=index_col,
        usecols=usecols,
        dtype=dtype,
        parse_dates=parse_dates,
    )

    mastr_df = mastr_df.loc[
        (mastr_df.StatisikFlag == "B")
        & (mastr_df.EinheitBetriebsstatus == "InBetrieb")
        & (mastr_df.Land == "Deutschland")
        & (mastr_df.Lage == "BaulicheAnlagen")
    ]

    if (
        config.settings()["egon-data"]["--dataset-boundary"]
        == "Schleswig-Holstein"
    ):
        init_len = len(mastr_df)

        mastr_df = mastr_df.loc[mastr_df.Bundesland == "SchleswigHolstein"]

        logger.info(
            f"Using only MaStR data within Schleswig-Holstein. "
            f"{init_len - len(mastr_df)} of {init_len} generators are dropped."
        )

    logger.debug("MaStR data loaded.")

    return mastr_df

@timer_func
def clean_mastr_data(
    mastr_df: pd.DataFrame,
    max_realistic_pv_cap: int | float,
    min_realistic_pv_cap: int | float,
    rounding: int,
    seed: int,
) -> pd.DataFrame:
    """
    Clean the MaStR data from implausible data.

    * Drop MaStR ID duplicates.
    * Drop generators with implausible capacities.
    * Drop generators without any kind of start-up date.
    * Clean up Standort column and capacity.

    Parameters
    -----------
    mastr_df : pandas.DataFrame
        DataFrame containing MaStR data.
    max_realistic_pv_cap : int or float
        Maximum capacity, which is considered to be realistic.
    min_realistic_pv_cap : int or float
        Minimum capacity, which is considered to be realistic.
    rounding : int
        Rounding to use when cleaning up capacity. E.g. when
        rounding is 1 a capacity of 9.93 will be rounded to 9.9.
    seed : int
        Seed to use for random operations with NumPy and pandas.
    Returns
    -------
    pandas.DataFrame
        DataFrame containing cleaned MaStR data.
    """
    init_len = len(mastr_df)

    # drop duplicates
    mastr_df = mastr_df.loc[~mastr_df.index.duplicated()]

    # drop invalid entries in standort
    index_to_drop = mastr_df.loc[
        (mastr_df.Standort.isna()) | (mastr_df.Standort.isnull())
    ].index

    mastr_df = mastr_df.loc[~mastr_df.index.isin(index_to_drop)]

    df = mastr_df[
        [
            "Bruttoleistung",
            "Bruttoleistung_extended",
            "Nettonennleistung",
            "zugeordneteWirkleistungWechselrichter",
            "InstallierteLeistung",
        ]
    ].round(rounding)

    # use only the smallest capacity rating if multiple are given
    mastr_df = mastr_df.assign(
        capacity=[
            most_plausible(p_tub, min_realistic_pv_cap)
            for p_tub in df.itertuples(index=False)
        ]
    )

    # drop generators without any capacity info
    # and capacity of zero
    # and if the capacity is > 23.5 MW, because
    # Germany's largest rooftop PV is 23 MW
    # https://www.iwr.de/news/groesste-pv-dachanlage-europas-wird-in-sachsen-anhalt-gebaut-news37379
    mastr_df = mastr_df.loc[
        (~mastr_df.capacity.isna())
        & (mastr_df.capacity <= max_realistic_pv_cap)
        & (mastr_df.capacity > min_realistic_pv_cap)
    ]

    # get zip and municipality
    mastr_df[["zip_and_municipality", "drop_this"]] = pd.DataFrame(
        mastr_df.Standort.astype(str)
        .apply(
            zip_and_municipality_from_standort,
            args=(VERBOSE,),
        )
        .tolist(),
        index=mastr_df.index,
    )

    # drop invalid entries
    mastr_df = mastr_df.loc[mastr_df.drop_this].drop(columns="drop_this")

    # add ", Deutschland" just in case
    mastr_df = mastr_df.assign(
        zip_and_municipality=(mastr_df.zip_and_municipality + ", Deutschland")
    )

    # get consistent start-up date
    mastr_df = mastr_df.assign(
        start_up_date=mastr_df.Inbetriebnahmedatum,
    )

    mastr_df.loc[mastr_df.start_up_date.isna()] = mastr_df.loc[
        mastr_df.start_up_date.isna()
    ].assign(
        start_up_date=mastr_df.GeplantesInbetriebnahmedatum.loc[
            mastr_df.start_up_date.isna()
        ]
    )

    # randomly and weighted fill missing start-up dates
    pool = mastr_df.loc[
        ~mastr_df.start_up_date.isna()
    ].start_up_date.to_numpy()

    size = len(mastr_df) - len(pool)

    if size > 0:
        np.random.seed(seed)

        choice = np.random.choice(
            pool,
            size=size,
            replace=False,
        )

        mastr_df.loc[mastr_df.start_up_date.isna()] = mastr_df.loc[
            mastr_df.start_up_date.isna()
        ].assign(start_up_date=choice)

        logger.info(
            f"Randomly and weighted added start-up date to {size} generators."
        )

    mastr_df = mastr_df.assign(
        start_up_date=pd.to_datetime(mastr_df.start_up_date, utc=True)
    )

    end_len = len(mastr_df)
    logger.debug(
        f"Dropped {init_len - end_len} "
        f"({((init_len - end_len) / init_len) * 100:g}%)"
        f" of {init_len} rows from MaStR DataFrame."
    )

    return mastr_df

def zip_and_municipality_from_standort(
    standort: str,
    verbose: bool = False,
) -> tuple[str, bool]:
    """
    Get zip code and municipality from Standort string.
    Parameters
    -----------
    standort : str
        Standort as given from MaStR data.
    verbose : bool
        Logs additional info if True.
    Returns
    -------
    tuple of str and bool
        Standort reduced to zip code and municipality, and a flag
        indicating whether a zip code could be identified.
    """
    if verbose:
        logger.debug(f"Uncleaned String: {standort}")

    standort_list = standort.split()

    found = False
    count = 0

    for count, elem in enumerate(standort_list):
        if len(elem) != 5:
            continue
        if not elem.isnumeric():
            continue

        found = True

        break

    if found:
        cleaned_str = " ".join(standort_list[count:])

        if verbose:
            logger.debug(f"Cleaned String:   {cleaned_str}")

        return cleaned_str, found

    logger.warning(
        "Couldn't identify zip code. This entry will be dropped."
        f" Original standort: {standort}."
    )

    return standort, found


def most_plausible(
    p_tub: tuple,
    min_realistic_pv_cap: int | float,
) -> float:
    """
    Try to determine the most plausible capacity.
    Try to determine the most plausible capacity from the multiple
    capacity values given in the MaStR data for a generator.
    Parameters
    -----------
    p_tub : tuple
        Tuple containing the different capacities given in
        the MaStR data.
    min_realistic_pv_cap : int or float
        Minimum capacity, which is considered to be realistic.
    Returns
    -------
    float
        Capacity of the generator estimated as the most realistic.
    """
    count = Counter(p_tub).most_common(3)

    if len(count) == 1:
        return count[0][0]

    val1 = count[0][0]
    val2 = count[1][0]

    if len(count) == 2:
        min_val = min(val1, val2)
        max_val = max(val1, val2)
    else:
        val3 = count[2][0]

        min_val = min(val1, val2, val3)
        max_val = max(val1, val2, val3)

    if min_val < min_realistic_pv_cap:
        return max_val

    return min_val

def geocoder(
    user_agent: str,
    min_delay_seconds: int,
) -> RateLimiter:
    """
    Setup Nominatim geocoding class.
    Parameters
    -----------
    user_agent : str
        The app name.
    min_delay_seconds : int
        Delay in seconds to use between requests to Nominatim.
        A minimum of 1 is advised.
    Returns
    -------
    geopy.extra.rate_limiter.RateLimiter
        Nominatim RateLimiter geocoding class to use for geocoding.
    """
    locator = Nominatim(user_agent=user_agent)
    return RateLimiter(
        locator.geocode,
        min_delay_seconds=min_delay_seconds,
    )


def geocoding_data(
    clean_mastr_df: pd.DataFrame,
) -> pd.DataFrame:
    """
    Setup DataFrame to geocode.
    Parameters
    -----------
    clean_mastr_df : pandas.DataFrame
        DataFrame containing cleaned MaStR data.
    Returns
    -------
    pandas.DataFrame
        DataFrame containing all unique combinations of
        zip codes with municipalities for geocoding.
    """
    return pd.DataFrame(
        data=clean_mastr_df.zip_and_municipality.unique(),
        columns=["zip_and_municipality"],
    )

@timer_func
def geocode_data(
    geocoding_df: pd.DataFrame,
    ratelimiter: RateLimiter,
    epsg: int,
) -> gpd.GeoDataFrame:
    """
    Geocode zip code and municipality.
    Extract latitude, longitude and altitude.
    Transform latitude and longitude to a shapely
    Point and return a geopandas GeoDataFrame.
    Parameters
    -----------
    geocoding_df : pandas.DataFrame
        DataFrame containing all unique combinations of
        zip codes with municipalities for geocoding.
    ratelimiter : geopy.extra.rate_limiter.RateLimiter
        Nominatim RateLimiter geocoding class to use for geocoding.
    epsg : int
        EPSG ID to use as CRS.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing all unique combinations of
        zip codes with municipalities with matching geolocation.
    """
    logger.info(f"Geocoding {len(geocoding_df)} locations.")

    geocode_df = geocoding_df.assign(
        location=geocoding_df.zip_and_municipality.apply(ratelimiter)
    )

    geocode_df = geocode_df.assign(
        point=geocode_df.location.apply(
            lambda loc: tuple(loc.point) if loc else None
        )
    )

    geocode_df[["latitude", "longitude", "altitude"]] = pd.DataFrame(
        geocode_df.point.tolist(), index=geocode_df.index
    )

    return gpd.GeoDataFrame(
        geocode_df,
        geometry=gpd.points_from_xy(geocode_df.longitude, geocode_df.latitude),
        crs=f"EPSG:{epsg}",
    )

def merge_geocode_with_mastr(
    clean_mastr_df: pd.DataFrame, geocode_gdf: gpd.GeoDataFrame
) -> gpd.GeoDataFrame:
    """
    Merge geometry to original MaStR data.
    Parameters
    -----------
    clean_mastr_df : pandas.DataFrame
        DataFrame containing cleaned MaStR data.
    geocode_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing all unique combinations of
        zip codes with municipalities with matching geolocation.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing cleaned MaStR data with
        matching geolocation from geocoding.
    """
    return gpd.GeoDataFrame(
        clean_mastr_df.merge(
            geocode_gdf[["zip_and_municipality", "geometry"]],
            how="left",
            left_on="zip_and_municipality",
            right_on="zip_and_municipality",
        ),
        crs=geocode_gdf.crs,
    ).set_index(clean_mastr_df.index)


def drop_invalid_entries_from_gdf(
    gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop invalid entries from geopandas GeoDataFrame.
    TODO: how to omit the logging from geos here???
    Parameters
    -----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame to be checked for validity.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with rows with invalid geometries
        dropped.
    """
    valid_gdf = gdf.loc[gdf.is_valid]

    logger.debug(
        f"{len(gdf) - len(valid_gdf)} "
        f"({(len(gdf) - len(valid_gdf)) / len(gdf) * 100:g}%) "
        f"of {len(gdf)} values were invalid and are dropped."
    )

    return valid_gdf

@timer_func
def municipality_data() -> gpd.GeoDataFrame:
    """
    Get municipality data from eGo^n Database.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with municipality data.
    """
    with db.session_scope() as session:
        query = session.query(Vg250Gem.ags, Vg250Gem.geometry.label("geom"))

    return gpd.read_postgis(
        query.statement, query.session.bind, index_col="ags"
    )


@timer_func
def add_ags_to_gens(
    valid_mastr_gdf: gpd.GeoDataFrame,
    municipalities_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add information about AGS ID to generators.
    Parameters
    -----------
    valid_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data.
    municipalities_gdf : geopandas.GeoDataFrame
        GeoDataFrame with municipality data.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data
        with AGS ID added.
    """
    return valid_mastr_gdf.sjoin(
        municipalities_gdf,
        how="left",
        predicate="intersects",
    ).rename(columns={"index_right": "ags"})


def drop_gens_outside_muns(
    valid_mastr_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop all generators outside of municipalities.
    Parameters
    -----------
    valid_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data
        with generators without an AGS ID dropped.
    """
    gdf = valid_mastr_gdf.loc[~valid_mastr_gdf.ags.isna()]

    logger.debug(
        f"{len(valid_mastr_gdf) - len(gdf)} "
        f"({(len(valid_mastr_gdf) - len(gdf)) / len(valid_mastr_gdf) * 100:g}%) "
        f"of {len(valid_mastr_gdf)} values are outside of the municipalities"
        " and are therefore dropped."
    )

    return gdf

class EgonMastrPvRoofGeocoded(Base):
    __tablename__ = "egon_mastr_pv_roof_geocoded"
    __table_args__ = {"schema": "supply"}

    zip_and_municipality = Column(String, primary_key=True, index=True)
    location = Column(String)
    point = Column(String)
    latitude = Column(Float)
    longitude = Column(Float)
    altitude = Column(Float)
    geometry = Column(Geometry(srid=EPSG))


def create_geocoded_table(geocode_gdf):
    """
    Create the geocoded MaStR PV rooftop table.
    Parameters
    -----------
    geocode_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoding information on pv rooftop locations.
    """
    EgonMastrPvRoofGeocoded.__table__.drop(bind=engine, checkfirst=True)
    EgonMastrPvRoofGeocoded.__table__.create(bind=engine, checkfirst=True)

    geocode_gdf.to_postgis(
        name=EgonMastrPvRoofGeocoded.__table__.name,
        schema=EgonMastrPvRoofGeocoded.__table__.schema,
        con=db.engine(),
        if_exists="append",
        index=False,
        # dtype={}
    )

def geocoded_data_from_db(
    epsg: str | int,
) -> gpd.GeoDataFrame:
    """
    Read geocoded PV rooftop data from eGo^n Database.
    Parameters
    -----------
    epsg : str or int
        EPSG ID to use as CRS.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR PV rooftop data.
    """
    with db.session_scope() as session:
        query = session.query(
            EgonMastrPvRoofGeocoded.zip_and_municipality,
            EgonMastrPvRoofGeocoded.geometry,
        )

    return gpd.read_postgis(
        query.statement, query.session.bind, geom_col="geometry"
    ).to_crs(f"EPSG:{epsg}")

def load_mastr_data():
    """Read PV rooftop data from MaStR CSV
    Note: the source will be replaced as soon as the MaStR data is available
    in DB.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data with geocoded locations.
    """
    mastr_df = mastr_data(
        MASTR_INDEX_COL,
        MASTR_RELEVANT_COLS,
        MASTR_DTYPES,
        MASTR_PARSE_DATES,
    )

    clean_mastr_df = clean_mastr_data(
        mastr_df,
        max_realistic_pv_cap=MAX_REALISTIC_PV_CAP,
        min_realistic_pv_cap=MIN_REALISTIC_PV_CAP,
        seed=SEED,
        rounding=ROUNDING,
    )

    geocode_gdf = geocoded_data_from_db(EPSG)

    mastr_gdf = merge_geocode_with_mastr(clean_mastr_df, geocode_gdf)

    valid_mastr_gdf = drop_invalid_entries_from_gdf(mastr_gdf)

    municipalities_gdf = municipality_data()

    valid_mastr_gdf = add_ags_to_gens(valid_mastr_gdf, municipalities_gdf)

    return drop_gens_outside_muns(valid_mastr_gdf)

class OsmBuildingsFiltered(Base):
    __tablename__ = "osm_buildings_filtered"
    __table_args__ = {"schema": "openstreetmap"}

    osm_id = Column(BigInteger)
    amenity = Column(String)
    building = Column(String)
    name = Column(String)
    geom = Column(Geometry(srid=SRID), index=True)
    area = Column(Float)
    geom_point = Column(Geometry(srid=SRID), index=True)
    tags = Column(HSTORE)
    id = Column(BigInteger, primary_key=True, index=True)

@timer_func
def osm_buildings(
    to_crs: CRS,
) -> gpd.GeoDataFrame:
    """
    Read OSM buildings data from eGo^n Database.
    Parameters
    -----------
    to_crs : pyproj.crs.crs.CRS
        CRS to transform geometries to.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    """
    with db.session_scope() as session:
        query = session.query(
            OsmBuildingsFiltered.id,
            OsmBuildingsFiltered.area,
            OsmBuildingsFiltered.geom_point.label("geom"),
        )

    return gpd.read_postgis(
        query.statement, query.session.bind, index_col="id"
    ).to_crs(to_crs)

@timer_func
def synthetic_buildings(
    to_crs: CRS,
) -> gpd.GeoDataFrame:
    """
    Read synthetic buildings data from eGo^n Database.
    Parameters
    -----------
    to_crs : pyproj.crs.crs.CRS
        CRS to transform geometries to.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing synthetic buildings data.
    """
    with db.session_scope() as session:
        query = session.query(
            OsmBuildingsSynthetic.id,
            OsmBuildingsSynthetic.area,
            OsmBuildingsSynthetic.geom_point.label("geom"),
        )

    return gpd.read_postgis(
        query.statement, query.session.bind, index_col="id"
    ).to_crs(to_crs)

@timer_func
def add_ags_to_buildings(
    buildings_gdf: gpd.GeoDataFrame,
    municipalities_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add information about AGS ID to buildings.
    Parameters
    -----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    municipalities_gdf : geopandas.GeoDataFrame
        GeoDataFrame with municipality data.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data
        with AGS ID added.
    """
    return buildings_gdf.sjoin(
        municipalities_gdf,
        how="left",
        predicate="intersects",
    ).rename(columns={"index_right": "ags"})


def drop_buildings_outside_muns(
    buildings_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop all buildings outside of municipalities.
    Parameters
    -----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data
        with buildings without an AGS ID dropped.
    """
    gdf = buildings_gdf.loc[~buildings_gdf.ags.isna()]

    logger.debug(
        f"{len(buildings_gdf) - len(gdf)} "
        f"({(len(buildings_gdf) - len(gdf)) / len(buildings_gdf) * 100:g}%) "
        f"of {len(buildings_gdf)} values are outside of the municipalities "
        "and are therefore dropped."
    )

    return gdf

def egon_building_peak_loads():
    sql = """
    SELECT building_id
    FROM demand.egon_building_electricity_peak_loads
    WHERE scenario = 'eGon2035'
    """

    return (
        db.select_dataframe(sql).building_id.astype(int).sort_values().unique()
    )

@timer_func
def load_building_data():
    """
    Read buildings from DB
    Tables:

    * `openstreetmap.osm_buildings_filtered` (from OSM)
    * `openstreetmap.osm_buildings_synthetic` (synthetic, created by us)

    Use column `id` for both as it is unique, hence both datasets can be
    concatenated. If INCLUDE_SYNTHETIC_BUILDINGS is False synthetic buildings
    will not be loaded.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with buildings without an AGS ID
        dropped.
    """

    municipalities_gdf = municipality_data()

    osm_buildings_gdf = osm_buildings(municipalities_gdf.crs)

    if INCLUDE_SYNTHETIC_BUILDINGS:
        synthetic_buildings_gdf = synthetic_buildings(municipalities_gdf.crs)

        buildings_gdf = gpd.GeoDataFrame(
            pd.concat(
                [
                    osm_buildings_gdf,
                    synthetic_buildings_gdf,
                ]
            ),
            geometry="geom",
            crs=osm_buildings_gdf.crs,
        ).rename(columns={"area": "building_area"})

        buildings_gdf.index = buildings_gdf.index.astype(int)

    else:
        buildings_gdf = osm_buildings_gdf.rename(
            columns={"area": "building_area"}
        )

    if ONLY_BUILDINGS_WITH_DEMAND:
        building_ids = egon_building_peak_loads()

        init_len = len(building_ids)

        logger.debug(f"Dtype building_ids: {building_ids.dtype}")
        logger.debug(
            f"Dtype buildings_gdf.index: {buildings_gdf.index.to_numpy().dtype}"
        )

        building_ids = np.intersect1d(
            list(map(int, building_ids)),
            list(map(int, buildings_gdf.index.to_numpy())),
        )

        end_len = len(building_ids)

        logger.debug(
            f"{end_len / init_len * 100:g} % ({end_len} / {init_len}) of buildings "
            f"have peak load."
        )

        buildings_gdf = buildings_gdf.loc[building_ids]

    buildings_ags_gdf = add_ags_to_buildings(buildings_gdf, municipalities_gdf)

    logger.debug("Loaded buildings.")

    return drop_buildings_outside_muns(buildings_ags_gdf)

@timer_func
def sort_and_qcut_df(
    df: pd.DataFrame | gpd.GeoDataFrame,
    col: str,
    q: int,
) -> pd.DataFrame | gpd.GeoDataFrame:
    """
    Determine the quantile of a given attribute in a (Geo)DataFrame.
    Sort the (Geo)DataFrame in ascending order for the given attribute.
    Parameters
    -----------
    df : pandas.DataFrame or geopandas.GeoDataFrame
        (Geo)DataFrame to sort and qcut.
    col : str
        Name of the attribute to sort and qcut the (Geo)DataFrame on.
    q : int
        Number of quantiles.
    Returns
    -------
    pandas.DataFrame or geopandas.GeoDataFrame
        Sorted and qcut (Geo)DataFrame.
    """
    df = df.sort_values(col, ascending=True)

    return df.assign(
        quant=pd.qcut(
            df[col],
            q=q,
            labels=range(q),
        )
    )

@timer_func
def allocate_pv(
    q_mastr_gdf: gpd.GeoDataFrame,
    q_buildings_gdf: gpd.GeoDataFrame,
    seed: int,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """
    Allocate the MaStR PV generators to the OSM buildings.
    This will determine a building for each PV generator if there are more
    buildings than generators within a given AGS. Generators are primarily
    distributed to buildings within the same quantile. Multiple assignment
    is excluded.
    Parameters
    -----------
    q_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded and qcut MaStR data.
    q_buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing qcut OSM buildings data.
    seed : int
        Seed to use for random operations with NumPy and pandas.
    Returns
    -------
    tuple with two geopandas.GeoDataFrame s
        GeoDataFrame containing MaStR data allocated to building IDs.
        GeoDataFrame containing building data allocated to MaStR IDs.
    """
    rng = default_rng(seed=seed)

    q_buildings_gdf = q_buildings_gdf.assign(gens_id=np.nan)
    q_mastr_gdf = q_mastr_gdf.assign(building_id=np.nan)

    ags_list = q_buildings_gdf.ags.unique()

    if TEST_RUN:
        ags_list = ags_list[:250]

    num_ags = len(ags_list)

    t0 = perf_counter()

    for count, ags in enumerate(ags_list):

        buildings = q_buildings_gdf.loc[
            (q_buildings_gdf.ags == ags) & (q_buildings_gdf.gens_id.isna())
        ]
        gens = q_mastr_gdf.loc[
            (q_mastr_gdf.ags == ags) & (q_mastr_gdf.building_id.isna())
        ]

        len_build = len(buildings)
        len_gens = len(gens)

        if len_build < len_gens:
            gens = gens.sample(len_build, random_state=RandomState(seed=seed))
            logger.error(
                f"There are {len_gens} generators and only {len_build}"
                f" buildings in AGS {ags}. {len_gens - len(gens)} "
                "generators were truncated to match the amount of buildings."
            )

            assert len_build == len(gens)

        for quant in gens.quant.unique():
            q_buildings = buildings.loc[
                (buildings.quant == quant) & (buildings.gens_id.isna())
            ]
            q_gens = gens.loc[gens.quant == quant]

            len_build = len(q_buildings)
            len_gens = len(q_gens)

            if len_build < len_gens:
                delta = len_gens - len_build

                logger.warning(
                    f"There are {len_gens} generators and only {len_build} "
                    f"buildings in AGS {ags} and quantile {quant}. {delta} "
                    f"buildings from AGS {ags} will be added randomly."
                )

                add_buildings = pd.Index(
                    rng.choice(
                        buildings.loc[
                            (buildings.quant != quant)
                            & (buildings.gens_id.isna())
                        ].index,
                        size=delta,
                        replace=False,
                    )
                )

                q_buildings = buildings.loc[
                    q_buildings.index.append(add_buildings)
                ]

                assert len(q_buildings) == len_gens

            chosen_buildings = pd.Index(
                rng.choice(
                    q_buildings.index,
                    size=len_gens,
                    replace=False,
                )
            )

            # q_mastr_gdf.loc[q_gens.index, "building_id"] = chosen_buildings
            q_buildings_gdf.loc[chosen_buildings, "gens_id"] = q_gens.index

        if count % 100 == 0:
            logger.debug(
                f"Allocation of {count / num_ags * 100:g} % of AGS done. It took "
                f"{perf_counter() - t0:g} seconds."
            )

            t0 = perf_counter()

    assigned_buildings = q_buildings_gdf.loc[~q_buildings_gdf.gens_id.isna()]

    q_mastr_gdf.loc[
        assigned_buildings.gens_id, "building_id"
    ] = assigned_buildings.index

    logger.debug("Allocated status quo generators to buildings.")

    return frame_to_numeric(q_mastr_gdf), frame_to_numeric(q_buildings_gdf)

def frame_to_numeric(
    df: pd.DataFrame | gpd.GeoDataFrame,
) -> pd.DataFrame | gpd.GeoDataFrame:
    """
    Try to convert all columns of a DataFrame to numeric ignoring errors.
    Parameters
    ----------
    df : pandas.DataFrame or geopandas.GeoDataFrame
    Returns
    -------
    pandas.DataFrame or geopandas.GeoDataFrame
    """
    if str(df.index.dtype) == "object":
        df.index = pd.to_numeric(df.index, errors="ignore")

    for col in df.columns:
        if str(df[col].dtype) == "object":
            df[col] = pd.to_numeric(df[col], errors="ignore")

    return df

def validate_output(
    desagg_mastr_gdf: pd.DataFrame | gpd.GeoDataFrame,
    desagg_buildings_gdf: pd.DataFrame | gpd.GeoDataFrame,
) -> None:
    """
    Validate output.

    * Validate that there are exactly as many buildings with a pv system as there are
      pv systems with a building
    * Validate that the building IDs with a pv system are the same building IDs as
      assigned to the pv systems
    * Validate that the pv system IDs with a building are the same pv system IDs as
      assigned to the buildings

    Parameters
    -----------
    desagg_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data allocated to building IDs.
    desagg_buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing building data allocated to MaStR IDs.
    """
    assert len(
        desagg_mastr_gdf.loc[~desagg_mastr_gdf.building_id.isna()]
    ) == len(desagg_buildings_gdf.loc[~desagg_buildings_gdf.gens_id.isna()])
    assert (
        np.sort(
            desagg_mastr_gdf.loc[
                ~desagg_mastr_gdf.building_id.isna()
            ].building_id.unique()
        )
        == np.sort(
            desagg_buildings_gdf.loc[
                ~desagg_buildings_gdf.gens_id.isna()
            ].index.unique()
        )
    ).all()
    assert (
        np.sort(
            desagg_mastr_gdf.loc[
                ~desagg_mastr_gdf.building_id.isna()
            ].index.unique()
        )
        == np.sort(
            desagg_buildings_gdf.loc[
                ~desagg_buildings_gdf.gens_id.isna()
            ].gens_id.unique()
        )
    ).all()

    logger.debug("Validated output.")

def drop_unallocated_gens(
    gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop generators which did not get allocated.

    Parameters
    -----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data allocated to building IDs.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data with generators dropped which did not get
        allocated.
    """
    init_len = len(gdf)
    gdf = gdf.loc[~gdf.building_id.isna()]
    end_len = len(gdf)

    logger.debug(
        f"Dropped {init_len - end_len} "
        f"({((init_len - end_len) / init_len) * 100:g}%)"
        f" of {init_len} unallocated rows from MaStR DataFrame."
    )

    return gdf

@timer_func
def allocate_to_buildings(
    mastr_gdf: gpd.GeoDataFrame,
    buildings_gdf: gpd.GeoDataFrame,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """
    Allocate status quo PV rooftop generators to buildings.
    Parameters
    -----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data with geocoded locations.
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with buildings without an AGS ID
        dropped.
    Returns
    -------
    tuple with two geopandas.GeoDataFrame s
        GeoDataFrame containing MaStR data allocated to building IDs.
        GeoDataFrame containing building data allocated to MaStR IDs.
    """
    logger.debug("Starting allocation of status quo.")

    q_mastr_gdf = sort_and_qcut_df(mastr_gdf, col="capacity", q=Q)
    q_buildings_gdf = sort_and_qcut_df(buildings_gdf, col="building_area", q=Q)

    desagg_mastr_gdf, desagg_buildings_gdf = allocate_pv(
        q_mastr_gdf, q_buildings_gdf, SEED
    )

    validate_output(desagg_mastr_gdf, desagg_buildings_gdf)

    return drop_unallocated_gens(desagg_mastr_gdf), desagg_buildings_gdf

@timer_func
def grid_districts(
    epsg: int,
) -> gpd.GeoDataFrame:
    """
    Load mv grid district geo data from eGo^n Database as
    geopandas.GeoDataFrame.
    Parameters
    -----------
    epsg : int
        EPSG ID to use as CRS.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing mv grid district ID and geo shapes data.
    """
    gdf = db.select_geodataframe(
        """
        SELECT bus_id, geom
        FROM grid.egon_mv_grid_district
        ORDER BY bus_id
        """,
        index_col="bus_id",
        geom_col="geom",
        epsg=epsg,
    )

    gdf.index = gdf.index.astype(int)

    logger.debug("Grid districts loaded.")

    return gdf

def scenario_data(
    carrier: str = "solar_rooftop",
    scenario: str = "eGon2035",
) -> pd.DataFrame:
    """
    Get scenario capacity data from eGo^n Database.
    Parameters
    -----------
    carrier : str
        Carrier type to filter table by.
    scenario : str
        Scenario to filter table by.
    Returns
    -------
    pandas.DataFrame
        DataFrame with scenario capacity data in GW.
    """
    with db.session_scope() as session:
        query = session.query(EgonScenarioCapacities).filter(
            EgonScenarioCapacities.carrier == carrier,
            EgonScenarioCapacities.scenario_name == scenario,
        )

    df = pd.read_sql(
        query.statement, query.session.bind, index_col="index"
    ).sort_index()

    logger.debug("Scenario capacity data loaded.")

    return df

# Scrutinizer note (View Code Duplication): this code seems to be duplicated
# in your project.
class Vg250Lan(Base):
    __tablename__ = "vg250_lan"
    __table_args__ = {"schema": "boundaries"}

    id = Column(BigInteger, primary_key=True, index=True)
    ade = Column(BigInteger)
    gf = Column(BigInteger)
    bsg = Column(BigInteger)
    ars = Column(String)
    ags = Column(String)
    sdv_ars = Column(String)
    gen = Column(String)
    bez = Column(String)
    ibz = Column(BigInteger)
    bem = Column(String)
    nbd = Column(String)
    sn_l = Column(String)
    sn_r = Column(String)
    sn_k = Column(String)
    sn_v1 = Column(String)
    sn_v2 = Column(String)
    sn_g = Column(String)
    fk_s3 = Column(String)
    nuts = Column(String)
    ars_0 = Column(String)
    ags_0 = Column(String)
    wsk = Column(String)
    debkg_id = Column(String)
    rs = Column(String)
    sdv_rs = Column(String)
    rs_0 = Column(String)
    geometry = Column(Geometry(srid=EPSG), index=True)

def federal_state_data(to_crs: CRS) -> gpd.GeoDataFrame:
    """
    Get federal state data from eGo^n Database.
    Parameters
    -----------
    to_crs : pyproj.crs.crs.CRS
        CRS to transform geometries to.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with federal state data.
    """
    with db.session_scope() as session:
        query = session.query(
            Vg250Lan.id, Vg250Lan.nuts, Vg250Lan.geometry.label("geom")
        )

    gdf = gpd.read_postgis(
        query.statement, query.session.bind, index_col="id"
    ).to_crs(to_crs)

    logger.debug("Federal State data loaded.")

    return gdf

@timer_func
def overlay_grid_districts_with_counties(
    mv_grid_district_gdf: gpd.GeoDataFrame,
    federal_state_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Calculate the intersections of mv grid districts and counties.
    Parameters
    -----------
    mv_grid_district_gdf : gpd.GeoDataFrame
        GeoDataFrame containing mv grid district ID and geo shapes data.
    federal_state_gdf : gpd.GeoDataFrame
        GeoDataFrame with federal state data.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with intersection shapes between counties and grid
        districts.
    """
    logger.debug(
        "Calculating intersection overlay between mv grid districts and "
        "counties. This may take a while..."
    )

    gdf = gpd.overlay(
        federal_state_gdf.to_crs(mv_grid_district_gdf.crs),
        mv_grid_district_gdf.reset_index(),
        how="intersection",
        keep_geom_type=True,
    )

    logger.debug("Done!")

    return gdf

@timer_func
def add_overlay_id_to_buildings(
    buildings_gdf: gpd.GeoDataFrame,
    grid_federal_state_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add information about overlay ID to buildings.
    Parameters
    -----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    grid_federal_state_gdf : geopandas.GeoDataFrame
        GeoDataFrame with intersection shapes between counties and grid districts.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with overlay ID added.
    """
    gdf = (
        buildings_gdf.to_crs(grid_federal_state_gdf.crs)
        .sjoin(
            grid_federal_state_gdf,
            how="left",
            predicate="intersects",
        )
        .rename(columns={"index_right": "overlay_id"})
    )

    logger.debug("Added overlay ID to OSM buildings.")

    return gdf

def drop_buildings_outside_grids(
    buildings_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop all buildings outside of grid areas.
    Parameters
    -----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data
        with buildings without a bus ID dropped.
    """
    gdf = buildings_gdf.loc[~buildings_gdf.bus_id.isna()]

    logger.debug(
        f"{len(buildings_gdf) - len(gdf)} "
        f"({(len(buildings_gdf) - len(gdf)) / len(buildings_gdf) * 100:g}%) "
        f"of {len(buildings_gdf)} values are outside of the grid areas "
        "and are therefore dropped."
    )

    return gdf

def cap_per_bus_id(
    scenario: str,
) -> pd.DataFrame:
    """
    Get table with total pv rooftop capacity per grid district.

    Parameters
    -----------
    scenario : str
        Scenario name.
    Returns
    -------
    pandas.DataFrame
        DataFrame with total rooftop capacity per mv grid.
    """
    targets = config.datasets()["solar_rooftop"]["targets"]

    sql = f"""
    SELECT bus as bus_id, p_nom as capacity
    FROM {targets['generators']['schema']}.{targets['generators']['table']}
    WHERE carrier = 'solar_rooftop'
    AND scn_name = '{scenario}'
    """

    return db.select_dataframe(sql, index_col="bus_id")

    # overlay_gdf = overlay_gdf.assign(capacity=np.nan)
    #
    # for cap, nuts in scenario_df[["capacity", "nuts"]].itertuples(index=False):
    #     nuts_gdf = overlay_gdf.loc[overlay_gdf.nuts == nuts]
    #
    #     capacity = nuts_gdf.building_area.multiply(
    #         cap / nuts_gdf.building_area.sum()
    #     )
    #
    #     overlay_gdf.loc[nuts_gdf.index] = overlay_gdf.loc[
    #         nuts_gdf.index
    #     ].assign(capacity=capacity.multiply(conversion).to_numpy())
    #
    # return overlay_gdf[["bus_id", "capacity"]].groupby("bus_id").sum()

def determine_end_of_life_gens(
    mastr_gdf: gpd.GeoDataFrame,
    scenario_timestamp: pd.Timestamp,
    pv_rooftop_lifetime: pd.Timedelta,
) -> gpd.GeoDataFrame:
    """
    Determine if an old PV system has reached its end of life.
    Parameters
    -----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    scenario_timestamp : pandas.Timestamp
        Timestamp at which the scenario takes place.
    pv_rooftop_lifetime : pandas.Timedelta
        Average expected lifetime of PV rooftop systems.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data and info if the system
        has reached its end of life.
    """
    mastr_gdf = mastr_gdf.assign(
        age=scenario_timestamp - mastr_gdf.start_up_date
    )

    logger.debug("Determined if pv rooftop systems reached their end of life.")

    return mastr_gdf.assign(end_of_life=pv_rooftop_lifetime < mastr_gdf.age)

def calculate_max_pv_cap_per_building(
    buildings_gdf: gpd.GeoDataFrame,
    mastr_gdf: gpd.GeoDataFrame,
    pv_cap_per_sq_m: float | int,
    roof_factor: float | int,
) -> gpd.GeoDataFrame:
    """
    Calculate the estimated maximum possible PV capacity per building.
    Parameters
    -----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    pv_cap_per_sq_m : float, int
        Average expected, installable PV capacity per square meter.
    roof_factor : float, int
        Average for PV usable roof area share.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with estimated maximum PV
        capacity.
    """
    gdf = (
        buildings_gdf.reset_index()
        .merge(
            mastr_gdf[
                [
                    "capacity",
                    "end_of_life",
                    "building_id",
                    "EinheitlicheAusrichtungUndNeigungswinkel",
                    "Hauptausrichtung",
                    "HauptausrichtungNeigungswinkel",
                ]
            ],
            how="left",
            left_on="id",
            right_on="building_id",
        )
        .set_index("id")
        .drop(columns="building_id")
    )

    return gdf.assign(
        max_cap=gdf.building_area.multiply(roof_factor * pv_cap_per_sq_m),
        end_of_life=gdf.end_of_life.fillna(True).astype(bool),
        bus_id=gdf.bus_id.astype(int),
    )

def calculate_building_load_factor(
1693
    mastr_gdf: gpd.GeoDataFrame,
1694
    buildings_gdf: gpd.GeoDataFrame,
1695
    rounding: int = 4,
1696
) -> gpd.GeoDataFrame:
1697
    """
1698
    Calculate the roof load factor from existing PV systems.
1699
    Parameters
1700
    -----------
1701
    mastr_gdf : geopandas.GeoDataFrame
1702
        GeoDataFrame containing geocoded MaStR data.
1703
    buildings_gdf : geopandas.GeoDataFrame
1704
        GeoDataFrame containing OSM buildings data.
1705
    rounding : int
1706
        Rounding to use for load factor.
1707
    Returns
1708
    -------
1709
    geopandas.GeoDataFrame
1710
        GeoDataFrame containing geocoded MaStR data with calculated load factor.
1711
    """
1712
    gdf = mastr_gdf.merge(
1713
        buildings_gdf[["max_cap", "building_area"]].loc[
1714
            ~buildings_gdf["max_cap"].isna()
1715
        ],
1716
        how="left",
1717
        left_on="building_id",
1718
        right_index=True,
1719
    )
1720
1721
    return gdf.assign(load_factor=(gdf.capacity / gdf.max_cap).round(rounding))
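
# Continuing the illustrative numbers above: an existing 5 kW MaStR system on
# a building with max_cap = 8.5 kW gets load_factor = 5 / 8.5 ≈ 0.5882.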


def get_probability_for_property(
    mastr_gdf: gpd.GeoDataFrame,
    cap_range: tuple[int | float, int | float],
    prop: str,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Calculate the probability of the different options of a property of the
    existing PV plants.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_range : tuple(int, int)
        Capacity range of PV plants to look at.
    prop : str
        Property to calculate probabilities for. String needs to be in columns
        of mastr_gdf.

    Returns
    -------
    tuple
        numpy.ndarray
            Unique values of the property.
        numpy.ndarray
            Probabilities per unique value.
    """
    cap_range_gdf = mastr_gdf.loc[
        (mastr_gdf.capacity > cap_range[0])
        & (mastr_gdf.capacity <= cap_range[1])
    ]

    if prop == "load_factor":
        cap_range_gdf = cap_range_gdf.loc[cap_range_gdf[prop] <= 1]

    count = Counter(
        cap_range_gdf[prop].loc[
            ~cap_range_gdf[prop].isna()
            & ~cap_range_gdf[prop].isnull()
            & ~(cap_range_gdf[prop] == "None")
        ]
    )

    values = np.array(list(count.keys()))
    probabilities = np.fromiter(count.values(), dtype=float)
    probabilities = probabilities / np.sum(probabilities)

    return values, probabilities
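
# Hedged usage sketch (the orientation values shown are invented for
# illustration, not real MaStR data):
#
#   values, p = get_probability_for_property(
#       mastr_gdf, (0, 30), "Hauptausrichtung"
#   )
#   # e.g. values = ["Süd", "Ost", "West"], p = [0.7, 0.2, 0.1], i.e. the
#   # empirical frequency of each orientation among small plants.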


@timer_func
def probabilities(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
    properties: list[str] | None = None,
) -> dict:
    """
    Calculate the probability of the different options of properties of the
    existing PV plants.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple should
        start with zero and the last one should end with infinity.
    properties : list(str)
        List of properties to calculate probabilities for. Strings need to be
        in columns of mastr_gdf.

    Returns
    -------
    dict
        Dictionary with values and probabilities per capacity range.
    """
    if cap_ranges is None:
        cap_ranges = [
            (0, 30),
            (30, 100),
            (100, float("inf")),
        ]
    if properties is None:
        properties = [
            "EinheitlicheAusrichtungUndNeigungswinkel",
            "Hauptausrichtung",
            "HauptausrichtungNeigungswinkel",
            "load_factor",
        ]

    prob_dict = {}

    for cap_range in cap_ranges:
        prob_dict[cap_range] = {
            "values": {},
            "probabilities": {},
        }

        for prop in properties:
            v, p = get_probability_for_property(
                mastr_gdf,
                cap_range,
                prop,
            )

            prob_dict[cap_range]["values"][prop] = v
            prob_dict[cap_range]["probabilities"][prop] = p

    return prob_dict
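
# Shape of the returned dictionary (illustrative, not real data):
#
#   {
#       (0, 30): {
#           "values": {"Hauptausrichtung": np.array([...]), ...},
#           "probabilities": {"Hauptausrichtung": np.array([...]), ...},
#       },
#       (30, 100): {...},
#       (100, inf): {...},
#   }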


def cap_share_per_cap_range(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
) -> dict[tuple[int | float, int | float], float]:
    """
    Calculate the share of PV capacity from the total PV capacity within
    capacity ranges.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple should
        start with zero and the last one should end with infinity.

    Returns
    -------
    dict
        Dictionary with share of PV capacity from the total PV capacity within
        capacity ranges.
    """
    if cap_ranges is None:
        cap_ranges = [
            (0, 30),
            (30, 100),
            (100, float("inf")),
        ]

    cap_share_dict = {}

    total_cap = mastr_gdf.capacity.sum()

    for cap_range in cap_ranges:
        cap_share = (
            mastr_gdf.loc[
                (mastr_gdf.capacity > cap_range[0])
                & (mastr_gdf.capacity <= cap_range[1])
            ].capacity.sum()
            / total_cap
        )

        cap_share_dict[cap_range] = cap_share

    return cap_share_dict


def mean_load_factor_per_cap_range(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
) -> dict[tuple[int | float, int | float], float]:
    """
    Calculate the mean roof load factor per capacity range from existing PV
    plants.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple should
        start with zero and the last one should end with infinity.

    Returns
    -------
    dict
        Dictionary with mean roof load factor per capacity range.
    """
    if cap_ranges is None:
        cap_ranges = [
            (0, 30),
            (30, 100),
            (100, float("inf")),
        ]

    load_factor_dict = {}

    for cap_range in cap_ranges:
        load_factor = mastr_gdf.loc[
            (mastr_gdf.load_factor <= 1)
            & (mastr_gdf.capacity > cap_range[0])
            & (mastr_gdf.capacity <= cap_range[1])
        ].load_factor.mean()

        load_factor_dict[cap_range] = load_factor

    return load_factor_dict


def building_area_range_per_cap_range(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
    min_building_size: int | float = 10.0,
    upper_quantile: float = 0.95,
    lower_quantile: float = 0.05,
) -> dict[tuple[int | float, int | float], tuple[int | float, int | float]]:
    """
    Estimate the typical building area range per capacity range from existing
    PV plants.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple should
        start with zero and the last one should end with infinity.
    min_building_size : int, float
        Minimum building size to consider for PV plants.
    upper_quantile : float
        Upper quantile to estimate the maximum building size per capacity
        range.
    lower_quantile : float
        Lower quantile to estimate the minimum building size per capacity
        range.

    Returns
    -------
    dict
        Dictionary with the estimated building area range per capacity range.
    """
    if cap_ranges is None:
        cap_ranges = [
            (0, 30),
            (30, 100),
            (100, float("inf")),
        ]

    building_area_range_dict = {}

    n_ranges = len(cap_ranges)

    for count, cap_range in enumerate(cap_ranges):
        cap_range_gdf = mastr_gdf.loc[
            (mastr_gdf.capacity > cap_range[0])
            & (mastr_gdf.capacity <= cap_range[1])
        ]

        if count == 0:
            building_area_range_dict[cap_range] = (
                min_building_size,
                cap_range_gdf.building_area.quantile(upper_quantile),
            )
        elif count == n_ranges - 1:
            building_area_range_dict[cap_range] = (
                cap_range_gdf.building_area.quantile(lower_quantile),
                float("inf"),
            )
        else:
            building_area_range_dict[cap_range] = (
                cap_range_gdf.building_area.quantile(lower_quantile),
                cap_range_gdf.building_area.quantile(upper_quantile),
            )

    values = list(building_area_range_dict.values())

    building_area_range_normed_dict = {}

    for count, (cap_range, (min_area, max_area)) in enumerate(
        building_area_range_dict.items()
    ):
        if count == 0:
            building_area_range_normed_dict[cap_range] = (
                min_area,
                np.mean((values[count + 1][0], max_area)),
            )
        elif count == n_ranges - 1:
            building_area_range_normed_dict[cap_range] = (
                np.mean((values[count - 1][1], min_area)),
                max_area,
            )
        else:
            building_area_range_normed_dict[cap_range] = (
                np.mean((values[count - 1][1], min_area)),
                np.mean((values[count + 1][0], max_area)),
            )

    return building_area_range_normed_dict
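
# The second loop above "norms" the raw quantile ranges so that neighbouring
# capacity classes meet at a single boundary instead of overlapping.
# Illustrative numbers (assumed): raw ranges (10, 120) and (80, 400) become
# (10, 100) and (100, 400), since np.mean((80, 120)) == 100.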


@timer_func
def desaggregate_pv_in_mv_grid(
    buildings_gdf: gpd.GeoDataFrame,
    pv_cap: float | int,
    **kwargs,
) -> gpd.GeoDataFrame:
    """
    Desaggregate PV capacity on buildings within a given grid district.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing buildings within the grid district.
    pv_cap : float, int
        PV capacity to desaggregate.

    Other Parameters
    ----------------
    prob_dict : dict
        Dictionary with values and probabilities per capacity range.
    cap_share_dict : dict
        Dictionary with share of PV capacity from the total PV capacity within
        capacity ranges.
    building_area_range_dict : dict
        Dictionary with estimated normal building area range per capacity
        range.
    load_factor_dict : dict
        Dictionary with mean roof load factor per capacity range.
    seed : int
        Seed to use for random operations with NumPy and pandas.
    pv_cap_per_sq_m : float, int
        Average expected, installable PV capacity per square meter.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with desaggregated PV
        plants.
    """
    bus_id = int(buildings_gdf.bus_id.iat[0])

    rng = default_rng(seed=kwargs["seed"])
    random_state = RandomState(seed=kwargs["seed"])

    results_df = pd.DataFrame(columns=buildings_gdf.columns)

    for cap_range, share in kwargs["cap_share_dict"].items():
        pv_cap_range = pv_cap * share

        b_area_min, b_area_max = kwargs["building_area_range_dict"][cap_range]

        cap_range_buildings_gdf = buildings_gdf.loc[
            ~buildings_gdf.index.isin(results_df.index)
            & (buildings_gdf.building_area > b_area_min)
            & (buildings_gdf.building_area <= b_area_max)
        ]

        mean_load_factor = kwargs["load_factor_dict"][cap_range]
        cap_range_buildings_gdf = cap_range_buildings_gdf.assign(
            mean_cap=cap_range_buildings_gdf.max_cap * mean_load_factor,
            load_factor=np.nan,
            capacity=np.nan,
        )

        total_mean_cap = cap_range_buildings_gdf.mean_cap.sum()

        if total_mean_cap == 0:
            logger.warning(
                f"There are no matching roofs for capacity range {cap_range} "
                f"kW in grid {bus_id}. Using all buildings as fallback."
            )

            cap_range_buildings_gdf = buildings_gdf.loc[
                ~buildings_gdf.index.isin(results_df.index)
            ]

            if len(cap_range_buildings_gdf) == 0:
                logger.warning(
                    "There are no roofs available for capacity range "
                    f"{cap_range} kW in grid {bus_id}. Allowing dual use."
                )
                cap_range_buildings_gdf = buildings_gdf.copy()

            cap_range_buildings_gdf = cap_range_buildings_gdf.assign(
                mean_cap=cap_range_buildings_gdf.max_cap * mean_load_factor,
                load_factor=np.nan,
                capacity=np.nan,
            )

            total_mean_cap = cap_range_buildings_gdf.mean_cap.sum()

        elif total_mean_cap < pv_cap_range:
            logger.warning(
                f"The roof area in grid {bus_id} and capacity range "
                f"{cap_range} kW is not sufficient for the target capacity. "
                "The roof utilization will be above average."
            )

        frac = max(
            pv_cap_range / total_mean_cap,
            1 / len(cap_range_buildings_gdf),
        )

        samples_gdf = cap_range_buildings_gdf.sample(
            frac=min(1, frac),
            random_state=random_state,
        )

        cap_range_dict = kwargs["prob_dict"][cap_range]

        values_dict = cap_range_dict["values"]
        p_dict = cap_range_dict["probabilities"]

        load_factors = rng.choice(
            a=values_dict["load_factor"],
            size=len(samples_gdf),
            p=p_dict["load_factor"],
        )

        samples_gdf = samples_gdf.assign(
            load_factor=load_factors,
            capacity=(
                samples_gdf.building_area
                * load_factors
                * kwargs["pv_cap_per_sq_m"]
            ).clip(lower=0.4),
        )

        missing_factor = pv_cap_range / samples_gdf.capacity.sum()

        samples_gdf = samples_gdf.assign(
            capacity=(samples_gdf.capacity * missing_factor),
            load_factor=(samples_gdf.load_factor * missing_factor),
        )

        assert np.isclose(
            samples_gdf.capacity.sum(),
            pv_cap_range,
            rtol=1e-03,
        ), f"{samples_gdf.capacity.sum()} != {pv_cap_range}"

        results_df = pd.concat(
            [
                results_df,
                samples_gdf,
            ],
        )

    total_missing_factor = pv_cap / results_df.capacity.sum()

    results_df = results_df.assign(
        capacity=(results_df.capacity * total_missing_factor),
    )

    assert np.isclose(
        results_df.capacity.sum(),
        pv_cap,
        rtol=1e-03,
    ), f"{results_df.capacity.sum()} != {pv_cap}"

    return gpd.GeoDataFrame(
        results_df,
        # Use the CRS of the input frame: ``samples_gdf`` would be undefined
        # if ``cap_share_dict`` were empty and the loop never ran.
        crs=buildings_gdf.crs,
        geometry="geom",
    )
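
# Note on the two-stage scaling above: within each capacity range the sampled
# capacities are rescaled so their sum hits that range's share of ``pv_cap``,
# and after the loop the concatenated result is rescaled once more so the
# grid district's total matches ``pv_cap`` within the asserted tolerance.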


@timer_func
def desaggregate_pv(
    buildings_gdf: gpd.GeoDataFrame,
    cap_df: pd.DataFrame,
    **kwargs,
) -> gpd.GeoDataFrame:
    """
    Desaggregate PV capacity on buildings within each grid district.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    cap_df : pandas.DataFrame
        DataFrame with total rooftop capacity per mv grid.

    Other Parameters
    ----------------
    prob_dict : dict
        Dictionary with values and probabilities per capacity range.
    cap_share_dict : dict
        Dictionary with share of PV capacity from the total PV capacity within
        capacity ranges.
    building_area_range_dict : dict
        Dictionary with estimated normal building area range per capacity
        range.
    load_factor_dict : dict
        Dictionary with mean roof load factor per capacity range.
    seed : int
        Seed to use for random operations with NumPy and pandas.
    pv_cap_per_sq_m : float, int
        Average expected, installable PV capacity per square meter.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with desaggregated PV
        plants.
    """
    allocated_buildings_gdf = buildings_gdf.loc[~buildings_gdf.end_of_life]

    building_bus_ids = set(buildings_gdf.bus_id)
    cap_bus_ids = set(cap_df.index)

    logger.debug(
        f"Bus IDs from buildings: {len(building_bus_ids)}\nBus IDs from capacity: "
        f"{len(cap_bus_ids)}"
    )

    if len(building_bus_ids) > len(cap_bus_ids):
        missing = building_bus_ids - cap_bus_ids
    else:
        missing = cap_bus_ids - building_bus_ids

    logger.debug(str(missing))

    # assert set(buildings_gdf.bus_id.unique()) == set(cap_df.index)

    for bus_id in buildings_gdf.bus_id.unique():
        buildings_grid_gdf = buildings_gdf.loc[buildings_gdf.bus_id == bus_id]

        pv_installed_gdf = buildings_grid_gdf.loc[
            ~buildings_grid_gdf.end_of_life
        ]

        pv_installed = pv_installed_gdf.capacity.sum()

        pot_buildings_gdf = buildings_grid_gdf.drop(
            index=pv_installed_gdf.index
        )

        if len(pot_buildings_gdf) == 0:
            logger.error(
                f"In grid {bus_id} there are no potential buildings to allocate "
                "PV capacity to. The grid is skipped. This message should only "
                "appear during test runs with few buildings."
            )

            continue

        # cap_df capacity is assumed to be in MW; convert to kW to match the
        # generator units used in this module.
        pv_target = cap_df.at[bus_id, "capacity"] * 1000

        logger.debug(f"pv_target: {pv_target}")

        pv_missing = pv_target - pv_installed

        if pv_missing <= 0:
            logger.info(
                f"In grid {bus_id} there is more PV installed ({pv_installed: g}) in "
                f"status quo than allocated within the scenario ({pv_target: g}). No "
                f"new generators are added."
            )

            continue

        if pot_buildings_gdf.max_cap.sum() < pv_missing:
            logger.error(
                f"In grid {bus_id} there is less PV potential ("
                f"{pot_buildings_gdf.max_cap.sum():g} kW) than allocated PV capacity "
                f"({pv_missing:g} kW). The average roof utilization will be very high."
            )

        gdf = desaggregate_pv_in_mv_grid(
            buildings_gdf=pot_buildings_gdf,
            pv_cap=pv_missing,
            **kwargs,
        )

        logger.debug(f"New cap in grid {bus_id}: {gdf.capacity.sum()}")
        logger.debug(f"Installed cap in grid {bus_id}: {pv_installed}")
        logger.debug(
            f"Total cap in grid {bus_id}: {gdf.capacity.sum() + pv_installed}"
        )

        if not np.isclose(
            gdf.capacity.sum() + pv_installed, pv_target, rtol=1e-3
        ):
            logger.warning(
                f"The desired capacity and actual capacity in grid {bus_id} differ.\n"
                f"Desired cap: {pv_target}\nActual cap: "
                f"{gdf.capacity.sum() + pv_installed}"
            )

        allocated_buildings_gdf = pd.concat(
            [
                allocated_buildings_gdf,
                gdf,
            ]
        )

    logger.debug("Desaggregated scenario.")
    logger.debug(f"Scenario capacity: {cap_df.capacity.sum(): g}")
    logger.debug(
        f"Generator capacity: {allocated_buildings_gdf.capacity.sum(): g}"
    )

    return gpd.GeoDataFrame(
        allocated_buildings_gdf,
        # Use the CRS of the input frame: ``gdf`` is only defined if at least
        # one grid district was processed in the loop above.
        crs=buildings_gdf.crs,
        geometry="geom",
    )


@timer_func
def add_buildings_meta_data(
    buildings_gdf: gpd.GeoDataFrame,
    prob_dict: dict,
    seed: int,
) -> gpd.GeoDataFrame:
    """
    Randomly add additional metadata to desaggregated PV plants.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with desaggregated PV
        plants.
    prob_dict : dict
        Dictionary with values and probabilities per capacity range.
    seed : int
        Seed to use for random operations with NumPy and pandas.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with desaggregated PV
        plants.
    """
    rng = default_rng(seed=seed)
    buildings_gdf = buildings_gdf.reset_index().rename(
        columns={
            "index": "building_id",
        }
    )

    for (min_cap, max_cap), cap_range_prob_dict in prob_dict.items():
        cap_range_gdf = buildings_gdf.loc[
            (buildings_gdf.capacity >= min_cap)
            & (buildings_gdf.capacity < max_cap)
        ]

        for key, values in cap_range_prob_dict["values"].items():
            if key == "load_factor":
                continue

            gdf = cap_range_gdf.loc[
                cap_range_gdf[key].isna()
                | cap_range_gdf[key].isnull()
                | (cap_range_gdf[key] == "None")
            ]

            key_vals = rng.choice(
                a=values,
                size=len(gdf),
                p=cap_range_prob_dict["probabilities"][key],
            )

            buildings_gdf.loc[gdf.index, key] = key_vals

    return buildings_gdf


def add_voltage_level(
    buildings_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add voltage level derived from generator capacity to the power plants.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with desaggregated PV
        plants.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with voltage level per
        generator.
    """

    def voltage_levels(p: float) -> int:
        if p < 100:
            return 7
        elif p < 200:
            return 6
        elif p < 5500:
            return 5
        elif p < 20000:
            return 4
        elif p < 120000:
            return 3
        return 1

    return buildings_gdf.assign(
        voltage_level=buildings_gdf.capacity.apply(voltage_levels)
    )
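
# Capacity is handled in kW throughout this module, so e.g. a 9 kW rooftop
# plant maps to voltage level 7 (low voltage) and a 250 kW plant to level 5.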


def add_start_up_date(
    buildings_gdf: gpd.GeoDataFrame,
    start: pd.Timestamp,
    end: pd.Timestamp,
    seed: int,
):
    """
    Add a random start-up date, drawn uniformly between start and end, to new
    PV generators.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with desaggregated PV
        plants.
    start : pandas.Timestamp
        Minimum Timestamp to use.
    end : pandas.Timestamp
        Maximum Timestamp to use.
    seed : int
        Seed to use for random operations with NumPy and pandas.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with start-up date added.
    """
    rng = default_rng(seed=seed)

    date_range = pd.date_range(start=start, end=end, freq="1D")

    return buildings_gdf.assign(
        start_up_date=rng.choice(date_range, size=len(buildings_gdf))
    )
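
# Sketch (assumed toy timestamps): dates are drawn uniformly from the daily
# range between ``start`` and ``end``, reproducibly for a fixed seed:
#
#   with_dates = add_start_up_date(
#       buildings_gdf,
#       start=pd.Timestamp("2022-01-01"),
#       end=pd.Timestamp("2035-01-01"),
#       seed=5,
#   )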


@timer_func
def allocate_scenarios(
    mastr_gdf: gpd.GeoDataFrame,
    buildings_gdf: gpd.GeoDataFrame,
    last_scenario_gdf: gpd.GeoDataFrame,
    scenario: str,
):
    """
    Desaggregate and allocate scenario PV rooftop ramp-ups onto buildings.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    last_scenario_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings matched with PV generators from
        the temporally preceding scenario.
    scenario : str
        Scenario to desaggregate and allocate.

    Returns
    -------
    tuple
        geopandas.GeoDataFrame
            GeoDataFrame containing OSM buildings matched with PV generators.
        pandas.DataFrame
            DataFrame containing PV rooftop capacity per grid ID.
    """
    grid_districts_gdf = grid_districts(EPSG)

    federal_state_gdf = federal_state_data(grid_districts_gdf.crs)

    grid_federal_state_gdf = overlay_grid_districts_with_counties(
        grid_districts_gdf,
        federal_state_gdf,
    )

    buildings_overlay_gdf = add_overlay_id_to_buildings(
        buildings_gdf,
        grid_federal_state_gdf,
    )

    valid_buildings_gdf = drop_buildings_outside_grids(buildings_overlay_gdf)

    cap_per_bus_id_df = cap_per_bus_id(scenario)

    logger.debug(
        f"cap_per_bus_id_df total capacity: {cap_per_bus_id_df.capacity.sum()}"
    )

    last_scenario_gdf = determine_end_of_life_gens(
        last_scenario_gdf,
        SCENARIO_TIMESTAMP[scenario],
        PV_ROOFTOP_LIFETIME,
    )

    buildings_gdf = calculate_max_pv_cap_per_building(
        valid_buildings_gdf,
        last_scenario_gdf,
        PV_CAP_PER_SQ_M,
        ROOF_FACTOR,
    )

    mastr_gdf = calculate_building_load_factor(
        mastr_gdf,
        buildings_gdf,
    )

    probabilities_dict = probabilities(
        mastr_gdf,
        cap_ranges=CAP_RANGES,
    )

    cap_share_dict = cap_share_per_cap_range(
        mastr_gdf,
        cap_ranges=CAP_RANGES,
    )

    load_factor_dict = mean_load_factor_per_cap_range(
        mastr_gdf,
        cap_ranges=CAP_RANGES,
    )

    building_area_range_dict = building_area_range_per_cap_range(
        mastr_gdf,
        cap_ranges=CAP_RANGES,
        min_building_size=MIN_BUILDING_SIZE,
        upper_quantile=UPPER_QUNATILE,
        lower_quantile=LOWER_QUANTILE,
    )

    allocated_buildings_gdf = desaggregate_pv(
        buildings_gdf=buildings_gdf,
        cap_df=cap_per_bus_id_df,
        prob_dict=probabilities_dict,
        cap_share_dict=cap_share_dict,
        building_area_range_dict=building_area_range_dict,
        load_factor_dict=load_factor_dict,
        seed=SEED,
        pv_cap_per_sq_m=PV_CAP_PER_SQ_M,
    )

    allocated_buildings_gdf = allocated_buildings_gdf.assign(scenario=scenario)

    meta_buildings_gdf = frame_to_numeric(
        add_buildings_meta_data(
            allocated_buildings_gdf,
            probabilities_dict,
            SEED,
        )
    )

    return (
        add_start_up_date(
            meta_buildings_gdf,
            start=last_scenario_gdf.start_up_date.max(),
            end=SCENARIO_TIMESTAMP[scenario],
            seed=SEED,
        ),
        cap_per_bus_id_df,
    )
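
# Hedged invocation sketch (SCENARIOS, SEED etc. are module-level constants
# defined earlier; ``status_quo_gdf`` is a hypothetical name for the frame of
# already allocated status quo generators):
#
#   scenario_gdf, cap_df = allocate_scenarios(
#       mastr_gdf, buildings_gdf, status_quo_gdf, "eGon2035"
#   )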


class EgonPowerPlantPvRoofBuildingScenario(Base):
    __tablename__ = "egon_power_plants_pv_roof_building"
    __table_args__ = {"schema": "supply"}

    index = Column(Integer, primary_key=True, index=True)
    scenario = Column(String)
    building_id = Column(Integer)
    gens_id = Column(String, nullable=True)
    capacity = Column(Float)
    einheitliche_ausrichtung_und_neigungswinkel = Column(Float)
    hauptausrichtung = Column(String)
    hauptausrichtung_neigungswinkel = Column(String)
    voltage_level = Column(Integer)


def create_scenario_table(buildings_gdf):
    """Create mapping table pv_unit <-> building for scenario."""
    EgonPowerPlantPvRoofBuildingScenario.__table__.drop(
        bind=engine, checkfirst=True
    )
    EgonPowerPlantPvRoofBuildingScenario.__table__.create(
        bind=engine, checkfirst=True
    )

    buildings_gdf.rename(columns=COLS_TO_RENAME)[
        COLS_TO_EXPORT
    ].reset_index().to_sql(
        name=EgonPowerPlantPvRoofBuildingScenario.__table__.name,
        schema=EgonPowerPlantPvRoofBuildingScenario.__table__.schema,
        con=db.engine(),
        if_exists="append",
        index=False,
    )


def geocode_mastr_data():
    """
    Read PV rooftop data from MaStR CSV.

    TODO: the source will be replaced as soon as the MaStR data is available
    in the DB.
    """
    mastr_df = mastr_data(
        MASTR_INDEX_COL,
        MASTR_RELEVANT_COLS,
        MASTR_DTYPES,
        MASTR_PARSE_DATES,
    )

    clean_mastr_df = clean_mastr_data(
        mastr_df,
        max_realistic_pv_cap=MAX_REALISTIC_PV_CAP,
        min_realistic_pv_cap=MIN_REALISTIC_PV_CAP,
        seed=SEED,
        rounding=ROUNDING,
    )

    geocoding_df = geocoding_data(clean_mastr_df)

    ratelimiter = geocoder(USER_AGENT, MIN_DELAY_SECONDS)

    geocode_gdf = geocode_data(geocoding_df, ratelimiter, EPSG)

    create_geocoded_table(geocode_gdf)


def pv_rooftop_to_buildings():
    """Main script, executed as task."""
    mastr_gdf = load_mastr_data()

    buildings_gdf = load_building_data()

    desagg_mastr_gdf, desagg_buildings_gdf = allocate_to_buildings(
        mastr_gdf, buildings_gdf
    )

    all_buildings_gdf = (
        desagg_mastr_gdf.assign(scenario="status_quo")
        .reset_index()
        .rename(columns={"geometry": "geom", "EinheitMastrNummer": "gens_id"})
    )

    scenario_buildings_gdf = all_buildings_gdf.copy()

    cap_per_bus_id_df = pd.DataFrame()

    for scenario in SCENARIOS:
        logger.debug(f"Desaggregating scenario {scenario}.")
        (
            scenario_buildings_gdf,
            cap_per_bus_id_scenario_df,
        ) = allocate_scenarios(  # noqa: F841
            desagg_mastr_gdf,
            desagg_buildings_gdf,
            scenario_buildings_gdf,
            scenario,
        )

        all_buildings_gdf = gpd.GeoDataFrame(
            pd.concat(
                [all_buildings_gdf, scenario_buildings_gdf], ignore_index=True
            ),
            crs=scenario_buildings_gdf.crs,
            geometry="geom",
        )

        cap_per_bus_id_df = pd.concat(
            [cap_per_bus_id_df, cap_per_bus_id_scenario_df]
        )

    # export scenario
    create_scenario_table(add_voltage_level(all_buildings_gdf))
2668