Passed
Pull Request — dev (#1375)
by
unknown
02:18
created

ElectricalLoadSectorBreakdown.get_query()   A

Complexity

Conditions 1

Size

Total Lines 7
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 7
rs 10
c 0
b 0
f 0
cc 1
nop 2
"""
Sanity check validation rules for electrical load sector breakdown.

Validates that electrical loads are correctly disaggregated into sectors
(residential, commercial, industrial) and that each sector matches expected values.
"""
7
8
from egon_validation.rules.base import DataFrameRule, RuleResult, Severity
9
from egon.data import config, db
10
import pandas as pd
11
12
13
class ElectricalLoadSectorBreakdown(DataFrameRule):
    """
    Validate electrical load breakdown by sector (residential, commercial, industrial).

    The rule compares the load of each sector, and the total AC load for
    Germany, against the expected values:

    - Residential: 90.4 TWh (from household_curves)
    - Commercial: 146.7 TWh (from cts_curves)
    - Industrial: 382.9 TWh (from osm_curves + sites_curves)
    - Total: 620.0 TWh (from etrago AC loads)

    Matches the original electrical_load_100RE() function from sanity_checks.py.
    """

    # Expected values (from original sanity_checks.py lines 2689-2694)
    # References:
    # https://github.com/openego/powerd-data/blob/56b8215928a8dc4fe953d266c563ce0ed98e93f9/src/egon/data/datasets/demandregio/__init__.py#L480
    # https://github.com/openego/powerd-data/blob/56b8215928a8dc4fe953d266c563ce0ed98e93f9/src/egon/data/datasets/demandregio/__init__.py#L775
    EXPECTED_TWH = {
        "residential": 90.4,
        "commercial": 146.7,
        "industrial": 382.9,
        "total": 620.0,
    }

    # Row order of the comparison table and of the generated messages.
    SECTORS = ("residential", "commercial", "industrial", "total")

    # Divisor applied to the summed p_set values to obtain TWh
    # (same factor as used in get_query()).
    _TWH_DIVISOR = 1000000

    def __init__(self, table: str, rule_id: str, scenario: str = "eGon100RE",
                 rtol: float = 0.01, **kwargs):
        """
        Parameters
        ----------
        table : str
            Target table (grid.egon_etrago_load)
        rule_id : str
            Unique identifier for this validation rule
        scenario : str
            Scenario name (default: "eGon100RE")
        rtol : float
            Relative tolerance for load deviation (default: 0.01 = 1%)
        **kwargs
            Forwarded unchanged to the base class constructor.
        """
        super().__init__(rule_id=rule_id, table=table, scenario=scenario,
                         rtol=rtol, **kwargs)
        self.kind = "sanity"
        self.scenario = scenario
        self.rtol = rtol

    def get_query(self, ctx):
        """
        Query to get total AC electrical load for Germany.

        Returns total load in TWh from etrago tables.

        NOTE(review): the scenario name is interpolated directly into the
        SQL text; it is assumed to come from trusted rule configuration.
        """
        return f"""
        SELECT SUM((SELECT SUM(p) FROM UNNEST(b.p_set) p))/1000000::numeric as load_twh
        FROM grid.egon_etrago_load a
        JOIN grid.egon_etrago_load_timeseries b
            ON (a.load_id = b.load_id)
        JOIN grid.egon_etrago_bus c
            ON (a.bus = c.bus_id)
        WHERE a.scn_name = '{self.scenario}'
            AND b.scn_name = '{self.scenario}'
            AND c.scn_name = '{self.scenario}'
            AND a.carrier = 'AC'
            AND c.country = 'DE'
        """

    def _curve_sum_twh(self, source, bus_column):
        """
        Sum all p_set values of one curve table for the rule's scenario.

        Parameters
        ----------
        source : dict
            Entry of the "etrago_electricity" sources config providing the
            "schema" and "table" keys.
        bus_column : str
            SQL select expression for the bus column of that table.

        Returns
        -------
        float
            Summed p_set values divided by 1000000 (TWh); 0.0 when the
            query returns no rows.
        """
        curves = db.select_dataframe(
            f"""SELECT {bus_column}, p_set FROM
                {source['schema']}.
                {source['table']}
                WHERE scn_name = '{self.scenario}'""",
            warning=False
        )
        # An empty result has nothing to sum. Returning an explicit scalar
        # keeps the later comparison well defined, and the missing load then
        # shows up as a deviation from the expected value.
        if curves.empty:
            return 0.0
        # Each p_set cell holds a sequence of values: sum every sequence,
        # then add up the per-row sums.
        return float(curves["p_set"].map(sum).sum()) / self._TWH_DIVISOR

    def _get_sector_loads(self):
        """
        Get electrical loads by sector from source tables.

        Returns
        -------
        dict
            Dictionary with sector loads in TWh:
            - residential: TWh from household_curves
            - commercial: TWh from cts_curves
            - industrial: TWh from osm_curves + sites_curves
        """
        sources = config.datasets()["etrago_electricity"]["sources"]

        # Commercial load from CTS curves
        commercial_twh = self._curve_sum_twh(
            sources["cts_curves"], "bus_id AS bus"
        )

        # Industrial load from OSM landuse areas and from industrial sites
        industrial_osm_twh = self._curve_sum_twh(sources["osm_curves"], "bus")
        industrial_sites_twh = self._curve_sum_twh(
            sources["sites_curves"], "bus"
        )

        # Residential load from household curves
        residential_twh = self._curve_sum_twh(
            sources["household_curves"], "bus_id AS bus"
        )

        return {
            "residential": residential_twh,
            "commercial": commercial_twh,
            "industrial": industrial_osm_twh + industrial_sites_twh,
        }

    def _sector_rule_result(self, success, message, **extra):
        """
        Build a RuleResult carrying this rule's identifying fields.

        Parameters
        ----------
        success : bool
            Outcome of the validation.
        message : str
            Human-readable result message.
        **extra
            Additional RuleResult fields (e.g. severity, observed, expected).
        """
        return RuleResult(
            rule_id=self.rule_id,
            task=self.task,
            table=self.table,
            kind=self.kind,
            success=success,
            message=message,
            schema=self.schema,
            table_name=self.table_name,
            rule_class=self.__class__.__name__,
            **extra
        )

    @staticmethod
    def _format_sector_row(row):
        """Render one comparison row (sector, observed, expected, diff_pct)."""
        return (
            f"{row['sector']}: {row['observed']:.2f} TWh "
            f"(expected {row['expected']:.2f} TWh, "
            f"deviation {row['diff_pct']:+.2f}%)"
        )

    def evaluate_df(self, df, ctx):
        """
        Evaluate electrical load sector breakdown.

        Parameters
        ----------
        df : pd.DataFrame
            DataFrame with total load_twh column
        ctx : dict
            Context information (not used by this rule)

        Returns
        -------
        RuleResult
            Validation result with success/failure status. The rule fails
            when no total load is available, when the sector tables cannot
            be read, or when any sector is not strictly within the relative
            tolerance.
        """
        if df.empty or df["load_twh"].isna().all():
            return self._sector_rule_result(
                False,
                f"No electrical load data found for scenario {self.scenario}",
                severity=Severity.ERROR,
            )

        # Get total AC load
        total_load_twh = float(df["load_twh"].iloc[0])

        # Get sector loads; any read failure becomes a failed result rather
        # than an exception escaping the rule.
        try:
            sector_loads = self._get_sector_loads()
        except Exception as e:
            return self._sector_rule_result(
                False,
                f"Error reading sector load data: {str(e)}",
                severity=Severity.ERROR,
            )

        observed_twh = {
            "residential": sector_loads["residential"],
            "commercial": sector_loads["commercial"],
            "industrial": sector_loads["industrial"],
            "total": total_load_twh,
        }

        # Build load summary dataframe
        load_summary = pd.DataFrame({
            "sector": list(self.SECTORS),
            "expected": [self.EXPECTED_TWH[name] for name in self.SECTORS],
            "observed": [observed_twh[name] for name in self.SECTORS],
        })

        # As in the original check, the deviation is relative to the
        # observed value.
        load_summary["diff"] = load_summary["observed"] - load_summary["expected"]
        load_summary["diff_pct"] = (
            load_summary["diff"] / load_summary["observed"] * 100
        )

        # A sector passes only when its deviation is strictly below the
        # tolerance (< 1% as in original). Selecting the complement, not
        # ">= tolerance", also flags NaN deviations, which compare False
        # against any threshold and would otherwise pass silently.
        tolerance_pct = self.rtol * 100
        within_tolerance = load_summary["diff_pct"].abs() < tolerance_pct
        violations = load_summary[~within_tolerance]

        if not violations.empty:
            violation_details = [
                self._format_sector_row(row)
                for _, row in violations.iterrows()
            ]
            # Largest deviation among the violating sectors; NaN when every
            # violation is an undefined deviation.
            max_deviation = violations["diff_pct"].abs().max()

            return self._sector_rule_result(
                False,
                (
                    f"Electrical load sector breakdown deviations exceed tolerance for {self.scenario}: "
                    f"{'; '.join(violation_details)}"
                ),
                observed=float(max_deviation),
                expected=tolerance_pct,
                severity=Severity.ERROR,
            )

        # All sectors within tolerance
        sector_summary = "; ".join([
            self._format_sector_row(row)
            for _, row in load_summary.iterrows()
        ])

        return self._sector_rule_result(
            True,
            (
                f"Electrical load sector breakdown valid for {self.scenario}: {sector_summary}"
            ),
            observed=0.0,
            expected=0.0,
        )
276