Passed
Pull Request — dev (#934)
by
unknown
01:43
created

data.datasets.electricity_demand_timeseries.tools   A

Complexity

Total Complexity 13

Size/Duplication

Total Lines 169
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 13
eloc 75
dl 0
loc 169
rs 10
c 0
b 0
f 0

6 Functions

Rating   Name   Duplication   Size   Complexity  
A write_table_to_postgres() 0 23 2
A random_ints_until_sum() 0 11 3
A write_table_to_postgis() 0 25 2
A specific_int_until_sum() 0 7 2
A psql_insert_copy() 0 28 3
A random_point_in_square() 0 30 1
1
from io import StringIO
2
import csv
3
4
from shapely.geometry import Point
5
import geopandas as gpd
6
import numpy as np
7
import pandas as pd
8
9
from egon.data import db
10
11
# Module-level database engine, created once when this module is imported.
# The writer functions in this module take their own `engine` argument,
# which shadows this name inside their bodies.
engine = db.engine()
12
13
14
def random_point_in_square(geom, tol):
    """
    Draw one uniformly distributed random point inside each geometry's
    bounding box, keeping a margin of `tol / 2` to every edge.

    Parameters
    ----------
    geom: gpd.GeoSeries
        Geometries of the squares. Only `.bounds` (columns minx, miny,
        maxx, maxy) and `.shape[0]` are used.
    tol: float
        Tolerance to the square bounds; half of it is kept free on
        each side.

    Returns
    -------
    gpd.GeoSeries
        One random point per input geometry, tagged with CRS "epsg:3035".
        The result carries a fresh default index, not the index of `geom`.
    """
    bounds = geom.bounds
    margin = tol / 2
    n_cells = geom.shape[0]

    # Shrink every bounding box by the margin so that no point ends up
    # directly on the cell border.
    x_low = bounds["minx"] + margin
    x_high = bounds["maxx"] - margin
    y_low = bounds["miny"] + margin
    y_high = bounds["maxy"] - margin

    # Scale uniform [0, 1) samples into the shrunken boxes. x is drawn
    # before y so the random number stream is consumed in the same order.
    x_coords = (x_high - x_low) * np.random.rand(n_cells) + x_low
    y_coords = (y_high - y_low) * np.random.rand(n_cells) + y_low

    locations = pd.Series(list(map(Point, zip(x_coords, y_coords))))
    return gpd.GeoSeries(locations, crs="epsg:3035")
44
45
46
# distribute amenities evenly
47
def specific_int_until_sum(s_sum, i_int):
    """
    Split `s_sum` into a list of chunks of size `i_int`.

    The returned values always sum to `s_sum`. If `s_sum` is not a multiple
    of `i_int`, the remainder (smaller than `i_int`) is the FIRST element,
    followed by the full-size chunks.

    Parameters
    ----------
    s_sum: int
        Total that the returned values add up to.
    i_int: int
        Size of a full chunk.

    Returns
    -------
    list of int
        E.g. ``specific_int_until_sum(10, 3) == [1, 3, 3, 3]``; an empty
        list for ``s_sum == 0``.

    Raises
    ------
    ZeroDivisionError
        If `i_int` is 0.
    """
    n_full, remainder = divmod(s_sum, i_int)
    # Leading remainder only when the division is not exact
    list_i = [remainder] if remainder else []
    list_i += n_full * [i_int]
    return list_i
54
55
56
def random_ints_until_sum(s_sum, m_max):
    """
    Generate random positive integers <= `m_max` summing to `s_sum`.

    Values are drawn uniformly from 1..`m_max` (both inclusive) until the
    target is reached; the final value is capped to whatever is still
    missing, so the list always adds up to exactly `s_sum`.

    Parameters
    ----------
    s_sum: int
        Target sum. For values <= 0 an empty list is returned.
    m_max: int
        Largest value a single element may take.

    Returns
    -------
    list of int
        Random integers in [1, `m_max`] whose sum is `s_sum`.

    Raises
    ------
    ValueError
        Raised by `np.random.randint` if `m_max` < 1 while `s_sum` > 0.
    """
    list_r = []
    while s_sum > 0:
        # randint excludes the upper bound, hence `m_max + 1`. The draw is
        # capped at the remaining amount so the total never overshoots.
        r = min(s_sum, np.random.randint(1, m_max + 1))
        list_r.append(r)
        s_sum -= r
    return list_r
67
68
69
def write_table_to_postgis(df, table, engine, drop=True):
    """
    Write a GeoDataFrame into the PostGIS table described by `table`.

    Only the columns declared on the table are written; the declared column
    types are passed on as dtypes. Rows are appended to the table.

    Parameters
    ----------
    df: gpd.GeoDataFrame
        Data to write. Must contain every column declared on `table`
        (a missing column raises a KeyError); extra columns are dropped.
    table: declarative SQLAlchemy class
        Table definition; its `__table__` provides name, schema, columns.
    engine: sqlalchemy.engine.Engine
        Engine used for dropping/creating the table and for writing.
    drop: bool
        If True (default), the table is dropped (if it exists) and
        recreated before the data is appended.
    """
    db_table = table.__table__

    # Only take in db table defined columns, together with their types
    dtypes = {column.key: column.type for column in db_table.columns}
    df = df.loc[:, list(dtypes)]

    if drop:
        db_table.drop(bind=engine, checkfirst=True)
        db_table.create(bind=engine)

    # Name and schema are read from the Table object (as done in
    # write_table_to_postgres) instead of `__tablename__` /
    # `__table_args__["schema"]`: the latter fails when `__table_args__`
    # is a tuple or carries no "schema" key.
    df.to_postgis(
        name=db_table.name,
        con=engine,
        if_exists="append",
        schema=db_table.schema,
        dtype=dtypes,
    )
95
96
97
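# NOTE: disabled session-based variant (uses bulk_insert_mappings); the
# active write_table_to_postgres further down writes via COPY instead.
# def write_table_to_postgres(df, table, drop=True):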
98
#     """"""
99
#
100
#     # Only take in db table defined columns
101
#     columns = [column.key for column in table.__table__.columns]
102
#     df = df.loc[:, columns]
103
#
104
#     if drop:
105
#         table.__table__.drop(bind=engine, checkfirst=True)
106
#         table.__table__.create(bind=engine)
107
#
108
#     # Write peak loads into db
109
#     with db.session_scope() as session:
110
#         session.bulk_insert_mappings(
111
#             table,
112
#             df.to_dict(orient="records"),
113
#         )
114
115
116
def psql_insert_copy(table, conn, keys, data_iter):
    """
    Insert rows through a single ``COPY ... FROM STDIN WITH CSV`` statement.

    Has the signature of a pandas ``to_sql(method=...)`` callable.

    Parameters
    ----------
    table : pandas.io.sql.SQLTable
        Target table; its `name` and (optional) `schema` are used.
    conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
        Connection whose `.connection` attribute is the DBAPI connection.
    keys : list of str
        Column names
    data_iter : Iterable that iterates the values to be inserted
    """
    # Qualify the target with its schema only when one is set
    target = table.name
    if table.schema:
        target = "{}.{}".format(table.schema, table.name)

    quoted_columns = ", ".join(['"{}"'.format(key) for key in keys])
    copy_statement = "COPY {} ({}) FROM STDIN WITH CSV".format(
        target, quoted_columns
    )

    # The raw DBAPI connection is needed to obtain a cursor that offers
    # `copy_expert`.
    raw_connection = conn.connection
    with raw_connection.cursor() as cursor:
        # Serialise all rows as CSV in memory, then rewind for reading
        buffer = StringIO()
        csv.writer(buffer).writerows(data_iter)
        buffer.seek(0)

        cursor.copy_expert(sql=copy_statement, file=buffer)
144
145
146
def write_table_to_postgres(
    df, db_table, engine, drop=False, index=False, if_exists="append"
):
    """
    Write a DataFrame into the table described by `db_table`.

    Only the columns declared on the table are written; the declared column
    types are passed on as dtypes. The rows are inserted with
    `psql_insert_copy`, i.e. via a COPY statement.

    Parameters
    ----------
    df: pd.DataFrame
        Data to write. Must contain every column declared on `db_table`
        (a missing column raises a KeyError); extra columns are dropped.
    db_table: declarative SQLAlchemy class
        Table definition; its `__table__` provides name, schema, columns.
    engine: sqlalchemy.engine.Engine
        Engine used for dropping/creating the table and for writing.
    drop: bool
        If True, the table is dropped (if it exists) and recreated before
        writing. Default: False.
    index: bool
        Passed to `DataFrame.to_sql`: write the DataFrame index as a
        column. Default: False.
    if_exists: str
        Passed to `DataFrame.to_sql`. Default: "append".
    """

    # Only take in db table defined columns and dtypes
    columns = {
        column.key: column.type for column in db_table.__table__.columns
    }
    # Index with an explicit list of names rather than a dict view
    df = df.loc[:, list(columns)]

    if drop:
        db_table.__table__.drop(bind=engine, checkfirst=True)
        db_table.__table__.create(bind=engine)

    df.to_sql(
        name=db_table.__table__.name,
        schema=db_table.__table__.schema,
        con=engine,
        if_exists=if_exists,
        index=index,
        method=psql_insert_copy,
        dtype=columns,
    )
170