Passed
Push — master ( d16ca4...c4429f )
by Olivier
02:27
created

asyncua.server.server.Server.get_server_node()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5
import asyncio
6
import logging
7
from datetime import timedelta, datetime
8
from urllib.parse import urlparse
9
from typing import Coroutine, Optional
10
11
from asyncua import ua
12
from .binary_server_asyncio import BinaryServer
13
from .internal_server import InternalServer
14
from .event_generator import EventGenerator
15
from ..client import Client
16
from ..common.node import Node
17
from ..common.subscription import Subscription
18
from ..common.xmlimporter import XmlImporter
19
from ..common.xmlexporter import XmlExporter
20
from ..common.manage_nodes import delete_nodes
21
from ..common.event_objects import BaseEvent
22
from ..common.shortcuts import Shortcuts
23
from ..common.structures import load_type_definitions, load_enums
24
from ..common.ua_utils import get_nodes_of_namespace
25
26
from ..crypto import security_policies, uacrypto
27
28
_logger = logging.getLogger(__name__)
29
30
31
def _get_node(isession, whatever):
32
    if isinstance(whatever, Node):
33
        return whatever
34
    if isinstance(whatever, ua.NodeId):
35
        return Node(isession, whatever)
36
    return Node(isession, ua.NodeId(whatever))
37
38
39
class Server:
40
    """
41
    High level Server class
42
43
    This class creates an asyncua server with default values
44
45
    Create your own namespace and then populate your server address space
46
    using use the get_root() or get_objects() to get Node objects.
47
    and get_event_object() to fire events.
48
    Then start server. See example_server.py
49
    All methods are threadsafe
50
51
    If you need more flexibility you call directly the Ua Service methods
52
    on the iserver  or iserver.isession object members.
53
54
    During startup the standard address space will be constructed, which may be
55
    time-consuming when running a server on a less powerful device (e.g. a
56
    Raspberry Pi). In order to improve startup performance, a optional path to a
57
    cache file can be passed to the server constructor.
58
    If the parameter is defined, the address space will be loaded from the
59
    cache file or the file will be created if it does not exist yet.
60
    As a result the first startup will be even slower due to the cache file
61
    generation but all further start ups will be significantly faster.
62
    ┌────────┐
63
    │ Server │ ── BinaryServer ── OPCUAProtocol ── UaProcessor
64
    │        │ ── InternalServer ── InternalSession
65
    └────────┘                   ── SubscriptionService
66
67
    :ivar product_uri:
68
    :ivar name:
69
    :ivar default_timeout: timeout in milliseconds for sessions and secure channel
70
    :ivar iserver: `InternalServer` instance
71
    :ivar bserver: binary protocol server `BinaryServer`
72
    :ivar nodes: shortcuts to common nodes - `Shortcuts` instance
73
    """
74
75
    def __init__(self, iserver: InternalServer = None, loop: asyncio.AbstractEventLoop = None):
76
        self.loop: asyncio.AbstractEventLoop = loop or asyncio.get_event_loop()
77
        _logger = logging.getLogger(__name__)
78
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
79
        self._application_uri = "urn:freeopcua:python:server"
80
        self.product_uri = "urn:freeopcua.github.io:python:server"
81
        self.name: str = "FreeOpcUa Python Server"
82
        self.manufacturer_name = "FreeOpcUa"
83
        self.application_type = ua.ApplicationType.ClientAndServer
84
        self.default_timeout: int = 60 * 60 * 1000
85
        self.iserver = iserver if iserver else InternalServer(self.loop)
86
        self.bserver: Optional[BinaryServer] = None
87
        self._discovery_clients = {}
88
        self._discovery_period = 60
89
        self._policies = []
90
        self.nodes = Shortcuts(self.iserver.isession)
91
        # enable all endpoints by default
92
        self._security_policy = [
93
            ua.SecurityPolicyType.NoSecurity, ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
94
            ua.SecurityPolicyType.Basic256Sha256_Sign
95
        ]
96
        self._policyIDs = ["Anonymous", "Basic256Sha256", "Username"]
97
        self.certificate = None
