Completed
Pull Request — master (#602)
by
unknown
06:00
created

UASocketClient   A

Complexity

Total Complexity 32

Size/Duplication

Total Lines 173
Duplicated Lines 0 %

Test Coverage

Coverage 91.27%

Importance

Changes 2
Bugs 0 Features 0
Metric Value
c 2
b 0
f 0
dl 0
loc 173
ccs 115
cts 126
cp 0.9127
rs 9.6
wmc 32

14 Methods

Rating   Name   Duplication   Size   Complexity  
A start() 0 8 1
B _send_request() 0 24 4
A send_request() 0 11 2
A _call_callback() 0 6 3
A check_answer() 0 9 2
A _create_request_header() 0 7 1
A __init__() 0 12 1
B _receive() 0 12 5
A _run() 0 11 4
A connect_socket() 0 9 1
A send_hello() 0 12 2
A open_secure_channel() 0 12 1
A disconnect_socket() 0 9 3
A close_secure_channel() 0 13 2
1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9 1
from functools import partial
10
11 1
from opcua import ua
12 1
from opcua.ua.ua_binary import struct_from_binary, uatcp_to_binary, struct_to_binary, nodeid_from_binary
13 1
from opcua.ua.uaerrors import UaError, BadTimeout, BadNoSubscription, BadSessionClosed
14 1
from opcua.common.connection import SecureConnection
15
16
17 1
class UASocketClient(object):
18
    """
19
    handle socket connection and send ua messages
20
    timeout is the timeout used while waiting for an ua answer from server
21
    """
22 1
    def __init__(self, timeout=1, security_policy=None):
        """
        timeout is the number of seconds to wait for an answer from the server
        security_policy is handed to SecureConnection; when None (the default)
        a new ua.SecurityPolicy() is created for this client
        """
        # Build the default policy per instance: a default written in the
        # signature is evaluated only once and would be shared by all clients.
        if security_policy is None:
            security_policy = ua.SecurityPolicy()
        self.logger = logging.getLogger(__name__ + ".Socket")
        self._thread = None  # receiving thread, created in start()
        self._lock = Lock()  # held while touching _callbackmap and the request counters
        self.timeout = timeout
        self._socket = None  # set in connect_socket()
        self._do_stop = False  # tells the receive loop to exit
        self.authentication_token = ua.NodeId()
        self._request_id = 0
        self._request_handle = 0
        self._callbackmap = {}  # request id -> Future waiting for the answer
        self._connection = SecureConnection(security_policy)
34
35 1
    def start(self):
        """
        Spawn the thread running the receive loop (self._run).
        connect_socket does this itself, so calling it
        by hand should normally not be needed
        """
        receiver = Thread(target=self._run)
        self._thread = receiver
        receiver.start()
43
44 1
    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server, lower-level method
        timeout is the timeout written in ua header
        callback, if given, is attached to the future as done-callback
        returns the future that _call_callback resolves with the answer body
        any error raised while encoding or writing the request is re-raised
        """
        with self._lock:
            request.RequestHeader = self._create_request_header(timeout)
            self.logger.debug("Sending: %s", request)
            try:
                binreq = struct_to_binary(request)
            except BaseException:
                # reset request handle if any error
                # see self._create_request_header
                self._request_handle -= 1
                raise
            self._request_id += 1
            future = Future()
            if callback:
                future.add_done_callback(callback)
            self._callbackmap[self._request_id] = future
            try:
                msg = self._connection.message_to_binary(binreq, message_type=message_type, request_id=self._request_id)
                self._socket.write(msg)
            except BaseException:
                # the request did not go out, so no answer will ever resolve
                # this future: do not leave it behind in the callback map
                self._callbackmap.pop(self._request_id, None)
                raise
        return future
68
69 1
    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server.
        timeout is the timeout written in ua header
        with a callback: returns None right away, the callback receives the future
        without callback: waits up to self.timeout seconds, runs check_answer
        on the answer and returns it (still encoded, callers decode it)
        """
        pending = self._send_request(request, callback, timeout, message_type)
        if callback:
            return None
        answer = pending.result(self.timeout)
        self.check_answer(answer, " in response to " + request.__class__.__name__)
        return answer
80
81 1
    def check_answer(self, data, context):
        """
        Tell whether an answer is a regular response or a ServiceFault.
        Decoding is done on a copy of data.
        context is only appended to the warning that is logged.
        returns True for anything that is not a ServiceFault; for a ServiceFault
        the status in its header is checked (which raises for an error status)
        and False is returned if that did not raise
        """
        buf = data.copy()
        answer_type = nodeid_from_binary(buf)
        is_fault = answer_type == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary)
        if not is_fault:
            return True
        self.logger.warning("ServiceFault from server received %s", context)
        header = struct_from_binary(ua.ResponseHeader, buf)
        header.ServiceResult.check()
        return False
