Passed
Push — master ( 8b73a4...c8d307 )
by Olivier
04:21 queued 01:50
created

asyncua.client.client.Client._set_security()   A

Complexity

Conditions 2

Size

Total Lines 13
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 12
nop 6
dl 0
loc 13
rs 9.8
c 0
b 0
f 0
1
import asyncio
2
import logging
3
from typing import Union, Coroutine, Optional
4
from urllib.parse import urlparse
5
6
from asyncua import ua
7
from .ua_client import UaClient
8
from ..common.xmlimporter import XmlImporter
9
from ..common.xmlexporter import XmlExporter
10
from ..common.node import Node
11
from ..common.manage_nodes import delete_nodes
12
from ..common.subscription import Subscription
13
from ..common.shortcuts import Shortcuts
14
from ..common.structures import load_type_definitions, load_enums
15
from ..common.structures104 import load_data_type_definitions
16
from ..common.utils import create_nonce
17
from ..common.ua_utils import value_to_datavalue
18
from ..crypto import uacrypto, security_policies
19
20
_logger = logging.getLogger(__name__)
21
22
23
class Client:
24
    """
25
    High level client to connect to an OPC-UA server.
26
27
    This class makes it easy to connect and browse address space.
28
    It attempts to expose as much functionality as possible
29
    but if you want more flexibility it is possible and advised to
30
    use UaClient object, available as self.uaclient
31
    which offers the raw OPC-UA services interface.
32
    """
33
    def __init__(self, url: str, timeout: int = 4, loop=None):
        """
        Build a client for the OPC-UA server located at ``url``.

        :param url: url of the server.
            If you are unsure of the url, give at least hostname
            and port and call get_endpoints.

        :param timeout:
            Passed to the underlying UaClient. Each request sent to the
            server expects an answer within this time, in seconds.

        :param loop:
            Event loop to use; ``asyncio.get_event_loop()`` is used when omitted.

        Other client parameters can be changed by setting attributes on the
        constructed object; the assignments below are the exhaustive list.
        """
        one_hour_ms = 3600000

        self.loop = loop or asyncio.get_event_loop()
        parsed_url = urlparse(url)
        self.server_url = parsed_url
        # credentials embedded in the url are the initial ones
        self._username, self._password = parsed_url.username, parsed_url.password

        # application identity announced to the server
        self.name = "Pure Python Async. Client"
        self.description = self.name
        self.application_uri = "urn:freeopcua:client"
        self.product_uri = "urn:freeopcua.github.io:client"

        # secure channel / session settings
        self.security_policy = ua.SecurityPolicy()
        self.secure_channel_id = None
        self.secure_channel_timeout = one_hour_ms
        self.session_timeout = one_hour_ms
        self._policy_ids = []

        self.uaclient: UaClient = UaClient(timeout, loop=self.loop)

        # user authentication material and session bookkeeping
        self.user_certificate = None
        self.user_private_key = None
        self._server_nonce = None
        self._session_counter = 1

        self.nodes = Shortcuts(self.uaclient)

        # 0 means "no limits" for both values
        self.max_messagesize = 0
        self.max_chunkcount = 0

        self._renew_channel_task = None
70
71
    async def __aenter__(self):
72
        await self.connect()
73
        return self
74
75
    async def __aexit__(self, exc_type, exc_value, traceback):
76
        await self.disconnect()
77
78
    def __str__(self):
79
        return f"Client({self.server_url.geturl()})"
80
81
    __repr__ = __str__
82
83
    @staticmethod
    def find_endpoint(endpoints, security_mode, policy_uri):
        """
        Return the first endpoint whose url starts with the opc.tcp scheme and
        whose security mode and policy URI equal the requested ones.

        Raises ua.UaError when no endpoint matches.
        """
        _logger.info("find_endpoint %r %r %r", endpoints, security_mode, policy_uri)
        candidates = (
            endpoint
            for endpoint in endpoints
            if endpoint.EndpointUrl.startswith(ua.OPC_TCP_SCHEME)
            and endpoint.SecurityMode == security_mode
            and endpoint.SecurityPolicyUri == policy_uri
        )
        match = next(candidates, None)
        if match is None:
            raise ua.UaError(f"No matching endpoints: {security_mode}, {policy_uri}")
        return match
