Completed
Push — dev ( ebef09...86ff08 ) by Uwe
17s, queued 13s

solph.views.node()   (rated D)

Complexity
    Conditions: 11

Size
    Total Lines: 108
    Code Lines: 68

Duplication
    Lines: 0
    Ratio: 0 %

Importance
    Changes: 0

Metric  Value
eloc    68       (effective lines of code)
dl      0        (duplicated lines)
loc     108      (total lines of code)
rs      4.8272
c       0
b       0
f       0
cc      11       (cyclomatic complexity)
nop     4        (number of parameters)

How to fix

Long Method

Small methods make your code easier to understand, particularly when combined with a good name. Moreover, when a method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign that the commented part should be extracted into a new method; the comment is then a natural starting point for naming it.

Commonly applied refactorings include Extract Method and Decompose Conditional.
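A minimal, hypothetical sketch of Extract Method (all function names below are invented for illustration, not taken from oemof.solph): the comment inside the long method becomes the name of the extracted helper.

```python
# Before: a comment marks a cohesive block inside a longer method.
def report_total_before(results):
    total = 0
    # sum all non-empty result lists
    for values in results.values():
        if values:
            total += sum(values)
    return total


# After Extract Method: the commented block becomes its own function and
# the comment's wording becomes the function name.
def sum_nonempty_results(results):
    return sum(sum(values) for values in results.values() if values)


def report_total(results):
    return sum_nonempty_results(results)
```

Both versions behave identically; the win is that the caller now reads as a sentence instead of requiring a comment.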

Complexity

Complex classes like solph.views.node() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
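A hypothetical sketch of Extract Class, assuming the fields sharing a `scalar_` prefix form the cohesive component (all class and field names here are invented for illustration):

```python
# Before: the scalar_* fields share a prefix, hinting at a hidden component.
class ResultViewBefore:
    def __init__(self):
        self.scalar_data = {}
        self.scalar_index = []
        self.sequence_data = {}


# After Extract Class: the scalar-related state moves into its own class,
# and the original class holds an instance of it.
class ScalarView:
    def __init__(self):
        self.data = {}
        self.index = []


class ResultView:
    def __init__(self):
        self.scalars = ScalarView()
        self.sequence_data = {}
```

The prefix disappears because the grouping it encoded is now expressed by the type system.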

# -*- coding: utf-8 -*-

"""Modules for providing convenient views for solph results.

Information about the possible usage is provided within the examples.

SPDX-FileCopyrightText: Uwe Krien <[email protected]>
SPDX-FileCopyrightText: Simon Hilpert
SPDX-FileCopyrightText: Cord Kaldemeyer
SPDX-FileCopyrightText: Stephan Günther
SPDX-FileCopyrightText: henhuy

SPDX-License-Identifier: MIT

"""
import logging
from collections import OrderedDict
from enum import Enum

import pandas as pd

from oemof.solph.processing import convert_keys_to_strings

NONE_REPLACEMENT_STR = "_NONE_"


def node(results, node, multiindex=False, keep_none_type=False):
    """
    Obtain results for a single node, e.g. a Bus or Component.

    Either a node or its label string can be passed.
    Results are written into a dictionary which is keyed by 'scalars' and
    'sequences' holding respective data in a pandas Series and DataFrame.
    """

    def replace_none(col_list, reverse=False):
        replacement = (
            (None, NONE_REPLACEMENT_STR)
            if reverse
            else (NONE_REPLACEMENT_STR, None)
        )
        changed_col_list = [
            (
                (
                    replacement[0] if n1 is replacement[1] else n1,
                    replacement[0] if n2 is replacement[1] else n2,
                ),
                f,
            )
            for (n1, n2), f in col_list
        ]
        return changed_col_list

    # convert keys to strings if a label string is passed
    if type(node) is str:
        results = convert_keys_to_strings(results, keep_none_type)

    filtered = {}

    # create a series with tuples as index labels for scalars
    scalars = {
        k: v["scalars"]
        for k, v in results.items()
        if node in k and not v["scalars"].empty
    }
    if scalars:
        # aggregate data
        filtered["scalars"] = pd.concat(scalars.values(), axis=0)
        # assign index values
        idx = {
            k: [c for c in v["scalars"].index]
            for k, v in results.items()
            if node in k and not v["scalars"].empty
        }
        idx = [tuple((k, m) for m in v) for k, v in idx.items()]
        idx = [i for sublist in idx for i in sublist]
        filtered["scalars"].index = idx

        # Sort index
        # (if Nones are present, they have to be replaced while sorting)
        if keep_none_type:
            filtered["scalars"].index = replace_none(
                filtered["scalars"].index.tolist()
            )
        filtered["scalars"].sort_index(axis=0, inplace=True)
        if keep_none_type:
            filtered["scalars"].index = replace_none(
                filtered["scalars"].index.tolist(), True
            )

        if multiindex:
            idx = pd.MultiIndex.from_tuples(
                [
                    tuple([row[0][0], row[0][1], row[1]])
                    for row in filtered["scalars"].index
                ]
            )
            idx.set_names(["from", "to", "type"], inplace=True)
            filtered["scalars"].index = idx

    # create a dataframe with tuples as column labels for sequences
    sequences = {
        k: v["sequences"]
        for k, v in results.items()
        if node in k and not v["sequences"].empty
    }
    if sequences:
        # aggregate data
        filtered["sequences"] = pd.concat(sequences.values(), axis=1)
        # assign column names
        cols = {
            k: [c for c in v["sequences"].columns]
            for k, v in results.items()
            if node in k and not v["sequences"].empty
        }
        cols = [tuple((k, m) for m in v) for k, v in cols.items()]
        cols = [c for sublist in cols for c in sublist]
        filtered["sequences"].columns = replace_none(cols)
        filtered["sequences"].sort_index(axis=1, inplace=True)
        filtered["sequences"].columns = replace_none(
            filtered["sequences"].columns, True
        )

        if multiindex:
            idx = pd.MultiIndex.from_tuples(
                [
                    tuple([col[0][0], col[0][1], col[1]])
                    for col in filtered["sequences"].columns
                ]
            )
            idx.set_names(["from", "to", "type"], inplace=True)
            filtered["sequences"].columns = idx

    return filtered


class NodeOption(str, Enum):
    All = "all"
    HasOutputs = "has_outputs"
    HasInputs = "has_inputs"
    HasOnlyOutputs = "has_only_outputs"
    HasOnlyInputs = "has_only_inputs"


def filter_nodes(results, option=NodeOption.All, exclude_busses=False):
    """Get set of nodes from results-dict for given node option.

    This function filters nodes from results for special needs. At the moment,
    the following options are available:

        * :attr:`NodeOption.All`: `'all'`: Returns all nodes
        * :attr:`NodeOption.HasOutputs`: `'has_outputs'`:
            Returns nodes with an output flow (e.g. Transformer, Source)
        * :attr:`NodeOption.HasInputs`: `'has_inputs'`:
            Returns nodes with an input flow (e.g. Transformer, Sink)
        * :attr:`NodeOption.HasOnlyOutputs`: `'has_only_outputs'`:
            Returns nodes having only output flows (e.g. Source)
        * :attr:`NodeOption.HasOnlyInputs`: `'has_only_inputs'`:
            Returns nodes having only input flows (e.g. Sink)

    Additionally, busses can be excluded by setting `exclude_busses` to
    `True`.

    Parameters
    ----------
    results: dict
    option: NodeOption
    exclude_busses: bool
        If set, all bus nodes are excluded from the resulting node set.

    Returns
    -------
    :obj:`set`
        A set of Nodes.
    """
    node_from, node_to = map(lambda x: set(x) - {None}, zip(*results))
    if option == NodeOption.All:
        nodes = node_from.union(node_to)
    elif option == NodeOption.HasOutputs:
        nodes = node_from
    elif option == NodeOption.HasInputs:
        nodes = node_to
    elif option == NodeOption.HasOnlyOutputs:
        nodes = node_from - node_to
    elif option == NodeOption.HasOnlyInputs:
        nodes = node_to - node_from
    else:
        raise ValueError('Invalid node option "' + str(option) + '"')

    if exclude_busses:
        return {n for n in nodes if not n.__class__.__name__ == "Bus"}
    else:
        return nodes


def get_node_by_name(results, *names):
    """
    Searches results for nodes.

    Names are looked up in the nodes from results and returned either as a
    single node (if only one name is given) or as a list of nodes. If a name
    is not found, None is returned.
    """
    nodes = filter_nodes(results)
    if len(names) == 1:
        return next(filter(lambda x: str(x) == names[0], nodes), None)
    else:
        node_names = {str(n): n for n in nodes}
        return [node_names.get(n, None) for n in names]


def node_weight_by_type(results, node_type):
    """
    Extracts node weights (if they exist) of all components of the specified
    `node_type`.

    Node weights are endogenous optimization variables associated with the
    node and not with the edge between two nodes, for example the variable
    representing the storage level.

    Parameters
    ----------
    results: dict
        A result dictionary from a solved oemof.solph.Model object
    node_type: oemof.solph class
        Specifies the type for which node weights should be collected

    Example
    --------
    from oemof.outputlib import views

    # solve oemof model 'm'
    # Then collect node weights
    views.node_weight_by_type(m.results(), node_type=solph.GenericStorage)
    """

    group = {
        k: v["sequences"]
        for k, v in results.items()
        if isinstance(k[0], node_type) and k[1] is None
    }
    if not group:
        logging.error(
            "No node weights for nodes of type `{}`".format(node_type)
        )
        return None
    else:
        df = convert_to_multiindex(
            group, index_names=["node", "to", "weight_type"], droplevel=[1]
        )
        return df


def node_input_by_type(results, node_type, droplevel=None):
    """Gets all inputs for all nodes of the type `node_type` and returns
    a dataframe.

    Parameters
    ----------
    results: dict
        A result dictionary from a solved oemof.solph.Model object
    node_type: oemof.solph class
        Specifies the type of the node for which inputs are selected
    droplevel: list

    Notes
    -----
    from oemof import solph
    from oemof.outputlib import views

    # solve oemof solph model 'm'
    # Then collect node inputs
    views.node_input_by_type(m.results(), node_type=solph.Sink)
    """
    if droplevel is None:
        droplevel = []

    group = {
        k: v["sequences"]
        for k, v in results.items()
        if isinstance(k[1], node_type) and k[0] is not None
    }

    if not group:
        logging.info("No nodes of type `{}`".format(node_type))
        return None
    else:
        df = convert_to_multiindex(group, droplevel=droplevel)
        return df


def node_output_by_type(results, node_type, droplevel=None):
    """Gets all outputs for all nodes of the type `node_type` and returns
    a dataframe.

    Parameters
    ----------
    results: dict
        A result dictionary from a solved oemof.solph.Model object
    node_type: oemof.solph class
        Specifies the type of the node for which outputs are selected
    droplevel: list

    Notes
    -----
    import oemof.solph as solph
    from oemof.outputlib import views

    # solve oemof solph model 'm'
    # Then collect node outputs
    views.node_output_by_type(m.results(), node_type=solph.Transformer)
    """
    if droplevel is None:
        droplevel = []
    group = {
        k: v["sequences"]
        for k, v in results.items()
        if isinstance(k[0], node_type) and k[1] is not None
    }

    if not group:
        logging.info("No nodes of type `{}`".format(node_type))
        return None
    else:
        df = convert_to_multiindex(group, droplevel=droplevel)
        return df


def net_storage_flow(results, node_type):
    """Calculates the net storage flow for storage models that have one
    input edge and one output edge, both with flows within the domain of
    non-negative reals.

    Parameters
    ----------
    results: dict
        A result dictionary from a solved oemof.solph.Model object
    node_type: oemof.solph class
        Specifies the (storage) type for which net flows are calculated

    Returns
    -------
    pandas.DataFrame object with multiindex columns. Names of the column
    levels are: from, to, net_flow.

    Examples
    --------
    import oemof.solph as solph
    from oemof.outputlib import views

    # solve oemof solph model 'm'
    # Then calculate net storage flows
    views.net_storage_flow(m.results(), node_type=solph.GenericStorage)
    """

    group = {
        k: v["sequences"]
        for k, v in results.items()
        if isinstance(k[0], node_type) or isinstance(k[1], node_type)
    }

    if not group:
        logging.info("No nodes of type `{}`".format(node_type))
        return None

    df = convert_to_multiindex(group)

    if "storage_content" not in df.columns.get_level_values(2).unique():
        return None

    x = df.xs("storage_content", axis=1, level=2).columns.values
    labels = [s for s, t in x]

    dataframes = []

    for lb in labels:
        subset = df.groupby(
            lambda x1: (
                lambda fr, to, ty: "output"
                if (fr == lb and ty == "flow")
                else "input"
                if (to == lb and ty == "flow")
                else "level"
                if (fr == lb and ty != "flow")
                else None
            )(*x1),
            axis=1,
        ).sum()

        subset["net_flow"] = subset["output"] - subset["input"]

        subset.columns = pd.MultiIndex.from_product(
            [[lb], [o for o in lb.outputs], subset.columns]
        )

        dataframes.append(
            subset.loc[:, (slice(None), slice(None), "net_flow")]
        )

    return pd.concat(dataframes, axis=1)


def convert_to_multiindex(group, index_names=None, droplevel=None):
    """Convert dict to pandas DataFrame with multiindex

    Parameters
    ----------
    group: dict
        Sequences of the oemof.solph.Model.results dictionary
    index_names: arraylike
        Array with names of the MultiIndex
    droplevel: arraylike
        List containing levels to be dropped from the dataframe
    """
    if index_names is None:
        index_names = ["from", "to", "type"]
    if droplevel is None:
        droplevel = []

    sorted_group = OrderedDict((k, group[k]) for k in sorted(group))
    df = pd.concat(sorted_group.values(), axis=1)

    cols = OrderedDict((k, v.columns) for k, v in sorted_group.items())
    cols = [tuple((k, m) for m in v) for k, v in cols.items()]
    cols = [c for sublist in cols for c in sublist]
    idx = pd.MultiIndex.from_tuples(
        [tuple([col[0][0], col[0][1], col[1]]) for col in cols]
    )
    idx.set_names(index_names, inplace=True)
    df.columns = idx
    df.columns = df.columns.droplevel(droplevel)

    return df