Passed: Pull Request — dev (#809), created by unknown at 01:47

data.datasets.power_plants.pv_rooftop_buildings (rating: F)

Complexity

Total Complexity 114

Size/Duplication

Total Lines 2618
Duplicated Lines 1.22 %

Importance

Changes 0
Metric Value
wmc 114
eloc 1177
dl 32
loc 2618
rs 0.892
c 0
b 0
f 0

52 Functions

Rating   Name   Duplication   Size   Complexity  
A load_building_data() 0 47 2
A synthetic_buildings() 0 25 2
A geocoding_data() 0 18 1
A geocoded_data_from_db() 0 23 2
A add_ags_to_gens() 0 24 1
A drop_buildings_outside_muns() 0 25 1
B clean_mastr_data() 0 143 2
A mastr_data() 0 60 2
A geocode_data() 0 46 3
B zip_and_municipality_from_standort() 0 50 7
A merge_geocode_with_mastr() 0 27 1
A municipality_data() 0 14 2
A osm_buildings() 0 25 2
A add_ags_to_buildings() 0 24 1
A sort_and_qcut_df() 0 29 1
A create_geocoded_table() 0 17 1
A load_mastr_data() 0 35 1
A drop_gens_outside_muns() 0 25 1
A geocoder() 0 22 1
A drop_invalid_entries_from_gdf() 0 25 1
A most_plausible() 0 41 4
A grid_districts() 0 32 1
A timer_func() 0 11 1
A get_probability_for_property() 0 46 2
A calculate_building_load_factor() 0 30 1
A determine_end_of_life_gens() 0 28 1
A add_start_up_date() 0 30 1
A buildings_area_per_overlay_id() 0 33 1
B add_voltage_level() 0 31 6
A add_overlay_id_to_buildings() 0 31 1
A cap_per_bus_id() 0 36 2
B desaggregate_pv_in_mv_grid() 0 151 5
A calculate_max_pv_cap_per_building() 0 49 1
A scenario_data() 0 30 2
A drop_buildings_outside_grids() 0 25 1
C building_area_range_per_cap_range() 0 87 8
A add_buildings_meta_data() 0 55 4
B allocate_pv() 0 125 6
A federal_state_data() 0 24 2
A geocode_mastr_data() 0 28 1
A mean_load_factor_per_cap_range() 0 38 3
B desaggregate_pv() 0 104 5
A frame_to_numeric() 0 20 4
A pv_rooftop_to_buildings() 0 47 2
A allocate_to_buildings() 0 32 1
A overlay_grid_districts_with_counties() 0 33 1
B allocate_scenarios() 0 124 1
A validate_output() 0 50 1
A create_scenario_table() 0 17 1
A cap_share_per_cap_range() 0 43 3
A drop_unallocated_gens() 0 27 1
B probabilities() 0 57 5


Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

A common duplication problem in this module is the near-identical filter-and-log pattern shared by drop_gens_outside_muns(), drop_buildings_outside_muns() and drop_buildings_outside_grids(); a shared helper could absorb it, as sketched below.
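A minimal sketch of such an extraction, assuming the module's loguru logger; drop_rows_and_log() is a hypothetical helper, not part of the codebase:

def drop_rows_and_log(gdf, mask, reason):
    # Keep only the rows matching ``mask`` and log how many were dropped.
    kept = gdf.loc[mask]
    dropped = len(gdf) - len(kept)
    logger.debug(
        f"{dropped} ({dropped / len(gdf) * 100:g}%) of {len(gdf)} values "
        f"are {reason} and are therefore dropped."
    )
    return kept

# e.g. drop_gens_outside_muns() would then reduce to:
# return drop_rows_and_log(
#     valid_mastr_gdf,
#     ~valid_mastr_gdf.ags.isna(),
#     "outside of the municipalities",
# )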

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like data.datasets.power_plants.pv_rooftop_buildings often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster. A sketch of Extract Class applied to this module follows below.
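As a minimal, hypothetical sketch of Extract Class for this module: the geocoder()/geocoding_data()/geocode_data() functions share the "geocod" prefix and could be grouped into one cohesive component. The class name and structure below are illustrative only; they are not part of the codebase:

class MastrGeocoder:
    """Hypothetical extraction of the geocoding-related functions."""

    def __init__(self, user_agent: str, min_delay_seconds: int, epsg: int):
        self.epsg = epsg
        locator = Nominatim(user_agent=user_agent)
        # rate-limited Nominatim geocode call, as in geocoder()
        self.ratelimiter = RateLimiter(
            locator.geocode, min_delay_seconds=min_delay_seconds
        )

    def unique_locations(self, clean_mastr_df):
        # mirrors geocoding_data()
        return pd.DataFrame(
            data=clean_mastr_df.zip_and_municipality.unique(),
            columns=["zip_and_municipality"],
        )

    def geocode(self, geocoding_df):
        # mirrors geocode_data(); delegates to the rate-limited call
        return geocode_data(geocoding_df, self.ratelimiter, self.epsg)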

"""
Distribute MaStR PV rooftop capacities to OSM and synthetic buildings. Generate new
PV rooftop generators for the scenarios eGon2035 and eGon100RE.

Data cleaning: Drop duplicates and entries with missing critical data. Determine the
most plausible capacity from the multiple values given in the MaStR data. Drop
generators without any plausible capacity (0.1 kW < P <= 23500 kW). Randomly and
weighted add a start-up date where it is missing. Extract zip code and municipality
from the 'Standort' given in the MaStR data. Geocode unique zip code and municipality
combinations with Nominatim (1 s delay). Drop generators for which geocoding failed or
which are located outside the municipalities of Germany. Add some visual sanity checks
for the cleaned data.

Allocation of MaStR data: Allocate each generator to an existing building from OSM.
Determine the quantile each generator and building is in, depending on the capacity of
the generator and the area of the building polygon. Randomly distribute generators
within each municipality, preferably within the same building area quantile as the
generators are capacity-wise. If not enough buildings exist within a municipality and
quantile, additional buildings from other quantiles are chosen randomly.

Disaggregation of PV rooftop scenarios: The scenario data per federal state is
distributed linearly to the MV grid districts according to the PV rooftop potential
per MV grid district. The rooftop potential is estimated from the building area given
by the OSM buildings. Grid districts which are located in several federal states are
allocated PV capacity according to their respective roof potential in the individual
federal states. The disaggregation of PV plants within a grid district respects
existing plants from MaStR which have not reached their end of life. New PV plants are
generated randomly and weighted, using a breakdown of MaStR data as the generator
basis. Plant metadata (e.g. plant orientation) is also added randomly and weighted,
with MaStR data as the basis.
"""
from __future__ import annotations

from collections import Counter
from functools import wraps
from pathlib import Path
from time import perf_counter
from typing import Any

from geoalchemy2 import Geometry
from geopy.extra.rate_limiter import RateLimiter
from geopy.geocoders import Nominatim
from loguru import logger
from numpy.random import RandomState, default_rng
from pyproj.crs.crs import CRS
from sqlalchemy import BigInteger, Column, Float, Integer, String
from sqlalchemy.dialects.postgresql import HSTORE
from sqlalchemy.ext.declarative import declarative_base
import geopandas as gpd
import numpy as np
import pandas as pd

from egon.data import config, db
from egon.data.datasets.electricity_demand_timeseries.hh_buildings import (
    OsmBuildingsSynthetic,
)
from egon.data.datasets.scenario_capacities import EgonScenarioCapacities
from egon.data.datasets.zensus_vg250 import Vg250Gem

engine = db.engine()
Base = declarative_base()
SEED = int(config.settings()["egon-data"]["--random-seed"])

# TODO: move to yml
# mastr data
MASTR_RELEVANT_COLS = [
    "EinheitMastrNummer",
    "Bruttoleistung",
    "StatisikFlag",
    "Bruttoleistung_extended",
    "Nettonennleistung",
    "InstallierteLeistung",
    "zugeordneteWirkleistungWechselrichter",
    "EinheitBetriebsstatus",
    "Standort",
    "Bundesland",
    "Land",
    "Landkreis",
    "Gemeinde",
    "Postleitzahl",
    "Ort",
    "GeplantesInbetriebnahmedatum",
    "Inbetriebnahmedatum",
    "GemeinsamerWechselrichterMitSpeicher",
    "Lage",
    "Leistungsbegrenzung",
    "EinheitlicheAusrichtungUndNeigungswinkel",
    "Hauptausrichtung",
    "HauptausrichtungNeigungswinkel",
    "Nebenausrichtung",
]

MASTR_DTYPES = {
    "EinheitMastrNummer": str,
    "Bruttoleistung": float,
    "StatisikFlag": str,
    "Bruttoleistung_extended": float,
    "Nettonennleistung": float,
    "InstallierteLeistung": float,
    "zugeordneteWirkleistungWechselrichter": float,
    "EinheitBetriebsstatus": str,
    "Standort": str,
    "Bundesland": str,
    "Land": str,
    "Landkreis": str,
    "Gemeinde": str,
    # "Postleitzahl": int,  # fails because of nan values
    "Ort": str,
    "GemeinsamerWechselrichterMitSpeicher": str,
    "Lage": str,
    "Leistungsbegrenzung": str,
    # this would parse nan values as False, which is not always correct
    # "EinheitlicheAusrichtungUndNeigungswinkel": bool,
    "Hauptausrichtung": str,
    "HauptausrichtungNeigungswinkel": str,
    "Nebenausrichtung": str,
    "NebenausrichtungNeigungswinkel": str,
}

MASTR_PARSE_DATES = [
    "GeplantesInbetriebnahmedatum",
    "Inbetriebnahmedatum",
]

MASTR_INDEX_COL = "EinheitMastrNummer"

EPSG = 4326
SRID = 3035

# data cleaning
MAX_REALISTIC_PV_CAP = 23500
MIN_REALISTIC_PV_CAP = 0.1
ROUNDING = 1

# geopy
MIN_DELAY_SECONDS = 1
USER_AGENT = "rli_kh_geocoder"

# show additional logging information
VERBOSE = False

EXPORT_DIR = Path(__file__).resolve().parent / "data"  # directory next to this module
EXPORT_FILE = "mastr_geocoded.gpkg"
EXPORT_PATH = EXPORT_DIR / EXPORT_FILE
DRIVER = "GPKG"

# Number of quantiles
Q = 5

# Scenario Data
CARRIER = "solar_rooftop"
SCENARIOS = ["eGon2035", "eGon100RE"]
SCENARIO_TIMESTAMP = {
    "eGon2035": pd.Timestamp("2035-01-01", tz="UTC"),
    "eGon100RE": pd.Timestamp("2050-01-01", tz="UTC"),
}
PV_ROOFTOP_LIFETIME = pd.Timedelta(30 * 365, unit="D")

# Example module: Trina Vertex S TSM-400DE09M.08, 400 Wp
# https://www.photovoltaik4all.de/media/pdf/92/64/68/Trina_Datasheet_VertexS_DE09-08_2021_A.pdf
MODUL_CAP = 0.4  # kWp
MODUL_SIZE = 1.096 * 1.754  # m²
PV_CAP_PER_SQ_M = MODUL_CAP / MODUL_SIZE
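# Worked out with the constants above: MODUL_SIZE = 1.096 * 1.754 ≈ 1.922 m²,
# so PV_CAP_PER_SQ_M ≈ 0.4 / 1.922 ≈ 0.208 kWp per m² of module area.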

# Estimation of usable roof area
# Factor for the conversion of building area to roof area
# estimation mean roof pitch: 35°
# estimation usable roof share: 80%
# estimation that only the south side of the building is used for pv
# see https://mediatum.ub.tum.de/doc/%20969497/969497.pdf
# AREA_FACTOR = 1.221
# USABLE_ROOF_SHARE = 0.8
# SOUTH_SHARE = 0.5
# ROOF_FACTOR = AREA_FACTOR * USABLE_ROOF_SHARE * SOUTH_SHARE
ROOF_FACTOR = 0.5

CAP_RANGES = [
    (0, 30),
    (30, 100),
    (100, float("inf")),
]

MIN_BUILDING_SIZE = 10.0
UPPER_QUNATILE = 0.95
LOWER_QUANTILE = 0.05

COLS_TO_RENAME = {
    "EinheitlicheAusrichtungUndNeigungswinkel": (
        "einheitliche_ausrichtung_und_neigungswinkel"
    ),
    "Hauptausrichtung": "hauptausrichtung",
    "HauptausrichtungNeigungswinkel": "hauptausrichtung_neigungswinkel",
}

COLS_TO_EXPORT = [
    "scenario",
    "building_id",
    "gens_id",
    "capacity",
    "einheitliche_ausrichtung_und_neigungswinkel",
    "hauptausrichtung",
    "hauptausrichtung_neigungswinkel",
    "voltage_level",
]

INCLUDE_SYNTHETIC_BUILDINGS = False


def timer_func(func):
    @wraps(func)
    def timeit_wrapper(*args, **kwargs):
        start_time = perf_counter()
        result = func(*args, **kwargs)
        end_time = perf_counter()
        total_time = end_time - start_time
        print(f"Function {func.__name__} took {total_time:.4f} seconds.")
        return result

    return timeit_wrapper


@timer_func
def mastr_data(
    index_col: str | int | list[str] | list[int],
    usecols: list[str],
    dtype: dict[str, Any] | None,
    parse_dates: list[str] | None,
) -> pd.DataFrame:
    """
    Read MaStR data from csv.

    Parameters
    -----------
    index_col : str, int or list of str or int
        Column(s) to use as the row labels of the DataFrame.
    usecols : list of str
        Return a subset of the columns.
    dtype : dict of column (str) -> type (any), optional
        Data type for data or columns.
    parse_dates : list of names (str), optional
        Try to parse given columns to datetime.

    Returns
    -------
    pandas.DataFrame
        DataFrame containing MaStR data.
    """
    mastr_path = Path(
        config.datasets()["power_plants"]["sources"]["mastr_pv"]
    ).resolve()

    mastr_df = pd.read_csv(
        mastr_path,
        index_col=index_col,
        usecols=usecols,
        dtype=dtype,
        parse_dates=parse_dates,
    )

    mastr_df = mastr_df.loc[
        (mastr_df.StatisikFlag == "B")
        & (mastr_df.EinheitBetriebsstatus == "InBetrieb")
        & (mastr_df.Land == "Deutschland")
        & (mastr_df.Lage == "BaulicheAnlagen")
    ]

    if (
        config.settings()["egon-data"]["--dataset-boundary"]
        == "Schleswig-Holstein"
    ):
        init_len = len(mastr_df)

        mastr_df = mastr_df.loc[mastr_df.Bundesland == "SchleswigHolstein"]

        logger.info(
            f"Using only MaStR data within Schleswig-Holstein. "
            f"{init_len - len(mastr_df)} of {init_len} generators are dropped."
        )

    logger.debug("MaStR data loaded.")

    return mastr_df


@timer_func
def clean_mastr_data(
    mastr_df: pd.DataFrame,
    max_realistic_pv_cap: int | float,
    min_realistic_pv_cap: int | float,
    rounding: int,
    seed: int,
) -> pd.DataFrame:
    """
    Clean the MaStR data from implausible data.

    * Drop MaStR ID duplicates.
    * Drop generators with implausible capacities.
    * Drop generators without any kind of start-up date.
    * Clean up the Standort column and the capacity.

    Parameters
    -----------
    mastr_df : pandas.DataFrame
        DataFrame containing MaStR data.
    max_realistic_pv_cap : int or float
        Maximum capacity which is considered to be realistic.
    min_realistic_pv_cap : int or float
        Minimum capacity which is considered to be realistic.
    rounding : int
        Rounding to use when cleaning up the capacity. E.g. when
        rounding is 1 a capacity of 9.93 will be rounded to 9.9.
    seed : int
        Seed to use for random operations with NumPy and pandas.

    Returns
    -------
    pandas.DataFrame
        DataFrame containing cleaned MaStR data.
    """
    init_len = len(mastr_df)

    # drop duplicates
    mastr_df = mastr_df.loc[~mastr_df.index.duplicated()]

    # drop invalid entries in standort
    index_to_drop = mastr_df.loc[
        (mastr_df.Standort.isna()) | (mastr_df.Standort.isnull())
    ].index

    mastr_df = mastr_df.loc[~mastr_df.index.isin(index_to_drop)]

    df = mastr_df[
        [
            "Bruttoleistung",
            "Bruttoleistung_extended",
            "Nettonennleistung",
            "zugeordneteWirkleistungWechselrichter",
            "InstallierteLeistung",
        ]
    ].round(rounding)

    # use only the smallest plausible capacity rating if multiple are given
    mastr_df = mastr_df.assign(
        capacity=[
            most_plausible(p_tub, min_realistic_pv_cap)
            for p_tub in df.itertuples(index=False)
        ]
    )

    # drop generators without any capacity info
    # and with a capacity of zero
    # and with a capacity > 23.5 MW, because
    # Germany's largest rooftop PV system is 23 MW
    # https://www.iwr.de/news/groesste-pv-dachanlage-europas-wird-in-sachsen-anhalt-gebaut-news37379
    mastr_df = mastr_df.loc[
        (~mastr_df.capacity.isna())
        & (mastr_df.capacity <= max_realistic_pv_cap)
        & (mastr_df.capacity > min_realistic_pv_cap)
    ]

    # get zip and municipality
    mastr_df[["zip_and_municipality", "drop_this"]] = pd.DataFrame(
        mastr_df.Standort.astype(str)
        .apply(
            zip_and_municipality_from_standort,
            args=(VERBOSE,),
        )
        .tolist(),
        index=mastr_df.index,
    )

    # drop invalid entries
    mastr_df = mastr_df.loc[mastr_df.drop_this].drop(columns="drop_this")

    # add ", Deutschland" just in case
    mastr_df = mastr_df.assign(
        zip_and_municipality=(mastr_df.zip_and_municipality + ", Deutschland")
    )

    # get consistent start-up date
    mastr_df = mastr_df.assign(
        start_up_date=mastr_df.Inbetriebnahmedatum,
    )

    mastr_df.loc[mastr_df.start_up_date.isna()] = mastr_df.loc[
        mastr_df.start_up_date.isna()
    ].assign(
        start_up_date=mastr_df.GeplantesInbetriebnahmedatum.loc[
            mastr_df.start_up_date.isna()
        ]
    )

    # randomly and weighted fill missing start-up dates
    pool = mastr_df.loc[
        ~mastr_df.start_up_date.isna()
    ].start_up_date.to_numpy()

    size = len(mastr_df) - len(pool)

    if size > 0:
        np.random.seed(seed)

        choice = np.random.choice(
            pool,
            size=size,
            replace=False,
        )

        mastr_df.loc[mastr_df.start_up_date.isna()] = mastr_df.loc[
            mastr_df.start_up_date.isna()
        ].assign(start_up_date=choice)

        logger.info(
            f"Randomly and weighted added a start-up date to {size} generators."
        )

    mastr_df = mastr_df.assign(
        start_up_date=pd.to_datetime(mastr_df.start_up_date, utc=True)
    )

    end_len = len(mastr_df)
    logger.debug(
        f"Dropped {init_len - end_len} "
        f"({((init_len - end_len) / init_len) * 100:g}%)"
        f" of {init_len} rows from MaStR DataFrame."
    )

    return mastr_df


def zip_and_municipality_from_standort(
    standort: str,
    verbose: bool = False,
) -> tuple[str, bool]:
    """
    Get zip code and municipality from the Standort string.

    Parameters
    -----------
    standort : str
        Standort as given in the MaStR data.
    verbose : bool
        Logs additional info if True.

    Returns
    -------
    tuple of str and bool
        Standort reduced to the zip code and municipality, as well as a
        flag indicating whether a zip code was found.
    """
    if verbose:
        logger.debug(f"Uncleaned String: {standort}")

    standort_list = standort.split()

    found = False
    count = 0

    for count, elem in enumerate(standort_list):
        if len(elem) != 5:
            continue
        if not elem.isnumeric():
            continue

        found = True

        break

    if found:
        cleaned_str = " ".join(standort_list[count:])

        if verbose:
            logger.debug(f"Cleaned String:   {cleaned_str}")

        return cleaned_str, found

    logger.warning(
        "Couldn't identify zip code. This entry will be dropped."
        f" Original standort: {standort}."
    )

    return standort, found

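# A worked example (hypothetical address, assuming the usual MaStR format):
# zip_and_municipality_from_standort("Musterstraße 5 12345 Musterstadt")
# scans the whitespace-split tokens for the first 5-digit numeric element
# ("12345") and returns ("12345 Musterstadt", True); if no such token
# exists, the original string is returned together with False.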
def most_plausible(
    p_tub: tuple,
    min_realistic_pv_cap: int | float,
) -> float:
    """
    Try to determine the most plausible capacity from the multiple
    capacity values given for a generator in the MaStR data.

    Parameters
    -----------
    p_tub : tuple
        Tuple containing the different capacities given in
        the MaStR data.
    min_realistic_pv_cap : int or float
        Minimum capacity which is considered to be realistic.

    Returns
    -------
    float
        Capacity of the generator estimated as the most realistic.
    """
    count = Counter(p_tub).most_common(3)

    if len(count) == 1:
        return count[0][0]

    val1 = count[0][0]
    val2 = count[1][0]

    if len(count) == 2:
        min_val = min(val1, val2)
        max_val = max(val1, val2)
    else:
        val3 = count[2][0]

        min_val = min(val1, val2, val3)
        max_val = max(val1, val2, val3)

    if min_val < min_realistic_pv_cap:
        return max_val

    return min_val

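# A worked example with hypothetical values (capacities in kW):
# most_plausible((10.0, 10.0, 9.9, 10.0, 10.0), 0.1) -> 9.9, i.e. the
# smallest of the plausible ratings, while
# most_plausible((0.0, 23.6, 23.6), 0.1) -> 23.6, because the minimum
# (0.0) is below the realistic threshold and is discarded.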
def geocoder(
    user_agent: str,
    min_delay_seconds: int,
) -> RateLimiter:
    """
    Set up the Nominatim geocoding class.

    Parameters
    -----------
    user_agent : str
        The app name.
    min_delay_seconds : int
        Delay in seconds to use between requests to Nominatim.
        A minimum of 1 is advised.

    Returns
    -------
    geopy.extra.rate_limiter.RateLimiter
        Nominatim RateLimiter geocoding class to use for geocoding.
    """
    locator = Nominatim(user_agent=user_agent)
    return RateLimiter(
        locator.geocode,
        min_delay_seconds=min_delay_seconds,
    )

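# Usage sketch (assuming the module-level constants above):
# ratelimiter = geocoder(USER_AGENT, MIN_DELAY_SECONDS)
# wraps Nominatim's geocode call so that consecutive requests are spaced
# at least MIN_DELAY_SECONDS apart, in line with the Nominatim usage policy.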
def geocoding_data(
    clean_mastr_df: pd.DataFrame,
) -> pd.DataFrame:
    """
    Set up a DataFrame to geocode.

    Parameters
    -----------
    clean_mastr_df : pandas.DataFrame
        DataFrame containing cleaned MaStR data.

    Returns
    -------
    pandas.DataFrame
        DataFrame containing all unique combinations of
        zip codes with municipalities for geocoding.
    """
    return pd.DataFrame(
        data=clean_mastr_df.zip_and_municipality.unique(),
        columns=["zip_and_municipality"],
    )


@timer_func
def geocode_data(
    geocoding_df: pd.DataFrame,
    ratelimiter: RateLimiter,
    epsg: int,
) -> gpd.GeoDataFrame:
    """
    Geocode zip code and municipality.

    Extract latitude, longitude and altitude. Transform latitude and
    longitude to a shapely Point and return a geopandas GeoDataFrame.

    Parameters
    -----------
    geocoding_df : pandas.DataFrame
        DataFrame containing all unique combinations of
        zip codes with municipalities for geocoding.
    ratelimiter : geopy.extra.rate_limiter.RateLimiter
        Nominatim RateLimiter geocoding class to use for geocoding.
    epsg : int
        EPSG ID to use as CRS.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing all unique combinations of
        zip codes with municipalities with matching geolocation.
    """
    logger.info(f"Geocoding {len(geocoding_df)} locations.")

    geocode_df = geocoding_df.assign(
        location=geocoding_df.zip_and_municipality.apply(ratelimiter)
    )

    geocode_df = geocode_df.assign(
        point=geocode_df.location.apply(
            lambda loc: tuple(loc.point) if loc else None
        )
    )

    geocode_df[["latitude", "longitude", "altitude"]] = pd.DataFrame(
        geocode_df.point.tolist(), index=geocode_df.index
    )

    return gpd.GeoDataFrame(
        geocode_df,
        geometry=gpd.points_from_xy(geocode_df.longitude, geocode_df.latitude),
        crs=f"EPSG:{epsg}",
    )


def merge_geocode_with_mastr(
    clean_mastr_df: pd.DataFrame, geocode_gdf: gpd.GeoDataFrame
) -> gpd.GeoDataFrame:
    """
    Merge the geocoded geometry onto the original MaStR data.

    Parameters
    -----------
    clean_mastr_df : pandas.DataFrame
        DataFrame containing cleaned MaStR data.
    geocode_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing all unique combinations of
        zip codes with municipalities with matching geolocation.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing cleaned MaStR data with
        matching geolocation from geocoding.
    """
    return gpd.GeoDataFrame(
        clean_mastr_df.merge(
            geocode_gdf[["zip_and_municipality", "geometry"]],
            how="left",
            left_on="zip_and_municipality",
            right_on="zip_and_municipality",
        ),
        crs=geocode_gdf.crs,
    ).set_index(clean_mastr_df.index)


def drop_invalid_entries_from_gdf(
    gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop invalid entries from a geopandas GeoDataFrame.

    TODO: how to omit the logging from geos here???

    Parameters
    -----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame to be checked for validity.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with rows with invalid geometries
        dropped.
    """
    valid_gdf = gdf.loc[gdf.is_valid]

    logger.debug(
        f"{len(gdf) - len(valid_gdf)} "
        f"({(len(gdf) - len(valid_gdf)) / len(gdf) * 100:g}%) "
        f"of {len(gdf)} values were invalid and are dropped."
    )

    return valid_gdf


@timer_func
def municipality_data() -> gpd.GeoDataFrame:
    """
    Get municipality data from the eGo^n database.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with municipality data.
    """
    with db.session_scope() as session:
        query = session.query(Vg250Gem.ags, Vg250Gem.geometry.label("geom"))

    return gpd.read_postgis(
        query.statement, query.session.bind, index_col="ags"
    )


@timer_func
def add_ags_to_gens(
    valid_mastr_gdf: gpd.GeoDataFrame,
    municipalities_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add information about the AGS ID to the generators.

    Parameters
    -----------
    valid_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data.
    municipalities_gdf : geopandas.GeoDataFrame
        GeoDataFrame with municipality data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data
        with the AGS ID added.
    """
    return valid_mastr_gdf.sjoin(
        municipalities_gdf,
        how="left",
        predicate="intersects",
    ).rename(columns={"index_right": "ags"})


def drop_gens_outside_muns(
    valid_mastr_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop all generators outside of the municipalities.

    Parameters
    -----------
    valid_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with valid and cleaned MaStR data
        with generators without an AGS ID dropped.
    """
    gdf = valid_mastr_gdf.loc[~valid_mastr_gdf.ags.isna()]

    logger.debug(
        f"{len(valid_mastr_gdf) - len(gdf)} "
        f"({(len(valid_mastr_gdf) - len(gdf)) / len(valid_mastr_gdf) * 100:g}%) "
        f"of {len(valid_mastr_gdf)} values are outside of the municipalities"
        " and are therefore dropped."
    )

    return gdf

741
    __tablename__ = "egon_mastr_pv_roof_geocoded"
742
    __table_args__ = {"schema": "supply"}
743
744
    zip_and_municipality = Column(String, primary_key=True, index=True)
745
    location = Column(String)
746
    point = Column(String)
747
    latitude = Column(Float)
748
    longitude = Column(Float)
749
    altitude = Column(Float)
750
    geometry = Column(Geometry(srid=EPSG))
751
752
753
def create_geocoded_table(geocode_gdf):
754
    """
755
    Create geocoded table mastr pv rooftop
756
    Parameters
757
    -----------
758
    geocode_gdf : geopandas.GeoDataFrame
759
        GeoDataFrame containing geocoding information on pv rooftop locations.
760
    """
761
    EgonMastrPvRoofGeocoded.__table__.drop(bind=engine, checkfirst=True)
762
    EgonMastrPvRoofGeocoded.__table__.create(bind=engine, checkfirst=True)
763
764
    geocode_gdf.to_postgis(
765
        name=EgonMastrPvRoofGeocoded.__table__.name,
766
        schema=EgonMastrPvRoofGeocoded.__table__.schema,
767
        con=db.engine(),
768
        if_exists="append",
769
        index=False,
770
        # dtype={}
771
    )
772
773
774
def geocoded_data_from_db(
775
    epsg: str | int,
776
) -> gpd.GeoDataFrame:
777
    """
778
    Read OSM buildings data from eGo^n Database.
779
    Parameters
780
    -----------
781
    to_crs : pyproj.crs.crs.CRS
782
        CRS to transform geometries to.
783
    Returns
784
    -------
785
    geopandas.GeoDataFrame
786
        GeoDataFrame containing OSM buildings data.
787
    """
788
    with db.session_scope() as session:
789
        query = session.query(
790
            EgonMastrPvRoofGeocoded.zip_and_municipality,
791
            EgonMastrPvRoofGeocoded.geometry,
792
        )
793
794
    return gpd.read_postgis(
795
        query.statement, query.session.bind, geom_col="geometry"
796
    ).to_crs(f"EPSG:{epsg}")
797
798
799
def load_mastr_data():
800
    """Read PV rooftop data from MaStR CSV
801
    Note: the source will be replaced as soon as the MaStR data is available
802
    in DB.
803
    Returns
804
    -------
805
    geopandas.GeoDataFrame
806
        GeoDataFrame containing MaStR data with geocoded locations.
807
    """
808
    mastr_df = mastr_data(
809
        MASTR_INDEX_COL,
810
        MASTR_RELEVANT_COLS,
811
        MASTR_DTYPES,
812
        MASTR_PARSE_DATES,
813
    )
814
815
    clean_mastr_df = clean_mastr_data(
816
        mastr_df,
817
        max_realistic_pv_cap=MAX_REALISTIC_PV_CAP,
818
        min_realistic_pv_cap=MIN_REALISTIC_PV_CAP,
819
        seed=SEED,
820
        rounding=ROUNDING,
821
    )
822
823
    geocode_gdf = geocoded_data_from_db(EPSG)
824
825
    mastr_gdf = merge_geocode_with_mastr(clean_mastr_df, geocode_gdf)
826
827
    valid_mastr_gdf = drop_invalid_entries_from_gdf(mastr_gdf)
828
829
    municipalities_gdf = municipality_data()
830
831
    valid_mastr_gdf = add_ags_to_gens(valid_mastr_gdf, municipalities_gdf)
832
833
    return drop_gens_outside_muns(valid_mastr_gdf)
834
835
836
class OsmBuildingsFiltered(Base):
    __tablename__ = "osm_buildings_filtered"
    __table_args__ = {"schema": "openstreetmap"}

    osm_id = Column(BigInteger)
    amenity = Column(String)
    building = Column(String)
    name = Column(String)
    geom = Column(Geometry(srid=SRID), index=True)
    area = Column(Float)
    geom_point = Column(Geometry(srid=SRID), index=True)
    tags = Column(HSTORE)
    id = Column(BigInteger, primary_key=True, index=True)


@timer_func
def osm_buildings(
    to_crs: CRS,
) -> gpd.GeoDataFrame:
    """
    Read OSM buildings data from the eGo^n database.

    Parameters
    -----------
    to_crs : pyproj.crs.crs.CRS
        CRS to transform geometries to.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    """
    with db.session_scope() as session:
        query = session.query(
            OsmBuildingsFiltered.id,
            OsmBuildingsFiltered.area,
            OsmBuildingsFiltered.geom_point.label("geom"),
        )

    return gpd.read_postgis(
        query.statement, query.session.bind, index_col="id"
    ).to_crs(to_crs)


@timer_func
def synthetic_buildings(
    to_crs: CRS,
) -> gpd.GeoDataFrame:
    """
    Read synthetic buildings data from the eGo^n database.

    Parameters
    -----------
    to_crs : pyproj.crs.crs.CRS
        CRS to transform geometries to.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing synthetic buildings data.
    """
    with db.session_scope() as session:
        query = session.query(
            OsmBuildingsSynthetic.id,
            OsmBuildingsSynthetic.area,
            OsmBuildingsSynthetic.geom_point.label("geom"),
        )

    return gpd.read_postgis(
        query.statement, query.session.bind, index_col="id"
    ).to_crs(to_crs)


@timer_func
def add_ags_to_buildings(
    buildings_gdf: gpd.GeoDataFrame,
    municipalities_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add information about the AGS ID to the buildings.

    Parameters
    -----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    municipalities_gdf : geopandas.GeoDataFrame
        GeoDataFrame with municipality data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data
        with the AGS ID added.
    """
    return buildings_gdf.sjoin(
        municipalities_gdf,
        how="left",
        predicate="intersects",
    ).rename(columns={"index_right": "ags"})


def drop_buildings_outside_muns(
    buildings_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop all buildings outside of the municipalities.

    Parameters
    -----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data
        with buildings without an AGS ID dropped.
    """
    gdf = buildings_gdf.loc[~buildings_gdf.ags.isna()]

    logger.debug(
        f"{len(buildings_gdf) - len(gdf)} "
        f"({(len(buildings_gdf) - len(gdf)) / len(buildings_gdf) * 100:g}%) "
        f"of {len(buildings_gdf)} values are outside of the municipalities "
        "and are therefore dropped."
    )

    return gdf


@timer_func
def load_building_data():
    """
    Read buildings from the DB.

    Tables:

    * `openstreetmap.osm_buildings_filtered` (from OSM)
    * `openstreetmap.osm_buildings_synthetic` (synthetic, created by us)

    Use column `id` for both as it is unique; hence you can concat both
    datasets. If INCLUDE_SYNTHETIC_BUILDINGS is False, synthetic buildings
    will not be loaded.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with buildings without an
        AGS ID dropped.
    """
    municipalities_gdf = municipality_data()

    osm_buildings_gdf = osm_buildings(municipalities_gdf.crs)

    if INCLUDE_SYNTHETIC_BUILDINGS:
        synthetic_buildings_gdf = synthetic_buildings(municipalities_gdf.crs)

        buildings_gdf = gpd.GeoDataFrame(
            pd.concat(
                [
                    osm_buildings_gdf,
                    synthetic_buildings_gdf,
                ]
            ),
            geometry="geom",
            crs=osm_buildings_gdf.crs,
        ).rename(columns={"area": "building_area"})

    else:
        buildings_gdf = osm_buildings_gdf.rename(
            columns={"area": "building_area"}
        )

    buildings_ags_gdf = add_ags_to_buildings(buildings_gdf, municipalities_gdf)

    logger.debug("Loaded buildings.")

    return drop_buildings_outside_muns(buildings_ags_gdf)


@timer_func
def sort_and_qcut_df(
    df: pd.DataFrame | gpd.GeoDataFrame,
    col: str,
    q: int,
) -> pd.DataFrame | gpd.GeoDataFrame:
    """
    Determine the quantile of a given attribute in a (Geo)DataFrame.

    Sort the (Geo)DataFrame in ascending order for the given attribute.

    Parameters
    -----------
    df : pandas.DataFrame or geopandas.GeoDataFrame
        (Geo)DataFrame to sort and qcut.
    col : str
        Name of the attribute to sort and qcut the (Geo)DataFrame on.
    q : int
        Number of quantiles.

    Returns
    -------
    pandas.DataFrame or geopandas.GeoDataFrame
        Sorted and qcut (Geo)DataFrame.
    """
    df = df.sort_values(col, ascending=True)

    return df.assign(
        quant=pd.qcut(
            df[col],
            q=q,
            labels=range(q),
        )
    )

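# A sketch of what this produces (hypothetical data, assuming Q = 5):
# pd.qcut splits the sorted capacity (or building area) values into five
# equally populated bins labelled 0..4, so generators and buildings can
# later be matched by their respective size quantile in allocate_pv().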
@timer_func
def allocate_pv(
    q_mastr_gdf: gpd.GeoDataFrame,
    q_buildings_gdf: gpd.GeoDataFrame,
    seed: int,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """
    Allocate the MaStR PV generators to the OSM buildings.

    This will determine a building for each PV generator if there are more
    buildings than generators within a given AGS. Primarily, generators are
    distributed to buildings within the same quantile. Multiple assignment
    is excluded.

    Parameters
    -----------
    q_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing geocoded and qcut MaStR data.
    q_buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing qcut OSM buildings data.
    seed : int
        Seed to use for random operations with NumPy and pandas.

    Returns
    -------
    tuple with two geopandas.GeoDataFrame s
        GeoDataFrame containing MaStR data allocated to building IDs.
        GeoDataFrame containing building data allocated to MaStR IDs.
    """
    rng = default_rng(seed=seed)

    q_buildings_gdf = q_buildings_gdf.assign(gens_id=np.nan)
    q_mastr_gdf = q_mastr_gdf.assign(building_id=np.nan)

    ags_list = q_buildings_gdf.ags.unique()

    num_ags = len(ags_list)

    t0 = perf_counter()

    for count, ags in enumerate(ags_list):

        buildings = q_buildings_gdf.loc[
            (q_buildings_gdf.ags == ags) & (q_buildings_gdf.gens_id.isna())
        ]
        gens = q_mastr_gdf.loc[
            (q_mastr_gdf.ags == ags) & (q_mastr_gdf.building_id.isna())
        ]

        len_build = len(buildings)
        len_gens = len(gens)

        if len_build < len_gens:
            gens = gens.sample(len_build, random_state=RandomState(seed=seed))
            logger.error(
                f"There are {len_gens} generators and only {len_build}"
                f" buildings in AGS {ags}. {len_gens - len(gens)} "
                "generators were truncated to match the amount of buildings."
            )

            assert len_build == len(gens)

        for quant in gens.quant.unique():
            q_buildings = buildings.loc[
                (buildings.quant == quant) & (buildings.gens_id.isna())
            ]
            q_gens = gens.loc[gens.quant == quant]

            len_build = len(q_buildings)
            len_gens = len(q_gens)

            if len_build < len_gens:
                delta = len_gens - len_build

                logger.warning(
                    f"There are {len_gens} generators and only {len_build} "
                    f"buildings in AGS {ags} and quantile {quant}. {delta} "
                    f"buildings from AGS {ags} will be added randomly."
                )

                add_buildings = pd.Index(
                    rng.choice(
                        buildings.loc[
                            (buildings.quant != quant)
                            & (buildings.gens_id.isna())
                        ].index,
                        size=delta,
                        replace=False,
                    )
                )

                q_buildings = buildings.loc[
                    q_buildings.index.append(add_buildings)
                ]

                assert len(q_buildings) == len_gens

            chosen_buildings = pd.Index(
                rng.choice(
                    q_buildings.index,
                    size=len_gens,
                    replace=False,
                )
            )

            # q_mastr_gdf.loc[q_gens.index] = q_mastr_gdf.loc[
            #     q_gens.index
            # ].assign(building_id=chosen_buildings)

            q_buildings_gdf.loc[chosen_buildings, "gens_id"] = q_gens.index

        if count % 100 == 0:
            logger.debug(
                f"Allocation of {count / num_ags * 100:g} % of AGS done. It took "
                f"{perf_counter() - t0:g} seconds."
            )

            t0 = perf_counter()

    assigned_buildings = q_buildings_gdf.loc[~q_buildings_gdf.gens_id.isna()]

    q_mastr_gdf.loc[
        assigned_buildings.gens_id, "building_id"
    ] = assigned_buildings.index

    logger.debug("Allocated status quo generators to buildings.")

    return frame_to_numeric(q_mastr_gdf), frame_to_numeric(q_buildings_gdf)

def frame_to_numeric(
    df: pd.DataFrame | gpd.GeoDataFrame,
) -> pd.DataFrame | gpd.GeoDataFrame:
    """
    Try to convert all columns of a DataFrame to numeric, ignoring errors.

    Parameters
    ----------
    df : pandas.DataFrame or geopandas.GeoDataFrame

    Returns
    -------
    pandas.DataFrame or geopandas.GeoDataFrame
    """
    if str(df.index.dtype) == "object":
        df.index = pd.to_numeric(df.index, errors="ignore")

    for col in df.columns:
        if str(df[col].dtype) == "object":
            df[col] = pd.to_numeric(df[col], errors="ignore")

    return df


def validate_output(
    desagg_mastr_gdf: pd.DataFrame | gpd.GeoDataFrame,
    desagg_buildings_gdf: pd.DataFrame | gpd.GeoDataFrame,
) -> None:
    """
    Validate the output.

    * Validate that there are exactly as many buildings with a PV system as
      there are PV systems with a building.
    * Validate that the building IDs with a PV system are the same building
      IDs as assigned to the PV systems.
    * Validate that the PV system IDs with a building are the same PV system
      IDs as assigned to the buildings.

    Parameters
    -----------
    desagg_mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data allocated to building IDs.
    desagg_buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing building data allocated to MaStR IDs.
    """
    assert len(
        desagg_mastr_gdf.loc[~desagg_mastr_gdf.building_id.isna()]
    ) == len(desagg_buildings_gdf.loc[~desagg_buildings_gdf.gens_id.isna()])
    assert (
        np.sort(
            desagg_mastr_gdf.loc[
                ~desagg_mastr_gdf.building_id.isna()
            ].building_id.unique()
        )
        == np.sort(
            desagg_buildings_gdf.loc[
                ~desagg_buildings_gdf.gens_id.isna()
            ].index.unique()
        )
    ).all()
    assert (
        np.sort(
            desagg_mastr_gdf.loc[
                ~desagg_mastr_gdf.building_id.isna()
            ].index.unique()
        )
        == np.sort(
            desagg_buildings_gdf.loc[
                ~desagg_buildings_gdf.gens_id.isna()
            ].gens_id.unique()
        )
    ).all()

    logger.debug("Validated output.")

def drop_unallocated_gens(
    gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop generators which did not get allocated.

    Parameters
    -----------
    gdf : geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data allocated to building IDs.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data with generators dropped which did
        not get allocated.
    """
    init_len = len(gdf)
    gdf = gdf.loc[~gdf.building_id.isna()]
    end_len = len(gdf)

    logger.debug(
        f"Dropped {init_len - end_len} "
        f"({((init_len - end_len) / init_len) * 100:g}%)"
        f" of {init_len} unallocated rows from MaStR DataFrame."
    )

    return gdf


@timer_func
def allocate_to_buildings(
    mastr_gdf: gpd.GeoDataFrame,
    buildings_gdf: gpd.GeoDataFrame,
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
    """
    Allocate status quo PV rooftop generators to buildings.

    Parameters
    -----------
    mastr_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing MaStR data with geocoded locations.
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with buildings without an
        AGS ID dropped.

    Returns
    -------
    tuple with two geopandas.GeoDataFrame s
        GeoDataFrame containing MaStR data allocated to building IDs.
        GeoDataFrame containing building data allocated to MaStR IDs.
    """
    logger.debug("Starting allocation of status quo.")

    q_mastr_gdf = sort_and_qcut_df(mastr_gdf, col="capacity", q=Q)
    q_buildings_gdf = sort_and_qcut_df(buildings_gdf, col="building_area", q=Q)

    desagg_mastr_gdf, desagg_buildings_gdf = allocate_pv(
        q_mastr_gdf, q_buildings_gdf, SEED
    )

    validate_output(desagg_mastr_gdf, desagg_buildings_gdf)

    return drop_unallocated_gens(desagg_mastr_gdf), desagg_buildings_gdf


@timer_func
def grid_districts(
    epsg: int,
) -> gpd.GeoDataFrame:
    """
    Load MV grid district geo data from the eGo^n database as a
    geopandas.GeoDataFrame.

    Parameters
    -----------
    epsg : int
        EPSG ID to use as CRS.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing MV grid district IDs and geo shapes data.
    """
    gdf = db.select_geodataframe(
        """
        SELECT bus_id, geom
        FROM grid.egon_mv_grid_district
        ORDER BY bus_id
        """,
        index_col="bus_id",
        geom_col="geom",
        epsg=epsg,
    )

    gdf.index = gdf.index.astype(int)

    logger.debug("Grid districts loaded.")

    return gdf


def scenario_data(
    carrier: str = "solar_rooftop",
    scenario: str = "eGon2035",
) -> pd.DataFrame:
    """
    Get scenario capacity data from the eGo^n database.

    Parameters
    -----------
    carrier : str
        Carrier type to filter the table by.
    scenario : str
        Scenario to filter the table by.

    Returns
    -------
    pandas.DataFrame
        DataFrame with scenario capacity data in GW.
    """
    with db.session_scope() as session:
        query = session.query(EgonScenarioCapacities).filter(
            EgonScenarioCapacities.carrier == carrier,
            EgonScenarioCapacities.scenario_name == scenario,
        )

    df = pd.read_sql(
        query.statement, query.session.bind, index_col="index"
    ).sort_index()

    logger.debug("Scenario capacity data loaded.")

    return df


class Vg250Lan(Base):
    __tablename__ = "vg250_lan"
    __table_args__ = {"schema": "boundaries"}

    id = Column(BigInteger, primary_key=True, index=True)
    ade = Column(BigInteger)
    gf = Column(BigInteger)
    bsg = Column(BigInteger)
    ars = Column(String)
    ags = Column(String)
    sdv_ars = Column(String)
    gen = Column(String)
    bez = Column(String)
    ibz = Column(BigInteger)
    bem = Column(String)
    nbd = Column(String)
    sn_l = Column(String)
    sn_r = Column(String)
    sn_k = Column(String)
    sn_v1 = Column(String)
    sn_v2 = Column(String)
    sn_g = Column(String)
    fk_s3 = Column(String)
    nuts = Column(String)
    ars_0 = Column(String)
    ags_0 = Column(String)
    wsk = Column(String)
    debkg_id = Column(String)
    rs = Column(String)
    sdv_rs = Column(String)
    rs_0 = Column(String)
    geometry = Column(Geometry(srid=EPSG), index=True)


def federal_state_data(to_crs: CRS) -> gpd.GeoDataFrame:
    """
    Get federal state data from the eGo^n database.

    Parameters
    -----------
    to_crs : pyproj.crs.crs.CRS
        CRS to transform geometries to.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with federal state data.
    """
    with db.session_scope() as session:
        query = session.query(
            Vg250Lan.id, Vg250Lan.nuts, Vg250Lan.geometry.label("geom")
        )

    gdf = gpd.read_postgis(
        query.statement, query.session.bind, index_col="id"
    ).to_crs(to_crs)

    logger.debug("Federal state data loaded.")

    return gdf


@timer_func
def overlay_grid_districts_with_counties(
    mv_grid_district_gdf: gpd.GeoDataFrame,
    federal_state_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Calculate the intersections of MV grid districts and counties.

    Parameters
    -----------
    mv_grid_district_gdf : gpd.GeoDataFrame
        GeoDataFrame containing MV grid district IDs and geo shapes data.
    federal_state_gdf : gpd.GeoDataFrame
        GeoDataFrame with federal state data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing the intersection shapes between MV grid
        districts and counties.
    """
    logger.debug(
        "Calculating intersection overlay between mv grid districts and "
        "counties. This may take a while..."
    )

    gdf = gpd.overlay(
        federal_state_gdf.to_crs(mv_grid_district_gdf.crs),
        mv_grid_district_gdf.reset_index(),
        how="intersection",
        keep_geom_type=True,
    )

    logger.debug("Done!")

    return gdf


@timer_func
def add_overlay_id_to_buildings(
    buildings_gdf: gpd.GeoDataFrame,
    grid_federal_state_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Add information about the overlay ID to the buildings.

    Parameters
    -----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.
    grid_federal_state_gdf : geopandas.GeoDataFrame
        GeoDataFrame with intersection shapes between counties and grid
        districts.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with the overlay ID added.
    """
    gdf = (
        buildings_gdf.to_crs(grid_federal_state_gdf.crs)
        .sjoin(
            grid_federal_state_gdf,
            how="left",
            predicate="intersects",
        )
        .rename(columns={"index_right": "overlay_id"})
    )

    logger.debug("Added overlay ID to OSM buildings.")

    return gdf


def drop_buildings_outside_grids(
    buildings_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Drop all buildings outside of the grid areas.

    Parameters
    -----------
    buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data
        with buildings without a bus ID dropped.
    """
    gdf = buildings_gdf.loc[~buildings_gdf.bus_id.isna()]

    logger.debug(
        f"{len(buildings_gdf) - len(gdf)} "
        f"({(len(buildings_gdf) - len(gdf)) / len(buildings_gdf) * 100:g}%) "
        f"of {len(buildings_gdf)} values are outside of the grid areas "
        "and are therefore dropped."
    )

    return gdf


def buildings_area_per_overlay_id(
    valid_buildings_gdf: gpd.GeoDataFrame,
    grid_federal_state_gdf: gpd.GeoDataFrame,
) -> pd.DataFrame:
    """
    Calculate the total building area per overlay ID.

    TODO: This is very simplified at the moment. If possible add some kind
     of weights to the area per building to differentiate between single-
     family and multi-family houses as well as agricultural and industrial
     buildings.

    Parameters
    -----------
    valid_buildings_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing OSM buildings data with the overlay ID added.
    grid_federal_state_gdf : geopandas.GeoDataFrame
        GeoDataFrame with intersection shapes between counties and grid
        districts.

    Returns
    -------
    geopandas.GeoDataFrame
        GeoDataFrame with grid data and total buildings area.
    """
    return grid_federal_state_gdf.merge(
        valid_buildings_gdf[
            [
                "building_area",
                "overlay_id",
            ]
        ]
        .groupby("overlay_id")
        .sum(),
        how="left",
        left_index=True,
        right_index=True,
    )


def cap_per_bus_id(
    overlay_gdf: gpd.GeoDataFrame,
    scenario_df: pd.DataFrame,
    conversion: int | float = 10**3,
) -> pd.DataFrame:
    """
    Calculate the total PV rooftop capacity per grid district depending on
    the available buildings area.

    TODO: This is very simplified at the moment as it assumes an even
     distribution of generators depending on the available buildings
     area per grid district.

    Parameters
    -----------
    overlay_gdf : geopandas.GeoDataFrame
        GeoDataFrame with grid data and total buildings area.
    scenario_df : pandas.DataFrame
        DataFrame with scenario capacity data per federal state (NUTS).
    conversion : int, float
        Conversion factor to match units. E.g. MW -> kW

    Returns
    -------
    pandas.DataFrame
        DataFrame with the total rooftop capacity per MV grid.
    """
    overlay_gdf = overlay_gdf.assign(capacity=np.nan)

    for cap, nuts in scenario_df[["capacity", "nuts"]].itertuples(index=False):
        nuts_gdf = overlay_gdf.loc[overlay_gdf.nuts == nuts]

        capacity = nuts_gdf.building_area.multiply(
            cap / nuts_gdf.building_area.sum()
        )

        overlay_gdf.loc[nuts_gdf.index] = overlay_gdf.loc[
            nuts_gdf.index
        ].assign(capacity=capacity.multiply(conversion).to_numpy())

    return overlay_gdf[["bus_id", "capacity"]].groupby("bus_id").sum()

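# The proportional split implemented above, as a worked equation with
# hypothetical numbers: within one federal state (NUTS code) with a scenario
# target of cap = 100 MW and a total building area of 4e6 m², an overlay
# polygon holding 1e6 m² of buildings receives 100 MW * (1e6 / 4e6) = 25 MW,
# i.e. 25000 kW after the MW -> kW conversion; the shares are then summed
# per bus_id.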
1599
def determine_end_of_life_gens(
1600
    mastr_gdf: gpd.GeoDataFrame,
1601
    scenario_timestamp: pd.Timestamp,
1602
    pv_rooftop_lifetime: pd.Timedelta,
1603
) -> gpd.GeoDataFrame:
1604
    """
1605
    Determine if an old PV system has reached its end of life.
1606
    Parameters
1607
    -----------
1608
    mastr_gdf : geopandas.GeoDataFrame
1609
        GeoDataFrame containing geocoded MaStR data.
1610
    scenario_timestamp : pandas.Timestamp
1611
        Timestamp at which the scenario takes place.
1612
    pv_rooftop_lifetime : pandas.Timedelta
1613
        Average expected lifetime of PV rooftop systems.
1614
    Returns
1615
    -------
1616
    geopandas.GeoDataFrame
1617
        GeoDataFrame containing geocoded MaStR data and info if the system
1618
        has reached its end of life.
1619
    """
1620
    mastr_gdf = mastr_gdf.assign(
1621
        age=scenario_timestamp - mastr_gdf.start_up_date
1622
    )
1623
1624
    logger.debug("Determined if pv rooftop systems reached their end of life.")
1625
1626
    return mastr_gdf.assign(end_of_life=pv_rooftop_lifetime < mastr_gdf.age)
1627
1628
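For illustration, the end-of-life check reduces to a single Timedelta
comparison; the dates and the 20-year lifetime below are assumptions, not
values taken from this module:

import pandas as pd

start_up_date = pd.Timestamp("2012-06-01")
scenario_timestamp = pd.Timestamp("2035-01-01")
pv_rooftop_lifetime = pd.Timedelta(days=20 * 365)

age = scenario_timestamp - start_up_date
print(pv_rooftop_lifetime < age)  # True -> flagged as end_of_life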
1629
def calculate_max_pv_cap_per_building(
1630
    buildings_gdf: gpd.GeoDataFrame,
1631
    mastr_gdf: gpd.GeoDataFrame,
1632
    pv_cap_per_sq_m: float | int,
1633
    roof_factor: float | int,
1634
) -> gpd.GeoDataFrame:
1635
    """
1636
    Calculate the estimated maximum possible PV capacity per building.
1637
    Parameters
1638
    -----------
1639
    buildings_gdf : geopandas.GeoDataFrame
1640
        GeoDataFrame containing OSM buildings data.
1641
    mastr_gdf : geopandas.GeoDataFrame
1642
        GeoDataFrame containing geocoded MaStR data.
1643
    pv_cap_per_sq_m : float, int
1644
        Average expected, installable PV capacity per square meter.
1645
    roof_factor : float, int
1646
        Average share of the roof area that is usable for PV.
1647
    Returns
1648
    -------
1649
    geopandas.GeoDataFrame
1650
        GeoDataFrame containing OSM buildings data with estimated maximum PV
1651
        capacity.
1652
    """
1653
    gdf = (
1654
        buildings_gdf.reset_index()
1655
        .merge(
1656
            mastr_gdf[
1657
                [
1658
                    "capacity",
1659
                    "end_of_life",
1660
                    "building_id",
1661
                    "EinheitlicheAusrichtungUndNeigungswinkel",
1662
                    "Hauptausrichtung",
1663
                    "HauptausrichtungNeigungswinkel",
1664
                ]
1665
            ],
1666
            how="left",
1667
            left_on="id",
1668
            right_on="building_id",
1669
        )
1670
        .set_index("id")
1671
        .drop(columns="building_id")
1672
    )
1673
1674
    return gdf.assign(
1675
        max_cap=gdf.building_area.multiply(roof_factor * pv_cap_per_sq_m),
1676
        end_of_life=gdf.end_of_life.fillna(True).astype(bool),
1677
        bus_id=gdf.bus_id.astype(int),
1678
    )
1679
1680
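The core of the estimate is building_area * roof_factor * pv_cap_per_sq_m;
a sketch with assumed parameter values:

building_area = 160.0   # m², hypothetical roof
roof_factor = 0.5       # assumed share of the roof area usable for PV
pv_cap_per_sq_m = 0.17  # assumed installable kW per m²

max_cap = building_area * roof_factor * pv_cap_per_sq_m
print(max_cap)  # 13.6 kW estimated maximum capacity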
1681
def calculate_building_load_factor(
1682
    mastr_gdf: gpd.GeoDataFrame,
1683
    buildings_gdf: gpd.GeoDataFrame,
1684
    rounding: int = 4,
1685
) -> gpd.GeoDataFrame:
1686
    """
1687
    Calculate the roof load factor from existing PV systems.
1688
    Parameters
1689
    -----------
1690
    mastr_gdf : geopandas.GeoDataFrame
1691
        GeoDataFrame containing geocoded MaStR data.
1692
    buildings_gdf : geopandas.GeoDataFrame
1693
        GeoDataFrame containing OSM buildings data.
1694
    rounding : int
1695
        Rounding to use for load factor.
1696
    Returns
1697
    -------
1698
    geopandas.GeoDataFrame
1699
        GeoDataFrame containing geocoded MaStR data with calculated load factor.
1700
    """
1701
    gdf = mastr_gdf.merge(
1702
        buildings_gdf[["max_cap", "building_area"]].loc[
1703
            ~buildings_gdf["max_cap"].isna()
1704
        ],
1705
        how="left",
1706
        left_on="building_id",
1707
        right_index=True,
1708
    )
1709
1710
    return gdf.assign(load_factor=(gdf.capacity / gdf.max_cap).round(rounding))
1711
1712
1713
def get_probability_for_property(
1714
    mastr_gdf: gpd.GeoDataFrame,
1715
    cap_range: tuple[int | float, int | float],
1716
    prop: str,
1717
) -> tuple[np.array, np.array]:
1718
    """
1719
    Calculate the probability of the different options of a property of the
1720
    existing PV plants.
1721
    Parameters
1722
    -----------
1723
    mastr_gdf : geopandas.GeoDataFrame
1724
        GeoDataFrame containing geocoded MaStR data.
1725
    cap_range : tuple(int, int)
1726
        Capacity range of PV plants to look at.
1727
    prop : str
1728
        Property to calculate probabilities for. The string needs to be a
1729
        column of mastr_gdf.
1730
    Returns
1731
    -------
1732
    tuple
1733
        numpy.array
1734
            Unique values of property.
1735
        numpy.array
1736
            Probabilities per unique value.
1737
    """
1738
    cap_range_gdf = mastr_gdf.loc[
1739
        (mastr_gdf.capacity > cap_range[0])
1740
        & (mastr_gdf.capacity <= cap_range[1])
1741
    ]
1742
1743
    if prop == "load_factor":
1744
        cap_range_gdf = cap_range_gdf.loc[cap_range_gdf[prop] <= 1]
1745
1746
    count = Counter(
1747
        cap_range_gdf[prop].loc[
1748
            ~cap_range_gdf[prop].isna()
1749
            & ~cap_range_gdf[prop].isnull()
1750
            & ~(cap_range_gdf[prop] == "None")
1751
        ]
1752
    )
1753
1754
    values = np.array(list(count.keys()))
1755
    probabilities = np.fromiter(count.values(), dtype=float)
1756
    probabilities = probabilities / np.sum(probabilities)
1757
1758
    return values, probabilities
1759
1760
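How the Counter-based empirical distribution works, on a hypothetical
sample of main orientation values:

from collections import Counter

import numpy as np

count = Counter(["S", "S", "SW", "S", "SE"])  # hypothetical values
values = np.array(list(count.keys()))
probabilities = np.fromiter(count.values(), dtype=float)
probabilities = probabilities / np.sum(probabilities)
print(values, probabilities)  # ['S' 'SW' 'SE'] [0.6 0.2 0.2]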
1761
@timer_func
1762
def probabilities(
1763
    mastr_gdf: gpd.GeoDataFrame,
1764
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
1765
    properties: list[str] | None = None,
1766
) -> dict:
1767
    """
1768
    Calculate the probability of the different options of properties of the
1769
    existing PV plants.
1770
    Parameters
1771
    -----------
1772
    mastr_gdf : geopandas.GeoDataFrame
1773
        GeoDataFrame containing geocoded MaStR data.
1774
    cap_ranges : list(tuple(int, int))
1775
        List of capacity ranges to distinguish between. The first tuple should
1776
        start with zero and the last one should end with infinity.
1777
    properties : list(str)
1778
        List of properties to calculate probabilities for. Strings need to
1779
        be columns of mastr_gdf.
1780
    Returns
1781
    -------
1782
    dict
1783
        Dictionary with values and probabilities per capacity range.
1784
    """
1785
    if cap_ranges is None:
1786
        cap_ranges = [
1787
            (0, 30),
1788
            (30, 100),
1789
            (100, float("inf")),
1790
        ]
1791
    if properties is None:
1792
        properties = [
1793
            "EinheitlicheAusrichtungUndNeigungswinkel",
1794
            "Hauptausrichtung",
1795
            "HauptausrichtungNeigungswinkel",
1796
            "load_factor",
1797
        ]
1798
1799
    prob_dict = {}
1800
1801
    for cap_range in cap_ranges:
1802
        prob_dict[cap_range] = {
1803
            "values": {},
1804
            "probabilities": {},
1805
        }
1806
1807
        for prop in properties:
1808
            v, p = get_probability_for_property(
1809
                mastr_gdf,
1810
                cap_range,
1811
                prop,
1812
            )
1813
1814
            prob_dict[cap_range]["values"][prop] = v
1815
            prob_dict[cap_range]["probabilities"][prop] = p
1816
1817
    return prob_dict
1818
1819
1820
def cap_share_per_cap_range(
1821
    mastr_gdf: gpd.GeoDataFrame,
1822
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
1823
) -> dict[tuple[int | float, int | float], float]:
1824
    """
1825
    Calculate the share of PV capacity from the total PV capacity within
1826
    capacity ranges.
1827
    Parameters
1828
    -----------
1829
    mastr_gdf : geopandas.GeoDataFrame
1830
        GeoDataFrame containing geocoded MaStR data.
1831
    cap_ranges : list(tuple(int, int))
1832
        List of capacity ranges to distinguish between. The first tuple should
1833
        start with a zero and the last one should end with infinite.
1834
    Returns
1835
    -------
1836
    dict
1837
        Dictionary with share of PV capacity from the total PV capacity within
1838
        capacity ranges.
1839
    """
1840
    if cap_ranges is None:
1841
        cap_ranges = [
1842
            (0, 30),
1843
            (30, 100),
1844
            (100, float("inf")),
1845
        ]
1846
1847
    cap_share_dict = {}
1848
1849
    total_cap = mastr_gdf.capacity.sum()
1850
1851
    for cap_range in cap_ranges:
1852
        cap_share = (
1853
            mastr_gdf.loc[
1854
                (mastr_gdf.capacity > cap_range[0])
1855
                & (mastr_gdf.capacity <= cap_range[1])
1856
            ].capacity.sum()
1857
            / total_cap
1858
        )
1859
1860
        cap_share_dict[cap_range] = cap_share
1861
1862
    return cap_share_dict
1863
1864
1865
def mean_load_factor_per_cap_range(
1866
    mastr_gdf: gpd.GeoDataFrame,
1867
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
1868
) -> dict[tuple[int | float, int | float], float]:
1869
    """
1870
    Calculate the mean roof load factor per capacity range from existing PV
1871
    plants.
1872
    Parameters
1873
    -----------
1874
    mastr_gdf : geopandas.GeoDataFrame
1875
        GeoDataFrame containing geocoded MaStR data.
1876
    cap_ranges : list(tuple(int, int))
1877
        List of capacity ranges to distinguish between. The first tuple should
1878
        start with zero and the last one should end with infinity.
1879
    Returns
1880
    -------
1881
    dict
1882
        Dictionary with mean roof load factor per capacity range.
1883
    """
1884
    if cap_ranges is None:
1885
        cap_ranges = [
1886
            (0, 30),
1887
            (30, 100),
1888
            (100, float("inf")),
1889
        ]
1890
1891
    load_factor_dict = {}
1892
1893
    for cap_range in cap_ranges:
1894
        load_factor = mastr_gdf.loc[
1895
            (mastr_gdf.load_factor <= 1)
1896
            & (mastr_gdf.capacity > cap_range[0])
1897
            & (mastr_gdf.capacity <= cap_range[1])
1898
        ].load_factor.mean()
1899
1900
        load_factor_dict[cap_range] = load_factor
1901
1902
    return load_factor_dict
1903
1904
1905
def building_area_range_per_cap_range(
1906
    mastr_gdf: gpd.GeoDataFrame,
1907
    cap_ranges: list[tuple[int | float, int | float]] | None = None,
1908
    min_building_size: int | float = 10.0,
1909
    upper_quantile: float = 0.95,
1910
    lower_quantile: float = 0.05,
1911
) -> dict[tuple[int | float, int | float], tuple[int | float, int | float]]:
1912
    """
1913
    Estimate normal building area range per capacity range.
1914
    The ranges are estimated from the building areas of existing PV plants
1915
    using quantiles per capacity range.
1916
    Parameters
1917
    -----------
1918
    mastr_gdf : geopandas.GeoDataFrame
1919
        GeoDataFrame containing geocoded MaStR data.
1920
    cap_ranges : list(tuple(int, int))
1921
        List of capacity ranges to distinguish between. The first tuple should
1922
        start with zero and the last one should end with infinity.
1923
    min_building_size : int, float
1924
        Minimal building size to consider for PV plants.
1925
    upper_quantile : float
1926
        Upper quantile to estimate maximum building size per capacity range.
1927
    lower_quantile : float
1928
        Lower quantile to estimate minimum building size per capacity range.
1929
    Returns
1930
    -------
1931
    dict
1932
        Dictionary with estimated normal building area range per capacity
1933
        range.
1934
    """
1935
    if cap_ranges is None:
1936
        cap_ranges = [
1937
            (0, 30),
1938
            (30, 100),
1939
            (100, float("inf")),
1940
        ]
1941
1942
    building_area_range_dict = {}
1943
1944
    n_ranges = len(cap_ranges)
1945
1946
    for count, cap_range in enumerate(cap_ranges):
1947
        cap_range_gdf = mastr_gdf.loc[
1948
            (mastr_gdf.capacity > cap_range[0])
1949
            & (mastr_gdf.capacity <= cap_range[1])
1950
        ]
1951
1952
        if count == 0:
1953
            building_area_range_dict[cap_range] = (
1954
                min_building_size,
1955
                cap_range_gdf.building_area.quantile(upper_quantile),
1956
            )
1957
        elif count == n_ranges - 1:
1958
            building_area_range_dict[cap_range] = (
1959
                cap_range_gdf.building_area.quantile(lower_quantile),
1960
                float("inf"),
1961
            )
1962
        else:
1963
            building_area_range_dict[cap_range] = (
1964
                cap_range_gdf.building_area.quantile(lower_quantile),
1965
                cap_range_gdf.building_area.quantile(upper_quantile),
1966
            )
1967
1968
    values = list(building_area_range_dict.values())
1969
1970
    building_area_range_normed_dict = {}
1971
1972
    for count, (cap_range, (min_area, max_area)) in enumerate(
1973
        building_area_range_dict.items()
1974
    ):
1975
        if count == 0:
1976
            building_area_range_normed_dict[cap_range] = (
1977
                min_area,
1978
                np.mean((values[count + 1][0], max_area)),
1979
            )
1980
        elif count == n_ranges - 1:
1981
            building_area_range_normed_dict[cap_range] = (
1982
                np.mean((values[count - 1][1], min_area)),
1983
                max_area,
1984
            )
1985
        else:
1986
            building_area_range_normed_dict[cap_range] = (
1987
                np.mean((values[count - 1][1], min_area)),
1988
                np.mean((values[count + 1][0], max_area)),
1989
            )
1990
1991
    return building_area_range_normed_dict
1992
1993
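The normalisation step averages the boundaries of neighbouring capacity
ranges so the resulting area ranges tile without gaps or overlaps; a sketch
with made-up quantile results:

import numpy as np

# Hypothetical raw (lower, upper) building area quantiles per capacity range.
raw = [(10.0, 300.0), (180.0, 1200.0), (800.0, float("inf"))]

# The new upper bound of the first range and lower bound of the second are
# both the mean of the adjacent boundaries, here (300 + 180) / 2.
boundary = np.mean((raw[0][1], raw[1][0]))
print(boundary)  # 240.0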
1994
@timer_func
1995
def desaggregate_pv_in_mv_grid(
1996
    buildings_gdf: gpd.GeoDataFrame,
1997
    pv_cap: float | int,
1998
    **kwargs,
1999
) -> gpd.GeoDataFrame:
2000
    """
2001
    Desaggregate PV capacity on buildings within a given grid district.
2002
    Parameters
2003
    -----------
2004
    buildings_gdf : geopandas.GeoDataFrame
2005
        GeoDataFrame containing buildings within the grid district.
2006
    pv_cap : float, int
2007
        PV capacity to desaggregate.
2008
    Other Parameters
2009
    ----------------
2010
    prob_dict : dict
2011
        Dictionary with values and probabilities per capacity range.
2012
    cap_share_dict : dict
2013
        Dictionary with share of PV capacity from the total PV capacity within
2014
        capacity ranges.
2015
    building_area_range_dict : dict
2016
        Dictionary with estimated normal building area range per capacity
2017
        range.
2018
    load_factor_dict : dict
2019
        Dictionary with mean roof load factor per capacity range.
2020
    seed : int
2021
        Seed to use for random operations with NumPy and pandas.
2022
    pv_cap_per_sq_m : float, int
2023
        Average expected, installable PV capacity per square meter.
2024
    Returns
2025
    -------
2026
    geopandas.GeoDataFrame
2027
        GeoDataFrame containing OSM building data with desaggregated PV
2028
        plants.
2029
    """
2030
    bus_id = int(buildings_gdf.bus_id.iat[0])
2031
2032
    rng = default_rng(seed=kwargs["seed"])
2033
    random_state = RandomState(seed=kwargs["seed"])
2034
2035
    results_df = pd.DataFrame(columns=buildings_gdf.columns)
2036
2037
    for cap_range, share in kwargs["cap_share_dict"].items():
2038
        pv_cap_range = pv_cap * share
2039
2040
        b_area_min, b_area_max = kwargs["building_area_range_dict"][cap_range]
2041
2042
        cap_range_buildings_gdf = buildings_gdf.loc[
2043
            ~buildings_gdf.index.isin(results_df.index)
2044
            & (buildings_gdf.building_area > b_area_min)
2045
            & (buildings_gdf.building_area <= b_area_max)
2046
        ]
2047
2048
        mean_load_factor = kwargs["load_factor_dict"][cap_range]
2049
        cap_range_buildings_gdf = cap_range_buildings_gdf.assign(
2050
            mean_cap=cap_range_buildings_gdf.max_cap * mean_load_factor,
2051
            load_factor=np.nan,
2052
            capacity=np.nan,
2053
        )
2054
2055
        total_mean_cap = cap_range_buildings_gdf.mean_cap.sum()
2056
2057
        if total_mean_cap == 0:
2058
            logger.warning(
2059
                f"There are no matching roof for capacity range {cap_range} "
2060
                f"kW in grid {bus_id}. Using all buildings as fallback."
2061
            )
2062
2063
            cap_range_buildings_gdf = buildings_gdf.loc[
2064
                ~buildings_gdf.index.isin(results_df.index)
2065
            ]
2066
2067
            if len(cap_range_buildings_gdf) == 0:
2068
                logger.warning(
2069
                    "There are no roofes available for capacity range "
2070
                    f"{cap_range} kW in grid {bus_id}. Allowing dual use."
2071
                )
2072
                cap_range_buildings_gdf = buildings_gdf.copy()
2073
2074
            cap_range_buildings_gdf = cap_range_buildings_gdf.assign(
2075
                mean_cap=cap_range_buildings_gdf.max_cap * mean_load_factor,
2076
                load_factor=np.nan,
2077
                capacity=np.nan,
2078
            )
2079
2080
            total_mean_cap = cap_range_buildings_gdf.mean_cap.sum()
2081
2082
        elif total_mean_cap < pv_cap_range:
2083
            logger.warning(
2084
                f"Average roof utilization of the roof area in grid {bus_id} "
2085
                f"and capacity range {cap_range} kW is not sufficient. The "
2086
                "roof utilization will be above average."
2087
            )
2088
2089
        frac = max(
2090
            pv_cap_range / total_mean_cap,
2091
            1 / len(cap_range_buildings_gdf),
2092
        )
2093
2094
        samples_gdf = cap_range_buildings_gdf.sample(
2095
            frac=min(1, frac),
2096
            random_state=random_state,
2097
        )
2098
2099
        cap_range_dict = kwargs["prob_dict"][cap_range]
2100
2101
        values_dict = cap_range_dict["values"]
2102
        p_dict = cap_range_dict["probabilities"]
2103
2104
        load_factors = rng.choice(
2105
            a=values_dict["load_factor"],
2106
            size=len(samples_gdf),
2107
            p=p_dict["load_factor"],
2108
        )
2109
2110
        samples_gdf = samples_gdf.assign(
2111
            load_factor=load_factors,
2112
            capacity=samples_gdf.building_area
2113
            * load_factors
2114
            * kwargs["pv_cap_per_sq_m"],
2115
        )
2116
2117
        missing_factor = pv_cap_range / samples_gdf.capacity.sum()
2118
2119
        samples_gdf = samples_gdf.assign(
2120
            capacity=(samples_gdf.capacity * missing_factor),
2121
            load_factor=(samples_gdf.load_factor * missing_factor),
2122
        )
2123
2124
        assert np.isclose(
2125
            samples_gdf.capacity.sum(),
2126
            pv_cap_range,
2127
        ), f"{samples_gdf.capacity.sum()} != {pv_cap_range}"
2128
2129
        results_df = pd.concat(
2130
            [
2131
                results_df,
2132
                samples_gdf,
2133
            ],
2134
        )
2135
2136
    assert np.isclose(
2137
        results_df.capacity.sum(),
2138
        pv_cap,
2139
    ), f"{results_df.capacity.sum()} != {pv_cap}"
2140
2141
    return gpd.GeoDataFrame(
2142
        results_df,
2143
        # Fall back to the crs of the input frame: samples_gdf is undefined
        # if the for loop on line 2037 is never entered.
        crs=buildings_gdf.crs,
2144
        geometry="geom",
2145
    )
2146
2147
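The final rescaling inside desaggregate_pv_in_mv_grid guarantees that the
sampled capacities add up exactly to the target of the capacity range; with
hypothetical numbers:

import numpy as np

sampled_capacity = np.array([4.0, 6.0, 10.0])  # kW drawn for sampled buildings
pv_cap_range = 25.0                            # kW to place in this range

missing_factor = pv_cap_range / sampled_capacity.sum()  # 1.25
scaled = sampled_capacity * missing_factor
assert np.isclose(scaled.sum(), pv_cap_range)
print(scaled)  # [ 5.   7.5 12.5]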
2148
@timer_func
2149
def desaggregate_pv(
2150
    buildings_gdf: gpd.GeoDataFrame,
2151
    cap_df: pd.DataFrame,
2152
    **kwargs,
2153
) -> gpd.GeoDataFrame:
2154
    """
2155
    Desaggregate PV capacity onto buildings for each MV grid district.
2156
    Parameters
2157
    -----------
2158
    buildings_gdf : geopandas.GeoDataFrame
2159
        GeoDataFrame containing OSM buildings data.
2160
    cap_df : pandas.DataFrame
2161
        DataFrame with total rooftop capacity per mv grid.
2162
    Other Parameters
2163
    ----------------
2164
    prob_dict : dict
2165
        Dictionary with values and probabilities per capacity range.
2166
    cap_share_dict : dict
2167
        Dictionary with share of PV capacity from the total PV capacity within
2168
        capacity ranges.
2169
    building_area_range_dict : dict
2170
        Dictionary with estimated normal building area range per capacity
2171
        range.
2172
    load_factor_dict : dict
2173
        Dictionary with mean roof load factor per capacity range.
2174
    seed : int
2175
        Seed to use for random operations with NumPy and pandas.
2176
    pv_cap_per_sq_m : float, int
2177
        Average expected, installable PV capacity per square meter.
2178
    Returns
2179
    -------
2180
    geopandas.GeoDataFrame
2181
        GeoDataFrame containing OSM building data with desaggregated PV
2182
        plants.
2183
    """
2184
    allocated_buildings_gdf = buildings_gdf.loc[~buildings_gdf.end_of_life]
2185
2186
    for bus_id in buildings_gdf.bus_id.unique():
2187
        buildings_grid_gdf = buildings_gdf.loc[buildings_gdf.bus_id == bus_id]
2188
2189
        pv_installed_gdf = buildings_grid_gdf.loc[
2190
            ~buildings_grid_gdf.end_of_life
2191
        ]
2192
2193
        pv_installed = pv_installed_gdf.capacity.sum()
2194
2195
        pot_buildings_gdf = buildings_grid_gdf.loc[
2196
            ~buildings_grid_gdf.index.isin(pv_installed_gdf.index)
2197
        ]
2198
2199
        if len(pot_buildings_gdf) == 0:
2200
            logger.error(
2201
                f"In grid {bus_id} there are no potential buildings to allocate "
2202
                "PV capacity to. The grid is skipped. This message should only "
2203
                "appear doing test runs with few buildings."
2204
            )
2205
2206
            continue
2207
2208
        pv_target = cap_df.at[
2209
            bus_id,
2210
            "capacity",
2211
        ]
2212
2213
        pv_missing = pv_target - pv_installed
2214
2215
        if pv_missing <= 0:
2216
            logger.info(
2217
                f"In grid {bus_id} there is more PV installed in status Quo than"
2218
                " allocated within the scenario. No new generators are added."
2219
            )
2220
2221
            continue
2222
2223
        if pot_buildings_gdf.max_cap.sum() < pv_missing:
2224
            logger.error(
2225
                f"In grid {bus_id} there is less PV potential ("
2226
                f"{pot_buildings_gdf.max_cap.sum():g} kW) than allocated PV "
2227
                f"capacity ({pv_missing:g} kW). The grid is skipped. This message "
2228
                "should only appear doing test runs with few buildings."
2229
            )
2230
2231
            continue
2232
2233
        gdf = desaggregate_pv_in_mv_grid(
2234
            buildings_gdf=pot_buildings_gdf,
2235
            pv_cap=pv_missing,
2236
            **kwargs,
2237
        )
2238
2239
        allocated_buildings_gdf = pd.concat(
2240
            [
2241
                allocated_buildings_gdf,
2242
                gdf,
2243
            ]
2244
        )
2245
2246
    logger.debug("Desaggregated scenario.")
2247
2248
    return gpd.GeoDataFrame(
2249
        allocated_buildings_gdf,
2250
        # Fall back to the crs of the input frame: gdf is undefined if every
        # grid district was skipped in the loop above.
        crs=buildings_gdf.crs,
2251
        geometry="geom",
2252
    )
2253
2254
2255
@timer_func
2256
def add_buildings_meta_data(
2257
    buildings_gdf: gpd.GeoDataFrame,
2258
    prob_dict: dict,
2259
    seed: int,
2260
) -> gpd.GeoDataFrame:
2261
    """
2262
    Randomly add additional metadata to desaggregated PV plants.
2263
    Parameters
2264
    -----------
2265
    buildings_gdf : geopandas.GeoDataFrame
2266
        GeoDataFrame containing OSM buildings data with desaggregated PV
2267
        plants.
2268
    prob_dict : dict
2269
        Dictionary with values and probabilities per capacity range.
2270
    seed : int
2271
        Seed to use for random operations with NumPy and pandas.
2272
    Returns
2273
    -------
2274
    geopandas.GeoDataFrame
2275
        GeoDataFrame containing OSM building data with desaggregated PV
2276
        plants.
2277
    """
2278
    rng = default_rng(seed=seed)
2279
    buildings_gdf = buildings_gdf.reset_index().rename(
2280
        columns={
2281
            "index": "building_id",
2282
        }
2283
    )
2284
2285
    for (min_cap, max_cap), cap_range_prob_dict in prob_dict.items():
2286
        cap_range_gdf = buildings_gdf.loc[
2287
            (buildings_gdf.capacity >= min_cap)
2288
            & (buildings_gdf.capacity < max_cap)
2289
        ]
2290
2291
        for key, values in cap_range_prob_dict["values"].items():
2292
            if key == "load_factor":
2293
                continue
2294
2295
            gdf = cap_range_gdf.loc[
2296
                cap_range_gdf[key].isna()
2297
                | cap_range_gdf[key].isnull()
2298
                | (cap_range_gdf[key] == "None")
2299
            ]
2300
2301
            key_vals = rng.choice(
2302
                a=values,
2303
                size=len(gdf),
2304
                p=cap_range_prob_dict["probabilities"][key],
2305
            )
2306
2307
            buildings_gdf.loc[gdf.index, key] = key_vals
2308
2309
    return buildings_gdf
2310
2311
2312
def add_voltage_level(
2313
    buildings_gdf: gpd.GeoDataFrame,
2314
) -> gpd.GeoDataFrame:
2315
    """
2316
    Add voltage level derived from generator capacity to the power plants.
2317
    Parameters
2318
    -----------
2319
    buildings_gdf : geopandas.GeoDataFrame
2320
        GeoDataFrame containing OSM buildings data with desaggregated PV
2321
        plants.
2322
    Returns
2323
    -------
2324
    geopandas.GeoDataFrame
2325
        GeoDataFrame containing OSM building data with voltage level per generator.
2326
    """
2327
2328
    def voltage_levels(p: float) -> int:
2329
        if p < 100:
2330
            return 7
2331
        elif p < 200:
2332
            return 6
2333
        elif p < 5500:
2334
            return 5
2335
        elif p < 20000:
2336
            return 4
2337
        elif p < 120000:
2338
            return 3
2339
        return 1
2340
2341
    return buildings_gdf.assign(
2342
        voltage_level=buildings_gdf.capacity.apply(voltage_levels)
2343
    )
2344
2345
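Spot checks for the capacity-to-voltage-level thresholds above, restated as
a standalone sketch (the sample capacities in kW are arbitrary):

thresholds = [(100, 7), (200, 6), (5500, 5), (20000, 4), (120000, 3)]

def voltage_level(p: float) -> int:
    # Return the level of the first threshold the capacity falls below,
    # defaulting to level 1 for very large plants.
    return next((level for cut, level in thresholds if p < cut), 1)

print([voltage_level(p) for p in (30, 150, 4000, 15000, 100000, 500000)])
# [7, 6, 5, 4, 3, 1]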
2346
def add_start_up_date(
2347
    buildings_gdf: gpd.GeoDataFrame,
2348
    start: pd.Timestamp,
2349
    end: pd.Timestamp,
2350
    seed: int,
2351
):
2352
    """
2353
    Randomly and uniformly add a start-up date to new pv generators.
2354
    Parameters
2355
    ----------
2356
    buildings_gdf : geopandas.GeoDataFrame
2357
        GeoDataFrame containing OSM buildings data with desaggregated PV
2358
        plants.
2359
    start : pandas.Timestamp
2360
        Minimum Timestamp to use.
2361
    end : pandas.Timestamp
2362
        Maximum Timestamp to use.
2363
    seed : int
2364
        Seed to use for random operations with NumPy and pandas.
2365
    Returns
2366
    -------
2367
    geopandas.GeoDataFrame
2368
        GeoDataFrame containing OSM buildings data with start-up date added.
2369
    """
2370
    rng = default_rng(seed=seed)
2371
2372
    date_range = pd.date_range(start=start, end=end, freq="1D")
2373
2374
    return buildings_gdf.assign(
2375
        start_up_date=rng.choice(date_range, size=len(buildings_gdf))
2376
    )
2377
2378
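Drawing start-up dates uniformly from a daily range is all the function
does; a self-contained sketch with an assumed window and seed:

import pandas as pd
from numpy.random import default_rng

rng = default_rng(seed=5)  # assumed seed
date_range = pd.date_range(start="2022-01-01", end="2034-12-31", freq="1D")
print(rng.choice(date_range, size=3))  # three uniformly drawn dates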
2379
@timer_func
2380
def allocate_scenarios(
2381
    mastr_gdf: gpd.GeoDataFrame,
2382
    buildings_gdf: gpd.GeoDataFrame,
2383
    last_scenario_gdf: gpd.GeoDataFrame,
2384
    scenario: str,
2385
):
2386
    """
2387
    Desaggregate and allocate scenario pv rooftop ramp-ups onto buildings.
2388
    Parameters
2389
    ----------
2390
    mastr_gdf : geopandas.GeoDataFrame
2391
        GeoDataFrame containing geocoded MaStR data.
2392
    buildings_gdf : geopandas.GeoDataFrame
2393
        GeoDataFrame containing OSM buildings data.
2394
    last_scenario_gdf : geopandas.GeoDataFrame
2395
        GeoDataFrame containing OSM buildings matched with pv generators
2396
        from the temporally preceding scenario.
2397
    scenario : str
2398
        Scenario to desaggregate and allocate.
2399
    Returns
2400
    -------
2401
    tuple
2402
        geopandas.GeoDataFrame
2403
            GeoDataFrame containing OSM buildings matched with pv generators.
2404
        pandas.DataFrame
2405
            DataFrame containing pv rooftop capacity per grid id.
2406
    """
2407
    grid_districts_gdf = grid_districts(EPSG)
2408
2409
    federal_state_gdf = federal_state_data(grid_districts_gdf.crs)
2410
2411
    grid_federal_state_gdf = overlay_grid_districts_with_counties(
2412
        grid_districts_gdf,
2413
        federal_state_gdf,
2414
    )
2415
2416
    buildings_overlay_gdf = add_overlay_id_to_buildings(
2417
        buildings_gdf,
2418
        grid_federal_state_gdf,
2419
    )
2420
2421
    valid_buildings_gdf = drop_buildings_outside_grids(buildings_overlay_gdf)
2422
2423
    buildings_area_per_overlay_gdf = buildings_area_per_overlay_id(
2424
        valid_buildings_gdf,
2425
        grid_federal_state_gdf,
2426
    )
2427
2428
    cap_per_bus_id_df = cap_per_bus_id(
2429
        buildings_area_per_overlay_gdf,
2430
        scenario_data(CARRIER, scenario),
2431
    )
2432
2433
    last_scenario_gdf = determine_end_of_life_gens(
2434
        last_scenario_gdf,
2435
        SCENARIO_TIMESTAMP[scenario],
2436
        PV_ROOFTOP_LIFETIME,
2437
    )
2438
2439
    buildings_gdf = calculate_max_pv_cap_per_building(
2440
        valid_buildings_gdf,
2441
        last_scenario_gdf,
2442
        PV_CAP_PER_SQ_M,
2443
        ROOF_FACTOR,
2444
    )
2445
2446
    mastr_gdf = calculate_building_load_factor(
2447
        mastr_gdf,
2448
        buildings_gdf,
2449
    )
2450
2451
    probabilities_dict = probabilities(
2452
        mastr_gdf,
2453
        cap_ranges=CAP_RANGES,
2454
    )
2455
2456
    cap_share_dict = cap_share_per_cap_range(
2457
        mastr_gdf,
2458
        cap_ranges=CAP_RANGES,
2459
    )
2460
2461
    load_factor_dict = mean_load_factor_per_cap_range(
2462
        mastr_gdf,
2463
        cap_ranges=CAP_RANGES,
2464
    )
2465
2466
    building_area_range_dict = building_area_range_per_cap_range(
2467
        mastr_gdf,
2468
        cap_ranges=CAP_RANGES,
2469
        min_building_size=MIN_BUILDING_SIZE,
2470
        upper_quantile=UPPER_QUNATILE,
2471
        lower_quantile=LOWER_QUANTILE,
2472
    )
2473
2474
    allocated_buildings_gdf = desaggregate_pv(
2475
        buildings_gdf=buildings_gdf,
2476
        cap_df=cap_per_bus_id_df,
2477
        prob_dict=probabilities_dict,
2478
        cap_share_dict=cap_share_dict,
2479
        building_area_range_dict=building_area_range_dict,
2480
        load_factor_dict=load_factor_dict,
2481
        seed=SEED,
2482
        pv_cap_per_sq_m=PV_CAP_PER_SQ_M,
2483
    )
2484
2485
    allocated_buildings_gdf = allocated_buildings_gdf.assign(scenario=scenario)
2486
2487
    meta_buildings_gdf = frame_to_numeric(
2488
        add_buildings_meta_data(
2489
            allocated_buildings_gdf,
2490
            probabilities_dict,
2491
            SEED,
2492
        )
2493
    )
2494
2495
    return (
2496
        add_start_up_date(
2497
            meta_buildings_gdf,
2498
            start=last_scenario_gdf.start_up_date.max(),
2499
            end=SCENARIO_TIMESTAMP[scenario],
2500
            seed=SEED,
2501
        ),
2502
        cap_per_bus_id_df,
2503
    )
2504
2505
2506
class EgonPowerPlantPvRoofBuildingScenario(Base):
2507
    __tablename__ = "egon_power_plants_pv_roof_building"
2508
    __table_args__ = {"schema": "supply"}
2509
2510
    index = Column(Integer, primary_key=True, index=True)
2511
    scenario = Column(String)
2512
    building_id = Column(Integer)
2513
    gens_id = Column(String, nullable=True)
2514
    capacity = Column(Float)
2515
    einheitliche_ausrichtung_und_neigungswinkel = Column(Float)
2516
    hauptausrichtung = Column(String)
2517
    hauptausrichtung_neigungswinkel = Column(String)
2518
    voltage_level = Column(Integer)
2519
2520
2521
def create_scenario_table(buildings_gdf):
2522
    """Create mapping table pv_unit <-> building for scenario"""
2523
    EgonPowerPlantPvRoofBuildingScenario.__table__.drop(
2524
        bind=engine, checkfirst=True
2525
    )
2526
    EgonPowerPlantPvRoofBuildingScenario.__table__.create(
2527
        bind=engine, checkfirst=True
2528
    )
2529
2530
    buildings_gdf.rename(columns=COLS_TO_RENAME)[
2531
        COLS_TO_EXPORT
2532
    ].reset_index().to_sql(
2533
        name=EgonPowerPlantPvRoofBuildingScenario.__table__.name,
2534
        schema=EgonPowerPlantPvRoofBuildingScenario.__table__.schema,
2535
        con=db.engine(),
2536
        if_exists="append",
2537
        index=False,
2538
    )
2539
2540
2541
def geocode_mastr_data():
2542
    """
2543
    Read PV rooftop data from MaStR CSV
2544
    TODO: the source will be replaced as soon as the MaStR data is available
2545
     in DB.
2546
    """
2547
    mastr_df = mastr_data(
2548
        MASTR_INDEX_COL,
2549
        MASTR_RELEVANT_COLS,
2550
        MASTR_DTYPES,
2551
        MASTR_PARSE_DATES,
2552
    )
2553
2554
    clean_mastr_df = clean_mastr_data(
2555
        mastr_df,
2556
        max_realistic_pv_cap=MAX_REALISTIC_PV_CAP,
2557
        min_realistic_pv_cap=MIN_REALISTIC_PV_CAP,
2558
        seed=SEED,
2559
        rounding=ROUNDING,
2560
    )
2561
2562
    geocoding_df = geocoding_data(clean_mastr_df)
2563
2564
    ratelimiter = geocoder(USER_AGENT, MIN_DELAY_SECONDS)
2565
2566
    geocode_gdf = geocode_data(geocoding_df, ratelimiter, EPSG)
2567
2568
    create_geocoded_table(geocode_gdf)
2569
2570
2571
def pv_rooftop_to_buildings():
2572
    """Main script, executed as task"""
2573
2574
    mastr_gdf = load_mastr_data()
2575
2576
    buildings_gdf = load_building_data()
2577
2578
    desagg_mastr_gdf, desagg_buildings_gdf = allocate_to_buildings(
2579
        mastr_gdf, buildings_gdf
2580
    )
2581
2582
    all_buildings_gdf = (
2583
        desagg_mastr_gdf.assign(scenario="status_quo")
2584
        .reset_index()
2585
        .rename(columns={"geometry": "geom", "EinheitMastrNummer": "gens_id"})
2586
    )
2587
2588
    scenario_buildings_gdf = all_buildings_gdf.copy()
2589
2590
    cap_per_bus_id_df = pd.DataFrame()
2591
2592
    for scenario in SCENARIOS:
2593
        logger.debug(f"Desaggregating scenario {scenario}.")
2594
        (
2595
            scenario_buildings_gdf,
2596
            cap_per_bus_id_scenario_df,
2597
        ) = allocate_scenarios(  # noqa: F841
2598
            desagg_mastr_gdf,
2599
            desagg_buildings_gdf,
2600
            scenario_buildings_gdf,
2601
            scenario,
2602
        )
2603
2604
        all_buildings_gdf = gpd.GeoDataFrame(
2605
            pd.concat(
2606
                [all_buildings_gdf, scenario_buildings_gdf], ignore_index=True
2607
            ),
2608
            crs=scenario_buildings_gdf.crs,
2609
            geometry="geom",
2610
        )
2611
2612
        cap_per_bus_id_df = pd.concat(
2613
            [cap_per_bus_id_df, cap_per_bus_id_scenario_df]
2614
        )
2615
2616
    # export scenario
2617
    create_scenario_table(add_voltage_level(all_buildings_gdf))
2618