Completed
Pull Request — master (#91)
by Olivier
41:49 queued 36:14
created

asyncua.client.client.Client.__aexit__()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 4
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
import asyncio
2
import logging
3
from typing import Union, Coroutine
4
from urllib.parse import urlparse
5
6
from asyncua import ua
7
from .ua_client import UaClient
8
from ..common.xmlimporter import XmlImporter
9
from ..common.xmlexporter import XmlExporter
10
from ..common.node import Node
11
from ..common.manage_nodes import delete_nodes
12
from ..common.subscription import Subscription
13
from ..common.shortcuts import Shortcuts
14
from ..common.structures import load_type_definitions, load_enums
15
from ..common.utils import create_nonce
16
from ..common.ua_utils import value_to_datavalue
17
from ..crypto import uacrypto, security_policies
18
19
_logger = logging.getLogger(__name__)
20
21
22
class Client:
23
    """
24
    High level client to connect to an OPC-UA server.
25
26
    This class makes it easy to connect and browse address space.
27
    It attempts to expose as much functionality as possible
28
    but if you want more flexibility it is possible and advised to
29
    use UaClient object, available as self.uaclient
30
    which offers the raw OPC-UA services interface.
31
    """
32
    def __init__(self, url: str, timeout: int = 4, loop=None):
        """
        Prepare a client for the server at ``url``. Nothing is sent to the
        server here; call connect() (or use ``async with``) for that.

        :param url: url of the server.
            if you are unsure of url, write at least hostname
            and port and call get_endpoints

        :param timeout:
            Each request sent to the server expects an answer within this
            time. The timeout is specified in seconds.

        :param loop: event loop to use. When falsy, the loop returned by
            ``asyncio.get_event_loop()`` is used.
        """
        _logger = logging.getLogger(__name__)
        _logger.debug("Client start instantiation")
        self.loop = loop or asyncio.get_event_loop()

        # server address; initial username and password are taken from the url
        parsed_url = urlparse(url)
        self.server_url = parsed_url
        self._username = parsed_url.username
        self._password = parsed_url.password

        # how this application describes itself to the server
        self.name = "Pure Python Async. Client"
        self.description = self.name
        self.application_uri = "urn:freeopcua:client"
        self.product_uri = "urn:freeopcua.github.io:client"

        # secure channel / session settings
        self.security_policy = ua.SecurityPolicy()
        self.secure_channel_id = None
        one_hour_ms = 3600000
        self.secure_channel_timeout = one_hour_ms  # 1 hour
        self.session_timeout = one_hour_ms  # 1 hour
        self._policy_ids = []

        # low level client offering the raw OPC-UA services
        self.uaclient: UaClient = UaClient(timeout, loop=self.loop)

        # user authentication material and session bookkeeping
        self.user_certificate = None
        self.user_private_key = None
        self._server_nonce = None
        self._session_counter = 1
        self.nodes = Shortcuts(self.uaclient)

        # transport limits, 0 means no limits
        self.max_messagesize = 0
        self.max_chunkcount = 0

        # background task created by create_session()
        self._renew_channel_task = None
        _logger.debug("Client instantiated %s", self)
68
69
    async def __aenter__(self):
70
        await self.connect()
71
        return self
72
73
    async def __aexit__(self, exc_type, exc_value, traceback):
74
        await self.disconnect()
75
76
    def __str__(self):
77
        return f"Client({self.server_url.geturl()})"
78
    __repr__ = __str__
79
80
    @staticmethod
    def find_endpoint(endpoints, security_mode, policy_uri):
        """
        Return the first endpoint whose url starts with the opc.tcp scheme
        and whose security mode and policy URI equal the requested ones.

        :raises ua.UaError: when no endpoint matches
        """
        _logger.info("find_endpoint %r %r %r", endpoints, security_mode, policy_uri)
        for candidate in endpoints:
            if not candidate.EndpointUrl.startswith(ua.OPC_TCP_SCHEME):
                # not a binary TCP endpoint
                continue
            if candidate.SecurityMode == security_mode and candidate.SecurityPolicyUri == policy_uri:
                return candidate
        raise ua.UaError("No matching endpoints: {0}, {1}".format(security_mode, policy_uri))
90
91
    def set_user(self, username: str):
92
        """
93
        Set user name for the connection.
94
        initial user from the URL will be overwritten
95
        """
96
        self._username = username
97
98
    def set_password(self, pwd: str):
99
        """
100
        Set user password for the connection.
101
        initial password from the URL will be overwritten
102
        """
103
        if not isinstance(pwd, str):
104
            raise TypeError(f"Password must be a string, got {pwd} of type {type(pwd)}")
105
        self._password = pwd
106
107
    async def set_security_string(self, string: str):
        """
        Set SecureConnection mode from a comma-separated string:
        Policy,Mode,certificate,private_key[,server_certificate]
        where Policy is the suffix of a ``SecurityPolicy*`` class of the
            security_policies module (e.g. Basic128Rsa15, Basic256 or Basic256Sha256),
            Mode is the name of a ua.MessageSecurityMode member (e.g. Sign or SignAndEncrypt),
            certificate, private_key and the optional server_certificate are
                paths which are handed over to set_security()
        An empty string is ignored.
        Call this before connect()

        :raises ua.UaError: when fewer than 4 values are given
        """
        if not string:
            return
        fields = string.split(",")
        if len(fields) < 4:
            raise ua.UaError("Wrong format: `{}`, expected at least 4 comma-separated values".format(string))
        policy_name, mode_name, certificate_path, private_key_path = fields[:4]
        # the fifth field is optional
        server_certificate_path = fields[4] if len(fields) >= 5 else None
        policy_class = getattr(security_policies, "SecurityPolicy{}".format(policy_name))
        mode = getattr(ua.MessageSecurityMode, mode_name)
        return await self.set_security(policy_class, certificate_path, private_key_path, server_certificate_path, mode)
125
126
    async def set_security(self,
                           policy,
                           certificate_path: str,
                           private_key_path: str,
                           server_certificate_path: str = None,
                           mode: ua.MessageSecurityMode = ua.MessageSecurityMode.SignAndEncrypt):
        """
        Set SecureConnection mode.
        Call this before connect()

        :param policy: security policy class; it is instantiated with the server
            certificate, our certificate, our private key and the mode
        :param certificate_path: path of our certificate
        :param private_key_path: path of our private key
        :param server_certificate_path: path of the server certificate. When None,
            the certificate is read from the matching endpoint announced by the server.
        :param mode: message security mode
        """
        if server_certificate_path is not None:
            server_cert = await uacrypto.load_certificate(server_certificate_path)
        else:
            # no file given: ask the server for its endpoints and use the
            # certificate of the endpoint matching mode and policy
            announced = await self.connect_and_get_server_endpoints()
            matching = Client.find_endpoint(announced, mode, policy.URI)
            server_cert = uacrypto.x509_from_der(matching.ServerCertificate)
        own_cert = await uacrypto.load_certificate(certificate_path)
        own_key = await uacrypto.load_private_key(private_key_path)
        self.security_policy = policy(server_cert, own_cert, own_key, mode)
        self.uaclient.set_security(self.security_policy)
147
148
    async def load_client_certificate(self, path: str):
        """
        Load the user certificate from file (pem or der) and keep it
        as self.user_certificate
        """
        certificate = await uacrypto.load_certificate(path)
        self.user_certificate = certificate
153
154
    async def load_private_key(self, path: str):
        """
        Load the user private key from file and keep it as self.user_private_key.
        The key is used when authenticating with a certificate.
        """
        private_key = await uacrypto.load_private_key(path)
        self.user_private_key = private_key
159
160
    async def connect_and_get_server_endpoints(self):
161
        """
162
        Connect, ask server for endpoints, and disconnect
163
        """
164
        await self.connect_socket()
165
        try:
166
            await self.send_hello()
167
            await self.open_secure_channel()
168
            endpoints = await self.get_endpoints()
169
            await self.close_secure_channel()
170
        finally:
171
            self.disconnect_socket()
172
        return endpoints
173
174
    async def connect_and_find_servers(self):
175
        """
176
        Connect, ask server for a list of known servers, and disconnect
177
        """
178
        await self.connect_socket()
179
        try:
180
            await self.send_hello()
181
            await self.open_secure_channel()  # spec says it should not be necessary to open channel
182
            servers = await self.find_servers()
183
            await self.close_secure_channel()
184
        finally:
185
            self.disconnect_socket()
186
        return servers
187
188
    async def connect_and_find_servers_on_network(self):
189
        """
190
        Connect, ask server for a list of known servers on network, and disconnect
191
        """
192
        await self.connect_socket()
193
        try:
194
            await self.send_hello()
195
            await self.open_secure_channel()
196
            servers = await self.find_servers_on_network()
197
            await self.close_secure_channel()
198
        finally:
199
            self.disconnect_socket()
200
        return servers
201
202
    async def connect(self):
        """
        High level method
        Connect, create and activate session

        The session is activated with the user name, password and user
        certificate currently stored on the client.
        If any step after opening the socket fails, the background channel
        renewal (if it was already started by create_session) is cancelled,
        the socket is closed and the exception is re-raised.
        """
        _logger.info("connect")
        await self.connect_socket()
        try:
            await self.send_hello()
            await self.open_secure_channel()
            await self.create_session()
            # activation is part of the guarded section: a rejected login
            # must not leave an open socket behind
            await self.activate_session(username=self._username, password=self._password, certificate=self.user_certificate)
        except Exception:
            # create_session() may already have started the renewal task;
            # stop it so it does not keep using the closed connection
            if self._renew_channel_task is not None:
                self._renew_channel_task.cancel()
            # clean up open socket
            self.disconnect_socket()
            raise
218
219
    async def disconnect(self):
        """
        High level method
        Close session, then secure channel; the socket is closed in
        every case, even when one of the two close requests fails.
        """
        _logger.info("disconnect")
        try:
            await self.close_session()
            await self.close_secure_channel()
        finally:
            # always release the socket
            self.disconnect_socket()
230
231
    async def connect_socket(self):
232
        """
233
        connect to socket defined in url
234
        """
235
        await self.uaclient.connect_socket(self.server_url.hostname, self.server_url.port)
236
237
    def disconnect_socket(self):
238
        self.uaclient.disconnect_socket()
239
240
    async def send_hello(self):
        """
        Send OPC-UA hello to server, announcing the server url and the
        configured message size / chunk count limits.

        :raises ua.UaStatusCodeError: when the answer is such an error object
        """
        url = self.server_url.geturl()
        answer = await self.uaclient.send_hello(url, self.max_messagesize, self.max_chunkcount)
        if not isinstance(answer, ua.UaStatusCodeError):
            return
        # the error is handed back as a value, turn it into an exception
        raise answer
247
248
    async def open_secure_channel(self, renew=False):
        """
        Open secure channel, if renew is True, renew channel

        The symmetric keys of the security policy are derived from our nonce
        and the server nonce, and self.secure_channel_timeout is updated with
        the lifetime revised by the server.
        """
        params = ua.OpenSecureChannelParameters()
        params.ClientProtocolVersion = 0
        params.RequestType = ua.SecurityTokenRequestType.Renew if renew else ua.SecurityTokenRequestType.Issue
        params.SecurityMode = self.security_policy.Mode
        params.RequestedLifetime = self.secure_channel_timeout
        # length should be equal to the length of key of symmetric encryption
        client_nonce = create_nonce(self.security_policy.symmetric_key_size)
        # this nonce is used to create a symmetric key
        params.ClientNonce = client_nonce
        answer = await self.uaclient.open_secure_channel(params)
        self.security_policy.make_symmetric_key(client_nonce, answer.ServerNonce)
        self.secure_channel_timeout = answer.SecurityToken.RevisedLifetime
265
266
    async def close_secure_channel(self):
267
        return await self.uaclient.close_secure_channel()
268
269
    async def get_endpoints(self) -> list:
        """
        Ask the server for its endpoints, using the server url as EndpointUrl
        """
        request = ua.GetEndpointsParameters()
        request.EndpointUrl = self.server_url.geturl()
        endpoints = await self.uaclient.get_endpoints(request)
        return endpoints
273
274
    async def register_server(self, server, discovery_configuration=None):
        """
        register a server to discovery server
        if discovery_configuration is provided, the newer register_server2 service call is used

        :param server: server object to announce; its application uri, product uri,
            endpoint url, application type and name are sent
        :param discovery_configuration: optional configuration for register_server2
        """
        registration = ua.RegisteredServer()
        registration.ServerUri = server.get_application_uri()
        registration.ProductUri = server.product_uri
        registration.DiscoveryUrls = [server.endpoint.geturl()]
        registration.ServerType = server.application_type
        registration.ServerNames = [ua.LocalizedText(server.name)]
        registration.IsOnline = True
        if not discovery_configuration:
            # plain registration
            return await self.uaclient.register_server(registration)
        request = ua.RegisterServer2Parameters()
        request.Server = registration
        request.DiscoveryConfiguration = discovery_configuration
        return await self.uaclient.register_server2(request)
292
293
    async def find_servers(self, uris=None):
        """
        send a FindServer request to the server. The answer should be a list of
        servers the server knows about
        A list of uris can be provided, only server having matching uris will be returned
        """
        request = ua.FindServersParameters()
        request.EndpointUrl = self.server_url.geturl()
        # no filter is sent as an empty list
        request.ServerUris = [] if uris is None else uris
        return await self.uaclient.find_servers(request)
305
306
    async def find_servers_on_network(self):
        """
        Send a FindServersOnNetwork request with default parameters
        and return the answer of the underlying UaClient.
        """
        request = ua.FindServersOnNetworkParameters()
        answer = await self.uaclient.find_servers_on_network(request)
        return answer
309
310
    async def create_session(self):
        """
        send a CreateSessionRequest to server with reasonable parameters.
        If you want to modify settings look at code of this method
        and make your own

        The requested session timeout is taken from self.session_timeout
        (1 hour unless changed before connecting); after the call
        self.session_timeout holds the value revised by the server.
        The server signature and certificate are checked, the user token
        policies of the matching endpoint are remembered for activate_session()
        and the background secure channel renewal task is started.

        :return: the CreateSession response
        :raises ua.UaError: if the server certificate differs from the one
            already known, or no endpoint of the response matches our security settings
        """
        desc = ua.ApplicationDescription()
        desc.ApplicationUri = self.application_uri
        desc.ProductUri = self.product_uri
        desc.ApplicationName = ua.LocalizedText(self.name)
        desc.ApplicationType = ua.ApplicationType.Client
        params = ua.CreateSessionParameters()
        # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
        nonce = create_nonce(32)
        params.ClientNonce = nonce
        params.ClientCertificate = self.security_policy.client_certificate
        params.ClientDescription = desc
        params.EndpointUrl = self.server_url.geturl()
        params.SessionName = f"{self.description} Session{self._session_counter}"
        # Requested maximum number of milliseconds that a Session should remain open without activity.
        # Use the configurable attribute instead of a hard-coded hour so that
        # a value set on the client before connecting is honoured.
        params.RequestedSessionTimeout = self.session_timeout
        params.MaxResponseMessageSize = 0  # means no max size
        response = await self.uaclient.create_session(params)
        # the server signs our certificate (if any) followed by our nonce
        if self.security_policy.client_certificate is None:
            data = nonce
        else:
            data = self.security_policy.client_certificate + nonce
        self.security_policy.asymmetric_cryptography.verify(data, response.ServerSignature.Signature)
        self._server_nonce = response.ServerNonce
        if not self.security_policy.server_certificate:
            self.security_policy.server_certificate = response.ServerCertificate
        elif self.security_policy.server_certificate != response.ServerCertificate:
            raise ua.UaError("Server certificate mismatch")
        # remember PolicyId's: we will use them in activate_session()
        ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
        self._policy_ids = ep.UserIdentityTokens
        #  Actual maximum number of milliseconds that a Session shall remain open without activity
        self.session_timeout = response.RevisedSessionTimeout
        self._renew_channel_task = self.loop.create_task(self._renew_channel_loop())
        return response
350
351
    async def _renew_channel_loop(self):
        """
        Background loop which periodically renews the SecureChannel.

        The period is 70% of the smaller of session timeout and secure channel
        timeout; it is computed once, when the loop starts.
        After each renewal the server state is read and logged.
        In theory we could do that only if no session activity
        but it does not cost much..
        Cancelling the task ends the loop silently.
        """
        try:
            shortest_timeout_ms = min(self.session_timeout, self.secure_channel_timeout)
            # 0.7 is from spec. 0.001 is because asyncio.sleep expects time in seconds
            interval = shortest_timeout_ms * 0.7 * 0.001
            while True:
                await asyncio.sleep(interval)
                _logger.debug("renewing channel")
                await self.open_secure_channel(renew=True)
                state = await self.nodes.server_state.get_value()
                _logger.debug("server state is: %s ", state)
        except asyncio.CancelledError:
            pass
368
369
    def server_policy_id(self, token_type, default):
370
        """
371
        Find PolicyId of server's UserTokenPolicy by token_type.
372
        Return default if there's no matching UserTokenPolicy.
373
        """
374
        for policy in self._policy_ids:
375
            if policy.TokenType == token_type:
376
                return policy.PolicyId
377
        return default
378
379
    def server_policy_uri(self, token_type):
380
        """
381
        Find SecurityPolicyUri of server's UserTokenPolicy by token_type.
382
        If SecurityPolicyUri is empty, use default SecurityPolicyUri
383
        of the endpoint
384
        """
385
        for policy in self._policy_ids:
386
            if policy.TokenType == token_type:
387
                if policy.SecurityPolicyUri:
388
                    return policy.SecurityPolicyUri
389
                # empty URI means "use this endpoint's policy URI"
390
                return self.security_policy.URI
391
        return self.security_policy.URI
392
393
    async def activate_session(self, username: str = None, password: str = None, certificate=None):
        """
        Activate session using either a certificate, a username and password
        or, when neither certificate nor username is given, anonymously.

        The client signature is computed over the server certificate followed
        by the last server nonce (whichever of the two is available).
        """
        params = ua.ActivateSessionParameters()
        policy = self.security_policy
        challenge = b""
        if policy.server_certificate is not None:
            challenge += policy.server_certificate
        if self._server_nonce is not None:
            challenge += self._server_nonce
        # fall back to rsa-sha1 when the policy does not name an algorithm
        params.ClientSignature.Algorithm = policy.AsymmetricSignatureURI or "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.ClientSignature.Signature = policy.asymmetric_cryptography.signature(challenge)
        params.LocaleIds.append("en")
        if certificate:
            # a certificate takes precedence over a user name
            self._add_certificate_auth(params, certificate, challenge)
        elif username:
            self._add_user_auth(params, username, password)
        else:
            self._add_anonymous_auth(params)
        return await self.uaclient.activate_session(params)
416
417
    def _add_anonymous_auth(self, params):
        """
        Put an anonymous identity token into params, using the server's
        policy id for anonymous tokens ("anonymous" if the server gave none).
        """
        params.UserIdentityToken = ua.AnonymousIdentityToken()
        policy_id = self.server_policy_id(ua.UserTokenType.Anonymous, "anonymous")
        params.UserIdentityToken.PolicyId = policy_id
420
421
    def _add_certificate_auth(self, params, certificate, challenge):
        """
        Put an X509 identity token for certificate into params and sign
        challenge with the user private key (rsa-sha1).
        """
        params.UserIdentityToken = ua.X509IdentityToken()
        policy_id = self.server_policy_id(ua.UserTokenType.Certificate, "certificate_basic256")
        params.UserIdentityToken.PolicyId = policy_id
        params.UserIdentityToken.CertificateData = uacrypto.der_from_x509(certificate)
        # specs part 4, 5.6.3.1: the data to sign is created by appending
        # the last serverNonce to the serverCertificate
        signature = uacrypto.sign_sha1(self.user_private_key, challenge)
        params.UserTokenSignature = ua.SignatureData()
        params.UserTokenSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.UserTokenSignature.Signature = signature
431
432
    def _add_user_auth(self, params, username: str, password: str):
        """
        Put a user name identity token into params.

        :param params: ActivateSession parameters to fill
        :param username: user name to send
        :param password: password to send; when empty or None no password is set.
            It is sent UTF-8 encoded in plain text if the user token policy
            URI is empty or the "None" policy, otherwise it is encrypted
            for the server.
        """
        params.UserIdentityToken = ua.UserNameIdentityToken()
        params.UserIdentityToken.UserName = username
        policy_uri = self.server_policy_uri(ua.UserTokenType.UserName)
        # Decide on the password actually passed in, not on self._password:
        # the two can differ when activate_session() is called directly, which
        # used to drop the password or to crash on None.encode().
        if not policy_uri or policy_uri == security_policies.POLICY_NONE_URI:
            # see specs part 4, 7.36.3: if the token is NOT encrypted,
            # then the password only contains UTF-8 encoded password
            # and EncryptionAlgorithm is null
            if password:
                _logger.warning("Sending plain-text password")
                params.UserIdentityToken.Password = password.encode("utf8")
            params.UserIdentityToken.EncryptionAlgorithm = None
        elif password:
            data, uri = self._encrypt_password(password, policy_uri)
            params.UserIdentityToken.Password = data
            params.UserIdentityToken.EncryptionAlgorithm = uri
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, "username_basic256")
449
450
    def _encrypt_password(self, password: str, policy_uri):
        """
        Encrypt password with the public key of the server certificate.

        :return: tuple of the encrypted data and the uri returned by
            security_policies.encrypt_asymmetric
        """
        server_cert = uacrypto.x509_from_der(self.security_policy.server_certificate)
        public_key = server_cert.public_key()
        # see specs part 4, 7.36.3: if the token is encrypted, password
        # shall be converted to UTF-8 and serialized with server nonce
        secret = password.encode("utf8")
        nonce = self._server_nonce
        if nonce is not None:
            secret = secret + nonce
        packed = ua.ua_binary.Primitives.Bytes.pack(secret)
        encrypted, algorithm_uri = security_policies.encrypt_asymmetric(public_key, packed, policy_uri)
        return encrypted, algorithm_uri
