Completed
Pull Request — master (#179)
by Olivier
03:06
created

SubscriptionTests.test_events_MyObject()   B

Complexity

Conditions 1

Size

Total Lines 25

Duplication

Lines 25
Ratio 100 %

Code Coverage

Tests 20
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 25
loc 25
rs 8.8571
ccs 20
cts 20
cp 1
crap 1
1
2 1
from concurrent.futures import Future, TimeoutError
3 1
import time
4 1
from datetime import datetime, timedelta
5
6 1
import opcua
7 1
from opcua import ua
8
9
10 1
class SubHandler():

    '''
        Dummy subscription client: both callbacks accept their
        arguments and do nothing with them
    '''

    def datachange_notification(self, node, val, data):
        '''Ignore the data change notification.'''

    def event_notification(self, event):
        '''Ignore the event notification.'''
21
22
23 1
class MySubHandler():

    '''
    More advanced subscription client using Future, so we can wait for events in tests

    The first notification received completes self.future; further
    notifications are ignored until reset() installs a new Future.
    '''

    def __init__(self):
        self.future = Future()

    def reset(self):
        '''Install a fresh, pending Future for the next notification.'''
        self.future = Future()

    def _publish(self, result):
        '''Complete the current Future with result unless it is already done.'''
        # Read the attribute once: reset() may replace it at any time.
        future = self.future
        # Calling set_result() on a completed Future raises InvalidStateError
        # on Python 3.8+, so a burst of notifications must not call it twice.
        if not future.done():
            future.set_result(result)

    def datachange_notification(self, node, val, data):
        '''Publish the (node, val, data) tuple of a data change.'''
        self._publish((node, val, data))

    def event_notification(self, event):
        '''Publish the received event.'''
        self._publish(event)
40
41
42 1
class MySubHandler2():

    '''
    Subscription client that appends every notification to self.results
    '''

    def __init__(self):
        self.results = list()

    def datachange_notification(self, node, val, data):
        # data is not stored, only the (node, value) pair
        pair = (node, val)
        self.results.append(pair)

    def event_notification(self, event):
        # events are stored as received
        self.results.append(event)
51
52
53 1
class MySubHandlerCounter():

    '''
    Subscription client that only counts the notifications it receives
    '''

    def __init__(self):
        self.datachange_count, self.event_count = 0, 0

    def datachange_notification(self, node, val, data):
        # arguments are ignored, only the number of calls matters
        self.datachange_count = self.datachange_count + 1

    def event_notification(self, event):
        self.event_count = self.event_count + 1
63
64
65 1
class SubscriptionTests(object):
66
67 1
    def test_subscription_failure(self):
        # A data change subscription on the Objects node is expected to be
        # rejected with UaStatusCodeError.
        handler = MySubHandler()
        objects = self.opc.get_objects_node()
        sub = self.opc.create_subscription(100, handler)
        # we can only subscribe to variables so this should fail
        with self.assertRaises(ua.UaStatusCodeError):
            sub.subscribe_data_change(objects)
        sub.delete()
74
75 1
    def test_subscription_overload(self):
        # Stress test without assertions: many variables, many overlapping
        # subscriptions with a 1 ms period and many writes. It only has to
        # run to completion.
        count = 10
        handler = MySubHandler()
        objects = self.opc.get_objects_node()
        main_sub = self.opc.create_subscription(1, handler)
        variables = [
            objects.add_variable(3, 'SubscriptionVariableOverload' + str(idx), 99)
            for idx in range(count)
        ]
        # subscribe the whole list several times on the same subscription
        for _ in range(count):
            main_sub.subscribe_data_change(variables)
        extra_subs = []
        for var in variables:
            for value in range(count):
                var.set_value(value)
            # one more subscription watching every variable
            extra = self.opc.create_subscription(1, handler)
            extra.subscribe_data_change(variables)
            extra_subs.append(extra)
            main_sub.subscribe_data_change(var)
        # write again now that all subscriptions exist
        for var in variables:
            for value in range(count):
                var.set_value(value)
        main_sub.delete()
        for extra in extra_subs:
            extra.delete()
100
101 1
    def test_subscription_count(self):
        # Increment a float variable several times and check how many data
        # change notifications were counted.
        counter = MySubHandlerCounter()
        sub = self.opc.create_subscription(1, counter)
        objects = self.opc.get_objects_node()
        var = objects.add_variable(3, 'SubVarCounter', 0.1)
        sub.subscribe_data_change(var)
        writes = 12
        for _ in range(writes):
            var.set_value(var.get_value() + 1)
        time.sleep(0.2)  # let last event arrive
        # one notification per write plus one more
        # (presumably the start value sent on subscription)
        self.assertEqual(counter.datachange_count, writes + 1)
        sub.delete()
114
115 1 View Code Duplication
    def test_subscription_count_list(self):
        # Grow an array variable one item per write and check how many data
        # change notifications were counted.
        counter = MySubHandlerCounter()
        sub = self.opc.create_subscription(1, counter)
        objects = self.opc.get_objects_node()
        var = objects.add_variable(3, 'SubVarCounter', [0.1, 0.2])
        sub.subscribe_data_change(var)
        writes = 12
        for item in range(writes):
            current = var.get_value()
            # the fetched list itself is extended and written back
            current.append(item)
            var.set_value(current)
        time.sleep(0.2)  # let last event arrive
        # one notification per write plus one more
        # (presumably the start value sent on subscription)
        self.assertEqual(counter.datachange_count, writes + 1)
        sub.delete()
129
130 1
    def test_subscription_count_no_change(self):
        # Writing back the value that was just read, several times, is
        # expected to leave the notification count at 1.
        counter = MySubHandlerCounter()
        sub = self.opc.create_subscription(1, counter)
        objects = self.opc.get_objects_node()
        var = objects.add_variable(3, 'SubVarCounter', [0.1, 0.2])
        sub.subscribe_data_change(var)
        writes = 12
        for _ in range(writes):
            var.set_value(var.get_value())
        time.sleep(0.2)  # let last event arrive
        self.assertEqual(counter.datachange_count, 1)
        sub.delete()
143
144 1 View Code Duplication
    def test_subscription_count_empty(self):
        # Shrink a three item array one item per write until it is empty and
        # check how many data change notifications were counted.
        counter = MySubHandlerCounter()
        sub = self.opc.create_subscription(1, counter)
        objects = self.opc.get_objects_node()
        var = objects.add_variable(3, 'SubVarCounter', [0.1, 0.2, 0.3])
        sub.subscribe_data_change(var)
        emptied = False
        while not emptied:
            values = var.get_value()
            values.pop()
            # the variant type is passed explicitly on every write
            var.set_value(values, ua.VariantType.Double)
            emptied = not values
        time.sleep(0.2)  # let last event arrive
        self.assertEqual(counter.datachange_count, 4)
        sub.delete()
159
160 1
    def test_subscription_overload_simple(self):
        # Subscribe the same list of variables repeatedly on one subscription
        # with a 1 ms period; nothing is asserted, it only has to complete.
        count = 10
        handler = MySubHandler()
        objects = self.opc.get_objects_node()
        sub = self.opc.create_subscription(1, handler)
        variables = []
        for idx in range(count):
            variables.append(objects.add_variable(3, 'SubVarOverload' + str(idx), idx))
        for _ in range(count):
            sub.subscribe_data_change(variables)
        sub.delete()
169
170 1
    def test_subscription_data_change(self):
        '''
        test subscriptions. This is far too complicated for
        a unittest but, setting up subscriptions requires a lot
        of code, so when we first set it up, it is best
        to test as many things as possible

        Covers: start value notification, notification after a write,
        and the errors raised when unsubscribing an unknown handle,
        an already removed handle, or on a deleted subscription.
        '''
        myhandler = MySubHandler()

        o = self.opc.get_objects_node()

        # subscribe to a variable
        startv1 = [1, 2, 3]
        v1 = o.add_variable(3, 'SubscriptionVariableV1', startv1)
        sub = self.opc.create_subscription(100, myhandler)
        handle1 = sub.subscribe_data_change(v1)

        # Now check we get the start value
        # Bounded wait: if the notification never arrives the test fails
        # with TimeoutError instead of blocking the whole run forever.
        node, val, data = myhandler.future.result(10)
        self.assertEqual(val, startv1)
        self.assertEqual(node, v1)

        myhandler.reset()  # reset future object

        # modify v1 and check we get value
        v1.set_value([5])
        node, val, data = myhandler.future.result(10)

        self.assertEqual(node, v1)
        self.assertEqual(val, [5])

        with self.assertRaises(ua.UaStatusCodeError):
            sub.unsubscribe(999)  # non existing handle
        sub.unsubscribe(handle1)
        with self.assertRaises(ua.UaStatusCodeError):
            sub.unsubscribe(handle1)  # second try should fail
        sub.delete()
        with self.assertRaises(ua.UaStatusCodeError):
            sub.unsubscribe(handle1)  # sub does not exist anymore
209
210
211 1
    def test_subscription_data_change_bool(self):
        '''
        Subscribe to a boolean variable and check that both the start
        value and a later write are delivered to the handler
        '''
        myhandler = MySubHandler()

        o = self.opc.get_objects_node()

        # subscribe to a variable
        startv1 = True
        v1 = o.add_variable(3, 'SubscriptionVariableBool', startv1)
        sub = self.opc.create_subscription(100, myhandler)
        handle1 = sub.subscribe_data_change(v1)

        # Now check we get the start value
        # Bounded wait: if the notification never arrives the test fails
        # with TimeoutError instead of blocking the whole run forever.
        node, val, data = myhandler.future.result(10)
        self.assertEqual(val, startv1)
        self.assertEqual(node, v1)

        myhandler.reset()  # reset future object

        # modify v1 and check we get value
        v1.set_value(False)
        node, val, data = myhandler.future.result(10)
        self.assertEqual(node, v1)
        self.assertEqual(val, False)

        sub.delete()  # should delete our monitoreditem too
242
243 1
    def test_subscription_data_change_many(self):
        '''
        Subscribe to two variables with a single call and check that a
        start value is received once for each of them
        '''
        handler = MySubHandler2()
        objects = self.opc.get_objects_node()

        startv1 = True
        v1 = objects.add_variable(3, 'SubscriptionVariableMany1', startv1)
        startv2 = [1.22, 1.65]
        v2 = objects.add_variable(3, 'SubscriptionVariableMany2', startv2)

        sub = self.opc.create_subscription(100, handler)
        # unpacking requires exactly one handle per subscribed node
        handle1, handle2 = sub.subscribe_data_change([v1, v2])

        # Now check we get the start values
        pending = [v1, v2]

        # poll until at least two results are in, giving up after 100 rounds
        attempts = 0
        while len(handler.results) <= 1:
            attempts += 1
            time.sleep(0.1)
            if attempts > 100:
                self.fail("Did not get result from subscription")

        for node, val in handler.results:
            self.assertIn(node, pending)
            # each node may only show up once
            pending.remove(node)
            if node == v1:
                self.assertEqual(startv1, val)
                continue
            if node == v2:
                self.assertEqual(startv2, val)
                continue
            self.fail("Error node {} is neither {} nor {}".format(node, v1, v2))

        sub.delete()
281
282 1
    def test_subscribe_server_time(self):
        # Subscribe to the Server_ServerStatus_CurrentTime node and check the
        # first value received is less than 2 seconds behind datetime.utcnow().
        myhandler = MySubHandler()

        server_time_node = self.opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))

        sub = self.opc.create_subscription(200, myhandler)
        handle = sub.subscribe_data_change(server_time_node)

        # Bounded wait: if the notification never arrives the test fails
        # with TimeoutError instead of blocking the whole run forever.
        node, val, data = myhandler.future.result(10)
        self.assertEqual(node, server_time_node)
        delta = datetime.utcnow() - val
        self.assertTrue(delta < timedelta(seconds=2))

        sub.unsubscribe(handle)
        sub.delete()
