Code Duplication    Length = 71-71 lines in 2 locations

src/egon/data/datasets/heat_supply/individual_heating.py 2 locations

@@ 141-211 (lines=71) @@
        )


class HeatPumps2035(Dataset):
    def __init__(self, dependencies):
        def dyn_parallel_tasks():
            """Dynamically generate tasks

            The goal is to speed up tasks by parallelising bulks of mvgds.

            The number of parallel tasks is defined via parameter
            `parallel_tasks` in the dataset config `datasets.yml`.

            Returns
            -------
            set of airflow.PythonOperators
                The tasks. Each element is of
                :func:`egon.data.datasets.heat_supply.individual_heating.
                determine_hp_capacity_eGon2035_pypsa_eur_sec`
            """
            parallel_tasks = config.datasets()["demand_timeseries_mvgd"].get(
                "parallel_tasks", 1
            )
            # ========== Register np datatypes with SQLA ==========
            register_adapter(np.float64, adapt_numpy_float64)
            register_adapter(np.int64, adapt_numpy_int64)
            # =====================================================

            with db.session_scope() as session:
                query = (
                    session.query(
                        MapZensusGridDistricts.bus_id,
                    )
                    .filter(
                        MapZensusGridDistricts.zensus_population_id
                        == EgonPetaHeat.zensus_population_id
                    )
                    .distinct(MapZensusGridDistricts.bus_id)
                )
            mvgd_ids = pd.read_sql(
                query.statement, query.session.bind, index_col=None
            )

            mvgd_ids = mvgd_ids.sort_values("bus_id").reset_index(drop=True)

            mvgd_ids = np.array_split(
                mvgd_ids["bus_id"].values, parallel_tasks
            )

            # mvgd_bunch_size = divmod(MVGD_MIN_COUNT, parallel_tasks)[0]
            tasks = set()
            for i, bulk in enumerate(mvgd_ids):
                tasks.add(
                    PythonOperator(
                        task_id=(
                            f"determine-hp-capacity-eGon2035_"
                            f"mvgd_{min(bulk)}-{max(bulk)}"
                        ),
                        python_callable=determine_hp_cap_peak_load_mvgd_ts_2035,
                        op_kwargs={
                            "mvgd_ids": bulk,
                        },
                    )
                )
            return tasks

        super().__init__(
            name="HeatPumps2035",
            version="0.0.0",
            dependencies=dependencies,
            tasks=(
                create_hp_capacity_table,
                delete_peak_loads_if_existing,
                {*dyn_parallel_tasks()},
            ),
        )

@@ 66-136 (lines=71) @@
    hp_capacity = Column(REAL)


class HeatPumpsPypsaEurSec(Dataset):
    def __init__(self, dependencies):
        def dyn_parallel_tasks():
            """Dynamically generate tasks

            The goal is to speed up tasks by parallelising bulks of mvgds.

            The number of parallel tasks is defined via parameter
            `parallel_tasks` in the dataset config `datasets.yml`.

            Returns
            -------
            set of airflow.PythonOperators
                The tasks. Each element is of
                :func:`egon.data.datasets.heat_supply.individual_heating.
                determine_hp_capacity_eGon2035_pypsa_eur_sec`
            """
            parallel_tasks = config.datasets()["demand_timeseries_mvgd"].get(
                "parallel_tasks", 1
            )
            # ========== Register np datatypes with SQLA ==========
            register_adapter(np.float64, adapt_numpy_float64)
            register_adapter(np.int64, adapt_numpy_int64)
            # =====================================================

            with db.session_scope() as session:
                query = (
                    session.query(
                        MapZensusGridDistricts.bus_id,
                    )
                    .filter(
                        MapZensusGridDistricts.zensus_population_id
                        == EgonPetaHeat.zensus_population_id
                    )
                    .distinct(MapZensusGridDistricts.bus_id)
                )
            mvgd_ids = pd.read_sql(
                query.statement, query.session.bind, index_col=None
            )

            mvgd_ids = mvgd_ids.sort_values("bus_id").reset_index(drop=True)

            mvgd_ids = np.array_split(
                mvgd_ids["bus_id"].values, parallel_tasks
            )

            # mvgd_bunch_size = divmod(MVGD_MIN_COUNT, parallel_tasks)[0]
            tasks = set()
            for i, bulk in enumerate(mvgd_ids):
                tasks.add(
                    PythonOperator(
                        task_id=(
                            f"determine-hp-capacity-pypsa-eur-sec_"
                            f"mvgd_{min(bulk)}-{max(bulk)}"
                        ),
                        python_callable=determine_hp_cap_peak_load_mvgd_ts_pypsa_eur_sec,
                        op_kwargs={
                            "mvgd_ids": bulk,
                        },
                    )
                )
            return tasks

        super().__init__(
            name="HeatPumpsPypsaEurSec",
            version="0.0.0",
            dependencies=dependencies,
            tasks=(
                create_peak_load_table,
                create_egon_etrago_timeseries_individual_heating,
                {*dyn_parallel_tasks()},
            ),
        )

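
The two clones differ only in the task_id prefix, the python_callable, and the Dataset metadata passed to super().__init__(). One possible way to remove the duplication is sketched below: hoist the shared body of dyn_parallel_tasks into a module-level helper that takes the differing parts as arguments. This is a minimal sketch, not part of the current codebase; the helper name generate_parallel_tasks is hypothetical, and it assumes the same module-level imports and callables as individual_heating.py above (config, db, np, pd, register_adapter, the numpy adapters, PythonOperator, MapZensusGridDistricts, EgonPetaHeat).

    # Hypothetical shared helper; factors out everything the two clones have
    # in common and takes only the differing parts as parameters.
    def generate_parallel_tasks(task_id_prefix, python_callable):
        """Split all MV grid district ids into bulks and build one
        PythonOperator per bulk."""
        parallel_tasks = config.datasets()["demand_timeseries_mvgd"].get(
            "parallel_tasks", 1
        )
        # Register np datatypes with SQLA
        register_adapter(np.float64, adapt_numpy_float64)
        register_adapter(np.int64, adapt_numpy_int64)

        with db.session_scope() as session:
            query = (
                session.query(MapZensusGridDistricts.bus_id)
                .filter(
                    MapZensusGridDistricts.zensus_population_id
                    == EgonPetaHeat.zensus_population_id
                )
                .distinct(MapZensusGridDistricts.bus_id)
            )
        mvgd_ids = pd.read_sql(
            query.statement, query.session.bind, index_col=None
        )
        mvgd_ids = mvgd_ids.sort_values("bus_id").reset_index(drop=True)
        bulks = np.array_split(mvgd_ids["bus_id"].values, parallel_tasks)

        # One task per bulk of MV grid districts
        return {
            PythonOperator(
                task_id=f"{task_id_prefix}_mvgd_{min(bulk)}-{max(bulk)}",
                python_callable=python_callable,
                op_kwargs={"mvgd_ids": bulk},
            )
            for bulk in bulks
        }


    class HeatPumps2035(Dataset):
        def __init__(self, dependencies):
            super().__init__(
                name="HeatPumps2035",
                version="0.0.0",
                dependencies=dependencies,
                tasks=(
                    create_hp_capacity_table,
                    delete_peak_loads_if_existing,
                    generate_parallel_tasks(
                        "determine-hp-capacity-eGon2035",
                        determine_hp_cap_peak_load_mvgd_ts_2035,
                    ),
                ),
            )

HeatPumpsPypsaEurSec would call the same helper with its own task_id prefix and determine_hp_cap_peak_load_mvgd_ts_pypsa_eur_sec, so the 71 duplicated lines collapse to two short constructor bodies.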