Passed
Push — master ( 3fb232...b005e0 )
by Olivier
02:21
created

asyncua.client.ua_client.UaClient.__init__()   A

Complexity

Conditions 1

Size

Total Lines 12
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 8
nop 3
dl 0
loc 12
rs 10
c 0
b 0
f 0
1
"""
2
Low level binary client
3
"""
4
import asyncio
5
import logging
6
from typing import Dict, List
7
from asyncua import ua
8
from typing import Optional
9
from ..ua.ua_binary import struct_from_binary, uatcp_to_binary, struct_to_binary, nodeid_from_binary, header_from_binary
10
from ..ua.uaerrors import BadTimeout, BadNoSubscription, BadSessionClosed, UaStructParsingError
11
from ..common.connection import SecureConnection
12
13
14
class UASocketProtocol(asyncio.Protocol):
15
    """
16
    Handle socket connection and send ua messages.
17
    Timeout is the timeout used while waiting for an ua answer from server.
18
    """
19
    INITIALIZED = 'initialized'
20
    OPEN = 'open'
21
    CLOSED = 'closed'
22
23
    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy(), loop=None):
24
        """
25
        :param timeout: Timeout in seconds
26
        :param security_policy: Security policy (optional)
27
        :param loop: Event loop (optional)
28
        """
29
        self.logger = logging.getLogger(f"{__name__}.UASocketProtocol")
30
        self.loop = loop or asyncio.get_event_loop()
31
        self.transport = None
32
        self.receive_buffer: Optional[bytes] = None
33
        self.is_receiving = False
34
        self.timeout = timeout
35
        self.authentication_token = ua.NodeId()
36
        self._request_id = 0
37
        self._request_handle = 0
38
        self._callbackmap: Dict[int, asyncio.Future] = {}
39
        self._connection = SecureConnection(security_policy)
40
        self.state = self.INITIALIZED
41
42
    def connection_made(self, transport: asyncio.Transport):
43
        self.state = self.OPEN
44
        self.transport = transport
45
46
    def connection_lost(self, exc):
47
        self.logger.info("Socket has closed connection")
48
        self.state = self.CLOSED
49
        self.transport = None
50
51
    def data_received(self, data: bytes):
52
        if self.receive_buffer:
53
            data = self.receive_buffer + data
54
            self.receive_buffer = None
55
        self._process_received_data(data)
56
57
    def _process_received_data(self, data: bytes):
58
        """
59
        Try to parse received data as asyncua message. Data may be chunked but will be in correct order.
60
        See: https://docs.python.org/3/library/asyncio-protocol.html#asyncio.Protocol.data_received
61
        Reassembly is done by filling up a buffer until it verifies as a valid message (or a MessageChunk).
62
        """
63
        buf = ua.utils.Buffer(data)
64
        while True:
65
            try:
66
                try:
67
                    header = header_from_binary(buf)
68
                except ua.utils.NotEnoughData:
69
                    self.logger.debug('Not enough data while parsing header from server, waiting for more')
70
                    self.receive_buffer = data
71
                    return
72
                if len(buf) < header.body_size:
73
                    self.logger.debug(
74
                        'We did not receive enough data from server. Need %s got %s', header.body_size, len(buf)
75
                    )
76
                    self.receive_buffer = data
77
                    return
78
                msg = self._connection.receive_from_header_and_body(header, buf)
79
                self._process_received_message(msg)
80
                if not buf:
81
                    return
82
            except Exception:
83
                self.logger.exception('Exception raised while parsing message from server')
84
                self.disconnect_socket()
85
                return
86
87
    def _process_received_message(self, msg):
88
        if msg is None:
89
            pass
90
        elif isinstance(msg, ua.Message):
91
            self._call_callback(msg.request_id(), msg.body())
92
        elif isinstance(msg, ua.Acknowledge):
93
            self._call_callback(0, msg)
94
        elif isinstance(msg, ua.ErrorMessage):
