Completed
Pull Request — master (#184)
by Olivier
03:02
created

UaClient.read_attributes()   A

Complexity

Conditions 2

Size

Total Lines 12
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 12
nop 3
dl 0
loc 12
rs 9.8
c 0
b 0
f 0
1
"""
2
Low level binary client
3
"""
4
import asyncio
5
import logging
6
from typing import Dict, List
7
from asyncua import ua
8
from typing import Optional
9
from ..ua.ua_binary import struct_from_binary, uatcp_to_binary, struct_to_binary, nodeid_from_binary, header_from_binary
10
from ..ua.uaerrors import BadTimeout, BadNoSubscription, BadSessionClosed, UaStructParsingError
11
from ..common.connection import SecureConnection
12
13
14
class UASocketProtocol(asyncio.Protocol):
15
    """
16
    Handle socket connection and send ua messages.
17
    Timeout is the timeout used while waiting for an ua answer from server.
18
    """
19
    INITIALIZED = 'initialized'
20
    OPEN = 'open'
21
    CLOSED = 'closed'
22
23
    def __init__(self, timeout=1, security_policy=None, loop=None):
        """
        :param timeout: Timeout in seconds
        :param security_policy: Security policy (optional); when omitted a fresh
            ``ua.SecurityPolicy()`` is created for this instance
        :param loop: Event loop (optional)
        """
        # Build the default policy here instead of in the signature: a default
        # argument is evaluated once at definition time and would be shared by
        # every protocol instance.
        if security_policy is None:
            security_policy = ua.SecurityPolicy()
        self.logger = logging.getLogger(f"{__name__}.UASocketProtocol")
        self.loop = loop or asyncio.get_event_loop()
        self.transport = None
        # Bytes of an incomplete message, kept until the next data_received() call
        self.receive_buffer: Optional[bytes] = None
        self.is_receiving = False
        self.timeout = timeout
        self.authentication_token = ua.NodeId()
        self._request_id = 0
        self._request_handle = 0
        # Maps request id -> future resolved when the matching answer arrives
        self._callbackmap: Dict[int, asyncio.Future] = {}
        self._connection = SecureConnection(security_policy)
        self.state = self.INITIALIZED
        self.closed: bool = False
42
43
    def connection_made(self, transport: asyncio.Transport):
        """Keep a reference to ``transport`` and mark the protocol as open."""
        self.transport = transport
        self.state = self.OPEN
46
47
    def connection_lost(self, exc):
        """Log the closure, drop the transport reference and mark the protocol as closed."""
        self.logger.info("Socket has closed connection")
        self.transport = None
        self.state = self.CLOSED
51
52
    def data_received(self, data: bytes):
        """Prepend bytes left over from a previous call (if any), then parse everything."""
        leftover = self.receive_buffer
        if leftover:
            self.receive_buffer = None
            data = leftover + data
        self._process_received_data(data)
57
58
    def _process_received_data(self, data: bytes):
        """
        Parse every complete message contained in ``data``.

        Data may arrive in chunks but always in the correct order, see
        https://docs.python.org/3/library/asyncio-protocol.html#asyncio.Protocol.data_received
        When a header or a body is still incomplete, the unparsed bytes are stored in
        ``self.receive_buffer`` and parsing resumes on the next ``data_received`` call.
        Any other exception is logged and the socket is disconnected.
        """
        stream = ua.utils.Buffer(data)
        while True:
            try:
                try:
                    header = header_from_binary(stream)
                except ua.utils.NotEnoughData:
                    self.logger.debug('Not enough data while parsing header from server, waiting for more')
                    self.receive_buffer = data
                    return
                if len(stream) < header.body_size:
                    self.logger.debug(
                        'We did not receive enough data from server. Need %s got %s', header.body_size, len(stream)
                    )
                    self.receive_buffer = data
                    return
                message = self._connection.receive_from_header_and_body(header, stream)
                self._process_received_message(message)
                if stream:
                    # Bytes remain: they become the current unparsed data for the next pass
                    data = bytes(stream)
                    continue
                return
            except Exception:
                self.logger.exception('Exception raised while parsing message from server')
                self.disconnect_socket()
                return
89
90
    def _process_received_message(self, msg):
        """
        Hand a decoded message to the future that is waiting for it.

        ``None`` is ignored. Acknowledge and error messages are delivered under
        request id 0; regular messages under their own request id.

        :raises ua.UaError: ``msg`` is of an unsupported type
        """
        if msg is None:
            return
        if isinstance(msg, ua.Message):
            self._call_callback(msg.request_id(), msg.body())
            return
        if isinstance(msg, ua.Acknowledge):
            self._call_callback(0, msg)
            return
        if isinstance(msg, ua.ErrorMessage):
            self.logger.fatal("Received an error: %r", msg)
            self._call_callback(0, ua.UaStatusCodeError(msg.Error.value))
            return
        raise ua.UaError(f"Unsupported message type: {msg}")
102
103
    def _send_request(self, request, timeout=1, message_type=ua.MessageType.SecureMessage) -> asyncio.Future:
        """
        Low-level send: serialise ``request``, register a future for its answer and
        write the message to the transport.

        :param request: Request
        :param timeout: Timeout in seconds, written into the ua request header
        :param message_type: UA Message Type (optional)
        :return: Future that resolves with the Response
        """
        request.RequestHeader = self._create_request_header(timeout)
        self.logger.debug('Sending: %s', request)
        try:
            payload = struct_to_binary(request)
        except Exception:
            # _create_request_header() already incremented the request handle;
            # undo that since nothing will be sent.
            self._request_handle -= 1
            raise
        self._request_id += 1
        pending = self.loop.create_future()
        self._callbackmap[self._request_id] = pending

        connection = self._connection
        # Change to the new security token if the connection has been renewed.
        if connection.next_security_token.TokenId != 0:
            connection.revolve_tokens()

        packet = connection.message_to_binary(payload, message_type=message_type, request_id=self._request_id)
        self.transport.write(packet)
        return pending
132
133
    async def send_request(self, request, timeout=None, message_type=ua.MessageType.SecureMessage):
        """
        Send a request to the server and wait for its answer.

        ``timeout`` (default: ``self.timeout``) is written into the ua header and also
        bounds the wait; a falsy timeout waits without limit.

        :return: The response data, after it has been passed through ``check_answer``
        """
        if timeout is None:
            timeout = self.timeout
        pending = self._send_request(request, timeout, message_type)
        data = await asyncio.wait_for(pending, timeout or None)
        self.check_answer(data, f" in response to {request.__class__.__name__}")
        return data
146
147
    def check_answer(self, data, context):
        """
        Check whether a response is a ServiceFault.

        The inspection is done on a copy of ``data``.

        :param data: Response buffer
        :param context: Text appended to the warning that is logged for a fault
        :return: True when the response is not a ServiceFault; False when it is one
            and ``ServiceResult.check()`` did not raise
        """
        probe = data.copy()
        typeid = nodeid_from_binary(probe)
        is_fault = typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary)
        if not is_fault:
            return True
        self.logger.warning("ServiceFault from server received %s", context)
        fault_header = struct_from_binary(ua.ResponseHeader, probe)
        fault_header.ServiceResult.check()
        return False
