Passed
Pull Request — master (#76)
by Olivier
02:44
created

asyncua.client.ua_client.UaClient.set_attributes()   A

Complexity

Conditions 2

Size

Total Lines 17
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 13
nop 4
dl 0
loc 17
rs 9.75
c 0
b 0
f 0
1
"""
2
Low level binary client
3
"""
4
import asyncio
5
import logging
6
from typing import Dict, List
7
from asyncua import ua
8
from typing import Optional
9
from ..ua.ua_binary import struct_from_binary, uatcp_to_binary, struct_to_binary, nodeid_from_binary, header_from_binary
10
from ..ua.uaerrors import BadTimeout, BadNoSubscription, BadSessionClosed, UaStructParsingError
11
from ..common.connection import SecureConnection
12
13
14
class UASocketProtocol(asyncio.Protocol):
15
    """
16
    Handle socket connection and send ua messages.
17
    Timeout is the timeout used while waiting for an ua answer from server.
18
    """
19
    INITIALIZED = 'initialized'
20
    OPEN = 'open'
21
    CLOSED = 'closed'
22
23
    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy(), loop=None):
24
        """
25
        :param timeout: Timeout in seconds
26
        :param security_policy: Security policy (optional)
27
        :param loop: Event loop (optional)
28
        """
29
        self.logger = logging.getLogger(f"{__name__}.UASocketProtocol")
30
        self.loop = loop or asyncio.get_event_loop()
31
        self.transport = None
32
        self.receive_buffer: Optional[bytes] = None
33
        self.is_receiving = False
34
        self.timeout = timeout
35
        self.authentication_token = ua.NodeId()
36
        self._request_id = 0
37
        self._request_handle = 0
38
        self._callbackmap: Dict[int, asyncio.Future] = {}
39
        self._connection = SecureConnection(security_policy)
40
        self.state = self.INITIALIZED
41
42
    def connection_made(self, transport: asyncio.Transport):
43
        self.state = self.OPEN
44
        self.transport = transport
45
46
    def connection_lost(self, exc):
47
        self.logger.info("Socket has closed connection")
48
        self.state = self.CLOSED
49
        self.transport = None
50
51
    def data_received(self, data: bytes):
52
        if self.receive_buffer:
53
            data = self.receive_buffer + data
54
            self.receive_buffer = None
55
        self._process_received_data(data)
56
57
    def _process_received_data(self, data: bytes):
58
        """
59
        Try to parse received data as asyncua message. Data may be chunked but will be in correct order.
60
        See: https://docs.python.org/3/library/asyncio-protocol.html#asyncio.Protocol.data_received
61
        Reassembly is done by filling up a buffer until it verifies as a valid message (or a MessageChunk).
62
        """
63
        buf = ua.utils.Buffer(data)
64
        while True:
65
            try:
66
                try:
67
                    header = header_from_binary(buf)
68
                except ua.utils.NotEnoughData:
69
                    self.logger.debug('Not enough data while parsing header from server, waiting for more')
70
                    self.receive_buffer = data
71
                    return
72
                if len(buf) < header.body_size:
73
                    self.logger.debug(
74
                        'We did not receive enough data from server. Need %s got %s', header.body_size, len(buf)
75
                    )
76
                    self.receive_buffer = data
77
                    return
78
                msg = self._connection.receive_from_header_and_body(header, buf)
79
                self._process_received_message(msg)
80
                if not buf:
81
                    return
82
                # Buffer still has bytes left, try to process again
83
                data = bytes(buf)
84
            except Exception:
85
                self.logger.exception('Exception raised while parsing message from server')
86
                self.disconnect_socket()
87
                return
88
89
    def _process_received_message(self, msg):
90
        if msg is None:
91
            pass
92
        elif isinstance(msg, ua.Message):
93
            self._call_callback(msg.request_id(), msg.body())
94
        elif isinstance(msg, ua.Acknowledge):
95
            self._call_callback(0, msg)
96
        elif isinstance(msg, ua.ErrorMessage):
97
            self.logger.fatal("Received an error: %r", msg)
