Passed
Pull Request — master (#45)
by
unknown
01:53
created

asyncua.client.ua_client.UaClient.close_session()   B

Complexity

Conditions 6

Size

Total Lines 19
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 15
nop 2
dl 0
loc 19
rs 8.6666
c 0
b 0
f 0
1
"""
2
Low level binary client
3
"""
4
import asyncio
5
import logging
6
from typing import Dict, List
7
from asyncua import ua
8
from typing import Optional
9
from ..ua.ua_binary import struct_from_binary, uatcp_to_binary, struct_to_binary, nodeid_from_binary, header_from_binary
10
from ..ua.uaerrors import BadTimeout, BadNoSubscription, BadSessionClosed, UaStructParsingError
11
from ..common.connection import SecureConnection
12
13
14
class UASocketProtocol(asyncio.Protocol):
15
    """
16
    Handle socket connection and send ua messages.
17
    Timeout is the timeout used while waiting for an ua answer from server.
18
    """
19
    INITIALIZED = 'initialized'
20
    OPEN = 'open'
21
    CLOSED = 'closed'
22
23
    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy(), loop=None):
        """
        Set up an unconnected protocol instance.

        :param timeout: stored on the instance; used as the wait limit for the
            hello and open-secure-channel exchanges
        :param security_policy: handed to the SecureConnection created here
        :param loop: event loop to use; falls back to asyncio.get_event_loop()
        """
        self.logger = logging.getLogger(f"{__name__}.UASocketProtocol")
        self.loop = loop or asyncio.get_event_loop()
        self.timeout = timeout
        # Connection state: there is no transport until connection_made() runs.
        self.state = self.INITIALIZED
        self.transport = None
        # Receive side: bytes held back until a complete message can be parsed.
        self.receive_buffer: Optional[bytes] = None
        self.is_receiving = False
        # Request bookkeeping.
        self.authentication_token = ua.NodeId()
        self._request_id = 0
        self._request_handle = 0
        self._callbackmap: Dict[int, asyncio.Future] = {}
        self._connection = SecureConnection(security_policy)
36
37
    def connection_made(self, transport: asyncio.Transport):
        """Keep the transport handed over by asyncio and mark the protocol as open."""
        self.transport = transport
        self.state = self.OPEN
40
41
    def connection_lost(self, exc):
        """
        Record that the socket is gone: log it, drop the transport and mark the protocol closed.

        ``exc`` is accepted to match the asyncio callback signature but is not inspected.
        """
        self.logger.info("Socket has closed connection")
        self.transport = None
        self.state = self.CLOSED
45
46
    def data_received(self, data: bytes):
        """Prepend any bytes kept back from an earlier call, then try to parse the result."""
        held_back = self.receive_buffer
        if held_back:
            self.receive_buffer = None
            data = held_back + data
        self._process_received_data(data)
51
52
    def _process_received_data(self, data: bytes):
        """
        Try to parse received data as asyncua message. Data may be chunked but will be in correct order.
        See: https://docs.python.org/3/library/asyncio-protocol.html#asyncio.Protocol.data_received
        Reassembly is done by filling up a buffer until it verifies as a valid message (or a MessageChunk).

        Every complete message found in ``data`` is dispatched. If the data ends in the middle of
        a message, only the not-yet-dispatched tail is stored in ``self.receive_buffer`` so that
        messages already handled are not parsed a second time on the next call.
        Any other exception is logged and the socket is closed.
        """
        buf = ua.utils.Buffer(data)
        while True:
            # Offset in ``data`` of the first byte that has not been consumed yet.
            # Relies on len(buf) reporting the bytes still unread, which is how it is
            # used for the body-size check below.
            consumed = len(data) - len(buf)
            try:
                try:
                    header = header_from_binary(buf)
                except ua.utils.NotEnoughData:
                    self.logger.debug('Not enough data while parsing header from server, waiting for more')
                    self.receive_buffer = data[consumed:]
                    return
                if len(buf) < header.body_size:
                    self.logger.debug(
                        'We did not receive enough data from server. Need %s got %s', header.body_size, len(buf)
                    )
                    self.receive_buffer = data[consumed:]
                    return
                msg = self._connection.receive_from_header_and_body(header, buf)
                self._process_received_message(msg)
                if not buf:
                    return
            except Exception:
                self.logger.exception('Exception raised while parsing message from server')
                self.disconnect_socket()
                return