156
157
    def _call_callback(self, request_id, body):
        """
        Resolve the future registered for ``request_id`` with ``body``.

        The entry is removed from the pending map as soon as it has been looked up,
        so it cannot linger there when the future turns out to be already done.

        :param request_id: Id under which the future was registered
        :param body: Value to set as the future's result
        :raises ua.UaError: no request is pending for ``request_id``, or its future is
            already done while the protocol is not flagged as closed
        """
        try:
            future = self._callbackmap.pop(request_id)
        except KeyError:
            raise ua.UaError(
                f"No request found for request id: {request_id}, pending are {self._callbackmap.keys()}"
            )
        try:
            future.set_result(body)
        except asyncio.InvalidStateError:
            if not self.closed:
                raise ua.UaError(f"Future for request id {request_id} is already done")
            self.logger.debug("Future for request id %s not handled due to disconnect", request_id)
169
170
    def _create_request_header(self, timeout=1) -> ua.RequestHeader:
        """
        Build the header for the next request and consume one request handle.

        :param timeout: Timeout in seconds (may be fractional)
        :return: Request header
        """
        hdr = ua.RequestHeader()
        hdr.AuthenticationToken = self.authentication_token
        self._request_handle += 1
        hdr.RequestHandle = self._request_handle
        # The hint is expressed in milliseconds; coerce to int so a fractional
        # timeout such as 0.5 does not end up as a float in the header.
        hdr.TimeoutHint = int(timeout * 1000)
        return hdr
181
182
    def disconnect_socket(self):
        """Close the transport if there is one; otherwise only log a warning."""
        self.logger.info("Request to close socket received")
        transport = self.transport
        if not transport:
            self.logger.warning("disconnect_socket was called but transport is None")
            return
        transport.close()