98
            self._call_callback(0, ua.UaStatusCodeError(msg.Error.value))
99
        else:
100
            raise ua.UaError(f"Unsupported message type: {msg}")
101
102
    def _send_request(self, request, timeout=1, message_type=ua.MessageType.SecureMessage) -> asyncio.Future:
103
        """
104
        Send request to server, lower-level method.
105
        Timeout is the timeout written in ua header.
106
        :param request: Request
107
        :param timeout: Timeout in seconds
108
        :param message_type: UA Message Type (optional)
109
        :return: Future that resolves with the Response
110
        """
111
        request.RequestHeader = self._create_request_header(timeout)
112
        self.logger.debug('Sending: %s', request)
113
        try:
114
            binreq = struct_to_binary(request)
115
        except Exception:
116
            # reset request handle if any error
117
            # see self._create_request_header
118
            self._request_handle -= 1
119
            raise
120
        self._request_id += 1
121
        future = self.loop.create_future()
122
        self._callbackmap[self._request_id] = future
123
        msg = self._connection.message_to_binary(binreq, message_type=message_type, request_id=self._request_id)
124
        self.transport.write(msg)
125
        return future
126
127
    async def send_request(self, request, timeout=10, message_type=ua.MessageType.SecureMessage):
128
        """
129
        Send a request to the server.
130
        Timeout is the timeout written in ua header.
131
        Returns response object if no callback is provided.
132
        """
133
        data = await asyncio.wait_for(
134
            self._send_request(request, timeout, message_type),
135
            timeout if timeout else None
136
        )
137
        self.check_answer(data, f" in response to {request.__class__.__name__}")
138
        return data
139
140
    def check_answer(self, data, context):
141
        data = data.copy()
142
        typeid = nodeid_from_binary(data)
143
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
144
            self.logger.warning("ServiceFault from server received %s", context)
145
            hdr = struct_from_binary(ua.ResponseHeader, data)
146
            hdr.ServiceResult.check()
147
            return False
148
        return True
149
150
    def _call_callback(self, request_id, body):
151
        try:
152
            self._callbackmap[request_id].set_result(body)
153
        except KeyError:
154
            raise ua.UaError(
155
                f"No request found for request id: {request_id}, pending are {self._callbackmap.keys()}"
156
            )
157
        except asyncio.InvalidStateError:
158
            raise ua.UaError(f"Future for request id {request_id} is already done")
159
        del self._callbackmap[request_id]
160
161
    def _create_request_header(self, timeout=1) -> ua.RequestHeader:
162
        """
163
        :param timeout: Timeout in seconds
164
        :return: Request header
165
        """
166
        hdr = ua.RequestHeader()
167
        hdr.AuthenticationToken = self.authentication_token
168
        self._request_handle += 1
169
        hdr.RequestHandle = self._request_handle
170
        hdr.TimeoutHint = timeout * 1000
171
        return hdr
172
173
    def disconnect_socket(self):
174
        self.logger.info("Request to close socket received")
175
        if self.transport:
176
            self.transport.close()
177
        else:
178
            self.logger.warning("disconnect_socket was called but transport is None")
179
180
    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
181
        hello = ua.Hello()
182
        hello.EndpointUrl = url
183
        hello.MaxMessageSize = max_messagesize
184
        hello.MaxChunkCount = max_chunkcount
185
        ack = asyncio.Future()
186
        self._callbackmap[0] = ack
187
        self.transport.write(uatcp_to_binary(ua.MessageType.Hello, hello))
188
        return await asyncio.wait_for(ack, self.timeout)
189
190
    async def open_secure_channel(self, params):
191
        self.logger.info("open_secure_channel")
192
        request = ua.OpenSecureChannelRequest()
193
        request.Parameters = params
194
        result = await asyncio.wait_for(
195
            self._send_request(request, message_type=ua.MessageType.SecureOpen),
196
            self.timeout
197
        )
198
        # FIXME: we have a race condition here
199
        # we can get a packet with the new token id before we reach to store it..
200
        response = struct_from_binary(ua.OpenSecureChannelResponse, result)