90
91 1
    def _run(self):
        """
        Body of the receiving thread: keep calling _receive until
        _do_stop is set or the socket reports it was closed.
        A UaError is logged and the loop goes on.
        """
        self.logger.info("Thread started")
        while True:
            if self._do_stop:
                break
            try:
                self._receive()
            except ua.utils.SocketClosedException:
                self.logger.info("Socket has closed connection")
                break
            except UaError:
                self.logger.exception("Protocol Error")
        self.logger.info("Thread ended")
102
103 1
    def _receive(self):
        """
        Read one message from the socket through the secure connection
        and dispatch it:
        - None: nothing to do
        - ua.Message: resolve the future registered for its request id
        - ua.Acknowledge: resolve the future registered under id 0 (see send_hello)
        - ua.ErrorMessage: only logged
        raises ua.UaError for any other message type
        """
        msg = self._connection.receive_from_socket(self._socket)
        if msg is None:
            return
        if isinstance(msg, ua.Message):
            self._call_callback(msg.request_id(), msg.body())
        elif isinstance(msg, ua.Acknowledge):
            self._call_callback(0, msg)
        elif isinstance(msg, ua.ErrorMessage):
            self.logger.warning("Received an error: %s", msg)
        else:
            # Format here: unlike logging calls, an exception does not
            # interpolate extra arguments into its message.
            raise ua.UaError("Unsupported message type: %s" % (msg,))
115
116 1
    def _call_callback(self, request_id, body):
        """
        Resolve the future registered for request_id with body.
        The future is removed from the callback map.
        raises ua.UaError if no future is registered under that id
        """
        with self._lock:
            pending = self._callbackmap.pop(request_id, None)
            if pending is None:
                known = self._callbackmap.keys()
                raise ua.UaError("No future object found for request: {0}, callbacks in list are {1}".format(request_id, known))
        # Resolved after releasing the lock: done-callbacks run inside
        # set_result and may send new requests, which take the same lock.
        pending.set_result(body)
122
123 1
    def _create_request_header(self, timeout=1000):
        """
        Build the header for the next request: current authentication
        token, a newly incremented request handle and timeout as TimeoutHint.
        """
        header = ua.RequestHeader()
        header.AuthenticationToken = self.authentication_token
        self._request_handle += 1
        header.RequestHandle = self._request_handle
        header.TimeoutHint = timeout
        return header
130
131 1
    def connect_socket(self, host, port):
        """
        open a TCP connection to (host, port), wrap it and
        start the receiving thread
        """
        self.logger.info("opening connection")
        raw_sock = socket.create_connection((host, port))
        # nodelay is necessary to avoid packing in one frame, some servers do not like it
        raw_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        self._socket = ua.utils.SocketWrapper(raw_sock)
        self.start()
140
141 1
    def disconnect_socket(self):
        """
        Ask the receive loop to stop, shut down and close the socket,
        then wait for the receiving thread if it is still alive.
        """
        self.logger.info("Request to close socket received")
        self._do_stop = True
        raw_sock = self._socket.socket
        raw_sock.shutdown(socket.SHUT_RDWR)
        raw_sock.close()
        self.logger.info("Socket closed, waiting for receiver thread to terminate...")
        receiver = self._thread
        if receiver and receiver.is_alive():
            receiver.join()
        self.logger.info("Done closing socket: Receiving thread terminated, socket disconnected")
