Passed
Pull Request — master (#45)
by
unknown
02:29
created

asyncua.client.ua_client.UaClient.close_session()   B

Complexity

Conditions 6

Size

Total Lines 19
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 15
nop 2
dl 0
loc 19
rs 8.6666
c 0
b 0
f 0
1
"""
2
Low level binary client
3
"""
4
import asyncio
5
import logging
6
from typing import Dict
7
from asyncua import ua
8
from typing import Optional
9
from ..ua.ua_binary import struct_from_binary, uatcp_to_binary, struct_to_binary, nodeid_from_binary, header_from_binary
10
from ..ua.uaerrors import BadTimeout, BadNoSubscription, BadSessionClosed
11
from ..common.connection import SecureConnection
12
13
14
class UASocketProtocol(asyncio.Protocol):
15
    """
16
    Handle socket connection and send ua messages.
17
    Timeout is the timeout used while waiting for an ua answer from server.
18
    """
19
    INITIALIZED = 'initialized'
20
    OPEN = 'open'
21
    CLOSED = 'closed'
22
23
    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy(), loop=None):
24
        self.logger = logging.getLogger(f"{__name__}.UASocketProtocol")
25
        self.loop = loop or asyncio.get_event_loop()
26
        self.transport = None
27
        self.receive_buffer: Optional[bytes] = None
28
        self.is_receiving = False
29
        self.timeout = timeout
30
        self.authentication_token = ua.NodeId()
31
        self._request_id = 0
32
        self._request_handle = 0
33
        self._callbackmap: Dict[int, asyncio.Future] = {}
34
        self._connection = SecureConnection(security_policy)
35
        self.state = self.INITIALIZED
36
37
    def connection_made(self, transport: asyncio.Transport):
38
        self.state = self.OPEN
39
        self.transport = transport
40
41
    def connection_lost(self, exc):
42
        self.logger.info("Socket has closed connection")
43
        self.state = self.CLOSED
44
        self.transport = None
45
46
    def data_received(self, data: bytes):
47
        if self.receive_buffer:
48
            data = self.receive_buffer + data
49
            self.receive_buffer = None
50
        self._process_received_data(data)
51
52
    def _process_received_data(self, data: bytes):
53
        """
54
        Try to parse received data as asyncua message. Data may be chunked but will be in correct order.
55
        See: https://docs.python.org/3/library/asyncio-protocol.html#asyncio.Protocol.data_received
56
        Reassembly is done by filling up a buffer until it verifies as a valid message (or a MessageChunk).
57
        """
58
        buf = ua.utils.Buffer(data)
59
        while True:
60
            try:
61
                try:
62
                    header = header_from_binary(buf)
63
                except ua.utils.NotEnoughData:
64
                    self.logger.debug('Not enough data while parsing header from server, waiting for more')
65
                    self.receive_buffer = data
66
                    return
67
                if len(buf) < header.body_size:
68
                    self.logger.debug(
69
                        'We did not receive enough data from server. Need %s got %s', header.body_size, len(buf)
70
                    )
71
                    self.receive_buffer = data
72
                    return
73
                msg = self._connection.receive_from_header_and_body(header, buf)
74
                self._process_received_message(msg)
75
                if not buf:
76
                    return
77
            except Exception:
78
                self.logger.exception('Exception raised while parsing message from server')
79
                self.disconnect_socket()
80
                return
81
82
    def _process_received_message(self, msg):
83
        if msg is None:
84
            pass
85
        elif isinstance(msg, ua.Message):
86
            self._call_callback(msg.request_id(), msg.body())
87
        elif isinstance(msg, ua.Acknowledge):
88
            self._call_callback(0, msg)
89
        elif isinstance(msg, ua.ErrorMessage):
90
            self.logger.fatal("Received an error: %r", msg)
91
            self._call_callback(0, ua.UaStatusCodeError(msg.Error.value))
92
        else:
93
            raise ua.UaError(f"Unsupported message type: {msg}")
94
95
    def _send_request(self, request, timeout=1000, message_type=ua.MessageType.SecureMessage) -> asyncio.Future:
96
        """
97
        Send request to server, lower-level method.
98
        Timeout is the timeout written in ua header.
99
        Returns future
100
        """
101
        request.RequestHeader = self._create_request_header(timeout)
102
        self.logger.debug('Sending: %s', request)
103
        try:
104
            binreq = struct_to_binary(request)