94
95
    def set_user(self, username: str):
96
        """
97
        Set user name for the connection.
98
        initial user from the URL will be overwritten
99
        """
100
        self._username = username
101
102
    def set_password(self, pwd: str):
103
        """
104
        Set user password for the connection.
105
        initial password from the URL will be overwritten
106
        """
107
        if not isinstance(pwd, str):
108
            raise TypeError(f"Password must be a string, got {pwd} of type {type(pwd)}")
109
        self._password = pwd
110
111
    async def set_security_string(self, string: str):
        """
        Set SecureConnection mode.

        :param string: Mode format ``Policy,Mode,certificate,private_key[,server_certificate]``

        where:

        - ``Policy`` is ``Basic128Rsa15``, ``Basic256`` or ``Basic256Sha256``
        - ``Mode`` is ``Sign`` or ``SignAndEncrypt``
        - ``certificate`` and ``server_certificate`` are paths to ``.pem`` or ``.der`` files
        - ``private_key`` may be a path to a ``.pem`` or ``.der`` file or a conjunction of ``path``::``password`` where
          ``password`` is the private key password. Only the first ``::`` separates the two,
          so the password itself may contain ``::``.

        An empty string is ignored. Values after the fifth one are not used.

        :raises ua.UaError: if fewer than 4 comma-separated values are given

        Call this before connect()
        """
        if not string:
            return
        parts = string.split(",")
        if len(parts) < 4:
            raise ua.UaError(f"Wrong format: `{string}`, expected at least 4 comma-separated values")

        # Split on the first '::' only: a plain split() would yield more than two
        # items (and fail to unpack) whenever the password contains '::' itself.
        key_path, separator, key_password = parts[3].partition("::")
        client_key_password = key_password if separator else None

        policy_class = getattr(security_policies, f"SecurityPolicy{parts[0]}")
        mode = getattr(ua.MessageSecurityMode, parts[1])
        server_certificate = parts[4] if len(parts) >= 5 else None
        return await self.set_security(policy_class, parts[2], key_path, client_key_password,
                                       server_certificate, mode)
142
143
    async def set_security(self,
                           policy,
                           certificate: Union[str, uacrypto.CertProperties],
                           private_key: Union[str, uacrypto.CertProperties],
                           private_key_password: str = None,
                           server_certificate: Optional[Union[str, uacrypto.CertProperties]] = None,
                           mode: ua.MessageSecurityMode = ua.MessageSecurityMode.SignAndEncrypt):
        """
        Set SecureConnection mode.

        Arguments that are not already ``uacrypto.CertProperties`` are wrapped
        into one. When no server certificate is given, the server is asked for
        its endpoints and the certificate of the endpoint matching ``mode`` and
        ``policy.URI`` is used.

        Call this before connect()
        """
        props_type = uacrypto.CertProperties
        if server_certificate is None:
            # load certificate from server's list of endpoints
            server_endpoints = await self.connect_and_get_server_endpoints()
            matching = Client.find_endpoint(server_endpoints, mode, policy.URI)
            server_certificate = uacrypto.x509_from_der(matching.ServerCertificate)
        elif not isinstance(server_certificate, props_type):
            server_certificate = props_type(server_certificate)

        certificate = certificate if isinstance(certificate, props_type) else props_type(certificate)
        if not isinstance(private_key, props_type):
            private_key = props_type(private_key, password=private_key_password)

        return await self._set_security(policy, certificate, private_key, server_certificate, mode)
