Completed
Pull Request — master (#321)
by
unknown
02:53
created

UaClient.create_subscription()   A

Complexity

Conditions 3

Size

Total Lines 19
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 14
nop 3
dl 0
loc 19
rs 9.7
c 0
b 0
f 0
1
"""
2
Low level binary client
3
"""
4
import asyncio
5
import logging
6
from typing import Dict, List
7
from asyncua import ua
8
from typing import Optional
9
from ..ua.ua_binary import struct_from_binary, uatcp_to_binary, struct_to_binary, nodeid_from_binary, header_from_binary
10
from ..ua.uaerrors import BadTimeout, BadNoSubscription, BadSessionClosed, UaStructParsingError
11
from ..common.connection import SecureConnection
12
13
14
class UASocketProtocol(asyncio.Protocol):
15
    """
16
    Handle socket connection and send ua messages.
17
    Timeout is the timeout used while waiting for an ua answer from server.
18
    """
19
    INITIALIZED = 'initialized'
20
    OPEN = 'open'
21
    CLOSED = 'closed'
22
23
    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy(), loop=None):
24
        """
25
        :param timeout: Timeout in seconds
26
        :param security_policy: Security policy (optional)
27
        :param loop: Event loop (optional)
28
        """
29
        self.logger = logging.getLogger(f"{__name__}.UASocketProtocol")
30
        self.loop = loop or asyncio.get_event_loop()
31
        self.transport = None
32
        self.receive_buffer = bytes()
33
        self.is_receiving = False
34
        self.timeout = timeout
35
        self.authentication_token = ua.NodeId()
36
        self._request_id = 0
37
        self._request_handle = 0
38
        self._callbackmap: Dict[int, asyncio.Future] = {}
39
        self._connection = SecureConnection(security_policy)
40
        self.state = self.INITIALIZED
41
        self.closed: bool = False
42
        self.renewal_lock: asyncio.Lock = asyncio.Lock()
43
44
    def connection_made(self, transport: asyncio.Transport):
45
        self.state = self.OPEN
46
        self.transport = transport
47
48
    def connection_lost(self, exc):
49
        self.logger.info("Socket has closed connection")
50
        self.state = self.CLOSED
51
        self.transport = None
52
53
    def data_received(self, data: bytes):
54
        # buffer everything
55
        self.receive_buffer += data
56
        self.loop.create_task(self._async_data_received())
57
58
    async def _async_data_received(self):
59
        """
60
        Try to parse received data as asyncua message. Data may be chunked but will be in correct order.
61
        See: https://docs.python.org/3/library/asyncio-protocol.html#asyncio.Protocol.data_received
62
        Reassembly is done by filling up a buffer until it verifies as a valid message (or a MessageChunk).
63
        """
64
        try:
65
            buffer_header = ua.utils.Buffer(self.receive_buffer)
66
            while buffer_header:
67
                try:
68
                    header = header_from_binary(buffer_header)
69
                except ua.utils.NotEnoughData:
70
                    self.logger.debug('Not enough data while parsing header from server, waiting for more')
71
                    return
72
                if len(buffer_header) < header.body_size:
73
                    self.logger.debug(
74
                        'We did not receive enough data from server. Need %s got %s', header.body_size, len(buffer_header)
75
                    )
76
                    return
77
78
                # move whole telegram into other buffer
79
                # otherwise the same header will be read by immediately following telegrams
80
                header_length = buffer_header._cur_pos
81
                telegram_length = header_length + header.body_size
82
                telegram_body = self.receive_buffer[header_length:telegram_length]
83
                self.receive_buffer = self.receive_buffer[telegram_length:]
84
                buffer_header = ua.utils.Buffer(self.receive_buffer) # old buffer_header is invalid now because underlying data changed
85
                buffer_telegram = ua.utils.Buffer(telegram_body)
86
87
                # this way, every incoming packet has the chance to block the others, but if it doesn't need to, the other packets won't notice.
88
                await self.renewal_lock.acquire() # wait here to be let in by the bouncer.
89
                if header.MessageType != ua.MessageType.SecureOpen:
90
                    self.renewal_lock.release() # don't need to keep others waiting, can unlock already.
91
                msg = self._connection.receive_from_header_and_body(header, buffer_telegram)
92
                self._process_received_message(msg)
