Passed
Pull Request — dev (#1375)
by
unknown
02:18
created

custom.sanity.gas_grid   A

Complexity

Total Complexity 37

Size/Duplication

Total Lines 808
Duplicated Lines 33.17 %

Importance

Changes 0
Metric Value
wmc 37
eloc 356
dl 268
loc 808
rs 9.44
c 0
b 0
f 0

16 Methods

Rating   Name   Duplication   Size   Complexity  
A GasLinksConnections.__init__() 0 19 1
A GasOnePortConnections.__init__() 0 39 1
B CH4GridCapacity._get_reference_capacity() 0 72 7
A CH4GridCapacity.get_query() 0 8 1
A GasBusesIsolated.__init__() 0 31 1
A GasOnePortConnections.evaluate_df() 60 60 2
A GasBusesCount.__init__() 0 21 1
A GasBusesIsolated.get_query() 0 17 3
A GasLinksConnections.get_query() 0 8 1
B CH4GridCapacity.evaluate_df() 93 93 5
B GasBusesCount.evaluate_df() 0 104 5
A GasBusesCount.get_query() 0 8 1
A CH4GridCapacity.__init__() 0 18 1
A GasOnePortConnections.get_query() 0 31 3
A GasBusesIsolated.evaluate_df() 59 59 2
A GasLinksConnections.evaluate_df() 56 56 2

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
"""
2
Sanity check validation rules for gas grid components.
3
4
Validates gas bus connectivity, counts, and grid consistency.
5
"""
6
7
from pathlib import Path
8
import pandas as pd
9
from egon_validation.rules.base import DataFrameRule, RuleResult, Severity
10
from typing import List, Tuple
11
from egon.data.datasets.scenario_parameters import get_sector_parameters
12
13
14
class GasBusesIsolated(DataFrameRule):
    """
    Sanity rule that flags gas buses without any attached link.

    For one bus carrier (CH4, H2_grid or H2_saltcavern) the rule selects
    German buses whose id appears neither as ``bus0`` nor as ``bus1`` of a
    link with the matching link carrier. Every row returned by the query is
    an isolated bus; the rule passes only when no row is returned.
    """

    def __init__(self, table: str, rule_id: str, scenario: str = "eGon2035",
                 carrier: str = "CH4", **kwargs):
        """
        Parameters
        ----------
        table : str
            Target table (grid.egon_etrago_bus)
        rule_id : str
            Unique identifier for this validation rule
        scenario : str
            Scenario name ("eGon2035" or "eGon100RE")
        carrier : str
            Bus carrier type ("CH4", "H2_grid", or "H2_saltcavern")
        """
        super().__init__(rule_id=rule_id, table=table, scenario=scenario,
                         carrier=carrier, **kwargs)
        self.kind = "sanity"
        self.scenario = scenario
        self.carrier = carrier

        # Per scenario: bus carrier -> carrier of the links that are
        # searched for connections to buses of that carrier.
        self.carrier_mapping = {
            "eGon2035": {
                "CH4": "CH4",
                "H2_grid": "H2_feedin",
                "H2_saltcavern": "power_to_H2",
            },
            "eGon100RE": {
                "CH4": "CH4",
                "H2_grid": "H2_retrofit",
                "H2_saltcavern": "H2_extension",
            },
        }

    def get_query(self, ctx):
        """
        Build the SQL that lists isolated buses.

        When the scenario or the bus carrier has no entry in
        ``carrier_mapping`` a query that yields zero rows is returned
        instead, so the rule then reports no isolated buses.
        """
        # A missing scenario falls back to an empty mapping, so both the
        # "unknown scenario" and "unknown carrier" cases end up falsy here.
        link_carrier = self.carrier_mapping.get(self.scenario, {}).get(self.carrier)
        if not link_carrier:
            return "SELECT NULL as bus_id, NULL as carrier, NULL as country LIMIT 0"

        return f"""
        SELECT bus_id, carrier, country
        FROM grid.egon_etrago_bus
        WHERE scn_name = '{self.scenario}'
        AND carrier = '{self.carrier}'
        AND country = 'DE'
        AND bus_id NOT IN (
            SELECT bus0
            FROM grid.egon_etrago_link
            WHERE scn_name = '{self.scenario}'
            AND carrier = '{link_carrier}'
        )
        AND bus_id NOT IN (
            SELECT bus1
            FROM grid.egon_etrago_link
            WHERE scn_name = '{self.scenario}'
            AND carrier = '{link_carrier}'
        )
        """

    def evaluate_df(self, df, ctx):
        """
        Turn the query result into a RuleResult.

        Parameters
        ----------
        df : pd.DataFrame
            One row per isolated bus (bus_id, carrier, country)
        ctx : dict
            Context information (not used here)

        Returns
        -------
        RuleResult
            Success (INFO) when no rows remain after dropping rows with
            missing values, otherwise failure (ERROR) with a sample of up
            to five offending rows in the message.
        """
        # Rows containing any missing value are discarded before counting.
        df = df.dropna()
        isolated_count = len(df)

        # Fields that are identical for the passing and the failing result.
        shared = {
            "rule_id": self.rule_id,
            "task": self.task,
            "table": self.table,
            "kind": self.kind,
            "expected": 0,
            "schema": self.schema,
            "table_name": self.table_name,
            "rule_class": self.__class__.__name__,
        }

        if isolated_count:
            # Only the first five rows are quoted to keep the message short.
            sample_buses = df.head(5).to_dict(orient="records")
            return RuleResult(
                success=False,
                observed=isolated_count,
                message=(
                    f"Found {isolated_count} isolated {self.carrier} buses for {self.scenario}. "
                    f"Sample (first 5): {sample_buses}"
                ),
                severity=Severity.ERROR,
                **shared,
            )

        return RuleResult(
            success=True,
            observed=0,
            message=(
                f"No isolated {self.carrier} buses found for {self.scenario} "
                f"(all buses connected to grid)"
            ),
            severity=Severity.INFO,
            **shared,
        )
156
157
158
class GasBusesCount(DataFrameRule):
    """
    Validate gas grid bus count against SciGRID_gas data.

    Compares the number of gas grid buses (CH4 or H2_grid) in the database
    against the number of German nodes in the SciGRID_gas node file. The
    rule passes when the relative deviation stays within ``rtol``.
    """

    def __init__(self, table: str, rule_id: str, scenario: str = "eGon2035",
                 carrier: str = "CH4", rtol: float = 0.10, **kwargs):
        """
        Parameters
        ----------
        table : str
            Target table (grid.egon_etrago_bus)
        rule_id : str
            Unique identifier for this validation rule
        scenario : str
            Scenario name ("eGon2035" or "eGon100RE")
        carrier : str
            Bus carrier type ("CH4" or "H2_grid")
        rtol : float
            Relative tolerance for bus count deviation (default: 0.10 = 10%)
        """
        super().__init__(rule_id=rule_id, table=table, scenario=scenario,
                         carrier=carrier, rtol=rtol, **kwargs)
        self.kind = "sanity"
        self.scenario = scenario
        self.carrier = carrier

    def get_query(self, ctx):
        """
        Query to count gas grid buses in Germany.

        Returns a query that counts buses of the specified carrier
        in Germany for the specified scenario.
        """
        return f"""
        SELECT COUNT(*) as bus_count
        FROM grid.egon_etrago_bus
        WHERE scn_name = '{self.scenario}'
        AND country = 'DE'
        AND carrier = '{self.carrier}'
        """

    def evaluate_df(self, df, ctx):
        """
        Evaluate bus count against SciGRID_gas reference data.

        Parameters
        ----------
        df : pd.DataFrame
            DataFrame with bus_count column
        ctx : dict
            Context information (not used here)

        Returns
        -------
        RuleResult
            WARNING failure when the query delivered no count, ERROR
            failure when the reference file cannot be read, contains no
            German node, or the deviation exceeds the tolerance, and INFO
            success otherwise.
        """
        # Fields shared by every result this method can return.
        common = {
            "rule_id": self.rule_id,
            "task": self.task,
            "table": self.table,
            "kind": self.kind,
            "schema": self.schema,
            "table_name": self.table_name,
            "rule_class": self.__class__.__name__,
        }

        if df.empty or df["bus_count"].isna().all():
            return RuleResult(
                success=False,
                message=f"No {self.carrier} buses found for scenario {self.scenario}",
                severity=Severity.WARNING,
                **common,
            )

        observed_count = int(df["bus_count"].values[0])

        # Get expected count from SciGRID_gas data. The path is relative to
        # the current working directory.
        try:
            target_file = Path(".") / "datasets" / "gas_data" / "data" / "IGGIELGN_Nodes.csv"
            grid_buses_df = pd.read_csv(
                target_file,
                delimiter=";",
                decimal=".",
                usecols=["country_code"],
            )
            # na=False: nodes without a country code count as non-German
            # instead of producing a NaN mask that cannot be used to index.
            is_german = grid_buses_df["country_code"].str.match("DE", na=False)
            expected_count = len(grid_buses_df[is_german].index)
        except Exception as e:
            # Any read/parse problem is reported as a failed check rather
            # than aborting the validation run.
            return RuleResult(
                success=False,
                message=f"Error reading SciGRID_gas reference data: {str(e)}",
                severity=Severity.ERROR,
                **common,
            )

        # A relative deviation is undefined for an empty reference; report
        # that explicitly instead of raising ZeroDivisionError below.
        if expected_count == 0:
            return RuleResult(
                success=False,
                observed=float(observed_count),
                expected=0.0,
                message=(
                    f"SciGRID_gas reference data contains no German nodes; "
                    f"cannot compute relative deviation of {self.carrier} "
                    f"bus count for {self.scenario}"
                ),
                severity=Severity.ERROR,
                **common,
            )

        # Calculate relative deviation
        rtol = self.params.get("rtol", 0.10)
        deviation = abs(observed_count - expected_count) / expected_count
        deviation_pct = deviation * 100

        if deviation <= rtol:
            return RuleResult(
                success=True,
                observed=float(observed_count),
                expected=float(expected_count),
                message=(
                    f"{self.carrier} bus count valid for {self.scenario}: "
                    f"{observed_count} buses (deviation: {deviation_pct:.2f}%, "
                    f"tolerance: {rtol*100:.2f}%)"
                ),
                severity=Severity.INFO,
                **common,
            )

        return RuleResult(
            success=False,
            observed=float(observed_count),
            expected=float(expected_count),
            message=(
                f"{self.carrier} bus count deviation too large for {self.scenario}: "
                f"{observed_count} vs {expected_count} expected "
                f"(deviation: {deviation_pct:.2f}%, tolerance: {rtol*100:.2f}%)"
            ),
            severity=Severity.ERROR,
            **common,
        )
309
310
311
class GasOnePortConnections(DataFrameRule):
    """
    Sanity rule that flags gas one-port components on invalid buses.

    For one component table (loads, generators or stores) and one component
    carrier, the rule selects the components whose ``bus`` is in none of
    the bus sets described by ``bus_conditions``. Every returned row is a
    disconnected component; the rule passes only when no row is returned.
    """

    def __init__(self, table: str, rule_id: str, scenario: str = "eGon2035",
                 component_type: str = "load", component_carrier: str = "CH4_for_industry",
                 bus_conditions: List[Tuple[str, str]] = None, **kwargs):
        """
        Parameters
        ----------
        table : str
            Target table (grid.egon_etrago_load, grid.egon_etrago_generator,
            or grid.egon_etrago_store)
        rule_id : str
            Unique identifier for this validation rule
        scenario : str
            Scenario name ("eGon2035" or "eGon100RE")
        component_type : str
            Type of component ("load", "generator", or "store")
        component_carrier : str
            Carrier of the component to check
        bus_conditions : List[Tuple[str, str]]
            List of (bus_carrier, country_condition) tuples that define valid
            buses; the country condition is inserted verbatim after
            ``country`` in the SQL. Examples:
            - [("CH4", "= 'DE'")] - CH4 buses in Germany
            - [("CH4", "!= 'DE'")] - CH4 buses outside Germany
            - [("H2_grid", "= 'DE'"), ("AC", "!= 'DE'")] - H2_grid in DE OR AC abroad
        """
        super().__init__(rule_id=rule_id, table=table, scenario=scenario,
                         component_type=component_type,
                         component_carrier=component_carrier,
                         bus_conditions=bus_conditions or [], **kwargs)
        self.kind = "sanity"
        self.scenario = scenario
        self.component_type = component_type
        self.component_carrier = component_carrier
        self.bus_conditions = bus_conditions or []

        # Name of the primary id column for each supported component type.
        self.id_column_map = {
            "load": "load_id",
            "generator": "generator_id",
            "store": "store_id",
        }

    def get_query(self, ctx):
        """
        Build the SQL that lists components attached to no valid bus.

        Without any bus condition a query that yields zero rows is
        returned, so the rule then reports no disconnected components.
        """
        if not self.bus_conditions:
            # Nothing to compare against: the check is effectively skipped.
            return "SELECT NULL as component_id, NULL as bus, NULL as carrier LIMIT 0"

        # Unknown component types fall back to a column called "id".
        id_column = self.id_column_map.get(self.component_type, "id")

        # One "bus NOT IN (...)" clause per accepted bus set; a component
        # is reported only when it is outside every one of them.
        not_in_clauses = []
        for bus_carrier, country_cond in self.bus_conditions:
            subquery = f"""
                (SELECT bus_id
                FROM grid.egon_etrago_bus
                WHERE scn_name = '{self.scenario}'
                AND carrier = '{bus_carrier}'
                AND country {country_cond})
            """
            not_in_clauses.append(f"bus NOT IN {subquery}")
        combined_condition = " AND ".join(not_in_clauses)

        return f"""
        SELECT {id_column} as component_id, bus, carrier, scn_name
        FROM {self.table}
        WHERE scn_name = '{self.scenario}'
        AND carrier = '{self.component_carrier}'
        AND {combined_condition}
        """

    def evaluate_df(self, df, ctx):
        """
        Turn the query result into a RuleResult.

        Parameters
        ----------
        df : pd.DataFrame
            One row per disconnected component (component_id, bus, carrier)
        ctx : dict
            Context information (not used here)

        Returns
        -------
        RuleResult
            Success (INFO) when no rows remain after dropping rows with
            missing values, otherwise failure (ERROR) with a sample of up
            to five offending rows in the message.
        """
        # Rows containing any missing value are discarded before counting.
        df = df.dropna()
        disconnected_count = len(df)

        # Fields that are identical for the passing and the failing result.
        shared = {
            "rule_id": self.rule_id,
            "task": self.task,
            "table": self.table,
            "kind": self.kind,
            "expected": 0,
            "schema": self.schema,
            "table_name": self.table_name,
            "rule_class": self.__class__.__name__,
        }

        if disconnected_count:
            # Only the first five rows are quoted to keep the message short.
            sample_components = df.head(5).to_dict(orient="records")
            return RuleResult(
                success=False,
                observed=disconnected_count,
                message=(
                    f"Found {disconnected_count} disconnected {self.component_carrier} "
                    f"{self.component_type}s for {self.scenario}. "
                    f"Sample (first 5): {sample_components}"
                ),
                severity=Severity.ERROR,
                **shared,
            )

        return RuleResult(
            success=True,
            observed=0,
            message=(
                f"All {self.component_carrier} {self.component_type}s connected "
                f"to valid buses for {self.scenario}"
            ),
            severity=Severity.INFO,
            **shared,
        )
462
463
464
class CH4GridCapacity(DataFrameRule):
    """
    Validate CH4 grid capacity against SciGRID_gas reference data.

    Compares the summed ``p_nom`` of CH4 links whose two buses are German
    CH4 buses against a reference capacity derived from the SciGRID_gas
    pipeline segment file and the pipeline classification table. For
    eGon100RE the reference is reduced by the share of CH4 pipelines
    retrofitted to H2, taken from the gas sector parameters.
    """

    def __init__(self, table: str, rule_id: str, scenario: str = "eGon2035",
                 rtol: float = 0.10, **kwargs):
        """
        Parameters
        ----------
        table : str
            Target table (grid.egon_etrago_link)
        rule_id : str
            Unique identifier for this validation rule
        scenario : str
            Scenario name ("eGon2035" or "eGon100RE")
        rtol : float
            Relative tolerance for capacity deviation (default: 0.10 = 10%)
        """
        super().__init__(rule_id=rule_id, table=table, scenario=scenario,
                         rtol=rtol, **kwargs)
        self.kind = "sanity"
        self.scenario = scenario

    def get_query(self, ctx):
        """
        Query to get total CH4 pipeline capacity in Germany.

        Returns a query that sums the p_nom of all CH4 links where both
        bus0 and bus1 are German CH4 buses of the same scenario.
        """
        return f"""
        SELECT SUM(p_nom::numeric) as total_p_nom
        FROM grid.egon_etrago_link
        WHERE scn_name = '{self.scenario}'
        AND carrier = 'CH4'
        AND bus0 IN (
            SELECT bus_id
            FROM grid.egon_etrago_bus
            WHERE scn_name = '{self.scenario}'
            AND country = 'DE'
            AND carrier = 'CH4'
        )
        AND bus1 IN (
            SELECT bus_id
            FROM grid.egon_etrago_bus
            WHERE scn_name = '{self.scenario}'
            AND country = 'DE'
            AND carrier = 'CH4'
        )
        """

    def _get_reference_capacity(self):
        """
        Calculate reference capacity from SciGRID_gas pipeline data.

        Returns
        -------
        float
            Expected total pipeline capacity for the scenario

        Raises
        ------
        ValueError
            If anything goes wrong while reading or processing the
            reference files; the original exception is attached as cause.
        """
        try:
            # Read pipeline segments from SciGRID_gas (path is relative to
            # the current working directory).
            target_file = (
                Path(".")
                / "datasets"
                / "gas_data"
                / "data"
                / "IGGIELGN_PipeSegments.csv"
            )
            pipelines = pd.read_csv(
                target_file,
                delimiter=";",
                decimal=".",
                usecols=["country_code", "param"],
            )

            # NOTE(review): assumes country_code is a plain "XX,YY" pair. If
            # the CSV stores a bracketed/quoted list, the comparison with
            # "DE" below never matches — confirm against the data file.
            country_0 = pipelines["country_code"].apply(lambda x: x.split(",")[0])
            country_1 = pipelines["country_code"].apply(lambda x: x.split(",")[1])
            in_germany = (country_0 == "DE") & (country_1 == "DE")

            # Read pipeline classification for capacity mapping
            classification_file = (
                Path(".")
                / "data_bundle_egon_data"
                / "pipeline_classification_gas"
                / "pipeline_classification.csv"
            )
            classification = pd.read_csv(
                classification_file,
                delimiter=",",
                usecols=["classification", "max_transport_capacity_Gwh/d"],
            )
            param_to_capacity = dict(
                zip(classification["classification"],
                    classification["max_transport_capacity_Gwh/d"])
            )

            # Map on a .loc selection instead of assigning a new column to
            # a filtered frame, which avoids pandas' chained-assignment
            # (SettingWithCopy) hazard. Params without a classification
            # entry become NaN and are skipped by sum().
            p_nom = pipelines.loc[in_germany, "param"].map(param_to_capacity)
            total_p_nom = p_nom.sum()

            # Adjust for eGon100RE (H2 retrofit share)
            if self.scenario == "eGon100RE":
                scn_params = get_sector_parameters("gas", "eGon100RE")
                h2_retrofit_share = scn_params["retrofitted_CH4pipeline-to-H2pipeline_share"]
                total_p_nom = total_p_nom * (1 - h2_retrofit_share)

            return float(total_p_nom)

        except Exception as e:
            # Keep the original exception as __cause__ for debugging.
            raise ValueError(f"Error reading SciGRID_gas reference data: {str(e)}") from e

    def evaluate_df(self, df, ctx):
        """
        Evaluate CH4 grid capacity against reference data.

        Parameters
        ----------
        df : pd.DataFrame
            DataFrame with total_p_nom column
        ctx : dict
            Context information (not used here)

        Returns
        -------
        RuleResult
            WARNING failure when the query delivered no capacity, ERROR
            failure when the reference cannot be computed, is not positive,
            or the deviation exceeds the tolerance, and INFO success
            otherwise.
        """
        # Fields shared by every result this method can return.
        common = {
            "rule_id": self.rule_id,
            "task": self.task,
            "table": self.table,
            "kind": self.kind,
            "schema": self.schema,
            "table_name": self.table_name,
            "rule_class": self.__class__.__name__,
        }

        if df.empty or df["total_p_nom"].isna().all():
            return RuleResult(
                success=False,
                message=f"No CH4 links found for scenario {self.scenario}",
                severity=Severity.WARNING,
                **common,
            )

        observed_capacity = float(df["total_p_nom"].values[0])

        # Get expected capacity from SciGRID_gas data
        try:
            expected_capacity = self._get_reference_capacity()
        except Exception as e:
            return RuleResult(
                success=False,
                message=str(e),
                severity=Severity.ERROR,
                **common,
            )

        # A relative deviation needs a positive reference: zero would raise
        # ZeroDivisionError and a negative value would yield a negative
        # deviation that always passes the tolerance test.
        if expected_capacity <= 0:
            return RuleResult(
                success=False,
                observed=observed_capacity,
                expected=expected_capacity,
                message=(
                    f"SciGRID_gas reference capacity for {self.scenario} is "
                    f"{expected_capacity:.2f} GWh/d; cannot compute relative "
                    f"deviation of CH4 grid capacity"
                ),
                severity=Severity.ERROR,
                **common,
            )

        # Calculate relative deviation
        rtol = self.params.get("rtol", 0.10)
        deviation = abs(observed_capacity - expected_capacity) / expected_capacity
        deviation_pct = deviation * 100

        if deviation <= rtol:
            return RuleResult(
                success=True,
                observed=observed_capacity,
                expected=expected_capacity,
                message=(
                    f"CH4 grid capacity valid for {self.scenario}: "
                    f"{observed_capacity:.2f} GWh/d (deviation: {deviation_pct:.2f}%, "
                    f"tolerance: {rtol*100:.2f}%)"
                ),
                severity=Severity.INFO,
                **common,
            )

        return RuleResult(
            success=False,
            observed=observed_capacity,
            expected=expected_capacity,
            message=(
                f"CH4 grid capacity deviation too large for {self.scenario}: "
                f"{observed_capacity:.2f} vs {expected_capacity:.2f} GWh/d expected "
                f"(deviation: {deviation_pct:.2f}%, tolerance: {rtol*100:.2f}%)"
            ),
            severity=Severity.ERROR,
            **common,
        )
691
692
693
class GasLinksConnections(DataFrameRule):
    """
    Sanity rule that flags gas links pointing at missing buses.

    For one link carrier the rule selects the links of the scenario whose
    ``bus0`` or ``bus1`` is not a bus id present in the bus table for the
    same scenario. Every returned row is a dangling link; the rule passes
    only when no row is returned. Each instance checks a single carrier.
    """

    def __init__(self, table: str, rule_id: str, scenario: str = "eGon2035",
                 carrier: str = "CH4", **kwargs):
        """
        Parameters
        ----------
        table : str
            Target table (grid.egon_etrago_link)
        rule_id : str
            Unique identifier for this validation rule
        scenario : str
            Scenario name ("eGon2035" or "eGon100RE")
        carrier : str
            Link carrier type to check (e.g., "CH4", "H2_feedin", "power_to_H2")
        """
        super().__init__(rule_id=rule_id, table=table, scenario=scenario,
                         carrier=carrier, **kwargs)
        self.kind = "sanity"
        self.scenario = scenario
        self.carrier = carrier

    def get_query(self, ctx):
        """
        Build the SQL that lists links with a missing end point.

        A link is selected when at least one of its two bus references is
        absent from the bus table of the same scenario.
        """
        return f"""
        SELECT link_id, bus0, bus1, carrier, scn_name
        FROM grid.egon_etrago_link
        WHERE scn_name = '{self.scenario}'
        AND carrier = '{self.carrier}'
        AND (
            bus0 NOT IN (
                SELECT bus_id
                FROM grid.egon_etrago_bus
                WHERE scn_name = '{self.scenario}'
            )
            OR bus1 NOT IN (
                SELECT bus_id
                FROM grid.egon_etrago_bus
                WHERE scn_name = '{self.scenario}'
            )
        )
        """

    def evaluate_df(self, df, ctx):
        """
        Turn the query result into a RuleResult.

        Parameters
        ----------
        df : pd.DataFrame
            One row per link that references a missing bus
        ctx : dict
            Context information (not used here)

        Returns
        -------
        RuleResult
            Success (INFO) when the frame has no rows, otherwise failure
            (ERROR) with a sample of up to five offending rows in the
            message.
        """
        disconnected_count = len(df)

        # Fields that are identical for the passing and the failing result.
        shared = {
            "rule_id": self.rule_id,
            "task": self.task,
            "table": self.table,
            "kind": self.kind,
            "expected": 0,
            "schema": self.schema,
            "table_name": self.table_name,
            "rule_class": self.__class__.__name__,
        }

        if disconnected_count:
            # Only the first five rows are quoted to keep the message short.
            sample_links = df.head(5).to_dict(orient="records")
            return RuleResult(
                success=False,
                observed=disconnected_count,
                message=(
                    f"Found {disconnected_count} disconnected {self.carrier} links "
                    f"for {self.scenario}. "
                    f"Sample (first 5): {sample_links}"
                ),
                severity=Severity.ERROR,
                **shared,
            )

        return RuleResult(
            success=True,
            observed=0,
            message=(
                f"All {self.carrier} links connected to valid buses for {self.scenario}"
            ),
            severity=Severity.INFO,
            **shared,
        )
809