Passed
Pull Request — master (#120)
by Olivier
02:26
created

asyncua.client.client.Client.write_values()   A

Complexity

Conditions 2

Size

Total Lines 9
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 6
nop 3
dl 0
loc 9
rs 10
c 0
b 0
f 0
1
import asyncio
2
import logging
3
from typing import Union, Coroutine
4
from urllib.parse import urlparse
5
6
from asyncua import ua
7
from .ua_client import UaClient
8
from ..common.xmlimporter import XmlImporter
9
from ..common.xmlexporter import XmlExporter
10
from ..common.node import Node
11
from ..common.manage_nodes import delete_nodes
12
from ..common.subscription import Subscription
13
from ..common.shortcuts import Shortcuts
14
from ..common.structures import load_type_definitions, load_enums
15
from ..common.utils import create_nonce
16
from ..common.ua_utils import value_to_datavalue
17
from ..crypto import uacrypto, security_policies
18
19
_logger = logging.getLogger(__name__)
20
21
22
class Client:
23
    """
24
    High level client to connect to an OPC-UA server.
25
26
    This class makes it easy to connect and browse address space.
27
    It attempts to expose as much functionality as possible
28
    but if you want more flexibility it is possible and advised to
29
    use UaClient object, available as self.uaclient
30
    which offers the raw OPC-UA services interface.
31
    """
32
    def __init__(self, url: str, timeout: int = 4, loop=None):
33
        """
34
        :param url: url of the server.
35
            if you are unsure of url, write at least hostname
36
            and port and call get_endpoints
37
38
        :param timeout:
39
            Each request sent to the server expects an answer within this
40
            time. The timeout is specified in seconds.
41
        """
42
        _logger = logging.getLogger(__name__)
43
        self.loop = loop or asyncio.get_event_loop()
44
        self.server_url = urlparse(url)
45
        # take initial username and password from the url
46
        self._username = self.server_url.username
47
        self._password = self.server_url.password
48
        self.name = "Pure Python Async. Client"
49
        self.description = self.name
50
        self.application_uri = "urn:freeopcua:client"
51
        self.product_uri = "urn:freeopcua.github.io:client"
52
        self.security_policy = ua.SecurityPolicy()
53
        self.secure_channel_id = None
54
        self.secure_channel_timeout = 3600000  # 1 hour
55
        self.session_timeout = 3600000  # 1 hour
56
        self._policy_ids = []
57
        self.uaclient: UaClient = UaClient(timeout, loop=self.loop)
58
        self.user_certificate = None
59
        self.user_private_key = None
60
        self._server_nonce = None
61
        self._session_counter = 1
62
        self.nodes = Shortcuts(self.uaclient)
63
        self.max_messagesize = 0  # No limits
64
        self.max_chunkcount = 0  # No limits
65
        self._renew_channel_task = None
66
67
    async def __aenter__(self):
68
        await self.connect()
69
        return self
70
71
    async def __aexit__(self, exc_type, exc_value, traceback):
72
        await self.disconnect()
73
74
    def __str__(self):
75
        return f"Client({self.server_url.geturl()})"
76
    __repr__ = __str__
77
78
    @staticmethod
79
    def find_endpoint(endpoints, security_mode, policy_uri):
80
        """
81
        Find endpoint with required security mode and policy URI
82
        """
83
        _logger.info("find_endpoint %r %r %r", endpoints, security_mode, policy_uri)
84
        for ep in endpoints:
85
            if (ep.EndpointUrl.startswith(ua.OPC_TCP_SCHEME) and ep.SecurityMode == security_mode and ep.SecurityPolicyUri == policy_uri):
86
                return ep
87
        raise ua.UaError("No matching endpoints: {0}, {1}".format(security_mode, policy_uri))
88
89
    def set_user(self, username: str):
90
        """
91
        Set user name for the connection.
92
        initial user from the URL will be overwritten
93
        """
94
        self._username = username
95
96
    def set_password(self, pwd: str):
97
        """
98
        Set user password for the connection.
99
        initial password from the URL will be overwritten
100
        """
101
        if not isinstance(pwd, str):
102
            raise TypeError(f"Password must be a string, got {pwd} of type {type(pwd)}")
103
        self._password = pwd
104
105
    async def set_security_string(self, string: str):
106
        """
107
        Set SecureConnection mode. String format:
108
        Policy,Mode,certificate,private_key[,server_private_key]
109
        where Policy is Basic128Rsa15, Basic256 or Basic256Sha256,
110
            Mode is Sign or SignAndEncrypt
111
            certificate, private_key and server_private_key are
112
                paths to .pem or .der files
113
        Call this before connect()
114
        """
115
        if not string:
116
            return
117
        parts = string.split(",")
118
        if len(parts) < 4:
119
            raise ua.UaError("Wrong format: `{}`, expected at least 4 comma-separated values".format(string))
120
        policy_class = getattr(security_policies, "SecurityPolicy{}".format(parts[0]))
121
        mode = getattr(ua.MessageSecurityMode, parts[1])
122
        return await self.set_security(policy_class, parts[2], parts[3], parts[4] if len(parts) >= 5 else None, mode)
123
124
    async def set_security(self,
125
                           policy,
126
                           certificate_path: str,
127
                           private_key_path: str,
128
                           server_certificate_path: str = None,
129
                           mode: ua.MessageSecurityMode = ua.MessageSecurityMode.SignAndEncrypt):
130
        """
131
        Set SecureConnection mode.
132
        Call this before connect()
133
        """
134
        if server_certificate_path is None:
135
            # load certificate from server's list of endpoints
136
            endpoints = await self.connect_and_get_server_endpoints()
137
            endpoint = Client.find_endpoint(endpoints, mode, policy.URI)
138
            server_cert = uacrypto.x509_from_der(endpoint.ServerCertificate)
139
        else:
140
            server_cert = await uacrypto.load_certificate(server_certificate_path)
141
        cert = await uacrypto.load_certificate(certificate_path)
142
        pk = await uacrypto.load_private_key(private_key_path)
143
        self.security_policy = policy(server_cert, cert, pk, mode)
144
        self.uaclient.set_security(self.security_policy)
145
146
    async def load_client_certificate(self, path: str):
147
        """
148
        load our certificate from file, either pem or der
149
        """
150
        self.user_certificate = await uacrypto.load_certificate(path)
151
152
    async def load_private_key(self, path: str):
153
        """
154
        Load user private key. This is used for authenticating using certificate
155
        """
156
        self.user_private_key = await uacrypto.load_private_key(path)
157
158
    async def connect_and_get_server_endpoints(self):
159
        """
160
        Connect, ask server for endpoints, and disconnect
161
        """
162
        await self.connect_socket()
163
        try:
164
            await self.send_hello()
165
            await self.open_secure_channel()
166
            endpoints = await self.get_endpoints()
167
            await self.close_secure_channel()
168
        finally:
169
            self.disconnect_socket()
170
        return endpoints
171
172
    async def connect_and_find_servers(self):
173
        """
174
        Connect, ask server for a list of known servers, and disconnect
175
        """
176
        await self.connect_socket()
177
        try:
178
            await self.send_hello()
179
            await self.open_secure_channel()  # spec says it should not be necessary to open channel
180
            servers = await self.find_servers()
181
            await self.close_secure_channel()
182
        finally:
183
            self.disconnect_socket()
184
        return servers
185
186
    async def connect_and_find_servers_on_network(self):
187
        """
188
        Connect, ask server for a list of known servers on network, and disconnect
189
        """
190
        await self.connect_socket()
191
        try:
192
            await self.send_hello()
193
            await self.open_secure_channel()
194
            servers = await self.find_servers_on_network()
195
            await self.close_secure_channel()
196
        finally:
197
            self.disconnect_socket()
198
        return servers
199
200
    async def connect(self):
201
        """
202
        High level method
203
        Connect, create and activate session
204
        """
205
        _logger.info("connect")
206
        await self.connect_socket()
207
        try:
208
            await self.send_hello()
209
            await self.open_secure_channel()
210
            await self.create_session()
211
        except Exception:
212
            # clean up open socket
213
            self.disconnect_socket()
214
            raise
215
        await self.activate_session(username=self._username, password=self._password, certificate=self.user_certificate)
216
217
    async def disconnect(self):
218
        """
219
        High level method
220
        Close session, secure channel and socket
221
        """
222
        _logger.info("disconnect")
223
        try:
224
            await self.close_session()
225
            await self.close_secure_channel()
226
        finally:
227
            self.disconnect_socket()
228
229
    async def connect_socket(self):
230
        """
231
        connect to socket defined in url
232
        """
233
        await self.uaclient.connect_socket(self.server_url.hostname, self.server_url.port)
234
235
    def disconnect_socket(self):
236
        self.uaclient.disconnect_socket()
237
238
    async def send_hello(self):
239
        """
240
        Send OPC-UA hello to server
241
        """
242
        ack = await self.uaclient.send_hello(self.server_url.geturl(), self.max_messagesize, self.max_chunkcount)
243
        if isinstance(ack, ua.UaStatusCodeError):
244
            raise ack
245
246
    async def open_secure_channel(self, renew=False):
247
        """
248
        Open secure channel, if renew is True, renew channel
249
        """
250
        params = ua.OpenSecureChannelParameters()
251
        params.ClientProtocolVersion = 0
252
        params.RequestType = ua.SecurityTokenRequestType.Issue
253
        if renew:
254
            params.RequestType = ua.SecurityTokenRequestType.Renew
255
        params.SecurityMode = self.security_policy.Mode
256
        params.RequestedLifetime = self.secure_channel_timeout
257
        # length should be equal to the length of key of symmetric encryption
258
        nonce = create_nonce(self.security_policy.symmetric_key_size)
259
        params.ClientNonce = nonce  # this nonce is used to create a symmetric key
260
        result = await self.uaclient.open_secure_channel(params)
261
        self.security_policy.make_symmetric_key(nonce, result.ServerNonce)
262
        self.secure_channel_timeout = result.SecurityToken.RevisedLifetime
263
264
    async def close_secure_channel(self):
265
        return await self.uaclient.close_secure_channel()
266
267
    async def get_endpoints(self) -> list:
268
        params = ua.GetEndpointsParameters()
269
        params.EndpointUrl = self.server_url.geturl()
270
        return await self.uaclient.get_endpoints(params)
271
272
    async def register_server(self, server, discovery_configuration=None):
273
        """
274
        register a server to discovery server
275
        if discovery_configuration is provided, the newer register_server2 service call is used
276
        """
277
        serv = ua.RegisteredServer()
278
        serv.ServerUri = server.get_application_uri()
279
        serv.ProductUri = server.product_uri
280
        serv.DiscoveryUrls = [server.endpoint.geturl()]
281
        serv.ServerType = server.application_type
282
        serv.ServerNames = [ua.LocalizedText(server.name)]
283
        serv.IsOnline = True
284
        if discovery_configuration:
285
            params = ua.RegisterServer2Parameters()
286
            params.Server = serv
287
            params.DiscoveryConfiguration = discovery_configuration
288
            return await self.uaclient.register_server2(params)
289
        return await self.uaclient.register_server(serv)
290
291
    async def find_servers(self, uris=None):
292
        """
293
        send a FindServer request to the server. The answer should be a list of
294
        servers the server knows about
295
        A list of uris can be provided, only server having matching uris will be returned
296
        """
297
        if uris is None:
298
            uris = []
299
        params = ua.FindServersParameters()
300
        params.EndpointUrl = self.server_url.geturl()
301
        params.ServerUris = uris
302
        return await self.uaclient.find_servers(params)
303
304
    async def find_servers_on_network(self):
305
        params = ua.FindServersOnNetworkParameters()
306
        return await self.uaclient.find_servers_on_network(params)
307
308
    async def create_session(self):
309
        """
310
        send a CreateSessionRequest to server with reasonable parameters.
311
        If you want o modify settings look at code of this methods
312
        and make your own
313
        """
314
        desc = ua.ApplicationDescription()
315
        desc.ApplicationUri = self.application_uri
316
        desc.ProductUri = self.product_uri
317
        desc.ApplicationName = ua.LocalizedText(self.name)
318
        desc.ApplicationType = ua.ApplicationType.Client
319
        params = ua.CreateSessionParameters()
320
        # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
321
        nonce = create_nonce(32)
322
        params.ClientNonce = nonce
323
        params.ClientCertificate = self.security_policy.client_certificate
324
        params.ClientDescription = desc
325
        params.EndpointUrl = self.server_url.geturl()
326
        params.SessionName = f"{self.description} Session{self._session_counter}"
327
        # Requested maximum number of milliseconds that a Session should remain open without activity
328
        params.RequestedSessionTimeout = 60 * 60 * 1000
329
        params.MaxResponseMessageSize = 0  # means no max size
330
        response = await self.uaclient.create_session(params)
331
        if self.security_policy.client_certificate is None:
332
            data = nonce
333
        else:
334
            data = self.security_policy.client_certificate + nonce
335
        self.security_policy.asymmetric_cryptography.verify(data, response.ServerSignature.Signature)
336
        self._server_nonce = response.ServerNonce
337
        if not self.security_policy.server_certificate:
338
            self.security_policy.server_certificate = response.ServerCertificate
339
        elif self.security_policy.server_certificate != response.ServerCertificate:
340
            raise ua.UaError("Server certificate mismatch")
341
        # remember PolicyId's: we will use them in activate_session()
342
        ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
343
        self._policy_ids = ep.UserIdentityTokens
344
        #  Actual maximum number of milliseconds that a Session shall remain open without activity
345
        self.session_timeout = response.RevisedSessionTimeout
346
        self._renew_channel_task = self.loop.create_task(self._renew_channel_loop())
347
        return response
348
349
    async def _renew_channel_loop(self):
350
        """
351
        Renew the SecureChannel before the SessionTimeout will happen.
352
        In theory we could do that only if no session activity
353
        but it does not cost much..
354
        """
355
        try:
356
            duration = min(self.session_timeout, self.secure_channel_timeout) * 0.7 * 0.001
357
            while True:
358
                # 0.7 is from spec. 0.001 is because asyncio.sleep expects time in seconds
359
                await asyncio.sleep(duration)
360
                _logger.debug("renewing channel")
361
                await self.open_secure_channel(renew=True)
362
                val = await self.nodes.server_state.read_value()
363
                _logger.debug("server state is: %s ", val)
364
        except asyncio.CancelledError:
365
            pass
366
367
    def server_policy_id(self, token_type, default):
368
        """
369
        Find PolicyId of server's UserTokenPolicy by token_type.
370
        Return default if there's no matching UserTokenPolicy.
371
        """
372
        for policy in self._policy_ids:
373
            if policy.TokenType == token_type:
374
                return policy.PolicyId
375
        return default
376
377
    def server_policy_uri(self, token_type):
378
        """
379
        Find SecurityPolicyUri of server's UserTokenPolicy by token_type.
380
        If SecurityPolicyUri is empty, use default SecurityPolicyUri
381
        of the endpoint
382
        """
383
        for policy in self._policy_ids:
384
            if policy.TokenType == token_type:
385
                if policy.SecurityPolicyUri:
386
                    return policy.SecurityPolicyUri
387
                # empty URI means "use this endpoint's policy URI"
388
                return self.security_policy.URI
389
        return self.security_policy.URI
390
391
    async def activate_session(self, username: str = None, password: str = None, certificate=None):
392
        """
393
        Activate session using either username and password or private_key
394
        """
395
        params = ua.ActivateSessionParameters()
396
        challenge = b""
397
        if self.security_policy.server_certificate is not None:
398
            challenge += self.security_policy.server_certificate
399
        if self._server_nonce is not None:
400
            challenge += self._server_nonce
401
        if self.security_policy.AsymmetricSignatureURI:
402
            params.ClientSignature.Algorithm = self.security_policy.AsymmetricSignatureURI
403
        else:
404
            params.ClientSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
405
        params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
406
        params.LocaleIds.append("en")
407
        if not username and not certificate:
408
            self._add_anonymous_auth(params)
409
        elif certificate:
410
            self._add_certificate_auth(params, certificate, challenge)
411
        else:
412
            self._add_user_auth(params, username, password)
413
        return await self.uaclient.activate_session(params)
414
415
    def _add_anonymous_auth(self, params):
416
        params.UserIdentityToken = ua.AnonymousIdentityToken()
417
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, "anonymous")
418
419
    def _add_certificate_auth(self, params, certificate, challenge):
