Passed
Pull Request — dev (#1195)
by Patrik
01:47
created

solph._processing.results()   F

Complexity

Conditions 15

Size

Total Lines 115
Code Lines 53

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 53
dl 0
loc 115
rs 2.9998
c 0
b 0
f 0
cc 15
nop 3

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like solph._processing.results() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
3
"""Compatibility wrapper of solph.Results for providing solph results in the
4
structure of the output of old processing.results(model).
5
6
SPDX-FileCopyrightText: Patrik Schönfeldt <[email protected]>
7
8
SPDX-License-Identifier: MIT
9
"""
10
11
import warnings
12
from itertools import groupby
13
14
import pandas as pd
15
from oemof.tools import debugging
16
17
from ._models import Model
18
from ._results import Results
19
20
21
def results(
22
    model: Model,
23
    remove_last_time_point: bool = False,
24
    scalar_data: list[str] | None = None,
25
):
26
    """Create a nested result dictionary from the result DataFrame
27
28
    The results from Pyomo from the Results object are
29
    transferred into a nested dictionary of pandas objects.
30
    The first level key of that dictionary is a node
31
    (denoting the respective flow or component).
32
33
    The second level keys are "sequences" and "scalars":
34
35
    * A pd.DataFrame holds all results that are time-dependent, i.e. given as
36
      a sequence and can be indexed with the energy system's timeindex.
37
    * A pd.Series holds all scalar values which are applicable for timestep 0
38
      (i.e. investments).
39
40
    Models with more than one time for investments are not supported.
41
    In these models, investments are sequential data,
42
    but with a second time imdex. As this is a compatibility layer,
43
    we did not add support for this new feature.
44
    Instead, use of the Results object is advised.
45
46
    Parameters
47
    ----------
48
    model : oemof.solph.Model
49
        A solved oemof.solph model.
50
    remove_last_time_point : bool
51
        The last time point of all TIMEPOINT variables is removed to get the
52
        same length as the TIMESTEP (interval) variables without getting
53
        nan-values. By default, the last time point is removed if it has not
54
        been defined by the user in the EnergySystem but inferred. If all
55
        time points have been defined explicitly by the user the last time
56
        point will not be removed by default. In that case all interval
57
        variables will get one row with nan-values to have the same index
58
        for all variables.
59
    scalar_data :  list[str]
60
        List of variables to be treated as scalar data (see above).
61
    sequence_data: list[str]
62
        List of variables to be treated as sequential data (see above).
63
    """
64
65
    meta_result_keys = ["Solution", "Problem", "Solver"]
66
67
    if scalar_data is None:
68
        scalar_data = ["invest", "total"]
69
70
    result_dict = {}
71
    with warnings.catch_warnings():
72
        warnings.filterwarnings(
73
            "ignore", category=debugging.ExperimentalFeatureWarning
74
        )
75
        result_object = Results(model)
76
77
    timeindex = model.es.timeindex
78
79
    if remove_last_time_point:
80
        timeindex = timeindex[:-1]
81
82
    def _handle_scalar(data):
83
        return data.iloc[0]
84
85
    def _handle_sequence(data):
86
        return data
87
88
    for result_key in result_object.keys():
89
        if result_key not in meta_result_keys:
90
            if result_key in scalar_data:
91
                result_type = "scalars"
92
                data_handler = _handle_scalar
93
            else:
94
                result_type = "sequences"
95
                data_handler = _handle_sequence
96
97
            index = result_object[result_key].columns
98
            for item in index:
99
                if isinstance(index, pd.MultiIndex):
100
                    node_tuple = item
101
                else:
102
                    node_tuple = (item, None)
103
                if node_tuple not in result_dict:
104
                    result_dict[node_tuple] = {
105
                        "scalars": pd.Series(),
106
                        "sequences": pd.DataFrame(index=timeindex),
107
                    }
108
109
                data = result_object[result_key][item]
110
                result_dict[node_tuple][result_type][result_key] = (
111
                    data_handler(data)
112
                )
113
114
    if model.dual is not None:
115
        grouped = groupby(
116
            sorted(model.BusBlock.balance.iterkeys()), lambda t: t[0]
117
        )
118
        for bus, timestep in grouped:
119
            duals = [
120
                model.dual[model.BusBlock.balance[bus, t]] for _, t in timestep
121
            ]
122
            if model.es.periods is None:
123
                df = pd.DataFrame({"duals": duals}, index=timeindex[:-1])
124
            # TODO: Align with standard model
125
            else:
126
                df = pd.DataFrame({"duals": duals}, index=timeindex)
127
            if (bus, None) not in result_dict.keys():
128
                result_dict[(bus, None)] = {
129
                    "sequences": df,
130
                    "scalars": pd.Series(dtype=float),
131
                }
132
            else:
133
                result_dict[(bus, None)]["sequences"]["duals"] = duals
134
135
    return result_dict
136