Completed
Push — master ( 41c04b...6083f5 )
by Olivier
03:26
created

UASocketClient   A

Complexity

Total Complexity 30

Size/Duplication

Total Lines 167
Duplicated Lines 0 %

Test Coverage

Coverage 90.83%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 167
ccs 109
cts 120
cp 0.9083
rs 10
wmc 30

14 Methods

Rating   Name   Duplication   Size   Complexity  
A close_secure_channel() 0 13 2
A _call_callback() 0 6 3
A check_answer() 0 9 2
A send_hello() 0 10 2
A open_secure_channel() 0 12 1
A _create_request_header() 0 7 1
A __init__() 0 12 1
B _receive() 0 12 5
A start() 0 8 1
A disconnect_socket() 0 5 1
A _run() 0 11 4
B _send_request() 0 24 4
A connect_socket() 0 9 1
A send_request() 0 11 2
1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9 1
from functools import partial
10
11 1
from opcua import ua
12 1
from opcua.common import utils
13 1
from opcua.ua.uaerrors import UaError, BadTimeout, BadNoSubscription, BadSessionClosed
14
15
16 1
class UASocketClient(object):
17
    """
18
    handle socket connection and send ua messages
19
    timeout is the timeout used while waiting for an ua answer from server
20
    """
21 1
    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy()):
        """
        create an unconnected client
        timeout is stored and used while waiting on answer futures
        security_policy is handed to ua.SecureConnection
        """
        self.logger = logging.getLogger(__name__ + ".Socket")
        # receiving thread, created in start()
        self._thread = None
        # taken around the request counters and the callback map
        self._lock = Lock()
        self.timeout = timeout
        # socket wrapper, created in connect_socket()
        self._socket = None
        # read by the receiving loop, set in disconnect_socket()
        self._do_stop = False
        self.authentication_token = ua.NodeId()
        self._request_id = 0
        self._request_handle = 0
        # request id -> Future waiting for the matching answer
        self._callbackmap = {}
        self._connection = ua.SecureConnection(security_policy)
33
34 1
    def start(self):
        """
        Create and launch the thread that runs self._run.
        connect_socket() calls this itself, so calling it
        directly should not be necessary
        """
        receiver = Thread(target=self._run)
        self._thread = receiver
        receiver.start()
42
43 1
    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server, lower-level method
        timeout is the timeout written in ua header
        callback, if given, is attached to the future as done-callback
        returns the future that is registered for the answer

        if building or writing the message fails, the future is removed
        from the callback map again before the exception propagates
        """
        with self._lock:
            request.RequestHeader = self._create_request_header(timeout)
            self.logger.debug("Sending: %s", request)
            try:
                binreq = request.to_binary()
            except BaseException:
                # reset request handle if any error
                # see self._create_request_header
                self._request_handle -= 1
                raise
            self._request_id += 1
            request_id = self._request_id
            future = Future()
            if callback:
                future.add_done_callback(callback)
            self._callbackmap[request_id] = future
            try:
                msg = self._connection.message_to_binary(binreq, message_type=message_type, request_id=request_id)
                self._socket.write(msg)
            except BaseException:
                # the request did not go out: do not keep a future around
                # that nobody is going to complete
                self._callbackmap.pop(request_id, None)
                raise
        return future
67
68 1
    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server.
        timeout is the timeout written in ua header
        with a callback: returns None right after sending
        without a callback: waits up to self.timeout for the answer,
        runs check_answer() on it and returns the answer data
        """
        future = self._send_request(request, callback, timeout, message_type)
        if callback:
            return None
        answer = future.result(self.timeout)
        context = " in response to " + request.__class__.__name__
        self.check_answer(answer, context)
        return answer
