asyncua.server.server.Server.init()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 11
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 8
nop 2
dl 0
loc 11
rs 10
c 0
b 0
f 0
1
"""
2
High level interface to pure python OPC-UA server
3
"""
4
5
import asyncio
6
import logging
7
from datetime import timedelta, datetime
8
from urllib.parse import urlparse
9
from typing import Coroutine, Optional
10
11
from asyncua import ua
12
from .binary_server_asyncio import BinaryServer
13
from .internal_server import InternalServer
14
from .event_generator import EventGenerator
15
from ..client import Client
16
from ..common.node import Node
17
from ..common.subscription import Subscription
18
from ..common.xmlimporter import XmlImporter
19
from ..common.xmlexporter import XmlExporter
20
from ..common.manage_nodes import delete_nodes
21
from ..common.event_objects import BaseEvent
22
from ..common.shortcuts import Shortcuts
23
from ..common.structures import load_type_definitions, load_enums
24
from ..common.structures104 import load_data_type_definitions
25
from ..common.ua_utils import get_nodes_of_namespace
26
27
from ..crypto import security_policies, uacrypto
28
29
_logger = logging.getLogger(__name__)
30
31
32
def _get_node(isession, whatever):
    """
    Coerce ``whatever`` into a ``Node`` bound to ``isession``.

    A ``Node`` instance is returned unchanged, a ``ua.NodeId`` is wrapped
    directly and any other value is first passed to the ``ua.NodeId``
    constructor.
    """
    if isinstance(whatever, Node):
        return whatever
    nodeid = whatever if isinstance(whatever, ua.NodeId) else ua.NodeId(whatever)
    return Node(isession, nodeid)
38
39
40
class Server:
41
    """
42
    High level Server class
43
44
    This class creates an asyncua server with default values
45
46
    Create your own namespace and then populate your server address space
47
    using get_root_node() or get_objects_node() to get Node objects,
48
    and get_event_generator() to fire events.
49
    Then start server. See example_server.py
50
    All methods are threadsafe
51
52
    If you need more flexibility you call directly the Ua Service methods
53
    on the iserver  or iserver.isession object members.
54
55
    During startup the standard address space will be constructed, which may be
56
    time-consuming when running a server on a less powerful device (e.g. a
57
    Raspberry Pi). In order to improve startup performance, an optional path to a
58
    cache file can be passed to the server constructor.
59
    If the parameter is defined, the address space will be loaded from the
60
    cache file or the file will be created if it does not exist yet.
61
    As a result the first startup will be even slower due to the cache file
62
    generation but all further start ups will be significantly faster.
63
    ┌────────┐
64
    │ Server │ ── BinaryServer ── OPCUAProtocol ── UaProcessor
65
    │        │ ── InternalServer ── InternalSession
66
    └────────┘                   ── SubscriptionService
67
68
    :ivar product_uri:
69
    :ivar name:
70
    :ivar default_timeout: timeout in milliseconds for sessions and secure channel
71
    :ivar iserver: `InternalServer` instance
72
    :ivar bserver: binary protocol server `BinaryServer`
73
    :ivar nodes: shortcuts to common nodes - `Shortcuts` instance
74
    """
75
76
    def __init__(self, iserver: Optional[InternalServer] = None,
                 loop: Optional[asyncio.AbstractEventLoop] = None, user_manager=None):
        """
        Set up a server with default values; nothing is initialized or started here.

        :param iserver: existing `InternalServer` to use; a new one is created when omitted
        :param loop: event loop to use, defaults to `asyncio.get_event_loop()`
        :param user_manager: forwarded to the `InternalServer` constructor when one is created
        """
        self.loop: asyncio.AbstractEventLoop = loop or asyncio.get_event_loop()
        self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
        self._application_uri = "urn:freeopcua:python:server"
        self.product_uri = "urn:freeopcua.github.io:python:server"
        self.name: str = "FreeOpcUa Python Server"
        self.manufacturer_name = "FreeOpcUa"
        self.application_type = ua.ApplicationType.ClientAndServer
        self.default_timeout: int = 60 * 60 * 1000
        self.iserver = iserver if iserver else InternalServer(self.loop, user_manager=user_manager)
        self.bserver: Optional[BinaryServer] = None
        # discovery clients by discovery server url
        self._discovery_clients = {}
        # renewal period in seconds, shared by all discovery servers
        self._discovery_period = 60
        # handle of the scheduled registration renewal, if any
        self._discovery_handle = None
        self._policies = []
        self.nodes = Shortcuts(self.iserver.isession)
        # enable all endpoints by default
        self._security_policy = [
            ua.SecurityPolicyType.NoSecurity, ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
            ua.SecurityPolicyType.Basic256Sha256_Sign
        ]
        # allow all certificates by default
        self._permission_ruleset = None
        self._policyIDs = ["Anonymous", "Basic256Sha256", "Username"]
        self.certificate = None
