Passed
Pull Request — master (#282)
by
unknown
02:27
created

asyncua.client.client   F

Complexity

Total Complexity 104

Size/Duplication

Total Lines 671
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 409
dl 0
loc 671
rs 2
c 0
b 0
f 0
wmc 104

54 Methods

Rating   Name   Duplication   Size   Complexity  
A Client.__init__() 0 37 1
A Client.__aenter__() 0 3 1
A Client.__aexit__() 0 2 1
A Client.__str__() 0 2 1
A Client.find_endpoint() 0 11 5
A Client.set_password() 0 8 2
A Client.set_user() 0 6 1
A Client.set_security_string() 0 31 5
A Client.register_server() 0 18 2
A Client.write_values() 0 9 2
A Client._add_user_auth() 0 17 5
A Client.connect_socket() 0 5 1
A Client.disconnect_socket() 0 2 1
A Client.register_nodes() 0 12 2
A Client._renew_channel_loop() 0 20 4
A Client.get_server_node() 0 2 1
A Client.connect_and_find_servers() 0 13 1
A Client.unregister_nodes() 0 11 3
A Client.delete_nodes() 0 2 1
A Client.import_xml() 0 6 1
A Client.disconnect() 0 11 1
A Client.find_servers() 0 12 2
A Client.load_private_key() 0 5 1
A Client.export_xml() 0 7 1
A Client.set_security() 0 23 5
A Client.get_root_node() 0 2 1
A Client.server_policy_uri() 0 13 4
A Client._set_security() 0 13 2
A Client.close_session() 0 7 1
A Client.register_namespace() 0 12 2
A Client._add_anonymous_auth() 0 3 1
A Client.create_subscription() 0 24 2
A Client.get_objects_node() 0 3 1
A Client.get_endpoints() 0 6 1
A Client._add_certificate_auth() 0 16 2
A Client._encrypt_password() 0 10 2
B Client.create_session() 0 43 5
A Client.find_servers_on_network() 0 3 1
A Client.load_type_definitions() 0 9 1
A Client.load_enums() 0 7 1
A Client.get_node() 0 5 1
A Client.load_client_certificate() 0 5 1
A Client.load_data_type_definitions() 0 7 1
A Client.close_secure_channel() 0 2 1
A Client.read_values() 0 7 1
A Client.connect_and_find_servers_on_network() 0 13 1
B Client.activate_session() 0 25 7
A Client.open_secure_channel() 0 18 3
A Client.connect() 0 16 2
A Client.get_namespace_array() 0 3 1
A Client.server_policy_id() 0 9 3
A Client.send_hello() 0 7 2
A Client.connect_and_get_server_endpoints() 0 13 1
A Client.get_namespace_index() 0 4 1

How to fix   Complexity   

Complexity

Complex classes like asyncua.client.client often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import asyncio
2
import logging
3
from typing import Union, Coroutine, Optional
4
from urllib.parse import urlparse
5
6
from asyncua import ua
7
from .ua_client import UaClient
8
from ..common.xmlimporter import XmlImporter
9
from ..common.xmlexporter import XmlExporter
10
from ..common.node import Node
11
from ..common.manage_nodes import delete_nodes
12
from ..common.subscription import Subscription
13
from ..common.shortcuts import Shortcuts
14
from ..common.structures import load_type_definitions, load_enums
15
from ..common.structures104 import load_data_type_definitions
16
from ..common.utils import create_nonce
17
from ..common.ua_utils import value_to_datavalue
18
from ..crypto import uacrypto, security_policies
19
20
_logger = logging.getLogger(__name__)
21
22
23
class Client:
24
    """
25
    High level client to connect to an OPC-UA server.
26
27
    This class makes it easy to connect and browse address space.
28
    It attempts to expose as much functionality as possible
29
    but if you want more flexibility it is possible and advised to
30
    use UaClient object, available as self.uaclient
31
    which offers the raw OPC-UA services interface.
32
    """
33
    def __init__(self, url: str, timeout: int = 4, loop=None):
        """
        :param url: url of the server.
            if you are unsure of url, write at least hostname
            and port and call get_endpoints. A username and password
            embedded in the url become the initial credentials.

        :param timeout:
            Each request sent to the server expects an answer within this
            time. The timeout is specified in seconds.

        :param loop: event loop to use; when omitted (or falsy),
            asyncio.get_event_loop() is used.

        Some other client parameters can be changed by setting
        attributes on the constructed object:
        See the source code for the exhaustive list.
        """
        self.loop = loop or asyncio.get_event_loop()
        parsed_url = urlparse(url)
        self.server_url = parsed_url
        # take initial username and password from the url
        self._username, self._password = parsed_url.username, parsed_url.password
        # identification values used when a session is created
        client_name = "Pure Python Async. Client"
        self.name = client_name
        self.description = client_name
        self.application_uri = "urn:freeopcua:client"
        self.product_uri = "urn:freeopcua.github.io:client"
        self.security_policy = ua.SecurityPolicy()
        self.secure_channel_id = None
        one_hour_ms = 3600000
        self.secure_channel_timeout = one_hour_ms  # 1 hour
        self.session_timeout = one_hour_ms  # 1 hour
        self._policy_ids = []
        self.uaclient: UaClient = UaClient(timeout, loop=self.loop)
        self.user_certificate = None
        self.user_private_key = None
        self._server_nonce = None
        self._session_counter = 1
        self.nodes = Shortcuts(self.uaclient)
        # 0 means "no limit" for both values below
        self.max_messagesize = 0
        self.max_chunkcount = 0
        self._renew_channel_task = None
70
71
    async def __aenter__(self):
72
        await self.connect()
73
        return self
74
75
    async def __aexit__(self, exc_type, exc_value, traceback):
76
        await self.disconnect()
77
78
    def __str__(self):
79
        return f"Client({self.server_url.geturl()})"
80
81
    __repr__ = __str__
82
83
    @staticmethod
    def find_endpoint(endpoints, security_mode, policy_uri):
        """
        Return the first endpoint whose URL starts with the OPC TCP scheme and
        whose security mode and policy URI equal the requested ones.

        :raises ua.UaError: if no endpoint matches
        """
        _logger.info("find_endpoint %r %r %r", endpoints, security_mode, policy_uri)
        for candidate in endpoints:
            if candidate.EndpointUrl.startswith(ua.OPC_TCP_SCHEME):
                mode_matches = candidate.SecurityMode == security_mode
                if mode_matches and candidate.SecurityPolicyUri == policy_uri:
                    return candidate
        raise ua.UaError(f"No matching endpoints: {security_mode}, {policy_uri}")
94
95
    def set_user(self, username: str):
96
        """
97
        Set user name for the connection.
98
        initial user from the URL will be overwritten
99
        """
100
        self._username = username
101
102
    def set_password(self, pwd: str):
103
        """
104
        Set user password for the connection.
105
        initial password from the URL will be overwritten
106
        """
107
        if not isinstance(pwd, str):
108
            raise TypeError(f"Password must be a string, got {pwd} of type {type(pwd)}")
109
        self._password = pwd
110
111
    async def set_security_string(self, string: str):
112
        """
113
        Set SecureConnection mode.
114
115
        :param string: Mode format ``Policy,Mode,certificate,private_key[,server_private_key]``
116
117
        where:
118
119
        - ``Policy`` is ``Basic128Rsa15``, ``Basic256`` or ``Basic256Sha256``
120
        - ``Mode`` is ``Sign`` or ``SignAndEncrypt``
121
        - ``certificate`` and ``server_private_key`` are paths to ``.pem`` or ``.der`` files
122
        - ``private_key`` may be a path to a ``.pem`` or ``.der`` file or a conjunction of ``path``::``password`` where
123
          ``password`` is the private key password.
124
125
        Call this before connect()
126
        """
127
        if not string:
128
            return
129
        parts = string.split(",")
130
        if len(parts) < 4:
131
            raise ua.UaError(f"Wrong format: `{string}`, expected at least 4 comma-separated values")
132
133
        if '::' in parts[3]:  # if the filename contains a colon, assume it's a conjunction and parse it
134
            parts[3], client_key_password = parts[3].split('::')
135
        else:
136
            client_key_password = None
137
138
        policy_class = getattr(security_policies, f"SecurityPolicy{parts[0]}")
139
        mode = getattr(ua.MessageSecurityMode, parts[1])
140
        return await self.set_security(policy_class, parts[2], parts[3], client_key_password,
141
                                       parts[4] if len(parts) >= 5 else None, mode)
142
143
    async def set_security(self,
                           policy,
                           certificate: Union[str, uacrypto.CertProperties],
                           private_key: Union[str, uacrypto.CertProperties],
                           private_key_password: str = None,
                           server_certificate: Optional[Union[str, uacrypto.CertProperties]] = None,
                           mode: ua.MessageSecurityMode = ua.MessageSecurityMode.SignAndEncrypt):
        """
        Set SecureConnection mode.
        Call this before connect()

        Values that are not already ``uacrypto.CertProperties`` are wrapped in
        one. When no server certificate is given, the server is contacted and
        the certificate of the endpoint matching ``mode`` and ``policy.URI``
        is used.
        """
        cert_props = uacrypto.CertProperties
        if server_certificate is None:
            # load certificate from server's list of endpoints
            server_endpoints = await self.connect_and_get_server_endpoints()
            matching = Client.find_endpoint(server_endpoints, mode, policy.URI)
            server_certificate = uacrypto.x509_from_der(matching.ServerCertificate)
        elif not isinstance(server_certificate, cert_props):
            server_certificate = cert_props(server_certificate)
        if not isinstance(certificate, cert_props):
            certificate = cert_props(certificate)
        if not isinstance(private_key, cert_props):
            private_key = cert_props(private_key, password=private_key_password)
        return await self._set_security(policy, certificate, private_key, server_certificate, mode)
166
167
    async def _set_security(self,
                            policy,
                            certificate: uacrypto.CertProperties,
                            private_key: uacrypto.CertProperties,
                            server_cert: uacrypto.CertProperties,
                            mode: ua.MessageSecurityMode = ua.MessageSecurityMode.SignAndEncrypt):
        """
        Load the certificate and key material, build the security policy
        object and install it on this client and on the underlying UaClient.
        """
        load_cert = uacrypto.load_certificate
        # a server certificate that is not CertProperties is passed to the policy as given
        if isinstance(server_cert, uacrypto.CertProperties):
            server_cert = await load_cert(server_cert.path, server_cert.extension)
        client_cert = await load_cert(certificate.path, certificate.extension)
        client_key = await uacrypto.load_private_key(private_key.path, private_key.password, private_key.extension)
        new_policy = policy(server_cert, client_cert, client_key, mode)
        self.security_policy = new_policy
        self.uaclient.set_security(new_policy)
180
181
    async def load_client_certificate(self, path: str, extension: str = None):
        """
        Load our certificate from file, either pem or der, and keep it as
        the user certificate.
        """
        loaded = await uacrypto.load_certificate(path, extension)
        self.user_certificate = loaded
186
187
    async def load_private_key(self, path: str, password: str = None, extension: str = None):
        """
        Load the user private key and keep it on the client.
        This is used for authenticating using certificate
        """
        loaded = await uacrypto.load_private_key(path, password, extension)
        self.user_private_key = loaded
192
193
    async def connect_and_get_server_endpoints(self):
194
        """
195
        Connect, ask server for endpoints, and disconnect
196
        """
197
        await self.connect_socket()
198
        try:
199
            await self.send_hello()
200
            await self.open_secure_channel()
201
            endpoints = await self.get_endpoints()
202
            await self.close_secure_channel()
203
        finally:
204
            self.disconnect_socket()
205
        return endpoints
206
207
    async def connect_and_find_servers(self):
208
        """
209
        Connect, ask server for a list of known servers, and disconnect
210
        """
211
        await self.connect_socket()
212
        try:
213
            await self.send_hello()
214
            await self.open_secure_channel()  # spec says it should not be necessary to open channel
215
            servers = await self.find_servers()
216
            await self.close_secure_channel()
217
        finally:
218
            self.disconnect_socket()
219
        return servers
220
221
    async def connect_and_find_servers_on_network(self):
222
        """
223
        Connect, ask server for a list of known servers on network, and disconnect
224
        """
225
        await self.connect_socket()
226
        try:
227
            await self.send_hello()
228
            await self.open_secure_channel()
229
            servers = await self.find_servers_on_network()
230
            await self.close_secure_channel()
231
        finally:
232
            self.disconnect_socket()
233
        return servers
234
235
    async def connect(self):
        """
        High level method
        Connect, create and activate session

        If any step after the socket has been opened fails (including the
        session activation), the channel-renewal task started by
        create_session() is cancelled, the socket is closed and the
        exception is re-raised, so a failed connect() does not leave an
        open socket behind.
        """
        _logger.info("connect")
        await self.connect_socket()
        try:
            await self.send_hello()
            await self.open_secure_channel()
            await self.create_session()
            await self.activate_session(username=self._username, password=self._password,
                                        certificate=self.user_certificate)
        except Exception:
            # do not keep renewing a channel whose socket is about to be closed
            if self._renew_channel_task is not None:
                self._renew_channel_task.cancel()
            # clean up open socket
            self.disconnect_socket()
            raise
251
252
    async def disconnect(self):
        """
        High level method
        Close session, secure channel and socket.
        The socket is disconnected even if closing the session or the
        secure channel raises.
        """
        _logger.info("disconnect")
        try:
            await self.close_session()
            await self.close_secure_channel()
        finally:
            self.disconnect_socket()
263
264
    async def connect_socket(self):
265
        """
266
        connect to socket defined in url
267
        """
268
        await self.uaclient.connect_socket(self.server_url.hostname, self.server_url.port)
269
270
    def disconnect_socket(self):
271
        self.uaclient.disconnect_socket()
272
273
    async def send_hello(self):
        """
        Send OPC-UA hello to server.

        :raises ua.UaStatusCodeError: when the answer is such an error object
        """
        endpoint_url = self.server_url.geturl()
        ack = await self.uaclient.send_hello(endpoint_url, self.max_messagesize, self.max_chunkcount)
        if isinstance(ack, ua.UaStatusCodeError):
            raise ack
280
281
    async def open_secure_channel(self, renew=False):
        """
        Open secure channel, if renew is True, renew channel.

        When the lifetime in the answer differs from the requested one, it is
        logged and stored in ``self.secure_channel_timeout``.
        """
        request_types = ua.SecurityTokenRequestType
        policy = self.security_policy
        params = ua.OpenSecureChannelParameters()
        params.ClientProtocolVersion = 0
        params.RequestType = request_types.Renew if renew else request_types.Issue
        params.SecurityMode = policy.Mode
        params.RequestedLifetime = self.secure_channel_timeout
        # length should be equal to the length of key of symmetric encryption
        params.ClientNonce = create_nonce(policy.symmetric_key_size)
        result = await self.uaclient.open_secure_channel(params)
        revised_lifetime = result.SecurityToken.RevisedLifetime
        if self.secure_channel_timeout != revised_lifetime:
            _logger.info("Requested secure channel timeout to be %dms, got %dms instead",
                         self.secure_channel_timeout, revised_lifetime)
            self.secure_channel_timeout = revised_lifetime
299
300
    async def close_secure_channel(self):
301
        return await self.uaclient.close_secure_channel()
302
303
    async def get_endpoints(self) -> list:
        """Get a list of OPC-UA endpoints, querying with this client's server url."""
        request = ua.GetEndpointsParameters()
        request.EndpointUrl = self.server_url.geturl()
        endpoints = await self.uaclient.get_endpoints(request)
        return endpoints
309
310
    async def register_server(self, server, discovery_configuration=None):
        """
        Register a server to discovery server.
        If discovery_configuration is provided, the newer register_server2 service call is used
        """
        registration = ua.RegisteredServer()
        registration.ServerUri = server.get_application_uri()
        registration.ProductUri = server.product_uri
        registration.DiscoveryUrls = [server.endpoint.geturl()]
        registration.ServerType = server.application_type
        registration.ServerNames = [ua.LocalizedText(server.name)]
        registration.IsOnline = True
        if not discovery_configuration:
            return await self.uaclient.register_server(registration)
        params = ua.RegisterServer2Parameters()
        params.Server = registration
        params.DiscoveryConfiguration = discovery_configuration
        return await self.uaclient.register_server2(params)
328
329
    async def find_servers(self, uris=None):
        """
        Send a FindServer request to the server. The answer should be a list of
        servers the server knows about.
        A list of uris can be provided, only server having matching uris will be returned
        """
        wanted_uris = [] if uris is None else uris
        params = ua.FindServersParameters()
        params.EndpointUrl = self.server_url.geturl()
        params.ServerUris = wanted_uris
        return await self.uaclient.find_servers(params)
341
342
    async def find_servers_on_network(self):
        """Send a FindServersOnNetwork request built with default parameters and return the answer."""
        request = ua.FindServersOnNetworkParameters()
        answer = await self.uaclient.find_servers_on_network(request)
        return answer
345
346
    async def create_session(self):
        """
        Send a CreateSessionRequest to server with reasonable parameters.
        If you want to modify settings look at code of this method
        and make your own.

        The server signature is verified, the server nonce and the user
        token policies of the matching endpoint are remembered for
        activate_session(), and the channel-renewal task is started.

        :raises ua.UaError: if a server certificate is already known and the
            one in the response differs, or if no endpoint in the response
            matches the current security mode and policy
        :return: the response of the create-session call
        """
        desc = ua.ApplicationDescription()
        desc.ApplicationUri = self.application_uri
        desc.ProductUri = self.product_uri
        desc.ApplicationName = ua.LocalizedText(self.name)
        desc.ApplicationType = ua.ApplicationType.Client
        params = ua.CreateSessionParameters()
        # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
        nonce = create_nonce(32)
        params.ClientNonce = nonce
        params.ClientCertificate = self.security_policy.host_certificate
        params.ClientDescription = desc
        params.EndpointUrl = self.server_url.geturl()
        params.SessionName = f"{self.description} Session{self._session_counter}"
        # Requested maximum number of milliseconds that a Session should remain open without activity
        params.RequestedSessionTimeout = self.session_timeout
        params.MaxResponseMessageSize = 0  # means no max size
        response = await self.uaclient.create_session(params)
        # the server signs our certificate (if any) followed by our nonce
        if self.security_policy.host_certificate is None:
            data = nonce
        else:
            data = self.security_policy.host_certificate + nonce
        self.security_policy.asymmetric_cryptography.verify(data, response.ServerSignature.Signature)
        self._server_nonce = response.ServerNonce
        if not self.security_policy.peer_certificate:
            self.security_policy.peer_certificate = response.ServerCertificate
        elif self.security_policy.peer_certificate != response.ServerCertificate:
            raise ua.UaError("Server certificate mismatch")
        # remember PolicyId's: we will use them in activate_session()
        ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
        self._policy_ids = ep.UserIdentityTokens
        #  Actual maximum number of milliseconds that a Session shall remain open without activity
        if self.session_timeout != response.RevisedSessionTimeout:
            # report the session timeout that was requested, not the channel timeout
            _logger.warning("Requested session timeout to be %dms, got %dms instead",
                            self.session_timeout, response.RevisedSessionTimeout)
            self.session_timeout = response.RevisedSessionTimeout
        self._renew_channel_task = self.loop.create_task(self._renew_channel_loop())
        return response
389
390
    async def _renew_channel_loop(self):
        """
        Renew the SecureChannel before the SessionTimeout will happen.
        In theory we could do that only if no session activity
        but it does not cost much..

        Runs until cancelled; cancellation ends the loop silently. Any other
        error is logged and re-raised, which ends the task.
        """
        try:
            while True:
                # 0.7 is from spec. 0.001 is because asyncio.sleep expects time in seconds.
                # Recomputed on every round because open_secure_channel() stores the
                # lifetime from the server's answer in self.secure_channel_timeout.
                duration = min(self.session_timeout, self.secure_channel_timeout) * 0.7 / 1000
                await asyncio.sleep(duration)
                _logger.debug("renewing channel")
                await self.open_secure_channel(renew=True)
                val = await self.nodes.server_state.read_value()
                _logger.debug("server state is: %s ", val)
        except asyncio.CancelledError:
            pass
        except Exception:
            _logger.exception("Error while renewing session")
            raise
410
411
    def server_policy_id(self, token_type, default):
412
        """
413
        Find PolicyId of server's UserTokenPolicy by token_type.
414
        Return default if there's no matching UserTokenPolicy.
415
        """
416
        for policy in self._policy_ids:
417
            if policy.TokenType == token_type:
418
                return policy.PolicyId
419
        return default
420
421
    def server_policy_uri(self, token_type):
422
        """
423
        Find SecurityPolicyUri of server's UserTokenPolicy by token_type.
424
        If SecurityPolicyUri is empty, use default SecurityPolicyUri
425
        of the endpoint
426
        """
427
        for policy in self._policy_ids:
428
            if policy.TokenType == token_type:
429
                if policy.SecurityPolicyUri:
430
                    return policy.SecurityPolicyUri
431
                # empty URI means "use this endpoint's policy URI"
432
                return self.security_policy.URI
433
        return self.security_policy.URI
434
435
    async def activate_session(self, username: str = None, password: str = None, certificate=None):
        """
        Activate session using either username and password or private_key.

        A certificate takes precedence over a username; with neither, the
        session is activated anonymously.
        """
        policy = self.security_policy
        params = ua.ActivateSessionParameters()
        # data to sign: server certificate followed by the last server nonce (whichever is known)
        challenge = b""
        for piece in (policy.peer_certificate, self._server_nonce):
            if piece is not None:
                challenge += piece
        algorithm_uri = policy.AsymmetricSignatureURI
        if not algorithm_uri:
            algorithm_uri = security_policies.SecurityPolicyBasic256.AsymmetricSignatureURI
        params.ClientSignature.Algorithm = algorithm_uri
        params.ClientSignature.Signature = policy.asymmetric_cryptography.signature(challenge)
        params.LocaleIds.append("en")
        if certificate:
            self._add_certificate_auth(params, certificate, challenge)
        elif username:
            self._add_user_auth(params, username, password)
        else:
            self._add_anonymous_auth(params)
        return await self.uaclient.activate_session(params)
460
461
    def _add_anonymous_auth(self, params):
        """Attach an anonymous identity token, using the server's policy id when one is known."""
        token = ua.AnonymousIdentityToken()
        params.UserIdentityToken = token
        token.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, "anonymous")
