Passed
Pull Request — master (#45)
by
unknown
02:35
created

UaClient.close_secure_channel()   A

Complexity

Conditions 3

Size

Total Lines 9
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 5
nop 1
dl 0
loc 9
rs 10
c 0
b 0
f 0
1
"""
2
Low level binary client
3
"""
4
import asyncio
5
import logging
6
from typing import Dict, List
7
from asyncua import ua
8
from typing import Optional
9
from ..ua.ua_binary import struct_from_binary, uatcp_to_binary, struct_to_binary, nodeid_from_binary, header_from_binary
10
from ..ua.uaerrors import BadTimeout, BadNoSubscription, BadSessionClosed
11
from ..common.connection import SecureConnection
12
13
14
class ResponseParseError(ValueError):
    """Raised when a response received from the server cannot be decoded."""
16
17
18
class UASocketProtocol(asyncio.Protocol):
19
    """
20
    Handle socket connection and send ua messages.
21
    Timeout is the timeout used while waiting for an ua answer from server.
22
    """
23
    INITIALIZED = 'initialized'
24
    OPEN = 'open'
25
    CLOSED = 'closed'
26
27
    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy(), loop=None):
        """
        Prepare a protocol instance that is not connected yet.

        :param timeout: timeout in seconds used by send_hello and open_secure_channel
            while waiting for the server
        :param security_policy: security policy passed on to the SecureConnection
        :param loop: event loop used to create request futures; defaults to
            asyncio.get_event_loop()
        """
        self.logger = logging.getLogger(f"{__name__}.UASocketProtocol")
        self.loop = loop if loop else asyncio.get_event_loop()
        self.timeout = timeout
        # Socket side: nothing is attached until connection_made() is called.
        self.transport = None
        self.state = self.INITIALIZED
        # Bytes of an incomplete message kept between two data_received() calls.
        self.receive_buffer: Optional[bytes] = None
        self.is_receiving = False
        # Request bookkeeping: counters for headers and the futures awaiting answers.
        self.authentication_token = ua.NodeId()
        self._request_id = 0
        self._request_handle = 0
        self._callbackmap: Dict[int, asyncio.Future] = {}
        self._connection = SecureConnection(security_policy)
40
41
    def connection_made(self, transport: asyncio.Transport):
        """Keep the transport handed over by asyncio and mark the protocol as open."""
        self.transport = transport
        self.state = self.OPEN
44
45
    def connection_lost(self, exc):
        """Forget the transport and mark the protocol as closed; `exc` is not inspected."""
        self.logger.info("Socket has closed connection")
        self.transport, self.state = None, self.CLOSED
49
50
    def data_received(self, data: bytes):
        """Prepend bytes left over from a previous call, then hand everything to the parser."""
        leftover = self.receive_buffer
        if leftover:
            self.receive_buffer = None
            data = leftover + data
        self._process_received_data(data)
55
56
    def _process_received_data(self, data: bytes):
        """
        Parse `data` as one or more messages and dispatch each complete one.

        Data may arrive chunked but is always in the correct order, see:
        https://docs.python.org/3/library/asyncio-protocol.html#asyncio.Protocol.data_received
        When the data ends in the middle of a message, the bytes of that incomplete message are
        stored in `self.receive_buffer`; data_received() prepends them to the next chunk.

        Any exception raised while parsing or dispatching is logged and the socket is closed.
        """
        buf = ua.utils.Buffer(data)
        while True:
            # Offset in `data` where the message we are about to parse begins. len(buf) is the
            # number of bytes not consumed yet (the body-size check below relies on the same
            # property). It is taken before the header is parsed because header parsing
            # consumes bytes from `buf`.
            msg_start = len(data) - len(buf)
            try:
                try:
                    header = header_from_binary(buf)
                except ua.utils.NotEnoughData:
                    self.logger.debug('Not enough data while parsing header from server, waiting for more')
                    # Keep only the unparsed tail: messages dispatched in earlier iterations
                    # must not be parsed a second time.
                    self.receive_buffer = data[msg_start:]
                    return
                if len(buf) < header.body_size:
                    self.logger.debug(
                        'We did not receive enough data from server. Need %s got %s', header.body_size, len(buf)
                    )
                    self.receive_buffer = data[msg_start:]
                    return
                msg = self._connection.receive_from_header_and_body(header, buf)
                self._process_received_message(msg)
                if not buf:
                    return
            except Exception:
                self.logger.exception('Exception raised while parsing message from server')
                self.disconnect_socket()
                return