420
        params.UserIdentityToken = ua.X509IdentityToken()
421
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, "certificate_basic256")
422
        params.UserIdentityToken.CertificateData = uacrypto.der_from_x509(certificate)
423
        # specs part 4, 5.6.3.1: the data to sign is created by appending
424
        # the last serverNonce to the serverCertificate
425
        sig = uacrypto.sign_sha1(self.user_private_key, challenge)
426
        params.UserTokenSignature = ua.SignatureData()
427
        params.UserTokenSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
428
        params.UserTokenSignature.Signature = sig
429
430
    def _add_user_auth(self, params, username: str, password: str):
431
        params.UserIdentityToken = ua.UserNameIdentityToken()
432
        params.UserIdentityToken.UserName = username
433
        policy_uri = self.server_policy_uri(ua.UserTokenType.UserName)
434
        if not policy_uri or policy_uri == security_policies.POLICY_NONE_URI:
435
            # see specs part 4, 7.36.3: if the token is NOT encrypted,
436
            # then the password only contains UTF-8 encoded password
437
            # and EncryptionAlgorithm is null
438
            if self._password:
439
                _logger.warning("Sending plain-text password")
440
                params.UserIdentityToken.Password = password.encode("utf8")
441
            params.UserIdentityToken.EncryptionAlgorithm = None
