Passed
Pull Request — dev (#1345)
by
unknown
02:03
created

ensure_postgres_connection()   A

Complexity

Conditions 1

Size

Total Lines 13
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 9
dl 0
loc 13
rs 9.95
c 0
b 0
f 0
cc 1
nop 0
1
"""The central module to create low flex scenarios
2
3
"""
4
from airflow.models import Connection
5
from airflow.operators.postgres_operator import PostgresOperator
6
from airflow.settings import Session
7
from importlib_resources import files
8
from sqlalchemy.ext.declarative import declarative_base
9
10
from egon.data.datasets import Dataset
11
12
Base = declarative_base()
13
14
15
def ensure_postgres_connection():
    """Set the type of the Airflow connection ``egon_data`` to ``postgres``.

    Opens an Airflow ``Session``, looks up the ``Connection`` whose
    ``conn_id`` equals ``"egon_data"``, sets its ``conn_type`` to
    ``"postgres"`` and commits the change. The session is always closed,
    even if the lookup or the commit fails.

    Raises
    ------
    RuntimeError
        If no connection with the id ``"egon_data"`` exists.
    """
    session = Session()
    try:
        conn = (
            session.query(Connection)
            .filter(Connection.conn_id == "egon_data")
            .one_or_none()
        )

        # `one_or_none()` yields None when no row matches. Fail with an
        # explicit message instead of an AttributeError on `None`.
        if conn is None:
            raise RuntimeError(
                "Airflow connection 'egon_data' does not exist; "
                "cannot set its connection type to 'postgres'."
            )

        conn.conn_type = "postgres"
        session.commit()
    finally:
        # Release the session on both the success and the error path.
        session.close()
28
29
30
class LowFlexScenario(Dataset):
    """Dataset whose single task executes ``low_flex_eGon2035.sql``.

    The SQL script is read from this module's package resources and run
    through a ``PostgresOperator`` on the ``egon_data`` connection.
    """

    def __init__(self, dependencies):
        """Build the dataset with the given ``dependencies``.

        Calls ``ensure_postgres_connection()`` before the operator is
        created.
        """
        ensure_postgres_connection()

        # Load the SQL text that ships next to this module.
        sql_resource = files(__name__).joinpath("low_flex_eGon2035.sql")
        sql_text = sql_resource.read_text(encoding="utf-8")

        low_flex_task = PostgresOperator(
            task_id="low_flex_eGon2035",
            sql=sql_text,
            postgres_conn_id="egon_data",
            autocommit=True,
        )

        # Same shape as before: a one-element tuple holding a set with
        # the single operator.
        super().__init__(
            name="low_flex_scenario",
            version="0.0.1",
            dependencies=dependencies,
            tasks=({low_flex_task},),
        )
50