95
            self.logger.fatal("Received an error: %r", msg)
96
            self._call_callback(0, ua.UaStatusCodeError(msg.Error.value))
97
        else:
98
            raise ua.UaError(f"Unsupported message type: {msg}")
99
100
    def _send_request(self, request, timeout=1, message_type=ua.MessageType.SecureMessage) -> asyncio.Future:
101
        """
102
        Send request to server, lower-level method.
103
        Timeout is the timeout written in ua header.
104
        :param request: Request
105
        :param timeout: Timeout in seconds
106
        :param message_type: UA Message Type (optional)
107
        :return: Future that resolves with the Response
108
        """
109
        request.RequestHeader = self._create_request_header(timeout)
110
        self.logger.debug('Sending: %s', request)
111
        try:
112
            binreq = struct_to_binary(request)
113
        except Exception:
114
            # reset request handle if any error
115
            # see self._create_request_header
116
            self._request_handle -= 1
117
            raise
118
        self._request_id += 1
119
        future = self.loop.create_future()
120
        self._callbackmap[self._request_id] = future
121
        msg = self._connection.message_to_binary(binreq, message_type=message_type, request_id=self._request_id)
122
        self.transport.write(msg)
123
        return future
124
125
    async def send_request(self, request, timeout=10, message_type=ua.MessageType.SecureMessage):
126
        """
127
        Send a request to the server.
128
        Timeout is the timeout written in ua header.
129
        Returns response object if no callback is provided.
130
        """
131
        data = await asyncio.wait_for(
132
            self._send_request(request, timeout, message_type),
133
            timeout if timeout else None
134
        )
135
        self.check_answer(data, f" in response to {request.__class__.__name__}")
136
        return data
137
138
    def check_answer(self, data, context):
139
        data = data.copy()
140
        typeid = nodeid_from_binary(data)
141
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
142
            self.logger.warning("ServiceFault from server received %s", context)
143
            hdr = struct_from_binary(ua.ResponseHeader, data)
144
            hdr.ServiceResult.check()
145
            return False
146
        return True
147
148
    def _call_callback(self, request_id, body):
149
        try:
150
            self._callbackmap[request_id].set_result(body)
151
        except KeyError:
152
            raise ua.UaError(
153
                f"No request found for request id: {request_id}, pending are {self._callbackmap.keys()}"
154
            )
155
        except asyncio.InvalidStateError:
156
            raise ua.UaError(f"Future for request id {request_id} is already done")
157
158
    def _create_request_header(self, timeout=1) -> ua.RequestHeader:
159
        """
160
        :param timeout: Timeout in seconds
161
        :return: Request header
162
        """
163
        hdr = ua.RequestHeader()
164
        hdr.AuthenticationToken = self.authentication_token
165
        self._request_handle += 1
166
        hdr.RequestHandle = self._request_handle
167
        hdr.TimeoutHint = timeout * 1000
168
        return hdr
169
170
    def disconnect_socket(self):
171
        self.logger.info("Request to close socket received")
172
        if self.transport:
173
            self.transport.close()
174
        else:
175
            self.logger.warning("disconnect_socket was called but transport is None")
176
177
    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
178
        hello = ua.Hello()
179
        hello.EndpointUrl = url
180
        hello.MaxMessageSize = max_messagesize
181
        hello.MaxChunkCount = max_chunkcount
182
        ack = asyncio.Future()
183
        self._callbackmap[0] = ack
184
        self.transport.write(uatcp_to_binary(ua.MessageType.Hello, hello))
185
        return await asyncio.wait_for(ack, self.timeout)
186
187
    async def open_secure_channel(self, params):
188
        self.logger.info("open_secure_channel")
189
        request = ua.OpenSecureChannelRequest()
190
        request.Parameters = params
191
        result = await asyncio.wait_for(
192
            self._send_request(request, message_type=ua.MessageType.SecureOpen),
193
            self.timeout
194
        )
