Completed
Push — master ( 53fd8d...02b764 )
by Olivier
09:09 queued 02:31
created

UASocketClient.connect_socket()   A

Complexity

Conditions 1

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 9
ccs 6
cts 6
cp 1
crap 1
rs 9.6666
1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9 1
from functools import partial
10
11 1
from opcua import ua
12 1
from opcua.ua.ua_binary import struct_from_binary, uatcp_to_binary, struct_to_binary, nodeid_from_binary
13 1
from opcua.ua.uaerrors import UaError, BadTimeout, BadNoSubscription, BadSessionClosed
14 1
from opcua.common.connection import SecureConnection
15
16
17 1
class UASocketClient(object):
18
    """
19
    handle socket connection and send ua messages
20
    timeout is the timeout used while waiting for an ua answer from server
21
    """
22 1
    def __init__(self, timeout=1, security_policy=None):
        """
        Set up the client state; no connection is opened here.

        timeout: seconds to wait for an answer (passed to Future.result())
        security_policy: policy handed to SecureConnection; when omitted
                         (None) a fresh ua.SecurityPolicy() is created
        """
        # A default written in the signature is evaluated only once, at
        # definition time, and would therefore be one object shared by every
        # client built without an explicit policy. Build it per instance.
        if security_policy is None:
            security_policy = ua.SecurityPolicy()
        self.logger = logging.getLogger(__name__ + ".Socket")
        self._thread = None
        self._lock = Lock()
        self.timeout = timeout
        self._socket = None
        self._do_stop = False
        self.authentication_token = ua.NodeId()
        self._request_id = 0
        self._request_handle = 0
        # request id -> Future waiting for the matching answer
        self._callbackmap = {}
        self._connection = SecureConnection(security_policy)
34
35 1
    def start(self):
        """
        Create and launch the receiving thread (target: self._run).
        connect_socket() already does this, so a direct call
        should normally not be needed
        """
        receiver = Thread(target=self._run)
        self._thread = receiver
        receiver.start()
43
44 1
    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        Lower-level send: serialize the request, register a future for
        its answer and write the message to the socket.
        timeout is the value written in the ua request header
        returns the future that will receive the answer
        """
        with self._lock:
            request.RequestHeader = self._create_request_header(timeout)
            self.logger.debug("Sending: %s", request)
            try:
                payload = struct_to_binary(request)
            except BaseException:
                # serialization failed: hand back the request handle that
                # self._create_request_header consumed just above
                self._request_handle -= 1
                raise
            self._request_id += 1
            pending = Future()
            if callback:
                pending.add_done_callback(callback)
            # the receiving side looks the future up by request id
            self._callbackmap[self._request_id] = pending
            frame = self._connection.message_to_binary(payload, message_type=message_type, request_id=self._request_id)
            self._socket.write(frame)
        return pending
68
69 1
    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        Send a request to the server.
        timeout is the value written in the ua request header
        Without a callback this blocks (up to self.timeout) and returns the
        answer after running check_answer() on it; with a callback it
        returns None right after sending.
        """
        pending = self._send_request(request, callback, timeout, message_type)
        if callback:
            # asynchronous use: the answer is delivered through the callback
            return None
        answer = pending.result(self.timeout)
        self.check_answer(answer, " in response to " + request.__class__.__name__)
        return answer
80
81 1
    def check_answer(self, data, context):
        """
        Tell whether an answer is something else than a ServiceFault.

        Works on a copy of data, so the caller's buffer is left untouched.
        For a ServiceFault a warning mentioning context is logged and the
        service result of its header is checked; False is returned if that
        check does not raise. Any other answer yields True.
        """
        buf = data.copy()
        typeid = nodeid_from_binary(buf)
        is_fault = typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary)
        if not is_fault:
            return True
        self.logger.warning("ServiceFault from server received %s", context)
        header = struct_from_binary(ua.ResponseHeader, buf)
        header.ServiceResult.check()
        return False
