Passed
Pull Request — master (#45)
by
unknown
02:29
created

asyncua.client.ua_client.UaClient.browse()   A

Complexity

Conditions 1

Size

Total Lines 9
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 9
nop 2
dl 0
loc 9
rs 9.95
c 0
b 0
f 0
1
"""
2
Low level binary client
3
"""
4
import asyncio
5
import logging
6
from typing import Dict
7
from asyncua import ua
8
from typing import Optional
9
from ..ua.ua_binary import struct_from_binary, uatcp_to_binary, struct_to_binary, nodeid_from_binary, header_from_binary
10
from ..ua.uaerrors import BadTimeout, BadNoSubscription, BadSessionClosed
11
from ..common.connection import SecureConnection
12
13
14
class UASocketProtocol(asyncio.Protocol):
15
    """
16
    Handle socket connection and send ua messages.
17
    Timeout is the timeout used while waiting for an ua answer from server.
18
    """
19
    INITIALIZED = 'initialized'
20
    OPEN = 'open'
21
    CLOSED = 'closed'
22
23
    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy(), loop=None):
24
        self.logger = logging.getLogger(f"{__name__}.UASocketProtocol")
25
        self.loop = loop or asyncio.get_event_loop()
26
        self.transport = None
27
        self.receive_buffer: Optional[bytes] = None
28
        self.is_receiving = False
29
        self.timeout = timeout
30
        self.authentication_token = ua.NodeId()
31
        self._request_id = 0
32
        self._request_handle = 0
33
        self._callbackmap: Dict[int, asyncio.Future] = {}
34
        self._connection = SecureConnection(security_policy)
35
        self.state = self.INITIALIZED
36
37
    def connection_made(self, transport: asyncio.Transport):
38
        self.state = self.OPEN
39
        self.transport = transport
40
41
    def connection_lost(self, exc):
42
        self.logger.info("Socket has closed connection")
43
        self.state = self.CLOSED
44
        self.transport = None
45
46
    def data_received(self, data: bytes):
47
        if self.receive_buffer:
48
            data = self.receive_buffer + data
49
            self.receive_buffer = None
50
        self._process_received_data(data)
51
52
    def _process_received_data(self, data: bytes):
53
        """
54
        Try to parse received data as asyncua message. Data may be chunked but will be in correct order.
55
        See: https://docs.python.org/3/library/asyncio-protocol.html#asyncio.Protocol.data_received
56
        Reassembly is done by filling up a buffer until it verifies as a valid message (or a MessageChunk).
57
        """
58
        buf = ua.utils.Buffer(data)
59
        while True:
60
            try:
61
                try:
62
                    header = header_from_binary(buf)
63
                except ua.utils.NotEnoughData:
64
                    self.logger.debug('Not enough data while parsing header from server, waiting for more')
65
                    self.receive_buffer = data
66
                    return
67
                if len(buf) < header.body_size:
68
                    self.logger.debug(
69
                        'We did not receive enough data from server. Need %s got %s', header.body_size, len(buf)
70
                    )
71
                    self.receive_buffer = data
72
                    return
73
                msg = self._connection.receive_from_header_and_body(header, buf)
74
                self._process_received_message(msg)
75
                if not buf:
76
                    return
77
            except Exception:
78
                self.logger.exception('Exception raised while parsing message from server')
79
                self.disconnect_socket()
80
                return
81
82
    def _process_received_message(self, msg):
83
        if msg is None:
84
            pass
85
        elif isinstance(msg, ua.Message):
86
            self._call_callback(msg.request_id(), msg.body())
87
        elif isinstance(msg, ua.Acknowledge):
88
            self._call_callback(0, msg)
89
        elif isinstance(msg, ua.ErrorMessage):
90
            self.logger.fatal("Received an error: %r", msg)
91
            self._call_callback(0, ua.UaStatusCodeError(msg.Error.value))
92
        else:
93
            raise ua.UaError(f"Unsupported message type: {msg}")
94
95
    def _send_request(self, request, timeout=1000, message_type=ua.MessageType.SecureMessage) -> asyncio.Future:
96
        """
97
        Send request to server, lower-level method.
98
        Timeout is the timeout written in ua header.
99
        Returns future
100
        """
101
        request.RequestHeader = self._create_request_header(timeout)
102
        self.logger.debug('Sending: %s', request)
103
        try:
104
            binreq = struct_to_binary(request)
105
        except Exception:
106
            # reset request handle if any error
107
            # see self._create_request_header
108
            self._request_handle -= 1
109
            raise
110
        self._request_id += 1
111
        future = self.loop.create_future()
112
        self._callbackmap[self._request_id] = future
113
        msg = self._connection.message_to_binary(binreq, message_type=message_type, request_id=self._request_id)
114
        self.transport.write(msg)
115
        return future
116
117
    async def send_request(self, request, timeout=10, message_type=ua.MessageType.SecureMessage):
118
        """
119
        Send a request to the server.
120
        Timeout is the timeout written in ua header.
121
        Returns response object if no callback is provided.
122
        """
123
        data = await asyncio.wait_for(
124
            self._send_request(request, timeout, message_type),
125
            timeout if timeout else None
126
        )
127
        self.check_answer(data, f" in response to {request.__class__.__name__}")
128
        return data
129
130
    def check_answer(self, data, context):
131
        data = data.copy()
132
        typeid = nodeid_from_binary(data)
133
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
134
            self.logger.warning("ServiceFault from server received %s", context)
135
            hdr = struct_from_binary(ua.ResponseHeader, data)
136
            hdr.ServiceResult.check()
137
            return False
138
        return True
139
140
    def _call_callback(self, request_id, body):
141
        try:
142
            self._callbackmap[request_id].set_result(body)
143
        except KeyError:
144
            raise ua.UaError(
145
                f"No request found for request id: {request_id}, pending are {self._callbackmap.keys()}"
146
            )
147
        except asyncio.InvalidStateError:
148
            raise ua.UaError(f"Future for request id {request_id} is already done")
149
150
    def _create_request_header(self, timeout=1000):
151
        hdr = ua.RequestHeader()
152
        hdr.AuthenticationToken = self.authentication_token
153
        self._request_handle += 1
154
        hdr.RequestHandle = self._request_handle
155
        hdr.TimeoutHint = timeout
156
        return hdr
157
158
    def disconnect_socket(self):
159
        self.logger.info("Request to close socket received")
160
        if self.transport:
161
            self.transport.close()
162
        else:
163
            self.logger.warning("disconnect_socket was called but transport is None")
164
165
    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
166
        hello = ua.Hello()
167
        hello.EndpointUrl = url
168
        hello.MaxMessageSize = max_messagesize
169
        hello.MaxChunkCount = max_chunkcount
170
        ack = asyncio.Future()
171
        self._callbackmap[0] = ack
172
        self.transport.write(uatcp_to_binary(ua.MessageType.Hello, hello))
173
        return await asyncio.wait_for(ack, self.timeout)
174
175
    async def open_secure_channel(self, params):
176
        self.logger.info("open_secure_channel")
177
        request = ua.OpenSecureChannelRequest()
178
        request.Parameters = params
179
        result = await asyncio.wait_for(
180
            self._send_request(request, message_type=ua.MessageType.SecureOpen),
181
            self.timeout
182
        )
183
        # FIXME: we have a race condition here
184
        # we can get a packet with the new token id before we reach to store it..
185
        response = struct_from_binary(ua.OpenSecureChannelResponse, result)
186
        response.ResponseHeader.ServiceResult.check()
187
        self._connection.set_channel(response.Parameters)
188
        return response.Parameters
189
190
    async def close_secure_channel(self):
191
        """
192
        Close secure channel.
193
        It seems to trigger a shutdown of socket in most servers, so be prepare to reconnect.
194
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response
195
        and should just close socket.
196
        """
197
        self.logger.info("close_secure_channel")
198
        request = ua.CloseSecureChannelRequest()
199
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
200
        # don't expect any more answers
201
        future.cancel()
202
        self._callbackmap.clear()
203
        # some servers send a response here, most do not ... so we ignore
204
205
206
class UaClient:
207
    """
208
    low level OPC-UA client.
209
210
    It implements (almost) all methods defined in asyncua spec
211
    taking in argument the structures defined in asyncua spec.
212
213
    In this Python implementation  most of the structures are defined in
214
    uaprotocol_auto.py and uaprotocol_hand.py available under asyncua.ua
215
    """
216
217
    def __init__(self, timeout=1, loop=None):
218
        self.logger = logging.getLogger(f'{__name__}.UaClient')
219
        self.loop = loop or asyncio.get_event_loop()
220
        self._subscription_callbacks = {}
221
        self._timeout = timeout
222
        self.security_policy = ua.SecurityPolicy()
