Passed
Pull Request — master (#157)
by
unknown
02:45
created

asyncua.client.client.Client.get_endpoints()   A

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 1
dl 0
loc 6
rs 10
c 0
b 0
f 0
1
import asyncio
2
import logging
3
from typing import Union, Coroutine
4
from urllib.parse import urlparse
5
6
from asyncua import ua
7
from .ua_client import UaClient, UASocketProtocol
8
from ..common.manage_nodes import delete_nodes
9
from ..common.node import Node
10
from ..common.shortcuts import Shortcuts
11
from ..common.structures import load_type_definitions, load_enums
12
from ..common.subscription import Subscription
13
from ..common.ua_utils import value_to_datavalue
14
from ..common.utils import create_nonce
15
from ..common.xmlexporter import XmlExporter
16
from ..common.xmlimporter import XmlImporter
17
from ..crypto import uacrypto, security_policies
18
19
_logger = logging.getLogger(__name__)
20
21
22
class Client:
23
    """
24
    High level client to connect to an OPC-UA server.
25
26
    This class makes it easy to connect and browse address space.
27
    It attempts to expose as much functionality as possible
28
    but if you want more flexibility it is possible and advised to
29
    use UaClient object, available as self.uaclient
30
    which offers the raw OPC-UA services interface.
31
    """
32
33
    def __init__(self, url: str, timeout: int = 4, loop=None):
34
        """
35
        :param url: url of the server.
36
            if you are unsure of url, write at least hostname
37
            and port and call get_endpoints
38
39
        :param timeout:
40
            Each request sent to the server expects an answer within this
41
            time. The timeout is specified in seconds.
42
43
        Some other client parameters can be changed by setting
44
        attributes on the constructed object:
45
        See the source code for the exhaustive list.
46
        """
47
        self.loop = loop or asyncio.get_event_loop()
48
        self.server_url = urlparse(url)
49
        # take initial username and password from the url
50
        self._username = self.server_url.username
51
        self._password = self.server_url.password
52
        self.name = "Pure Python Async. Client"
53
        self.description = self.name
54
        self.application_uri = "urn:freeopcua:client"
55
        self.product_uri = "urn:freeopcua.github.io:client"
56
        self.security_policy = ua.SecurityPolicy()
57
        self.secure_channel_id = None
58
        self.secure_channel_timeout = 3600000  # 1 hour
59
        self.session_timeout = 3600000  # 1 hour
60
        self._policy_ids = []
61
        self.uaclient: UaClient = UaClient(timeout, loop=self.loop)
62
        self.user_certificate = None
63
        self.user_private_key = None
64
        self._server_nonce = None
65
        self._session_counter = 1
66
        self.nodes = Shortcuts(self.uaclient)
67
        self.max_messagesize = 0  # No limits
68
        self.max_chunkcount = 0  # No limits
69
        self._renew_channel_task = None
70
71
    async def __aenter__(self):
72
        await self.connect()
73
        return self
74
75
    async def __aexit__(self, exc_type, exc_value, traceback):
76
        await self.disconnect()
77
78
    def __str__(self):
79
        return f"Client({self.server_url.geturl()})"
80
81
    __repr__ = __str__
82
83
    @staticmethod
84
    def find_endpoint(endpoints, security_mode, policy_uri):
85
        """
86
        Find endpoint with required security mode and policy URI
87
        """
88
        _logger.info("find_endpoint %r %r %r", endpoints, security_mode, policy_uri)
89
        for ep in endpoints:
90
            if (ep.EndpointUrl.startswith(
91
                    ua.OPC_TCP_SCHEME) and ep.SecurityMode == security_mode and ep.SecurityPolicyUri == policy_uri):
92
                return ep
93
        raise ua.UaError("No matching endpoints: {0}, {1}".format(security_mode, policy_uri))
94
95
    def set_user(self, username: str):
96
        """
97
        Set user name for the connection.
98
        initial user from the URL will be overwritten
99
        """
100
        self._username = username
101
102
    def set_password(self, pwd: str):
103
        """
104
        Set user password for the connection.
105
        initial password from the URL will be overwritten
106
        """
107
        if not isinstance(pwd, str):
108
            raise TypeError(f"Password must be a string, got {pwd} of type {type(pwd)}")
109
        self._password = pwd
110
111
    async def set_security_string(self, string: str):
112
        """
113
        Set SecureConnection mode.
114
115
        :param string: Mode format ``Policy,Mode,certificate,private_key[,server_private_key]``
116
117
        where:
118
119
        - ``Policy`` is ``Basic128Rsa15``, ``Basic256`` or ``Basic256Sha256``
120
        - ``Mode`` is ``Sign`` or ``SignAndEncrypt``
121
        - ``certificate``, ``private_key`` and ``server_private_key`` are paths to ``.pem`` or ``.der`` files
122
123
        Call this before connect()
124
        """
125
        if not string:
126
            return
127
        parts = string.split(",")
128
        if len(parts) < 4:
129
            raise ua.UaError("Wrong format: `{}`, expected at least 4 comma-separated values".format(string))
130
        policy_class = getattr(security_policies, "SecurityPolicy{}".format(parts[0]))
131
        mode = getattr(ua.MessageSecurityMode, parts[1])
132
        return await self.set_security(policy_class, parts[2], parts[3], parts[4] if len(parts) >= 5 else None, mode)
133
134
    async def set_security(self,
135
                           policy,
136
                           certificate_path: str,
137
                           private_key_path: str,
138
                           server_certificate_path: str = None,
139
                           mode: ua.MessageSecurityMode = ua.MessageSecurityMode.SignAndEncrypt):
140
        """
141
        Set SecureConnection mode.
142
        Call this before connect()
143
        """
144
        if server_certificate_path is None:
145
            # load certificate from server's list of endpoints
146
            endpoints = await self.connect_and_get_server_endpoints()
147
            endpoint = Client.find_endpoint(endpoints, mode, policy.URI)
148
            server_cert = uacrypto.x509_from_der(endpoint.ServerCertificate)
149
        else:
150
            server_cert = await uacrypto.load_certificate(server_certificate_path)
151
        cert = await uacrypto.load_certificate(certificate_path)
152
        pk = await uacrypto.load_private_key(private_key_path)
153
        self.security_policy = policy(server_cert, cert, pk, mode)
154
        self.uaclient.set_security(self.security_policy)
155
156
    async def load_client_certificate(self, path: str):
157
        """
158
        load our certificate from file, either pem or der
159
        """
160
        self.user_certificate = await uacrypto.load_certificate(path)
161
162
    async def load_private_key(self, path, password=None, format=None):
163
        """
164
        Load user private key. This is used for authenticating using certificate
165
        """
166
        self.user_private_key = await uacrypto.load_private_key(path, password, format)
167
168
    async def connect_and_get_server_endpoints(self):
169
        """
170
        Connect, ask server for endpoints, and disconnect
171
        """
172
        await self.connect_socket()
173
        try:
174
            await self.send_hello()
175
            await self.open_secure_channel()
176
            endpoints = await self.get_endpoints()
177
            await self.close_secure_channel()
178
        finally:
179
            self.disconnect_socket()
180
        return endpoints
181
182
    async def connect_and_find_servers(self):
183
        """
184
        Connect, ask server for a list of known servers, and disconnect
185
        """
186
        await self.connect_socket()
187
        try:
188
            await self.send_hello()
189
            await self.open_secure_channel()  # spec says it should not be necessary to open channel
190
            servers = await self.find_servers()
191
            await self.close_secure_channel()
192
        finally:
193
            self.disconnect_socket()
194
        return servers
195
196
    async def connect_and_find_servers_on_network(self):
197
        """
198
        Connect, ask server for a list of known servers on network, and disconnect
199
        """
200
        await self.connect_socket()
201
        try:
202
            await self.send_hello()
203
            await self.open_secure_channel()
204
            servers = await self.find_servers_on_network()
205
            await self.close_secure_channel()
206
        finally:
207
            self.disconnect_socket()
208
        return servers
209
210
    async def connect(self):
211
        """
212
        High level method
213
        Connect, create and activate session
214
        """
215
        _logger.info("connect")
216
        await self.connect_socket()
217
        try:
218
            await self.send_hello()
219
            await self.open_secure_channel()
220
            await self.create_session()
221
        except Exception:
222
            # clean up open socket
223
            self.disconnect_socket()
224
            raise
225
        await self.activate_session(username=self._username, password=self._password, certificate=self.user_certificate)
226
227
    async def disconnect(self):
228
        """
229
        High level method
230
        Close session, secure channel and socket
231
        """
232
        _logger.info("disconnect")
233
        try:
234
            await self.close_session()
235
            await self.close_secure_channel()
236
        finally:
237
            self.disconnect_socket()
238
239
    async def connect_socket(self):
240
        """
241
        connect to socket defined in url
242
        """
243
        await self.uaclient.connect_socket(self.server_url.hostname, self.server_url.port)
244
245
    def disconnect_socket(self):
246
        self.uaclient.disconnect_socket()
247
248
    async def send_hello(self):
249
        """
250
        Send OPC-UA hello to server
251
        """
252
        ack = await self.uaclient.send_hello(self.server_url.geturl(), self.max_messagesize, self.max_chunkcount)
253
        if isinstance(ack, ua.UaStatusCodeError):
254
            raise ack
255
256
    async def open_secure_channel(self, renew=False):
257
        """
258
        Open secure channel, if renew is True, renew channel
259
        """
260
        params = ua.OpenSecureChannelParameters()
261
        params.ClientProtocolVersion = 0
262
        params.RequestType = ua.SecurityTokenRequestType.Issue
263
        if renew:
264
            params.RequestType = ua.SecurityTokenRequestType.Renew
265
        params.SecurityMode = self.security_policy.Mode
266
        params.RequestedLifetime = self.secure_channel_timeout
267
        # length should be equal to the length of key of symmetric encryption
268
        params.ClientNonce = create_nonce(self.security_policy.symmetric_key_size)
269
        result = await self.uaclient.open_secure_channel(params)
270
        if self.secure_channel_timeout != result.SecurityToken.RevisedLifetime:
271
            _logger.info("Requested secure channel timeout to be %dms, got %dms instead", self.secure_channel_timeout,
272
                         result.SecurityToken.RevisedLifetime)
273
            self.secure_channel_timeout = result.SecurityToken.RevisedLifetime
274
275
    async def close_secure_channel(self):
276
        return await self.uaclient.close_secure_channel()
277
278
    async def get_endpoints(self) -> list:
279
        """Get a list of OPC-UA endpoints."""
280
281
        params = ua.GetEndpointsParameters()
282
        params.EndpointUrl = self.server_url.geturl()
283
        return await self.uaclient.get_endpoints(params)
284
285
    async def register_server(self, server, discovery_configuration=None):
286
        """
287
        register a server to discovery server
288
        if discovery_configuration is provided, the newer register_server2 service call is used
289
        """
290
        serv = ua.RegisteredServer()
291
        serv.ServerUri = server.get_application_uri()
292
        serv.ProductUri = server.product_uri
293
        serv.DiscoveryUrls = [server.endpoint.geturl()]
294
        serv.ServerType = server.application_type
295
        serv.ServerNames = [ua.LocalizedText(server.name)]
296
        serv.IsOnline = True
297
        if discovery_configuration:
298
            params = ua.RegisterServer2Parameters()
299
            params.Server = serv
300
            params.DiscoveryConfiguration = discovery_configuration
301
            return await self.uaclient.register_server2(params)
302
        return await self.uaclient.register_server(serv)
303
304
    async def find_servers(self, uris=None):
305
        """
306
        send a FindServer request to the server. The answer should be a list of
307
        servers the server knows about
308
        A list of uris can be provided, only server having matching uris will be returned
309
        """
310
        if uris is None:
311
            uris = []
312
        params = ua.FindServersParameters()
313
        params.EndpointUrl = self.server_url.geturl()
314
        params.ServerUris = uris
315
        return await self.uaclient.find_servers(params)
316
317
    async def find_servers_on_network(self):
318
        params = ua.FindServersOnNetworkParameters()
319
        return await self.uaclient.find_servers_on_network(params)
320
321
    async def create_session(self):
322
        """
323
        send a CreateSessionRequest to server with reasonable parameters.
324
        If you want o modify settings look at code of this methods
325
        and make your own
326
        """
327
        desc = ua.ApplicationDescription()
328
        desc.ApplicationUri = self.application_uri
329
        desc.ProductUri = self.product_uri
330
        desc.ApplicationName = ua.LocalizedText(self.name)
331
        desc.ApplicationType = ua.ApplicationType.Client
332
        params = ua.CreateSessionParameters()
333
        # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
334
        nonce = create_nonce(32)
335
        params.ClientNonce = nonce
336
        params.ClientCertificate = self.security_policy.client_certificate
337
        params.ClientDescription = desc
338
        params.EndpointUrl = self.server_url.geturl()
339
        params.SessionName = f"{self.description} Session{self._session_counter}"
340
        # Requested maximum number of milliseconds that a Session should remain open without activity
341
        params.RequestedSessionTimeout = self.session_timeout
342
        params.MaxResponseMessageSize = 0  # means no max size
343
        response = await self.uaclient.create_session(params)
344
        if self.security_policy.client_certificate is None:
345
            data = nonce
346
        else:
347
            data = self.security_policy.client_certificate + nonce
348
        self.security_policy.asymmetric_cryptography.verify(data, response.ServerSignature.Signature)
349
        self._server_nonce = response.ServerNonce
350
        if not self.security_policy.server_certificate:
351
            self.security_policy.server_certificate = response.ServerCertificate
352
        elif self.security_policy.server_certificate != response.ServerCertificate:
353
            raise ua.UaError("Server certificate mismatch")
354
        # remember PolicyId's: we will use them in activate_session()
355
        ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
356
        self._policy_ids = ep.UserIdentityTokens
357
        #  Actual maximum number of milliseconds that a Session shall remain open without activity
358
        if self.session_timeout != response.RevisedSessionTimeout:
359
            _logger.warning("Requested session timeout to be %dms, got %dms instead", self.secure_channel_timeout,
360
                            response.RevisedSessionTimeout)
361
            self.session_timeout = response.RevisedSessionTimeout
362
        self._renew_channel_task = self.loop.create_task(self._renew_channel_loop())
363
        return response
364
365
    async def _renew_channel_loop(self):
366
        """
367
        Renew the SecureChannel before the SessionTimeout will happen.
368
        In theory we could do that only if no session activity
369
        but it does not cost much..
370
        """
371
        try:
372
            duration = min(self.session_timeout, self.secure_channel_timeout) * 0.7 / 1000
373
            while True:
374
                # 0.7 is from spec. 0.001 is because asyncio.sleep expects time in seconds
375
                await asyncio.sleep(duration)
376
                _logger.debug("renewing channel")
377
                await self.open_secure_channel(renew=True)
378
                val = await self.nodes.server_state.read_value()
379
                _logger.debug("server state is: %s ", val)
380
        except asyncio.CancelledError:
381
            pass
382
        except:
383
            _logger.exception("Error while renewing session")
384
            raise
385
386
    def server_policy_id(self, token_type, default):
387
        """
388
        Find PolicyId of server's UserTokenPolicy by token_type.
389
        Return default if there's no matching UserTokenPolicy.
390
        """
391
        for policy in self._policy_ids:
392
            if policy.TokenType == token_type:
393
                return policy.PolicyId
394
        return default
395
396
    def server_policy_uri(self, token_type):
397
        """
398
        Find SecurityPolicyUri of server's UserTokenPolicy by token_type.
399
        If SecurityPolicyUri is empty, use default SecurityPolicyUri
400
        of the endpoint
401
        """
402
        for policy in self._policy_ids:
403
            if policy.TokenType == token_type:
404
                if policy.SecurityPolicyUri:
405
                    return policy.SecurityPolicyUri
406
                # empty URI means "use this endpoint's policy URI"
407
                return self.security_policy.URI
408
        return self.security_policy.URI
409
410
    async def activate_session(self, username: str = None, password: str = None, certificate=None):
411
        """
412
        Activate session using either username and password or private_key
413
        """
414
        params = ua.ActivateSessionParameters()
415
        challenge = b""
416
        if self.security_policy.server_certificate is not None:
417
            challenge += self.security_policy.server_certificate
418
        if self._server_nonce is not None:
419
            challenge += self._server_nonce
420
        if self.security_policy.AsymmetricSignatureURI:
421
            params.ClientSignature.Algorithm = self.security_policy.AsymmetricSignatureURI
422
        else:
423
            params.ClientSignature.Algorithm = (
424
                security_policies.SecurityPolicyBasic256.AsymmetricSignatureURI
425
            )
426
        params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
427
        params.LocaleIds.append("en")
428
        if not username and not certificate:
429
            self._add_anonymous_auth(params)
430
        elif certificate:
431
            self._add_certificate_auth(params, certificate, challenge)
432
        else:
433
            self._add_user_auth(params, username, password)
434
        return await self.uaclient.activate_session(params)
435
436
    def _add_anonymous_auth(self, params):
437
        params.UserIdentityToken = ua.AnonymousIdentityToken()
438
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, "anonymous")
439
440
    def _add_certificate_auth(self, params, certificate, challenge):
