Completed
Pull Request — master (#24)
by Olivier
02:42
created

tests.test_subscriptions   F

Complexity

Total Complexity 69

Size/Duplication

Total Lines 563
Duplicated Lines 4.62 %

Importance

Changes 0
Metric Value
eloc 458
dl 26
loc 563
rs 2.88
c 0
b 0
f 0
wmc 69

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like tests.test_subscriptions often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import time
2
import pytest
3
from copy import copy
4
from asyncio import Future, sleep, wait_for, TimeoutError
5
from datetime import datetime, timedelta
6
7
import asyncua
8
from asyncua import ua
9
10
pytestmark = pytest.mark.asyncio
11
12
13
class SubHandler:
14
    """
15
    Dummy subscription client
16
    """
17
18
    def datachange_notification(self, node, val, data):
19
        pass
20
21
    def event_notification(self, event):
22
        pass
23
24
25
class MySubHandler:
26
    """
27
    More advanced subscription client using Future, so we can wait for events in tests
28
    """
29
30
    def __init__(self):
31
        self.future = Future()
32
33
    def reset(self):
34
        self.future = Future()
35
36
    async def result(self):
37
        return await wait_for(self.future, 2)
38
39
    def datachange_notification(self, node, val, data):
40
        self.future.set_result((node, val, data))
41
42
    def event_notification(self, event):
43
        self.future.set_result(event)
44
45
46
class MySubHandler2:
47
    def __init__(self):
48
        self.results = []
49
50
    def datachange_notification(self, node, val, data):
51
        self.results.append((node, val))
52
53
    def event_notification(self, event):
54
        self.results.append(event)
55
56
57
class MySubHandlerCounter:
58
    def __init__(self):
59
        self.datachange_count = 0
60
        self.event_count = 0
61
62
    def datachange_notification(self, node, val, data):
63
        self.datachange_count += 1
64
65
    def event_notification(self, event):
66
        self.event_count += 1
67
68
69
async def test_subscription_failure(opc):
70
    myhandler = MySubHandler()
71
    o = opc.opc.get_objects_node()
72
    sub = await opc.opc.create_subscription(100, myhandler)
73
    with pytest.raises(ua.UaStatusCodeError):
74
        # we can only subscribe to variables so this should fail
75
        handle1 = await sub.subscribe_data_change(o)
76
    await sub.delete()
77
78
79
async def test_subscription_overload(opc):
80
    nb = 10
81
    myhandler = SubHandler()
82
    o = opc.opc.get_objects_node()
83
    sub = await opc.opc.create_subscription(1, myhandler)
84
    variables = []
85
    subs = []
86
    for i in range(nb):
87
        v = await o.add_variable(3, f'SubscriptionVariableOverload{i}', 99)
88
        variables.append(v)
89
    for i in range(nb):
90
        await sub.subscribe_data_change(variables)
91
    for i in range(nb):
92
        for j in range(nb):
93
            await variables[i].set_value(j)
94
        s = await opc.opc.create_subscription(1, myhandler)
95
        await s.subscribe_data_change(variables)
96
        subs.append(s)
97
        await sub.subscribe_data_change(variables[i])
98
    for i in range(nb):
99
        for j in range(nb):
100
            await variables[i].set_value(j)
101
    await sub.delete()
102
    for s in subs:
103
        await s.delete()
104
105
106
async def test_subscription_count(opc):
107
    myhandler = MySubHandlerCounter()
108
    sub = await opc.opc.create_subscription(1, myhandler)
109
    o = opc.opc.get_objects_node()
110
    var = await o.add_variable(3, 'SubVarCounter', 0.1)
111
    await sub.subscribe_data_change(var)
112
    nb = 12
113
    for i in range(nb):
114
        val = await var.get_value()
115
        await var.set_value(val + 1)
116
    await sleep(0.2)  # let last event arrive
117
    assert nb + 1 == myhandler.datachange_count
118
    await sub.delete()
119
120
121
async def test_subscription_count_list(opc):
122
    myhandler = MySubHandlerCounter()
123
    sub = await opc.opc.create_subscription(1, myhandler)
124
    o = opc.opc.get_objects_node()
125
    var = await o.add_variable(3, 'SubVarCounter', [0.1, 0.2])
126
    await sub.subscribe_data_change(var)
127
    nb = 12
128
    for i in range(nb):
129
        val = await var.get_value()
130
        #  we do not want to modify object in our db, we need a copy in order to generate event
131
        val = copy(val)
132
        val.append(i)
133
        await var.set_value(copy(val))
134
    await sleep(0.2)  # let last event arrive
135
    assert nb + 1 == myhandler.datachange_count
136
    await sub.delete()
137
138
139
async def test_subscription_count_no_change(opc):
140
    myhandler = MySubHandlerCounter()
141
    sub = await opc.opc.create_subscription(1, myhandler)
142
    o = opc.opc.get_objects_node()
143
    var = await o.add_variable(3, 'SubVarCounter', [0.1, 0.2])
144
    await sub.subscribe_data_change(var)
145
    nb = 12
146
    for i in range(nb):
147
        val = await var.get_value()
148
        await var.set_value(val)
149
    await sleep(0.2)  # let last event arrive
150
    assert 1 == myhandler.datachange_count
151
    await sub.delete()
152
153
154
async def test_subscription_count_empty(opc):
155
    myhandler = MySubHandlerCounter()
156
    sub = await opc.opc.create_subscription(1, myhandler)
157
    o = opc.opc.get_objects_node()
158
    var = await o.add_variable(3, 'SubVarCounter', [0.1, 0.2, 0.3])
159
    await sub.subscribe_data_change(var)
160
    while True:
161
        val = await var.get_value()
162
        # we do not want to modify object in our db, we need a copy in order to generate event
163
        val = copy(val)
164
        val.pop()
165
        await var.set_value(val, ua.VariantType.Double)
166
        if not val:
167
            break
168
    await sleep(0.2)  # let last event arrive
169
    assert 4 == myhandler.datachange_count
170
    await sub.delete()
171
172
173
async def test_subscription_overload_simple(opc):
174
    nb = 10
175
    myhandler = MySubHandler()
176
    o = opc.opc.get_objects_node()
177
    sub = await opc.opc.create_subscription(1, myhandler)
178
    variables = []
179
    for i in range(nb):
180
        variables.append(await o.add_variable(3, f'SubVarOverload{i}', i))
181
    for i in range(nb):
182
        await sub.subscribe_data_change(variables)
183
    await sub.delete()
184
185
186
async def test_subscription_data_change(opc):
187
    """
188
    test subscriptions. This is far too complicated for
189
    a unittest but, setting up subscriptions requires a lot
190
    of code, so when we first set it up, it is best
191
    to test as many things as possible
192
    """
193
    myhandler = MySubHandler()
194
    o = opc.opc.get_objects_node()
195
    # subscribe to a variable
196
    startv1 = [1, 2, 3]
197
    v1 = await o.add_variable(3, 'SubscriptionVariableV1', startv1)
198
    sub = await opc.opc.create_subscription(100, myhandler)
199
    handle1 = await sub.subscribe_data_change(v1)
200
    # Now check we get the start value
201
    node, val, data = await myhandler.result()
202
    assert startv1 == val
203
    assert v1 == node
204
    myhandler.reset()  # reset future object
205
    # modify v1 and check we get value
206
    await v1.set_value([5])
207
    node, val, data = await myhandler.result()
208
    assert v1 == node
209
    assert [5] == val
210
    with pytest.raises(ua.UaStatusCodeError):
211
        await sub.unsubscribe(999)  # non existing handle
212
    await sub.unsubscribe(handle1)
213
    with pytest.raises(ua.UaStatusCodeError):
214
        await sub.unsubscribe(handle1)  # second try should fail
215
    await sub.delete()
216
    with pytest.raises(ua.UaStatusCodeError):
217
        await sub.unsubscribe(handle1)  # sub does not exist anymore
218
219
220
async def test_subscription_data_change_bool(opc):
221
    """
222
    test subscriptions. This is far too complicated for
223
    a unittest but, setting up subscriptions requires a lot
224
    of code, so when we first set it up, it is best
225
    to test as many things as possible
226
    """
227
    myhandler = MySubHandler()
228
    o = opc.opc.get_objects_node()
229
    # subscribe to a variable
230
    startv1 = True
231
    v1 = await o.add_variable(3, 'SubscriptionVariableBool', startv1)
232
    sub = await opc.opc.create_subscription(100, myhandler)
233
    handle1 = await sub.subscribe_data_change(v1)
234
    # Now check we get the start value
235
    node, val, data = await myhandler.result()
236
    assert startv1 == val
237
    assert v1 == node
238
    myhandler.reset()  # reset future object
239
    # modify v1 and check we get value
240
    await v1.set_value(False)
241
    node, val, data = await myhandler.result()
242
    assert v1 == node
243
    assert val is False
244
    await sub.delete()  # should delete our monitoreditem too
245
246
247
async def test_subscription_data_change_many(opc):
248
    """
249
    test subscriptions. This is far too complicated for
250
    a unittest but, setting up subscriptions requires a lot
251
    of code, so when we first set it up, it is best
252
    to test as many things as possible
253
    """
254
    myhandler = MySubHandler2()
255
    o = opc.opc.get_objects_node()
256
    startv1 = True
257
    v1 = await o.add_variable(3, 'SubscriptionVariableMany1', startv1)
258
    startv2 = [1.22, 1.65]
259
    v2 = await o.add_variable(3, 'SubscriptionVariableMany2', startv2)
260
    sub = await opc.opc.create_subscription(100, myhandler)
261
    handle1, handle2 = await sub.subscribe_data_change([v1, v2])
262
    # Now check we get the start values
263
    nodes = [v1, v2]
264
    count = 0
265
    while not len(myhandler.results) > 1:
266
        count += 1
267
        await sleep(0.1)
268
        if count > 100:
269
            raise RuntimeError("Did not get result from subscription")
270
    for node, val in myhandler.results:
271
        assert node in nodes
272
        nodes.remove(node)
273
        if node == v1:
274
            assert val == startv1
275
        elif node == v2:
276
            assert val == startv2
277
        else:
278
            raise RuntimeError("Error node {0} is neither {1} nor {2}".format(node, v1, v2))
279
    await sub.delete()
280
281
282
async def test_subscribe_server_time(opc):
283
    myhandler = MySubHandler()
284
    server_time_node = opc.opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
285
    sub = await opc.opc.create_subscription(200, myhandler)
286
    handle = await sub.subscribe_data_change(server_time_node)
287
    node, val, data = await myhandler.result()
288
    assert server_time_node == node
289
    delta = datetime.utcnow() - val
290
    assert delta < timedelta(seconds=2)
291
    await sub.unsubscribe(handle)
292
    await sub.delete()
293
294
295
async def test_create_delete_subscription(opc):
296
    o = opc.opc.get_objects_node()
297
    v = await o.add_variable(3, 'SubscriptionVariable', [1, 2, 3])
298
    sub = await opc.opc.create_subscription(100, MySubHandler())
299
    handle = await sub.subscribe_data_change(v)
300
    await sleep(0.1)
301
    await sub.unsubscribe(handle)
302
    await sub.delete()
303
304
305
async def test_subscribe_events(opc):
306
    sub = await opc.opc.create_subscription(100, MySubHandler())
307
    handle = await sub.subscribe_events()
308
    await sleep(0.1)
309
    await sub.unsubscribe(handle)
310
    await sub.delete()
311
312
313
async def test_subscribe_events_to_wrong_node(opc):
314
    sub = await opc.opc.create_subscription(100, MySubHandler())
315
    with pytest.raises(ua.UaStatusCodeError):
316
        handle = await sub.subscribe_events(opc.opc.get_node("i=85"))
317
    o = opc.opc.get_objects_node()
318
    v = await o.add_variable(3, 'VariableNoEventNofierAttribute', 4)
319
    with pytest.raises(ua.UaStatusCodeError):
320
        handle = await sub.subscribe_events(v)
321
    await sub.delete()
322
323
324
async def test_get_event_from_type_node_BaseEvent(opc):
325
    etype = opc.opc.get_node(ua.ObjectIds.BaseEventType)
326
    properties = await asyncua.common.events.get_event_properties_from_type_node(etype)
327
    for child in await etype.get_properties():
328
        assert child in properties
329
330
331
async def test_get_event_from_type_node_CustomEvent(opc):
332
    etype = await opc.server.create_custom_event_type(
333
        2, 'MyEvent', ua.ObjectIds.AuditEventType,
334
        [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)]
335
    )