85
86
    def _process_received_message(self, msg):
        """
        Route a decoded message to the future waiting for it.

        None is ignored. A ua.Message resolves the future of its request id; ua.Acknowledge and
        ua.ErrorMessage resolve the future registered under id 0. Anything else raises ua.UaError.
        """
        if msg is None:
            return
        if isinstance(msg, ua.Message):
            self._call_callback(msg.request_id(), msg.body())
            return
        if isinstance(msg, ua.Acknowledge):
            self._call_callback(0, msg)
            return
        if isinstance(msg, ua.ErrorMessage):
            self.logger.fatal("Received an error: %r", msg)
            self._call_callback(0, ua.UaStatusCodeError(msg.Error.value))
            return
        raise ua.UaError(f"Unsupported message type: {msg}")
98
99
    def _send_request(self, request, timeout=1000, message_type=ua.MessageType.SecureMessage) -> asyncio.Future:
        """
        Low-level send: serialize `request`, write it to the transport and return a future.

        `timeout` is the value written into the ua request header.
        The returned future is registered in the callback map under the new request id.
        """
        request.RequestHeader = self._create_request_header(timeout)
        self.logger.debug('Sending: %s', request)
        try:
            payload = struct_to_binary(request)
        except Exception:
            # _create_request_header() already incremented the handle; undo that
            # because this request is never sent.
            self._request_handle -= 1
            raise
        self._request_id += 1
        request_id = self._request_id
        pending = self.loop.create_future()
        self._callbackmap[request_id] = pending
        packet = self._connection.message_to_binary(payload, message_type=message_type, request_id=request_id)
        self.transport.write(packet)
        return pending
120
121
    async def send_request(self, request, timeout=10, message_type=ua.MessageType.SecureMessage):
        """
        Send `request` to the server and wait for the answer.

        `timeout` is written into the ua header and is also how long we wait for the
        answer; a falsy value means waiting without limit.
        Returns the response body after check_answer() has inspected it for a ServiceFault.
        """
        pending = self._send_request(request, timeout, message_type)
        wait_limit = timeout if timeout else None
        data = await asyncio.wait_for(pending, wait_limit)
        self.check_answer(data, f" in response to {request.__class__.__name__}")
        return data
133
134
    def check_answer(self, data, context):
        """
        Inspect a response body for a ServiceFault without consuming `data`.

        Works on a copy of `data`. For a ServiceFault the response header's ServiceResult is
        checked and False is returned if that check does not raise; otherwise returns True.
        `context` is only used in the log message.
        """
        payload = data.copy()
        typeid = nodeid_from_binary(payload)
        is_fault = typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary)
        if not is_fault:
            return True
        self.logger.warning("ServiceFault from server received %s", context)
        header = struct_from_binary(ua.ResponseHeader, payload)
        header.ServiceResult.check()
        return False
143
144
    def _call_callback(self, request_id, body):
        """
        Resolve the future registered for `request_id` with `body`.

        The future is removed from the pending map when it is looked up. Request ids are handed
        out once per request by _send_request(), so an answered entry is never needed again;
        leaving it in place makes the map grow by one entry per request.

        Raises ua.UaError when nothing is pending under `request_id` or when the future is
        already done (for example cancelled by a timeout).
        """
        try:
            future = self._callbackmap.pop(request_id)
        except KeyError:
            raise ua.UaError(
                f"No request found for request id: {request_id}, pending are {self._callbackmap.keys()}"
            )
        try:
            future.set_result(body)
        except asyncio.InvalidStateError:
            raise ua.UaError(f"Future for request id {request_id} is already done")