166
167
    async def _set_security(self,
                            policy,
                            certificate: uacrypto.CertProperties,
                            private_key: uacrypto.CertProperties,
                            server_cert: uacrypto.CertProperties,
                            mode: ua.MessageSecurityMode = ua.MessageSecurityMode.SignAndEncrypt):
        """
        Load the certificate and key files, instantiate ``policy`` with them and
        install the resulting security policy on this client and its UaClient.

        ``server_cert`` is only loaded from disk when it is a CertProperties;
        any other object is handed to the policy unchanged.
        """
        if isinstance(server_cert, uacrypto.CertProperties):
            server_cert = await uacrypto.load_certificate(server_cert.path, server_cert.extension)
        client_cert = await uacrypto.load_certificate(certificate.path, certificate.extension)
        client_key = await uacrypto.load_private_key(
            private_key.path, private_key.password, private_key.extension
        )
        new_policy = policy(server_cert, client_cert, client_key, mode)
        self.security_policy = new_policy
        self.uaclient.set_security(self.security_policy)
180
181
    async def load_client_certificate(self, path: str, extension: str = None):
        """
        Load our user certificate from file, either pem or der,
        and keep it as ``self.user_certificate``.
        """
        loaded = await uacrypto.load_certificate(path, extension)
        self.user_certificate = loaded
186
187
    async def load_private_key(self, path: str, password: str = None, extension: str = None):
        """
        Load the user private key and keep it as ``self.user_private_key``.
        This is used for authenticating using certificate.
        """
        loaded = await uacrypto.load_private_key(path, password, extension)
        self.user_private_key = loaded
192
193
    async def connect_and_get_server_endpoints(self):
194
        """
195
        Connect, ask server for endpoints, and disconnect
196
        """
197
        await self.connect_socket()
198
        try:
199
            await self.send_hello()
200
            await self.open_secure_channel()
201
            endpoints = await self.get_endpoints()
202
            await self.close_secure_channel()
203
        finally:
204
            self.disconnect_socket()
205
        return endpoints
206
207
    async def connect_and_find_servers(self):
208
        """
209
        Connect, ask server for a list of known servers, and disconnect
210
        """
211
        await self.connect_socket()
212
        try:
213
            await self.send_hello()
214
            await self.open_secure_channel()  # spec says it should not be necessary to open channel
215
            servers = await self.find_servers()
216
            await self.close_secure_channel()
217
        finally:
218
            self.disconnect_socket()
219
        return servers
220
221
    async def connect_and_find_servers_on_network(self):
222
        """
223
        Connect, ask server for a list of known servers on network, and disconnect
224
        """
225
        await self.connect_socket()
226
        try:
227
            await self.send_hello()
228
            await self.open_secure_channel()
229
            servers = await self.find_servers_on_network()
230
            await self.close_secure_channel()
231
        finally:
232
            self.disconnect_socket()
233
        return servers
234
235
    async def connect(self):
        """
        High level method
        Connect, create and activate session.

        The session is activated with the user name, password and user
        certificate currently stored on the client. If any step after the
        socket is opened fails, the socket is closed again (and a channel
        renew task started by create_session() is cancelled) before the
        exception is re-raised, so a failed connect() leaves nothing open.
        """
        _logger.info("connect")
        await self.connect_socket()
        try:
            await self.send_hello()
            await self.open_secure_channel()
            await self.create_session()
            await self.activate_session(
                username=self._username, password=self._password, certificate=self.user_certificate
            )
        except Exception:
            # A renew task only exists once create_session() succeeded; without
            # this it would keep running against a socket we are about to close.
            renew_task = self._renew_channel_task
            if renew_task is not None:
                renew_task.cancel()
            # clean up open socket
            self.disconnect_socket()
            raise
251
252
    async def disconnect(self):
        """
        High level method
        Close the session, then the secure channel, and finally the socket.
        The socket is closed even when closing the session or channel fails.
        """
        _logger.info("disconnect")
        try:
            for close_step in (self.close_session, self.close_secure_channel):
                await close_step()
        finally:
            self.disconnect_socket()