336
    properties = await asyncua.common.events.get_event_properties_from_type_node(etype)
337
    for child in await opc.opc.get_node(ua.ObjectIds.BaseEventType).get_properties():
338
        assert child in properties
339
    for child in await opc.opc.get_node(ua.ObjectIds.AuditEventType).get_properties():
340
        assert child in properties
341
    for child in await opc.opc.get_node(etype.nodeid).get_properties():
342
        assert child in properties
343
    assert await etype.get_child("2:PropertyNum") in properties
344
    assert await etype.get_child("2:PropertyString") in properties
345
346
347
async def test_events_default(opc):
348
    evgen = await opc.server.get_event_generator()
349
    myhandler = MySubHandler()
350
    sub = await opc.opc.create_subscription(100, myhandler)
351
    handle = await sub.subscribe_events()
352
    tid = datetime.utcnow()
353
    msg = "this is my msg "
354
    evgen.trigger(tid, msg)
355
    ev = await myhandler.result()
356
    assert ev is not None  # we did not receive event
357
    assert ua.NodeId(ua.ObjectIds.BaseEventType) == ev.EventType
358
    assert 1 == ev.Severity
359
    assert (await opc.opc.get_server_node().get_browse_name()).Name == ev.SourceName
360
    assert opc.opc.get_server_node().nodeid == ev.SourceNode