153
154
    def _create_request_header(self, timeout=1000):
        """
        Build a request header carrying the current authentication token.

        Increments the request handle counter as a side effect; `timeout` becomes TimeoutHint.
        """
        header = ua.RequestHeader()
        header.AuthenticationToken = self.authentication_token
        self._request_handle += 1
        header.RequestHandle = self._request_handle
        header.TimeoutHint = timeout
        return header
161
162
    def disconnect_socket(self):
        """Close the transport if there is one; otherwise only log a warning."""
        self.logger.info("Request to close socket received")
        transport = self.transport
        if not transport:
            self.logger.warning("disconnect_socket was called but transport is None")
            return
        transport.close()
168
169
    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
        """
        Send a Hello message and wait for the server's answer.

        :param url: endpoint url written into the Hello message
        :param max_messagesize: value for Hello.MaxMessageSize
        :param max_chunkcount: value for Hello.MaxChunkCount
        :return: the object delivered for request id 0 (the Acknowledge message on success)
        :raises asyncio.TimeoutError: if nothing arrives within `self.timeout` seconds
        """
        hello = ua.Hello()
        hello.EndpointUrl = url
        hello.MaxMessageSize = max_messagesize
        hello.MaxChunkCount = max_chunkcount
        # Create the future on the loop this protocol was configured with, exactly like
        # _send_request() does. A bare asyncio.Future() binds to the default event loop,
        # which is not necessarily `self.loop`.
        ack = self.loop.create_future()
        # Hello is sent outside a secure channel, so its answer is registered under id 0.
        self._callbackmap[0] = ack
        self.transport.write(uatcp_to_binary(ua.MessageType.Hello, hello))
        return await asyncio.wait_for(ack, self.timeout)
178
179
    async def open_secure_channel(self, params):
        """
        Send an OpenSecureChannelRequest and apply the returned channel to the connection.

        Waits at most `self.timeout` seconds for the answer and returns ``response.Parameters``.
        """
        self.logger.info("open_secure_channel")
        req = ua.OpenSecureChannelRequest()
        req.Parameters = params
        pending = self._send_request(req, message_type=ua.MessageType.SecureOpen)
        raw = await asyncio.wait_for(pending, self.timeout)
        # FIXME: we have a race condition here
        # we can get a packet with the new token id before we reach to store it..
        resp = struct_from_binary(ua.OpenSecureChannelResponse, raw)
        resp.ResponseHeader.ServiceResult.check()
        channel = resp.Parameters
        self._connection.set_channel(channel)
        return channel
193
194
    async def close_secure_channel(self):
        """
        Send a CloseSecureChannelRequest without waiting for an answer.

        Most servers seem to shut the socket down in reaction, so be prepared to reconnect.
        OPC UA specs Part 6, 7.1.4 say that the server does not send a CloseSecureChannel
        response and should just close the socket.
        """
        self.logger.info("close_secure_channel")
        req = ua.CloseSecureChannelRequest()
        pending = self._send_request(req, message_type=ua.MessageType.SecureClose)
        # No further answers are expected: drop this future and every other pending one.
        # Some servers do send a response here, most do not, so it is ignored.
        pending.cancel()
        self._callbackmap.clear()
208
209
210
class UaClient:
211
    """
212
    low level OPC-UA client.
213
214
    It implements (almost) all methods defined in asyncua spec
215
    taking in argument the structures defined in asyncua spec.
216
217
    In this Python implementation  most of the structures are defined in
218
    uaprotocol_auto.py and uaprotocol_hand.py available under asyncua.ua
219
    """
220
221
    def __init__(self, timeout=1, loop=None):
        """
        Create a client that is not connected yet.

        :param timeout: timeout passed to the UASocketProtocol created on connect
        :param loop: event loop to use; defaults to asyncio.get_event_loop()
        """
        self.logger = logging.getLogger(f'{__name__}.UaClient')
        self.loop = loop if loop else asyncio.get_event_loop()
        self._timeout = timeout
        self.security_policy = ua.SecurityPolicy()
        # Set by _make_protocol() when a connection is opened.
        self.protocol: Optional[UASocketProtocol] = None
        # Subscription id -> callback, plus the task running the publish loop.
        self._subscription_callbacks = {}
        self._publish_task = None
