Completed
Push — master ( 21c561...78d618 )
by Olivier
05:34
created

UaClient.disconnect_socket()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 2
ccs 2
cts 2
cp 1
crap 1
rs 10
1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9 1
from functools import partial
10
11 1
from opcua import ua
12 1
from opcua.ua.ua_binary import struct_from_binary, uatcp_to_binary, struct_to_binary, nodeid_from_binary
13 1
from opcua.ua.uaerrors import UaError, BadTimeout, BadNoSubscription, BadSessionClosed
14 1
from opcua.common.connection import SecureConnection
15
16
17 1
class UASocketClient(object):
18
    """
19
    handle socket connection and send ua messages
20
    timeout is the timeout used while waiting for an ua answer from server
21
    """
22 1
    def __init__(self, timeout=1, security_policy=None):
        """
        timeout is the timeout used while waiting for an ua answer from server
        security_policy is handed to SecureConnection; when it is None
        (the default) a new ua.SecurityPolicy() is created for this instance
        """
        # a default of ua.SecurityPolicy() written in the signature would be
        # evaluated once, at definition time, and shared by every instance
        if security_policy is None:
            security_policy = ua.SecurityPolicy()
        self.logger = logging.getLogger(__name__ + ".Socket")
        self._thread = None  # receiving thread, created in start()
        self._lock = Lock()  # protects request counters and _callbackmap
        self.timeout = timeout
        self._socket = None  # set in connect_socket()
        self._do_stop = False  # asks the receive loop to end
        self.authentication_token = ua.NodeId()
        self._request_id = 0
        self._request_handle = 0
        self._callbackmap = {}  # request id -> Future waiting for the answer
        self._connection = SecureConnection(security_policy)
34
35 1
    def start(self):
        """
        Create and start the thread running the receive loop (self._run).
        connect_socket does this for you, so calling it
        directly should not be necessary
        """
        receiver = Thread(target=self._run)
        self._thread = receiver
        receiver.start()
43
44 1
    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server, lower-level method
        timeout is the timeout written in ua header
        callback, if given, is attached to the future as done callback
        returns future
        any exception raised while serialising or writing the request is re-raised
        """
        with self._lock:
            request.RequestHeader = self._create_request_header(timeout)
            self.logger.debug("Sending: %s", request)
            try:
                binreq = struct_to_binary(request)
            except:
                # reset request handle if any error
                # see self._create_request_header
                # (bare except is deliberate: the error is always re-raised)
                self._request_handle -= 1
                raise
            self._request_id += 1
            request_id = self._request_id
            future = Future()
            if callback:
                future.add_done_callback(callback)
            self._callbackmap[request_id] = future
            try:
                msg = self._connection.message_to_binary(binreq, message_type=message_type, request_id=request_id)
                self._socket.write(msg)
            except Exception:
                # the request did not go out, and the caller never gets the
                # future: do not leave it behind in the callback map
                self._callbackmap.pop(request_id, None)
                raise
        return future
68
69 1
    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server.
        timeout is the timeout written in ua header
        without callback: wait up to self.timeout for the answer, run
        check_answer on it and return the raw answer
        with callback: return None, the callback gets the future
        """
        pending = self._send_request(request, callback, timeout, message_type)
        if callback:
            return None
        answer = pending.result(self.timeout)
        self.check_answer(answer, " in response to " + request.__class__.__name__)
        return answer
80
81 1
    def check_answer(self, data, context):
        """
        look at the type id at the start of an answer, working on a copy of data.
        returns True when the answer is not a ServiceFault.
        for a ServiceFault a warning including context is logged and
        ServiceResult.check() is called on the decoded response header;
        False is returned only if that call returns normally
        """
        buf = data.copy()
        typeid = nodeid_from_binary(buf)
        is_fault = typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary)
        if not is_fault:
            return True
        self.logger.warning("ServiceFault from server received %s", context)
        header = struct_from_binary(ua.ResponseHeader, buf)
        header.ServiceResult.check()
        return False