102
103
    async def init(self, shelf_file=None):
        """
        Initialize the internal server and write the default server values
        (application uri, server array, service level and build info).

        :param shelf_file: passed unchanged to the internal server `init`
        """
        await self.iserver.init(shelf_file)
        # setup some expected values
        await self.set_application_uri(self._application_uri)

        server_array = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
        await server_array.write_value([self._application_uri])

        # TODO: ServiceLevel is 255 default, should be calculated in later Versions
        service_level = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServiceLevel))
        await service_level.write_value(ua.Variant(255, ua.VariantType.Byte))

        await self.set_build_info(
            self.product_uri, self.manufacturer_name, self.name, "1.0pre", "0", datetime.now())
114
115
    async def set_build_info(self, product_uri, manufacturer_name, product_name, software_version,
                             build_number, build_date):
        """
        Write the build information to the ServerStatus node, the BuildInfo
        node and each individual BuildInfo child node.

        If the ServerStatus node has no value yet, a new
        `ua.ServerStatusDataType` is created.
        """
        status_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus))
        build_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_BuildInfo))

        status = await status_node.read_value()
        if status is None:
            # first time
            status = ua.ServerStatusDataType()
            status.SecondsTillShutdown = 0

        info = status.BuildInfo
        info.ProductUri = product_uri
        info.ManufacturerName = manufacturer_name
        info.ProductName = product_name
        info.SoftwareVersion = software_version
        info.BuildNumber = build_number
        info.BuildDate = build_date

        await status_node.write_value(status)
        await build_node.write_value(status.BuildInfo)

        # we also need to update all individual nodes :/
        # (object id of the child node, name of the BuildInfo field to write), in write order
        child_fields = (
            (ua.ObjectIds.Server_ServerStatus_BuildInfo_ProductUri, "ProductUri"),
            (ua.ObjectIds.Server_ServerStatus_BuildInfo_ProductName, "ProductName"),
            (ua.ObjectIds.Server_ServerStatus_BuildInfo_ManufacturerName, "ManufacturerName"),
            (ua.ObjectIds.Server_ServerStatus_BuildInfo_SoftwareVersion, "SoftwareVersion"),
            (ua.ObjectIds.Server_ServerStatus_BuildInfo_BuildNumber, "BuildNumber"),
            (ua.ObjectIds.Server_ServerStatus_BuildInfo_BuildDate, "BuildDate"),
        )
        child_nodes = [(self.get_node(ua.NodeId(object_id)), field) for object_id, field in child_fields]
        for child_node, field in child_nodes:
            await child_node.write_value(getattr(status.BuildInfo, field))
152
153
    async def __aenter__(self):
154
        await self.start()
155
156
    async def __aexit__(self, exc_type, exc_value, traceback):
157
        await self.stop()
158
159
    def __str__(self):
160
        return f"OPC UA Server({self.endpoint.geturl()})"
161
    __repr__ = __str__
162
163
    async def load_certificate(self, path: str, format: str = None):
        """
        Load the server certificate from a file (pem or der) and keep it
        in `self.certificate`.

        :param path: path of the certificate file
        :param format: passed unchanged to `uacrypto.load_certificate`
        """
        certificate = await uacrypto.load_certificate(path, format)
        self.certificate = certificate
168
169
    async def load_private_key(self, path, password=None, format=None):
        """
        Load the server private key from a file and store it on the internal server.

        :param path: path of the key file
        :param password: passed unchanged to `uacrypto.load_private_key`
        :param format: passed unchanged to `uacrypto.load_private_key`
        """
        private_key = await uacrypto.load_private_key(path, password, format)
        self.iserver.private_key = private_key
171
172
    def disable_clock(self, val: bool = True):
173
        """
174
        for debugging you may want to disable clock that write every second
175
        to address space
176
        """
