| Conditions | 4 |
| Total Lines | 75 |
| Code Lines | 39 |
| Lines | 0 |
| Ratio | 0 % |
| Changes | 0 |
Small methods make your code easier to understand, in particular when combined with a good name. In addition, a small method is usually much easier to name well.
For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
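Commonly applied refactorings include:
- Extract Method

If many parameters/temporary variables are present:
- Replace Temp with Query
- Introduce Parameter Object
- Preserve Whole Object
- Replace Method with Method Object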
| 1 | """ |
||
def _resolve_run_id(kwargs):
    """Return the run id shared by every validation task of this pipeline run.

    Candidates are tried in priority order and the first truthy one wins:
    the ``AIRFLOW_CTX_DAG_RUN_ID`` environment variable, ``kwargs['run_id']``,
    ``kwargs['ti'].dag_run.run_id``, ``kwargs['dag_run'].run_id`` and,
    finally, a timestamp-based fallback id.
    """
    # getattr chains tolerate a missing/None ``ti``, ``ti.dag_run`` or
    # ``dag_run`` instead of raising AttributeError before the caller's
    # error handling is in place.
    ti_dag_run = getattr(kwargs.get('ti'), 'dag_run', None)
    return (
        os.environ.get('AIRFLOW_CTX_DAG_RUN_ID')
        or kwargs.get('run_id')
        or getattr(ti_dag_run, 'run_id', None)
        or getattr(kwargs.get('dag_run'), 'run_id', None)
        or f"pipeline_validation_report_{int(time.time())}"
    )


def _resolve_out_dir():
    """Return the validation output directory, resolved at call time.

    ``EGON_VALIDATION_DIR`` takes priority over the current working
    directory; ``validation_runs`` is appended in both cases.
    """
    base_dir = os.environ.get('EGON_VALIDATION_DIR', os.getcwd())
    return os.path.join(base_dir, "validation_runs")


def _export_db_url():
    """Expose egon.data's database URL via ``ENV_DB_URL`` for table counting.

    Returns:
        ``(exported, previous)``: ``exported`` tells whether the variable was
        set, and ``previous`` is its prior value (``None`` if it was unset).
        Pass both to ``_restore_db_url`` once the URL is no longer needed.

    Failures are logged as a warning and never raised, so the report can
    still be generated without table counting.
    """
    try:
        previous = os.environ.get(ENV_DB_URL)
        os.environ[ENV_DB_URL] = str(egon_db.engine().url)
    except Exception as e:
        logger.warning(f"Could not set database URL for table counting: {e}")
        return False, None
    logger.info("Database connection available for table counting")
    return True, previous


def _restore_db_url(exported, previous):
    """Undo ``_export_db_url``: restore ``ENV_DB_URL`` or remove it again."""
    if not exported:
        return
    if previous is None:
        os.environ.pop(ENV_DB_URL, None)
    else:
        os.environ[ENV_DB_URL] = previous


def generate_validation_report(**kwargs):
    """
    Generate validation report aggregating all validation results.

    This function collects all validation results from individual dataset
    validation tasks that were executed during the pipeline run and generates
    a comprehensive HTML report including:
    - All validation results from individual dataset tasks
    - Coverage analysis showing which tables were validated
    - Summary statistics and pass/fail counts

    Args:
        **kwargs: Airflow task context. ``run_id``, ``ti`` and ``dag_run``
            are consulted (after the ``AIRFLOW_CTX_DAG_RUN_ID`` environment
            variable) to find the pipeline's run id.

    Raises:
        Exception: any error other than ``FileNotFoundError`` is logged and
            re-raised. ``FileNotFoundError`` (no validation results found) is
            only logged as a warning.
    """
    # Use the same run_id as the other validation tasks in the pipeline so
    # that all tasks read/write the same directory.
    run_id = _resolve_run_id(kwargs)
    # Resolved at runtime (not import time) so environment changes apply.
    out_dir = _resolve_out_dir()

    try:
        ctx = RunContext(run_id=run_id, source="airflow", out_dir=out_dir)
        logger.info("Starting pipeline validation report generation", extra={
            "run_id": run_id,
            "output_dir": out_dir,
        })

        exported, previous_db_url = _export_db_url()
        try:
            # Collect all validation results from existing validation runs.
            collected = collect(ctx)
            coverage = build_coverage(ctx, collected)
            final_out_dir = write_outputs(ctx, collected, coverage)
            generate(ctx)
        finally:
            # The DB URL is only needed while the report is built; do not
            # leave it behind in the process environment.
            _restore_db_url(exported, previous_db_url)

        report_path = os.path.join(final_out_dir, 'report.html')
        logger.info("Pipeline validation report generated successfully", extra={
            "report_path": report_path,
            "run_id": run_id,
            "total_results": len(collected.get("items", [])),
        })

    except FileNotFoundError as e:
        logger.warning(
            f"No validation results found for pipeline validation report | "
            f"run_id={run_id} | out_dir={out_dir} | error={e} | "
            f"suggestion=This may be expected if no validation tasks were run during the pipeline"
        )
        # Don't raise - this is acceptable if no validations were run.
    except Exception as e:
        logger.error("Pipeline validation report generation failed", extra={
            "run_id": run_id,
            "error": str(e),
            "error_type": type(e).__name__,
        })
        raise
||
| 96 | |||
| 123 |