Passed
Pull Request — master (#184)
by Olivier
02:29
created

Server.write_attribute_value()   A

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 4
dl 0
loc 6
rs 10
c 0
b 0
f 0
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5
import asyncio
6
import logging
7
from datetime import timedelta, datetime
8
from urllib.parse import urlparse
9
from typing import Coroutine, Optional
10
11
from asyncua import ua
12
from .binary_server_asyncio import BinaryServer
13
from .internal_server import InternalServer
14
from .event_generator import EventGenerator
15
from ..client import Client
16
from ..common.node import Node
17
from ..common.subscription import Subscription
18
from ..common.xmlimporter import XmlImporter
19
from ..common.xmlexporter import XmlExporter
20
from ..common.manage_nodes import delete_nodes
21
from ..common.event_objects import BaseEvent
22
from ..common.shortcuts import Shortcuts
23
from ..common.structures import load_type_definitions, load_enums
24
from ..common.ua_utils import get_nodes_of_namespace
25
26
from ..crypto import security_policies, uacrypto
27
28
_logger = logging.getLogger(__name__)
29
30
31
def _get_node(isession, whatever):
    """
    Coerce *whatever* into a Node bound to *isession*.

    Node instances are returned untouched, NodeId objects are wrapped in a
    new Node and anything else is first handed to the ua.NodeId constructor.
    """
    if isinstance(whatever, Node):
        return whatever
    nodeid = whatever if isinstance(whatever, ua.NodeId) else ua.NodeId(whatever)
    return Node(isession, nodeid)
37
38
39
class Server:
40
    """
41
    High level Server class
42
43
    This class creates an asyncua server with default values
44
45
    Create your own namespace and then populate your server address space
46
    using get_root_node() or get_objects_node() to get Node objects
47
    and get_event_generator() to fire events.
48
    Then start server. See example_server.py
49
    All methods are threadsafe
50
51
    If you need more flexibility you call directly the Ua Service methods
52
    on the iserver  or iserver.isession object members.
53
54
    During startup the standard address space will be constructed, which may be
55
    time-consuming when running a server on a less powerful device (e.g. a
56
    Raspberry Pi). In order to improve startup performance, an optional path to a
57
    cache file can be passed to the server constructor.
58
    If the parameter is defined, the address space will be loaded from the
59
    cache file or the file will be created if it does not exist yet.
60
    As a result the first startup will be even slower due to the cache file
61
    generation but all further start ups will be significantly faster.
62
    ┌────────┐
63
    │ Server │ ── BinaryServer ── OPCUAProtocol ── UaProcessor
64
    │        │ ── InternalServer ── InternalSession
65
    └────────┘                   ── SubscriptionService
66
67
    :ivar product_uri:
68
    :ivar name:
69
    :ivar default_timeout: timeout in milliseconds for sessions and secure channel
70
    :ivar iserver: `InternalServer` instance
71
    :ivar bserver: binary protocol server `BinaryServer`
72
    :ivar nodes: shortcuts to common nodes - `Shortcuts` instance
73
    """
74
75
    def __init__(self, iserver: Optional[InternalServer] = None, loop: Optional[asyncio.AbstractEventLoop] = None):
        """
        Create a server object with default settings; nothing is started here.

        :param iserver: InternalServer to use; when omitted (or falsy) a new
            InternalServer bound to the event loop is created
        :param loop: asyncio event loop to use; defaults to
            asyncio.get_event_loop()
        """
        self.loop: asyncio.AbstractEventLoop = loop or asyncio.get_event_loop()
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
        self._application_uri = "urn:freeopcua:python:server"
        self.product_uri = "urn:freeopcua.github.io:python:server"
        self.name: str = "FreeOpcUa Python Server"
        self.manufacturer_name = "FreeOpcUa"
        self.application_type = ua.ApplicationType.ClientAndServer
        self.default_timeout: int = 60 * 60 * 1000
        self.iserver = iserver if iserver else InternalServer(self.loop)
        # the binary server is only created by start()
        self.bserver: Optional[BinaryServer] = None
        # discovery url -> Client used to (re-)register this server
        self._discovery_clients = {}
        self._discovery_period = 60
        self._policies = []
        self.nodes = Shortcuts(self.iserver.isession)
        # enable all endpoints by default
        self._security_policy = [
            ua.SecurityPolicyType.NoSecurity,
            ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
            ua.SecurityPolicyType.Basic256Sha256_Sign,
        ]
        self._policyIDs = ["Anonymous", "Basic256Sha256", "Username"]
        self.certificate = None
98
99
    async def init(self, shelf_file=None):
        """
        Initialise the internal server and write the default server metadata.

        :param shelf_file: passed unchanged to the internal server's init()
        """
        await self.iserver.init(shelf_file)
        # setup some expected values
        await self.set_application_uri(self._application_uri)
        server_array = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
        await server_array.write_value([self._application_uri])

        build_info = (self.product_uri, self.manufacturer_name, self.name, "1.0pre", "0", datetime.now())
        await self.set_build_info(*build_info)
107
108
    async def set_build_info(self, product_uri, manufacturer_name, product_name, software_version, build_number, build_date):
        """
        Write build information into the ServerStatus part of the address space.

        The ServerStatus value is read (a fresh ServerStatusDataType is used
        when the node has no value yet), its BuildInfo fields are replaced and
        the result is written to the ServerStatus node, the BuildInfo node and
        every individual BuildInfo child variable.
        """
        status_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus))
        build_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_BuildInfo))

        status = await status_node.read_value()
        if status is None:
            # first time
            status = ua.ServerStatusDataType()
            status.SecondsTillShutdown = 0

        new_values = (
            ("ProductUri", product_uri),
            ("ManufacturerName", manufacturer_name),
            ("ProductName", product_name),
            ("SoftwareVersion", software_version),
            ("BuildNumber", build_number),
            ("BuildDate", build_date),
        )
        for field, value in new_values:
            setattr(status.BuildInfo, field, value)

        await status_node.write_value(status)
        await build_node.write_value(status.BuildInfo)

        # we also need to update all individual nodes :/
        child_fields = (
            "ProductUri",
            "ProductName",
            "ManufacturerName",
            "SoftwareVersion",
            "BuildNumber",
            "BuildDate",
        )
        children = [
            (self.get_node(ua.NodeId(getattr(ua.ObjectIds, "Server_ServerStatus_BuildInfo_" + field))), field)
            for field in child_fields
        ]
        for child, field in children:
            await child.write_value(getattr(status.BuildInfo, field))
