Passed
Pull Request — master (#368)
by Olivier
02:42
created

asyncua.client.client.Client.create_session()   B

Complexity

Conditions 5

Size

Total Lines 42
Code Lines 32

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 32
nop 1
dl 0
loc 42
rs 8.6453
c 0
b 0
f 0
1
import asyncio
2
import logging
3
from typing import Union, Coroutine, Optional
4
from urllib.parse import urlparse
5
6
from asyncua import ua
7
from .ua_client import UaClient
8
from ..common.xmlimporter import XmlImporter
9
from ..common.xmlexporter import XmlExporter
10
from ..common.node import Node
11
from ..common.manage_nodes import delete_nodes
12
from ..common.subscription import Subscription
13
from ..common.shortcuts import Shortcuts
14
from ..common.structures import load_type_definitions, load_enums
15
from ..common.structures104 import load_data_type_definitions
16
from ..common.utils import create_nonce
17
from ..common.ua_utils import value_to_datavalue
18
from ..crypto import uacrypto, security_policies
19
20
_logger = logging.getLogger(__name__)
21
22
23
class Client:
24
    """
25
    High level client to connect to an OPC-UA server.
26
27
    This class makes it easy to connect and browse address space.
28
    It attempts to expose as much functionality as possible
29
    but if you want more flexibility it is possible and advised to
30
    use UaClient object, available as self.uaclient
31
    which offers the raw OPC-UA services interface.
32
    """
33
    def __init__(self, url: str, timeout: int = 4, loop=None):
34
        """
35
        :param url: url of the server.
36
            if you are unsure of url, write at least hostname
37
            and port and call get_endpoints
38
39
        :param timeout:
40
            Each request sent to the server expects an answer within this
41
            time. The timeout is specified in seconds.
42
43
        Some other client parameters can be changed by setting
44
        attributes on the constructed object:
45
        See the source code for the exhaustive list.
46
        """
47
        self.loop = loop or asyncio.get_event_loop()
48
        self.server_url = urlparse(url)
49
        # take initial username and password from the url
50
        self._username = self.server_url.username
51
        self._password = self.server_url.password
52
        self.name = "Pure Python Async. Client"
53
        self.description = self.name
54
        self.application_uri = "urn:freeopcua:client"
55
        self.product_uri = "urn:freeopcua.github.io:client"
56
        self.security_policy = ua.SecurityPolicy()
57
        self.secure_channel_id = None
58
        self.secure_channel_timeout = 3600000  # 1 hour
59
        self.session_timeout = 3600000  # 1 hour
60
        self._policy_ids = []
61
        self.uaclient: UaClient = UaClient(timeout, loop=self.loop)
62
        self.user_certificate = None
63
        self.user_private_key = None
64
        self._server_nonce = None
65
        self._session_counter = 1
66
        self.nodes = Shortcuts(self.uaclient)
67
        self.max_messagesize = 0  # No limits
68
        self.max_chunkcount = 0  # No limits
69
        self._renew_channel_task = None
70
71
    async def __aenter__(self):
72
        await self.connect()
73
        return self
74
75
    async def __aexit__(self, exc_type, exc_value, traceback):
76
        await self.disconnect()
77
78
    def __str__(self):
79
        return f"Client({self.server_url.geturl()})"
80
81
    __repr__ = __str__
82
83
    @staticmethod
84
    def find_endpoint(endpoints, security_mode, policy_uri):
85
        """
86
        Find endpoint with required security mode and policy URI
87
        """
88
        _logger.info("find_endpoint %r %r %r", endpoints, security_mode, policy_uri)
89
        for ep in endpoints:
90
            if (ep.EndpointUrl.startswith(ua.OPC_TCP_SCHEME) and ep.SecurityMode == security_mode and ep.SecurityPolicyUri == policy_uri):
91
                return ep
92
        raise ua.UaError(f"No matching endpoints: {security_mode}, {policy_uri}")
93
94
    def set_user(self, username: str):
95
        """
96
        Set user name for the connection.
97
        initial user from the URL will be overwritten
98
        """
99
        self._username = username
100
101
    def set_password(self, pwd: str):
102
        """
103
        Set user password for the connection.
104
        initial password from the URL will be overwritten
105
        """
106
        if not isinstance(pwd, str):
107
            raise TypeError(f"Password must be a string, got {pwd} of type {type(pwd)}")
108
        self._password = pwd
109
110
    async def set_security_string(self, string: str):
111
        """
112
        Set SecureConnection mode.
113
114
        :param string: Mode format ``Policy,Mode,certificate,private_key[,server_private_key]``
115
116
        where:
117
118
        - ``Policy`` is ``Basic128Rsa15``, ``Basic256`` or ``Basic256Sha256``
119
        - ``Mode`` is ``Sign`` or ``SignAndEncrypt``
120
        - ``certificate`` and ``server_private_key`` are paths to ``.pem`` or ``.der`` files
121
        - ``private_key`` may be a path to a ``.pem`` or ``.der`` file or a conjunction of ``path``::``password`` where
122
          ``password`` is the private key password.
123
124
        Call this before connect()
125
        """
126
        if not string:
127
            return
128
        parts = string.split(",")
129
        if len(parts) < 4:
130
            raise ua.UaError(f"Wrong format: `{string}`, expected at least 4 comma-separated values")
131
132
        if '::' in parts[3]:  # if the filename contains a colon, assume it's a conjunction and parse it
133
            parts[3], client_key_password = parts[3].split('::')
134
        else:
135
            client_key_password = None
136
137
        policy_class = getattr(security_policies, f"SecurityPolicy{parts[0]}")
138
        mode = getattr(ua.MessageSecurityMode, parts[1])
139
        return await self.set_security(policy_class, parts[2], parts[3], client_key_password, parts[4] if len(parts) >= 5 else None, mode)
140
141
    async def set_security(
142
        self,
143
        policy: ua.SecurityPolicy,
144
        certificate: Union[str, uacrypto.CertProperties],
145
        private_key: Union[str, uacrypto.CertProperties],
146
        private_key_password: Optional[Union[str, bytes]] = None,
147
        server_certificate: Optional[Union[str, uacrypto.CertProperties]] = None,
148
        mode: ua.MessageSecurityMode = ua.MessageSecurityMode.SignAndEncrypt,
149
    ):
150
        """
151
        Set SecureConnection mode.
152
        Call this before connect()
153
        """
154
        if server_certificate is None:
155
            # load certificate from server's list of endpoints
156
            endpoints = await self.connect_and_get_server_endpoints()
157
            endpoint = Client.find_endpoint(endpoints, mode, policy.URI)
158
            server_certificate = uacrypto.x509_from_der(endpoint.ServerCertificate)
159
        elif not isinstance(server_certificate, uacrypto.CertProperties):
160
            server_certificate = uacrypto.CertProperties(server_certificate)
161
        if not isinstance(certificate, uacrypto.CertProperties):
162
            certificate = uacrypto.CertProperties(certificate)
163
        if not isinstance(private_key, uacrypto.CertProperties):
164
            private_key = uacrypto.CertProperties(private_key, password=private_key_password)
165
        return await self._set_security(policy, certificate, private_key, server_certificate, mode)
166
167
    async def _set_security(
168
        self,
169
        policy: ua.SecurityPolicy,
170
        certificate: uacrypto.CertProperties,
171
        private_key: uacrypto.CertProperties,
172
        server_cert: uacrypto.CertProperties,
173
        mode: ua.MessageSecurityMode = ua.MessageSecurityMode.SignAndEncrypt,
174
    ):
175
176
        if isinstance(server_cert, uacrypto.CertProperties):
177
            server_cert = await uacrypto.load_certificate(server_cert.path, server_cert.extension)
178
        cert = await uacrypto.load_certificate(certificate.path, certificate.extension)
179
        pk = await uacrypto.load_private_key(
180
            private_key.path,
181
            private_key.password,
182
            private_key.extension,
183
        )
184
        self.security_policy = policy(server_cert, cert, pk, mode)
185
        self.uaclient.set_security(self.security_policy)
186
187
    async def load_client_certificate(self, path: str, extension: Optional[str] = None):
188
        """
189
        load our certificate from file, either pem or der
190
        """
191
        self.user_certificate = await uacrypto.load_certificate(path, extension)
192
193
    async def load_private_key(self, path: str, password: Optional[Union[str, bytes]] = None, extension: Optional[str] = None):
194
        """
195
        Load user private key. This is used for authenticating using certificate
196
        """
197
        self.user_private_key = await uacrypto.load_private_key(path, password, extension)
198
199
    async def connect_and_get_server_endpoints(self):
200
        """
201
        Connect, ask server for endpoints, and disconnect
202
        """
203
        await self.connect_socket()
204
        try:
205
            await self.send_hello()
206
            await self.open_secure_channel()
207
            endpoints = await self.get_endpoints()
208
            await self.close_secure_channel()
209
        finally:
210
            self.disconnect_socket()
211
        return endpoints
212
213
    async def connect_and_find_servers(self):
214
        """
215
        Connect, ask server for a list of known servers, and disconnect
216
        """
217
        await self.connect_socket()
218
        try:
219
            await self.send_hello()
220
            await self.open_secure_channel()  # spec says it should not be necessary to open channel
221
            servers = await self.find_servers()
222
            await self.close_secure_channel()
223
        finally:
224
            self.disconnect_socket()
225
        return servers
226
227
    async def connect_and_find_servers_on_network(self):
228
        """
229
        Connect, ask server for a list of known servers on network, and disconnect
230
        """
231
        await self.connect_socket()
232
        try:
233
            await self.send_hello()
234
            await self.open_secure_channel()
235
            servers = await self.find_servers_on_network()
236
            await self.close_secure_channel()
237
        finally:
238
            self.disconnect_socket()
239
        return servers
240
241
    async def connect(self):
242
        """
243
        High level method
244
        Connect, create and activate session
245
        """
246
        _logger.info("connect")
247
        await self.connect_socket()
248
        try:
249
            await self.send_hello()
250
            await self.open_secure_channel()
251
            await self.create_session()
252
        except Exception:
253
            # clean up open socket
254
            self.disconnect_socket()
255
            raise
256
        await self.activate_session(username=self._username, password=self._password, certificate=self.user_certificate)
257
258
    async def disconnect(self):
259
        """
260
        High level method
261
        Close session, secure channel and socket
262
        """
263
        _logger.info("disconnect")
264
        try:
265
            await self.close_session()
266
            await self.close_secure_channel()
267
        finally:
268
            self.disconnect_socket()
269
270
    async def connect_socket(self):
271
        """
272
        connect to socket defined in url
273
        """
274
        await self.uaclient.connect_socket(self.server_url.hostname, self.server_url.port)
275
276
    def disconnect_socket(self):
277
        self.uaclient.disconnect_socket()
278
279
    async def send_hello(self):
280
        """
281
        Send OPC-UA hello to server
282
        """
283
        ack = await self.uaclient.send_hello(self.server_url.geturl(), self.max_messagesize, self.max_chunkcount)
284
        if isinstance(ack, ua.UaStatusCodeError):
285
            raise ack
286
287
    async def open_secure_channel(self, renew=False):
288
        """
289
        Open secure channel, if renew is True, renew channel
290
        """
291
        params = ua.OpenSecureChannelParameters()
292
        params.ClientProtocolVersion = 0
293
        params.RequestType = ua.SecurityTokenRequestType.Issue
294
        if renew:
295
            params.RequestType = ua.SecurityTokenRequestType.Renew
296
        params.SecurityMode = self.security_policy.Mode
297
        params.RequestedLifetime = self.secure_channel_timeout
298
        # length should be equal to the length of key of symmetric encryption
299
        params.ClientNonce = create_nonce(self.security_policy.symmetric_key_size)
300
        result = await self.uaclient.open_secure_channel(params)
301
        if self.secure_channel_timeout != result.SecurityToken.RevisedLifetime:
302
            _logger.info("Requested secure channel timeout to be %dms, got %dms instead", self.secure_channel_timeout, result.SecurityToken.RevisedLifetime)
303
            self.secure_channel_timeout = result.SecurityToken.RevisedLifetime
304
305
    async def close_secure_channel(self):
306
        return await self.uaclient.close_secure_channel()
307
308
    async def get_endpoints(self) -> list:
309
        """Get a list of OPC-UA endpoints."""
310
311
        params = ua.GetEndpointsParameters()
312
        params.EndpointUrl = self.server_url.geturl()
313
        return await self.uaclient.get_endpoints(params)
314
315
    async def register_server(self, server, discovery_configuration=None):
316
        """
317
        register a server to discovery server
318
        if discovery_configuration is provided, the newer register_server2 service call is used
319
        """
320
        serv = ua.RegisteredServer()
321
        serv.ServerUri = server.get_application_uri()
322
        serv.ProductUri = server.product_uri
323
        serv.DiscoveryUrls = [server.endpoint.geturl()]
324
        serv.ServerType = server.application_type
325
        serv.ServerNames = [ua.LocalizedText(server.name)]
326
        serv.IsOnline = True
327
        if discovery_configuration:
328
            params = ua.RegisterServer2Parameters()
329
            params.Server = serv
330
            params.DiscoveryConfiguration = discovery_configuration
331
            return await self.uaclient.register_server2(params)
332
        return await self.uaclient.register_server(serv)
333
334
    async def find_servers(self, uris=None):
335
        """
336
        send a FindServer request to the server. The answer should be a list of
337
        servers the server knows about
338
        A list of uris can be provided, only server having matching uris will be returned
339
        """
340
        if uris is None:
341
            uris = []
342
        params = ua.FindServersParameters()
343
        params.EndpointUrl = self.server_url.geturl()
344
        params.ServerUris = uris
345
        return await self.uaclient.find_servers(params)
346
347
    async def find_servers_on_network(self):
348
        params = ua.FindServersOnNetworkParameters()
349
        return await self.uaclient.find_servers_on_network(params)
350
351
    async def create_session(self):
352
        """
353
        send a CreateSessionRequest to server with reasonable parameters.
354
        If you want o modify settings look at code of this methods
355
        and make your own
356
        """
357
        desc = ua.ApplicationDescription()
358
        desc.ApplicationUri = self.application_uri
359
        desc.ProductUri = self.product_uri
360
        desc.ApplicationName = ua.LocalizedText(self.name)
361
        desc.ApplicationType = ua.ApplicationType.Client
362
        params = ua.CreateSessionParameters()
363
        # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
364
        nonce = create_nonce(32)
365
        params.ClientNonce = nonce
366
        params.ClientCertificate = self.security_policy.host_certificate
367
        params.ClientDescription = desc
368
        params.EndpointUrl = self.server_url.geturl()
369
        params.SessionName = f"{self.description} Session{self._session_counter}"
370
        # Requested maximum number of milliseconds that a Session should remain open without activity
371
        params.RequestedSessionTimeout = self.session_timeout
372
        params.MaxResponseMessageSize = 0  # means no max size
373
        response = await self.uaclient.create_session(params)
374
        if self.security_policy.host_certificate is None:
375
            data = nonce
376
        else:
377
            data = self.security_policy.host_certificate + nonce
378
        self.security_policy.asymmetric_cryptography.verify(data, response.ServerSignature.Signature)
379
        self._server_nonce = response.ServerNonce
380
        if not self.security_policy.peer_certificate:
381
            self.security_policy.peer_certificate = response.ServerCertificate
382
        elif self.security_policy.peer_certificate != response.ServerCertificate:
383
            raise ua.UaError("Server certificate mismatch")
384
        # remember PolicyId's: we will use them in activate_session()
385
        ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
386
        self._policy_ids = ep.UserIdentityTokens
387
        #  Actual maximum number of milliseconds that a Session shall remain open without activity
388
        if self.session_timeout != response.RevisedSessionTimeout:
389
            _logger.warning("Requested session timeout to be %dms, got %dms instead", self.secure_channel_timeout, response.RevisedSessionTimeout)
390
            self.session_timeout = response.RevisedSessionTimeout
391
        self._renew_channel_task = self.loop.create_task(self._renew_channel_loop())
392
        return response
393
394
    async def _renew_channel_loop(self):
395
        """
396
        Renew the SecureChannel before the SessionTimeout will happen.
397
        In theory we could do that only if no session activity
398
        but it does not cost much..
399
        """
400
        try:
401
            duration = min(self.session_timeout, self.secure_channel_timeout) * 0.7 / 1000
402
            while True:
403
                # 0.7 is from spec. 0.001 is because asyncio.sleep expects time in seconds
404
                await asyncio.sleep(duration)
405
                _logger.debug("renewing channel")
406
                await self.open_secure_channel(renew=True)
407
                val = await self.nodes.server_state.read_value()
408
                _logger.debug("server state is: %s ", val)
409
        except asyncio.CancelledError:
410
            pass
411
        except Exception:
412
            _logger.exception("Error while renewing session")
413
            raise
414
415
    def server_policy_id(self, token_type, default):
416
        """
417
        Find PolicyId of server's UserTokenPolicy by token_type.
418
        Return default if there's no matching UserTokenPolicy.
419
        """
420
        for policy in self._policy_ids:
421
            if policy.TokenType == token_type:
422
                return policy.PolicyId
423
        return default
424
425
    def server_policy_uri(self, token_type):
426
        """
427
        Find SecurityPolicyUri of server's UserTokenPolicy by token_type.
428
        If SecurityPolicyUri is empty, use default SecurityPolicyUri
429
        of the endpoint
430
        """
431
        for policy in self._policy_ids:
432
            if policy.TokenType == token_type:
433
                if policy.SecurityPolicyUri:
434
                    return policy.SecurityPolicyUri
435
                # empty URI means "use this endpoint's policy URI"
436
                return self.security_policy.URI
437
        return self.security_policy.URI
438
439
    async def activate_session(self, username: str = None, password: str = None, certificate=None):
440
        """
441
        Activate session using either username and password or private_key
442
        """
443
        params = ua.ActivateSessionParameters()
444
        challenge = b""
445
        if self.security_policy.peer_certificate is not None:
446
            challenge += self.security_policy.peer_certificate
447
        if self._server_nonce is not None:
448
            challenge += self._server_nonce
449
        if self.security_policy.AsymmetricSignatureURI:
450
            params.ClientSignature.Algorithm = self.security_policy.AsymmetricSignatureURI
451
        else:
452
            params.ClientSignature.Algorithm = (security_policies.SecurityPolicyBasic256.AsymmetricSignatureURI)
453
        params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
454
        params.LocaleIds.append("en")
455
        if not username and not certificate:
456
            self._add_anonymous_auth(params)
457
        elif certificate:
458
            self._add_certificate_auth(params, certificate, challenge)
459
        else:
460
            self._add_user_auth(params, username, password)
461
        return await self.uaclient.activate_session(params)
462
463
    def _add_anonymous_auth(self, params):
464
        params.UserIdentityToken = ua.AnonymousIdentityToken()
465
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, "anonymous")
466
467
    def _add_certificate_auth(self, params, certificate, challenge):