81
82
    def _process_received_message(self, msg):
        """
        Route one decoded message to the future waiting for it.

        ``None`` is ignored, a ``ua.Message`` resolves the future of its request id, while
        ``ua.Acknowledge`` and ``ua.ErrorMessage`` resolve the future registered under id 0.
        Anything else raises ``ua.UaError``.
        """
        if msg is None:
            return
        if isinstance(msg, ua.Message):
            self._call_callback(msg.request_id(), msg.body())
            return
        if isinstance(msg, ua.Acknowledge):
            self._call_callback(0, msg)
            return
        if isinstance(msg, ua.ErrorMessage):
            self.logger.fatal("Received an error: %r", msg)
            self._call_callback(0, ua.UaStatusCodeError(msg.Error.value))
            return
        raise ua.UaError(f"Unsupported message type: {msg}")
94
95
    def _send_request(self, request, timeout=1000, message_type=ua.MessageType.SecureMessage) -> asyncio.Future:
        """
        Low-level send: serialise ``request``, write it to the transport and return the
        future registered for its answer.

        ``timeout`` is only written into the ua request header; nothing is awaited here.
        """
        request.RequestHeader = self._create_request_header(timeout)
        self.logger.debug('Sending: %s', request)
        try:
            payload = struct_to_binary(request)
        except Exception:
            # _create_request_header already consumed a request handle; give it back
            self._request_handle -= 1
            raise
        self._request_id += 1
        request_id = self._request_id
        answer = self.loop.create_future()
        self._callbackmap[request_id] = answer
        packet = self._connection.message_to_binary(payload, message_type=message_type, request_id=request_id)
        self.transport.write(packet)
        return answer
116
117
    async def send_request(self, request, timeout=10, message_type=ua.MessageType.SecureMessage):
        """
        Send a request and wait for the raw answer.

        ``timeout`` is written into the ua header and is also the wait limit; a falsy
        value (0 or None) means waiting without limit. The answer is passed through
        ``check_answer`` before it is returned.
        """
        wait_limit = timeout or None
        pending = self._send_request(request, timeout, message_type)
        data = await asyncio.wait_for(pending, wait_limit)
        self.check_answer(data, f" in response to {request.__class__.__name__}")
        return data
129
130
    def check_answer(self, data, context):
        """
        Inspect a copy of ``data`` for a ServiceFault.

        Returns True when the leading node id is not the ServiceFault encoding id.
        For a ServiceFault a warning is logged (``context`` is appended to it), the
        response header is decoded and its ServiceResult checked; False is returned
        if that check does not raise.
        """
        payload = data.copy()
        type_id = nodeid_from_binary(payload)
        is_fault = type_id == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary)
        if not is_fault:
            return True
        self.logger.warning("ServiceFault from server received %s", context)
        header = struct_from_binary(ua.ResponseHeader, payload)
        header.ServiceResult.check()
        return False
139
140
    def _call_callback(self, request_id, body):
        """
        Resolve the future registered for ``request_id`` with ``body``.

        The entry is removed from the callback map so that finished requests do not
        accumulate. If the future is already done (for example it was cancelled because
        the caller's wait timed out) the late answer is logged and dropped instead of
        raising, since raising here makes the receive loop close the socket.

        :raises ua.UaError: if no request is registered under ``request_id``
        """
        try:
            future = self._callbackmap.pop(request_id)
        except KeyError:
            raise ua.UaError(
                f"No request found for request id: {request_id}, pending are {self._callbackmap.keys()}"
            ) from None
        if future.done():
            self.logger.warning("Future for request id %s is already done", request_id)
            return
        future.set_result(body)