195
        # FIXME: we have a race condition here
196
        # we can get a packet with the new token id before we reach to store it..
197
        response = struct_from_binary(ua.OpenSecureChannelResponse, result)
198
        response.ResponseHeader.ServiceResult.check()
199
        self._connection.set_channel(response.Parameters)
200
        return response.Parameters
201
202
    async def close_secure_channel(self):
203
        """
204
        Close secure channel.
205
        It seems to trigger a shutdown of socket in most servers, so be prepare to reconnect.
206
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response
207
        and should just close socket.
208
        """
209
        self.logger.info("close_secure_channel")
210
        request = ua.CloseSecureChannelRequest()
211
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
212
        # don't expect any more answers
213
        future.cancel()
214
        self._callbackmap.clear()
215
        # some servers send a response here, most do not ... so we ignore
216
217
218
class UaClient:
219
    """
220
    low level OPC-UA client.
221
222
    It implements (almost) all methods defined in asyncua spec
223
    taking in argument the structures defined in asyncua spec.
224
225
    In this Python implementation  most of the structures are defined in
226
    uaprotocol_auto.py and uaprotocol_hand.py available under asyncua.ua
227
    """
228
229
    def __init__(self, timeout=1, loop=None):
230
        """
231
        :param timeout: Timout in seconds
232
        :param loop: Event loop (optional)
233
        """
234
        self.logger = logging.getLogger(f'{__name__}.UaClient')
235
        self.loop = loop or asyncio.get_event_loop()
236
        self._subscription_callbacks = {}
237
        self._timeout = timeout
238
        self.security_policy = ua.SecurityPolicy()
239
        self.protocol: Optional[UASocketProtocol] = None
240
        self._publish_task = None
241
242
    def set_security(self, policy: ua.SecurityPolicy):
243
        self.security_policy = policy
244
245
    def _make_protocol(self):
246
        self.protocol = UASocketProtocol(self._timeout, security_policy=self.security_policy, loop=self.loop)
247
        return self.protocol
248
249
    async def connect_socket(self, host: str, port: int):
250
        """Connect to server socket."""
251
        self.logger.info("opening connection")
252
        await self.loop.create_connection(self._make_protocol, host, port)
253
254
    def disconnect_socket(self):
255
        if self.protocol and self.protocol.state == UASocketProtocol.CLOSED:
256
            self.logger.warning("disconnect_socket was called but connection is closed")
257
            return None
258
        return self.protocol.disconnect_socket()
259
260
    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
261
        await self.protocol.send_hello(url, max_messagesize, max_chunkcount)
262
263
    async def open_secure_channel(self, params):
264
        return await self.protocol.open_secure_channel(params)
265
266
    async def close_secure_channel(self):
267
        """
268
        close secure channel. It seems to trigger a shutdown of socket
269
        in most servers, so be prepare to reconnect
270
        """
271
        if self.protocol and self.protocol.state == UASocketProtocol.CLOSED:
272
            self.logger.warning("close_secure_channel was called but connection is closed")
273
            return
274
        return await self.protocol.close_secure_channel()
275
276
    async def create_session(self, parameters):
277
        self.logger.info("create_session")
278
        request = ua.CreateSessionRequest()
279
        request.Parameters = parameters
280
        data = await self.protocol.send_request(request)
281
        response = struct_from_binary(ua.CreateSessionResponse, data)
282
        self.logger.debug(response)
283
        response.ResponseHeader.ServiceResult.check()
284
        self.protocol.authentication_token = response.Parameters.AuthenticationToken
285
        return response.Parameters
286
287
    async def activate_session(self, parameters):
288
        self.logger.info("activate_session")
289
        request = ua.ActivateSessionRequest()
290
        request.Parameters = parameters
291
        data = await self.protocol.send_request(request)
292
        response = struct_from_binary(ua.ActivateSessionResponse, data)
293
        self.logger.debug(response)