468
        params.UserIdentityToken = ua.X509IdentityToken()
469
        params.UserIdentityToken.CertificateData = uacrypto.der_from_x509(certificate)
470
        # specs part 4, 5.6.3.1: the data to sign is created by appending
471
        # the last serverNonce to the serverCertificate
472
        params.UserTokenSignature = ua.SignatureData()
473
        # use signature algorithm that was used for certificate generation
474
        if certificate.signature_hash_algorithm.name == "sha256":
475
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, "certificate_basic256sha256")
476
            sig = uacrypto.sign_sha256(self.user_private_key, challenge)
477
            params.UserTokenSignature.Algorithm = "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256"
478
            params.UserTokenSignature.Signature = sig
479
        else:
480
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, "certificate_basic256")
481
            sig = uacrypto.sign_sha1(self.user_private_key, challenge)
482
            params.UserTokenSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
483
            params.UserTokenSignature.Signature = sig
484
485
    def _add_user_auth(self, params, username: str, password: str):
486
        params.UserIdentityToken = ua.UserNameIdentityToken()
487
        params.UserIdentityToken.UserName = username
488
        policy_uri = self.server_policy_uri(ua.UserTokenType.UserName)
489
        if not policy_uri or policy_uri == security_policies.POLICY_NONE_URI:
