Completed
Pull Request — master (#316)
by
unknown
02:38
created

UaClient   B

Complexity

Total Complexity 46

Size/Duplication

Total Lines 348
Duplicated Lines 0 %

Test Coverage

Coverage 85.71%

Importance

Changes 3
Bugs 0 Features 0
Metric Value
wmc 46
c 3
b 0
f 0
dl 0
loc 348
ccs 210
cts 245
cp 0.8571
rs 8.3999

How to fix   Complexity   

Complex Class

Complex classes like UaClient often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9 1
from functools import partial
10
11 1
from opcua import ua
12 1
from opcua.common import utils
13
from opcua.common.uaerrors import UaError, BadTimeout, BadNoSubscription, BadSessionClosed
14
15 1
16
class UASocketClient(object):
17
    """
18
    handle socket connection and send ua messages
19
    timeout is the timeout used while waiting for an ua answer from server
20 1
    """
21 1
    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy(), socket_timeout=10):
22 1
        self.logger = logging.getLogger(__name__ + ".Socket")
23 1
        self._thread = None
24 1
        self._lock = Lock()
25 1
        self.timeout = timeout
26 1
        self.socket_timeout = socket_timeout
27 1
        self._socket = None
28 1
        self._do_stop = False
29 1
        self.authentication_token = ua.NodeId()
30 1
        self._request_id = 0
31 1
        self._request_handle = 0
32
        self._callbackmap = {}
33 1
        self._connection = ua.SecureConnection(security_policy)
34
35
    def start(self):
36
        """
37
        Start receiving thread.
38
        this is called automatically in connect and
39 1
        should not be necessary to call directly
40 1
        """
41
        self._thread = Thread(target=self._run)
42 1
        self._thread.start()
43
44
    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
45
        """
46
        send request to server, lower-level method
47
        timeout is the timeout written in ua header
48 1
        returns future
49 1
        """
50 1
        with self._lock:
51 1
            request.RequestHeader = self._create_request_header(timeout)
52 1
            self.logger.debug("Sending: %s", request)
53
            try:
54
                binreq = request.to_binary()
55
            except:
56
                # reset reqeust handle if any error
57
                # see self._create_request_header
58 1
                self._request_handle -= 1
59 1
                raise
60 1
            self._request_id += 1
61 1
            future = Future()
62 1
            if callback:
63 1
                future.add_done_callback(callback)
64 1
            self._callbackmap[self._request_id] = future
65 1
            msg = self._connection.message_to_binary(binreq, message_type=message_type, request_id=self._request_id)
66
            self._socket.write(msg)
67 1
        return future
68
69
    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
70
        """
71
        send request to server.
72
        timeout is the timeout written in ua header
73 1
        returns response object if no callback is provided
74 1
        """
75 1
        future = self._send_request(request, callback, timeout, message_type)
76 1
        if not callback:
77 1
            data = future.result(self.timeout)
78
            self.check_answer(data, " in response to " + request.__class__.__name__)
79 1
            return data
80 1
81 1
    def check_answer(self, data, context):
82 1
        data = data.copy()
83 1
        typeid = ua.NodeId.from_binary(data)
84 1
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
85 1
            self.logger.warning("ServiceFault from server received %s", context)
86
            hdr = ua.ResponseHeader.from_binary(data)
87 1
            hdr.ServiceResult.check()
88
            return False
89 1
        return True
90 1
91 1
    def _run(self):
92 1
        self.logger.info("Thread started")
93 1
        while not self._do_stop:
94 1
            try:
95 1
                self._receive()
96 1
            except ua.utils.SocketClosedException:
97 1
                self.logger.info("Socket has closed connection")
98
                break
99 1
            except UaError:
100 1
                self.logger.exception("Protocol Error")
101 1
        self.logger.info("Thread ended")
102
103 1
    def _receive(self):
104 1
        msg = self._connection.receive_from_socket(self._socket)
105 1
        if msg is None:
106 1
            return
107
        elif isinstance(msg, ua.Message):
108
            self._call_callback(msg.request_id(), msg.body())
109
        elif isinstance(msg, ua.Acknowledge):
110
            self._call_callback(0, msg)
111
        elif isinstance(msg, ua.ErrorMessage):
112 1
            self.logger.warning("Received an error: %s", msg)
113 1
        else:
114 1
            raise ua.UaError("Unsupported message type: %s", msg)
115 1
116
    def _call_callback(self, request_id, body):
117 1
        with self._lock:
118
            future = self._callbackmap.pop(request_id, None)
119 1
            if future is None:
120 1
                raise ua.UaError("No future object found for request: {}, callbacks in list are {}".format(request_id, self._callbackmap.keys()))
121 1
        future.set_result(body)
122 1
123 1
    def _create_request_header(self, timeout=1000):
124 1
        hdr = ua.RequestHeader()
125 1
        hdr.AuthenticationToken = self.authentication_token
126
        self._request_handle += 1
127 1
        hdr.RequestHandle = self._request_handle
128
        hdr.TimeoutHint = timeout
129
        return hdr
130
131 1
    def connect_socket(self, host, port):
132 1
        """
133 1
        connect to server socket and start receiving thread
134 1
        """
135 1
        self.logger.info("opening connection")
136
        sock = socket.create_connection((host, port), timeout=self.socket_timeout)
137 1
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # nodelay ncessary to avoid packing in one frame, some servers do not like it
138 1
        self._socket = utils.SocketWrapper(sock)
139 1
        self.start()
140 1
141 1
    def disconnect_socket(self):
142
        self.logger.info("stop request")
143 1
        self._do_stop = True
144 1
        self._socket.socket.shutdown(socket.SHUT_WR)
145 1
        self._socket.socket.close()
146 1
147 1
    def send_hello(self, url):
148 1
        hello = ua.Hello()
149 1
        hello.EndpointUrl = url
150 1
        future = Future()
151 1
        with self._lock:
152 1
            self._callbackmap[0] = future
153
        binmsg = self._connection.tcp_to_binary(ua.MessageType.Hello, hello)
154 1
        self._socket.write(binmsg)
155 1
        ack = future.result(self.timeout)
156 1
        return ack
157 1
158 1
    def open_secure_channel(self, params):
159
        self.logger.info("open_secure_channel")
160 1
        request = ua.OpenSecureChannelRequest()
161 1
        request.Parameters = params
162 1
        future = self._send_request(request, message_type=ua.MessageType.SecureOpen)
163 1
164
        response = ua.OpenSecureChannelResponse.from_binary(future.result(self.timeout))
165 1
        response.ResponseHeader.ServiceResult.check()
166
        self._connection.set_channel(response.Parameters)
167
        return response.Parameters
168
169
    def close_secure_channel(self):
170
        """
171 1
        close secure channel. It seems to trigger a shutdown of socket
172 1
        in most servers, so be prepare to reconnect.
173 1
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket
174 1
        """
175
        self.logger.info("close_secure_channel")
176 1
        request = ua.CloseSecureChannelRequest()
177 1
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
178
        with self._lock:
179
            # don't expect any more answers
180
            future.cancel()
181
            self._callbackmap.clear()
182 1
183
        # some servers send a response here, most do not ... so we ignore
184
185
186
class UaClient(object):
187
188
    """
189
    low level OPC-UA client.
190
191
    It implements (almost) all methods defined in opcua spec
192
    taking in argument the structures defined in opcua spec.
193
194 1
    In this Python implementation  most of the structures are defined in
195 1
    uaprotocol_auto.py and uaprotocol_hand.py available under opcua.ua
196
    """
197 1
198 1
    def __init__(self, timeout=1, socket_timeout=10):
199 1
        self.logger = logging.getLogger(__name__)
200 1
        # _publishcallbacks should be accessed in recv thread only
201
        self._publishcallbacks = {}
202 1
        self._timeout = timeout
203 1
        self._socket_timeout = socket_timeout
204
        self._uasocket = None
205 1
        self._security_policy = ua.SecurityPolicy()
206
207
    def set_security(self, policy):
208
        self._security_policy = policy
209 1
210 1
    def connect_socket(self, host, port):
211
        """
212 1
        connect to server socket and start receiving thread
213 1
        """
214
        self._uasocket = UASocketClient(self._timeout, security_policy=self._security_policy, self._socket_timeout)
215 1
        return self._uasocket.connect_socket(host, port)
216 1
217
    def disconnect_socket(self):
218 1
        return self._uasocket.disconnect_socket()
219 1
220
    def send_hello(self, url):
221 1
        return self._uasocket.send_hello(url)
222
223
    def open_secure_channel(self, params):
224
        return self._uasocket.open_secure_channel(params)
225
226 1
    def close_secure_channel(self):
227
        """
228 1
        close secure channel. It seems to trigger a shutdown of socket
229 1
        in most servers, so be prepare to reconnect
230 1
        """
231 1
        return self._uasocket.close_secure_channel()
232 1
233 1
    def create_session(self, parameters):
234 1
        self.logger.info("create_session")