263
264
    async def connect_socket(self):
265
        """
266
        connect to socket defined in url
267
        """
268
        await self.uaclient.connect_socket(self.server_url.hostname, self.server_url.port)
269
270
    def disconnect_socket(self):
271
        self.uaclient.disconnect_socket()
272
273
    async def send_hello(self):
        """
        Send OPC-UA hello to server.

        If the UaClient hands back a ua.UaStatusCodeError instead of an
        acknowledge, that error is raised.
        """
        url = self.server_url.geturl()
        answer = await self.uaclient.send_hello(url, self.max_messagesize, self.max_chunkcount)
        if not isinstance(answer, ua.UaStatusCodeError):
            return
        raise answer
280
281
    async def open_secure_channel(self, renew=False):
        """
        Open secure channel, if renew is True, renew channel.

        When the server revises the requested lifetime, the revised value
        replaces ``self.secure_channel_timeout``.
        """
        token_request = ua.SecurityTokenRequestType
        request = ua.OpenSecureChannelParameters()
        request.ClientProtocolVersion = 0
        request.RequestType = token_request.Renew if renew else token_request.Issue
        request.SecurityMode = self.security_policy.Mode
        request.RequestedLifetime = self.secure_channel_timeout
        # length should be equal to the length of key of symmetric encryption
        request.ClientNonce = create_nonce(self.security_policy.symmetric_key_size)
        answer = await self.uaclient.open_secure_channel(request)
        if self.secure_channel_timeout == answer.SecurityToken.RevisedLifetime:
            return
        _logger.info("Requested secure channel timeout to be %dms, got %dms instead",
                     self.secure_channel_timeout, answer.SecurityToken.RevisedLifetime)
        self.secure_channel_timeout = answer.SecurityToken.RevisedLifetime
299
300
    async def close_secure_channel(self):
301
        return await self.uaclient.close_secure_channel()
302
303
    async def get_endpoints(self) -> list:
        """Get a list of OPC-UA endpoints, querying with the client's server url."""
        request = ua.GetEndpointsParameters()
        request.EndpointUrl = self.server_url.geturl()
        endpoints = await self.uaclient.get_endpoints(request)
        return endpoints
309
310
    async def register_server(self, server, discovery_configuration=None):
        """
        Register a server to a discovery server.
        If discovery_configuration is provided, the newer register_server2
        service call is used, otherwise the plain register_server call.
        """
        registration = ua.RegisteredServer()
        registration.ServerUri = server.get_application_uri()
        registration.ProductUri = server.product_uri
        registration.DiscoveryUrls = [server.endpoint.geturl()]
        registration.ServerType = server.application_type
        registration.ServerNames = [ua.LocalizedText(server.name)]
        registration.IsOnline = True
        if not discovery_configuration:
            return await self.uaclient.register_server(registration)
        request = ua.RegisterServer2Parameters()
        request.Server = registration
        request.DiscoveryConfiguration = discovery_configuration
        return await self.uaclient.register_server2(request)
328
329
    async def find_servers(self, uris=None):
        """
        Send a FindServer request to the server. The answer should be a list of
        servers the server knows about.
        A list of uris can be provided, only servers having matching uris will be returned.
        """
        request = ua.FindServersParameters()
        request.EndpointUrl = self.server_url.geturl()
        request.ServerUris = [] if uris is None else uris
        return await self.uaclient.find_servers(request)
341
342
    async def find_servers_on_network(self):
        """Send a FindServersOnNetwork request with default parameters and return the answer."""
        request = ua.FindServersOnNetworkParameters()
        return await self.uaclient.find_servers_on_network(request)