490
            # see specs part 4, 7.36.3: if the token is NOT encrypted,
491
            # then the password only contains UTF-8 encoded password
492
            # and EncryptionAlgorithm is null
493
            if self._password:
494
                _logger.warning("Sending plain-text password")
495
                params.UserIdentityToken.Password = password.encode("utf8")
496
            params.UserIdentityToken.EncryptionAlgorithm = None
497
        elif self._password:
498
            data, uri = self._encrypt_password(password, policy_uri)
499
            params.UserIdentityToken.Password = data
500
            params.UserIdentityToken.EncryptionAlgorithm = uri
501
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, "username_basic256")
502
503
    def _encrypt_password(self, password: str, policy_uri):
504
        pubkey = uacrypto.x509_from_der(self.security_policy.peer_certificate).public_key()
505
        # see specs part 4, 7.36.3: if the token is encrypted, password
506
        # shall be converted to UTF-8 and serialized with server nonce
507
        passwd = password.encode("utf8")
508
        if self._server_nonce is not None:
509
            passwd += self._server_nonce
510
        etoken = ua.ua_binary.Primitives.Bytes.pack(passwd)
511
        data, uri = security_policies.encrypt_asymmetric(pubkey, etoken, policy_uri)
512
        return data, uri
513
514
    async def close_session(self) -> Coroutine:
515
        """
516
        Close session
517
        """
518
        self._renew_channel_task.cancel()
519
        try:
520
            await self._renew_channel_task
521
        except Exception:
522
            _logger.exception("Error while closing secure channel loop")
523
        return await self.uaclient.close_session(True)
524
525
    def get_root_node(self):
526
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
527
528
    def get_objects_node(self):
529
        _logger.info("get_objects_node")
530
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))
531
532
    def get_server_node(self):
533
        return self.get_node(ua.FourByteNodeId(ua.ObjectIds.Server))
534
535
    def get_node(self, nodeid: Union[ua.NodeId, str]) -> Node:
536
        """
537
        Get node using NodeId object or a string representing a NodeId.
538
        """
539
        return Node(self.uaclient, nodeid)
540
541
    async def create_subscription(self, period, handler, publishing=True):
542
        """
543
        Create a subscription.
544
        Returns a Subscription object which allows to subscribe to events or data changes on server.
545
546
        :param period: Either a publishing interval in milliseconds or a `CreateSubscriptionParameters` instance.
547
            The second option should be used, if the asyncua-server has problems with the default options.
548
549
        :param handler: Class instance with data_change and/or event methods (see `SubHandler`
550
            base class for details). Remember not to block the main event loop inside the handler methods.
551
        """
