Passed
Pull Request — master (#45)
by
unknown
01:58
created

UASocketProtocol._process_received_message()   A

Complexity

Conditions 5

Size

Total Lines 12
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 11
nop 2
dl 0
loc 12
rs 9.3333
c 0
b 0
f 0
1
"""
2
Low level binary client
3
"""
4
import asyncio
5
import logging
6
from typing import Dict, List
7
from asyncua import ua
8
from typing import Optional
9
from ..ua.ua_binary import struct_from_binary, uatcp_to_binary, struct_to_binary, nodeid_from_binary, header_from_binary
10
from ..ua.uaerrors import BadTimeout, BadNoSubscription, BadSessionClosed, UaStructParsingError
11
from ..common.connection import SecureConnection
12
13
14
class UASocketProtocol(asyncio.Protocol):
    """
    Handle socket connection and send ua messages.
    Timeout is the timeout used while waiting for an ua answer from server.

    Pending requests are tracked in ``_callbackmap``, which maps a request id
    to the future awaiting the matching response. Request id 0 is used for the
    Hello/Acknowledge exchange and for error messages.
    """
    INITIALIZED = 'initialized'
    OPEN = 'open'
    CLOSED = 'closed'

    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy(), loop=None):
        """
        :param timeout: seconds to wait for the Acknowledge and OpenSecureChannel answers
        :param security_policy: policy handed to the underlying SecureConnection
        :param loop: event loop used to create futures; defaults to the current loop
        """
        self.logger = logging.getLogger(f"{__name__}.UASocketProtocol")
        self.loop = loop or asyncio.get_event_loop()
        self.transport = None
        # Bytes received but not yet parsed into a complete message
        self.receive_buffer: Optional[bytes] = None
        self.is_receiving = False
        self.timeout = timeout
        self.authentication_token = ua.NodeId()
        self._request_id = 0
        self._request_handle = 0
        self._callbackmap: Dict[int, asyncio.Future] = {}
        self._connection = SecureConnection(security_policy)
        self.state = self.INITIALIZED

    def connection_made(self, transport: asyncio.Transport):
        """Store the transport and mark the protocol as open."""
        self.state = self.OPEN
        self.transport = transport

    def connection_lost(self, exc):
        """Mark the protocol as closed and drop the transport reference."""
        self.logger.info("Socket has closed connection")
        self.state = self.CLOSED
        self.transport = None

    def data_received(self, data: bytes):
        """Prepend any previously buffered bytes to ``data`` and try to parse the result."""
        if self.receive_buffer:
            data = self.receive_buffer + data
            self.receive_buffer = None
        self._process_received_data(data)

    def _process_received_data(self, data: bytes):
        """
        Try to parse received data as asyncua message. Data may be chunked but will be in correct order.
        See: https://docs.python.org/3/library/asyncio-protocol.html#asyncio.Protocol.data_received
        Reassembly is done by filling up a buffer until it verifies as a valid message (or a MessageChunk).

        Only the bytes that have not been dispatched yet are kept in ``receive_buffer``; any
        exception while parsing or dispatching is logged and closes the socket.
        """
        buf = ua.utils.Buffer(data)
        while True:
            try:
                try:
                    header = header_from_binary(buf)
                except ua.utils.NotEnoughData:
                    self.logger.debug('Not enough data while parsing header from server, waiting for more')
                    self.receive_buffer = data
                    return
                if len(buf) < header.body_size:
                    self.logger.debug(
                        'We did not receive enough data from server. Need %s got %s', header.body_size, len(buf)
                    )
                    self.receive_buffer = data
                    return
                msg = self._connection.receive_from_header_and_body(header, buf)
                self._process_received_message(msg)
                if not buf:
                    return
                # A complete message was dispatched: keep only the unread tail, so that a
                # following incomplete message does not cause this one to be processed again.
                # Relies on len(buf) being the number of bytes not yet consumed, as the
                # body_size comparison above already does.
                data = data[len(data) - len(buf):]
            except Exception:
                self.logger.exception('Exception raised while parsing message from server')
                self.disconnect_socket()
                return

    def _process_received_message(self, msg):
        """
        Dispatch a decoded message to the future waiting for it.

        ``None`` is ignored. Acknowledge and error messages are delivered to
        request id 0. Raises ua.UaError for any other unsupported type.
        """
        if msg is None:
            pass
        elif isinstance(msg, ua.Message):
            self._call_callback(msg.request_id(), msg.body())
        elif isinstance(msg, ua.Acknowledge):
            self._call_callback(0, msg)
        elif isinstance(msg, ua.ErrorMessage):
            self.logger.critical("Received an error: %r", msg)
            self._call_callback(0, ua.UaStatusCodeError(msg.Error.value))
        else:
            raise ua.UaError(f"Unsupported message type: {msg}")

    def _send_request(self, request, timeout=1000, message_type=ua.MessageType.SecureMessage) -> asyncio.Future:
        """
        Send request to server, lower-level method.
        Timeout is the timeout written in ua header.
        Returns the future that will receive the response body.
        """
        request.RequestHeader = self._create_request_header(timeout)
        self.logger.debug('Sending: %s', request)
        try:
            binreq = struct_to_binary(request)
        except Exception:
            # reset request handle if any error
            # see self._create_request_header
            self._request_handle -= 1
            raise
        self._request_id += 1
        future = self.loop.create_future()
        self._callbackmap[self._request_id] = future
        msg = self._connection.message_to_binary(binreq, message_type=message_type, request_id=self._request_id)
        self.transport.write(msg)
        return future

    async def send_request(self, request, timeout=10, message_type=ua.MessageType.SecureMessage):
        """
        Send a request to the server and wait for its answer.
        Timeout is written in the ua header and also bounds the wait; a falsy
        timeout waits without limit.
        Returns the raw response data after checking it for a ServiceFault.
        """
        data = await asyncio.wait_for(
            self._send_request(request, timeout, message_type),
            timeout if timeout else None
        )
        self.check_answer(data, f" in response to {request.__class__.__name__}")
        return data

    def check_answer(self, data, context):
        """
        Inspect a copy of ``data`` for a ServiceFault.

        Returns True when the answer is not a ServiceFault. For a ServiceFault a warning is
        logged and the service result is checked; False is returned if that check passes.
        """
        data = data.copy()
        typeid = nodeid_from_binary(data)
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
            self.logger.warning("ServiceFault from server received %s", context)
            hdr = struct_from_binary(ua.ResponseHeader, data)
            hdr.ServiceResult.check()
            return False
        return True

    def _call_callback(self, request_id, body):
        """
        Resolve the future registered for ``request_id`` with ``body``.

        The entry is removed from the pending map so answered requests do not
        accumulate. Raises ua.UaError when no request is pending for the id or
        when its future is already done (e.g. cancelled after a timeout).
        """
        try:
            future = self._callbackmap.pop(request_id)
        except KeyError:
            raise ua.UaError(
                f"No request found for request id: {request_id}, pending are {self._callbackmap.keys()}"
            )
        try:
            future.set_result(body)
        except asyncio.InvalidStateError:
            raise ua.UaError(f"Future for request id {request_id} is already done")

    def _create_request_header(self, timeout=1000):
        """Build a request header carrying the auth token, the next request handle and the timeout hint."""
        hdr = ua.RequestHeader()
        hdr.AuthenticationToken = self.authentication_token
        self._request_handle += 1
        hdr.RequestHandle = self._request_handle
        hdr.TimeoutHint = timeout
        return hdr

    def disconnect_socket(self):
        """Close the transport if there is one, otherwise log a warning."""
        self.logger.info("Request to close socket received")
        if self.transport:
            self.transport.close()
        else:
            self.logger.warning("disconnect_socket was called but transport is None")

    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
        """Send a Hello message and wait up to ``self.timeout`` for the answer delivered on request id 0."""
        hello = ua.Hello()
        hello.EndpointUrl = url
        hello.MaxMessageSize = max_messagesize
        hello.MaxChunkCount = max_chunkcount
        # Create the future on the protocol's own loop, as _send_request does
        ack = self.loop.create_future()
        self._callbackmap[0] = ack
        self.transport.write(uatcp_to_binary(ua.MessageType.Hello, hello))
        return await asyncio.wait_for(ack, self.timeout)

    async def open_secure_channel(self, params):
        """Send an OpenSecureChannel request, apply the returned channel parameters and return them."""
        self.logger.info("open_secure_channel")
        request = ua.OpenSecureChannelRequest()
        request.Parameters = params
        result = await asyncio.wait_for(
            self._send_request(request, message_type=ua.MessageType.SecureOpen),
            self.timeout
        )
        # FIXME: we have a race condition here
        # we can get a packet with the new token id before we reach to store it..
        response = struct_from_binary(ua.OpenSecureChannelResponse, result)
        response.ResponseHeader.ServiceResult.check()
        self._connection.set_channel(response.Parameters)
        return response.Parameters

    async def close_secure_channel(self):
        """
        Close secure channel.
        It seems to trigger a shutdown of socket in most servers, so be prepared to reconnect.
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response
        and should just close socket.
        """
        self.logger.info("close_secure_channel")
        request = ua.CloseSecureChannelRequest()
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
        # don't expect any more answers
        future.cancel()
        self._callbackmap.clear()
        # some servers send a response here, most do not ... so we ignore