177
        self.iserver.disabled_clock = val
178
179
    def get_application_uri(self):
180
        return self._application_uri
181
182
    async def set_application_uri(self, uri: str):
        """
        Set application/server URI.

        This uri is supposed to be unique. If you intend to register
        your server to a discovery server, it really should be unique in
        your system!
        default is : "urn:freeopcua:python:server"

        The uri is also written to index 1 of the server namespace array
        (appended if the array has fewer than two entries).
        """
        self._application_uri = uri
        ns_array_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        namespaces = await ns_array_node.read_value()
        if len(namespaces) <= 1:
            namespaces.append(uri)
        else:
            # application uri is always namespace 1
            namespaces[1] = uri
        await ns_array_node.write_value(namespaces)
198
199
    async def find_servers(self, uris=None):
        """
        find_servers. mainly implemented for symmetry with client

        :param uris: server uris put in the request, an empty list when omitted
        :return: whatever the internal server `find_servers` returns
        """
        request = ua.FindServersParameters()
        request.EndpointUrl = self.endpoint.geturl()
        request.ServerUris = [] if uris is None else uris
        return self.iserver.find_servers(request)
209
210
    async def register_to_discovery(self, url: str = "opc.tcp://localhost:4840", period: int = 60):
        """
        Register to an OPC-UA Discovery server. Registering must be renewed at
        least every 10 minutes, so this method will use our asyncio thread to
        re-register every period seconds
        if period is 0 registration is not automatically renewed

        :param url: url of the discovery server; an existing client for the
            same url is disconnected and replaced
        :param period: renewal period in seconds, shared by all discovery servers
        """
        # FIXME: have a period per discovery
        if url in self._discovery_clients:
            await self._discovery_clients[url].disconnect()
        self._discovery_clients[url] = Client(url)
        await self._discovery_clients[url].connect()
        await self._discovery_clients[url].register_server(self)
        self._discovery_period = period
        # Cancel the renewal that is already scheduled: otherwise every call
        # starts one more renewal chain, and a chain left running with
        # period 0 would reschedule itself without any delay.
        if self._discovery_handle:
            self._discovery_handle.cancel()
            self._discovery_handle = None
        if period:
            # keep the handle so the renewal can be cancelled before it first runs
            self._discovery_handle = self.loop.call_soon(self._schedule_renew_registration)
226
227
    async def unregister_to_discovery(self, url: str = "opc.tcp://localhost:4840"):
228
        """
229
        stop registration thread
230
        """
231
        # FIXME: is there really no way to deregister?
232
        await self._discovery_clients[url].disconnect()
233
        del self._discovery_clients[url]
234
        if not self._discovery_clients and self._discovery_handle:
235
            self._discovery_handle.cancel()
236
237
    def _schedule_renew_registration(self):
238
        self.loop.create_task(self._renew_registration())
239
        self._discovery_handle = self.loop.call_later(self._discovery_period, self._schedule_renew_registration)
240
241
    async def _renew_registration(self):
242
        for client in self._discovery_clients.values():
243
            await client.register_server(self)
244
245
    def allow_remote_admin(self, allow):
246
        """
247
        Enable or disable the builtin Admin user from network clients
248
        """
249
        self.iserver.allow_remote_admin = allow
250
251
    def set_endpoint(self, url):
252
        self.endpoint = urlparse(url)
253
254
    async def get_endpoints(self) -> Coroutine:
255
        return await self.iserver.get_endpoints()
256
257
    def set_security_policy(self, security_policy, permission_ruleset=None):
258
        """
259
        Method setting up the security policies for connections
260
        to the server, where security_policy is a list of integers.
261
        During server initialization, all endpoints are enabled:
262
263
                security_policy = [
264
                            ua.SecurityPolicyType.NoSecurity,
265
                            ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
266
                            ua.SecurityPolicyType.Basic256Sha256_Sign
267
                                ]
268
269
        E.g. to limit the number of endpoints and disable no encryption:
270
271
                set_security_policy([
272
                            ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt])
273
274
        """
275
        self._security_policy = security_policy
276
        self._permission_ruleset = permission_ruleset
277
278
    def set_security_IDs(self, policy_ids):