93
        except:
94
            self.logger.exception('Exception raised while parsing message from server')
95
            self.disconnect_socket()
96
            if self.renewal_lock.locked():
97
                self.renewal_lock.release()
98
99
    def _process_received_message(self, msg):
100
        if msg is None:
101
            pass
102
        elif isinstance(msg, ua.Message):
103
            self._call_callback(msg.request_id(), msg.body())
104
        elif isinstance(msg, ua.Acknowledge):
105
            self._call_callback(0, msg)
106
        elif isinstance(msg, ua.ErrorMessage):
107
            self.logger.fatal("Received an error: %r", msg)
108
            self._call_callback(0, ua.UaStatusCodeError(msg.Error.value))
109
        else:
110
            raise ua.UaError(f"Unsupported message type: {msg}")
111
112
    def _send_request(self, request, timeout=1, message_type=ua.MessageType.SecureMessage) -> asyncio.Future:
113
        """
114
        Send request to server, lower-level method.
115
        Timeout is the timeout written in ua header.
116
        :param request: Request
117
        :param timeout: Timeout in seconds
118
        :param message_type: UA Message Type (optional)
119
        :return: Future that resolves with the Response
120
        """
121
        request.RequestHeader = self._create_request_header(timeout)
122
        self.logger.debug('Sending: %s', request)
123
        try:
124
            binreq = struct_to_binary(request)
125
        except Exception:
126
            # reset request handle if any error
127
            # see self._create_request_header
128
            self._request_handle -= 1
129
            raise
130
        self._request_id += 1
131
        future = self.loop.create_future()
132
        self._callbackmap[self._request_id] = future
133
134
        # Change to the new security token if the connection has been renewed.
135
        if self._connection.next_security_token.TokenId != 0:
136
            self._connection.revolve_tokens()
137
138
        msg = self._connection.message_to_binary(binreq, message_type=message_type, request_id=self._request_id)
139
        self.transport.write(msg)
140
        return future
141
142
    async def send_request(self, request, timeout=None, message_type=ua.MessageType.SecureMessage):
143
        """
144
        Send a request to the server.
145
        Timeout is the timeout written in ua header.
146
        Returns response object if no callback is provided.
147
        """
148
        timeout = self.timeout if timeout is None else timeout
149
        try:
150
            data = await asyncio.wait_for(
151
                self._send_request(request, timeout, message_type),
152
                timeout if timeout else None
153
            )
154
        except Exception:
155
            if self.state != self.OPEN:
156
                raise ConnectionError("Connection is closed") from None
157
158
            raise
159
160
        self.check_answer(data, f" in response to {request.__class__.__name__}")
161
        return data
162
163
    def check_answer(self, data, context):
164
        data = data.copy()
165
        typeid = nodeid_from_binary(data)
166
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
167
            self.logger.warning("ServiceFault from server received %s", context)
168
            hdr = struct_from_binary(ua.ResponseHeader, data)
169
            hdr.ServiceResult.check()
170
            return False
171
        return True
172
173
    def _call_callback(self, request_id, body):
174
        try:
175
            self._callbackmap[request_id].set_result(body)
176
        except KeyError:
177
            raise ua.UaError(
178
                f"No request found for request id: {request_id}, pending are {self._callbackmap.keys()}"
179
            )
180
        except asyncio.InvalidStateError:
181
            if not self.closed:
182
                raise ua.UaError(f"Future for request id {request_id} is already done")
183
            self.logger.debug("Future for request id %s not handled due to disconnect", request_id)
184
        del self._callbackmap[request_id]
185
186
    def _create_request_header(self, timeout=1) -> ua.RequestHeader:
187
        """
188
        :param timeout: Timeout in seconds
189
        :return: Request header
190
        """
191
        hdr = ua.RequestHeader()
192
        hdr.AuthenticationToken = self.authentication_token
193
        self._request_handle += 1
194
        hdr.RequestHandle = self._request_handle
195
        hdr.TimeoutHint = timeout * 1000
196
        return hdr
197
198
    def disconnect_socket(self):
199
        self.logger.info("Request to close socket received")
200
        if self.transport:
201
            self.transport.close()
202
        else:
203
            self.logger.warning("disconnect_socket was called but transport is None")
204
205
    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
206
        hello = ua.Hello()
