Passed
Pull Request — dev (#1375)
by
unknown
05:22
created

ResidentialElectricityHhRefinement.evaluate_df()   B

Complexity

Conditions 2

Size

Total Lines 43
Code Lines 37

Duplication

Lines 43
Ratio 100 %

Importance

Changes 0
Metric Value
eloc 37
dl 43
loc 43
rs 8.9919
c 0
b 0
f 0
cc 2
nop 3
1
"""Residential electricity demand sanity check validation rules."""
2
3
from egon_validation.rules.base import DataFrameRule, RuleResult, Severity
4
import numpy as np
5
6
7 View Code Duplication
class ResidentialElectricityAnnualSum(DataFrameRule):
    """Validate aggregated annual residential electricity demand matches DemandRegio at NUTS-3.

    Aggregates the annual demand of all census cells at NUTS3 to compare
    with initial scaling parameters from DemandRegio.

    Args:
        table: Primary table being validated (demand.egon_demandregio_zensus_electricity)
        rule_id: Unique identifier for this validation rule
        rtol: Relative tolerance for comparison (default: 0.005 = 0.5%)

    Example:
        >>> validation = {
        ...     "data_quality": [
        ...         ResidentialElectricityAnnualSum(
        ...             table="demand.egon_demandregio_zensus_electricity",
        ...             rule_id="SANITY_RESIDENTIAL_ELECTRICITY_ANNUAL_SUM",
        ...             rtol=0.005
        ...         )
        ...     ]
        ... }
    """

    def __init__(self, table: str, rule_id: str, rtol: float = 0.005, **kwargs):
        super().__init__(rule_id=rule_id, table=table, rtol=rtol, **kwargs)
        self.kind = "sanity"  # Override inferred kind

    def get_query(self, ctx):
        """Return the SQL that pairs both demand sums per NUTS-3 region and scenario.

        The result has the columns ``nuts3``, ``scenario``,
        ``demand_regio_sum`` and ``profile_sum``. ``ctx`` is not used.
        """
        # NOTE(review): the DemandRegio subquery groups by ``year`` as well,
        # but ``year`` is neither selected nor part of the join condition.
        # If a scenario ever holds more than one year, each year yields its
        # own row for the same (nuts3, scenario) pair - confirm that is
        # intended.
        return """
        SELECT dr.nuts3, dr.scenario, dr.demand_regio_sum, profiles.profile_sum
        FROM (
            SELECT scenario, SUM(demand) AS profile_sum, vg250_nuts3
            FROM demand.egon_demandregio_zensus_electricity AS egon,
             boundaries.egon_map_zensus_vg250 AS boundaries
            WHERE egon.zensus_population_id = boundaries.zensus_population_id
            AND sector = 'residential'
            GROUP BY vg250_nuts3, scenario
        ) AS profiles
        JOIN (
            SELECT nuts3, scenario, sum(demand) AS demand_regio_sum
            FROM demand.egon_demandregio_hh
            GROUP BY year, scenario, nuts3
        ) AS dr
        ON profiles.vg250_nuts3 = dr.nuts3 AND profiles.scenario = dr.scenario
        """

    def evaluate_df(self, df, ctx):
        """Compare ``profile_sum`` against ``demand_regio_sum`` row by row.

        Args:
            df: Query result holding the columns ``profile_sum`` and
                ``demand_regio_sum``.
            ctx: Unused.

        Returns:
            A ``RuleResult`` whose ``observed`` is the largest relative
            deviation and whose ``expected`` is the tolerance. The result is
            successful when every row is within ``rtol``; otherwise it is a
            failure with ``Severity.ERROR`` and the number of offending rows
            in the message.
        """
        rtol = self.params.get("rtol", 0.005)
        actual = df["profile_sum"]
        desired = df["demand_regio_sum"]

        # A single mask decides the verdict AND the violation count, so the
        # two can never disagree. atol=0 and equal_nan=True reproduce the
        # defaults of np.testing.assert_allclose, which this check is modelled
        # on; np.isclose's own defaults (atol=1e-8, equal_nan=False) would
        # judge rows differently.
        within_tolerance = np.asarray(
            np.isclose(actual, desired, rtol=rtol, atol=0, equal_nan=True),
            dtype=bool,
        )
        n_violations = int(np.count_nonzero(~within_tolerance))

        # Largest relative deviation, for reporting only. For an empty frame
        # this is NaN and the rule passes vacuously.
        max_diff = ((actual - desired) / desired).abs().max()

        # Fields shared by the success and the failure result.
        result_fields = dict(
            rule_id=self.rule_id,
            task=self.task,
            table=self.table,
            kind=self.kind,
            observed=float(max_diff),
            expected=rtol,
            schema=self.schema,
            table_name=self.table_name,
            rule_class=self.__class__.__name__,
        )

        if n_violations == 0:
            return RuleResult(
                success=True,
                message=f"Aggregated annual residential electricity demand matches with DemandRegio at NUTS-3 (max deviation: {max_diff:.4%}, tolerance: {rtol:.4%})",
                **result_fields,
            )

        return RuleResult(
            success=False,
            message=f"Demand mismatch: max deviation {max_diff:.4%} exceeds tolerance {rtol:.4%}. {n_violations} NUTS-3 regions have mismatches.",
            severity=Severity.ERROR,
            **result_fields,
        )
98
99
100 View Code Duplication
class ResidentialElectricityHhRefinement(DataFrameRule):
    """Validate aggregated household types after refinement match original census values.

    Checks sum of aggregated household types after refinement method
    was applied and compares it to the original census values.

    Args:
        table: Primary table being validated (society.egon_destatis_zensus_household_per_ha_refined)
        rule_id: Unique identifier for this validation rule
        rtol: Relative tolerance for comparison (default: 1e-5 = 0.001%)

    Example:
        >>> validation = {
        ...     "data_quality": [
        ...         ResidentialElectricityHhRefinement(
        ...             table="society.egon_destatis_zensus_household_per_ha_refined",
        ...             rule_id="SANITY_RESIDENTIAL_HH_REFINEMENT",
        ...             rtol=1e-5
        ...         )
        ...     ]
        ... }
    """

    def __init__(self, table: str, rule_id: str, rtol: float = 1e-5, **kwargs):
        super().__init__(rule_id=rule_id, table=table, rtol=rtol, **kwargs)
        self.kind = "sanity"  # Override inferred kind

    def get_query(self, ctx):
        """Return the SQL that pairs refined and census household sums.

        The result has the columns ``nuts3``, ``characteristics_code``,
        ``sum_refined`` and ``sum_census`` (both sums cast to int), one row
        per NUTS-3 / characteristics code combination. ``ctx`` is not used.
        """
        return """
        SELECT refined.nuts3, refined.characteristics_code,
                refined.sum_refined::int, census.sum_census::int
        FROM(
            SELECT nuts3, characteristics_code, SUM(hh_10types) as sum_refined
            FROM society.egon_destatis_zensus_household_per_ha_refined
            GROUP BY nuts3, characteristics_code)
            AS refined
        JOIN(
            SELECT t.nuts3, t.characteristics_code, sum(orig) as sum_census
            FROM(
                SELECT nuts3, cell_id, characteristics_code,
                        sum(DISTINCT(hh_5types))as orig
                FROM society.egon_destatis_zensus_household_per_ha_refined
                GROUP BY cell_id, characteristics_code, nuts3) AS t
            GROUP BY t.nuts3, t.characteristics_code    ) AS census
        ON refined.nuts3 = census.nuts3
        AND refined.characteristics_code = census.characteristics_code
        """

    def evaluate_df(self, df, ctx):
        """Compare ``sum_refined`` against ``sum_census`` row by row.

        Args:
            df: Query result holding the columns ``sum_refined`` and
                ``sum_census``.
            ctx: Unused.

        Returns:
            A ``RuleResult`` whose ``observed`` is the largest relative
            deviation and whose ``expected`` is the tolerance. The result is
            successful when every row is within ``rtol``; otherwise it is a
            failure with ``Severity.ERROR`` and the number of offending rows
            in the message.
        """
        rtol = self.params.get("rtol", 1e-5)
        actual = df["sum_refined"]
        desired = df["sum_census"]

        # A single mask decides the verdict AND the violation count, so the
        # two can never disagree. atol=0 and equal_nan=True reproduce the
        # defaults of np.testing.assert_allclose, which this check is modelled
        # on; np.isclose's own defaults (atol=1e-8, equal_nan=False) would
        # judge rows differently.
        within_tolerance = np.asarray(
            np.isclose(actual, desired, rtol=rtol, atol=0, equal_nan=True),
            dtype=bool,
        )
        n_violations = int(np.count_nonzero(~within_tolerance))

        # Largest relative deviation, for reporting only. For an empty frame
        # this is NaN and the rule passes vacuously.
        max_diff = ((actual - desired) / desired).abs().max()

        # Fields shared by the success and the failure result.
        result_fields = dict(
            rule_id=self.rule_id,
            task=self.task,
            table=self.table,
            kind=self.kind,
            observed=float(max_diff),
            expected=rtol,
            schema=self.schema,
            table_name=self.table_name,
            rule_class=self.__class__.__name__,
        )

        if n_violations == 0:
            return RuleResult(
                success=True,
                message=f"All aggregated household types match at NUTS-3 (max deviation: {max_diff:.6%}, tolerance: {rtol:.6%})",
                **result_fields,
            )

        return RuleResult(
            success=False,
            message=f"Household refinement mismatch: max deviation {max_diff:.6%} exceeds tolerance {rtol:.6%}. {n_violations} NUTS-3/characteristic combinations have mismatches.",
            severity=Severity.ERROR,
            **result_fields,
        )
192