Passed
Push — master ( 8b55fd...609621 )
by Olivier
02:32
created

UaClient._send_publish_request()   B

Complexity

Conditions 6

Size

Total Lines 53
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 26
nop 2
dl 0
loc 53
rs 8.3226
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
"""
2
Low level binary client
3
"""
4
import asyncio
5
import logging
6
from typing import Dict
7
from asyncua import ua
8
from typing import Optional
9
from ..ua.ua_binary import struct_from_binary, uatcp_to_binary, struct_to_binary, nodeid_from_binary, header_from_binary
10
from ..ua.uaerrors import BadTimeout, BadNoSubscription, BadSessionClosed
11
from ..common.connection import SecureConnection
12
13
14
class UASocketProtocol(asyncio.Protocol):
15
    """
16
    Handle socket connection and send ua messages.
17
    Timeout is the timeout used while waiting for an ua answer from server.
18
    """
19
    INITIALIZED = 'initialized'
20
    OPEN = 'open'
21
    CLOSED = 'closed'
22
23
    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy(), loop=None):
24
        self.logger = logging.getLogger(f"{__name__}.UASocketProtocol")
25
        self.loop = loop or asyncio.get_event_loop()
26
        self.transport = None
27
        self.receive_buffer: Optional[bytes] = None
28
        self.is_receiving = False
29
        self.timeout = timeout
30
        self.authentication_token = ua.NodeId()
31
        self._request_id = 0
32
        self._request_handle = 0
33
        self._callbackmap: Dict[int, asyncio.Future] = {}
34
        self._connection = SecureConnection(security_policy)
35
        self.state = self.INITIALIZED
36
37
    def connection_made(self, transport: asyncio.Transport):
38
        self.state = self.OPEN
39
        self.transport = transport
40
41
    def connection_lost(self, exc):
42
        self.logger.info("Socket has closed connection")
43
        self.state = self.CLOSED
44
        self.transport = None
45
46
    def data_received(self, data: bytes):
47
        if self.receive_buffer:
48
            data = self.receive_buffer + data
49
            self.receive_buffer = None
50
        self._process_received_data(data)
51
52
    def _process_received_data(self, data: bytes):
53
        """
54
        Try to parse received data as asyncua message. Data may be chunked but will be in correct order.
55
        See: https://docs.python.org/3/library/asyncio-protocol.html#asyncio.Protocol.data_received
56
        Reassembly is done by filling up a buffer until it verifies as a valid message (or a MessageChunk).
57
        """
58
        buf = ua.utils.Buffer(data)
59
        while True:
60
            try:
61
                try:
62
                    header = header_from_binary(buf)
63
                except ua.utils.NotEnoughData:
64
                    self.logger.debug('Not enough data while parsing header from server, waiting for more')
65
                    self.receive_buffer = data
66
                    return
67
                if len(buf) < header.body_size:
68
                    self.logger.debug(
69
                        'We did not receive enough data from server. Need %s got %s', header.body_size, len(buf)
70
                    )
71
                    self.receive_buffer = data
72
                    return
73
                msg = self._connection.receive_from_header_and_body(header, buf)
74
                self._process_received_message(msg)
75
                if not buf:
76
                    return
77
            except Exception:
78
                self.logger.exception('Exception raised while parsing message from server')
79
                self.disconnect_socket()
80
                return
81
82
    def _process_received_message(self, msg):
83
        if msg is None:
84
            pass
85
        elif isinstance(msg, ua.Message):
86
            self._call_callback(msg.request_id(), msg.body())
87
        elif isinstance(msg, ua.Acknowledge):
88
            self._call_callback(0, msg)
89
        elif isinstance(msg, ua.ErrorMessage):
90
            self.logger.fatal("Received an error: %r", msg)
91
            self._call_callback(0, ua.UaStatusCodeError(msg.Error.value))
92
        else:
93
            raise ua.UaError(f"Unsupported message type: {msg}")
94
95
    def _send_request(self, request, timeout=1000, message_type=ua.MessageType.SecureMessage) -> asyncio.Future:
96
        """
97
        Send request to server, lower-level method.
98
        Timeout is the timeout written in ua header.
99
        Returns future
100
        """
101
        request.RequestHeader = self._create_request_header(timeout)
102
        self.logger.debug('Sending: %s', request)
103
        try:
104
            binreq = struct_to_binary(request)