150
151 1
    def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
        """
        Send a Hello message for url and wait up to self.timeout seconds
        for the answer, which is returned.
        The answer carries no request id, so the future is registered under id 0.
        """
        hello = ua.Hello()
        hello.EndpointUrl = url
        hello.MaxMessageSize = max_messagesize
        hello.MaxChunkCount = max_chunkcount
        ack_future = Future()
        with self._lock:
            self._callbackmap[0] = ack_future
        encoded = uatcp_to_binary(ua.MessageType.Hello, hello)
        self._socket.write(encoded)
        return ack_future.result(self.timeout)
163
164 1
    def open_secure_channel(self, params):
        """
        Send an OpenSecureChannelRequest with params, wait up to self.timeout
        seconds for the response, check its status, hand the response
        parameters to the connection and return them.
        """
        self.logger.info("open_secure_channel")
        request = ua.OpenSecureChannelRequest()
        request.Parameters = params
        pending = self._send_request(request, message_type=ua.MessageType.SecureOpen)

        # FIXME: we have a race condition here
        # we can get a packet with the new token id before we reach to store it..
        response = struct_from_binary(ua.OpenSecureChannelResponse, pending.result(self.timeout))
        response.ResponseHeader.ServiceResult.check()
        channel_params = response.Parameters
        self._connection.set_channel(channel_params)
        return channel_params
176
177 1
    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepared to reconnect.
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket
        """
        self.logger.info("close_secure_channel")
        pending = self._send_request(ua.CloseSecureChannelRequest(), message_type=ua.MessageType.SecureClose)
        with self._lock:
            # no further answers are expected: cancel this one, forget the rest
            pending.cancel()
            self._callbackmap.clear()

        # some servers send a response here, most do not ... so we ignore
192
193
194 1
class UaClient(object):
195
196
    """
197
    low level OPC-UA client.
198
199
    It implements (almost) all methods defined in opcua spec
200
    taking in argument the structures defined in opcua spec.
201
202
    In this Python implementation  most of the structures are defined in
203
    uaprotocol_auto.py and uaprotocol_hand.py available under opcua.ua
204
    """
205
206 1
    def __init__(self, timeout=1):
        """
        timeout is passed to the socket client created in connect_socket and
        is also the number of seconds the subscription calls wait for an answer
        """
        self.logger = logging.getLogger(__name__)
        self._timeout = timeout
        self._uasocket = None  # created in connect_socket
        self.security_policy = ua.SecurityPolicy()
        # _publishcallbacks should be accessed in recv thread only
        self._publishcallbacks = {}
213
214 1
    def set_security(self, policy):
        """
        store the security policy handed to the socket client
        created by the next connect_socket call
        """
        self.security_policy = policy
216
217 1
    def connect_socket(self, host, port):
        """
        create a new socket client with the current timeout and security
        policy, connect it to the server and start its receiving thread
        """
        sock_client = UASocketClient(self._timeout, security_policy=self.security_policy)
        self._uasocket = sock_client
        return sock_client.connect_socket(host, port)
223
224 1
    def disconnect_socket(self):
        """
        close the socket of the underlying socket client
        """
        sock_client = self._uasocket
        return sock_client.disconnect_socket()
226
227 1
    def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
        """
        send a Hello message through the socket client and return its answer
        """
        sock_client = self._uasocket
        return sock_client.send_hello(url, max_messagesize, max_chunkcount)
229
230 1
    def open_secure_channel(self, params):
        """
        open a secure channel through the socket client and
        return the parameters of the response
        """
        sock_client = self._uasocket
        return sock_client.open_secure_channel(params)
232
233 1
    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepared to reconnect
        """
        sock_client = self._uasocket
        return sock_client.close_secure_channel()
239
240 1
    def create_session(self, parameters):
        """
        Send a CreateSessionRequest and return the parameters of the response.
        The authentication token of the response is stored on the socket
        client, which puts it in the header of every following request.
        """
        self.logger.info("create_session")
        req = ua.CreateSessionRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.CreateSessionResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        self._uasocket.authentication_token = resp.Parameters.AuthenticationToken
        return resp.Parameters