90
91 1
    def _run(self):
        """
        receive loop, target of the thread created in start().
        runs until _do_stop is set or the socket reports it was closed;
        an UaError is logged and the loop goes on
        """
        self.logger.info("Thread started")
        while True:
            if self._do_stop:
                break
            try:
                self._receive()
            except ua.utils.SocketClosedException:
                self.logger.info("Socket has closed connection")
                break
            except UaError:
                self.logger.exception("Protocol Error")
        self.logger.info("Thread ended")
102
103 1
    def _receive(self):
        """
        read one message from the connection and dispatch it:
        ua.Message goes to the future registered under its request id,
        ua.Acknowledge goes to the future registered under id 0 (see send_hello),
        ua.ErrorMessage is only logged, None is ignored.
        raises ua.UaError for any other message type
        """
        msg = self._connection.receive_from_socket(self._socket)
        if msg is None:
            return
        if isinstance(msg, ua.Message):
            self._call_callback(msg.request_id(), msg.body())
        elif isinstance(msg, ua.Acknowledge):
            self._call_callback(0, msg)
        elif isinstance(msg, ua.ErrorMessage):
            self.logger.warning("Received an error: %s", msg)
        else:
            # exceptions do not interpolate %-style arguments the way logging
            # does, so the text has to be formatted before it is raised
            raise ua.UaError("Unsupported message type: {0}".format(msg))
115
116 1
    def _call_callback(self, request_id, body):
        """
        resolve the future registered for request_id with body.
        the future is removed from the map under the lock, the result
        is set after the lock is released.
        raises ua.UaError if nothing is registered for request_id
        """
        with self._lock:
            pending = self._callbackmap.pop(request_id, None)
            if pending is None:
                known = self._callbackmap.keys()
                raise ua.UaError("No future object found for request: {0}, callbacks in list are {1}".format(request_id, known))
        pending.set_result(body)
122
123 1
    def _create_request_header(self, timeout=1000):
        """
        build the ua.RequestHeader for the next request:
        current authentication token, a newly incremented request handle
        and timeout as TimeoutHint
        """
        header = ua.RequestHeader()
        header.AuthenticationToken = self.authentication_token
        # _send_request decrements the handle again if serialisation fails
        self._request_handle += 1
        header.RequestHandle = self._request_handle
        header.TimeoutHint = timeout
        return header
130
131 1
    def connect_socket(self, host, port):
        """
        open a TCP connection to (host, port), wrap it in
        ua.utils.SocketWrapper and start the receiving thread
        """
        self.logger.info("opening connection")
        address = (host, port)
        raw_sock = socket.create_connection(address)
        # nodelay is necessary to avoid packing in one frame, some servers do not like it
        raw_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        self._socket = ua.utils.SocketWrapper(raw_sock)
        self.start()
140
141 1
    def disconnect_socket(self):
        """
        set the stop flag read by the receive loop, then shut down
        and close the underlying socket
        """
        self.logger.info("stop request")
        self._do_stop = True
        raw_sock = self._socket.socket
        raw_sock.shutdown(socket.SHUT_RDWR)
        raw_sock.close()
146
147 1
    def send_hello(self, url):
        """
        send a Hello message with url as EndpointUrl and wait up to
        self.timeout for the acknowledge, which is returned.
        the answer is awaited under request id 0
        """
        hello = ua.Hello()
        hello.EndpointUrl = url
        ack_future = Future()
        with self._lock:
            self._callbackmap[0] = ack_future
        self._socket.write(uatcp_to_binary(ua.MessageType.Hello, hello))
        return ack_future.result(self.timeout)