149
150
    def _create_request_header(self, timeout=1000):
        """
        Build a request header carrying the current authentication token, the next
        request handle and ``timeout`` as timeout hint.

        Increments ``self._request_handle`` as a side effect.
        """
        header = ua.RequestHeader()
        header.AuthenticationToken = self.authentication_token
        self._request_handle += 1
        header.RequestHandle = self._request_handle
        header.TimeoutHint = timeout
        return header
157
158
    def disconnect_socket(self):
        """Close the transport if there is one; otherwise only log a warning."""
        self.logger.info("Request to close socket received")
        if not self.transport:
            self.logger.warning("disconnect_socket was called but transport is None")
            return
        self.transport.close()
164
165
    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
        """
        Send a Hello message and wait up to ``self.timeout`` for the answer.

        The answer future is registered under request id 0, which is the id used when
        an Acknowledge or ErrorMessage is dispatched. It is created on ``self.loop``,
        the same loop ``_send_request`` uses, rather than on the default loop.

        :return: the object the future was resolved with
        """
        hello = ua.Hello()
        hello.EndpointUrl = url
        hello.MaxMessageSize = max_messagesize
        hello.MaxChunkCount = max_chunkcount
        ack = self.loop.create_future()
        self._callbackmap[0] = ack
        self.transport.write(uatcp_to_binary(ua.MessageType.Hello, hello))
        return await asyncio.wait_for(ack, self.timeout)
174
175
    async def open_secure_channel(self, params):
        """
        Send an OpenSecureChannelRequest carrying ``params``, wait up to ``self.timeout``
        for the answer, install the returned channel parameters on the connection and
        return them.
        """
        self.logger.info("open_secure_channel")
        request = ua.OpenSecureChannelRequest()
        request.Parameters = params
        pending = self._send_request(request, message_type=ua.MessageType.SecureOpen)
        raw = await asyncio.wait_for(pending, self.timeout)
        # FIXME: we have a race condition here
        # we can get a packet with the new token id before we reach to store it..
        response = struct_from_binary(ua.OpenSecureChannelResponse, raw)
        response.ResponseHeader.ServiceResult.check()
        channel = response.Parameters
        self._connection.set_channel(channel)
        return channel
189
190
    async def close_secure_channel(self):
        """
        Send a CloseSecureChannelRequest without waiting for an answer.

        This seems to make most servers shut the socket down, so be prepared to reconnect.
        OPC UA specs Part 6, 7.1.4 say that the server does not send a CloseSecureChannel
        response and should just close the socket.
        """
        self.logger.info("close_secure_channel")
        pending = self._send_request(ua.CloseSecureChannelRequest(), message_type=ua.MessageType.SecureClose)
        # don't expect any more answers
        pending.cancel()
        self._callbackmap.clear()
        # some servers send a response here, most do not ... so we ignore
204
205
206
class UaClient:
207
    """
208
    low level OPC-UA client.
209
210
    It implements (almost) all methods defined in asyncua spec
211
    taking in argument the structures defined in asyncua spec.
212
213
    In this Python implementation  most of the structures are defined in
214
    uaprotocol_auto.py and uaprotocol_hand.py available under asyncua.ua
215
    """
216
217
    def __init__(self, timeout=1, loop=None):
        """
        Create a client that is not yet connected.

        :param timeout: passed on to the UASocketProtocol built by ``_make_protocol``
        :param loop: event loop to use; falls back to asyncio.get_event_loop()
        """
        self.logger = logging.getLogger(f'{__name__}.UaClient')
        self.loop = loop or asyncio.get_event_loop()
        self._timeout = timeout
        self.security_policy = ua.SecurityPolicy()
        # Set by _make_protocol(); None until then.
        self.protocol: Optional[UASocketProtocol] = None
        # Subscription id -> callback, plus the task running _publish_loop (if any).
        self._subscription_callbacks = {}
        self._publish_task = None
225
226
    def set_security(self, policy: ua.SecurityPolicy):
        """Replace the security policy that will be given to protocols created from now on."""
        self.security_policy = policy
