Total Complexity | 2 |
Total Lines | 46 |
Duplicated Lines | 0 % |
Changes | 0 |
1 | """The central module to create low flex scenarios |
||
2 | |||
3 | """ |
||
4 | from airflow.models import Connection |
||
5 | from airflow.operators.postgres_operator import PostgresOperator |
||
6 | from airflow.settings import Session |
||
7 | from importlib_resources import files |
||
8 | from sqlalchemy.ext.declarative import declarative_base |
||
9 | |||
10 | from egon.data.datasets import Dataset |
||
11 | |||
12 | Base = declarative_base() |
||
13 | |||
14 | |||
def ensure_postgres_connection():
    """Set the type of the Airflow connection ``egon_data`` to ``postgres``.

    Queries the ``Connection`` entry whose ``conn_id`` equals
    ``"egon_data"``, assigns ``"postgres"`` to its ``conn_type`` and
    commits the change. The session is closed afterwards, even when the
    lookup or the commit fails.

    Raises
    ------
    RuntimeError
        If no connection with the id ``"egon_data"`` is found.

    """
    session = Session()
    try:
        conn = (
            session.query(Connection)
            .filter(Connection.conn_id == "egon_data")
            .one_or_none()
        )

        # `one_or_none` yields `None` when nothing matches; fail with a
        # clear message instead of an `AttributeError` on the assignment
        # below.
        if conn is None:
            raise RuntimeError(
                "Airflow connection 'egon_data' not found; cannot set its"
                " connection type to 'postgres'."
            )

        conn.conn_type = "postgres"
        session.commit()
    finally:
        # Always release the session, whether or not the update succeeded.
        session.close()
||
28 | |||
29 | |||
30 | class LowFlexScenario(Dataset): |
||
31 | def __init__(self, dependencies): |
||
32 | ensure_postgres_connection() |
||
33 | super().__init__( |
||
34 | name="low_flex_scenario", |
||
35 | version="0.0.1", |
||
36 | dependencies=dependencies, |
||
37 | tasks=( |
||
38 | { |
||
39 | PostgresOperator( |
||
40 | task_id="low_flex_eGon2035", |
||
41 | sql=files(__name__) |
||
42 | .joinpath("low_flex_eGon2035.sql") |
||
43 | .read_text(encoding="utf-8"), |
||
44 | postgres_conn_id="egon_data", |
||
45 | autocommit=True, |
||
46 | ), |
||
50 |