157
158 1
    def open_secure_channel(self, params):
        """
        send an OpenSecureChannelRequest with params, wait up to
        self.timeout for the response, hand the response parameters
        to the connection with set_channel and return them
        """
        self.logger.info("open_secure_channel")
        request = ua.OpenSecureChannelRequest()
        request.Parameters = params
        pending = self._send_request(request, message_type=ua.MessageType.SecureOpen)

        # FIXME: we have a race condition here
        # we can get a packet with the new token id before we reach to store it..
        raw = pending.result(self.timeout)
        response = struct_from_binary(ua.OpenSecureChannelResponse, raw)
        response.ResponseHeader.ServiceResult.check()
        channel = response.Parameters
        self._connection.set_channel(channel)
        return channel
170
171 1
    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepared to reconnect.
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket
        """
        self.logger.info("close_secure_channel")
        pending = self._send_request(ua.CloseSecureChannelRequest(), message_type=ua.MessageType.SecureClose)
        with self._lock:
            # no further answers are expected: cancel this request
            # and forget every pending future
            pending.cancel()
            self._callbackmap.clear()

        # some servers send a response here, most do not ... so we ignore
186
187
188 1
class UaClient(object):
189
190
    """
191
    low level OPC-UA client.
192
193
    It implements (almost) all methods defined in opcua spec
194
    taking in argument the structures defined in opcua spec.
195
196
    In this Python implementation  most of the structures are defined in
197
    uaprotocol_auto.py and uaprotocol_hand.py available under opcua.ua
198
    """
199
200 1
    def __init__(self, timeout=1):
        """
        timeout is used when waiting for subscription answers and is
        passed on to the UASocketClient created in connect_socket
        """
        self.logger = logging.getLogger(__name__)
        # subscription id -> publish callback
        # _publishcallbacks should be accessed in recv thread only
        self._publishcallbacks = {}
        self._timeout = timeout
        # created in connect_socket
        self._uasocket = None
        self.security_policy = ua.SecurityPolicy()
207
208 1
    def set_security(self, policy):
209 1
        self.security_policy = policy
210
211 1
    def connect_socket(self, host, port):
        """
        create a new UASocketClient with the current timeout and
        security policy and connect it to (host, port);
        returns what the socket client's connect_socket returns
        """
        uasocket = UASocketClient(self._timeout, security_policy=self.security_policy)
        self._uasocket = uasocket
        return uasocket.connect_socket(host, port)
217
218 1
    def disconnect_socket(self):
219 1
        return self._uasocket.disconnect_socket()
220
221 1
    def send_hello(self, url):
222 1
        return self._uasocket.send_hello(url)
223
224 1
    def open_secure_channel(self, params):
225 1
        return self._uasocket.open_secure_channel(params)
226
227 1
    def close_secure_channel(self):
228
        """
229
        close secure channel. It seems to trigger a shutdown of socket
230
        in most servers, so be prepare to reconnect