204
205
206
class UaClient:
    """
    low level OPC-UA client.

    It implements (almost) all methods defined in asyncua spec
    taking in argument the structures defined in asyncua spec.

    In this Python implementation  most of the structures are defined in
    uaprotocol_auto.py and uaprotocol_hand.py available under asyncua.ua

    Most service methods follow one pattern: build the request structure, send
    it through the protocol, decode the response, check the service result
    (which raises on a bad status) and return the relevant part of the response.
    """

    def __init__(self, timeout=1, loop=None):
        """
        :param timeout: timeout handed to the UASocketProtocol created on connect
        :param loop: event loop to use; defaults to the current loop
        """
        self.logger = logging.getLogger(f'{__name__}.UaClient')
        self.loop = loop or asyncio.get_event_loop()
        # SubscriptionId -> callback invoked with each publish result
        self._subscription_callbacks = {}
        self._timeout = timeout
        self.security_policy = ua.SecurityPolicy()
        self.protocol: Optional[UASocketProtocol] = None
        self._publish_task = None

    def set_security(self, policy: ua.SecurityPolicy):
        """Set the security policy used by protocols created afterwards."""
        self.security_policy = policy

    def _make_protocol(self):
        """Protocol factory for create_connection; stores and returns the new protocol."""
        self.protocol = UASocketProtocol(self._timeout, security_policy=self.security_policy, loop=self.loop)
        return self.protocol

    async def connect_socket(self, host: str, port: int):
        """Connect to server socket."""
        self.logger.info("opening connection")
        await self.loop.create_connection(self._make_protocol, host, port)

    def disconnect_socket(self):
        """Close the socket; only logs a warning when there is no open connection."""
        # Also covers the case where no protocol was ever created
        if self.protocol is None or self.protocol.state == UASocketProtocol.CLOSED:
            self.logger.warning("disconnect_socket was called but connection is closed")
            return None
        return self.protocol.disconnect_socket()

    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
        """Forward the Hello handshake to the protocol; the answer is not returned."""
        await self.protocol.send_hello(url, max_messagesize, max_chunkcount)

    async def open_secure_channel(self, params):
        """Open a secure channel through the protocol and return the channel parameters."""
        return await self.protocol.open_secure_channel(params)

    async def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepared to reconnect
        """
        # Also covers the case where no protocol was ever created
        if self.protocol is None or self.protocol.state == UASocketProtocol.CLOSED:
            self.logger.warning("close_secure_channel was called but connection is closed")
            return
        return await self.protocol.close_secure_channel()

    async def create_session(self, parameters):
        """Create a session, store its authentication token on the protocol and return the response parameters."""
        self.logger.info("create_session")
        request = ua.CreateSessionRequest()
        request.Parameters = parameters
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.CreateSessionResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        self.protocol.authentication_token = response.Parameters.AuthenticationToken
        return response.Parameters

    async def activate_session(self, parameters):
        """Send an ActivateSession request and return the response parameters."""
        self.logger.info("activate_session")
        request = ua.ActivateSessionRequest()
        request.Parameters = parameters
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.ActivateSessionResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters

    async def close_session(self, delete_subscriptions):
        """
        Cancel the publish task and send a CloseSession request.

        Only logs a warning when there is no open connection. A BadSessionClosed
        service result is ignored.
        """
        self.logger.info("close_session")
        if self._publish_task and not self._publish_task.done():
            self._publish_task.cancel()
        # Also covers the case where no protocol was ever created
        if self.protocol is None or self.protocol.state == UASocketProtocol.CLOSED:
            self.logger.warning("close_session was called but connection is closed")
            return
        request = ua.CloseSessionRequest()
        request.DeleteSubscriptions = delete_subscriptions
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.CloseSessionResponse, data)
        try:
            response.ResponseHeader.ServiceResult.check()
        except BadSessionClosed:
            # Problem: closing the session with open publish requests leads to BadSessionClosed responses
            #          we can just ignore it therefore.
            #          Alternatively we could make sure that there are no publish requests in flight when
            #          closing the session.
            pass

    async def browse(self, parameters):
        """Send a Browse request and return the response results."""
        self.logger.info("browse")
        request = ua.BrowseRequest()
        request.Parameters = parameters
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.BrowseResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def browse_next(self, parameters):
        """Send a BrowseNext request and return the response results."""
        self.logger.debug("browse next")
        request = ua.BrowseNextRequest()
        request.Parameters = parameters
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.BrowseNextResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters.Results

    async def read(self, parameters):
        """
        Send a Read request and return the response results.

        Good results for the NodeClass attribute are converted to ua.NodeClass, and good
        ValueRank results in the known range are converted to ua.ValueRank.
        """
        self.logger.debug("read")
        request = ua.ReadRequest()
        request.Parameters = parameters
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.ReadResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        # cast to Enum attributes that need to
        for idx, rv in enumerate(parameters.NodesToRead):
            if rv.AttributeId == ua.AttributeIds.NodeClass:
                dv = response.Results[idx]
                if dv.StatusCode.is_good():
                    dv.Value.Value = ua.NodeClass(dv.Value.Value)
            elif rv.AttributeId == ua.AttributeIds.ValueRank:
                dv = response.Results[idx]
                if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
                    dv.Value.Value = ua.ValueRank(dv.Value.Value)
        return response.Results

    async def write(self, params):
        """Send a Write request and return the response results."""
        self.logger.debug("write")
        request = ua.WriteRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.WriteResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def get_endpoints(self, params):
        """Send a GetEndpoints request and return the endpoints from the response."""
        self.logger.debug("get_endpoint")
        request = ua.GetEndpointsRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.GetEndpointsResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Endpoints

    async def find_servers(self, params):
        """Send a FindServers request and return the servers from the response."""
        self.logger.debug("find_servers")
        request = ua.FindServersRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.FindServersResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Servers

    async def find_servers_on_network(self, params):
        """Send a FindServersOnNetwork request and return the response parameters."""
        self.logger.debug("find_servers_on_network")
        request = ua.FindServersOnNetworkRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.FindServersOnNetworkResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters

    async def register_server(self, registered_server):
        """Send a RegisterServer request; returns nothing."""
        self.logger.debug("register_server")
        request = ua.RegisterServerRequest()
        request.Server = registered_server
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.RegisterServerResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        # nothing to return for this service

    async def register_server2(self, params):
        """Send a RegisterServer2 request and return the configuration results."""
        self.logger.debug("register_server2")
        request = ua.RegisterServer2Request()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.RegisterServer2Response, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.ConfigurationResults

    async def translate_browsepaths_to_nodeids(self, browse_paths):
        """Send a TranslateBrowsePathsToNodeIds request and return the response results."""
        self.logger.debug("translate_browsepath_to_nodeid")
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
        request.Parameters.BrowsePaths = browse_paths
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def create_subscription(self, params, callback):
        """
        Create a subscription and register ``callback`` under the returned SubscriptionId.

        Starts the publish loop task if it is not already running and returns
        the response parameters.
        """
        self.logger.debug("create_subscription")
        request = ua.CreateSubscriptionRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(
            ua.CreateSubscriptionResponse,
            data
        )
        response.ResponseHeader.ServiceResult.check()
        self._subscription_callbacks[response.Parameters.SubscriptionId] = callback
        self.logger.info("create_subscription success SubscriptionId %s", response.Parameters.SubscriptionId)
        if not self._publish_task or self._publish_task.done():
            # Start the publish cycle if it is not yet running
            self._publish_task = self.loop.create_task(self._publish_loop())
        return response.Parameters

    async def delete_subscriptions(self, subscription_ids):
        """
        Delete subscriptions on the server and forget their callbacks.

        Ids without a registered callback are skipped so the server results are
        still returned to the caller.
        """
        self.logger.debug("delete_subscriptions %r", subscription_ids)
        request = ua.DeleteSubscriptionsRequest()
        request.Parameters.SubscriptionIds = subscription_ids
        data = await self.protocol.send_request(request)
        response = struct_from_binary(
            ua.DeleteSubscriptionsResponse,
            data
        )
        response.ResponseHeader.ServiceResult.check()
        self.logger.info("remove subscription callbacks for %r", subscription_ids)
        for sid in subscription_ids:
            # An id with no registered callback must not hide the server results
            self._subscription_callbacks.pop(sid, None)
        return response.Results

    async def publish(self, acks: List[ua.SubscriptionAcknowledgement]) -> ua.PublishResponse:
        """
        Send a PublishRequest to the server.

        The request is sent with timeout 0, i.e. the wait for the response is not bounded.
        Raises UaStructParsingError when the response cannot be decoded.
        """
        self.logger.debug('publish %r', acks)
        request = ua.PublishRequest()
        request.Parameters.SubscriptionAcknowledgements = acks if acks else []
        data = await self.protocol.send_request(request, timeout=0)
        self.protocol.check_answer(data, "while waiting for publish response")
        try:
            response = struct_from_binary(ua.PublishResponse, data)
        except Exception as exc:
            self.logger.exception("Error parsing notification from server")
            raise UaStructParsingError from exc
        return response

    async def _publish_loop(self):
        """
        Start publish cycle that sends a publish request and waits for the publish response in an endless loop.
        Forward the `PublishResult` to the matching `Subscription` by callback.
        """
        ack = None
        while True:
            try:
                response = await self.publish([ack] if ack else [])
            except BadTimeout:  # See Spec. Part 4, 7.28
                # Repeat without acknowledgement
                ack = None
                continue
            except BadNoSubscription:  # See Spec. Part 5, 13.8.1
                # BadNoSubscription is expected to be received after deleting the last subscription.
                # We use this as a signal to exit this task and stop sending PublishRequests. This is easier than
                # checking if there are no more subscriptions registered in this client (). A Publish response
                # could still arrive before the DeleteSubscription response.
                #
                # We could remove the callback already when sending the DeleteSubscription request,
                # but there are some legitimate reasons to keep them around, such as when the server
                # responds with "BadTimeout" and we should try again later instead of just removing
                # the subscription client-side.
                #
                # There are a variety of ways to act correctly, but the most practical solution seems
                # to be to just silently ignore any BadNoSubscription responses.
                self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
                # End task
                return
            except UaStructParsingError:
                ack = None
                continue
            subscription_id = response.Parameters.SubscriptionId
            if not subscription_id:
                # The value 0 is used to indicate that there were no Subscriptions defined for which a
                # response could be sent. See Spec. Part 4 - Section 5.13.5 "Publish"
                # End task
                return
            try:
                callback = self._subscription_callbacks[subscription_id]
            except KeyError:
                self.logger.warning(
                    "Received data for unknown subscription %s active are %s", subscription_id,
                    self._subscription_callbacks.keys()
                )
            else:
                try:
                    callback(response.Parameters)
                except Exception:  # we call client code, catch everything!
                    # logger.exception appends the traceback itself; no placeholder is needed
                    self.logger.exception("Exception while calling user callback")
            # Repeat with acknowledgement
            ack = ua.SubscriptionAcknowledgement()
            ack.SubscriptionId = subscription_id
            ack.SequenceNumber = response.Parameters.NotificationMessage.SequenceNumber

    async def create_monitored_items(self, params):
        """Send a CreateMonitoredItems request and return the response results."""
        self.logger.info("create_monitored_items")
        request = ua.CreateMonitoredItemsRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.CreateMonitoredItemsResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def delete_monitored_items(self, params):
        """Send a DeleteMonitoredItems request and return the response results."""
        self.logger.info("delete_monitored_items")
        request = ua.DeleteMonitoredItemsRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.DeleteMonitoredItemsResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def add_nodes(self, nodestoadd):
        """Send an AddNodes request and return the response results."""
        self.logger.info("add_nodes")
        request = ua.AddNodesRequest()
        request.Parameters.NodesToAdd = nodestoadd
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.AddNodesResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def add_references(self, refs):
        """Send an AddReferences request and return the response results."""
        self.logger.info("add_references")
        request = ua.AddReferencesRequest()
        request.Parameters.ReferencesToAdd = refs
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.AddReferencesResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def delete_references(self, refs):
        """Send a DeleteReferences request and return the response results."""
        self.logger.info("delete")
        request = ua.DeleteReferencesRequest()
        request.Parameters.ReferencesToDelete = refs
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.DeleteReferencesResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters.Results

    async def delete_nodes(self, params):
        """Send a DeleteNodes request and return the response results."""
        self.logger.info("delete_nodes")
        request = ua.DeleteNodesRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.DeleteNodesResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def call(self, methodstocall):
        """Send a Call request for the given methods and return the response results."""
        request = ua.CallRequest()
        request.Parameters.MethodsToCall = methodstocall
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.CallResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def history_read(self, params):
        """Send a HistoryRead request and return the response results."""
        self.logger.info("history_read")
        request = ua.HistoryReadRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.HistoryReadResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def modify_monitored_items(self, params):
        """Send a ModifyMonitoredItems request and return the response results."""
        self.logger.info("modify_monitored_items")
        request = ua.ModifyMonitoredItemsRequest()
        request.Parameters = params
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.ModifyMonitoredItemsResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    async def register_nodes(self, nodes):
        """Send a RegisterNodes request and return the registered node ids."""
        self.logger.info("register_nodes")
        request = ua.RegisterNodesRequest()
        request.Parameters.NodesToRegister = nodes
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.RegisterNodesResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters.RegisteredNodeIds

    async def unregister_nodes(self, nodes):
        """Send an UnregisterNodes request; returns nothing."""
        self.logger.info("unregister_nodes")
        request = ua.UnregisterNodesRequest()
        request.Parameters.NodesToUnregister = nodes
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.UnregisterNodesResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        # nothing to return for this service

    async def get_attribute(self, nodes, attr):
        """Read the attribute ``attr`` of every node in ``nodes`` with one Read request and return the results."""
        self.logger.info("get_attribute")
        request = ua.ReadRequest()
        for node in nodes:
            rv = ua.ReadValueId()
            rv.NodeId = node
            rv.AttributeId = attr
            request.Parameters.NodesToRead.append(rv)
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.ReadResponse, data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results
635