Completed
Push — master ( 5ed66a...f88dce )
by Olivier
04:33 queued 23s
created

UaClient.delete_monitored_items()   A

Complexity

Conditions 1

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 9
ccs 8
cts 8
cp 1
crap 1
rs 9.6666
c 0
b 0
f 0
1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9 1
from functools import partial
10
11 1
from opcua import ua
12 1
from opcua.common import utils
13
from opcua.common.uaerrors import UaError, BadTimeout
14
15 1
16
class UASocketClient(object):
17
    """
18
    handle socket connection and send ua messages
19
    timeout is the timeout used while waiting for an ua answer from server
20 1
    """
21 1
    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy()):
22 1
        self.logger = logging.getLogger(__name__ + ".Socket")
23 1
        self._thread = None
24 1
        self._lock = Lock()
25 1
        self.timeout = timeout
26 1
        self._socket = None
27 1
        self._do_stop = False
28 1
        self.authentication_token = ua.NodeId()
29 1
        self._request_id = 0
30 1
        self._request_handle = 0
31 1
        self._callbackmap = {}
32
        self._connection = ua.SecureConnection(security_policy)
33 1
34
    def start(self):
35
        """
36
        Start receiving thread.
37
        this is called automatically in connect and
38
        should not be necessary to call directly
39 1
        """
40 1
        self._thread = Thread(target=self._run)
41
        self._thread.start()
42 1
43
    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
44
        """
45
        send request to server, lower-level method
46
        timeout is the timeout written in ua header
47
        returns future
48 1
        """
49 1
        with self._lock:
50 1
            request.RequestHeader = self._create_request_header(timeout)
51 1
            self.logger.debug("Sending: %s", request)
52 1
            try:
53
                binreq = request.to_binary()
54
            except:
55
                # reset reqeust handle if any error
56
                # see self._create_request_header
57
                self._request_handle -= 1
58 1
                raise
59 1
            self._request_id += 1
60 1
            future = Future()
61 1
            if callback:
62 1
                future.add_done_callback(callback)
63 1
            self._callbackmap[self._request_id] = future
64 1
            msg = self._connection.message_to_binary(binreq, message_type=message_type, request_id=self._request_id)
65 1
            self._socket.write(msg)
66
        return future
67 1
68
    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
69
        """
70
        send request to server.
71
        timeout is the timeout written in ua header
72
        returns response object if no callback is provided
73 1
        """
74 1
        future = self._send_request(request, callback, timeout, message_type)
75 1
        if not callback:
76 1
            data = future.result(self.timeout)
77 1
            self.check_answer(data, " in response to " + request.__class__.__name__)
78
            return data
79 1
80 1
    def check_answer(self, data, context):
81 1
        data = data.copy()
82 1
        typeid = ua.NodeId.from_binary(data)
83 1
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
84 1
            self.logger.warning("ServiceFault from server received %s", context)
85 1
            hdr = ua.ResponseHeader.from_binary(data)
86
            hdr.ServiceResult.check()
87 1
            return False
88
        return True
89 1
90 1
    def _run(self):
91 1
        self.logger.info("Thread started")
92 1
        while not self._do_stop:
93 1
            try:
94 1
                self._receive()
95 1
            except ua.utils.SocketClosedException:
96 1
                self.logger.info("Socket has closed connection")
97 1
                break
98
            except UaError:
99 1
                self.logger.exception("Protocol Error")
100 1
        self.logger.info("Thread ended")
101 1
102
    def _receive(self):
103 1
        msg = self._connection.receive_from_socket(self._socket)
104 1
        if msg is None:
105 1
            return
106 1
        elif isinstance(msg, ua.Message):
107
            self._call_callback(msg.request_id(), msg.body())
108
        elif isinstance(msg, ua.Acknowledge):
109
            self._call_callback(0, msg)
110
        elif isinstance(msg, ua.ErrorMessage):
111
            self.logger.warning("Received an error: %s", msg)
112 1
        else:
113 1
            raise ua.UaError("Unsupported message type: %s", msg)
114 1
115 1
    def _call_callback(self, request_id, body):
116
        with self._lock:
117 1
            future = self._callbackmap.pop(request_id, None)
118
            if future is None:
119 1
                raise ua.UaError("No future object found for request: {}, callbacks in list are {}".format(request_id, self._callbackmap.keys()))
120 1
        future.set_result(body)
121 1
122 1
    def _create_request_header(self, timeout=1000):
123 1
        hdr = ua.RequestHeader()
124 1
        hdr.AuthenticationToken = self.authentication_token
125 1
        self._request_handle += 1
126
        hdr.RequestHandle = self._request_handle
127 1
        hdr.TimeoutHint = timeout
128
        return hdr
129
130
    def connect_socket(self, host, port):
131 1
        """
132 1
        connect to server socket and start receiving thread
133 1
        """
134 1
        self.logger.info("opening connection")
135 1
        sock = socket.create_connection((host, port))
136
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # nodelay ncessary to avoid packing in one frame, some servers do not like it
137 1
        self._socket = utils.SocketWrapper(sock)
138 1
        self.start()
139 1
140 1
    def disconnect_socket(self):
141 1
        self.logger.info("stop request")
142
        self._do_stop = True
143 1
        self._socket.socket.shutdown(socket.SHUT_WR)
144 1
        self._socket.socket.close()
145 1
146 1
    def send_hello(self, url):
147 1
        hello = ua.Hello()
148 1
        hello.EndpointUrl = url
149 1
        future = Future()
150 1
        with self._lock:
151 1
            self._callbackmap[0] = future
152 1
        binmsg = self._connection.tcp_to_binary(ua.MessageType.Hello, hello)
153
        self._socket.write(binmsg)
154 1
        ack = future.result(self.timeout)
155 1
        return ack
156 1
157 1
    def open_secure_channel(self, params):
158 1
        self.logger.info("open_secure_channel")
159
        request = ua.OpenSecureChannelRequest()
160 1
        request.Parameters = params
161 1
        future = self._send_request(request, message_type=ua.MessageType.SecureOpen)
162 1
163 1
        response = ua.OpenSecureChannelResponse.from_binary(future.result(self.timeout))
164
        response.ResponseHeader.ServiceResult.check()
165 1
        self._connection.set_channel(response.Parameters)
166
        return response.Parameters
167
168
    def close_secure_channel(self):
169
        """
170
        close secure channel. It seems to trigger a shutdown of socket
171 1
        in most servers, so be prepare to reconnect.
172 1
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket
173 1
        """
174 1
        self.logger.info("close_secure_channel")
175
        request = ua.CloseSecureChannelRequest()
176 1
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
177 1
        with self._lock:
178
            # don't expect any more answers
179
            future.cancel()
180
            self._callbackmap.clear()
181
182 1
        # some servers send a response here, most do not ... so we ignore
183
184
185
class UaClient(object):
186
187
    """
188
    low level OPC-UA client.
189
190
    It implements (almost) all methods defined in opcua spec
191
    taking in argument the structures defined in opcua spec.
192
193
    In this Python implementation  most of the structures are defined in
194 1
    uaprotocol_auto.py and uaprotocol_hand.py available under opcua.ua
195 1
    """
196
197 1
    def __init__(self, timeout=1):
198 1
        self.logger = logging.getLogger(__name__)
199 1
        # _publishcallbacks should be accessed in recv thread only
200 1
        self._publishcallbacks = {}
201
        self._timeout = timeout
202 1
        self._uasocket = None
203 1
        self._security_policy = ua.SecurityPolicy()
204
205 1
    def set_security(self, policy):
206
        self._security_policy = policy
207
208
    def connect_socket(self, host, port):
209 1
        """
210 1
        connect to server socket and start receiving thread
211
        """
212 1
        self._uasocket = UASocketClient(self._timeout, security_policy=self._security_policy)
213 1
        return self._uasocket.connect_socket(host, port)
214
215 1
    def disconnect_socket(self):
216 1
        return self._uasocket.disconnect_socket()
217
218 1
    def send_hello(self, url):
219 1
        return self._uasocket.send_hello(url)
220
221 1
    def open_secure_channel(self, params):
222
        return self._uasocket.open_secure_channel(params)
223
224
    def close_secure_channel(self):
225
        """
226 1
        close secure channel. It seems to trigger a shutdown of socket
227
        in most servers, so be prepare to reconnect
228 1
        """
229 1
        return self._uasocket.close_secure_channel()