294
        response.ResponseHeader.ServiceResult.check()
295
        return response.Parameters
296
297
    async def close_session(self, delete_subscriptions):
298
        self.logger.info("close_session")
299
        if self._publish_task and not self._publish_task.done():
300
            self._publish_task.cancel()
301
        if self.protocol and self.protocol.state == UASocketProtocol.CLOSED:
302
            self.logger.warning("close_session was called but connection is closed")
303
            return
304
        request = ua.CloseSessionRequest()
305
        request.DeleteSubscriptions = delete_subscriptions
306
        data = await self.protocol.send_request(request)
307
        response = struct_from_binary(ua.CloseSessionResponse, data)
308
        try:
309
            response.ResponseHeader.ServiceResult.check()
310
        except BadSessionClosed:
311
            # Problem: closing the session with open publish requests leads to BadSessionClosed responses
312
            #          we can just ignore it therefore.
313
            #          Alternatively we could make sure that there are no publish requests in flight when
314
            #          closing the session.
315
            pass
316
317
    async def browse(self, parameters):
318
        self.logger.info("browse")
319
        request = ua.BrowseRequest()
320
        request.Parameters = parameters
321
        data = await self.protocol.send_request(request)
322
        response = struct_from_binary(ua.BrowseResponse, data)
323
        self.logger.debug(response)
324
        response.ResponseHeader.ServiceResult.check()
325
        return response.Results
326
327
    async def browse_next(self, parameters):
328
        self.logger.debug("browse next")
329
        request = ua.BrowseNextRequest()
330
        request.Parameters = parameters
331
        data = await self.protocol.send_request(request)
332
        response = struct_from_binary(ua.BrowseNextResponse, data)
333
        self.logger.debug(response)
334
        response.ResponseHeader.ServiceResult.check()
335
        return response.Parameters.Results
336
337
    async def read(self, parameters):
338
        self.logger.debug("read")
339
        request = ua.ReadRequest()
340
        request.Parameters = parameters
341
        data = await self.protocol.send_request(request)
342
        response = struct_from_binary(ua.ReadResponse, data)
343
        self.logger.debug(response)
344
        response.ResponseHeader.ServiceResult.check()
345
        # cast to Enum attributes that need to
346
        for idx, rv in enumerate(parameters.NodesToRead):
347
            if rv.AttributeId == ua.AttributeIds.NodeClass:
348
                dv = response.Results[idx]
349
                if dv.StatusCode.is_good():
350
                    dv.Value.Value = ua.NodeClass(dv.Value.Value)
351
            elif rv.AttributeId == ua.AttributeIds.ValueRank:
352
                dv = response.Results[idx]
353
                if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
354
                    dv.Value.Value = ua.ValueRank(dv.Value.Value)
355
        return response.Results
356
357
    async def write(self, params):
358
        self.logger.debug("write")
359
        request = ua.WriteRequest()
360
        request.Parameters = params
361
        data = await self.protocol.send_request(request)
362
        response = struct_from_binary(ua.WriteResponse, data)
363
        self.logger.debug(response)
364
        response.ResponseHeader.ServiceResult.check()
365
        return response.Results
366
367
    async def get_endpoints(self, params):
368
        self.logger.debug("get_endpoint")
369
        request = ua.GetEndpointsRequest()
370
        request.Parameters = params
371
        data = await self.protocol.send_request(request)
372
        response = struct_from_binary(ua.GetEndpointsResponse, data)
373
        self.logger.debug(response)
374
        response.ResponseHeader.ServiceResult.check()
375
        return response.Endpoints
376
377
    async def find_servers(self, params):
378
        self.logger.debug("find_servers")
379
        request = ua.FindServersRequest()
380
        request.Parameters = params
381
        data = await self.protocol.send_request(request)
382
        response = struct_from_binary(ua.FindServersResponse, data)
383
        self.logger.debug(response)
384
        response.ResponseHeader.ServiceResult.check()
