UASocketProtocol.send_hello()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 9
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 9
nop 4
dl 0
loc 9
rs 9.95
c 0
b 0
f 0
1
"""
2
Low level binary client
3
"""
4
import asyncio
5
import logging
6
from typing import Dict, List
7
from asyncua import ua
8
from typing import Optional
9
from ..ua.ua_binary import struct_from_binary, uatcp_to_binary, struct_to_binary, nodeid_from_binary, header_from_binary
10
from ..ua.uaerrors import BadTimeout, BadNoSubscription, BadSessionClosed, UaStructParsingError
11
from ..common.connection import SecureConnection
12
13
14
class UASocketProtocol(asyncio.Protocol):
    """
    Handle socket connection and send ua messages.

    Timeout is the timeout used while waiting for an ua answer from server.
    Pending requests are tracked in ``_callbackmap`` (request id -> Future);
    the receive path resolves those futures as answers are parsed.
    """

    # Values stored in ``self.state``.
    INITIALIZED = 'initialized'
    OPEN = 'open'
    CLOSED = 'closed'

    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy(), loop=None):
        """
        :param timeout: Timeout in seconds
        :param security_policy: Security policy (optional)
        :param loop: Event loop (optional)
        """
        self.logger = logging.getLogger(f"{__name__}.UASocketProtocol")
        self.loop = loop or asyncio.get_event_loop()
        self.transport = None
        # Bytes of a not yet complete message, kept until more data arrives.
        self.receive_buffer: Optional[bytes] = None
        self.is_receiving = False
        self.timeout = timeout
        self.authentication_token = ua.NodeId()
        self._request_id = 0
        self._request_handle = 0
        # Request id -> future resolved with the matching answer. Id 0 is used for Hello/Acknowledge.
        self._callbackmap: Dict[int, asyncio.Future] = {}
        self._connection = SecureConnection(security_policy)
        self.state = self.INITIALIZED
        self.closed: bool = False
        # needed to pass params from asynchronous request to synchronous data receive callback, as well as
        # passing back the processed response to the request so that it can return it.
        self._open_secure_channel_exchange = None

    def connection_made(self, transport: asyncio.Transport):
        """Store the transport and mark the protocol as open."""
        self.state = self.OPEN
        self.transport = transport

    def connection_lost(self, exc):
        """Mark the protocol as closed and drop the transport reference."""
        self.logger.info("Socket has closed connection")
        self.state = self.CLOSED
        self.transport = None

    def data_received(self, data: bytes):
        """Prepend any buffered partial message to ``data`` and try to parse the result."""
        if self.receive_buffer:
            data = self.receive_buffer + data
            self.receive_buffer = None
        self._process_received_data(data)

    def _process_received_data(self, data: bytes):
        """
        Try to parse received data as asyncua message. Data may be chunked but will be in correct order.
        See: https://docs.python.org/3/library/asyncio-protocol.html#asyncio.Protocol.data_received
        Reassembly is done by filling up a buffer until it verifies as a valid message (or a MessageChunk).

        Any exception raised while parsing is logged and the socket is disconnected.
        """
        buf = ua.utils.Buffer(data)
        while True:
            try:
                try:
                    header = header_from_binary(buf)
                except ua.utils.NotEnoughData:
                    self.logger.debug('Not enough data while parsing header from server, waiting for more')
                    # Keep the unparsed bytes; data_received() prepends them to the next chunk.
                    self.receive_buffer = data
                    return
                if len(buf) < header.body_size:
                    self.logger.debug('We did not receive enough data from server. Need %s got %s', header.body_size, len(buf))
                    self.receive_buffer = data
                    return
                msg = self._connection.receive_from_header_and_body(header, buf)
                self._process_received_message(msg)
                if header.MessageType == ua.MessageType.SecureOpen:
                    # Swap the request params stored by open_secure_channel() for the parsed response,
                    # so that the waiting coroutine can pick the response up.
                    params = self._open_secure_channel_exchange
                    self._open_secure_channel_exchange = struct_from_binary(ua.OpenSecureChannelResponse, msg.body())
                    self._open_secure_channel_exchange.ResponseHeader.ServiceResult.check()
                    self._connection.set_channel(self._open_secure_channel_exchange.Parameters, params.RequestType, params.ClientNonce)
                if not buf:
                    return
                # Buffer still has bytes left, try to process again
                data = bytes(buf)
            except Exception:
                self.logger.exception('Exception raised while parsing message from server')
                self.disconnect_socket()
                return

    def _process_received_message(self, msg):
        """
        Dispatch a parsed message to the future waiting for it.

        :raises ua.UaError: If the message type is not supported
        """
        if msg is None:
            pass
        elif isinstance(msg, ua.Message):
            self._call_callback(msg.request_id(), msg.body())
        elif isinstance(msg, ua.Acknowledge):
            # Answer to Hello, which is registered under request id 0.
            self._call_callback(0, msg)
        elif isinstance(msg, ua.ErrorMessage):
            self.logger.fatal("Received an error: %r", msg)
            self._call_callback(0, ua.UaStatusCodeError(msg.Error.value))
        else:
            raise ua.UaError(f"Unsupported message type: {msg}")

    def _send_request(self, request, timeout=1, message_type=ua.MessageType.SecureMessage) -> asyncio.Future:
        """
        Send request to server, lower-level method.
        Timeout is the timeout written in ua header.
        :param request: Request
        :param timeout: Timeout in seconds
        :param message_type: UA Message Type (optional)
        :return: Future that resolves with the Response
        """
        request.RequestHeader = self._create_request_header(timeout)
        self.logger.debug('Sending: %s', request)
        try:
            binreq = struct_to_binary(request)
        except Exception:
            # reset request handle if any error
            # see self._create_request_header
            self._request_handle -= 1
            raise
        self._request_id += 1
        future = self.loop.create_future()
        self._callbackmap[self._request_id] = future

        # Change to the new security token if the connection has been renewed.
        if self._connection.next_security_token.TokenId != 0:
            self._connection.revolve_tokens()

        msg = self._connection.message_to_binary(binreq, message_type=message_type, request_id=self._request_id)
        self.transport.write(msg)
        return future

    async def send_request(self, request, timeout=None, message_type=ua.MessageType.SecureMessage):
        """
        Send a request to the server and wait for the answer.

        :param request: Request
        :param timeout: Timeout in seconds, written in the ua header and used as the wait limit.
            ``None`` selects ``self.timeout``; a falsy value such as 0 waits without limit.
        :param message_type: UA Message Type (optional)
        :return: The answer the receive path stored in the future of this request
        :raises ConnectionError: If waiting failed and the connection is not open
        """
        timeout = self.timeout if timeout is None else timeout
        try:
            data = await asyncio.wait_for(self._send_request(request, timeout, message_type), timeout if timeout else None)
        except Exception:
            if self.state != self.OPEN:
                raise ConnectionError("Connection is closed") from None

            raise

        self.check_answer(data, f" in response to {request.__class__.__name__}")
        return data

    def check_answer(self, data, context):
        """
        Check whether an answer is a ServiceFault.

        :param data: Answer buffer; a copy is parsed so the caller's buffer is left untouched
        :param context: Text appended to the warning that is logged for a ServiceFault
        :return: True if the answer is not a ServiceFault, False if it is a ServiceFault and
            ``ServiceResult.check()`` did not raise
        """
        data = data.copy()
        typeid = nodeid_from_binary(data)
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
            self.logger.warning("ServiceFault from server received %s", context)
            hdr = struct_from_binary(ua.ResponseHeader, data)
            hdr.ServiceResult.check()
            return False
        return True

    def _call_callback(self, request_id, body):
        """
        Resolve the future registered for ``request_id`` with ``body`` and forget the request.

        :raises ua.UaError: If no request is pending for ``request_id``, or if its future is
            already done while the session is not marked as closed
        """
        try:
            future = self._callbackmap[request_id]
        except KeyError:
            raise ua.UaError(f"No request found for request id: {request_id}, pending are {self._callbackmap.keys()}")
        try:
            future.set_result(body)
        except asyncio.InvalidStateError:
            if not self.closed:
                raise ua.UaError(f"Future for request id {request_id} is already done")
            self.logger.debug("Future for request id %s not handled due to disconnect", request_id)
        finally:
            # Drop the entry on every path so an already-done future does not stay in the map.
            del self._callbackmap[request_id]

    def _create_request_header(self, timeout=1) -> ua.RequestHeader:
        """
        :param timeout: Timeout in seconds
        :return: Request header
        """
        hdr = ua.RequestHeader()
        hdr.AuthenticationToken = self.authentication_token
        self._request_handle += 1
        hdr.RequestHandle = self._request_handle
        # TimeoutHint is filled with the timeout multiplied by 1000 (seconds -> milliseconds).
        hdr.TimeoutHint = timeout * 1000
        return hdr

    def disconnect_socket(self):
        """Close the transport if there is one, otherwise only log a warning."""
        self.logger.info("Request to close socket received")
        if self.transport:
            self.transport.close()
        else:
            self.logger.warning("disconnect_socket was called but transport is None")

    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
        """
        Send a Hello message and wait for the answer registered under request id 0.

        :param url: Endpoint url written into the Hello message
        :param max_messagesize: Value for ``Hello.MaxMessageSize``
        :param max_chunkcount: Value for ``Hello.MaxChunkCount``
        :return: The value the receive path delivers for request id 0
        """
        hello = ua.Hello()
        hello.EndpointUrl = url
        hello.MaxMessageSize = max_messagesize
        hello.MaxChunkCount = max_chunkcount
        # Create the future on the loop of this protocol, like _send_request does.
        ack = self.loop.create_future()
        self._callbackmap[0] = ack
        self.transport.write(uatcp_to_binary(ua.MessageType.Hello, hello))
        return await asyncio.wait_for(ack, self.timeout)

    async def open_secure_channel(self, params):
        """
        Send an OpenSecureChannelRequest and return the parameters of the response.

        :param params: Parameters of the request; also handed to the receive path, which needs
            ``RequestType`` and ``ClientNonce`` to set up the channel
        :return: ``Parameters`` of the OpenSecureChannelResponse
        :raises RuntimeError: If another open secure channel exchange is still in progress
        """
        self.logger.info("open_secure_channel")
        request = ua.OpenSecureChannelRequest()
        request.Parameters = params
        if self._open_secure_channel_exchange is not None:
            raise RuntimeError('Two Open Secure Channel requests can not happen too close to each other. ' 'The response must be processed and returned before the next request can be sent.')
        self._open_secure_channel_exchange = params
        try:
            await asyncio.wait_for(self._send_request(request, message_type=ua.MessageType.SecureOpen), self.timeout)
            # The receive path has replaced the params with the parsed response by now.
            return self._open_secure_channel_exchange.Parameters
        finally:
            # Always free the slot, otherwise a failed or timed out exchange would make
            # every later call raise the RuntimeError above.
            self._open_secure_channel_exchange = None

    async def close_secure_channel(self):
        """
        Close secure channel.
        It seems to trigger a shutdown of socket in most servers, so be prepare to reconnect.
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response
        and should just close socket.
        """
        self.logger.info("close_secure_channel")
        request = ua.CloseSecureChannelRequest()
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
        # don't expect any more answers
        future.cancel()
        self._callbackmap.clear()
        # some servers send a response here, most do not ... so we ignore