235 1
        request = ua.CreateSessionRequest()
236 1
        request.Parameters = parameters
237 1
        data = self._uasocket.send_request(request)
238
        response = ua.CreateSessionResponse.from_binary(data)
239 1
        self.logger.debug(response)
240 1
        response.ResponseHeader.ServiceResult.check()
241 1
        self._uasocket.authentication_token = response.Parameters.AuthenticationToken
242 1
        return response.Parameters
243 1
244 1
    def activate_session(self, parameters):
245 1
        self.logger.info("activate_session")
246 1
        request = ua.ActivateSessionRequest()
247 1
        request.Parameters = parameters
248
        data = self._uasocket.send_request(request)
249 1
        response = ua.ActivateSessionResponse.from_binary(data)
250 1
        self.logger.debug(response)
251 1
        response.ResponseHeader.ServiceResult.check()
252 1
        return response.Parameters
253 1
254 1
    def close_session(self, deletesubscriptions):
255
        self.logger.info("close_session")
256
        request = ua.CloseSessionRequest()
257 1
        request.DeleteSubscriptions = deletesubscriptions
258 1
        data = self._uasocket.send_request(request)
259 1
        response = ua.CloseSessionResponse.from_binary(data)
260 1
        try:
261 1
            response.ResponseHeader.ServiceResult.check()
262 1
        except BadSessionClosed:
263 1
            # Problem: closing the session with open publish requests leads to BadSessionClosed responses
264 1
            #          we can just ignore it therefore.
265 1
            #          Alternatively we could make sure that there are no publish requests in flight when
266
            #          closing the session.
267 1
            pass
268 1
269 1
    def browse(self, parameters):
270 1
        self.logger.info("browse")
271 1
        request = ua.BrowseRequest()
272 1
        request.Parameters = parameters
273 1
        data = self._uasocket.send_request(request)
274 1
        response = ua.BrowseResponse.from_binary(data)
275
        self.logger.debug(response)
276 1
        response.ResponseHeader.ServiceResult.check()
277 1
        return response.Results
278
279
    def read(self, parameters):
280
        self.logger.info("read")
281 1
        request = ua.ReadRequest()
282 1
        request.Parameters = parameters
283 1
        data = self._uasocket.send_request(request)
284 1
        response = ua.ReadResponse.from_binary(data)
285 1
        self.logger.debug(response)
286
        response.ResponseHeader.ServiceResult.check()
287 1
        # cast to Enum attributes that need to
288 1
        for idx, rv in enumerate(parameters.NodesToRead):
289 1
            if rv.AttributeId == ua.AttributeIds.NodeClass:
290 1
                dv = response.Results[idx]
291 1
                if dv.StatusCode.is_good():
292 1
                    dv.Value.Value = ua.NodeClass(dv.Value.Value)
293 1
            elif rv.AttributeId == ua.AttributeIds.ValueRank:
294 1
                dv = response.Results[idx]
295 1
                if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
296
                    dv.Value.Value = ua.ValueRank(dv.Value.Value)
297 1
        return response.Results
298 1
299 1
    def write(self, params):
300 1
        self.logger.info("read")
301 1
        request = ua.WriteRequest()
302 1
        request.Parameters = params
303 1
        data = self._uasocket.send_request(request)
304 1
        response = ua.WriteResponse.from_binary(data)
305 1
        self.logger.debug(response)
306
        response.ResponseHeader.ServiceResult.check()
307 1
        return response.Results
308 1
309 1
    def get_endpoints(self, params):
310 1
        self.logger.info("get_endpoint")
311 1
        request = ua.GetEndpointsRequest()
312 1
        request.Parameters = params
313 1
        data = self._uasocket.send_request(request)
314 1
        response = ua.GetEndpointsResponse.from_binary(data)
315 1
        self.logger.debug(response)
316
        response.ResponseHeader.ServiceResult.check()
317 1
        return response.Endpoints
318
319
    def find_servers(self, params):
320
        self.logger.info("find_servers")
321
        request = ua.FindServersRequest()
322
        request.Parameters = params
323
        data = self._uasocket.send_request(request)
324
        response = ua.FindServersResponse.from_binary(data)
325
        self.logger.debug(response)
326
        response.ResponseHeader.ServiceResult.check()
327 1
        return response.Servers
328 1
329 1
    def find_servers_on_network(self, params):
330 1
        self.logger.info("find_servers_on_network")
331 1
        request = ua.FindServersOnNetworkRequest()
332 1
        request.Parameters = params
333 1
        data = self._uasocket.send_request(request)