188
189
    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
        """
        Send a Hello message and wait for the server's answer.

        :param url: Endpoint url written into the Hello message
        :param max_messagesize: Value for ``Hello.MaxMessageSize``
        :param max_chunkcount: Value for ``Hello.MaxChunkCount``
        :return: The message delivered under request id 0
        """
        hello = ua.Hello()
        hello.EndpointUrl = url
        hello.MaxMessageSize = max_messagesize
        hello.MaxChunkCount = max_chunkcount
        # Create the future on the protocol's own loop, as _send_request() does,
        # rather than on whatever loop asyncio.Future() picks by default.
        ack = self.loop.create_future()
        # The answer to Hello is delivered under request id 0
        self._callbackmap[0] = ack
        self.transport.write(uatcp_to_binary(ua.MessageType.Hello, hello))
        return await asyncio.wait_for(ack, self.timeout)
198
199
    async def open_secure_channel(self, params):
        """
        Send an OpenSecureChannelRequest and configure the connection from the answer.

        :param params: Value assigned to ``OpenSecureChannelRequest.Parameters``
        :return: ``Parameters`` of the decoded OpenSecureChannelResponse
        """
        self.logger.info("open_secure_channel")
        req = ua.OpenSecureChannelRequest()
        req.Parameters = params
        pending = self._send_request(req, message_type=ua.MessageType.SecureOpen)
        raw = await asyncio.wait_for(pending, self.timeout)
        reply = struct_from_binary(ua.OpenSecureChannelResponse, raw)
        reply.ResponseHeader.ServiceResult.check()
        channel_params = reply.Parameters
        self._connection.set_channel(channel_params, params.RequestType, params.ClientNonce)
        return channel_params
211
212
    async def close_secure_channel(self):
        """
        Close the secure channel.

        Most servers appear to shut the socket down in response, so be prepared to
        reconnect. OPC UA specs Part 6, 7.1.4 say that the Server does not send a
        CloseSecureChannel response and should just close the socket.
        """
        self.logger.info("close_secure_channel")
        req = ua.CloseSecureChannelRequest()
        pending = self._send_request(req, message_type=ua.MessageType.SecureClose)
        # No further answers are expected: cancel this future and forget all others.
        # Some servers do send a response here, most do not, so it is ignored.
        pending.cancel()
        self._callbackmap.clear()
226
227
228
class UaClient:
229
    """
230
    low level OPC-UA client.
231
232
    It implements (almost) all methods defined in asyncua spec
233
    taking in argument the structures defined in asyncua spec.
234
235
    In this Python implementation  most of the structures are defined in
236
    uaprotocol_auto.py and uaprotocol_hand.py available under asyncua.ua
237
    """
238
239
    def __init__(self, timeout=1, loop=None):
        """
        :param timeout: Timeout in seconds, passed on to the protocol
        :param loop: Event loop (optional)
        """
        self.logger = logging.getLogger(f'{__name__}.UaClient')
        self.loop = loop or asyncio.get_event_loop()
        self._timeout = timeout
        self.security_policy = ua.SecurityPolicy()
        # Created by _make_protocol()
        self.protocol: Optional[UASocketProtocol] = None
        # SubscriptionId -> callback invoked from the publish loop
        self._subscription_callbacks = {}
        # Task running _publish_loop(), started by create_subscription()
        self._publish_task = None
251
252
    def set_security(self, policy: ua.SecurityPolicy):
        """Store ``policy``; it is handed to the protocol created by ``_make_protocol``."""
        self.security_policy = policy
254
255
    def _make_protocol(self):
        """Create a new UASocketProtocol, remember it as ``self.protocol`` and return it."""
        protocol = UASocketProtocol(self._timeout, security_policy=self.security_policy, loop=self.loop)
        self.protocol = protocol
        return protocol
258
259
    async def connect_socket(self, host: str, port: int):
        """
        Connect to server socket.

        :param host: Host passed to ``loop.create_connection``
        :param port: Port passed to ``loop.create_connection``
        """
        self.logger.info("opening connection")
        # _make_protocol is the factory, so self.protocol is set as a side effect
        await self.loop.create_connection(self._make_protocol, host=host, port=port)
263
264
    def disconnect_socket(self):
        """
        Ask the protocol to close its socket.

        Does nothing except log a warning when no protocol exists yet or when the
        protocol is already in the closed state.
        """
        protocol = self.protocol
        if protocol is None:
            # Nothing to close: connect_socket() has not created a protocol yet
            self.logger.warning("disconnect_socket was called but no connection was opened")
            return None
        if protocol.state == UASocketProtocol.CLOSED:
            self.logger.warning("disconnect_socket was called but connection is closed")
            return None
        return protocol.disconnect_socket()