233
234
235
class UaClient:
236
    """
237
    low level OPC-UA client.
238
239
    It implements (almost) all methods defined in asyncua spec
240
    taking in argument the structures defined in asyncua spec.
241
242
    In this Python implementation  most of the structures are defined in
243
    uaprotocol_auto.py and uaprotocol_hand.py available under asyncua.ua
244
    """
245
    def __init__(self, timeout=1, loop=None):
        """
        Set up a client that is not connected yet.

        :param timeout: Timeout in seconds
        :param loop: Event loop (optional)
        """
        self.logger = logging.getLogger(f'{__name__}.UaClient')
        self.loop = loop if loop else asyncio.get_event_loop()
        self._timeout = timeout
        # Policy handed to every protocol created by _make_protocol().
        self.security_policy = ua.SecurityPolicy()
        # Subscription id -> callback invoked by the publish loop.
        self._subscription_callbacks = {}
        self._publish_task = None
        self.protocol: Optional[UASocketProtocol] = None
257
258
    def set_security(self, policy: ua.SecurityPolicy):
        """
        Store the security policy used for protocols created afterwards.

        :param policy: Security policy
        """
        self.security_policy = policy
260
261
    def _make_protocol(self):
        """
        Create a new UASocketProtocol, remember it as ``self.protocol`` and return it.

        Used as protocol factory when the socket connection is created.
        """
        protocol = UASocketProtocol(self._timeout, security_policy=self.security_policy, loop=self.loop)
        self.protocol = protocol
        return protocol