79
80 1
    def check_answer(self, data, context):
        """
        look at the type id at the start of an answer, working on a copy of data
        returns True when the type id is not the ServiceFault one
        for a ServiceFault: logs a warning with context, decodes the response
        header, calls ServiceResult.check() and returns False if that did not raise
        """
        buf = data.copy()
        typeid = ua.NodeId.from_binary(buf)
        fault_id = ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary)
        if not (typeid == fault_id):
            return True
        self.logger.warning("ServiceFault from server received %s", context)
        header = ua.ResponseHeader.from_binary(buf)
        header.ServiceResult.check()
        return False
89
90 1
    def _run(self):
        """
        body of the receiving thread
        calls _receive() until _do_stop is set or the socket reports
        it was closed; an UaError is logged and the loop goes on
        """
        self.logger.info("Thread started")
        while True:
            if self._do_stop:
                break
            try:
                self._receive()
            except ua.utils.SocketClosedException:
                self.logger.info("Socket has closed connection")
                break
            except UaError:
                self.logger.exception("Protocol Error")
        self.logger.info("Thread ended")
101
102 1
    def _receive(self):
        """
        read one message from the connection and dispatch it
        ua.Message: its body goes to the future registered under its request id
        ua.Acknowledge: goes to the future registered under id 0 (see send_hello)
        ua.ErrorMessage: logged as warning
        None: ignored
        anything else raises ua.UaError
        """
        msg = self._connection.receive_from_socket(self._socket)
        if msg is None:
            return
        elif isinstance(msg, ua.Message):
            self._call_callback(msg.request_id(), msg.body())
        elif isinstance(msg, ua.Acknowledge):
            self._call_callback(0, msg)
        elif isinstance(msg, ua.ErrorMessage):
            self.logger.warning("Received an error: %s", msg)
        else:
            # exceptions do not interpolate their arguments like logging does,
            # so the text is formatted here
            raise ua.UaError("Unsupported message type: %s" % (msg,))
114
115 1
    def _call_callback(self, request_id, body):
        """
        take the future registered for request_id out of the callback map
        and complete it with body
        raises ua.UaError when no future is registered under that id
        """
        with self._lock:
            pending = self._callbackmap.pop(request_id, None)
            if pending is None:
                known_ids = self._callbackmap.keys()
                raise ua.UaError("No future object found for request: {}, callbacks in list are {}".format(request_id, known_ids))
        # completed outside the lock
        pending.set_result(body)
121
122 1
    def _create_request_header(self, timeout=1000):
        """
        build a ua.RequestHeader carrying the current authentication token,
        the next request handle and timeout as TimeoutHint
        increments self._request_handle; _send_request decrements it
        again when serializing the request fails
        """
        header = ua.RequestHeader()
        header.AuthenticationToken = self.authentication_token
        self._request_handle += 1
        header.RequestHandle = self._request_handle
        header.TimeoutHint = timeout
        return header
129
130 1
    def connect_socket(self, host, port):
        """
        open a TCP connection to (host, port), wrap it in
        utils.SocketWrapper and start the receiving thread
        """
        self.logger.info("opening connection")
        address = (host, port)
        raw_sock = socket.create_connection(address)
        # nodelay is necessary to avoid packing in one frame, some servers do not like it
        raw_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        self._socket = utils.SocketWrapper(raw_sock)
        self.start()
139
140 1
    def disconnect_socket(self):
        """
        ask the receiving loop to stop, then shut down and close the socket
        the socket is closed even when shutdown() raises; the exception
        from shutdown() still propagates to the caller
        """
        self.logger.info("stop request")
        self._do_stop = True
        sock = self._socket.socket
        try:
            sock.shutdown(socket.SHUT_WR)
        finally:
            # without this, a failing shutdown() would leave the socket open
            sock.close()
145
146 1
    def send_hello(self, url):
        """
        send a Hello message with url as EndpointUrl and wait up to
        self.timeout for the answer
        the answer future is registered under request id 0, which is
        where _receive delivers an Acknowledge
        """
        hello = ua.Hello()
        hello.EndpointUrl = url
        ack_future = Future()
        with self._lock:
            self._callbackmap[0] = ack_future
        packet = self._connection.tcp_to_binary(ua.MessageType.Hello, hello)
        self._socket.write(packet)
        return ack_future.result(self.timeout)