228
229
    def _make_protocol(self):
        """Create a UASocketProtocol from the client's settings, remember it and return it."""
        protocol = UASocketProtocol(self._timeout, security_policy=self.security_policy, loop=self.loop)
        self.protocol = protocol
        return protocol
232
233
    async def connect_socket(self, host: str, port: int):
        """Open a connection to ``host``:``port`` using ``_make_protocol`` as protocol factory."""
        self.logger.info("opening connection")
        factory = self._make_protocol
        await self.loop.create_connection(factory, host, port)
237
238
    def disconnect_socket(self):
        """
        Close the socket of the current protocol.

        When there is no protocol (``connect_socket`` was never called) or the protocol is
        already closed, a warning is logged and None is returned. Previously a missing
        protocol raised AttributeError.

        :return: whatever the protocol's ``disconnect_socket`` returns, or None
        """
        if not self.protocol or self.protocol.state == UASocketProtocol.CLOSED:
            self.logger.warning("disconnect_socket was called but connection is closed")
            return None
        return self.protocol.disconnect_socket()
243
244
    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
        """Forward the Hello exchange to the protocol; the answer is not returned."""
        protocol = self.protocol
        await protocol.send_hello(url, max_messagesize, max_chunkcount)
246
247
    async def open_secure_channel(self, params):
        """Forward to the protocol's ``open_secure_channel`` and return its result."""
        channel = await self.protocol.open_secure_channel(params)
        return channel
249
250
    async def close_secure_channel(self):
        """
        Close the secure channel. This seems to trigger a shutdown of the socket
        in most servers, so be prepared to reconnect.

        When there is no protocol (never connected) or the protocol is already closed,
        a warning is logged and nothing is sent. Previously a missing protocol raised
        AttributeError.
        """
        if not self.protocol or self.protocol.state == UASocketProtocol.CLOSED:
            self.logger.warning("close_secure_channel was called but connection is closed")
            return
        return await self.protocol.close_secure_channel()
259
260
    async def create_session(self, parameters):
        """
        Send a CreateSessionRequest, check the service result, store the returned
        authentication token on the protocol and return the response parameters.
        """
        self.logger.info("create_session")
        req = ua.CreateSessionRequest()
        req.Parameters = parameters
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.CreateSessionResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        # every later request header carries this token
        self.protocol.authentication_token = resp.Parameters.AuthenticationToken
        return resp.Parameters
270
271
    async def activate_session(self, parameters):
        """Send an ActivateSessionRequest, check the service result and return the response parameters."""
        self.logger.info("activate_session")
        req = ua.ActivateSessionRequest()
        req.Parameters = parameters
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.ActivateSessionResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters
280
281
    async def close_session(self, delete_subscriptions):
        """
        Cancel the publish task (if running) and send a CloseSessionRequest.

        :param delete_subscriptions: copied into the request's ``DeleteSubscriptions`` field

        When there is no protocol (never connected) or the protocol is already closed,
        a warning is logged and no request is sent. Previously a missing protocol raised
        AttributeError. A ``BadSessionClosed`` service result is ignored.
        """
        self.logger.info("close_session")
        if self._publish_task and not self._publish_task.done():
            self._publish_task.cancel()
        if not self.protocol or self.protocol.state == UASocketProtocol.CLOSED:
            self.logger.warning("close_session was called but connection is closed")
            return
        request = ua.CloseSessionRequest()
        request.DeleteSubscriptions = delete_subscriptions
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.CloseSessionResponse, data)
        try:
            response.ResponseHeader.ServiceResult.check()
        except BadSessionClosed:
            # Problem: closing the session with open publish requests leads to BadSessionClosed responses
            #          we can just ignore it therefore.
            #          Alternatively we could make sure that there are no publish requests in flight when
            #          closing the session.
            pass
