Passed
Pull Request — dev (#1195)
by Patrik
01:47
created

solph.processing.results()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nop 2
# -*- coding: utf-8 -*-

"""Modules for providing a convenient data structure for solph results.

Information about the possible usage is provided within the examples.

SPDX-FileCopyrightText: Uwe Krien <[email protected]>
SPDX-FileCopyrightText: Simon Hilpert
SPDX-FileCopyrightText: Cord Kaldemeyer
SPDX-FileCopyrightText: Stephan Günther
SPDX-FileCopyrightText: henhuy
SPDX-FileCopyrightText: Johannes Kochems
SPDX-FileCopyrightText: Patrik Schönfeldt <[email protected]>

SPDX-License-Identifier: MIT

"""
18
import numbers
19
from collections import abc
20
21
import pandas as pd
22
23
from ._plumbing import _FakeSequence
24
from ._processing import results as new_results
25
from .helpers import flatten
26
27
28
def convert_keys_to_strings(result, keep_none_type=False):
    """
    Convert the dictionary keys to strings.

    All (tuple) keys of the result object e.g. results[(pp1, bus1)] are
    converted into strings that represent the object labels
    e.g. results[('pp1','bus1')].

    Parameters
    ----------
    result : dict
        Mapping whose keys are either tuples or single objects.
    keep_none_type : bool
        If True, a key (or tuple element) that is None stays None.
        By default (False) None is converted like any other object,
        i.e. it becomes the string 'None'.

    Returns
    -------
    dict
        New dictionary with converted keys; the values are passed through
        untouched.
    """

    def _stringify(element):
        # None survives only if the caller explicitly asked for that.
        if keep_none_type and element is None:
            return None
        return str(element)

    converted = {}
    for key, value in result.items():
        if isinstance(key, tuple):
            new_key = tuple(_stringify(element) for element in key)
        else:
            new_key = _stringify(key)
        converted[new_key] = value
    return converted
51
52
53
def results(model, remove_last_time_point=False):
    """
    Forward to the results function of the `_processing` module.

    This is a thin wrapper: both arguments are handed over unchanged (as
    keyword arguments) to `_processing.results`, which is imported into
    this module as `new_results`, and its return value is returned as is.

    Parameters
    ----------
    model
        Passed through as `model`.
    remove_last_time_point : bool
        Passed through as `remove_last_time_point` (default: False).
    """
    return new_results(
        model=model,
        remove_last_time_point=remove_last_time_point,
    )
58
59
60
def meta_results(om, undefined=False):
    """
    Fetch some metadata from the Solver. Feel free to add more keys.

    Valid keys of the resulting dictionary are: 'objective', 'problem',
    'solver'.

    Parameters
    ----------
    om : oemof.solph.Model
        A solved Model.
    undefined : bool
        By default (False) only defined keys can be found in the dictionary.
        Set to True to get also the undefined keys. In that case, entries
        whose string form is '<undefined>' are stored as that string, and
        entries that raise a TypeError while being read are replaced by a
        message naming their type.

    Returns
    -------
    dict
    """
    meta_res = {"objective": om.objective()}

    for section_name in ("problem", "solver"):
        section_meta = meta_res[section_name] = {}
        section = om.es.results[section_name][0]

        # The value yielded by items() is deliberately ignored: every entry
        # is read via subscript instead. The container type is not visible
        # here, so that access path is kept to stay on the safe side.
        for key, _ in section.items():
            try:
                value = section[key]
                text = str(value)
            except TypeError:
                if undefined:
                    msg = "Cannot fetch meta results of type {0}"
                    section_meta[key] = msg.format(type(section[key]))
                continue

            if text != "<undefined>":
                section_meta[key] = value
            elif undefined:
                section_meta[key] = text

    return meta_res
97
98
99
def __separate_attrs(
    system, exclude_attrs, get_flows=False, exclude_none=True
):
    """
    Create a dictionary with flow scalars and series.

    The dictionary is structured with flows as tuples and nested dictionaries
    holding the scalars and series e.g.
    {(node1, node2): {'scalars': {'attr1': scalar, 'attr2': 'text'},
    'sequences': {'attr1': iterable, 'attr2': iterable}}}

    Parameters
    ----------
    system:
        A solved oemof.solph.Model or oemof.solph.Energysystem
    exclude_attrs: List[str]
        List of additional attributes which shall be excluded from
        parameter dict
    get_flows: bool
        Whether to include flow values or not. If False, the nodes are
        processed instead and keyed as (node, None).
    exclude_none: bool
        If set, scalars and sequences containing None values are excluded

    Returns
    -------
    dict
        For every key a dict with a pandas.Series under 'scalars' and a
        pandas.DataFrame under 'sequences'.
    """
    default_exclusions = [
        "__",
        "_",
        "registry",
        "inputs",
        "outputs",
        "Label",
        "input",
        "output",
        "constraint_group",
    ]
    # Must be tuple in order to work with `str.startswith()`.
    # It does not depend on the component, so it is built only once.
    exclusions = tuple(default_exclusions + exclude_attrs)

    def move_undetected_scalars(com):
        # Iterate over a snapshot because entries are deleted on the way.
        for ckey, value in list(com["sequences"].items()):
            if isinstance(value, (str, numbers.Number)):
                com["scalars"][ckey] = value
                del com["sequences"][ckey]
            elif isinstance(value, _FakeSequence):
                com["scalars"][ckey] = value.value
                del com["sequences"][ckey]
            # Explicit length check: truthiness is not reliable for every
            # kind of sequence that may show up here.
            elif len(value) == 0:
                del com["sequences"][ckey]

    def remove_nones(com):
        for ckey, value in list(com["scalars"].items()):
            if value is None:
                del com["scalars"][ckey]
        for ckey, value in list(com["sequences"].items()):
            # Only the first element is inspected to decide about None.
            if len(value) == 0 or value[0] is None:
                del com["sequences"][ckey]

    def detect_scalars_and_sequences(com):
        scalars = {}
        sequences = {}

        for name in dir(com):
            if name.startswith(exclusions):
                continue
            # Fetch the attribute only once; callables are of no interest.
            attr_value = getattr(com, name)
            if callable(attr_value):
                continue

            # Iterate through investment and add scalars and sequences with
            # "investment" prefix to component data:
            if attr_value.__class__.__name__ == "Investment":
                invest_data = detect_scalars_and_sequences(attr_value)
                scalars.update(
                    {
                        "investment_" + str(k): v
                        for k, v in invest_data["scalars"].items()
                    }
                )
                sequences.update(
                    {
                        "investment_" + str(k): v
                        for k, v in invest_data["sequences"].items()
                    }
                )
                continue

            if isinstance(attr_value, str):
                scalars[name] = attr_value
                continue

            # If the label is a tuple it is iterable, therefore it should be
            # converted to a string. Otherwise, it will be a sequence.
            # Note: the resulting string is itself iterable, so it is first
            # stored with the sequences and only moved to the scalars by
            # move_undetected_scalars() below.
            if name == "label":
                attr_value = str(attr_value)

            if isinstance(attr_value, abc.Iterable):
                sequences[name] = attr_value
            elif isinstance(attr_value, _FakeSequence):
                scalars[name] = attr_value.value
            else:
                scalars[name] = attr_value

        com_data = {
            "scalars": scalars,
            "sequences": flatten(sequences),
        }
        move_undetected_scalars(com_data)
        if exclude_none:
            remove_nones(com_data)

        return {
            "scalars": pd.Series(com_data["scalars"]),
            "sequences": pd.DataFrame(com_data["sequences"]),
        }

    # Check if system is es or om:
    if system.__class__.__name__ == "EnergySystem":
        components = system.flows() if get_flows else system.nodes
    else:
        components = system.flows if get_flows else system.es.nodes

    data = {}
    for com_key in components:
        # Flows are looked up by their key; nodes are iterated directly.
        component = components[com_key] if get_flows else com_key
        comkey = com_key if get_flows else (com_key, None)
        data[comkey] = detect_scalars_and_sequences(component)
    return data
233
234
235
def parameter_as_dict(system, exclude_none=True, exclude_attrs=None):
    """
    Create a result dictionary containing node parameters.

    Results are written into a dictionary of pandas objects where
    a Series holds all scalar values and a dataframe all sequences for nodes
    and flows.
    The dictionary is keyed by flows (n, n) and nodes (n, None), e.g.
    `parameter[(n, n)]['sequences']` or `parameter[(n, n)]['scalars']`.

    Parameters
    ----------
    system: energy_system.EnergySystem
        A populated energy system.
    exclude_none: bool
        If True, all scalars and sequences containing None values are excluded
    exclude_attrs: Optional[List[str]]
        Optional list of additional attributes which shall be excluded from
        parameter dict. None (the default) means no additional exclusions.

    Returns
    -------
    dict: Parameters for all nodes and flows
    """
    if exclude_attrs is None:
        exclude_attrs = []

    # Collect the flow entries first, then add the node entries to the
    # same dictionary.
    parameters = __separate_attrs(
        system, exclude_attrs, get_flows=True, exclude_none=exclude_none
    )
    parameters.update(
        __separate_attrs(
            system, exclude_attrs, get_flows=False, exclude_none=exclude_none
        )
    )
    return parameters
272