Passed
Push — dev ( 836244...247d11 )
by Stephan
01:23 queued 11s
created

data.db.session_scoped()   A

Complexity

Conditions 2

Size

Total Lines 25
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 6
nop 1
dl 0
loc 25
rs 10
c 0
b 0
f 0
1
from contextlib import contextmanager
2
import functools
3
4
from sqlalchemy import create_engine, text
5
from sqlalchemy.orm import sessionmaker
6
import pandas as pd
7
import geopandas as gpd
8
9
from egon.data import config
10
11
12
def credentials():
13
    """Return local database connection parameters.
14
15
    Returns
16
    -------
17
    dict
18
        Complete DB connection information
19
    """
20
    translated = {
21
        "--database-name": "POSTGRES_DB",
22
        "--database-password": "POSTGRES_PASSWORD",
23
        "--database-host": "HOST",
24
        "--database-port": "PORT",
25
        "--database-user": "POSTGRES_USER",
26
    }
27
    configuration = config.settings()["egon-data"]
28
    update = {
29
        translated[flag]: configuration[flag]
30
        for flag in configuration
31
        if flag in translated
32
    }
33
    configuration.update(update)
34
    return configuration
35
36
37
def engine():
38
    """Engine for local database."""
39
    db_config = credentials()
40
    return create_engine(
41
        f"postgresql+psycopg2://{db_config['POSTGRES_USER']}:"
42
        f"{db_config['POSTGRES_PASSWORD']}@{db_config['HOST']}:"
43
        f"{db_config['PORT']}/{db_config['POSTGRES_DB']}",
44
        echo=False,
45
    )
46
47
48
def execute_sql(sql_string):
49
    """Execute a SQL expression given as string.
50
51
    The SQL expression passed as plain string is convert to a
52
    `sqlalchemy.sql.expression.TextClause`.
53
54
    Parameters
55
    ----------
56
    sql_string : str
57
        SQL expression
58
59
    """
60
    engine_local = engine()
61
62
    with engine_local.connect().execution_options(autocommit=True) as con:
63
        con.execute(text(sql_string))
64
65
66
def submit_comment(json, schema, table):
67
    """Add comment to table.
68
69
    We use `Open Energy Metadata <https://github.com/OpenEnergyPlatform/
70
    oemetadata/blob/develop/metadata/v140/metadata_key_description.md>`_
71
    standard for describging our data. Metadata is stored as JSON in the table
72
    comment.
73
74
    Parameters
75
    ----------
76
    json : str
77
        JSON string reflecting comment
78
    schema : str
79
        The target table's database schema
80
    table : str
81
        Database table on which to put the given comment
82
    """
83
    prefix_str = "COMMENT ON TABLE {0}.{1} IS ".format(schema, table)
84
85
    check_json_str = (
86
        "SELECT obj_description('{0}.{1}'::regclass)::json".format(
87
            schema, table
88
        )
89
    )
90
91
    execute_sql(prefix_str + json + ";")
92
93
    # Query table comment and cast it into JSON
94
    # The query throws an error if JSON is invalid
95
    execute_sql(check_json_str)
96
97
98
@contextmanager
99
def session_scope():
100
    """Provide a transactional scope around a series of operations."""
101
    Session = sessionmaker(bind=engine())
102
    session = Session()
103
    try:
104
        yield session
105
        session.commit()
106
    except:
107
        session.rollback()
108
        raise
109
    finally:
110
        session.close()
111
112
113
def session_scoped(function):
114
    """Provide a session scope to a function.
115
116
    Can be used as a decorator like this:
117
118
    >>> @session_scoped
119
    ... def get_bind(session):
120
    ...     return session.get_bind()
121
    ...
122
    >>> get_bind()
123
    Engine(postgresql+psycopg2://egon:***@127.0.0.1:59734/egon-data)
124
125
    Note that the decorated function needs to accept a parameter named
126
    `session`, but is called without supplying a value for that parameter
127
    because the parameter's value will be filled in by `session_scoped`.
128
    Using this decorator allows saving an indentation level when defining
129
    such functions but it also has other usages.
130
    """
131
132
    @functools.wraps(function)
133
    def wrapped(*xs, **ks):
134
        with session_scope() as session:
135
            return function(session=session, *xs, **ks)
136
137
    return wrapped
138
139
140
def select_dataframe(sql, index_col=None):
141
    """ Select data from local database as pandas.DataFrame
142
143
    Parameters
144
    ----------
145
    sql : str
146
        SQL query to be executed.
147
    index_col : str, optional
148
        Column(s) to set as index(MultiIndex). The default is None.
149
150
    Returns
151
    -------
152
    df : pandas.DataFrame
153
        Data returned from SQL statement.
154
155
    """
156
157
    df = pd.read_sql(sql, engine(), index_col=index_col)
158
159
    if df.size == 0:
160
        print(f"WARNING: No data returned by statement: \n {sql}")
161
162
    return df
163
164
def select_geodataframe(sql, index_col=None, geom_col='geom', epsg=3035):
165
    """ Select data from local database as geopandas.GeoDataFrame
166
167
    Parameters
168
    ----------
169
    sql : str
170
        SQL query to be executed.
171
    index_col : str, optional
172
        Column(s) to set as index(MultiIndex). The default is None.
173
    geom_col : str, optional
174
        column name to convert to shapely geometries. The default is 'geom'.
175
    epsg : int, optional
176
        EPSG code specifying output projection. The default is 3035.
177
178
    Returns
179
    -------
180
    gdf : pandas.DataFrame
181
        Data returned from SQL statement.
182
183
    """
184
185
    gdf = gpd.read_postgis(
186
        sql, engine(), index_col=index_col, geom_col=geom_col
187
        ).to_crs(epsg=epsg)
188
189
    if gdf.size == 0:
190
        print(f"WARNING: No data returned by statement: \n {sql}")
191
192
    return gdf
193