441
        params.UserIdentityToken = ua.X509IdentityToken()
442
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, "certificate_basic256")
443
        params.UserIdentityToken.CertificateData = uacrypto.der_from_x509(certificate)
444
        # specs part 4, 5.6.3.1: the data to sign is created by appending
445
        # the last serverNonce to the serverCertificate
446
        sig = uacrypto.sign_sha1(self.user_private_key, challenge)
447
        params.UserTokenSignature = ua.SignatureData()
448
        params.UserTokenSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
449
        params.UserTokenSignature.Signature = sig
450
451
    def _add_user_auth(self, params, username: str, password: str):
452
        params.UserIdentityToken = ua.UserNameIdentityToken()
453
        params.UserIdentityToken.UserName = username
454
        policy_uri = self.server_policy_uri(ua.UserTokenType.UserName)
455
        if not policy_uri or policy_uri == security_policies.POLICY_NONE_URI:
456
            # see specs part 4, 7.36.3: if the token is NOT encrypted,
457
            # then the password only contains UTF-8 encoded password
458
            # and EncryptionAlgorithm is null
459
            if self._password:
460
                _logger.warning("Sending plain-text password")
461
                params.UserIdentityToken.Password = password.encode("utf8")
462
            params.UserIdentityToken.EncryptionAlgorithm = None
