Completed
Push — master ( 051888...eb5e78 )
by Olivier
04:06
created

UaClient   B

Complexity

Total Complexity 46

Size/Duplication

Total Lines 348
Duplicated Lines 0 %

Test Coverage

Coverage 85.71%

Importance

Changes 3
Bugs 0 Features 0
Metric Value
wmc 46
c 3
b 0
f 0
dl 0
loc 348
ccs 210
cts 245
cp 0.8571
rs 8.3999

How to fix   Complexity   

Complex Class

Complex classes like UaClient often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9 1
from functools import partial
10
11 1
from opcua import ua
12 1
from opcua.common import utils
13
from opcua.common.uaerrors import UaError, BadTimeout, BadNoSubscription, BadSessionClosed
14
15 1
16
class UASocketClient(object):
17
    """
18
    handle socket connection and send ua messages
19
    timeout is the timeout used while waiting for an ua answer from server
20 1
    """
21 1
    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy()):
22 1
        self.logger = logging.getLogger(__name__ + ".Socket")
23 1
        self._thread = None
24 1
        self._lock = Lock()
25 1
        self.timeout = timeout
26 1
        self._socket = None
27 1
        self._do_stop = False
28 1
        self.authentication_token = ua.NodeId()
29 1
        self._request_id = 0
30 1
        self._request_handle = 0
31 1
        self._callbackmap = {}
32
        self._connection = ua.SecureConnection(security_policy)
33 1
34
    def start(self):
35
        """
36
        Start receiving thread.
37
        this is called automatically in connect and
38
        should not be necessary to call directly
39 1
        """
40 1
        self._thread = Thread(target=self._run)
41
        self._thread.start()
42 1
43
    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
44
        """
45
        send request to server, lower-level method
46
        timeout is the timeout written in ua header
47
        returns future
48 1
        """
49 1
        with self._lock:
50 1
            request.RequestHeader = self._create_request_header(timeout)
51 1
            self.logger.debug("Sending: %s", request)
52 1
            try:
53
                binreq = request.to_binary()
54
            except:
55
                # reset reqeust handle if any error
56
                # see self._create_request_header
57
                self._request_handle -= 1
58 1
                raise
59 1
            self._request_id += 1
60 1
            future = Future()
61 1
            if callback:
62 1
                future.add_done_callback(callback)
63 1
            self._callbackmap[self._request_id] = future
64 1
            msg = self._connection.message_to_binary(binreq, message_type=message_type, request_id=self._request_id)
65 1
            self._socket.write(msg)
66
        return future
67 1
68
    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
69
        """
70
        send request to server.
71
        timeout is the timeout written in ua header
72
        returns response object if no callback is provided
73 1
        """
74 1
        future = self._send_request(request, callback, timeout, message_type)
75 1
        if not callback:
76 1
            data = future.result(self.timeout)
77 1
            self.check_answer(data, " in response to " + request.__class__.__name__)
78
            return data
79 1
80 1
    def check_answer(self, data, context):
81 1
        data = data.copy()
82 1
        typeid = ua.NodeId.from_binary(data)
83 1
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
84 1
            self.logger.warning("ServiceFault from server received %s", context)
85 1
            hdr = ua.ResponseHeader.from_binary(data)
86
            hdr.ServiceResult.check()
87 1
            return False
88
        return True
89 1
90 1
    def _run(self):
91 1
        self.logger.info("Thread started")
92 1
        while not self._do_stop:
93 1
            try:
94 1
                self._receive()
95 1
            except ua.utils.SocketClosedException:
96 1
                self.logger.info("Socket has closed connection")
97 1
                break
98
            except UaError:
99 1
                self.logger.exception("Protocol Error")
100 1
        self.logger.info("Thread ended")
101 1
102
    def _receive(self):
103 1
        msg = self._connection.receive_from_socket(self._socket)
104 1
        if msg is None:
105 1
            return
106 1
        elif isinstance(msg, ua.Message):
107
            self._call_callback(msg.request_id(), msg.body())
108
        elif isinstance(msg, ua.Acknowledge):
109
            self._call_callback(0, msg)
110
        elif isinstance(msg, ua.ErrorMessage):
111
            self.logger.warning("Received an error: %s", msg)
112 1
        else:
113 1
            raise ua.UaError("Unsupported message type: %s", msg)
114 1
115 1
    def _call_callback(self, request_id, body):
116
        with self._lock:
117 1
            future = self._callbackmap.pop(request_id, None)
118
            if future is None:
