Total Complexity | 69 |
Total Lines | 563 |
Duplicated Lines | 4.62 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like tests.test_subscriptions often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | import time |
||
2 | import pytest |
||
3 | from copy import copy |
||
4 | from asyncio import Future, sleep, wait_for, TimeoutError |
||
5 | from datetime import datetime, timedelta |
||
6 | |||
7 | import asyncua |
||
8 | from asyncua import ua |
||
9 | |||
10 | pytestmark = pytest.mark.asyncio |
||
11 | |||
12 | |||
13 | class SubHandler: |
||
14 | """ |
||
15 | Dummy subscription client |
||
16 | """ |
||
17 | |||
18 | def datachange_notification(self, node, val, data): |
||
19 | pass |
||
20 | |||
21 | def event_notification(self, event): |
||
22 | pass |
||
23 | |||
24 | |||
25 | class MySubHandler: |
||
26 | """ |
||
27 | More advanced subscription client using Future, so we can wait for events in tests |
||
28 | """ |
||
29 | |||
30 | def __init__(self): |
||
31 | self.future = Future() |
||
32 | |||
33 | def reset(self): |
||
34 | self.future = Future() |
||
35 | |||
36 | async def result(self): |
||
37 | return await wait_for(self.future, 2) |
||
38 | |||
39 | def datachange_notification(self, node, val, data): |
||
40 | self.future.set_result((node, val, data)) |
||
41 | |||
42 | def event_notification(self, event): |
||
43 | self.future.set_result(event) |
||
44 | |||
45 | |||
46 | class MySubHandler2: |
||
47 | def __init__(self): |
||
48 | self.results = [] |
||
49 | |||
50 | def datachange_notification(self, node, val, data): |
||
51 | self.results.append((node, val)) |
||
52 | |||
53 | def event_notification(self, event): |
||
54 | self.results.append(event) |
||
55 | |||
56 | |||
57 | class MySubHandlerCounter: |
||
58 | def __init__(self): |
||
59 | self.datachange_count = 0 |
||
60 | self.event_count = 0 |
||
61 | |||
62 | def datachange_notification(self, node, val, data): |
||
63 | self.datachange_count += 1 |
||
64 | |||
65 | def event_notification(self, event): |
||
66 | self.event_count += 1 |
||
67 | |||
68 | |||
69 | async def test_subscription_failure(opc): |
||
70 | myhandler = MySubHandler() |
||
71 | o = opc.opc.get_objects_node() |
||
72 | sub = await opc.opc.create_subscription(100, myhandler) |
||
73 | with pytest.raises(ua.UaStatusCodeError): |
||
74 | # we can only subscribe to variables so this should fail |
||
75 | handle1 = await sub.subscribe_data_change(o) |
||
76 | await sub.delete() |
||
77 | |||
78 | |||
79 | async def test_subscription_overload(opc): |
||
80 | nb = 10 |
||
81 | myhandler = SubHandler() |
||
82 | o = opc.opc.get_objects_node() |
||
83 | sub = await opc.opc.create_subscription(1, myhandler) |
||
84 | variables = [] |
||
85 | subs = [] |
||
86 | for i in range(nb): |
||
87 | v = await o.add_variable(3, f'SubscriptionVariableOverload{i}', 99) |
||
88 | variables.append(v) |
||
89 | for i in range(nb): |
||
90 | await sub.subscribe_data_change(variables) |
||
91 | for i in range(nb): |
||
92 | for j in range(nb): |
||
93 | await variables[i].set_value(j) |
||
94 | s = await opc.opc.create_subscription(1, myhandler) |
||
95 | await s.subscribe_data_change(variables) |
||
96 | subs.append(s) |
||
97 | await sub.subscribe_data_change(variables[i]) |
||
98 | for i in range(nb): |
||
99 | for j in range(nb): |
||
100 | await variables[i].set_value(j) |
||
101 | await sub.delete() |
||
102 | for s in subs: |
||
103 | await s.delete() |
||
104 | |||
105 | |||
106 | async def test_subscription_count(opc): |
||
107 | myhandler = MySubHandlerCounter() |
||
108 | sub = await opc.opc.create_subscription(1, myhandler) |
||
109 | o = opc.opc.get_objects_node() |
||
110 | var = await o.add_variable(3, 'SubVarCounter', 0.1) |
||
111 | await sub.subscribe_data_change(var) |
||
112 | nb = 12 |
||
113 | for i in range(nb): |
||
114 | val = await var.get_value() |
||
115 | await var.set_value(val + 1) |
||
116 | await sleep(0.2) # let last event arrive |
||
117 | assert nb + 1 == myhandler.datachange_count |
||
118 | await sub.delete() |
||
119 | |||
120 | |||
121 | async def test_subscription_count_list(opc): |
||
122 | myhandler = MySubHandlerCounter() |
||
123 | sub = await opc.opc.create_subscription(1, myhandler) |
||
124 | o = opc.opc.get_objects_node() |
||
125 | var = await o.add_variable(3, 'SubVarCounter', [0.1, 0.2]) |
||
126 | await sub.subscribe_data_change(var) |
||
127 | nb = 12 |
||
128 | for i in range(nb): |
||
129 | val = await var.get_value() |
||
130 | # we do not want to modify object in our db, we need a copy in order to generate event |
||
131 | val = copy(val) |
||
132 | val.append(i) |
||
133 | await var.set_value(copy(val)) |
||
134 | await sleep(0.2) # let last event arrive |
||
135 | assert nb + 1 == myhandler.datachange_count |
||
136 | await sub.delete() |
||
137 | |||
138 | |||
139 | async def test_subscription_count_no_change(opc): |
||
140 | myhandler = MySubHandlerCounter() |
||
141 | sub = await opc.opc.create_subscription(1, myhandler) |
||
142 | o = opc.opc.get_objects_node() |
||
143 | var = await o.add_variable(3, 'SubVarCounter', [0.1, 0.2]) |
||
144 | await sub.subscribe_data_change(var) |
||
145 | nb = 12 |
||
146 | for i in range(nb): |
||
147 | val = await var.get_value() |
||
148 | await var.set_value(val) |
||
149 | await sleep(0.2) # let last event arrive |
||
150 | assert 1 == myhandler.datachange_count |
||
151 | await sub.delete() |
||
152 | |||
153 | |||
154 | async def test_subscription_count_empty(opc): |
||
155 | myhandler = MySubHandlerCounter() |
||
156 | sub = await opc.opc.create_subscription(1, myhandler) |
||
157 | o = opc.opc.get_objects_node() |
||
158 | var = await o.add_variable(3, 'SubVarCounter', [0.1, 0.2, 0.3]) |
||
159 | await sub.subscribe_data_change(var) |
||
160 | while True: |
||
161 | val = await var.get_value() |
||
162 | # we do not want to modify object in our db, we need a copy in order to generate event |
||
163 | val = copy(val) |
||
164 | val.pop() |
||
165 | await var.set_value(val, ua.VariantType.Double) |
||
166 | if not val: |
||
167 | break |
||
168 | await sleep(0.2) # let last event arrive |
||
169 | assert 4 == myhandler.datachange_count |
||
170 | await sub.delete() |
||
171 | |||
172 | |||
173 | async def test_subscription_overload_simple(opc): |
||
174 | nb = 10 |
||
175 | myhandler = MySubHandler() |
||
176 | o = opc.opc.get_objects_node() |
||
177 | sub = await opc.opc.create_subscription(1, myhandler) |
||
178 | variables = [] |
||
179 | for i in range(nb): |
||
180 | variables.append(await o.add_variable(3, f'SubVarOverload{i}', i)) |
||
181 | for i in range(nb): |
||
182 | await sub.subscribe_data_change(variables) |
||
183 | await sub.delete() |
||
184 | |||
185 | |||
186 | async def test_subscription_data_change(opc): |
||
187 | """ |
||
188 | test subscriptions. This is far too complicated for |
||
189 | a unittest but, setting up subscriptions requires a lot |
||
190 | of code, so when we first set it up, it is best |
||
191 | to test as many things as possible |
||
192 | """ |
||
193 | myhandler = MySubHandler() |
||
194 | o = opc.opc.get_objects_node() |
||
195 | # subscribe to a variable |
||
196 | startv1 = [1, 2, 3] |
||
197 | v1 = await o.add_variable(3, 'SubscriptionVariableV1', startv1) |
||
198 | sub = await opc.opc.create_subscription(100, myhandler) |
||
199 | handle1 = await sub.subscribe_data_change(v1) |
||
200 | # Now check we get the start value |
||
201 | node, val, data = await myhandler.result() |
||
202 | assert startv1 == val |
||
203 | assert v1 == node |
||
204 | myhandler.reset() # reset future object |
||
205 | # modify v1 and check we get value |
||
206 | await v1.set_value([5]) |
||
207 | node, val, data = await myhandler.result() |
||
208 | assert v1 == node |
||
209 | assert [5] == val |
||
210 | with pytest.raises(ua.UaStatusCodeError): |
||
211 | await sub.unsubscribe(999) # non existing handle |
||
212 | await sub.unsubscribe(handle1) |
||
213 | with pytest.raises(ua.UaStatusCodeError): |
||
214 | await sub.unsubscribe(handle1) # second try should fail |
||
215 | await sub.delete() |
||
216 | with pytest.raises(ua.UaStatusCodeError): |
||
217 | await sub.unsubscribe(handle1) # sub does not exist anymore |
||
218 | |||
219 | |||
220 | async def test_subscription_data_change_bool(opc): |
||
221 | """ |
||
222 | test subscriptions. This is far too complicated for |
||
223 | a unittest but, setting up subscriptions requires a lot |
||
224 | of code, so when we first set it up, it is best |
||
225 | to test as many things as possible |
||
226 | """ |
||
227 | myhandler = MySubHandler() |
||
228 | o = opc.opc.get_objects_node() |
||
229 | # subscribe to a variable |
||
230 | startv1 = True |
||
231 | v1 = await o.add_variable(3, 'SubscriptionVariableBool', startv1) |
||
232 | sub = await opc.opc.create_subscription(100, myhandler) |
||
233 | handle1 = await sub.subscribe_data_change(v1) |
||
234 | # Now check we get the start value |
||
235 | node, val, data = await myhandler.result() |
||
236 | assert startv1 == val |
||
237 | assert v1 == node |
||
238 | myhandler.reset() # reset future object |
||
239 | # modify v1 and check we get value |
||
240 | await v1.set_value(False) |
||
241 | node, val, data = await myhandler.result() |
||
242 | assert v1 == node |
||
243 | assert val is False |
||
244 | await sub.delete() # should delete our monitoreditem too |
||
245 | |||
246 | |||
247 | async def test_subscription_data_change_many(opc): |
||
248 | """ |
||
249 | test subscriptions. This is far too complicated for |
||
250 | a unittest but, setting up subscriptions requires a lot |
||
251 | of code, so when we first set it up, it is best |
||
252 | to test as many things as possible |
||
253 | """ |
||
254 | myhandler = MySubHandler2() |
||
255 | o = opc.opc.get_objects_node() |
||
256 | startv1 = True |
||
257 | v1 = await o.add_variable(3, 'SubscriptionVariableMany1', startv1) |
||
258 | startv2 = [1.22, 1.65] |
||
259 | v2 = await o.add_variable(3, 'SubscriptionVariableMany2', startv2) |
||
260 | sub = await opc.opc.create_subscription(100, myhandler) |
||
261 | handle1, handle2 = await sub.subscribe_data_change([v1, v2]) |
||
262 | # Now check we get the start values |
||
263 | nodes = [v1, v2] |
||
264 | count = 0 |
||
265 | while not len(myhandler.results) > 1: |
||
266 | count += 1 |
||
267 | await sleep(0.1) |
||
268 | if count > 100: |
||
269 | raise RuntimeError("Did not get result from subscription") |
||
270 | for node, val in myhandler.results: |
||
271 | assert node in nodes |
||
272 | nodes.remove(node) |
||
273 | if node == v1: |
||
274 | assert val == startv1 |
||
275 | elif node == v2: |
||
276 | assert val == startv2 |
||
277 | else: |
||
278 | raise RuntimeError("Error node {0} is neither {1} nor {2}".format(node, v1, v2)) |
||
279 | await sub.delete() |
||
280 | |||
281 | |||
282 | async def test_subscribe_server_time(opc): |
||
283 | myhandler = MySubHandler() |
||
284 | server_time_node = opc.opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime)) |
||
285 | sub = await opc.opc.create_subscription(200, myhandler) |
||
286 | handle = await sub.subscribe_data_change(server_time_node) |
||
287 | node, val, data = await myhandler.result() |
||
288 | assert server_time_node == node |
||
289 | delta = datetime.utcnow() - val |
||
290 | assert delta < timedelta(seconds=2) |
||
291 | await sub.unsubscribe(handle) |
||
292 | await sub.delete() |
||
293 | |||
294 | |||
295 | async def test_create_delete_subscription(opc): |
||
296 | o = opc.opc.get_objects_node() |
||
297 | v = await o.add_variable(3, 'SubscriptionVariable', [1, 2, 3]) |
||
298 | sub = await opc.opc.create_subscription(100, MySubHandler()) |
||
299 | handle = await sub.subscribe_data_change(v) |
||
300 | await sleep(0.1) |
||
301 | await sub.unsubscribe(handle) |
||
302 | await sub.delete() |
||
303 | |||
304 | |||
305 | async def test_subscribe_events(opc): |
||
306 | sub = await opc.opc.create_subscription(100, MySubHandler()) |
||
307 | handle = await sub.subscribe_events() |
||
308 | await sleep(0.1) |
||
309 | await sub.unsubscribe(handle) |
||
310 | await sub.delete() |
||
311 | |||
312 | |||
313 | async def test_subscribe_events_to_wrong_node(opc): |
||
314 | sub = await opc.opc.create_subscription(100, MySubHandler()) |
||
315 | with pytest.raises(ua.UaStatusCodeError): |
||
316 | handle = await sub.subscribe_events(opc.opc.get_node("i=85")) |
||
317 | o = opc.opc.get_objects_node() |
||
318 | v = await o.add_variable(3, 'VariableNoEventNofierAttribute', 4) |
||
319 | with pytest.raises(ua.UaStatusCodeError): |
||
320 | handle = await sub.subscribe_events(v) |
||
321 | await sub.delete() |
||
322 | |||
323 | |||
324 | async def test_get_event_from_type_node_BaseEvent(opc): |
||
325 | etype = opc.opc.get_node(ua.ObjectIds.BaseEventType) |
||
326 | properties = await asyncua.common.events.get_event_properties_from_type_node(etype) |
||
327 | for child in await etype.get_properties(): |
||
328 | assert child in properties |
||
329 | |||
330 | |||
331 | async def test_get_event_from_type_node_CustomEvent(opc): |
||
332 | etype = await opc.server.create_custom_event_type( |
||
333 | 2, 'MyEvent', ua.ObjectIds.AuditEventType, |
||
334 | [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)] |
||
335 | ) |
||
336 | properties = await asyncua.common.events.get_event_properties_from_type_node(etype) |
||
337 | for child in await opc.opc.get_node(ua.ObjectIds.BaseEventType).get_properties(): |
||
338 | assert child in properties |
||
339 | for child in await opc.opc.get_node(ua.ObjectIds.AuditEventType).get_properties(): |
||
340 | assert child in properties |
||
341 | for child in await opc.opc.get_node(etype.nodeid).get_properties(): |
||
342 | assert child in properties |
||
343 | assert await etype.get_child("2:PropertyNum") in properties |
||
344 | assert await etype.get_child("2:PropertyString") in properties |
||
345 | |||
346 | |||
347 | async def test_events_default(opc): |
||
348 | evgen = await opc.server.get_event_generator() |
||
349 | myhandler = MySubHandler() |
||
350 | sub = await opc.opc.create_subscription(100, myhandler) |
||
351 | handle = await sub.subscribe_events() |
||
352 | tid = datetime.utcnow() |
||
353 | msg = "this is my msg " |
||
354 | evgen.trigger(tid, msg) |
||
355 | ev = await myhandler.result() |
||
356 | assert ev is not None # we did not receive event |
||
357 | assert ua.NodeId(ua.ObjectIds.BaseEventType) == ev.EventType |
||
358 | assert 1 == ev.Severity |
||
359 | assert (await opc.opc.get_server_node().get_browse_name()).Name == ev.SourceName |
||
360 | assert opc.opc.get_server_node().nodeid == ev.SourceNode |
||
361 | assert msg == ev.Message.Text |
||
362 | assert tid == ev.Time |
||
363 | await sub.unsubscribe(handle) |
||
364 | await sub.delete() |
||
365 | |||
366 | |||
367 | async def test_events_MyObject(opc): |
||
368 | objects = opc.server.get_objects_node() |
||
369 | o = await objects.add_object(3, 'MyObject') |
||
370 | evgen = await opc.server.get_event_generator(emitting_node=o) |
||
371 | myhandler = MySubHandler() |
||
372 | sub = await opc.opc.create_subscription(100, myhandler) |
||
373 | handle = await sub.subscribe_events(o) |
||
374 | tid = datetime.utcnow() |
||
375 | msg = "this is my msg " |
||
376 | evgen.trigger(tid, msg) |
||
377 | ev = await myhandler.result() |
||
378 | assert ev is not None # we did not receive event |
||
379 | assert ua.NodeId(ua.ObjectIds.BaseEventType) == ev.EventType |
||
380 | assert 1 == ev.Severity |
||
381 | assert 'MyObject' == ev.SourceName |
||
382 | assert o.nodeid == ev.SourceNode |
||
383 | assert msg == ev.Message.Text |
||
384 | assert tid == ev.Time |
||
385 | await sub.unsubscribe(handle) |
||
386 | await sub.delete() |
||
387 | |||
388 | |||
389 | async def test_events_wrong_source(opc): |
||
390 | objects = opc.server.get_objects_node() |
||
391 | o = await objects.add_object(3, 'MyObject') |
||
392 | evgen = await opc.server.get_event_generator(emitting_node=o) |
||
393 | myhandler = MySubHandler() |
||
394 | sub = await opc.opc.create_subscription(100, myhandler) |
||
395 | handle = await sub.subscribe_events() |
||
396 | tid = datetime.utcnow() |
||
397 | msg = "this is my msg " |
||
398 | evgen.trigger(tid, msg) |
||
399 | with pytest.raises(TimeoutError): # we should not receive event |
||
400 | ev = await myhandler.result() |
||
401 | await sub.unsubscribe(handle) |
||
402 | await sub.delete() |
||
403 | |||
404 | |||
405 | async def test_events_CustomEvent(opc): |
||
406 | etype = await opc.server.create_custom_event_type(2, 'MyEvent', ua.ObjectIds.BaseEventType, |
||
407 | [('PropertyNum', ua.VariantType.Float), |
||
408 | ('PropertyString', ua.VariantType.String)]) |
||
409 | evgen = await opc.server.get_event_generator(etype) |
||
410 | myhandler = MySubHandler() |
||
411 | sub = await opc.opc.create_subscription(100, myhandler) |
||
412 | handle = await sub.subscribe_events(evtypes=etype) |
||
413 | propertynum = 2 |
||
414 | propertystring = "This is my test" |
||
415 | evgen.event.PropertyNum = propertynum |
||
416 | evgen.event.PropertyString = propertystring |
||
417 | serverity = 500 |
||
418 | evgen.event.Severity = serverity |
||
419 | tid = datetime.utcnow() |
||
420 | msg = "this is my msg " |
||
421 | evgen.trigger(tid, msg) |
||
422 | ev = await myhandler.result() |
||
423 | assert ev is not None # we did not receive event |
||
424 | assert etype.nodeid == ev.EventType |
||
425 | assert serverity == ev.Severity |
||
426 | assert (await opc.opc.get_server_node().get_browse_name()).Name == ev.SourceName |
||
427 | assert opc.opc.get_server_node().nodeid == ev.SourceNode |
||
428 | assert msg == ev.Message.Text |
||
429 | assert tid == ev.Time |
||
430 | assert propertynum == ev.PropertyNum |
||
431 | assert propertystring == ev.PropertyString |
||
432 | await sub.unsubscribe(handle) |
||
433 | await sub.delete() |
||
434 | |||
435 | |||
436 | async def test_events_CustomEvent_MyObject(opc): |
||
437 | objects = opc.server.get_objects_node() |
||
438 | o = await objects.add_object(3, 'MyObject') |
||
439 | etype = await opc.server.create_custom_event_type(2, 'MyEvent', ua.ObjectIds.BaseEventType, |
||
440 | [('PropertyNum', ua.VariantType.Float), |
||
441 | ('PropertyString', ua.VariantType.String)]) |
||
442 | evgen = await opc.server.get_event_generator(etype, emitting_node=o) |
||
443 | myhandler = MySubHandler() |
||
444 | sub = await opc.opc.create_subscription(100, myhandler) |
||
445 | handle = await sub.subscribe_events(o, etype) |
||
446 | propertynum = 2 |
||
447 | propertystring = "This is my test" |
||
448 | evgen.event.PropertyNum = propertynum |
||
449 | evgen.event.PropertyString = propertystring |
||
450 | tid = datetime.utcnow() |
||
451 | msg = "this is my msg " |
||
452 | evgen.trigger(tid, msg) |
||
453 | ev = await myhandler.result() |
||
454 | assert ev is not None # we did not receive event |
||
455 | assert etype.nodeid == ev.EventType |
||
456 | assert 1 == ev.Severity |
||
457 | assert 'MyObject' == ev.SourceName |
||
458 | assert o.nodeid == ev.SourceNode |
||
459 | assert msg == ev.Message.Text |
||
460 | assert tid == ev.Time |
||
461 | assert propertynum == ev.PropertyNum |
||
462 | assert propertystring == ev.PropertyString |
||
463 | await sub.unsubscribe(handle) |
||
464 | await sub.delete() |
||
465 | |||
466 | |||
467 | async def test_several_different_events(opc): |
||
468 | objects = opc.server.get_objects_node() |
||
469 | o = await objects.add_object(3, 'MyObject') |
||
470 | etype1 = await opc.server.create_custom_event_type(2, 'MyEvent1', ua.ObjectIds.BaseEventType, |
||
471 | [('PropertyNum', ua.VariantType.Float), |
||
472 | ('PropertyString', ua.VariantType.String)]) |
||
473 | evgen1 = await opc.server.get_event_generator(etype1, o) |
||
474 | etype2 = await opc.server.create_custom_event_type(2, 'MyEvent2', ua.ObjectIds.BaseEventType, |
||
475 | [('PropertyNum', ua.VariantType.Float), |
||
476 | ('PropertyString', ua.VariantType.String)]) |
||
477 | evgen2 = await opc.server.get_event_generator(etype2, o) |
||
478 | myhandler = MySubHandler2() |
||
479 | sub = await opc.opc.create_subscription(100, myhandler) |
||
480 | handle = await sub.subscribe_events(o, etype1) |
||
481 | propertynum1 = 1 |
||
482 | propertystring1 = "This is my test 1" |
||
483 | evgen1.event.PropertyNum = propertynum1 |
||
484 | evgen1.event.PropertyString = propertystring1 |
||
485 | propertynum2 = 2 |
||
486 | propertystring2 = "This is my test 2" |
||
487 | evgen2.event.PropertyNum = propertynum2 |
||
488 | evgen2.event.PropertyString = propertystring2 |
||
489 | for i in range(3): |
||
490 | evgen1.trigger() |
||
491 | evgen2.trigger() |
||
492 | await sleep(1) # ToDo: replace |
||
493 | assert 3 == len(myhandler.results) |
||
494 | ev = myhandler.results[-1] |
||
495 | assert etype1.nodeid == ev.EventType |
||
496 | handle = await sub.subscribe_events(o, etype2) |
||
497 | for i in range(4): |
||
498 | evgen1.trigger() |
||
499 | evgen2.trigger() |
||
500 | await sleep(1) # ToDo: replace |
||
501 | ev1s = [ev for ev in myhandler.results if ev.EventType == etype1.nodeid] |
||
502 | ev2s = [ev for ev in myhandler.results if ev.EventType == etype2.nodeid] |
||
503 | assert 11 == len(myhandler.results) |
||
504 | assert 4 == len(ev2s) |
||
505 | assert 7 == len(ev1s) |
||
506 | await sub.unsubscribe(handle) |
||
507 | await sub.delete() |
||
508 | |||
509 | |||
510 | async def test_several_different_events_2(opc): |
||
511 | objects = opc.server.get_objects_node() |
||
512 | o = await objects.add_object(3, 'MyObject') |
||
513 | etype1 = await opc.server.create_custom_event_type( |
||
514 | 2, 'MyEvent1', ua.ObjectIds.BaseEventType, |
||
515 | [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)] |
||
516 | ) |
||
517 | evgen1 = await opc.server.get_event_generator(etype1, o) |
||
518 | etype2 = await opc.server.create_custom_event_type( |
||
519 | 2, 'MyEvent2', ua.ObjectIds.BaseEventType, |
||
520 | [('PropertyNum2', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)] |
||
521 | ) |
||
522 | evgen2 = await opc.server.get_event_generator(etype2, o) |
||
523 | etype3 = await opc.server.create_custom_event_type( |
||
524 | 2, 'MyEvent3', ua.ObjectIds.BaseEventType, |
||
525 | [('PropertyNum3', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)] |
||
526 | ) |
||
527 | evgen3 = await opc.server.get_event_generator(etype3, o) |
||
528 | myhandler = MySubHandler2() |
||
529 | sub = await opc.opc.create_subscription(100, myhandler) |
||
530 | handle = await sub.subscribe_events(o, [etype1, etype3]) |
||
531 | propertynum1 = 1 |
||
532 | propertystring1 = "This is my test 1" |
||
533 | evgen1.event.PropertyNum = propertynum1 |
||
534 | evgen1.event.PropertyString = propertystring1 |
||
535 | propertynum2 = 2 |
||
536 | propertystring2 = "This is my test 2" |
||
537 | evgen2.event.PropertyNum2 = propertynum2 |
||
538 | evgen2.event.PropertyString = propertystring2 |
||
539 | propertynum3 = 3 |
||
540 | propertystring3 = "This is my test 3" |
||
541 | evgen3.event.PropertyNum3 = propertynum3 |
||
542 | evgen3.event.PropertyString = propertystring2 |
||
543 | for i in range(3): |
||
544 | evgen1.trigger() |
||
545 | evgen2.trigger() |
||
546 | evgen3.trigger() |
||
547 | evgen3.event.PropertyNum3 = 9999 |
||
548 | evgen3.trigger() |
||
549 | await sleep(1) |
||
550 | ev1s = [ev for ev in myhandler.results if ev.EventType == etype1.nodeid] |
||
551 | ev2s = [ev for ev in myhandler.results if ev.EventType == etype2.nodeid] |
||
552 | ev3s = [ev for ev in myhandler.results if ev.EventType == etype3.nodeid] |
||
553 | assert 7 == len(myhandler.results) |
||
554 | assert 3 == len(ev1s) |
||
555 | assert 0 == len(ev2s) |
||
556 | assert 4 == len(ev3s) |
||
557 | assert propertynum1 == ev1s[0].PropertyNum |
||
558 | assert propertynum3 == ev3s[0].PropertyNum3 |
||
559 | assert 9999 == ev3s[-1].PropertyNum3 |
||
560 | assert ev1s[0].PropertyNum3 is None |
||
561 | await sub.unsubscribe(handle) |
||
562 | await sub.delete() |
||
563 |