297
298
299
300 1
    def test_create_delete_subscription(self):
        # Create a subscription, monitor one variable for a moment, then
        # unsubscribe and delete; nothing is asserted.
        objects = self.opc.get_objects_node()
        var = objects.add_variable(3, 'SubscriptionVariable', [1, 2, 3])
        handler = MySubHandler()
        sub = self.opc.create_subscription(100, handler)
        handle = sub.subscribe_data_change(var)
        time.sleep(0.1)
        sub.unsubscribe(handle)
        sub.delete()
308
309 1
    def test_subscribe_events(self):
        # Subscribe to events with the default arguments, wait a moment,
        # then unsubscribe and delete; nothing is asserted.
        handler = MySubHandler()
        sub = self.opc.create_subscription(100, handler)
        handle = sub.subscribe_events()
        time.sleep(0.1)
        sub.unsubscribe(handle)
        sub.delete()
315
316 1
    def test_subscribe_events_to_wrong_node(self):
        # An event subscription on node "i=85" or on a freshly added
        # variable is expected to raise UaStatusCodeError.
        handler = MySubHandler()
        sub = self.opc.create_subscription(100, handler)
        with self.assertRaises(ua.UaStatusCodeError):
            sub.subscribe_events(self.opc.get_node("i=85"))
        objects = self.opc.get_objects_node()
        var = objects.add_variable(3, 'VariableNoEventNofierAttribute', 4)
        with self.assertRaises(ua.UaStatusCodeError):
            sub.subscribe_events(var)
        sub.delete()