269
270
    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
        """Forward a Hello to the protocol and wait for it to complete; returns nothing."""
        protocol = self.protocol
        await protocol.send_hello(url, max_messagesize, max_chunkcount)
272
273
    async def open_secure_channel(self, params):
        """Delegate to the protocol's ``open_secure_channel`` and return its result."""
        channel_params = await self.protocol.open_secure_channel(params)
        return channel_params
275
276
    async def close_secure_channel(self):
        """
        Close the secure channel. This seems to trigger a shutdown of the socket
        in most servers, so be prepared to reconnect.

        Only logs a warning when no protocol exists yet or when the protocol is
        already in the closed state.
        """
        protocol = self.protocol
        # Treat "never connected" like "closed" instead of failing on a None protocol
        if protocol is None or protocol.state == UASocketProtocol.CLOSED:
            self.logger.warning("close_secure_channel was called but connection is closed")
            return
        return await protocol.close_secure_channel()
285
286
    async def create_session(self, parameters):
        """
        Send a CreateSessionRequest and adopt the returned authentication token.

        :param parameters: Value assigned to ``CreateSessionRequest.Parameters``
        :return: ``Parameters`` of the decoded CreateSessionResponse
        """
        self.logger.info("create_session")
        protocol = self.protocol
        protocol.closed = False
        req = ua.CreateSessionRequest()
        req.Parameters = parameters
        raw = await protocol.send_request(req)
        reply = struct_from_binary(ua.CreateSessionResponse, raw)
        self.logger.debug(reply)
        reply.ResponseHeader.ServiceResult.check()
        # Later request headers carry this token (see the protocol's header builder)
        self.protocol.authentication_token = reply.Parameters.AuthenticationToken
        return reply.Parameters
297
298
    async def activate_session(self, parameters):
        """
        Send an ActivateSessionRequest.

        :param parameters: Value assigned to ``ActivateSessionRequest.Parameters``
        :return: ``Parameters`` of the decoded ActivateSessionResponse
        """
        self.logger.info("activate_session")
        req = ua.ActivateSessionRequest()
        req.Parameters = parameters
        raw = await self.protocol.send_request(req)
        reply = struct_from_binary(ua.ActivateSessionResponse, raw)
        self.logger.debug(reply)
        reply.ResponseHeader.ServiceResult.check()
        return reply.Parameters
307
308
    async def close_session(self, delete_subscriptions):
        """
        Cancel the publish task and send a CloseSessionRequest.

        Only logs a warning (after cancelling the publish task) when no protocol
        exists yet or when the protocol is already in the closed state.

        :param delete_subscriptions: Value for ``CloseSessionRequest.DeleteSubscriptions``
        """
        self.logger.info("close_session")
        protocol = self.protocol
        if protocol is not None:
            # Flag the protocol so answers to already-done futures are only logged
            protocol.closed = True
        if self._publish_task and not self._publish_task.done():
            self._publish_task.cancel()
        # Treat "never connected" like "closed" instead of failing on a None protocol
        if protocol is None or protocol.state == UASocketProtocol.CLOSED:
            self.logger.warning("close_session was called but connection is closed")
            return
        request = ua.CloseSessionRequest()
        request.DeleteSubscriptions = delete_subscriptions
        data = await protocol.send_request(request)
        response = struct_from_binary(ua.CloseSessionResponse, data)
        try:
            response.ResponseHeader.ServiceResult.check()
        except BadSessionClosed:
            # Closing the session while publish requests are still open leads to
            # BadSessionClosed responses, so it is simply ignored. The alternative
            # would be to make sure no publish request is in flight when closing.
            pass
328
329
    async def browse(self, parameters):
        """
        Send a BrowseRequest.

        :param parameters: Value assigned to ``BrowseRequest.Parameters``
        :return: ``Results`` of the decoded BrowseResponse
        """
        self.logger.info("browse")
        req = ua.BrowseRequest()
        req.Parameters = parameters
        raw = await self.protocol.send_request(req)
        reply = struct_from_binary(ua.BrowseResponse, raw)
        self.logger.debug(reply)
        reply.ResponseHeader.ServiceResult.check()
        return reply.Results
338
339
    async def browse_next(self, parameters):
        """
        Send a BrowseNextRequest.

        :param parameters: Value assigned to ``BrowseNextRequest.Parameters``
        :return: ``Parameters.Results`` of the decoded BrowseNextResponse
        """
        self.logger.debug("browse next")
        req = ua.BrowseNextRequest()
        req.Parameters = parameters
        raw = await self.protocol.send_request(req)
        reply = struct_from_binary(ua.BrowseNextResponse, raw)
        self.logger.debug(reply)
        reply.ResponseHeader.ServiceResult.check()
        return reply.Parameters.Results