464
465
    def _add_certificate_auth(self, params, certificate, challenge):
        """
        Attach an X509 identity token for ``certificate`` and sign ``challenge``
        with the user private key.
        """
        token = ua.X509IdentityToken()
        params.UserIdentityToken = token
        token.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, "certificate_basic256")
        token.CertificateData = uacrypto.der_from_x509(certificate)
        # specs part 4, 5.6.3.1: the data to sign is created by appending
        # the last serverNonce to the serverCertificate
        signature_data = ua.SignatureData()
        params.UserTokenSignature = signature_data
        # use signature algorithm that was used for certificate generation
        if certificate.signature_hash_algorithm.name == "sha256":
            sig = uacrypto.sign_sha256(self.user_private_key, challenge)
            algorithm = "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256"
        else:
            sig = uacrypto.sign_sha1(self.user_private_key, challenge)
            algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        signature_data.Algorithm = algorithm
        signature_data.Signature = sig
481
482
    def _add_user_auth(self, params, username: str, password: str):
        """
        Attach a user-name identity token to ``params``.

        The ``password`` argument (not the password stored on the client)
        decides whether a password is sent. It is sent as plain UTF-8 when the
        user token policy is empty or the "None" policy, and encrypted
        otherwise. Without a password only the user name is sent.
        """
        params.UserIdentityToken = ua.UserNameIdentityToken()
        params.UserIdentityToken.UserName = username
        policy_uri = self.server_policy_uri(ua.UserTokenType.UserName)
        if not policy_uri or policy_uri == security_policies.POLICY_NONE_URI:
            # see specs part 4, 7.36.3: if the token is NOT encrypted,
            # then the password only contains UTF-8 encoded password
            # and EncryptionAlgorithm is null
            if password:
                _logger.warning("Sending plain-text password")
                params.UserIdentityToken.Password = password.encode("utf8")
            params.UserIdentityToken.EncryptionAlgorithm = None
        elif password:
            data, uri = self._encrypt_password(password, policy_uri)
            params.UserIdentityToken.Password = data
            params.UserIdentityToken.EncryptionAlgorithm = uri
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, "username_basic256")
499
500
    def _encrypt_password(self, password: str, policy_uri):
        """
        Encrypt ``password`` with the public key of the server certificate.

        :return: tuple of (encrypted data, encryption algorithm uri)
        """
        server_cert = uacrypto.x509_from_der(self.security_policy.peer_certificate)
        pubkey = server_cert.public_key()
        # see specs part 4, 7.36.3: if the token is encrypted, password
        # shall be converted to UTF-8 and serialized with server nonce
        secret = password.encode("utf8")
        nonce = self._server_nonce
        if nonce is not None:
            secret += nonce
        etoken = ua.ua_binary.Primitives.Bytes.pack(secret)
        encrypted, algorithm_uri = security_policies.encrypt_asymmetric(pubkey, etoken, policy_uri)
        return encrypted, algorithm_uri
