Passed
Push — master ( 5440ff...7b97c4 )
by Olivier
03:46
created

UASocketClient   A

Complexity

Total Complexity 32

Size/Duplication

Total Lines 177
Duplicated Lines 0 %

Test Coverage

Coverage 91.27%

Importance

Changes 4
Bugs 2 Features 0
Metric Value
dl 0
loc 177
ccs 115
cts 126
cp 0.9127
rs 9.6
c 4
b 2
f 0
wmc 32

14 Methods

Rating   Name   Duplication   Size   Complexity  
A start() 0 8 1
B _send_request() 0 24 4
A send_request() 0 11 2
A check_answer() 0 9 2
A __init__() 0 12 1
B _receive() 0 12 5
A _run() 0 11 4
A _call_callback() 0 9 3
A _create_request_header() 0 7 1
A connect_socket() 0 10 1
A send_hello() 0 12 2
A open_secure_channel() 0 12 1
A disconnect_socket() 0 9 3
A close_secure_channel() 0 13 2
1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9 1
from functools import partial
10
11 1
from opcua import ua
12 1
from opcua.ua.ua_binary import struct_from_binary, uatcp_to_binary, struct_to_binary, nodeid_from_binary
13 1
from opcua.ua.uaerrors import UaError, BadTimeout, BadNoSubscription, BadSessionClosed
14 1
from opcua.common.connection import SecureConnection
15
16
17 1
class UASocketClient(object):
    """
    Handle the socket connection and send ua messages.

    Answers are read by a background thread (see start() and _run())
    which resolves the Future registered for each request id.

    timeout is the timeout used while waiting for an ua answer from server
    """

    def __init__(self, timeout=1, security_policy=None):
        """
        timeout: value handed to Future.result() when waiting for an answer
        security_policy: policy given to SecureConnection; when omitted a
        new ua.SecurityPolicy() is created for this instance
        """
        if security_policy is None:
            # Built here rather than in the signature: a default evaluated
            # at definition time is one object shared by every client.
            security_policy = ua.SecurityPolicy()
        self.logger = logging.getLogger(__name__ + ".Socket")
        self._thread = None
        # protects _callbackmap and the request counters
        self._lock = Lock()
        self.timeout = timeout
        self._socket = None
        self._do_stop = False
        self.authentication_token = ua.NodeId()
        self._request_id = 0
        self._request_handle = 0
        # request id -> Future resolved by the receiving thread
        self._callbackmap = {}
        self._connection = SecureConnection(security_policy)

    def start(self):
        """
        Start receiving thread.
        this is called automatically in connect and
        should not be necessary to call directly
        """
        self._thread = Thread(target=self._run)
        self._thread.start()

    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server, lower-level method
        timeout is the timeout written in ua header
        callback, when given, is attached to the future as done-callback
        returns future
        """
        with self._lock:
            request.RequestHeader = self._create_request_header(timeout)
            self.logger.debug("Sending: %s", request)
            try:
                binreq = struct_to_binary(request)
            except Exception:
                # reset request handle if any error
                # see self._create_request_header
                self._request_handle -= 1
                raise
            self._request_id += 1
            future = Future()
            if callback:
                future.add_done_callback(callback)
            self._callbackmap[self._request_id] = future
            try:
                msg = self._connection.message_to_binary(binreq, message_type=message_type, request_id=self._request_id)
                self._socket.write(msg)
            except Exception:
                # The request could not be sent: drop its future so the map
                # does not keep an entry that nothing will ever resolve.
                self._callbackmap.pop(self._request_id, None)
                raise
        return future

    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server.
        timeout is the timeout written in ua header
        returns response object if no callback is provided,
        otherwise returns None and the answer is delivered to callback
        """
        future = self._send_request(request, callback, timeout, message_type)
        if not callback:
            # blocks for at most self.timeout
            data = future.result(self.timeout)
            self.check_answer(data, " in response to " + request.__class__.__name__)
            return data

    def check_answer(self, data, context):
        """
        Look at the type id at the start of data and report ServiceFaults.

        context is only used in the warning that is logged.
        returns True when the answer is not a ServiceFault; for a
        ServiceFault the ServiceResult of its header is checked and
        False is returned if that check does not raise
        """
        # decode from a copy: callers decode data again after this check
        data = data.copy()
        typeid = nodeid_from_binary(data)
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
            self.logger.warning("ServiceFault from server received %s", context)
            hdr = struct_from_binary(ua.ResponseHeader, data)
            hdr.ServiceResult.check()
            return False
        return True

    def _run(self):
        """
        Body of the receiving thread: process incoming messages until
        a stop is requested or the socket is closed.
        """
        self.logger.info("Thread started")
        while not self._do_stop:
            try:
                self._receive()
            except ua.utils.SocketClosedException:
                self.logger.info("Socket has closed connection")
                break
            except UaError:
                # log and keep reading
                self.logger.exception("Protocol Error")
        self.logger.info("Thread ended")

    def _receive(self):
        """
        Read one message from the connection and dispatch it to the
        future waiting for it.
        """
        msg = self._connection.receive_from_socket(self._socket)
        if msg is None:
            return
        elif isinstance(msg, ua.Message):
            self._call_callback(msg.request_id(), msg.body())
        elif isinstance(msg, ua.Acknowledge):
            # send_hello() registers its future under id 0
            self._call_callback(0, msg)
        elif isinstance(msg, ua.ErrorMessage):
            self.logger.warning("Received an error: %s", msg)
        else:
            # format explicitly: exception arguments are not interpolated
            raise ua.UaError("Unsupported message type: {0}".format(msg))

    def _call_callback(self, request_id, body):
        """
        Resolve the future registered for request_id with body.
        Raises ua.UaError when no future is registered for that id.
        """
        with self._lock:
            future = self._callbackmap.pop(request_id, None)
            if future is None:
                raise ua.UaError(
                    "No future object found for request: {0}, callbacks in list are {1}"
                    .format(request_id, self._callbackmap.keys())
                )
        # resolved outside the lock
        future.set_result(body)

    def _create_request_header(self, timeout=1000):
        """
        Build a RequestHeader carrying the current authentication token,
        the next request handle and timeout as TimeoutHint.
        """
        hdr = ua.RequestHeader()
        hdr.AuthenticationToken = self.authentication_token
        self._request_handle += 1
        hdr.RequestHandle = self._request_handle
        hdr.TimeoutHint = timeout
        return hdr

    def connect_socket(self, host, port):
        """
        connect to server socket and start receiving thread
        """
        self.logger.info("opening connection")
        sock = socket.create_connection((host, port))
        # nodelay necessary to avoid packing in one frame, some servers do not like it
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        self._socket = ua.utils.SocketWrapper(sock)
        self.start()

    def disconnect_socket(self):
        """
        Shut down and close the socket, then wait for the receiving
        thread to terminate.
        """
        self.logger.info("Request to close socket received")
        self._do_stop = True
        try:
            self._socket.socket.shutdown(socket.SHUT_RDWR)
        finally:
            # Always release the socket, even when shutdown() raises;
            # the error from shutdown() still propagates.
            self._socket.socket.close()
        self.logger.info("Socket closed, waiting for receiver thread to terminate...")
        if self._thread and self._thread.is_alive():
            self._thread.join()
        self.logger.info("Done closing socket: Receiving thread terminated, socket disconnected")

    def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
        """
        Send a Hello message for url and wait (at most self.timeout)
        for the Acknowledge, which is returned.
        """
        hello = ua.Hello()
        hello.EndpointUrl = url
        hello.MaxMessageSize = max_messagesize
        hello.MaxChunkCount = max_chunkcount
        future = Future()
        with self._lock:
            # the Acknowledge is dispatched to id 0, see _receive()
            self._callbackmap[0] = future
        binmsg = uatcp_to_binary(ua.MessageType.Hello, hello)
        self._socket.write(binmsg)
        ack = future.result(self.timeout)
        return ack

    def open_secure_channel(self, params):
        """
        Send an OpenSecureChannelRequest with params, install the
        returned channel on the connection and return response.Parameters.
        """
        self.logger.info("open_secure_channel")
        request = ua.OpenSecureChannelRequest()
        request.Parameters = params
        future = self._send_request(request, message_type=ua.MessageType.SecureOpen)

        # FIXME: we have a race condition here
        # we can get a packet with the new token id before we reach to store it..
        response = struct_from_binary(ua.OpenSecureChannelResponse, future.result(self.timeout))
        response.ResponseHeader.ServiceResult.check()
        self._connection.set_channel(response.Parameters)
        return response.Parameters

    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket in most servers, so be prepare to reconnect.
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close
        socket
        """
        self.logger.info("close_secure_channel")
        request = ua.CloseSecureChannelRequest()
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
        with self._lock:
            # don't expect any more answers
            future.cancel()
            self._callbackmap.clear()

        # some servers send a response here, most do not ... so we ignore
196
197
198 1
class UaClient(object):
199
200
    """
