Passed
Pull Request — master (#156)
by
unknown
02:27
created

asyncua.client.client.Client._renew_channel_loop()   A

Complexity

Conditions 4

Size

Total Lines 20
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 14
nop 1
dl 0
loc 20
rs 9.7
c 0
b 0
f 0
1
import asyncio
2
import logging
3
from typing import Union, Coroutine
4
from urllib.parse import urlparse
5
6
from asyncua import ua
7
from .ua_client import UaClient
8
from ..common.xmlimporter import XmlImporter
9
from ..common.xmlexporter import XmlExporter
10
from ..common.node import Node
11
from ..common.manage_nodes import delete_nodes
12
from ..common.subscription import Subscription
13
from ..common.shortcuts import Shortcuts
14
from ..common.structures import load_type_definitions, load_enums
15
from ..common.utils import create_nonce
16
from ..common.ua_utils import value_to_datavalue
17
from ..crypto import uacrypto, security_policies
18
from ..ua import ServerState
19
20
_logger = logging.getLogger(__name__)
21
22
23
class Client:
24
    """
25
    High level client to connect to an OPC-UA server.
26
27
    This class makes it easy to connect and browse address space.
28
    It attempts to expose as much functionality as possible
29
    but if you want more flexibility it is possible and advised to
30
    use UaClient object, available as self.uaclient
31
    which offers the raw OPC-UA services interface.
32
    """
33
    def __init__(self, url: str, timeout: int = 4, loop=None):
        """
        Set up the client state for the server at ``url``.

        :param url: url of the server.
            If you are unsure of the url, give at least hostname and port
            and call get_endpoints. A username and password embedded in the
            url are taken as the initial credentials.

        :param timeout:
            Each request sent to the server expects an answer within this
            time. The timeout is specified in seconds.

        :param loop: event loop to use; when falsy,
            ``asyncio.get_event_loop()`` is used.

        Some other client parameters can be changed by setting
        attributes on the constructed object, see the assignments below.
        """
        self.loop = loop or asyncio.get_event_loop()

        # server address; credentials embedded in the url are the initial ones
        self.server_url = urlparse(url)
        self._username = self.server_url.username
        self._password = self.server_url.password

        # how this application describes itself when a session is created
        self.name = "Pure Python Async. Client"
        self.description = self.name
        self.application_uri = "urn:freeopcua:client"
        self.product_uri = "urn:freeopcua.github.io:client"

        # secure channel / session settings (timeouts are in milliseconds)
        self.security_policy = ua.SecurityPolicy()
        self.secure_channel_id = None
        self.secure_channel_timeout = 3600000  # 1 hour
        self.session_timeout = 3600000  # 1 hour
        self._policy_ids = []

        self.uaclient: UaClient = UaClient(timeout, loop=self.loop)

        # certificate based user authentication and session bookkeeping
        self.user_certificate = None
        self.user_private_key = None
        self._server_nonce = None
        self._session_counter = 1

        self.nodes = Shortcuts(self.uaclient)
        self.max_messagesize = 0  # No limits
        self.max_chunkcount = 0  # No limits
        self._renew_channel_task = None
70
71
    async def __aenter__(self):
        """
        Async context manager entry: connect and hand back the client itself.
        """
        await self.connect()
        return self
74
75
    async def __aexit__(self, exc_type, exc_value, traceback):
        """
        Async context manager exit: disconnect, whatever happened in the body.
        """
        await self.disconnect()
77
78
    def __str__(self):
79
        return f"Client({self.server_url.geturl()})"
80
81
    __repr__ = __str__
82
83
    @staticmethod
    def find_endpoint(endpoints, security_mode, policy_uri):
        """
        Return the first endpoint with the required security mode and policy URI.

        Only endpoints whose url starts with ``ua.OPC_TCP_SCHEME`` are considered.

        :raises ua.UaError: if no endpoint matches
        """
        _logger.info("find_endpoint %r %r %r", endpoints, security_mode, policy_uri)
        for candidate in endpoints:
            if not candidate.EndpointUrl.startswith(ua.OPC_TCP_SCHEME):
                continue
            if candidate.SecurityMode == security_mode and candidate.SecurityPolicyUri == policy_uri:
                return candidate
        raise ua.UaError("No matching endpoints: {0}, {1}".format(security_mode, policy_uri))
93
94
    def set_user(self, username: str):
95
        """
96
        Set user name for the connection.
97
        initial user from the URL will be overwritten
98
        """
99
        self._username = username
100
101
    def set_password(self, pwd: str):
102
        """
103
        Set user password for the connection.
104
        initial password from the URL will be overwritten
105
        """
106
        if not isinstance(pwd, str):
107
            raise TypeError(f"Password must be a string, got {pwd} of type {type(pwd)}")
108
        self._password = pwd
109
110
    async def set_security_string(self, string: str):
111
        """
112
        Set SecureConnection mode.
113
114
        :param string: Mode format ``Policy,Mode,certificate,private_key[,server_private_key]``
115
116
        where:
117
118
        - ``Policy`` is ``Basic128Rsa15``, ``Basic256`` or ``Basic256Sha256``
119
        - ``Mode`` is ``Sign`` or ``SignAndEncrypt``
120
        - ``certificate``, ``private_key`` and ``server_private_key`` are paths to ``.pem`` or ``.der`` files
121
122
        Call this before connect()
123
        """
124
        if not string:
125
            return
126
        parts = string.split(",")
127
        if len(parts) < 4:
128
            raise ua.UaError("Wrong format: `{}`, expected at least 4 comma-separated values".format(string))
129
        policy_class = getattr(security_policies, "SecurityPolicy{}".format(parts[0]))
130
        mode = getattr(ua.MessageSecurityMode, parts[1])
131
        return await self.set_security(policy_class, parts[2], parts[3], parts[4] if len(parts) >= 5 else None, mode)
132
133
    async def set_security(self,
                           policy,
                           certificate_path: str,
                           private_key_path: str,
                           server_certificate_path: str = None,
                           mode: ua.MessageSecurityMode = ua.MessageSecurityMode.SignAndEncrypt):
        """
        Set SecureConnection mode.
        Call this before connect()

        :param policy: security policy class, instantiated here with the
            server certificate, our certificate, our private key and the mode
        :param certificate_path: path of our certificate
        :param private_key_path: path of our private key
        :param server_certificate_path: path of the server certificate; when
            None the certificate is taken from the matching endpoint that the
            server itself advertises
        :param mode: message security mode
        """
        if server_certificate_path is not None:
            server_cert = await uacrypto.load_certificate(server_certificate_path)
        else:
            # no file given: ask the server for its endpoints and use the
            # certificate of the one matching the requested mode and policy
            advertised = await self.connect_and_get_server_endpoints()
            matching = Client.find_endpoint(advertised, mode, policy.URI)
            server_cert = uacrypto.x509_from_der(matching.ServerCertificate)
        client_cert = await uacrypto.load_certificate(certificate_path)
        private_key = await uacrypto.load_private_key(private_key_path)
        self.security_policy = policy(server_cert, client_cert, private_key, mode)
        self.uaclient.set_security(self.security_policy)
154
155
    async def load_client_certificate(self, path: str):
        """
        Load our user certificate from a file (pem or der) and keep it
        as ``self.user_certificate``.
        """
        certificate = await uacrypto.load_certificate(path)
        self.user_certificate = certificate
160
161
    async def load_private_key(self, path, password=None, format=None):
        """
        Load the user private key and keep it as ``self.user_private_key``.
        This key is used for authenticating with a certificate.

        ``password`` and ``format`` are passed through to
        ``uacrypto.load_private_key``.
        """
        private_key = await uacrypto.load_private_key(path, password, format)
        self.user_private_key = private_key
166
167
    async def connect_and_get_server_endpoints(self):
        """
        Connect, ask the server for its endpoints, and disconnect.

        The socket is released even if one of the steps fails.
        """
        await self.connect_socket()
        try:
            await self.send_hello()
            await self.open_secure_channel()
            found = await self.get_endpoints()
            await self.close_secure_channel()
            return found
        finally:
            self.disconnect_socket()
180
181
    async def connect_and_find_servers(self):
        """
        Connect, ask the server for the list of servers it knows, and disconnect.

        The socket is released even if one of the steps fails.
        """
        await self.connect_socket()
        try:
            await self.send_hello()
            # spec says it should not be necessary to open channel
            await self.open_secure_channel()
            known_servers = await self.find_servers()
            await self.close_secure_channel()
            return known_servers
        finally:
            self.disconnect_socket()
194
195
    async def connect_and_find_servers_on_network(self):
        """
        Connect, ask the server for the servers known on the network, and disconnect.

        The socket is released even if one of the steps fails.
        """
        await self.connect_socket()
        try:
            await self.send_hello()
            await self.open_secure_channel()
            network_servers = await self.find_servers_on_network()
            await self.close_secure_channel()
            return network_servers
        finally:
            self.disconnect_socket()
208
209
    async def connect(self):
        """
        High level method
        Connect, create and activate session

        The session is activated with the currently configured user name,
        password and user certificate. If any step after the socket has been
        opened fails, the socket is closed again and the error is re-raised.
        """
        _logger.info("connect")
        await self.connect_socket()
        try:
            await self.send_hello()
            await self.open_secure_channel()
            await self.create_session()
            # activation is part of the guarded section: a rejected login must
            # not leave the socket open behind the caller's back
            await self.activate_session(
                username=self._username, password=self._password, certificate=self.user_certificate
            )
        except Exception:
            # create_session() may already have started the channel renewal
            # task; it would otherwise keep running against a closed socket
            if self._renew_channel_task is not None:
                self._renew_channel_task.cancel()
            # clean up open socket
            self.disconnect_socket()
            raise
225
226
    async def disconnect(self):
        """
        High level method
        Close the session, then the secure channel, and finally the socket.

        The socket is closed even when closing the session or the channel fails.
        """
        _logger.info("disconnect")
        try:
            await self.close_session()
            await self.close_secure_channel()
        finally:
            # always release the socket, whatever happened above
            self.disconnect_socket()
237
238
    async def connect_socket(self):
        """
        Connect the underlying UaClient to the host and port given in the url.
        """
        target = self.server_url
        await self.uaclient.connect_socket(target.hostname, target.port)
243
244
    def disconnect_socket(self):
        """
        Ask the underlying UaClient to disconnect its socket.
        """
        self.uaclient.disconnect_socket()
246
247
    async def is_server_running(self):
        """
        High level method to know if the connection is still Good.
        Reads the ServerState node of the server and evaluates it.

        Returns:
            True when the state is ServerState.Running
        Raises:
            ua.UaError: OPCUA server is not running
        """
        state_node = self.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
        state = await state_node.get_value()
        if state != ServerState.Running:
            raise ua.UaError("Server is not running. Server state: {0}".format(ServerState(state).name))
        return True
260
261
    async def send_hello(self):
        """
        Send OPC-UA hello to server

        :raises ua.UaStatusCodeError: when that is what the hello call returned
        """
        url = self.server_url.geturl()
        reply = await self.uaclient.send_hello(url, self.max_messagesize, self.max_chunkcount)
        if isinstance(reply, ua.UaStatusCodeError):
            # the error is handed back as a value, turn it into a real exception
            raise reply
268
269
    async def open_secure_channel(self, renew=False):
        """
        Open secure channel, if renew is True, renew channel

        When the lifetime in the response differs from the requested one,
        ``self.secure_channel_timeout`` is updated to the revised value.
        """
        request_type = ua.SecurityTokenRequestType.Renew if renew else ua.SecurityTokenRequestType.Issue
        params = ua.OpenSecureChannelParameters()
        params.ClientProtocolVersion = 0
        params.RequestType = request_type
        params.SecurityMode = self.security_policy.Mode
        params.RequestedLifetime = self.secure_channel_timeout
        # length should be equal to the length of key of symmetric encryption
        params.ClientNonce = create_nonce(self.security_policy.symmetric_key_size)
        result = await self.uaclient.open_secure_channel(params)
        revised_lifetime = result.SecurityToken.RevisedLifetime
        if revised_lifetime != self.secure_channel_timeout:
            _logger.info("Requested secure channel timeout to be %dms, got %dms instead", self.secure_channel_timeout, revised_lifetime)
            self.secure_channel_timeout = revised_lifetime
286
287
    async def close_secure_channel(self):
        """
        Close the secure channel through the underlying UaClient and
        return whatever that call returns.
        """
        return await self.uaclient.close_secure_channel()
289
290
    async def get_endpoints(self) -> list:
        """
        Get a list of OPC-UA endpoints, queried for the url of this client.
        """
        request = ua.GetEndpointsParameters()
        request.EndpointUrl = self.server_url.geturl()
        return await self.uaclient.get_endpoints(request)
296
297
    async def register_server(self, server, discovery_configuration=None):
        """
        Register a server to a discovery server.

        If discovery_configuration is provided, the newer register_server2
        service call is used, otherwise the plain register_server call.
        """
        registration = ua.RegisteredServer()
        registration.ServerUri = server.get_application_uri()
        registration.ProductUri = server.product_uri
        registration.DiscoveryUrls = [server.endpoint.geturl()]
        registration.ServerType = server.application_type
        registration.ServerNames = [ua.LocalizedText(server.name)]
        registration.IsOnline = True
        if not discovery_configuration:
            return await self.uaclient.register_server(registration)
        params = ua.RegisterServer2Parameters()
        params.Server = registration
        params.DiscoveryConfiguration = discovery_configuration
        return await self.uaclient.register_server2(params)
315
316
    async def find_servers(self, uris=None):
        """
        Send a FindServer request to the server. The answer should be a list
        of servers the server knows about.

        A list of uris can be provided, only servers having matching uris
        will be returned. ``None`` is sent as an empty list.
        """
        request = ua.FindServersParameters()
        request.EndpointUrl = self.server_url.geturl()
        request.ServerUris = [] if uris is None else uris
        return await self.uaclient.find_servers(request)
328
329
    async def find_servers_on_network(self):
        """
        Send a FindServersOnNetwork request with default parameters and
        return the answer of the underlying UaClient.
        """
        return await self.uaclient.find_servers_on_network(ua.FindServersOnNetworkParameters())
332
333
    async def create_session(self):
        """
        Send a CreateSessionRequest to the server with reasonable parameters.
        If you want to modify settings look at the code of this method
        and make your own.

        Besides sending the request this method checks the server signature
        over our certificate and nonce, stores the server nonce, remembers
        the user identity token policies of the matching endpoint (used by
        activate_session), adopts the session timeout revised by the server
        and starts the secure channel renewal task.

        :return: the response of the create session call
        :raises ua.UaError: if the certificate in the response differs from the
            configured server certificate, or no endpoint in the response
            matches the current security mode and policy
        """
        desc = ua.ApplicationDescription()
        desc.ApplicationUri = self.application_uri
        desc.ProductUri = self.product_uri
        desc.ApplicationName = ua.LocalizedText(self.name)
        desc.ApplicationType = ua.ApplicationType.Client
        params = ua.CreateSessionParameters()
        # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
        nonce = create_nonce(32)
        params.ClientNonce = nonce
        params.ClientCertificate = self.security_policy.client_certificate
        params.ClientDescription = desc
        params.EndpointUrl = self.server_url.geturl()
        params.SessionName = f"{self.description} Session{self._session_counter}"
        # Requested maximum number of milliseconds that a Session should remain open without activity
        params.RequestedSessionTimeout = self.session_timeout
        params.MaxResponseMessageSize = 0  # means no max size
        response = await self.uaclient.create_session(params)
        # the server signs our certificate (if any) followed by our nonce
        if self.security_policy.client_certificate is None:
            data = nonce
        else:
            data = self.security_policy.client_certificate + nonce
        self.security_policy.asymmetric_cryptography.verify(data, response.ServerSignature.Signature)
        self._server_nonce = response.ServerNonce
        if not self.security_policy.server_certificate:
            self.security_policy.server_certificate = response.ServerCertificate
        elif self.security_policy.server_certificate != response.ServerCertificate:
            raise ua.UaError("Server certificate mismatch")
        # remember PolicyId's: we will use them in activate_session()
        ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
        self._policy_ids = ep.UserIdentityTokens
        #  Actual maximum number of milliseconds that a Session shall remain open without activity
        if self.session_timeout != response.RevisedSessionTimeout:
            # report the session timeout that was actually requested
            _logger.warning("Requested session timeout to be %dms, got %dms instead", self.session_timeout, response.RevisedSessionTimeout)
            self.session_timeout = response.RevisedSessionTimeout
        self._renew_channel_task = self.loop.create_task(self._renew_channel_loop())
        return response
375
376
    async def _renew_channel_loop(self):
        """
        Renew the SecureChannel before the SessionTimeout will happen.
        In theory we could do that only if no session activity
        but it does not cost much..

        Runs until cancelled. Cancellation ends the loop silently; any other
        error is logged and re-raised, so it surfaces when the task is awaited.
        """
        try:
            while True:
                # Recomputed on every round: open_secure_channel() may adopt a
                # lifetime revised by the server while renewing.
                # 0.7 is from spec. 0.001 is because asyncio.sleep expects time in seconds
                duration = min(self.session_timeout, self.secure_channel_timeout) * 0.7 / 1000
                await asyncio.sleep(duration)
                _logger.debug("renewing channel")
                await self.open_secure_channel(renew=True)
                val = await self.nodes.server_state.read_value()
                _logger.debug("server state is: %s ", val)
        except asyncio.CancelledError:
            pass
        except Exception:
            _logger.exception("Error while renewing session")
            raise
396
397
    def server_policy_id(self, token_type, default):
398
        """
399
        Find PolicyId of server's UserTokenPolicy by token_type.
400
        Return default if there's no matching UserTokenPolicy.
401
        """
402
        for policy in self._policy_ids:
403
            if policy.TokenType == token_type:
404
                return policy.PolicyId
405
        return default
406
407
    def server_policy_uri(self, token_type):
408
        """
409
        Find SecurityPolicyUri of server's UserTokenPolicy by token_type.
410
        If SecurityPolicyUri is empty, use default SecurityPolicyUri
411
        of the endpoint
412
        """
413
        for policy in self._policy_ids:
414
            if policy.TokenType == token_type:
415
                if policy.SecurityPolicyUri:
416
                    return policy.SecurityPolicyUri
417
                # empty URI means "use this endpoint's policy URI"
418
                return self.security_policy.URI
419
        return self.security_policy.URI
420
421
    async def activate_session(self, username: str = None, password: str = None, certificate=None):
        """
        Activate session using either username and password or private_key

        A certificate, when given, takes precedence over the user name; with
        neither of them the session is activated anonymously.
        """
        params = ua.ActivateSessionParameters()
        # data to sign: server certificate followed by the last server nonce
        challenge = b""
        if self.security_policy.server_certificate is not None:
            challenge += self.security_policy.server_certificate
        if self._server_nonce is not None:
            challenge += self._server_nonce
        # fall back to the Basic256 signature URI when the policy has none
        params.ClientSignature.Algorithm = (
            self.security_policy.AsymmetricSignatureURI
            or security_policies.SecurityPolicyBasic256.AsymmetricSignatureURI
        )
        params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
        params.LocaleIds.append("en")
        if certificate:
            self._add_certificate_auth(params, certificate, challenge)
        elif username:
            self._add_user_auth(params, username, password)
        else:
            self._add_anonymous_auth(params)
        return await self.uaclient.activate_session(params)
446
447
    def _add_anonymous_auth(self, params):
        """
        Put an anonymous identity token into the activate session parameters.
        """
        policy_id = self.server_policy_id(ua.UserTokenType.Anonymous, "anonymous")
        params.UserIdentityToken = ua.AnonymousIdentityToken()
        params.UserIdentityToken.PolicyId = policy_id
450
451
    def _add_certificate_auth(self, params, certificate, challenge):
        """
        Put an X509 identity token for ``certificate`` into the activate
        session parameters, together with a signature of ``challenge`` made
        with the user private key.
        """
        params.UserIdentityToken = ua.X509IdentityToken()
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, "certificate_basic256")
        params.UserIdentityToken.CertificateData = uacrypto.der_from_x509(certificate)
        # specs part 4, 5.6.3.1: the data to sign is created by appending
        # the last serverNonce to the serverCertificate
        signature = uacrypto.sign_sha1(self.user_private_key, challenge)
        params.UserTokenSignature = ua.SignatureData()
        params.UserTokenSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.UserTokenSignature.Signature = signature
