Completed
Pull Request — master (#76)
by Olivier
02:32
created

asyncua.client.ua_client.UaClient.read()   B

Complexity

Conditions 7

Size

Total Lines 19
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
eloc 18
nop 2
dl 0
loc 19
rs 8
c 0
b 0
f 0
1
"""
2
Low level binary client
3
"""
4
import asyncio
5
import logging
6
from typing import Dict, List
7
from asyncua import ua
8
from typing import Optional
9
from ..ua.ua_binary import struct_from_binary, uatcp_to_binary, struct_to_binary, nodeid_from_binary, header_from_binary
10
from ..ua.uaerrors import BadTimeout, BadNoSubscription, BadSessionClosed, UaStructParsingError
11
from ..common.connection import SecureConnection
12
13
class UASocketProtocol(asyncio.Protocol):
    """
    Handle socket connection and send ua messages.

    Timeout is the timeout used while waiting for an ua answer from server.

    Pending requests are tracked in ``_callbackmap`` (request id -> future).
    Request id 0 is used for the Hello/Acknowledge exchange and for error
    messages received from the server.
    """
    INITIALIZED = 'initialized'
    OPEN = 'open'
    CLOSED = 'closed'

    # NOTE(review): the default ``ua.SecurityPolicy()`` is evaluated once, when the
    # class body is executed, and is shared by every instance created without an
    # explicit policy. Kept as-is to leave the signature untouched.
    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy(), loop=None):
        """
        :param timeout: Timeout in seconds
        :param security_policy: Security policy (optional)
        :param loop: Event loop (optional)
        """
        self.logger = logging.getLogger(f"{__name__}.UASocketProtocol")
        self.loop = loop or asyncio.get_event_loop()
        self.transport = None
        # Bytes of an incomplete message kept until more data arrives
        self.receive_buffer: Optional[bytes] = None
        self.is_receiving = False
        self.timeout = timeout
        self.authentication_token = ua.NodeId()
        self._request_id = 0
        self._request_handle = 0
        self._callbackmap: Dict[int, asyncio.Future] = {}
        self._connection = SecureConnection(security_policy)
        self.state = self.INITIALIZED

    def connection_made(self, transport: asyncio.Transport):
        """Store the transport and mark the protocol as open."""
        self.state = self.OPEN
        self.transport = transport

    def connection_lost(self, exc):
        """Mark the protocol as closed and drop the transport."""
        self.logger.info("Socket has closed connection")
        self.state = self.CLOSED
        self.transport = None

    def data_received(self, data: bytes):
        """Prepend any previously buffered bytes to ``data`` and try to parse the result."""
        if self.receive_buffer:
            data = self.receive_buffer + data
            self.receive_buffer = None
        self._process_received_data(data)

    def _process_received_data(self, data: bytes):
        """
        Try to parse received data as asyncua message. Data may be chunked but will be in correct order.
        See: https://docs.python.org/3/library/asyncio-protocol.html#asyncio.Protocol.data_received
        Reassembly is done by filling up a buffer until it verifies as a valid message (or a MessageChunk).

        Any exception raised while parsing or dispatching is logged and the socket is closed.
        """
        buf = ua.utils.Buffer(data)
        while True:
            try:
                try:
                    header = header_from_binary(buf)
                except ua.utils.NotEnoughData:
                    self.logger.debug('Not enough data while parsing header from server, waiting for more')
                    # Keep the unparsed bytes (header included) for the next data_received call
                    self.receive_buffer = data
                    return
                if len(buf) < header.body_size:
                    self.logger.debug(
                        'We did not receive enough data from server. Need %s got %s', header.body_size, len(buf)
                    )
                    self.receive_buffer = data
                    return
                msg = self._connection.receive_from_header_and_body(header, buf)
                self._process_received_message(msg)
                if not buf:
                    return
                # Buffer still has bytes left, try to process again
                data = bytes(buf)
            except Exception:
                self.logger.exception('Exception raised while parsing message from server')
                self.disconnect_socket()
                return

    def _process_received_message(self, msg):
        """
        Dispatch a decoded message to the future waiting for it.

        ``None`` is ignored. Acknowledge and error messages are delivered to
        request id 0; an error message is delivered as a ``ua.UaStatusCodeError``
        *result* (it is not raised here).

        :raises ua.UaError: if the message type is not supported
        """
        if msg is None:
            pass
        elif isinstance(msg, ua.Message):
            self._call_callback(msg.request_id(), msg.body())
        elif isinstance(msg, ua.Acknowledge):
            self._call_callback(0, msg)
        elif isinstance(msg, ua.ErrorMessage):
            self.logger.fatal("Received an error: %r", msg)
            self._call_callback(0, ua.UaStatusCodeError(msg.Error.value))
        else:
            raise ua.UaError(f"Unsupported message type: {msg}")

    def _send_request(self, request, timeout=1, message_type=ua.MessageType.SecureMessage) -> asyncio.Future:
        """
        Send request to server, lower-level method.
        Timeout is the timeout written in ua header.
        :param request: Request
        :param timeout: Timeout in seconds
        :param message_type: UA Message Type (optional)
        :return: Future that resolves with the Response
        """
        request.RequestHeader = self._create_request_header(timeout)
        self.logger.debug('Sending: %s', request)
        try:
            binreq = struct_to_binary(request)
        except Exception:
            # reset request handle if any error
            # see self._create_request_header
            self._request_handle -= 1
            raise
        self._request_id += 1
        future = self.loop.create_future()
        self._callbackmap[self._request_id] = future
        msg = self._connection.message_to_binary(binreq, message_type=message_type, request_id=self._request_id)
        self.transport.write(msg)
        return future

    async def _await_response(self, future, request_id, timeout):
        """
        Wait for ``future`` and make sure its ``_callbackmap`` entry does not outlive the wait.

        When the wait ends without a response (timeout or cancellation) the future is
        still registered; it is removed here so that it cannot accumulate in the map.
        On the normal path ``_call_callback`` has already removed it and this is a no-op.

        :param future: Future registered in ``_callbackmap`` under ``request_id``
        :param request_id: Key of the future in ``_callbackmap``
        :param timeout: Timeout in seconds, ``None`` to wait without limit
        :return: Result of the future
        """
        try:
            return await asyncio.wait_for(future, timeout)
        finally:
            # Identity check: never remove an entry that belongs to another request
            if self._callbackmap.get(request_id) is future:
                del self._callbackmap[request_id]

    async def send_request(self, request, timeout=10, message_type=ua.MessageType.SecureMessage):
        """
        Send a request to the server.
        Timeout is the timeout written in ua header; a falsy timeout waits without limit.
        Returns the response data after it has been checked with ``check_answer``.
        """
        future = self._send_request(request, timeout, message_type)
        # _send_request is synchronous, so _request_id still identifies this request here
        data = await self._await_response(future, self._request_id, timeout if timeout else None)
        self.check_answer(data, f" in response to {request.__class__.__name__}")
        return data

    def check_answer(self, data, context):
        """
        Check whether ``data`` is a ServiceFault.

        Works on a copy of ``data`` so that the caller's buffer is not consumed.

        :param data: Response data
        :param context: Text appended to the warning logged for a ServiceFault
        :return: False if the response is a ServiceFault whose ServiceResult check passed, True otherwise
        """
        data = data.copy()
        typeid = nodeid_from_binary(data)
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
            self.logger.warning("ServiceFault from server received %s", context)
            hdr = struct_from_binary(ua.ResponseHeader, data)
            hdr.ServiceResult.check()
            return False
        return True

    def _call_callback(self, request_id, body):
        """
        Resolve the future registered for ``request_id`` with ``body`` and unregister it.

        The entry is removed from ``_callbackmap`` even when the future turns out to be
        already done, so a stale future is not kept around.

        :raises ua.UaError: if no request is pending for ``request_id`` or its future is already done
        """
        try:
            future = self._callbackmap.pop(request_id)
        except KeyError as exc:
            raise ua.UaError(
                f"No request found for request id: {request_id}, pending are {self._callbackmap.keys()}"
            ) from exc
        try:
            future.set_result(body)
        except asyncio.InvalidStateError as exc:
            raise ua.UaError(f"Future for request id {request_id} is already done") from exc

    def _create_request_header(self, timeout=1) -> ua.RequestHeader:
        """
        Build a request header carrying the current authentication token and the
        next request handle.

        :param timeout: Timeout in seconds
        :return: Request header
        """
        hdr = ua.RequestHeader()
        hdr.AuthenticationToken = self.authentication_token
        self._request_handle += 1
        hdr.RequestHandle = self._request_handle
        # TimeoutHint is written as seconds * 1000
        hdr.TimeoutHint = timeout * 1000
        return hdr

    def disconnect_socket(self):
        """Close the transport if there is one, otherwise only log a warning."""
        self.logger.info("Request to close socket received")
        if self.transport:
            self.transport.close()
        else:
            self.logger.warning("disconnect_socket was called but transport is None")

    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
        """
        Send a Hello message and wait up to ``self.timeout`` seconds for the answer.

        :return: The object delivered for request id 0: the Acknowledge, or a
            ``ua.UaStatusCodeError`` instance if the server answered with an error message
        """
        hello = ua.Hello()
        hello.EndpointUrl = url
        hello.MaxMessageSize = max_messagesize
        hello.MaxChunkCount = max_chunkcount
        # Create the future on self.loop, like _send_request does
        ack = self.loop.create_future()
        self._callbackmap[0] = ack
        self.transport.write(uatcp_to_binary(ua.MessageType.Hello, hello))
        return await self._await_response(ack, 0, self.timeout)

    async def open_secure_channel(self, params):
        """
        Send an OpenSecureChannelRequest and install the returned channel parameters
        on the secure connection.

        :return: Parameters of the OpenSecureChannelResponse
        """
        self.logger.info("open_secure_channel")
        request = ua.OpenSecureChannelRequest()
        request.Parameters = params
        future = self._send_request(request, message_type=ua.MessageType.SecureOpen)
        result = await self._await_response(future, self._request_id, self.timeout)
        # FIXME: we have a race condition here
        # we can get a packet with the new token id before we reach to store it..
        response = struct_from_binary(ua.OpenSecureChannelResponse, result)
        response.ResponseHeader.ServiceResult.check()
        self._connection.set_channel(response.Parameters)
        return response.Parameters

    async def close_secure_channel(self):
        """
        Close secure channel.
        It seems to trigger a shutdown of socket in most servers, so be prepared to reconnect.
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response
        and should just close socket.
        """
        self.logger.info("close_secure_channel")
        request = ua.CloseSecureChannelRequest()
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
        # don't expect any more answers
        future.cancel()
        self._callbackmap.clear()
        # some servers send a response here, most do not ... so we ignore
