Completed
Push — master ( 9925e7...6c504f )
by Olivier
02:21
created

UaClient.set_monitoring_mode()   A

Complexity

Conditions 1

Size

Total Lines 12
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 9
nop 2
dl 0
loc 12
rs 9.95
c 0
b 0
f 0
1
"""
2
Low level binary client
3
"""
4
import asyncio
5
import logging
6
from typing import Dict, List
7
from asyncua import ua
8
from typing import Optional
9
from ..ua.ua_binary import struct_from_binary, uatcp_to_binary, struct_to_binary, nodeid_from_binary, header_from_binary
10
from ..ua.uaerrors import BadTimeout, BadNoSubscription, BadSessionClosed, UaStructParsingError
11
from ..common.connection import SecureConnection
12
13
14
class UASocketProtocol(asyncio.Protocol):
    """
    Handle socket connection and send ua messages.
    Timeout is the timeout used while waiting for an ua answer from server.

    Pending requests are tracked in ``_callbackmap`` (request id -> future).
    Id 0 is reserved for the Hello exchange: Acknowledge and Error messages
    are always delivered to the future registered under 0, while regular
    requests use ids starting at 1.
    """
    # Values taken by ``self.state`` over the life of the connection.
    INITIALIZED = 'initialized'
    OPEN = 'open'
    CLOSED = 'closed'

    # NOTE(review): the default ``security_policy`` instance is created once, when
    # the class body is executed, and is shared by every protocol built without an
    # explicit policy - confirm ua.SecurityPolicy carries no per-connection state.
    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy(), loop=None):
        """
        :param timeout: Timeout in seconds
        :param security_policy: Security policy (optional)
        :param loop: Event loop (optional)
        """
        self.logger = logging.getLogger(f"{__name__}.UASocketProtocol")
        self.loop = loop or asyncio.get_event_loop()
        self.transport = None
        # Bytes of an incomplete message, kept until the next data_received call
        self.receive_buffer: Optional[bytes] = None
        self.is_receiving = False
        self.timeout = timeout
        self.authentication_token = ua.NodeId()
        # Both counters are incremented before use, so the first request gets 1
        self._request_id = 0
        self._request_handle = 0
        self._callbackmap: Dict[int, asyncio.Future] = {}
        self._connection = SecureConnection(security_policy)
        self.state = self.INITIALIZED
        # Set by UaClient.close_session / cleared by UaClient.create_session;
        # relaxes the "future already done" error in _call_callback.
        self.closed: bool = False

    def connection_made(self, transport: asyncio.Transport):
        """asyncio callback: remember the transport and mark the protocol as open."""
        self.state = self.OPEN
        self.transport = transport

    def connection_lost(self, exc):
        """asyncio callback: mark the protocol as closed and drop the transport."""
        self.logger.info("Socket has closed connection")
        self.state = self.CLOSED
        self.transport = None

    def data_received(self, data: bytes):
        """asyncio callback: prepend any buffered partial message and parse the result."""
        if self.receive_buffer:
            data = self.receive_buffer + data
            self.receive_buffer = None
        self._process_received_data(data)

    def _process_received_data(self, data: bytes):
        """
        Try to parse received data as OPC UA message. Data may be chunked but will be in correct order.
        See: https://docs.python.org/3/library/asyncio-protocol.html#asyncio.Protocol.data_received
        Reassembly is done by filling up a buffer until it verifies as a valid message (or a MessageChunk).

        Any exception raised while parsing or dispatching is logged and the socket is disconnected.
        """
        buf = ua.utils.Buffer(data)
        while True:
            try:
                try:
                    header = header_from_binary(buf)
                except ua.utils.NotEnoughData:
                    self.logger.debug('Not enough data while parsing header from server, waiting for more')
                    # Keep the unparsed bytes (header included) for the next call
                    self.receive_buffer = data
                    return
                if len(buf) < header.body_size:
                    self.logger.debug(
                        'We did not receive enough data from server. Need %s got %s', header.body_size, len(buf)
                    )
                    # ``data`` still starts at the header, so it will be re-parsed later
                    self.receive_buffer = data
                    return
                msg = self._connection.receive_from_header_and_body(header, buf)
                self._process_received_message(msg)
                if not buf:
                    return
                # Buffer still has bytes left, try to process again
                data = bytes(buf)
            except Exception:
                self.logger.exception('Exception raised while parsing message from server')
                self.disconnect_socket()
                return

    def _process_received_message(self, msg):
        """
        Dispatch one decoded message to the future waiting for it.

        ``None`` is ignored; Acknowledge and Error messages go to callback id 0.

        :raises ua.UaError: if the message type is not supported
        """
        if msg is None:
            pass
        elif isinstance(msg, ua.Message):
            self._call_callback(msg.request_id(), msg.body())
        elif isinstance(msg, ua.Acknowledge):
            self._call_callback(0, msg)
        elif isinstance(msg, ua.ErrorMessage):
            self.logger.fatal("Received an error: %r", msg)
            # NOTE(review): the error object is delivered as the future's *result*
            # (set_result), not raised in the waiter - confirm this is intended.
            self._call_callback(0, ua.UaStatusCodeError(msg.Error.value))
        else:
            raise ua.UaError(f"Unsupported message type: {msg}")

    def _send_request(self, request, timeout=1, message_type=ua.MessageType.SecureMessage) -> asyncio.Future:
        """
        Send request to server, lower-level method.
        Timeout is the timeout written in ua header.
        :param request: Request
        :param timeout: Timeout in seconds
        :param message_type: UA Message Type (optional)
        :return: Future that resolves with the Response
        """
        request.RequestHeader = self._create_request_header(timeout)
        self.logger.debug('Sending: %s', request)
        try:
            binreq = struct_to_binary(request)
        except Exception:
            # reset request handle if any error
            # see self._create_request_header
            self._request_handle -= 1
            raise
        self._request_id += 1
        request_id = self._request_id
        future = self.loop.create_future()
        self._callbackmap[request_id] = future

        try:
            # Change to the new security token if the connection has been renewed.
            if self._connection.next_security_token.TokenId != 0:
                self._connection.revolve_tokens()

            msg = self._connection.message_to_binary(binreq, message_type=message_type, request_id=request_id)
            self.transport.write(msg)
        except Exception:
            # Nothing was sent, so no answer can ever resolve this future:
            # do not leave it registered. The original error is re-raised unchanged.
            self._callbackmap.pop(request_id, None)
            raise
        return future

    async def send_request(self, request, timeout=None, message_type=ua.MessageType.SecureMessage):
        """
        Send a request to the server and wait for its answer.
        Timeout is the timeout written in ua header; it is also used as the
        time to wait for the answer. ``None`` means ``self.timeout``, a falsy
        value (e.g. 0) means wait without limit.

        :return: the binary body of the response, after ``check_answer`` accepted it
        :raises ConnectionError: if waiting failed and the connection is not open
        """
        timeout = self.timeout if timeout is None else timeout
        try:
            data = await asyncio.wait_for(
                self._send_request(request, timeout, message_type),
                timeout if timeout else None
            )
        except Exception:
            if self.state != self.OPEN:
                raise ConnectionError("Connection is closed") from None

            raise

        self.check_answer(data, f" in response to {request.__class__.__name__}")
        return data

    def check_answer(self, data, context):
        """
        Check whether the answer in *data* is a ServiceFault.

        Works on a copy, so *data* is left untouched for the caller to decode.

        :param context: text appended to the warning logged for a ServiceFault
        :return: True for a regular answer; False for a ServiceFault whose
                 ``ServiceResult.check()`` did not raise
        """
        data = data.copy()
        typeid = nodeid_from_binary(data)
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
            self.logger.warning("ServiceFault from server received %s", context)
            hdr = struct_from_binary(ua.ResponseHeader, data)
            hdr.ServiceResult.check()
            return False
        return True

    def _call_callback(self, request_id, body):
        """
        Resolve the future registered for *request_id* with *body* and forget it.

        :raises ua.UaError: if no future is registered for the id, or if the
                            future is already done while ``self.closed`` is False
        """
        try:
            self._callbackmap[request_id].set_result(body)
        except KeyError:
            raise ua.UaError(
                f"No request found for request id: {request_id}, pending are {self._callbackmap.keys()}"
            )
        except asyncio.InvalidStateError:
            if not self.closed:
                raise ua.UaError(f"Future for request id {request_id} is already done")
            self.logger.debug("Future for request id %s not handled due to disconnect", request_id)
        del self._callbackmap[request_id]

    def _create_request_header(self, timeout=1) -> ua.RequestHeader:
        """
        Build a request header carrying the authentication token and a fresh request handle.

        :param timeout: Timeout in seconds
        :return: Request header
        """
        hdr = ua.RequestHeader()
        hdr.AuthenticationToken = self.authentication_token
        self._request_handle += 1
        hdr.RequestHandle = self._request_handle
        # seconds -> milliseconds
        hdr.TimeoutHint = timeout * 1000
        return hdr

    def disconnect_socket(self):
        """Close the transport if there is one, otherwise only log a warning."""
        self.logger.info("Request to close socket received")
        if self.transport:
            self.transport.close()
        else:
            self.logger.warning("disconnect_socket was called but transport is None")

    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
        """
        Send a Hello message and wait up to ``self.timeout`` for the answer.

        :return: whatever gets delivered to callback id 0 (see _process_received_message)
        """
        hello = ua.Hello()
        hello.EndpointUrl = url
        hello.MaxMessageSize = max_messagesize
        hello.MaxChunkCount = max_chunkcount
        # Use the protocol's own loop, like _send_request does, instead of the default loop
        ack = self.loop.create_future()
        self._callbackmap[0] = ack
        self.transport.write(uatcp_to_binary(ua.MessageType.Hello, hello))
        return await asyncio.wait_for(ack, self.timeout)

    async def open_secure_channel(self, params):
        """
        Send an OpenSecureChannelRequest and install the returned channel on the connection.

        :return: the ``Parameters`` of the OpenSecureChannelResponse
        """
        self.logger.info("open_secure_channel")
        request = ua.OpenSecureChannelRequest()
        request.Parameters = params
        result = await asyncio.wait_for(
            self._send_request(request, message_type=ua.MessageType.SecureOpen),
            self.timeout
        )
        response = struct_from_binary(ua.OpenSecureChannelResponse, result)
        response.ResponseHeader.ServiceResult.check()
        self._connection.set_channel(response.Parameters, params.RequestType, params.ClientNonce)
        return response.Parameters

    async def close_secure_channel(self):
        """
        Close secure channel.
        It seems to trigger a shutdown of socket in most servers, so be prepare to reconnect.
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response
        and should just close socket.
        """
        self.logger.info("close_secure_channel")
        request = ua.CloseSecureChannelRequest()
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
        # don't expect any more answers
        future.cancel()
        self._callbackmap.clear()
        # some servers send a response here, most do not ... so we ignore
