Passed
Pull Request — dev (#1112)
by
unknown
01:53
created

zip_and_municipality_from_standort()   A

Complexity

Conditions 5

Size

Total Lines 41
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 18
dl 0
loc 41
rs 9.0333
c 0
b 0
f 0
cc 5
nop 1
1
"""Import MaStR dataset and write to DB tables
2
3
Data dump from Marktstammdatenregister (2022-11-17) is imported into the
4
database. Only some technologies are taken into account and written to the
5
following tables:
6
7
* PV: table `supply.egon_power_plants_pv`
8
* wind turbines: table `supply.egon_power_plants_wind`
9
* biomass/biogas plants: table `supply.egon_power_plants_biomass`
10
* hydro plants: table `supply.egon_power_plants_hydro`
11
12
Handling of empty source data in MaStR dump:
13
* `voltage_level`: inferred based on nominal power (`capacity`) using the
14
  ranges from
15
  https://redmine.iks.cs.ovgu.de/oe/projects/ego-n/wiki/Definition_of_thresholds_for_voltage_level_assignment
16
  which results in True in column `voltage_level_inferred`. Remaining datasets
17
  are set to -1 (which only occurs if `capacity` is empty).
18
* `supply.egon_power_plants_*.bus_id`: set to -1 (only if not within grid
19
  districts or no geom available, e.g. for units with nom. power <30 kW)
20
* `supply.egon_power_plants_hydro.plant_type`: NaN
21
22
The data is used especially for the generation of status quo grids by ding0.
23
"""
24
from __future__ import annotations
25
26
from pathlib import Path
27
28
from geoalchemy2 import Geometry
29
from loguru import logger
30
from sqlalchemy import (
31
    Boolean,
32
    Column,
33
    DateTime,
34
    Float,
35
    Integer,
36
    Sequence,
37
    String,
38
)
39
from sqlalchemy.ext.declarative import declarative_base
40
import geopandas as gpd
41
import numpy as np
42
import pandas as pd
43
44
from egon.data import config, db
45
from egon.data.datasets.mastr import WORKING_DIR_MASTR_NEW
46
from egon.data.datasets.power_plants.pv_rooftop_buildings import (
47
    federal_state_data,
48
)
49
50
# Declarative base class shared by all ORM table definitions in this module.
Base = declarative_base()
51
52
# True when the dataset boundary is configured as "Everything". For any other
# boundary setting import_mastr() runs in test mode and keeps only units
# located in Schleswig-Holstein.
TESTMODE_OFF = (
    config.settings()["egon-data"]["--dataset-boundary"] == "Everything"
)
55
56
57
class EgonMastrGeocoded(Base):
    """ORM model for table ``supply.egon_mastr_geocoded``.

    Stores the geocoding results that :func:`import_mastr` reads from the
    geocoding file. The table is dropped and recreated on every import.
    """

    __tablename__ = "egon_mastr_geocoded"
    __table_args__ = {"schema": "supply"}

    index = Column(
        Integer, Sequence("mastr_geocoded_seq"), primary_key=True, index=True
    )
    # Key used by import_mastr() to join geocoded points onto units
    zip_and_municipality = Column(String)
    latitude = Column(Float)
    longitude = Column(Float)
    altitude = Column(Float)
    geometry = Column(Geometry("POINT", 4326))
69
70
71
class EgonPowerPlantsPv(Base):
    """ORM model for table ``supply.egon_power_plants_pv`` (PV units).

    The trailing comment on a column names the MaStR source field it is
    filled from. The table is dropped and recreated by :func:`import_mastr`.
    """

    __tablename__ = "egon_power_plants_pv"
    __table_args__ = {"schema": "supply"}

    id = Column(Integer, Sequence("pp_pv_seq"), primary_key=True)
    bus_id = Column(Integer, nullable=True)  # Grid district id
    gens_id = Column(String, nullable=True)  # EinheitMastrNummer

    status = Column(String, nullable=True)  # EinheitBetriebsstatus
    commissioning_date = Column(DateTime, nullable=True)  # Inbetriebnahmedatum
    postcode = Column(String(5), nullable=True)  # Postleitzahl
    city = Column(String(50), nullable=True)  # Ort
    municipality = Column(String, nullable=True)  # Gemeinde
    federal_state = Column(String(31), nullable=True)  # Bundesland
    site = Column(String, nullable=True)  # Standort
    # Key used to look up a geocoded location for the unit
    zip_and_municipality = Column(String, nullable=True)

    site_type = Column(String(69), nullable=True)  # Lage
    usage_sector = Column(String(36), nullable=True)  # Nutzungsbereich
    orientation_primary = Column(String(11), nullable=True)  # Hauptausrichtung
    orientation_primary_angle = Column(
        String(18), nullable=True
    )  # HauptausrichtungNeigungswinkel
    orientation_secondary = Column(
        String(11), nullable=True
    )  # Nebenausrichtung
    orientation_secondary_angle = Column(
        String(18), nullable=True
    )  # NebenausrichtungNeigungswinkel
    orientation_uniform = Column(
        Boolean, nullable=True
    )  # EinheitlicheAusrichtungUndNeigungswinkel
    module_count = Column(Float, nullable=True)  # AnzahlModule

    capacity = Column(Float, nullable=True)  # Nettonennleistung
    capacity_inverter = Column(
        Float, nullable=True
    )  # ZugeordneteWirkleistungWechselrichter in MW
    feedin_type = Column(String(47), nullable=True)  # Einspeisungsart
    voltage_level = Column(Integer, nullable=True)
    # True if voltage_level was derived from the capacity
    voltage_level_inferred = Column(Boolean, nullable=True)

    # True if the MaStR coordinates were missing or invalid, so that the
    # geometry had to come from the geocoding results
    geometry_geocoded = Column(Boolean)

    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
116
117
118 View Code Duplication
class EgonPowerPlantsWind(Base):
    """ORM model for table ``supply.egon_power_plants_wind`` (wind turbines).

    The trailing comment on a column names the MaStR source field it is
    filled from. The table is dropped and recreated by :func:`import_mastr`.
    """

    __tablename__ = "egon_power_plants_wind"
    __table_args__ = {"schema": "supply"}

    id = Column(Integer, Sequence("pp_wind_seq"), primary_key=True)
    bus_id = Column(Integer, nullable=True)  # Grid district id
    gens_id = Column(String, nullable=True)  # EinheitMastrNummer

    status = Column(String, nullable=True)  # EinheitBetriebsstatus
    commissioning_date = Column(DateTime, nullable=True)  # Inbetriebnahmedatum
    postcode = Column(String(5), nullable=True)  # Postleitzahl
    city = Column(String(50), nullable=True)  # Ort
    municipality = Column(String, nullable=True)  # Gemeinde
    federal_state = Column(String(31), nullable=True)  # Bundesland
    # Key used to look up a geocoded location for the unit
    zip_and_municipality = Column(String, nullable=True)

    site_type = Column(String(17), nullable=True)  # Lage
    manufacturer_name = Column(String(100), nullable=True)  # Hersteller
    type_name = Column(String(100), nullable=True)  # Typenbezeichnung
    hub_height = Column(Float, nullable=True)  # Nabenhoehe
    rotor_diameter = Column(Float, nullable=True)  # Rotordurchmesser

    capacity = Column(Float, nullable=True)  # Nettonennleistung
    feedin_type = Column(String(47), nullable=True)  # Einspeisungsart
    voltage_level = Column(Integer, nullable=True)
    # True if voltage_level was derived from the capacity
    voltage_level_inferred = Column(Boolean, nullable=True)

    # True if the MaStR coordinates were missing or invalid, so that the
    # geometry had to come from the geocoding results
    geometry_geocoded = Column(Boolean)

    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
148
149
150 View Code Duplication
class EgonPowerPlantsBiomass(Base):
    """ORM model for table ``supply.egon_power_plants_biomass``.

    The trailing comment on a column names the MaStR source field it is
    filled from. The table is dropped and recreated by :func:`import_mastr`.
    """

    __tablename__ = "egon_power_plants_biomass"
    __table_args__ = {"schema": "supply"}

    id = Column(Integer, Sequence("pp_biomass_seq"), primary_key=True)
    bus_id = Column(Integer, nullable=True)  # Grid district id
    gens_id = Column(String, nullable=True)  # EinheitMastrNummer

    status = Column(String, nullable=True)  # EinheitBetriebsstatus
    commissioning_date = Column(DateTime, nullable=True)  # Inbetriebnahmedatum
    postcode = Column(String(5), nullable=True)  # Postleitzahl
    city = Column(String(50), nullable=True)  # Ort
    municipality = Column(String, nullable=True)  # Gemeinde
    federal_state = Column(String(31), nullable=True)  # Bundesland
    # Key used to look up a geocoded location for the unit
    zip_and_municipality = Column(String, nullable=True)

    technology = Column(String(45), nullable=True)  # Technologie
    fuel_name = Column(String(52), nullable=True)  # Hauptbrennstoff
    fuel_type = Column(String(19), nullable=True)  # Biomasseart

    capacity = Column(Float, nullable=True)  # Nettonennleistung
    th_capacity = Column(Float, nullable=True)  # ThermischeNutzleistung
    feedin_type = Column(String(47), nullable=True)  # Einspeisungsart
    voltage_level = Column(Integer, nullable=True)
    # True if voltage_level was derived from the capacity
    voltage_level_inferred = Column(Boolean, nullable=True)

    # True if the MaStR coordinates were missing or invalid, so that the
    # geometry had to come from the geocoding results
    geometry_geocoded = Column(Boolean)

    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
179
180
181
class EgonPowerPlantsHydro(Base):
    """ORM model for table ``supply.egon_power_plants_hydro`` (hydro plants).

    The trailing comment on a column names the MaStR source field it is
    filled from. The table is dropped and recreated by :func:`import_mastr`.
    """

    __tablename__ = "egon_power_plants_hydro"
    __table_args__ = {"schema": "supply"}

    id = Column(Integer, Sequence("pp_hydro_seq"), primary_key=True)
    bus_id = Column(Integer, nullable=True)  # Grid district id
    gens_id = Column(String, nullable=True)  # EinheitMastrNummer

    status = Column(String, nullable=True)  # EinheitBetriebsstatus
    commissioning_date = Column(DateTime, nullable=True)  # Inbetriebnahmedatum
    postcode = Column(String(5), nullable=True)  # Postleitzahl
    city = Column(String(50), nullable=True)  # Ort
    municipality = Column(String, nullable=True)  # Gemeinde
    federal_state = Column(String(31), nullable=True)  # Bundesland
    # Key used to look up a geocoded location for the unit
    zip_and_municipality = Column(String, nullable=True)

    plant_type = Column(String(39), nullable=True)  # ArtDerWasserkraftanlage
    water_origin = Column(String(20), nullable=True)  # ArtDesZuflusses

    capacity = Column(Float, nullable=True)  # Nettonennleistung
    feedin_type = Column(String(47), nullable=True)  # Einspeisungsart
    voltage_level = Column(Integer, nullable=True)
    # True if voltage_level was derived from the capacity
    voltage_level_inferred = Column(Boolean, nullable=True)

    # True if the MaStR coordinates were missing or invalid, so that the
    # geometry had to come from the geocoding results
    geometry_geocoded = Column(Boolean)

    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
208
209
210
def isfloat(num: str) -> bool:
    """
    Determine if string can be converted to float.

    Parameters
    -----------
    num : str
        String to parse.

    Returns
    -------
    bool
        Returns True if string can be parsed to float. Everything accepted
        by the built-in ``float()`` counts, including "nan" and "inf".
        Values that ``float()`` cannot handle at all (e.g. None) yield False.
    """
    try:
        float(num)
    except (TypeError, ValueError):
        # ValueError: malformed string; TypeError: unsupported type like None
        return False

    return True
227
228
229
def zip_and_municipality_from_standort(
    standort: str,
) -> tuple[str, bool]:
    """
    Get zip code and municipality from a whitespace-split Standort string.

    The string is cut at the first token that is made up of exactly five
    ASCII digits; everything in front of that token is discarded.

    Parameters
    -----------
    standort : str
        Standort as given from MaStR data.

    Returns
    -------
    tuple of (str, bool)
        If a zip code was found: the Standort reduced to the zip code and
        everything following it (tokens joined by single spaces) and True.
        Otherwise: the unchanged Standort and False; a warning is logged in
        this case.
    """
    standort_list = standort.split()

    for count, elem in enumerate(standort_list):
        # only plain ASCII digits qualify; str.isnumeric() alone would also
        # accept characters like superscripts or fractions
        if len(elem) == 5 and elem.isascii() and elem.isdigit():
            return " ".join(standort_list[count:]), True

    logger.warning(
        "Couldn't identify zip code. This entry will be dropped."
        f" Original standort: {standort}."
    )

    return standort, False
270
271
272
def infer_voltage_level(
    units_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Infer missing voltage levels from the generator capacity.

    Units lacking a voltage level get one assigned based on the value in
    column `Nettonennleistung` and are flagged in the newly added column
    `voltage_level_inferred`. Units lacking both voltage level and capacity
    are left untouched (voltage level stays NaN, flag stays False) so the
    caller can handle them explicitly.

    Parameters
    -----------
    units_gdf : geopandas.GeoDataFrame
        GeoDataFrame containing units with voltage levels from MaStR.
        Needs the columns `voltage_level` and `Nettonennleistung`. It is
        modified in place.

    Returns
    -------
    geopandas.GeoDataFrame
        The same GeoDataFrame with voltage levels filled wherever a capacity
        is available and the additional column `voltage_level_inferred`.
    """

    def voltage_levels(p: float) -> int:
        # thresholds are applied to the raw `Nettonennleistung` value
        if p <= 100:
            return 7
        elif p <= 200:
            return 6
        elif p <= 5500:
            return 5
        elif p <= 20000:
            return 4
        elif p <= 120000:
            return 3
        return 1

    units_gdf["voltage_level_inferred"] = False

    # Only infer where a capacity exists: a NaN capacity fails every `<=`
    # comparison and would otherwise silently end up in voltage level 1.
    mask = (
        units_gdf.voltage_level.isna() & units_gdf.Nettonennleistung.notna()
    )
    units_gdf.loc[mask, "voltage_level_inferred"] = True
    units_gdf.loc[mask, "voltage_level"] = units_gdf.loc[
        mask, "Nettonennleistung"
    ].apply(voltage_levels)

    return units_gdf
310
311
312
def import_mastr() -> None:
    """Import MaStR data into database

    Steps:

    * Load the geocoding results and write them to the table defined by
      `EgonMastrGeocoded` (dropped and recreated).
    * For each technology (pv, wind, biomass, hydro): read the unit CSV,
      keep units located in Germany (in test mode: Schleswig-Holstein only),
      attach the voltage level from the locations file or infer it, build
      point geometries from the MaStR coordinates, fall back to the geocoded
      location where coordinates are missing or invalid, drop units without
      geometry or outside the boundary, assign the bus id of the surrounding
      MV grid district and write the result to the technology's table
      (dropped and recreated).

    Raises
    ------
    AssertionError
        If the deposit ID encoded in the geocoding file name differs from
        the configured MaStR deposit ID.
    """
    engine = db.engine()

    # import geocoded data
    cfg = config.datasets()["mastr_new"]
    path_parts = cfg["geocoding_path"]
    path = Path(*["."] + path_parts).resolve()
    # the first entry of the geocoding directory is used
    path = list(path.iterdir())[0]

    # file name is expected to end with "_<deposit id>.<suffix>"
    deposit_id_geocoding = int(path.parts[-1].split(".")[0].split("_")[-1])
    deposit_id_mastr = cfg["deposit_id"]

    if deposit_id_geocoding != deposit_id_mastr:
        raise AssertionError(
            f"The zenodo (sandbox) deposit ID {deposit_id_mastr} for the MaStR"
            f" dataset is not matching with the geocoding version "
            f"{deposit_id_geocoding}. Make sure to hermonize the data. When "
            f"the MaStR dataset is updated also update the geocoding and "
            f"update the egon data bundle. The geocoding can be done using: "
            f"https://github.com/RLI-sandbox/mastr-geocoding"
        )

    geocoding_gdf = gpd.read_file(path)

    # remove failed requests
    geocoding_gdf = geocoding_gdf.loc[geocoding_gdf.geometry.is_valid]

    EgonMastrGeocoded.__table__.drop(bind=engine, checkfirst=True)
    EgonMastrGeocoded.__table__.create(bind=engine, checkfirst=True)

    geocoding_gdf.to_postgis(
        name=EgonMastrGeocoded.__tablename__,
        con=engine,
        if_exists="append",
        schema=EgonMastrGeocoded.__table_args__["schema"],
        index=True,
    )

    cfg = config.datasets()["power_plants"]

    # MaStR column name -> DB column name; "all" applies to every technology
    cols_mapping = {
        "all": {
            "EinheitMastrNummer": "gens_id",
            "EinheitBetriebsstatus": "status",
            "Inbetriebnahmedatum": "commissioning_date",
            "Postleitzahl": "postcode",
            "Ort": "city",
            "Gemeinde": "municipality",
            "Bundesland": "federal_state",
            "Nettonennleistung": "capacity",
            "Einspeisungsart": "feedin_type",
        },
        "pv": {
            "Lage": "site_type",
            "Standort": "site",
            "Nutzungsbereich": "usage_sector",
            "Hauptausrichtung": "orientation_primary",
            "HauptausrichtungNeigungswinkel": "orientation_primary_angle",
            "Nebenausrichtung": "orientation_secondary",
            "NebenausrichtungNeigungswinkel": "orientation_secondary_angle",
            "EinheitlicheAusrichtungUndNeigungswinkel": "orientation_uniform",
            "AnzahlModule": "module_count",
            "zugeordneteWirkleistungWechselrichter": "capacity_inverter",
        },
        "wind": {
            "Lage": "site_type",
            "Hersteller": "manufacturer_name",
            "Typenbezeichnung": "type_name",
            "Nabenhoehe": "hub_height",
            "Rotordurchmesser": "rotor_diameter",
        },
        "biomass": {
            "Technologie": "technology",
            "Hauptbrennstoff": "fuel_name",
            "Biomasseart": "fuel_type",
            "ThermischeNutzleistung": "th_capacity",
        },
        "hydro": {
            "ArtDerWasserkraftanlage": "plant_type",
            "ArtDesZuflusses": "water_origin",
        },
    }

    source_files = {
        "pv": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_pv"],
        "wind": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_wind"],
        "biomass": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_biomass"],
        "hydro": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_hydro"],
    }
    target_tables = {
        "pv": EgonPowerPlantsPv,
        "wind": EgonPowerPlantsWind,
        "biomass": EgonPowerPlantsBiomass,
        "hydro": EgonPowerPlantsHydro,
    }
    vlevel_mapping = {
        "Höchstspannung": 1,
        "UmspannungZurHochspannung": 2,
        "Hochspannung": 3,
        "UmspannungZurMittelspannung": 4,
        "Mittelspannung": 5,
        "UmspannungZurNiederspannung": 6,
        "Niederspannung": 7,
    }

    # import locations
    locations = pd.read_csv(
        WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_location"],
        index_col=None,
    )

    # import grid districts
    mv_grid_districts = db.select_geodataframe(
        f"""
        SELECT * FROM {cfg['sources']['egon_mv_grid_district']}
        """,
        epsg=4326,
    )

    # get boundary (identical for all technologies, so determined only once)
    boundary = federal_state_data(geocoding_gdf.crs).dissolve().at[0, "geom"]

    # import units
    technologies = ["pv", "wind", "biomass", "hydro"]
    for tech in technologies:
        # read units
        logger.info(f"===== Importing MaStR dataset: {tech} =====")
        logger.debug("Reading CSV and filtering data...")
        # Columns keep their MaStR names here; they are renamed to the DB
        # column names further below, once all processing is done.
        units = pd.read_csv(
            source_files[tech],
            usecols=(
                ["LokationMastrNummer", "Laengengrad", "Breitengrad", "Land"]
                + list(cols_mapping["all"].keys())
                + list(cols_mapping[tech].keys())
            ),
            index_col=None,
            dtype={"Postleitzahl": str},
        )

        # drop units outside of Germany
        len_old = len(units)
        units = units.loc[units.Land == "Deutschland"]
        logger.debug(
            f"{len_old - len(units)} units outside of Germany dropped..."
        )

        # filter for SH units if in testmode
        if not TESTMODE_OFF:
            logger.info(
                "TESTMODE: Dropping all units outside of Schleswig-Holstein..."
            )
            units = units.loc[units.Bundesland == "SchleswigHolstein"]

        # merge and rename voltage level
        logger.debug("Merging with locations and allocate voltage level...")
        units = units.merge(
            locations[["MaStRNummer", "Spannungsebene"]],
            left_on="LokationMastrNummer",
            right_on="MaStRNummer",
            how="left",
        )
        # convert voltage levels to numbers
        units["voltage_level"] = units.Spannungsebene.replace(vlevel_mapping)
        # set voltage level for nan values
        units = infer_voltage_level(units)

        # add geometry
        logger.debug("Adding geometries...")
        units = gpd.GeoDataFrame(
            units,
            geometry=gpd.points_from_xy(
                units["Laengengrad"], units["Breitengrad"], crs=4326
            ),
            crs=4326,
        )

        # A unit needs a geocoded location if either coordinate is missing
        # (longitude AND latitude have to be checked) ...
        units["geometry_geocoded"] = (
            units.Laengengrad.isna() | units.Breitengrad.isna()
        )

        # ... or if the point built from the coordinates is invalid
        units.loc[~units.geometry_geocoded, "geometry_geocoded"] = ~units.loc[
            ~units.geometry_geocoded, "geometry"
        ].is_valid

        units_wo_geom = units["geometry_geocoded"].sum()

        logger.debug(
            f"{units_wo_geom}/{len(units)} units do not have a geometry!"
            " Adding geocoding results."
        )

        # determine zip and municipality string
        mask = (
            units.Postleitzahl.apply(isfloat)
            & ~units.Postleitzahl.isna()
            & ~units.Gemeinde.isna()
        )
        units["zip_and_municipality"] = np.nan
        ok_units = units.loc[mask]

        units.loc[mask, "zip_and_municipality"] = (
            ok_units.Postleitzahl.astype(int).astype(str).str.zfill(5)
            + " "
            + ok_units.Gemeinde.astype(str).str.rstrip().str.lstrip()
            + ", Deutschland"
        )

        # get zip and municipality from Standort
        # (explicit copy: columns are added to this subset below)
        parse_df = units.loc[~mask].copy()

        if not parse_df.empty and "Standort" in parse_df.columns:
            init_len = len(parse_df)

            logger.info(
                f"Parsing ZIP code and municipality from Standort for "
                f"{init_len} values for {tech}."
            )

            parse_df[["zip_and_municipality", "drop_this"]] = (
                parse_df.Standort.astype(str)
                .apply(zip_and_municipality_from_standort)
                .tolist()
            )

            # keep only the rows for which a zip code could be identified
            parse_df = parse_df.loc[parse_df.drop_this]

            if not parse_df.empty:
                units.loc[
                    parse_df.index, "zip_and_municipality"
                ] = parse_df.zip_and_municipality

        # add geocoding to missing
        units = units.merge(
            right=geocoding_gdf[["zip_and_municipality", "geometry"]].rename(
                columns={"geometry": "temp"}
            ),
            how="left",
            on="zip_and_municipality",
        )

        units.loc[units.geometry_geocoded, "geometry"] = units.loc[
            units.geometry_geocoded, "temp"
        ]

        init_len = len(units)

        logger.info(
            "Dropping units outside boundary by geometry or without geometry"
            "..."
        )

        units.dropna(subset=["geometry"], inplace=True)

        units = units.loc[units.geometry.within(boundary)]

        n_dropped = init_len - len(units)
        # avoid a ZeroDivisionError if there were no units to begin with
        share_dropped = (n_dropped / init_len) * 100 if init_len else 0.0

        logger.debug(
            f"{n_dropped}/{init_len} "
            f"({share_dropped: g} %) dropped."
        )

        # drop unnecessary and rename columns
        logger.debug("Reformatting...")
        units.drop(
            columns=[
                "LokationMastrNummer",
                "MaStRNummer",
                "Laengengrad",
                "Breitengrad",
                "Spannungsebene",
                "Land",
                "temp",
            ],
            inplace=True,
        )
        mapping = cols_mapping["all"].copy()
        mapping.update(cols_mapping[tech])
        mapping.update({"geometry": "geom"})
        units.rename(columns=mapping, inplace=True)
        # units for which no voltage level could be determined get -1
        units["voltage_level"] = units.voltage_level.fillna(-1).astype(int)

        units.set_geometry("geom", inplace=True)
        units["id"] = range(0, len(units))

        # change capacity unit: kW to MW
        units["capacity"] = units["capacity"] / 1e3
        if "capacity_inverter" in units.columns:
            units["capacity_inverter"] = units["capacity_inverter"] / 1e3
        if "th_capacity" in units.columns:
            units["th_capacity"] = units["th_capacity"] / 1e3

        # assign bus ids
        logger.debug("Assigning bus ids...")
        units = units.assign(
            bus_id=units.loc[~units.geom.x.isna()]
            .sjoin(mv_grid_districts[["bus_id", "geom"]], how="left")
            .drop(columns=["index_right"])
            .bus_id
        )
        # units not located within any grid district get -1
        units["bus_id"] = units.bus_id.fillna(-1).astype(int)

        # write to DB
        logger.info(f"Writing {len(units)} units to DB...")
        target_tables[tech].__table__.drop(bind=engine, checkfirst=True)
        target_tables[tech].__table__.create(bind=engine, checkfirst=True)

        units.to_postgis(
            name=target_tables[tech].__tablename__,
            con=engine,
            if_exists="append",
            schema=target_tables[tech].__table_args__["schema"],
        )
626