264
265
    async def connect_socket(self, host: str, port: int):
        """
        Connect to server socket.

        :param host: Host to connect to
        :param port: Port to connect to
        """
        self.logger.info("opening connection")
        connecting = self.loop.create_connection(self._make_protocol, host, port)
        # Timeout the connection when the server isn't available
        await asyncio.wait_for(connecting, self._timeout)
270
271
    def disconnect_socket(self):
        """
        Ask the protocol to close the socket.

        Only a warning is logged when there is no protocol (connect_socket was never
        called) or when the protocol is already in the closed state.

        :return: Result of ``UASocketProtocol.disconnect_socket()``, or None if nothing was done
        """
        # Guard against a missing protocol as well, instead of failing with AttributeError.
        if not self.protocol or self.protocol.state == UASocketProtocol.CLOSED:
            self.logger.warning("disconnect_socket was called but connection is closed")
            return None
        return self.protocol.disconnect_socket()
276
277
    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
        """
        Forward a Hello exchange to the protocol and wait until it has finished.

        The value returned by the protocol is not passed on to the caller.
        """
        hello_exchange = self.protocol.send_hello(url, max_messagesize, max_chunkcount)
        await hello_exchange
279
280
    async def open_secure_channel(self, params):
        """
        Open a secure channel through the protocol.

        :param params: Parameters handed to ``UASocketProtocol.open_secure_channel``
        :return: Whatever the protocol returns for the exchange
        """
        channel_parameters = await self.protocol.open_secure_channel(params)
        return channel_parameters