460
461
    async def close_session(self) -> Coroutine:
462
        """
463
        Close session
464
        """
465
        self._renew_channel_task.cancel()
466
        await self._renew_channel_task
467
        return await self.uaclient.close_session(True)
468
469
    def get_root_node(self):
        """
        Return the node object of the RootFolder
        """
        root_id = ua.TwoByteNodeId(ua.ObjectIds.RootFolder)
        return self.get_node(root_id)
471
472
    def get_objects_node(self):
        """
        Return the node object of the ObjectsFolder
        """
        _logger.info("get_objects_node")
        objects_id = ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder)
        return self.get_node(objects_id)
475
476
    def get_server_node(self):
        """
        Return the node object of the Server object
        """
        server_id = ua.FourByteNodeId(ua.ObjectIds.Server)
        return self.get_node(server_id)
478
479
    def get_node(self, nodeid: Union[ua.NodeId, str]) -> Node:
        """
        Build a Node bound to this client's UaClient from a NodeId
        object or a string representing a NodeId.
        """
        node = Node(self.uaclient, nodeid)
        return node
484
485
    async def create_subscription(self, period, handler):
        """
        Create a subscription.
        Returns a Subscription object which allows to subscribe to events or data changes on server.

        :param period: Either a publishing interval in milliseconds or a `CreateSubscriptionParameters` instance.
        The second option should be used, if the asyncua-server has problems with the default options.
        :param handler: Class instance with data_change and/or event methods (see `SubHandler`
        base class for details). Remember not to block the main event loop inside the handler methods.
        """
        params = period
        if not isinstance(period, ua.CreateSubscriptionParameters):
            # only an interval was given: fill in the default options
            params = ua.CreateSubscriptionParameters()
            params.RequestedPublishingInterval = period
            params.RequestedLifetimeCount = 10000
            params.RequestedMaxKeepAliveCount = 3000
            params.MaxNotificationsPerPublish = 10000
            params.PublishingEnabled = True
            params.Priority = 0
        new_subscription = Subscription(self.uaclient, params, handler)
        await new_subscription.init()
        return new_subscription