207
        hello.EndpointUrl = url
208
        hello.MaxMessageSize = max_messagesize
209
        hello.MaxChunkCount = max_chunkcount
210
        ack = asyncio.Future()
211
        self._callbackmap[0] = ack
212
        self.transport.write(uatcp_to_binary(ua.MessageType.Hello, hello))
213
        return await asyncio.wait_for(ack, self.timeout)
214
215
    async def open_secure_channel(self, params):
216
        self.logger.info("open_secure_channel")
217
        request = ua.OpenSecureChannelRequest()
218
        request.Parameters = params
219
        result = await asyncio.wait_for(
220
            self._send_request(request, message_type=ua.MessageType.SecureOpen),
221
            self.timeout
222
        )
223
        response = struct_from_binary(ua.OpenSecureChannelResponse, result)
224
        response.ResponseHeader.ServiceResult.check()
225
        self._connection.set_channel(response.Parameters, params.RequestType, params.ClientNonce)
226
        if self.renewal_lock.locked():
227
            self.renewal_lock.release()
228
        return response.Parameters
229
230
    async def close_secure_channel(self):
231
        """
232
        Close secure channel.
233
        It seems to trigger a shutdown of socket in most servers, so be prepare to reconnect.
234
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response
235
        and should just close socket.
236
        """
237
        self.logger.info("close_secure_channel")
238
        request = ua.CloseSecureChannelRequest()
239
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
240
        # don't expect any more answers
241
        future.cancel()
242
        self._callbackmap.clear()
243
        # some servers send a response here, most do not ... so we ignore
244
245
246
class UaClient:
247
    """
248
    low level OPC-UA client.
249
250
    It implements (almost) all methods defined in asyncua spec
251
    taking in argument the structures defined in asyncua spec.
252
253
    In this Python implementation  most of the structures are defined in
254
    uaprotocol_auto.py and uaprotocol_hand.py available under asyncua.ua
255
    """
256
257
    def __init__(self, timeout=1, loop=None):
258
        """
259
        :param timeout: Timout in seconds
260
        :param loop: Event loop (optional)
261
        """
262
        self.logger = logging.getLogger(f'{__name__}.UaClient')
263
        self.loop = loop or asyncio.get_event_loop()
264
        self._subscription_callbacks = {}
265
        self._timeout = timeout
266
        self.security_policy = ua.SecurityPolicy()
267
        self.protocol: Optional[UASocketProtocol] = None
268
        self._publish_task = None
269
270
    def set_security(self, policy: ua.SecurityPolicy):
271
        self.security_policy = policy
272
273
    def _make_protocol(self):
274
        self.protocol = UASocketProtocol(self._timeout, security_policy=self.security_policy, loop=self.loop)
275
        return self.protocol
276
277
    async def connect_socket(self, host: str, port: int):
278
        """Connect to server socket."""
279
        self.logger.info("opening connection")
280
        await self.loop.create_connection(self._make_protocol, host, port)
281
282
    def disconnect_socket(self):
283
        if self.protocol and self.protocol.state == UASocketProtocol.CLOSED:
284
            self.logger.warning("disconnect_socket was called but connection is closed")
285
            return None
286
        return self.protocol.disconnect_socket()
287
288
    async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
289
        await self.protocol.send_hello(url, max_messagesize, max_chunkcount)
290
291
    async def open_secure_channel(self, params):
292
        return await self.protocol.open_secure_channel(params)
293
294
    async def close_secure_channel(self):
295
        """
296
        close secure channel. It seems to trigger a shutdown of socket
297
        in most servers, so be prepare to reconnect
298
        """
299
        if self.protocol and self.protocol.state == UASocketProtocol.CLOSED:
300
            self.logger.warning("close_secure_channel was called but connection is closed")
301
            return
302
        return await self.protocol.close_secure_channel()
303
304
    async def create_session(self, parameters):
305
        self.logger.info("create_session")
306
        self.protocol.closed = False
307
        request = ua.CreateSessionRequest()
308
        request.Parameters = parameters
309
        data = await self.protocol.send_request(request)
310
        response = struct_from_binary(ua.CreateSessionResponse, data)
311
        self.logger.debug(response)
312
        response.ResponseHeader.ServiceResult.check()
313
        self.protocol.authentication_token = response.Parameters.AuthenticationToken