348
349
    async def read(self, parameters):
        """
        Send a ReadRequest and convert selected attribute values to enums.

        For every good result whose requested attribute is NodeClass the value is
        replaced by ``ua.NodeClass``; for ValueRank it is replaced by ``ua.ValueRank``
        when the raw value lies in -3..4.

        :param parameters: Value assigned to ``ReadRequest.Parameters``
        :return: ``Results`` of the decoded ReadResponse
        """
        self.logger.debug("read")
        req = ua.ReadRequest()
        req.Parameters = parameters
        raw = await self.protocol.send_request(req)
        reply = struct_from_binary(ua.ReadResponse, raw)
        self.logger.debug(reply)
        reply.ResponseHeader.ServiceResult.check()
        # Results are matched to the requested nodes by position
        for position, node_to_read in enumerate(parameters.NodesToRead):
            if node_to_read.AttributeId == ua.AttributeIds.NodeClass:
                data_value = reply.Results[position]
                if data_value.StatusCode.is_good():
                    data_value.Value.Value = ua.NodeClass(data_value.Value.Value)
            elif node_to_read.AttributeId == ua.AttributeIds.ValueRank:
                data_value = reply.Results[position]
                if data_value.StatusCode.is_good() and data_value.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
                    data_value.Value.Value = ua.ValueRank(data_value.Value.Value)
        return reply.Results
368
369
    async def write(self, params):
        """
        Send a WriteRequest.

        :param params: Value assigned to ``WriteRequest.Parameters``
        :return: ``Results`` of the decoded WriteResponse
        """
        self.logger.debug("write")
        req = ua.WriteRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        reply = struct_from_binary(ua.WriteResponse, raw)
        self.logger.debug(reply)
        reply.ResponseHeader.ServiceResult.check()
        return reply.Results
378
379
    async def get_endpoints(self, params):
        """
        Send a GetEndpointsRequest.

        :param params: Value assigned to ``GetEndpointsRequest.Parameters``
        :return: ``Endpoints`` of the decoded GetEndpointsResponse
        """
        self.logger.debug("get_endpoint")
        req = ua.GetEndpointsRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        reply = struct_from_binary(ua.GetEndpointsResponse, raw)
        self.logger.debug(reply)
        reply.ResponseHeader.ServiceResult.check()
        return reply.Endpoints
388
389
    async def find_servers(self, params):
        """
        Send a FindServersRequest.

        :param params: Value assigned to ``FindServersRequest.Parameters``
        :return: ``Servers`` of the decoded FindServersResponse
        """
        self.logger.debug("find_servers")
        req = ua.FindServersRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        reply = struct_from_binary(ua.FindServersResponse, raw)
        self.logger.debug(reply)
        reply.ResponseHeader.ServiceResult.check()
        return reply.Servers
398
399
    async def find_servers_on_network(self, params):
        """
        Send a FindServersOnNetworkRequest.

        :param params: Value assigned to ``FindServersOnNetworkRequest.Parameters``
        :return: ``Parameters`` of the decoded FindServersOnNetworkResponse
        """
        self.logger.debug("find_servers_on_network")
        req = ua.FindServersOnNetworkRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        reply = struct_from_binary(ua.FindServersOnNetworkResponse, raw)
        self.logger.debug(reply)
        reply.ResponseHeader.ServiceResult.check()
        return reply.Parameters
408
409
    async def register_server(self, registered_server):
        """
        Send a RegisterServerRequest; this service returns nothing.

        :param registered_server: Value assigned to ``RegisterServerRequest.Server``
        """
        self.logger.debug("register_server")
        req = ua.RegisterServerRequest()
        req.Server = registered_server
        raw = await self.protocol.send_request(req)
        reply = struct_from_binary(ua.RegisterServerResponse, raw)
        self.logger.debug(reply)
        reply.ResponseHeader.ServiceResult.check()
418
419
    async def register_server2(self, params):
        """
        Send a RegisterServer2Request.

        :param params: Value assigned to ``RegisterServer2Request.Parameters``
        :return: ``ConfigurationResults`` of the decoded RegisterServer2Response
        """
        self.logger.debug("register_server2")
        req = ua.RegisterServer2Request()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        reply = struct_from_binary(ua.RegisterServer2Response, raw)
        self.logger.debug(reply)
        reply.ResponseHeader.ServiceResult.check()
        return reply.ConfigurationResults