508
509
    def get_namespace_array(self) -> Coroutine:
        """
        Return the (not yet awaited) read of the value of the
        Server_NamespaceArray node.
        """
        array_id = ua.NodeId(ua.ObjectIds.Server_NamespaceArray)
        return self.get_node(array_id).get_value()
512
513
    async def get_namespace_index(self, uri):
        """
        Return the position of uri in the namespace array read from the server.
        The lookup uses ``index``, so an unknown uri makes it raise.
        """
        namespaces = await self.get_namespace_array()
        _logger.info("get_namespace_index %s %r", type(namespaces), namespaces)
        position = namespaces.index(uri)
        return position
517
518
    def delete_nodes(self, nodes, recursive=False) -> Coroutine:
        """
        Hand nodes and the recursive flag over to the delete_nodes function
        of the manage_nodes module, using this client's UaClient.
        The result of that call is returned without being awaited here.
        """
        pending = delete_nodes(self.uaclient, nodes, recursive)
        return pending
520
521
    def import_xml(self, path=None, xmlstring=None) -> Coroutine:
        """
        Import nodes defined in xml, given as a file path or as a string.
        The result of XmlImporter.import_xml is returned without being awaited here.
        """
        return XmlImporter(self).import_xml(path, xmlstring)
527
528
    async def export_xml(self, nodes, path):
        """
        Export the given nodes to the xml file at path
        """
        exporter = XmlExporter(self)
        # first build the tree from the nodes, then write it out
        await exporter.build_etree(nodes)
        await exporter.write_xml(path)
