1
|
|
|
# -*- coding: utf-8 - |
2
|
|
|
|
3
|
|
|
"""Basic tests. |
4
|
|
|
|
5
|
|
|
This file is part of project oemof (github.com/oemof/oemof). It's copyrighted |
6
|
|
|
by the contributors recorded in the version control history of the file, |
7
|
|
|
available from its original location oemof/tests/basic_tests.py |
8
|
|
|
|
9
|
|
|
SPDX-FileCopyrightText: Stephan Günther <> |
10
|
|
|
SPDX-FileCopyrightText: Uwe Krien <[email protected]> |
11
|
|
|
SPDX-FileCopyrightText: Simon Hilpert <> |
12
|
|
|
SPDX-FileCopyrightText: Cord Kaldemeyer <> |
13
|
|
|
|
14
|
|
|
SPDX-License-Identifier: MIT |
15
|
|
|
""" |
16
|
|
|
from collections.abc import Iterable |
17
|
|
|
from itertools import chain |
18
|
|
|
from pprint import pformat |
19
|
|
|
|
20
|
|
|
from oemof.network import energy_system as es |
21
|
|
|
from oemof.network.groupings import Flows |
22
|
|
|
from oemof.network.groupings import FlowsWithNodes as FWNs |
23
|
|
|
from oemof.network.groupings import Grouping |
24
|
|
|
from oemof.network.groupings import Nodes |
25
|
|
|
from oemof.network.network import Bus |
26
|
|
|
from oemof.network.network import Node |
27
|
|
|
|
28
|
|
|
|
29
|
|
|
class TestsEnergySystem: |
30
|
|
|
def setup(self): |
31
|
|
|
self.es = es.EnergySystem() |
32
|
|
|
|
33
|
|
|
def test_entity_grouping_on_construction(self): |
34
|
|
|
bus = Bus(label="test bus") |
35
|
|
|
ensys = es.EnergySystem(entities=[bus]) |
36
|
|
|
assert ensys.groups[bus.label] is bus |
37
|
|
|
|
38
|
|
|
def test_that_nodes_is_a_proper_alias_for_entities(self): |
39
|
|
|
b1, b2 = Bus(label="B1"), Bus(label="B2") |
40
|
|
|
self.es.add(b1, b2) |
41
|
|
|
assert self.es.nodes == [b1, b2] |
42
|
|
|
empty = [] |
43
|
|
|
self.es.nodes = empty |
44
|
|
|
assert self.es.entities is empty |
45
|
|
|
|
46
|
|
|
def test_that_none_is_not_a_valid_group(self): |
47
|
|
|
def by_uid(n): |
48
|
|
|
if "Not in 'Group'" in n.uid: |
49
|
|
|
return None |
50
|
|
|
else: |
51
|
|
|
return "Group" |
52
|
|
|
|
53
|
|
|
ensys = es.EnergySystem(groupings=[by_uid]) |
54
|
|
|
|
55
|
|
|
ungrouped = [ |
56
|
|
|
Node(uid="Not in 'Group': {}".format(i)) for i in range(10) |
57
|
|
|
] |
58
|
|
|
grouped = [Node(uid="In 'Group': {}".format(i)) for i in range(10)] |
59
|
|
|
assert None not in ensys.groups |
60
|
|
|
for g in ensys.groups.values(): |
61
|
|
|
for e in ungrouped: |
62
|
|
|
if isinstance(g, Iterable) and not isinstance(g, str): |
63
|
|
|
assert e not in g |
64
|
|
|
for e in grouped: |
65
|
|
|
if isinstance(g, Iterable) and not isinstance(g, str): |
66
|
|
|
assert e in g |
67
|
|
|
|
68
|
|
|
def test_defining_multiple_groupings_with_one_function(self): |
69
|
|
|
def assign_to_multiple_groups_in_one_go(n): |
70
|
|
|
g1 = n.label[-1] |
71
|
|
|
g2 = n.label[0:3] |
72
|
|
|
return [g1, g2] |
73
|
|
|
|
74
|
|
|
ensy = es.EnergySystem(groupings=[assign_to_multiple_groups_in_one_go]) |
75
|
|
|
nodes = [ |
76
|
|
|
Node( |
77
|
|
|
label=("Foo: " if i % 2 == 0 else "Bar: ") |
78
|
|
|
+ "{}".format(i) |
79
|
|
|
+ ("A" if i < 5 else "B") |
80
|
|
|
) |
81
|
|
|
for i in range(10) |
82
|
|
|
] |
83
|
|
|
ensy.add(*nodes) |
84
|
|
|
for group in ["Foo", "Bar", "A", "B"]: |
85
|
|
|
assert len(ensy.groups[group]) == 5, ( |
86
|
|
|
"\n Failed testing length of group '{}'." |
87
|
|
|
+ "\n Expected: 5" |
88
|
|
|
+ "\n Got : {}" |
89
|
|
|
+ "\n Group : {}" |
90
|
|
|
).format( |
91
|
|
|
group, |
92
|
|
|
len(ensy.groups[group]), |
93
|
|
|
sorted([e.label for e in ensy.groups[group]]), |
94
|
|
|
) |
95
|
|
|
|
96
|
|
|
def test_grouping_filter_parameter(self): |
97
|
|
|
g1 = Grouping( |
98
|
|
|
key=lambda e: "The Special One", |
99
|
|
|
filter=lambda e: "special" in str(e), |
100
|
|
|
) |
101
|
|
|
g2 = Nodes( |
102
|
|
|
key=lambda e: "A Subset", filter=lambda e: "subset" in str(e) |
103
|
|
|
) |
104
|
|
|
ensys = es.EnergySystem(groupings=[g1, g2]) |
105
|
|
|
special = Node(label="special") |
106
|
|
|
subset = set(Node(label="subset: {}".format(i)) for i in range(10)) |
107
|
|
|
others = set(Node(label="other: {}".format(i)) for i in range(10)) |
108
|
|
|
ensys.add(special, *subset) |
109
|
|
|
ensys.add(*others) |
110
|
|
|
assert ensys.groups["The Special One"] == special |
111
|
|
|
assert ensys.groups["A Subset"] == subset |
112
|
|
|
|
113
|
|
|
def test_proper_filtering(self): |
114
|
|
|
"""`Grouping.filter` should not be "all or nothing". |
115
|
|
|
|
116
|
|
|
There was a bug where, if `Grouping.filter` returned `False` only for |
117
|
|
|
some elements of `Grouping.value(e)`, those elements where actually |
118
|
|
|
retained. |
119
|
|
|
This test makes sure that the bug doesn't resurface again. |
120
|
|
|
""" |
121
|
|
|
g = Nodes( |
122
|
|
|
key="group", |
123
|
|
|
value=lambda _: {1, 2, 3, 4}, |
124
|
|
|
filter=lambda x: x % 2 == 0, |
125
|
|
|
) |
126
|
|
|
ensys = es.EnergySystem(groupings=[g]) |
127
|
|
|
special = Node(label="object") |
128
|
|
|
ensys.add(special) |
129
|
|
|
assert ensys.groups["group"] == {2, 4} |
130
|
|
|
|
131
|
|
|
def test_non_callable_group_keys(self): |
132
|
|
|
collect_everything = Nodes(key="everything") |
133
|
|
|
g1 = Grouping( |
134
|
|
|
key="The Special One", filter=lambda e: "special" in e.label |
135
|
|
|
) |
136
|
|
|
g2 = Nodes(key="A Subset", filter=lambda e: "subset" in e.label) |
137
|
|
|
ensys = es.EnergySystem(groupings=[g1, g2, collect_everything]) |
138
|
|
|
special = Node(label="special") |
139
|
|
|
subset = set(Node(label="subset: {}".format(i)) for i in range(2)) |
140
|
|
|
others = set(Node(label="other: {}".format(i)) for i in range(2)) |
141
|
|
|
everything = subset.union(others) |
142
|
|
|
everything.add(special) |
143
|
|
|
ensys.add(*everything) |
144
|
|
|
assert ensys.groups["The Special One"] == special |
145
|
|
|
assert ensys.groups["A Subset"] == subset |
146
|
|
|
assert ensys.groups["everything"] == everything |
147
|
|
|
|
148
|
|
|
def test_grouping_laziness(self): |
149
|
|
|
"""Energy system `groups` should be fully lazy. |
150
|
|
|
|
151
|
|
|
`Node`s added to an energy system should only be tested for and put |
152
|
|
|
into their respective groups right before the `groups` property of an |
153
|
|
|
energy system is accessed. |
154
|
|
|
""" |
155
|
|
|
group = "Group" |
156
|
|
|
g = Nodes(key=group, filter=lambda n: getattr(n, "group", False)) |
157
|
|
|
self.es = es.EnergySystem(groupings=[g]) |
158
|
|
|
buses = [Bus("Grouped"), Bus("Ungrouped one"), Bus("Ungrouped two")] |
159
|
|
|
self.es.add(buses[0]) |
160
|
|
|
buses[0].group = True |
161
|
|
|
self.es.add(*buses[1:]) |
162
|
|
|
assert group in self.es.groups, ( |
163
|
|
|
( |
164
|
|
|
"\nExpected to find\n\n `{!r}`\n\n" |
165
|
|
|
"in `es.groups`.\nGot:\n\n `{}`" |
166
|
|
|
).format( |
167
|
|
|
group, |
168
|
|
|
"\n ".join(pformat(set(self.es.groups.keys())).split("\n")), |
169
|
|
|
), |
170
|
|
|
) |
171
|
|
|
assert buses[0] in self.es.groups[group], ( |
172
|
|
|
"\nExpected\n\n `{}`\n\nin `es.groups['{}']`:\n\n `{}`".format( |
173
|
|
|
"\n ".join(pformat(buses[0]).split("\n")), |
174
|
|
|
group, |
175
|
|
|
"\n ".join(pformat(self.es.groups[group]).split("\n")), |
176
|
|
|
), |
177
|
|
|
) |
178
|
|
|
|
179
|
|
|
def test_constant_group_keys(self): |
180
|
|
|
"""Callable keys passed in as `constant_key` should not be called. |
181
|
|
|
|
182
|
|
|
The `constant_key` parameter can be used to specify callable group keys |
183
|
|
|
without having to worry about `Grouping`s trying to call them. This |
184
|
|
|
test makes sure that the parameter is handled correctly. |
185
|
|
|
""" |
186
|
|
|
|
187
|
|
|
def everything(): |
188
|
|
|
return "everything" |
189
|
|
|
|
190
|
|
|
collect_everything = Nodes(constant_key=everything) |
191
|
|
|
ensys = es.EnergySystem(groupings=[collect_everything]) |
192
|
|
|
node = Node(label="A Node") |
193
|
|
|
ensys.add(node) |
194
|
|
|
assert "everything" not in ensys.groups |
195
|
|
|
assert everything in ensys.groups |
196
|
|
|
assert ensys.groups[everything] == {node} |
197
|
|
|
assert everything() == "everything" |
198
|
|
|
|
199
|
|
|
def test_flows(self): |
200
|
|
|
key = object() |
201
|
|
|
ensys = es.EnergySystem(groupings=[Flows(key)]) |
202
|
|
|
bus = Bus(label="A Bus") |
203
|
|
|
node = Node(label="A Node", inputs={bus: None}, outputs={bus: None}) |
204
|
|
|
ensys.add(bus, node) |
205
|
|
|
assert ensys.groups[key] == set( |
206
|
|
|
chain(bus.inputs.values(), bus.outputs.values()) |
207
|
|
|
) |
208
|
|
|
|
209
|
|
|
def test_flows_with_nodes(self): |
210
|
|
|
key = object() |
211
|
|
|
ensys = es.EnergySystem(groupings=[FWNs(key)]) |
212
|
|
|
bus = Bus(label="A Bus") |
213
|
|
|
node = Node(label="A Node", inputs={bus: None}, outputs={bus: None}) |
214
|
|
|
ensys.add(bus, node) |
215
|
|
|
assert ensys.groups[key], { |
216
|
|
|
(bus, node, bus.outputs[node]), |
217
|
|
|
(node, bus, node.outputs[bus]), |
218
|
|
|
} |
219
|
|
|
|
220
|
|
|
def test_that_node_additions_are_signalled(self): |
221
|
|
|
""" |
222
|
|
|
When a node gets `add`ed, a corresponding signal should be emitted. |
223
|
|
|
""" |
224
|
|
|
node = Node(label="Node") |
225
|
|
|
|
226
|
|
|
def subscriber(sender, **kwargs): |
227
|
|
|
assert sender is node |
228
|
|
|
assert kwargs["EnergySystem"] is self.es |
229
|
|
|
subscriber.called = True |
230
|
|
|
|
231
|
|
|
subscriber.called = False |
232
|
|
|
|
233
|
|
|
es.EnergySystem.signals[es.EnergySystem.add].connect( |
234
|
|
|
subscriber, sender=node |
235
|
|
|
) |
236
|
|
|
self.es.add(node) |
237
|
|
|
assert subscriber.called, ( |
238
|
|
|
"\nExpected `subscriber.called` to be `True`.\n" |
239
|
|
|
"Got {}.\n" |
240
|
|
|
"Probable reason: `subscriber` didn't get called." |
241
|
|
|
).format(subscriber.called) |
242
|
|
|
|