428
429
    async def translate_browsepaths_to_nodeids(self, browse_paths):
        """
        Send a TranslateBrowsePathsToNodeIdsRequest.

        :param browse_paths: Value assigned to ``Parameters.BrowsePaths`` of the request
        :return: ``Results`` of the decoded response
        """
        self.logger.debug("translate_browsepath_to_nodeid")
        req = ua.TranslateBrowsePathsToNodeIdsRequest()
        req.Parameters.BrowsePaths = browse_paths
        raw = await self.protocol.send_request(req)
        reply = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsResponse, raw)
        self.logger.debug(reply)
        reply.ResponseHeader.ServiceResult.check()
        return reply.Results
438
439
    async def create_subscription(self, params, callback):
        """
        Send a CreateSubscriptionRequest, register ``callback`` for the new
        subscription and make sure the publish loop is running.

        :param params: Value assigned to ``CreateSubscriptionRequest.Parameters``
        :param callback: Callable stored under the new SubscriptionId
        :return: ``Parameters`` of the decoded CreateSubscriptionResponse
        """
        self.logger.debug("create_subscription")
        req = ua.CreateSubscriptionRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        reply = struct_from_binary(ua.CreateSubscriptionResponse, raw)
        reply.ResponseHeader.ServiceResult.check()
        self._subscription_callbacks[reply.Parameters.SubscriptionId] = callback
        self.logger.info("create_subscription success SubscriptionId %s", reply.Parameters.SubscriptionId)
        publish_running = self._publish_task and not self._publish_task.done()
        if not publish_running:
            # Start the publish loop if it is not yet running.
            # The current strategy is a single open publish request per UaClient. That
            # might not be enough on high latency networks or with many subscriptions;
            # a set of `_publish_loop` tasks could be used if necessary.
            self._publish_task = self.loop.create_task(self._publish_loop())
        return reply.Parameters
458
459
    async def delete_subscriptions(self, subscription_ids):
        """
        Send a DeleteSubscriptionsRequest and drop the matching callbacks.

        :param subscription_ids: Value assigned to ``Parameters.SubscriptionIds``
        :return: ``Results`` of the decoded DeleteSubscriptionsResponse
        """
        self.logger.debug("delete_subscriptions %r", subscription_ids)
        req = ua.DeleteSubscriptionsRequest()
        req.Parameters.SubscriptionIds = subscription_ids
        raw = await self.protocol.send_request(req)
        reply = struct_from_binary(ua.DeleteSubscriptionsResponse, raw)
        reply.ResponseHeader.ServiceResult.check()
        self.logger.info("remove subscription callbacks for %r", subscription_ids)
        callbacks = self._subscription_callbacks
        for subscription_id in subscription_ids:
            callbacks.pop(subscription_id)
        return reply.Results
473
474
    async def publish(self, acks: List[ua.SubscriptionAcknowledgement]) -> ua.PublishResponse:
        """
        Send a PublishRequest to the server and decode the answer.

        :param acks: Acknowledgements to send; a falsy value is sent as an empty list
        :return: The decoded PublishResponse
        :raises UaStructParsingError: the response could not be decoded
        """
        self.logger.debug('publish %r', acks)
        req = ua.PublishRequest()
        req.Parameters.SubscriptionAcknowledgements = acks or []
        # timeout=0 is passed so the wait for the answer is not bounded
        raw = await self.protocol.send_request(req, timeout=0)
        self.protocol.check_answer(raw, "while waiting for publish response")
        try:
            return struct_from_binary(ua.PublishResponse, raw)
        except Exception:
            self.logger.exception("Error parsing notification from server")
            raise UaStructParsingError
489
490
    async def _publish_loop(self):
        """
        Run a loop that sends publish requests and waits for the publish responses.
        Each response is forwarded to the callback registered for its subscription.

        The loop ends on BadNoSubscription or when a response carries SubscriptionId 0.
        """
        ack = None
        while True:
            try:
                response = await self.publish([ack] if ack else [])
            except BadTimeout:  # See Spec. Part 4, 7.28
                # Repeat without acknowledgement
                ack = None
                continue
            except BadNoSubscription:  # See Spec. Part 5, 13.8.1
                # BadNoSubscription is expected after the last subscription was deleted.
                # It is used as the signal to stop sending PublishRequests, which is
                # easier than checking whether any subscription is still registered in
                # this client: a Publish response could still arrive before the
                # DeleteSubscription response.
                #
                # The callback could be removed when the DeleteSubscription request is
                # sent, but there are legitimate reasons to keep it, e.g. when the server
                # answers "BadTimeout" and we should retry later instead of dropping the
                # subscription client-side.
                #
                # Silently ignoring BadNoSubscription is the most practical option.
                self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
                # End task
                return
            except UaStructParsingError:
                ack = None
                continue
            subscription_id = response.Parameters.SubscriptionId
            if not subscription_id:
                # The value 0 is used to indicate that there were no Subscriptions defined
                # for which a response could be sent. See Spec. Part 4 - Section 5.13.5 "Publish"
                # End task
                return
            try:
                callback = self._subscription_callbacks[subscription_id]
            except KeyError:
                self.logger.warning(
                    "Received data for unknown subscription %s active are %s", subscription_id,
                    self._subscription_callbacks.keys()
                )
            else:
                try:
                    callback(response.Parameters)
                except Exception:  # we call user code, catch everything!
                    # logger.exception() appends the traceback itself; the message
                    # must not contain a placeholder without a matching argument.
                    self.logger.exception("Exception while calling user callback")
            # Repeat with acknowledgement
            ack = ua.SubscriptionAcknowledgement()
            ack.SubscriptionId = subscription_id
            ack.SequenceNumber = response.Parameters.NotificationMessage.SequenceNumber
