Passed
Pull Request — dev (#1183)
by Patrik
04:02
created

solph.flows._simple_flow_block   A

Complexity

Total Complexity 27

Size/Duplication

Total Lines 397
Duplicated Lines 4.53 %

Importance

Changes 0
Metric Value
wmc 27
eloc 183
dl 18
loc 397
rs 10
c 0
b 0
f 0

6 Methods

Rating   Name   Duplication   Size   Complexity  
B SimpleFlowBlock._create_variables() 0 61 6
A SimpleFlowBlock._create() 0 16 2
A SimpleFlowBlock.__init__() 0 2 1
A SimpleFlowBlock._create_sets() 0 55 1
C SimpleFlowBlock._create_constraints() 0 148 9
B SimpleFlowBlock._objective_expression() 18 55 8

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
# -*- coding: utf-8 -*-
2
3
"""Creating sets, variables, constraints and parts of the objective function
4
for Flow objects with neither nonconvex nor investment options.
5
6
SPDX-FileCopyrightText: Uwe Krien <[email protected]>
7
SPDX-FileCopyrightText: Simon Hilpert
8
SPDX-FileCopyrightText: Cord Kaldemeyer
9
SPDX-FileCopyrightText: Stephan Günther
10
SPDX-FileCopyrightText: Birgit Schachler
11
SPDX-FileCopyrightText: jnnr
12
SPDX-FileCopyrightText: jmloenneberga
13
SPDX-FileCopyrightText: Pierre-François Duc
14
SPDX-FileCopyrightText: Saeed Sayadi
15
SPDX-FileCopyrightText: Johannes Kochems
16
17
SPDX-License-Identifier: MIT
18
19
"""
20
from pyomo.core import BuildAction
21
from pyomo.core import Constraint
22
from pyomo.core import Expression
23
from pyomo.core import NonNegativeIntegers
24
from pyomo.core import NonNegativeReals
25
from pyomo.core import Set
26
from pyomo.core import Var
27
from pyomo.core.base.block import ScalarBlock
28
29
from oemof.solph._plumbing import valid_sequence
30
31
32
class SimpleFlowBlock(ScalarBlock):
    r"""Flow block with definitions for standard flows.

    See :class:`~oemof.solph.flows._flow.Flow` class for all parameters of the
    *Flow*.

    .. automethod:: _create_constraints
    .. automethod:: _create_variables
    .. automethod:: _create_sets

    .. automethod:: _objective_expression

    Note
    ----
    See the :class:`~oemof.solph.flows._flow.Flow` class for the definition of
    all parameters from the "List of Parameters" above.

    """  # noqa: E501

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _create(self, group=None):
        r"""Creates sets, variables and constraints for all standard flows.

        Parameters
        ----------
        group : list
            List containing tuples containing flow (f) objects and the
            associated source (s) and target (t)
            of flow e.g. groups=[(s1, t1, f1), (s2, t2, f2),..]
        """
        if group is None:
            return None

        # Order matters: variables are indexed by the sets, and the
        # constraints reference both.
        self._create_sets(group)
        self._create_variables(group)
        self._create_constraints()

    def _create_sets(self, group):
        """
        Creates all sets for standard flows.

        Every set holds the ``(source, target)`` pairs of those flows in
        `group` whose attributes enable the corresponding feature.
        """
        # Full load time limits are only applied to flows that also have a
        # nominal capacity.
        self.FULL_LOAD_TIME_MAX_FLOWS = Set(
            initialize=[
                (i, o)
                for i, o, f in group
                if f.full_load_time_max is not None
                and f.nominal_capacity is not None
            ]
        )

        self.FULL_LOAD_TIME_MIN_FLOWS = Set(
            initialize=[
                (i, o)
                for i, o, f in group
                if f.full_load_time_min is not None
                and f.nominal_capacity is not None
            ]
        )

        # Only the first entry of a gradient limit is inspected to decide
        # whether the limit is set.
        self.NEGATIVE_GRADIENT_FLOWS = Set(
            initialize=[
                (i, o)
                for i, o, f in group
                if f.negative_gradient_limit[0] is not None
            ]
        )

        self.POSITIVE_GRADIENT_FLOWS = Set(
            initialize=[
                (i, o)
                for i, o, f in group
                if f.positive_gradient_limit[0] is not None
            ]
        )

        self.INTEGER_FLOWS = Set(
            initialize=[(i, o) for i, o, f in group if f.integer]
        )

        # Flows with a lifetime are split by whether an age is given.
        self.LIFETIME_FLOWS = Set(
            initialize=[
                (i, o)
                for i, o, f in group
                if f.lifetime is not None and f.age is None
            ]
        )

        self.LIFETIME_AGE_FLOWS = Set(
            initialize=[
                (i, o)
                for i, o, f in group
                if f.lifetime is not None and f.age is not None
            ]
        )

    def _create_variables(self, group):
        r"""Creates all variables for standard flows.

        All *Flow* objects are indexed by a starting and ending node
        :math:`(i, o)`, which is omitted in the following for the sake of
        convenience. The creation of some variables depend on the values of
        *Flow* attributes. The following variables are created:

        * :math:`P(p, t)`
            Actual flow value (created in :class:`~oemof.solph._models.Model`).
            The variable is bound to:
            :math:`f_\mathrm{min}(t) \cdot P_\mathrm{nom}
            \le P(p, t)
            \le f_\mathrm{max}(t) \cdot P_\mathrm{nom}`.

            If `Flow.fix` is not None the variable is bound to
            :math:`P(p, t) = f_\mathrm{fix}(t) \cdot P_\mathrm{nom}`.

        * :math:`ve_n` (`Flow.negative_gradient_limit` is not `None`)
            Difference of a flow in consecutive timesteps if flow is reduced.
            The variable is bound to: :math:`0 \le ve_n \le ve_n^{max}`.

        * :math:`ve_p` (`Flow.positive_gradient_limit` is not `None`)
            Difference of a flow in consecutive timesteps if flow is increased.
            The variable is bound to: :math:`0 \le ve_p \le ve_p^{max}`.

        The following variable is built for Flows with the attribute
        `integer` being `True`.

        * :math:`i` (`Flow.integer` is `True`)
            All flow values are integers. Variable is bound to non-negative
            integers.
        """
        m = self.parent_block()

        self.positive_gradient = Var(
            self.POSITIVE_GRADIENT_FLOWS, m.TIMESTEPS, within=NonNegativeReals
        )

        self.negative_gradient = Var(
            self.NEGATIVE_GRADIENT_FLOWS, m.TIMESTEPS, within=NonNegativeReals
        )

        self.integer_flow = Var(
            self.INTEGER_FLOWS, m.TIMESTEPS, within=NonNegativeIntegers
        )
        # set upper bound of gradient variable
        # (relative limit scaled by the nominal capacity of the flow)
        for i, o, f in group:
            if valid_sequence(
                m.flows[i, o].positive_gradient_limit, len(m.TIMESTEPS)
            ):
                for t in m.TIMESTEPS:
                    self.positive_gradient[i, o, t].setub(
                        f.positive_gradient_limit[t] * f.nominal_capacity
                    )
            if valid_sequence(
                m.flows[i, o].negative_gradient_limit, len(m.TIMESTEPS)
            ):
                for t in m.TIMESTEPS:
                    self.negative_gradient[i, o, t].setub(
                        f.negative_gradient_limit[t] * f.nominal_capacity
                    )

    def _create_constraints(self):
        r"""Creates all constraints for standard flows.

        The following constraints are created, if the appropriate attribute of
        the *Flow* (see :class:`~oemof.solph.flows._flow.Flow`) object is set:

        * `Flow.full_load_time_max` is not `None` (full_load_time_max_constr):
            .. math::
                \sum_t P(t) \cdot \tau \leq F_{max} \cdot P_{nom}

        * `Flow.full_load_time_min` is not `None` (full_load_time_min_constr):
            .. math::
                \sum_t P(t) \cdot \tau \geq F_{min} \cdot P_{nom}

        * `Flow.negative_gradient_limit` is not `None`
          (negative_gradient_constr):
            .. math::
              P(t-1) - P(t) \leq ve_n(t)

        * `Flow.positive_gradient_limit` is not `None`
          (positive_gradient_constr):
            .. math::
              P(t) - P(t-1) \leq ve_p(t)

        * `Flow.integer` is `True`
            .. math::
              P(t) = i(t)

        In the full load time constraints every summand is additionally
        multiplied by the weighting `m.tsam_weighting[t]`. For the timestep
        0, which has no predecessor, the gradient variables are fixed to
        zero instead.
        """
        m = self.parent_block()

        def _flow_full_load_time_max_rule(_):
            """Rule definition for build action of max. sum flow constraint."""
            for inp, out in self.FULL_LOAD_TIME_MAX_FLOWS:
                lhs = sum(
                    m.flow[inp, out, ts]
                    * m.timeincrement[ts]
                    * m.tsam_weighting[ts]
                    for ts in m.TIMESTEPS
                )
                rhs = (
                    m.flows[inp, out].full_load_time_max
                    * m.flows[inp, out].nominal_capacity
                )
                self.full_load_time_max_constr.add((inp, out), lhs <= rhs)

        # The constraint container is declared empty (noruleinit) and is
        # filled by the build action that follows it.
        self.full_load_time_max_constr = Constraint(
            self.FULL_LOAD_TIME_MAX_FLOWS, noruleinit=True
        )
        self.full_load_time_max_build = BuildAction(
            rule=_flow_full_load_time_max_rule
        )

        def _flow_full_load_time_min_rule(_):
            """Rule definition for build action of min. sum flow constraint."""
            for inp, out in self.FULL_LOAD_TIME_MIN_FLOWS:
                lhs = sum(
                    m.flow[inp, out, ts]
                    * m.timeincrement[ts]
                    * m.tsam_weighting[ts]
                    for ts in m.TIMESTEPS
                )
                rhs = (
                    m.flows[inp, out].full_load_time_min
                    * m.flows[inp, out].nominal_capacity
                )
                self.full_load_time_min_constr.add((inp, out), lhs >= rhs)

        self.full_load_time_min_constr = Constraint(
            self.FULL_LOAD_TIME_MIN_FLOWS, noruleinit=True
        )
        self.full_load_time_min_build = BuildAction(
            rule=_flow_full_load_time_min_rule
        )

        def _positive_gradient_flow_rule(_):
            """Rule definition for positive gradient constraint."""
            for inp, out in self.POSITIVE_GRADIENT_FLOWS:
                # ``Set.at`` is addressed by 1-based position.
                for index in range(1, len(m.TIMESTEPS) + 1):
                    ts = m.TIMESTEPS.at(index)
                    if ts > 0:
                        lhs = (
                            m.flow[inp, out, ts]
                            - m.flow[inp, out, m.TIMESTEPS.at(index - 1)]
                        )
                        rhs = self.positive_gradient[inp, out, ts]
                        self.positive_gradient_constr.add(
                            (inp, out, ts),
                            lhs <= rhs,
                        )
                    else:
                        # No predecessor: fix the gradient variable to 0.
                        lhs = self.positive_gradient[inp, out, 0]
                        rhs = 0
                        self.positive_gradient_constr.add(
                            (inp, out, ts),
                            lhs == rhs,
                        )

        self.positive_gradient_constr = Constraint(
            self.POSITIVE_GRADIENT_FLOWS, m.TIMESTEPS, noruleinit=True
        )
        self.positive_gradient_build = BuildAction(
            rule=_positive_gradient_flow_rule
        )

        def _negative_gradient_flow_rule(_):
            """Rule definition for negative gradient constraint."""
            for inp, out in self.NEGATIVE_GRADIENT_FLOWS:
                # ``Set.at`` is addressed by 1-based position.
                for index in range(1, len(m.TIMESTEPS) + 1):
                    ts = m.TIMESTEPS.at(index)
                    if ts > 0:
                        lhs = (
                            m.flow[inp, out, m.TIMESTEPS.at(index - 1)]
                            - m.flow[inp, out, ts]
                        )
                        rhs = self.negative_gradient[inp, out, ts]
                        self.negative_gradient_constr.add(
                            (inp, out, ts),
                            lhs <= rhs,
                        )
                    else:
                        # No predecessor: fix the gradient variable to 0.
                        lhs = self.negative_gradient[inp, out, 0]
                        rhs = 0
                        self.negative_gradient_constr.add(
                            (inp, out, ts),
                            lhs == rhs,
                        )

        self.negative_gradient_constr = Constraint(
            self.NEGATIVE_GRADIENT_FLOWS, m.TIMESTEPS, noruleinit=True
        )
        self.negative_gradient_build = BuildAction(
            rule=_negative_gradient_flow_rule
        )

        def _integer_flow_rule(_, ii, oi, ti):
            """Force flow variable to NonNegativeInteger values."""
            return self.integer_flow[ii, oi, ti] == m.flow[ii, oi, ti]

        self.integer_flow_constr = Constraint(
            self.INTEGER_FLOWS, m.TIMESTEPS, rule=_integer_flow_rule
        )

    def _objective_expression(self):
        r"""Objective expression for all standard flows with fixed costs
        and variable costs.

        Depending on the attributes of the `Flow` object the following parts of
        the objective function are created for a standard model:

        * `Flow.variable_costs` is not `None`:
            .. math::
              \sum_{(i,o)} \sum_t P(t) \cdot w(t) \cdot c_{var}(i, o, t)

        where :math:`w(t)` is the objective weighting.

        In a multi-period model, in contrast, the following parts of
        the objective function are created:

        * `Flow.variable_costs` is not `None`:
            .. math::
              \sum_{(i,o)} \sum_{p, t} P(p, t) \cdot w(t)
              \cdot c_{var}(i, o, t)

        In both cases every summand is additionally multiplied by the
        weighting `m.tsam_weighting[t]`.

        Returns
        -------
        The expression stored as `self.costs`; the same sum is also stored
        as `self.variable_costs`.
        """
        m = self.parent_block()

        # Both model types add up the same cost term and differ only in the
        # set that provides the timesteps.
        if m.es.investment_times is None:
            timesteps = m.TIMESTEPS
        else:
            timesteps = [t for _, t in m.TIMEINDEX]

        variable_costs = 0

        for i, o in m.FLOWS:
            if valid_sequence(m.flows[i, o].variable_costs, len(m.TIMESTEPS)):
                for t in timesteps:
                    variable_costs += (
                        m.flow[i, o, t]
                        * m.objective_weighting[t]
                        * m.tsam_weighting[t]
                        * m.flows[i, o].variable_costs[t]
                    )

        self.variable_costs = Expression(expr=variable_costs)
        self.costs = Expression(expr=variable_costs)

        return self.costs
397