279
        """
280
            Method setting up the security endpoints for identification
281
            of clients. During server object initialization, all possible
282
            endpoints are enabled:
283
284
            self._policyIDs = ["Anonymous", "Basic256Sha256", "Username"]
285
286
            E.g. to limit the number of IDs and disable anonymous clients:
287
288
                set_security_IDs(["Basic256Sha256"])
289
290
        (Implementation for ID check is currently not finalized...)
291
        """
292
        self._policyIDs = policy_ids
293
294
    async def _setup_server_nodes(self):
        """
        Create the endpoints and security policies selected with
        `set_security_policy`.

        To be called just before starting server since it needs all parameters
        to be setup. Encrypted endpoints are skipped, with a warning, when
        the certificate or the private key is missing.
        """
        # Always rebuild the list from scratch: it used to be reset only when
        # NoSecurity was enabled, so a second call kept the policies of the
        # previous one and appended to them.
        self._policies = []
        open_endpoint = ua.SecurityPolicyType.NoSecurity in self._security_policy
        if open_endpoint:
            self._set_endpoints()
            self._policies.append(ua.SecurityPolicyFactory(permission_ruleset=self._permission_ruleset))

        if self._security_policy == [ua.SecurityPolicyType.NoSecurity]:
            return

        if not (self.certificate and self.iserver.private_key):
            _logger.warning("Endpoints other than open requested but private key and certificate are not set.")
            return

        if open_endpoint:
            _logger.warning(
                "Creating an open endpoint to the server, although encrypted endpoints are enabled.")

        # (requested policy type, message security mode), in endpoint creation order
        encrypted_variants = (
            (ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt, ua.MessageSecurityMode.SignAndEncrypt),
            (ua.SecurityPolicyType.Basic256Sha256_Sign, ua.MessageSecurityMode.Sign),
        )
        for policy_type, mode in encrypted_variants:
            if policy_type not in self._security_policy:
                continue
            self._set_endpoints(security_policies.SecurityPolicyBasic256Sha256, mode)
            self._policies.append(
                ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256Sha256,
                                         mode, self.certificate, self.iserver.private_key,
                                         permission_ruleset=self._permission_ruleset))
323
324
    def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
        """
        Build one endpoint description for the given security policy and mode
        and add it to the internal server.

        :param policy: security policy class, its `URI` is published in the endpoint
        :param mode: message security mode of the endpoint
        """
        # (name looked up in self._policyIDs, published PolicyId, token type), in publication order
        token_specs = (
            ("Anonymous", "anonymous", ua.UserTokenType.Anonymous),
            ("Basic256Sha256", 'certificate_basic256sha256', ua.UserTokenType.Certificate),
            ("Username", "username", ua.UserTokenType.UserName),
        )
        idtokens = []
        for enabled_name, policy_id, token_type in token_specs:
            if enabled_name not in self._policyIDs:
                continue
            token = ua.UserTokenPolicy()
            token.PolicyId = policy_id
            token.TokenType = token_type
            idtokens.append(token)

        description = ua.ApplicationDescription()
        description.ApplicationName = ua.LocalizedText(self.name)
        description.ApplicationUri = self._application_uri
        description.ApplicationType = self.application_type
        description.ProductUri = self.product_uri
        description.DiscoveryUrls.append(self.endpoint.geturl())

        endpoint = ua.EndpointDescription()
        endpoint.EndpointUrl = self.endpoint.geturl()
        endpoint.Server = description
        if self.certificate:
            endpoint.ServerCertificate = uacrypto.der_from_x509(self.certificate)
        endpoint.SecurityMode = mode
        endpoint.SecurityPolicyUri = policy.URI
        endpoint.UserIdentityTokens = idtokens
        endpoint.TransportProfileUri = "http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary"
        endpoint.SecurityLevel = 0
        self.iserver.add_endpoint(endpoint)
362
363
    def set_server_name(self, name):
364
        self.name = name
365
366
    async def start(self):
        """
        Start to listen on network

        Sets up the endpoints, starts the internal server and then the binary
        server. If the binary server fails to start, the internal server is
        stopped again and the exception is re-raised.
        """
        await self._setup_server_nodes()
        await self.iserver.start()
        try:
            binary_server = BinaryServer(self.iserver, self.endpoint.hostname, self.endpoint.port)
            self.bserver = binary_server
            self.bserver.set_policies(self._policies)
            await self.bserver.start()
        except Exception as exp:
            _logger.exception("%s error starting server", self)
            await self.iserver.stop()
            raise exp
        # only reached when the binary server started without error
        _logger.debug("%s server started", self)