552
        if isinstance(period, ua.CreateSubscriptionParameters):
553
            params = period
554
        else:
555
            params = ua.CreateSubscriptionParameters()
556
            params.RequestedPublishingInterval = period
557
            params.RequestedLifetimeCount = 10000
558
            params.RequestedMaxKeepAliveCount = 3000
559
            params.MaxNotificationsPerPublish = 10000
560
            params.PublishingEnabled = publishing
561
            params.Priority = 0
562
        subscription = Subscription(self.uaclient, params, handler)
563
        await subscription.init()
564
        return subscription
565
566
    async def get_namespace_array(self):
567
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
568
        return await ns_node.read_value()
569
570
    async def get_namespace_index(self, uri):
571
        uries = await self.get_namespace_array()
572
        _logger.info("get_namespace_index %s %r", type(uries), uries)
573
        return uries.index(uri)
574
575
    async def delete_nodes(self, nodes, recursive=False) -> Coroutine:
576
        return await delete_nodes(self.uaclient, nodes, recursive)
577
578
    async def import_xml(self, path=None, xmlstring=None) -> Coroutine:
579
        """
580
        Import nodes defined in xml
581
        """
582
        importer = XmlImporter(self)
583
        return await importer.import_xml(path, xmlstring)
584
585
    async def export_xml(self, nodes, path):