461
462
    def _add_user_auth(self, params, username: str, password: str):
        """
        Put a user name identity token into the activate session parameters.

        :param params: activate session parameters to fill in
        :param username: user name to send
        :param password: password to send; when empty or None no password is
            put into the token

        The decision whether to send a password is based on the ``password``
        argument itself, so an explicitly passed password is honoured even if
        it differs from the one stored on the client.
        """
        params.UserIdentityToken = ua.UserNameIdentityToken()
        params.UserIdentityToken.UserName = username
        policy_uri = self.server_policy_uri(ua.UserTokenType.UserName)
        if not policy_uri or policy_uri == security_policies.POLICY_NONE_URI:
            # see specs part 4, 7.36.3: if the token is NOT encrypted,
            # then the password only contains UTF-8 encoded password
            # and EncryptionAlgorithm is null
            if password:
                _logger.warning("Sending plain-text password")
                params.UserIdentityToken.Password = password.encode("utf8")
            params.UserIdentityToken.EncryptionAlgorithm = None
        elif password:
            data, uri = self._encrypt_password(password, policy_uri)
            params.UserIdentityToken.Password = data
            params.UserIdentityToken.EncryptionAlgorithm = uri
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, "username_basic256")
479
480
    def _encrypt_password(self, password: str, policy_uri):
        """
        Encrypt ``password`` with the public key of the server certificate.

        :return: the ``(data, uri)`` pair produced by
            ``security_policies.encrypt_asymmetric``
        """
        server_cert = uacrypto.x509_from_der(self.security_policy.server_certificate)
        public_key = server_cert.public_key()
        # see specs part 4, 7.36.3: if the token is encrypted, password
        # shall be converted to UTF-8 and serialized with server nonce
        secret = password.encode("utf8")
        if self._server_nonce is not None:
            secret += self._server_nonce
        packed = ua.ua_binary.Primitives.Bytes.pack(secret)
        return security_policies.encrypt_asymmetric(public_key, packed, policy_uri)