119 1
                raise ua.UaError("No future object found for request: {}, callbacks in list are {}".format(request_id, self._callbackmap.keys()))
120 1
        future.set_result(body)
121 1
122 1
    def _create_request_header(self, timeout=1000):
123 1
        hdr = ua.RequestHeader()
124 1
        hdr.AuthenticationToken = self.authentication_token
125 1
        self._request_handle += 1
126
        hdr.RequestHandle = self._request_handle
127 1
        hdr.TimeoutHint = timeout
128
        return hdr
129
130
    def connect_socket(self, host, port):
131 1
        """
132 1
        connect to server socket and start receiving thread
133 1
        """
134 1
        self.logger.info("opening connection")
135 1
        sock = socket.create_connection((host, port))
136
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # nodelay ncessary to avoid packing in one frame, some servers do not like it
137 1
        self._socket = utils.SocketWrapper(sock)
138 1
        self.start()
139 1
140 1
    def disconnect_socket(self):
141 1
        self.logger.info("stop request")
142
        self._do_stop = True
143 1
        self._socket.socket.shutdown(socket.SHUT_WR)
144 1
        self._socket.socket.close()
145 1
146 1
    def send_hello(self, url):
147 1
        hello = ua.Hello()
148 1
        hello.EndpointUrl = url
149 1
        future = Future()
150 1
        with self._lock:
151 1
            self._callbackmap[0] = future
152 1
        binmsg = self._connection.tcp_to_binary(ua.MessageType.Hello, hello)
153
        self._socket.write(binmsg)
154 1
        ack = future.result(self.timeout)
155 1
        return ack
156 1
157 1
    def open_secure_channel(self, params):
158 1
        self.logger.info("open_secure_channel")
159
        request = ua.OpenSecureChannelRequest()
160 1
        request.Parameters = params
161 1
        future = self._send_request(request, message_type=ua.MessageType.SecureOpen)
162 1
163 1
        response = ua.OpenSecureChannelResponse.from_binary(future.result(self.timeout))
164
        response.ResponseHeader.ServiceResult.check()
165 1
        self._connection.set_channel(response.Parameters)
166
        return response.Parameters
167
168
    def close_secure_channel(self):
169
        """
170
        close secure channel. It seems to trigger a shutdown of socket
171 1
        in most servers, so be prepare to reconnect.
172 1
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket
173 1
        """
174 1
        self.logger.info("close_secure_channel")
175
        request = ua.CloseSecureChannelRequest()
176 1
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
177 1
        with self._lock:
178
            # don't expect any more answers
179
            future.cancel()
180
            self._callbackmap.clear()
181
182 1
        # some servers send a response here, most do not ... so we ignore
183
184
185
class UaClient(object):
186
187
    """
188
    low level OPC-UA client.
189
190
    It implements (almost) all methods defined in opcua spec
191
    taking in argument the structures defined in opcua spec.
192
193
    In this Python implementation  most of the structures are defined in
194 1
    uaprotocol_auto.py and uaprotocol_hand.py available under opcua.ua
195 1
    """
196
197 1
    def __init__(self, timeout=1):
198 1
        self.logger = logging.getLogger(__name__)
199 1
        # _publishcallbacks should be accessed in recv thread only
200 1
        self._publishcallbacks = {}
201
        self._timeout = timeout
202 1
        self._uasocket = None
203 1
        self._security_policy = ua.SecurityPolicy()
204
205 1
    def set_security(self, policy):
206
        self._security_policy = policy
207
208
    def connect_socket(self, host, port):
209 1
        """
210 1
        connect to server socket and start receiving thread
211
        """
212 1
        self._uasocket = UASocketClient(self._timeout, security_policy=self._security_policy)
213 1
        return self._uasocket.connect_socket(host, port)
214
215 1
    def disconnect_socket(self):
216 1
        return self._uasocket.disconnect_socket()
217
218 1
    def send_hello(self, url):
219 1
        return self._uasocket.send_hello(url)
220
221 1
    def open_secure_channel(self, params):
222
        return self._uasocket.open_secure_channel(params)
223
224
    def close_secure_channel(self):
225
        """
226 1
        close secure channel. It seems to trigger a shutdown of socket
227
        in most servers, so be prepare to reconnect
228 1
        """
229 1
        return self._uasocket.close_secure_channel()
230 1
231 1
    def create_session(self, parameters):
232 1
        self.logger.info("create_session")
233 1
        request = ua.CreateSessionRequest()
234 1
        request.Parameters = parameters