105
        except Exception:
106
            # reset request handle if any error
107
            # see self._create_request_header
108
            self._request_handle -= 1
109
            raise
110
        self._request_id += 1
111
        future = self.loop.create_future()
112
        self._callbackmap[self._request_id] = future
113
        msg = self._connection.message_to_binary(binreq, message_type=message_type, request_id=self._request_id)
114
        self.transport.write(msg)
115
        return future
116
117
    async def send_request(self, request, timeout=10, message_type=ua.MessageType.SecureMessage):
118
        """
119
        Send a request to the server.
120
        Timeout is the timeout written in ua header.
121
        Returns response object if no callback is provided.
122
        """
123
        data = await asyncio.wait_for(
124
            self._send_request(request, timeout, message_type),
125
            timeout if timeout else None
126
        )
127
        self.check_answer(data, f" in response to {request.__class__.__name__}")
128
        return data
129
130
    def check_answer(self, data, context):
131
        data = data.copy()
132
        typeid = nodeid_from_binary(data)
133
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
134
            self.logger.warning("ServiceFault from server received %s", context)
135
            hdr = struct_from_binary(ua.ResponseHeader, data)
136
            hdr.ServiceResult.check()
137
            return False
138
        return True
139
140
    def _call_callback(self, request_id, body):
141
        future = self._callbackmap.pop(request_id, None)
142
        if future is None:
143
            raise ua.UaError(
144
                f"No request found for requestid: {request_id}, callbacks in list are {self._callbackmap.keys()}"
145
            )
146
        future.set_result(body)
147
148
    def _create_request_header(self, timeout=1000):
149
        hdr = ua.RequestHeader()
150
        hdr.AuthenticationToken = self.authentication_token
151
        self._request_handle += 1
152
        hdr.RequestHandle = self._request_handle
153
        hdr.TimeoutHint = timeout
154
        return hdr
155
156
    def disconnect_socket(self):
157
        self.logger.info("Request to close socket received")
158
        if self.transport:
159
            self.transport.close()
160
        else:
161
            self.logger.warning("disconnect_socket was called but transport is None")
162
163
    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
164
        hello = ua.Hello()
165
        hello.EndpointUrl = url
166
        hello.MaxMessageSize = max_messagesize
167
        hello.MaxChunkCount = max_chunkcount
168
        ack = asyncio.Future()
169
        self._callbackmap[0] = ack
170
        self.transport.write(uatcp_to_binary(ua.MessageType.Hello, hello))
171
        return await asyncio.wait_for(ack, self.timeout)
172
173
    async def open_secure_channel(self, params):
174
        self.logger.info("open_secure_channel")
175
        request = ua.OpenSecureChannelRequest()
176
        request.Parameters = params
177
        result = await asyncio.wait_for(
178
            self._send_request(request, message_type=ua.MessageType.SecureOpen),
179
            self.timeout
180
        )
181
        # FIXME: we have a race condition here
182
        # we can get a packet with the new token id before we reach to store it..
183
        response = struct_from_binary(ua.OpenSecureChannelResponse, result)
184
        response.ResponseHeader.ServiceResult.check()
185
        self._connection.set_channel(response.Parameters)
186
        return response.Parameters
187
188
    async def close_secure_channel(self):
189
        """
190
        Close secure channel.
191
        It seems to trigger a shutdown of socket in most servers, so be prepare to reconnect.
192
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response
193
        and should just close socket.
194
        """
195
        self.logger.info("close_secure_channel")
196
        request = ua.CloseSecureChannelRequest()
197
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
198
        # don't expect any more answers
199
        future.cancel()
200
        self._callbackmap.clear()
201
        # some servers send a response here, most do not ... so we ignore
202
203
204
class UaClient:
205
    """
206
    low level OPC-UA client.
207
208
    It implements (almost) all methods defined in asyncua spec
209
    taking in argument the structures defined in asyncua spec.
210
211
    In this Python implementation  most of the structures are defined in
212
    uaprotocol_auto.py and uaprotocol_hand.py available under asyncua.ua
213
    """
214
215
    def __init__(self, timeout=1, loop=None):
216
        self.logger = logging.getLogger(f'{__name__}.UaClient')
217
        self.loop = loop or asyncio.get_event_loop()
218
        self._publish_callbacks = {}
219
        self._timeout = timeout
220
        self.security_policy = ua.SecurityPolicy()
