Passed
Pull Request — dev (#1375)
created by unknown at 02:12

data.validation_utils.create_validation_tasks() — Grade: C

Complexity: Conditions 10
Size: Total Lines 90, Code Lines 50
Duplication: Lines 0, Ratio 0 %

Importance: Changes 0

Metric  Value
eloc    50
dl      0
loc     90
rs      5.8362
c       0
b       0
f       0
cc      10
nop     3
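The cc value is cyclomatic complexity: one base path through the function plus one per decision point. A minimal sketch of such a counter using Python's ast module (simplified; real analyzers also weight comprehensions, assertions, and other constructs differently):

```python
import ast

def cyclomatic_complexity(source: str) -> int:
    """Rough cyclomatic complexity: 1 + one per decision point."""
    tree = ast.parse(source)
    branches = (ast.If, ast.For, ast.While, ast.ExceptHandler,
                ast.BoolOp, ast.IfExp)
    count = 1
    for node in ast.walk(tree):
        if isinstance(node, branches):
            # An `and`/`or` chain adds len(values) - 1 decision points
            count += len(node.values) - 1 if isinstance(node, ast.BoolOp) else 1
    return count

snippet = """
def f(x):
    if x > 0 and x < 10:
        return "small"
    return "other"
"""
print(cyclomatic_complexity(snippet))  # → 3 (base path + if + and)
```

A function scoring 10, as here, has roughly ten independent paths to cover in tests, which is why the report flags it.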

How to fix

Long Method

Small methods make your code easier to understand, especially when combined with a good name. And when a method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a sign that the commented part should be extracted into a new method, with the comment serving as a starting point for naming it.

Commonly applied refactorings include Extract Method.
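As a sketch of Extract Method in Python (all names here are hypothetical, not from the analyzed module): the commented block becomes a named helper, and the comment becomes the name.

```python
# Before: a commented block inside a longer function.
def report_before(results):
    # count failures
    failed = sum(1 for r in results if not r["success"])
    return f"{len(results) - failed}/{len(results)} passed"

# After: the commented part extracted into its own method.
def count_failures(results):
    return sum(1 for r in results if not r["success"])

def report_after(results):
    failed = count_failures(results)
    return f"{len(results) - failed}/{len(results)} passed"

results = [{"success": True}, {"success": False}, {"success": True}]
print(report_after(results))  # → 2/3 passed
```

The behavior is unchanged; the extraction just gives the failure-counting step a name and a seam for testing it in isolation.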

Complexity

Complex classes like data.validation_utils.create_validation_tasks() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
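A minimal Extract Class sketch in Python (hypothetical names, not from the analyzed module): fields sharing a prefix such as report_ move into their own class.

```python
# Before: one class mixing concerns; the `report_` prefix marks a cohesive group.
class ValidatorBefore:
    def __init__(self):
        self.rules = []
        self.report_dir = "validation_runs"
        self.report_format = "json"

# After: the prefixed fields extracted into their own class.
class ReportConfig:
    def __init__(self, directory="validation_runs", fmt="json"):
        self.directory = directory
        self.fmt = fmt

    def path_for(self, run_id):
        return f"{self.directory}/{run_id}.{self.fmt}"

class ValidatorAfter:
    def __init__(self, report=None):
        self.rules = []
        self.report = report or ReportConfig()

v = ValidatorAfter()
print(v.report.path_for("run-1"))  # → validation_runs/run-1.json
```

Behavior that operated on the prefixed fields migrates with them, so the original class shrinks and the new component can be reused or tested on its own.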

"""Airflow integration for egon-validation."""

from typing import Dict, List
from airflow.operators.python import PythonOperator
from egon_validation import run_validations, RunContext
from egon_validation.rules.base import Rule
import logging

logger = logging.getLogger(__name__)


def create_validation_tasks(
    validation_dict: Dict[str, List[Rule]],
    dataset_name: str,
    on_failure: str = "continue"
) -> List[PythonOperator]:
    """Convert validation dict to Airflow tasks.

    Args:
        validation_dict: {"task_name": [Rule1(), Rule2()]}
        dataset_name: Name of dataset
        on_failure: "continue" or "fail"

    Returns:
        List of PythonOperator tasks
    """
    if not validation_dict:
        return []

    tasks = []

    for task_name, rules in validation_dict.items():
        def make_callable(rules, task_name):
            def run_validation(**context):
                import os
                import time
                from datetime import datetime
                from egon.data import db as egon_db

                # Use same run_id as validation report for consistency
                # This allows the validation report to collect results from all validation tasks
                run_id = (
                    os.environ.get('AIRFLOW_CTX_DAG_RUN_ID') or
                    context.get('run_id') or
                    (context.get('ti') and hasattr(context['ti'], 'dag_run') and context['ti'].dag_run.run_id) or
                    (context.get('dag_run') and context['dag_run'].run_id) or
                    f"airflow-{dataset_name}-{task_name}-{int(time.time())}"
                )

                # Use absolute path to ensure consistent location regardless of working directory
                # Priority: EGON_VALIDATION_DIR env var > current working directory
                out_dir = os.path.join(
                    os.environ.get('EGON_VALIDATION_DIR', os.getcwd()),
                    "validation_runs"
                )

                # Include execution timestamp in task name so retries write to separate directories
                # The validation report will filter to keep only the most recent execution per task
                execution_date = context.get('execution_date') or datetime.now()
                timestamp = execution_date.strftime('%Y%m%dT%H%M%S')
                full_task_name = f"{dataset_name}.{task_name}.{timestamp}"

                logger.info(f"Validation: {full_task_name} (run_id: {run_id})")

                # Use existing engine from egon.data.db
                engine = egon_db.engine()

                # Set task and dataset on all rules (required by Rule base class)
                for rule in rules:
                    if not hasattr(rule, 'task') or rule.task is None:
                        rule.task = task_name
                    if not hasattr(rule, 'dataset') or rule.dataset is None:
                        rule.dataset = dataset_name

                ctx = RunContext(run_id=run_id, source="airflow", out_dir=out_dir)
                results = run_validations(engine, ctx, rules, full_task_name)

                total = len(results)
                failed = sum(1 for r in results if not r.success)

                logger.info(f"Complete: {total - failed}/{total} passed")

                if failed > 0 and on_failure == "fail":
                    raise Exception(f"{failed}/{total} validations failed")

                return {"total": total, "passed": total - failed, "failed": failed}

            return run_validation

        func = make_callable(rules, task_name)
        func.__name__ = f"validate_{task_name}"

        operator = PythonOperator(
            task_id=f"{dataset_name}.validate.{task_name}",
            python_callable=func,
            provide_context=True,
        )

        tasks.append(operator)

    return tasks