325
326 1
    def test_get_event_from_type_node_BaseEvent(self):
        # Every property of the BaseEventType node must be part of what
        # get_event_properties_from_type_node() returns for that node.
        base_type = self.opc.get_node(ua.ObjectIds.BaseEventType)
        found = opcua.common.subscription.get_event_properties_from_type_node(base_type)
        for prop in base_type.get_properties():
            self.assertIn(prop, found)
331
332 1
    def test_get_event_from_type_node_CustomEvent(self):
        # For a custom event type created under AuditEventType, the result of
        # get_event_properties_from_type_node() must contain the properties
        # of BaseEventType, AuditEventType and the custom type itself.
        etype = self.srv.create_custom_event_type(
            2,
            'MyEvent',
            ua.ObjectIds.AuditEventType,
            [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)],
        )

        found = opcua.common.subscription.get_event_properties_from_type_node(etype)

        # nodes are fetched one at a time, right before their check
        for type_id in (ua.ObjectIds.BaseEventType, ua.ObjectIds.AuditEventType, etype.nodeid):
            for prop in self.opc.get_node(type_id).get_properties():
                self.assertIn(prop, found)

        for browse_path in ("2:PropertyNum", "2:PropertyString"):
            self.assertIn(etype.get_child(browse_path), found)
346
347 1 View Code Duplication
    def test_events_default(self):
        # Trigger an event from the default event generator and check the
        # fields of the event delivered to a default event subscription.
        evgen = self.srv.get_event_generator()

        myhandler = MySubHandler()
        sub = self.opc.create_subscription(100, myhandler)
        handle = sub.subscribe_events()

        tid = datetime.utcnow()
        msg = b"this is my msg "
        evgen.trigger(tid, msg)

        # Bounded wait, same 10 s as the other event tests of this class:
        # a lost event fails with TimeoutError instead of hanging the run.
        ev = myhandler.future.result(10)
        self.assertIsNot(ev, None)  # we did not receive event
        self.assertEqual(ev.EventType, ua.NodeId(ua.ObjectIds.BaseEventType))
        self.assertEqual(ev.Severity, 1)
        self.assertEqual(ev.SourceName, self.opc.get_server_node().get_browse_name().Name)
        self.assertEqual(ev.SourceNode, self.opc.get_server_node().nodeid)
        self.assertEqual(ev.Message.Text, msg)
        self.assertEqual(ev.Time, tid)

        sub.unsubscribe(handle)
        sub.delete()