442
        elif self._password:
443
            data, uri = self._encrypt_password(password, policy_uri)
444
            params.UserIdentityToken.Password = data
445
            params.UserIdentityToken.EncryptionAlgorithm = uri
446
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, "username_basic256")
447
448
    def _encrypt_password(self, password: str, policy_uri):
449
        pubkey = uacrypto.x509_from_der(self.security_policy.server_certificate).public_key()
450
        # see specs part 4, 7.36.3: if the token is encrypted, password
451
        # shall be converted to UTF-8 and serialized with server nonce
452
        passwd = password.encode("utf8")
453
        if self._server_nonce is not None:
454
            passwd += self._server_nonce
455
        etoken = ua.ua_binary.Primitives.Bytes.pack(passwd)
456
        data, uri = security_policies.encrypt_asymmetric(pubkey, etoken, policy_uri)
457
        return data, uri
458
459
    async def close_session(self) -> Coroutine:
460
        """
461
        Close session
462
        """
463
        self._renew_channel_task.cancel()
464
        await self._renew_channel_task
465
        return await self.uaclient.close_session(True)
466
467
    def get_root_node(self):
468
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
469
470
    def get_objects_node(self):
471
        _logger.info("get_objects_node")
472
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))
473
474
    def get_server_node(self):
475
        return self.get_node(ua.FourByteNodeId(ua.ObjectIds.Server))