90
91 1
    def _run(self):
        """
        Body of the receiving thread: keep calling self._receive() until
        a stop is requested or the socket reports it has been closed.
        Protocol errors (UaError) are logged and the loop goes on.
        """
        self.logger.info("Thread started")
        while True:
            if self._do_stop:
                break
            try:
                self._receive()
            except ua.utils.SocketClosedException:
                self.logger.info("Socket has closed connection")
                break
            except UaError:
                self.logger.exception("Protocol Error")
        self.logger.info("Thread ended")
102
103 1
    def _receive(self):
        """
        Read one message from the socket and dispatch it.

        ua.Message     -> completes the future registered for its request id
        ua.Acknowledge -> completes the future registered under id 0
        ua.ErrorMessage-> logged as a warning
        None           -> nothing to do

        Raises ua.UaError for any other message type.
        """
        msg = self._connection.receive_from_socket(self._socket)
        if msg is None:
            return
        elif isinstance(msg, ua.Message):
            self._call_callback(msg.request_id(), msg.body())
        elif isinstance(msg, ua.Acknowledge):
            self._call_callback(0, msg)
        elif isinstance(msg, ua.ErrorMessage):
            self.logger.warning("Received an error: %s", msg)
        else:
            # Exceptions do not interpolate extra arguments like logging
            # does, so the text has to be formatted before raising.
            raise ua.UaError("Unsupported message type: {0}".format(msg))
115
116 1
    def _call_callback(self, request_id, body):
        """
        Complete the future registered for request_id with body.

        The future is removed from the callback map under the lock and
        its result is set once the lock is released.
        Raises ua.UaError when nothing is registered for request_id.
        """
        with self._lock:
            pending = self._callbackmap.pop(request_id, None)
            if pending is None:
                known = self._callbackmap.keys()
                raise ua.UaError("No future object found for request: {0}, callbacks in list are {1}".format(request_id, known))
        pending.set_result(body)
122
123 1
    def _create_request_header(self, timeout=1000):
        """
        Build a ua.RequestHeader carrying the current authentication
        token, the next request handle and timeout as TimeoutHint.
        Each call increments self._request_handle.
        """
        header = ua.RequestHeader()
        header.AuthenticationToken = self.authentication_token
        self._request_handle += 1
        header.RequestHandle = self._request_handle
        header.TimeoutHint = timeout
        return header
130
131 1
    def connect_socket(self, host, port):
        """
        Open a TCP connection to (host, port), wrap it and start the
        receiving thread
        """
        self.logger.info("opening connection")
        raw_sock = socket.create_connection((host, port))
        # nodelay is necessary to avoid packing in one frame, some servers do not like it
        raw_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        self._socket = ua.utils.SocketWrapper(raw_sock)
        self.start()
140
141 1
    def disconnect_socket(self):
        """
        Ask the receiving loop to stop, then shut down and close the socket.

        An exception raised by shutdown() still propagates to the caller,
        but the socket is closed in every case.
        """
        self.logger.info("stop request")
        # checked by the loop in self._run
        self._do_stop = True
        sock = self._socket.socket
        try:
            sock.shutdown(socket.SHUT_RDWR)
        finally:
            # close even if shutdown failed, so the descriptor is not leaked
            sock.close()
146
147 1
    def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
        """
        Send a Hello message for url and wait (up to self.timeout) for
        the answer, which is returned.

        The answer is awaited under request id 0, the id under which
        _receive() delivers ua.Acknowledge messages.
        """
        hello = ua.Hello()
        hello.EndpointUrl = url
        hello.MaxMessageSize = max_messagesize
        hello.MaxChunkCount = max_chunkcount
        ack_future = Future()
        with self._lock:
            self._callbackmap[0] = ack_future
        self._socket.write(uatcp_to_binary(ua.MessageType.Hello, hello))
        return ack_future.result(self.timeout)
