Passed
Push — dev ( b05edb...d00529 )
by Stephan
01:36 queued 12s
created

data.db.airflow_db_connection()   A

Complexity

Conditions 1

Size

Total Lines 11
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 0
dl 0
loc 11
rs 10
c 0
b 0
f 0
1
from contextlib import contextmanager
2
3
from sqlalchemy import create_engine, text
4
from sqlalchemy.orm import sessionmaker
5
import pandas as pd
6
import geopandas as gpd
7
8
from egon.data import config
9
10
11
def credentials():
    """Return local database connection parameters.

    The ``egon-data`` section of the settings is read. For every command
    line flag known to this function that is present in that section, the
    flag's value is additionally stored under the matching upper-case key.
    The section dictionary itself is updated in place and returned.

    Returns
    -------
    dict
        Complete DB connection information
    """
    flag_to_key = {
        "--database-name": "POSTGRES_DB",
        "--database-password": "POSTGRES_PASSWORD",
        "--database-host": "HOST",
        "--database-port": "PORT",
        "--database-user": "POSTGRES_USER",
    }
    section = config.settings()["egon-data"]

    # Collect the aliases first; the section must not grow while it is
    # being iterated.
    aliases = {}
    for flag in section:
        if flag not in flag_to_key:
            continue
        aliases[flag_to_key[flag]] = section[flag]

    section.update(aliases)
    return section
34
35
36
def engine():
    """Create an SQLAlchemy engine for the local database.

    The connection URL is assembled from the values returned by
    :func:`credentials` and uses the ``postgresql+psycopg2`` driver.
    Statement echoing is switched off.

    Returns
    -------
    sqlalchemy.engine.Engine
        Newly created engine; a fresh one is built on every call.
    """
    params = credentials()
    user = params["POSTGRES_USER"]
    password = params["POSTGRES_PASSWORD"]
    host = params["HOST"]
    port = params["PORT"]
    database = params["POSTGRES_DB"]

    url = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}"
    return create_engine(url, echo=False)
45
46
47
def execute_sql(sql_string):
    """Execute a SQL expression given as string.

    The SQL expression passed as plain string is converted to a
    `sqlalchemy.sql.expression.TextClause` and run on a connection of a
    freshly created local engine with the ``autocommit`` execution option
    enabled.

    Parameters
    ----------
    sql_string : str
        SQL expression

    """
    connection = engine().connect().execution_options(autocommit=True)

    # The context manager closes the connection once the statement ran.
    with connection as con:
        statement = text(sql_string)
        con.execute(statement)
63
64
65
def submit_comment(json, schema, table):
    """Add comment to table.

    We use `Open Energy Metadata <https://github.com/OpenEnergyPlatform/
    oemetadata/blob/develop/metadata/v140/metadata_key_description.md>`_
    standard for describing our data. Metadata is stored as JSON in the
    table comment.

    The given string is appended verbatim to a ``COMMENT ON TABLE``
    statement. Afterwards the stored comment is read back and cast to JSON
    so that an invalid document surfaces as a database error.

    Parameters
    ----------
    json : str
        JSON string reflecting comment
    schema : str
        The target table's database schema
    table : str
        Database table on which to put the given comment
    """
    comment_statement = f"COMMENT ON TABLE {schema}.{table} IS " + json + ";"
    validation_query = (
        f"SELECT obj_description('{schema}.{table}'::regclass)::json"
    )

    execute_sql(comment_statement)

    # Query table comment and cast it into JSON
    # The query throws an error if JSON is invalid
    execute_sql(validation_query)
95
96
97
@contextmanager
def session_scope():
    """Provide a transactional scope around a series of operations.

    A session bound to a freshly created local engine is yielded. It is
    committed when the ``with`` body finishes normally; on any exception
    it is rolled back and the exception is re-raised. The session is
    closed in either case.

    Yields
    ------
    sqlalchemy.orm.Session
        Session to run the operations on.
    """
    session = sessionmaker(bind=engine())()
    try:
        yield session
        session.commit()
    except BaseException:
        # Undo everything done in this scope, then let the caller see the
        # original error.
        session.rollback()
        raise
    finally:
        session.close()
110
111
112
def select_dataframe(sql, index_col=None):
    """Select data from local database as pandas.DataFrame

    A warning is printed when the statement returns no rows.

    Parameters
    ----------
    sql : str
        SQL query to be executed.
    index_col : str, optional
        Column(s) to set as index(MultiIndex). The default is None.

    Returns
    -------
    df : pandas.DataFrame
        Data returned from SQL statement.

    """
    df = pd.read_sql(sql, engine(), index_col=index_col)

    # Count rows rather than cells: `df.size` is rows * columns and is
    # therefore also 0 when every selected column was moved into the index,
    # which triggered the warning although rows had been returned.
    if len(df.index) == 0:
        print(f"WARNING: No data returned by statement: \n {sql}")

    return df
135
136
def select_geodataframe(sql, index_col=None, geom_col='geom', epsg=3035):
    """Select data from local database as geopandas.GeoDataFrame

    A warning is printed when the statement returns no rows; in that case
    the empty frame is returned as read, without reprojection.

    Parameters
    ----------
    sql : str
        SQL query to be executed.
    index_col : str, optional
        Column(s) to set as index(MultiIndex). The default is None.
    geom_col : str, optional
        column name to convert to shapely geometries. The default is 'geom'.
    epsg : int, optional
        EPSG code specifying output projection. The default is 3035.

    Returns
    -------
    gdf : geopandas.GeoDataFrame
        Data returned from SQL statement, reprojected to `epsg` if it
        contains any rows.

    """
    gdf = gpd.read_postgis(
        sql, engine(), index_col=index_col, geom_col=geom_col
    )

    # An empty result carries no geometry to read an SRID from, so the
    # frame typically has no CRS and `to_crs` would raise before the
    # warning could ever be printed. Warn and hand back the empty frame.
    if len(gdf.index) == 0:
        print(f"WARNING: No data returned by statement: \n {sql}")
        return gdf

    return gdf.to_crs(epsg=epsg)
165