201
        response.ResponseHeader.ServiceResult.check()
202
        self._connection.set_channel(response.Parameters)
203
        return response.Parameters
204
205
    async def close_secure_channel(self):
206
        """
207
        Close secure channel.
208
        It seems to trigger a shutdown of socket in most servers, so be prepare to reconnect.
209
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response
210
        and should just close socket.
211
        """
212
        self.logger.info("close_secure_channel")
213
        request = ua.CloseSecureChannelRequest()
214
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
215
        # don't expect any more answers
216
        future.cancel()
217
        self._callbackmap.clear()
218
        # some servers send a response here, most do not ... so we ignore
219
220
221
class UaClient:
222
    """
223
    low level OPC-UA client.
224
225
    It implements (almost) all methods defined in asyncua spec
226
    taking in argument the structures defined in asyncua spec.
227
228
    In this Python implementation  most of the structures are defined in
229
    uaprotocol_auto.py and uaprotocol_hand.py available under asyncua.ua
230
    """
231
232
    def __init__(self, timeout=1, loop=None):
233
        """
234
        :param timeout: Timout in seconds
235
        :param loop: Event loop (optional)
236
        """
237
        self.logger = logging.getLogger(f'{__name__}.UaClient')
238
        self.loop = loop or asyncio.get_event_loop()
239
        self._subscription_callbacks = {}
240
        self._timeout = timeout
241
        self.security_policy = ua.SecurityPolicy()
242
        self.protocol: Optional[UASocketProtocol] = None
243
        self._publish_task = None
244
245
    def set_security(self, policy: ua.SecurityPolicy):
246
        self.security_policy = policy
247
248
    def _make_protocol(self):
249
        self.protocol = UASocketProtocol(self._timeout, security_policy=self.security_policy, loop=self.loop)
250
        return self.protocol
251
252
    async def connect_socket(self, host: str, port: int):
253
        """Connect to server socket."""
254
        self.logger.info("opening connection")
255
        await self.loop.create_connection(self._make_protocol, host, port)
256
257
    def disconnect_socket(self):
258
        if self.protocol and self.protocol.state == UASocketProtocol.CLOSED:
259
            self.logger.warning("disconnect_socket was called but connection is closed")
260
            return None
261
        return self.protocol.disconnect_socket()
262
263
    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
264
        await self.protocol.send_hello(url, max_messagesize, max_chunkcount)
265
266
    async def open_secure_channel(self, params):
267
        return await self.protocol.open_secure_channel(params)
268
269
    async def close_secure_channel(self):
270
        """
271
        close secure channel. It seems to trigger a shutdown of socket
272
        in most servers, so be prepare to reconnect
273
        """
274
        if self.protocol and self.protocol.state == UASocketProtocol.CLOSED:
275
            self.logger.warning("close_secure_channel was called but connection is closed")
276
            return
277
        return await self.protocol.close_secure_channel()
278
279
    async def create_session(self, parameters):
280
        self.logger.info("create_session")
281
        request = ua.CreateSessionRequest()
282
        request.Parameters = parameters
283
        data = await self.protocol.send_request(request)
284
        response = struct_from_binary(ua.CreateSessionResponse, data)
285
        self.logger.debug(response)
286
        response.ResponseHeader.ServiceResult.check()
287
        self.protocol.authentication_token = response.Parameters.AuthenticationToken
288
        return response.Parameters
289
290
    async def activate_session(self, parameters):
291
        self.logger.info("activate_session")
292
        request = ua.ActivateSessionRequest()
293
        request.Parameters = parameters
294
        data = await self.protocol.send_request(request)
295
        response = struct_from_binary(ua.ActivateSessionResponse, data)
296
        self.logger.debug(response)
297
        response.ResponseHeader.ServiceResult.check()
298
        return response.Parameters
299
300
    async def close_session(self, delete_subscriptions):
301
        self.logger.info("close_session")
302
        if self._publish_task and not self._publish_task.done():
303
            self._publish_task.cancel()