142
143
    async def __aenter__(self):
144
        await self.start()
145
146
    async def __aexit__(self, exc_type, exc_value, traceback):
147
        await self.stop()
148
149
    def __str__(self):
150
        return f"OPC UA Server({self.endpoint.geturl()})"
151
    __repr__ = __str__
152
153
    async def load_certificate(self, path: str, format: str = None):
        """
        Load the server certificate from a file, either pem or der, and keep
        it in self.certificate.

        :param path: file to load
        :param format: passed unchanged to uacrypto.load_certificate()
        """
        certificate = await uacrypto.load_certificate(path, format)
        self.certificate = certificate
158
159
    async def load_private_key(self, path, password=None, format=None):
        """
        Load the private key from a file and store it on the internal server.

        All arguments are passed unchanged to uacrypto.load_private_key().
        """
        private_key = await uacrypto.load_private_key(path, password, format)
        self.iserver.private_key = private_key
161
162
    def disable_clock(self, val: bool = True):
163
        """
164
        for debugging you may want to disable clock that write every second
165
        to address space
166
        """
167
        self.iserver.disabled_clock = val
168
169
    def get_application_uri(self):
170
        return self._application_uri
171
172
    async def set_application_uri(self, uri: str):
        """
        Set application/server URI.

        The uri is supposed to be unique. If you intend to register your
        server to a discovery server, it really should be unique in your
        system!
        default is : "urn:freeopcua:python:server"

        The uri is also stored at index 1 of the server NamespaceArray.
        """
        self._application_uri = uri
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        namespaces = await ns_node.read_value()
        if len(namespaces) <= 1:
            namespaces.append(uri)
        else:
            namespaces[1] = uri  # application uri is always namespace 1
        await ns_node.write_value(namespaces)