361
    assert msg == ev.Message.Text
362
    assert tid == ev.Time
363
    await sub.unsubscribe(handle)
364
    await sub.delete()
365
366
367
async def test_events_MyObject(opc):
368
    objects = opc.server.get_objects_node()
369
    o = await objects.add_object(3, 'MyObject')
370
    evgen = await opc.server.get_event_generator(emitting_node=o)
371
    myhandler = MySubHandler()
372
    sub = await opc.opc.create_subscription(100, myhandler)
373
    handle = await sub.subscribe_events(o)
374
    tid = datetime.utcnow()
375
    msg = "this is my msg "
376
    evgen.trigger(tid, msg)
377
    ev = await myhandler.result()
378
    assert ev is not None  # we did not receive event
379
    assert ua.NodeId(ua.ObjectIds.BaseEventType) == ev.EventType
380
    assert 1 == ev.Severity
381
    assert 'MyObject' == ev.SourceName
382
    assert o.nodeid == ev.SourceNode
383
    assert msg == ev.Message.Text
384
    assert tid == ev.Time
385
    await sub.unsubscribe(handle)
386
    await sub.delete()
387
388
389
async def test_events_wrong_source(opc):
390
    objects = opc.server.get_objects_node()
391
    o = await objects.add_object(3, 'MyObject')
