Passed
Pull Request — dev (#1008)
by
unknown
01:48
created

allocate_home_batteries_to_buildings()   C

Complexity

Conditions 10

Size

Total Lines 105
Code Lines 53

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 53
dl 0
loc 105
rs 5.7381
c 0
b 0
f 0
cc 10
nop 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex classes like data.datasets.storages.home_batteries.allocate_home_batteries_to_buildings() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from loguru import logger
2
from numpy.random import RandomState
3
from sqlalchemy import Column, Float, Integer, String
4
from sqlalchemy.ext.declarative import declarative_base
5
import numpy as np
6
import pandas as pd
7
8
from egon.data import config, db
9
10
# Declarative base class that the ORM model defined in this module
# (EgonHomeBatteries) inherits from.
Base = declarative_base()
11
12
13
def get_cbat_pbat_ratio():
    """
    Read the ``max_hours`` value of the home battery carrier.

    The value is queried from the ``etrago_storage`` source table configured
    for the ``home_batteries`` dataset. The caller multiplies the battery
    power by this value to obtain the battery storage capacity.

    Returns
    -------
    int
        ``max_hours`` of the first row returned for carrier
        ``home_battery``, truncated to an integer.
    """
    storage_source = config.datasets()["home_batteries"]["sources"][
        "etrago_storage"
    ]
    schema = storage_source["schema"]
    table = storage_source["table"]

    query = f"""
    SELECT max_hours
    FROM {schema}
    .{table}
    WHERE carrier = 'home_battery'
    """

    max_hours_df = db.select_dataframe(query)

    # only the first cell of the result is used
    return int(max_hours_df.iat[0, 0])
34
35
36
def _draw_matching_pv_sample(pv_df, bat_cap, grid_ratio, seed, rtol, max_it):
    """
    Randomly pick pv rooftop systems whose summed capacity matches bat_cap.

    Starting from a sample size derived from ``grid_ratio``, random subsets
    of ``pv_df`` are drawn repeatedly. The subset whose summed ``capacity``
    is closest to ``bat_cap`` is kept. After each draw the sample size is
    increased by one if the drawn sum was below ``bat_cap`` and decreased by
    one otherwise. The search stops as soon as the best subset is within
    ``rtol`` of ``bat_cap`` or after ``max_it`` draws.

    Parameters
    ----------
    pv_df : pandas.DataFrame
        Candidate pv rooftop systems; must contain a ``capacity`` column.
    bat_cap : float
        Battery capacity the summed pv capacity should match.
    grid_ratio : float
        Ratio of ``bat_cap`` to the summed capacity of ``pv_df``; used to
        derive the initial sample size.
    seed : int
        Seed for the random state, so that draws are reproducible.
    rtol : float
        Relative tolerance passed to :func:`numpy.isclose`.
    max_it : int
        Maximum number of additional draws after the initial one.

    Returns
    -------
    pandas.DataFrame
        The best subset of ``pv_df`` found.
    """
    random_state = RandomState(seed=seed)
    n_total = len(pv_df)

    # The sample size must never exceed the population, otherwise
    # DataFrame.sample (without replacement) raises a ValueError. This can
    # only matter if grid_ratio is above 1, i.e. for thresholds above 1.
    n = min(max(int(n_total * grid_ratio), 1), n_total)

    best_df = pv_df.sample(n=n, random_state=random_state)

    i = 0

    while (
        not np.isclose(best_df.capacity.sum(), bat_cap, rtol=rtol)
        and i < max_it
    ):
        sample_df = pv_df.sample(n=n, random_state=random_state)
        sample_cap = sample_df.capacity.sum()

        # keep the candidate that is closest to the target capacity
        if abs(best_df.capacity.sum() - bat_cap) > abs(sample_cap - bat_cap):
            best_df = sample_df.copy()

        i += 1

        # steer the sample size towards the target for the next draw
        if sample_cap < bat_cap:
            n = min(n + 1, n_total)
        else:
            n = max(n - 1, 1)

    return best_df


def allocate_home_batteries_to_buildings():
    """
    Allocate home battery storage systems to buildings with pv rooftop systems

    For every configured scenario the home battery power per bus is read
    from the configured ``storage`` source table and converted to a battery
    capacity using the ratio returned by :func:`get_cbat_pbat_ratio`. For
    every bus the pv rooftop systems of that bus are loaded. If the ratio of
    battery capacity to total pv capacity is below the configured
    ``cbat_ppv_ratio``, a random subset of pv systems is selected whose
    summed capacity approximates the battery capacity. The battery capacity
    is then distributed over the selected buildings proportionally to their
    pv capacity. The combined result of all scenarios and buses is written
    to the database via :func:`create_table`.
    """
    dataset_cfg = config.datasets()["home_batteries"]

    # get constants
    constants = dataset_cfg["constants"]
    scenarios = constants["scenarios"]
    cbat_ppv_ratio = constants["cbat_ppv_ratio"]
    rtol = constants["rtol"]
    max_it = constants["max_it"]
    cbat_pbat_ratio = get_cbat_pbat_ratio()

    sources = dataset_cfg["sources"]

    # query template for the pv rooftop systems of one bus; it does not
    # depend on the scenario, so it is defined once outside the loop
    pv_sql = """
        SELECT building_id, capacity
        FROM supply.egon_power_plants_pv_roof_building
        WHERE scenario = '{}'
        AND bus_id = {}
        """

    df_list = []

    for scenario in scenarios:
        # get home battery capacity per mv grid id
        sql = f"""
        SELECT el_capacity as p_nom_min, bus_id as bus FROM
        {sources["storage"]["schema"]}
        .{sources["storage"]["table"]}
        WHERE carrier = 'home_battery'
        AND scenario = '{scenario}';
        """

        home_batteries_df = db.select_dataframe(sql)

        home_batteries_df = home_batteries_df.assign(
            bat_cap=home_batteries_df.p_nom_min * cbat_pbat_ratio
        )

        for bus_id, bat_cap in home_batteries_df[
            ["bus", "bat_cap"]
        ].itertuples(index=False):
            pv_df = db.select_dataframe(pv_sql.format(scenario, bus_id))

            grid_ratio = bat_cap / pv_df.capacity.sum()

            if grid_ratio > cbat_ppv_ratio:
                logger.warning(
                    f"In Grid {bus_id} and scenario {scenario}, the ratio of "
                    f"home storage capacity to pv rooftop capacity is above 1"
                    f" ({grid_ratio: g}). The storage capacity of pv rooftop "
                    f"systems will be high."
                )

            if grid_ratio < cbat_ppv_ratio:
                # only a subset of the pv systems gets a battery
                best_df = _draw_matching_pv_sample(
                    pv_df, bat_cap, grid_ratio, bus_id, rtol, max_it
                )

                if not np.isclose(best_df.capacity.sum(), bat_cap, rtol=rtol):
                    logger.warning(
                        f"No suitable generators could be found in Grid "
                        f"{bus_id} and scenario {scenario} to achieve the "
                        f"desired ratio between battery capacity and pv "
                        f"rooftop capacity. The ratio will be "
                        f"{bat_cap / best_df.capacity.sum()}."
                    )

                pv_df = best_df.copy()

            # distribute the battery capacity proportionally to pv capacity
            capacity = pv_df.capacity / pv_df.capacity.sum() * bat_cap

            bat_df = pv_df.drop(columns=["capacity"]).assign(
                capacity=capacity,
                p_nom=capacity / cbat_pbat_ratio,
                scenario=scenario,
                bus_id=bus_id,
            )

            df_list.append(bat_df)

    create_table(pd.concat(df_list, ignore_index=True))
141
142
143
class EgonHomeBatteries(Base):
    """
    ORM model of the table that maps home batteries to buildings.

    Table name and schema are read from the ``targets`` section of the
    ``home_batteries`` dataset configuration when the class is defined.
    """

    targets = config.datasets()["home_batteries"]["targets"]

    __tablename__ = targets["home_batteries"]["table"]
    __table_args__ = dict(schema=targets["home_batteries"]["schema"])

    # primary key; filled from the DataFrame index in create_table()
    index = Column(Integer, index=True, primary_key=True)
    scenario = Column(String)
    bus_id = Column(Integer)
    building_id = Column(Integer)
    p_nom = Column(Float)
    capacity = Column(Float)
155
156
157
def create_table(df):
    """
    Recreate the mapping table home battery <-> building id and fill it.

    An existing table is dropped first, then created anew. The index of
    ``df`` is turned into a regular column before the rows are appended.

    Parameters
    ----------
    df : pandas.DataFrame
        Rows to write to the table.
    """
    engine = db.engine()
    table = EgonHomeBatteries.__table__

    # start from an empty table on every run
    table.drop(bind=engine, checkfirst=True)
    table.create(bind=engine, checkfirst=True)

    records = df.reset_index()
    records.to_sql(
        name=table.name,
        schema=table.schema,
        con=engine,
        if_exists="append",
        index=False,
    )
171