188
189
    async def find_servers(self, uris=None):
        """
        find_servers. mainly implemented for symmetry with client

        :param uris: server uris put into the request; an empty list is used
            when omitted
        """
        params = ua.FindServersParameters()
        params.EndpointUrl = self.endpoint.geturl()
        params.ServerUris = [] if uris is None else uris
        return self.iserver.find_servers(params)
199
200
    async def register_to_discovery(self, url: str = "opc.tcp://localhost:4840", period: int = 60):
        """
        Register to an OPC-UA Discovery server. Registering must be renewed at
        least every 10 minutes, so the registration is renewed on the asyncio
        loop every period seconds.
        If period is 0 registration is not automatically renewed.

        A client already registered for the same url is disconnected first.
        """
        # FIXME: have a period per discovery
        clients = self._discovery_clients
        if url in clients:
            await clients[url].disconnect()
        clients[url] = Client(url)
        await clients[url].connect()
        await clients[url].register_server(self)
        self._discovery_period = period
        if not period:
            return
        self.loop.call_soon(self._schedule_renew_registration)
216
217
    def unregister_to_discovery(self, url: str = "opc.tcp://localhost:4840"):
218
        """
219
        stop registration thread
220
        """
221
        # FIXME: is there really no way to deregister?
222
        self._discovery_clients[url].disconnect()
223
224
    def _schedule_renew_registration(self):
225
        self.loop.create_task(self._renew_registration())
226
        self.loop.call_later(self._discovery_period, self._schedule_renew_registration)
227
228
    async def _renew_registration(self):
229
        for client in self._discovery_clients.values():
230
            await client.register_server(self)
231
232
    def allow_remote_admin(self, allow):
233
        """
234
        Enable or disable the builtin Admin user from network clients
235
        """
236
        self.iserver.allow_remote_admin = allow
237
238
    def set_endpoint(self, url):
239
        self.endpoint = urlparse(url)
240
241
    def get_endpoints(self) -> Coroutine:
242
        return self.iserver.get_endpoints()
243
244
    def set_security_policy(self, security_policy):
245
        """
246
        Method setting up the security policies for connections
247
        to the server, where security_policy is a list of integers.
248
        During server initialization, all endpoints are enabled:
249
250
                security_policy = [
251
                            ua.SecurityPolicyType.NoSecurity,
252
                            ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
253
                            ua.SecurityPolicyType.Basic256Sha256_Sign
254
                                ]
255
256
        E.g. to limit the number of endpoints and disable no encryption:
257
258
                set_security_policy([
259
                            ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt])
260
261
        """
262
        self._security_policy = security_policy
263
264
    def set_security_IDs(self, policy_ids):
265
        """
266
            Method setting up the security endpoints for identification
267
            of clients. During server object initialization, all possible
268
            endpoints are enabled:
269
270
            self._policyIDs = ["Anonymous", "Basic256Sha256", "Username"]
271
272
            E.g. to limit the number of IDs and disable anonymous clients:
273
274
                set_security_IDs(["Basic256Sha256"])
275
276
        (Implementation for ID check is currently not finalized...)
277
        """
278
        self._policyIDs = policy_ids