223
        self.protocol: Optional[UASocketProtocol] = None
224
        self._publish_task = None
225
226
    def set_security(self, policy: ua.SecurityPolicy):
227
        self.security_policy = policy
228
229
    def _make_protocol(self):
230
        self.protocol = UASocketProtocol(self._timeout, security_policy=self.security_policy, loop=self.loop)
231
        return self.protocol
232
233
    async def connect_socket(self, host: str, port: int):
234
        """Connect to server socket."""
235
        self.logger.info("opening connection")
236
        await self.loop.create_connection(self._make_protocol, host, port)
237
238
    def disconnect_socket(self):
239
        if self.protocol and self.protocol.state == UASocketProtocol.CLOSED:
240
            self.logger.warning("disconnect_socket was called but connection is closed")
241
            return None
242
        return self.protocol.disconnect_socket()
243
244
    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
245
        await self.protocol.send_hello(url, max_messagesize, max_chunkcount)
246
247
    async def open_secure_channel(self, params):
248
        return await self.protocol.open_secure_channel(params)
249
250
    async def close_secure_channel(self):
251
        """
252
        close secure channel. It seems to trigger a shutdown of socket
253
        in most servers, so be prepare to reconnect
254
        """
255
        if self.protocol and self.protocol.state == UASocketProtocol.CLOSED:
256
            self.logger.warning("close_secure_channel was called but connection is closed")
257
            return
258
        return await self.protocol.close_secure_channel()
259
260
    async def create_session(self, parameters):
261
        self.logger.info("create_session")
262
        request = ua.CreateSessionRequest()
263
        request.Parameters = parameters
264
        data = await self.protocol.send_request(request)
265
        response = struct_from_binary(ua.CreateSessionResponse, data)
266
        self.logger.debug(response)
267
        response.ResponseHeader.ServiceResult.check()
268
        self.protocol.authentication_token = response.Parameters.AuthenticationToken
269
        return response.Parameters
270
271
    async def activate_session(self, parameters):
272
        self.logger.info("activate_session")
273
        request = ua.ActivateSessionRequest()
274
        request.Parameters = parameters
275
        data = await self.protocol.send_request(request)
276
        response = struct_from_binary(ua.ActivateSessionResponse, data)
277
        self.logger.debug(response)
278
        response.ResponseHeader.ServiceResult.check()
279
        return response.Parameters
280
281
    async def close_session(self, delete_subscriptions):
282
        self.logger.info("close_session")
283
        if self._publish_task and not self._publish_task.done():
284
            self._publish_task.cancel()
285
        if self.protocol and self.protocol.state == UASocketProtocol.CLOSED:
286
            self.logger.warning("close_session was called but connection is closed")
287
            return
288
        request = ua.CloseSessionRequest()
289
        request.DeleteSubscriptions = delete_subscriptions
290
        data = await self.protocol.send_request(request)
291
        response = struct_from_binary(ua.CloseSessionResponse, data)
292
        try:
293
            response.ResponseHeader.ServiceResult.check()
294
        except BadSessionClosed:
295
            # Problem: closing the session with open publish requests leads to BadSessionClosed responses
296
            #          we can just ignore it therefore.
297
            #          Alternatively we could make sure that there are no publish requests in flight when
298
            #          closing the session.
299
            pass
300
301
    async def browse(self, parameters):
302
        self.logger.info("browse")
303
        request = ua.BrowseRequest()
304
        request.Parameters = parameters
305
        data = await self.protocol.send_request(request)
306
        response = struct_from_binary(ua.BrowseResponse, data)
307
        self.logger.debug(response)
308
        response.ResponseHeader.ServiceResult.check()
309
        return response.Results
310
311
    async def browse_next(self, parameters):
312
        self.logger.debug("browse next")
313
        request = ua.BrowseNextRequest()
314
        request.Parameters = parameters
315
        data = await self.protocol.send_request(request)
316
        response = struct_from_binary(ua.BrowseNextResponse, data)
317
        self.logger.debug(response)
318
        response.ResponseHeader.ServiceResult.check()
319
        return response.Parameters.Results
320
321
    async def read(self, parameters):
322
        self.logger.debug("read")
323
        request = ua.ReadRequest()
324
        request.Parameters = parameters
325
        data = await self.protocol.send_request(request)
326
        response = struct_from_binary(ua.ReadResponse, data)
327
        self.logger.debug(response)