304
        if self.protocol and self.protocol.state == UASocketProtocol.CLOSED:
305
            self.logger.warning("close_session was called but connection is closed")
306
            return
307
        request = ua.CloseSessionRequest()
308
        request.DeleteSubscriptions = delete_subscriptions
309
        data = await self.protocol.send_request(request)
310
        response = struct_from_binary(ua.CloseSessionResponse, data)
311
        try:
312
            response.ResponseHeader.ServiceResult.check()
313
        except BadSessionClosed:
314
            # Problem: closing the session with open publish requests leads to BadSessionClosed responses
315
            #          we can just ignore it therefore.
316
            #          Alternatively we could make sure that there are no publish requests in flight when
317
            #          closing the session.
318
            pass
319
320
    async def browse(self, parameters):
321
        self.logger.info("browse")
322
        request = ua.BrowseRequest()
323
        request.Parameters = parameters
324
        data = await self.protocol.send_request(request)
325
        response = struct_from_binary(ua.BrowseResponse, data)
326
        self.logger.debug(response)
327
        response.ResponseHeader.ServiceResult.check()
328
        return response.Results
329
330
    async def browse_next(self, parameters):
331
        self.logger.debug("browse next")
332
        request = ua.BrowseNextRequest()
333
        request.Parameters = parameters
334
        data = await self.protocol.send_request(request)
335
        response = struct_from_binary(ua.BrowseNextResponse, data)
336
        self.logger.debug(response)
337
        response.ResponseHeader.ServiceResult.check()
338
        return response.Parameters.Results
339
340
    async def read(self, parameters):
341
        self.logger.debug("read")
342
        request = ua.ReadRequest()
343
        request.Parameters = parameters
344
        data = await self.protocol.send_request(request)
345
        response = struct_from_binary(ua.ReadResponse, data)
346
        self.logger.debug(response)
347
        response.ResponseHeader.ServiceResult.check()
348
        # cast to Enum attributes that need to
349
        for idx, rv in enumerate(parameters.NodesToRead):
350
            if rv.AttributeId == ua.AttributeIds.NodeClass:
351
                dv = response.Results[idx]
352
                if dv.StatusCode.is_good():
353
                    dv.Value.Value = ua.NodeClass(dv.Value.Value)
354
            elif rv.AttributeId == ua.AttributeIds.ValueRank:
355
                dv = response.Results[idx]
356
                if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
357
                    dv.Value.Value = ua.ValueRank(dv.Value.Value)
358
        return response.Results
359
360
    async def write(self, params):
361
        self.logger.debug("write")
362
        request = ua.WriteRequest()
363
        request.Parameters = params
364
        data = await self.protocol.send_request(request)
365
        response = struct_from_binary(ua.WriteResponse, data)
366
        self.logger.debug(response)
367
        response.ResponseHeader.ServiceResult.check()
368
        return response.Results
369
370
    async def get_endpoints(self, params):
371
        self.logger.debug("get_endpoint")
372
        request = ua.GetEndpointsRequest()
373
        request.Parameters = params
374
        data = await self.protocol.send_request(request)
375
        response = struct_from_binary(ua.GetEndpointsResponse, data)
376
        self.logger.debug(response)
377
        response.ResponseHeader.ServiceResult.check()
378
        return response.Endpoints
379
380
    async def find_servers(self, params):
381
        self.logger.debug("find_servers")
382
        request = ua.FindServersRequest()
383
        request.Parameters = params
384
        data = await self.protocol.send_request(request)
385
        response = struct_from_binary(ua.FindServersResponse, data)
386
        self.logger.debug(response)
387
        response.ResponseHeader.ServiceResult.check()
388
        return response.Servers
389
390
    async def find_servers_on_network(self, params):
391
        self.logger.debug("find_servers_on_network")
392
        request = ua.FindServersOnNetworkRequest()
393
        request.Parameters = params
394
        data = await self.protocol.send_request(request)
395
        response = struct_from_binary(ua.FindServersOnNetworkResponse, data)
396
        self.logger.debug(response)
397
        response.ResponseHeader.ServiceResult.check()