476
477
    def get_node(self, nodeid: Union[ua.NodeId, str]) -> Node:
478
        """
479
        Get node using NodeId object or a string representing a NodeId.
480
        """
481
        return Node(self.uaclient, nodeid)
482
483
    async def create_subscription(self, period, handler):
484
        """
485
        Create a subscription.
486
        Returns a Subscription object which allows to subscribe to events or data changes on server.
487
488
        :param period: Either a publishing interval in milliseconds or a `CreateSubscriptionParameters` instance.
489
        The second option should be used, if the asyncua-server has problems with the default options.
490
        :param handler: Class instance with data_change and/or event methods (see `SubHandler`
491
        base class for details). Remember not to block the main event loop inside the handler methods.
492
        """
493
        if isinstance(period, ua.CreateSubscriptionParameters):
494
            params = period
495
        else:
496
            params = ua.CreateSubscriptionParameters()
497
            params.RequestedPublishingInterval = period
498
            params.RequestedLifetimeCount = 10000
499
            params.RequestedMaxKeepAliveCount = 3000
500
            params.MaxNotificationsPerPublish = 10000
501
            params.PublishingEnabled = True
502
            params.Priority = 0
503
        subscription = Subscription(self.uaclient, params, handler)
504
        await subscription.init()
505
        return subscription