463
        elif self._password:
464
            data, uri = self._encrypt_password(password, policy_uri)
465
            params.UserIdentityToken.Password = data
466
            params.UserIdentityToken.EncryptionAlgorithm = uri
467
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, "username_basic256")
468
469
    def _encrypt_password(self, password: str, policy_uri):
470
        pubkey = uacrypto.x509_from_der(self.security_policy.server_certificate).public_key()
471
        # see specs part 4, 7.36.3: if the token is encrypted, password
472
        # shall be converted to UTF-8 and serialized with server nonce
473
        passwd = password.encode("utf8")
474
        if self._server_nonce is not None:
475
            passwd += self._server_nonce
476
        etoken = ua.ua_binary.Primitives.Bytes.pack(passwd)
477
        data, uri = security_policies.encrypt_asymmetric(pubkey, etoken, policy_uri)
478
        return data, uri
479
480
    async def close_session(self) -> Coroutine:
481
        """
482
        Close session
483
        """
484
        self._renew_channel_task.cancel()
485
        await self._renew_channel_task
486
        return await self.uaclient.close_session(True)
487
488
    def get_root_node(self):
489
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
490
491
    def get_objects_node(self):
492
        _logger.info("get_objects_node")
493
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))
494
495
    def get_server_node(self):
496
        return self.get_node(ua.FourByteNodeId(ua.ObjectIds.Server))