392
    evgen = await opc.server.get_event_generator(emitting_node=o)
393
    myhandler = MySubHandler()
394
    sub = await opc.opc.create_subscription(100, myhandler)
395
    handle = await sub.subscribe_events()
396
    tid = datetime.utcnow()
397
    msg = "this is my msg "
398
    evgen.trigger(tid, msg)
399
    with pytest.raises(TimeoutError):  # we should not receive event
400
        ev = await myhandler.result()
401
    await sub.unsubscribe(handle)
402
    await sub.delete()
403
404
405
async def test_events_CustomEvent(opc):
406
    etype = await opc.server.create_custom_event_type(2, 'MyEvent', ua.ObjectIds.BaseEventType,
407
                                              [('PropertyNum', ua.VariantType.Float),
408
                                               ('PropertyString', ua.VariantType.String)])
409
    evgen = await opc.server.get_event_generator(etype)
410
    myhandler = MySubHandler()
411
    sub = await opc.opc.create_subscription(100, myhandler)
412
    handle = await sub.subscribe_events(evtypes=etype)
413
    propertynum = 2
414
    propertystring = "This is my test"
415
    evgen.event.PropertyNum = propertynum
416
    evgen.event.PropertyString = propertystring
417
    serverity = 500
418
    evgen.event.Severity = serverity