159
160 1
    def open_secure_channel(self, params):
        """
        Send an OpenSecureChannelRequest with params, wait for the answer,
        check its service result and install the returned channel
        parameters on the connection.
        returns the Parameters of the response
        """
        self.logger.info("open_secure_channel")
        req = ua.OpenSecureChannelRequest()
        req.Parameters = params
        pending = self._send_request(req, message_type=ua.MessageType.SecureOpen)

        # FIXME: we have a race condition here
        # we can get a packet with the new token id before we reach to store it..
        raw = pending.result(self.timeout)
        resp = struct_from_binary(ua.OpenSecureChannelResponse, raw)
        resp.ResponseHeader.ServiceResult.check()
        self._connection.set_channel(resp.Parameters)
        return resp.Parameters
172
173 1
    def close_secure_channel(self):
        """
        Close the secure channel. This seems to trigger a shutdown of the
        socket in most servers, so be prepared to reconnect.
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket
        """
        self.logger.info("close_secure_channel")
        req = ua.CloseSecureChannelRequest()
        pending = self._send_request(req, message_type=ua.MessageType.SecureClose)
        with self._lock:
            # no further answers are expected: cancel the future of this
            # request and forget every pending one
            pending.cancel()
            self._callbackmap.clear()

        # some servers send a response here, most do not ... so we ignore
188
189
190 1
class UaClient(object):
191
192
    """
193
    low level OPC-UA client.
194
195
    It implements (almost) all methods defined in opcua spec
196
    taking in argument the structures defined in opcua spec.
197
198
    In this Python implementation  most of the structures are defined in
199
    uaprotocol_auto.py and uaprotocol_hand.py available under opcua.ua
200
    """
201
202 1
    def __init__(self, timeout=1):
        """
        timeout: value handed to the socket client created in
                 connect_socket() and used when waiting for
                 subscription responses
        """
        self.logger = logging.getLogger(__name__)
        self._timeout = timeout
        # no socket client until connect_socket() is called
        self._uasocket = None
        self.security_policy = ua.SecurityPolicy()
        # subscription id -> publish callback
        # _publishcallbacks should be accessed in recv thread only
        self._publishcallbacks = {}
209
210 1
    def set_security(self, policy):
211 1
        self.security_policy = policy
212
213 1
    def connect_socket(self, host, port):
        """
        Create a new UASocketClient (with this client's timeout and
        security policy), connect it to (host, port) and return the
        result of its connect_socket()
        """
        uasocket = UASocketClient(self._timeout, security_policy=self.security_policy)
        self._uasocket = uasocket
        return uasocket.connect_socket(host, port)
219
220 1
    def disconnect_socket(self):
221 1
        return self._uasocket.disconnect_socket()
222
223 1
    def send_hello(self, url, max_messagesize = 0, max_chunkcount = 0):
224 1
        return self._uasocket.send_hello(url, max_messagesize, max_chunkcount)
225
226 1
    def open_secure_channel(self, params):
227 1
        return self._uasocket.open_secure_channel(params)
228
229 1
    def close_secure_channel(self):
230
        """
231
        close secure channel. It seems to trigger a shutdown of socket
232
        in most servers, so be prepare to reconnect
233
        """
234 1
        return self._uasocket.close_secure_channel()
235
236 1
    def create_session(self, parameters):
        """
        Send a CreateSessionRequest with parameters and return the
        Parameters of the checked response.

        The AuthenticationToken of the response is stored on the socket
        client, which writes it into the header of later requests.
        """
        self.logger.info("create_session")
        req = ua.CreateSessionRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.CreateSessionResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        session = resp.Parameters
        self._uasocket.authentication_token = session.AuthenticationToken
        return resp.Parameters
246
247 1
    def activate_session(self, parameters):
        """
        Send an ActivateSessionRequest with parameters, check the service
        result and return the Parameters of the response.
        """
        self.logger.info("activate_session")
        req = ua.ActivateSessionRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.ActivateSessionResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters
256
257 1
    def close_session(self, deletesubscriptions):
        """
        Send a CloseSessionRequest; deletesubscriptions is written to its
        DeleteSubscriptions field. A BadSessionClosed service result is
        tolerated, any other failure of the check propagates.
        """
        self.logger.info("close_session")
        req = ua.CloseSessionRequest()
        req.DeleteSubscriptions = deletesubscriptions
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.CloseSessionResponse, raw)
        try:
            resp.ResponseHeader.ServiceResult.check()
        except BadSessionClosed:
            # Closing the session while publish requests are still open
            # leads to BadSessionClosed responses, so it is simply ignored.
            # The alternative would be to make sure no publish request is
            # in flight when the session gets closed.
            return None
        return None
271
272 1
    def browse(self, parameters):
        """
        Send a BrowseRequest with parameters, check the service result
        and return the Results of the response.
        """
        self.logger.info("browse")
        req = ua.BrowseRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.BrowseResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
281
282 1
    def browse_next(self, parameters):
        """
        Send a BrowseNextRequest with parameters, check the service
        result and return Parameters.Results of the response.
        """
        self.logger.info("browse next")
        req = ua.BrowseNextRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.BrowseNextResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters.Results
291
292 1
    def read(self, parameters):
        """
        Send a ReadRequest with parameters, check the service result and
        return the Results of the response.

        Good results read from the NodeClass attribute are converted to
        ua.NodeClass; good results read from the ValueRank attribute are
        converted to ua.ValueRank when the value lies in -3..4.
        """
        self.logger.info("read")
        req = ua.ReadRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.ReadResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        # cast to Enum the attributes that need it; results are matched to
        # the requested nodes by position
        known_ranks = (-3, -2, -1, 0, 1, 2, 3, 4)
        for position, node in enumerate(parameters.NodesToRead):
            attribute = node.AttributeId
            if attribute == ua.AttributeIds.NodeClass:
                result = resp.Results[position]
                if result.StatusCode.is_good():
                    result.Value.Value = ua.NodeClass(result.Value.Value)
            elif attribute == ua.AttributeIds.ValueRank:
                result = resp.Results[position]
                if result.StatusCode.is_good() and result.Value.Value in known_ranks:
                    result.Value.Value = ua.ValueRank(result.Value.Value)
        return resp.Results