156
157 1
    def open_secure_channel(self, params):
        """
        send an OpenSecureChannelRequest with params, wait up to self.timeout
        for the answer, check its ServiceResult and pass the response
        parameters to the connection via set_channel()
        returns the response parameters
        """
        self.logger.info("open_secure_channel")
        request = ua.OpenSecureChannelRequest()
        request.Parameters = params
        pending = self._send_request(request, message_type=ua.MessageType.SecureOpen)

        # FIXME: we have a race condition here
        # we can get a packet with the new token id before we reach to store it..
        raw = pending.result(self.timeout)
        response = ua.OpenSecureChannelResponse.from_binary(raw)
        response.ResponseHeader.ServiceResult.check()
        channel = response.Parameters
        self._connection.set_channel(channel)
        return channel
169
170 1
    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepared to reconnect.
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket
        """
        self.logger.info("close_secure_channel")
        close_request = ua.CloseSecureChannelRequest()
        pending = self._send_request(close_request, message_type=ua.MessageType.SecureClose)
        with self._lock:
            # no further answers are expected: cancel this one and forget all others
            pending.cancel()
            self._callbackmap.clear()

        # some servers send a response here, most do not ... so we ignore
185
186
187 1
class UaClient(object):
188
189
    """
190
    low level OPC-UA client.
191
192
    It implements (almost) all methods defined in opcua spec
193
    taking in argument the structures defined in opcua spec.
194
195
    In this Python implementation  most of the structures are defined in
196
    uaprotocol_auto.py and uaprotocol_hand.py available under opcua.ua
197
    """
198
199 1
    def __init__(self, timeout=1):
        """
        timeout is passed to the UASocketClient created in connect_socket()
        and is also used while waiting for subscription answers
        """
        self.logger = logging.getLogger(__name__)
        self._timeout = timeout
        # created in connect_socket()
        self._uasocket = None
        self._security_policy = ua.SecurityPolicy()
        # subscription id -> publish callback
        # _publishcallbacks should be accessed in recv thread only
        self._publishcallbacks = {}
206
207 1
    def set_security(self, policy):
        """
        store the security policy handed to the socket client
        created by the next connect_socket() call
        """
        self._security_policy = policy
209
210 1
    def connect_socket(self, host, port):
        """
        create a new UASocketClient with the stored timeout and security
        policy and let it connect to (host, port)
        """
        client = UASocketClient(self._timeout, security_policy=self._security_policy)
        self._uasocket = client
        return client.connect_socket(host, port)
216
217 1
    def disconnect_socket(self):
        """
        forward to disconnect_socket() of the socket client
        """
        result = self._uasocket.disconnect_socket()
        return result
219
220 1
    def send_hello(self, url):
        """
        forward to send_hello() of the socket client and return its answer
        """
        ack = self._uasocket.send_hello(url)
        return ack
222
223 1
    def open_secure_channel(self, params):
        """
        forward to open_secure_channel() of the socket client
        and return what it returns
        """
        channel = self._uasocket.open_secure_channel(params)
        return channel
225
226 1
    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepared to reconnect
        """
        result = self._uasocket.close_secure_channel()
        return result
232
233 1
    def create_session(self, parameters):
        """
        send a CreateSessionRequest with parameters
        stores the AuthenticationToken of the answer on the socket client
        and returns the response parameters
        """
        self.logger.info("create_session")
        req = ua.CreateSessionRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = ua.CreateSessionResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        # later request headers are built with this token
        self._uasocket.authentication_token = resp.Parameters.AuthenticationToken
        return resp.Parameters
243
244 1
    def activate_session(self, parameters):
        """
        send an ActivateSessionRequest with parameters, check the
        ServiceResult of the answer and return the response parameters
        """
        self.logger.info("activate_session")
        req = ua.ActivateSessionRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = ua.ActivateSessionResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters
253
254 1
    def close_session(self, deletesubscriptions):
        """
        send a CloseSessionRequest with DeleteSubscriptions set to deletesubscriptions
        a BadSessionClosed service result is ignored, other bad results raise
        returns nothing
        """
        self.logger.info("close_session")
        req = ua.CloseSessionRequest()
        req.DeleteSubscriptions = deletesubscriptions
        raw = self._uasocket.send_request(req)
        resp = ua.CloseSessionResponse.from_binary(raw)
        result = resp.ResponseHeader.ServiceResult
        try:
            result.check()
        except BadSessionClosed:
            # Problem: closing the session with open publish requests leads to BadSessionClosed responses
            #          we can just ignore it therefore.
            #          Alternatively we could make sure that there are no publish requests in flight when
            #          closing the session.
            pass
268
269 1
    def browse(self, parameters):
        """
        send a BrowseRequest with parameters, check the ServiceResult
        of the answer and return its Results
        """
        self.logger.info("browse")
        req = ua.BrowseRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = ua.BrowseResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
278
279 1
    def read(self, parameters):
        """
        send a ReadRequest with parameters, check the ServiceResult
        of the answer and return its Results
        good results for NodeClass attributes are converted to ua.NodeClass,
        good results for ValueRank attributes with a value from -3 to 4
        are converted to ua.ValueRank
        """
        self.logger.info("read")
        req = ua.ReadRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = ua.ReadResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        # cast to Enum attributes that need to
        castable_ranks = (-3, -2, -1, 0, 1, 2, 3, 4)
        for pos, node in enumerate(parameters.NodesToRead):
            if node.AttributeId == ua.AttributeIds.NodeClass:
                item = resp.Results[pos]
                if item.StatusCode.is_good():
                    item.Value.Value = ua.NodeClass(item.Value.Value)
            elif node.AttributeId == ua.AttributeIds.ValueRank:
                item = resp.Results[pos]
                if item.StatusCode.is_good() and item.Value.Value in castable_ranks:
                    item.Value.Value = ua.ValueRank(item.Value.Value)
        return resp.Results
298
299 1
    def write(self, params):
        """
        send a WriteRequest with params, check the ServiceResult
        of the answer and return its Results
        """
        # the log line used to say "read", which made write calls
        # indistinguishable from reads in the log
        self.logger.info("write")
        request = ua.WriteRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.WriteResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results
308
309 1
    def get_endpoints(self, params):
        """
        send a GetEndpointsRequest with params, check the ServiceResult
        of the answer and return its Endpoints
        """
        self.logger.info("get_endpoint")
        req = ua.GetEndpointsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.GetEndpointsResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Endpoints
318
319 1
    def find_servers(self, params):
        """
        send a FindServersRequest with params, check the ServiceResult
        of the answer and return its Servers
        """
        self.logger.info("find_servers")
        req = ua.FindServersRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.FindServersResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Servers
328
329 1
    def find_servers_on_network(self, params):
        """
        send a FindServersOnNetworkRequest with params, check the
        ServiceResult of the answer and return the response parameters
        """
        self.logger.info("find_servers_on_network")
        req = ua.FindServersOnNetworkRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.FindServersOnNetworkResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters
338
339 1
    def register_server(self, registered_server):
        """
        send a RegisterServerRequest with registered_server as Server
        and check the ServiceResult of the answer
        """
        self.logger.info("register_server")
        req = ua.RegisterServerRequest()
        req.Server = registered_server
        raw = self._uasocket.send_request(req)
        resp = ua.RegisterServerResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        # this service has no result to hand back
348
349 1
    def register_server2(self, params):
        """
        send a RegisterServer2Request with params, check the ServiceResult
        of the answer and return its ConfigurationResults
        """
        self.logger.info("register_server2")
        req = ua.RegisterServer2Request()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.RegisterServer2Response.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.ConfigurationResults