370
371 1 View Code Duplication
    def test_events_MyObject(self):
        # Trigger an event whose source is a newly added object and check the
        # fields of the event delivered to a subscription on that object.
        source = self.srv.get_objects_node().add_object(3, 'MyObject')
        generator = self.srv.get_event_generator(source=source)

        handler = MySubHandler()
        sub = self.opc.create_subscription(100, handler)
        handle = sub.subscribe_events(source)

        stamp = datetime.utcnow()
        text = b"this is my msg "
        generator.trigger(stamp, text)

        received = handler.future.result(10)
        self.assertIsNot(received, None)  # we did not receive event
        self.assertEqual(received.EventType, ua.NodeId(ua.ObjectIds.BaseEventType))
        self.assertEqual(received.Severity, 1)
        self.assertEqual(received.SourceName, 'MyObject')
        self.assertEqual(received.SourceNode, source.nodeid)
        self.assertEqual(received.Message.Text, text)
        self.assertEqual(received.Time, stamp)

        sub.unsubscribe(handle)
        sub.delete()
396
397 1
    def test_events_wrong_source(self):
        # An event generated with a custom object as source must not reach a
        # subscription made with the default subscribe_events() arguments.
        source = self.srv.get_objects_node().add_object(3, 'MyObject')
        generator = self.srv.get_event_generator(source=source)

        handler = MySubHandler()
        sub = self.opc.create_subscription(100, handler)
        handle = sub.subscribe_events()

        stamp = datetime.utcnow()
        text = b"this is my msg "
        generator.trigger(stamp, text)

        # we should not receive event: waiting 2 s has to time out
        with self.assertRaises(TimeoutError):
            handler.future.result(2)

        sub.unsubscribe(handle)
        sub.delete()