398
        return response.Parameters
399
400
    async def register_server(self, registered_server):
401
        self.logger.debug("register_server")
402
        request = ua.RegisterServerRequest()
403
        request.Server = registered_server
404
        data = await self.protocol.send_request(request)
405
        response = struct_from_binary(ua.RegisterServerResponse, data)
406
        self.logger.debug(response)
407
        response.ResponseHeader.ServiceResult.check()
408
        # nothing to return for this service
409
410
    async def register_server2(self, params):
411
        self.logger.debug("register_server2")
412
        request = ua.RegisterServer2Request()
413
        request.Parameters = params
414
        data = await self.protocol.send_request(request)
415
        response = struct_from_binary(ua.RegisterServer2Response, data)
416
        self.logger.debug(response)
417
        response.ResponseHeader.ServiceResult.check()
418
        return response.ConfigurationResults
419
420
    async def translate_browsepaths_to_nodeids(self, browse_paths):
421
        self.logger.debug("translate_browsepath_to_nodeid")
422
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
423
        request.Parameters.BrowsePaths = browse_paths
424
        data = await self.protocol.send_request(request)
425
        response = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsResponse, data)
426
        self.logger.debug(response)
427
        response.ResponseHeader.ServiceResult.check()
428
        return response.Results
429
430
    async def create_subscription(self, params, callback):
431
        self.logger.debug("create_subscription")
432
        request = ua.CreateSubscriptionRequest()
433
        request.Parameters = params
434
        data = await self.protocol.send_request(request)
435
        response = struct_from_binary(
436
            ua.CreateSubscriptionResponse,
437
            data
438
        )
439
        response.ResponseHeader.ServiceResult.check()
440
        self._subscription_callbacks[response.Parameters.SubscriptionId] = callback
441
        self.logger.info("create_subscription success SubscriptionId %s", response.Parameters.SubscriptionId)
442
        if not self._publish_task or self._publish_task.done():
443
            # Start the publish loop if it is not yet running
444
            # The current strategy is to have only one open publish request per UaClient. This might not be enough
445
            # in high latency networks or in case many subscriptions are created. A Set of Tasks of `_publish_loop`
446
            # could be used if necessary.
447
            self._publish_task = self.loop.create_task(self._publish_loop())
448
        return response.Parameters
449
450
    async def delete_subscriptions(self, subscription_ids):
451
        self.logger.debug("delete_subscriptions %r", subscription_ids)
452
        request = ua.DeleteSubscriptionsRequest()
453
        request.Parameters.SubscriptionIds = subscription_ids
454
        data = await self.protocol.send_request(request)
455
        response = struct_from_binary(
456
            ua.DeleteSubscriptionsResponse,
457
            data
458
        )
459
        response.ResponseHeader.ServiceResult.check()
460
        self.logger.info("remove subscription callbacks for %r", subscription_ids)
461
        for sid in subscription_ids:
462
            self._subscription_callbacks.pop(sid)
463
        return response.Results
464
465
    async def publish(self, acks: List[ua.SubscriptionAcknowledgement]) -> ua.PublishResponse:
466
        """
467
        Send a PublishRequest to the server.
468
        """
469
        self.logger.debug('publish %r', acks)
470
        request = ua.PublishRequest()
471
        request.Parameters.SubscriptionAcknowledgements = acks if acks else []
472
        data = await self.protocol.send_request(request, timeout=0)
473
        self.protocol.check_answer(data, "while waiting for publish response")
474
        try:
475
            response = struct_from_binary(ua.PublishResponse, data)
476
        except Exception:
477
            self.logger.exception("Error parsing notification from server")
478
            raise UaStructParsingError
479
        return response
480
481
    async def _publish_loop(self):
482
        """
483
        Start a loop that sends a publish requests and waits for the publish responses.
484
        Forward the `PublishResult` to the matching `Subscription` by callback.
485
        """
486
        ack = None
487
        while True:
488
            try:
489
                response = await self.publish([ack] if ack else [])
490
            except BadTimeout:  # See Spec. Part 4, 7.28
