| Total Complexity | 42 |
| Total Lines | 434 |
| Duplicated Lines | 22.81 % |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like solph.views often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | # -*- coding: utf-8 -*- |
||
| 2 | |||
| 3 | """Modules for providing convenient views for solph results. |
||
| 4 | |||
| 5 | Information about the possible usage is provided within the examples. |
||
| 6 | |||
| 7 | SPDX-FileCopyrightText: Uwe Krien <[email protected]> |
||
| 8 | SPDX-FileCopyrightText: Simon Hilpert |
||
| 9 | SPDX-FileCopyrightText: Cord Kaldemeyer |
||
| 10 | SPDX-FileCopyrightText: Stephan Günther |
||
| 11 | SPDX-FileCopyrightText: henhuy |
||
| 12 | |||
| 13 | SPDX-License-Identifier: MIT |
||
| 14 | |||
| 15 | """ |
||
| 16 | import logging |
||
| 17 | from collections import OrderedDict |
||
| 18 | from enum import Enum |
||
| 19 | |||
| 20 | import pandas as pd |
||
| 21 | |||
| 22 | from oemof.solph.processing import convert_keys_to_strings |
||
| 23 | |||
| 24 | NONE_REPLACEMENT_STR = "_NONE_" |
||
| 25 | |||
| 26 | |||
| 27 | def node(results, node, multiindex=False, keep_none_type=False): |
||
| 28 | """ |
||
| 29 | Obtain results for a single node e.g. a Bus or Component. |
||
| 30 | |||
| 31 | Either a node or its label string can be passed. |
||
| 32 | Results are written into a dictionary which is keyed by 'scalars' and |
||
| 33 | 'sequences' holding respective data in a pandas Series and DataFrame. |
||
| 34 | """ |
||
| 35 | |||
| 36 | def replace_none(col_list, reverse=False): |
||
| 37 | replacement = ( |
||
| 38 | (None, NONE_REPLACEMENT_STR) |
||
| 39 | if reverse |
||
| 40 | else (NONE_REPLACEMENT_STR, None) |
||
| 41 | ) |
||
| 42 | changed_col_list = [ |
||
| 43 | ( |
||
| 44 | ( |
||
| 45 | replacement[0] if n1 is replacement[1] else n1, |
||
| 46 | replacement[0] if n2 is replacement[1] else n2, |
||
| 47 | ), |
||
| 48 | f, |
||
| 49 | ) |
||
| 50 | for (n1, n2), f in col_list |
||
| 51 | ] |
||
| 52 | return changed_col_list |
||
| 53 | |||
| 54 | # convert to keys if only a string is passed |
||
| 55 | if type(node) is str: |
||
| 56 | results = convert_keys_to_strings(results, keep_none_type) |
||
| 57 | |||
| 58 | filtered = {} |
||
| 59 | |||
| 60 | # create a series with tuples as index labels for scalars |
||
| 61 | scalars = { |
||
| 62 | k: v["scalars"] |
||
| 63 | for k, v in results.items() |
||
| 64 | if node in k and not v["scalars"].empty |
||
| 65 | } |
||
| 66 | if scalars: |
||
| 67 | # aggregate data |
||
| 68 | filtered["scalars"] = pd.concat(scalars.values(), axis=0) |
||
| 69 | # assign index values |
||
| 70 | idx = { |
||
| 71 | k: [c for c in v["scalars"].index] |
||
| 72 | for k, v in results.items() |
||
| 73 | if node in k and not v["scalars"].empty |
||
| 74 | } |
||
| 75 | idx = [tuple((k, m) for m in v) for k, v in idx.items()] |
||
| 76 | idx = [i for sublist in idx for i in sublist] |
||
| 77 | filtered["scalars"].index = idx |
||
| 78 | |||
| 79 | # Sort index |
||
| 80 | # (if Nones are present, they have to be replaced while sorting) |
||
| 81 | if keep_none_type: |
||
| 82 | filtered["scalars"].index = replace_none( |
||
| 83 | filtered["scalars"].index.tolist() |
||
| 84 | ) |
||
| 85 | filtered["scalars"].sort_index(axis=0, inplace=True) |
||
| 86 | if keep_none_type: |
||
| 87 | filtered["scalars"].index = replace_none( |
||
| 88 | filtered["scalars"].index.tolist(), True |
||
| 89 | ) |
||
| 90 | |||
| 91 | if multiindex: |
||
| 92 | idx = pd.MultiIndex.from_tuples( |
||
| 93 | [ |
||
| 94 | tuple([row[0][0], row[0][1], row[1]]) |
||
| 95 | for row in filtered["scalars"].index |
||
| 96 | ] |
||
| 97 | ) |
||
| 98 | idx.set_names(["from", "to", "type"], inplace=True) |
||
| 99 | filtered["scalars"].index = idx |
||
| 100 | |||
| 101 | # create a dataframe with tuples as column labels for sequences |
||
| 102 | sequences = { |
||
| 103 | k: v["sequences"] |
||
| 104 | for k, v in results.items() |
||
| 105 | if node in k and not v["sequences"].empty |
||
| 106 | } |
||
| 107 | if sequences: |
||
| 108 | # aggregate data |
||
| 109 | filtered["sequences"] = pd.concat(sequences.values(), axis=1) |
||
| 110 | # assign column names |
||
| 111 | cols = { |
||
| 112 | k: [c for c in v["sequences"].columns] |
||
| 113 | for k, v in results.items() |
||
| 114 | if node in k and not v["sequences"].empty |
||
| 115 | } |
||
| 116 | cols = [tuple((k, m) for m in v) for k, v in cols.items()] |
||
| 117 | cols = [c for sublist in cols for c in sublist] |
||
| 118 | filtered["sequences"].columns = replace_none(cols) |
||
| 119 | filtered["sequences"].sort_index(axis=1, inplace=True) |
||
| 120 | filtered["sequences"].columns = replace_none( |
||
| 121 | filtered["sequences"].columns, True |
||
| 122 | ) |
||
| 123 | |||
| 124 | if multiindex: |
||
| 125 | idx = pd.MultiIndex.from_tuples( |
||
| 126 | [ |
||
| 127 | tuple([col[0][0], col[0][1], col[1]]) |
||
| 128 | for col in filtered["sequences"].columns |
||
| 129 | ] |
||
| 130 | ) |
||
| 131 | idx.set_names(["from", "to", "type"], inplace=True) |
||
| 132 | filtered["sequences"].columns = idx |
||
| 133 | |||
| 134 | return filtered |
||
| 135 | |||
| 136 | |||
| 137 | class NodeOption(str, Enum): |
||
| 138 | All = "all" |
||
| 139 | HasOutputs = "has_outputs" |
||
| 140 | HasInputs = "has_inputs" |
||
| 141 | HasOnlyOutputs = "has_only_outputs" |
||
| 142 | HasOnlyInputs = "has_only_inputs" |
||
| 143 | |||
| 144 | |||
| 145 | def filter_nodes(results, option=NodeOption.All, exclude_busses=False): |
||
| 146 | """Get set of nodes from results-dict for given node option. |
||
| 147 | |||
| 148 | This function filters nodes from results for special needs. At the moment, |
||
| 149 | the following options are available: |
||
| 150 | |||
| 151 | * :attr:`NodeOption.All`: `'all'`: Returns all nodes |
||
| 152 | * :attr:`NodeOption.HasOutputs`: `'has_outputs'`: |
||
| 153 | Returns nodes with an output flow (eg. Transformer, Source) |
||
| 154 | * :attr:`NodeOption.HasInputs`: `'has_inputs'`: |
||
| 155 | Returns nodes with an input flow (eg. Transformer, Sink) |
||
| 156 | * :attr:`NodeOption.HasOnlyOutputs`: `'has_only_outputs'`: |
||
| 157 | Returns nodes having only output flows (eg. Source) |
||
| 158 | * :attr:`NodeOption.HasOnlyInputs`: `'has_only_inputs'`: |
||
| 159 | Returns nodes having only input flows (eg. Sink) |
||
| 160 | |||
| 161 | Additionally, busses can be excluded by setting `exclude_busses` to |
||
| 162 | `True`. |
||
| 163 | |||
| 164 | Parameters |
||
| 165 | ---------- |
||
| 166 | results: dict |
||
| 167 | option: NodeOption |
||
| 168 | exclude_busses: bool |
||
| 169 | If set, all bus nodes are excluded from the resulting node set. |
||
| 170 | |||
| 171 | Returns |
||
| 172 | ------- |
||
| 173 | :obj:`set` |
||
| 174 | A set of Nodes. |
||
| 175 | """ |
||
| 176 | node_from, node_to = map(lambda x: set(x) - {None}, zip(*results)) |
||
| 177 | if option == NodeOption.All: |
||
| 178 | nodes = node_from.union(node_to) |
||
| 179 | elif option == NodeOption.HasOutputs: |
||
| 180 | nodes = node_from |
||
| 181 | elif option == NodeOption.HasInputs: |
||
| 182 | nodes = node_to |
||
| 183 | elif option == NodeOption.HasOnlyOutputs: |
||
| 184 | nodes = node_from - node_to |
||
| 185 | elif option == NodeOption.HasOnlyInputs: |
||
| 186 | nodes = node_to - node_from |
||
| 187 | else: |
||
| 188 | raise ValueError('Invalid node option "' + str(option) + '"') |
||
| 189 | |||
| 190 | if exclude_busses: |
||
| 191 | return {n for n in nodes if not n.__class__.__name__ == "Bus"} |
||
| 192 | else: |
||
| 193 | return nodes |
||
| 194 | |||
| 195 | |||
| 196 | def get_node_by_name(results, *names): |
||
| 197 | """ |
||
| 198 | Searches results for nodes |
||
| 199 | |||
| 200 | Names are looked up in nodes from results and either returned single node |
||
| 201 | (in case only one name is given) or as list of nodes. If name is not found, |
||
| 202 | None is returned. |
||
| 203 | """ |
||
| 204 | nodes = filter_nodes(results) |
||
| 205 | if len(names) == 1: |
||
| 206 | return next(filter(lambda x: str(x) == names[0], nodes), None) |
||
| 207 | else: |
||
| 208 | node_names = {str(n): n for n in nodes} |
||
| 209 | return [node_names.get(n, None) for n in names] |
||
| 210 | |||
| 211 | |||
| 212 | View Code Duplication | def node_weight_by_type(results, node_type): |
|
|
|
|||
| 213 | """ |
||
| 214 | Extracts node weights (if exist) of all components of the specified |
||
| 215 | `node_type`. |
||
| 216 | |||
| 217 | Node weight are endogenous optimzation variables associated with the node |
||
| 218 | and not the edge between two node, foxample the variable representing the |
||
| 219 | storage level. |
||
| 220 | |||
| 221 | Parameters |
||
| 222 | ---------- |
||
| 223 | results: dict |
||
| 224 | A result dictionary from a solved oemof.solph.Model object |
||
| 225 | node_type: oemof.solph class |
||
| 226 | Specifies the type for which node weights should be collected |
||
| 227 | |||
| 228 | Example |
||
| 229 | -------- |
||
| 230 | from oemof.outputlib import views |
||
| 231 | |||
| 232 | # solve oemof model 'm' |
||
| 233 | # Then collect node weights |
||
| 234 | views.node_weight_by_type(m.results(), node_type=solph.GenericStorage) |
||
| 235 | """ |
||
| 236 | |||
| 237 | group = { |
||
| 238 | k: v["sequences"] |
||
| 239 | for k, v in results.items() |
||
| 240 | if isinstance(k[0], node_type) and k[1] is None |
||
| 241 | } |
||
| 242 | if not group: |
||
| 243 | logging.error( |
||
| 244 | "No node weights for nodes of type `{}`".format(node_type) |
||
| 245 | ) |
||
| 246 | return None |
||
| 247 | else: |
||
| 248 | df = convert_to_multiindex( |
||
| 249 | group, index_names=["node", "to", "weight_type"], droplevel=[1] |
||
| 250 | ) |
||
| 251 | return df |
||
| 252 | |||
| 253 | |||
| 254 | View Code Duplication | def node_input_by_type(results, node_type, droplevel=None): |
|
| 255 | """Gets all inputs for all nodes of the type `node_type` and returns |
||
| 256 | a dataframe. |
||
| 257 | |||
| 258 | Parameters |
||
| 259 | ---------- |
||
| 260 | results: dict |
||
| 261 | A result dictionary from a solved oemof.solph.Model object |
||
| 262 | node_type: oemof.solph class |
||
| 263 | Specifies the type of the node for that inputs are selected |
||
| 264 | droplevel: list |
||
| 265 | |||
| 266 | Notes |
||
| 267 | ----- |
||
| 268 | from oemof import solph |
||
| 269 | from oemof.outputlib import views |
||
| 270 | |||
| 271 | # solve oemof solph model 'm' |
||
| 272 | # Then collect node weights |
||
| 273 | views.node_input_by_type(m.results(), node_type=solph.Sink) |
||
| 274 | """ |
||
| 275 | if droplevel is None: |
||
| 276 | droplevel = [] |
||
| 277 | |||
| 278 | group = { |
||
| 279 | k: v["sequences"] |
||
| 280 | for k, v in results.items() |
||
| 281 | if isinstance(k[1], node_type) and k[0] is not None |
||
| 282 | } |
||
| 283 | |||
| 284 | if not group: |
||
| 285 | logging.info("No nodes of type `{}`".format(node_type)) |
||
| 286 | return None |
||
| 287 | else: |
||
| 288 | df = convert_to_multiindex(group, droplevel=droplevel) |
||
| 289 | return df |
||
| 290 | |||
| 291 | |||
| 292 | View Code Duplication | def node_output_by_type(results, node_type, droplevel=None): |
|
| 293 | """Gets all outputs for all nodes of the type `node_type` and returns |
||
| 294 | a dataframe. |
||
| 295 | |||
| 296 | Parameters |
||
| 297 | ---------- |
||
| 298 | results: dict |
||
| 299 | A result dictionary from a solved oemof.solph.Model object |
||
| 300 | node_type: oemof.solph class |
||
| 301 | Specifies the type of the node for that outputs are selected |
||
| 302 | droplevel: list |
||
| 303 | |||
| 304 | Notes |
||
| 305 | ----- |
||
| 306 | import oemof.solph as solph |
||
| 307 | from oemof.outputlib import views |
||
| 308 | |||
| 309 | # solve oemof solph model 'm' |
||
| 310 | # Then collect node weights |
||
| 311 | views.node_output_by_type(m.results(), node_type=solph.Transformer) |
||
| 312 | """ |
||
| 313 | if droplevel is None: |
||
| 314 | droplevel = [] |
||
| 315 | group = { |
||
| 316 | k: v["sequences"] |
||
| 317 | for k, v in results.items() |
||
| 318 | if isinstance(k[0], node_type) and k[1] is not None |
||
| 319 | } |
||
| 320 | |||
| 321 | if not group: |
||
| 322 | logging.info("No nodes of type `{}`".format(node_type)) |
||
| 323 | return None |
||
| 324 | else: |
||
| 325 | df = convert_to_multiindex(group, droplevel=droplevel) |
||
| 326 | return df |
||
| 327 | |||
| 328 | |||
| 329 | def net_storage_flow(results, node_type): |
||
| 330 | """Calculates the net storage flow for storage models that have one |
||
| 331 | input edge and one output edge both with flows within the domain of |
||
| 332 | non-negative reals. |
||
| 333 | |||
| 334 | Parameters |
||
| 335 | ---------- |
||
| 336 | results: dict |
||
| 337 | A result dictionary from a solved oemof.solph.Model object |
||
| 338 | node_type: oemof.solph class |
||
| 339 | Specifies the type for which (storage) type net flows are calculated |
||
| 340 | |||
| 341 | Returns |
||
| 342 | ------- |
||
| 343 | pandas.DataFrame object with multiindex colums. Names of levels of columns |
||
| 344 | are: from, to, net_flow. |
||
| 345 | |||
| 346 | Examples |
||
| 347 | -------- |
||
| 348 | import oemof.solph as solph |
||
| 349 | from oemof.outputlib import views |
||
| 350 | |||
| 351 | # solve oemof solph model 'm' |
||
| 352 | # Then collect node weights |
||
| 353 | views.net_storage_flow(m.results(), node_type=solph.GenericStorage) |
||
| 354 | """ |
||
| 355 | |||
| 356 | group = { |
||
| 357 | k: v["sequences"] |
||
| 358 | for k, v in results.items() |
||
| 359 | if isinstance(k[0], node_type) or isinstance(k[1], node_type) |
||
| 360 | } |
||
| 361 | |||
| 362 | if not group: |
||
| 363 | logging.info("No nodes of type `{}`".format(node_type)) |
||
| 364 | return None |
||
| 365 | |||
| 366 | df = convert_to_multiindex(group) |
||
| 367 | |||
| 368 | if "storage_content" not in df.columns.get_level_values(2).unique(): |
||
| 369 | return None |
||
| 370 | |||
| 371 | x = df.xs("storage_content", axis=1, level=2).columns.values |
||
| 372 | labels = [s for s, t in x] |
||
| 373 | |||
| 374 | dataframes = [] |
||
| 375 | |||
| 376 | for lb in labels: |
||
| 377 | subset = df.groupby( |
||
| 378 | lambda x1: ( |
||
| 379 | lambda fr, to, ty: "output" |
||
| 380 | if (fr == lb and ty == "flow") |
||
| 381 | else "input" |
||
| 382 | if (to == lb and ty == "flow") |
||
| 383 | else "level" |
||
| 384 | if (fr == lb and ty != "flow") |
||
| 385 | else None |
||
| 386 | )(*x1), |
||
| 387 | axis=1, |
||
| 388 | ).sum() |
||
| 389 | |||
| 390 | subset["net_flow"] = subset["output"] - subset["input"] |
||
| 391 | |||
| 392 | subset.columns = pd.MultiIndex.from_product( |
||
| 393 | [[lb], [o for o in lb.outputs], subset.columns] |
||
| 394 | ) |
||
| 395 | |||
| 396 | dataframes.append( |
||
| 397 | subset.loc[:, (slice(None), slice(None), "net_flow")] |
||
| 398 | ) |
||
| 399 | |||
| 400 | return pd.concat(dataframes, axis=1) |
||
| 401 | |||
| 402 | |||
| 403 | def convert_to_multiindex(group, index_names=None, droplevel=None): |
||
| 404 | """Convert dict to pandas DataFrame with multiindex |
||
| 405 | |||
| 406 | Parameters |
||
| 407 | ---------- |
||
| 408 | group: dict |
||
| 409 | Sequences of the oemof.solph.Model.results dictionary |
||
| 410 | index_names: arraylike |
||
| 411 | Array with names of the MultiIndex |
||
| 412 | droplevel: arraylike |
||
| 413 | List containing levels to be dropped from the dataframe |
||
| 414 | """ |
||
| 415 | if index_names is None: |
||
| 416 | index_names = ["from", "to", "type"] |
||
| 417 | if droplevel is None: |
||
| 418 | droplevel = [] |
||
| 419 | |||
| 420 | sorted_group = OrderedDict((k, group[k]) for k in sorted(group)) |
||
| 421 | df = pd.concat(sorted_group.values(), axis=1) |
||
| 422 | |||
| 423 | cols = OrderedDict((k, v.columns) for k, v in sorted_group.items()) |
||
| 424 | cols = [tuple((k, m) for m in v) for k, v in cols.items()] |
||
| 425 | cols = [c for sublist in cols for c in sublist] |
||
| 426 | idx = pd.MultiIndex.from_tuples( |
||
| 427 | [tuple([col[0][0], col[0][1], col[1]]) for col in cols] |
||
| 428 | ) |
||
| 429 | idx.set_names(index_names, inplace=True) |
||
| 430 | df.columns = idx |
||
| 431 | df.columns = df.columns.droplevel(droplevel) |
||
| 432 | |||
| 433 | return df |
||
| 434 |