345
346
    async def create_session(self):
        """
        Send a CreateSessionRequest to the server with reasonable parameters.
        If you want to modify settings, look at the code of this method
        and make your own.

        Besides sending the request this method:

        - hands the server signature to the security policy for verification,
        - stores the server nonce and the user token policies of the matching
          endpoint for use in activate_session(),
        - adopts the session timeout revised by the server,
        - starts the background task that renews the secure channel.

        :return: the CreateSession response received from the server
        :raises ua.UaError: if the server certificate differs from the one
            already known to the security policy, or no endpoint in the
            response matches the current security mode and policy
        """
        desc = ua.ApplicationDescription()
        desc.ApplicationUri = self.application_uri
        desc.ProductUri = self.product_uri
        desc.ApplicationName = ua.LocalizedText(self.name)
        desc.ApplicationType = ua.ApplicationType.Client
        params = ua.CreateSessionParameters()
        # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
        nonce = create_nonce(32)
        params.ClientNonce = nonce
        params.ClientCertificate = self.security_policy.host_certificate
        params.ClientDescription = desc
        params.EndpointUrl = self.server_url.geturl()
        params.SessionName = f"{self.description} Session{self._session_counter}"
        # Requested maximum number of milliseconds that a Session should remain open without activity
        params.RequestedSessionTimeout = self.session_timeout
        params.MaxResponseMessageSize = 0  # means no max size
        response = await self.uaclient.create_session(params)
        # the signed data is our certificate (when we have one) followed by our nonce
        if self.security_policy.host_certificate is None:
            data = nonce
        else:
            data = self.security_policy.host_certificate + nonce
        self.security_policy.asymmetric_cryptography.verify(data, response.ServerSignature.Signature)
        self._server_nonce = response.ServerNonce
        if not self.security_policy.peer_certificate:
            self.security_policy.peer_certificate = response.ServerCertificate
        elif self.security_policy.peer_certificate != response.ServerCertificate:
            raise ua.UaError("Server certificate mismatch")
        # remember PolicyId's: we will use them in activate_session()
        ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
        self._policy_ids = ep.UserIdentityTokens
        #  Actual maximum number of milliseconds that a Session shall remain open without activity
        if self.session_timeout != response.RevisedSessionTimeout:
            # report the session timeout we asked for, not the secure channel timeout
            _logger.warning("Requested session timeout to be %dms, got %dms instead",
                            self.session_timeout, response.RevisedSessionTimeout)
            self.session_timeout = response.RevisedSessionTimeout
        self._renew_channel_task = self.loop.create_task(self._renew_channel_loop())
        return response
389
390
    async def _renew_channel_loop(self):
        """
        Renew the SecureChannel before the SessionTimeout will happen.
        In theory we could do that only if no session activity
        but it does not cost much..

        Runs until cancelled. Cancellation ends the loop silently; any other
        error is logged and re-raised, so it is stored in the task.
        """
        try:
            # 0.7 is from spec. The division by 1000 is because the timeouts are in
            # milliseconds while asyncio.sleep expects seconds.
            duration = min(self.session_timeout, self.secure_channel_timeout) * 0.7 / 1000
            while True:
                await asyncio.sleep(duration)
                _logger.debug("renewing channel")
                await self.open_secure_channel(renew=True)
                val = await self.nodes.server_state.read_value()
                _logger.debug("server state is: %s ", val)
        except asyncio.CancelledError:
            pass
        except Exception:
            # Exception instead of a bare except: KeyboardInterrupt/SystemExit
            # are not renewal errors and should propagate untouched.
            _logger.exception("Error while renewing session")
            raise
410
411
    def server_policy_id(self, token_type, default):
412
        """
413
        Find PolicyId of server's UserTokenPolicy by token_type.
414
        Return default if there's no matching UserTokenPolicy.
415
        """
416
        for policy in self._policy_ids:
417
            if policy.TokenType == token_type:
418
                return policy.PolicyId
419
        return default
420
421
    def server_policy_uri(self, token_type):