490
491
    async def close_session(self):
        """
        Close session

        Stops the secure channel renewal task, if create_session started one,
        and then asks the server to close the session. It is safe to call
        this when no session was ever created.

        :return: whatever the close session call of the UaClient returns
        """
        renew_task = self._renew_channel_task
        if renew_task is not None:
            self._renew_channel_task = None
            renew_task.cancel()
            try:
                await renew_task
            except asyncio.CancelledError:
                # the task was cancelled before it got a chance to run
                pass
            except Exception:
                # The renewal loop had already failed on its own (it logs its
                # error before re-raising); that must not keep us from
                # closing the session.
                _logger.warning("Channel renewal task had failed before the session was closed", exc_info=True)
        return await self.uaclient.close_session(True)
498
499
    def get_root_node(self):
        """
        Return a Node object for the RootFolder node id.
        """
        root_id = ua.TwoByteNodeId(ua.ObjectIds.RootFolder)
        return self.get_node(root_id)
501
502
    def get_objects_node(self):
        """
        Return a Node object for the ObjectsFolder node id.
        """
        _logger.info("get_objects_node")
        objects_id = ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder)
        return self.get_node(objects_id)
505
506
    def get_server_node(self):
        """
        Return a Node object for the Server node id.
        """
        server_id = ua.FourByteNodeId(ua.ObjectIds.Server)
        return self.get_node(server_id)
