Passed
Pull Request — dev (#1375)
by
unknown
02:18
created

HomeBatteriesAggregation.evaluate_df()   C

Complexity

Conditions 7

Size

Total Lines 113
Code Lines 71

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 71
dl 0
loc 113
rs 6.549
c 0
b 0
f 0
cc 7
nop 3

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
"""
2
Sanity check validation rules for home batteries
3
4
Validates that home battery capacities are correctly aggregated from building-level
5
to bus-level in the storages table.
6
"""
7
8
import numpy as np
9
import pandas as pd
10
from egon_validation.rules.base import DataFrameRule, RuleResult, Severity
11
12
from egon.data import config, db
13
from egon.data.datasets.storages.home_batteries import get_cbat_pbat_ratio
14
15
16
class HomeBatteriesAggregation(DataFrameRule):
17
    """
18
    Validate home battery capacity aggregation from buildings to buses.
19
20
    This rule checks that the sum of home battery capacities allocated to
21
    buildings matches the aggregated capacity per bus in the storage table.
22
23
    The check compares:
24
    1. p_nom (power rating in MW) per bus
25
    2. capacity (energy capacity in MWh) per bus
26
27
    Both values are rounded to 6 decimal places for comparison.
28
    """
29
30
    def __init__(self, table: str, rule_id: str, scenario: str = "eGon2035", **kwargs):
31
        super().__init__(rule_id=rule_id, table=table, scenario=scenario, **kwargs)
32
        self.kind = "sanity"
33
        self.scenario = scenario
34
35
    def evaluate(self, engine, ctx) -> RuleResult:
36
        """Override evaluate to catch errors from get_cbat_pbat_ratio()."""
37
        try:
38
            return super().evaluate(engine, ctx)
39
        except IndexError as e:
40
            # get_cbat_pbat_ratio() failed because no home_battery data exists
41
            if "index 0 is out of bounds" in str(e):
42
                return RuleResult(
43
                    rule_id=self.rule_id,
44
                    task=self.task,
45
                    table=self.table,
46
                    kind=self.kind,
47
                    success=False,
48
                    message=f"⚠️ NO DATA FOUND: No home_battery carrier found in etrago_storage table for scenario {self.scenario}",
49
                    severity=Severity.WARNING,
50
                    schema=self.schema,
51
                    table_name=self.table_name,
52
                    rule_class=self.__class__.__name__
53
                )
54
            raise
55
56
    def get_query(self, ctx):
57
        """
58
        Query to compare storage and building-level home battery data.
59
60
        Returns a joined query that compares aggregated building-level data
61
        with the storage table data per bus.
62
        """
63
        # Get table names from config
64
        sources = config.datasets()["home_batteries"]["sources"]
65
        targets = config.datasets()["home_batteries"]["targets"]
66
67
        # Get cbat_pbat_ratio for capacity calculation (same as original sanity check)
68
        cbat_pbat_ratio = get_cbat_pbat_ratio()
69
70
        return f"""
71
        WITH storage_data AS (
72
            SELECT
73
                bus_id,
74
                el_capacity as storage_p_nom,
75
                el_capacity * {cbat_pbat_ratio} as storage_capacity
76
            FROM {sources["storage"]["schema"]}.{sources["storage"]["table"]}
77
            WHERE carrier = 'home_battery'
78
            AND scenario = '{self.scenario}'
79
        ),
80
        building_data AS (
81
            SELECT
82
                bus_id,
83
                SUM(p_nom) as building_p_nom,
84
                SUM(capacity) as building_capacity
85
            FROM {targets["home_batteries"]["schema"]}.{targets["home_batteries"]["table"]}
86
            WHERE scenario = '{self.scenario}'
87
            GROUP BY bus_id
88
        )
89
        SELECT
90
            COALESCE(s.bus_id, b.bus_id) as bus_id,
91
            ROUND(s.storage_p_nom::numeric, 6) as storage_p_nom,
92
            ROUND(s.storage_capacity::numeric, 6) as storage_capacity,
93
            ROUND(b.building_p_nom::numeric, 6) as building_p_nom,
94
            ROUND(b.building_capacity::numeric, 6) as building_capacity
95
        FROM storage_data s
96
        FULL OUTER JOIN building_data b ON s.bus_id = b.bus_id
97
        ORDER BY bus_id
98
        """
99
100
    def evaluate_df(self, df, ctx):
101
        """
102
        Evaluate the comparison between storage and building data.
103
104
        Parameters
105
        ----------
106
        df : pd.DataFrame
107
            DataFrame with storage and building data per bus
108
        ctx : dict
109
            Context information
110
111
        Returns
112
        -------
113
        RuleResult
114
            Validation result with success/failure status
115
        """
116
        if df.empty:
117
            return RuleResult(
118
                rule_id=self.rule_id,
119
                task=self.task,
120
                table=self.table,
121
                kind=self.kind,
122
                success=False,
123
                message=f"No home battery data found for scenario {self.scenario}",
124
                severity=Severity.WARNING,
125
                schema=self.schema,
126
                table_name=self.table_name,
127
                rule_class=self.__class__.__name__
128
            )
129
130
        # Check for buses that exist in only one source
131
        missing_in_storage = df[df["storage_p_nom"].isna()]
132
        missing_in_buildings = df[df["building_p_nom"].isna()]
133
134
        if not missing_in_storage.empty or not missing_in_buildings.empty:
135
            violations = []
136
            if not missing_in_storage.empty:
137
                violations.append(
138
                    f"{len(missing_in_storage)} bus(es) in buildings but not in storage: "
139
                    f"{missing_in_storage['bus_id'].tolist()[:5]}"
140
                )
141
            if not missing_in_buildings.empty:
142
                violations.append(
143
                    f"{len(missing_in_buildings)} bus(es) in storage but not in buildings: "
144
                    f"{missing_in_buildings['bus_id'].tolist()[:5]}"
145
                )
146
147
            return RuleResult(
148
                rule_id=self.rule_id,
149
                task=self.task,
150
                table=self.table,
151
                kind=self.kind,
152
                success=False,
153
                observed=len(missing_in_storage) + len(missing_in_buildings),
154
                expected=0,
155
                message=f"Bus mismatch between tables: {'; '.join(violations)}",
156
                severity=Severity.ERROR,
157
                schema=self.schema,
158
                table_name=self.table_name,
159
                rule_class=self.__class__.__name__
160
            )
161
162
        # Check if p_nom values match
163
        p_nom_mismatch = df[df["storage_p_nom"] != df["building_p_nom"]]
164
165
        # Check if capacity values match
166
        capacity_mismatch = df[df["storage_capacity"] != df["building_capacity"]]
167
168
        # Combine mismatches
169
        mismatches = pd.concat([p_nom_mismatch, capacity_mismatch]).drop_duplicates(subset=["bus_id"])
170
171
        if not mismatches.empty:
172
            # Calculate maximum differences
173
            max_p_nom_diff = (df["storage_p_nom"] - df["building_p_nom"]).abs().max()
174
            max_capacity_diff = (df["storage_capacity"] - df["building_capacity"]).abs().max()
175
176
            # Get all violations
177
            all_violations = mismatches[
178
                ["bus_id", "storage_p_nom", "building_p_nom", "storage_capacity", "building_capacity"]
179
            ].to_dict(orient="records")
180
181
            return RuleResult(
182
                rule_id=self.rule_id,
183
                task=self.task,
184
                table=self.table,
185
                kind=self.kind,
186
                success=False,
187
                observed=float(max(max_p_nom_diff, max_capacity_diff)),
188
                expected=0.0,
189
                message=(
190
                    f"Home battery aggregation mismatch for {len(mismatches)} bus(es): "
191
                    f"max p_nom diff={max_p_nom_diff:.6f}, max capacity diff={max_capacity_diff:.6f}. "
192
                    f"violations: {all_violations}"
193
                ),
194
                severity=Severity.ERROR,
195
                schema=self.schema,
196
                table_name=self.table_name,
197
                rule_class=self.__class__.__name__
198
            )
199
200
        # All checks passed
201
        return RuleResult(
202
            rule_id=self.rule_id,
203
            task=self.task,
204
            table=self.table,
205
            kind=self.kind,
206
            success=True,
207
            observed=0.0,
208
            expected=0.0,
209
            message=f"Home battery capacities correctly aggregated for all {len(df)} buses in scenario {self.scenario}",
210
            schema=self.schema,
211
            table_name=self.table_name,
212
            rule_class=self.__class__.__name__
213
        )