201
    low level OPC-UA client.
202
203
    It implements (almost) all methods defined in opcua spec
204
    taking in argument the structures defined in opcua spec.
205
206
    In this Python implementation  most of the structures are defined in
207
    uaprotocol_auto.py and uaprotocol_hand.py available under opcua.ua
208
    """
209
210 1
    def __init__(self, timeout=1):
        """
        Prepare a client that is not connected yet.

        timeout is stored and later passed to the socket client created
        in connect_socket.
        """
        self.logger = logging.getLogger(__name__)
        # subscription id -> publish callback;
        # _publishcallbacks should be accessed in recv thread only
        self._publishcallbacks = {}
        self._timeout = timeout
        # no socket client until connect_socket is called
        self._uasocket = None
        self.security_policy = ua.SecurityPolicy()
217
218 1
    def set_security(self, policy):
219 1
        self.security_policy = policy
220
221 1
    def connect_socket(self, host, port):
        """
        Create a new UASocketClient with the stored timeout and security
        policy, keep it as the active socket client and connect it to
        host/port (which also starts its receiving thread).
        """
        client = UASocketClient(self._timeout, security_policy=self.security_policy)
        self._uasocket = client
        return client.connect_socket(host, port)
227
228 1
    def disconnect_socket(self):
229 1
        return self._uasocket.disconnect_socket()
230
231 1
    def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
232 1
        return self._uasocket.send_hello(url, max_messagesize, max_chunkcount)
233
234 1
    def open_secure_channel(self, params):
235 1
        return self._uasocket.open_secure_channel(params)
236
237 1
    def close_secure_channel(self):
238
        """