506
507
    def get_namespace_array(self) -> Coroutine:
508
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
509
        return ns_node.read_value()
510
511
    async def get_namespace_index(self, uri):
512
        uries = await self.get_namespace_array()
513
        _logger.info("get_namespace_index %s %r", type(uries), uries)
514
        return uries.index(uri)
515
516
    def delete_nodes(self, nodes, recursive=False) -> Coroutine:
517
        return delete_nodes(self.uaclient, nodes, recursive)
518
519
    def import_xml(self, path=None, xmlstring=None) -> Coroutine:
520
        """
521
        Import nodes defined in xml
522
        """
523
        importer = XmlImporter(self)
524
        return importer.import_xml(path, xmlstring)
525
526
    async def export_xml(self, nodes, path):
527
        """
528
        Export defined nodes to xml
529
        """
530
        exp = XmlExporter(self)
531
        await exp.build_etree(nodes)
532
        await exp.write_xml(path)
533
534 View Code Duplication
    async def register_namespace(self, uri):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
535
        """
536
        Register a new namespace. Nodes should in custom namespace, not 0.
537
        This method is mainly implemented for symetry with server
538
        """
539
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
540
        uries = await ns_node.read_value()
541
        if uri in uries:
542
            return uries.index(uri)
543
        uries.append(uri)
