solph.views (rating: B)

Complexity

Total Complexity 43

Size/Duplication

Total Lines 475
Duplicated Lines 20.84 %

Importance

Changes 0
Metric Value
wmc 43
eloc 194
dl 99
loc 475
rs 8.96
c 0
b 0
f 0

8 Functions

Rating  Name                     Duplication  Size  Complexity
D       node()                   0            115   12
B       filter_nodes()           0            49    8
A       get_node_by_name()       0            14    3
A       node_output_by_type()    31           41    3
A       node_input_by_type()     32           42    3
A       node_weight_by_type()    36           46    2
C       net_storage_flow()       0            87    9
A       convert_to_multiindex()  0            31    3

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.

The most common duplication problem is copy-pasted logic that drifts apart over time; the usual solution is to extract the shared code into a single helper and call it from each site, as sketched below.
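In this module, the report flags node_output_by_type(), node_input_by_type(), and node_weight_by_type() as duplicated: each filters the results dict by node type and then calls convert_to_multiindex(). Below is a minimal sketch of extracting that shared pattern; the helper name _group_sequences_by_type and its parameters are illustrative and not part of oemof.solph, and the sketch assumes the module-level logging import and the convert_to_multiindex() function shown in the source further down.

# Hedged sketch only: `_group_sequences_by_type` is a hypothetical helper,
# not part of oemof.solph.
def _group_sequences_by_type(results, node_type, key_index,
                             other_is_none=False, index_names=None,
                             droplevel=None):
    """Filter result sequences by node type and build a MultiIndex frame."""
    other_index = 1 - key_index
    group = {
        k: v["sequences"]
        for k, v in results.items()
        if isinstance(k[key_index], node_type)
        and (k[other_index] is None) == other_is_none
    }
    if not group:
        logging.info("No nodes of type `{}`".format(node_type))
        return None
    return convert_to_multiindex(
        group, index_names=index_names, droplevel=droplevel
    )


def node_input_by_type(results, node_type, droplevel=None):
    # inputs: the node type is the flow target, i.e. k[1]
    return _group_sequences_by_type(
        results, node_type, key_index=1, droplevel=droplevel
    )


def node_output_by_type(results, node_type, droplevel=None):
    # outputs: the node type is the flow source, i.e. k[0]
    return _group_sequences_by_type(
        results, node_type, key_index=0, droplevel=droplevel
    )

node_weight_by_type() could delegate in the same way with other_is_none=True and its own index_names/droplevel arguments; the only behavioural difference in the current source is that it logs at error level rather than info.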

Complexity

 Tip: Before tackling complexity, eliminate any duplication first. Doing so can often reduce the size of classes significantly.

Complex modules like solph.views often do a lot of different things. To break such a module down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for functions or methods that share the same prefixes or suffixes.

Once you have determined which functions belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
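For this module, the shared-prefix heuristic points at the functions named node_*_by_type, which all operate on the same results dict. The following is a purely illustrative sketch of the Extract Class idea, not how solph.views is actually organized; the class name NodeTypeViews is made up:

# Illustrative only: `NodeTypeViews` is a hypothetical extracted component,
# not part of oemof.solph.
class NodeTypeViews:
    """Bundles the views that select result sequences by node type."""

    def __init__(self, results):
        # the shared state all three views operate on
        self.results = results

    def inputs(self, node_type, droplevel=None):
        return node_input_by_type(self.results, node_type, droplevel)

    def outputs(self, node_type, droplevel=None):
        return node_output_by_type(self.results, node_type, droplevel)

    def weights(self, node_type):
        return node_weight_by_type(self.results, node_type)

The extracted component then owns the shared results state, which is exactly the cohesion the shared-prefix heuristic is hinting at. The full module source, as analyzed, follows.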

# -*- coding: utf-8 -*-

"""Modules for providing convenient views for solph results.

See the examples to learn about the possible usage of the provided functions.

SPDX-FileCopyrightText: Uwe Krien <[email protected]>
SPDX-FileCopyrightText: Simon Hilpert
SPDX-FileCopyrightText: Cord Kaldemeyer
SPDX-FileCopyrightText: Stephan Günther
SPDX-FileCopyrightText: henhuy
SPDX-FileCopyrightText: Johannes Kochems

SPDX-License-Identifier: MIT

"""
import logging
from collections import OrderedDict
from enum import Enum

import pandas as pd

from oemof.solph.processing import convert_keys_to_strings

NONE_REPLACEMENT_STR = "_NONE_"


def node(results, node, multiindex=False, keep_none_type=False):
    """
    Obtain results for a single node, e.g. a Bus or Component.

    Either a node or its label string can be passed.
    Results are written into a dictionary which is keyed by 'scalars'
    (resp. 'period_scalars' for a multi-period model) and 'sequences',
    holding the respective data in a pandas Series (resp. DataFrame)
    and a pandas DataFrame.
    """

    def replace_none(col_list, reverse=False):
        replacement = (
            (None, NONE_REPLACEMENT_STR)
            if reverse
            else (NONE_REPLACEMENT_STR, None)
        )
        changed_col_list = [
            (
                (
                    replacement[0] if n1 is replacement[1] else n1,
                    replacement[0] if n2 is replacement[1] else n2,
                ),
                f,
            )
            for (n1, n2), f in col_list
        ]
        return changed_col_list

    # convert to keys if only a string is passed
    if type(node) is str:
        results = convert_keys_to_strings(results, keep_none_type)

    filtered = {}

    # create a series with tuples as index labels for scalars
    scalars_col = "scalars"
    # Check for multi-period model (different naming)
    if "period_scalars" in list(list(results.values())[0].keys()):
        scalars_col = "period_scalars"

    scalars = {
        k: v[scalars_col]
        for k, v in results.items()
        if node in k and not v[scalars_col].empty
    }
    if scalars:
        # aggregate data
        filtered[scalars_col] = pd.concat(scalars.values(), axis=0)
        # assign index values
        idx = {
            k: [c for c in v[scalars_col].index]
            for k, v in results.items()
            if node in k and not v[scalars_col].empty
        }
        idx = [tuple((k, m) for m in v) for k, v in idx.items()]
        idx = [i for sublist in idx for i in sublist]
        filtered[scalars_col].index = idx

        # Sort index
        # (if Nones are present, they have to be replaced while sorting)
        if keep_none_type:
            filtered[scalars_col].index = replace_none(
                filtered[scalars_col].index.tolist()
            )
        filtered[scalars_col].sort_index(axis=0, inplace=True)
        if keep_none_type:
            filtered[scalars_col].index = replace_none(
                filtered[scalars_col].index.tolist(), True
            )

        if multiindex:
            idx = pd.MultiIndex.from_tuples(
                [
                    tuple([row[0][0], row[0][1], row[1]])
                    for row in filtered[scalars_col].index
                ]
            )
            idx.set_names(["from", "to", "type"], inplace=True)
            filtered[scalars_col].index = idx

    # create a dataframe with tuples as column labels for sequences
    sequences = {
        k: v["sequences"]
        for k, v in results.items()
        if node in k and not v["sequences"].empty
    }
    if sequences:
        # aggregate data
        filtered["sequences"] = pd.concat(sequences.values(), axis=1)
        # assign column names
        cols = {
            k: [c for c in v["sequences"].columns]
            for k, v in results.items()
            if node in k and not v["sequences"].empty
        }
        cols = [tuple((k, m) for m in v) for k, v in cols.items()]
        cols = [c for sublist in cols for c in sublist]
        filtered["sequences"].columns = replace_none(cols)
        filtered["sequences"].sort_index(axis=1, inplace=True)
        filtered["sequences"].columns = replace_none(
            filtered["sequences"].columns, True
        )

        if multiindex:
            idx = pd.MultiIndex.from_tuples(
                [
                    tuple([col[0][0], col[0][1], col[1]])
                    for col in filtered["sequences"].columns
                ]
            )
            idx.set_names(["from", "to", "type"], inplace=True)
            filtered["sequences"].columns = idx

    return filtered


class NodeOption(str, Enum):
    All = "all"
    HasOutputs = "has_outputs"
    HasInputs = "has_inputs"
    HasOnlyOutputs = "has_only_outputs"
    HasOnlyInputs = "has_only_inputs"


def filter_nodes(results, option=NodeOption.All, exclude_busses=False):
    """Get set of nodes from results-dict for given node option.

    This function filters nodes from results for special needs. At the moment,
    the following options are available:

        * :attr:`NodeOption.All`: `'all'`: Returns all nodes
        * :attr:`NodeOption.HasOutputs`: `'has_outputs'`:
            Returns nodes with an output flow (e.g. Converter, Source)
        * :attr:`NodeOption.HasInputs`: `'has_inputs'`:
            Returns nodes with an input flow (e.g. Converter, Sink)
        * :attr:`NodeOption.HasOnlyOutputs`: `'has_only_outputs'`:
            Returns nodes having only output flows (e.g. Source)
        * :attr:`NodeOption.HasOnlyInputs`: `'has_only_inputs'`:
            Returns nodes having only input flows (e.g. Sink)

    Additionally, busses can be excluded by setting `exclude_busses` to
    `True`.

    Parameters
    ----------
    results: dict
    option: NodeOption
    exclude_busses: bool
        If set, all bus nodes are excluded from the resulting node set.

    Returns
    -------
    :obj:`set`
        A set of Nodes.
    """
    node_from, node_to = map(lambda x: set(x) - {None}, zip(*results))
    if option == NodeOption.All:
        nodes = node_from.union(node_to)
    elif option == NodeOption.HasOutputs:
        nodes = node_from
    elif option == NodeOption.HasInputs:
        nodes = node_to
    elif option == NodeOption.HasOnlyOutputs:
        nodes = node_from - node_to
    elif option == NodeOption.HasOnlyInputs:
        nodes = node_to - node_from
    else:
        raise ValueError('Invalid node option "' + str(option) + '"')

    if exclude_busses:
        return {n for n in nodes if not n.__class__.__name__ == "Bus"}
    else:
        return nodes


def get_node_by_name(results, *names):
    """
    Searches results for nodes by name.

    Names are looked up among the nodes in the results and returned either as
    a single node (in case only one name is given) or as a list of nodes. If
    a name is not found, None is returned in its place.
    """
    nodes = filter_nodes(results)
    if len(names) == 1:
        return next(filter(lambda x: str(x) == names[0], nodes), None)
    else:
        node_names = {str(n): n for n in nodes}
        return [node_names.get(n, None) for n in names]


def node_weight_by_type(results, node_type):
    """
    Extracts node weights (if they exist) of all components of the specified
    `node_type`.

    Node weights are endogenous optimization variables associated with the
    node and not the edge between two nodes, for example the variable
    representing the storage level.

    Parameters
    ----------
    results: dict
        A result dictionary from a solved oemof.solph.Model object
    node_type: oemof.solph class
        Specifies the type for which node weights should be collected,
        e.g. solph.components.GenericStorage

    Example
    --------
    ::

        from oemof.solph import views

        # solve oemof model 'm'
        # Then collect node weights
        views.node_weight_by_type(
            m.results(),
            node_type=solph.components.GenericStorage
        )
    """

    group = {
        k: v["sequences"]
        for k, v in results.items()
        if isinstance(k[0], node_type) and k[1] is None
    }
    if not group:
        logging.error(
            "No node weights for nodes of type `{}`".format(node_type)
        )
        return None
    else:
        df = convert_to_multiindex(
            group, index_names=["node", "to", "weight_type"], droplevel=[1]
        )
        return df


def node_input_by_type(results, node_type, droplevel=None):
    """Gets all inputs for all nodes of the type `node_type` and returns
    a dataframe.

    Parameters
    ----------
    results: dict
        A result dictionary from a solved oemof.solph.Model object
    node_type: oemof.solph class
        Specifies the type of the node for which inputs are selected,
        e.g. solph.components.Sink
    droplevel: list

    Examples
    --------
    ::

        from oemof import solph
        from oemof.solph import views

        # solve oemof solph model 'm'
        # Then collect node inputs
        views.node_input_by_type(
            m.results(),
            node_type=solph.components.Sink
        )
    """
    if droplevel is None:
        droplevel = []

    group = {
        k: v["sequences"]
        for k, v in results.items()
        if isinstance(k[1], node_type) and k[0] is not None
    }

    if not group:
        logging.info("No nodes of type `{}`".format(node_type))
        return None
    else:
        df = convert_to_multiindex(group, droplevel=droplevel)
        return df


def node_output_by_type(results, node_type, droplevel=None):
    """Gets all outputs for all nodes of the type `node_type` and returns
    a dataframe.

    Parameters
    ----------
    results: dict
        A result dictionary from a solved oemof.solph.Model object
    node_type: oemof.solph class
        Specifies the type of the node for which outputs are selected,
        e.g. solph.components.Converter
    droplevel: list

    Examples
    --------
    ::

        import oemof.solph as solph
        from oemof.solph import views

        # solve oemof solph model 'm'
        # Then collect node outputs
        views.node_output_by_type(
            m.results(),
            node_type=solph.components.Converter
        )
    """
    if droplevel is None:
        droplevel = []
    group = {
        k: v["sequences"]
        for k, v in results.items()
        if isinstance(k[0], node_type) and k[1] is not None
    }

    if not group:
        logging.info("No nodes of type `{}`".format(node_type))
        return None
    else:
        df = convert_to_multiindex(group, droplevel=droplevel)
        return df


def net_storage_flow(results, node_type):
    """Calculates the net storage flow for storage models that have one
    input edge and one output edge, both with flows within the domain of
    non-negative reals.

    Parameters
    ----------
    results: dict
        A result dictionary from a solved oemof.solph.Model object
    node_type: oemof.solph class
        Specifies the (storage) type for which net flows are calculated,
        e.g. solph.components.GenericStorage

    Returns
    -------
    pandas.DataFrame object with multiindex columns. Names of levels of
        columns are: from, to, net_flow.

    Examples
    --------
    ::

        import oemof.solph as solph
        from oemof.solph import views

        # solve oemof solph model 'm'
        # Then calculate net storage flows
        views.net_storage_flow(
            m.results(),
            node_type=solph.components.GenericStorage
        )
    """

    group = {
        k: v["sequences"]
        for k, v in results.items()
        if isinstance(k[0], node_type) or isinstance(k[1], node_type)
    }

    if not group:
        logging.info("No nodes of type `{}`".format(node_type))
        return None

    df = convert_to_multiindex(group)

    if "storage_content" not in df.columns.get_level_values(2).unique():
        return None

    x = df.xs("storage_content", axis=1, level=2).columns.values
    labels = [s for s, t in x]

    dataframes = []

    for lb in labels:
        subset = (
            df.T.groupby(
                lambda x1: (
                    lambda fr, to, ty: (
                        "output"
                        if (fr == lb and ty == "flow")
                        else (
                            "input"
                            if (to == lb and ty == "flow")
                            else (
                                "level"
                                if (fr == lb and ty != "flow")
                                else None
                            )
                        )
                    )
                )(*x1)
            )
            .sum()
            .T
        )

        subset["net_flow"] = subset["output"] - subset["input"]

        subset.columns = pd.MultiIndex.from_product(
            [[lb], [o for o in lb.outputs], subset.columns]
        )

        dataframes.append(
            subset.loc[:, (slice(None), slice(None), "net_flow")]
        )

    return pd.concat(dataframes, axis=1)


def convert_to_multiindex(group, index_names=None, droplevel=None):
    """Convert dict to pandas DataFrame with multiindex

    Parameters
    ----------
    group: dict
        Sequences of the oemof.solph.Model.results dictionary
    index_names: arraylike
        Array with names of the MultiIndex
    droplevel: arraylike
        List containing levels to be dropped from the dataframe
    """
    if index_names is None:
        index_names = ["from", "to", "type"]
    if droplevel is None:
        droplevel = []

    sorted_group = OrderedDict((k, group[k]) for k in sorted(group))
    df = pd.concat(sorted_group.values(), axis=1)

    cols = OrderedDict((k, v.columns) for k, v in sorted_group.items())
    cols = [tuple((k, m) for m in v) for k, v in cols.items()]
    cols = [c for sublist in cols for c in sublist]
    idx = pd.MultiIndex.from_tuples(
        [tuple([col[0][0], col[0][1], col[1]]) for col in cols]
    )
    idx.set_names(index_names, inplace=True)
    df.columns = idx
    df.columns = df.columns.droplevel(droplevel)

    return df
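
For orientation, a minimal usage sketch consistent with the docstring examples above, assuming a built and solved oemof.solph model `m` with a node labelled "storage" (both are placeholders):

import oemof.solph as solph
from oemof.solph import views

# build and solve an oemof.solph model `m` (omitted here), then:
results = m.results()

# results for one node; 'sequences' is present when the node has flow results
# and carries a (from, to, type) column MultiIndex
storage = views.node(results, "storage", multiindex=True)
print(storage["sequences"].head())

# all nodes that only have outputs (e.g. sources), excluding buses
sources = views.filter_nodes(
    results, option=views.NodeOption.HasOnlyOutputs, exclude_busses=True
)

# storage levels of all GenericStorage components
levels = views.node_weight_by_type(
    results, node_type=solph.components.GenericStorage
)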
475