279
280
    async def _setup_server_nodes(self):
        """
        Create the endpoints and security policy factories for the
        configured security policies.

        To be called just before starting the server since it needs all
        parameters to be set up. Encrypted endpoints are skipped, with a
        warning, when the certificate or the private key is missing.
        """
        requested = self._security_policy
        open_endpoint = ua.SecurityPolicyType.NoSecurity in requested
        if open_endpoint:
            self._set_endpoints()
            self._policies = [ua.SecurityPolicyFactory()]

        if requested == [ua.SecurityPolicyType.NoSecurity]:
            return

        if not self.certificate or not self.iserver.private_key:
            _logger.warning("Endpoints other than open requested but private key and certificate are not set.")
            return

        if open_endpoint:
            _logger.warning(
                "Creating an open endpoint to the server, although encrypted endpoints are enabled.")

        secured = (
            (ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt, ua.MessageSecurityMode.SignAndEncrypt),
            (ua.SecurityPolicyType.Basic256Sha256_Sign, ua.MessageSecurityMode.Sign),
        )
        for policy_type, mode in secured:
            if policy_type not in requested:
                continue
            self._set_endpoints(security_policies.SecurityPolicyBasic256Sha256, mode)
            self._policies.append(
                ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256Sha256,
                                         mode, self.certificate, self.iserver.private_key))
307
308
    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
        """
        Build one endpoint description and add it to the internal server.

        :param policy: security policy class; its URI becomes the endpoint's
            SecurityPolicyUri
        :param mode: message security mode of the endpoint

        One user identity token is offered for each of "Anonymous",
        "Basic256Sha256" and "Username" present in self._policyIDs.
        """
        token_specs = (
            ("Anonymous", "anonymous", ua.UserTokenType.Anonymous),
            ("Basic256Sha256", 'certificate_basic256sha256', ua.UserTokenType.Certificate),
            ("Username", "username", ua.UserTokenType.UserName),
        )
        idtokens = []
        for enabled_name, policy_id, token_type in token_specs:
            if enabled_name not in self._policyIDs:
                continue
            idtoken = ua.UserTokenPolicy()
            idtoken.PolicyId = policy_id
            idtoken.TokenType = token_type
            idtokens.append(idtoken)

        appdesc = ua.ApplicationDescription()
        appdesc.ApplicationName = ua.LocalizedText(self.name)
        appdesc.ApplicationUri = self._application_uri
        appdesc.ApplicationType = self.application_type
        appdesc.ProductUri = self.product_uri
        appdesc.DiscoveryUrls.append(self.endpoint.geturl())

        edp = ua.EndpointDescription()
        edp.EndpointUrl = self.endpoint.geturl()
        edp.Server = appdesc
        if self.certificate:
            edp.ServerCertificate = uacrypto.der_from_x509(self.certificate)
        edp.SecurityMode = mode
        edp.SecurityPolicyUri = policy.URI
        edp.UserIdentityTokens = idtokens
        edp.TransportProfileUri = "http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary"
        edp.SecurityLevel = 0
        self.iserver.add_endpoint(edp)
346
347
    def set_server_name(self, name):
348
        self.name = name
349
350
    async def start(self):
        """
        Start to listen on network

        Endpoints are set up first, then the internal server and finally the
        binary server are started. If the binary server cannot be created or
        started, the internal server is stopped again and the error is
        raised to the caller.
        """
        await self._setup_server_nodes()
        await self.iserver.start()
        try:
            binary_server = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
            self.bserver = binary_server
            self.bserver.set_policies(self._policies)
            await self.bserver.start()
        except Exception as exp:
            _logger.exception("%s error starting server", self)
            await self.iserver.stop()
            raise exp
        _logger.debug("%s server started", self)