491
                # Repeat without acknowledgement
492
                ack = None
493
                continue
494
            except BadNoSubscription:  # See Spec. Part 5, 13.8.1
495
                # BadNoSubscription is expected to be received after deleting the last subscription.
496
                # We use this as a signal to exit this task and stop sending PublishRequests. This is easier then
497
                # checking if there are no more subscriptions registered in this client (). A Publish response
498
                # could still arrive before the DeleteSubscription response.
499
                #
500
                # We could remove the callback already when sending the DeleteSubscription request,
501
                # but there are some legitimate reasons to keep them around, such as when the server
502
                # responds with "BadTimeout" and we should try again later instead of just removing
503
                # the subscription client-side.
504
                #
505
                # There are a variety of ways to act correctly, but the most practical solution seems
506
                # to be to just silently ignore any BadNoSubscription responses.
507
                self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
508
                # End task
509
                return
510
            except UaStructParsingError:
511
                ack = None
512
                continue
513
            subscription_id = response.Parameters.SubscriptionId
514
            if not subscription_id:
515
                # The value 0 is used to indicate that there were no Subscriptions defined for which a
516
                # response could be sent. See Spec. Part 4 - Section 5.13.5 "Publish"
517
                # End task
518
                return
519
            try:
520
                callback = self._subscription_callbacks[subscription_id]
521
            except KeyError:
522
                self.logger.warning(
523
                    "Received data for unknown subscription %s active are %s", subscription_id,
524
                    self._subscription_callbacks.keys()
525
                )
526
            else:
527
                try:
528
                    callback(response.Parameters)
529
                except Exception:  # we call client code, catch everything!
530
                    self.logger.exception("Exception while calling user callback: %s")
531
            # Repeat with acknowledgement
532
            ack = ua.SubscriptionAcknowledgement()
533
            ack.SubscriptionId = subscription_id
534
            ack.SequenceNumber = response.Parameters.NotificationMessage.SequenceNumber
535
536
    async def create_monitored_items(self, params):
537
        self.logger.info("create_monitored_items")
538
        request = ua.CreateMonitoredItemsRequest()
539
        request.Parameters = params
540
        data = await self.protocol.send_request(request)
541
        response = struct_from_binary(ua.CreateMonitoredItemsResponse, data)
542
        self.logger.debug(response)
543
        response.ResponseHeader.ServiceResult.check()
544
        return response.Results
545
546
    async def delete_monitored_items(self, params):
547
        self.logger.info("delete_monitored_items")
548
        request = ua.DeleteMonitoredItemsRequest()
549
        request.Parameters = params
550
        data = await self.protocol.send_request(request)
551
        response = struct_from_binary(ua.DeleteMonitoredItemsResponse, data)
552
        self.logger.debug(response)
553
        response.ResponseHeader.ServiceResult.check()
554
        return response.Results
555
556
    async def add_nodes(self, nodestoadd):
557
        self.logger.info("add_nodes")
558
        request = ua.AddNodesRequest()
559
        request.Parameters.NodesToAdd = nodestoadd
560
        data = await self.protocol.send_request(request)
561
        response = struct_from_binary(ua.AddNodesResponse, data)
562
        self.logger.debug(response)
563
        response.ResponseHeader.ServiceResult.check()
564
        return response.Results
565
566
    async def add_references(self, refs):
567
        self.logger.info("add_references")
568
        request = ua.AddReferencesRequest()
569
        request.Parameters.ReferencesToAdd = refs
570
        data = await self.protocol.send_request(request)
571
        response = struct_from_binary(ua.AddReferencesResponse, data)
572
        self.logger.debug(response)
573
        response.ResponseHeader.ServiceResult.check()
574
        return response.Results
575
576
    async def delete_references(self, refs):
577
        self.logger.info("delete")
578
        request = ua.DeleteReferencesRequest()
579
        request.Parameters.ReferencesToDelete = refs
580
        data = await self.protocol.send_request(request)
581
        response = struct_from_binary(ua.DeleteReferencesResponse, data)
582
        self.logger.debug(response)