98
99
    async def init(self, shelf_file=None):
100
        await self.iserver.init(shelf_file)
101
        # setup some expected values
102
        await self.set_application_uri(self._application_uri)
103
        sa_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
104
        await sa_node.write_value([self._application_uri])
105
106
        await self.set_build_info(self.product_uri, self.manufacturer_name, self.name, "1.0pre", "0", datetime.now())
107
108
    async def set_build_info(self, product_uri, manufacturer_name, product_name, software_version, build_number, build_date):
109
        status_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus))
110
        build_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_BuildInfo))
111
112
        status = await status_node.read_value()
113
        if status is None:
114
            # first time
115
            status = ua.ServerStatusDataType()
116
            status.SecondsTillShutdown = 0
117
118
        status.BuildInfo.ProductUri = product_uri
119
        status.BuildInfo.ManufacturerName = manufacturer_name
120
        status.BuildInfo.ProductName = product_name
121
        status.BuildInfo.SoftwareVersion = software_version
122
        status.BuildInfo.BuildNumber = build_number
123
        status.BuildInfo.BuildDate = build_date
124
125
        await status_node.write_value(status)
126
        await build_node.write_value(status.BuildInfo)
127
128
        # we also need to update all individual nodes :/
129
        product_uri_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_BuildInfo_ProductUri))
130
        product_name_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_BuildInfo_ProductName))
131
        product_manufacturer_name_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_BuildInfo_ManufacturerName))
132
        product_software_version_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_BuildInfo_SoftwareVersion))
133
        product_build_number_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_BuildInfo_BuildNumber))
134
        product_build_date_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_BuildInfo_BuildDate))
135
136
        await product_uri_node.write_value(status.BuildInfo.ProductUri)
137
        await product_name_node.write_value(status.BuildInfo.ProductName)
138
        await product_manufacturer_name_node.write_value(status.BuildInfo.ManufacturerName)
139
        await product_software_version_node.write_value(status.BuildInfo.SoftwareVersion)
140
        await product_build_number_node.write_value(status.BuildInfo.BuildNumber)
141
        await product_build_date_node.write_value(status.BuildInfo.BuildDate)
142
143
    async def __aenter__(self):
144
        await self.start()
145
146
    async def __aexit__(self, exc_type, exc_value, traceback):
147
        await self.stop()
148
149
    def __str__(self):
150
        return f"OPC UA Server({self.endpoint.geturl()})"
151
    __repr__ = __str__
152
153
    async def load_certificate(self, path: str, format: str =None):
154
        """
155
        load server certificate from file, either pem or der
156
        """
157
        self.certificate = await uacrypto.load_certificate(path, format)
158
159
    async def load_private_key(self, path, password=None, format=None):
160
        self.iserver.private_key = await uacrypto.load_private_key(path, password, format)
161
162
    def disable_clock(self, val: bool = True):
163
        """
164
        for debugging you may want to disable clock that write every second
165
        to address space
166
        """
167
        self.iserver.disabled_clock = val
168
169
    def get_application_uri(self):
170
        return self._application_uri
171
172
    async def set_application_uri(self, uri: str):
173
        """
174
        Set application/server URI.
175
        This uri is supposed to be unique. If you intent to register
176
        your server to a discovery server, it really should be unique in
177
        your system!
178
        default is : "urn:freeopcua:python:server"
179
        """
180
        self._application_uri = uri
181
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
182
        uries = await ns_node.read_value()
183
        if len(uries) > 1:
184
            uries[1] = uri  # application uri is always namespace 1
185
        else:
186
            uries.append(uri)
187
        await ns_node.write_value(uries)
188
189
    async def find_servers(self, uris=None):
190
        """
191
        find_servers. mainly implemented for symmetry with client
192
        """
193
        if uris is None:
194
            uris = []
195
        params = ua.FindServersParameters()
196
        params.EndpointUrl = self.endpoint.geturl()
197
        params.ServerUris = uris
198
        return self.iserver.find_servers(params)
199
200
    async def register_to_discovery(self, url: str = "opc.tcp://localhost:4840", period: int = 60):