229
230
    def set_security(self, policy: ua.SecurityPolicy):
        """Store the security policy handed to protocols created after this call."""
        self.security_policy = policy
232
233
    def _make_protocol(self):
        """Protocol factory for create_connection: build, remember and return a new protocol."""
        protocol = UASocketProtocol(self._timeout, security_policy=self.security_policy, loop=self.loop)
        self.protocol = protocol
        return protocol
236
237
    async def connect_socket(self, host: str, port: int):
        """Open a connection to `host`:`port` using a freshly created protocol."""
        self.logger.info("opening connection")
        factory = self._make_protocol
        await self.loop.create_connection(factory, host, port)
241
242
    def disconnect_socket(self):
        """
        Close the socket of the current protocol.

        Only logs a warning when there is nothing to close: either no connection was ever
        opened (`self.protocol` is None) or the protocol is already in the CLOSED state.
        """
        protocol = self.protocol
        # `protocol` stays None until connect_socket() has run; calling a method on it
        # would raise AttributeError, so treat that case like a closed connection.
        if protocol is None or protocol.state == UASocketProtocol.CLOSED:
            self.logger.warning("disconnect_socket was called but connection is closed")
            return None
        return protocol.disconnect_socket()
247
248
    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
        """Forward the Hello handshake to the protocol; the server's answer is not returned."""
        protocol = self.protocol
        await protocol.send_hello(url, max_messagesize, max_chunkcount)
250
251
    async def open_secure_channel(self, params):
        """Open a secure channel through the protocol and return what the protocol returns."""
        channel = await self.protocol.open_secure_channel(params)
        return channel
253
254
    async def close_secure_channel(self):
        """
        Close the secure channel.

        This seems to trigger a shutdown of the socket in most servers, so be prepared to
        reconnect. Only logs a warning when no connection was ever opened or the connection
        is already closed.
        """
        protocol = self.protocol
        # `protocol` stays None until connect_socket() has run; guard against it instead of
        # failing with AttributeError.
        if protocol is None or protocol.state == UASocketProtocol.CLOSED:
            self.logger.warning("close_secure_channel was called but connection is closed")
            return
        return await protocol.close_secure_channel()
263
264
    async def create_session(self, parameters):
        """
        Send a CreateSessionRequest and store the session's authentication token.

        The token from the response is stored on the protocol, which writes it into the
        header of every later request. Returns ``response.Parameters``.
        """
        self.logger.info("create_session")
        req = ua.CreateSessionRequest()
        req.Parameters = parameters
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.CreateSessionResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        session = resp.Parameters
        self.protocol.authentication_token = session.AuthenticationToken
        return session
274
275
    async def activate_session(self, parameters):
        """Send an ActivateSessionRequest and return ``response.Parameters``."""
        self.logger.info("activate_session")
        req = ua.ActivateSessionRequest()
        req.Parameters = parameters
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.ActivateSessionResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters
284
285
    async def close_session(self, delete_subscriptions):
        """
        Stop the publish loop and send a CloseSessionRequest.

        :param delete_subscriptions: value for CloseSessionRequest.DeleteSubscriptions

        Nothing is sent (only a warning is logged) when no connection was ever opened or the
        connection is already closed. A BadSessionClosed service result is ignored.
        """
        self.logger.info("close_session")
        publish_task = self._publish_task
        if publish_task and not publish_task.done():
            publish_task.cancel()
        protocol = self.protocol
        # `protocol` stays None until connect_socket() has run; guard against it instead of
        # failing with AttributeError.
        if protocol is None or protocol.state == UASocketProtocol.CLOSED:
            self.logger.warning("close_session was called but connection is closed")
            return
        request = ua.CloseSessionRequest()
        request.DeleteSubscriptions = delete_subscriptions
        data = await protocol.send_request(request)
        response = struct_from_binary(ua.CloseSessionResponse, data)
        try:
            response.ResponseHeader.ServiceResult.check()
        except BadSessionClosed:
            # Problem: closing the session with open publish requests leads to BadSessionClosed
            #          responses, so we can just ignore it.
            #          Alternatively we could make sure that there are no publish requests in
            #          flight when closing the session.
            pass
