Passed
Push — master ( f653d6...f61d2b )
by Olivier
03:00
created

asyncua.client.ua_client.UaClient._publish_loop()   D

Complexity

Conditions 12

Size

Total Lines 60
Code Lines 34

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 12
eloc 34
nop 1
dl 0
loc 60
rs 4.8
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex classes like asyncua.client.ua_client.UaClient._publish_loop() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Low level binary client
3
"""
4
import asyncio
5
import logging
6
from typing import Dict, List
7
from asyncua import ua
8
from typing import Optional
9
from ..ua.ua_binary import struct_from_binary, uatcp_to_binary, struct_to_binary, nodeid_from_binary, header_from_binary
10
from ..ua.uaerrors import BadTimeout, BadNoSubscription, BadSessionClosed, UaStructParsingError
11
from ..common.connection import SecureConnection
12
13
14
class UASocketProtocol(asyncio.Protocol):
15
    """
16
    Handle socket connection and send ua messages.
17
    Timeout is the timeout used while waiting for an ua answer from server.
18
    """
19
    INITIALIZED = 'initialized'
20
    OPEN = 'open'
21
    CLOSED = 'closed'
22
23
    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy(), loop=None):
        """
        Prepare the protocol state; the transport is attached later in connection_made.

        :param timeout: Timeout in seconds
        :param security_policy: Security policy (optional)
        :param loop: Event loop (optional)
        """
        self.logger = logging.getLogger(f"{__name__}.UASocketProtocol")
        self.loop = loop if loop else asyncio.get_event_loop()
        self.timeout = timeout
        # Connection state
        self.state = self.INITIALIZED
        self.closed: bool = False
        self.transport = None
        # Received bytes that could not be parsed into a complete message yet
        self.receive_buffer: Optional[bytes] = None
        self.is_receiving = False
        # Request bookkeeping
        self.authentication_token = ua.NodeId()
        self._request_id = 0
        self._request_handle = 0
        # Futures of pending requests, keyed by request id
        self._callbackmap: Dict[int, asyncio.Future] = {}
        self._connection = SecureConnection(security_policy)
42
43
    def connection_made(self, transport: asyncio.Transport):
        """Keep the transport and mark the protocol as open."""
        self.transport = transport
        self.state = self.OPEN
46
47
    def connection_lost(self, exc):
        """Log the closure, drop the transport and mark the protocol as closed."""
        self.logger.info("Socket has closed connection")
        self.transport = None
        self.state = self.CLOSED
51
52
    def data_received(self, data: bytes):
        """Prepend bytes left over from a previous call (if any) and parse the result."""
        leftover = self.receive_buffer
        if leftover:
            self.receive_buffer = None
            data = leftover + data
        self._process_received_data(data)
57
58
    def _process_received_data(self, data: bytes):
        """
        Parse as many complete messages as possible out of ``data``.

        Data may arrive chunked but is in correct order, see
        https://docs.python.org/3/library/asyncio-protocol.html#asyncio.Protocol.data_received
        When the header or the body is incomplete, the unparsed bytes are stored in
        ``self.receive_buffer`` so that the next ``data_received`` call can prepend them.
        Any other exception is logged and the socket is disconnected.
        """
        stream = ua.utils.Buffer(data)
        # One handler around the whole loop: every failure ends the parsing anyway.
        try:
            while True:
                try:
                    header = header_from_binary(stream)
                except ua.utils.NotEnoughData:
                    self.logger.debug('Not enough data while parsing header from server, waiting for more')
                    self.receive_buffer = data
                    return
                available = len(stream)
                if available < header.body_size:
                    self.logger.debug(
                        'We did not receive enough data from server. Need %s got %s', header.body_size, available
                    )
                    self.receive_buffer = data
                    return
                message = self._connection.receive_from_header_and_body(header, stream)
                self._process_received_message(message)
                if not stream:
                    return
                # Bytes are left over: remember them as the new "unparsed" data and go again
                data = bytes(stream)
        except Exception:
            self.logger.exception('Exception raised while parsing message from server')
            self.disconnect_socket()
            return
89
90
    def _process_received_message(self, msg):
        """
        Route a parsed message to the future waiting for it.

        ``None`` is ignored; ``ua.Acknowledge`` and ``ua.ErrorMessage`` are delivered to
        request id 0; any other type raises ``ua.UaError``.
        """
        if msg is None:
            return
        if isinstance(msg, ua.Message):
            self._call_callback(msg.request_id(), msg.body())
            return
        if isinstance(msg, ua.Acknowledge):
            self._call_callback(0, msg)
            return
        if isinstance(msg, ua.ErrorMessage):
            self.logger.fatal("Received an error: %r", msg)
            self._call_callback(0, ua.UaStatusCodeError(msg.Error.value))
            return
        raise ua.UaError(f"Unsupported message type: {msg}")
102
103
    def _send_request(self, request, timeout=1, message_type=ua.MessageType.SecureMessage) -> asyncio.Future:
        """
        Low-level send: serialize ``request``, register a future for it and write it out.

        :param request: Request
        :param timeout: Timeout in seconds, written in the ua request header
        :param message_type: UA Message Type (optional)
        :return: Future that resolves with the Response
        """
        request.RequestHeader = self._create_request_header(timeout)
        self.logger.debug('Sending: %s', request)
        try:
            payload = struct_to_binary(request)
        except Exception:
            # _create_request_header already incremented the handle; undo that
            self._request_handle -= 1
            raise
        self._request_id += 1
        request_id = self._request_id
        pending = self.loop.create_future()
        self._callbackmap[request_id] = pending

        # Change to the new security token if the connection has been renewed.
        connection = self._connection
        if connection.next_security_token.TokenId != 0:
            connection.revolve_tokens()

        packet = connection.message_to_binary(payload, message_type=message_type, request_id=request_id)
        self.transport.write(packet)
        return pending
132
133
    async def send_request(self, request, timeout=None, message_type=ua.MessageType.SecureMessage):
        """
        Send a request to the server and return the raw response data.

        ``timeout`` (default: ``self.timeout``) is written in the ua header and also bounds
        the wait for the answer; a falsy timeout waits without limit.
        Raises ``ConnectionError`` when the request fails while the connection is not open.
        """
        if timeout is None:
            timeout = self.timeout
        try:
            pending = self._send_request(request, timeout, message_type)
            data = await asyncio.wait_for(pending, timeout or None)
        except Exception:
            if self.state == self.OPEN:
                raise
            raise ConnectionError("Connection is closed") from None

        self.check_answer(data, f" in response to {request.__class__.__name__}")
        return data
153
154
    def check_answer(self, data, context):
        """
        Inspect the type id at the start of a copy of ``data``.

        Returns True when it is not a ServiceFault. For a ServiceFault a warning mentioning
        ``context`` is logged and the service result of the response header is checked;
        False is returned if that check does not raise.
        """
        stream = data.copy()
        typeid = nodeid_from_binary(stream)
        is_fault = typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary)
        if not is_fault:
            return True
        self.logger.warning("ServiceFault from server received %s", context)
        header = struct_from_binary(ua.ResponseHeader, stream)
        header.ServiceResult.check()
        return False
163
164
    def _call_callback(self, request_id, body):
        """
        Resolve the future registered for ``request_id`` with ``body``.

        The entry is removed from ``self._callbackmap`` in every case, so a future that
        was already done (e.g. cancelled) does not linger in the map.

        :raises ua.UaError: if no future is registered for ``request_id``, or if the
            future is already done while the protocol is not flagged as closed.
        """
        try:
            future = self._callbackmap.pop(request_id)
        except KeyError as err:
            raise ua.UaError(
                f"No request found for request id: {request_id}, pending are {self._callbackmap.keys()}"
            ) from err
        try:
            future.set_result(body)
        except asyncio.InvalidStateError:
            if not self.closed:
                raise ua.UaError(f"Future for request id {request_id} is already done")
            # After a close the pending futures are expected to be done already
            self.logger.debug("Future for request id %s not handled due to disconnect", request_id)
176
177
    def _create_request_header(self, timeout=1) -> ua.RequestHeader:
        """
        Build a request header carrying the authentication token and a fresh request handle.

        :param timeout: Timeout in seconds (stored in the header multiplied by 1000)
        :return: Request header
        """
        header = ua.RequestHeader()
        header.AuthenticationToken = self.authentication_token
        self._request_handle += 1
        header.RequestHandle = self._request_handle
        header.TimeoutHint = timeout * 1000
        return header
188
189
    def disconnect_socket(self):
        """Close the transport if there is one, otherwise log a warning."""
        self.logger.info("Request to close socket received")
        transport = self.transport
        if not transport:
            self.logger.warning("disconnect_socket was called but transport is None")
            return
        transport.close()
195
196
    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
        """
        Send a Hello message and wait up to ``self.timeout`` seconds for the answer.

        The answer is delivered through the future registered under request id 0.

        :param url: Endpoint url put in the Hello message
        :param max_messagesize: Value for ``MaxMessageSize``
        :param max_chunkcount: Value for ``MaxChunkCount``
        :return: The result set on the request-id-0 future
        """
        hello = ua.Hello()
        hello.EndpointUrl = url
        hello.MaxMessageSize = max_messagesize
        hello.MaxChunkCount = max_chunkcount
        # Create the future on the protocol's own loop, like _send_request does
        ack = self.loop.create_future()
        self._callbackmap[0] = ack
        self.transport.write(uatcp_to_binary(ua.MessageType.Hello, hello))
        return await asyncio.wait_for(ack, self.timeout)
205
206
    async def open_secure_channel(self, params):
        """
        Send an OpenSecureChannelRequest and configure the connection from the response.

        Waits up to ``self.timeout`` seconds and returns the response parameters.
        """
        self.logger.info("open_secure_channel")
        request = ua.OpenSecureChannelRequest()
        request.Parameters = params
        pending = self._send_request(request, message_type=ua.MessageType.SecureOpen)
        raw = await asyncio.wait_for(pending, self.timeout)
        response = struct_from_binary(ua.OpenSecureChannelResponse, raw)
        response.ResponseHeader.ServiceResult.check()
        channel_params = response.Parameters
        self._connection.set_channel(channel_params, params.RequestType, params.ClientNonce)
        return channel_params
218
219
    async def close_secure_channel(self):
        """
        Send a CloseSecureChannelRequest without waiting for an answer.

        It seems to trigger a shutdown of the socket in most servers, so be prepared to
        reconnect. OPC UA specs Part 6, 7.1.4 say that the Server does not send a
        CloseSecureChannel response and should just close the socket.
        """
        self.logger.info("close_secure_channel")
        pending = self._send_request(ua.CloseSecureChannelRequest(), message_type=ua.MessageType.SecureClose)
        # No further answers are expected (some servers do send one, most do not),
        # so cancel this request and forget all pending ones.
        pending.cancel()
        self._callbackmap.clear()
233
234
235
class UaClient:
236
    """