366
367
    async def stop(self):
        """
        Stop server

        Discovery clients are disconnected, then the binary server (if
        start() created one) and the internal server are stopped.
        """
        if self._discovery_clients:
            # asyncio.wait() no longer accepts bare coroutines (Python 3.11+);
            # gather() wraps them itself. return_exceptions=True keeps a
            # failing disconnect from aborting the shutdown, as with wait().
            await asyncio.gather(
                *(client.disconnect() for client in self._discovery_clients.values()),
                return_exceptions=True)
        if self.bserver is not None:
            # bserver stays None until start() has created the binary server
            await self.bserver.stop()
        await self.iserver.stop()
        _logger.debug("%s Internal server stopped, everything closed", self)
376
377
    def get_root_node(self):
        """
        Return the Root node of the server as a Node object.
        """
        root_id = ua.TwoByteNodeId(ua.ObjectIds.RootFolder)
        return self.get_node(root_id)
382
383
    def get_objects_node(self):
        """
        Return the Objects node of the server as a Node object.
        """
        objects_id = ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder)
        return self.get_node(objects_id)
388
389
    def get_server_node(self):
        """
        Return the Server node of the server as a Node object.
        """
        server_id = ua.TwoByteNodeId(ua.ObjectIds.Server)
        return self.get_node(server_id)
394
395
    def get_node(self, nodeid):
        """
        Return a Node for *nodeid*, bound to the server's internal session.

        :param nodeid: NodeId object or a string representing a NodeId
        """
        session = self.iserver.isession
        return Node(session, nodeid)
400
401
    async def create_subscription(self, period, handler):
        """
        Create a subscription on the server's internal session.

        :param period: Period in milliseconds
        :param handler: A class instance - see `SubHandler` base class for details
        :return: an initialised Subscription object which allows to subscribe
            to events or data changes on server
        """
        params = ua.CreateSubscriptionParameters()
        settings = (
            ("RequestedPublishingInterval", period),
            ("RequestedLifetimeCount", 3000),
            ("RequestedMaxKeepAliveCount", 10000),
            ("MaxNotificationsPerPublish", 0),
            ("PublishingEnabled", True),
            ("Priority", 0),
        )
        for field, value in settings:
            setattr(params, field, value)
        subscription = Subscription(self.iserver.isession, params, handler)
        await subscription.init()
        return subscription
418
419
    async def get_namespace_array(self):
        """
        Return the value of the server NamespaceArray node, i.e. all
        namespaces defined in the server
        """
        array_id = ua.NodeId(ua.ObjectIds.Server_NamespaceArray)
        namespaces = await self.get_node(array_id).read_value()
        return namespaces
425
426 View Code Duplication
    async def register_namespace(self, uri) -> int:
        """
        Register a new namespace. Nodes should be in a custom namespace, not 0.

        :return: index of *uri* in the NamespaceArray; an uri that is already
            registered is not added a second time
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        namespaces = await ns_node.read_value()
        try:
            return namespaces.index(uri)
        except ValueError:
            # not registered yet
            pass
        namespaces.append(uri)
        await ns_node.write_value(namespaces)
        return len(namespaces) - 1
437
438
    async def get_namespace_index(self, uri):
439
        """
440
        get index of a namespace using its uri
