Passed
Pull Request — dev (#934) by unknown, created 01:43

clean_mastr_data()   B

Complexity
  Conditions: 2

Size
  Total Lines: 143
  Code Lines: 67

Duplication
  Lines: 0
  Ratio: 0 %

Importance
  Changes: 0

Metric                            Value
eloc (effective lines of code)    67
dl (duplicated lines)             0
loc (total lines)                 143
rs                                8.08
c                                 0
b                                 0
f                                 0
cc (cyclomatic complexity)        2
nop (number of parameters)        5

How to fix: Long Method

Small methods make your code easier to understand, particularly when combined with a good name. And if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a good sign that you should extract the commented part into a new method; the comment is then a good starting point for naming the new method.

Commonly applied refactorings include Extract Method, sketched below.
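
For instance, a minimal sketch of Extract Method applied to a data-cleaning routine (simplified, with hypothetical helper names; pandas assumed):

    # before: one long method whose comments mark its sub-steps
    def clean_data(df):
        # drop duplicate rows
        df = df.loc[~df.index.duplicated()]
        # drop implausible capacities
        return df.loc[(df.capacity > 0.1) & (df.capacity <= 23500)]

    # after: each commented step becomes a small, well-named method
    def drop_duplicates(df):
        return df.loc[~df.index.duplicated()]

    def drop_implausible_capacities(df, min_cap=0.1, max_cap=23500):
        return df.loc[(df.capacity > min_cap) & (df.capacity <= max_cap)]

    def clean_data(df):
        return drop_implausible_capacities(drop_duplicates(df))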

"""
Distribute MaStR PV rooftop capacities to OSM and synthetic buildings. Generate new
PV rooftop generators for scenarios eGon2035 and eGon100RE.

Data cleaning: Drop duplicates and entries with missing critical data. Determine the
most plausible capacity from the multiple values given in MaStR data. Drop generators
without any plausible capacity data (23.5 MW > P > 0.1 kW). Randomly add a start-up
date, weighted by the distribution of existing dates, if it is missing. Extract zip
code and municipality from the 'Standort' given in MaStR data. Geocode unique zip code
and municipality combinations with Nominatim (1 s delay). Drop generators for which
geocoding failed or which are located outside the municipalities of Germany. Add some
visual sanity checks for cleaned data.

Allocation of MaStR data: Allocate each generator to an existing building from OSM.
Determine the quantile each generator and building is in depending on the capacity of
the generator and the area of the building polygon. Randomly distribute generators
within each municipality, preferably within the same building-area quantile as the
generators are capacity-wise. If not enough buildings exist within a municipality and
quantile, additional buildings from other quantiles are chosen randomly.

Disaggregation of PV rooftop scenarios: The scenario data per federal state is linearly
distributed to the MV grid districts according to the PV rooftop potential per MV grid
district. The rooftop potential is estimated from the building area given by the OSM
buildings. Grid districts which are located in several federal states are allocated PV
capacity according to their respective roof potential in the individual federal states.
The disaggregation of PV plants within a grid district respects existing plants from
MaStR which did not reach their end of life. New PV plants are generated randomly and
weighted, using a breakdown of MaStR data as the generator basis. Plant metadata (e.g.
plant orientation) is also added randomly, weighted with MaStR data as the basis.
"""
from __future__ import annotations

from collections import Counter
from functools import wraps
from pathlib import Path
from time import perf_counter
from typing import Any

from geoalchemy2 import Geometry
from geopy.extra.rate_limiter import RateLimiter
from geopy.geocoders import Nominatim
from loguru import logger
from numpy.random import RandomState, default_rng
from pyproj.crs.crs import CRS
from sqlalchemy import BigInteger, Column, Float, Integer, String
from sqlalchemy.dialects.postgresql import HSTORE
from sqlalchemy.ext.declarative import declarative_base
import geopandas as gpd
import numpy as np
import pandas as pd

from egon.data import config, db
from egon.data.datasets.electricity_demand_timeseries.hh_buildings import (
    OsmBuildingsSynthetic,
)
from egon.data.datasets.scenario_capacities import EgonScenarioCapacities
from egon.data.datasets.zensus_vg250 import Vg250Gem

engine = db.engine()
Base = declarative_base()
SEED = int(config.settings()["egon-data"]["--random-seed"])

# TODO: move to yml
# MaStR data
MASTR_RELEVANT_COLS = [
    "EinheitMastrNummer",
    "Bruttoleistung",
    "StatisikFlag",
    "Bruttoleistung_extended",
    "Nettonennleistung",
    "InstallierteLeistung",
    "zugeordneteWirkleistungWechselrichter",
    "EinheitBetriebsstatus",
    "Standort",
    "Bundesland",
    "Land",
    "Landkreis",
    "Gemeinde",
    "Postleitzahl",
    "Ort",
    "GeplantesInbetriebnahmedatum",
    "Inbetriebnahmedatum",
    "GemeinsamerWechselrichterMitSpeicher",
    "Lage",
    "Leistungsbegrenzung",
    "EinheitlicheAusrichtungUndNeigungswinkel",
    "Hauptausrichtung",
    "HauptausrichtungNeigungswinkel",
    "Nebenausrichtung",
]

MASTR_DTYPES = {
    "EinheitMastrNummer": str,
    "Bruttoleistung": float,
    "StatisikFlag": str,
    "Bruttoleistung_extended": float,
    "Nettonennleistung": float,
    "InstallierteLeistung": float,
    "zugeordneteWirkleistungWechselrichter": float,
    "EinheitBetriebsstatus": str,
    "Standort": str,
    "Bundesland": str,
    "Land": str,
    "Landkreis": str,
    "Gemeinde": str,
    # "Postleitzahl": int,  # fails because of NaN values
    "Ort": str,
    "GemeinsamerWechselrichterMitSpeicher": str,
    "Lage": str,
    "Leistungsbegrenzung": str,
    # this would parse NaN values as False, which is not always correct
    # "EinheitlicheAusrichtungUndNeigungswinkel": bool,
    "Hauptausrichtung": str,
    "HauptausrichtungNeigungswinkel": str,
    "Nebenausrichtung": str,
    "NebenausrichtungNeigungswinkel": str,
}

MASTR_PARSE_DATES = [
    "GeplantesInbetriebnahmedatum",
    "Inbetriebnahmedatum",
]

MASTR_INDEX_COL = "EinheitMastrNummer"

EPSG = 4326
SRID = 3035

# data cleaning
MAX_REALISTIC_PV_CAP = 23500
MIN_REALISTIC_PV_CAP = 0.1
ROUNDING = 1

# geopy
MIN_DELAY_SECONDS = 1
USER_AGENT = "rli_kh_geocoder"

# show additional logging information
VERBOSE = False

EXPORT_DIR = Path(__file__).resolve().parent / "data"
EXPORT_FILE = "mastr_geocoded.gpkg"
EXPORT_PATH = EXPORT_DIR / EXPORT_FILE
DRIVER = "GPKG"

# Number of quantiles
Q = 5

# Scenario Data
CARRIER = "solar_rooftop"
SCENARIOS = ["eGon2035"]  # , "eGon100RE"]
SCENARIO_TIMESTAMP = {
    "eGon2035": pd.Timestamp("2035-01-01", tz="UTC"),
    "eGon100RE": pd.Timestamp("2050-01-01", tz="UTC"),
}
PV_ROOFTOP_LIFETIME = pd.Timedelta(30 * 365, unit="D")

# Example module: Trina Vertex S TSM-400DE09M.08, 400 Wp
# https://www.photovoltaik4all.de/media/pdf/92/64/68/Trina_Datasheet_VertexS_DE09-08_2021_A.pdf
MODUL_CAP = 0.4  # kWp
MODUL_SIZE = 1.096 * 1.754  # m²
PV_CAP_PER_SQ_M = MODUL_CAP / MODUL_SIZE
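# -> roughly 0.208 kWp per m² of module area (0.4 kWp / 1.922 m²)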

# Estimation of usable roof area
# Factor for the conversion of building area to roof area
# estimated mean roof pitch: 35°
# estimated usable roof share: 80 %
# estimate that only the south side of the building is used for PV
# see https://mediatum.ub.tum.de/doc/%20969497/969497.pdf
# AREA_FACTOR = 1.221
# USABLE_ROOF_SHARE = 0.8
# SOUTH_SHARE = 0.5
# ROOF_FACTOR = AREA_FACTOR * USABLE_ROOF_SHARE * SOUTH_SHARE
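# 1.221 * 0.8 * 0.5 ≈ 0.49, here rounded to 0.5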
ROOF_FACTOR = 0.5

CAP_RANGES = [
    (0, 30),
    (30, 100),
    (100, float("inf")),
]

MIN_BUILDING_SIZE = 10.0
UPPER_QUANTILE = 0.95
LOWER_QUANTILE = 0.05

COLS_TO_RENAME = {
    "EinheitlicheAusrichtungUndNeigungswinkel": (
        "einheitliche_ausrichtung_und_neigungswinkel"
    ),
    "Hauptausrichtung": "hauptausrichtung",
    "HauptausrichtungNeigungswinkel": "hauptausrichtung_neigungswinkel",
}

COLS_TO_EXPORT = [
    "scenario",
    "building_id",
    "gens_id",
    "capacity",
    "einheitliche_ausrichtung_und_neigungswinkel",
    "hauptausrichtung",
    "hauptausrichtung_neigungswinkel",
    "voltage_level",
]

INCLUDE_SYNTHETIC_BUILDINGS = False
ONLY_BUILDINGS_WITH_DEMAND = True


def timer_func(func):
    """Print the wall-clock runtime of the decorated function."""

    @wraps(func)
    def timeit_wrapper(*args, **kwargs):
        start_time = perf_counter()
        result = func(*args, **kwargs)
        end_time = perf_counter()
        total_time = end_time - start_time
        print(f"Function {func.__name__} took {total_time:.4f} seconds.")
        return result

    return timeit_wrapper


@timer_func
def mastr_data(
    index_col: str | int | list[str] | list[int],
    usecols: list[str],
    dtype: dict[str, Any] | None,
    parse_dates: list[str] | None,
) -> pd.DataFrame:
    """
    Read MaStR data from csv.

    Parameters
    ----------
    index_col : str, int or list of str or int
        Column(s) to use as the row labels of the DataFrame.
    usecols : list of str
        Return a subset of the columns.
    dtype : dict of column (str) -> type (any), optional
        Data type for data or columns.
    parse_dates : list of names (str), optional
        Try to parse given columns to datetime.

    Returns
    -------
    pandas.DataFrame
        DataFrame containing MaStR data.
    """
    mastr_path = Path(
        config.datasets()["power_plants"]["sources"]["mastr_pv"]
    ).resolve()

    mastr_df = pd.read_csv(
        mastr_path,
        index_col=index_col,
        usecols=usecols,
        dtype=dtype,
        parse_dates=parse_dates,
    )

    mastr_df = mastr_df.loc[
        (mastr_df.StatisikFlag == "B")
        & (mastr_df.EinheitBetriebsstatus == "InBetrieb")
        & (mastr_df.Land == "Deutschland")
        & (mastr_df.Lage == "BaulicheAnlagen")
    ]

    if (
        config.settings()["egon-data"]["--dataset-boundary"]
        == "Schleswig-Holstein"
    ):
        init_len = len(mastr_df)

        mastr_df = mastr_df.loc[mastr_df.Bundesland == "SchleswigHolstein"]

        logger.info(
            f"Using only MaStR data within Schleswig-Holstein. "
            f"{init_len - len(mastr_df)} of {init_len} generators are dropped."
        )

    logger.debug("MaStR data loaded.")

    return mastr_df


@timer_func
def clean_mastr_data(
    mastr_df: pd.DataFrame,
    max_realistic_pv_cap: int | float,
    min_realistic_pv_cap: int | float,
    rounding: int,
    seed: int,
) -> pd.DataFrame:
    """
    Clean the MaStR data of implausible entries.

    * Drop MaStR ID duplicates.
    * Drop generators with implausible capacities.
    * Fill missing start-up dates randomly, weighted by the existing dates.
    * Clean up the Standort column and the capacity.

    Parameters
    ----------
    mastr_df : pandas.DataFrame
        DataFrame containing MaStR data.
    max_realistic_pv_cap : int or float
        Maximum capacity which is considered to be realistic.
    min_realistic_pv_cap : int or float
        Minimum capacity which is considered to be realistic.
    rounding : int
        Rounding to use when cleaning up capacity. E.g. when rounding is 1,
        a capacity of 9.93 will be rounded to 9.9.
    seed : int
        Seed to use for random operations with NumPy and pandas.

    Returns
    -------
    pandas.DataFrame
        DataFrame containing cleaned MaStR data.
    """
    init_len = len(mastr_df)

    # drop duplicates
    mastr_df = mastr_df.loc[~mastr_df.index.duplicated()]

    # drop invalid entries in Standort
    index_to_drop = mastr_df.loc[
        (mastr_df.Standort.isna()) | (mastr_df.Standort.isnull())
    ].index

    mastr_df = mastr_df.loc[~mastr_df.index.isin(index_to_drop)]

    df = mastr_df[
        [
            "Bruttoleistung",
            "Bruttoleistung_extended",
            "Nettonennleistung",
            "zugeordneteWirkleistungWechselrichter",
            "InstallierteLeistung",
        ]
    ].round(rounding)

    # use only the smallest capacity rating if multiple are given
    mastr_df = mastr_df.assign(
        capacity=[
            most_plausible(p_tub, min_realistic_pv_cap)
            for p_tub in df.itertuples(index=False)
        ]
    )

    # drop generators without any capacity info
    # or with an implausibly small capacity
    # or with a capacity > 23.5 MW, because
    # Germany's largest rooftop PV system is 23 MW:
    # https://www.iwr.de/news/groesste-pv-dachanlage-europas-wird-in-sachsen-anhalt-gebaut-news37379
    mastr_df = mastr_df.loc[
        (~mastr_df.capacity.isna())
        & (mastr_df.capacity <= max_realistic_pv_cap)
        & (mastr_df.capacity > min_realistic_pv_cap)
    ]

    # get zip code and municipality
    mastr_df[["zip_and_municipality", "drop_this"]] = pd.DataFrame(
        mastr_df.Standort.astype(str)
        .apply(
            zip_and_municipality_from_standort,
            args=(VERBOSE,),
        )
        .tolist(),
        index=mastr_df.index,
    )

    # drop invalid entries
    mastr_df = mastr_df.loc[mastr_df.drop_this].drop(columns="drop_this")

    # add ", Deutschland" just in case
    mastr_df = mastr_df.assign(
        zip_and_municipality=(mastr_df.zip_and_municipality + ", Deutschland")
    )

    # get consistent start-up date
    mastr_df = mastr_df.assign(
        start_up_date=mastr_df.Inbetriebnahmedatum,
    )

    mastr_df.loc[mastr_df.start_up_date.isna()] = mastr_df.loc[
        mastr_df.start_up_date.isna()
    ].assign(
        start_up_date=mastr_df.GeplantesInbetriebnahmedatum.loc[
            mastr_df.start_up_date.isna()
        ]
    )

    # randomly and weighted fill missing start-up dates
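    # the pool contains duplicate dates, so sampling from it implicitly
    # weights the draw by the empirical start-up-date distribution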
    pool = mastr_df.loc[
        ~mastr_df.start_up_date.isna()
    ].start_up_date.to_numpy()

    size = len(mastr_df) - len(pool)

    if size > 0:
        np.random.seed(seed)

        choice = np.random.choice(
            pool,
            size=size,
            replace=False,
        )

        mastr_df.loc[mastr_df.start_up_date.isna()] = mastr_df.loc[
            mastr_df.start_up_date.isna()
        ].assign(start_up_date=choice)

        logger.info(
            f"Randomly and weighted added start-up dates to {size} generators."
        )

    mastr_df = mastr_df.assign(
        start_up_date=pd.to_datetime(mastr_df.start_up_date, utc=True)
    )

    end_len = len(mastr_df)
    logger.debug(
        f"Dropped {init_len - end_len} "
        f"({((init_len - end_len) / init_len) * 100:g}%)"
        f" of {init_len} rows from MaStR DataFrame."
    )

    return mastr_df


def zip_and_municipality_from_standort(
    standort: str,
    verbose: bool = False,
) -> tuple[str, bool]:
    """
    Get zip code and municipality from the Standort string.

    Parameters
    ----------
    standort : str
        Standort as given in the MaStR data.
    verbose : bool
        Logs additional info if True.

    Returns
    -------
    tuple of str and bool
        Standort reduced to zip code and municipality, and a flag
        indicating whether a zip code was found.
    """
    if verbose:
        logger.debug(f"Uncleaned string: {standort}")

    standort_list = standort.split()

    found = False
    count = 0

    for count, elem in enumerate(standort_list):
        if len(elem) != 5:
            continue
        if not elem.isnumeric():
            continue

        found = True

        break

    if found:
        cleaned_str = " ".join(standort_list[count:])

        if verbose:
            logger.debug(f"Cleaned string:   {cleaned_str}")

        return cleaned_str, found

    logger.warning(
        "Couldn't identify zip code. This entry will be dropped."
        f" Original Standort: {standort}."
    )

    return standort, found
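# Example: zip_and_municipality_from_standort("Musterstraße 5 12345 Musterstadt")
# returns ("12345 Musterstadt", True); without a five-digit number it returns
# the unchanged string and False.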


def most_plausible(
    p_tub: tuple,
    min_realistic_pv_cap: int | float,
) -> float:
    """
    Try to determine the most plausible capacity.

    From the up to three most common capacity values of a generator, return
    the smallest, unless it is below the plausible minimum, in which case
    the largest is returned.

    Parameters
    ----------
    p_tub : tuple
        Tuple containing the different capacities given in
        the MaStR data.
    min_realistic_pv_cap : int or float
        Minimum capacity which is considered to be realistic.

    Returns
    -------
    float
        Capacity of the generator estimated as the most realistic.
    """
    count = Counter(p_tub).most_common(3)

    if len(count) == 1:
        return count[0][0]

    val1 = count[0][0]
    val2 = count[1][0]

    if len(count) == 2:
        min_val = min(val1, val2)
        max_val = max(val1, val2)
    else:
        val3 = count[2][0]

        min_val = min(val1, val2, val3)
        max_val = max(val1, val2, val3)

    if min_val < min_realistic_pv_cap:
        return max_val

    return min_val
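# Example: most_plausible((9.9, 9.9, 0.0, 0.0, 9.9), 0.1) returns 9.9; the
# zero readings are below the plausible minimum, so the largest value wins.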


def geocoder(
    user_agent: str,
    min_delay_seconds: int,
) -> RateLimiter:
    """
    Set up the Nominatim geocoder.

    Parameters
    ----------
    user_agent : str
        The app name.
    min_delay_seconds : int
        Delay in seconds to use between requests to Nominatim.
        A minimum of 1 is advised.

    Returns
    -------
    geopy.extra.rate_limiter.RateLimiter
        Nominatim RateLimiter geocoding class to use for geocoding.
    """
    locator = Nominatim(user_agent=user_agent)
    return RateLimiter(
        locator.geocode,
        min_delay_seconds=min_delay_seconds,
    )
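# Usage sketch: ratelimiter = geocoder(USER_AGENT, MIN_DELAY_SECONDS) yields a
# callable that geocodes a query string while enforcing the request delay,
# e.g. ratelimiter("12345 Musterstadt, Deutschland").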


def geocoding_data(
    clean_mastr_df: pd.DataFrame,
) -> pd.DataFrame:
    """
    Set up a DataFrame to geocode.

    Parameters
    ----------
    clean_mastr_df : pandas.DataFrame
        DataFrame containing cleaned MaStR data.

    Returns
    -------
    pandas.DataFrame
        DataFrame containing all unique combinations of
        zip codes with municipalities for geocoding.
    """
    return pd.DataFrame(
        data=clean_mastr_df.zip_and_municipality.unique(),
        columns=["zip_and_municipality"],
    )


@timer_func
def geocode_data(
    geocoding_df: pd.DataFrame,
    ratelimiter: RateLimiter,
    epsg: int,
) -> gpd.GeoDataFrame:
    """
    Geocode zip code and municipality.

    Extract latitude, longitude and altitude. Transform latitude and
    longitude to a shapely Point and return a geopandas GeoDataFrame.

    Parameters
    ----------
    geocoding_df : pandas.DataFrame
        DataFrame containing all unique combinations of
        zip codes with municipalities for geocoding.
    ratelimiter : geopy.extra.rate_limiter.RateLimiter
        Nominatim RateLimiter geocoding class to use for geocoding.
    epsg : int
        EPSG ID to use as CRS.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing all unique combinations of
        zip codes with municipalities with matching geolocation.
    """
    logger.info(f"Geocoding {len(geocoding_df)} locations.")

    geocode_df = geocoding_df.assign(
        location=geocoding_df.zip_and_municipality.apply(ratelimiter)
    )

    geocode_df = geocode_df.assign(
        point=geocode_df.location.apply(
            lambda loc: tuple(loc.point) if loc else None
        )
    )

    geocode_df[["latitude", "longitude", "altitude"]] = pd.DataFrame(
        geocode_df.point.tolist(), index=geocode_df.index
    )

    return gpd.GeoDataFrame(
        geocode_df,
        geometry=gpd.points_from_xy(geocode_df.longitude, geocode_df.latitude),
        crs=f"EPSG:{epsg}",
    )


def merge_geocode_with_mastr(
    clean_mastr_df: pd.DataFrame, geocode_gdf: gpd.GeoDataFrame
) -> gpd.GeoDataFrame:
    """
    Merge the geocoded geometry onto the original MaStR data.

    Parameters
    ----------
    clean_mastr_df : pandas.DataFrame
        DataFrame containing cleaned MaStR data.
    geocode_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing all unique combinations of
        zip codes with municipalities with matching geolocation.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing cleaned MaStR data with
        matching geolocation from geocoding.
    """
    return gpd.GeoDataFrame(
        clean_mastr_df.merge(
            geocode_gdf[["zip_and_municipality", "geometry"]],
            how="left",
            left_on="zip_and_municipality",
            right_on="zip_and_municipality",
        ),
        crs=geocode_gdf.crs,
    ).set_index(clean_mastr_df.index)


def drop_invalid_entries_from_gdf(
    gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop invalid entries from a geopandas GeoDataFrame.

    TODO: how to omit the logging from GEOS here?

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame to be checked for validity.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with rows with invalid geometries dropped.
    """
    valid_gdf = gdf.loc[gdf.is_valid]

    logger.debug(
        f"{len(gdf) - len(valid_gdf)} "
        f"({(len(gdf) - len(valid_gdf)) / len(gdf) * 100:g}%) "
        f"of {len(gdf)} values were invalid and are dropped."
    )

    return valid_gdf


@timer_func
def municipality_data() -> gpd.GeoDataFrame:
    """
    Get municipality data from the eGo^n database.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with municipality data.
    """
    with db.session_scope() as session:
        query = session.query(Vg250Gem.ags, Vg250Gem.geometry.label("geom"))

    return gpd.read_postgis(
        query.statement, query.session.bind, index_col="ags"
    )


@timer_func
def add_ags_to_gens(
    valid_mastr_gdf: gpd.GeoDataFrame,
    municipalities_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add the AGS ID to the generators.

    Parameters
    ----------
    valid_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data.
    municipalities_gdf : geopandas.GeoDataFrame
        GeoDataFrame with municipality data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data
        with the AGS ID added.
    """
    return valid_mastr_gdf.sjoin(
        municipalities_gdf,
        how="left",
        predicate="intersects",
    ).rename(columns={"index_right": "ags"})


def drop_gens_outside_muns(
    valid_mastr_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop all generators outside of municipalities.

    Parameters
    ----------
    valid_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data
        with generators without an AGS ID dropped.
    """
    gdf = valid_mastr_gdf.loc[~valid_mastr_gdf.ags.isna()]

    logger.debug(
        f"{len(valid_mastr_gdf) - len(gdf)} "
        f"({(len(valid_mastr_gdf) - len(gdf)) / len(valid_mastr_gdf) * 100:g}%) "
        f"of {len(valid_mastr_gdf)} values are outside of the municipalities"
        " and are therefore dropped."
    )

    return gdf


class EgonMastrPvRoofGeocoded(Base):
    __tablename__ = "egon_mastr_pv_roof_geocoded"
    __table_args__ = {"schema": "supply"}

    zip_and_municipality = Column(String, primary_key=True, index=True)
    location = Column(String)
    point = Column(String)
    latitude = Column(Float)
    longitude = Column(Float)
    altitude = Column(Float)
    geometry = Column(Geometry(srid=EPSG))


def create_geocoded_table(geocode_gdf):
    """
    Create and fill the geocoded MaStR PV rooftop table.

    Parameters
    ----------
    geocode_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoding information on PV rooftop locations.
    """
    EgonMastrPvRoofGeocoded.__table__.drop(bind=engine, checkfirst=True)
    EgonMastrPvRoofGeocoded.__table__.create(bind=engine, checkfirst=True)

    geocode_gdf.to_postgis(
        name=EgonMastrPvRoofGeocoded.__table__.name,
        schema=EgonMastrPvRoofGeocoded.__table__.schema,
        con=db.engine(),
        if_exists="append",
        index=False,
        # dtype={}
    )


def geocoded_data_from_db(
    epsg: str | int,
) -> gpd.GeoDataFrame:
    """
    Read geocoded PV rooftop data from the eGo^n database.

    Parameters
    ----------
    epsg : str or int
        EPSG ID to use as CRS.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing the geocoded zip code and municipality
        combinations.
    """
    with db.session_scope() as session:
        query = session.query(
            EgonMastrPvRoofGeocoded.zip_and_municipality,
            EgonMastrPvRoofGeocoded.geometry,
        )

    return gpd.read_postgis(
        query.statement, query.session.bind, geom_col="geometry"
    ).to_crs(f"EPSG:{epsg}")


def load_mastr_data():
    """
    Read PV rooftop data from the MaStR CSV.

    Note: the source will be replaced as soon as the MaStR data is
    available in the DB.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data with geocoded locations.
    """
    mastr_df = mastr_data(
        MASTR_INDEX_COL,
        MASTR_RELEVANT_COLS,
        MASTR_DTYPES,
        MASTR_PARSE_DATES,
    )

    clean_mastr_df = clean_mastr_data(
        mastr_df,
        max_realistic_pv_cap=MAX_REALISTIC_PV_CAP,
        min_realistic_pv_cap=MIN_REALISTIC_PV_CAP,
        seed=SEED,
        rounding=ROUNDING,
    )

    geocode_gdf = geocoded_data_from_db(EPSG)

    mastr_gdf = merge_geocode_with_mastr(clean_mastr_df, geocode_gdf)

    valid_mastr_gdf = drop_invalid_entries_from_gdf(mastr_gdf)

    municipalities_gdf = municipality_data()

    valid_mastr_gdf = add_ags_to_gens(valid_mastr_gdf, municipalities_gdf)

    return drop_gens_outside_muns(valid_mastr_gdf)


class OsmBuildingsFiltered(Base):
    __tablename__ = "osm_buildings_filtered"
    __table_args__ = {"schema": "openstreetmap"}

    osm_id = Column(BigInteger)
    amenity = Column(String)
    building = Column(String)
    name = Column(String)
    geom = Column(Geometry(srid=SRID), index=True)
    area = Column(Float)
    geom_point = Column(Geometry(srid=SRID), index=True)
    tags = Column(HSTORE)
    id = Column(BigInteger, primary_key=True, index=True)


@timer_func
def osm_buildings(
    to_crs: CRS,
) -> gpd.GeoDataFrame:
    """
    Read OSM buildings data from the eGo^n database.

    Parameters
    ----------
    to_crs : pyproj.crs.crs.CRS
        CRS to transform geometries to.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    """
    with db.session_scope() as session:
        query = session.query(
            OsmBuildingsFiltered.id,
            OsmBuildingsFiltered.area,
            OsmBuildingsFiltered.geom_point.label("geom"),
        )

    return gpd.read_postgis(
        query.statement, query.session.bind, index_col="id"
    ).to_crs(to_crs)


@timer_func
def synthetic_buildings(
    to_crs: CRS,
) -> gpd.GeoDataFrame:
    """
    Read synthetic buildings data from the eGo^n database.

    Parameters
    ----------
    to_crs : pyproj.crs.crs.CRS
        CRS to transform geometries to.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing synthetic buildings data.
    """
    with db.session_scope() as session:
        query = session.query(
            OsmBuildingsSynthetic.id,
            OsmBuildingsSynthetic.area,
            OsmBuildingsSynthetic.geom_point.label("geom"),
        )

    return gpd.read_postgis(
        query.statement, query.session.bind, index_col="id"
    ).to_crs(to_crs)


@timer_func
def add_ags_to_buildings(
    buildings_gdf: gpd.GeoDataFrame,
    municipalities_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add the AGS ID to the buildings.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    municipalities_gdf : geopandas.GeoDataFrame
        GeoDataFrame with municipality data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data
        with the AGS ID added.
    """
    return buildings_gdf.sjoin(
        municipalities_gdf,
        how="left",
        predicate="intersects",
    ).rename(columns={"index_right": "ags"})


def drop_buildings_outside_muns(
    buildings_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop all buildings outside of municipalities.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data
        with buildings without an AGS ID dropped.
    """
    gdf = buildings_gdf.loc[~buildings_gdf.ags.isna()]

    logger.debug(
        f"{len(buildings_gdf) - len(gdf)} "
        f"({(len(buildings_gdf) - len(gdf)) / len(buildings_gdf) * 100:g}%) "
        f"of {len(buildings_gdf)} values are outside of the municipalities "
        "and are therefore dropped."
    )

    return gdf


def egon_building_peak_loads():
    """Get IDs of all buildings with an electricity peak load in eGon2035."""
    sql = """
    SELECT building_id
    FROM demand.egon_building_electricity_peak_loads
    WHERE scenario = 'eGon2035'
    """

    return (
        db.select_dataframe(sql).building_id.astype(int).sort_values().unique()
    )


@timer_func
def load_building_data():
    """
    Read buildings from the DB.

    Tables:

    * `openstreetmap.osm_buildings_filtered` (from OSM)
    * `openstreetmap.osm_buildings_synthetic` (synthetic, created by us)

    Both tables use the unique column `id`, hence the datasets can be
    concatenated. If INCLUDE_SYNTHETIC_BUILDINGS is False, synthetic
    buildings are not loaded.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with buildings
        without an AGS ID dropped.
    """

    municipalities_gdf = municipality_data()

    osm_buildings_gdf = osm_buildings(municipalities_gdf.crs)

    if INCLUDE_SYNTHETIC_BUILDINGS:
        synthetic_buildings_gdf = synthetic_buildings(municipalities_gdf.crs)

        buildings_gdf = gpd.GeoDataFrame(
            pd.concat(
                [
                    osm_buildings_gdf,
                    synthetic_buildings_gdf,
                ]
            ),
            geometry="geom",
            crs=osm_buildings_gdf.crs,
        ).rename(columns={"area": "building_area"})

    else:
        buildings_gdf = osm_buildings_gdf.rename(
            columns={"area": "building_area"}
        )

    if ONLY_BUILDINGS_WITH_DEMAND:
        building_ids = egon_building_peak_loads()

        init_len = len(building_ids)

        building_ids = np.intersect1d(
            building_ids, buildings_gdf.index.to_numpy()
        )

        end_len = len(building_ids)

        logger.debug(
            f"{end_len / init_len * 100:g} % ({end_len} / {init_len}) of IDs"
            f" are within the OSM buildings."
        )

        buildings_gdf = buildings_gdf.loc[building_ids]

    buildings_ags_gdf = add_ags_to_buildings(buildings_gdf, municipalities_gdf)

    logger.debug("Loaded buildings.")

    return drop_buildings_outside_muns(buildings_ags_gdf)


@timer_func
def sort_and_qcut_df(
    df: pd.DataFrame | gpd.GeoDataFrame,
    col: str,
    q: int,
) -> pd.DataFrame | gpd.GeoDataFrame:
    """
    Determine the quantile of a given attribute in a (Geo)DataFrame.

    Sort the (Geo)DataFrame in ascending order for the given attribute.

    Parameters
    ----------
    df : pandas.DataFrame or geopandas.GeoDataFrame
        (Geo)DataFrame to sort and qcut.
    col : str
        Name of the attribute to sort and qcut the (Geo)DataFrame on.
    q : int
        Number of quantiles.

    Returns
    -------
    pandas.DataFrame or geopandas.GeoDataFrame
        Sorted and qcut (Geo)DataFrame.
    """
    df = df.sort_values(col, ascending=True)

    return df.assign(
        quant=pd.qcut(
            df[col],
            q=q,
            labels=range(q),
        )
    )
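# Example: with q=5, roughly the smallest 20 % of values in `col` get the
# quantile label 0 and the largest 20 % get label 4.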


@timer_func
def allocate_pv(
    q_mastr_gdf: gpd.GeoDataFrame,
    q_buildings_gdf: gpd.GeoDataFrame,
    seed: int,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """
    Allocate the MaStR PV generators to the OSM buildings.

    This will determine a building for each PV generator if there are more
    buildings than generators within a given AGS. Generators are preferably
    matched to buildings within the same quantile. Multiple assignment is
    excluded.

    Parameters
    ----------
    q_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded and qcut MaStR data.
    q_buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing qcut OSM buildings data.
    seed : int
        Seed to use for random operations with NumPy and pandas.

    Returns
    -------
    tuple of two geopandas.GeoDataFrames
        GeoDataFrame containing MaStR data allocated to building IDs.
        GeoDataFrame containing building data allocated to MaStR IDs.
    """
    rng = default_rng(seed=seed)

    q_buildings_gdf = q_buildings_gdf.assign(gens_id=np.nan)
    q_mastr_gdf = q_mastr_gdf.assign(building_id=np.nan)

    ags_list = q_buildings_gdf.ags.unique()

    num_ags = len(ags_list)

    t0 = perf_counter()

    for count, ags in enumerate(ags_list):

        buildings = q_buildings_gdf.loc[
            (q_buildings_gdf.ags == ags) & (q_buildings_gdf.gens_id.isna())
        ]
        gens = q_mastr_gdf.loc[
            (q_mastr_gdf.ags == ags) & (q_mastr_gdf.building_id.isna())
        ]

        len_build = len(buildings)
        len_gens = len(gens)

        if len_build < len_gens:
            gens = gens.sample(len_build, random_state=RandomState(seed=seed))
            logger.error(
                f"There are {len_gens} generators and only {len_build}"
                f" buildings in AGS {ags}. {len_gens - len(gens)} "
                "generators were truncated to match the number of buildings."
            )

            assert len_build == len(gens)

        for quant in gens.quant.unique():
            q_buildings = buildings.loc[
                (buildings.quant == quant) & (buildings.gens_id.isna())
            ]
            q_gens = gens.loc[gens.quant == quant]

            len_build = len(q_buildings)
            len_gens = len(q_gens)

            if len_build < len_gens:
                delta = len_gens - len_build

                logger.warning(
                    f"There are {len_gens} generators and only {len_build} "
                    f"buildings in AGS {ags} and quantile {quant}. {delta} "
                    f"buildings from AGS {ags} will be added randomly."
                )

                add_buildings = pd.Index(
                    rng.choice(
                        buildings.loc[
                            (buildings.quant != quant)
                            & (buildings.gens_id.isna())
                        ].index,
                        size=delta,
                        replace=False,
                    )
                )

                q_buildings = buildings.loc[
                    q_buildings.index.append(add_buildings)
                ]

                assert len(q_buildings) == len_gens

            chosen_buildings = pd.Index(
                rng.choice(
                    q_buildings.index,
                    size=len_gens,
                    replace=False,
                )
            )

            # q_mastr_gdf.loc[q_gens.index, "building_id"] = chosen_buildings
            q_buildings_gdf.loc[chosen_buildings, "gens_id"] = q_gens.index
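            # only the building -> generator mapping is written here; the
            # reverse generator -> building mapping is derived in one step
            # after the loop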

        if count % 100 == 0:
            logger.debug(
                f"Allocation of {count / num_ags * 100:g} % of AGS done. It took "
                f"{perf_counter() - t0:g} seconds."
            )

            t0 = perf_counter()

    assigned_buildings = q_buildings_gdf.loc[~q_buildings_gdf.gens_id.isna()]

    q_mastr_gdf.loc[
        assigned_buildings.gens_id, "building_id"
    ] = assigned_buildings.index

    logger.debug("Allocated status quo generators to buildings.")

    return frame_to_numeric(q_mastr_gdf), frame_to_numeric(q_buildings_gdf)


def frame_to_numeric(
    df: pd.DataFrame | gpd.GeoDataFrame,
) -> pd.DataFrame | gpd.GeoDataFrame:
    """
    Try to convert all columns of a DataFrame to numeric, ignoring errors.

    Parameters
    ----------
    df : pandas.DataFrame or geopandas.GeoDataFrame

    Returns
    -------
    pandas.DataFrame or geopandas.GeoDataFrame
    """
    if str(df.index.dtype) == "object":
        df.index = pd.to_numeric(df.index, errors="ignore")

    for col in df.columns:
        if str(df[col].dtype) == "object":
            df[col] = pd.to_numeric(df[col], errors="ignore")

    return df
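# Note: with errors="ignore", values that cannot be parsed are returned
# unchanged; the option is deprecated in recent pandas versions.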


def validate_output(
    desagg_mastr_gdf: pd.DataFrame | gpd.GeoDataFrame,
    desagg_buildings_gdf: pd.DataFrame | gpd.GeoDataFrame,
) -> None:
    """
    Validate the output.

    * Validate that there are exactly as many buildings with a PV system as
      there are PV systems with a building.
    * Validate that the building IDs with a PV system are the same building
      IDs as assigned to the PV systems.
    * Validate that the PV system IDs with a building are the same PV system
      IDs as assigned to the buildings.

    Parameters
    ----------
    desagg_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data allocated to building IDs.
    desagg_buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing building data allocated to MaStR IDs.
    """
    assert len(
        desagg_mastr_gdf.loc[~desagg_mastr_gdf.building_id.isna()]
    ) == len(desagg_buildings_gdf.loc[~desagg_buildings_gdf.gens_id.isna()])
    assert (
        np.sort(
            desagg_mastr_gdf.loc[
                ~desagg_mastr_gdf.building_id.isna()
            ].building_id.unique()
        )
        == np.sort(
            desagg_buildings_gdf.loc[
                ~desagg_buildings_gdf.gens_id.isna()
            ].index.unique()
        )
    ).all()
    assert (
        np.sort(
            desagg_mastr_gdf.loc[
                ~desagg_mastr_gdf.building_id.isna()
            ].index.unique()
        )
        == np.sort(
            desagg_buildings_gdf.loc[
                ~desagg_buildings_gdf.gens_id.isna()
            ].gens_id.unique()
        )
    ).all()

    logger.debug("Validated output.")


def drop_unallocated_gens(
    gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop generators which did not get allocated.

    Parameters
    ----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data allocated to building IDs.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data with unallocated generators
        dropped.
    """
    init_len = len(gdf)
    gdf = gdf.loc[~gdf.building_id.isna()]
    end_len = len(gdf)

    logger.debug(
        f"Dropped {init_len - end_len} "
        f"({((init_len - end_len) / init_len) * 100:g}%)"
        f" of {init_len} unallocated rows from MaStR DataFrame."
    )

    return gdf


@timer_func
def allocate_to_buildings(
    mastr_gdf: gpd.GeoDataFrame,
    buildings_gdf: gpd.GeoDataFrame,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """
    Allocate status quo PV rooftop generators to buildings.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data with geocoded locations.
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with buildings
        without an AGS ID dropped.

    Returns
    -------
    tuple of two geopandas.GeoDataFrames
        GeoDataFrame containing MaStR data allocated to building IDs.
        GeoDataFrame containing building data allocated to MaStR IDs.
    """
    logger.debug("Starting allocation of status quo.")

    q_mastr_gdf = sort_and_qcut_df(mastr_gdf, col="capacity", q=Q)
    q_buildings_gdf = sort_and_qcut_df(buildings_gdf, col="building_area", q=Q)

    desagg_mastr_gdf, desagg_buildings_gdf = allocate_pv(
        q_mastr_gdf, q_buildings_gdf, SEED
    )

    validate_output(desagg_mastr_gdf, desagg_buildings_gdf)

    return drop_unallocated_gens(desagg_mastr_gdf), desagg_buildings_gdf


@timer_func
def grid_districts(
    epsg: int,
) -> gpd.GeoDataFrame:
    """
    Load the MV grid district geo data from the eGo^n database as a
    geopandas.GeoDataFrame.

    Parameters
    ----------
    epsg : int
        EPSG ID to use as CRS.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing MV grid district IDs and geo shapes data.
    """
    gdf = db.select_geodataframe(
        """
        SELECT bus_id, geom
        FROM grid.egon_mv_grid_district
        ORDER BY bus_id
        """,
        index_col="bus_id",
        geom_col="geom",
        epsg=epsg,
    )

    gdf.index = gdf.index.astype(int)

    logger.debug("Grid districts loaded.")

    return gdf


def scenario_data(
    carrier: str = "solar_rooftop",
    scenario: str = "eGon2035",
) -> pd.DataFrame:
    """
    Get scenario capacity data from the eGo^n database.

    Parameters
    ----------
    carrier : str
        Carrier type to filter the table by.
    scenario : str
        Scenario to filter the table by.

    Returns
    -------
    pandas.DataFrame
        DataFrame with scenario capacity data in GW.
    """
    with db.session_scope() as session:
        query = session.query(EgonScenarioCapacities).filter(
            EgonScenarioCapacities.carrier == carrier,
            EgonScenarioCapacities.scenario_name == scenario,
        )

    df = pd.read_sql(
        query.statement, query.session.bind, index_col="index"
    ).sort_index()

    logger.debug("Scenario capacity data loaded.")

    return df


class Vg250Lan(Base):
    __tablename__ = "vg250_lan"
    __table_args__ = {"schema": "boundaries"}

    id = Column(BigInteger, primary_key=True, index=True)
    ade = Column(BigInteger)
    gf = Column(BigInteger)
    bsg = Column(BigInteger)
    ars = Column(String)
    ags = Column(String)
    sdv_ars = Column(String)
    gen = Column(String)
    bez = Column(String)
    ibz = Column(BigInteger)
    bem = Column(String)
    nbd = Column(String)
    sn_l = Column(String)
    sn_r = Column(String)
    sn_k = Column(String)
    sn_v1 = Column(String)
    sn_v2 = Column(String)
    sn_g = Column(String)
    fk_s3 = Column(String)
    nuts = Column(String)
    ars_0 = Column(String)
    ags_0 = Column(String)
    wsk = Column(String)
    debkg_id = Column(String)
    rs = Column(String)
    sdv_rs = Column(String)
    rs_0 = Column(String)
    geometry = Column(Geometry(srid=EPSG), index=True)


def federal_state_data(to_crs: CRS) -> gpd.GeoDataFrame:
    """
    Get federal state data from the eGo^n database.

    Parameters
    ----------
    to_crs : pyproj.crs.crs.CRS
        CRS to transform geometries to.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with federal state data.
    """
    with db.session_scope() as session:
        query = session.query(
            Vg250Lan.id, Vg250Lan.nuts, Vg250Lan.geometry.label("geom")
        )

    gdf = gpd.read_postgis(
        query.statement, query.session.bind, index_col="id"
    ).to_crs(to_crs)

    logger.debug("Federal state data loaded.")

    return gdf


@timer_func
def overlay_grid_districts_with_counties(
    mv_grid_district_gdf: gpd.GeoDataFrame,
    federal_state_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Calculate the intersections of MV grid districts and counties.

    Parameters
    ----------
    mv_grid_district_gdf : gpd.GeoDataFrame
        GeoDataFrame containing MV grid district IDs and geo shapes data.
    federal_state_gdf : gpd.GeoDataFrame
        GeoDataFrame with federal state data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing the intersection shapes between MV grid
        districts and counties.
    """
    logger.debug(
        "Calculating intersection overlay between MV grid districts and "
        "counties. This may take a while..."
    )

    gdf = gpd.overlay(
        federal_state_gdf.to_crs(mv_grid_district_gdf.crs),
        mv_grid_district_gdf.reset_index(),
        how="intersection",
        keep_geom_type=True,
    )

    logger.debug("Done!")

    return gdf


@timer_func
def add_overlay_id_to_buildings(
    buildings_gdf: gpd.GeoDataFrame,
    grid_federal_state_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add the overlay ID to the buildings.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    grid_federal_state_gdf : geopandas.GeoDataFrame
        GeoDataFrame with intersection shapes between counties and grid
        districts.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with the overlay ID added.
    """
    gdf = (
        buildings_gdf.to_crs(grid_federal_state_gdf.crs)
        .sjoin(
            grid_federal_state_gdf,
            how="left",
            predicate="intersects",
        )
        .rename(columns={"index_right": "overlay_id"})
    )

    logger.debug("Added overlay ID to OSM buildings.")

    return gdf


def drop_buildings_outside_grids(
    buildings_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop all buildings outside of the grid areas.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data
        with buildings without a bus ID dropped.
    """
    gdf = buildings_gdf.loc[~buildings_gdf.bus_id.isna()]

    logger.debug(
        f"{len(buildings_gdf) - len(gdf)} "
        f"({(len(buildings_gdf) - len(gdf)) / len(buildings_gdf) * 100:g}%) "
        f"of {len(buildings_gdf)} values are outside of the grid areas "
        "and are therefore dropped."
    )

    return gdf


def cap_per_bus_id(
    scenario: str,
) -> pd.DataFrame:
    """
    Get a table with the total PV rooftop capacity per grid district.

    Parameters
    ----------
    scenario : str
        Scenario name.

    Returns
    -------
    pandas.DataFrame
        DataFrame with the total rooftop capacity per MV grid.
    """
    targets = config.datasets()["solar_rooftop"]["targets"]

    sql = f"""
    SELECT bus as bus_id, p_nom as capacity
    FROM {targets['generators']['schema']}.{targets['generators']['table']}
    WHERE carrier = 'solar_rooftop'
    AND scn_name = '{scenario}'
    """

    return db.select_dataframe(sql, index_col="bus_id")

    # overlay_gdf = overlay_gdf.assign(capacity=np.nan)
    #
    # for cap, nuts in scenario_df[["capacity", "nuts"]].itertuples(index=False):
    #     nuts_gdf = overlay_gdf.loc[overlay_gdf.nuts == nuts]
    #
    #     capacity = nuts_gdf.building_area.multiply(
    #         cap / nuts_gdf.building_area.sum()
    #     )
    #
    #     overlay_gdf.loc[nuts_gdf.index] = overlay_gdf.loc[
    #         nuts_gdf.index
    #     ].assign(capacity=capacity.multiply(conversion).to_numpy())
    #
    # return overlay_gdf[["bus_id", "capacity"]].groupby("bus_id").sum()


def determine_end_of_life_gens(
    mastr_gdf: gpd.GeoDataFrame,
    scenario_timestamp: pd.Timestamp,
    pv_rooftop_lifetime: pd.Timedelta,
) -> gpd.GeoDataFrame:
    """
    Determine if an old PV system has reached its end of life.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    scenario_timestamp : pandas.Timestamp
        Timestamp at which the scenario takes place.
    pv_rooftop_lifetime : pandas.Timedelta
        Average expected lifetime of PV rooftop systems.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data and info on whether
        the system has reached its end of life.
    """
    mastr_gdf = mastr_gdf.assign(
        age=scenario_timestamp - mastr_gdf.start_up_date
    )

    logger.debug("Determined if PV rooftop systems reached their end of life.")

    return mastr_gdf.assign(end_of_life=pv_rooftop_lifetime < mastr_gdf.age)


def calculate_max_pv_cap_per_building(
    buildings_gdf: gpd.GeoDataFrame,
    mastr_gdf: gpd.GeoDataFrame,
    pv_cap_per_sq_m: float | int,
    roof_factor: float | int,
) -> gpd.GeoDataFrame:
    """
    Calculate the estimated maximum possible PV capacity per building.

    Parameters
    ----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    pv_cap_per_sq_m : float or int
        Average expected installable PV capacity per square meter.
    roof_factor : float or int
        Average share of the building area that is usable for PV.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with the estimated
        maximum PV capacity.
    """
    gdf = (
        buildings_gdf.reset_index()
        .merge(
            mastr_gdf[
                [
                    "capacity",
                    "end_of_life",
                    "building_id",
                    "EinheitlicheAusrichtungUndNeigungswinkel",
                    "Hauptausrichtung",
                    "HauptausrichtungNeigungswinkel",
                ]
            ],
            how="left",
            left_on="id",
            right_on="building_id",
        )
        .set_index("id")
        .drop(columns="building_id")
    )

    return gdf.assign(
        max_cap=gdf.building_area.multiply(roof_factor * pv_cap_per_sq_m),
        end_of_life=gdf.end_of_life.fillna(True).astype(bool),
        bus_id=gdf.bus_id.astype(int),
    )


def calculate_building_load_factor(
    mastr_gdf: gpd.GeoDataFrame,
    buildings_gdf: gpd.GeoDataFrame,
    rounding: int = 4,
) -> gpd.GeoDataFrame:
    """
    Calculate the roof load factor from existing PV systems.

    Parameters
    ----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data.
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    rounding : int
        Rounding to use for the load factor.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing geocoded MaStR data with the calculated
        load factor.
    """
    gdf = mastr_gdf.merge(
        buildings_gdf[["max_cap", "building_area"]].loc[
            ~buildings_gdf["max_cap"].isna()
        ],
        how="left",
        left_on="building_id",
        right_index=True,
    )

    return gdf.assign(load_factor=(gdf.capacity / gdf.max_cap).round(rounding))
1707
1708
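# The load factor is installed capacity over the roof's estimated maximum,
# e.g. a 9 kW system on a roof that could host 12 kW (toy numbers):
#
# >>> round(9.0 / 12.0, 4)
# 0.75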
1709
def get_probability_for_property(
1710
    mastr_gdf: gpd.GeoDataFrame,
1711
    cap_range: tuple[int | float, int | float],
1712
    prop: str,
1713
) -> tuple[np.array, np.array]:
1714
    """
1715
    Calculate the probability of the different options of a property of the
1716
    existing PV plants.
1717
    Parameters
1718
    ----------
1719
    mastr_gdf : geopandas.GeoDataFrame
1720
        GeoDataFrame containing geocoded MaStR data.
1721
    cap_range : tuple(int, int)
1722
        Capacity range of PV plants to look at.
1723
    prop : str
1724
        Property to calculate probabilities for. Must be a column of
1725
        mastr_gdf.
1726
    Returns
1727
    -------
1728
    tuple
1729
        numpy.array
1730
            Unique values of property.
1731
        numpy.array
1732
            Probabilities per unique value.
1733
    """
1734
    cap_range_gdf = mastr_gdf.loc[
1735
        (mastr_gdf.capacity > cap_range[0])
1736
        & (mastr_gdf.capacity <= cap_range[1])
1737
    ]
1738
1739
    if prop == "load_factor":
1740
        cap_range_gdf = cap_range_gdf.loc[cap_range_gdf[prop] <= 1]
1741
1742
    count = Counter(
1743
        cap_range_gdf[prop].loc[
1744
            ~cap_range_gdf[prop].isna()  # .isnull() is just an alias of .isna()
1745
            & ~(cap_range_gdf[prop] == "None")
1747
        ]
1748
    )
1749
1750
    values = np.array(list(count.keys()))
1751
    probabilities = np.fromiter(count.values(), dtype=float)
1752
    probabilities = probabilities / np.sum(probabilities)
1753
1754
    return values, probabilities
1755
1756
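# Counting sketch on toy data (column content is made up): three plants in
# the (0, 30] kW range, two facing "Süd" and one facing "Ost", yield
# probabilities of 2/3 and 1/3.
#
# >>> toy_gdf = gpd.GeoDataFrame(
# ...     {"capacity": [5.0, 10.0, 20.0], "Hauptausrichtung": ["Süd", "Süd", "Ost"]}
# ... )
# >>> values, probs = get_probability_for_property(
# ...     toy_gdf, (0, 30), "Hauptausrichtung"
# ... )
# >>> values.tolist(), [float(round(p, 2)) for p in probs]
# (['Süd', 'Ost'], [0.67, 0.33])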
1757
@timer_func
1758
def probabilities(
1759
    mastr_gdf: gpd.GeoDataFrame,
1760
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
1761
    properties: list[str] | None = None,
1762
) -> dict:
1763
    """
1764
    Calculate the probability of the different options of properties of the
1765
    existing PV plants.
1766
    Parameters
1767
    ----------
1768
    mastr_gdf : geopandas.GeoDataFrame
1769
        GeoDataFrame containing geocoded MaStR data.
1770
    cap_ranges : list(tuple(int, int))
1771
        List of capacity ranges to distinguish between. The first tuple should
1772
        start at zero and the last one should end with infinity.
1773
    properties : list(str)
1774
        List of properties to calculate probabilities for. Each string must
1775
        be a column of mastr_gdf.
1776
    Returns
1777
    -------
1778
    dict
1779
        Dictionary with values and probabilities per capacity range.
1780
    """
1781
    if cap_ranges is None:
1782
        cap_ranges = [
1783
            (0, 30),
1784
            (30, 100),
1785
            (100, float("inf")),
1786
        ]
1787
    if properties is None:
1788
        properties = [
1789
            "EinheitlicheAusrichtungUndNeigungswinkel",
1790
            "Hauptausrichtung",
1791
            "HauptausrichtungNeigungswinkel",
1792
            "load_factor",
1793
        ]
1794
1795
    prob_dict = {}
1796
1797
    for cap_range in cap_ranges:
1798
        prob_dict[cap_range] = {
1799
            "values": {},
1800
            "probabilities": {},
1801
        }
1802
1803
        for prop in properties:
1804
            v, p = get_probability_for_property(
1805
                mastr_gdf,
1806
                cap_range,
1807
                prop,
1808
            )
1809
1810
            prob_dict[cap_range]["values"][prop] = v
1811
            prob_dict[cap_range]["probabilities"][prop] = p
1812
1813
    return prob_dict
1814
1815
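# The returned mapping is keyed by capacity range first, then by property:
#
#     prob_dict[(0, 30)]["values"]["Hauptausrichtung"]         # unique options
#     prob_dict[(0, 30)]["probabilities"]["Hauptausrichtung"]  # aligned weights
#
# Both arrays are aligned by position, so they can be passed directly to
# numpy.random.Generator.choice(a=..., p=...), as done further below.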
1816
def cap_share_per_cap_range(
1817
    mastr_gdf: gpd.GeoDataFrame,
1818
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
1819
) -> dict[tuple[int | float, int | float], float]:
1820
    """
1821
    Calculate the share of PV capacity from the total PV capacity within
1822
    capacity ranges.
1823
    Parameters
1824
    ----------
1825
    mastr_gdf : geopandas.GeoDataFrame
1826
        GeoDataFrame containing geocoded MaStR data.
1827
    cap_ranges : list(tuple(int, int))
1828
        List of capacity ranges to distinguish between. The first tuple should
1829
        start at zero and the last one should end with infinity.
1830
    Returns
1831
    -------
1832
    dict
1833
        Dictionary with share of PV capacity from the total PV capacity within
1834
        capacity ranges.
1835
    """
1836
    if cap_ranges is None:
1837
        cap_ranges = [
1838
            (0, 30),
1839
            (30, 100),
1840
            (100, float("inf")),
1841
        ]
1842
1843
    cap_share_dict = {}
1844
1845
    total_cap = mastr_gdf.capacity.sum()
1846
1847
    for cap_range in cap_ranges:
1848
        cap_share = (
1849
            mastr_gdf.loc[
1850
                (mastr_gdf.capacity > cap_range[0])
1851
                & (mastr_gdf.capacity <= cap_range[1])
1852
            ].capacity.sum()
1853
            / total_cap
1854
        )
1855
1856
        cap_share_dict[cap_range] = cap_share
1857
1858
    return cap_share_dict
1859
1860
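# Toy arithmetic for one range: with plant capacities of 10, 20, 50 and
# 120 kW, the (0, 30] kW range holds 30 of 200 kW, i.e. a share of 0.15.
#
# >>> caps = pd.Series([10.0, 20.0, 50.0, 120.0])
# >>> float(caps[(caps > 0) & (caps <= 30)].sum() / caps.sum())
# 0.15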
1861
def mean_load_factor_per_cap_range(
1862
    mastr_gdf: gpd.GeoDataFrame,
1863
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
1864
) -> dict[tuple[int | float, int | float], float]:
1865
    """
1866
    Calculate the mean roof load factor per capacity range from existing PV
1867
    plants.
1868
    Parameters
1869
    ----------
1870
    mastr_gdf : geopandas.GeoDataFrame
1871
        GeoDataFrame containing geocoded MaStR data.
1872
    cap_ranges : list(tuple(int, int))
1873
        List of capacity ranges to distinguish between. The first tuple should
1874
        start at zero and the last one should end with infinity.
1875
    Returns
1876
    -------
1877
    dict
1878
        Dictionary with mean roof load factor per capacity range.
1879
    """
1880
    if cap_ranges is None:
1881
        cap_ranges = [
1882
            (0, 30),
1883
            (30, 100),
1884
            (100, float("inf")),
1885
        ]
1886
1887
    load_factor_dict = {}
1888
1889
    for cap_range in cap_ranges:
1890
        load_factor = mastr_gdf.loc[
1891
            (mastr_gdf.load_factor <= 1)
1892
            & (mastr_gdf.capacity > cap_range[0])
1893
            & (mastr_gdf.capacity <= cap_range[1])
1894
        ].load_factor.mean()
1895
1896
        load_factor_dict[cap_range] = load_factor
1897
1898
    return load_factor_dict
1899
1900
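# Toy illustration: load factors above 1 are treated as implausible and are
# excluded before averaging.
#
# >>> lf = pd.Series([0.5, 0.75, 1.2])
# >>> float(lf[lf <= 1].mean())
# 0.625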
1901
def building_area_range_per_cap_range(
1902
    mastr_gdf: gpd.GeoDataFrame,
1903
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
1904
    min_building_size: int | float = 10.0,
1905
    upper_quantile: float = 0.95,
1906
    lower_quantile: float = 0.05,
1907
) -> dict[tuple[int | float, int | float], tuple[int | float, int | float]]:
1908
    """
1909
    Estimate the typical building area range per capacity range.
1910
    The ranges are derived from the building areas of existing PV plants,
1911
    clipped to the given quantiles and smoothed between neighboring ranges.
1912
    Parameters
1913
    ----------
1914
    mastr_gdf : geopandas.GeoDataFrame
1915
        GeoDataFrame containing geocoded MaStR data.
1916
    cap_ranges : list(tuple(int, int))
1917
        List of capacity ranges to distinguish between. The first tuple should
1918
        start at zero and the last one should end with infinity.
1919
    min_building_size : int, float
1920
        Minimal building size to consider for PV plants.
1921
    upper_quantile : float
1922
        Upper quantile to estimate maximum building size per capacity range.
1923
    lower_quantile : float
1924
        Lower quantile to estimate minimum building size per capacity range.
1925
    Returns
1926
    -------
1927
    dict
1928
        Dictionary with estimated normal building area range per capacity
1929
        range.
1930
    """
1931
    if cap_ranges is None:
1932
        cap_ranges = [
1933
            (0, 30),
1934
            (30, 100),
1935
            (100, float("inf")),
1936
        ]
1937
1938
    building_area_range_dict = {}
1939
1940
    n_ranges = len(cap_ranges)
1941
1942
    for count, cap_range in enumerate(cap_ranges):
1943
        cap_range_gdf = mastr_gdf.loc[
1944
            (mastr_gdf.capacity > cap_range[0])
1945
            & (mastr_gdf.capacity <= cap_range[1])
1946
        ]
1947
1948
        if count == 0:
1949
            building_area_range_dict[cap_range] = (
1950
                min_building_size,
1951
                cap_range_gdf.building_area.quantile(upper_quantile),
1952
            )
1953
        elif count == n_ranges - 1:
1954
            building_area_range_dict[cap_range] = (
1955
                cap_range_gdf.building_area.quantile(lower_quantile),
1956
                float("inf"),
1957
            )
1958
        else:
1959
            building_area_range_dict[cap_range] = (
1960
                cap_range_gdf.building_area.quantile(lower_quantile),
1961
                cap_range_gdf.building_area.quantile(upper_quantile),
1962
            )
1963
1964
    values = list(building_area_range_dict.values())
1965
1966
    building_area_range_normed_dict = {}
1967
1968
    for count, (cap_range, (min_area, max_area)) in enumerate(
1969
        building_area_range_dict.items()
1970
    ):
1971
        if count == 0:
1972
            building_area_range_normed_dict[cap_range] = (
1973
                min_area,
1974
                np.mean((values[count + 1][0], max_area)),
1975
            )
1976
        elif count == n_ranges - 1:
1977
            building_area_range_normed_dict[cap_range] = (
1978
                np.mean((values[count - 1][1], min_area)),
1979
                max_area,
1980
            )
1981
        else:
1982
            building_area_range_normed_dict[cap_range] = (
1983
                np.mean((values[count - 1][1], min_area)),
1984
                np.mean((values[count + 1][0], max_area)),
1985
            )
1986
1987
    return building_area_range_normed_dict
1988
1989
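# Worked example of the smoothing step with hypothetical quantile results:
# raw ranges of (10, 60), (40, 300) and (250, inf) m² become (10, 50),
# (50, 275) and (275, inf), because the borders of neighboring ranges are
# replaced by their mean. This removes overlaps between adjacent ranges.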
1990
@timer_func
1991
def desaggregate_pv_in_mv_grid(
1992
    buildings_gdf: gpd.GeoDataFrame,
1993
    pv_cap: float | int,
1994
    **kwargs,
1995
) -> gpd.GeoDataFrame:
1996
    """
1997
    Desaggregate PV capacity on buildings within a given grid district.
1998
    Parameters
1999
    ----------
2000
    buildings_gdf : geopandas.GeoDataFrame
2001
        GeoDataFrame containing buildings within the grid district.
2002
    pv_cap : float, int
2003
        PV capacity to desaggregate.
2004
    Other Parameters
2005
    ----------------
2006
    prob_dict : dict
2007
        Dictionary with values and probabilities per capacity range.
2008
    cap_share_dict : dict
2009
        Dictionary with share of PV capacity from the total PV capacity within
2010
        capacity ranges.
2011
    building_area_range_dict : dict
2012
        Dictionary with estimated normal building area range per capacity
2013
        range.
2014
    load_factor_dict : dict
2015
        Dictionary with mean roof load factor per capacity range.
2016
    seed : int
2017
        Seed to use for random operations with NumPy and pandas.
2018
    pv_cap_per_sq_m : float, int
2019
        Average expected installable PV capacity per square meter.
2020
    Returns
2021
    -------
2022
    geopandas.GeoDataFrame
2023
        GeoDataFrame containing OSM building data with desaggregated PV
2024
        plants.
2025
    """
2026
    bus_id = int(buildings_gdf.bus_id.iat[0])
2027
2028
    rng = default_rng(seed=kwargs["seed"])
2029
    random_state = RandomState(seed=kwargs["seed"])
2030
2031
    results_df = pd.DataFrame(columns=buildings_gdf.columns)
2032
2033
    for cap_range, share in kwargs["cap_share_dict"].items():
2034
        pv_cap_range = pv_cap * share
2035
2036
        b_area_min, b_area_max = kwargs["building_area_range_dict"][cap_range]
2037
2038
        cap_range_buildings_gdf = buildings_gdf.loc[
2039
            ~buildings_gdf.index.isin(results_df.index)
2040
            & (buildings_gdf.building_area > b_area_min)
2041
            & (buildings_gdf.building_area <= b_area_max)
2042
        ]
2043
2044
        mean_load_factor = kwargs["load_factor_dict"][cap_range]
2045
        cap_range_buildings_gdf = cap_range_buildings_gdf.assign(
2046
            mean_cap=cap_range_buildings_gdf.max_cap * mean_load_factor,
2047
            load_factor=np.nan,
2048
            capacity=np.nan,
2049
        )
2050
2051
        total_mean_cap = cap_range_buildings_gdf.mean_cap.sum()
2052
2053
        if total_mean_cap == 0:
2054
            logger.warning(
2055
                f"There are no matching roof for capacity range {cap_range} "
2056
                f"kW in grid {bus_id}. Using all buildings as fallback."
2057
            )
2058
2059
            cap_range_buildings_gdf = buildings_gdf.loc[
2060
                ~buildings_gdf.index.isin(results_df.index)
2061
            ]
2062
2063
            if len(cap_range_buildings_gdf) == 0:
2064
                logger.warning(
2065
                    "There are no roofes available for capacity range "
2066
                    f"{cap_range} kW in grid {bus_id}. Allowing dual use."
2067
                )
2068
                cap_range_buildings_gdf = buildings_gdf.copy()
2069
2070
            cap_range_buildings_gdf = cap_range_buildings_gdf.assign(
2071
                mean_cap=cap_range_buildings_gdf.max_cap * mean_load_factor,
2072
                load_factor=np.nan,
2073
                capacity=np.nan,
2074
            )
2075
2076
            total_mean_cap = cap_range_buildings_gdf.mean_cap.sum()
2077
2078
        elif total_mean_cap < pv_cap_range:
2079
            logger.warning(
2080
                f"Average roof utilization of the roof area in grid {bus_id} "
2081
                f"and capacity range {cap_range} kW is not sufficient. The "
2082
                "roof utilization will be above average."
2083
            )
2084
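        # Share of buildings to sample: just enough for the expected mean
        # capacity to cover the range target, but at least one building.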
2085
        frac = max(
2086
            pv_cap_range / total_mean_cap,
2087
            1 / len(cap_range_buildings_gdf),
2088
        )
2089
2090
        samples_gdf = cap_range_buildings_gdf.sample(
2091
            frac=min(1, frac),
2092
            random_state=random_state,
2093
        )
2094
2095
        cap_range_dict = kwargs["prob_dict"][cap_range]
2096
2097
        values_dict = cap_range_dict["values"]
2098
        p_dict = cap_range_dict["probabilities"]
2099
2100
        load_factors = rng.choice(
2101
            a=values_dict["load_factor"],
2102
            size=len(samples_gdf),
2103
            p=p_dict["load_factor"],
2104
        )
2105
2106
        samples_gdf = samples_gdf.assign(
2107
            load_factor=load_factors,
2108
            capacity=(
2109
                samples_gdf.building_area
2110
                * load_factors
2111
                * kwargs["pv_cap_per_sq_m"]
2112
            ).clip(lower=0.4),
2113
        )
2114
2115
        missing_factor = pv_cap_range / samples_gdf.capacity.sum()
2116
2117
        samples_gdf = samples_gdf.assign(
2118
            capacity=(samples_gdf.capacity * missing_factor),
2119
            load_factor=(samples_gdf.load_factor * missing_factor),
2120
        )
2121
2122
        assert np.isclose(
2123
            samples_gdf.capacity.sum(),
2124
            pv_cap_range,
2125
            rtol=1e-03,
2126
        ), f"{samples_gdf.capacity.sum()} != {pv_cap_range}"
2127
2128
        results_df = pd.concat(
2129
            [
2130
                results_df,
2131
                samples_gdf,
2132
            ],
2133
        )
2134
2135
    total_missing_factor = pv_cap / results_df.capacity.sum()
2136
2137
    results_df = results_df.assign(
2138
        capacity=(results_df.capacity * total_missing_factor),
2139
    )
2140
2141
    assert np.isclose(
2142
        results_df.capacity.sum(),
2143
        pv_cap,
2144
        rtol=1e-03,
2145
    ), f"{results_df.capacity.sum()} != {pv_cap}"
2146
2147
    return gpd.GeoDataFrame(
2148
        results_df,
2149
        crs=buildings_gdf.crs,  # same CRS; samples_gdf is unbound if the loop never ran
2150
        geometry="geom",
2151
    )
2152
2153
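# Sketch of the rescaling at the end of each capacity range: if the sampled
# roofs add up to 80 kW but the range target is 100 kW, capacities and load
# factors are scaled by 100 / 80 = 1.25 so the target is met exactly.
#
# >>> samples = pd.Series([30.0, 50.0])
# >>> float((samples * (100.0 / samples.sum())).sum())
# 100.0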
2154
@timer_func
2155
def desaggregate_pv(
2156
    buildings_gdf: gpd.GeoDataFrame,
2157
    cap_df: pd.DataFrame,
2158
    **kwargs,
2159
) -> gpd.GeoDataFrame:
2160
    """
2161
    Desaggregate PV capacity onto buildings for all MV grid districts.
2162
    Parameters
2163
    ----------
2164
    buildings_gdf : geopandas.GeoDataFrame
2165
        GeoDataFrame containing OSM buildings data.
2166
    cap_df : pandas.DataFrame
2167
        DataFrame with total rooftop capacity per MV grid district.
2168
    Other Parameters
2169
    ----------------
2170
    prob_dict : dict
2171
        Dictionary with values and probabilities per capacity range.
2172
    cap_share_dict : dict
2173
        Dictionary with share of PV capacity from the total PV capacity within
2174
        capacity ranges.
2175
    building_area_range_dict : dict
2176
        Dictionary with estimated normal building area range per capacity
2177
        range.
2178
    load_factor_dict : dict
2179
        Dictionary with mean roof load factor per capacity range.
2180
    seed : int
2181
        Seed to use for random operations with NumPy and pandas.
2182
    pv_cap_per_sq_m : float, int
2183
        Average expected installable PV capacity per square meter.
2184
    Returns
2185
    -------
2186
    geopandas.GeoDataFrame
2187
        GeoDataFrame containing OSM building data with desaggregated PV
2188
        plants.
2189
    """
2190
    allocated_buildings_gdf = buildings_gdf.loc[~buildings_gdf.end_of_life]
2191
2192
    building_bus_ids = set(buildings_gdf.bus_id)
2193
    cap_bus_ids = set(cap_df.index)
2194
2195
    logger.debug(
2196
        f"Bus IDs from buildings: {len(building_bus_ids)}\nBus IDs from capacity: "
2197
        f"{len(cap_bus_ids)}"
2198
    )
2199
2200
    if len(building_bus_ids) > len(cap_bus_ids):
2201
        missing = building_bus_ids - cap_bus_ids
2202
    else:
2203
        missing = cap_bus_ids - building_bus_ids
2204
2205
    logger.debug(str(missing))
2206
2207
    # assert set(buildings_gdf.bus_id.unique()) == set(cap_df.index)
2208
2209
    for bus_id in buildings_gdf.bus_id.unique():
2210
        buildings_grid_gdf = buildings_gdf.loc[buildings_gdf.bus_id == bus_id]
2211
2212
        pv_installed_gdf = buildings_grid_gdf.loc[
2213
            ~buildings_grid_gdf.end_of_life
2214
        ]
2215
2216
        pv_installed = pv_installed_gdf.capacity.sum()
2217
2218
        pot_buildings_gdf = buildings_grid_gdf.drop(
2219
            index=pv_installed_gdf.index
2220
        )
2221
2222
        if len(pot_buildings_gdf) == 0:
2223
            logger.error(
2224
                f"In grid {bus_id} there are no potential buildings to allocate "
2225
                "PV capacity to. The grid is skipped. This message should only "
2226
                "appear doing test runs with few buildings."
2227
            )
2228
2229
            continue
2230
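        # cap_df holds the scenario target per grid district, apparently in
        # MW (note the factor of 1000), while building capacities are in kW.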
2231
        pv_target = cap_df.at[bus_id, "capacity"] * 1000
2232
2233
        pv_missing = pv_target - pv_installed
2234
2235
        if pv_missing <= 0:
2236
            logger.info(
2237
                f"In grid {bus_id} there is more PV installed ({pv_installed: g}) in "
2238
                f"status Quo than allocated within the scenario ({pv_target: g}). No "
2239
                f"new generators are added."
2240
            )
2241
2242
            continue
2243
2244
        if pot_buildings_gdf.max_cap.sum() < pv_missing:
2245
            logger.error(
2246
                f"In grid {bus_id} there is less PV potential ("
2247
                f"{pot_buildings_gdf.max_cap.sum():g} kW) than allocated PV "
2248
                f"capacity ({pv_missing:g} kW). The grid is skipped. This message "
2249
                "should only appear doing test runs with few buildings."
2250
            )
2251
2252
            continue
2253
2254
        gdf = desaggregate_pv_in_mv_grid(
2255
            buildings_gdf=pot_buildings_gdf,
2256
            pv_cap=pv_missing,
2257
            **kwargs,
2258
        )
2259
2260
        # init_len = len(allocated_buildings_gdf)
2261
        #
2262
        # init_cap = allocated_buildings_gdf.capacity.sum()
2263
2264
        allocated_buildings_gdf = pd.concat(
2265
            [
2266
                allocated_buildings_gdf,
2267
                gdf,
2268
            ]
2269
        )
2270
2271
        # assert np.isclose(
2272
        #     init_cap + gdf.capacity.sum(),
2273
        #     allocated_buildings_gdf.capacity.sum(),
2274
        # )
2275
        #
2276
        # assert len(allocated_buildings_gdf) == init_len + len(gdf)
2277
        #
2278
        # assert np.isclose(
2279
        #     pv_missing, gdf.capacity.sum(), rtol=1e-03
2280
        # ), f"{pv_missing} != {gdf.capacity.sum()}"
2281
2282
    # assert np.isclose(
2283
    #     cap_df.loc[buildings_gdf.bus_id.unique()].capacity.sum() * 1000,
2284
    #     allocated_buildings_gdf.capacity.sum(),
2285
    #     rtol=1e-03,
2286
    # ), (
2287
    #     f"{cap_df.loc[buildings_gdf.bus_id.unique()].capacity.sum() * 1000} != "
2288
    #     f"{allocated_buildings_gdf.capacity.sum()}"
2289
    # )
2290
2291
    logger.debug("Desaggregated scenario.")
2292
    logger.debug(f"Scenario capacity: {cap_df.capacity.sum(): g}")
2293
    logger.debug(
2294
        f"Generator capacity: {allocated_buildings_gdf.capacity.sum(): g}"
2295
    )
2296
2297
    return gpd.GeoDataFrame(
2298
        allocated_buildings_gdf,
2299
        crs=buildings_gdf.crs,  # input CRS; gdf is unbound if every grid was skipped
2300
        geometry="geom",
2301
    )
2302
2303
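# Control flow recap for the loop above: per grid district the function
#   1. keeps existing plants that have not reached their end of life,
#   2. computes the missing capacity as pv_target - pv_installed,
#   3. skips the grid if nothing is missing or the potential is too small,
#   4. otherwise distributes the gap via desaggregate_pv_in_mv_grid().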
2304
@timer_func
2305
def add_buildings_meta_data(
2306
    buildings_gdf: gpd.GeoDataFrame,
2307
    prob_dict: dict,
2308
    seed: int,
2309
) -> gpd.GeoDataFrame:
2310
    """
2311
    Randomly add additional metadata to desaggregated PV plants.
2312
    Parameters
2313
    ----------
2314
    buildings_gdf : geopandas.GeoDataFrame
2315
        GeoDataFrame containing OSM buildings data with desaggregated PV
2316
        plants.
2317
    prob_dict : dict
2318
        Dictionary with values and probabilities per capacity range.
2319
    seed : int
2320
        Seed to use for random operations with NumPy and pandas.
2321
    Returns
2322
    -------
2323
    geopandas.GeoDataFrame
2324
        GeoDataFrame containing OSM building data with desaggregated PV
2325
        plants.
2326
    """
2327
    rng = default_rng(seed=seed)
2328
    buildings_gdf = buildings_gdf.reset_index().rename(
2329
        columns={
2330
            "index": "building_id",
2331
        }
2332
    )
2333
2334
    for (min_cap, max_cap), cap_range_prob_dict in prob_dict.items():
2335
        cap_range_gdf = buildings_gdf.loc[
2336
            (buildings_gdf.capacity >= min_cap)
2337
            & (buildings_gdf.capacity < max_cap)
2338
        ]
2339
2340
        for key, values in cap_range_prob_dict["values"].items():
2341
            if key == "load_factor":
2342
                continue
2343
2344
            gdf = cap_range_gdf.loc[
2345
                cap_range_gdf[key].isna()  # .isnull() is just an alias of .isna()
2346
                | (cap_range_gdf[key] == "None")
2348
            ]
2349
2350
            key_vals = rng.choice(
2351
                a=values,
2352
                size=len(gdf),
2353
                p=cap_range_prob_dict["probabilities"][key],
2354
            )
2355
2356
            buildings_gdf.loc[gdf.index, key] = key_vals
2357
2358
    return buildings_gdf
2359
2360
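# Fill sketch: rows with missing metadata get values drawn from the MaStR
# distribution of their capacity range (options and weights made up here):
#
# >>> rng = default_rng(seed=0)
# >>> rng.choice(a=["Süd", "Ost", "West"], size=4, p=[0.6, 0.2, 0.2])
# array([...], dtype='<U4')  # four orientations drawn with the given weights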
2361
def add_voltage_level(
2362
    buildings_gdf: gpd.GeoDataFrame,
2363
) -> gpd.GeoDataFrame:
2364
    """
2365
    Add voltage level derived from generator capacity to the power plants.
2366
    Parameters
2367
    ----------
2368
    buildings_gdf : geopandas.GeoDataFrame
2369
        GeoDataFrame containing OSM buildings data with desaggregated PV
2370
        plants.
2371
    Returns
2372
    -------
2373
    geopandas.GeoDataFrame
2374
        GeoDataFrame containing OSM building data with voltage level per generator.
2375
    """
2376
2377
    def voltage_levels(p: float) -> int:
2378
        if p < 100:
2379
            return 7
2380
        elif p < 200:
2381
            return 6
2382
        elif p < 5500:
2383
            return 5
2384
        elif p < 20000:
2385
            return 4
2386
        elif p < 120000:
2387
            return 3
2388
        return 1
2389
2390
    return buildings_gdf.assign(
2391
        voltage_level=buildings_gdf.capacity.apply(voltage_levels)
2392
    )
2393
2394
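# Threshold table used above (capacity in kW -> voltage level):
# < 100 -> 7, < 200 -> 6, < 5500 -> 5, < 20000 -> 4, < 120000 -> 3, else 1.
#
# >>> add_voltage_level(
# ...     gpd.GeoDataFrame({"capacity": [30.0, 800.0, 150000.0]})
# ... ).voltage_level.tolist()
# [7, 5, 1]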
2395
def add_start_up_date(
2396
    buildings_gdf: gpd.GeoDataFrame,
2397
    start: pd.Timestamp,
2398
    end: pd.Timestamp,
2399
    seed: int,
2400
):
2401
    """
2402
    Randomly add uniformly distributed start-up dates to new PV generators.
2403
    Parameters
2404
    ----------
2405
    buildings_gdf : geopandas.GeoDataFrame
2406
        GeoDataFrame containing OSM buildings data with desaggregated PV
2407
        plants.
2408
    start : pandas.Timestamp
2409
        Minimum Timestamp to use.
2410
    end : pandas.Timestamp
2411
        Maximum Timestamp to use.
2412
    seed : int
2413
        Seed to use for random operations with NumPy and pandas.
2414
    Returns
2415
    -------
2416
    geopandas.GeoDataFrame
2417
        GeoDataFrame containing OSM buildings data with start-up date added.
2418
    """
2419
    rng = default_rng(seed=seed)
2420
2421
    date_range = pd.date_range(start=start, end=end, freq="1D")
2422
2423
    return buildings_gdf.assign(
2424
        start_up_date=rng.choice(date_range, size=len(buildings_gdf))
2425
    )
2426
2427
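# Sketch of the drawing step (output depends on the seed and is omitted):
#
# >>> rng = default_rng(seed=SEED)
# >>> rng.choice(pd.date_range("2035-01-02", "2040-01-01", freq="1D"), size=3)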
2428
@timer_func
2429
def allocate_scenarios(
2430
    mastr_gdf: gpd.GeoDataFrame,
2431
    buildings_gdf: gpd.GeoDataFrame,
2432
    last_scenario_gdf: gpd.GeoDataFrame,
2433
    scenario: str,
2434
):
2435
    """
2436
    Desaggregate and allocate scenario PV rooftop ramp-ups onto buildings.
2437
    Parameters
2438
    ----------
2439
    mastr_gdf : geopandas.GeoDataFrame
2440
        GeoDataFrame containing geocoded MaStR data.
2441
    buildings_gdf : geopandas.GeoDataFrame
2442
        GeoDataFrame containing OSM buildings data.
2443
    last_scenario_gdf : geopandas.GeoDataFrame
2444
        GeoDataFrame containing OSM buildings matched with PV generators from
2445
        the temporally preceding scenario.
2446
    scenario : str
2447
        Scenario to desaggregate and allocate.
2448
    Returns
2449
    -------
2450
    tuple
2451
        geopandas.GeoDataFrame
2452
            GeoDataFrame containing OSM buildings matched with PV generators.
2453
        pandas.DataFrame
2454
            DataFrame containing PV rooftop capacity per grid ID.
2455
    """
2456
    grid_districts_gdf = grid_districts(EPSG)
2457
2458
    federal_state_gdf = federal_state_data(grid_districts_gdf.crs)
2459
2460
    grid_federal_state_gdf = overlay_grid_districts_with_counties(
2461
        grid_districts_gdf,
2462
        federal_state_gdf,
2463
    )
2464
2465
    buildings_overlay_gdf = add_overlay_id_to_buildings(
2466
        buildings_gdf,
2467
        grid_federal_state_gdf,
2468
    )
2469
2470
    valid_buildings_gdf = drop_buildings_outside_grids(buildings_overlay_gdf)
2471
2472
    cap_per_bus_id_df = cap_per_bus_id(scenario)
2473
2474
    logger.debug(
2475
        f"cap_per_bus_id_df total capacity: {cap_per_bus_id_df.capacity.sum()}"
2476
    )
2477
2478
    last_scenario_gdf = determine_end_of_life_gens(
2479
        last_scenario_gdf,
2480
        SCENARIO_TIMESTAMP[scenario],
2481
        PV_ROOFTOP_LIFETIME,
2482
    )
2483
2484
    buildings_gdf = calculate_max_pv_cap_per_building(
2485
        valid_buildings_gdf,
2486
        last_scenario_gdf,
2487
        PV_CAP_PER_SQ_M,
2488
        ROOF_FACTOR,
2489
    )
2490
2491
    mastr_gdf = calculate_building_load_factor(
2492
        mastr_gdf,
2493
        buildings_gdf,
2494
    )
2495
2496
    probabilities_dict = probabilities(
2497
        mastr_gdf,
2498
        cap_ranges=CAP_RANGES,
2499
    )
2500
2501
    cap_share_dict = cap_share_per_cap_range(
2502
        mastr_gdf,
2503
        cap_ranges=CAP_RANGES,
2504
    )
2505
2506
    load_factor_dict = mean_load_factor_per_cap_range(
2507
        mastr_gdf,
2508
        cap_ranges=CAP_RANGES,
2509
    )
2510
2511
    building_area_range_dict = building_area_range_per_cap_range(
2512
        mastr_gdf,
2513
        cap_ranges=CAP_RANGES,
2514
        min_building_size=MIN_BUILDING_SIZE,
2515
        upper_quantile=UPPER_QUNATILE,
2516
        lower_quantile=LOWER_QUANTILE,
2517
    )
2518
2519
    allocated_buildings_gdf = desaggregate_pv(
2520
        buildings_gdf=buildings_gdf,
2521
        cap_df=cap_per_bus_id_df,
2522
        prob_dict=probabilities_dict,
2523
        cap_share_dict=cap_share_dict,
2524
        building_area_range_dict=building_area_range_dict,
2525
        load_factor_dict=load_factor_dict,
2526
        seed=SEED,
2527
        pv_cap_per_sq_m=PV_CAP_PER_SQ_M,
2528
    )
2529
2530
    allocated_buildings_gdf = allocated_buildings_gdf.assign(scenario=scenario)
2531
2532
    meta_buildings_gdf = frame_to_numeric(
2533
        add_buildings_meta_data(
2534
            allocated_buildings_gdf,
2535
            probabilities_dict,
2536
            SEED,
2537
        )
2538
    )
2539
2540
    return (
2541
        add_start_up_date(
2542
            meta_buildings_gdf,
2543
            start=last_scenario_gdf.start_up_date.max(),
2544
            end=SCENARIO_TIMESTAMP[scenario],
2545
            seed=SEED,
2546
        ),
2547
        cap_per_bus_id_df,
2548
    )
2549
2550
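# Typical call, as issued from pv_rooftop_to_buildings() further below
# (argument names are placeholders):
#
# >>> scenario_gdf, cap_df = allocate_scenarios(
# ...     mastr_gdf, buildings_gdf, status_quo_gdf, "eGon2035"
# ... )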
2551
class EgonPowerPlantPvRoofBuildingScenario(Base):
2552
    __tablename__ = "egon_power_plants_pv_roof_building"
2553
    __table_args__ = {"schema": "supply"}
2554
2555
    index = Column(Integer, primary_key=True, index=True)
2556
    scenario = Column(String)
2557
    building_id = Column(Integer)
2558
    gens_id = Column(String, nullable=True)
2559
    capacity = Column(Float)
2560
    einheitliche_ausrichtung_und_neigungswinkel = Column(Float)
2561
    hauptausrichtung = Column(String)
2562
    hauptausrichtung_neigungswinkel = Column(String)
2563
    voltage_level = Column(Integer)
2564
2565
2566
def create_scenario_table(buildings_gdf):
2567
    """Create mapping table pv_unit <-> building for scenario"""
2568
    EgonPowerPlantPvRoofBuildingScenario.__table__.drop(
2569
        bind=engine, checkfirst=True
2570
    )
2571
    EgonPowerPlantPvRoofBuildingScenario.__table__.create(
2572
        bind=engine, checkfirst=True
2573
    )
2574
2575
    buildings_gdf.rename(columns=COLS_TO_RENAME)[
2576
        COLS_TO_EXPORT
2577
    ].reset_index().to_sql(
2578
        name=EgonPowerPlantPvRoofBuildingScenario.__table__.name,
2579
        schema=EgonPowerPlantPvRoofBuildingScenario.__table__.schema,
2580
        con=db.engine(),
2581
        if_exists="append",
2582
        index=False,
2583
    )
2584
2585
2586
def geocode_mastr_data():
2587
    """
2588
    Read PV rooftop data from the MaStR CSV.
2589
    TODO: the source will be replaced as soon as the MaStR data is available
2590
    in the DB.
2591
    """
2592
    mastr_df = mastr_data(
2593
        MASTR_INDEX_COL,
2594
        MASTR_RELEVANT_COLS,
2595
        MASTR_DTYPES,
2596
        MASTR_PARSE_DATES,
2597
    )
2598
2599
    clean_mastr_df = clean_mastr_data(
2600
        mastr_df,
2601
        max_realistic_pv_cap=MAX_REALISTIC_PV_CAP,
2602
        min_realistic_pv_cap=MIN_REALISTIC_PV_CAP,
2603
        seed=SEED,
2604
        rounding=ROUNDING,
2605
    )
2606
2607
    geocoding_df = geocoding_data(clean_mastr_df)
2608
2609
    ratelimiter = geocoder(USER_AGENT, MIN_DELAY_SECONDS)
2610
2611
    geocode_gdf = geocode_data(geocoding_df, ratelimiter, EPSG)
2612
2613
    create_geocoded_table(geocode_gdf)
2614
2615
2616
def pv_rooftop_to_buildings():
2617
    """Main script, executed as task"""
2618
2619
    mastr_gdf = load_mastr_data()
2620
2621
    buildings_gdf = load_building_data()
2622
2623
    desagg_mastr_gdf, desagg_buildings_gdf = allocate_to_buildings(
2624
        mastr_gdf, buildings_gdf
2625
    )
2626
2627
    all_buildings_gdf = (
2628
        desagg_mastr_gdf.assign(scenario="status_quo")
2629
        .reset_index()
2630
        .rename(columns={"geometry": "geom", "EinheitMastrNummer": "gens_id"})
2631
    )
2632
2633
    scenario_buildings_gdf = all_buildings_gdf.copy()
2634
2635
    cap_per_bus_id_df = pd.DataFrame()
2636
2637
    for scenario in SCENARIOS:
2638
        logger.debug(f"Desaggregating scenario {scenario}.")
2639
        (
2640
            scenario_buildings_gdf,
2641
            cap_per_bus_id_scenario_df,
2642
        ) = allocate_scenarios(  # noqa: F841
2643
            desagg_mastr_gdf,
2644
            desagg_buildings_gdf,
2645
            scenario_buildings_gdf,
2646
            scenario,
2647
        )
2648
2649
        all_buildings_gdf = gpd.GeoDataFrame(
2650
            pd.concat(
2651
                [all_buildings_gdf, scenario_buildings_gdf], ignore_index=True
2652
            ),
2653
            crs=scenario_buildings_gdf.crs,
2654
            geometry="geom",
2655
        )
2656
2657
        cap_per_bus_id_df = pd.concat(
2658
            [cap_per_bus_id_df, cap_per_bus_id_scenario_df]
2659
        )
2660
2661
    # export scenario
2662
    create_scenario_table(add_voltage_level(all_buildings_gdf))
2663