Passed
Pull Request — dev (#187)
by Stephan
01:14
created

data.db.airflow_db_connection()   A

Complexity

Conditions 1

Size

Total Lines 11
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 0
dl 0
loc 11
rs 10
c 0
b 0
f 0
1
from contextlib import contextmanager
2
import os
3
4
from sqlalchemy import create_engine, text
5
from sqlalchemy.orm import sessionmaker
6
import yaml
7
import pandas as pd
8
import geopandas as gpd
9
10
from egon.data import config
11
12
13
def credentials():
    """Return local database connection parameters.

    The parameters are read from the ``egon-data`` section of the YAML file
    given by the first entry of ``config.paths(pid="*")``. Command line flag
    names (e.g. ``--database-name``) are translated to the keys used when
    building the database URL (e.g. ``POSTGRES_DB``). Entries without a
    translation are dropped.

    Returns
    -------
    dict
        Complete DB connection information

    Raises
    ------
    FileNotFoundError
        If the configuration file does not exist, because then there are no
        connection parameters to return.
    """
    translated = {
        "--database-name": "POSTGRES_DB",
        "--database-password": "POSTGRES_PASSWORD",
        "--database-host": "HOST",
        "--database-port": "PORT",
        "--database-user": "POSTGRES_USER",
    }
    custom = config.paths(pid="*")[0]
    if not custom.is_file():
        # Without this guard the function would fail later with an
        # UnboundLocalError on the return statement.
        raise FileNotFoundError(
            f"egon-data configuration file not found: {custom}"
        )
    with open(custom, encoding="utf-8") as f:
        configuration = yaml.safe_load(f)["egon-data"]
    return {
        translated[flag]: value
        for flag, value in configuration.items()
        if flag in translated
    }
0 ignored issues
show
introduced by
The variable configuration does not seem to be defined in case custom.is_file() on line 29 is False. Are you sure this can never be the case?
Loading history...
38
39
40
def engine():
    """Engine for local database.

    Builds a ``postgresql+psycopg2`` URL from the parameters returned by
    ``credentials()``. The password is percent-encoded before it is placed
    in the URL. Without the encoding, characters with a special meaning in
    URLs (``@``, ``:``, ``/``, ``%``, ...) would corrupt the URL or the
    parsed password.

    Returns
    -------
    sqlalchemy.engine.Engine
        Engine connected to the local database.
    """
    from urllib.parse import quote

    db_config = credentials()
    # str() because YAML may parse an all-digit password as an int;
    # safe="" so that "/" is encoded as well.
    password = quote(str(db_config["POSTGRES_PASSWORD"]), safe="")
    return create_engine(
        f"postgresql+psycopg2://{db_config['POSTGRES_USER']}:"
        f"{password}@{db_config['HOST']}:"
        f"{db_config['PORT']}/{db_config['POSTGRES_DB']}",
        echo=False,
    )
49
50
51
def execute_sql(sql_string):
    """Execute a SQL expression given as a string.

    The plain string is wrapped in a
    `sqlalchemy.sql.expression.TextClause` and executed on a new connection
    to the local database. The ``autocommit`` execution option is set on
    that connection. Nothing is returned, so any result rows are discarded.

    Parameters
    ----------
    sql_string : str
        SQL expression
    """
    db = engine()
    connection = db.connect().execution_options(autocommit=True)
    with connection as con:
        statement = text(sql_string)
        con.execute(statement)
67
68
69
def submit_comment(json, schema, table):
    """Add a comment to a table and check that it is valid JSON.

    We use `Open Energy Metadata <https://github.com/OpenEnergyPlatform/
    oemetadata/blob/develop/metadata/v140/metadata_key_description.md>`_
    standard for describing our data. Metadata is stored as JSON in the
    table comment.

    Parameters
    ----------
    json : str
        JSON string reflecting comment. It is appended verbatim after
        ``COMMENT ON TABLE <schema>.<table> IS``, so it presumably must
        already be quoted as a SQL string literal. Confirm with callers.
    schema : str
        The target table's database schema
    table : str
        Database table on which to put the given comment
    """
    statements = (
        # Store the comment on the table.
        "COMMENT ON TABLE {0}.{1} IS ".format(schema, table) + json + ";",
        # Read the comment back and cast it to JSON. The query throws an
        # error if the stored comment is not valid JSON.
        "SELECT obj_description('{0}.{1}'::regclass)::json".format(
            schema, table
        ),
    )
    for statement in statements:
        execute_sql(statement)
99
100
101
@contextmanager
def session_scope():
    """Yield an ORM session that is committed or rolled back as a unit.

    The session is bound to a new ``engine()``. It is committed when the
    ``with`` block finishes normally. It is rolled back, and the exception
    re-raised, when the block or the commit raises. It is always closed.
    """
    session = sessionmaker(bind=engine())()
    try:
        yield session
        session.commit()
    except BaseException:
        # Equivalent to a bare ``except:``. It also covers interrupts, so
        # the rollback runs before the exception propagates.
        session.rollback()
        raise
    finally:
        session.close()
114
115
116
def select_dataframe(sql, index_col=None):
    """Select data from local database as pandas.DataFrame.

    A warning is printed when the query returns no data.

    Parameters
    ----------
    sql : str
        SQL query to be executed.
    index_col : str, optional
        Column(s) to set as index(MultiIndex). The default is None.

    Returns
    -------
    df : pandas.DataFrame
        Data returned from SQL statement.
    """
    result = pd.read_sql(sql, engine(), index_col=index_col)
    # ``empty`` is True when either axis has length 0, i.e. when size == 0.
    if result.empty:
        print(f"WARNING: No data returned by statement: \n {sql}")
    return result
139
140
def select_geodataframe(sql, index_col=None, geom_col='geom', epsg=3035):
    """Select data from local database as geopandas.GeoDataFrame.

    The result is reprojected to ``epsg``. A warning is printed when the
    query returns no data.

    Parameters
    ----------
    sql : str
        SQL query to be executed.
    index_col : str, optional
        Column(s) to set as index(MultiIndex). The default is None.
    geom_col : str, optional
        column name to convert to shapely geometries. The default is 'geom'.
    epsg : int, optional
        EPSG code specifying output projection. The default is 3035.

    Returns
    -------
    gdf : geopandas.GeoDataFrame
        Data returned from SQL statement, projected to ``epsg``.

    Raises
    ------
    ValueError
        Raised by ``GeoDataFrame.to_crs`` if the queried geometries carry
        no CRS, since they cannot be reprojected then.
    """
    gdf = gpd.read_postgis(
        sql, engine(), index_col=index_col, geom_col=geom_col
    ).to_crs(epsg=epsg)

    if gdf.size == 0:
        print(f"WARNING: No data returned by statement: \n {sql}")

    return gdf
169