"""
Dataset for generating validation reports during pipeline execution.

This module provides the ValidationReport dataset which generates comprehensive
validation reports by aggregating all validation results from individual dataset
validation tasks executed during the pipeline run.
"""

import os
import time

from egon.data import logger, db as egon_db
from egon.data.datasets import Dataset
from egon_validation import RunContext
from egon_validation.runner.aggregate import collect, build_coverage, write_outputs
from egon_validation.report.generate import generate
from egon_validation.config import ENV_DB_URL


def generate_validation_report(**kwargs):
    """
    Generate validation report aggregating all validation results.

    This function collects all validation results from individual dataset
    validation tasks that were executed during the pipeline run and generates
    a comprehensive HTML report including:

    - All validation results from individual dataset tasks
    - Coverage analysis showing which tables were validated
    - Summary statistics and pass/fail counts
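
    Parameters
    ----------
    **kwargs
        Airflow context keyword arguments. Only ``run_id``, ``ti`` and
        ``dag_run`` are used (when present) to resolve the id of the current
        DAG run; all other entries are ignored.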
    """
    # Use the same run_id as the other validation tasks in the pipeline.
    # This ensures all tasks read/write to the same directory.
    run_id = (
        os.environ.get('AIRFLOW_CTX_DAG_RUN_ID') or
        kwargs.get('run_id') or
        # getattr() guards against ti.dag_run being absent or None, which
        # hasattr() alone would not catch.
        (kwargs.get('ti') and getattr(kwargs['ti'], 'dag_run', None) and
         kwargs['ti'].dag_run.run_id) or
        (kwargs.get('dag_run') and kwargs['dag_run'].run_id) or
        f"pipeline_validation_report_{int(time.time())}"
    )

    # Determine the output directory at runtime (not at import time).
    # Priority: EGON_VALIDATION_DIR env var > current working directory.
    out_dir = os.path.join(
        os.environ.get('EGON_VALIDATION_DIR', os.getcwd()),
        "validation_runs"
    )
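    # Example: with EGON_VALIDATION_DIR=/data/egon, results are collected
    # from and written beneath /data/egon/validation_runs.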

    try:
        ctx = RunContext(run_id=run_id, source="airflow", out_dir=out_dir)
        logger.info("Starting pipeline validation report generation", extra={
            "run_id": run_id,
            "output_dir": out_dir
        })

        # Make the database connection available for table counting by
        # setting the database URL from the egon.data configuration.
        try:
            db_url = str(egon_db.engine().url)
            # Export the URL via environment variable so that
            # discover_total_tables (called downstream) can pick it up.
            os.environ[ENV_DB_URL] = db_url
            logger.info("Database connection available for table counting")
        except Exception as e:
            logger.warning(f"Could not set database URL for table counting: {e}")

        # Collect all validation results from existing validation runs,
        # derive the table coverage, write the aggregated outputs and
        # render the HTML report.
        collected = collect(ctx)
        coverage = build_coverage(ctx, collected)
        final_out_dir = write_outputs(ctx, collected, coverage)
        generate(ctx)

        report_path = os.path.join(final_out_dir, 'report.html')
        logger.info("Pipeline validation report generated successfully", extra={
            "report_path": report_path,
            "run_id": run_id,
            "total_results": len(collected.get("items", []))
        })

    except FileNotFoundError as e:
        logger.warning(
            f"No validation results found for pipeline validation report | "
            f"run_id={run_id} | out_dir={out_dir} | error={e} | "
            f"suggestion=This may be expected if no validation tasks were "
            f"run during the pipeline"
        )
        # Don't raise - this is acceptable if no validations were run.
    except Exception as e:
        logger.error("Pipeline validation report generation failed", extra={
            "run_id": run_id,
            "error": str(e),
            "error_type": type(e).__name__
        })
        raise


# Define the task executed by the ValidationReport dataset below.
tasks = (generate_validation_report,)


class ValidationReport(Dataset):
    """
    Dataset for generating validation reports.

    This dataset generates a comprehensive HTML validation report by
    aggregating all validation results from individual dataset validation
    tasks that were executed during the pipeline run. It should be placed
    before sanity_checks in the DAG to ensure validation results are
    collected before final checks.
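
    Example
    -------
    A minimal sketch of adding this dataset to a pipeline definition; the
    dependency list is illustrative and depends on the surrounding DAG::

        validation_report = ValidationReport(
            dependencies=[...]
        )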
    """

    #:
    name: str = "ValidationReport"
    #:
    version: str = "0.0.2.dev"

    def __init__(self, dependencies):
        super().__init__(
            name=self.name,
            version=self.version,
            dependencies=dependencies,
            tasks=tasks,
        )