300
301
    async def browse(self, parameters):
        """Send a BrowseRequest, check the service result and return the response's ``Results``."""
        self.logger.info("browse")
        req = ua.BrowseRequest()
        req.Parameters = parameters
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.BrowseResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
310
311
    async def browse_next(self, parameters):
        """Send a BrowseNextRequest, check the service result and return ``Parameters.Results``."""
        self.logger.debug("browse next")
        req = ua.BrowseNextRequest()
        req.Parameters = parameters
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.BrowseNextResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters.Results
320
321
    async def read(self, parameters):
        """
        Send a ReadRequest, check the service result and return the response's ``Results``.

        Results with a good status are converted in place for two attributes:
        NodeClass values become ``ua.NodeClass`` and ValueRank values in the range
        -3..4 become ``ua.ValueRank``.
        """
        self.logger.debug("read")
        req = ua.ReadRequest()
        req.Parameters = parameters
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.ReadResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        # cast to Enum the attributes that need it
        known_ranks = (-3, -2, -1, 0, 1, 2, 3, 4)
        for position, read_value in enumerate(parameters.NodesToRead):
            if read_value.AttributeId == ua.AttributeIds.NodeClass:
                result = resp.Results[position]
                if result.StatusCode.is_good():
                    result.Value.Value = ua.NodeClass(result.Value.Value)
            elif read_value.AttributeId == ua.AttributeIds.ValueRank:
                result = resp.Results[position]
                if result.StatusCode.is_good() and result.Value.Value in known_ranks:
                    result.Value.Value = ua.ValueRank(result.Value.Value)
        return resp.Results
340
341
    async def write(self, params):
        """Send a WriteRequest, check the service result and return the response's ``Results``."""
        self.logger.debug("write")
        req = ua.WriteRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.WriteResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
350
351
    async def get_endpoints(self, params):
        """Send a GetEndpointsRequest, check the service result and return the response's ``Endpoints``."""
        self.logger.debug("get_endpoint")
        req = ua.GetEndpointsRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.GetEndpointsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Endpoints
360
361
    async def find_servers(self, params):
        """Send a FindServersRequest, check the service result and return the response's ``Servers``."""
        self.logger.debug("find_servers")
        req = ua.FindServersRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.FindServersResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Servers
370
371
    async def find_servers_on_network(self, params):
        """Send a FindServersOnNetworkRequest, check the service result and return the response's ``Parameters``."""
        self.logger.debug("find_servers_on_network")
        req = ua.FindServersOnNetworkRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.FindServersOnNetworkResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters
380
381
    async def register_server(self, registered_server):
        """
        Send a RegisterServerRequest whose ``Server`` field is ``registered_server``
        and check the service result. Nothing is returned.
        """
        self.logger.debug("register_server")
        req = ua.RegisterServerRequest()
        req.Server = registered_server
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.RegisterServerResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        # this service has no payload to hand back
390
391
    async def register_server2(self, params):
        """Send a RegisterServer2Request, check the service result and return ``ConfigurationResults``."""
        self.logger.debug("register_server2")
        req = ua.RegisterServer2Request()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.RegisterServer2Response, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.ConfigurationResults
400
401
    async def translate_browsepaths_to_nodeids(self, browse_paths):
        """
        Send a TranslateBrowsePathsToNodeIdsRequest for ``browse_paths``, check the
        service result and return the response's ``Results``.
        """
        self.logger.debug("translate_browsepath_to_nodeid")
        req = ua.TranslateBrowsePathsToNodeIdsRequest()
        req.Parameters.BrowsePaths = browse_paths
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
410
411
    async def create_subscription(self, params, callback):
        """
        Send a CreateSubscriptionRequest, register ``callback`` under the returned
        subscription id and make sure the publish loop task is running.

        :return: the response parameters
        """
        self.logger.debug("create_subscription")
        req = ua.CreateSubscriptionRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.CreateSubscriptionResponse, raw)
        resp.ResponseHeader.ServiceResult.check()
        self._subscription_callbacks[resp.Parameters.SubscriptionId] = callback
        self.logger.info("create_subscription success SubscriptionId %s", resp.Parameters.SubscriptionId)
        publish_running = self._publish_task and not self._publish_task.done()
        if not publish_running:
            # Start the publish loop if it is not yet running
            # The current strategy is to have only one open publish request per UaClient. This might not be enough
            # in high latency networks or in case many subscriptions are created. A Set of Tasks of `_publish_loop`
            # could be used if necessary.
            self._publish_task = self.loop.create_task(self._publish_loop())
        return resp.Parameters