201
        """
202
        Register to an OPC-UA Discovery server. Registering must be renewed at
203
        least every 10 minutes, so this method will use our asyncio thread to
204
        re-register every period seconds
205
        if period is 0 registration is not automatically renewed
206
        """
207
        # FIXME: have a period per discovery
208
        if url in self._discovery_clients:
209
            await self._discovery_clients[url].disconnect()
210
        self._discovery_clients[url] = Client(url)
211
        await self._discovery_clients[url].connect()
212
        await self._discovery_clients[url].register_server(self)
213
        self._discovery_period = period
214
        if period:
215
            self.loop.call_soon(self._schedule_renew_registration)
216
217
    def unregister_to_discovery(self, url: str = "opc.tcp://localhost:4840"):
218
        """
219
        stop registration thread
220
        """
221
        # FIXME: is there really no way to deregister?
222
        self._discovery_clients[url].disconnect()
223
224
    def _schedule_renew_registration(self):
225
        self.loop.create_task(self._renew_registration())
226
        self.loop.call_later(self._discovery_period, self._schedule_renew_registration)
227
228
    async def _renew_registration(self):
229
        for client in self._discovery_clients.values():
230
            await client.register_server(self)
231
232
    def allow_remote_admin(self, allow):
233
        """
234
        Enable or disable the builtin Admin user from network clients
235
        """
236
        self.iserver.allow_remote_admin = allow
237
238
    def set_endpoint(self, url):
239
        self.endpoint = urlparse(url)
240
241
    def get_endpoints(self) -> Coroutine:
242
        return self.iserver.get_endpoints()
243
244
    def set_security_policy(self, security_policy):
245
        """
246
        Method setting up the security policies for connections
247
        to the server, where security_policy is a list of integers.
248
        During server initialization, all endpoints are enabled:
249
250
                security_policy = [
251
                            ua.SecurityPolicyType.NoSecurity,
252
                            ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
253
                            ua.SecurityPolicyType.Basic256Sha256_Sign
254
                                ]
255
256
        E.g. to limit the number of endpoints and disable no encryption:
257
258
                set_security_policy([
259
                            ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt])
260
261
        """
262
        self._security_policy = security_policy
263
264
    def set_security_IDs(self, policy_ids):
265
        """
266
            Method setting up the security endpoints for identification
267
            of clients. During server object initialization, all possible
268
            endpoints are enabled:
269
270
            self._policyIDs = ["Anonymous", "Basic256Sha256", "Username"]
271
272
            E.g. to limit the number of IDs and disable anonymous clients:
273
274
                set_security_IDs(["Basic256Sha256"])
275
276
        (Implementation for ID check is currently not finalized...)
277
        """
278
        self._policyIDs = policy_ids
279
280
    async def _setup_server_nodes(self):
281
        # to be called just before starting server since it needs all parameters to be setup
282
        if ua.SecurityPolicyType.NoSecurity in self._security_policy:
283
            self._set_endpoints()
284
            self._policies = [ua.SecurityPolicyFactory()]
285
286
        if self._security_policy != [ua.SecurityPolicyType.NoSecurity]:
287
            if not (self.certificate and self.iserver.private_key):
288
                _logger.warning("Endpoints other than open requested but private key and certificate are not set.")
289
                return
290
291
            if ua.SecurityPolicyType.NoSecurity in self._security_policy:
292
                _logger.warning(
293
                    "Creating an open endpoint to the server, although encrypted endpoints are enabled.")
294
295
            if ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt in self._security_policy:
296
                self._set_endpoints(security_policies.SecurityPolicyBasic256Sha256,
297
                                    ua.MessageSecurityMode.SignAndEncrypt)
298
                self._policies.append(
299
                    ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256Sha256,
300
                                             ua.MessageSecurityMode.SignAndEncrypt, self.certificate,
301
                                             self.iserver.private_key))
302
            if ua.SecurityPolicyType.Basic256Sha256_Sign in self._security_policy:
303
                self._set_endpoints(security_policies.SecurityPolicyBasic256Sha256, ua.MessageSecurityMode.Sign)