385
        return response.Servers
386
387
    async def find_servers_on_network(self, params):
388
        self.logger.debug("find_servers_on_network")
389
        request = ua.FindServersOnNetworkRequest()
390
        request.Parameters = params
391
        data = await self.protocol.send_request(request)
392
        response = struct_from_binary(ua.FindServersOnNetworkResponse, data)
393
        self.logger.debug(response)
394
        response.ResponseHeader.ServiceResult.check()
395
        return response.Parameters
396
397
    async def register_server(self, registered_server):
398
        self.logger.debug("register_server")
399
        request = ua.RegisterServerRequest()
400
        request.Server = registered_server
401
        data = await self.protocol.send_request(request)
402
        response = struct_from_binary(ua.RegisterServerResponse, data)
403
        self.logger.debug(response)
404
        response.ResponseHeader.ServiceResult.check()
405
        # nothing to return for this service
406
407
    async def register_server2(self, params):
408
        self.logger.debug("register_server2")
409
        request = ua.RegisterServer2Request()
410
        request.Parameters = params
411
        data = await self.protocol.send_request(request)
412
        response = struct_from_binary(ua.RegisterServer2Response, data)
413
        self.logger.debug(response)
414
        response.ResponseHeader.ServiceResult.check()
415
        return response.ConfigurationResults
416
417
    async def translate_browsepaths_to_nodeids(self, browse_paths):
418
        self.logger.debug("translate_browsepath_to_nodeid")
419
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
420
        request.Parameters.BrowsePaths = browse_paths
421
        data = await self.protocol.send_request(request)
422
        response = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsResponse, data)
423
        self.logger.debug(response)
424
        response.ResponseHeader.ServiceResult.check()
425
        return response.Results
426
427
    async def create_subscription(self, params, callback):
428
        self.logger.debug("create_subscription")
429
        request = ua.CreateSubscriptionRequest()
430
        request.Parameters = params
431
        data = await self.protocol.send_request(request)
432
        response = struct_from_binary(
433
            ua.CreateSubscriptionResponse,
434
            data
435
        )
436
        response.ResponseHeader.ServiceResult.check()
437
        self._subscription_callbacks[response.Parameters.SubscriptionId] = callback
438
        self.logger.info("create_subscription success SubscriptionId %s", response.Parameters.SubscriptionId)
439
        if not self._publish_task or self._publish_task.done():
440
            # Start the publish loop if it is not yet running
441
            # The current strategy is to have only one open publish request per UaClient. This might not be enough
442
            # in high latency networks or in case many subscriptions are created. A Set of Tasks of `_publish_loop`
443
            # could be used if necessary.
444
            self._publish_task = self.loop.create_task(self._publish_loop())
445
        return response.Parameters
446
447
    async def delete_subscriptions(self, subscription_ids):
448
        self.logger.debug("delete_subscriptions %r", subscription_ids)
449
        request = ua.DeleteSubscriptionsRequest()
450
        request.Parameters.SubscriptionIds = subscription_ids
451
        data = await self.protocol.send_request(request)
452
        response = struct_from_binary(
453
            ua.DeleteSubscriptionsResponse,
454
            data
455
        )
456
        response.ResponseHeader.ServiceResult.check()
457
        self.logger.info("remove subscription callbacks for %r", subscription_ids)
458
        for sid in subscription_ids:
459
            self._subscription_callbacks.pop(sid)
460
        return response.Results
461
462
    async def publish(self, acks: List[ua.SubscriptionAcknowledgement]) -> ua.PublishResponse:
463
        """
464
        Send a PublishRequest to the server.
465
        """
466
        self.logger.debug('publish %r', acks)
467
        request = ua.PublishRequest()
468
        request.Parameters.SubscriptionAcknowledgements = acks if acks else []
469
        data = await self.protocol.send_request(request, timeout=0)
470
        self.protocol.check_answer(data, "while waiting for publish response")
471
        try:
472
            response = struct_from_binary(ua.PublishResponse, data)
473
        except Exception:
474
            self.logger.exception("Error parsing notification from server")
475
            raise UaStructParsingError
476
        return response
477
478
    async def _publish_loop(self):
479
        """
480
        Start a loop that sends a publish requests and waits for the publish responses.
481
        Forward the `PublishResult` to the matching `Subscription` by callback.
482
        """
483
        ack = None
484
        while True:
485
            try:
486
                response = await self.publish([ack] if ack else [])
487
            except BadTimeout:  # See Spec. Part 4, 7.28
488
                # Repeat without acknowledgement
489
                ack = None
490
                continue
491
            except BadNoSubscription:  # See Spec. Part 5, 13.8.1
492
                # BadNoSubscription is expected to be received after deleting the last subscription.
493
                # We use this as a signal to exit this task and stop sending PublishRequests. This is easier then
494
                # checking if there are no more subscriptions registered in this client (). A Publish response
495
                # could still arrive before the DeleteSubscription response.
496
                #
497
                # We could remove the callback already when sending the DeleteSubscription request,
498
                # but there are some legitimate reasons to keep them around, such as when the server
499
                # responds with "BadTimeout" and we should try again later instead of just removing
500
                # the subscription client-side.
501
                #
502
                # There are a variety of ways to act correctly, but the most practical solution seems
503
                # to be to just silently ignore any BadNoSubscription responses.
504
                self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
505
                # End task
506
                return
507
            except UaStructParsingError:
508
                ack = None
509
                continue
510
            subscription_id = response.Parameters.SubscriptionId
511
            if not subscription_id:
512
                # The value 0 is used to indicate that there were no Subscriptions defined for which a
513
                # response could be sent. See Spec. Part 4 - Section 5.13.5 "Publish"
514
                # End task
515
                return
516
            try:
517
                callback = self._subscription_callbacks[subscription_id]
518
            except KeyError:
519
                self.logger.warning(
520
                    "Received data for unknown subscription %s active are %s", subscription_id,
521
                    self._subscription_callbacks.keys()
522
                )
523
            else:
524
                try:
525
                    callback(response.Parameters)
526
                except Exception:  # we call client code, catch everything!
527
                    self.logger.exception("Exception while calling user callback: %s")
528
            # Repeat with acknowledgement
529
            ack = ua.SubscriptionAcknowledgement()
530
            ack.SubscriptionId = subscription_id
531
            ack.SequenceNumber = response.Parameters.NotificationMessage.SequenceNumber
532
533
    async def create_monitored_items(self, params):
534
        self.logger.info("create_monitored_items")
535
        request = ua.CreateMonitoredItemsRequest()
536
        request.Parameters = params
537
        data = await self.protocol.send_request(request)
538
        response = struct_from_binary(ua.CreateMonitoredItemsResponse, data)
539
        self.logger.debug(response)
540
        response.ResponseHeader.ServiceResult.check()
541
        return response.Results
542
543
    async def delete_monitored_items(self, params):
544
        self.logger.info("delete_monitored_items")
545
        request = ua.DeleteMonitoredItemsRequest()
546
        request.Parameters = params
547
        data = await self.protocol.send_request(request)
548
        response = struct_from_binary(ua.DeleteMonitoredItemsResponse, data)
549
        self.logger.debug(response)
550
        response.ResponseHeader.ServiceResult.check()
551
        return response.Results
552
553
    async def add_nodes(self, nodestoadd):
554
        self.logger.info("add_nodes")
555
        request = ua.AddNodesRequest()
556
        request.Parameters.NodesToAdd = nodestoadd
557
        data = await self.protocol.send_request(request)
558
        response = struct_from_binary(ua.AddNodesResponse, data)
559
        self.logger.debug(response)
560
        response.ResponseHeader.ServiceResult.check()
561
        return response.Results
562
563
    async def add_references(self, refs):