422
        """
423
        Find SecurityPolicyUri of server's UserTokenPolicy by token_type.
424
        If SecurityPolicyUri is empty, use default SecurityPolicyUri
425
        of the endpoint
426
        """
427
        for policy in self._policy_ids:
428
            if policy.TokenType == token_type:
429
                if policy.SecurityPolicyUri:
430
                    return policy.SecurityPolicyUri
431
                # empty URI means "use this endpoint's policy URI"
432
                return self.security_policy.URI
433
        return self.security_policy.URI
434
435
    async def activate_session(self, username: str = None, password: str = None, certificate=None):
        """
        Activate session using either username and password or private_key.

        A certificate takes precedence over a user name; with neither,
        an anonymous identity token is sent.
        """
        request = ua.ActivateSessionParameters()

        # data to sign: server certificate followed by the last server nonce
        challenge = b""
        for piece in (self.security_policy.peer_certificate, self._server_nonce):
            if piece is not None:
                challenge += piece

        signature_uri = self.security_policy.AsymmetricSignatureURI
        if not signature_uri:
            signature_uri = security_policies.SecurityPolicyBasic256.AsymmetricSignatureURI
        request.ClientSignature.Algorithm = signature_uri
        request.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
        request.LocaleIds.append("en")

        if certificate:
            self._add_certificate_auth(request, certificate, challenge)
        elif username:
            self._add_user_auth(request, username, password)
        else:
            self._add_anonymous_auth(request)
        return await self.uaclient.activate_session(request)
460
461
    def _add_anonymous_auth(self, params):
        """Put an anonymous identity token, with the server's anonymous PolicyId, into params."""
        token = ua.AnonymousIdentityToken()
        params.UserIdentityToken = token
        token.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, "anonymous")
464
465
    def _add_certificate_auth(self, params, certificate, challenge):
        """
        Put an X509 identity token for ``certificate`` into params and sign
        ``challenge`` with the user private key.
        """
        token = ua.X509IdentityToken()
        params.UserIdentityToken = token
        token.CertificateData = uacrypto.der_from_x509(certificate)
        # specs part 4, 5.6.3.1: the data to sign is created by appending
        # the last serverNonce to the serverCertificate
        token_signature = ua.SignatureData()
        params.UserTokenSignature = token_signature
        # use signature algorithm that was used for certificate generation
        uses_sha256 = certificate.signature_hash_algorithm.name == "sha256"
        if uses_sha256:
            fallback_policy_id = "certificate_basic256sha256"
            sign = uacrypto.sign_sha256
            algorithm_uri = "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256"
        else:
            fallback_policy_id = "certificate_basic256"
            sign = uacrypto.sign_sha1
            algorithm_uri = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        token.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, fallback_policy_id)
        signed = sign(self.user_private_key, challenge)
        token_signature.Algorithm = algorithm_uri
        token_signature.Signature = signed
482
483
    def _add_user_auth(self, params, username: str, password: str):
        """
        Put a user name identity token into params.

        The password that is sent is the ``password`` argument; it is sent
        as UTF-8 plain text when the user token policy is empty or "None",
        and encrypted for the server otherwise. Without a password only the
        user name is set.
        """
        params.UserIdentityToken = ua.UserNameIdentityToken()
        params.UserIdentityToken.UserName = username
        policy_uri = self.server_policy_uri(ua.UserTokenType.UserName)
        # Decide on the password actually passed in, not on self._password:
        # the two differ when activate_session() is called with explicit credentials.
        if not policy_uri or policy_uri == security_policies.POLICY_NONE_URI:
            # see specs part 4, 7.36.3: if the token is NOT encrypted,
            # then the password only contains UTF-8 encoded password
            # and EncryptionAlgorithm is null
            if password:
                _logger.warning("Sending plain-text password")
                params.UserIdentityToken.Password = password.encode("utf8")
            params.UserIdentityToken.EncryptionAlgorithm = None
        elif password:
            data, uri = self._encrypt_password(password, policy_uri)
            params.UserIdentityToken.Password = data
            params.UserIdentityToken.EncryptionAlgorithm = uri
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, "username_basic256")
500
501
    def _encrypt_password(self, password: str, policy_uri):
        """
        Encrypt ``password`` with the public key of the server certificate.

        Returns the ``(data, uri)`` pair produced by
        ``security_policies.encrypt_asymmetric`` for ``policy_uri``.
        """
        server_cert = uacrypto.x509_from_der(self.security_policy.peer_certificate)
        public_key = server_cert.public_key()
        # see specs part 4, 7.36.3: if the token is encrypted, password
        # shall be converted to UTF-8 and serialized with server nonce
        secret = password.encode("utf8")
        nonce = self._server_nonce
        if nonce is not None:
            secret += nonce
        packed = ua.ua_binary.Primitives.Bytes.pack(secret)
        return security_policies.encrypt_asymmetric(public_key, packed, policy_uri)