314
        return response.Parameters
315
316
    async def activate_session(self, parameters):
317
        self.logger.info("activate_session")
318
        request = ua.ActivateSessionRequest()
319
        request.Parameters = parameters
320
        data = await self.protocol.send_request(request)
321
        response = struct_from_binary(ua.ActivateSessionResponse, data)
322
        self.logger.debug(response)
323
        response.ResponseHeader.ServiceResult.check()
324
        return response.Parameters
325
326
    async def close_session(self, delete_subscriptions):
327
        self.logger.info("close_session")
328
        self.protocol.closed = True
329
        if self._publish_task and not self._publish_task.done():
330
            self._publish_task.cancel()
331
        if self.protocol and self.protocol.state == UASocketProtocol.CLOSED:
332
            self.logger.warning("close_session was called but connection is closed")
333
            return
334
        request = ua.CloseSessionRequest()
335
        request.DeleteSubscriptions = delete_subscriptions
336
        data = await self.protocol.send_request(request)
337
        response = struct_from_binary(ua.CloseSessionResponse, data)
338
        try:
339
            response.ResponseHeader.ServiceResult.check()
340
        except BadSessionClosed:
341
            # Problem: closing the session with open publish requests leads to BadSessionClosed responses
342
            #          we can just ignore it therefore.
343
            #          Alternatively we could make sure that there are no publish requests in flight when
344
            #          closing the session.
345
            pass
346
347
    async def browse(self, parameters):
348
        self.logger.info("browse")
349
        request = ua.BrowseRequest()
350
        request.Parameters = parameters
351
        data = await self.protocol.send_request(request)
352
        response = struct_from_binary(ua.BrowseResponse, data)
353
        self.logger.debug(response)
354
        response.ResponseHeader.ServiceResult.check()
355
        return response.Results
356
357
    async def browse_next(self, parameters):
358
        self.logger.debug("browse next")
359
        request = ua.BrowseNextRequest()
360
        request.Parameters = parameters
361
        data = await self.protocol.send_request(request)
362
        response = struct_from_binary(ua.BrowseNextResponse, data)
363
        self.logger.debug(response)
364
        response.ResponseHeader.ServiceResult.check()
365
        return response.Parameters.Results
366
367
    async def read(self, parameters):
368
        self.logger.debug("read")
369
        request = ua.ReadRequest()
370
        request.Parameters = parameters
371
        data = await self.protocol.send_request(request)
372
        response = struct_from_binary(ua.ReadResponse, data)
373
        self.logger.debug(response)
374
        response.ResponseHeader.ServiceResult.check()
375
        # cast to Enum attributes that need to
376
        for idx, rv in enumerate(parameters.NodesToRead):
377
            if rv.AttributeId == ua.AttributeIds.NodeClass:
378
                dv = response.Results[idx]
379
                if dv.StatusCode.is_good():
380
                    dv.Value.Value = ua.NodeClass(dv.Value.Value)
381
            elif rv.AttributeId == ua.AttributeIds.ValueRank:
382
                dv = response.Results[idx]
383
                if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
384
                    dv.Value.Value = ua.ValueRank(dv.Value.Value)
385
        return response.Results
386
387
    async def write(self, params):
388
        self.logger.debug("write")
389
        request = ua.WriteRequest()
390
        request.Parameters = params
391
        data = await self.protocol.send_request(request)
392
        response = struct_from_binary(ua.WriteResponse, data)
393
        self.logger.debug(response)
394
        response.ResponseHeader.ServiceResult.check()
395
        return response.Results
396
397
    async def get_endpoints(self, params):
398
        self.logger.debug("get_endpoint")
399
        request = ua.GetEndpointsRequest()
400
        request.Parameters = params
401
        data = await self.protocol.send_request(request)
402
        response = struct_from_binary(ua.GetEndpointsResponse, data)
403
        self.logger.debug(response)
404
        response.ResponseHeader.ServiceResult.check()
405
        return response.Endpoints
406
407
    async def find_servers(self, params):
408
        self.logger.debug("find_servers")
409
        request = ua.FindServersRequest()
410
        request.Parameters = params
411
        data = await self.protocol.send_request(request)
412
        response = struct_from_binary(ua.FindServersResponse, data)
413
        self.logger.debug(response)