586
        """
587
        Export defined nodes to xml
588
        """
589
        exp = XmlExporter(self)
590
        await exp.build_etree(nodes)
591
        await exp.write_xml(path)
592
593
    async def register_namespace(self, uri):
594
        """
595
        Register a new namespace. Nodes should in custom namespace, not 0.
596
        This method is mainly implemented for symetry with server
597
        """
598
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
599
        uries = await ns_node.read_value()
600
        if uri in uries:
601
            return uries.index(uri)
602
        uries.append(uri)
603
        await ns_node.write_value(uries)
604
        return len(uries) - 1
605
606
    async def load_type_definitions(self, nodes=None):
607
        """
608
        Load custom types (custom structures/extension objects) definition from server
609
        Generate Python classes for custom structures/extension objects defined in server
610
        These classes will available in ua module
611
        WARNING: protocol has changed in 1.04. use load_data_type_definitions()
612
        """
613
        _logger.warning("Deprecated since spec 1.04, call load_data_type_definitions")
614
        return await load_type_definitions(self, nodes)
615
616
    async def load_data_type_definitions(self, node=None):
617
        """
618
        Load custom types (custom structures/extension objects) definition from server
619
        Generate Python classes for custom structures/extension objects defined in server
620
        These classes will be available in ua module
621
        """