544
        await ns_node.write_value(uries)
545
        return len(uries) - 1
546
547
    def load_type_definitions(self, nodes=None) -> Coroutine:
548
        """
549
        Load custom types (custom structures/extension objects) definition from server
550
        Generate Python classes for custom structures/extension objects defined in server
551
        These classes will available in ua module
552
        """
553
        return load_type_definitions(self, nodes)
554
555
    def load_enums(self) -> Coroutine:
556
        """
557
        generate Python enums for custom enums on server.
558
        This enums will be available in ua module
559
        """
560
        return load_enums(self)
561
562
    async def register_nodes(self, nodes):
563
        """
564
        Register nodes for faster read and write access (if supported by server)
565
        Rmw: This call modifies the nodeid of the nodes, the original nodeid is
566
        available as node.basenodeid
567
        """
568
        nodeids = [node.nodeid for node in nodes]
569
        nodeids = await self.uaclient.register_nodes(nodeids)
570
        for node, nodeid in zip(nodes, nodeids):
571
            node.basenodeid = node.nodeid
572
            node.nodeid = nodeid
573
        return nodes
574
575
    async def unregister_nodes(self, nodes):
576
        """
577
        Unregister nodes
578
        """
579
        nodeids = [node.nodeid for node in nodes]
580
        await self.uaclient.unregister_nodes(nodeids)
581
        for node in nodes:
582
            if not node.basenodeid:
583
                continue
584
            node.nodeid = node.basenodeid
585
            node.basenodeid = None
586
587
    async def read_values(self, nodes):
588
        """
589
        Read the value of multiple nodes in one ua call.
590
        """
591
        nodeids = [node.nodeid for node in nodes]
592
        results = await self.uaclient.get_attributes(nodeids, ua.AttributeIds.Value)
593
        return [result.Value.Value for result in results]
594
595
    async def write_values(self, nodes, values):
596
        """
597
        Write values to multiple nodes in one ua call
598
        """
599
        nodeids = [node.nodeid for node in nodes]
600
        dvs = [value_to_datavalue(val) for val in values]
601
        results = await self.uaclient.set_attributes(nodeids, dvs, ua.AttributeIds.Value)
602
        for result in results:
603
            result.check()
604
605
    get_values = read_values  # legacy compatibility
606
    set_values = write_values  # legacy compatibility
607