508
509
    def get_node(self, nodeid: Union[ua.NodeId, str]) -> Node:
        """
        Build a Node bound to this client's UaClient from a NodeId object
        or from a string representing a NodeId.
        """
        node = Node(self.uaclient, nodeid)
        return node
514
515
    async def create_subscription(self, period, handler):
        """
        Create a subscription.
        Returns an initialised Subscription object which allows to subscribe
        to events or data changes on server.

        :param period: Either a publishing interval in milliseconds or a `CreateSubscriptionParameters` instance.
            The second option should be used, if the asyncua-server has problems with the default options.

        :param handler: Class instance with data_change and/or event methods (see `SubHandler`
            base class for details). Remember not to block the main event loop inside the handler methods.
        """
        params = period
        if not isinstance(params, ua.CreateSubscriptionParameters):
            # only an interval was given: fill in the default options
            params = ua.CreateSubscriptionParameters()
            params.RequestedPublishingInterval = period
            params.RequestedLifetimeCount = 10000
            params.RequestedMaxKeepAliveCount = 3000
            params.MaxNotificationsPerPublish = 10000
            params.PublishingEnabled = True
            params.Priority = 0
        new_subscription = Subscription(self.uaclient, params, handler)
        await new_subscription.init()
        return new_subscription
539
540
    def get_namespace_array(self) -> Coroutine:
        """
        Return the (not yet awaited) read of the Server_NamespaceArray node value.
        """
        array_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        return array_node.read_value()