233
234
235
class UaClient:
    """
    Low level OPC-UA client.

    It implements (almost) all methods defined in the OPC UA spec,
    taking in argument the structures defined in the OPC UA spec.

    In this Python implementation most of the structures are defined in
    uaprotocol_auto.py and uaprotocol_hand.py available under asyncua.ua

    Service methods build the request structure, send it through
    ``self.protocol`` and decode the binary answer into the matching response
    structure. ``self.protocol`` only exists after ``connect_socket``.
    """

    def __init__(self, timeout=1, loop=None):
        """
        :param timeout: Timeout in seconds
        :param loop: Event loop (optional)
        """
        self.logger = logging.getLogger(f'{__name__}.UaClient')
        self.loop = loop or asyncio.get_event_loop()
        # SubscriptionId -> callback receiving the publish result parameters
        self._subscription_callbacks = {}
        self._timeout = timeout
        self.security_policy = ua.SecurityPolicy()
        self.protocol: Optional[UASocketProtocol] = None
        # Task running _publish_loop, started by create_subscription
        self._publish_task = None

    def set_security(self, policy: ua.SecurityPolicy):
        """Set the security policy used by protocols created afterwards."""
        self.security_policy = policy

    def _make_protocol(self):
        """Protocol factory for ``loop.create_connection``; stores and returns the new protocol."""
        self.protocol = UASocketProtocol(self._timeout, security_policy=self.security_policy, loop=self.loop)
        return self.protocol

    async def connect_socket(self, host: str, port: int):
        """Connect to server socket."""
        self.logger.info("opening connection")
        # Timeout the connection when the server isn't available
        await asyncio.wait_for(
            self.loop.create_connection(self._make_protocol, host, port), self._timeout
        )

    def disconnect_socket(self):
        """Close the socket; only logs a warning if there is no open connection."""
        # A client that never connected has no protocol: treat it like a closed one
        # instead of failing with AttributeError.
        if self.protocol is None or self.protocol.state == UASocketProtocol.CLOSED:
            self.logger.warning("disconnect_socket was called but connection is closed")
            return None
        return self.protocol.disconnect_socket()

    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
        """Forward the Hello exchange to the protocol; the answer is not returned."""
        await self.protocol.send_hello(url, max_messagesize, max_chunkcount)

    async def open_secure_channel(self, params):
        """Open a secure channel through the protocol and return its result."""
        return await self.protocol.open_secure_channel(params)

    async def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepare to reconnect
        """
        # No protocol (never connected) is handled like a closed connection
        if self.protocol is None or self.protocol.state == UASocketProtocol.CLOSED:
            self.logger.warning("close_secure_channel was called but connection is closed")
            return
        return await self.protocol.close_secure_channel()

    async def create_session(self, parameters):
        """
        Send a CreateSessionRequest and store the returned authentication token on the protocol.

        :return: the ``Parameters`` of the CreateSessionResponse
        """
        self.logger.info("create_session")
        self.protocol.closed = False
        request = ua.CreateSessionRequest()
        request.Parameters = parameters
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.CreateSessionResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        self.protocol.authentication_token = response.Parameters.AuthenticationToken
        return response.Parameters

    async def activate_session(self, parameters):
        """Send an ActivateSessionRequest and return the ``Parameters`` of the response."""
        self.logger.info("activate_session")
        request = ua.ActivateSessionRequest()
        request.Parameters = parameters
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.ActivateSessionResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters

    async def close_session(self, delete_subscriptions):
        """
        Cancel the publish task and send a CloseSessionRequest.

        Only logs a warning if there is no open connection.

        :param delete_subscriptions: value written to ``DeleteSubscriptions`` of the request
        """
        self.logger.info("close_session")
        if self.protocol is not None:
            self.protocol.closed = True
        if self._publish_task and not self._publish_task.done():
            self._publish_task.cancel()
        # No protocol (never connected) is handled like a closed connection
        if self.protocol is None or self.protocol.state == UASocketProtocol.CLOSED:
            self.logger.warning("close_session was called but connection is closed")
            return
        request = ua.CloseSessionRequest()
        request.DeleteSubscriptions = delete_subscriptions
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.CloseSessionResponse, data)
        try:
            response.ResponseHeader.ServiceResult.check()
        except BadSessionClosed:
            # Problem: closing the session with open publish requests leads to BadSessionClosed responses
            #          we can just ignore it therefore.
            #          Alternatively we could make sure that there are no publish requests in flight when
            #          closing the session.
            pass

    async def browse(self, parameters):
        """Send a BrowseRequest and return ``response.Results``."""
        self.logger.info("browse")
        request = ua.BrowseRequest()
        request.Parameters = parameters
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.BrowseResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def browse_next(self, parameters):
        """Send a BrowseNextRequest and return ``response.Parameters.Results``."""
        self.logger.debug("browse next")
        request = ua.BrowseNextRequest()
        request.Parameters = parameters
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.BrowseNextResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters.Results

    async def read(self, parameters):
        """
        Send a ReadRequest and return ``response.Results``.

        Good results for the NodeClass attribute are converted to ``ua.NodeClass``;
        good results for the ValueRank attribute are converted to ``ua.ValueRank``
        when the value is one of -3..4.
        """
        self.logger.debug("read")
        request = ua.ReadRequest()
        request.Parameters = parameters
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.ReadResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        # cast to Enum attributes that need to
        for idx, rv in enumerate(parameters.NodesToRead):
            if rv.AttributeId == ua.AttributeIds.NodeClass:
                dv = response.Results[idx]
                if dv.StatusCode.is_good():
                    dv.Value.Value = ua.NodeClass(dv.Value.Value)
            elif rv.AttributeId == ua.AttributeIds.ValueRank:
                dv = response.Results[idx]
                if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
                    dv.Value.Value = ua.ValueRank(dv.Value.Value)
        return response.Results

    async def write(self, params):
        """Send a WriteRequest and return ``response.Results``."""
        self.logger.debug("write")
        request = ua.WriteRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.WriteResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def get_endpoints(self, params):
        """Send a GetEndpointsRequest and return ``response.Endpoints``."""
        self.logger.debug("get_endpoint")
        request = ua.GetEndpointsRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.GetEndpointsResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Endpoints

    async def find_servers(self, params):
        """Send a FindServersRequest and return ``response.Servers``."""
        self.logger.debug("find_servers")
        request = ua.FindServersRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.FindServersResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Servers

    async def find_servers_on_network(self, params):
        """Send a FindServersOnNetworkRequest and return ``response.Parameters``."""
        self.logger.debug("find_servers_on_network")
        request = ua.FindServersOnNetworkRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.FindServersOnNetworkResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters

    async def register_server(self, registered_server):
        """Send a RegisterServerRequest for *registered_server*."""
        self.logger.debug("register_server")
        request = ua.RegisterServerRequest()
        request.Server = registered_server
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.RegisterServerResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        # nothing to return for this service

    async def register_server2(self, params):
        """Send a RegisterServer2Request and return ``response.ConfigurationResults``."""
        self.logger.debug("register_server2")
        request = ua.RegisterServer2Request()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.RegisterServer2Response, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.ConfigurationResults

    async def translate_browsepaths_to_nodeids(self, browse_paths):
        """Send a TranslateBrowsePathsToNodeIdsRequest for *browse_paths* and return ``response.Results``."""
        self.logger.debug("translate_browsepath_to_nodeid")
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
        request.Parameters.BrowsePaths = browse_paths
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def create_subscription(self, params, callback):
        """
        Send a CreateSubscriptionRequest, register *callback* for the new
        subscription id and make sure the publish loop is running.

        :param callback: function or coroutine function called by the publish
                         loop with the parameters of each publish response
        :return: the ``Parameters`` of the CreateSubscriptionResponse
        """
        self.logger.debug("create_subscription")
        request = ua.CreateSubscriptionRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(
            ua.CreateSubscriptionResponse,
            data
        )
        response.ResponseHeader.ServiceResult.check()
        self._subscription_callbacks[response.Parameters.SubscriptionId] = callback
        self.logger.info("create_subscription success SubscriptionId %s", response.Parameters.SubscriptionId)
        if not self._publish_task or self._publish_task.done():
            # Start the publish loop if it is not yet running
            # The current strategy is to have only one open publish request per UaClient. This might not be enough
            # in high latency networks or in case many subscriptions are created. A Set of Tasks of `_publish_loop`
            # could be used if necessary.
            self._publish_task = self.loop.create_task(self._publish_loop())
        return response.Parameters

    async def delete_subscriptions(self, subscription_ids):
        """
        Send a DeleteSubscriptionsRequest and drop the callbacks registered
        for *subscription_ids*.

        :return: ``response.Results``
        """
        self.logger.debug("delete_subscriptions %r", subscription_ids)
        request = ua.DeleteSubscriptionsRequest()
        request.Parameters.SubscriptionIds = subscription_ids
        data = await self.protocol.send_request(request)
        response = struct_from_binary(
            ua.DeleteSubscriptionsResponse,
            data
        )
        response.ResponseHeader.ServiceResult.check()
        self.logger.info("remove subscription callbacks for %r", subscription_ids)
        for sid in subscription_ids:
            # The request already succeeded; an id without a local callback must
            # not turn that into a KeyError.
            self._subscription_callbacks.pop(sid, None)
        return response.Results

    async def publish(self, acks: List[ua.SubscriptionAcknowledgement]) -> ua.PublishResponse:
        """
        Send a PublishRequest to the server.

        The request is sent with ``timeout=0``, which ``send_request`` treats as
        "wait without limit".

        :param acks: acknowledgements to send along; a falsy value sends an empty list
        :raises UaStructParsingError: if the answer cannot be decoded
        """
        self.logger.debug('publish %r', acks)
        request = ua.PublishRequest()
        request.Parameters.SubscriptionAcknowledgements = acks if acks else []
        data = await self.protocol.send_request(request, timeout=0)
        self.protocol.check_answer(data, "while waiting for publish response")
        try:
            response = struct_from_binary(ua.PublishResponse, data)
        except Exception as exc:
            self.logger.exception("Error parsing notification from server")
            raise UaStructParsingError from exc
        return response

    async def _publish_loop(self):
        """
        Start a loop that sends a publish requests and waits for the publish responses.
        Forward the `PublishResult` to the matching `Subscription` by callback.
        """
        ack = None
        while True:
            try:
                response = await self.publish([ack] if ack else [])
            except BadTimeout:  # See Spec. Part 4, 7.28
                # Repeat without acknowledgement
                ack = None
                continue
            except BadNoSubscription:  # See Spec. Part 5, 13.8.1
                # BadNoSubscription is expected to be received after deleting the last subscription.
                # We use this as a signal to exit this task and stop sending PublishRequests. This is easier then
                # checking if there are no more subscriptions registered in this client (). A Publish response
                # could still arrive before the DeleteSubscription response.
                #
                # We could remove the callback already when sending the DeleteSubscription request,
                # but there are some legitimate reasons to keep them around, such as when the server
                # responds with "BadTimeout" and we should try again later instead of just removing
                # the subscription client-side.
                #
                # There are a variety of ways to act correctly, but the most practical solution seems
                # to be to just silently ignore any BadNoSubscription responses.
                self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
                # End task
                return
            except UaStructParsingError:
                # Undecodable answer: repeat without acknowledgement
                ack = None
                continue
            subscription_id = response.Parameters.SubscriptionId
            if not subscription_id:
                # The value 0 is used to indicate that there were no Subscriptions defined for which a
                # response could be sent. See Spec. Part 4 - Section 5.13.5 "Publish"
                # End task
                return
            try:
                callback = self._subscription_callbacks[subscription_id]
            except KeyError:
                self.logger.warning(
                    "Received data for unknown subscription %s active are %s", subscription_id,
                    self._subscription_callbacks.keys()
                )
            else:
                try:
                    if asyncio.iscoroutinefunction(callback):
                        await callback(response.Parameters)
                    else:
                        callback(response.Parameters)
                except Exception:  # we call user code, catch everything!
                    # The placeholder needs an argument, otherwise a literal "%s" is logged
                    self.logger.exception("Exception while calling user callback: %s", callback)
            # Repeat with acknowledgement
            if response.Parameters.NotificationMessage.NotificationData:
                ack = ua.SubscriptionAcknowledgement()
                ack.SubscriptionId = subscription_id
                ack.SequenceNumber = response.Parameters.NotificationMessage.SequenceNumber
            else:
                ack = None

    async def create_monitored_items(self, params):
        """Send a CreateMonitoredItemsRequest and return ``response.Results``."""
        self.logger.info("create_monitored_items")
        request = ua.CreateMonitoredItemsRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.CreateMonitoredItemsResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def delete_monitored_items(self, params):
        """Send a DeleteMonitoredItemsRequest and return ``response.Results``."""
        self.logger.info("delete_monitored_items")
        request = ua.DeleteMonitoredItemsRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.DeleteMonitoredItemsResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def add_nodes(self, nodestoadd):
        """Send an AddNodesRequest for *nodestoadd* and return ``response.Results``."""
        self.logger.info("add_nodes")
        request = ua.AddNodesRequest()
        request.Parameters.NodesToAdd = nodestoadd
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.AddNodesResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def add_references(self, refs):
        """Send an AddReferencesRequest for *refs* and return ``response.Results``."""
        self.logger.info("add_references")
        request = ua.AddReferencesRequest()
        request.Parameters.ReferencesToAdd = refs
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.AddReferencesResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def delete_references(self, refs):
        """Send a DeleteReferencesRequest for *refs* and return ``response.Parameters.Results``."""
        self.logger.info("delete")
        request = ua.DeleteReferencesRequest()
        request.Parameters.ReferencesToDelete = refs
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.DeleteReferencesResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters.Results

    async def delete_nodes(self, params):
        """Send a DeleteNodesRequest and return ``response.Results``."""
        self.logger.info("delete_nodes")
        request = ua.DeleteNodesRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.DeleteNodesResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def call(self, methodstocall):
        """Send a CallRequest for *methodstocall* and return ``response.Results``."""
        request = ua.CallRequest()
        request.Parameters.MethodsToCall = methodstocall
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.CallResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def history_read(self, params):
        """Send a HistoryReadRequest and return ``response.Results``."""
        self.logger.info("history_read")
        request = ua.HistoryReadRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.HistoryReadResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def modify_monitored_items(self, params):
        """Send a ModifyMonitoredItemsRequest and return ``response.Results``."""
        self.logger.info("modify_monitored_items")
        request = ua.ModifyMonitoredItemsRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.ModifyMonitoredItemsResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def register_nodes(self, nodes):
        """Send a RegisterNodesRequest for *nodes* and return ``response.Parameters.RegisteredNodeIds``."""
        self.logger.info("register_nodes")
        request = ua.RegisterNodesRequest()
        request.Parameters.NodesToRegister = nodes
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.RegisterNodesResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters.RegisteredNodeIds

    async def unregister_nodes(self, nodes):
        """Send an UnregisterNodesRequest for *nodes*."""
        self.logger.info("unregister_nodes")
        request = ua.UnregisterNodesRequest()
        request.Parameters.NodesToUnregister = nodes
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.UnregisterNodesResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        # nothing to return for this service

    async def read_attributes(self, nodeids, attr):
        """
        Read the same attribute *attr* from every node in *nodeids*.

        Unlike ``read``, the results are returned without any enum conversion.

        :return: ``response.Results``
        """
        self.logger.info("read_attributes of several nodes")
        request = ua.ReadRequest()
        for nodeid in nodeids:
            rv = ua.ReadValueId()
            rv.NodeId = nodeid
            rv.AttributeId = attr
            request.Parameters.NodesToRead.append(rv)
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.ReadResponse, data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def write_attributes(self, nodeids, datavalues, attributeid=ua.AttributeIds.Value):
        """
        Set an attribute of multiple nodes
        datavalue is a ua.DataValue object

        ``datavalues[i]`` is written to ``nodeids[i]``, so *datavalues* must be
        at least as long as *nodeids*.

        :return: ``response.Results``
        """
        self.logger.info("write_attributes of several nodes")
        request = ua.WriteRequest()
        for idx, nodeid in enumerate(nodeids):
            attr = ua.WriteValue()
            attr.NodeId = nodeid
            attr.AttributeId = attributeid
            attr.Value = datavalues[idx]
            request.Parameters.NodesToWrite.append(attr)
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.WriteResponse, data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def set_monitoring_mode(self, params) -> List[ua.uatypes.StatusCode]:
        """
        Update the subscription monitoring mode

        :return: ``response.Parameters.Results``
        """
        self.logger.info("set_monitoring_mode")
        request = ua.SetMonitoringModeRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.SetMonitoringModeResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters.Results

    async def set_publishing_mode(self, params) -> List[ua.uatypes.StatusCode]:
        """
        Update the subscription publishing mode

        :return: ``response.Parameters.Results``
        """
        self.logger.info("set_publishing_mode")
        request = ua.SetPublishingModeRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.SetPublishingModeResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters.Results
726