304
                self._policies.append(
305
                    ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256Sha256,
306
                                             ua.MessageSecurityMode.Sign, self.certificate, self.iserver.private_key))
307
308
    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
309
        idtokens = []
310
        if "Anonymous" in self._policyIDs:
311
            idtoken = ua.UserTokenPolicy()
312
            idtoken.PolicyId = "anonymous"
313
            idtoken.TokenType = ua.UserTokenType.Anonymous
314
            idtokens.append(idtoken)
315
316
        if "Basic256Sha256" in self._policyIDs:
317
            idtoken = ua.UserTokenPolicy()
318
            idtoken.PolicyId = 'certificate_basic256sha256'
319
            idtoken.TokenType = ua.UserTokenType.Certificate
320
            idtokens.append(idtoken)
321
322
        if "Username" in self._policyIDs:
323
            idtoken = ua.UserTokenPolicy()
324
            idtoken.PolicyId = "username"
325
            idtoken.TokenType = ua.UserTokenType.UserName
326
            idtokens.append(idtoken)
327
328
        appdesc = ua.ApplicationDescription()
329
        appdesc.ApplicationName = ua.LocalizedText(self.name)
330
        appdesc.ApplicationUri = self._application_uri
331
        appdesc.ApplicationType = self.application_type
332
        appdesc.ProductUri = self.product_uri
333
        appdesc.DiscoveryUrls.append(self.endpoint.geturl())
334
335
        edp = ua.EndpointDescription()
336
        edp.EndpointUrl = self.endpoint.geturl()
337
        edp.Server = appdesc
338
        if self.certificate:
339
            edp.ServerCertificate = uacrypto.der_from_x509(self.certificate)
340
        edp.SecurityMode = mode
341
        edp.SecurityPolicyUri = policy.URI
342
        edp.UserIdentityTokens = idtokens
343
        edp.TransportProfileUri = "http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary"
344
        edp.SecurityLevel = 0
345
        self.iserver.add_endpoint(edp)
346
347
    def set_server_name(self, name):
348
        self.name = name
349
350
    async def start(self):
351
        """
352
        Start to listen on network
353
        """
354
        await self._setup_server_nodes()
355
        await self.iserver.start()
356
        try:
357
            self.bserver = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
358
            self.bserver.set_policies(self._policies)
359
            await self.bserver.start()
360
        except Exception as exp:
361
            _logger.exception("%s error starting server", self)
362
            await self.iserver.stop()
363
            raise exp
364
        else:
365
            _logger.debug("%s server started", self)
366
367
    async def stop(self):
368
        """
369
        Stop server
370
        """
371
        if self._discovery_clients:
372
            await asyncio.wait([client.disconnect() for client in self._discovery_clients.values()])
373
        await self.bserver.stop()
374
        await self.iserver.stop()
375
        _logger.debug("%s Internal server stopped, everything closed", self)
376
377
    def get_root_node(self):
378
        """
379
        Get Root node of server. Returns a Node object.
380
        """
381
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
382
383
    def get_objects_node(self):
384
        """
385
        Get Objects node of server. Returns a Node object.
386
        """
387
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))
388
389
    def get_node(self, nodeid):
390
        """
391
        Get a specific node using NodeId object or a string representing a NodeId
392
        """
393
        return Node(self.iserver.isession, nodeid)
394
395
    async def create_subscription(self, period, handler):
396
        """
397
        Create a subscription.
398
        Returns a Subscription object which allow to subscribe to events or data changes on server
399
        :param period: Period in milliseconds
400
        :param handler: A class instance - see `SubHandler` base class for details
401
        """
402
        params = ua.CreateSubscriptionParameters()
403
        params.RequestedPublishingInterval = period
404
        params.RequestedLifetimeCount = 3000
405
        params.RequestedMaxKeepAliveCount = 10000
406
        params.MaxNotificationsPerPublish = 0
407
        params.PublishingEnabled = True
408
        params.Priority = 0
409
        subscription = Subscription(self.iserver.isession, params, handler)
410
        await subscription.init()
411
        return subscription
412
413
    async def get_namespace_array(self):
414
        """
415
        get all namespace defined in server
416
        """