510
511
    async def close_session(self) -> Coroutine:
512
        """
513
        Close session
514
        """
515
        self._renew_channel_task.cancel()
516
        await self._renew_channel_task
517
        return await self.uaclient.close_session(True)
518
519
    def get_root_node(self):
        """Return the node for the RootFolder object id."""
        root_id = ua.TwoByteNodeId(ua.ObjectIds.RootFolder)
        return self.get_node(root_id)
521
522
    def get_objects_node(self):
        """Return the node for the ObjectsFolder object id."""
        _logger.info("get_objects_node")
        objects_id = ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder)
        return self.get_node(objects_id)
525
526
    def get_server_node(self):
        """Return the node for the Server object id."""
        server_id = ua.FourByteNodeId(ua.ObjectIds.Server)
        return self.get_node(server_id)
528
529
    def get_node(self, nodeid: Union[ua.NodeId, str]) -> Node:
        """
        Build a Node bound to this client's UaClient from a NodeId object
        or a string representing a NodeId.
        """
        node = Node(self.uaclient, nodeid)
        return node
534
535
    async def create_subscription(self, period, handler):
        """
        Create a subscription.
        Returns a Subscription object which allows to subscribe to events or data changes on server.

        :param period: Either a publishing interval in milliseconds or a `CreateSubscriptionParameters` instance.
            The second option should be used, if the asyncua-server has problems with the default options.

        :param handler: Class instance with data_change and/or event methods (see `SubHandler`
            base class for details). Remember not to block the main event loop inside the handler methods.
        """
        params = period
        if not isinstance(period, ua.CreateSubscriptionParameters):
            # a plain interval was given: fill in the default options
            params = ua.CreateSubscriptionParameters()
            params.RequestedPublishingInterval = period
            params.RequestedLifetimeCount = 10000
            params.RequestedMaxKeepAliveCount = 3000
            params.MaxNotificationsPerPublish = 10000
            params.PublishingEnabled = True
            params.Priority = 0
        new_subscription = Subscription(self.uaclient, params, handler)
        await new_subscription.init()
        return new_subscription