221
        self.protocol: Optional[UASocketProtocol] = None
222
        self._sub_cond = asyncio.Condition()
223
        self._sub_data_queue = []
224
225
    def set_security(self, policy: ua.SecurityPolicy):
226
        self.security_policy = policy
227
228
    def _make_protocol(self):
229
        self.protocol = UASocketProtocol(self._timeout, security_policy=self.security_policy, loop=self.loop)
230
        return self.protocol
231
232
    async def connect_socket(self, host: str, port: int):
233
        """Connect to server socket."""
234
        self.logger.info("opening connection")
235
        await self.loop.create_connection(self._make_protocol, host, port)
236
237
    def disconnect_socket(self):
238
        if self.protocol and self.protocol.state == UASocketProtocol.CLOSED:
239
            self.logger.warning("disconnect_socket was called but connection is closed")
240
            return None
241
        return self.protocol.disconnect_socket()
242
243
    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
244
        await self.protocol.send_hello(url, max_messagesize, max_chunkcount)
245
246
    async def open_secure_channel(self, params):
247
        return await self.protocol.open_secure_channel(params)
248
249
    async def close_secure_channel(self):
250
        """
251
        close secure channel. It seems to trigger a shutdown of socket
252
        in most servers, so be prepare to reconnect
253
        """
254
        if self.protocol and self.protocol.state == UASocketProtocol.CLOSED:
255
            self.logger.warning("close_secure_channel was called but connection is closed")
256
            return
257
        return await self.protocol.close_secure_channel()
258
259
    async def create_session(self, parameters):
260
        self.logger.info("create_session")
261
        request = ua.CreateSessionRequest()
262
        request.Parameters = parameters
263
        data = await self.protocol.send_request(request)
264
        response = struct_from_binary(ua.CreateSessionResponse, data)
265
        self.logger.debug(response)
266
        response.ResponseHeader.ServiceResult.check()
267
        self.protocol.authentication_token = response.Parameters.AuthenticationToken
268
        return response.Parameters
269
270
    async def activate_session(self, parameters):
271
        self.logger.info("activate_session")
272
        request = ua.ActivateSessionRequest()
273
        request.Parameters = parameters
274
        data = await self.protocol.send_request(request)
275
        response = struct_from_binary(ua.ActivateSessionResponse, data)
276
        self.logger.debug(response)
277
        response.ResponseHeader.ServiceResult.check()
278
        return response.Parameters
279
280
    async def close_session(self, delete_subscriptions):
281
        self.logger.info("close_session")
282
        if self.protocol and self.protocol.state == UASocketProtocol.CLOSED:
283
            self.logger.warning("close_session was called but connection is closed")
284
            return
285
        request = ua.CloseSessionRequest()
286
        request.DeleteSubscriptions = delete_subscriptions
287
        data = await self.protocol.send_request(request)
288
        response = struct_from_binary(ua.CloseSessionResponse, data)
289
        try:
290
            response.ResponseHeader.ServiceResult.check()
291
        except BadSessionClosed:
292
            # Problem: closing the session with open publish requests leads to BadSessionClosed responses
293
            #          we can just ignore it therefore.
294
            #          Alternatively we could make sure that there are no publish requests in flight when
295
            #          closing the session.
296
            pass
297
298
    async def browse(self, parameters):
299
        self.logger.info("browse")
300
        request = ua.BrowseRequest()
301
        request.Parameters = parameters
302
        data = await self.protocol.send_request(request)
303
        response = struct_from_binary(ua.BrowseResponse, data)
304
        self.logger.debug(response)
305
        response.ResponseHeader.ServiceResult.check()
306
        return response.Results
307
308
    async def browse_next(self, parameters):
309
        self.logger.info("browse next")
310
        request = ua.BrowseNextRequest()
311
        request.Parameters = parameters
312
        data = await self.protocol.send_request(request)
313
        response = struct_from_binary(ua.BrowseNextResponse, data)
314
        self.logger.debug(response)
315
        response.ResponseHeader.ServiceResult.check()
316
        return response.Parameters.Results
317
318
    async def read(self, parameters):
319
        self.logger.info("read")
320
        request = ua.ReadRequest()
321
        request.Parameters = parameters
322
        data = await self.protocol.send_request(request)
323
        response = struct_from_binary(ua.ReadResponse, data)
324
        self.logger.debug(response)