497
498
    def get_node(self, nodeid: Union[ua.NodeId, str]) -> Node:
499
        """
500
        Get node using NodeId object or a string representing a NodeId.
501
        """
502
        return Node(self.uaclient, nodeid)
503
504
    async def create_subscription(self, period, handler):
505
        """
506
        Create a subscription.
507
        Returns a Subscription object which allows to subscribe to events or data changes on server.
508
509
        :param period: Either a publishing interval in milliseconds or a `CreateSubscriptionParameters` instance.
510
            The second option should be used, if the asyncua-server has problems with the default options.
511
512
        :param handler: Class instance with data_change and/or event methods (see `SubHandler`
513
            base class for details). Remember not to block the main event loop inside the handler methods.
514
        """
515
        if isinstance(period, ua.CreateSubscriptionParameters):
516
            params = period
517
        else:
518
            params = ua.CreateSubscriptionParameters()
519
            params.RequestedPublishingInterval = period
520
            params.RequestedLifetimeCount = 10000
521
            params.RequestedMaxKeepAliveCount = 3000
522
            params.MaxNotificationsPerPublish = 10000
523
            params.PublishingEnabled = True
524
            params.Priority = 0
525
        subscription = Subscription(self.uaclient, params, handler)
526
        await subscription.init()
527
        return subscription