239
        close secure channel. It seems to trigger a shutdown of socket
240
        in most servers, so be prepare to reconnect
241
        """
242 1
        return self._uasocket.close_secure_channel()
243
244 1
    def create_session(self, parameters):
        """
        Send a CreateSessionRequest with parameters and return the
        Parameters of the decoded response.

        The AuthenticationToken from the response is stored on the
        socket client. ServiceResult.check() is called on the response
        header before the token is stored.
        """
        self.logger.info("create_session")
        req = ua.CreateSessionRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.CreateSessionResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        session = resp.Parameters
        self._uasocket.authentication_token = session.AuthenticationToken
        return resp.Parameters
254
255 1
    def activate_session(self, parameters):
        """
        Send an ActivateSessionRequest with parameters and return the
        Parameters of the decoded response, after calling
        ServiceResult.check() on its header.
        """
        self.logger.info("activate_session")
        req = ua.ActivateSessionRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.ActivateSessionResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters
264
265 1
    def close_session(self, deletesubscriptions):
        """
        Send a CloseSessionRequest; deletesubscriptions is written to the
        request's DeleteSubscriptions field. Returns nothing.

        A BadSessionClosed result is ignored on purpose (see below).
        """
        self.logger.info("close_session")
        req = ua.CloseSessionRequest()
        req.DeleteSubscriptions = deletesubscriptions
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.CloseSessionResponse, raw)
        status = resp.ResponseHeader.ServiceResult
        try:
            status.check()
        except BadSessionClosed:
            # Closing the session while publish requests are still open
            # leads to BadSessionClosed responses, so it is simply ignored.
            # The alternative would be to make sure no publish request is
            # in flight when the session gets closed.
            pass
279
280 1
    def browse(self, parameters):
        """
        Send a BrowseRequest with parameters and return the Results of
        the decoded response, after calling ServiceResult.check() on its
        header.
        """
        self.logger.info("browse")
        req = ua.BrowseRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.BrowseResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
289
290 1
    def browse_next(self, parameters):
        """
        Send a BrowseNextRequest with parameters and return
        Parameters.Results of the decoded response, after calling
        ServiceResult.check() on its header.
        """
        self.logger.info("browse next")
        req = ua.BrowseNextRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.BrowseNextResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters.Results
299
300 1
    def read(self, parameters):
        """
        Send a ReadRequest with parameters and return the Results of the
        decoded response.

        Good results for the NodeClass attribute are converted to
        ua.NodeClass, and good ValueRank results in the range -3..4 to
        ua.ValueRank, matching results to parameters.NodesToRead by
        position.
        """
        self.logger.info("read")
        req = ua.ReadRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.ReadResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        # cast to Enum the attributes that need it
        for position, wanted in enumerate(parameters.NodesToRead):
            if wanted.AttributeId == ua.AttributeIds.NodeClass:
                result = resp.Results[position]
                if result.StatusCode.is_good():
                    result.Value.Value = ua.NodeClass(result.Value.Value)
            elif wanted.AttributeId == ua.AttributeIds.ValueRank:
                result = resp.Results[position]
                # other values are left as plain numbers
                if result.StatusCode.is_good() and result.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
                    result.Value.Value = ua.ValueRank(result.Value.Value)
        return resp.Results
319
320 1
    def write(self, params):
        """
        Send a WriteRequest with params and return the Results of the
        decoded response, after calling ServiceResult.check() on its
        header.
        """
        # was logged as "read", which made write calls look like reads
        self.logger.info("write")
        request = ua.WriteRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = struct_from_binary(ua.WriteResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results
329
330 1
    def get_endpoints(self, params):
        """
        Send a GetEndpointsRequest with params and return the Endpoints
        of the decoded response, after calling ServiceResult.check() on
        its header.
        """
        self.logger.info("get_endpoint")
        req = ua.GetEndpointsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.GetEndpointsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Endpoints
339
340 1
    def find_servers(self, params):
        """
        Send a FindServersRequest with params and return the Servers of
        the decoded response, after calling ServiceResult.check() on its
        header.
        """
        self.logger.info("find_servers")
        req = ua.FindServersRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.FindServersResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Servers
349
350 1
    def find_servers_on_network(self, params):
        """
        Send a FindServersOnNetworkRequest with params and return the
        Parameters of the decoded response, after calling
        ServiceResult.check() on its header.
        """
        self.logger.info("find_servers_on_network")
        req = ua.FindServersOnNetworkRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.FindServersOnNetworkResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters
359
360 1
    def register_server(self, registered_server):
        """
        Send a RegisterServerRequest whose Server field is
        registered_server and call ServiceResult.check() on the response
        header. Returns nothing.
        """
        self.logger.info("register_server")
        req = ua.RegisterServerRequest()
        req.Server = registered_server
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.RegisterServerResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        # this service has no result to hand back
369
370 1
    def register_server2(self, params):
        """
        Send a RegisterServer2Request with params and return the
        ConfigurationResults of the decoded response, after calling
        ServiceResult.check() on its header.
        """
        self.logger.info("register_server2")
        req = ua.RegisterServer2Request()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.RegisterServer2Response, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.ConfigurationResults
379
380 1
    def translate_browsepaths_to_nodeids(self, browsepaths):
        """
        Send a TranslateBrowsePathsToNodeIdsRequest whose
        Parameters.BrowsePaths is browsepaths and return the Results of
        the decoded response, after calling ServiceResult.check() on its
        header.
        """
        self.logger.info("translate_browsepath_to_nodeid")
        req = ua.TranslateBrowsePathsToNodeIdsRequest()
        req.Parameters.BrowsePaths = browsepaths
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
389
390 1
    def create_subscription(self, params, callback):
        """
        Send a CreateSubscriptionRequest with params and wait (at most
        the client timeout) for the value that
        _create_subscription_callback puts into the result future.

        callback is forwarded to _create_subscription_callback as the
        publish callback for the new subscription.
        """
        self.logger.info("create_subscription")
        req = ua.CreateSubscriptionRequest()
        req.Parameters = params
        pending = Future()
        on_answer = partial(self._create_subscription_callback, callback, pending)
        # with a callback given, send_request does not wait for the answer
        self._uasocket.send_request(req, on_answer)
        return pending.result(self._timeout)
398
399 1
    def _create_subscription_callback(self, pub_callback, resp_fut, data_fut):
400 1
        self.logger.info("_create_subscription_callback")
401 1
        data = data_fut.result()
402 1
        response = struct_from_binary(ua.CreateSubscriptionResponse, data)
403 1
        self.logger.debug(response)
404 1
        response.ResponseHeader.ServiceResult.check()
405 1
        self._publishcallbacks[response.Parameters.SubscriptionId] = pub_callback
406 1
        resp_fut.set_result(response.Parameters)
407
408 1
    def delete_subscriptions(self, subscriptionids):
        """
        Send a DeleteSubscriptionsRequest for subscriptionids and wait
        (at most the client timeout) for the value that
        _delete_subscriptions_callback puts into the result future.
        """
        self.logger.info("delete_subscription")
        req = ua.DeleteSubscriptionsRequest()
        req.Parameters.SubscriptionIds = subscriptionids
        pending = Future()
        on_answer = partial(self._delete_subscriptions_callback, subscriptionids, pending)
        # with a callback given, send_request does not wait for the answer
        self._uasocket.send_request(req, on_answer)
        return pending.result(self._timeout)
416
417 1
    def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut):
418 1
        self.logger.info("_delete_subscriptions_callback")
419 1
        data = data_fut.result()
420 1
        response = struct_from_binary(ua.DeleteSubscriptionsResponse, data)
421 1
        self.logger.debug(response)
422 1
        response.ResponseHeader.ServiceResult.check()
423 1
        for sid in subscriptionids:
424 1
            self._publishcallbacks.pop(sid)
425 1
        resp_fut.set_result(response.Results)
426
427 1
    def publish(self, acks=None):
        """
        Send a PublishRequest acknowledging acks (nothing when omitted).

        The answer is not awaited here; it is delivered to
        _call_publish_callback. The request is sent with timeout=0.
        """
        self.logger.info("publish")
        acknowledgements = [] if acks is None else acks
        req = ua.PublishRequest()
        req.Parameters.SubscriptionAcknowledgements = acknowledgements
        self._uasocket.send_request(req, self._call_publish_callback, timeout=0)
434
435 1
    def _call_publish_callback(self, future):
        """
        Done-callback for a publish request.

        Validates and decodes the answer held by future, then passes
        response.Parameters to the callback registered for its
        SubscriptionId. Exceptions raised by that user callback are
        logged and not propagated.
        """
        self.logger.info("call_publish_callback")
        data = future.result()

        # check if answer looks ok
        try:
            self._uasocket.check_answer(data, "while waiting for publish response")
        except BadTimeout:  # Spec Part 4, 7.28
            self.publish()
            return
        except BadNoSubscription:  # Spec Part 5, 13.8.1
            # BadNoSubscription is expected after deleting the last subscription.
            #
            # We should therefore also check for len(self._publishcallbacks) == 0, but
            # this gets us into trouble if a Publish response arrives before the
            # DeleteSubscription response.
            #
            # We could remove the callback already when sending the DeleteSubscription request,
            # but there are some legitimate reasons to keep them around, such as when the server
            # responds with "BadTimeout" and we should try again later instead of just removing
            # the subscription client-side.
            #
            # There are a variety of ways to act correctly, but the most practical solution seems
            # to be to just ignore any BadNoSubscription responses.
            self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
            return

        # parse publish response
        try:
            response = struct_from_binary(ua.PublishResponse, data)
            self.logger.debug(response)
        except Exception:
            # INFO: catching the exception here might be obsolete because we already
            #       catch BadTimeout above. However, it's not really clear what this code
            #       does so it stays in, doesn't seem to hurt.
            self.logger.exception("Error parsing notificatipn from server")
            self.publish([])  # send publish request to server so he does stop sending notifications
            return

        # look for callback
        try:
            callback = self._publishcallbacks[response.Parameters.SubscriptionId]
        except KeyError:
            self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
            return

        # do callback
        try:
            callback(response.Parameters)
        except Exception:  # we call client code, catch everything!
            # the message has a %s placeholder, so supply the callback
            # instead of logging a literal "%s"
            self.logger.exception("Exception while calling user callback: %s", callback)
486
487 1
    def create_monitored_items(self, params):
        """
        Send a CreateMonitoredItemsRequest with params and return the
        Results of the decoded response, after calling
        ServiceResult.check() on its header.
        """
        self.logger.info("create_monitored_items")
        req = ua.CreateMonitoredItemsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.CreateMonitoredItemsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
496
497 1
    def delete_monitored_items(self, params):
        """
        Send a DeleteMonitoredItemsRequest with params and return the
        Results of the decoded response, after calling
        ServiceResult.check() on its header.
        """
        self.logger.info("delete_monitored_items")
        req = ua.DeleteMonitoredItemsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.DeleteMonitoredItemsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
506
507 1
    def add_nodes(self, nodestoadd):
        """
        Send an AddNodesRequest whose Parameters.NodesToAdd is nodestoadd
        and return the Results of the decoded response, after calling
        ServiceResult.check() on its header.
        """
        self.logger.info("add_nodes")
        req = ua.AddNodesRequest()
        req.Parameters.NodesToAdd = nodestoadd
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.AddNodesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
516
517 1
    def add_references(self, refs):
        """
        Send an AddReferencesRequest whose Parameters.ReferencesToAdd is
        refs and return the Results of the decoded response, after
        calling ServiceResult.check() on its header.
        """
        self.logger.info("add_references")
        req = ua.AddReferencesRequest()
        req.Parameters.ReferencesToAdd = refs
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.AddReferencesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
526
527 1
    def delete_references(self, refs):
        """
        Send a DeleteReferencesRequest whose
        Parameters.ReferencesToDelete is refs and return
        Parameters.Results of the decoded response, after calling
        ServiceResult.check() on its header.
        """
        self.logger.info("delete")
        req = ua.DeleteReferencesRequest()
        req.Parameters.ReferencesToDelete = refs
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.DeleteReferencesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters.Results
536
537 1
    def delete_nodes(self, params):
        """
        Send a DeleteNodesRequest with params and return the Results of
        the decoded response, after calling ServiceResult.check() on its
        header.
        """
        self.logger.info("delete_nodes")
        req = ua.DeleteNodesRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.DeleteNodesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
546
547 1
    def call(self, methodstocall):
        """
        Send a CallRequest whose Parameters.MethodsToCall is
        methodstocall and return the Results of the decoded response,
        after calling ServiceResult.check() on its header.
        """
        req = ua.CallRequest()
        req.Parameters.MethodsToCall = methodstocall
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.CallResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
555
556 1
    def history_read(self, params):
        """
        Send a HistoryReadRequest with params and return the Results of
        the decoded response, after calling ServiceResult.check() on its
        header.
        """
        self.logger.info("history_read")
        req = ua.HistoryReadRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.HistoryReadResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
565
566 1
    def modify_monitored_items(self, params):
        """
        Send a ModifyMonitoredItemsRequest with params and return the
        Results of the decoded response, after calling
        ServiceResult.check() on its header.
        """
        self.logger.info("modify_monitored_items")
        req = ua.ModifyMonitoredItemsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.ModifyMonitoredItemsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
575