304
305
    async def browse(self, parameters):
        """Send a BrowseRequest and return ``response.Results``."""
        self.logger.info("browse")
        req = ua.BrowseRequest()
        req.Parameters = parameters
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.BrowseResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
314
315
    async def browse_next(self, parameters):
        """Send a BrowseNextRequest and return ``response.Parameters.Results``."""
        self.logger.debug("browse next")
        req = ua.BrowseNextRequest()
        req.Parameters = parameters
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.BrowseNextResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters.Results
324
325
    async def read(self, parameters):
        """
        Send a ReadRequest and return ``response.Results``.

        Results for the NodeClass and ValueRank attributes are converted in place to the
        ua.NodeClass / ua.ValueRank enums when their status code is good.
        """
        self.logger.debug("read")
        req = ua.ReadRequest()
        req.Parameters = parameters
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.ReadResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        # Results are matched to the requested nodes by position.
        for position, read_value in enumerate(parameters.NodesToRead):
            if read_value.AttributeId == ua.AttributeIds.NodeClass:
                data_value = resp.Results[position]
                if data_value.StatusCode.is_good():
                    data_value.Value.Value = ua.NodeClass(data_value.Value.Value)
            elif read_value.AttributeId == ua.AttributeIds.ValueRank:
                data_value = resp.Results[position]
                # Only the listed values are converted; anything else is left as received.
                if data_value.StatusCode.is_good() and data_value.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
                    data_value.Value.Value = ua.ValueRank(data_value.Value.Value)
        return resp.Results
344
345
    async def write(self, params):
        """Send a WriteRequest and return ``response.Results``."""
        self.logger.debug("write")
        req = ua.WriteRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.WriteResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
354
355
    async def get_endpoints(self, params):
        """Send a GetEndpointsRequest and return ``response.Endpoints``."""
        self.logger.debug("get_endpoint")
        req = ua.GetEndpointsRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.GetEndpointsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Endpoints
364
365
    async def find_servers(self, params):
        """Send a FindServersRequest and return ``response.Servers``."""
        self.logger.debug("find_servers")
        req = ua.FindServersRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.FindServersResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Servers
374
375
    async def find_servers_on_network(self, params):
        """Send a FindServersOnNetworkRequest and return ``response.Parameters``."""
        self.logger.debug("find_servers_on_network")
        req = ua.FindServersOnNetworkRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.FindServersOnNetworkResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters
384
385
    async def register_server(self, registered_server):
        """Send a RegisterServerRequest for `registered_server`; the service returns nothing."""
        self.logger.debug("register_server")
        req = ua.RegisterServerRequest()
        req.Server = registered_server
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.RegisterServerResponse, raw)
        self.logger.debug(resp)
        # Only the service result matters here; there is no payload to hand back.
        resp.ResponseHeader.ServiceResult.check()
394
395
    async def register_server2(self, params):
        """Send a RegisterServer2Request and return ``response.ConfigurationResults``."""
        self.logger.debug("register_server2")
        req = ua.RegisterServer2Request()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.RegisterServer2Response, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.ConfigurationResults
404
405
    async def translate_browsepaths_to_nodeids(self, browse_paths):
        """Send a TranslateBrowsePathsToNodeIdsRequest for `browse_paths` and return ``response.Results``."""
        self.logger.debug("translate_browsepath_to_nodeid")
        req = ua.TranslateBrowsePathsToNodeIdsRequest()
        req.Parameters.BrowsePaths = browse_paths
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
414
415
    async def create_subscription(self, params, callback):
        """
        Send a CreateSubscriptionRequest and register `callback` for the new subscription.

        Starts the publish loop task if it is not running. Returns ``response.Parameters``.
        """
        self.logger.debug("create_subscription")
        req = ua.CreateSubscriptionRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.CreateSubscriptionResponse, raw)
        resp.ResponseHeader.ServiceResult.check()
        self._subscription_callbacks[resp.Parameters.SubscriptionId] = callback
        self.logger.info("create_subscription success SubscriptionId %s", resp.Parameters.SubscriptionId)
        publish_running = self._publish_task and not self._publish_task.done()
        if not publish_running:
            # Start the publish cycle if it is not yet running
            self._publish_task = self.loop.create_task(self._publish_cycle())
        return resp.Parameters