230 1
231 1
    def create_session(self, parameters):
232 1
        self.logger.info("create_session")
233 1
        request = ua.CreateSessionRequest()
234 1
        request.Parameters = parameters
235 1
        data = self._uasocket.send_request(request)
236 1
        response = ua.CreateSessionResponse.from_binary(data)
237 1
        self.logger.debug(response)
238
        response.ResponseHeader.ServiceResult.check()
239 1
        self._uasocket.authentication_token = response.Parameters.AuthenticationToken
240 1
        return response.Parameters
241 1
242 1
    def activate_session(self, parameters):
243 1
        self.logger.info("activate_session")
244 1
        request = ua.ActivateSessionRequest()
245 1
        request.Parameters = parameters
246 1
        data = self._uasocket.send_request(request)
247 1
        response = ua.ActivateSessionResponse.from_binary(data)
248
        self.logger.debug(response)
249 1
        response.ResponseHeader.ServiceResult.check()
250 1
        return response.Parameters
251 1
252 1
    def close_session(self, deletesubscriptions):
253 1
        self.logger.info("close_session")
254 1
        request = ua.CloseSessionRequest()
255
        request.DeleteSubscriptions = deletesubscriptions
256
        data = self._uasocket.send_request(request)
257 1
        ua.CloseSessionResponse.from_binary(data)
258 1
        # response.ResponseHeader.ServiceResult.check() #disabled, it seems we sent wrong session Id, but where is the sessionId supposed to be sent???
259 1
260 1
    def browse(self, parameters):
261 1
        self.logger.info("browse")
262 1
        request = ua.BrowseRequest()
263 1
        request.Parameters = parameters
264 1
        data = self._uasocket.send_request(request)
265 1
        response = ua.BrowseResponse.from_binary(data)
266
        self.logger.debug(response)
267 1
        response.ResponseHeader.ServiceResult.check()
268 1
        return response.Results
269 1
270 1
    def read(self, parameters):
271 1
        self.logger.info("read")
272 1
        request = ua.ReadRequest()
273 1
        request.Parameters = parameters
274 1
        data = self._uasocket.send_request(request)
275
        response = ua.ReadResponse.from_binary(data)
276 1
        self.logger.debug(response)
277 1
        response.ResponseHeader.ServiceResult.check()
278
        # cast to Enum attributes that need to
279
        for idx, rv in enumerate(parameters.NodesToRead):
280
            if rv.AttributeId == ua.AttributeIds.NodeClass:
281 1
                dv = response.Results[idx]
282 1
                if dv.StatusCode.is_good():
283 1
                    dv.Value.Value = ua.NodeClass(dv.Value.Value)
284 1
            elif rv.AttributeId == ua.AttributeIds.ValueRank:
285 1
                dv = response.Results[idx]
286
                if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
287 1
                    dv.Value.Value = ua.ValueRank(dv.Value.Value)
288 1
        return response.Results
289 1
290 1
    def write(self, params):
291 1
        self.logger.info("read")
292 1
        request = ua.WriteRequest()
293 1
        request.Parameters = params
294 1
        data = self._uasocket.send_request(request)
295 1
        response = ua.WriteResponse.from_binary(data)
296
        self.logger.debug(response)
297 1
        response.ResponseHeader.ServiceResult.check()
298 1
        return response.Results
299 1
300 1
    def get_endpoints(self, params):
301 1
        self.logger.info("get_endpoint")
302 1
        request = ua.GetEndpointsRequest()
303 1
        request.Parameters = params
304 1
        data = self._uasocket.send_request(request)
305 1
        response = ua.GetEndpointsResponse.from_binary(data)
306
        self.logger.debug(response)
307 1
        response.ResponseHeader.ServiceResult.check()
308 1
        return response.Endpoints
309 1
310 1
    def find_servers(self, params):
311 1
        self.logger.info("find_servers")
312 1
        request = ua.FindServersRequest()
313 1
        request.Parameters = params
314 1
        data = self._uasocket.send_request(request)
315 1
        response = ua.FindServersResponse.from_binary(data)
316
        self.logger.debug(response)
317 1
        response.ResponseHeader.ServiceResult.check()
318
        return response.Servers
319
320
    def find_servers_on_network(self, params):
