storage_level_constraint()   F
last analyzed

Complexity

Conditions 11

Size

Total Lines 316
Code Lines 196

Duplication

Lines 143
Ratio 45.25 %

Importance

Changes 0
Metric Value
eloc 196
dl 143
loc 316
rs 3.78
c 0
b 0
f 0
cc 11
nop 6

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like solph.constraints.storage_level.storage_level_constraint() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""A constraint to allow flows to and from a storage based on the storage level.
2
3
SPDX-FileCopyrightText: Patrik Schönfeldt
4
SPDX-FileCopyrightText: Johannes Kochems
5
SPDX-FileCopyrightText: Deutsches Zentrum für Luft- und Raumfahrt (DLR)
6
7
SPDX-License-Identifier: MIT
8
"""
9
10
from pyomo import environ as po
11
12
13
def storage_level_constraint(
    model,
    name,
    storage_component,
    multiplexer_bus,
    input_levels,
    output_levels,
):
    r"""
    Add constraints to implement storage content based access.

    As a GenericStorage just allows exactly one input and one output,
    an additional bus, the multiplexer_bus, is used for the connections.
    Note that all Flow objects connected to the multiplexer_bus have to have
    a nominal_capacity.

    The function does not return anything; it attaches the following
    components to ``model`` (``<name>`` is the value of ``name``):

    * ``<name>_OUTPUTS`` / ``<name>_INPUTS``: sets of the level buses,
    * ``<name>_active_output`` / ``<name>_active_input``: binary variables,
    * ``<name>_output_active_constraint`` /
      ``<name>_input_active_constraint`` (plus a ``...build`` BuildAction
      each): couple the binaries to the relative storage level,
    * ``<name>_output_constraint`` / ``<name>_input_constraint``: limit the
      flows between the level buses and the multiplexer_bus.

    Depending on ``model.TSAM_MODE`` either the plain time step formulation
    or the formulation for aggregated time series (TSAM) is built.

    Parameters
    ----------
    model : oemof.solph.Model
        Model to which the constraint is added.
    name : string
        Name of the multiplexer.
    storage_component : oemof.solph.components.GenericStorage
        Storage component whose content should mandate
        the possible inputs and outputs.
    multiplexer_bus : oemof.solph.Bus
        Bus which connects the input and output levels to the storage.
    input_levels : dictionary with oemof.solph.Bus as keys and float as values
        Dictionary of buses which act as inputs and corresponding levels
    output_levels : dictionary with oemof.solph.Bus as keys and float as values
        Dictionary of buses which act as outputs and corresponding levels

    Verbose description can be found in https://arxiv.org/abs/2211.14080
    """

    def _relative_content(m, t):
        """Return the storage content at index t over nominal capacity."""
        return (
            m.GenericStorageBlock.storage_content[storage_component, t]
            / storage_component.nominal_storage_capacity
        )

    def _relative_content_tsam(m, p, i, k, g, t):
        """Return the TSAM storage level over nominal capacity.

        The level is the intra-cluster delta (read at offset ``g + 1``)
        plus the inter-period content of period ``i`` multiplied by
        ``(1 - loss_rate[t]) ** (g * timeincrement[t])``.
        """
        storage_block = m.GenericStorageBlock
        return (
            storage_block.intra_storage_delta[storage_component, p, k, g + 1]
            + storage_block.inter_storage_content[storage_component, i]
            * (1 - storage_component.loss_rate[t])
            ** (g * m.timeincrement[t])
        ) / storage_component.nominal_storage_capacity

    def _outputs():
        """Build the output side for the plain time step formulation."""
        OUTPUTS = po.Set(initialize=output_levels.keys())
        setattr(model, f"{name}_OUTPUTS", OUTPUTS)

        active_output = po.Var(
            OUTPUTS, model.TIMESTEPS, domain=po.Binary, bounds=(0, 1)
        )
        setattr(model, f"{name}_active_output", active_output)

        constraint_name = f"{name}_output_active_constraint"

        def _output_active_rule(m):
            r"""
            .. math::
                y_n \le E(t) / E_n
            """
            active_constraint = getattr(m, constraint_name)
            for t in m.TIMESTEPS:
                for o in output_levels:
                    # The content is read at index t + 1 here.
                    active_constraint.add(
                        (o, t),
                        _relative_content(m, t + 1)
                        >= active_output[o, t] * output_levels[o],
                    )

        # The constraint is created empty and filled by the BuildAction.
        setattr(
            model,
            constraint_name,
            po.Constraint(
                OUTPUTS,
                model.TIMESTEPS,
                noruleinit=True,
            ),
        )
        setattr(
            model,
            constraint_name + "build",
            po.BuildAction(rule=_output_active_rule),
        )

        # Define constraints on the output flows
        def _constraint_output_rule(m, o, t):
            return (
                m.flow[multiplexer_bus, o, t]
                / m.flows[multiplexer_bus, o].nominal_capacity
                <= active_output[o, t]
            )

        setattr(
            model,
            f"{name}_output_constraint",
            po.Constraint(
                OUTPUTS,
                model.TIMESTEPS,
                rule=_constraint_output_rule,
            ),
        )

    def _outputs_tsam():
        """Build the output side for aggregated time series (TSAM)."""
        OUTPUTS = po.Set(initialize=output_levels.keys())
        setattr(model, f"{name}_OUTPUTS", OUTPUTS)

        active_output = po.Var(
            OUTPUTS,
            model.TIMEINDEX_TYPICAL_CLUSTER_OFFSET,
            domain=po.Binary,
            bounds=(0, 1),
        )
        setattr(model, f"{name}_active_output", active_output)

        constraint_name = f"{name}_output_active_constraint"

        def _output_active_rule(m):
            r"""
            .. math::
                y_n \le E(t) / E_n
            """
            active_constraint = getattr(m, constraint_name)
            for p, i, g in m.TIMEINDEX_CLUSTER:
                # k is looked up from the "order" entry of period p.
                k = m.es.tsa_parameters[p]["order"][i]
                t = m.get_timestep_from_tsam_timestep(p, k, g)
                for o in output_levels:
                    active_constraint.add(
                        (o, p, i, g),
                        _relative_content_tsam(m, p, i, k, g, t)
                        >= active_output[o, p, k, g] * output_levels[o],
                    )

        setattr(
            model,
            constraint_name,
            po.Constraint(
                OUTPUTS,
                model.TIMEINDEX_CLUSTER,
                noruleinit=True,
            ),
        )
        setattr(
            model,
            constraint_name + "build",
            po.BuildAction(rule=_output_active_rule),
        )

        # Define constraints on the output flows
        def _constraint_output_rule(m, o, p, k, g):
            t = m.get_timestep_from_tsam_timestep(p, k, g)
            # NOTE(review): the flow is indexed with an extra p here, unlike
            # the non-TSAM rule -- confirm against the model's flow index.
            return (
                m.flow[multiplexer_bus, o, p, t]
                / m.flows[multiplexer_bus, o].nominal_capacity
                <= active_output[o, p, k, g]
            )

        setattr(
            model,
            f"{name}_output_constraint",
            po.Constraint(
                OUTPUTS,
                model.TIMEINDEX_TYPICAL_CLUSTER,
                rule=_constraint_output_rule,
            ),
        )

    def _inputs():
        """Build the input side for the plain time step formulation."""
        INPUTS = po.Set(initialize=input_levels.keys())
        setattr(model, f"{name}_INPUTS", INPUTS)

        inactive_input = po.Var(
            INPUTS, model.TIMESTEPS, domain=po.Binary, bounds=(0, 1)
        )
        # The variable marks an input as inactive but is registered under
        # the attribute name "<name>_active_input"; the name is kept so that
        # existing code accessing the attribute keeps working.
        setattr(model, f"{name}_active_input", inactive_input)

        constraint_name = f"{name}_input_active_constraint"

        def _input_active_rule(m):
            r"""
            .. math::
                \hat{y}_n \ge (E(t) - E_n) / E_{max}
            """
            active_constraint = getattr(m, constraint_name)
            for t in m.TIMESTEPS:
                for i in input_levels:
                    active_constraint.add(
                        (i, t),
                        _relative_content(m, t) - input_levels[i]
                        <= inactive_input[i, t],
                    )

        setattr(
            model,
            constraint_name,
            po.Constraint(
                INPUTS,
                model.TIMESTEPS,
                noruleinit=True,
            ),
        )
        setattr(
            model,
            constraint_name + "build",
            po.BuildAction(rule=_input_active_rule),
        )

        # Define constraints on the input flows
        def _constraint_input_rule(m, i, t):
            return (
                m.flow[i, multiplexer_bus, t]
                / m.flows[i, multiplexer_bus].nominal_capacity
                <= 1 - inactive_input[i, t]
            )

        setattr(
            model,
            f"{name}_input_constraint",
            po.Constraint(
                INPUTS,
                model.TIMESTEPS,
                rule=_constraint_input_rule,
            ),
        )

    def _inputs_tsam():
        """Build the input side for aggregated time series (TSAM)."""
        INPUTS = po.Set(initialize=input_levels.keys())
        setattr(model, f"{name}_INPUTS", INPUTS)

        inactive_input = po.Var(
            INPUTS,
            model.TIMEINDEX_TYPICAL_CLUSTER_OFFSET,
            domain=po.Binary,
            bounds=(0, 1),
        )
        # Registered as "<name>_active_input" although it flags inactivity;
        # the attribute name is kept for backward compatibility.
        setattr(model, f"{name}_active_input", inactive_input)

        constraint_name = f"{name}_input_active_constraint"

        def _input_active_rule(m):
            r"""
            .. math::
                \hat{y}_n \ge (E(t) - E_n) / E_{max}
            """
            active_constraint = getattr(m, constraint_name)
            for p, i, g in m.TIMEINDEX_CLUSTER:
                k = m.es.tsa_parameters[p]["order"][i]
                t = m.get_timestep_from_tsam_timestep(p, k, g)
                for inp in input_levels:
                    # NOTE(review): the level is read at offset g + 1 while
                    # the non-TSAM input rule reads the content at t (not
                    # t + 1) -- confirm that this is intended.
                    active_constraint.add(
                        (inp, p, i, g),
                        _relative_content_tsam(m, p, i, k, g, t)
                        - input_levels[inp]
                        <= inactive_input[inp, p, k, g],
                    )

        setattr(
            model,
            constraint_name,
            po.Constraint(
                INPUTS,
                model.TIMEINDEX_CLUSTER,
                noruleinit=True,
            ),
        )
        setattr(
            model,
            constraint_name + "build",
            po.BuildAction(rule=_input_active_rule),
        )

        # Define constraints on the input flows
        def _constraint_input_rule(m, i, p, k, g):
            t = m.get_timestep_from_tsam_timestep(p, k, g)
            # NOTE(review): the flow is indexed with an extra p here, unlike
            # the non-TSAM rule -- confirm against the model's flow index.
            return (
                m.flow[i, multiplexer_bus, p, t]
                / m.flows[i, multiplexer_bus].nominal_capacity
                <= 1 - inactive_input[i, p, k, g]
            )

        setattr(
            model,
            f"{name}_input_constraint",
            po.Constraint(
                INPUTS,
                model.TIMEINDEX_TYPICAL_CLUSTER,
                rule=_constraint_input_rule,
            ),
        )

    # Outputs are built before inputs, as in the original component order.
    if model.TSAM_MODE:
        _outputs_tsam()
        _inputs_tsam()
    else:
        _outputs()
        _inputs()
329