528
529
    def get_namespace_array(self) -> Coroutine:
530
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
531
        return ns_node.read_value()
532
533
    async def get_namespace_index(self, uri):
534
        uries = await self.get_namespace_array()
535
        _logger.info("get_namespace_index %s %r", type(uries), uries)
536
        return uries.index(uri)
537
538
    def delete_nodes(self, nodes, recursive=False) -> Coroutine:
539
        return delete_nodes(self.uaclient, nodes, recursive)
540
541
    def import_xml(self, path=None, xmlstring=None) -> Coroutine:
542
        """
543
        Import nodes defined in xml
544
        """
545
        importer = XmlImporter(self)
546
        return importer.import_xml(path, xmlstring)
547
548
    async def export_xml(self, nodes, path):
549
        """
550
        Export defined nodes to xml
551
        """
552
        exp = XmlExporter(self)
553
        await exp.build_etree(nodes)
554
        await exp.write_xml(path)
555
556 View Code Duplication
    async def register_namespace(self, uri):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
557
        """
558
        Register a new namespace. Nodes should in custom namespace, not 0.
559
        This method is mainly implemented for symetry with server
560
        """
561
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
562
        uries = await ns_node.read_value()
563
        if uri in uries:
564
            return uries.index(uri)
565
        uries.append(uri)