430
431
    async def delete_subscriptions(self, subscription_ids):
        """
        Send a DeleteSubscriptionsRequest for ``subscription_ids``, check the service
        result, drop the callbacks registered for those ids and return the response's
        ``Results``.

        Ids without a registered callback are skipped, so the results are still
        returned to the caller instead of being lost to a KeyError.
        """
        self.logger.debug("delete_subscriptions %r", subscription_ids)
        request = ua.DeleteSubscriptionsRequest()
        request.Parameters.SubscriptionIds = subscription_ids
        data = await self.protocol.send_request(request)
        response = struct_from_binary(
            ua.DeleteSubscriptionsResponse,
            data
        )
        response.ResponseHeader.ServiceResult.check()
        self.logger.info("remove subscription callbacks for %r", subscription_ids)
        for sid in subscription_ids:
            self._subscription_callbacks.pop(sid, None)
        return response.Results
445
446
    async def publish(self, acks: List[ua.SubscriptionAcknowledgement]) -> ua.PublishResponse:
        """
        Send a PublishRequest carrying ``acks`` (an empty list when falsy) and wait,
        without time limit, for the answer.

        :raises UaStructParsingError: when the answer cannot be decoded as a PublishResponse
        """
        self.logger.debug('publish %r', acks)
        req = ua.PublishRequest()
        req.Parameters.SubscriptionAcknowledgements = acks or []
        raw = await self.protocol.send_request(req, timeout=0)
        self.protocol.check_answer(raw, "while waiting for publish response")
        try:
            return struct_from_binary(ua.PublishResponse, raw)
        except Exception:
            self.logger.exception("Error parsing notification from server")
            raise UaStructParsingError
461
462
    async def _publish_loop(self):
        """
        Start a loop that sends publish requests and waits for the publish responses.
        Forward the `PublishResult` to the matching `Subscription` by callback.

        Each response is acknowledged with the next request. The loop ends when the server
        answers with BadNoSubscription or with subscription id 0. After BadTimeout or a
        response that cannot be parsed, the request is repeated without acknowledgement.
        Exceptions raised by a user callback are logged and do not stop the loop.
        """
        ack = None
        while True:
            try:
                response = await self.publish([ack] if ack else [])
            except BadTimeout:  # See Spec. Part 4, 7.28
                # Repeat without acknowledgement
                ack = None
                continue
            except BadNoSubscription:  # See Spec. Part 5, 13.8.1
                # BadNoSubscription is expected to be received after deleting the last subscription.
                # We use this as a signal to exit this task and stop sending PublishRequests. This is easier than
                # checking if there are no more subscriptions registered in this client. A Publish response
                # could still arrive before the DeleteSubscription response.
                #
                # We could remove the callback already when sending the DeleteSubscription request,
                # but there are some legitimate reasons to keep them around, such as when the server
                # responds with "BadTimeout" and we should try again later instead of just removing
                # the subscription client-side.
                #
                # There are a variety of ways to act correctly, but the most practical solution seems
                # to be to just silently ignore any BadNoSubscription responses.
                self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
                # End task
                return
            except UaStructParsingError:
                ack = None
                continue
            subscription_id = response.Parameters.SubscriptionId
            if not subscription_id:
                # The value 0 is used to indicate that there were no Subscriptions defined for which a
                # response could be sent. See Spec. Part 4 - Section 5.13.5 "Publish"
                # End task
                return
            try:
                callback = self._subscription_callbacks[subscription_id]
            except KeyError:
                self.logger.warning(
                    "Received data for unknown subscription %s active are %s", subscription_id,
                    self._subscription_callbacks.keys()
                )
            else:
                try:
                    callback(response.Parameters)
                except Exception:  # we call client code, catch everything!
                    # logger.exception appends the traceback itself; the old message ended
                    # in a "%s" placeholder with no argument and logged a literal "%s".
                    self.logger.exception("Exception while calling user callback")
            # Repeat with acknowledgement
            ack = ua.SubscriptionAcknowledgement()
            ack.SubscriptionId = subscription_id
            ack.SequenceNumber = response.Parameters.NotificationMessage.SequenceNumber