282
283
    async def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepare to reconnect

        Only a warning is logged when there is no protocol or when the protocol is
        already in the closed state.
        """
        # Guard against a missing protocol as well, instead of failing with AttributeError.
        if not self.protocol or self.protocol.state == UASocketProtocol.CLOSED:
            self.logger.warning("close_secure_channel was called but connection is closed")
            return
        return await self.protocol.close_secure_channel()
292
293
    async def create_session(self, parameters):
        """
        Send a CreateSessionRequest and adopt the authentication token of the response.

        :param parameters: Value assigned to ``request.Parameters``
        :return: ``Parameters`` of the decoded CreateSessionResponse
        """
        self.logger.info("create_session")
        self.protocol.closed = False
        req = ua.CreateSessionRequest()
        req.Parameters = parameters
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.CreateSessionResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        session = resp.Parameters
        # Later request headers carry this token (see the protocol's request header creation).
        self.protocol.authentication_token = session.AuthenticationToken
        return session
304
305
    async def activate_session(self, parameters):
        """
        Send an ActivateSessionRequest.

        :param parameters: Value assigned to ``request.Parameters``
        :return: ``Parameters`` of the decoded ActivateSessionResponse
        """
        self.logger.info("activate_session")
        req = ua.ActivateSessionRequest()
        req.Parameters = parameters
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.ActivateSessionResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters
314
315
    async def close_session(self, delete_subscriptions):
        """
        Cancel the publish task and send a CloseSessionRequest.

        Only a warning is logged, and no request is sent, when there is no protocol or
        when the protocol is already in the closed state.

        :param delete_subscriptions: Value assigned to ``request.DeleteSubscriptions``
        """
        self.logger.info("close_session")
        # Only touch the protocol if there is one, instead of failing with AttributeError.
        if self.protocol:
            self.protocol.closed = True
        if self._publish_task and not self._publish_task.done():
            self._publish_task.cancel()
        if not self.protocol or self.protocol.state == UASocketProtocol.CLOSED:
            self.logger.warning("close_session was called but connection is closed")
            return
        request = ua.CloseSessionRequest()
        request.DeleteSubscriptions = delete_subscriptions
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.CloseSessionResponse, data)
        try:
            response.ResponseHeader.ServiceResult.check()
        except BadSessionClosed:
            # Problem: closing the session with open publish requests leads to BadSessionClosed responses
            #          we can just ignore it therefore.
            #          Alternatively we could make sure that there are no publish requests in flight when
            #          closing the session.
            pass
335
336
    async def browse(self, parameters):
        """
        Send a BrowseRequest.

        :param parameters: Value assigned to ``request.Parameters``
        :return: ``Results`` of the decoded BrowseResponse
        """
        self.logger.info("browse")
        req = ua.BrowseRequest()
        req.Parameters = parameters
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.BrowseResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
345
346
    async def browse_next(self, parameters):
        """
        Send a BrowseNextRequest.

        :param parameters: Value assigned to ``request.Parameters``
        :return: ``Parameters.Results`` of the decoded BrowseNextResponse
        """
        self.logger.debug("browse next")
        req = ua.BrowseNextRequest()
        req.Parameters = parameters
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.BrowseNextResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters.Results
355
356
    async def read(self, parameters):
        """
        Send a ReadRequest and convert selected attribute values to their enum types.

        :param parameters: Value assigned to ``request.Parameters``; its ``NodesToRead``
            is walked to find the results that need a conversion
        :return: ``Results`` of the decoded ReadResponse
        """
        self.logger.debug("read")
        req = ua.ReadRequest()
        req.Parameters = parameters
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.ReadResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        # cast to Enum attributes that need to
        for position, read_value in enumerate(parameters.NodesToRead):
            attribute = read_value.AttributeId
            if attribute == ua.AttributeIds.NodeClass:
                result = resp.Results[position]
                if result.StatusCode.is_good():
                    result.Value.Value = ua.NodeClass(result.Value.Value)
            elif attribute == ua.AttributeIds.ValueRank:
                result = resp.Results[position]
                # Values outside of this set are left as they were received.
                if result.StatusCode.is_good() and result.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
                    result.Value.Value = ua.ValueRank(result.Value.Value)
        return resp.Results
375
376
    async def write(self, params):
        """
        Send a WriteRequest.

        :param params: Value assigned to ``request.Parameters``
        :return: ``Results`` of the decoded WriteResponse
        """
        self.logger.debug("write")
        req = ua.WriteRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.WriteResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
385
386
    async def get_endpoints(self, params):
        """
        Send a GetEndpointsRequest.

        :param params: Value assigned to ``request.Parameters``
        :return: ``Endpoints`` of the decoded GetEndpointsResponse
        """
        self.logger.debug("get_endpoint")
        req = ua.GetEndpointsRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.GetEndpointsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Endpoints
395
396
    async def find_servers(self, params):
        """
        Send a FindServersRequest.

        :param params: Value assigned to ``request.Parameters``
        :return: ``Servers`` of the decoded FindServersResponse
        """
        self.logger.debug("find_servers")
        req = ua.FindServersRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.FindServersResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Servers
405
406
    async def find_servers_on_network(self, params):
        """
        Send a FindServersOnNetworkRequest.

        :param params: Value assigned to ``request.Parameters``
        :return: ``Parameters`` of the decoded FindServersOnNetworkResponse
        """
        self.logger.debug("find_servers_on_network")
        req = ua.FindServersOnNetworkRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.FindServersOnNetworkResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters
415
416
    async def register_server(self, registered_server):
        """
        Send a RegisterServerRequest.

        :param registered_server: Value assigned to ``request.Server``
        :return: None, this service has nothing to return
        """
        self.logger.debug("register_server")
        req = ua.RegisterServerRequest()
        req.Server = registered_server
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.RegisterServerResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
425
426
    async def register_server2(self, params):
        """
        Send a RegisterServer2Request.

        :param params: Value assigned to ``request.Parameters``
        :return: ``ConfigurationResults`` of the decoded RegisterServer2Response
        """
        self.logger.debug("register_server2")
        req = ua.RegisterServer2Request()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.RegisterServer2Response, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.ConfigurationResults
435
436
    async def translate_browsepaths_to_nodeids(self, browse_paths):
        """
        Send a TranslateBrowsePathsToNodeIdsRequest.

        :param browse_paths: Value assigned to ``request.Parameters.BrowsePaths``
        :return: ``Results`` of the decoded TranslateBrowsePathsToNodeIdsResponse
        """
        self.logger.debug("translate_browsepath_to_nodeid")
        req = ua.TranslateBrowsePathsToNodeIdsRequest()
        req.Parameters.BrowsePaths = browse_paths
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
445
446
    async def create_subscription(self, params, callback):
        """
        Send a CreateSubscriptionRequest and register the callback for the new subscription.

        :param params: Value assigned to ``request.Parameters``
        :param callback: Stored under the new subscription id and called by the publish loop
        :return: ``Parameters`` of the decoded CreateSubscriptionResponse
        """
        self.logger.debug("create_subscription")
        req = ua.CreateSubscriptionRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.CreateSubscriptionResponse, raw)
        resp.ResponseHeader.ServiceResult.check()
        self._subscription_callbacks[resp.Parameters.SubscriptionId] = callback
        self.logger.info("create_subscription success SubscriptionId %s", resp.Parameters.SubscriptionId)
        publish_loop_running = bool(self._publish_task) and not self._publish_task.done()
        if not publish_loop_running:
            # Start the publish loop if it is not yet running
            # The current strategy is to have only one open publish request per UaClient. This might not be enough
            # in high latency networks or in case many subscriptions are created. A Set of Tasks of `_publish_loop`
            # could be used if necessary.
            self._publish_task = self.loop.create_task(self._publish_loop())
        return resp.Parameters
462
463
    async def delete_subscriptions(self, subscription_ids):
        """
        Send a DeleteSubscriptionsRequest and forget the callbacks of the given subscriptions.

        :param subscription_ids: Value assigned to ``request.Parameters.SubscriptionIds``
        :return: ``Results`` of the decoded DeleteSubscriptionsResponse
        """
        self.logger.debug("delete_subscriptions %r", subscription_ids)
        request = ua.DeleteSubscriptionsRequest()
        request.Parameters.SubscriptionIds = subscription_ids
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.DeleteSubscriptionsResponse, data)
        response.ResponseHeader.ServiceResult.check()
        self.logger.info("remove subscription callbacks for %r", subscription_ids)
        for sid in subscription_ids:
            # An id without a registered callback (unknown or already removed) must not
            # turn into a KeyError that hides the results the server already sent.
            self._subscription_callbacks.pop(sid, None)
        return response.Results
474
475
    async def publish(self, acks: List[ua.SubscriptionAcknowledgement]) -> ua.PublishResponse:
        """
        Send a PublishRequest to the server.

        The request is sent with ``timeout=0``.

        :param acks: Acknowledgements to send; a falsy value is replaced by an empty list
        :return: The decoded PublishResponse
        :raises UaStructParsingError: If the response can not be decoded
        """
        self.logger.debug('publish %r', acks)
        req = ua.PublishRequest()
        req.Parameters.SubscriptionAcknowledgements = acks or []
        raw = await self.protocol.send_request(req, timeout=0)
        self.protocol.check_answer(raw, "while waiting for publish response")
        try:
            return struct_from_binary(ua.PublishResponse, raw)
        except Exception:
            self.logger.exception("Error parsing notification from server")
            raise UaStructParsingError
490
491
    async def _publish_loop(self):
        """
        Start a loop that sends a publish requests and waits for the publish responses.
        Forward the `PublishResult` to the matching `Subscription` by callback.

        The loop ends when BadNoSubscription is received or when a response carries a
        falsy subscription id. Exceptions raised by a callback are logged and do not
        end the loop.
        """
        ack = None
        while True:
            try:
                response = await self.publish([ack] if ack else [])
            except BadTimeout:  # See Spec. Part 4, 7.28
                # Repeat without acknowledgement
                ack = None
                continue
            except BadNoSubscription:  # See Spec. Part 5, 13.8.1
                # BadNoSubscription is expected to be received after deleting the last subscription.
                # We use this as a signal to exit this task and stop sending PublishRequests. This is easier then
                # checking if there are no more subscriptions registered in this client (). A Publish response
                # could still arrive before the DeleteSubscription response.
                #
                # We could remove the callback already when sending the DeleteSubscription request,
                # but there are some legitimate reasons to keep them around, such as when the server
                # responds with "BadTimeout" and we should try again later instead of just removing
                # the subscription client-side.
                #
                # There are a variety of ways to act correctly, but the most practical solution seems
                # to be to just silently ignore any BadNoSubscription responses.
                self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
                # End task
                return
            except UaStructParsingError:
                # The response could not be decoded; repeat without acknowledgement
                ack = None
                continue
            subscription_id = response.Parameters.SubscriptionId
            if not subscription_id:
                # The value 0 is used to indicate that there were no Subscriptions defined for which a
                # response could be sent. See Spec. Part 4 - Section 5.13.5 "Publish"
                # End task
                return
            try:
                callback = self._subscription_callbacks[subscription_id]
            except KeyError:
                self.logger.warning("Received data for unknown subscription %s active are %s", subscription_id, self._subscription_callbacks.keys())
            else:
                try:
                    if asyncio.iscoroutinefunction(callback):
                        await callback(response.Parameters)
                    else:
                        callback(response.Parameters)
                except Exception:  # we call user code, catch everything!
                    # Pass the callback as argument for the placeholder; without an argument
                    # the logging module emits the literal "%s".
                    self.logger.exception("Exception while calling user callback: %s", callback)
            # Repeat with acknowledgement
            if response.Parameters.NotificationMessage.NotificationData:
                ack = ua.SubscriptionAcknowledgement()
                ack.SubscriptionId = subscription_id
                ack.SequenceNumber = response.Parameters.NotificationMessage.SequenceNumber
            else:
                ack = None
548
549
    async def create_monitored_items(self, params):
        """
        Send a CreateMonitoredItemsRequest.

        :param params: Value assigned to ``request.Parameters``
        :return: ``Results`` of the decoded CreateMonitoredItemsResponse
        """
        self.logger.info("create_monitored_items")
        req = ua.CreateMonitoredItemsRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.CreateMonitoredItemsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
558
559
    async def delete_monitored_items(self, params):
        """
        Send a DeleteMonitoredItemsRequest.

        :param params: Value assigned to ``request.Parameters``
        :return: ``Results`` of the decoded DeleteMonitoredItemsResponse
        """
        self.logger.info("delete_monitored_items")
        req = ua.DeleteMonitoredItemsRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.DeleteMonitoredItemsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
568
569
    async def add_nodes(self, nodestoadd):
        """
        Send an AddNodesRequest.

        :param nodestoadd: Value assigned to ``request.Parameters.NodesToAdd``
        :return: ``Results`` of the decoded AddNodesResponse
        """
        self.logger.info("add_nodes")
        req = ua.AddNodesRequest()
        req.Parameters.NodesToAdd = nodestoadd
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.AddNodesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
578
579
    async def add_references(self, refs):
        """
        Send an AddReferencesRequest.

        :param refs: Value assigned to ``request.Parameters.ReferencesToAdd``
        :return: ``Results`` of the decoded AddReferencesResponse
        """
        self.logger.info("add_references")
        req = ua.AddReferencesRequest()
        req.Parameters.ReferencesToAdd = refs
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.AddReferencesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
588
589
    async def delete_references(self, refs):
        """
        Send a DeleteReferencesRequest.

        :param refs: Value assigned to ``request.Parameters.ReferencesToDelete``
        :return: ``Parameters.Results`` of the decoded DeleteReferencesResponse
        """
        self.logger.info("delete")
        req = ua.DeleteReferencesRequest()
        req.Parameters.ReferencesToDelete = refs
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.DeleteReferencesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters.Results
598
599
    async def delete_nodes(self, params):
        """
        Send a DeleteNodesRequest.

        :param params: Value assigned to ``request.Parameters``
        :return: ``Results`` of the decoded DeleteNodesResponse
        """
        self.logger.info("delete_nodes")
        req = ua.DeleteNodesRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.DeleteNodesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
608
609
    async def call(self, methodstocall):
        """
        Send a CallRequest.

        :param methodstocall: Value assigned to ``request.Parameters.MethodsToCall``
        :return: ``Results`` of the decoded CallResponse
        """
        req = ua.CallRequest()
        req.Parameters.MethodsToCall = methodstocall
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.CallResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
617
618
    async def history_read(self, params):
        """
        Send a HistoryReadRequest.

        :param params: Value assigned to ``request.Parameters``
        :return: ``Results`` of the decoded HistoryReadResponse
        """
        self.logger.info("history_read")
        req = ua.HistoryReadRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.HistoryReadResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
627
628
    async def modify_monitored_items(self, params):
        """
        Send a ModifyMonitoredItemsRequest.

        :param params: Value assigned to ``request.Parameters``
        :return: ``Results`` of the decoded ModifyMonitoredItemsResponse
        """
        self.logger.info("modify_monitored_items")
        req = ua.ModifyMonitoredItemsRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.ModifyMonitoredItemsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
637
638
    async def register_nodes(self, nodes):
        """
        Send a RegisterNodesRequest.

        :param nodes: Value assigned to ``request.Parameters.NodesToRegister``
        :return: ``Parameters.RegisteredNodeIds`` of the decoded RegisterNodesResponse
        """
        self.logger.info("register_nodes")
        req = ua.RegisterNodesRequest()
        req.Parameters.NodesToRegister = nodes
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.RegisterNodesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters.RegisteredNodeIds
647
648
    async def unregister_nodes(self, nodes):
        """
        Send an UnregisterNodesRequest.

        :param nodes: Value assigned to ``request.Parameters.NodesToUnregister``
        :return: None, this service has nothing to return
        """
        self.logger.info("unregister_nodes")
        req = ua.UnregisterNodesRequest()
        req.Parameters.NodesToUnregister = nodes
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.UnregisterNodesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
657
658
    async def read_attributes(self, nodeids, attr):
        """
        Read the same attribute of several nodes with one ReadRequest.

        :param nodeids: Node ids, one ``ReadValueId`` is built per entry
        :param attr: Attribute id set on every ``ReadValueId``
        :return: ``Results`` of the decoded ReadResponse
        """
        self.logger.info("read_attributes of several nodes")
        req = ua.ReadRequest()
        for node_id in nodeids:
            read_value = ua.ReadValueId()
            read_value.NodeId = node_id
            read_value.AttributeId = attr
            req.Parameters.NodesToRead.append(read_value)
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.ReadResponse, raw)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
670
671
    async def write_attributes(self, nodeids, datavalues, attributeid=ua.AttributeIds.Value):
        """
        Set an attribute of multiple nodes
        datavalue is a ua.DataValue object

        :param nodeids: Node ids, one ``WriteValue`` is built per entry
        :param datavalues: Values, looked up by the position of the node id
        :param attributeid: Attribute id set on every ``WriteValue``
        :return: ``Results`` of the decoded WriteResponse
        """
        self.logger.info("write_attributes of several nodes")
        req = ua.WriteRequest()
        for position, node_id in enumerate(nodeids):
            write_value = ua.WriteValue()
            write_value.NodeId = node_id
            write_value.AttributeId = attributeid
            write_value.Value = datavalues[position]
            req.Parameters.NodesToWrite.append(write_value)
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.WriteResponse, raw)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
688
689
    async def set_monitoring_mode(self, params) -> ua.uatypes.StatusCode:
        """
        Update the subscription monitoring mode

        :param params: Value assigned to ``request.Parameters``
        :return: ``Parameters.Results`` of the decoded SetMonitoringModeResponse
        """
        self.logger.info("set_monitoring_mode")
        req = ua.SetMonitoringModeRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.SetMonitoringModeResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters.Results
701
702
    async def set_publishing_mode(self, params) -> ua.uatypes.StatusCode:
        """
        Update the subscription publishing mode

        :param params: Value assigned to ``request.Parameters``
        :return: ``Parameters.Results`` of the decoded SetPublishingModeResponse
        """
        self.logger.info("set_publishing_mode")
        req = ua.SetPublishingModeRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.SetPublishingModeResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters.Results
714