| Conditions | 10 |
| Total Lines | 210 |
| Code Lines | 114 |
| Lines | 0 |
| Ratio | 0 % |
| Changes | 0 | ||
Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
Commonly applied refactorings include:
- Extract Method

If many parameters/temporary variables are present:
- Replace Temp with Query
- Introduce Parameter Object
- Preserve Whole Object
Complex classes like data.datasets.storages.allocate_pumped_hydro_sq() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | """The central module containing all code dealing with power plant data. |
||
def allocate_pumped_hydro_sq(scn_name):
    """
    Allocate pumped hydro storage units based on MaStR data only.

    The MaStR storage dump (2024) is read, reduced to pumped hydro units
    that are in operation, aggregated per location and assigned a voltage
    level and a grid bus. Units without a federal state are treated as
    located outside Germany and are attached to the nearest foreign bus of
    the matching voltage level. No filter for commissioning is applied.

    Existing pumped hydro rows of the scenario are deleted from
    ``supply.egon_storages`` before the new rows are inserted.

    Parameters
    ----------
    scn_name : str
        Name of the scenario. It selects the foreign buses, the rows that
        are deleted from the target table and the scenario the new rows
        are written for.

    Returns
    -------
    None
        The result is written to the database.

    """
    sources = config.datasets()["power_plants"]["sources"]

    # Read-in data from MaStR
    mastr_ph = pd.read_csv(
        WORKING_DIR_MASTR_NEW / sources["mastr_storage"],
        delimiter=",",
        usecols=[
            "Nettonennleistung",
            "EinheitMastrNummer",
            "Kraftwerksnummer",
            "Technologie",
            "Postleitzahl",
            "Laengengrad",
            "Breitengrad",
            "EinheitBetriebsstatus",
            "LokationMastrNummer",
            "Ort",
            "Bundesland",
        ],
        # Keep postal codes as strings so that leading zeros survive
        dtype={"Postleitzahl": str},
    )

    boundary = config.settings()["egon-data"]["--dataset-boundary"]
    if boundary == "Schleswig-Holstein":
        # Filter for Schleswig-Holstein
        mastr_ph = mastr_ph.loc[mastr_ph.Bundesland == "SchleswigHolstein"]

    # Rename columns
    mastr_ph = mastr_ph.rename(
        columns={
            "Kraftwerksnummer": "bnetza_id",
            "Technologie": "carrier",
            "Postleitzahl": "plz",
            "Ort": "city",
            "Bundesland": "federal_state",
            "Nettonennleistung": "el_capacity",
        }
    )

    # Select only pumped hydro units
    mastr_ph = mastr_ph.loc[mastr_ph.carrier == "Pumpspeicher"]

    # Select only pumped hydro units which are in operation
    mastr_ph = mastr_ph.loc[mastr_ph.EinheitBetriebsstatus == "InBetrieb"]

    # Calculate power in MW
    mastr_ph.loc[:, "el_capacity"] *= 1e-3

    # Create geodataframe from long, lat
    mastr_ph = gpd.GeoDataFrame(
        mastr_ph,
        geometry=gpd.points_from_xy(
            mastr_ph["Laengengrad"], mastr_ph["Breitengrad"]
        ),
        crs="4326",
    )

    # Identify units without geo coordinates
    mastr_ph_nogeo = mastr_ph.loc[mastr_ph["Laengengrad"].isna()]

    # Remove all units without geo coordinates from the main frame
    mastr_ph = mastr_ph.dropna(subset="Laengengrad")

    # Get geometry of villages/cities with the same name as the city of the
    # units with missing geo coordinates
    with session_scope() as session:
        query = session.query(
            Vg250GemClean.gen, Vg250GemClean.geometry
        ).filter(Vg250GemClean.gen.in_(mastr_ph_nogeo.loc[:, "city"].unique()))
        df_cities = gpd.read_postgis(
            query.statement,
            query.session.bind,
            geom_col="geometry",
            crs="4326",
        )

    # Just take the first entry, inaccuracy is negligible as the centroid is
    # taken afterwards
    df_cities = df_cities.drop_duplicates("gen", keep="first")

    # Use the centroid instead of the polygon of the region
    df_cities.loc[:, "geometry"] = df_cities["geometry"].centroid

    # Add centroid geometry to units without geometry
    # NOTE(review): both frames carry a "geometry" column, so pandas is
    # expected to suffix them ("geometry_x"/"geometry_y") - verify that the
    # centroid really ends up in "geometry" for these units.
    mastr_ph_nogeo = pd.merge(
        left=df_cities,
        right=mastr_ph_nogeo,
        right_on="city",
        left_on="gen",
        how="inner",
    ).drop("gen", axis=1)

    mastr_ph = pd.concat([mastr_ph, mastr_ph_nogeo], axis=0)

    # Aggregate capacity per location
    agg_cap = mastr_ph.groupby("geometry")["el_capacity"].sum()

    # List MaStR numbers by location
    agg_mastr = mastr_ph.groupby("geometry")["EinheitMastrNummer"].apply(list)

    # Remove duplicates by location and keep only the first entry; the
    # per-unit columns are replaced by the aggregated ones below
    mastr_ph = mastr_ph.drop_duplicates(subset="geometry", keep="first").drop(
        ["el_capacity", "EinheitMastrNummer"], axis=1
    )

    # Add aggregated capacity by location
    mastr_ph = pd.merge(
        left=mastr_ph, right=agg_cap, left_on="geometry", right_on="geometry"
    )

    # Add list of MaStR numbers by location
    mastr_ph = pd.merge(
        left=mastr_ph, right=agg_mastr, left_on="geometry", right_on="geometry"
    )

    # Drop small locations
    # NOTE(review): el_capacity was scaled by 1e-3 above ("power in MW"), so
    # this keeps only locations above 30 in that unit, while the original
    # comment spoke of 30 kW - confirm the intended threshold.
    mastr_ph = mastr_ph.loc[mastr_ph["el_capacity"] > 30]

    # Apply voltage level by capacity
    mastr_ph = apply_voltage_level_thresholds(mastr_ph)
    mastr_ph["voltage_level"] = mastr_ph["voltage_level"].astype(int)

    # Capacity located outside Germany -> will be assigned to foreign buses
    mastr_ph_foreign = mastr_ph.loc[mastr_ph["federal_state"].isna()]

    if not mastr_ph_foreign.empty:
        # Get foreign buses
        sql = f"""
        SELECT * FROM grid.egon_etrago_bus
        WHERE scn_name = '{scn_name}'
        and country != 'DE'
        """
        df_foreign_buses = db.select_geodataframe(
            sql, geom_col="geom", epsg="4326"
        )

        # Assign closest foreign bus at voltage level to foreign units
        nearest_neighbors = []
        for vl, v_nom in {1: 380, 2: 220, 3: 110}.items():
            ph = mastr_ph_foreign.loc[mastr_ph_foreign["voltage_level"] == vl]
            if ph.empty:
                continue
            bus = df_foreign_buses.loc[
                df_foreign_buses["v_nom"] == v_nom,
                ["v_nom", "country", "bus_id", "geom"],
            ]
            results = gpd.sjoin_nearest(
                left_df=ph, right_df=bus, how="left", distance_col="distance"
            )
            nearest_neighbors.append(results)

        if nearest_neighbors:
            mastr_ph_foreign = pd.concat(nearest_neighbors)
        else:
            # pd.concat raises on an empty list; no foreign unit has a
            # voltage level of 1-3, so nothing can be attached to a bus
            mastr_ph_foreign = mastr_ph_foreign.iloc[0:0]

    # Keep only capacities within Germany (no-op if there are no foreign units)
    mastr_ph = mastr_ph.dropna(subset="federal_state")

    # Assign buses within Germany
    mastr_ph = assign_bus_id(mastr_ph, cfg=config.datasets()["power_plants"])
    mastr_ph["bus_id"] = mastr_ph["bus_id"].astype(int)

    if not mastr_ph_foreign.empty:
        # Merge foreign units
        mastr_ph = pd.concat([mastr_ph, mastr_ph_foreign])

    # Reduce to necessary columns; copy so the assignments below do not
    # write into a view of the previous frame
    mastr_ph = mastr_ph[
        [
            "el_capacity",
            "voltage_level",
            "bus_id",
            "geometry",
            "EinheitMastrNummer",
        ]
    ].copy()

    # Rename and format columns
    mastr_ph["carrier"] = "pumped_hydro"
    mastr_ph = mastr_ph.rename(
        columns={"EinheitMastrNummer": "source_id", "geometry": "geom"}
    )
    mastr_ph["source_id"] = mastr_ph["source_id"].apply(
        lambda x: {"MastrNummer": ", ".join(x)}
    )
    mastr_ph = mastr_ph.set_geometry("geom")
    mastr_ph["geom"] = mastr_ph["geom"].apply(lambda x: x.wkb_hex)
    # Tag the rows with the scenario that is cleaned up below; a hard-coded
    # scenario name would leave stale rows behind and duplicate them on rerun
    mastr_ph["scenario"] = scn_name
    mastr_ph["sources"] = [
        {"el_capacity": "MaStR aggregated by location"}
    ] * mastr_ph.shape[0]

    # Delete existing units in the target table
    db.execute_sql(
        f""" DELETE FROM supply.egon_storages
        WHERE carrier = 'pumped_hydro'
        AND scenario= '{scn_name}';"""
    )

    with db.session_scope() as session:
        session.bulk_insert_mappings(
            EgonStorages,
            mastr_ph.to_dict(orient="records"),
        )
||
| 665 |