414
        response.ResponseHeader.ServiceResult.check()
415
        return response.Servers
416
417
    async def find_servers_on_network(self, params):
418
        self.logger.debug("find_servers_on_network")
419
        request = ua.FindServersOnNetworkRequest()
420
        request.Parameters = params
421
        data = await self.protocol.send_request(request)
422
        response = struct_from_binary(ua.FindServersOnNetworkResponse, data)
423
        self.logger.debug(response)
424
        response.ResponseHeader.ServiceResult.check()
425
        return response.Parameters
426
427
    async def register_server(self, registered_server):
428
        self.logger.debug("register_server")
429
        request = ua.RegisterServerRequest()
430
        request.Server = registered_server
431
        data = await self.protocol.send_request(request)
432
        response = struct_from_binary(ua.RegisterServerResponse, data)
433
        self.logger.debug(response)
434
        response.ResponseHeader.ServiceResult.check()
435
        # nothing to return for this service
436
437
    async def register_server2(self, params):
438
        self.logger.debug("register_server2")
439
        request = ua.RegisterServer2Request()
440
        request.Parameters = params
441
        data = await self.protocol.send_request(request)
442
        response = struct_from_binary(ua.RegisterServer2Response, data)
443
        self.logger.debug(response)
444
        response.ResponseHeader.ServiceResult.check()
445
        return response.ConfigurationResults
446
447
    async def translate_browsepaths_to_nodeids(self, browse_paths):
448
        self.logger.debug("translate_browsepath_to_nodeid")
449
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
450
        request.Parameters.BrowsePaths = browse_paths
451
        data = await self.protocol.send_request(request)
452
        response = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsResponse, data)
453
        self.logger.debug(response)
454
        response.ResponseHeader.ServiceResult.check()
455
        return response.Results
456
457
    async def create_subscription(self, params, callback):
458
        self.logger.debug("create_subscription")
459
        request = ua.CreateSubscriptionRequest()
460
        request.Parameters = params
461
        data = await self.protocol.send_request(request)
462
        response = struct_from_binary(
463
            ua.CreateSubscriptionResponse,
464
            data
465
        )
466
        response.ResponseHeader.ServiceResult.check()
467
        self._subscription_callbacks[response.Parameters.SubscriptionId] = callback
468
        self.logger.info("create_subscription success SubscriptionId %s", response.Parameters.SubscriptionId)
469
        if not self._publish_task or self._publish_task.done():
470
            # Start the publish loop if it is not yet running
471
            # The current strategy is to have only one open publish request per UaClient. This might not be enough
472
            # in high latency networks or in case many subscriptions are created. A Set of Tasks of `_publish_loop`
473
            # could be used if necessary.
474
            self._publish_task = self.loop.create_task(self._publish_loop())
475
        return response.Parameters
476
477
    async def delete_subscriptions(self, subscription_ids):
478
        self.logger.debug("delete_subscriptions %r", subscription_ids)
479
        request = ua.DeleteSubscriptionsRequest()
480
        request.Parameters.SubscriptionIds = subscription_ids
481
        data = await self.protocol.send_request(request)
482
        response = struct_from_binary(
483
            ua.DeleteSubscriptionsResponse,
484
            data
485
        )
486
        response.ResponseHeader.ServiceResult.check()
487
        self.logger.info("remove subscription callbacks for %r", subscription_ids)
488
        for sid in subscription_ids:
489
            self._subscription_callbacks.pop(sid)
490
        return response.Results
491
492
    async def publish(self, acks: List[ua.SubscriptionAcknowledgement]) -> ua.PublishResponse:
493
        """
494
        Send a PublishRequest to the server.
495
        """
496
        self.logger.debug('publish %r', acks)
497
        request = ua.PublishRequest()
498
        request.Parameters.SubscriptionAcknowledgements = acks if acks else []
499
        data = await self.protocol.send_request(request, timeout=0)
500
        self.protocol.check_answer(data, "while waiting for publish response")
501
        try:
502
            response = struct_from_binary(ua.PublishResponse, data)
503
        except Exception:
504
            self.logger.exception("Error parsing notification from server")
505
            raise UaStructParsingError
506
        return response
507
508
    async def _publish_loop(self):