622
        return await load_data_type_definitions(self, node)
623
624
    async def load_enums(self):
625
        """
626
        generate Python enums for custom enums on server.
627
        This enums will be available in ua module
628
        """
629
        _logger.warning("Deprecated since spec 1.04, call load_data_type_definitions")
630
        return await load_enums(self)
631
632
    async def register_nodes(self, nodes):
633
        """
634
        Register nodes for faster read and write access (if supported by server)
635
        Rmw: This call modifies the nodeid of the nodes, the original nodeid is
636
        available as node.basenodeid
637
        """
638
        nodeids = [node.nodeid for node in nodes]
639
        nodeids = await self.uaclient.register_nodes(nodeids)
640
        for node, nodeid in zip(nodes, nodeids):
641
            node.basenodeid = node.nodeid
642
            node.nodeid = nodeid
643
        return nodes
644
645
    async def unregister_nodes(self, nodes):
646
        """
647
        Unregister nodes
648
        """
649
        nodeids = [node.nodeid for node in nodes]
650
        await self.uaclient.unregister_nodes(nodeids)
651
        for node in nodes:
652
            if not node.basenodeid:
653
                continue
654
            node.nodeid = node.basenodeid
655
            node.basenodeid = None
656
657
    async def read_values(self, nodes):
658
        """
659
        Read the value of multiple nodes in one ua call.
660
        """
661
        nodeids = [node.nodeid for node in nodes]
662
        results = await self.uaclient.read_attributes(nodeids, ua.AttributeIds.Value)
663
        return [result.Value.Value for result in results]
664
665
    async def write_values(self, nodes, values):
666
        """
667
        Write values to multiple nodes in one ua call
668
        """
669
        nodeids = [node.nodeid for node in nodes]
670
        dvs = [value_to_datavalue(val) for val in values]
671
        results = await self.uaclient.write_attributes(nodeids, dvs, ua.AttributeIds.Value)
672
        for result in results:
673
            result.check()
674
675
    get_values = read_values  # legacy compatibility
676
    set_values = write_values  # legacy compatibility
677