235 1
        data = self._uasocket.send_request(request)
236 1
        response = ua.CreateSessionResponse.from_binary(data)
237 1
        self.logger.debug(response)
238
        response.ResponseHeader.ServiceResult.check()
239 1
        self._uasocket.authentication_token = response.Parameters.AuthenticationToken
240 1
        return response.Parameters
241 1
242 1
    def activate_session(self, parameters):
243 1
        self.logger.info("activate_session")
244 1
        request = ua.ActivateSessionRequest()
245 1
        request.Parameters = parameters
246 1
        data = self._uasocket.send_request(request)
247 1
        response = ua.ActivateSessionResponse.from_binary(data)
248
        self.logger.debug(response)
249 1
        response.ResponseHeader.ServiceResult.check()
250 1
        return response.Parameters
251 1
252 1
    def close_session(self, deletesubscriptions):
253 1
        self.logger.info("close_session")
254 1
        request = ua.CloseSessionRequest()
255
        request.DeleteSubscriptions = deletesubscriptions
256
        data = self._uasocket.send_request(request)
257 1
        response = ua.CloseSessionResponse.from_binary(data)
258 1
        try:
259 1
            response.ResponseHeader.ServiceResult.check()
260 1
        except BadSessionClosed:
261 1
            # Problem: closing the session with open publish requests leads to BadSessionClosed responses
262 1
            #          we can just ignore it therefore.
263 1
            #          Alternatively we could make sure that there are no publish requests in flight when
264 1
            #          closing the session.
265 1
            pass
266
267 1
    def browse(self, parameters):
268 1
        self.logger.info("browse")
269 1
        request = ua.BrowseRequest()
270 1
        request.Parameters = parameters
271 1
        data = self._uasocket.send_request(request)
272 1
        response = ua.BrowseResponse.from_binary(data)
273 1
        self.logger.debug(response)
274 1
        response.ResponseHeader.ServiceResult.check()
275
        return response.Results
276 1
277 1
    def read(self, parameters):
278
        self.logger.info("read")
279
        request = ua.ReadRequest()
280
        request.Parameters = parameters
281 1
        data = self._uasocket.send_request(request)
282 1
        response = ua.ReadResponse.from_binary(data)
283 1
        self.logger.debug(response)
284 1
        response.ResponseHeader.ServiceResult.check()
285 1
        # cast to Enum attributes that need to
286
        for idx, rv in enumerate(parameters.NodesToRead):
287 1
            if rv.AttributeId == ua.AttributeIds.NodeClass:
288 1
                dv = response.Results[idx]
289 1
                if dv.StatusCode.is_good():
290 1
                    dv.Value.Value = ua.NodeClass(dv.Value.Value)
291 1
            elif rv.AttributeId == ua.AttributeIds.ValueRank:
292 1
                dv = response.Results[idx]
293 1
                if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
294 1
                    dv.Value.Value = ua.ValueRank(dv.Value.Value)
295 1
        return response.Results
296
297 1
    def write(self, params):
298 1
        self.logger.info("read")
299 1
        request = ua.WriteRequest()
300 1
        request.Parameters = params
301 1
        data = self._uasocket.send_request(request)
302 1
        response = ua.WriteResponse.from_binary(data)
303 1
        self.logger.debug(response)
304 1
        response.ResponseHeader.ServiceResult.check()
305 1
        return response.Results
306
307 1
    def get_endpoints(self, params):
308 1
        self.logger.info("get_endpoint")
309 1
        request = ua.GetEndpointsRequest()
310 1
        request.Parameters = params
311 1
        data = self._uasocket.send_request(request)
312 1
        response = ua.GetEndpointsResponse.from_binary(data)
313 1
        self.logger.debug(response)
314 1
        response.ResponseHeader.ServiceResult.check()
315 1
        return response.Endpoints
316
317 1
    def find_servers(self, params):
318
        self.logger.info("find_servers")
319
        request = ua.FindServersRequest()
320
        request.Parameters = params
321
        data = self._uasocket.send_request(request)
322
        response = ua.FindServersResponse.from_binary(data)
323
        self.logger.debug(response)
324
        response.ResponseHeader.ServiceResult.check()
325
        return response.Servers
326
327 1
    def find_servers_on_network(self, params):
328 1
        self.logger.info("find_servers_on_network")
329 1
        request = ua.FindServersOnNetworkRequest()
330 1
        request.Parameters = params
331 1
        data = self._uasocket.send_request(request)
332 1
        response = ua.FindServersOnNetworkResponse.from_binary(data)