509
        """
510
        Start a loop that sends a publish requests and waits for the publish responses.
511
        Forward the `PublishResult` to the matching `Subscription` by callback.
512
        """
513
        ack = None
514
        while True:
515
            try:
516
                response = await self.publish([ack] if ack else [])
517
            except BadTimeout:  # See Spec. Part 4, 7.28
518
                # Repeat without acknowledgement
519
                ack = None
520
                continue
521
            except BadNoSubscription:  # See Spec. Part 5, 13.8.1
522
                # BadNoSubscription is expected to be received after deleting the last subscription.
523
                # We use this as a signal to exit this task and stop sending PublishRequests. This is easier then
524
                # checking if there are no more subscriptions registered in this client (). A Publish response
525
                # could still arrive before the DeleteSubscription response.
526
                #
527
                # We could remove the callback already when sending the DeleteSubscription request,
528
                # but there are some legitimate reasons to keep them around, such as when the server
529
                # responds with "BadTimeout" and we should try again later instead of just removing
530
                # the subscription client-side.
531
                #
532
                # There are a variety of ways to act correctly, but the most practical solution seems
533
                # to be to just silently ignore any BadNoSubscription responses.
534
                self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
535
                # End task
536
                return
537
            except UaStructParsingError:
538
                ack = None
539
                continue
540
            subscription_id = response.Parameters.SubscriptionId
541
            if not subscription_id:
542
                # The value 0 is used to indicate that there were no Subscriptions defined for which a
543
                # response could be sent. See Spec. Part 4 - Section 5.13.5 "Publish"
544
                # End task
545
                return
546
            try:
547
                callback = self._subscription_callbacks[subscription_id]
548
            except KeyError:
549
                self.logger.warning(
550
                    "Received data for unknown subscription %s active are %s", subscription_id,
551
                    self._subscription_callbacks.keys()
552
                )
553
            else:
554
                try:
555
                    if asyncio.iscoroutinefunction(callback):
556
                        await callback(response.Parameters)
557
                    else:
558
                        callback(response.Parameters)
559
                except Exception:  # we call user code, catch everything!
560
                    self.logger.exception("Exception while calling user callback: %s")
561
            # Repeat with acknowledgement
562
            if response.Parameters.NotificationMessage.NotificationData:
563
                ack = ua.SubscriptionAcknowledgement()
564
                ack.SubscriptionId = subscription_id
565
                ack.SequenceNumber = response.Parameters.NotificationMessage.SequenceNumber
566
            else:
567
                ack = None
568
569
    async def create_monitored_items(self, params):
570
        self.logger.info("create_monitored_items")
571
        request = ua.CreateMonitoredItemsRequest()
572
        request.Parameters = params
573
        data = await self.protocol.send_request(request)
574
        response = struct_from_binary(ua.CreateMonitoredItemsResponse, data)
575
        self.logger.debug(response)
576
        response.ResponseHeader.ServiceResult.check()
577
        return response.Results
578
579
    async def delete_monitored_items(self, params):
580
        self.logger.info("delete_monitored_items")
581
        request = ua.DeleteMonitoredItemsRequest()
582
        request.Parameters = params
583
        data = await self.protocol.send_request(request)
584
        response = struct_from_binary(ua.DeleteMonitoredItemsResponse, data)
585
        self.logger.debug(response)
586
        response.ResponseHeader.ServiceResult.check()
587
        return response.Results
588
589
    async def add_nodes(self, nodestoadd):
590
        self.logger.info("add_nodes")
591
        request = ua.AddNodesRequest()
592
        request.Parameters.NodesToAdd = nodestoadd
593
        data = await self.protocol.send_request(request)
594
        response = struct_from_binary(ua.AddNodesResponse, data)
595
        self.logger.debug(response)
596
        response.ResponseHeader.ServiceResult.check()
597
        return response.Results
598
599
    async def add_references(self, refs):
600
        self.logger.info("add_references")
601
        request = ua.AddReferencesRequest()
602
        request.Parameters.ReferencesToAdd = refs
603
        data = await self.protocol.send_request(request)
604
        response = struct_from_binary(ua.AddReferencesResponse, data)
605
        self.logger.debug(response)
606
        response.ResponseHeader.ServiceResult.check()
607
        return response.Results
608
609
    async def delete_references(self, refs):
610
        self.logger.info("delete")