382
383
    async def stop(self):
        """
        Stop server

        Cancels the scheduled discovery renewal, disconnects all discovery
        clients, then stops the binary server (if one was created) and the
        internal server.
        """
        if self._discovery_handle:
            self._discovery_handle.cancel()
        if self._discovery_clients:
            # asyncio.wait() must not be given bare coroutines (deprecated since
            # Python 3.8, rejected since 3.11). gather(return_exceptions=True)
            # keeps the shutdown going when a disconnect fails, as wait() did.
            results = await asyncio.gather(
                *(client.disconnect() for client in self._discovery_clients.values()),
                return_exceptions=True)
            for result in results:
                if isinstance(result, Exception):
                    _logger.warning("%s error disconnecting discovery client: %r", self, result)
        if self.bserver is not None:
            # bserver is only created by start(); stop() can be called without it
            await self.bserver.stop()
        await self.iserver.stop()
        _logger.debug("%s Internal server stopped, everything closed", self)
394
395
    def get_root_node(self):
        """
        Return the Root folder of the server as a Node object.
        """
        root_id = ua.TwoByteNodeId(ua.ObjectIds.RootFolder)
        return self.get_node(root_id)
400
401
    def get_objects_node(self):
        """
        Return the Objects folder of the server as a Node object.
        """
        objects_id = ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder)
        return self.get_node(objects_id)
406
407
    def get_node(self, nodeid):
        """
        Return a Node object for `nodeid`, bound to the internal session of
        this server.

        :param nodeid: a NodeId object or a string representing a NodeId
        """
        session = self.iserver.isession
        return Node(session, nodeid)
412
413
    async def create_subscription(self, period, handler):
        """
        Create and initialize a subscription on the internal session.

        Returns a Subscription object which allows subscribing to events or
        data changes on the server.

        :param period: Period in milliseconds
        :param handler: A class instance - see `SubHandler` base class for details
        """
        request = ua.CreateSubscriptionParameters()
        request.RequestedPublishingInterval = period
        request.RequestedLifetimeCount = 3000
        request.RequestedMaxKeepAliveCount = 10000
        request.MaxNotificationsPerPublish = 0
        request.PublishingEnabled = True
        request.Priority = 0

        sub = Subscription(self.iserver.isession, request, handler)
        await sub.init()
        return sub
430
431
    async def get_namespace_array(self):
432
        """
433
        get all namespace defined in server
434
        """
435
        return await self.nodes.namespace_array.read_value()
436
437
    async def register_namespace(self, uri) -> int:
438
        """
439
        Register a new namespace. Nodes should in custom namespace, not 0.
440
        """
441
        uries = await self.nodes.namespace_array.read_value()
442
        if uri in uries:
443
            return uries.index(uri)
444
        uries.append(uri)
445
        await self.nodes.namespace_array.write_value(uries)
446
        return len(uries) - 1
447
448
    async def get_namespace_index(self, uri):
449
        """
450
        get index of a namespace using its uri
451
        """
452
        uries = await self.get_namespace_array()
453
        return uries.index(uri)
454
455
    async def get_event_generator(self, etype=None, emitting_node=ua.ObjectIds.Server):
        """
        Return an initialized event generator for an event type from the
        address space. Use this object to fire events.

        :param etype: event type, a `BaseEvent` is used when it is falsy
        :param emitting_node: node emitting the events, the Server object by default
        """
        event_type = etype if etype else BaseEvent()
        generator = EventGenerator(self.iserver.isession)
        await generator.init(event_type, emitting_node=emitting_node)
        return generator
465
466
    async def create_custom_data_type(self, idx, name, basetype=ua.ObjectIds.BaseDataType,
                                      properties=None, description=None) -> Coroutine:
        """
        Add a data type below `basetype` and add the given properties to it.

        :param idx: namespace index used for the type and its properties
        :param name: name of the new data type
        :param basetype: parent type (Node, NodeId or value accepted by `ua.NodeId`)
        :param properties: sequence of (name, variant type[, data type]) entries
        :param description: passed unchanged to `add_data_type`
        :return: node of the new data type
        """
        parent_type = _get_node(self.iserver.isession, basetype)
        new_type = await parent_type.add_data_type(idx, name, description)
        for entry in ([] if properties is None else properties):
            # the third element (data type) is optional
            datatype = entry[2] if len(entry) > 2 else None
            await new_type.add_property(idx, entry[0], ua.get_default_value(entry[1]),
                                        varianttype=entry[1], datatype=datatype)
        return new_type