516
517
    async def create_monitored_items(self, params):
        """Send a CreateMonitoredItemsRequest, check the service result and return the response's ``Results``."""
        self.logger.info("create_monitored_items")
        req = ua.CreateMonitoredItemsRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.CreateMonitoredItemsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
526
527
    async def delete_monitored_items(self, params):
        """Send a DeleteMonitoredItemsRequest, check the service result and return the response's ``Results``."""
        self.logger.info("delete_monitored_items")
        req = ua.DeleteMonitoredItemsRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.DeleteMonitoredItemsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
536
537
    async def add_nodes(self, nodestoadd):
        """Send an AddNodesRequest for ``nodestoadd``, check the service result and return the response's ``Results``."""
        self.logger.info("add_nodes")
        req = ua.AddNodesRequest()
        req.Parameters.NodesToAdd = nodestoadd
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.AddNodesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
546
547
    async def add_references(self, refs):
        """Send an AddReferencesRequest for ``refs``, check the service result and return the response's ``Results``."""
        self.logger.info("add_references")
        req = ua.AddReferencesRequest()
        req.Parameters.ReferencesToAdd = refs
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.AddReferencesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
556
557
    async def delete_references(self, refs):
        """Send a DeleteReferencesRequest for ``refs``, check the service result and return ``Parameters.Results``."""
        self.logger.info("delete")
        req = ua.DeleteReferencesRequest()
        req.Parameters.ReferencesToDelete = refs
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.DeleteReferencesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters.Results
566
567
    async def delete_nodes(self, params):
        """Send a DeleteNodesRequest, check the service result and return the response's ``Results``."""
        self.logger.info("delete_nodes")
        req = ua.DeleteNodesRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.DeleteNodesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
576
577
    async def call(self, methodstocall):
        """Send a CallRequest for ``methodstocall``, check the service result and return the response's ``Results``."""
        req = ua.CallRequest()
        req.Parameters.MethodsToCall = methodstocall
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.CallResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
585
586
    async def history_read(self, params):
        """Send a HistoryReadRequest, check the service result and return the response's ``Results``."""
        self.logger.info("history_read")
        req = ua.HistoryReadRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.HistoryReadResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
595
596
    async def modify_monitored_items(self, params):
        """Send a ModifyMonitoredItemsRequest, check the service result and return the response's ``Results``."""
        self.logger.info("modify_monitored_items")
        req = ua.ModifyMonitoredItemsRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.ModifyMonitoredItemsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
605
606
    async def register_nodes(self, nodes):
        """Send a RegisterNodesRequest for ``nodes``, check the service result and return ``Parameters.RegisteredNodeIds``."""
        self.logger.info("register_nodes")
        req = ua.RegisterNodesRequest()
        req.Parameters.NodesToRegister = nodes
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.RegisterNodesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters.RegisteredNodeIds
615
616
    async def unregister_nodes(self, nodes):
        """Send an UnregisterNodesRequest for ``nodes`` and check the service result. Nothing is returned."""
        self.logger.info("unregister_nodes")
        req = ua.UnregisterNodesRequest()
        req.Parameters.NodesToUnregister = nodes
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.UnregisterNodesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        # this service has no payload to hand back
625
626
    async def get_attribute(self, nodes, attr):
        """
        Read attribute ``attr`` of every node id in ``nodes`` with a single ReadRequest.

        :return: the response's ``Results`` after the service result has been checked
        """
        self.logger.info("get_attribute")
        req = ua.ReadRequest()
        to_read = req.Parameters.NodesToRead
        for node_id in nodes:
            read_value = ua.ReadValueId()
            read_value.NodeId = node_id
            read_value.AttributeId = attr
            to_read.append(read_value)
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.ReadResponse, raw)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
638