| @@ 141-211 (lines=71) @@ | ||
| 138 | ) |
|
| 139 | ||
| 140 | ||
class HeatPumps2035(Dataset):
    """Dataset bundling the heat pump tasks for the eGon2035 scenario.

    The task tuple consists of ``create_hp_capacity_table``,
    ``delete_peak_loads_if_existing`` and a set of dynamically generated
    ``PythonOperator`` tasks, one per bulk of MV grid district ids.
    """

    def __init__(self, dependencies):
        def dyn_parallel_tasks():
            """Dynamically generate tasks

            The goal is to speed up tasks by parallelising bulks of mvgds.

            The number of parallel tasks is defined via parameter
            `parallel_tasks` in the dataset config `datasets.yml`
            (section `demand_timeseries_mvgd`); it defaults to 1.

            Returns
            -------
            set of airflow.PythonOperators
                The tasks. Each element wraps
                :func:`determine_hp_cap_peak_load_mvgd_ts_2035` and
                receives its bulk of ids via the keyword argument
                `mvgd_ids`. Fewer than `parallel_tasks` tasks are returned
                if there are fewer mvgds than requested tasks.
            """
            parallel_tasks = config.datasets()["demand_timeseries_mvgd"].get(
                "parallel_tasks", 1
            )

            # ========== Register np datatypes with SQLA ==========
            register_adapter(np.float64, adapt_numpy_float64)
            register_adapter(np.int64, adapt_numpy_int64)
            # =====================================================

            with db.session_scope() as session:
                # Distinct bus ids of all census cells that also appear in
                # the EgonPetaHeat table.
                query = (
                    session.query(
                        MapZensusGridDistricts.bus_id,
                    )
                    .filter(
                        MapZensusGridDistricts.zensus_population_id
                        == EgonPetaHeat.zensus_population_id
                    )
                    .distinct(MapZensusGridDistricts.bus_id)
                )
                mvgd_ids = pd.read_sql(
                    query.statement, query.session.bind, index_col=None
                )

            # Sort so that every bulk covers a contiguous id range; the
            # range's bounds are used in the task id below.
            mvgd_ids = mvgd_ids.sort_values("bus_id").reset_index(drop=True)

            bulks = np.array_split(mvgd_ids["bus_id"].values, parallel_tasks)

            tasks = set()
            for bulk in bulks:
                # np.array_split pads with empty arrays when there are
                # fewer ids than requested sections; min()/max() would
                # raise ValueError on those, so no task is created.
                if bulk.size == 0:
                    continue
                tasks.add(
                    PythonOperator(
                        task_id=(
                            f"determine-hp-capacity-eGon2035_"
                            f"mvgd_{min(bulk)}-{max(bulk)}"
                        ),
                        python_callable=determine_hp_cap_peak_load_mvgd_ts_2035,
                        op_kwargs={
                            "mvgd_ids": bulk,
                        },
                    )
                )
            return tasks

        super().__init__(
            name="HeatPumps2035",
            version="0.0.0",
            dependencies=dependencies,
            tasks=(
                create_hp_capacity_table,
                delete_peak_loads_if_existing,
                # The generated operators are handed over as one set
                # element of the task tuple.
                dyn_parallel_tasks(),
            ),
        )
|
| 214 | ||
| @@ 66-136 (lines=71) @@ | ||
| 63 | hp_capacity = Column(REAL) |
|
| 64 | ||
| 65 | ||
class HeatPumpsPypsaEurSec(Dataset):
    """Dataset bundling the heat pump tasks for the pypsa-eur-sec run.

    The task tuple consists of ``create_peak_load_table``,
    ``create_egon_etrago_timeseries_individual_heating`` and a set of
    dynamically generated ``PythonOperator`` tasks, one per bulk of MV
    grid district ids.
    """

    def __init__(self, dependencies):
        def dyn_parallel_tasks():
            """Dynamically generate tasks

            The goal is to speed up tasks by parallelising bulks of mvgds.

            The number of parallel tasks is defined via parameter
            `parallel_tasks` in the dataset config `datasets.yml`
            (section `demand_timeseries_mvgd`); it defaults to 1.

            Returns
            -------
            set of airflow.PythonOperators
                The tasks. Each element wraps
                :func:`determine_hp_cap_peak_load_mvgd_ts_pypsa_eur_sec`
                and receives its bulk of ids via the keyword argument
                `mvgd_ids`. Fewer than `parallel_tasks` tasks are returned
                if there are fewer mvgds than requested tasks.
            """
            parallel_tasks = config.datasets()["demand_timeseries_mvgd"].get(
                "parallel_tasks", 1
            )

            # ========== Register np datatypes with SQLA ==========
            register_adapter(np.float64, adapt_numpy_float64)
            register_adapter(np.int64, adapt_numpy_int64)
            # =====================================================

            with db.session_scope() as session:
                # Distinct bus ids of all census cells that also appear in
                # the EgonPetaHeat table.
                query = (
                    session.query(
                        MapZensusGridDistricts.bus_id,
                    )
                    .filter(
                        MapZensusGridDistricts.zensus_population_id
                        == EgonPetaHeat.zensus_population_id
                    )
                    .distinct(MapZensusGridDistricts.bus_id)
                )
                mvgd_ids = pd.read_sql(
                    query.statement, query.session.bind, index_col=None
                )

            # Sort so that every bulk covers a contiguous id range; the
            # range's bounds are used in the task id below.
            mvgd_ids = mvgd_ids.sort_values("bus_id").reset_index(drop=True)

            bulks = np.array_split(mvgd_ids["bus_id"].values, parallel_tasks)

            tasks = set()
            for bulk in bulks:
                # np.array_split pads with empty arrays when there are
                # fewer ids than requested sections; min()/max() would
                # raise ValueError on those, so no task is created.
                if bulk.size == 0:
                    continue
                tasks.add(
                    PythonOperator(
                        task_id=(
                            f"determine-hp-capacity-pypsa-eur-sec_"
                            f"mvgd_{min(bulk)}-{max(bulk)}"
                        ),
                        python_callable=determine_hp_cap_peak_load_mvgd_ts_pypsa_eur_sec,
                        op_kwargs={
                            "mvgd_ids": bulk,
                        },
                    )
                )
            return tasks

        super().__init__(
            name="HeatPumpsPypsaEurSec",
            version="0.0.0",
            dependencies=dependencies,
            tasks=(
                create_peak_load_table,
                create_egon_etrago_timeseries_individual_heating,
                # The generated operators are handed over as one set
                # element of the task tuple.
                dyn_parallel_tasks(),
            ),
        )
|
| 139 | ||