105
        except Exception:
106
            # reset request handle if any error
107
            # see self._create_request_header
108
            self._request_handle -= 1
109
            raise
110
        self._request_id += 1
111
        future = self.loop.create_future()
112
        self._callbackmap[self._request_id] = future
113
        msg = self._connection.message_to_binary(binreq, message_type=message_type, request_id=self._request_id)
114
        self.transport.write(msg)
115
        return future
116
117
    async def send_request(self, request, timeout=10, message_type=ua.MessageType.SecureMessage):
118
        """
119
        Send a request to the server.
120
        Timeout is the timeout written in ua header.
121
        Returns response object if no callback is provided.
122
        """
123
        data = await asyncio.wait_for(
124
            self._send_request(request, timeout, message_type),
125
            timeout if timeout else None
126
        )
127
        self.check_answer(data, f" in response to {request.__class__.__name__}")
128
        return data
129
130
    def check_answer(self, data, context):
131
        data = data.copy()
132
        typeid = nodeid_from_binary(data)
133
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
134
            self.logger.warning("ServiceFault from server received %s", context)
135
            hdr = struct_from_binary(ua.ResponseHeader, data)
136
            hdr.ServiceResult.check()
137
            return False
138
        return True
139
140
    def _call_callback(self, request_id, body):
141
        try:
142
            self._callbackmap[request_id].set_result(body)
143
        except KeyError:
144
            raise ua.UaError(
145
                f"No request found for request id: {request_id}, pending are {self._callbackmap.keys()}"
146
            )
147
        except asyncio.InvalidStateError:
148
            raise ua.UaError(f"Future for request id {request_id} is already done")
149
150
    def _create_request_header(self, timeout=1000):
151
        hdr = ua.RequestHeader()
152
        hdr.AuthenticationToken = self.authentication_token
153
        self._request_handle += 1
154
        hdr.RequestHandle = self._request_handle
155
        hdr.TimeoutHint = timeout
156
        return hdr
157
158
    def disconnect_socket(self):
159
        self.logger.info("Request to close socket received")
160
        if self.transport:
161
            self.transport.close()
162
        else:
163
            self.logger.warning("disconnect_socket was called but transport is None")
164
165
    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
166
        hello = ua.Hello()
167
        hello.EndpointUrl = url
168
        hello.MaxMessageSize = max_messagesize
169
        hello.MaxChunkCount = max_chunkcount
170
        ack = asyncio.Future()
171
        self._callbackmap[0] = ack
172
        self.transport.write(uatcp_to_binary(ua.MessageType.Hello, hello))
173
        return await asyncio.wait_for(ack, self.timeout)
174
175
    async def open_secure_channel(self, params):
176
        self.logger.info("open_secure_channel")
177
        request = ua.OpenSecureChannelRequest()
178
        request.Parameters = params
179
        result = await asyncio.wait_for(
180
            self._send_request(request, message_type=ua.MessageType.SecureOpen),
181
            self.timeout
182
        )
183
        # FIXME: we have a race condition here
184
        # we can get a packet with the new token id before we reach to store it..
185
        response = struct_from_binary(ua.OpenSecureChannelResponse, result)
186
        response.ResponseHeader.ServiceResult.check()
187
        self._connection.set_channel(response.Parameters)
188
        return response.Parameters
189
190
    async def close_secure_channel(self):
191
        """
192
        Close secure channel.
193
        It seems to trigger a shutdown of socket in most servers, so be prepare to reconnect.
194
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response
195
        and should just close socket.
196
        """
197
        self.logger.info("close_secure_channel")
198
        request = ua.CloseSecureChannelRequest()
199
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
200
        # don't expect any more answers
201
        future.cancel()
202
        self._callbackmap.clear()
203
        # some servers send a response here, most do not ... so we ignore
204
205
206
class UaClient:
207
    """
208
    low level OPC-UA client.
209
210
    It implements (almost) all methods defined in asyncua spec
211
    taking in argument the structures defined in asyncua spec.
212
213
    In this Python implementation  most of the structures are defined in
214
    uaprotocol_auto.py and uaprotocol_hand.py available under asyncua.ua
215
    """
216
217
    def __init__(self, timeout=1, loop=None):
218
        self.logger = logging.getLogger(f'{__name__}.UaClient')
219
        self.loop = loop or asyncio.get_event_loop()
220
        self._subscription_callbacks = {}
221
        self._timeout = timeout