419
    tid = datetime.utcnow()
420
    msg = "this is my msg "
421
    evgen.trigger(tid, msg)
422
    ev = await myhandler.result()
423
    assert ev is not None  # we did not receive event
424
    assert etype.nodeid == ev.EventType
425
    assert serverity == ev.Severity
426
    assert (await opc.opc.get_server_node().get_browse_name()).Name == ev.SourceName
427
    assert opc.opc.get_server_node().nodeid == ev.SourceNode
428
    assert msg == ev.Message.Text
429
    assert tid == ev.Time
430
    assert propertynum == ev.PropertyNum
431
    assert propertystring == ev.PropertyString
432
    await sub.unsubscribe(handle)
433
    await sub.delete()
434
435
436
async def test_events_CustomEvent_MyObject(opc):
437
    objects = opc.server.get_objects_node()
438
    o = await objects.add_object(3, 'MyObject')
439
    etype = await opc.server.create_custom_event_type(2, 'MyEvent', ua.ObjectIds.BaseEventType,
440
                                              [('PropertyNum', ua.VariantType.Float),
441
                                               ('PropertyString', ua.VariantType.String)])
442
    evgen = await opc.server.get_event_generator(etype, emitting_node=o)
443
    myhandler = MySubHandler()
444
    sub = await opc.opc.create_subscription(100, myhandler)
445
    handle = await sub.subscribe_events(o, etype)
446
    propertynum = 2
447
    propertystring = "This is my test"
448
    evgen.event.PropertyNum = propertynum
449
    evgen.event.PropertyString = propertystring
450
    tid = datetime.utcnow()
451
    msg = "this is my msg "
452
    evgen.trigger(tid, msg)
453
    ev = await myhandler.result()
454
    assert ev is not None  # we did not receive event
455
    assert etype.nodeid == ev.EventType
456
    assert 1 == ev.Severity
457
    assert 'MyObject' == ev.SourceName
458
    assert o.nodeid == ev.SourceNode
459
    assert msg == ev.Message.Text
460
    assert tid == ev.Time
461
    assert propertynum == ev.PropertyNum
462
    assert propertystring == ev.PropertyString
463
    await sub.unsubscribe(handle)
464
    await sub.delete()
465
466
467
async def test_several_different_events(opc):
468
    objects = opc.server.get_objects_node()
469
    o = await objects.add_object(3, 'MyObject')
470
    etype1 = await opc.server.create_custom_event_type(2, 'MyEvent1', ua.ObjectIds.BaseEventType,
471
                                               [('PropertyNum', ua.VariantType.Float),
472
                                                ('PropertyString', ua.VariantType.String)])
473
    evgen1 = await opc.server.get_event_generator(etype1, o)
474
    etype2 = await opc.server.create_custom_event_type(2, 'MyEvent2', ua.ObjectIds.BaseEventType,
475
                                               [('PropertyNum', ua.VariantType.Float),
476
                                                ('PropertyString', ua.VariantType.String)])
477
    evgen2 = await opc.server.get_event_generator(etype2, o)
478
    myhandler = MySubHandler2()
479
    sub = await opc.opc.create_subscription(100, myhandler)
480
    handle = await sub.subscribe_events(o, etype1)
481
    propertynum1 = 1
