Completed
Push — master ( 67bd88...5cd7d6 )
by Olivier
08:07 queued 05:35
created

asyncua.client.client   F

Complexity

Total Complexity 91

Size/Duplication

Total Lines 597
Duplicated Lines 2.01 %

Importance

Changes 0
Metric Value
eloc 366
dl 12
loc 597
rs 2
c 0
b 0
f 0
wmc 91

51 Methods

Rating   Name   Duplication   Size   Complexity  
A Client.__init__() 0 34 1
A Client.__aenter__() 0 3 1
A Client.__aexit__() 0 2 1
A Client.register_server() 0 18 2
A Client._add_user_auth() 0 17 5
A Client.connect_socket() 0 5 1
A Client.disconnect_socket() 0 2 1
A Client.register_nodes() 0 12 2
A Client._renew_channel_loop() 0 17 3
A Client.get_server_node() 0 2 1
A Client.connect_and_find_servers() 0 13 1
A Client.unregister_nodes() 0 11 3
A Client.delete_nodes() 0 2 1
A Client.import_xml() 0 6 1
A Client.disconnect() 0 11 1
A Client.find_servers() 0 12 2
A Client.load_private_key() 0 5 1
A Client.export_xml() 0 7 1
A Client.set_security() 0 21 2
A Client.get_root_node() 0 2 1
A Client.server_policy_uri() 0 13 4
A Client.close_session() 0 7 1
A Client.register_namespace() 12 12 2
A Client._add_anonymous_auth() 0 3 1
A Client.find_endpoint() 0 10 5
A Client.create_subscription() 0 23 2
A Client.get_objects_node() 0 3 1
A Client.get_endpoints() 0 4 1
A Client._add_certificate_auth() 0 10 1
A Client.set_password() 0 8 2
A Client.create_session() 0 40 4
A Client._encrypt_password() 0 10 2
A Client.find_servers_on_network() 0 3 1
A Client.set_values() 0 9 2
A Client.load_type_definitions() 0 7 1
A Client.load_enums() 0 6 1
A Client.get_values() 0 7 1
A Client.get_node() 0 5 1
A Client.load_client_certificate() 0 5 1
A Client.close_secure_channel() 0 2 1
A Client.set_user() 0 6 1
A Client.connect_and_find_servers_on_network() 0 13 1
B Client.activate_session() 0 20 6
A Client.open_secure_channel() 0 17 2
A Client.connect() 0 16 2
A Client.set_security_string() 0 18 4
A Client.get_namespace_array() 0 3 1
A Client.server_policy_id() 0 9 3
A Client.send_hello() 0 7 2
A Client.connect_and_get_server_endpoints() 0 13 1
A Client.get_namespace_index() 0 4 1


Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.
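As an illustration, the three `connect_and_*` methods in the listing below share the same connect / hello / open channel / request / close / disconnect shape. The sketch below shows one way such repetition could be folded into a single helper. `FakeClient` and `_with_temporary_channel` are hypothetical names introduced only for this example; they are not part of asyncua, and the fake methods merely record the call order.

```python
import asyncio


class FakeClient:
    """Stand-in for Client that only records the order of calls (hypothetical)."""

    def __init__(self):
        self.calls = []

    async def connect_socket(self):
        self.calls.append("connect_socket")

    def disconnect_socket(self):
        self.calls.append("disconnect_socket")

    async def send_hello(self):
        self.calls.append("send_hello")

    async def open_secure_channel(self):
        self.calls.append("open_secure_channel")

    async def close_secure_channel(self):
        self.calls.append("close_secure_channel")

    async def get_endpoints(self):
        self.calls.append("get_endpoints")
        return ["endpoint-a"]

    async def _with_temporary_channel(self, request):
        """Hypothetical helper: run one request on a short-lived secure channel."""
        await self.connect_socket()
        try:
            await self.send_hello()
            await self.open_secure_channel()
            result = await request()
            await self.close_secure_channel()
        finally:
            # Always release the socket, even if the request failed.
            self.disconnect_socket()
        return result

    async def connect_and_get_server_endpoints(self):
        # The duplicated try/finally body now lives in one place.
        return await self._with_temporary_channel(self.get_endpoints)


client = FakeClient()
endpoints = asyncio.run(client.connect_and_get_server_endpoints())
```

With a helper of this kind, each `connect_and_*` method would shrink to a single line that passes the request coroutine function it needs.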

Common duplication problems, and corresponding solutions are:

Complexity

Tip: Before tackling complexity, eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like asyncua.client.client often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.
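The prefix heuristic can be tried mechanically. The following minimal sketch groups a subset of the method names from the table above by their first underscore-separated word; `group_by_prefix` is a hypothetical helper written for this example, and real candidates for extraction still need human judgment.

```python
from collections import defaultdict

# A subset of the method names listed in the report table above.
METHOD_NAMES = [
    "set_security", "set_security_string", "set_user", "set_password",
    "load_client_certificate", "load_private_key",
    "load_type_definitions", "load_enums",
    "find_servers", "find_servers_on_network", "find_endpoint",
    "_add_user_auth", "_add_anonymous_auth", "_add_certificate_auth",
    "connect", "disconnect",
]


def group_by_prefix(names):
    """Group names by their first word, ignoring leading underscores."""
    groups = defaultdict(list)
    for name in names:
        prefix = name.lstrip("_").split("_", 1)[0]
        groups[prefix].append(name)
    # Only prefixes shared by several methods hint at a cohesive component.
    return {prefix: members for prefix, members in groups.items() if len(members) > 1}


groups = group_by_prefix(METHOD_NAMES)
```

Here the `_add_*_auth` group, for example, points at the user-authentication logic as one possible component.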

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
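A minimal sketch of Extract Class, assuming the user credentials (`_username`, `_password`) were chosen as the cohesive component. `UserIdentity` and `SlimClient` are hypothetical names used for illustration only; this is not how asyncua is structured.

```python
from dataclasses import dataclass
from typing import Optional
from urllib.parse import urlparse


@dataclass
class UserIdentity:
    """Hypothetical extracted class holding the credential fields of Client."""

    username: Optional[str] = None
    password: Optional[str] = None

    def set_user(self, username: str) -> None:
        self.username = username

    def set_password(self, pwd: str) -> None:
        if not isinstance(pwd, str):
            raise TypeError(f"Password must be a string, got type {type(pwd)}")
        self.password = pwd


class SlimClient:
    """Hypothetical client that delegates credential handling to UserIdentity."""

    def __init__(self, url: str):
        self.server_url = urlparse(url)
        # Initial username and password are still taken from the URL.
        self.identity = UserIdentity(self.server_url.username, self.server_url.password)


client = SlimClient("opc.tcp://alice:secret@localhost:4840")
client.identity.set_password("new-secret")
```

The client keeps a single `identity` attribute, and the validation logic moves with the data it guards.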

import asyncio
import logging
from typing import Union, Coroutine
from urllib.parse import urlparse

from asyncua import ua
from .ua_client import UaClient
from ..common.xmlimporter import XmlImporter
from ..common.xmlexporter import XmlExporter
from ..common.node import Node
from ..common.manage_nodes import delete_nodes
from ..common.subscription import Subscription
from ..common.shortcuts import Shortcuts
from ..common.structures import load_type_definitions, load_enums
from ..common.utils import create_nonce
from ..common.ua_utils import value_to_datavalue
from ..crypto import uacrypto, security_policies

_logger = logging.getLogger(__name__)


class Client:
    """
    High level client to connect to an OPC-UA server.

    This class makes it easy to connect and browse address space.
    It attempts to expose as much functionality as possible
    but if you want more flexibility it is possible and advised to
    use UaClient object, available as self.uaclient
    which offers the raw OPC-UA services interface.
    """
    def __init__(self, url: str, timeout: int = 4, loop=None):
        """
        :param url: url of the server.
            if you are unsure of url, write at least hostname
            and port and call get_endpoints

        :param timeout:
            Each request sent to the server expects an answer within this
            time. The timeout is specified in seconds.
        """
        self.logger = logging.getLogger(__name__)
        self.loop = loop or asyncio.get_event_loop()
        self.server_url = urlparse(url)
        # take initial username and password from the url
        self._username = self.server_url.username
        self._password = self.server_url.password
        self.name = "Pure Python Async. Client"
        self.description = self.name
        self.application_uri = "urn:freeopcua:client"
        self.product_uri = "urn:freeopcua.github.io:client"
        self.security_policy = ua.SecurityPolicy()
        self.secure_channel_id = None
        self.secure_channel_timeout = 3600000  # 1 hour
        self.session_timeout = 3600000  # 1 hour
        self._policy_ids = []
        self.uaclient: UaClient = UaClient(timeout, loop=self.loop)
        self.user_certificate = None
        self.user_private_key = None
        self._server_nonce = None
        self._session_counter = 1
        self.nodes = Shortcuts(self.uaclient)
        self.max_messagesize = 0  # No limits
        self.max_chunkcount = 0  # No limits
        self._renew_channel_task = None

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.disconnect()

    @staticmethod
    def find_endpoint(endpoints, security_mode, policy_uri):
        """
        Find endpoint with required security mode and policy URI
        """
        _logger.info("find_endpoint %r %r %r", endpoints, security_mode, policy_uri)
        for ep in endpoints:
            if (ep.EndpointUrl.startswith(ua.OPC_TCP_SCHEME) and ep.SecurityMode == security_mode and ep.SecurityPolicyUri == policy_uri):
                return ep
        raise ua.UaError("No matching endpoints: {0}, {1}".format(security_mode, policy_uri))

    def set_user(self, username: str):
        """
        Set user name for the connection.
        initial user from the URL will be overwritten
        """
        self._username = username

    def set_password(self, pwd: str):
        """
        Set user password for the connection.
        initial password from the URL will be overwritten
        """
        if not isinstance(pwd, str):
            raise TypeError(f"Password must be a string, got {pwd} of type {type(pwd)}")
        self._password = pwd

    async def set_security_string(self, string: str):
        """
        Set SecureConnection mode. String format:
        Policy,Mode,certificate,private_key[,server_certificate]
        where Policy is Basic128Rsa15, Basic256 or Basic256Sha256,
            Mode is Sign or SignAndEncrypt
            certificate, private_key and server_certificate are
                paths to .pem or .der files
        Call this before connect()
        """
        if not string:
            return
        parts = string.split(",")
        if len(parts) < 4:
            raise ua.UaError("Wrong format: `{}`, expected at least 4 comma-separated values".format(string))
        policy_class = getattr(security_policies, "SecurityPolicy{}".format(parts[0]))
        mode = getattr(ua.MessageSecurityMode, parts[1])
        # the optional fifth value is passed on as server_certificate_path
        return await self.set_security(policy_class, parts[2], parts[3], parts[4] if len(parts) >= 5 else None, mode)

    async def set_security(self,
                           policy,
                           certificate_path: str,
                           private_key_path: str,
                           server_certificate_path: str = None,
                           mode: ua.MessageSecurityMode = ua.MessageSecurityMode.SignAndEncrypt):
        """
        Set SecureConnection mode.
        Call this before connect()
        """
        if server_certificate_path is None:
            # load certificate from server's list of endpoints
            endpoints = await self.connect_and_get_server_endpoints()
            endpoint = Client.find_endpoint(endpoints, mode, policy.URI)
            server_cert = uacrypto.x509_from_der(endpoint.ServerCertificate)
        else:
            server_cert = await uacrypto.load_certificate(server_certificate_path)
        cert = await uacrypto.load_certificate(certificate_path)
        pk = await uacrypto.load_private_key(private_key_path)
        self.security_policy = policy(server_cert, cert, pk, mode)
        self.uaclient.set_security(self.security_policy)

    async def load_client_certificate(self, path: str):
        """
        load our certificate from file, either pem or der
        """
        self.user_certificate = await uacrypto.load_certificate(path)

    async def load_private_key(self, path: str):
        """
        Load user private key. This is used for authenticating using certificate
        """
        self.user_private_key = await uacrypto.load_private_key(path)

    async def connect_and_get_server_endpoints(self):
        """
        Connect, ask server for endpoints, and disconnect
        """
        await self.connect_socket()
        try:
            await self.send_hello()
            await self.open_secure_channel()
            endpoints = await self.get_endpoints()
            await self.close_secure_channel()
        finally:
            self.disconnect_socket()
        return endpoints

    async def connect_and_find_servers(self):
        """
        Connect, ask server for a list of known servers, and disconnect
        """
        await self.connect_socket()
        try:
            await self.send_hello()
            await self.open_secure_channel()  # spec says it should not be necessary to open channel
            servers = await self.find_servers()
            await self.close_secure_channel()
        finally:
            self.disconnect_socket()
        return servers

    async def connect_and_find_servers_on_network(self):
        """
        Connect, ask server for a list of known servers on network, and disconnect
        """
        await self.connect_socket()
        try:
            await self.send_hello()
            await self.open_secure_channel()
            servers = await self.find_servers_on_network()
            await self.close_secure_channel()
        finally:
            self.disconnect_socket()
        return servers

    async def connect(self):
        """
        High level method
        Connect, create and activate session
        """
        _logger.info("connect")
        await self.connect_socket()
        try:
            await self.send_hello()
            await self.open_secure_channel()
            await self.create_session()
        except Exception:
            # clean up open socket
            self.disconnect_socket()
            raise
        await self.activate_session(username=self._username, password=self._password, certificate=self.user_certificate)

    async def disconnect(self):
        """
        High level method
        Close session, secure channel and socket
        """
        _logger.info("disconnect")
        try:
            await self.close_session()
            await self.close_secure_channel()
        finally:
            self.disconnect_socket()

    async def connect_socket(self):
        """
        connect to socket defined in url
        """
        await self.uaclient.connect_socket(self.server_url.hostname, self.server_url.port)

    def disconnect_socket(self):
        self.uaclient.disconnect_socket()

    async def send_hello(self):
        """
        Send OPC-UA hello to server
        """
        ack = await self.uaclient.send_hello(self.server_url.geturl(), self.max_messagesize, self.max_chunkcount)
        if isinstance(ack, ua.UaStatusCodeError):
            raise ack

    async def open_secure_channel(self, renew=False):
        """
        Open secure channel, if renew is True, renew channel
        """
        params = ua.OpenSecureChannelParameters()
        params.ClientProtocolVersion = 0
        params.RequestType = ua.SecurityTokenRequestType.Issue
        if renew:
            params.RequestType = ua.SecurityTokenRequestType.Renew
        params.SecurityMode = self.security_policy.Mode
        params.RequestedLifetime = self.secure_channel_timeout
        # length should be equal to the length of key of symmetric encryption
        nonce = create_nonce(self.security_policy.symmetric_key_size)
        params.ClientNonce = nonce  # this nonce is used to create a symmetric key
        result = await self.uaclient.open_secure_channel(params)
        self.security_policy.make_symmetric_key(nonce, result.ServerNonce)
        self.secure_channel_timeout = result.SecurityToken.RevisedLifetime

    async def close_secure_channel(self):
        return await self.uaclient.close_secure_channel()

    async def get_endpoints(self) -> list:
        params = ua.GetEndpointsParameters()
        params.EndpointUrl = self.server_url.geturl()
        return await self.uaclient.get_endpoints(params)

    async def register_server(self, server, discovery_configuration=None):
        """
        register a server to discovery server
        if discovery_configuration is provided, the newer register_server2 service call is used
        """
        serv = ua.RegisteredServer()
        serv.ServerUri = server.get_application_uri()
        serv.ProductUri = server.product_uri
        serv.DiscoveryUrls = [server.endpoint.geturl()]
        serv.ServerType = server.application_type
        serv.ServerNames = [ua.LocalizedText(server.name)]
        serv.IsOnline = True
        if discovery_configuration:
            params = ua.RegisterServer2Parameters()
            params.Server = serv
            params.DiscoveryConfiguration = discovery_configuration
            return await self.uaclient.register_server2(params)
        return await self.uaclient.register_server(serv)

    async def find_servers(self, uris=None):
        """
        send a FindServer request to the server. The answer should be a list of
        servers the server knows about
        A list of uris can be provided, only server having matching uris will be returned
        """
        if uris is None:
            uris = []
        params = ua.FindServersParameters()
        params.EndpointUrl = self.server_url.geturl()
        params.ServerUris = uris
        return await self.uaclient.find_servers(params)

    async def find_servers_on_network(self):
        params = ua.FindServersOnNetworkParameters()
        return await self.uaclient.find_servers_on_network(params)

    async def create_session(self):
        """
        send a CreateSessionRequest to server with reasonable parameters.
        If you want to modify settings, look at the code of this method
        and make your own
        """
        desc = ua.ApplicationDescription()
        desc.ApplicationUri = self.application_uri
        desc.ProductUri = self.product_uri
        desc.ApplicationName = ua.LocalizedText(self.name)
        desc.ApplicationType = ua.ApplicationType.Client
        params = ua.CreateSessionParameters()
        # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
        nonce = create_nonce(32)
        params.ClientNonce = nonce
        params.ClientCertificate = self.security_policy.client_certificate
        params.ClientDescription = desc
        params.EndpointUrl = self.server_url.geturl()
        params.SessionName = f"{self.description} Session{self._session_counter}"
        # Requested maximum number of milliseconds that a Session should remain open without activity
        params.RequestedSessionTimeout = 60 * 60 * 1000
        params.MaxResponseMessageSize = 0  # means no max size
        response = await self.uaclient.create_session(params)
        if self.security_policy.client_certificate is None:
            data = nonce
        else:
            data = self.security_policy.client_certificate + nonce
        self.security_policy.asymmetric_cryptography.verify(data, response.ServerSignature.Signature)
        self._server_nonce = response.ServerNonce
        if not self.security_policy.server_certificate:
            self.security_policy.server_certificate = response.ServerCertificate
        elif self.security_policy.server_certificate != response.ServerCertificate:
            raise ua.UaError("Server certificate mismatch")
        # remember PolicyId's: we will use them in activate_session()
        ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
        self._policy_ids = ep.UserIdentityTokens
        # Actual maximum number of milliseconds that a Session shall remain open without activity
        self.session_timeout = response.RevisedSessionTimeout
        self._renew_channel_task = self.loop.create_task(self._renew_channel_loop())
        return response

    async def _renew_channel_loop(self):
        """
        Renew the SecureChannel before the session or channel timeout expires.
        In theory we could do that only if there is no session activity
        but it does not cost much.
        """
        try:
            # 0.7 is from spec. 0.001 is because asyncio.sleep expects time in seconds
            duration = min(self.session_timeout, self.secure_channel_timeout) * 0.7 * 0.001
            while True:
                await asyncio.sleep(duration)
                self.logger.debug("renewing channel")
                await self.open_secure_channel(renew=True)
                val = await self.nodes.server_state.get_value()
                self.logger.debug("server state is: %s ", val)
        except asyncio.CancelledError:
            pass

    def server_policy_id(self, token_type, default):
        """
        Find PolicyId of server's UserTokenPolicy by token_type.
        Return default if there's no matching UserTokenPolicy.
        """
        for policy in self._policy_ids:
            if policy.TokenType == token_type:
                return policy.PolicyId
        return default

    def server_policy_uri(self, token_type):
        """
        Find SecurityPolicyUri of server's UserTokenPolicy by token_type.
        If SecurityPolicyUri is empty, use default SecurityPolicyUri
        of the endpoint
        """
        for policy in self._policy_ids:
            if policy.TokenType == token_type:
                if policy.SecurityPolicyUri:
                    return policy.SecurityPolicyUri
                # empty URI means "use this endpoint's policy URI"
                return self.security_policy.URI
        return self.security_policy.URI

    async def activate_session(self, username: str = None, password: str = None, certificate=None):
        """
        Activate session using either username and password or private_key
        """
        params = ua.ActivateSessionParameters()
        challenge = b""
        if self.security_policy.server_certificate is not None:
            challenge += self.security_policy.server_certificate
        if self._server_nonce is not None:
            challenge += self._server_nonce
        params.ClientSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
        params.LocaleIds.append("en")
        if not username and not certificate:
            self._add_anonymous_auth(params)
        elif certificate:
            self._add_certificate_auth(params, certificate, challenge)
        else:
            self._add_user_auth(params, username, password)
        return await self.uaclient.activate_session(params)

    def _add_anonymous_auth(self, params):
        params.UserIdentityToken = ua.AnonymousIdentityToken()
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, "anonymous")

    def _add_certificate_auth(self, params, certificate, challenge):
        params.UserIdentityToken = ua.X509IdentityToken()
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, "certificate_basic256")
        params.UserIdentityToken.CertificateData = uacrypto.der_from_x509(certificate)
        # specs part 4, 5.6.3.1: the data to sign is created by appending
        # the last serverNonce to the serverCertificate
        sig = uacrypto.sign_sha1(self.user_private_key, challenge)
        params.UserTokenSignature = ua.SignatureData()
        params.UserTokenSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.UserTokenSignature.Signature = sig

    def _add_user_auth(self, params, username: str, password: str):
        params.UserIdentityToken = ua.UserNameIdentityToken()
        params.UserIdentityToken.UserName = username
        policy_uri = self.server_policy_uri(ua.UserTokenType.UserName)
        if not policy_uri or policy_uri == security_policies.POLICY_NONE_URI:
            # see specs part 4, 7.36.3: if the token is NOT encrypted,
            # then the password only contains UTF-8 encoded password
            # and EncryptionAlgorithm is null
            # check the password argument itself, since it is the value being sent
            if password:
                self.logger.warning("Sending plain-text password")
                params.UserIdentityToken.Password = password.encode("utf8")
            params.UserIdentityToken.EncryptionAlgorithm = None
        elif password:
            data, uri = self._encrypt_password(password, policy_uri)
            params.UserIdentityToken.Password = data
            params.UserIdentityToken.EncryptionAlgorithm = uri
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, "username_basic256")

    def _encrypt_password(self, password: str, policy_uri):
        pubkey = uacrypto.x509_from_der(self.security_policy.server_certificate).public_key()
        # see specs part 4, 7.36.3: if the token is encrypted, password
        # shall be converted to UTF-8 and serialized with server nonce
        passwd = password.encode("utf8")
        if self._server_nonce is not None:
            passwd += self._server_nonce
        etoken = ua.ua_binary.Primitives.Bytes.pack(passwd)
        data, uri = security_policies.encrypt_asymmetric(pubkey, etoken, policy_uri)
        return data, uri

    async def close_session(self):
        """
        Close session
        """
        # the renew task only exists once create_session() has succeeded
        if self._renew_channel_task is not None:
            self._renew_channel_task.cancel()
            await self._renew_channel_task
            self._renew_channel_task = None
        return await self.uaclient.close_session(True)

    def get_root_node(self):
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))

    def get_objects_node(self):
        self.logger.info("get_objects_node")
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))

    def get_server_node(self):
        return self.get_node(ua.FourByteNodeId(ua.ObjectIds.Server))

    def get_node(self, nodeid: Union[ua.NodeId, str]) -> Node:
        """
        Get node using NodeId object or a string representing a NodeId.
        """
        return Node(self.uaclient, nodeid)

    async def create_subscription(self, period, handler):
        """
        Create a subscription.
        Returns a Subscription object which allows to subscribe to events or data changes on server.

        :param period: Either a publishing interval in milliseconds or a `CreateSubscriptionParameters` instance.
        The second option should be used, if the asyncua-server has problems with the default options.
        :param handler: Class instance with data_change and/or event methods (see `SubHandler`
        base class for details). Remember not to block the main event loop inside the handler methods.
        """
        if isinstance(period, ua.CreateSubscriptionParameters):
            params = period
        else:
            params = ua.CreateSubscriptionParameters()
            params.RequestedPublishingInterval = period
            params.RequestedLifetimeCount = 10000
            params.RequestedMaxKeepAliveCount = 3000
            params.MaxNotificationsPerPublish = 10000
            params.PublishingEnabled = True
            params.Priority = 0
        subscription = Subscription(self.uaclient, params, handler)
        await subscription.init()
        return subscription

    def get_namespace_array(self) -> Coroutine:
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        return ns_node.get_value()

    async def get_namespace_index(self, uri):
        uries = await self.get_namespace_array()
        _logger.info("get_namespace_index %s %r", type(uries), uries)
        return uries.index(uri)

    def delete_nodes(self, nodes, recursive=False) -> Coroutine:
        return delete_nodes(self.uaclient, nodes, recursive)

    def import_xml(self, path=None, xmlstring=None) -> Coroutine:
        """
        Import nodes defined in xml
        """
        importer = XmlImporter(self)
        return importer.import_xml(path, xmlstring)

    async def export_xml(self, nodes, path):
        """
        Export defined nodes to xml
        """
        exp = XmlExporter(self)
        await exp.build_etree(nodes)
        await exp.write_xml(path)

    # Report annotation: this code seems to be duplicated in your project.
    async def register_namespace(self, uri):
        """
        Register a new namespace. Nodes should be in a custom namespace, not 0.
        This method is mainly implemented for symmetry with server
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        uries = await ns_node.get_value()
        if uri in uries:
            return uries.index(uri)
        uries.append(uri)
        await ns_node.set_value(uries)
        return len(uries) - 1

    def load_type_definitions(self, nodes=None) -> Coroutine:
        """
        Load custom types (custom structures/extension objects) definition from server
        Generate Python classes for custom structures/extension objects defined in server
        These classes will be available in ua module
        """
        return load_type_definitions(self, nodes)

    def load_enums(self) -> Coroutine:
        """
        generate Python enums for custom enums on server.
        These enums will be available in ua module
        """
        return load_enums(self)

    async def register_nodes(self, nodes):
        """
        Register nodes for faster read and write access (if supported by server)
        Note: This call modifies the nodeid of the nodes, the original nodeid is
        available as node.basenodeid
        """
        nodeids = [node.nodeid for node in nodes]
        nodeids = await self.uaclient.register_nodes(nodeids)
        for node, nodeid in zip(nodes, nodeids):
            node.basenodeid = node.nodeid
            node.nodeid = nodeid
        return nodes

    async def unregister_nodes(self, nodes):
        """
        Unregister nodes
        """
        nodeids = [node.nodeid for node in nodes]
        await self.uaclient.unregister_nodes(nodeids)
        for node in nodes:
            if not node.basenodeid:
                continue
            node.nodeid = node.basenodeid
            node.basenodeid = None

    async def get_values(self, nodes):
        """
        Read the value of multiple nodes in one ua call.
        """
        nodeids = [node.nodeid for node in nodes]
        results = await self.uaclient.get_attributes(nodeids, ua.AttributeIds.Value)
        return [result.Value.Value for result in results]

    async def set_values(self, nodes, values):
        """
        Write values to multiple nodes in one ua call
        """
        nodeids = [node.nodeid for node in nodes]
        dvs = [value_to_datavalue(val) for val in values]
        results = await self.uaclient.set_attributes(nodeids, dvs, ua.AttributeIds.Value)
        for result in results:
            result.check()
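
The string format accepted by `set_security_string` in the listing above can be explored in isolation. The sketch below mirrors only its splitting and validation steps, without touching asyncua, certificates, or the network. `SecuritySettings` and `parse_security_string` are hypothetical names for this example, and `ValueError` stands in for `ua.UaError` so the snippet runs on its own.

```python
from typing import NamedTuple, Optional


class SecuritySettings(NamedTuple):
    """Hypothetical container for the comma-separated fields."""

    policy: str
    mode: str
    certificate_path: str
    private_key_path: str
    server_certificate_path: Optional[str]


def parse_security_string(string: str) -> Optional[SecuritySettings]:
    """Split 'Policy,Mode,certificate,private_key[,server_certificate]'."""
    if not string:
        # An empty string means "no security configured", as in the listing.
        return None
    parts = string.split(",")
    if len(parts) < 4:
        raise ValueError(f"Wrong format: `{string}`, expected at least 4 comma-separated values")
    # The fifth field is optional.
    server_cert = parts[4] if len(parts) >= 5 else None
    return SecuritySettings(parts[0], parts[1], parts[2], parts[3], server_cert)


settings = parse_security_string("Basic256Sha256,SignAndEncrypt,cert.pem,key.pem")
```

In the real method the policy name is then resolved to a `SecurityPolicy...` class and the mode to a `ua.MessageSecurityMode` member before `set_security` is called.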