358
359 1
    def translate_browsepaths_to_nodeids(self, browsepaths):
        """
        send a TranslateBrowsePathsToNodeIdsRequest with browsepaths as
        BrowsePaths, check the ServiceResult of the answer and return its Results
        """
        self.logger.info("translate_browsepath_to_nodeid")
        req = ua.TranslateBrowsePathsToNodeIdsRequest()
        req.Parameters.BrowsePaths = browsepaths
        raw = self._uasocket.send_request(req)
        resp = ua.TranslateBrowsePathsToNodeIdsResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
368
369 1
    def create_subscription(self, params, callback):
        """
        send a CreateSubscriptionRequest with params
        the answer is handled by _create_subscription_callback, which
        registers callback for the new subscription id
        waits up to the client timeout and returns the response parameters
        """
        self.logger.info("create_subscription")
        req = ua.CreateSubscriptionRequest()
        req.Parameters = params
        result_future = Future()
        on_answer = partial(self._create_subscription_callback, callback, result_future)
        self._uasocket.send_request(req, on_answer)
        return result_future.result(self._timeout)
377
378 1
    def _create_subscription_callback(self, pub_callback, resp_fut, data_fut):
        """
        done-callback for the request sent by create_subscription()
        decodes the answer held by data_fut, checks its ServiceResult,
        registers pub_callback under the SubscriptionId of the answer
        and completes resp_fut with the response parameters
        """
        self.logger.info("_create_subscription_callback")
        raw = data_fut.result()
        resp = ua.CreateSubscriptionResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        subscription_id = resp.Parameters.SubscriptionId
        self._publishcallbacks[subscription_id] = pub_callback
        resp_fut.set_result(resp.Parameters)
386
387 1
    def delete_subscriptions(self, subscriptionids):
        """
        send a DeleteSubscriptionsRequest for subscriptionids
        the answer is handled by _delete_subscriptions_callback
        waits up to the client timeout and returns the Results of the answer
        """
        self.logger.info("delete_subscription")
        req = ua.DeleteSubscriptionsRequest()
        req.Parameters.SubscriptionIds = subscriptionids
        result_future = Future()
        on_answer = partial(self._delete_subscriptions_callback, subscriptionids, result_future)
        self._uasocket.send_request(req, on_answer)
        return result_future.result(self._timeout)
395
396 1
    def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut):
        """
        done-callback for the request sent by delete_subscriptions()
        decodes the answer held by data_fut, checks its ServiceResult,
        removes the publish callback of every id in subscriptionids
        and completes resp_fut with the Results of the answer
        """
        self.logger.info("_delete_subscriptions_callback")
        raw = data_fut.result()
        resp = ua.DeleteSubscriptionsResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        for subscription_id in subscriptionids:
            self._publishcallbacks.pop(subscription_id)
        resp_fut.set_result(resp.Results)
405
406 1
    def publish(self, acks=None):
        """
        send a PublishRequest carrying acks (an empty list when acks is None)
        as SubscriptionAcknowledgements
        does not wait: the answer is handled by _call_publish_callback
        """
        self.logger.info("publish")
        if acks is None:
            acks = []
        req = ua.PublishRequest()
        req.Parameters.SubscriptionAcknowledgements = acks
        # timeout could be set to 0 (= no timeout) but some servers do not support it
        # NOTE(review): this line used to be commented "250 days"; if the
        # TimeoutHint is in milliseconds, int(9e8) is about 10 days - confirm
        long_timeout = int(9e8)
        self._uasocket.send_request(req, self._call_publish_callback, timeout=long_timeout)