535
536 View Code Duplication
    async def register_namespace(self, uri):
        """
        Register a new namespace. Nodes should be in a custom namespace, not 0.
        This method is mainly implemented for symmetry with server

        :return: index of uri in the namespace array; an already
            registered uri is not added a second time
        """
        array_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        namespaces = await array_node.get_value()
        if uri not in namespaces:
            # unknown uri: append it and write the array back to the server
            namespaces.append(uri)
            await array_node.set_value(namespaces)
            return len(namespaces) - 1
        return namespaces.index(uri)
548
549
    def load_type_definitions(self, nodes=None) -> Coroutine:
        """
        Load custom types (custom structures/extension objects) definition from server
        Generate Python classes for custom structures/extension objects defined in server
        These classes will be available in ua module

        The work is delegated to the load_type_definitions function of the
        structures module; its result is returned without being awaited here.
        """
        pending = load_type_definitions(self, nodes)
        return pending
556
557
    def load_enums(self) -> Coroutine:
        """
        generate Python enums for custom enums on server.
        These enums will be available in ua module

        The work is delegated to the load_enums function of the structures
        module; its result is returned without being awaited here.
        """
        pending = load_enums(self)
        return pending
563
564
    async def register_nodes(self, nodes):
565
        """
566
        Register nodes for faster read and write access (if supported by server)
567
        Rmw: This call modifies the nodeid of the nodes, the original nodeid is
568
        available as node.basenodeid
569
        """