333 1
        self.logger.debug(response)
334 1
        response.ResponseHeader.ServiceResult.check()
335
        return response.Parameters
336
337 1
    def register_server(self, registered_server):
338
        self.logger.info("register_server")
339
        request = ua.RegisterServerRequest()
340
        request.Server = registered_server
341
        data = self._uasocket.send_request(request)
342
        response = ua.RegisterServerResponse.from_binary(data)
343
        self.logger.debug(response)
344
        response.ResponseHeader.ServiceResult.check()
345
        # nothing to return for this service
346
347 1
    def register_server2(self, params):
348 1
        self.logger.info("register_server2")
349 1
        request = ua.RegisterServer2Request()
350 1
        request.Parameters = params
351 1
        data = self._uasocket.send_request(request)
352 1
        response = ua.RegisterServer2Response.from_binary(data)
353 1
        self.logger.debug(response)
354 1
        response.ResponseHeader.ServiceResult.check()
355 1
        return response.ConfigurationResults
356
357 1
    def translate_browsepaths_to_nodeids(self, browsepaths):
358 1
        self.logger.info("translate_browsepath_to_nodeid")
359 1
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
360 1
        request.Parameters.BrowsePaths = browsepaths
361 1
        data = self._uasocket.send_request(request)
362 1
        response = ua.TranslateBrowsePathsToNodeIdsResponse.from_binary(data)
363 1
        self.logger.debug(response)
364 1
        response.ResponseHeader.ServiceResult.check()
365
        return response.Results
366 1
367 1
    def create_subscription(self, params, callback):
368 1
        self.logger.info("create_subscription")
369 1
        request = ua.CreateSubscriptionRequest()
370 1
        request.Parameters = params
371 1
        resp_fut = Future()
372 1
        mycallbak = partial(self._create_subscription_callback, callback, resp_fut)
373 1
        self._uasocket.send_request(request, mycallbak)
374
        return resp_fut.result(self._timeout)
375 1
376 1
    def _create_subscription_callback(self, pub_callback, resp_fut, data_fut):
377 1
        self.logger.info("_create_subscription_callback")
378 1
        data = data_fut.result()
379 1
        response = ua.CreateSubscriptionResponse.from_binary(data)
380 1
        self.logger.debug(response)
381 1
        response.ResponseHeader.ServiceResult.check()
382 1
        self._publishcallbacks[response.Parameters.SubscriptionId] = pub_callback
383
        resp_fut.set_result(response.Parameters)
384 1
385 1
    def delete_subscriptions(self, subscriptionids):
386 1
        self.logger.info("delete_subscription")
387 1
        request = ua.DeleteSubscriptionsRequest()
388 1
        request.Parameters.SubscriptionIds = subscriptionids
389 1
        resp_fut = Future()
390 1
        mycallbak = partial(self._delete_subscriptions_callback, subscriptionids, resp_fut)
391 1
        self._uasocket.send_request(request, mycallbak)
392 1
        return resp_fut.result(self._timeout)
393
394 1
    def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut):
395 1
        self.logger.info("_delete_subscriptions_callback")
396 1
        data = data_fut.result()
397 1
        response = ua.DeleteSubscriptionsResponse.from_binary(data)
398 1
        self.logger.debug(response)
399 1
        response.ResponseHeader.ServiceResult.check()
400 1
        for sid in subscriptionids:
401
            self._publishcallbacks.pop(sid)
402 1
        resp_fut.set_result(response.Results)
403 1
404 1
    def publish(self, acks=None):
405 1
        self.logger.info("publish")
406 1
        if acks is None:
407 1
            acks = []
408 1
        request = ua.PublishRequest()
409
        request.Parameters.SubscriptionAcknowledgements = acks
410
        # timeout could be set to 0 (= no timeout) but some servers do not support it
411
        self._uasocket.send_request(request, self._call_publish_callback, timeout=int(9e8)) # 250 days
412
413 1
    def _call_publish_callback(self, future):
414
        self.logger.info("call_publish_callback")
415
        data = future.result()
416 1
417 1
        # check if answer looks ok
418 1
        try:
419
            self._uasocket.check_answer(data, "while waiting for publish response")
420
        except BadTimeout: # Spec Part 4, 7.28
421
            self.publish()
422 1
            return
423 1
        except BadNoSubscription: # Spec Part 5, 13.8.1
424 1
            # BadNoSubscription is expected after deleting the last subscription.
425 1
            #
426 1
            # We should therefore also check for len(self._publishcallbacks) == 0, but