414
415 1
    def _call_publish_callback(self, future):
        """
        done-callback attached to every publish request by publish()
        future holds the raw publish answer. The answer is checked, decoded
        as ua.PublishResponse and its parameters are handed to the callback
        registered for its SubscriptionId
        BadTimeout triggers a new publish request, BadNoSubscription is ignored
        exceptions raised by the user callback are logged, not propagated
        """
        self.logger.info("call_publish_callback")
        data = future.result()

        # check if answer looks ok
        try:
            self._uasocket.check_answer(data, "while waiting for publish response")
        except BadTimeout: # Spec Part 4, 7.28
            self.publish()
            return
        except BadNoSubscription: # Spec Part 5, 13.8.1
            # BadNoSubscription is expected after deleting the last subscription.
            #
            # We should therefore also check for len(self._publishcallbacks) == 0, but
            # this gets us into trouble if a Publish response arrives before the
            # DeleteSubscription response.
            #
            # We could remove the callback already when sending the DeleteSubscription request,
            # but there are some legitimate reasons to keep them around, such as when the server
            # responds with "BadTimeout" and we should try again later instead of just removing
            # the subscription client-side.
            #
            # There are a variety of ways to act correctly, but the most practical solution seems
            # to be to just ignore any BadNoSubscription responses.
            self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
            return

        # parse publish response
        try:
            response = ua.PublishResponse.from_binary(data)
            self.logger.debug(response)
        except Exception:
            # INFO: catching the exception here might be obsolete because we already
            #       catch BadTimeout above. However, it's not really clear what this code
            #       does so it stays in, doesn't seem to hurt.
            self.logger.exception("Error parsing notificatipn from server")
            self.publish([]) # send publish request to server so it stops sending notifications
            return

        # look for callback
        try:
            callback = self._publishcallbacks[response.Parameters.SubscriptionId]
        except KeyError:
            self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
            return

        # do callback
        try:
            callback(response.Parameters)
        except Exception:  # we call client code, catch everything!
            # the message has a %s placeholder; without an argument the
            # literal "%s" ended up in the log
            self.logger.exception("Exception while calling user callback: %s", callback)
466
467 1
    def create_monitored_items(self, params):
        """
        send a CreateMonitoredItemsRequest with params, check the
        ServiceResult of the answer and return its Results
        """
        self.logger.info("create_monitored_items")
        req = ua.CreateMonitoredItemsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.CreateMonitoredItemsResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
476
477 1
    def delete_monitored_items(self, params):
        """
        send a DeleteMonitoredItemsRequest with params, check the
        ServiceResult of the answer and return its Results
        """
        self.logger.info("delete_monitored_items")
        req = ua.DeleteMonitoredItemsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.DeleteMonitoredItemsResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
486
487 1
    def add_nodes(self, nodestoadd):
        """
        send an AddNodesRequest with nodestoadd as NodesToAdd, check the
        ServiceResult of the answer and return its Results
        """
        self.logger.info("add_nodes")
        req = ua.AddNodesRequest()
        req.Parameters.NodesToAdd = nodestoadd
        raw = self._uasocket.send_request(req)
        resp = ua.AddNodesResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
496
497 1
    def delete_nodes(self, params):
        """
        send a DeleteNodesRequest with params, check the ServiceResult
        of the answer and return its Results
        """
        self.logger.info("delete_nodes")
        req = ua.DeleteNodesRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.DeleteNodesResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
506
507 1
    def call(self, methodstocall):
        """
        send a CallRequest with methodstocall as MethodsToCall, check the
        ServiceResult of the answer and return its Results
        """
        req = ua.CallRequest()
        req.Parameters.MethodsToCall = methodstocall
        raw = self._uasocket.send_request(req)
        resp = ua.CallResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
515
516 1
    def history_read(self, params):
        """
        send a HistoryReadRequest with params, check the ServiceResult
        of the answer and return its Results
        """
        self.logger.info("history_read")
        req = ua.HistoryReadRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.HistoryReadResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
525
526 1
    def modify_monitored_items(self, params):
        """
        send a ModifyMonitoredItemsRequest with params, check the
        ServiceResult of the answer and return its Results
        """
        self.logger.info("modify_monitored_items")
        req = ua.ModifyMonitoredItemsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.ModifyMonitoredItemsResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
535