441
        """
442
        uries = await self.get_namespace_array()
443
        return uries.index(uri)
444
445
    async def get_event_generator(self, etype=None, emitting_node=ua.ObjectIds.Server):
        """
        Return an initialised EventGenerator for an event type from the
        address space. Use this object to fire events.

        :param etype: event type; a BaseEvent() is used when omitted or falsy
        :param emitting_node: passed on to the generator's init()
        """
        generator = EventGenerator(self.iserver.isession)
        await generator.init(etype or BaseEvent(), emitting_node=emitting_node)
        return generator
455
456
    async def create_custom_data_type(self, idx, name, basetype=ua.ObjectIds.BaseDataType, properties=None, description=None) -> Coroutine:
        """
        Add a data type below *basetype* and give it the listed properties.

        :param properties: sequence of (name, variant type[, data type])
            entries; none are added when omitted
        :return: the node returned by add_data_type()
        """
        base_t = _get_node(self.iserver.isession, basetype)

        custom_t = await base_t.add_data_type(idx, name, description)
        for prop in ([] if properties is None else properties):
            datatype = prop[2] if len(prop) > 2 else None
            default = ua.get_default_value(prop[1])
            await custom_t.add_property(idx, prop[0], default, varianttype=prop[1], datatype=datatype)
        return custom_t
468
469
    async def create_custom_event_type(self, idx, name, basetype=ua.ObjectIds.BaseEventType, properties=None) -> Coroutine:
        """
        Create a custom event type below *basetype*; it gets the listed
        properties but no variables and no methods.
        """
        props = [] if properties is None else properties
        return await self._create_custom_type(idx, name, basetype, props, [], [])
473
474
    async def create_custom_object_type(self,
                                  idx,
                                  name,
                                  basetype=ua.ObjectIds.BaseObjectType,
                                  properties=None,
                                  variables=None,
                                  methods=None) -> Coroutine:
        """
        Create a custom object type below *basetype* with the listed
        properties, variables and methods (each defaults to an empty list).
        """
        props = [] if properties is None else properties
        variables_ = [] if variables is None else variables
        methods_ = [] if methods is None else methods
        return await self._create_custom_type(idx, name, basetype, props, variables_, methods_)
488
489
    # def create_custom_reference_type(self, idx, name, basetype=ua.ObjectIds.BaseReferenceType, properties=[]):
490
    # return self._create_custom_type(idx, name, basetype, properties)
491
492
    async def create_custom_variable_type(self,
                                    idx,
                                    name,
                                    basetype=ua.ObjectIds.BaseVariableType,
                                    properties=None,
                                    variables=None,
                                    methods=None) -> Coroutine:
        """
        Create a custom variable type below *basetype* with the listed
        properties, variables and methods (each defaults to an empty list).
        """
        props = [] if properties is None else properties
        variables_ = [] if variables is None else variables
        methods_ = [] if methods is None else methods
        return await self._create_custom_type(idx, name, basetype, props, variables_, methods_)
506
507
    async def _create_custom_type(self, idx, name, basetype, properties, variables, methods):
        """
        Add a type node below *basetype* via add_object_type() and populate it.

        :param properties: (name, variant type[, data type]) entries added
            with add_property()
        :param variables: (name, variant type[, data type]) entries added
            with add_variable()
        :param methods: entries whose first four items are passed to
            add_method()
        :return: the newly created type node
        """
        base_t = _get_node(self.iserver.isession, basetype)
        custom_t = await base_t.add_object_type(idx, name)
        for prop in properties:
            datatype = prop[2] if len(prop) > 2 else None
            default = ua.get_default_value(prop[1])
            await custom_t.add_property(idx, prop[0], default, varianttype=prop[1], datatype=datatype)
        for variable in variables:
            datatype = variable[2] if len(variable) > 2 else None
            default = ua.get_default_value(variable[1])
            await custom_t.add_variable(idx, variable[0], default, varianttype=variable[1], datatype=datatype)
        for method in methods:
            await custom_t.add_method(idx, method[0], method[1], method[2], method[3])
        return custom_t
525
526
    def import_xml(self, path=None, xmlstring=None) -> Coroutine:
        """
        Import nodes defined in xml, given as a file path or as a string.

        Returns what XmlImporter.import_xml() returns, without awaiting it.
        """
        return XmlImporter(self).import_xml(path, xmlstring)
532
533
    async def export_xml(self, nodes, path):
        """
        Export the given nodes to an xml file.

        :param nodes: nodes to build the xml tree from
        :param path: passed to the exporter's write_xml()
        """
        exporter = XmlExporter(self)
        await exporter.build_etree(nodes)
        await exporter.write_xml(path)
540
541
    async def export_xml_by_ns(self, path: str, namespaces: list = None):
        """
        Export nodes of one or more namespaces to an XML file.
        Namespaces used by nodes are always exported for consistency.

        :param path: name of the xml file to write
        :param namespaces: list of string uris or int indexes of the namespace
            to export, if not provided all ns are used except 0
        """
        selected = [] if namespaces is None else namespaces
        nodes = await get_nodes_of_namespace(self, selected)
        await self.export_xml(nodes, path)
552
553
    def delete_nodes(self, nodes, recursive=False) -> Coroutine:
        """
        Delete nodes through the server's internal session.

        Returns what the module-level delete_nodes() helper returns, without
        awaiting it.
        """
        session = self.iserver.isession
        return delete_nodes(session, nodes, recursive)
555
556
    async def historize_node_data_change(self, node, period=timedelta(days=7), count=0):
557
        """