543
544
    async def get_namespace_index(self, uri):
        """
        Return the position of ``uri`` in the namespace array of the server.

        The lookup uses ``index()`` on the value read from the server, so an
        unknown uri makes that call fail.
        """
        namespaces = await self.get_namespace_array()
        _logger.info("get_namespace_index %s %r", type(namespaces), namespaces)
        return namespaces.index(uri)
548
549
    def delete_nodes(self, nodes, recursive=False) -> Coroutine:
        """
        Delegate to the module level ``delete_nodes`` helper with this
        client's UaClient and return its result without awaiting it.
        """
        return delete_nodes(self.uaclient, nodes, recursive)
551
552
    def import_xml(self, path=None, xmlstring=None) -> Coroutine:
        """
        Import nodes defined in xml, given as a file path or as a string.

        Returns the result of ``XmlImporter.import_xml`` without awaiting it.
        """
        return XmlImporter(self).import_xml(path, xmlstring)
558
559
    async def export_xml(self, nodes, path):
        """
        Export the given nodes to an xml file at ``path``.
        """
        exporter = XmlExporter(self)
        await exporter.build_etree(nodes)
        await exporter.write_xml(path)
566
567 View Code Duplication
    async def register_namespace(self, uri):
        """
        Register a new namespace. Nodes should be in a custom namespace, not 0.
        This method is mainly implemented for symmetry with the server.

        :return: index of ``uri`` in the namespace array; an already
            registered uri is not added a second time
        """
        array_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        namespaces = await array_node.read_value()
        if uri not in namespaces:
            # unknown uri: append it, write the array back, it is now the last entry
            namespaces.append(uri)
            await array_node.write_value(namespaces)
            return len(namespaces) - 1
        return namespaces.index(uri)