427 1
            # this gets us into trouble if a Publish response arrives before the
428 1
            # DeleteSubscription response.
429 1
            #
430 1
            # We could remove the callback already when sending the DeleteSubscription request,
431
            # but there are some legitimate reasons to keep them around, such as when the server
432 1
            # responds with "BadTimeout" and we should try again later instead of just removing
433 1
            # the subscription client-side.
434 1
            #
435 1
            # There are a variety of ways to act correctly, but the most practical solution seems
436 1
            # to be to just ignore any BadNoSubscription responses.
437 1
            self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
438 1
            return
439 1
440 1
        # parse publish response
441
        try:
442 1
            response = ua.PublishResponse.from_binary(data)
443 1
            self.logger.debug(response)
444 1
        except Exception:
445 1
            # INFO: catching the exception here might be obsolete because we already
446 1
            #       catch BadTimeout above. However, it's not really clear what this code
447 1
            #       does so it stays in, doesn't seem to hurt.
448 1
            self.logger.exception("Error parsing notificatipn from server")
449 1
            self.publish([]) #send publish request ot server so he does stop sending notifications
450 1
            return
451
452 1
        # look for callback
453 1
        try:
454 1
            callback = self._publishcallbacks[response.Parameters.SubscriptionId]
455 1
        except KeyError:
456 1
            self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
457 1
            return
458 1
459 1
        # do callback
460 1
        try:
461
            callback(response.Parameters)
462 1
        except Exception:  # we call client code, catch everything!
463 1
            self.logger.exception("Exception while calling user callback: %s")
464 1
465 1
    def create_monitored_items(self, params):
466 1
        self.logger.info("create_monitored_items")
467 1
        request = ua.CreateMonitoredItemsRequest()
468 1
        request.Parameters = params
469 1
        data = self._uasocket.send_request(request)
470
        response = ua.CreateMonitoredItemsResponse.from_binary(data)
471 1
        self.logger.debug(response)
472
        response.ResponseHeader.ServiceResult.check()
473
        return response.Results
474
475
    def delete_monitored_items(self, params):
476
        self.logger.info("delete_monitored_items")
477
        request = ua.DeleteMonitoredItemsRequest()
478
        request.Parameters = params
479
        data = self._uasocket.send_request(request)
480
        response = ua.DeleteMonitoredItemsResponse.from_binary(data)
481
        self.logger.debug(response)
482
        response.ResponseHeader.ServiceResult.check()
483
        return response.Results
484
485
    def add_nodes(self, nodestoadd):
486
        self.logger.info("add_nodes")
487
        request = ua.AddNodesRequest()
488
        request.Parameters.NodesToAdd = nodestoadd
489
        data = self._uasocket.send_request(request)
490
        response = ua.AddNodesResponse.from_binary(data)
491
        self.logger.debug(response)
492
        response.ResponseHeader.ServiceResult.check()
493
        return response.Results
494
495
    def delete_nodes(self, params):
496
        self.logger.info("delete_nodes")
497
        request = ua.DeleteNodesRequest()
498
        request.Parameters = params
499
        data = self._uasocket.send_request(request)
500
        response = ua.DeleteNodesResponse.from_binary(data)
501
        self.logger.debug(response)
502
        response.ResponseHeader.ServiceResult.check()
503
        return response.Results
504
505
    def call(self, methodstocall):
506
        request = ua.CallRequest()
507
        request.Parameters.MethodsToCall = methodstocall
508
        data = self._uasocket.send_request(request)
509
        response = ua.CallResponse.from_binary(data)
510
        self.logger.debug(response)
511
        response.ResponseHeader.ServiceResult.check()
512
        return response.Results
513
514
    def history_read(self, params):
515
        self.logger.info("history_read")
516
        request = ua.HistoryReadRequest()
517
        request.Parameters = params
518
        data = self._uasocket.send_request(request)
519
        response = ua.HistoryReadResponse.from_binary(data)
520
        self.logger.debug(response)
521
        response.ResponseHeader.ServiceResult.check()
522
        return response.Results
523
524
    def modify_monitored_items(self, params):
525
        self.logger.info("modify_monitored_items")
526
        request = ua.ModifyMonitoredItemsRequest()
527
        request.Parameters = params
528
        data = self._uasocket.send_request(request)
529
        response = ua.ModifyMonitoredItemsResponse.from_binary(data)
530
        self.logger.debug(response)
531
        response.ResponseHeader.ServiceResult.check()
532
        return response.Results
533