558
        Start historizing supplied nodes; see history module
559
        :param node: node or list of nodes that can be historized (variables/properties)
560
        :param period: time delta to store the history; older data will be deleted from the storage
561
        :param count: number of changes to store in the history
562
        """
563
        nodes = node if isinstance(node, (list, tuple)) else [node]
564
        for n in nodes:
565
            await self.iserver.enable_history_data_change(n, period, count)
566
567
    async def dehistorize_node_data_change(self, node):
568
        """
569
        Stop historizing supplied nodes; see history module
570
        :param node: node or list of nodes that can be historized (UA variables/properties)
571
        """
572
        nodes = node if isinstance(node, (list, tuple)) else [node]
573
        for n in nodes:
574
            await self.iserver.disable_history_data_change(n)
575
576
    async def historize_node_event(self, node, period=timedelta(days=7), count: int = 0):
577
        """
578
        Start historizing events from node (typically a UA object); see history module
579
        :param node: node or list of nodes that can be historized (UA objects)
580
        :param period: time delta to store the history; older data will be deleted from the storage
581
        :param count: number of events to store in the history
582
        """
583
        nodes = node if isinstance(node, (list, tuple)) else [node]
584
        for n in nodes:
585
            await self.iserver.enable_history_event(n, period, count)
586
587
    async def dehistorize_node_event(self, node):
588
        """
589
        Stop historizing events from node (typically a UA object); see history module
590
        :param node: node or list of nodes that can be historized (UA objects)
591
        """
592
        nodes = node if isinstance(node, (list, tuple)) else [node]
593
        for n in nodes:
594
            await self.iserver.disable_history_event(n)
595
596
    def subscribe_server_callback(self, event, handle):
597
        self.iserver.subscribe_server_callback(event, handle)
598
599
    def unsubscribe_server_callback(self, event, handle):
600
        self.iserver.unsubscribe_server_callback(event, handle)
601
602
    def link_method(self, node, callback):
603
        """
604
        Link a python function to a UA method in the address space; required when a UA method has been imported
605
        to the address space via XML; the python executable must be linked manually
606
        :param node: UA method node
607
        :param callback: python function that the UA method will call
608
        """
609
        self.iserver.isession.add_method_callback(node.nodeid, callback)
610
611
    def load_type_definitions(self, nodes=None) -> Coroutine:
        """
        Load custom structures from our server.

        Server side this can be used to create python objects from custom
        structures imported through xml into server. The result of the
        module-level load_type_definitions() helper is returned without being
        awaited.
        """
        pending = load_type_definitions(self, nodes)
        return pending
618
619
    def load_enums(self) -> Coroutine:
        """
        Load UA structures and generate python Enums in ua module for custom
        enums in server. The result of the module-level load_enums() helper is
        returned without being awaited.
        """
        pending = load_enums(self)
        return pending
624
625
    def write_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
        """
        Directly write datavalue to the attribute, bypassing some checks and
        structure creation, so it is a little faster.

        Returns whatever the internal server's write_attribute_value() returns.
        """
        internal = self.iserver
        return internal.write_attribute_value(nodeid, datavalue, attr)
631