Completed
Pull Request — master (#256)
by
unknown
05:40
created

UaClient   B

Complexity

Total Complexity 45

Size/Duplication

Total Lines 338
Duplicated Lines 0 %

Test Coverage

Coverage 85.71%

Importance

Changes 2
Bugs 0 Features 0
Metric Value
dl 0
loc 338
ccs 210
cts 245
cp 0.8571
rs 8.3673
c 2
b 0
f 0
wmc 45

32 Methods

Rating   Name   Duplication   Size   Complexity  
A _delete_subscriptions_callback() 0 9 2
A browse() 0 9 1
A find_servers() 0 9 1
A close_secure_channel() 0 6 1
A translate_browsepaths_to_nodeids() 0 9 1
B read() 0 19 7
A find_servers_on_network() 0 9 1
A send_hello() 0 2 1
A set_security() 0 2 1
A close_session() 0 6 1
A create_subscription() 0 8 1
A history_read() 0 9 1
A delete_monitored_items() 0 9 1
A publish() 0 7 2
A register_server2() 0 9 1
A modify_monitored_items() 0 9 1
A open_secure_channel() 0 2 1
A add_nodes() 0 9 1
A create_monitored_items() 0 9 1
A register_server() 0 8 1
A get_endpoints() 0 9 1
A write() 0 9 1
A connect_socket() 0 6 1
A create_session() 0 10 1
A activate_session() 0 9 1
B _call_publish_callback() 0 49 6
A delete_nodes() 0 9 1
A delete_subscriptions() 0 8 1
A call() 0 8 1
A disconnect_socket() 0 2 1
A _create_subscription_callback() 0 8 1
A __init__() 0 7 1

How to fix   Complexity   

Complex Class

Complex classes like UaClient often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9 1
from functools import partial
10
11 1
from opcua import ua
12 1
from opcua.common import utils
13
from opcua.common.uaerrors import UaError
14
from opcua.common.uaerrors import UaStatusCodeError
15 1
from opcua.ua.status_codes import StatusCodes
16
17
18
class UASocketClient(object):
    """
    Handle the socket connection and send ua messages.

    timeout is the timeout used while waiting for an ua answer from server
    (it is passed to Future.result()).
    security_policy is handed to ua.SecureConnection; when it is omitted a
    new ua.SecurityPolicy() is created for this client.
    """

    def __init__(self, timeout=1, security_policy=None):
        # Build the default policy per instance: a default evaluated in the
        # signature would be created once and shared by every client.
        if security_policy is None:
            security_policy = ua.SecurityPolicy()
        self.logger = logging.getLogger(__name__ + ".Socket")
        self._thread = None
        # guards _callbackmap and the request id / handle counters
        self._lock = Lock()
        self.timeout = timeout
        self._socket = None
        self._do_stop = False
        self.authentication_token = ua.NodeId()
        self._request_id = 0
        self._request_handle = 0
        # maps request id -> Future resolved by the receiving thread
        self._callbackmap = {}
        self._connection = ua.SecureConnection(security_policy)

    def start(self):
        """
        Start receiving thread.
        this is called automatically in connect and
        should not be necessary to call directly
        """
        self._thread = Thread(target=self._run)
        self._thread.start()

    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server, lower-level method
        timeout is the timeout written in ua header
        callback, if given, is attached to the future as a done-callback
        returns future
        """
        with self._lock:
            request.RequestHeader = self._create_request_header(timeout)
            self.logger.debug("Sending: %s", request)
            try:
                binreq = request.to_binary()
            except Exception:
                # reset request handle if any error
                # see self._create_request_header
                self._request_handle -= 1
                raise
            self._request_id += 1
            future = Future()
            if callback:
                future.add_done_callback(callback)
            self._callbackmap[self._request_id] = future
            try:
                msg = self._connection.message_to_binary(binreq, message_type=message_type, request_id=self._request_id)
                self._socket.write(msg)
            except Exception:
                # the request never left, so nobody will ever resolve this
                # future: do not leave it behind in the map
                self._callbackmap.pop(self._request_id, None)
                raise
        return future

    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server.
        timeout is the timeout written in ua header
        returns response object if no callback is provided,
        otherwise returns None and the callback receives the future
        """
        future = self._send_request(request, callback, timeout, message_type)
        if not callback:
            data = future.result(self.timeout)
            self.check_answer(data, " in response to " + request.__class__.__name__)
            return data

    def check_answer(self, data, context):
        """
        Look at the type id of a response and detect a ServiceFault.

        Works on a copy of data so the caller's buffer is not consumed.
        For a ServiceFault the header's ServiceResult.check() is called
        (it raises on a bad status) and False is returned if it did not
        raise. Returns True for any other response type.
        """
        data = data.copy()
        typeid = ua.NodeId.from_binary(data)
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
            self.logger.warning("ServiceFault from server received: %s", context)
            hdr = ua.ResponseHeader.from_binary(data)
            hdr.ServiceResult.check()
            return False
        return True

    def _run(self):
        """
        Body of the receiving thread: read messages until a stop is
        requested or the socket is closed. Protocol errors are logged and
        the loop continues.
        """
        self.logger.info("Thread started")
        while not self._do_stop:
            try:
                self._receive()
            except ua.utils.SocketClosedException:
                self.logger.info("Socket has closed connection")
                break
            except UaError:
                self.logger.exception("Protocol Error")
        self.logger.info("Thread ended")

    def _receive(self):
        """
        Read one message from the socket and dispatch it to the waiting
        future. Raises UaError for a message type that is not handled.
        """
        msg = self._connection.receive_from_socket(self._socket)
        if msg is None:
            return
        elif isinstance(msg, ua.Message):
            self._call_callback(msg.request_id(), msg.body())
        elif isinstance(msg, ua.Acknowledge):
            # the Hello future is registered under id 0, see send_hello
            self._call_callback(0, msg)
        elif isinstance(msg, ua.ErrorMessage):
            self.logger.warning("Received an error: %s", msg)
        else:
            # exceptions do not interpolate logging-style arguments,
            # so the message is formatted explicitly
            raise UaError("Unsupported message type: {}".format(msg))

    def _call_callback(self, request_id, body):
        """
        Resolve the future registered for request_id with body.
        Raises UaError if no future is registered for that id.
        """
        with self._lock:
            future = self._callbackmap.pop(request_id, None)
            if future is None:
                raise UaError("No future object found for request: {}, callbacks in list are {}".format(request_id, self._callbackmap.keys()))
        # resolved outside the lock: done-callbacks run from set_result
        future.set_result(body)

    def _create_request_header(self, timeout=1000):
        """
        Build a RequestHeader carrying the authentication token, the next
        request handle and timeout as TimeoutHint.
        """
        hdr = ua.RequestHeader()
        hdr.AuthenticationToken = self.authentication_token
        self._request_handle += 1
        hdr.RequestHandle = self._request_handle
        hdr.TimeoutHint = timeout
        return hdr

    def connect_socket(self, host, port):
        """
        connect to server socket and start receiving thread
        """
        self.logger.info("opening connection")
        sock = socket.create_connection((host, port))
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # nodelay necessary to avoid packing in one frame, some servers do not like it
        self._socket = utils.SocketWrapper(sock)
        self.start()

    def disconnect_socket(self):
        """
        Ask the receiving thread to stop, then shut down and close the socket.
        """
        self.logger.info("stop request")
        self._do_stop = True
        self._socket.socket.shutdown(socket.SHUT_WR)
        self._socket.socket.close()

    def send_hello(self, url):
        """
        Send a Hello message for url and wait (up to self.timeout) for the
        Acknowledge, which is returned.
        """
        hello = ua.Hello()
        hello.EndpointUrl = url
        future = Future()
        with self._lock:
            # Acknowledge messages are dispatched to id 0, see _receive
            self._callbackmap[0] = future
        binmsg = self._connection.tcp_to_binary(ua.MessageType.Hello, hello)
        self._socket.write(binmsg)
        ack = future.result(self.timeout)
        return ack

    def open_secure_channel(self, params):
        """
        Send an OpenSecureChannelRequest, check the service result, apply the
        returned channel parameters to the connection and return them.
        """
        self.logger.info("open_secure_channel")
        request = ua.OpenSecureChannelRequest()
        request.Parameters = params
        future = self._send_request(request, message_type=ua.MessageType.SecureOpen)

        response = ua.OpenSecureChannelResponse.from_binary(future.result(self.timeout))
        response.ResponseHeader.ServiceResult.check()
        self._connection.set_channel(response.Parameters)
        return response.Parameters

    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepared to reconnect.
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket
        """
        self.logger.info("close_secure_channel")
        request = ua.CloseSecureChannelRequest()
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
        with self._lock:
            # don't expect any more answers
            future.cancel()
            self._callbackmap.clear()

        # some servers send a response here, most do not ... so we ignore
185
186
187
class UaClient(object):

    """
    low level OPC-UA client.

    It implements (almost) all methods defined in opcua spec
    taking in argument the structures defined in opcua spec.

    In this Python implementation  most of the structures are defined in
    uaprotocol_auto.py and uaprotocol_hand.py available under opcua.ua

    Unless noted otherwise, each service method sends its request through
    the socket client, parses the response and calls
    ResponseHeader.ServiceResult.check(), which raises on a bad status.
    """

    def __init__(self, timeout=1):
        self.logger = logging.getLogger(__name__)
        # _publishcallbacks should be accessed in recv thread only
        self._publishcallbacks = {}
        self._timeout = timeout
        # created in connect_socket
        self._uasocket = None
        self._security_policy = ua.SecurityPolicy()

    def set_security(self, policy):
        """
        Set the security policy used by the next connect_socket call.
        """
        self._security_policy = policy

    def connect_socket(self, host, port):
        """
        connect to server socket and start receiving thread
        """
        self._uasocket = UASocketClient(self._timeout, security_policy=self._security_policy)
        return self._uasocket.connect_socket(host, port)

    def disconnect_socket(self):
        """
        Disconnect the underlying socket client.
        """
        return self._uasocket.disconnect_socket()

    def send_hello(self, url):
        """
        Send a Hello message for url and return the server's answer.
        """
        return self._uasocket.send_hello(url)

    def open_secure_channel(self, params):
        """
        Open a secure channel with params and return the response parameters.
        """
        return self._uasocket.open_secure_channel(params)

    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepared to reconnect
        """
        return self._uasocket.close_secure_channel()

    def create_session(self, parameters):
        """
        Create a session and store the returned authentication token on the
        socket client so later requests carry it. Returns response.Parameters.
        """
        self.logger.info("create_session")
        request = ua.CreateSessionRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.CreateSessionResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        self._uasocket.authentication_token = response.Parameters.AuthenticationToken
        return response.Parameters

    def activate_session(self, parameters):
        """
        Activate the session. Returns response.Parameters.
        """
        self.logger.info("activate_session")
        request = ua.ActivateSessionRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.ActivateSessionResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters

    def close_session(self, deletesubscriptions):
        """
        Close the session. The response is parsed but its service result is
        deliberately not checked (see note below). Returns None.
        """
        self.logger.info("close_session")
        request = ua.CloseSessionRequest()
        request.DeleteSubscriptions = deletesubscriptions
        data = self._uasocket.send_request(request)
        ua.CloseSessionResponse.from_binary(data)
        # response.ResponseHeader.ServiceResult.check() #disabled, it seems we sent wrong session Id, but where is the sessionId supposed to be sent???

    def browse(self, parameters):
        """
        Send a BrowseRequest. Returns response.Results.
        """
        self.logger.info("browse")
        request = ua.BrowseRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.BrowseResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def read(self, parameters):
        """
        Send a ReadRequest. Returns response.Results, where good NodeClass
        values and known ValueRank values are converted to their Enum type.
        """
        self.logger.info("read")
        request = ua.ReadRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.ReadResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        # cast to Enum attributes that need to
        for idx, rv in enumerate(parameters.NodesToRead):
            if rv.AttributeId == ua.AttributeIds.NodeClass:
                dv = response.Results[idx]
                if dv.StatusCode.is_good():
                    dv.Value.Value = ua.NodeClass(dv.Value.Value)
            elif rv.AttributeId == ua.AttributeIds.ValueRank:
                dv = response.Results[idx]
                # only these values are converted; any other value is left as received
                if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
                    dv.Value.Value = ua.ValueRank(dv.Value.Value)
        return response.Results

    def write(self, params):
        """
        Send a WriteRequest. Returns response.Results.
        """
        self.logger.info("write")
        request = ua.WriteRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.WriteResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def get_endpoints(self, params):
        """
        Send a GetEndpointsRequest. Returns response.Endpoints.
        """
        self.logger.info("get_endpoint")
        request = ua.GetEndpointsRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.GetEndpointsResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Endpoints

    def find_servers(self, params):
        """
        Send a FindServersRequest. Returns response.Servers.
        """
        self.logger.info("find_servers")
        request = ua.FindServersRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.FindServersResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Servers

    def find_servers_on_network(self, params):
        """
        Send a FindServersOnNetworkRequest. Returns response.Parameters.
        """
        self.logger.info("find_servers_on_network")
        request = ua.FindServersOnNetworkRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.FindServersOnNetworkResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters

    def register_server(self, registered_server):
        """
        Send a RegisterServerRequest for registered_server. Returns None.
        """
        self.logger.info("register_server")
        request = ua.RegisterServerRequest()
        request.Server = registered_server
        data = self._uasocket.send_request(request)
        response = ua.RegisterServerResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        # nothing to return for this service

    def register_server2(self, params):
        """
        Send a RegisterServer2Request. Returns response.ConfigurationResults.
        """
        self.logger.info("register_server2")
        request = ua.RegisterServer2Request()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.RegisterServer2Response.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.ConfigurationResults

    def translate_browsepaths_to_nodeids(self, browsepaths):
        """
        Send a TranslateBrowsePathsToNodeIdsRequest. Returns response.Results.
        """
        self.logger.info("translate_browsepath_to_nodeid")
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
        request.Parameters.BrowsePaths = browsepaths
        data = self._uasocket.send_request(request)
        response = ua.TranslateBrowsePathsToNodeIdsResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def create_subscription(self, params, callback):
        """
        Create a subscription and register callback for its publish
        responses. Waits up to self._timeout and returns response.Parameters.
        """
        self.logger.info("create_subscription")
        request = ua.CreateSubscriptionRequest()
        request.Parameters = params
        resp_fut = Future()
        # the response is handled in the receiving thread, which is the
        # only place allowed to touch _publishcallbacks
        mycallbak = partial(self._create_subscription_callback, callback, resp_fut)
        self._uasocket.send_request(request, mycallbak)
        return resp_fut.result(self._timeout)

    def _create_subscription_callback(self, pub_callback, resp_fut, data_fut):
        """
        Done-callback for create_subscription: parse the response, register
        pub_callback under the new subscription id and resolve resp_fut.
        """
        self.logger.info("_create_subscription_callback")
        data = data_fut.result()
        response = ua.CreateSubscriptionResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        self._publishcallbacks[response.Parameters.SubscriptionId] = pub_callback
        resp_fut.set_result(response.Parameters)

    def delete_subscriptions(self, subscriptionids):
        """
        Delete the given subscriptions. Waits up to self._timeout and
        returns response.Results.
        """
        self.logger.info("delete_subscription")
        request = ua.DeleteSubscriptionsRequest()
        request.Parameters.SubscriptionIds = subscriptionids
        resp_fut = Future()
        mycallbak = partial(self._delete_subscriptions_callback, subscriptionids, resp_fut)
        self._uasocket.send_request(request, mycallbak)
        return resp_fut.result(self._timeout)

    def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut):
        """
        Done-callback for delete_subscriptions: parse the response, forget
        the publish callbacks of the deleted ids and resolve resp_fut.
        """
        self.logger.info("_delete_subscriptions_callback")
        data = data_fut.result()
        response = ua.DeleteSubscriptionsResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        for sid in subscriptionids:
            # an id without a registered callback must not raise here:
            # resp_fut would never be resolved and the caller would time out
            self._publishcallbacks.pop(sid, None)
        resp_fut.set_result(response.Results)

    def publish(self, acks=None):
        """
        Send a PublishRequest carrying acks (an empty list when None).
        The response is handled asynchronously by _call_publish_callback.
        """
        self.logger.info("publish")
        if acks is None:
            acks = []
        request = ua.PublishRequest()
        request.Parameters.SubscriptionAcknowledgements = acks
        self._uasocket.send_request(request, self._call_publish_callback, timeout=int(9e8))  # timeout could be set to 0 but some servers do not support it

    def _call_publish_callback(self, future):
        """
        Done-callback for publish: validate and parse the publish response
        and hand its parameters to the callback registered for the
        subscription. Exceptions raised by that callback are logged.
        """
        self.logger.info("call_publish_callback")
        data = future.result()

        # check if answer looks ok
        try:
            self._uasocket.check_answer(data, "during waiting for publish response")
        except UaStatusCodeError as e:
            if e.code == StatusCodes.BadNoSubscription:
                self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
                # Spec Part 5, Paragraph 13.8.1
                # BadNoSubscription is expected after deleting the last subscription.
                #
                # We should therefore also check for len(self._publishcallbacks) == 0, but
                # this gets us into trouble if a Publish response arrives before the
                # DeleteSubscription response.
                #
                # We could remove the callback already when sending the DeleteSubscription request,
                # but there are some legitimate reasons to keep them around, such as when the server
                # responds with "BadTimeout" and we should try again later instead of just removing
                # the subscription client-side.
                #
                # There are a variety of ways to act correctly, but the most practical solution seems
                # to be to just ignore any BadNoSubscription responses.
                return
            else:
                raise

        # parse publish response
        try:
            response = ua.PublishResponse.from_binary(data)
            self.logger.debug(response)
        except Exception:
            self.logger.exception("Error parsing notificatipn from server")
            self.publish([])  # send a publish request to the server so it does not stop sending notifications
            return

        # look for callback
        try:
            callback = self._publishcallbacks[response.Parameters.SubscriptionId]
        except KeyError:
            self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
            return

        # do callback
        try:
            callback(response.Parameters)
        except Exception:  # we call client code, catch everything!
            # pass the callback so the %s placeholder is actually filled in
            self.logger.exception("Exception while calling user callback: %s", callback)

    def create_monitored_items(self, params):
        """
        Send a CreateMonitoredItemsRequest. Returns response.Results.
        """
        self.logger.info("create_monitored_items")
        request = ua.CreateMonitoredItemsRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.CreateMonitoredItemsResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def delete_monitored_items(self, params):
        """
        Send a DeleteMonitoredItemsRequest. Returns response.Results.
        """
        self.logger.info("delete_monitored_items")
        request = ua.DeleteMonitoredItemsRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.DeleteMonitoredItemsResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def add_nodes(self, nodestoadd):
        """
        Send an AddNodesRequest for nodestoadd. Returns response.Results.
        """
        self.logger.info("add_nodes")
        request = ua.AddNodesRequest()
        request.Parameters.NodesToAdd = nodestoadd
        data = self._uasocket.send_request(request)
        response = ua.AddNodesResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def delete_nodes(self, nodestodelete):
        """
        Send a DeleteNodesRequest for nodestodelete. Returns response.Results.
        """
        self.logger.info("delete_nodes")
        request = ua.DeleteNodesRequest()
        request.Parameters.NodesToDelete = nodestodelete
        data = self._uasocket.send_request(request)
        response = ua.DeleteNodesResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def call(self, methodstocall):
        """
        Send a CallRequest for methodstocall. Returns response.Results.
        """
        self.logger.info("call")
        request = ua.CallRequest()
        request.Parameters.MethodsToCall = methodstocall
        data = self._uasocket.send_request(request)
        response = ua.CallResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def history_read(self, params):
        """
        Send a HistoryReadRequest. Returns response.Results.
        """
        self.logger.info("history_read")
        request = ua.HistoryReadRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.HistoryReadResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def modify_monitored_items(self, params):
        """
        Send a ModifyMonitoredItemsRequest. Returns response.Results.
        """
        self.logger.info("modify_monitored_items")
        request = ua.ModifyMonitoredItemsRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.ModifyMonitoredItemsResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results
525