Conditions | 9 |
Total Lines | 265 |
Code Lines | 152 |
Lines | 0 |
Ratio | 0 % |
Changes | 0 |
Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
Commonly applied refactorings include:
- Extract Method
If many parameters/temporary variables are present:
- Replace Temp with Query
- Introduce Parameter Object
- Preserve Whole Object
1 | """The central module containing all code dealing with power plant data. |
||
def allocate_storage_units_sq(scn_name, storage_types):
    """
    Allocate storage units by MaStR data only.

    For every requested storage type the MaStR units are filtered by
    operational status and commissioning date, aggregated per location,
    assigned a voltage level and a bus, and written to the storages table.
    Capacities outside Germany are assigned to foreign buses.

    Parameters
    ----------
    scn_name: str
        Scenario name
    storage_types: list
        Contains all the required storage unit carriers to be imported.
        Each entry has to be a key of ``map_storage`` below ("battery",
        "pumped_hydro", "compressed_air", "flywheel" or "other"); any
        other value raises a ``KeyError``.

    Returns
    -------
    None
        Existing rows of the same carrier and scenario are deleted from
        ``supply.egon_storages`` and the new rows are inserted via
        ``EgonStorages``.

    """
    sources = config.datasets()["power_plants"]["sources"]
    scn_parameters = get_sector_parameters("global", scn_name)
    scenario_date_max = str(scn_parameters["weather_year"]) + "-12-31 23:59:00"

    # Scenario carrier name -> technology label used in the MaStR data
    map_storage = {
        "battery": "Batterie",
        "pumped_hydro": "Pumpspeicher",
        "compressed_air": "Druckluft",
        "flywheel": "Schwungrad",
        "other": "Sonstige",
    }

    # The inputs below do not depend on the storage type, so they are
    # loaded once instead of once per loop iteration.

    # Read-in data from MaStR and rename columns
    mastr_all = pd.read_csv(
        WORKING_DIR_MASTR_NEW / sources["mastr_storage"],
        delimiter=",",
        usecols=[
            "Nettonennleistung",
            "EinheitMastrNummer",
            "Kraftwerksnummer",
            "Technologie",
            "Postleitzahl",
            "Laengengrad",
            "Breitengrad",
            "EinheitBetriebsstatus",
            "LokationMastrNummer",
            "Ort",
            "Bundesland",
            "DatumEndgueltigeStilllegung",
            "Inbetriebnahmedatum",
        ],
        dtype={"Postleitzahl": str},
    ).rename(
        columns={
            "Kraftwerksnummer": "bnetza_id",
            "Technologie": "carrier",
            "Postleitzahl": "plz",
            "Ort": "city",
            "Bundesland": "federal_state",
            "Nettonennleistung": "el_capacity",
            "DatumEndgueltigeStilllegung": "decommissioning_date",
        }
    )

    # Geometry of villages/cities; used as a fallback location for units
    # without coordinates
    with session_scope() as session:
        query = session.query(Vg250GemClean.gen, Vg250GemClean.geometry)
        df_cities_all = gpd.read_postgis(
            query.statement,
            query.session.bind,
            geom_col="geometry",
            crs="3035",
        )

    # Get foreign central buses
    sql = f"""
    SELECT * FROM grid.egon_etrago_bus
    WHERE scn_name = '{scn_name}'
    and country != 'DE'
    """
    df_foreign_buses = db.select_geodataframe(
        sql, geom_col="geom", epsg="4326"
    )
    central_bus = entsoe_to_bus_etrago(scn_name).to_frame()
    central_bus["geom"] = (
        df_foreign_buses.set_index("bus_id")
        .loc[central_bus[0], "geom"]
        .values
    )
    df_foreign_buses = df_foreign_buses[
        df_foreign_buses["geom"].isin(central_bus["geom"])
    ]

    for storage_type in storage_types:
        # Select only the required type of storage; work on a copy so the
        # shared MaStR frame is never modified
        mastr_ph = mastr_all.loc[
            mastr_all.carrier == map_storage[storage_type]
        ].copy()

        # Select only storage units in operation. Units that are
        # decommissioned only after the scenario date still count as
        # operating.
        mastr_ph.loc[
            mastr_ph["decommissioning_date"] > scenario_date_max,
            "EinheitBetriebsstatus",
        ] = "InBetrieb"
        mastr_ph = mastr_ph.loc[
            mastr_ph.EinheitBetriebsstatus.isin(
                ["InBetrieb", "VoruebergehendStillgelegt"]
            )
        ]

        # Select only storage units installed before scenario_date_max
        mastr_ph = mastr_ph[
            pd.to_datetime(mastr_ph["Inbetriebnahmedatum"]) < scenario_date_max
        ]

        # Calculate power in MW
        mastr_ph.loc[:, "el_capacity"] *= 1e-3

        # Create geodataframe from long, lat
        mastr_ph = gpd.GeoDataFrame(
            mastr_ph,
            geometry=gpd.points_from_xy(
                mastr_ph["Laengengrad"], mastr_ph["Breitengrad"]
            ),
            crs="4326",
        )

        # Identify units without geo-coordinates
        mastr_ph_nogeo = mastr_ph.loc[mastr_ph["Laengengrad"].isna()]

        # Remove all units without geo-coordinates
        mastr_ph = mastr_ph.dropna(subset="Laengengrad")

        # Keep only cities sharing their name with a unit that lacks
        # coordinates
        df_cities = df_cities_all[
            df_cities_all["gen"].isin(mastr_ph_nogeo["city"])
        ]

        # Just take the first entry, inaccuracy is negligible as centroid
        # is taken afterwards
        df_cities = df_cities.drop_duplicates("gen", keep="first")

        # Use the centroid instead of polygon of region
        df_cities.loc[:, "geometry"] = df_cities["geometry"].centroid

        # Change coordinate system
        df_cities.to_crs("4326", inplace=True)

        # Add centroid geometry to units without geometry
        mastr_ph_nogeo = pd.merge(
            left=df_cities,
            right=mastr_ph_nogeo,
            right_on="city",
            left_on="gen",
            suffixes=("", "_no-geo"),
            how="inner",
        ).drop("gen", axis=1)

        mastr_ph = pd.concat([mastr_ph, mastr_ph_nogeo], axis=0)

        # Aggregate capacity per location
        agg_cap = mastr_ph.groupby("geometry")["el_capacity"].sum()

        # List MaStR numbers by location
        agg_mastr = mastr_ph.groupby("geometry")["EinheitMastrNummer"].apply(
            list
        )

        # Remove duplicates by location
        mastr_ph = mastr_ph.drop_duplicates(
            subset="geometry", keep="first"
        ).drop(["el_capacity", "EinheitMastrNummer"], axis=1)

        # Adjust capacity
        mastr_ph = pd.merge(
            left=mastr_ph,
            right=agg_cap,
            left_on="geometry",
            right_on="geometry",
        )

        # Attach the list of MaStR numbers per location
        mastr_ph = pd.merge(
            left=mastr_ph,
            right=agg_mastr,
            left_on="geometry",
            right_on="geometry",
        )

        # Drop small units <= 30 kW
        mastr_ph = mastr_ph.loc[mastr_ph["el_capacity"] > 0.03]

        # Apply voltage level by capacity
        mastr_ph = apply_voltage_level_thresholds(mastr_ph)
        mastr_ph["voltage_level"] = mastr_ph["voltage_level"].astype(int)

        # Capacity located outside Germany -> will be assigned to foreign
        # buses
        mastr_ph_foreign = mastr_ph.loc[mastr_ph["federal_state"].isna()]

        # Keep only capacities within Germany
        mastr_ph = mastr_ph.dropna(subset="federal_state")

        # Assign buses within Germany
        mastr_ph = assign_bus_id(
            mastr_ph, cfg=config.datasets()["power_plants"], drop_missing=True
        )
        mastr_ph["bus_id"] = mastr_ph["bus_id"].astype(int)

        if len(mastr_ph_foreign) > 0:
            # Assign closest bus at voltage level to foreign units
            nearest_neighbors = []
            for vl, v_nom in {1: 380, 2: 220, 3: 110}.items():
                ph = mastr_ph_foreign.loc[
                    mastr_ph_foreign["voltage_level"] == vl
                ]
                if ph.empty:
                    continue
                bus = df_foreign_buses.loc[
                    df_foreign_buses["v_nom"] == v_nom,
                    ["v_nom", "country", "bus_id", "geom"],
                ]
                results = gpd.sjoin_nearest(
                    left_df=ph,
                    right_df=bus,
                    how="left",
                    distance_col="distance",
                )
                nearest_neighbors.append(results)
            # pd.concat raises a ValueError on an empty list, which happens
            # when no foreign unit has voltage level 1-3. Such units cannot
            # be matched to a bus and are dropped, exactly like unmatched
            # units in the mixed case.
            if nearest_neighbors:
                mastr_ph_foreign = pd.concat(nearest_neighbors)
            else:
                mastr_ph_foreign = mastr_ph_foreign.iloc[0:0]

        # Merge foreign units
        mastr_ph = pd.concat([mastr_ph, mastr_ph_foreign])

        # Reduce to necessary columns
        mastr_ph = mastr_ph[
            [
                "el_capacity",
                "voltage_level",
                "bus_id",
                "geometry",
                "EinheitMastrNummer",
            ]
        ]

        # Rename and format columns
        mastr_ph["carrier"] = storage_type
        mastr_ph = mastr_ph.rename(
            columns={"EinheitMastrNummer": "source_id", "geometry": "geom"}
        )
        mastr_ph["source_id"] = mastr_ph["source_id"].apply(
            lambda x: {"MastrNummer": ", ".join(x)}
        )
        mastr_ph = mastr_ph.set_geometry("geom")
        mastr_ph["geom"] = mastr_ph["geom"].apply(lambda x: x.wkb_hex)
        mastr_ph["scenario"] = scn_name
        mastr_ph["sources"] = [
            {"el_capacity": "MaStR aggregated by location"}
        ] * mastr_ph.shape[0]

        # Delete existing units in the target table
        db.execute_sql(
            f""" DELETE FROM supply.egon_storages
            WHERE carrier = '{storage_type}'
            AND scenario = '{scn_name}'
            AND sources ->> 'el_capacity' = 'MaStR aggregated by location';"""
        )

        with db.session_scope() as session:
            session.bulk_insert_mappings(
                EgonStorages,
                mastr_ph.to_dict(orient="records"),
            )
||
765 |