Passed — Pull Request — dev (#1375), created by unknown at 02:18

data.validation_utils.create_validation_tasks() — grade C

Complexity
  Conditions: 10

Size
  Total Lines: 120
  Code Lines: 55

Duplication
  Lines: 0
  Ratio: 0 %

Importance
  Changes: 0

Metric  Value
eloc    55
dl      0
loc     120
rs      5.6727
c       0
b       0
f       0
cc      10
nop     3
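The cc value of 10 (matching "Conditions" above) is a cyclomatic-complexity count. As a rough, illustrative sketch of how such a number is derived (this follows the common decision-point rule, not necessarily this tool's exact algorithm; the helper name is invented), one can walk a function's AST and add one per branch:

```python
import ast

# Hypothetical helper (not part of egon-validation): approximates a
# cyclomatic-complexity count by tallying decision points in source code.
def approximate_cyclomatic_complexity(source: str) -> int:
    tree = ast.parse(source)
    complexity = 1  # a straight-line function has complexity 1
    for node in ast.walk(tree):
        if isinstance(node, (ast.If, ast.For, ast.While,
                             ast.ExceptHandler, ast.IfExp, ast.Assert)):
            complexity += 1
        elif isinstance(node, ast.BoolOp):
            # each extra operand of and/or adds a decision point
            complexity += len(node.values) - 1
    return complexity

print(approximate_cyclomatic_complexity(
    "def f(x):\n"
    "    if x > 0 and x < 10:\n"
    "        return x\n"
    "    return 0\n"
))  # → 3
```

Counting the ifs, loops, exception handlers, and boolean operators in create_validation_tasks() by this rule is what yields a figure in the range the report shows.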

How to fix

Long Method

Small methods make your code easier to understand, especially when combined with a good name. Moreover, when a method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a sign you should extract the commented part into a new method, using the comment as a starting point for naming it.

Commonly applied refactorings include Extract Method, Decompose Conditional, and Replace Method with Method Object.
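As a small illustration of the comment-driven extraction described above (the names here are invented for the example), the comment becomes the new method's name:

```python
# Before: a comment labels a block that wants to be its own method.
def invoice_total_before(order):
    # sum up the line items
    total = 0
    for item in order["items"]:
        total += item["price"] * item["qty"]
    return total

# After Extract Method: the comment has become the method's name,
# and the caller now reads as a single sentence.
def sum_line_items(items):
    return sum(item["price"] * item["qty"] for item in items)

def invoice_total(order):
    return sum_line_items(order["items"])

print(invoice_total({"items": [{"price": 10, "qty": 2},
                               {"price": 5, "qty": 1}]}))  # → 25
```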

Complexity

Complex units like data.validation_utils.create_validation_tasks() often do several different things. To break such a unit down, we need to identify a cohesive component within it. A common way to find such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined which fields belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate and is often quicker to apply.
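A minimal sketch of the prefix-driven Extract Class move described above (the classes and fields are invented for illustration, not taken from egon-validation):

```python
# Before: fields sharing the "report_" prefix hint at a cohesive
# component hiding inside the class.
class ValidationRun:
    def __init__(self):
        self.report_path = "out/report.html"
        self.report_format = "html"
        self.results = []

    def report_render(self):
        return f"{len(self.results)} results as {self.report_format}"

# After Extract Class: the prefixed members move into their own class,
# and the prefix disappears because the class name now carries it.
class Report:
    def __init__(self, path, fmt):
        self.path = path
        self.format = fmt

    def render(self, results):
        return f"{len(results)} results as {self.format}"

class ValidationRunRefactored:
    def __init__(self):
        self.report = Report("out/report.html", "html")
        self.results = []

print(ValidationRunRefactored().report.render([]))  # → 0 results as html
```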

```python
"""Airflow integration for egon-validation."""

import logging
from typing import Any, Dict, List

from airflow.exceptions import AirflowException
from airflow.operators.python import PythonOperator

from egon_validation import run_validations, RunContext
from egon_validation.rules.base import Rule

logger = logging.getLogger(__name__)


def _resolve_context_value(value: Any, boundary: str) -> Any:
    """Resolve a value that may be boundary-dependent.

    Args:
        value: The value to resolve. Can be:
            - A dict with boundary keys: {"Schleswig-Holstein": 27, "Everything": 537}
            - Any other value (returned as-is)
        boundary: Current dataset boundary setting

    Returns:
        Resolved value based on the current boundary

    Examples:
        >>> _resolve_context_value({"Schleswig-Holstein": 27, "Everything": 537},
        ...                        "Schleswig-Holstein")
        27

        >>> _resolve_context_value(42, "Everything")
        42
    """
    # If not a dict, return as-is
    if not isinstance(value, dict):
        return value

    # Try to resolve by boundary
    if boundary in value:
        logger.debug(
            f"Resolved boundary-dependent value: {boundary} -> {value[boundary]}"
        )
        return value[boundary]

    # If the dict doesn't match the boundary pattern, return it as-is.
    # This handles cases like column_types dicts, which are not context-dependent.
    return value


def _resolve_rule_params(rule: Rule, boundary: str) -> None:
    """Resolve boundary-dependent parameters in a rule.

    Modifies rule.params in place, resolving any dict values that match
    boundary patterns.

    Args:
        rule: The validation rule to process
        boundary: Current dataset boundary setting
    """
    if not hasattr(rule, "params") or not isinstance(rule.params, dict):
        return

    # Resolve all parameter values
    for param_name, param_value in rule.params.items():
        resolved_value = _resolve_context_value(param_value, boundary)

        # If the value was resolved (changed), update it
        if resolved_value is not param_value:
            logger.info(
                f"Rule {rule.rule_id}: Resolved {param_name} for "
                f"boundary='{boundary}'"
            )
            rule.params[param_name] = resolved_value


def create_validation_tasks(
    validation_dict: Dict[str, List[Rule]],
    dataset_name: str,
    on_failure: str = "continue",
) -> List[PythonOperator]:
    """Convert a validation dict to Airflow tasks.

    Automatically resolves boundary-dependent parameters in validation rules.
    Parameters can be specified as dicts with boundary keys:

    - Boundary-dependent: {"Schleswig-Holstein": 27, "Everything": 537}

    The appropriate value is selected based on the current configuration.

    Args:
        validation_dict: {"task_name": [Rule1(), Rule2()]}
        dataset_name: Name of the dataset
        on_failure: "continue" or "fail"

    Returns:
        List of PythonOperator tasks

    Example:
        >>> validation_dict = {
        ...     "data_quality": [
        ...         RowCountValidation(
        ...             table="boundaries.vg250_krs",
        ...             rule_id="TEST_ROW_COUNT",
        ...             expected_count={"Schleswig-Holstein": 27, "Everything": 537}
        ...         )
        ...     ]
        ... }
        >>> tasks = create_validation_tasks(validation_dict, "VG250")
    """
    if not validation_dict:
        return []

    tasks = []

    for task_name, rules in validation_dict.items():
        # Bind rules and task_name per iteration to avoid the classic
        # late-binding closure bug in loops.
        def make_callable(rules, task_name):
            def run_validation(**context):
                import os
                import time
                from datetime import datetime

                from egon.data import db as egon_db
                from egon.data.config import settings

                # Use the same run_id as the validation report for consistency.
                # This lets the report collect results from all validation tasks.
                run_id = (
                    os.environ.get("AIRFLOW_CTX_DAG_RUN_ID")
                    or context.get("run_id")
                    # getattr chains avoid an AttributeError when ti exists
                    # but its dag_run is not set
                    or getattr(
                        getattr(context.get("ti"), "dag_run", None), "run_id", None
                    )
                    or getattr(context.get("dag_run"), "run_id", None)
                    or f"airflow-{dataset_name}-{task_name}-{int(time.time())}"
                )

                # Use an absolute path so the location is consistent regardless
                # of the working directory.
                # Priority: EGON_VALIDATION_DIR env var > current working directory
                out_dir = os.path.join(
                    os.environ.get("EGON_VALIDATION_DIR", os.getcwd()),
                    "validation_runs",
                )

                # Include the execution timestamp in the task name so retries
                # write to separate directories. The validation report keeps
                # only the most recent execution per task.
                execution_date = context.get("execution_date") or datetime.now()
                timestamp = execution_date.strftime("%Y%m%dT%H%M%S")
                full_task_name = f"{dataset_name}.{task_name}.{timestamp}"

                logger.info(f"Validation: {full_task_name} (run_id: {run_id})")

                # Reuse the existing engine from egon.data.db
                engine = egon_db.engine()

                # Get the current configuration context
                config = settings()["egon-data"]
                boundary = config["--dataset-boundary"]

                logger.info(
                    f"Resolving validation parameters for boundary='{boundary}'"
                )

                # Set task and dataset on all rules (required by the Rule base
                # class) and resolve boundary-dependent parameters.
                for rule in rules:
                    if not hasattr(rule, "task") or rule.task is None:
                        rule.task = task_name
                    if not hasattr(rule, "dataset") or rule.dataset is None:
                        rule.dataset = dataset_name

                    _resolve_rule_params(rule, boundary)

                ctx = RunContext(run_id=run_id, source="airflow", out_dir=out_dir)
                results = run_validations(engine, ctx, rules, full_task_name)

                total = len(results)
                failed = sum(1 for r in results if not r.success)

                logger.info(f"Complete: {total - failed}/{total} passed")

                if failed > 0 and on_failure == "fail":
                    raise AirflowException(f"{failed}/{total} validations failed")

                return {"total": total, "passed": total - failed, "failed": failed}

            return run_validation

        func = make_callable(rules, task_name)
        func.__name__ = f"validate_{task_name}"

        operator = PythonOperator(
            task_id=f"{dataset_name}.validate.{task_name}",
            python_callable=func,
            # Airflow 2 injects context into **context automatically based on
            # the callable's signature; provide_context is no longer needed.
        )

        tasks.append(operator)

    return tasks
```
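Applied to the report above, the nested run_validation callable is the natural extraction target: it mixes run-id resolution, path handling, configuration lookup, and orchestration. A hedged sketch (the helper names are invented, not part of egon-validation) of pulling two straight-line concerns out as module-level, unit-testable helpers:

```python
import os
import time

# Hypothetical helpers extracted from run_validation(); each one removes
# decision points from the long method and can be tested without Airflow.
def resolve_run_id(context, dataset_name, task_name):
    """Pick the first available run id, mirroring the priority order above."""
    return (
        os.environ.get("AIRFLOW_CTX_DAG_RUN_ID")
        or context.get("run_id")
        or getattr(getattr(context.get("ti"), "dag_run", None), "run_id", None)
        or getattr(context.get("dag_run"), "run_id", None)
        or f"airflow-{dataset_name}-{task_name}-{int(time.time())}"
    )

def resolve_out_dir(base_env="EGON_VALIDATION_DIR"):
    """Resolve the output directory from the environment or the cwd."""
    return os.path.join(os.environ.get(base_env, os.getcwd()), "validation_runs")

# With the helpers in place, run_validation shrinks to orchestration only,
# which lowers its cyclomatic complexity below the flagged threshold.
os.environ.pop("AIRFLOW_CTX_DAG_RUN_ID", None)  # deterministic demo output
print(resolve_run_id({"run_id": "manual__2024"}, "VG250", "data_quality"))  # → manual__2024
```

Each extracted helper also becomes independently mockable, which is the usual payoff of Extract Method on Airflow callables.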