| Conditions | 9 |
| Total Lines | 265 |
| Code Lines | 152 |
| Lines | 0 |
| Ratio | 0 % |
| Changes | 0 | ||
Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
Commonly applied refactorings include: Extract Method.
If many parameters/temporary variables are present: Replace Method with Method Object.
| 1 | """The central module containing all code dealing with power plant data. |
||
def _read_mastr_storage_units(csv_path):
    """Read the MaStR storage CSV and rename the columns used downstream.

    Parameters
    ----------
    csv_path : path-like
        Location of the MaStR storage-unit CSV file.

    Returns
    -------
    pandas.DataFrame
        All storage units of the file, unfiltered. ``el_capacity`` still
        holds the raw ``Nettonennleistung`` values (not yet scaled).
    """
    units = pd.read_csv(
        csv_path,
        delimiter=",",
        usecols=[
            "Nettonennleistung",
            "EinheitMastrNummer",
            "Kraftwerksnummer",
            "Technologie",
            "Postleitzahl",
            "Laengengrad",
            "Breitengrad",
            "EinheitBetriebsstatus",
            "LokationMastrNummer",
            "Ort",
            "Bundesland",
            "DatumEndgueltigeStilllegung",
            "Inbetriebnahmedatum",
        ],
        # Postal codes must stay strings so leading zeros are not lost
        dtype={"Postleitzahl": str},
    )
    return units.rename(
        columns={
            "Kraftwerksnummer": "bnetza_id",
            "Technologie": "carrier",
            "Postleitzahl": "plz",
            "Ort": "city",
            "Bundesland": "federal_state",
            "Nettonennleistung": "el_capacity",
            "DatumEndgueltigeStilllegung": "decommissioning_date",
        }
    )


def _read_foreign_central_buses(scn_name):
    """Return the non-German buses located at a central bus of the scenario.

    Parameters
    ----------
    scn_name : str
        Scenario name used to filter ``grid.egon_etrago_bus``.

    Returns
    -------
    geopandas.GeoDataFrame
        Rows of ``grid.egon_etrago_bus`` with ``country != 'DE'`` whose
        geometry equals the geometry of one of the buses returned by
        ``entsoe_to_bus_etrago``.
    """
    # NOTE(review): scn_name is interpolated directly into the statement.
    # Fine for trusted, internal scenario names; parameterize the query if
    # the value could ever originate from user input.
    sql = f"""
        SELECT * FROM grid.egon_etrago_bus
        WHERE scn_name = '{scn_name}'
        and country != 'DE'
        """
    foreign_buses = db.select_geodataframe(sql, geom_col="geom", epsg="4326")

    # Look up the geometry of every central bus, then keep all foreign buses
    # (of any voltage) that sit on one of those geometries
    central_bus = entsoe_to_bus_etrago(scn_name).to_frame()
    central_bus["geom"] = (
        foreign_buses.set_index("bus_id").loc[central_bus[0], "geom"].values
    )
    return foreign_buses[foreign_buses["geom"].isin(central_bus["geom"])]


def allocate_storage_units_sq(scn_name, storage_types):
    """
    Allocate storage units by mastr data only. Capacities outside
    germany are assigned to foreign buses.

    For every requested carrier the MaStR units are filtered to those in
    operation at the end of the scenario's weather year, aggregated per
    location, assigned a voltage level and a bus, and written to
    ``supply.egon_storages`` (replacing rows previously written by this
    function for the same carrier and scenario).

    Parameters
    ----------
    scn_name: str
        Scenario name
    storage_types: list
        contains all the required storage units carriers to be imported.
        Each entry must be one of ``"battery"``, ``"pumped_hydro"``,
        ``"compressed_air"``, ``"flywheel"`` or ``"other"``.

    Returns
    -------
    None
        The result is written to the database.

    Raises
    ------
    KeyError
        If an entry of ``storage_types`` is not a known carrier.

    """
    power_plants_cfg = config.datasets()["power_plants"]
    sources = power_plants_cfg["sources"]
    scn_parameters = get_sector_parameters("global", scn_name)
    scenario_date_max = str(scn_parameters["weather_year"]) + "-12-31 23:59:00"

    # Carrier name used in this project -> technology label used in MaStR
    map_storage = {
        "battery": "Batterie",
        "pumped_hydro": "Pumpspeicher",
        "compressed_air": "Druckluft",
        "flywheel": "Schwungrad",
        "other": "Sonstige",
    }

    # Voltage level -> nominal voltage of the bus a foreign unit is matched to
    voltage_level_to_v_nom = {1: 380, 2: 220, 3: 110}

    storage_types = list(storage_types)
    if not storage_types:
        # Nothing requested: skip the file and database reads below
        return

    # The following inputs do not depend on the carrier, so they are read
    # once instead of once per loop iteration.
    mastr_all = _read_mastr_storage_units(
        WORKING_DIR_MASTR_NEW / sources["mastr_storage"]
    )

    with session_scope() as session:
        query = session.query(Vg250GemClean.gen, Vg250GemClean.geometry)
        all_cities = gpd.read_postgis(
            query.statement,
            query.session.bind,
            geom_col="geometry",
            crs="3035",
        )

    df_foreign_buses = _read_foreign_central_buses(scn_name)

    for storage_type in storage_types:
        # Select only the required type of storage. The explicit copy keeps
        # the assignments below from touching (or warning about) mastr_all.
        mastr_ph = mastr_all.loc[
            mastr_all["carrier"] == map_storage[storage_type]
        ].copy()

        # Select only storage units in operation: units decommissioned after
        # the scenario date still count as operating within the scenario
        mastr_ph.loc[
            mastr_ph["decommissioning_date"] > scenario_date_max,
            "EinheitBetriebsstatus",
        ] = "InBetrieb"
        mastr_ph = mastr_ph.loc[
            mastr_ph["EinheitBetriebsstatus"].isin(
                ["InBetrieb", "VoruebergehendStillgelegt"]
            )
        ]

        # Select only storage units installed before scenario_date_max
        mastr_ph = mastr_ph[
            pd.to_datetime(mastr_ph["Inbetriebnahmedatum"]) < scenario_date_max
        ]

        # Calculate power in MW
        mastr_ph = mastr_ph.assign(el_capacity=mastr_ph["el_capacity"] * 1e-3)

        # Create geodataframe from long, lat
        mastr_ph = gpd.GeoDataFrame(
            mastr_ph,
            geometry=gpd.points_from_xy(
                mastr_ph["Laengengrad"], mastr_ph["Breitengrad"]
            ),
            crs="4326",
        )

        # Split off the units without coordinates; they get a location from
        # their municipality below
        mastr_ph_nogeo = mastr_ph.loc[mastr_ph["Laengengrad"].isna()]
        mastr_ph = mastr_ph.dropna(subset=["Laengengrad"])

        # Get geometry of villages/cities with the same name as a unit with
        # missing coordinates. Just take the first entry per name; the
        # inaccuracy is negligible as the centroid is taken afterwards.
        df_cities = (
            all_cities[all_cities["gen"].isin(mastr_ph_nogeo["city"])]
            .drop_duplicates("gen", keep="first")
            .copy()
        )

        # Use the centroid instead of the polygon of the region, then change
        # to the coordinate system of the unit locations
        df_cities["geometry"] = df_cities["geometry"].centroid
        df_cities = df_cities.to_crs("4326")

        # Add centroid geometry to units without geometry; the units' own
        # (empty) geometry column is kept with the "_no-geo" suffix
        mastr_ph_nogeo = pd.merge(
            left=df_cities,
            right=mastr_ph_nogeo,
            right_on="city",
            left_on="gen",
            suffixes=("", "_no-geo"),
            how="inner",
        ).drop("gen", axis=1)

        mastr_ph = pd.concat([mastr_ph, mastr_ph_nogeo], axis=0)

        # Aggregate capacity and collect the MaStR numbers per location
        units_by_location = mastr_ph.groupby("geometry")
        agg_cap = units_by_location["el_capacity"].sum()
        agg_mastr = units_by_location["EinheitMastrNummer"].apply(list)

        # Keep one row per location ...
        mastr_ph = mastr_ph.drop_duplicates(
            subset="geometry", keep="first"
        ).drop(["el_capacity", "EinheitMastrNummer"], axis=1)

        # ... and attach the aggregated capacity
        mastr_ph = pd.merge(left=mastr_ph, right=agg_cap, on="geometry")

        # ... and the list of MaStR numbers found at that location
        mastr_ph = pd.merge(left=mastr_ph, right=agg_mastr, on="geometry")

        # Drop small units <= 30 kW
        mastr_ph = mastr_ph.loc[mastr_ph["el_capacity"] > 0.03]

        # Apply voltage level by capacity
        mastr_ph = apply_voltage_level_thresholds(mastr_ph)
        mastr_ph["voltage_level"] = mastr_ph["voltage_level"].astype(int)

        # Capacity located outside germany -> will be assigned to foreign buses
        mastr_ph_foreign = mastr_ph.loc[mastr_ph["federal_state"].isna()]

        # Keep only capacities within germany
        mastr_ph = mastr_ph.dropna(subset=["federal_state"])

        # Assign buses within germany
        mastr_ph = assign_bus_id(
            mastr_ph, cfg=power_plants_cfg, drop_missing=True
        )
        mastr_ph["bus_id"] = mastr_ph["bus_id"].astype(int)

        # Assign the closest bus of the matching voltage to foreign units.
        # Foreign units whose voltage level is not 1-3 find no match and are
        # therefore not carried over.
        nearest_neighbors = []
        for vl, v_nom in voltage_level_to_v_nom.items():
            ph = mastr_ph_foreign.loc[mastr_ph_foreign["voltage_level"] == vl]
            if ph.empty:
                continue
            bus = df_foreign_buses.loc[
                df_foreign_buses["v_nom"] == v_nom,
                ["v_nom", "country", "bus_id", "geom"],
            ]
            nearest_neighbors.append(
                gpd.sjoin_nearest(
                    left_df=ph,
                    right_df=bus,
                    how="left",
                    distance_col="distance",
                )
            )

        # Merge foreign units. pd.concat raises ValueError on an empty list,
        # so only concatenate when at least one foreign unit was matched.
        if nearest_neighbors:
            mastr_ph_foreign = pd.concat(nearest_neighbors)
            mastr_ph = pd.concat([mastr_ph, mastr_ph_foreign])

        # Reduce to necessary columns (copy so the new columns below are
        # written to an independent frame)
        mastr_ph = mastr_ph[
            [
                "el_capacity",
                "voltage_level",
                "bus_id",
                "geometry",
                "EinheitMastrNummer",
            ]
        ].copy()

        # Rename and format columns to match the target table
        mastr_ph["carrier"] = storage_type
        mastr_ph = mastr_ph.rename(
            columns={"EinheitMastrNummer": "source_id", "geometry": "geom"}
        )
        mastr_ph["source_id"] = mastr_ph["source_id"].apply(
            lambda x: {"MastrNummer": ", ".join(x)}
        )
        mastr_ph = mastr_ph.set_geometry("geom")
        # Geometries are stored as hex-encoded WKB
        mastr_ph["geom"] = mastr_ph["geom"].apply(lambda x: x.wkb_hex)
        mastr_ph["scenario"] = scn_name
        # One dict per row (not one shared dict referenced by every row)
        mastr_ph["sources"] = [
            {"el_capacity": "MaStR aggregated by location"}
            for _ in range(mastr_ph.shape[0])
        ]

        # Delete existing units in the target table that were written by a
        # previous run of this function for the same carrier and scenario
        db.execute_sql(
            f""" DELETE FROM supply.egon_storages
            WHERE carrier = '{storage_type}'
            AND scenario = '{scn_name}'
            AND sources ->> 'el_capacity' = 'MaStR aggregated by location';"""
        )

        with db.session_scope() as session:
            session.bulk_insert_mappings(
                EgonStorages,
                mastr_ph.to_dict(orient="records"),
            )
||
| 729 |