250
251 1
    def activate_session(self, parameters):
        """
        Send an ActivateSessionRequest, check the status of the
        response and return its parameters.
        """
        self.logger.info("activate_session")
        req = ua.ActivateSessionRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.ActivateSessionResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters
260
261 1
    def close_session(self, deletesubscriptions):
        """
        Send a CloseSessionRequest; deletesubscriptions is written to the
        DeleteSubscriptions field of the request.
        A BadSessionClosed status in the response is ignored, nothing is returned.
        """
        self.logger.info("close_session")
        req = ua.CloseSessionRequest()
        req.DeleteSubscriptions = deletesubscriptions
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.CloseSessionResponse, raw)
        try:
            resp.ResponseHeader.ServiceResult.check()
        except BadSessionClosed:
            # Closing the session while publish requests are still open leads to
            # BadSessionClosed responses, so it is simply ignored.
            # The alternative would be to make sure no publish request is in
            # flight when the session is closed.
            pass
275
276 1
    def browse(self, parameters):
        """
        Send a BrowseRequest, check the status of the response
        and return its Results.
        """
        self.logger.info("browse")
        req = ua.BrowseRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.BrowseResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
285
286 1
    def browse_next(self, parameters):
        """
        Send a BrowseNextRequest, check the status of the response
        and return the Results found in its parameters.
        """
        self.logger.info("browse next")
        req = ua.BrowseNextRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.BrowseNextResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters.Results
295
296 1
    def read(self, parameters):
        """
        Send a ReadRequest, check the status of the response and return its Results.
        Values read with a good status from the NodeClass and ValueRank
        attributes are converted to the matching enum before returning
        (ValueRank only for values from -3 to 4).
        """
        self.logger.info("read")
        req = ua.ReadRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.ReadResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        # results are matched to the requested nodes by position
        for pos, node in enumerate(parameters.NodesToRead):
            attr = node.AttributeId
            if attr == ua.AttributeIds.NodeClass:
                result = resp.Results[pos]
                if result.StatusCode.is_good():
                    result.Value.Value = ua.NodeClass(result.Value.Value)
            elif attr == ua.AttributeIds.ValueRank:
                result = resp.Results[pos]
                if result.StatusCode.is_good() and result.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
                    result.Value.Value = ua.ValueRank(result.Value.Value)
        return resp.Results