231
        """
232 1
        return self._uasocket.close_secure_channel()
233
234 1
    def create_session(self, parameters):
        """
        send a CreateSessionRequest with parameters.
        the AuthenticationToken of the response is stored on the socket
        client; the response parameters are returned
        """
        self.logger.info("create_session")
        req = ua.CreateSessionRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.CreateSessionResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        session = resp.Parameters
        self._uasocket.authentication_token = session.AuthenticationToken
        return session
244
245 1
    def activate_session(self, parameters):
        """
        send an ActivateSessionRequest with parameters, check the
        service result and return the response parameters
        """
        self.logger.info("activate_session")
        req = ua.ActivateSessionRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.ActivateSessionResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters
254
255 1
    def close_session(self, deletesubscriptions):
        """
        send a CloseSessionRequest with deletesubscriptions as
        DeleteSubscriptions flag. A BadSessionClosed service result
        is ignored. Returns nothing
        """
        self.logger.info("close_session")
        req = ua.CloseSessionRequest()
        req.DeleteSubscriptions = deletesubscriptions
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.CloseSessionResponse, raw)
        try:
            resp.ResponseHeader.ServiceResult.check()
        except BadSessionClosed:
            # Closing the session while publish requests are still open leads
            # to BadSessionClosed responses, so it is safe to ignore here.
            # The alternative would be to make sure no publish request is in
            # flight when the session gets closed.
            pass
269
270 1
    def browse(self, parameters):
        """
        send a BrowseRequest with parameters, check the service
        result and return the Results of the response
        """
        self.logger.info("browse")
        req = ua.BrowseRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.BrowseResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
279
280 1
    def browse_next(self, parameters):
        """
        send a BrowseNextRequest with parameters, check the service
        result and return Parameters.Results of the response
        """
        self.logger.info("browse next")
        req = ua.BrowseNextRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.BrowseNextResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters.Results
289
290 1
    def read(self, parameters):
        """
        send a ReadRequest with parameters, check the service result
        and return the Results of the response.
        good values read from a NodeClass attribute are converted to
        ua.NodeClass; good values read from a ValueRank attribute are
        converted to ua.ValueRank when they are one of -3..4
        """
        self.logger.info("read")
        req = ua.ReadRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.ReadResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        # results are matched to the requested nodes by position
        known_ranks = (-3, -2, -1, 0, 1, 2, 3, 4)
        for pos, node in enumerate(parameters.NodesToRead):
            if node.AttributeId == ua.AttributeIds.NodeClass:
                result = resp.Results[pos]
                if result.StatusCode.is_good():
                    result.Value.Value = ua.NodeClass(result.Value.Value)
            elif node.AttributeId == ua.AttributeIds.ValueRank:
                result = resp.Results[pos]
                if result.StatusCode.is_good() and result.Value.Value in known_ranks:
                    result.Value.Value = ua.ValueRank(result.Value.Value)
        return resp.Results
309
310 1
    def write(self, params):
        """
        send a WriteRequest with params, check the service
        result and return the Results of the response
        """
        # this message used to say "read", which made write calls
        # indistinguishable from read calls in the log
        self.logger.info("write")
        request = ua.WriteRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = struct_from_binary(ua.WriteResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results
319
320 1
    def get_endpoints(self, params):
        """
        send a GetEndpointsRequest with params, check the service
        result and return the Endpoints of the response
        """
        self.logger.info("get_endpoint")
        req = ua.GetEndpointsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.GetEndpointsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Endpoints
329
330 1
    def find_servers(self, params):
        """
        send a FindServersRequest with params, check the service
        result and return the Servers of the response
        """
        self.logger.info("find_servers")
        req = ua.FindServersRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.FindServersResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Servers
339
340 1
    def find_servers_on_network(self, params):
        """
        send a FindServersOnNetworkRequest with params, check the
        service result and return the Parameters of the response
        """
        self.logger.info("find_servers_on_network")
        req = ua.FindServersOnNetworkRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.FindServersOnNetworkResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters
349
350 1
    def register_server(self, registered_server):
        """
        send a RegisterServerRequest with registered_server as Server
        and check the service result. Returns nothing
        """
        self.logger.info("register_server")
        req = ua.RegisterServerRequest()
        req.Server = registered_server
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.RegisterServerResponse, raw)
        self.logger.debug(resp)
        # this service has no payload to hand back, only the status matters
        resp.ResponseHeader.ServiceResult.check()
359
360 1
    def register_server2(self, params):
        """
        send a RegisterServer2Request with params, check the service
        result and return the ConfigurationResults of the response
        """
        self.logger.info("register_server2")
        req = ua.RegisterServer2Request()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.RegisterServer2Response, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.ConfigurationResults
369
370 1
    def translate_browsepaths_to_nodeids(self, browsepaths):
        """
        send a TranslateBrowsePathsToNodeIdsRequest with browsepaths as
        BrowsePaths, check the service result and return the Results
        of the response
        """
        self.logger.info("translate_browsepath_to_nodeid")
        req = ua.TranslateBrowsePathsToNodeIdsRequest()
        req.Parameters.BrowsePaths = browsepaths
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
379
380 1
    def create_subscription(self, params, callback):
        """
        send a CreateSubscriptionRequest with params.
        the answer is handled in _create_subscription_callback, which
        registers callback as publish callback for the new subscription.
        waits up to the client timeout and returns the response parameters
        """
        self.logger.info("create_subscription")
        req = ua.CreateSubscriptionRequest()
        req.Parameters = params
        answer = Future()
        on_response = partial(self._create_subscription_callback, callback, answer)
        self._uasocket.send_request(req, on_response)
        return answer.result(self._timeout)
388
389 1
    def _create_subscription_callback(self, pub_callback, resp_fut, data_fut):
        """
        done callback for the CreateSubscription request.
        data_fut holds the raw answer; pub_callback is stored under the
        new subscription id and resp_fut receives the response parameters
        """
        self.logger.info("_create_subscription_callback")
        raw = data_fut.result()
        resp = struct_from_binary(ua.CreateSubscriptionResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        created = resp.Parameters
        self._publishcallbacks[created.SubscriptionId] = pub_callback
        resp_fut.set_result(created)
397
398 1
    def delete_subscriptions(self, subscriptionids):
        """
        send a DeleteSubscriptionsRequest for subscriptionids.
        the answer is handled in _delete_subscriptions_callback.
        waits up to the client timeout and returns the Results of the response
        """
        self.logger.info("delete_subscription")
        req = ua.DeleteSubscriptionsRequest()
        req.Parameters.SubscriptionIds = subscriptionids
        answer = Future()
        on_response = partial(self._delete_subscriptions_callback, subscriptionids, answer)
        self._uasocket.send_request(req, on_response)
        return answer.result(self._timeout)
406
407 1
    def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut):
        """
        done callback for the DeleteSubscriptions request.
        data_fut holds the raw answer; the publish callbacks registered
        for subscriptionids are removed and resp_fut receives the
        Results of the response
        """
        self.logger.info("_delete_subscriptions_callback")
        data = data_fut.result()
        response = struct_from_binary(ua.DeleteSubscriptionsResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        for sid in subscriptionids:
            # an id without a registered callback must not raise KeyError here:
            # resp_fut would then never be resolved and the caller would only
            # see a timeout
            self._publishcallbacks.pop(sid, None)
        resp_fut.set_result(response.Results)
416
417 1
    def publish(self, acks=None):
        """
        send a PublishRequest carrying acks (an empty list when None)
        as SubscriptionAcknowledgements. Does not wait for the answer,
        which is handled in _call_publish_callback
        """
        self.logger.info("publish")
        req = ua.PublishRequest()
        req.Parameters.SubscriptionAcknowledgements = [] if acks is None else acks
        # timeout could be set to 0 (= no timeout) but some servers do not support it,
        # so a very large hint is sent instead
        # NOTE(review): if the hint is in milliseconds 9e8 is about 250 hours, not days - confirm
        self._uasocket.send_request(req, self._call_publish_callback, timeout=int(9e8))
425
426 1
    def _call_publish_callback(self, future):
        """
        done callback for a Publish request.
        future holds the raw answer. The answer is checked, decoded as
        PublishResponse and its parameters are passed to the callback
        registered for the subscription id. Exceptions raised by that
        callback are logged and not propagated
        """
        self.logger.info("call_publish_callback")
        data = future.result()

        # check if answer looks ok
        try:
            self._uasocket.check_answer(data, "while waiting for publish response")
        except BadTimeout: # Spec Part 4, 7.28
            self.publish()
            return
        except BadNoSubscription: # Spec Part 5, 13.8.1
            # BadNoSubscription is expected after deleting the last subscription.
            #
            # We should therefore also check for len(self._publishcallbacks) == 0, but
            # this gets us into trouble if a Publish response arrives before the
            # DeleteSubscription response.
            #
            # We could remove the callback already when sending the DeleteSubscription request,
            # but there are some legitimate reasons to keep them around, such as when the server
            # responds with "BadTimeout" and we should try again later instead of just removing
            # the subscription client-side.
            #
            # There are a variety of ways to act correctly, but the most practical solution seems
            # to be to just ignore any BadNoSubscription responses.
            self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
            return

        # parse publish response
        try:
            response = struct_from_binary(ua.PublishResponse, data)
            self.logger.debug(response)
        except Exception:
            # INFO: catching the exception here might be obsolete because we already
            #       catch BadTimeout above. However, it's not really clear what this code
            #       does so it stays in, doesn't seem to hurt.
            self.logger.exception("Error parsing notificatipn from server")
            self.publish([]) # send publish request to server so it does not stop sending notifications
            return

        # look for callback
        try:
            callback = self._publishcallbacks[response.Parameters.SubscriptionId]
        except KeyError:
            self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
            return

        # do callback
        try:
            callback(response.Parameters)
        except Exception:  # we call client code, catch everything!
            # the message has a %s placeholder, so it needs an argument;
            # without one logging prints the placeholder literally
            self.logger.exception("Exception while calling user callback: %s", callback)
477
478 1
    def create_monitored_items(self, params):
        """
        send a CreateMonitoredItemsRequest with params, check the
        service result and return the Results of the response
        """
        self.logger.info("create_monitored_items")
        req = ua.CreateMonitoredItemsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.CreateMonitoredItemsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
487
488 1
    def delete_monitored_items(self, params):
        """
        send a DeleteMonitoredItemsRequest with params, check the
        service result and return the Results of the response
        """
        self.logger.info("delete_monitored_items")
        req = ua.DeleteMonitoredItemsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.DeleteMonitoredItemsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
497
498 1
    def add_nodes(self, nodestoadd):
        """
        send an AddNodesRequest with nodestoadd as NodesToAdd, check
        the service result and return the Results of the response
        """
        self.logger.info("add_nodes")
        req = ua.AddNodesRequest()
        req.Parameters.NodesToAdd = nodestoadd
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.AddNodesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
507
508 1
    def add_references(self, refs):
        """
        send an AddReferencesRequest with refs as ReferencesToAdd, check
        the service result and return the Results of the response
        """
        self.logger.info("add_references")
        req = ua.AddReferencesRequest()
        req.Parameters.ReferencesToAdd = refs
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.AddReferencesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
517
518 1
    def delete_references(self, refs):
        """
        send a DeleteReferencesRequest with refs as ReferencesToDelete,
        check the service result and return Parameters.Results of the response
        """
        # log the method name like every other service call of this class;
        # the message used to be just "delete", which also fits delete_nodes
        self.logger.info("delete_references")
        request = ua.DeleteReferencesRequest()
        request.Parameters.ReferencesToDelete = refs
        data = self._uasocket.send_request(request)
        response = struct_from_binary(ua.DeleteReferencesResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters.Results
527
528
529 1
    def delete_nodes(self, params):
        """
        send a DeleteNodesRequest with params, check the service
        result and return the Results of the response
        """
        self.logger.info("delete_nodes")
        req = ua.DeleteNodesRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.DeleteNodesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
538
539 1
    def call(self, methodstocall):
        """
        send a CallRequest with methodstocall as MethodsToCall, check
        the service result and return the Results of the response
        """
        req = ua.CallRequest()
        req.Parameters.MethodsToCall = methodstocall
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.CallResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
547
548 1
    def history_read(self, params):
        """
        send a HistoryReadRequest with params, check the service
        result and return the Results of the response
        """
        self.logger.info("history_read")
        req = ua.HistoryReadRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.HistoryReadResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
557
558 1
    def modify_monitored_items(self, params):
        """
        send a ModifyMonitoredItemsRequest with params, check the
        service result and return the Results of the response
        """
        self.logger.info("modify_monitored_items")
        req = ua.ModifyMonitoredItemsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.ModifyMonitoredItemsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
567