583
        response.ResponseHeader.ServiceResult.check()
584
        return response.Parameters.Results
585
586
    async def delete_nodes(self, params):
587
        self.logger.info("delete_nodes")
588
        request = ua.DeleteNodesRequest()
589
        request.Parameters = params
590
        data = await self.protocol.send_request(request)
591
        response = struct_from_binary(ua.DeleteNodesResponse, data)
592
        self.logger.debug(response)
593
        response.ResponseHeader.ServiceResult.check()
594
        return response.Results
595
596
    async def call(self, methodstocall):
597
        request = ua.CallRequest()
598
        request.Parameters.MethodsToCall = methodstocall
599
        data = await self.protocol.send_request(request)
600
        response = struct_from_binary(ua.CallResponse, data)
601
        self.logger.debug(response)
602
        response.ResponseHeader.ServiceResult.check()
603
        return response.Results
604
605
    async def history_read(self, params):
606
        self.logger.info("history_read")
607
        request = ua.HistoryReadRequest()
608
        request.Parameters = params
609
        data = await self.protocol.send_request(request)
610
        response = struct_from_binary(ua.HistoryReadResponse, data)
611
        self.logger.debug(response)
612
        response.ResponseHeader.ServiceResult.check()
613
        return response.Results
614
615
    async def modify_monitored_items(self, params):
616
        self.logger.info("modify_monitored_items")
617
        request = ua.ModifyMonitoredItemsRequest()
618
        request.Parameters = params
619
        data = await self.protocol.send_request(request)
620
        response = struct_from_binary(ua.ModifyMonitoredItemsResponse, data)
621
        self.logger.debug(response)
622
        response.ResponseHeader.ServiceResult.check()
623
        return response.Results
624
625
    async def register_nodes(self, nodes):
626
        self.logger.info("register_nodes")
627
        request = ua.RegisterNodesRequest()
628
        request.Parameters.NodesToRegister = nodes
629
        data = await self.protocol.send_request(request)
630
        response = struct_from_binary(ua.RegisterNodesResponse, data)
631
        self.logger.debug(response)
632
        response.ResponseHeader.ServiceResult.check()
633
        return response.Parameters.RegisteredNodeIds
634
635
    async def unregister_nodes(self, nodes):
636
        self.logger.info("unregister_nodes")
637
        request = ua.UnregisterNodesRequest()
638
        request.Parameters.NodesToUnregister = nodes
639
        data = await self.protocol.send_request(request)
640
        response = struct_from_binary(ua.UnregisterNodesResponse, data)
641
        self.logger.debug(response)
642
        response.ResponseHeader.ServiceResult.check()
643
        # nothing to return for this service
644
645
    async def get_attributes(self, nodeids, attr):
646
        self.logger.info("get_attributes of several nodes")
647
        request = ua.ReadRequest()
648
        for nodeid in nodeids:
649
            rv = ua.ReadValueId()
650
            rv.NodeId = nodeid
651
            rv.AttributeId = attr
652
            request.Parameters.NodesToRead.append(rv)
653
        data = await self.protocol.send_request(request)
654
        response = struct_from_binary(ua.ReadResponse, data)
655
        response.ResponseHeader.ServiceResult.check()
656
        return response.Results
657
658
    async def set_attributes(self, nodeids, datavalues, attributeid=ua.AttributeIds.Value):
659
        """
660
        Set an attribute of multiple nodes
661
        datavalue is a ua.DataValue object
662
        """
663
        self.logger.info("set_attributes of several nodes")
664
        request = ua.WriteRequest()
665
        for idx, nodeid in enumerate(nodeids):
666
            attr = ua.WriteValue()
667
            attr.NodeId = nodeid
668
            attr.AttributeId = attributeid
669
            attr.Value = datavalues[idx]
670
            request.Parameters.NodesToWrite.append(attr)
671
        data = await self.protocol.send_request(request)
672
        response = struct_from_binary(ua.WriteResponse, data)
673
        response.ResponseHeader.ServiceResult.check()
674
        return response.Results
675
676
677