315
316 1
    def write(self, params):
        """
        Send a WriteRequest, check the status of the response
        and return its Results.
        """
        # the log line used to say "read", which made write calls
        # indistinguishable from read calls in the log
        self.logger.info("write")
        request = ua.WriteRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = struct_from_binary(ua.WriteResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results
325
326 1
    def get_endpoints(self, params):
        """
        Send a GetEndpointsRequest, check the status of the response
        and return its Endpoints.
        """
        self.logger.info("get_endpoint")
        req = ua.GetEndpointsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.GetEndpointsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Endpoints
335
336 1
    def find_servers(self, params):
        """
        Send a FindServersRequest, check the status of the response
        and return its Servers.
        """
        self.logger.info("find_servers")
        req = ua.FindServersRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.FindServersResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Servers
345
346 1
    def find_servers_on_network(self, params):
        """
        Send a FindServersOnNetworkRequest, check the status of the
        response and return its parameters.
        """
        self.logger.info("find_servers_on_network")
        req = ua.FindServersOnNetworkRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.FindServersOnNetworkResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters
355
356 1
    def register_server(self, registered_server):
        """
        Send a RegisterServerRequest with registered_server as its Server
        field and check the status of the response.
        Nothing is returned for this service.
        """
        self.logger.info("register_server")
        req = ua.RegisterServerRequest()
        req.Server = registered_server
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.RegisterServerResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
365
366 1
    def register_server2(self, params):
        """
        Send a RegisterServer2Request, check the status of the response
        and return its ConfigurationResults.
        """
        self.logger.info("register_server2")
        req = ua.RegisterServer2Request()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.RegisterServer2Response, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.ConfigurationResults
375
376 1
    def translate_browsepaths_to_nodeids(self, browsepaths):
        """
        Send a TranslateBrowsePathsToNodeIdsRequest for browsepaths, check
        the status of the response and return its Results.
        """
        self.logger.info("translate_browsepath_to_nodeid")
        req = ua.TranslateBrowsePathsToNodeIdsRequest()
        req.Parameters.BrowsePaths = browsepaths
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
385
386 1
    def create_subscription(self, params, callback):
        """
        Send a CreateSubscriptionRequest and return the parameters of the response.
        The response is handled in _create_subscription_callback, which also
        registers callback for the new subscription id; this method waits
        up to self._timeout seconds for it to finish.
        """
        self.logger.info("create_subscription")
        req = ua.CreateSubscriptionRequest()
        req.Parameters = params
        done = Future()
        on_answer = partial(self._create_subscription_callback, callback, done)
        self._uasocket.send_request(req, on_answer)
        return done.result(self._timeout)
394
395 1
    def _create_subscription_callback(self, pub_callback, resp_fut, data_fut):
        """
        Done-callback for the answer to a CreateSubscriptionRequest.
        Decodes and checks the response, registers pub_callback under the
        new subscription id and resolves resp_fut with the response parameters.
        If decoding or the status check raises, resp_fut is left unresolved.
        """
        self.logger.info("_create_subscription_callback")
        raw = data_fut.result()
        resp = struct_from_binary(ua.CreateSubscriptionResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        sub_params = resp.Parameters
        self._publishcallbacks[sub_params.SubscriptionId] = pub_callback
        resp_fut.set_result(sub_params)
403
404 1
    def delete_subscriptions(self, subscriptionids):
        """
        Send a DeleteSubscriptionsRequest for subscriptionids and return the
        Results of the response.
        The response is handled in _delete_subscriptions_callback; this method
        waits up to self._timeout seconds for it to finish.
        """
        self.logger.info("delete_subscription")
        req = ua.DeleteSubscriptionsRequest()
        req.Parameters.SubscriptionIds = subscriptionids
        done = Future()
        on_answer = partial(self._delete_subscriptions_callback, subscriptionids, done)
        self._uasocket.send_request(req, on_answer)
        return done.result(self._timeout)
412
413 1
    def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut):
        """
        Done-callback for the answer to a DeleteSubscriptionsRequest.
        Decodes and checks the response, drops the publish callback of every
        id in subscriptionids and resolves resp_fut with the response Results.
        An id without registered callback raises KeyError before resp_fut is resolved.
        """
        self.logger.info("_delete_subscriptions_callback")
        raw = data_fut.result()
        resp = struct_from_binary(ua.DeleteSubscriptionsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        for sub_id in subscriptionids:
            del self._publishcallbacks[sub_id]
        resp_fut.set_result(resp.Results)
422
423 1
    def publish(self, acks=None):
        """
        Send a PublishRequest carrying acks (an empty list when None) as
        subscription acknowledgements.
        Does not wait: the answer is handled by _call_publish_callback.
        The timeout written in the request header is 0.
        """
        self.logger.info("publish")
        acknowledgements = [] if acks is None else acks
        req = ua.PublishRequest()
        req.Parameters.SubscriptionAcknowledgements = acknowledgements
        self._uasocket.send_request(req, self._call_publish_callback, timeout=0)
430
431 1
    def _call_publish_callback(self, future):
        """
        Done-callback attached to every publish request (see publish).
        future holds the encoded answer of the server.
        - BadTimeout answer: a new publish request is sent
        - BadNoSubscription answer: ignored
        - answer that cannot be decoded: logged, a new publish request is sent
        - answer for an unknown subscription id: logged and dropped
        - otherwise the callback registered for the subscription id is called
          with the parameters of the response; anything it raises is logged
        """
        self.logger.info("call_publish_callback")
        data = future.result()

        # check if answer looks ok
        try:
            self._uasocket.check_answer(data, "while waiting for publish response")
        except BadTimeout:  # Spec Part 4, 7.28
            self.publish()
            return
        except BadNoSubscription:  # Spec Part 5, 13.8.1
            # BadNoSubscription is expected after deleting the last subscription.
            #
            # We should therefore also check for len(self._publishcallbacks) == 0, but
            # this gets us into trouble if a Publish response arrives before the
            # DeleteSubscription response.
            #
            # We could remove the callback already when sending the DeleteSubscription request,
            # but there are some legitimate reasons to keep them around, such as when the server
            # responds with "BadTimeout" and we should try again later instead of just removing
            # the subscription client-side.
            #
            # There are a variety of ways to act correctly, but the most practical solution seems
            # to be to just ignore any BadNoSubscription responses.
            self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
            return

        # parse publish response
        try:
            response = struct_from_binary(ua.PublishResponse, data)
            self.logger.debug(response)
        except Exception:
            # INFO: catching the exception here might be obsolete because we already
            #       catch BadTimeout above. However, it's not really clear what this code
            #       does so it stays in, doesn't seem to hurt.
            self.logger.exception("Error parsing notificatipn from server")
            self.publish([])  # send publish request to server so it stops sending notifications
            return

        # look for callback
        try:
            callback = self._publishcallbacks[response.Parameters.SubscriptionId]
        except KeyError:
            self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
            return

        # do callback
        try:
            callback(response.Parameters)
        except Exception:  # we call client code, catch everything!
            # the message has a %s placeholder: give it the callback, otherwise
            # a literal "%s" ends up in the log
            self.logger.exception("Exception while calling user callback: %s", callback)
482
483 1
    def create_monitored_items(self, params):
        """
        Send a CreateMonitoredItemsRequest, check the status of the
        response and return its Results.
        """
        self.logger.info("create_monitored_items")
        req = ua.CreateMonitoredItemsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.CreateMonitoredItemsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
492
493 1
    def delete_monitored_items(self, params):
        """
        Send a DeleteMonitoredItemsRequest, check the status of the
        response and return its Results.
        """
        self.logger.info("delete_monitored_items")
        req = ua.DeleteMonitoredItemsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.DeleteMonitoredItemsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
502
503 1
    def add_nodes(self, nodestoadd):
        """
        Send an AddNodesRequest for nodestoadd, check the status of
        the response and return its Results.
        """
        self.logger.info("add_nodes")
        req = ua.AddNodesRequest()
        req.Parameters.NodesToAdd = nodestoadd
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.AddNodesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
512
513 1
    def add_references(self, refs):
        """
        Send an AddReferencesRequest for refs, check the status of
        the response and return its Results.
        """
        self.logger.info("add_references")
        req = ua.AddReferencesRequest()
        req.Parameters.ReferencesToAdd = refs
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.AddReferencesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
522
523 1
    def delete_references(self, refs):
        """
        Send a DeleteReferencesRequest for refs, check the status of the
        response and return the Results found in its parameters.
        """
        # log the service name like every other method of this class does;
        # the former message "delete" did not say what was being deleted
        self.logger.info("delete_references")
        request = ua.DeleteReferencesRequest()
        request.Parameters.ReferencesToDelete = refs
        data = self._uasocket.send_request(request)
        response = struct_from_binary(ua.DeleteReferencesResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters.Results
532
533
534 1
    def delete_nodes(self, params):
        """
        Send a DeleteNodesRequest, check the status of the response
        and return its Results.
        """
        self.logger.info("delete_nodes")
        req = ua.DeleteNodesRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.DeleteNodesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
543
544 1
    def call(self, methodstocall):
        """
        Send a CallRequest for methodstocall, check the status of
        the response and return its Results.
        """
        req = ua.CallRequest()
        req.Parameters.MethodsToCall = methodstocall
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.CallResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
552
553 1
    def history_read(self, params):
        """
        Send a HistoryReadRequest, check the status of the response
        and return its Results.
        """
        self.logger.info("history_read")
        req = ua.HistoryReadRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.HistoryReadResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
562
563 1
    def modify_monitored_items(self, params):
        """
        Send a ModifyMonitoredItemsRequest, check the status of the
        response and return its Results.
        """
        self.logger.info("modify_monitored_items")
        req = ua.ModifyMonitoredItemsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.ModifyMonitoredItemsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
572