559
560
    async def get_namespace_array(self):
        """Read and return the value of the server's NamespaceArray node."""
        array_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        namespaces = await array_node.read_value()
        return namespaces
563
564
    async def get_namespace_index(self, uri):
        """
        Return the position of ``uri`` in the server's namespace array.
        ``index()`` raises ValueError when the uri is not in a list.
        """
        namespaces = await self.get_namespace_array()
        _logger.info("get_namespace_index %s %r", type(namespaces), namespaces)
        return namespaces.index(uri)
568
569
    async def delete_nodes(self, nodes, recursive=False) -> Coroutine:
        """Delete ``nodes`` through the module-level ``delete_nodes`` helper and return its result."""
        outcome = await delete_nodes(self.uaclient, nodes, recursive)
        return outcome
571
572
    async def import_xml(self, path=None, xmlstring=None) -> Coroutine:
        """
        Import nodes defined in xml, given either as a file path or as a string.
        Returns the result of the importer.
        """
        xml_importer = XmlImporter(self)
        imported = await xml_importer.import_xml(path, xmlstring)
        return imported
578
579
    async def export_xml(self, nodes, path):
        """
        Export defined nodes to xml: build the tree for ``nodes``, then write it to ``path``.
        """
        exporter = XmlExporter(self)
        await exporter.build_etree(nodes)
        await exporter.write_xml(path)