511
512
    async def close_session(self) -> Coroutine:
513
        """
514
        Close session
515
        """
516
        self._renew_channel_task.cancel()
517
        await self._renew_channel_task
518
        return await self.uaclient.close_session(True)
519
520
    def get_root_node(self):
        """Return the Node for the RootFolder object id."""
        root_id = ua.TwoByteNodeId(ua.ObjectIds.RootFolder)
        return self.get_node(root_id)
522
523
    def get_objects_node(self):
        """Return the Node for the ObjectsFolder object id."""
        _logger.info("get_objects_node")
        objects_id = ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder)
        return self.get_node(objects_id)
526
527
    def get_server_node(self):
        """Return the Node for the Server object id."""
        server_id = ua.FourByteNodeId(ua.ObjectIds.Server)
        return self.get_node(server_id)
529
530
    def get_node(self, nodeid: Union[ua.NodeId, str]) -> Node:
        """
        Build a Node bound to this client's UaClient from a NodeId object
        or a string representing a NodeId.
        """
        node = Node(self.uaclient, nodeid)
        return node
535
536
    async def create_subscription(self, period, handler):
        """
        Create a subscription.
        Returns a Subscription object which allows to subscribe to events or data changes on server.

        :param period: Either a publishing interval in milliseconds or a `CreateSubscriptionParameters` instance.
            The second option should be used, if the asyncua-server has problems with the default options.

        :param handler: Class instance with data_change and/or event methods (see `SubHandler`
            base class for details). Remember not to block the main event loop inside the handler methods.
        """
        sub_params = period
        if not isinstance(sub_params, ua.CreateSubscriptionParameters):
            # a bare interval was given: build parameters with the default options
            sub_params = ua.CreateSubscriptionParameters()
            sub_params.RequestedPublishingInterval = period
            sub_params.RequestedLifetimeCount = 10000
            sub_params.RequestedMaxKeepAliveCount = 3000
            sub_params.MaxNotificationsPerPublish = 10000
            sub_params.PublishingEnabled = True
            sub_params.Priority = 0
        new_subscription = Subscription(self.uaclient, sub_params, handler)
        await new_subscription.init()
        return new_subscription
560
561
    async def get_namespace_array(self):
        """Read and return the value of the server's NamespaceArray node."""
        array_id = ua.NodeId(ua.ObjectIds.Server_NamespaceArray)
        array_node = self.get_node(array_id)
        return await array_node.read_value()
564
565
    async def get_namespace_index(self, uri):
        """Return the position of ``uri`` in the server's namespace array."""
        namespaces = await self.get_namespace_array()
        _logger.info("get_namespace_index %s %r", type(namespaces), namespaces)
        position = namespaces.index(uri)
        return position
569
570
    async def delete_nodes(self, nodes, recursive=False) -> Coroutine:
        """Delete ``nodes`` through the module-level delete_nodes helper and return its result."""
        outcome = await delete_nodes(self.uaclient, nodes, recursive)
        return outcome