334 1
        response = ua.FindServersOnNetworkResponse.from_binary(data)
335
        self.logger.debug(response)
336
        response.ResponseHeader.ServiceResult.check()
337 1
        return response.Parameters
338
339
    def register_server(self, registered_server):
340
        self.logger.info("register_server")
341
        request = ua.RegisterServerRequest()
342
        request.Server = registered_server
343
        data = self._uasocket.send_request(request)
344
        response = ua.RegisterServerResponse.from_binary(data)
345
        self.logger.debug(response)
346
        response.ResponseHeader.ServiceResult.check()
347 1
        # nothing to return for this service
348 1
349 1
    def register_server2(self, params):
350 1
        self.logger.info("register_server2")
351 1
        request = ua.RegisterServer2Request()
352 1
        request.Parameters = params
353 1
        data = self._uasocket.send_request(request)
354 1
        response = ua.RegisterServer2Response.from_binary(data)
355 1
        self.logger.debug(response)
356
        response.ResponseHeader.ServiceResult.check()
357 1
        return response.ConfigurationResults
358 1
359 1
    def translate_browsepaths_to_nodeids(self, browsepaths):
360 1
        self.logger.info("translate_browsepath_to_nodeid")
361 1
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
362 1
        request.Parameters.BrowsePaths = browsepaths
363 1
        data = self._uasocket.send_request(request)
364 1
        response = ua.TranslateBrowsePathsToNodeIdsResponse.from_binary(data)
365
        self.logger.debug(response)
366 1
        response.ResponseHeader.ServiceResult.check()
367 1
        return response.Results
368 1
369 1
    def create_subscription(self, params, callback):
370 1
        self.logger.info("create_subscription")
371 1
        request = ua.CreateSubscriptionRequest()
372 1
        request.Parameters = params
373 1
        resp_fut = Future()
374
        mycallbak = partial(self._create_subscription_callback, callback, resp_fut)
375 1
        self._uasocket.send_request(request, mycallbak)
376 1
        return resp_fut.result(self._timeout)
377 1
378 1
    def _create_subscription_callback(self, pub_callback, resp_fut, data_fut):
379 1
        self.logger.info("_create_subscription_callback")
380 1
        data = data_fut.result()
381 1
        response = ua.CreateSubscriptionResponse.from_binary(data)
382 1
        self.logger.debug(response)
383
        response.ResponseHeader.ServiceResult.check()
384 1
        self._publishcallbacks[response.Parameters.SubscriptionId] = pub_callback
385 1
        resp_fut.set_result(response.Parameters)
386 1
387 1
    def delete_subscriptions(self, subscriptionids):
388 1
        self.logger.info("delete_subscription")
389 1
        request = ua.DeleteSubscriptionsRequest()
390 1
        request.Parameters.SubscriptionIds = subscriptionids
391 1
        resp_fut = Future()
392 1
        mycallbak = partial(self._delete_subscriptions_callback, subscriptionids, resp_fut)
393
        self._uasocket.send_request(request, mycallbak)
394 1
        return resp_fut.result(self._timeout)
395 1
396 1
    def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut):
397 1
        self.logger.info("_delete_subscriptions_callback")
398 1
        data = data_fut.result()
399 1
        response = ua.DeleteSubscriptionsResponse.from_binary(data)
400 1
        self.logger.debug(response)
401
        response.ResponseHeader.ServiceResult.check()
402 1
        for sid in subscriptionids:
403 1
            self._publishcallbacks.pop(sid)
404 1
        resp_fut.set_result(response.Results)
405 1
406 1
    def publish(self, acks=None):
407 1
        self.logger.info("publish")
408 1
        if acks is None:
409
            acks = []
410
        request = ua.PublishRequest()
411
        request.Parameters.SubscriptionAcknowledgements = acks
412
        # timeout could be set to 0 (= no timeout) but some servers do not support it
413 1
        self._uasocket.send_request(request, self._call_publish_callback, timeout=int(9e8)) # 250 days
414
415
    def _call_publish_callback(self, future):
416 1
        self.logger.info("call_publish_callback")
417 1
        data = future.result()
418 1
419
        # check if answer looks ok
420
        try:
421
            self._uasocket.check_answer(data, "while waiting for publish response")
422 1
        except BadTimeout: # Spec Part 4, 7.28
423 1
            self.publish()
424 1
            return
425 1
        except BadNoSubscription: # Spec Part 5, 13.8.1
426 1
            # BadNoSubscription is expected after deleting the last subscription.
427 1
            #
