"""
Distribute MaStR PV rooftop capacities to OSM and synthetic buildings. Generate
new PV rooftop generators for the scenarios eGon2035 and eGon100RE.

Data cleaning: Drop duplicates and entries with missing critical data. Determine
the most plausible capacity from the multiple capacity values given in the MaStR
data. Drop generators without any plausible capacity value (plausible range:
0.1 kW < P <= 23.5 MW). Randomly add a start-up date, weighted by the existing
date distribution, where it is missing. Extract zip code and municipality from
the 'Standort' given in the MaStR data. Geocode unique zip code and municipality
combinations with Nominatim (one second delay between requests). Drop generators
for which geocoding failed or which are located outside the municipalities of
Germany. Add some visual sanity checks for the cleaned data.

Allocation of MaStR data: Allocate each generator to an existing building from
OSM. Determine the quantile each generator and building is in, depending on the
capacity of the generator and the area of the building polygon. Randomly
distribute generators within each municipality, preferably within the same
building area quantile as the generators are in capacity-wise. If not enough
buildings exist within a municipality and quantile, additional buildings are
chosen randomly from other quantiles.

Disaggregation of PV rooftop scenarios: The scenario data per federal state is
linearly distributed to the MV grid districts according to the PV rooftop
potential per MV grid district. The rooftop potential is estimated from the
building area given by the OSM buildings. Grid districts which are located in
several federal states are allocated PV capacity according to their respective
roof potential in the individual federal states. The disaggregation of PV plants
within a grid district respects existing plants from MaStR which have not yet
reached their end of life. New PV plants are generated randomly, weighted by a
breakdown of the MaStR data. Plant metadata (e.g. plant orientation) is also
added randomly, weighted by the MaStR data.
"""
from __future__ import annotations

from collections import Counter
from functools import wraps
from pathlib import Path
from time import perf_counter
from typing import Any

from geoalchemy2 import Geometry
from geopy.extra.rate_limiter import RateLimiter
from geopy.geocoders import Nominatim
from loguru import logger
from numpy.random import RandomState, default_rng
from pyproj.crs.crs import CRS
from sqlalchemy import BigInteger, Column, Float, Integer, String
from sqlalchemy.dialects.postgresql import HSTORE
from sqlalchemy.ext.declarative import declarative_base
import geopandas as gpd
import numpy as np
import pandas as pd

from egon.data import config, db
from egon.data.datasets.electricity_demand_timeseries.hh_buildings import (
    OsmBuildingsSynthetic,
)
from egon.data.datasets.scenario_capacities import EgonScenarioCapacities
from egon.data.datasets.zensus_vg250 import Vg250Gem

engine = db.engine()
Base = declarative_base()
SEED = int(config.settings()["egon-data"]["--random-seed"])

# TODO: move to yml
# mastr data
MASTR_RELEVANT_COLS = [
    "EinheitMastrNummer",
    "Bruttoleistung",
    "StatisikFlag",
    "Bruttoleistung_extended",
    "Nettonennleistung",
    "InstallierteLeistung",
    "zugeordneteWirkleistungWechselrichter",
    "EinheitBetriebsstatus",
    "Standort",
    "Bundesland",
    "Land",
    "Landkreis",
    "Gemeinde",
    "Postleitzahl",
    "Ort",
    "GeplantesInbetriebnahmedatum",
    "Inbetriebnahmedatum",
    "GemeinsamerWechselrichterMitSpeicher",
    "Lage",
    "Leistungsbegrenzung",
    "EinheitlicheAusrichtungUndNeigungswinkel",
    "Hauptausrichtung",
    "HauptausrichtungNeigungswinkel",
    "Nebenausrichtung",
]

MASTR_DTYPES = {
    "EinheitMastrNummer": str,
    "Bruttoleistung": float,
    "StatisikFlag": str,
    "Bruttoleistung_extended": float,
    "Nettonennleistung": float,
    "InstallierteLeistung": float,
    "zugeordneteWirkleistungWechselrichter": float,
    "EinheitBetriebsstatus": str,
    "Standort": str,
    "Bundesland": str,
    "Land": str,
    "Landkreis": str,
    "Gemeinde": str,
    # "Postleitzahl": int,  # fails because of nan values
    "Ort": str,
    "GemeinsamerWechselrichterMitSpeicher": str,
    "Lage": str,
    "Leistungsbegrenzung": str,
    # this would parse nan values as False, which is not always correct
    # "EinheitlicheAusrichtungUndNeigungswinkel": bool,
    "Hauptausrichtung": str,
    "HauptausrichtungNeigungswinkel": str,
    "Nebenausrichtung": str,
    "NebenausrichtungNeigungswinkel": str,
}

MASTR_PARSE_DATES = [
    "GeplantesInbetriebnahmedatum",
    "Inbetriebnahmedatum",
]

MASTR_INDEX_COL = "EinheitMastrNummer"

EPSG = 4326
SRID = 3035

# data cleaning
MAX_REALISTIC_PV_CAP = 23500  # kW
MIN_REALISTIC_PV_CAP = 0.1  # kW
ROUNDING = 1

# geopy
MIN_DELAY_SECONDS = 1
USER_AGENT = "rli_kh_geocoder"

# show additional logging information
VERBOSE = False

EXPORT_DIR = Path(__name__).resolve().parent / "data"
EXPORT_FILE = "mastr_geocoded.gpkg"
EXPORT_PATH = EXPORT_DIR / EXPORT_FILE
DRIVER = "GPKG"

# Number of quantiles
Q = 5

# Scenario Data
CARRIER = "solar_rooftop"
SCENARIOS = ["eGon2035", "eGon100RE"]
SCENARIO_TIMESTAMP = {
    "eGon2035": pd.Timestamp("2035-01-01", tz="UTC"),
    "eGon100RE": pd.Timestamp("2050-01-01", tz="UTC"),
}
PV_ROOFTOP_LIFETIME = pd.Timedelta(20 * 365, unit="D")

# Example module: Trina Vertex S TSM-400DE09M.08, 400 Wp
# https://www.photovoltaik4all.de/media/pdf/92/64/68/Trina_Datasheet_VertexS_DE09-08_2021_A.pdf
MODUL_CAP = 0.4  # kWp
MODUL_SIZE = 1.096 * 1.754  # m²
PV_CAP_PER_SQ_M = MODUL_CAP / MODUL_SIZE
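# arithmetic check of the constant above:
# 0.4 kWp / (1.096 m * 1.754 m) ≈ 0.208 kWp per m² of module area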

# Estimation of usable roof area
# factor for the conversion of building area to roof area
# estimated mean roof pitch: 35°
# estimated usable roof share: 80%
# estimate that only the south side of the building is used for PV
# see https://mediatum.ub.tum.de/doc/%20969497/969497.pdf
# AREA_FACTOR = 1.221
# USABLE_ROOF_SHARE = 0.8
# SOUTH_SHARE = 0.5
# ROOF_FACTOR = AREA_FACTOR * USABLE_ROOF_SHARE * SOUTH_SHARE
ROOF_FACTOR = 0.5

CAP_RANGES = [
    (0, 30),
    (30, 100),
    (100, float("inf")),
]

MIN_BUILDING_SIZE = 10.0
UPPER_QUNATILE = 0.95
LOWER_QUANTILE = 0.05

COLS_TO_RENAME = {
    "EinheitlicheAusrichtungUndNeigungswinkel": (
        "einheitliche_ausrichtung_und_neigungswinkel"
    ),
    "Hauptausrichtung": "hauptausrichtung",
    "HauptausrichtungNeigungswinkel": "hauptausrichtung_neigungswinkel",
}

COLS_TO_EXPORT = [
    "scenario",
    "building_id",
    "gens_id",
    "capacity",
    "einheitliche_ausrichtung_und_neigungswinkel",
    "hauptausrichtung",
    "hauptausrichtung_neigungswinkel",
    "voltage_level",
]

# TODO
INCLUDE_SYNTHETIC_BUILDINGS = True
ONLY_BUILDINGS_WITH_DEMAND = True
TEST_RUN = False


def timer_func(func):
    """Decorator logging the execution time of the decorated function."""

    @wraps(func)
    def timeit_wrapper(*args, **kwargs):
        start_time = perf_counter()
        result = func(*args, **kwargs)
        end_time = perf_counter()
        total_time = end_time - start_time
        logger.debug(
            f"Function {func.__name__} took {total_time:.4f} seconds."
        )
        return result

    return timeit_wrapper


@timer_func
def mastr_data(
    index_col: str | int | list[str] | list[int],
    usecols: list[str],
    dtype: dict[str, Any] | None,
    parse_dates: list[str] | None,
) -> pd.DataFrame:
    """
    Read MaStR data from csv.

    Parameters
    -----------
    index_col : str, int or list of str or int
        Column(s) to use as the row labels of the DataFrame.
    usecols : list of str
        Return a subset of the columns.
    dtype : dict of column (str) -> type (any), optional
        Data type for data or columns.
    parse_dates : list of names (str), optional
        Try to parse given columns to datetime.

    Returns
    -------
    pandas.DataFrame
        DataFrame containing MaStR data.
    """
    mastr_path = Path(
        config.datasets()["power_plants"]["sources"]["mastr_pv"]
    ).resolve()

    mastr_df = pd.read_csv(
        mastr_path,
        index_col=index_col,
        usecols=usecols,
        dtype=dtype,
        parse_dates=parse_dates,
    )

    mastr_df = mastr_df.loc[
        (mastr_df.StatisikFlag == "B")
        & (mastr_df.EinheitBetriebsstatus == "InBetrieb")
        & (mastr_df.Land == "Deutschland")
        & (mastr_df.Lage == "BaulicheAnlagen")
    ]

    if (
        config.settings()["egon-data"]["--dataset-boundary"]
        == "Schleswig-Holstein"
    ):
        init_len = len(mastr_df)

        mastr_df = mastr_df.loc[mastr_df.Bundesland == "SchleswigHolstein"]

        logger.info(
            f"Using only MaStR data within Schleswig-Holstein. "
            f"{init_len - len(mastr_df)} of {init_len} generators are dropped."
        )

    logger.debug("MaStR data loaded.")

    return mastr_df


@timer_func
def clean_mastr_data(
    mastr_df: pd.DataFrame,
    max_realistic_pv_cap: int | float,
    min_realistic_pv_cap: int | float,
    rounding: int,
    seed: int,
) -> pd.DataFrame:
    """
    Clean the MaStR data of implausible entries.

    * Drop MaStR ID duplicates.
    * Drop generators with implausible capacities.
    * Drop generators without any kind of start-up date.
    * Clean up the Standort column and the capacity.

    Parameters
    -----------
    mastr_df : pandas.DataFrame
        DataFrame containing MaStR data.
    max_realistic_pv_cap : int or float
        Maximum capacity which is considered to be realistic.
    min_realistic_pv_cap : int or float
        Minimum capacity which is considered to be realistic.
    rounding : int
        Rounding to use when cleaning up the capacity. E.g. when
        rounding is 1, a capacity of 9.93 will be rounded to 9.9.
    seed : int
        Seed to use for random operations with NumPy and pandas.

    Returns
    -------
    pandas.DataFrame
        DataFrame containing cleaned MaStR data.
    """
    init_len = len(mastr_df)

    # drop duplicates
    mastr_df = mastr_df.loc[~mastr_df.index.duplicated()]

    # drop invalid entries in standort
    index_to_drop = mastr_df.loc[
        (mastr_df.Standort.isna()) | (mastr_df.Standort.isnull())
    ].index

    mastr_df = mastr_df.loc[~mastr_df.index.isin(index_to_drop)]

    df = mastr_df[
        [
            "Bruttoleistung",
            "Bruttoleistung_extended",
            "Nettonennleistung",
            "zugeordneteWirkleistungWechselrichter",
            "InstallierteLeistung",
        ]
    ].round(rounding)

    # use only the smallest plausible capacity rating if multiple are given
    mastr_df = mastr_df.assign(
        capacity=[
            most_plausible(p_tub, min_realistic_pv_cap)
            for p_tub in df.itertuples(index=False)
        ]
    )

    # drop generators without any capacity info,
    # generators with a capacity of zero,
    # and generators with a capacity > 23.5 MW, because
    # Germany's largest rooftop PV plant is 23 MW
    # https://www.iwr.de/news/groesste-pv-dachanlage-europas-wird-in-sachsen-anhalt-gebaut-news37379
    mastr_df = mastr_df.loc[
        (~mastr_df.capacity.isna())
        & (mastr_df.capacity <= max_realistic_pv_cap)
        & (mastr_df.capacity > min_realistic_pv_cap)
    ]

    # get zip and municipality
    mastr_df[["zip_and_municipality", "drop_this"]] = pd.DataFrame(
        mastr_df.Standort.astype(str)
        .apply(
            zip_and_municipality_from_standort,
            args=(VERBOSE,),
        )
        .tolist(),
        index=mastr_df.index,
    )

    # drop invalid entries
    mastr_df = mastr_df.loc[mastr_df.drop_this].drop(columns="drop_this")

    # add ", Deutschland" just in case
    mastr_df = mastr_df.assign(
        zip_and_municipality=(mastr_df.zip_and_municipality + ", Deutschland")
    )

    # get consistent start-up date
    mastr_df = mastr_df.assign(
        start_up_date=mastr_df.Inbetriebnahmedatum,
    )

    mastr_df.loc[mastr_df.start_up_date.isna()] = mastr_df.loc[
        mastr_df.start_up_date.isna()
    ].assign(
        start_up_date=mastr_df.GeplantesInbetriebnahmedatum.loc[
            mastr_df.start_up_date.isna()
        ]
    )

    # randomly and weighted fill missing start-up dates
    pool = mastr_df.loc[
        ~mastr_df.start_up_date.isna()
    ].start_up_date.to_numpy()

    size = len(mastr_df) - len(pool)

    if size > 0:
        np.random.seed(seed)

        choice = np.random.choice(
            pool,
            size=size,
            replace=False,
        )

        mastr_df.loc[mastr_df.start_up_date.isna()] = mastr_df.loc[
            mastr_df.start_up_date.isna()
        ].assign(start_up_date=choice)

        logger.info(
            f"Randomly added start-up dates to {size} generators, weighted "
            "by the existing dates."
        )

    mastr_df = mastr_df.assign(
        start_up_date=pd.to_datetime(mastr_df.start_up_date, utc=True)
    )

    end_len = len(mastr_df)
    logger.debug(
        f"Dropped {init_len - end_len} "
        f"({((init_len - end_len) / init_len) * 100:g}%)"
        f" of {init_len} rows from MaStR DataFrame."
    )

    return mastr_df
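
# Note on the start-up date fill in clean_mastr_data() above: drawing without
# replacement from the pool of known dates approximately reproduces their
# empirical distribution, which is what "randomly and weighted" refers to.
# This requires the pool to be at least as large as the number of missing
# dates, since np.random.choice(..., replace=False) raises otherwise.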


def zip_and_municipality_from_standort(
    standort: str,
    verbose: bool = False,
) -> tuple[str, bool]:
    """
    Extract zip code and municipality from the Standort string.

    Parameters
    -----------
    standort : str
        Standort as given in the MaStR data.
    verbose : bool
        Logs additional info if True.

    Returns
    -------
    tuple of str and bool
        Standort reduced to zip code and municipality, and a flag
        indicating whether a zip code was found.
    """
    if verbose:
        logger.debug(f"Uncleaned String: {standort}")

    standort_list = standort.split()

    found = False
    count = 0

    for count, elem in enumerate(standort_list):
        if len(elem) != 5:
            continue
        if not elem.isnumeric():
            continue

        found = True

        break

    if found:
        cleaned_str = " ".join(standort_list[count:])

        if verbose:
            logger.debug(f"Cleaned String:   {cleaned_str}")

        return cleaned_str, found

    logger.warning(
        "Couldn't identify zip code. This entry will be dropped."
        f" Original standort: {standort}."
    )

    return standort, found
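
# Illustration (hypothetical address): for
# standort = "Musterstraße 5 12345 Musterstadt", the first five-digit numeric
# token is taken as the zip code, so the function returns
# ("12345 Musterstadt", True). Without a five-digit numeric token it returns
# the unchanged string and False.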


def most_plausible(
    p_tub: tuple,
    min_realistic_pv_cap: int | float,
) -> float:
    """
    Try to determine the most plausible capacity of a given
    generator from the MaStR data.

    Parameters
    -----------
    p_tub : tuple
        Tuple containing the different capacities given in
        the MaStR data.
    min_realistic_pv_cap : int or float
        Minimum capacity which is considered to be realistic.

    Returns
    -------
    float
        Capacity of the generator estimated as the most realistic.
    """
    count = Counter(p_tub).most_common(3)

    if len(count) == 1:
        return count[0][0]

    val1 = count[0][0]
    val2 = count[1][0]

    if len(count) == 2:
        min_val = min(val1, val2)
        max_val = max(val1, val2)
    else:
        val3 = count[2][0]

        min_val = min(val1, val2, val3)
        max_val = max(val1, val2, val3)

    if min_val < min_realistic_pv_cap:
        return max_val

    return min_val
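
# Worked example (hypothetical values): for p_tub = (9.9, 9.9, 10.0, 0.0, 9.9),
# Counter(p_tub).most_common(3) yields [(9.9, 3), (10.0, 1), (0.0, 1)]. The
# minimum candidate (0.0) is below min_realistic_pv_cap, so the maximum (10.0)
# is returned. For p_tub = (9.9, 10.0) the minimum (9.9) is plausible and is
# returned.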


def geocoder(
    user_agent: str,
    min_delay_seconds: int,
) -> RateLimiter:
    """
    Set up the Nominatim geocoder.

    Parameters
    -----------
    user_agent : str
        The app name.
    min_delay_seconds : int
        Delay in seconds to use between requests to Nominatim.
        A minimum of 1 is advised.

    Returns
    -------
    geopy.extra.rate_limiter.RateLimiter
        Nominatim RateLimiter geocoding class to use for geocoding.
    """
    locator = Nominatim(user_agent=user_agent)
    return RateLimiter(
        locator.geocode,
        min_delay_seconds=min_delay_seconds,
    )


def geocoding_data(
    clean_mastr_df: pd.DataFrame,
) -> pd.DataFrame:
    """
    Set up the DataFrame to geocode.

    Parameters
    -----------
    clean_mastr_df : pandas.DataFrame
        DataFrame containing cleaned MaStR data.

    Returns
    -------
    pandas.DataFrame
        DataFrame containing all unique combinations of
        zip codes and municipalities for geocoding.
    """
    return pd.DataFrame(
        data=clean_mastr_df.zip_and_municipality.unique(),
        columns=["zip_and_municipality"],
    )


@timer_func
def geocode_data(
    geocoding_df: pd.DataFrame,
    ratelimiter: RateLimiter,
    epsg: int,
) -> gpd.GeoDataFrame:
    """
    Geocode zip code and municipality.

    Extract latitude, longitude and altitude, transform latitude and
    longitude to a shapely Point and return a geopandas GeoDataFrame.

    Parameters
    -----------
    geocoding_df : pandas.DataFrame
        DataFrame containing all unique combinations of
        zip codes and municipalities for geocoding.
    ratelimiter : geopy.extra.rate_limiter.RateLimiter
        Nominatim RateLimiter geocoding class to use for geocoding.
    epsg : int
        EPSG ID to use as CRS.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing all unique combinations of
        zip codes and municipalities with matching geolocation.
    """
    logger.info(f"Geocoding {len(geocoding_df)} locations.")

    geocode_df = geocoding_df.assign(
        location=geocoding_df.zip_and_municipality.apply(ratelimiter)
    )

    geocode_df = geocode_df.assign(
        point=geocode_df.location.apply(
            lambda loc: tuple(loc.point) if loc else None
        )
    )

    geocode_df[["latitude", "longitude", "altitude"]] = pd.DataFrame(
        geocode_df.point.tolist(), index=geocode_df.index
    )

    return gpd.GeoDataFrame(
        geocode_df,
        geometry=gpd.points_from_xy(geocode_df.longitude, geocode_df.latitude),
        crs=f"EPSG:{epsg}",
    )
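
# A minimal usage sketch (assumes a cleaned MaStR DataFrame clean_mastr_df;
# Nominatim requests are rate-limited to one per MIN_DELAY_SECONDS):
#
#     ratelimiter = geocoder(USER_AGENT, MIN_DELAY_SECONDS)
#     geocode_gdf = geocode_data(
#         geocoding_data(clean_mastr_df), ratelimiter, EPSG
#     )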


def merge_geocode_with_mastr(
    clean_mastr_df: pd.DataFrame, geocode_gdf: gpd.GeoDataFrame
) -> gpd.GeoDataFrame:
    """
    Merge the geocoded geometries onto the original MaStR data.

    Parameters
    -----------
    clean_mastr_df : pandas.DataFrame
        DataFrame containing cleaned MaStR data.
    geocode_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing all unique combinations of
        zip codes and municipalities with matching geolocation.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing cleaned MaStR data with
        matching geolocation from geocoding.
    """
    return gpd.GeoDataFrame(
        clean_mastr_df.merge(
            geocode_gdf[["zip_and_municipality", "geometry"]],
            how="left",
            on="zip_and_municipality",
        ),
        crs=geocode_gdf.crs,
    ).set_index(clean_mastr_df.index)


def drop_invalid_entries_from_gdf(
    gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop entries with invalid geometries from a geopandas GeoDataFrame.

    TODO: how to omit the logging from geos here???

    Parameters
    -----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame to be checked for validity.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with rows with invalid geometries dropped.
    """
    valid_gdf = gdf.loc[gdf.is_valid]

    logger.debug(
        f"{len(gdf) - len(valid_gdf)} "
        f"({(len(gdf) - len(valid_gdf)) / len(gdf) * 100:g}%) "
        f"of {len(gdf)} values were invalid and are dropped."
    )

    return valid_gdf


@timer_func
def municipality_data() -> gpd.GeoDataFrame:
    """
    Get municipality data from the eGo^n database.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with municipality data.
    """
    with db.session_scope() as session:
        query = session.query(Vg250Gem.ags, Vg250Gem.geometry.label("geom"))

    return gpd.read_postgis(
        query.statement, query.session.bind, index_col="ags"
    )


@timer_func
def add_ags_to_gens(
    valid_mastr_gdf: gpd.GeoDataFrame,
    municipalities_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add the AGS ID to the generators.

    Parameters
    -----------
    valid_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data.
    municipalities_gdf : geopandas.GeoDataFrame
        GeoDataFrame with municipality data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data
        with the AGS ID added.
    """
    return valid_mastr_gdf.sjoin(
        municipalities_gdf,
        how="left",
        predicate="intersects",
    ).rename(columns={"index_right": "ags"})


def drop_gens_outside_muns(
    valid_mastr_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop all generators outside of the municipalities.

    Parameters
    -----------
    valid_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data
        with generators without an AGS ID dropped.
    """
    gdf = valid_mastr_gdf.loc[~valid_mastr_gdf.ags.isna()]

    logger.debug(
        f"{len(valid_mastr_gdf) - len(gdf)} "
        f"({(len(valid_mastr_gdf) - len(gdf)) / len(valid_mastr_gdf) * 100:g}%) "
        f"of {len(valid_mastr_gdf)} values are outside of the municipalities"
        " and are therefore dropped."
    )

    return gdf


class EgonMastrPvRoofGeocoded(Base):
    __tablename__ = "egon_mastr_pv_roof_geocoded"
    __table_args__ = {"schema": "supply"}

    zip_and_municipality = Column(String, primary_key=True, index=True)
    location = Column(String)
    point = Column(String)
    latitude = Column(Float)
    longitude = Column(Float)
    altitude = Column(Float)
    geometry = Column(Geometry(srid=EPSG))


def create_geocoded_table(geocode_gdf):
    """
    Create the geocoded MaStR PV rooftop table and write the data to it.

    Parameters
    -----------
    geocode_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoding information on PV rooftop locations.
    """
    EgonMastrPvRoofGeocoded.__table__.drop(bind=engine, checkfirst=True)
    EgonMastrPvRoofGeocoded.__table__.create(bind=engine, checkfirst=True)

    geocode_gdf.to_postgis(
        name=EgonMastrPvRoofGeocoded.__table__.name,
        schema=EgonMastrPvRoofGeocoded.__table__.schema,
        con=db.engine(),
        if_exists="append",
        index=False,
        # dtype={}
    )


def geocoded_data_from_db(
    epsg: str | int,
) -> gpd.GeoDataFrame:
    """
    Read the geocoded zip code and municipality data from the eGo^n database.

    Parameters
    -----------
    epsg : str or int
        EPSG ID to use as CRS.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing the geocoded zip codes and municipalities.
    """
    with db.session_scope() as session:
        query = session.query(
            EgonMastrPvRoofGeocoded.zip_and_municipality,
            EgonMastrPvRoofGeocoded.geometry,
        )

    return gpd.read_postgis(
        query.statement, query.session.bind, geom_col="geometry"
    ).to_crs(f"EPSG:{epsg}")


def load_mastr_data():
    """
    Read PV rooftop data from the MaStR CSV.

    Note: the source will be replaced as soon as the MaStR data is available
    in the DB.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data with geocoded locations.
    """
    mastr_df = mastr_data(
        MASTR_INDEX_COL,
        MASTR_RELEVANT_COLS,
        MASTR_DTYPES,
        MASTR_PARSE_DATES,
    )

    clean_mastr_df = clean_mastr_data(
        mastr_df,
        max_realistic_pv_cap=MAX_REALISTIC_PV_CAP,
        min_realistic_pv_cap=MIN_REALISTIC_PV_CAP,
        seed=SEED,
        rounding=ROUNDING,
    )

    geocode_gdf = geocoded_data_from_db(EPSG)

    mastr_gdf = merge_geocode_with_mastr(clean_mastr_df, geocode_gdf)

    valid_mastr_gdf = drop_invalid_entries_from_gdf(mastr_gdf)

    municipalities_gdf = municipality_data()

    valid_mastr_gdf = add_ags_to_gens(valid_mastr_gdf, municipalities_gdf)

    return drop_gens_outside_muns(valid_mastr_gdf)


class OsmBuildingsFiltered(Base):
    __tablename__ = "osm_buildings_filtered"
    __table_args__ = {"schema": "openstreetmap"}

    osm_id = Column(BigInteger)
    amenity = Column(String)
    building = Column(String)
    name = Column(String)
    geom = Column(Geometry(srid=SRID), index=True)
    area = Column(Float)
    geom_point = Column(Geometry(srid=SRID), index=True)
    tags = Column(HSTORE)
    id = Column(BigInteger, primary_key=True, index=True)


@timer_func
def osm_buildings(
    to_crs: CRS,
) -> gpd.GeoDataFrame:
    """
    Read OSM buildings data from the eGo^n database.

    Parameters
    -----------
    to_crs : pyproj.crs.crs.CRS
        CRS to transform geometries to.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    """
    with db.session_scope() as session:
        query = session.query(
            OsmBuildingsFiltered.id,
            OsmBuildingsFiltered.area,
            OsmBuildingsFiltered.geom_point.label("geom"),
        )

    return gpd.read_postgis(
        query.statement, query.session.bind, index_col="id"
    ).to_crs(to_crs)


@timer_func
def synthetic_buildings(
    to_crs: CRS,
) -> gpd.GeoDataFrame:
    """
    Read synthetic buildings data from the eGo^n database.

    Parameters
    -----------
    to_crs : pyproj.crs.crs.CRS
        CRS to transform geometries to.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing synthetic buildings data.
    """
    with db.session_scope() as session:
        query = session.query(
            OsmBuildingsSynthetic.id,
            OsmBuildingsSynthetic.area,
            OsmBuildingsSynthetic.geom_point.label("geom"),
        )

    return gpd.read_postgis(
        query.statement, query.session.bind, index_col="id"
    ).to_crs(to_crs)


@timer_func
def add_ags_to_buildings(
    buildings_gdf: gpd.GeoDataFrame,
    municipalities_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add the AGS ID to the buildings.

    Parameters
    -----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    municipalities_gdf : geopandas.GeoDataFrame
        GeoDataFrame with municipality data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data
        with the AGS ID added.
    """
    return buildings_gdf.sjoin(
        municipalities_gdf,
        how="left",
        predicate="intersects",
    ).rename(columns={"index_right": "ags"})


def drop_buildings_outside_muns(
    buildings_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop all buildings outside of the municipalities.

    Parameters
    -----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data
        with buildings without an AGS ID dropped.
    """
    gdf = buildings_gdf.loc[~buildings_gdf.ags.isna()]

    logger.debug(
        f"{len(buildings_gdf) - len(gdf)} "
        f"({(len(buildings_gdf) - len(gdf)) / len(buildings_gdf) * 100:g}%) "
        f"of {len(buildings_gdf)} values are outside of the municipalities "
        "and are therefore dropped."
    )

    return gdf


def egon_building_peak_loads():
    """
    Return the IDs of buildings with an electricity peak load in
    scenario eGon2035.
    """
    sql = """
    SELECT building_id
    FROM demand.egon_building_electricity_peak_loads
    WHERE scenario = 'eGon2035'
    """

    return (
        db.select_dataframe(sql).building_id.astype(int).sort_values().unique()
    )


@timer_func
def load_building_data():
    """
    Read buildings from the database.

    Tables:

    * `openstreetmap.osm_buildings_filtered` (from OSM)
    * `openstreetmap.osm_buildings_synthetic` (synthetic, created by us)

    Use column `id` for both as it is unique, hence both datasets can be
    concatenated. If INCLUDE_SYNTHETIC_BUILDINGS is False, synthetic buildings
    will not be loaded.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with buildings without an
        AGS ID dropped.
    """
    municipalities_gdf = municipality_data()

    osm_buildings_gdf = osm_buildings(municipalities_gdf.crs)

    if INCLUDE_SYNTHETIC_BUILDINGS:
        synthetic_buildings_gdf = synthetic_buildings(municipalities_gdf.crs)

        buildings_gdf = gpd.GeoDataFrame(
            pd.concat(
                [
                    osm_buildings_gdf,
                    synthetic_buildings_gdf,
                ]
            ),
            geometry="geom",
            crs=osm_buildings_gdf.crs,
        ).rename(columns={"area": "building_area"})

        buildings_gdf.index = buildings_gdf.index.astype(int)

    else:
        buildings_gdf = osm_buildings_gdf.rename(
            columns={"area": "building_area"}
        )

    if ONLY_BUILDINGS_WITH_DEMAND:
        building_ids = egon_building_peak_loads()

        init_len = len(building_ids)

        building_ids = np.intersect1d(
            list(map(int, building_ids)),
            list(map(int, buildings_gdf.index.to_numpy())),
        )

        end_len = len(building_ids)

        logger.debug(
            f"{end_len / init_len * 100:g} % ({end_len} / {init_len}) of "
            "buildings have a peak load."
        )

        buildings_gdf = buildings_gdf.loc[building_ids]

    buildings_ags_gdf = add_ags_to_buildings(buildings_gdf, municipalities_gdf)

    logger.debug("Loaded buildings.")

    return drop_buildings_outside_muns(buildings_ags_gdf)


@timer_func
def sort_and_qcut_df(
    df: pd.DataFrame | gpd.GeoDataFrame,
    col: str,
    q: int,
) -> pd.DataFrame | gpd.GeoDataFrame:
    """
    Determine the quantile of a given attribute in a (Geo)DataFrame.

    Sort the (Geo)DataFrame in ascending order for the given attribute.

    Parameters
    -----------
    df : pandas.DataFrame or geopandas.GeoDataFrame
        (Geo)DataFrame to sort and qcut.
    col : str
        Name of the attribute to sort and qcut the (Geo)DataFrame on.
    q : int
        Number of quantiles.

    Returns
    -------
    pandas.DataFrame or geopandas.GeoDataFrame
        Sorted and qcut (Geo)DataFrame.
    """
    df = df.sort_values(col, ascending=True)

    return df.assign(
        quant=pd.qcut(
            df[col],
            q=q,
            labels=range(q),
        )
    )
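
# Illustration (hypothetical values): with q=5, pd.qcut assigns each row one
# of the labels 0..4 by quintile, e.g. rows with the smallest 20 % of
# capacities get quant=0 and rows with the largest 20 % get quant=4.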


@timer_func
def allocate_pv(
    q_mastr_gdf: gpd.GeoDataFrame,
    q_buildings_gdf: gpd.GeoDataFrame,
    seed: int,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """
    Allocate the MaStR PV generators to the OSM buildings.

    This determines a building for each PV generator if there are more
    buildings than generators within a given AGS. Generators are preferably
    assigned to buildings within the same quantile. Multiple assignment is
    excluded.

    Parameters
    -----------
    q_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded and qcut MaStR data.
    q_buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing qcut OSM buildings data.
    seed : int
        Seed to use for random operations with NumPy and pandas.

    Returns
    -------
    tuple of two geopandas.GeoDataFrame s
        GeoDataFrame containing MaStR data allocated to building IDs.
        GeoDataFrame containing building data allocated to MaStR IDs.
    """
    rng = default_rng(seed=seed)

    q_buildings_gdf = q_buildings_gdf.assign(gens_id=np.nan)
    q_mastr_gdf = q_mastr_gdf.assign(building_id=np.nan)

    ags_list = q_buildings_gdf.ags.unique()

    if TEST_RUN:
        ags_list = ags_list[:250]

    num_ags = len(ags_list)

    t0 = perf_counter()

    for count, ags in enumerate(ags_list):

        buildings = q_buildings_gdf.loc[
            (q_buildings_gdf.ags == ags) & (q_buildings_gdf.gens_id.isna())
        ]
        gens = q_mastr_gdf.loc[
            (q_mastr_gdf.ags == ags) & (q_mastr_gdf.building_id.isna())
        ]

        len_build = len(buildings)
        len_gens = len(gens)

        if len_build < len_gens:
            gens = gens.sample(len_build, random_state=RandomState(seed=seed))
            logger.error(
                f"There are {len_gens} generators and only {len_build}"
                f" buildings in AGS {ags}. {len_gens - len(gens)} "
                "generators were truncated to match the amount of buildings."
            )

            assert len_build == len(gens)

        for quant in gens.quant.unique():
            q_buildings = buildings.loc[
                (buildings.quant == quant) & (buildings.gens_id.isna())
            ]
            q_gens = gens.loc[gens.quant == quant]

            len_build = len(q_buildings)
            len_gens = len(q_gens)

            if len_build < len_gens:
                delta = len_gens - len_build

                logger.warning(
                    f"There are {len_gens} generators and only {len_build} "
                    f"buildings in AGS {ags} and quantile {quant}. {delta} "
                    f"buildings from AGS {ags} will be added randomly."
                )

                add_buildings = pd.Index(
                    rng.choice(
                        buildings.loc[
                            (buildings.quant != quant)
                            & (buildings.gens_id.isna())
                        ].index,
                        size=delta,
                        replace=False,
                    )
                )

                q_buildings = buildings.loc[
                    q_buildings.index.append(add_buildings)
                ]

                assert len(q_buildings) == len_gens

            chosen_buildings = pd.Index(
                rng.choice(
                    q_buildings.index,
                    size=len_gens,
                    replace=False,
                )
            )

            # q_mastr_gdf.loc[q_gens.index, "building_id"] = chosen_buildings
            q_buildings_gdf.loc[chosen_buildings, "gens_id"] = q_gens.index

        if count % 100 == 0:
            logger.debug(
                f"Allocation of {count / num_ags * 100:g} % of AGS done. "
                f"It took {perf_counter() - t0:g} seconds."
            )

            t0 = perf_counter()

    assigned_buildings = q_buildings_gdf.loc[~q_buildings_gdf.gens_id.isna()]

    q_mastr_gdf.loc[
        assigned_buildings.gens_id, "building_id"
    ] = assigned_buildings.index

    logger.debug("Allocated status quo generators to buildings.")

    return frame_to_numeric(q_mastr_gdf), frame_to_numeric(q_buildings_gdf)


def frame_to_numeric(
    df: pd.DataFrame | gpd.GeoDataFrame,
) -> pd.DataFrame | gpd.GeoDataFrame:
    """
    Try to convert all columns of a DataFrame to numeric, ignoring errors.

    Parameters
    ----------
    df : pandas.DataFrame or geopandas.GeoDataFrame

    Returns
    -------
    pandas.DataFrame or geopandas.GeoDataFrame
    """
    if str(df.index.dtype) == "object":
        df.index = pd.to_numeric(df.index, errors="ignore")

    for col in df.columns:
        if str(df[col].dtype) == "object":
            df[col] = pd.to_numeric(df[col], errors="ignore")

    return df


def validate_output(
    desagg_mastr_gdf: pd.DataFrame | gpd.GeoDataFrame,
    desagg_buildings_gdf: pd.DataFrame | gpd.GeoDataFrame,
) -> None:
    """
    Validate the output.

    * Validate that there are exactly as many buildings with a PV system as
      there are PV systems with a building.
    * Validate that the building IDs with a PV system are the same building
      IDs as assigned to the PV systems.
    * Validate that the PV system IDs with a building are the same PV system
      IDs as assigned to the buildings.

    Parameters
    -----------
    desagg_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data allocated to building IDs.
    desagg_buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing building data allocated to MaStR IDs.
    """
    assert len(
        desagg_mastr_gdf.loc[~desagg_mastr_gdf.building_id.isna()]
    ) == len(desagg_buildings_gdf.loc[~desagg_buildings_gdf.gens_id.isna()])
    assert (
        np.sort(
            desagg_mastr_gdf.loc[
                ~desagg_mastr_gdf.building_id.isna()
            ].building_id.unique()
        )
        == np.sort(
            desagg_buildings_gdf.loc[
                ~desagg_buildings_gdf.gens_id.isna()
            ].index.unique()
        )
    ).all()
    assert (
        np.sort(
            desagg_mastr_gdf.loc[
                ~desagg_mastr_gdf.building_id.isna()
            ].index.unique()
        )
        == np.sort(
            desagg_buildings_gdf.loc[
                ~desagg_buildings_gdf.gens_id.isna()
            ].gens_id.unique()
        )
    ).all()

    logger.debug("Validated output.")


def drop_unallocated_gens(
    gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop generators which did not get allocated.

    Parameters
    -----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data allocated to building IDs.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data with unallocated generators
        dropped.
    """
    init_len = len(gdf)
    gdf = gdf.loc[~gdf.building_id.isna()]
    end_len = len(gdf)

    logger.debug(
        f"Dropped {init_len - end_len} "
        f"({((init_len - end_len) / init_len) * 100:g}%)"
        f" of {init_len} unallocated rows from MaStR DataFrame."
    )

    return gdf


@timer_func
def allocate_to_buildings(
    mastr_gdf: gpd.GeoDataFrame,
    buildings_gdf: gpd.GeoDataFrame,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """
    Allocate status quo PV rooftop generators to buildings.

    Parameters
    -----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data with geocoded locations.
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with buildings without an
        AGS ID dropped.

    Returns
    -------
    tuple of two geopandas.GeoDataFrame s
        GeoDataFrame containing MaStR data allocated to building IDs.
        GeoDataFrame containing building data allocated to MaStR IDs.
    """
    logger.debug("Starting allocation of status quo.")

    q_mastr_gdf = sort_and_qcut_df(mastr_gdf, col="capacity", q=Q)
    q_buildings_gdf = sort_and_qcut_df(buildings_gdf, col="building_area", q=Q)

    desagg_mastr_gdf, desagg_buildings_gdf = allocate_pv(
        q_mastr_gdf, q_buildings_gdf, SEED
    )

    validate_output(desagg_mastr_gdf, desagg_buildings_gdf)

    return drop_unallocated_gens(desagg_mastr_gdf), desagg_buildings_gdf
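
# A minimal end-to-end sketch of the status quo allocation (assumes a fully
# populated eGo^n database):
#
#     mastr_gdf = load_mastr_data()
#     buildings_gdf = load_building_data()
#     desagg_mastr_gdf, desagg_buildings_gdf = allocate_to_buildings(
#         mastr_gdf, buildings_gdf
#     )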


@timer_func
def grid_districts(
    epsg: int,
) -> gpd.GeoDataFrame:
    """
    Load the MV grid district geo data from the eGo^n database as a
    geopandas.GeoDataFrame.

    Parameters
    -----------
    epsg : int
        EPSG ID to use as CRS.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing the MV grid district IDs and geo shapes data.
    """
    gdf = db.select_geodataframe(
        """
        SELECT bus_id, geom
        FROM grid.egon_mv_grid_district
        ORDER BY bus_id
        """,
        index_col="bus_id",
        geom_col="geom",
        epsg=epsg,
    )

    gdf.index = gdf.index.astype(int)

    logger.debug("Grid districts loaded.")

    return gdf


def scenario_data(
    carrier: str = "solar_rooftop",
    scenario: str = "eGon2035",
) -> pd.DataFrame:
    """
    Get scenario capacity data from the eGo^n database.

    Parameters
    -----------
    carrier : str
        Carrier type to filter the table by.
    scenario : str
        Scenario to filter the table by.

    Returns
    -------
    pandas.DataFrame
        DataFrame with scenario capacity data in GW.
    """
    with db.session_scope() as session:
        query = session.query(EgonScenarioCapacities).filter(
            EgonScenarioCapacities.carrier == carrier,
            EgonScenarioCapacities.scenario_name == scenario,
        )

    df = pd.read_sql(
        query.statement, query.session.bind, index_col="index"
    ).sort_index()

    logger.debug("Scenario capacity data loaded.")

    return df


class Vg250Lan(Base):
    __tablename__ = "vg250_lan"
    __table_args__ = {"schema": "boundaries"}

    id = Column(BigInteger, primary_key=True, index=True)
    ade = Column(BigInteger)
    gf = Column(BigInteger)
    bsg = Column(BigInteger)
    ars = Column(String)
    ags = Column(String)
    sdv_ars = Column(String)
    gen = Column(String)
    bez = Column(String)
    ibz = Column(BigInteger)
    bem = Column(String)
    nbd = Column(String)
    sn_l = Column(String)
    sn_r = Column(String)
    sn_k = Column(String)
    sn_v1 = Column(String)
    sn_v2 = Column(String)
    sn_g = Column(String)
    fk_s3 = Column(String)
    nuts = Column(String)
    ars_0 = Column(String)
    ags_0 = Column(String)
    wsk = Column(String)
    debkg_id = Column(String)
    rs = Column(String)
    sdv_rs = Column(String)
    rs_0 = Column(String)
    geometry = Column(Geometry(srid=EPSG), index=True)


def federal_state_data(to_crs: CRS) -> gpd.GeoDataFrame:
    """
    Get federal state data from the eGo^n database.

    Parameters
    -----------
    to_crs : pyproj.crs.crs.CRS
        CRS to transform geometries to.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with federal state data.
    """
    with db.session_scope() as session:
        query = session.query(
            Vg250Lan.id, Vg250Lan.nuts, Vg250Lan.geometry.label("geom")
        )

    gdf = gpd.read_postgis(
        query.statement, query.session.bind, index_col="id"
    ).to_crs(to_crs)

    logger.debug("Federal state data loaded.")

    return gdf


@timer_func
def overlay_grid_districts_with_counties(
    mv_grid_district_gdf: gpd.GeoDataFrame,
    federal_state_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Calculate the intersections of MV grid districts and counties.

    Parameters
    -----------
    mv_grid_district_gdf : gpd.GeoDataFrame
        GeoDataFrame containing the MV grid district IDs and geo shapes data.
    federal_state_gdf : gpd.GeoDataFrame
        GeoDataFrame with federal state data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing the intersection shapes between counties and
        grid districts.
    """
    logger.debug(
        "Calculating intersection overlay between mv grid districts and "
        "counties. This may take a while..."
    )

    gdf = gpd.overlay(
        federal_state_gdf.to_crs(mv_grid_district_gdf.crs),
        mv_grid_district_gdf.reset_index(),
        how="intersection",
        keep_geom_type=True,
    )

    logger.debug("Done!")

    return gdf


@timer_func
def add_overlay_id_to_buildings(
    buildings_gdf: gpd.GeoDataFrame,
    grid_federal_state_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add the overlay ID to the buildings.

    Parameters
    -----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    grid_federal_state_gdf : geopandas.GeoDataFrame
        GeoDataFrame with intersection shapes between counties and grid
        districts.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with the overlay ID added.
    """
    gdf = (
        buildings_gdf.to_crs(grid_federal_state_gdf.crs)
        .sjoin(
            grid_federal_state_gdf,
            how="left",
            predicate="intersects",
        )
        .rename(columns={"index_right": "overlay_id"})
    )

    logger.debug("Added overlay ID to OSM buildings.")

    return gdf


def drop_buildings_outside_grids(
    buildings_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop all buildings outside of the grid areas.

    Parameters
    -----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data
        with buildings without a bus ID dropped.
    """
    gdf = buildings_gdf.loc[~buildings_gdf.bus_id.isna()]

    logger.debug(
        f"{len(buildings_gdf) - len(gdf)} "
        f"({(len(buildings_gdf) - len(gdf)) / len(buildings_gdf) * 100:g}%) "
        f"of {len(buildings_gdf)} values are outside of the grid areas "
        "and are therefore dropped."
    )

    return gdf


def cap_per_bus_id(
    scenario: str,
) -> pd.DataFrame:
    """
    Get a table with the total PV rooftop capacity per grid district.

    Parameters
    -----------
    scenario : str
        Scenario name.

    Returns
    -------
    pandas.DataFrame
        DataFrame with the total rooftop capacity per MV grid.
    """
    targets = config.datasets()["solar_rooftop"]["targets"]

    sql = f"""
    SELECT bus as bus_id, p_nom as capacity
    FROM {targets['generators']['schema']}.{targets['generators']['table']}
    WHERE carrier = 'solar_rooftop'
    AND scn_name = '{scenario}'
    """

    return db.select_dataframe(sql, index_col="bus_id")

    # unreachable legacy variant, kept for reference:
    # overlay_gdf = overlay_gdf.assign(capacity=np.nan)
    #
    # for cap, nuts in scenario_df[["capacity", "nuts"]].itertuples(index=False):
    #     nuts_gdf = overlay_gdf.loc[overlay_gdf.nuts == nuts]
    #
    #     capacity = nuts_gdf.building_area.multiply(
    #         cap / nuts_gdf.building_area.sum()
    #     )
    #
    #     overlay_gdf.loc[nuts_gdf.index] = overlay_gdf.loc[
    #         nuts_gdf.index
    #     ].assign(capacity=capacity.multiply(conversion).to_numpy())
    #
    # return overlay_gdf[["bus_id", "capacity"]].groupby("bus_id").sum()


def determine_end_of_life_gens(
    mastr_gdf: gpd.GeoDataFrame,
    scenario_timestamp: pd.Timestamp,
    pv_rooftop_lifetime: pd.Timedelta,
) -> gpd.GeoDataFrame:
    """
    Determine if an old PV system has reached its end of life.

    Parameters
    -----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    scenario_timestamp : pandas.Timestamp
        Timestamp at which the scenario takes place.
    pv_rooftop_lifetime : pandas.Timedelta
        Average expected lifetime of PV rooftop systems.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data and info on whether the
        system has reached its end of life.
    """
    before = mastr_gdf.capacity.sum()

    mastr_gdf = mastr_gdf.assign(
        age=scenario_timestamp - mastr_gdf.start_up_date
    )

    mastr_gdf = mastr_gdf.assign(
        end_of_life=pv_rooftop_lifetime < mastr_gdf.age
    )

    after = mastr_gdf.loc[~mastr_gdf.end_of_life].capacity.sum()

    logger.debug(
        "Determined if PV rooftop systems reached their end of life.\n"
        f"Total capacity: {before}\nActive capacity: {after}"
    )

    return mastr_gdf
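
# Illustration: with PV_ROOFTOP_LIFETIME = 20 * 365 days, a system commissioned
# on 2010-01-01 has an age of roughly 25 years at the eGon2035 timestamp
# (2035-01-01) and is flagged end_of_life=True, while one commissioned on
# 2020-01-01 (age roughly 15 years) stays active.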


def calculate_max_pv_cap_per_building(
    buildings_gdf: gpd.GeoDataFrame,
    mastr_gdf: gpd.GeoDataFrame,
    pv_cap_per_sq_m: float | int,
    roof_factor: float | int,
) -> gpd.GeoDataFrame:
    """
    Calculate the estimated maximum possible PV capacity per building.

    Parameters
    -----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    pv_cap_per_sq_m : float or int
        Average expected installable PV capacity per square meter.
    roof_factor : float or int
        Average share of the building area that is usable for PV.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with the estimated maximum
        PV capacity.
    """
    gdf = (
        buildings_gdf.reset_index()
        .merge(
            mastr_gdf[
                [
                    "capacity",
                    "end_of_life",
                    "building_id",
                    "EinheitlicheAusrichtungUndNeigungswinkel",
                    "Hauptausrichtung",
                    "HauptausrichtungNeigungswinkel",
                ]
            ],
            how="left",
            left_on="id",
            right_on="building_id",
        )
        .set_index("id")
        .drop(columns="building_id")
    )

    return gdf.assign(
        max_cap=gdf.building_area.multiply(roof_factor * pv_cap_per_sq_m),
        end_of_life=gdf.end_of_life.fillna(True).astype(bool),
        bus_id=gdf.bus_id.astype(int),
    )
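
# Worked example: with ROOF_FACTOR = 0.5 and PV_CAP_PER_SQ_M ≈ 0.208 kWp/m²,
# a 100 m² building yields max_cap ≈ 100 * 0.5 * 0.208 ≈ 10.4 kWp.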


def calculate_building_load_factor(
    mastr_gdf: gpd.GeoDataFrame,
    buildings_gdf: gpd.GeoDataFrame,
    rounding: int = 4,
) -> gpd.GeoDataFrame:
    """
    Calculate the roof load factor from existing PV systems.

    Parameters
    -----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    rounding : int
        Rounding to use for the load factor.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data with the calculated load
        factor.
    """
    gdf = mastr_gdf.merge(
        buildings_gdf[["max_cap", "building_area"]].loc[
            ~buildings_gdf["max_cap"].isna()
        ],
        how="left",
        left_on="building_id",
        right_index=True,
    )

    return gdf.assign(load_factor=(gdf.capacity / gdf.max_cap).round(rounding))
1728
1729
1730
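# Illustrative sketch (not part of the pipeline): the load factor in
# calculate_building_load_factor is simply the installed capacity divided by
# the estimated roof potential. Toy values only.
def _example_load_factor():
    capacity = 7.4  # kW, existing MaStR plant (toy value)
    max_cap = 10.2  # kW, estimated roof potential (toy value)

    return round(capacity / max_cap, 4)  # -> 0.7255

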
def get_probability_for_property(
    mastr_gdf: gpd.GeoDataFrame,
    cap_range: tuple[int | float, int | float],
    prop: str,
) -> tuple[np.array, np.array]:
    """
    Calculate the probability of the different options of a property of the
    existing PV plants.
    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_range : tuple(int, int)
        Capacity range of PV plants to look at.
    prop : str
        Property to calculate probabilities for. The string needs to be a
        column of mastr_gdf.
    Returns
    -------
    tuple
        numpy.array
            Unique values of the property.
        numpy.array
            Probabilities per unique value.
    """
    cap_range_gdf = mastr_gdf.loc[
        (mastr_gdf.capacity > cap_range[0])
        & (mastr_gdf.capacity <= cap_range[1])
    ]

    if prop == "load_factor":
        cap_range_gdf = cap_range_gdf.loc[cap_range_gdf[prop] <= 1]

    count = Counter(
        cap_range_gdf[prop].loc[
            ~cap_range_gdf[prop].isna()
            & ~cap_range_gdf[prop].isnull()
            & ~(cap_range_gdf[prop] == "None")
        ]
    )

    values = np.array(list(count.keys()))
    probabilities = np.fromiter(count.values(), dtype=float)
    probabilities = probabilities / np.sum(probabilities)

    return values, probabilities


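# Illustrative sketch (not part of the pipeline): the Counter-based empirical
# distribution used in get_probability_for_property. Toy orientations only.
def _example_property_probabilities():
    count = Counter(["Süd", "Süd", "Ost", "West"])

    values = np.array(list(count.keys()))
    probabilities = np.fromiter(count.values(), dtype=float)
    probabilities = probabilities / np.sum(probabilities)

    # -> (array(['Süd', 'Ost', 'West'], ...), array([0.5, 0.25, 0.25]))
    return values, probabilities

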
@timer_func
def probabilities(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
    properties: list[str] | None = None,
) -> dict:
    """
    Calculate the probability of the different options of properties of the
    existing PV plants.
    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple
        should start with zero and the last one should end with infinity.
    properties : list(str)
        List of properties to calculate probabilities for. Each string needs
        to be a column of mastr_gdf.
    Returns
    -------
    dict
        Dictionary with values and probabilities per capacity range.
    """
    if cap_ranges is None:
        cap_ranges = [
            (0, 30),
            (30, 100),
            (100, float("inf")),
        ]
    if properties is None:
        properties = [
            "EinheitlicheAusrichtungUndNeigungswinkel",
            "Hauptausrichtung",
            "HauptausrichtungNeigungswinkel",
            "load_factor",
        ]

    prob_dict = {}

    for cap_range in cap_ranges:
        prob_dict[cap_range] = {
            "values": {},
            "probabilities": {},
        }

        for prop in properties:
            v, p = get_probability_for_property(
                mastr_gdf,
                cap_range,
                prop,
            )

            prob_dict[cap_range]["values"][prop] = v
            prob_dict[cap_range]["probabilities"][prop] = p

    return prob_dict


def cap_share_per_cap_range(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
) -> dict[tuple[int | float, int | float], float]:
    """
    Calculate the share of PV capacity from the total PV capacity within
    capacity ranges.
    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple
        should start with zero and the last one should end with infinity.
    Returns
    -------
    dict
        Dictionary with the share of PV capacity from the total PV capacity
        within capacity ranges.
    """
    if cap_ranges is None:
        cap_ranges = [
            (0, 30),
            (30, 100),
            (100, float("inf")),
        ]

    cap_share_dict = {}

    total_cap = mastr_gdf.capacity.sum()

    for cap_range in cap_ranges:
        cap_share = (
            mastr_gdf.loc[
                (mastr_gdf.capacity > cap_range[0])
                & (mastr_gdf.capacity <= cap_range[1])
            ].capacity.sum()
            / total_cap
        )

        cap_share_dict[cap_range] = cap_share

    return cap_share_dict


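# Illustrative sketch (not part of the pipeline): the share computed in
# cap_share_per_cap_range for a single capacity range. Toy capacities only.
def _example_cap_share():
    capacity = pd.Series([10.0, 20.0, 50.0, 120.0])  # kW (toy values)
    cap_range = (30, 100)

    in_range = capacity.loc[
        (capacity > cap_range[0]) & (capacity <= cap_range[1])
    ]

    return in_range.sum() / capacity.sum()  # -> 50 / 200 = 0.25

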
def mean_load_factor_per_cap_range(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
) -> dict[tuple[int | float, int | float], float]:
    """
    Calculate the mean roof load factor per capacity range from existing PV
    plants.
    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple
        should start with zero and the last one should end with infinity.
    Returns
    -------
    dict
        Dictionary with the mean roof load factor per capacity range.
    """
    if cap_ranges is None:
        cap_ranges = [
            (0, 30),
            (30, 100),
            (100, float("inf")),
        ]

    load_factor_dict = {}

    for cap_range in cap_ranges:
        load_factor = mastr_gdf.loc[
            (mastr_gdf.load_factor <= 1)
            & (mastr_gdf.capacity > cap_range[0])
            & (mastr_gdf.capacity <= cap_range[1])
        ].load_factor.mean()

        load_factor_dict[cap_range] = load_factor

    return load_factor_dict


def building_area_range_per_cap_range(
    mastr_gdf: gpd.GeoDataFrame,
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
    min_building_size: int | float = 10.0,
    upper_quantile: float = 0.95,
    lower_quantile: float = 0.05,
) -> dict[tuple[int | float, int | float], tuple[int | float, int | float]]:
    """
    Estimate the normal building area range per capacity range.
    The area range is derived from quantiles of the building areas of
    existing PV plants within each capacity range. Adjacent ranges are then
    smoothed so that they no longer overlap.
    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    cap_ranges : list(tuple(int, int))
        List of capacity ranges to distinguish between. The first tuple
        should start with zero and the last one should end with infinity.
    min_building_size : int, float
        Minimum building size to consider for PV plants.
    upper_quantile : float
        Upper quantile to estimate the maximum building size per capacity
        range.
    lower_quantile : float
        Lower quantile to estimate the minimum building size per capacity
        range.
    Returns
    -------
    dict
        Dictionary with the estimated normal building area range per
        capacity range.
    """
    if cap_ranges is None:
        cap_ranges = [
            (0, 30),
            (30, 100),
            (100, float("inf")),
        ]

    building_area_range_dict = {}

    n_ranges = len(cap_ranges)

    for count, cap_range in enumerate(cap_ranges):
        cap_range_gdf = mastr_gdf.loc[
            (mastr_gdf.capacity > cap_range[0])
            & (mastr_gdf.capacity <= cap_range[1])
        ]

        if count == 0:
            building_area_range_dict[cap_range] = (
                min_building_size,
                cap_range_gdf.building_area.quantile(upper_quantile),
            )
        elif count == n_ranges - 1:
            building_area_range_dict[cap_range] = (
                cap_range_gdf.building_area.quantile(lower_quantile),
                float("inf"),
            )
        else:
            building_area_range_dict[cap_range] = (
                cap_range_gdf.building_area.quantile(lower_quantile),
                cap_range_gdf.building_area.quantile(upper_quantile),
            )

    values = list(building_area_range_dict.values())

    building_area_range_normed_dict = {}

    for count, (cap_range, (min_area, max_area)) in enumerate(
        building_area_range_dict.items()
    ):
        if count == 0:
            building_area_range_normed_dict[cap_range] = (
                min_area,
                np.mean((values[count + 1][0], max_area)),
            )
        elif count == n_ranges - 1:
            building_area_range_normed_dict[cap_range] = (
                np.mean((values[count - 1][1], min_area)),
                max_area,
            )
        else:
            building_area_range_normed_dict[cap_range] = (
                np.mean((values[count - 1][1], min_area)),
                np.mean((values[count + 1][0], max_area)),
            )

    return building_area_range_normed_dict


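# Illustrative sketch (not part of the pipeline): the smoothing step in
# building_area_range_per_cap_range. Quantile-based ranges of neighbouring
# capacity classes may overlap; the shared boundary is replaced by the mean
# of the adjacent bounds. Toy areas only.
def _example_area_range_smoothing():
    # Raw quantile ranges per capacity range in m² (toy values):
    raw = [(10.0, 180.0), (120.0, 800.0), (600.0, float("inf"))]

    # Boundary between the first and second range after smoothing:
    return np.mean((raw[1][0], raw[0][1]))  # -> (120 + 180) / 2 = 150.0

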
@timer_func
def desaggregate_pv_in_mv_grid(
    buildings_gdf: gpd.GeoDataFrame,
    pv_cap: float | int,
    **kwargs,
) -> gpd.GeoDataFrame:
    """
    Desaggregate PV capacity on buildings within a given grid district.
    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing buildings within the grid district.
    pv_cap : float, int
        PV capacity to desaggregate.
    Other Parameters
    ----------------
    prob_dict : dict
        Dictionary with values and probabilities per capacity range.
    cap_share_dict : dict
        Dictionary with the share of PV capacity from the total PV capacity
        within capacity ranges.
    building_area_range_dict : dict
        Dictionary with the estimated normal building area range per
        capacity range.
    load_factor_dict : dict
        Dictionary with the mean roof load factor per capacity range.
    seed : int
        Seed to use for random operations with NumPy and pandas.
    pv_cap_per_sq_m : float, int
        Average expected installable PV capacity per square meter.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with desaggregated PV
        plants.
    """
    bus_id = int(buildings_gdf.bus_id.iat[0])

    rng = default_rng(seed=kwargs["seed"])
    random_state = RandomState(seed=kwargs["seed"])

    results_df = pd.DataFrame(columns=buildings_gdf.columns)

    for cap_range, share in kwargs["cap_share_dict"].items():
        pv_cap_range = pv_cap * share

        b_area_min, b_area_max = kwargs["building_area_range_dict"][cap_range]

        cap_range_buildings_gdf = buildings_gdf.loc[
            ~buildings_gdf.index.isin(results_df.index)
            & (buildings_gdf.building_area > b_area_min)
            & (buildings_gdf.building_area <= b_area_max)
        ]

        mean_load_factor = kwargs["load_factor_dict"][cap_range]
        cap_range_buildings_gdf = cap_range_buildings_gdf.assign(
            mean_cap=cap_range_buildings_gdf.max_cap * mean_load_factor,
            load_factor=np.nan,
            capacity=np.nan,
        )

        total_mean_cap = cap_range_buildings_gdf.mean_cap.sum()

        if total_mean_cap == 0:
            logger.warning(
                f"There are no matching roofs for capacity range {cap_range} "
                f"kW in grid {bus_id}. Using all buildings as fallback."
            )

            cap_range_buildings_gdf = buildings_gdf.loc[
                ~buildings_gdf.index.isin(results_df.index)
            ]

            if len(cap_range_buildings_gdf) == 0:
                logger.warning(
                    "There are no roofs available for capacity range "
                    f"{cap_range} kW in grid {bus_id}. Allowing dual use."
                )
                cap_range_buildings_gdf = buildings_gdf.copy()

            cap_range_buildings_gdf = cap_range_buildings_gdf.assign(
                mean_cap=cap_range_buildings_gdf.max_cap * mean_load_factor,
                load_factor=np.nan,
                capacity=np.nan,
            )

            total_mean_cap = cap_range_buildings_gdf.mean_cap.sum()

        elif total_mean_cap < pv_cap_range:
            logger.warning(
                f"At average utilization the roof area in grid {bus_id} and "
                f"capacity range {cap_range} kW is not sufficient. The roof "
                "utilization will be above average."
            )

        frac = max(
            pv_cap_range / total_mean_cap,
            1 / len(cap_range_buildings_gdf),
        )

        samples_gdf = cap_range_buildings_gdf.sample(
            frac=min(1, frac),
            random_state=random_state,
        )

        cap_range_dict = kwargs["prob_dict"][cap_range]

        values_dict = cap_range_dict["values"]
        p_dict = cap_range_dict["probabilities"]

        load_factors = rng.choice(
            a=values_dict["load_factor"],
            size=len(samples_gdf),
            p=p_dict["load_factor"],
        )

        samples_gdf = samples_gdf.assign(
            load_factor=load_factors,
            capacity=(
                samples_gdf.building_area
                * load_factors
                * kwargs["pv_cap_per_sq_m"]
            ).clip(lower=0.4),
        )

        missing_factor = pv_cap_range / samples_gdf.capacity.sum()

        samples_gdf = samples_gdf.assign(
            capacity=(samples_gdf.capacity * missing_factor),
            load_factor=(samples_gdf.load_factor * missing_factor),
        )

        assert np.isclose(
            samples_gdf.capacity.sum(),
            pv_cap_range,
            rtol=1e-03,
        ), f"{samples_gdf.capacity.sum()} != {pv_cap_range}"

        results_df = pd.concat(
            [
                results_df,
                samples_gdf,
            ],
        )

    total_missing_factor = pv_cap / results_df.capacity.sum()

    results_df = results_df.assign(
        capacity=(results_df.capacity * total_missing_factor),
    )

    assert np.isclose(
        results_df.capacity.sum(),
        pv_cap,
        rtol=1e-03,
    ), f"{results_df.capacity.sum()} != {pv_cap}"

    return gpd.GeoDataFrame(
        results_df,
        crs=samples_gdf.crs,
        # Scrutinizer issue (introduced by this PR): The variable samples_gdf
        # does not seem to be defined in case the for loop on line 2054 is
        # not entered. Are you sure this can never be the case?
        geometry="geom",
    )


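# Illustrative sketch (not part of the pipeline): the rescaling step in
# desaggregate_pv_in_mv_grid that forces the sampled capacities to sum to
# the target of the capacity range. Toy values only.
def _example_rescale_to_target():
    sampled = pd.Series([4.0, 6.0])  # kW drawn for two buildings (toy)
    pv_cap_range = 12.0  # kW to allocate in this capacity range (toy)

    missing_factor = pv_cap_range / sampled.sum()  # -> 1.2

    return sampled * missing_factor  # -> [4.8, 7.2], sums to the target

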
@timer_func
def desaggregate_pv(
    buildings_gdf: gpd.GeoDataFrame,
    cap_df: pd.DataFrame,
    **kwargs,
) -> gpd.GeoDataFrame:
    """
    Desaggregate PV capacity on buildings within all grid districts.
    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    cap_df : pandas.DataFrame
        DataFrame with the total rooftop capacity per MV grid district.
    Other Parameters
    ----------------
    prob_dict : dict
        Dictionary with values and probabilities per capacity range.
    cap_share_dict : dict
        Dictionary with the share of PV capacity from the total PV capacity
        within capacity ranges.
    building_area_range_dict : dict
        Dictionary with the estimated normal building area range per
        capacity range.
    load_factor_dict : dict
        Dictionary with the mean roof load factor per capacity range.
    seed : int
        Seed to use for random operations with NumPy and pandas.
    pv_cap_per_sq_m : float, int
        Average expected installable PV capacity per square meter.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with desaggregated PV
        plants.
    """
    allocated_buildings_gdf = buildings_gdf.loc[~buildings_gdf.end_of_life]

    building_bus_ids = set(buildings_gdf.bus_id)
    cap_bus_ids = set(cap_df.index)

    logger.debug(
        f"Bus IDs from buildings: {len(building_bus_ids)}\nBus IDs from "
        f"capacity: {len(cap_bus_ids)}"
    )

    if len(building_bus_ids) > len(cap_bus_ids):
        missing = building_bus_ids - cap_bus_ids
    else:
        missing = cap_bus_ids - building_bus_ids

    logger.debug(str(missing))

    # assert set(buildings_gdf.bus_id.unique()) == set(cap_df.index)

    for bus_id in buildings_gdf.bus_id.unique():
        buildings_grid_gdf = buildings_gdf.loc[buildings_gdf.bus_id == bus_id]

        pv_installed_gdf = buildings_grid_gdf.loc[
            ~buildings_grid_gdf.end_of_life
        ]

        pv_installed = pv_installed_gdf.capacity.sum()

        pot_buildings_gdf = buildings_grid_gdf.drop(
            index=pv_installed_gdf.index
        )

        if len(pot_buildings_gdf) == 0:
            logger.error(
                f"In grid {bus_id} there are no potential buildings to "
                "allocate PV capacity to. The grid is skipped. This message "
                "should only appear during test runs with few buildings."
            )

            continue

        pv_target = cap_df.at[bus_id, "capacity"] * 1000  # MW -> kW

        logger.debug(f"pv_target: {pv_target}")

        pv_missing = pv_target - pv_installed

        if pv_missing <= 0:
            logger.info(
                f"In grid {bus_id} there is more PV installed "
                f"({pv_installed: g}) in the status quo than is allocated "
                f"within the scenario ({pv_target: g}). No new generators "
                "are added."
            )

            continue

        if pot_buildings_gdf.max_cap.sum() < pv_missing:
            logger.error(
                f"In grid {bus_id} there is less PV potential ("
                f"{pot_buildings_gdf.max_cap.sum():g} kW) than allocated PV "
                f"capacity ({pv_missing:g} kW). The average roof utilization "
                "will be very high."
            )

        gdf = desaggregate_pv_in_mv_grid(
            buildings_gdf=pot_buildings_gdf,
            pv_cap=pv_missing,
            **kwargs,
        )

        logger.debug(f"New cap in grid {bus_id}: {gdf.capacity.sum()}")
        logger.debug(f"Installed cap in grid {bus_id}: {pv_installed}")
        logger.debug(
            f"Total cap in grid {bus_id}: {gdf.capacity.sum() + pv_installed}"
        )

        if not np.isclose(
            gdf.capacity.sum() + pv_installed, pv_target, rtol=1e-3
        ):
            logger.warning(
                f"The desired capacity and actual capacity in grid {bus_id} "
                f"differ.\nDesired cap: {pv_target}\nActual cap: "
                f"{gdf.capacity.sum() + pv_installed}"
            )

        allocated_buildings_gdf = pd.concat(
            [
                allocated_buildings_gdf,
                gdf,
            ]
        )

    logger.debug("Desaggregated scenario.")
    logger.debug(f"Scenario capacity: {cap_df.capacity.sum(): g}")
    logger.debug(
        f"Generator capacity: {allocated_buildings_gdf.capacity.sum(): g}"
    )

    return gpd.GeoDataFrame(
        allocated_buildings_gdf,
        crs=gdf.crs,
        # Scrutinizer issue (introduced by this PR): The variable gdf does
        # not seem to be defined for all execution paths.
        geometry="geom",
    )


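# Illustrative sketch (not part of the pipeline): the per-grid target logic
# in desaggregate_pv. Toy values only.
def _example_missing_capacity():
    pv_target = 5.2 * 1000  # scenario target, MW -> kW (toy value)
    pv_installed = 3100.0  # kW still active from MaStR (toy value)

    # Only the difference is newly allocated to potential buildings.
    return pv_target - pv_installed  # -> 2100.0 kW

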
@timer_func
def add_buildings_meta_data(
    buildings_gdf: gpd.GeoDataFrame,
    prob_dict: dict,
    seed: int,
) -> gpd.GeoDataFrame:
    """
    Randomly add additional metadata to desaggregated PV plants.
    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with desaggregated PV
        plants.
    prob_dict : dict
        Dictionary with values and probabilities per capacity range.
    seed : int
        Seed to use for random operations with NumPy and pandas.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with desaggregated PV
        plants.
    """
    rng = default_rng(seed=seed)
    buildings_gdf = buildings_gdf.reset_index().rename(
        columns={
            "index": "building_id",
        }
    )

    for (min_cap, max_cap), cap_range_prob_dict in prob_dict.items():
        cap_range_gdf = buildings_gdf.loc[
            (buildings_gdf.capacity >= min_cap)
            & (buildings_gdf.capacity < max_cap)
        ]

        for key, values in cap_range_prob_dict["values"].items():
            if key == "load_factor":
                continue

            gdf = cap_range_gdf.loc[
                cap_range_gdf[key].isna()
                | cap_range_gdf[key].isnull()
                | (cap_range_gdf[key] == "None")
            ]

            key_vals = rng.choice(
                a=values,
                size=len(gdf),
                p=cap_range_prob_dict["probabilities"][key],
            )

            buildings_gdf.loc[gdf.index, key] = key_vals

    return buildings_gdf


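# Illustrative sketch (not part of the pipeline): the weighted draw used in
# add_buildings_meta_data to fill missing metadata. Toy distribution only.
def _example_weighted_metadata_fill():
    rng = default_rng(seed=SEED)

    values = np.array(["Süd", "Ost", "West"])
    probabilities = np.array([0.5, 0.25, 0.25])

    # Missing orientations are drawn according to their empirical frequency.
    return rng.choice(a=values, size=4, p=probabilities)

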
def add_voltage_level(
    buildings_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add voltage level derived from generator capacity to the power plants.
    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with desaggregated PV
        plants.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM building data with voltage level per
        generator.
    """

    def voltage_levels(p: float) -> int:
        if p < 100:
            return 7
        elif p < 200:
            return 6
        elif p < 5500:
            return 5
        elif p < 20000:
            return 4
        elif p < 120000:
            return 3
        return 1

    return buildings_gdf.assign(
        voltage_level=buildings_gdf.capacity.apply(voltage_levels)
    )


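# Illustrative usage (not part of the pipeline): the thresholds above map
# capacity in kW to a voltage level. The GeoDataFrame below is a toy input
# without geometry.
def _example_voltage_level_usage():
    gdf = gpd.GeoDataFrame({"capacity": [50.0, 150.0, 1000.0, 200000.0]})

    return add_voltage_level(gdf).voltage_level.tolist()  # -> [7, 6, 5, 1]

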
def add_start_up_date(
    buildings_gdf: gpd.GeoDataFrame,
    start: pd.Timestamp,
    end: pd.Timestamp,
    seed: int,
):
    """
    Randomly and linearly add a start-up date to new PV generators.
    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with desaggregated PV
        plants.
    start : pandas.Timestamp
        Minimum timestamp to use.
    end : pandas.Timestamp
        Maximum timestamp to use.
    seed : int
        Seed to use for random operations with NumPy and pandas.
    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with start-up date added.
    """
    rng = default_rng(seed=seed)

    date_range = pd.date_range(start=start, end=end, freq="1D")

    return buildings_gdf.assign(
        start_up_date=rng.choice(date_range, size=len(buildings_gdf))
    )


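# Illustrative sketch (not part of the pipeline): the uniform draw of
# commissioning days used in add_start_up_date. Toy window only.
def _example_random_start_up_dates():
    rng = default_rng(seed=SEED)

    date_range = pd.date_range(
        start="2024-01-01", end="2024-12-31", freq="1D"
    )

    # Each new generator gets a uniformly drawn day from the window.
    return rng.choice(date_range, size=3)

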
@timer_func
def allocate_scenarios(
    mastr_gdf: gpd.GeoDataFrame,
    buildings_gdf: gpd.GeoDataFrame,
    last_scenario_gdf: gpd.GeoDataFrame,
    scenario: str,
):
    """
    Desaggregate and allocate scenario PV rooftop ramp-ups onto buildings.
    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    last_scenario_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings matched with PV generators
        from the temporally preceding scenario.
    scenario : str
        Scenario to desaggregate and allocate.
    Returns
    -------
    tuple
        geopandas.GeoDataFrame
            GeoDataFrame containing OSM buildings matched with PV
            generators.
        pandas.DataFrame
            DataFrame containing PV rooftop capacity per grid id.
    """
    grid_districts_gdf = grid_districts(EPSG)

    federal_state_gdf = federal_state_data(grid_districts_gdf.crs)

    grid_federal_state_gdf = overlay_grid_districts_with_counties(
        grid_districts_gdf,
        federal_state_gdf,
    )

    buildings_overlay_gdf = add_overlay_id_to_buildings(
        buildings_gdf,
        grid_federal_state_gdf,
    )

    valid_buildings_gdf = drop_buildings_outside_grids(buildings_overlay_gdf)

    cap_per_bus_id_df = cap_per_bus_id(scenario)

    logger.debug(
        f"cap_per_bus_id_df total capacity: {cap_per_bus_id_df.capacity.sum()}"
    )

    last_scenario_gdf = determine_end_of_life_gens(
        last_scenario_gdf,
        SCENARIO_TIMESTAMP[scenario],
        PV_ROOFTOP_LIFETIME,
    )

    buildings_gdf = calculate_max_pv_cap_per_building(
        valid_buildings_gdf,
        last_scenario_gdf,
        PV_CAP_PER_SQ_M,
        ROOF_FACTOR,
    )

    mastr_gdf = calculate_building_load_factor(
        mastr_gdf,
        buildings_gdf,
    )

    probabilities_dict = probabilities(
        mastr_gdf,
        cap_ranges=CAP_RANGES,
    )

    cap_share_dict = cap_share_per_cap_range(
        mastr_gdf,
        cap_ranges=CAP_RANGES,
    )

    load_factor_dict = mean_load_factor_per_cap_range(
        mastr_gdf,
        cap_ranges=CAP_RANGES,
    )

    building_area_range_dict = building_area_range_per_cap_range(
        mastr_gdf,
        cap_ranges=CAP_RANGES,
        min_building_size=MIN_BUILDING_SIZE,
        upper_quantile=UPPER_QUNATILE,
        lower_quantile=LOWER_QUANTILE,
    )

    allocated_buildings_gdf = desaggregate_pv(
        buildings_gdf=buildings_gdf,
        cap_df=cap_per_bus_id_df,
        prob_dict=probabilities_dict,
        cap_share_dict=cap_share_dict,
        building_area_range_dict=building_area_range_dict,
        load_factor_dict=load_factor_dict,
        seed=SEED,
        pv_cap_per_sq_m=PV_CAP_PER_SQ_M,
    )

    allocated_buildings_gdf = allocated_buildings_gdf.assign(scenario=scenario)

    meta_buildings_gdf = frame_to_numeric(
        add_buildings_meta_data(
            allocated_buildings_gdf,
            probabilities_dict,
            SEED,
        )
    )

    return (
        add_start_up_date(
            meta_buildings_gdf,
            start=last_scenario_gdf.start_up_date.max(),
            end=SCENARIO_TIMESTAMP[scenario],
            seed=SEED,
        ),
        cap_per_bus_id_df,
    )


class EgonPowerPlantPvRoofBuildingScenario(Base):
    __tablename__ = "egon_power_plants_pv_roof_building"
    __table_args__ = {"schema": "supply"}

    index = Column(Integer, primary_key=True, index=True)
    scenario = Column(String)
    building_id = Column(Integer)
    gens_id = Column(String, nullable=True)
    capacity = Column(Float)
    einheitliche_ausrichtung_und_neigungswinkel = Column(Float)
    hauptausrichtung = Column(String)
    hauptausrichtung_neigungswinkel = Column(String)
    voltage_level = Column(Integer)


def create_scenario_table(buildings_gdf):
    """Create mapping table pv_unit <-> building for scenario"""
    EgonPowerPlantPvRoofBuildingScenario.__table__.drop(
        bind=engine, checkfirst=True
    )
    EgonPowerPlantPvRoofBuildingScenario.__table__.create(
        bind=engine, checkfirst=True
    )

    buildings_gdf.rename(columns=COLS_TO_RENAME).assign(
        capacity=buildings_gdf.capacity.div(10**3)  # kW -> MW
    )[COLS_TO_EXPORT].reset_index().to_sql(
        name=EgonPowerPlantPvRoofBuildingScenario.__table__.name,
        schema=EgonPowerPlantPvRoofBuildingScenario.__table__.schema,
        con=db.engine(),
        if_exists="append",
        index=False,
    )


def geocode_mastr_data():
    """
    Read PV rooftop data from the MaStR CSV.
    TODO: The source will be replaced as soon as the MaStR data is available
    in the DB.
    """
    mastr_df = mastr_data(
        MASTR_INDEX_COL,
        MASTR_RELEVANT_COLS,
        MASTR_DTYPES,
        MASTR_PARSE_DATES,
    )

    clean_mastr_df = clean_mastr_data(
        mastr_df,
        max_realistic_pv_cap=MAX_REALISTIC_PV_CAP,
        min_realistic_pv_cap=MIN_REALISTIC_PV_CAP,
        seed=SEED,
        rounding=ROUNDING,
    )

    geocoding_df = geocoding_data(clean_mastr_df)

    ratelimiter = geocoder(USER_AGENT, MIN_DELAY_SECONDS)

    geocode_gdf = geocode_data(geocoding_df, ratelimiter, EPSG)

    create_geocoded_table(geocode_gdf)


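# Illustrative sketch (not part of the pipeline): a Nominatim geocoder with
# rate limiting, as the geocoder helper used above presumably sets it up.
# The user agent below is a placeholder.
def _example_geocoder_setup():
    geolocator = Nominatim(user_agent="example-user-agent")

    # Nominatim allows at most one request per second; RateLimiter enforces
    # the delay between successive geocode calls.
    return RateLimiter(geolocator.geocode, min_delay_seconds=1)

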
def pv_rooftop_to_buildings():
    """Main script, executed as task"""

    mastr_gdf = load_mastr_data()

    buildings_gdf = load_building_data()

    desagg_mastr_gdf, desagg_buildings_gdf = allocate_to_buildings(
        mastr_gdf, buildings_gdf
    )

    all_buildings_gdf = (
        desagg_mastr_gdf.assign(scenario="status_quo")
        .reset_index()
        .rename(columns={"geometry": "geom", "EinheitMastrNummer": "gens_id"})
    )

    scenario_buildings_gdf = all_buildings_gdf.copy()

    cap_per_bus_id_df = pd.DataFrame()

    for scenario in SCENARIOS:
        logger.debug(f"Desaggregating scenario {scenario}.")
        (
            scenario_buildings_gdf,
            cap_per_bus_id_scenario_df,
        ) = allocate_scenarios(  # noqa: F841
            desagg_mastr_gdf,
            desagg_buildings_gdf,
            scenario_buildings_gdf,
            scenario,
        )

        all_buildings_gdf = gpd.GeoDataFrame(
            pd.concat(
                [all_buildings_gdf, scenario_buildings_gdf], ignore_index=True
            ),
            crs=scenario_buildings_gdf.crs,
            geometry="geom",
        )

        cap_per_bus_id_df = pd.concat(
            [cap_per_bus_id_df, cap_per_bus_id_scenario_df]
        )

    # export scenario
    create_scenario_table(add_voltage_level(all_buildings_gdf))