586
587
    async def register_namespace(self, uri):
        """
        Register a new namespace. Nodes should be in a custom namespace, not 0.
        This method is mainly implemented for symmetry with server.

        Returns the index of ``uri`` in the namespace array; the uri is
        appended and written back only when it is not already present.
        """
        array_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        namespaces = await array_node.read_value()
        try:
            return namespaces.index(uri)
        except ValueError:
            # not registered yet
            pass
        namespaces.append(uri)
        await array_node.write_value(namespaces)
        return len(namespaces) - 1
599
600
    async def load_type_definitions(self, nodes=None):
        """
        Load custom types (custom structures/extension objects) definition from server
        Generate Python classes for custom structures/extension objects defined in server
        These classes will be available in ua module
        WARNING: protocol has changed in 1.04. use load_data_type_definitions()
        """
        _logger.warning("Deprecated since spec 1.04, call load_data_type_definitions")
        loaded = await load_type_definitions(self, nodes)
        return loaded
609
610
    async def load_data_type_definitions(self, node=None):
        """
        Load custom types (custom structures/extension objects) definition from server
        Generate Python classes for custom structures/extension objects defined in server
        These classes will be available in ua module
        """
        loaded = await load_data_type_definitions(self, node)
        return loaded
617
618
    async def load_enums(self):
        """
        Generate Python enums for custom enums on server.
        These enums will be available in ua module
        """
        _logger.warning("Deprecated since spec 1.04, call load_data_type_definitions")
        loaded = await load_enums(self)
        return loaded