222
        self.security_policy = ua.SecurityPolicy()
223
        self.protocol: Optional[UASocketProtocol] = None
224
        self._publish_task = None
225
226
    def set_security(self, policy: ua.SecurityPolicy):
227
        self.security_policy = policy
228
229
    def _make_protocol(self):
230
        self.protocol = UASocketProtocol(self._timeout, security_policy=self.security_policy, loop=self.loop)
231
        return self.protocol
232
233
    async def connect_socket(self, host: str, port: int):
234
        """Connect to server socket."""
235
        self.logger.info("opening connection")
236
        await self.loop.create_connection(self._make_protocol, host, port)
237
238
    def disconnect_socket(self):
239
        if self.protocol and self.protocol.state == UASocketProtocol.CLOSED:
240
            self.logger.warning("disconnect_socket was called but connection is closed")
241
            return None
242
        return self.protocol.disconnect_socket()
243
244
    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
245
        await self.protocol.send_hello(url, max_messagesize, max_chunkcount)
246
247
    async def open_secure_channel(self, params):
248
        return await self.protocol.open_secure_channel(params)
249
250
    async def close_secure_channel(self):
251
        """
252
        close secure channel. It seems to trigger a shutdown of socket
253
        in most servers, so be prepare to reconnect
254
        """
255
        if self.protocol and self.protocol.state == UASocketProtocol.CLOSED:
256
            self.logger.warning("close_secure_channel was called but connection is closed")
257
            return
258
        return await self.protocol.close_secure_channel()
259
260
    async def create_session(self, parameters):
261
        self.logger.info("create_session")
262
        request = ua.CreateSessionRequest()
263
        request.Parameters = parameters
264
        data = await self.protocol.send_request(request)
265
        response = struct_from_binary(ua.CreateSessionResponse, data)
266
        self.logger.debug(response)
267
        response.ResponseHeader.ServiceResult.check()
268
        self.protocol.authentication_token = response.Parameters.AuthenticationToken
269
        return response.Parameters
270
271
    async def activate_session(self, parameters):
272
        self.logger.info("activate_session")
273
        request = ua.ActivateSessionRequest()
274
        request.Parameters = parameters
275
        data = await self.protocol.send_request(request)
276
        response = struct_from_binary(ua.ActivateSessionResponse, data)
277
        self.logger.debug(response)
278
        response.ResponseHeader.ServiceResult.check()
279
        return response.Parameters
280
281
    async def close_session(self, delete_subscriptions):
282
        self.logger.info("close_session")
283
        if self._publish_task and not self._publish_task.done():
284
            self._publish_task.cancel()
285
        if self.protocol and self.protocol.state == UASocketProtocol.CLOSED:
286
            self.logger.warning("close_session was called but connection is closed")
287
            return
288
        request = ua.CloseSessionRequest()
289
        request.DeleteSubscriptions = delete_subscriptions
290
        data = await self.protocol.send_request(request)
291
        response = struct_from_binary(ua.CloseSessionResponse, data)
292
        try:
293
            response.ResponseHeader.ServiceResult.check()
294
        except BadSessionClosed:
295
            # Problem: closing the session with open publish requests leads to BadSessionClosed responses
296
            #          we can just ignore it therefore.
297
            #          Alternatively we could make sure that there are no publish requests in flight when
298
            #          closing the session.
299
            pass
300
301
    async def browse(self, parameters):
302
        self.logger.info("browse")
303
        request = ua.BrowseRequest()
304
        request.Parameters = parameters
305
        data = await self.protocol.send_request(request)
306
        response = struct_from_binary(ua.BrowseResponse, data)
307
        self.logger.debug(response)
308
        response.ResponseHeader.ServiceResult.check()
309
        return response.Results
310
311
    async def browse_next(self, parameters):
312
        self.logger.debug("browse next")
313
        request = ua.BrowseNextRequest()
314
        request.Parameters = parameters
315
        data = await self.protocol.send_request(request)
316
        response = struct_from_binary(ua.BrowseNextResponse, data)
317
        self.logger.debug(response)
318
        response.ResponseHeader.ServiceResult.check()
319
        return response.Parameters.Results
320
321
    async def read(self, parameters):
322
        self.logger.debug("read")
323
        request = ua.ReadRequest()
324
        request.Parameters = parameters
325
        data = await self.protocol.send_request(request)
326
        response = struct_from_binary(ua.ReadResponse, data)
