Passed
Pull Request — dev (#187)
by Stephan
01:24
created

data.db.airflow_db_connection()   A

Complexity

Conditions 1

Size

Total Lines 11
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 0
dl 0
loc 11
rs 10
c 0
b 0
f 0
1
from contextlib import contextmanager
2
import os
3
4
from sqlalchemy import create_engine, text
5
from sqlalchemy.orm import sessionmaker
6
import yaml
7
import pandas as pd
8
import geopandas as gpd
9
10
from egon.data import config
11
12
13
def credentials():
    """Return local database connection parameters.

    The ``egon-data`` section of the settings is extended in place with the
    ``POSTGRES_DB``, ``POSTGRES_PASSWORD``, ``HOST``, ``PORT`` and
    ``POSTGRES_USER`` keys, whose values are copied from the corresponding
    ``--database-*`` flags when those flags are present.

    Returns
    -------
    dict
        Complete DB connection information
    """
    flag_to_key = {
        "--database-name": "POSTGRES_DB",
        "--database-password": "POSTGRES_PASSWORD",
        "--database-host": "HOST",
        "--database-port": "PORT",
        "--database-user": "POSTGRES_USER",
    }
    settings = config.settings()["egon-data"]

    # Collect aliases in the order the flags appear in the settings.
    aliases = {}
    for flag, value in settings.items():
        key = flag_to_key.get(flag)
        if key is not None:
            aliases[key] = value

    settings.update(aliases)
    return settings
36
37
38
def engine():
    """Engine for local database.

    Builds a ``postgresql+psycopg2`` SQLAlchemy engine from the parameters
    returned by :func:`credentials`.

    Returns
    -------
    sqlalchemy.engine.Engine
        A new engine with ``echo=False``.
    """
    from urllib.parse import quote

    db_config = credentials()
    # SQLAlchemy URL-decodes the password component, so it must be
    # percent-encoded here. Otherwise characters such as "@", ":", "/" or
    # "%" would break the URL or be silently altered. ``str()`` mirrors the
    # implicit conversion the f-string did (e.g. for numeric passwords).
    password = quote(str(db_config["POSTGRES_PASSWORD"]), safe="")
    return create_engine(
        f"postgresql+psycopg2://{db_config['POSTGRES_USER']}:"
        f"{password}@{db_config['HOST']}:"
        f"{db_config['PORT']}/{db_config['POSTGRES_DB']}",
        echo=False,
    )
47
48
49
def execute_sql(sql_string):
    """Execute a SQL expression given as string and commit it.

    The SQL expression passed as plain string is converted to a
    `sqlalchemy.sql.expression.TextClause` and executed inside a
    transaction. The transaction is committed when the statement succeeds
    and rolled back if it raises.

    Parameters
    ----------
    sql_string : str
        SQL expression

    """
    engine_local = engine()
    try:
        # ``Engine.begin()`` commits on successful exit. It replaces the
        # ``autocommit=True`` execution option, which is deprecated in
        # SQLAlchemy 1.4 and removed in 2.0, where a plain ``connect()``
        # block would roll the statement back instead of committing it.
        with engine_local.begin() as con:
            con.execute(text(sql_string))
    finally:
        # A fresh engine (with its own pool) is created on every call, so
        # close its pooled connection now instead of leaving it to the GC.
        engine_local.dispose()
65
66
67
def submit_comment(json, schema, table):
    """Attach a metadata comment to a database table.

    We use the `Open Energy Metadata <https://github.com/OpenEnergyPlatform/
    oemetadata/blob/develop/metadata/v140/metadata_key_description.md>`_
    standard for describing our data. Metadata is stored as JSON in the
    table comment. After the comment is written, it is read back and cast
    to ``json``, so the database raises an error if it is not valid JSON.

    Parameters
    ----------
    json : str
        JSON string reflecting comment. It is inserted verbatim after ``IS``
        in ``COMMENT ON TABLE``, so it must already be a valid SQL string
        literal (e.g. enclosed in single quotes).
    schema : str
        The target table's database schema
    table : str
        Database table on which to put the given comment
    """
    qualified_name = f"{schema}.{table}"
    comment_statement = "COMMENT ON TABLE " + qualified_name + " IS " + json + ";"
    validation_query = f"SELECT obj_description('{qualified_name}'::regclass)::json"

    execute_sql(comment_statement)

    # Read the stored comment back as JSON; the cast fails on invalid JSON.
    execute_sql(validation_query)
97
98
99
@contextmanager
100
def session_scope():
101
    """Provide a transactional scope around a series of operations."""
102
    Session = sessionmaker(bind=engine())
103
    session = Session()
104
    try:
105
        yield session
106
        session.commit()
107
    except:
108
        session.rollback()
109
        raise
110
    finally:
111
        session.close()
112
113
114
def select_dataframe(sql, index_col=None):
    """Select data from local database as pandas.DataFrame

    Parameters
    ----------
    sql : str
        SQL query to be executed.
    index_col : str, optional
        Column(s) to set as index(MultiIndex). The default is None.

    Returns
    -------
    df : pandas.DataFrame
        Data returned from SQL statement. A warning is printed if the
        statement returned no rows.

    """
    df = pd.read_sql(sql, engine(), index_col=index_col)

    # Check the number of rows rather than ``df.size`` (rows * columns).
    # When ``index_col`` consumes every selected column, the frame has rows
    # but zero columns, and ``size`` would falsely report an empty result.
    if len(df.index) == 0:
        print(f"WARNING: No data returned by statement: \n {sql}")

    return df
137
138
def select_geodataframe(sql, index_col=None, geom_col="geom", epsg=3035):
    """Select data from local database as geopandas.GeoDataFrame

    Parameters
    ----------
    sql : str
        SQL query to be executed.
    index_col : str, optional
        Column(s) to set as index(MultiIndex). The default is None.
    geom_col : str, optional
        column name to convert to shapely geometries. The default is 'geom'.
    epsg : int, optional
        EPSG code specifying output projection. The default is 3035.

    Returns
    -------
    gdf : geopandas.GeoDataFrame
        Data returned from SQL statement, reprojected to ``epsg``. If the
        statement returned no rows, a warning is printed. An empty result
        without a CRS is returned without reprojection.

    """
    gdf = gpd.read_postgis(
        sql, engine(), index_col=index_col, geom_col=geom_col
    )

    if len(gdf.index) == 0:
        print(f"WARNING: No data returned by statement: \n {sql}")
        # With no rows there is no geometry to read an SRID from, so the
        # frame may come back without a CRS. ``to_crs`` refuses to transform
        # such naive geometries, and there is nothing to reproject anyway.
        if gdf.crs is None:
            return gdf

    return gdf.to_crs(epsg=epsg)
167