311
312 1
    def write(self, params):
        """
        Send a WriteRequest with params, check the service result and
        return the Results of the response.
        """
        # this used to log "read", a copy/paste slip from read()
        self.logger.info("write")
        request = ua.WriteRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = struct_from_binary(ua.WriteResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results
321
322 1
    def get_endpoints(self, params):
        """
        Send a GetEndpointsRequest with params, check the service result
        and return the Endpoints of the response.
        """
        self.logger.info("get_endpoint")
        req = ua.GetEndpointsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.GetEndpointsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Endpoints
331
332 1
    def find_servers(self, params):
        """
        Send a FindServersRequest with params, check the service result
        and return the Servers of the response.
        """
        self.logger.info("find_servers")
        req = ua.FindServersRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.FindServersResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Servers
341
342 1
    def find_servers_on_network(self, params):
        """
        Send a FindServersOnNetworkRequest with params, check the service
        result and return the Parameters of the response.
        """
        self.logger.info("find_servers_on_network")
        req = ua.FindServersOnNetworkRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.FindServersOnNetworkResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters
351
352 1
    def register_server(self, registered_server):
        """
        Send a RegisterServerRequest whose Server field is
        registered_server and check the service result of the response.
        """
        self.logger.info("register_server")
        req = ua.RegisterServerRequest()
        req.Server = registered_server
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.RegisterServerResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        # this service has nothing to return
        return None
361
362 1
    def register_server2(self, params):
        """
        Send a RegisterServer2Request with params, check the service
        result and return the ConfigurationResults of the response.
        """
        self.logger.info("register_server2")
        req = ua.RegisterServer2Request()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.RegisterServer2Response, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.ConfigurationResults
371
372 1
    def translate_browsepaths_to_nodeids(self, browsepaths):
        """
        Send a TranslateBrowsePathsToNodeIdsRequest for browsepaths,
        check the service result and return the Results of the response.
        """
        self.logger.info("translate_browsepath_to_nodeid")
        req = ua.TranslateBrowsePathsToNodeIdsRequest()
        req.Parameters.BrowsePaths = browsepaths
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
381
382 1
    def create_subscription(self, params, callback):
        """
        Send a CreateSubscriptionRequest with params and wait (up to the
        client timeout) for the response parameters, which are returned.

        callback is registered as the publish callback of the new
        subscription by _create_subscription_callback.
        """
        self.logger.info("create_subscription")
        req = ua.CreateSubscriptionRequest()
        req.Parameters = params
        result_future = Future()
        on_answer = partial(self._create_subscription_callback, callback, result_future)
        self._uasocket.send_request(req, on_answer)
        return result_future.result(self._timeout)
390
391 1
    def _create_subscription_callback(self, pub_callback, resp_fut, data_fut):
        """
        Done-callback for a CreateSubscriptionRequest.

        Decodes the answer held by data_fut, checks it, registers
        pub_callback under the new SubscriptionId and completes resp_fut
        with the response parameters.
        """
        # NOTE(review): if decoding or the check raises, resp_fut is never
        # completed, so the caller waiting on it only sees its timeout.
        self.logger.info("_create_subscription_callback")
        raw = data_fut.result()
        resp = struct_from_binary(ua.CreateSubscriptionResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        subscription_id = resp.Parameters.SubscriptionId
        self._publishcallbacks[subscription_id] = pub_callback
        resp_fut.set_result(resp.Parameters)
399
400 1
    def delete_subscriptions(self, subscriptionids):
        """
        Send a DeleteSubscriptionsRequest for subscriptionids and wait
        (up to the client timeout) for the results, which are returned.
        """
        self.logger.info("delete_subscription")
        req = ua.DeleteSubscriptionsRequest()
        req.Parameters.SubscriptionIds = subscriptionids
        result_future = Future()
        on_answer = partial(self._delete_subscriptions_callback, subscriptionids, result_future)
        self._uasocket.send_request(req, on_answer)
        return result_future.result(self._timeout)
408
409 1
    def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut):
        """
        Done-callback for a DeleteSubscriptionsRequest.

        Decodes the answer held by data_fut, checks it, drops the publish
        callbacks of subscriptionids and completes resp_fut with the
        Results of the response.
        """
        self.logger.info("_delete_subscriptions_callback")
        data = data_fut.result()
        response = struct_from_binary(ua.DeleteSubscriptionsResponse, data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        for sid in subscriptionids:
            # Tolerate ids that have no registered callback (e.g. deleted
            # twice): a KeyError raised here would be swallowed by the
            # future machinery, resp_fut would never be completed and the
            # caller would time out although the server answered.
            self._publishcallbacks.pop(sid, None)
        resp_fut.set_result(response.Results)
418
419 1
    def publish(self, acks=None):
        """
        Send a PublishRequest acknowledging acks (an empty list when
        omitted). The answer is handled by _call_publish_callback.
        """
        self.logger.info("publish")
        acknowledgements = [] if acks is None else acks
        req = ua.PublishRequest()
        req.Parameters.SubscriptionAcknowledgements = acknowledgements
        self._uasocket.send_request(req, self._call_publish_callback, timeout=0)
426
427 1
    def _call_publish_callback(self, future):
        """
        Done-callback for publish requests.

        Validates the answer held by future, decodes it as a
        PublishResponse and hands its Parameters to the callback
        registered for the subscription. Exceptions raised by that
        callback are logged, not propagated.
        """
        self.logger.info("call_publish_callback")
        data = future.result()

        # check if answer looks ok
        try:
            self._uasocket.check_answer(data, "while waiting for publish response")
        except BadTimeout:  # Spec Part 4, 7.28
            self.publish()
            return
        except BadNoSubscription:  # Spec Part 5, 13.8.1
            # BadNoSubscription is expected after deleting the last subscription.
            #
            # We should therefore also check for len(self._publishcallbacks) == 0, but
            # this gets us into trouble if a Publish response arrives before the
            # DeleteSubscription response.
            #
            # We could remove the callback already when sending the DeleteSubscription request,
            # but there are some legitimate reasons to keep them around, such as when the server
            # responds with "BadTimeout" and we should try again later instead of just removing
            # the subscription client-side.
            #
            # There are a variety of ways to act correctly, but the most practical solution seems
            # to be to just ignore any BadNoSubscription responses.
            self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
            return

        # parse publish response
        try:
            response = struct_from_binary(ua.PublishResponse, data)
            self.logger.debug(response)
        except Exception:
            # INFO: catching the exception here might be obsolete because we already
            #       catch BadTimeout above. However, it's not really clear what this code
            #       does so it stays in, doesn't seem to hurt.
            self.logger.exception("Error parsing notificatipn from server")
            # send a publish request to the server so that it stops sending notifications
            self.publish([])
            return

        # look for callback
        try:
            callback = self._publishcallbacks[response.Parameters.SubscriptionId]
        except KeyError:
            self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
            return

        # do callback
        try:
            callback(response.Parameters)
        except Exception:  # we call client code, catch everything!
            # the message has a %s placeholder: give it the callback,
            # otherwise a literal "%s" ends up in the log
            self.logger.exception("Exception while calling user callback: %s", callback)
478
479 1
    def create_monitored_items(self, params):
        """
        Send a CreateMonitoredItemsRequest with params, check the service
        result and return the Results of the response.
        """
        self.logger.info("create_monitored_items")
        req = ua.CreateMonitoredItemsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.CreateMonitoredItemsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
488
489 1
    def delete_monitored_items(self, params):
        """
        Send a DeleteMonitoredItemsRequest with params, check the service
        result and return the Results of the response.
        """
        self.logger.info("delete_monitored_items")
        req = ua.DeleteMonitoredItemsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.DeleteMonitoredItemsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
498
499 1
    def add_nodes(self, nodestoadd):
        """
        Send an AddNodesRequest for nodestoadd, check the service result
        and return the Results of the response.
        """
        self.logger.info("add_nodes")
        req = ua.AddNodesRequest()
        req.Parameters.NodesToAdd = nodestoadd
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.AddNodesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
508
509 1
    def add_references(self, refs):
        """
        Send an AddReferencesRequest for refs, check the service result
        and return the Results of the response.
        """
        self.logger.info("add_references")
        req = ua.AddReferencesRequest()
        req.Parameters.ReferencesToAdd = refs
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.AddReferencesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
518
519 1
    def delete_references(self, refs):
        """
        Send a DeleteReferencesRequest for refs, check the service result
        and return Parameters.Results of the response.
        """
        self.logger.info("delete")
        req = ua.DeleteReferencesRequest()
        req.Parameters.ReferencesToDelete = refs
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.DeleteReferencesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters.Results
528
529
530 1
    def delete_nodes(self, params):
        """
        Send a DeleteNodesRequest with params, check the service result
        and return the Results of the response.
        """
        self.logger.info("delete_nodes")
        req = ua.DeleteNodesRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.DeleteNodesResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
539
540 1
    def call(self, methodstocall):
        """
        Send a CallRequest for methodstocall, check the service result
        and return the Results of the response.
        """
        req = ua.CallRequest()
        req.Parameters.MethodsToCall = methodstocall
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.CallResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
548
549 1
    def history_read(self, params):
        """
        Send a HistoryReadRequest with params, check the service result
        and return the Results of the response.
        """
        self.logger.info("history_read")
        req = ua.HistoryReadRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.HistoryReadResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
558
559 1
    def modify_monitored_items(self, params):
        """
        Send a ModifyMonitoredItemsRequest with params, check the service
        result and return the Results of the response.
        """
        self.logger.info("modify_monitored_items")
        req = ua.ModifyMonitoredItemsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = struct_from_binary(ua.ModifyMonitoredItemsResponse, raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
568