570
        nodeids = [node.nodeid for node in nodes]
571
        nodeids = await self.uaclient.register_nodes(nodeids)
572
        for node, nodeid in zip(nodes, nodeids):
573
            node.basenodeid = node.nodeid
574
            node.nodeid = nodeid
575
        return nodes
576
577
    async def unregister_nodes(self, nodes):
578
        """
579
        Unregister nodes
580
        """
581
        nodeids = [node.nodeid for node in nodes]
582
        await self.uaclient.unregister_nodes(nodeids)
583
        for node in nodes:
584
            if not node.basenodeid:
585
                continue
586
            node.nodeid = node.basenodeid
587
            node.basenodeid = None
588
589
    async def get_values(self, nodes):
        """
        Read the value of multiple nodes in one ua call.

        :return: list with one value per read result
        """
        wanted_ids = [node.nodeid for node in nodes]
        data_values = await self.uaclient.get_attributes(wanted_ids, ua.AttributeIds.Value)
        values = []
        for data_value in data_values:
            # unwrap the DataValue and the Variant inside it
            values.append(data_value.Value.Value)
        return values
596
597
    async def set_values(self, nodes, values):
        """
        Write values to multiple nodes in one ua call.
        Every value is converted with value_to_datavalue and every
        write result is checked afterwards.
        """
        target_ids = [node.nodeid for node in nodes]
        data_values = [value_to_datavalue(value) for value in values]
        statuses = await self.uaclient.set_attributes(target_ids, data_values, ua.AttributeIds.Value)
        for status in statuses:
            status.check()
606