Passed
Pull Request — dev (#1375) — by unknown — created 02:12

ValidationReport.__init__()   A

Complexity
  Conditions: 1

Size
  Total Lines: 6
  Code Lines: 6

Duplication
  Lines: 0
  Ratio: 0 %

Importance
  Changes: 0

Metric   Value
eloc     6
dl       0
loc      6
rs       10
c        0
b        0
f        0
cc       1
nop      2
"""
Dataset for generating validation reports during pipeline execution.

This module provides the ValidationReport dataset which generates comprehensive
validation reports by aggregating all validation results from individual dataset
validation tasks executed during the pipeline run.
"""

import os
import time

from egon.data import logger, db as egon_db
from egon.data.datasets import Dataset
from egon_validation import RunContext
from egon_validation.runner.aggregate import collect, build_coverage, write_outputs
from egon_validation.report.generate import generate
from egon_validation.runner.coverage_analysis import discover_total_tables
from egon_validation.config import ENV_DB_URL

def generate_validation_report(**kwargs):
    """
    Generate a validation report aggregating all validation results.

    This function collects all validation results from the individual dataset
    validation tasks that were executed during the pipeline run and generates
    a comprehensive HTML report including:
    - All validation results from individual dataset tasks
    - Coverage analysis showing which tables were validated
    - Summary statistics and pass/fail counts
    """
    # Use the same run_id as the other validation tasks in the pipeline so
    # that all tasks read from and write to the same directory.
    run_id = (
        os.environ.get('AIRFLOW_CTX_DAG_RUN_ID') or
        kwargs.get('run_id') or
        (kwargs.get('ti') and getattr(kwargs['ti'], 'dag_run', None) and kwargs['ti'].dag_run.run_id) or
        (kwargs.get('dag_run') and kwargs['dag_run'].run_id) or
        f"pipeline_validation_report_{int(time.time())}"
    )

    # Determine the output directory at runtime (not at import time).
    # Priority: EGON_VALIDATION_DIR environment variable > current working directory.
    out_dir = os.path.join(
        os.environ.get('EGON_VALIDATION_DIR', os.getcwd()),
        "validation_runs"
    )

    try:
        ctx = RunContext(run_id=run_id, source="airflow", out_dir=out_dir)
        logger.info("Starting pipeline validation report generation", extra={
            "run_id": run_id,
            "output_dir": out_dir
        })

        # Make the database connection available for table counting by setting
        # the database URL from the egon.data configuration.
        try:
            db_url = str(egon_db.engine().url)
            # Temporarily set the environment variable so discover_total_tables can use it.
            os.environ[ENV_DB_URL] = db_url
            logger.info("Database connection available for table counting")
        except Exception as e:
            logger.warning(f"Could not set database URL for table counting: {e}")

        # Collect all validation results from existing validation runs.
        collected = collect(ctx)
        coverage = build_coverage(ctx, collected)
        final_out_dir = write_outputs(ctx, collected, coverage)
        generate(ctx)

        report_path = os.path.join(final_out_dir, 'report.html')
        logger.info("Pipeline validation report generated successfully", extra={
            "report_path": report_path,
            "run_id": run_id,
            "total_results": len(collected.get("items", []))
        })

    except FileNotFoundError as e:
        logger.warning(
            f"No validation results found for pipeline validation report | "
            f"run_id={run_id} | out_dir={out_dir} | error={e} | "
            f"suggestion=This may be expected if no validation tasks were run during the pipeline"
        )
        # Don't raise - missing results are acceptable if no validations were run.
    except Exception as e:
        logger.error("Pipeline validation report generation failed", extra={
            "run_id": run_id,
            "error": str(e),
            "error_type": type(e).__name__
        })
        raise


# Define the task
tasks = (generate_validation_report,)


class ValidationReport(Dataset):
    """
    Dataset for generating validation reports.

    This dataset generates a comprehensive HTML validation report by aggregating
    all validation results from individual dataset validation tasks that were
    executed during the pipeline run. It should be placed before sanity_checks
    in the DAG to ensure validation results are collected before final checks.
    """
    #:
    name: str = "ValidationReport"
    #:
    version: str = "0.0.2.dev"

    def __init__(self, dependencies):
        super().__init__(
            name=self.name,
            version=self.version,
            dependencies=dependencies,
            tasks=tasks,
        )
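
For orientation, a minimal usage sketch follows. It assumes the usual egon.data convention of passing upstream datasets as dependencies; the dependency list, output path, and run id below are illustrative placeholders and are not taken from this module or the PR.

# Usage sketch (placeholders are assumptions, not part of the PR).
# In the pipeline definition, the dataset is instantiated like any other
# egon.data Dataset and placed before sanity_checks in the DAG, so that the
# per-dataset validation results already exist when the report is built.
validation_report = ValidationReport(
    dependencies=[upstream_validation_datasets]   # placeholder dependency list
)

# The task can also be invoked directly, provided earlier validation tasks
# already wrote results for the same run_id under the validation_runs
# directory derived from EGON_VALIDATION_DIR.
import os

os.environ["EGON_VALIDATION_DIR"] = "/path/to/validation/output"   # assumed path
os.environ["AIRFLOW_CTX_DAG_RUN_ID"] = "manual__2024-01-01"        # assumed run id
generate_validation_report()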