579
580
    def load_type_definitions(self, nodes=None) -> Coroutine:
        """
        Load custom types (custom structures/extension objects) definitions from the server.
        Python classes are generated for the custom structures/extension objects
        defined in the server; these classes will be available in the ua module.

        Delegates to the module level ``load_type_definitions`` helper and
        returns its result without awaiting it.
        """
        return load_type_definitions(self, nodes)
587
588
    def load_enums(self) -> Coroutine:
        """
        Generate Python enums for the custom enums on the server.
        These enums will be available in the ua module.

        Delegates to the module level ``load_enums`` helper and returns its
        result without awaiting it.
        """
        return load_enums(self)
594
595
    async def register_nodes(self, nodes):
        """
        Register nodes for faster read and write access (if supported by server).

        Note: this call modifies the nodeid of the nodes, the original nodeid
        is kept available as node.basenodeid.

        :return: the same ``nodes`` that were passed in
        """
        original_ids = [node.nodeid for node in nodes]
        registered_ids = await self.uaclient.register_nodes(original_ids)
        for node, registered_id in zip(nodes, registered_ids):
            # keep the original id around, then switch to the registered one
            node.basenodeid, node.nodeid = node.nodeid, registered_id
        return nodes
607
608
    async def unregister_nodes(self, nodes):
        """
        Unregister nodes and give them back their original nodeid.

        Nodes without a basenodeid are sent to the server as well, but are
        otherwise left untouched.
        """
        current_ids = [node.nodeid for node in nodes]
        await self.uaclient.unregister_nodes(current_ids)
        for node in nodes:
            if node.basenodeid:
                node.nodeid = node.basenodeid
                node.basenodeid = None
619
620
    async def read_values(self, nodes):
        """
        Read the Value attribute of multiple nodes in one ua call and
        return the unwrapped values as a list, in the order of ``nodes``.
        """
        requested_ids = [node.nodeid for node in nodes]
        data_values = await self.uaclient.get_attributes(requested_ids, ua.AttributeIds.Value)
        return [data_value.Value.Value for data_value in data_values]
627
628
    async def write_values(self, nodes, values):
        """
        Write values to multiple nodes in one ua call.

        Each value is converted with ``value_to_datavalue`` and every result
        returned for the call is checked.
        """
        target_ids = [node.nodeid for node in nodes]
        data_values = [value_to_datavalue(value) for value in values]
        statuses = await self.uaclient.set_attributes(target_ids, data_values, ua.AttributeIds.Value)
        for status in statuses:
            status.check()
637
638
    get_values = read_values  # legacy compatibility
639
    set_values = write_values  # legacy compatibility
640