327
        self.logger.debug(response)
328
        response.ResponseHeader.ServiceResult.check()
329
        # cast to Enum attributes that need to
330
        for idx, rv in enumerate(parameters.NodesToRead):
331
            if rv.AttributeId == ua.AttributeIds.NodeClass:
332
                dv = response.Results[idx]
333
                if dv.StatusCode.is_good():
334
                    dv.Value.Value = ua.NodeClass(dv.Value.Value)
335
            elif rv.AttributeId == ua.AttributeIds.ValueRank:
336
                dv = response.Results[idx]
337
                if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
338
                    dv.Value.Value = ua.ValueRank(dv.Value.Value)
339
        return response.Results
340
341
    async def write(self, params):
342
        self.logger.debug("write")
343
        request = ua.WriteRequest()
344
        request.Parameters = params
345
        data = await self.protocol.send_request(request)
346
        response = struct_from_binary(ua.WriteResponse, data)
347
        self.logger.debug(response)
348
        response.ResponseHeader.ServiceResult.check()
349
        return response.Results
350
351
    async def get_endpoints(self, params):
352
        self.logger.debug("get_endpoint")
353
        request = ua.GetEndpointsRequest()
354
        request.Parameters = params
355
        data = await self.protocol.send_request(request)
356
        response = struct_from_binary(ua.GetEndpointsResponse, data)
357
        self.logger.debug(response)
358
        response.ResponseHeader.ServiceResult.check()
359
        return response.Endpoints
360
361
    async def find_servers(self, params):
362
        self.logger.debug("find_servers")
363
        request = ua.FindServersRequest()
364
        request.Parameters = params
365
        data = await self.protocol.send_request(request)
366
        response = struct_from_binary(ua.FindServersResponse, data)
367
        self.logger.debug(response)
368
        response.ResponseHeader.ServiceResult.check()
369
        return response.Servers
370
371
    async def find_servers_on_network(self, params):
372
        self.logger.debug("find_servers_on_network")
373
        request = ua.FindServersOnNetworkRequest()
374
        request.Parameters = params
375
        data = await self.protocol.send_request(request)
376
        response = struct_from_binary(ua.FindServersOnNetworkResponse, data)
377
        self.logger.debug(response)
378
        response.ResponseHeader.ServiceResult.check()
379
        return response.Parameters
380
381
    async def register_server(self, registered_server):
382
        self.logger.debug("register_server")
383
        request = ua.RegisterServerRequest()
384
        request.Server = registered_server
385
        data = await self.protocol.send_request(request)
386
        response = struct_from_binary(ua.RegisterServerResponse, data)
387
        self.logger.debug(response)
388
        response.ResponseHeader.ServiceResult.check()
389
        # nothing to return for this service
390
391
    async def register_server2(self, params):
392
        self.logger.debug("register_server2")
393
        request = ua.RegisterServer2Request()
394
        request.Parameters = params
395
        data = await self.protocol.send_request(request)
396
        response = struct_from_binary(ua.RegisterServer2Response, data)
397
        self.logger.debug(response)
398
        response.ResponseHeader.ServiceResult.check()
399
        return response.ConfigurationResults
400
401
    async def translate_browsepaths_to_nodeids(self, browse_paths):
402
        self.logger.debug("translate_browsepath_to_nodeid")
403
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
404
        request.Parameters.BrowsePaths = browse_paths
405
        data = await self.protocol.send_request(request)
406
        response = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsResponse, data)
407
        self.logger.debug(response)
408
        response.ResponseHeader.ServiceResult.check()
409
        return response.Results
410
411
    async def create_subscription(self, params, callback):
412
        self.logger.debug("create_subscription")
413
        request = ua.CreateSubscriptionRequest()
414
        request.Parameters = params
415
        data = await self.protocol.send_request(request)
416
        response = struct_from_binary(
417
            ua.CreateSubscriptionResponse,
418
            data
419
        )
420
        response.ResponseHeader.ServiceResult.check()
421
        self._subscription_callbacks[response.Parameters.SubscriptionId] = callback
422
        self.logger.info("create_subscription success SubscriptionId %s", response.Parameters.SubscriptionId)
423
        if not self._publish_task or self._publish_task.done():
424
            # Start the publish cycle if it is not yet running
425
            self._publish_task = self.loop.create_task(self._publish())
426
        return response.Parameters
427
428
    async def delete_subscriptions(self, subscription_ids):