321
        self.logger.info("find_servers_on_network")
322
        request = ua.FindServersOnNetworkRequest()
323
        request.Parameters = params
324
        data = self._uasocket.send_request(request)
325
        response = ua.FindServersOnNetworkResponse.from_binary(data)
326
        self.logger.debug(response)
327 1
        response.ResponseHeader.ServiceResult.check()
328 1
        return response.Parameters
329 1
330 1
    def register_server(self, registered_server):
331 1
        self.logger.info("register_server")
332 1
        request = ua.RegisterServerRequest()
333 1
        request.Server = registered_server
334 1
        data = self._uasocket.send_request(request)
335
        response = ua.RegisterServerResponse.from_binary(data)
336
        self.logger.debug(response)
337 1
        response.ResponseHeader.ServiceResult.check()
338
        # nothing to return for this service
339
340
    def register_server2(self, params):
341
        self.logger.info("register_server2")
342
        request = ua.RegisterServer2Request()
343
        request.Parameters = params
344
        data = self._uasocket.send_request(request)
345
        response = ua.RegisterServer2Response.from_binary(data)
346
        self.logger.debug(response)
347 1
        response.ResponseHeader.ServiceResult.check()
348 1
        return response.ConfigurationResults
349 1
350 1
    def translate_browsepaths_to_nodeids(self, browsepaths):
351 1
        self.logger.info("translate_browsepath_to_nodeid")
352 1
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
353 1
        request.Parameters.BrowsePaths = browsepaths
354 1
        data = self._uasocket.send_request(request)
355 1
        response = ua.TranslateBrowsePathsToNodeIdsResponse.from_binary(data)
356
        self.logger.debug(response)
357 1
        response.ResponseHeader.ServiceResult.check()
358 1
        return response.Results
359 1
360 1
    def create_subscription(self, params, callback):
361 1
        self.logger.info("create_subscription")
362 1
        request = ua.CreateSubscriptionRequest()
363 1
        request.Parameters = params
364 1
        resp_fut = Future()
365
        mycallbak = partial(self._create_subscription_callback, callback, resp_fut)
366 1
        self._uasocket.send_request(request, mycallbak)
367 1
        return resp_fut.result(self._timeout)
368 1
369 1
    def _create_subscription_callback(self, pub_callback, resp_fut, data_fut):
370 1
        self.logger.info("_create_subscription_callback")
371 1
        data = data_fut.result()
372 1
        response = ua.CreateSubscriptionResponse.from_binary(data)
373 1
        self.logger.debug(response)
374
        response.ResponseHeader.ServiceResult.check()
375 1
        self._publishcallbacks[response.Parameters.SubscriptionId] = pub_callback
376 1
        resp_fut.set_result(response.Parameters)
377 1
378 1
    def delete_subscriptions(self, subscriptionids):
379 1
        self.logger.info("delete_subscription")
380 1
        request = ua.DeleteSubscriptionsRequest()
381 1
        request.Parameters.SubscriptionIds = subscriptionids
382 1
        resp_fut = Future()
383
        mycallbak = partial(self._delete_subscriptions_callback, subscriptionids, resp_fut)
384 1
        self._uasocket.send_request(request, mycallbak)
385 1
        return resp_fut.result(self._timeout)
386 1
387 1
    def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut):
388 1
        self.logger.info("_delete_subscriptions_callback")
389 1
        data = data_fut.result()
390 1
        response = ua.DeleteSubscriptionsResponse.from_binary(data)
391 1
        self.logger.debug(response)
392 1
        response.ResponseHeader.ServiceResult.check()
393
        for sid in subscriptionids:
394 1
            self._publishcallbacks.pop(sid)
395 1
        resp_fut.set_result(response.Results)
396 1
397 1
    def publish(self, acks=None):
398 1
        self.logger.info("publish")
399 1
        if acks is None:
400 1
            acks = []
401
        request = ua.PublishRequest()
402 1
        request.Parameters.SubscriptionAcknowledgements = acks
403 1
        # timeout could be set to 0 (= no timeout) but some servers do not support it
404 1
        self._uasocket.send_request(request, self._call_publish_callback, timeout=int(9e8)) # 250 days
405 1
406 1
    def _call_publish_callback(self, future):