625
626
    async def register_nodes(self, nodes):
627
        """
628
        Register nodes for faster read and write access (if supported by server)
629
        Rmw: This call modifies the nodeid of the nodes, the original nodeid is
630
        available as node.basenodeid
631
        """
632
        nodeids = [node.nodeid for node in nodes]
633
        nodeids = await self.uaclient.register_nodes(nodeids)
634
        for node, nodeid in zip(nodes, nodeids):
635
            node.basenodeid = node.nodeid
636
            node.nodeid = nodeid
637
        return nodes
638
639
    async def unregister_nodes(self, nodes):
640
        """
641
        Unregister nodes
642
        """
643
        nodeids = [node.nodeid for node in nodes]
644
        await self.uaclient.unregister_nodes(nodeids)
645
        for node in nodes:
646
            if not node.basenodeid:
647
                continue
648
            node.nodeid = node.basenodeid
649
            node.basenodeid = None
650
651
    async def read_values(self, nodes):
        """
        Read the value of multiple nodes in one ua call.
        Returns one value per result, in the order of the results.
        """
        node_ids = [node.nodeid for node in nodes]
        data_values = await self.uaclient.read_attributes(node_ids, ua.AttributeIds.Value)
        return [data_value.Value.Value for data_value in data_values]
658
659
    async def write_values(self, nodes, values):
        """
        Write values to multiple nodes in one ua call.
        ``check()`` is called on every result of the write.
        """
        node_ids = [node.nodeid for node in nodes]
        data_values = list(map(value_to_datavalue, values))
        statuses = await self.uaclient.write_attributes(node_ids, data_values, ua.AttributeIds.Value)
        for status in statuses:
            status.check()
668
669
    get_values = read_values  # legacy compatibility
670
    set_values = write_values  # legacy compatibility
671