237
    low level OPC-UA client.
238
239
    It implements (almost) all methods defined in asyncua spec
240
    taking in argument the structures defined in asyncua spec.
241
242
    In this Python implementation  most of the structures are defined in
243
    uaprotocol_auto.py and uaprotocol_hand.py available under asyncua.ua
244
    """
245
246
    def __init__(self, timeout=1, loop=None):
        """
        Set up client state; no connection is made here.

        :param timeout: Timeout in seconds
        :param loop: Event loop (optional)
        """
        self.logger = logging.getLogger(f'{__name__}.UaClient')
        self.loop = loop if loop else asyncio.get_event_loop()
        self._timeout = timeout
        self.security_policy = ua.SecurityPolicy()
        # Created by _make_protocol
        self.protocol: Optional[UASocketProtocol] = None
        # Callbacks keyed by subscription id, and the task running _publish_loop
        self._subscription_callbacks = {}
        self._publish_task = None
258
259
    def set_security(self, policy: ua.SecurityPolicy):
        """Store the security policy handed to protocols created by _make_protocol."""
        self.security_policy = policy
261
262
    def _make_protocol(self):
        """Create a new UASocketProtocol, keep it as ``self.protocol`` and return it."""
        protocol = UASocketProtocol(self._timeout, security_policy=self.security_policy, loop=self.loop)
        self.protocol = protocol
        return protocol
265
266
    async def connect_socket(self, host: str, port: int):
        """Connect to server socket, using _make_protocol as the protocol factory."""
        self.logger.info("opening connection")
        factory = self._make_protocol
        await self.loop.create_connection(factory, host, port)
270
271
    def disconnect_socket(self):
        """
        Ask the protocol to close its socket.

        Only logs a warning when there is no protocol yet or its connection is already
        closed, instead of failing with an AttributeError on a missing protocol.

        :return: Result of the protocol's ``disconnect_socket``, or None if nothing was done
        """
        protocol = self.protocol
        if protocol is None or protocol.state == UASocketProtocol.CLOSED:
            self.logger.warning("disconnect_socket was called but connection is closed")
            return None
        return protocol.disconnect_socket()
276
277
    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
        """Forward a Hello to the protocol and wait for it to complete; returns nothing."""
        protocol = self.protocol
        await protocol.send_hello(url, max_messagesize, max_chunkcount)
279
280
    async def open_secure_channel(self, params):
        """Open a secure channel through the protocol and return its result."""
        channel_params = await self.protocol.open_secure_channel(params)
        return channel_params
282
283
    async def close_secure_channel(self):
        """
        Close the secure channel. It seems to trigger a shutdown of the socket
        in most servers, so be prepared to reconnect.

        Only logs a warning when there is no protocol yet or its connection is already
        closed, instead of failing with an AttributeError on a missing protocol.
        """
        protocol = self.protocol
        if protocol is None or protocol.state == UASocketProtocol.CLOSED:
            self.logger.warning("close_secure_channel was called but connection is closed")
            return
        return await protocol.close_secure_channel()
292
293
    async def create_session(self, parameters):
        """
        Send a CreateSessionRequest and return the response parameters.

        Clears the protocol's ``closed`` flag and stores the returned authentication
        token on the protocol.
        """
        self.logger.info("create_session")
        protocol = self.protocol
        protocol.closed = False
        req = ua.CreateSessionRequest()
        req.Parameters = parameters
        raw = await protocol.send_request(req)
        resp = struct_from_binary(ua.CreateSessionResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        session = resp.Parameters
        self.protocol.authentication_token = session.AuthenticationToken
        return session
304
305
    async def activate_session(self, parameters):
        """Send an ActivateSessionRequest and return the response parameters."""
        self.logger.info("activate_session")
        req = ua.ActivateSessionRequest()
        req.Parameters = parameters
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.ActivateSessionResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters
314
315
    async def close_session(self, delete_subscriptions):
        """
        Cancel the publish task and send a CloseSessionRequest.

        When there is no protocol yet or its connection is already closed, only a
        warning is logged (a missing protocol no longer raises AttributeError).

        :param delete_subscriptions: Value for ``DeleteSubscriptions`` of the request
        """
        self.logger.info("close_session")
        protocol = self.protocol
        if protocol is not None:
            # Lets _call_callback tolerate answers for futures that are already done
            protocol.closed = True
        publish_task = self._publish_task
        if publish_task and not publish_task.done():
            publish_task.cancel()
        if protocol is None or protocol.state == UASocketProtocol.CLOSED:
            self.logger.warning("close_session was called but connection is closed")
            return
        request = ua.CloseSessionRequest()
        request.DeleteSubscriptions = delete_subscriptions
        data = await protocol.send_request(request)
        response = struct_from_binary(ua.CloseSessionResponse, data)
        try:
            response.ResponseHeader.ServiceResult.check()
        except BadSessionClosed:
            # Problem: closing the session with open publish requests leads to BadSessionClosed responses
            #          we can just ignore it therefore.
            #          Alternatively we could make sure that there are no publish requests in flight when
            #          closing the session.
            pass
335
336
    async def browse(self, parameters):
        """Send a BrowseRequest and return ``Results`` of the response."""
        self.logger.info("browse")
        req = ua.BrowseRequest()
        req.Parameters = parameters
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.BrowseResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
345
346
    async def browse_next(self, parameters):
        """Send a BrowseNextRequest and return ``Parameters.Results`` of the response."""
        self.logger.debug("browse next")
        req = ua.BrowseNextRequest()
        req.Parameters = parameters
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.BrowseNextResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters.Results
355
356
    async def read(self, parameters):
        """
        Send a ReadRequest and return ``Results`` of the response.

        Good results for the NodeClass attribute are converted to ``ua.NodeClass``;
        good results for the ValueRank attribute are converted to ``ua.ValueRank``
        when the value is one of -3..4.
        """
        self.logger.debug("read")
        req = ua.ReadRequest()
        req.Parameters = parameters
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.ReadResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        # cast to Enum the attributes that need it
        for position, read_value in enumerate(parameters.NodesToRead):
            if read_value.AttributeId == ua.AttributeIds.NodeClass:
                result = resp.Results[position]
                if result.StatusCode.is_good():
                    result.Value.Value = ua.NodeClass(result.Value.Value)
            elif read_value.AttributeId == ua.AttributeIds.ValueRank:
                result = resp.Results[position]
                if result.StatusCode.is_good() and result.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
                    result.Value.Value = ua.ValueRank(result.Value.Value)
        return resp.Results
375
376
    async def write(self, params):
        """Send a WriteRequest and return ``Results`` of the response."""
        self.logger.debug("write")
        req = ua.WriteRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.WriteResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
385
386
    async def get_endpoints(self, params):
        """Send a GetEndpointsRequest and return ``Endpoints`` of the response."""
        self.logger.debug("get_endpoint")
        req = ua.GetEndpointsRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.GetEndpointsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Endpoints
395
396
    async def find_servers(self, params):
        """Send a FindServersRequest and return ``Servers`` of the response."""
        self.logger.debug("find_servers")
        req = ua.FindServersRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.FindServersResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Servers
405
406
    async def find_servers_on_network(self, params):
        """Send a FindServersOnNetworkRequest and return ``Parameters`` of the response."""
        self.logger.debug("find_servers_on_network")
        req = ua.FindServersOnNetworkRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.FindServersOnNetworkResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters
415
416
    async def register_server(self, registered_server):
        """Send a RegisterServerRequest and check the service result; returns nothing."""
        self.logger.debug("register_server")
        req = ua.RegisterServerRequest()
        req.Server = registered_server
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.RegisterServerResponse, raw)
        self.logger.debug(resp)
        # This service has no result besides its status
        resp.ResponseHeader.ServiceResult.check()
425
426
    async def register_server2(self, params):
        """Send a RegisterServer2Request and return ``ConfigurationResults`` of the response."""
        self.logger.debug("register_server2")
        req = ua.RegisterServer2Request()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.RegisterServer2Response, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.ConfigurationResults
435
436
    async def translate_browsepaths_to_nodeids(self, browse_paths):
        """Send a TranslateBrowsePathsToNodeIdsRequest and return ``Results`` of the response."""
        self.logger.debug("translate_browsepath_to_nodeid")
        req = ua.TranslateBrowsePathsToNodeIdsRequest()
        req.Parameters.BrowsePaths = browse_paths
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
445
446
    async def create_subscription(self, params, callback):
        """
        Send a CreateSubscriptionRequest and register ``callback`` for the new subscription.

        Starts the publish loop task if none is running. Returns the response parameters.
        """
        self.logger.debug("create_subscription")
        req = ua.CreateSubscriptionRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.CreateSubscriptionResponse, raw)
        resp.ResponseHeader.ServiceResult.check()
        self._subscription_callbacks[resp.Parameters.SubscriptionId] = callback
        self.logger.info("create_subscription success SubscriptionId %s", resp.Parameters.SubscriptionId)
        publish_task = self._publish_task
        if not publish_task or publish_task.done():
            # Start the publish loop if it is not yet running
            # The current strategy is to have only one open publish request per UaClient. This might not be enough
            # in high latency networks or in case many subscriptions are created. A Set of Tasks of `_publish_loop`
            # could be used if necessary.
            self._publish_task = self.loop.create_task(self._publish_loop())
        return resp.Parameters
465
466
    async def delete_subscriptions(self, subscription_ids):
        """
        Send a DeleteSubscriptionsRequest and drop the callbacks of the given subscriptions.

        Ids without a registered callback (unknown or listed twice) are skipped, so the
        results of a request that already succeeded are still returned.

        :param subscription_ids: Ids of the subscriptions to delete
        :return: ``Results`` of the response
        """
        self.logger.debug("delete_subscriptions %r", subscription_ids)
        request = ua.DeleteSubscriptionsRequest()
        request.Parameters.SubscriptionIds = subscription_ids
        data = await self.protocol.send_request(request)
        response = struct_from_binary(
            ua.DeleteSubscriptionsResponse,
            data
        )
        response.ResponseHeader.ServiceResult.check()
        self.logger.info("remove subscription callbacks for %r", subscription_ids)
        for sid in subscription_ids:
            # The default avoids a KeyError for ids that have no callback (any more)
            self._subscription_callbacks.pop(sid, None)
        return response.Results
480
481
    async def publish(self, acks: List[ua.SubscriptionAcknowledgement]) -> ua.PublishResponse:
        """
        Send a PublishRequest to the server and return the parsed PublishResponse.

        The request is sent with timeout 0. Raises ``UaStructParsingError`` when the
        response cannot be parsed.
        """
        self.logger.debug('publish %r', acks)
        req = ua.PublishRequest()
        req.Parameters.SubscriptionAcknowledgements = acks or []
        raw = await self.protocol.send_request(req, timeout=0)
        self.protocol.check_answer(raw, "while waiting for publish response")
        try:
            parsed = struct_from_binary(ua.PublishResponse, raw)
        except Exception:
            self.logger.exception("Error parsing notification from server")
            raise UaStructParsingError
        else:
            return parsed
496
497
    async def _publish_loop(self):
        """
        Start a loop that sends publish requests and waits for the publish responses.
        Forward the `PublishResult` to the matching `Subscription` by callback.

        The loop ends when BadNoSubscription is received or when a response carries
        subscription id 0.
        """
        ack = None
        while True:
            try:
                response = await self.publish([ack] if ack else [])
            except BadTimeout:  # See Spec. Part 4, 7.28
                # Repeat without acknowledgement
                ack = None
                continue
            except BadNoSubscription:  # See Spec. Part 5, 13.8.1
                # BadNoSubscription is expected to be received after deleting the last subscription.
                # We use this as a signal to exit this task and stop sending PublishRequests. This is easier than
                # checking if there are no more subscriptions registered in this client. A Publish response
                # could still arrive before the DeleteSubscription response.
                #
                # We could remove the callback already when sending the DeleteSubscription request,
                # but there are some legitimate reasons to keep them around, such as when the server
                # responds with "BadTimeout" and we should try again later instead of just removing
                # the subscription client-side.
                #
                # There are a variety of ways to act correctly, but the most practical solution seems
                # to be to just silently ignore any BadNoSubscription responses.
                self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
                # End task
                return
            except UaStructParsingError:
                # Response could not be parsed: repeat without acknowledgement
                ack = None
                continue
            parameters = response.Parameters
            subscription_id = parameters.SubscriptionId
            if not subscription_id:
                # The value 0 is used to indicate that there were no Subscriptions defined for which a
                # response could be sent. See Spec. Part 4 - Section 5.13.5 "Publish"
                # End task
                return
            await self._dispatch_publish_result(subscription_id, parameters)
            # Repeat with acknowledgement (or without one if there was no notification data)
            ack = self._make_acknowledgement(subscription_id, parameters.NotificationMessage)

    async def _dispatch_publish_result(self, subscription_id, parameters):
        """Pass ``parameters`` to the callback registered for ``subscription_id``, logging any failure."""
        try:
            callback = self._subscription_callbacks[subscription_id]
        except KeyError:
            self.logger.warning(
                "Received data for unknown subscription %s active are %s", subscription_id,
                self._subscription_callbacks.keys()
            )
            return
        try:
            if asyncio.iscoroutinefunction(callback):
                await callback(parameters)
            else:
                callback(parameters)
        except Exception:  # we call user code, catch everything!
            # The placeholder used to have no argument; name the failing callback
            self.logger.exception("Exception while calling user callback: %s", callback)

    @staticmethod
    def _make_acknowledgement(subscription_id, notification_message):
        """Return an acknowledgement for ``notification_message``, or None if it carries no notification data."""
        if not notification_message.NotificationData:
            return None
        ack = ua.SubscriptionAcknowledgement()
        ack.SubscriptionId = subscription_id
        ack.SequenceNumber = notification_message.SequenceNumber
        return ack
557
558
    async def create_monitored_items(self, params):
        """Send a CreateMonitoredItemsRequest and return ``Results`` of the response."""
        self.logger.info("create_monitored_items")
        req = ua.CreateMonitoredItemsRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.CreateMonitoredItemsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
567
568
    async def delete_monitored_items(self, params):
        """Send a DeleteMonitoredItemsRequest and return ``Results`` of the response."""
        self.logger.info("delete_monitored_items")
        req = ua.DeleteMonitoredItemsRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.DeleteMonitoredItemsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
577
578
    async def add_nodes(self, nodestoadd):
        """Send an AddNodesRequest for ``nodestoadd`` and return ``Results`` of the response."""
        self.logger.info("add_nodes")
        req = ua.AddNodesRequest()
        req.Parameters.NodesToAdd = nodestoadd
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.AddNodesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
587
588
    async def add_references(self, refs):
        """Send an AddReferencesRequest for ``refs`` and return ``Results`` of the response."""
        self.logger.info("add_references")
        req = ua.AddReferencesRequest()
        req.Parameters.ReferencesToAdd = refs
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.AddReferencesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
597
598
    async def delete_references(self, refs):
        """Send a DeleteReferencesRequest for ``refs`` and return ``Parameters.Results`` of the response."""
        self.logger.info("delete")
        req = ua.DeleteReferencesRequest()
        req.Parameters.ReferencesToDelete = refs
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.DeleteReferencesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters.Results
607
608
    async def delete_nodes(self, params):
        """Send a DeleteNodesRequest and return ``Results`` of the response."""
        self.logger.info("delete_nodes")
        req = ua.DeleteNodesRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.DeleteNodesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
617
618
    async def call(self, methodstocall):
        """Send a CallRequest for ``methodstocall`` and return ``Results`` of the response."""
        req = ua.CallRequest()
        req.Parameters.MethodsToCall = methodstocall
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.CallResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
626
627
    async def history_read(self, params):
        """Send a HistoryReadRequest and return ``Results`` of the response."""
        self.logger.info("history_read")
        req = ua.HistoryReadRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.HistoryReadResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
636
637
    async def modify_monitored_items(self, params):
        """Send a ModifyMonitoredItemsRequest and return ``Results`` of the response."""
        self.logger.info("modify_monitored_items")
        req = ua.ModifyMonitoredItemsRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.ModifyMonitoredItemsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
646
647
    async def register_nodes(self, nodes):
        """Send a RegisterNodesRequest for ``nodes`` and return ``Parameters.RegisteredNodeIds`` of the response."""
        self.logger.info("register_nodes")
        req = ua.RegisterNodesRequest()
        req.Parameters.NodesToRegister = nodes
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.RegisterNodesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters.RegisteredNodeIds
656
657
    async def unregister_nodes(self, nodes):
        """Send an UnregisterNodesRequest for ``nodes`` and check the service result; returns nothing."""
        self.logger.info("unregister_nodes")
        req = ua.UnregisterNodesRequest()
        req.Parameters.NodesToUnregister = nodes
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.UnregisterNodesResponse, raw)
        self.logger.debug(resp)
        # This service has no result besides its status
        resp.ResponseHeader.ServiceResult.check()
666
667
    async def read_attributes(self, nodeids, attr):
        """
        Read the same attribute ``attr`` of every node in ``nodeids``.

        Returns ``Results`` of the ReadResponse.
        """
        self.logger.info("read_attributes of several nodes")
        req = ua.ReadRequest()
        nodes_to_read = req.Parameters.NodesToRead
        for node_id in nodeids:
            read_value = ua.ReadValueId()
            read_value.NodeId = node_id
            read_value.AttributeId = attr
            nodes_to_read.append(read_value)
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.ReadResponse, raw)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
679
680
    async def write_attributes(self, nodeids, datavalues, attributeid=ua.AttributeIds.Value):
        """
        Set an attribute of multiple nodes.

        ``datavalues[i]`` is the value written to ``nodeids[i]``; each is a ua.DataValue object.
        Returns ``Results`` of the WriteResponse.
        """
        self.logger.info("write_attributes of several nodes")
        req = ua.WriteRequest()
        nodes_to_write = req.Parameters.NodesToWrite
        for position, node_id in enumerate(nodeids):
            write_value = ua.WriteValue()
            write_value.NodeId = node_id
            write_value.AttributeId = attributeid
            write_value.Value = datavalues[position]
            nodes_to_write.append(write_value)
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.WriteResponse, raw)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
697
698
699