417
        return await self.nodes.namespace_array.read_value()
418
419
    async def register_namespace(self, uri) -> int:
420
        """
421
        Register a new namespace. Nodes should in custom namespace, not 0.
422
        """
423
        uries = await self.nodes.namespace_array.read_value()
424
        if uri in uries:
425
            return uries.index(uri)
426
        uries.append(uri)
427
        await self.nodes.namespace_array.write_value(uries)
428
        return len(uries) - 1
429
430
    async def get_namespace_index(self, uri):
431
        """
432
        get index of a namespace using its uri
433
        """
434
        uries = await self.get_namespace_array()
435
        return uries.index(uri)
436
437
    async def get_event_generator(self, etype=None, emitting_node=ua.ObjectIds.Server):
438
        """
439
        Returns an event object using an event type from address space.
440
        Use this object to fire events
441
        """
442
        if not etype:
443
            etype = BaseEvent()
444
        ev_gen = EventGenerator(self.iserver.isession)
445
        await ev_gen.init(etype, emitting_node=emitting_node)
446
        return ev_gen
447
448
    async def create_custom_data_type(self, idx, name, basetype=ua.ObjectIds.BaseDataType, properties=None, description=None) -> Coroutine:
449
        if properties is None:
450
            properties = []
451
        base_t = _get_node(self.iserver.isession, basetype)
452
453
        custom_t = await base_t.add_data_type(idx, name, description)
454
        for prop in properties:
455
            datatype = None
456
            if len(prop) > 2:
457
                datatype = prop[2]
458
            await custom_t.add_property(idx, prop[0], ua.get_default_value(prop[1]), varianttype=prop[1], datatype=datatype)
459
        return custom_t
460
461
    async def create_custom_event_type(self, idx, name, basetype=ua.ObjectIds.BaseEventType, properties=None) -> Coroutine:
462
        if properties is None:
463
            properties = []
464
        return await self._create_custom_type(idx, name, basetype, properties, [], [])
465
466
    async def create_custom_object_type(self,
467
                                  idx,
468
                                  name,
469
                                  basetype=ua.ObjectIds.BaseObjectType,
470
                                  properties=None,
471
                                  variables=None,
472
                                  methods=None) -> Coroutine:
473
        if properties is None:
474
            properties = []
475
        if variables is None:
476
            variables = []
477
        if methods is None:
478
            methods = []
479
        return await self._create_custom_type(idx, name, basetype, properties, variables, methods)
480
481
    # def create_custom_reference_type(self, idx, name, basetype=ua.ObjectIds.BaseReferenceType, properties=[]):
482
    # return self._create_custom_type(idx, name, basetype, properties)
483
484
    async def create_custom_variable_type(self,
485
                                    idx,
486
                                    name,
487
                                    basetype=ua.ObjectIds.BaseVariableType,
488
                                    properties=None,
489
                                    variables=None,
490
                                    methods=None) -> Coroutine:
491
        if properties is None:
492
            properties = []
493
        if variables is None:
494
            variables = []
495
        if methods is None:
496
            methods = []
497
        return await self._create_custom_type(idx, name, basetype, properties, variables, methods)
498
499
    async def _create_custom_type(self, idx, name, basetype, properties, variables, methods):
500
        base_t = _get_node(self.iserver.isession, basetype)
501
        custom_t = await base_t.add_object_type(idx, name)
502
        for prop in properties:
503
            datatype = None
504
            if len(prop) > 2:
505
                datatype = prop[2]
506
            await custom_t.add_property(
507
                idx, prop[0], ua.get_default_value(prop[1]), varianttype=prop[1], datatype=datatype)
508
        for variable in variables:
509
            datatype = None
510
            if len(variable) > 2:
511
                datatype = variable[2]
512
            await custom_t.add_variable(
513
                idx, variable[0], ua.get_default_value(variable[1]), varianttype=variable[1], datatype=datatype)
514
        for method in methods:
515
            await custom_t.add_method(idx, method[0], method[1], method[2], method[3])
516
        return custom_t
517
518
    def import_xml(self, path=None, xmlstring=None) -> Coroutine:
519
        """
520
        Import nodes defined in xml
521
        """
522
        importer = XmlImporter(self)
523
        return importer.import_xml(path, xmlstring)
524
525
    async def export_xml(self, nodes, path):
526
        """
527
        Export defined nodes to xml
528
        """
529
        exp = XmlExporter(self)
530
        await exp.build_etree(nodes)
531
        await exp.write_xml(path)
532
533
    async def export_xml_by_ns(self, path: str, namespaces: list = None):
534
        """
535
        Export nodes of one or more namespaces to an XML file.
536
        Namespaces used by nodes are always exported for consistency.
537
        :param path: name of the xml file to write
538
        :param namespaces: list of string uris or int indexes of the namespace to export, if not provide all ns are used except 0
539
        """
540
        if namespaces is None:
541
            namespaces = []
542
        nodes = await get_nodes_of_namespace(self, namespaces)
543
        await self.export_xml(nodes, path)
544
545
    def delete_nodes(self, nodes, recursive=False) -> Coroutine:
546
        return delete_nodes(self.iserver.isession, nodes, recursive)
547
548
    async def historize_node_data_change(self, node, period=timedelta(days=7), count=0):
549
        """
550
        Start historizing supplied nodes; see history module
551
        :param node: node or list of nodes that can be historized (variables/properties)
552
        :param period: time delta to store the history; older data will be deleted from the storage
553
        :param count: number of changes to store in the history
554
        """
555
        nodes = node if isinstance(node, (list, tuple)) else [node]
556
        for n in nodes:
557
            await self.iserver.enable_history_data_change(n, period, count)
558
559
    async def dehistorize_node_data_change(self, node):
560
        """
561
        Stop historizing supplied nodes; see history module
562
        :param node: node or list of nodes that can be historized (UA variables/properties)
563
        """
564
        nodes = node if isinstance(node, (list, tuple)) else [node]
565
        for n in nodes:
566
            await self.iserver.disable_history_data_change(n)
567
568
    async def historize_node_event(self, node, period=timedelta(days=7), count: int = 0):
569
        """
570
        Start historizing events from node (typically a UA object); see history module
571
        :param node: node or list of nodes that can be historized (UA objects)
572
        :param period: time delta to store the history; older data will be deleted from the storage
573
        :param count: number of events to store in the history
574
        """
575
        nodes = node if isinstance(node, (list, tuple)) else [node]
576
        for n in nodes:
577
            await self.iserver.enable_history_event(n, period, count)
578
579
    async def dehistorize_node_event(self, node):
580
        """
581
        Stop historizing events from node (typically a UA object); see history module
582
        :param node: node or list of nodes that can be historized (UA objects)
583
        """
584
        nodes = node if isinstance(node, (list, tuple)) else [node]
585
        for n in nodes:
586
            await self.iserver.disable_history_event(n)
587
588
    def subscribe_server_callback(self, event, handle):
589
        self.iserver.subscribe_server_callback(event, handle)
590
591
    def unsubscribe_server_callback(self, event, handle):
592
        self.iserver.unsubscribe_server_callback(event, handle)
593
594
    def link_method(self, node, callback):
595
        """
596
        Link a python function to a UA method in the address space; required when a UA method has been imported
597
        to the address space via XML; the python executable must be linked manually
598
        :param node: UA method node
599
        :param callback: python function that the UA method will call
600
        """
601
        self.iserver.isession.add_method_callback(node.nodeid, callback)
602
603
    def load_type_definitions(self, nodes=None) -> Coroutine:
604
        """
605
        load custom structures from our server.
606
        Server side this can be used to create python objects from custom structures
607
        imported through xml into server
608
        """
609
        return load_type_definitions(self, nodes)
610
611
    def load_enums(self) -> Coroutine:
612
        """
613
        load UA structures and generate python Enums in ua module for custom enums in server
614
        """
615
        return load_enums(self)
616
617
    def write_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
618
        """
619
        directly write datavalue to the Attribute, bypasing some checks and structure creation
620
        so it is a little faster
621
        """
622
        return self.iserver.write_attribute_value(nodeid, datavalue, attr)
623