328
        response.ResponseHeader.ServiceResult.check()
329
        # cast to Enum attributes that need to
330
        for idx, rv in enumerate(parameters.NodesToRead):
331
            if rv.AttributeId == ua.AttributeIds.NodeClass:
332
                dv = response.Results[idx]
333
                if dv.StatusCode.is_good():
334
                    dv.Value.Value = ua.NodeClass(dv.Value.Value)
335
            elif rv.AttributeId == ua.AttributeIds.ValueRank:
336
                dv = response.Results[idx]
337
                if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
338
                    dv.Value.Value = ua.ValueRank(dv.Value.Value)
339
        return response.Results
340
341
    async def write(self, params):
342
        self.logger.debug("write")
343
        request = ua.WriteRequest()
344
        request.Parameters = params
345
        data = await self.protocol.send_request(request)
346
        response = struct_from_binary(ua.WriteResponse, data)
347
        self.logger.debug(response)
348
        response.ResponseHeader.ServiceResult.check()
349
        return response.Results
350
351
    async def get_endpoints(self, params):
352
        self.logger.debug("get_endpoint")
353
        request = ua.GetEndpointsRequest()
354
        request.Parameters = params
355
        data = await self.protocol.send_request(request)
356
        response = struct_from_binary(ua.GetEndpointsResponse, data)
357
        self.logger.debug(response)
358
        response.ResponseHeader.ServiceResult.check()
359
        return response.Endpoints
360
361
    async def find_servers(self, params):
362
        self.logger.debug("find_servers")
363
        request = ua.FindServersRequest()
364
        request.Parameters = params
365
        data = await self.protocol.send_request(request)
366
        response = struct_from_binary(ua.FindServersResponse, data)
367
        self.logger.debug(response)
368
        response.ResponseHeader.ServiceResult.check()
369
        return response.Servers
370
371
    async def find_servers_on_network(self, params):
372
        self.logger.debug("find_servers_on_network")
373
        request = ua.FindServersOnNetworkRequest()
374
        request.Parameters = params
375
        data = await self.protocol.send_request(request)
376
        response = struct_from_binary(ua.FindServersOnNetworkResponse, data)
377
        self.logger.debug(response)
378
        response.ResponseHeader.ServiceResult.check()
379
        return response.Parameters
380
381
    async def register_server(self, registered_server):
382
        self.logger.debug("register_server")
383
        request = ua.RegisterServerRequest()
384
        request.Server = registered_server
385
        data = await self.protocol.send_request(request)
386
        response = struct_from_binary(ua.RegisterServerResponse, data)
387
        self.logger.debug(response)
388
        response.ResponseHeader.ServiceResult.check()
389
        # nothing to return for this service
390
391
    async def register_server2(self, params):
392
        self.logger.debug("register_server2")
393
        request = ua.RegisterServer2Request()
394
        request.Parameters = params
395
        data = await self.protocol.send_request(request)
396
        response = struct_from_binary(ua.RegisterServer2Response, data)
397
        self.logger.debug(response)
398
        response.ResponseHeader.ServiceResult.check()
399
        return response.ConfigurationResults
400
401
    async def translate_browsepaths_to_nodeids(self, browse_paths):
402
        self.logger.debug("translate_browsepath_to_nodeid")
403
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
404
        request.Parameters.BrowsePaths = browse_paths
405
        data = await self.protocol.send_request(request)
406
        response = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsResponse, data)
407
        self.logger.debug(response)
408
        response.ResponseHeader.ServiceResult.check()
409
        return response.Results
410
411
    async def create_subscription(self, params, callback):
412
        self.logger.debug("create_subscription")
413
        request = ua.CreateSubscriptionRequest()
414
        request.Parameters = params
415
        data = await self.protocol.send_request(request)
416
        response = struct_from_binary(
417
            ua.CreateSubscriptionResponse,
418
            data
419
        )
420
        response.ResponseHeader.ServiceResult.check()
421
        self._subscription_callbacks[response.Parameters.SubscriptionId] = callback
422
        self.logger.info("create_subscription success SubscriptionId %s", response.Parameters.SubscriptionId)
423
        if not self._publish_task or self._publish_task.done():
424
            # Start the publish cycle on the first subscription
425
            self._publish_task = self.loop.create_task(self._publish())
426
        return response.Parameters
427
428
    async def delete_subscriptions(self, subscription_ids):
429
        self.logger.debug("delete_subscriptions %r", subscription_ids)