428 1
            # We should therefore also check for len(self._publishcallbacks) == 0, but
429 1
            # this gets us into trouble if a Publish response arrives before the
430 1
            # DeleteSubscription response.
431
            #
432 1
            # We could remove the callback already when sending the DeleteSubscription request,
433 1
            # but there are some legitimate reasons to keep them around, such as when the server
434 1
            # responds with "BadTimeout" and we should try again later instead of just removing
435 1
            # the subscription client-side.
436 1
            #
437 1
            # There are a variety of ways to act correctly, but the most practical solution seems
438 1
            # to be to just ignore any BadNoSubscription responses.
439 1
            self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
440 1
            return
441
442 1
        # parse publish response
443 1
        try:
444 1
            response = ua.PublishResponse.from_binary(data)
445 1
            self.logger.debug(response)
446 1
        except Exception:
447 1
            # INFO: catching the exception here might be obsolete because we already
448 1
            #       catch BadTimeout above. However, it's not really clear what this code
449 1
            #       does so it stays in, doesn't seem to hurt.
450 1
            self.logger.exception("Error parsing notificatipn from server")
451
            self.publish([]) #send publish request ot server so he does stop sending notifications
452 1
            return
453 1
454 1
        # look for callback
455 1
        try:
456 1
            callback = self._publishcallbacks[response.Parameters.SubscriptionId]
457 1
        except KeyError:
458 1
            self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
459 1
            return
460 1
461
        # do callback
462 1
        try:
463 1
            callback(response.Parameters)
464 1
        except Exception:  # we call client code, catch everything!
465 1
            self.logger.exception("Exception while calling user callback: %s")
466 1
467 1
    def create_monitored_items(self, params):
468 1
        self.logger.info("create_monitored_items")
469 1
        request = ua.CreateMonitoredItemsRequest()
470
        request.Parameters = params
471 1
        data = self._uasocket.send_request(request)
472
        response = ua.CreateMonitoredItemsResponse.from_binary(data)
473
        self.logger.debug(response)
474
        response.ResponseHeader.ServiceResult.check()
475
        return response.Results
476
477
    def delete_monitored_items(self, params):
478
        self.logger.info("delete_monitored_items")
479
        request = ua.DeleteMonitoredItemsRequest()
480
        request.Parameters = params
481
        data = self._uasocket.send_request(request)
482
        response = ua.DeleteMonitoredItemsResponse.from_binary(data)
483
        self.logger.debug(response)
484
        response.ResponseHeader.ServiceResult.check()
485
        return response.Results
486
487
    def add_nodes(self, nodestoadd):
488
        self.logger.info("add_nodes")
489
        request = ua.AddNodesRequest()
490
        request.Parameters.NodesToAdd = nodestoadd
491
        data = self._uasocket.send_request(request)
492
        response = ua.AddNodesResponse.from_binary(data)
493
        self.logger.debug(response)
494
        response.ResponseHeader.ServiceResult.check()
495
        return response.Results
496
497
    def delete_nodes(self, params):
498
        self.logger.info("delete_nodes")
499
        request = ua.DeleteNodesRequest()
500
        request.Parameters = params
501
        data = self._uasocket.send_request(request)
502
        response = ua.DeleteNodesResponse.from_binary(data)
503
        self.logger.debug(response)
504
        response.ResponseHeader.ServiceResult.check()
505
        return response.Results
506
507
    def call(self, methodstocall):
508
        request = ua.CallRequest()
509
        request.Parameters.MethodsToCall = methodstocall
510
        data = self._uasocket.send_request(request)
511
        response = ua.CallResponse.from_binary(data)
512
        self.logger.debug(response)
513
        response.ResponseHeader.ServiceResult.check()
514
        return response.Results
515
516
    def history_read(self, params):
517
        self.logger.info("history_read")
518
        request = ua.HistoryReadRequest()
519
        request.Parameters = params
520
        data = self._uasocket.send_request(request)
521
        response = ua.HistoryReadResponse.from_binary(data)
522
        self.logger.debug(response)
523
        response.ResponseHeader.ServiceResult.check()
524
        return response.Results
525
526
    def modify_monitored_items(self, params):
527
        self.logger.info("modify_monitored_items")
528
        request = ua.ModifyMonitoredItemsRequest()
529
        request.Parameters = params
530
        data = self._uasocket.send_request(request)
531
        response = ua.ModifyMonitoredItemsResponse.from_binary(data)
532
        self.logger.debug(response)
533
        response.ResponseHeader.ServiceResult.check()
534
        return response.Results
535