429
        self.logger.debug("delete_subscriptions %r", subscription_ids)
430
        request = ua.DeleteSubscriptionsRequest()
431
        request.Parameters.SubscriptionIds = subscription_ids
432
        data = await self.protocol.send_request(request)
433
        response = struct_from_binary(
434
            ua.DeleteSubscriptionsResponse,
435
            data
436
        )
437
        response.ResponseHeader.ServiceResult.check()
438
        self.logger.info("remove subscription callbacks for %r", subscription_ids)
439
        for sid in subscription_ids:
440
            self._subscription_callbacks.pop(sid)
441
        return response.Results
442
443
    def publish(self):
444
        """
445
        This is just for maintaining symmetry.
446
        This client takes care of sending PublishRequest internally.
447
        """
448
        pass
449
450
    async def _publish(self):
451
        """
452
        Send publish request and wait for the publish response in an endless loop.
453
        Forward the `PublishResult` to the matching `Subscription` by callback.
454
        """
455
        ack = None
456
        while True:
457
            request = ua.PublishRequest()
458
            request.Parameters.SubscriptionAcknowledgements = [ack] if ack else []
459
            data = await self.protocol.send_request(request, timeout=0)
460
            try:
461
                self.protocol.check_answer(data, "while waiting for publish response")
462
            except BadTimeout:  # See Spec. Part 4, 7.28
463
                # Repeat without acknowledgement
464
                ack = None
465
                continue
466
            except BadNoSubscription:  # See Spec. Part 5, 13.8.1
467
                # BadNoSubscription is expected to be received after deleting the last subscription.
468
                # We use this as a signal to exit this task and stop sending PublishRequests. This is easier then
469
                # checking if there are no more subscriptions registered in this client (). A Publish response
470
                # could still arrive before the DeleteSubscription response.
471
                #
472
                # We could remove the callback already when sending the DeleteSubscription request,
473
                # but there are some legitimate reasons to keep them around, such as when the server
474
                # responds with "BadTimeout" and we should try again later instead of just removing
475
                # the subscription client-side.
476
                #
477
                # There are a variety of ways to act correctly, but the most practical solution seems
478
                # to be to just silently ignore any BadNoSubscription responses.
479
                self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
480
                # End task
481
                return
482
            # parse publish response
483
            try:
484
                response = struct_from_binary(ua.PublishResponse, data)
485
            except Exception:
486
                # INFO: catching the exception here might be obsolete because we already
487
                #       catch BadTimeout above. However, it's not really clear what this code
488
                #       does so it stays in, doesn't seem to hurt.
489
                self.logger.exception("Error parsing notification from server")
490
                # send publish request to server so he does stop sending notifications
491
                ack = None
492
                continue
493
            # look for matching subscription callback
494
            subscription_id = response.Parameters.SubscriptionId
495
            try:
496
                callback = self._subscription_callbacks[subscription_id]
497
            except KeyError:
498
                self.logger.warning("Received data for unknown subscription %s active are %s", subscription_id,
499
                                    self._subscription_callbacks.keys())
500
            else:
501
                try:
502
                    callback(response.Parameters)
503
                except Exception:  # we call client code, catch everything!
504
                    self.logger.exception("Exception while calling user callback: %s")
505
            # Repeat with acknowledgement
506
            ack = ua.SubscriptionAcknowledgement()
507
            ack.SubscriptionId = subscription_id
508
            ack.SequenceNumber = response.Parameters.NotificationMessage.SequenceNumber
509
510
    async def create_monitored_items(self, params):
511
        self.logger.info("create_monitored_items")
512
        request = ua.CreateMonitoredItemsRequest()
513
        request.Parameters = params
514
        data = await self.protocol.send_request(request)
515
        response = struct_from_binary(ua.CreateMonitoredItemsResponse, data)
516
        self.logger.debug(response)
517
        response.ResponseHeader.ServiceResult.check()
518
        return response.Results
519
520
    async def delete_monitored_items(self, params):
521
        self.logger.info("delete_monitored_items")
522
        request = ua.DeleteMonitoredItemsRequest()
523
        request.Parameters = params
524
        data = await self.protocol.send_request(request)
525
        response = struct_from_binary(ua.DeleteMonitoredItemsResponse, data)
526
        self.logger.debug(response)
527
        response.ResponseHeader.ServiceResult.check()
528
        return response.Results