325
        response.ResponseHeader.ServiceResult.check()
326
        # cast to Enum attributes that need to
327
        for idx, rv in enumerate(parameters.NodesToRead):
328
            if rv.AttributeId == ua.AttributeIds.NodeClass:
329
                dv = response.Results[idx]
330
                if dv.StatusCode.is_good():
331
                    dv.Value.Value = ua.NodeClass(dv.Value.Value)
332
            elif rv.AttributeId == ua.AttributeIds.ValueRank:
333
                dv = response.Results[idx]
334
                if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
335
                    dv.Value.Value = ua.ValueRank(dv.Value.Value)
336
        return response.Results
337
338
    async def write(self, params):
339
        self.logger.info("read")
340
        request = ua.WriteRequest()
341
        request.Parameters = params
342
        data = await self.protocol.send_request(request)
343
        response = struct_from_binary(ua.WriteResponse, data)
344
        self.logger.debug(response)
345
        response.ResponseHeader.ServiceResult.check()
346
        return response.Results
347
348
    async def get_endpoints(self, params):
349
        self.logger.info("get_endpoint")
350
        request = ua.GetEndpointsRequest()
351
        request.Parameters = params
352
        data = await self.protocol.send_request(request)
353
        response = struct_from_binary(ua.GetEndpointsResponse, data)
354
        self.logger.debug(response)
355
        response.ResponseHeader.ServiceResult.check()
356
        return response.Endpoints
357
358
    async def find_servers(self, params):
359
        self.logger.info("find_servers")
360
        request = ua.FindServersRequest()
361
        request.Parameters = params
362
        data = await self.protocol.send_request(request)
363
        response = struct_from_binary(ua.FindServersResponse, data)
364
        self.logger.debug(response)
365
        response.ResponseHeader.ServiceResult.check()
366
        return response.Servers
367
368
    async def find_servers_on_network(self, params):
369
        self.logger.info("find_servers_on_network")
370
        request = ua.FindServersOnNetworkRequest()
371
        request.Parameters = params
372
        data = await self.protocol.send_request(request)
373
        response = struct_from_binary(ua.FindServersOnNetworkResponse, data)
374
        self.logger.debug(response)
375
        response.ResponseHeader.ServiceResult.check()
376
        return response.Parameters
377
378
    async def register_server(self, registered_server):
379
        self.logger.info("register_server")
380
        request = ua.RegisterServerRequest()
381
        request.Server = registered_server
382
        data = await self.protocol.send_request(request)
383
        response = struct_from_binary(ua.RegisterServerResponse, data)
384
        self.logger.debug(response)
385
        response.ResponseHeader.ServiceResult.check()
386
        # nothing to return for this service
387
388
    async def register_server2(self, params):
389
        self.logger.info("register_server2")
390
        request = ua.RegisterServer2Request()
391
        request.Parameters = params
392
        data = await self.protocol.send_request(request)
393
        response = struct_from_binary(ua.RegisterServer2Response, data)
394
        self.logger.debug(response)
395
        response.ResponseHeader.ServiceResult.check()
396
        return response.ConfigurationResults
397
398
    async def translate_browsepaths_to_nodeids(self, browse_paths):
399
        self.logger.info("translate_browsepath_to_nodeid")
400
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
401
        request.Parameters.BrowsePaths = browse_paths
402
        data = await self.protocol.send_request(request)
403
        response = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsResponse, data)
404
        self.logger.debug(response)
405
        response.ResponseHeader.ServiceResult.check()
406
        return response.Results
407
408
    async def create_subscription(self, params, callback):
409
        self.logger.info("create_subscription")
410
        request = ua.CreateSubscriptionRequest()
411
        request.Parameters = params
412
        data = await self.protocol.send_request(request)
413
        response = struct_from_binary(
414
            ua.CreateSubscriptionResponse,
415
            data
416
        )
417
        self.logger.info("create subscription callback")
418
        self.logger.debug(response)
419
        response.ResponseHeader.ServiceResult.check()
420
        self._publish_callbacks[response.Parameters.SubscriptionId] = callback
421
        return response.Parameters
422
423
    async def delete_subscriptions(self, subscription_ids):
424
        self.logger.info("delete_subscription")
425
        request = ua.DeleteSubscriptionsRequest()
426
        request.Parameters.SubscriptionIds = subscription_ids