482
    propertystring1 = "This is my test 1"
483
    evgen1.event.PropertyNum = propertynum1
484
    evgen1.event.PropertyString = propertystring1
485
    propertynum2 = 2
486
    propertystring2 = "This is my test 2"
487
    evgen2.event.PropertyNum = propertynum2
488
    evgen2.event.PropertyString = propertystring2
489
    for i in range(3):
490
        evgen1.trigger()
491
        evgen2.trigger()
492
    await sleep(1)  # ToDo: replace
493
    assert 3 == len(myhandler.results)
494
    ev = myhandler.results[-1]
495
    assert etype1.nodeid == ev.EventType
496
    handle = await sub.subscribe_events(o, etype2)
497
    for i in range(4):
498
        evgen1.trigger()
499
        evgen2.trigger()
500
    await sleep(1)  # ToDo: replace
501
    ev1s = [ev for ev in myhandler.results if ev.EventType == etype1.nodeid]
502
    ev2s = [ev for ev in myhandler.results if ev.EventType == etype2.nodeid]
503
    assert 11 == len(myhandler.results)
504
    assert 4 == len(ev2s)
505
    assert 7 == len(ev1s)
506
    await sub.unsubscribe(handle)
507
    await sub.delete()
508
509
510
async def test_several_different_events_2(opc):
511
    objects = opc.server.get_objects_node()
512
    o = await objects.add_object(3, 'MyObject')
513
    etype1 = await opc.server.create_custom_event_type(
514
        2, 'MyEvent1', ua.ObjectIds.BaseEventType,
515
        [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)]
516
    )
517
    evgen1 = await opc.server.get_event_generator(etype1, o)
518
    etype2 = await opc.server.create_custom_event_type(
519
        2, 'MyEvent2', ua.ObjectIds.BaseEventType,
520
        [('PropertyNum2', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)]
521
    )
522
    evgen2 = await opc.server.get_event_generator(etype2, o)
523
    etype3 = await opc.server.create_custom_event_type(
524
        2, 'MyEvent3', ua.ObjectIds.BaseEventType,
525
        [('PropertyNum3', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)]
526
    )
527
    evgen3 = await opc.server.get_event_generator(etype3, o)
528
    myhandler = MySubHandler2()
529
    sub = await opc.opc.create_subscription(100, myhandler)
530
    handle = await sub.subscribe_events(o, [etype1, etype3])
531
    propertynum1 = 1
532
    propertystring1 = "This is my test 1"
533
    evgen1.event.PropertyNum = propertynum1
534
    evgen1.event.PropertyString = propertystring1
535
    propertynum2 = 2
536
    propertystring2 = "This is my test 2"
537
    evgen2.event.PropertyNum2 = propertynum2
538
    evgen2.event.PropertyString = propertystring2
539
    propertynum3 = 3
540
    propertystring3 = "This is my test 3"
541
    evgen3.event.PropertyNum3 = propertynum3
542
    evgen3.event.PropertyString = propertystring2
543
    for i in range(3):
544
        evgen1.trigger()
545
        evgen2.trigger()
546
        evgen3.trigger()
547
    evgen3.event.PropertyNum3 = 9999
548
    evgen3.trigger()
549
    await sleep(1)
550
    ev1s = [ev for ev in myhandler.results if ev.EventType == etype1.nodeid]
551
    ev2s = [ev for ev in myhandler.results if ev.EventType == etype2.nodeid]
552
    ev3s = [ev for ev in myhandler.results if ev.EventType == etype3.nodeid]
553
    assert 7 == len(myhandler.results)
554
    assert 3 == len(ev1s)
555
    assert 0 == len(ev2s)
556
    assert 4 == len(ev3s)
557
    assert propertynum1 == ev1s[0].PropertyNum
558
    assert propertynum3 == ev3s[0].PropertyNum3
559
    assert 9999 == ev3s[-1].PropertyNum3
560
    assert ev1s[0].PropertyNum3 is None
561
    await sub.unsubscribe(handle)
562
    await sub.delete()
563