572
573
    async def import_xml(self, path=None, xmlstring=None) -> Coroutine:
        """
        Import nodes defined in xml, given either as a file path or as a string.
        Returns whatever the XmlImporter returns.
        """
        xml_importer = XmlImporter(self)
        imported = await xml_importer.import_xml(path, xmlstring)
        return imported
579
580
    async def export_xml(self, nodes, path):
        """
        Export the given nodes to an xml file at ``path``.
        """
        xml_exporter = XmlExporter(self)
        await xml_exporter.build_etree(nodes)
        await xml_exporter.write_xml(path)
587
588
    async def register_namespace(self, uri):
        """
        Register a new namespace. Nodes should be in a custom namespace, not 0.
        This method is mainly implemented for symmetry with server.

        Returns the index of ``uri`` in the namespace array; the array is only
        written back to the server when the uri was not present yet.
        """
        array_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        namespaces = await array_node.read_value()
        try:
            return namespaces.index(uri)
        except ValueError:
            # not registered yet: append it below
            pass
        namespaces.append(uri)
        await array_node.write_value(namespaces)
        return len(namespaces) - 1
600
601
    async def load_type_definitions(self, nodes=None):
        """
        Load custom types (custom structures/extension objects) definition from server.
        Generate Python classes for custom structures/extension objects defined in server.
        These classes will be available in ua module.
        WARNING: protocol has changed in 1.04. use load_data_type_definitions()
        """
        _logger.warning("Deprecated since spec 1.04, call load_data_type_definitions")
        loaded = await load_type_definitions(self, nodes)
        return loaded
610
611
    async def load_data_type_definitions(self, node=None):
        """
        Load custom types (custom structures/extension objects) definition from server.
        Generate Python classes for custom structures/extension objects defined in server.
        These classes will be available in ua module.
        """
        loaded = await load_data_type_definitions(self, node)
        return loaded
618
619
    async def load_enums(self):
        """
        Generate Python enums for custom enums on server.
        These enums will be available in ua module.
        """
        _logger.warning("Deprecated since spec 1.04, call load_data_type_definitions")
        loaded = await load_enums(self)
        return loaded
626
627
    async def register_nodes(self, nodes):
628
        """
629
        Register nodes for faster read and write access (if supported by server)
630
        Rmw: This call modifies the nodeid of the nodes, the original nodeid is
631
        available as node.basenodeid
632
        """
633
        nodeids = [node.nodeid for node in nodes]
634
        nodeids = await self.uaclient.register_nodes(nodeids)
635
        for node, nodeid in zip(nodes, nodeids):
636
            node.basenodeid = node.nodeid
637
            node.nodeid = nodeid
638
        return nodes
639
640
    async def unregister_nodes(self, nodes):
641
        """
642
        Unregister nodes
643
        """
644
        nodeids = [node.nodeid for node in nodes]
645
        await self.uaclient.unregister_nodes(nodeids)
646
        for node in nodes:
647
            if not node.basenodeid:
648
                continue
649
            node.nodeid = node.basenodeid
650
            node.basenodeid = None
651
652
    async def read_values(self, nodes):
        """
        Read the Value attribute of multiple nodes in one ua call and
        return the plain values in the same order.
        """
        wanted_ids = [item.nodeid for item in nodes]
        data_values = await self.uaclient.read_attributes(wanted_ids, ua.AttributeIds.Value)
        return [data_value.Value.Value for data_value in data_values]
659
660
    async def write_values(self, nodes, values):
        """
        Write values to multiple nodes in one ua call, then call
        ``check()`` on every returned result.
        """
        target_ids = [item.nodeid for item in nodes]
        data_values = [value_to_datavalue(value) for value in values]
        statuses = await self.uaclient.write_attributes(target_ids, data_values, ua.AttributeIds.Value)
        for status in statuses:
            status.check()

    # legacy compatibility aliases
    get_values = read_values
    set_values = write_values
672