416
417 1 View Code Duplication
    def test_events_CustomEvent(self):
        # Trigger a custom event type carrying two extra properties and a
        # custom severity, then check every field of the delivered event.
        etype = self.srv.create_custom_event_type(
            2,
            'MyEvent',
            ua.ObjectIds.BaseEventType,
            [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)],
        )
        generator = self.srv.get_event_generator(etype)

        handler = MySubHandler()
        sub = self.opc.create_subscription(100, handler)
        handle = sub.subscribe_events(evtype=etype)

        # values set on the event before it is triggered
        number = 2
        string = "This is my test"
        generator.event.PropertyNum = number
        generator.event.PropertyString = string
        severity = 500
        generator.event.Severity = severity
        stamp = datetime.utcnow()
        text = b"this is my msg "
        generator.trigger(stamp, text)

        received = handler.future.result(10)
        self.assertIsNot(received, None)  # we did not receive event
        self.assertEqual(received.EventType, etype.nodeid)
        self.assertEqual(received.Severity, severity)
        self.assertEqual(received.SourceName, self.opc.get_server_node().get_browse_name().Name)
        self.assertEqual(received.SourceNode, self.opc.get_server_node().nodeid)
        self.assertEqual(received.Message.Text, text)
        self.assertEqual(received.Time, stamp)
        self.assertEqual(received.PropertyNum, number)
        self.assertEqual(received.PropertyString, string)

        sub.unsubscribe(handle)
        sub.delete()
449
450 1 View Code Duplication
    def test_events_CustomEvent_MyObject(self):
        # Trigger a custom event type with a newly added object as source and
        # check every field of the event delivered to a subscription on that
        # object and event type.
        source = self.srv.get_objects_node().add_object(3, 'MyObject')
        etype = self.srv.create_custom_event_type(
            2,
            'MyEvent',
            ua.ObjectIds.BaseEventType,
            [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)],
        )
        generator = self.srv.get_event_generator(etype, source)

        handler = MySubHandler()
        sub = self.opc.create_subscription(100, handler)
        handle = sub.subscribe_events(source, etype)

        # values set on the event before it is triggered
        number = 2
        string = "This is my test"
        generator.event.PropertyNum = number
        generator.event.PropertyString = string
        stamp = datetime.utcnow()
        text = b"this is my msg "
        generator.trigger(stamp, text)

        received = handler.future.result(10)
        self.assertIsNot(received, None)  # we did not receive event
        self.assertEqual(received.EventType, etype.nodeid)
        self.assertEqual(received.Severity, 1)
        self.assertEqual(received.SourceName, 'MyObject')
        self.assertEqual(received.SourceNode, source.nodeid)
        self.assertEqual(received.Message.Text, text)
        self.assertEqual(received.Time, stamp)
        self.assertEqual(received.PropertyNum, number)
        self.assertEqual(received.PropertyString, string)

        sub.unsubscribe(handle)
        sub.delete()
482
483 1
    def test_several_different_events(self):
        # Two custom event types share one source object. Events are counted
        # first with only the first type subscribed, then with both.
        source = self.srv.get_objects_node().add_object(3, 'MyObject')

        etype1 = self.srv.create_custom_event_type(
            2,
            'MyEvent1',
            ua.ObjectIds.BaseEventType,
            [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)],
        )
        evgen1 = self.srv.get_event_generator(etype1, source)

        etype2 = self.srv.create_custom_event_type(
            2,
            'MyEvent2',
            ua.ObjectIds.BaseEventType,
            [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)],
        )
        evgen2 = self.srv.get_event_generator(etype2, source)

        handler = MySubHandler2()
        sub = self.opc.create_subscription(100, handler)
        handle = sub.subscribe_events(source, etype1)

        evgen1.event.PropertyNum = 1
        evgen1.event.PropertyString = "This is my test 1"

        evgen2.event.PropertyNum = 2
        evgen2.event.PropertyString = "This is my test 2"

        # first round: only etype1 is subscribed
        for _ in range(3):
            evgen1.trigger()
            evgen2.trigger()
        time.sleep(1)

        self.assertEqual(len(handler.results), 3)
        latest = handler.results[-1]
        self.assertEqual(latest.EventType, etype1.nodeid)

        # second round: etype2 is subscribed as well; only this last handle
        # is unsubscribed explicitly at the end
        handle = sub.subscribe_events(source, etype2)
        for _ in range(4):
            evgen1.trigger()
            evgen2.trigger()
        time.sleep(1)

        type1_count = sum(1 for ev in handler.results if ev.EventType == etype1.nodeid)
        type2_count = sum(1 for ev in handler.results if ev.EventType == etype2.nodeid)

        # 3 from the first round, then 4 of each type from the second
        self.assertEqual(len(handler.results), 11)
        self.assertEqual(type2_count, 4)
        self.assertEqual(type1_count, 7)

        sub.unsubscribe(handle)
        sub.delete()
540
541