431
432
    async def delete_subscriptions(self, subscription_ids):
        """
        Send a DeleteSubscriptionsRequest and drop the callbacks of the given subscriptions.

        Returns ``response.Results``. Raises KeyError if an id has no registered callback.
        """
        self.logger.debug("delete_subscriptions %r", subscription_ids)
        req = ua.DeleteSubscriptionsRequest()
        req.Parameters.SubscriptionIds = subscription_ids
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.DeleteSubscriptionsResponse, raw)
        resp.ResponseHeader.ServiceResult.check()
        self.logger.info("remove subscription callbacks for %r", subscription_ids)
        for subscription_id in subscription_ids:
            self._subscription_callbacks.pop(subscription_id)
        return resp.Results
446
447
    async def publish(self, acks: List[ua.SubscriptionAcknowledgement]) -> ua.PublishResponse:
        """
        Send a PublishRequest carrying `acks` and return the decoded PublishResponse.

        The request is sent with timeout 0, i.e. the answer is awaited without time limit.
        Raises ResponseParseError when the response cannot be decoded.
        """
        self.logger.debug('publish %r', acks)
        req = ua.PublishRequest()
        req.Parameters.SubscriptionAcknowledgements = acks if acks else []
        raw = await self.protocol.send_request(req, timeout=0)
        self.protocol.check_answer(raw, "while waiting for publish response")
        try:
            return struct_from_binary(ua.PublishResponse, raw)
        except Exception:
            self.logger.exception("Error parsing notification from server")
            raise ResponseParseError
462
463
    async def _publish_cycle(self):
        """
        Send publish requests and wait for the publish responses in an endless loop.

        Each `PublishResult` is forwarded to the callback registered for its subscription id,
        and the received sequence number is acknowledged with the next request.
        The loop ends on BadNoSubscription or when a response carries subscription id 0.
        Exceptions raised by a user callback are logged and do not stop the loop.
        """
        ack = None
        while True:
            try:
                response = await self.publish([ack] if ack else [])
            except BadTimeout:  # See Spec. Part 4, 7.28
                # Repeat without acknowledgement
                ack = None
                continue
            except BadNoSubscription:  # See Spec. Part 5, 13.8.1
                # BadNoSubscription is expected to be received after deleting the last subscription.
                # We use this as a signal to exit this task and stop sending PublishRequests. This is
                # easier than checking whether any subscriptions are still registered in this client:
                # a Publish response could still arrive before the DeleteSubscription response.
                #
                # We could remove the callback already when sending the DeleteSubscription request,
                # but there are some legitimate reasons to keep them around, such as when the server
                # responds with "BadTimeout" and we should try again later instead of just removing
                # the subscription client-side.
                #
                # There are a variety of ways to act correctly, but the most practical solution seems
                # to be to just silently ignore any BadNoSubscription responses.
                self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
                # End task
                return
            except ResponseParseError:
                ack = None
                continue
            subscription_id = response.Parameters.SubscriptionId
            if not subscription_id:
                # The value 0 is used to indicate that there were no Subscriptions defined for which a
                # response could be sent. See Spec. Part 4 - Section 5.13.5 "Publish"
                # End task
                return
            try:
                callback = self._subscription_callbacks[subscription_id]
            except KeyError:
                self.logger.warning(
                    "Received data for unknown subscription %s active are %s", subscription_id,
                    self._subscription_callbacks.keys()
                )
            else:
                try:
                    callback(response.Parameters)
                except Exception:  # we call client code, catch everything!
                    # The message has a %s placeholder; without an argument logging emits a
                    # literal "%s", so name the callback that failed.
                    self.logger.exception("Exception while calling user callback: %s", callback)
            # Repeat with acknowledgement
            ack = ua.SubscriptionAcknowledgement()
            ack.SubscriptionId = subscription_id
            ack.SequenceNumber = response.Parameters.NotificationMessage.SequenceNumber