407 1
        self.logger.info("call_publish_callback")
408 1
        data = future.result()
409
410
        try:
411
            self._uasocket.check_answer(data, "while waiting for publish response")
412
        except BadTimeout: # Spec Part 4, 7.28
413 1
            self.publish()
414
            return
415
416 1
        try:
417 1
            response = ua.PublishResponse.from_binary(data)
418 1
            self.logger.debug(response)
419
        except Exception:
420
            # INFO: catching the exception here might be obsolete because we already
421
            #       catch BadTimeout above. However, it's not really clear what this code
422 1
            #       does so it stays in, doesn't seem to hurt.
423 1
            self.logger.exception("Error parsing notificatipn from server")
424 1
            self.publish([]) #send publish request ot server so he does stop sending notifications
425 1
            return
426 1
427 1
        try:
428 1
            callback = self._publishcallbacks[response.Parameters.SubscriptionId]
429 1
        except KeyError:
430 1
            self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
431
            return
432 1
433 1
        try:
434 1
            callback(response.Parameters)
435 1
        except Exception:  # we call client code, catch everything!
436 1
            self.logger.exception("Exception while calling user callback: %s")
437 1
438 1
    def create_monitored_items(self, params):
439 1
        self.logger.info("create_monitored_items")
440 1
        request = ua.CreateMonitoredItemsRequest()
441
        request.Parameters = params
442 1
        data = self._uasocket.send_request(request)
443 1
        response = ua.CreateMonitoredItemsResponse.from_binary(data)
444 1
        self.logger.debug(response)
445 1
        response.ResponseHeader.ServiceResult.check()
446 1
        return response.Results
447 1
448 1
    def delete_monitored_items(self, params):
449 1
        self.logger.info("delete_monitored_items")
450 1
        request = ua.DeleteMonitoredItemsRequest()
451
        request.Parameters = params
452 1
        data = self._uasocket.send_request(request)
453 1
        response = ua.DeleteMonitoredItemsResponse.from_binary(data)
454 1
        self.logger.debug(response)
455 1
        response.ResponseHeader.ServiceResult.check()
456 1
        return response.Results
457 1
458 1
    def add_nodes(self, nodestoadd):
459 1
        self.logger.info("add_nodes")
460 1
        request = ua.AddNodesRequest()
461
        request.Parameters.NodesToAdd = nodestoadd
462 1
        data = self._uasocket.send_request(request)
463 1
        response = ua.AddNodesResponse.from_binary(data)
464 1
        self.logger.debug(response)
465 1
        response.ResponseHeader.ServiceResult.check()
466 1
        return response.Results
467 1
468 1
    def delete_nodes(self, nodestodelete):
469 1
        self.logger.info("delete_nodes")
470
        request = ua.DeleteNodesRequest()
471 1
        request.Parameters.NodesToDelete = nodestodelete
472
        data = self._uasocket.send_request(request)
473
        response = ua.DeleteNodesResponse.from_binary(data)
474
        self.logger.debug(response)
475
        response.ResponseHeader.ServiceResult.check()
476
        return response.Results
477
478
    def call(self, methodstocall):
479
        request = ua.CallRequest()
480
        request.Parameters.MethodsToCall = methodstocall
481
        data = self._uasocket.send_request(request)
482
        response = ua.CallResponse.from_binary(data)
483
        self.logger.debug(response)
484
        response.ResponseHeader.ServiceResult.check()
485
        return response.Results
486
487
    def history_read(self, params):
488
        self.logger.info("history_read")
489
        request = ua.HistoryReadRequest()
490
        request.Parameters = params
491
        data = self._uasocket.send_request(request)
492
        response = ua.HistoryReadResponse.from_binary(data)
493
        self.logger.debug(response)
494
        response.ResponseHeader.ServiceResult.check()
495
        return response.Results
496
497
    def modify_monitored_items(self, params):
498
        self.logger.info("modify_monitored_items")
499
        request = ua.ModifyMonitoredItemsRequest()
500
        request.Parameters = params
501
        data = self._uasocket.send_request(request)
502
        response = ua.ModifyMonitoredItemsResponse.from_binary(data)
503
        self.logger.debug(response)
504
        response.ResponseHeader.ServiceResult.check()
505
        return response.Results
506