Passed
Pull Request — dev (#1063)
by
unknown
01:34
created

data.datasets.power_plants.pv_rooftop_buildings   F

Complexity

Total Complexity 121

Size/Duplication

Total Lines 2830
Duplicated Lines 1.55 %

Importance

Changes 0
Metric Value
wmc 121
eloc 1274
dl 44
loc 2830
rs 0.8
c 0
b 0
f 0

54 Functions

Rating   Name   Duplication   Size   Complexity  
A egon_building_peak_loads() 0 9 1
B load_building_data() 0 115 3
A get_probability_for_property() 0 46 2
A synthetic_buildings() 0 25 2
A calculate_building_load_factor() 0 31 1
A determine_end_of_life_gens() 0 39 1
A add_start_up_date() 0 30 1
A geocoding_data() 0 18 1
A geocoded_data_from_db() 0 23 2
B add_voltage_level() 12 55 7
A cap_per_bus_id() 0 28 1
A add_overlay_id_to_buildings() 0 32 1
B desaggregate_pv_in_mv_grid() 0 161 5
A calculate_max_pv_cap_per_building() 0 50 1
A add_weather_cell_id() 0 35 2
A add_ags_to_gens() 0 24 1
A drop_buildings_outside_muns() 0 25 1
B clean_mastr_data() 0 143 2
A mastr_data() 0 60 2
A scenario_data() 0 30 2
A geocode_data() 0 46 3
B zip_and_municipality_from_standort() 0 50 7
A merge_geocode_with_mastr() 0 27 1
A drop_buildings_outside_grids() 0 25 1
C building_area_range_per_cap_range() 0 87 8
A municipality_data() 0 14 2
A osm_buildings() 0 25 2
A add_ags_to_buildings() 0 24 1
A timer_func() 0 13 1
A add_buildings_meta_data() 0 55 4
C allocate_pv() 0 121 7
A sort_and_qcut_df() 0 29 1
A federal_state_data() 0 24 2
A geocode_mastr_data() 0 28 1
A create_geocoded_table() 0 17 1
A mean_load_factor_per_cap_range() 0 38 3
C desaggregate_pv() 0 150 7
A grid_districts() 0 32 1
A load_mastr_data() 0 35 1
A frame_to_numeric() 0 20 4
A drop_gens_outside_muns() 0 25 1
A geocoder() 0 22 1
A pv_rooftop_to_buildings() 0 53 2
A allocate_to_buildings() 0 32 1
A add_bus_ids_sq() 0 24 1
A overlay_grid_districts_with_counties() 0 33 1
B allocate_scenarios() 0 104 1
A drop_invalid_entries_from_gdf() 0 25 1
A create_scenario_table() 0 17 1
A validate_output() 0 50 1
A cap_share_per_cap_range() 0 43 3
A drop_unallocated_gens() 0 27 1
A most_plausible() 0 41 4
B probabilities() 0 57 5

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like data.datasets.power_plants.pv_rooftop_buildings often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Distribute MaStR PV rooftop capacities to OSM and synthetic buildings. Generate
3
new PV rooftop generators for scenarios eGon2035 and eGon100RE.
4
5
Data cleaning and inference:
6
* Drop duplicates and entries with missing critical data.
7
* Determine most plausible capacity from multiple values given in MaStR data.
8
* Drop generators which don't have any plausible capacity data
9
  (23.5MW > P > 0.1).
10
* Randomly and weighted add a start-up date if it is missing.
11
* Extract zip and municipality from 'Standort' given in MaStR data.
12
* Geocode unique zip and municipality combinations with Nominatim (1 sec
13
  delay). Drop generators for which geocoding failed or which are located
14
  outside the municipalities of Germany.
15
* Add some visual sanity checks for cleaned data.
16
17
Allocation of MaStR data:
18
* Allocate each generator to an existing building from OSM.
19
* Determine the quantile each generator and building is in depending on the
20
  capacity of the generator and the area of the polygon of the building.
21
* Randomly distribute generators within each municipality preferably within
22
  the same building area quantile as the generators are capacity wise.
23
* If not enough buildings exists within a municipality and quantile additional
24
  buildings from other quantiles are chosen randomly.
25
26
Desegregation of pv rooftop scenarios:
27
* The scenario data per federal state is linearly distributed to the mv grid
28
  districts according to the pv rooftop potential per mv grid district.
29
* The rooftop potential is estimated from the building area given from the OSM
30
  buildings.
31
* Grid districts, which are located in several federal states, are allocated
32
  PV capacity according to their respective roof potential in the individual
33
  federal states.
34
* The desegregation of PV plants within a grid districts respects existing
35
  plants from MaStR, which did not reach their end of life.
36
* New PV plants are randomly and weighted generated using a breakdown of MaStR
37
  data as generator basis.
38
* Plant metadata (e.g. plant orientation) is also added random and weighted
39
  from MaStR data as basis.
40
"""
41
from __future__ import annotations
42
43
from collections import Counter
44
from functools import wraps
45
from pathlib import Path
46
from time import perf_counter
47
from typing import Any
48
49
from geoalchemy2 import Geometry
50
from geopy.extra.rate_limiter import RateLimiter
51
from geopy.geocoders import Nominatim
52
from loguru import logger
53
from numpy.random import RandomState, default_rng
54
from pyproj.crs.crs import CRS
55
from sqlalchemy import BigInteger, Column, Float, Integer, String
56
from sqlalchemy.dialects.postgresql import HSTORE
57
from sqlalchemy.ext.declarative import declarative_base
58
import geopandas as gpd
59
import numpy as np
60
import pandas as pd
61
62
from egon.data import config, db
63
from egon.data.datasets.electricity_demand_timeseries.hh_buildings import (
64
    OsmBuildingsSynthetic,
65
)
66
from egon.data.datasets.mastr import WORKING_DIR_MASTR_NEW
67
from egon.data.datasets.power_plants.mastr import EgonPowerPlantsPv
68
from egon.data.datasets.scenario_capacities import EgonScenarioCapacities
69
from egon.data.datasets.zensus_vg250 import Vg250Gem
70
71
engine = db.engine()
72
Base = declarative_base()
73
SEED = int(config.settings()["egon-data"]["--random-seed"])
74
75
# TODO: move to yml
76
# mastr data
77
MASTR_RELEVANT_COLS = [
78
    "EinheitMastrNummer",
79
    "Bruttoleistung",
80
    "Bruttoleistung_extended",
81
    "Nettonennleistung",
82
    "InstallierteLeistung",
83
    "zugeordneteWirkleistungWechselrichter",
84
    "EinheitBetriebsstatus",
85
    "Standort",
86
    "Bundesland",
87
    "Land",
88
    "Landkreis",
89
    "Gemeinde",
90
    "Postleitzahl",
91
    "Ort",
92
    "GeplantesInbetriebnahmedatum",
93
    "Inbetriebnahmedatum",
94
    "GemeinsamerWechselrichterMitSpeicher",
95
    "Lage",
96
    "Leistungsbegrenzung",
97
    "EinheitlicheAusrichtungUndNeigungswinkel",
98
    "Hauptausrichtung",
99
    "HauptausrichtungNeigungswinkel",
100
    "Nebenausrichtung",
101
]
102
103
MASTR_DTYPES = {
104
    "EinheitMastrNummer": str,
105
    "Bruttoleistung": float,
106
    "Bruttoleistung_extended": float,
107
    "Nettonennleistung": float,
108
    "InstallierteLeistung": float,
109
    "zugeordneteWirkleistungWechselrichter": float,
110
    "EinheitBetriebsstatus": str,
111
    "Standort": str,
112
    "Bundesland": str,
113
    "Land": str,
114
    "Landkreis": str,
115
    "Gemeinde": str,
116
    # "Postleitzahl": int,  # fails because of nan values
117
    "Ort": str,
118
    "GemeinsamerWechselrichterMitSpeicher": str,
119
    "Lage": str,
120
    "Leistungsbegrenzung": str,
121
    # this will parse nan values as false wich is not always correct
122
    # "EinheitlicheAusrichtungUndNeigungswinkel": bool,
123
    "Hauptausrichtung": str,
124
    "HauptausrichtungNeigungswinkel": str,
125
    "Nebenausrichtung": str,
126
    "NebenausrichtungNeigungswinkel": str,
127
}
128
129
MASTR_PARSE_DATES = [
130
    "GeplantesInbetriebnahmedatum",
131
    "Inbetriebnahmedatum",
132
]
133
134
MASTR_INDEX_COL = "EinheitMastrNummer"
135
136
EPSG = 4326
137
SRID = 3035
138
139
# data cleaning
140
MAX_REALISTIC_PV_CAP = 23500
141
MIN_REALISTIC_PV_CAP = 0.1
142
ROUNDING = 1
143
144
# geopy
145
MIN_DELAY_SECONDS = 1
146
USER_AGENT = "rli_kh_geocoder"
147
148
# show additional logging information
149
VERBOSE = False
150
151
EXPORT_DIR = Path(__name__).resolve().parent / "data"
152
EXPORT_FILE = "mastr_geocoded.gpkg"
153
EXPORT_PATH = EXPORT_DIR / EXPORT_FILE
154
DRIVER = "GPKG"
155
156
# Number of quantiles
157
Q = 5
158
159
# Scenario Data
160
CARRIER = "solar_rooftop"
161
SCENARIOS = ["eGon2035", "eGon100RE"]
162
SCENARIO_TIMESTAMP = {
163
    "eGon2035": pd.Timestamp("2035-01-01", tz="UTC"),
164
    "eGon100RE": pd.Timestamp("2050-01-01", tz="UTC"),
165
}
166
PV_ROOFTOP_LIFETIME = pd.Timedelta(20 * 365, unit="D")
167
168
# Example Modul Trina Vertex S TSM-400DE09M.08 400 Wp
169
# https://www.photovoltaik4all.de/media/pdf/92/64/68/Trina_Datasheet_VertexS_DE09-08_2021_A.pdf
170
MODUL_CAP = 0.4  # kWp
171
MODUL_SIZE = 1.096 * 1.754  # m²
172
PV_CAP_PER_SQ_M = MODUL_CAP / MODUL_SIZE
173
174
# Estimation of usable roof area
175
# Factor for the conversion of building area to roof area
176
# estimation mean roof pitch: 35°
177
# estimation usable roof share: 80%
178
# estimation that only the south side of the building is used for pv
179
# see https://mediatum.ub.tum.de/doc/%20969497/969497.pdf
180
# AREA_FACTOR = 1.221
181
# USABLE_ROOF_SHARE = 0.8
182
# SOUTH_SHARE = 0.5
183
# ROOF_FACTOR = AREA_FACTOR * USABLE_ROOF_SHARE * SOUTH_SHARE
184
ROOF_FACTOR = 0.5
185
186
CAP_RANGES = [
187
    (0, 30),
188
    (30, 100),
189
    (100, float("inf")),
190
]
191
192
MIN_BUILDING_SIZE = 10.0
193
UPPER_QUNATILE = 0.95
194
LOWER_QUANTILE = 0.05
195
196
COLS_TO_RENAME = {
197
    "EinheitlicheAusrichtungUndNeigungswinkel": (
198
        "einheitliche_ausrichtung_und_neigungswinkel"
199
    ),
200
    "Hauptausrichtung": "hauptausrichtung",
201
    "HauptausrichtungNeigungswinkel": "hauptausrichtung_neigungswinkel",
202
}
203
204
COLS_TO_EXPORT = [
205
    "scenario",
206
    "bus_id",
207
    "building_id",
208
    "gens_id",
209
    "capacity",
210
    "einheitliche_ausrichtung_und_neigungswinkel",
211
    "hauptausrichtung",
212
    "hauptausrichtung_neigungswinkel",
213
    "voltage_level",
214
    "weather_cell_id",
215
]
216
217
# TODO
218
INCLUDE_SYNTHETIC_BUILDINGS = True
219
ONLY_BUILDINGS_WITH_DEMAND = True
220
TEST_RUN = False
221
222
223
def timer_func(func):
224
    @wraps(func)
225
    def timeit_wrapper(*args, **kwargs):
226
        start_time = perf_counter()
227
        result = func(*args, **kwargs)
228
        end_time = perf_counter()
229
        total_time = end_time - start_time
230
        logger.debug(
231
            f"Function {func.__name__} took {total_time:.4f} seconds."
232
        )
233
        return result
234
235
    return timeit_wrapper
236
237
238
@timer_func
239
def mastr_data(
240
    index_col: str | int | list[str] | list[int],
241
    usecols: list[str],
242
    dtype: dict[str, Any] | None,
243
    parse_dates: list[str] | None,
244
) -> pd.DataFrame:
245
    """
246
    Read MaStR data from csv.
247
248
    Parameters
249
    -----------
250
    index_col : str, int or list of str or int
251
        Column(s) to use as the row labels of the DataFrame.
252
    usecols : list of str
253
        Return a subset of the columns.
254
    dtype : dict of column (str) -> type (any), optional
255
        Data type for data or columns.
256
    parse_dates : list of names (str), optional
257
        Try to parse given columns to datetime.
258
    Returns
259
    -------
260
    pandas.DataFrame
261
        DataFrame containing MaStR data.
262
    """
263
    mastr_path = Path(
264
        WORKING_DIR_MASTR_NEW
265
        / config.datasets()["power_plants"]["sources"]["mastr_pv"]
266
    ).resolve()
267
268
    mastr_df = pd.read_csv(
269
        mastr_path,
270
        index_col=index_col,
271
        usecols=usecols,
272
        dtype=dtype,
273
        parse_dates=parse_dates,
274
    )
275
276
    mastr_df = mastr_df.loc[
277
        (mastr_df.EinheitBetriebsstatus == "InBetrieb")
278
        & (mastr_df.Land == "Deutschland")
279
        & (mastr_df.Lage == "Bauliche Anlagen (Hausdach, Gebäude und Fassade)")
280
    ]
281
282
    if (
283
        config.settings()["egon-data"]["--dataset-boundary"]
284
        == "Schleswig-Holstein"
285
    ):
286
        init_len = len(mastr_df)
287
288
        mastr_df = mastr_df.loc[mastr_df.Bundesland == "SchleswigHolstein"]
289
290
        logger.info(
291
            f"Using only MaStR data within Schleswig-Holstein. "
292
            f"{init_len - len(mastr_df)} of {init_len} generators are dropped."
293
        )
294
295
    logger.debug("MaStR data loaded.")
296
297
    return mastr_df
298
299
300
@timer_func
301
def clean_mastr_data(
302
    mastr_df: pd.DataFrame,
303
    max_realistic_pv_cap: int | float,
304
    min_realistic_pv_cap: int | float,
305
    rounding: int,
306
    seed: int,
307
) -> pd.DataFrame:
308
    """
309
    Clean the MaStR data from implausible data.
310
311
    * Drop MaStR ID duplicates.
312
    * Drop generators with implausible capacities.
313
    * Drop generators without any kind of start-up date.
314
    * Clean up Standort column and capacity.
315
316
    Parameters
317
    -----------
318
    mastr_df : pandas.DataFrame
319
        DataFrame containing MaStR data.
320
    max_realistic_pv_cap : int or float
321
        Maximum capacity, which is considered to be realistic.
322
    min_realistic_pv_cap : int or float
323
        Minimum capacity, which is considered to be realistic.
324
    rounding : int
325
        Rounding to use when cleaning up capacity. E.g. when
326
        rounding is 1 a capacity of 9.93 will be rounded to 9.9.
327
    seed : int
328
        Seed to use for random operations with NumPy and pandas.
329
    Returns
330
    -------
331
    pandas.DataFrame
332
        DataFrame containing cleaned MaStR data.
333
    """
334
    init_len = len(mastr_df)
335
336
    # drop duplicates
337
    mastr_df = mastr_df.loc[~mastr_df.index.duplicated()]
338
339
    # drop invalid entries in standort
340
    index_to_drop = mastr_df.loc[
341
        (mastr_df.Standort.isna()) | (mastr_df.Standort.isnull())
342
    ].index
343
344
    mastr_df = mastr_df.loc[~mastr_df.index.isin(index_to_drop)]
345
346
    df = mastr_df[
347
        [
348
            "Bruttoleistung",
349
            "Bruttoleistung_extended",
350
            "Nettonennleistung",
351
            "zugeordneteWirkleistungWechselrichter",
352
            "InstallierteLeistung",
353
        ]
354
    ].round(rounding)
355
356
    # use only the smallest capacity rating if multiple are given
357
    mastr_df = mastr_df.assign(
358
        capacity=[
359
            most_plausible(p_tub, min_realistic_pv_cap)
360
            for p_tub in df.itertuples(index=False)
361
        ]
362
    )
363
364
    # drop generators without any capacity info
365
    # and capacity of zero
366
    # and if the capacity is > 23.5 MW, because
367
    # Germanies largest rooftop PV is 23 MW
368
    # https://www.iwr.de/news/groesste-pv-dachanlage-europas-wird-in-sachsen-anhalt-gebaut-news37379
369
    mastr_df = mastr_df.loc[
370
        (~mastr_df.capacity.isna())
371
        & (mastr_df.capacity <= max_realistic_pv_cap)
372
        & (mastr_df.capacity > min_realistic_pv_cap)
373
    ]
374
375
    # get zip and municipality
376
    mastr_df[["zip_and_municipality", "drop_this"]] = pd.DataFrame(
377
        mastr_df.Standort.astype(str)
378
        .apply(
379
            zip_and_municipality_from_standort,
380
            args=(VERBOSE,),
381
        )
382
        .tolist(),
383
        index=mastr_df.index,
384
    )
385
386
    # drop invalid entries
387
    mastr_df = mastr_df.loc[mastr_df.drop_this].drop(columns="drop_this")
388
389
    # add ", Deutschland" just in case
390
    mastr_df = mastr_df.assign(
391
        zip_and_municipality=(mastr_df.zip_and_municipality + ", Deutschland")
392
    )
393
394
    # get consistent start-up date
395
    mastr_df = mastr_df.assign(
396
        start_up_date=mastr_df.Inbetriebnahmedatum,
397
    )
398
399
    mastr_df.loc[mastr_df.start_up_date.isna()] = mastr_df.loc[
400
        mastr_df.start_up_date.isna()
401
    ].assign(
402
        start_up_date=mastr_df.GeplantesInbetriebnahmedatum.loc[
403
            mastr_df.start_up_date.isna()
404
        ]
405
    )
406
407
    # randomly and weighted fill missing start-up dates
408
    pool = mastr_df.loc[
409
        ~mastr_df.start_up_date.isna()
410
    ].start_up_date.to_numpy()
411
412
    size = len(mastr_df) - len(pool)
413
414
    if size > 0:
415
        np.random.seed(seed)
416
417
        choice = np.random.choice(
418
            pool,
419
            size=size,
420
            replace=False,
421
        )
422
423
        mastr_df.loc[mastr_df.start_up_date.isna()] = mastr_df.loc[
424
            mastr_df.start_up_date.isna()
425
        ].assign(start_up_date=choice)
426
427
        logger.info(
428
            f"Randomly and weigthed added start-up date to {size} generators."
429
        )
430
431
    mastr_df = mastr_df.assign(
432
        start_up_date=pd.to_datetime(mastr_df.start_up_date, utc=True)
433
    )
434
435
    end_len = len(mastr_df)
436
    logger.debug(
437
        f"Dropped {init_len - end_len} "
438
        f"({((init_len - end_len) / init_len) * 100:g}%)"
439
        f" of {init_len} rows from MaStR DataFrame."
440
    )
441
442
    return mastr_df
443
444
445
def zip_and_municipality_from_standort(
446
    standort: str,
447
    verbose: bool = False,
448
) -> tuple[str, bool]:
449
    """
450
    Get zip code and municipality from Standort string split into a list.
451
    Parameters
452
    -----------
453
    standort : str
454
        Standort as given from MaStR data.
455
    verbose : bool
456
        Logs additional info if True.
457
    Returns
458
    -------
459
    str
460
        Standort with only the zip code and municipality
461
        as well a ', Germany' added.
462
    """
463
    if verbose:
464
        logger.debug(f"Uncleaned String: {standort}")
465
466
    standort_list = standort.split()
467
468
    found = False
469
    count = 0
470
471
    for count, elem in enumerate(standort_list):
472
        if len(elem) != 5:
473
            continue
474
        if not elem.isnumeric():
475
            continue
476
477
        found = True
478
479
        break
480
481
    if found:
482
        cleaned_str = " ".join(standort_list[count:])
483
484
        if verbose:
485
            logger.debug(f"Cleaned String:   {cleaned_str}")
486
487
        return cleaned_str, found
488
489
    logger.warning(
490
        "Couldn't identify zip code. This entry will be dropped."
491
        f" Original standort: {standort}."
492
    )
493
494
    return standort, found
495
496
497
def most_plausible(
498
    p_tub: tuple,
499
    min_realistic_pv_cap: int | float,
500
) -> float:
501
    """
502
    Try to determine the most plausible capacity.
503
    Try to determine the most plausible capacity from a given
504
    generator from MaStR data.
505
    Parameters
506
    -----------
507
    p_tub : tuple
508
        Tuple containing the different capacities given in
509
        the MaStR data.
510
    min_realistic_pv_cap : int or float
511
        Minimum capacity, which is considered to be realistic.
512
    Returns
513
    -------
514
    float
515
        Capacity of the generator estimated as the most realistic.
516
    """
517
    count = Counter(p_tub).most_common(3)
518
519
    if len(count) == 1:
520
        return count[0][0]
521
522
    val1 = count[0][0]
523
    val2 = count[1][0]
524
525
    if len(count) == 2:
526
        min_val = min(val1, val2)
527
        max_val = max(val1, val2)
528
    else:
529
        val3 = count[2][0]
530
531
        min_val = min(val1, val2, val3)
532
        max_val = max(val1, val2, val3)
533
534
    if min_val < min_realistic_pv_cap:
535
        return max_val
536
537
    return min_val
538
539
540
def geocoder(
541
    user_agent: str,
542
    min_delay_seconds: int,
543
) -> RateLimiter:
544
    """
545
    Setup Nominatim geocoding class.
546
    Parameters
547
    -----------
548
    user_agent : str
549
        The app name.
550
    min_delay_seconds : int
551
        Delay in seconds to use between requests to Nominatim.
552
        A minimum of 1 is advised.
553
    Returns
554
    -------
555
    geopy.extra.rate_limiter.RateLimiter
556
        Nominatim RateLimiter geocoding class to use for geocoding.
557
    """
558
    locator = Nominatim(user_agent=user_agent)
559
    return RateLimiter(
560
        locator.geocode,
561
        min_delay_seconds=min_delay_seconds,
562
    )
563
564
565
def geocoding_data(
566
    clean_mastr_df: pd.DataFrame,
567
) -> pd.DataFrame:
568
    """
569
    Setup DataFrame to geocode.
570
    Parameters
571
    -----------
572
    clean_mastr_df : pandas.DataFrame
573
        DataFrame containing cleaned MaStR data.
574
    Returns
575
    -------
576
    pandas.DataFrame
577
        DataFrame containing all unique combinations of
578
        zip codes with municipalities for geocoding.
579
    """
580
    return pd.DataFrame(
581
        data=clean_mastr_df.zip_and_municipality.unique(),
582
        columns=["zip_and_municipality"],
583
    )
584
585
586
@timer_func
587
def geocode_data(
588
    geocoding_df: pd.DataFrame,
589
    ratelimiter: RateLimiter,
590
    epsg: int,
591
) -> gpd.GeoDataFrame:
592
    """
593
    Geocode zip code and municipality.
594
    Extract latitude, longitude and altitude.
595
    Transfrom latitude and longitude to shapely
596
    Point and return a geopandas GeoDataFrame.
597
    Parameters
598
    -----------
599
    geocoding_df : pandas.DataFrame
600
        DataFrame containing all unique combinations of
601
        zip codes with municipalities for geocoding.
602
    ratelimiter : geopy.extra.rate_limiter.RateLimiter
603
        Nominatim RateLimiter geocoding class to use for geocoding.
604
    epsg : int
605
        EPSG ID to use as CRS.
606
    Returns
607
    -------
608
    geopandas.GeoDataFrame
609
        GeoDataFrame containing all unique combinations of
610
        zip codes with municipalities with matching geolocation.
611
    """
612
    logger.info(f"Geocoding {len(geocoding_df)} locations.")
613
614
    geocode_df = geocoding_df.assign(
615
        location=geocoding_df.zip_and_municipality.apply(ratelimiter)
616
    )
617
618
    geocode_df = geocode_df.assign(
619
        point=geocode_df.location.apply(
620
            lambda loc: tuple(loc.point) if loc else None
621
        )
622
    )
623
624
    geocode_df[["latitude", "longitude", "altitude"]] = pd.DataFrame(
625
        geocode_df.point.tolist(), index=geocode_df.index
626
    )
627
628
    return gpd.GeoDataFrame(
629
        geocode_df,
630
        geometry=gpd.points_from_xy(geocode_df.longitude, geocode_df.latitude),
631
        crs=f"EPSG:{epsg}",
632
    )
633
634
635
def merge_geocode_with_mastr(
636
    clean_mastr_df: pd.DataFrame, geocode_gdf: gpd.GeoDataFrame
637
) -> gpd.GeoDataFrame:
638
    """
639
    Merge geometry to original mastr data.
640
    Parameters
641
    -----------
642
    clean_mastr_df : pandas.DataFrame
643
        DataFrame containing cleaned MaStR data.
644
    geocode_gdf : geopandas.GeoDataFrame
645
        GeoDataFrame containing all unique combinations of
646
        zip codes with municipalities with matching geolocation.
647
    Returns
648
    -------
649
    gepandas.GeoDataFrame
650
        GeoDataFrame containing cleaned MaStR data with
651
        matching geolocation from geocoding.
652
    """
653
    return gpd.GeoDataFrame(
654
        clean_mastr_df.merge(
655
            geocode_gdf[["zip_and_municipality", "geometry"]],
656
            how="left",
657
            left_on="zip_and_municipality",
658
            right_on="zip_and_municipality",
659
        ),
660
        crs=geocode_gdf.crs,
661
    ).set_index(clean_mastr_df.index)
662
663
664
def drop_invalid_entries_from_gdf(
665
    gdf: gpd.GeoDataFrame,
666
) -> gpd.GeoDataFrame:
667
    """
668
    Drop invalid entries from geopandas GeoDataFrame.
669
    TODO: how to omit the logging from geos here???
670
    Parameters
671
    -----------
672
    gdf : geopandas.GeoDataFrame
673
        GeoDataFrame to be checked for validity.
674
    Returns
675
    -------
676
    gepandas.GeoDataFrame
677
        GeoDataFrame with rows with invalid geometries
678
        dropped.
679
    """
680
    valid_gdf = gdf.loc[gdf.is_valid]
681
682
    logger.debug(
683
        f"{len(gdf) - len(valid_gdf)} "
684
        f"({(len(gdf) - len(valid_gdf)) / len(gdf) * 100:g}%) "
685
        f"of {len(gdf)} values were invalid and are dropped."
686
    )
687
688
    return valid_gdf
689
690
691
@timer_func
692
def municipality_data() -> gpd.GeoDataFrame:
693
    """
694
    Get municipality data from eGo^n Database.
695
    Returns
696
    -------
697
    gepandas.GeoDataFrame
698
        GeoDataFrame with municipality data.
699
    """
700
    with db.session_scope() as session:
701
        query = session.query(Vg250Gem.ags, Vg250Gem.geometry.label("geom"))
702
703
    return gpd.read_postgis(
704
        query.statement, query.session.bind, index_col="ags"
705
    )
706
707
708
@timer_func
709
def add_ags_to_gens(
710
    valid_mastr_gdf: gpd.GeoDataFrame,
711
    municipalities_gdf: gpd.GeoDataFrame,
712
) -> gpd.GeoDataFrame:
713
    """
714
    Add information about AGS ID to generators.
715
    Parameters
716
    -----------
717
    valid_mastr_gdf : geopandas.GeoDataFrame
718
        GeoDataFrame with valid and cleaned MaStR data.
719
    municipalities_gdf : geopandas.GeoDataFrame
720
        GeoDataFrame with municipality data.
721
    Returns
722
    -------
723
    gepandas.GeoDataFrame
724
        GeoDataFrame with valid and cleaned MaStR data
725
        with AGS ID added.
726
    """
727
    return valid_mastr_gdf.sjoin(
728
        municipalities_gdf,
729
        how="left",
730
        predicate="intersects",
731
    ).rename(columns={"index_right": "ags"})
732
733
734
def drop_gens_outside_muns(
735
    valid_mastr_gdf: gpd.GeoDataFrame,
736
) -> gpd.GeoDataFrame:
737
    """
738
    Drop all generators outside of municipalities.
739
    Parameters
740
    -----------
741
    valid_mastr_gdf : geopandas.GeoDataFrame
742
        GeoDataFrame with valid and cleaned MaStR data.
743
    Returns
744
    -------
745
    gepandas.GeoDataFrame
746
        GeoDataFrame with valid and cleaned MaStR data
747
        with generatos without an AGS ID dropped.
748
    """
749
    gdf = valid_mastr_gdf.loc[~valid_mastr_gdf.ags.isna()]
750
751
    logger.debug(
752
        f"{len(valid_mastr_gdf) - len(gdf)} "
753
        f"({(len(valid_mastr_gdf) - len(gdf)) / len(valid_mastr_gdf) * 100:g}%) "
754
        f"of {len(valid_mastr_gdf)} values are outside of the municipalities"
755
        " and are therefore dropped."
756
    )
757
758
    return gdf
759
760
761
class EgonMastrPvRoofGeocoded(Base):
762
    __tablename__ = "egon_mastr_pv_roof_geocoded"
763
    __table_args__ = {"schema": "supply"}
764
765
    zip_and_municipality = Column(String, primary_key=True, index=True)
766
    location = Column(String)
767
    point = Column(String)
768
    latitude = Column(Float)
769
    longitude = Column(Float)
770
    altitude = Column(Float)
771
    geometry = Column(Geometry(srid=EPSG))
772
773
774
def create_geocoded_table(geocode_gdf):
775
    """
776
    Create geocoded table mastr pv rooftop
777
    Parameters
778
    -----------
779
    geocode_gdf : geopandas.GeoDataFrame
780
        GeoDataFrame containing geocoding information on pv rooftop locations.
781
    """
782
    EgonMastrPvRoofGeocoded.__table__.drop(bind=engine, checkfirst=True)
783
    EgonMastrPvRoofGeocoded.__table__.create(bind=engine, checkfirst=True)
784
785
    geocode_gdf.to_postgis(
786
        name=EgonMastrPvRoofGeocoded.__table__.name,
787
        schema=EgonMastrPvRoofGeocoded.__table__.schema,
788
        con=db.engine(),
789
        if_exists="append",
790
        index=False,
791
        # dtype={}
792
    )
793
794
795
def geocoded_data_from_db(
796
    epsg: str | int,
797
) -> gpd.GeoDataFrame:
798
    """
799
    Read OSM buildings data from eGo^n Database.
800
    Parameters
801
    -----------
802
    to_crs : pyproj.crs.crs.CRS
803
        CRS to transform geometries to.
804
    Returns
805
    -------
806
    geopandas.GeoDataFrame
807
        GeoDataFrame containing OSM buildings data.
808
    """
809
    with db.session_scope() as session:
810
        query = session.query(
811
            EgonMastrPvRoofGeocoded.zip_and_municipality,
812
            EgonMastrPvRoofGeocoded.geometry,
813
        )
814
815
    return gpd.read_postgis(
816
        query.statement, query.session.bind, geom_col="geometry"
817
    ).to_crs(f"EPSG:{epsg}")
818
819
820
def load_mastr_data():
821
    """Read PV rooftop data from MaStR CSV
822
    Note: the source will be replaced as soon as the MaStR data is available
823
    in DB.
824
    Returns
825
    -------
826
    geopandas.GeoDataFrame
827
        GeoDataFrame containing MaStR data with geocoded locations.
828
    """
829
    mastr_df = mastr_data(
830
        MASTR_INDEX_COL,
831
        MASTR_RELEVANT_COLS,
832
        MASTR_DTYPES,
833
        MASTR_PARSE_DATES,
834
    )
835
836
    clean_mastr_df = clean_mastr_data(
837
        mastr_df,
838
        max_realistic_pv_cap=MAX_REALISTIC_PV_CAP,
839
        min_realistic_pv_cap=MIN_REALISTIC_PV_CAP,
840
        seed=SEED,
841
        rounding=ROUNDING,
842
    )
843
844
    geocode_gdf = geocoded_data_from_db(EPSG)
845
846
    mastr_gdf = merge_geocode_with_mastr(clean_mastr_df, geocode_gdf)
847
848
    valid_mastr_gdf = drop_invalid_entries_from_gdf(mastr_gdf)
849
850
    municipalities_gdf = municipality_data()
851
852
    valid_mastr_gdf = add_ags_to_gens(valid_mastr_gdf, municipalities_gdf)
853
854
    return drop_gens_outside_muns(valid_mastr_gdf)
855
856
857
class OsmBuildingsFiltered(Base):
858
    __tablename__ = "osm_buildings_filtered"
859
    __table_args__ = {"schema": "openstreetmap"}
860
861
    osm_id = Column(BigInteger)
862
    amenity = Column(String)
863
    building = Column(String)
864
    name = Column(String)
865
    geom = Column(Geometry(srid=SRID), index=True)
866
    area = Column(Float)
867
    geom_point = Column(Geometry(srid=SRID), index=True)
868
    tags = Column(HSTORE)
869
    id = Column(BigInteger, primary_key=True, index=True)
870
871
872
@timer_func
873
def osm_buildings(
874
    to_crs: CRS,
875
) -> gpd.GeoDataFrame:
876
    """
877
    Read OSM buildings data from eGo^n Database.
878
    Parameters
879
    -----------
880
    to_crs : pyproj.crs.crs.CRS
881
        CRS to transform geometries to.
882
    Returns
883
    -------
884
    geopandas.GeoDataFrame
885
        GeoDataFrame containing OSM buildings data.
886
    """
887
    with db.session_scope() as session:
888
        query = session.query(
889
            OsmBuildingsFiltered.id,
890
            OsmBuildingsFiltered.area,
891
            OsmBuildingsFiltered.geom_point.label("geom"),
892
        )
893
894
    return gpd.read_postgis(
895
        query.statement, query.session.bind, index_col="id"
896
    ).to_crs(to_crs)
897
898
899
@timer_func
900
def synthetic_buildings(
901
    to_crs: CRS,
902
) -> gpd.GeoDataFrame:
903
    """
904
    Read synthetic buildings data from eGo^n Database.
905
    Parameters
906
    -----------
907
    to_crs : pyproj.crs.crs.CRS
908
        CRS to transform geometries to.
909
    Returns
910
    -------
911
    geopandas.GeoDataFrame
912
        GeoDataFrame containing OSM buildings data.
913
    """
914
    with db.session_scope() as session:
915
        query = session.query(
916
            OsmBuildingsSynthetic.id,
917
            OsmBuildingsSynthetic.area,
918
            OsmBuildingsSynthetic.geom_point.label("geom"),
919
        )
920
921
    return gpd.read_postgis(
922
        query.statement, query.session.bind, index_col="id"
923
    ).to_crs(to_crs)
924
925
926
@timer_func
927
def add_ags_to_buildings(
928
    buildings_gdf: gpd.GeoDataFrame,
929
    municipalities_gdf: gpd.GeoDataFrame,
930
) -> gpd.GeoDataFrame:
931
    """
932
    Add information about AGS ID to buildings.
933
    Parameters
934
    -----------
935
    buildings_gdf : geopandas.GeoDataFrame
936
        GeoDataFrame containing OSM buildings data.
937
    municipalities_gdf : geopandas.GeoDataFrame
938
        GeoDataFrame with municipality data.
939
    Returns
940
    -------
941
    gepandas.GeoDataFrame
942
        GeoDataFrame containing OSM buildings data
943
        with AGS ID added.
944
    """
945
    return buildings_gdf.sjoin(
946
        municipalities_gdf,
947
        how="left",
948
        predicate="intersects",
949
    ).rename(columns={"index_right": "ags"})
950
951
952
def drop_buildings_outside_muns(
953
    buildings_gdf: gpd.GeoDataFrame,
954
) -> gpd.GeoDataFrame:
955
    """
956
    Drop all buildings outside of municipalities.
957
    Parameters
958
    -----------
959
    buildings_gdf : geopandas.GeoDataFrame
960
        GeoDataFrame containing OSM buildings data.
961
    Returns
962
    -------
963
    gepandas.GeoDataFrame
964
        GeoDataFrame containing OSM buildings data
965
        with buildings without an AGS ID dropped.
966
    """
967
    gdf = buildings_gdf.loc[~buildings_gdf.ags.isna()]
968
969
    logger.debug(
970
        f"{len(buildings_gdf) - len(gdf)} "
971
        f"({(len(buildings_gdf) - len(gdf)) / len(buildings_gdf) * 100:g}%) "
972
        f"of {len(buildings_gdf)} values are outside of the municipalities "
973
        "and are therefore dropped."
974
    )
975
976
    return gdf
977
978
979
def egon_building_peak_loads():
980
    sql = """
981
    SELECT building_id
982
    FROM demand.egon_building_electricity_peak_loads
983
    WHERE scenario = 'eGon2035'
984
    """
985
986
    return (
987
        db.select_dataframe(sql).building_id.astype(int).sort_values().unique()
988
    )
989
990
991
@timer_func
992
def load_building_data():
993
    """
994
    Read buildings from DB
995
    Tables:
996
997
    * `openstreetmap.osm_buildings_filtered` (from OSM)
998
    * `openstreetmap.osm_buildings_synthetic` (synthetic, created by us)
999
1000
    Use column `id` for both as it is unique hence you concat both datasets.
1001
    If INCLUDE_SYNTHETIC_BUILDINGS is False synthetic buildings will not be
1002
    loaded.
1003
1004
    Returns
1005
    -------
1006
    gepandas.GeoDataFrame
1007
        GeoDataFrame containing OSM buildings data with buildings without an
1008
        AGS ID dropped.
1009
    """
1010
1011
    municipalities_gdf = municipality_data()
1012
1013
    osm_buildings_gdf = osm_buildings(municipalities_gdf.crs)
1014
1015
    if INCLUDE_SYNTHETIC_BUILDINGS:
1016
        synthetic_buildings_gdf = synthetic_buildings(municipalities_gdf.crs)
1017
1018
        buildings_gdf = gpd.GeoDataFrame(
1019
            pd.concat(
1020
                [
1021
                    osm_buildings_gdf,
1022
                    synthetic_buildings_gdf,
1023
                ]
1024
            ),
1025
            geometry="geom",
1026
            crs=osm_buildings_gdf.crs,
1027
        ).rename(columns={"area": "building_area"})
1028
1029
        buildings_gdf.index = buildings_gdf.index.astype(int)
1030
1031
    else:
1032
        buildings_gdf = osm_buildings_gdf.rename(
1033
            columns={"area": "building_area"}
1034
        )
1035
1036
    if ONLY_BUILDINGS_WITH_DEMAND:
1037
        building_ids = egon_building_peak_loads()
1038
1039
        init_len = len(building_ids)
1040
1041
        building_ids = np.intersect1d(
1042
            list(map(int, building_ids)),
1043
            list(map(int, buildings_gdf.index.to_numpy())),
1044
        )
1045
1046
        end_len = len(building_ids)
1047
1048
        logger.debug(
1049
            f"{end_len/init_len * 100: g} % ({end_len} / {init_len}) "
1050
            f"of buildings have peak load."
1051
        )
1052
1053
        buildings_gdf = buildings_gdf.loc[building_ids]
1054
1055
    buildings_ags_gdf = add_ags_to_buildings(buildings_gdf, municipalities_gdf)
1056
1057
    buildings_ags_gdf = drop_buildings_outside_muns(buildings_ags_gdf)
1058
1059
    grid_districts_gdf = grid_districts(EPSG)
1060
1061
    federal_state_gdf = federal_state_data(grid_districts_gdf.crs)
1062
1063
    grid_federal_state_gdf = overlay_grid_districts_with_counties(
1064
        grid_districts_gdf,
1065
        federal_state_gdf,
1066
    )
1067
1068
    buildings_overlay_gdf = add_overlay_id_to_buildings(
1069
        buildings_ags_gdf,
1070
        grid_federal_state_gdf,
1071
    )
1072
1073
    logger.debug("Loaded buildings.")
1074
1075
    buildings_overlay_gdf = drop_buildings_outside_grids(buildings_overlay_gdf)
1076
1077
    # overwrite bus_id with data from new table
1078
    sql = (
1079
        "SELECT building_id, bus_id FROM "
1080
        "boundaries.egon_map_zensus_mvgd_buildings"
1081
    )
1082
    map_building_bus_df = db.select_dataframe(sql)
1083
1084
    building_ids = np.intersect1d(
1085
        list(map(int, map_building_bus_df.building_id.unique())),
1086
        list(map(int, buildings_overlay_gdf.index.to_numpy())),
1087
    )
1088
1089
    buildings_within_gdf = buildings_overlay_gdf.loc[building_ids]
1090
1091
    gdf = (
1092
        buildings_within_gdf.reset_index()
1093
        .drop(columns=["bus_id"])
1094
        .merge(
1095
            how="left",
1096
            right=map_building_bus_df,
1097
            left_on="id",
1098
            right_on="building_id",
1099
        )
1100
        .drop(columns=["building_id"])
1101
        .set_index("id")
1102
        .sort_index()
1103
    )
1104
1105
    return gdf[~gdf.index.duplicated(keep="first")]
1106
1107
1108
@timer_func
1109
def sort_and_qcut_df(
1110
    df: pd.DataFrame | gpd.GeoDataFrame,
1111
    col: str,
1112
    q: int,
1113
) -> pd.DataFrame | gpd.GeoDataFrame:
1114
    """
1115
    Determine the quantile of a given attribute in a (Geo)DataFrame.
1116
    Sort the (Geo)DataFrame in ascending order for the given attribute.
1117
    Parameters
1118
    -----------
1119
    df : pandas.DataFrame or geopandas.GeoDataFrame
1120
        (Geo)DataFrame to sort and qcut.
1121
    col : str
1122
        Name of the attribute to sort and qcut the (Geo)DataFrame on.
1123
    q : int
1124
        Number of quantiles.
1125
    Returns
1126
    -------
1127
    pandas.DataFrame or gepandas.GeoDataFrame
1128
        Sorted and qcut (Geo)DataFrame.
1129
    """
1130
    df = df.sort_values(col, ascending=True)
1131
1132
    return df.assign(
1133
        quant=pd.qcut(
1134
            df[col],
1135
            q=q,
1136
            labels=range(q),
1137
        )
1138
    )
1139
1140
1141
@timer_func
1142
def allocate_pv(
1143
    q_mastr_gdf: gpd.GeoDataFrame,
1144
    q_buildings_gdf: gpd.GeoDataFrame,
1145
    seed: int,
1146
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
1147
    """
1148
    Allocate the MaStR pv generators to the OSM buildings.
1149
    This will determine a building for each pv generator if there are more
1150
    buildings than generators within a given AGS. Primarily generators are
1151
    distributed with the same qunatile as the buildings. Multiple assignment
1152
    is excluded.
1153
    Parameters
1154
    -----------
1155
    q_mastr_gdf : geopandas.GeoDataFrame
1156
        GeoDataFrame containing geocoded and qcut MaStR data.
1157
    q_buildings_gdf : geopandas.GeoDataFrame
1158
        GeoDataFrame containing qcut OSM buildings data.
1159
    seed : int
1160
        Seed to use for random operations with NumPy and pandas.
1161
    Returns
1162
    -------
1163
    tuple with two geopandas.GeoDataFrame s
1164
        GeoDataFrame containing MaStR data allocated to building IDs.
1165
        GeoDataFrame containing building data allocated to MaStR IDs.
1166
    """
1167
    rng = default_rng(seed=seed)
1168
1169
    q_buildings_gdf = q_buildings_gdf.assign(gens_id=np.nan).sort_values(
1170
        by=["ags", "quant"]
1171
    )
1172
    q_mastr_gdf = q_mastr_gdf.assign(building_id=np.nan).sort_values(
1173
        by=["ags", "quant"]
1174
    )
1175
1176
    ags_list = q_buildings_gdf.ags.unique()
1177
1178
    if TEST_RUN:
1179
        ags_list = ags_list[:250]
1180
1181
    num_ags = len(ags_list)
1182
1183
    t0 = perf_counter()
1184
1185
    for count, ags in enumerate(ags_list):
1186
1187
        buildings = q_buildings_gdf.loc[q_buildings_gdf.ags == ags]
1188
        gens = q_mastr_gdf.loc[q_mastr_gdf.ags == ags]
1189
1190
        len_build = len(buildings)
1191
        len_gens = len(gens)
1192
1193
        if len_build < len_gens:
1194
            gens = gens.sample(len_build, random_state=RandomState(seed=seed))
1195
            logger.error(
1196
                f"There are {len_gens} generators and only {len_build}"
1197
                f" buildings in AGS {ags}. {len_gens - len(gens)} "
1198
                "generators were truncated to match the amount of buildings."
1199
            )
1200
1201
            assert len_build == len(gens)
1202
1203
        for quant in gens.quant.unique():
1204
            q_buildings = buildings.loc[buildings.quant == quant]
1205
            q_gens = gens.loc[gens.quant == quant]
1206
1207
            len_build = len(q_buildings)
1208
            len_gens = len(q_gens)
1209
1210
            if len_build < len_gens:
1211
                delta = len_gens - len_build
1212
1213
                logger.warning(
1214
                    f"There are {len_gens} generators and only {len_build} "
1215
                    f"buildings in AGS {ags} and quantile {quant}. {delta} "
1216
                    f"buildings from AGS {ags} will be added randomly."
1217
                )
1218
1219
                add_buildings = pd.Index(
1220
                    rng.choice(
1221
                        list(set(buildings.index) - set(q_buildings.index)),
1222
                        size=delta,
1223
                        replace=False,
1224
                    )
1225
                )
1226
1227
                chosen_buildings = q_buildings.index.append(add_buildings)
1228
1229
            else:
1230
                chosen_buildings = rng.choice(
1231
                    q_buildings.index,
1232
                    size=len_gens,
1233
                    replace=False,
1234
                )
1235
1236
            q_buildings_gdf.loc[chosen_buildings, "gens_id"] = q_gens.index
1237
            buildings = buildings.drop(chosen_buildings)
1238
1239
        if count % 500 == 0:
1240
            logger.debug(
1241
                f"Allocation of {count / num_ags * 100:g} % of AGS done. "
1242
                f"It took {perf_counter() - t0:g} seconds."
1243
            )
1244
1245
            t0 = perf_counter()
1246
1247
    assigned_buildings = q_buildings_gdf.loc[~q_buildings_gdf.gens_id.isna()]
1248
1249
    assert len(assigned_buildings) == len(assigned_buildings.gens_id.unique())
1250
1251
    q_mastr_gdf.loc[
1252
        assigned_buildings.gens_id, "building_id"
1253
    ] = assigned_buildings.index
1254
1255
    assigned_gens = q_mastr_gdf.loc[~q_mastr_gdf.building_id.isna()]
1256
1257
    assert len(assigned_buildings) == len(assigned_gens)
1258
1259
    logger.debug("Allocated status quo generators to buildings.")
1260
1261
    return frame_to_numeric(q_mastr_gdf), frame_to_numeric(q_buildings_gdf)
1262
1263
1264
def frame_to_numeric(
1265
    df: pd.DataFrame | gpd.GeoDataFrame,
1266
) -> pd.DataFrame | gpd.GeoDataFrame:
1267
    """
1268
    Try to convert all columns of a DataFrame to numeric ignoring errors.
1269
    Parameters
1270
    ----------
1271
    df : pandas.DataFrame or geopandas.GeoDataFrame
1272
    Returns
1273
    -------
1274
    pandas.DataFrame or geopandas.GeoDataFrame
1275
    """
1276
    if str(df.index.dtype) == "object":
1277
        df.index = pd.to_numeric(df.index, errors="ignore")
1278
1279
    for col in df.columns:
1280
        if str(df[col].dtype) == "object":
1281
            df[col] = pd.to_numeric(df[col], errors="ignore")
1282
1283
    return df
1284
1285
1286
def validate_output(
1287
    desagg_mastr_gdf: pd.DataFrame | gpd.GeoDataFrame,
1288
    desagg_buildings_gdf: pd.DataFrame | gpd.GeoDataFrame,
1289
) -> None:
1290
    """
1291
    Validate output.
1292
1293
    * Validate that there are exactly as many buildings with a pv system as
1294
      there are pv systems with a building
1295
    * Validate that the building IDs with a pv system are the same building
1296
      IDs as assigned to the pv systems
1297
    * Validate that the pv system IDs with a building are the same pv system
1298
      IDs as assigned to the buildings
1299
1300
    Parameters
1301
    -----------
1302
    desagg_mastr_gdf : geopandas.GeoDataFrame
1303
        GeoDataFrame containing MaStR data allocated to building IDs.
1304
    desagg_buildings_gdf : geopandas.GeoDataFrame
1305
        GeoDataFrame containing building data allocated to MaStR IDs.
1306
    """
1307
    assert len(
1308
        desagg_mastr_gdf.loc[~desagg_mastr_gdf.building_id.isna()]
1309
    ) == len(desagg_buildings_gdf.loc[~desagg_buildings_gdf.gens_id.isna()])
1310
    assert (
1311
        np.sort(
1312
            desagg_mastr_gdf.loc[
1313
                ~desagg_mastr_gdf.building_id.isna()
1314
            ].building_id.unique()
1315
        )
1316
        == np.sort(
1317
            desagg_buildings_gdf.loc[
1318
                ~desagg_buildings_gdf.gens_id.isna()
1319
            ].index.unique()
1320
        )
1321
    ).all()
1322
    assert (
1323
        np.sort(
1324
            desagg_mastr_gdf.loc[
1325
                ~desagg_mastr_gdf.building_id.isna()
1326
            ].index.unique()
1327
        )
1328
        == np.sort(
1329
            desagg_buildings_gdf.loc[
1330
                ~desagg_buildings_gdf.gens_id.isna()
1331
            ].gens_id.unique()
1332
        )
1333
    ).all()
1334
1335
    logger.debug("Validated output.")
1336
1337
1338
def drop_unallocated_gens(
1339
    gdf: gpd.GeoDataFrame,
1340
) -> gpd.GeoDataFrame:
1341
    """
1342
    Drop generators which did not get allocated.
1343
1344
    Parameters
1345
    -----------
1346
    gdf : geopandas.GeoDataFrame
1347
        GeoDataFrame containing MaStR data allocated to building IDs.
1348
    Returns
1349
    -------
1350
    geopandas.GeoDataFrame
1351
        GeoDataFrame containing MaStR data with generators dropped which did
1352
        not get allocated.
1353
    """
1354
    init_len = len(gdf)
1355
    gdf = gdf.loc[~gdf.building_id.isna()]
1356
    end_len = len(gdf)
1357
1358
    logger.debug(
1359
        f"Dropped {init_len - end_len} "
1360
        f"({((init_len - end_len) / init_len) * 100:g}%)"
1361
        f" of {init_len} unallocated rows from MaStR DataFrame."
1362
    )
1363
1364
    return gdf
1365
1366
1367
@timer_func
1368
def allocate_to_buildings(
1369
    mastr_gdf: gpd.GeoDataFrame,
1370
    buildings_gdf: gpd.GeoDataFrame,
1371
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
1372
    """
1373
    Allocate status quo pv rooftop generators to buildings.
1374
    Parameters
1375
    -----------
1376
    mastr_gdf : geopandas.GeoDataFrame
1377
        GeoDataFrame containing MaStR data with geocoded locations.
1378
    buildings_gdf : geopandas.GeoDataFrame
1379
        GeoDataFrame containing OSM buildings data with buildings without an
1380
        AGS ID dropped.
1381
    Returns
1382
    -------
1383
    tuple with two geopandas.GeoDataFrame s
1384
        GeoDataFrame containing MaStR data allocated to building IDs.
1385
        GeoDataFrame containing building data allocated to MaStR IDs.
1386
    """
1387
    logger.debug("Starting allocation of status quo.")
1388
1389
    q_mastr_gdf = sort_and_qcut_df(mastr_gdf, col="capacity", q=Q)
1390
    q_buildings_gdf = sort_and_qcut_df(buildings_gdf, col="building_area", q=Q)
1391
1392
    desagg_mastr_gdf, desagg_buildings_gdf = allocate_pv(
1393
        q_mastr_gdf, q_buildings_gdf, SEED
1394
    )
1395
1396
    validate_output(desagg_mastr_gdf, desagg_buildings_gdf)
1397
1398
    return drop_unallocated_gens(desagg_mastr_gdf), desagg_buildings_gdf
1399
1400
1401
@timer_func
1402
def grid_districts(
1403
    epsg: int,
1404
) -> gpd.GeoDataFrame:
1405
    """
1406
    Load mv grid district geo data from eGo^n Database as
1407
    geopandas.GeoDataFrame.
1408
    Parameters
1409
    -----------
1410
    epsg : int
1411
        EPSG ID to use as CRS.
1412
    Returns
1413
    -------
1414
    geopandas.GeoDataFrame
1415
        GeoDataFrame containing mv grid district ID and geo shapes data.
1416
    """
1417
    gdf = db.select_geodataframe(
1418
        """
1419
        SELECT bus_id, geom
1420
        FROM grid.egon_mv_grid_district
1421
        ORDER BY bus_id
1422
        """,
1423
        index_col="bus_id",
1424
        geom_col="geom",
1425
        epsg=epsg,
1426
    )
1427
1428
    gdf.index = gdf.index.astype(int)
1429
1430
    logger.debug("Grid districts loaded.")
1431
1432
    return gdf
1433
1434
1435
def scenario_data(
1436
    carrier: str = "solar_rooftop",
1437
    scenario: str = "eGon2035",
1438
) -> pd.DataFrame:
1439
    """
1440
    Get scenario capacity data from eGo^n Database.
1441
    Parameters
1442
    -----------
1443
    carrier : str
1444
        Carrier type to filter table by.
1445
    scenario : str
1446
        Scenario to filter table by.
1447
    Returns
1448
    -------
1449
    geopandas.GeoDataFrame
1450
        GeoDataFrame with scenario capacity data in GW.
1451
    """
1452
    with db.session_scope() as session:
1453
        query = session.query(EgonScenarioCapacities).filter(
1454
            EgonScenarioCapacities.carrier == carrier,
1455
            EgonScenarioCapacities.scenario_name == scenario,
1456
        )
1457
1458
    df = pd.read_sql(
1459
        query.statement, query.session.bind, index_col="index"
1460
    ).sort_index()
1461
1462
    logger.debug("Scenario capacity data loaded.")
1463
1464
    return df
1465
1466
1467 View Code Duplication
class Vg250Lan(Base):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1468
    __tablename__ = "vg250_lan"
1469
    __table_args__ = {"schema": "boundaries"}
1470
1471
    id = Column(BigInteger, primary_key=True, index=True)
1472
    ade = Column(BigInteger)
1473
    gf = Column(BigInteger)
1474
    bsg = Column(BigInteger)
1475
    ars = Column(String)
1476
    ags = Column(String)
1477
    sdv_ars = Column(String)
1478
    gen = Column(String)
1479
    bez = Column(String)
1480
    ibz = Column(BigInteger)
1481
    bem = Column(String)
1482
    nbd = Column(String)
1483
    sn_l = Column(String)
1484
    sn_r = Column(String)
1485
    sn_k = Column(String)
1486
    sn_v1 = Column(String)
1487
    sn_v2 = Column(String)
1488
    sn_g = Column(String)
1489
    fk_s3 = Column(String)
1490
    nuts = Column(String)
1491
    ars_0 = Column(String)
1492
    ags_0 = Column(String)
1493
    wsk = Column(String)
1494
    debkg_id = Column(String)
1495
    rs = Column(String)
1496
    sdv_rs = Column(String)
1497
    rs_0 = Column(String)
1498
    geometry = Column(Geometry(srid=EPSG), index=True)
1499
1500
1501
def federal_state_data(to_crs: CRS) -> gpd.GeoDataFrame:
1502
    """
1503
    Get feder state data from eGo^n Database.
1504
    Parameters
1505
    -----------
1506
    to_crs : pyproj.crs.crs.CRS
1507
        CRS to transform geometries to.
1508
    Returns
1509
    -------
1510
    geopandas.GeoDataFrame
1511
        GeoDataFrame with federal state data.
1512
    """
1513
    with db.session_scope() as session:
1514
        query = session.query(
1515
            Vg250Lan.id, Vg250Lan.nuts, Vg250Lan.geometry.label("geom")
1516
        )
1517
1518
        gdf = gpd.read_postgis(
1519
            query.statement, session.connection(), index_col="id"
1520
        ).to_crs(to_crs)
1521
1522
    logger.debug("Federal State data loaded.")
1523
1524
    return gdf
1525
1526
1527
@timer_func
1528
def overlay_grid_districts_with_counties(
1529
    mv_grid_district_gdf: gpd.GeoDataFrame,
1530
    federal_state_gdf: gpd.GeoDataFrame,
1531
) -> gpd.GeoDataFrame:
1532
    """
1533
    Calculate the intersections of mv grid districts and counties.
1534
    Parameters
1535
    -----------
1536
    mv_grid_district_gdf : gpd.GeoDataFrame
1537
        GeoDataFrame containing mv grid district ID and geo shapes data.
1538
    federal_state_gdf : gpd.GeoDataFrame
1539
        GeoDataFrame with federal state data.
1540
    Returns
1541
    -------
1542
    geopandas.GeoDataFrame
1543
        GeoDataFrame containing OSM buildings data.
1544
    """
1545
    logger.debug(
1546
        "Calculating intersection overlay between mv grid districts and "
1547
        "counties. This may take a while..."
1548
    )
1549
1550
    gdf = gpd.overlay(
1551
        federal_state_gdf.to_crs(mv_grid_district_gdf.crs),
1552
        mv_grid_district_gdf.reset_index(),
1553
        how="intersection",
1554
        keep_geom_type=True,
1555
    )
1556
1557
    logger.debug("Done!")
1558
1559
    return gdf
1560
1561
1562
@timer_func
1563
def add_overlay_id_to_buildings(
1564
    buildings_gdf: gpd.GeoDataFrame,
1565
    grid_federal_state_gdf: gpd.GeoDataFrame,
1566
) -> gpd.GeoDataFrame:
1567
    """
1568
    Add information about overlay ID to buildings.
1569
    Parameters
1570
    -----------
1571
    buildings_gdf : geopandas.GeoDataFrame
1572
        GeoDataFrame containing OSM buildings data.
1573
    grid_federal_state_gdf : geopandas.GeoDataFrame
1574
        GeoDataFrame with intersection shapes between counties and grid
1575
        districts.
1576
    Returns
1577
    -------
1578
    geopandas.GeoDataFrame
1579
        GeoDataFrame containing OSM buildings data with overlay ID added.
1580
    """
1581
    gdf = (
1582
        buildings_gdf.to_crs(grid_federal_state_gdf.crs)
1583
        .sjoin(
1584
            grid_federal_state_gdf,
1585
            how="left",
1586
            predicate="intersects",
1587
        )
1588
        .rename(columns={"index_right": "overlay_id"})
1589
    )
1590
1591
    logger.debug("Added overlay ID to OSM buildings.")
1592
1593
    return gdf
1594
1595
1596
def drop_buildings_outside_grids(
1597
    buildings_gdf: gpd.GeoDataFrame,
1598
) -> gpd.GeoDataFrame:
1599
    """
1600
    Drop all buildings outside of grid areas.
1601
    Parameters
1602
    -----------
1603
    buildings_gdf : geopandas.GeoDataFrame
1604
        GeoDataFrame containing OSM buildings data.
1605
    Returns
1606
    -------
1607
    gepandas.GeoDataFrame
1608
        GeoDataFrame containing OSM buildings data
1609
        with buildings without an bus ID dropped.
1610
    """
1611
    gdf = buildings_gdf.loc[~buildings_gdf.bus_id.isna()]
1612
1613
    logger.debug(
1614
        f"{len(buildings_gdf) - len(gdf)} "
1615
        f"({(len(buildings_gdf) - len(gdf)) / len(buildings_gdf) * 100:g}%) "
1616
        f"of {len(buildings_gdf)} values are outside of the grid areas "
1617
        "and are therefore dropped."
1618
    )
1619
1620
    return gdf
1621
1622
1623
def cap_per_bus_id(
1624
    scenario: str,
1625
) -> pd.DataFrame:
1626
    """
1627
    Get table with total pv rooftop capacity per grid district.
1628
1629
    Parameters
1630
    -----------
1631
    scenario : str
1632
        Scenario name.
1633
    Returns
1634
    -------
1635
    pandas.DataFrame
1636
        DataFrame with total rooftop capacity per mv grid.
1637
    """
1638
    targets = config.datasets()["solar_rooftop"]["targets"]
1639
1640
    sql = f"""
1641
    SELECT bus as bus_id, control, p_nom as capacity
1642
    FROM {targets['generators']['schema']}.{targets['generators']['table']}
1643
    WHERE carrier = 'solar_rooftop'
1644
    AND scn_name = '{scenario}'
1645
    """
1646
    # TODO: woher kommen die Slack rows???
1647
1648
    df = db.select_dataframe(sql, index_col="bus_id")
1649
1650
    return df.loc[df.control != "Slack"]
1651
1652
    # overlay_gdf = overlay_gdf.assign(capacity=np.nan)
1653
    #
1654
    # for cap, nuts in scenario_df[["capacity", "nuts"]].itertuples(index=False):
1655
    #     nuts_gdf = overlay_gdf.loc[overlay_gdf.nuts == nuts]
1656
    #
1657
    #     capacity = nuts_gdf.building_area.multiply(
1658
    #         cap / nuts_gdf.building_area.sum()
1659
    #     )
1660
    #
1661
    #     overlay_gdf.loc[nuts_gdf.index] = overlay_gdf.loc[
1662
    #         nuts_gdf.index
1663
    #     ].assign(capacity=capacity.multiply(conversion).to_numpy())
1664
    #
1665
    # return overlay_gdf[["bus_id", "capacity"]].groupby("bus_id").sum()
1666
1667
1668
def determine_end_of_life_gens(
1669
    mastr_gdf: gpd.GeoDataFrame,
1670
    scenario_timestamp: pd.Timestamp,
1671
    pv_rooftop_lifetime: pd.Timedelta,
1672
) -> gpd.GeoDataFrame:
1673
    """
1674
    Determine if an old PV system has reached its end of life.
1675
    Parameters
1676
    -----------
1677
    mastr_gdf : geopandas.GeoDataFrame
1678
        GeoDataFrame containing geocoded MaStR data.
1679
    scenario_timestamp : pandas.Timestamp
1680
        Timestamp at which the scenario takes place.
1681
    pv_rooftop_lifetime : pandas.Timedelta
1682
        Average expected lifetime of PV rooftop systems.
1683
    Returns
1684
    -------
1685
    geopandas.GeoDataFrame
1686
        GeoDataFrame containing geocoded MaStR data and info if the system
1687
        has reached its end of life.
1688
    """
1689
    before = mastr_gdf.capacity.sum()
1690
1691
    mastr_gdf = mastr_gdf.assign(
1692
        age=scenario_timestamp - mastr_gdf.start_up_date
1693
    )
1694
1695
    mastr_gdf = mastr_gdf.assign(
1696
        end_of_life=pv_rooftop_lifetime < mastr_gdf.age
1697
    )
1698
1699
    after = mastr_gdf.loc[~mastr_gdf.end_of_life].capacity.sum()
1700
1701
    logger.debug(
1702
        f"Determined if pv rooftop systems reached their end of life.\nTotal "
1703
        f"capacity: {before}\nActive capacity: {after}"
1704
    )
1705
1706
    return mastr_gdf
1707
1708
1709
def calculate_max_pv_cap_per_building(
1710
    buildings_gdf: gpd.GeoDataFrame,
1711
    mastr_gdf: gpd.GeoDataFrame,
1712
    pv_cap_per_sq_m: float | int,
1713
    roof_factor: float | int,
1714
) -> gpd.GeoDataFrame:
1715
    """
1716
    Calculate the estimated maximum possible PV capacity per building.
1717
    Parameters
1718
    -----------
1719
    buildings_gdf : geopandas.GeoDataFrame
1720
        GeoDataFrame containing OSM buildings data.
1721
    mastr_gdf : geopandas.GeoDataFrame
1722
        GeoDataFrame containing geocoded MaStR data.
1723
    pv_cap_per_sq_m : float, int
1724
        Average expected, installable PV capacity per square meter.
1725
    roof_factor : float, int
1726
        Average for PV usable roof area share.
1727
    Returns
1728
    -------
1729
    geopandas.GeoDataFrame
1730
        GeoDataFrame containing OSM buildings data with estimated maximum PV
1731
        capacity.
1732
    """
1733
    gdf = (
1734
        buildings_gdf.reset_index()
1735
        .rename(columns={"index": "id"})
1736
        .merge(
1737
            mastr_gdf[
1738
                [
1739
                    "capacity",
1740
                    "end_of_life",
1741
                    "building_id",
1742
                    "EinheitlicheAusrichtungUndNeigungswinkel",
1743
                    "Hauptausrichtung",
1744
                    "HauptausrichtungNeigungswinkel",
1745
                ]
1746
            ],
1747
            how="left",
1748
            left_on="id",
1749
            right_on="building_id",
1750
        )
1751
        .set_index("id")
1752
        .drop(columns="building_id")
1753
    )
1754
1755
    return gdf.assign(
1756
        max_cap=gdf.building_area.multiply(roof_factor * pv_cap_per_sq_m),
1757
        end_of_life=gdf.end_of_life.fillna(True).astype(bool),
1758
        bus_id=gdf.bus_id.astype(int),
1759
    )
1760
1761
1762
def calculate_building_load_factor(
1763
    mastr_gdf: gpd.GeoDataFrame,
1764
    buildings_gdf: gpd.GeoDataFrame,
1765
    rounding: int = 4,
1766
) -> gpd.GeoDataFrame:
1767
    """
1768
    Calculate the roof load factor from existing PV systems.
1769
    Parameters
1770
    -----------
1771
    mastr_gdf : geopandas.GeoDataFrame
1772
        GeoDataFrame containing geocoded MaStR data.
1773
    buildings_gdf : geopandas.GeoDataFrame
1774
        GeoDataFrame containing OSM buildings data.
1775
    rounding : int
1776
        Rounding to use for load factor.
1777
    Returns
1778
    -------
1779
    geopandas.GeoDataFrame
1780
        GeoDataFrame containing geocoded MaStR data with calculated load
1781
        factor.
1782
    """
1783
    gdf = mastr_gdf.merge(
1784
        buildings_gdf[["max_cap", "building_area"]]
1785
        .loc[~buildings_gdf["max_cap"].isna()]
1786
        .reset_index(),
1787
        how="left",
1788
        left_on="building_id",
1789
        right_on="id",
1790
    ).set_index("id")
1791
1792
    return gdf.assign(load_factor=(gdf.capacity / gdf.max_cap).round(rounding))
1793
1794
1795
def get_probability_for_property(
1796
    mastr_gdf: gpd.GeoDataFrame,
1797
    cap_range: tuple[int | float, int | float],
1798
    prop: str,
1799
) -> tuple[np.array, np.array]:
1800
    """
1801
    Calculate the probability of the different options of a property of the
1802
    existing PV plants.
1803
    Parameters
1804
    -----------
1805
    mastr_gdf : geopandas.GeoDataFrame
1806
        GeoDataFrame containing geocoded MaStR data.
1807
    cap_range : tuple(int, int)
1808
        Capacity range of PV plants to look at.
1809
    prop : str
1810
        Property to calculate probabilities for. String needs to be in columns
1811
        of mastr_gdf.
1812
    Returns
1813
    -------
1814
    tuple
1815
        numpy.array
1816
            Unique values of property.
1817
        numpy.array
1818
            Probabilties per unique value.
1819
    """
1820
    cap_range_gdf = mastr_gdf.loc[
1821
        (mastr_gdf.capacity > cap_range[0])
1822
        & (mastr_gdf.capacity <= cap_range[1])
1823
    ]
1824
1825
    if prop == "load_factor":
1826
        cap_range_gdf = cap_range_gdf.loc[cap_range_gdf[prop] <= 1]
1827
1828
    count = Counter(
1829
        cap_range_gdf[prop].loc[
1830
            ~cap_range_gdf[prop].isna()
1831
            & ~cap_range_gdf[prop].isnull()
1832
            & ~(cap_range_gdf[prop] == "None")
1833
        ]
1834
    )
1835
1836
    values = np.array(list(count.keys()))
1837
    probabilities = np.fromiter(count.values(), dtype=float)
1838
    probabilities = probabilities / np.sum(probabilities)
1839
1840
    return values, probabilities
1841
1842
1843
@timer_func
1844
def probabilities(
1845
    mastr_gdf: gpd.GeoDataFrame,
1846
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
1847
    properties: list[str] | None = None,
1848
) -> dict:
1849
    """
1850
    Calculate the probability of the different options of properties of the
1851
    existing PV plants.
1852
    Parameters
1853
    -----------
1854
    mastr_gdf : geopandas.GeoDataFrame
1855
        GeoDataFrame containing geocoded MaStR data.
1856
    cap_ranges : list(tuple(int, int))
1857
        List of capacity ranges to distinguish between. The first tuple should
1858
        start with a zero and the last one should end with infinite.
1859
    properties : list(str)
1860
        List of properties to calculate probabilities for. Strings need to be
1861
        in columns of mastr_gdf.
1862
    Returns
1863
    -------
1864
    dict
1865
        Dictionary with values and probabilities per capacity range.
1866
    """
1867
    if cap_ranges is None:
1868
        cap_ranges = [
1869
            (0, 30),
1870
            (30, 100),
1871
            (100, float("inf")),
1872
        ]
1873
    if properties is None:
1874
        properties = [
1875
            "EinheitlicheAusrichtungUndNeigungswinkel",
1876
            "Hauptausrichtung",
1877
            "HauptausrichtungNeigungswinkel",
1878
            "load_factor",
1879
        ]
1880
1881
    prob_dict = {}
1882
1883
    for cap_range in cap_ranges:
1884
        prob_dict[cap_range] = {
1885
            "values": {},
1886
            "probabilities": {},
1887
        }
1888
1889
        for prop in properties:
1890
            v, p = get_probability_for_property(
1891
                mastr_gdf,
1892
                cap_range,
1893
                prop,
1894
            )
1895
1896
            prob_dict[cap_range]["values"][prop] = v
1897
            prob_dict[cap_range]["probabilities"][prop] = p
1898
1899
    return prob_dict
1900
1901
1902
def cap_share_per_cap_range(
1903
    mastr_gdf: gpd.GeoDataFrame,
1904
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
1905
) -> dict[tuple[int | float, int | float], float]:
1906
    """
1907
    Calculate the share of PV capacity from the total PV capacity within
1908
    capacity ranges.
1909
    Parameters
1910
    -----------
1911
    mastr_gdf : geopandas.GeoDataFrame
1912
        GeoDataFrame containing geocoded MaStR data.
1913
    cap_ranges : list(tuple(int, int))
1914
        List of capacity ranges to distinguish between. The first tuple should
1915
        start with a zero and the last one should end with infinite.
1916
    Returns
1917
    -------
1918
    dict
1919
        Dictionary with share of PV capacity from the total PV capacity within
1920
        capacity ranges.
1921
    """
1922
    if cap_ranges is None:
1923
        cap_ranges = [
1924
            (0, 30),
1925
            (30, 100),
1926
            (100, float("inf")),
1927
        ]
1928
1929
    cap_share_dict = {}
1930
1931
    total_cap = mastr_gdf.capacity.sum()
1932
1933
    for cap_range in cap_ranges:
1934
        cap_share = (
1935
            mastr_gdf.loc[
1936
                (mastr_gdf.capacity > cap_range[0])
1937
                & (mastr_gdf.capacity <= cap_range[1])
1938
            ].capacity.sum()
1939
            / total_cap
1940
        )
1941
1942
        cap_share_dict[cap_range] = cap_share
1943
1944
    return cap_share_dict
1945
1946
1947
def mean_load_factor_per_cap_range(
1948
    mastr_gdf: gpd.GeoDataFrame,
1949
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
1950
) -> dict[tuple[int | float, int | float], float]:
1951
    """
1952
    Calculate the mean roof load factor per capacity range from existing PV
1953
    plants.
1954
    Parameters
1955
    -----------
1956
    mastr_gdf : geopandas.GeoDataFrame
1957
        GeoDataFrame containing geocoded MaStR data.
1958
    cap_ranges : list(tuple(int, int))
1959
        List of capacity ranges to distinguish between. The first tuple should
1960
        start with a zero and the last one should end with infinite.
1961
    Returns
1962
    -------
1963
    dict
1964
        Dictionary with mean roof load factor per capacity range.
1965
    """
1966
    if cap_ranges is None:
1967
        cap_ranges = [
1968
            (0, 30),
1969
            (30, 100),
1970
            (100, float("inf")),
1971
        ]
1972
1973
    load_factor_dict = {}
1974
1975
    for cap_range in cap_ranges:
1976
        load_factor = mastr_gdf.loc[
1977
            (mastr_gdf.load_factor <= 1)
1978
            & (mastr_gdf.capacity > cap_range[0])
1979
            & (mastr_gdf.capacity <= cap_range[1])
1980
        ].load_factor.mean()
1981
1982
        load_factor_dict[cap_range] = load_factor
1983
1984
    return load_factor_dict
1985
1986
1987
def building_area_range_per_cap_range(
1988
    mastr_gdf: gpd.GeoDataFrame,
1989
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
1990
    min_building_size: int | float = 10.0,
1991
    upper_quantile: float = 0.95,
1992
    lower_quantile: float = 0.05,
1993
) -> dict[tuple[int | float, int | float], tuple[int | float, int | float]]:
1994
    """
1995
    Estimate normal building area range per capacity range.
1996
    Calculate the mean roof load factor per capacity range from existing PV
1997
    plants.
1998
    Parameters
1999
    -----------
2000
    mastr_gdf : geopandas.GeoDataFrame
2001
        GeoDataFrame containing geocoded MaStR data.
2002
    cap_ranges : list(tuple(int, int))
2003
        List of capacity ranges to distinguish between. The first tuple should
2004
        start with a zero and the last one should end with infinite.
2005
    min_building_size : int, float
2006
        Minimal building size to consider for PV plants.
2007
    upper_quantile : float
2008
        Upper quantile to estimate maximum building size per capacity range.
2009
    lower_quantile : float
2010
        Lower quantile to estimate minimum building size per capacity range.
2011
    Returns
2012
    -------
2013
    dict
2014
        Dictionary with estimated normal building area range per capacity
2015
        range.
2016
    """
2017
    if cap_ranges is None:
2018
        cap_ranges = [
2019
            (0, 30),
2020
            (30, 100),
2021
            (100, float("inf")),
2022
        ]
2023
2024
    building_area_range_dict = {}
2025
2026
    n_ranges = len(cap_ranges)
2027
2028
    for count, cap_range in enumerate(cap_ranges):
2029
        cap_range_gdf = mastr_gdf.loc[
2030
            (mastr_gdf.capacity > cap_range[0])
2031
            & (mastr_gdf.capacity <= cap_range[1])
2032
        ]
2033
2034
        if count == 0:
2035
            building_area_range_dict[cap_range] = (
2036
                min_building_size,
2037
                cap_range_gdf.building_area.quantile(upper_quantile),
2038
            )
2039
        elif count == n_ranges - 1:
2040
            building_area_range_dict[cap_range] = (
2041
                cap_range_gdf.building_area.quantile(lower_quantile),
2042
                float("inf"),
2043
            )
2044
        else:
2045
            building_area_range_dict[cap_range] = (
2046
                cap_range_gdf.building_area.quantile(lower_quantile),
2047
                cap_range_gdf.building_area.quantile(upper_quantile),
2048
            )
2049
2050
    values = list(building_area_range_dict.values())
2051
2052
    building_area_range_normed_dict = {}
2053
2054
    for count, (cap_range, (min_area, max_area)) in enumerate(
2055
        building_area_range_dict.items()
2056
    ):
2057
        if count == 0:
2058
            building_area_range_normed_dict[cap_range] = (
2059
                min_area,
2060
                np.mean((values[count + 1][0], max_area)),
2061
            )
2062
        elif count == n_ranges - 1:
2063
            building_area_range_normed_dict[cap_range] = (
2064
                np.mean((values[count - 1][1], min_area)),
2065
                max_area,
2066
            )
2067
        else:
2068
            building_area_range_normed_dict[cap_range] = (
2069
                np.mean((values[count - 1][1], min_area)),
2070
                np.mean((values[count + 1][0], max_area)),
2071
            )
2072
2073
    return building_area_range_normed_dict
2074
2075
2076
@timer_func
2077
def desaggregate_pv_in_mv_grid(
2078
    buildings_gdf: gpd.GeoDataFrame,
2079
    pv_cap: float | int,
2080
    **kwargs,
2081
) -> gpd.GeoDataFrame:
2082
    """
2083
    Desaggregate PV capacity on buildings within a given grid district.
2084
    Parameters
2085
    -----------
2086
    buildings_gdf : geopandas.GeoDataFrame
2087
        GeoDataFrame containing buildings within the grid district.
2088
    pv_cap : float, int
2089
        PV capacity to desaggregate.
2090
    Other Parameters
2091
    -----------
2092
    prob_dict : dict
2093
        Dictionary with values and probabilities per capacity range.
2094
    cap_share_dict : dict
2095
        Dictionary with share of PV capacity from the total PV capacity within
2096
        capacity ranges.
2097
    building_area_range_dict : dict
2098
        Dictionary with estimated normal building area range per capacity
2099
        range.
2100
    load_factor_dict : dict
2101
        Dictionary with mean roof load factor per capacity range.
2102
    seed : int
2103
        Seed to use for random operations with NumPy and pandas.
2104
    pv_cap_per_sq_m : float, int
2105
        Average expected, installable PV capacity per square meter.
2106
    Returns
2107
    -------
2108
    geopandas.GeoDataFrame
2109
        GeoDataFrame containing OSM building data with desaggregated PV
2110
        plants.
2111
    """
2112
    bus_id = int(buildings_gdf.bus_id.iat[0])
2113
2114
    rng = default_rng(seed=kwargs["seed"])
2115
    random_state = RandomState(seed=kwargs["seed"])
2116
2117
    results_df = pd.DataFrame(columns=buildings_gdf.columns)
2118
2119
    for cap_range, share in kwargs["cap_share_dict"].items():
2120
        pv_cap_range = pv_cap * share
2121
2122
        b_area_min, b_area_max = kwargs["building_area_range_dict"][cap_range]
2123
2124
        cap_range_buildings_gdf = buildings_gdf.loc[
2125
            ~buildings_gdf.index.isin(results_df.index)
2126
            & (buildings_gdf.building_area > b_area_min)
2127
            & (buildings_gdf.building_area <= b_area_max)
2128
        ]
2129
2130
        mean_load_factor = kwargs["load_factor_dict"][cap_range]
2131
        cap_range_buildings_gdf = cap_range_buildings_gdf.assign(
2132
            mean_cap=cap_range_buildings_gdf.max_cap * mean_load_factor,
2133
            load_factor=np.nan,
2134
            capacity=np.nan,
2135
        )
2136
2137
        total_mean_cap = cap_range_buildings_gdf.mean_cap.sum()
2138
2139
        if total_mean_cap == 0:
2140
            logger.warning(
2141
                f"There are no matching roof for capacity range {cap_range} "
2142
                f"kW in grid {bus_id}. Using all buildings as fallback."
2143
            )
2144
2145
            cap_range_buildings_gdf = buildings_gdf.loc[
2146
                ~buildings_gdf.index.isin(results_df.index)
2147
            ]
2148
2149
            if len(cap_range_buildings_gdf) == 0:
2150
                logger.warning(
2151
                    "There are no roofes available for capacity range "
2152
                    f"{cap_range} kW in grid {bus_id}. Allowing dual use."
2153
                )
2154
                cap_range_buildings_gdf = buildings_gdf.copy()
2155
2156
            cap_range_buildings_gdf = cap_range_buildings_gdf.assign(
2157
                mean_cap=cap_range_buildings_gdf.max_cap * mean_load_factor,
2158
                load_factor=np.nan,
2159
                capacity=np.nan,
2160
            )
2161
2162
            total_mean_cap = cap_range_buildings_gdf.mean_cap.sum()
2163
2164
        elif total_mean_cap < pv_cap_range:
2165
            logger.warning(
2166
                f"Average roof utilization of the roof area in grid {bus_id} "
2167
                f"and capacity range {cap_range} kW is not sufficient. The "
2168
                "roof utilization will be above average."
2169
            )
2170
2171
        frac = max(
2172
            pv_cap_range / total_mean_cap,
2173
            1 / len(cap_range_buildings_gdf),
2174
        )
2175
2176
        samples_gdf = cap_range_buildings_gdf.sample(
2177
            frac=min(1, frac),
2178
            random_state=random_state,
2179
        )
2180
2181
        cap_range_dict = kwargs["prob_dict"][cap_range]
2182
2183
        values_dict = cap_range_dict["values"]
2184
        p_dict = cap_range_dict["probabilities"]
2185
2186
        load_factors = rng.choice(
2187
            a=values_dict["load_factor"],
2188
            size=len(samples_gdf),
2189
            p=p_dict["load_factor"],
2190
        )
2191
2192
        samples_gdf = samples_gdf.assign(
2193
            load_factor=load_factors,
2194
            capacity=(
2195
                samples_gdf.building_area
2196
                * load_factors
2197
                * kwargs["pv_cap_per_sq_m"]
2198
            ).clip(lower=0.4),
2199
        )
2200
2201
        missing_factor = pv_cap_range / samples_gdf.capacity.sum()
2202
2203
        samples_gdf = samples_gdf.assign(
2204
            capacity=(samples_gdf.capacity * missing_factor),
2205
            load_factor=(samples_gdf.load_factor * missing_factor),
2206
        )
2207
2208
        assert np.isclose(
2209
            samples_gdf.capacity.sum(),
2210
            pv_cap_range,
2211
            rtol=1e-03,
2212
        ), f"{samples_gdf.capacity.sum()} != {pv_cap_range}"
2213
2214
        results_df = pd.concat(
2215
            [
2216
                results_df,
2217
                samples_gdf,
2218
            ],
2219
        )
2220
2221
    total_missing_factor = pv_cap / results_df.capacity.sum()
2222
2223
    results_df = results_df.assign(
2224
        capacity=(results_df.capacity * total_missing_factor),
2225
    )
2226
2227
    assert np.isclose(
2228
        results_df.capacity.sum(),
2229
        pv_cap,
2230
        rtol=1e-03,
2231
    ), f"{results_df.capacity.sum()} != {pv_cap}"
2232
2233
    return gpd.GeoDataFrame(
2234
        results_df,
2235
        crs=samples_gdf.crs,
0 ignored issues
show
introduced by
The variable samples_gdf does not seem to be defined in case the for loop on line 2119 is not entered. Are you sure this can never be the case?
Loading history...
2236
        geometry="geom",
2237
    )
2238
2239
2240
@timer_func
2241
def desaggregate_pv(
2242
    buildings_gdf: gpd.GeoDataFrame,
2243
    cap_df: pd.DataFrame,
2244
    **kwargs,
2245
) -> gpd.GeoDataFrame:
2246
    """
2247
    Desaggregate PV capacity on buildings within a given grid district.
2248
    Parameters
2249
    -----------
2250
    buildings_gdf : geopandas.GeoDataFrame
2251
        GeoDataFrame containing OSM buildings data.
2252
    cap_df : pandas.DataFrame
2253
        DataFrame with total rooftop capacity per mv grid.
2254
    Other Parameters
2255
    -----------
2256
    prob_dict : dict
2257
        Dictionary with values and probabilities per capacity range.
2258
    cap_share_dict : dict
2259
        Dictionary with share of PV capacity from the total PV capacity within
2260
        capacity ranges.
2261
    building_area_range_dict : dict
2262
        Dictionary with estimated normal building area range per capacity
2263
        range.
2264
    load_factor_dict : dict
2265
        Dictionary with mean roof load factor per capacity range.
2266
    seed : int
2267
        Seed to use for random operations with NumPy and pandas.
2268
    pv_cap_per_sq_m : float, int
2269
        Average expected, installable PV capacity per square meter.
2270
    Returns
2271
    -------
2272
    geopandas.GeoDataFrame
2273
        GeoDataFrame containing OSM building data with desaggregated PV
2274
        plants.
2275
    """
2276
    allocated_buildings_gdf = buildings_gdf.loc[~buildings_gdf.end_of_life]
2277
2278
    building_bus_ids = set(buildings_gdf.bus_id)
2279
    cap_bus_ids = set(cap_df.index)
2280
2281
    logger.debug(
2282
        f"Bus IDs from buildings: {len(building_bus_ids)}\nBus IDs from "
2283
        f"capacity: {len(cap_bus_ids)}"
2284
    )
2285
2286
    if len(building_bus_ids) > len(cap_bus_ids):
2287
        missing = building_bus_ids - cap_bus_ids
2288
    else:
2289
        missing = cap_bus_ids - building_bus_ids
2290
2291
    logger.debug(str(missing))
2292
2293
    bus_ids = np.intersect1d(list(building_bus_ids), list(cap_bus_ids))
2294
2295
    # assert set(buildings_gdf.bus_id.unique()) == set(cap_df.index)
2296
2297
    for bus_id in bus_ids:
2298
        buildings_grid_gdf = buildings_gdf.loc[buildings_gdf.bus_id == bus_id]
2299
2300
        pv_installed_gdf = buildings_grid_gdf.loc[
2301
            ~buildings_grid_gdf.end_of_life
2302
        ]
2303
2304
        pv_installed = pv_installed_gdf.capacity.sum()
2305
2306
        pot_buildings_gdf = buildings_grid_gdf.drop(
2307
            index=pv_installed_gdf.index
2308
        )
2309
2310
        if len(pot_buildings_gdf) == 0:
2311
            logger.error(
2312
                f"In grid {bus_id} there are no potential buildings to "
2313
                f"allocate PV capacity to. The grid is skipped. This message "
2314
                f"should only appear doing test runs with few buildings."
2315
            )
2316
2317
            continue
2318
2319
        pv_target = cap_df.at[bus_id, "capacity"] * 1000
2320
2321
        logger.debug(f"pv_target: {pv_target}")
2322
2323
        pv_missing = pv_target - pv_installed
2324
2325
        if pv_missing <= 0:
2326
            logger.warning(
2327
                f"In grid {bus_id} there is more PV installed "
2328
                f"({pv_installed: g} kW) in status Quo than allocated within "
2329
                f"the scenario ({pv_target: g} kW). "
2330
                f"No new generators are added."
2331
            )
2332
2333
            continue
2334
2335
        if pot_buildings_gdf.max_cap.sum() < pv_missing:
2336
            logger.error(
2337
                f"In grid {bus_id} there is less PV potential ("
2338
                f"{pot_buildings_gdf.max_cap.sum():g} kW) than allocated PV "
2339
                f"capacity ({pv_missing:g} kW). The average roof utilization "
2340
                f"will be very high."
2341
            )
2342
2343
        gdf = desaggregate_pv_in_mv_grid(
2344
            buildings_gdf=pot_buildings_gdf,
2345
            pv_cap=pv_missing,
2346
            **kwargs,
2347
        )
2348
2349
        logger.debug(f"New cap in grid {bus_id}: {gdf.capacity.sum()}")
2350
        logger.debug(f"Installed cap in grid {bus_id}: {pv_installed}")
2351
        logger.debug(
2352
            f"Total cap in grid {bus_id}: {gdf.capacity.sum() + pv_installed}"
2353
        )
2354
2355
        if not np.isclose(
2356
            gdf.capacity.sum() + pv_installed, pv_target, rtol=1e-3
2357
        ):
2358
            logger.warning(
2359
                f"The desired capacity and actual capacity in grid {bus_id} "
2360
                f"differ.\n"
2361
                f"Desired cap: {pv_target}\nActual cap: "
2362
                f"{gdf.capacity.sum() + pv_installed}"
2363
            )
2364
2365
        pre_cap = allocated_buildings_gdf.capacity.sum()
2366
        new_cap = gdf.capacity.sum()
2367
2368
        allocated_buildings_gdf = pd.concat(
2369
            [
2370
                allocated_buildings_gdf,
2371
                gdf,
2372
            ]
2373
        )
2374
2375
        total_cap = allocated_buildings_gdf.capacity.sum()
2376
2377
        assert np.isclose(pre_cap + new_cap, total_cap)
2378
2379
    logger.debug("Desaggregated scenario.")
2380
    logger.debug(f"Scenario capacity: {cap_df.capacity.sum(): g}")
2381
    logger.debug(
2382
        f"Generator capacity: "
2383
        f"{allocated_buildings_gdf.capacity.sum() / 1000: g}"
2384
    )
2385
2386
    return gpd.GeoDataFrame(
2387
        allocated_buildings_gdf,
2388
        crs=gdf.crs,
0 ignored issues
show
introduced by
The variable gdf does not seem to be defined for all execution paths.
Loading history...
2389
        geometry="geom",
2390
    )
2391
2392
2393
@timer_func
2394
def add_buildings_meta_data(
2395
    buildings_gdf: gpd.GeoDataFrame,
2396
    prob_dict: dict,
2397
    seed: int,
2398
) -> gpd.GeoDataFrame:
2399
    """
2400
    Randomly add additional metadata to desaggregated PV plants.
2401
    Parameters
2402
    -----------
2403
    buildings_gdf : geopandas.GeoDataFrame
2404
        GeoDataFrame containing OSM buildings data with desaggregated PV
2405
        plants.
2406
    prob_dict : dict
2407
        Dictionary with values and probabilities per capacity range.
2408
    seed : int
2409
        Seed to use for random operations with NumPy and pandas.
2410
    Returns
2411
    -------
2412
    geopandas.GeoDataFrame
2413
        GeoDataFrame containing OSM building data with desaggregated PV
2414
        plants.
2415
    """
2416
    rng = default_rng(seed=seed)
2417
    buildings_gdf = buildings_gdf.reset_index().rename(
2418
        columns={
2419
            "index": "building_id",
2420
        }
2421
    )
2422
2423
    for (min_cap, max_cap), cap_range_prob_dict in prob_dict.items():
2424
        cap_range_gdf = buildings_gdf.loc[
2425
            (buildings_gdf.capacity >= min_cap)
2426
            & (buildings_gdf.capacity < max_cap)
2427
        ]
2428
2429
        for key, values in cap_range_prob_dict["values"].items():
2430
            if key == "load_factor":
2431
                continue
2432
2433
            gdf = cap_range_gdf.loc[
2434
                cap_range_gdf[key].isna()
2435
                | cap_range_gdf[key].isnull()
2436
                | (cap_range_gdf[key] == "None")
2437
            ]
2438
2439
            key_vals = rng.choice(
2440
                a=values,
2441
                size=len(gdf),
2442
                p=cap_range_prob_dict["probabilities"][key],
2443
            )
2444
2445
            buildings_gdf.loc[gdf.index, key] = key_vals
2446
2447
    return buildings_gdf
2448
2449
2450
def add_voltage_level(
2451
    buildings_gdf: gpd.GeoDataFrame,
2452
) -> gpd.GeoDataFrame:
2453
    """
2454
    Get voltage level data from mastr table and assign to units. Infer missing
2455
    values derived from generator capacity to the power plants.
2456
2457
    Parameters
2458
    -----------
2459
    buildings_gdf : geopandas.GeoDataFrame
2460
        GeoDataFrame containing OSM buildings data with desaggregated PV
2461
        plants.
2462
    Returns
2463
    -------
2464
    geopandas.GeoDataFrame
2465
        GeoDataFrame containing OSM building data with voltage level per
2466
        generator.
2467
    """
2468
2469 View Code Duplication
    def voltage_levels(p: float) -> int:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
2470
        if p <= 100:
2471
            return 7
2472
        elif p <= 200:
2473
            return 6
2474
        elif p <= 5500:
2475
            return 5
2476
        elif p <= 20000:
2477
            return 4
2478
        elif p <= 120000:
2479
            return 3
2480
        return 1
2481
2482
    # Join mastr table
2483
    with db.session_scope() as session:
2484
        query = session.query(
2485
            EgonPowerPlantsPv.gens_id,
2486
            EgonPowerPlantsPv.voltage_level,
2487
        )
2488
    voltage_levels_df = pd.read_sql(
2489
        query.statement, query.session.bind, index_col=None
2490
    )
2491
    buildings_gdf = buildings_gdf.merge(
2492
        voltage_levels_df,
2493
        left_on="gens_id",
2494
        right_on="gens_id",
2495
        how="left",
2496
    )
2497
2498
    # Infer missing values
2499
    mask = buildings_gdf.voltage_level.isna()
2500
    buildings_gdf.loc[mask, "voltage_level"] = buildings_gdf.loc[
2501
        mask
2502
    ].capacity.apply(voltage_levels)
2503
2504
    return buildings_gdf
2505
2506
2507
def add_start_up_date(
2508
    buildings_gdf: gpd.GeoDataFrame,
2509
    start: pd.Timestamp,
2510
    end: pd.Timestamp,
2511
    seed: int,
2512
):
2513
    """
2514
    Randomly and linear add start-up date to new pv generators.
2515
    Parameters
2516
    ----------
2517
    buildings_gdf : geopandas.GeoDataFrame
2518
        GeoDataFrame containing OSM buildings data with desaggregated PV
2519
        plants.
2520
    start : pandas.Timestamp
2521
        Minimum Timestamp to use.
2522
    end : pandas.Timestamp
2523
        Maximum Timestamp to use.
2524
    seed : int
2525
        Seed to use for random operations with NumPy and pandas.
2526
    Returns
2527
    -------
2528
    geopandas.GeoDataFrame
2529
        GeoDataFrame containing OSM buildings data with start-up date added.
2530
    """
2531
    rng = default_rng(seed=seed)
2532
2533
    date_range = pd.date_range(start=start, end=end, freq="1D")
2534
2535
    return buildings_gdf.assign(
2536
        start_up_date=rng.choice(date_range, size=len(buildings_gdf))
2537
    )
2538
2539
2540
@timer_func
2541
def allocate_scenarios(
2542
    mastr_gdf: gpd.GeoDataFrame,
2543
    valid_buildings_gdf: gpd.GeoDataFrame,
2544
    last_scenario_gdf: gpd.GeoDataFrame,
2545
    scenario: str,
2546
):
2547
    """
2548
    Desaggregate and allocate scenario pv rooftop ramp-ups onto buildings.
2549
    Parameters
2550
    ----------
2551
    mastr_gdf : geopandas.GeoDataFrame
2552
        GeoDataFrame containing geocoded MaStR data.
2553
    valid_buildings_gdf : geopandas.GeoDataFrame
2554
        GeoDataFrame containing OSM buildings data.
2555
    last_scenario_gdf : geopandas.GeoDataFrame
2556
        GeoDataFrame containing OSM buildings matched with pv generators from
2557
        temporally preceding scenario.
2558
    scenario : str
2559
        Scenario to desaggrgate and allocate.
2560
    Returns
2561
    -------
2562
    tuple
2563
        geopandas.GeoDataFrame
2564
            GeoDataFrame containing OSM buildings matched with pv generators.
2565
        pandas.DataFrame
2566
            DataFrame containing pv rooftop capacity per grid id.
2567
    """
2568
    cap_per_bus_id_df = cap_per_bus_id(scenario)
2569
2570
    logger.debug(
2571
        f"cap_per_bus_id_df total capacity: {cap_per_bus_id_df.capacity.sum()}"
2572
    )
2573
2574
    last_scenario_gdf = determine_end_of_life_gens(
2575
        last_scenario_gdf,
2576
        SCENARIO_TIMESTAMP[scenario],
2577
        PV_ROOFTOP_LIFETIME,
2578
    )
2579
2580
    buildings_gdf = calculate_max_pv_cap_per_building(
2581
        valid_buildings_gdf,
2582
        last_scenario_gdf,
2583
        PV_CAP_PER_SQ_M,
2584
        ROOF_FACTOR,
2585
    )
2586
2587
    mastr_gdf = calculate_building_load_factor(
2588
        mastr_gdf,
2589
        buildings_gdf,
2590
    )
2591
2592
    probabilities_dict = probabilities(
2593
        mastr_gdf,
2594
        cap_ranges=CAP_RANGES,
2595
    )
2596
2597
    cap_share_dict = cap_share_per_cap_range(
2598
        mastr_gdf,
2599
        cap_ranges=CAP_RANGES,
2600
    )
2601
2602
    load_factor_dict = mean_load_factor_per_cap_range(
2603
        mastr_gdf,
2604
        cap_ranges=CAP_RANGES,
2605
    )
2606
2607
    building_area_range_dict = building_area_range_per_cap_range(
2608
        mastr_gdf,
2609
        cap_ranges=CAP_RANGES,
2610
        min_building_size=MIN_BUILDING_SIZE,
2611
        upper_quantile=UPPER_QUNATILE,
2612
        lower_quantile=LOWER_QUANTILE,
2613
    )
2614
2615
    allocated_buildings_gdf = desaggregate_pv(
2616
        buildings_gdf=buildings_gdf,
2617
        cap_df=cap_per_bus_id_df,
2618
        prob_dict=probabilities_dict,
2619
        cap_share_dict=cap_share_dict,
2620
        building_area_range_dict=building_area_range_dict,
2621
        load_factor_dict=load_factor_dict,
2622
        seed=SEED,
2623
        pv_cap_per_sq_m=PV_CAP_PER_SQ_M,
2624
    )
2625
2626
    allocated_buildings_gdf = allocated_buildings_gdf.assign(scenario=scenario)
2627
2628
    meta_buildings_gdf = frame_to_numeric(
2629
        add_buildings_meta_data(
2630
            allocated_buildings_gdf,
2631
            probabilities_dict,
2632
            SEED,
2633
        )
2634
    )
2635
2636
    return (
2637
        add_start_up_date(
2638
            meta_buildings_gdf,
2639
            start=last_scenario_gdf.start_up_date.max(),
2640
            end=SCENARIO_TIMESTAMP[scenario],
2641
            seed=SEED,
2642
        ),
2643
        cap_per_bus_id_df,
2644
    )
2645
2646
2647
class EgonPowerPlantPvRoofBuildingScenario(Base):
2648
    __tablename__ = "egon_power_plants_pv_roof_building"
2649
    __table_args__ = {"schema": "supply"}
2650
2651
    index = Column(Integer, primary_key=True, index=True)
2652
    scenario = Column(String)
2653
    bus_id = Column(Integer, nullable=True)
2654
    building_id = Column(Integer)
2655
    gens_id = Column(String, nullable=True)
2656
    capacity = Column(Float)
2657
    einheitliche_ausrichtung_und_neigungswinkel = Column(Float)
2658
    hauptausrichtung = Column(String)
2659
    hauptausrichtung_neigungswinkel = Column(String)
2660
    voltage_level = Column(Integer)
2661
    weather_cell_id = Column(Integer)
2662
2663
2664
def create_scenario_table(buildings_gdf):
2665
    """Create mapping table pv_unit <-> building for scenario"""
2666
    EgonPowerPlantPvRoofBuildingScenario.__table__.drop(
2667
        bind=engine, checkfirst=True
2668
    )
2669
    EgonPowerPlantPvRoofBuildingScenario.__table__.create(
2670
        bind=engine, checkfirst=True
2671
    )
2672
2673
    buildings_gdf.rename(columns=COLS_TO_RENAME).assign(
2674
        capacity=buildings_gdf.capacity.div(10**3)  # kW -> MW
2675
    )[COLS_TO_EXPORT].reset_index().to_sql(
2676
        name=EgonPowerPlantPvRoofBuildingScenario.__table__.name,
2677
        schema=EgonPowerPlantPvRoofBuildingScenario.__table__.schema,
2678
        con=db.engine(),
2679
        if_exists="append",
2680
        index=False,
2681
    )
2682
2683
2684
def geocode_mastr_data():
2685
    """
2686
    Read PV rooftop data from MaStR CSV
2687
    TODO: the source will be replaced as soon as the MaStR data is available
2688
     in DB.
2689
    """
2690
    mastr_df = mastr_data(
2691
        MASTR_INDEX_COL,
2692
        MASTR_RELEVANT_COLS,
2693
        MASTR_DTYPES,
2694
        MASTR_PARSE_DATES,
2695
    )
2696
2697
    clean_mastr_df = clean_mastr_data(
2698
        mastr_df,
2699
        max_realistic_pv_cap=MAX_REALISTIC_PV_CAP,
2700
        min_realistic_pv_cap=MIN_REALISTIC_PV_CAP,
2701
        seed=SEED,
2702
        rounding=ROUNDING,
2703
    )
2704
2705
    geocoding_df = geocoding_data(clean_mastr_df)
2706
2707
    ratelimiter = geocoder(USER_AGENT, MIN_DELAY_SECONDS)
2708
2709
    geocode_gdf = geocode_data(geocoding_df, ratelimiter, EPSG)
2710
2711
    create_geocoded_table(geocode_gdf)
2712
2713
2714
def add_weather_cell_id(buildings_gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
2715
    sql = """
2716
    SELECT building_id, zensus_population_id
2717
    FROM boundaries.egon_map_zensus_mvgd_buildings
2718
    """
2719
2720
    buildings_gdf = buildings_gdf.merge(
2721
        right=db.select_dataframe(sql).drop_duplicates(subset="building_id"),
2722
        how="left",
2723
        on="building_id",
2724
    )
2725
2726
    sql = """
2727
    SELECT zensus_population_id, w_id as weather_cell_id
2728
    FROM boundaries.egon_map_zensus_weather_cell
2729
    """
2730
2731
    buildings_gdf = buildings_gdf.merge(
2732
        right=db.select_dataframe(sql).drop_duplicates(
2733
            subset="zensus_population_id"
2734
        ),
2735
        how="left",
2736
        on="zensus_population_id",
2737
    )
2738
2739
    if buildings_gdf.weather_cell_id.isna().any():
2740
        missing = buildings_gdf.loc[
2741
            buildings_gdf.weather_cell_id.isna()
2742
        ].building_id.tolist()
2743
2744
        raise ValueError(
2745
            f"Following buildings don't have a weather cell id: {missing}"
2746
        )
2747
2748
    return buildings_gdf
2749
2750
2751
def add_bus_ids_sq(buildings_gdf: gpd.GeoDataFrame,) -> gpd.GeoDataFrame:
2752
    """Add bus ids for status_quo units
2753
2754
    Parameters
2755
    -----------
2756
    buildings_gdf : geopandas.GeoDataFrame
2757
        GeoDataFrame containing OSM buildings data with desaggregated PV
2758
        plants.
2759
    Returns
2760
    -------
2761
    geopandas.GeoDataFrame
2762
        GeoDataFrame containing OSM building data with bus_id per
2763
        generator.
2764
    """
2765
    grid_districts_gdf = grid_districts(EPSG)
2766
2767
    mask = buildings_gdf.scenario == "status_quo"
2768
    buildings_gdf.loc[mask, "bus_id"] = (
2769
        buildings_gdf.loc[mask]
2770
        .sjoin(grid_districts_gdf, how="left")
2771
        .index_right
2772
    )
2773
2774
    return buildings_gdf
2775
2776
2777
def pv_rooftop_to_buildings():
2778
    """Main script, executed as task"""
2779
2780
    mastr_gdf = load_mastr_data()
2781
2782
    buildings_gdf = load_building_data()
2783
2784
    desagg_mastr_gdf, desagg_buildings_gdf = allocate_to_buildings(
2785
        mastr_gdf, buildings_gdf
2786
    )
2787
2788
    all_buildings_gdf = (
2789
        desagg_mastr_gdf.assign(scenario="status_quo")
2790
        .reset_index()
2791
        .rename(columns={"geometry": "geom", "EinheitMastrNummer": "gens_id"})
2792
    )
2793
2794
    scenario_buildings_gdf = all_buildings_gdf.copy()
2795
2796
    cap_per_bus_id_df = pd.DataFrame()
2797
2798
    for scenario in SCENARIOS:
2799
        logger.debug(f"Desaggregating scenario {scenario}.")
2800
        (
2801
            scenario_buildings_gdf,
2802
            cap_per_bus_id_scenario_df,
2803
        ) = allocate_scenarios(  # noqa: F841
2804
            desagg_mastr_gdf,
2805
            desagg_buildings_gdf,
2806
            scenario_buildings_gdf,
2807
            scenario,
2808
        )
2809
2810
        all_buildings_gdf = gpd.GeoDataFrame(
2811
            pd.concat(
2812
                [all_buildings_gdf, scenario_buildings_gdf], ignore_index=True
2813
            ),
2814
            crs=scenario_buildings_gdf.crs,
2815
            geometry="geom",
2816
        )
2817
2818
        cap_per_bus_id_df = pd.concat(
2819
            [cap_per_bus_id_df, cap_per_bus_id_scenario_df]
2820
        )
2821
2822
    # add weather cell
2823
    all_buildings_gdf = add_weather_cell_id(all_buildings_gdf)
2824
2825
    # add bus IDs for status quo scenario
2826
    all_buildings_gdf = add_bus_ids_sq(all_buildings_gdf)
2827
2828
    # export scenario
2829
    create_scenario_table(add_voltage_level(all_buildings_gdf))
2830