Passed
Pull Request — dev (#826)
by unknown, created 01:49

data.datasets.electricity_demand_timeseries.tools   A

Complexity

Total Complexity 11

Size/Duplication

Total Lines 110
Duplicated Lines 0 %

Importance

Changes 0
Metric  Value
wmc     11
eloc    54
dl      0
loc     110
rs      10
c       0
b       0
f       0

5 Functions

Rating  Name                       Duplication  Size  Complexity
A       write_table_to_postgres()  0            16    3
A       random_ints_until_sum()    0            11    3
A       write_table_to_postgis()   0            25    2
A       specific_int_until_sum()   0            7     2
A       random_point_in_square()   0            30    1
from shapely.geometry import Point
import geopandas as gpd
import numpy as np
import pandas as pd

from egon.data import db

engine = db.engine()


def random_point_in_square(geom, tol):
    """
    Generate a random point within each square

    Parameters
    ----------
    geom: gpd.GeoSeries
        Geometries of the squares
    tol: float
        Tolerance to the square bounds

    Returns
    -------
    points: gpd.GeoSeries
        Series of random points
    """
    # shrink cell bounds by half the edge length so buildings are not placed
    # on the cell border
    xmin = geom.bounds["minx"] + tol / 2
    xmax = geom.bounds["maxx"] - tol / 2
    ymin = geom.bounds["miny"] + tol / 2
    ymax = geom.bounds["maxy"] - tol / 2

    # generate random coordinates within the shrunken bounds
    x = (xmax - xmin) * np.random.rand(geom.shape[0]) + xmin
    y = (ymax - ymin) * np.random.rand(geom.shape[0]) + ymin

    points = pd.Series([Point(coords) for coords in zip(x, y)])
    points = gpd.GeoSeries(points, crs="epsg:3035")

    return points

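# Usage sketch (hypothetical values, not part of the module): for a GeoSeries
# of square grid cells in EPSG:3035, each returned point lies inside its cell,
# at least tol / 2 away from the cell border.
#
#   from shapely.geometry import box
#   cells = gpd.GeoSeries(
#       [box(0, 0, 100, 100), box(100, 0, 200, 100)], crs="epsg:3035"
#   )
#   points = random_point_in_square(cells, tol=10.0)
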
# distribute amenities evenly
def specific_int_until_sum(s_sum, i_int):
    """
    Generate a list of integers `i_int` summing to `s_sum`. If `s_sum` is not
    divisible by `i_int`, the remainder is prepended as a smaller first value.
    """
    list_i = [] if s_sum % i_int == 0 else [s_sum % i_int]
    list_i += s_sum // i_int * [i_int]
    return list_i

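# Worked example (illustration only, not part of the module):
#   specific_int_until_sum(10, 3) returns [1, 3, 3, 3]
#   (the remainder 10 % 3 = 1 comes first, followed by 10 // 3 = 3 copies of 3)
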
def random_ints_until_sum(s_sum, m_max):
    """
    Generate random integers between 1 and `m_max` summing to `s_sum`.
    """
    list_r = []
    while s_sum > 0:
        r = np.random.randint(1, m_max + 1)
        r = r if r <= m_max and r < s_sum else s_sum
        list_r.append(r)
        s_sum -= r
    return list_r

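# Example (illustration only, not part of the module; output is random):
#   random_ints_until_sum(10, 4) may return e.g. [3, 4, 2, 1]
#   every value lies between 1 and m_max, and the list always sums to s_sum
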
def write_table_to_postgis(df, table, drop=True):
    """
    Append a GeoDataFrame to the database table.

    If `drop` is True, the table is dropped and recreated first.
    """

    # Only keep columns that are defined in the db table
    columns = [column.key for column in table.__table__.columns]
    df = df.loc[:, columns]

    if drop:
        table.__table__.drop(bind=engine, checkfirst=True)
        table.__table__.create(bind=engine)

    dtypes = {
        i: table.__table__.columns[i].type
        for i in table.__table__.columns.keys()
    }

    # Write new buildings incl. coords into db
    df.to_postgis(
        name=table.__tablename__,
        con=engine,
        if_exists="append",
        schema=table.__table_args__["schema"],
        dtype=dtypes,
    )

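# Usage sketch (hypothetical names, not part of the module): `table` is
# expected to be an SQLAlchemy declarative class with __tablename__ and a
# "schema" entry in __table_args__; `df` must contain every mapped column,
# since df.loc[:, columns] raises a KeyError for missing columns.
#
#   write_table_to_postgis(buildings_gdf, OsmBuildingsSynthetic, drop=True)
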
def write_table_to_postgres(df, table, drop=True):
    """
    Append a DataFrame to the database table via a bulk insert.

    If `drop` is True, the table is dropped and recreated first.
    """

    # Only keep columns that are defined in the db table
    columns = [column.key for column in table.__table__.columns]
    df = df.loc[:, columns]

    if drop:
        table.__table__.drop(bind=engine, checkfirst=True)
        table.__table__.create(bind=engine)

    # Write peak loads into db
    with db.session_scope() as session:
        session.bulk_insert_mappings(
            table,
            df.to_dict(orient="records"),
        )
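
# Usage sketch (hypothetical names, not part of the module): as above, `df`
# must contain every column mapped on the declarative class; the rows are
# inserted in one bulk operation inside a session scope.
#
#   write_table_to_postgres(peak_loads_df, BuildingPeakLoads, drop=False)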