566
        await ns_node.write_value(uries)
567
        return len(uries) - 1
568
569
    def load_type_definitions(self, nodes=None) -> Coroutine:
570
        """
571
        Load custom types (custom structures/extension objects) definition from server
572
        Generate Python classes for custom structures/extension objects defined in server
573
        These classes will available in ua module
574
        """
575
        return load_type_definitions(self, nodes)
576
577
    def load_enums(self) -> Coroutine:
578
        """
579
        generate Python enums for custom enums on server.
580
        This enums will be available in ua module
581
        """
582
        return load_enums(self)
583
584
    async def register_nodes(self, nodes):
585
        """
586
        Register nodes for faster read and write access (if supported by server)
587
        Rmw: This call modifies the nodeid of the nodes, the original nodeid is
588
        available as node.basenodeid
589
        """
590
        nodeids = [node.nodeid for node in nodes]
591
        nodeids = await self.uaclient.register_nodes(nodeids)
592
        for node, nodeid in zip(nodes, nodeids):
593
            node.basenodeid = node.nodeid
594
            node.nodeid = nodeid
595
        return nodes
596
597
    async def unregister_nodes(self, nodes):
598
        """
599
        Unregister nodes
600
        """
601
        nodeids = [node.nodeid for node in nodes]
602
        await self.uaclient.unregister_nodes(nodeids)
603
        for node in nodes:
604
            if not node.basenodeid:
605
                continue
606
            node.nodeid = node.basenodeid
607
            node.basenodeid = None
608
609
    async def read_values(self, nodes):
610
        """
611
        Read the value of multiple nodes in one ua call.
612
        """
613
        nodeids = [node.nodeid for node in nodes]
614
        results = await self.uaclient.get_attributes(nodeids, ua.AttributeIds.Value)
615
        return [result.Value.Value for result in results]
616
617
    async def write_values(self, nodes, values):
618
        """
619
        Write values to multiple nodes in one ua call
620
        """
621
        nodeids = [node.nodeid for node in nodes]
622
        dvs = [value_to_datavalue(val) for val in values]
623
        results = await self.uaclient.set_attributes(nodeids, dvs, ua.AttributeIds.Value)
624
        for result in results:
625
            result.check()
626
627
    async def is_connected(self):
628
        """
629
        Get OPC-UA client connection status.
630
        """
631
        return self.uaclient.protocol.state == UASocketProtocol.OPEN
632
633
    get_values = read_values  # legacy compatibility
634
    set_values = write_values  # legacy compatibility
635