564
        self.logger.info("add_references")
565
        request = ua.AddReferencesRequest()
566
        request.Parameters.ReferencesToAdd = refs
567
        data = await self.protocol.send_request(request)
568
        response = struct_from_binary(ua.AddReferencesResponse, data)
569
        self.logger.debug(response)
570
        response.ResponseHeader.ServiceResult.check()
571
        return response.Results
572
573
    async def delete_references(self, refs):
574
        self.logger.info("delete")
575
        request = ua.DeleteReferencesRequest()
576
        request.Parameters.ReferencesToDelete = refs
577
        data = await self.protocol.send_request(request)
578
        response = struct_from_binary(ua.DeleteReferencesResponse, data)
579
        self.logger.debug(response)
580
        response.ResponseHeader.ServiceResult.check()
581
        return response.Parameters.Results
582
583
    async def delete_nodes(self, params):
584
        self.logger.info("delete_nodes")
585
        request = ua.DeleteNodesRequest()
586
        request.Parameters = params
587
        data = await self.protocol.send_request(request)
588
        response = struct_from_binary(ua.DeleteNodesResponse, data)
589
        self.logger.debug(response)
590
        response.ResponseHeader.ServiceResult.check()
591
        return response.Results
592
593
    async def call(self, methodstocall):
594
        request = ua.CallRequest()
595
        request.Parameters.MethodsToCall = methodstocall
596
        data = await self.protocol.send_request(request)
597
        response = struct_from_binary(ua.CallResponse, data)
598
        self.logger.debug(response)
599
        response.ResponseHeader.ServiceResult.check()
600
        return response.Results
601
602
    async def history_read(self, params):
603
        self.logger.info("history_read")
604
        request = ua.HistoryReadRequest()
605
        request.Parameters = params
606
        data = await self.protocol.send_request(request)
607
        response = struct_from_binary(ua.HistoryReadResponse, data)
608
        self.logger.debug(response)
609
        response.ResponseHeader.ServiceResult.check()
610
        return response.Results
611
612
    async def modify_monitored_items(self, params):
613
        self.logger.info("modify_monitored_items")
614
        request = ua.ModifyMonitoredItemsRequest()
615
        request.Parameters = params
616
        data = await self.protocol.send_request(request)
617
        response = struct_from_binary(ua.ModifyMonitoredItemsResponse, data)
618
        self.logger.debug(response)
619
        response.ResponseHeader.ServiceResult.check()
620
        return response.Results
621
622
    async def register_nodes(self, nodes):
623
        self.logger.info("register_nodes")
624
        request = ua.RegisterNodesRequest()
625
        request.Parameters.NodesToRegister = nodes
626
        data = await self.protocol.send_request(request)
627
        response = struct_from_binary(ua.RegisterNodesResponse, data)
628
        self.logger.debug(response)
629
        response.ResponseHeader.ServiceResult.check()
630
        return response.Parameters.RegisteredNodeIds
631
632
    async def unregister_nodes(self, nodes):
633
        self.logger.info("unregister_nodes")
634
        request = ua.UnregisterNodesRequest()
635
        request.Parameters.NodesToUnregister = nodes
636
        data = await self.protocol.send_request(request)
637
        response = struct_from_binary(ua.UnregisterNodesResponse, data)
638
        self.logger.debug(response)
639
        response.ResponseHeader.ServiceResult.check()
640
        # nothing to return for this service
641
642
    async def get_attribute(self, nodes, attr):
643
        self.logger.info("get_attribute")
644
        request = ua.ReadRequest()
645
        for node in nodes:
646
            rv = ua.ReadValueId()
647
            rv.NodeId = node
648
            rv.AttributeId = attr
649
            request.Parameters.NodesToRead.append(rv)
650
        data = await self.protocol.send_request(request)
651
        response = struct_from_binary(ua.ReadResponse, data)
652
        response.ResponseHeader.ServiceResult.check()
653
        return response.Results
654