430
        request = ua.DeleteSubscriptionsRequest()
431
        request.Parameters.SubscriptionIds = subscription_ids
432
        data = await self.protocol.send_request(request)
433
        response = struct_from_binary(
434
            ua.DeleteSubscriptionsResponse,
435
            data
436
        )
437
        response.ResponseHeader.ServiceResult.check()
438
        self.logger.info("remove subscription callbacks")
439
        for sid in subscription_ids:
440
            self._subscription_callbacks.pop(sid)
441
        return response.Results
442
443
    def publish(self):
444
        """
445
        This is just for maintaining symmetry.
446
        This client takes care of sending PublishRequest internally.
447
        """
448
        pass
449
450
    async def _publish(self):
451
        """
452
        Cyclically send publish request and wait for the publish response.
453
        Send the `PublishResult` to the matching `Subscription`.
454
        """
455
        ack = None
456
        while True:
457
            if not self._subscription_callbacks:
458
                # End Task if there are no more subscriptions
459
                break
460
            self.logger.debug("publish")
461
            request = ua.PublishRequest()
462
            request.Parameters.SubscriptionAcknowledgements = [ack] if ack else []
463
            data = await self.protocol.send_request(request, timeout=0)
464
            try:
465
                self.protocol.check_answer(data, "while waiting for publish response")
466
            except BadTimeout:
467
                # Spec Part 4, 7.28
468
                # Repeat without Acknowledgement
469
                ack = None
470
                continue
471
            except BadNoSubscription:  # Spec Part 5, 13.8.1
472
                # BadNoSubscription is expected after deleting the last subscription.
473
                #
474
                # We should therefore also check for len(self._publishcallbacks) == 0, but
475
                # this gets us into trouble if a Publish response arrives before the
476
                # DeleteSubscription response.
477
                #
478
                # We could remove the callback already when sending the DeleteSubscription request,
479
                # but there are some legitimate reasons to keep them around, such as when the server
480
                # responds with "BadTimeout" and we should try again later instead of just removing
481
                # the subscription client-side.
482
                #
483
                # There are a variety of ways to act correctly, but the most practical solution seems
484
                # to be to just ignore any BadNoSubscription responses.
485
                self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
486
                ack = None
487
                continue
488
            # parse publish response
489
            try:
490
                response = struct_from_binary(ua.PublishResponse, data)
491
                self.logger.debug(response)
492
            except Exception:
493
                # INFO: catching the exception here might be obsolete because we already
494
                #       catch BadTimeout above. However, it's not really clear what this code
495
                #       does so it stays in, doesn't seem to hurt.
496
                self.logger.exception("Error parsing notification from server")
497
                # send publish request to server so he does stop sending notifications
498
                ack = None
499
                continue
500
            # look for matching subscription callback
501
            subscription_id = response.Parameters.SubscriptionId
502
            try:
503
                callback = self._subscription_callbacks[subscription_id]
504
            except KeyError:
505
                self.logger.warning("Received data for unknown subscription %s active are %s", subscription_id,
506
                                    self._subscription_callbacks.keys())
507
            else:
508
                # do callback
509
                try:
510
                    callback(response.Parameters)
511
                except Exception:
512
                    # we call client code, catch everything!
513
                    self.logger.exception("Exception while calling user callback: %s")
514
            # Repeat with acknowledgement
515
            ack = ua.SubscriptionAcknowledgement()
516
            ack.SubscriptionId = subscription_id
517
            ack.SequenceNumber = response.Parameters.NotificationMessage.SequenceNumber
518
519
    async def create_monitored_items(self, params):
520
        self.logger.info("create_monitored_items")
521
        request = ua.CreateMonitoredItemsRequest()
522
        request.Parameters = params
523
        data = await self.protocol.send_request(request)
524
        response = struct_from_binary(ua.CreateMonitoredItemsResponse, data)
525
        self.logger.debug(response)
526
        response.ResponseHeader.ServiceResult.check()
527
        return response.Results
528
529
    async def delete_monitored_items(self, params):
530
        self.logger.info("delete_monitored_items")
531
        request = ua.DeleteMonitoredItemsRequest()
532
        request.Parameters = params
533
        data = await self.protocol.send_request(request)
534
        response = struct_from_binary(ua.DeleteMonitoredItemsResponse, data)
535
        self.logger.debug(response)
536
        response.ResponseHeader.ServiceResult.check()
537
        return response.Results