218
219
220
class UaClient:
    """
    low level OPC-UA client.

    It implements (almost) all methods defined in asyncua spec
    taking in argument the structures defined in asyncua spec.

    In this Python implementation  most of the structures are defined in
    uaprotocol_auto.py and uaprotocol_hand.py available under asyncua.ua

    Unless stated otherwise, every service method sends its request through
    ``self.protocol.send_request``, decodes the matching response structure and
    calls ``ResponseHeader.ServiceResult.check()`` before returning.
    """

    def __init__(self, timeout=1, loop=None):
        """
        :param timeout: Timeout in seconds
        :param loop: Event loop (optional)
        """
        self.logger = logging.getLogger(f'{__name__}.UaClient')
        self.loop = loop or asyncio.get_event_loop()
        # SubscriptionId -> callback invoked with the parameters of each publish response
        self._subscription_callbacks = {}
        self._timeout = timeout
        self.security_policy = ua.SecurityPolicy()
        # Set by _make_protocol; None until a connection has been attempted
        self.protocol: Optional[UASocketProtocol] = None
        self._publish_task = None

    def set_security(self, policy: ua.SecurityPolicy):
        """Set the security policy used for protocols created afterwards."""
        self.security_policy = policy

    def _make_protocol(self):
        """Create a new UASocketProtocol, store it in ``self.protocol`` and return it."""
        self.protocol = UASocketProtocol(self._timeout, security_policy=self.security_policy, loop=self.loop)
        return self.protocol

    async def connect_socket(self, host: str, port: int):
        """Connect to server socket."""
        self.logger.info("opening connection")
        await self.loop.create_connection(self._make_protocol, host, port)

    def disconnect_socket(self):
        """
        Close the socket of the current protocol.

        Only logs a warning when there is no protocol yet or when the connection is
        already closed.
        """
        if self.protocol is None:
            # No connection was ever attempted: nothing to close
            self.logger.warning("disconnect_socket was called but connection was never opened")
            return None
        if self.protocol.state == UASocketProtocol.CLOSED:
            self.logger.warning("disconnect_socket was called but connection is closed")
            return None
        return self.protocol.disconnect_socket()

    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
        """
        Send a Hello message through the protocol.

        :return: Whatever the protocol's ``send_hello`` returns, so that the caller
            can inspect the answer of the server
        """
        return await self.protocol.send_hello(url, max_messagesize, max_chunkcount)

    async def open_secure_channel(self, params):
        """Open a secure channel through the protocol and return its result."""
        return await self.protocol.open_secure_channel(params)

    async def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepared to reconnect

        Only logs a warning when there is no protocol yet or when the connection is
        already closed.
        """
        if self.protocol is None:
            self.logger.warning("close_secure_channel was called but connection was never opened")
            return
        if self.protocol.state == UASocketProtocol.CLOSED:
            self.logger.warning("close_secure_channel was called but connection is closed")
            return
        return await self.protocol.close_secure_channel()

    async def create_session(self, parameters):
        """
        Send a CreateSessionRequest and store the returned authentication token on
        the protocol.

        :return: Parameters of the CreateSessionResponse
        """
        self.logger.info("create_session")
        request = ua.CreateSessionRequest()
        request.Parameters = parameters
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.CreateSessionResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        self.protocol.authentication_token = response.Parameters.AuthenticationToken
        return response.Parameters

    async def activate_session(self, parameters):
        """Send an ActivateSessionRequest and return the Parameters of the response."""
        self.logger.info("activate_session")
        request = ua.ActivateSessionRequest()
        request.Parameters = parameters
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.ActivateSessionResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters

    async def close_session(self, delete_subscriptions):
        """
        Cancel the publish task if it is running and send a CloseSessionRequest.

        Only logs a warning (after cancelling the publish task) when there is no
        protocol yet or when the connection is already closed. A BadSessionClosed
        service result is ignored.
        """
        self.logger.info("close_session")
        if self._publish_task and not self._publish_task.done():
            self._publish_task.cancel()
        if self.protocol is None:
            self.logger.warning("close_session was called but connection was never opened")
            return
        if self.protocol.state == UASocketProtocol.CLOSED:
            self.logger.warning("close_session was called but connection is closed")
            return
        request = ua.CloseSessionRequest()
        request.DeleteSubscriptions = delete_subscriptions
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.CloseSessionResponse, data)
        try:
            response.ResponseHeader.ServiceResult.check()
        except BadSessionClosed:
            # Problem: closing the session with open publish requests leads to BadSessionClosed responses
            #          we can just ignore it therefore.
            #          Alternatively we could make sure that there are no publish requests in flight when
            #          closing the session.
            pass

    async def browse(self, parameters):
        """Send a BrowseRequest and return the Results of the response."""
        self.logger.info("browse")
        request = ua.BrowseRequest()
        request.Parameters = parameters
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.BrowseResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def browse_next(self, parameters):
        """Send a BrowseNextRequest and return ``Parameters.Results`` of the response."""
        self.logger.debug("browse next")
        request = ua.BrowseNextRequest()
        request.Parameters = parameters
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.BrowseNextResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters.Results

    async def read(self, parameters):
        """
        Send a ReadRequest and return the Results of the response.

        For results with a good status code, values read from the NodeClass attribute
        are converted to ``ua.NodeClass`` and values read from the ValueRank attribute
        are converted to ``ua.ValueRank`` when they are in the range -3..4.
        """
        self.logger.debug("read")
        request = ua.ReadRequest()
        request.Parameters = parameters
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.ReadResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        # cast to Enum attributes that need to
        for idx, rv in enumerate(parameters.NodesToRead):
            if rv.AttributeId == ua.AttributeIds.NodeClass:
                dv = response.Results[idx]
                if dv.StatusCode.is_good():
                    dv.Value.Value = ua.NodeClass(dv.Value.Value)
            elif rv.AttributeId == ua.AttributeIds.ValueRank:
                dv = response.Results[idx]
                if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
                    dv.Value.Value = ua.ValueRank(dv.Value.Value)
        return response.Results

    async def write(self, params):
        """Send a WriteRequest and return the Results of the response."""
        self.logger.debug("write")
        request = ua.WriteRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.WriteResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def get_endpoints(self, params):
        """Send a GetEndpointsRequest and return the Endpoints of the response."""
        self.logger.debug("get_endpoint")
        request = ua.GetEndpointsRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.GetEndpointsResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Endpoints

    async def find_servers(self, params):
        """Send a FindServersRequest and return the Servers of the response."""
        self.logger.debug("find_servers")
        request = ua.FindServersRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.FindServersResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Servers

    async def find_servers_on_network(self, params):
        """Send a FindServersOnNetworkRequest and return the Parameters of the response."""
        self.logger.debug("find_servers_on_network")
        request = ua.FindServersOnNetworkRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.FindServersOnNetworkResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters

    async def register_server(self, registered_server):
        """Send a RegisterServerRequest; nothing is returned."""
        self.logger.debug("register_server")
        request = ua.RegisterServerRequest()
        request.Server = registered_server
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.RegisterServerResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        # nothing to return for this service

    async def register_server2(self, params):
        """Send a RegisterServer2Request and return the ConfigurationResults of the response."""
        self.logger.debug("register_server2")
        request = ua.RegisterServer2Request()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.RegisterServer2Response, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.ConfigurationResults

    async def translate_browsepaths_to_nodeids(self, browse_paths):
        """Send a TranslateBrowsePathsToNodeIdsRequest and return the Results of the response."""
        self.logger.debug("translate_browsepath_to_nodeid")
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
        request.Parameters.BrowsePaths = browse_paths
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def create_subscription(self, params, callback):
        """
        Send a CreateSubscriptionRequest, register ``callback`` for the new
        SubscriptionId and start the publish loop if it is not running.

        :param params: Parameters of the request
        :param callback: Called by the publish loop with the parameters of each publish response
            received for this subscription
        :return: Parameters of the CreateSubscriptionResponse
        """
        self.logger.debug("create_subscription")
        request = ua.CreateSubscriptionRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(
            ua.CreateSubscriptionResponse,
            data
        )
        response.ResponseHeader.ServiceResult.check()
        self._subscription_callbacks[response.Parameters.SubscriptionId] = callback
        self.logger.info("create_subscription success SubscriptionId %s", response.Parameters.SubscriptionId)
        if not self._publish_task or self._publish_task.done():
            # Start the publish loop if it is not yet running
            # The current strategy is to have only one open publish request per UaClient. This might not be enough
            # in high latency networks or in case many subscriptions are created. A Set of Tasks of `_publish_loop`
            # could be used if necessary.
            self._publish_task = self.loop.create_task(self._publish_loop())
        return response.Parameters

    async def delete_subscriptions(self, subscription_ids):
        """
        Send a DeleteSubscriptionsRequest and unregister the callbacks of the given ids.

        :return: Results of the DeleteSubscriptionsResponse
        """
        self.logger.debug("delete_subscriptions %r", subscription_ids)
        request = ua.DeleteSubscriptionsRequest()
        request.Parameters.SubscriptionIds = subscription_ids
        data = await self.protocol.send_request(request)
        response = struct_from_binary(
            ua.DeleteSubscriptionsResponse,
            data
        )
        response.ResponseHeader.ServiceResult.check()
        self.logger.info("remove subscription callbacks for %r", subscription_ids)
        for sid in subscription_ids:
            # The server already answered: an id that is duplicated in the list or has no
            # callback registered must not prevent the results from being returned
            self._subscription_callbacks.pop(sid, None)
        return response.Results

    async def publish(self, acks: List[ua.SubscriptionAcknowledgement]) -> ua.PublishResponse:
        """
        Send a PublishRequest to the server.

        The request is sent with ``timeout=0``, i.e. the response is awaited without a time limit.

        :raises UaStructParsingError: if the response cannot be decoded
        """
        self.logger.debug('publish %r', acks)
        request = ua.PublishRequest()
        request.Parameters.SubscriptionAcknowledgements = acks if acks else []
        data = await self.protocol.send_request(request, timeout=0)
        self.protocol.check_answer(data, "while waiting for publish response")
        try:
            response = struct_from_binary(ua.PublishResponse, data)
        except Exception:
            self.logger.exception("Error parsing notification from server")
            raise UaStructParsingError
        return response

    async def _publish_loop(self):
        """
        Start a loop that sends a publish requests and waits for the publish responses.
        Forward the `PublishResult` to the matching `Subscription` by callback.
        """
        ack = None
        while True:
            try:
                response = await self.publish([ack] if ack else [])
            except BadTimeout:  # See Spec. Part 4, 7.28
                # Repeat without acknowledgement
                ack = None
                continue
            except BadNoSubscription:  # See Spec. Part 5, 13.8.1
                # BadNoSubscription is expected to be received after deleting the last subscription.
                # We use this as a signal to exit this task and stop sending PublishRequests. This is easier then
                # checking if there are no more subscriptions registered in this client (). A Publish response
                # could still arrive before the DeleteSubscription response.
                #
                # We could remove the callback already when sending the DeleteSubscription request,
                # but there are some legitimate reasons to keep them around, such as when the server
                # responds with "BadTimeout" and we should try again later instead of just removing
                # the subscription client-side.
                #
                # There are a variety of ways to act correctly, but the most practical solution seems
                # to be to just silently ignore any BadNoSubscription responses.
                self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
                # End task
                return
            except UaStructParsingError:
                ack = None
                continue
            subscription_id = response.Parameters.SubscriptionId
            if not subscription_id:
                # The value 0 is used to indicate that there were no Subscriptions defined for which a
                # response could be sent. See Spec. Part 4 - Section 5.13.5 "Publish"
                # End task
                return
            try:
                callback = self._subscription_callbacks[subscription_id]
            except KeyError:
                self.logger.warning(
                    "Received data for unknown subscription %s active are %s", subscription_id,
                    self._subscription_callbacks.keys()
                )
            else:
                try:
                    callback(response.Parameters)
                except Exception:  # we call client code, catch everything!
                    # logger.exception appends the traceback itself; no placeholder is needed
                    self.logger.exception("Exception while calling user callback")
            # Repeat with acknowledgement
            ack = ua.SubscriptionAcknowledgement()
            ack.SubscriptionId = subscription_id
            ack.SequenceNumber = response.Parameters.NotificationMessage.SequenceNumber

    async def create_monitored_items(self, params):
        """Send a CreateMonitoredItemsRequest and return the Results of the response."""
        self.logger.info("create_monitored_items")
        request = ua.CreateMonitoredItemsRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.CreateMonitoredItemsResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def delete_monitored_items(self, params):
        """Send a DeleteMonitoredItemsRequest and return the Results of the response."""
        self.logger.info("delete_monitored_items")
        request = ua.DeleteMonitoredItemsRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.DeleteMonitoredItemsResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def add_nodes(self, nodestoadd):
        """Send an AddNodesRequest for ``nodestoadd`` and return the Results of the response."""
        self.logger.info("add_nodes")
        request = ua.AddNodesRequest()
        request.Parameters.NodesToAdd = nodestoadd
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.AddNodesResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def add_references(self, refs):
        """Send an AddReferencesRequest for ``refs`` and return the Results of the response."""
        self.logger.info("add_references")
        request = ua.AddReferencesRequest()
        request.Parameters.ReferencesToAdd = refs
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.AddReferencesResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def delete_references(self, refs):
        """Send a DeleteReferencesRequest for ``refs`` and return ``Parameters.Results`` of the response."""
        self.logger.info("delete")
        request = ua.DeleteReferencesRequest()
        request.Parameters.ReferencesToDelete = refs
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.DeleteReferencesResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters.Results

    async def delete_nodes(self, params):
        """Send a DeleteNodesRequest and return the Results of the response."""
        self.logger.info("delete_nodes")
        request = ua.DeleteNodesRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.DeleteNodesResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def call(self, methodstocall):
        """Send a CallRequest for ``methodstocall`` and return the Results of the response."""
        request = ua.CallRequest()
        request.Parameters.MethodsToCall = methodstocall
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.CallResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def history_read(self, params):
        """Send a HistoryReadRequest and return the Results of the response."""
        self.logger.info("history_read")
        request = ua.HistoryReadRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.HistoryReadResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def modify_monitored_items(self, params):
        """Send a ModifyMonitoredItemsRequest and return the Results of the response."""
        self.logger.info("modify_monitored_items")
        request = ua.ModifyMonitoredItemsRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.ModifyMonitoredItemsResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def register_nodes(self, nodes):
        """Send a RegisterNodesRequest for ``nodes`` and return the RegisteredNodeIds of the response."""
        self.logger.info("register_nodes")
        request = ua.RegisterNodesRequest()
        request.Parameters.NodesToRegister = nodes
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.RegisterNodesResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters.RegisteredNodeIds

    async def unregister_nodes(self, nodes):
        """Send an UnregisterNodesRequest for ``nodes``; nothing is returned."""
        self.logger.info("unregister_nodes")
        request = ua.UnregisterNodesRequest()
        request.Parameters.NodesToUnregister = nodes
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.UnregisterNodesResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        # nothing to return for this service

    async def get_attributes(self, nodeids, attr):
        """
        Read the same attribute of multiple nodes.

        Unlike ``read``, the returned values are not converted to enums.

        :param nodeids: Node ids to read from
        :param attr: Attribute id read on every node
        :return: Results of the ReadResponse
        """
        self.logger.info("get_attributes of several nodes")
        request = ua.ReadRequest()
        for nodeid in nodeids:
            rv = ua.ReadValueId()
            rv.NodeId = nodeid
            rv.AttributeId = attr
            request.Parameters.NodesToRead.append(rv)
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.ReadResponse, data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def set_attributes(self, nodeids, datavalues, attributeid=ua.AttributeIds.Value):
        """
        Set an attribute of multiple nodes
        datavalue is a ua.DataValue object

        :param nodeids: Node ids to write to
        :param datavalues: Values to write, ``datavalues[i]`` is written to ``nodeids[i]``
        :param attributeid: Attribute id written on every node
        :return: Results of the WriteResponse
        """
        self.logger.info("set_attributes of several nodes")
        request = ua.WriteRequest()
        for idx, nodeid in enumerate(nodeids):
            attr = ua.WriteValue()
            attr.NodeId = nodeid
            attr.AttributeId = attributeid
            attr.Value = datavalues[idx]
            request.Parameters.NodesToWrite.append(attr)
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.WriteResponse, data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results
674
675
676