SimpleFlowBlock._objective_expression()   F

Complexity

Conditions 16

Size

Total Lines 125
Code Lines 55

Duplication

Lines 18
Ratio 14.4 %

Importance

Changes 0
Metric Value
eloc 55
dl 18
loc 125
rs 2.4
c 0
b 0
f 0
cc 16
nop 1

How to fix: Long Method, Complexity

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
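
As a minimal sketch of that advice, each commented step of a long function can become a small, named helper. The function and field names below are hypothetical and only illustrate the idea; they are not part of oemof.solph.

```python
def summarize_orders(orders):
    """Return (total amount, number of valid orders)."""
    valid = _keep_positive_amounts(orders)
    return _total_amount(valid), len(valid)


def _keep_positive_amounts(orders):
    # Formerly an inline block under the comment "keep positive amounts".
    return [order for order in orders if order.get("amount", 0) > 0]


def _total_amount(orders):
    # Formerly an inline block under the comment "sum up amounts".
    return sum(order["amount"] for order in orders)
```

The former comments survive as helper names, so the top-level function reads like a summary of its steps.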

Commonly applied refactorings include Extract Method.

Complexity

Complex classes like the one containing solph.flows._simple_flow_block.SimpleFlowBlock._objective_expression() often do a lot of different things. To break such a class down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields or methods that share the same prefix or suffix.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
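
A rough sketch of Extract Class, assuming a class whose fields share a "gradient" prefix: the prefixed fields and the logic that uses them move into their own small class. `GradientLimits` and `Unit` are hypothetical names chosen for illustration, not classes from oemof.solph.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class GradientLimits:
    """Hypothetical component holding the former 'gradient_*' fields."""

    positive: Optional[float] = None  # max. allowed increase per step
    negative: Optional[float] = None  # max. allowed decrease per step

    def allows(self, previous: float, current: float) -> bool:
        """Check whether the change between two steps respects the limits."""
        change = current - previous
        if change > 0 and self.positive is not None:
            return change <= self.positive
        if change < 0 and self.negative is not None:
            return -change <= self.negative
        return True


@dataclass
class Unit:
    """The original class now delegates gradient logic to the component."""

    name: str
    gradient: GradientLimits
```

For instance, `Unit("boiler", GradientLimits(positive=2.0)).gradient.allows(0, 3)` evaluates to `False`, because the increase of 3 exceeds the limit of 2.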

# -*- coding: utf-8 -*-

"""Creating sets, variables, constraints and parts of the objective function
for Flow objects with neither nonconvex nor investment options.

SPDX-FileCopyrightText: Uwe Krien <[email protected]>
SPDX-FileCopyrightText: Simon Hilpert
SPDX-FileCopyrightText: Cord Kaldemeyer
SPDX-FileCopyrightText: Stephan Günther
SPDX-FileCopyrightText: Birgit Schachler
SPDX-FileCopyrightText: jnnr
SPDX-FileCopyrightText: jmloenneberga
SPDX-FileCopyrightText: Pierre-François Duc
SPDX-FileCopyrightText: Saeed Sayadi
SPDX-FileCopyrightText: Johannes Kochems

SPDX-License-Identifier: MIT

"""
from pyomo.core import BuildAction
from pyomo.core import Constraint
from pyomo.core import Expression
from pyomo.core import NonNegativeIntegers
from pyomo.core import NonNegativeReals
from pyomo.core import Set
from pyomo.core import Var
from pyomo.core.base.block import ScalarBlock

from oemof.solph._plumbing import valid_sequence


class SimpleFlowBlock(ScalarBlock):
    r"""Flow block with definitions for standard flows.

    See :class:`~oemof.solph.flows._flow.Flow` class for all parameters of the
    *Flow*.

    .. automethod:: _create_constraints
    .. automethod:: _create_variables
    .. automethod:: _create_sets

    .. automethod:: _objective_expression

    Note
    ----
    See the :class:`~oemof.solph.flows._flow.Flow` class for the definition of
    all parameters from the "List of Parameters" above.

    """  # noqa: E501

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _create(self, group=None):
        r"""Creates sets, variables and constraints for all standard flows.

        Parameters
        ----------
        group : list
            List of tuples, each holding the source (s), the target (t)
            and the associated flow (f) object,
            e.g. group=[(s1, t1, f1), (s2, t2, f2), ...]
        """
        if group is None:
            return None

        self._create_sets(group)
        self._create_variables(group)
        self._create_constraints()

    def _create_sets(self, group):
        """
        Creates all sets for standard flows.
        """
        self.FULL_LOAD_TIME_MAX_FLOWS = Set(
            initialize=[
                (g[0], g[1])
                for g in group
                if g[2].full_load_time_max is not None
                and g[2].nominal_capacity is not None
            ]
        )

        self.FULL_LOAD_TIME_MIN_FLOWS = Set(
            initialize=[
                (g[0], g[1])
                for g in group
                if g[2].full_load_time_min is not None
                and g[2].nominal_capacity is not None
            ]
        )

        self.NEGATIVE_GRADIENT_FLOWS = Set(
            initialize=[
                (g[0], g[1])
                for g in group
                if g[2].negative_gradient_limit[0] is not None
            ]
        )

        self.POSITIVE_GRADIENT_FLOWS = Set(
            initialize=[
                (g[0], g[1])
                for g in group
                if g[2].positive_gradient_limit[0] is not None
            ]
        )

        self.INTEGER_FLOWS = Set(
            initialize=[(g[0], g[1]) for g in group if g[2].integer]
        )

        self.LIFETIME_FLOWS = Set(
            initialize=[
                (g[0], g[1])
                for g in group
                if g[2].lifetime is not None and g[2].age is None
            ]
        )

        self.LIFETIME_AGE_FLOWS = Set(
            initialize=[
                (g[0], g[1])
                for g in group
                if g[2].lifetime is not None and g[2].age is not None
            ]
        )

    def _create_variables(self, group):
        r"""Creates all variables for standard flows.

        All *Flow* objects are indexed by a starting and ending node
        :math:`(i, o)`, which is omitted in the following for the sake of
        convenience. The creation of some variables depends on the values of
        *Flow* attributes. The following variables are created:

        * :math:`P(p, t)`
            Actual flow value (created in :class:`~oemof.solph._models.Model`).
            The variable is bound to:
            :math:`f_\mathrm{min}(t) \cdot P_\mathrm{nom}
            \le P(p, t)
            \le f_\mathrm{max}(t) \cdot P_\mathrm{nom}`.

            If `Flow.fix` is not None the variable is bound to
            :math:`P(p, t) = f_\mathrm{fix}(t) \cdot P_\mathrm{nom}`.

        * :math:`ve_n` (`Flow.negative_gradient_limit` is not `None`)
            Difference of a flow in consecutive timesteps if flow is reduced.
            The variable is bound to: :math:`0 \le ve_n \le ve_n^{max}`.

        * :math:`ve_p` (`Flow.positive_gradient_limit` is not `None`)
            Difference of a flow in consecutive timesteps if flow is increased.
            The variable is bound to: :math:`0 \le ve_p \le ve_p^{max}`.

        The following variable is built for Flows with the attribute
        `integer` set to `True`.

        * :math:`i` (`Flow.integer` is `True`)
            All flow values are integers. Variable is bound to non-negative
            integers.
        """
        m = self.parent_block()

        self.positive_gradient = Var(
            self.POSITIVE_GRADIENT_FLOWS, m.TIMESTEPS, within=NonNegativeReals
        )

        self.negative_gradient = Var(
            self.NEGATIVE_GRADIENT_FLOWS, m.TIMESTEPS, within=NonNegativeReals
        )

        self.integer_flow = Var(
            self.INTEGER_FLOWS, m.TIMESTEPS, within=NonNegativeIntegers
        )
        # set upper bound of gradient variable
        for i, o, f in group:
            if valid_sequence(
                m.flows[i, o].positive_gradient_limit, len(m.TIMESTEPS)
            ):
                for t in m.TIMESTEPS:
                    self.positive_gradient[i, o, t].setub(
                        f.positive_gradient_limit[t] * f.nominal_capacity
                    )
            if valid_sequence(
                m.flows[i, o].negative_gradient_limit, len(m.TIMESTEPS)
            ):
                for t in m.TIMESTEPS:
                    self.negative_gradient[i, o, t].setub(
                        f.negative_gradient_limit[t] * f.nominal_capacity
                    )

    def _create_constraints(self):
        r"""Creates all constraints for standard flows.

        The following constraints are created, if the appropriate attribute of
        the *Flow* (see :class:`~oemof.solph.flows._flow.Flow`) object is set:

        * `Flow.full_load_time_max` is not `None` (full_load_time_max_constr):
            .. math::
                \sum_t P(t) \cdot \tau \leq F_{max} \cdot P_{nom}

        * `Flow.full_load_time_min` is not `None` (full_load_time_min_constr):
            .. math::
                \sum_t P(t) \cdot \tau \geq F_{min} \cdot P_{nom}

        * `Flow.negative_gradient_limit` is not `None`
          (negative_gradient_constr):
            .. math::
              P(t-1) - P(t) \leq ve_n(t)

        * `Flow.positive_gradient_limit` is not `None`
          (positive_gradient_constr):
            .. math::
              P(t) - P(t-1) \leq ve_p(t)

        * `Flow.integer` is `True`
            .. math::
              P(t) = i(t)
        """
        m = self.parent_block()

        def _flow_full_load_time_max_rule(model):
            """Rule definition for build action of max. sum flow constraint."""
            for inp, out in self.FULL_LOAD_TIME_MAX_FLOWS:
                lhs = sum(
                    m.flow[inp, out, ts]
                    * m.timeincrement[ts]
                    * m.tsam_weighting[ts]
                    for ts in m.TIMESTEPS
                )
                rhs = (
                    m.flows[inp, out].full_load_time_max
                    * m.flows[inp, out].nominal_capacity
                )
                self.full_load_time_max_constr.add((inp, out), lhs <= rhs)

        self.full_load_time_max_constr = Constraint(
            self.FULL_LOAD_TIME_MAX_FLOWS, noruleinit=True
        )
        self.full_load_time_max_build = BuildAction(
            rule=_flow_full_load_time_max_rule
        )

        def _flow_full_load_time_min_rule(_):
            """Rule definition for build action of min. sum flow constraint."""
            for inp, out in self.FULL_LOAD_TIME_MIN_FLOWS:
                lhs = sum(
                    m.flow[inp, out, ts]
                    * m.timeincrement[ts]
                    * m.tsam_weighting[ts]
                    for ts in m.TIMESTEPS
                )
                rhs = (
                    m.flows[inp, out].full_load_time_min
                    * m.flows[inp, out].nominal_capacity
                )
                self.full_load_time_min_constr.add((inp, out), lhs >= rhs)

        self.full_load_time_min_constr = Constraint(
            self.FULL_LOAD_TIME_MIN_FLOWS, noruleinit=True
        )
        self.full_load_time_min_build = BuildAction(
            rule=_flow_full_load_time_min_rule
        )

        def _positive_gradient_flow_rule(_):
            """Rule definition for positive gradient constraint."""
            for inp, out in self.POSITIVE_GRADIENT_FLOWS:
                for index in range(1, len(m.TIMESTEPS) + 1):
                    if m.TIMESTEPS.at(index) > 0:
                        lhs = (
                            m.flow[
                                inp,
                                out,
                                m.TIMESTEPS.at(index),
                            ]
                            - m.flow[
                                inp,
                                out,
                                m.TIMESTEPS.at(index - 1),
                            ]
                        )
                        rhs = self.positive_gradient[
                            inp, out, m.TIMESTEPS.at(index)
                        ]
                        self.positive_gradient_constr.add(
                            (inp, out, m.TIMESTEPS.at(index)),
                            lhs <= rhs,
                        )
                    else:
                        lhs = self.positive_gradient[inp, out, 0]
                        rhs = 0
                        self.positive_gradient_constr.add(
                            (inp, out, m.TIMESTEPS.at(index)),
                            lhs == rhs,
                        )

        self.positive_gradient_constr = Constraint(
            self.POSITIVE_GRADIENT_FLOWS, m.TIMESTEPS, noruleinit=True
        )
        self.positive_gradient_build = BuildAction(
            rule=_positive_gradient_flow_rule
        )

        def _negative_gradient_flow_rule(model):
            """Rule definition for negative gradient constraint."""
            for inp, out in self.NEGATIVE_GRADIENT_FLOWS:
                for index in range(1, len(m.TIMESTEPS) + 1):
                    if m.TIMESTEPS.at(index) > 0:
                        lhs = (
                            m.flow[inp, out, m.TIMESTEPS.at(index - 1)]
                            - m.flow[inp, out, m.TIMESTEPS.at(index)]
                        )
                        rhs = self.negative_gradient[
                            inp, out, m.TIMESTEPS.at(index)
                        ]
                        self.negative_gradient_constr.add(
                            (inp, out, m.TIMESTEPS.at(index)),
                            lhs <= rhs,
                        )
                    else:
                        lhs = self.negative_gradient[inp, out, 0]
                        rhs = 0
                        self.negative_gradient_constr.add(
                            (inp, out, m.TIMESTEPS.at(index)),
                            lhs == rhs,
                        )

        self.negative_gradient_constr = Constraint(
            self.NEGATIVE_GRADIENT_FLOWS, m.TIMESTEPS, noruleinit=True
        )
        self.negative_gradient_build = BuildAction(
            rule=_negative_gradient_flow_rule
        )

        def _integer_flow_rule(_, ii, oi, ti):
            """Force flow variable to NonNegativeInteger values."""
            return self.integer_flow[ii, oi, ti] == m.flow[ii, oi, ti]

        self.integer_flow_constr = Constraint(
            self.INTEGER_FLOWS, m.TIMESTEPS, rule=_integer_flow_rule
        )

        if m.es.periods is not None:

            def _lifetime_output_rule(_):
                """Force flow value to zero when lifetime is reached"""
                for inp, out in self.LIFETIME_FLOWS:
                    for p, ts in m.TIMEINDEX:
                        if m.flows[inp, out].lifetime <= m.es.periods_years[p]:
                            lhs = m.flow[inp, out, ts]
                            rhs = 0
                            self.lifetime_output.add(
                                (inp, out, p, ts), (lhs == rhs)
                            )

            self.lifetime_output = Constraint(
                self.LIFETIME_FLOWS, m.TIMEINDEX, noruleinit=True
            )
            self.lifetime_output_build = BuildAction(
                rule=_lifetime_output_rule
            )

            def _lifetime_age_output_rule(block):
                """Force flow value to zero when lifetime is reached
                considering initial age
                """
                for inp, out in self.LIFETIME_AGE_FLOWS:
                    for p, ts in m.TIMEINDEX:
                        if (
                            m.flows[inp, out].lifetime - m.flows[inp, out].age
                            <= m.es.periods_years[p]
                        ):
                            lhs = m.flow[inp, out, ts]
                            rhs = 0
                            self.lifetime_age_output.add(
                                (inp, out, p, ts), (lhs == rhs)
                            )

            self.lifetime_age_output = Constraint(
                self.LIFETIME_AGE_FLOWS, m.TIMEINDEX, noruleinit=True
            )
            self.lifetime_age_output_build = BuildAction(
                rule=_lifetime_age_output_rule
            )

    def _objective_expression(self):
        r"""Objective expression for all standard flows with fixed costs
        and variable costs.

        Depending on the attributes of the `Flow` object the following parts of
        the objective function are created for a standard model:

        * `Flow.variable_costs` is not `None`:
            .. math::
              \sum_{(i,o)} \sum_t P(t) \cdot w(t) \cdot c_{var}(i, o, t)

        where :math:`w(t)` is the objective weighting.

        In a multi-period model, in contrast, the following parts of
        the objective function are created:

        * `Flow.variable_costs` is not `None`:
            .. math::
              \sum_{(i,o)} \sum_{p, t} P(p, t) \cdot w(t)
              \cdot c_{var}(i, o, t)

        * `Flow.fixed_costs` is not `None` and flow has no lifetime limit
            .. math::
              \sum_{(i,o)} \displaystyle \sum_{pp=0}^{year_{max}}
              P_{nominal} \cdot c_{fixed}(i, o, pp) \cdot DF^{-pp}

        * `Flow.fixed_costs` is not `None` and flow has a lifetime limit,
           but not an initial age
            .. math::
              \sum_{(i,o)} \displaystyle \sum_{pp=0}^{limit_{exo}}
              P_{nominal} \cdot c_{fixed}(i, o, pp) \cdot DF^{-pp}

        * `Flow.fixed_costs` is not `None` and flow has a lifetime limit,
           and an initial age
            .. math::
              \sum_{(i,o)} \displaystyle \sum_{pp=0}^{limit_{exo}} P_{nominal}
              \cdot c_{fixed}(i, o, pp) \cdot DF^{-pp}

        Here,

        * :math:`DF(p) = (1 + dr)` is the discount factor for period :math:`p`
          and :math:`dr` is the discount rate.
        * :math:`n` is the unit lifetime and :math:`a` is the initial age.
        * :math:`year_{max}` denotes the last year of the optimization
          horizon, i.e. at the end of the last period.
        * :math:`limit_{exo}=min\{year_{max}, n - a\}` is used as an
          upper bound to ensure fixed costs for existing capacities to occur
          within the optimization horizon. :math:`a` is the initial age
          of an asset (or 0 if not specified).
        """
        m = self.parent_block()

        variable_costs = 0
        fixed_costs = 0

        if m.es.periods is None:
            for i, o in m.FLOWS:
                if valid_sequence(
                    m.flows[i, o].variable_costs, len(m.TIMESTEPS)
                ):
                    for t in m.TIMESTEPS:
                        variable_costs += (
                            m.flow[i, o, t]
                            * m.objective_weighting[t]
                            * m.tsam_weighting[t]
                            * m.flows[i, o].variable_costs[t]
                        )

        else:
            for i, o in m.FLOWS:
                if valid_sequence(
                    m.flows[i, o].variable_costs, len(m.TIMESTEPS)
                ):
                    for p, t in m.TIMEINDEX:
                        variable_costs += (
                            m.flow[i, o, t]
                            * m.objective_weighting[t]
                            * m.tsam_weighting[t]
                            * m.flows[i, o].variable_costs[t]
                            * ((1 + m.discount_rate) ** -m.es.periods_years[p])
                        )

                # Fixed costs for units with no lifetime limit
                if (
                    m.flows[i, o].fixed_costs[0] is not None
                    and m.flows[i, o].nominal_capacity is not None
                    and (i, o) not in self.LIFETIME_FLOWS
                    and (i, o) not in self.LIFETIME_AGE_FLOWS
                ):
                    fixed_costs += sum(
                        m.flows[i, o].nominal_capacity
                        * m.flows[i, o].fixed_costs[pp]
                        for pp in range(m.es.end_year_of_optimization)
                    )

            # Fixed costs for units with limited lifetime
            for i, o in self.LIFETIME_FLOWS:
                if valid_sequence(m.flows[i, o].fixed_costs, len(m.TIMESTEPS)):
                    range_limit = min(
                        m.es.end_year_of_optimization,
                        m.flows[i, o].lifetime,
                    )
                    fixed_costs += sum(
                        m.flows[i, o].nominal_capacity
                        * m.flows[i, o].fixed_costs[pp]
                        for pp in range(range_limit)
                    )

            for i, o in self.LIFETIME_AGE_FLOWS:
                if valid_sequence(m.flows[i, o].fixed_costs, len(m.TIMESTEPS)):
                    range_limit = min(
                        m.es.end_year_of_optimization,
                        m.flows[i, o].lifetime - m.flows[i, o].age,
                    )
                    fixed_costs += sum(
                        m.flows[i, o].nominal_capacity
                        * m.flows[i, o].fixed_costs[pp]
                        for pp in range(range_limit)
                    )

        self.variable_costs = Expression(expr=variable_costs)
        self.fixed_costs = Expression(expr=fixed_costs)
        self.costs = Expression(expr=variable_costs + fixed_costs)

        return self.costs
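
The two fixed-cost loops flagged as duplicated in `_objective_expression` differ only in how the summation limit is computed. One possible way to remove the duplication, sketched here with plain numbers instead of Pyomo objects, is to extract two small helpers. `fixed_cost_horizon` and `sum_fixed_costs` are hypothetical names, not existing oemof.solph functions, and treating a missing age as 0 is an assumption taken from the docstring.

```python
def fixed_cost_horizon(end_year, lifetime=None, age=None):
    """Number of years for which fixed costs are summed (hypothetical helper).

    Without a lifetime, the full optimization horizon is used. Otherwise
    the remaining lifetime (lifetime minus initial age, with a missing
    age treated as 0) caps the horizon.
    """
    if lifetime is None:
        return end_year
    return min(end_year, lifetime - (age or 0))


def sum_fixed_costs(nominal_capacity, fixed_costs, years):
    """Sum capacity-specific fixed costs over the first `years` years."""
    return sum(nominal_capacity * fixed_costs[pp] for pp in range(years))


# Usage: an asset with lifetime 8 and initial age 5 in a 10-year horizon
# is charged fixed costs for 3 years.
years = fixed_cost_horizon(10, lifetime=8, age=5)
total = sum_fixed_costs(100, [1, 2, 3, 4], years)
```

With helpers like these, each of the three fixed-cost branches would reduce to one call that differs only in its `lifetime` and `age` arguments.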