517
518
    async def create_monitored_items(self, params):
        """Send a CreateMonitoredItemsRequest and return ``response.Results``."""
        self.logger.info("create_monitored_items")
        req = ua.CreateMonitoredItemsRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.CreateMonitoredItemsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
527
528
    async def delete_monitored_items(self, params):
        """Send a DeleteMonitoredItemsRequest and return ``response.Results``."""
        self.logger.info("delete_monitored_items")
        req = ua.DeleteMonitoredItemsRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.DeleteMonitoredItemsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
537
538
    async def add_nodes(self, nodestoadd):
        """Send an AddNodesRequest for `nodestoadd` and return ``response.Results``."""
        self.logger.info("add_nodes")
        req = ua.AddNodesRequest()
        req.Parameters.NodesToAdd = nodestoadd
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.AddNodesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
547
548
    async def add_references(self, refs):
        """Send an AddReferencesRequest for `refs` and return ``response.Results``."""
        self.logger.info("add_references")
        req = ua.AddReferencesRequest()
        req.Parameters.ReferencesToAdd = refs
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.AddReferencesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
557
558
    async def delete_references(self, refs):
        """Send a DeleteReferencesRequest for `refs` and return ``response.Parameters.Results``."""
        self.logger.info("delete")
        req = ua.DeleteReferencesRequest()
        req.Parameters.ReferencesToDelete = refs
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.DeleteReferencesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters.Results
567
568
    async def delete_nodes(self, params):
        """Send a DeleteNodesRequest and return ``response.Results``."""
        self.logger.info("delete_nodes")
        req = ua.DeleteNodesRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.DeleteNodesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
577
578
    async def call(self, methodstocall):
        """Send a CallRequest for `methodstocall` and return ``response.Results``."""
        req = ua.CallRequest()
        req.Parameters.MethodsToCall = methodstocall
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.CallResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
586
587
    async def history_read(self, params):
        """Send a HistoryReadRequest and return ``response.Results``."""
        self.logger.info("history_read")
        req = ua.HistoryReadRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.HistoryReadResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
596
597
    async def modify_monitored_items(self, params):
        """Send a ModifyMonitoredItemsRequest and return ``response.Results``."""
        self.logger.info("modify_monitored_items")
        req = ua.ModifyMonitoredItemsRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.ModifyMonitoredItemsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
606
607
    async def register_nodes(self, nodes):
        """Send a RegisterNodesRequest for `nodes` and return ``response.Parameters.RegisteredNodeIds``."""
        self.logger.info("register_nodes")
        req = ua.RegisterNodesRequest()
        req.Parameters.NodesToRegister = nodes
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.RegisterNodesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters.RegisteredNodeIds
616
617
    async def unregister_nodes(self, nodes):
        """Send an UnregisterNodesRequest for `nodes`; the service returns nothing."""
        self.logger.info("unregister_nodes")
        req = ua.UnregisterNodesRequest()
        req.Parameters.NodesToUnregister = nodes
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.UnregisterNodesResponse, raw)
        self.logger.debug(resp)
        # Only the service result matters here; there is no payload to hand back.
        resp.ResponseHeader.ServiceResult.check()
626
627
    async def get_attribute(self, nodes, attr):
        """
        Read attribute `attr` of every node id in `nodes` with a single ReadRequest.

        Returns ``response.Results``.
        """
        self.logger.info("get_attribute")
        req = ua.ReadRequest()
        for node_id in nodes:
            read_value = ua.ReadValueId()
            read_value.NodeId = node_id
            read_value.AttributeId = attr
            req.Parameters.NodesToRead.append(read_value)
        raw = await self.protocol.send_request(req)
        resp = struct_from_binary(ua.ReadResponse, raw)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
639