544
545
    async def create_monitored_items(self, params):
        """
        Send a CreateMonitoredItemsRequest.

        :param params: Value assigned to ``CreateMonitoredItemsRequest.Parameters``
        :return: ``Results`` of the decoded CreateMonitoredItemsResponse
        """
        self.logger.info("create_monitored_items")
        req = ua.CreateMonitoredItemsRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        reply = struct_from_binary(ua.CreateMonitoredItemsResponse, raw)
        self.logger.debug(reply)
        reply.ResponseHeader.ServiceResult.check()
        return reply.Results
554
555
    async def delete_monitored_items(self, params):
        """
        Send a DeleteMonitoredItemsRequest.

        :param params: Value assigned to ``DeleteMonitoredItemsRequest.Parameters``
        :return: ``Results`` of the decoded DeleteMonitoredItemsResponse
        """
        self.logger.info("delete_monitored_items")
        req = ua.DeleteMonitoredItemsRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        reply = struct_from_binary(ua.DeleteMonitoredItemsResponse, raw)
        self.logger.debug(reply)
        reply.ResponseHeader.ServiceResult.check()
        return reply.Results
564
565
    async def add_nodes(self, nodestoadd):
        """
        Send an AddNodesRequest.

        :param nodestoadd: Value assigned to ``Parameters.NodesToAdd`` of the request
        :return: ``Results`` of the decoded AddNodesResponse
        """
        self.logger.info("add_nodes")
        req = ua.AddNodesRequest()
        req.Parameters.NodesToAdd = nodestoadd
        raw = await self.protocol.send_request(req)
        reply = struct_from_binary(ua.AddNodesResponse, raw)
        self.logger.debug(reply)
        reply.ResponseHeader.ServiceResult.check()
        return reply.Results
574
575
    async def add_references(self, refs):
        """
        Send an AddReferencesRequest.

        :param refs: Value assigned to ``Parameters.ReferencesToAdd`` of the request
        :return: ``Results`` of the decoded AddReferencesResponse
        """
        self.logger.info("add_references")
        req = ua.AddReferencesRequest()
        req.Parameters.ReferencesToAdd = refs
        raw = await self.protocol.send_request(req)
        reply = struct_from_binary(ua.AddReferencesResponse, raw)
        self.logger.debug(reply)
        reply.ResponseHeader.ServiceResult.check()
        return reply.Results