427
        data = await self.protocol.send_request(request)
428
        response = struct_from_binary(
429
            ua.DeleteSubscriptionsResponse,
430
            data
431
        )
432
        self.logger.info("delete subscriptions callback")
433
        self.logger.debug(response)
434
        response.ResponseHeader.ServiceResult.check()
435
        for sid in subscription_ids:
436
            self._publish_callbacks.pop(sid)
437
        return response.Results
438
439
    async def publish(self, acks=None):
440
        self.logger.info("publish")
441
        if acks is None:
442
            acks = []
443
        request = ua.PublishRequest()
444
        request.Parameters.SubscriptionAcknowledgements = acks
445
        # We do not want to wait for publish response in this task
446
        self.loop.create_task(self._send_publish_request(request))
447
448
    """
449
    # to avoid fire and forget of task we could use a loop:
450
    def _sub_data_received(self, future):
451
        data = future.result()
452
        self.loop.create_task(self._enqueue_sub_data(data))
453
454
    async def _enqueue_sub_data(self, data):
455
        self._sub_data_queue.append(data)
456
        with self._sub_cond:
457
            self._sub_cond.notify()
458
459
    async def _subscribtion_loop(self):
460
        while True:
461
            async with self._sub_cond:
462
                await self._sub_cond.wait()
463
                data = self._sub_data_queue.pop(0)
464
                await self._call_publish_callback(data)
465
    """
466
467
    async def _send_publish_request(self, request):
468
        """
469
        Send publish request and wait for publish response.
470
        :param request:
471
        :return:
472
        """
473
        data = await self.protocol.send_request(request, timeout=0)
474
        try:
475
            self.protocol.check_answer(data, "while waiting for publish response")
476
        except BadTimeout:
477
            # Spec Part 4, 7.28
478
            await self.publish()
479
            return
480
        except BadNoSubscription:  # Spec Part 5, 13.8.1
481
            # BadNoSubscription is expected after deleting the last subscription.
482
            #
483
            # We should therefore also check for len(self._publishcallbacks) == 0, but
484
            # this gets us into trouble if a Publish response arrives before the
485
            # DeleteSubscription response.
486
            #
487
            # We could remove the callback already when sending the DeleteSubscription request,
488
            # but there are some legitimate reasons to keep them around, such as when the server
489
            # responds with "BadTimeout" and we should try again later instead of just removing
490
            # the subscription client-side.
491
            #
492
            # There are a variety of ways to act correctly, but the most practical solution seems
493
            # to be to just ignore any BadNoSubscription responses.
494
            self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
495
            return
496
        # parse publish response
497
        try:
498
            response = struct_from_binary(ua.PublishResponse, data)
499
            self.logger.debug(response)
500
        except Exception:
501
            # INFO: catching the exception here might be obsolete because we already
502
            #       catch BadTimeout above. However, it's not really clear what this code
503
            #       does so it stays in, doesn't seem to hurt.
504
            self.logger.exception("Error parsing notification from server")
505
            # send publish request ot server so he does stop sending notifications
506
            await self.publish([])
507
            return
508
        # look for callback
509
        try:
510
            callback = self._publish_callbacks[response.Parameters.SubscriptionId]
511
        except KeyError:
512
            self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
513
            return
514
        # do callback
515
        try:
516
            await callback(response.Parameters)
517
        except Exception:
518
            # we call client code, catch everything!
519
            self.logger.exception("Exception while calling user callback: %s")
520
521
    async def create_monitored_items(self, params):
522
        self.logger.info("create_monitored_items")
523
        request = ua.CreateMonitoredItemsRequest()
524
        request.Parameters = params
525
        data = await self.protocol.send_request(request)
526
        response = struct_from_binary(ua.CreateMonitoredItemsResponse, data)
527
        self.logger.debug(response)
528
        response.ResponseHeader.ServiceResult.check()
529
        return response.Results
530
531
    async def delete_monitored_items(self, params):
532
        self.logger.info("delete_monitored_items")
533
        request = ua.DeleteMonitoredItemsRequest()
534
        request.Parameters = params
535
        data = await self.protocol.send_request(request)
536
        response = struct_from_binary(ua.DeleteMonitoredItemsResponse, data)
537
        self.logger.debug(response)
538
        response.ResponseHeader.ServiceResult.check()
539
        return response.Results
540
541
    async def add_nodes(self, nodestoadd):
