Completed
Pull Request — master (#24)
by Olivier
02:42
created

tests.test_server   B

Complexity

Total Complexity 52

Size/Duplication

Total Lines 589
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 443
dl 0
loc 589
rs 7.44
c 0
b 0
f 0
wmc 52

How to fix   Complexity   

Complexity

Complex classes like tests.test_server often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Run common tests on server side
3
Tests that can only be run on server side must be defined here
4
"""
5
import asyncio
6
import pytest
7
import logging
8
import os
9
import shelve
10
from datetime import timedelta
11
from enum import Enum, EnumMeta
12
13
import asyncua
14
from asyncua import Server, Client, ua, uamethod, BaseEvent, AuditEvent, AuditChannelEvent, AuditSecurityEvent, AuditOpenSecureChannelEvent
15
from asyncua.common import ua_utils
16
17
pytestmark = pytest.mark.asyncio
18
_logger = logging.getLogger(__name__)
19
20
21
async def test_discovery(server, discovery_server):
22
    client = Client(discovery_server.endpoint.geturl())
23
    async with client:
24
        servers = await client.find_servers()
25
        new_app_uri = 'urn:freeopcua:python:server:test_discovery'
26
        server.application_uri = new_app_uri
27
        await server.register_to_discovery(discovery_server.endpoint.geturl(), 0)
28
        # let server register registration
29
        await asyncio.sleep(0.1)
30
        new_servers = await client.find_servers()
31
        assert len(new_servers) - len(servers) == 1
32
        assert new_app_uri not in [s.ApplicationUri for s in servers]
33
        assert new_app_uri in [s.ApplicationUri for s in new_servers]
34
35
36
async def test_find_servers2(server, discovery_server):
37
    client = Client(discovery_server.endpoint.geturl())
38
    async with client:
39
        servers = await client.find_servers()
40
        new_app_uri1 = 'urn:freeopcua:python:server:test_discovery1'
41
        server.application_uri = new_app_uri1
42
        await server.register_to_discovery(discovery_server.endpoint.geturl(), period=0)
43
        new_app_uri2 = 'urn:freeopcua:python:test_discovery2'
44
        server.application_uri = new_app_uri2
45
        await server.register_to_discovery(discovery_server.endpoint.geturl(), period=0)
46
        await asyncio.sleep(0.1)  # let server register registration
47
        new_servers = await client.find_servers()
48
        assert len(new_servers) - len(servers) == 2
49
        assert new_app_uri1 not in [s.ApplicationUri for s in servers]
50
        assert new_app_uri2 not in [s.ApplicationUri for s in servers]
51
        assert new_app_uri1 in [s.ApplicationUri for s in new_servers]
52
        assert new_app_uri2 in [s.ApplicationUri for s in new_servers]
53
        # now do a query with filer
54
        new_servers = await client.find_servers(['urn:freeopcua:python:server'])
55
        assert len(new_servers) - len(servers) == 0
56
        assert new_app_uri1 in [s.ApplicationUri for s in new_servers]
57
        assert new_app_uri2 not in [s.ApplicationUri for s in new_servers]
58
        # now do a query with filer
59
        new_servers = await client.find_servers(['urn:freeopcua:python'])
60
        assert len(new_servers) - len(servers) == 2
61
        assert new_app_uri1 in [s.ApplicationUri for s in new_servers]
62
        assert new_app_uri2 in [s.ApplicationUri for s in new_servers]
63
64
65
async def test_register_namespace(server):
66
    uri = 'http://mycustom.Namespace.com'
67
    idx1 = await server.register_namespace(uri)
68
    idx2 = await server.get_namespace_index(uri)
69
    assert idx1 == idx2
70
71
72
async def test_register_existing_namespace(server):
73
    uri = 'http://mycustom.Namespace.com'
74
    idx1 = await server.register_namespace(uri)
75
    idx2 = await server.register_namespace(uri)
76
    idx3 = await server.get_namespace_index(uri)
77
    assert idx1 == idx2
78
    assert idx1 == idx3
79
80
81
async def test_register_use_namespace(server):
82
    uri = 'http://my_very_custom.Namespace.com'
83
    idx = await server.register_namespace(uri)
84
    root = server.get_root_node()
85
    myvar = await root.add_variable(idx, 'var_in_custom_namespace', [5])
86
    myid = myvar.nodeid
87
    assert idx == myid.NamespaceIndex
88
89
90
async def test_server_method(server):
91
    def func(parent, variant):
92
        variant.Value *= 2
93
        return [variant]
94
95
    o = server.get_objects_node()
96
    v = await o.add_method(3, 'Method1', func, [ua.VariantType.Int64], [ua.VariantType.Int64])
97
    result = await o.call_method(v, ua.Variant(2.1))
98
    assert result == 4.2
99
100
101
async def test_historize_variable(server):
102
    o = server.get_objects_node()
103
    var = await o.add_variable(3, "test_hist", 1.0)
104
    await server.iserver.enable_history_data_change(var, timedelta(days=1))
105
    await asyncio.sleep(1)
106
    await var.set_value(2.0)
107
    await var.set_value(3.0)
108
    await server.iserver.disable_history_data_change(var)
109
110
111
async def test_historize_events(server):
112
    srv_node = server.get_node(ua.ObjectIds.Server)
113
    assert await srv_node.get_event_notifier() == {ua.EventNotifier.SubscribeToEvents}
114
    srvevgen = await server.get_event_generator()
115
    await server.iserver.enable_history_event(srv_node, period=None)
116
    assert await srv_node.get_event_notifier() == {ua.EventNotifier.SubscribeToEvents, ua.EventNotifier.HistoryRead}
117
    srvevgen.trigger(message='Message')
118
    await server.iserver.disable_history_event(srv_node)
119
120
121
async def test_references_for_added_nodes_method(server):
122
    objects = server.get_objects_node()
123
    o = await objects.add_object(3, 'MyObject')
124
    nodes = await objects.get_referenced_nodes(refs=ua.ObjectIds.Organizes, direction=ua.BrowseDirection.Forward,
125
                                               includesubtypes=False)
126
    assert o in nodes
127
    nodes = await o.get_referenced_nodes(refs=ua.ObjectIds.Organizes, direction=ua.BrowseDirection.Inverse,
128
                                         includesubtypes=False)
129
    assert objects in nodes
130
    assert await o.get_parent() == objects
131
    assert (await o.get_type_definition()).Identifier == ua.ObjectIds.BaseObjectType
132
133
    @uamethod
134
    def callback(parent):
135
        return
136
137
    m = await o.add_method(3, 'MyMethod', callback)
138
    nodes = await o.get_referenced_nodes(refs=ua.ObjectIds.HasComponent, direction=ua.BrowseDirection.Forward,
139
                                         includesubtypes=False)
140
    assert m in nodes
141
    nodes = await m.get_referenced_nodes(refs=ua.ObjectIds.HasComponent, direction=ua.BrowseDirection.Inverse,
142
                                         includesubtypes=False)
143
    assert o in nodes
144
    assert await m.get_parent() == o
145
146
147
async def test_get_event_from_type_node_BaseEvent(server):
148
    """
149
    This should work for following BaseEvent tests to work
150
    (maybe to write it a bit differentlly since they are not independent) 
151
    """
152
    ev = await asyncua.common.events.get_event_obj_from_type_node(
153
        asyncua.Node(server.iserver.isession, ua.NodeId(ua.ObjectIds.BaseEventType))
154
    )
155
    check_base_event(ev)
156
157
158
async def test_get_event_from_type_node_Inhereted_AuditEvent(server):
159
    ev = await asyncua.common.events.get_event_obj_from_type_node(
160
        asyncua.Node(server.iserver.isession, ua.NodeId(ua.ObjectIds.AuditEventType))
161
    )
162
    # we did not receive event
163
    assert ev is not None
164
    assert isinstance(ev, BaseEvent)
165
    assert isinstance(ev, AuditEvent)
166
    assert ev.EventType == ua.NodeId(ua.ObjectIds.AuditEventType)
167
    assert ev.Severity == 1
168
    assert ev.ActionTimeStamp is None
169
    assert ev.Status == False
170
    assert ev.ServerId is None
171
    assert ev.ClientAuditEntryId is None
172
    assert ev.ClientUserId is None
173
174
175
async def test_get_event_from_type_node_MultiInhereted_AuditOpenSecureChannelEvent(server):
176
    ev = await asyncua.common.events.get_event_obj_from_type_node(
177
        asyncua.Node(server.iserver.isession, ua.NodeId(ua.ObjectIds.AuditOpenSecureChannelEventType))
178
    )
179
    assert ev is not None
180
    assert isinstance(ev, BaseEvent)
181
    assert isinstance(ev, AuditEvent)
182
    assert isinstance(ev, AuditSecurityEvent)
183
    assert isinstance(ev, AuditChannelEvent)
184
    assert isinstance(ev, AuditOpenSecureChannelEvent)
185
    assert ev.EventType == ua.NodeId(ua.ObjectIds.AuditOpenSecureChannelEventType)
186
    assert ev.Severity == 1
187
    assert ev.ClientCertificate is None
188
    assert ev.ClientCertificateThumbprint is None
189
    assert ev.RequestType is None
190
    assert ev.SecurityPolicyUri is None
191
    assert ev.SecurityMode is None
192
    assert ev.RequestedLifetime is None
193
194
195
async def test_eventgenerator_default(server):
196
    evgen = await server.get_event_generator()
197
    await check_eventgenerator_base_event(evgen, server)
198
    await check_eventgenerator_source_server(evgen, server)
199
200
201
async def test_eventgenerator_BaseEvent_object(server):
202
    evgen = await server.get_event_generator(BaseEvent())
203
    await check_eventgenerator_base_event(evgen, server)
204
    await check_eventgenerator_source_server(evgen, server)
205
206
207
async def test_eventgenerator_BaseEvent_Node(server):
208
    evgen = await server.get_event_generator(asyncua.Node(server.iserver.isession, ua.NodeId(ua.ObjectIds.BaseEventType)))
209
    await check_eventgenerator_base_event(evgen, server)
210
    await check_eventgenerator_source_server(evgen, server)
211
212
213
async def test_eventgenerator_BaseEvent_NodeId(server):
214
    evgen = await server.get_event_generator(ua.NodeId(ua.ObjectIds.BaseEventType))
215
    await check_eventgenerator_base_event(evgen, server)
216
    await check_eventgenerator_source_server(evgen, server)
217
218
219
async def test_eventgenerator_BaseEvent_ObjectIds(server):
220
    evgen = await server.get_event_generator(ua.ObjectIds.BaseEventType)
221
    await check_eventgenerator_base_event(evgen, server)
222
    await check_eventgenerator_source_server(evgen, server)
223
224
225
async def test_eventgenerator_BaseEvent_Identifier(server):
226
    evgen = await server.get_event_generator(2041)
227
    await check_eventgenerator_base_event(evgen, server)
228
    await check_eventgenerator_source_server(evgen, server)
229
230
231
async def test_eventgenerator_sourceServer_Node(server):
232
    evgen = await server.get_event_generator(emitting_node=asyncua.Node(server.iserver.isession, ua.NodeId(ua.ObjectIds.Server)))
233
    await check_eventgenerator_base_event(evgen, server)
234
    await check_eventgenerator_source_server(evgen, server)
235
236
237
async def test_eventgenerator_sourceServer_NodeId(server):
238
    evgen = await server.get_event_generator(emitting_node=ua.NodeId(ua.ObjectIds.Server))
239
    await check_eventgenerator_base_event(evgen, server)
240
    await check_eventgenerator_source_server(evgen, server)
241
242
243
async def test_eventgenerator_sourceServer_ObjectIds(server):
244
    evgen = await server.get_event_generator(emitting_node=ua.ObjectIds.Server)
245
    await check_eventgenerator_base_event(evgen, server)
246
    await check_eventgenerator_source_server(evgen, server)
247
248
249
async def test_eventgenerator_sourceMyObject(server):
250
    objects = server.get_objects_node()
251
    o = await objects.add_object(3, 'MyObject')
252
    evgen = await server.get_event_generator(emitting_node=o)
253
    await check_eventgenerator_base_event(evgen, server)
254
    await check_event_generator_object(evgen, o)
255
256
257
async def test_eventgenerator_source_collision(server):
258
    objects = server.get_objects_node()
259
    o = await objects.add_object(3, 'MyObject')
260
    event = BaseEvent(sourcenode=o.nodeid)
261
    evgen = await server.get_event_generator(event, ua.ObjectIds.Server)
262
    await check_eventgenerator_base_event(evgen, server)
263
    await check_event_generator_object(evgen, o, emitting_node=asyncua.Node(server.iserver.isession, ua.ObjectIds.Server))
264
265
266
async def test_eventgenerator_inherited_event(server):
267
    evgen = await server.get_event_generator(ua.ObjectIds.AuditEventType)
268
    await check_eventgenerator_source_server(evgen, server)
269
    ev = evgen.event
270
    assert ev is not None  # we did not receive event
271
    assert isinstance(ev, BaseEvent)
272
    assert isinstance(ev, AuditEvent)
273
    assert ua.NodeId(ua.ObjectIds.AuditEventType) == ev.EventType
274
    assert 1 == ev.Severity
275
    assert ev.ActionTimeStamp is None
276
    assert False == ev.Status
277
    assert ev.ServerId is None
278
    assert ev.ClientAuditEntryId is None
279
    assert ev.ClientUserId is None
280
281
282
async def test_eventgenerator_multi_inherited_event(server):
283
    evgen = await server.get_event_generator(ua.ObjectIds.AuditOpenSecureChannelEventType)
284
    await check_eventgenerator_source_server(evgen, server)
285
    ev = evgen.event
286
    assert ev is not None  # we did not receive event
287
    assert isinstance(ev, BaseEvent)
288
    assert isinstance(ev, AuditEvent)
289
    assert isinstance(ev, AuditSecurityEvent)
290
    assert isinstance(ev, AuditChannelEvent)
291
    assert isinstance(ev, AuditOpenSecureChannelEvent)
292
    assert ua.NodeId(ua.ObjectIds.AuditOpenSecureChannelEventType) == ev.EventType
293
    assert 1 == ev.Severity
294
    assert ev.ClientCertificate is None
295
    assert ev.ClientCertificateThumbprint is None
296
    assert ev.RequestType is None
297
    assert ev.SecurityPolicyUri is None
298
    assert ev.SecurityMode is None
299
    assert ev.RequestedLifetime is None
300
301
302
async def test_create_custom_data_type_object_id(server):
303
    """
304
    For the custom events all posibilites are tested.
305
    For other custom types only one test case is done since they are using the same code
306
    """
307
    type = await server.create_custom_data_type(2, 'MyDataType', ua.ObjectIds.BaseDataType,
308
                                                [('PropertyNum', ua.VariantType.Int32),
309
                                                 ('PropertyString', ua.VariantType.String)])
310
    await check_custom_type(type, ua.ObjectIds.BaseDataType, server)
311
312
313
async def test_create_custom_event_type_object_id(server):
314
    type = await server.create_custom_event_type(2, 'MyEvent', ua.ObjectIds.BaseEventType,
315
                                                 [('PropertyNum', ua.VariantType.Int32),
316
                                                  ('PropertyString', ua.VariantType.String)])
317
    await check_custom_type(type, ua.ObjectIds.BaseEventType, server)
318
319
320
async def test_create_custom_object_type_object_id(server):
321
    def func(parent, variant):
322
        return [ua.Variant(ret, ua.VariantType.Boolean)]
323
324
    properties = [('PropertyNum', ua.VariantType.Int32),
325
                  ('PropertyString', ua.VariantType.String)]
326
    variables = [('VariableString', ua.VariantType.String),
327
                 ('MyEnumVar', ua.VariantType.Int32, ua.NodeId(ua.ObjectIds.ApplicationType))]
328
    methods = [('MyMethod', func, [ua.VariantType.Int64], [ua.VariantType.Boolean])]
329
    node_type = await server.create_custom_object_type(2, 'MyObjectType', ua.ObjectIds.BaseObjectType, properties,
330
                                                       variables, methods)
331
    await check_custom_type(node_type, ua.ObjectIds.BaseObjectType, server)
332
    variables = await node_type.get_variables()
333
    assert await node_type.get_child("2:VariableString") in variables
334
    assert ua.VariantType.String == (
335
        await(await node_type.get_child("2:VariableString")).get_data_value()).Value.VariantType
336
    assert await node_type.get_child("2:MyEnumVar") in variables
337
    assert ua.VariantType.Int32 == (await(await node_type.get_child("2:MyEnumVar")).get_data_value()).Value.VariantType
338
    assert ua.NodeId(ua.ObjectIds.ApplicationType) == await (await node_type.get_child("2:MyEnumVar")).get_data_type()
339
    methods = await node_type.get_methods()
340
    assert await node_type.get_child("2:MyMethod") in methods
341
342
343
async def test_create_custom_variable_type_object_id(server):
344
    type = await server.create_custom_variable_type(2, 'MyVariableType', ua.ObjectIds.BaseVariableType,
345
                                                    [('PropertyNum', ua.VariantType.Int32),
346
                                                     ('PropertyString', ua.VariantType.String)])
347
    await check_custom_type(type, ua.ObjectIds.BaseVariableType, server)
348
349
350
async def test_create_custom_event_type_node_id(server):
351
    etype = await server.create_custom_event_type(2, 'MyEvent', ua.NodeId(ua.ObjectIds.BaseEventType),
352
                                                  [('PropertyNum', ua.VariantType.Int32),
353
                                                   ('PropertyString', ua.VariantType.String)])
354
    await check_custom_type(etype, ua.ObjectIds.BaseEventType, server)
355
356
357
async def test_create_custom_event_type_node(server):
358
    etype = await server.create_custom_event_type(2, 'MyEvent', asyncua.Node(server.iserver.isession,
359
                                                                           ua.NodeId(ua.ObjectIds.BaseEventType)),
360
                                                  [('PropertyNum', ua.VariantType.Int32),
361
                                                   ('PropertyString', ua.VariantType.String)])
362
    await check_custom_type(etype, ua.ObjectIds.BaseEventType, server)
363
364
365
async def test_get_event_from_type_node_custom_event(server):
366
    etype = await server.create_custom_event_type(2, 'MyEvent', ua.ObjectIds.BaseEventType,
367
                                                  [('PropertyNum', ua.VariantType.Int32),
368
                                                   ('PropertyString', ua.VariantType.String)])
369
    ev = await asyncua.common.events.get_event_obj_from_type_node(etype)
370
    check_custom_event(ev, etype)
371
    assert 0 == ev.PropertyNum
372
    assert ev.PropertyString is None
373
374
375
async def test_eventgenerator_custom_event(server):
376
    etype = await server.create_custom_event_type(2, 'MyEvent', ua.ObjectIds.BaseEventType,
377
                                                  [('PropertyNum', ua.VariantType.Int32),
378
                                                   ('PropertyString', ua.VariantType.String)])
379
    evgen = await server.get_event_generator(etype, ua.ObjectIds.Server)
380
    check_eventgenerator_custom_event(evgen, etype, server)
381
    await check_eventgenerator_source_server(evgen, server)
382
    assert 0 == evgen.event.PropertyNum
383
    assert evgen.event.PropertyString is None
384
385
386
async def test_eventgenerator_double_custom_event(server):
387
    event1 = await server.create_custom_event_type(3, 'MyEvent1', ua.ObjectIds.BaseEventType,
388
                                                   [('PropertyNum', ua.VariantType.Int32),
389
                                                    ('PropertyString', ua.VariantType.String)])
390
    event2 = await server.create_custom_event_type(4, 'MyEvent2', event1, [('PropertyBool', ua.VariantType.Boolean),
391
                                                                           ('PropertyInt', ua.VariantType.Int32)])
392
    evgen = await server.get_event_generator(event2, ua.ObjectIds.Server)
393
    check_eventgenerator_custom_event(evgen, event2, server)
394
    await check_eventgenerator_source_server(evgen, server)
395
    # Properties from MyEvent1
396
    assert 0 == evgen.event.PropertyNum
397
    assert evgen.event.PropertyString is None
398
    # Properties from MyEvent2
399
    assert not evgen.event.PropertyBool
400
    assert 0 == evgen.event.PropertyInt
401
402
403
async def test_eventgenerator_custom_event_my_object(server):
404
    objects = server.get_objects_node()
405
    o = await objects.add_object(3, 'MyObject')
406
    etype = await server.create_custom_event_type(2, 'MyEvent', ua.ObjectIds.BaseEventType,
407
                                                  [('PropertyNum', ua.VariantType.Int32),
408
                                                   ('PropertyString', ua.VariantType.String)])
409
410
    evgen = await server.get_event_generator(etype, o)
411
    check_eventgenerator_custom_event(evgen, etype, server)
412
    await check_event_generator_object(evgen, o)
413
    assert 0 == evgen.event.PropertyNum
414
    assert evgen.event.PropertyString is None
415
416
417
async def test_context_manager():
418
    # Context manager calls start() and stop()
419
    state = [0]
420
421
    async def increment_state(self, *args, **kwargs):
422
        state[0] += 1
423
424
    # create server and replace instance methods with dummy methods
425
    server = Server()
426
    server.start = increment_state.__get__(server)
427
    server.stop = increment_state.__get__(server)
428
    assert state[0] == 0
429
    async with server:
430
        # test if server started
431
        assert 1 == state[0]
432
    # test if server stopped
433
    assert 2 == state[0]
434
435
436
async def test_get_node_by_ns(server):
437
    def get_ns_of_nodes(nodes):
438
        ns_list = set()
439
        for node in nodes:
440
            ns_list.add(node.nodeid.NamespaceIndex)
441
        return ns_list
442
443
    # incase other testss created nodes  in unregistered namespace
444
    _idx_d = await server.register_namespace('dummy1')
445
    _idx_d = await server.register_namespace('dummy2')
446
    _idx_d = await server.register_namespace('dummy3')
447
    # create the test namespaces and vars
448
    idx_a = await server.register_namespace('a')
449
    idx_b = await server.register_namespace('b')
450
    idx_c = await server.register_namespace('c')
451
    o = server.get_objects_node()
452
    _myvar2 = await o.add_variable(idx_a, "MyBoolVar2", True)
453
    _myvar3 = await o.add_variable(idx_b, "MyBoolVar3", True)
454
    _myvar4 = await o.add_variable(idx_c, "MyBoolVar4", True)
455
    # the tests
456
    nodes = await ua_utils.get_nodes_of_namespace(server, namespaces=[idx_a, idx_b, idx_c])
457
    assert 3 == len(nodes)
458
    assert set([idx_a, idx_b, idx_c]) == get_ns_of_nodes(nodes)
459
460
    nodes = await ua_utils.get_nodes_of_namespace(server, namespaces=[idx_a])
461
    assert 1 == len(nodes)
462
    assert set([idx_a]) == get_ns_of_nodes(nodes)
463
464
    nodes = await ua_utils.get_nodes_of_namespace(server, namespaces=[idx_b])
465
    assert 1 == len(nodes)
466
    assert set([idx_b]) == get_ns_of_nodes(nodes)
467
468
    nodes = await ua_utils.get_nodes_of_namespace(server, namespaces=['a'])
469
    assert 1 == len(nodes)
470
    assert set([idx_a]) == get_ns_of_nodes(nodes)
471
472
    nodes = await ua_utils.get_nodes_of_namespace(server, namespaces=['a', 'c'])
473
    assert 2 == len(nodes)
474
    assert set([idx_a, idx_c]) == get_ns_of_nodes(nodes)
475
476
    nodes = await ua_utils.get_nodes_of_namespace(server, namespaces='b')
477
    assert 1 == len(nodes)
478
    assert set([idx_b]) == get_ns_of_nodes(nodes)
479
480
    nodes = await ua_utils.get_nodes_of_namespace(server, namespaces=idx_b)
481
    assert 1 == len(nodes)
482
    assert set([idx_b]) == get_ns_of_nodes(nodes)
483
    with pytest.raises(ValueError):
484
        await ua_utils.get_nodes_of_namespace(server, namespaces='non_existing_ns')
485
486
487
async def test_load_enum_strings(server):
488
    dt = await server.nodes.enum_data_type.add_data_type(0, "MyStringEnum")
489
    await dt.add_variable(0, "EnumStrings", [ua.LocalizedText("e1"), ua.LocalizedText("e2"), ua.LocalizedText("e3"),
490
                                       ua.LocalizedText("e 4")])
491
    await server.load_enums()
492
    e = getattr(ua, "MyStringEnum")
493
    assert isinstance(e, EnumMeta)
494
    assert hasattr(e, "e1")
495
    assert hasattr(e, "e4")
496
    assert 3 == getattr(e, "e4")
497
498
499
async def test_load_enum_values(server):
500
    dt = await server.nodes.enum_data_type.add_data_type(0, "MyValuesEnum")
501
    v1 = ua.EnumValueType()
502
    v1.DisplayName.Text = "v1"
503
    v1.Value = 2
504
    v2 = ua.EnumValueType()
505
    v2.DisplayName.Text = "v2"
506
    v2.Value = 3
507
    v3 = ua.EnumValueType()
508
    v3.DisplayName.Text = "v 3 "
509
    v3.Value = 4
510
    await dt.add_variable(0, "EnumValues", [v1, v2, v3])
511
    await server.load_enums()
512
    e = getattr(ua, "MyValuesEnum")
513
    assert isinstance(e, EnumMeta)
514
    assert hasattr(e, "v1")
515
    assert hasattr(e, "v3")
516
    assert 4 == getattr(e, "v3")
517
518
519
async def check_eventgenerator_source_server(evgen, server: Server):
520
    server_node = server.get_server_node()
521
    assert evgen.event.SourceName == (await server_node.get_browse_name()).Name
522
    assert evgen.event.SourceNode == ua.NodeId(ua.ObjectIds.Server)
523
    assert await server_node.get_event_notifier() == {ua.EventNotifier.SubscribeToEvents}
524
    refs = await server_node.get_referenced_nodes(ua.ObjectIds.GeneratesEvent, ua.BrowseDirection.Forward,
525
                                                  ua.NodeClass.ObjectType, False)
526
    assert len(refs) >= 1
527
528
529
async def check_event_generator_object(evgen, obj, emitting_node=None):
530
    assert evgen.event.SourceName == (await obj.get_browse_name()).Name
531
    assert evgen.event.SourceNode == obj.nodeid
532
533
    if not emitting_node:
534
        assert await obj.get_event_notifier() == {ua.EventNotifier.SubscribeToEvents}
535
        refs = await obj.get_referenced_nodes(ua.ObjectIds.GeneratesEvent, ua.BrowseDirection.Forward, ua.NodeClass.ObjectType, False)
536
    else:
537
        assert await emitting_node.get_event_notifier() == {ua.EventNotifier.SubscribeToEvents}
538
        refs = await emitting_node.get_referenced_nodes(ua.ObjectIds.GeneratesEvent, ua.BrowseDirection.Forward, ua.NodeClass.ObjectType, False)
539
540
    assert evgen.event.EventType in [x.nodeid for x in refs]
541
542
543
async def check_eventgenerator_base_event(evgen, server: Server):
544
    # we did not receive event generator
545
    assert evgen is not None
546
    assert evgen.isession is server.iserver.isession
547
    check_base_event(evgen.event)
548
549
550
def check_base_event(ev):
551
    # we did not receive event
552
    assert ev is not None
553
    assert isinstance(ev, BaseEvent)
554
    assert ev.EventType == ua.NodeId(ua.ObjectIds.BaseEventType)
555
    assert ev.Severity == 1
556
557
558
def check_eventgenerator_custom_event(evgen, etype, server: Server):
559
    # we did not receive event generator
560
    assert evgen is not None
561
    assert evgen.isession is server.iserver.isession
562
    check_custom_event(evgen.event, etype)
563
564
565
def check_custom_event(ev, etype):
566
    # we did not receive event
567
    assert ev is not None
568
    assert isinstance(ev, BaseEvent)
569
    assert ev.EventType == etype.nodeid
570
    assert ev.Severity == 1
571
572
573
async def check_custom_type(type, base_type, server: Server):
574
    base = asyncua.Node(server.iserver.isession, ua.NodeId(base_type))
575
    assert type in await base.get_children()
576
    nodes = await type.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse,
577
                                            includesubtypes=True)
578
    assert base == nodes[0]
579
    properties = await type.get_properties()
580
    assert properties is not None
581
    assert len(properties) == 2
582
    assert await type.get_child("2:PropertyNum") in properties
583
    assert (await(await type.get_child("2:PropertyNum")).get_data_value()).Value.VariantType == ua.VariantType.Int32
584
    assert await type.get_child("2:PropertyString") in properties
585
    assert (await(await type.get_child("2:PropertyString")).get_data_value()).Value.VariantType == ua.VariantType.String
586
587
588
"""
589
class TestServerCaching(unittest.TestCase):
590
    def runTest(self):
591
        return # FIXME broken
592
        tmpfile = NamedTemporaryFile()
593
        path = tmpfile.name
594
        tmpfile.close()
595
596
        # create cache file
597
        server = Server(shelffile=path)
598
599
        # modify cache content
600
        id = ua.NodeId(ua.ObjectIds.Server_ServerStatus_SecondsTillShutdown)
601
        s = shelve.open(path, "w", writeback=True)
602
        s[id.to_string()].attributes[ua.AttributeIds.Value].value = ua.DataValue(123)
603
        s.close()
604
605
        # ensure that we are actually loading from the cache
606
        server = Server(shelffile=path)
607
        assert server.get_node(id).get_value(), 123)
608
609
        os.remove(path)
610
611
class TestServerStartError(unittest.TestCase):
612
613
    def test_port_in_use(self):
614
615
        server1 = Server()
616
        server1.set_endpoint('opc.tcp://127.0.0.1:{0:d}'.format(port_num + 1))
617
        server1.start()
618
619
        server2 = Server()
620
        server2.set_endpoint('opc.tcp://127.0.0.1:{0:d}'.format(port_num + 1))
621
        try:
622
            server2.start()
623
        except Exception:
624
            pass
625
626
        server1.stop()
627
        server2.stop()
628
"""
629