538
539
    async def add_nodes(self, nodestoadd):
540
        self.logger.info("add_nodes")
541
        request = ua.AddNodesRequest()
542
        request.Parameters.NodesToAdd = nodestoadd
543
        data = await self.protocol.send_request(request)
544
        response = struct_from_binary(ua.AddNodesResponse, data)
545
        self.logger.debug(response)
546
        response.ResponseHeader.ServiceResult.check()
547
        return response.Results
548
549
    async def add_references(self, refs):
550
        self.logger.info("add_references")
551
        request = ua.AddReferencesRequest()
552
        request.Parameters.ReferencesToAdd = refs
553
        data = await self.protocol.send_request(request)
554
        response = struct_from_binary(ua.AddReferencesResponse, data)
555
        self.logger.debug(response)
556
        response.ResponseHeader.ServiceResult.check()
557
        return response.Results
558
559
    async def delete_references(self, refs):
560
        self.logger.info("delete")
561
        request = ua.DeleteReferencesRequest()
562
        request.Parameters.ReferencesToDelete = refs
563
        data = await self.protocol.send_request(request)
564
        response = struct_from_binary(ua.DeleteReferencesResponse, data)
565
        self.logger.debug(response)
566
        response.ResponseHeader.ServiceResult.check()
567
        return response.Parameters.Results
568
569
    async def delete_nodes(self, params):
570
        self.logger.info("delete_nodes")
571
        request = ua.DeleteNodesRequest()
572
        request.Parameters = params
573
        data = await self.protocol.send_request(request)
574
        response = struct_from_binary(ua.DeleteNodesResponse, data)
575
        self.logger.debug(response)
576
        response.ResponseHeader.ServiceResult.check()
577
        return response.Results
578
579
    async def call(self, methodstocall):
580
        request = ua.CallRequest()
581
        request.Parameters.MethodsToCall = methodstocall
582
        data = await self.protocol.send_request(request)
583
        response = struct_from_binary(ua.CallResponse, data)
584
        self.logger.debug(response)
585
        response.ResponseHeader.ServiceResult.check()
586
        return response.Results
587
588
    async def history_read(self, params):
589
        self.logger.info("history_read")
590
        request = ua.HistoryReadRequest()
591
        request.Parameters = params
592
        data = await self.protocol.send_request(request)
593
        response = struct_from_binary(ua.HistoryReadResponse, data)
594
        self.logger.debug(response)
595
        response.ResponseHeader.ServiceResult.check()
596
        return response.Results
597
598
    async def modify_monitored_items(self, params):
599
        self.logger.info("modify_monitored_items")
600
        request = ua.ModifyMonitoredItemsRequest()
601
        request.Parameters = params
602
        data = await self.protocol.send_request(request)
603
        response = struct_from_binary(ua.ModifyMonitoredItemsResponse, data)
604
        self.logger.debug(response)
605
        response.ResponseHeader.ServiceResult.check()
606
        return response.Results
607
608
    async def register_nodes(self, nodes):
609
        self.logger.info("register_nodes")
610
        request = ua.RegisterNodesRequest()
611
        request.Parameters.NodesToRegister = nodes
612
        data = await self.protocol.send_request(request)
613
        response = struct_from_binary(ua.RegisterNodesResponse, data)
614
        self.logger.debug(response)
615
        response.ResponseHeader.ServiceResult.check()
616
        return response.Parameters.RegisteredNodeIds
617
618
    async def unregister_nodes(self, nodes):
619
        self.logger.info("unregister_nodes")
620
        request = ua.UnregisterNodesRequest()
621
        request.Parameters.NodesToUnregister = nodes
622
        data = await self.protocol.send_request(request)
623
        response = struct_from_binary(ua.UnregisterNodesResponse, data)
624
        self.logger.debug(response)
625
        response.ResponseHeader.ServiceResult.check()
626
        # nothing to return for this service
627
628
    async def get_attribute(self, nodes, attr):
629
        self.logger.info("get_attribute")
630
        request = ua.ReadRequest()
631
        for node in nodes:
632
            rv = ua.ReadValueId()
633
            rv.NodeId = node
634
            rv.AttributeId = attr
635
            request.Parameters.NodesToRead.append(rv)
636
        data = await self.protocol.send_request(request)
637
        response = struct_from_binary(ua.ReadResponse, data)
638
        response.ResponseHeader.ServiceResult.check()
639
        return response.Results
640