529
530
    async def add_nodes(self, nodestoadd):
531
        self.logger.info("add_nodes")
532
        request = ua.AddNodesRequest()
533
        request.Parameters.NodesToAdd = nodestoadd
534
        data = await self.protocol.send_request(request)
535
        response = struct_from_binary(ua.AddNodesResponse, data)
536
        self.logger.debug(response)
537
        response.ResponseHeader.ServiceResult.check()
538
        return response.Results
539
540
    async def add_references(self, refs):
541
        self.logger.info("add_references")
542
        request = ua.AddReferencesRequest()
543
        request.Parameters.ReferencesToAdd = refs
544
        data = await self.protocol.send_request(request)
545
        response = struct_from_binary(ua.AddReferencesResponse, data)
546
        self.logger.debug(response)
547
        response.ResponseHeader.ServiceResult.check()
548
        return response.Results
549
550
    async def delete_references(self, refs):
551
        self.logger.info("delete")
552
        request = ua.DeleteReferencesRequest()
553
        request.Parameters.ReferencesToDelete = refs
554
        data = await self.protocol.send_request(request)
555
        response = struct_from_binary(ua.DeleteReferencesResponse, data)
556
        self.logger.debug(response)
557
        response.ResponseHeader.ServiceResult.check()
558
        return response.Parameters.Results
559
560
    async def delete_nodes(self, params):
561
        self.logger.info("delete_nodes")
562
        request = ua.DeleteNodesRequest()
563
        request.Parameters = params
564
        data = await self.protocol.send_request(request)
565
        response = struct_from_binary(ua.DeleteNodesResponse, data)
566
        self.logger.debug(response)
567
        response.ResponseHeader.ServiceResult.check()
568
        return response.Results
569
570
    async def call(self, methodstocall):
571
        request = ua.CallRequest()
572
        request.Parameters.MethodsToCall = methodstocall
573
        data = await self.protocol.send_request(request)
574
        response = struct_from_binary(ua.CallResponse, data)
575
        self.logger.debug(response)
576
        response.ResponseHeader.ServiceResult.check()
577
        return response.Results
578
579
    async def history_read(self, params):
580
        self.logger.info("history_read")
581
        request = ua.HistoryReadRequest()
582
        request.Parameters = params
583
        data = await self.protocol.send_request(request)
584
        response = struct_from_binary(ua.HistoryReadResponse, data)
585
        self.logger.debug(response)
586
        response.ResponseHeader.ServiceResult.check()
587
        return response.Results
588
589
    async def modify_monitored_items(self, params):
590
        self.logger.info("modify_monitored_items")
591
        request = ua.ModifyMonitoredItemsRequest()
592
        request.Parameters = params
593
        data = await self.protocol.send_request(request)
594
        response = struct_from_binary(ua.ModifyMonitoredItemsResponse, data)
595
        self.logger.debug(response)
596
        response.ResponseHeader.ServiceResult.check()
597
        return response.Results
598
599
    async def register_nodes(self, nodes):
600
        self.logger.info("register_nodes")
601
        request = ua.RegisterNodesRequest()
602
        request.Parameters.NodesToRegister = nodes
603
        data = await self.protocol.send_request(request)
604
        response = struct_from_binary(ua.RegisterNodesResponse, data)
605
        self.logger.debug(response)
606
        response.ResponseHeader.ServiceResult.check()
607
        return response.Parameters.RegisteredNodeIds
608
609
    async def unregister_nodes(self, nodes):
610
        self.logger.info("unregister_nodes")
611
        request = ua.UnregisterNodesRequest()
612
        request.Parameters.NodesToUnregister = nodes
613
        data = await self.protocol.send_request(request)
614
        response = struct_from_binary(ua.UnregisterNodesResponse, data)
615
        self.logger.debug(response)
616
        response.ResponseHeader.ServiceResult.check()
617
        # nothing to return for this service
618
619
    async def get_attribute(self, nodes, attr):
620
        self.logger.info("get_attribute")
621
        request = ua.ReadRequest()
622
        for node in nodes:
623
            rv = ua.ReadValueId()
624
            rv.NodeId = node
625
            rv.AttributeId = attr
626
            request.Parameters.NodesToRead.append(rv)
627
        data = await self.protocol.send_request(request)
628
        response = struct_from_binary(ua.ReadResponse, data)
629
        response.ResponseHeader.ServiceResult.check()
630
        return response.Results
631