480
481
    async def create_custom_event_type(self, idx, name,
                                       basetype=ua.ObjectIds.BaseEventType, properties=None) -> Coroutine:
        """
        Create a custom event type with the given properties.

        :param properties: sequence of (name, variant type[, data type]) entries,
            empty when omitted
        :return: node of the new type
        """
        props = [] if properties is None else properties
        return await self._create_custom_type(idx, name, basetype, props, [], [])
486
487
    async def create_custom_object_type(self,
                                        idx,
                                        name,
                                        basetype=ua.ObjectIds.BaseObjectType,
                                        properties=None,
                                        variables=None,
                                        methods=None) -> Coroutine:
        """
        Create a custom object type with the given properties, variables and methods.

        Each omitted list is replaced by an empty list.

        :return: node of the new type
        """
        props = [] if properties is None else properties
        vars_ = [] if variables is None else variables
        meths = [] if methods is None else methods
        return await self._create_custom_type(idx, name, basetype, props, vars_, meths)
501
502
    # def create_custom_reference_type(self, idx, name, basetype=ua.ObjectIds.BaseReferenceType, properties=[]):
503
    # return self._create_custom_type(idx, name, basetype, properties)
504
505
    async def create_custom_variable_type(self,
                                          idx,
                                          name,
                                          basetype=ua.ObjectIds.BaseVariableType,
                                          properties=None,
                                          variables=None,
                                          methods=None) -> Coroutine:
        """
        Create a custom variable type with the given properties, variables and methods.

        Each omitted list is replaced by an empty list.

        :return: node of the new type
        """
        # NOTE(review): the shared helper creates the type with add_object_type();
        # confirm this is intended for a variable type.
        props = [] if properties is None else properties
        vars_ = [] if variables is None else variables
        meths = [] if methods is None else methods
        return await self._create_custom_type(idx, name, basetype, props, vars_, meths)
519
520
    async def _create_custom_type(self, idx, name, basetype, properties, variables, methods):
        """
        Add an object type below `basetype` and populate it.

        :param properties: sequence of (name, variant type[, data type]) entries
        :param variables: sequence of (name, variant type[, data type]) entries
        :param methods: sequence of 4-element entries passed to `add_method` after `idx`
        :return: node of the new type
        """
        parent_type = _get_node(self.iserver.isession, basetype)
        new_type = await parent_type.add_object_type(idx, name)

        for entry in properties:
            # the third element (data type) is optional
            datatype = entry[2] if len(entry) > 2 else None
            await new_type.add_property(
                idx, entry[0], ua.get_default_value(entry[1]), varianttype=entry[1], datatype=datatype)

        for entry in variables:
            datatype = entry[2] if len(entry) > 2 else None
            await new_type.add_variable(
                idx, entry[0], ua.get_default_value(entry[1]), varianttype=entry[1], datatype=datatype)

        for entry in methods:
            await new_type.add_method(idx, entry[0], entry[1], entry[2], entry[3])

        return new_type
538
539
    async def import_xml(self, path=None, xmlstring=None) -> Coroutine:
        """
        Import nodes defined in xml.

        :param path: passed unchanged to `XmlImporter.import_xml`
        :param xmlstring: passed unchanged to `XmlImporter.import_xml`
        :return: result of `XmlImporter.import_xml`
        """
        xml_importer = XmlImporter(self)
        result = await xml_importer.import_xml(path, xmlstring)
        return result
545
546
    async def export_xml(self, nodes, path):
        """
        Export the given nodes to an xml file.

        :param nodes: nodes to export
        :param path: name of the xml file to write
        """
        exporter = XmlExporter(self)
        await exporter.build_etree(nodes)
        await exporter.write_xml(path)
553
554
    async def export_xml_by_ns(self, path: str, namespaces: list = None):
        """
        Export nodes of one or more namespaces to an XML file.
        Namespaces used by nodes are always exported for consistency.

        :param path: name of the xml file to write
        :param namespaces: list of string uris or int indexes of the namespaces to export,
         if not provided all namespaces are used except 0
        """
        selected = [] if namespaces is None else namespaces
        nodes = await get_nodes_of_namespace(self, selected)
        await self.export_xml(nodes, path)