584
585
    async def delete_references(self, refs):
        """
        Send a DeleteReferencesRequest.

        :param refs: Value assigned to ``Parameters.ReferencesToDelete`` of the request
        :return: ``Parameters.Results`` of the decoded DeleteReferencesResponse
        """
        # Log the method name, as the other service methods of this class do
        self.logger.info("delete_references")
        request = ua.DeleteReferencesRequest()
        request.Parameters.ReferencesToDelete = refs
        data = await self.protocol.send_request(request)
        response = struct_from_binary(ua.DeleteReferencesResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters.Results
594
595
    async def delete_nodes(self, params):
        """
        Send a DeleteNodesRequest.

        :param params: Value assigned to ``DeleteNodesRequest.Parameters``
        :return: ``Results`` of the decoded DeleteNodesResponse
        """
        self.logger.info("delete_nodes")
        req = ua.DeleteNodesRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        reply = struct_from_binary(ua.DeleteNodesResponse, raw)
        self.logger.debug(reply)
        reply.ResponseHeader.ServiceResult.check()
        return reply.Results
604
605
    async def call(self, methodstocall):
        """
        Send a CallRequest.

        :param methodstocall: Value assigned to ``Parameters.MethodsToCall`` of the request
        :return: ``Results`` of the decoded CallResponse
        """
        req = ua.CallRequest()
        req.Parameters.MethodsToCall = methodstocall
        raw = await self.protocol.send_request(req)
        reply = struct_from_binary(ua.CallResponse, raw)
        self.logger.debug(reply)
        reply.ResponseHeader.ServiceResult.check()
        return reply.Results
613
614
    async def history_read(self, params):
        """
        Send a HistoryReadRequest.

        :param params: Value assigned to ``HistoryReadRequest.Parameters``
        :return: ``Results`` of the decoded HistoryReadResponse
        """
        self.logger.info("history_read")
        req = ua.HistoryReadRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        reply = struct_from_binary(ua.HistoryReadResponse, raw)
        self.logger.debug(reply)
        reply.ResponseHeader.ServiceResult.check()
        return reply.Results
623
624
    async def modify_monitored_items(self, params):
        """
        Send a ModifyMonitoredItemsRequest.

        :param params: Value assigned to ``ModifyMonitoredItemsRequest.Parameters``
        :return: ``Results`` of the decoded ModifyMonitoredItemsResponse
        """
        self.logger.info("modify_monitored_items")
        req = ua.ModifyMonitoredItemsRequest()
        req.Parameters = params
        raw = await self.protocol.send_request(req)
        reply = struct_from_binary(ua.ModifyMonitoredItemsResponse, raw)
        self.logger.debug(reply)
        reply.ResponseHeader.ServiceResult.check()
        return reply.Results
633
634
    async def register_nodes(self, nodes):
        """
        Send a RegisterNodesRequest.

        :param nodes: Value assigned to ``Parameters.NodesToRegister`` of the request
        :return: ``Parameters.RegisteredNodeIds`` of the decoded RegisterNodesResponse
        """
        self.logger.info("register_nodes")
        req = ua.RegisterNodesRequest()
        req.Parameters.NodesToRegister = nodes
        raw = await self.protocol.send_request(req)
        reply = struct_from_binary(ua.RegisterNodesResponse, raw)
        self.logger.debug(reply)
        reply.ResponseHeader.ServiceResult.check()
        return reply.Parameters.RegisteredNodeIds
643
644
    async def unregister_nodes(self, nodes):
        """
        Send an UnregisterNodesRequest; this service returns nothing.

        :param nodes: Value assigned to ``Parameters.NodesToUnregister`` of the request
        """
        self.logger.info("unregister_nodes")
        req = ua.UnregisterNodesRequest()
        req.Parameters.NodesToUnregister = nodes
        raw = await self.protocol.send_request(req)
        reply = struct_from_binary(ua.UnregisterNodesResponse, raw)
        self.logger.debug(reply)
        reply.ResponseHeader.ServiceResult.check()
653
654
    async def read_attributes(self, nodeids, attr):
        """
        Read the same attribute of several nodes with a single ReadRequest.

        :param nodeids: Iterable of node ids to read from
        :param attr: Attribute id used for every node
        :return: ``Results`` of the decoded ReadResponse
        """
        self.logger.info("read_attributes of several nodes")
        req = ua.ReadRequest()
        nodes_to_read = req.Parameters.NodesToRead
        for node_id in nodeids:
            read_value = ua.ReadValueId()
            read_value.NodeId = node_id
            read_value.AttributeId = attr
            nodes_to_read.append(read_value)
        raw = await self.protocol.send_request(req)
        reply = struct_from_binary(ua.ReadResponse, raw)
        reply.ResponseHeader.ServiceResult.check()
        return reply.Results
666
667
    async def write_attributes(self, nodeids, datavalues, attributeid=ua.AttributeIds.Value):
        """
        Write one attribute of several nodes with a single WriteRequest.

        :param nodeids: Iterable of node ids to write to
        :param datavalues: Values indexed in parallel with ``nodeids``
            (ua.DataValue objects)
        :param attributeid: Attribute id used for every node
        :return: ``Results`` of the decoded WriteResponse
        """
        self.logger.info("write_attributes of several nodes")
        req = ua.WriteRequest()
        nodes_to_write = req.Parameters.NodesToWrite
        for position, node_id in enumerate(nodeids):
            write_value = ua.WriteValue()
            write_value.NodeId = node_id
            write_value.AttributeId = attributeid
            write_value.Value = datavalues[position]
            nodes_to_write.append(write_value)
        raw = await self.protocol.send_request(req)
        reply = struct_from_binary(ua.WriteResponse, raw)
        reply.ResponseHeader.ServiceResult.check()
        return reply.Results
684
685
686