611
        request = ua.DeleteReferencesRequest()
612
        request.Parameters.ReferencesToDelete = refs
613
        data = await self.protocol.send_request(request)
614
        response = struct_from_binary(ua.DeleteReferencesResponse, data)
615
        self.logger.debug(response)
616
        response.ResponseHeader.ServiceResult.check()
617
        return response.Parameters.Results
618
619
    async def delete_nodes(self, params):
620
        self.logger.info("delete_nodes")
621
        request = ua.DeleteNodesRequest()
622
        request.Parameters = params
623
        data = await self.protocol.send_request(request)
624
        response = struct_from_binary(ua.DeleteNodesResponse, data)
625
        self.logger.debug(response)
626
        response.ResponseHeader.ServiceResult.check()
627
        return response.Results
628
629
    async def call(self, methodstocall):
630
        request = ua.CallRequest()
631
        request.Parameters.MethodsToCall = methodstocall
632
        data = await self.protocol.send_request(request)
633
        response = struct_from_binary(ua.CallResponse, data)
634
        self.logger.debug(response)
635
        response.ResponseHeader.ServiceResult.check()
636
        return response.Results
637
638
    async def history_read(self, params):
639
        self.logger.info("history_read")
640
        request = ua.HistoryReadRequest()
641
        request.Parameters = params
642
        data = await self.protocol.send_request(request)
643
        response = struct_from_binary(ua.HistoryReadResponse, data)
644
        self.logger.debug(response)
645
        response.ResponseHeader.ServiceResult.check()
646
        return response.Results
647
648
    async def modify_monitored_items(self, params):
649
        self.logger.info("modify_monitored_items")
650
        request = ua.ModifyMonitoredItemsRequest()
651
        request.Parameters = params
652
        data = await self.protocol.send_request(request)
653
        response = struct_from_binary(ua.ModifyMonitoredItemsResponse, data)
654
        self.logger.debug(response)
655
        response.ResponseHeader.ServiceResult.check()
656
        return response.Results
657
658
    async def register_nodes(self, nodes):
659
        self.logger.info("register_nodes")
660
        request = ua.RegisterNodesRequest()
661
        request.Parameters.NodesToRegister = nodes
662
        data = await self.protocol.send_request(request)
663
        response = struct_from_binary(ua.RegisterNodesResponse, data)
664
        self.logger.debug(response)
665
        response.ResponseHeader.ServiceResult.check()
666
        return response.Parameters.RegisteredNodeIds
667
668
    async def unregister_nodes(self, nodes):
669
        self.logger.info("unregister_nodes")
670
        request = ua.UnregisterNodesRequest()
671
        request.Parameters.NodesToUnregister = nodes
672
        data = await self.protocol.send_request(request)
673
        response = struct_from_binary(ua.UnregisterNodesResponse, data)
674
        self.logger.debug(response)
675
        response.ResponseHeader.ServiceResult.check()
676
        # nothing to return for this service
677
678
    async def read_attributes(self, nodeids, attr):
679
        self.logger.info("read_attributes of several nodes")
680
        request = ua.ReadRequest()
681
        for nodeid in nodeids:
682
            rv = ua.ReadValueId()
683
            rv.NodeId = nodeid
684
            rv.AttributeId = attr
685
            request.Parameters.NodesToRead.append(rv)
686
        data = await self.protocol.send_request(request)
687
        response = struct_from_binary(ua.ReadResponse, data)
688
        response.ResponseHeader.ServiceResult.check()
689
        return response.Results
690
691
    async def write_attributes(self, nodeids, datavalues, attributeid=ua.AttributeIds.Value):
692
        """
693
        Set an attribute of multiple nodes
694
        datavalue is a ua.DataValue object
695
        """
696
        self.logger.info("write_attributes of several nodes")
697
        request = ua.WriteRequest()
698
        for idx, nodeid in enumerate(nodeids):
699
            attr = ua.WriteValue()
700
            attr.NodeId = nodeid
701
            attr.AttributeId = attributeid
702
            attr.Value = datavalues[idx]
703
            request.Parameters.NodesToWrite.append(attr)
704
        data = await self.protocol.send_request(request)
705
        response = struct_from_binary(ua.WriteResponse, data)
706
        response.ResponseHeader.ServiceResult.check()
707
        return response.Results
708