566
567
    async def delete_nodes(self, nodes, recursive=False) -> Coroutine:
        """
        Delete nodes through the internal session.

        :param nodes: nodes to delete
        :param recursive: passed unchanged to the module-level `delete_nodes`
        :return: result of the module-level `delete_nodes`
        """
        session = self.iserver.isession
        return await delete_nodes(session, nodes, recursive)
569
570
    async def historize_node_data_change(self, node, period=timedelta(days=7), count=0):
571
        """
572
        Start historizing supplied nodes; see history module
573
        :param node: node or list of nodes that can be historized (variables/properties)
574
        :param period: time delta to store the history; older data will be deleted from the storage
575
        :param count: number of changes to store in the history
576
        """
577
        nodes = node if isinstance(node, (list, tuple)) else [node]
578
        for n in nodes:
579
            await self.iserver.enable_history_data_change(n, period, count)
580
581
    async def dehistorize_node_data_change(self, node):
582
        """
583
        Stop historizing supplied nodes; see history module
584
        :param node: node or list of nodes that can be historized (UA variables/properties)
585
        """
586
        nodes = node if isinstance(node, (list, tuple)) else [node]
587
        for n in nodes:
588
            await self.iserver.disable_history_data_change(n)
589
590
    async def historize_node_event(self, node, period=timedelta(days=7), count: int = 0):
591
        """
592
        Start historizing events from node (typically a UA object); see history module
593
        :param node: node or list of nodes that can be historized (UA objects)
594
        :param period: time delta to store the history; older data will be deleted from the storage
595
        :param count: number of events to store in the history
596
        """
597
        nodes = node if isinstance(node, (list, tuple)) else [node]
598
        for n in nodes:
599
            await self.iserver.enable_history_event(n, period, count)
600
601
    async def dehistorize_node_event(self, node):
602
        """
603
        Stop historizing events from node (typically a UA object); see history module
604
        :param node: node or list of nodes that can be historized (UA objects)
605
        """
606
        nodes = node if isinstance(node, (list, tuple)) else [node]
607
        for n in nodes:
608
            await self.iserver.disable_history_event(n)
609
610
    def subscribe_server_callback(self, event, handle):
611
        self.iserver.subscribe_server_callback(event, handle)
612
613
    def unsubscribe_server_callback(self, event, handle):
614
        self.iserver.unsubscribe_server_callback(event, handle)
615
616
    def link_method(self, node, callback):
617
        """
618
        Link a python function to a UA method in the address space; required when a UA method has been imported
619
        to the address space via XML; the python executable must be linked manually
620
        :param node: UA method node
621
        :param callback: python function that the UA method will call
622
        """
623
        self.iserver.isession.add_method_callback(node.nodeid, callback)
624
625
    async def load_type_definitions(self, nodes=None) -> Coroutine:
        """
        Load custom structures from our server.

        Server side this can be used to create python objects from custom structures
        imported through xml into server.
        Logs a deprecation warning on every call.
        """
        _logger.warning("Deprecated since spec 1.04, call load_data_type_definitions")
        definitions = await load_type_definitions(self, nodes)
        return definitions
633
634
    async def load_data_type_definitions(self, node=None):
        """
        Load custom types (custom structures/extension objects) definition from server

        Generate Python classes for custom structures/extension objects defined in server
        These classes will be available in ua module

        :param node: passed unchanged to the module-level `load_data_type_definitions`
        """
        definitions = await load_data_type_definitions(self, node)
        return definitions
641
642
    async def load_enums(self) -> Coroutine:
        """
        Load UA structures and generate python Enums in ua module for custom enums in server.

        Logs a deprecation warning on every call.
        """
        _logger.warning("Deprecated since spec 1.04, call load_data_type_definitions")
        enums = await load_enums(self)
        return enums
648
649
    async def write_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
        """
        Directly write datavalue to the attribute, bypassing some checks and structure creation
        so it is a little faster.

        :param nodeid: id of the node to write to
        :param datavalue: value to write
        :param attr: attribute to write, the Value attribute by default
        :return: result of the internal server `write_attribute_value`
        """
        result = await self.iserver.write_attribute_value(nodeid, datavalue, attr)
        return result
655