542
        self.logger.info("add_nodes")
543
        request = ua.AddNodesRequest()
544
        request.Parameters.NodesToAdd = nodestoadd
545
        data = await self.protocol.send_request(request)
546
        response = struct_from_binary(ua.AddNodesResponse, data)
547
        self.logger.debug(response)
548
        response.ResponseHeader.ServiceResult.check()
549
        return response.Results
550
551
    async def add_references(self, refs):
552
        self.logger.info("add_references")
553
        request = ua.AddReferencesRequest()
554
        request.Parameters.ReferencesToAdd = refs
555
        data = await self.protocol.send_request(request)
556
        response = struct_from_binary(ua.AddReferencesResponse, data)
557
        self.logger.debug(response)
558
        response.ResponseHeader.ServiceResult.check()
559
        return response.Results
560
561
    async def delete_references(self, refs):
562
        self.logger.info("delete")
563
        request = ua.DeleteReferencesRequest()
564
        request.Parameters.ReferencesToDelete = refs
565
        data = await self.protocol.send_request(request)
566
        response = struct_from_binary(ua.DeleteReferencesResponse, data)
567
        self.logger.debug(response)
568
        response.ResponseHeader.ServiceResult.check()
569
        return response.Parameters.Results
570
571
    async def delete_nodes(self, params):
572
        self.logger.info("delete_nodes")
573
        request = ua.DeleteNodesRequest()
574
        request.Parameters = params
575
        data = await self.protocol.send_request(request)
576
        response = struct_from_binary(ua.DeleteNodesResponse, data)
577
        self.logger.debug(response)
578
        response.ResponseHeader.ServiceResult.check()
579
        return response.Results
580
581
    async def call(self, methodstocall):
582
        request = ua.CallRequest()
583
        request.Parameters.MethodsToCall = methodstocall
584
        data = await self.protocol.send_request(request)
585
        response = struct_from_binary(ua.CallResponse, data)
586
        self.logger.debug(response)
587
        response.ResponseHeader.ServiceResult.check()
588
        return response.Results
589
590
    async def history_read(self, params):
591
        self.logger.info("history_read")
592
        request = ua.HistoryReadRequest()
593
        request.Parameters = params
594
        data = await self.protocol.send_request(request)
595
        response = struct_from_binary(ua.HistoryReadResponse, data)
596
        self.logger.debug(response)
597
        response.ResponseHeader.ServiceResult.check()
598
        return response.Results
599
600
    async def modify_monitored_items(self, params):
601
        self.logger.info("modify_monitored_items")
602
        request = ua.ModifyMonitoredItemsRequest()
603
        request.Parameters = params
604
        data = await self.protocol.send_request(request)
605
        response = struct_from_binary(ua.ModifyMonitoredItemsResponse, data)
606
        self.logger.debug(response)
607
        response.ResponseHeader.ServiceResult.check()
608
        return response.Results
609
610
    async def register_nodes(self, nodes):
611
        self.logger.info("register_nodes")
612
        request = ua.RegisterNodesRequest()
613
        request.Parameters.NodesToRegister = nodes
614
        data = await self.protocol.send_request(request)
615
        response = struct_from_binary(ua.RegisterNodesResponse, data)
616
        self.logger.debug(response)
617
        response.ResponseHeader.ServiceResult.check()
618
        return response.Parameters.RegisteredNodeIds
619
620
    async def unregister_nodes(self, nodes):
621
        self.logger.info("unregister_nodes")
622
        request = ua.UnregisterNodesRequest()
623
        request.Parameters.NodesToUnregister = nodes
624
        data = await self.protocol.send_request(request)
625
        response = struct_from_binary(ua.UnregisterNodesResponse, data)
626
        self.logger.debug(response)
627
        response.ResponseHeader.ServiceResult.check()
628
        # nothing to return for this service
629
630
    async def get_attribute(self, nodes, attr):
631
        self.logger.info("get_attribute")
632
        request = ua.ReadRequest()
633
        for node in nodes:
634
            rv = ua.ReadValueId()
635
            rv.NodeId = node
636
            rv.AttributeId = attr
637
            request.Parameters.NodesToRead.append(rv)
638
        data = await self.protocol.send_request(request)
639
        response = struct_from_binary(ua.ReadResponse, data)
640
        response.ResponseHeader.ServiceResult.check()
641
        return response.Results
642